From be5eb6701e232dd475cdc2639c423bcbe638b068 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Tue, 9 Jul 2024 19:23:10 +0530
Subject: [PATCH v2] Fix random data loss during logical replication.

Previously, when adding tables to a publication in PostgreSQL, they were
locked using ShareUpdateExclusiveLock mode. This mode does not conflict
with the RowExclusiveLock taken by transactions that modify the table, so
the lock could be acquired even while such transactions were still in
progress. As a consequence, the ALTER PUBLICATION command could complete
before these transactions did, and their changes were then decoded using a
catalog snapshot predating the publication change, so the changes made by
transactions initiated before the alteration were not replicated.

To fix this issue, the locking mechanism has been revised. Tables are
now locked using ShareRowExclusiveLock mode during the addition to a
publication. This adjustment ensures that the ALTER PUBLICATION command
waits for any ongoing transactions on these tables to complete before
proceeding. As a result, transactions initiated before the publication
alteration are correctly included in the replication process, maintaining
data consistency across replicas.
---
 src/backend/commands/publicationcmds.c | 12 +++--
 src/test/subscription/t/100_bugs.pl    | 72 ++++++++++++++++++++++++++
 2 files changed, 81 insertions(+), 3 deletions(-)

diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 6ea709988e..be1fcf4f58 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -1542,8 +1542,14 @@ RemovePublicationSchemaById(Oid psoid)
 
 /*
  * Open relations specified by a PublicationTable list.
- * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
- * add them to a publication.
+ *
+ * The returned tables are locked in ShareRowExclusiveLock mode in order to
+ * add them to a publication. The table needs to be locked in
+ * ShareRowExclusiveLock mode to ensure that any ongoing transactions involving
+ * the table are completed before adding it to the publication. Failing to do
+ * so means that transactions initiated before the alteration of the
+ * publication will continue to use a catalog snapshot predating the
+ * publication change, leading to non-replication of these transaction changes.
  */
 static List *
 OpenTableList(List *tables)
@@ -1568,7 +1574,7 @@ OpenTableList(List *tables)
 		/* Allow query cancel in case this takes a long time */
 		CHECK_FOR_INTERRUPTS();
 
-		rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
+		rel = table_openrv(t->relation, ShareRowExclusiveLock);
 		myrelid = RelationGetRelid(rel);
 
 		/*
diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl
index cb36ca7b16..3316e57ff5 100644
--- a/src/test/subscription/t/100_bugs.pl
+++ b/src/test/subscription/t/100_bugs.pl
@@ -487,6 +487,78 @@ $result =
 is( $result, qq(2|f
 3|t), 'check replicated update on subscriber');
 
+# Check that incremental data synchronization is not skipped when a new table
+# is added while a concurrent transaction involving the same table is active.
+
+# Create table in publisher and subscriber.
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab_conc(a int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_conc(a int)");
+
+# Bump the query timeout to avoid false negatives on slow test systems.
+my $psql_timeout_secs = 4 * $PostgreSQL::Test::Utils::timeout_default;
+
+# Initiate a background session that keeps a transaction active.
+my $background_psql1 = $node_publisher->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $psql_timeout_secs);
+
+# Maintain an active transaction with the table.
+$background_psql1->set_query_timer_restart();
+$background_psql1->query_safe(
+	qq[
+	BEGIN;
+	INSERT INTO tab_conc VALUES (1);
+]);
+
+# Add the table to the publication using background_psql, as the alter
+# publication operation will wait for the lock and can only be completed after
+# the previous open transaction is committed.
+my $background_psql2 = $node_publisher->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $psql_timeout_secs);
+
+$background_psql2->set_query_timer_restart();
+
+# This operation will wait because there is an open transaction holding a lock.
+$background_psql2->query_until(qr//,
+	"ALTER PUBLICATION pub1 ADD TABLE tab_conc;\n");
+
+# Verify that the table addition is waiting to acquire a ShareRowExclusiveLock.
+$node_publisher->poll_query_until('postgres',
+	"SELECT COUNT(1) = 1 FROM pg_locks WHERE relation = 'tab_conc'::regclass AND mode = 'ShareRowExclusiveLock' AND waitstart IS NOT NULL;"
+  )
+  or die
+  "Timed out while waiting for alter publication tries to wait on ShareRowExclusiveLock";
+
+# Complete the old transaction, which lets ALTER PUBLICATION acquire its lock.
+$background_psql1->query_safe(qq[COMMIT]);
+$background_psql1->quit;
+# Run the INSERT in the ALTER PUBLICATION session so that it follows the ALTER.
+$background_psql2->query_safe(qq[INSERT INTO tab_conc VALUES (2)]);
+$background_psql2->quit;
+
+# Refresh the publication.
+$node_subscriber->safe_psql('postgres',
+	'ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION');
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub1');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_conc");
+is( $result, qq(1
+2), 'Ensure that the data from the tab_conc table is synchronized to the subscriber after the subscription is refreshed');
+
+# Perform an insert.
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_conc values(3)");
+$node_publisher->wait_for_catchup('sub1');
+
+# Verify that the insert is replicated to the subscriber.
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_conc");
+is( $result, qq(1
+2
+3), 'Verify that the incremental data added after table synchronization is replicated to the subscriber');
+
 $node_publisher->stop('fast');
 $node_subscriber->stop('fast');
 
-- 
2.34.1

