From bc8d153732678c2abf5ce2e1c7b59ebcbdf6dba6 Mon Sep 17 00:00:00 2001
From: Hou Zhijie
Date: Sun, 1 Sep 2024 17:50:31 +0800
Subject: [PATCH v21 6/6] Add a tap test to verify the new slot xmin mechanism

---
 src/test/subscription/meson.build             |   1 +
 .../t/034_confl_update_deleted.pl             | 217 ++++++++++++++++++
 2 files changed, 218 insertions(+)
 create mode 100644 src/test/subscription/t/034_confl_update_deleted.pl

diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index c591cd7d61..6303322d41 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -40,6 +40,7 @@ tests += {
       't/031_column_list.pl',
       't/032_subscribe_use_index.pl',
       't/033_run_as_table_owner.pl',
+      't/034_confl_update_deleted.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/034_confl_update_deleted.pl b/src/test/subscription/t/034_confl_update_deleted.pl
new file mode 100644
index 0000000000..a35448b861
--- /dev/null
+++ b/src/test/subscription/t/034_confl_update_deleted.pl
@@ -0,0 +1,217 @@
+
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# Test the CREATE SUBSCRIPTION 'feedback_slots' parameter and its interaction
+# with the xmin value of replication slots.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $subname_AB = 'tap_sub_a_b';
+my $subname_BA = 'tap_sub_b_a';
+
+###############################################################################
+# Set up a bidirectional logical replication between node_A & node_B
+###############################################################################
+
+# Initialize nodes. Enable the track_commit_timestamp on both nodes to detect
+# the conflict when attempting to update a row that was previously modified by
+# a different origin.
+
+# node_A
+my $node_A = PostgreSQL::Test::Cluster->new('node_A');
+$node_A->init(allows_streaming => 'logical');
+$node_A->append_conf('postgresql.conf', 'track_commit_timestamp = on');
+$node_A->start;
+
+# node_B
+my $node_B = PostgreSQL::Test::Cluster->new('node_B');
+$node_B->init(allows_streaming => 'logical');
+$node_B->append_conf('postgresql.conf', 'track_commit_timestamp = on');
+$node_B->start;
+
+# Create table on node_A
+$node_A->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
+
+# Create the same table on node_B
+$node_B->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
+
+# Setup logical replication
+# node_A (pub) -> node_B (sub)
+my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
+$node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab");
+$node_B->safe_psql(
+	'postgres', "
+	CREATE SUBSCRIPTION $subname_BA
+	CONNECTION '$node_A_connstr application_name=$subname_BA'
+	PUBLICATION tap_pub_A
+	WITH (origin = none)");
+
+# node_B (pub) -> node_A (sub)
+my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
+$node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab");
+$node_A->safe_psql(
+	'postgres', "
+	CREATE SUBSCRIPTION $subname_AB
+	CONNECTION '$node_B_connstr application_name=$subname_AB'
+	PUBLICATION tap_pub_B
+	WITH (origin = none, copy_data = off)");
+
+# Wait for initial table sync to finish
+$node_A->wait_for_subscription_sync($node_B, $subname_AB);
+$node_B->wait_for_subscription_sync($node_A, $subname_BA);
+
+pass('Bidirectional replication setup is complete');
+
+# Confirm that the initial xmin value of each replication slot is NULL
+is( $node_A->safe_psql(
+		'postgres',
+		"SELECT xmin IS NULL from pg_replication_slots WHERE slot_name = '$subname_BA'"
+	),
+	"t",
+	"The initial xmin value of slot '$subname_BA' is NULL");
+
+is( $node_B->safe_psql(
+		'postgres',
+		"SELECT xmin IS NULL from pg_replication_slots WHERE slot_name = '$subname_AB'"
+	),
+	"t",
+	"The initial xmin value of slot '$subname_AB' is NULL");
+
+###############################################################################
+# Check that the xmin value of each replication slot will become valid after
+# setting feedback_slots
+###############################################################################
+
+# Log an xl_running_xacts record to accelerate the advancement of xmin
+$node_A->safe_psql('postgres', 'SELECT pg_log_standby_snapshot();');
+$node_B->safe_psql('postgres', 'SELECT pg_log_standby_snapshot();');
+
+$node_B->wait_for_catchup($subname_AB);
+$node_A->wait_for_catchup($subname_BA);
+
+$node_B->safe_psql(
+	'postgres', "
+	ALTER SUBSCRIPTION $subname_BA SET (feedback_slots = '$subname_AB')");
+
+# Insert a record to ensure that the apply worker on Node B reports its flush
+# position. Once the apply worker sends this feedback, the walsender on Node A
+# will update the xmin.
+$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (3);");
+$node_A->wait_for_catchup($subname_BA);
+
+ok( $node_A->poll_query_until(
+		'postgres',
+		"SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = '$subname_BA'"
+	),
+	"the xmin value of slot '$subname_BA' becomes valid");
+
+$node_A->safe_psql(
+	'postgres', "
+	ALTER SUBSCRIPTION $subname_AB SET (feedback_slots = '$subname_BA')");
+
+# Insert a record to ensure that the apply worker on Node A reports its flush
+# position. Once the apply worker sends this feedback, the walsender on Node B
+# will update the xmin.
+$node_B->safe_psql('postgres', "INSERT INTO tab VALUES (4);");
+$node_B->wait_for_catchup($subname_AB);
+
+my $result = $node_A->safe_psql('postgres', "SELECT a FROM tab ORDER BY a;");
+is( $result, qq(3
+4),
+	'check replicated insert on node A'
+);
+
+$result = $node_B->safe_psql('postgres', "SELECT a FROM tab ORDER BY a;");
+is( $result, qq(3
+4),
+	'check replicated insert on node B'
+);
+
+ok( $node_B->poll_query_until(
+		'postgres',
+		"SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = '$subname_AB'"
+	),
+	"the xmin value of slot '$subname_AB' becomes valid");
+
+# Cleanup the test data
+$node_B->safe_psql('postgres', "DELETE FROM tab;");
+$node_B->wait_for_catchup($subname_AB);
+
+###############################################################################
+# Check that dead tuples on node A cannot be cleaned by VACUUM until the DELETE
+# operations that occurred on the node A have been replayed on the node B, and
+# any changes on node B that occurred before the replay of these DELETE
+# operations are also replayed on node A.
+###############################################################################
+
+# Insert a record
+$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (1);");
+$node_A->wait_for_catchup($subname_BA);
+
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab;");
+is($result, qq(1), 'check replicated insert on node B');
+
+# Disable the logical replication from node B to node A
+$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE");
+
+# Remember the current size of node B's log so that only messages written from
+# this point on are examined.
+my $log_location = -s $node_B->logfile;
+
+# Create a conflict: node B updates the row while node A deletes it. Since the
+# replication from node B to node A is disabled, node A deletes the row before
+# it receives the update.
+$node_B->safe_psql('postgres', "UPDATE tab SET a=a+1;");
+$node_A->safe_psql('postgres', "DELETE FROM tab;");
+
+# The DELETE from node A finds no matching row on node B, because the key was
+# already changed there.
+$node_A->wait_for_catchup($subname_BA);
+
+my $logfile = slurp_file($node_B->logfile(), $log_location);
+like( $logfile,
+	qr/conflict detected on relation "public.tab": conflict=delete_missing.*\n.*DETAIL:.* Could not find the row to be deleted.*\n.*Replica identity \(a\)=\(1\)/,
+	'delete target row is missing in tab');
+
+$log_location = -s $node_A->logfile;
+
+# Re-enable the replication from node B to node A. The UPDATE from node B now
+# targets a row that node A has already deleted.
+$node_A->safe_psql(
+	'postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;");
+$node_B->wait_for_catchup($subname_AB);
+
+$logfile = slurp_file($node_A->logfile(), $log_location);
+like( $logfile,
+	qr/conflict detected on relation "public.tab": conflict=update_deleted.*\n.*DETAIL:.* The row to be updated was deleted locally in transaction [0-9]+ at .*\n.*Remote tuple \(2\); replica identity \(a\)=\(1\)/,
+	'update target row was deleted in tab');
+
+###############################################################################
+# Check that the xmin value of each replication slot will become invalid after
+# disabling feedback_slots
+###############################################################################
+
+$node_B->safe_psql(
+	'postgres', "
+	ALTER SUBSCRIPTION $subname_BA SET (feedback_slots = NONE)");
+
+ok( $node_A->poll_query_until(
+		'postgres',
+		"SELECT xmin IS NULL from pg_replication_slots WHERE slot_name = '$subname_BA'"
+	),
+	"the xmin value of slot '$subname_BA' becomes invalid");
+
+$node_A->safe_psql(
+	'postgres', "
+	ALTER SUBSCRIPTION $subname_AB SET (feedback_slots = NONE)");
+
+ok( $node_B->poll_query_until(
+		'postgres',
+		"SELECT xmin IS NULL from pg_replication_slots WHERE slot_name = '$subname_AB'"
+	),
+	"the xmin value of slot '$subname_AB' becomes invalid");
+
+done_testing();
-- 
2.30.0.windows.2