Get stuck when dropping a subscription during synchronizing table
Hi,
I encountered a situation where DROP SUBSCRIPTION got stuck when
initial table sync is in progress. In my environment, I created
several tables with some data on publisher. I created subscription on
subscriber and drop subscription immediately after that. It doesn't
always happen but I often encountered it on my environment.
ps -x command shows the following.
96796 ? Ss 0:00 postgres: masahiko postgres [local] DROP
SUBSCRIPTION
96801 ? Ts 0:00 postgres: bgworker: logical replication
worker for subscription 40993 waiting
96805 ? Ss 0:07 postgres: bgworker: logical replication
worker for subscription 40993 sync 16418
96806 ? Ss 0:01 postgres: wal sender process masahiko [local] idle
96807 ? Ss 0:00 postgres: bgworker: logical replication
worker for subscription 40993 sync 16421
96808 ? Ss 0:00 postgres: wal sender process masahiko [local] idle
The DROP SUBSCRIPTION process (pid 96796) is waiting for the apply
worker process (pid 96801) to stop while holding a lock on
pg_subscription_rel. On the other hand the apply worker is waiting for
acquiring a tuple lock on pg_subscription_rel needed for heap_update.
Also table sync workers (pid 96805 and 96807) are waiting for the
apply worker process to change their status.
Also, even when DROP SUBSCRIPTION is done successfully, the table sync
worker can be orphaned because I guess that the apply worker can exit
before change status of table sync worker.
I'm using 1f30295.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 2017-05-08 11:27, Masahiko Sawada wrote:
Hi,
I encountered a situation where DROP SUBSCRIPTION got stuck when
initial table sync is in progress. In my environment, I created
several tables with some data on publisher. I created subscription on
subscriber and drop subscription immediately after that. It doesn't
always happen but I often encountered it on my environment.ps -x command shows the following.
96796 ? Ss 0:00 postgres: masahiko postgres [local] DROP
SUBSCRIPTION
96801 ? Ts 0:00 postgres: bgworker: logical replication
worker for subscription 40993 waiting
96805 ? Ss 0:07 postgres: bgworker: logical replication
worker for subscription 40993 sync 16418
96806 ? Ss 0:01 postgres: wal sender process masahiko
[local] idle
96807 ? Ss 0:00 postgres: bgworker: logical replication
worker for subscription 40993 sync 16421
96808 ? Ss 0:00 postgres: wal sender process masahiko
[local] idle
FWIW, running
0001-WIP-Fix-off-by-one-around-GetLastImportantRecPtr.patch+
0002-WIP-Possibly-more-robust-snapbuild-approach.patch +
fix-statistics-reporting-in-logical-replication-work.patch
(on top of 44c528810)
I have encountered the same condition as well in the last few days, a
few times (I think 2 or 3 times).
Erik Rijkers
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Mon, May 8, 2017 at 7:14 PM, Erik Rijkers <er@xs4all.nl> wrote:
On 2017-05-08 11:27, Masahiko Sawada wrote:
Hi,
I encountered a situation where DROP SUBSCRIPTION got stuck when
initial table sync is in progress. In my environment, I created
several tables with some data on publisher. I created subscription on
subscriber and drop subscription immediately after that. It doesn't
always happen but I often encountered it on my environment.ps -x command shows the following.
96796 ? Ss 0:00 postgres: masahiko postgres [local] DROP
SUBSCRIPTION
96801 ? Ts 0:00 postgres: bgworker: logical replication
worker for subscription 40993 waiting
96805 ? Ss 0:07 postgres: bgworker: logical replication
worker for subscription 40993 sync 16418
96806 ? Ss 0:01 postgres: wal sender process masahiko [local]
idle
96807 ? Ss 0:00 postgres: bgworker: logical replication
worker for subscription 40993 sync 16421
96808 ? Ss 0:00 postgres: wal sender process masahiko [local]
idleFWIW, running
0001-WIP-Fix-off-by-one-around-GetLastImportantRecPtr.patch+
0002-WIP-Possibly-more-robust-snapbuild-approach.patch +
fix-statistics-reporting-in-logical-replication-work.patch(on top of 44c528810)
Thanks, which thread are these patches attached on?
I have encountered the same condition as well in the last few days, a few
times (I think 2 or 3 times).
IIUC there are two issues; one is that the deadlock can happen between
the DROP SUBSCRIPTION and the apply worker process, another one is the
table sync worker can be orphaned if the apply worker exits before
changing status. The latter might relate to another issue reported by
Jeff[1]/messages/by-id/CAMkU=1xUJKs=2etq2K7bmbY51Q7g853HLxJ7qEB2Snog9oRvDw@mail.gmail.com.
[1]: /messages/by-id/CAMkU=1xUJKs=2etq2K7bmbY51Q7g853HLxJ7qEB2Snog9oRvDw@mail.gmail.com
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 08/05/17 11:27, Masahiko Sawada wrote:
Hi,
I encountered a situation where DROP SUBSCRIPTION got stuck when
initial table sync is in progress. In my environment, I created
several tables with some data on publisher. I created subscription on
subscriber and drop subscription immediately after that. It doesn't
always happen but I often encountered it on my environment.ps -x command shows the following.
96796 ? Ss 0:00 postgres: masahiko postgres [local] DROP
SUBSCRIPTION
96801 ? Ts 0:00 postgres: bgworker: logical replication
worker for subscription 40993 waiting
96805 ? Ss 0:07 postgres: bgworker: logical replication
worker for subscription 40993 sync 16418
96806 ? Ss 0:01 postgres: wal sender process masahiko [local] idle
96807 ? Ss 0:00 postgres: bgworker: logical replication
worker for subscription 40993 sync 16421
96808 ? Ss 0:00 postgres: wal sender process masahiko [local] idleThe DROP SUBSCRIPTION process (pid 96796) is waiting for the apply
worker process (pid 96801) to stop while holding a lock on
pg_subscription_rel. On the other hand the apply worker is waiting for
acquiring a tuple lock on pg_subscription_rel needed for heap_update.
Also table sync workers (pid 96805 and 96807) are waiting for the
apply worker process to change their status.
Looks like we should kill apply before dropping dependencies.
Also, even when DROP SUBSCRIPTION is done successfully, the table sync
worker can be orphaned because I guess that the apply worker can exit
before change status of table sync worker.
Well the tablesync worker should stop itself if the subscription got
removed, but of course again the dependencies are an issue, so we should
probably kill those explicitly as well.
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 2017-05-08 13:13, Masahiko Sawada wrote:
On Mon, May 8, 2017 at 7:14 PM, Erik Rijkers <er@xs4all.nl> wrote:
On 2017-05-08 11:27, Masahiko Sawada wrote:
FWIW, running
0001-WIP-Fix-off-by-one-around-GetLastImportantRecPtr.patch+
0002-WIP-Possibly-more-robust-snapbuild-approach.patch +
fix-statistics-reporting-in-logical-replication-work.patch(on top of 44c528810)
Thanks, which thread are these patches attached on?
The first two patches are here:
/messages/by-id/20170505004237.edtahvrwb3uwd5rs@alap3.anarazel.de
and last one:
/messages/by-id/22cc402c-88eb-fa35-217f-0060db2c72f0@2ndquadrant.com
( I have to include that last one or my tests fail within minutes. )
Erik Rijkers
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Mon, May 8, 2017 at 8:53 PM, Erik Rijkers <er@xs4all.nl> wrote:
On 2017-05-08 13:13, Masahiko Sawada wrote:
On Mon, May 8, 2017 at 7:14 PM, Erik Rijkers <er@xs4all.nl> wrote:
On 2017-05-08 11:27, Masahiko Sawada wrote:
FWIW, running
0001-WIP-Fix-off-by-one-around-GetLastImportantRecPtr.patch+
0002-WIP-Possibly-more-robust-snapbuild-approach.patch +
fix-statistics-reporting-in-logical-replication-work.patch(on top of 44c528810)
Thanks, which thread are these patches attached on?
The first two patches are here:
/messages/by-id/20170505004237.edtahvrwb3uwd5rs@alap3.anarazel.deand last one:
/messages/by-id/22cc402c-88eb-fa35-217f-0060db2c72f0@2ndquadrant.com( I have to include that last one or my tests fail within minutes. )
Thank you! I will look at these patches.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Mon, May 8, 2017 at 8:42 PM, Petr Jelinek
<petr.jelinek@2ndquadrant.com> wrote:
On 08/05/17 11:27, Masahiko Sawada wrote:
Hi,
I encountered a situation where DROP SUBSCRIPTION got stuck when
initial table sync is in progress. In my environment, I created
several tables with some data on publisher. I created subscription on
subscriber and drop subscription immediately after that. It doesn't
always happen but I often encountered it on my environment.ps -x command shows the following.
96796 ? Ss 0:00 postgres: masahiko postgres [local] DROP
SUBSCRIPTION
96801 ? Ts 0:00 postgres: bgworker: logical replication
worker for subscription 40993 waiting
96805 ? Ss 0:07 postgres: bgworker: logical replication
worker for subscription 40993 sync 16418
96806 ? Ss 0:01 postgres: wal sender process masahiko [local] idle
96807 ? Ss 0:00 postgres: bgworker: logical replication
worker for subscription 40993 sync 16421
96808 ? Ss 0:00 postgres: wal sender process masahiko [local] idleThe DROP SUBSCRIPTION process (pid 96796) is waiting for the apply
worker process (pid 96801) to stop while holding a lock on
pg_subscription_rel. On the other hand the apply worker is waiting for
acquiring a tuple lock on pg_subscription_rel needed for heap_update.
Also table sync workers (pid 96805 and 96807) are waiting for the
apply worker process to change their status.Looks like we should kill apply before dropping dependencies.
Sorry, after investigated I found out that DROP SUBSCRIPTION process
is holding AccessExclusiveLock on pg_subscription (, not
pg_subscription_rel) and apply worker is waiting for acquiring a lock
on it. So I guess that the dropping dependencies are not relevant with
this. It seems to me that the main cause is that DROP SUBSCRIPTION
waits for apply worker to finish while keeping to hold
AccessExclusiveLock on pg_subscription. Perhaps we need to contrive
ways to reduce lock level somehow.
Also, even when DROP SUBSCRIPTION is done successfully, the table sync
worker can be orphaned because I guess that the apply worker can exit
before change status of table sync worker.Well the tablesync worker should stop itself if the subscription got
removed, but of course again the dependencies are an issue, so we should
probably kill those explicitly as well.
Yeah, I think that we should ensure that the apply worker exits after
killed all involved table sync workers.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Wed, May 10, 2017 at 2:46 AM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Mon, May 8, 2017 at 8:42 PM, Petr Jelinek
<petr.jelinek@2ndquadrant.com> wrote:On 08/05/17 11:27, Masahiko Sawada wrote:
Hi,
I encountered a situation where DROP SUBSCRIPTION got stuck when
initial table sync is in progress. In my environment, I created
several tables with some data on publisher. I created subscription on
subscriber and drop subscription immediately after that. It doesn't
always happen but I often encountered it on my environment.ps -x command shows the following.
96796 ? Ss 0:00 postgres: masahiko postgres [local] DROP
SUBSCRIPTION
96801 ? Ts 0:00 postgres: bgworker: logical replication
worker for subscription 40993 waiting
96805 ? Ss 0:07 postgres: bgworker: logical replication
worker for subscription 40993 sync 16418
96806 ? Ss 0:01 postgres: wal sender process masahiko [local] idle
96807 ? Ss 0:00 postgres: bgworker: logical replication
worker for subscription 40993 sync 16421
96808 ? Ss 0:00 postgres: wal sender process masahiko [local] idleThe DROP SUBSCRIPTION process (pid 96796) is waiting for the apply
worker process (pid 96801) to stop while holding a lock on
pg_subscription_rel. On the other hand the apply worker is waiting for
acquiring a tuple lock on pg_subscription_rel needed for heap_update.
Also table sync workers (pid 96805 and 96807) are waiting for the
apply worker process to change their status.Looks like we should kill apply before dropping dependencies.
Sorry, after investigated I found out that DROP SUBSCRIPTION process
is holding AccessExclusiveLock on pg_subscription (, not
pg_subscription_rel) and apply worker is waiting for acquiring a lock
on it.
Hmm it seems there are two cases. One is that the apply worker waits
to acquire AccessShareLock on pg_subscription but DropSubscription
already acquired AcessExclusiveLock on it and waits for the apply
worker to finish. Another case is that the apply worker waits to
acquire a tuple lock on pg_subscrption_rel but DropSubscription (maybe
droppoing dependencies) already acquired it.
So I guess that the dropping dependencies are not relevant with
this. It seems to me that the main cause is that DROP SUBSCRIPTION
waits for apply worker to finish while keeping to hold
AccessExclusiveLock on pg_subscription. Perhaps we need to contrive
ways to reduce lock level somehow.Also, even when DROP SUBSCRIPTION is done successfully, the table sync
worker can be orphaned because I guess that the apply worker can exit
before change status of table sync worker.Well the tablesync worker should stop itself if the subscription got
removed, but of course again the dependencies are an issue, so we should
probably kill those explicitly as well.Yeah, I think that we should ensure that the apply worker exits after
killed all involved table sync workers.
Barring any objections, I'll add these two issues to open item.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Wed, May 10, 2017 at 11:57 AM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
Barring any objections, I'll add these two issues to open item.
It seems to me that those open items have not been added yet to the
list. If I am following correctly, they could be defined as follows:
- Dropping subscription may stuck if done during tablesync.
-- Analyze deadlock issues with DROP SUBSCRIPTION and apply worker process.
-- Avoid orphaned tablesync worker if apply worker exits before
changing its status.
I am playing with the code to look at both of them... But feel free to
update this thread if I don't show up. There are no test cases, but
some well-placed pg_usleep calls should make both issues easily
reproducible. I have the gut feeling that other things are hidden
behind though.
--
Michael
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Thu, May 11, 2017 at 4:06 PM, Michael Paquier
<michael.paquier@gmail.com> wrote:
On Wed, May 10, 2017 at 11:57 AM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
Barring any objections, I'll add these two issues to open item.
It seems to me that those open items have not been added yet to the
list. If I am following correctly, they could be defined as follows:
- Dropping subscription may stuck if done during tablesync.
-- Analyze deadlock issues with DROP SUBSCRIPTION and apply worker process.
-- Avoid orphaned tablesync worker if apply worker exits before
changing its status.
Thanks, I think correct. Added it to open item.
I am playing with the code to look at both of them... But feel free to
update this thread if I don't show up. There are no test cases, but
some well-placed pg_usleep calls should make both issues easily
reproducible. I have the gut feeling that other things are hidden
behind though.
I'm also working on this, so will update it if there is.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 11/05/17 10:10, Masahiko Sawada wrote:
On Thu, May 11, 2017 at 4:06 PM, Michael Paquier
<michael.paquier@gmail.com> wrote:On Wed, May 10, 2017 at 11:57 AM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
Barring any objections, I'll add these two issues to open item.
It seems to me that those open items have not been added yet to the
list. If I am following correctly, they could be defined as follows:
- Dropping subscription may stuck if done during tablesync.
-- Analyze deadlock issues with DROP SUBSCRIPTION and apply worker process.
I think the solution to this is to reintroduce the LWLock that was
removed and replaced with the exclusive lock on catalog [1]/messages/by-id/CAHGQGwHPi8ky-yANFfe0sgmhKtsYcQLTnKx07bW9S7-Rn1746w@mail.gmail.com. I am afraid
that correct way of handling this is to do both LWLock and catalog lock
(first LWLock under which we kill the workers and then catalog lock so
that something that prevents launcher from restarting them is held till
the end of transaction).
-- Avoid orphaned tablesync worker if apply worker exits before
changing its status.
The behavior question I have about this is if sync workers should die
when apply worker dies (ie they are tied to apply worker) or if they
should be tied to the subscription.
I guess taking down all the sync workers when apply worker has exited is
easier to solve. Of course it means that if apply worker restarts in
middle of table synchronization, the table synchronization will have to
start from scratch. That being said, in normal operation apply worker
should only exit/restart if subscription has changed or has been
dropped/disabled and I think sync workers want to exit/restart in that
situation as well.
So for example having shmem detach hook for an apply worker (or reusing
the existing one) that searches for all the other workers for same
subscription and shuts them down as well sounds like solution to this.
[1]: /messages/by-id/CAHGQGwHPi8ky-yANFfe0sgmhKtsYcQLTnKx07bW9S7-Rn1746w@mail.gmail.com
/messages/by-id/CAHGQGwHPi8ky-yANFfe0sgmhKtsYcQLTnKx07bW9S7-Rn1746w@mail.gmail.com
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Thu, May 11, 2017 at 6:16 PM, Petr Jelinek
<petr.jelinek@2ndquadrant.com> wrote:
On 11/05/17 10:10, Masahiko Sawada wrote:
On Thu, May 11, 2017 at 4:06 PM, Michael Paquier
<michael.paquier@gmail.com> wrote:On Wed, May 10, 2017 at 11:57 AM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
Barring any objections, I'll add these two issues to open item.
It seems to me that those open items have not been added yet to the
list. If I am following correctly, they could be defined as follows:
- Dropping subscription may stuck if done during tablesync.
-- Analyze deadlock issues with DROP SUBSCRIPTION and apply worker process.I think the solution to this is to reintroduce the LWLock that was
removed and replaced with the exclusive lock on catalog [1]. I am afraid
that correct way of handling this is to do both LWLock and catalog lock
(first LWLock under which we kill the workers and then catalog lock so
that something that prevents launcher from restarting them is held till
the end of transaction).
I agree to reintroduce LWLock and to stop logical rep worker first and
then modify catalog. That way we can reduce catalog lock level (maybe
to RowExclusiveLock) so that apply worker can see it. Also I think
that we need to do more things like in order to prevent that we keep
to hold LWLock until end of transaction, because holding LWLock until
end of transaction is not good idea and could be cause of deadlock. So
for example we can commit the transaction in DropSubscription after
cleaned pg_subscription record and all its dependencies and then start
new transaction for the remaining work. Of course we also need to
disallow DROP SUBSCRIPTION being executed in a user transaction
though.
-- Avoid orphaned tablesync worker if apply worker exits before
changing its status.The behavior question I have about this is if sync workers should die
when apply worker dies (ie they are tied to apply worker) or if they
should be tied to the subscription.I guess taking down all the sync workers when apply worker has exited is
easier to solve. Of course it means that if apply worker restarts in
middle of table synchronization, the table synchronization will have to
start from scratch. That being said, in normal operation apply worker
should only exit/restart if subscription has changed or has been
dropped/disabled and I think sync workers want to exit/restart in that
situation as well.
I agree that sync workers are tied to the apply worker.
So for example having shmem detach hook for an apply worker (or reusing
the existing one) that searches for all the other workers for same
subscription and shuts them down as well sounds like solution to this.
Seems reasonable solution.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Fri, May 12, 2017 at 11:24 AM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Thu, May 11, 2017 at 6:16 PM, Petr Jelinek
<petr.jelinek@2ndquadrant.com> wrote:On 11/05/17 10:10, Masahiko Sawada wrote:
On Thu, May 11, 2017 at 4:06 PM, Michael Paquier
<michael.paquier@gmail.com> wrote:On Wed, May 10, 2017 at 11:57 AM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
Barring any objections, I'll add these two issues to open item.
It seems to me that those open items have not been added yet to the
list. If I am following correctly, they could be defined as follows:
- Dropping subscription may stuck if done during tablesync.
-- Analyze deadlock issues with DROP SUBSCRIPTION and apply worker process.I think the solution to this is to reintroduce the LWLock that was
removed and replaced with the exclusive lock on catalog [1]. I am afraid
that correct way of handling this is to do both LWLock and catalog lock
(first LWLock under which we kill the workers and then catalog lock so
that something that prevents launcher from restarting them is held till
the end of transaction).I agree to reintroduce LWLock and to stop logical rep worker first and
then modify catalog. That way we can reduce catalog lock level (maybe
to RowExclusiveLock) so that apply worker can see it. Also I think
that we need to do more things like in order to prevent that we keep
to hold LWLock until end of transaction, because holding LWLock until
end of transaction is not good idea and could be cause of deadlock. So
for example we can commit the transaction in DropSubscription after
cleaned pg_subscription record and all its dependencies and then start
new transaction for the remaining work. Of course we also need to
disallow DROP SUBSCRIPTION being executed in a user transaction
though.
Attached two draft patches to solve these issues.
Attached 0001 patch reintroduces LogicalRepLauncherLock and makes DROP
SUBSCRIPTION keep holding it until commit. To prevent from deadlock
possibility, I disallowed DROP SUBSCRIPTION being called in a
transaction block. But there might be more sensible solution for this.
please give me feedback.
-- Avoid orphaned tablesync worker if apply worker exits before
changing its status.The behavior question I have about this is if sync workers should die
when apply worker dies (ie they are tied to apply worker) or if they
should be tied to the subscription.I guess taking down all the sync workers when apply worker has exited is
easier to solve. Of course it means that if apply worker restarts in
middle of table synchronization, the table synchronization will have to
start from scratch. That being said, in normal operation apply worker
should only exit/restart if subscription has changed or has been
dropped/disabled and I think sync workers want to exit/restart in that
situation as well.I agree that sync workers are tied to the apply worker.
So for example having shmem detach hook for an apply worker (or reusing
the existing one) that searches for all the other workers for same
subscription and shuts them down as well sounds like solution to this.Seems reasonable solution.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Attachments:
0001-Fix-a-deadlock-bug-between-DROP-SUBSCRIPTION-and-app.patchapplication/octet-stream; name=0001-Fix-a-deadlock-bug-between-DROP-SUBSCRIPTION-and-app.patchDownload
From 241adc15d4175d66ee96414e360c9878a5f24023 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 12 May 2017 14:28:05 +0900
Subject: [PATCH 1/2] Fix a deadlock bug between DROP SUBSCRIPTION and apply
worker.
---
src/backend/commands/subscriptioncmds.c | 48 +++++++++++++++---------------
src/backend/replication/logical/launcher.c | 8 +++++
src/backend/storage/lmgr/lwlock.c | 2 +-
src/backend/storage/lmgr/lwlocknames.txt | 1 +
4 files changed, 34 insertions(+), 25 deletions(-)
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b76cdc5..1cb8353 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -811,18 +811,32 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
StringInfoData cmd;
/*
- * Lock pg_subscription with AccessExclusiveLock to ensure
- * that the launcher doesn't restart new worker during dropping
- * the subscription
+ * To prevent from launcher restarts the new worker, we need to
+ * keep holding LogicalRepLauncherLock until commit. On the
+ * other hand, keeping to hold LWLock until commit can be cause
+ * of deadlock. So we disallow DROP SUBSCRIPTION being called
+ * in a transaction block.
+ *
+ * XXX The command name should really be something like "DROP SUBSCRIPTION
+ * of a subscription that is associated with a replication slot", but we
+ * don't have the proper facilities for that.
+ */
+ PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
+
+ /*
+ * Protect against launcher restarting the worker. This lock will
+ * be released at commit.
*/
- rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);
+ LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE);
+
+ rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
CStringGetDatum(stmt->subname));
if (!HeapTupleIsValid(tup))
{
- heap_close(rel, NoLock);
+ heap_close(rel, RowExclusiveLock);
if (!stmt->missing_ok)
ereport(ERROR,
@@ -844,6 +858,9 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
stmt->subname);
+ /* Kill the apply worker so that the slot becomes accessible first. */
+ logicalrep_worker_stop(subid, InvalidOid);
+
/* DROP hook for the subscription being removed */
InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
@@ -873,20 +890,6 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
else
slotname = NULL;
- /*
- * Since dropping a replication slot is not transactional, the replication
- * slot stays dropped even if the transaction rolls back. So we cannot
- * run DROP SUBSCRIPTION inside a transaction block if dropping the
- * replication slot.
- *
- * XXX The command name should really be something like "DROP SUBSCRIPTION
- * of a subscription that is associated with a replication slot", but we
- * don't have the proper facilities for that.
- */
- if (slotname)
- PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
-
-
ObjectAddressSet(myself, SubscriptionRelationId, subid);
EventTriggerSQLDropAddObject(&myself, true, true);
@@ -901,9 +904,6 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
/* Remove any associated relation synchronization states. */
RemoveSubscriptionRel(subid, InvalidOid);
- /* Kill the apply worker so that the slot becomes accessible. */
- logicalrep_worker_stop(subid, InvalidOid);
-
/* Remove the origin tracking if exists. */
snprintf(originname, sizeof(originname), "pg_%u", subid);
originid = replorigin_by_name(originname, true);
@@ -913,7 +913,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
/* If there is no slot associated with the subscription, we can finish here. */
if (!slotname)
{
- heap_close(rel, NoLock);
+ heap_close(rel, RowExclusiveLock);
return;
}
@@ -964,7 +964,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
pfree(cmd.data);
- heap_close(rel, NoLock);
+ heap_close(rel, RowExclusiveLock);
}
/*
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 09c87d7..dfce49d 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -405,6 +405,9 @@ retry:
/*
* Stop the logical replication worker and wait until it detaches from the
* slot.
+ *
+ * The caller must hold LogicalRepLauncherLock to ensure that new workers are
+ * not being started during this function call.
*/
void
logicalrep_worker_stop(Oid subid, Oid relid)
@@ -832,6 +835,9 @@ ApplyLauncherMain(Datum main_arg)
ALLOCSET_DEFAULT_MAXSIZE);
oldctx = MemoryContextSwitchTo(subctx);
+ /* Block any concurrent DROP SUBSCRIPTION */
+ LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE);
+
/* search for subscriptions to start or stop. */
sublist = get_subscription_list();
@@ -855,6 +861,8 @@ ApplyLauncherMain(Datum main_arg)
}
}
+ LWLockRelease(LogicalRepLauncherLock);
+
/* Switch back to original memory context. */
MemoryContextSwitchTo(oldctx);
/* Clean the temporary memory. */
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 3e13394..949b333 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -494,7 +494,7 @@ RegisterLWLockTranches(void)
if (LWLockTrancheArray == NULL)
{
- LWLockTranchesAllocated = 64;
+ LWLockTranchesAllocated = 128;
LWLockTrancheArray = (char **)
MemoryContextAllocZero(TopMemoryContext,
LWLockTranchesAllocated * sizeof(char *));
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index e6025ec..eda66fe 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -50,3 +50,4 @@ OldSnapshotTimeMapLock 42
BackendRandomLock 43
LogicalRepWorkerLock 44
CLogTruncationLock 45
+LogicalRepLauncherLock 46
--
2.8.1
0002-Wait-for-table-sync-worker-to-finish-when-apply-work.patchapplication/octet-stream; name=0002-Wait-for-table-sync-worker-to-finish-when-apply-work.patchDownload
From d0089cf006f4d3afbccf1923761a85afee713b91 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 12 May 2017 16:53:08 +0900
Subject: [PATCH 2/2] Wait for table sync worker to finish when apply worker
exits.
---
src/backend/replication/logical/launcher.c | 52 +++++++++++++++++++++++++++++-
src/include/replication/worker_internal.h | 1 +
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index dfce49d..9888758 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -403,8 +403,54 @@ retry:
}
/*
+ * Stop all table sync workers associated with given subid.
+ *
+ * This function is called by apply worker. Since table sync
+ * worker associated with same subscription is launched by
+ * only the apply worker. We don't need to acquire
+ * LogicalRepLauncherLock here.
+ */
+void
+logicalrep_sync_workers_stop(Oid subid)
+{
+ List *relid_list = NIL;
+ ListCell *cell;
+ int i;
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ /*
+ * Walks the workers array and get relid list that matches
+ * given subscription id.
+ */
+ for (i = 0; i < max_logical_replication_workers; i++)
+ {
+ LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+ if (w->in_use && w->subid == subid &&
+ OidIsValid(w->relid))
+ relid_list = lappend_oid(relid_list, w->relid);
+ }
+
+ LWLockRelease(LogicalRepWorkerLock);
+
+ /* Return if there is no table sync worker associated with myself */
+ if (relid_list == NIL)
+ return;
+
+ foreach (cell, relid_list)
+ {
+ Oid relid = lfirst_oid(cell);
+
+ logicalrep_worker_stop(subid, relid);
+ }
+}
+
+/*
* Stop the logical replication worker and wait until it detaches from the
- * slot.
+ * slot. This function can be called by both logical replication launcher
+ * and apply worker to stop apply worker and table sync worker.
+ *
*
* The caller must hold LogicalRepLauncherLock to ensure that new workers are
* not being started during this function call.
@@ -573,6 +619,10 @@ logicalrep_worker_attach(int slot)
static void
logicalrep_worker_detach(void)
{
+ /* Stop all sync workers associated if apply worker */
+ if (!am_tablesync_worker())
+ logicalrep_sync_workers_stop(MyLogicalRepWorker->subid);
+
/* Block concurrent access. */
LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 26788fe..2fec0b0 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -78,6 +78,7 @@ extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
extern void logicalrep_worker_stop(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
+extern void logicalrep_sync_workers_stop(Oid subid);
extern int logicalrep_sync_worker_count(Oid subid);
--
2.8.1
On Mon, May 08, 2017 at 06:27:30PM +0900, Masahiko Sawada wrote:
I encountered a situation where DROP SUBSCRIPTION got stuck when
initial table sync is in progress. In my environment, I created
several tables with some data on publisher. I created subscription on
subscriber and drop subscription immediately after that. It doesn't
always happen but I often encountered it on my environment.ps -x command shows the following.
96796 ? Ss 0:00 postgres: masahiko postgres [local] DROP
SUBSCRIPTION
96801 ? Ts 0:00 postgres: bgworker: logical replication
worker for subscription 40993 waiting
96805 ? Ss 0:07 postgres: bgworker: logical replication
worker for subscription 40993 sync 16418
96806 ? Ss 0:01 postgres: wal sender process masahiko [local] idle
96807 ? Ss 0:00 postgres: bgworker: logical replication
worker for subscription 40993 sync 16421
96808 ? Ss 0:00 postgres: wal sender process masahiko [local] idleThe DROP SUBSCRIPTION process (pid 96796) is waiting for the apply
worker process (pid 96801) to stop while holding a lock on
pg_subscription_rel. On the other hand the apply worker is waiting for
acquiring a tuple lock on pg_subscription_rel needed for heap_update.
Also table sync workers (pid 96805 and 96807) are waiting for the
apply worker process to change their status.Also, even when DROP SUBSCRIPTION is done successfully, the table sync
worker can be orphaned because I guess that the apply worker can exit
before change status of table sync worker.I'm using 1f30295.
[Action required within three days. This is a generic notification.]
The above-described topic is currently a PostgreSQL 10 open item. Peter,
since you committed the patch believed to have created it, you own this open
item. If some other commit is more relevant or if this does not belong as a
v10 open item, please let us know. Otherwise, please observe the policy on
open item ownership[1]/messages/by-id/20170404140717.GA2675809@tornado.leadboat.com and send a status update within three calendar days of
this message. Include a date for your subsequent status update. Testers may
discover new open items at any time, and I want to plan to get them all fixed
well in advance of shipping v10. Consequently, I will appreciate your efforts
toward speedy resolution. Thanks.
[1]: /messages/by-id/20170404140717.GA2675809@tornado.leadboat.com
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Hello,
At Fri, 12 May 2017 17:24:07 +0900, Masahiko Sawada <sawada.mshk@gmail.com> wrote in <CAD21AoDJihMvdiZv7d_bpMPUK1G379WfxWpeanmJVn1KvEGy0Q@mail.gmail.com>
On Fri, May 12, 2017 at 11:24 AM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Thu, May 11, 2017 at 6:16 PM, Petr Jelinek
<petr.jelinek@2ndquadrant.com> wrote:On 11/05/17 10:10, Masahiko Sawada wrote:
On Thu, May 11, 2017 at 4:06 PM, Michael Paquier
<michael.paquier@gmail.com> wrote:On Wed, May 10, 2017 at 11:57 AM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
Barring any objections, I'll add these two issues to open item.
It seems to me that those open items have not been added yet to the
list. If I am following correctly, they could be defined as follows:
- Dropping subscription may stuck if done during tablesync.
-- Analyze deadlock issues with DROP SUBSCRIPTION and apply worker process.I think the solution to this is to reintroduce the LWLock that was
removed and replaced with the exclusive lock on catalog [1]. I am afraid
that correct way of handling this is to do both LWLock and catalog lock
(first LWLock under which we kill the workers and then catalog lock so
that something that prevents launcher from restarting them is held till
the end of transaction).I agree to reintroduce LWLock and to stop logical rep worker first and
then modify catalog. That way we can reduce catalog lock level (maybe
to RowExclusiveLock) so that apply worker can see it. Also I think
that we need to do more things like in order to prevent that we keep
to hold LWLock until end of transaction, because holding LWLock until
end of transaction is not good idea and could be cause of deadlock. So
for example we can commit the transaction in DropSubscription after
cleaned pg_subscription record and all its dependencies and then start
new transaction for the remaining work. Of course we also need to
disallow DROP SUBSCRIPTION being executed in a user transaction
though.Attached two draft patches to solve these issues.
Attached 0001 patch reintroduces LogicalRepLauncherLock and makes DROP
SUBSCRIPTION keep holding it until commit. To prevent from deadlock
possibility, I disallowed DROP SUBSCRIPTION being called in a
transaction block. But there might be more sensible solution for this.
please give me feedback.
+ * Protect against launcher restarting the worker. This lock will
+ * be released at commit.
This is wrong. COMMIT doesn't release left-over LWLocks, only
ABORT does (precisely, it seems intended to fire on ERRORs). So
with this patch, the second DROP SUBSCRIPTION is stuck on the
LWLock acquired at the first time. And as Petr said, LWLock with
such a duration seems bad.
The cause seems to be that workers ignore sigterm on certain
conditions. One of the choke points is GetSubscription, the other
is get_subscription_list. I think we can treat the both cases
without LWLocks.
The attached patch does that.
- heap_close + UnlockRelationOid in get_subscription_list() is
equivalent to one heap_close or relation_close but I took seeming
symmetricity.
- 0.5 seconds for the sleep in ApplyWorkerMain is quite
arbitrary. NAPTIME_PER_CYCLE * 1000 could be used instead.
- NULL MySubscription without SIGTERM might not need to be an
ERROR.
Any more thoughts?
FYI, I reproduced the situation by the following steps. This
effectively reproduced the situation without delay insertion for
me.
# Creating 5 tables with 100000 rows on the publisher
create table t1 (a int);
...
create table t5 (a int);
insert into t1 (select * from generate_series(0, 99999) a);
...
insert into t5 (select * from generate_series(0, 99999) a);
create publication p1 for table t1, t2, t3, t4, t5;
# Subscribe them, wait 1sec, then unsbscribe.
create table t1 (a int);
...
create table t5 (a int);
truncate t1, t2, t3, t4, t5; create subscription s1 CONNECTION 'host=/tmp port=5432 dbname=postgres' publication p1; select pg_sleep(1); drop subscription s1;
Repeated test can be performed by repeatedly enter the last line.
-- Avoid orphaned tablesync worker if apply worker exits before
changing its status.The behavior question I have about this is if sync workers should die
when apply worker dies (ie they are tied to apply worker) or if they
should be tied to the subscription.I guess taking down all the sync workers when apply worker has exited is
easier to solve. Of course it means that if apply worker restarts in
middle of table synchronization, the table synchronization will have to
start from scratch. That being said, in normal operation apply worker
should only exit/restart if subscription has changed or has been
dropped/disabled and I think sync workers want to exit/restart in that
situation as well.I agree that sync workers are tied to the apply worker.
So for example having shmem detach hook for an apply worker (or reusing
the existing one) that searches for all the other workers for same
subscription and shuts them down as well sounds like solution to this.Seems reasonable solution.
regards,
--
Kyotaro Horiguchi
NTT Open Source Software Center
Attachments:
avoid_orphan_repworker_on_immediately_drop_01.patchtext/x-patch; charset=us-asciiDownload
*** a/src/backend/replication/logical/launcher.c
--- b/src/backend/replication/logical/launcher.c
***************
*** 42,47 ****
--- 42,48 ----
#include "replication/worker_internal.h"
#include "storage/ipc.h"
+ #include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
***************
*** 116,122 **** get_subscription_list(void)
StartTransactionCommand();
(void) GetTransactionSnapshot();
! rel = heap_open(SubscriptionRelationId, AccessShareLock);
scan = heap_beginscan_catalog(rel, 0, NULL);
while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
--- 117,131 ----
StartTransactionCommand();
(void) GetTransactionSnapshot();
! /*
! * This lock cannot be aquired while subsciption commands are updating the
! * relation. We can safely skip over for the case.
! */
! if (!ConditionalLockRelationOid(SubscriptionRelationId, AccessShareLock))
! return NIL;
!
! rel = heap_open(SubscriptionRelationId, NoLock);
!
scan = heap_beginscan_catalog(rel, 0, NULL);
while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
***************
*** 146,152 **** get_subscription_list(void)
}
heap_endscan(scan);
! heap_close(rel, AccessShareLock);
CommitTransactionCommand();
--- 155,162 ----
}
heap_endscan(scan);
! heap_close(rel, NoLock);
! UnlockRelationOid(SubscriptionRelationId, AccessShareLock);
CommitTransactionCommand();
***************
*** 403,410 **** retry:
}
/*
* Stop the logical replication worker and wait until it detaches from the
! * slot.
*/
void
logicalrep_worker_stop(Oid subid, Oid relid)
--- 413,465 ----
}
/*
+ * Stop all table sync workers associated with given subid.
+ *
+ * This function is called by apply worker. Since table sync
+ * worker associated with same subscription is launched by
+ * only the apply worker. We don't need to acquire
+ * LogicalRepLauncherLock here.
+ */
+ void
+ logicalrep_sync_workers_stop(Oid subid)
+ {
+ List *relid_list = NIL;
+ ListCell *cell;
+ int i;
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ /*
+ * Walks the workers array and get relid list that matches
+ * given subscription id.
+ */
+ for (i = 0; i < max_logical_replication_workers; i++)
+ {
+ LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+ if (w->in_use && w->subid == subid &&
+ OidIsValid(w->relid))
+ relid_list = lappend_oid(relid_list, w->relid);
+ }
+
+ LWLockRelease(LogicalRepWorkerLock);
+
+ /* Return if there is no table sync worker associated with myself */
+ if (relid_list == NIL)
+ return;
+
+ foreach (cell, relid_list)
+ {
+ Oid relid = lfirst_oid(cell);
+
+ logicalrep_worker_stop(subid, relid);
+ }
+ }
+
+ /*
* Stop the logical replication worker and wait until it detaches from the
! * slot. This function can be called by both logical replication launcher
! * and apply worker to stop apply worker and table sync worker.
*/
void
logicalrep_worker_stop(Oid subid, Oid relid)
***************
*** 570,575 **** logicalrep_worker_attach(int slot)
--- 625,634 ----
static void
logicalrep_worker_detach(void)
{
+ /* Stop all sync workers associated if apply worker */
+ if (!am_tablesync_worker())
+ logicalrep_sync_workers_stop(MyLogicalRepWorker->subid);
+
/* Block concurrent access. */
LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
*** a/src/backend/replication/logical/worker.c
--- b/src/backend/replication/logical/worker.c
***************
*** 1455,1468 **** ApplyWorkerMain(Datum main_arg)
char *myslotname;
WalRcvStreamOptions options;
- /* Attach to slot */
- logicalrep_worker_attach(worker_slot);
-
/* Setup signal handling */
pqsignal(SIGHUP, logicalrep_worker_sighup);
pqsignal(SIGTERM, logicalrep_worker_sigterm);
BackgroundWorkerUnblockSignals();
/* Initialise stats to a sanish value */
MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
--- 1455,1471 ----
char *myslotname;
WalRcvStreamOptions options;
/* Setup signal handling */
pqsignal(SIGHUP, logicalrep_worker_sighup);
pqsignal(SIGTERM, logicalrep_worker_sigterm);
BackgroundWorkerUnblockSignals();
+ /*
+ * Attach to slot. This should be after signal handling setup since
+ * signals may come as soon as attached.
+ */
+ logicalrep_worker_attach(worker_slot);
+
/* Initialise stats to a sanish value */
MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
***************
*** 1492,1498 **** ApplyWorkerMain(Datum main_arg)
ALLOCSET_DEFAULT_SIZES);
StartTransactionCommand();
oldctx = MemoryContextSwitchTo(ApplyContext);
! MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
MySubscriptionValid = true;
MemoryContextSwitchTo(oldctx);
--- 1495,1532 ----
ALLOCSET_DEFAULT_SIZES);
StartTransactionCommand();
oldctx = MemoryContextSwitchTo(ApplyContext);
!
! /*
! * Wait for the catalog is available. The subscription for this worker
! * might be already dropped. We should receive SIGTERM in the case so
! * obey it.
! */
! while (!ConditionalLockRelationOid(SubscriptionRelationId, AccessShareLock))
! {
! pg_usleep(500 * 1000L); /* 0.5s */
!
! /* We are apparently killed, exit silently. */
! if (got_SIGTERM)
! proc_exit(0);
! }
!
! MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
! UnlockRelationOid(SubscriptionRelationId, AccessShareLock);
!
! /* There's a race codition here. Check if MySubscription is valid. */
! if (MySubscription == NULL)
! {
! /* If we got SIGTERM, we are explicitly killed */
! if (got_SIGTERM)
! proc_exit(0);
!
! /* Otherwise something uncertain happned */
! ereport(ERROR,
! (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
! errmsg("subscription for this worker not found: %s",
! MySubscription->name)));
! }
!
MySubscriptionValid = true;
MemoryContextSwitchTo(oldctx);
*** a/src/include/replication/worker_internal.h
--- b/src/include/replication/worker_internal.h
***************
*** 78,83 **** extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
--- 78,84 ----
extern void logicalrep_worker_stop(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
+ extern void logicalrep_sync_workers_stop(Oid subid);
extern int logicalrep_sync_worker_count(Oid subid);
On Mon, May 15, 2017 at 8:02 PM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp> wrote:
Hello,
At Fri, 12 May 2017 17:24:07 +0900, Masahiko Sawada <sawada.mshk@gmail.com> wrote in <CAD21AoDJihMvdiZv7d_bpMPUK1G379WfxWpeanmJVn1KvEGy0Q@mail.gmail.com>
On Fri, May 12, 2017 at 11:24 AM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Thu, May 11, 2017 at 6:16 PM, Petr Jelinek
<petr.jelinek@2ndquadrant.com> wrote:On 11/05/17 10:10, Masahiko Sawada wrote:
On Thu, May 11, 2017 at 4:06 PM, Michael Paquier
<michael.paquier@gmail.com> wrote:On Wed, May 10, 2017 at 11:57 AM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
Barring any objections, I'll add these two issues to open item.
It seems to me that those open items have not been added yet to the
list. If I am following correctly, they could be defined as follows:
- Dropping subscription may stuck if done during tablesync.
-- Analyze deadlock issues with DROP SUBSCRIPTION and apply worker process.I think the solution to this is to reintroduce the LWLock that was
removed and replaced with the exclusive lock on catalog [1]. I am afraid
that correct way of handling this is to do both LWLock and catalog lock
(first LWLock under which we kill the workers and then catalog lock so
that something that prevents launcher from restarting them is held till
the end of transaction).I agree to reintroduce LWLock and to stop logical rep worker first and
then modify catalog. That way we can reduce catalog lock level (maybe
to RowExclusiveLock) so that apply worker can see it. Also I think
that we need to do more things like in order to prevent that we keep
to hold LWLock until end of transaction, because holding LWLock until
end of transaction is not good idea and could be cause of deadlock. So
for example we can commit the transaction in DropSubscription after
cleaned pg_subscription record and all its dependencies and then start
new transaction for the remaining work. Of course we also need to
disallow DROP SUBSCRIPTION being executed in a user transaction
though.Attached two draft patches to solve these issues.
Attached 0001 patch reintroduces LogicalRepLauncherLock and makes DROP
SUBSCRIPTION keep holding it until commit. To prevent from deadlock
possibility, I disallowed DROP SUBSCRIPTION being called in a
transaction block. But there might be more sensible solution for this.
please give me feedback.+ * Protect against launcher restarting the worker. This lock will + * be released at commit.This is wrong. COMMIT doesn't release left-over LWLocks, only
ABORT does (precisely, it seems intended to fire on ERRORs). So
with this patch, the second DROP SUBSCRIPTION is stuck on the
LWLock acquired at the first time. And as Petr said, LWLock with
such a duration seems bad.
Oh I understood. Thank you for pointing out.
The cause seems to be that workers ignore sigterm on certain
conditions. One of the choke points is GetSubscription, the other
is get_subscription_list. I think we can treat the both cases
without LWLocks.The attached patch does that.
- heap_close + UnlockRelationOid in get_subscription_list() is
equivalent to one heap_close or relation_close but I took seeming
symmetricity.- 0.5 seconds for the sleep in ApplyWorkerMain is quite
arbitrary. NAPTIME_PER_CYCLE * 1000 could be used instead.- NULL MySubscription without SIGTERM might not need to be an
ERROR.Any more thoughts?
I think the above changes can solve this issue but It seems to me that
holding AccessExclusiveLock on pg_subscription by DROP SUBSCRIPTION
until commit could lead another deadlock problem in the future. So I'd
to contrive ways to reduce lock level somehow if possible. For
example, if we change the apply launcher so that it gets the
subscription list only when pg_subscription gets invalid, apply
launcher cannot try to launch the apply worker being stopped. We
invalidate pg_subscription at commit of DROP SUBSCRIPTION and the
apply launcher can get new subscription list which doesn't include the
entry we removed. That way we can reduce lock level to
ShareUpdateExclusiveLock and solve this issue.
Also in your patch, we need to change DROP SUBSCRIPTION as well to
resolve another case I encountered, where DROP SUBSCRIPTION waits for
apply worker while holding a tuple lock on pg_subscription_rel and the
apply worker waits for same tuple on pg_subscription_rel in
SetSubscriptionRelState().
FYI, I reproduced the situation by the following steps. This
effectively reproduced the situation without delay insertion for
me.# Creating 5 tables with 100000 rows on the publisher
create table t1 (a int);
...
create table t5 (a int);
insert into t1 (select * from generate_series(0, 99999) a);
...
insert into t5 (select * from generate_series(0, 99999) a);
create publication p1 for table t1, t2, t3, t4, t5;# Subscribe them, wait 1sec, then unsbscribe.
create table t1 (a int);
...
create table t5 (a int);
truncate t1, t2, t3, t4, t5; create subscription s1 CONNECTION 'host=/tmp port=5432 dbname=postgres' publication p1; select pg_sleep(1); drop subscription s1;Repeated test can be performed by repeatedly enter the last line.
-- Avoid orphaned tablesync worker if apply worker exits before
changing its status.The behavior question I have about this is if sync workers should die
when apply worker dies (ie they are tied to apply worker) or if they
should be tied to the subscription.I guess taking down all the sync workers when apply worker has exited is
easier to solve. Of course it means that if apply worker restarts in
middle of table synchronization, the table synchronization will have to
start from scratch. That being said, in normal operation apply worker
should only exit/restart if subscription has changed or has been
dropped/disabled and I think sync workers want to exit/restart in that
situation as well.I agree that sync workers are tied to the apply worker.
So for example having shmem detach hook for an apply worker (or reusing
the existing one) that searches for all the other workers for same
subscription and shuts them down as well sounds like solution to this.Seems reasonable solution.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Wed, May 17, 2017 at 6:58 AM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
I think the above changes can solve this issue but It seems to me that
holding AccessExclusiveLock on pg_subscription by DROP SUBSCRIPTION
until commit could lead another deadlock problem in the future. So I'd
to contrive ways to reduce lock level somehow if possible. For
example, if we change the apply launcher so that it gets the
subscription list only when pg_subscription gets invalid, apply
launcher cannot try to launch the apply worker being stopped. We
invalidate pg_subscription at commit of DROP SUBSCRIPTION and the
apply launcher can get new subscription list which doesn't include the
entry we removed. That way we can reduce lock level to
ShareUpdateExclusiveLock and solve this issue.
Also in your patch, we need to change DROP SUBSCRIPTION as well to
resolve another case I encountered, where DROP SUBSCRIPTION waits for
apply worker while holding a tuple lock on pg_subscription_rel and the
apply worker waits for same tuple on pg_subscription_rel in
SetSubscriptionRelState().
I don't really understand the issue being discussed here in any
detail, but as a general point I'd say that it might be more
productive to make the locks finer-grained rather than struggling to
reduce the lock level. For example, instead of locking all of
pg_subscription, use LockSharedObject() to lock the individual
subscription, still with AccessExclusiveLock. That means that other
accesses to that subscription also need to take a lock so that you
actually get a conflict when there should be one, but that should be
doable. I expect that trying to manage locking conflicts using only
catalog-wide locks is a doomed strategy.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Mon, May 15, 2017 at 03:28:14AM +0000, Noah Misch wrote:
On Mon, May 08, 2017 at 06:27:30PM +0900, Masahiko Sawada wrote:
I encountered a situation where DROP SUBSCRIPTION got stuck when
initial table sync is in progress. In my environment, I created
several tables with some data on publisher. I created subscription on
subscriber and drop subscription immediately after that. It doesn't
always happen but I often encountered it on my environment.ps -x command shows the following.
96796 ? Ss 0:00 postgres: masahiko postgres [local] DROP
SUBSCRIPTION
96801 ? Ts 0:00 postgres: bgworker: logical replication
worker for subscription 40993 waiting
96805 ? Ss 0:07 postgres: bgworker: logical replication
worker for subscription 40993 sync 16418
96806 ? Ss 0:01 postgres: wal sender process masahiko [local] idle
96807 ? Ss 0:00 postgres: bgworker: logical replication
worker for subscription 40993 sync 16421
96808 ? Ss 0:00 postgres: wal sender process masahiko [local] idleThe DROP SUBSCRIPTION process (pid 96796) is waiting for the apply
worker process (pid 96801) to stop while holding a lock on
pg_subscription_rel. On the other hand the apply worker is waiting for
acquiring a tuple lock on pg_subscription_rel needed for heap_update.
Also table sync workers (pid 96805 and 96807) are waiting for the
apply worker process to change their status.Also, even when DROP SUBSCRIPTION is done successfully, the table sync
worker can be orphaned because I guess that the apply worker can exit
before change status of table sync worker.I'm using 1f30295.
[Action required within three days. This is a generic notification.]
The above-described topic is currently a PostgreSQL 10 open item. Peter,
since you committed the patch believed to have created it, you own this open
item. If some other commit is more relevant or if this does not belong as a
v10 open item, please let us know. Otherwise, please observe the policy on
open item ownership[1] and send a status update within three calendar days of
this message. Include a date for your subsequent status update. Testers may
discover new open items at any time, and I want to plan to get them all fixed
well in advance of shipping v10. Consequently, I will appreciate your efforts
toward speedy resolution. Thanks.[1] /messages/by-id/20170404140717.GA2675809@tornado.leadboat.com
IMMEDIATE ATTENTION REQUIRED. This PostgreSQL 10 open item is past due for
your status update. Please reacquaint yourself with the policy on open item
ownership[1]/messages/by-id/20170404140717.GA2675809@tornado.leadboat.com and then reply immediately. If I do not hear from you by
2017-05-19 16:00 UTC, I will transfer this item to release management team
ownership without further notice.
[1]: /messages/by-id/20170404140717.GA2675809@tornado.leadboat.com
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 5/18/17 11:11, Noah Misch wrote:
IMMEDIATE ATTENTION REQUIRED. This PostgreSQL 10 open item is past due for
your status update. Please reacquaint yourself with the policy on open item
ownership[1] and then reply immediately. If I do not hear from you by
2017-05-19 16:00 UTC, I will transfer this item to release management team
ownership without further notice.
There is no progress on this issue at the moment. I will report again
next Wednesday.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Hello,
At Thu, 18 May 2017 10:16:35 -0400, Robert Haas <robertmhaas@gmail.com> wrote in <CA+TgmobJk9QWkHp98pxWk8rMe-EC8BVdE6F9zPH6Yt1dbAGYBg@mail.gmail.com>
On Wed, May 17, 2017 at 6:58 AM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
I think the above changes can solve this issue but It seems to me that
holding AccessExclusiveLock on pg_subscription by DROP SUBSCRIPTION
until commit could lead another deadlock problem in the future. So I'd
to contrive ways to reduce lock level somehow if possible. For
example, if we change the apply launcher so that it gets the
subscription list only when pg_subscription gets invalid, apply
launcher cannot try to launch the apply worker being stopped. We
invalidate pg_subscription at commit of DROP SUBSCRIPTION and the
apply launcher can get new subscription list which doesn't include the
entry we removed. That way we can reduce lock level to
ShareUpdateExclusiveLock and solve this issue.
Also in your patch, we need to change DROP SUBSCRIPTION as well to
resolve another case I encountered, where DROP SUBSCRIPTION waits for
apply worker while holding a tuple lock on pg_subscription_rel and the
apply worker waits for same tuple on pg_subscription_rel in
SetSubscriptionRelState().
Sorry, I don't have enough time to consider this
profoundly. Perhaps will return later.
I don't really understand the issue being discussed here in any
detail, but as a general point I'd say that it might be more
productive to make the locks finer-grained rather than struggling to
reduce the lock level. For example, instead of locking all of
pg_subscription, use LockSharedObject() to lock the individual
subscription, still with AccessExclusiveLock. That means that other
accesses to that subscription also need to take a lock so that you
actually get a conflict when there should be one, but that should be
doable. I expect that trying to manage locking conflicts using only
catalog-wide locks is a doomed strategy.
Thank you for the suggestion. I think it is a bit differnt from
that. The problem here is that a replication worker may try
reading exactly the tuple for the subscription being deleted just
before responding to a received termination request. So the
finer-graind lock doesn't help.
The focus of resolving this is preventing blocking of workers
caused by DROP SUBSCRIPTION. So Sadasan's patch immediately
released the lock on pg_subscrption and uses another lock for
exclusion. My patch just give up to read the catalog when not
available.
regards,
--
Kyotaro Horiguchi
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 18/05/17 16:16, Robert Haas wrote:
On Wed, May 17, 2017 at 6:58 AM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
I think the above changes can solve this issue but It seems to me that
holding AccessExclusiveLock on pg_subscription by DROP SUBSCRIPTION
until commit could lead another deadlock problem in the future. So I'd
to contrive ways to reduce lock level somehow if possible. For
example, if we change the apply launcher so that it gets the
subscription list only when pg_subscription gets invalid, apply
launcher cannot try to launch the apply worker being stopped. We
invalidate pg_subscription at commit of DROP SUBSCRIPTION and the
apply launcher can get new subscription list which doesn't include the
entry we removed. That way we can reduce lock level to
ShareUpdateExclusiveLock and solve this issue.
Also in your patch, we need to change DROP SUBSCRIPTION as well to
resolve another case I encountered, where DROP SUBSCRIPTION waits for
apply worker while holding a tuple lock on pg_subscription_rel and the
apply worker waits for same tuple on pg_subscription_rel in
SetSubscriptionRelState().I don't really understand the issue being discussed here in any
detail, but as a general point I'd say that it might be more
productive to make the locks finer-grained rather than struggling to
reduce the lock level. For example, instead of locking all of
pg_subscription, use LockSharedObject() to lock the individual
subscription, still with AccessExclusiveLock. That means that other
accesses to that subscription also need to take a lock so that you
actually get a conflict when there should be one, but that should be
doable. I expect that trying to manage locking conflicts using only
catalog-wide locks is a doomed strategy.
We do LockSharedObject() but it's rather useless the way it's done now
as no other access locks it. We can't block all other accesses however,
the workers need to be able to access the catalog during clean shutdown
in some situations. What we need is to block starting of new workers for
that subscription so only those code paths would need to block. So I
think we might want to do both finer-grained locking and decreasing lock
level.
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Hi,
I finally had time to properly analyze this, and turns out we've been
all just trying to fix symptoms and the actual problems.
All the locking works just fine the way it is in master. The issue with
deadlock with apply comes from the wrong handling of the SIGTERM in the
apply (we didn't set InterruptPending). I changed the SIGTERM handler in
patch 0001 just to die which is actually the correct behavior for apply
workers. I also moved the connection cleanup code to the
before_shmem_exit callback (similar to walreceiver) and now that part
works correctly.
The issue with orphaned sync workers is actually two separate issues.
First, due to thinko we always searched for sync worker in
wait_for_sync_status_change instead of searching for opposite worker as
was the intention (i.e. sync worker should search for apply and apply
should search for sync). Thats fixed by 0002. And second, we didn't
accept any invalidation messages until the whole sync process finished
(because it flattens all the remote transactions in the single one) so
sync worker didn't learn about subscription changes/drop until it has
finished, which I now fixed in 0003.
There is still outstanding issue that sync worker will keep running
inside the long COPY because the invalidation messages are also not
processed until it finishes but all the original issues reported here
disappear for me with the attached patches applied.
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Attachments:
0003-Receive-invalidation-messages-correctly-in-tablesync.patchbinary/octet-stream; name=0003-Receive-invalidation-messages-correctly-in-tablesync.patchDownload
From dcfb9952ca3d26fe7aaf3cadaa81f2fa1d60c9d7 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Wed, 24 May 2017 21:07:15 +0200
Subject: [PATCH 3/3] Receive invalidation messages correctly in tablesync
worker
---
src/backend/replication/logical/worker.c | 23 +++++++++++++++--------
1 file changed, 15 insertions(+), 8 deletions(-)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 971f76b..87e8470 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -118,7 +118,7 @@ static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
static void store_flush_position(XLogRecPtr remote_lsn);
-static void reread_subscription(void);
+static void maybe_reread_subscription(void);
/* Flags set by signal handlers */
static volatile sig_atomic_t got_SIGHUP = false;
@@ -165,8 +165,7 @@ ensure_transaction(void)
StartTransactionCommand();
- if (!MySubscriptionValid)
- reread_subscription();
+ maybe_reread_subscription();
MemoryContextSwitchTo(ApplyMessageContext);
return true;
@@ -463,6 +462,12 @@ apply_handle_commit(StringInfo s)
store_flush_position(commit_data.end_lsn);
}
+ else
+ {
+ /* Process any invalidation messages that might have accumulated. */
+ AcceptInvalidationMessages();
+ maybe_reread_subscription();
+ }
in_remote_transaction = false;
@@ -1119,8 +1124,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
* now.
*/
AcceptInvalidationMessages();
- if (!MySubscriptionValid)
- reread_subscription();
+ maybe_reread_subscription();
/* Process any table synchronization changes. */
process_syncing_tables(last_received);
@@ -1302,17 +1306,20 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
last_flushpos = flushpos;
}
-
/*
- * Reread subscription info and exit on change.
+ * Reread subscription info if needed. Most changes will be exit.
*/
static void
-reread_subscription(void)
+maybe_reread_subscription(void)
{
MemoryContext oldctx;
Subscription *newsub;
bool started_tx = false;
+ /* When cache state is valid there is nothing to do here. */
+ if (MySubscriptionValid)
+ return;
+
/* This function might be called inside or outside of transaction. */
if (!IsTransactionState())
{
--
2.7.4
0002-Make-tablesync-worker-exit-when-apply-dies-while-it-.patchbinary/octet-stream; name=0002-Make-tablesync-worker-exit-when-apply-dies-while-it-.patchDownload
From e0a7a2a8ae3479cad609ea4a60fca49910fe0ed0 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Wed, 24 May 2017 20:37:12 +0200
Subject: [PATCH 2/3] Make tablesync worker exit when apply dies while it was
waiting for it
---
src/backend/replication/logical/tablesync.c | 21 ++++++++++++++++++---
1 file changed, 18 insertions(+), 3 deletions(-)
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index b92cc85..fe94d45 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -144,7 +144,12 @@ finish_sync_worker(void)
/*
* Wait until the table synchronization change.
*
- * Returns false if the relation subscription state disappeared.
+ * If called from apply worker, it will wait for the synchronization worker
+ * to change table state in shmem, and when called from synchronization
+ * worker it will wait for apply worker to change table state in shmem.
+ *
+ * Returns false if the opposite worker has disappeared or table state has
+ * been reset.
*/
static bool
wait_for_sync_status_change(Oid relid, char origstate)
@@ -159,14 +164,24 @@ wait_for_sync_status_change(Oid relid, char origstate)
CHECK_FOR_INTERRUPTS();
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ /* Check if the opposite worker is still running and bail if not. */
worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
- relid, false);
+ IsTablesyncWorker() ? InvalidOid : relid,
+ false);
if (!worker)
{
LWLockRelease(LogicalRepWorkerLock);
return false;
}
+
+ /* From now on worker is expected to be synchronization worker. */
+ if (IsTablesyncWorker())
+ worker = MyLogicalRepWorker;
+
+ Assert(worker->relid == relid);
state = worker->relstate;
+
LWLockRelease(LogicalRepWorkerLock);
if (state == SUBREL_STATE_UNKNOWN)
@@ -177,7 +192,7 @@ wait_for_sync_status_change(Oid relid, char origstate)
rc = WaitLatch(&MyProc->procLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
- 10000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
+ 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
/* emergency bailout if postmaster has died */
if (rc & WL_POSTMASTER_DEATH)
--
2.7.4
0001-Fix-signal-handling-in-logical-workers.patchbinary/octet-stream; name=0001-Fix-signal-handling-in-logical-workers.patchDownload
From 55367abdf4ce81e022211cb94e8d036e916069c9 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Wed, 24 May 2017 02:27:38 +0200
Subject: [PATCH 1/3] Fix signal handling in logical workers
---
src/backend/replication/logical/launcher.c | 22 +++++++-----
src/backend/replication/logical/tablesync.c | 23 +++++++-----
src/backend/replication/logical/worker.c | 54 ++++++++++++++++++++---------
src/backend/tcop/postgres.c | 5 +++
src/include/replication/logicalworker.h | 3 ++
src/include/replication/worker_internal.h | 10 ------
6 files changed, 74 insertions(+), 43 deletions(-)
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 4e2c350..2e80b4c 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -80,8 +80,8 @@ static void logicalrep_worker_detach(void);
static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
/* Flags set by signal handlers */
-volatile sig_atomic_t got_SIGHUP = false;
-volatile sig_atomic_t got_SIGTERM = false;
+static volatile sig_atomic_t got_SIGHUP = false;
+static volatile sig_atomic_t got_SIGTERM = false;
static bool on_commit_launcher_wakeup = false;
@@ -614,12 +614,18 @@ logicalrep_launcher_onexit(int code, Datum arg)
static void
logicalrep_worker_onexit(int code, Datum arg)
{
+ /* Disconnect gracefully from the remote side. */
+ if (wrconn)
+ walrcv_disconnect(wrconn);
+
logicalrep_worker_detach();
+
+ ApplyLauncherWakeup();
}
/* SIGTERM: set flag to exit at next convenient time */
-void
-logicalrep_worker_sigterm(SIGNAL_ARGS)
+static void
+logicalrep_launcher_sigterm(SIGNAL_ARGS)
{
int save_errno = errno;
@@ -632,8 +638,8 @@ logicalrep_worker_sigterm(SIGNAL_ARGS)
}
/* SIGHUP: set flag to reload configuration at next convenient time */
-void
-logicalrep_worker_sighup(SIGNAL_ARGS)
+static void
+logicalrep_launcher_sighup(SIGNAL_ARGS)
{
int save_errno = errno;
@@ -793,8 +799,8 @@ ApplyLauncherMain(Datum main_arg)
before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
/* Establish signal handlers. */
- pqsignal(SIGHUP, logicalrep_worker_sighup);
- pqsignal(SIGTERM, logicalrep_worker_sigterm);
+ pqsignal(SIGHUP, logicalrep_launcher_sighup);
+ pqsignal(SIGTERM, logicalrep_launcher_sigterm);
BackgroundWorkerUnblockSignals();
/* Make it easy to identify our processes. */
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 1e3753b..b92cc85 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -97,6 +97,7 @@
#include "replication/logicallauncher.h"
#include "replication/logicalrelation.h"
+#include "replication/logicalworker.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
@@ -137,7 +138,6 @@ finish_sync_worker(void)
(errmsg("logical replication synchronization worker finished processing")));
/* Stop gracefully */
- walrcv_disconnect(wrconn);
proc_exit(0);
}
@@ -152,10 +152,12 @@ wait_for_sync_status_change(Oid relid, char origstate)
int rc;
char state = origstate;
- while (!got_SIGTERM)
+ for (;;)
{
LogicalRepWorker *worker;
+ CHECK_FOR_INTERRUPTS();
+
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
relid, false);
@@ -476,7 +478,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
void
process_syncing_tables(XLogRecPtr current_lsn)
{
- if (am_tablesync_worker())
+ if (IsTablesyncWorker())
process_syncing_tables_for_sync(current_lsn);
else
process_syncing_tables_for_apply(current_lsn);
@@ -533,7 +535,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
bytesread += avail;
}
- while (!got_SIGTERM && maxread > 0 && bytesread < minread)
+ while (maxread > 0 && bytesread < minread)
{
pgsocket fd = PGINVALID_SOCKET;
int rc;
@@ -587,10 +589,6 @@ copy_read_data(void *outbuf, int minread, int maxread)
ResetLatch(&MyProc->procLatch);
}
- /* Check for exit condition. */
- if (got_SIGTERM)
- proc_exit(0);
-
return bytesread;
}
@@ -910,3 +908,12 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
return slotname;
}
+
+/*
+ * Is current process a tablesync worker?
+ */
+bool
+IsTablesyncWorker(void)
+{
+ return MyLogicalRepWorker != NULL && OidIsValid(MyLogicalRepWorker->relid);
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7d1787d..971f76b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -72,6 +72,8 @@
#include "storage/proc.h"
#include "storage/procarray.h"
+#include "tcop/tcopprot.h"
+
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/datum.h"
@@ -118,6 +120,9 @@ static void store_flush_position(XLogRecPtr remote_lsn);
static void reread_subscription(void);
+/* Flags set by signal handlers */
+static volatile sig_atomic_t got_SIGHUP = false;
+
/*
* Should this worker apply changes for given relation.
*
@@ -134,7 +139,7 @@ static void reread_subscription(void);
static bool
should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
{
- if (am_tablesync_worker())
+ if (IsTablesyncWorker())
return MyLogicalRepWorker->relid == rel->localreloid;
else
return (rel->state == SUBREL_STATE_READY ||
@@ -444,7 +449,7 @@ apply_handle_commit(StringInfo s)
Assert(commit_data.commit_lsn == remote_final_lsn);
/* The synchronization worker runs in single transaction. */
- if (IsTransactionState() && !am_tablesync_worker())
+ if (IsTransactionState() && !IsTablesyncWorker())
{
/*
* Update origin state so we can restart streaming from correct
@@ -480,7 +485,7 @@ apply_handle_origin(StringInfo s)
* actual writes.
*/
if (!in_remote_transaction ||
- (IsTransactionState() && !am_tablesync_worker()))
+ (IsTransactionState() && !IsTablesyncWorker()))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("ORIGIN message sent out of order")));
@@ -1005,7 +1010,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
/* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL);
- while (!got_SIGTERM)
+ for (;;)
{
pgsocket fd = PGINVALID_SOCKET;
int rc;
@@ -1015,6 +1020,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
TimestampTz last_recv_timestamp = GetCurrentTimestamp();
bool ping_sent = false;
+ CHECK_FOR_INTERRUPTS();
+
MemoryContextSwitchTo(ApplyMessageContext);
len = walrcv_receive(wrconn, &buf, &fd);
@@ -1129,7 +1136,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
TimeLineID tli;
walrcv_endstreaming(wrconn, &tli);
- break;
+ proc_exit(0);
}
/*
@@ -1329,7 +1336,6 @@ reread_subscription(void)
"stop because the subscription was removed",
MySubscription->name)));
- walrcv_disconnect(wrconn);
proc_exit(0);
}
@@ -1344,7 +1350,6 @@ reread_subscription(void)
"stop because the subscription was disabled",
MySubscription->name)));
- walrcv_disconnect(wrconn);
proc_exit(0);
}
@@ -1359,7 +1364,6 @@ reread_subscription(void)
"restart because the connection information was changed",
MySubscription->name)));
- walrcv_disconnect(wrconn);
proc_exit(0);
}
@@ -1374,7 +1378,6 @@ reread_subscription(void)
"restart because subscription was renamed",
MySubscription->name)));
- walrcv_disconnect(wrconn);
proc_exit(0);
}
@@ -1392,7 +1395,6 @@ reread_subscription(void)
"restart because the replication slot name was changed",
MySubscription->name)));
- walrcv_disconnect(wrconn);
proc_exit(0);
}
@@ -1407,7 +1409,6 @@ reread_subscription(void)
"restart because subscription's publications were changed",
MySubscription->name)));
- walrcv_disconnect(wrconn);
proc_exit(0);
}
@@ -1443,6 +1444,19 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
MySubscriptionValid = false;
}
+/* SIGHUP: set flag to reload configuration at next convenient time */
+static void
+logicalrep_worker_sighup(SIGNAL_ARGS)
+{
+ int save_errno = errno;
+
+ got_SIGHUP = true;
+
+ /* Waken anything waiting on the process latch */
+ SetLatch(MyLatch);
+
+ errno = save_errno;
+}
/* Logical Replication Apply worker entry point */
void
@@ -1460,7 +1474,7 @@ ApplyWorkerMain(Datum main_arg)
/* Setup signal handling */
pqsignal(SIGHUP, logicalrep_worker_sighup);
- pqsignal(SIGTERM, logicalrep_worker_sigterm);
+ pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
/* Initialise stats to a sanish value */
@@ -1515,7 +1529,7 @@ ApplyWorkerMain(Datum main_arg)
subscription_change_cb,
(Datum) 0);
- if (am_tablesync_worker())
+ if (IsTablesyncWorker())
elog(LOG, "logical replication sync for subscription %s, table %s started",
MySubscription->name, get_rel_name(MyLogicalRepWorker->relid));
else
@@ -1528,7 +1542,7 @@ ApplyWorkerMain(Datum main_arg)
elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
MySubscription->conninfo);
- if (am_tablesync_worker())
+ if (IsTablesyncWorker())
{
char *syncslotname;
@@ -1608,8 +1622,14 @@ ApplyWorkerMain(Datum main_arg)
/* Run the main loop. */
LogicalRepApplyLoop(origin_startpos);
- walrcv_disconnect(wrconn);
+ /* Not reached. */
+}
- /* We should only get here if we received SIGTERM */
- proc_exit(0);
+/*
+ * Is current process a apply worker?
+ */
+bool
+IsApplyWorker(void)
+{
+ return MyLogicalRepWorker != NULL;
}
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 75c2d9a..1b1134c8 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -55,6 +55,7 @@
#include "pg_getopt.h"
#include "postmaster/autovacuum.h"
#include "postmaster/postmaster.h"
+#include "replication/logicalworker.h"
#include "replication/slot.h"
#include "replication/walsender.h"
#include "rewrite/rewriteHandler.h"
@@ -2845,6 +2846,10 @@ ProcessInterrupts(void)
ereport(FATAL,
(errcode(ERRCODE_ADMIN_SHUTDOWN),
errmsg("terminating autovacuum process due to administrator command")));
+ else if (IsApplyWorker())
+ ereport(FATAL,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("terminating logical replication worker due to administrator command")));
else if (RecoveryConflictPending && RecoveryConflictRetryable)
{
pgstat_report_recovery_conflict(RecoveryConflictReason);
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 3e0affa..6c71343 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -14,4 +14,7 @@
extern void ApplyWorkerMain(Datum main_arg);
+extern bool IsApplyWorker(void);
+extern bool IsTablesyncWorker(void);
+
#endif /* LOGICALWORKER_H */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 0654461..c310ca5 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -67,8 +67,6 @@ extern Subscription *MySubscription;
extern LogicalRepWorker *MyLogicalRepWorker;
extern bool in_remote_transaction;
-extern volatile sig_atomic_t got_SIGHUP;
-extern volatile sig_atomic_t got_SIGTERM;
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
@@ -81,17 +79,9 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
extern int logicalrep_sync_worker_count(Oid subid);
-extern void logicalrep_worker_sighup(SIGNAL_ARGS);
-extern void logicalrep_worker_sigterm(SIGNAL_ARGS);
extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
void process_syncing_tables(XLogRecPtr current_lsn);
void invalidate_syncing_table_states(Datum arg, int cacheid,
uint32 hashvalue);
-static inline bool
-am_tablesync_worker(void)
-{
- return OidIsValid(MyLogicalRepWorker->relid);
-}
-
#endif /* WORKER_INTERNAL_H */
--
2.7.4
On Wed, May 24, 2017 at 3:14 PM, Petr Jelinek
<petr.jelinek@2ndquadrant.com> wrote:
Hi,
I finally had time to properly analyze this, and turns out we've been
all just trying to fix symptoms and the actual problems.
Thank you for the patches!
All the locking works just fine the way it is in master.
The issue with
deadlock with apply comes from the wrong handling of the SIGTERM in the
apply (we didn't set InterruptPending). I changed the SIGTERM handler in
patch 0001 just to die which is actually the correct behavior for apply
workers. I also moved the connection cleanup code to the
before_shmem_exit callback (similar to walreceiver) and now that part
works correctly.The issue with orphaned sync workers is actually two separate issues.
First, due to thinko we always searched for sync worker in
wait_for_sync_status_change instead of searching for opposite worker as
was the intention (i.e. sync worker should search for apply and apply
should search for sync). Thats fixed by 0002. And second, we didn't
accept any invalidation messages until the whole sync process finished
(because it flattens all the remote transactions in the single one) so
sync worker didn't learn about subscription changes/drop until it has
finished, which I now fixed in 0003.There is still outstanding issue that sync worker will keep running
inside the long COPY because the invalidation messages are also not
processed until it finishes but all the original issues reported here
disappear for me with the attached patches applied.
The issue reported on this thread seems to be solved with your patch.
But because DROP SUBSCRIPTION is only one DDL command that acquires
lock on system catalog with AccessExclusiveLock and the logical
replication mechanism is complex, I'm concerned that there might be
another potential deadlock issue due to acquire lock on system catalog
with strong lock level. It might be good idea to do either
fine-grained locking or reducing lock level or both.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 05/25/2017 12:44 AM, Petr Jelinek wrote:
There is still outstanding issue that sync worker will keep running
inside the long COPY because the invalidation messages are also not
processed until it finishes but all the original issues reported here
disappear for me with the attached patches applied.
After applying all your patches, drop subscription no more hangs while
dropping subscription but there is an error "ERROR: tuple
concurrently updated" in the log file of
standby.
---
logical replication synchronization worker finished processing
2017-05-25 09:15:52.654 BST [18575] LOG: logical replication
synchronization worker finished processing
2017-05-25 09:15:52.656 BST [18563] LOG: starting logical replication
worker for subscription "sub"
2017-05-25 09:15:52.662 BST [18577] LOG: logical replication sync for
subscription sub, table t14 started
2017-05-25 09:15:53.657 BST [18563] LOG: starting logical replication
worker for subscription "sub"
2017-05-25 09:15:53.663 BST [18579] LOG: logical replication sync for
subscription sub, table t15 started
2017-05-25 09:15:53.724 BST [18563] FATAL: terminating logical
replication worker due to administrator command
2017-05-25 09:15:53.725 BST [18521] LOG: worker process: logical
replication worker for subscription 16684 (PID 18563) exited with exit
code 1
2017-05-25 09:15:54.734 BST [18579] ERROR: tuple concurrently updated
2017-05-25 09:15:54.735 BST [18577] ERROR: tuple concurrently updated
2017-05-25 09:15:54.736 BST [18521] LOG: worker process: logical
replication worker for subscription 16684 sync 16426 (PID 18579) exited
with exit code 1
2017-05-25 09:15:54.736 BST [18521] LOG: worker process: logical
replication worker for subscription 16684 sync 16423 (PID 18577) exited
with exit code 1
~
~
~
Steps to reproduce -
X cluster -> create 100 tables , publish all tables (create publication
pub for all tables);
Y Cluster -> create 100 tables ,create subscription(create subscription
sub connection 'user=centos host=localhost' publication pub;
Y cluster ->drop subscription - drop subscription sub;
check the log file on Y cluster.
Sometime , i have seen this error on psql prompt and drop subscription
operation got failed at first attempt.
postgres=# drop subscription sub;
ERROR: tuple concurrently updated
postgres=# drop subscription sub;
NOTICE: dropped replication slot "sub" on publisher
DROP SUBSCRIPTION
--
regards,tushar
EnterpriseDB https://www.enterprisedb.com/
The Enterprise PostgreSQL Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Thu, May 18, 2017 at 10:27:51PM -0400, Peter Eisentraut wrote:
On 5/18/17 11:11, Noah Misch wrote:
IMMEDIATE ATTENTION REQUIRED. This PostgreSQL 10 open item is past due for
your status update. Please reacquaint yourself with the policy on open item
ownership[1] and then reply immediately. If I do not hear from you by
2017-05-19 16:00 UTC, I will transfer this item to release management team
ownership without further notice.There is no progress on this issue at the moment. I will report again
next Wednesday.
IMMEDIATE ATTENTION REQUIRED. This PostgreSQL 10 open item is long past due
for your status update. If I do not hear one from you by 2017-05-31 02:00
UTC, I will transfer this item to release management team ownership without
further notice.
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Tue, May 30, 2017 at 01:30:35AM +0000, Noah Misch wrote:
On Thu, May 18, 2017 at 10:27:51PM -0400, Peter Eisentraut wrote:
On 5/18/17 11:11, Noah Misch wrote:
IMMEDIATE ATTENTION REQUIRED. This PostgreSQL 10 open item is past due for
your status update. Please reacquaint yourself with the policy on open item
ownership[1] and then reply immediately. If I do not hear from you by
2017-05-19 16:00 UTC, I will transfer this item to release management team
ownership without further notice.There is no progress on this issue at the moment. I will report again
next Wednesday.IMMEDIATE ATTENTION REQUIRED. This PostgreSQL 10 open item is long past due
for your status update. If I do not hear one from you by 2017-05-31 02:00
UTC, I will transfer this item to release management team ownership without
further notice.
This PostgreSQL 10 open item now needs a permanent owner. Would any other
committer like to take ownership? If this role interests you, please read
this thread and the policy linked above, then send an initial status update
bearing a date for your subsequent status update. If the item does not have a
permanent owner by 2017-06-03 07:00 UTC, I will investigate feature removals
that would resolve the item.
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Thu, May 25, 2017 at 4:14 AM, Petr Jelinek
<petr.jelinek@2ndquadrant.com> wrote:
Hi,
I finally had time to properly analyze this, and turns out we've been
all just trying to fix symptoms and the actual problems.All the locking works just fine the way it is in master. The issue with
deadlock with apply comes from the wrong handling of the SIGTERM in the
apply (we didn't set InterruptPending). I changed the SIGTERM handler in
patch 0001 just to die which is actually the correct behavior for apply
workers. I also moved the connection cleanup code to the
before_shmem_exit callback (similar to walreceiver) and now that part
works correctly.The issue with orphaned sync workers is actually two separate issues.
First, due to thinko we always searched for sync worker in
wait_for_sync_status_change instead of searching for opposite worker as
was the intention (i.e. sync worker should search for apply and apply
should search for sync). Thats fixed by 0002. And second, we didn't
accept any invalidation messages until the whole sync process finished
(because it flattens all the remote transactions in the single one) so
sync worker didn't learn about subscription changes/drop until it has
finished, which I now fixed in 0003.There is still outstanding issue that sync worker will keep running
inside the long COPY because the invalidation messages are also not
processed until it finishes but all the original issues reported here
disappear for me with the attached patches applied.
These patches conflict with current HEAD, I attached updated version patches.
Also, the issue that sync worker will keep running inside the long
COPY can lead the another problem that the user could not create new
subscription with some workers due to not enough free logical
replication worker slots until the long COPY finishes. Attached 0004
patch is the updated version patch I submitted before.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Attachments:
0001-Fix-signal-handling-in-logical-workers.patchapplication/octet-stream; name=0001-Fix-signal-handling-in-logical-workers.patchDownload
From 17fe2c62444916fb3ba4417701d4c25b625d5622 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Thu, 1 Jun 2017 11:56:16 +0900
Subject: [PATCH 1/4] Fix signal handling in logical workers
---
src/backend/replication/logical/launcher.c | 22 +++++++-----
src/backend/replication/logical/tablesync.c | 23 +++++++-----
src/backend/replication/logical/worker.c | 54 ++++++++++++++++++++---------
src/backend/tcop/postgres.c | 5 +++
src/include/replication/logicalworker.h | 3 ++
src/include/replication/worker_internal.h | 10 ------
6 files changed, 74 insertions(+), 43 deletions(-)
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index b956052..345a415 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -80,8 +80,8 @@ static void logicalrep_worker_detach(void);
static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
/* Flags set by signal handlers */
-volatile sig_atomic_t got_SIGHUP = false;
-volatile sig_atomic_t got_SIGTERM = false;
+static volatile sig_atomic_t got_SIGHUP = false;
+static volatile sig_atomic_t got_SIGTERM = false;
static bool on_commit_launcher_wakeup = false;
@@ -614,12 +614,18 @@ logicalrep_launcher_onexit(int code, Datum arg)
static void
logicalrep_worker_onexit(int code, Datum arg)
{
+ /* Disconnect gracefully from the remote side. */
+ if (wrconn)
+ walrcv_disconnect(wrconn);
+
logicalrep_worker_detach();
+
+ ApplyLauncherWakeup();
}
/* SIGTERM: set flag to exit at next convenient time */
-void
-logicalrep_worker_sigterm(SIGNAL_ARGS)
+static void
+logicalrep_launcher_sigterm(SIGNAL_ARGS)
{
int save_errno = errno;
@@ -632,8 +638,8 @@ logicalrep_worker_sigterm(SIGNAL_ARGS)
}
/* SIGHUP: set flag to reload configuration at next convenient time */
-void
-logicalrep_worker_sighup(SIGNAL_ARGS)
+static void
+logicalrep_launcher_sighup(SIGNAL_ARGS)
{
int save_errno = errno;
@@ -793,8 +799,8 @@ ApplyLauncherMain(Datum main_arg)
before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
/* Establish signal handlers. */
- pqsignal(SIGHUP, logicalrep_worker_sighup);
- pqsignal(SIGTERM, logicalrep_worker_sigterm);
+ pqsignal(SIGHUP, logicalrep_launcher_sighup);
+ pqsignal(SIGTERM, logicalrep_launcher_sigterm);
BackgroundWorkerUnblockSignals();
/* Make it easy to identify our processes. */
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index fe45fb8..e61237f 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -97,6 +97,7 @@
#include "replication/logicallauncher.h"
#include "replication/logicalrelation.h"
+#include "replication/logicalworker.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
@@ -140,7 +141,6 @@ finish_sync_worker(void)
CommitTransactionCommand();
/* Stop gracefully */
- walrcv_disconnect(wrconn);
proc_exit(0);
}
@@ -155,10 +155,12 @@ wait_for_sync_status_change(Oid relid, char origstate)
int rc;
char state = origstate;
- while (!got_SIGTERM)
+ for (;;)
{
LogicalRepWorker *worker;
+ CHECK_FOR_INTERRUPTS();
+
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
relid, false);
@@ -479,7 +481,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
void
process_syncing_tables(XLogRecPtr current_lsn)
{
- if (am_tablesync_worker())
+ if (IsTablesyncWorker())
process_syncing_tables_for_sync(current_lsn);
else
process_syncing_tables_for_apply(current_lsn);
@@ -526,7 +528,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
bytesread += avail;
}
- while (!got_SIGTERM && maxread > 0 && bytesread < minread)
+ while (maxread > 0 && bytesread < minread)
{
pgsocket fd = PGINVALID_SOCKET;
int rc;
@@ -580,10 +582,6 @@ copy_read_data(void *outbuf, int minread, int maxread)
ResetLatch(&MyProc->procLatch);
}
- /* Check for exit condition. */
- if (got_SIGTERM)
- proc_exit(0);
-
return bytesread;
}
@@ -903,3 +901,12 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
return slotname;
}
+
+/*
+ * Is current process a tablesync worker?
+ */
+bool
+IsTablesyncWorker(void)
+{
+ return MyLogicalRepWorker != NULL && OidIsValid(MyLogicalRepWorker->relid);
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c67720b..889e5f3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -72,6 +72,8 @@
#include "storage/proc.h"
#include "storage/procarray.h"
+#include "tcop/tcopprot.h"
+
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/datum.h"
@@ -118,6 +120,9 @@ static void store_flush_position(XLogRecPtr remote_lsn);
static void reread_subscription(void);
+/* Flags set by signal handlers */
+static volatile sig_atomic_t got_SIGHUP = false;
+
/*
* Should this worker apply changes for given relation.
*
@@ -134,7 +139,7 @@ static void reread_subscription(void);
static bool
should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
{
- if (am_tablesync_worker())
+ if (IsTablesyncWorker())
return MyLogicalRepWorker->relid == rel->localreloid;
else
return (rel->state == SUBREL_STATE_READY ||
@@ -444,7 +449,7 @@ apply_handle_commit(StringInfo s)
Assert(commit_data.commit_lsn == remote_final_lsn);
/* The synchronization worker runs in single transaction. */
- if (IsTransactionState() && !am_tablesync_worker())
+ if (IsTransactionState() && !IsTablesyncWorker())
{
/*
* Update origin state so we can restart streaming from correct
@@ -480,7 +485,7 @@ apply_handle_origin(StringInfo s)
* actual writes.
*/
if (!in_remote_transaction ||
- (IsTransactionState() && !am_tablesync_worker()))
+ (IsTransactionState() && !IsTablesyncWorker()))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("ORIGIN message sent out of order")));
@@ -1005,7 +1010,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
/* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL);
- while (!got_SIGTERM)
+ for (;;)
{
pgsocket fd = PGINVALID_SOCKET;
int rc;
@@ -1015,6 +1020,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
TimestampTz last_recv_timestamp = GetCurrentTimestamp();
bool ping_sent = false;
+ CHECK_FOR_INTERRUPTS();
+
MemoryContextSwitchTo(ApplyMessageContext);
len = walrcv_receive(wrconn, &buf, &fd);
@@ -1129,7 +1136,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
TimeLineID tli;
walrcv_endstreaming(wrconn, &tli);
- break;
+ proc_exit(0);
}
/*
@@ -1329,7 +1336,6 @@ reread_subscription(void)
"stop because the subscription was removed",
MySubscription->name)));
- walrcv_disconnect(wrconn);
proc_exit(0);
}
@@ -1344,7 +1350,6 @@ reread_subscription(void)
"stop because the subscription was disabled",
MySubscription->name)));
- walrcv_disconnect(wrconn);
proc_exit(0);
}
@@ -1359,7 +1364,6 @@ reread_subscription(void)
"restart because the connection information was changed",
MySubscription->name)));
- walrcv_disconnect(wrconn);
proc_exit(0);
}
@@ -1374,7 +1378,6 @@ reread_subscription(void)
"restart because subscription was renamed",
MySubscription->name)));
- walrcv_disconnect(wrconn);
proc_exit(0);
}
@@ -1392,7 +1395,6 @@ reread_subscription(void)
"restart because the replication slot name was changed",
MySubscription->name)));
- walrcv_disconnect(wrconn);
proc_exit(0);
}
@@ -1407,7 +1409,6 @@ reread_subscription(void)
"restart because subscription's publications were changed",
MySubscription->name)));
- walrcv_disconnect(wrconn);
proc_exit(0);
}
@@ -1443,6 +1444,19 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
MySubscriptionValid = false;
}
+/* SIGHUP: set flag to reload configuration at next convenient time */
+static void
+logicalrep_worker_sighup(SIGNAL_ARGS)
+{
+ int save_errno = errno;
+
+ got_SIGHUP = true;
+
+ /* Waken anything waiting on the process latch */
+ SetLatch(MyLatch);
+
+ errno = save_errno;
+}
/* Logical Replication Apply worker entry point */
void
@@ -1460,7 +1474,7 @@ ApplyWorkerMain(Datum main_arg)
/* Setup signal handling */
pqsignal(SIGHUP, logicalrep_worker_sighup);
- pqsignal(SIGTERM, logicalrep_worker_sigterm);
+ pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
/* Initialise stats to a sanish value */
@@ -1515,7 +1529,7 @@ ApplyWorkerMain(Datum main_arg)
subscription_change_cb,
(Datum) 0);
- if (am_tablesync_worker())
+ if (IsTablesyncWorker())
ereport(LOG,
(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
@@ -1530,7 +1544,7 @@ ApplyWorkerMain(Datum main_arg)
elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
MySubscription->conninfo);
- if (am_tablesync_worker())
+ if (IsTablesyncWorker())
{
char *syncslotname;
@@ -1610,8 +1624,14 @@ ApplyWorkerMain(Datum main_arg)
/* Run the main loop. */
LogicalRepApplyLoop(origin_startpos);
- walrcv_disconnect(wrconn);
+ /* Not reached. */
+}
- /* We should only get here if we received SIGTERM */
- proc_exit(0);
+/*
+ * Is current process a apply worker?
+ */
+bool
+IsApplyWorker(void)
+{
+ return MyLogicalRepWorker != NULL;
}
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 75c2d9a..1b1134c8 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -55,6 +55,7 @@
#include "pg_getopt.h"
#include "postmaster/autovacuum.h"
#include "postmaster/postmaster.h"
+#include "replication/logicalworker.h"
#include "replication/slot.h"
#include "replication/walsender.h"
#include "rewrite/rewriteHandler.h"
@@ -2845,6 +2846,10 @@ ProcessInterrupts(void)
ereport(FATAL,
(errcode(ERRCODE_ADMIN_SHUTDOWN),
errmsg("terminating autovacuum process due to administrator command")));
+ else if (IsApplyWorker())
+ ereport(FATAL,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("terminating logical replication worker due to administrator command")));
else if (RecoveryConflictPending && RecoveryConflictRetryable)
{
pgstat_report_recovery_conflict(RecoveryConflictReason);
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 3e0affa..6c71343 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -14,4 +14,7 @@
extern void ApplyWorkerMain(Datum main_arg);
+extern bool IsApplyWorker(void);
+extern bool IsTablesyncWorker(void);
+
#endif /* LOGICALWORKER_H */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 0654461..c310ca5 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -67,8 +67,6 @@ extern Subscription *MySubscription;
extern LogicalRepWorker *MyLogicalRepWorker;
extern bool in_remote_transaction;
-extern volatile sig_atomic_t got_SIGHUP;
-extern volatile sig_atomic_t got_SIGTERM;
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
@@ -81,17 +79,9 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
extern int logicalrep_sync_worker_count(Oid subid);
-extern void logicalrep_worker_sighup(SIGNAL_ARGS);
-extern void logicalrep_worker_sigterm(SIGNAL_ARGS);
extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
void process_syncing_tables(XLogRecPtr current_lsn);
void invalidate_syncing_table_states(Datum arg, int cacheid,
uint32 hashvalue);
-static inline bool
-am_tablesync_worker(void)
-{
- return OidIsValid(MyLogicalRepWorker->relid);
-}
-
#endif /* WORKER_INTERNAL_H */
--
2.8.1
0002-Make-tablesync-worker-exit-when-apply-dies-while-it-.patchapplication/octet-stream; name=0002-Make-tablesync-worker-exit-when-apply-dies-while-it-.patchDownload
From bd2ce07dc709354bad7e707e4e6e6bd004db353e Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Wed, 24 May 2017 20:37:12 +0200
Subject: [PATCH 2/4] Make tablesync worker exit when apply dies while it was
waiting for it
---
src/backend/replication/logical/tablesync.c | 21 ++++++++++++++++++---
1 file changed, 18 insertions(+), 3 deletions(-)
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e61237f..b82ed25 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -147,7 +147,12 @@ finish_sync_worker(void)
/*
* Wait until the table synchronization change.
*
- * Returns false if the relation subscription state disappeared.
+ * If called from apply worker, it will wait for the synchronization worker
+ * to change table state in shmem, and when called from synchronization
+ * worker it will wait for apply worker to change table state in shmem.
+ *
+ * Returns false if the opposite worker has disappeared or table state has
+ * been reset.
*/
static bool
wait_for_sync_status_change(Oid relid, char origstate)
@@ -162,14 +167,24 @@ wait_for_sync_status_change(Oid relid, char origstate)
CHECK_FOR_INTERRUPTS();
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ /* Check if the opposite worker is still running and bail if not. */
worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
- relid, false);
+ IsTablesyncWorker() ? InvalidOid : relid,
+ false);
if (!worker)
{
LWLockRelease(LogicalRepWorkerLock);
return false;
}
+
+ /* From now on worker is expected to be synchronization worker. */
+ if (IsTablesyncWorker())
+ worker = MyLogicalRepWorker;
+
+ Assert(worker->relid == relid);
state = worker->relstate;
+
LWLockRelease(LogicalRepWorkerLock);
if (state == SUBREL_STATE_UNKNOWN)
@@ -180,7 +195,7 @@ wait_for_sync_status_change(Oid relid, char origstate)
rc = WaitLatch(&MyProc->procLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
- 10000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
+ 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
/* emergency bailout if postmaster has died */
if (rc & WL_POSTMASTER_DEATH)
--
2.8.1
0003-Receive-invalidation-messages-correctly-in-tablesync.patchapplication/octet-stream; name=0003-Receive-invalidation-messages-correctly-in-tablesync.patchDownload
From 958b31f3156cd97d96b164dc0d7263988820f420 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Wed, 24 May 2017 21:07:15 +0200
Subject: [PATCH 3/4] Receive invalidation messages correctly in tablesync
worker
---
src/backend/replication/logical/worker.c | 23 +++++++++++++++--------
1 file changed, 15 insertions(+), 8 deletions(-)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 889e5f3..194d9d1 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -118,7 +118,7 @@ static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
static void store_flush_position(XLogRecPtr remote_lsn);
-static void reread_subscription(void);
+static void maybe_reread_subscription(void);
/* Flags set by signal handlers */
static volatile sig_atomic_t got_SIGHUP = false;
@@ -165,8 +165,7 @@ ensure_transaction(void)
StartTransactionCommand();
- if (!MySubscriptionValid)
- reread_subscription();
+ maybe_reread_subscription();
MemoryContextSwitchTo(ApplyMessageContext);
return true;
@@ -463,6 +462,12 @@ apply_handle_commit(StringInfo s)
store_flush_position(commit_data.end_lsn);
}
+ else
+ {
+ /* Process any invalidation messages that might have accumulated. */
+ AcceptInvalidationMessages();
+ maybe_reread_subscription();
+ }
in_remote_transaction = false;
@@ -1119,8 +1124,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
* now.
*/
AcceptInvalidationMessages();
- if (!MySubscriptionValid)
- reread_subscription();
+ maybe_reread_subscription();
/* Process any table synchronization changes. */
process_syncing_tables(last_received);
@@ -1302,17 +1306,20 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
last_flushpos = flushpos;
}
-
/*
- * Reread subscription info and exit on change.
+ * Reread subscription info if needed. Most changes will be exit.
*/
static void
-reread_subscription(void)
+maybe_reread_subscription(void)
{
MemoryContext oldctx;
Subscription *newsub;
bool started_tx = false;
+ /* When cache state is valid there is nothing to do here. */
+ if (MySubscriptionValid)
+ return;
+
/* This function might be called inside or outside of transaction. */
if (!IsTransactionState())
{
--
2.8.1
0004-Wait-for-table-sync-worker-to-finish-when-apply-work.patchapplication/octet-stream; name=0004-Wait-for-table-sync-worker-to-finish-when-apply-work.patchDownload
From 54476f422646209ddb21c1e073c1a41852dfb31d Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Thu, 1 Jun 2017 12:26:41 +0900
Subject: [PATCH 4/4] Wait for table sync worker to finish when apply worker
exits.
---
src/backend/replication/logical/launcher.c | 51 +++++++++++++++++++++++++++++-
src/include/replication/worker_internal.h | 1 +
2 files changed, 51 insertions(+), 1 deletion(-)
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 345a415..0ae3751 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -404,8 +404,53 @@ retry:
}
/*
+ * Stop all table sync workers associated with given subid.
+ *
+ * This function is called by apply worker. Since table sync
+ * worker associated with same subscription is launched by
+ * only the apply worker. We don't need to acquire
+ * LogicalRepLauncherLock here.
+ */
+void
+logicalrep_sync_workers_stop(Oid subid)
+{
+ List *relid_list = NIL;
+ ListCell *cell;
+ int i;
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ /*
+ * Walks the workers array and get relid list that matches
+ * given subscription id.
+ */
+ for (i = 0; i < max_logical_replication_workers; i++)
+ {
+ LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+ if (w->in_use && w->subid == subid &&
+ OidIsValid(w->relid))
+ relid_list = lappend_oid(relid_list, w->relid);
+ }
+
+ LWLockRelease(LogicalRepWorkerLock);
+
+ /* Return if there is no table sync worker associated with myself */
+ if (relid_list == NIL)
+ return;
+
+ foreach (cell, relid_list)
+ {
+ Oid relid = lfirst_oid(cell);
+
+ logicalrep_worker_stop(subid, relid);
+ }
+}
+
+/*
* Stop the logical replication worker and wait until it detaches from the
- * slot.
+ * slot. This function can be called by both logical replication launcher
+ * and apply worker to stop apply worker and table sync worker.
*/
void
logicalrep_worker_stop(Oid subid, Oid relid)
@@ -571,6 +616,10 @@ logicalrep_worker_attach(int slot)
static void
logicalrep_worker_detach(void)
{
+ /* Stop all sync workers associated if apply worker */
+ if (!IsTablesyncWorker())
+ logicalrep_sync_workers_stop(MyLogicalRepWorker->subid);
+
/* Block concurrent access. */
LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index c310ca5..e20933a 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -76,6 +76,7 @@ extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
extern void logicalrep_worker_stop(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
+extern void logicalrep_sync_workers_stop(Oid subid);
extern int logicalrep_sync_worker_count(Oid subid);
--
2.8.1
On 5/31/17 02:51, Noah Misch wrote:
On Tue, May 30, 2017 at 01:30:35AM +0000, Noah Misch wrote:
On Thu, May 18, 2017 at 10:27:51PM -0400, Peter Eisentraut wrote:
On 5/18/17 11:11, Noah Misch wrote:
IMMEDIATE ATTENTION REQUIRED. This PostgreSQL 10 open item is past due for
your status update. Please reacquaint yourself with the policy on open item
ownership[1] and then reply immediately. If I do not hear from you by
2017-05-19 16:00 UTC, I will transfer this item to release management team
ownership without further notice.There is no progress on this issue at the moment. I will report again
next Wednesday.IMMEDIATE ATTENTION REQUIRED. This PostgreSQL 10 open item is long past due
for your status update. If I do not hear one from you by 2017-05-31 02:00
UTC, I will transfer this item to release management team ownership without
further notice.This PostgreSQL 10 open item now needs a permanent owner. Would any other
committer like to take ownership? If this role interests you, please read
this thread and the policy linked above, then send an initial status update
bearing a date for your subsequent status update. If the item does not have a
permanent owner by 2017-06-03 07:00 UTC, I will investigate feature removals
that would resolve the item.
It seems I lost track of this item between all the other ones. I will
continue to work on this item. We have patches proposed and I will work
on committing them until Friday.
I think I now have updates posted on all my items.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Thu, May 25, 2017 at 5:29 PM, tushar <tushar.ahuja@enterprisedb.com> wrote:
On 05/25/2017 12:44 AM, Petr Jelinek wrote:
There is still outstanding issue that sync worker will keep running
inside the long COPY because the invalidation messages are also not
processed until it finishes but all the original issues reported here
disappear for me with the attached patches applied.After applying all your patches, drop subscription no more hangs while
dropping subscription but there is an error "ERROR: tuple concurrently
updated" in the log file of
standby.
I tried to reproduce this issue with latest four patches I submit but
it didn't happen. I guess this issue doesn't related to the issues
reported on this thread and another factor might be cause of this
issue. It would be good to test the same again with latest four
patches or after solved current some open items.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 01/06/17 17:32, Masahiko Sawada wrote:
On Thu, May 25, 2017 at 5:29 PM, tushar <tushar.ahuja@enterprisedb.com> wrote:
On 05/25/2017 12:44 AM, Petr Jelinek wrote:
There is still outstanding issue that sync worker will keep running
inside the long COPY because the invalidation messages are also not
processed until it finishes but all the original issues reported here
disappear for me with the attached patches applied.After applying all your patches, drop subscription no more hangs while
dropping subscription but there is an error "ERROR: tuple concurrently
updated" in the log file of
standby.I tried to reproduce this issue with latest four patches I submit but
it didn't happen. I guess this issue doesn't related to the issues
reported on this thread and another factor might be cause of this
issue. It would be good to test the same again with latest four
patches or after solved current some open items.
That's because your additional patch kills the workers that do the
concurrent update. While that's probably okay, I still plan to look into
making the subscription and state locking more robust.
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Thu, Jun 01, 2017 at 12:00:33AM -0400, Peter Eisentraut wrote:
On 5/31/17 02:51, Noah Misch wrote:
On Tue, May 30, 2017 at 01:30:35AM +0000, Noah Misch wrote:
On Thu, May 18, 2017 at 10:27:51PM -0400, Peter Eisentraut wrote:
On 5/18/17 11:11, Noah Misch wrote:
IMMEDIATE ATTENTION REQUIRED. This PostgreSQL 10 open item is past due for
your status update. Please reacquaint yourself with the policy on open item
ownership[1] and then reply immediately. If I do not hear from you by
2017-05-19 16:00 UTC, I will transfer this item to release management team
ownership without further notice.There is no progress on this issue at the moment. I will report again
next Wednesday.IMMEDIATE ATTENTION REQUIRED. This PostgreSQL 10 open item is long past due
for your status update. If I do not hear one from you by 2017-05-31 02:00
UTC, I will transfer this item to release management team ownership without
further notice.This PostgreSQL 10 open item now needs a permanent owner. Would any other
committer like to take ownership? If this role interests you, please read
this thread and the policy linked above, then send an initial status update
bearing a date for your subsequent status update. If the item does not have a
permanent owner by 2017-06-03 07:00 UTC, I will investigate feature removals
that would resolve the item.It seems I lost track of this item between all the other ones. I will
continue to work on this item. We have patches proposed and I will work
on committing them until Friday.
If any other committer cares about logical replication features in v10, I'd
recommend he take ownership of this open item despite your plan to work on it.
Otherwise, if you miss fixing this on Friday, it will be too late for others
to volunteer.
I think I now have updates posted on all my items.
As of your writing that, two of your open items had no conforming status
update, and they still don't:
- Background worker display in pg_stat_activity (logical replication especially)
- ALTER SUBSCRIPTION REFRESH and table sync worker
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 5/24/17 15:14, Petr Jelinek wrote:
All the locking works just fine the way it is in master. The issue with
deadlock with apply comes from the wrong handling of the SIGTERM in the
apply (we didn't set InterruptPending). I changed the SIGTERM handler in
patch 0001 just to die which is actually the correct behavior for apply
workers. I also moved the connection cleanup code to the
before_shmem_exit callback (similar to walreceiver) and now that part
works correctly.
I have committed this, in two separate parts. This should fix the
originally reported issue.
I will continue to work through your other patches. I notice there is
still a bit of discussion about another patch, so please let me know if
there is anything else I should be looking for.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 6/2/17 14:52, Peter Eisentraut wrote:
On 5/24/17 15:14, Petr Jelinek wrote:
All the locking works just fine the way it is in master. The issue with
deadlock with apply comes from the wrong handling of the SIGTERM in the
apply (we didn't set InterruptPending). I changed the SIGTERM handler in
patch 0001 just to die which is actually the correct behavior for apply
workers. I also moved the connection cleanup code to the
before_shmem_exit callback (similar to walreceiver) and now that part
works correctly.I have committed this, in two separate parts. This should fix the
originally reported issue.I will continue to work through your other patches. I notice there is
still a bit of discussion about another patch, so please let me know if
there is anything else I should be looking for.
I have committed the remaining two patches. I believe this fixes the
originally reported issue.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Sun, Jun 4, 2017 at 1:53 AM, Peter Eisentraut
<peter.eisentraut@2ndquadrant.com> wrote:
On 6/2/17 14:52, Peter Eisentraut wrote:
On 5/24/17 15:14, Petr Jelinek wrote:
All the locking works just fine the way it is in master. The issue with
deadlock with apply comes from the wrong handling of the SIGTERM in the
apply (we didn't set InterruptPending). I changed the SIGTERM handler in
patch 0001 just to die which is actually the correct behavior for apply
workers. I also moved the connection cleanup code to the
before_shmem_exit callback (similar to walreceiver) and now that part
works correctly.I have committed this, in two separate parts. This should fix the
originally reported issue.I will continue to work through your other patches. I notice there is
still a bit of discussion about another patch, so please let me know if
there is anything else I should be looking for.I have committed the remaining two patches. I believe this fixes the
originally reported issue.
IIUC the issue that sync worker could be orphaned and keep running
inside the long COPY is not fixed yet by commit
3c9bc2157a4f465b3c070d1250597568d2dc285f, and should be fixed. Am I
missing something?
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 03/06/17 18:53, Peter Eisentraut wrote:
On 6/2/17 14:52, Peter Eisentraut wrote:
On 5/24/17 15:14, Petr Jelinek wrote:
All the locking works just fine the way it is in master. The issue with
deadlock with apply comes from the wrong handling of the SIGTERM in the
apply (we didn't set InterruptPending). I changed the SIGTERM handler in
patch 0001 just to die which is actually the correct behavior for apply
workers. I also moved the connection cleanup code to the
before_shmem_exit callback (similar to walreceiver) and now that part
works correctly.I have committed this, in two separate parts. This should fix the
originally reported issue.I will continue to work through your other patches. I notice there is
still a bit of discussion about another patch, so please let me know if
there is anything else I should be looking for.I have committed the remaining two patches. I believe this fixes the
originally reported issue.
So the fact that we moved workers to standard interrupt handling broke
launcher in subtle ways because it still uses it's own SIGTERM handling
but some function it calls are using CHECK_FOR_INTERRUPTS (they are used
by worker as well). I think we need to move launcher to standard
interrupt handling as well. It's not same as other processes though as
it's allowed to be terminated any time (just like autovacuum launcher)
so we just proc_exit(0) instead of FATALing out.
This is related to the nightjar failures btw.
As a side note, we are starting to have several IsSomeTypeOfProcess
functions for these kind of things. I wonder if bgworker infrastructure
should somehow provide this type of stuff (the proposed bgw_type might
help there) as well as maybe being able to register interrupt handler
for the worker (it's really hard to do it via custom SIGTERM handler as
visible on worker, launcher and walsender issues we are fixing).
Obviously this is PG11 related thought.
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Attachments:
Use-standard-interrupt-handling-in-logical-replicati.patchtext/x-patch; name=Use-standard-interrupt-handling-in-logical-replicati.patchDownload
From 3fb7ed57d669beba3feba894a11c9dcff8e60414 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Tue, 6 Jun 2017 19:26:12 +0200
Subject: [PATCH] Use standard interrupt handling in logical replication
launcher
---
src/backend/replication/logical/launcher.c | 41 ++++++++++++------------------
src/backend/tcop/postgres.c | 9 +++++++
src/include/replication/logicallauncher.h | 2 ++
3 files changed, 27 insertions(+), 25 deletions(-)
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 5aaf24b..52169f1 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -81,7 +81,6 @@ static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
/* Flags set by signal handlers */
static volatile sig_atomic_t got_SIGHUP = false;
-static volatile sig_atomic_t got_SIGTERM = false;
static bool on_commit_launcher_wakeup = false;
@@ -623,20 +622,6 @@ logicalrep_worker_onexit(int code, Datum arg)
ApplyLauncherWakeup();
}
-/* SIGTERM: set flag to exit at next convenient time */
-static void
-logicalrep_launcher_sigterm(SIGNAL_ARGS)
-{
- int save_errno = errno;
-
- got_SIGTERM = true;
-
- /* Waken anything waiting on the process latch */
- SetLatch(MyLatch);
-
- errno = save_errno;
-}
-
/* SIGHUP: set flag to reload configuration at next convenient time */
static void
logicalrep_launcher_sighup(SIGNAL_ARGS)
@@ -798,13 +783,14 @@ ApplyLauncherMain(Datum main_arg)
before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
+ Assert(LogicalRepCtx->launcher_pid == 0);
+ LogicalRepCtx->launcher_pid = MyProcPid;
+
/* Establish signal handlers. */
pqsignal(SIGHUP, logicalrep_launcher_sighup);
- pqsignal(SIGTERM, logicalrep_launcher_sigterm);
+ pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
- LogicalRepCtx->launcher_pid = MyProcPid;
-
/*
* Establish connection to nailed catalogs (we only ever access
* pg_subscription).
@@ -812,7 +798,7 @@ ApplyLauncherMain(Datum main_arg)
BackgroundWorkerInitializeConnection(NULL, NULL);
/* Enter main loop */
- while (!got_SIGTERM)
+ for (;;)
{
int rc;
List *sublist;
@@ -822,6 +808,8 @@ ApplyLauncherMain(Datum main_arg)
TimestampTz now;
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
+ CHECK_FOR_INTERRUPTS();
+
now = GetCurrentTimestamp();
/* Limit the start retry to once a wal_retrieve_retry_interval */
@@ -894,13 +882,16 @@ ApplyLauncherMain(Datum main_arg)
ResetLatch(&MyProc->procLatch);
}
- LogicalRepCtx->launcher_pid = 0;
-
- /* ... and if it returns, we're done */
- ereport(DEBUG1,
- (errmsg("logical replication launcher shutting down")));
+ /* Not reachable */
+}
- proc_exit(0);
+/*
+ * Is current process the logical replication launcher?
+ */
+bool
+IsLogicalLauncher(void)
+{
+ return LogicalRepCtx->launcher_pid == MyProcPid;
}
/*
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 1c60b43..91ca8df 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -55,6 +55,7 @@
#include "pg_getopt.h"
#include "postmaster/autovacuum.h"
#include "postmaster/postmaster.h"
+#include "replication/logicallauncher.h"
#include "replication/logicalworker.h"
#include "replication/slot.h"
#include "replication/walsender.h"
@@ -2848,6 +2849,14 @@ ProcessInterrupts(void)
ereport(FATAL,
(errcode(ERRCODE_ADMIN_SHUTDOWN),
errmsg("terminating logical replication worker due to administrator command")));
+ else if (IsLogicalLauncher())
+ {
+ ereport(DEBUG1,
+ (errmsg("logical replication launcher shutting down")));
+
+ /* The logical replication launcher can be stopped at any time. */
+ proc_exit(0);
+ }
else if (RecoveryConflictPending && RecoveryConflictRetryable)
{
pgstat_report_recovery_conflict(RecoveryConflictReason);
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index d202a23..4f3e89e 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -24,4 +24,6 @@ extern void ApplyLauncherShmemInit(void);
extern void ApplyLauncherWakeupAtCommit(void);
extern void AtEOXact_ApplyLauncher(bool isCommit);
+extern bool IsLogicalLauncher(void);
+
#endif /* LOGICALLAUNCHER_H */
--
2.7.4
On 2017-06-06 19:36:13 +0200, Petr Jelinek wrote:
So the fact that we moved workers to standard interrupt handling broke
launcher in subtle ways because it still uses it's own SIGTERM handling
but some function it calls are using CHECK_FOR_INTERRUPTS (they are used
by worker as well). I think we need to move launcher to standard
interrupt handling as well.
Sounds like a good plan.
As a side note, we are starting to have several IsSomeTypeOfProcess
functions for these kind of things. I wonder if bgworker infrastructure
should somehow provide this type of stuff (the proposed bgw_type might
help there) as well as maybe being able to register interrupt handler
for the worker (it's really hard to do it via custom SIGTERM handler as
visible on worker, launcher and walsender issues we are fixing).
Obviously this is PG11 related thought.
Maybe it's also a sign that the bgworker infrastructure isn't really the
best thing to use for internal processes like parallel workers and lrep
processes - it's really built so core code *doesn't* know anything
specific about them, which isn't really what we want in some of these
cases. That's not to say that bgworkers and parallelism/lrep shouldn't
share infrastructure, don't get me wrong.
- LogicalRepCtx->launcher_pid = 0; - - /* ... and if it returns, we're done */ - ereport(DEBUG1, - (errmsg("logical replication launcher shutting down"))); + /* Not reachable */ +}
Maybe put a pg_unreachable() there?
@@ -2848,6 +2849,14 @@ ProcessInterrupts(void) ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN), errmsg("terminating logical replication worker due to administrator command"))); + else if (IsLogicalLauncher()) + { + ereport(DEBUG1, + (errmsg("logical replication launcher shutting down"))); + + /* The logical replication launcher can be stopped at any time. */ + proc_exit(0); + }
We could use PgBackendStatus->st_backendType for these, merging these
different paths.
I can take care of this one, if you/Peter want.
Greetings,
Andres Freund
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 07/06/17 03:00, Andres Freund wrote:
On 2017-06-06 19:36:13 +0200, Petr Jelinek wrote:
As a side note, we are starting to have several IsSomeTypeOfProcess
functions for these kind of things. I wonder if bgworker infrastructure
should somehow provide this type of stuff (the proposed bgw_type might
help there) as well as maybe being able to register interrupt handler
for the worker (it's really hard to do it via custom SIGTERM handler as
visible on worker, launcher and walsender issues we are fixing).
Obviously this is PG11 related thought.Maybe it's also a sign that the bgworker infrastructure isn't really the
best thing to use for internal processes like parallel workers and lrep
processes - it's really built so core code *doesn't* know anything
specific about them, which isn't really what we want in some of these
cases. That's not to say that bgworkers and parallelism/lrep shouldn't
share infrastructure, don't get me wrong.
Well the nice thing about bgworkers is that it provides the basic
infrastructure. Main reason lrep stuff is using it is that I didn't want
to add another special backend kind to 20 different places, but turns
out it still needs to be in some. So either we need to add more support
for these kind of things to bgworkers or write something for internal
workers that shares the infrastructure with bgworkers as you say.
- LogicalRepCtx->launcher_pid = 0; - - /* ... and if it returns, we're done */ - ereport(DEBUG1, - (errmsg("logical replication launcher shutting down"))); + /* Not reachable */ +}Maybe put a pg_unreachable() there?
Ah didn't know it existed.
@@ -2848,6 +2849,14 @@ ProcessInterrupts(void) ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN), errmsg("terminating logical replication worker due to administrator command"))); + else if (IsLogicalLauncher()) + { + ereport(DEBUG1, + (errmsg("logical replication launcher shutting down"))); + + /* The logical replication launcher can be stopped at any time. */ + proc_exit(0); + }We could use PgBackendStatus->st_backendType for these, merging these
different paths.
Hmm, that's not that easy, st_backendType will be generic type for
bgworker as the bgw_type patch didn't land yet (which is discussed in
yet another thread [1]/messages/by-id/0d795703-a885-2193-2331-f00d7a3a4e42@2ndquadrant.com). It seems like an argument for going forward
with it (the bgw_type patch) in PG10.
I can take care of this one, if you/Peter want.
I don't mind, it has some overlap with your proposed fixes for latching
so if you are willing go ahead.
[1]: /messages/by-id/0d795703-a885-2193-2331-f00d7a3a4e42@2ndquadrant.com
/messages/by-id/0d795703-a885-2193-2331-f00d7a3a4e42@2ndquadrant.com
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 2017-06-07 11:51:12 +0200, Petr Jelinek wrote:
On 07/06/17 03:00, Andres Freund wrote:
On 2017-06-06 19:36:13 +0200, Petr Jelinek wrote:
As a side note, we are starting to have several IsSomeTypeOfProcess
functions for these kind of things. I wonder if bgworker infrastructure
should somehow provide this type of stuff (the proposed bgw_type might
help there) as well as maybe being able to register interrupt handler
for the worker (it's really hard to do it via custom SIGTERM handler as
visible on worker, launcher and walsender issues we are fixing).
Obviously this is PG11 related thought.Maybe it's also a sign that the bgworker infrastructure isn't really the
best thing to use for internal processes like parallel workers and lrep
processes - it's really built so core code *doesn't* know anything
specific about them, which isn't really what we want in some of these
cases. That's not to say that bgworkers and parallelism/lrep shouldn't
share infrastructure, don't get me wrong.Well the nice thing about bgworkers is that it provides the basic
infrastructure.
Right.
Main reason lrep stuff is using it is that I didn't want to add
another special backend kind to 20 different places, but turns out it
still needs to be in some. So either we need to add more support for
these kind of things to bgworkers or write something for internal
workers that shares the infrastructure with bgworkers as you say.
Yea, I think we should radically unify a lot of the related
infrastructure between all processes. We've grown a lot of duplication.
@@ -2848,6 +2849,14 @@ ProcessInterrupts(void) ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN), errmsg("terminating logical replication worker due to administrator command"))); + else if (IsLogicalLauncher()) + { + ereport(DEBUG1, + (errmsg("logical replication launcher shutting down"))); + + /* The logical replication launcher can be stopped at any time. */ + proc_exit(0); + }We could use PgBackendStatus->st_backendType for these, merging these
different paths.Hmm, that's not that easy, st_backendType will be generic type for
bgworker as the bgw_type patch didn't land yet (which is discussed in
yet another thread [1]). It seems like an argument for going forward
with it (the bgw_type patch) in PG10.
Yea. I left it as is for now.
I don't mind, it has some overlap with your proposed fixes for latching
so if you are willing go ahead.
Done.
Greetings,
Andres Freund
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Thu, May 25, 2017 at 01:59:51PM +0530, tushar wrote:
After applying all your patches, drop subscription no more hangs while
dropping subscription but there is an error "ERROR: tuple concurrently
updated" in the log file of
standby.---
logical replication synchronization worker finished processing
2017-05-25 09:15:52.654 BST [18575] LOG: logical replication
synchronization worker finished processing
2017-05-25 09:15:52.656 BST [18563] LOG: starting logical replication
worker for subscription "sub"
2017-05-25 09:15:52.662 BST [18577] LOG: logical replication sync for
subscription sub, table t14 started
2017-05-25 09:15:53.657 BST [18563] LOG: starting logical replication
worker for subscription "sub"
2017-05-25 09:15:53.663 BST [18579] LOG: logical replication sync for
subscription sub, table t15 started
2017-05-25 09:15:53.724 BST [18563] FATAL: terminating logical replication
worker due to administrator command
2017-05-25 09:15:53.725 BST [18521] LOG: worker process: logical
replication worker for subscription 16684 (PID 18563) exited with exit code
1
2017-05-25 09:15:54.734 BST [18579] ERROR: tuple concurrently updated
2017-05-25 09:15:54.735 BST [18577] ERROR: tuple concurrently updated
2017-05-25 09:15:54.736 BST [18521] LOG: worker process: logical
replication worker for subscription 16684 sync 16426 (PID 18579) exited with
exit code 1
2017-05-25 09:15:54.736 BST [18521] LOG: worker process: logical
replication worker for subscription 16684 sync 16423 (PID 18577) exited with
exit code 1
~
~
~Steps to reproduce -
X cluster -> create 100 tables , publish all tables (create publication pub
for all tables);
Y Cluster -> create 100 tables ,create subscription(create subscription sub
connection 'user=centos host=localhost' publication pub;
Y cluster ->drop subscription - drop subscription sub;check the log file on Y cluster.
Sometime , i have seen this error on psql prompt and drop subscription
operation got failed at first attempt.postgres=# drop subscription sub;
ERROR: tuple concurrently updated
postgres=# drop subscription sub;
NOTICE: dropped replication slot "sub" on publisher
DROP SUBSCRIPTION
[Action required within three days. This is a generic notification.]
The above-described topic is currently a PostgreSQL 10 open item. Peter,
since you committed the patch believed to have created it, you own this open
item. If some other commit is more relevant or if this does not belong as a
v10 open item, please let us know. Otherwise, please observe the policy on
open item ownership[1]/messages/by-id/20170404140717.GA2675809@tornado.leadboat.com and send a status update within three calendar days of
this message. Include a date for your subsequent status update. Testers may
discover new open items at any time, and I want to plan to get them all fixed
well in advance of shipping v10. Consequently, I will appreciate your efforts
toward speedy resolution. Thanks.
[1]: /messages/by-id/20170404140717.GA2675809@tornado.leadboat.com
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 6/13/17 02:33, Noah Misch wrote:
Steps to reproduce -
X cluster -> create 100 tables , publish all tables (create publication pub
for all tables);
Y Cluster -> create 100 tables ,create subscription(create subscription sub
connection 'user=centos host=localhost' publication pub;
Y cluster ->drop subscription - drop subscription sub;check the log file on Y cluster.
Sometime , i have seen this error on psql prompt and drop subscription
operation got failed at first attempt.postgres=# drop subscription sub;
ERROR: tuple concurrently updated
postgres=# drop subscription sub;
NOTICE: dropped replication slot "sub" on publisher
DROP SUBSCRIPTION[Action required within three days. This is a generic notification.]
It's being worked on. Let's see by Thursday.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 13/06/17 21:49, Peter Eisentraut wrote:
On 6/13/17 02:33, Noah Misch wrote:
Steps to reproduce -
X cluster -> create 100 tables , publish all tables (create publication pub
for all tables);
Y Cluster -> create 100 tables ,create subscription(create subscription sub
connection 'user=centos host=localhost' publication pub;
Y cluster ->drop subscription - drop subscription sub;check the log file on Y cluster.
Sometime , i have seen this error on psql prompt and drop subscription
operation got failed at first attempt.postgres=# drop subscription sub;
ERROR: tuple concurrently updated
postgres=# drop subscription sub;
NOTICE: dropped replication slot "sub" on publisher
DROP SUBSCRIPTION[Action required within three days. This is a generic notification.]
It's being worked on. Let's see by Thursday.
Attached fixes it (it was mostly about order of calls). I also split the
SetSubscriptionRelState into 2 separate interface while I was changing
it, because now that the update_only bool was added it has become quite
strange to have single interface for what is basically two separate
functions.
There are still couple of remaining issues from this thread though.
Namely the AccessExclusiveLock of the pg_subscription catalog which is
not very pretty, but we need a way to block launcher from accessing the
subscription which is being dropped and make sure it will not start new
workers for it afterwards. Question is how however as by the time
launcher can lock individual subscription it is already processing it.
So it looks to me like we'd need to reread the catalog with new snapshot
after the lock was acquired which seems bit wasteful (I wonder if we
could just AcceptInvalidationMessages and refetch from syscache). Any
better ideas?
Other related problem is locking of subscriptions during operations on
them, especially AlterSubscription seems like it should lock the
subscription itself. I did that in 0002.
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Attachments:
0002-Lock-subscription-when-altering-it.patchtext/x-patch; name=0002-Lock-subscription-when-altering-it.patchDownload
From 005eeb820f4e0528513744136582c4489e2429e3 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Wed, 14 Jun 2017 08:14:20 +0200
Subject: [PATCH 2/2] Lock subscription when altering it
---
src/backend/commands/subscriptioncmds.c | 7 +++++++
1 file changed, 7 insertions(+)
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 14c8f3f..e0ec8ea 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -642,6 +642,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
stmt->subname);
subid = HeapTupleGetOid(tup);
+
+ /*
+ * Lock the subscription so nobody else can do anything with it (including
+ * the replication workers).
+ */
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+
sub = GetSubscription(subid, false);
/* Form a new tuple. */
--
2.7.4
0001-Improve-the-pg_subscription_rel-handling.patchtext/x-patch; name=0001-Improve-the-pg_subscription_rel-handling.patchDownload
From 9011698ae800e0f45f960e91f6b16eab15634fac Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Tue, 13 Jun 2017 19:26:51 +0200
Subject: [PATCH 1/2] Improve the pg_subscription_rel handling
Split the SetSubscriptionRelState into separate Add and Update
functions, removing the unsafe upsert logic as callers are supposed to
know whether they are updating or adding new row.
Reorder the code in the above mentioned functions to avoid "tuple
updated concurrently" warnings.
---
src/backend/catalog/pg_subscription.c | 131 +++++++++++++++-------------
src/backend/commands/subscriptioncmds.c | 14 +--
src/backend/replication/logical/tablesync.c | 33 ++++---
src/include/catalog/pg_subscription_rel.h | 6 +-
4 files changed, 98 insertions(+), 86 deletions(-)
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index c5b2541..fd19675 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -225,24 +225,15 @@ textarray_to_stringlist(ArrayType *textarray)
}
/*
- * Set the state of a subscription table.
- *
- * If update_only is true and the record for given table doesn't exist, do
- * nothing. This can be used to avoid inserting a new record that was deleted
- * by someone else. Generally, subscription DDL commands should use false,
- * workers should use true.
- *
- * The insert-or-update logic in this function is not concurrency safe so it
- * might raise an error in rare circumstances. But if we took a stronger lock
- * such as ShareRowExclusiveLock, we would risk more deadlocks.
+ * Add new state record for a subscription table.
*/
Oid
-SetSubscriptionRelState(Oid subid, Oid relid, char state,
- XLogRecPtr sublsn, bool update_only)
+AddSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn)
{
Relation rel;
HeapTuple tup;
- Oid subrelid = InvalidOid;
+ Oid subrelid;
bool nulls[Natts_pg_subscription_rel];
Datum values[Natts_pg_subscription_rel];
@@ -252,57 +243,79 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
ObjectIdGetDatum(relid),
ObjectIdGetDatum(subid));
+ if (HeapTupleIsValid(tup))
+ elog(ERROR, "subscription table %u in subscription %u already exists",
+ relid, subid);
- /*
- * If the record for given table does not exist yet create new record,
- * otherwise update the existing one.
- */
- if (!HeapTupleIsValid(tup) && !update_only)
- {
- /* Form the tuple. */
- memset(values, 0, sizeof(values));
- memset(nulls, false, sizeof(nulls));
- values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
- values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
- values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
- if (sublsn != InvalidXLogRecPtr)
- values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
- else
- nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
-
- tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
-
- /* Insert tuple into catalog. */
- subrelid = CatalogTupleInsert(rel, tup);
-
- heap_freetuple(tup);
- }
- else if (HeapTupleIsValid(tup))
- {
- bool replaces[Natts_pg_subscription_rel];
+ /* Form the tuple. */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
+ values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
+ values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+ if (sublsn != InvalidXLogRecPtr)
+ values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+ else
+ nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
- /* Update the tuple. */
- memset(values, 0, sizeof(values));
- memset(nulls, false, sizeof(nulls));
- memset(replaces, false, sizeof(replaces));
+ tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
- replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
- values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+ /* Insert tuple into catalog. */
+ subrelid = CatalogTupleInsert(rel, tup);
- replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
- if (sublsn != InvalidXLogRecPtr)
- values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
- else
- nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+ heap_freetuple(tup);
- tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
- replaces);
+ /* Cleanup. */
+ heap_close(rel, NoLock);
- /* Update the catalog. */
- CatalogTupleUpdate(rel, &tup->t_self, tup);
+ return subrelid;
+}
- subrelid = HeapTupleGetOid(tup);
- }
+/*
+ * Update the state of a subscription table.
+ */
+Oid
+UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn)
+{
+ Relation rel;
+ HeapTuple tup;
+ Oid subrelid;
+ bool nulls[Natts_pg_subscription_rel];
+ Datum values[Natts_pg_subscription_rel];
+ bool replaces[Natts_pg_subscription_rel];
+
+ rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+ /* Try finding existing mapping. */
+ tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
+ ObjectIdGetDatum(relid),
+ ObjectIdGetDatum(subid));
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "subscription table %u in subscription %u does not exist",
+ relid, subid);
+
+ /* Update the tuple. */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replaces, false, sizeof(replaces));
+
+ replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
+ values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+
+ replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
+ if (sublsn != InvalidXLogRecPtr)
+ values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+ else
+ nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+
+ tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+ replaces);
+
+ /* Update the catalog. */
+ CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+ subrelid = HeapTupleGetOid(tup);
/* Cleanup. */
heap_close(rel, NoLock);
@@ -377,6 +390,8 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
HeapTuple tup;
int nkeys = 0;
+ Assert(OidIsValid(subid) || OidIsValid(relid));
+
rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
if (OidIsValid(subid))
@@ -400,9 +415,7 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
/* Do the search and delete what we found. */
scan = heap_beginscan_catalog(rel, nkeys, skey);
while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
- {
simple_heap_delete(rel, &tup->t_self);
- }
heap_endscan(scan);
heap_close(rel, RowExclusiveLock);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 5aae7b6..14c8f3f 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -450,8 +450,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
CheckSubscriptionRelkind(get_rel_relkind(relid),
rv->schemaname, rv->relname);
- SetSubscriptionRelState(subid, relid, table_state,
- InvalidXLogRecPtr, false);
+ AddSubscriptionRelState(subid, relid, table_state,
+ InvalidXLogRecPtr);
}
/*
@@ -569,9 +569,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
if (!bsearch(&relid, subrel_local_oids,
list_length(subrel_states), sizeof(Oid), oid_cmp))
{
- SetSubscriptionRelState(sub->oid, relid,
+ AddSubscriptionRelState(sub->oid, relid,
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
- InvalidXLogRecPtr, false);
+ InvalidXLogRecPtr);
ereport(NOTICE,
(errmsg("added subscription for table %s.%s",
quote_identifier(rv->schemaname),
@@ -906,15 +906,15 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
ReleaseSysCache(tup);
+ /* Kill the apply worker associated with the subscription. */
+ logicalrep_worker_stop(subid, InvalidOid);
+
/* Clean up dependencies */
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
/* Remove any associated relation synchronization states. */
RemoveSubscriptionRel(subid, InvalidOid);
- /* Kill the apply worker so that the slot becomes accessible. */
- logicalrep_worker_stop(subid, InvalidOid);
-
/* Remove the origin tracking if exists. */
snprintf(originname, sizeof(originname), "pg_%u", subid);
originid = replorigin_by_name(originname, true);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 3ff08bf..28accda 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -285,11 +285,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
SpinLockRelease(&MyLogicalRepWorker->relmutex);
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn,
- true);
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ MyLogicalRepWorker->relstate,
+ MyLogicalRepWorker->relstate_lsn);
walrcv_endstreaming(wrconn, &tli);
finish_sync_worker();
@@ -414,9 +413,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
StartTransactionCommand();
started_tx = true;
}
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- rstate->relid, rstate->state,
- rstate->lsn, true);
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ rstate->relid, rstate->state,
+ rstate->lsn);
}
}
else
@@ -844,11 +843,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
/* Update the state and make it visible to others. */
StartTransactionCommand();
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn,
- true);
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ MyLogicalRepWorker->relstate,
+ MyLogicalRepWorker->relstate_lsn);
CommitTransactionCommand();
pgstat_report_stat(false);
@@ -933,11 +931,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* Update the new state in catalog. No need to bother
* with the shmem state as we are exiting for good.
*/
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- SUBREL_STATE_SYNCDONE,
- *origin_startpos,
- true);
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ SUBREL_STATE_SYNCDONE,
+ *origin_startpos);
finish_sync_worker();
}
break;
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index f5f6191..e6a2dd5 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -70,8 +70,10 @@ typedef struct SubscriptionRelState
char state;
} SubscriptionRelState;
-extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
- XLogRecPtr sublsn, bool update_only);
+extern Oid AddSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn);
+extern Oid UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn);
extern char GetSubscriptionRelState(Oid subid, Oid relid,
XLogRecPtr *sublsn, bool missing_ok);
extern void RemoveSubscriptionRel(Oid subid, Oid relid);
--
2.7.4
On Thu, Jun 15, 2017 at 7:35 AM, Petr Jelinek
<petr.jelinek@2ndquadrant.com> wrote:
On 13/06/17 21:49, Peter Eisentraut wrote:
On 6/13/17 02:33, Noah Misch wrote:
Steps to reproduce -
X cluster -> create 100 tables , publish all tables (create publication pub
for all tables);
Y Cluster -> create 100 tables ,create subscription(create subscription sub
connection 'user=centos host=localhost' publication pub;
Y cluster ->drop subscription - drop subscription sub;check the log file on Y cluster.
Sometime , i have seen this error on psql prompt and drop subscription
operation got failed at first attempt.postgres=# drop subscription sub;
ERROR: tuple concurrently updated
postgres=# drop subscription sub;
NOTICE: dropped replication slot "sub" on publisher
DROP SUBSCRIPTION[Action required within three days. This is a generic notification.]
It's being worked on. Let's see by Thursday.
Attached fixes it (it was mostly about order of calls). I also split the
SetSubscriptionRelState into 2 separate interface while I was changing
it, because now that the update_only bool was added it has become quite
strange to have single interface for what is basically two separate
functions.There are still couple of remaining issues from this thread though.
Namely the AccessExclusiveLock of the pg_subscription catalog which is
not very pretty, but we need a way to block launcher from accessing the
subscription which is being dropped and make sure it will not start new
workers for it afterwards. Question is how however as by the time
launcher can lock individual subscription it is already processing it.
So it looks to me like we'd need to reread the catalog with new snapshot
after the lock was acquired which seems bit wasteful (I wonder if we
could just AcceptInvalidationMessages and refetch from syscache). Any
better ideas?Other related problem is locking of subscriptions during operations on
them, especially AlterSubscription seems like it should lock the
subscription itself. I did that in 0002.
Thank you for the patch! Sorry I don't have a time for it today but
I'll review these patches tomorrow.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 6/14/17 18:35, Petr Jelinek wrote:
Attached fixes it (it was mostly about order of calls).
So do I understand this right that the actual fix is just moving up the
logicalrep_worker_stop() call in DropSubscription().
I also split the
SetSubscriptionRelState into 2 separate interface while I was changing
it, because now that the update_only bool was added it has become quite
strange to have single interface for what is basically two separate
functions.
makes sense
Other related problem is locking of subscriptions during operations on
them, especially AlterSubscription seems like it should lock the
subscription itself. I did that in 0002.
More detail here please. AlterSubscription() does locking via
heap_open(). This introduces a new locking method. What are the
implications?
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 15/06/17 17:53, Peter Eisentraut wrote:
On 6/14/17 18:35, Petr Jelinek wrote:
Attached fixes it (it was mostly about order of calls).
So do I understand this right that the actual fix is just moving up the
logicalrep_worker_stop() call in DropSubscription().
No the fix is heap_open before SearchSysCache().
I also split the
SetSubscriptionRelState into 2 separate interface while I was changing
it, because now that the update_only bool was added it has become quite
strange to have single interface for what is basically two separate
functions.makes sense
Other related problem is locking of subscriptions during operations on
them, especially AlterSubscription seems like it should lock the
subscription itself. I did that in 0002.More detail here please. AlterSubscription() does locking via
heap_open(). This introduces a new locking method. What are the
implications?
I don't think heap_open will be enough once we remove the
AccessExclusiveLock of the catalog in DropSubscription because
concurrent AlterSubscription might happily add tables to the
subscription that has been dropped if we don't lock it. But you made me
realize that even my patch is not enough because we then reread the
subscription info only from syscache without any kind of invalidation
attempt.
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 6/15/17 12:22, Petr Jelinek wrote:
On 15/06/17 17:53, Peter Eisentraut wrote:
On 6/14/17 18:35, Petr Jelinek wrote:
Attached fixes it (it was mostly about order of calls).
So do I understand this right that the actual fix is just moving up the
logicalrep_worker_stop() call in DropSubscription().No the fix is heap_open before SearchSysCache().
Right. Is there a reason for moving the _stop() call then?
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 15/06/17 18:36, Peter Eisentraut wrote:
On 6/15/17 12:22, Petr Jelinek wrote:
On 15/06/17 17:53, Peter Eisentraut wrote:
On 6/14/17 18:35, Petr Jelinek wrote:
Attached fixes it (it was mostly about order of calls).
So do I understand this right that the actual fix is just moving up the
logicalrep_worker_stop() call in DropSubscription().No the fix is heap_open before SearchSysCache().
Right. Is there a reason for moving the _stop() call then?
Nothing specific, just felt it's better there when I was messing with
the function.
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 6/15/17 12:22, Petr Jelinek wrote:
On 15/06/17 17:53, Peter Eisentraut wrote:
On 6/14/17 18:35, Petr Jelinek wrote:
Attached fixes it (it was mostly about order of calls).
So do I understand this right that the actual fix is just moving up the
logicalrep_worker_stop() call in DropSubscription().No the fix is heap_open before SearchSysCache().
The existing code already does that.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 6/13/17 15:49, Peter Eisentraut wrote:
On 6/13/17 02:33, Noah Misch wrote:
Steps to reproduce -
X cluster -> create 100 tables , publish all tables (create publication pub
for all tables);
Y Cluster -> create 100 tables ,create subscription(create subscription sub
connection 'user=centos host=localhost' publication pub;
Y cluster ->drop subscription - drop subscription sub;check the log file on Y cluster.
Sometime , i have seen this error on psql prompt and drop subscription
operation got failed at first attempt.postgres=# drop subscription sub;
ERROR: tuple concurrently updated
postgres=# drop subscription sub;
NOTICE: dropped replication slot "sub" on publisher
DROP SUBSCRIPTION[Action required within three days. This is a generic notification.]
It's being worked on. Let's see by Thursday.
A patch has been posted, and it's being reviewed. Next update Monday.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Thu, Jun 15, 2017 at 10:22 AM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Thu, Jun 15, 2017 at 7:35 AM, Petr Jelinek
<petr.jelinek@2ndquadrant.com> wrote:On 13/06/17 21:49, Peter Eisentraut wrote:
On 6/13/17 02:33, Noah Misch wrote:
Steps to reproduce -
X cluster -> create 100 tables , publish all tables (create publication pub
for all tables);
Y Cluster -> create 100 tables ,create subscription(create subscription sub
connection 'user=centos host=localhost' publication pub;
Y cluster ->drop subscription - drop subscription sub;check the log file on Y cluster.
Sometime , i have seen this error on psql prompt and drop subscription
operation got failed at first attempt.postgres=# drop subscription sub;
ERROR: tuple concurrently updated
postgres=# drop subscription sub;
NOTICE: dropped replication slot "sub" on publisher
DROP SUBSCRIPTION[Action required within three days. This is a generic notification.]
It's being worked on. Let's see by Thursday.
I've reviewed these patches. 0001 patch conflicts with commit
a571c7f661a7b601aafcb12196d004cdb8b8cb23.
Attached fixes it (it was mostly about order of calls). I also split the
SetSubscriptionRelState into 2 separate interface while I was changing
it, because now that the update_only bool was added it has become quite
strange to have single interface for what is basically two separate
functions.
+1 from me, too.
A subscription relation state may have been removed already when we
try to update it. SetSubscriptionRelState didn't emit an error in such
case but with this patch we end up with an error. Since we shouldn't
ignore such error in UpdateSubscriptionRelState I'd say we can let the
user know about that possibility in the error message.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 6/16/17 04:16, Masahiko Sawada wrote:
A subscription relation state may have been removed already when we
try to update it. SetSubscriptionRelState didn't emit an error in such
case but with this patch we end up with an error. Since we shouldn't
ignore such error in UpdateSubscriptionRelState I'd say we can let the
user know about that possibility in the error message.
So are you saying it's good to have the error message?
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 6/1/17 13:37, Petr Jelinek wrote:
On 01/06/17 17:32, Masahiko Sawada wrote:
On Thu, May 25, 2017 at 5:29 PM, tushar <tushar.ahuja@enterprisedb.com> wrote:
After applying all your patches, drop subscription no more hangs while
dropping subscription but there is an error "ERROR: tuple concurrently
updated" in the log file of
standby.I tried to reproduce this issue with latest four patches I submit but
it didn't happen. I guess this issue doesn't related to the issues
reported on this thread and another factor might be cause of this
issue. It would be good to test the same again with latest four
patches or after solved current some open items.That's because your additional patch kills the workers that do the
concurrent update. While that's probably okay, I still plan to look into
making the subscription and state locking more robust.
It seems we have gotten off track here a bit. What is the current
proposal to fix "tuple concurrently updated" during DROP SUBSCRiPTION?
It seems to me we could just take a stronger lock around
RemoveSubscriptionRel(), so that workers can't write in there concurrently.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Tue, Jun 20, 2017 at 10:47 AM, Peter Eisentraut
<peter.eisentraut@2ndquadrant.com> wrote:
On 6/16/17 04:16, Masahiko Sawada wrote:
A subscription relation state may have been removed already when we
try to update it. SetSubscriptionRelState didn't emit an error in such
case but with this patch we end up with an error. Since we shouldn't
ignore such error in UpdateSubscriptionRelState I'd say we can let the
user know about that possibility in the error message.So are you saying it's good to have the error message?
Yes. UpdateSubscriptionRelState failure means that the subscription
relation state has disappeared or also means something wrong. So I
think it's good to have it as perhaps errdetail. Thought?
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Tue, Jun 20, 2017 at 10:55 AM, Peter Eisentraut
<peter.eisentraut@2ndquadrant.com> wrote:
On 6/1/17 13:37, Petr Jelinek wrote:
On 01/06/17 17:32, Masahiko Sawada wrote:
On Thu, May 25, 2017 at 5:29 PM, tushar <tushar.ahuja@enterprisedb.com> wrote:
After applying all your patches, drop subscription no more hangs while
dropping subscription but there is an error "ERROR: tuple concurrently
updated" in the log file of
standby.I tried to reproduce this issue with latest four patches I submit but
it didn't happen. I guess this issue doesn't related to the issues
reported on this thread and another factor might be cause of this
issue. It would be good to test the same again with latest four
patches or after solved current some open items.That's because your additional patch kills the workers that do the
concurrent update. While that's probably okay, I still plan to look into
making the subscription and state locking more robust.It seems we have gotten off track here a bit. What is the current
proposal to fix "tuple concurrently updated" during DROP SUBSCRiPTION?
I think there is no proposal for it so far. The current proposal is to
fix this problem during ALTER SUBSCRIPTION.
It seems to me we could just take a stronger lock around
RemoveSubscriptionRel(), so that workers can't write in there concurrently.
Since we reduced the lock level of updating pg_subscription_rel by
commit 521fd4795e3e the same deadlock issue will appear if we just
take a stronger lock level.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 6/19/17 22:54, Masahiko Sawada wrote:
It seems to me we could just take a stronger lock around
RemoveSubscriptionRel(), so that workers can't write in there concurrently.Since we reduced the lock level of updating pg_subscription_rel by
commit 521fd4795e3e the same deadlock issue will appear if we just
take a stronger lock level.
I was thinking about a more refined approach, like in the attached
patch. It just changes the locking when in DropSubscription(), so that
that doesn't fail if workers are doing stuff concurrently. Everything
else stays the same.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Attachments:
0001-WIP-Parametrize-locking-in-RemoveSubscriptionRel.patchtext/plain; charset=UTF-8; name=0001-WIP-Parametrize-locking-in-RemoveSubscriptionRel.patch; x-mac-creator=0; x-mac-type=0Download
From c101b6da52dd4f9f041390a937b337f20d212e5c Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <peter_e@gmx.net>
Date: Tue, 20 Jun 2017 19:06:42 -0400
Subject: [PATCH] WIP Parametrize locking in RemoveSubscriptionRel
---
src/backend/catalog/heap.c | 2 +-
src/backend/catalog/pg_subscription.c | 6 +++---
src/backend/commands/subscriptioncmds.c | 4 ++--
src/include/catalog/pg_subscription_rel.h | 2 +-
4 files changed, 7 insertions(+), 7 deletions(-)
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 4e5b79ef94..643e4bb1d5 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -1844,7 +1844,7 @@ heap_drop_with_catalog(Oid relid)
/*
* Remove any associated relation synchronization states.
*/
- RemoveSubscriptionRel(InvalidOid, relid);
+ RemoveSubscriptionRel(InvalidOid, relid, RowExclusiveLock);
/*
* Forget any ON COMMIT action for the rel
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index c69c461b62..88b81920c8 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -369,7 +369,7 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
* subscription, or for a particular relation, or both.
*/
void
-RemoveSubscriptionRel(Oid subid, Oid relid)
+RemoveSubscriptionRel(Oid subid, Oid relid, LOCKMODE lockmode)
{
Relation rel;
HeapScanDesc scan;
@@ -377,7 +377,7 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
HeapTuple tup;
int nkeys = 0;
- rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
+ rel = heap_open(SubscriptionRelRelationId, lockmode);
if (OidIsValid(subid))
{
@@ -405,7 +405,7 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
}
heap_endscan(scan);
- heap_close(rel, RowExclusiveLock);
+ heap_close(rel, lockmode);
}
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 5aae7b6f91..02cb7c47b8 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -595,7 +595,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
{
char *namespace;
- RemoveSubscriptionRel(sub->oid, relid);
+ RemoveSubscriptionRel(sub->oid, relid, RowExclusiveLock);
logicalrep_worker_stop(sub->oid, relid);
@@ -910,7 +910,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
/* Remove any associated relation synchronization states. */
- RemoveSubscriptionRel(subid, InvalidOid);
+ RemoveSubscriptionRel(subid, InvalidOid, ExclusiveLock);
/* Kill the apply worker so that the slot becomes accessible. */
logicalrep_worker_stop(subid, InvalidOid);
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index f5f6191676..b2cadb4b13 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -74,7 +74,7 @@ extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
XLogRecPtr sublsn, bool update_only);
extern char GetSubscriptionRelState(Oid subid, Oid relid,
XLogRecPtr *sublsn, bool missing_ok);
-extern void RemoveSubscriptionRel(Oid subid, Oid relid);
+extern void RemoveSubscriptionRel(Oid subid, Oid relid, LOCKMODE lockmode);
extern List *GetSubscriptionRelations(Oid subid);
extern List *GetSubscriptionNotReadyRelations(Oid subid);
--
2.13.1
On Thu, Jun 15, 2017 at 11:40:52PM -0400, Peter Eisentraut wrote:
On 6/13/17 15:49, Peter Eisentraut wrote:
On 6/13/17 02:33, Noah Misch wrote:
Steps to reproduce -
X cluster -> create 100 tables , publish all tables (create publication pub
for all tables);
Y Cluster -> create 100 tables ,create subscription(create subscription sub
connection 'user=centos host=localhost' publication pub;
Y cluster ->drop subscription - drop subscription sub;check the log file on Y cluster.
Sometime , i have seen this error on psql prompt and drop subscription
operation got failed at first attempt.postgres=# drop subscription sub;
ERROR: tuple concurrently updated
postgres=# drop subscription sub;
NOTICE: dropped replication slot "sub" on publisher
DROP SUBSCRIPTION[Action required within three days. This is a generic notification.]
It's being worked on. Let's see by Thursday.
A patch has been posted, and it's being reviewed. Next update Monday.
This PostgreSQL 10 open item is past due for your status update. Kindly send
a status update within 24 hours, and include a date for your subsequent status
update. Refer to the policy on open item ownership:
/messages/by-id/20170404140717.GA2675809@tornado.leadboat.com
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 6/20/17 19:10, Peter Eisentraut wrote:
On 6/19/17 22:54, Masahiko Sawada wrote:
It seems to me we could just take a stronger lock around
RemoveSubscriptionRel(), so that workers can't write in there concurrently.Since we reduced the lock level of updating pg_subscription_rel by
commit 521fd4795e3e the same deadlock issue will appear if we just
take a stronger lock level.I was thinking about a more refined approach, like in the attached
patch. It just changes the locking when in DropSubscription(), so that
that doesn't fail if workers are doing stuff concurrently. Everything
else stays the same.
The alternative is that we use the LockSharedObject() approach that was
already alluded to, like in the attached patch. Both approaches would
work equally fine AFAICT.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Attachments:
0001-WIP-Add-locking-SetSubscriptionRelState.patchtext/plain; charset=UTF-8; name=0001-WIP-Add-locking-SetSubscriptionRelState.patch; x-mac-creator=0; x-mac-type=0Download
From 10afa9807014e14596cb05d70ba302c86bf30dd3 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <peter_e@gmx.net>
Date: Wed, 21 Jun 2017 22:40:22 -0400
Subject: [PATCH] WIP Add locking SetSubscriptionRelState()
---
src/backend/catalog/pg_subscription.c | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index c69c461b62..f794d4c526 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -28,6 +28,8 @@
#include "nodes/makefuncs.h"
+#include "storage/lmgr.h"
+
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
@@ -246,6 +248,8 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
bool nulls[Natts_pg_subscription_rel];
Datum values[Natts_pg_subscription_rel];
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+
rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
/* Try finding existing mapping. */
--
2.13.1
On 6/19/17 22:41, Masahiko Sawada wrote:
On Tue, Jun 20, 2017 at 10:47 AM, Peter Eisentraut
<peter.eisentraut@2ndquadrant.com> wrote:On 6/16/17 04:16, Masahiko Sawada wrote:
A subscription relation state may have been removed already when we
try to update it. SetSubscriptionRelState didn't emit an error in such
case but with this patch we end up with an error. Since we shouldn't
ignore such error in UpdateSubscriptionRelState I'd say we can let the
user know about that possibility in the error message.So are you saying it's good to have the error message?
Yes. UpdateSubscriptionRelState failure means that the subscription
relation state has disappeared or also means something wrong. So I
think it's good to have it as perhaps errdetail. Thought?
I think this could lead to errors in the tablesync workers if they are
trying to write while the entries have already been deleted as part of
the subscription or the table being deleted.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 6/20/17 22:44, Noah Misch wrote:
A patch has been posted, and it's being reviewed. Next update Monday.
This PostgreSQL 10 open item is past due for your status update. Kindly send
a status update within 24 hours, and include a date for your subsequent status
update. Refer to the policy on open item ownership:
/messages/by-id/20170404140717.GA2675809@tornado.leadboat.com
I'm not sure how to proceed here. Nobody is else seems terribly
interested, and this is really a minor issue, so I don't want to muck
around with the locking endlessly. Let's say, if there are no new
insights by Friday, I'll pick one of the proposed patches or just close
it without any patch.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Hello,
At Wed, 21 Jun 2017 22:43:32 -0400, Peter Eisentraut <peter.eisentraut@2ndquadrant.com> wrote in <501f75c9-c5d6-d023-add0-3b670ac86de2@2ndquadrant.com>
On 6/20/17 19:10, Peter Eisentraut wrote:
On 6/19/17 22:54, Masahiko Sawada wrote:
It seems to me we could just take a stronger lock around
RemoveSubscriptionRel(), so that workers can't write in there concurrently.Since we reduced the lock level of updating pg_subscription_rel by
commit 521fd4795e3e the same deadlock issue will appear if we just
take a stronger lock level.I was thinking about a more refined approach, like in the attached
patch. It just changes the locking when in DropSubscription(), so that
that doesn't fail if workers are doing stuff concurrently. Everything
else stays the same.The alternative is that we use the LockSharedObject() approach that was
already alluded to, like in the attached patch. Both approaches would
work equally fine AFAICT.
However I haven't seen this deeply, just making
SetSubscriptionRelState exlusive seems to have a chance to create
a false record on pg_subscription_rel. Am I misunderstanding?
--
Kyotaro Horiguchi
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Wed, Jun 21, 2017 at 8:10 AM, Peter Eisentraut
<peter.eisentraut@2ndquadrant.com> wrote:
On 6/19/17 22:54, Masahiko Sawada wrote:
It seems to me we could just take a stronger lock around
RemoveSubscriptionRel(), so that workers can't write in there concurrently.Since we reduced the lock level of updating pg_subscription_rel by
commit 521fd4795e3e the same deadlock issue will appear if we just
take a stronger lock level.I was thinking about a more refined approach, like in the attached
patch. It just changes the locking when in DropSubscription(), so that
that doesn't fail if workers are doing stuff concurrently. Everything
else stays the same.
Thank you for the patch. Some comment and question.
DropSubscriptionRelState requests ExclusiveLock but why is it not
ShareRowExclusiveLock?
I test DROP SUBSCIPTION case but even with this patch, "tuple
concurrently updated" is still occurred.
@@ -405,7 +405,7 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
}
heap_endscan(scan);
- heap_close(rel, RowExclusiveLock);
+ heap_close(rel, lockmode);
}
I think we should not release the table lock here, should be
heap_close(rel, NoLock) instead? After changed it so on my
environment, DROP SUBSCRIPTION seems to work fine.
Also, ALTER SUBSCRIPTION SET PUBLICATION seems the same. Even with
Petr's patch, the same error still occurred during ALTER SUBSCRIPTION
SET PUBLICATION. Currently it acquires RowExclusiveLock on
pg_subscription_rel but as long as both DDL and table sync worker
could modify the same record on pg_subscription this error can
happen. On the other hand if we take a strong lock on pg_subscription
during DDL the deadlock risk will be increased.
One solution I came up with is that we use other than
simple_heap_update/delete so that we accept the return value
HeapTupleUpdated of heap_update/delete. It makes pg_subscription_rel
something like heap table rather than system catalog, though.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 6/22/17 03:26, Masahiko Sawada wrote:
Thank you for the patch. Some comment and question.
DropSubscriptionRelState requests ExclusiveLock but why is it not
ShareRowExclusiveLock?
fixed
I test DROP SUBSCIPTION case but even with this patch, "tuple
concurrently updated" is still occurred.@@ -405,7 +405,7 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
}
heap_endscan(scan);- heap_close(rel, RowExclusiveLock); + heap_close(rel, lockmode); }I think we should not release the table lock here, should be
heap_close(rel, NoLock) instead? After changed it so on my
environment, DROP SUBSCRIPTION seems to work fine.
fixed
Also, ALTER SUBSCRIPTION SET PUBLICATION seems the same. Even with
Petr's patch, the same error still occurred during ALTER SUBSCRIPTION
SET PUBLICATION. Currently it acquires RowExclusiveLock on
pg_subscription_rel but as long as both DDL and table sync worker
could modify the same record on pg_subscription this error can
happen.
fixed
On the other hand if we take a strong lock on pg_subscription
during DDL the deadlock risk will be increased.
The original problem was that DROP TABLE locked things in the order 1)
user table, 2) pg_subscription_rel, whereas a full-database VACUUM would
lock things in the opposite order. I think this was a pretty wide
window if you have many tables. In this case, however, we are only
dealing with pg_subscription and pg_subscription_rel, and I think even
VACUUM would always processes them in the same order.
Could please try the attached patch so see if it addresses your test
scenarios?
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Attachments:
v2-0001-Parametrize-locking-in-RemoveSubscriptionRel.patchtext/plain; charset=UTF-8; name=v2-0001-Parametrize-locking-in-RemoveSubscriptionRel.patch; x-mac-creator=0; x-mac-type=0Download
From fc50b9e287e8c3c29be21d82c07c39023cbf3882 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <peter_e@gmx.net>
Date: Fri, 23 Jun 2017 21:26:46 -0400
Subject: [PATCH v2] Parametrize locking in RemoveSubscriptionRel
Revising the change in 521fd4795e3ec3d0b263b62e5eb58e1557be9c86, we need
to take a stronger lock when removing entries from pg_subscription_rel
when ALTER/DROP SUBSCRIPTION. Otherwise, users might see "tuple
concurrently updated" when table sync workers make updates at the same
time as DDL commands are run. For DROP TABLE, we keep the lower lock
level that was put in place by the above commit, so that DROP TABLE does
not deadlock with whole-database VACUUM.
---
src/backend/catalog/heap.c | 2 +-
src/backend/catalog/pg_subscription.c | 6 +++---
src/backend/commands/subscriptioncmds.c | 4 ++--
src/include/catalog/pg_subscription_rel.h | 2 +-
4 files changed, 7 insertions(+), 7 deletions(-)
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index a376b99f1e..8a96bbece6 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -1844,7 +1844,7 @@ heap_drop_with_catalog(Oid relid)
/*
* Remove any associated relation synchronization states.
*/
- RemoveSubscriptionRel(InvalidOid, relid);
+ RemoveSubscriptionRel(InvalidOid, relid, RowExclusiveLock);
/*
* Forget any ON COMMIT action for the rel
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index c69c461b62..1d80191ca8 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -369,7 +369,7 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
* subscription, or for a particular relation, or both.
*/
void
-RemoveSubscriptionRel(Oid subid, Oid relid)
+RemoveSubscriptionRel(Oid subid, Oid relid, LOCKMODE lockmode)
{
Relation rel;
HeapScanDesc scan;
@@ -377,7 +377,7 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
HeapTuple tup;
int nkeys = 0;
- rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
+ rel = heap_open(SubscriptionRelRelationId, lockmode);
if (OidIsValid(subid))
{
@@ -405,7 +405,7 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
}
heap_endscan(scan);
- heap_close(rel, RowExclusiveLock);
+ heap_close(rel, NoLock);
}
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 9cbd36f646..57954c978b 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -595,7 +595,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
{
char *namespace;
- RemoveSubscriptionRel(sub->oid, relid);
+ RemoveSubscriptionRel(sub->oid, relid, ShareRowExclusiveLock);
logicalrep_worker_stop(sub->oid, relid);
@@ -910,7 +910,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
/* Remove any associated relation synchronization states. */
- RemoveSubscriptionRel(subid, InvalidOid);
+ RemoveSubscriptionRel(subid, InvalidOid, ShareRowExclusiveLock);
/* Kill the apply worker so that the slot becomes accessible. */
logicalrep_worker_stop(subid, InvalidOid);
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 991ca9d552..a1f1db08d0 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -74,7 +74,7 @@ extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
XLogRecPtr sublsn, bool update_only);
extern char GetSubscriptionRelState(Oid subid, Oid relid,
XLogRecPtr *sublsn, bool missing_ok);
-extern void RemoveSubscriptionRel(Oid subid, Oid relid);
+extern void RemoveSubscriptionRel(Oid subid, Oid relid, LOCKMODE lockmode);
extern List *GetSubscriptionRelations(Oid subid);
extern List *GetSubscriptionNotReadyRelations(Oid subid);
--
2.13.1
On 6/21/17 22:47, Peter Eisentraut wrote:
On 6/20/17 22:44, Noah Misch wrote:
A patch has been posted, and it's being reviewed. Next update Monday.
This PostgreSQL 10 open item is past due for your status update. Kindly send
a status update within 24 hours, and include a date for your subsequent status
update. Refer to the policy on open item ownership:
/messages/by-id/20170404140717.GA2675809@tornado.leadboat.comI'm not sure how to proceed here. Nobody is else seems terribly
interested, and this is really a minor issue, so I don't want to muck
around with the locking endlessly. Let's say, if there are no new
insights by Friday, I'll pick one of the proposed patches or just close
it without any patch.
After some comments, a new patch has been posted, and I'll give until
Monday for review.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
(was away for a while, got some time now for this again)
On 22/06/17 04:43, Peter Eisentraut wrote:
The alternative is that we use the LockSharedObject() approach that was
already alluded to, like in the attached patch. Both approaches would
work equally fine AFAICT.
I agree, but I think we need bigger overhaul of the locking/management
in general. So here is patch which does much more changes.
The patch does several important things (in no particular order):
- Split SetSubscriptionRelState into AddSubscriptionRelState and
UpdateSubscriptionRelState for the reasons said upstream, it's cleaner,
there is no half-broken upsert logic and it has proper error checking
for each action.
- Do LockSharedObject in ALTER SUBSCRIPTION, DROP SUBSCRIPTION (this one
is preexisting but mentioning it for context), SetSubscriptionRelState,
AddSubscriptionRelState, and in the logicalrep_worker_launch. This means
we use granular per object locks to deal with concurrency.
- Because of above, the AccessExclusiveLock on pg_subscription is no
longer needed, just normal RowExlusiveLock is used now.
- logicalrep_worker_stop is also simplified due to the proper locking
- There is new interface logicalrep_worker_stop_at_commit which is used
by ALTER SUBSCRIPTION ... REFRESH PUBLICATION and by transactional
variant of DROP SUBSCRIPTION to only kill workers at the end of transaction.
- Locking/reading of subscription info is unified between DROP and ALTER
SUBSCRIPTION commands.
- DROP SUBSCRIPTION will kill all workers associated with subscription,
not just apply.
- The sync worker checks during startup if the relation is still subscribed.
- The sync worker will exit when waiting for apply and apply has shut-down.
- The log messages around workers and removed or disabled subscription
are now more consistent between startup and normal runtime of the worker.
- Some code deduplication and stylistic changes/simplification in
related areas.
- Fixed catcache's IndexScanOK() handling of the subscription catalog.
It's bit bigger patch but solves issues from multiple threads around
handling of ALTER/DROP subscription.
A lot of the locking that I added is normally done transparently by
dependency handling, but subscriptions and subscription relation status
do not use that much as it was deemed to bloat pg_depend needlessly
during the original patch review (it's also probably why this has
slipped through).
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Attachments:
0001-Rework-subscription-worker-and-relation-status-handl.patchtext/x-patch; name=0001-Rework-subscription-worker-and-relation-status-handl.patchDownload
From d7038474012769c9c3b50231af76dd7796fe593f Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Sat, 24 Jun 2017 19:38:21 +0200
Subject: [PATCH] Rework subscription worker and relation status handling
---
src/backend/catalog/pg_subscription.c | 137 +++++++------
src/backend/commands/subscriptioncmds.c | 98 +++++-----
src/backend/replication/logical/launcher.c | 293 +++++++++++++++-------------
src/backend/replication/logical/tablesync.c | 97 +++++----
src/backend/replication/logical/worker.c | 23 ++-
src/backend/utils/cache/catcache.c | 6 +-
src/include/catalog/pg_subscription_rel.h | 6 +-
src/include/replication/worker_internal.h | 6 +-
8 files changed, 367 insertions(+), 299 deletions(-)
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index c69c461..b643e54 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -28,6 +28,8 @@
#include "nodes/makefuncs.h"
+#include "storage/lmgr.h"
+
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
@@ -225,84 +227,101 @@ textarray_to_stringlist(ArrayType *textarray)
}
/*
- * Set the state of a subscription table.
- *
- * If update_only is true and the record for given table doesn't exist, do
- * nothing. This can be used to avoid inserting a new record that was deleted
- * by someone else. Generally, subscription DDL commands should use false,
- * workers should use true.
- *
- * The insert-or-update logic in this function is not concurrency safe so it
- * might raise an error in rare circumstances. But if we took a stronger lock
- * such as ShareRowExclusiveLock, we would risk more deadlocks.
+ * Add new state record for a subscription table.
*/
Oid
-SetSubscriptionRelState(Oid subid, Oid relid, char state,
- XLogRecPtr sublsn, bool update_only)
+AddSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn)
{
Relation rel;
HeapTuple tup;
- Oid subrelid = InvalidOid;
+ Oid subrelid;
bool nulls[Natts_pg_subscription_rel];
Datum values[Natts_pg_subscription_rel];
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
/* Try finding existing mapping. */
tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
ObjectIdGetDatum(relid),
ObjectIdGetDatum(subid));
+ if (HeapTupleIsValid(tup))
+ elog(ERROR, "subscription table %u in subscription %u already exists",
+ relid, subid);
- /*
- * If the record for given table does not exist yet create new record,
- * otherwise update the existing one.
- */
- if (!HeapTupleIsValid(tup) && !update_only)
- {
- /* Form the tuple. */
- memset(values, 0, sizeof(values));
- memset(nulls, false, sizeof(nulls));
- values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
- values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
- values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
- if (sublsn != InvalidXLogRecPtr)
- values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
- else
- nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
-
- tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
-
- /* Insert tuple into catalog. */
- subrelid = CatalogTupleInsert(rel, tup);
-
- heap_freetuple(tup);
- }
- else if (HeapTupleIsValid(tup))
- {
- bool replaces[Natts_pg_subscription_rel];
+ /* Form the tuple. */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
+ values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
+ values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+ if (sublsn != InvalidXLogRecPtr)
+ values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+ else
+ nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
- /* Update the tuple. */
- memset(values, 0, sizeof(values));
- memset(nulls, false, sizeof(nulls));
- memset(replaces, false, sizeof(replaces));
+ tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
- replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
- values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+ /* Insert tuple into catalog. */
+ subrelid = CatalogTupleInsert(rel, tup);
- replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
- if (sublsn != InvalidXLogRecPtr)
- values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
- else
- nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+ heap_freetuple(tup);
- tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
- replaces);
+ /* Cleanup. */
+ heap_close(rel, NoLock);
- /* Update the catalog. */
- CatalogTupleUpdate(rel, &tup->t_self, tup);
+ return subrelid;
+}
- subrelid = HeapTupleGetOid(tup);
- }
+/*
+ * Update the state of a subscription table.
+ */
+Oid
+UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn)
+{
+ Relation rel;
+ HeapTuple tup;
+ Oid subrelid;
+ bool nulls[Natts_pg_subscription_rel];
+ Datum values[Natts_pg_subscription_rel];
+ bool replaces[Natts_pg_subscription_rel];
+
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+ rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+ /* Try finding existing mapping. */
+ tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
+ ObjectIdGetDatum(relid),
+ ObjectIdGetDatum(subid));
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "subscription table %u in subscription %u does not exist",
+ relid, subid);
+
+ /* Update the tuple. */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replaces, false, sizeof(replaces));
+
+ replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
+ values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+
+ replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
+ if (sublsn != InvalidXLogRecPtr)
+ values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+ else
+ nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+
+ tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+ replaces);
+
+ /* Update the catalog. */
+ CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+ subrelid = HeapTupleGetOid(tup);
/* Cleanup. */
heap_close(rel, NoLock);
@@ -377,6 +396,8 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
HeapTuple tup;
int nkeys = 0;
+ Assert(OidIsValid(subid) || OidIsValid(relid));
+
rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
if (OidIsValid(subid))
@@ -400,9 +421,7 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
/* Do the search and delete what we found. */
scan = heap_beginscan_catalog(rel, nkeys, skey);
while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
- {
CatalogTupleDelete(rel, &tup->t_self);
- }
heap_endscan(scan);
heap_close(rel, RowExclusiveLock);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 9cbd36f..3dc1f4c 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -450,8 +450,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
CheckSubscriptionRelkind(get_rel_relkind(relid),
rv->schemaname, rv->relname);
- SetSubscriptionRelState(subid, relid, table_state,
- InvalidXLogRecPtr, false);
+ AddSubscriptionRelState(subid, relid, table_state,
+ InvalidXLogRecPtr);
}
/*
@@ -569,9 +569,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
if (!bsearch(&relid, subrel_local_oids,
list_length(subrel_states), sizeof(Oid), oid_cmp))
{
- SetSubscriptionRelState(sub->oid, relid,
- copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
- InvalidXLogRecPtr, false);
+ AddSubscriptionRelState(sub->oid, relid,
+ copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+ InvalidXLogRecPtr);
ereport(NOTICE,
(errmsg("added subscription for table %s.%s",
quote_identifier(rv->schemaname),
@@ -597,7 +597,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
RemoveSubscriptionRel(sub->oid, relid);
- logicalrep_worker_stop(sub->oid, relid);
+ logicalrep_worker_stop_at_commit(sub->oid, relid);
namespace = get_namespace_name(get_rel_namespace(relid));
ereport(NOTICE,
@@ -636,14 +636,18 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
errmsg("subscription \"%s\" does not exist",
stmt->subname)));
+ subid = HeapTupleGetOid(tup);
+
/* must be owner */
- if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
+ if (!pg_subscription_ownercheck(subid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
stmt->subname);
- subid = HeapTupleGetOid(tup);
sub = GetSubscription(subid, false);
+ /* Lock the subscription so nobody else can do anything with it. */
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+
/* Form a new tuple. */
memset(values, 0, sizeof(values));
memset(nulls, false, sizeof(nulls));
@@ -811,14 +815,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
ObjectAddress myself;
HeapTuple tup;
Oid subid;
- Datum datum;
- bool isnull;
- char *subname;
- char *conninfo;
- char *slotname;
+ List *subworkers;
+ ListCell *lc;
char originname[NAMEDATALEN];
- char *err = NULL;
RepOriginId originid;
+ char *err = NULL;
+ Subscription *sub;
WalReceiverConn *wrconn = NULL;
StringInfoData cmd;
@@ -826,7 +828,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* Lock pg_subscription with AccessExclusiveLock to ensure that the
* launcher doesn't restart new worker during dropping the subscription
*/
- rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);
+ rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
CStringGetDatum(stmt->subname));
@@ -858,31 +860,10 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
/* DROP hook for the subscription being removed */
InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
- /*
- * Lock the subscription so nobody else can do anything with it (including
- * the replication workers).
- */
- LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+ sub = GetSubscription(subid, false);
- /* Get subname */
- datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
- Anum_pg_subscription_subname, &isnull);
- Assert(!isnull);
- subname = pstrdup(NameStr(*DatumGetName(datum)));
-
- /* Get conninfo */
- datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
- Anum_pg_subscription_subconninfo, &isnull);
- Assert(!isnull);
- conninfo = TextDatumGetCString(datum);
-
- /* Get slotname */
- datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
- Anum_pg_subscription_subslotname, &isnull);
- if (!isnull)
- slotname = pstrdup(NameStr(*DatumGetName(datum)));
- else
- slotname = NULL;
+ /* Lock the subscription so nobody else can do anything with it. */
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
/*
* Since dropping a replication slot is not transactional, the replication
@@ -894,7 +875,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* of a subscription that is associated with a replication slot", but we
* don't have the proper facilities for that.
*/
- if (slotname)
+ if (sub->slotname)
PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
@@ -906,15 +887,33 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
ReleaseSysCache(tup);
+ /*
+ * If we are dropping slot, stop all the subscription workers immediately
+ * so that the slot is accessible, otherwise just shedule the stop at the
+ * end of the transaction.
+ *
+ * New workers won't be started because we hold exclusive lock on the
+ * subscription till the end of transaction.
+ */
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ subworkers = logicalrep_sub_workers_find(subid, false);
+ LWLockRelease(LogicalRepWorkerLock);
+ foreach (lc, subworkers)
+ {
+ LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+ if (sub->slotname)
+ logicalrep_worker_stop(w->subid, w->relid);
+ else
+ logicalrep_worker_stop_at_commit(w->subid, w->relid);
+ }
+ list_free(subworkers);
+
/* Clean up dependencies */
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
/* Remove any associated relation synchronization states. */
RemoveSubscriptionRel(subid, InvalidOid);
- /* Kill the apply worker so that the slot becomes accessible. */
- logicalrep_worker_stop(subid, InvalidOid);
-
/* Remove the origin tracking if exists. */
snprintf(originname, sizeof(originname), "pg_%u", subid);
originid = replorigin_by_name(originname, true);
@@ -925,7 +924,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* If there is no slot associated with the subscription, we can finish
* here.
*/
- if (!slotname)
+ if (!sub->slotname)
{
heap_close(rel, NoLock);
return;
@@ -938,13 +937,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
load_file("libpqwalreceiver", false);
initStringInfo(&cmd);
- appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", quote_identifier(slotname));
+ appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s",
+ quote_identifier(sub->slotname));
- wrconn = walrcv_connect(conninfo, true, subname, &err);
+ wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
if (wrconn == NULL)
ereport(ERROR,
(errmsg("could not connect to publisher when attempting to "
- "drop the replication slot \"%s\"", slotname),
+ "drop the replication slot \"%s\"", sub->slotname),
errdetail("The error was: %s", err),
errhint("Use ALTER SUBSCRIPTION ... SET (slot_name = NONE) "
"to disassociate the subscription from the slot.")));
@@ -958,12 +958,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR,
(errmsg("could not drop the replication slot \"%s\" on publisher",
- slotname),
+ sub->slotname),
errdetail("The error was: %s", res->err)));
else
ereport(NOTICE,
(errmsg("dropped replication slot \"%s\" on publisher",
- slotname)));
+ sub->slotname)));
walrcv_clear_result(res);
}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 86a2b14..410ad18 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -42,12 +42,14 @@
#include "replication/worker_internal.h"
#include "storage/ipc.h"
+#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
+#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
@@ -73,6 +75,14 @@ typedef struct LogicalRepCtxStruct
LogicalRepCtxStruct *LogicalRepCtx;
+typedef struct LogicalRepWorkerId
+{
+ Oid subid;
+ Oid relid;
+} LogicalRepWorkerId;
+
+static List *on_commit_stop_workers = NIL;
+
static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
@@ -86,12 +96,11 @@ static bool on_commit_launcher_wakeup = false;
Datum pg_stat_get_subscription(PG_FUNCTION_ARGS);
-
/*
* Load the list of subscriptions.
*
- * Only the fields interesting for worker start/stop functions are filled for
- * each subscription.
+ * Only the fields interesting for worker start are filled for each
+ * subscription.
*/
static List *
get_subscription_list(void)
@@ -100,19 +109,13 @@ get_subscription_list(void)
Relation rel;
HeapScanDesc scan;
HeapTuple tup;
- MemoryContext resultcxt;
-
- /* This is the context that we will allocate our output data in */
- resultcxt = CurrentMemoryContext;
/*
- * Start a transaction so we can access pg_database, and get a snapshot.
* We don't have a use for the snapshot itself, but we're interested in
* the secondary effect that it sets RecentGlobalXmin. (This is critical
* for anything that reads heap pages, because HOT may decide to prune
* them even if the process doesn't attempt to modify any tuples.)
*/
- StartTransactionCommand();
(void) GetTransactionSnapshot();
rel = heap_open(SubscriptionRelationId, AccessShareLock);
@@ -121,34 +124,17 @@ get_subscription_list(void)
while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
{
Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
- Subscription *sub;
- MemoryContext oldcxt;
- /*
- * Allocate our results in the caller's context, not the
- * transaction's. We do this inside the loop, and restore the original
- * context at the end, so that leaky things like heap_getnext() are
- * not called in a potentially long-lived context.
- */
- oldcxt = MemoryContextSwitchTo(resultcxt);
-
- sub = (Subscription *) palloc0(sizeof(Subscription));
- sub->oid = HeapTupleGetOid(tup);
- sub->dbid = subform->subdbid;
- sub->owner = subform->subowner;
- sub->enabled = subform->subenabled;
- sub->name = pstrdup(NameStr(subform->subname));
- /* We don't fill fields we are not interested in. */
-
- res = lappend(res, sub);
- MemoryContextSwitchTo(oldcxt);
+ /* We only care about enabled subscriptions. */
+ if (!subform->subenabled)
+ continue;
+
+ res = lappend_oid(res, HeapTupleGetOid(tup));
}
heap_endscan(scan);
heap_close(rel, AccessShareLock);
- CommitTransactionCommand();
-
return res;
}
@@ -250,23 +236,68 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
}
/*
- * Start new apply background worker.
+ * Similar as logicalrep_worker_find(), but returns list of all workers
+ * for the subscription instead just one.
+ */
+List *
+logicalrep_sub_workers_find(Oid subid, bool only_running)
+{
+ int i;
+ List *res = NIL;
+
+ Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
+ /* Search for attached worker for a given subscription id. */
+ for (i = 0; i < max_logical_replication_workers; i++)
+ {
+ LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+ if (w->in_use && w->subid == subid && (!only_running || w->proc))
+ res = lappend(res, w);
+ }
+
+ return res;
+}
+
+/*
+ * Start new logical replication background worker.
*/
void
-logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
- Oid relid)
+logicalrep_worker_launch(Oid subid, Oid relid)
{
BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
int i;
int slot = 0;
- LogicalRepWorker *worker = NULL;
- int nsyncworkers;
+ List *subworkers;
+ ListCell *lc;
TimestampTz now;
+ int nsyncworkers = 0;
+ Subscription *sub;
+ LogicalRepWorker *worker = NULL;
ereport(DEBUG1,
- (errmsg("starting logical replication worker for subscription \"%s\"",
- subname)));
+ (errmsg("starting logical replication worker for subscription %u",
+ subid)));
+
+ /* Block any concurrent DDL on the subscription. */
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+ /*
+ * Subscription might have been dropped in meantime, make sure our cache
+ * is up to date.
+ */
+ AcceptInvalidationMessages();
+
+ /* Get info about subscription. */
+ sub = GetSubscription(subid, true);
+ if (!sub)
+ {
+ ereport(DEBUG1,
+ (errmsg("subscription %u not found, not starting worker for it",
+ subid)));
+ return;
+ }
/* Report this after the initial starting message for consistency. */
if (max_replication_slots == 0)
@@ -294,7 +325,14 @@ retry:
}
}
- nsyncworkers = logicalrep_sync_worker_count(subid);
+ subworkers = logicalrep_sub_workers_find(subid, false);
+ foreach (lc, subworkers)
+ {
+ LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+ if (w->relid != InvalidOid)
+ nsyncworkers ++;
+ }
+ list_free(subworkers);
now = GetCurrentTimestamp();
@@ -340,6 +378,7 @@ retry:
if (nsyncworkers >= max_sync_workers_per_subscription)
{
LWLockRelease(LogicalRepWorkerLock);
+ UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
return;
}
@@ -350,6 +389,7 @@ retry:
if (worker == NULL)
{
LWLockRelease(LogicalRepWorkerLock);
+ UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
ereport(WARNING,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of logical replication worker slots"),
@@ -362,8 +402,8 @@ retry:
worker->in_use = true;
worker->generation++;
worker->proc = NULL;
- worker->dbid = dbid;
- worker->userid = userid;
+ worker->dbid = sub->dbid;
+ worker->userid = sub->owner;
worker->subid = subid;
worker->relid = relid;
worker->relstate = SUBREL_STATE_UNKNOWN;
@@ -374,8 +414,6 @@ retry:
worker->reply_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->reply_time);
- LWLockRelease(LogicalRepWorkerLock);
-
/* Register the new dynamic worker. */
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
@@ -394,8 +432,13 @@ retry:
bgw.bgw_notify_pid = MyProcPid;
bgw.bgw_main_arg = Int32GetDatum(slot);
+ /* Try to register the worker and cleanup in case of failure. */
if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
{
+ logicalrep_worker_cleanup(worker);
+ LWLockRelease(LogicalRepWorkerLock);
+ UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
ereport(WARNING,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of background worker slots"),
@@ -403,13 +446,24 @@ retry:
return;
}
+ /* Done with the worker array. */
+ LWLockRelease(LogicalRepWorkerLock);
+
/* Now wait until it attaches. */
WaitForReplicationWorkerAttach(worker, bgw_handle);
+
+ /*
+ * Worker either started or died, in any case we are done with the
+ * subscription.
+ */
+ UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
}
/*
* Stop the logical replication worker and wait until it detaches from the
* slot.
+ *
+ * Callers of this function better have exclusive lock on the subscription.
*/
void
logicalrep_worker_stop(Oid subid, Oid relid)
@@ -417,7 +471,8 @@ logicalrep_worker_stop(Oid subid, Oid relid)
LogicalRepWorker *worker;
uint16 generation;
- LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ /* Exclusive is needed for logicalrep_worker_cleanup(). */
+ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
worker = logicalrep_worker_find(subid, relid, false);
@@ -428,56 +483,16 @@ logicalrep_worker_stop(Oid subid, Oid relid)
return;
}
+ /* If there is worker but it's not running, clean it up. */
+ if (!worker->proc)
+ logicalrep_worker_cleanup(worker);
+
/*
* Remember which generation was our worker so we can check if what we see
* is still the same one.
*/
generation = worker->generation;
- /*
- * If we found worker but it does not have proc set it is starting up,
- * wait for it to finish and then kill it.
- */
- while (worker->in_use && !worker->proc)
- {
- int rc;
-
- LWLockRelease(LogicalRepWorkerLock);
-
- /* Wait for signal. */
- rc = WaitLatch(MyLatch,
- WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
- 1000L, WAIT_EVENT_BGWORKER_STARTUP);
-
- /* emergency bailout if postmaster has died */
- if (rc & WL_POSTMASTER_DEATH)
- proc_exit(1);
-
- if (rc & WL_LATCH_SET)
- {
- ResetLatch(MyLatch);
- CHECK_FOR_INTERRUPTS();
- }
-
- /* Check worker status. */
- LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-
- /*
- * Check whether the worker slot is no longer used, which would mean
- * that the worker has exited, or whether the worker generation is
- * different, meaning that a different worker has taken the slot.
- */
- if (!worker->in_use || worker->generation != generation)
- {
- LWLockRelease(LogicalRepWorkerLock);
- return;
- }
-
- /* Worker has assigned proc, so it has started. */
- if (worker->proc)
- break;
- }
-
/* Now terminate the worker ... */
kill(worker->proc->pid, SIGTERM);
LWLockRelease(LogicalRepWorkerLock);
@@ -497,7 +512,10 @@ logicalrep_worker_stop(Oid subid, Oid relid)
CHECK_FOR_INTERRUPTS();
- /* Wait for more work. */
+ /*
+ * We need timeout because we generally don't get notified via latch
+ * about the worker attach.
+ */
rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
1000L, WAIT_EVENT_BGWORKER_SHUTDOWN);
@@ -515,6 +533,22 @@ logicalrep_worker_stop(Oid subid, Oid relid)
}
/*
+ * Request worker to be stopped on commit.
+ */
+void
+logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
+{
+ LogicalRepWorkerId *wid;
+ wid = MemoryContextAlloc(TopTransactionContext,
+ sizeof(LogicalRepWorkerId));
+ wid->subid = subid;
+ wid->relid = relid;
+
+ on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+}
+
+
+/*
* Wake up (using latch) the logical replication worker.
*/
void
@@ -648,30 +682,6 @@ logicalrep_launcher_sighup(SIGNAL_ARGS)
}
/*
- * Count the number of registered (not necessarily running) sync workers
- * for a subscription.
- */
-int
-logicalrep_sync_worker_count(Oid subid)
-{
- int i;
- int res = 0;
-
- Assert(LWLockHeldByMe(LogicalRepWorkerLock));
-
- /* Search for attached worker for a given subscription id. */
- for (i = 0; i < max_logical_replication_workers; i++)
- {
- LogicalRepWorker *w = &LogicalRepCtx->workers[i];
-
- if (w->subid == subid && OidIsValid(w->relid))
- res++;
- }
-
- return res;
-}
-
-/*
* ApplyLauncherShmemSize
* Compute space needed for replication launcher shared memory
*/
@@ -754,9 +764,25 @@ ApplyLauncherShmemInit(void)
void
AtEOXact_ApplyLauncher(bool isCommit)
{
- if (isCommit && on_commit_launcher_wakeup)
- ApplyLauncherWakeup();
+ ListCell *lc;
+
+ if (isCommit)
+ {
+ foreach (lc, on_commit_stop_workers)
+ {
+ LogicalRepWorkerId *wid = lfirst(lc);
+ logicalrep_worker_stop(wid->subid, wid->relid);
+ }
+ if (on_commit_launcher_wakeup)
+ ApplyLauncherWakeup();
+ }
+
+ /*
+ * No need to pfree on_commit_stop_workers, it's been allocated in
+ * transaction memory context which is going to be cleaned soon.
+ */
+ on_commit_stop_workers = NIL;
on_commit_launcher_wakeup = false;
}
@@ -814,8 +840,6 @@ ApplyLauncherMain(Datum main_arg)
int rc;
List *sublist;
ListCell *lc;
- MemoryContext subctx;
- MemoryContext oldctx;
TimestampTz now;
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
@@ -827,41 +851,38 @@ ApplyLauncherMain(Datum main_arg)
if (TimestampDifferenceExceeds(last_start_time, now,
wal_retrieve_retry_interval))
{
- /* Use temporary context for the database list and worker info. */
- subctx = AllocSetContextCreate(TopMemoryContext,
- "Logical Replication Launcher sublist",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
- oldctx = MemoryContextSwitchTo(subctx);
-
- /* search for subscriptions to start or stop. */
+ /*
+ * Start new transaction so that we can take locks and snapshots.
+ *
+ * Any allocations will also be made inside the transaction memory
+ * context.
+ */
+ StartTransactionCommand();
+
+ /* Search for subscriptions to start. */
sublist = get_subscription_list();
- /* Start the missing workers for enabled subscriptions. */
+ /* Start the missing workers. */
foreach(lc, sublist)
{
- Subscription *sub = (Subscription *) lfirst(lc);
+ Oid subid = lfirst_oid(lc);
LogicalRepWorker *w;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+ w = logicalrep_worker_find(subid, InvalidOid, false);
LWLockRelease(LogicalRepWorkerLock);
- if (sub->enabled && w == NULL)
+ if (w == NULL)
{
last_start_time = now;
wait_time = wal_retrieve_retry_interval;
- logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
- sub->owner, InvalidOid);
+ /* Start the worker. */
+ logicalrep_worker_launch(subid, InvalidOid);
}
}
- /* Switch back to original memory context. */
- MemoryContextSwitchTo(oldctx);
- /* Clean the temporary memory. */
- MemoryContextDelete(subctx);
+ CommitTransactionCommand();
}
else
{
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 3ef12df..11f4977 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -215,7 +215,7 @@ wait_for_relation_state_change(Oid relid, char expected_state)
* Returns false if the apply worker has disappeared or the table state has been
* reset.
*/
-static bool
+static void
wait_for_worker_state_change(char expected_state)
{
int rc;
@@ -232,10 +232,13 @@ wait_for_worker_state_change(char expected_state)
InvalidOid, false);
LWLockRelease(LogicalRepWorkerLock);
if (!worker)
- return false;
+ ereport(FATAL,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("terminating logical replication synchronization "
+ "worker due to subscription apply worker exit")));
if (MyLogicalRepWorker->relstate == expected_state)
- return true;
+ return;
rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
@@ -247,8 +250,6 @@ wait_for_worker_state_change(char expected_state)
ResetLatch(MyLatch);
}
-
- return false;
}
/*
@@ -285,11 +286,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
SpinLockRelease(&MyLogicalRepWorker->relmutex);
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn,
- true);
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ MyLogicalRepWorker->relstate,
+ MyLogicalRepWorker->relstate_lsn);
walrcv_endstreaming(wrconn, &tli);
finish_sync_worker();
@@ -332,6 +332,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
ListCell *lc;
bool started_tx = false;
+#define ensure_transaction() \
+ if (!started_tx) \
+ {\
+ StartTransactionCommand(); \
+ started_tx = true; \
+ }
+
Assert(!IsTransactionState());
/* We need up-to-date sync state info for subscription tables here. */
@@ -346,8 +353,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
list_free_deep(table_states);
table_states = NIL;
- StartTransactionCommand();
- started_tx = true;
+ ensure_transaction();
/* Fetch all non-ready tables. */
rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
@@ -409,14 +415,11 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
{
rstate->state = SUBREL_STATE_READY;
rstate->lsn = current_lsn;
- if (!started_tx)
- {
- StartTransactionCommand();
- started_tx = true;
- }
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- rstate->relid, rstate->state,
- rstate->lsn, true);
+
+ ensure_transaction();
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ rstate->relid, rstate->state,
+ rstate->lsn);
}
}
else
@@ -435,13 +438,26 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
SpinLockRelease(&syncworker->relmutex);
}
else
+ {
+ List *subworkers;
+ ListCell *lc;
/*
* If there is no sync worker for this table yet, count
* running sync workers for this subscription, while we have
* the lock, for later.
*/
- nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+ subworkers =
+ logicalrep_sub_workers_find(MyLogicalRepWorker->subid,
+ false);
+ foreach (lc, subworkers)
+ {
+ LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+ if (w->relid != InvalidOid)
+ nsyncworkers ++;
+ }
+ list_free(subworkers);
+ }
LWLockRelease(LogicalRepWorkerLock);
/*
@@ -467,11 +483,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* Enter busy loop and wait for synchronization worker to
* reach expected state (or die trying).
*/
- if (!started_tx)
- {
- StartTransactionCommand();
- started_tx = true;
- }
+ ensure_transaction();
wait_for_relation_state_change(rstate->relid,
SUBREL_STATE_SYNCDONE);
}
@@ -493,10 +505,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
TimestampDifferenceExceeds(hentry->last_start_time, now,
wal_retrieve_retry_interval))
{
- logicalrep_worker_launch(MyLogicalRepWorker->dbid,
- MySubscription->oid,
- MySubscription->name,
- MyLogicalRepWorker->userid,
+ ensure_transaction();
+ logicalrep_worker_launch(MySubscription->oid,
rstate->relid);
hentry->last_start_time = now;
}
@@ -798,6 +808,15 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
&relstate_lsn, true);
+ if (relstate == SUBREL_STATE_UNKNOWN)
+ {
+ ereport(LOG,
+ (errmsg("logical replication table synchronization worker for subscription \"%s\", "
+ "table \"%s\" will stop because the table is no longer subscribed",
+ MySubscription->name,
+ get_rel_name(MyLogicalRepWorker->relid))));
+ proc_exit(0);
+ }
CommitTransactionCommand();
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
@@ -844,11 +863,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
/* Update the state and make it visible to others. */
StartTransactionCommand();
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn,
- true);
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ MyLogicalRepWorker->relstate,
+ MyLogicalRepWorker->relstate_lsn);
CommitTransactionCommand();
pgstat_report_stat(false);
@@ -933,11 +951,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* Update the new state in catalog. No need to bother
* with the shmem state as we are exiting for good.
*/
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- SUBREL_STATE_SYNCDONE,
- *origin_startpos,
- true);
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ SUBREL_STATE_SYNCDONE,
+ *origin_startpos);
finish_sync_worker();
}
break;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 898c497..085dd8c 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1516,24 +1516,31 @@ ApplyWorkerMain(Datum main_arg)
ALLOCSET_DEFAULT_SIZES);
StartTransactionCommand();
oldctx = MemoryContextSwitchTo(ApplyContext);
- MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
+ MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
+ if (!MySubscription)
+ {
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription %u will "
+ "stop because the subscription was removed",
+ MyLogicalRepWorker->subid)));
+ proc_exit(0);
+ }
MySubscriptionValid = true;
MemoryContextSwitchTo(oldctx);
- /* Setup synchronous commit according to the user's wishes */
- SetConfigOption("synchronous_commit", MySubscription->synccommit,
- PGC_BACKEND, PGC_S_OVERRIDE);
-
if (!MySubscription->enabled)
{
ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will not "
- "start because the subscription was disabled during startup",
+ (errmsg("logical replication apply worker for subscription \"%s\" will "
+ "stop because the subscription was disabled",
MySubscription->name)));
-
proc_exit(0);
}
+ /* Setup synchronous commit according to the user's wishes */
+ SetConfigOption("synchronous_commit", MySubscription->synccommit,
+ PGC_BACKEND, PGC_S_OVERRIDE);
+
/* Keep us informed about subscription changes. */
CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
subscription_change_cb,
diff --git a/src/backend/utils/cache/catcache.c b/src/backend/utils/cache/catcache.c
index e7e8e3b..639b4eb 100644
--- a/src/backend/utils/cache/catcache.c
+++ b/src/backend/utils/cache/catcache.c
@@ -1052,10 +1052,12 @@ IndexScanOK(CatCache *cache, ScanKey cur_skey)
case AUTHNAME:
case AUTHOID:
case AUTHMEMMEMROLE:
+ case SUBSCRIPTIONOID:
+ case SUBSCRIPTIONNAME:
/*
- * Protect authentication lookups occurring before relcache has
- * collected entries for shared indexes.
+ * Protect authentication and subscription lookups occurring
+ * before relcache has collected entries for shared indexes.
*/
if (!criticalSharedRelcachesBuilt)
return false;
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 991ca9d..c5b0b9c 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -70,8 +70,10 @@ typedef struct SubscriptionRelState
char state;
} SubscriptionRelState;
-extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
- XLogRecPtr sublsn, bool update_only);
+extern Oid AddSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn);
+extern Oid UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn);
extern char GetSubscriptionRelState(Oid subid, Oid relid,
XLogRecPtr *sublsn, bool missing_ok);
extern void RemoveSubscriptionRel(Oid subid, Oid relid);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 494a3a3..add7841 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -71,13 +71,13 @@ extern bool in_remote_transaction;
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);
-extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
- Oid userid, Oid relid);
+extern void logicalrep_worker_launch(Oid subid, Oid relid);
extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
-extern int logicalrep_sync_worker_count(Oid subid);
+extern List *logicalrep_sub_workers_find(Oid subid, bool only_running);
extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
void process_syncing_tables(XLogRecPtr current_lsn);
--
2.7.4
On Sun, Jun 25, 2017 at 7:35 PM, Petr Jelinek
<petr.jelinek@2ndquadrant.com> wrote:
(was away for a while, got some time now for this again)
On 22/06/17 04:43, Peter Eisentraut wrote:
The alternative is that we use the LockSharedObject() approach that was
already alluded to, like in the attached patch. Both approaches would
work equally fine AFAICT.I agree, but I think we need bigger overhaul of the locking/management
in general. So here is patch which does much more changes.The patch does several important things (in no particular order):
- Split SetSubscriptionRelState into AddSubscriptionRelState and
UpdateSubscriptionRelState for the reasons said upstream, it's cleaner,
there is no half-broken upsert logic and it has proper error checking
for each action.- Do LockSharedObject in ALTER SUBSCRIPTION, DROP SUBSCRIPTION (this one
is preexisting but mentioning it for context), SetSubscriptionRelState,
AddSubscriptionRelState, and in the logicalrep_worker_launch. This means
we use granular per object locks to deal with concurrency.- Because of above, the AccessExclusiveLock on pg_subscription is no
longer needed, just normal RowExlusiveLock is used now.- logicalrep_worker_stop is also simplified due to the proper locking
- There is new interface logicalrep_worker_stop_at_commit which is used
by ALTER SUBSCRIPTION ... REFRESH PUBLICATION and by transactional
variant of DROP SUBSCRIPTION to only kill workers at the end of transaction.- Locking/reading of subscription info is unified between DROP and ALTER
SUBSCRIPTION commands.- DROP SUBSCRIPTION will kill all workers associated with subscription,
not just apply.- The sync worker checks during startup if the relation is still subscribed.
- The sync worker will exit when waiting for apply and apply has shut-down.
- The log messages around workers and removed or disabled subscription
are now more consistent between startup and normal runtime of the worker.- Some code deduplication and stylistic changes/simplification in
related areas.- Fixed catcache's IndexScanOK() handling of the subscription catalog.
It's bit bigger patch but solves issues from multiple threads around
handling of ALTER/DROP subscription.A lot of the locking that I added is normally done transparently by
dependency handling, but subscriptions and subscription relation status
do not use that much as it was deemed to bloat pg_depend needlessly
during the original patch review (it's also probably why this has
slipped through).
Thank you for reworking on this! I'll review this patch on Tuesday.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Mon, Jun 26, 2017 at 12:12 PM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Sun, Jun 25, 2017 at 7:35 PM, Petr Jelinek
<petr.jelinek@2ndquadrant.com> wrote:(was away for a while, got some time now for this again)
On 22/06/17 04:43, Peter Eisentraut wrote:
The alternative is that we use the LockSharedObject() approach that was
already alluded to, like in the attached patch. Both approaches would
work equally fine AFAICT.I agree, but I think we need bigger overhaul of the locking/management
in general. So here is patch which does much more changes.The patch does several important things (in no particular order):
- Split SetSubscriptionRelState into AddSubscriptionRelState and
UpdateSubscriptionRelState for the reasons said upstream, it's cleaner,
there is no half-broken upsert logic and it has proper error checking
for each action.- Do LockSharedObject in ALTER SUBSCRIPTION, DROP SUBSCRIPTION (this one
is preexisting but mentioning it for context), SetSubscriptionRelState,
AddSubscriptionRelState, and in the logicalrep_worker_launch. This means
we use granular per object locks to deal with concurrency.- Because of above, the AccessExclusiveLock on pg_subscription is no
longer needed, just normal RowExlusiveLock is used now.- logicalrep_worker_stop is also simplified due to the proper locking
- There is new interface logicalrep_worker_stop_at_commit which is used
by ALTER SUBSCRIPTION ... REFRESH PUBLICATION and by transactional
variant of DROP SUBSCRIPTION to only kill workers at the end of transaction.- Locking/reading of subscription info is unified between DROP and ALTER
SUBSCRIPTION commands.- DROP SUBSCRIPTION will kill all workers associated with subscription,
not just apply.- The sync worker checks during startup if the relation is still subscribed.
- The sync worker will exit when waiting for apply and apply has shut-down.
- The log messages around workers and removed or disabled subscription
are now more consistent between startup and normal runtime of the worker.- Some code deduplication and stylistic changes/simplification in
related areas.- Fixed catcache's IndexScanOK() handling of the subscription catalog.
It's bit bigger patch but solves issues from multiple threads around
handling of ALTER/DROP subscription.A lot of the locking that I added is normally done transparently by
dependency handling, but subscriptions and subscription relation status
do not use that much as it was deemed to bloat pg_depend needlessly
during the original patch review (it's also probably why this has
slipped through).
I've reviewed this patch briefly.
@@ -515,6 +533,31 @@ logicalrep_worker_stop(Oid subid, Oid relid)
}
/*
+ * Request worker to be stopped on commit.
+ */
+void
+logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
+{
+ LogicalRepWorkerId *wid;
+ MemoryContext old;
+
+ old = MemoryContextSwitchTo(TopTransactionContext);
+
+ wid = (LogicalRepWorkerId *) palloc(sizeof(LogicalRepWorkerId));
+
+ /*
+ wid = MemoryContextAlloc(TopTransactionContext,
+
sizeof(LogicalRepWorkerId));
+ */
+ wid->subid = subid;
+ wid->relid = relid;
+
+ on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+
+ MemoryContextSwitchTo(old);
+}
logicalrep_worker_stop_at_commit() has a problem that new_list()
called by lappend() allocates the memory from current memory context.
It should switch the memory context and then allocate the memory for
wid and append it to the list.
--------
@@ -754,9 +773,25 @@ ApplyLauncherShmemInit(void)
void
AtEOXact_ApplyLauncher(bool isCommit)
{
- if (isCommit && on_commit_launcher_wakeup)
- ApplyLauncherWakeup();
+ ListCell *lc;
+
+ if (isCommit)
+ {
+ foreach (lc, on_commit_stop_workers)
+ {
+ LogicalRepWorkerId *wid = lfirst(lc);
+ logicalrep_worker_stop(wid->subid, wid->relid);
+ }
+
+ if (on_commit_launcher_wakeup)
+ ApplyLauncherWakeup();
Stopping the logical rep worker in AtEOXact_ApplyLauncher doesn't
support the prepared transaction. Since we allocate the list
on_commit_stop_workers in TopTransactionContext the postgres crashes
if we execute any query after prepared transaction that removes
subscription relation state. Also after fixed this issue, we still
need to something: the list of on_commit_stop_workers is not
associated the prepared transaction. A query next to "preapre
transaction" tries to stop workers at the commit. There was similar
discussion before.
--------
+
+ ensure_transaction();
+
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+
rstate->relid, rstate->state,
+
rstate->lsn);
Should we commit the transaction if we started a new transaction
before update the subscription relation state, or it could be deadlock
risk.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 27/06/17 10:51, Masahiko Sawada wrote:
On Mon, Jun 26, 2017 at 12:12 PM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
I've reviewed this patch briefly.
Thanks!
@@ -515,6 +533,31 @@ logicalrep_worker_stop(Oid subid, Oid relid)
}/* + * Request worker to be stopped on commit. + */ +void +logicalrep_worker_stop_at_commit(Oid subid, Oid relid) +{ + LogicalRepWorkerId *wid; + MemoryContext old; + + old = MemoryContextSwitchTo(TopTransactionContext); + + wid = (LogicalRepWorkerId *) palloc(sizeof(LogicalRepWorkerId)); + + /* + wid = MemoryContextAlloc(TopTransactionContext, + sizeof(LogicalRepWorkerId)); + */ + wid->subid = subid; + wid->relid = relid; + + on_commit_stop_workers = lappend(on_commit_stop_workers, wid); + + MemoryContextSwitchTo(old); +}logicalrep_worker_stop_at_commit() has a problem that new_list()
called by lappend() allocates the memory from current memory context.
It should switch the memory context and then allocate the memory for
wid and append it to the list.
Right, fixed (I see you did that locally as well based on the above
excerpt ;) ).
-------- @@ -754,9 +773,25 @@ ApplyLauncherShmemInit(void) void AtEOXact_ApplyLauncher(bool isCommit) { - if (isCommit && on_commit_launcher_wakeup) - ApplyLauncherWakeup(); + ListCell *lc; + + if (isCommit) + { + foreach (lc, on_commit_stop_workers) + { + LogicalRepWorkerId *wid = lfirst(lc); + logicalrep_worker_stop(wid->subid, wid->relid); + } + + if (on_commit_launcher_wakeup) + ApplyLauncherWakeup();Stopping the logical rep worker in AtEOXact_ApplyLauncher doesn't
support the prepared transaction. Since we allocate the list
on_commit_stop_workers in TopTransactionContext the postgres crashes
if we execute any query after prepared transaction that removes
subscription relation state. Also after fixed this issue, we still
need to something: the list of on_commit_stop_workers is not
associated the prepared transaction. A query next to "preapre
transaction" tries to stop workers at the commit. There was similar
discussion before.
Hmm, good point. I think for now it makes sense to simply don't allow
PREPARE for transactions that manipulate workers similar to what we do
when there are exported snapshots. Done it that way in attached.
-------- + + ensure_transaction(); + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + rstate->relid, rstate->state, + rstate->lsn);Should we commit the transaction if we started a new transaction
before update the subscription relation state, or it could be deadlock
risk.
We only lock the whole subscription (and only conflicting things are
DROP and ALTER SUBSCRIPTION), not individual subscription-relation
mapping so it doesn't seem to me like there is any higher risk of
deadlocks than anything else which works with multiple tables (or
compared to previous behavior).
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Attachments:
v2-0001-Rework-subscription-worker-and-relation-status-handl.patchtext/x-patch; name=v2-0001-Rework-subscription-worker-and-relation-status-handl.patchDownload
From 88da433110aa3bde3dcce33ebf62d41a08c191b9 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Sat, 24 Jun 2017 19:38:21 +0200
Subject: [PATCH] Rework subscription worker and relation status handling
---
src/backend/access/transam/xact.c | 9 +
src/backend/catalog/pg_subscription.c | 137 ++++++------
src/backend/commands/subscriptioncmds.c | 98 ++++-----
src/backend/replication/logical/launcher.c | 309 ++++++++++++++++------------
src/backend/replication/logical/tablesync.c | 97 +++++----
src/backend/replication/logical/worker.c | 23 ++-
src/backend/utils/cache/catcache.c | 6 +-
src/include/catalog/pg_subscription_rel.h | 6 +-
src/include/replication/logicallauncher.h | 1 +
src/include/replication/worker_internal.h | 6 +-
10 files changed, 393 insertions(+), 299 deletions(-)
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b0aa69f..322502d 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2277,6 +2277,15 @@ PrepareTransaction(void)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot PREPARE a transaction that has exported snapshots")));
+ /*
+ * Similar to above, don't allow PREPARE but for transaction that kill
+ * logical replication, workers.
+ */
+ if (XactManipulatesLogicalReplicationWorkers())
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot PREPARE a transaction that has manipulated logical replication workers")));
+
/* Prevent cancel/die interrupt while cleaning up */
HOLD_INTERRUPTS();
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index c69c461..b643e54 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -28,6 +28,8 @@
#include "nodes/makefuncs.h"
+#include "storage/lmgr.h"
+
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
@@ -225,84 +227,101 @@ textarray_to_stringlist(ArrayType *textarray)
}
/*
- * Set the state of a subscription table.
- *
- * If update_only is true and the record for given table doesn't exist, do
- * nothing. This can be used to avoid inserting a new record that was deleted
- * by someone else. Generally, subscription DDL commands should use false,
- * workers should use true.
- *
- * The insert-or-update logic in this function is not concurrency safe so it
- * might raise an error in rare circumstances. But if we took a stronger lock
- * such as ShareRowExclusiveLock, we would risk more deadlocks.
+ * Add new state record for a subscription table.
*/
Oid
-SetSubscriptionRelState(Oid subid, Oid relid, char state,
- XLogRecPtr sublsn, bool update_only)
+AddSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn)
{
Relation rel;
HeapTuple tup;
- Oid subrelid = InvalidOid;
+ Oid subrelid;
bool nulls[Natts_pg_subscription_rel];
Datum values[Natts_pg_subscription_rel];
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
/* Try finding existing mapping. */
tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
ObjectIdGetDatum(relid),
ObjectIdGetDatum(subid));
+ if (HeapTupleIsValid(tup))
+ elog(ERROR, "subscription table %u in subscription %u already exists",
+ relid, subid);
- /*
- * If the record for given table does not exist yet create new record,
- * otherwise update the existing one.
- */
- if (!HeapTupleIsValid(tup) && !update_only)
- {
- /* Form the tuple. */
- memset(values, 0, sizeof(values));
- memset(nulls, false, sizeof(nulls));
- values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
- values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
- values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
- if (sublsn != InvalidXLogRecPtr)
- values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
- else
- nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
-
- tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
-
- /* Insert tuple into catalog. */
- subrelid = CatalogTupleInsert(rel, tup);
-
- heap_freetuple(tup);
- }
- else if (HeapTupleIsValid(tup))
- {
- bool replaces[Natts_pg_subscription_rel];
+ /* Form the tuple. */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
+ values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
+ values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+ if (sublsn != InvalidXLogRecPtr)
+ values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+ else
+ nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
- /* Update the tuple. */
- memset(values, 0, sizeof(values));
- memset(nulls, false, sizeof(nulls));
- memset(replaces, false, sizeof(replaces));
+ tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
- replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
- values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+ /* Insert tuple into catalog. */
+ subrelid = CatalogTupleInsert(rel, tup);
- replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
- if (sublsn != InvalidXLogRecPtr)
- values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
- else
- nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+ heap_freetuple(tup);
- tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
- replaces);
+ /* Cleanup. */
+ heap_close(rel, NoLock);
- /* Update the catalog. */
- CatalogTupleUpdate(rel, &tup->t_self, tup);
+ return subrelid;
+}
- subrelid = HeapTupleGetOid(tup);
- }
+/*
+ * Update the state of a subscription table.
+ */
+Oid
+UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn)
+{
+ Relation rel;
+ HeapTuple tup;
+ Oid subrelid;
+ bool nulls[Natts_pg_subscription_rel];
+ Datum values[Natts_pg_subscription_rel];
+ bool replaces[Natts_pg_subscription_rel];
+
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+ rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+ /* Try finding existing mapping. */
+ tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
+ ObjectIdGetDatum(relid),
+ ObjectIdGetDatum(subid));
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "subscription table %u in subscription %u does not exist",
+ relid, subid);
+
+ /* Update the tuple. */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replaces, false, sizeof(replaces));
+
+ replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
+ values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+
+ replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
+ if (sublsn != InvalidXLogRecPtr)
+ values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+ else
+ nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+
+ tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+ replaces);
+
+ /* Update the catalog. */
+ CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+ subrelid = HeapTupleGetOid(tup);
/* Cleanup. */
heap_close(rel, NoLock);
@@ -377,6 +396,8 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
HeapTuple tup;
int nkeys = 0;
+ Assert(OidIsValid(subid) || OidIsValid(relid));
+
rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
if (OidIsValid(subid))
@@ -400,9 +421,7 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
/* Do the search and delete what we found. */
scan = heap_beginscan_catalog(rel, nkeys, skey);
while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
- {
CatalogTupleDelete(rel, &tup->t_self);
- }
heap_endscan(scan);
heap_close(rel, RowExclusiveLock);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 9cbd36f..3dc1f4c 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -450,8 +450,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
CheckSubscriptionRelkind(get_rel_relkind(relid),
rv->schemaname, rv->relname);
- SetSubscriptionRelState(subid, relid, table_state,
- InvalidXLogRecPtr, false);
+ AddSubscriptionRelState(subid, relid, table_state,
+ InvalidXLogRecPtr);
}
/*
@@ -569,9 +569,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
if (!bsearch(&relid, subrel_local_oids,
list_length(subrel_states), sizeof(Oid), oid_cmp))
{
- SetSubscriptionRelState(sub->oid, relid,
- copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
- InvalidXLogRecPtr, false);
+ AddSubscriptionRelState(sub->oid, relid,
+ copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+ InvalidXLogRecPtr);
ereport(NOTICE,
(errmsg("added subscription for table %s.%s",
quote_identifier(rv->schemaname),
@@ -597,7 +597,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
RemoveSubscriptionRel(sub->oid, relid);
- logicalrep_worker_stop(sub->oid, relid);
+ logicalrep_worker_stop_at_commit(sub->oid, relid);
namespace = get_namespace_name(get_rel_namespace(relid));
ereport(NOTICE,
@@ -636,14 +636,18 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
errmsg("subscription \"%s\" does not exist",
stmt->subname)));
+ subid = HeapTupleGetOid(tup);
+
/* must be owner */
- if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
+ if (!pg_subscription_ownercheck(subid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
stmt->subname);
- subid = HeapTupleGetOid(tup);
sub = GetSubscription(subid, false);
+ /* Lock the subscription so nobody else can do anything with it. */
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+
/* Form a new tuple. */
memset(values, 0, sizeof(values));
memset(nulls, false, sizeof(nulls));
@@ -811,14 +815,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
ObjectAddress myself;
HeapTuple tup;
Oid subid;
- Datum datum;
- bool isnull;
- char *subname;
- char *conninfo;
- char *slotname;
+ List *subworkers;
+ ListCell *lc;
char originname[NAMEDATALEN];
- char *err = NULL;
RepOriginId originid;
+ char *err = NULL;
+ Subscription *sub;
WalReceiverConn *wrconn = NULL;
StringInfoData cmd;
@@ -826,7 +828,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* Lock pg_subscription with AccessExclusiveLock to ensure that the
* launcher doesn't restart new worker during dropping the subscription
*/
- rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);
+ rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
CStringGetDatum(stmt->subname));
@@ -858,31 +860,10 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
/* DROP hook for the subscription being removed */
InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
- /*
- * Lock the subscription so nobody else can do anything with it (including
- * the replication workers).
- */
- LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+ sub = GetSubscription(subid, false);
- /* Get subname */
- datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
- Anum_pg_subscription_subname, &isnull);
- Assert(!isnull);
- subname = pstrdup(NameStr(*DatumGetName(datum)));
-
- /* Get conninfo */
- datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
- Anum_pg_subscription_subconninfo, &isnull);
- Assert(!isnull);
- conninfo = TextDatumGetCString(datum);
-
- /* Get slotname */
- datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
- Anum_pg_subscription_subslotname, &isnull);
- if (!isnull)
- slotname = pstrdup(NameStr(*DatumGetName(datum)));
- else
- slotname = NULL;
+ /* Lock the subscription so nobody else can do anything with it. */
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
/*
* Since dropping a replication slot is not transactional, the replication
@@ -894,7 +875,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* of a subscription that is associated with a replication slot", but we
* don't have the proper facilities for that.
*/
- if (slotname)
+ if (sub->slotname)
PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
@@ -906,15 +887,33 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
ReleaseSysCache(tup);
+ /*
+ * If we are dropping slot, stop all the subscription workers immediately
+ * so that the slot is accessible, otherwise just shedule the stop at the
+ * end of the transaction.
+ *
+ * New workers won't be started because we hold exclusive lock on the
+ * subscription till the end of transaction.
+ */
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ subworkers = logicalrep_sub_workers_find(subid, false);
+ LWLockRelease(LogicalRepWorkerLock);
+ foreach (lc, subworkers)
+ {
+ LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+ if (sub->slotname)
+ logicalrep_worker_stop(w->subid, w->relid);
+ else
+ logicalrep_worker_stop_at_commit(w->subid, w->relid);
+ }
+ list_free(subworkers);
+
/* Clean up dependencies */
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
/* Remove any associated relation synchronization states. */
RemoveSubscriptionRel(subid, InvalidOid);
- /* Kill the apply worker so that the slot becomes accessible. */
- logicalrep_worker_stop(subid, InvalidOid);
-
/* Remove the origin tracking if exists. */
snprintf(originname, sizeof(originname), "pg_%u", subid);
originid = replorigin_by_name(originname, true);
@@ -925,7 +924,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* If there is no slot associated with the subscription, we can finish
* here.
*/
- if (!slotname)
+ if (!sub->slotname)
{
heap_close(rel, NoLock);
return;
@@ -938,13 +937,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
load_file("libpqwalreceiver", false);
initStringInfo(&cmd);
- appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", quote_identifier(slotname));
+ appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s",
+ quote_identifier(sub->slotname));
- wrconn = walrcv_connect(conninfo, true, subname, &err);
+ wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
if (wrconn == NULL)
ereport(ERROR,
(errmsg("could not connect to publisher when attempting to "
- "drop the replication slot \"%s\"", slotname),
+ "drop the replication slot \"%s\"", sub->slotname),
errdetail("The error was: %s", err),
errhint("Use ALTER SUBSCRIPTION ... SET (slot_name = NONE) "
"to disassociate the subscription from the slot.")));
@@ -958,12 +958,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR,
(errmsg("could not drop the replication slot \"%s\" on publisher",
- slotname),
+ sub->slotname),
errdetail("The error was: %s", res->err)));
else
ereport(NOTICE,
(errmsg("dropped replication slot \"%s\" on publisher",
- slotname)));
+ sub->slotname)));
walrcv_clear_result(res);
}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 86a2b14..01ef614 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -42,12 +42,14 @@
#include "replication/worker_internal.h"
#include "storage/ipc.h"
+#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
+#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
@@ -73,6 +75,14 @@ typedef struct LogicalRepCtxStruct
LogicalRepCtxStruct *LogicalRepCtx;
+typedef struct LogicalRepWorkerId
+{
+ Oid subid;
+ Oid relid;
+} LogicalRepWorkerId;
+
+static List *on_commit_stop_workers = NIL;
+
static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
@@ -86,12 +96,11 @@ static bool on_commit_launcher_wakeup = false;
Datum pg_stat_get_subscription(PG_FUNCTION_ARGS);
-
/*
* Load the list of subscriptions.
*
- * Only the fields interesting for worker start/stop functions are filled for
- * each subscription.
+ * Only the fields interesting for worker start are filled for each
+ * subscription.
*/
static List *
get_subscription_list(void)
@@ -100,19 +109,13 @@ get_subscription_list(void)
Relation rel;
HeapScanDesc scan;
HeapTuple tup;
- MemoryContext resultcxt;
-
- /* This is the context that we will allocate our output data in */
- resultcxt = CurrentMemoryContext;
/*
- * Start a transaction so we can access pg_database, and get a snapshot.
* We don't have a use for the snapshot itself, but we're interested in
* the secondary effect that it sets RecentGlobalXmin. (This is critical
* for anything that reads heap pages, because HOT may decide to prune
* them even if the process doesn't attempt to modify any tuples.)
*/
- StartTransactionCommand();
(void) GetTransactionSnapshot();
rel = heap_open(SubscriptionRelationId, AccessShareLock);
@@ -121,34 +124,17 @@ get_subscription_list(void)
while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
{
Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
- Subscription *sub;
- MemoryContext oldcxt;
- /*
- * Allocate our results in the caller's context, not the
- * transaction's. We do this inside the loop, and restore the original
- * context at the end, so that leaky things like heap_getnext() are
- * not called in a potentially long-lived context.
- */
- oldcxt = MemoryContextSwitchTo(resultcxt);
-
- sub = (Subscription *) palloc0(sizeof(Subscription));
- sub->oid = HeapTupleGetOid(tup);
- sub->dbid = subform->subdbid;
- sub->owner = subform->subowner;
- sub->enabled = subform->subenabled;
- sub->name = pstrdup(NameStr(subform->subname));
- /* We don't fill fields we are not interested in. */
-
- res = lappend(res, sub);
- MemoryContextSwitchTo(oldcxt);
+ /* We only care about enabled subscriptions. */
+ if (!subform->subenabled)
+ continue;
+
+ res = lappend_oid(res, HeapTupleGetOid(tup));
}
heap_endscan(scan);
heap_close(rel, AccessShareLock);
- CommitTransactionCommand();
-
return res;
}
@@ -250,23 +236,68 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
}
/*
- * Start new apply background worker.
+ * Similar as logicalrep_worker_find(), but returns list of all workers
+ * for the subscription instead just one.
+ */
+List *
+logicalrep_sub_workers_find(Oid subid, bool only_running)
+{
+ int i;
+ List *res = NIL;
+
+ Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
+ /* Search for attached worker for a given subscription id. */
+ for (i = 0; i < max_logical_replication_workers; i++)
+ {
+ LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+ if (w->in_use && w->subid == subid && (!only_running || w->proc))
+ res = lappend(res, w);
+ }
+
+ return res;
+}
+
+/*
+ * Start new logical replication background worker.
*/
void
-logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
- Oid relid)
+logicalrep_worker_launch(Oid subid, Oid relid)
{
BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
int i;
int slot = 0;
- LogicalRepWorker *worker = NULL;
- int nsyncworkers;
+ List *subworkers;
+ ListCell *lc;
TimestampTz now;
+ int nsyncworkers = 0;
+ Subscription *sub;
+ LogicalRepWorker *worker = NULL;
ereport(DEBUG1,
- (errmsg("starting logical replication worker for subscription \"%s\"",
- subname)));
+ (errmsg("starting logical replication worker for subscription %u",
+ subid)));
+
+ /* Block any concurrent DDL on the subscription. */
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+ /*
+ * Subscription might have been dropped in meantime, make sure our cache
+ * is up to date.
+ */
+ AcceptInvalidationMessages();
+
+ /* Get info about subscription. */
+ sub = GetSubscription(subid, true);
+ if (!sub)
+ {
+ ereport(DEBUG1,
+ (errmsg("subscription %u not found, not starting worker for it",
+ subid)));
+ return;
+ }
/* Report this after the initial starting message for consistency. */
if (max_replication_slots == 0)
@@ -294,7 +325,14 @@ retry:
}
}
- nsyncworkers = logicalrep_sync_worker_count(subid);
+ subworkers = logicalrep_sub_workers_find(subid, false);
+ foreach (lc, subworkers)
+ {
+ LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+ if (w->relid != InvalidOid)
+ nsyncworkers ++;
+ }
+ list_free(subworkers);
now = GetCurrentTimestamp();
@@ -340,6 +378,7 @@ retry:
if (nsyncworkers >= max_sync_workers_per_subscription)
{
LWLockRelease(LogicalRepWorkerLock);
+ UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
return;
}
@@ -350,6 +389,7 @@ retry:
if (worker == NULL)
{
LWLockRelease(LogicalRepWorkerLock);
+ UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
ereport(WARNING,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of logical replication worker slots"),
@@ -362,8 +402,8 @@ retry:
worker->in_use = true;
worker->generation++;
worker->proc = NULL;
- worker->dbid = dbid;
- worker->userid = userid;
+ worker->dbid = sub->dbid;
+ worker->userid = sub->owner;
worker->subid = subid;
worker->relid = relid;
worker->relstate = SUBREL_STATE_UNKNOWN;
@@ -374,8 +414,6 @@ retry:
worker->reply_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->reply_time);
- LWLockRelease(LogicalRepWorkerLock);
-
/* Register the new dynamic worker. */
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
@@ -394,8 +432,13 @@ retry:
bgw.bgw_notify_pid = MyProcPid;
bgw.bgw_main_arg = Int32GetDatum(slot);
+ /* Try to register the worker and cleanup in case of failure. */
if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
{
+ logicalrep_worker_cleanup(worker);
+ LWLockRelease(LogicalRepWorkerLock);
+ UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
ereport(WARNING,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of background worker slots"),
@@ -403,13 +446,24 @@ retry:
return;
}
+ /* Done with the worker array. */
+ LWLockRelease(LogicalRepWorkerLock);
+
/* Now wait until it attaches. */
WaitForReplicationWorkerAttach(worker, bgw_handle);
+
+ /*
+ * Worker either started or died, in any case we are done with the
+ * subscription.
+ */
+ UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
}
/*
* Stop the logical replication worker and wait until it detaches from the
* slot.
+ *
+ * Callers of this function better have exclusive lock on the subscription.
*/
void
logicalrep_worker_stop(Oid subid, Oid relid)
@@ -417,7 +471,8 @@ logicalrep_worker_stop(Oid subid, Oid relid)
LogicalRepWorker *worker;
uint16 generation;
- LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ /* Exclusive is needed for logicalrep_worker_cleanup(). */
+ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
worker = logicalrep_worker_find(subid, relid, false);
@@ -428,56 +483,16 @@ logicalrep_worker_stop(Oid subid, Oid relid)
return;
}
+ /* If there is worker but it's not running, clean it up. */
+ if (!worker->proc)
+ logicalrep_worker_cleanup(worker);
+
/*
* Remember which generation was our worker so we can check if what we see
* is still the same one.
*/
generation = worker->generation;
- /*
- * If we found worker but it does not have proc set it is starting up,
- * wait for it to finish and then kill it.
- */
- while (worker->in_use && !worker->proc)
- {
- int rc;
-
- LWLockRelease(LogicalRepWorkerLock);
-
- /* Wait for signal. */
- rc = WaitLatch(MyLatch,
- WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
- 1000L, WAIT_EVENT_BGWORKER_STARTUP);
-
- /* emergency bailout if postmaster has died */
- if (rc & WL_POSTMASTER_DEATH)
- proc_exit(1);
-
- if (rc & WL_LATCH_SET)
- {
- ResetLatch(MyLatch);
- CHECK_FOR_INTERRUPTS();
- }
-
- /* Check worker status. */
- LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-
- /*
- * Check whether the worker slot is no longer used, which would mean
- * that the worker has exited, or whether the worker generation is
- * different, meaning that a different worker has taken the slot.
- */
- if (!worker->in_use || worker->generation != generation)
- {
- LWLockRelease(LogicalRepWorkerLock);
- return;
- }
-
- /* Worker has assigned proc, so it has started. */
- if (worker->proc)
- break;
- }
-
/* Now terminate the worker ... */
kill(worker->proc->pid, SIGTERM);
LWLockRelease(LogicalRepWorkerLock);
@@ -497,7 +512,10 @@ logicalrep_worker_stop(Oid subid, Oid relid)
CHECK_FOR_INTERRUPTS();
- /* Wait for more work. */
+ /*
+ * We need timeout because we generally don't get notified via latch
+ * about the worker attach.
+ */
rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
1000L, WAIT_EVENT_BGWORKER_SHUTDOWN);
@@ -515,6 +533,27 @@ logicalrep_worker_stop(Oid subid, Oid relid)
}
/*
+ * Request worker to be stopped on commit.
+ */
+void
+logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
+{
+ LogicalRepWorkerId *wid;
+ MemoryContext oldctx;
+
+ /* Make sure we store the info in context which survives until commit. */
+ oldctx = MemoryContextSwitchTo(TopTransactionContext);
+
+ wid = palloc(sizeof(LogicalRepWorkerId));
+ wid->subid = subid;
+ wid->relid = relid;
+
+ on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+
+ MemoryContextSwitchTo(oldctx);
+}
+
+/*
* Wake up (using latch) the logical replication worker.
*/
void
@@ -648,30 +687,6 @@ logicalrep_launcher_sighup(SIGNAL_ARGS)
}
/*
- * Count the number of registered (not necessarily running) sync workers
- * for a subscription.
- */
-int
-logicalrep_sync_worker_count(Oid subid)
-{
- int i;
- int res = 0;
-
- Assert(LWLockHeldByMe(LogicalRepWorkerLock));
-
- /* Search for attached worker for a given subscription id. */
- for (i = 0; i < max_logical_replication_workers; i++)
- {
- LogicalRepWorker *w = &LogicalRepCtx->workers[i];
-
- if (w->subid == subid && OidIsValid(w->relid))
- res++;
- }
-
- return res;
-}
-
-/*
* ApplyLauncherShmemSize
* Compute space needed for replication launcher shared memory
*/
@@ -749,14 +764,41 @@ ApplyLauncherShmemInit(void)
}
/*
+ * XactManipulatesLogicalReplicationWorkers
+ * Check whether current transaction has manipulated logical replication
+ * workers.
+ */
+bool
+XactManipulatesLogicalReplicationWorkers(void)
+{
+ return (on_commit_stop_workers != NIL);
+}
+
+/*
* Wakeup the launcher on commit if requested.
*/
void
AtEOXact_ApplyLauncher(bool isCommit)
{
- if (isCommit && on_commit_launcher_wakeup)
- ApplyLauncherWakeup();
+ ListCell *lc;
+
+ if (isCommit)
+ {
+ foreach (lc, on_commit_stop_workers)
+ {
+ LogicalRepWorkerId *wid = lfirst(lc);
+ logicalrep_worker_stop(wid->subid, wid->relid);
+ }
+
+ if (on_commit_launcher_wakeup)
+ ApplyLauncherWakeup();
+ }
+ /*
+ * No need to pfree on_commit_stop_workers, it's been allocated in
+ * transaction memory context which is going to be cleaned soon.
+ */
+ on_commit_stop_workers = NIL;
on_commit_launcher_wakeup = false;
}
@@ -814,8 +856,6 @@ ApplyLauncherMain(Datum main_arg)
int rc;
List *sublist;
ListCell *lc;
- MemoryContext subctx;
- MemoryContext oldctx;
TimestampTz now;
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
@@ -827,41 +867,38 @@ ApplyLauncherMain(Datum main_arg)
if (TimestampDifferenceExceeds(last_start_time, now,
wal_retrieve_retry_interval))
{
- /* Use temporary context for the database list and worker info. */
- subctx = AllocSetContextCreate(TopMemoryContext,
- "Logical Replication Launcher sublist",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
- oldctx = MemoryContextSwitchTo(subctx);
-
- /* search for subscriptions to start or stop. */
+ /*
+ * Start new transaction so that we can take locks and snapshots.
+ *
+ * Any allocations will also be made inside the transaction memory
+ * context.
+ */
+ StartTransactionCommand();
+
+ /* Search for subscriptions to start. */
sublist = get_subscription_list();
- /* Start the missing workers for enabled subscriptions. */
+ /* Start the missing workers. */
foreach(lc, sublist)
{
- Subscription *sub = (Subscription *) lfirst(lc);
+ Oid subid = lfirst_oid(lc);
LogicalRepWorker *w;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+ w = logicalrep_worker_find(subid, InvalidOid, false);
LWLockRelease(LogicalRepWorkerLock);
- if (sub->enabled && w == NULL)
+ if (w == NULL)
{
last_start_time = now;
wait_time = wal_retrieve_retry_interval;
- logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
- sub->owner, InvalidOid);
+ /* Start the worker. */
+ logicalrep_worker_launch(subid, InvalidOid);
}
}
- /* Switch back to original memory context. */
- MemoryContextSwitchTo(oldctx);
- /* Clean the temporary memory. */
- MemoryContextDelete(subctx);
+ CommitTransactionCommand();
}
else
{
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 3ef12df..11f4977 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -215,7 +215,7 @@ wait_for_relation_state_change(Oid relid, char expected_state)
* Returns false if the apply worker has disappeared or the table state has been
* reset.
*/
-static bool
+static void
wait_for_worker_state_change(char expected_state)
{
int rc;
@@ -232,10 +232,13 @@ wait_for_worker_state_change(char expected_state)
InvalidOid, false);
LWLockRelease(LogicalRepWorkerLock);
if (!worker)
- return false;
+ ereport(FATAL,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("terminating logical replication synchronization "
+ "worker due to subscription apply worker exit")));
if (MyLogicalRepWorker->relstate == expected_state)
- return true;
+ return;
rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
@@ -247,8 +250,6 @@ wait_for_worker_state_change(char expected_state)
ResetLatch(MyLatch);
}
-
- return false;
}
/*
@@ -285,11 +286,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
SpinLockRelease(&MyLogicalRepWorker->relmutex);
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn,
- true);
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ MyLogicalRepWorker->relstate,
+ MyLogicalRepWorker->relstate_lsn);
walrcv_endstreaming(wrconn, &tli);
finish_sync_worker();
@@ -332,6 +332,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
ListCell *lc;
bool started_tx = false;
+#define ensure_transaction() \
+ if (!started_tx) \
+ {\
+ StartTransactionCommand(); \
+ started_tx = true; \
+ }
+
Assert(!IsTransactionState());
/* We need up-to-date sync state info for subscription tables here. */
@@ -346,8 +353,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
list_free_deep(table_states);
table_states = NIL;
- StartTransactionCommand();
- started_tx = true;
+ ensure_transaction();
/* Fetch all non-ready tables. */
rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
@@ -409,14 +415,11 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
{
rstate->state = SUBREL_STATE_READY;
rstate->lsn = current_lsn;
- if (!started_tx)
- {
- StartTransactionCommand();
- started_tx = true;
- }
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- rstate->relid, rstate->state,
- rstate->lsn, true);
+
+ ensure_transaction();
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ rstate->relid, rstate->state,
+ rstate->lsn);
}
}
else
@@ -435,13 +438,26 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
SpinLockRelease(&syncworker->relmutex);
}
else
+ {
+ List *subworkers;
+ ListCell *lc;
/*
* If there is no sync worker for this table yet, count
* running sync workers for this subscription, while we have
* the lock, for later.
*/
- nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+ subworkers =
+ logicalrep_sub_workers_find(MyLogicalRepWorker->subid,
+ false);
+ foreach (lc, subworkers)
+ {
+ LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+ if (w->relid != InvalidOid)
+ nsyncworkers ++;
+ }
+ list_free(subworkers);
+ }
LWLockRelease(LogicalRepWorkerLock);
/*
@@ -467,11 +483,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* Enter busy loop and wait for synchronization worker to
* reach expected state (or die trying).
*/
- if (!started_tx)
- {
- StartTransactionCommand();
- started_tx = true;
- }
+ ensure_transaction();
wait_for_relation_state_change(rstate->relid,
SUBREL_STATE_SYNCDONE);
}
@@ -493,10 +505,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
TimestampDifferenceExceeds(hentry->last_start_time, now,
wal_retrieve_retry_interval))
{
- logicalrep_worker_launch(MyLogicalRepWorker->dbid,
- MySubscription->oid,
- MySubscription->name,
- MyLogicalRepWorker->userid,
+ ensure_transaction();
+ logicalrep_worker_launch(MySubscription->oid,
rstate->relid);
hentry->last_start_time = now;
}
@@ -798,6 +808,15 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
&relstate_lsn, true);
+ if (relstate == SUBREL_STATE_UNKNOWN)
+ {
+ ereport(LOG,
+ (errmsg("logical replication table synchronization worker for subscription \"%s\", "
+ "table \"%s\" will stop because the table is no longer subscribed",
+ MySubscription->name,
+ get_rel_name(MyLogicalRepWorker->relid))));
+ proc_exit(0);
+ }
CommitTransactionCommand();
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
@@ -844,11 +863,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
/* Update the state and make it visible to others. */
StartTransactionCommand();
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn,
- true);
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ MyLogicalRepWorker->relstate,
+ MyLogicalRepWorker->relstate_lsn);
CommitTransactionCommand();
pgstat_report_stat(false);
@@ -933,11 +951,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* Update the new state in catalog. No need to bother
* with the shmem state as we are exiting for good.
*/
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- SUBREL_STATE_SYNCDONE,
- *origin_startpos,
- true);
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ SUBREL_STATE_SYNCDONE,
+ *origin_startpos);
finish_sync_worker();
}
break;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 898c497..085dd8c 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1516,24 +1516,31 @@ ApplyWorkerMain(Datum main_arg)
ALLOCSET_DEFAULT_SIZES);
StartTransactionCommand();
oldctx = MemoryContextSwitchTo(ApplyContext);
- MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
+ MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
+ if (!MySubscription)
+ {
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription %u will "
+ "stop because the subscription was removed",
+ MyLogicalRepWorker->subid)));
+ proc_exit(0);
+ }
MySubscriptionValid = true;
MemoryContextSwitchTo(oldctx);
- /* Setup synchronous commit according to the user's wishes */
- SetConfigOption("synchronous_commit", MySubscription->synccommit,
- PGC_BACKEND, PGC_S_OVERRIDE);
-
if (!MySubscription->enabled)
{
ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will not "
- "start because the subscription was disabled during startup",
+ (errmsg("logical replication apply worker for subscription \"%s\" will "
+ "stop because the subscription was disabled",
MySubscription->name)));
-
proc_exit(0);
}
+ /* Setup synchronous commit according to the user's wishes */
+ SetConfigOption("synchronous_commit", MySubscription->synccommit,
+ PGC_BACKEND, PGC_S_OVERRIDE);
+
/* Keep us informed about subscription changes. */
CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
subscription_change_cb,
diff --git a/src/backend/utils/cache/catcache.c b/src/backend/utils/cache/catcache.c
index e7e8e3b..639b4eb 100644
--- a/src/backend/utils/cache/catcache.c
+++ b/src/backend/utils/cache/catcache.c
@@ -1052,10 +1052,12 @@ IndexScanOK(CatCache *cache, ScanKey cur_skey)
case AUTHNAME:
case AUTHOID:
case AUTHMEMMEMROLE:
+ case SUBSCRIPTIONOID:
+ case SUBSCRIPTIONNAME:
/*
- * Protect authentication lookups occurring before relcache has
- * collected entries for shared indexes.
+ * Protect authentication and subscription lookups occurring
+ * before relcache has collected entries for shared indexes.
*/
if (!criticalSharedRelcachesBuilt)
return false;
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 991ca9d..c5b0b9c 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -70,8 +70,10 @@ typedef struct SubscriptionRelState
char state;
} SubscriptionRelState;
-extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
- XLogRecPtr sublsn, bool update_only);
+extern Oid AddSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn);
+extern Oid UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn);
extern char GetSubscriptionRelState(Oid subid, Oid relid,
XLogRecPtr *sublsn, bool missing_ok);
extern void RemoveSubscriptionRel(Oid subid, Oid relid);
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index aac7d32..78016c4 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -22,6 +22,7 @@ extern Size ApplyLauncherShmemSize(void);
extern void ApplyLauncherShmemInit(void);
extern void ApplyLauncherWakeupAtCommit(void);
+extern bool XactManipulatesLogicalReplicationWorkers(void);
extern void AtEOXact_ApplyLauncher(bool isCommit);
extern bool IsLogicalLauncher(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 494a3a3..add7841 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -71,13 +71,13 @@ extern bool in_remote_transaction;
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);
-extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
- Oid userid, Oid relid);
+extern void logicalrep_worker_launch(Oid subid, Oid relid);
extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
-extern int logicalrep_sync_worker_count(Oid subid);
+extern List *logicalrep_sub_workers_find(Oid subid, bool only_running);
extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
void process_syncing_tables(XLogRecPtr current_lsn);
--
2.7.4
On Fri, Jun 23, 2017 at 09:42:10PM -0400, Peter Eisentraut wrote:
On 6/21/17 22:47, Peter Eisentraut wrote:
On 6/20/17 22:44, Noah Misch wrote:
A patch has been posted, and it's being reviewed. Next update Monday.
This PostgreSQL 10 open item is past due for your status update. Kindly send
a status update within 24 hours, and include a date for your subsequent status
update. Refer to the policy on open item ownership:
/messages/by-id/20170404140717.GA2675809@tornado.leadboat.comI'm not sure how to proceed here. Nobody is else seems terribly
interested, and this is really a minor issue, so I don't want to muck
around with the locking endlessly. Let's say, if there are no new
insights by Friday, I'll pick one of the proposed patches or just close
it without any patch.After some comments, a new patch has been posted, and I'll give until
Monday for review.
This PostgreSQL 10 open item is past due for your status update. Kindly send
a status update within 24 hours, and include a date for your subsequent status
update. Refer to the policy on open item ownership:
/messages/by-id/20170404140717.GA2675809@tornado.leadboat.com
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Wed, Jun 28, 2017 at 1:47 AM, Petr Jelinek
<petr.jelinek@2ndquadrant.com> wrote:
On 27/06/17 10:51, Masahiko Sawada wrote:
On Mon, Jun 26, 2017 at 12:12 PM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
I've reviewed this patch briefly.
Thanks!
@@ -515,6 +533,31 @@ logicalrep_worker_stop(Oid subid, Oid relid)
}/* + * Request worker to be stopped on commit. + */ +void +logicalrep_worker_stop_at_commit(Oid subid, Oid relid) +{ + LogicalRepWorkerId *wid; + MemoryContext old; + + old = MemoryContextSwitchTo(TopTransactionContext); + + wid = (LogicalRepWorkerId *) palloc(sizeof(LogicalRepWorkerId)); + + /* + wid = MemoryContextAlloc(TopTransactionContext, + sizeof(LogicalRepWorkerId)); + */ + wid->subid = subid; + wid->relid = relid; + + on_commit_stop_workers = lappend(on_commit_stop_workers, wid); + + MemoryContextSwitchTo(old); +}logicalrep_worker_stop_at_commit() has a problem that new_list()
called by lappend() allocates the memory from current memory context.
It should switch the memory context and then allocate the memory for
wid and append it to the list.
Thank you for updating the patch!
Right, fixed (I see you did that locally as well based on the above
excerpt ;) ).
Oops, yeah that's my test code.
-------- @@ -754,9 +773,25 @@ ApplyLauncherShmemInit(void) void AtEOXact_ApplyLauncher(bool isCommit) { - if (isCommit && on_commit_launcher_wakeup) - ApplyLauncherWakeup(); + ListCell *lc; + + if (isCommit) + { + foreach (lc, on_commit_stop_workers) + { + LogicalRepWorkerId *wid = lfirst(lc); + logicalrep_worker_stop(wid->subid, wid->relid); + } + + if (on_commit_launcher_wakeup) + ApplyLauncherWakeup();Stopping the logical rep worker in AtEOXact_ApplyLauncher doesn't
support the prepared transaction. Since we allocate the list
on_commit_stop_workers in TopTransactionContext the postgres crashes
if we execute any query after prepared transaction that removes
subscription relation state. Also after fixed this issue, we still
need to something: the list of on_commit_stop_workers is not
associated the prepared transaction. A query next to "preapre
transaction" tries to stop workers at the commit. There was similar
discussion before.Hmm, good point. I think for now it makes sense to simply don't allow
PREPARE for transactions that manipulate workers similar to what we do
when there are exported snapshots. Done it that way in attached.
Agreed. Should we note that in docs?
-------- + + ensure_transaction(); + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + rstate->relid, rstate->state, + rstate->lsn);Should we commit the transaction if we started a new transaction
before update the subscription relation state, or it could be deadlock
risk.We only lock the whole subscription (and only conflicting things are
DROP and ALTER SUBSCRIPTION), not individual subscription-relation
mapping so it doesn't seem to me like there is any higher risk of
deadlocks than anything else which works with multiple tables (or
compared to previous behavior).
I'm concerned that a lock for whole subscription could be conflicted
between ALTER SUBSCRIPTION, table sync worker and apply worker:
Please imagine the following case.
1. The apply worker updates a subscription relation state R1 of
subscription S1.
-> It acquires AccessShareLock on S1, and keep holding.
2. ALTER SUBSCRIPTION SET PUBLICATION tries to acquire
AccessExclusiveLock on S1.
-> But it waits for the apply worker to release the lock.
3. The apply worker calls wait_for_relation_state_change(relid,
SUBREL_STATE_SYNCDONE) and waits for the table sync worker
for R2 to change its status.
-> Note that the apply worker is still holding AccessShareLock on S1
4. The table sync worker tries to update its status to SYNCDONE
-> In UpdateSubscriptionRelState(), it tires to acquire AccessShareLock
on S1 but waits for it. *deadlock*
To summary, because the apply worker keeps holding AccessShareLock on
S1, the acquiring AccessExclusiveLock by ALTER SUBSCRIPTION waits for
the apply worker, and then the table sync worker also waits for the
ALTER SUBSCRIPTION in order to change its status. And the apply worker
waits for the table sync worker to change its status.
I encountered the similar case once on my environment. But since it
completely depends on timing I don't have the concrete reproducing
steps.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Wed, Jun 28, 2017 at 03:22:18AM +0000, Noah Misch wrote:
On Fri, Jun 23, 2017 at 09:42:10PM -0400, Peter Eisentraut wrote:
On 6/21/17 22:47, Peter Eisentraut wrote:
On 6/20/17 22:44, Noah Misch wrote:
A patch has been posted, and it's being reviewed. Next update Monday.
This PostgreSQL 10 open item is past due for your status update. Kindly send
a status update within 24 hours, and include a date for your subsequent status
update. Refer to the policy on open item ownership:
/messages/by-id/20170404140717.GA2675809@tornado.leadboat.comI'm not sure how to proceed here. Nobody is else seems terribly
interested, and this is really a minor issue, so I don't want to muck
around with the locking endlessly. Let's say, if there are no new
insights by Friday, I'll pick one of the proposed patches or just close
it without any patch.After some comments, a new patch has been posted, and I'll give until
Monday for review.This PostgreSQL 10 open item is past due for your status update. Kindly send
a status update within 24 hours, and include a date for your subsequent status
update. Refer to the policy on open item ownership:
/messages/by-id/20170404140717.GA2675809@tornado.leadboat.com
IMMEDIATE ATTENTION REQUIRED. This PostgreSQL 10 open item is long past due
for your status update. Please reacquaint yourself with the policy on open
item ownership[1]/messages/by-id/20170404140717.GA2675809@tornado.leadboat.com and then reply immediately. If I do not hear from you by
2017-07-01 04:00 UTC, I will transfer this item to release management team
ownership without further notice.
[1]: /messages/by-id/20170404140717.GA2675809@tornado.leadboat.com
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 6/29/17 23:39, Noah Misch wrote:
IMMEDIATE ATTENTION REQUIRED. This PostgreSQL 10 open item is long past due
for your status update. Please reacquaint yourself with the policy on open
item ownership[1] and then reply immediately. If I do not hear from you by
2017-07-01 04:00 UTC, I will transfer this item to release management team
ownership without further notice.
I will work on this over the weekend and report back on Monday.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Wed, Jun 28, 2017 at 2:13 PM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Wed, Jun 28, 2017 at 1:47 AM, Petr Jelinek
<petr.jelinek@2ndquadrant.com> wrote:On 27/06/17 10:51, Masahiko Sawada wrote:
On Mon, Jun 26, 2017 at 12:12 PM, Masahiko Sawada <sawada.mshk@gmail.com> wrote:
I've reviewed this patch briefly.
Thanks!
@@ -515,6 +533,31 @@ logicalrep_worker_stop(Oid subid, Oid relid)
}/* + * Request worker to be stopped on commit. + */ +void +logicalrep_worker_stop_at_commit(Oid subid, Oid relid) +{ + LogicalRepWorkerId *wid; + MemoryContext old; + + old = MemoryContextSwitchTo(TopTransactionContext); + + wid = (LogicalRepWorkerId *) palloc(sizeof(LogicalRepWorkerId)); + + /* + wid = MemoryContextAlloc(TopTransactionContext, + sizeof(LogicalRepWorkerId)); + */ + wid->subid = subid; + wid->relid = relid; + + on_commit_stop_workers = lappend(on_commit_stop_workers, wid); + + MemoryContextSwitchTo(old); +}logicalrep_worker_stop_at_commit() has a problem that new_list()
called by lappend() allocates the memory from current memory context.
It should switch the memory context and then allocate the memory for
wid and append it to the list.Thank you for updating the patch!
Right, fixed (I see you did that locally as well based on the above
excerpt ;) ).Oops, yeah that's my test code.
-------- @@ -754,9 +773,25 @@ ApplyLauncherShmemInit(void) void AtEOXact_ApplyLauncher(bool isCommit) { - if (isCommit && on_commit_launcher_wakeup) - ApplyLauncherWakeup(); + ListCell *lc; + + if (isCommit) + { + foreach (lc, on_commit_stop_workers) + { + LogicalRepWorkerId *wid = lfirst(lc); + logicalrep_worker_stop(wid->subid, wid->relid); + } + + if (on_commit_launcher_wakeup) + ApplyLauncherWakeup();Stopping the logical rep worker in AtEOXact_ApplyLauncher doesn't
support the prepared transaction. Since we allocate the list
on_commit_stop_workers in TopTransactionContext the postgres crashes
if we execute any query after prepared transaction that removes
subscription relation state. Also after fixed this issue, we still
need to something: the list of on_commit_stop_workers is not
associated the prepared transaction. A query next to "preapre
transaction" tries to stop workers at the commit. There was similar
discussion before.Hmm, good point. I think for now it makes sense to simply don't allow
PREPARE for transactions that manipulate workers similar to what we do
when there are exported snapshots. Done it that way in attached.Agreed. Should we note that in docs?
-------- + + ensure_transaction(); + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + rstate->relid, rstate->state, + rstate->lsn);Should we commit the transaction if we started a new transaction
before update the subscription relation state, or it could be deadlock
risk.We only lock the whole subscription (and only conflicting things are
DROP and ALTER SUBSCRIPTION), not individual subscription-relation
mapping so it doesn't seem to me like there is any higher risk of
deadlocks than anything else which works with multiple tables (or
compared to previous behavior).I'm concerned that a lock for whole subscription could be conflicted
between ALTER SUBSCRIPTION, table sync worker and apply worker:Please imagine the following case.
1. The apply worker updates a subscription relation state R1 of
subscription S1.
-> It acquires AccessShareLock on S1, and keep holding.
2. ALTER SUBSCRIPTION SET PUBLICATION tries to acquire
AccessExclusiveLock on S1.
-> But it waits for the apply worker to release the lock.
3. The apply worker calls wait_for_relation_state_change(relid,
SUBREL_STATE_SYNCDONE) and waits for the table sync worker
for R2 to change its status.
-> Note that the apply worker is still holding AccessShareLock on S1
4. The table sync worker tries to update its status to SYNCDONE
-> In UpdateSubscriptionRelState(), it tires to acquire AccessShareLock
on S1 but waits for it. *deadlock*To summary, because the apply worker keeps holding AccessShareLock on
S1, the acquiring AccessExclusiveLock by ALTER SUBSCRIPTION waits for
the apply worker, and then the table sync worker also waits for the
ALTER SUBSCRIPTION in order to change its status. And the apply worker
waits for the table sync worker to change its status.I encountered the similar case once on my environment. But since it
completely depends on timing I don't have the concrete reproducing
steps.
Also, now the patch conflicts with current HEAD, so need to be updated.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 6/25/17 06:35, Petr Jelinek wrote:
- Do LockSharedObject in ALTER SUBSCRIPTION, DROP SUBSCRIPTION (this one
is preexisting but mentioning it for context), SetSubscriptionRelState,
AddSubscriptionRelState, and in the logicalrep_worker_launch. This means
we use granular per object locks to deal with concurrency.
I have committed those locking changes, as we had already discussed them
previously. This should address the open item.
Maybe we can start new threads for the other parts of the patch and
maybe split the patch up a bit. At this point I don't want to commit
major code rearrangements without specific reasons and detailed analysis.
--
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Tue, Jul 4, 2017 at 12:21 PM, Peter Eisentraut
<peter.eisentraut@2ndquadrant.com> wrote:
On 6/25/17 06:35, Petr Jelinek wrote:
- Do LockSharedObject in ALTER SUBSCRIPTION, DROP SUBSCRIPTION (this one
is preexisting but mentioning it for context), SetSubscriptionRelState,
AddSubscriptionRelState, and in the logicalrep_worker_launch. This means
we use granular per object locks to deal with concurrency.I have committed those locking changes, as we had already discussed them
previously. This should address the open item.
Thank you for committing the patch!
Maybe we can start new threads for the other parts of the patch and
maybe split the patch up a bit.
Yeah, let's discuss about reworking the locking and management on new thread.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers