Allow logical failover slots to wait on synchronous replication
Hi hackers,
Building on bf279ddd1c, this patch introduces a GUC
'standby_slot_names_from_syncrep' which allows logical failover slots
to wait for changes to have been synchronously replicated before sending
the decoded changes to logical subscribers.
The existing 'standby_slot_names' isn't great for users who are running
clusters with quorum-based synchronous replicas. For instance, if
the user has synchronous_standby_names = 'ANY 3 (A,B,C,D,E)' it's a
bit tedious to have to reconfigure the standby_slot_names to set it to
the most updated 3 sync replicas whenever different sync replicas start
lagging. In the event that both GUCs are set, 'standby_slot_names' takes
precedence.
I did some very brief pgbench runs to compare the latency. Client instance
was running pgbench and 10 logical clients while the Postgres box hosted
the writer and 5 synchronous replicas.
There's a hit to TPS, which I'm thinking is due to more contention on the
SyncRepLock, and that scales with the number of logical walsenders. I'm
guessing we can avoid this if we introduce another set of
lsn[NUM_SYNC_REP_WAIT_MODE] and have the logical walsenders check
and wait on that instead but I wasn't sure if that's the right approach.
pgbench numbers:
// Empty standby_slot_names_from_syncrep
query mode: simple
number of clients: 8
number of threads: 8
maximum number of tries: 1
duration: 1800 s
number of transactions actually processed: 1720173
number of failed transactions: 0 (0.000%)
latency average = 8.371 ms
initial connection time = 7.963 ms
tps = 955.651025 (without initial connection time)
// standby_slot_names_from_syncrep = 'true'
scaling factor: 200
query mode: simple
number of clients: 8
number of threads: 8
maximum number of tries: 1
duration: 1800 s
number of transactions actually processed: 1630105
number of failed transactions: 0 (0.000%)
latency average = 8.834 ms
initial connection time = 7.670 ms
tps = 905.610230 (without initial connection time)
Thanks,
--
John Hsu - Amazon Web Services
Attachments:
0001-Introduce-a-new-guc-standby_slot_names_from_syncrep.patch (application/octet-stream)
From c19dd30a46379f8030ea41357c74bc4734e17083 Mon Sep 17 00:00:00 2001
From: John Hsu <johnhyvr@gmail.com>
Date: Tue, 4 Jun 2024 20:27:47 +0000
Subject: [PATCH] Introduce a new guc 'standby_slot_names_from_syncrep'
If synchronous replication is enabled, this patch
allows logical subscribers to wait for changes to
be replicated to synchronous replicas before
consuming the changes.
In the event that both 'standby_slot_names' and
'standby_slot_names_from_syncrep' are set, the
former takes precedence.
---
src/backend/replication/slot.c | 261 ++++++++++--------
src/backend/replication/syncrep.c | 11 +-
src/backend/utils/misc/guc_tables.c | 14 +
src/include/replication/slot.h | 1 +
src/include/replication/syncrep.h | 2 +
.../t/040_standby_failover_slots_sync.pl | 137 +++++++++
6 files changed, 313 insertions(+), 113 deletions(-)
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 0e54ea5bb9..b5b69a0d6f 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -49,6 +49,7 @@
#include "postmaster/interrupt.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
+#include "replication/syncrep.h"
#include "replication/walsender_private.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@@ -146,6 +147,7 @@ int max_replication_slots = 10; /* the maximum number of replication
* logical WAL sender processes will wait for.
*/
char *standby_slot_names;
+bool standby_slot_names_from_syncrep;
/* This is the parsed and cached configuration for standby_slot_names */
static StandbySlotNamesConfigData *standby_slot_names_config;
@@ -2572,8 +2574,8 @@ SlotExistsInStandbySlotNames(const char *slot_name)
}
/*
- * Return true if the slots specified in standby_slot_names have caught up to
- * the given WAL location, false otherwise.
+ * Return true if the slots specified in standby_slot_names or synchronous_commit settings,
+ * have caught up to the given WAL location, false otherwise.
*
* The elevel parameter specifies the error level used for logging messages
* related to slots that do not exist, are invalidated, or are inactive.
@@ -2587,9 +2589,9 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
/*
* Don't need to wait for the standbys to catch up if there is no value in
- * standby_slot_names.
+ * standby_slot_names and not waiting for synchronous replication.
*/
- if (standby_slot_names_config == NULL)
+ if (standby_slot_names_config == NULL && !standby_slot_names_from_syncrep)
return true;
/*
@@ -2600,136 +2602,172 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
return true;
/*
- * Don't need to wait for the standbys to catch up if they are already
- * beyond the specified WAL location.
- */
- if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
- ss_oldest_flush_lsn >= wait_for_lsn)
- return true;
-
- /*
- * To prevent concurrent slot dropping and creation while filtering the
- * slots, take the ReplicationSlotControlLock outside of the loop.
+ * In the event that both standby_slot_names_config and standby_slot_names_from_syncrep is enabled,
+ * have the former take precedence.
*/
- LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-
- name = standby_slot_names_config->slot_names;
- for (int i = 0; i < standby_slot_names_config->nslotnames; i++)
+ if (standby_slot_names_config != NULL)
{
- XLogRecPtr restart_lsn;
- bool invalidated;
- bool inactive;
- ReplicationSlot *slot;
+ /*
+ * Don't need to wait for the standbys to catch up if they are already
+ * beyond the specified WAL location.
+ */
+ if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
+ ss_oldest_flush_lsn >= wait_for_lsn)
+ return true;
- slot = SearchNamedReplicationSlot(name, false);
+ /*
+ * To prevent concurrent slot dropping and creation while filtering the
+ * slots, take the ReplicationSlotControlLock outside of the loop.
+ */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- if (!slot)
+ name = standby_slot_names_config->slot_names;
+ for (int i = 0; i < standby_slot_names_config->nslotnames; i++)
{
- /*
- * If a slot name provided in standby_slot_names does not exist,
- * report a message and exit the loop. A user can specify a slot
- * name that does not exist just before the server startup. The
- * GUC check_hook(validate_standby_slots) cannot validate such a
- * slot during startup as the ReplicationSlotCtl shared memory is
- * not initialized at that time. It is also possible for a user to
- * drop the slot in standby_slot_names afterwards.
- */
- ereport(elevel,
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("replication slot \"%s\" specified in parameter %s does not exist",
- name, "standby_slot_names"),
- errdetail("Logical replication is waiting on the standby associated with \"%s\".",
- name),
- errhint("Consider creating the slot \"%s\" or amend parameter %s.",
- name, "standby_slot_names"));
- break;
- }
+ XLogRecPtr restart_lsn;
+ bool invalidated;
+ bool inactive;
+ ReplicationSlot *slot;
- if (SlotIsLogical(slot))
- {
- /*
- * If a logical slot name is provided in standby_slot_names,
- * report a message and exit the loop. Similar to the non-existent
- * case, a user can specify a logical slot name in
- * standby_slot_names before the server startup, or drop an
- * existing physical slot and recreate a logical slot with the
- * same name.
- */
- ereport(elevel,
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("cannot have logical replication slot \"%s\" in parameter %s",
- name, "standby_slot_names"),
- errdetail("Logical replication is waiting for correction on \"%s\".",
- name),
- errhint("Consider removing logical slot \"%s\" from parameter %s.",
- name, "standby_slot_names"));
- break;
- }
+ slot = SearchNamedReplicationSlot(name, false);
- SpinLockAcquire(&slot->mutex);
- restart_lsn = slot->data.restart_lsn;
- invalidated = slot->data.invalidated != RS_INVAL_NONE;
- inactive = slot->active_pid == 0;
- SpinLockRelease(&slot->mutex);
+ if (!slot)
+ {
+ /*
+ * If a slot name provided in standby_slot_names does not exist,
+ * report a message and exit the loop. A user can specify a slot
+ * name that does not exist just before the server startup. The
+ * GUC check_hook(validate_standby_slots) cannot validate such a
+ * slot during startup as the ReplicationSlotCtl shared memory is
+ * not initialized at that time. It is also possible for a user to
+ * drop the slot in standby_slot_names afterwards.
+ */
+ ereport(elevel,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("replication slot \"%s\" specified in parameter %s does not exist",
+ name, "standby_slot_names"),
+ errdetail("Logical replication is waiting on the standby associated with \"%s\".",
+ name),
+ errhint("Consider creating the slot \"%s\" or amend parameter %s.",
+ name, "standby_slot_names"));
+ break;
+ }
- if (invalidated)
- {
- /* Specified physical slot has been invalidated */
- ereport(elevel,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("physical slot \"%s\" specified in parameter %s has been invalidated",
- name, "standby_slot_names"),
- errdetail("Logical replication is waiting on the standby associated with \"%s\".",
- name),
- errhint("Consider dropping and recreating the slot \"%s\" or amend parameter %s.",
- name, "standby_slot_names"));
- break;
- }
+ if (SlotIsLogical(slot))
+ {
+ /*
+ * If a logical slot name is provided in standby_slot_names,
+ * report a message and exit the loop. Similar to the non-existent
+ * case, a user can specify a logical slot name in
+ * standby_slot_names before the server startup, or drop an
+ * existing physical slot and recreate a logical slot with the
+ * same name.
+ */
+ ereport(elevel,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot have logical replication slot \"%s\" in parameter %s",
+ name, "standby_slot_names"),
+ errdetail("Logical replication is waiting for correction on \"%s\".",
+ name),
+ errhint("Consider removing logical slot \"%s\" from parameter %s.",
+ name, "standby_slot_names"));
+ break;
+ }
- if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
- {
- /* Log a message if no active_pid for this physical slot */
- if (inactive)
+ SpinLockAcquire(&slot->mutex);
+ restart_lsn = slot->data.restart_lsn;
+ invalidated = slot->data.invalidated != RS_INVAL_NONE;
+ inactive = slot->active_pid == 0;
+ SpinLockRelease(&slot->mutex);
+
+ if (invalidated)
+ {
+ /* Specified physical slot has been invalidated */
ereport(elevel,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid",
- name, "standby_slot_names"),
+ errmsg("physical slot \"%s\" specified in parameter %s has been invalidated",
+ name, "standby_slot_names"),
errdetail("Logical replication is waiting on the standby associated with \"%s\".",
- name),
- errhint("Consider starting standby associated with \"%s\" or amend parameter %s.",
+ name),
+ errhint("Consider dropping and recreating the slot \"%s\" or amend parameter %s.",
name, "standby_slot_names"));
+ break;
+ }
- /* Continue if the current slot hasn't caught up. */
- break;
+ if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
+ {
+ /* Log a message if no active_pid for this physical slot */
+ if (inactive)
+ ereport(elevel,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid",
+ name, "standby_slot_names"),
+ errdetail("Logical replication is waiting on the standby associated with \"%s\".",
+ name),
+ errhint("Consider starting standby associated with \"%s\" or amend parameter %s.",
+ name, "standby_slot_names"));
+
+ /* Continue if the current slot hasn't caught up. */
+ break;
+ }
+
+ Assert(restart_lsn >= wait_for_lsn);
+
+ if (XLogRecPtrIsInvalid(min_restart_lsn) ||
+ min_restart_lsn > restart_lsn)
+ min_restart_lsn = restart_lsn;
+
+ caught_up_slot_num++;
+
+ name += strlen(name) + 1;
}
- Assert(restart_lsn >= wait_for_lsn);
+ LWLockRelease(ReplicationSlotControlLock);
- if (XLogRecPtrIsInvalid(min_restart_lsn) ||
- min_restart_lsn > restart_lsn)
- min_restart_lsn = restart_lsn;
+ /*
+ * Return false if not all the standbys have caught up to the specified
+ * WAL location.
+ */
+ if (caught_up_slot_num != standby_slot_names_config->nslotnames)
+ return false;
+
+ /* The ss_oldest_flush_lsn must not retreat. */
+ Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
+ min_restart_lsn >= ss_oldest_flush_lsn);
- caught_up_slot_num++;
+ ss_oldest_flush_lsn = min_restart_lsn;
- name += strlen(name) + 1;
+ return true;
}
+ else
+ {
+ volatile WalSndCtlData *walsndctl = WalSndCtl;
+ static XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE] = {InvalidXLogRecPtr};
+ int mode = SyncRepWaitMode;
+ int i;
- LWLockRelease(ReplicationSlotControlLock);
+ /*
+ * Don't wait if synchronous_commit is not configured properly
+ */
+ if (mode == SYNC_REP_NO_WAIT)
+ return true;
- /*
- * Return false if not all the standbys have caught up to the specified
- * WAL location.
- */
- if (caught_up_slot_num != standby_slot_names_config->nslotnames)
- return false;
+ if (!XLogRecPtrIsInvalid(lsn[mode]) && lsn[mode] >= wait_for_lsn)
+ return true;
- /* The ss_oldest_flush_lsn must not retreat. */
- Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
- min_restart_lsn >= ss_oldest_flush_lsn);
+ LWLockAcquire(SyncRepLock, LW_SHARED);
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ {
+ lsn[i] = walsndctl->lsn[i];
+ }
- ss_oldest_flush_lsn = min_restart_lsn;
+ LWLockRelease(SyncRepLock);
- return true;
+ if (!XLogRecPtrIsInvalid(lsn[mode]) && lsn[mode] >= wait_for_lsn)
+ return true;
+
+ return false;
+ }
}
/*
@@ -2744,13 +2782,12 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
/*
* Don't need to wait for the standby to catch up if the current acquired
* slot is not a logical failover slot, or there is no value in
- * standby_slot_names.
+ * standby_slot_names and standby_slot_names_from_syncrep.
*/
- if (!MyReplicationSlot->data.failover || !standby_slot_names_config)
+ if (!MyReplicationSlot->data.failover || !(standby_slot_names_config || standby_slot_names_from_syncrep))
return;
ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
-
for (;;)
{
CHECK_FOR_INTERRUPTS();
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index fa5988c824..0b85f3c3b8 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -78,6 +78,7 @@
#include "common/int.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "replication/slot.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
@@ -95,7 +96,7 @@ char *SyncRepStandbyNames;
static bool announce_next_takeover = true;
SyncRepConfigData *SyncRepConfig = NULL;
-static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
+int SyncRepWaitMode = SYNC_REP_NO_WAIT;
static void SyncRepQueueInsert(int mode);
static void SyncRepCancelWait(void);
@@ -523,6 +524,14 @@ SyncRepReleaseWaiters(void)
LWLockRelease(SyncRepLock);
+ /*
+ * Only wake if standby_slot_names_from_syncrep is the only value set
+ */
+ if (standby_slot_names_from_syncrep && strcmp(standby_slot_names, "") == 0)
+ {
+ ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv);
+ }
+
elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
numwrite, LSN_FORMAT_ARGS(writePtr),
numflush, LSN_FORMAT_ARGS(flushPtr),
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 46c258be28..eb4ec4800c 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2025,6 +2025,20 @@ struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},
+ {
+ {"standby_slot_names_from_syncrep", PGC_SIGHUP, REPLICATION_PRIMARY,
+ gettext_noop("Logical WAL sender processes will wait on the current "
+ "synchronous replication settings."),
+ gettext_noop("Logical WAL sender processes will send decoded "
+ "changes to plugins only after the changes have "
+ "been synchronously replicated."),
+ GUC_NOT_IN_SAMPLE
+ },
+ &standby_slot_names_from_syncrep,
+ false,
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 1bc80960ef..77052cb6a5 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -230,6 +230,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
/* GUCs */
extern PGDLLIMPORT int max_replication_slots;
extern PGDLLIMPORT char *standby_slot_names;
+extern PGDLLIMPORT bool standby_slot_names_from_syncrep;
/* shmem initialization functions */
extern Size ReplicationSlotsShmemSize(void);
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index ea439e6da6..fb783576b5 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -96,6 +96,8 @@ extern int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys);
/* called by checkpointer */
extern void SyncRepUpdateSyncStandbysDefined(void);
+extern int SyncRepWaitMode;
+
/*
* Internal functions for parsing synchronous_standby_names grammar,
* in syncrep_gram.y and syncrep_scanner.l
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index 3b6dddba71..94f3a5b54c 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -710,6 +710,143 @@ $result = $subscriber1->safe_psql('postgres',
is($result, 't',
"subscriber1 gets data from primary after standby1 acknowledges changes");
+##################################################
+# Test that logical failover replication slots wait for the specified
+# synchronous replicas to receive the changes first. It uses the
+# following set up:
+#
+# (synchronous physical standbys)
+# | ----> standby1 (application_name = standby1)
+# | ----> standby2 (application_name = standby2)
+# primary ----- |
+# (logical replication)
+# | ----> subscriber1 (failover = true, slot_name = lsub1_slot)
+# | ----> subscriber2 (failover = false, slot_name = lsub2_slot)
+#
+# synchronous_commit = 'on'
+# synchronous_standby_names = 'ANY 2 (standby1, standby2)'
+# standby_slot_names_from_syncrep = 'true'
+#
+# The setup is configured in such a way that the logical slot of subscriber1 is
+# enabled for failover, and thus the subscriber1 will wait for the changes to have
+# been synchronously replicated before receiving the decoded changes.
+##################################################
+
+$primary->safe_psql('postgres', "TRUNCATE tab_int;");
+# Setup synchronous replication
+$primary->append_conf(
+ 'postgresql.conf', qq(
+ synchronous_commit = 'on'
+ synchronous_standby_names = 'ANY 2 (standby1, standby2)'
+ standby_slot_names_from_syncrep = 'true'
+));
+
+$primary->reload;
+
+$standby1->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1 dbname=postgres application_name=standby1'");
+$standby1->reload;
+
+$standby2->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1 dbname=postgres application_name=standby2'");
+$standby2->reload;
+
+# Check that synchronous replication is setup properly
+$standby2->stop;
+
+# Create some data on the primary
+$primary_row_count = 10;
+
+my $sync_back_q = $primary->background_psql(
+ 'postgres',
+ on_error_stop => 0,
+ timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+$sync_back_q->query_until(qr/insert_blocked_on_sync_rep/, q(
+ \echo insert_blocked_on_sync_rep
+ INSERT INTO tab_int SELECT generate_series(1, 10);
+));
+
+$result = $primary->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+
+is($result, 'f', "primary row count is not updated due to synchronous replication");
+
+# Verify the standby specified in standby_slot_names (sb1_slot aka standby1)
+# catches up with the primary.
+$primary->wait_for_replay_catchup($standby1);
+$result = $standby1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "standby1 gets data from primary");
+
+# Validate that standby_slot_names overrides standby_slot_names_from_syncrep
+# since the slot specified (sb1_slot) has received the changes, primary can send
+# the decoded changes to the subscription enabled for failover
+# (i.e. regress_mysub1).
+$primary->wait_for_catchup('regress_mysub1');
+$result = $subscriber1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+ "subscriber1 gets data from primary after standby1 acknowledges changes, overriding standby_slot_names_from_syncrep");
+
+$standby2->start;
+$primary->wait_for_replay_catchup($standby2);
+
+# Unset standby_slot_names to test standby_slot_names_from_syncrep
+# blocks primary from sending logical decoded changes to failover slots until
+# changes have been synchronously replicated.
+$primary->append_conf(
+ 'postgresql.conf', qq(
+ standby_slot_names = ''));
+$primary->reload;
+
+$primary_row_count = 20;
+$standby2->stop;
+
+$sync_back_q->query_until(
+ qr/insert_blocked_on_additional_sync_rep/, q(
+ \echo insert_blocked_on_additional_sync_rep
+ INSERT INTO tab_int SELECT generate_series(11, 20);
+));
+
+# Since $standby2 has not received the changes, validate that subscriber1 (failover = true)
+# has not received the decoded changes, but subscriber2 (failover = false) has.
+$primary->wait_for_catchup('regress_mysub2');
+
+$result = $subscriber2->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+ "subscriber2 gets data from primary even if the changes have not been synchronously acknowledged.");
+
+$result = $subscriber1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 'f',
+ "subscriber1 does not get data from primary since changes have not been synchronously acknowledged.");
+
+# Start standby2 to allow the changes to be acknowledged by all the synchronous standbys.
+$standby2->start;
+$primary->wait_for_replay_catchup($standby2);
+$primary->wait_for_catchup('regress_mysub1');
+
+# Now that the changes have been replicated to all synchronous nodes,
+# primary can send the decoded changes to the subscription enabled for failover
+# (i.e. regress_mysub1). While the standby was down, regress_mysub1 didn't
+# receive any data from the primary. i.e. the primary didn't allow it to go
+# ahead of requirements of synchronous commit.
+$result = $subscriber1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+ "subscriber1 gets data from primary since changes have been synchronously acknowledged.");
+# No longer need to run the background session.
+$sync_back_q->quit;
+
+# Reset standby_slot_names and synchronous commit for below test cases
+$primary->append_conf(
+ 'postgresql.conf', qq(
+standby_slot_names = 'sb1_slot'
+synchronous_standby_names = ''
+standby_slot_names_from_syncrep = 'false'
+));
+$primary->reload;
+
##################################################
# Verify that when using pg_logical_slot_get_changes to consume changes from a
# logical failover slot, it will also wait for the slots specified in
--
2.40.1
On Mon, Jun 10, 2024 at 03:51:05PM -0700, John H wrote:
The existing 'standby_slot_names' isn't great for users who are running
clusters with quorum-based synchronous replicas. For instance, if
the user has synchronous_standby_names = 'ANY 3 (A,B,C,D,E)' it's a
bit tedious to have to reconfigure the standby_slot_names to set it to
the most updated 3 sync replicas whenever different sync replicas start
lagging. In the event that both GUCs are set, 'standby_slot_names' takes
precedence.
Hm. IIUC you'd essentially need to set standby_slot_names to "A,B,C,D,E"
to get the desired behavior today. That might ordinarily be okay, but it
could cause logical replication to be held back unnecessarily if one of the
replicas falls behind for whatever reason. A way to tie standby_slot_names
to synchronous replication instead does seem like it would be useful in
this case.
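To make the difference concrete, here is a minimal sketch (not code from the patch) that contrasts the two waiting rules. Listing all five standbys means waiting for the slowest one, while a quorum such as 'ANY 3' only needs the third-furthest standby. The type `Lsn`, both function names, and the sample positions are assumptions for illustration.

```c
#include <stddef.h>
#include <stdint.h>

typedef uint64_t Lsn;   /* hypothetical stand-in for a WAL position */

/* Position that every listed standby has reached: the minimum.
 * Assumes n >= 1. */
static Lsn
lsn_reached_by_all(const Lsn *flush, size_t n)
{
    Lsn min = flush[0];

    for (size_t i = 1; i < n; i++)
        if (flush[i] < min)
            min = flush[i];
    return min;
}

/* Position that at least k standbys have reached: the k-th highest value.
 * Assumes 1 <= k <= n. */
static Lsn
lsn_reached_by_any_k(const Lsn *flush, size_t n, size_t k)
{
    Lsn best = 0;

    for (size_t i = 0; i < n; i++)
    {
        size_t at_or_beyond = 0;

        /* Count how many standbys are at or beyond this candidate. */
        for (size_t j = 0; j < n; j++)
            if (flush[j] >= flush[i])
                at_or_beyond++;

        /* Keep the largest candidate that k standbys have reached. */
        if (at_or_beyond >= k && flush[i] > best)
            best = flush[i];
    }
    return best;
}
```

With made-up positions {500, 480, 470, 100, 90}, the "all five" rule releases changes only up to 90, while the quorum rule with k = 3 releases them up to 470.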
--
nathan
Hi,
On Mon, Jun 10, 2024 at 09:25:10PM -0500, Nathan Bossart wrote:
On Mon, Jun 10, 2024 at 03:51:05PM -0700, John H wrote:
The existing 'standby_slot_names' isn't great for users who are running
clusters with quorum-based synchronous replicas. For instance, if
the user has synchronous_standby_names = 'ANY 3 (A,B,C,D,E)' it's a
bit tedious to have to reconfigure the standby_slot_names to set it to
the most updated 3 sync replicas whenever different sync replicas start
lagging. In the event that both GUCs are set, 'standby_slot_names' takes
precedence.
Hm. IIUC you'd essentially need to set standby_slot_names to "A,B,C,D,E"
to get the desired behavior today. That might ordinarily be okay, but it
could cause logical replication to be held back unnecessarily if one of the
replicas falls behind for whatever reason. A way to tie standby_slot_names
to synchronous replication instead does seem like it would be useful in
this case.
FWIW, I have the same understanding and also think your proposal would be
useful in this case.
Regards,
--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
Hi,
On Tue, Jun 11, 2024 at 10:00:46AM +0000, Bertrand Drouvot wrote:
Hi,
On Mon, Jun 10, 2024 at 09:25:10PM -0500, Nathan Bossart wrote:
On Mon, Jun 10, 2024 at 03:51:05PM -0700, John H wrote:
The existing 'standby_slot_names' isn't great for users who are running
clusters with quorum-based synchronous replicas. For instance, if
the user has synchronous_standby_names = 'ANY 3 (A,B,C,D,E)' it's a
bit tedious to have to reconfigure the standby_slot_names to set it to
the most updated 3 sync replicas whenever different sync replicas start
lagging. In the event that both GUCs are set, 'standby_slot_names' takes
precedence.
Hm. IIUC you'd essentially need to set standby_slot_names to "A,B,C,D,E"
to get the desired behavior today. That might ordinarily be okay, but it
could cause logical replication to be held back unnecessarily if one of the
replicas falls behind for whatever reason. A way to tie standby_slot_names
to synchronous replication instead does seem like it would be useful in
this case.
FWIW, I have the same understanding and also think your proposal would be
useful in this case.
A few random comments about v1:
1 ====
+ int mode = SyncRepWaitMode;
It's set to SyncRepWaitMode and then never changes. Is it worth getting rid of "mode"?
2 ====
+ static XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE] = {InvalidXLogRecPtr};
I did some testing and saw that the lsn[] values were not always set to
InvalidXLogRecPtr right after the declaration. It looks like, in that case, we
should avoid setting the lsn[] values at compile time. What about one of these?
1. remove the "static".
or
2. keep the static but set the lsn[] values after its declaration.
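The observation follows from how C treats static locals: the initializer is applied once, so later calls see whatever the previous call stored. A minimal sketch, assuming hypothetical names (`NUM_MODES`, `INVALID_LSN`, and both functions are illustrative, not from the patch):

```c
#include <stdint.h>

#define NUM_MODES 3
typedef uint64_t Lsn;              /* hypothetical stand-in for XLogRecPtr */
#define INVALID_LSN ((Lsn) 0)

/* Static array: initialized once, then keeps values across calls.
 * Returns what slot `mode` held on entry, then overwrites it. */
static Lsn
static_version(int mode, Lsn new_value)
{
    static Lsn lsn[NUM_MODES] = {INVALID_LSN};
    Lsn seen = lsn[mode];

    lsn[mode] = new_value;
    return seen;
}

/* Automatic array: re-initialized on every call, so the value seen on
 * entry is always INVALID_LSN. */
static Lsn
automatic_version(int mode, Lsn new_value)
{
    Lsn lsn[NUM_MODES] = {INVALID_LSN};
    Lsn seen = lsn[mode];

    lsn[mode] = new_value;
    (void) lsn[mode];              /* stored value is discarded on return */
    return seen;
}
```

Calling `static_version(1, 42)` and then `static_version(1, 43)` returns 0 and then 42, whereas `automatic_version` returns 0 both times.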
3 ====
- /*
- * Return false if not all the standbys have caught up to the specified
- * WAL location.
- */
- if (caught_up_slot_num != standby_slot_names_config->nslotnames)
- return false;
+ if (!XLogRecPtrIsInvalid(lsn[mode]) && lsn[mode] >= wait_for_lsn)
+ return true;
The lsn[] values have (or should have, see 2 above) just been initialized to
InvalidXLogRecPtr, so XLogRecPtrIsInvalid(lsn[mode]) will always return
true. I think this check is not needed then.
4 ====
I did some very brief pgbench runs to compare the latency. Client instance
was running pgbench and 10 logical clients while the Postgres box hosted
the writer and 5 synchronous replicas.
There's a hit to TPS
Out of curiosity, did you compare with standby_slot_names_from_syncrep set to off
and standby_slot_names not empty?
Regards,
--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
On Tue, Jun 11, 2024 at 4:21 AM John H <johnhyvr@gmail.com> wrote:
Building on bf279ddd1c, this patch introduces a GUC
'standby_slot_names_from_syncrep' which allows logical failover slots
to wait for changes to have been synchronously replicated before sending
the decoded changes to logical subscribers.
The existing 'standby_slot_names' isn't great for users who are running
clusters with quorum-based synchronous replicas. For instance, if
the user has synchronous_standby_names = 'ANY 3 (A,B,C,D,E)' it's a
bit tedious to have to reconfigure the standby_slot_names to set it to
the most updated 3 sync replicas whenever different sync replicas start
lagging. In the event that both GUCs are set, 'standby_slot_names' takes
precedence.
I did some very brief pgbench runs to compare the latency. Client instance
was running pgbench and 10 logical clients while the Postgres box hosted
the writer and 5 synchronous replicas.
There's a hit to TPS, which I'm thinking is due to more contention on the
SyncRepLock, and that scales with the number of logical walsenders. I'm
guessing we can avoid this if we introduce another set of
lsn[NUM_SYNC_REP_WAIT_MODE] and have the logical walsenders check
and wait on that instead but I wasn't sure if that's the right approach.
pgbench numbers:
// Empty standby_slot_names_from_syncrep
query mode: simple
..
latency average = 8.371 ms
initial connection time = 7.963 ms
tps = 955.651025 (without initial connection time)
// standby_slot_names_from_syncrep = 'true'
scaling factor: 200
...
latency average = 8.834 ms
initial connection time = 7.670 ms
tps = 905.610230 (without initial connection time)
The reading indicates when you set 'standby_slot_names_from_syncrep',
the TPS reduces as compared to when it is not set. It would be better
to see the data comparing 'standby_slot_names_from_syncrep' and the
existing parameter 'standby_slot_names'.
I see the value in your idea but was wondering if we can do something
without introducing a new GUC for this. Can we make it a default
behavior that logical slots marked with a failover option will wait
for 'synchronous_standby_names' as per your patch's idea unless
'standby_slot_names' is specified? I don't know if there is any value
in setting the 'failover' option for a slot without specifying
'standby_slot_names', so was wondering if we can additionally tie it
to 'synchronous_standby_names'. Any better ideas?
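The suggested default can be read as a small decision rule. The sketch below is only one interpretation of that suggestion, with hypothetical names (`WaitSource`, `is_set`, `choose_wait_source`); it is not code from the patch or from PostgreSQL.

```c
#include <stdbool.h>
#include <stddef.h>

/* What a logical walsender would wait on before sending changes. */
typedef enum
{
    WAIT_NONE,            /* send decoded changes immediately */
    WAIT_STANDBY_SLOTS,   /* wait on the slots in standby_slot_names */
    WAIT_SYNC_REP         /* wait on synchronous_standby_names */
} WaitSource;

/* A setting counts as "set" when it is a non-empty string. */
static bool
is_set(const char *guc)
{
    return guc != NULL && guc[0] != '\0';
}

static WaitSource
choose_wait_source(bool failover_slot,
                   const char *standby_slot_names,
                   const char *synchronous_standby_names)
{
    /* Slots without the failover option never wait. */
    if (!failover_slot)
        return WAIT_NONE;

    /* An explicit standby_slot_names takes precedence. */
    if (is_set(standby_slot_names))
        return WAIT_STANDBY_SLOTS;

    /* Otherwise fall back to the synchronous replication settings. */
    if (is_set(synchronous_standby_names))
        return WAIT_SYNC_REP;

    return WAIT_NONE;
}
```

Under this reading no new GUC is needed: a failover slot with an empty standby_slot_names simply follows synchronous_standby_names when that is configured.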
--
With Regards,
Amit Kapila.
Hi,
Thanks Bertrand for taking a look at the patch.
On Mon, Jun 17, 2024 at 8:19 AM Bertrand Drouvot
<bertranddrouvot.pg@gmail.com> wrote:
+ int mode = SyncRepWaitMode;
It's set to SyncRepWaitMode and then never changes. Is it worth getting rid of "mode"?
I took a deeper look at this with GDB and I think it's necessary to
cache the value of "mode".
We first check:
if (mode == SYNC_REP_NO_WAIT)
return true;
However, after this check it's possible to receive a SIGHUP changing
SyncRepWaitMode to SYNC_REP_NO_WAIT (e.g. synchronous_commit = 'on' -> 'off'),
which would lead to indexing lsn[-1].
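The pattern being defended is "read the shared setting once, then use only the local copy". A minimal sketch under stated assumptions: `NO_WAIT` is -1 as the lsn[-1] remark implies, and `wait_mode`, `synced_lsn`, and `synced_up_to` are hypothetical stand-ins rather than the patch's identifiers.

```c
#include <stdbool.h>
#include <stdint.h>

#define NO_WAIT   (-1)
#define NUM_MODES 3
typedef uint64_t Lsn;                       /* 0 plays the role of "invalid" */

static volatile int wait_mode = NO_WAIT;    /* may change on a config reload */
static Lsn synced_lsn[NUM_MODES];

static bool
synced_up_to(Lsn wait_for_lsn)
{
    /* Snapshot the setting once; every later use refers to this copy. */
    int mode = wait_mode;

    if (mode == NO_WAIT)
        return true;

    /*
     * Indexing with the snapshot is safe even if wait_mode has changed
     * since the check above. Re-reading wait_mode here could yield -1.
     */
    return synced_lsn[mode] != 0 && synced_lsn[mode] >= wait_for_lsn;
}
```

The check and the array index are guaranteed to agree because both use `mode`, not the global.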
2 ====
+ static XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE] = {InvalidXLogRecPtr};
I did some testing and saw that the lsn[] values were not always set to
InvalidXLogRecPtr right after the declaration. It looks like, in that case, we
should avoid setting the lsn[] values at compile time. What about one of these?
1. remove the "static".
or
2. keep the static but set the lsn[] values after its declaration.
I'd prefer to keep the static because it reduces unnecessary
contention on SyncRepLock if a logical client has fallen behind.
I'll add a change with your second suggestion.
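The contention argument can be sketched as a fast path over a per-process cached value: a caller that is behind the cached position returns without touching the shared state. This is an illustration only; the lock is simulated by a counter, and `shared_synced_lsn`, `read_shared_lsn_locked`, and `synced_up_to` are hypothetical names.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;               /* 0 plays the role of "invalid" */

static Lsn shared_synced_lsn = 0;   /* stand-in for the lock-protected value */
static int lock_acquisitions = 0;   /* how often the slow path was taken */

/* Simulated locked read. A real implementation would acquire and release
 * a lock around this read. */
static Lsn
read_shared_lsn_locked(void)
{
    lock_acquisitions++;
    return shared_synced_lsn;
}

static bool
synced_up_to(Lsn wait_for_lsn)
{
    static Lsn cached_lsn = 0;      /* survives across calls */

    /* Fast path: the cached position already covers the request. */
    if (cached_lsn != 0 && cached_lsn >= wait_for_lsn)
        return true;

    /* Slow path: refresh the cache from shared state under the lock. */
    cached_lsn = read_shared_lsn_locked();
    return cached_lsn != 0 && cached_lsn >= wait_for_lsn;
}
```

A client that has fallen behind keeps asking about positions below the cached one, so it stays on the fast path until it catches up.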
3 ====
- /*
- * Return false if not all the standbys have caught up to the specified
- * WAL location.
- */
- if (caught_up_slot_num != standby_slot_names_config->nslotnames)
- return false;
+ if (!XLogRecPtrIsInvalid(lsn[mode]) && lsn[mode] >= wait_for_lsn)
+ return true;
The lsn[] values have (or should have, see 2 above) just been initialized to
InvalidXLogRecPtr, so XLogRecPtrIsInvalid(lsn[mode]) will always return
true. I think this check is not needed then.
Removed.
Out of curiosity, did you compare with standby_slot_names_from_syncrep set to off
and standby_slot_names not empty?
I didn't think 'standby_slot_names' would impact TPS as much since
it's not grabbing the SyncRepLock but here's a quick test.
Writer with 5 synchronous replicas, 10 pg_recvlogical clients and
pgbench all running from the same server.
Command: pgbench -c 4 -j 4 -T 600 -U "ec2-user" -d postgres -r -P 5
Result with: standby_slot_names =
'replica_1,replica_2,replica_3,replica_4,replica_5'
latency average = 5.600 ms
latency stddev = 2.854 ms
initial connection time = 5.503 ms
tps = 714.148263 (without initial connection time)
Result with: standby_slot_names_from_syncrep = 'true',
synchronous_standby_names = 'ANY 3 (A,B,C,D,E)'
latency average = 5.740 ms
latency stddev = 2.543 ms
initial connection time = 4.093 ms
tps = 696.776249 (without initial connection time)
Result with nothing set:
latency average = 5.090 ms
latency stddev = 3.467 ms
initial connection time = 4.989 ms
tps = 785.665963 (without initial connection time)
Again I think it's possible to improve the synchronous numbers if we
cache but I'll try that out in a bit.
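
For reference, the relative cost of each configuration can be worked out from the three TPS figures above. This is plain arithmetic on the reported numbers, not a new measurement; tps_drop_pct and print_summary are helper names made up for this sketch.

```c
#include <assert.h>
#include <stdio.h>

/* Percentage drop in throughput relative to a baseline run. */
static double
tps_drop_pct(double baseline_tps, double tps)
{
    assert(baseline_tps > 0.0);
    return (baseline_tps - tps) / baseline_tps * 100.0;
}

/* Print the drop for the two configurations reported above. */
static void
print_summary(void)
{
    const double baseline = 785.665963;  /* nothing set */
    const double slots = 714.148263;     /* standby_slot_names, 5 slots */
    const double syncrep = 696.776249;   /* standby_slot_names_from_syncrep */

    printf("standby_slot_names:              %.1f%% below baseline\n",
           tps_drop_pct(baseline, slots));
    printf("standby_slot_names_from_syncrep: %.1f%% below baseline\n",
           tps_drop_pct(baseline, syncrep));
}
```

That puts the slot-based wait at roughly 9% below the unset baseline and the syncrep-based wait at roughly 11%, so the gap between the two approaches is about two percentage points in this run.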
--
John Hsu - Amazon Web Services
Hi Amit,
Thanks for taking a look.
On Tue, Jun 18, 2024 at 10:34 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
The reading indicates when you set 'standby_slot_names_from_syncrep',
the TPS reduces as compared to when it is not set. It would be better
to see the data comparing 'standby_slot_names_from_syncrep' and the
existing parameter 'standby_slot_names'.
I added new benchmark numbers in the reply to Bertrand, but I'll
include in this thread for posterity.
Writer with 5 synchronous replicas, 10 pg_recvlogical clients and
pgbench all running from the same server.
Command: pgbench -c 4 -j 4 -T 600 -U "ec2-user" -d postgres -r -P 5
Result with: standby_slot_names =
'replica_1,replica_2,replica_3,replica_4,replica_5'
latency average = 5.600 ms
latency stddev = 2.854 ms
initial connection time = 5.503 ms
tps = 714.148263 (without initial connection time)
Result with: standby_slot_names_from_syncrep = 'true',
synchronous_standby_names = 'ANY 3 (A,B,C,D,E)'
latency average = 5.740 ms
latency stddev = 2.543 ms
initial connection time = 4.093 ms
tps = 696.776249 (without initial connection time)
Result with nothing set:
latency average = 5.090 ms
latency stddev = 3.467 ms
initial connection time = 4.989 ms
tps = 785.665963 (without initial connection time)
Can we make it a default
behavior that logical slots marked with a failover option will wait
for 'synchronous_standby_names' as per your patch's idea unless
'standby_slot_names' is specified? I don't know if there is any value
in setting the 'failover' option for a slot without specifying
'standby_slot_names', so was wondering if we can additionally tie it
to 'synchronous_standby_names'. Any better ideas?
No, I think that works pretty cleanly actually. Reserving some special
keyword isn't great
which is the only other thing I can think of. I've updated the patch
and tests to reflect that.
Attached the patch that addresses these changes.
--
John Hsu - Amazon Web Services
Attachments:
0002-Wait-on-synchronous-replication-by-default-for-logic.patch (application/octet-stream)
From 805e5b7e35913f53b9caac996aaf0b62e937f274 Mon Sep 17 00:00:00 2001
From: John Hsu <johnhyvr@gmail.com>
Date: Tue, 4 Jun 2024 20:27:47 +0000
Subject: [PATCH] Wait on synchronous replication by default for logical
failover slots
If synchronous replication is enabled, this patch
allows logical subscribers to wait for changes to
be replicated to synchronous replicas before
consuming the changes.
In the event that 'synchronized_standby_slots' is set, it
will take precedence.
---
src/backend/replication/slot.c | 271 ++++++++++--------
src/backend/replication/syncrep.c | 27 +-
src/include/replication/syncrep.h | 3 +
.../t/040_standby_failover_slots_sync.pl | 135 +++++++++
4 files changed, 321 insertions(+), 115 deletions(-)
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index baf9b89dc4..fafb6c7873 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -49,6 +49,7 @@
#include "postmaster/interrupt.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
+#include "replication/syncrep.h"
#include "replication/walsender_private.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@@ -2570,8 +2571,8 @@ SlotExistsInSyncStandbySlots(const char *slot_name)
}
/*
- * Return true if the slots specified in synchronized_standby_slots have caught up to
- * the given WAL location, false otherwise.
+ * Return true if the slots specified in synchronized_standby_slots or synchronous
+ * replication have caught up to the given WAL location, false otherwise.
*
* The elevel parameter specifies the error level used for logging messages
* related to slots that do not exist, are invalidated, or are inactive.
@@ -2585,9 +2586,9 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
/*
* Don't need to wait for the standbys to catch up if there is no value in
- * synchronized_standby_slots.
+ * synchronized_standby_slots or synchronous replication is not configured.
*/
- if (synchronized_standby_slots_config == NULL)
+ if (synchronized_standby_slots_config == NULL && !SyncRepConfigured())
return true;
/*
@@ -2598,144 +2599,190 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
return true;
/*
- * Don't need to wait for the standbys to catch up if they are already
- * beyond the specified WAL location.
- */
- if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
- ss_oldest_flush_lsn >= wait_for_lsn)
- return true;
-
- /*
- * To prevent concurrent slot dropping and creation while filtering the
- * slots, take the ReplicationSlotControlLock outside of the loop.
+ * In the event that synchronized_standby_slots and synchronous replication is
+ * configured, have the former take precedence.
*/
- LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-
- name = synchronized_standby_slots_config->slot_names;
- for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
+ if (synchronized_standby_slots_config != NULL)
{
- XLogRecPtr restart_lsn;
- bool invalidated;
- bool inactive;
- ReplicationSlot *slot;
+ /*
+ * Don't need to wait for the standbys to catch up if they are already
+ * beyond the specified WAL location.
+ */
+ if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
+ ss_oldest_flush_lsn >= wait_for_lsn)
+ return true;
- slot = SearchNamedReplicationSlot(name, false);
+ /*
+ * To prevent concurrent slot dropping and creation while filtering the
+ * slots, take the ReplicationSlotControlLock outside of the loop.
+ */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- if (!slot)
+ name = synchronized_standby_slots_config->slot_names;
+ for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
{
- /*
- * If a slot name provided in synchronized_standby_slots does not
- * exist, report a message and exit the loop. A user can specify a
- * slot name that does not exist just before the server startup.
- * The GUC check_hook(validate_sync_standby_slots) cannot validate
- * such a slot during startup as the ReplicationSlotCtl shared
- * memory is not initialized at that time. It is also possible for
- * a user to drop the slot in synchronized_standby_slots
- * afterwards.
- */
- ereport(elevel,
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("replication slot \"%s\" specified in parameter %s does not exist",
- name, "synchronized_standby_slots"),
- errdetail("Logical replication is waiting on the standby associated with \"%s\".",
- name),
- errhint("Consider creating the slot \"%s\" or amend parameter %s.",
- name, "synchronized_standby_slots"));
- break;
- }
+ XLogRecPtr restart_lsn;
+ bool invalidated;
+ bool inactive;
+ ReplicationSlot *slot;
- if (SlotIsLogical(slot))
- {
- /*
- * If a logical slot name is provided in
- * synchronized_standby_slots, report a message and exit the loop.
- * Similar to the non-existent case, a user can specify a logical
- * slot name in synchronized_standby_slots before the server
- * startup, or drop an existing physical slot and recreate a
- * logical slot with the same name.
- */
- ereport(elevel,
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("cannot have logical replication slot \"%s\" in parameter %s",
- name, "synchronized_standby_slots"),
- errdetail("Logical replication is waiting for correction on \"%s\".",
- name),
- errhint("Consider removing logical slot \"%s\" from parameter %s.",
- name, "synchronized_standby_slots"));
- break;
- }
+ slot = SearchNamedReplicationSlot(name, false);
- SpinLockAcquire(&slot->mutex);
- restart_lsn = slot->data.restart_lsn;
- invalidated = slot->data.invalidated != RS_INVAL_NONE;
- inactive = slot->active_pid == 0;
- SpinLockRelease(&slot->mutex);
+ if (!slot)
+ {
+ /*
+ * If a slot name provided in synchronized_standby_slots does not
+ * exist, report a message and exit the loop. A user can specify a
+ * slot name that does not exist just before the server startup.
+ * The GUC check_hook(validate_sync_standby_slots) cannot validate
+ * such a slot during startup as the ReplicationSlotCtl shared
+ * memory is not initialized at that time. It is also possible for
+ * a user to drop the slot in synchronized_standby_slots
+ * afterwards.
+ */
+ ereport(elevel,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("replication slot \"%s\" specified in parameter %s does not exist",
+ name, "synchronized_standby_slots"),
+ errdetail("Logical replication is waiting on the standby associated with \"%s\".",
+ name),
+ errhint("Consider creating the slot \"%s\" or amend parameter %s.",
+ name, "synchronized_standby_slots"));
+ break;
+ }
- if (invalidated)
- {
- /* Specified physical slot has been invalidated */
- ereport(elevel,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("physical slot \"%s\" specified in parameter %s has been invalidated",
- name, "synchronized_standby_slots"),
- errdetail("Logical replication is waiting on the standby associated with \"%s\".",
- name),
- errhint("Consider dropping and recreating the slot \"%s\" or amend parameter %s.",
- name, "synchronized_standby_slots"));
- break;
- }
+ if (SlotIsLogical(slot))
+ {
+ /*
+ * If a logical slot name is provided in
+ * synchronized_standby_slots, report a message and exit the loop.
+ * Similar to the non-existent case, a user can specify a logical
+ * slot name in synchronized_standby_slots before the server
+ * startup, or drop an existing physical slot and recreate a
+ * logical slot with the same name.
+ */
+ ereport(elevel,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot have logical replication slot \"%s\" in parameter %s",
+ name, "synchronized_standby_slots"),
+ errdetail("Logical replication is waiting for correction on \"%s\".",
+ name),
+ errhint("Consider removing logical slot \"%s\" from parameter %s.",
+ name, "synchronized_standby_slots"));
+ break;
+ }
- if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
- {
- /* Log a message if no active_pid for this physical slot */
- if (inactive)
+ SpinLockAcquire(&slot->mutex);
+ restart_lsn = slot->data.restart_lsn;
+ invalidated = slot->data.invalidated != RS_INVAL_NONE;
+ inactive = slot->active_pid == 0;
+ SpinLockRelease(&slot->mutex);
+
+ if (invalidated)
+ {
+ /* Specified physical slot has been invalidated */
ereport(elevel,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid",
- name, "synchronized_standby_slots"),
+ errmsg("physical slot \"%s\" specified in parameter %s has been invalidated",
+ name, "synchronized_standby_slots"),
errdetail("Logical replication is waiting on the standby associated with \"%s\".",
- name),
- errhint("Consider starting standby associated with \"%s\" or amend parameter %s.",
+ name),
+ errhint("Consider dropping and recreating the slot \"%s\" or amend parameter %s.",
name, "synchronized_standby_slots"));
+ break;
+ }
- /* Continue if the current slot hasn't caught up. */
- break;
+ if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
+ {
+ /* Log a message if no active_pid for this physical slot */
+ if (inactive)
+ ereport(elevel,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid",
+ name, "synchronized_standby_slots"),
+ errdetail("Logical replication is waiting on the standby associated with \"%s\".",
+ name),
+ errhint("Consider starting standby associated with \"%s\" or amend parameter %s.",
+ name, "synchronized_standby_slots"));
+
+ /* Continue if the current slot hasn't caught up. */
+ break;
+ }
+
+ Assert(restart_lsn >= wait_for_lsn);
+
+ if (XLogRecPtrIsInvalid(min_restart_lsn) ||
+ min_restart_lsn > restart_lsn)
+ min_restart_lsn = restart_lsn;
+
+ caught_up_slot_num++;
+
+ name += strlen(name) + 1;
}
- Assert(restart_lsn >= wait_for_lsn);
+ LWLockRelease(ReplicationSlotControlLock);
+
+ /*
+ * Return false if not all the standbys have caught up to the specified
+ * WAL location.
+ */
+ if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
+ return false;
- if (XLogRecPtrIsInvalid(min_restart_lsn) ||
- min_restart_lsn > restart_lsn)
- min_restart_lsn = restart_lsn;
+ /* The ss_oldest_flush_lsn must not retreat. */
+ Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
+ min_restart_lsn >= ss_oldest_flush_lsn);
- caught_up_slot_num++;
+ ss_oldest_flush_lsn = min_restart_lsn;
- name += strlen(name) + 1;
+ return true;
}
+ else
+ {
+ volatile WalSndCtlData *walsndctl = WalSndCtl;
+ static XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE];
+ static bool initialized = false;
+ /* Initialize value in case SIGHUP changing to SYNC_REP_NO_WAIT */
+ int mode = SyncRepWaitMode;
+ int i;
+
+ if (!initialized)
+ {
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ {
+ lsn[i] = InvalidXLogRecPtr;
+ }
+ }
- LWLockRelease(ReplicationSlotControlLock);
+ if (mode == SYNC_REP_NO_WAIT)
+ return true;
- /*
- * Return false if not all the standbys have caught up to the specified
- * WAL location.
- */
- if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
- return false;
+ if (lsn[mode] >= wait_for_lsn)
+ return true;
+
+ LWLockAcquire(SyncRepLock, LW_SHARED);
- /* The ss_oldest_flush_lsn must not retreat. */
- Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
- min_restart_lsn >= ss_oldest_flush_lsn);
+ /* Cache values to reduce contention on lock */
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ {
+ lsn[i] = walsndctl->lsn[i];
+ }
- ss_oldest_flush_lsn = min_restart_lsn;
+ LWLockRelease(SyncRepLock);
- return true;
+ if (lsn[mode] >= wait_for_lsn)
+ return true;
+
+ return false;
+ }
}
/*
* Wait for physical standbys to confirm receiving the given lsn.
*
* Used by logical decoding SQL functions. It waits for physical standbys
- * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
+ * corresponding to the physical slots specified in the synchronized_standby_slots GUC,
+ * or synchronous replication.
*/
void
WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
@@ -2745,7 +2792,7 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
* slot is not a logical failover slot, or there is no value in
* synchronized_standby_slots.
*/
- if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
+ if (!MyReplicationSlot->data.failover || !(synchronized_standby_slots_config || SyncRepConfigured()))
return;
ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index fa5988c824..69c061d8dc 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -78,6 +78,7 @@
#include "common/int.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "replication/slot.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
@@ -95,7 +96,7 @@ char *SyncRepStandbyNames;
static bool announce_next_takeover = true;
SyncRepConfigData *SyncRepConfig = NULL;
-static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
+int SyncRepWaitMode = SYNC_REP_NO_WAIT;
static void SyncRepQueueInsert(int mode);
static void SyncRepCancelWait(void);
@@ -124,6 +125,8 @@ static int cmp_lsn(const void *a, const void *b);
static bool SyncRepQueueIsOrderedByLSN(int mode);
#endif
+bool SyncRepConfigured(void);
+
/*
* ===========================================================
* Synchronous Replication functions for normal user backends
@@ -169,8 +172,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
* described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
* it's false, the lock is not necessary because we don't touch the queue.
*/
- if (!SyncRepRequested() ||
- !((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
+ if (!SyncRepConfigured())
return;
/* Cap the level for anything other than commit to remote flush only. */
@@ -523,6 +525,15 @@ SyncRepReleaseWaiters(void)
LWLockRelease(SyncRepLock);
+ /*
+ * If synchronized_standby_slots is set, the respective walsender's
+ * will be responsible for broadcasting the value.
+ */
+ if (strcmp(synchronized_standby_slots, "") == 0)
+ {
+ ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv);
+ }
+
elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
numwrite, LSN_FORMAT_ARGS(writePtr),
numflush, LSN_FORMAT_ARGS(flushPtr),
@@ -1069,3 +1080,13 @@ assign_synchronous_commit(int newval, void *extra)
break;
}
}
+
+bool
+SyncRepConfigured()
+{
+ if (!SyncRepRequested() ||
+ !((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
+ return false;
+
+ return true;
+}
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index ea439e6da6..ec22bc72df 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -82,6 +82,7 @@ extern PGDLLIMPORT char *SyncRepStandbyNames;
/* called by user backend */
extern void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit);
+extern bool SyncRepConfigured();
/* called at backend exit */
extern void SyncRepCleanupAtProcExit(void);
@@ -96,6 +97,8 @@ extern int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys);
/* called by checkpointer */
extern void SyncRepUpdateSyncStandbysDefined(void);
+extern int SyncRepWaitMode;
+
/*
* Internal functions for parsing synchronous_standby_names grammar,
* in syncrep_gram.y and syncrep_scanner.l
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index 2c51cfc3c8..00130d92dc 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -710,6 +710,141 @@ $result = $subscriber1->safe_psql('postgres',
is($result, 't',
"subscriber1 gets data from primary after standby1 acknowledges changes");
+##################################################
+# Test that logical failover replication slots wait for the specified
+# synchronous replicas to receive the changes first. It uses the
+# following set up:
+#
+# (synchronous physical standbys)
+# | ----> standby1 (application_name = standby1)
+# | ----> standby2 (application_name = standby2)
+# primary ----- |
+# (logical replication)
+# | ----> subscriber1 (failover = true, slot_name = lsub1_slot)
+# | ----> subscriber2 (failover = false, slot_name = lsub2_slot)
+#
+# synchronous_commit = 'on'
+# synchronous_standby_names = 'ANY 2 (standby1, standby2)'
+#
+# The setup is configured in such a way that the logical slot of subscriber1 is
+# enabled for failover, and thus the subscriber1 will wait for the changes to have
+# been synchronously replicated before receiving the decoded changes.
+##################################################
+
+$primary->safe_psql('postgres', "TRUNCATE tab_int;");
+# Setup synchronous replication
+$primary->append_conf(
+ 'postgresql.conf', qq(
+ synchronous_commit = 'on'
+ synchronous_standby_names = 'ANY 2 (standby1, standby2)'
+));
+
+$primary->reload;
+
+$standby1->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1 dbname=postgres application_name=standby1'");
+$standby1->reload;
+
+$standby2->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1 dbname=postgres application_name=standby2'");
+$standby2->reload;
+
+# Check that synchronous replication is setup properly
+$standby2->stop;
+
+# Create some data on the primary
+$primary_row_count = 10;
+
+my $sync_back_q = $primary->background_psql(
+ 'postgres',
+ on_error_stop => 0,
+ timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+$sync_back_q->query_until(qr/insert_blocked_on_sync_rep/, q(
+ \echo insert_blocked_on_sync_rep
+ INSERT INTO tab_int SELECT generate_series(1, 10);
+));
+
+$result = $primary->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+
+is($result, 'f', "primary row count is not updated due to synchronous replication");
+
+# Verify the standby specified in synchronized_standby_slots (sb1_slot aka standby1)
+# catches up with the primary.
+$primary->wait_for_replay_catchup($standby1);
+$result = $standby1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "standby1 gets data from primary");
+
+# Validate that synchronized_standby_slots takes precedence over waiting for
+# changes to have been synchronous replicated.
+# Since the slot specified (sb1_slot) has received the changes, primary can send
+# the decoded changes to the subscription enabled for failover
+# (i.e. regress_mysub1).
+$primary->wait_for_catchup('regress_mysub1');
+$result = $subscriber1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+ "subscriber1 gets data from primary after standby1 acknowledges changes");
+
+$standby2->start;
+$primary->wait_for_replay_catchup($standby2);
+
+# Unset synchronized_standby_slots to test synchronous replication
+# blocks primary from sending logical decoded changes to failover slots until
+# changes have been synchronously replicated.
+$primary->append_conf(
+ 'postgresql.conf', qq(
+ synchronized_standby_slots = ''));
+$primary->reload;
+
+$primary_row_count = 20;
+$standby2->stop;
+
+$sync_back_q->query_until(
+ qr/insert_blocked_on_additional_sync_rep/, q(
+ \echo insert_blocked_on_additional_sync_rep
+ INSERT INTO tab_int SELECT generate_series(11, 20);
+));
+
+# Since $standby2 has not received the changes, validate that subscriber1 (failover = true)
+# has not received the decoded changes, but subscriber2 (failover = false) has.
+$primary->wait_for_catchup('regress_mysub2');
+
+$result = $subscriber2->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+ "subscriber2 gets data from primary even if the changes have not been synchronously acknowledged.");
+
+$result = $subscriber1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 'f',
+ "subscriber1 does not get data from primary since changes have not been synchronously acknowledged.");
+
+# Start standby2 to allow the changes to be acknowledged by all the synchronous standbys.
+$standby2->start;
+$primary->wait_for_replay_catchup($standby2);
+$primary->wait_for_catchup('regress_mysub1');
+
+# Now that the changes have been replicated to all synchronous nodes,
+# primary can send the decoded changes to the subscription enabled for failover
+# (i.e. regress_mysub1). While the standby was down, regress_mysub1 didn't
+# receive any data from the primary. i.e. the primary didn't allow it to go
+# ahead of requirements of synchronous commit.
+$result = $subscriber1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+ "subscriber1 gets data from primary since changes have been syhcronously acknowledged.");
+# No longer need to run the background session.
+$sync_back_q->quit;
+
+# Reset synchronized_standby_slots and synchronous commit for below test cases
+$primary->append_conf(
+ 'postgresql.conf', qq(
+synchronized_standby_slots = 'sb1_slot'
+synchronous_standby_names = ''
+));
+$primary->reload;
+
##################################################
# Verify that when using pg_logical_slot_get_changes to consume changes from a
# logical failover slot, it will also wait for the slots specified in
--
2.40.1
Hi,
On Mon, Jul 08, 2024 at 12:08:58PM -0700, John H wrote:
I took a deeper look at this with GDB and I think it's necessary to
cache the value of "mode".
We first check:
if (mode == SYNC_REP_NO_WAIT)
return true;
However after this check it's possible to receive a SIGHUP changing
SyncRepWaitMode
to SYNC_REP_NO_WAIT (e.g. synchronous_commit = 'on' -> 'off'), leading
to lsn[-1].
What about adding "SyncRepWaitMode" as a third StandbySlotsHaveCaughtup()
parameter then? (so that the function will use whatever value was passed during
the call).
2 ====
+ static XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE] = {InvalidXLogRecPtr};
I did some testing and saw that the lsn[] values were not always set to
InvalidXLogRecPtr right after. It looks like that, in that case, we should
avoid setting the lsn[] values at compile time. Then, what about:
1. remove the "static".
or
2. keep the static but set the lsn[] values after its declaration.
I'd prefer to keep the static because it reduces unnecessary
contention on SyncRepLock if logical client has fallen behind.
I'll add a change with your second suggestion.
Got it, you want lsn[] to be initialized only one time and that each call to
StandbySlotsHaveCaughtup() relies on the values that were previously stored in
lsn[] and then return if lsn[mode] >= wait_for_lsn.
Then I think that:
1 ===
That's worth additional comments in the code.
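
As an illustration of the behaviour described above, here is a commented standalone sketch of the cached fast path. shared_lsn stands in for the walsender shared-memory values, lock_acquisitions replaces the real SyncRepLock so the effect of the cache can be observed, and mode is passed as a parameter. None of these names come from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define NUM_SYNC_REP_WAIT_MODE 3

typedef uint64_t XLogRecPtr;

/* Stand-in for the shared lsn[] array that a lock would protect. */
static XLogRecPtr shared_lsn[NUM_SYNC_REP_WAIT_MODE];

/* Counts simulated lock acquisitions, to show what the cache saves. */
static int lock_acquisitions = 0;

static bool
sync_rep_caught_up(int mode, XLogRecPtr wait_for_lsn)
{
    /*
     * Kept across calls on purpose: a caller that is behind the synced
     * position can be answered without touching shared memory. Static
     * storage starts out as 0, which plays the role of an invalid LSN.
     */
    static XLogRecPtr cached_lsn[NUM_SYNC_REP_WAIT_MODE];

    assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);

    /* Fast path: the last snapshot already covers the requested LSN. */
    if (cached_lsn[mode] >= wait_for_lsn)
        return true;

    /* Slow path: refresh every mode under the (simulated) lock. */
    lock_acquisitions++;
    for (int i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
        cached_lsn[i] = shared_lsn[i];

    return cached_lsn[mode] >= wait_for_lsn;
}
```

A subscriber that has fallen behind keeps hitting the fast path until it catches up with the last snapshot, which is where the reduced lock traffic would come from.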
2 ===
+ if (!initialized)
+ {
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ {
+ lsn[i] = InvalidXLogRecPtr;
+ }
+ }
Looks like setting initialized to true is missing once done.
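
A standalone sketch of the one-time initialization with the flag actually set, under the assumption that the intent is to reset the array only on the first call. get_lsn_cache and init_runs are hypothetical names used to make the behaviour observable outside the server.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define NUM_SYNC_REP_WAIT_MODE 3

typedef uint64_t XLogRecPtr;

#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Counts how often the initialization branch runs. */
static int init_runs = 0;

/* Returns the per-process cache, initializing it on first use only. */
static XLogRecPtr *
get_lsn_cache(void)
{
    static XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE];
    static bool initialized = false;

    if (!initialized)
    {
        for (int i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
            lsn[i] = InvalidXLogRecPtr;

        /* Without this line every call would wipe the cached values. */
        initialized = true;
        init_runs++;
    }

    return lsn;
}
```

If the flag is never set, the static array is reset on each call and the fast path can never hit, so the lock is taken every time.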
Also,
3 ===
+ /* Cache values to reduce contention on lock */
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ {
+ lsn[i] = walsndctl->lsn[i];
+ }
NUM_SYNC_REP_WAIT_MODE is small but as the goal is to keep the lock time as
short as possible I wonder if it wouldn't be better to use memcpy() here instead
of this for loop.
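
Something along these lines, as a standalone sketch rather than a drop-in change: shared_lsn stands in for the shared-memory array and the lock calls are only indicated by comments. In the patch the source is reached through a volatile pointer, so a cast would likely be needed there.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define NUM_SYNC_REP_WAIT_MODE 3

typedef uint64_t XLogRecPtr;

/* Stand-in for the shared array that the lock protects. */
static XLogRecPtr shared_lsn[NUM_SYNC_REP_WAIT_MODE] = {10, 20, 30};

/* Copy all per-mode LSNs into dst in a single call. */
static void
snapshot_lsns(XLogRecPtr *dst)
{
    assert(dst != NULL);

    /* The lock would be acquired here. */
    memcpy(dst, shared_lsn, sizeof(shared_lsn));
    /* The lock would be released here. */
}
```

Whether this is measurably faster than a three-iteration loop is not something this sketch shows; a compiler may well generate the same code for both.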
Regards,
--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
On Tue, Jul 9, 2024 at 12:42 AM John H <johnhyvr@gmail.com> wrote:
Can we make it a default
behavior that logical slots marked with a failover option will wait
for 'synchronous_standby_names' as per your patch's idea unless
'standby_slot_names' is specified? I don't know if there is any value
in setting the 'failover' option for a slot without specifying
'standby_slot_names', so was wondering if we can additionally tie it
to 'synchronous_standby_names'. Any better ideas?
No, I think that works pretty cleanly actually. Reserving some special
keyword isn't great
which is the only other thing I can think of. I've updated the patch
and tests to reflect that.
Attached the patch that addresses these changes.
Thank You for the patch. I like the overall idea, it is a useful one
and will make user experience better. Please find few comments.
1)
I agree with the idea that instead of introducing new GUC, we can make
failover logical slots wait on 'synchronous_standby_names', but this
will leave user no option to unlink failover-enabled logical
subscribers from the wait on synchronous standbys. We provide user a
way to switch off automatic slot-sync by disabling
'sync_replication_slots' and we also provide user a way to manually
sync the slots using function 'pg_sync_replication_slots()' and
nowhere we make it mandatory to set 'synchronized_standby_slots'; in
fact in docs, it is a recommended setting and not a mandatory one.
User can always create failover slots, switch off automatic slot sync
and disable wait on standbys by not specifying
'synchronized_standby_slots' and do the slot-sync and consistency
checks manually when needed. I feel, for worst case scenarios, we
should provide user an option to delink failover-enabled logical
subscriptions from 'synchronous_standby_names'.
We can have 'synchronized_standby_slots' (standby_slot_names) to
accept one such option as in 'SAME_AS_SYNCREP_STANDBYS' or say
'DEFAULT'. So when 'synchronous_standby_names' is comma separated
list, we pick those slots; if it is empty, then no wait on standbys,
and if its value is 'DEFAULT' as configured by user, then go with
'synchronous_standby_names'. Thoughts?
2)
When 'synchronized_standby_slots' is configured but standby named in
it is down blocking logical replication, then we get a WARNING in
subscriber's log file:
WARNING: replication slot "standby_2" specified in parameter
synchronized_standby_slots does not have active_pid.
DETAIL: Logical replication is waiting on the standby associated with
"standby_2".
HINT: Consider starting standby associated with "standby_2" or amend
parameter synchronized_standby_slots.
But OTOH, when 'synchronous_standby_names' is configured instead of
'synchronized_standby_slots' and any of the standbys listed is down
blocking logical replication, we do not get any sort of warning. It is
inconsistent behavior. Also user might be left clueless on why
subscribers are not getting changes.
thanks
Shveta
On Thu, Jul 18, 2024 at 9:25 AM shveta malik <shveta.malik@gmail.com> wrote:
On Tue, Jul 9, 2024 at 12:42 AM John H <johnhyvr@gmail.com> wrote:
Can we make it a default
behavior that logical slots marked with a failover option will wait
for 'synchronous_standby_names' as per your patch's idea unless
'standby_slot_names' is specified? I don't know if there is any value
in setting the 'failover' option for a slot without specifying
'standby_slot_names', so was wondering if we can additionally tie it
to 'synchronous_standby_names'. Any better ideas?
No, I think that works pretty cleanly actually. Reserving some special
keyword isn't great
which is the only other thing I can think of. I've updated the patch
and tests to reflect that.
Attached the patch that addresses these changes.
Thank You for the patch. I like the overall idea, it is a useful one
and will make user experience better. Please find few comments.
1)
I agree with the idea that instead of introducing new GUC, we can make
failover logical slots wait on 'synchronous_standby_names', but this
will leave user no option to unlink failover-enabled logical
subscribers from the wait on synchronous standbys. We provide user a
way to switch off automatic slot-sync by disabling
'sync_replication_slots' and we also provide user a way to manually
sync the slots using function 'pg_sync_replication_slots()' and
nowhere we make it mandatory to set 'synchronized_standby_slots'; in
fact in docs, it is a recommended setting and not a mandatory one.
User can always create failover slots, switch off automatic slot sync
and disable wait on standbys by not specifying
'synchronized_standby_slots' and do the slot-sync and consistency
checks manually when needed. I feel, for worst case scenarios, we
should provide user an option to delink failover-enabled logical
subscriptions from 'synchronous_standby_names'.
We can have 'synchronized_standby_slots' (standby_slot_names) to
accept one such option as in 'SAME_AS_SYNCREP_STANDBYS' or say
'DEFAULT'. So when 'synchronous_standby_names' is comma separated
list, we pick those slots; if it is empty, then no wait on standbys,
and if its value is 'DEFAULT' as configured by the user, then go with
'synchronous_standby_names'. Thoughts?
One correction here
('synchronous_standby_names' --> 'synchronized_standby_slots'). Corrected
version:
So when 'synchronized_standby_slots' is comma separated list, we pick
those slots; if it is empty, then no wait on standbys, and if its
value is 'DEFAULT' as configured by user, then go with
'synchronous_standby_names'. Thoughts?
2)
When 'synchronized_standby_slots' is configured but a standby named in
it is down, blocking logical replication, we get a WARNING in the
subscriber's log file:
WARNING: replication slot "standby_2" specified in parameter
synchronized_standby_slots does not have active_pid.
DETAIL: Logical replication is waiting on the standby associated with
"standby_2".
HINT: Consider starting standby associated with "standby_2" or amend
parameter synchronized_standby_slots.
But OTOH, when 'synchronous_standby_names' is configured instead of
'synchronized_standby_slots' and any of the standbys listed is down,
blocking logical replication, we do not get any sort of warning. It is
inconsistent behavior. Also, the user might be left clueless about why
subscribers are not getting changes.
thanks
Shveta
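The three-way reading of 'synchronized_standby_slots' proposed in point 1) above (empty, slot list, or a reserved keyword) can be sketched roughly as follows. This is only an illustration of the proposal, not code from the patch: the enum, the function name, and the 'DEFAULT' keyword handling are all hypothetical and do not exist in PostgreSQL.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical names for this sketch; none of them exist in PostgreSQL. */
typedef enum FailoverWaitSource
{
    WAIT_NONE,          /* empty setting: do not wait on any standby */
    WAIT_SLOT_LIST,     /* comma-separated list: wait on those physical slots */
    WAIT_SYNCREP        /* reserved keyword: follow synchronous_standby_names */
} FailoverWaitSource;

/*
 * Classify a synchronized_standby_slots value according to the proposal.
 * "DEFAULT" is the reserved value floated in the discussion, not an
 * existing setting.
 */
FailoverWaitSource
classify_standby_slots_setting(const char *value)
{
    assert(value != NULL);

    if (value[0] == '\0')
        return WAIT_NONE;
    if (strcmp(value, "DEFAULT") == 0)
        return WAIT_SYNCREP;
    return WAIT_SLOT_LIST;
}
```

A side effect visible even in this small sketch is that a physical slot literally named DEFAULT could no longer be listed on its own, which is one cost of reserving a keyword inside an existing GUC.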
Hi Shveta,
Thanks for taking a look at the patch.
will leave user no option to unlink failover-enabled logical
subscribers from the wait on synchronous standbys.
That's a good point. I'm a bit biased in that I don't think there's a
great reason why someone would want to replicate logical changes out of
the synchronous cluster without them having been synchronously
replicated, but yes, this would be different behavior compared to
strictly the slot one.
...
So when 'synchronized_standby_slots' is comma separated list, we pick
those slots; if it is empty, then no wait on standbys, and if its
value is 'DEFAULT' as configured by user, then go with
'synchronous_standby_names'. Thoughts?
I think I'd prefer having a separate GUC if the alternative is to reserve
special keywords in 'synchronized_standby_slots' but I'm not sure if I
feel strongly about that.
2)
When 'synchronized_standby_slots' is configured but standby named in
it is down blocking logical replication, then we get a WARNING in
subscriber's log file:
WARNING: replication slot "standby_2" specified in parameter
synchronized_standby_slots does not have active_pid.
DETAIL: Logical replication is waiting on the standby associated with
"standby_2".
HINT: Consider starting standby associated with "standby_2" or amend
parameter synchronized_standby_slots.
But OTOH, when 'synchronous_standby_names' is configured instead of
'synchronized_standby_slots' and any of the standbys listed is down
blocking logical replication, we do not get any sort of warning. It is
inconsistent behavior. Also user might be left clueless on why
subscribers are not getting changes.
Ah that's a gap. Let me add some logging/warning in a similar fashion.
Although I think I'd have the warning be relatively generic (e.g.
changes are blocked because they're not synchronously committed)
Thanks,
--
John Hsu - Amazon Web Services
Hi Bertrand,
1 ===
...
That's worth additional comments in the code.
There's already this comment about caching the value, not sure
if you'd prefer something more?
/* Cache values to reduce contention on lock */
2 ===
...
Looks like setting initialized to true is missing once done.
Thanks, will update.
3 ===
...
NUM_SYNC_REP_WAIT_MODE is small but as the goal is to keep the lock time as
short as possible I wonder if it wouldn't be better to use memcpy() here instead
of this for loop.
It results in a "-Wdiscarded-qualifiers" warning, which is safe given we
take the lock, but adds noise?
What do you think?
"slot.c:2756:46: warning: passing argument 2 of ‘memcpy’ discards
‘volatile’ qualifier from pointer target type
[-Wdiscarded-qualifiers]"
Thanks,
--
John Hsu - Amazon Web Services
On Fri, Jul 19, 2024 at 2:52 AM John H <johnhyvr@gmail.com> wrote:
I think I'd prefer having a separate GUC if the alternative is to reserve
special keywords in 'synchronized_standby_slots' but I'm not sure if I
feel strongly about that.
My only concern is that earlier we provided a way to set the failover
property of slots without mandatorily waiting on physical standbys,
but now we will be changing this behaviour. Okay, we can see what
other comments come in. If we plan to go this way, we can change the
docs to clearly mention this.
Ah that's a gap. Let me add some logging/warning in a similar fashion.
Although I think I'd have the warning be relatively generic (e.g.
changes are blocked because
they're not synchronously committed)
okay, sounds good.
thanks
Shveta
On Mon, Jul 22, 2024 at 9:12 AM shveta malik <shveta.malik@gmail.com> wrote:
My only concern is, earlier we provided a way to set the failover
property of slots even without mandatorily wait on physical standbys.
But now we will be changing this behaviour.
Adding a new GUC, as John suggests, to address this concern is one way
to go, but we should think some more before adding a new GUC. The
second option, which you are proposing, of adding a special value for
the GUC synchronized_standby_slots also has a downside: it will
create a dependency between two GUCs (synchronized_standby_slots and
synchronous_standby_names), which can also make the code complex.
Yet another possibility is to have a slot-level parameter (something
like failover_wait_for) which can be used to decide the GUC preference
for failover-enabled slots.
As this is a new feature and we haven't got much feedback from users,
like John, I am also not very sure how much merit there is in
retaining the old behavior where failover slots don't need to wait for
any of the standbys. But anyway, the old behavior gives at least some
escape route for the case where logical subscribers keep on waiting for
some physical standby that is down to come back, and one may want to
use that in some situations, so there is clearly some value in
retaining the old behavior.
--
With Regards,
Amit Kapila.
On Tue, Jul 9, 2024 at 12:39 AM John H <johnhyvr@gmail.com> wrote:
Out of curiosity, did you compare with standby_slot_names_from_syncrep set to off
and standby_slot_names not empty?
I didn't think 'standby_slot_names' would impact TPS as much since
it's not grabbing the SyncRepLock but here's a quick test.
Writer with 5 synchronous replicas, 10 pg_recvlogical clients and
pgbench all running from the same server.
Command: pgbench -c 4 -j 4 -T 600 -U "ec2-user" -d postgres -r -P 5
Result with: standby_slot_names =
'replica_1,replica_2,replica_3,replica_4,replica_5'
latency average = 5.600 ms
latency stddev = 2.854 ms
initial connection time = 5.503 ms
tps = 714.148263 (without initial connection time)
Result with: standby_slot_names_from_syncrep = 'true',
synchronous_standby_names = 'ANY 3 (A,B,C,D,E)'
latency average = 5.740 ms
latency stddev = 2.543 ms
initial connection time = 4.093 ms
tps = 696.776249 (without initial connection time)
Result with nothing set:
latency average = 5.090 ms
latency stddev = 3.467 ms
initial connection time = 4.989 ms
tps = 785.665963 (without initial connection time)
Again I think it's possible to improve the synchronous numbers if we
cache but I'll try that out in a bit.
Okay, so the tests done till now conclude that we won't get the
benefit by using 'standby_slot_names_from_syncrep'. Now, if we
increase the number of standbys in both lists and still keep ANY 3 in
synchronous_standby_names then the results may vary. We should try to
find out if there is a performance benefit with the use of
synchronous_standby_names in the normal configurations like the one
you used in the above tests to prove the value of this patch.
--
With Regards,
Amit Kapila.
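To put the quoted pgbench figures side by side, the relative TPS loss against the "nothing set" run can be computed as below. The helper name is made up for this illustration. With the numbers quoted above it works out to roughly 9% for the slot list and roughly 11% for the syncrep-based wait, although a single 600-second run per configuration is a thin basis for comparison.

```c
#include <assert.h>

/*
 * Relative throughput loss, in percent, of a measured run against a
 * baseline run. Both arguments are pgbench "tps" figures.
 * (Hypothetical helper, used only for this illustration.)
 */
double
tps_overhead_percent(double baseline_tps, double measured_tps)
{
    assert(baseline_tps > 0.0);
    return (baseline_tps - measured_tps) / baseline_tps * 100.0;
}
```

For example, tps_overhead_percent(785.665963, 714.148263) gives the overhead of the standby_slot_names run, and tps_overhead_percent(785.665963, 696.776249) the overhead of the standby_slot_names_from_syncrep run.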
On Tue, Jul 23, 2024 at 10:35 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
Okay, so the tests done till now conclude that we won't get the
benefit by using 'standby_slot_names_from_syncrep'. Now, if we
increase the number of standby's in both lists and still keep ANY 3 in
synchronous_standby_names then the results may vary. We should try to
find out if there is a performance benefit with the use of
synchronous_standby_names in the normal configurations like the one
you used in the above tests to prove the value of this patch.
I didn't fully understand the parameters mentioned above, specifically
what 'latency stddev' and 'latency average' represent. But shouldn't
we see the benefit/value of this patch by having a setup where a
particular standby is slow in sending the response back to primary
(could be due to network lag or other reasons) and then measuring the
latency in receiving changes on failover-enabled logical subscribers?
We can perform this test with both of the below settings and say make
D and E slow in sending responses:
1) synchronous_standby_names = 'ANY 3 (A,B,C,D,E)'
2) standby_slot_names = A_slot, B_slot, C_slot, D_slot, E_slot.
thanks
Shveta
On Fri, Jul 26, 2024 at 3:28 PM shveta malik <shveta.malik@gmail.com> wrote:
I didn't fully understand the parameters mentioned above, specifically
what 'latency stddev' and 'latency average' represent.. But shouldn't
we see the benefit/value of this patch by having a setup where a
particular standby is slow in sending the response back to primary
(could be due to network lag or other reasons) and then measuring the
latency in receiving changes on failover-enabled logical subscribers?
We can perform this test with both of the below settings and say make
D and E slow in sending responses:
1) synchronous_standby_names = 'ANY 3 (A,B,C,D,E)'
2) standby_slot_names = A_slot, B_slot, C_slot, D_slot, E_slot.
Yes, I also expect the patch should perform better in such a scenario
but it is better to test it. Also, irrespective of that, we should
investigate why the reported case is slower for
synchronous_standby_names and see if we can improve it.
BTW, for 2), I think you wanted to say synchronized_standby_slots,
not standby_slot_names. We have recently changed the GUC name.
--
With Regards,
Amit Kapila.
On Fri, Jul 26, 2024 at 5:11 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
Yes, I also expect the patch should perform better in such a scenario
but it is better to test it. Also, irrespective of that, we should
investigate why the reported case is slower for
synchronous_standby_names and see if we can improve it.
+1
BTW, for 2), I think you wanted to say synchronized_standby_slots,
not standby_slot_names. We have recently changed the GUC name.
yes, sorry, synchronized_standby_slots it is.
thanks
Shveta
Hi John,
On Thu, Jul 18, 2024 at 02:22:08PM -0700, John H wrote:
Hi Bertrand,
1 ===
...
That's worth additional comments in the code.
There's this comment already about caching the value already, not sure
if you prefer something more?
/* Cache values to reduce contention on lock */
Yeah, at the same place as the static lsn[] declaration, something like:
static XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE]; /* cached LSNs */
but that may just be a matter of taste.
3 ===
...
NUM_SYNC_REP_WAIT_MODE is small but as the goal is to keep the lock time as
short as possible I wonder if it wouldn't be better to use memcpy() here instead
of this for loop.
It results in a "-Wdiscarded-qualifiers" warning, which is safe given we
take the lock, but adds noise?
What do you think?
"slot.c:2756:46: warning: passing argument 2 of ‘memcpy’ discards
‘volatile’ qualifier from pointer target type
[-Wdiscarded-qualifiers]"
Right, we may want to cast it then, but given that the for loop is "small" I
think it's also fine to keep the for loop.
Regards,
--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
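The two options discussed for point 3 (keep the for loop, or use memcpy() with a cast) might look like the sketch below. It is a standalone illustration rather than the patch's code: Lsn and NUM_WAIT_MODES are stand-ins for XLogRecPtr and NUM_SYNC_REP_WAIT_MODE, and both function names are hypothetical.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define NUM_WAIT_MODES 3        /* stand-in for NUM_SYNC_REP_WAIT_MODE */

typedef uint64_t Lsn;           /* stand-in for XLogRecPtr */

/* Element-wise copy: no cast needed, the volatile qualifier is respected. */
void
copy_lsns_loop(Lsn *dst, const volatile Lsn *src)
{
    assert(dst != NULL && src != NULL);
    for (int i = 0; i < NUM_WAIT_MODES; i++)
        dst[i] = src[i];
}

/*
 * memcpy() variant: the explicit cast silences -Wdiscarded-qualifiers.
 * Only reasonable when the caller holds the lock protecting src, so
 * that nothing modifies it while the copy runs.
 */
void
copy_lsns_memcpy(Lsn *dst, const volatile Lsn *src)
{
    assert(dst != NULL && src != NULL);
    memcpy(dst, (const void *) src, sizeof(Lsn) * NUM_WAIT_MODES);
}
```

With only three elements, both variants do the same small amount of work under the lock, which is consistent with the view that keeping the loop is fine.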
Hi Shveta,
On Sun, Jul 21, 2024 at 8:42 PM shveta malik <shveta.malik@gmail.com> wrote:
Ah that's a gap. Let me add some logging/warning in a similar fashion.
Although I think I'd have the warning be relatively generic (e.g.
changes are blocked because
they're not synchronously committed)
okay, sounds good.
thanks
Shveta
I took a look at adding warnings similar to those the existing
'synchronized_standby_slots' feature has, and I don't think it's
particularly feasible.
The checks/warnings for 'synchronized_standby_slots' are intended to
protect against misconfiguration. They consist of slot validation
(valid slot_name, not a logical slot, slot has not been invalidated),
and whether or not the slot is active.
I don't think there's a "misconfiguration" equivalent for waiting on
synchronous_commit.
With the current proposal, once you have (synchronous_commit enabled
&& failover_slots), logical decoding is dependent on whether or not the
writes have been replicated to a synchronous replica. If there is no
data being replicated out of the logical slot, it is because from the
perspective of the database no writes have been committed yet. I don't
think it would make sense to add logging/warning as to why a
transaction is still not committed (e.g. which potential replica is the
one lagging). There isn't a nice way to determine why synchronous
commit is waiting without being particularly invasive, and even then it
could be particularly noisy (e.g. provide all the application_names
that we are potentially waiting on).
Thanks,
--
John Hsu - Amazon Web Services
Hi Shveta, Amit,
... We should try to
find out if there is a performance benefit with the use of
synchronous_standby_names in the normal configurations like the one
you used in the above tests to prove the value of this patch.
I don't expect there to be a performance benefit; if anything, I would
expect it to perform slightly worse because of the contention on
SyncRepLock. The main value of the patch for me is that it makes it easy
for administrators to set the parameter and avoid having to re-toggle
configuration if they want very up-to-date logical clients when one of
the replicas they previously specified in 'synchronized_standby_slots'
becomes unavailable in a synchronous configuration setup.
I didn't fully understand the parameters mentioned above, specifically
what 'latency stddev' and 'latency average' represent
If I understand correctly, 'latency average' is just the average latency of
each transaction from commit, while 'latency stddev' is the standard deviation
of these transaction latencies.
Yes, I also expect the patch should perform better in such a scenario
but it is better to test it. Also, irrespective of that, we should
investigate why the reported case is slower for
synchronous_standby_names and see if we can improve it.
We could test it but I'm not sure how interesting it is since depending
on how much the chosen slot in 'synchronized_standby_slots' lags behind
we can easily show that this patch will perform better.
For instance, in Shveta's suggestion of
We can perform this test with both of the below settings and say make
D and E slow in sending responses:
1) synchronous_standby_names = 'ANY 3 (A,B,C,D,E)'
2) standby_slot_names = A_slot, B_slot, C_slot, D_slot, E_slot.
if the server associated with E_slot is just down or undergoing
some sort of maintenance, then all logical consumers would start lagging until
the server is back up. I could also mimic a network lag of 20 seconds
and it's guaranteed
that this patch will perform better.
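The difference between the two settings in the scenario above can be sketched as a comparison of which LSN logical walsenders are allowed to send up to. This is a simplified model, not the server's code: the function names are hypothetical, and each standby is reduced to a single flushed LSN. Listing all five slots means waiting for the slowest one, whereas 'ANY 3 (A,B,C,D,E)' only needs the third-furthest-ahead standby.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t Lsn;           /* stand-in for XLogRecPtr */

/* LSN that every listed standby has reached: the minimum of the array. */
Lsn
all_standbys_lsn(const Lsn *flushed, size_t n)
{
    assert(n > 0);
    Lsn min = flushed[0];

    for (size_t i = 1; i < n; i++)
        if (flushed[i] < min)
            min = flushed[i];
    return min;
}

/* LSN that at least k standbys have reached: the k-th largest value. */
Lsn
quorum_lsn(const Lsn *flushed, size_t n, size_t k)
{
    assert(k >= 1 && k <= n);
    Lsn best = 0;

    /*
     * For each candidate, count the standbys at or beyond it; keep the
     * largest candidate that at least k standbys have reached.
     */
    for (size_t i = 0; i < n; i++)
    {
        size_t reached = 0;

        for (size_t j = 0; j < n; j++)
            if (flushed[j] >= flushed[i])
                reached++;
        if (reached >= k && flushed[i] > best)
            best = flushed[i];
    }
    return best;
}
```

If D lags far behind and E is down (modelled here as an LSN of 0), the all-slots value stays pinned at E's position while the quorum value keeps advancing with A, B and C.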
I re-ran the benchmarks with a longer run time of 3 hours, and tested a
new shared cache for walsenders to check the value before obtaining the
SyncRepLock. I also saw I was being throttled on storage in my previous
benchmarks so I moved to a new setup.
I benchmarked a new test case with an additional shared cache between
all the walsenders to reduce potential contention on SyncRepLock, and
have attached said patch.
Database: Writer on its own disk, 5 RRs on the other disk together
Client: 10 logical clients, pgbench running from here as well
'pgbench -c 32 -j 4 -T 10800 -U "ec2-user" -d postgres -r -P 1'
# Test failover_slots with synchronized_standby_slots = 'rr_1, rr_2, rr_3, rr_4, rr_5'
latency average = 10.683 ms
latency stddev = 11.851 ms
initial connection time = 145.876 ms
tps = 2994.595673 (without initial connection time)
# Test failover_slots waiting on sync_rep no new shared cache
latency average = 10.684 ms
latency stddev = 12.247 ms
initial connection time = 142.561 ms
tps = 2994.160136 (without initial connection time)
# Test failover slots with additional shared cache
latency average = 10.674 ms
latency stddev = 11.917 ms
initial connection time = 142.486 ms
tps = 2997.315874 (without initial connection time)
The tps improvement between no cache and shared cache seems marginal, but
we do see a slight improvement in stddev, which makes sense from a
contention perspective. I think the cache would demonstrate a lot more
improvement if we had, say, 1000 logical slots all trying to obtain
SyncRepLock to update their values.
I've attached the patch but don't feel particularly strongly about the
new shared LSN values.
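The general idea of consulting a cached LSN before taking the lock can be sketched as below. This is a toy model and not the attached patch: the "lock" is reduced to a counter so the effect is observable, the cache is a plain per-process variable rather than the shared cache described above, and every name is hypothetical. It relies on the assumption that the synced LSN only moves forward, so a cached value that already covers the request stays valid.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;               /* stand-in for XLogRecPtr */

/* Stand-ins for shared state; the "lock" here only counts acquisitions. */
static Lsn shared_synced_lsn = 0;   /* would be protected by the lock */
static int lock_acquisitions = 0;

/* Cache of the last value read under the lock. */
static Lsn cached_synced_lsn = 0;

/* Simulates the sync standbys confirming WAL up to lsn. */
void
advance_shared_lsn(Lsn lsn)
{
    assert(lsn >= shared_synced_lsn);   /* LSNs never move backwards */
    shared_synced_lsn = lsn;
}

int
lock_acquisition_count(void)
{
    return lock_acquisitions;
}

bool
lsn_is_synced(Lsn wait_for_lsn)
{
    /* Fast path: the cached value already covers the request. */
    if (cached_synced_lsn >= wait_for_lsn)
        return true;

    /* Slow path: take the lock, refresh the cache, and re-check. */
    lock_acquisitions++;                /* lock acquire in real code */
    cached_synced_lsn = shared_synced_lsn;
                                        /* lock release in real code */
    return cached_synced_lsn >= wait_for_lsn;
}
```

In this model a walsender that is asked about an LSN at or below the cached value never touches the lock, which is where any reduction in contention would come from as the number of logical walsenders grows.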
Thanks,
--
John Hsu - Amazon Web Services
Attachments:
0003-Wait-on-synchronous-replication-by-default-for-logic.patch (application/octet-stream)
From 627a468a1402fde3632c8d409a32a4fceaf8cfcc Mon Sep 17 00:00:00 2001
From: John Hsu <johnhyvr@gmail.com>
Date: Mon, 26 Aug 2024 18:23:20 +0000
Subject: [PATCH] Wait on synchronous replication by default for logical
failover slots
If synchronous replication is enabled, this patch
allows logical subscribers with failover_slots enabled
to wait for changes to be replicated to synchronous replicas
before consuming the changes.
---
src/backend/replication/slot.c | 289 +++++++++++-------
src/backend/replication/syncrep.c | 27 +-
.../utils/activity/wait_event_names.txt | 1 +
src/include/replication/syncrep.h | 3 +
src/include/replication/walsender_private.h | 6 +
src/include/storage/lwlocklist.h | 1 +
.../t/040_standby_failover_slots_sync.pl | 135 ++++++++
7 files changed, 346 insertions(+), 116 deletions(-)
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index c290339af5..22b0772eee 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -49,6 +49,7 @@
#include "postmaster/interrupt.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
+#include "replication/syncrep.h"
#include "replication/walsender_private.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@@ -2591,8 +2592,8 @@ SlotExistsInSyncStandbySlots(const char *slot_name)
}
/*
- * Return true if the slots specified in synchronized_standby_slots have caught up to
- * the given WAL location, false otherwise.
+ * Return true if the slots specified in synchronized_standby_slots or synchronous
+ * replication have caught up to the given WAL location, false otherwise.
*
* The elevel parameter specifies the error level used for logging messages
* related to slots that do not exist, are invalidated, or are inactive.
@@ -2606,9 +2607,9 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
/*
* Don't need to wait for the standbys to catch up if there is no value in
- * synchronized_standby_slots.
+ * synchronized_standby_slots or synchronous replication is not configured.
*/
- if (synchronized_standby_slots_config == NULL)
+ if (synchronized_standby_slots_config == NULL && !SyncRepConfigured())
return true;
/*
@@ -2619,144 +2620,206 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
return true;
/*
- * Don't need to wait for the standbys to catch up if they are already
- * beyond the specified WAL location.
+ * In the event that synchronized_standby_slots and synchronous replication is
+ * configured, have the former take precedence.
*/
- if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
- ss_oldest_flush_lsn >= wait_for_lsn)
- return true;
-
- /*
- * To prevent concurrent slot dropping and creation while filtering the
- * slots, take the ReplicationSlotControlLock outside of the loop.
- */
- LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-
- name = synchronized_standby_slots_config->slot_names;
- for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
+ if (synchronized_standby_slots_config != NULL)
{
- XLogRecPtr restart_lsn;
- bool invalidated;
- bool inactive;
- ReplicationSlot *slot;
-
- slot = SearchNamedReplicationSlot(name, false);
+ /*
+ * Don't need to wait for the standbys to catch up if they are already
+ * beyond the specified WAL location.
+ */
+ if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
+ ss_oldest_flush_lsn >= wait_for_lsn)
+ return true;
- if (!slot)
+ /*
+ * To prevent concurrent slot dropping and creation while filtering the
+ * slots, take the ReplicationSlotControlLock outside of the loop.
+ */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ name = synchronized_standby_slots_config->slot_names;
+ for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
{
- /*
- * If a slot name provided in synchronized_standby_slots does not
- * exist, report a message and exit the loop. A user can specify a
- * slot name that does not exist just before the server startup.
- * The GUC check_hook(validate_sync_standby_slots) cannot validate
- * such a slot during startup as the ReplicationSlotCtl shared
- * memory is not initialized at that time. It is also possible for
- * a user to drop the slot in synchronized_standby_slots
- * afterwards.
- */
- ereport(elevel,
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("replication slot \"%s\" specified in parameter %s does not exist",
- name, "synchronized_standby_slots"),
- errdetail("Logical replication is waiting on the standby associated with \"%s\".",
- name),
- errhint("Consider creating the slot \"%s\" or amend parameter %s.",
- name, "synchronized_standby_slots"));
- break;
- }
+ XLogRecPtr restart_lsn;
+ bool invalidated;
+ bool inactive;
+ ReplicationSlot *slot;
- if (SlotIsLogical(slot))
- {
- /*
- * If a logical slot name is provided in
- * synchronized_standby_slots, report a message and exit the loop.
- * Similar to the non-existent case, a user can specify a logical
- * slot name in synchronized_standby_slots before the server
- * startup, or drop an existing physical slot and recreate a
- * logical slot with the same name.
- */
- ereport(elevel,
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("cannot have logical replication slot \"%s\" in parameter %s",
- name, "synchronized_standby_slots"),
- errdetail("Logical replication is waiting for correction on \"%s\".",
- name),
- errhint("Consider removing logical slot \"%s\" from parameter %s.",
- name, "synchronized_standby_slots"));
- break;
- }
+ slot = SearchNamedReplicationSlot(name, false);
- SpinLockAcquire(&slot->mutex);
- restart_lsn = slot->data.restart_lsn;
- invalidated = slot->data.invalidated != RS_INVAL_NONE;
- inactive = slot->active_pid == 0;
- SpinLockRelease(&slot->mutex);
+ if (!slot)
+ {
+ /*
+ * If a slot name provided in synchronized_standby_slots does not
+ * exist, report a message and exit the loop. A user can specify a
+ * slot name that does not exist just before the server startup.
+ * The GUC check_hook(validate_sync_standby_slots) cannot validate
+ * such a slot during startup as the ReplicationSlotCtl shared
+ * memory is not initialized at that time. It is also possible for
+ * a user to drop the slot in synchronized_standby_slots
+ * afterwards.
+ */
+ ereport(elevel,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("replication slot \"%s\" specified in parameter %s does not exist",
+ name, "synchronized_standby_slots"),
+ errdetail("Logical replication is waiting on the standby associated with \"%s\".",
+ name),
+ errhint("Consider creating the slot \"%s\" or amend parameter %s.",
+ name, "synchronized_standby_slots"));
+ break;
+ }
- if (invalidated)
- {
- /* Specified physical slot has been invalidated */
- ereport(elevel,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("physical slot \"%s\" specified in parameter %s has been invalidated",
- name, "synchronized_standby_slots"),
- errdetail("Logical replication is waiting on the standby associated with \"%s\".",
- name),
- errhint("Consider dropping and recreating the slot \"%s\" or amend parameter %s.",
- name, "synchronized_standby_slots"));
- break;
- }
+ if (SlotIsLogical(slot))
+ {
+ /*
+ * If a logical slot name is provided in
+ * synchronized_standby_slots, report a message and exit the loop.
+ * Similar to the non-existent case, a user can specify a logical
+ * slot name in synchronized_standby_slots before the server
+ * startup, or drop an existing physical slot and recreate a
+ * logical slot with the same name.
+ */
+ ereport(elevel,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot have logical replication slot \"%s\" in parameter %s",
+ name, "synchronized_standby_slots"),
+ errdetail("Logical replication is waiting for correction on \"%s\".",
+ name),
+ errhint("Consider removing logical slot \"%s\" from parameter %s.",
+ name, "synchronized_standby_slots"));
+ break;
+ }
- if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
- {
- /* Log a message if no active_pid for this physical slot */
- if (inactive)
+ SpinLockAcquire(&slot->mutex);
+ restart_lsn = slot->data.restart_lsn;
+ invalidated = slot->data.invalidated != RS_INVAL_NONE;
+ inactive = slot->active_pid == 0;
+ SpinLockRelease(&slot->mutex);
+
+ if (invalidated)
+ {
+ /* Specified physical slot has been invalidated */
ereport(elevel,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid",
- name, "synchronized_standby_slots"),
+ errmsg("physical slot \"%s\" specified in parameter %s has been invalidated",
+ name, "synchronized_standby_slots"),
errdetail("Logical replication is waiting on the standby associated with \"%s\".",
- name),
- errhint("Consider starting standby associated with \"%s\" or amend parameter %s.",
+ name),
+ errhint("Consider dropping and recreating the slot \"%s\" or amend parameter %s.",
name, "synchronized_standby_slots"));
+ break;
+ }
- /* Continue if the current slot hasn't caught up. */
- break;
+ if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
+ {
+ /* Log a message if no active_pid for this physical slot */
+ if (inactive)
+ ereport(elevel,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid",
+ name, "synchronized_standby_slots"),
+ errdetail("Logical replication is waiting on the standby associated with \"%s\".",
+ name),
+ errhint("Consider starting standby associated with \"%s\" or amend parameter %s.",
+ name, "synchronized_standby_slots"));
+
+ /* Continue if the current slot hasn't caught up. */
+ break;
+ }
+
+ Assert(restart_lsn >= wait_for_lsn);
+
+ if (XLogRecPtrIsInvalid(min_restart_lsn) ||
+ min_restart_lsn > restart_lsn)
+ min_restart_lsn = restart_lsn;
+
+ caught_up_slot_num++;
+
+ name += strlen(name) + 1;
}
- Assert(restart_lsn >= wait_for_lsn);
+ LWLockRelease(ReplicationSlotControlLock);
+
+ /*
+ * Return false if not all the standbys have caught up to the specified
+ * WAL location.
+ */
+ if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
+ return false;
- if (XLogRecPtrIsInvalid(min_restart_lsn) ||
- min_restart_lsn > restart_lsn)
- min_restart_lsn = restart_lsn;
+ /* The ss_oldest_flush_lsn must not retreat. */
+ Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
+ min_restart_lsn >= ss_oldest_flush_lsn);
- caught_up_slot_num++;
+ ss_oldest_flush_lsn = min_restart_lsn;
- name += strlen(name) + 1;
+ return true;
}
+ else
+ {
+ volatile WalSndCtlData *walsndctl = WalSndCtl;
+ static XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE]; /* Cache LSNs */
+ static bool initialized = false;
+ /* Initialize value in case SIGHUP changing to SYNC_REP_NO_WAIT */
+ int mode = SyncRepWaitMode;
+ int i;
+
+ if (!initialized)
+ {
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ {
+ lsn[i] = InvalidXLogRecPtr;
+ }
+ }
- LWLockRelease(ReplicationSlotControlLock);
+ if (mode == SYNC_REP_NO_WAIT)
+ return true;
- /*
- * Return false if not all the standbys have caught up to the specified
- * WAL location.
- */
- if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
- return false;
+ if (lsn[mode] >= wait_for_lsn)
+ return true;
+
+ /* Check shared cache first to see if values updated */
+ LWLockAcquire(SyncRepLogicalCacheLock, LW_SHARED);
+ if (walsndctl->cached_lsn[mode] >= wait_for_lsn)
+ {
+ memcpy(lsn, (XLogRecPtr *) walsndctl->cached_lsn, sizeof(lsn));
+ LWLockRelease(SyncRepLogicalCacheLock);
+ return true;
+ }
- /* The ss_oldest_flush_lsn must not retreat. */
- Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
- min_restart_lsn >= ss_oldest_flush_lsn);
+ LWLockRelease(SyncRepLogicalCacheLock);
- ss_oldest_flush_lsn = min_restart_lsn;
+ LWLockAcquire(SyncRepLogicalCacheLock, LW_EXCLUSIVE);
+ /* Another walsender could have updated shared cache prior to obtaining lock */
+ if (walsndctl->cached_lsn[mode] >= wait_for_lsn)
+ {
+ memcpy(lsn, (XLogRecPtr *) walsndctl->cached_lsn, sizeof(lsn));
+ LWLockRelease(SyncRepLogicalCacheLock);
+ return true;
+ }
- return true;
+ /* Cache values to reduce contention */
+ LWLockAcquire(SyncRepLock, LW_SHARED);
+ memcpy((XLogRecPtr *) walsndctl->cached_lsn, (XLogRecPtr *) walsndctl->lsn, sizeof(lsn));
+ LWLockRelease(SyncRepLock);
+ memcpy(lsn, (XLogRecPtr *) walsndctl->lsn, sizeof(lsn));
+ LWLockRelease(SyncRepLogicalCacheLock);
+
+ if (lsn[mode] >= wait_for_lsn)
+ return true;
+
+ return false;
+ }
}
/*
* Wait for physical standbys to confirm receiving the given lsn.
*
* Used by logical decoding SQL functions. It waits for physical standbys
- * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
+ * corresponding to the physical slots specified in the synchronized_standby_slots GUC,
+ * or synchronous replication.
*/
void
WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
@@ -2766,7 +2829,7 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
* slot is not a logical failover slot, or there is no value in
* synchronized_standby_slots.
*/
- if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
+ if (!MyReplicationSlot->data.failover || !(synchronized_standby_slots_config || SyncRepConfigured()))
return;
ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index fa5988c824..69c061d8dc 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -78,6 +78,7 @@
#include "common/int.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "replication/slot.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
@@ -95,7 +96,7 @@ char *SyncRepStandbyNames;
static bool announce_next_takeover = true;
SyncRepConfigData *SyncRepConfig = NULL;
-static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
+int SyncRepWaitMode = SYNC_REP_NO_WAIT;
static void SyncRepQueueInsert(int mode);
static void SyncRepCancelWait(void);
@@ -124,6 +125,8 @@ static int cmp_lsn(const void *a, const void *b);
static bool SyncRepQueueIsOrderedByLSN(int mode);
#endif
+bool SyncRepConfigured(void);
+
/*
* ===========================================================
* Synchronous Replication functions for normal user backends
@@ -169,8 +172,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
* described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
* it's false, the lock is not necessary because we don't touch the queue.
*/
- if (!SyncRepRequested() ||
- !((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
+ if (!SyncRepConfigured())
return;
/* Cap the level for anything other than commit to remote flush only. */
@@ -523,6 +525,15 @@ SyncRepReleaseWaiters(void)
LWLockRelease(SyncRepLock);
+ /*
+ * If synchronized_standby_slots is set, the respective walsenders
+ * will be responsible for broadcasting the value.
+ */
+ if (strcmp(synchronized_standby_slots, "") == 0)
+ {
+ ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv);
+ }
+
elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
numwrite, LSN_FORMAT_ARGS(writePtr),
numflush, LSN_FORMAT_ARGS(flushPtr),
@@ -1069,3 +1080,13 @@ assign_synchronous_commit(int newval, void *extra)
break;
}
}
+
+bool
+SyncRepConfigured()
+{
+ if (!SyncRepRequested() ||
+ !((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
+ return false;
+
+ return true;
+}
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 8efb4044d6..c40884c44e 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -347,6 +347,7 @@ DSMRegistry "Waiting to read or update the dynamic shared memory registry."
InjectionPoint "Waiting to read or update information related to injection points."
SerialControl "Waiting to read or update shared <filename>pg_serial</filename> state."
WaitLSN "Waiting to read or update shared Wait-for-LSN state."
+SyncRepLogicalCache "Waiting to read or update information about the state of the synchronous replication LSN cache."
#
# END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index ea439e6da6..ec22bc72df 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -82,6 +82,7 @@ extern PGDLLIMPORT char *SyncRepStandbyNames;
/* called by user backend */
extern void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit);
+extern bool SyncRepConfigured();
/* called at backend exit */
extern void SyncRepCleanupAtProcExit(void);
@@ -96,6 +97,8 @@ extern int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys);
/* called by checkpointer */
extern void SyncRepUpdateSyncStandbysDefined(void);
+extern int SyncRepWaitMode;
+
/*
* Internal functions for parsing synchronous_standby_names grammar,
* in syncrep_gram.y and syncrep_scanner.l
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index cf32ac2488..878492d3ae 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -102,6 +102,12 @@ typedef struct
*/
XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE];
+ /*
+ * Cache of lsn[NUM_SYNC_REP_WAIT_MODE] above when SyncRep is enabled. Checked by logical walsenders
+ * first to reduce contention on SyncRepLock. Protected by SyncRepLogicalCacheLock.
+ */
+ XLogRecPtr cached_lsn[NUM_SYNC_REP_WAIT_MODE];
+
/*
* Are any sync standbys defined? Waiting backends can't reload the
* config file safely, so checkpointer updates this value as needed.
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 88dc79b2bd..8ea84e5ec0 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -84,3 +84,4 @@ PG_LWLOCK(50, DSMRegistry)
PG_LWLOCK(51, InjectionPoint)
PG_LWLOCK(52, SerialControl)
PG_LWLOCK(53, WaitLSN)
+PG_LWLOCK(54, SyncRepLogicalCache)
\ No newline at end of file
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index 2c51cfc3c8..00130d92dc 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -710,6 +710,141 @@ $result = $subscriber1->safe_psql('postgres',
is($result, 't',
"subscriber1 gets data from primary after standby1 acknowledges changes");
+##################################################
+# Test that logical failover replication slots wait for the specified
+# synchronous replicas to receive the changes first. It uses the
+# following set up:
+#
+# (synchronous physical standbys)
+# | ----> standby1 (application_name = standby1)
+# | ----> standby2 (application_name = standby2)
+# primary ----- |
+# (logical replication)
+# | ----> subscriber1 (failover = true, slot_name = lsub1_slot)
+# | ----> subscriber2 (failover = false, slot_name = lsub2_slot)
+#
+# synchronous_commit = 'on'
+# synchronous_standby_names = 'ANY 2 (standby1, standby2)'
+#
+# The setup is configured in such a way that the logical slot of subscriber1 is
+# enabled for failover, and thus the subscriber1 will wait for the changes to have
+# been synchronously replicated before receiving the decoded changes.
+##################################################
+
+$primary->safe_psql('postgres', "TRUNCATE tab_int;");
+# Setup synchronous replication
+$primary->append_conf(
+ 'postgresql.conf', qq(
+ synchronous_commit = 'on'
+ synchronous_standby_names = 'ANY 2 (standby1, standby2)'
+));
+
+$primary->reload;
+
+$standby1->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1 dbname=postgres application_name=standby1'");
+$standby1->reload;
+
+$standby2->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1 dbname=postgres application_name=standby2'");
+$standby2->reload;
+
+# Check that synchronous replication is setup properly
+$standby2->stop;
+
+# Create some data on the primary
+$primary_row_count = 10;
+
+my $sync_back_q = $primary->background_psql(
+ 'postgres',
+ on_error_stop => 0,
+ timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+$sync_back_q->query_until(qr/insert_blocked_on_sync_rep/, q(
+ \echo insert_blocked_on_sync_rep
+ INSERT INTO tab_int SELECT generate_series(1, 10);
+));
+
+$result = $primary->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+
+is($result, 'f', "primary row count is not updated due to synchronous replication");
+
+# Verify the standby specified in synchronized_standby_slots (sb1_slot aka standby1)
+# catches up with the primary.
+$primary->wait_for_replay_catchup($standby1);
+$result = $standby1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "standby1 gets data from primary");
+
+# Validate that synchronized_standby_slots takes precedence over waiting for
+# changes to have been synchronously replicated.
+# Since the slot specified (sb1_slot) has received the changes, primary can send
+# the decoded changes to the subscription enabled for failover
+# (i.e. regress_mysub1).
+$primary->wait_for_catchup('regress_mysub1');
+$result = $subscriber1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+ "subscriber1 gets data from primary after standby1 acknowledges changes");
+
+$standby2->start;
+$primary->wait_for_replay_catchup($standby2);
+
+# Unset synchronized_standby_slots to test synchronous replication
+# blocks primary from sending logical decoded changes to failover slots until
+# changes have been synchronously replicated.
+$primary->append_conf(
+ 'postgresql.conf', qq(
+ synchronized_standby_slots = ''));
+$primary->reload;
+
+$primary_row_count = 20;
+$standby2->stop;
+
+$sync_back_q->query_until(
+ qr/insert_blocked_on_additional_sync_rep/, q(
+ \echo insert_blocked_on_additional_sync_rep
+ INSERT INTO tab_int SELECT generate_series(11, 20);
+));
+
+# Since $standby2 has not received the changes, validate that subscriber1 (failover = true)
+# has not received the decoded changes, but subscriber2 (failover = false) has.
+$primary->wait_for_catchup('regress_mysub2');
+
+$result = $subscriber2->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+ "subscriber2 gets data from primary even if the changes have not been synchronously acknowledged.");
+
+$result = $subscriber1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 'f',
+ "subscriber1 does not get data from primary since changes have not been synchronously acknowledged.");
+
+# Start standby2 to allow the changes to be acknowledged by all the synchronous standbys.
+$standby2->start;
+$primary->wait_for_replay_catchup($standby2);
+$primary->wait_for_catchup('regress_mysub1');
+
+# Now that the changes have been replicated to all synchronous nodes,
+# primary can send the decoded changes to the subscription enabled for failover
+# (i.e. regress_mysub1). While the standby was down, regress_mysub1 didn't
+# receive any data from the primary. i.e. the primary didn't allow it to go
+# ahead of requirements of synchronous commit.
+$result = $subscriber1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+ "subscriber1 gets data from primary since changes have been synchronously acknowledged.");
+# No longer need to run the background session.
+$sync_back_q->quit;
+
+# Reset synchronized_standby_slots and synchronous commit for below test cases
+$primary->append_conf(
+ 'postgresql.conf', qq(
+synchronized_standby_slots = 'sb1_slot'
+synchronous_standby_names = ''
+));
+$primary->reload;
+
##################################################
# Verify that when using pg_logical_slot_get_changes to consume changes from a
# logical failover slot, it will also wait for the slots specified in
--
2.40.1
Hi Bertrand,
On Sun, Jul 28, 2024 at 10:00 PM Bertrand Drouvot
<bertranddrouvot.pg@gmail.com> wrote:
Yeah, at the same place as the static lsn[] declaration, something like:
static XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE]; /* cached LSNs */
but that may just be a matter of taste.
I've updated the patch to reflect that.
Right, we may want to cast it then but given that the for loop is "small" I think
that's also fine to keep the for loop.
Ah I see what you mean. I've updated these changes and attached the
patch to the other thread.
Thanks,
--
John Hsu - Amazon Web Services
On Tue, Aug 27, 2024 at 12:58 AM John H <johnhyvr@gmail.com> wrote:
For instance, in Shveta's suggestion of
We can perform this test with both of the below settings and say make
D and E slow in sending responses:
1) synchronous_standby_names = 'ANY 3 (A,B,C,D,E)'
2) standby_slot_names = A_slot, B_slot, C_slot, D_slot, E_slot.
if the server associated with E_slot is just down or undergoing
some sort of maintenance, then all logical consumers would start lagging until
the server is back up. I could also mimic a network lag of 20 seconds
and it's guaranteed that this patch will perform better.
I wanted a simple test where in the first case you use
synchronous_standby_names = 'ANY 3 (A,B,C,D,E)' and in the second case
use standby_slot_names = A_slot, B_slot, C_slot, D_slot, E_slot. You
can try some variations of it as well. The idea is that even if the
performance is less for synchronous_standby_names configuration, we
should be able to document it. This will help users to decide what is
best for them.
I re-ran the benchmarks with a longer run time of 3 hours, and testing
a new shared cache
for walsenders to check the value before obtaining the SyncRepLock.
I also saw I was being throttled on storage in my previous benchmarks
so I moved to a new setup.
I benchmarked a new test case with an additional shared cache between
all the walsenders to
reduce potential contention on SyncRepLock, and have attached said patch.
Database: Writer on its own disk, 5 RRs on the other disk together
Client: 10 logical clients, pgbench running from here as well
'pgbench -c 32 -j 4 -T 10800 -U "ec2-user" -d postgres -r -P 1'
# Test failover_slots with synchronized_standby_slots = 'rr_1, rr_2,
rr_3, rr_4, rr_5'
latency average = 10.683 ms
latency stddev = 11.851 ms
initial connection time = 145.876 ms
tps = 2994.595673 (without initial connection time)
# Test failover_slots waiting on sync_rep no new shared cache
latency average = 10.684 ms
latency stddev = 12.247 ms
initial connection time = 142.561 ms
tps = 2994.160136 (without initial connection time)
statement latencies in milliseconds and failures:
# Test failover slots with additional shared cache
latency average = 10.674 ms
latency stddev = 11.917 ms
initial connection time = 142.486 ms
tps = 2997.315874 (without initial connection time)
The tps improvement between no cache and shared_cache seems marginal, but we do
see the slight improvement in stddev which makes sense from a
contention perspective.
What is the difference between "Test failover_slots with
synchronized_standby_slots = 'rr_1, rr_2, rr_3, rr_4, rr_5'" and
"Test failover_slots waiting on sync_rep no new shared cache"? I want
to know which configuration you used for synchronous_standby_names in
the latter case.
I think the cache would demonstrate a lot more improvement if we had
say 1000 logical slots
and all of them are trying to obtain SyncRepLock for updating its values.
I've attached the patch but don't feel particularly strongly about the
new shared LSN values.
I am also not sure, especially as the test results didn't show much
improvement and the code also becomes a bit complicated. BTW, in the
0003 version in the below code:
+ /* Cache values to reduce contention */
+ LWLockAcquire(SyncRepLock, LW_SHARED);
+ memcpy((XLogRecPtr *) walsndctl->cached_lsn, (XLogRecPtr *) walsndctl->lsn, sizeof(lsn));
+ LWLockRelease(SyncRepLock);
Which mode lsn is being copied? I am not sure if I understood this
part of the code.
In the 0002 version, in the following code [1], you are referring to
LSN mode which is enabled for logical walsender irrespective of the
mode used by the physical walsender. It is possible that they are
always the same but that is not evident from the code or comments in
the patch.
[1] :
+ /* Cache values to reduce contention on lock */
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ {
+ lsn[i] = walsndctl->lsn[i];
+ }
- ss_oldest_flush_lsn = min_restart_lsn;
+ LWLockRelease(SyncRepLock);
- return true;
+ if (lsn[mode] >= wait_for_lsn)
+ return true;
--
With Regards,
Amit Kapila.
On Tue, Aug 27, 2024 at 12:56 AM John H <johnhyvr@gmail.com> wrote:
Hi Shveta,
On Sun, Jul 21, 2024 at 8:42 PM shveta malik <shveta.malik@gmail.com> wrote:
Ah that's a gap. Let me add some logging/warning in a similar fashion.
Although I think I'd have the warning be relatively generic (e.g.
changes are blocked because they're not synchronously committed)
okay, sounds good.
thanks
Shveta
I took a look at having similar warnings the existing
'synchronized_standby_slots' feature has
and I don't think it's particularly feasible.
The checks/warnings for 'synchronized_standby_slots' are intended to
protect against misconfiguration.
They consist of slot validation (valid slot_name, not logical slot,
slot has not been invalidated), and
whether or not the slot is active.
I don't think there's a "misconfiguration" equivalent for waiting on
synchronous_commit.
With the current proposal, once you have (synchronous_commit enabled
&& failover_slots), logical
decoding is dependent on whether or not the writes have been
replicated to a synchronous replica.
If there is no data being replicated out of the logical slot, it is
because from the perspective of the
database no writes have been committed yet. I don't think it would
make sense to add logging/warning as to
why a transaction is still not committed (e.g. which potential replica
is the one lagging). There isn't a
nice way to determine why synchronous commit is waiting without being
particularly invasive, and even then
it could be particularly noisy (e.g. provide all the application_names
that we are potentially waiting on).
Okay. Thanks for the details. I see your point. I will review to see
if anything comes to my mind for a simpler way to do this.
thanks
Shveta
Hi Amit,
On Mon, Aug 26, 2024 at 11:00 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
I wanted a simple test where in the first case you use
synchronous_standby_names = 'ANY 3 (A,B,C,D,E)' and in the second case
use standby_slot_names = A_slot, B_slot, C_slot, D_slot, E_slot. You
can try some variations of it as well. The idea is that even if the
performance is less for synchronous_standby_names configuration, we
should be able to document it. This will help users to decide what is
...
What is the difference between "Test failover_slots with
synchronized_standby_slots = 'rr_1, rr_2, rr_3, rr_4, rr_5'" and
"Test failover_slots waiting on sync_rep no new shared cache"? I want
to know which configuration you used for synchronous_standby_names in
the latter case.
Sorry for the confusion due to the bad naming of the test cases; let
me rephrase.
All three tests had synchronous_standby_names = 'ANY 3 (A,B,C,D,E)'
set with synchronous_commit = 'on', and failover_slots = 'on'
for the 10 logical slots.
# Test failover_slots with synchronized_standby_slots = 'rr_1, rr_2,
rr_3, rr_4, rr_5'
This is the test you wanted where the logical clients are waiting on
all 5 slots to acknowledge the change since
'synchronized_standby_slots' takes priority when set.
# Test failover_slots sync rep no cache
This test has 'synchronized_standby_slots' commented out, and without
relying on the new cache introduced in 0003.
Logical clients will wait on synchronous_standby_names in this case.
# Test failover slots with additional shared cache
This test also has 'synchronized_standby_slots' commented out, and
logical clients will wait on the LSNs
reported from synchronous_standby_names but it relies on a new cache
to reduce contention on SyncRepLock.
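To make the difference between the first test case and the other two concrete, here is a minimal sketch of the two wait conditions. It assumes each standby reports a single confirmed LSN; the function names, MAX_STANDBYS, and the XLogRecPtr typedef are stand-ins for illustration and are not taken from the patch. Waiting on every slot listed in synchronized_standby_slots is bounded by the slowest standby, while a quorum such as 'ANY 3 (A,B,C,D,E)' is bounded by the third most advanced one.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef uint64_t XLogRecPtr;    /* stand-in for the PostgreSQL typedef */

#define MAX_STANDBYS 16         /* hypothetical upper bound for this sketch */

/* Sort comparator: larger (more recent) LSNs first. */
static int
cmp_lsn_desc(const void *a, const void *b)
{
    XLogRecPtr l = *(const XLogRecPtr *) a;
    XLogRecPtr r = *(const XLogRecPtr *) b;

    if (l > r)
        return -1;
    if (l < r)
        return 1;
    return 0;
}

/* Every listed standby must confirm: the usable LSN is the oldest one. */
static XLogRecPtr
all_confirmed_lsn(const XLogRecPtr *confirmed, size_t n)
{
    XLogRecPtr min;

    assert(n >= 1);
    min = confirmed[0];
    for (size_t i = 1; i < n; i++)
        if (confirmed[i] < min)
            min = confirmed[i];
    return min;
}

/* Quorum "ANY k": the usable LSN is the k-th most recent one. */
static XLogRecPtr
quorum_confirmed_lsn(const XLogRecPtr *confirmed, size_t n, size_t k)
{
    XLogRecPtr sorted[MAX_STANDBYS];

    assert(n >= 1 && n <= MAX_STANDBYS && k >= 1 && k <= n);
    memcpy(sorted, confirmed, n * sizeof(XLogRecPtr));
    qsort(sorted, n, sizeof(XLogRecPtr), cmp_lsn_desc);
    return sorted[k - 1];
}
```

With two of five standbys lagging far behind, all_confirmed_lsn() returns the LSN of the slowest standby, whereas quorum_confirmed_lsn(..., 3) returns the LSN of the third fastest, so logical consumers are not held back by the stragglers.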
The idea is that even if the
performance is less for synchronous_standby_names configuration, we
should be able to document it. This will help users to decide what is
best for them.
Makes sense.
I am also not sure, especially as the test results didn't show much
improvement and the code also becomes a bit complicated. BTW, in the
0003 version in the below code:
That's fair, I've updated to be more in line with 0002.
+ /* Cache values to reduce contention */
+ LWLockAcquire(SyncRepLock, LW_SHARED);
+ memcpy((XLogRecPtr *) walsndctl->cached_lsn, (XLogRecPtr *) walsndctl->lsn, sizeof(lsn));
+ LWLockRelease(SyncRepLock);
Which mode lsn is being copied? I am not sure if I understood this
part of the code.
All of the mode LSNs are being copied in case SyncRepWaitMode changes in
the next iteration. I've removed that part but kept:
+ memcpy(lsn, (XLogRecPtr *) walsndctl->lsn, sizeof(lsn));
as suggested by Bertrand to avoid the for loop updating values one-by-one.
Here's what's logged after the memcpy:
2024-08-28 19:41:13.798 UTC [1160413] LOG: lsn[0] after memcpy is: 279/752C7FF0
2024-08-28 19:41:13.798 UTC [1160413] LOG: lsn[1] after memcpy is: 279/752C7F20
2024-08-28 19:41:13.798 UTC [1160413] LOG: lsn[2] after memcpy is: 279/752C7F20
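As a rough illustration of the point about replacing the per-element loop, the sketch below copies all wait-mode LSNs both ways and is meant to show that the results are identical. The helper names and the XLogRecPtr typedef are stand-ins for this example only; the indexes are assumed to follow the usual write, flush, apply order.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define NUM_SYNC_REP_WAIT_MODE 3    /* assumed order: write, flush, apply */

typedef uint64_t XLogRecPtr;        /* stand-in for the PostgreSQL typedef */

/* Element-by-element copy, as in the earlier patch version. */
static void
copy_lsns_loop(XLogRecPtr *dst, const XLogRecPtr *src)
{
    for (int i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
        dst[i] = src[i];
}

/*
 * Whole-array copy.  The size is computed from the element count because
 * dst is a plain pointer here; sizeof(dst) would be the pointer size.
 */
static void
copy_lsns_memcpy(XLogRecPtr *dst, const XLogRecPtr *src)
{
    memcpy(dst, src, sizeof(XLogRecPtr) * NUM_SYNC_REP_WAIT_MODE);
}
```

In the patch, sizeof(lsn) is fine because lsn is declared as a real static array in that scope; the explicit element count above only matters once the array has decayed to a pointer.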
In the 0002 version, in the following code [1], you are referring to
LSN mode which is enabled for logical walsender irrespective of the
mode used by the physical walsender. It is possible that they are
always the same but that is not evident from the code or comments in
the patch.
They are almost always the same. I tried to indicate that with the
following comment in the patch, but I could make it more explicit?
/* Initialize value in case SIGHUP changing to SYNC_REP_NO_WAIT */
At the beginning we set
int mode = SyncRepWaitMode;
At this time, the logical walsender mode it's checking against is the
same as what the physical walsenders are using.
It's possible that this mode is no longer the same when we execute the
following check:
if (lsn[mode] >= wait_for_lsn)
because of a SIGHUP to synchronous_commit that changes SyncRepWaitMode
to some other value.
We cache the value instead of
if (lsn[SyncRepWaitMode] >= wait_for_lsn)
because SYNC_REP_NO_WAIT is -1. If SyncRepWaitMode is set to this it
leads to out of bounds access.
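The reasoning above can be sketched as follows. This is a simplified stand-alone version, not the patch itself: sync_rep_caught_up() is a hypothetical helper, and the macros and typedef are stand-ins that mirror the values mentioned in this message (SYNC_REP_NO_WAIT being -1).

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define SYNC_REP_NO_WAIT        (-1)
#define NUM_SYNC_REP_WAIT_MODE  3

typedef uint64_t XLogRecPtr;    /* stand-in for the PostgreSQL typedef */

/* May change when synchronous_commit is reloaded. */
static volatile int SyncRepWaitMode = SYNC_REP_NO_WAIT;

/*
 * Hypothetical helper: read the mode exactly once, so the NO_WAIT check
 * and the array index are guaranteed to use the same value.
 */
static bool
sync_rep_caught_up(const XLogRecPtr *lsn, XLogRecPtr wait_for_lsn)
{
    int mode = SyncRepWaitMode;     /* single read into a local */

    if (mode == SYNC_REP_NO_WAIT)
        return true;                /* nothing to wait for */

    assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
    return lsn[mode] >= wait_for_lsn;
}
```

Indexing with the global directly, as in lsn[SyncRepWaitMode], would evaluate lsn[-1] if the mode became SYNC_REP_NO_WAIT between the check and the comparison; the local copy rules that out.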
I've attached a new patch that removes the shared cache introduced in 0003.
Thanks,
--
John Hsu - Amazon Web Services
Attachments:
0004-Wait-on-synchronous-replication-by-default-for-logic.patch (application/octet-stream)
From 603e01d634a30de0a0103f657010e4fce953390e Mon Sep 17 00:00:00 2001
From: John Hsu <johnhyvr@gmail.com>
Date: Mon, 26 Aug 2024 18:23:20 +0000
Subject: [PATCH] Wait on synchronous replication by default for logical
failover slots
If synchronous replication is enabled, this patch
allows logical subscribers with failover_slots enabled
to wait for changes to be replicated to synchronous replicas
before consuming the changes.
---
src/backend/replication/slot.c | 266 ++++++++++--------
src/backend/replication/syncrep.c | 27 +-
src/include/replication/syncrep.h | 3 +
src/include/storage/lwlocklist.h | 2 +-
.../t/040_standby_failover_slots_sync.pl | 135 +++++++++
5 files changed, 316 insertions(+), 117 deletions(-)
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index c290339af5..460358d259 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -49,6 +49,7 @@
#include "postmaster/interrupt.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
+#include "replication/syncrep.h"
#include "replication/walsender_private.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@@ -2591,8 +2592,8 @@ SlotExistsInSyncStandbySlots(const char *slot_name)
}
/*
- * Return true if the slots specified in synchronized_standby_slots have caught up to
- * the given WAL location, false otherwise.
+ * Return true if the slots specified in synchronized_standby_slots or synchronous
+ * replication have caught up to the given WAL location, false otherwise.
*
* The elevel parameter specifies the error level used for logging messages
* related to slots that do not exist, are invalidated, or are inactive.
@@ -2606,9 +2607,9 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
/*
* Don't need to wait for the standbys to catch up if there is no value in
- * synchronized_standby_slots.
+ * synchronized_standby_slots or synchronous replication is not configured.
*/
- if (synchronized_standby_slots_config == NULL)
+ if (synchronized_standby_slots_config == NULL && !SyncRepConfigured())
return true;
/*
@@ -2619,144 +2620,183 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
return true;
/*
- * Don't need to wait for the standbys to catch up if they are already
- * beyond the specified WAL location.
- */
- if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
- ss_oldest_flush_lsn >= wait_for_lsn)
- return true;
-
- /*
- * To prevent concurrent slot dropping and creation while filtering the
- * slots, take the ReplicationSlotControlLock outside of the loop.
+ * In the event that synchronized_standby_slots and synchronous replication is
+ * configured, have the former take precedence.
*/
- LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-
- name = synchronized_standby_slots_config->slot_names;
- for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
+ if (synchronized_standby_slots_config != NULL)
{
- XLogRecPtr restart_lsn;
- bool invalidated;
- bool inactive;
- ReplicationSlot *slot;
-
- slot = SearchNamedReplicationSlot(name, false);
+ /*
+ * Don't need to wait for the standbys to catch up if they are already
+ * beyond the specified WAL location.
+ */
+ if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
+ ss_oldest_flush_lsn >= wait_for_lsn)
+ return true;
- if (!slot)
+ /*
+ * To prevent concurrent slot dropping and creation while filtering the
+ * slots, take the ReplicationSlotControlLock outside of the loop.
+ */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ name = synchronized_standby_slots_config->slot_names;
+ for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
{
- /*
- * If a slot name provided in synchronized_standby_slots does not
- * exist, report a message and exit the loop. A user can specify a
- * slot name that does not exist just before the server startup.
- * The GUC check_hook(validate_sync_standby_slots) cannot validate
- * such a slot during startup as the ReplicationSlotCtl shared
- * memory is not initialized at that time. It is also possible for
- * a user to drop the slot in synchronized_standby_slots
- * afterwards.
- */
- ereport(elevel,
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("replication slot \"%s\" specified in parameter %s does not exist",
- name, "synchronized_standby_slots"),
- errdetail("Logical replication is waiting on the standby associated with \"%s\".",
- name),
- errhint("Consider creating the slot \"%s\" or amend parameter %s.",
- name, "synchronized_standby_slots"));
- break;
- }
+ XLogRecPtr restart_lsn;
+ bool invalidated;
+ bool inactive;
+ ReplicationSlot *slot;
- if (SlotIsLogical(slot))
- {
- /*
- * If a logical slot name is provided in
- * synchronized_standby_slots, report a message and exit the loop.
- * Similar to the non-existent case, a user can specify a logical
- * slot name in synchronized_standby_slots before the server
- * startup, or drop an existing physical slot and recreate a
- * logical slot with the same name.
- */
- ereport(elevel,
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("cannot have logical replication slot \"%s\" in parameter %s",
- name, "synchronized_standby_slots"),
- errdetail("Logical replication is waiting for correction on \"%s\".",
- name),
- errhint("Consider removing logical slot \"%s\" from parameter %s.",
- name, "synchronized_standby_slots"));
- break;
- }
+ slot = SearchNamedReplicationSlot(name, false);
- SpinLockAcquire(&slot->mutex);
- restart_lsn = slot->data.restart_lsn;
- invalidated = slot->data.invalidated != RS_INVAL_NONE;
- inactive = slot->active_pid == 0;
- SpinLockRelease(&slot->mutex);
+ if (!slot)
+ {
+ /*
+ * If a slot name provided in synchronized_standby_slots does not
+ * exist, report a message and exit the loop. A user can specify a
+ * slot name that does not exist just before the server startup.
+ * The GUC check_hook(validate_sync_standby_slots) cannot validate
+ * such a slot during startup as the ReplicationSlotCtl shared
+ * memory is not initialized at that time. It is also possible for
+ * a user to drop the slot in synchronized_standby_slots
+ * afterwards.
+ */
+ ereport(elevel,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("replication slot \"%s\" specified in parameter %s does not exist",
+ name, "synchronized_standby_slots"),
+ errdetail("Logical replication is waiting on the standby associated with \"%s\".",
+ name),
+ errhint("Consider creating the slot \"%s\" or amend parameter %s.",
+ name, "synchronized_standby_slots"));
+ break;
+ }
- if (invalidated)
- {
- /* Specified physical slot has been invalidated */
- ereport(elevel,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("physical slot \"%s\" specified in parameter %s has been invalidated",
- name, "synchronized_standby_slots"),
- errdetail("Logical replication is waiting on the standby associated with \"%s\".",
- name),
- errhint("Consider dropping and recreating the slot \"%s\" or amend parameter %s.",
- name, "synchronized_standby_slots"));
- break;
- }
+ if (SlotIsLogical(slot))
+ {
+ /*
+ * If a logical slot name is provided in
+ * synchronized_standby_slots, report a message and exit the loop.
+ * Similar to the non-existent case, a user can specify a logical
+ * slot name in synchronized_standby_slots before the server
+ * startup, or drop an existing physical slot and recreate a
+ * logical slot with the same name.
+ */
+ ereport(elevel,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot have logical replication slot \"%s\" in parameter %s",
+ name, "synchronized_standby_slots"),
+ errdetail("Logical replication is waiting for correction on \"%s\".",
+ name),
+ errhint("Consider removing logical slot \"%s\" from parameter %s.",
+ name, "synchronized_standby_slots"));
+ break;
+ }
- if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
- {
- /* Log a message if no active_pid for this physical slot */
- if (inactive)
+ SpinLockAcquire(&slot->mutex);
+ restart_lsn = slot->data.restart_lsn;
+ invalidated = slot->data.invalidated != RS_INVAL_NONE;
+ inactive = slot->active_pid == 0;
+ SpinLockRelease(&slot->mutex);
+
+ if (invalidated)
+ {
+ /* Specified physical slot has been invalidated */
ereport(elevel,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid",
- name, "synchronized_standby_slots"),
+ errmsg("physical slot \"%s\" specified in parameter %s has been invalidated",
+ name, "synchronized_standby_slots"),
errdetail("Logical replication is waiting on the standby associated with \"%s\".",
- name),
- errhint("Consider starting standby associated with \"%s\" or amend parameter %s.",
+ name),
+ errhint("Consider dropping and recreating the slot \"%s\" or amend parameter %s.",
name, "synchronized_standby_slots"));
+ break;
+ }
- /* Continue if the current slot hasn't caught up. */
- break;
+ if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
+ {
+ /* Log a message if no active_pid for this physical slot */
+ if (inactive)
+ ereport(elevel,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid",
+ name, "synchronized_standby_slots"),
+ errdetail("Logical replication is waiting on the standby associated with \"%s\".",
+ name),
+ errhint("Consider starting standby associated with \"%s\" or amend parameter %s.",
+ name, "synchronized_standby_slots"));
+
+ /* Continue if the current slot hasn't caught up. */
+ break;
+ }
+
+ Assert(restart_lsn >= wait_for_lsn);
+
+ if (XLogRecPtrIsInvalid(min_restart_lsn) ||
+ min_restart_lsn > restart_lsn)
+ min_restart_lsn = restart_lsn;
+
+ caught_up_slot_num++;
+
+ name += strlen(name) + 1;
}
- Assert(restart_lsn >= wait_for_lsn);
+ LWLockRelease(ReplicationSlotControlLock);
+
+ /*
+ * Return false if not all the standbys have caught up to the specified
+ * WAL location.
+ */
+ if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
+ return false;
- if (XLogRecPtrIsInvalid(min_restart_lsn) ||
- min_restart_lsn > restart_lsn)
- min_restart_lsn = restart_lsn;
+ /* The ss_oldest_flush_lsn must not retreat. */
+ Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
+ min_restart_lsn >= ss_oldest_flush_lsn);
- caught_up_slot_num++;
+ ss_oldest_flush_lsn = min_restart_lsn;
- name += strlen(name) + 1;
+ return true;
}
+ else
+ {
+ volatile WalSndCtlData *walsndctl = WalSndCtl;
+ static XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE]; /* Cache LSNs */
+ static bool initialized = false;
+ /* Initialize value in case SIGHUP changing to SYNC_REP_NO_WAIT */
+ int mode = SyncRepWaitMode;
+ int i;
+
+ if (!initialized)
+ {
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ {
+ lsn[i] = InvalidXLogRecPtr;
+ }
+ }
- LWLockRelease(ReplicationSlotControlLock);
+ if (mode == SYNC_REP_NO_WAIT)
+ return true;
- /*
- * Return false if not all the standbys have caught up to the specified
- * WAL location.
- */
- if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
- return false;
+ if (lsn[mode] >= wait_for_lsn)
+ return true;
- /* The ss_oldest_flush_lsn must not retreat. */
- Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
- min_restart_lsn >= ss_oldest_flush_lsn);
+ LWLockAcquire(SyncRepLock, LW_SHARED);
+ memcpy(lsn, (XLogRecPtr *) walsndctl->lsn, sizeof(lsn));
+ LWLockRelease(SyncRepLock);
- ss_oldest_flush_lsn = min_restart_lsn;
+ if (lsn[mode] >= wait_for_lsn)
+ return true;
- return true;
+ return false;
+ }
}
/*
* Wait for physical standbys to confirm receiving the given lsn.
*
* Used by logical decoding SQL functions. It waits for physical standbys
- * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
+ * corresponding to the physical slots specified in the synchronized_standby_slots GUC,
+ * or synchronous replication.
*/
void
WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
@@ -2766,7 +2806,7 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
* slot is not a logical failover slot, or there is no value in
* synchronized_standby_slots.
*/
- if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
+ if (!MyReplicationSlot->data.failover || !(synchronized_standby_slots_config || SyncRepConfigured()))
return;
ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index fa5988c824..69c061d8dc 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -78,6 +78,7 @@
#include "common/int.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "replication/slot.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
@@ -95,7 +96,7 @@ char *SyncRepStandbyNames;
static bool announce_next_takeover = true;
SyncRepConfigData *SyncRepConfig = NULL;
-static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
+int SyncRepWaitMode = SYNC_REP_NO_WAIT;
static void SyncRepQueueInsert(int mode);
static void SyncRepCancelWait(void);
@@ -124,6 +125,8 @@ static int cmp_lsn(const void *a, const void *b);
static bool SyncRepQueueIsOrderedByLSN(int mode);
#endif
+bool SyncRepConfigured(void);
+
/*
* ===========================================================
* Synchronous Replication functions for normal user backends
@@ -169,8 +172,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
* described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
* it's false, the lock is not necessary because we don't touch the queue.
*/
- if (!SyncRepRequested() ||
- !((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
+ if (!SyncRepConfigured())
return;
/* Cap the level for anything other than commit to remote flush only. */
@@ -523,6 +525,15 @@ SyncRepReleaseWaiters(void)
LWLockRelease(SyncRepLock);
+ /*
+ * If synchronized_standby_slots is set, the respective walsender's
+ * will be responsible for broadcasting the value.
+ */
+ if (strcmp(synchronized_standby_slots, "") == 0)
+ {
+ ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv);
+ }
+
elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
numwrite, LSN_FORMAT_ARGS(writePtr),
numflush, LSN_FORMAT_ARGS(flushPtr),
@@ -1069,3 +1080,13 @@ assign_synchronous_commit(int newval, void *extra)
break;
}
}
+
+bool
+SyncRepConfigured()
+{
+ if (!SyncRepRequested() ||
+ !((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
+ return false;
+
+ return true;
+}
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index ea439e6da6..ec22bc72df 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -82,6 +82,7 @@ extern PGDLLIMPORT char *SyncRepStandbyNames;
/* called by user backend */
extern void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit);
+extern bool SyncRepConfigured();
/* called at backend exit */
extern void SyncRepCleanupAtProcExit(void);
@@ -96,6 +97,8 @@ extern int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys);
/* called by checkpointer */
extern void SyncRepUpdateSyncStandbysDefined(void);
+extern int SyncRepWaitMode;
+
/*
* Internal functions for parsing synchronous_standby_names grammar,
* in syncrep_gram.y and syncrep_scanner.l
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 88dc79b2bd..e44bc3a07b 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -83,4 +83,4 @@ PG_LWLOCK(49, WALSummarizer)
PG_LWLOCK(50, DSMRegistry)
PG_LWLOCK(51, InjectionPoint)
PG_LWLOCK(52, SerialControl)
-PG_LWLOCK(53, WaitLSN)
+PG_LWLOCK(53, WaitLSN)
\ No newline at end of file
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index 2c51cfc3c8..00130d92dc 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -710,6 +710,141 @@ $result = $subscriber1->safe_psql('postgres',
is($result, 't',
"subscriber1 gets data from primary after standby1 acknowledges changes");
+##################################################
+# Test that logical failover replication slots wait for the specified
+# synchronous replicas to receive the changes first. It uses the
+# following set up:
+#
+# (synchronous physical standbys)
+# | ----> standby1 (application_name = standby1)
+# | ----> standby2 (application_name = standby2)
+# primary ----- |
+# (logical replication)
+# | ----> subscriber1 (failover = true, slot_name = lsub1_slot)
+# | ----> subscriber2 (failover = false, slot_name = lsub2_slot)
+#
+# synchronous_commit = 'on'
+# synchronous_standby_names = 'ANY 2 (standby1, standby2)'
+#
+# The setup is configured in such a way that the logical slot of subscriber1 is
+# enabled for failover, and thus the subscriber1 will wait for the changes to have
+# been synchronously replicated before receiving the decoded changes.
+##################################################
+
+$primary->safe_psql('postgres', "TRUNCATE tab_int;");
+# Setup synchronous replication
+$primary->append_conf(
+ 'postgresql.conf', qq(
+ synchronous_commit = 'on'
+ synchronous_standby_names = 'ANY 2 (standby1, standby2)'
+));
+
+$primary->reload;
+
+$standby1->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1 dbname=postgres application_name=standby1'");
+$standby1->reload;
+
+$standby2->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1 dbname=postgres application_name=standby2'");
+$standby2->reload;
+
+# Check that synchronous replication is setup properly
+$standby2->stop;
+
+# Create some data on the primary
+$primary_row_count = 10;
+
+my $sync_back_q = $primary->background_psql(
+ 'postgres',
+ on_error_stop => 0,
+ timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+$sync_back_q->query_until(qr/insert_blocked_on_sync_rep/, q(
+ \echo insert_blocked_on_sync_rep
+ INSERT INTO tab_int SELECT generate_series(1, 10);
+));
+
+$result = $primary->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+
+is($result, 'f', "primary row count is not updated due to synchronous replication");
+
+# Verify the standby specified in synchronized_standby_slots (sb1_slot aka standby1)
+# catches up with the primary.
+$primary->wait_for_replay_catchup($standby1);
+$result = $standby1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "standby1 gets data from primary");
+
+# Validate that synchronized_standby_slots takes precedence over waiting for
+# changes to have been synchronous replicated.
+# Since the slot specified (sb1_slot) has received the changes, primary can send
+# the decoded changes to the subscription enabled for failover
+# (i.e. regress_mysub1).
+$primary->wait_for_catchup('regress_mysub1');
+$result = $subscriber1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+ "subscriber1 gets data from primary after standby1 acknowledges changes");
+
+$standby2->start;
+$primary->wait_for_replay_catchup($standby2);
+
+# Unset synchronized_standby_slots to test synchronous replication
+# blocks primary from sending logical decoded changes to failover slots until
+# changes have been synchronously replicated.
+$primary->append_conf(
+ 'postgresql.conf', qq(
+ synchronized_standby_slots = ''));
+$primary->reload;
+
+$primary_row_count = 20;
+$standby2->stop;
+
+$sync_back_q->query_until(
+ qr/insert_blocked_on_additional_sync_rep/, q(
+ \echo insert_blocked_on_additional_sync_rep
+ INSERT INTO tab_int SELECT generate_series(11, 20);
+));
+
+# Since $standby2 has not received the changes, validate that subscriber1 (failover = true)
+# has not received the decoded changes, but subscriber2 (failover = false) has.
+$primary->wait_for_catchup('regress_mysub2');
+
+$result = $subscriber2->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+ "subscriber2 gets data from primary even if the changes have not been synchronously acknowledged.");
+
+$result = $subscriber1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 'f',
+ "subscriber1 does not get data from primary since changes have not been synchronously acknowledged.");
+
+# Start standby2 to allow the changes to be acknowledged by all the synchronous standbys.
+$standby2->start;
+$primary->wait_for_replay_catchup($standby2);
+$primary->wait_for_catchup('regress_mysub1');
+
+# Now that the changes have been replicated to all synchronous nodes,
+# primary can send the decoded changes to the subscription enabled for failover
+# (i.e. regress_mysub1). While the standby was down, regress_mysub1 didn't
+# receive any data from the primary. i.e. the primary didn't allow it to go
+# ahead of requirements of synchronous commit.
+$result = $subscriber1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+ "subscriber1 gets data from primary since changes have been synchronously acknowledged.");
+# No longer need to run the background session.
+$sync_back_q->quit;
+
+# Reset synchronized_standby_slots and synchronous commit for below test cases
+$primary->append_conf(
+ 'postgresql.conf', qq(
+synchronized_standby_slots = 'sb1_slot'
+synchronous_standby_names = ''
+));
+$primary->reload;
+
##################################################
# Verify that when using pg_logical_slot_get_changes to consume changes from a
# logical failover slot, it will also wait for the slots specified in
--
2.40.1
On Thu, Aug 29, 2024 at 2:31 AM John H <johnhyvr@gmail.com> wrote:
Hi Amit,
On Mon, Aug 26, 2024 at 11:00 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
I wanted a simple test where in the first case you use
synchronous_standby_names = 'ANY 3 (A,B,C,D,E)' and in the second case
use standby_slot_names = A_slot, B_slot, C_slot, D_slot, E_slot. You
can try some variations of it as well. The idea is that even if the
performance is less for synchronous_standby_names configuration, we
should be able to document it. This will help users to decide what is
...
What is the difference between "Test failover_slots with
synchronized_standby_slots = 'rr_1, rr_2,rr_3, rr_4, rr_5'" and "Test
failover_slots waiting on sync_rep no new shared cache"? I want to know
what configuration you used for synchronous_standby_names in the
latter case.
Sorry for the confusion due to the bad-naming of the test cases, let
me rephrase.
All three tests had synchronous_standby_names = 'ANY 3 (A,B,C,D,E)'
set with synchronous_commit = 'on', and failover_slots = 'on'
for the 10 logical slots.
# Test failover_slots with synchronized_standby_slots = 'rr_1, rr_2,
rr_3, rr_4, rr_5'
This is the test you wanted where the logical clients are waiting on
all 5 slots to acknowledge the change since
'synchronized_standby_slots' takes priority when set.
# Test failover_slots sync rep no cache
This test has 'synchronized_standby_slots' commented out, and without
relying on the new cache introduced in 0003.
Logical clients will wait on synchronous_standby_names in this case.
# Test failover slots with additional shared cache
This test also has 'synchronized_standby_slots' commented out, and
logical clients will wait on the LSNs
reported from synchronous_standby_names but it relies on a new cache
to reduce contention on SyncRepLock.
The idea is that even if the
performance is less for synchronous_standby_names configuration, we
should be able to document it. This will help users to decide what is
best for them.
Makes sense.
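For context on what 'ANY 3 (A,B,C,D,E)' implies for the position the logical walsenders wait on: with quorum commit, an LSN counts as synchronously replicated once any 3 of the 5 listed standbys have confirmed it, so the confirmed position is the 3rd-largest of the per-standby LSNs. A minimal standalone sketch of that idea, assuming plain integer LSNs; quorum_confirmed_lsn and cmp_lsn_desc are hypothetical names for illustration, not functions from the patch:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint64_t XLogRecPtr;	/* stand-in for PostgreSQL's XLogRecPtr */

/* qsort comparator: order LSNs from newest (largest) to oldest */
static int
cmp_lsn_desc(const void *a, const void *b)
{
	XLogRecPtr	la = *(const XLogRecPtr *) a;
	XLogRecPtr	lb = *(const XLogRecPtr *) b;

	if (la > lb)
		return -1;
	if (la < lb)
		return 1;
	return 0;
}

/*
 * Return the newest LSN confirmed by at least 'quorum' standbys, i.e. the
 * quorum-th largest entry. Sorts the caller's array in place.
 */
XLogRecPtr
quorum_confirmed_lsn(XLogRecPtr *lsns, size_t nstandbys, size_t quorum)
{
	assert(quorum >= 1 && quorum <= nstandbys);
	qsort(lsns, nstandbys, sizeof(XLogRecPtr), cmp_lsn_desc);
	return lsns[quorum - 1];
}
```

With five standbys at 500, 100, 400, 300 and 200, a quorum of 3 yields 300: the two slowest standbys do not hold the result back, which is the difference from listing all five slots in synchronized_standby_slots.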
I am also not sure especially as the test results didn't show much
improvement and the code also becomes a bit complicated. BTW, in the
0003 version in the below code:
That's fair, I've updated to be more in line with 0002.
+ /* Cache values to reduce contention */
+ LWLockAcquire(SyncRepLock, LW_SHARED);
+ memcpy((XLogRecPtr *) walsndctl->cached_lsn, (XLogRecPtr *) walsndctl->lsn, sizeof(lsn));
+ LWLockRelease(SyncRepLock);
Which mode lsn is being copied? I am not sure if I understood this
part of the code.
All of the mode LSNs are being copied in case SyncRepWaitMode changes in
the next iteration. I've removed that part but kept:
+ memcpy(lsn, (XLogRecPtr *) walsndctl->lsn, sizeof(lsn));
as suggested by Bertrand to avoid the for loop updating values one-by-one.
Here's what's logged after the memcpy:
2024-08-28 19:41:13.798 UTC [1160413] LOG: lsn[0] after memcpy is: 279/752C7FF0
2024-08-28 19:41:13.798 UTC [1160413] LOG: lsn[1] after memcpy is: 279/752C7F20
2024-08-28 19:41:13.798 UTC [1160413] LOG: lsn[2] after memcpy is: 279/752C7F20
In the 0002 version, in the following code [1], you are referring to
LSN mode which is enabled for logical walsender irrespective of the
mode used by the physical walsender. It is possible that they are
always the same but that is not evident from the code or comments in
the patch.
They are almost always the same, I tried to indicate that with the
following comment in the patch, but I could make it more explicit?
/* Initialize value in case SIGHUP changing to SYNC_REP_NO_WAIT */
At the beginning we set
int mode = SyncRepWaitMode;
At this time, the logical walsender mode it's checking against is the
same as what the physical walsenders are using.
It's possible that this mode is no longer the same when we execute the
following check:
if (lsn[mode] >= wait_for_lsn)
because of a SIGHUP to synchronous_commit that changes SyncRepWaitMode
to some other value.
We cache the value instead of
if (lsn[SyncRepWaitMode] >= wait_for_lsn)
because SYNC_REP_NO_WAIT is -1. If SyncRepWaitMode is set to this it
leads to out of bounds access.
I've attached a new patch that removes the shared cache introduced in 0003.
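To make the out-of-bounds concern concrete, here is a minimal standalone sketch (not code from the patch) of reading the mode once into a local variable and rejecting the -1 sentinel before it is used as an array index. The constant values mirror what the thread states (SYNC_REP_NO_WAIT is -1, three wait modes); demo_wait_mode and demo_lsn_reached are hypothetical names standing in for the GUC-driven global and the check in StandbySlotsHaveCaughtup():

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;	/* stand-in for PostgreSQL's XLogRecPtr */

#define SYNC_REP_NO_WAIT		(-1)
#define NUM_SYNC_REP_WAIT_MODE	3

/* Hypothetical stand-in for SyncRepWaitMode; a config reload may change it. */
int			demo_wait_mode = SYNC_REP_NO_WAIT;

/*
 * Return true if the LSN tracked for the current wait mode has reached
 * wait_for_lsn. The mode is read exactly once, so the sentinel test and
 * the array index are guaranteed to use the same value.
 */
bool
demo_lsn_reached(const XLogRecPtr *lsn, XLogRecPtr wait_for_lsn)
{
	int			mode = demo_wait_mode;	/* single read of the global */

	if (mode == SYNC_REP_NO_WAIT)
		return true;			/* nothing to wait for; never index with -1 */

	assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
	return lsn[mode] >= wait_for_lsn;
}
```

Testing the global and then indexing with a second read of it would leave a window in which the two reads disagree; the local copy closes that window at the cost of possibly acting on a mode that is one iteration stale.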
Thanks for the patch. Few comments and queries:
1)
+ static XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE];
We shall name it as 'lsns' as there are multiple.
2)
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ {
+ lsn[i] = InvalidXLogRecPtr;
+ }
Can we do it like below similar to what you have done at another place:
memset(lsn, InvalidXLogRecPtr, sizeof(lsn));
3)
+ if (!initialized)
+ {
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ {
+ lsn[i] = InvalidXLogRecPtr;
+ }
+ }
I do not see 'initialized' set to TRUE anywhere. Can you please
elaborate the intent here?
4)
+ int mode = SyncRepWaitMode;
+ int i;
+
+ if (!initialized)
+ {
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ {
+ lsn[i] = InvalidXLogRecPtr;
+ }
+ }
+ if (mode == SYNC_REP_NO_WAIT)
+ return true;
I do not understand this code well. As Amit also pointed out, 'mode'
may change. When we initialize 'mode' lets say SyncRepWaitMode is
SYNC_REP_NO_WAIT but by the time we check 'if (mode ==
SYNC_REP_NO_WAIT)', SyncRepWaitMode has changed to say
SYNC_REP_WAIT_FLUSH, if so, then we will wrongly return true from
here. Is that a possibility? ProcessConfigFile() is in the caller, and
thus we may end up using the wrong mode.
thanks
Shveta
Hi Shveta,
Thanks for reviewing it so quickly.
On Thu, Aug 29, 2024 at 2:35 AM shveta malik <shveta.malik@gmail.com> wrote:
Thanks for the patch. Few comments and queries:
1)
+ static XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE];
We shall name it as 'lsns' as there are multiple.
This follows the same naming convention in walsender_private.h
2)
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ {
+ lsn[i] = InvalidXLogRecPtr;
+ }
Can we do it like below similar to what you have done at another place:
memset(lsn, InvalidXLogRecPtr, sizeof(lsn));
I don't think memset works in this case? Well, I think *technically* works but
not sure if that's something worth optimizing.
If I understand correctly, memset takes in a char for the value and
not XLogRecPtr (uint64).
So something like: memset(lsn, 0, sizeof(lsn))
InvalidXLogRecPtr is defined as 0 so I think it works but there's an
implicit dependency here
for correctness.
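The memset point can be checked in isolation. memset converts its fill argument to unsigned char and writes that byte into every byte of the destination, so it only produces a given 64-bit value when all eight bytes of that value are identical, as they are for 0. A small sketch under that reading; demo_reset_lsns and demo_memset_with are hypothetical helpers, and InvalidXLogRecPtr is defined locally as 0 to match what the thread says:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef uint64_t XLogRecPtr;	/* stand-in for PostgreSQL's XLogRecPtr */

#define InvalidXLogRecPtr		((XLogRecPtr) 0)
#define NUM_SYNC_REP_WAIT_MODE	3

/* Zero every cached LSN; correct only because InvalidXLogRecPtr is 0. */
void
demo_reset_lsns(XLogRecPtr *lsn)
{
	assert(lsn != NULL);
	memset(lsn, 0, NUM_SYNC_REP_WAIT_MODE * sizeof(XLogRecPtr));
}

/* Show what a single XLogRecPtr looks like after memset with 'value'. */
XLogRecPtr
demo_memset_with(int value)
{
	XLogRecPtr	v;

	memset(&v, value, sizeof(v));	/* each of the 8 bytes becomes 'value' */
	return v;
}
```

Filling with 1 gives 0x0101010101010101 rather than 1, which is the implicit dependency mentioned above: the memset form would silently break if the invalid marker were ever a value other than all-zero bytes.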
3)
+ if (!initialized)
+ {
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ {
+ lsn[i] = InvalidXLogRecPtr;
+ }
+ }
I do not see 'initialized' set to TRUE anywhere. Can you please
elaborate the intent here?
You're right, I thought I fixed this. Will update.
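One possible shape of that fix, as a standalone sketch rather than the actual follow-up patch: the flag is set inside the guarded block so the cache is reset only on the first call and keeps its contents afterwards. demo_get_lsn_cache is a hypothetical helper introduced only so the behaviour can be exercised outside the server:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;	/* stand-in for PostgreSQL's XLogRecPtr */

#define InvalidXLogRecPtr		((XLogRecPtr) 0)
#define NUM_SYNC_REP_WAIT_MODE	3

/*
 * Return the process-local LSN cache, initializing it on the first call
 * only. Later calls must see whatever was stored previously.
 */
XLogRecPtr *
demo_get_lsn_cache(void)
{
	static XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE];
	static bool initialized = false;

	if (!initialized)
	{
		for (int i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
			lsn[i] = InvalidXLogRecPtr;
		initialized = true;		/* the assignment missing from the posted hunk */
	}

	assert(initialized);
	return lsn;
}
```

Without the assignment, the loop runs on every call and wipes the cached values, so the early `lsn[mode] >= wait_for_lsn` check could never succeed from the cache. Since objects with static storage duration start out zeroed in C and InvalidXLogRecPtr is 0, dropping the flag and the loop altogether would be another option.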
4)
+ int mode = SyncRepWaitMode;
+ int i;
+
+ if (!initialized)
+ {
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ {
+ lsn[i] = InvalidXLogRecPtr;
+ }
+ }
+ if (mode == SYNC_REP_NO_WAIT)
+ return true;
I do not understand this code well. As Amit also pointed out, 'mode'
may change. When we initialize 'mode' lets say SyncRepWaitMode is
SYNC_REP_NO_WAIT but by the time we check 'if (mode ==
SYNC_REP_NO_WAIT)', SyncRepWaitMode has changed to say
SYNC_REP_WAIT_FLUSH, if so, then we will wrongly return true from
here. Is that a possibility? ProcessConfigFile() is in the caller, and
thus we may end up using the wrong mode.
Yes it's possible for mode to change. In my comment to Amit in the other thread,
I think we have to store mode and base our execution of this logic and ignore
SyncRepWaitMode changing due to ProcessConfigFile/SIGHUP for one iteration.
We can store the value of mode later, so something like:
if (SyncRepWaitMode == SYNC_REP_NO_WAIT)
return true;
mode = SyncRepWaitMode;
if (lsn[mode] >= wait_for_lsn)
return true;
But it's the same issue which is when you check lsn[mode],
SyncRepWaitMode could have changed to
something else, so you always have to initialize the value and will
always have this discrepancy.
I'm skeptical end users are changing SyncRepWaitMode in their database
clusters as
it has implications for their durability and I would assume they run
with the same durability guarantees.
Thanks,
--
John Hsu - Amazon Web Services
On Fri, Aug 30, 2024 at 12:56 AM John H <johnhyvr@gmail.com> wrote:
Hi Shveta,
Thanks for reviewing it so quickly.
On Thu, Aug 29, 2024 at 2:35 AM shveta malik <shveta.malik@gmail.com> wrote:
Thanks for the patch. Few comments and queries:
1)
+ static XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE];
We shall name it as 'lsns' as there are multiple.
This follows the same naming convention in walsender_private.h
2)
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ {
+ lsn[i] = InvalidXLogRecPtr;
+ }
Can we do it like below similar to what you have done at another place:
memset(lsn, InvalidXLogRecPtr, sizeof(lsn));
I don't think memset works in this case? Well, I think *technically* works but
not sure if that's something worth optimizing.
If I understand correctly, memset takes in a char for the value and
not XLogRecPtr (uint64).
So something like: memset(lsn, 0, sizeof(lsn))
InvalidXLogRecPtr is defined as 0 so I think it works but there's an
implicit dependency here
for correctness.
3)
+ if (!initialized)
+ {
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ {
+ lsn[i] = InvalidXLogRecPtr;
+ }
+ }
I do not see 'initialized' set to TRUE anywhere. Can you please
elaborate the intent here?
You're right, I thought I fixed this. Will update.
4)
+ int mode = SyncRepWaitMode;
+ int i;
+
+ if (!initialized)
+ {
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ {
+ lsn[i] = InvalidXLogRecPtr;
+ }
+ }
+ if (mode == SYNC_REP_NO_WAIT)
+ return true;
I do not understand this code well. As Amit also pointed out, 'mode'
may change. When we initialize 'mode' lets say SyncRepWaitMode is
SYNC_REP_NO_WAIT but by the time we check 'if (mode ==
SYNC_REP_NO_WAIT)', SyncRepWaitMode has changed to say
SYNC_REP_WAIT_FLUSH, if so, then we will wrongly return true from
here. Is that a possibility? ProcessConfigFile() is in the caller, and
thus we may end up using the wrong mode.
Yes it's possible for mode to change. In my comment to Amit in the other thread,
I think we have to store mode and base our execution of this logic and ignore
SyncRepWaitMode changing due to ProcessConfigFile/SIGHUP for one iteration.
We can store the value of mode later, so something like:
if (SyncRepWaitMode == SYNC_REP_NO_WAIT)
return true;
mode = SyncRepWaitMode;
if (lsn[mode] >= wait_for_lsn)
return true;
But it's the same issue which is when you check lsn[mode],
SyncRepWaitMode could have changed to
something else, so you always have to initialize the value and will
always have this discrepancy.
I'm skeptical end users are changing SyncRepWaitMode in their database
clusters as
it has implications for their durability and I would assume they run
with the same durability guarantees.
I was trying to have a look at the patch again, it doesn't apply on
the head, needs rebase.
Regarding 'mode = SyncRepWaitMode', FWIW, SyncRepWaitForLSN() also
does in a similar way. It gets mode in local var initially and uses it
later. See [1]. So isn't there a chance too that 'SyncRepWaitMode' can
change in between.
[1]:
mode = SyncRepWaitMode;
.....
....
if (!WalSndCtl->sync_standbys_defined ||
lsn <= WalSndCtl->lsn[mode])
{
LWLockRelease(SyncRepLock);
return;
}
thanks
Shveta
Hi Shveta,
On Sun, Sep 8, 2024 at 11:16 PM shveta malik <shveta.malik@gmail.com> wrote:
I was trying to have a look at the patch again, it doesn't apply on
the head, needs rebase.
Rebased with the latest changes.
Regarding 'mode = SyncRepWaitMode', FWIW, SyncRepWaitForLSN() also
does in a similar way. It gets mode in local var initially and uses it
later. See [1]. So isn't there a chance too that 'SyncRepWaitMode' can
change in between.
[1]:
mode = SyncRepWaitMode;
.....
....
if (!WalSndCtl->sync_standbys_defined ||
lsn <= WalSndCtl->lsn[mode])
{
LWLockRelease(SyncRepLock);
return;
}
You are right, thanks for the correction. I tried reproducing with GDB
where SyncRepWaitMode
changes due to pg_ctl reload but was unable to do so. It seems like
SIGHUP only sets ConfigReloadPending = true,
which gets processed in the next loop in WalSndLoop() and that's
probably where I was getting confused.
In the latest patch, I've added:
Assert(SyncRepWaitMode >= 0);
which should be true since we call SyncRepConfigured() at the
beginning of StandbySlotsHaveCaughtup(),
and used SyncRepWaitMode directly.
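The behaviour described here (the signal handler only sets a flag, and the new configuration is applied at a fixed point in the next loop iteration) is the usual deferred-reload pattern. A minimal standalone sketch of it, assuming a POSIX system for SIGHUP; every demo_* name is hypothetical and only stands in for ConfigReloadPending, the loop in WalSndLoop() and SyncRepWaitMode:

```c
#include <assert.h>
#include <signal.h>

/* Set by the handler, consumed at the top of the loop. */
static volatile sig_atomic_t demo_reload_pending = 0;

int			demo_sync_mode = 1;		/* value currently in effect */
int			demo_new_sync_mode = 1;	/* value a reload would install */

/* The handler does nothing except record that a reload was requested. */
static void
demo_sighup_handler(int signo)
{
	(void) signo;
	demo_reload_pending = 1;
}

void
demo_install_handler(void)
{
	signal(SIGHUP, demo_sighup_handler);
}

/* One loop iteration: configuration can change only at this one point. */
void
demo_loop_iteration(void)
{
	if (demo_reload_pending)
	{
		demo_reload_pending = 0;
		demo_sync_mode = demo_new_sync_mode;
	}

	/* The rest of the iteration sees a stable demo_sync_mode. */
	assert(demo_sync_mode >= 0);
}
```

Under this pattern, code running later in the same iteration cannot observe the mode changing underneath it, which is consistent with the GDB attempt above failing to reproduce a mid-check change.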
Thank you
--
John Hsu - Amazon Web Services
Attachments:
0005-Wait-on-synchronous-replication-by-default-for-logic.patch (application/octet-stream)
From 366303c7ff79e4bea2c7c6eb491b2ce641d90b72 Mon Sep 17 00:00:00 2001
From: John Hsu <johnhyvr@gmail.com>
Date: Mon, 26 Aug 2024 18:23:20 +0000
Subject: [PATCH] Wait on synchronous replication by default for logical
failover slots
If synchronous replication is enabled, this patch
allows logical subscribers with failover_slots enabled
to wait for changes to be replicated to synchronous replicas
before consuming the changes.
---
src/backend/replication/slot.c | 259 ++++++++++--------
src/backend/replication/syncrep.c | 27 +-
src/include/replication/syncrep.h | 3 +
.../t/040_standby_failover_slots_sync.pl | 135 +++++++++
4 files changed, 311 insertions(+), 113 deletions(-)
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 0a03776156..0708e46468 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -49,6 +49,7 @@
#include "postmaster/interrupt.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
+#include "replication/syncrep.h"
#include "replication/walsender_private.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@@ -2591,8 +2592,8 @@ SlotExistsInSyncStandbySlots(const char *slot_name)
}
/*
- * Return true if the slots specified in synchronized_standby_slots have caught up to
- * the given WAL location, false otherwise.
+ * Return true if the slots specified in synchronized_standby_slots or synchronous
+ * replication have caught up to the given WAL location, false otherwise.
*
* The elevel parameter specifies the error level used for logging messages
* related to slots that do not exist, are invalidated, or are inactive.
@@ -2606,9 +2607,9 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
/*
* Don't need to wait for the standbys to catch up if there is no value in
- * synchronized_standby_slots.
+ * synchronized_standby_slots or synchronous replication is not configured.
*/
- if (synchronized_standby_slots_config == NULL)
+ if (synchronized_standby_slots_config == NULL && !SyncRepConfigured())
return true;
/*
@@ -2619,144 +2620,182 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
return true;
/*
- * Don't need to wait for the standbys to catch up if they are already
- * beyond the specified WAL location.
- */
- if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
- ss_oldest_flush_lsn >= wait_for_lsn)
- return true;
-
- /*
- * To prevent concurrent slot dropping and creation while filtering the
- * slots, take the ReplicationSlotControlLock outside of the loop.
+ * In the event that synchronized_standby_slots and synchronous replication is
+ * configured, have the former take precedence.
*/
- LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-
- name = synchronized_standby_slots_config->slot_names;
- for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
+ if (synchronized_standby_slots_config != NULL)
{
- XLogRecPtr restart_lsn;
- bool invalidated;
- bool inactive;
- ReplicationSlot *slot;
+ /*
+ * Don't need to wait for the standbys to catch up if they are already
+ * beyond the specified WAL location.
+ */
+ if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
+ ss_oldest_flush_lsn >= wait_for_lsn)
+ return true;
- slot = SearchNamedReplicationSlot(name, false);
+ /*
+ * To prevent concurrent slot dropping and creation while filtering the
+ * slots, take the ReplicationSlotControlLock outside of the loop.
+ */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- if (!slot)
+ name = synchronized_standby_slots_config->slot_names;
+ for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
{
- /*
- * If a slot name provided in synchronized_standby_slots does not
- * exist, report a message and exit the loop. A user can specify a
- * slot name that does not exist just before the server startup.
- * The GUC check_hook(validate_sync_standby_slots) cannot validate
- * such a slot during startup as the ReplicationSlotCtl shared
- * memory is not initialized at that time. It is also possible for
- * a user to drop the slot in synchronized_standby_slots
- * afterwards.
- */
- ereport(elevel,
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
- name, "synchronized_standby_slots"),
- errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
- name),
- errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
- name, "synchronized_standby_slots"));
- break;
- }
+ XLogRecPtr restart_lsn;
+ bool invalidated;
+ bool inactive;
+ ReplicationSlot *slot;
- if (SlotIsLogical(slot))
- {
- /*
- * If a logical slot name is provided in
- * synchronized_standby_slots, report a message and exit the loop.
- * Similar to the non-existent case, a user can specify a logical
- * slot name in synchronized_standby_slots before the server
- * startup, or drop an existing physical slot and recreate a
- * logical slot with the same name.
- */
- ereport(elevel,
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
- name, "synchronized_standby_slots"),
- errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
- name),
- errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
- name, "synchronized_standby_slots"));
- break;
- }
+ slot = SearchNamedReplicationSlot(name, false);
- SpinLockAcquire(&slot->mutex);
- restart_lsn = slot->data.restart_lsn;
- invalidated = slot->data.invalidated != RS_INVAL_NONE;
- inactive = slot->active_pid == 0;
- SpinLockRelease(&slot->mutex);
+ if (!slot)
+ {
+ /*
+ * If a slot name provided in synchronized_standby_slots does not
+ * exist, report a message and exit the loop. A user can specify a
+ * slot name that does not exist just before the server startup.
+ * The GUC check_hook(validate_sync_standby_slots) cannot validate
+ * such a slot during startup as the ReplicationSlotCtl shared
+ * memory is not initialized at that time. It is also possible for
+ * a user to drop the slot in synchronized_standby_slots
+ * afterwards.
+ */
+ ereport(elevel,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
+ name, "synchronized_standby_slots"),
+ errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
+ name),
+ errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
+ name, "synchronized_standby_slots"));
+ break;
+ }
- if (invalidated)
- {
- /* Specified physical slot has been invalidated */
- ereport(elevel,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
- name, "synchronized_standby_slots"),
- errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
- name),
- errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
- name, "synchronized_standby_slots"));
- break;
- }
+ if (SlotIsLogical(slot))
+ {
+ /*
+ * If a logical slot name is provided in
+ * synchronized_standby_slots, report a message and exit the loop.
+ * Similar to the non-existent case, a user can specify a logical
+ * slot name in synchronized_standby_slots before the server
+ * startup, or drop an existing physical slot and recreate a
+ * logical slot with the same name.
+ */
+ ereport(elevel,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
+ name, "synchronized_standby_slots"),
+ errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
+ name),
+ errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
+ name, "synchronized_standby_slots"));
+ break;
+ }
- if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
- {
- /* Log a message if no active_pid for this physical slot */
- if (inactive)
+ SpinLockAcquire(&slot->mutex);
+ restart_lsn = slot->data.restart_lsn;
+ invalidated = slot->data.invalidated != RS_INVAL_NONE;
+ inactive = slot->active_pid == 0;
+ SpinLockRelease(&slot->mutex);
+
+ if (invalidated)
+ {
+ /* Specified physical slot has been invalidated */
ereport(elevel,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
+ errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
name, "synchronized_standby_slots"),
errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
name),
- errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
+ errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
name, "synchronized_standby_slots"));
+ break;
+ }
- /* Continue if the current slot hasn't caught up. */
- break;
+ if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
+ {
+ /* Log a message if no active_pid for this physical slot */
+ if (inactive)
+ ereport(elevel,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
+ name, "synchronized_standby_slots"),
+ errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
+ name),
+ errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
+ name, "synchronized_standby_slots"));
+
+ /* Continue if the current slot hasn't caught up. */
+ break;
+ }
+
+ Assert(restart_lsn >= wait_for_lsn);
+
+ if (XLogRecPtrIsInvalid(min_restart_lsn) ||
+ min_restart_lsn > restart_lsn)
+ min_restart_lsn = restart_lsn;
+
+ caught_up_slot_num++;
+
+ name += strlen(name) + 1;
}
- Assert(restart_lsn >= wait_for_lsn);
+ LWLockRelease(ReplicationSlotControlLock);
- if (XLogRecPtrIsInvalid(min_restart_lsn) ||
- min_restart_lsn > restart_lsn)
- min_restart_lsn = restart_lsn;
+ /*
+ * Return false if not all the standbys have caught up to the specified
+ * WAL location.
+ */
+ if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
+ return false;
- caught_up_slot_num++;
+ /* The ss_oldest_flush_lsn must not retreat. */
+ Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
+ min_restart_lsn >= ss_oldest_flush_lsn);
- name += strlen(name) + 1;
+ ss_oldest_flush_lsn = min_restart_lsn;
+
+ return true;
}
+ else
+ {
+ volatile WalSndCtlData *walsndctl = WalSndCtl;
+ static XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE]; /* Cache LSNs */
+ static bool initialized = false;
+ int i;
- LWLockRelease(ReplicationSlotControlLock);
+ if (!initialized)
+ {
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
+ {
+ lsn[i] = InvalidXLogRecPtr;
+ }
+ initialized = true;
+ }
- /*
- * Return false if not all the standbys have caught up to the specified
- * WAL location.
- */
- if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
- return false;
+ Assert(SyncRepWaitMode >= 0);
- /* The ss_oldest_flush_lsn must not retreat. */
- Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
- min_restart_lsn >= ss_oldest_flush_lsn);
+ if (lsn[SyncRepWaitMode] >= wait_for_lsn)
+ return true;
- ss_oldest_flush_lsn = min_restart_lsn;
+ LWLockAcquire(SyncRepLock, LW_SHARED);
+ memcpy(lsn, (XLogRecPtr *) walsndctl->lsn, sizeof(lsn));
+ LWLockRelease(SyncRepLock);
- return true;
+ if (lsn[SyncRepWaitMode] >= wait_for_lsn)
+ return true;
+
+ return false;
+ }
}
/*
* Wait for physical standbys to confirm receiving the given lsn.
*
* Used by logical decoding SQL functions. It waits for physical standbys
- * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
+ * corresponding to the physical slots specified in the synchronized_standby_slots GUC,
+ * or synchronous replication.
*/
void
WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
@@ -2766,7 +2805,7 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
* slot is not a logical failover slot, or there is no value in
* synchronized_standby_slots.
*/
- if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
+ if (!MyReplicationSlot->data.failover || !(synchronized_standby_slots_config || SyncRepConfigured()))
return;
ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index fa5988c824..69c061d8dc 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -78,6 +78,7 @@
#include "common/int.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "replication/slot.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
@@ -95,7 +96,7 @@ char *SyncRepStandbyNames;
static bool announce_next_takeover = true;
SyncRepConfigData *SyncRepConfig = NULL;
-static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
+int SyncRepWaitMode = SYNC_REP_NO_WAIT;
static void SyncRepQueueInsert(int mode);
static void SyncRepCancelWait(void);
@@ -124,6 +125,8 @@ static int cmp_lsn(const void *a, const void *b);
static bool SyncRepQueueIsOrderedByLSN(int mode);
#endif
+bool SyncRepConfigured(void);
+
/*
* ===========================================================
* Synchronous Replication functions for normal user backends
@@ -169,8 +172,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
* described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
* it's false, the lock is not necessary because we don't touch the queue.
*/
- if (!SyncRepRequested() ||
- !((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
+ if (!SyncRepConfigured())
return;
/* Cap the level for anything other than commit to remote flush only. */
@@ -523,6 +525,15 @@ SyncRepReleaseWaiters(void)
LWLockRelease(SyncRepLock);
+ /*
+ * If synchronized_standby_slots is set, the respective walsenders
+ * will be responsible for broadcasting the value.
+ */
+ if (strcmp(synchronized_standby_slots, "") == 0)
+ {
+ ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv);
+ }
+
elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
numwrite, LSN_FORMAT_ARGS(writePtr),
numflush, LSN_FORMAT_ARGS(flushPtr),
@@ -1069,3 +1080,13 @@ assign_synchronous_commit(int newval, void *extra)
break;
}
}
+
+bool
+SyncRepConfigured()
+{
+ if (!SyncRepRequested() ||
+ !((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
+ return false;
+
+ return true;
+}
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index ea439e6da6..ec22bc72df 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -82,6 +82,7 @@ extern PGDLLIMPORT char *SyncRepStandbyNames;
/* called by user backend */
extern void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit);
+extern bool SyncRepConfigured();
/* called at backend exit */
extern void SyncRepCleanupAtProcExit(void);
@@ -96,6 +97,8 @@ extern int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys);
/* called by checkpointer */
extern void SyncRepUpdateSyncStandbysDefined(void);
+extern int SyncRepWaitMode;
+
/*
* Internal functions for parsing synchronous_standby_names grammar,
* in syncrep_gram.y and syncrep_scanner.l
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index a474c626d9..5a69837f98 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -710,6 +710,141 @@ $result = $subscriber1->safe_psql('postgres',
is($result, 't',
"subscriber1 gets data from primary after standby1 acknowledges changes");
+##################################################
+# Test that logical failover replication slots wait for the specified
+# synchronous replicas to receive the changes first. It uses the
+# following set up:
+#
+# (synchronous physical standbys)
+# | ----> standby1 (application_name = standby1)
+# | ----> standby2 (application_name = standby2)
+# primary ----- |
+# (logical replication)
+# | ----> subscriber1 (failover = true, slot_name = lsub1_slot)
+# | ----> subscriber2 (failover = false, slot_name = lsub2_slot)
+#
+# synchronous_commit = 'on'
+# synchronous_standby_names = 'ANY 2 (standby1, standby2)'
+#
+# The setup is configured in such a way that the logical slot of subscriber1 is
+# enabled for failover, and thus the subscriber1 will wait for the changes to have
+# been synchronously replicated before receiving the decoded changes.
+##################################################
+
+$primary->safe_psql('postgres', "TRUNCATE tab_int;");
+# Setup synchronous replication
+$primary->append_conf(
+ 'postgresql.conf', qq(
+ synchronous_commit = 'on'
+ synchronous_standby_names = 'ANY 2 (standby1, standby2)'
+));
+
+$primary->reload;
+
+$standby1->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1 dbname=postgres application_name=standby1'");
+$standby1->reload;
+
+$standby2->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1 dbname=postgres application_name=standby2'");
+$standby2->reload;
+
+# Check that synchronous replication is setup properly
+$standby2->stop;
+
+# Create some data on the primary
+$primary_row_count = 10;
+
+my $sync_back_q = $primary->background_psql(
+ 'postgres',
+ on_error_stop => 0,
+ timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+$sync_back_q->query_until(qr/insert_blocked_on_sync_rep/, q(
+ \echo insert_blocked_on_sync_rep
+ INSERT INTO tab_int SELECT generate_series(1, 10);
+));
+
+$result = $primary->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+
+is($result, 'f', "primary row count is not updated due to synchronous replication");
+
+# Verify the standby specified in synchronized_standby_slots (sb1_slot aka standby1)
+# catches up with the primary.
+$primary->wait_for_replay_catchup($standby1);
+$result = $standby1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "standby1 gets data from primary");
+
+# Validate that synchronized_standby_slots takes precedence over waiting for
+# changes to have been synchronously replicated.
+# Since the slot specified (sb1_slot) has received the changes, primary can send
+# the decoded changes to the subscription enabled for failover
+# (i.e. regress_mysub1).
+$primary->wait_for_catchup('regress_mysub1');
+$result = $subscriber1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+ "subscriber1 gets data from primary after standby1 acknowledges changes");
+
+$standby2->start;
+$primary->wait_for_replay_catchup($standby2);
+
+# Unset synchronized_standby_slots to test synchronous replication
+# blocks primary from sending logical decoded changes to failover slots until
+# changes have been synchronously replicated.
+$primary->append_conf(
+ 'postgresql.conf', qq(
+ synchronized_standby_slots = ''));
+$primary->reload;
+
+$primary_row_count = 20;
+$standby2->stop;
+
+$sync_back_q->query_until(
+ qr/insert_blocked_on_additional_sync_rep/, q(
+ \echo insert_blocked_on_additional_sync_rep
+ INSERT INTO tab_int SELECT generate_series(11, 20);
+));
+
+# Since $standby2 has not received the changes, validate that subscriber1 (failover = true)
+# has not received the decoded changes, but subscriber2 (failover = false) has.
+$primary->wait_for_catchup('regress_mysub2');
+
+$result = $subscriber2->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+ "subscriber2 gets data from primary even if the changes have not been synchronously acknowledged.");
+
+$result = $subscriber1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 'f',
+ "subscriber1 does not get data from primary since changes have not been synchronously acknowledged.");
+
+# Start standby2 to allow the changes to be acknowledged by all the synchronous standbys.
+$standby2->start;
+$primary->wait_for_replay_catchup($standby2);
+$primary->wait_for_catchup('regress_mysub1');
+
+# Now that the changes have been replicated to all synchronous nodes,
+# primary can send the decoded changes to the subscription enabled for failover
+# (i.e. regress_mysub1). While the standby was down, regress_mysub1 didn't
+# receive any data from the primary. i.e. the primary didn't allow it to go
+# ahead of requirements of synchronous commit.
+$result = $subscriber1->safe_psql('postgres',
+ "SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+ "subscriber1 gets data from primary since changes have been synchronously acknowledged.");
+# No longer need to run the background session.
+$sync_back_q->quit;
+
+# Reset synchronized_standby_slots and synchronous commit for below test cases
+$primary->append_conf(
+ 'postgresql.conf', qq(
+synchronized_standby_slots = 'sb1_slot'
+synchronous_standby_names = ''
+));
+$primary->reload;
+
##################################################
# Verify that when using pg_logical_slot_get_changes to consume changes from a
# logical failover slot, it will also wait for the slots specified in
--
2.40.1
On Wed, Sep 11, 2024 at 2:40 AM John H <johnhyvr@gmail.com> wrote:
> Hi Shveta,
>
> On Sun, Sep 8, 2024 at 11:16 PM shveta malik <shveta.malik@gmail.com> wrote:
> > I was trying to have a look at the patch again, it doesn't apply on
> > the head, needs rebase.
>
> Rebased with the latest changes.
>
> > Regarding 'mode = SyncRepWaitMode', FWIW, SyncRepWaitForLSN() also
> > does in a similar way. It gets mode in local var initially and uses it
> > later. See [1]. So isn't there a chance too that 'SyncRepWaitMode' can
> > change in between.
> >
> > [1]:
> > mode = SyncRepWaitMode;
> > .....
> > ....
> > if (!WalSndCtl->sync_standbys_defined ||
> >     lsn <= WalSndCtl->lsn[mode])
> > {
> >     LWLockRelease(SyncRepLock);
> >     return;
> > }
>
> You are right, thanks for the correction. I tried reproducing with GDB
> where SyncRepWaitMode
> changes due to pg_ctl reload but was unable to do so. It seems like
> SIGHUP only sets ConfigReloadPending = true,
> which gets processed in the next loop in WalSndLoop() and that's
> probably where I was getting confused.
yes, SIGHUP will be processed in the caller of
StandbySlotsHaveCaughtup() (see ProcessConfigFile() in
WaitForStandbyConfirmation()). So we can use 'SyncRepWaitMode'
directly as it is not going to change in StandbySlotsHaveCaughtup()
even if user triggers the change. And thus it was okay to use it even
in the local variable too similar to how SyncRepWaitForLSN() does it.
> In the latest patch, I've added:
>
> Assert(SyncRepWaitMode >= 0);
>
> which should be true since we call SyncRepConfigured() at the
> beginning of StandbySlotsHaveCaughtup(),
> and used SyncRepWaitMode directly.
Yes, it should be okay I think. As SyncRepRequested() in the beginning
makes sure synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH and
thus SyncRepWaitMode should be mapped to either of
WAIT_WRITE/FLUSH/APPLY etc. Will review further.
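To make the argument above concrete, here is a minimal standalone sketch of how synchronous_commit levels map to wait modes. It is an illustration under stated assumptions, not the server code: the enum and macros only mirror the ordering used in PostgreSQL's headers, the helpers sync_rep_requested(), wait_mode_for() and checked_wait_mode() are hypothetical names, and the max_wal_senders part of SyncRepRequested() is deliberately left out.

```c
#include <assert.h>
#include <stdbool.h>

/* Stand-ins that mirror the ordering of the synchronous_commit levels. */
typedef enum
{
    SYNCHRONOUS_COMMIT_OFF,
    SYNCHRONOUS_COMMIT_LOCAL_FLUSH,
    SYNCHRONOUS_COMMIT_REMOTE_WRITE,
    SYNCHRONOUS_COMMIT_REMOTE_FLUSH,    /* synchronous_commit = on */
    SYNCHRONOUS_COMMIT_REMOTE_APPLY
} SyncCommitLevel;

#define SYNC_REP_NO_WAIT    (-1)
#define SYNC_REP_WAIT_WRITE 0
#define SYNC_REP_WAIT_FLUSH 1
#define SYNC_REP_WAIT_APPLY 2

/* Hypothetical helper: simplified SyncRepRequested(), ignoring max_wal_senders. */
static bool
sync_rep_requested(SyncCommitLevel level)
{
    return level > SYNCHRONOUS_COMMIT_LOCAL_FLUSH;
}

/* Hypothetical helper: the mapping that the synchronous_commit assign hook performs. */
static int
wait_mode_for(SyncCommitLevel level)
{
    switch (level)
    {
        case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
            return SYNC_REP_WAIT_WRITE;
        case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
            return SYNC_REP_WAIT_FLUSH;
        case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
            return SYNC_REP_WAIT_APPLY;
        default:
            return SYNC_REP_NO_WAIT;
    }
}

/* Whenever sync rep is requested, the wait mode is a valid array index. */
static int
checked_wait_mode(SyncCommitLevel level)
{
    int mode = wait_mode_for(level);

    assert(!sync_rep_requested(level) || mode >= 0);
    return mode;
}
```

Under these assumptions, the assertion can only fire if a level above local flush maps to SYNC_REP_NO_WAIT, which the switch never does.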
thanks
Shveta
On Thu, Sep 12, 2024 at 3:04 PM shveta malik <shveta.malik@gmail.com> wrote:
> Yes, it should be okay I think. As SyncRepRequested() in the beginning
> makes sure synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH and
> thus SyncRepWaitMode should be mapped to either of
> WAIT_WRITE/FLUSH/APPLY etc. Will review further.
I was wondering if we need something similar to SyncRepWaitForLSN() here:
/* Cap the level for anything other than commit to remote flush only. */
if (commit)
mode = SyncRepWaitMode;
else
mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
The header comment says:
* 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
* represents a commit record. If it doesn't, then we wait only for the WAL
* to be flushed if synchronous_commit is set to the higher level of
* remote_apply, because only commit records provide apply feedback.
If we don't do something similar, then aren't there chances that we
keep on waiting on the wrong lsn[mode] for the case when mode =
SYNC_REP_WAIT_APPLY while sync-rep-wait infrastructure is updating
different mode's lsn. Is my understanding correct?
thanks
Shveta
On Fri, Sep 13, 2024 at 3:13 PM shveta malik <shveta.malik@gmail.com> wrote:
> I was wondering if we need something similar to SyncRepWaitForLSN() here:
>
> /* Cap the level for anything other than commit to remote flush only. */
> if (commit)
>     mode = SyncRepWaitMode;
> else
>     mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
>
> The header comment says:
> * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
> * represents a commit record. If it doesn't, then we wait only for the WAL
> * to be flushed if synchronous_commit is set to the higher level of
> * remote_apply, because only commit records provide apply feedback.
>
> If we don't do something similar, then aren't there chances that we
> keep on waiting on the wrong lsn[mode] for the case when mode =
> SYNC_REP_WAIT_APPLY while sync-rep-wait infrastructure is updating
> different mode's lsn. Is my understanding correct?
I think here we always need the lsn values corresponding to
SYNC_REP_WAIT_FLUSH as we want to ensure that the WAL has to be
flushed on physical standby before sending it to the logical
subscriber. See ProcessStandbyReplyMessage() where we always use
flushPtr to advance the physical_slot via
PhysicalConfirmReceivedLocation().
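A minimal standalone sketch of that suggestion, assuming the check reads a snapshot of the per-mode LSN array: the helper name sync_standbys_flushed() and the typedef are illustrative stand-ins, not code from the patch. Whatever wait mode is configured, the comparison is made against the flush position only.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* stand-in for the server typedef */

#define SYNC_REP_WAIT_WRITE    0
#define SYNC_REP_WAIT_FLUSH    1
#define SYNC_REP_WAIT_APPLY    2
#define NUM_SYNC_REP_WAIT_MODE 3

/*
 * Hypothetical helper: decide whether decoded changes up to wait_for_lsn
 * may be sent to a failover-enabled logical subscriber.  The configured
 * wait mode is validated but otherwise ignored; only the flush LSN counts.
 */
static bool
sync_standbys_flushed(const XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE],
                      int wait_mode, XLogRecPtr wait_for_lsn)
{
    assert(wait_mode >= 0 && wait_mode < NUM_SYNC_REP_WAIT_MODE);
    (void) wait_mode;           /* intentionally not used for the lookup */

    return lsn[SYNC_REP_WAIT_FLUSH] >= wait_for_lsn;
}
```

With write, flush and apply positions of 300, 200 and 100, a request for LSN 250 is refused even in write mode, and a request for LSN 150 is allowed even in apply mode.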
Another question aside from the above point, what if someone has
specified logical subscribers in synchronous_standby_names? In the
case of synchronized_standby_slots, we won't proceed with such slots.
--
With Regards,
Amit Kapila.
On Mon, Sep 16, 2024 at 11:13 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
> I think here we always need the lsn values corresponding to
> SYNC_REP_WAIT_FLUSH as we want to ensure that the WAL has to be
> flushed on physical standby before sending it to the logical
> subscriber. See ProcessStandbyReplyMessage() where we always use
> flushPtr to advance the physical_slot via
> PhysicalConfirmReceivedLocation().
I agree. So even if the mode is SYNC_REP_WAIT_WRITE (lower one) or
SYNC_REP_WAIT_APPLY (higher one), we need to wait for
lsn[SYNC_REP_WAIT_FLUSH].
> Another question aside from the above point, what if someone has
> specified logical subscribers in synchronous_standby_names? In the
> case of synchronized_standby_slots, we won't proceed with such slots.
Yes, it is a possibility. I have missed this point earlier. Now I
tried a case where I give a mix of logical subscriber and physical
standby in 'synchronous_standby_names' on pgHead, it even takes that
'mix' configuration and starts waiting accordingly.
synchronous_standby_names = 'FIRST 2(logicalsub_1, phy_standby_1,
phy_standby_2)';
thanks
Shveta
On Mon, Sep 16, 2024 at 2:55 PM shveta malik <shveta.malik@gmail.com> wrote:
> On Mon, Sep 16, 2024 at 11:13 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > Another question aside from the above point, what if someone has
> > specified logical subscribers in synchronous_standby_names? In the
> > case of synchronized_standby_slots, we won't proceed with such slots.
>
> Yes, it is a possibility. I have missed this point earlier. Now I
> tried a case where I give a mix of logical subscriber and physical
> standby in 'synchronous_standby_names' on pgHead, it even takes that
> 'mix' configuration and starts waiting accordingly.
>
> synchronous_standby_names = 'FIRST 2(logicalsub_1, phy_standby_1,
> phy_standby_2)';
This should not happen as we don't support syncing failover slots on
logical subscribers. The other point to consider here is that the user
may not have set 'sync_replication_slots' on all the physical standbys
mentioned in 'synchronous_standby_names' and in that case, it doesn't
make sense to wait for WAL to get flushed on those standbys. What do
you think?
--
With Regards,
Amit Kapila.
On Mon, Sep 16, 2024 at 4:04 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> This should not happen as we don't support syncing failover slots on
> logical subscribers.
+1
> The other point to consider here is that the user
> may not have set 'sync_replication_slots' on all the physical standbys
> mentioned in 'synchronous_standby_names' and in that case, it doesn't
> make sense to wait for WAL to get flushed on those standbys. What do
> you think?
Yes, it is a possibility. But then it is also a possibility in the case
of 'synchronized_standby_slots'. A user may configure one of the
standbys in 'synchronized_standby_slots' without configuring the
slot-sync GUCs on that standby (hot_standby_feedback,
sync_replication_slots, etc.). In such a case, logical replication
depends on that physical standby even though the latter is not syncing
failover slots.
But there is no reliable way to detect this on the publisher side and
stop the 'wait' for that physical standby. We tried in the past, but it
was not that simple: the sync-related GUCs may change at any time on
the physical standby, so a consistent feedback mechanism would be
needed to detect this. IMO, we can explain the recommendations and
risks for 'synchronous_standby_names' in the docs, similar to what we
do for 'sync_replication_slots'. Or do you have something else in mind?
thanks
Shveta
On Tue, Sep 17, 2024 at 9:08 AM shveta malik <shveta.malik@gmail.com> wrote:
The other point to consider here is that the user
may not have set 'sync_replication_slots' on all the physical standbys
mentioned in 'synchronous_standby_names' and in that case, it doesn't
make sense to wait for WAL to get flushed on those standbys. What do
you think?
Yes, it is a possibility. But then it is a possibility in case of
'synchronized_standby_slots' as well. User may always configure one of
the standbys in 'synchronized_standby_slots' while may not configure
slot-sync GUCs on that standby (hot_standby_feedback,
sync_replication_slots etc). In such a case, logical replication is
dependent upon the concerned physical standby even though latter is
not syncing failover slots.
The difference is that the purpose of 'synchronized_standby_slots' is
to mention slot names for which the user expects logical walsenders to
wait before sending the logical changes to subscribers. OTOH,
'synchronous_standby_names' has a different purpose as well. It is not
clear to me if the users would be interested in syncing failover slots
to all the standbys mentioned in 'synchronous_standby_names'.
--
With Regards,
Amit Kapila.
On Thu, Sep 19, 2024 at 12:02 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
The difference is that the purpose of 'synchronized_standby_slots' is
to mention slot names for which the user expects logical walsenders to
wait before sending the logical changes to subscribers. OTOH,
'synchronous_standby_names' has a different purpose as well. It is not
clear to me if the users would be interested in syncing failover slots
to all the standbys mentioned in 'synchronous_standby_names'.
Okay, I see your point. But not sure what could be the solution here
except documenting. But let me think more.
thanks
Shveta
Hi,
On Mon, Sep 16, 2024 at 2:25 AM shveta malik <shveta.malik@gmail.com> wrote:
If we don't do something similar, then aren't there chances that we
keep on waiting on the wrong lsn[mode] for the case when mode =
SYNC_REP_WAIT_APPLY while sync-rep-wait infrastructure is updating
different mode's lsn. Is my understanding correct?
Let me take a deeper look at this, I think you're right though.
I agree. So even if the mode is SYNC_REP_WAIT_WRITE (lower one) or
SYNC_REP_WAIT_APPLY (higher one), we need to wait for
lsn[SYNC_REP_WAIT_FLUSH].
I'm not sure I agree with that. I think the synchronous_commit mode
should be a good enough proxy for what the user wants from a
durability perspective for their application.
For an application writing to the database, if they've set the mode to
SYNC_REP_WAIT_WRITE and are fine with that being the point at which a
commit is treated as durable, why do we need to be concerned with
overriding that to SYNC_REP_WAIT_FLUSH?
Similarly, if a user has the mode set to SYNC_REP_WAIT_APPLY, to me
it's even more confusing that there can be scenarios where neither the
application nor subsequent reads would see the data as committed, but
a logical consumer would be able to. The database should be treated as
the source of truth, and I don't think logical consumers should ever
be ahead of what the database is treating as committed.
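To make the two positions concrete, here is a toy Python model (not
PostgreSQL code). The names SyncRepWaitMode and logical_send_limit are
made up for illustration, and the lsn list is only a stand-in for the
per-mode lsn[] array discussed above. It compares "always wait on
FLUSH" with "wait on the configured mode":

```python
from enum import IntEnum


class SyncRepWaitMode(IntEnum):
    # Same ordering as the SYNC_REP_WAIT_* modes discussed in this thread.
    WRITE = 0
    FLUSH = 1
    APPLY = 2


def logical_send_limit(lsn, commit_mode, always_flush):
    """Return the highest LSN a logical walsender may send (toy model).

    lsn          -- list indexed by SyncRepWaitMode; each entry is the
                    position the synchronous standbys have acknowledged
                    for that mode (hypothetical stand-in for lsn[mode]).
    commit_mode  -- the mode implied by the user's synchronous_commit.
    always_flush -- if True, ignore commit_mode and use FLUSH.
    """
    mode = SyncRepWaitMode.FLUSH if always_flush else commit_mode
    return lsn[mode]


# Standbys have written up to 300, flushed up to 200, applied up to 100.
lsn = [300, 200, 100]

flush_policy = logical_send_limit(lsn, SyncRepWaitMode.APPLY, always_flush=True)
mode_policy = logical_send_limit(lsn, SyncRepWaitMode.APPLY, always_flush=False)
```

In this model, with the mode set to APPLY, a commit at LSN 150 has not
yet been acknowledged to the client (apply is at 100), but the
always-FLUSH policy would let the logical walsender send up to 200.
That is the "logical consumer ahead of the database" case I'm worried
about.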
Thanks,
--
John Hsu - Amazon Web Services
Hi,
On Fri, Sep 20, 2024 at 2:44 AM shveta malik <shveta.malik@gmail.com> wrote:
The difference is that the purpose of 'synchronized_standby_slots' is
to mention slot names for which the user expects logical walsenders to
wait before sending the logical changes to subscribers. OTOH,
'synchronous_standby_names' has a different purpose as well. It is not
clear to me if the users would be interested in syncing failover slots
to all the standbys mentioned in 'synchronous_standby_names'.
Okay, I see your point. But not sure what could be the solution here
except documenting. But let me think more.
That's a great find. I didn't consider mixed physical and logical
replicas in synchronous_standby_names.
I wonder if there are users running synchronous_standby_names with a
mix of logical and physical replicas, and what the use case would be.
I'm not sure there's anything straightforward we could do in general
for slot syncing. If synchronous_standby_names refers to the
application_names of logical replicas, the feature can't be supported.
--
John Hsu - Amazon Web Services
On Sat, Sep 21, 2024 at 6:34 AM John H <johnhyvr@gmail.com> wrote:
That's a great find. I didn't consider mixed physical and logical
replicas in synchronous_standby_names.
I wonder if there are users running synchronous_standby_names with a
mix of logical and physical replicas and what the use case would be.
I am also not aware of actual use cases for mixing physical and
logical synchronous standbys, but as we provide that functionality, we
can't ignore it. BTW, I am also not sure if users would like the slots
to be synced on all the standbys mentioned in
synchronous_standby_names. Even if they would, it is better to have
an explicit way of letting users specify it.
One possible approach is to extend the syntax of
"synchronized_standby_slots" similar to "synchronous_standby_names" so
that users can directly specify slots similarly and avoid waiting for
more standbys than required.
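As a rough illustration of that idea, here is a toy Python model (not
a patch). The 'ANY n (...)' / 'FIRST n (...)' forms for
synchronized_standby_slots are hypothetical and do not exist today, and
the helper names parse_slot_spec and slots_caught_up are made up. The
FIRST handling is deliberately simplified: it takes the first n listed
slots that have reported a position.

```python
import re


def parse_slot_spec(spec):
    """Parse 'ANY n (a, b)', 'FIRST n (a, b)' or a plain 'a, b' list.

    Returns (method, num_required, names). A plain list means every
    listed slot is required, which matches today's behaviour.
    """
    m = re.fullmatch(r"\s*(ANY|FIRST)\s+(\d+)\s*\((.*)\)\s*", spec, re.IGNORECASE)
    if m:
        names = [n.strip() for n in m.group(3).split(",") if n.strip()]
        return m.group(1).upper(), int(m.group(2)), names
    names = [n.strip() for n in spec.split(",") if n.strip()]
    return "ALL", len(names), names


def slots_caught_up(spec, confirmed, wait_lsn):
    """Return True if enough slots have confirmed wait_lsn.

    confirmed -- dict mapping slot name to the LSN it has confirmed;
                 slots missing from the dict are treated as inactive.
    """
    method, need, names = parse_slot_spec(spec)
    if method == "FIRST":
        # Simplified priority rule: the first `need` active slots in
        # list order must all have reached wait_lsn.
        active = [n for n in names if n in confirmed][:need]
        return len(active) == need and all(confirmed[n] >= wait_lsn for n in active)
    # ANY and ALL: count the slots that have reached wait_lsn.
    reached = sum(1 for n in names if confirmed.get(n, 0) >= wait_lsn)
    return reached >= need


confirmed = {"slot_a": 500, "slot_b": 120, "slot_c": 480}
any_ok = slots_caught_up("ANY 2 (slot_a, slot_b, slot_c)", confirmed, 450)
first_ok = slots_caught_up("FIRST 2 (slot_a, slot_b, slot_c)", confirmed, 450)
all_ok = slots_caught_up("slot_a, slot_b, slot_c", confirmed, 450)
```

With slot_b lagging, only the quorum form lets the logical walsender
proceed; the plain list (the current behaviour) and the simplified
FIRST form keep waiting on slot_b.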
--
With Regards,
Amit Kapila.