Logical replication - schema change not invalidating the relation cache

Started by vignesh C over 4 years ago, 14 messages
#1vignesh C
vignesh21@gmail.com
1 attachment(s)

Hi,

I found a strange behavior when there is an insert after renaming the
schema. The test steps are given below. After the schema is renamed,
the data for the table in the renamed schema should not be sent, but
it was being sent. I think the schema invalidation was not being
called; I have attached a patch to handle this. Thoughts?

step 1)
Create schema sch1;
Create table sch1.t1(c1 int);
CREATE PUBLICATION mypub1 FOR all tables;

Step 2)
CREATE SCHEMA sch1;
CREATE TABLE sch1.t1(c1 int);
CREATE SCHEMA sch2;
CREATE TABLE sch2.t1(c1 int);
CREATE TABLE t1(c1 int);
CREATE SUBSCRIPTION mysub1 CONNECTION 'host=localhost port=5432
dbname=postgres' PUBLICATION mypub1;

Step 3)
begin;
insert into sch1.t1 values(1);
alter schema sch1 rename to sch2;
create schema sch1;
create table sch1.t1(c1 int);
insert into sch1.t1 values(2);
insert into sch2.t1 values(3);
commit;

step 4)
select * from sch1.t1; # In subscriber
Got:
c1
----
1
2
3
(3 rows)

Expected:
c1
----
1
2
(2 rows)

Regards,
Vignesh

Attachments:

0001-Fix-for-invalidating-logical-replication-relations-w.patch (text/x-patch; charset=US-ASCII; name=0001-Fix-for-invalidating-logical-replication-relations-w.patch) [Download]
From 66b96e213ee390f0f069b81b4a60d8afc91014c6 Mon Sep 17 00:00:00 2001
From: vignesh <vignesh21@gmail.com>
Date: Fri, 2 Jul 2021 10:42:37 +0530
Subject: [PATCH] Fix for invalidating logical replication relations when there
 is a change in schema.

When the schema gets changed, the rel sync cache invalidation was not
happening, fixed it by adding a callback for schema change.
---
 src/backend/replication/pgoutput/pgoutput.c | 51 ++++++++++++++
 src/test/subscription/t/001_rep_changes.pl  | 76 ++++++++++++++++++++-
 2 files changed, 126 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index abd5217ab1..aca4d72e14 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -126,6 +126,8 @@ static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
 										  uint32 hashvalue);
+static void rel_sync_cache_namespace_cb(Datum arg, int cacheid,
+										uint32 hashvalue);
 static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
 											TransactionId xid);
 static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
@@ -960,6 +962,9 @@ init_rel_sync_cache(MemoryContext cachectx)
 	CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
 								  rel_sync_cache_publication_cb,
 								  (Datum) 0);
+	CacheRegisterSyscacheCallback(NAMESPACEOID,
+								  rel_sync_cache_namespace_cb,
+								  (Datum) 0);
 }
 
 /*
@@ -1234,6 +1239,52 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 	}
 }
 
+/*
+ * Namespace syscache invalidation callback
+ */
+static void
+rel_sync_cache_namespace_cb(Datum arg, int cacheid, uint32 hashvalue)
+{
+	HASH_SEQ_STATUS status;
+	RelationSyncEntry *entry;
+
+	/*
+	 * We can get here if the plugin was used in SQL interface as the
+	 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
+	 * is no way to unregister the relcache invalidation callback.
+	 */
+	if (RelationSyncCache == NULL)
+		return;
+
+	hash_seq_init(&status, RelationSyncCache);
+	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
+	{
+		/*
+		 * Reset schema sent status as the relation definition may have changed.
+		 * Also free any objects that depended on the earlier definition.
+		 */
+		entry->schema_sent = false;
+		list_free(entry->streamed_txns);
+		entry->streamed_txns = NIL;
+		if (entry->map)
+		{
+			/*
+			 * Must free the TupleDescs contained in the map explicitly,
+			 * because free_conversion_map() doesn't.
+			 */
+			FreeTupleDesc(entry->map->indesc);
+			FreeTupleDesc(entry->map->outdesc);
+			free_conversion_map(entry->map);
+		}
+		entry->map = NULL;
+
+		if (hash_search(RelationSyncCache,
+				(void *) &entry->relid,
+				HASH_REMOVE, NULL) == NULL)
+			elog(ERROR, "hash table corrupted");
+	}
+}
+
 /*
  * Publication relation map syscache invalidation callback
  */
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index dee5f5c30a..aeb4157044 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -6,7 +6,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 32;
+use Test::More tests => 33;
 
 # Initialize publisher node
 my $node_publisher = get_new_node('publisher');
@@ -518,6 +518,80 @@ is($result, qq(0), 'check replication origin was dropped on subscriber');
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
 
+# Test schema invalidation by renaming the schema
+# Create tables on publisher
+# Initialize publisher node
+my $node_publisher1 = get_new_node('publisher1');
+$node_publisher1->init(allows_streaming => 'logical');
+$node_publisher1->start;
+
+# Create subscriber node
+my $node_subscriber1 = get_new_node('subscriber1');
+$node_subscriber1->init(allows_streaming => 'logical');
+$node_subscriber1->start;
+
+my $publisher1_connstr = $node_publisher1->connstr . ' dbname=postgres';
+
+$node_publisher1->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_publisher1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+
+# Create tables on subscriber
+$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch2");
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch2.t1 (c1 int)");
+
+# Setup logical replication that will only be used for this test
+$node_publisher1->safe_psql('postgres',
+        "CREATE PUBLICATION tap_pub_sch FOR ALL TABLES"
+);
+$node_subscriber1->safe_psql('postgres',
+        "CREATE SUBSCRIPTION tap_sub_sch CONNECTION '$publisher1_connstr' PUBLICATION tap_pub_sch"
+);
+
+$node_publisher1->wait_for_catchup('tap_sub_sch');
+
+# Also wait for initial table sync to finish
+$synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$node_publisher1->safe_psql('postgres',
+        "begin;
+insert into sch1.t1 values(1);
+alter schema sch1 rename to sch2;
+create schema sch1;
+create table sch1.t1(c1 int);
+insert into sch1.t1 values(2);
+insert into sch2.t1 values(3);
+commit;");
+
+$node_publisher1->wait_for_catchup('tap_sub_sch');
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Subscriber should not receive the inserted row for renamed schema
+$result =
+  $node_subscriber1->safe_psql('postgres', "SELECT * FROM sch1.t1");
+is($result, qq(1
+2), 'check rows on subscriber after schema invalidation');
+
+# Drop subscription as we don't need it anymore
+$node_subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_sch");
+
+# Drop publications as we don't need them anymore
+$node_publisher1->safe_psql('postgres', "DROP PUBLICATION tap_pub_sch");
+
+# Clean up the tables on both publisher and subscriber as we don't need them
+$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+
+$node_subscriber1->stop('fast');
+$node_publisher1->stop('fast');
+
 # CREATE PUBLICATION while wal_level=minimal should succeed, with a WARNING
 $node_publisher->append_conf(
 	'postgresql.conf', qq(
-- 
2.25.1

#2Dilip Kumar
dilipbalaut@gmail.com
In reply to: vignesh C (#1)
Re: Logical replication - schema change not invalidating the relation cache

On Fri, Jul 2, 2021 at 11:11 AM vignesh C <vignesh21@gmail.com> wrote:

Hi,

I found a strange behavior when there is an insert after renaming the
schema. The test steps for the same are given below, Here after the
schema is renamed, the renamed schema table data should not be sent,
but the data was being sent. I felt the schema invalidation was not
called, attached a patch to handle the same. Thoughts?

step 1)
Create schema sch1;
Create table sch1.t1(c1 int);
CREATE PUBLICATION mypub1 FOR all tables;

Step 2)
CREATE SCHEMA sch1;
CREATE TABLE sch1.t1(c1 int);
CREATE SCHEMA sch2;
CREATE TABLE sch2.t1(c1 int);
CREATE TABLE t1(c1 int);
CREATE SUBSCRIPTION mysub1 CONNECTION 'host=localhost port=5432
dbname=postgres' PUBLICATION mypub1;

Step 3)
begin;
insert into sch1.t1 values(1);
alter schema sch1 rename to sch2;
create schema sch1;
create table sch1.t1(c1 int);
insert into sch1.t1 values(2);
insert into sch2.t1 values(3);
commit;

step 4)
select * from sch1.t1; # In subscriber
Got:
c1
----
1
2
3
(3 rows)

Expected:
c1
----
1
2
(2 rows)

Yeah, this looks like a bug. I will look at the patch.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#3Dilip Kumar
dilipbalaut@gmail.com
In reply to: Dilip Kumar (#2)
Re: Logical replication - schema change not invalidating the relation cache

On Fri, Jul 2, 2021 at 12:03 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

Yeah, this looks like a bug. I will look at the patch.

While looking into this, I think the main cause of the problem is that
schema rename does not invalidate the relation cache, right? I also
tried other cases, e.g. if there is an open cursor and we rename the
schema:

CREATE SCHEMA sch1;
CREATE TABLE sch1.t1(c1 int);
insert into sch1.t1 values(1);
insert into sch1.t1 values(2);
insert into sch1.t1 values(3);
BEGIN;
DECLARE mycur CURSOR FOR SELECT * FROM sch1.t1;
FETCH NEXT FROM mycur ;
----------At this point rename sch1 to sch2 from another session------
FETCH NEXT FROM mycur ;
UPDATE sch2.t1 SET c1 = 20 WHERE CURRENT OF mycur;
select * from sch2.t1 ;

So even after the schema rename, the cursor is able to fetch, and it's
also able to update the same table in the new schema. Ideally, using
CURRENT OF CUR, you can update the same table for which you have
declared the cursor. I am giving this example because this behavior
also looks somewhat similar.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#4vignesh C
vignesh21@gmail.com
In reply to: Dilip Kumar (#3)
Re: Logical replication - schema change not invalidating the relation cache

On Sat, Jul 3, 2021 at 11:23 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Fri, Jul 2, 2021 at 12:03 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

Yeah, this looks like a bug. I will look at the patch.

While looking into this, I think the main cause of the problem is that
schema rename does not invalidate the relation cache right? I also
tried other cases e.g. if there is an open cursor and we rename the
schema

CREATE SCHEMA sch1;
CREATE TABLE sch1.t1(c1 int);
insert into sch1.t1 values(1);
insert into sch1.t1 values(2);
insert into sch1.t1 values(3);
BEGIN;
DECLARE mycur CURSOR FOR SELECT * FROM sch1.t1;
FETCH NEXT FROM mycur ;
----------At this point rename sch1 to sch2 from another session------
FETCH NEXT FROM mycur ;
UPDATE sch2.t1 SET c1 = 20 WHERE CURRENT OF mycur;
select * from sch2.t1 ;

So even after the schema rename the cursor is able to fetch and its
also able to update on the same table in the new schema, ideally using
CURRENT OF CUR, you can update the same table for which you have
declared the cursor. I am giving this example because this behavior
also looks somewhat similar.

It works in this case because it uses the relation id for performing
the next fetch and the relation id does not get changed after renaming
the schema. Also since it holds a lock on the relation, alter/drop
operations will not be allowed. I felt this behavior might be ok. But
the original scenario reported is an issue because it replicates the
data of both the original table and the renamed schema's table.

Regards,
Vignesh

#5vignesh C
vignesh21@gmail.com
In reply to: vignesh C (#4)
1 attachment(s)
Re: Logical replication - schema change not invalidating the relation cache

On Fri, Jul 16, 2021 at 10:51 PM vignesh C <vignesh21@gmail.com> wrote:

On Sat, Jul 3, 2021 at 11:23 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Fri, Jul 2, 2021 at 12:03 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

Yeah, this looks like a bug. I will look at the patch.

While looking into this, I think the main cause of the problem is that
schema rename does not invalidate the relation cache right? I also
tried other cases e.g. if there is an open cursor and we rename the
schema

CREATE SCHEMA sch1;
CREATE TABLE sch1.t1(c1 int);
insert into sch1.t1 values(1);
insert into sch1.t1 values(2);
insert into sch1.t1 values(3);
BEGIN;
DECLARE mycur CURSOR FOR SELECT * FROM sch1.t1;
FETCH NEXT FROM mycur ;
----------At this point rename sch1 to sch2 from another session------
FETCH NEXT FROM mycur ;
UPDATE sch2.t1 SET c1 = 20 WHERE CURRENT OF mycur;
select * from sch2.t1 ;

So even after the schema rename the cursor is able to fetch and its
also able to update on the same table in the new schema, ideally using
CURRENT OF CUR, you can update the same table for which you have
declared the cursor. I am giving this example because this behavior
also looks somewhat similar.

It works in this case because it uses the relation id for performing
the next fetch and the relation id does not get changed after renaming
the schema. Also since it holds a lock on the relation, alter/drop
operations will not be allowed. I felt this behavior might be ok. But
the original scenario reported is an issue because it replicates the
data of both the original table and the renamed schema's table.

The previous patch was failing because of the recent test changes made
by commit 201a76183e2, which unified new and get_new_node. The attached
patch has been updated accordingly.

Regards,
Vignesh

Attachments:

v1-0001-Fix-for-invalidating-logical-replication-relation.patch (application/x-patch; name=v1-0001-Fix-for-invalidating-logical-replication-relation.patch) [Download]
From e86247f9502727f2c2e5f41489f8bbd4f69b24e4 Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignesh21@gmail.com>
Date: Thu, 26 Aug 2021 19:55:35 +0530
Subject: [PATCH v1] Fix for invalidating logical replication relations when
 there is a change in schema.

When the schema gets changed, the rel sync cache invalidation was not
happening, fixed it by adding a callback for schema change.
---
 src/backend/replication/pgoutput/pgoutput.c | 51 ++++++++++++++
 src/test/subscription/t/001_rep_changes.pl  | 76 ++++++++++++++++++++-
 2 files changed, 126 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 14d737fd93..1eec9f603d 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -141,6 +141,8 @@ static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
 										  uint32 hashvalue);
+static void rel_sync_cache_namespace_cb(Datum arg, int cacheid,
+										uint32 hashvalue);
 static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
 											TransactionId xid);
 static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
@@ -1068,6 +1070,9 @@ init_rel_sync_cache(MemoryContext cachectx)
 	CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
 								  rel_sync_cache_publication_cb,
 								  (Datum) 0);
+	CacheRegisterSyscacheCallback(NAMESPACEOID,
+								  rel_sync_cache_namespace_cb,
+								  (Datum) 0);
 }
 
 /*
@@ -1342,6 +1347,52 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 	}
 }
 
+/*
+ * Namespace syscache invalidation callback
+ */
+static void
+rel_sync_cache_namespace_cb(Datum arg, int cacheid, uint32 hashvalue)
+{
+	HASH_SEQ_STATUS status;
+	RelationSyncEntry *entry;
+
+	/*
+	 * We can get here if the plugin was used in SQL interface as the
+	 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
+	 * is no way to unregister the relcache invalidation callback.
+	 */
+	if (RelationSyncCache == NULL)
+		return;
+
+	hash_seq_init(&status, RelationSyncCache);
+	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
+	{
+		/*
+		 * Reset schema sent status as the relation definition may have changed.
+		 * Also free any objects that depended on the earlier definition.
+		 */
+		entry->schema_sent = false;
+		list_free(entry->streamed_txns);
+		entry->streamed_txns = NIL;
+		if (entry->map)
+		{
+			/*
+			 * Must free the TupleDescs contained in the map explicitly,
+			 * because free_conversion_map() doesn't.
+			 */
+			FreeTupleDesc(entry->map->indesc);
+			FreeTupleDesc(entry->map->outdesc);
+			free_conversion_map(entry->map);
+		}
+		entry->map = NULL;
+
+		if (hash_search(RelationSyncCache,
+				(void *) &entry->relid,
+				HASH_REMOVE, NULL) == NULL)
+			elog(ERROR, "hash table corrupted");
+	}
+}
+
 /*
  * Publication relation map syscache invalidation callback
  */
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 0c84d87873..88763771b2 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -6,7 +6,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 32;
+use Test::More tests => 33;
 
 # Initialize publisher node
 my $node_publisher = PostgresNode->new('publisher');
@@ -518,6 +518,80 @@ is($result, qq(0), 'check replication origin was dropped on subscriber');
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
 
+# Test schema invalidation by renaming the schema
+# Create tables on publisher
+# Initialize publisher node
+my $node_publisher1 = PostgresNode->new('publisher1');
+$node_publisher1->init(allows_streaming => 'logical');
+$node_publisher1->start;
+
+# Create subscriber node
+my $node_subscriber1 = PostgresNode->new('subscriber1');
+$node_subscriber1->init(allows_streaming => 'logical');
+$node_subscriber1->start;
+
+my $publisher1_connstr = $node_publisher1->connstr . ' dbname=postgres';
+
+$node_publisher1->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_publisher1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+
+# Create tables on subscriber
+$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch2");
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch2.t1 (c1 int)");
+
+# Setup logical replication that will only be used for this test
+$node_publisher1->safe_psql('postgres',
+        "CREATE PUBLICATION tap_pub_sch FOR ALL TABLES"
+);
+$node_subscriber1->safe_psql('postgres',
+        "CREATE SUBSCRIPTION tap_sub_sch CONNECTION '$publisher1_connstr' PUBLICATION tap_pub_sch"
+);
+
+$node_publisher1->wait_for_catchup('tap_sub_sch');
+
+# Also wait for initial table sync to finish
+$synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$node_publisher1->safe_psql('postgres',
+        "begin;
+insert into sch1.t1 values(1);
+alter schema sch1 rename to sch2;
+create schema sch1;
+create table sch1.t1(c1 int);
+insert into sch1.t1 values(2);
+insert into sch2.t1 values(3);
+commit;");
+
+$node_publisher1->wait_for_catchup('tap_sub_sch');
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Subscriber should not receive the inserted row for renamed schema
+$result =
+  $node_subscriber1->safe_psql('postgres', "SELECT * FROM sch1.t1");
+is($result, qq(1
+2), 'check rows on subscriber after schema invalidation');
+
+# Drop subscription as we don't need it anymore
+$node_subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_sch");
+
+# Drop publications as we don't need them anymore
+$node_publisher1->safe_psql('postgres', "DROP PUBLICATION tap_pub_sch");
+
+# Clean up the tables on both publisher and subscriber as we don't need them
+$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+
+$node_subscriber1->stop('fast');
+$node_publisher1->stop('fast');
+
 # CREATE PUBLICATION while wal_level=minimal should succeed, with a WARNING
 $node_publisher->append_conf(
 	'postgresql.conf', qq(
-- 
2.30.2

#6Michael Paquier
michael@paquier.xyz
In reply to: vignesh C (#5)
Re: Logical replication - schema change not invalidating the relation cache

On Thu, Aug 26, 2021 at 09:00:39PM +0530, vignesh C wrote:

The previous patch was failing because of the recent test changes made
by commit 201a76183e2 which unified new and get_new_node, attached
patch has the changes to handle the changes accordingly.

Please note that the CF app is complaining about this patch, so a
rebase is required. I have moved it to next CF, waiting on author,
for now.
--
Michael

#7vignesh C
vignesh21@gmail.com
In reply to: Michael Paquier (#6)
1 attachment(s)
Re: Logical replication - schema change not invalidating the relation cache

On Fri, Dec 3, 2021 at 1:13 PM Michael Paquier <michael@paquier.xyz> wrote:

On Thu, Aug 26, 2021 at 09:00:39PM +0530, vignesh C wrote:

The previous patch was failing because of the recent test changes made
by commit 201a76183e2 which unified new and get_new_node, attached
patch has the changes to handle the changes accordingly.

Please note that the CF app is complaining about this patch, so a
rebase is required. I have moved it to next CF, waiting on author,
for now.

Thanks for letting me know. I have rebased it on top of HEAD; the
attached v2 version has the rebased changes.

Regards,
Vignesh

Attachments:

v2-0001-Fix-for-invalidating-logical-replication-relation.patch (text/x-patch; charset=US-ASCII; name=v2-0001-Fix-for-invalidating-logical-replication-relation.patch) [Download]
From e91a4ff5d3f3d615de3e959c7dae691bda798195 Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignesh21@gmail.com>
Date: Fri, 3 Dec 2021 15:18:53 +0530
Subject: [PATCH v2] Fix for invalidating logical replication relations when
 there is a change in schema.

When the schema gets changed, the rel sync cache invalidation was not
happening, fixed it by adding a callback for schema change.
---
 src/backend/replication/pgoutput/pgoutput.c | 51 ++++++++++++++
 src/test/subscription/t/001_rep_changes.pl  | 76 ++++++++++++++++++++-
 2 files changed, 126 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 6f6a203dea..03a105dc86 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -141,6 +141,8 @@ static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
 										  uint32 hashvalue);
+static void rel_sync_cache_namespace_cb(Datum arg, int cacheid,
+										uint32 hashvalue);
 static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
 											TransactionId xid);
 static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
@@ -1071,6 +1073,9 @@ init_rel_sync_cache(MemoryContext cachectx)
 	CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
 								  rel_sync_cache_publication_cb,
 								  (Datum) 0);
+	CacheRegisterSyscacheCallback(NAMESPACEOID,
+								  rel_sync_cache_namespace_cb,
+								  (Datum) 0);
 }
 
 /*
@@ -1357,6 +1362,52 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 	}
 }
 
+/*
+ * Namespace syscache invalidation callback
+ */
+static void
+rel_sync_cache_namespace_cb(Datum arg, int cacheid, uint32 hashvalue)
+{
+	HASH_SEQ_STATUS status;
+	RelationSyncEntry *entry;
+
+	/*
+	 * We can get here if the plugin was used in SQL interface as the
+	 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
+	 * is no way to unregister the relcache invalidation callback.
+	 */
+	if (RelationSyncCache == NULL)
+		return;
+
+	hash_seq_init(&status, RelationSyncCache);
+	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
+	{
+		/*
+		 * Reset schema sent status as the relation definition may have changed.
+		 * Also free any objects that depended on the earlier definition.
+		 */
+		entry->schema_sent = false;
+		list_free(entry->streamed_txns);
+		entry->streamed_txns = NIL;
+		if (entry->map)
+		{
+			/*
+			 * Must free the TupleDescs contained in the map explicitly,
+			 * because free_conversion_map() doesn't.
+			 */
+			FreeTupleDesc(entry->map->indesc);
+			FreeTupleDesc(entry->map->outdesc);
+			free_conversion_map(entry->map);
+		}
+		entry->map = NULL;
+
+		if (hash_search(RelationSyncCache,
+				(void *) &entry->relid,
+				HASH_REMOVE, NULL) == NULL)
+			elog(ERROR, "hash table corrupted");
+	}
+}
+
 /*
  * Publication relation/schema map syscache invalidation callback
  */
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 9531d81f19..622c508b3b 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -6,7 +6,7 @@ use strict;
 use warnings;
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
-use Test::More tests => 32;
+use Test::More tests => 33;
 
 # Initialize publisher node
 my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
@@ -520,6 +520,80 @@ is($result, qq(0), 'check replication origin was dropped on subscriber');
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
 
+# Test schema invalidation by renaming the schema
+# Create tables on publisher
+# Initialize publisher node
+my $node_publisher1 = PostgreSQL::Test::Cluster->new('publisher1');
+$node_publisher1->init(allows_streaming => 'logical');
+$node_publisher1->start;
+
+# Create subscriber node
+my $node_subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1');
+$node_subscriber1->init(allows_streaming => 'logical');
+$node_subscriber1->start;
+
+my $publisher1_connstr = $node_publisher1->connstr . ' dbname=postgres';
+
+$node_publisher1->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_publisher1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+
+# Create tables on subscriber
+$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch2");
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch2.t1 (c1 int)");
+
+# Setup logical replication that will only be used for this test
+$node_publisher1->safe_psql('postgres',
+        "CREATE PUBLICATION tap_pub_sch FOR ALL TABLES"
+);
+$node_subscriber1->safe_psql('postgres',
+        "CREATE SUBSCRIPTION tap_sub_sch CONNECTION '$publisher1_connstr' PUBLICATION tap_pub_sch"
+);
+
+$node_publisher1->wait_for_catchup('tap_sub_sch');
+
+# Also wait for initial table sync to finish
+$synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$node_publisher1->safe_psql('postgres',
+        "begin;
+insert into sch1.t1 values(1);
+alter schema sch1 rename to sch2;
+create schema sch1;
+create table sch1.t1(c1 int);
+insert into sch1.t1 values(2);
+insert into sch2.t1 values(3);
+commit;");
+
+$node_publisher1->wait_for_catchup('tap_sub_sch');
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Subscriber should not receive the inserted row for renamed schema
+$result =
+  $node_subscriber1->safe_psql('postgres', "SELECT * FROM sch1.t1");
+is($result, qq(1
+2), 'check rows on subscriber after schema invalidation');
+
+# Drop subscription as we don't need it anymore
+$node_subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_sch");
+
+# Drop publications as we don't need them anymore
+$node_publisher1->safe_psql('postgres', "DROP PUBLICATION tap_pub_sch");
+
+# Clean up the tables on both publisher and subscriber as we don't need them
+$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+
+$node_subscriber1->stop('fast');
+$node_publisher1->stop('fast');
+
 # CREATE PUBLICATION while wal_level=minimal should succeed, with a WARNING
 $node_publisher->append_conf(
 	'postgresql.conf', qq(
-- 
2.32.0

#8vignesh C
vignesh21@gmail.com
In reply to: vignesh C (#7)
1 attachment(s)
Re: Logical replication - schema change not invalidating the relation cache

On Fri, Dec 3, 2021 at 3:21 PM vignesh C <vignesh21@gmail.com> wrote:

On Fri, Dec 3, 2021 at 1:13 PM Michael Paquier <michael@paquier.xyz> wrote:

On Thu, Aug 26, 2021 at 09:00:39PM +0530, vignesh C wrote:

The previous patch was failing because of the recent test changes made
by commit 201a76183e2 which unified new and get_new_node, attached
patch has the changes to handle the changes accordingly.

Please note that the CF app is complaining about this patch, so a
rebase is required. I have moved it to next CF, waiting on author,
for now.

Thanks for letting me know, I have rebased it on top of HEAD, the
attached v2 version has the rebased changes.

The patch no longer applied on top of HEAD; attached is the v3 version,
which has the rebased changes.

Regards,
Vignesh

Attachments:

v3-0001-Fix-for-invalidating-logical-replication-relation.patch (text/x-patch; charset=US-ASCII; name=v3-0001-Fix-for-invalidating-logical-replication-relation.patch) [Download]
From ad79b95e9362239e32cc597aeac1d6e22aaa7504 Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignesh21@gmail.com>
Date: Sat, 12 Mar 2022 13:04:55 +0530
Subject: [PATCH v3] Fix for invalidating logical replication relations when
 there is a change in schema.

When the schema gets changed, the rel sync cache invalidation was not
happening, fixed it by adding a callback for schema change.
---
 src/backend/replication/pgoutput/pgoutput.c | 44 ++++++++++++
 src/test/subscription/t/001_rep_changes.pl  | 74 +++++++++++++++++++++
 2 files changed, 118 insertions(+)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index ea57a0477f..2ce634a90b 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -176,6 +176,8 @@ static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
 										  uint32 hashvalue);
+static void rel_sync_cache_namespace_cb(Datum arg, int cacheid,
+										uint32 hashvalue);
 static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
 											TransactionId xid);
 static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
@@ -1658,6 +1660,9 @@ init_rel_sync_cache(MemoryContext cachectx)
 	CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
 								  rel_sync_cache_publication_cb,
 								  (Datum) 0);
+	CacheRegisterSyscacheCallback(NAMESPACEOID,
+								  rel_sync_cache_namespace_cb,
+								  (Datum) 0);
 }
 
 /*
@@ -1989,6 +1994,45 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 	}
 }
 
+/*
+ * Namespace syscache invalidation callback
+ */
+static void
+rel_sync_cache_namespace_cb(Datum arg, int cacheid, uint32 hashvalue)
+{
+	HASH_SEQ_STATUS status;
+	RelationSyncEntry *entry;
+
+	/*
+	 * We can get here if the plugin was used in SQL interface as the
+	 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
+	 * is no way to unregister the relcache invalidation callback.
+	 */
+	if (RelationSyncCache == NULL)
+		return;
+
+	hash_seq_init(&status, RelationSyncCache);
+	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
+	{
+		/*
+		 * Reset schema sent status as the relation definition may have changed.
+		 * Also free any objects that depended on the earlier definition.
+		 */
+		entry->schema_sent = false;
+		list_free(entry->streamed_txns);
+		entry->streamed_txns = NIL;
+
+		if (entry->attrmap)
+			free_attrmap(entry->attrmap);
+		entry->attrmap = NULL;
+
+		if (hash_search(RelationSyncCache,
+				(void *) &entry->relid,
+				HASH_REMOVE, NULL) == NULL)
+			elog(ERROR, "hash table corrupted");
+	}
+}
+
 /*
  * Publication relation/schema map syscache invalidation callback
  *
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index af0cff6a30..4b7b9e54ff 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -520,6 +520,80 @@ is($result, qq(0), 'check replication origin was dropped on subscriber');
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
 
+# Test schema invalidation by renaming the schema
+# Create tables on publisher
+# Initialize publisher node
+my $node_publisher1 = PostgreSQL::Test::Cluster->new('publisher1');
+$node_publisher1->init(allows_streaming => 'logical');
+$node_publisher1->start;
+
+# Create subscriber node
+my $node_subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1');
+$node_subscriber1->init(allows_streaming => 'logical');
+$node_subscriber1->start;
+
+my $publisher1_connstr = $node_publisher1->connstr . ' dbname=postgres';
+
+$node_publisher1->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_publisher1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+
+# Create tables on subscriber
+$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch2");
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch2.t1 (c1 int)");
+
+# Setup logical replication that will only be used for this test
+$node_publisher1->safe_psql('postgres',
+        "CREATE PUBLICATION tap_pub_sch FOR ALL TABLES"
+);
+$node_subscriber1->safe_psql('postgres',
+        "CREATE SUBSCRIPTION tap_sub_sch CONNECTION '$publisher1_connstr' PUBLICATION tap_pub_sch"
+);
+
+$node_publisher1->wait_for_catchup('tap_sub_sch');
+
+# Also wait for initial table sync to finish
+$synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$node_publisher1->safe_psql('postgres',
+        "begin;
+insert into sch1.t1 values(1);
+alter schema sch1 rename to sch2;
+create schema sch1;
+create table sch1.t1(c1 int);
+insert into sch1.t1 values(2);
+insert into sch2.t1 values(3);
+commit;");
+
+$node_publisher1->wait_for_catchup('tap_sub_sch');
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Subscriber should not receive the inserted row for renamed schema
+$result =
+  $node_subscriber1->safe_psql('postgres', "SELECT * FROM sch1.t1");
+is($result, qq(1
+2), 'check rows on subscriber after schema invalidation');
+
+# Drop subscription as we don't need it anymore
+$node_subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_sch");
+
+# Drop publications as we don't need them anymore
+$node_publisher1->safe_psql('postgres', "DROP PUBLICATION tap_pub_sch");
+
+# Clean up the tables on both publisher and subscriber as we don't need them
+$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+
+$node_subscriber1->stop('fast');
+$node_publisher1->stop('fast');
+
 # CREATE PUBLICATION while wal_level=minimal should succeed, with a WARNING
 $node_publisher->append_conf(
 	'postgresql.conf', qq(
-- 
2.32.0

#9vignesh C
vignesh21@gmail.com
In reply to: vignesh C (#8)
1 attachment(s)
Re: Logical replication - schema change not invalidating the relation cache

On Sat, Mar 12, 2022 at 1:29 PM vignesh C <vignesh21@gmail.com> wrote:

On Fri, Dec 3, 2021 at 3:21 PM vignesh C <vignesh21@gmail.com> wrote:

On Fri, Dec 3, 2021 at 1:13 PM Michael Paquier <michael@paquier.xyz> wrote:

On Thu, Aug 26, 2021 at 09:00:39PM +0530, vignesh C wrote:

The previous patch was failing because of the recent test changes made
by commit 201a76183e2 which unified new and get_new_node, attached
patch has the changes to handle the changes accordingly.

Please note that the CF app is complaining about this patch, so a
rebase is required. I have moved it to next CF, waiting on author,
for now.

Thanks for letting me know, I have rebased it on top of HEAD, the
attached v2 version has the rebased changes.

The patch was not applying on top of the HEAD, attached v3 version
which has the rebased changes.

The patch needed to be rebased on top of HEAD because of commit
"0c20dd33db1607d6a85ffce24238c1e55e384b49"; attached is a rebased v3
version with the corresponding changes.

Regards,
Vignesh

Attachments:

v3-0001-Fix-for-invalidating-logical-replication-relation.patchapplication/octet-stream; name=v3-0001-Fix-for-invalidating-logical-replication-relation.patchDownload
From 4413489e37b4a196d938e340a10da5eb13317dcd Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignesh21@gmail.com>
Date: Sat, 12 Mar 2022 13:04:55 +0530
Subject: [PATCH v3] Fix for invalidating logical replication relations when
 there is a change in schema.

When the schema gets changed, the rel sync cache invalidation was not
happening, fixed it by adding a callback for schema change.
---
 src/backend/replication/pgoutput/pgoutput.c | 44 ++++++++++++++
 src/test/subscription/t/001_rep_changes.pl  | 67 +++++++++++++++++++++
 2 files changed, 111 insertions(+)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index a3c1ba8a40..9afe09c49f 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -219,6 +219,8 @@ static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
 										  uint32 hashvalue);
+static void rel_sync_cache_namespace_cb(Datum arg, int cacheid,
+										uint32 hashvalue);
 static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
 											TransactionId xid);
 static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
@@ -1938,6 +1940,9 @@ init_rel_sync_cache(MemoryContext cachectx)
 	CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
 								  rel_sync_cache_publication_cb,
 								  (Datum) 0);
+	CacheRegisterSyscacheCallback(NAMESPACEOID,
+								  rel_sync_cache_namespace_cb,
+								  (Datum) 0);
 }
 
 /*
@@ -2324,6 +2329,45 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 	}
 }
 
+/*
+ * Namespace syscache invalidation callback
+ */
+static void
+rel_sync_cache_namespace_cb(Datum arg, int cacheid, uint32 hashvalue)
+{
+	HASH_SEQ_STATUS status;
+	RelationSyncEntry *entry;
+
+	/*
+	 * We can get here if the plugin was used in SQL interface as the
+	 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
+	 * is no way to unregister the relcache invalidation callback.
+	 */
+	if (RelationSyncCache == NULL)
+		return;
+
+	hash_seq_init(&status, RelationSyncCache);
+	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
+	{
+		/*
+		 * Reset schema sent status as the relation definition may have changed.
+		 * Also free any objects that depended on the earlier definition.
+		 */
+		entry->schema_sent = false;
+		list_free(entry->streamed_txns);
+		entry->streamed_txns = NIL;
+
+		if (entry->attrmap)
+			free_attrmap(entry->attrmap);
+		entry->attrmap = NULL;
+
+		if (hash_search(RelationSyncCache,
+				(void *) &entry->relid,
+				HASH_REMOVE, NULL) == NULL)
+			elog(ERROR, "hash table corrupted");
+	}
+}
+
 /*
  * Publication relation/schema map syscache invalidation callback
  *
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index c5b5be419c..719cd42084 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -542,6 +542,73 @@ is($result, qq(0), 'check replication origin was dropped on subscriber');
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
 
+# Test schema invalidation by renaming the schema
+# Create tables on publisher
+# Initialize publisher node
+my $node_publisher1 = PostgreSQL::Test::Cluster->new('publisher1');
+$node_publisher1->init(allows_streaming => 'logical');
+$node_publisher1->start;
+
+# Create subscriber node
+my $node_subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1');
+$node_subscriber1->init(allows_streaming => 'logical');
+$node_subscriber1->start;
+
+my $publisher1_connstr = $node_publisher1->connstr . ' dbname=postgres';
+
+$node_publisher1->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_publisher1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+
+# Create tables on subscriber
+$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch2");
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch2.t1 (c1 int)");
+
+# Setup logical replication that will only be used for this test
+$node_publisher1->safe_psql('postgres',
+        "CREATE PUBLICATION tap_pub_sch FOR ALL TABLES"
+);
+$node_subscriber1->safe_psql('postgres',
+        "CREATE SUBSCRIPTION tap_sub_sch CONNECTION '$publisher1_connstr' PUBLICATION tap_pub_sch"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber1->wait_for_subscription_sync($node_publisher1, 'tap_sub_sch');
+
+$node_publisher1->safe_psql('postgres',
+        "begin;
+insert into sch1.t1 values(1);
+alter schema sch1 rename to sch2;
+create schema sch1;
+create table sch1.t1(c1 int);
+insert into sch1.t1 values(2);
+insert into sch2.t1 values(3);
+commit;");
+
+$node_subscriber1->wait_for_subscription_sync($node_publisher1, 'tap_sub_sch');
+
+# Subscriber should not receive the inserted row for renamed schema
+$result =
+  $node_subscriber1->safe_psql('postgres', "SELECT * FROM sch1.t1");
+is($result, qq(1
+2), 'check rows on subscriber after schema invalidation');
+
+# Drop subscription as we don't need it anymore
+$node_subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_sch");
+
+# Drop publications as we don't need them anymore
+$node_publisher1->safe_psql('postgres', "DROP PUBLICATION tap_pub_sch");
+
+# Clean up the tables on both publisher and subscriber as we don't need them
+$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+
+$node_subscriber1->stop('fast');
+$node_publisher1->stop('fast');
+
 # CREATE PUBLICATION while wal_level=minimal should succeed, with a WARNING
 $node_publisher->append_conf(
 	'postgresql.conf', qq(
-- 
2.32.0

#10Tom Lane
tgl@sss.pgh.pa.us
In reply to: vignesh C (#9)
Re: Logical replication - schema change not invalidating the relation cache

vignesh C <vignesh21@gmail.com> writes:

[ v3-0001-Fix-for-invalidating-logical-replication-relation.patch ]

(btw, please don't send multiple patch versions with the same number,
it's very confusing.)

I looked briefly at this patch. I wonder why you wrote a whole new
callback function instead of just using rel_sync_cache_publication_cb
for this case too.

The bigger picture here though is that in examples such as the one
you gave at the top of the thread, it's not very clear to me that
there's *any* principled behavior. If the connection between publisher
and subscriber tables is only the relation name, fine ... but exactly
which relation name applies? If you've got a transaction that is both
inserting some data and renaming the table, it's really debatable which
insertions should be sent under which name(s). So how much should we
actually care about such cases? Do we really want to force a cache flush
any time somebody changes a (possibly unrelated) pg_namespace entry?
We could be giving up significant performance and not accomplishing much
except changing from one odd behavior to a different one.

regards, tom lane

#11vignesh C
vignesh21@gmail.com
In reply to: Tom Lane (#10)
1 attachment(s)
Re: Logical replication - schema change not invalidating the relation cache

On Thu, 5 Jan 2023 at 03:17, Tom Lane <tgl@sss.pgh.pa.us> wrote:

vignesh C <vignesh21@gmail.com> writes:

[ v3-0001-Fix-for-invalidating-logical-replication-relation.patch ]

(btw, please don't send multiple patch versions with the same number,
it's very confusing.)

Since it was just a rebase on top of HEAD, I did not change the
version number. I will take care of this point in later versions.

I looked briefly at this patch. I wonder why you wrote a whole new
callback function instead of just using rel_sync_cache_publication_cb
for this case too.

Yes, we can use rel_sync_cache_publication_cb itself for the
invalidation of the relations; I have changed the patch accordingly.

The bigger picture here though is that in examples such as the one
you gave at the top of the thread, it's not very clear to me that
there's *any* principled behavior. If the connection between publisher
and subscriber tables is only the relation name, fine ... but exactly
which relation name applies? If you've got a transaction that is both
inserting some data and renaming the table, it's really debatable which
insertions should be sent under which name(s). So how much should we
actually care about such cases? Do we really want to force a cache flush
any time somebody changes a (possibly unrelated) pg_namespace entry?
We could be giving up significant performance and not accomplishing much
except changing from one odd behavior to a different one.

The connection between publisher and subscriber tables is based on the
relation id. With the first change, the relid, relname and schema name
from the publisher are sent to the subscriber. The subscriber stores
this id, relname and schema name in the LogicalRepRelMap hash, for which
the relation id is the key. Subsequent data received by the subscriber
uses the relation id received from the publisher to apply the
changes on the subscriber.
The problem does not stop even after the transaction that renames the
schema has completed (Step 3 in the first mail). Even after that
transaction is completed, i.e. after Step 3, inserts into sch1.t1 and
sch2.t1 both get replicated to sch1.t1 on the subscriber side. This
happens because the publisher relation ids of sch2.t1 and sch1.t1 are
both mapped to sch1.t1 on the subscriber side, so both inserts succeed.
Step4) In Publisher
postgres=# insert into sch2.t1 values(11);
INSERT 0 1
postgres=# insert into sch1.t1 values(12);
INSERT 0 1

Step5) In Subscriber
postgres=# select * from sch1.t1;
c1
----
11
12
(2 rows)

On the first insert into sch1.t1, the relid, relname and schema name
from the publisher are sent to the subscriber. This entry is mapped to
sch1.t1 on the subscriber side, so any insert from the publisher goes
into sch1.t1.
After the schema rename (which does not change the relid), this entry
is not invalidated. So even though we are now inserting into sch2.t1 on
the publisher, the relid is unchanged and the subscriber continues to
insert into its sch1.t1.
On the first insert into the new table sch1.t1, its relid, relname and
schema name are sent to the subscriber, and this entry is again mapped
to sch1.t1 on the subscriber side.
Since the entries for both sch1.t1 and sch2.t1 are mapped to sch1.t1 on
the subscriber side, both inserts go into the same table.
This issue will be fixed if we invalidate the relation and update the
relmap on the subscriber.
I did not like the behavior where any insert on sch1.t1 or sch2.t1
replicates the changes to sch1.t1 on the subscriber. I felt it might
be better to fix this issue. I agree that the invalidations are
costly. If you feel this is too much of a corner case, then we can
skip it.

Attached an updated patch.

Regards,
Vignesh

Attachments:

v4-0001-Fix-for-invalidating-logical-replication-relation.patchtext/x-patch; charset=US-ASCII; name=v4-0001-Fix-for-invalidating-logical-replication-relation.patchDownload
From 74211d016528699205163dab8ecc7fe04aec09b2 Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignesh21@gmail.com>
Date: Sat, 12 Mar 2022 13:04:55 +0530
Subject: [PATCH v4] Fix for invalidating logical replication relations when
 there is a change in schema.

When the schema gets changed, the rel sync cache invalidation was not
happening, fixed it by adding a callback for schema change.
---
 src/backend/replication/pgoutput/pgoutput.c |  3 +
 src/test/subscription/t/001_rep_changes.pl  | 67 +++++++++++++++++++++
 2 files changed, 70 insertions(+)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 7737242516..f68c271cf2 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1936,6 +1936,9 @@ init_rel_sync_cache(MemoryContext cachectx)
 	CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
 								  rel_sync_cache_publication_cb,
 								  (Datum) 0);
+	CacheRegisterSyscacheCallback(NAMESPACEOID,
+								  rel_sync_cache_publication_cb,
+								  (Datum) 0);
 }
 
 /*
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 91aa068c95..3e91640c03 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -542,6 +542,73 @@ is($result, qq(0), 'check replication origin was dropped on subscriber');
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
 
+# Test schema invalidation by renaming the schema
+# Create tables on publisher
+# Initialize publisher node
+my $node_publisher1 = PostgreSQL::Test::Cluster->new('publisher1');
+$node_publisher1->init(allows_streaming => 'logical');
+$node_publisher1->start;
+
+# Create subscriber node
+my $node_subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1');
+$node_subscriber1->init(allows_streaming => 'logical');
+$node_subscriber1->start;
+
+my $publisher1_connstr = $node_publisher1->connstr . ' dbname=postgres';
+
+$node_publisher1->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_publisher1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+
+# Create tables on subscriber
+$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch2");
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch2.t1 (c1 int)");
+
+# Setup logical replication that will only be used for this test
+$node_publisher1->safe_psql('postgres',
+        "CREATE PUBLICATION tap_pub_sch FOR ALL TABLES"
+);
+$node_subscriber1->safe_psql('postgres',
+        "CREATE SUBSCRIPTION tap_sub_sch CONNECTION '$publisher1_connstr' PUBLICATION tap_pub_sch"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber1->wait_for_subscription_sync($node_publisher1, 'tap_sub_sch');
+
+$node_publisher1->safe_psql('postgres',
+        "begin;
+insert into sch1.t1 values(1);
+alter schema sch1 rename to sch2;
+create schema sch1;
+create table sch1.t1(c1 int);
+insert into sch1.t1 values(2);
+insert into sch2.t1 values(3);
+commit;");
+
+$node_subscriber1->wait_for_subscription_sync($node_publisher1, 'tap_sub_sch');
+
+# Subscriber should not receive the inserted row for renamed schema
+$result =
+  $node_subscriber1->safe_psql('postgres', "SELECT * FROM sch1.t1");
+is($result, qq(1
+2), 'check rows on subscriber after schema invalidation');
+
+# Drop subscription as we don't need it anymore
+$node_subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_sch");
+
+# Drop publications as we don't need them anymore
+$node_publisher1->safe_psql('postgres', "DROP PUBLICATION tap_pub_sch");
+
+# Clean up the tables on both publisher and subscriber as we don't need them
+$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+
+$node_subscriber1->stop('fast');
+$node_publisher1->stop('fast');
+
 # CREATE PUBLICATION while wal_level=minimal should succeed, with a WARNING
 $node_publisher->append_conf(
 	'postgresql.conf', qq(
-- 
2.34.1

#12Tom Lane
tgl@sss.pgh.pa.us
In reply to: vignesh C (#11)
1 attachment(s)
Re: Logical replication - schema change not invalidating the relation cache

vignesh C <vignesh21@gmail.com> writes:

On Thu, 5 Jan 2023 at 03:17, Tom Lane <tgl@sss.pgh.pa.us> wrote:

The bigger picture here though is that in examples such as the one
you gave at the top of the thread, it's not very clear to me that
there's *any* principled behavior. If the connection between publisher
and subscriber tables is only the relation name, fine ... but exactly
which relation name applies?

The connection between publisher and subscriber tables is based on the
relation id. With the first change, the relid, relname and schema name
from the publisher are sent to the subscriber. The subscriber stores
this id, relname and schema name in the LogicalRepRelMap hash, for which
the relation id is the key. Subsequent data received by the subscriber
uses the relation id received from the publisher to apply the
changes on the subscriber.

Hm. I spent some time cleaning up this patch, and found that there's
still a problem. ISTM that the row with value "3" ought to end up
in the subscriber's sch2.t1 table, but it does not: the attached
test script fails with

t/100_bugs.pl .. 6/?
# Failed test 'check data in subscriber sch2.t1 after schema rename'
# at t/100_bugs.pl line 361.
# got: ''
# expected: '3'
# Looks like you failed 1 test of 9.
t/100_bugs.pl .. Dubious, test returned 1 (wstat 256, 0x100)
Failed 1/9 subtests

What's up with that?

regards, tom lane

Attachments:

v5-0001-Fix-for-invalidating-logical-replication-relation.patchtext/x-diff; charset=us-ascii; name*0=v5-0001-Fix-for-invalidating-logical-replication-relation.p; name*1=atchDownload
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 7737242516..876adab38e 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1929,7 +1929,22 @@ init_rel_sync_cache(MemoryContext cachectx)
 
 	Assert(RelationSyncCache != NULL);
 
+	/* We must update the cache entry for a relation after a relcache flush */
 	CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
+
+	/*
+	 * Flush all cache entries after a pg_namespace change, in case it was a
+	 * schema rename affecting a relation being replicated.
+	 */
+	CacheRegisterSyscacheCallback(NAMESPACEOID,
+								  rel_sync_cache_publication_cb,
+								  (Datum) 0);
+
+	/*
+	 * Flush all cache entries after any publication changes.  (We need no
+	 * callback entry for pg_publication, because publication_invalidation_cb
+	 * will take care of it.)
+	 */
 	CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
 								  rel_sync_cache_publication_cb,
 								  (Datum) 0);
@@ -2325,8 +2340,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 /*
  * Publication relation/schema map syscache invalidation callback
  *
- * Called for invalidations on pg_publication, pg_publication_rel, and
- * pg_publication_namespace.
+ * Called for invalidations on pg_publication, pg_publication_rel,
+ * pg_publication_namespace, and pg_namespace.
  */
 static void
 rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -2337,14 +2352,14 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	/*
 	 * We can get here if the plugin was used in SQL interface as the
 	 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
-	 * is no way to unregister the relcache invalidation callback.
+	 * is no way to unregister the invalidation callbacks.
 	 */
 	if (RelationSyncCache == NULL)
 		return;
 
 	/*
-	 * There is no way to find which entry in our cache the hash belongs to so
-	 * mark the whole cache as invalid.
+	 * We have no easy way to identify which cache entries this invalidation
+	 * event might have affected, so just mark them all invalid.
 	 */
 	hash_seq_init(&status, RelationSyncCache);
 	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl
index 143caac792..de827f8777 100644
--- a/src/test/subscription/t/100_bugs.pl
+++ b/src/test/subscription/t/100_bugs.pl
@@ -70,9 +70,10 @@ $node_publisher->wait_for_catchup('sub1');
 pass('index predicates do not cause crash');
 
 # We'll re-use these nodes below, so drop their replication state.
-# We don't bother to drop the tables though.
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub1");
 $node_publisher->safe_psql('postgres', "DROP PUBLICATION pub1");
+# Drop the tables too.
+$node_publisher->safe_psql('postgres', "DROP TABLE tab1");
 
 $node_publisher->stop('fast');
 $node_subscriber->stop('fast');
@@ -307,6 +308,58 @@ is( $node_subscriber->safe_psql(
 	qq(-1|1),
 	"update works with REPLICA IDENTITY");
 
+# Clean up
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub");
+$node_publisher->safe_psql('postgres', "DROP TABLE tab_replidentity_index");
+$node_subscriber->safe_psql('postgres', "DROP TABLE tab_replidentity_index");
+
+# Test schema invalidation by renaming the schema
+
+# Create tables on publisher
+$node_publisher->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_publisher->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+
+# Create tables on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch2");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch2.t1 (c1 int)");
+
+# Setup logical replication that will cover t1 under both schema names
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_sch FOR ALL TABLES");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_sch CONNECTION '$publisher_connstr' PUBLICATION tap_pub_sch"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_sch');
+
+# Check what happens to data inserted before and after schema rename
+$node_publisher->safe_psql(
+	'postgres',
+	"begin;
+insert into sch1.t1 values(1);
+alter schema sch1 rename to sch2;
+create schema sch1;
+create table sch1.t1(c1 int);
+insert into sch1.t1 values(2);
+insert into sch2.t1 values(3);
+commit;");
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_sch');
+
+# Subscriber's sch1.t1 should receive the row inserted into the new sch1.t1,
+# but not the row inserted into the old sch1.t1 post-rename.
+my $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1");
+is( $result, qq(1
+2), 'check data in subscriber sch1.t1 after schema rename');
+
+# Instead, that row should appear in sch2.t1.
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch2.t1");
+is($result, qq(3), 'check data in subscriber sch2.t1 after schema rename');
+
 $node_publisher->stop('fast');
 $node_subscriber->stop('fast');
 
#13vignesh C
vignesh21@gmail.com
In reply to: Tom Lane (#12)
1 attachment(s)
Re: Logical replication - schema change not invalidating the relation cache

On Fri, 6 Jan 2023 at 04:32, Tom Lane <tgl@sss.pgh.pa.us> wrote:

vignesh C <vignesh21@gmail.com> writes:

On Thu, 5 Jan 2023 at 03:17, Tom Lane <tgl@sss.pgh.pa.us> wrote:

The bigger picture here though is that in examples such as the one
you gave at the top of the thread, it's not very clear to me that
there's *any* principled behavior. If the connection between publisher
and subscriber tables is only the relation name, fine ... but exactly
which relation name applies?

The connection between publisher and subscriber tables is based on the
relation id. With the first change, the relid, relname and schema name
from the publisher are sent to the subscriber. The subscriber stores
this id, relname and schema name in the LogicalRepRelMap hash, for which
the relation id is the key. Subsequent data received by the subscriber
uses the relation id received from the publisher to apply the
changes on the subscriber.

Hm. I spent some time cleaning up this patch, and found that there's
still a problem. ISTM that the row with value "3" ought to end up
in the subscriber's sch2.t1 table, but it does not: the attached
test script fails with

t/100_bugs.pl .. 6/?
# Failed test 'check data in subscriber sch2.t1 after schema rename'
# at t/100_bugs.pl line 361.
# got: ''
# expected: '3'
# Looks like you failed 1 test of 9.
t/100_bugs.pl .. Dubious, test returned 1 (wstat 256, 0x100)
Failed 1/9 subtests

What's up with that?

When the subscription is created, the subscriber creates a
subscription relation map of the corresponding relations from the
publication. The subscription relation map will only have an entry for
sch1.t1. As sch2.t1 was not present on the publisher when the
subscription was created, the subscription will not have this entry in
its relation map, so insert operations performed on the new table
sch2.t1 will not be applied by the subscriber. We have to refresh the
publication using 'ALTER SUBSCRIPTION ... REFRESH PUBLICATION' to fetch
the missing table information from the publisher. This will start
replication of tables that were added to the subscribed-to
publications since CREATE SUBSCRIPTION or the last invocation of
REFRESH PUBLICATION.
I have modified the test to include 'ALTER SUBSCRIPTION ... REFRESH
PUBLICATION' to get the new data. The test should expect 1 & 3 for
sch2.t1, as the record with value 1 was already inserted before the
rename. The updated v6 patch has these changes.

Regards,
Vignesh

Attachments:

v6-0001-Fix-for-invalidating-logical-replication-relation.patchtext/x-patch; charset=US-ASCII; name=v6-0001-Fix-for-invalidating-logical-replication-relation.patchDownload
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 7737242516..876adab38e 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1929,7 +1929,22 @@ init_rel_sync_cache(MemoryContext cachectx)
 
 	Assert(RelationSyncCache != NULL);
 
+	/* We must update the cache entry for a relation after a relcache flush */
 	CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
+
+	/*
+	 * Flush all cache entries after a pg_namespace change, in case it was a
+	 * schema rename affecting a relation being replicated.
+	 */
+	CacheRegisterSyscacheCallback(NAMESPACEOID,
+								  rel_sync_cache_publication_cb,
+								  (Datum) 0);
+
+	/*
+	 * Flush all cache entries after any publication changes.  (We need no
+	 * callback entry for pg_publication, because publication_invalidation_cb
+	 * will take care of it.)
+	 */
 	CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
 								  rel_sync_cache_publication_cb,
 								  (Datum) 0);
@@ -2325,8 +2340,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 /*
  * Publication relation/schema map syscache invalidation callback
  *
- * Called for invalidations on pg_publication, pg_publication_rel, and
- * pg_publication_namespace.
+ * Called for invalidations on pg_publication, pg_publication_rel,
+ * pg_publication_namespace, and pg_namespace.
  */
 static void
 rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -2337,14 +2352,14 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	/*
 	 * We can get here if the plugin was used in SQL interface as the
 	 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
-	 * is no way to unregister the relcache invalidation callback.
+	 * is no way to unregister the invalidation callbacks.
 	 */
 	if (RelationSyncCache == NULL)
 		return;
 
 	/*
-	 * There is no way to find which entry in our cache the hash belongs to so
-	 * mark the whole cache as invalid.
+	 * We have no easy way to identify which cache entries this invalidation
+	 * event might have affected, so just mark them all invalid.
 	 */
 	hash_seq_init(&status, RelationSyncCache);
 	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl
index 143caac792..ed310184ff 100644
--- a/src/test/subscription/t/100_bugs.pl
+++ b/src/test/subscription/t/100_bugs.pl
@@ -70,9 +70,10 @@ $node_publisher->wait_for_catchup('sub1');
 pass('index predicates do not cause crash');
 
 # We'll re-use these nodes below, so drop their replication state.
-# We don't bother to drop the tables though.
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub1");
 $node_publisher->safe_psql('postgres', "DROP PUBLICATION pub1");
+# Drop the tables too.
+$node_publisher->safe_psql('postgres', "DROP TABLE tab1");
 
 $node_publisher->stop('fast');
 $node_subscriber->stop('fast');
@@ -307,6 +308,64 @@ is( $node_subscriber->safe_psql(
 	qq(-1|1),
 	"update works with REPLICA IDENTITY");
 
+# Clean up
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub");
+$node_publisher->safe_psql('postgres', "DROP TABLE tab_replidentity_index");
+$node_subscriber->safe_psql('postgres', "DROP TABLE tab_replidentity_index");
+
+# Test schema invalidation by renaming the schema
+
+# Create tables on publisher
+$node_publisher->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_publisher->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+
+# Create tables on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch2");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch2.t1 (c1 int)");
+
+# Setup logical replication that will cover t1 under both schema names
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_sch FOR ALL TABLES");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_sch CONNECTION '$publisher_connstr' PUBLICATION tap_pub_sch"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_sch');
+
+# Check what happens to data inserted before and after schema rename
+$node_publisher->safe_psql(
+	'postgres',
+	"begin;
+insert into sch1.t1 values(1);
+alter schema sch1 rename to sch2;
+create schema sch1;
+create table sch1.t1(c1 int);
+insert into sch1.t1 values(2);
+insert into sch2.t1 values(3);
+commit;");
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_sch');
+
+# Subscriber's sch1.t1 should receive the row inserted into the new sch1.t1,
+# but not the row inserted into the old sch1.t1 post-rename.
+my $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1");
+is( $result, qq(1
+2), 'check data in subscriber sch1.t1 after schema rename');
+
+$node_subscriber->safe_psql('postgres',
+	'ALTER SUBSCRIPTION tap_sub_sch REFRESH PUBLICATION');
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_sch');
+
+# Instead, that row should appear in sch2.t1.
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch2.t1");
+is($result, qq(1
+3), 'check data in subscriber sch2.t1 after schema rename');
+
 $node_publisher->stop('fast');
 $node_subscriber->stop('fast');
 
#14Tom Lane
tgl@sss.pgh.pa.us
In reply to: vignesh C (#13)
Re: Logical replication - schema change not invalidating the relation cache

vignesh C <vignesh21@gmail.com> writes:

On Fri, 6 Jan 2023 at 04:32, Tom Lane <tgl@sss.pgh.pa.us> wrote:

Hm. I spent some time cleaning up this patch, and found that there's
still a problem. ISTM that the row with value "3" ought to end up
in the subscriber's sch2.t1 table, but it does not: the attached
test script fails with
...
What's up with that?

When the subscription is created, the subscriber will create a
subscription relation map of the corresponding relations from the
publication. The subscription relation map will only have sch1.t1
entry. As sch2.t1 was not present in the publisher when the
subscription was created, subscription will not have this entry in the
subscription relation map. So the insert operations performed on the
new table sch2.t1 will not be applied by the subscriber. We will have
to refresh the publication using 'ALTER SUBSCRIPTION ... REFRESH
PUBLICATION' to fetch missing table information from publisher. This
will start replication of tables that were added to the subscribed-to
publications since CREATE SUBSCRIPTION or the last invocation of
REFRESH PUBLICATION.

But ... but ... but ... that's the exact opposite of what the test
case shows to be happening. To wit, the newly created table
(the second coming of sch1.t1) *is* replicated immediately, while
the pre-existing t1 (now sch2.t1) is not. It's impossible to
explain those two facts under either a model of "tables are matched
by name" or "tables are matched by OID". So I'm still of the opinion
that there's some very dubious behavior here.

However, it does seem that the cache flush makes one aspect better,
so I pushed this after a little further work on the test case.

regards, tom lane