promotion related handling in pg_sync_replication_slots()
Hello hackers,
Currently, promotion related handling is missing in the slot sync SQL
function pg_sync_replication_slots(). Here is the background on how
it is done in slot sync worker:
During promotion, the startup process, in order to shut down the
slot-sync worker, sets the 'stopSignaled' flag, sends the shutdown
signal, and waits for the slot sync worker to exit. Meanwhile, if the
postmaster has not noticed the promotion yet, it may end up restarting
the slot sync worker. In such a case, the worker exits if 'stopSignaled'
is set.
Since there is a chance that the user (or any of his scripts/tools)
may execute the SQL function pg_sync_replication_slots() in parallel to
promotion, such handling is needed in this SQL function as well. The
attached patch attempts to implement the same. Changes are:
1) If pg_sync_replication_slots() is already running when the
promotion is triggered, ShutDownSlotSync() checks the
'SlotSyncCtx->syncing' flag as well and waits for it to become false
i.e. waits till parallel running SQL function is finished.
2) If pg_sync_replication_slots() is invoked when promotion is
already in progress, pg_sync_replication_slots() respects the
'stopSignaled' flag set by the startup process and becomes a no-op.
thanks
Shveta
Attachments:
v1-0001-Handle-stopSignaled-during-sync-function-call.patch
From 61d17c40775bd6b3fe5f92a89aa896930068b5dd Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Thu, 4 Apr 2024 15:58:28 +0530
Subject: [PATCH v1] Handle stopSignaled during sync function call.
During promotion, startup process shuts down slot sync
worker and sets 'stopSignaled'. Meanwhile, if the slot sync worker
is restarted by postmaster, the worker exits if 'stopSignaled'
is set.
This handling was missing in slot sync SQL function pg_sync_replication_slots().
This patch adds the same handling for this function call. Changes are:
1) ShutDownSlotSync() now checks 'SlotSyncCtx->syncing' flag as well and
waits for it to become false i.e. waits till parallel running SQL
function is finished.
2) On the other hand, pg_sync_replication_slots() respects the 'stopSignaled'
flag and becomes a no-op if executed by the user in parallel to promotion.
---
src/backend/replication/logical/slotsync.c | 37 +++++++++++++++++++---
1 file changed, 32 insertions(+), 5 deletions(-)
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 9ac847b780..16b200b895 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -1357,6 +1357,10 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
/*
* Shut down the slot sync worker.
+ *
+ * It sends signal to shutdown slot sync worker. It also waits till
+ * the slot sync worker has exited and pg_sync_replication_slots()
+ * has finished.
*/
void
ShutDownSlotSync(void)
@@ -1365,16 +1369,21 @@ ShutDownSlotSync(void)
SlotSyncCtx->stopSignaled = true;
- if (SlotSyncCtx->pid == InvalidPid)
+ /*
+ * Return if neither the slot sync worker is running nor the function
+ * pg_sync_replication_slots().
+ */
+ if ((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing)
{
SpinLockRelease(&SlotSyncCtx->mutex);
return;
}
SpinLockRelease(&SlotSyncCtx->mutex);
- kill(SlotSyncCtx->pid, SIGINT);
+ if (SlotSyncCtx->pid != InvalidPid)
+ kill(SlotSyncCtx->pid, SIGINT);
- /* Wait for it to die */
+ /* Wait for slot sync to exit */
for (;;)
{
int rc;
@@ -1392,8 +1401,11 @@ ShutDownSlotSync(void)
SpinLockAcquire(&SlotSyncCtx->mutex);
- /* Is it gone? */
- if (SlotSyncCtx->pid == InvalidPid)
+ /*
+ * Confirm that both the worker and the function
+ * pg_sync_replication_slots() are done.
+ */
+ if ((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing)
break;
SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1500,6 +1512,21 @@ slotsync_failure_callback(int code, Datum arg)
void
SyncReplicationSlots(WalReceiverConn *wrconn)
{
+ /*
+ * Startup process signaled the slot sync to stop, so if meanwhile user
+ * has initiated slot sync function call, be no-op.
+ */
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ if (SlotSyncCtx->stopSignaled)
+ {
+ ereport(LOG,
+ errmsg("skipping slot synchronization as slot sync shutdown is signaled during promotion"));
+
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ return;
+ }
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
validate_remote_info(wrconn);
--
2.34.1
On Thu, Apr 4, 2024 at 5:05 PM shveta malik <shveta.malik@gmail.com> wrote:
Hello hackers,
Currently, promotion related handling is missing in the slot sync SQL
function pg_sync_replication_slots(). Here is the background on how
it is done in slot sync worker:
During promotion, the startup process in order to shut down the
slot-sync worker, sets the 'stopSignaled' flag, sends the shut-down
signal, and waits for slot sync worker to exit. Meanwhile if the
postmaster has not noticed the promotion yet, it may end up restarting
slot sync worker. In such a case, the worker exits if 'stopSignaled'
is set.

Since there is a chance that the user (or any of his scripts/tools)
may execute SQL function pg_sync_replication_slots() in parallel to
promotion, such handling is needed in this SQL function as well. The
attached patch attempts to implement the same.
Thanks for the report and patch. I'll look into it.
--
With Regards,
Amit Kapila.
On Thu, Apr 4, 2024 at 5:17 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Thu, Apr 4, 2024 at 5:05 PM shveta malik <shveta.malik@gmail.com> wrote:
Hello hackers,
Currently, promotion related handling is missing in the slot sync SQL
function pg_sync_replication_slots(). Here is the background on how
it is done in slot sync worker:
During promotion, the startup process in order to shut down the
slot-sync worker, sets the 'stopSignaled' flag, sends the shut-down
signal, and waits for slot sync worker to exit. Meanwhile if the
postmaster has not noticed the promotion yet, it may end up restarting
slot sync worker. In such a case, the worker exits if 'stopSignaled'
is set.

Since there is a chance that the user (or any of his scripts/tools)
may execute SQL function pg_sync_replication_slots() in parallel to
promotion, such handling is needed in this SQL function as well. The
attached patch attempts to implement the same.

Thanks for the report and patch. I'll look into it.
Please find v2. Changes are:
1) Rebased the patch as there was a conflict due to the recent commit 6f132ed.
2) Added an Assert in update_synced_slots_inactive_since() to ensure
that the slot does not have active_pid.
3) Improved commit msg and comments.
thanks
Shveta
Attachments:
v2-0001-Handle-stopSignaled-during-sync-function-call.patch
From 29b4a4f2301184665710fbd09fcdb077fd419125 Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Fri, 5 Apr 2024 10:16:03 +0530
Subject: [PATCH v2] Handle stopSignaled during sync function call.
Currently, promotion related handling is missing in the slot sync SQL
function pg_sync_replication_slots(). Here is the background on how
it is done in slot sync worker:
During promotion, the startup process in order to shut down the
slot-sync worker, sets the 'stopSignaled' flag, sends the shut-down
signal, and waits for slot sync worker to exit. Meanwhile if the
postmaster has not noticed the promotion yet, it may end up restarting
slot sync worker. In such a case, the worker exits if 'stopSignaled'
is set.
Since there is a chance that the user (or any of his scripts/tools)
may execute SQL function pg_sync_replication_slots() in parallel to
promotion, such handling is needed in this SQL function as well. The
attached patch attempts to implement the same. Changes are:
1) If pg_sync_replication_slots() is already running when the
promotion is triggered, ShutDownSlotSync() checks the
'SlotSyncCtx->syncing' flag as well and waits for it to become false
i.e. waits till parallel running SQL function is finished.
2) If pg_sync_replication_slots() is invoked when promotion is
already in progress, pg_sync_replication_slots() respects the
'stopSignaled' flag set by the startup process and becomes a no-op.
---
src/backend/replication/logical/slotsync.c | 38 +++++++++++++++++++---
1 file changed, 33 insertions(+), 5 deletions(-)
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index d18e2c7342..0064578e9c 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -1395,6 +1395,7 @@ update_synced_slots_inactive_since(void)
if (s->in_use && s->data.synced)
{
Assert(SlotIsLogical(s));
+ Assert(s->active_pid == 0);
/* Use the same inactive_since time for all the slots. */
if (now == 0)
@@ -1411,6 +1412,10 @@ update_synced_slots_inactive_since(void)
/*
* Shut down the slot sync worker.
+ *
+ * It sends signal to shutdown slot sync worker. It also waits till
+ * the slot sync worker has exited and pg_sync_replication_slots()
+ * has finished.
*/
void
ShutDownSlotSync(void)
@@ -1419,7 +1424,11 @@ ShutDownSlotSync(void)
SlotSyncCtx->stopSignaled = true;
- if (SlotSyncCtx->pid == InvalidPid)
+ /*
+ * Return if neither the slot sync worker is running nor the function
+ * pg_sync_replication_slots() is executing.
+ */
+ if ((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing)
{
SpinLockRelease(&SlotSyncCtx->mutex);
update_synced_slots_inactive_since();
@@ -1427,9 +1436,10 @@ ShutDownSlotSync(void)
}
SpinLockRelease(&SlotSyncCtx->mutex);
- kill(SlotSyncCtx->pid, SIGINT);
+ if (SlotSyncCtx->pid != InvalidPid)
+ kill(SlotSyncCtx->pid, SIGINT);
- /* Wait for it to die */
+ /* Wait for worker to exit and SQL function to finish */
for (;;)
{
int rc;
@@ -1447,8 +1457,11 @@ ShutDownSlotSync(void)
SpinLockAcquire(&SlotSyncCtx->mutex);
- /* Is it gone? */
- if (SlotSyncCtx->pid == InvalidPid)
+ /*
+ * Confirm that both the worker and the function
+ * pg_sync_replication_slots() are done.
+ */
+ if ((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing)
break;
SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1557,6 +1570,21 @@ slotsync_failure_callback(int code, Datum arg)
void
SyncReplicationSlots(WalReceiverConn *wrconn)
{
+ /*
+ * Startup process signaled the slot sync to stop, so if meanwhile user
+ * has invoked slot sync SQL function, simply return.
+ */
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ if (SlotSyncCtx->stopSignaled)
+ {
+ ereport(LOG,
+ errmsg("skipping slot synchronization as slot sync shutdown is signaled during promotion"));
+
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ return;
+ }
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
validate_remote_info(wrconn);
--
2.34.1
On Fri, Apr 5, 2024 at 10:31 AM shveta malik <shveta.malik@gmail.com> wrote:
Please find v2. Changes are:
1) Rebased the patch as there was conflict due to recent commit 6f132ed.
2) Added an Assert in update_synced_slots_inactive_since() to ensure
that the slot does not have active_pid.
3) Improved commit msg and comments.
Few comments:
==============
1.
void
SyncReplicationSlots(WalReceiverConn *wrconn)
{
+ /*
+ * Startup process signaled the slot sync to stop, so if meanwhile user
+ * has invoked slot sync SQL function, simply return.
+ */
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ if (SlotSyncCtx->stopSignaled)
+ {
+ ereport(LOG,
+ errmsg("skipping slot synchronization as slot sync shutdown is
signaled during promotion"));
+
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ return;
+ }
+ SpinLockRelease(&SlotSyncCtx->mutex);
There is a race condition with this code. Say during promotion
ShutDownSlotSync() is just before setting this flag and the user has
invoked pg_sync_replication_slots() and passed this check but still
didn't set the SlotSyncCtx->syncing flag. So, now, the promotion would
recognize that there is slot sync going on in parallel, and slot sync
wouldn't know that the promotion is in progress.
The current coding for the slot sync worker ensures there is no such race
by checking 'stopSignaled' and setting the pid together under the
spinlock. I think we need to move the setting of the 'syncing' flag
similarly (a rough sketch follows after these comments). Once we do
that, probably checking SlotSyncCtx->syncing should be sufficient in
ShutDownSlotSync(). If we change the location of setting the 'syncing'
flag, then please ensure its cleanup as we currently do in
slotsync_failure_callback().
2.
@@ -1395,6 +1395,7 @@ update_synced_slots_inactive_since(void)
if (s->in_use && s->data.synced)
{
Assert(SlotIsLogical(s));
+ Assert(s->active_pid == 0);
We can add a comment like: "The slot must not be acquired by any process"
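For reference, a rough sketch (not from any posted patch) of what comment 1
suggests for the SQL function path, reusing the flag names from v2: check
'stopSignaled' and set 'syncing' under a single spinlock acquisition, the same
way the worker checks 'stopSignaled' and sets its pid. The error messages here
are only illustrative.

SpinLockAcquire(&SlotSyncCtx->mutex);

/* Bail out if promotion has already signaled slot sync to stop. */
if (SlotSyncCtx->stopSignaled)
{
    SpinLockRelease(&SlotSyncCtx->mutex);
    ereport(ERROR,
            errmsg("cannot synchronize replication slots, promotion in progress"));
}

/* Disallow a concurrent sync. */
if (SlotSyncCtx->syncing)
{
    SpinLockRelease(&SlotSyncCtx->mutex);
    ereport(ERROR,
            errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
            errmsg("cannot synchronize replication slots concurrently"));
}

/* From this point on, promotion has to wait for us via the 'syncing' flag. */
SlotSyncCtx->syncing = true;
SpinLockRelease(&SlotSyncCtx->mutex);

syncing_slots = true;

With the check and the flag update done atomically, ShutDownSlotSync() can only
ever observe 'syncing' set after the 'stopSignaled' check has passed, which
closes the window described in comment 1.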
--
With Regards,
Amit Kapila.
On Fri, Apr 5, 2024 at 10:31 AM shveta malik <shveta.malik@gmail.com> wrote:
Please find v2. Changes are:
Thanks for the patch. Here are some comments.
1. Can we have a clear saying in the shmem variable who's syncing at
the moment? Is it a slot sync worker or a backend via SQL function?
Perhaps turn "bool syncing;" to "SlotSyncSource sync_source;"
typedef enum SlotSyncSource
{
SLOT_SYNC_NONE,
SLOT_SYNC_WORKER,
SLOT_SYNC_BACKEND,
} SlotSyncSource;
Then, the check in ShutDownSlotSync can be:
+ /*
+ * Return if neither the slot sync worker is running nor the function
+ * pg_sync_replication_slots() is executing.
+ */
+ if ((SlotSyncCtx->pid == InvalidPid) &&
SlotSyncCtx->sync_source != SLOT_SYNC_BACKEND)
{
2.
SyncReplicationSlots(WalReceiverConn *wrconn)
{
+ /*
+ * Startup process signaled the slot sync to stop, so if meanwhile user
+ * has invoked slot sync SQL function, simply return.
+ */
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ if (SlotSyncCtx->stopSignaled)
+ {
+ ereport(LOG,
+ errmsg("skipping slot synchronization as slot sync
shutdown is signaled during promotion"));
+
Unless I'm missing something, I think this can't detect if the backend
via SQL function is already half-way through syncing in
synchronize_one_slot. So, better move this check to (or also have it
there) slot sync loop that calls synchronize_one_slot. To avoid
spinlock acquisitions, we can perhaps do this check in when we acquire
the spinlock for synced flag.
/* Search for the named slot */
if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
{
bool synced;
SpinLockAcquire(&slot->mutex);
synced = slot->data.synced;
<< get SlotSyncCtx->stopSignaled here >>
SpinLockRelease(&slot->mutex);
<< do the slot sync skip check here if (stopSignaled) >>
3. Can we have a test or steps at least to check the consequences
manually one can get if slot syncing via SQL function is happening
during the promotion?
IIUC, we need to ensure there is no backend acquiring it and
performing sync while the slot sync worker is shutting down/standby
promotion is occurring. Otherwise, some of the slots can get resynced
and some are not while we are shutting down the slot sync worker as
part of the standby promotion which might leave the slots in an
inconsistent state.
--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
On Sat, Apr 6, 2024 at 11:49 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Fri, Apr 5, 2024 at 10:31 AM shveta malik <shveta.malik@gmail.com> wrote:
Please find v2. Changes are:
1) Rebased the patch as there was conflict due to recent commit 6f132ed.
2) Added an Assert in update_synced_slots_inactive_since() to ensure
that the slot does not have active_pid.
3) Improved commit msg and comments.

Few comments:
==============
1.
void
SyncReplicationSlots(WalReceiverConn *wrconn)
{
+ /*
+ * Startup process signaled the slot sync to stop, so if meanwhile user
+ * has invoked slot sync SQL function, simply return.
+ */
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ if (SlotSyncCtx->stopSignaled)
+ {
+ ereport(LOG,
+ errmsg("skipping slot synchronization as slot sync shutdown is signaled during promotion"));
+
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ return;
+ }
+ SpinLockRelease(&SlotSyncCtx->mutex);

There is a race condition with this code. Say during promotion
ShutDownSlotSync() is just before setting this flag and the user has
invoked pg_sync_replication_slots() and passed this check but still
didn't set the SlotSyncCtx->syncing flag. So, now, the promotion would
recognize that there is slot sync going on in parallel, and slot sync
wouldn't know that the promotion is in progress.
Did you mean that now, the promotion *would not* recognize...
I see, I will fix this.
The current coding for slot sync worker ensures there is no such race
by checking the stopSignaled and setting the pid together under
spinlock. I think we need to move the setting of the syncing flag
similarly. Once we do that probably checking SlotSyncCtx->syncing
should be sufficient in ShutDownSlotSync(). If we change the location
of setting the 'syncing' flag then please ensure its cleanup as we
currently do in slotsync_failure_callback().
Sure, let me review.
2.
@@ -1395,6 +1395,7 @@ update_synced_slots_inactive_since(void)
if (s->in_use && s->data.synced)
{
Assert(SlotIsLogical(s));
+ Assert(s->active_pid == 0);

We can add a comment like: "The slot must not be acquired by any process"
--
With Regards,
Amit Kapila.
On Sat, Apr 6, 2024 at 12:25 PM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:
On Fri, Apr 5, 2024 at 10:31 AM shveta malik <shveta.malik@gmail.com> wrote:
Please find v2. Changes are:
Thanks for the patch. Here are some comments.
Thanks for reviewing.
1. Can we have a clear saying in the shmem variable who's syncing at
the moment? Is it a slot sync worker or a backend via SQL function?
Perhaps turn "bool syncing;" to "SlotSyncSource sync_source;"typedef enum SlotSyncSource
{
SLOT_SYNC_NONE,
SLOT_SYNC_WORKER,
SLOT_SYNC_BACKEND,
} SlotSyncSource;

Then, the check in ShutDownSlotSync can be:

+ /*
+ * Return if neither the slot sync worker is running nor the function
+ * pg_sync_replication_slots() is executing.
+ */
+ if ((SlotSyncCtx->pid == InvalidPid) &&
    SlotSyncCtx->sync_source != SLOT_SYNC_BACKEND)
{

2.
SyncReplicationSlots(WalReceiverConn *wrconn)
{
+ /*
+ * Startup process signaled the slot sync to stop, so if meanwhile user
+ * has invoked slot sync SQL function, simply return.
+ */
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ if (SlotSyncCtx->stopSignaled)
+ {
+ ereport(LOG,
+ errmsg("skipping slot synchronization as slot sync shutdown is signaled during promotion"));
+

Unless I'm missing something, I think this can't detect if the backend
via SQL function is already half-way through syncing in
synchronize_one_slot. So, better move this check to (or also have it
there) slot sync loop that calls synchronize_one_slot. To avoid
spinlock acquisitions, we can perhaps do this check in when we acquire
the spinlock for synced flag.
If the sync via SQL function is already half-way, then promotion
should wait for it to finish. I don't think it is a good idea to move
the check to synchronize_one_slot(). The sync-call should either not
start (if it noticed the promotion) or finish the sync and then let
promotion proceed. But I would like to know others' opinion on this.
/* Search for the named slot */
if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
{
bool synced;

SpinLockAcquire(&slot->mutex);
synced = slot->data.synced;
<< get SlotSyncCtx->stopSignaled here >>
SpinLockRelease(&slot->mutex);

<< do the slot sync skip check here if (stopSignaled) >>
3. Can we have a test or steps at least to check the consequences
manually one can get if slot syncing via SQL function is happening
during the promotion?

IIUC, we need to ensure there is no backend acquiring it and
performing sync while the slot sync worker is shutting down/standby
promotion is occurring. Otherwise, some of the slots can get resynced
and some are not while we are shutting down the slot sync worker as
part of the standby promotion which might leave the slots in an
inconsistent state.
I do not think that we can reach a state (except in some error
scenario) where some of the slots are synced while the rest are not
during a *particular* sync-cycle only because promotion is going in
parallel. (And yes we need to fix the race-condition stated by Amit
up-thread for this statement to be true.)
thanks
Shveta
On Fri, Apr 12, 2024 at 7:47 AM shveta malik <shveta.malik@gmail.com> wrote:
On Sat, Apr 6, 2024 at 11:49 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
Few comments:
==============
1.
void
SyncReplicationSlots(WalReceiverConn *wrconn)
{
+ /*
+ * Startup process signaled the slot sync to stop, so if meanwhile user
+ * has invoked slot sync SQL function, simply return.
+ */
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ if (SlotSyncCtx->stopSignaled)
+ {
+ ereport(LOG,
+ errmsg("skipping slot synchronization as slot sync shutdown is signaled during promotion"));
+
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ return;
+ }
+ SpinLockRelease(&SlotSyncCtx->mutex);

There is a race condition with this code. Say during promotion
ShutDownSlotSync() is just before setting this flag and the user has
invoked pg_sync_replication_slots() and passed this check but still
didn't set the SlotSyncCtx->syncing flag. So, now, the promotion would
recognize that there is slot sync going on in parallel, and slot sync
wouldn't know that the promotion is in progress.

Did you mean that now, the promotion *would not* recognize...
Right.
I see, I will fix this.
Thanks.
--
With Regards,
Amit Kapila.
On Fri, Apr 12, 2024 at 7:57 AM shveta malik <shveta.malik@gmail.com> wrote:
On Sat, Apr 6, 2024 at 12:25 PM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:
On Fri, Apr 5, 2024 at 10:31 AM shveta malik <shveta.malik@gmail.com> wrote:
Please find v2. Changes are:
Thanks for the patch. Here are some comments.
Thanks for reviewing.
1. Can we have a clear saying in the shmem variable who's syncing at
the moment? Is it a slot sync worker or a backend via SQL function?
Perhaps turn "bool syncing;" to "SlotSyncSource sync_source;"typedef enum SlotSyncSource
{
SLOT_SYNC_NONE,
SLOT_SYNC_WORKER,
SLOT_SYNC_BACKEND,
} SlotSyncSource;

Then, the check in ShutDownSlotSync can be:

+ /*
+ * Return if neither the slot sync worker is running nor the function
+ * pg_sync_replication_slots() is executing.
+ */
+ if ((SlotSyncCtx->pid == InvalidPid) &&
    SlotSyncCtx->sync_source != SLOT_SYNC_BACKEND)
{

I don't know if this will help, especially after fixing the race
condition I mentioned. But otherwise, also, at this stage it doesn't
seem helpful to add the source of sync explicitly.
2.
SyncReplicationSlots(WalReceiverConn *wrconn)
{
+ /*
+ * Startup process signaled the slot sync to stop, so if meanwhile user
+ * has invoked slot sync SQL function, simply return.
+ */
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ if (SlotSyncCtx->stopSignaled)
+ {
+ ereport(LOG,
+ errmsg("skipping slot synchronization as slot sync shutdown is signaled during promotion"));
+

Unless I'm missing something, I think this can't detect if the backend
via SQL function is already half-way through syncing in
synchronize_one_slot. So, better move this check to (or also have it
there) slot sync loop that calls synchronize_one_slot. To avoid
spinlock acquisitions, we can perhaps do this check in when we acquire
the spinlock for synced flag.

If the sync via SQL function is already half-way, then promotion
should wait for it to finish. I don't think it is a good idea to move
the check to synchronize_one_slot(). The sync-call should either not
start (if it noticed the promotion) or finish the sync and then let
promotion proceed. But I would like to know others' opinion on this.
Agreed.
--
With Regards,
Amit Kapila.
On Fri, Apr 12, 2024 at 4:18 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Fri, Apr 12, 2024 at 7:57 AM shveta malik <shveta.malik@gmail.com> wrote:
On Sat, Apr 6, 2024 at 12:25 PM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:
On Fri, Apr 5, 2024 at 10:31 AM shveta malik <shveta.malik@gmail.com> wrote:
Please find v2. Changes are:
Thanks for the patch. Here are some comments.
Thanks for reviewing.
1. Can we have a clear saying in the shmem variable who's syncing at
the moment? Is it a slot sync worker or a backend via SQL function?
Perhaps turn "bool syncing;" to "SlotSyncSource sync_source;"typedef enum SlotSyncSource
{
SLOT_SYNC_NONE,
SLOT_SYNC_WORKER,
SLOT_SYNC_BACKEND,
} SlotSyncSource;

Then, the check in ShutDownSlotSync can be:

+ /*
+ * Return if neither the slot sync worker is running nor the function
+ * pg_sync_replication_slots() is executing.
+ */
+ if ((SlotSyncCtx->pid == InvalidPid) &&
    SlotSyncCtx->sync_source != SLOT_SYNC_BACKEND)
{

I don't know if this will help, especially after fixing the race
condition I mentioned. But otherwise, also, at this stage it doesn't
seem helpful to add the source of sync explicitly.
Agreed.
Please find v3 addressing race-condition and one other comment.
Up-thread it was suggested that, probably, checking
SlotSyncCtx->syncing should be sufficient in ShutDownSlotSync(). On
re-thinking, it might not be. Slot sync worker sets and resets
'syncing' with each sync-cycle, and thus we need to rely on worker's
pid in ShutDownSlotSync(), as there could be a window where promotion
is triggered and 'syncing' is not set for worker, while the worker is
still running. This implementation of setting and resetting syncing
with each sync-cycle looks better as compared to setting syncing
during the entire life-cycle of the worker. So, I did not change it.
To fix the race condition, I moved the setting of the 'syncing' flag
together with the 'stopSignaled' check under the same spinLock for the
SQL function. OTOH, for worker, I feel it is good to check
'stopSignaled' at the beginning itself, while retaining the
setting/resetting of 'syncing' at a later stage during the actual sync
cycle. This makes handling for SQL function and worker slightly
different. And thus to achieve this, I had to take the 'syncing' flag
handling out of synchronize_slots() and move it to both worker and SQL
function by introducing 2 new functions check_and_set_syncing_flag()
and reset_syncing_flag().
I am analyzing if there are better ways to achieve this, any
suggestions are welcome.
thanks
Shveta
Attachments:
v3-0001-Handle-stopSignaled-during-sync-function-call.patch
From e3669efdf973a2c57c9eaba4b499d15fbb3116b8 Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Fri, 5 Apr 2024 10:16:03 +0530
Subject: [PATCH v3] Handle stopSignaled during sync function call.
Currently, promotion related handling is missing in the slot sync SQL
function pg_sync_replication_slots(). Here is the background on how
it is done in slot sync worker:
During promotion, the startup process in order to shut down the
slot-sync worker, sets the 'stopSignaled' flag, sends the shut-down
signal, and waits for slot sync worker to exit. Meanwhile if the
postmaster has not noticed the promotion yet, it may end up restarting
slot sync worker. In such a case, the worker exits if 'stopSignaled'
is set.
Since there is a chance that the user (or any of his scripts/tools)
may execute SQL function pg_sync_replication_slots() in parallel to
promotion, such handling is needed in this SQL function as well. The
attached patch attempts to implement the same. Changes are:
1) If pg_sync_replication_slots() is already running when the
promotion is triggered, ShutDownSlotSync() checks the
'SlotSyncCtx->syncing' flag as well and waits for it to become false
i.e. waits till parallel running SQL function is finished.
2) If pg_sync_replication_slots() is invoked when promotion is
already in progress, pg_sync_replication_slots() respects the
'stopSignaled' flag set by the startup process and becomes a no-op.
---
src/backend/replication/logical/slotsync.c | 120 +++++++++++++++------
1 file changed, 90 insertions(+), 30 deletions(-)
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index bda0de52db..aade737b73 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -807,20 +807,6 @@ synchronize_slots(WalReceiverConn *wrconn)
" FROM pg_catalog.pg_replication_slots"
" WHERE failover and NOT temporary";
- SpinLockAcquire(&SlotSyncCtx->mutex);
- if (SlotSyncCtx->syncing)
- {
- SpinLockRelease(&SlotSyncCtx->mutex);
- ereport(ERROR,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot synchronize replication slots concurrently"));
- }
-
- SlotSyncCtx->syncing = true;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = true;
-
/* The syscache access in walrcv_exec() needs a transaction env. */
if (!IsTransactionState())
{
@@ -937,12 +923,6 @@ synchronize_slots(WalReceiverConn *wrconn)
if (started_tx)
CommitTransactionCommand();
- SpinLockAcquire(&SlotSyncCtx->mutex);
- SlotSyncCtx->syncing = false;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = false;
-
return some_slot_updated;
}
@@ -1242,6 +1222,47 @@ wait_for_slot_activity(bool some_slot_updated)
ResetLatch(MyLatch);
}
+/*
+ * Check syncing flag and error out if it is concurrent sync call.
+ * Otherwise, set syncing flag.
+ */
+static void
+check_and_set_syncing_flag(bool acquire_lock)
+{
+
+ /* Acquire spinLock if caller asked so */
+ if (acquire_lock)
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+
+ if (SlotSyncCtx->syncing)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot synchronize replication slots concurrently"));
+ }
+
+ SlotSyncCtx->syncing = true;
+
+ if (acquire_lock)
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = true;
+}
+
+/*
+ * Reset syncing flag.
+ */
+static void
+reset_syncing_flag()
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ SlotSyncCtx->syncing = false;
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = false;
+};
+
/*
* The main loop of our worker process.
*
@@ -1424,8 +1445,12 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
ProcessSlotSyncInterrupts(wrconn);
+ check_and_set_syncing_flag(true);
+
some_slot_updated = synchronize_slots(wrconn);
+ reset_syncing_flag();
+
wait_for_slot_activity(some_slot_updated);
}
@@ -1471,6 +1496,9 @@ update_synced_slots_inactive_since(void)
{
Assert(SlotIsLogical(s));
+ /* The slot must not be acquired by any process */
+ Assert(s->active_pid == 0);
+
/* Use the same inactive_since time for all the slots. */
if (now == 0)
now = GetCurrentTimestamp();
@@ -1486,6 +1514,10 @@ update_synced_slots_inactive_since(void)
/*
* Shut down the slot sync worker.
+ *
+ * It sends signal to shutdown slot sync worker. It also waits till
+ * the slot sync worker has exited and pg_sync_replication_slots()
+ * has finished.
*/
void
ShutDownSlotSync(void)
@@ -1494,7 +1526,11 @@ ShutDownSlotSync(void)
SlotSyncCtx->stopSignaled = true;
- if (SlotSyncCtx->pid == InvalidPid)
+ /*
+ * Return if neither the slot sync worker is running nor the function
+ * pg_sync_replication_slots() is executing.
+ */
+ if ((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing)
{
SpinLockRelease(&SlotSyncCtx->mutex);
update_synced_slots_inactive_since();
@@ -1502,9 +1538,10 @@ ShutDownSlotSync(void)
}
SpinLockRelease(&SlotSyncCtx->mutex);
- kill(SlotSyncCtx->pid, SIGINT);
+ if (SlotSyncCtx->pid != InvalidPid)
+ kill(SlotSyncCtx->pid, SIGINT);
- /* Wait for it to die */
+ /* Wait for worker to exit and SQL function to finish */
for (;;)
{
int rc;
@@ -1522,8 +1559,11 @@ ShutDownSlotSync(void)
SpinLockAcquire(&SlotSyncCtx->mutex);
- /* Is it gone? */
- if (SlotSyncCtx->pid == InvalidPid)
+ /*
+ * Confirm that both the worker and the function
+ * pg_sync_replication_slots() are done.
+ */
+ if ((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing)
break;
SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1615,11 +1655,7 @@ slotsync_failure_callback(int code, Datum arg)
* without resetting the flag. So, we need to clean up shared memory
* and reset the flag here.
*/
- SpinLockAcquire(&SlotSyncCtx->mutex);
- SlotSyncCtx->syncing = false;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = false;
+ reset_syncing_flag();
}
walrcv_disconnect(wrconn);
@@ -1634,9 +1670,33 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
{
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+
+ /*
+ * Startup process signaled the slot sync to stop, so if meanwhile
+ * user has invoked slot sync SQL function, error out.
+ */
+ if (SlotSyncCtx->stopSignaled)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ ereport(ERROR,
+ errmsg("promotion in progress, can not synchronize replication slots"));
+ }
+
+ /*
+ * Advertise that we are syncing, so that the startup process knows
+ * about this sync call during promotion.
+ */
+ check_and_set_syncing_flag(false);
+
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
validate_remote_info(wrconn);
synchronize_slots(wrconn);
+
+ /* We are done with sync, so reset sync flag */
+ reset_syncing_flag();
}
PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
}
--
2.34.1
On Fri, Apr 12, 2024 at 5:25 PM shveta malik <shveta.malik@gmail.com> wrote:
Please find v3 addressing race-condition and one other comment.
Up-thread it was suggested that, probably, checking
SlotSyncCtx->syncing should be sufficient in ShutDownSlotSync(). On
re-thinking, it might not be. Slot sync worker sets and resets
'syncing' with each sync-cycle, and thus we need to rely on worker's
pid in ShutDownSlotSync(), as there could be a window where promotion
is triggered and 'syncing' is not set for worker, while the worker is
still running. This implementation of setting and resetting syncing
with each sync-cycle looks better as compared to setting syncing
during the entire life-cycle of the worker. So, I did not change it.
To retain this we need to have different handling for 'syncing' for
workers and function which seems like more maintenance burden than the
value it provides. Moreover, in SyncReplicationSlots(), we are calling
a function after acquiring spinlock which is not our usual coding
practice.
One minor comment:
* All the fields except 'syncing' are used only by slotsync worker.
* 'syncing' is used both by worker and SQL function pg_sync_replication_slots.
*/
typedef struct SlotSyncCtxStruct
{
pid_t pid;
bool stopSignaled;
bool syncing;
time_t last_start_time;
slock_t mutex;
} SlotSyncCtxStruct;
I feel the above comment is no longer valid after this patch. We can
probably remove this altogether.
--
With Regards,
Amit Kapila.
On Mon, Apr 15, 2024 at 2:29 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Fri, Apr 12, 2024 at 5:25 PM shveta malik <shveta.malik@gmail.com> wrote:
Please find v3 addressing race-condition and one other comment.
Up-thread it was suggested that, probably, checking
SlotSyncCtx->syncing should be sufficient in ShutDownSlotSync(). On
re-thinking, it might not be. Slot sync worker sets and resets
'syncing' with each sync-cycle, and thus we need to rely on worker's
pid in ShutDownSlotSync(), as there could be a window where promotion
is triggered and 'syncing' is not set for worker, while the worker is
still running. This implementation of setting and resetting syncing
with each sync-cycle looks better as compared to setting syncing
during the entire life-cycle of the worker. So, I did not change it.

To retain this we need to have different handling for 'syncing' for
workers and function which seems like more maintenance burden than the
value it provides. Moreover, in SyncReplicationSlots(), we are calling
a function after acquiring spinlock which is not our usual coding
practice.
Okay. Changed it to consistent handling. Now both worker and SQL
function set 'syncing' when they start and reset it when they exit.
One minor comment:
* All the fields except 'syncing' are used only by slotsync worker.
* 'syncing' is used both by worker and SQL function pg_sync_replication_slots.
*/
typedef struct SlotSyncCtxStruct
{
pid_t pid;
bool stopSignaled;
bool syncing;
time_t last_start_time;
slock_t mutex;
} SlotSyncCtxStruct;

I feel the above comment is no longer valid after this patch. We can
probably remove this altogether.
Yes, changed.
Please find v4 addressing the above comments.
thanks
Shveta
Attachments:
v4-0001-Handle-stopSignaled-during-sync-function-call.patch
From 2c0433119820216708a5076f2ba90c1436f782df Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Fri, 5 Apr 2024 10:16:03 +0530
Subject: [PATCH v4] Handle stopSignaled during sync function call.
Currently, promotion related handling is missing in the slot sync SQL
function pg_sync_replication_slots(). Here is the background on how
it is done in slot sync worker:
During promotion, the startup process in order to shut down the
slot-sync worker, sets the 'stopSignaled' flag, sends the shut-down
signal, and waits for slot sync worker to exit. Meanwhile if the
postmaster has not noticed the promotion yet, it may end up restarting
slot sync worker. In such a case, the worker exits if 'stopSignaled'
is set.
Since there is a chance that the user (or any of his scripts/tools)
may execute SQL function pg_sync_replication_slots() in parallel to
promotion, such handling is needed in this SQL function as well. The
attached patch attempts to implement the same. Changes are:
1) If pg_sync_replication_slots() is already running when the
promotion is triggered, ShutDownSlotSync() checks the
'SlotSyncCtx->syncing' flag as well and waits for it to become false
i.e. waits till parallel running SQL function is finished.
2) If pg_sync_replication_slots() is invoked when promotion is
already in progress, pg_sync_replication_slots() respects the
'stopSignaled' flag set by the startup process and becomes a no-op.
---
src/backend/replication/logical/slotsync.c | 196 +++++++++++++--------
1 file changed, 122 insertions(+), 74 deletions(-)
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index bda0de52db..a0e0c05d20 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -92,9 +92,6 @@
* is expected (e.g., slot sync GUCs change), slot sync worker will reset
* last_start_time before exiting, so that postmaster can start the worker
* without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
- *
- * All the fields except 'syncing' are used only by slotsync worker.
- * 'syncing' is used both by worker and SQL function pg_sync_replication_slots.
*/
typedef struct SlotSyncCtxStruct
{
@@ -807,20 +804,6 @@ synchronize_slots(WalReceiverConn *wrconn)
" FROM pg_catalog.pg_replication_slots"
" WHERE failover and NOT temporary";
- SpinLockAcquire(&SlotSyncCtx->mutex);
- if (SlotSyncCtx->syncing)
- {
- SpinLockRelease(&SlotSyncCtx->mutex);
- ereport(ERROR,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot synchronize replication slots concurrently"));
- }
-
- SlotSyncCtx->syncing = true;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = true;
-
/* The syscache access in walrcv_exec() needs a transaction env. */
if (!IsTransactionState())
{
@@ -937,12 +920,6 @@ synchronize_slots(WalReceiverConn *wrconn)
if (started_tx)
CommitTransactionCommand();
- SpinLockAcquire(&SlotSyncCtx->mutex);
- SlotSyncCtx->syncing = false;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = false;
-
return some_slot_updated;
}
@@ -1199,7 +1176,20 @@ static void
slotsync_worker_onexit(int code, Datum arg)
{
SpinLockAcquire(&SlotSyncCtx->mutex);
+
SlotSyncCtx->pid = InvalidPid;
+
+ /*
+ * If syncing_slots is true, it indicates that the process errored out
+ * without resetting the flag. So, we need to clean up shared memory and
+ * reset the flag here.
+ */
+ if (syncing_slots)
+ {
+ SlotSyncCtx->syncing = false;
+ syncing_slots = false;
+ }
+
SpinLockRelease(&SlotSyncCtx->mutex);
}
@@ -1242,6 +1232,63 @@ wait_for_slot_activity(bool some_slot_updated)
ResetLatch(MyLatch);
}
+/*
+ * Check stopSignaled and syncing flags. Emit error if promotion has
+ * already set stopSignaled or if it is concurrent sync call.
+ * Otherwise, set 'syncing' flag and pid info.
+ */
+static void
+check_flags_and_set_sync_info(pid_t worker_pid)
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+
+ /* If it is worker, check that we do not have pid set already */
+ if (worker_pid != InvalidPid)
+ Assert(SlotSyncCtx->pid == InvalidPid);
+
+ /*
+ * Startup process signaled the slot sync machinery to stop, so if meanwhile
+ * postmaster ended up starting the worker again or user has invoked
+ * pg_sync_replication_slots(), error out.
+ */
+ if (SlotSyncCtx->stopSignaled)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ ereport(ERROR,
+ errmsg("promotion in progress, can not synchronize replication slots"));
+ }
+
+ if (SlotSyncCtx->syncing)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot synchronize replication slots concurrently"));
+ }
+
+ /* Advertise our PID so that the startup process can kill us on promotion */
+ SlotSyncCtx->pid = worker_pid;
+
+ SlotSyncCtx->syncing = true;
+
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = true;
+}
+
+/*
+ * Reset syncing flag.
+ */
+static void
+reset_syncing_flag()
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ SlotSyncCtx->syncing = false;
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = false;
+};
+
/*
* The main loop of our worker process.
*
@@ -1278,47 +1325,6 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
Assert(SlotSyncCtx != NULL);
- SpinLockAcquire(&SlotSyncCtx->mutex);
- Assert(SlotSyncCtx->pid == InvalidPid);
-
- /*
- * Startup process signaled the slot sync worker to stop, so if meanwhile
- * postmaster ended up starting the worker again, exit.
- */
- if (SlotSyncCtx->stopSignaled)
- {
- SpinLockRelease(&SlotSyncCtx->mutex);
- proc_exit(0);
- }
-
- /* Advertise our PID so that the startup process can kill us on promotion */
- SlotSyncCtx->pid = MyProcPid;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- ereport(LOG, errmsg("slot sync worker started"));
-
- /* Register it as soon as SlotSyncCtx->pid is initialized. */
- before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
-
- /* Setup signal handling */
- pqsignal(SIGHUP, SignalHandlerForConfigReload);
- pqsignal(SIGINT, SignalHandlerForShutdownRequest);
- pqsignal(SIGTERM, die);
- pqsignal(SIGFPE, FloatExceptionHandler);
- pqsignal(SIGUSR1, procsignal_sigusr1_handler);
- pqsignal(SIGUSR2, SIG_IGN);
- pqsignal(SIGPIPE, SIG_IGN);
- pqsignal(SIGCHLD, SIG_DFL);
-
- /*
- * Establishes SIGALRM handler and initialize timeout module. It is needed
- * by InitPostgres to register different timeouts.
- */
- InitializeTimeouts();
-
- /* Load the libpq-specific functions */
- load_file("libpqwalreceiver", false);
-
/*
* If an exception is encountered, processing resumes here.
*
@@ -1350,6 +1356,32 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
/* We can now handle ereport(ERROR) */
PG_exception_stack = &local_sigjmp_buf;
+ check_flags_and_set_sync_info(MyProcPid);
+
+ ereport(LOG, errmsg("slot sync worker started"));
+
+ /* Register it as soon as SlotSyncCtx->pid is initialized. */
+ before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
+
+ /* Setup signal handling */
+ pqsignal(SIGHUP, SignalHandlerForConfigReload);
+ pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+ pqsignal(SIGTERM, die);
+ pqsignal(SIGFPE, FloatExceptionHandler);
+ pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+ pqsignal(SIGUSR2, SIG_IGN);
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGCHLD, SIG_DFL);
+
+ /*
+ * Establishes SIGALRM handler and initialize timeout module. It is needed
+ * by InitPostgres to register different timeouts.
+ */
+ InitializeTimeouts();
+
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+
/*
* Unblock signals (they were blocked when the postmaster forked us)
*/
@@ -1471,6 +1503,9 @@ update_synced_slots_inactive_since(void)
{
Assert(SlotIsLogical(s));
+ /* The slot must not be acquired by any process */
+ Assert(s->active_pid == 0);
+
/* Use the same inactive_since time for all the slots. */
if (now == 0)
now = GetCurrentTimestamp();
@@ -1486,6 +1521,10 @@ update_synced_slots_inactive_since(void)
/*
* Shut down the slot sync worker.
+ *
+ * It sends signal to shutdown slot sync worker. It also waits till
+ * the slot sync worker has exited and pg_sync_replication_slots()
+ * has finished.
*/
void
ShutDownSlotSync(void)
@@ -1494,7 +1533,11 @@ ShutDownSlotSync(void)
SlotSyncCtx->stopSignaled = true;
- if (SlotSyncCtx->pid == InvalidPid)
+ /*
+ * Return if neither the slot sync worker is running nor the function
+ * pg_sync_replication_slots() is executing.
+ */
+ if (!SlotSyncCtx->syncing)
{
SpinLockRelease(&SlotSyncCtx->mutex);
update_synced_slots_inactive_since();
@@ -1502,9 +1545,10 @@ ShutDownSlotSync(void)
}
SpinLockRelease(&SlotSyncCtx->mutex);
- kill(SlotSyncCtx->pid, SIGINT);
+ if (SlotSyncCtx->pid != InvalidPid)
+ kill(SlotSyncCtx->pid, SIGINT);
- /* Wait for it to die */
+ /* Wait for slot sync to end */
for (;;)
{
int rc;
@@ -1522,8 +1566,11 @@ ShutDownSlotSync(void)
SpinLockAcquire(&SlotSyncCtx->mutex);
- /* Is it gone? */
- if (SlotSyncCtx->pid == InvalidPid)
+ /*
+ * Confirm that both the worker and the function
+ * pg_sync_replication_slots() are done.
+ */
+ if (!SlotSyncCtx->syncing)
break;
SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1615,11 +1662,7 @@ slotsync_failure_callback(int code, Datum arg)
* without resetting the flag. So, we need to clean up shared memory
* and reset the flag here.
*/
- SpinLockAcquire(&SlotSyncCtx->mutex);
- SlotSyncCtx->syncing = false;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = false;
+ reset_syncing_flag();
}
walrcv_disconnect(wrconn);
@@ -1634,9 +1677,14 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
{
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
+ check_flags_and_set_sync_info(InvalidPid);
+
validate_remote_info(wrconn);
synchronize_slots(wrconn);
+
+ /* We are done with sync, so reset sync flag */
+ reset_syncing_flag();
}
PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
}
--
2.34.1
Hi,
On Mon, Apr 15, 2024 at 03:38:53PM +0530, shveta malik wrote:
On Mon, Apr 15, 2024 at 2:29 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Fri, Apr 12, 2024 at 5:25 PM shveta malik <shveta.malik@gmail.com> wrote:
Please find v3 addressing race-condition and one other comment.
Up-thread it was suggested that, probably, checking
SlotSyncCtx->syncing should be sufficient in ShutDownSlotSync(). On
re-thinking, it might not be. Slot sync worker sets and resets
'syncing' with each sync-cycle, and thus we need to rely on worker's
pid in ShutDownSlotSync(), as there could be a window where promotion
is triggered and 'syncing' is not set for worker, while the worker is
still running. This implementation of setting and resetting syncing
with each sync-cycle looks better as compared to setting syncing
during the entire life-cycle of the worker. So, I did not change it.

To retain this we need to have different handling for 'syncing' for
workers and function which seems like more maintenance burden than the
value it provides. Moreover, in SyncReplicationSlots(), we are calling
a function after acquiring spinlock which is not our usual coding
practice.

Okay. Changed it to consistent handling.
Thanks for the patch!
Now both worker and SQL
function set 'syncing' when they start and reset it when they exit.
It means that it's not possible anymore to trigger a manual sync if
sync_replication_slots is on. Indeed that would trigger:
postgres=# select pg_sync_replication_slots();
ERROR: cannot synchronize replication slots concurrently
That looks like an issue to me, thoughts?
Regards,
--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
On Mon, Apr 15, 2024 at 6:21 PM Bertrand Drouvot
<bertranddrouvot.pg@gmail.com> wrote:
On Mon, Apr 15, 2024 at 03:38:53PM +0530, shveta malik wrote:
On Mon, Apr 15, 2024 at 2:29 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Fri, Apr 12, 2024 at 5:25 PM shveta malik <shveta.malik@gmail.com> wrote:
Please find v3 addressing race-condition and one other comment.
Up-thread it was suggested that, probably, checking
SlotSyncCtx->syncing should be sufficient in ShutDownSlotSync(). On
re-thinking, it might not be. Slot sync worker sets and resets
'syncing' with each sync-cycle, and thus we need to rely on worker's
pid in ShutDownSlotSync(), as there could be a window where promotion
is triggered and 'syncing' is not set for worker, while the worker is
still running. This implementation of setting and resetting syncing
with each sync-cycle looks better as compared to setting syncing
during the entire life-cycle of the worker. So, I did not change it.

To retain this we need to have different handling for 'syncing' for
workers and function which seems like more maintenance burden than the
value it provides. Moreover, in SyncReplicationSlots(), we are calling
a function after acquiring spinlock which is not our usual coding
practice.

Okay. Changed it to consistent handling.
Thanks for the patch!
Now both worker and SQL
function set 'syncing' when they start and reset it when they exit.

It means that it's not possible anymore to trigger a manual sync if
sync_replication_slots is on. Indeed that would trigger:

postgres=# select pg_sync_replication_slots();
ERROR: cannot synchronize replication slots concurrently

That looks like an issue to me, thoughts?
This is intentional as of now for the sake of keeping
implementation/code simple. It is not difficult to allow them but I am
not sure whether we want to add another set of conditions allowing
them in parallel. And that too in an unpredictable way, as the API would
work only during the time the slot sync worker is not performing a sync.
--
With Regards,
Amit Kapila.
Hi,
On Mon, Apr 15, 2024 at 06:29:49PM +0530, Amit Kapila wrote:
On Mon, Apr 15, 2024 at 6:21 PM Bertrand Drouvot
<bertranddrouvot.pg@gmail.com> wrote:
On Mon, Apr 15, 2024 at 03:38:53PM +0530, shveta malik wrote:
Now both worker and SQL
function set 'syncing' when they start and reset it when they exit.

It means that it's not possible anymore to trigger a manual sync if
sync_replication_slots is on. Indeed that would trigger:

postgres=# select pg_sync_replication_slots();
ERROR: cannot synchronize replication slots concurrently

That looks like an issue to me, thoughts?
This is intentional as of now for the sake of keeping
implementation/code simple. It is not difficult to allow them but I am
not sure whether we want to add another set of conditions allowing
them in parallel.
I think that the ability to launch a manual sync before a switchover would be
missed. Except for this case, I don't think it's an issue to prevent them from
running in parallel.
And that too in an unpredictable way as the API will
work only for the time slot sync worker is not performing the sync.
Yeah but then at least you would know that there is "really" a sync in progress
(which is not the case currently with v4, as the sync worker being started is
enough to prevent a manual sync even if a sync is not in progress).
Regards,
--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
On Mon, Apr 15, 2024 at 7:47 PM Bertrand Drouvot
<bertranddrouvot.pg@gmail.com> wrote:
On Mon, Apr 15, 2024 at 06:29:49PM +0530, Amit Kapila wrote:
On Mon, Apr 15, 2024 at 6:21 PM Bertrand Drouvot
<bertranddrouvot.pg@gmail.com> wrote:
On Mon, Apr 15, 2024 at 03:38:53PM +0530, shveta malik wrote:
Now both worker and SQL
function set 'syncing' when they start and reset it when they exit.

It means that it's not possible anymore to trigger a manual sync if
sync_replication_slots is on. Indeed that would trigger:

postgres=# select pg_sync_replication_slots();
ERROR: cannot synchronize replication slots concurrently

That looks like an issue to me, thoughts?
This is intentional as of now for the sake of keeping
implementation/code simple. It is not difficult to allow them but I am
not sure whether we want to add another set of conditions allowing
them in parallel.I think that the ability to launch a manual sync before a switchover would be
missed. Except for this case I don't think that's an issue to prevent them to
run in parallel.
I think if the slotsync worker is available, it can do that as well.
There is no clear use case for allowing them in parallel and I feel it
would add more confusion when it can work sometimes but not other
times. However, if we receive some report from the field where there
is a real demand for such a thing, it should be easy to achieve. For
example, I can imagine that we can have sync_state that has values
'started', 'in_progress' , and 'finished'. This should allow us to
achieve what the current proposed patch is doing along with allowing
the API to work in parallel when the sync_state is not 'in_progress'.
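For illustration only, a hypothetical sketch of such a field (the names below
are invented here and are not part of any posted patch):

typedef enum SlotSyncState
{
    SLOT_SYNC_STATE_STARTED,        /* worker or SQL function has attached */
    SLOT_SYNC_STATE_IN_PROGRESS,    /* a sync cycle is currently running */
    SLOT_SYNC_STATE_FINISHED        /* the last sync cycle has completed */
} SlotSyncState;

The SQL function could then be allowed whenever the state is not
SLOT_SYNC_STATE_IN_PROGRESS, while promotion would still wait for any
in-progress cycle to finish.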
I think for now let's restrict their usage in parallel and make the
promotion behavior consistent both for worker and API.
--
With Regards,
Amit Kapila.
On Monday, April 15, 2024 6:09 PM shveta malik <shveta.malik@gmail.com> wrote:
Please find v4 addressing the above comments.
Thanks for the patch.
Here are few comments:
1.
+ ereport(ERROR,
+ errmsg("promotion in progress, can not synchronize replication slots"));
+ }
I think an errcode is needed.
The style of the error message seems a bit unnatural to me. I suggest:
"cannot synchronize replication slots when standby promotion is ongoing"
2.
+ if (worker_pid != InvalidPid)
+ Assert(SlotSyncCtx->pid == InvalidPid);
We could merge the checks into one Assert().
Assert(SlotSyncCtx->pid == InvalidPid || worker_pid == InvalidPid);
3.
- pqsignal(SIGINT, SignalHandlerForShutdownRequest);
I realized that we should register this before setting SlotSyncCtx->pid,
otherwise if the standby is promoted after setting the pid and before registering
the signal handler function, the slotsync worker could miss the SIGINT sent by the
startup process (ShutDownSlotSync). This is an existing issue for the slotsync
worker, but maybe we could fix it together with the patch.
Best Regards,
Hou zj
On Tue, Apr 16, 2024 at 9:27 AM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:
On Monday, April 15, 2024 6:09 PM shveta malik <shveta.malik@gmail.com> wrote:
Please find v4 addressing the above comments.
Thanks for the patch.
Here are few comments:
Thanks for reviewing the patch.
1.
+ ereport(ERROR,
+ errmsg("promotion in progress, can not synchronize replication slots"));
+ }

I think an errcode is needed.
The style of the error message seems a bit unnatural to me. I suggest:
"cannot synchronize replication slots when standby promotion is ongoing"
Modified.
2.
+ if (worker_pid != InvalidPid)
+ Assert(SlotSyncCtx->pid == InvalidPid);

We could merge the checks into one Assert().
Assert(SlotSyncCtx->pid == InvalidPid || worker_pid == InvalidPid);
Modified.
3.
- pqsignal(SIGINT, SignalHandlerForShutdownRequest);
I realized that we should register this before setting SlotSyncCtx->pid,
otherwise if the standby is promoted after setting pid and before registering
signal handle function, the slotsync worker could miss to handle SIGINT sent by
startup process(ShutDownSlotSync). This is an existing issue for slotsync
worker, but maybe we could fix it together with the patch.
Yes, it seems like a problem. Fixed it. Also to be consistent, moved
other signal handlers' registration as well before we set pid.
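For reference, a minimal sketch of the intended ordering in the worker startup
path (illustrative only; the actual code in the attached v5 patch also checks
'stopSignaled' while setting the pid, see check_flags_and_set_sync_info()):

/*
 * Register the shutdown handler before advertising our PID, so that a
 * SIGINT sent by ShutDownSlotSync() in this window cannot be missed.
 */
pqsignal(SIGINT, SignalHandlerForShutdownRequest);
/* ... remaining signal handler registrations ... */

/* Only now advertise the PID so that the startup process can signal us. */
SpinLockAcquire(&SlotSyncCtx->mutex);
SlotSyncCtx->pid = MyProcPid;
SpinLockRelease(&SlotSyncCtx->mutex);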
Please find v5 addressing above comments.
thanks
Shveta
Attachments:
v5-0001-Handle-stopSignaled-during-sync-function-call.patch
From af4d1479c9e20d6103c2726f4c53489be4f02d53 Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Fri, 5 Apr 2024 10:16:03 +0530
Subject: [PATCH v5] Handle stopSignaled during sync function call.
This patch attempts to fix two issues:
1)
It implements promotion related handling needed for slot sync SQL
function pg_sync_replication_slots() similar to the slot sync worker.
Since there is a chance that the user (or any of his scripts/tools)
may execute SQL function pg_sync_replication_slots() in parallel to
promotion, both the promotion and SQL call need to handle it well.
Changes are:
a) If pg_sync_replication_slots() is already running when the
promotion is triggered, ShutDownSlotSync() checks the
'SlotSyncCtx->syncing' flag as well and waits for it to become false
i.e. waits till parallel running SQL function is finished.
b) If pg_sync_replication_slots() is invoked when promotion is
already in progress, pg_sync_replication_slots() respects the
'stopSignaled' flag set by the startup process and errors out.
2)
This patch fixes another issue in slot sync worker where
SignalHandlerForShutdownRequest() needs to be registered *before* setting
SlotSyncCtx->pid, otherwise the slotsync worker could fail to handle
SIGINT sent by the startup process (ShutDownSlotSync). To be consistent, all
signal handlers' registration is moved to a prior location before we set
the pid.
---
src/backend/replication/logical/slotsync.c | 200 +++++++++++++--------
1 file changed, 124 insertions(+), 76 deletions(-)
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index bda0de52db..c85779b247 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -92,9 +92,6 @@
* is expected (e.g., slot sync GUCs change), slot sync worker will reset
* last_start_time before exiting, so that postmaster can start the worker
* without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
- *
- * All the fields except 'syncing' are used only by slotsync worker.
- * 'syncing' is used both by worker and SQL function pg_sync_replication_slots.
*/
typedef struct SlotSyncCtxStruct
{
@@ -807,20 +804,6 @@ synchronize_slots(WalReceiverConn *wrconn)
" FROM pg_catalog.pg_replication_slots"
" WHERE failover and NOT temporary";
- SpinLockAcquire(&SlotSyncCtx->mutex);
- if (SlotSyncCtx->syncing)
- {
- SpinLockRelease(&SlotSyncCtx->mutex);
- ereport(ERROR,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot synchronize replication slots concurrently"));
- }
-
- SlotSyncCtx->syncing = true;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = true;
-
/* The syscache access in walrcv_exec() needs a transaction env. */
if (!IsTransactionState())
{
@@ -937,12 +920,6 @@ synchronize_slots(WalReceiverConn *wrconn)
if (started_tx)
CommitTransactionCommand();
- SpinLockAcquire(&SlotSyncCtx->mutex);
- SlotSyncCtx->syncing = false;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = false;
-
return some_slot_updated;
}
@@ -1199,7 +1176,20 @@ static void
slotsync_worker_onexit(int code, Datum arg)
{
SpinLockAcquire(&SlotSyncCtx->mutex);
+
SlotSyncCtx->pid = InvalidPid;
+
+ /*
+ * If syncing_slots is true, it indicates that the process errored out
+ * without resetting the flag. So, we need to clean up shared memory and
+ * reset the flag here.
+ */
+ if (syncing_slots)
+ {
+ SlotSyncCtx->syncing = false;
+ syncing_slots = false;
+ }
+
SpinLockRelease(&SlotSyncCtx->mutex);
}
@@ -1242,6 +1232,63 @@ wait_for_slot_activity(bool some_slot_updated)
ResetLatch(MyLatch);
}
+/*
+ * Check stopSignaled and syncing flags. Emit error if promotion has
+ * already set stopSignaled or if it is concurrent sync call. Otherwise,
+ * set 'syncing' flag and pid info.
+ */
+static void
+check_flags_and_set_sync_info(pid_t worker_pid)
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+
+ /* The worker pid must not be already assigned in SlotSyncCtx */
+ Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid);
+
+ /*
+ * Startup process signaled the slot sync machinery to stop, so if
+ * meanwhile postmaster ended up starting the worker again or user has
+ * invoked pg_sync_replication_slots(), error out.
+ */
+ if (SlotSyncCtx->stopSignaled)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot synchronize replication slots when standby promotion is ongoing"));
+ }
+
+ if (SlotSyncCtx->syncing)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot synchronize replication slots concurrently"));
+ }
+
+ /* Advertise our PID so that the startup process can kill us on promotion */
+ SlotSyncCtx->pid = worker_pid;
+
+ SlotSyncCtx->syncing = true;
+
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = true;
+}
+
+/*
+ * Reset syncing flag.
+ */
+static void
+reset_syncing_flag()
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ SlotSyncCtx->syncing = false;
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = false;
+};
+
/*
* The main loop of our worker process.
*
@@ -1278,47 +1325,6 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
Assert(SlotSyncCtx != NULL);
- SpinLockAcquire(&SlotSyncCtx->mutex);
- Assert(SlotSyncCtx->pid == InvalidPid);
-
- /*
- * Startup process signaled the slot sync worker to stop, so if meanwhile
- * postmaster ended up starting the worker again, exit.
- */
- if (SlotSyncCtx->stopSignaled)
- {
- SpinLockRelease(&SlotSyncCtx->mutex);
- proc_exit(0);
- }
-
- /* Advertise our PID so that the startup process can kill us on promotion */
- SlotSyncCtx->pid = MyProcPid;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- ereport(LOG, errmsg("slot sync worker started"));
-
- /* Register it as soon as SlotSyncCtx->pid is initialized. */
- before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
-
- /* Setup signal handling */
- pqsignal(SIGHUP, SignalHandlerForConfigReload);
- pqsignal(SIGINT, SignalHandlerForShutdownRequest);
- pqsignal(SIGTERM, die);
- pqsignal(SIGFPE, FloatExceptionHandler);
- pqsignal(SIGUSR1, procsignal_sigusr1_handler);
- pqsignal(SIGUSR2, SIG_IGN);
- pqsignal(SIGPIPE, SIG_IGN);
- pqsignal(SIGCHLD, SIG_DFL);
-
- /*
- * Establishes SIGALRM handler and initialize timeout module. It is needed
- * by InitPostgres to register different timeouts.
- */
- InitializeTimeouts();
-
- /* Load the libpq-specific functions */
- load_file("libpqwalreceiver", false);
-
/*
* If an exception is encountered, processing resumes here.
*
@@ -1350,6 +1356,32 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
/* We can now handle ereport(ERROR) */
PG_exception_stack = &local_sigjmp_buf;
+ /* Setup signal handling */
+ pqsignal(SIGHUP, SignalHandlerForConfigReload);
+ pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+ pqsignal(SIGTERM, die);
+ pqsignal(SIGFPE, FloatExceptionHandler);
+ pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+ pqsignal(SIGUSR2, SIG_IGN);
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGCHLD, SIG_DFL);
+
+ check_flags_and_set_sync_info(MyProcPid);
+
+ ereport(LOG, errmsg("slot sync worker started"));
+
+ /* Register it as soon as SlotSyncCtx->pid is initialized. */
+ before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
+
+ /*
+ * Establishes SIGALRM handler and initialize timeout module. It is needed
+ * by InitPostgres to register different timeouts.
+ */
+ InitializeTimeouts();
+
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+
/*
* Unblock signals (they were blocked when the postmaster forked us)
*/
@@ -1457,8 +1489,8 @@ update_synced_slots_inactive_since(void)
if (!StandbyMode)
return;
- /* The slot sync worker mustn't be running by now */
- Assert(SlotSyncCtx->pid == InvalidPid);
+ /* The slot sync worker or SQL function mustn't be running by now */
+ Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing);
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
@@ -1471,6 +1503,9 @@ update_synced_slots_inactive_since(void)
{
Assert(SlotIsLogical(s));
+ /* The slot must not be acquired by any process */
+ Assert(s->active_pid == 0);
+
/* Use the same inactive_since time for all the slots. */
if (now == 0)
now = GetCurrentTimestamp();
@@ -1486,6 +1521,10 @@ update_synced_slots_inactive_since(void)
/*
* Shut down the slot sync worker.
+ *
+ * It sends signal to shutdown slot sync worker. It also waits till
+ * the slot sync worker has exited and pg_sync_replication_slots()
+ * has finished.
*/
void
ShutDownSlotSync(void)
@@ -1494,7 +1533,11 @@ ShutDownSlotSync(void)
SlotSyncCtx->stopSignaled = true;
- if (SlotSyncCtx->pid == InvalidPid)
+ /*
+ * Return if neither the slot sync worker is running nor the function
+ * pg_sync_replication_slots() is executing.
+ */
+ if (!SlotSyncCtx->syncing)
{
SpinLockRelease(&SlotSyncCtx->mutex);
update_synced_slots_inactive_since();
@@ -1502,9 +1545,10 @@ ShutDownSlotSync(void)
}
SpinLockRelease(&SlotSyncCtx->mutex);
- kill(SlotSyncCtx->pid, SIGINT);
+ if (SlotSyncCtx->pid != InvalidPid)
+ kill(SlotSyncCtx->pid, SIGINT);
- /* Wait for it to die */
+ /* Wait for slot sync to end */
for (;;)
{
int rc;
@@ -1522,8 +1566,11 @@ ShutDownSlotSync(void)
SpinLockAcquire(&SlotSyncCtx->mutex);
- /* Is it gone? */
- if (SlotSyncCtx->pid == InvalidPid)
+ /*
+ * Confirm that both the worker and the function
+ * pg_sync_replication_slots() are done.
+ */
+ if (!SlotSyncCtx->syncing)
break;
SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1615,11 +1662,7 @@ slotsync_failure_callback(int code, Datum arg)
* without resetting the flag. So, we need to clean up shared memory
* and reset the flag here.
*/
- SpinLockAcquire(&SlotSyncCtx->mutex);
- SlotSyncCtx->syncing = false;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = false;
+ reset_syncing_flag();
}
walrcv_disconnect(wrconn);
@@ -1634,9 +1677,14 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
{
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
+ check_flags_and_set_sync_info(InvalidPid);
+
validate_remote_info(wrconn);
synchronize_slots(wrconn);
+
+ /* We are done with sync, so reset sync flag */
+ reset_syncing_flag();
}
PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
}
--
2.34.1
Hi,
On Tue, Apr 16, 2024 at 08:21:04AM +0530, Amit Kapila wrote:
On Mon, Apr 15, 2024 at 7:47 PM Bertrand Drouvot
<bertranddrouvot.pg@gmail.com> wrote:
On Mon, Apr 15, 2024 at 06:29:49PM +0530, Amit Kapila wrote:
On Mon, Apr 15, 2024 at 6:21 PM Bertrand Drouvot
<bertranddrouvot.pg@gmail.com> wrote:
On Mon, Apr 15, 2024 at 03:38:53PM +0530, shveta malik wrote:
Now both worker and SQL
function set 'syncing' when they start and reset it when they exit.
It means that it's not possible anymore to trigger a manual sync if
sync_replication_slots is on. Indeed that would trigger:
postgres=# select pg_sync_replication_slots();
ERROR: cannot synchronize replication slots concurrently
That looks like an issue to me, thoughts?
This is intentional as of now for the sake of keeping
implementation/code simple. It is not difficult to allow them but I am
not sure whether we want to add another set of conditions allowing
them in parallel.
I think that the ability to launch a manual sync before a switchover would be
missed. Except for this case I don't think it's an issue to prevent them from
running in parallel.
I think if the slotsync worker is available, it can do that as well.
Right, but one has no control as to when the sync is triggered.
There is no clear use case for allowing them in parallel and I feel it
would add more confusion when it can work sometimes but not other
times. However, if we receive some report from the field where there
is a real demand for such a thing, it should be easy to achieve. For
example, I can imagine that we can have sync_state that has values
'started', 'in_progress', and 'finished'. This should allow us to
achieve what the current proposed patch is doing along with allowing
the API to work in parallel when the sync_state is not 'in_progress'.
I think for now let's restrict their usage in parallel and make the
promotion behavior consistent both for worker and API.
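To make that idea concrete, here is a purely illustrative sketch (an assumption sketched for discussion, not part of any posted patch) of what such a sync_state might look like:
#include <stdbool.h>
/* Hypothetical three-valued state for the slot sync machinery. */
typedef enum SlotSyncState
{
	SLOTSYNC_STATE_STARTED,		/* machinery is up, no sync cycle running */
	SLOTSYNC_STATE_IN_PROGRESS,	/* a sync cycle is currently executing */
	SLOTSYNC_STATE_FINISHED		/* machinery has been shut down */
} SlotSyncState;
/* Hypothetical check: only an in-progress sync would block the SQL API. */
static bool
sync_api_allowed(SlotSyncState state)
{
	return state != SLOTSYNC_STATE_IN_PROGRESS;
}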
Okay, let's do it that way. Is it worth adding a few words in the doc related to
pg_sync_replication_slots() though? (to mention that it cannot be used if the
slot sync worker is running).
Regards,
--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
On Tue, Apr 16, 2024 at 12:03 PM Bertrand Drouvot
<bertranddrouvot.pg@gmail.com> wrote:
On Tue, Apr 16, 2024 at 08:21:04AM +0530, Amit Kapila wrote:
There is no clear use case for allowing them in parallel and I feel it
would add more confusion when it can work sometimes but not other
times. However, if we receive some report from the field where there
is a real demand for such a thing, it should be easy to achieve. For
example, I can imagine that we can have sync_state that has values
'started', 'in_progress', and 'finished'. This should allow us to
achieve what the current proposed patch is doing along with allowing
the API to work in parallel when the sync_state is not 'in_progress'.
I think for now let's restrict their usage in parallel and make the
promotion behavior consistent both for worker and API.
Okay, let's do it that way. Is it worth adding a few words in the doc related to
pg_sync_replication_slots() though? (to mention that it cannot be used if the
slot sync worker is running).
Yes, this makes sense to me.
--
With Regards,
Amit Kapila.
Hi,
On Tue, Apr 16, 2024 at 10:00:04AM +0530, shveta malik wrote:
Please find v5 addressing above comments.
Thanks!
@@ -1634,9 +1677,14 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
{
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
+ check_flags_and_set_sync_info(InvalidPid);
+
Given the fact that if the sync worker is running it won't be possible to trigger
a manual sync with pg_sync_replication_slots(), what about also checking the
"sync_replication_slots" value at the start of SyncReplicationSlots() and
emitting an error if sync_replication_slots is set to on? (The message could explicitly
state that it's not possible to use the function if sync_replication_slots is
set to on.)
Regards,
--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
On Tue, Apr 16, 2024 at 12:03 PM Bertrand Drouvot
<bertranddrouvot.pg@gmail.com> wrote:
I think for now let's restrict their usage in parallel and make the
promotion behavior consistent both for worker and API.
Okay, let's do it that way. Is it worth adding a few words in the doc related to
pg_sync_replication_slots() though? (to mention that it cannot be used if the
slot sync worker is running).
+1. Please find v6 with the suggested doc changes.
thanks
Shveta
Attachments:
v6-0001-Handle-stopSignaled-during-sync-function-call.patch
From 1cc185edb4b9852d21ac7d97343c342bb4f16a2d Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Fri, 5 Apr 2024 10:16:03 +0530
Subject: [PATCH v6] Handle stopSignaled during sync function call.
This patch attempts to fix two issues:
1)
It implements promotion related handling needed for slot sync SQL
function pg_sync_replication_slots() similar to the slot sync worker.
Since there is a chance that the user (or any of his scripts/tools)
may execute SQL function pg_sync_replication_slots() in parallel to
promotion, both the promotion and SQL call need to handle it well.
Changes are:
a) If pg_sync_replication_slots() is already running when the
promotion is triggered, ShutDownSlotSync() checks the
'SlotSyncCtx->syncing' flag as well and waits for it to become false
i.e. waits till parallel running SQL function is finished.
b) If pg_sync_replication_slots() is invoked when promotion is
already in progress, pg_sync_replication_slots() respects the
'stopSignaled' flag set by the startup process and errors out.
2)
This patch fixes another issue in slot sync worker where
SignalHandlerForShutdownRequest() needs to be registered *before* setting
SlotSyncCtx->pid, otherwise the slotsync worker could fail to handle
SIGINT sent by the startup process (ShutDownSlotSync). To be consistent, all
signal handlers' registration is moved to a prior location before we set
the pid.
---
doc/src/sgml/func.sgml | 4 +
src/backend/replication/logical/slotsync.c | 200 +++++++++++++--------
2 files changed, 128 insertions(+), 76 deletions(-)
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 8dfb42ad4d..65a5fa6bb2 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -29348,6 +29348,10 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
standby server. Temporary synced slots, if any, cannot be used for
logical decoding and must be dropped after promotion. See
<xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
+ Note that this function cannot be executed if
+ <link linkend="guc-sync-replication-slots"><varname>
+ sync_replication_slots</varname></link> is enabled and the slotsync
+ worker is already running to perform the synchronization of slots.
</para>
<caution>
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index bda0de52db..c85779b247 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -92,9 +92,6 @@
* is expected (e.g., slot sync GUCs change), slot sync worker will reset
* last_start_time before exiting, so that postmaster can start the worker
* without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
- *
- * All the fields except 'syncing' are used only by slotsync worker.
- * 'syncing' is used both by worker and SQL function pg_sync_replication_slots.
*/
typedef struct SlotSyncCtxStruct
{
@@ -807,20 +804,6 @@ synchronize_slots(WalReceiverConn *wrconn)
" FROM pg_catalog.pg_replication_slots"
" WHERE failover and NOT temporary";
- SpinLockAcquire(&SlotSyncCtx->mutex);
- if (SlotSyncCtx->syncing)
- {
- SpinLockRelease(&SlotSyncCtx->mutex);
- ereport(ERROR,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot synchronize replication slots concurrently"));
- }
-
- SlotSyncCtx->syncing = true;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = true;
-
/* The syscache access in walrcv_exec() needs a transaction env. */
if (!IsTransactionState())
{
@@ -937,12 +920,6 @@ synchronize_slots(WalReceiverConn *wrconn)
if (started_tx)
CommitTransactionCommand();
- SpinLockAcquire(&SlotSyncCtx->mutex);
- SlotSyncCtx->syncing = false;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = false;
-
return some_slot_updated;
}
@@ -1199,7 +1176,20 @@ static void
slotsync_worker_onexit(int code, Datum arg)
{
SpinLockAcquire(&SlotSyncCtx->mutex);
+
SlotSyncCtx->pid = InvalidPid;
+
+ /*
+ * If syncing_slots is true, it indicates that the process errored out
+ * without resetting the flag. So, we need to clean up shared memory and
+ * reset the flag here.
+ */
+ if (syncing_slots)
+ {
+ SlotSyncCtx->syncing = false;
+ syncing_slots = false;
+ }
+
SpinLockRelease(&SlotSyncCtx->mutex);
}
@@ -1242,6 +1232,63 @@ wait_for_slot_activity(bool some_slot_updated)
ResetLatch(MyLatch);
}
+/*
+ * Check stopSignaled and syncing flags. Emit error if promotion has
+ * already set stopSignaled or if it is concurrent sync call. Otherwise,
+ * set 'syncing' flag and pid info.
+ */
+static void
+check_flags_and_set_sync_info(pid_t worker_pid)
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+
+ /* The worker pid must not be already assigned in SlotSyncCtx */
+ Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid);
+
+ /*
+ * Startup process signaled the slot sync machinery to stop, so if
+ * meanwhile postmaster ended up starting the worker again or user has
+ * invoked pg_sync_replication_slots(), error out.
+ */
+ if (SlotSyncCtx->stopSignaled)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot synchronize replication slots when standby promotion is ongoing"));
+ }
+
+ if (SlotSyncCtx->syncing)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot synchronize replication slots concurrently"));
+ }
+
+ /* Advertise our PID so that the startup process can kill us on promotion */
+ SlotSyncCtx->pid = worker_pid;
+
+ SlotSyncCtx->syncing = true;
+
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = true;
+}
+
+/*
+ * Reset syncing flag.
+ */
+static void
+reset_syncing_flag()
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ SlotSyncCtx->syncing = false;
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = false;
+};
+
/*
* The main loop of our worker process.
*
@@ -1278,47 +1325,6 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
Assert(SlotSyncCtx != NULL);
- SpinLockAcquire(&SlotSyncCtx->mutex);
- Assert(SlotSyncCtx->pid == InvalidPid);
-
- /*
- * Startup process signaled the slot sync worker to stop, so if meanwhile
- * postmaster ended up starting the worker again, exit.
- */
- if (SlotSyncCtx->stopSignaled)
- {
- SpinLockRelease(&SlotSyncCtx->mutex);
- proc_exit(0);
- }
-
- /* Advertise our PID so that the startup process can kill us on promotion */
- SlotSyncCtx->pid = MyProcPid;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- ereport(LOG, errmsg("slot sync worker started"));
-
- /* Register it as soon as SlotSyncCtx->pid is initialized. */
- before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
-
- /* Setup signal handling */
- pqsignal(SIGHUP, SignalHandlerForConfigReload);
- pqsignal(SIGINT, SignalHandlerForShutdownRequest);
- pqsignal(SIGTERM, die);
- pqsignal(SIGFPE, FloatExceptionHandler);
- pqsignal(SIGUSR1, procsignal_sigusr1_handler);
- pqsignal(SIGUSR2, SIG_IGN);
- pqsignal(SIGPIPE, SIG_IGN);
- pqsignal(SIGCHLD, SIG_DFL);
-
- /*
- * Establishes SIGALRM handler and initialize timeout module. It is needed
- * by InitPostgres to register different timeouts.
- */
- InitializeTimeouts();
-
- /* Load the libpq-specific functions */
- load_file("libpqwalreceiver", false);
-
/*
* If an exception is encountered, processing resumes here.
*
@@ -1350,6 +1356,32 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
/* We can now handle ereport(ERROR) */
PG_exception_stack = &local_sigjmp_buf;
+ /* Setup signal handling */
+ pqsignal(SIGHUP, SignalHandlerForConfigReload);
+ pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+ pqsignal(SIGTERM, die);
+ pqsignal(SIGFPE, FloatExceptionHandler);
+ pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+ pqsignal(SIGUSR2, SIG_IGN);
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGCHLD, SIG_DFL);
+
+ check_flags_and_set_sync_info(MyProcPid);
+
+ ereport(LOG, errmsg("slot sync worker started"));
+
+ /* Register it as soon as SlotSyncCtx->pid is initialized. */
+ before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
+
+ /*
+ * Establishes SIGALRM handler and initialize timeout module. It is needed
+ * by InitPostgres to register different timeouts.
+ */
+ InitializeTimeouts();
+
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+
/*
* Unblock signals (they were blocked when the postmaster forked us)
*/
@@ -1457,8 +1489,8 @@ update_synced_slots_inactive_since(void)
if (!StandbyMode)
return;
- /* The slot sync worker mustn't be running by now */
- Assert(SlotSyncCtx->pid == InvalidPid);
+ /* The slot sync worker or SQL function mustn't be running by now */
+ Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing);
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
@@ -1471,6 +1503,9 @@ update_synced_slots_inactive_since(void)
{
Assert(SlotIsLogical(s));
+ /* The slot must not be acquired by any process */
+ Assert(s->active_pid == 0);
+
/* Use the same inactive_since time for all the slots. */
if (now == 0)
now = GetCurrentTimestamp();
@@ -1486,6 +1521,10 @@ update_synced_slots_inactive_since(void)
/*
* Shut down the slot sync worker.
+ *
+ * It sends signal to shutdown slot sync worker. It also waits till
+ * the slot sync worker has exited and pg_sync_replication_slots()
+ * has finished.
*/
void
ShutDownSlotSync(void)
@@ -1494,7 +1533,11 @@ ShutDownSlotSync(void)
SlotSyncCtx->stopSignaled = true;
- if (SlotSyncCtx->pid == InvalidPid)
+ /*
+ * Return if neither the slot sync worker is running nor the function
+ * pg_sync_replication_slots() is executing.
+ */
+ if (!SlotSyncCtx->syncing)
{
SpinLockRelease(&SlotSyncCtx->mutex);
update_synced_slots_inactive_since();
@@ -1502,9 +1545,10 @@ ShutDownSlotSync(void)
}
SpinLockRelease(&SlotSyncCtx->mutex);
- kill(SlotSyncCtx->pid, SIGINT);
+ if (SlotSyncCtx->pid != InvalidPid)
+ kill(SlotSyncCtx->pid, SIGINT);
- /* Wait for it to die */
+ /* Wait for slot sync to end */
for (;;)
{
int rc;
@@ -1522,8 +1566,11 @@ ShutDownSlotSync(void)
SpinLockAcquire(&SlotSyncCtx->mutex);
- /* Is it gone? */
- if (SlotSyncCtx->pid == InvalidPid)
+ /*
+ * Confirm that both the worker and the function
+ * pg_sync_replication_slots() are done.
+ */
+ if (!SlotSyncCtx->syncing)
break;
SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1615,11 +1662,7 @@ slotsync_failure_callback(int code, Datum arg)
* without resetting the flag. So, we need to clean up shared memory
* and reset the flag here.
*/
- SpinLockAcquire(&SlotSyncCtx->mutex);
- SlotSyncCtx->syncing = false;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = false;
+ reset_syncing_flag();
}
walrcv_disconnect(wrconn);
@@ -1634,9 +1677,14 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
{
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
+ check_flags_and_set_sync_info(InvalidPid);
+
validate_remote_info(wrconn);
synchronize_slots(wrconn);
+
+ /* We are done with sync, so reset sync flag */
+ reset_syncing_flag();
}
PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
}
--
2.34.1
On Tuesday, April 16, 2024 2:52 PM Bertrand Drouvot <bertranddrouvot.pg@gmail.com> wrote:
Hi,
On Tue, Apr 16, 2024 at 10:00:04AM +0530, shveta malik wrote:
Please find v5 addressing above comments.
Thanks!
@@ -1634,9 +1677,14 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
{
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
+ check_flags_and_set_sync_info(InvalidPid);
+
Given the fact that if the sync worker is running it won't be possible to trigger a
manual sync with pg_sync_replication_slots(), what about also checking the
"sync_replication_slots" value at the start of SyncReplicationSlots() and emitting
an error if sync_replication_slots is set to on? (The message could explicitly
state that it's not possible to use the function if sync_replication_slots is set to
on.)
I personally feel adding the additional check for sync_replication_slots may
not improve the situation here. Because the GUC sync_replication_slots can
change at any point, the GUC could be false when performing this additional
check and be set to true immediately after the check, so it would not simplify
the logic anyway.
Best Regards,
Hou zj
On Tue, Apr 16, 2024 at 1:55 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:
On Tuesday, April 16, 2024 2:52 PM Bertrand Drouvot <bertranddrouvot.pg@gmail.com> wrote:
Hi,
On Tue, Apr 16, 2024 at 10:00:04AM +0530, shveta malik wrote:
Please find v5 addressing above comments.
Thanks!
@@ -1634,9 +1677,14 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
{
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
+ check_flags_and_set_sync_info(InvalidPid);
+
Given the fact that if the sync worker is running it won't be possible to trigger a
manual sync with pg_sync_replication_slots(), what about also checking the
"sync_replication_slots" value at the start of SyncReplicationSlots() and emitting
an error if sync_replication_slots is set to on? (The message could explicitly
state that it's not possible to use the function if sync_replication_slots is set to
on.)
I personally feel adding the additional check for sync_replication_slots may
not improve the situation here. Because the GUC sync_replication_slots can
change at any point, the GUC could be false when performing this additional
check and be set to true immediately after the check, so it would not simplify
the logic anyway.
+1.
I feel the doc change and the "cannot synchronize replication slots
concurrently" check should suffice.
In the scenario which Hou-San pointed out, if after performing the
GUC check in the SQL function this GUC is enabled immediately and, say, the
worker is started sooner than the function gets a chance to sync,
in that case as well the SQL function will ultimately get the error "cannot
synchronize replication slots concurrently", even though the GUC is
enabled. Thus, I feel we should stick with the same error in all
scenarios.
thanks
Shveta
Hi,
On Tue, Apr 16, 2024 at 02:06:45PM +0530, shveta malik wrote:
On Tue, Apr 16, 2024 at 1:55 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:
I personally feel adding the additional check for sync_replication_slots may
not improve the situation here. Because the GUC sync_replication_slots can
change at any point, the GUC could be false when performing this additional
check and be set to true immediately after the check, so it would not simplify
the logic anyway.
+1.
I feel the doc change and the "cannot synchronize replication slots
concurrently" check should suffice.
In the scenario which Hou-San pointed out, if after performing the
GUC check in the SQL function this GUC is enabled immediately and, say, the
worker is started sooner than the function gets a chance to sync,
in that case as well the SQL function will ultimately get the error "cannot
synchronize replication slots concurrently", even though the GUC is
enabled. Thus, I feel we should stick with the same error in all
scenarios.
Okay, fine by me, let's forget about checking sync_replication_slots then.
Regards,
--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
On Tue, Apr 16, 2024 at 2:52 PM Bertrand Drouvot
<bertranddrouvot.pg@gmail.com> wrote:
Hi,
On Tue, Apr 16, 2024 at 02:06:45PM +0530, shveta malik wrote:
On Tue, Apr 16, 2024 at 1:55 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:
I personally feel adding the additional check for sync_replication_slots may
not improve the situation here. Because the GUC sync_replication_slots can
change at any point, the GUC could be false when performing this additional
check and be set to true immediately after the check, so it would not simplify
the logic anyway.
+1.
I feel the doc change and the "cannot synchronize replication slots
concurrently" check should suffice.
In the scenario which Hou-San pointed out, if after performing the
GUC check in the SQL function this GUC is enabled immediately and, say, the
worker is started sooner than the function gets a chance to sync,
in that case as well the SQL function will ultimately get the error "cannot
synchronize replication slots concurrently", even though the GUC is
enabled. Thus, I feel we should stick with the same error in all
scenarios.
Okay, fine by me, let's forget about checking sync_replication_slots then.
Thanks.
While reviewing and testing this patch further, we encountered two
race conditions which need to be handled:
1) For the slot sync worker, the order of cleanup execution was a) first
reset the 'syncing' flag (slotsync_failure_callback), b) then reset the pid and
syncing (slotsync_worker_onexit). But in ShutDownSlotSync(), we rely
only on the 'syncing' flag for the wait-exit logic. So it may happen
that in the window between these two callbacks, ShutDownSlotSync()
proceeds and calls update_synced_slots_inactive_since(), which may then
hit the assert Assert(SlotSyncCtx->pid == InvalidPid).
2) Another problem, as described by Hou-San off-list:
When the slotsync worker errors out after acquiring a slot, it will
first call slotsync_worker_onexit() and then
ReplicationSlotShmemExit(), so in the window between these two
callbacks, it's possible that SlotSyncCtx->syncing and
SlotSyncCtx->pid have been reset but slot->active_pid is still
valid. The Assert below will then be broken:
@@ -1471,6 +1503,9 @@ update_synced_slots_inactive_since(void)
{
Assert(SlotIsLogical(s));
+ /* The slot must not be acquired by any process */
+ Assert(s->active_pid == 0);
+
To fix the above issues, these changes have been made in v7:
1) For the worker, replaced slotsync_failure_callback() with
slotsync_worker_disconnect() so that the latter only disconnects and
thus slotsync_worker_onexit() does the pid cleanup followed by the syncing
flag cleanup. This makes ShutDownSlotSync()'s wait loop exit reliably.
2) To fix second problem, changes are:
2.1) For the worker, moved the slotsync_worker_onexit() registration before
BaseInit() (BaseInit is the one doing the ReplicationSlotShmemExit
registration). With this change in registration order, the cleanup order
for the worker will be a) slotsync_worker_disconnect(), b)
ReplicationSlotShmemExit(), c) slotsync_worker_onexit(). This order
ensures that the worker is actually done with slot release and
cleanup before it marks itself as done syncing (see the sketch after
this list for the callback-ordering idea).
2.2) For the SQL function, did ReplicationSlotRelease() and
ReplicationSlotCleanup() as the first step in slotsync_failure_callback().
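As a rough standalone illustration of the callback-ordering idea behind 2.1 (using atexit() as a stand-in for before_shmem_exit(), which likewise runs callbacks in reverse order of registration; the function names are analogues, not the actual patch code):
#include <stdio.h>
#include <stdlib.h>
/* Registering the "mark syncing finished" step first means it runs last,
 * i.e. only after the slot release/cleanup step has completed. */
static void mark_sync_finished(void) { puts("3. reset pid and syncing flag (slotsync_worker_onexit analogue)"); }
static void release_slots(void)      { puts("2. release/clean up replication slots (ReplicationSlotShmemExit analogue)"); }
static void disconnect_primary(void) { puts("1. disconnect from the primary (slotsync_worker_disconnect analogue)"); }
int
main(void)
{
	atexit(mark_sync_finished);	/* registered first  -> runs last   */
	atexit(release_slots);		/* registered second -> runs second */
	atexit(disconnect_primary);	/* registered last   -> runs first  */
	return 0;			/* callbacks fire in LIFO order */
}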
While doing change 2.2, it occurred to us that it would be a clean
solution to do ReplicationSlotCleanup() even on successful execution
of the SQL function. It seems better that the temporary slots are
cleaned up when the SQL function exits, as we do not know when the user
will run this SQL function again, and leaving temp slots around for
longer does not seem like a good idea. Thoughts?
thanks
Shveta
Attachments:
v7-0001-Handle-stopSignaled-during-sync-function-call.patch
From c15d6002b51a4e512e277ed570bd279e13d87b2a Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Fri, 5 Apr 2024 10:16:03 +0530
Subject: [PATCH v7] Handle stopSignaled during sync function call.
This patch attempts to fix two issues:
1)
It implements promotion related handling needed for slot sync SQL
function pg_sync_replication_slots() similar to the slot sync worker.
Since there is a chance that the user (or any of his scripts/tools)
may execute SQL function pg_sync_replication_slots() in parallel to
promotion, both the promotion and SQL call need to handle it well.
Changes are:
a) If pg_sync_replication_slots() is already running when the
promotion is triggered, ShutDownSlotSync() checks the
'SlotSyncCtx->syncing' flag as well and waits for it to become false
i.e. waits till parallel running SQL function is finished.
b) If pg_sync_replication_slots() is invoked when promotion is
already in progress, pg_sync_replication_slots() respects the
'stopSignaled' flag set by the startup process and errors out.
2)
This patch fixes another issue in slot sync worker where
SignalHandlerForShutdownRequest() needs to be registered *before* setting
SlotSyncCtx->pid, otherwise the slotsync worker could fail to handle
SIGINT sent by the startup process (ShutDownSlotSync). To be consistent, all
signal handlers' registration is moved to a prior location before we set
the pid.
---
doc/src/sgml/func.sgml | 4 +
src/backend/replication/logical/slotsync.c | 251 ++++++++++++++-------
2 files changed, 171 insertions(+), 84 deletions(-)
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 8dfb42ad4d..65a5fa6bb2 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -29348,6 +29348,10 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
standby server. Temporary synced slots, if any, cannot be used for
logical decoding and must be dropped after promotion. See
<xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
+ Note that this function cannot be executed if
+ <link linkend="guc-sync-replication-slots"><varname>
+ sync_replication_slots</varname></link> is enabled and the slotsync
+ worker is already running to perform the synchronization of slots.
</para>
<caution>
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index bda0de52db..fea1c678c7 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -92,9 +92,6 @@
* is expected (e.g., slot sync GUCs change), slot sync worker will reset
* last_start_time before exiting, so that postmaster can start the worker
* without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
- *
- * All the fields except 'syncing' are used only by slotsync worker.
- * 'syncing' is used both by worker and SQL function pg_sync_replication_slots.
*/
typedef struct SlotSyncCtxStruct
{
@@ -807,20 +804,6 @@ synchronize_slots(WalReceiverConn *wrconn)
" FROM pg_catalog.pg_replication_slots"
" WHERE failover and NOT temporary";
- SpinLockAcquire(&SlotSyncCtx->mutex);
- if (SlotSyncCtx->syncing)
- {
- SpinLockRelease(&SlotSyncCtx->mutex);
- ereport(ERROR,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot synchronize replication slots concurrently"));
- }
-
- SlotSyncCtx->syncing = true;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = true;
-
/* The syscache access in walrcv_exec() needs a transaction env. */
if (!IsTransactionState())
{
@@ -937,12 +920,6 @@ synchronize_slots(WalReceiverConn *wrconn)
if (started_tx)
CommitTransactionCommand();
- SpinLockAcquire(&SlotSyncCtx->mutex);
- SlotSyncCtx->syncing = false;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = false;
-
return some_slot_updated;
}
@@ -1190,6 +1167,19 @@ ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
slotsync_reread_config();
}
+/*
+ * Connection cleanup function for slotsync worker.
+ *
+ * Called on slotsync worker exit.
+ */
+static void
+slotsync_worker_disconnect(int code, Datum arg)
+{
+ WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
+
+ walrcv_disconnect(wrconn);
+}
+
/*
* Cleanup function for slotsync worker.
*
@@ -1199,7 +1189,20 @@ static void
slotsync_worker_onexit(int code, Datum arg)
{
SpinLockAcquire(&SlotSyncCtx->mutex);
+
SlotSyncCtx->pid = InvalidPid;
+
+ /*
+ * If syncing_slots is true, it indicates that the process errored out
+ * without resetting the flag. So, we need to clean up shared memory and
+ * reset the flag here.
+ */
+ if (syncing_slots)
+ {
+ SlotSyncCtx->syncing = false;
+ syncing_slots = false;
+ }
+
SpinLockRelease(&SlotSyncCtx->mutex);
}
@@ -1242,6 +1245,63 @@ wait_for_slot_activity(bool some_slot_updated)
ResetLatch(MyLatch);
}
+/*
+ * Check stopSignaled and syncing flags. Emit error if promotion has
+ * already set stopSignaled or if it is concurrent sync call. Otherwise,
+ * set 'syncing' flag and pid info.
+ */
+static void
+check_flags_and_set_sync_info(pid_t worker_pid)
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+
+ /* The worker pid must not be already assigned in SlotSyncCtx */
+ Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid);
+
+ /*
+ * Startup process signaled the slot sync machinery to stop, so if
+ * meanwhile postmaster ended up starting the worker again or user has
+ * invoked pg_sync_replication_slots(), error out.
+ */
+ if (SlotSyncCtx->stopSignaled)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot synchronize replication slots when standby promotion is ongoing"));
+ }
+
+ if (SlotSyncCtx->syncing)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot synchronize replication slots concurrently"));
+ }
+
+ SlotSyncCtx->syncing = true;
+
+ /* Advertise our PID so that the startup process can kill us on promotion */
+ SlotSyncCtx->pid = worker_pid;
+
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = true;
+}
+
+/*
+ * Reset syncing flag.
+ */
+static void
+reset_syncing_flag()
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ SlotSyncCtx->syncing = false;
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = false;
+};
+
/*
* The main loop of our worker process.
*
@@ -1272,52 +1332,21 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
InitProcess();
/*
- * Early initialization.
+ * Register slotsync_worker_onexit() before we register
+ * ReplicationSlotShmemExit() in BaseInit(), to ensure while exiting slot
+ * sync worker, it calls ReplicationSlotShmemExit() first followed by
+ * slotsync_worker_onexit(). Startup process during promotion depends upon
+ * 'syncing' and 'pid', so worker should be done with releasing slots
+ * before it actually marks itself as finished syncing.
*/
- BaseInit();
-
- Assert(SlotSyncCtx != NULL);
-
- SpinLockAcquire(&SlotSyncCtx->mutex);
- Assert(SlotSyncCtx->pid == InvalidPid);
-
- /*
- * Startup process signaled the slot sync worker to stop, so if meanwhile
- * postmaster ended up starting the worker again, exit.
- */
- if (SlotSyncCtx->stopSignaled)
- {
- SpinLockRelease(&SlotSyncCtx->mutex);
- proc_exit(0);
- }
-
- /* Advertise our PID so that the startup process can kill us on promotion */
- SlotSyncCtx->pid = MyProcPid;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- ereport(LOG, errmsg("slot sync worker started"));
-
- /* Register it as soon as SlotSyncCtx->pid is initialized. */
before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
- /* Setup signal handling */
- pqsignal(SIGHUP, SignalHandlerForConfigReload);
- pqsignal(SIGINT, SignalHandlerForShutdownRequest);
- pqsignal(SIGTERM, die);
- pqsignal(SIGFPE, FloatExceptionHandler);
- pqsignal(SIGUSR1, procsignal_sigusr1_handler);
- pqsignal(SIGUSR2, SIG_IGN);
- pqsignal(SIGPIPE, SIG_IGN);
- pqsignal(SIGCHLD, SIG_DFL);
-
/*
- * Establishes SIGALRM handler and initialize timeout module. It is needed
- * by InitPostgres to register different timeouts.
+ * Early initialization.
*/
- InitializeTimeouts();
+ BaseInit();
- /* Load the libpq-specific functions */
- load_file("libpqwalreceiver", false);
+ Assert(SlotSyncCtx != NULL);
/*
* If an exception is encountered, processing resumes here.
@@ -1350,6 +1379,29 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
/* We can now handle ereport(ERROR) */
PG_exception_stack = &local_sigjmp_buf;
+ /* Setup signal handling */
+ pqsignal(SIGHUP, SignalHandlerForConfigReload);
+ pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+ pqsignal(SIGTERM, die);
+ pqsignal(SIGFPE, FloatExceptionHandler);
+ pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+ pqsignal(SIGUSR2, SIG_IGN);
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGCHLD, SIG_DFL);
+
+ check_flags_and_set_sync_info(MyProcPid);
+
+ ereport(LOG, errmsg("slot sync worker started"));
+
+ /*
+ * Establishes SIGALRM handler and initialize timeout module. It is needed
+ * by InitPostgres to register different timeouts.
+ */
+ InitializeTimeouts();
+
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+
/*
* Unblock signals (they were blocked when the postmaster forked us)
*/
@@ -1408,7 +1460,7 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
* slotsync_worker_onexit() but that will need the connection to be made
* global and we want to avoid introducing global for this purpose.
*/
- before_shmem_exit(slotsync_failure_callback, PointerGetDatum(wrconn));
+ before_shmem_exit(slotsync_worker_disconnect, PointerGetDatum(wrconn));
/*
* Using the specified primary server connection, check that we are not a
@@ -1457,8 +1509,8 @@ update_synced_slots_inactive_since(void)
if (!StandbyMode)
return;
- /* The slot sync worker mustn't be running by now */
- Assert(SlotSyncCtx->pid == InvalidPid);
+ /* The slot sync worker or SQL function mustn't be running by now */
+ Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing);
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
@@ -1471,6 +1523,9 @@ update_synced_slots_inactive_since(void)
{
Assert(SlotIsLogical(s));
+ /* The slot must not be acquired by any process */
+ Assert(s->active_pid == 0);
+
/* Use the same inactive_since time for all the slots. */
if (now == 0)
now = GetCurrentTimestamp();
@@ -1486,6 +1541,10 @@ update_synced_slots_inactive_since(void)
/*
* Shut down the slot sync worker.
+ *
+ * It sends signal to shutdown slot sync worker. It also waits till
+ * the slot sync worker has exited and pg_sync_replication_slots()
+ * has finished.
*/
void
ShutDownSlotSync(void)
@@ -1494,7 +1553,11 @@ ShutDownSlotSync(void)
SlotSyncCtx->stopSignaled = true;
- if (SlotSyncCtx->pid == InvalidPid)
+ /*
+ * Return if neither the slot sync worker is running nor the function
+ * pg_sync_replication_slots() is executing.
+ */
+ if (!SlotSyncCtx->syncing)
{
SpinLockRelease(&SlotSyncCtx->mutex);
update_synced_slots_inactive_since();
@@ -1502,9 +1565,10 @@ ShutDownSlotSync(void)
}
SpinLockRelease(&SlotSyncCtx->mutex);
- kill(SlotSyncCtx->pid, SIGINT);
+ if (SlotSyncCtx->pid != InvalidPid)
+ kill(SlotSyncCtx->pid, SIGINT);
- /* Wait for it to die */
+ /* Wait for slot sync to end */
for (;;)
{
int rc;
@@ -1522,8 +1586,11 @@ ShutDownSlotSync(void)
SpinLockAcquire(&SlotSyncCtx->mutex);
- /* Is it gone? */
- if (SlotSyncCtx->pid == InvalidPid)
+ /*
+ * Confirm that both the worker and the function
+ * pg_sync_replication_slots() are done.
+ */
+ if (!SlotSyncCtx->syncing)
break;
SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1601,26 +1668,34 @@ SlotSyncShmemInit(void)
}
/*
- * Error cleanup callback for slot synchronization.
+ * Error cleanup callback for slot sync SQL function.
*/
static void
slotsync_failure_callback(int code, Datum arg)
{
WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
- if (syncing_slots)
- {
- /*
- * If syncing_slots is true, it indicates that the process errored out
- * without resetting the flag. So, we need to clean up shared memory
- * and reset the flag here.
- */
- SpinLockAcquire(&SlotSyncCtx->mutex);
- SlotSyncCtx->syncing = false;
- SpinLockRelease(&SlotSyncCtx->mutex);
+ /*
+ * We need to do slots cleanup here just like WalSndErrorCleanup().
+ *
+ * Startup process during promotion depends upon 'syncing' flag, so SQL
+ * function should be done with slots cleanup before it actually marks
+ * itself as finished syncing.
+ */
+ /* Make sure active replication slots are released */
+ if (MyReplicationSlot != NULL)
+ ReplicationSlotRelease();
- syncing_slots = false;
- }
+ /* Also cleanup all the temporary slots. */
+ ReplicationSlotCleanup();
+
+ /*
+ * If syncing_slots is true, it indicates that the process errored out
+ * without resetting the flag. So, we need to clean up shared memory and
+ * reset the flag here.
+ */
+ if (syncing_slots)
+ reset_syncing_flag();
walrcv_disconnect(wrconn);
}
@@ -1634,9 +1709,17 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
{
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
+ check_flags_and_set_sync_info(InvalidPid);
+
validate_remote_info(wrconn);
synchronize_slots(wrconn);
+
+ /* Cleanup the temporary slots */
+ ReplicationSlotCleanup();
+
+ /* We are done with sync, so reset sync flag */
+ reset_syncing_flag();
}
PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
}
--
2.34.1
On Thu, Apr 18, 2024 at 12:35 PM shveta malik <shveta.malik@gmail.com> wrote:
To fix the above issues, these changes have been made in v7:
Please find v8 attached. Changes are:
1) It fixes a ShutDownSlotSync() issue, where we perform
kill(SlotSyncCtx->pid). There is a chance that after we release the
spin-lock and before we perform the kill, the slot-sync worker has errored
out and has already set SlotSyncCtx->pid to InvalidPid (-1), and thus
kill(-1) could result in abnormal process kills on some platforms.
Now, we get the pid under the spin-lock and then use it to perform the kill,
avoiding a pid=-1 kill (see the sketch after this list). This is along
similar lines to how ShutdownWalRcv() does it.
2) Improved comments in code.
3) Updated the commit message with the new fixes. I had missed updating it in
the previous version.
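A minimal sketch of the pattern in 1), assuming a generic pthread mutex in place of the PostgreSQL spinlock and hypothetical names (illustration only, not the patch itself):
#include <pthread.h>
#include <signal.h>
#include <sys/types.h>
#define INVALID_PID ((pid_t) -1)
static pthread_mutex_t ctx_lock = PTHREAD_MUTEX_INITIALIZER;
static pid_t shared_worker_pid = INVALID_PID;	/* set/reset by the worker */
/* Snapshot the PID while holding the lock and only signal if it is still
 * valid, so a concurrent reset to -1 can never turn this into
 * kill(-1, SIGINT). */
static void
signal_worker(void)
{
	pid_t	worker_pid;
	pthread_mutex_lock(&ctx_lock);
	worker_pid = shared_worker_pid;
	pthread_mutex_unlock(&ctx_lock);
	if (worker_pid != INVALID_PID)
		kill(worker_pid, SIGINT);
}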
thanks
Shveta
Attachments:
v8-0001-Handle-stopSignaled-during-sync-function-call.patch
From d16cd7b89c0d20b109acd525175bb530e91b2e46 Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Fri, 5 Apr 2024 10:16:03 +0530
Subject: [PATCH v8] Handle stopSignaled during sync function call.
This patch attempts to fix the following issues:
1) It implements promotion related handling needed for slot sync
SQL function pg_sync_replication_slots() similar to slot sync worker.
pg_sync_replication_slots() now checks the 'stopSignaled' flag before
proceeding with slot-sync, while ShutDownSlotSync() checks the 'syncing'
flag instead of 'SlotSyncCtx->pid' to ensure the slot sync machinery
is shut down. It also changes the order of slot-sync cleanup
callback registration, so that we first release the replication slot,
then set the pid to an invalid one, and then set syncing to false.
Setting 'syncing' to false needs to be performed as the last step of
cleanup so that the shutdown flow during promotion can reliably exit
from its wait-loop on seeing 'syncing' as false.
2) This patch fixes another issue in the slot sync worker where
SignalHandlerForShutdownRequest() needs to be registered *before* setting
SlotSyncCtx->pid, otherwise the slotsync worker could fail to handle
SIGINT sent by the startup process (ShutDownSlotSync) if it is sent before
the worker could register SignalHandlerForShutdownRequest(). To be consistent, all
signal handlers' registration is moved to a prior location before we set
the worker's pid.
3) This patch fixes another issue in ShutDownSlotSync() where we perform
kill(SlotSyncCtx->pid) after releasing the spin-lock. There is a chance that
after we release the spin-lock and before we perform the kill, the slot-sync
worker has errored out and has set SlotSyncCtx->pid to InvalidPid (-1), and thus
kill(-1) could result in abnormal process kills on some platforms. Now,
we capture SlotSyncCtx->pid in a local variable under the spin-lock and later
use that to perform the kill.
---
doc/src/sgml/func.sgml | 4 +
src/backend/replication/logical/slotsync.c | 258 ++++++++++++++-------
2 files changed, 178 insertions(+), 84 deletions(-)
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 8dfb42ad4d..65a5fa6bb2 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -29348,6 +29348,10 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
standby server. Temporary synced slots, if any, cannot be used for
logical decoding and must be dropped after promotion. See
<xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
+ Note that this function cannot be executed if
+ <link linkend="guc-sync-replication-slots"><varname>
+ sync_replication_slots</varname></link> is enabled and the slotsync
+ worker is already running to perform the synchronization of slots.
</para>
<caution>
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index bda0de52db..60300899fc 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -92,9 +92,6 @@
* is expected (e.g., slot sync GUCs change), slot sync worker will reset
* last_start_time before exiting, so that postmaster can start the worker
* without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
- *
- * All the fields except 'syncing' are used only by slotsync worker.
- * 'syncing' is used both by worker and SQL function pg_sync_replication_slots.
*/
typedef struct SlotSyncCtxStruct
{
@@ -807,20 +804,6 @@ synchronize_slots(WalReceiverConn *wrconn)
" FROM pg_catalog.pg_replication_slots"
" WHERE failover and NOT temporary";
- SpinLockAcquire(&SlotSyncCtx->mutex);
- if (SlotSyncCtx->syncing)
- {
- SpinLockRelease(&SlotSyncCtx->mutex);
- ereport(ERROR,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot synchronize replication slots concurrently"));
- }
-
- SlotSyncCtx->syncing = true;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = true;
-
/* The syscache access in walrcv_exec() needs a transaction env. */
if (!IsTransactionState())
{
@@ -937,12 +920,6 @@ synchronize_slots(WalReceiverConn *wrconn)
if (started_tx)
CommitTransactionCommand();
- SpinLockAcquire(&SlotSyncCtx->mutex);
- SlotSyncCtx->syncing = false;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = false;
-
return some_slot_updated;
}
@@ -1190,6 +1167,19 @@ ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
slotsync_reread_config();
}
+/*
+ * Connection cleanup function for slotsync worker.
+ *
+ * Called on slotsync worker exit.
+ */
+static void
+slotsync_worker_disconnect(int code, Datum arg)
+{
+ WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
+
+ walrcv_disconnect(wrconn);
+}
+
/*
* Cleanup function for slotsync worker.
*
@@ -1199,7 +1189,20 @@ static void
slotsync_worker_onexit(int code, Datum arg)
{
SpinLockAcquire(&SlotSyncCtx->mutex);
+
SlotSyncCtx->pid = InvalidPid;
+
+ /*
+ * If syncing_slots is true, it indicates that the process errored out
+ * without resetting the flag. So, we need to clean up shared memory and
+ * reset the flag here.
+ */
+ if (syncing_slots)
+ {
+ SlotSyncCtx->syncing = false;
+ syncing_slots = false;
+ }
+
SpinLockRelease(&SlotSyncCtx->mutex);
}
@@ -1242,6 +1245,63 @@ wait_for_slot_activity(bool some_slot_updated)
ResetLatch(MyLatch);
}
+/*
+ * Check stopSignaled and syncing flags. Emit error if promotion has
+ * already set stopSignaled or if it is concurrent sync call. Otherwise,
+ * set 'syncing' flag and pid info.
+ */
+static void
+check_flags_and_set_sync_info(pid_t worker_pid)
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+
+ /* The worker pid must not be already assigned in SlotSyncCtx */
+ Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid);
+
+ /*
+ * Startup process signaled the slot sync machinery to stop, so if
+ * meanwhile postmaster ended up starting the worker again or user has
+ * invoked pg_sync_replication_slots(), error out.
+ */
+ if (SlotSyncCtx->stopSignaled)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot synchronize replication slots when standby promotion is ongoing"));
+ }
+
+ if (SlotSyncCtx->syncing)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot synchronize replication slots concurrently"));
+ }
+
+ SlotSyncCtx->syncing = true;
+
+ /* Advertise our PID so that the startup process can kill us on promotion */
+ SlotSyncCtx->pid = worker_pid;
+
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = true;
+}
+
+/*
+ * Reset syncing flag.
+ */
+static void
+reset_syncing_flag()
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ SlotSyncCtx->syncing = false;
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = false;
+};
+
/*
* The main loop of our worker process.
*
@@ -1272,52 +1332,22 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
InitProcess();
/*
- * Early initialization.
+ * Register slotsync_worker_onexit() before we register
+ * ReplicationSlotShmemExit() in BaseInit(), to ensure that during exit of
+ * slot sync worker, ReplicationSlotShmemExit() is called first, followed
+ * by slotsync_worker_onexit(). Startup process during promotion waits for
+ * slot sync to finish and it does that by checking the 'syncing' flag.
+ * Thus it is important that worker should be done with slots' release and
+ * cleanup before it actually marks itself as finished syncing.
*/
- BaseInit();
-
- Assert(SlotSyncCtx != NULL);
-
- SpinLockAcquire(&SlotSyncCtx->mutex);
- Assert(SlotSyncCtx->pid == InvalidPid);
-
- /*
- * Startup process signaled the slot sync worker to stop, so if meanwhile
- * postmaster ended up starting the worker again, exit.
- */
- if (SlotSyncCtx->stopSignaled)
- {
- SpinLockRelease(&SlotSyncCtx->mutex);
- proc_exit(0);
- }
-
- /* Advertise our PID so that the startup process can kill us on promotion */
- SlotSyncCtx->pid = MyProcPid;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- ereport(LOG, errmsg("slot sync worker started"));
-
- /* Register it as soon as SlotSyncCtx->pid is initialized. */
before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
- /* Setup signal handling */
- pqsignal(SIGHUP, SignalHandlerForConfigReload);
- pqsignal(SIGINT, SignalHandlerForShutdownRequest);
- pqsignal(SIGTERM, die);
- pqsignal(SIGFPE, FloatExceptionHandler);
- pqsignal(SIGUSR1, procsignal_sigusr1_handler);
- pqsignal(SIGUSR2, SIG_IGN);
- pqsignal(SIGPIPE, SIG_IGN);
- pqsignal(SIGCHLD, SIG_DFL);
-
/*
- * Establishes SIGALRM handler and initialize timeout module. It is needed
- * by InitPostgres to register different timeouts.
+ * Early initialization.
*/
- InitializeTimeouts();
+ BaseInit();
- /* Load the libpq-specific functions */
- load_file("libpqwalreceiver", false);
+ Assert(SlotSyncCtx != NULL);
/*
* If an exception is encountered, processing resumes here.
@@ -1350,6 +1380,29 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
/* We can now handle ereport(ERROR) */
PG_exception_stack = &local_sigjmp_buf;
+ /* Setup signal handling */
+ pqsignal(SIGHUP, SignalHandlerForConfigReload);
+ pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+ pqsignal(SIGTERM, die);
+ pqsignal(SIGFPE, FloatExceptionHandler);
+ pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+ pqsignal(SIGUSR2, SIG_IGN);
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGCHLD, SIG_DFL);
+
+ check_flags_and_set_sync_info(MyProcPid);
+
+ ereport(LOG, errmsg("slot sync worker started"));
+
+ /*
+ * Establishes SIGALRM handler and initialize timeout module. It is needed
+ * by InitPostgres to register different timeouts.
+ */
+ InitializeTimeouts();
+
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+
/*
* Unblock signals (they were blocked when the postmaster forked us)
*/
@@ -1408,7 +1461,7 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
* slotsync_worker_onexit() but that will need the connection to be made
* global and we want to avoid introducing global for this purpose.
*/
- before_shmem_exit(slotsync_failure_callback, PointerGetDatum(wrconn));
+ before_shmem_exit(slotsync_worker_disconnect, PointerGetDatum(wrconn));
/*
* Using the specified primary server connection, check that we are not a
@@ -1457,8 +1510,8 @@ update_synced_slots_inactive_since(void)
if (!StandbyMode)
return;
- /* The slot sync worker mustn't be running by now */
- Assert(SlotSyncCtx->pid == InvalidPid);
+ /* The slot sync worker or SQL function mustn't be running by now */
+ Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing);
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
@@ -1471,6 +1524,9 @@ update_synced_slots_inactive_since(void)
{
Assert(SlotIsLogical(s));
+ /* The slot must not be acquired by any process */
+ Assert(s->active_pid == 0);
+
/* Use the same inactive_since time for all the slots. */
if (now == 0)
now = GetCurrentTimestamp();
@@ -1486,25 +1542,39 @@ update_synced_slots_inactive_since(void)
/*
* Shut down the slot sync worker.
+ *
+ * It sends signal to shutdown slot sync worker. It also waits till
+ * the slot sync worker has exited and pg_sync_replication_slots()
+ * has finished.
*/
void
ShutDownSlotSync(void)
{
+ pid_t worker_pid;
+
SpinLockAcquire(&SlotSyncCtx->mutex);
SlotSyncCtx->stopSignaled = true;
- if (SlotSyncCtx->pid == InvalidPid)
+ /*
+ * Return if neither the slot sync worker is running nor the function
+ * pg_sync_replication_slots() is executing.
+ */
+ if (!SlotSyncCtx->syncing)
{
SpinLockRelease(&SlotSyncCtx->mutex);
update_synced_slots_inactive_since();
return;
}
+
+ worker_pid = SlotSyncCtx->pid;
+
SpinLockRelease(&SlotSyncCtx->mutex);
- kill(SlotSyncCtx->pid, SIGINT);
+ if (worker_pid != InvalidPid)
+ kill(worker_pid, SIGINT);
- /* Wait for it to die */
+ /* Wait for slot sync to end */
for (;;)
{
int rc;
@@ -1522,8 +1592,11 @@ ShutDownSlotSync(void)
SpinLockAcquire(&SlotSyncCtx->mutex);
- /* Is it gone? */
- if (SlotSyncCtx->pid == InvalidPid)
+ /*
+ * Confirm that both the worker and the function
+ * pg_sync_replication_slots() are done.
+ */
+ if (!SlotSyncCtx->syncing)
break;
SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1601,26 +1674,35 @@ SlotSyncShmemInit(void)
}
/*
- * Error cleanup callback for slot synchronization.
+ * Error cleanup callback for slot sync SQL function.
*/
static void
slotsync_failure_callback(int code, Datum arg)
{
WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
- if (syncing_slots)
- {
- /*
- * If syncing_slots is true, it indicates that the process errored out
- * without resetting the flag. So, we need to clean up shared memory
- * and reset the flag here.
- */
- SpinLockAcquire(&SlotSyncCtx->mutex);
- SlotSyncCtx->syncing = false;
- SpinLockRelease(&SlotSyncCtx->mutex);
+ /*
+ * We need to do slots cleanup here just like WalSndErrorCleanup() does.
+ *
+ * Startup process during promotion waits for slot sync to finish and it
+ * does that by checking the 'syncing' flag. Thus it is important that SQL
+ * function should be done with slots' release and cleanup before it
+ * actually marks itself as finished syncing.
+ */
+ /* Make sure active replication slots are released */
+ if (MyReplicationSlot != NULL)
+ ReplicationSlotRelease();
- syncing_slots = false;
- }
+ /* Also cleanup all the temporary slots. */
+ ReplicationSlotCleanup();
+
+ /*
+ * If syncing_slots is true, it indicates that the process errored out
+ * without resetting the flag. So, we need to clean up shared memory and
+ * reset the flag here.
+ */
+ if (syncing_slots)
+ reset_syncing_flag();
walrcv_disconnect(wrconn);
}
@@ -1634,9 +1716,17 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
{
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
+ check_flags_and_set_sync_info(InvalidPid);
+
validate_remote_info(wrconn);
synchronize_slots(wrconn);
+
+ /* Cleanup the temporary slots */
+ ReplicationSlotCleanup();
+
+ /* We are done with sync, so reset sync flag */
+ reset_syncing_flag();
}
PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
}
--
2.34.1
Hi,
On Thu, Apr 18, 2024 at 05:36:05PM +0530, shveta malik wrote:
Please find v8 attached. Changes are:
Thanks!
A few comments:
1 ===
@@ -1440,7 +1461,7 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
* slotsync_worker_onexit() but that will need the connection to be made
* global and we want to avoid introducing global for this purpose.
*/
- before_shmem_exit(slotsync_failure_callback, PointerGetDatum(wrconn));
+ before_shmem_exit(slotsync_worker_disconnect, PointerGetDatum(wrconn));
The comment above this change still states "Register the failure callback once
we have the connection", I think it has to be reworded a bit now that v8 is
making use of slotsync_worker_disconnect().
2 ===
+ * Register slotsync_worker_onexit() before we register
+ * ReplicationSlotShmemExit() in BaseInit(), to ensure that during exit of
+ * slot sync worker, ReplicationSlotShmemExit() is called first, followed
+ * by slotsync_worker_onexit(). Startup process during promotion waits for
Worth mentioning in shmem_exit() (where it does "while (--before_shmem_exit_index >= 0)"),
or before the shmem_exit() definition, that ReplSlotSyncWorkerMain() relies on
this LIFO behavior? (Not sure if there is another "strong" LIFO requirement in
other parts of the code.)
3 ===
+ * Startup process during promotion waits for slot sync to finish and it
+ * does that by checking the 'syncing' flag.
Worth mentioning ShutDownSlotSync()?
4 ===
I did a few tests manually (launching ShutDownSlotSync() through gdb / with and
without sync worker and with / without pg_sync_replication_slots() running
concurrently) and it looks like it works as designed.
Having said that, the logic that is in place to take care of the corner cases
described up-thread seems reasonable to me.
Regards,
--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
On Fri, Apr 19, 2024 at 10:53 AM Bertrand Drouvot
<bertranddrouvot.pg@gmail.com> wrote:
Hi,
On Thu, Apr 18, 2024 at 05:36:05PM +0530, shveta malik wrote:
Please find v8 attached. Changes are:
Thanks!
A few comments:
Thanks for reviewing.
1 ===
@@ -1440,7 +1461,7 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
* slotsync_worker_onexit() but that will need the connection to be made
* global and we want to avoid introducing global for this purpose.
*/
- before_shmem_exit(slotsync_failure_callback, PointerGetDatum(wrconn));
+ before_shmem_exit(slotsync_worker_disconnect, PointerGetDatum(wrconn));
The comment above this change still states "Register the failure callback once
we have the connection", I think it has to be reworded a bit now that v8 is
making use of slotsync_worker_disconnect().
2 ===
+ * Register slotsync_worker_onexit() before we register
+ * ReplicationSlotShmemExit() in BaseInit(), to ensure that during exit of
+ * slot sync worker, ReplicationSlotShmemExit() is called first, followed
+ * by slotsync_worker_onexit(). Startup process during promotion waits for
Worth to mention in shmem_exit() (where it "while (--before_shmem_exit_index >= 0)"
or before the shmem_exit() definition) that ReplSlotSyncWorkerMain() relies on
this LIFO behavior? (not sure if there is other "strong" LIFO requirement in
other part of the code).
I see other modules relying on this LIFO behavior as well.
Please see applyparallelworker.c, where
'before_shmem_exit(pa_shutdown)' needs to be registered after
'before_shmem_exit(logicalrep_worker_onexit)' (commit id 3d144c6).
Also, in postinit.c, I see such comments atop
'before_shmem_exit(ShutdownPostgres, 0)'.
I feel we can skip adding this specific comment about
ReplSlotSyncWorkerMain() in ipc.c, as none of the other modules has
added one either. I will address the rest of your comments in the
next version.
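As a side note, the LIFO ordering itself is easy to see with a tiny
standalone program (toy code only, not PostgreSQL internals; all names
below are made up purely for illustration):

#include <stdio.h>

#define MAX_CALLBACKS 8

typedef void (*exit_callback) (void);

static exit_callback callbacks[MAX_CALLBACKS];
static int	ncallbacks = 0;

static void
register_before_exit(exit_callback cb)
{
	callbacks[ncallbacks++] = cb;
}

static void
run_before_exit(void)
{
	/* LIFO: the callback registered last runs first */
	while (--ncallbacks >= 0)
		callbacks[ncallbacks]();
}

static void
onexit_resets_syncing(void)
{
	printf("runs second: reset pid and 'syncing'\n");
}

static void
shmem_exit_releases_slots(void)
{
	printf("runs first: release and clean up slots\n");
}

int
main(void)
{
	register_before_exit(onexit_resets_syncing);	/* registered first, runs last */
	register_before_exit(shmem_exit_releases_slots);	/* registered second, runs first */
	run_before_exit();
	return 0;
}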
3 ===
+ * Startup process during promotion waits for slot sync to finish and it
+ * does that by checking the 'syncing' flag.
worth to mention ShutDownSlotSync()?
4 ===
I did a few tests manually (launching ShutDownSlotSync() through gdb / with and
without sync worker and with / without pg_sync_replication_slots() running
concurrently) and it looks like it works as designed.
Thanks for testing it.
Having said that, the logic that is in place to take care of the corner cases
described up-thread seems reasonable to me.
thanks
Shveta
On Fri, Apr 19, 2024 at 11:37 AM shveta malik <shveta.malik@gmail.com> wrote:
On Fri, Apr 19, 2024 at 10:53 AM Bertrand Drouvot
<bertranddrouvot.pg@gmail.com> wrote:Hi,
On Thu, Apr 18, 2024 at 05:36:05PM +0530, shveta malik wrote:
Please find v8 attached. Changes are:
Thanks!
A few comments:
Thanks for reviewing.
1 ===
@@ -1440,7 +1461,7 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
* slotsync_worker_onexit() but that will need the connection to be made
* global and we want to avoid introducing global for this purpose.
*/
- before_shmem_exit(slotsync_failure_callback, PointerGetDatum(wrconn));
+ before_shmem_exit(slotsync_worker_disconnect, PointerGetDatum(wrconn));
The comment above this change still states "Register the failure callback once
we have the connection", I think it has to be reworded a bit now that v8 is
making use of slotsync_worker_disconnect().
2 ===
+ * Register slotsync_worker_onexit() before we register
+ * ReplicationSlotShmemExit() in BaseInit(), to ensure that during exit of
+ * slot sync worker, ReplicationSlotShmemExit() is called first, followed
+ * by slotsync_worker_onexit(). Startup process during promotion waits for
Worth to mention in shmem_exit() (where it "while (--before_shmem_exit_index >= 0)"
or before the shmem_exit() definition) that ReplSlotSyncWorkerMain() relies on
this LIFO behavior? (not sure if there is other "strong" LIFO requirement in
other part of the code).
I see other modules as well relying on LIFO behavior.
Please see applyparallelworker.c where
'before_shmem_exit(pa_shutdown)' is needed to be done after
'before_shmem_exit(logicalrep_worker_onexit)' (commit id 3d144c6).
Also in postinit.c, I see such comments atop
'before_shmem_exit(ShutdownPostgres, 0)'.
I feel we can skip adding this specific comment about
ReplSlotSyncWorkerMain() in ipc.c, as none of the other modules has
also not added any. I will address the rest of your comments in the
next version.
3 ===
+ * Startup process during promotion waits for slot sync to finish and it
+ * does that by checking the 'syncing' flag.
worth to mention ShutDownSlotSync()?
4 ===
I did a few tests manually (launching ShutDownSlotSync() through gdb / with and
without sync worker and with / without pg_sync_replication_slots() running
concurrently) and it looks like it works as designed.
Thanks for testing it.
Having said that, the logic that is in place to take care of the corner cases
described up-thread seems reasonable to me.
Please find v9 with the above comments addressed.
thanks
Shveta
Attachments:
v9-0001-Handle-stopSignaled-during-sync-function-call.patch (application/octet-stream)
From 3f89c4894552a7c2282308f4d4ff921949264fa2 Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Fri, 5 Apr 2024 10:16:03 +0530
Subject: [PATCH v9] Handle stopSignaled during sync function call.
This patch attempts to fix the following issues:
1) It implements promotion related handling needed for slot sync
SQL function pg_sync_replication_slots() similar to slot sync worker.
pg_sync_replication_slots() now checks 'stopSignaled' flag before
proceeding with slot sync; ShutDownSlotSync() OTOH checks 'syncing'
flag instead of 'SlotSyncCtx->pid' to ensure slot sync machinery
is shut down. It also changes the order of slot-sync cleanup
callbacks registration, so that we first release the replication slot,
then set pid to invalid one and then set syncing to false.
Setting 'syncing' to false needs to be performed as the last step of
cleanup so that shutdown flow during promotion can reliably exit
from wait-loop on seeing 'syncing' as false.
2) This patch fixes another issue in slot sync worker where
SignalHandlerForShutdownRequest() needs to be registered *before* setting
SlotSyncCtx->pid, otherwise the slotsync worker could miss to handle
SIGINT sent by the startup process(ShutDownSlotSync) if it is sent before
worker could register SignalHandlerForShutdownRequest(). To be consistent, all
signal handlers' registration is moved to a prior location before we set
worker's pid.
3) This patch fixes another issue in ShutDownSlotSync() where we perform
kill(SlotSyncCtx->pid) after releasing spin-lock. There are chances that
after we release spin-lock and before we perform kill, slot-sync worker
has error-ed out and has set SlotSyncCtx->pid to InvalidPid (-1) and thus
kill(-1) could result in abnormal process kills on some platforms. Now,
we capture SlotSyncCtx->pid in a local variable under spin-lock and later
use that to perform kill.
---
doc/src/sgml/func.sgml | 4 +
src/backend/replication/logical/slotsync.c | 264 ++++++++++++++-------
2 files changed, 182 insertions(+), 86 deletions(-)
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 8dfb42ad4d..65a5fa6bb2 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -29348,6 +29348,10 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
standby server. Temporary synced slots, if any, cannot be used for
logical decoding and must be dropped after promotion. See
<xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
+ Note that this function cannot be executed if
+ <link linkend="guc-sync-replication-slots"><varname>
+ sync_replication_slots</varname></link> is enabled and the slotsync
+ worker is already running to perform the synchronization of slots.
</para>
<caution>
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index bda0de52db..32a262d613 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -92,9 +92,6 @@
* is expected (e.g., slot sync GUCs change), slot sync worker will reset
* last_start_time before exiting, so that postmaster can start the worker
* without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
- *
- * All the fields except 'syncing' are used only by slotsync worker.
- * 'syncing' is used both by worker and SQL function pg_sync_replication_slots.
*/
typedef struct SlotSyncCtxStruct
{
@@ -807,20 +804,6 @@ synchronize_slots(WalReceiverConn *wrconn)
" FROM pg_catalog.pg_replication_slots"
" WHERE failover and NOT temporary";
- SpinLockAcquire(&SlotSyncCtx->mutex);
- if (SlotSyncCtx->syncing)
- {
- SpinLockRelease(&SlotSyncCtx->mutex);
- ereport(ERROR,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot synchronize replication slots concurrently"));
- }
-
- SlotSyncCtx->syncing = true;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = true;
-
/* The syscache access in walrcv_exec() needs a transaction env. */
if (!IsTransactionState())
{
@@ -937,12 +920,6 @@ synchronize_slots(WalReceiverConn *wrconn)
if (started_tx)
CommitTransactionCommand();
- SpinLockAcquire(&SlotSyncCtx->mutex);
- SlotSyncCtx->syncing = false;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = false;
-
return some_slot_updated;
}
@@ -1190,6 +1167,19 @@ ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
slotsync_reread_config();
}
+/*
+ * Connection cleanup function for slotsync worker.
+ *
+ * Called on slotsync worker exit.
+ */
+static void
+slotsync_worker_disconnect(int code, Datum arg)
+{
+ WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
+
+ walrcv_disconnect(wrconn);
+}
+
/*
* Cleanup function for slotsync worker.
*
@@ -1199,7 +1189,20 @@ static void
slotsync_worker_onexit(int code, Datum arg)
{
SpinLockAcquire(&SlotSyncCtx->mutex);
+
SlotSyncCtx->pid = InvalidPid;
+
+ /*
+ * If syncing_slots is true, it indicates that the process errored out
+ * without resetting the flag. So, we need to clean up shared memory and
+ * reset the flag here.
+ */
+ if (syncing_slots)
+ {
+ SlotSyncCtx->syncing = false;
+ syncing_slots = false;
+ }
+
SpinLockRelease(&SlotSyncCtx->mutex);
}
@@ -1242,6 +1245,63 @@ wait_for_slot_activity(bool some_slot_updated)
ResetLatch(MyLatch);
}
+/*
+ * Check stopSignaled and syncing flags. Emit error if promotion has
+ * already set stopSignaled or if it is concurrent sync call. Otherwise,
+ * set 'syncing' flag and pid info.
+ */
+static void
+check_flags_and_set_sync_info(pid_t worker_pid)
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+
+ /* The worker pid must not be already assigned in SlotSyncCtx */
+ Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid);
+
+ /*
+ * Startup process signaled the slot sync machinery to stop, so if
+ * meanwhile postmaster ended up starting the worker again or user has
+ * invoked pg_sync_replication_slots(), error out.
+ */
+ if (SlotSyncCtx->stopSignaled)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot synchronize replication slots when standby promotion is ongoing"));
+ }
+
+ if (SlotSyncCtx->syncing)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot synchronize replication slots concurrently"));
+ }
+
+ SlotSyncCtx->syncing = true;
+
+ /* Advertise our PID so that the startup process can kill us on promotion */
+ SlotSyncCtx->pid = worker_pid;
+
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = true;
+}
+
+/*
+ * Reset syncing flag.
+ */
+static void
+reset_syncing_flag()
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ SlotSyncCtx->syncing = false;
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = false;
+};
+
/*
* The main loop of our worker process.
*
@@ -1272,52 +1332,23 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
InitProcess();
/*
- * Early initialization.
+ * Register slotsync_worker_onexit() before we register
+ * ReplicationSlotShmemExit() in BaseInit(), to ensure that during exit of
+ * slot sync worker, ReplicationSlotShmemExit() is called first, followed
+ * by slotsync_worker_onexit(). Startup process during promotion calls
+ * ShutDownSlotSync() which waits for slot sync to finish and it does that
+ * by checking the 'syncing' flag. Thus it is important that worker should
+ * be done with slots' release and cleanup before it actually marks itself
+ * as finished syncing.
*/
- BaseInit();
-
- Assert(SlotSyncCtx != NULL);
-
- SpinLockAcquire(&SlotSyncCtx->mutex);
- Assert(SlotSyncCtx->pid == InvalidPid);
-
- /*
- * Startup process signaled the slot sync worker to stop, so if meanwhile
- * postmaster ended up starting the worker again, exit.
- */
- if (SlotSyncCtx->stopSignaled)
- {
- SpinLockRelease(&SlotSyncCtx->mutex);
- proc_exit(0);
- }
-
- /* Advertise our PID so that the startup process can kill us on promotion */
- SlotSyncCtx->pid = MyProcPid;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- ereport(LOG, errmsg("slot sync worker started"));
-
- /* Register it as soon as SlotSyncCtx->pid is initialized. */
before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
- /* Setup signal handling */
- pqsignal(SIGHUP, SignalHandlerForConfigReload);
- pqsignal(SIGINT, SignalHandlerForShutdownRequest);
- pqsignal(SIGTERM, die);
- pqsignal(SIGFPE, FloatExceptionHandler);
- pqsignal(SIGUSR1, procsignal_sigusr1_handler);
- pqsignal(SIGUSR2, SIG_IGN);
- pqsignal(SIGPIPE, SIG_IGN);
- pqsignal(SIGCHLD, SIG_DFL);
-
/*
- * Establishes SIGALRM handler and initialize timeout module. It is needed
- * by InitPostgres to register different timeouts.
+ * Early initialization.
*/
- InitializeTimeouts();
+ BaseInit();
- /* Load the libpq-specific functions */
- load_file("libpqwalreceiver", false);
+ Assert(SlotSyncCtx != NULL);
/*
* If an exception is encountered, processing resumes here.
@@ -1350,6 +1381,29 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
/* We can now handle ereport(ERROR) */
PG_exception_stack = &local_sigjmp_buf;
+ /* Setup signal handling */
+ pqsignal(SIGHUP, SignalHandlerForConfigReload);
+ pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+ pqsignal(SIGTERM, die);
+ pqsignal(SIGFPE, FloatExceptionHandler);
+ pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+ pqsignal(SIGUSR2, SIG_IGN);
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGCHLD, SIG_DFL);
+
+ check_flags_and_set_sync_info(MyProcPid);
+
+ ereport(LOG, errmsg("slot sync worker started"));
+
+ /*
+ * Establishes SIGALRM handler and initialize timeout module. It is needed
+ * by InitPostgres to register different timeouts.
+ */
+ InitializeTimeouts();
+
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+
/*
* Unblock signals (they were blocked when the postmaster forked us)
*/
@@ -1402,13 +1456,13 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
errmsg("could not connect to the primary server: %s", err));
/*
- * Register the failure callback once we have the connection.
+ * Register the disconnection callback.
*
- * XXX: This can be combined with previous such cleanup registration of
+ * XXX: This can be combined with previous cleanup registration of
* slotsync_worker_onexit() but that will need the connection to be made
* global and we want to avoid introducing global for this purpose.
*/
- before_shmem_exit(slotsync_failure_callback, PointerGetDatum(wrconn));
+ before_shmem_exit(slotsync_worker_disconnect, PointerGetDatum(wrconn));
/*
* Using the specified primary server connection, check that we are not a
@@ -1457,8 +1511,8 @@ update_synced_slots_inactive_since(void)
if (!StandbyMode)
return;
- /* The slot sync worker mustn't be running by now */
- Assert(SlotSyncCtx->pid == InvalidPid);
+ /* The slot sync worker or SQL function mustn't be running by now */
+ Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing);
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
@@ -1471,6 +1525,9 @@ update_synced_slots_inactive_since(void)
{
Assert(SlotIsLogical(s));
+ /* The slot must not be acquired by any process */
+ Assert(s->active_pid == 0);
+
/* Use the same inactive_since time for all the slots. */
if (now == 0)
now = GetCurrentTimestamp();
@@ -1486,25 +1543,39 @@ update_synced_slots_inactive_since(void)
/*
* Shut down the slot sync worker.
+ *
+ * It sends signal to shutdown slot sync worker. It also waits till
+ * the slot sync worker has exited and pg_sync_replication_slots()
+ * has finished.
*/
void
ShutDownSlotSync(void)
{
+ pid_t worker_pid;
+
SpinLockAcquire(&SlotSyncCtx->mutex);
SlotSyncCtx->stopSignaled = true;
- if (SlotSyncCtx->pid == InvalidPid)
+ /*
+ * Return if neither the slot sync worker is running nor the function
+ * pg_sync_replication_slots() is executing.
+ */
+ if (!SlotSyncCtx->syncing)
{
SpinLockRelease(&SlotSyncCtx->mutex);
update_synced_slots_inactive_since();
return;
}
+
+ worker_pid = SlotSyncCtx->pid;
+
SpinLockRelease(&SlotSyncCtx->mutex);
- kill(SlotSyncCtx->pid, SIGINT);
+ if (worker_pid != InvalidPid)
+ kill(worker_pid, SIGINT);
- /* Wait for it to die */
+ /* Wait for slot sync to end */
for (;;)
{
int rc;
@@ -1522,8 +1593,11 @@ ShutDownSlotSync(void)
SpinLockAcquire(&SlotSyncCtx->mutex);
- /* Is it gone? */
- if (SlotSyncCtx->pid == InvalidPid)
+ /*
+ * Confirm that both the worker and the function
+ * pg_sync_replication_slots() are done.
+ */
+ if (!SlotSyncCtx->syncing)
break;
SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1601,26 +1675,36 @@ SlotSyncShmemInit(void)
}
/*
- * Error cleanup callback for slot synchronization.
+ * Error cleanup callback for slot sync SQL function.
*/
static void
slotsync_failure_callback(int code, Datum arg)
{
WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
- if (syncing_slots)
- {
- /*
- * If syncing_slots is true, it indicates that the process errored out
- * without resetting the flag. So, we need to clean up shared memory
- * and reset the flag here.
- */
- SpinLockAcquire(&SlotSyncCtx->mutex);
- SlotSyncCtx->syncing = false;
- SpinLockRelease(&SlotSyncCtx->mutex);
+ /*
+ * We need to do slots cleanup here just like WalSndErrorCleanup() does.
+ *
+ * Startup process during promotion calls ShutDownSlotSync() which waits
+ * for slot sync to finish and it does that by checking the 'syncing'
+ * flag. Thus it is important that SQL function should be done with slots'
+ * release and cleanup before it actually marks itself as finished
+ * syncing.
+ */
+ /* Make sure active replication slots are released */
+ if (MyReplicationSlot != NULL)
+ ReplicationSlotRelease();
- syncing_slots = false;
- }
+ /* Also cleanup all the temporary slots. */
+ ReplicationSlotCleanup();
+
+ /*
+ * If syncing_slots is true, it indicates that the process errored out
+ * without resetting the flag. So, we need to clean up shared memory and
+ * reset the flag here.
+ */
+ if (syncing_slots)
+ reset_syncing_flag();
walrcv_disconnect(wrconn);
}
@@ -1634,9 +1718,17 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
{
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
+ check_flags_and_set_sync_info(InvalidPid);
+
validate_remote_info(wrconn);
synchronize_slots(wrconn);
+
+ /* Cleanup the temporary slots */
+ ReplicationSlotCleanup();
+
+ /* We are done with sync, so reset sync flag */
+ reset_syncing_flag();
}
PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
}
--
2.34.1
On Friday, April 19, 2024 4:22 PM shveta malik <shveta.malik@gmail.com> wrote:
On Fri, Apr 19, 2024 at 11:37 AM shveta malik <shveta.malik@gmail.com> wrote:
On Fri, Apr 19, 2024 at 10:53 AM Bertrand Drouvot
<bertranddrouvot.pg@gmail.com> wrote:Hi,
On Thu, Apr 18, 2024 at 05:36:05PM +0530, shveta malik wrote:
Please find v8 attached. Changes are:
Thanks!
A few comments:
Thanks for reviewing.
1 ===
@@ -1440,7 +1461,7 @@ ReplSlotSyncWorkerMain(char *startup_data,
size_t startup_data_len)
* slotsync_worker_onexit() but that will need the connection to be
made
* global and we want to avoid introducing global for this purpose.
*/
- before_shmem_exit(slotsync_failure_callback, PointerGetDatum(wrconn));
+ before_shmem_exit(slotsync_worker_disconnect, PointerGetDatum(wrconn));
The comment above this change still states "Register the failure
callback once we have the connection", I think it has to be reworded
a bit now that v8 is making use of slotsync_worker_disconnect().
2 ===
+ * Register slotsync_worker_onexit() before we register
+ * ReplicationSlotShmemExit() in BaseInit(), to ensure that during exit of
+ * slot sync worker, ReplicationSlotShmemExit() is called first, followed
+ * by slotsync_worker_onexit(). Startup process during promotion waits for
Worth to mention in shmem_exit() (where it "while
(--before_shmem_exit_index >= 0)"
or before the shmem_exit() definition) that ReplSlotSyncWorkerMain()
relies on this LIFO behavior? (not sure if there is other "strong"
LIFO requirement in other part of the code).
I see other modules as well relying on LIFO behavior.
Please see applyparallelworker.c where
'before_shmem_exit(pa_shutdown)' is needed to be done after
'before_shmem_exit(logicalrep_worker_onexit)' (commit id 3d144c6).
Also in postinit.c, I see such comments atop
'before_shmem_exit(ShutdownPostgres, 0)'.
I feel we can skip adding this specific comment about
ReplSlotSyncWorkerMain() in ipc.c, as none of the other modules has
also not added any. I will address the rest of your comments in the
next version.
3 ===
+ * Startup process during promotion waits for slot sync to finish and it
+ * does that by checking the 'syncing' flag.
worth to mention ShutDownSlotSync()?
4 ===
I did a few tests manually (launching ShutDownSlotSync() through gdb
/ with and without sync worker and with / without
pg_sync_replication_slots() running
concurrently) and it looks like it works as designed.
Thanks for testing it.
Having said that, the logic that is in place to take care of the
corner cases described up-thread seems reasonable to me.
Please find v9 with the above comments addressed.
Thanks, the patch looks good to me. I also tested a few concurrent
promotion/function execution cases and didn't find issues.
Best Regards,
Hou zj
On Fri, Apr 19, 2024 at 1:52 PM shveta malik <shveta.malik@gmail.com> wrote:
Please find v9 with the above comments addressed.
I have made minor modifications in the comments and a function name.
Please see the attached top-up patch. Apart from this, the patch looks
good to me.
--
With Regards,
Amit Kapila.
Attachments:
v9_amit_1.patch.txt (text/plain; charset=US-ASCII)
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 66fb46ac27..72a775b09c 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -79,10 +79,11 @@
* and also sets stopSignaled=true to handle the race condition when the
* postmaster has not noticed the promotion yet and thus may end up restarting
* the slot sync worker. If stopSignaled is set, the worker will exit in such a
- * case. Note that we don't need to reset this variable as after promotion the
- * slot sync worker won't be restarted because the pmState changes to PM_RUN from
- * PM_HOT_STANDBY and we don't support demoting primary without restarting the
- * server. See MaybeStartSlotSyncWorker.
+ * case. The SQL function pg_sync_replication_slots() will also error out if
+ * this flag is set. Note that we don't need to reset this variable as after
+ * promotion the slot sync worker won't be restarted because the pmState
+ * changes to PM_RUN from PM_HOT_STANDBY and we don't support demoting
+ * primary without restarting the server. See MaybeStartSlotSyncWorker.
*
* The 'syncing' flag is needed to prevent concurrent slot syncs to avoid slot
* overwrites.
@@ -1246,12 +1247,11 @@ wait_for_slot_activity(bool some_slot_updated)
}
/*
- * Check stopSignaled and syncing flags. Emit error if promotion has
- * already set stopSignaled or if it is concurrent sync call. Otherwise,
- * set 'syncing' flag and pid info.
+ * Emit an error if a promotion or a concurrent sync call is in progress.
+ * Otherwise, advertise that a sync is in progress.
*/
static void
-check_flags_and_set_sync_info(pid_t worker_pid)
+check_and_set_sync_info(pid_t worker_pid)
{
SpinLockAcquire(&SlotSyncCtx->mutex);
@@ -1259,9 +1259,8 @@ check_flags_and_set_sync_info(pid_t worker_pid)
Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid);
/*
- * Startup process signaled the slot sync machinery to stop, so if
- * meanwhile postmaster ended up starting the worker again or user has
- * invoked pg_sync_replication_slots(), error out.
+ * Emit an error if startup process signaled the slot sync machinery to
+ * stop. See comments atop SlotSyncCtxStruct.
*/
if (SlotSyncCtx->stopSignaled)
{
@@ -1281,7 +1280,10 @@ check_flags_and_set_sync_info(pid_t worker_pid)
SlotSyncCtx->syncing = true;
- /* Advertise our PID so that the startup process can kill us on promotion */
+ /*
+ * Advertise the required PID so that the startup process can kill the slot
+ * sync worker on promotion.
+ */
SlotSyncCtx->pid = worker_pid;
SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1333,13 +1335,13 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
/*
* Register slotsync_worker_onexit() before we register
- * ReplicationSlotShmemExit() in BaseInit(), to ensure that during exit of
- * slot sync worker, ReplicationSlotShmemExit() is called first, followed
- * by slotsync_worker_onexit(). Startup process during promotion calls
- * ShutDownSlotSync() which waits for slot sync to finish and it does that
- * by checking the 'syncing' flag. Thus it is important that worker should
- * be done with slots' release and cleanup before it actually marks itself
- * as finished syncing.
+ * ReplicationSlotShmemExit() in BaseInit(), to ensure that during the exit
+ * of the slot sync worker, ReplicationSlotShmemExit() is called first,
+ * followed by slotsync_worker_onexit(). The startup process during
+ * promotion invokes ShutDownSlotSync() which waits for slot sync to finish
+ * and it does that by checking the 'syncing' flag. Thus worker must be
+ * done with the slots' release and cleanup before it marks itself as
+ * finished syncing.
*/
before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
@@ -1391,7 +1393,7 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGCHLD, SIG_DFL);
- check_flags_and_set_sync_info(MyProcPid);
+ check_and_set_sync_info(MyProcPid);
ereport(LOG, errmsg("slot sync worker started"));
@@ -1544,9 +1546,9 @@ update_synced_slots_inactive_since(void)
/*
* Shut down the slot sync worker.
*
- * It sends signal to shutdown slot sync worker. It also waits till
- * the slot sync worker has exited and pg_sync_replication_slots()
- * has finished.
+ * This function sends signal to shutdown slot sync worker, if required. It
+ * also waits till the slot sync worker has exited or
+ * pg_sync_replication_slots() has finished.
*/
void
ShutDownSlotSync(void)
@@ -1593,10 +1595,7 @@ ShutDownSlotSync(void)
SpinLockAcquire(&SlotSyncCtx->mutex);
- /*
- * Confirm that both the worker and the function
- * pg_sync_replication_slots() are done.
- */
+ /* Ensure that no process is syncing the slots. */
if (!SlotSyncCtx->syncing)
break;
@@ -1685,12 +1684,13 @@ slotsync_failure_callback(int code, Datum arg)
/*
* We need to do slots cleanup here just like WalSndErrorCleanup() does.
*
- * Startup process during promotion calls ShutDownSlotSync() which waits
- * for slot sync to finish and it does that by checking the 'syncing'
- * flag. Thus it is important that SQL function should be done with slots'
- * release and cleanup before it actually marks itself as finished
- * syncing.
+ * The startup process during promotion invokes ShutDownSlotSync() which
+ * waits for slot sync to finish and it does that by checking the 'syncing'
+ * flag. Thus the SQL function must be done with slots' release and cleanup
+ * to avoid any dangling temporary slots or active slots before it marks
+ * itself as finished syncing.
*/
+
/* Make sure active replication slots are released */
if (MyReplicationSlot != NULL)
ReplicationSlotRelease();
@@ -1699,9 +1699,9 @@ slotsync_failure_callback(int code, Datum arg)
ReplicationSlotCleanup();
/*
- * If syncing_slots is true, it indicates that the process errored out
- * without resetting the flag. So, we need to clean up shared memory and
- * reset the flag here.
+ * The set syncing_slots indicates that the process errored out without
+ * resetting the flag. So, we need to clean up shared memory and reset the
+ * flag here.
*/
if (syncing_slots)
reset_syncing_flag();
@@ -1718,7 +1718,7 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
{
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
- check_flags_and_set_sync_info(InvalidPid);
+ check_and_set_sync_info(InvalidPid);
validate_remote_info(wrconn);
On Mon, Apr 22, 2024 at 5:10 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Fri, Apr 19, 2024 at 1:52 PM shveta malik <shveta.malik@gmail.com> wrote:
Please find v9 with the above comments addressed.
I have made minor modifications in the comments and a function name.
Please see the attached top-up patch. Apart from this, the patch looks
good to me.
Thanks for the patch; the changes look good, Amit. Please find the merged patch.
thanks
Shveta
Attachments:
v10-0001-Handle-stopSignaled-during-sync-function-call.patch (application/octet-stream)
From 6e35ac0a298817c534e15e4d4bee5b9fb9f9e830 Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Fri, 5 Apr 2024 10:16:03 +0530
Subject: [PATCH v10] Handle stopSignaled during sync function call.
This patch attempts to fix the following issues:
1) It implements promotion related handling needed for slot sync
SQL function pg_sync_replication_slots() similar to slot sync worker.
pg_sync_replication_slots() now checks 'stopSignaled' flag before
proceeding with slot sync; ShutDownSlotSync() OTOH checks 'syncing'
flag instead of 'SlotSyncCtx->pid' to ensure slot sync machinery
is shut down. It also changes the order of slot-sync cleanup
callbacks registration, so that we first release the replication slot,
then set pid to invalid one and then set syncing to false.
Setting 'syncing' to false needs to be performed as the last step of
cleanup so that shutdown flow during promotion can reliably exit
from wait-loop on seeing 'syncing' as false.
2) This patch fixes another issue in slot sync worker where
SignalHandlerForShutdownRequest() needs to be registered *before* setting
SlotSyncCtx->pid, otherwise the slotsync worker could miss to handle
SIGINT sent by the startup process(ShutDownSlotSync) if it is sent before
worker could register SignalHandlerForShutdownRequest(). To be consistent, all
signal handlers' registration is moved to a prior location before we set
worker's pid.
3) This patch fixes another issue in ShutDownSlotSync() where we perform
kill(SlotSyncCtx->pid) after releasing spin-lock. There are chances that
after we release spin-lock and before we perform kill, slot-sync worker
has error-ed out and has set SlotSyncCtx->pid to InvalidPid (-1) and thus
kill(-1) could result in abnormal process kills on some platforms. Now,
we capture SlotSyncCtx->pid in a local variable under spin-lock and later
use that to perform kill.
---
doc/src/sgml/func.sgml | 4 +
src/backend/replication/logical/slotsync.c | 272 ++++++++++++++-------
2 files changed, 186 insertions(+), 90 deletions(-)
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 92a0f49e6a..e3e22645ed 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -29349,6 +29349,10 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
standby server. Temporary synced slots, if any, cannot be used for
logical decoding and must be dropped after promotion. See
<xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
+ Note that this function cannot be executed if
+ <link linkend="guc-sync-replication-slots"><varname>
+ sync_replication_slots</varname></link> is enabled and the slotsync
+ worker is already running to perform the synchronization of slots.
</para>
<caution>
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index cb39adcd0e..7fb4733504 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -79,10 +79,11 @@
* and also sets stopSignaled=true to handle the race condition when the
* postmaster has not noticed the promotion yet and thus may end up restarting
* the slot sync worker. If stopSignaled is set, the worker will exit in such a
- * case. Note that we don't need to reset this variable as after promotion the
- * slot sync worker won't be restarted because the pmState changes to PM_RUN from
- * PM_HOT_STANDBY and we don't support demoting primary without restarting the
- * server. See MaybeStartSlotSyncWorker.
+ * case. The SQL function pg_sync_replication_slots() will also error out if
+ * this flag is set. Note that we don't need to reset this variable as after
+ * promotion the slot sync worker won't be restarted because the pmState
+ * changes to PM_RUN from PM_HOT_STANDBY and we don't support demoting
+ * primary without restarting the server. See MaybeStartSlotSyncWorker.
*
* The 'syncing' flag is needed to prevent concurrent slot syncs to avoid slot
* overwrites.
@@ -92,9 +93,6 @@
* is expected (e.g., slot sync GUCs change), slot sync worker will reset
* last_start_time before exiting, so that postmaster can start the worker
* without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
- *
- * All the fields except 'syncing' are used only by slotsync worker.
- * 'syncing' is used both by worker and SQL function pg_sync_replication_slots.
*/
typedef struct SlotSyncCtxStruct
{
@@ -807,20 +805,6 @@ synchronize_slots(WalReceiverConn *wrconn)
" FROM pg_catalog.pg_replication_slots"
" WHERE failover and NOT temporary";
- SpinLockAcquire(&SlotSyncCtx->mutex);
- if (SlotSyncCtx->syncing)
- {
- SpinLockRelease(&SlotSyncCtx->mutex);
- ereport(ERROR,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot synchronize replication slots concurrently"));
- }
-
- SlotSyncCtx->syncing = true;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = true;
-
/* The syscache access in walrcv_exec() needs a transaction env. */
if (!IsTransactionState())
{
@@ -937,12 +921,6 @@ synchronize_slots(WalReceiverConn *wrconn)
if (started_tx)
CommitTransactionCommand();
- SpinLockAcquire(&SlotSyncCtx->mutex);
- SlotSyncCtx->syncing = false;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = false;
-
return some_slot_updated;
}
@@ -1190,6 +1168,19 @@ ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
slotsync_reread_config();
}
+/*
+ * Connection cleanup function for slotsync worker.
+ *
+ * Called on slotsync worker exit.
+ */
+static void
+slotsync_worker_disconnect(int code, Datum arg)
+{
+ WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
+
+ walrcv_disconnect(wrconn);
+}
+
/*
* Cleanup function for slotsync worker.
*
@@ -1199,7 +1190,20 @@ static void
slotsync_worker_onexit(int code, Datum arg)
{
SpinLockAcquire(&SlotSyncCtx->mutex);
+
SlotSyncCtx->pid = InvalidPid;
+
+ /*
+ * If syncing_slots is true, it indicates that the process errored out
+ * without resetting the flag. So, we need to clean up shared memory and
+ * reset the flag here.
+ */
+ if (syncing_slots)
+ {
+ SlotSyncCtx->syncing = false;
+ syncing_slots = false;
+ }
+
SpinLockRelease(&SlotSyncCtx->mutex);
}
@@ -1242,6 +1246,64 @@ wait_for_slot_activity(bool some_slot_updated)
ResetLatch(MyLatch);
}
+/*
+ * Emit an error if a promotion or a concurrent sync call is in progress.
+ * Otherwise, advertise that a sync is in progress.
+ */
+static void
+check_and_set_sync_info(pid_t worker_pid)
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+
+ /* The worker pid must not be already assigned in SlotSyncCtx */
+ Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid);
+
+ /*
+ * Emit an error if startup process signaled the slot sync machinery to
+ * stop. See comments atop SlotSyncCtxStruct.
+ */
+ if (SlotSyncCtx->stopSignaled)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot synchronize replication slots when standby promotion is ongoing"));
+ }
+
+ if (SlotSyncCtx->syncing)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot synchronize replication slots concurrently"));
+ }
+
+ SlotSyncCtx->syncing = true;
+
+ /*
+ * Advertise the required PID so that the startup process can kill the
+ * slot sync worker on promotion.
+ */
+ SlotSyncCtx->pid = worker_pid;
+
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = true;
+}
+
+/*
+ * Reset syncing flag.
+ */
+static void
+reset_syncing_flag()
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ SlotSyncCtx->syncing = false;
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = false;
+};
+
/*
* The main loop of our worker process.
*
@@ -1272,52 +1334,23 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
InitProcess();
/*
- * Early initialization.
+ * Register slotsync_worker_onexit() before we register
+ * ReplicationSlotShmemExit() in BaseInit(), to ensure that during the
+ * exit of the slot sync worker, ReplicationSlotShmemExit() is called
+ * first, followed by slotsync_worker_onexit(). The startup process during
+ * promotion invokes ShutDownSlotSync() which waits for slot sync to
+ * finish and it does that by checking the 'syncing' flag. Thus worker
+ * must be done with the slots' release and cleanup before it marks itself
+ * as finished syncing.
*/
- BaseInit();
-
- Assert(SlotSyncCtx != NULL);
-
- SpinLockAcquire(&SlotSyncCtx->mutex);
- Assert(SlotSyncCtx->pid == InvalidPid);
-
- /*
- * Startup process signaled the slot sync worker to stop, so if meanwhile
- * postmaster ended up starting the worker again, exit.
- */
- if (SlotSyncCtx->stopSignaled)
- {
- SpinLockRelease(&SlotSyncCtx->mutex);
- proc_exit(0);
- }
-
- /* Advertise our PID so that the startup process can kill us on promotion */
- SlotSyncCtx->pid = MyProcPid;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- ereport(LOG, errmsg("slot sync worker started"));
-
- /* Register it as soon as SlotSyncCtx->pid is initialized. */
before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
- /* Setup signal handling */
- pqsignal(SIGHUP, SignalHandlerForConfigReload);
- pqsignal(SIGINT, SignalHandlerForShutdownRequest);
- pqsignal(SIGTERM, die);
- pqsignal(SIGFPE, FloatExceptionHandler);
- pqsignal(SIGUSR1, procsignal_sigusr1_handler);
- pqsignal(SIGUSR2, SIG_IGN);
- pqsignal(SIGPIPE, SIG_IGN);
- pqsignal(SIGCHLD, SIG_DFL);
-
/*
- * Establishes SIGALRM handler and initialize timeout module. It is needed
- * by InitPostgres to register different timeouts.
+ * Early initialization.
*/
- InitializeTimeouts();
+ BaseInit();
- /* Load the libpq-specific functions */
- load_file("libpqwalreceiver", false);
+ Assert(SlotSyncCtx != NULL);
/*
* If an exception is encountered, processing resumes here.
@@ -1350,6 +1383,29 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
/* We can now handle ereport(ERROR) */
PG_exception_stack = &local_sigjmp_buf;
+ /* Setup signal handling */
+ pqsignal(SIGHUP, SignalHandlerForConfigReload);
+ pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+ pqsignal(SIGTERM, die);
+ pqsignal(SIGFPE, FloatExceptionHandler);
+ pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+ pqsignal(SIGUSR2, SIG_IGN);
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGCHLD, SIG_DFL);
+
+ check_and_set_sync_info(MyProcPid);
+
+ ereport(LOG, errmsg("slot sync worker started"));
+
+ /*
+ * Establishes SIGALRM handler and initialize timeout module. It is needed
+ * by InitPostgres to register different timeouts.
+ */
+ InitializeTimeouts();
+
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+
/*
* Unblock signals (they were blocked when the postmaster forked us)
*/
@@ -1402,13 +1458,13 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
errmsg("could not connect to the primary server: %s", err));
/*
- * Register the failure callback once we have the connection.
+ * Register the disconnection callback.
*
- * XXX: This can be combined with previous such cleanup registration of
+ * XXX: This can be combined with previous cleanup registration of
* slotsync_worker_onexit() but that will need the connection to be made
* global and we want to avoid introducing global for this purpose.
*/
- before_shmem_exit(slotsync_failure_callback, PointerGetDatum(wrconn));
+ before_shmem_exit(slotsync_worker_disconnect, PointerGetDatum(wrconn));
/*
* Using the specified primary server connection, check that we are not a
@@ -1457,8 +1513,8 @@ update_synced_slots_inactive_since(void)
if (!StandbyMode)
return;
- /* The slot sync worker mustn't be running by now */
- Assert(SlotSyncCtx->pid == InvalidPid);
+ /* The slot sync worker or SQL function mustn't be running by now */
+ Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing);
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
@@ -1471,6 +1527,9 @@ update_synced_slots_inactive_since(void)
{
Assert(SlotIsLogical(s));
+ /* The slot must not be acquired by any process */
+ Assert(s->active_pid == 0);
+
/* Use the same inactive_since time for all the slots. */
if (now == 0)
now = GetCurrentTimestamp();
@@ -1486,25 +1545,39 @@ update_synced_slots_inactive_since(void)
/*
* Shut down the slot sync worker.
+ *
+ * This function sends signal to shutdown slot sync worker, if required. It
+ * also waits till the slot sync worker has exited or
+ * pg_sync_replication_slots() has finished.
*/
void
ShutDownSlotSync(void)
{
+ pid_t worker_pid;
+
SpinLockAcquire(&SlotSyncCtx->mutex);
SlotSyncCtx->stopSignaled = true;
- if (SlotSyncCtx->pid == InvalidPid)
+ /*
+ * Return if neither the slot sync worker is running nor the function
+ * pg_sync_replication_slots() is executing.
+ */
+ if (!SlotSyncCtx->syncing)
{
SpinLockRelease(&SlotSyncCtx->mutex);
update_synced_slots_inactive_since();
return;
}
+
+ worker_pid = SlotSyncCtx->pid;
+
SpinLockRelease(&SlotSyncCtx->mutex);
- kill(SlotSyncCtx->pid, SIGINT);
+ if (worker_pid != InvalidPid)
+ kill(worker_pid, SIGINT);
- /* Wait for it to die */
+ /* Wait for slot sync to end */
for (;;)
{
int rc;
@@ -1522,8 +1595,8 @@ ShutDownSlotSync(void)
SpinLockAcquire(&SlotSyncCtx->mutex);
- /* Is it gone? */
- if (SlotSyncCtx->pid == InvalidPid)
+ /* Ensure that no process is syncing the slots. */
+ if (!SlotSyncCtx->syncing)
break;
SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1601,26 +1674,37 @@ SlotSyncShmemInit(void)
}
/*
- * Error cleanup callback for slot synchronization.
+ * Error cleanup callback for slot sync SQL function.
*/
static void
slotsync_failure_callback(int code, Datum arg)
{
WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
- if (syncing_slots)
- {
- /*
- * If syncing_slots is true, it indicates that the process errored out
- * without resetting the flag. So, we need to clean up shared memory
- * and reset the flag here.
- */
- SpinLockAcquire(&SlotSyncCtx->mutex);
- SlotSyncCtx->syncing = false;
- SpinLockRelease(&SlotSyncCtx->mutex);
+ /*
+ * We need to do slots cleanup here just like WalSndErrorCleanup() does.
+ *
+ * The startup process during promotion invokes ShutDownSlotSync() which
+ * waits for slot sync to finish and it does that by checking the
+ * 'syncing' flag. Thus the SQL function must be done with slots' release
+ * and cleanup to avoid any dangling temporary slots or active slots
+ * before it marks itself as finished syncing.
+ */
- syncing_slots = false;
- }
+ /* Make sure active replication slots are released */
+ if (MyReplicationSlot != NULL)
+ ReplicationSlotRelease();
+
+ /* Also cleanup all the temporary slots. */
+ ReplicationSlotCleanup();
+
+ /*
+ * The set syncing_slots indicates that the process errored out without
+ * resetting the flag. So, we need to clean up shared memory and reset the
+ * flag here.
+ */
+ if (syncing_slots)
+ reset_syncing_flag();
walrcv_disconnect(wrconn);
}
@@ -1634,9 +1718,17 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
{
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
+ check_and_set_sync_info(InvalidPid);
+
validate_remote_info(wrconn);
synchronize_slots(wrconn);
+
+ /* Cleanup the temporary slots */
+ ReplicationSlotCleanup();
+
+ /* We are done with sync, so reset sync flag */
+ reset_syncing_flag();
}
PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
}
--
2.34.1
On Mon, Apr 22, 2024 at 9:02 PM shveta malik <shveta.malik@gmail.com> wrote:
On Mon, Apr 22, 2024 at 5:10 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Fri, Apr 19, 2024 at 1:52 PM shveta malik <shveta.malik@gmail.com> wrote:
Please find v9 with the above comments addressed.
I have made minor modifications in the comments and a function name.
Please see the attached top-up patch. Apart from this, the patch looks
good to me.
Thanks for the patch, the changes look good Amit. Please find the merged patch.
I've reviewed the patch and have some comments:
---
/*
- * Early initialization.
+ * Register slotsync_worker_onexit() before we register
+ * ReplicationSlotShmemExit() in BaseInit(), to ensure that during the
+ * exit of the slot sync worker, ReplicationSlotShmemExit() is called
+ * first, followed by slotsync_worker_onexit(). The startup process during
+ * promotion invokes ShutDownSlotSync() which waits for slot sync to
+ * finish and it does that by checking the 'syncing' flag. Thus worker
+ * must be done with the slots' release and cleanup before it marks itself
+ * as finished syncing.
*/
I'm slightly worried that we register the slotsync_worker_onexit()
callback before BaseInit(), because it could be a blocker when we want
to add more work in the callback, for example sending the stats.
---
synchronize_slots(wrconn);
+
+ /* Cleanup the temporary slots */
+ ReplicationSlotCleanup();
+
+ /* We are done with sync, so reset sync flag */
+ reset_syncing_flag();
I think it ends up removing other temp slots that are created by the
same backend process using the
pg_create_{physical,logical}_replication_slot() functions, which could
be a large side effect of this function for users. Also, if users want
to have a process periodically calling pg_sync_replication_slots()
instead of the slotsync worker, it doesn't support the case where we
create a temporary not-yet-ready slot and later turn it into a
persistent slot once it is ready for sync.
Regards,
--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
On Mon, Apr 22, 2024 at 7:04 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Mon, Apr 22, 2024 at 9:02 PM shveta malik <shveta.malik@gmail.com> wrote:
Thanks for the patch, the changes look good Amit. Please find the merged patch.
I've reviewed the patch and have some comments:
---
/*
- * Early initialization.
+ * Register slotsync_worker_onexit() before we register
+ * ReplicationSlotShmemExit() in BaseInit(), to ensure that during the
+ * exit of the slot sync worker, ReplicationSlotShmemExit() is called
+ * first, followed by slotsync_worker_onexit(). The startup process during
+ * promotion invokes ShutDownSlotSync() which waits for slot sync to
+ * finish and it does that by checking the 'syncing' flag. Thus worker
+ * must be done with the slots' release and cleanup before it marks itself
+ * as finished syncing.
*/
I'm slightly worried that we register the slotsync_worker_onexit()
callback before BaseInit(), because it could be a blocker when we want
to add more work in the callback, for example sending the stats.
The other possibility is that we do slot release/clean up in the
slotsync_worker_onexit() call itself and then we can do it after
BaseInit(). Do you have any other/better idea for this?
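Just to illustrate the shape of that alternative, a rough, untested
sketch is below (only to show the idea, not the actual patch code; it
reuses the existing routines from slotsync.c and slot.c):

static void
slotsync_worker_onexit(int code, Datum arg)
{
	/*
	 * Release the slot we may still hold and drop the temporary slots
	 * first, so that the startup process waiting in ShutDownSlotSync()
	 * never sees 'syncing' cleared while slots are still held.
	 */
	if (MyReplicationSlot != NULL)
		ReplicationSlotRelease();

	ReplicationSlotCleanup();

	/* Only now advertise that syncing is over */
	SpinLockAcquire(&SlotSyncCtx->mutex);
	SlotSyncCtx->pid = InvalidPid;
	SlotSyncCtx->syncing = false;
	SpinLockRelease(&SlotSyncCtx->mutex);

	syncing_slots = false;
}

With that, the callback no longer depends on ReplicationSlotShmemExit()
having run before it, so it could be registered after BaseInit().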
---
synchronize_slots(wrconn);
+
+ /* Cleanup the temporary slots */
+ ReplicationSlotCleanup();
+
+ /* We are done with sync, so reset sync flag */
+ reset_syncing_flag();
I think it ends up removing other temp slots that are created by the
same backend process using
pg_create_{physical,logical_replication_slots() function, which could
be a large side effect of this function for users.
True. I think here we should remove only temporary slots that are marked
as synced. The other possibility is to create slots as RS_EPHEMERAL
initially when called from the SQL function, but that doesn't sound
like a neat approach.
Also, if users want
to have a process periodically calling pg_sync_replication_slots()
instead of the slotsync worker, it doesn't support a case where we
create a temp not-ready slot and turn it into a persistent slot if
it's ready for sync.
True, but eventually the API should be able to create the persistent
slots directly. Anyway, this can happen only the first time (till the
slots are created and marked persistent), and anyone who wants to use
this function periodically should still see regular syncs.
OTOH, temp slots created via this API could remain as-is after
promotion, and we would need to document that users must remove such
slots. We can do that if we want, but I think it is better to clean up
such slots rather than putting the onus on users to remove them after
promotion.
--
With Regards,
Amit Kapila.
On Tue, Apr 23, 2024 at 9:07 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Mon, Apr 22, 2024 at 7:04 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Mon, Apr 22, 2024 at 9:02 PM shveta malik <shveta.malik@gmail.com> wrote:
Thanks for the patch, the changes look good Amit. Please find the merged patch.
I've reviewed the patch and have some comments:
Thanks for the comments.
---
 /*
- * Early initialization.
+ * Register slotsync_worker_onexit() before we register
+ * ReplicationSlotShmemExit() in BaseInit(), to ensure that during the
+ * exit of the slot sync worker, ReplicationSlotShmemExit() is called
+ * first, followed by slotsync_worker_onexit(). The startup process during
+ * promotion invokes ShutDownSlotSync() which waits for slot sync to
+ * finish and it does that by checking the 'syncing' flag. Thus worker
+ * must be done with the slots' release and cleanup before it marks itself
+ * as finished syncing.
  */

I'm slightly worried that we register the slotsync_worker_onexit()
callback before BaseInit(), because it could be a blocker when we want
to add more work in the callback, for example sending the stats.

The other possibility is that we do slot release/clean up in the
slotsync_worker_onexit() call itself and then we can do it after
BaseInit(). Do you have any other/better idea for this?
I have currently implemented it this way in v11.
---
 synchronize_slots(wrconn);
+
+ /* Cleanup the temporary slots */
+ ReplicationSlotCleanup();
+
+ /* We are done with sync, so reset sync flag */
+ reset_syncing_flag();

I think it ends up removing other temp slots that are created by the
same backend process using the
pg_create_{physical,logical}_replication_slot() functions, which could
be a large side effect of this function for users.
Yes, this is a problem. Thanks for catching it.
True. I think here we should either remove only the temporary slots that
are marked as synced, or create slots as RS_EPHEMERAL initially when
called from the SQL function; but the latter doesn't sound like a neat
approach.
Modified the logic to remove only synced temporary slots during
SQL-function exit.
Please find v11 with above changes.
thanks
Shveta
Attachments:
v11-0001-Handle-stopSignaled-during-sync-function-call.patch (application/octet-stream)
From b0ac6625ca925b7f8981d99c5e85e4758297054c Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Fri, 5 Apr 2024 10:16:03 +0530
Subject: [PATCH v11] Handle stopSignaled during sync function call.
This patch attempts to fix the following issues:

1) It implements the promotion-related handling needed for the slot sync
SQL function pg_sync_replication_slots(), similar to the slot sync worker.
pg_sync_replication_slots() now checks the 'stopSignaled' flag before
proceeding with slot sync; ShutDownSlotSync() OTOH checks the 'syncing'
flag instead of 'SlotSyncCtx->pid' to ensure the slot sync machinery
is shut down. It also changes the order of slot-sync cleanup, so that we
first release the replication slot, then set the pid to InvalidPid and
'syncing' to false. Setting 'syncing' to false needs to be performed as
the last step of cleanup so that the shutdown flow during promotion can
reliably exit from its wait loop on seeing 'syncing' as false.

2) This patch fixes another issue in the slot sync worker where
SignalHandlerForShutdownRequest() needs to be registered *before* setting
SlotSyncCtx->pid; otherwise the slotsync worker could miss handling the
SIGINT sent by the startup process (ShutDownSlotSync) if it is sent before
the worker registers SignalHandlerForShutdownRequest(). To be consistent,
registration of all signal handlers is moved to a prior location, before
we set the worker's pid.

3) This patch fixes another issue in ShutDownSlotSync() where we perform
kill(SlotSyncCtx->pid) after releasing the spinlock. There is a chance
that, after we release the spinlock and before we perform the kill, the
slot-sync worker has errored out and set SlotSyncCtx->pid to InvalidPid
(-1), in which case kill(-1) could result in abnormal process kills on
some platforms. Now we capture SlotSyncCtx->pid in a local variable while
holding the spinlock and later use that to perform the kill.
---
---
doc/src/sgml/func.sgml | 4 +
src/backend/replication/logical/slotsync.c | 282 ++++++++++++++-------
src/backend/replication/slot.c | 12 +-
src/backend/replication/walsender.c | 2 +-
src/backend/tcop/postgres.c | 2 +-
src/include/replication/slot.h | 2 +-
6 files changed, 206 insertions(+), 98 deletions(-)
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 92a0f49e6a..e3e22645ed 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -29349,6 +29349,10 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
standby server. Temporary synced slots, if any, cannot be used for
logical decoding and must be dropped after promotion. See
<xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
+ Note that this function cannot be executed if
+ <link linkend="guc-sync-replication-slots"><varname>
+ sync_replication_slots</varname></link> is enabled and the slotsync
+ worker is already running to perform the synchronization of slots.
</para>
<caution>
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index cb39adcd0e..578cfce896 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -79,10 +79,11 @@
* and also sets stopSignaled=true to handle the race condition when the
* postmaster has not noticed the promotion yet and thus may end up restarting
* the slot sync worker. If stopSignaled is set, the worker will exit in such a
- * case. Note that we don't need to reset this variable as after promotion the
- * slot sync worker won't be restarted because the pmState changes to PM_RUN from
- * PM_HOT_STANDBY and we don't support demoting primary without restarting the
- * server. See MaybeStartSlotSyncWorker.
+ * case. The SQL function pg_sync_replication_slots() will also error out if
+ * this flag is set. Note that we don't need to reset this variable as after
+ * promotion the slot sync worker won't be restarted because the pmState
+ * changes to PM_RUN from PM_HOT_STANDBY and we don't support demoting
+ * primary without restarting the server. See MaybeStartSlotSyncWorker.
*
* The 'syncing' flag is needed to prevent concurrent slot syncs to avoid slot
* overwrites.
@@ -92,9 +93,6 @@
* is expected (e.g., slot sync GUCs change), slot sync worker will reset
* last_start_time before exiting, so that postmaster can start the worker
* without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
- *
- * All the fields except 'syncing' are used only by slotsync worker.
- * 'syncing' is used both by worker and SQL function pg_sync_replication_slots.
*/
typedef struct SlotSyncCtxStruct
{
@@ -807,20 +805,6 @@ synchronize_slots(WalReceiverConn *wrconn)
" FROM pg_catalog.pg_replication_slots"
" WHERE failover and NOT temporary";
- SpinLockAcquire(&SlotSyncCtx->mutex);
- if (SlotSyncCtx->syncing)
- {
- SpinLockRelease(&SlotSyncCtx->mutex);
- ereport(ERROR,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot synchronize replication slots concurrently"));
- }
-
- SlotSyncCtx->syncing = true;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = true;
-
/* The syscache access in walrcv_exec() needs a transaction env. */
if (!IsTransactionState())
{
@@ -937,12 +921,6 @@ synchronize_slots(WalReceiverConn *wrconn)
if (started_tx)
CommitTransactionCommand();
- SpinLockAcquire(&SlotSyncCtx->mutex);
- SlotSyncCtx->syncing = false;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = false;
-
return some_slot_updated;
}
@@ -1190,6 +1168,19 @@ ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
slotsync_reread_config();
}
+/*
+ * Connection cleanup function for slotsync worker.
+ *
+ * Called on slotsync worker exit.
+ */
+static void
+slotsync_worker_disconnect(int code, Datum arg)
+{
+ WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
+
+ walrcv_disconnect(wrconn);
+}
+
/*
* Cleanup function for slotsync worker.
*
@@ -1198,8 +1189,38 @@ ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
static void
slotsync_worker_onexit(int code, Datum arg)
{
+ /*
+ * We need to do slots cleanup here just like WalSndErrorCleanup() does.
+ *
+ * The startup process during promotion invokes ShutDownSlotSync() which
+ * waits for slot sync to finish and it does that by checking the
+ * 'syncing' flag. Thus the slot sync worker must be done with slots'
+ * release and cleanup to avoid any dangling temporary slots or active
+ * slots before it marks itself as finished syncing.
+ */
+
+ /* Make sure active replication slots are released */
+ if (MyReplicationSlot != NULL)
+ ReplicationSlotRelease();
+
+ /* Also cleanup the temporary slots. */
+ ReplicationSlotCleanup(false);
+
SpinLockAcquire(&SlotSyncCtx->mutex);
+
SlotSyncCtx->pid = InvalidPid;
+
+ /*
+ * If syncing_slots is true, it indicates that the process errored out
+ * without resetting the flag. So, we need to clean up shared memory and
+ * reset the flag here.
+ */
+ if (syncing_slots)
+ {
+ SlotSyncCtx->syncing = false;
+ syncing_slots = false;
+ }
+
SpinLockRelease(&SlotSyncCtx->mutex);
}
@@ -1242,6 +1263,64 @@ wait_for_slot_activity(bool some_slot_updated)
ResetLatch(MyLatch);
}
+/*
+ * Emit an error if a promotion or a concurrent sync call is in progress.
+ * Otherwise, advertise that a sync is in progress.
+ */
+static void
+check_and_set_sync_info(pid_t worker_pid)
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+
+ /* The worker pid must not be already assigned in SlotSyncCtx */
+ Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid);
+
+ /*
+ * Emit an error if startup process signaled the slot sync machinery to
+ * stop. See comments atop SlotSyncCtxStruct.
+ */
+ if (SlotSyncCtx->stopSignaled)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot synchronize replication slots when standby promotion is ongoing"));
+ }
+
+ if (SlotSyncCtx->syncing)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot synchronize replication slots concurrently"));
+ }
+
+ SlotSyncCtx->syncing = true;
+
+ /*
+ * Advertise the required PID so that the startup process can kill the
+ * slot sync worker on promotion.
+ */
+ SlotSyncCtx->pid = worker_pid;
+
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = true;
+}
+
+/*
+ * Reset syncing flag.
+ */
+static void
+reset_syncing_flag()
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ SlotSyncCtx->syncing = false;
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = false;
+}
+
/*
* The main loop of our worker process.
*
@@ -1278,47 +1357,6 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
Assert(SlotSyncCtx != NULL);
- SpinLockAcquire(&SlotSyncCtx->mutex);
- Assert(SlotSyncCtx->pid == InvalidPid);
-
- /*
- * Startup process signaled the slot sync worker to stop, so if meanwhile
- * postmaster ended up starting the worker again, exit.
- */
- if (SlotSyncCtx->stopSignaled)
- {
- SpinLockRelease(&SlotSyncCtx->mutex);
- proc_exit(0);
- }
-
- /* Advertise our PID so that the startup process can kill us on promotion */
- SlotSyncCtx->pid = MyProcPid;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- ereport(LOG, errmsg("slot sync worker started"));
-
- /* Register it as soon as SlotSyncCtx->pid is initialized. */
- before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
-
- /* Setup signal handling */
- pqsignal(SIGHUP, SignalHandlerForConfigReload);
- pqsignal(SIGINT, SignalHandlerForShutdownRequest);
- pqsignal(SIGTERM, die);
- pqsignal(SIGFPE, FloatExceptionHandler);
- pqsignal(SIGUSR1, procsignal_sigusr1_handler);
- pqsignal(SIGUSR2, SIG_IGN);
- pqsignal(SIGPIPE, SIG_IGN);
- pqsignal(SIGCHLD, SIG_DFL);
-
- /*
- * Establishes SIGALRM handler and initialize timeout module. It is needed
- * by InitPostgres to register different timeouts.
- */
- InitializeTimeouts();
-
- /* Load the libpq-specific functions */
- load_file("libpqwalreceiver", false);
-
/*
* If an exception is encountered, processing resumes here.
*
@@ -1350,6 +1388,32 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
/* We can now handle ereport(ERROR) */
PG_exception_stack = &local_sigjmp_buf;
+ /* Setup signal handling */
+ pqsignal(SIGHUP, SignalHandlerForConfigReload);
+ pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+ pqsignal(SIGTERM, die);
+ pqsignal(SIGFPE, FloatExceptionHandler);
+ pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+ pqsignal(SIGUSR2, SIG_IGN);
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGCHLD, SIG_DFL);
+
+ check_and_set_sync_info(MyProcPid);
+
+ ereport(LOG, errmsg("slot sync worker started"));
+
+ /* Register it as soon as SlotSyncCtx->pid is initialized. */
+ before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
+
+ /*
+ * Establishes SIGALRM handler and initialize timeout module. It is needed
+ * by InitPostgres to register different timeouts.
+ */
+ InitializeTimeouts();
+
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+
/*
* Unblock signals (they were blocked when the postmaster forked us)
*/
@@ -1402,13 +1466,13 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
errmsg("could not connect to the primary server: %s", err));
/*
- * Register the failure callback once we have the connection.
+ * Register the disconnection callback.
*
- * XXX: This can be combined with previous such cleanup registration of
+ * XXX: This can be combined with previous cleanup registration of
* slotsync_worker_onexit() but that will need the connection to be made
* global and we want to avoid introducing global for this purpose.
*/
- before_shmem_exit(slotsync_failure_callback, PointerGetDatum(wrconn));
+ before_shmem_exit(slotsync_worker_disconnect, PointerGetDatum(wrconn));
/*
* Using the specified primary server connection, check that we are not a
@@ -1457,8 +1521,8 @@ update_synced_slots_inactive_since(void)
if (!StandbyMode)
return;
- /* The slot sync worker mustn't be running by now */
- Assert(SlotSyncCtx->pid == InvalidPid);
+ /* The slot sync worker or SQL function mustn't be running by now */
+ Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing);
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
@@ -1471,6 +1535,9 @@ update_synced_slots_inactive_since(void)
{
Assert(SlotIsLogical(s));
+ /* The slot must not be acquired by any process */
+ Assert(s->active_pid == 0);
+
/* Use the same inactive_since time for all the slots. */
if (now == 0)
now = GetCurrentTimestamp();
@@ -1486,25 +1553,39 @@ update_synced_slots_inactive_since(void)
/*
* Shut down the slot sync worker.
+ *
+ * This function sends a signal to shut down the slot sync worker, if
+ * required. It also waits till the slot sync worker has exited or
+ * pg_sync_replication_slots() has finished.
*/
void
ShutDownSlotSync(void)
{
+ pid_t worker_pid;
+
SpinLockAcquire(&SlotSyncCtx->mutex);
SlotSyncCtx->stopSignaled = true;
- if (SlotSyncCtx->pid == InvalidPid)
+ /*
+ * Return if neither the slot sync worker is running nor the function
+ * pg_sync_replication_slots() is executing.
+ */
+ if (!SlotSyncCtx->syncing)
{
SpinLockRelease(&SlotSyncCtx->mutex);
update_synced_slots_inactive_since();
return;
}
+
+ worker_pid = SlotSyncCtx->pid;
+
SpinLockRelease(&SlotSyncCtx->mutex);
- kill(SlotSyncCtx->pid, SIGINT);
+ if (worker_pid != InvalidPid)
+ kill(worker_pid, SIGINT);
- /* Wait for it to die */
+ /* Wait for slot sync to end */
for (;;)
{
int rc;
@@ -1522,8 +1603,8 @@ ShutDownSlotSync(void)
SpinLockAcquire(&SlotSyncCtx->mutex);
- /* Is it gone? */
- if (SlotSyncCtx->pid == InvalidPid)
+ /* Ensure that no process is syncing the slots. */
+ if (!SlotSyncCtx->syncing)
break;
SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1601,26 +1682,37 @@ SlotSyncShmemInit(void)
}
/*
- * Error cleanup callback for slot synchronization.
+ * Error cleanup callback for slot sync SQL function.
*/
static void
slotsync_failure_callback(int code, Datum arg)
{
WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
- if (syncing_slots)
- {
- /*
- * If syncing_slots is true, it indicates that the process errored out
- * without resetting the flag. So, we need to clean up shared memory
- * and reset the flag here.
- */
- SpinLockAcquire(&SlotSyncCtx->mutex);
- SlotSyncCtx->syncing = false;
- SpinLockRelease(&SlotSyncCtx->mutex);
+ /*
+ * We need to do slots cleanup here just like WalSndErrorCleanup() does.
+ *
+ * The startup process during promotion invokes ShutDownSlotSync() which
+ * waits for slot sync to finish and it does that by checking the
+ * 'syncing' flag. Thus the SQL function must be done with slots' release
+ * and cleanup to avoid any dangling temporary slots or active slots
+ * before it marks itself as finished syncing.
+ */
- syncing_slots = false;
- }
+ /* Make sure active replication slots are released */
+ if (MyReplicationSlot != NULL)
+ ReplicationSlotRelease();
+
+ /* Also cleanup the synced temporary slots. */
+ ReplicationSlotCleanup(true);
+
+ /*
+ * If syncing_slots is still set, the process errored out without
+ * resetting the flag. So, we need to clean up shared memory and reset the
+ * flag here.
+ */
+ if (syncing_slots)
+ reset_syncing_flag();
walrcv_disconnect(wrconn);
}
@@ -1634,9 +1726,17 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
{
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
+ check_and_set_sync_info(InvalidPid);
+
validate_remote_info(wrconn);
synchronize_slots(wrconn);
+
+ /* Cleanup the synced temporary slots */
+ ReplicationSlotCleanup(true);
+
+ /* We are done with sync, so reset sync flag */
+ reset_syncing_flag();
}
PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
}
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index cebf44bb0f..aa4ea387da 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -237,7 +237,7 @@ ReplicationSlotShmemExit(int code, Datum arg)
ReplicationSlotRelease();
/* Also cleanup all the temporary slots. */
- ReplicationSlotCleanup();
+ ReplicationSlotCleanup(false);
}
/*
@@ -736,10 +736,13 @@ ReplicationSlotRelease(void)
}
/*
- * Cleanup all temporary slots created in current session.
+ * Cleanup temporary slots created in current session.
+ *
+ * Cleanup only synced temporary slots if 'synced_only' is true, else
+ * cleanup all temporary slots.
*/
void
-ReplicationSlotCleanup(void)
+ReplicationSlotCleanup(bool synced_only)
{
int i;
@@ -755,7 +758,8 @@ restart:
continue;
SpinLockAcquire(&s->mutex);
- if (s->active_pid == MyProcPid)
+ if ((s->active_pid == MyProcPid &&
+ (!synced_only || s->data.synced)))
{
Assert(s->data.persistency == RS_TEMPORARY);
SpinLockRelease(&s->mutex);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9bf7c67f37..c623b07cf0 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -336,7 +336,7 @@ WalSndErrorCleanup(void)
if (MyReplicationSlot != NULL)
ReplicationSlotRelease();
- ReplicationSlotCleanup();
+ ReplicationSlotCleanup(false);
replication_active = false;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 76f48b13d2..2dff28afce 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -4410,7 +4410,7 @@ PostgresMain(const char *dbname, const char *username)
ReplicationSlotRelease();
/* We also want to cleanup temporary slots on error. */
- ReplicationSlotCleanup();
+ ReplicationSlotCleanup(false);
jit_reset_after_error();
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 7b937d1a0c..1bc80960ef 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -247,7 +247,7 @@ extern void ReplicationSlotAlter(const char *name, bool failover);
extern void ReplicationSlotAcquire(const char *name, bool nowait);
extern void ReplicationSlotRelease(void);
-extern void ReplicationSlotCleanup(void);
+extern void ReplicationSlotCleanup(bool synced_only);
extern void ReplicationSlotSave(void);
extern void ReplicationSlotMarkDirty(void);
--
2.34.1
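To summarize the promotion-time handshake the attached patch sets up,
ShutDownSlotSync() now reduces to roughly the following. This is a
simplified sketch of the hunks above, not the exact code: the latch-based
wait is replaced by a plain pg_usleep() for brevity.

void
ShutDownSlotSync(void)
{
    pid_t       worker_pid;

    SpinLockAcquire(&SlotSyncCtx->mutex);
    SlotSyncCtx->stopSignaled = true;

    /* Nothing to wait for if neither the worker nor the SQL function is syncing */
    if (!SlotSyncCtx->syncing)
    {
        SpinLockRelease(&SlotSyncCtx->mutex);
        update_synced_slots_inactive_since();
        return;
    }

    /* Capture the pid under the spinlock to avoid the kill(-1) race */
    worker_pid = SlotSyncCtx->pid;
    SpinLockRelease(&SlotSyncCtx->mutex);

    /* The SQL function advertises InvalidPid, so only signal a real worker */
    if (worker_pid != InvalidPid)
        kill(worker_pid, SIGINT);

    /* Wait until whoever was syncing has released and cleaned up its slots */
    for (;;)
    {
        bool        syncing;

        SpinLockAcquire(&SlotSyncCtx->mutex);
        syncing = SlotSyncCtx->syncing;
        SpinLockRelease(&SlotSyncCtx->mutex);

        if (!syncing)
            break;

        pg_usleep(10000L);  /* stand-in for the latch wait in the real code */
    }

    update_synced_slots_inactive_since();
}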
On Tue, Apr 23, 2024 at 12:37 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Mon, Apr 22, 2024 at 7:04 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Mon, Apr 22, 2024 at 9:02 PM shveta malik <shveta.malik@gmail.com> wrote:
Thanks for the patch, the changes look good Amit. Please find the merged patch.
I've reviewed the patch and have some comments:
---
 /*
- * Early initialization.
+ * Register slotsync_worker_onexit() before we register
+ * ReplicationSlotShmemExit() in BaseInit(), to ensure that during the
+ * exit of the slot sync worker, ReplicationSlotShmemExit() is called
+ * first, followed by slotsync_worker_onexit(). The startup process during
+ * promotion invokes ShutDownSlotSync() which waits for slot sync to
+ * finish and it does that by checking the 'syncing' flag. Thus worker
+ * must be done with the slots' release and cleanup before it marks itself
+ * as finished syncing.
  */

I'm slightly worried that we register the slotsync_worker_onexit()
callback before BaseInit(), because it could be a blocker when we want
to add more work in the callback, for example sending the stats.

The other possibility is that we do slot release/clean up in the
slotsync_worker_onexit() call itself and then we can do it after
BaseInit().
This approach sounds clearer and safer to me. The current approach
relies on the callback registration order of
ReplicationSlotShmemExit(). If it changes in the future, we will
silently have the same problem. All slot-sync-related work should be
done before we allow anyone to touch the synced slots by clearing the
'syncing' flag.
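For reference, the exit-time ordering in the v11 patch above condenses to
the following (simplified): all slot release/cleanup happens first, and
the 'syncing' flag is cleared only as the very last step, which is what
ShutDownSlotSync() waits on.

static void
slotsync_worker_onexit(int code, Datum arg)
{
    /* Release any slot we still hold, just like WalSndErrorCleanup() */
    if (MyReplicationSlot != NULL)
        ReplicationSlotRelease();

    /* Drop the temporary slots owned by this process */
    ReplicationSlotCleanup(false);

    /* Only now tell ShutDownSlotSync() that syncing is finished */
    SpinLockAcquire(&SlotSyncCtx->mutex);
    SlotSyncCtx->pid = InvalidPid;
    if (syncing_slots)
    {
        SlotSyncCtx->syncing = false;
        syncing_slots = false;
    }
    SpinLockRelease(&SlotSyncCtx->mutex);
}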
---
 synchronize_slots(wrconn);
+
+ /* Cleanup the temporary slots */
+ ReplicationSlotCleanup();
+
+ /* We are done with sync, so reset sync flag */
+ reset_syncing_flag();

I think it ends up removing other temp slots that are created by the
same backend process using the
pg_create_{physical,logical}_replication_slot() functions, which could
be a large side effect of this function for users.

True. I think here we should either remove only the temporary slots that
are marked as synced, or create slots as RS_EPHEMERAL initially when
called from the SQL function; but the latter doesn't sound like a neat
approach.

Also, if users want to have a process periodically calling
pg_sync_replication_slots() instead of the slotsync worker, it doesn't
support a case where we create a temp not-ready slot and turn it into a
persistent slot if it's ready for sync.

True, but eventually the API should be able to create the persistent
slots directly. Anyway, this can happen only the first time (till the
slots are created and marked persistent), and anyone who wants to use
this function periodically should still see regular syncs afterwards.
I agree that we should remove the temp-and-synced slots created via the
API at the end of the API call. We end up creating and dropping slots in
every API call, but since pg_sync_replication_slots() is a kind of
debug-purpose function and it will not be common to call it regularly
instead of using the slot sync worker, we can live with such overhead.
Regards,
--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
On Wed, Apr 24, 2024 at 10:28 AM shveta malik <shveta.malik@gmail.com> wrote:
Modified the logic to remove only synced temporary slots during
SQL-function exit. Please find v11 with above changes.
LGTM, so pushed!
--
With Regards,
Amit Kapila.