From 4dac078a5e6069b29e70302c8d2a31ba899f14cc Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Mon, 24 Feb 2025 17:24:41 +0800 Subject: [PATCH v3 2/2] Copy the two_phase option in pg_copy_logical_replication_slot Previously, the two_phase option was not copied when calling pg_copy_logical_replication_slot(). However, since it is feasible and more intuitive to copy this option rather than documenting its exclusion, this patch implements the copying of the two_phase option. --- contrib/test_decoding/expected/slot.out | 26 ++++++++++++------------- contrib/test_decoding/sql/slot.sql | 6 +++--- src/backend/replication/slotfuncs.c | 11 +++++++++++ 3 files changed, 27 insertions(+), 16 deletions(-) diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out index 7de03c79f6..c590901181 100644 --- a/contrib/test_decoding/expected/slot.out +++ b/contrib/test_decoding/expected/slot.out @@ -176,7 +176,7 @@ SELECT pg_drop_replication_slot('regression_slot3'); -- Test copy functions for logical replication slots -- -- Create and copy logical slots -SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false); +SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false, true); ?column? ---------- init @@ -202,18 +202,18 @@ SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_ -- Check all copied slots status SELECT - o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary + o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary, c.two_phase FROM (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn WHERE o.slot_name != c.slot_name ORDER BY o.slot_name, c.slot_name; - slot_name | plugin | temporary | slot_name | plugin | temporary -------------+---------------+-----------+---------------------------------+---------------+----------- - orig_slot1 | test_decoding | f | copied_slot1_change_plugin | pgoutput | f - orig_slot1 | test_decoding | f | copied_slot1_change_plugin_temp | pgoutput | t - orig_slot1 | test_decoding | f | copied_slot1_no_change | test_decoding | f + slot_name | plugin | temporary | slot_name | plugin | temporary | two_phase +------------+---------------+-----------+---------------------------------+---------------+-----------+----------- + orig_slot1 | test_decoding | f | copied_slot1_change_plugin | pgoutput | f | t + orig_slot1 | test_decoding | f | copied_slot1_change_plugin_temp | pgoutput | t | t + orig_slot1 | test_decoding | f | copied_slot1_no_change | test_decoding | f | t (3 rows) -- Now we have maximum 4 replication slots. Check slots are properly @@ -267,18 +267,18 @@ SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_ -- Check all copied slots status SELECT - o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary + o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary, c.two_phase FROM (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn WHERE o.slot_name != c.slot_name ORDER BY o.slot_name, c.slot_name; - slot_name | plugin | temporary | slot_name | plugin | temporary -------------+---------------+-----------+---------------------------------+---------------+----------- - orig_slot2 | test_decoding | t | copied_slot2_change_plugin | pgoutput | t - orig_slot2 | test_decoding | t | copied_slot2_change_plugin_temp | pgoutput | f - orig_slot2 | test_decoding | t | copied_slot2_no_change | test_decoding | t + slot_name | plugin | temporary | slot_name | plugin | temporary | two_phase +------------+---------------+-----------+---------------------------------+---------------+-----------+----------- + orig_slot2 | test_decoding | t | copied_slot2_change_plugin | pgoutput | t | f + orig_slot2 | test_decoding | t | copied_slot2_change_plugin_temp | pgoutput | f | f + orig_slot2 | test_decoding | t | copied_slot2_no_change | test_decoding | t | f (3 rows) -- Cannot copy a logical slot to a physical slot diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql index 580e3ae3be..70356e7a56 100644 --- a/contrib/test_decoding/sql/slot.sql +++ b/contrib/test_decoding/sql/slot.sql @@ -88,14 +88,14 @@ SELECT pg_drop_replication_slot('regression_slot3'); -- -- Create and copy logical slots -SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false); +SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false, true); SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_no_change'); SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin', false, 'pgoutput'); SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin_temp', true, 'pgoutput'); -- Check all copied slots status SELECT - o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary + o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary, c.two_phase FROM (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn @@ -120,7 +120,7 @@ SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_ -- Check all copied slots status SELECT - o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary + o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary, c.two_phase FROM (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 8222e5a109..acca37d333 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -707,6 +707,11 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) * up, logical replication cannot continue using the synchronized slot * on the promoted standby because the slot retains the restart_lsn and * confirmed_flush_lsn that are much later than expected. + * + * The two_phase option is not copied initially because the source + * option value could change concurrently anyway. Instead, it will be + * copied after the slot creation along with other properties like + * confirmed_flush. */ create_logical_replication_slot(NameStr(*dst_name), plugin, @@ -733,6 +738,8 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) TransactionId copy_catalog_xmin; XLogRecPtr copy_restart_lsn; XLogRecPtr copy_confirmed_flush; + XLogRecPtr copy_two_phase_at; + bool copy_two_phase; bool copy_islogical; char *copy_name; @@ -748,6 +755,8 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) copy_catalog_xmin = second_slot_contents.data.catalog_xmin; copy_restart_lsn = second_slot_contents.data.restart_lsn; copy_confirmed_flush = second_slot_contents.data.confirmed_flush; + copy_two_phase_at = second_slot_contents.data.two_phase_at; + copy_two_phase = second_slot_contents.data.two_phase; /* for existence check */ copy_name = NameStr(second_slot_contents.data.name); @@ -788,6 +797,8 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin; MyReplicationSlot->data.restart_lsn = copy_restart_lsn; MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush; + MyReplicationSlot->data.two_phase_at = copy_two_phase_at; + MyReplicationSlot->data.two_phase = copy_two_phase; SpinLockRelease(&MyReplicationSlot->mutex); ReplicationSlotMarkDirty(); -- 2.30.0.windows.2