From 5b2bf6411e0792fac0e667d5dee3272ba3eb0908 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Thu, 5 Sep 2024 12:41:52 +0800 Subject: [PATCH v21 5/6] Support copying xmin value of slots during slotsync --- src/backend/replication/logical/slotsync.c | 17 ++++++-- .../t/040_standby_failover_slots_sync.pl | 39 +++++++++++++++---- 2 files changed, 45 insertions(+), 11 deletions(-) diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index f9649eec1a..7e8b8767a5 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -145,6 +145,9 @@ typedef struct RemoteSlot /* RS_INVAL_NONE if valid, or the reason of invalidation */ ReplicationSlotInvalidationCause invalidated; + + /* Remote slot's xmin; InvalidTransactionId unless feedback_slots is enabled */ + TransactionId xmin; } RemoteSlot; static void slotsync_failure_callback(int code, Datum arg); @@ -231,7 +234,8 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, else if (remote_slot->confirmed_lsn > slot->data.confirmed_flush || remote_slot->restart_lsn > slot->data.restart_lsn || TransactionIdFollows(remote_slot->catalog_xmin, - slot->data.catalog_xmin)) + slot->data.catalog_xmin) || + TransactionIdFollows(remote_slot->xmin, slot->data.xmin)) { /* * We can't directly copy the remote slot's LSN or xmin unless there @@ -252,6 +256,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, slot->data.restart_lsn = remote_slot->restart_lsn; slot->data.confirmed_flush = remote_slot->confirmed_lsn; slot->data.catalog_xmin = remote_slot->catalog_xmin; + slot->data.xmin = remote_slot->xmin; SpinLockRelease(&slot->mutex); if (found_consistent_snapshot) @@ -316,6 +321,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, { SpinLockAcquire(&slot->mutex); slot->effective_catalog_xmin = remote_slot->catalog_xmin; + slot->effective_xmin = remote_slot->xmin; SpinLockRelease(&slot->mutex); 
ReplicationSlotsComputeRequiredXmin(false); @@ -790,9 +796,9 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) static bool synchronize_slots(WalReceiverConn *wrconn) { -#define SLOTSYNC_COLUMN_COUNT 9 +#define SLOTSYNC_COLUMN_COUNT 10 Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID, - LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID}; + LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID, XIDOID}; WalRcvExecResult *res; TupleTableSlot *tupslot; @@ -801,7 +807,7 @@ synchronize_slots(WalReceiverConn *wrconn) bool started_tx = false; const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn," " restart_lsn, catalog_xmin, two_phase, failover," - " database, invalidation_reason" + " database, invalidation_reason, xmin" " FROM pg_catalog.pg_replication_slots" " WHERE failover and NOT temporary"; @@ -867,6 +873,9 @@ synchronize_slots(WalReceiverConn *wrconn) remote_slot->invalidated = isnull ? RS_INVAL_NONE : GetSlotInvalidationCause(TextDatumGetCString(d)); + d = slot_getattr(tupslot, ++col, &isnull); + remote_slot->xmin = isnull ? InvalidTransactionId : DatumGetTransactionId(d); + /* Sanity check */ Assert(col == SLOTSYNC_COLUMN_COUNT); diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl index a474c626d9..f267d0575d 100644 --- a/src/test/recovery/t/040_standby_failover_slots_sync.pl +++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl @@ -33,6 +33,7 @@ my $publisher_connstr = $publisher->connstr . 
' dbname=postgres'; # Create a subscriber node, wait for sync to complete my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1'); $subscriber1->init; +$subscriber1->append_conf('postgresql.conf', 'wal_level = logical'); $subscriber1->start; # Capture the time before the logical failover slot is created on the @@ -542,8 +543,8 @@ $standby1->reload; $standby1->wait_for_log(qr/slot sync worker started/, $log_offset); ################################################## -# Test to confirm that confirmed_flush_lsn of the logical slot on the primary -# is synced to the standby via the slot sync worker. +# Test to confirm that confirmed_flush_lsn and xmin of the logical slot on the +# primary are synced to the standby via the slot sync worker. ################################################## # Insert data on the primary @@ -553,11 +554,15 @@ $primary->safe_psql( INSERT INTO tab_int SELECT generate_series(1, 10); ]); +# Create another logical slot on the subscriber, needed for feedback_slots +$subscriber1->safe_psql('postgres', + "SELECT pg_create_logical_replication_slot('for_feedback', 'test_decoding');"); + # Subscribe to the new table data and wait for it to arrive $subscriber1->safe_psql( 'postgres', qq[ CREATE TABLE tab_int (a int PRIMARY KEY); - CREATE SUBSCRIPTION regress_mysub1 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub1_slot, failover = true, create_slot = false); + CREATE SUBSCRIPTION regress_mysub1 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub1_slot, failover = true, create_slot = false, feedback_slots = 'for_feedback'); ]); $subscriber1->wait_for_subscription_sync; @@ -573,17 +578,37 @@ $primary->poll_query_until( "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'", 1); -# Get the confirmed_flush_lsn for the logical slot lsub1_slot on the primary +# Get confirmed_flush_lsn for the logical slot lsub1_slot on the primary my $primary_flush_lsn 
= $primary->safe_psql('postgres', "SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';" ); -# Confirm that confirmed_flush_lsn of lsub1_slot slot is synced to the standby +# Also get xmin, but first wait until it has been set on the primary +ok( $primary->poll_query_until( + 'postgres', + "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'lsub1_slot';" + ), + 'xmin is set'); + +my $primary_xmin = $primary->safe_psql('postgres', + "SELECT xmin from pg_replication_slots WHERE slot_name = 'lsub1_slot';" +); + +# Confirm that both confirmed_flush_lsn and xmin of lsub1_slot slot are synced +# to the standby ok( $standby1->poll_query_until( 'postgres', - "SELECT '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;" + "SELECT '$primary_flush_lsn' = confirmed_flush_lsn AND '$primary_xmin' = xmin from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;" ), - 'confirmed_flush_lsn of slot lsub1_slot synced to standby'); + 'confirmed_flush_lsn and xmin of slot lsub1_slot synced to standby'); + +# Reset feedback_slots and drop the slot because it won't be used by the +# upcoming tests +$subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_mysub1 SET (feedback_slots = NONE);"); + +$subscriber1->safe_psql('postgres', + "SELECT pg_drop_replication_slot('for_feedback');"); ################################################## # Test that logical failover replication slots wait for the specified -- 2.30.0.windows.2