Selectively invalidate caches in pgoutput module

Started by Hayato Kuroda (Fujitsu), 11 months ago, 28 messages
#1Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
1 attachment(s)

Dear hackers,

Hi, this is a fork thread from [1]. I want to propose a small optimization for
the logical replication system.

Background
==========

When the ALTER PUBLICATION command is executed, all entries in RelationSyncCache
will be discarded anyway. This mechanism works well but is sometimes not efficient.
For example, when the ALTER PUBLICATION DROP TABLE is executed,
1) the specific entry in RelationSyncCache will be removed, and then
2) all entries will be discarded twice.

This happens because the pgoutput plugin registers both RelcacheCallback
(rel_sync_cache_relation_cb) and SyscacheCallback (publication_invalidation_cb,
rel_sync_cache_publication_cb). Then, when ALTER PUBLICATION ADD/SET/DROP is executed,
both the relation cache of added tables and the syscache of pg_publication_rel and
pg_publication are invalidated.
The callback for the relation cache will remove an entry from the hash table, and
syscache callbacks will look up all entries and invalidate them. However, AFAICS,
it is not necessary to invalidate all of them.
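
To make the difference between the two callback paths concrete, here is a minimal,
self-contained sketch in plain C. It is only a toy model, not pgoutput code: the
names toy_cache, toy_relation_cb and toy_publication_cb are hypothetical, and
entries are merely flagged as invalid instead of being managed in a hash table.

```c
#include <assert.h>
#include <stdbool.h>

#define TOY_MAX_ENTRIES 16		/* hypothetical capacity of the toy cache */

/* Toy stand-in for one per-relation cache entry. */
typedef struct ToyEntry
{
	unsigned int relid;
	bool		valid;			/* false means "rebuild on next use" */
} ToyEntry;

static ToyEntry toy_cache[TOY_MAX_ENTRIES];
static int	toy_nentries = 0;

/* Fill the cache with n valid entries whose relids are 1000, 1001, ... */
static void
toy_cache_init(int n)
{
	assert(n >= 0 && n <= TOY_MAX_ENTRIES);
	toy_nentries = n;
	for (int i = 0; i < n; i++)
	{
		toy_cache[i].relid = 1000u + (unsigned int) i;
		toy_cache[i].valid = true;
	}
}

/* Models the relation-cache callback: only the matching entry is touched. */
static void
toy_relation_cb(unsigned int relid)
{
	for (int i = 0; i < toy_nentries; i++)
	{
		if (toy_cache[i].relid == relid)
			toy_cache[i].valid = false;
	}
}

/* Models the syscache callback: every entry is marked invalid. */
static void
toy_publication_cb(void)
{
	for (int i = 0; i < toy_nentries; i++)
		toy_cache[i].valid = false;
}

/* Number of entries that would have to be rebuilt on next use. */
static int
toy_count_invalid(void)
{
	int			n = 0;

	for (int i = 0; i < toy_nentries; i++)
	{
		if (!toy_cache[i].valid)
			n++;
	}
	return n;
}
```

With five cached relations, toy_relation_cb(1002) leaves four entries usable,
whereas a following toy_publication_cb() forces all five to be rebuilt.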

I grepped the source code and found that this has been the case since the initial version.

Currently the effect of this behavior may not be large, but with [1] it may become
significant, because [1] propagates invalidation messages to all in-progress
decoding transactions.

Patch overview
============

Based on the background, the patch avoids dropping all entries in RelationSyncCache
when ALTER PUBLICATION is executed. It removes sys cache callbacks for pg_publication_rel
and pg_publication_namespace and avoids discarding entries in sys cache for pg_publication.

Apart from the above, this patch also ensures that the relcaches of published tables
are invalidated when ALTER PUBLICATION is executed. ADD/SET/DROP already have this
mechanism, but ALTER PUBLICATION OWNER TO and RENAME TO do not.
RENAME TO currently goes through a common function; the patch replaces it with
RenamePublication() so that the invalidations can be done there.
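
The branch added to the OWNER TO and RENAME TO paths boils down to choosing an
invalidation scope. The sketch below restates that choice as a small pure function.
Everything prefixed with toy_ or TOY_ is hypothetical, and the size threshold is an
assumption with an arbitrary demo value; it only illustrates the idea of falling
back to a full reset once too many individual messages would be needed.

```c
#include <stdbool.h>
#include <stddef.h>

#define TOY_MAX_INVAL_MSGS 4	/* hypothetical threshold, tiny for the demo */

typedef enum ToyInvalScope
{
	TOY_INVAL_NONE,				/* nothing is published, nothing to do */
	TOY_INVAL_PER_RELATION,		/* one message per published relation */
	TOY_INVAL_ALL				/* a single "invalidate everything" message */
} ToyInvalScope;

/*
 * Decide how much to invalidate for a publication.
 *
 * puballtables: the publication covers all tables.
 * nrelids:      number of relations published explicitly or via schemas.
 */
static ToyInvalScope
toy_choose_inval_scope(bool puballtables, size_t nrelids)
{
	if (puballtables)
		return TOY_INVAL_ALL;

	if (nrelids == 0)
		return TOY_INVAL_NONE;

	/* Below the threshold, individual messages are assumed to be cheaper. */
	if (nrelids < TOY_MAX_INVAL_MSGS)
		return TOY_INVAL_PER_RELATION;

	return TOY_INVAL_ALL;
}
```

For a FOR ALL TABLES publication the function always picks the full reset; for a
publication with three relations it picks per-relation messages.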

What do you think?

[1]: /messages/by-id/de52b282-1166-1180-45a2-8d8917ca74c6@enterprisedb.com

Best regards,
Hayato Kuroda
FUJITSU LIMITED

Attachments:

0001-Selectively-invalidate-cache-in-pgoutput.patch (application/octet-stream)
From 5ea4095c44b1911e802161936db1e5dd770927da Mon Sep 17 00:00:00 2001
From: Shlok Kyal <shlok.kyal.oss@gmail.com>
Date: Mon, 24 Feb 2025 15:34:31 +0530
Subject: [PATCH] Selectively invalidate cache in pgoutput

When the ALTER PUBLICATION command is executed, all entries in RelationSyncCache
will be discarded anyway. This mechanism works well but is sometimes not
efficient. For example, when the ALTER PUBLICATION DROP TABLE is executed,
1) the specific entry in RelationSyncCache will be removed, and then 2) all
entries will be discarded.

This patch avoids dropping all entries in RelationSyncCache when ALTER
PUBLICATION is executed. Theoretically, it is enough to invalidate the related
relcache since RelcacheCallback has already been registered. The mechanism
exists for ALTER PUBLICATION ADD/SET/DROP, so it is added for OWNER TO and RENAME
TO statements. Regarding RENAME TO, we stop using a common function and
replace it with RenamePublication().

Author: Shlok Kyal and Hayato Kuroda
---
 src/backend/commands/alter.c                |   4 +-
 src/backend/commands/publicationcmds.c      | 107 ++++++++++++++++++++
 src/backend/parser/gram.y                   |   2 +-
 src/backend/replication/pgoutput/pgoutput.c |  18 ----
 src/include/commands/publicationcmds.h      |   1 +
 5 files changed, 112 insertions(+), 20 deletions(-)

diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 78c1d4e1b8..a79329acc1 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -400,6 +400,9 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TYPE:
 			return RenameType(stmt);
 
+		case OBJECT_PUBLICATION:
+			return RenamePublication(stmt->subname, stmt->newname);
+
 		case OBJECT_AGGREGATE:
 		case OBJECT_COLLATION:
 		case OBJECT_CONVERSION:
@@ -417,7 +420,6 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TSDICTIONARY:
 		case OBJECT_TSPARSER:
 		case OBJECT_TSTEMPLATE:
-		case OBJECT_PUBLICATION:
 		case OBJECT_SUBSCRIPTION:
 			{
 				ObjectAddress address;
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 150a768d16..182d2187f1 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -491,6 +491,87 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
 	return *invalid_column_list || *invalid_gen_col;
 }
 
+/*
+ * Execute ALTER PUBLICATION RENAME
+ */
+ObjectAddress
+RenamePublication(const char *oldname, const char *newname)
+{
+	Relation			rel;
+	HeapTuple			tup;
+	ObjectAddress		address;
+	Form_pg_publication	pubform;
+	bool				replaces[Natts_pg_publication];
+	bool				nulls[Natts_pg_publication];
+	Datum				values[Natts_pg_publication];
+
+	rel = table_open(PublicationRelationId, RowExclusiveLock);
+
+	tup = SearchSysCacheCopy1(PUBLICATIONNAME,
+							  CStringGetDatum(oldname));
+
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("publication \"%s\" does not exist",
+						oldname)));
+
+	pubform = (Form_pg_publication) GETSTRUCT(tup);
+
+	/* must be owner */
+	if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
+		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
+					   NameStr(pubform->pubname));
+
+	/* Everything ok, form a new tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	/* Only update the pubname */
+	values[Anum_pg_publication_pubname - 1] =
+		DirectFunctionCall1(namein, CStringGetDatum(newname));
+	replaces[Anum_pg_publication_pubname - 1] = true;
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Invalidate the relcache. */
+	if (pubform->puballtables)
+	{
+		CacheInvalidateRelcacheAll();
+	}
+	else
+	{
+		List	   *relids = NIL;
+		List	   *schemarelids = NIL;
+
+		/*
+		 * For partition table, when we insert data, get_rel_sync_entry is
+		 * called and a hash entry is created for the corresponding leaf table.
+		 * So invalidating the leaf nodes would be sufficient here.
+		 */
+		relids = GetPublicationRelations(pubform->oid,
+										 PUBLICATION_PART_LEAF);
+		schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
+														PUBLICATION_PART_LEAF);
+
+		relids = list_concat_unique_oid(relids, schemarelids);
+
+		InvalidatePublicationRels(relids);
+	}
+
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	ObjectAddressSet(address, PublicationRelationId, pubform->oid);
+
+	heap_freetuple(tup);
+
+	table_close(rel, RowExclusiveLock);
+
+	return address;
+}
+
 /* check_functions_in_node callback */
 static bool
 contain_mutable_or_user_functions_checker(Oid func_id, void *context)
@@ -1996,6 +2077,32 @@ AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
 	}
 
 	form->pubowner = newOwnerId;
+
+	/* Invalidate the relcache. */
+	if (form->puballtables)
+	{
+		CacheInvalidateRelcacheAll();
+	}
+	else
+	{
+		List	   *relids = NIL;
+		List	   *schemarelids = NIL;
+
+		/*
+		 * For partition table, when we insert data, get_rel_sync_entry is
+		 * called and a hash entry is created for the corresponding leaf table.
+		 * So invalidating the leaf nodes would be sufficient here.
+		 */
+		relids = GetPublicationRelations(form->oid,
+										 PUBLICATION_PART_LEAF);
+		schemarelids = GetAllSchemaPublicationRelations(form->oid,
+														PUBLICATION_PART_LEAF);
+
+		relids = list_concat_unique_oid(relids, schemarelids);
+
+		InvalidatePublicationRels(relids);
+	}
+
 	CatalogTupleUpdate(rel, &tup->t_self, tup);
 
 	/* Update owner dependency reference */
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 7d99c9355c..49fe0567c5 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9513,7 +9513,7 @@ RenameStmt: ALTER AGGREGATE aggregate_with_argtypes RENAME TO name
 					RenameStmt *n = makeNode(RenameStmt);
 
 					n->renameType = OBJECT_PUBLICATION;
-					n->object = (Node *) makeString($3);
+					n->subname = $3;
 					n->newname = $6;
 					n->missing_ok = false;
 					$$ = (Node *) n;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 7d464f656a..b28ce636d5 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1789,12 +1789,6 @@ static void
 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
 	publications_valid = false;
-
-	/*
-	 * Also invalidate per-relation cache so that next time the filtering info
-	 * is checked it will be updated with the new publication settings.
-	 */
-	rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
 }
 
 /*
@@ -1970,18 +1964,6 @@ init_rel_sync_cache(MemoryContext cachectx)
 								  rel_sync_cache_publication_cb,
 								  (Datum) 0);
 
-	/*
-	 * Flush all cache entries after any publication changes.  (We need no
-	 * callback entry for pg_publication, because publication_invalidation_cb
-	 * will take care of it.)
-	 */
-	CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
-								  rel_sync_cache_publication_cb,
-								  (Datum) 0);
-	CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
-								  rel_sync_cache_publication_cb,
-								  (Datum) 0);
-
 	relation_callbacks_registered = true;
 }
 
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index e11a942ea0..3dfceef70f 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -38,5 +38,6 @@ extern bool pub_contains_invalid_column(Oid pubid, Relation relation,
 										char pubgencols_type,
 										bool *invalid_column_list,
 										bool *invalid_gen_col);
+extern ObjectAddress RenamePublication(const char *oldname, const char *newname);
 
 #endif							/* PUBLICATIONCMDS_H */
-- 
2.43.5

#2Amit Kapila
amit.kapila16@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#1)
Re: Selectively invalidate caches in pgoutput module

On Mon, Mar 3, 2025 at 1:27 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

Hi, this is a fork thread from [1]. I want to propose a small optimization for
logical replication system.

Background
==========

When the ALTER PUBLICATION command is executed, all entries in RelationSyncCache
will be discarded anyway. This mechanism works well but is sometimes not efficient.
For example, when the ALTER PUBLICATION DROP TABLE is executed,
1) the specific entry in RelationSyncCache will be removed, and then
2) all entries will be discarded twice.

In (2) Why twice? Is it because we call
rel_sync_cache_publication_cb() first for PUBLICATIONRELMAP and then
for PUBLICATIONOID via publication_invalidation_cb()?

This happens because the pgoutput plugin registers both RelcacheCallback
(rel_sync_cache_relation_cb) and SyscacheCallback (publication_invalidation_cb,
rel_sync_cache_publication_cb). Then, when ALTER PUBLICATION ADD/SET/DROP is executed,
both the relation cache of added tables and the syscache of pg_publication_rel and
pg_publication are invalidated.
The callback for the relation cache will remove an entry from the hash table, and
syscache callbacks will look up all entries and invalidate them. However, AFAICS
does not need to invalidate all of them.

IIUC, you want to say in the above case only (1) would be sufficient, right?

I grepped source codes and found this happens since the initial version.

Currently the effect of the behavior may not be large,

Is it possible to see the impact? I guess if there are a large number
of relations (or partitions), then all will get invalidated even if
one relation is added/dropped from the publication; if so, this should
impact the performance.

but [1] may affect
significantly because it propagates invalidation messages to all in-progress
decoding transactions.

Patch overview
============

Based on the background, the patch avoids dropping all entries in RelationSyncCache
when ALTER PUBLICATION is executed. It removes sys cache callbacks for pg_publication_rel
and pg_publication_namespace and avoids discarding entries in sys cache for pg_publication.

Apart from the above, this patch also ensures that relcaches of publishing tables
are invalidated when ALTER PUBLICATION is executed. ADD/SET/DROP already has this
mechanism, but ALTER PUBLICATION OWNER TO and RENAME TO do not.
Regarding RENAME TO, now we are using a common function, but it is replaced with
RenamePublication() to do invalidations.

For Rename/Owner, can we use a new invalidation instead of using
relcache invalidation, as that can impact other sessions for no
benefit? IIUC, changing the other properties of publication like
dropping the table requires us to invalidate even the corresponding
relcache entry because it contains the publication descriptor
(rd_pubdesc). See RelationBuildPublicationDesc().

Also, it is better to consider doing rename/owner_change related
changes in a separate patch as that will make the first patch easier
to review and commit.

--
With Regards,
Amit Kapila.

#3Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Amit Kapila (#2)
3 attachment(s)
RE: Selectively invalidate caches in pgoutput module

Dear Amit,

When the ALTER PUBLICATION command is executed, all entries in RelationSyncCache
will be discarded anyway. This mechanism works well but is sometimes not efficient.
For example, when the ALTER PUBLICATION DROP TABLE is executed,
1) the specific entry in RelationSyncCache will be removed, and then
2) all entries will be discarded twice.

In (2) Why twice? Is it because we call
rel_sync_cache_publication_cb() first for PUBLICATIONRELMAP and then
for PUBLICATIONOID via publication_invalidation_cb()?

This part was not correct. I had assumed that ALTER PUBLICATION DROP TABLE also
invalidated the pg_publication syscache, but it does not. So the command
first invalidates a specific entry, and then invalidates all entries once.

This happens because the pgoutput plugin registers both RelcacheCallback
(rel_sync_cache_relation_cb) and SyscacheCallback (publication_invalidation_cb,
rel_sync_cache_publication_cb). Then, when ALTER PUBLICATION ADD/SET/DROP is executed,
both the relation cache of added tables and the syscache of pg_publication_rel and
pg_publication are invalidated.
The callback for the relation cache will remove an entry from the hash table, and
syscache callbacks will look up all entries and invalidate them. However, AFAICS
does not need to invalidate all of them.

IIUC, you want to say in the above case only (1) would be sufficient, right?

Right, this is the main aim of this proposal.

Is it possible to see the impact? I guess if there are a large number
of relations (or partitions), then all will get invalidated even if
one relation is added/dropped from the publication; if so, this should
impact the performance.

Agreed. I'm planning to measure this and share the results. Please wait...

For Rename/Owner, can we use a new invalidation instead of using
relcache invalidation, as that can impact other sessions for no
benefit? IIUC, changing the other properties of publication like
dropping the table requires us to invalidate even the corresponding
relcache entry because it contains the publication descriptor
(rd_pubdesc). See RelationBuildPublicationDesc().

The attached patch set implements this approach.
0001 contains only the part that avoids invalidating the whole cache for the ADD/SET/DROP commands.
0002 introduces a new type of invalidation message that only invalidates caches in
the decoding output plugin. A better name is very welcome.
0003 uses that mechanism to avoid discarding the relcache in the RENAME/OWNER case.
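
As a rough illustration of the idea behind 0002, here is a minimal standalone sketch
of a callback registry that is only acted on by processes that registered a callback.
It is not the patch itself: all toy_ and TOY_ names are hypothetical, a void pointer
stands in for the Datum argument, and relid 0 is assumed to mean "all entries".

```c
#include <assert.h>
#include <stddef.h>

#define TOY_MAX_CALLBACKS 10	/* hypothetical size of the fixed callback list */
#define TOY_INVALID_RELID 0u	/* assumed to mean "invalidate all entries" */

typedef void (*ToyRelSyncCallback) (void *arg, unsigned int relid);

static struct
{
	ToyRelSyncCallback function;
	void	   *arg;
}			toy_callbacks[TOY_MAX_CALLBACKS];

static int	toy_callback_count = 0;

/* Register a callback; returns 0 on success, -1 if the list is full. */
static int
toy_register_callback(ToyRelSyncCallback func, void *arg)
{
	assert(func != NULL);

	if (toy_callback_count >= TOY_MAX_CALLBACKS)
		return -1;

	toy_callbacks[toy_callback_count].function = func;
	toy_callbacks[toy_callback_count].arg = arg;
	toy_callback_count++;
	return 0;
}

/* Deliver one invalidation message to every registered callback. */
static void
toy_dispatch(unsigned int relid)
{
	for (int i = 0; i < toy_callback_count; i++)
		toy_callbacks[i].function(toy_callbacks[i].arg, relid);
}

/* Example receiver that just counts what kind of message it saw. */
typedef struct ToyStats
{
	int			single;			/* messages for one relation */
	int			all;			/* "invalidate everything" messages */
} ToyStats;

static void
toy_count_cb(void *arg, unsigned int relid)
{
	ToyStats   *stats = (ToyStats *) arg;

	if (relid == TOY_INVALID_RELID)
		stats->all++;
	else
		stats->single++;
}
```

A process that never calls toy_register_callback() does nothing when toy_dispatch()
runs, which is the property we want for backends that are not decoding.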

Best regards,
Hayato Kuroda
FUJITSU LIMITED

Attachments:

v2-0001-Avoid-Invalidating-all-entries-when-ALTER-PUBLICA.patch (application/octet-stream)
From 025ecfd3b105508c50936be170b2ccbc3a25c994 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 3 Mar 2025 19:40:10 +0900
Subject: [PATCH v2 1/3] Avoid Invalidating all entries when ALTER PUBLICATION
 is executed

When the ALTER PUBLICATION command is executed, all entries in RelationSyncCache
will be discarded anyway. This mechanism works well but is sometimes not
efficient. For example, when the ALTER PUBLICATION DROP TABLE is executed,
1) the specific entry in RelationSyncCache will be removed, and then 2) all
entries will be discarded.

This patch avoids dropping all entries in RelationSyncCache when ALTER
PUBLICATION is executed. Theoretically, it is enough to invalidate the related
relcache since RelcacheCallback has already been registered.

Author: Shlok Kyal and Hayato Kuroda
---
 src/backend/replication/pgoutput/pgoutput.c | 12 ------------
 1 file changed, 12 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 7d464f656a..64aea3af83 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1970,18 +1970,6 @@ init_rel_sync_cache(MemoryContext cachectx)
 								  rel_sync_cache_publication_cb,
 								  (Datum) 0);
 
-	/*
-	 * Flush all cache entries after any publication changes.  (We need no
-	 * callback entry for pg_publication, because publication_invalidation_cb
-	 * will take care of it.)
-	 */
-	CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
-								  rel_sync_cache_publication_cb,
-								  (Datum) 0);
-	CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
-								  rel_sync_cache_publication_cb,
-								  (Datum) 0);
-
 	relation_callbacks_registered = true;
 }
 
-- 
2.43.5

v2-0002-Introduce-a-new-invalidation-message-to-invalidat.patch (application/octet-stream)
From 07f9825641c6f64fb1fc475121df79ad361867be Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 4 Mar 2025 16:51:19 +0900
Subject: [PATCH v2 2/3] Introduce a new invalidation message to invalidate
 caches in output plugins

---
 src/backend/utils/cache/inval.c | 87 +++++++++++++++++++++++++++++++++
 src/include/storage/sinval.h    | 11 +++++
 src/include/utils/inval.h       | 10 ++++
 3 files changed, 108 insertions(+)

diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 700ccb6df9..d793bc9281 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -271,6 +271,7 @@ int			debug_discard_caches = 0;
 
 #define MAX_SYSCACHE_CALLBACKS 64
 #define MAX_RELCACHE_CALLBACKS 10
+#define MAX_RELSYNC_CALLBACKS 10
 
 static struct SYSCACHECALLBACK
 {
@@ -292,6 +293,15 @@ static struct RELCACHECALLBACK
 
 static int	relcache_callback_count = 0;
 
+static struct RELSYNCCALLBACK
+{
+	RelSyncCallbackFunction function;
+	Datum		arg;
+}			relsync_callback_list[MAX_RELSYNC_CALLBACKS];
+
+static int	relsync_callback_count = 0;
+
+
 /* ----------------------------------------------------------------
  *				Invalidation subgroup support functions
  * ----------------------------------------------------------------
@@ -832,6 +842,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
 		else if (msg->sn.dbId == MyDatabaseId)
 			InvalidateCatalogSnapshot();
 	}
+	else if (msg->id == SHAREDINVALRELSYNC_ID)
+	{
+		/* We only care about our own database */
+		if (msg->rs.dbId == MyDatabaseId)
+			CallRelSyncCallbacks(msg->rs.relid);
+	}
 	else
 		elog(FATAL, "unrecognized SI message ID: %d", msg->id);
 }
@@ -1695,6 +1711,42 @@ CacheInvalidateRelmap(Oid databaseId)
 }
 
 
+/*
+ * CacheInvalidateRelSync
+ *		Register invalidation of the cache in logical decoding output plugin
+ *		for a database.
+ *
+ * This type of invalidation message is used for the specific purpose of output
+ * plugins. Processes which do not decode WAL do nothing even when they
+ * receive the message.
+ */
+void
+CacheInvalidateRelSync(Oid relid)
+{
+	SharedInvalidationMessage msg;
+
+	msg.rs.id = SHAREDINVALRELSYNC_ID;
+	msg.rs.dbId = MyDatabaseId;
+	msg.rs.relid = relid;
+	/* check AddCatcacheInvalidationMessage() for an explanation */
+	VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+	SendSharedInvalidMessages(&msg, 1);
+}
+
+
+/*
+ * CacheInvalidateRelSyncAll
+ *		Register invalidation of the whole cache in logical decoding output
+ *		plugin.
+ */
+void
+CacheInvalidateRelSyncAll(void)
+{
+	CacheInvalidateRelSync(InvalidOid);
+}
+
+
 /*
  * CacheRegisterSyscacheCallback
  *		Register the specified function to be called for all future
@@ -1763,6 +1815,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 	++relcache_callback_count;
 }
 
+/*
+ * CacheRegisterRelSyncCallback
+ *		Register the specified function to be called for all future
+ *		decoding-cache invalidation events.
+ *
+ * This function is intended to be called from the logical decoding output
+ * plugins.
+ */
+void
+CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+							 Datum arg)
+{
+	if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS)
+		elog(FATAL, "out of relsync_callback_list slots");
+
+	relsync_callback_list[relsync_callback_count].function = func;
+	relsync_callback_list[relsync_callback_count].arg = arg;
+
+	++relsync_callback_count;
+}
+
 /*
  * CallSyscacheCallbacks
  *
@@ -1788,6 +1861,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
 	}
 }
 
+/*
+ * CallRelSyncCallbacks
+ */
+void
+CallRelSyncCallbacks(Oid relid)
+{
+	for (int i = 0; i < relsync_callback_count; i++)
+	{
+		struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+		ccitem->function(ccitem->arg, relid);
+	}
+}
+
 /*
  * LogLogicalInvalidations
  *
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index 2463c0f9fa..90a5af4ed8 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -27,6 +27,7 @@
  *	* invalidate an smgr cache entry for a specific physical relation
  *	* invalidate the mapped-relation mapping for a given database
  *	* invalidate any saved snapshot that might be used to scan a given relation
+ *	* invalidate a specific entry for specific output plugin
  * More types could be added if needed.  The message type is identified by
  * the first "int8" field of the message struct.  Zero or positive means a
  * specific-catcache inval message (and also serves as the catcache ID field).
@@ -110,6 +111,15 @@ typedef struct
 	Oid			relId;			/* relation ID */
 } SharedInvalSnapshotMsg;
 
+#define SHAREDINVALRELSYNC_ID	(-6)
+
+typedef struct
+{
+	int8		id;				/* type field --- must be first */
+	Oid			dbId;			/* database ID */
+	Oid			relid;			/* relation ID, or 0 if whole relcache */
+} SharedInvalRelSyncMsg;
+
 typedef union
 {
 	int8		id;				/* type field --- must be first */
@@ -119,6 +129,7 @@ typedef union
 	SharedInvalSmgrMsg sm;
 	SharedInvalRelmapMsg rm;
 	SharedInvalSnapshotMsg sn;
+	SharedInvalRelSyncMsg rs;
 } SharedInvalidationMessage;
 
 
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 40658ba2ff..5922306c11 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -22,6 +22,7 @@ extern PGDLLIMPORT int debug_discard_caches;
 
 typedef void (*SyscacheCallbackFunction) (Datum arg, int cacheid, uint32 hashvalue);
 typedef void (*RelcacheCallbackFunction) (Datum arg, Oid relid);
+typedef void (*RelSyncCallbackFunction) (Datum arg, Oid relid);
 
 
 extern void AcceptInvalidationMessages(void);
@@ -59,6 +60,10 @@ extern void CacheInvalidateSmgr(RelFileLocatorBackend rlocator);
 
 extern void CacheInvalidateRelmap(Oid databaseId);
 
+extern void CacheInvalidateRelSync(Oid relid);
+
+extern void CacheInvalidateRelSyncAll(void);
+
 extern void CacheRegisterSyscacheCallback(int cacheid,
 										  SyscacheCallbackFunction func,
 										  Datum arg);
@@ -66,8 +71,13 @@ extern void CacheRegisterSyscacheCallback(int cacheid,
 extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 										  Datum arg);
 
+extern void CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+										 Datum arg);
+
 extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue);
 
+extern void CallRelSyncCallbacks(Oid relid);
+
 extern void InvalidateSystemCaches(void);
 extern void InvalidateSystemCachesExtended(bool debug_discard);
 
-- 
2.43.5

v2-0003-Invalidate-Relcaches-while-ALTER-PUBLICATION-OWNE.patch (application/octet-stream)
From 5a49160c929b84ad7a62f8e1bccf215a53c0fb93 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 3 Mar 2025 19:41:04 +0900
Subject: [PATCH v2 3/3] Invalidate Relcaches while ALTER PUBLICATION OWNER
 TO/RENAME TO

---
 src/backend/commands/alter.c                |   4 +-
 src/backend/commands/publicationcmds.c      | 139 ++++++++++++++++++++
 src/backend/parser/gram.y                   |   2 +-
 src/backend/replication/pgoutput/pgoutput.c |   8 +-
 src/include/commands/publicationcmds.h      |   2 +
 5 files changed, 147 insertions(+), 8 deletions(-)

diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 78c1d4e1b8..a79329acc1 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -400,6 +400,9 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TYPE:
 			return RenameType(stmt);
 
+		case OBJECT_PUBLICATION:
+			return RenamePublication(stmt->subname, stmt->newname);
+
 		case OBJECT_AGGREGATE:
 		case OBJECT_COLLATION:
 		case OBJECT_CONVERSION:
@@ -417,7 +420,6 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TSDICTIONARY:
 		case OBJECT_TSPARSER:
 		case OBJECT_TSTEMPLATE:
-		case OBJECT_PUBLICATION:
 		case OBJECT_SUBSCRIPTION:
 			{
 				ObjectAddress address;
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 150a768d16..c1cab00ddb 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -491,6 +491,95 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
 	return *invalid_column_list || *invalid_gen_col;
 }
 
+/*
+ * Execute ALTER PUBLICATION RENAME
+ */
+ObjectAddress
+RenamePublication(const char *oldname, const char *newname)
+{
+	Relation			rel;
+	HeapTuple			tup;
+	ObjectAddress		address;
+	Form_pg_publication	pubform;
+	bool				replaces[Natts_pg_publication];
+	bool				nulls[Natts_pg_publication];
+	Datum				values[Natts_pg_publication];
+
+	rel = table_open(PublicationRelationId, RowExclusiveLock);
+
+	tup = SearchSysCacheCopy1(PUBLICATIONNAME,
+							  CStringGetDatum(oldname));
+
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("publication \"%s\" does not exist",
+						oldname)));
+
+	pubform = (Form_pg_publication) GETSTRUCT(tup);
+
+	/* must be owner */
+	if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
+		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
+					   NameStr(pubform->pubname));
+
+	/* Everything ok, form a new tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	/* Only update the pubname */
+	values[Anum_pg_publication_pubname - 1] =
+		DirectFunctionCall1(namein, CStringGetDatum(newname));
+	replaces[Anum_pg_publication_pubname - 1] = true;
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/*
+	 * Invalidate caches on the logical decoding output plugin.
+	 *
+	 * Apart from the ALTER PUBLICATION ADD/SET/DROP commands, we do not have
+	 * to invalidate relcaches. They are required to refresh the publication's
+	 * descriptor. The publication's name is not recorded in the attribute, so
+	 * the RENAME commands do not require a refresh. Instead, we only
+	 * invalidate a cache on the output plugin to rebuild its cache.
+	 */
+	if (pubform->puballtables)
+	{
+		CacheInvalidateRelSyncAll();
+	}
+	else
+	{
+		List	   *relids = NIL;
+		List	   *schemarelids = NIL;
+
+		/*
+		 * For partition table, when we insert data, get_rel_sync_entry is
+		 * called and a hash entry is created for the corresponding leaf table.
+		 * So invalidating the leaf nodes would be sufficient here.
+		 */
+		relids = GetPublicationRelations(pubform->oid,
+										 PUBLICATION_PART_LEAF);
+		schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
+														PUBLICATION_PART_LEAF);
+
+		relids = list_concat_unique_oid(relids, schemarelids);
+
+		InvalidateRelSyncCaches(relids);
+	}
+
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	ObjectAddressSet(address, PublicationRelationId, pubform->oid);
+
+	heap_freetuple(tup);
+
+	table_close(rel, RowExclusiveLock);
+
+	return address;
+}
+
 /* check_functions_in_node callback */
 static bool
 contain_mutable_or_user_functions_checker(Oid func_id, void *context)
@@ -1996,6 +2085,37 @@ AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
 	}
 
 	form->pubowner = newOwnerId;
+
+	/*
+	 * Invalidate caches on the logical decoding output plugin.
+	 *
+	 * No need to invalidate relcache as the same reason as RENAME command.
+	 * Please see comments in RenamePublication().
+	 */
+	if (form->puballtables)
+	{
+		CacheInvalidateRelSyncAll();
+	}
+	else
+	{
+		List	   *relids = NIL;
+		List	   *schemarelids = NIL;
+
+		/*
+		 * For partition table, when we insert data, get_rel_sync_entry is
+		 * called and a hash entry is created for the corresponding leaf table.
+		 * So invalidating the leaf nodes would be sufficient here.
+		 */
+		relids = GetPublicationRelations(form->oid,
+										 PUBLICATION_PART_LEAF);
+		schemarelids = GetAllSchemaPublicationRelations(form->oid,
+														PUBLICATION_PART_LEAF);
+
+		relids = list_concat_unique_oid(relids, schemarelids);
+
+		InvalidateRelSyncCaches(relids);
+	}
+
 	CatalogTupleUpdate(rel, &tup->t_self, tup);
 
 	/* Update owner dependency reference */
@@ -2096,3 +2216,22 @@ defGetGeneratedColsOption(DefElem *def)
 
 	return PUBLISH_GENCOLS_NONE;	/* keep compiler quiet */
 }
+
+
+void
+InvalidateRelSyncCaches(List *relids)
+{
+	/*
+	 * We don't want to send too many individual messages, at some point it's
+	 * cheaper to just reset whole relcache.
+	 */
+	if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
+	{
+		ListCell   *lc;
+
+		foreach(lc, relids)
+			CacheInvalidateRelSync(lfirst_oid(lc));
+	}
+	else
+		CacheInvalidateRelSync(InvalidOid);
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 7d99c9355c..49fe0567c5 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9513,7 +9513,7 @@ RenameStmt: ALTER AGGREGATE aggregate_with_argtypes RENAME TO name
 					RenameStmt *n = makeNode(RenameStmt);
 
 					n->renameType = OBJECT_PUBLICATION;
-					n->object = (Node *) makeString($3);
+					n->subname = $3;
 					n->newname = $6;
 					n->missing_ok = false;
 					$$ = (Node *) n;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 64aea3af83..527165dc1a 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -531,6 +531,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 			CacheRegisterSyscacheCallback(PUBLICATIONOID,
 										  publication_invalidation_cb,
 										  (Datum) 0);
+			CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
+										 (Datum) 0);
 			publication_callback_registered = true;
 		}
 
@@ -1789,12 +1791,6 @@ static void
 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
 	publications_valid = false;
-
-	/*
-	 * Also invalidate per-relation cache so that next time the filtering info
-	 * is checked it will be updated with the new publication settings.
-	 */
-	rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
 }
 
 /*
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index e11a942ea0..f130ea3090 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -38,5 +38,7 @@ extern bool pub_contains_invalid_column(Oid pubid, Relation relation,
 										char pubgencols_type,
 										bool *invalid_column_list,
 										bool *invalid_gen_col);
+extern ObjectAddress RenamePublication(const char *oldname, const char *newname);
+extern void InvalidateRelSyncCaches(List *relids);
 
 #endif							/* PUBLICATIONCMDS_H */
-- 
2.43.5

#4Zhijie Hou (Fujitsu)
houzj.fnst@fujitsu.com
In reply to: Hayato Kuroda (Fujitsu) (#3)
RE: Selectively invalidate caches in pgoutput module

On Tuesday, March 4, 2025 7:44 PM Kuroda, Hayato/黒田 隼人 <kuroda.hayato@fujitsu.com> wrote:

Hi,

When the ALTER PUBLICATION command is executed, all entries in RelationSyncCache
will be discarded anyway. This mechanism works well but is sometimes not efficient.
For example, when the ALTER PUBLICATION DROP TABLE is executed,
1) the specific entry in RelationSyncCache will be removed, and then
2) all entries will be discarded twice.

In (2) Why twice? Is it because we call
rel_sync_cache_publication_cb() first for PUBLICATIONRELMAP and then
for PUBLICATIONOID via publication_invalidation_cb()?

This part was not correct. I had assumed that ALTER PUBLICATION DROP TABLE
also invalidated the pg_publication syscache, but it does not. So the
command first invalidates a specific entry, and then invalidates all entries once.

This happens because the pgoutput plugin registers both RelcacheCallback
(rel_sync_cache_relation_cb) and SyscacheCallback (publication_invalidation_cb,
rel_sync_cache_publication_cb). Then, when ALTER PUBLICATION ADD/SET/DROP is executed,
both the relation cache of added tables and the syscache of pg_publication_rel and
pg_publication are invalidated.
The callback for the relation cache will remove an entry from the hash table, and
syscache callbacks will look up all entries and invalidate them. However, AFAICS
it does not need to invalidate all of them.

IIUC, you want to say in the above case only (1) would be sufficient, right?

Right, this is a main aim of this proposal.

Is it possible to see the impact? I guess if there are a large number
of relations (or partitions), then all will get invalidated even if
one relation is added/dropped from the publication; if so, this should
impact the performance.

Agreed. I'm planning to do and share results. Please wait...

For Rename/Owner, can we use a new invalidation instead of using
relcache invalidation, as that can impact other sessions for no
benefit? IIUC, changing the other properties of publication like
dropping the table requires us to invalidate even the corresponding
relcache entry because it contains the publication descriptor
(rd_pubdesc). See RelationBuildPublicationDesc().

Attached patchset implemented with the approach.
0001 contains only part to avoid whole cache invalidation, for ADD/SET/DROP
command.

I think the changes proposed in 0001 are reasonable. Basically, any
modifications to pg_publication_rel or pg_publication_namespace should trigger
relcache invalidation messages. This is necessary for rebuilding the cache
stored in RelationData::rd_pubdesc, given that these changes could impact
whether a table is published. Following this logic, it should be safe and
appropriate to depend on individual relcache invalidation messages to
invalidate the relsynccache in pgoutput, rather than invalidating the entire
relsynccache for a single catalog change.
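
To illustrate the difference between per-relation and whole-cache invalidation discussed here, a minimal standalone sketch follows. ToyEntry and toy_relation_cb are hypothetical names, not pgoutput code, and relid 0 merely stands in for InvalidOid.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a RelationSyncCache entry (not the pgoutput struct). */
typedef struct ToyEntry
{
	unsigned int relid;		/* relation OID this entry describes */
	bool		valid;		/* false means "rebuild on next use" */
} ToyEntry;

/*
 * Toy relcache-style callback. relid 0 plays the role of InvalidOid and
 * marks every entry invalid; any other value touches only the matching entry.
 * Returns how many previously valid entries now have to be rebuilt.
 */
int
toy_relation_cb(ToyEntry *entries, size_t n, unsigned int relid)
{
	int			invalidated = 0;

	assert(entries != NULL || n == 0);

	for (size_t i = 0; i < n; i++)
	{
		if (relid == 0 || entries[i].relid == relid)
		{
			if (entries[i].valid)
				invalidated++;
			entries[i].valid = false;
		}
	}
	return invalidated;
}
```

With many cached relations, a call for one relid forces a single rebuild, while a call with relid 0 forces a rebuild of every entry that was still valid. That gap is the cost the 0001 patch is trying to avoid.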

Additionally, I've verified the scenario involving partitioned tables to ensure
the invalidation behaves correctly. When a parent table is added to a
publication (with pubviaroot=true), it should influence the publication status
of its child tables as well. And this case works as expected, since
invalidation messages are dispatched for all items in the partition tree
(GetPubPartitionOptionRelations()).

One nitpick for 0001: the comments atop of the rel_sync_cache_publication_cb()
need updating, as they still reference pg_publication_rel and
pg_publication_namespace. Aside from this, 0001 looks good to me.

Best Regards,
Hou zj

#5Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Zhijie Hou (Fujitsu) (#4)
3 attachment(s)
RE: Selectively invalidate caches in pgoutput module

Dear Hou,

Thanks for reading the thread!

Attached patchset implemented with the approach.
0001 contains only part to avoid whole cache invalidation, for ADD/SET/DROP
command.

I think the changes proposed in 0001 are reasonable. Basically, any
modifications to pg_publication_rel or pg_publication_namespace should trigger
relcache invalidation messages. This is necessary for rebuilding the cache
stored in RelationData::rd_pubdesc, given that these changes could impact
whether a table is published. Following this logic, it should be safe and
appropriate to depend on individual relcache invalidation messages to
invalidate the relsynccache in pgoutput, rather than invalidating the entire
relsynccache for a single catalog change.

Right. I should have clarified these points on my previous post.

One nitpick for 0001: the comments atop of the rel_sync_cache_publication_cb()
need updating, as they still reference pg_publication_rel and
pg_publication_namespace. Aside from this, 0001 looks good to me.

Thanks for pointing out, fixed. PSA new version.

Additionally, I found that discarding the whole cache can be avoided in one more
case. Currently, all cache entries are unconditionally removed when pg_namespace is
updated. Invalidation is needed to update the qualified names of tables in the
replication messages after a schema rename, but IIUC it is enough to do this only
for relations that belong to the renamed schema.

IIUC there are two approaches to implement it.
The first idea is to introduce a new type of invalidation message. This is similar
to what 0002 does. The invalidation message can contain the relids that belong to
the renamed schema, and pgoutput can register a callback for it.
The second idea is to use a syscache callback. IIUC, the callback is passed the
hash of the oid. Thus, RelSyncCache entries can store the hash value of their
schema's oid, and the callback function can compare it with the passed value.
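
The second idea can be sketched standalone as below. This is only an illustration under stated assumptions: ToySyncEntry, toy_hash_oid and toy_namespace_cb are hypothetical names, the hash is a placeholder rather than the one PostgreSQL uses, and a hash value of 0 is treated as "flush everything", which is how syscache callbacks are conventionally told about a whole-cache reset.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical cache entry that remembers the hash of its schema's OID. */
typedef struct ToySyncEntry
{
	uint32_t	relid;
	uint32_t	schema_hash;	/* saved when the entry is built */
	bool		valid;
} ToySyncEntry;

/* Placeholder hash, NOT PostgreSQL's; any fixed function works for the sketch. */
uint32_t
toy_hash_oid(uint32_t oid)
{
	return oid * 2654435761u;
}

/*
 * Toy syscache-style callback for pg_namespace changes. Only entries whose
 * stored schema hash matches are invalidated; hashvalue 0 invalidates all.
 * Returns how many previously valid entries were invalidated.
 */
int
toy_namespace_cb(ToySyncEntry *entries, size_t n, uint32_t hashvalue)
{
	int			invalidated = 0;

	assert(entries != NULL || n == 0);

	for (size_t i = 0; i < n; i++)
	{
		if (hashvalue == 0 || entries[i].schema_hash == hashvalue)
		{
			if (entries[i].valid)
				invalidated++;
			entries[i].valid = false;
		}
	}
	return invalidated;
}
```

Because only a hash is compared, a collision could invalidate an unrelated entry. That is harmless for correctness, since the entry is simply rebuilt on next use.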

I noted the idea in the code as an XXX comment in 0001.

Best regards,
Hayato Kuroda
FUJITSU LIMITED

Attachments:

v3-0001-Avoid-Invalidating-all-entries-when-ALTER-PUBLICA.patch (application/octet-stream)
From 0dd03d8d3476810bae5304472ad5d1aba057388b Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 3 Mar 2025 19:40:10 +0900
Subject: [PATCH v3 1/3] Avoid Invalidating all entries when ALTER PUBLICATION
 is executed

When the ALTER PUBLICATION command is executed, all entries in RelationSyncCache
will be discarded anyway. This mechanism works well but is sometimes not
efficient. For example, when the ALTER PUBLICATION DROP TABLE is executed,
1) the specific entry in RelationSyncCache will be removed, and then 2) all
entries will be discarded.

This patch avoids dropping all entries in RelationSyncCache when ALTER
PUBLICATION is executed. Theoretically, it is enough to invalidate the related
relcache since RelcacheCallback has already been registered.

Author: Shlok Kyal and Hayato Kuroda
---
 src/backend/replication/pgoutput/pgoutput.c | 25 +++++++++------------
 1 file changed, 11 insertions(+), 14 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 7d464f656a..aebf032892 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1965,23 +1965,21 @@ init_rel_sync_cache(MemoryContext cachectx)
 	/*
 	 * Flush all cache entries after a pg_namespace change, in case it was a
 	 * schema rename affecting a relation being replicated.
+	 *
+	 * XXX: it may be feasible to avoid setting rel_sync_cache_publication_cb()
+	 * as a syscache callback. One approach is to introduce a new type of
+	 * invalidation message. It can be sent when a schema is modified and
+	 * contains oid of relations included by the schema. Then, pgoutput can set
+	 * a new callback for the message invalidating listed relations. Another
+	 * approach is to store a hash value of pg_namespace.oid that relations
+	 * belong to RelSyncCache entries. Syscache invalidation callbacks are
+	 * passed the hash value of oid for the invalidated tuple, so syscache
+	 * callback can lookup entries and invalidate if matched.
 	 */
 	CacheRegisterSyscacheCallback(NAMESPACEOID,
 								  rel_sync_cache_publication_cb,
 								  (Datum) 0);
 
-	/*
-	 * Flush all cache entries after any publication changes.  (We need no
-	 * callback entry for pg_publication, because publication_invalidation_cb
-	 * will take care of it.)
-	 */
-	CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
-								  rel_sync_cache_publication_cb,
-								  (Datum) 0);
-	CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
-								  rel_sync_cache_publication_cb,
-								  (Datum) 0);
-
 	relation_callbacks_registered = true;
 }
 
@@ -2397,8 +2395,7 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 /*
  * Publication relation/schema map syscache invalidation callback
  *
- * Called for invalidations on pg_publication, pg_publication_rel,
- * pg_publication_namespace, and pg_namespace.
+ * Called for invalidations on pg_publication and pg_namespace.
  */
 static void
 rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
-- 
2.43.5

v3-0002-Introduce-a-new-invalidation-message-to-invalidat.patch (application/octet-stream)
From c291e5f8466b4b6806e01d1927332c44d84b3076 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 4 Mar 2025 16:51:19 +0900
Subject: [PATCH v3 2/3] Introduce a new invalidation message to invalidate
 caches in output plugins

---
 src/backend/utils/cache/inval.c | 87 +++++++++++++++++++++++++++++++++
 src/include/storage/sinval.h    | 11 +++++
 src/include/utils/inval.h       | 10 ++++
 3 files changed, 108 insertions(+)

diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 700ccb6df9..d793bc9281 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -271,6 +271,7 @@ int			debug_discard_caches = 0;
 
 #define MAX_SYSCACHE_CALLBACKS 64
 #define MAX_RELCACHE_CALLBACKS 10
+#define MAX_RELSYNC_CALLBACKS 10
 
 static struct SYSCACHECALLBACK
 {
@@ -292,6 +293,15 @@ static struct RELCACHECALLBACK
 
 static int	relcache_callback_count = 0;
 
+static struct RELSYNCCALLBACK
+{
+	RelSyncCallbackFunction function;
+	Datum		arg;
+}			relsync_callback_list[MAX_RELSYNC_CALLBACKS];
+
+static int	relsync_callback_count = 0;
+
+
 /* ----------------------------------------------------------------
  *				Invalidation subgroup support functions
  * ----------------------------------------------------------------
@@ -832,6 +842,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
 		else if (msg->sn.dbId == MyDatabaseId)
 			InvalidateCatalogSnapshot();
 	}
+	else if (msg->id == SHAREDINVALRELSYNC_ID)
+	{
+		/* We only care about our own database */
+		if (msg->rs.dbId == MyDatabaseId)
+			CallRelSyncCallbacks(msg->rs.relid);
+	}
 	else
 		elog(FATAL, "unrecognized SI message ID: %d", msg->id);
 }
@@ -1695,6 +1711,42 @@ CacheInvalidateRelmap(Oid databaseId)
 }
 
 
+/*
+ * CacheInvalidateRelSync
+ *		Register invalidation of the cache in logical decoding output plugin
+ *		for a database.
+ *
+ * This type of invalidation message is used for the specific purpose of output
+ * plugins. Processes that do not decode WAL do nothing even when they
+ * receive the message.
+ */
+void
+CacheInvalidateRelSync(Oid relid)
+{
+	SharedInvalidationMessage msg;
+
+	msg.rs.id = SHAREDINVALRELSYNC_ID;
+	msg.rs.dbId = MyDatabaseId;
+	msg.rs.relid = relid;
+	/* check AddCatcacheInvalidationMessage() for an explanation */
+	VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+	SendSharedInvalidMessages(&msg, 1);
+}
+
+
+/*
+ * CacheInvalidateRelSyncAll
+ *		Register invalidation of the whole cache in logical decoding output
+ *		plugin.
+ */
+void
+CacheInvalidateRelSyncAll(void)
+{
+	CacheInvalidateRelSync(InvalidOid);
+}
+
+
 /*
  * CacheRegisterSyscacheCallback
  *		Register the specified function to be called for all future
@@ -1763,6 +1815,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 	++relcache_callback_count;
 }
 
+/*
+ * CacheRegisterRelSyncCallback
+ *		Register the specified function to be called for all future
+ *		decoding-cache invalidation events.
+ *
+ * This function is intended to be called from logical decoding output
+ * plugins.
+ */
+void
+CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+							 Datum arg)
+{
+	if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS)
+		elog(FATAL, "out of relsync_callback_list slots");
+
+	relsync_callback_list[relsync_callback_count].function = func;
+	relsync_callback_list[relsync_callback_count].arg = arg;
+
+	++relsync_callback_count;
+}
+
 /*
  * CallSyscacheCallbacks
  *
@@ -1788,6 +1861,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
 	}
 }
 
+/*
+ * CallRelSyncCallbacks
+ */
+void
+CallRelSyncCallbacks(Oid relid)
+{
+	for (int i = 0; i < relsync_callback_count; i++)
+	{
+		struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+		ccitem->function(ccitem->arg, relid);
+	}
+}
+
 /*
  * LogLogicalInvalidations
  *
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index 2463c0f9fa..90a5af4ed8 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -27,6 +27,7 @@
  *	* invalidate an smgr cache entry for a specific physical relation
  *	* invalidate the mapped-relation mapping for a given database
  *	* invalidate any saved snapshot that might be used to scan a given relation
+ *	* invalidate a specific entry for specific output plugin
  * More types could be added if needed.  The message type is identified by
  * the first "int8" field of the message struct.  Zero or positive means a
  * specific-catcache inval message (and also serves as the catcache ID field).
@@ -110,6 +111,15 @@ typedef struct
 	Oid			relId;			/* relation ID */
 } SharedInvalSnapshotMsg;
 
+#define SHAREDINVALRELSYNC_ID	(-6)
+
+typedef struct
+{
+	int8		id;				/* type field --- must be first */
+	Oid			dbId;			/* database ID */
+	Oid			relid;			/* relation ID, or 0 if whole relsync cache */
+} SharedInvalRelSyncMsg;
+
 typedef union
 {
 	int8		id;				/* type field --- must be first */
@@ -119,6 +129,7 @@ typedef union
 	SharedInvalSmgrMsg sm;
 	SharedInvalRelmapMsg rm;
 	SharedInvalSnapshotMsg sn;
+	SharedInvalRelSyncMsg rs;
 } SharedInvalidationMessage;
 
 
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 40658ba2ff..5922306c11 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -22,6 +22,7 @@ extern PGDLLIMPORT int debug_discard_caches;
 
 typedef void (*SyscacheCallbackFunction) (Datum arg, int cacheid, uint32 hashvalue);
 typedef void (*RelcacheCallbackFunction) (Datum arg, Oid relid);
+typedef void (*RelSyncCallbackFunction) (Datum arg, Oid relid);
 
 
 extern void AcceptInvalidationMessages(void);
@@ -59,6 +60,10 @@ extern void CacheInvalidateSmgr(RelFileLocatorBackend rlocator);
 
 extern void CacheInvalidateRelmap(Oid databaseId);
 
+extern void CacheInvalidateRelSync(Oid relid);
+
+extern void CacheInvalidateRelSyncAll(void);
+
 extern void CacheRegisterSyscacheCallback(int cacheid,
 										  SyscacheCallbackFunction func,
 										  Datum arg);
@@ -66,8 +71,13 @@ extern void CacheRegisterSyscacheCallback(int cacheid,
 extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 										  Datum arg);
 
+extern void CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+										 Datum arg);
+
 extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue);
 
+extern void CallRelSyncCallbacks(Oid relid);
+
 extern void InvalidateSystemCaches(void);
 extern void InvalidateSystemCachesExtended(bool debug_discard);
 
-- 
2.43.5

v3-0003-Invalidate-Relcaches-while-ALTER-PUBLICATION-OWNE.patch (application/octet-stream)
From 8a30616b994899dcc5d95ec25bb4d837247a90b4 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 3 Mar 2025 19:41:04 +0900
Subject: [PATCH v3 3/3] Invalidate Relcaches while ALTER PUBLICATION OWNER
 TO/RENAME TO

---
 src/backend/commands/alter.c                |   4 +-
 src/backend/commands/publicationcmds.c      | 139 ++++++++++++++++++++
 src/backend/parser/gram.y                   |   2 +-
 src/backend/replication/pgoutput/pgoutput.c |   8 +-
 src/include/commands/publicationcmds.h      |   2 +
 5 files changed, 147 insertions(+), 8 deletions(-)

diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 78c1d4e1b8..a79329acc1 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -400,6 +400,9 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TYPE:
 			return RenameType(stmt);
 
+		case OBJECT_PUBLICATION:
+			return RenamePublication(stmt->subname, stmt->newname);
+
 		case OBJECT_AGGREGATE:
 		case OBJECT_COLLATION:
 		case OBJECT_CONVERSION:
@@ -417,7 +420,6 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TSDICTIONARY:
 		case OBJECT_TSPARSER:
 		case OBJECT_TSTEMPLATE:
-		case OBJECT_PUBLICATION:
 		case OBJECT_SUBSCRIPTION:
 			{
 				ObjectAddress address;
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 150a768d16..c1cab00ddb 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -491,6 +491,95 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
 	return *invalid_column_list || *invalid_gen_col;
 }
 
+/*
+ * Execute ALTER PUBLICATION RENAME
+ */
+ObjectAddress
+RenamePublication(const char *oldname, const char *newname)
+{
+	Relation			rel;
+	HeapTuple			tup;
+	ObjectAddress		address;
+	Form_pg_publication	pubform;
+	bool				replaces[Natts_pg_publication];
+	bool				nulls[Natts_pg_publication];
+	Datum				values[Natts_pg_publication];
+
+	rel = table_open(PublicationRelationId, RowExclusiveLock);
+
+	tup = SearchSysCacheCopy1(PUBLICATIONNAME,
+							  CStringGetDatum(oldname));
+
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("publication \"%s\" does not exist",
+						oldname)));
+
+	pubform = (Form_pg_publication) GETSTRUCT(tup);
+
+	/* must be owner */
+	if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
+		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
+					   NameStr(pubform->pubname));
+
+	/* Everything ok, form a new tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	/* Only update the pubname */
+	values[Anum_pg_publication_pubname - 1] =
+		DirectFunctionCall1(namein, CStringGetDatum(newname));
+	replaces[Anum_pg_publication_pubname - 1] = true;
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/*
+	 * Invalidate caches on the logical decoding output plugin.
+	 *
+	 * Unlike the ALTER PUBLICATION ADD/SET/DROP commands, we do not have to
+	 * invalidate relcaches here. Those commands need it to refresh the
+	 * publication descriptor, but the publication's name is not recorded
+	 * there, so RENAME does not require a refresh. Instead, we only
+	 * invalidate the output plugin's cache so that it is rebuilt.
+	 */
+	if (pubform->puballtables)
+	{
+		CacheInvalidateRelSyncAll();
+	}
+	else
+	{
+		List	   *relids = NIL;
+		List	   *schemarelids = NIL;
+
+		/*
+		 * For partition table, when we insert data, get_rel_sync_entry is
+		 * called and a hash entry is created for the corresponding leaf table.
+		 * So invalidating the leaf nodes would be sufficient here.
+		 */
+		relids = GetPublicationRelations(pubform->oid,
+										 PUBLICATION_PART_LEAF);
+		schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
+														PUBLICATION_PART_LEAF);
+
+		relids = list_concat_unique_oid(relids, schemarelids);
+
+		InvalidateRelSyncCaches(relids);
+	}
+
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	ObjectAddressSet(address, PublicationRelationId, pubform->oid);
+
+	heap_freetuple(tup);
+
+	table_close(rel, RowExclusiveLock);
+
+	return address;
+}
+
 /* check_functions_in_node callback */
 static bool
 contain_mutable_or_user_functions_checker(Oid func_id, void *context)
@@ -1996,6 +2085,37 @@ AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
 	}
 
 	form->pubowner = newOwnerId;
+
+	/*
+	 * Invalidate caches on the logical decoding output plugin.
+	 *
+	 * No need to invalidate relcache, for the same reason as the RENAME command.
+	 * Please see comments in RenamePublication().
+	 */
+	if (form->puballtables)
+	{
+		CacheInvalidateRelSyncAll();
+	}
+	else
+	{
+		List	   *relids = NIL;
+		List	   *schemarelids = NIL;
+
+		/*
+		 * For partition table, when we insert data, get_rel_sync_entry is
+		 * called and a hash entry is created for the corresponding leaf table.
+		 * So invalidating the leaf nodes would be sufficient here.
+		 */
+		relids = GetPublicationRelations(form->oid,
+										 PUBLICATION_PART_LEAF);
+		schemarelids = GetAllSchemaPublicationRelations(form->oid,
+														PUBLICATION_PART_LEAF);
+
+		relids = list_concat_unique_oid(relids, schemarelids);
+
+		InvalidateRelSyncCaches(relids);
+	}
+
 	CatalogTupleUpdate(rel, &tup->t_self, tup);
 
 	/* Update owner dependency reference */
@@ -2096,3 +2216,22 @@ defGetGeneratedColsOption(DefElem *def)
 
 	return PUBLISH_GENCOLS_NONE;	/* keep compiler quiet */
 }
+
+
+void
+InvalidateRelSyncCaches(List *relids)
+{
+	/*
+	 * We don't want to send too many individual messages, at some point it's
+	 * cheaper to just reset whole relcache.
+	 */
+	if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
+	{
+		ListCell   *lc;
+
+		foreach(lc, relids)
+			CacheInvalidateRelSync(lfirst_oid(lc));
+	}
+	else
+		CacheInvalidateRelSync(InvalidOid);
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index c11a3beff0..55153a1db4 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9513,7 +9513,7 @@ RenameStmt: ALTER AGGREGATE aggregate_with_argtypes RENAME TO name
 					RenameStmt *n = makeNode(RenameStmt);
 
 					n->renameType = OBJECT_PUBLICATION;
-					n->object = (Node *) makeString($3);
+					n->subname = $3;
 					n->newname = $6;
 					n->missing_ok = false;
 					$$ = (Node *) n;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index aebf032892..bd4007ea98 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -531,6 +531,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 			CacheRegisterSyscacheCallback(PUBLICATIONOID,
 										  publication_invalidation_cb,
 										  (Datum) 0);
+			CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
+										 (Datum) 0);
 			publication_callback_registered = true;
 		}
 
@@ -1789,12 +1791,6 @@ static void
 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
 	publications_valid = false;
-
-	/*
-	 * Also invalidate per-relation cache so that next time the filtering info
-	 * is checked it will be updated with the new publication settings.
-	 */
-	rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
 }
 
 /*
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index e11a942ea0..f130ea3090 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -38,5 +38,7 @@ extern bool pub_contains_invalid_column(Oid pubid, Relation relation,
 										char pubgencols_type,
 										bool *invalid_column_list,
 										bool *invalid_gen_col);
+extern ObjectAddress RenamePublication(const char *oldname, const char *newname);
+extern void InvalidateRelSyncCaches(List *relids);
 
 #endif							/* PUBLICATIONCMDS_H */
-- 
2.43.5
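
The threshold logic of InvalidateRelSyncCaches() in the patch above can be sketched standalone as follows. This is a toy model rather than the patch itself: toy_plan_invalidation and TOY_MAX_INVAL_MSGS are hypothetical names, and the small limit of 4 is chosen only to keep the example readable (the patch compares against MAX_RELCACHE_INVAL_MSGS).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Illustrative limit only; the patch uses MAX_RELCACHE_INVAL_MSGS instead. */
#define TOY_MAX_INVAL_MSGS 4

/*
 * Decide how to invalidate nrelids relations. Below the limit, one message
 * per relation is sent. At or above it, a single "whole cache" message is
 * sent instead. Returns the number of messages and reports the fallback
 * through *whole_cache.
 */
size_t
toy_plan_invalidation(size_t nrelids, bool *whole_cache)
{
	assert(whole_cache != NULL);

	if (nrelids < TOY_MAX_INVAL_MSGS)
	{
		*whole_cache = false;
		return nrelids;
	}

	*whole_cache = true;
	return 1;
}
```

The trade-off is between message volume and precision: many individual messages keep unrelated entries cached, but past some point a single reset is cheaper to send and process.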

#6Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Hayato Kuroda (Fujitsu) (#5)
1 attachment(s)
RE: Selectively invalidate caches in pgoutput module

Dear hackers,

I did performance testing with HEAD and v2-0001, and confirmed that the patch
improves performance by around 20% in the typical case.

Workload
======
I emulated a scenario in which many tables are published and only one table
stops and then resumes being published. In HEAD, the ALTER PUBLICATION DROP/ADD
commands invalidate all entries in the RelSyncCache, but v2 invalidates only the
needed one, which means the decoding time should be reduced after patching.

1. Initialized an instance.
2. Created a root table and 5000 leaf tables.
3. Created another table which did not have a parent-child relationship.
4. Created a publication which included the root table and the other table.
5. Created a replication slot with the pgoutput plugin.
6. Executed a transaction which would be decoded. In the transaction:
   a. Inserted tuples into all the leaf tables.
   b. Altered the publication to drop the other table.
   c. Altered the publication again to add the dropped table.
   d. Inserted tuples into all the leaf tables again.
7. Measured the decoding time via SQL interfaces, five times.

The attached script automates the above.

Results
=====
Each cell is the median of 5 runs. Compared with HEAD, v2-0001 reduces
the decoding time by about 20%.

head [ms]   v2-0001 [ms]
252         198
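
For reference, the relative reduction quoted above can be recomputed from the two medians with a small helper. relative_reduction_pct is a hypothetical name used only for this sketch.

```c
#include <assert.h>

/*
 * Relative reduction of a timing, in percent:
 * (before - after) / before * 100.
 */
double
relative_reduction_pct(double before_ms, double after_ms)
{
	assert(before_ms > 0.0);

	return (before_ms - after_ms) / before_ms * 100.0;
}
```

With the medians reported here, relative_reduction_pct(252.0, 198.0) gives 54 / 252, that is roughly 21.4%, which matches the "about 20%" claim.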

I'm planning to do further tests to prove the benefit of the 0002 and 0003 patches.

Best regards,
Hayato Kuroda
FUJITSU LIMITED

Attachments:

test_p.sh (application/octet-stream)
#7Amit Kapila
amit.kapila16@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#6)
Re: Selectively invalidate caches in pgoutput module

On Wed, Mar 5, 2025 at 2:35 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

I did performance testing with HEAD and v2-0001, and confirmed that the patch
improves performance by around 20% in the typical case.

Workload
======
I emulated a scenario in which many tables are published and only one table
stops and then resumes being published. In HEAD, the ALTER PUBLICATION DROP/ADD
commands invalidate all entries in the RelSyncCache, but v2 invalidates only the
needed one, which means the decoding time should be reduced after patching.

1. Initialized an instance.
2. Created a root table and 5000 leaf tables.
3. Created another table which did not have a parent-child relationship.
4. Created a publication which included the root table and the other table.
5. Created a replication slot with the pgoutput plugin.
6. Executed a transaction which would be decoded. In the transaction:
   a. Inserted tuples into all the leaf tables.
   b. Altered the publication to drop the other table.
   c. Altered the publication again to add the dropped table.
   d. Inserted tuples into all the leaf tables again.
7. Measured the decoding time via SQL interfaces, five times.

The attached script automates the above.

Results
=====
Each cell is the median of 5 runs. Compared with HEAD, v2-0001 reduces
the decoding time by about 20%.

Yeah, this is a good improvement and the patch looks safe to me, so I
pushed with minor changes in the comments.

..

I'm planning to do further tests to prove the benefit for 0002, 0003 patches.

Ideally, we need the time of only step-6d with and without the patch.
Step 6-a is required to build the cache, but in actual workloads via
walsender, we won't need that, and also, at the end of SQL API
execution, we will forcibly invalidate all caches, so that is also not
required. Please see whether you can avoid that in the performance
measurement of the next set of patches.

--
With Regards,
Amit Kapila.

#8Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Amit Kapila (#7)
3 attachment(s)
RE: Selectively invalidate caches in pgoutput module

Dear Amit, hackers,

Yeah, this is a good improvement and the patch looks safe to me, so I
pushed with minor changes in the comments.

Thanks! PSA the rebased patch. While thinking more about 0002, I found a
conceptual bug: the relsync cache could not be rebuilt when SQL interfaces are
used. In the old version, inval messages were not recorded in the WAL, so the
relsync cache could not be discarded if decoding happened later.
In the new version, the invalidation is handled as transactional and serialized to
the WAL. In more detail, messages are stored in InvalMessageArrays and consumed at
the end of transactions. For simplification, the subgroup of the message is set
to "relcache" when serializing. But IIUC this handling is OK, since snapshot
invalidation takes the same approach.
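
The "stored during the transaction, consumed at the end" behaviour can be sketched standalone as below. This is a toy model, not the inval.c implementation: ToyXactInvals, toy_queue_inval, toy_commit and the counting callback are hypothetical names, and the fixed-size buffer is an assumption made to keep the example short.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_MAX_PENDING 16		/* arbitrary capacity for the sketch */

/* Hypothetical per-transaction list of pending invalidations. */
typedef struct ToyXactInvals
{
	unsigned int relids[TOY_MAX_PENDING];
	size_t		npending;
} ToyXactInvals;

/* Counter used by the sample callback below. */
size_t		toy_applied = 0;

/* Sample callback: only counts how many invalidations were delivered. */
void
toy_apply_counter(unsigned int relid)
{
	(void) relid;
	toy_applied++;
}

/* Queue a message; returns 0 on success, -1 when the toy buffer is full. */
int
toy_queue_inval(ToyXactInvals *x, unsigned int relid)
{
	assert(x != NULL);

	if (x->npending >= TOY_MAX_PENDING)
		return -1;
	x->relids[x->npending++] = relid;
	return 0;
}

/*
 * At commit, hand every queued relid to apply() in order, then reset the
 * list. Returns the number of messages delivered.
 */
size_t
toy_commit(ToyXactInvals *x, void (*apply) (unsigned int relid))
{
	size_t		n;

	assert(x != NULL && apply != NULL);

	n = x->npending;
	for (size_t i = 0; i < n; i++)
		apply(x->relids[i]);
	x->npending = 0;
	return n;
}
```

Nothing reaches the callback until toy_commit() runs. In the same way, a decoder that replays the serialized messages later sees the invalidations at the corresponding point in the transaction.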

I did performance testing with the given patch.
It is mostly the same as the previous test but uses two publications.

1. Initialize an instance.
2. Create a root table and leaf tables. The variable $NUM_PART controls
   how many partitions exist on the instance.
3. Create another table.
4. Create publications (p1, p2). One includes the root table, and the other includes
   the independent table.
5. Create a replication slot with the pgoutput plugin.
6. Execute a transaction which would be decoded. In the transaction:
   a. Insert tuples into all the leaf tables.
   b. Rename publication p2 to p2_renamed.
   c. Rename publication p2_renamed to p2.
   d. Insert tuples into all the leaf tables again.
7. Decode all the changes via SQL interfaces.

My expectation is that on HEAD, the leaves are cached at a), discarded at b)
and c), and then cached again at d).
In the patched case, only the independent table would be discarded at b) and c), so the
decoding time should be reduced.

The results below are the median of five runs. It seems to me there is around a 15% improvement.

head [ms]   patched (0001-0003) [ms]
239         200

Ideally, we need the time of only step-6d with and without the patch.
Step 6-a is required to build the cache, but in actual workloads via
walsender, we won't need that, and also, at the end of SQL API
execution, we will forcibly invalidate all caches, so that is also not
required. See, if in the performance measurement of the next set of
patches, you can avoid that.

Hmm. It requires additional debug messages. Let me consider.

Best regards,
Hayato Kuroda
FUJITSU LIMITED

Attachments:

v4-0001-Introduce-a-new-invalidation-message-to-invalidat.patch (application/octet-stream)
From 7983fec0e7c82649d5ce8db77b9d19c81e2346ed Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 4 Mar 2025 16:51:19 +0900
Subject: [PATCH v4 1/2] Introduce a new invalidation message to invalidate
 caches in output plugins

A new invalidation message is generated when either ALTER PUBLICATION RENAME TO
or OWNER TO is executed. The primary use case of the message is to invalidate
caches on the logical decoding output plugin. Plugins can register callback
functions for the message via CacheRegisterRelSyncCallback(), and the function
can invalidate the cache for the specified relation.

A new invalidation message is transactional, and decoder processes should
recognize the message and invalidate specified caches. Thus, the messages are
stored in InvalMessageArray and serialized at the end of the transaction.
---
 src/backend/access/rmgrdesc/standbydesc.c |   2 +
 src/backend/utils/cache/inval.c           | 120 ++++++++++++++++++++++
 src/include/storage/sinval.h              |  11 ++
 src/include/utils/inval.h                 |  10 ++
 4 files changed, 143 insertions(+)

diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index d849f8e54b..81eff5f31c 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -132,6 +132,8 @@ standby_desc_invalidations(StringInfo buf,
 			appendStringInfo(buf, " relmap db %u", msg->rm.dbId);
 		else if (msg->id == SHAREDINVALSNAPSHOT_ID)
 			appendStringInfo(buf, " snapshot %u", msg->sn.relId);
+		else if (msg->id == SHAREDINVALRELSYNC_ID)
+			appendStringInfo(buf, " relsync %u", msg->rs.relid);
 		else
 			appendStringInfo(buf, " unrecognized id %d", msg->id);
 	}
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 700ccb6df9..acc3325b27 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -271,6 +271,7 @@ int			debug_discard_caches = 0;
 
 #define MAX_SYSCACHE_CALLBACKS 64
 #define MAX_RELCACHE_CALLBACKS 10
+#define MAX_RELSYNC_CALLBACKS 10
 
 static struct SYSCACHECALLBACK
 {
@@ -292,6 +293,15 @@ static struct RELCACHECALLBACK
 
 static int	relcache_callback_count = 0;
 
+static struct RELSYNCCALLBACK
+{
+	RelSyncCallbackFunction function;
+	Datum		arg;
+}			relsync_callback_list[MAX_RELSYNC_CALLBACKS];
+
+static int	relsync_callback_count = 0;
+
+
 /* ----------------------------------------------------------------
  *				Invalidation subgroup support functions
  * ----------------------------------------------------------------
@@ -484,6 +494,34 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
 	AddInvalidationMessage(group, RelCacheMsgs, &msg);
 }
 
+/*
+ * Add a relsync inval entry
+ *
+ * We put these into the relcache subgroup for simplicity.
+ */
+static void
+AddRelsyncInvalidationMessage(InvalidationMsgsGroup *group,
+							  Oid dbId, Oid relId)
+{
+	SharedInvalidationMessage msg;
+
+	/* Don't add a duplicate item */
+	/* We assume dbId need not be checked because it will never change */
+	ProcessMessageSubGroup(group, RelCacheMsgs,
+						   if (msg->rc.id == SHAREDINVALRELSYNC_ID &&
+							   msg->rc.relId == relId)
+						   return);
+
+	/* OK, add the item */
+	msg.rc.id = SHAREDINVALRELSYNC_ID;
+	msg.rc.dbId = dbId;
+	msg.rc.relId = relId;
+	/* check AddCatcacheInvalidationMessage() for an explanation */
+	VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+	AddInvalidationMessage(group, RelCacheMsgs, &msg);
+}
+
 /*
  * Add a snapshot inval entry
  *
@@ -611,6 +649,18 @@ RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
 		info->RelcacheInitFileInval = true;
 }
 
+/*
+ * RegisterRelsyncInvalidation
+ *
+ * As above, but register a relsync invalidation event.
+ */
+static void
+RegisterRelsyncInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
+{
+	AddRelsyncInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, relId);
+}
+
+
 /*
  * RegisterSnapshotInvalidation
  *
@@ -832,6 +882,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
 		else if (msg->sn.dbId == MyDatabaseId)
 			InvalidateCatalogSnapshot();
 	}
+	else if (msg->id == SHAREDINVALRELSYNC_ID)
+	{
+		/* We only care about our own database */
+		if (msg->rs.dbId == MyDatabaseId)
+			CallRelSyncCallbacks(msg->rs.relid);
+	}
 	else
 		elog(FATAL, "unrecognized SI message ID: %d", msg->id);
 }
@@ -1622,6 +1678,35 @@ CacheInvalidateRelcacheByRelid(Oid relid)
 }
 
 
+/*
+ * CacheInvalidateRelSync
+ *		Register invalidation of the cache in logical decoding output plugin
+ *		for a database.
+ *
+ * This type of invalidation message is used for the specific purpose of output
+ * plugins. Processes which do not decode WAL do nothing even when they
+ * receive the message.
+ */
+void
+CacheInvalidateRelSync(Oid relid)
+{
+	RegisterRelsyncInvalidation(PrepareInvalidationState(),
+								MyDatabaseId, relid);
+}
+
+
+/*
+ * CacheInvalidateRelSyncAll
+ *		Register invalidation of the whole cache in logical decoding output
+ *		plugin.
+ */
+void
+CacheInvalidateRelSyncAll(void)
+{
+	CacheInvalidateRelSync(InvalidOid);
+}
+
+
 /*
  * CacheInvalidateSmgr
  *		Register invalidation of smgr references to a physical relation.
@@ -1763,6 +1848,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 	++relcache_callback_count;
 }
 
+/*
+ * CacheRegisterRelSyncCallback
+ *		Register the specified function to be called for all future
+ *		decoding-cache invalidation events.
+ *
+ * This function is intended to be called from logical decoding output
+ * plugins.
+ */
+void
+CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+							 Datum arg)
+{
+	if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS)
+		elog(FATAL, "out of relsync_callback_list slots");
+
+	relsync_callback_list[relsync_callback_count].function = func;
+	relsync_callback_list[relsync_callback_count].arg = arg;
+
+	++relsync_callback_count;
+}
+
 /*
  * CallSyscacheCallbacks
  *
@@ -1788,6 +1894,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
 	}
 }
 
+/*
+ * CallRelSyncCallbacks
+ */
+void
+CallRelSyncCallbacks(Oid relid)
+{
+	for (int i = 0; i < relsync_callback_count; i++)
+	{
+		struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+		ccitem->function(ccitem->arg, relid);
+	}
+}
+
 /*
  * LogLogicalInvalidations
  *
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index 2463c0f9fa..90a5af4ed8 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -27,6 +27,7 @@
  *	* invalidate an smgr cache entry for a specific physical relation
  *	* invalidate the mapped-relation mapping for a given database
  *	* invalidate any saved snapshot that might be used to scan a given relation
+ *	* invalidate a specific entry for specific output plugin
  * More types could be added if needed.  The message type is identified by
  * the first "int8" field of the message struct.  Zero or positive means a
  * specific-catcache inval message (and also serves as the catcache ID field).
@@ -110,6 +111,15 @@ typedef struct
 	Oid			relId;			/* relation ID */
 } SharedInvalSnapshotMsg;
 
+#define SHAREDINVALRELSYNC_ID	(-6)
+
+typedef struct
+{
+	int8		id;				/* type field --- must be first */
+	Oid			dbId;			/* database ID */
+	Oid			relid;			/* relation ID, or 0 if whole relcache */
+} SharedInvalRelSyncMsg;
+
 typedef union
 {
 	int8		id;				/* type field --- must be first */
@@ -119,6 +129,7 @@ typedef union
 	SharedInvalSmgrMsg sm;
 	SharedInvalRelmapMsg rm;
 	SharedInvalSnapshotMsg sn;
+	SharedInvalRelSyncMsg rs;
 } SharedInvalidationMessage;
 
 
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 40658ba2ff..9b871caef6 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -22,6 +22,7 @@ extern PGDLLIMPORT int debug_discard_caches;
 
 typedef void (*SyscacheCallbackFunction) (Datum arg, int cacheid, uint32 hashvalue);
 typedef void (*RelcacheCallbackFunction) (Datum arg, Oid relid);
+typedef void (*RelSyncCallbackFunction) (Datum arg, Oid relid);
 
 
 extern void AcceptInvalidationMessages(void);
@@ -55,6 +56,10 @@ extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple);
 
 extern void CacheInvalidateRelcacheByRelid(Oid relid);
 
+extern void CacheInvalidateRelSync(Oid relid);
+
+extern void CacheInvalidateRelSyncAll(void);
+
 extern void CacheInvalidateSmgr(RelFileLocatorBackend rlocator);
 
 extern void CacheInvalidateRelmap(Oid databaseId);
@@ -66,8 +71,13 @@ extern void CacheRegisterSyscacheCallback(int cacheid,
 extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 										  Datum arg);
 
+extern void CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+										 Datum arg);
+
 extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue);
 
+extern void CallRelSyncCallbacks(Oid relid);
+
 extern void InvalidateSystemCaches(void);
 extern void InvalidateSystemCachesExtended(bool debug_discard);
 
-- 
2.43.5

v4-0002-Invalidate-Relcaches-while-ALTER-PUBLICATION-OWNE.patch (application/octet-stream)
From b18ad5d2cf4b7fbe9f35a69dcb44701244f04d50 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 3 Mar 2025 19:41:04 +0900
Subject: [PATCH v4 2/2] Invalidate Relcaches while ALTER PUBLICATION OWNER
 TO/RENAME TO

---
 src/backend/commands/alter.c                |   4 +-
 src/backend/commands/publicationcmds.c      | 139 ++++++++++++++++++++
 src/backend/parser/gram.y                   |   2 +-
 src/backend/replication/pgoutput/pgoutput.c |   8 +-
 src/include/commands/publicationcmds.h      |   2 +
 5 files changed, 147 insertions(+), 8 deletions(-)

diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 78c1d4e1b8..a79329acc1 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -400,6 +400,9 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TYPE:
 			return RenameType(stmt);
 
+		case OBJECT_PUBLICATION:
+			return RenamePublication(stmt->subname, stmt->newname);
+
 		case OBJECT_AGGREGATE:
 		case OBJECT_COLLATION:
 		case OBJECT_CONVERSION:
@@ -417,7 +420,6 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TSDICTIONARY:
 		case OBJECT_TSPARSER:
 		case OBJECT_TSTEMPLATE:
-		case OBJECT_PUBLICATION:
 		case OBJECT_SUBSCRIPTION:
 			{
 				ObjectAddress address;
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 150a768d16..c1cab00ddb 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -491,6 +491,95 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
 	return *invalid_column_list || *invalid_gen_col;
 }
 
+/*
+ * Execute ALTER PUBLICATION RENAME
+ */
+ObjectAddress
+RenamePublication(const char *oldname, const char *newname)
+{
+	Relation			rel;
+	HeapTuple			tup;
+	ObjectAddress		address;
+	Form_pg_publication	pubform;
+	bool				replaces[Natts_pg_publication];
+	bool				nulls[Natts_pg_publication];
+	Datum				values[Natts_pg_publication];
+
+	rel = table_open(PublicationRelationId, RowExclusiveLock);
+
+	tup = SearchSysCacheCopy1(PUBLICATIONNAME,
+							  CStringGetDatum(oldname));
+
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("publication \"%s\" does not exist",
+						oldname)));
+
+	pubform = (Form_pg_publication) GETSTRUCT(tup);
+
+	/* must be owner */
+	if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
+		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
+					   NameStr(pubform->pubname));
+
+	/* Everything ok, form a new tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	/* Only update the pubname */
+	values[Anum_pg_publication_pubname - 1] =
+		DirectFunctionCall1(namein, CStringGetDatum(newname));
+	replaces[Anum_pg_publication_pubname - 1] = true;
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/*
+	 * Invalidate caches on the logical decoding output plugin.
+	 *
+	 * Apart from the ALTER PUBLICATION ADD/SET/DROP commands, we do not have
+	 * to invalidate relcaches. They are required to refresh the publication's
+	 * descriptor. The publication's name is not recorded in the attribute, so
+	 * the RENAME commands do not require a refresh. Instead, we only
+	 * invalidate a cache on the output plugin to rebuild its cache.
+	 */
+	if (pubform->puballtables)
+	{
+		CacheInvalidateRelSyncAll();
+	}
+	else
+	{
+		List	   *relids = NIL;
+		List	   *schemarelids = NIL;
+
+		/*
+		 * For partition table, when we insert data, get_rel_sync_entry is
+		 * called and a hash entry is created for the corresponding leaf table.
+		 * So invalidating the leaf nodes would be sufficient here.
+		 */
+		relids = GetPublicationRelations(pubform->oid,
+										 PUBLICATION_PART_LEAF);
+		schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
+														PUBLICATION_PART_LEAF);
+
+		relids = list_concat_unique_oid(relids, schemarelids);
+
+		InvalidateRelSyncCaches(relids);
+	}
+
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	ObjectAddressSet(address, PublicationRelationId, pubform->oid);
+
+	heap_freetuple(tup);
+
+	table_close(rel, RowExclusiveLock);
+
+	return address;
+}
+
 /* check_functions_in_node callback */
 static bool
 contain_mutable_or_user_functions_checker(Oid func_id, void *context)
@@ -1996,6 +2085,37 @@ AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
 	}
 
 	form->pubowner = newOwnerId;
+
+	/*
+	 * Invalidate caches on the logical decoding output plugin.
+	 *
+	 * No need to invalidate relcache as the same reason as RENAME command.
+	 * Please see comments in RenamePublication().
+	 */
+	if (form->puballtables)
+	{
+		CacheInvalidateRelSyncAll();
+	}
+	else
+	{
+		List	   *relids = NIL;
+		List	   *schemarelids = NIL;
+
+		/*
+		 * For partition table, when we insert data, get_rel_sync_entry is
+		 * called and a hash entry is created for the corresponding leaf table.
+		 * So invalidating the leaf nodes would be sufficient here.
+		 */
+		relids = GetPublicationRelations(form->oid,
+										 PUBLICATION_PART_LEAF);
+		schemarelids = GetAllSchemaPublicationRelations(form->oid,
+														PUBLICATION_PART_LEAF);
+
+		relids = list_concat_unique_oid(relids, schemarelids);
+
+		InvalidateRelSyncCaches(relids);
+	}
+
 	CatalogTupleUpdate(rel, &tup->t_self, tup);
 
 	/* Update owner dependency reference */
@@ -2096,3 +2216,22 @@ defGetGeneratedColsOption(DefElem *def)
 
 	return PUBLISH_GENCOLS_NONE;	/* keep compiler quiet */
 }
+
+
+void
+InvalidateRelSyncCaches(List *relids)
+{
+	/*
+	 * We don't want to send too many individual messages, at some point it's
+	 * cheaper to just reset whole relcache.
+	 */
+	if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
+	{
+		ListCell   *lc;
+
+		foreach(lc, relids)
+			CacheInvalidateRelSync(lfirst_oid(lc));
+	}
+	else
+		CacheInvalidateRelSync(InvalidOid);
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 271ae26cba..0fb4041f15 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9541,7 +9541,7 @@ RenameStmt: ALTER AGGREGATE aggregate_with_argtypes RENAME TO name
 					RenameStmt *n = makeNode(RenameStmt);
 
 					n->renameType = OBJECT_PUBLICATION;
-					n->object = (Node *) makeString($3);
+					n->subname = $3;
 					n->newname = $6;
 					n->missing_ok = false;
 					$$ = (Node *) n;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9063af6e1d..ed806c5430 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -531,6 +531,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 			CacheRegisterSyscacheCallback(PUBLICATIONOID,
 										  publication_invalidation_cb,
 										  (Datum) 0);
+			CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
+										 (Datum) 0);
 			publication_callback_registered = true;
 		}
 
@@ -1789,12 +1791,6 @@ static void
 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
 	publications_valid = false;
-
-	/*
-	 * Also invalidate per-relation cache so that next time the filtering info
-	 * is checked it will be updated with the new publication settings.
-	 */
-	rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
 }
 
 /*
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index e11a942ea0..f130ea3090 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -38,5 +38,7 @@ extern bool pub_contains_invalid_column(Oid pubid, Relation relation,
 										char pubgencols_type,
 										bool *invalid_column_list,
 										bool *invalid_gen_col);
+extern ObjectAddress RenamePublication(const char *oldname, const char *newname);
+extern void InvalidateRelSyncCaches(List *relids);
 
 #endif							/* PUBLICATIONCMDS_H */
-- 
2.43.5

test_r.sh (application/octet-stream)
#9Amit Kapila
amit.kapila16@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#8)
Re: Selectively invalidate caches in pgoutput module

On Thu, Mar 6, 2025 at 5:16 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

Dear Amit, hackers,

Yeah, this is a good improvement and the patch looks safe to me, so I
pushed with minor changes in the comments.

Thanks! PSA rebased patch.

Few comments:
1. Why do we need to invalidate relsync entries when owner of its
publication changes?

I think the owner change will impact the future Alter Publication ...
Add/Drop/Set/Rename operations as that will be allowed only to new
owner (or super users), otherwise, there shouldn't be an impact on
RelSyncCache entries. Am I missing something?

2.
+ if (pubform->puballtables)
+ {
+ CacheInvalidateRelSyncAll();
+ }
+ else
+ {
+ List    *relids = NIL;
+ List    *schemarelids = NIL;
+
+ /*
+ * For partition table, when we insert data, get_rel_sync_entry is
+ * called and a hash entry is created for the corresponding leaf table.
+ * So invalidating the leaf nodes would be sufficient here.
+ */
+ relids = GetPublicationRelations(pubform->oid,
+ PUBLICATION_PART_LEAF);
+ schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
+ PUBLICATION_PART_LEAF);
+
+ relids = list_concat_unique_oid(relids, schemarelids);
+
+ InvalidateRelSyncCaches(relids);
+ }
+
+ CatalogTupleUpdate(rel, &tup->t_self, tup);

Shouldn't we update the catalog tuple before the invalidations?

3.
+ if (pubform->puballtables)
+ {
+ CacheInvalidateRelSyncAll();
+ }
+ else
+ {
+ List    *relids = NIL;
+ List    *schemarelids = NIL;
+
+ /*
+ * For partition table, when we insert data, get_rel_sync_entry is
+ * called and a hash entry is created for the corresponding leaf table.
+ * So invalidating the leaf nodes would be sufficient here.
+ */
+ relids = GetPublicationRelations(pubform->oid,
+ PUBLICATION_PART_LEAF);
+ schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
+ PUBLICATION_PART_LEAF);
+
+ relids = list_concat_unique_oid(relids, schemarelids);
+
+ InvalidateRelSyncCaches(relids);
+ }

This is duplicate code. Can we write a function to eliminate the duplication?
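
One possible shape for such a shared function is sketched below. It is a
self-contained toy under stated assumptions, not proposed patch code:
toy_invalidate_publication_rels, ToyInvalLog and TOY_MAX_INVAL_MSGS are
hypothetical stand-ins (the last one for MAX_RELCACHE_INVAL_MSGS), plain arrays
replace the List API, and invalidation requests are only recorded in a log.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;		/* stand-in for the PostgreSQL Oid type */
#define InvalidOid ((Oid) 0)

#define TOY_MAX_INVAL_MSGS 4	/* hypothetical threshold */
#define TOY_MAX_RELS 32			/* capacity of the toy buffers */

/* Records which relsync invalidations were requested. */
typedef struct ToyInvalLog
{
	Oid			relids[TOY_MAX_RELS];
	int			n;
} ToyInvalLog;

/* Stand-in for CacheInvalidateRelSync(): just remember the request. */
static void
toy_invalidate_relsync(ToyInvalLog *log, Oid relid)
{
	assert(log->n < TOY_MAX_RELS);
	log->relids[log->n++] = relid;
}

static bool
toy_contains(const Oid *oids, int n, Oid oid)
{
	for (int i = 0; i < n; i++)
		if (oids[i] == oid)
			return true;
	return false;
}

/*
 * Shared helper: invalidate everything for FOR ALL TABLES publications,
 * otherwise invalidate the union of table and schema relations, falling
 * back to a whole-cache reset when there are too many of them.
 */
void
toy_invalidate_publication_rels(ToyInvalLog *log, bool puballtables,
								const Oid *relids, int nrelids,
								const Oid *schemarelids, int nschemarelids)
{
	Oid			merged[TOY_MAX_RELS];
	int			nmerged = 0;

	if (puballtables)
	{
		toy_invalidate_relsync(log, InvalidOid);
		return;
	}

	assert(nrelids + nschemarelids <= TOY_MAX_RELS);

	/* Append schema relations that are not already listed. */
	for (int i = 0; i < nrelids; i++)
		merged[nmerged++] = relids[i];
	for (int i = 0; i < nschemarelids; i++)
		if (!toy_contains(merged, nmerged, schemarelids[i]))
			merged[nmerged++] = schemarelids[i];

	if (nmerged < TOY_MAX_INVAL_MSGS)
	{
		for (int i = 0; i < nmerged; i++)
			toy_invalidate_relsync(log, merged[i]);
	}
	else
		toy_invalidate_relsync(log, InvalidOid);
}
```

Both the RENAME and the OWNER paths could then call one function with the
publication's flag and relation lists instead of repeating the block.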

--
With Regards,
Amit Kapila.

#10Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Hayato Kuroda (Fujitsu) (#8)
2 attachment(s)
RE: Selectively invalidate caches in pgoutput module

Dear hackers,

Hmm. It requires additional debug messages. Let me consider.

I did further performance testing to measure the rebuilding time, and confirmed that
the decoding time of the second INSERT part dropped to around 1/10. In this test, the
number of partitions was also varied. Each cell is the median of 5 runs.

number of partitions    HEAD [microseconds]    v4 patched [microseconds]
--------------------------------------------------------------------------------
1000                    6030                   635
5000                    33456                  3555
15000                   106688                 14049

Workload
======
The workload is mostly the same, but I ensured that step 6 is done within a single transaction.
I also attached the script just in case.

Used source
========
The attached diff file was applied atop HEAD (d611f8) and the v4 patchset. The patched
source outputs the current timestamp (in microseconds) when 1) an INSERT is decoded,
2) an invalidation happens, or 3) WAL processing is finished for the transaction.
It also has a ratchet mechanism so that it does not log every time: consecutive INSERTs
or INVALIDATIONs are ignored. Typically, the output is like below:

```
postgres=# SELECT count(*) FROM pg_logical_slot_peek_binary_changes('test', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub1');
NOTICE: XXX start inserting. Time 794645780406191
NOTICE: XXX start invalidating. Time 794645780407423
NOTICE: XXX start inserting. Time 794645780407448 // (a)
NOTICE: XXX finish processing. Time 794645780407488 // (b)
count
-------
7
(1 row)
```
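
The ratchet described above can be sketched in isolation as follows. This is a
simplified stand-in, not the attached diff: ToyChangeKind and toy_process_changes
are hypothetical names, and the change index is printed in place of a timestamp.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

typedef enum ToyChangeKind
{
	TOY_INSERT,
	TOY_INVALIDATION
} ToyChangeKind;

/*
 * Walk a sequence of changes and log only when the kind of change flips,
 * plus one final line at the end. Returns the number of lines logged.
 */
int
toy_process_changes(const ToyChangeKind *changes, size_t nchanges, FILE *out)
{
	bool		inserting = false;
	int			nlogged = 0;

	assert(out != NULL);

	for (size_t i = 0; i < nchanges; i++)
	{
		if (changes[i] == TOY_INSERT && !inserting)
		{
			/* First INSERT of a run: log it and arm the ratchet. */
			inserting = true;
			fprintf(out, "start inserting at change %zu\n", i);
			nlogged++;
		}
		else if (changes[i] == TOY_INVALIDATION && inserting)
		{
			/* First INVALIDATION after INSERTs: log it and reset. */
			inserting = false;
			fprintf(out, "start invalidating at change %zu\n", i);
			nlogged++;
		}
		/* Consecutive changes of the same kind are not logged. */
	}

	fprintf(out, "finish processing after %zu changes\n", nchanges);
	return nlogged + 1;
}
```

For a sequence of INSERTs, then INVALIDATIONs, then INSERTs again, this logs
four lines, which matches the shape of the four NOTICE lines shown above.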

How I compared
===========
We can obtain the elapsed time for the second INSERT part as (b) - (a).
I computed this value for each run and recorded the median in the table above.
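
The calculation can be written down as a small sketch. The function names
toy_elapsed_us and toy_median are hypothetical, and an odd number of runs is
assumed so that the median is a single sample.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* qsort comparator for int64_t values. */
static int
cmp_int64(const void *a, const void *b)
{
	int64_t		x = *(const int64_t *) a;
	int64_t		y = *(const int64_t *) b;

	return (x > y) - (x < y);
}

/* Elapsed time of the second INSERT part: (b) - (a), in microseconds. */
int64_t
toy_elapsed_us(int64_t start_us, int64_t finish_us)
{
	assert(finish_us >= start_us);
	return finish_us - start_us;
}

/* Median of an odd number of samples; sorts the array in place. */
int64_t
toy_median(int64_t *samples, size_t n)
{
	assert(n > 0 && n % 2 == 1);
	qsort(samples, n, sizeof(int64_t), cmp_int64);
	return samples[n / 2];
}
```

With the timestamps from the sample output, (b) - (a) is
794645780407488 - 794645780407448 = 40 microseconds for that run.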

Best regards,
Hayato Kuroda
FUJITSU LIMITED

Attachments:

add_measure_points.diffs (application/octet-stream)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 5186ad2..8dad183 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2202,6 +2202,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	ReorderBufferChange *volatile specinsert = NULL;
 	volatile bool stream_started = false;
 	ReorderBufferTXN *volatile curtxn = NULL;
+	bool debug_inserting = false;
 
 	/* build data to be able to lookup the CommandIds of catalog tuples */
 	ReorderBufferBuildTupleCidHash(rb, txn);
@@ -2358,6 +2359,12 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
 						goto change_done;
 
+					if (!debug_inserting)
+					{
+						debug_inserting = true;
+						elog(NOTICE, "XXX start inserting. Time " UINT64_FORMAT, GetCurrentTimestamp());
+					}
+
 					/* user-triggered change */
 					if (!IsToastRelation(relation))
 					{
@@ -2503,6 +2510,12 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					break;
 
 				case REORDER_BUFFER_CHANGE_INVALIDATION:
+					if (debug_inserting)
+					{
+						debug_inserting = false;
+						elog(NOTICE, "XXX start invalidating. Time " UINT64_FORMAT, GetCurrentTimestamp());
+					}
+
 					/* Execute the invalidation messages locally */
 					ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations,
 													  change->data.inval.invalidations);
@@ -2760,6 +2773,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		}
 	}
 	PG_END_TRY();
+
+	elog(NOTICE, "XXX finish processing. Time " UINT64_FORMAT, GetCurrentTimestamp());
 }
 
 /*
test_r.sh (application/octet-stream)
#11Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Amit Kapila (#9)
2 attachment(s)
RE: Selectively invalidate caches in pgoutput module

Dear Amit,

Thanks for the comment! PSA new version.

Few comments:
1. Why do we need to invalidate relsync entries when owner of its
publication changes?

I think the owner change will impact the future Alter Publication ...
Add/Drop/Set/Rename operations as that will be allowed only to new
owner (or super users), otherwise, there shouldn't be an impact on
RelSyncCache entries. Am I missing something?

Actually, I have not found a reason to invalidate for the OWNER command so far. I
included it to 1) follow the current rule, and 2) prepare for future updates.

1) - Currently, RelSyncCache entries are discarded even by ALTER PUBLICATION OWNER
statements. I wanted to preserve that behavior.
2) - In future versions, privileges might be introduced for publications, so that
some publications would not be visible to other db users. In that case,
I feel we would have to modify RelationSyncEntry::pubactions, columns and
exprstate, and thus the entries must be discarded.

I have now removed the invalidations for the OWNER command. Let's revert the change
if we have missed something.

2.
+ if (pubform->puballtables)
+ {
+ CacheInvalidateRelSyncAll();
+ }
+ else
+ {
+ List    *relids = NIL;
+ List    *schemarelids = NIL;
+
+ /*
+ * For partition table, when we insert data, get_rel_sync_entry is
+ * called and a hash entry is created for the corresponding leaf table.
+ * So invalidating the leaf nodes would be sufficient here.
+ */
+ relids = GetPublicationRelations(pubform->oid,
+ PUBLICATION_PART_LEAF);
+ schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
+ PUBLICATION_PART_LEAF);
+
+ relids = list_concat_unique_oid(relids, schemarelids);
+
+ InvalidateRelSyncCaches(relids);
+ }
+
+ CatalogTupleUpdate(rel, &tup->t_self, tup);

Shouldn't we update the catalog tuple before the invalidations?

Right, fixed.

3.
+ if (pubform->puballtables)
+ {
+ CacheInvalidateRelSyncAll();
+ }
+ else
+ {
+ List    *relids = NIL;
+ List    *schemarelids = NIL;
+
+ /*
+ * For partition table, when we insert data, get_rel_sync_entry is
+ * called and a hash entry is created for the corresponding leaf table.
+ * So invalidating the leaf nodes would be sufficient here.
+ */
+ relids = GetPublicationRelations(pubform->oid,
+ PUBLICATION_PART_LEAF);
+ schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
+ PUBLICATION_PART_LEAF);
+
+ relids = list_concat_unique_oid(relids, schemarelids);
+
+ InvalidateRelSyncCaches(relids);
+ }

This is duplicate code. Can we write a function to eliminate the duplication?

Since this part has been removed from the OWNER command, the duplication is gone.
I did not introduce a function for this.

Best regards,
Hayato Kuroda
FUJITSU LIMITED

Attachments:

v5-0001-Introduce-a-new-invalidation-message-to-invalidat.patch (application/octet-stream)
From 6a30f3fb73624749519c42bcbb7916dc9fcc1184 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 4 Mar 2025 16:51:19 +0900
Subject: [PATCH v5 1/2] Introduce a new invalidation message to invalidate
 caches in output plugins

A new invalidation message is generated when ALTER PUBLICATION RENAME TO is
executed. The primary use-case of the message is to invalidate caches on
the logical decoding output plugin. Plugins can register callback functions for
the message via CacheRegisterRelSyncCallback(), and the function can invalidate
the cache for the specified relation.

A new invalidation message is transactional, and decoder processes should
recognize the message and invalidate specified caches. Thus, the messages are
stored in InvalMessageArray and serialized at the end of the transaction.
---
 src/backend/access/rmgrdesc/standbydesc.c |   2 +
 src/backend/utils/cache/inval.c           | 120 ++++++++++++++++++++++
 src/include/storage/sinval.h              |  11 ++
 src/include/utils/inval.h                 |  10 ++
 4 files changed, 143 insertions(+)

diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index d849f8e54b..81eff5f31c 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -132,6 +132,8 @@ standby_desc_invalidations(StringInfo buf,
 			appendStringInfo(buf, " relmap db %u", msg->rm.dbId);
 		else if (msg->id == SHAREDINVALSNAPSHOT_ID)
 			appendStringInfo(buf, " snapshot %u", msg->sn.relId);
+		else if (msg->id == SHAREDINVALRELSYNC_ID)
+			appendStringInfo(buf, " relsync %u", msg->rs.relid);
 		else
 			appendStringInfo(buf, " unrecognized id %d", msg->id);
 	}
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 700ccb6df9..acc3325b27 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -271,6 +271,7 @@ int			debug_discard_caches = 0;
 
 #define MAX_SYSCACHE_CALLBACKS 64
 #define MAX_RELCACHE_CALLBACKS 10
+#define MAX_RELSYNC_CALLBACKS 10
 
 static struct SYSCACHECALLBACK
 {
@@ -292,6 +293,15 @@ static struct RELCACHECALLBACK
 
 static int	relcache_callback_count = 0;
 
+static struct RELSYNCCALLBACK
+{
+	RelSyncCallbackFunction function;
+	Datum		arg;
+}			relsync_callback_list[MAX_RELSYNC_CALLBACKS];
+
+static int	relsync_callback_count = 0;
+
+
 /* ----------------------------------------------------------------
  *				Invalidation subgroup support functions
  * ----------------------------------------------------------------
@@ -484,6 +494,34 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
 	AddInvalidationMessage(group, RelCacheMsgs, &msg);
 }
 
+/*
+ * Add a relsync inval entry
+ *
+ * We put these into the relcache subgroup for simplicity.
+ */
+static void
+AddRelsyncInvalidationMessage(InvalidationMsgsGroup *group,
+							  Oid dbId, Oid relId)
+{
+	SharedInvalidationMessage msg;
+
+	/* Don't add a duplicate item */
+	/* We assume dbId need not be checked because it will never change */
+	ProcessMessageSubGroup(group, RelCacheMsgs,
+						   if (msg->rc.id == SHAREDINVALRELSYNC_ID &&
+							   msg->rc.relId == relId)
+						   return);
+
+	/* OK, add the item */
+	msg.rc.id = SHAREDINVALRELSYNC_ID;
+	msg.rc.dbId = dbId;
+	msg.rc.relId = relId;
+	/* check AddCatcacheInvalidationMessage() for an explanation */
+	VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+	AddInvalidationMessage(group, RelCacheMsgs, &msg);
+}
+
 /*
  * Add a snapshot inval entry
  *
@@ -611,6 +649,18 @@ RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
 		info->RelcacheInitFileInval = true;
 }
 
+/*
+ * RegisterRelsyncInvalidation
+ *
+ * As above, but register a relsync invalidation event.
+ */
+static void
+RegisterRelsyncInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
+{
+	AddRelsyncInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, relId);
+}
+
+
 /*
  * RegisterSnapshotInvalidation
  *
@@ -832,6 +882,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
 		else if (msg->sn.dbId == MyDatabaseId)
 			InvalidateCatalogSnapshot();
 	}
+	else if (msg->id == SHAREDINVALRELSYNC_ID)
+	{
+		/* We only care about our own database */
+		if (msg->rs.dbId == MyDatabaseId)
+			CallRelSyncCallbacks(msg->rs.relid);
+	}
 	else
 		elog(FATAL, "unrecognized SI message ID: %d", msg->id);
 }
@@ -1622,6 +1678,35 @@ CacheInvalidateRelcacheByRelid(Oid relid)
 }
 
 
+/*
+ * CacheInvalidateRelSync
+ *		Register invalidation of the cache in logical decoding output plugin
+ *		for a database.
+ *
+ * This type of invalidation message is used for the specific purpose of output
+ * plugins. Processes which do not decode WAL do nothing even when they
+ * receive the message.
+ */
+void
+CacheInvalidateRelSync(Oid relid)
+{
+	RegisterRelsyncInvalidation(PrepareInvalidationState(),
+								MyDatabaseId, relid);
+}
+
+
+/*
+ * CacheInvalidateRelSyncAll
+ *		Register invalidation of the whole cache in logical decoding output
+ *		plugin.
+ */
+void
+CacheInvalidateRelSyncAll(void)
+{
+	CacheInvalidateRelSync(InvalidOid);
+}
+
+
 /*
  * CacheInvalidateSmgr
  *		Register invalidation of smgr references to a physical relation.
@@ -1763,6 +1848,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 	++relcache_callback_count;
 }
 
+/*
+ * CacheRegisterRelSyncCallback
+ *		Register the specified function to be called for all future
+ *		decoding-cache invalidation events.
+ *
+ * This function is intended to be called from logical decoding output
+ * plugins.
+ */
+void
+CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+							 Datum arg)
+{
+	if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS)
+		elog(FATAL, "out of relsync_callback_list slots");
+
+	relsync_callback_list[relsync_callback_count].function = func;
+	relsync_callback_list[relsync_callback_count].arg = arg;
+
+	++relsync_callback_count;
+}
+
 /*
  * CallSyscacheCallbacks
  *
@@ -1788,6 +1894,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
 	}
 }
 
+/*
+ * CallRelSyncCallbacks
+ */
+void
+CallRelSyncCallbacks(Oid relid)
+{
+	for (int i = 0; i < relsync_callback_count; i++)
+	{
+		struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+		ccitem->function(ccitem->arg, relid);
+	}
+}
+
 /*
  * LogLogicalInvalidations
  *
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index 2463c0f9fa..90a5af4ed8 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -27,6 +27,7 @@
  *	* invalidate an smgr cache entry for a specific physical relation
  *	* invalidate the mapped-relation mapping for a given database
  *	* invalidate any saved snapshot that might be used to scan a given relation
+ *	* invalidate a specific entry for specific output plugin
  * More types could be added if needed.  The message type is identified by
  * the first "int8" field of the message struct.  Zero or positive means a
  * specific-catcache inval message (and also serves as the catcache ID field).
@@ -110,6 +111,15 @@ typedef struct
 	Oid			relId;			/* relation ID */
 } SharedInvalSnapshotMsg;
 
+#define SHAREDINVALRELSYNC_ID	(-6)
+
+typedef struct
+{
+	int8		id;				/* type field --- must be first */
+	Oid			dbId;			/* database ID */
+	Oid			relid;			/* relation ID, or 0 if whole relsync cache */
+} SharedInvalRelSyncMsg;
+
 typedef union
 {
 	int8		id;				/* type field --- must be first */
@@ -119,6 +129,7 @@ typedef union
 	SharedInvalSmgrMsg sm;
 	SharedInvalRelmapMsg rm;
 	SharedInvalSnapshotMsg sn;
+	SharedInvalRelSyncMsg rs;
 } SharedInvalidationMessage;
 
 
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 40658ba2ff..9b871caef6 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -22,6 +22,7 @@ extern PGDLLIMPORT int debug_discard_caches;
 
 typedef void (*SyscacheCallbackFunction) (Datum arg, int cacheid, uint32 hashvalue);
 typedef void (*RelcacheCallbackFunction) (Datum arg, Oid relid);
+typedef void (*RelSyncCallbackFunction) (Datum arg, Oid relid);
 
 
 extern void AcceptInvalidationMessages(void);
@@ -55,6 +56,10 @@ extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple);
 
 extern void CacheInvalidateRelcacheByRelid(Oid relid);
 
+extern void CacheInvalidateRelSync(Oid relid);
+
+extern void CacheInvalidateRelSyncAll(void);
+
 extern void CacheInvalidateSmgr(RelFileLocatorBackend rlocator);
 
 extern void CacheInvalidateRelmap(Oid databaseId);
@@ -66,8 +71,13 @@ extern void CacheRegisterSyscacheCallback(int cacheid,
 extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 										  Datum arg);
 
+extern void CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+										 Datum arg);
+
 extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue);
 
+extern void CallRelSyncCallbacks(Oid relid);
+
 extern void InvalidateSystemCaches(void);
 extern void InvalidateSystemCachesExtended(bool debug_discard);
 
-- 
2.43.5

v5-0002-Invalidate-Relcaches-while-ALTER-PUBLICATION-RENA.patch (application/octet-stream)
From 7f6dbeee67277d021c4ee6c921d0b5b397210f5f Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 3 Mar 2025 19:41:04 +0900
Subject: [PATCH v5 2/2] Invalidate Relcaches while ALTER PUBLICATION RENAME TO

---
 src/backend/commands/alter.c                |   4 +-
 src/backend/commands/publicationcmds.c      | 109 ++++++++++++++++++++
 src/backend/parser/gram.y                   |   2 +-
 src/backend/replication/pgoutput/pgoutput.c |   8 +-
 src/include/commands/publicationcmds.h      |   2 +
 5 files changed, 117 insertions(+), 8 deletions(-)

diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 78c1d4e1b8..a79329acc1 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -400,6 +400,9 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TYPE:
 			return RenameType(stmt);
 
+		case OBJECT_PUBLICATION:
+			return RenamePublication(stmt->subname, stmt->newname);
+
 		case OBJECT_AGGREGATE:
 		case OBJECT_COLLATION:
 		case OBJECT_CONVERSION:
@@ -417,7 +420,6 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TSDICTIONARY:
 		case OBJECT_TSPARSER:
 		case OBJECT_TSTEMPLATE:
-		case OBJECT_PUBLICATION:
 		case OBJECT_SUBSCRIPTION:
 			{
 				ObjectAddress address;
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 150a768d16..139971e8f9 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -491,6 +491,96 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
 	return *invalid_column_list || *invalid_gen_col;
 }
 
+/*
+ * Execute ALTER PUBLICATION RENAME
+ */
+ObjectAddress
+RenamePublication(const char *oldname, const char *newname)
+{
+	Relation			rel;
+	HeapTuple			tup;
+	ObjectAddress		address;
+	Form_pg_publication	pubform;
+	bool				replaces[Natts_pg_publication];
+	bool				nulls[Natts_pg_publication];
+	Datum				values[Natts_pg_publication];
+
+	rel = table_open(PublicationRelationId, RowExclusiveLock);
+
+	tup = SearchSysCacheCopy1(PUBLICATIONNAME,
+							  CStringGetDatum(oldname));
+
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("publication \"%s\" does not exist",
+						oldname)));
+
+	pubform = (Form_pg_publication) GETSTRUCT(tup);
+
+	/* must be owner */
+	if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
+		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
+					   NameStr(pubform->pubname));
+
+	/* Everything ok, form a new tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	/* Only update the pubname */
+	values[Anum_pg_publication_pubname - 1] =
+		DirectFunctionCall1(namein, CStringGetDatum(newname));
+	replaces[Anum_pg_publication_pubname - 1] = true;
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	/*
+	 * Invalidate caches on the logical decoding output plugin.
+	 *
+	 * Unlike the ALTER PUBLICATION ADD/SET/DROP commands, we do not have to
+	 * invalidate relcaches here. Those invalidations are required to refresh
+	 * the publication's descriptor, but the publication's name is not
+	 * recorded in it, so the RENAME command does not require a refresh.
+	 * Instead, we only invalidate the output plugin's cache so it is rebuilt.
+	 */
+	if (pubform->puballtables)
+	{
+		CacheInvalidateRelSyncAll();
+	}
+	else
+	{
+		List	   *relids = NIL;
+		List	   *schemarelids = NIL;
+
+		/*
+		 * For a partitioned table, when we insert data, get_rel_sync_entry is
+		 * called and a hash entry is created for the corresponding leaf table.
+		 * So invalidating the leaf nodes would be sufficient here.
+		 */
+		relids = GetPublicationRelations(pubform->oid,
+										 PUBLICATION_PART_LEAF);
+		schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
+														PUBLICATION_PART_LEAF);
+
+		relids = list_concat_unique_oid(relids, schemarelids);
+
+		InvalidateRelSyncCaches(relids);
+	}
+
+	ObjectAddressSet(address, PublicationRelationId, pubform->oid);
+
+	heap_freetuple(tup);
+
+	table_close(rel, RowExclusiveLock);
+
+	return address;
+}
+
 /* check_functions_in_node callback */
 static bool
 contain_mutable_or_user_functions_checker(Oid func_id, void *context)
@@ -2096,3 +2186,22 @@ defGetGeneratedColsOption(DefElem *def)
 
 	return PUBLISH_GENCOLS_NONE;	/* keep compiler quiet */
 }
+
+
+void
+InvalidateRelSyncCaches(List *relids)
+{
+	/*
+	 * We don't want to send too many individual messages; at some point it's
+	 * cheaper to just reset the whole relsync cache.
+	 */
+	if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
+	{
+		ListCell   *lc;
+
+		foreach(lc, relids)
+			CacheInvalidateRelSync(lfirst_oid(lc));
+	}
+	else
+		CacheInvalidateRelSync(InvalidOid);
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 271ae26cba..0fb4041f15 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9541,7 +9541,7 @@ RenameStmt: ALTER AGGREGATE aggregate_with_argtypes RENAME TO name
 					RenameStmt *n = makeNode(RenameStmt);
 
 					n->renameType = OBJECT_PUBLICATION;
-					n->object = (Node *) makeString($3);
+					n->subname = $3;
 					n->newname = $6;
 					n->missing_ok = false;
 					$$ = (Node *) n;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9063af6e1d..ed806c5430 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -531,6 +531,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 			CacheRegisterSyscacheCallback(PUBLICATIONOID,
 										  publication_invalidation_cb,
 										  (Datum) 0);
+			CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
+										 (Datum) 0);
 			publication_callback_registered = true;
 		}
 
@@ -1789,12 +1791,6 @@ static void
 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
 	publications_valid = false;
-
-	/*
-	 * Also invalidate per-relation cache so that next time the filtering info
-	 * is checked it will be updated with the new publication settings.
-	 */
-	rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
 }
 
 /*
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index e11a942ea0..f130ea3090 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -38,5 +38,7 @@ extern bool pub_contains_invalid_column(Oid pubid, Relation relation,
 										char pubgencols_type,
 										bool *invalid_column_list,
 										bool *invalid_gen_col);
+extern ObjectAddress RenamePublication(const char *oldname, const char *newname);
+extern void InvalidateRelSyncCaches(List *relids);
 
 #endif							/* PUBLICATIONCMDS_H */
-- 
2.43.5

#12Amit Kapila
amit.kapila16@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#11)
Re: Selectively invalidate caches in pgoutput module

On Fri, Mar 7, 2025 at 4:28 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

Dear Amit,

Thanks for the comment! PSA new version.

Don't we need to call this invalidation function from
InvalidateSystemCachesExtended()?

--
With Regards,
Amit Kapila.

#13Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Amit Kapila (#12)
2 attachment(s)
RE: Selectively invalidate caches in pgoutput module

Dear Amit,

Don't we need to call this invalidation function from
InvalidateSystemCachesExtended()?

Hmm. I intentionally did not call the relsync callback functions in
InvalidateSystemCachesExtended(), but I have added the call now. Please see my
analysis below and let me know what you think.

First of all, InvalidateSystemCachesExtended() can be called from
1) InvalidateSystemCaches() and 2) AcceptInvalidationMessages().

Regarding 1), it is used when a) a parallel worker is launched,
b) the decoder starts decoding, or c) the decoder finishes decoding, after the
decoding context is freed. However, a parallel worker does not decode, the
relsync cache is empty when decoding starts, and the relsync cache is dropped
when the decoding context is removed. For these reasons I had not called the
callbacks there.

As for 2), that path is taken only in debug builds when debug_discard_caches is
set. According to the manual and code comments, all caches must be discarded at
every opportunity in that mode. So, OK, I decided to follow the rule.
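
To make the two paths concrete, here is a minimal standalone sketch (not PostgreSQL code) of a callback registry where a targeted message invalidates one entry and a full reset passes InvalidOid to every registered callback. The registry mirrors the shape of relsync_callback_list in the patch; ToyCache, toy_cache_invalidate_cb, register_relsync_callback, call_relsync_callbacks and invalidate_all_caches are hypothetical names used only for illustration, and the Oid typedef is a stand-in.

```c
#include <assert.h>
#include <stddef.h>

typedef unsigned int Oid;		/* stand-in for PostgreSQL's Oid (assumption) */
#define InvalidOid ((Oid) 0)

#define MAX_RELSYNC_CALLBACKS 10

typedef void (*RelSyncCallbackFunction) (void *arg, Oid relid);

static struct
{
	RelSyncCallbackFunction function;
	void	   *arg;
}			relsync_callback_list[MAX_RELSYNC_CALLBACKS];

static int	relsync_callback_count = 0;

/* Register a callback; same shape as CacheRegisterRelSyncCallback(). */
static void
register_relsync_callback(RelSyncCallbackFunction func, void *arg)
{
	assert(relsync_callback_count < MAX_RELSYNC_CALLBACKS);
	relsync_callback_list[relsync_callback_count].function = func;
	relsync_callback_list[relsync_callback_count].arg = arg;
	relsync_callback_count++;
}

/* Targeted path: invoked for one relation when a relsync message arrives. */
static void
call_relsync_callbacks(Oid relid)
{
	for (int i = 0; i < relsync_callback_count; i++)
		relsync_callback_list[i].function(relsync_callback_list[i].arg, relid);
}

/* Full-reset path: every callback is told to drop everything. */
static void
invalidate_all_caches(void)
{
	call_relsync_callbacks(InvalidOid);
}

/* Hypothetical plugin-side cache: a few entries with a validity flag. */
#define TOY_CACHE_SIZE 4

typedef struct
{
	Oid			relid;
	int			valid;
} ToyEntry;

typedef struct
{
	ToyEntry	entries[TOY_CACHE_SIZE];
} ToyCache;

/* InvalidOid means "invalidate all entries"; otherwise only the match. */
static void
toy_cache_invalidate_cb(void *arg, Oid relid)
{
	ToyCache   *cache = arg;

	for (int i = 0; i < TOY_CACHE_SIZE; i++)
	{
		if (relid == InvalidOid || cache->entries[i].relid == relid)
			cache->entries[i].valid = 0;
	}
}
```

In this sketch a plugin would call register_relsync_callback() once at startup; afterwards call_relsync_callbacks(relid) clears a single entry, while invalidate_all_caches() clears them all, which corresponds to the behavior added to InvalidateSystemCachesExtended() in v6.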

Best regards,
Hayato Kuroda
FUJITSU LIMITED

Attachments:

v6-0001-Introduce-a-new-invalidation-message-to-invalidat.patch (application/octet-stream)
From 3de3e445502735fc396650c7a84cad7c6fdc8d58 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 4 Mar 2025 16:51:19 +0900
Subject: [PATCH v6 1/2] Introduce a new invalidation message to invalidate
 caches in output plugins

A new invalidation message is generated when either ALTER PUBLICATION RENAME TO
or is executed. The primary use case of the message is to invalidate caches on
the logical decoding output plugin. Plugins can register callback functions for
the message via CacheRegisterRelSyncCallback(), and the function can invalidate
the cache for the specified relation.

A new invalidation message is transactional, and decoder processes should
recognize the message and invalidate specified caches. Thus, the messages are
stored in InvalMessageArray and serialized at the end of the transaction.
---
 src/backend/access/rmgrdesc/standbydesc.c |   2 +
 src/backend/utils/cache/inval.c           | 127 ++++++++++++++++++++++
 src/include/pg_config_manual.h            |   8 +-
 src/include/storage/sinval.h              |  11 ++
 src/include/utils/inval.h                 |  10 ++
 5 files changed, 154 insertions(+), 4 deletions(-)

diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index d849f8e54b..81eff5f31c 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -132,6 +132,8 @@ standby_desc_invalidations(StringInfo buf,
 			appendStringInfo(buf, " relmap db %u", msg->rm.dbId);
 		else if (msg->id == SHAREDINVALSNAPSHOT_ID)
 			appendStringInfo(buf, " snapshot %u", msg->sn.relId);
+		else if (msg->id == SHAREDINVALRELSYNC_ID)
+			appendStringInfo(buf, " relsync %u", msg->rs.relid);
 		else
 			appendStringInfo(buf, " unrecognized id %d", msg->id);
 	}
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 700ccb6df9..812c5db36a 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -271,6 +271,7 @@ int			debug_discard_caches = 0;
 
 #define MAX_SYSCACHE_CALLBACKS 64
 #define MAX_RELCACHE_CALLBACKS 10
+#define MAX_RELSYNC_CALLBACKS 10
 
 static struct SYSCACHECALLBACK
 {
@@ -292,6 +293,15 @@ static struct RELCACHECALLBACK
 
 static int	relcache_callback_count = 0;
 
+static struct RELSYNCCALLBACK
+{
+	RelSyncCallbackFunction function;
+	Datum		arg;
+}			relsync_callback_list[MAX_RELSYNC_CALLBACKS];
+
+static int	relsync_callback_count = 0;
+
+
 /* ----------------------------------------------------------------
  *				Invalidation subgroup support functions
  * ----------------------------------------------------------------
@@ -484,6 +494,34 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
 	AddInvalidationMessage(group, RelCacheMsgs, &msg);
 }
 
+/*
+ * Add a relsync inval entry
+ *
+ * We put these into the relcache subgroup for simplicity.
+ */
+static void
+AddRelsyncInvalidationMessage(InvalidationMsgsGroup *group,
+							  Oid dbId, Oid relId)
+{
+	SharedInvalidationMessage msg;
+
+	/* Don't add a duplicate item */
+	/* We assume dbId need not be checked because it will never change */
+	ProcessMessageSubGroup(group, RelCacheMsgs,
+						   if (msg->rc.id == SHAREDINVALRELSYNC_ID &&
+							   msg->rc.relId == relId)
+						   return);
+
+	/* OK, add the item */
+	msg.rc.id = SHAREDINVALRELSYNC_ID;
+	msg.rc.dbId = dbId;
+	msg.rc.relId = relId;
+	/* check AddCatcacheInvalidationMessage() for an explanation */
+	VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+	AddInvalidationMessage(group, RelCacheMsgs, &msg);
+}
+
 /*
  * Add a snapshot inval entry
  *
@@ -611,6 +649,18 @@ RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
 		info->RelcacheInitFileInval = true;
 }
 
+/*
+ * RegisterRelsyncInvalidation
+ *
+ * As above, but register a relsync invalidation event.
+ */
+static void
+RegisterRelsyncInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
+{
+	AddRelsyncInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, relId);
+}
+
+
 /*
  * RegisterSnapshotInvalidation
  *
@@ -751,6 +801,13 @@ InvalidateSystemCachesExtended(bool debug_discard)
 
 		ccitem->function(ccitem->arg, InvalidOid);
 	}
+
+	for (int i = 0; i < relsync_callback_count; i++)
+	{
+		struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+		ccitem->function(ccitem->arg, InvalidOid);
+	}
 }
 
 /*
@@ -832,6 +889,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
 		else if (msg->sn.dbId == MyDatabaseId)
 			InvalidateCatalogSnapshot();
 	}
+	else if (msg->id == SHAREDINVALRELSYNC_ID)
+	{
+		/* We only care about our own database */
+		if (msg->rs.dbId == MyDatabaseId)
+			CallRelSyncCallbacks(msg->rs.relid);
+	}
 	else
 		elog(FATAL, "unrecognized SI message ID: %d", msg->id);
 }
@@ -1622,6 +1685,35 @@ CacheInvalidateRelcacheByRelid(Oid relid)
 }
 
 
+/*
+ * CacheInvalidateRelSync
+ *		Register invalidation of the cache in logical decoding output plugin
+ *		for a database.
+ *
+ * This type of invalidation message is used for the specific purpose of output
+ * plugins. Processes which do not decode WAL do nothing even when they
+ * receive the message.
+ */
+void
+CacheInvalidateRelSync(Oid relid)
+{
+	RegisterRelsyncInvalidation(PrepareInvalidationState(),
+								MyDatabaseId, relid);
+}
+
+
+/*
+ * CacheInvalidateRelSyncAll
+ *		Register invalidation of the whole cache in logical decoding output
+ *		plugin.
+ */
+void
+CacheInvalidateRelSyncAll(void)
+{
+	CacheInvalidateRelSync(InvalidOid);
+}
+
+
 /*
  * CacheInvalidateSmgr
  *		Register invalidation of smgr references to a physical relation.
@@ -1763,6 +1855,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 	++relcache_callback_count;
 }
 
+/*
+ * CacheRegisterRelSyncCallback
+ *		Register the specified function to be called for all future
+ *		decoding-cache invalidation events.
+ *
+ * This function is intended to be called from logical decoding output
+ * plugins.
+ */
+void
+CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+							 Datum arg)
+{
+	if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS)
+		elog(FATAL, "out of relsync_callback_list slots");
+
+	relsync_callback_list[relsync_callback_count].function = func;
+	relsync_callback_list[relsync_callback_count].arg = arg;
+
+	++relsync_callback_count;
+}
+
 /*
  * CallSyscacheCallbacks
  *
@@ -1788,6 +1901,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
 	}
 }
 
+/*
+ * CallRelSyncCallbacks
+ */
+void
+CallRelSyncCallbacks(Oid relid)
+{
+	for (int i = 0; i < relsync_callback_count; i++)
+	{
+		struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+		ccitem->function(ccitem->arg, relid);
+	}
+}
+
 /*
  * LogLogicalInvalidations
  *
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index 449e50bd78..23308f1de1 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -282,10 +282,10 @@
 
 /*
  * For cache-invalidation debugging, define DISCARD_CACHES_ENABLED to enable
- * use of the debug_discard_caches GUC to aggressively flush syscache/relcache
- * entries whenever it's possible to deliver invalidations.  See
- * AcceptInvalidationMessages() in src/backend/utils/cache/inval.c for
- * details.
+ * use of the debug_discard_caches GUC to aggressively flush
+ * syscache/relcache/relsync cache entries whenever it's possible to deliver
+ * invalidations.  See AcceptInvalidationMessages() in
+ * src/backend/utils/cache/inval.c for details.
  *
  * USE_ASSERT_CHECKING builds default to enabling this.  It's possible to use
  * DISCARD_CACHES_ENABLED without a cassert build and the implied
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index 2463c0f9fa..90a5af4ed8 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -27,6 +27,7 @@
  *	* invalidate an smgr cache entry for a specific physical relation
  *	* invalidate the mapped-relation mapping for a given database
  *	* invalidate any saved snapshot that might be used to scan a given relation
+ *	* invalidate a specific entry for specific output plugin
  * More types could be added if needed.  The message type is identified by
  * the first "int8" field of the message struct.  Zero or positive means a
  * specific-catcache inval message (and also serves as the catcache ID field).
@@ -110,6 +111,15 @@ typedef struct
 	Oid			relId;			/* relation ID */
 } SharedInvalSnapshotMsg;
 
+#define SHAREDINVALRELSYNC_ID	(-6)
+
+typedef struct
+{
+	int8		id;				/* type field --- must be first */
+	Oid			dbId;			/* database ID */
+	Oid			relid;			/* relation ID, or 0 if whole relsync cache */
+} SharedInvalRelSyncMsg;
+
 typedef union
 {
 	int8		id;				/* type field --- must be first */
@@ -119,6 +129,7 @@ typedef union
 	SharedInvalSmgrMsg sm;
 	SharedInvalRelmapMsg rm;
 	SharedInvalSnapshotMsg sn;
+	SharedInvalRelSyncMsg rs;
 } SharedInvalidationMessage;
 
 
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 40658ba2ff..9b871caef6 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -22,6 +22,7 @@ extern PGDLLIMPORT int debug_discard_caches;
 
 typedef void (*SyscacheCallbackFunction) (Datum arg, int cacheid, uint32 hashvalue);
 typedef void (*RelcacheCallbackFunction) (Datum arg, Oid relid);
+typedef void (*RelSyncCallbackFunction) (Datum arg, Oid relid);
 
 
 extern void AcceptInvalidationMessages(void);
@@ -55,6 +56,10 @@ extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple);
 
 extern void CacheInvalidateRelcacheByRelid(Oid relid);
 
+extern void CacheInvalidateRelSync(Oid relid);
+
+extern void CacheInvalidateRelSyncAll(void);
+
 extern void CacheInvalidateSmgr(RelFileLocatorBackend rlocator);
 
 extern void CacheInvalidateRelmap(Oid databaseId);
@@ -66,8 +71,13 @@ extern void CacheRegisterSyscacheCallback(int cacheid,
 extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 										  Datum arg);
 
+extern void CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+										 Datum arg);
+
 extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue);
 
+extern void CallRelSyncCallbacks(Oid relid);
+
 extern void InvalidateSystemCaches(void);
 extern void InvalidateSystemCachesExtended(bool debug_discard);
 
-- 
2.43.5

v6-0002-Invalidate-Relcaches-while-ALTER-PUBLICATION-RENA.patch (application/octet-stream)
From 9fdabef17e930bdd92a971aa1f271f4fae96b611 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 3 Mar 2025 19:41:04 +0900
Subject: [PATCH v6 2/2] Invalidate Relcaches while ALTER PUBLICATION RENAME TO

---
 src/backend/commands/alter.c                |   4 +-
 src/backend/commands/publicationcmds.c      | 109 ++++++++++++++++++++
 src/backend/parser/gram.y                   |   2 +-
 src/backend/replication/pgoutput/pgoutput.c |   8 +-
 src/include/commands/publicationcmds.h      |   2 +
 5 files changed, 117 insertions(+), 8 deletions(-)

diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 78c1d4e1b8..a79329acc1 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -400,6 +400,9 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TYPE:
 			return RenameType(stmt);
 
+		case OBJECT_PUBLICATION:
+			return RenamePublication(stmt->subname, stmt->newname);
+
 		case OBJECT_AGGREGATE:
 		case OBJECT_COLLATION:
 		case OBJECT_CONVERSION:
@@ -417,7 +420,6 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TSDICTIONARY:
 		case OBJECT_TSPARSER:
 		case OBJECT_TSTEMPLATE:
-		case OBJECT_PUBLICATION:
 		case OBJECT_SUBSCRIPTION:
 			{
 				ObjectAddress address;
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 150a768d16..139971e8f9 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -491,6 +491,96 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
 	return *invalid_column_list || *invalid_gen_col;
 }
 
+/*
+ * Execute ALTER PUBLICATION RENAME
+ */
+ObjectAddress
+RenamePublication(const char *oldname, const char *newname)
+{
+	Relation			rel;
+	HeapTuple			tup;
+	ObjectAddress		address;
+	Form_pg_publication	pubform;
+	bool				replaces[Natts_pg_publication];
+	bool				nulls[Natts_pg_publication];
+	Datum				values[Natts_pg_publication];
+
+	rel = table_open(PublicationRelationId, RowExclusiveLock);
+
+	tup = SearchSysCacheCopy1(PUBLICATIONNAME,
+							  CStringGetDatum(oldname));
+
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("publication \"%s\" does not exist",
+						oldname)));
+
+	pubform = (Form_pg_publication) GETSTRUCT(tup);
+
+	/* must be owner */
+	if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
+		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
+					   NameStr(pubform->pubname));
+
+	/* Everything ok, form a new tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	/* Only update the pubname */
+	values[Anum_pg_publication_pubname - 1] =
+		DirectFunctionCall1(namein, CStringGetDatum(newname));
+	replaces[Anum_pg_publication_pubname - 1] = true;
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	/*
+	 * Invalidate caches on the logical decoding output plugin.
+	 *
+	 * Unlike the ALTER PUBLICATION ADD/SET/DROP commands, we do not have to
+	 * invalidate relcaches here. Those invalidations are required to refresh
+	 * the publication's descriptor, but the publication's name is not
+	 * recorded in it, so the RENAME command does not require a refresh.
+	 * Instead, we only invalidate the output plugin's cache so it is rebuilt.
+	 */
+	if (pubform->puballtables)
+	{
+		CacheInvalidateRelSyncAll();
+	}
+	else
+	{
+		List	   *relids = NIL;
+		List	   *schemarelids = NIL;
+
+		/*
+		 * For a partitioned table, when we insert data, get_rel_sync_entry is
+		 * called and a hash entry is created for the corresponding leaf table.
+		 * So invalidating the leaf nodes would be sufficient here.
+		 */
+		relids = GetPublicationRelations(pubform->oid,
+										 PUBLICATION_PART_LEAF);
+		schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
+														PUBLICATION_PART_LEAF);
+
+		relids = list_concat_unique_oid(relids, schemarelids);
+
+		InvalidateRelSyncCaches(relids);
+	}
+
+	ObjectAddressSet(address, PublicationRelationId, pubform->oid);
+
+	heap_freetuple(tup);
+
+	table_close(rel, RowExclusiveLock);
+
+	return address;
+}
+
 /* check_functions_in_node callback */
 static bool
 contain_mutable_or_user_functions_checker(Oid func_id, void *context)
@@ -2096,3 +2186,22 @@ defGetGeneratedColsOption(DefElem *def)
 
 	return PUBLISH_GENCOLS_NONE;	/* keep compiler quiet */
 }
+
+
+void
+InvalidateRelSyncCaches(List *relids)
+{
+	/*
+	 * We don't want to send too many individual messages; at some point it's
+	 * cheaper to just reset the whole relsync cache.
+	 */
+	if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
+	{
+		ListCell   *lc;
+
+		foreach(lc, relids)
+			CacheInvalidateRelSync(lfirst_oid(lc));
+	}
+	else
+		CacheInvalidateRelSync(InvalidOid);
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 271ae26cba..0fb4041f15 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9541,7 +9541,7 @@ RenameStmt: ALTER AGGREGATE aggregate_with_argtypes RENAME TO name
 					RenameStmt *n = makeNode(RenameStmt);
 
 					n->renameType = OBJECT_PUBLICATION;
-					n->object = (Node *) makeString($3);
+					n->subname = $3;
 					n->newname = $6;
 					n->missing_ok = false;
 					$$ = (Node *) n;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9063af6e1d..ed806c5430 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -531,6 +531,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 			CacheRegisterSyscacheCallback(PUBLICATIONOID,
 										  publication_invalidation_cb,
 										  (Datum) 0);
+			CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
+										 (Datum) 0);
 			publication_callback_registered = true;
 		}
 
@@ -1789,12 +1791,6 @@ static void
 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
 	publications_valid = false;
-
-	/*
-	 * Also invalidate per-relation cache so that next time the filtering info
-	 * is checked it will be updated with the new publication settings.
-	 */
-	rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
 }
 
 /*
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index e11a942ea0..f130ea3090 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -38,5 +38,7 @@ extern bool pub_contains_invalid_column(Oid pubid, Relation relation,
 										char pubgencols_type,
 										bool *invalid_column_list,
 										bool *invalid_gen_col);
+extern ObjectAddress RenamePublication(const char *oldname, const char *newname);
+extern void InvalidateRelSyncCaches(List *relids);
 
 #endif							/* PUBLICATIONCMDS_H */
-- 
2.43.5

#14Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Hayato Kuroda (Fujitsu) (#13)
2 attachment(s)
RE: Selectively invalidate caches in pgoutput module

I found that cfbot complained about variable shadowing. PSA the fixed version.
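
For reference, the only change from v6 is in the loop added to InvalidateSystemCachesExtended(): v6 wrote "for (int i = 0; ...)" and v7 writes "for (i = 0; ...)". Assuming the function already declares an "int i" at its top (that declaration is outside the hunk, so this is an assumption), the v6 form declares a second "i" that hides the first, which compilers can report through the -Wshadow family of warnings. A minimal standalone illustration of the fixed form, with the hypothetical function name sum_two_lists:

```c
#include <assert.h>

/*
 * Sum two arrays in two passes while reusing one loop counter declared at
 * function scope, which is the pattern v7 follows.
 */
static int
sum_two_lists(const int *a, int na, const int *b, int nb)
{
	int			i;
	int			total = 0;

	for (i = 0; i < na; i++)
		total += a[i];

	/*
	 * Writing "for (int i = 0; ...)" here would compile, but it would declare
	 * a new "i" that shadows the one above. Reusing the existing variable
	 * avoids the shadowing warning.
	 */
	for (i = 0; i < nb; i++)
		total += b[i];

	return total;
}
```

Both forms compute the same result; the difference only matters for builds that treat shadowing warnings as errors.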

Best regards,
Hayato Kuroda
FUJITSU LIMITED

Attachments:

v7-0001-Introduce-a-new-invalidation-message-to-invalidat.patch (application/octet-stream)
From 9962a4dd473d0c862be1fabb5a8605d1e3be3d3e Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 4 Mar 2025 16:51:19 +0900
Subject: [PATCH v7 1/2] Introduce a new invalidation message to invalidate
 caches in output plugins

A new invalidation message is generated when either ALTER PUBLICATION RENAME TO
or is executed. The primary use case of the message is to invalidate caches on
the logical decoding output plugin. Plugins can register callback functions for
the message via CacheRegisterRelSyncCallback(), and the function can invalidate
the cache for the specified relation.

A new invalidation message is transactional, and decoder processes should
recognize the message and invalidate specified caches. Thus, the messages are
stored in InvalMessageArray and serialized at the end of the transaction.
---
 src/backend/access/rmgrdesc/standbydesc.c |   2 +
 src/backend/utils/cache/inval.c           | 127 ++++++++++++++++++++++
 src/include/pg_config_manual.h            |   8 +-
 src/include/storage/sinval.h              |  11 ++
 src/include/utils/inval.h                 |  10 ++
 5 files changed, 154 insertions(+), 4 deletions(-)

diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index d849f8e54b..81eff5f31c 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -132,6 +132,8 @@ standby_desc_invalidations(StringInfo buf,
 			appendStringInfo(buf, " relmap db %u", msg->rm.dbId);
 		else if (msg->id == SHAREDINVALSNAPSHOT_ID)
 			appendStringInfo(buf, " snapshot %u", msg->sn.relId);
+		else if (msg->id == SHAREDINVALRELSYNC_ID)
+			appendStringInfo(buf, " relsync %u", msg->rs.relid);
 		else
 			appendStringInfo(buf, " unrecognized id %d", msg->id);
 	}
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 700ccb6df9..35df9be5e5 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -271,6 +271,7 @@ int			debug_discard_caches = 0;
 
 #define MAX_SYSCACHE_CALLBACKS 64
 #define MAX_RELCACHE_CALLBACKS 10
+#define MAX_RELSYNC_CALLBACKS 10
 
 static struct SYSCACHECALLBACK
 {
@@ -292,6 +293,15 @@ static struct RELCACHECALLBACK
 
 static int	relcache_callback_count = 0;
 
+static struct RELSYNCCALLBACK
+{
+	RelSyncCallbackFunction function;
+	Datum		arg;
+}			relsync_callback_list[MAX_RELSYNC_CALLBACKS];
+
+static int	relsync_callback_count = 0;
+
+
 /* ----------------------------------------------------------------
  *				Invalidation subgroup support functions
  * ----------------------------------------------------------------
@@ -484,6 +494,34 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
 	AddInvalidationMessage(group, RelCacheMsgs, &msg);
 }
 
+/*
+ * Add a relsync inval entry
+ *
+ * We put these into the relcache subgroup for simplicity.
+ */
+static void
+AddRelsyncInvalidationMessage(InvalidationMsgsGroup *group,
+							  Oid dbId, Oid relId)
+{
+	SharedInvalidationMessage msg;
+
+	/* Don't add a duplicate item */
+	/* We assume dbId need not be checked because it will never change */
+	ProcessMessageSubGroup(group, RelCacheMsgs,
+						   if (msg->rc.id == SHAREDINVALRELSYNC_ID &&
+							   msg->rc.relId == relId)
+						   return);
+
+	/* OK, add the item */
+	msg.rc.id = SHAREDINVALRELSYNC_ID;
+	msg.rc.dbId = dbId;
+	msg.rc.relId = relId;
+	/* check AddCatcacheInvalidationMessage() for an explanation */
+	VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+	AddInvalidationMessage(group, RelCacheMsgs, &msg);
+}
+
 /*
  * Add a snapshot inval entry
  *
@@ -611,6 +649,18 @@ RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
 		info->RelcacheInitFileInval = true;
 }
 
+/*
+ * RegisterRelsyncInvalidation
+ *
+ * As above, but register a relsync invalidation event.
+ */
+static void
+RegisterRelsyncInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
+{
+	AddRelsyncInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, relId);
+}
+
+
 /*
  * RegisterSnapshotInvalidation
  *
@@ -751,6 +801,13 @@ InvalidateSystemCachesExtended(bool debug_discard)
 
 		ccitem->function(ccitem->arg, InvalidOid);
 	}
+
+	for (i = 0; i < relsync_callback_count; i++)
+	{
+		struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+		ccitem->function(ccitem->arg, InvalidOid);
+	}
 }
 
 /*
@@ -832,6 +889,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
 		else if (msg->sn.dbId == MyDatabaseId)
 			InvalidateCatalogSnapshot();
 	}
+	else if (msg->id == SHAREDINVALRELSYNC_ID)
+	{
+		/* We only care about our own database */
+		if (msg->rs.dbId == MyDatabaseId)
+			CallRelSyncCallbacks(msg->rs.relid);
+	}
 	else
 		elog(FATAL, "unrecognized SI message ID: %d", msg->id);
 }
@@ -1622,6 +1685,35 @@ CacheInvalidateRelcacheByRelid(Oid relid)
 }
 
 
+/*
+ * CacheInvalidateRelSync
+ *		Register invalidation of the cache in logical decoding output plugin
+ *		for a database.
+ *
+ * This type of invalidation message is used specifically by output plugins.
+ * Processes that do not decode WAL do nothing even when they receive the
+ * message.
+ */
+void
+CacheInvalidateRelSync(Oid relid)
+{
+	RegisterRelsyncInvalidation(PrepareInvalidationState(),
+								MyDatabaseId, relid);
+}
+
+
+/*
+ * CacheInvalidateRelSyncAll
+ *		Register invalidation of the whole cache in logical decoding output
+ *		plugin.
+ */
+void
+CacheInvalidateRelSyncAll(void)
+{
+	CacheInvalidateRelSync(InvalidOid);
+}
+
+
 /*
  * CacheInvalidateSmgr
  *		Register invalidation of smgr references to a physical relation.
@@ -1763,6 +1855,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 	++relcache_callback_count;
 }
 
+/*
+ * CacheRegisterRelSyncCallback
+ *		Register the specified function to be called for all future
+ *		decoding-cache invalidation events.
+ *
+ * This function is intended to be called from logical decoding output
+ * plugins.
+ */
+void
+CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+							 Datum arg)
+{
+	if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS)
+		elog(FATAL, "out of relsync_callback_list slots");
+
+	relsync_callback_list[relsync_callback_count].function = func;
+	relsync_callback_list[relsync_callback_count].arg = arg;
+
+	++relsync_callback_count;
+}
+
 /*
  * CallSyscacheCallbacks
  *
@@ -1788,6 +1901,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
 	}
 }
 
+/*
+ * CallRelSyncCallbacks
+ */
+void
+CallRelSyncCallbacks(Oid relid)
+{
+	for (int i = 0; i < relsync_callback_count; i++)
+	{
+		struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+		ccitem->function(ccitem->arg, relid);
+	}
+}
+
 /*
  * LogLogicalInvalidations
  *
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index 449e50bd78..23308f1de1 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -282,10 +282,10 @@
 
 /*
  * For cache-invalidation debugging, define DISCARD_CACHES_ENABLED to enable
- * use of the debug_discard_caches GUC to aggressively flush syscache/relcache
- * entries whenever it's possible to deliver invalidations.  See
- * AcceptInvalidationMessages() in src/backend/utils/cache/inval.c for
- * details.
+ * use of the debug_discard_caches GUC to aggressively flush
+ * syscache/relcache/relsync cache entries whenever it's possible to deliver
+ * invalidations.  See AcceptInvalidationMessages() in
+ * src/backend/utils/cache/inval.c for details.
  *
  * USE_ASSERT_CHECKING builds default to enabling this.  It's possible to use
  * DISCARD_CACHES_ENABLED without a cassert build and the implied
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index 2463c0f9fa..90a5af4ed8 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -27,6 +27,7 @@
  *	* invalidate an smgr cache entry for a specific physical relation
  *	* invalidate the mapped-relation mapping for a given database
  *	* invalidate any saved snapshot that might be used to scan a given relation
+ *	* invalidate a specific cache entry in a logical decoding output plugin
  * More types could be added if needed.  The message type is identified by
  * the first "int8" field of the message struct.  Zero or positive means a
  * specific-catcache inval message (and also serves as the catcache ID field).
@@ -110,6 +111,15 @@ typedef struct
 	Oid			relId;			/* relation ID */
 } SharedInvalSnapshotMsg;
 
+#define SHAREDINVALRELSYNC_ID	(-6)
+
+typedef struct
+{
+	int8		id;				/* type field --- must be first */
+	Oid			dbId;			/* database ID */
+	Oid			relid;			/* relation ID, or 0 for the whole cache */
+} SharedInvalRelSyncMsg;
+
 typedef union
 {
 	int8		id;				/* type field --- must be first */
@@ -119,6 +129,7 @@ typedef union
 	SharedInvalSmgrMsg sm;
 	SharedInvalRelmapMsg rm;
 	SharedInvalSnapshotMsg sn;
+	SharedInvalRelSyncMsg rs;
 } SharedInvalidationMessage;
 
 
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 40658ba2ff..9b871caef6 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -22,6 +22,7 @@ extern PGDLLIMPORT int debug_discard_caches;
 
 typedef void (*SyscacheCallbackFunction) (Datum arg, int cacheid, uint32 hashvalue);
 typedef void (*RelcacheCallbackFunction) (Datum arg, Oid relid);
+typedef void (*RelSyncCallbackFunction) (Datum arg, Oid relid);
 
 
 extern void AcceptInvalidationMessages(void);
@@ -55,6 +56,10 @@ extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple);
 
 extern void CacheInvalidateRelcacheByRelid(Oid relid);
 
+extern void CacheInvalidateRelSync(Oid relid);
+
+extern void CacheInvalidateRelSyncAll(void);
+
 extern void CacheInvalidateSmgr(RelFileLocatorBackend rlocator);
 
 extern void CacheInvalidateRelmap(Oid databaseId);
@@ -66,8 +71,13 @@ extern void CacheRegisterSyscacheCallback(int cacheid,
 extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 										  Datum arg);
 
+extern void CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+										 Datum arg);
+
 extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue);
 
+extern void CallRelSyncCallbacks(Oid relid);
+
 extern void InvalidateSystemCaches(void);
 extern void InvalidateSystemCachesExtended(bool debug_discard);
 
-- 
2.43.5

v7-0002-Invalidate-Relcaches-while-ALTER-PUBLICATION-RENA.patch (application/octet-stream)
From c54cc76942d6ef7a5cf17f5a5fc37c58595f3315 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 3 Mar 2025 19:41:04 +0900
Subject: [PATCH v7 2/2] Invalidate Relcaches while ALTER PUBLICATION RENAME TO

---
 src/backend/commands/alter.c                |   4 +-
 src/backend/commands/publicationcmds.c      | 109 ++++++++++++++++++++
 src/backend/parser/gram.y                   |   2 +-
 src/backend/replication/pgoutput/pgoutput.c |   8 +-
 src/include/commands/publicationcmds.h      |   2 +
 5 files changed, 117 insertions(+), 8 deletions(-)

diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 78c1d4e1b8..a79329acc1 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -400,6 +400,9 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TYPE:
 			return RenameType(stmt);
 
+		case OBJECT_PUBLICATION:
+			return RenamePublication(stmt->subname, stmt->newname);
+
 		case OBJECT_AGGREGATE:
 		case OBJECT_COLLATION:
 		case OBJECT_CONVERSION:
@@ -417,7 +420,6 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TSDICTIONARY:
 		case OBJECT_TSPARSER:
 		case OBJECT_TSTEMPLATE:
-		case OBJECT_PUBLICATION:
 		case OBJECT_SUBSCRIPTION:
 			{
 				ObjectAddress address;
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 150a768d16..139971e8f9 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -491,6 +491,96 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
 	return *invalid_column_list || *invalid_gen_col;
 }
 
+/*
+ * Execute ALTER PUBLICATION RENAME
+ */
+ObjectAddress
+RenamePublication(const char *oldname, const char *newname)
+{
+	Relation			rel;
+	HeapTuple			tup;
+	ObjectAddress		address;
+	Form_pg_publication	pubform;
+	bool				replaces[Natts_pg_publication];
+	bool				nulls[Natts_pg_publication];
+	Datum				values[Natts_pg_publication];
+
+	rel = table_open(PublicationRelationId, RowExclusiveLock);
+
+	tup = SearchSysCacheCopy1(PUBLICATIONNAME,
+							  CStringGetDatum(oldname));
+
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("publication \"%s\" does not exist",
+						oldname)));
+
+	pubform = (Form_pg_publication) GETSTRUCT(tup);
+
+	/* must be owner */
+	if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
+		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
+					   NameStr(pubform->pubname));
+
+	/* Everything ok, form a new tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	/* Only update the pubname */
+	values[Anum_pg_publication_pubname - 1] =
+		DirectFunctionCall1(namein, CStringGetDatum(newname));
+	replaces[Anum_pg_publication_pubname - 1] = true;
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	/*
+	 * Invalidate caches on the logical decoding output plugin.
+	 *
+	 * Unlike the ALTER PUBLICATION ADD/SET/DROP commands, RENAME does not need
+	 * to invalidate relcaches. Those commands invalidate them so that the
+	 * publication descriptor is rebuilt, but the publication name is not
+	 * stored there, so RENAME requires no such refresh. Instead, we only
+	 * invalidate the output plugin's cache so that it is rebuilt.
+	 */
+	if (pubform->puballtables)
+	{
+		CacheInvalidateRelSyncAll();
+	}
+	else
+	{
+		List	   *relids = NIL;
+		List	   *schemarelids = NIL;
+
+		/*
+		 * For a partitioned table, get_rel_sync_entry is called when data is
+		 * inserted, and a hash entry is created for the corresponding leaf
+		 * table. So invalidating the leaf partitions is sufficient here.
+		 */
+		relids = GetPublicationRelations(pubform->oid,
+										 PUBLICATION_PART_LEAF);
+		schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
+														PUBLICATION_PART_LEAF);
+
+		relids = list_concat_unique_oid(relids, schemarelids);
+
+		InvalidateRelSyncCaches(relids);
+	}
+
+	ObjectAddressSet(address, PublicationRelationId, pubform->oid);
+
+	heap_freetuple(tup);
+
+	table_close(rel, RowExclusiveLock);
+
+	return address;
+}
+
 /* check_functions_in_node callback */
 static bool
 contain_mutable_or_user_functions_checker(Oid func_id, void *context)
@@ -2096,3 +2186,22 @@ defGetGeneratedColsOption(DefElem *def)
 
 	return PUBLISH_GENCOLS_NONE;	/* keep compiler quiet */
 }
+
+
+void
+InvalidateRelSyncCaches(List *relids)
+{
+	/*
+	 * We don't want to send too many individual messages; at some point it's
+	 * cheaper to just reset the whole relsync cache.
+	 */
+	if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
+	{
+		ListCell   *lc;
+
+		foreach(lc, relids)
+			CacheInvalidateRelSync(lfirst_oid(lc));
+	}
+	else
+		CacheInvalidateRelSync(InvalidOid);
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 271ae26cba..0fb4041f15 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9541,7 +9541,7 @@ RenameStmt: ALTER AGGREGATE aggregate_with_argtypes RENAME TO name
 					RenameStmt *n = makeNode(RenameStmt);
 
 					n->renameType = OBJECT_PUBLICATION;
-					n->object = (Node *) makeString($3);
+					n->subname = $3;
 					n->newname = $6;
 					n->missing_ok = false;
 					$$ = (Node *) n;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9063af6e1d..ed806c5430 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -531,6 +531,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 			CacheRegisterSyscacheCallback(PUBLICATIONOID,
 										  publication_invalidation_cb,
 										  (Datum) 0);
+			CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
+										 (Datum) 0);
 			publication_callback_registered = true;
 		}
 
@@ -1789,12 +1791,6 @@ static void
 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
 	publications_valid = false;
-
-	/*
-	 * Also invalidate per-relation cache so that next time the filtering info
-	 * is checked it will be updated with the new publication settings.
-	 */
-	rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
 }
 
 /*
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index e11a942ea0..f130ea3090 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -38,5 +38,7 @@ extern bool pub_contains_invalid_column(Oid pubid, Relation relation,
 										char pubgencols_type,
 										bool *invalid_column_list,
 										bool *invalid_gen_col);
+extern ObjectAddress RenamePublication(const char *oldname, const char *newname);
+extern void InvalidateRelSyncCaches(List *relids);
 
 #endif							/* PUBLICATIONCMDS_H */
-- 
2.43.5

#15Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Hayato Kuroda (Fujitsu) (#14)
2 attachment(s)
RE: Selectively invalidate caches in pgoutput module

Dear hackers,

I did a self-review and updated the patch. PSA the new version. What's new:

1. Fixed a bug where an existing publication name could be specified in ALTER PUBLICATION RENAME.
A validation check has been added to RenamePublication().
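
To illustrate the intended order of checks, here is a minimal standalone sketch. It is not the patch code: the in-memory name table, find_pub(), rename_pub() and the RenameResult values are hypothetical stand-ins for the syscache lookups and ereport() calls in RenamePublication().

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define MAX_PUBS 8
#define NAME_LEN 64

/* Hypothetical stand-in for pg_publication: just a table of names. */
static char pubs[MAX_PUBS][NAME_LEN] = {"pub_a", "pub_b"};
static int	npubs = 2;

typedef enum
{
	RENAME_OK,
	RENAME_UNDEFINED_OBJECT,	/* old name does not exist */
	RENAME_DUPLICATE_OBJECT		/* new name is already in use */
} RenameResult;

/* Return the index of the named publication, or -1 if there is none. */
static int
find_pub(const char *name)
{
	for (int i = 0; i < npubs; i++)
		if (strcmp(pubs[i], name) == 0)
			return i;
	return -1;
}

/* Rename a publication, rejecting a new name that is already taken. */
static RenameResult
rename_pub(const char *oldname, const char *newname)
{
	int			idx;

	assert(oldname != NULL && newname != NULL);

	idx = find_pub(oldname);
	if (idx < 0)
		return RENAME_UNDEFINED_OBJECT;

	/* This is the check that was missing before v8. */
	if (find_pub(newname) >= 0)
		return RENAME_DUPLICATE_OBJECT;

	snprintf(pubs[idx], NAME_LEN, "%s", newname);
	return RENAME_OK;
}
```

In this sketch, renaming pub_a to pub_b is rejected as a duplicate, while renaming it to an unused name succeeds. Renaming a publication to its own current name is also reported as a duplicate, since the lookup of the new name finds the existing row.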

Just in case, I want to discuss the case where the publication being renamed has thousands
of tables. Before the patch, the ALTER PUBLICATION RENAME command just invalidated
a syscache entry, and the registered callback then invalidated all cache entries.
With the patch, however, all affected relsync entries are invalidated within the command,
so its execution time can be longer. I think this is acceptable because
other code paths behave similarly; e.g., the ATTACH/DETACH PARTITION commands invalidate
the relcaches of the children when the specified relation also has children, even
if it has thousands of leaves (see d6f1e16). Also note that all other processes,
such as walsenders, can be faster because they avoid discarding their entire caches.
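
For readers who want to see the difference between the two behaviors, below is a small standalone model. It is only a sketch under my own assumptions: the toy cache, register_callback(), call_callbacks() and toy_cache_cb() are hypothetical names that mimic the shape of CacheRegisterRelSyncCallback() and CallRelSyncCallbacks() in the patch, without any PostgreSQL headers.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;		/* stand-in for the PostgreSQL type */
#define InvalidOid ((Oid) 0)

#define MAX_CALLBACKS 10
#define CACHE_SIZE 4

typedef void (*RelSyncCallback) (void *arg, Oid relid);

/* Fixed-size callback registry, similar in shape to the one in inval.c. */
static struct
{
	RelSyncCallback function;
	void	   *arg;
}			callback_list[MAX_CALLBACKS];
static int	callback_count = 0;

/* Toy per-relation cache: one validity flag per cached relation. */
typedef struct
{
	Oid			relid;
	bool		valid;
} ToyEntry;

static ToyEntry toy_cache[CACHE_SIZE] = {
	{16384, true}, {16385, true}, {16386, true}, {16387, true}
};

static void
register_callback(RelSyncCallback func, void *arg)
{
	assert(callback_count < MAX_CALLBACKS);
	callback_list[callback_count].function = func;
	callback_list[callback_count].arg = arg;
	callback_count++;
}

/* Deliver one invalidation message to every registered callback. */
static void
call_callbacks(Oid relid)
{
	for (int i = 0; i < callback_count; i++)
		callback_list[i].function(callback_list[i].arg, relid);
}

/* Invalidate one entry, or every entry when relid is InvalidOid. */
static void
toy_cache_cb(void *arg, Oid relid)
{
	(void) arg;
	for (int i = 0; i < CACHE_SIZE; i++)
		if (relid == InvalidOid || toy_cache[i].relid == relid)
			toy_cache[i].valid = false;
}

/* Count the entries that are still valid. */
static int
count_valid(void)
{
	int			n = 0;

	for (int i = 0; i < CACHE_SIZE; i++)
		if (toy_cache[i].valid)
			n++;
	return n;
}
```

After register_callback(toy_cache_cb, NULL), a call such as call_callbacks(16385) invalidates only that one entry and leaves the other three usable, which corresponds to the selective path. call_callbacks(InvalidOid) drops everything, which corresponds to the old whole-cache behavior and to the FOR ALL TABLES path in the patch.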

What do you think?

Best regards,
Hayato Kuroda
FUJITSU LIMITED

Attachments:

v8-0001-Introduce-a-new-invalidation-message-to-invalidat.patch (application/octet-stream)
From ffc38f7ee81422af468fdad9901e2be5f1ede588 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 4 Mar 2025 16:51:19 +0900
Subject: [PATCH v8 1/2] Introduce a new invalidation message to invalidate
 caches in output plugins

A new invalidation message is generated when ALTER PUBLICATION RENAME TO
is executed. The primary use case of the message is to invalidate caches in
the logical decoding output plugin. Plugins can register callback functions for
the message via CacheRegisterRelSyncCallback(), and the function can invalidate
the cache for the specified relation.

The new invalidation message is transactional, and decoder processes should
recognize the message and invalidate the specified caches. Thus, the messages are
stored in InvalMessageArray and serialized at the end of the transaction.
---
 src/backend/access/rmgrdesc/standbydesc.c |   2 +
 src/backend/utils/cache/inval.c           | 127 ++++++++++++++++++++++
 src/include/pg_config_manual.h            |   8 +-
 src/include/storage/sinval.h              |  11 ++
 src/include/utils/inval.h                 |  10 ++
 5 files changed, 154 insertions(+), 4 deletions(-)

diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index d849f8e54b..81eff5f31c 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -132,6 +132,8 @@ standby_desc_invalidations(StringInfo buf,
 			appendStringInfo(buf, " relmap db %u", msg->rm.dbId);
 		else if (msg->id == SHAREDINVALSNAPSHOT_ID)
 			appendStringInfo(buf, " snapshot %u", msg->sn.relId);
+		else if (msg->id == SHAREDINVALRELSYNC_ID)
+			appendStringInfo(buf, " relsync %u", msg->rs.relid);
 		else
 			appendStringInfo(buf, " unrecognized id %d", msg->id);
 	}
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 700ccb6df9..35df9be5e5 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -271,6 +271,7 @@ int			debug_discard_caches = 0;
 
 #define MAX_SYSCACHE_CALLBACKS 64
 #define MAX_RELCACHE_CALLBACKS 10
+#define MAX_RELSYNC_CALLBACKS 10
 
 static struct SYSCACHECALLBACK
 {
@@ -292,6 +293,15 @@ static struct RELCACHECALLBACK
 
 static int	relcache_callback_count = 0;
 
+static struct RELSYNCCALLBACK
+{
+	RelSyncCallbackFunction function;
+	Datum		arg;
+}			relsync_callback_list[MAX_RELSYNC_CALLBACKS];
+
+static int	relsync_callback_count = 0;
+
+
 /* ----------------------------------------------------------------
  *				Invalidation subgroup support functions
  * ----------------------------------------------------------------
@@ -484,6 +494,34 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
 	AddInvalidationMessage(group, RelCacheMsgs, &msg);
 }
 
+/*
+ * Add a relsync inval entry
+ *
+ * We put these into the relcache subgroup for simplicity.
+ */
+static void
+AddRelsyncInvalidationMessage(InvalidationMsgsGroup *group,
+							  Oid dbId, Oid relId)
+{
+	SharedInvalidationMessage msg;
+
+	/* Don't add a duplicate item */
+	/* We assume dbId need not be checked because it will never change */
+	ProcessMessageSubGroup(group, RelCacheMsgs,
+						   if (msg->rc.id == SHAREDINVALRELSYNC_ID &&
+							   msg->rc.relId == relId)
+						   return);
+
+	/* OK, add the item */
+	msg.rc.id = SHAREDINVALRELSYNC_ID;
+	msg.rc.dbId = dbId;
+	msg.rc.relId = relId;
+	/* check AddCatcacheInvalidationMessage() for an explanation */
+	VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+	AddInvalidationMessage(group, RelCacheMsgs, &msg);
+}
+
 /*
  * Add a snapshot inval entry
  *
@@ -611,6 +649,18 @@ RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
 		info->RelcacheInitFileInval = true;
 }
 
+/*
+ * RegisterRelsyncInvalidation
+ *
+ * As above, but register a relsync invalidation event.
+ */
+static void
+RegisterRelsyncInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
+{
+	AddRelsyncInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, relId);
+}
+
+
 /*
  * RegisterSnapshotInvalidation
  *
@@ -751,6 +801,13 @@ InvalidateSystemCachesExtended(bool debug_discard)
 
 		ccitem->function(ccitem->arg, InvalidOid);
 	}
+
+	for (i = 0; i < relsync_callback_count; i++)
+	{
+		struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+		ccitem->function(ccitem->arg, InvalidOid);
+	}
 }
 
 /*
@@ -832,6 +889,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
 		else if (msg->sn.dbId == MyDatabaseId)
 			InvalidateCatalogSnapshot();
 	}
+	else if (msg->id == SHAREDINVALRELSYNC_ID)
+	{
+		/* We only care about our own database */
+		if (msg->rs.dbId == MyDatabaseId)
+			CallRelSyncCallbacks(msg->rs.relid);
+	}
 	else
 		elog(FATAL, "unrecognized SI message ID: %d", msg->id);
 }
@@ -1622,6 +1685,35 @@ CacheInvalidateRelcacheByRelid(Oid relid)
 }
 
 
+/*
+ * CacheInvalidateRelSync
+ *		Register invalidation of the cache in logical decoding output plugin
+ *		for a database.
+ *
+ * This type of invalidation message is used specifically by output plugins.
+ * Processes that do not decode WAL do nothing even when they receive the
+ * message.
+ */
+void
+CacheInvalidateRelSync(Oid relid)
+{
+	RegisterRelsyncInvalidation(PrepareInvalidationState(),
+								MyDatabaseId, relid);
+}
+
+
+/*
+ * CacheInvalidateRelSyncAll
+ *		Register invalidation of the whole cache in logical decoding output
+ *		plugin.
+ */
+void
+CacheInvalidateRelSyncAll(void)
+{
+	CacheInvalidateRelSync(InvalidOid);
+}
+
+
 /*
  * CacheInvalidateSmgr
  *		Register invalidation of smgr references to a physical relation.
@@ -1763,6 +1855,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 	++relcache_callback_count;
 }
 
+/*
+ * CacheRegisterRelSyncCallback
+ *		Register the specified function to be called for all future
+ *		decoding-cache invalidation events.
+ *
+ * This function is intended to be called from logical decoding output
+ * plugins.
+ */
+void
+CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+							 Datum arg)
+{
+	if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS)
+		elog(FATAL, "out of relsync_callback_list slots");
+
+	relsync_callback_list[relsync_callback_count].function = func;
+	relsync_callback_list[relsync_callback_count].arg = arg;
+
+	++relsync_callback_count;
+}
+
 /*
  * CallSyscacheCallbacks
  *
@@ -1788,6 +1901,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
 	}
 }
 
+/*
+ * CallRelSyncCallbacks
+ */
+void
+CallRelSyncCallbacks(Oid relid)
+{
+	for (int i = 0; i < relsync_callback_count; i++)
+	{
+		struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+		ccitem->function(ccitem->arg, relid);
+	}
+}
+
 /*
  * LogLogicalInvalidations
  *
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index 449e50bd78..23308f1de1 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -282,10 +282,10 @@
 
 /*
  * For cache-invalidation debugging, define DISCARD_CACHES_ENABLED to enable
- * use of the debug_discard_caches GUC to aggressively flush syscache/relcache
- * entries whenever it's possible to deliver invalidations.  See
- * AcceptInvalidationMessages() in src/backend/utils/cache/inval.c for
- * details.
+ * use of the debug_discard_caches GUC to aggressively flush
+ * syscache/relcache/relsync cache entries whenever it's possible to deliver
+ * invalidations.  See AcceptInvalidationMessages() in
+ * src/backend/utils/cache/inval.c for details.
  *
  * USE_ASSERT_CHECKING builds default to enabling this.  It's possible to use
  * DISCARD_CACHES_ENABLED without a cassert build and the implied
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index 2463c0f9fa..90a5af4ed8 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -27,6 +27,7 @@
  *	* invalidate an smgr cache entry for a specific physical relation
  *	* invalidate the mapped-relation mapping for a given database
  *	* invalidate any saved snapshot that might be used to scan a given relation
+ *	* invalidate a specific cache entry in a logical decoding output plugin
  * More types could be added if needed.  The message type is identified by
  * the first "int8" field of the message struct.  Zero or positive means a
  * specific-catcache inval message (and also serves as the catcache ID field).
@@ -110,6 +111,15 @@ typedef struct
 	Oid			relId;			/* relation ID */
 } SharedInvalSnapshotMsg;
 
+#define SHAREDINVALRELSYNC_ID	(-6)
+
+typedef struct
+{
+	int8		id;				/* type field --- must be first */
+	Oid			dbId;			/* database ID */
+	Oid			relid;			/* relation ID, or 0 for the whole cache */
+} SharedInvalRelSyncMsg;
+
 typedef union
 {
 	int8		id;				/* type field --- must be first */
@@ -119,6 +129,7 @@ typedef union
 	SharedInvalSmgrMsg sm;
 	SharedInvalRelmapMsg rm;
 	SharedInvalSnapshotMsg sn;
+	SharedInvalRelSyncMsg rs;
 } SharedInvalidationMessage;
 
 
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 40658ba2ff..9b871caef6 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -22,6 +22,7 @@ extern PGDLLIMPORT int debug_discard_caches;
 
 typedef void (*SyscacheCallbackFunction) (Datum arg, int cacheid, uint32 hashvalue);
 typedef void (*RelcacheCallbackFunction) (Datum arg, Oid relid);
+typedef void (*RelSyncCallbackFunction) (Datum arg, Oid relid);
 
 
 extern void AcceptInvalidationMessages(void);
@@ -55,6 +56,10 @@ extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple);
 
 extern void CacheInvalidateRelcacheByRelid(Oid relid);
 
+extern void CacheInvalidateRelSync(Oid relid);
+
+extern void CacheInvalidateRelSyncAll(void);
+
 extern void CacheInvalidateSmgr(RelFileLocatorBackend rlocator);
 
 extern void CacheInvalidateRelmap(Oid databaseId);
@@ -66,8 +71,13 @@ extern void CacheRegisterSyscacheCallback(int cacheid,
 extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 										  Datum arg);
 
+extern void CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+										 Datum arg);
+
 extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue);
 
+extern void CallRelSyncCallbacks(Oid relid);
+
 extern void InvalidateSystemCaches(void);
 extern void InvalidateSystemCachesExtended(bool debug_discard);
 
-- 
2.43.5

v8-0002-Invalidate-Relcaches-while-ALTER-PUBLICATION-RENA.patch (application/octet-stream)
From 9126766877967f3ab60e25e94057ff25bbd5fb9b Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 3 Mar 2025 19:41:04 +0900
Subject: [PATCH v8 2/2] Invalidate Relcaches while ALTER PUBLICATION RENAME TO

---
 src/backend/commands/alter.c                |   4 +-
 src/backend/commands/publicationcmds.c      | 110 ++++++++++++++++++++
 src/backend/parser/gram.y                   |   2 +-
 src/backend/replication/pgoutput/pgoutput.c |   8 +-
 src/include/commands/publicationcmds.h      |   2 +
 5 files changed, 118 insertions(+), 8 deletions(-)

diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 78c1d4e1b8..a79329acc1 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -400,6 +400,9 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TYPE:
 			return RenameType(stmt);
 
+		case OBJECT_PUBLICATION:
+			return RenamePublication(stmt->subname, stmt->newname);
+
 		case OBJECT_AGGREGATE:
 		case OBJECT_COLLATION:
 		case OBJECT_CONVERSION:
@@ -417,7 +420,6 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TSDICTIONARY:
 		case OBJECT_TSPARSER:
 		case OBJECT_TSTEMPLATE:
-		case OBJECT_PUBLICATION:
 		case OBJECT_SUBSCRIPTION:
 			{
 				ObjectAddress address;
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 150a768d16..4ef0b0804c 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -491,6 +491,104 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
 	return *invalid_column_list || *invalid_gen_col;
 }
 
+/*
+ * Execute ALTER PUBLICATION RENAME
+ */
+ObjectAddress
+RenamePublication(const char *oldname, const char *newname)
+{
+	Relation			rel;
+	HeapTuple			tup;
+	ObjectAddress		address;
+	Form_pg_publication	pubform;
+	bool				replaces[Natts_pg_publication];
+	bool				nulls[Natts_pg_publication];
+	Datum				values[Natts_pg_publication];
+
+	rel = table_open(PublicationRelationId, RowExclusiveLock);
+
+	tup = SearchSysCacheCopy1(PUBLICATIONNAME,
+							  CStringGetDatum(oldname));
+
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("publication \"%s\" does not exist",
+						oldname)));
+
+	pubform = (Form_pg_publication) GETSTRUCT(tup);
+
+	/* must be owner */
+	if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
+		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
+					   NameStr(pubform->pubname));
+
+	/* Check if name is used */
+	if (OidIsValid(GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
+								   CStringGetDatum(newname))))
+		ereport(ERROR,
+				(errcode(ERRCODE_DUPLICATE_OBJECT),
+				 errmsg("publication \"%s\" already exists",
+				 newname)));
+
+	/* Everything ok, form a new tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	/* Only update the pubname */
+	values[Anum_pg_publication_pubname - 1] =
+		DirectFunctionCall1(namein, CStringGetDatum(newname));
+	replaces[Anum_pg_publication_pubname - 1] = true;
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	/*
+	 * Invalidate caches on the logical decoding output plugin.
+	 *
+	 * Unlike the ALTER PUBLICATION ADD/SET/DROP commands, RENAME does not need
+	 * to invalidate relcaches. Those commands invalidate them so that the
+	 * publication descriptor is rebuilt, but the publication name is not
+	 * stored there, so RENAME requires no such refresh. Instead, we only
+	 * invalidate the output plugin's cache so that it is rebuilt.
+	 */
+	if (pubform->puballtables)
+	{
+		CacheInvalidateRelSyncAll();
+	}
+	else
+	{
+		List	   *relids = NIL;
+		List	   *schemarelids = NIL;
+
+		/*
+		 * For a partitioned table, get_rel_sync_entry is called when data is
+		 * inserted, and a hash entry is created for the corresponding leaf
+		 * table. So invalidating the leaf partitions is sufficient here.
+		 */
+		relids = GetPublicationRelations(pubform->oid,
+										 PUBLICATION_PART_LEAF);
+		schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
+														PUBLICATION_PART_LEAF);
+
+		relids = list_concat_unique_oid(relids, schemarelids);
+
+		InvalidateRelSyncCaches(relids);
+	}
+
+	ObjectAddressSet(address, PublicationRelationId, pubform->oid);
+
+	heap_freetuple(tup);
+
+	table_close(rel, RowExclusiveLock);
+
+	return address;
+}
+
 /* check_functions_in_node callback */
 static bool
 contain_mutable_or_user_functions_checker(Oid func_id, void *context)
@@ -2096,3 +2194,15 @@ defGetGeneratedColsOption(DefElem *def)
 
 	return PUBLISH_GENCOLS_NONE;	/* keep compiler quiet */
 }
+
+/*
+ * Invalidate the caches on the logical decoding output plugin.
+ */
+void
+InvalidateRelSyncCaches(List *relids)
+{
+	ListCell   *lc;
+
+	foreach(lc, relids)
+		CacheInvalidateRelSync(lfirst_oid(lc));
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 271ae26cba..0fb4041f15 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9541,7 +9541,7 @@ RenameStmt: ALTER AGGREGATE aggregate_with_argtypes RENAME TO name
 					RenameStmt *n = makeNode(RenameStmt);
 
 					n->renameType = OBJECT_PUBLICATION;
-					n->object = (Node *) makeString($3);
+					n->subname = $3;
 					n->newname = $6;
 					n->missing_ok = false;
 					$$ = (Node *) n;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9063af6e1d..ed806c5430 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -531,6 +531,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 			CacheRegisterSyscacheCallback(PUBLICATIONOID,
 										  publication_invalidation_cb,
 										  (Datum) 0);
+			CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
+										 (Datum) 0);
 			publication_callback_registered = true;
 		}
 
@@ -1789,12 +1791,6 @@ static void
 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
 	publications_valid = false;
-
-	/*
-	 * Also invalidate per-relation cache so that next time the filtering info
-	 * is checked it will be updated with the new publication settings.
-	 */
-	rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
 }
 
 /*
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index e11a942ea0..f130ea3090 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -38,5 +38,7 @@ extern bool pub_contains_invalid_column(Oid pubid, Relation relation,
 										char pubgencols_type,
 										bool *invalid_column_list,
 										bool *invalid_gen_col);
+extern ObjectAddress RenamePublication(const char *oldname, const char *newname);
+extern void InvalidateRelSyncCaches(List *relids);
 
 #endif							/* PUBLICATIONCMDS_H */
-- 
2.43.5

#16Zhijie Hou (Fujitsu)
houzj.fnst@fujitsu.com
In reply to: Hayato Kuroda (Fujitsu) (#15)
RE: Selectively invalidate caches in pgoutput module

On Monday, March 10, 2025 7:00 PM Kuroda, Hayato <kuroda.hayato@fujitsu.com> wrote:

I did a self-review and updated the patch. PSA new version. What's new:

Thanks for updating the patch.

I tested the behavior for partitioned tables and have a comment on it.

+		relids = GetPublicationRelations(pubform->oid,
+										 PUBLICATION_PART_LEAF);

Currently, only the leaf partition is invalidated when the published table is
partitioned. However, I think pgoutput could cache both the partitioned table
and the leaf partition table as relsync entries.

For INSERT/UPDATE/DELETE on a partitioned table, only the leaf partition's
relsync entry is used in pgoutput, but the TRUNCATE references the parent
table's relsync entry.

For example[1], if the parent table's relsync entry is not invalidated after a
RENAME operation, the TRUNCATE is missed. So I think we should
invalidate all the tables in the partition tree by passing
PUBLICATION_PART_ALL, which is also consistent with ALTER PUBLICATION
ADD/SET/DROP TABLE.

[1]:
-- Example --
create table test(a int primary key) PARTITION BY RANGE (a);
CREATE TABLE test_1 PARTITION OF test
FOR VALUES FROM (0) TO (2);
CREATE TABLE test_2 PARTITION OF test
FOR VALUES FROM (2) TO (4);

CREATE PUBLICATION pub;
CREATE PUBLICATION pub2 FOR TABLE test WITH (PUBLISH_VIA_PARTITION_ROOT);

SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput');

TRUNCATE test;
ALTER PUBLICATION pub RENAME TO pub3;
ALTER PUBLICATION pub2 RENAME TO pub;
TRUNCATE test;

-- I can consume some changes using the following function on HEAD, but got
-- nothing after applying the patch.
SELECT * FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'streaming', 'on');

---
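
The failure mode in the example above can be modeled in a few lines of C. This is only a toy sketch, not pgoutput code: ToyRelSyncEntry, toy_invalidate(), toy_still_valid() and the OIDs are hypothetical stand-ins. It assumes a cache that holds one entry per relation and shows that invalidating only the leaves keeps the parent's entry marked valid, which is the entry TRUNCATE consults.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;

/* Hypothetical stand-in for one RelationSyncCache entry. */
typedef struct
{
	Oid			relid;
	bool		replicate_valid;	/* false means "rebuild before next use" */
} ToyRelSyncEntry;

/* Hypothetical OIDs for the tables in the example: test, test_1, test_2. */
enum
{
	REL_TEST = 16384,
	REL_TEST_1 = 16385,
	REL_TEST_2 = 16386
};

static ToyRelSyncEntry toy_cache[] = {
	{REL_TEST, true}, {REL_TEST_1, true}, {REL_TEST_2, true},
};

#define TOY_CACHE_SIZE (sizeof(toy_cache) / sizeof(toy_cache[0]))

/* Mark the entries for the given relids as needing a rebuild. */
static void
toy_invalidate(const Oid *relids, size_t nrelids)
{
	assert(relids != NULL);

	for (size_t i = 0; i < nrelids; i++)
		for (size_t j = 0; j < TOY_CACHE_SIZE; j++)
			if (toy_cache[j].relid == relids[i])
				toy_cache[j].replicate_valid = false;
}

/* Is the entry for relid still marked valid, i.e. used without a rebuild? */
static bool
toy_still_valid(Oid relid)
{
	for (size_t j = 0; j < TOY_CACHE_SIZE; j++)
		if (toy_cache[j].relid == relid)
			return toy_cache[j].replicate_valid;
	return false;
}
```

Passing only {REL_TEST_1, REL_TEST_2} to toy_invalidate() corresponds to PUBLICATION_PART_LEAF and leaves toy_still_valid(REL_TEST) true; passing all three relids corresponds to PUBLICATION_PART_ALL.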

Best Regards,
Hou zj

#17Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Zhijie Hou (Fujitsu) (#16)
2 attachment(s)
RE: Selectively invalidate caches in pgoutput module

Dear Hou,

Currently, only the leaf partition is invalidated when the published table is
partitioned. However, I think pgoutput could cache both the partitioned table
and the leaf partition table as relsync entries.

For INSERT/UPDATE/DELETE on a partitioned table, only the leaf partition's
relsync entry is used in pgoutput, but the TRUNCATE references the parent
table's relsync entry.

I think your analysis is correct. PSA new version. Below part contains my analysis.

In ExecuteTruncate(), if the specified relation has children, all of them are
found via find_all_inheritors() and listed as targets. Also, ExecuteTruncateGuts()
serializes both the parent and its children in the XLOG_HEAP_TRUNCATE WAL record.
The decoding layer passes these relations through as-is. Together, these facts
mean that an output plugin can hold cache entries for the parent as well as for
its children.
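
To illustrate the analysis above, here is a minimal sketch of how the two kinds of changes populate a per-relation cache. It is not the real decoding code: toy_get_entry(), toy_decode_insert() and toy_decode_truncate() are hypothetical names, and the cache is reduced to a list of relids. The assumption, taken from the analysis, is that a row change names one leaf relation while a truncate record lists the parent and every child.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;

#define TOY_MAX_ENTRIES 8

/* Hypothetical cache: just the relids that currently have an entry. */
static Oid	toy_entries[TOY_MAX_ENTRIES];
static size_t toy_nentries = 0;

static bool
toy_has_entry(Oid relid)
{
	for (size_t i = 0; i < toy_nentries; i++)
		if (toy_entries[i] == relid)
			return true;
	return false;
}

/* Look up the entry for relid, creating it on first use. */
static void
toy_get_entry(Oid relid)
{
	if (toy_has_entry(relid))
		return;
	assert(toy_nentries < TOY_MAX_ENTRIES);
	toy_entries[toy_nentries++] = relid;
}

/* A row change names exactly one (leaf) relation. */
static void
toy_decode_insert(Oid leaf)
{
	toy_get_entry(leaf);
}

/* A truncate record lists every affected relation, parent included. */
static void
toy_decode_truncate(const Oid *relids, size_t nrelids)
{
	for (size_t i = 0; i < nrelids; i++)
		toy_get_entry(relids[i]);
}
```

After an insert into one leaf the cache holds a single entry; after a truncate of the parent it also holds an entry for the parent itself, which is why the parent must be covered by the invalidation.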

Best regards,
Hayato Kuroda
FUJITSU LIMITED

Attachments:

v9-0001-Introduce-a-new-invalidation-message-to-invalidat.patch (application/octet-stream)
From 7bbf7bc3278b3ad9e6071dca9eb78c8c6b80f4b0 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 4 Mar 2025 16:51:19 +0900
Subject: [PATCH v9 1/2] Introduce a new invalidation message to invalidate
 caches in output plugins

A new invalidation message is generated when ALTER PUBLICATION RENAME TO
is executed. The primary use case of the message is to invalidate caches in
the logical decoding output plugin. Plugins can register callback functions for
the message via CacheRegisterRelSyncCallback(), and each callback can invalidate
the cache for the specified relation.

The new invalidation message is transactional, and decoder processes should
recognize the message and invalidate the specified caches. Thus, the messages
are stored in InvalMessageArray and serialized at the end of the transaction.
---
 src/backend/access/rmgrdesc/standbydesc.c |   2 +
 src/backend/utils/cache/inval.c           | 127 ++++++++++++++++++++++
 src/include/pg_config_manual.h            |   8 +-
 src/include/storage/sinval.h              |  11 ++
 src/include/utils/inval.h                 |  10 ++
 5 files changed, 154 insertions(+), 4 deletions(-)

diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index d849f8e54b..81eff5f31c 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -132,6 +132,8 @@ standby_desc_invalidations(StringInfo buf,
 			appendStringInfo(buf, " relmap db %u", msg->rm.dbId);
 		else if (msg->id == SHAREDINVALSNAPSHOT_ID)
 			appendStringInfo(buf, " snapshot %u", msg->sn.relId);
+		else if (msg->id == SHAREDINVALRELSYNC_ID)
+			appendStringInfo(buf, " relsync %u", msg->rs.relid);
 		else
 			appendStringInfo(buf, " unrecognized id %d", msg->id);
 	}
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 700ccb6df9..35df9be5e5 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -271,6 +271,7 @@ int			debug_discard_caches = 0;
 
 #define MAX_SYSCACHE_CALLBACKS 64
 #define MAX_RELCACHE_CALLBACKS 10
+#define MAX_RELSYNC_CALLBACKS 10
 
 static struct SYSCACHECALLBACK
 {
@@ -292,6 +293,15 @@ static struct RELCACHECALLBACK
 
 static int	relcache_callback_count = 0;
 
+static struct RELSYNCCALLBACK
+{
+	RelSyncCallbackFunction function;
+	Datum		arg;
+}			relsync_callback_list[MAX_RELSYNC_CALLBACKS];
+
+static int	relsync_callback_count = 0;
+
+
 /* ----------------------------------------------------------------
  *				Invalidation subgroup support functions
  * ----------------------------------------------------------------
@@ -484,6 +494,34 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
 	AddInvalidationMessage(group, RelCacheMsgs, &msg);
 }
 
+/*
+ * Add a relsync inval entry
+ *
+ * We put these into the relcache subgroup for simplicity.
+ */
+static void
+AddRelsyncInvalidationMessage(InvalidationMsgsGroup *group,
+							  Oid dbId, Oid relId)
+{
+	SharedInvalidationMessage msg;
+
+	/* Don't add a duplicate item */
+	/* We assume dbId need not be checked because it will never change */
+	ProcessMessageSubGroup(group, RelCacheMsgs,
+						   if (msg->rc.id == SHAREDINVALRELSYNC_ID &&
+							   msg->rc.relId == relId)
+						   return);
+
+	/* OK, add the item */
+	msg.rc.id = SHAREDINVALRELSYNC_ID;
+	msg.rc.dbId = dbId;
+	msg.rc.relId = relId;
+	/* check AddCatcacheInvalidationMessage() for an explanation */
+	VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+	AddInvalidationMessage(group, RelCacheMsgs, &msg);
+}
+
 /*
  * Add a snapshot inval entry
  *
@@ -611,6 +649,18 @@ RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
 		info->RelcacheInitFileInval = true;
 }
 
+/*
+ * RegisterRelcacheInvalidation
+ *
+ * As above, but register a relsync invalidation event.
+ */
+static void
+RegisterRelsyncInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
+{
+	AddRelsyncInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, relId);
+}
+
+
 /*
  * RegisterSnapshotInvalidation
  *
@@ -751,6 +801,13 @@ InvalidateSystemCachesExtended(bool debug_discard)
 
 		ccitem->function(ccitem->arg, InvalidOid);
 	}
+
+	for (i = 0; i < relsync_callback_count; i++)
+	{
+		struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+		ccitem->function(ccitem->arg, InvalidOid);
+	}
 }
 
 /*
@@ -832,6 +889,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
 		else if (msg->sn.dbId == MyDatabaseId)
 			InvalidateCatalogSnapshot();
 	}
+	else if (msg->id == SHAREDINVALRELSYNC_ID)
+	{
+		/* We only care about our own database */
+		if (msg->rs.dbId == MyDatabaseId)
+			CallRelSyncCallbacks(msg->rs.relid);
+	}
 	else
 		elog(FATAL, "unrecognized SI message ID: %d", msg->id);
 }
@@ -1622,6 +1685,35 @@ CacheInvalidateRelcacheByRelid(Oid relid)
 }
 
 
+/*
+ * RelationCacheInvalidate
+ *		Register invalidation of the cache in logical decoding output plugin
+ *		for a database.
+ *
+ * This type of invalidation message is used for the specific purpose of output
+ * plugins. Processes which do not decode WALs would do nothing even when it
+ * receives the message.
+ */
+void
+CacheInvalidateRelSync(Oid relid)
+{
+	RegisterRelsyncInvalidation(PrepareInvalidationState(),
+								MyDatabaseId, relid);
+}
+
+
+/*
+ * CacheInvalidateRelSyncAll
+ *		Register invalidation of the whole cache in logical decoding output
+ *		plugin.
+ */
+void
+CacheInvalidateRelSyncAll(void)
+{
+	CacheInvalidateRelSync(InvalidOid);
+}
+
+
 /*
  * CacheInvalidateSmgr
  *		Register invalidation of smgr references to a physical relation.
@@ -1763,6 +1855,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 	++relcache_callback_count;
 }
 
+/*
+ * CacheRegisterRelSyncCallback
+ *		Register the specified function to be called for all future
+ *		decoding-cache invalidation events.
+ *
+ * This function is intended to be call from the logical decoding output
+ * plugins.
+ */
+void
+CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+							 Datum arg)
+{
+	if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS)
+		elog(FATAL, "out of relsync_callback_list slots");
+
+	relsync_callback_list[relsync_callback_count].function = func;
+	relsync_callback_list[relsync_callback_count].arg = arg;
+
+	++relsync_callback_count;
+}
+
 /*
  * CallSyscacheCallbacks
  *
@@ -1788,6 +1901,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
 	}
 }
 
+/*
+ * CallRelSyncCallbacks
+ */
+void
+CallRelSyncCallbacks(Oid relid)
+{
+	for (int i = 0; i < relsync_callback_count; i++)
+	{
+		struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+		ccitem->function(ccitem->arg, relid);
+	}
+}
+
 /*
  * LogLogicalInvalidations
  *
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index 449e50bd78..23308f1de1 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -282,10 +282,10 @@
 
 /*
  * For cache-invalidation debugging, define DISCARD_CACHES_ENABLED to enable
- * use of the debug_discard_caches GUC to aggressively flush syscache/relcache
- * entries whenever it's possible to deliver invalidations.  See
- * AcceptInvalidationMessages() in src/backend/utils/cache/inval.c for
- * details.
+ * use of the debug_discard_caches GUC to aggressively flush
+ * syscache/relcache/relsync cache entries whenever it's possible to deliver
+ * invalidations.  See AcceptInvalidationMessages() in
+ * src/backend/utils/cache/inval.c for details.
  *
  * USE_ASSERT_CHECKING builds default to enabling this.  It's possible to use
  * DISCARD_CACHES_ENABLED without a cassert build and the implied
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index 2463c0f9fa..90a5af4ed8 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -27,6 +27,7 @@
  *	* invalidate an smgr cache entry for a specific physical relation
  *	* invalidate the mapped-relation mapping for a given database
  *	* invalidate any saved snapshot that might be used to scan a given relation
+ *	* invalidate a specific entry for specific output plugin
  * More types could be added if needed.  The message type is identified by
  * the first "int8" field of the message struct.  Zero or positive means a
  * specific-catcache inval message (and also serves as the catcache ID field).
@@ -110,6 +111,15 @@ typedef struct
 	Oid			relId;			/* relation ID */
 } SharedInvalSnapshotMsg;
 
+#define SHAREDINVALRELSYNC_ID	(-6)
+
+typedef struct
+{
+	int8		id;				/* type field --- must be first */
+	Oid			dbId;			/* database ID */
+	Oid			relid;			/* relation ID, or 0 if whole relcache */
+} SharedInvalRelSyncMsg;
+
 typedef union
 {
 	int8		id;				/* type field --- must be first */
@@ -119,6 +129,7 @@ typedef union
 	SharedInvalSmgrMsg sm;
 	SharedInvalRelmapMsg rm;
 	SharedInvalSnapshotMsg sn;
+	SharedInvalRelSyncMsg rs;
 } SharedInvalidationMessage;
 
 
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 40658ba2ff..9b871caef6 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -22,6 +22,7 @@ extern PGDLLIMPORT int debug_discard_caches;
 
 typedef void (*SyscacheCallbackFunction) (Datum arg, int cacheid, uint32 hashvalue);
 typedef void (*RelcacheCallbackFunction) (Datum arg, Oid relid);
+typedef void (*RelSyncCallbackFunction) (Datum arg, Oid relid);
 
 
 extern void AcceptInvalidationMessages(void);
@@ -55,6 +56,10 @@ extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple);
 
 extern void CacheInvalidateRelcacheByRelid(Oid relid);
 
+extern void CacheInvalidateRelSync(Oid relid);
+
+extern void CacheInvalidateRelSyncAll(void);
+
 extern void CacheInvalidateSmgr(RelFileLocatorBackend rlocator);
 
 extern void CacheInvalidateRelmap(Oid databaseId);
@@ -66,8 +71,13 @@ extern void CacheRegisterSyscacheCallback(int cacheid,
 extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 										  Datum arg);
 
+extern void CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+										 Datum arg);
+
 extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue);
 
+extern void CallRelSyncCallbacks(Oid relid);
+
 extern void InvalidateSystemCaches(void);
 extern void InvalidateSystemCachesExtended(bool debug_discard);
 
-- 
2.43.5

v9-0002-Invalidate-Relcaches-while-ALTER-PUBLICATION-RENA.patch (application/octet-stream)
From e9d6bd5b003b9b8b4a6dc9eacc4e2440d338b58d Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 3 Mar 2025 19:41:04 +0900
Subject: [PATCH v9 2/2] Invalidate Relcaches while ALTER PUBLICATION RENAME TO

---
 src/backend/commands/alter.c                |   4 +-
 src/backend/commands/publicationcmds.c      | 111 ++++++++++++++++++++
 src/backend/parser/gram.y                   |   2 +-
 src/backend/replication/pgoutput/pgoutput.c |   8 +-
 src/include/commands/publicationcmds.h      |   2 +
 5 files changed, 119 insertions(+), 8 deletions(-)

diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 78c1d4e1b8..a79329acc1 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -400,6 +400,9 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TYPE:
 			return RenameType(stmt);
 
+		case OBJECT_PUBLICATION:
+			return RenamePublication(stmt->subname, stmt->newname);
+
 		case OBJECT_AGGREGATE:
 		case OBJECT_COLLATION:
 		case OBJECT_CONVERSION:
@@ -417,7 +420,6 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TSDICTIONARY:
 		case OBJECT_TSPARSER:
 		case OBJECT_TSTEMPLATE:
-		case OBJECT_PUBLICATION:
 		case OBJECT_SUBSCRIPTION:
 			{
 				ObjectAddress address;
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 150a768d16..b1e38ed782 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -491,6 +491,105 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
 	return *invalid_column_list || *invalid_gen_col;
 }
 
+/*
+ * Execute ALTER PUBLICATION RENAME
+ */
+ObjectAddress
+RenamePublication(const char *oldname, const char *newname)
+{
+	Relation			rel;
+	HeapTuple			tup;
+	ObjectAddress		address;
+	Form_pg_publication	pubform;
+	bool				replaces[Natts_pg_publication];
+	bool				nulls[Natts_pg_publication];
+	Datum				values[Natts_pg_publication];
+
+	rel = table_open(PublicationRelationId, RowExclusiveLock);
+
+	tup = SearchSysCacheCopy1(PUBLICATIONNAME,
+							  CStringGetDatum(oldname));
+
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("publication \"%s\" does not exist",
+						oldname)));
+
+	pubform = (Form_pg_publication) GETSTRUCT(tup);
+
+	/* must be owner */
+	if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
+		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
+					   NameStr(pubform->pubname));
+
+	/* Check if name is used */
+	if (OidIsValid(GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
+								   CStringGetDatum(newname))))
+		ereport(ERROR,
+				(errcode(ERRCODE_DUPLICATE_OBJECT),
+				 errmsg("publication \"%s\" already exists",
+				 newname)));
+
+	/* Everything ok, form a new tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	/* Only update the pubname */
+	values[Anum_pg_publication_pubname - 1] =
+		DirectFunctionCall1(namein, CStringGetDatum(newname));
+	replaces[Anum_pg_publication_pubname - 1] = true;
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	/*
+	 * Invalidate caches on the logical decoding output plugin.
+	 *
+	 * Apart from the ALTER PUBLICATION ADD/SET/DROP commands, we do not have
+	 * to invalidate relcaches. They are required to refresh the publication's
+	 * descriptor. The publication's name is not recorded in the attribute, so
+	 * the RENAME commands do not require a refresh. Instead, we only
+	 * invalidate a cache on the output plugin to rebuild its cache.
+	 */
+	if (pubform->puballtables)
+	{
+		CacheInvalidateRelSyncAll();
+	}
+	else
+	{
+		List	   *relids = NIL;
+		List	   *schemarelids = NIL;
+
+		/*
+		 * For partitioned tables, we must invalidate all partitions and
+		 * itself. WAL records for INSERT/UPDATE/DELETE specify leaf
+		 * tables as a target. However, WAL records for TRUNCATE specify
+		 * both a root and its leaves.
+		 */
+		relids = GetPublicationRelations(pubform->oid,
+										 PUBLICATION_PART_ALL);
+		schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
+														PUBLICATION_PART_ALL);
+
+		relids = list_concat_unique_oid(relids, schemarelids);
+
+		InvalidateRelSyncCaches(relids);
+	}
+
+	ObjectAddressSet(address, PublicationRelationId, pubform->oid);
+
+	heap_freetuple(tup);
+
+	table_close(rel, RowExclusiveLock);
+
+	return address;
+}
+
 /* check_functions_in_node callback */
 static bool
 contain_mutable_or_user_functions_checker(Oid func_id, void *context)
@@ -2096,3 +2195,15 @@ defGetGeneratedColsOption(DefElem *def)
 
 	return PUBLISH_GENCOLS_NONE;	/* keep compiler quiet */
 }
+
+/*
+ * Invalidate the caches on the logical decoding output plugin.
+ */
+void
+InvalidateRelSyncCaches(List *relids)
+{
+	ListCell   *lc;
+
+	foreach(lc, relids)
+		CacheInvalidateRelSync(lfirst_oid(lc));
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 271ae26cba..0fb4041f15 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9541,7 +9541,7 @@ RenameStmt: ALTER AGGREGATE aggregate_with_argtypes RENAME TO name
 					RenameStmt *n = makeNode(RenameStmt);
 
 					n->renameType = OBJECT_PUBLICATION;
-					n->object = (Node *) makeString($3);
+					n->subname = $3;
 					n->newname = $6;
 					n->missing_ok = false;
 					$$ = (Node *) n;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9063af6e1d..ed806c5430 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -531,6 +531,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 			CacheRegisterSyscacheCallback(PUBLICATIONOID,
 										  publication_invalidation_cb,
 										  (Datum) 0);
+			CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
+										 (Datum) 0);
 			publication_callback_registered = true;
 		}
 
@@ -1789,12 +1791,6 @@ static void
 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
 	publications_valid = false;
-
-	/*
-	 * Also invalidate per-relation cache so that next time the filtering info
-	 * is checked it will be updated with the new publication settings.
-	 */
-	rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
 }
 
 /*
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index e11a942ea0..f130ea3090 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -38,5 +38,7 @@ extern bool pub_contains_invalid_column(Oid pubid, Relation relation,
 										char pubgencols_type,
 										bool *invalid_column_list,
 										bool *invalid_gen_col);
+extern ObjectAddress RenamePublication(const char *oldname, const char *newname);
+extern void InvalidateRelSyncCaches(List *relids);
 
 #endif							/* PUBLICATIONCMDS_H */
-- 
2.43.5

#18Amit Kapila
amit.kapila16@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#17)
1 attachment(s)
Re: Selectively invalidate caches in pgoutput module

On Mon, Mar 10, 2025 at 6:42 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

Currently, only the leaf partition is invalidated when the published table is
partitioned. However, I think pgoutput could cache both the partitioned table
and the leaf partition table as relsync entries.

For INSERT/UPDATE/DELETE on a partitioned table, only the leaf partition's
relsync entry is used in pgoutput, but the TRUNCATE references the parent
table's relsync entry.

I think your analysis is correct. PSA new version. Below part contains my analysis.

I have made several cosmetic changes atop the 0001 patch in the attached file.
Additionally, I fixed an issue in AddRelsyncInvalidationMessage() so that the
duplicate check also accounts for a pending message that invalidates all the
RelSyncCache entries. Kindly include these in the next version if you find the
changes okay.
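
The duplicate check described above can be sketched in isolation. This is a simplified model rather than the inval.c code: toy_pending and toy_add_relsync_msg() are hypothetical, and the pending list stores bare relids instead of SharedInvalidationMessage structs. The assumption, based on the top-up patch, is that InvalidOid stands for "invalidate everything", so once it is queued any per-relation message is redundant.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;

#define InvalidOid ((Oid) 0)
#define TOY_MAX_MSGS 16

/* Hypothetical pending-message list for the current command. */
static Oid	toy_pending[TOY_MAX_MSGS];
static size_t toy_npending = 0;

/*
 * Queue an invalidation for relid.  Returns true if a message was added,
 * false if an existing message already covers it: either the same relid
 * or a pending "invalidate all" (InvalidOid).
 */
static bool
toy_add_relsync_msg(Oid relid)
{
	for (size_t i = 0; i < toy_npending; i++)
		if (toy_pending[i] == relid || toy_pending[i] == InvalidOid)
			return false;

	assert(toy_npending < TOY_MAX_MSGS);
	toy_pending[toy_npending++] = relid;
	return true;
}
```

Note that this check is one-directional, as in the patch: a per-relation message queued before the invalidate-all message stays in the list, which is harmless but not minimal.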

--
With Regards,
Amit Kapila.

Attachments:

v9_001_topup_amit.patch.txt (text/plain; charset=US-ASCII)
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 35df9be5e54..a0c49519977 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -497,7 +497,9 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
 /*
  * Add a relsync inval entry
  *
- * We put these into the relcache subgroup for simplicity.
+ * We put these into the relcache subgroup for simplicity. This message is the
+ * same as AddRelcacheInvalidationMessage() except that it is for
+ * RelationSyncCache maintained by decoding plugin pgoutput.
  */
 static void
 AddRelsyncInvalidationMessage(InvalidationMsgsGroup *group,
@@ -505,11 +507,11 @@ AddRelsyncInvalidationMessage(InvalidationMsgsGroup *group,
 {
 	SharedInvalidationMessage msg;
 
-	/* Don't add a duplicate item */
-	/* We assume dbId need not be checked because it will never change */
+	/* Don't add a duplicate item. */
 	ProcessMessageSubGroup(group, RelCacheMsgs,
 						   if (msg->rc.id == SHAREDINVALRELSYNC_ID &&
-							   msg->rc.relId == relId)
+							   (msg->rc.relId == relId ||
+								msg->rc.relId == InvalidOid))
 						   return);
 
 	/* OK, add the item */
@@ -650,9 +652,9 @@ RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
 }
 
 /*
- * RegisterRelcacheInvalidation
+ * RegisterRelsyncInvalidation
  *
- * As above, but register a relsync invalidation event.
+ * As above, but register a relsynccache invalidation event.
  */
 static void
 RegisterRelsyncInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
@@ -660,7 +662,6 @@ RegisterRelsyncInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
 	AddRelsyncInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, relId);
 }
 
-
 /*
  * RegisterSnapshotInvalidation
  *
@@ -1684,9 +1685,8 @@ CacheInvalidateRelcacheByRelid(Oid relid)
 	ReleaseSysCache(tup);
 }
 
-
 /*
- * RelationCacheInvalidate
+ * CacheInvalidateRelSync
  *		Register invalidation of the cache in logical decoding output plugin
  *		for a database.
  *
@@ -1701,7 +1701,6 @@ CacheInvalidateRelSync(Oid relid)
 								MyDatabaseId, relid);
 }
 
-
 /*
  * CacheInvalidateRelSyncAll
  *		Register invalidation of the whole cache in logical decoding output
@@ -1713,7 +1712,6 @@ CacheInvalidateRelSyncAll(void)
 	CacheInvalidateRelSync(InvalidOid);
 }
 
-
 /*
  * CacheInvalidateSmgr
  *		Register invalidation of smgr references to a physical relation.
@@ -1858,7 +1856,7 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 /*
  * CacheRegisterRelSyncCallback
  *		Register the specified function to be called for all future
- *		decoding-cache invalidation events.
+ *		decoding plugin's relation cache invalidation events.
  *
  * This function is intended to be call from the logical decoding output
  * plugins.
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index 90a5af4ed8f..f168b5fbf8c 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -27,7 +27,7 @@
  *	* invalidate an smgr cache entry for a specific physical relation
  *	* invalidate the mapped-relation mapping for a given database
  *	* invalidate any saved snapshot that might be used to scan a given relation
- *	* invalidate a specific entry for specific output plugin
+ *	* invalidate a RelationSyncCache entry for a specific relation
  * More types could be added if needed.  The message type is identified by
  * the first "int8" field of the message struct.  Zero or positive means a
  * specific-catcache inval message (and also serves as the catcache ID field).
@@ -47,12 +47,12 @@
  * catcache inval messages must be generated for each of its caches, since
  * the hash keys will generally be different.
  *
- * Catcache, relcache, and snapshot invalidations are transactional, and so
- * are sent to other backends upon commit.  Internally to the generating
- * backend, they are also processed at CommandCounterIncrement so that later
- * commands in the same transaction see the new state.  The generating backend
- * also has to process them at abort, to flush out any cache state it's loaded
- * from no-longer-valid entries.
+ * Catcache, relcache, relsynccache, and snapshot invalidations are
+ * transactional, and so are sent to other backends upon commit.  Internally
+ * to the generating backend, they are also processed at
+ * CommandCounterIncrement so that later commands in the same transaction see
+ * the new state.  The generating backend also has to process them at abort,
+ * to flush out any cache state it's loaded from no-longer-valid entries.
  *
  * smgr and relation mapping invalidations are non-transactional: they are
  * sent immediately when the underlying file change is made.
@@ -117,7 +117,7 @@ typedef struct
 {
 	int8		id;				/* type field --- must be first */
 	Oid			dbId;			/* database ID */
-	Oid			relid;			/* relation ID, or 0 if whole relcache */
+	Oid			relid;			/* relation ID, or 0 if whole RelationSyncCache */
 } SharedInvalRelSyncMsg;
 
 typedef union
#19Zhijie Hou (Fujitsu)
houzj.fnst@fujitsu.com
In reply to: Hayato Kuroda (Fujitsu) (#17)
1 attachment(s)
RE: Selectively invalidate caches in pgoutput module

On Monday, March 10, 2025 9:12 PM Kuroda, Hayato <kuroda.hayato@fujitsu.com> wrote:

Currently, only the leaf partition is invalidated when the published
table is partitioned. However, I think pgoutput could cache both the
partitioned table and the leaf partition table as relsync entries.

For INSERT/UPDATE/DELETE on a partitioned table, only the leaf
partition's relsync entry is used in pgoutput, but the TRUNCATE
references the parent table's relsync entry.

I think your analysis is correct. PSA new version. Below part contains my
analysis.

In ExecuteTruncate(), if the specified relation has children, all of them are
found via find_all_inheritors() and listed as targets. Also,
ExecuteTruncateGuts() serializes both the parent and its children in the
XLOG_HEAP_TRUNCATE WAL record.
The decoding layer passes these relations through as-is. Together, these facts
mean that an output plugin can hold cache entries for the parent as well as for
its children.

Thanks for updating the patch.

I have reviewed patch 0001 and did not find issues, aside from a few problems in
code comments that were mentioned in Amit's email. Here are some analyses and
insights gathered during the review of 0001:

The functions and variables in the 0001 patch use 'Relsync' (e.g.,
RegisterRelsyncInvalidation) instead of the longer 'RelsyncCache'. After
internal discussions, we think it's OK, as using 'RelsyncCache' could
unnecessarily lengthen the names.

Furthermore, considering we're introducing a new invalidation message for the
RelationSyncCache within pgoutput, which is an output plugin, we discussed
whether a more general invalidation name should be adopted in case other output
plugins might use it. However, after reviewing all the plugins listed in
Wiki[1], we did not find any other plugins that reference the built-in
publication catalog. Therefore, this scenario appears to be uncommon.
Additionally, the current naming convention is sufficiently intuitive for
output plugin developers. Hence, we feel it is reasonable to retain the
existing names.
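
For readers following the naming discussion, the shape of the interface an output plugin would see can be sketched as follows. This is a standalone model loosely patterned on CacheRegisterRelSyncCallback() and CallRelSyncCallbacks() from the 0001 patch; every toy_* name and the ToyEntry type are hypothetical, and a plain void pointer replaces Datum so that the sketch compiles without PostgreSQL headers.

```c
#include <assert.h>
#include <stddef.h>

typedef unsigned int Oid;

#define InvalidOid ((Oid) 0)
#define TOY_MAX_CALLBACKS 10

typedef void (*ToyRelSyncCallback) (void *arg, Oid relid);

/* Fixed-size registry, in the style of the callback lists in inval.c. */
static struct
{
	ToyRelSyncCallback function;
	void	   *arg;
}			toy_callbacks[TOY_MAX_CALLBACKS];
static int	toy_callback_count = 0;

static void
toy_register_callback(ToyRelSyncCallback func, void *arg)
{
	assert(toy_callback_count < TOY_MAX_CALLBACKS);
	toy_callbacks[toy_callback_count].function = func;
	toy_callbacks[toy_callback_count].arg = arg;
	toy_callback_count++;
}

/* Deliver one invalidation to every registered callback. */
static void
toy_call_callbacks(Oid relid)
{
	for (int i = 0; i < toy_callback_count; i++)
		toy_callbacks[i].function(toy_callbacks[i].arg, relid);
}

/* Plugin side: a tiny per-relation cache and its invalidation callback. */
typedef struct
{
	Oid			relid;
	int			valid;
} ToyEntry;

#define TOY_NENTRIES 3

static ToyEntry toy_plugin_cache[TOY_NENTRIES] = {
	{16384, 1}, {16385, 1}, {16386, 1},
};

/* InvalidOid means "invalidate every entry"; otherwise only the match. */
static void
toy_plugin_cb(void *arg, Oid relid)
{
	ToyEntry   *cache = arg;

	for (int i = 0; i < TOY_NENTRIES; i++)
		if (relid == InvalidOid || cache[i].relid == relid)
			cache[i].valid = 0;
}
```

A plugin would call toy_register_callback(toy_plugin_cb, toy_plugin_cache) once at startup; afterwards toy_call_callbacks(16385) clears a single entry and toy_call_callbacks(InvalidOid) clears them all.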

For patch 0002, I think the implementation could be improved. The
current patch introduces a new function, RenamePublication, to replace the
existing generic approach in ExecRenameStmt->AlterObjectRename_internal.
However, this creates inconsistency because the original code uses
AccessExclusiveLock for publication renaming, making concurrent renaming
impossible. The current patch seems to overlook this aspect.

Additionally, introducing a new function results in code duplication, which can
be avoided. After further consideration, handling the publication rename
separately seems unnecessary, given it requires only sending a few extra
invalidation messages. Therefore, I suggest reusing the generic handling and
simply extending AlterObjectRename_internal to include the additional
invalidation messages.

I've attached a diff with the proposed changes for 0002.

Best Regards,
Hou zj

Attachments:

0001-refactor.patch (application/octet-stream)
From 6e4200c2e4acfcf90a4aaa835259ae4843b2cdcc Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Tue, 11 Mar 2025 18:13:49 +0800
Subject: [PATCH] refactor

---
 src/backend/commands/alter.c           | 20 ++++++-
 src/backend/commands/publicationcmds.c | 83 +++-----------------------
 src/backend/parser/gram.y              |  2 +-
 src/include/commands/publicationcmds.h |  3 +-
 4 files changed, 29 insertions(+), 79 deletions(-)

diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index a79329acc1f..838a264f474 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -306,6 +306,22 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
 		/* Wake up related replication workers to handle this change quickly */
 		LogicalRepWorkersWakeupAtCommit(objectId);
 	}
+	else if (classId == PublicationRelationId)
+	{
+		Form_pg_publication pub = (Form_pg_publication) GETSTRUCT(oldtup);
+
+		if (SearchSysCacheExists1(PUBLICATIONNAME, CStringGetDatum(new_name)))
+			report_name_conflict(classId, new_name);
+
+		/*
+		 * Unlike ALTER PUBLICATION ADD/SET/DROP commands, renaming a
+		 * publication does not impact the publication status of tables.
+		 * Therefore, we do not need to invalidate relcache to rebuild the
+		 * rd_pubdesc. Instead, we invalidate only the cache within the output
+		 * plugin to ensure its cache is rebuilt.
+		 */
+		InvalidatePubRelSyncCaches(pub->oid, pub->puballtables);
+	}
 	else if (nameCacheId >= 0)
 	{
 		if (OidIsValid(namespaceId))
@@ -400,9 +416,6 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TYPE:
 			return RenameType(stmt);
 
-		case OBJECT_PUBLICATION:
-			return RenamePublication(stmt->subname, stmt->newname);
-
 		case OBJECT_AGGREGATE:
 		case OBJECT_COLLATION:
 		case OBJECT_CONVERSION:
@@ -420,6 +433,7 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TSDICTIONARY:
 		case OBJECT_TSPARSER:
 		case OBJECT_TSTEMPLATE:
+		case OBJECT_PUBLICATION:
 		case OBJECT_SUBSCRIPTION:
 			{
 				ObjectAddress address;
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index b1e38ed7822..e18d2d06862 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -492,71 +492,15 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
 }
 
 /*
- * Execute ALTER PUBLICATION RENAME
+ * Invalidate entries in the RelationSyncCache for relations included in the
+ * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
+ *
+ * If 'puballtables' is true, invalidate all cache entries.
  */
-ObjectAddress
-RenamePublication(const char *oldname, const char *newname)
+void
+InvalidatePubRelSyncCaches(Oid pubid, bool puballtables)
 {
-	Relation			rel;
-	HeapTuple			tup;
-	ObjectAddress		address;
-	Form_pg_publication	pubform;
-	bool				replaces[Natts_pg_publication];
-	bool				nulls[Natts_pg_publication];
-	Datum				values[Natts_pg_publication];
-
-	rel = table_open(PublicationRelationId, RowExclusiveLock);
-
-	tup = SearchSysCacheCopy1(PUBLICATIONNAME,
-							  CStringGetDatum(oldname));
-
-	if (!HeapTupleIsValid(tup))
-		ereport(ERROR,
-				(errcode(ERRCODE_UNDEFINED_OBJECT),
-				 errmsg("publication \"%s\" does not exist",
-						oldname)));
-
-	pubform = (Form_pg_publication) GETSTRUCT(tup);
-
-	/* must be owner */
-	if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
-		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
-					   NameStr(pubform->pubname));
-
-	/* Check if name is used */
-	if (OidIsValid(GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
-								   CStringGetDatum(newname))))
-		ereport(ERROR,
-				(errcode(ERRCODE_DUPLICATE_OBJECT),
-				 errmsg("publication \"%s\" already exists",
-				 newname)));
-
-	/* Everything ok, form a new tuple. */
-	memset(values, 0, sizeof(values));
-	memset(nulls, false, sizeof(nulls));
-	memset(replaces, false, sizeof(replaces));
-
-	/* Only update the pubname */
-	values[Anum_pg_publication_pubname - 1] =
-		DirectFunctionCall1(namein, CStringGetDatum(newname));
-	replaces[Anum_pg_publication_pubname - 1] = true;
-
-	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
-							replaces);
-
-	/* Update the catalog. */
-	CatalogTupleUpdate(rel, &tup->t_self, tup);
-
-	/*
-	 * Invalidate caches on the logical decoding output plugin.
-	 *
-	 * Apart from the ALTER PUBLICATION ADD/SET/DROP commands, we do not have
-	 * to invalidate relcaches. They are required to refresh the publication's
-	 * descriptor. The publication's name is not recorded in the attribute, so
-	 * the RENAME commands do not require a refresh. Instead, we only
-	 * invalidate a cache on the output plugin to rebuild its cache.
-	 */
-	if (pubform->puballtables)
+	if (puballtables)
 	{
 		CacheInvalidateRelSyncAll();
 	}
@@ -571,23 +515,14 @@ RenamePublication(const char *oldname, const char *newname)
 		 * tables as a target. However, WAL records for TRUNCATE specify
 		 * both a root and its leaves.
 		 */
-		relids = GetPublicationRelations(pubform->oid,
-										 PUBLICATION_PART_ALL);
-		schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
+		relids = GetPublicationRelations(pubid, PUBLICATION_PART_ALL);
+		schemarelids = GetAllSchemaPublicationRelations(pubid,
 														PUBLICATION_PART_ALL);
 
 		relids = list_concat_unique_oid(relids, schemarelids);
 
 		InvalidateRelSyncCaches(relids);
 	}
-
-	ObjectAddressSet(address, PublicationRelationId, pubform->oid);
-
-	heap_freetuple(tup);
-
-	table_close(rel, RowExclusiveLock);
-
-	return address;
 }
 
 /* check_functions_in_node callback */
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 0fb4041f15b..271ae26cbaf 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9541,7 +9541,7 @@ RenameStmt: ALTER AGGREGATE aggregate_with_argtypes RENAME TO name
 					RenameStmt *n = makeNode(RenameStmt);
 
 					n->renameType = OBJECT_PUBLICATION;
-					n->subname = $3;
+					n->object = (Node *) makeString($3);
 					n->newname = $6;
 					n->missing_ok = false;
 					$$ = (Node *) n;
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index f130ea3090a..ca55d2fe315 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -38,7 +38,8 @@ extern bool pub_contains_invalid_column(Oid pubid, Relation relation,
 										char pubgencols_type,
 										bool *invalid_column_list,
 										bool *invalid_gen_col);
-extern ObjectAddress RenamePublication(const char *oldname, const char *newname);
+extern void InvalidatePubRelSyncCaches(Oid pubid, bool puballtables);
+
 extern void InvalidateRelSyncCaches(List *relids);
 
 #endif							/* PUBLICATIONCMDS_H */
-- 
2.30.0.windows.2

#20Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Zhijie Hou (Fujitsu) (#19)
2 attachment(s)
RE: Selectively invalidate caches in pgoutput module

Dear Amit, Hou,

Thanks for giving comments!

> For patch 0002, I think the implementation could be improved. The
> current patch introduces a new function, RenamePublication, to replace the
> existing generic approach in ExecRenameStmt->AlterObjectRename_internal.
> However, this creates inconsistency because the original code uses
> AccessExclusiveLock for publication renaming, making concurrent renaming
> impossible. The current patch seems to overlook this aspect.

Oh, I missed the point, thanks.

> Additionally, introducing a new function results in code duplication, which can
> be avoided. After further consideration, handling the publication rename
> separately seems unnecessary, given it requires only sending a few extra
> invalidation messages. Therefore, I suggest reusing the generic handling and
> simply extending AlterObjectRename_internal to include the additional
> invalidation messages.
>
> I've attached a diff with the proposed changes for 0002.

Hmm, that's possible. Previously I introduced a new function to preserve the
existing code as much as possible, but this introduced some duplication. Your
approach, which extends the common function, looks nice, apart from two points:

1. Relsync cache invalidations should be done after the catalog update, but
the proposed diff did them before it. I preferred to add a new if-statement for
post-catalog-update tasks.
2. Also, the common check for name conflicts can be reused.

The attached patch addresses the comments from you and Amit [1]. What's new:

* Skip adding a new relsync message when a message for InvalidOid already exists.
Per comment from Amit [1].
* Update some code comments. Some of them were pointed out in [1].
* Stop using RenamePublication() and extend the common function instead.
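
The first item above can be sketched in isolation. The snippet below is a toy model under stated assumptions, not the code from the patch: PendingRelSyncMsgs and pending_add are hypothetical names, and a fixed-size array stands in for the real per-command message group. It only illustrates the rule that a message is skipped when the same relation, or a catch-all InvalidOid message, is already queued.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)
#define MAX_PENDING 64          /* hypothetical capacity for this toy model */

/* Toy stand-in for the relsync messages queued by the current command. */
typedef struct PendingRelSyncMsgs
{
    Oid     relids[MAX_PENDING];
    size_t  count;
} PendingRelSyncMsgs;

/*
 * Queue an invalidation for relid.  Returns true if a message was added,
 * false if an already-queued message covers it: either the same relid or
 * a catch-all message carrying InvalidOid.
 */
static bool
pending_add(PendingRelSyncMsgs *msgs, Oid relid)
{
    for (size_t i = 0; i < msgs->count; i++)
    {
        if (msgs->relids[i] == relid || msgs->relids[i] == InvalidOid)
            return false;
    }

    assert(msgs->count < MAX_PENDING);
    msgs->relids[msgs->count++] = relid;
    return true;
}
```

Note that the check only looks backwards: per-relation messages queued before the catch-all one stay in the list, which is harmless because the catch-all message flushes everything anyway.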

Best regards,
Hayato Kuroda
FUJITSU LIMITED

Attachments:

v10-0001-Introduce-a-new-invalidation-message-to-invalida.patch (application/octet-stream)
From 68caed94ad297cdc8a82d9926c39db5ff96ed876 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 4 Mar 2025 16:51:19 +0900
Subject: [PATCH v10 1/2] Introduce a new invalidation message to invalidate
 caches in output plugins

A new invalidation message is generated when ALTER PUBLICATION RENAME TO
is executed. The primary use case of the message is to invalidate caches on
the logical decoding output plugin. Plugins can register callback functions for
the message via CacheRegisterRelSyncCallback(), and the function can invalidate
the cache for the specified relation.

A new invalidation message is transactional, and decoder processes should
recognize the message and invalidate specified caches. Thus, the messages are
stored in InvalMessageArray and serialized at the end of the transaction.
---
 src/backend/access/rmgrdesc/standbydesc.c |   2 +
 src/backend/utils/cache/inval.c           | 125 ++++++++++++++++++++++
 src/include/pg_config_manual.h            |   8 +-
 src/include/storage/sinval.h              |  23 ++--
 src/include/utils/inval.h                 |  10 ++
 5 files changed, 158 insertions(+), 10 deletions(-)

diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index d849f8e54b..81eff5f31c 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -132,6 +132,8 @@ standby_desc_invalidations(StringInfo buf,
 			appendStringInfo(buf, " relmap db %u", msg->rm.dbId);
 		else if (msg->id == SHAREDINVALSNAPSHOT_ID)
 			appendStringInfo(buf, " snapshot %u", msg->sn.relId);
+		else if (msg->id == SHAREDINVALRELSYNC_ID)
+			appendStringInfo(buf, " relsync %u", msg->rs.relid);
 		else
 			appendStringInfo(buf, " unrecognized id %d", msg->id);
 	}
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 700ccb6df9..4eb6772073 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -271,6 +271,7 @@ int			debug_discard_caches = 0;
 
 #define MAX_SYSCACHE_CALLBACKS 64
 #define MAX_RELCACHE_CALLBACKS 10
+#define MAX_RELSYNC_CALLBACKS 10
 
 static struct SYSCACHECALLBACK
 {
@@ -292,6 +293,15 @@ static struct RELCACHECALLBACK
 
 static int	relcache_callback_count = 0;
 
+static struct RELSYNCCALLBACK
+{
+	RelSyncCallbackFunction function;
+	Datum		arg;
+}			relsync_callback_list[MAX_RELSYNC_CALLBACKS];
+
+static int	relsync_callback_count = 0;
+
+
 /* ----------------------------------------------------------------
  *				Invalidation subgroup support functions
  * ----------------------------------------------------------------
@@ -484,6 +494,36 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
 	AddInvalidationMessage(group, RelCacheMsgs, &msg);
 }
 
+/*
+ * Add a relsync inval entry
+ *
+ * We put these into the relcache subgroup for simplicity. This message is the
+ * same as AddRelcacheInvalidationMessage() except that it is for
+ * RelationSyncCache maintained by decoding plugin pgoutput.
+ */
+static void
+AddRelsyncInvalidationMessage(InvalidationMsgsGroup *group,
+							  Oid dbId, Oid relId)
+{
+	SharedInvalidationMessage msg;
+
+	/* Don't add a duplicate item. */
+	ProcessMessageSubGroup(group, RelCacheMsgs,
+						   if (msg->rc.id == SHAREDINVALRELSYNC_ID &&
+							   (msg->rc.relId == relId ||
+								msg->rc.relId == InvalidOid))
+						   return);
+
+	/* OK, add the item */
+	msg.rc.id = SHAREDINVALRELSYNC_ID;
+	msg.rc.dbId = dbId;
+	msg.rc.relId = relId;
+	/* check AddCatcacheInvalidationMessage() for an explanation */
+	VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+	AddInvalidationMessage(group, RelCacheMsgs, &msg);
+}
+
 /*
  * Add a snapshot inval entry
  *
@@ -611,6 +651,17 @@ RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
 		info->RelcacheInitFileInval = true;
 }
 
+/*
+ * RegisterRelsyncInvalidation
+ *
+ * As above, but register a relsynccache invalidation event.
+ */
+static void
+RegisterRelsyncInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
+{
+	AddRelsyncInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, relId);
+}
+
 /*
  * RegisterSnapshotInvalidation
  *
@@ -751,6 +802,13 @@ InvalidateSystemCachesExtended(bool debug_discard)
 
 		ccitem->function(ccitem->arg, InvalidOid);
 	}
+
+	for (i = 0; i < relsync_callback_count; i++)
+	{
+		struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+		ccitem->function(ccitem->arg, InvalidOid);
+	}
 }
 
 /*
@@ -832,6 +890,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
 		else if (msg->sn.dbId == MyDatabaseId)
 			InvalidateCatalogSnapshot();
 	}
+	else if (msg->id == SHAREDINVALRELSYNC_ID)
+	{
+		/* We only care about our own database */
+		if (msg->rs.dbId == MyDatabaseId)
+			CallRelSyncCallbacks(msg->rs.relid);
+	}
 	else
 		elog(FATAL, "unrecognized SI message ID: %d", msg->id);
 }
@@ -1621,6 +1685,32 @@ CacheInvalidateRelcacheByRelid(Oid relid)
 	ReleaseSysCache(tup);
 }
 
+/*
+ * CacheInvalidateRelSync
+ *		Register invalidation of the cache in logical decoding output plugin
+ *		for a database.
+ *
+ * This type of invalidation message is used for the specific purpose of output
+ * plugins. Processes which do not decode WALs would do nothing even when it
+ * receives the message.
+ */
+void
+CacheInvalidateRelSync(Oid relid)
+{
+	RegisterRelsyncInvalidation(PrepareInvalidationState(),
+								MyDatabaseId, relid);
+}
+
+/*
+ * CacheInvalidateRelSyncAll
+ *		Register invalidation of the whole cache in logical decoding output
+ *		plugin.
+ */
+void
+CacheInvalidateRelSyncAll(void)
+{
+	CacheInvalidateRelSync(InvalidOid);
+}
 
 /*
  * CacheInvalidateSmgr
@@ -1763,6 +1853,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 	++relcache_callback_count;
 }
 
+/*
+ * CacheRegisterRelSyncCallback
+ *		Register the specified function to be called for all future
+ *		relsynccache invalidation events.
+ *
+ * This function is intended to be called from the logical decoding output
+ * plugins.
+ */
+void
+CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+							 Datum arg)
+{
+	if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS)
+		elog(FATAL, "out of relsync_callback_list slots");
+
+	relsync_callback_list[relsync_callback_count].function = func;
+	relsync_callback_list[relsync_callback_count].arg = arg;
+
+	++relsync_callback_count;
+}
+
 /*
  * CallSyscacheCallbacks
  *
@@ -1788,6 +1899,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
 	}
 }
 
+/*
+ * CallRelSyncCallbacks
+ */
+void
+CallRelSyncCallbacks(Oid relid)
+{
+	for (int i = 0; i < relsync_callback_count; i++)
+	{
+		struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+		ccitem->function(ccitem->arg, relid);
+	}
+}
+
 /*
  * LogLogicalInvalidations
  *
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index 449e50bd78..125d3eb5ff 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -282,10 +282,10 @@
 
 /*
  * For cache-invalidation debugging, define DISCARD_CACHES_ENABLED to enable
- * use of the debug_discard_caches GUC to aggressively flush syscache/relcache
- * entries whenever it's possible to deliver invalidations.  See
- * AcceptInvalidationMessages() in src/backend/utils/cache/inval.c for
- * details.
+ * use of the debug_discard_caches GUC to aggressively flush
+ * syscache/relcache/relsynccache entries whenever it's possible to deliver
+ * invalidations.  See AcceptInvalidationMessages() in
+ * src/backend/utils/cache/inval.c for details.
  *
  * USE_ASSERT_CHECKING builds default to enabling this.  It's possible to use
  * DISCARD_CACHES_ENABLED without a cassert build and the implied
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index 2463c0f9fa..f168b5fbf8 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -27,6 +27,7 @@
  *	* invalidate an smgr cache entry for a specific physical relation
  *	* invalidate the mapped-relation mapping for a given database
  *	* invalidate any saved snapshot that might be used to scan a given relation
+ *	* invalidate a RelationSyncCache entry for a specific relation
  * More types could be added if needed.  The message type is identified by
  * the first "int8" field of the message struct.  Zero or positive means a
  * specific-catcache inval message (and also serves as the catcache ID field).
@@ -46,12 +47,12 @@
  * catcache inval messages must be generated for each of its caches, since
  * the hash keys will generally be different.
  *
- * Catcache, relcache, and snapshot invalidations are transactional, and so
- * are sent to other backends upon commit.  Internally to the generating
- * backend, they are also processed at CommandCounterIncrement so that later
- * commands in the same transaction see the new state.  The generating backend
- * also has to process them at abort, to flush out any cache state it's loaded
- * from no-longer-valid entries.
+ * Catcache, relcache, relsynccache, and snapshot invalidations are
+ * transactional, and so are sent to other backends upon commit.  Internally
+ * to the generating backend, they are also processed at
+ * CommandCounterIncrement so that later commands in the same transaction see
+ * the new state.  The generating backend also has to process them at abort,
+ * to flush out any cache state it's loaded from no-longer-valid entries.
  *
  * smgr and relation mapping invalidations are non-transactional: they are
  * sent immediately when the underlying file change is made.
@@ -110,6 +111,15 @@ typedef struct
 	Oid			relId;			/* relation ID */
 } SharedInvalSnapshotMsg;
 
+#define SHAREDINVALRELSYNC_ID	(-6)
+
+typedef struct
+{
+	int8		id;				/* type field --- must be first */
+	Oid			dbId;			/* database ID */
+	Oid			relid;			/* relation ID, or 0 if whole RelationSyncCache */
+} SharedInvalRelSyncMsg;
+
 typedef union
 {
 	int8		id;				/* type field --- must be first */
@@ -119,6 +129,7 @@ typedef union
 	SharedInvalSmgrMsg sm;
 	SharedInvalRelmapMsg rm;
 	SharedInvalSnapshotMsg sn;
+	SharedInvalRelSyncMsg rs;
 } SharedInvalidationMessage;
 
 
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 40658ba2ff..9b871caef6 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -22,6 +22,7 @@ extern PGDLLIMPORT int debug_discard_caches;
 
 typedef void (*SyscacheCallbackFunction) (Datum arg, int cacheid, uint32 hashvalue);
 typedef void (*RelcacheCallbackFunction) (Datum arg, Oid relid);
+typedef void (*RelSyncCallbackFunction) (Datum arg, Oid relid);
 
 
 extern void AcceptInvalidationMessages(void);
@@ -55,6 +56,10 @@ extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple);
 
 extern void CacheInvalidateRelcacheByRelid(Oid relid);
 
+extern void CacheInvalidateRelSync(Oid relid);
+
+extern void CacheInvalidateRelSyncAll(void);
+
 extern void CacheInvalidateSmgr(RelFileLocatorBackend rlocator);
 
 extern void CacheInvalidateRelmap(Oid databaseId);
@@ -66,8 +71,13 @@ extern void CacheRegisterSyscacheCallback(int cacheid,
 extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 										  Datum arg);
 
+extern void CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+										 Datum arg);
+
 extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue);
 
+extern void CallRelSyncCallbacks(Oid relid);
+
 extern void InvalidateSystemCaches(void);
 extern void InvalidateSystemCachesExtended(bool debug_discard);
 
-- 
2.43.5

v10-0002-Invalidate-Relcaches-while-ALTER-PUBLICATION-REN.patch (application/octet-stream)
From f1ecaa5e9c08886f0eab2c038e13d25a195e7d33 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 3 Mar 2025 19:41:04 +0900
Subject: [PATCH v10 2/2] Invalidate Relcaches while ALTER PUBLICATION RENAME
 TO

---
 src/backend/commands/alter.c                | 16 +++++++
 src/backend/commands/publicationcmds.c      | 49 +++++++++++++++++++++
 src/backend/replication/pgoutput/pgoutput.c |  8 +---
 src/include/commands/publicationcmds.h      |  2 +
 4 files changed, 69 insertions(+), 6 deletions(-)

diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 78c1d4e1b8..2187d55a2e 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -338,6 +338,22 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
 
 	InvokeObjectPostAlterHook(classId, objectId, 0);
 
+	/* Do post catalog-update tasks */
+	if (classId == PublicationRelationId)
+	{
+		Form_pg_publication pub = (Form_pg_publication) GETSTRUCT(oldtup);
+
+		/*
+		 * Invalidate relsynccaches.
+		 *
+		 * Unlike ALTER PUBLICATION ADD/SET/DROP commands, renaming a
+		 * publication does not impact the publication status of tables.
+		 * Therefore, we do not need to invalidate relcache to rebuild the
+		 * rd_pubdesc. Instead, we invalidate only the relsyncache.
+		 */
+		InvalidatePubRelSyncCaches(pub->oid, pub->puballtables);
+	}
+
 	/* Release memory */
 	pfree(values);
 	pfree(nulls);
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 150a768d16..2ae0f807f2 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -491,6 +491,43 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
 	return *invalid_column_list || *invalid_gen_col;
 }
 
+/*
+ * Invalidate entries in the RelationSyncCache for relations included in the
+ * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
+ *
+ * If 'puballtables' is true, invalidate all cache entries.
+ */
+void
+InvalidatePubRelSyncCaches(Oid pubid, bool puballtables)
+{
+	if (puballtables)
+	{
+		CacheInvalidateRelSyncAll();
+	}
+	else
+	{
+		List	   *relids = NIL;
+		List	   *schemarelids = NIL;
+
+		/*
+		 * For partitioned tables, we must invalidate all partitions and
+		 * itself. WAL records for INSERT/UPDATE/DELETE specify leaf
+		 * tables as a target. However, WAL records for TRUNCATE specify
+		 * both a root and its leaves.
+		 */
+		relids = GetPublicationRelations(pubid,
+										 PUBLICATION_PART_ALL);
+		schemarelids = GetAllSchemaPublicationRelations(pubid,
+														PUBLICATION_PART_ALL);
+
+		relids = list_concat_unique_oid(relids, schemarelids);
+
+		InvalidateRelSyncCaches(relids);
+	}
+
+	return;
+}
+
 /* check_functions_in_node callback */
 static bool
 contain_mutable_or_user_functions_checker(Oid func_id, void *context)
@@ -2096,3 +2133,15 @@ defGetGeneratedColsOption(DefElem *def)
 
 	return PUBLISH_GENCOLS_NONE;	/* keep compiler quiet */
 }
+
+/*
+ * Invalidate the relsynccache entries for the given relations.
+ */
+void
+InvalidateRelSyncCaches(List *relids)
+{
+	ListCell   *lc;
+
+	foreach(lc, relids)
+		CacheInvalidateRelSync(lfirst_oid(lc));
+}
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9063af6e1d..ed806c5430 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -531,6 +531,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 			CacheRegisterSyscacheCallback(PUBLICATIONOID,
 										  publication_invalidation_cb,
 										  (Datum) 0);
+			CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
+										 (Datum) 0);
 			publication_callback_registered = true;
 		}
 
@@ -1789,12 +1791,6 @@ static void
 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
 	publications_valid = false;
-
-	/*
-	 * Also invalidate per-relation cache so that next time the filtering info
-	 * is checked it will be updated with the new publication settings.
-	 */
-	rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
 }
 
 /*
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index e11a942ea0..8eb1795d47 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -38,5 +38,7 @@ extern bool pub_contains_invalid_column(Oid pubid, Relation relation,
 										char pubgencols_type,
 										bool *invalid_column_list,
 										bool *invalid_gen_col);
+extern void InvalidatePubRelSyncCaches(Oid pubid, bool puballtables);
+extern void InvalidateRelSyncCaches(List *relids);
 
 #endif							/* PUBLICATIONCMDS_H */
-- 
2.43.5

#21Amit Kapila
amit.kapila16@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#20)
1 attachment(s)
Re: Selectively invalidate caches in pgoutput module

On Tue, Mar 11, 2025 at 5:47 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

> Attached patch address comments by you and Amit [1]. What's new;

Thanks, the patch looks mostly good to me. I have made a few cosmetic
changes in the attached version and combined both patches. The existing
ALTER PUBLICATION ... RENAME tests don't test invalidations arising
from that command. As this patch changes that code path, it would be
good to add a few tests for it. We can add one for individual
relations and another for an ALL TABLES publication.
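
As a rough illustration of the two cases such tests would need to distinguish, the following self-contained C sketch models the plugin-side effect of a relsync invalidation: a specific relid marks one cache entry stale, while InvalidOid marks all of them stale. ToyRelSyncEntry, toy_relsync_cb and count_valid are hypothetical names for this model only; it does not use the real pgoutput data structures.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)

/* Toy per-relation cache entry; "valid" models "no rebuild needed". */
typedef struct ToyRelSyncEntry
{
    Oid     relid;
    bool    valid;
} ToyRelSyncEntry;

/*
 * Model of a relsync callback: a specific relid invalidates only the
 * matching entry, InvalidOid invalidates every entry.
 */
static void
toy_relsync_cb(ToyRelSyncEntry *cache, size_t n, Oid relid)
{
    for (size_t i = 0; i < n; i++)
    {
        if (relid == InvalidOid || cache[i].relid == relid)
            cache[i].valid = false;
    }
}

/* Count entries that would not need to be rebuilt on next use. */
static size_t
count_valid(const ToyRelSyncEntry *cache, size_t n)
{
    size_t  valid = 0;

    for (size_t i = 0; i < n; i++)
    {
        if (cache[i].valid)
            valid++;
    }
    return valid;
}
```

A test for an individual-relation publication would expect only the member's entry to be rebuilt after the rename, whereas a test for an ALL TABLES publication would expect every entry to be rebuilt.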

--
With Regards,
Amit Kapila.

Attachments:

v11-0001-Avoid-invalidating-all-RelationSyncCache-entries.patch (application/octet-stream)
From 96a0bf20db6757ee09fca9a52a2b5020aca244cb Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Wed, 12 Mar 2025 14:41:26 +0530
Subject: [PATCH v11] Avoid invalidating all RelationSyncCache entries on
 publication rename.

On Publication rename, we need to only invalidate the RelationSyncCache
entries corresponding to relations that are part of the publication being
renamed.

As part of this patch, we introduce a new invalidation message to
invalidate the cache maintained by the logical decoding output plugin. We
can't use existing relcache invalidation for this purpose, as that would
unnecessarily cause relcache invalidations in other backends.

Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Author: Shlok Kyal <shlok.kyal.oss@gmail.com>
Reviewed-by: Hou Zhijie <houzj.fnst@fujitsu.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/OSCPR01MB14966C09AA201EFFA706576A7F5C92@OSCPR01MB14966.jpnprd01.prod.outlook.com
---
 src/backend/access/rmgrdesc/standbydesc.c   |   2 +
 src/backend/commands/alter.c                |  16 +++
 src/backend/commands/publicationcmds.c      |  39 ++++++
 src/backend/replication/pgoutput/pgoutput.c |   8 +-
 src/backend/utils/cache/inval.c             | 125 ++++++++++++++++++++
 src/include/commands/publicationcmds.h      |   1 +
 src/include/pg_config_manual.h              |   8 +-
 src/include/storage/sinval.h                |  23 +++-
 src/include/utils/inval.h                   |  10 ++
 9 files changed, 216 insertions(+), 16 deletions(-)

diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index d849f8e54ba..81eff5f31c4 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -132,6 +132,8 @@ standby_desc_invalidations(StringInfo buf,
 			appendStringInfo(buf, " relmap db %u", msg->rm.dbId);
 		else if (msg->id == SHAREDINVALSNAPSHOT_ID)
 			appendStringInfo(buf, " snapshot %u", msg->sn.relId);
+		else if (msg->id == SHAREDINVALRELSYNC_ID)
+			appendStringInfo(buf, " relsync %u", msg->rs.relid);
 		else
 			appendStringInfo(buf, " unrecognized id %d", msg->id);
 	}
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 78c1d4e1b84..3b7114b8594 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -338,6 +338,22 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
 
 	InvokeObjectPostAlterHook(classId, objectId, 0);
 
+	/* Do post catalog-update tasks */
+	if (classId == PublicationRelationId)
+	{
+		Form_pg_publication pub = (Form_pg_publication) GETSTRUCT(oldtup);
+
+		/*
+		 * Invalidate relsynccache entries.
+		 *
+		 * Unlike ALTER PUBLICATION ADD/SET/DROP commands, renaming a
+		 * publication does not impact the publication status of tables. So, we
+		 * don't need to invalidate relcache to rebuild the rd_pubdesc.
+		 * Instead, we invalidate only the relsyncache.
+		 */
+		InvalidatePubRelSyncCache(pub->oid, pub->puballtables);
+	}
+
 	/* Release memory */
 	pfree(values);
 	pfree(nulls);
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 150a768d16f..94a5a2b6c76 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -491,6 +491,45 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
 	return *invalid_column_list || *invalid_gen_col;
 }
 
+/*
+ * Invalidate entries in the RelationSyncCache for relations included in the
+ * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
+ *
+ * If 'puballtables' is true, invalidate all cache entries.
+ */
+void
+InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
+{
+	if (puballtables)
+	{
+		CacheInvalidateRelSyncAll();
+	}
+	else
+	{
+		List	   *relids = NIL;
+		List	   *schemarelids = NIL;
+
+		/*
+		 * For partitioned tables, we must invalidate all partitions and
+		 * itself. WAL records for INSERT/UPDATE/DELETE specify leaf
+		 * tables as a target. However, WAL records for TRUNCATE specify
+		 * both a root and its leaves.
+		 */
+		relids = GetPublicationRelations(pubid,
+										 PUBLICATION_PART_ALL);
+		schemarelids = GetAllSchemaPublicationRelations(pubid,
+														PUBLICATION_PART_ALL);
+
+		relids = list_concat_unique_oid(relids, schemarelids);
+
+		/* Invalidate the relsyncache */
+		foreach_oid(relid, relids)
+			CacheInvalidateRelSync(relid);
+	}
+
+	return;
+}
+
 /* check_functions_in_node callback */
 static bool
 contain_mutable_or_user_functions_checker(Oid func_id, void *context)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9063af6e1df..ed806c54300 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -531,6 +531,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 			CacheRegisterSyscacheCallback(PUBLICATIONOID,
 										  publication_invalidation_cb,
 										  (Datum) 0);
+			CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
+										 (Datum) 0);
 			publication_callback_registered = true;
 		}
 
@@ -1789,12 +1791,6 @@ static void
 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
 	publications_valid = false;
-
-	/*
-	 * Also invalidate per-relation cache so that next time the filtering info
-	 * is checked it will be updated with the new publication settings.
-	 */
-	rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
 }
 
 /*
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 700ccb6df9b..4eb67720737 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -271,6 +271,7 @@ int			debug_discard_caches = 0;
 
 #define MAX_SYSCACHE_CALLBACKS 64
 #define MAX_RELCACHE_CALLBACKS 10
+#define MAX_RELSYNC_CALLBACKS 10
 
 static struct SYSCACHECALLBACK
 {
@@ -292,6 +293,15 @@ static struct RELCACHECALLBACK
 
 static int	relcache_callback_count = 0;
 
+static struct RELSYNCCALLBACK
+{
+	RelSyncCallbackFunction function;
+	Datum		arg;
+}			relsync_callback_list[MAX_RELSYNC_CALLBACKS];
+
+static int	relsync_callback_count = 0;
+
+
 /* ----------------------------------------------------------------
  *				Invalidation subgroup support functions
  * ----------------------------------------------------------------
@@ -484,6 +494,36 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
 	AddInvalidationMessage(group, RelCacheMsgs, &msg);
 }
 
+/*
+ * Add a relsync inval entry
+ *
+ * We put these into the relcache subgroup for simplicity. This message is the
+ * same as AddRelcacheInvalidationMessage() except that it is for
+ * RelationSyncCache maintained by decoding plugin pgoutput.
+ */
+static void
+AddRelsyncInvalidationMessage(InvalidationMsgsGroup *group,
+							  Oid dbId, Oid relId)
+{
+	SharedInvalidationMessage msg;
+
+	/* Don't add a duplicate item. */
+	ProcessMessageSubGroup(group, RelCacheMsgs,
+						   if (msg->rc.id == SHAREDINVALRELSYNC_ID &&
+							   (msg->rc.relId == relId ||
+								msg->rc.relId == InvalidOid))
+						   return);
+
+	/* OK, add the item */
+	msg.rc.id = SHAREDINVALRELSYNC_ID;
+	msg.rc.dbId = dbId;
+	msg.rc.relId = relId;
+	/* check AddCatcacheInvalidationMessage() for an explanation */
+	VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+	AddInvalidationMessage(group, RelCacheMsgs, &msg);
+}
+
 /*
  * Add a snapshot inval entry
  *
@@ -611,6 +651,17 @@ RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
 		info->RelcacheInitFileInval = true;
 }
 
+/*
+ * RegisterRelsyncInvalidation
+ *
+ * As above, but register a relsynccache invalidation event.
+ */
+static void
+RegisterRelsyncInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
+{
+	AddRelsyncInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, relId);
+}
+
 /*
  * RegisterSnapshotInvalidation
  *
@@ -751,6 +802,13 @@ InvalidateSystemCachesExtended(bool debug_discard)
 
 		ccitem->function(ccitem->arg, InvalidOid);
 	}
+
+	for (i = 0; i < relsync_callback_count; i++)
+	{
+		struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+		ccitem->function(ccitem->arg, InvalidOid);
+	}
 }
 
 /*
@@ -832,6 +890,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
 		else if (msg->sn.dbId == MyDatabaseId)
 			InvalidateCatalogSnapshot();
 	}
+	else if (msg->id == SHAREDINVALRELSYNC_ID)
+	{
+		/* We only care about our own database */
+		if (msg->rs.dbId == MyDatabaseId)
+			CallRelSyncCallbacks(msg->rs.relid);
+	}
 	else
 		elog(FATAL, "unrecognized SI message ID: %d", msg->id);
 }
@@ -1621,6 +1685,32 @@ CacheInvalidateRelcacheByRelid(Oid relid)
 	ReleaseSysCache(tup);
 }
 
+/*
+ * CacheInvalidateRelSync
+ *		Register invalidation of the cache in logical decoding output plugin
+ *		for a database.
+ *
+ * This type of invalidation message is used for the specific purpose of output
+ * plugins. Processes which do not decode WAL would do nothing even when they
+ * receive the message.
+ */
+void
+CacheInvalidateRelSync(Oid relid)
+{
+	RegisterRelsyncInvalidation(PrepareInvalidationState(),
+								MyDatabaseId, relid);
+}
+
+/*
+ * CacheInvalidateRelSyncAll
+ *		Register invalidation of the whole cache in logical decoding output
+ *		plugin.
+ */
+void
+CacheInvalidateRelSyncAll(void)
+{
+	CacheInvalidateRelSync(InvalidOid);
+}
 
 /*
  * CacheInvalidateSmgr
@@ -1763,6 +1853,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 	++relcache_callback_count;
 }
 
+/*
+ * CacheRegisterRelSyncCallback
+ *		Register the specified function to be called for all future
+ *		relsynccache invalidation events.
+ *
+ * This function is intended to be called from the logical decoding output
+ * plugins.
+ */
+void
+CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+							 Datum arg)
+{
+	if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS)
+		elog(FATAL, "out of relsync_callback_list slots");
+
+	relsync_callback_list[relsync_callback_count].function = func;
+	relsync_callback_list[relsync_callback_count].arg = arg;
+
+	++relsync_callback_count;
+}
+
 /*
  * CallSyscacheCallbacks
  *
@@ -1788,6 +1899,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
 	}
 }
 
+/*
+ * CallRelSyncCallbacks
+ */
+void
+CallRelSyncCallbacks(Oid relid)
+{
+	for (int i = 0; i < relsync_callback_count; i++)
+	{
+		struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+		ccitem->function(ccitem->arg, relid);
+	}
+}
+
 /*
  * LogLogicalInvalidations
  *
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index e11a942ea0f..e41df6db038 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -38,5 +38,6 @@ extern bool pub_contains_invalid_column(Oid pubid, Relation relation,
 										char pubgencols_type,
 										bool *invalid_column_list,
 										bool *invalid_gen_col);
+extern void InvalidatePubRelSyncCache(Oid pubid, bool puballtables);
 
 #endif							/* PUBLICATIONCMDS_H */
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index 449e50bd78c..125d3eb5fff 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -282,10 +282,10 @@
 
 /*
  * For cache-invalidation debugging, define DISCARD_CACHES_ENABLED to enable
- * use of the debug_discard_caches GUC to aggressively flush syscache/relcache
- * entries whenever it's possible to deliver invalidations.  See
- * AcceptInvalidationMessages() in src/backend/utils/cache/inval.c for
- * details.
+ * use of the debug_discard_caches GUC to aggressively flush
+ * syscache/relcache/relsynccache entries whenever it's possible to deliver
+ * invalidations.  See AcceptInvalidationMessages() in
+ * src/backend/utils/cache/inval.c for details.
  *
  * USE_ASSERT_CHECKING builds default to enabling this.  It's possible to use
  * DISCARD_CACHES_ENABLED without a cassert build and the implied
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index 2463c0f9fac..f168b5fbf8c 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -27,6 +27,7 @@
  *	* invalidate an smgr cache entry for a specific physical relation
  *	* invalidate the mapped-relation mapping for a given database
  *	* invalidate any saved snapshot that might be used to scan a given relation
+ *	* invalidate a RelationSyncCache entry for a specific relation
  * More types could be added if needed.  The message type is identified by
  * the first "int8" field of the message struct.  Zero or positive means a
  * specific-catcache inval message (and also serves as the catcache ID field).
@@ -46,12 +47,12 @@
  * catcache inval messages must be generated for each of its caches, since
  * the hash keys will generally be different.
  *
- * Catcache, relcache, and snapshot invalidations are transactional, and so
- * are sent to other backends upon commit.  Internally to the generating
- * backend, they are also processed at CommandCounterIncrement so that later
- * commands in the same transaction see the new state.  The generating backend
- * also has to process them at abort, to flush out any cache state it's loaded
- * from no-longer-valid entries.
+ * Catcache, relcache, relsynccache, and snapshot invalidations are
+ * transactional, and so are sent to other backends upon commit.  Internally
+ * to the generating backend, they are also processed at
+ * CommandCounterIncrement so that later commands in the same transaction see
+ * the new state.  The generating backend also has to process them at abort,
+ * to flush out any cache state it's loaded from no-longer-valid entries.
  *
  * smgr and relation mapping invalidations are non-transactional: they are
  * sent immediately when the underlying file change is made.
@@ -110,6 +111,15 @@ typedef struct
 	Oid			relId;			/* relation ID */
 } SharedInvalSnapshotMsg;
 
+#define SHAREDINVALRELSYNC_ID	(-6)
+
+typedef struct
+{
+	int8		id;				/* type field --- must be first */
+	Oid			dbId;			/* database ID */
+	Oid			relid;			/* relation ID, or 0 if whole RelationSyncCache */
+} SharedInvalRelSyncMsg;
+
 typedef union
 {
 	int8		id;				/* type field --- must be first */
@@ -119,6 +129,7 @@ typedef union
 	SharedInvalSmgrMsg sm;
 	SharedInvalRelmapMsg rm;
 	SharedInvalSnapshotMsg sn;
+	SharedInvalRelSyncMsg rs;
 } SharedInvalidationMessage;
 
 
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 40658ba2ffc..9b871caef62 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -22,6 +22,7 @@ extern PGDLLIMPORT int debug_discard_caches;
 
 typedef void (*SyscacheCallbackFunction) (Datum arg, int cacheid, uint32 hashvalue);
 typedef void (*RelcacheCallbackFunction) (Datum arg, Oid relid);
+typedef void (*RelSyncCallbackFunction) (Datum arg, Oid relid);
 
 
 extern void AcceptInvalidationMessages(void);
@@ -55,6 +56,10 @@ extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple);
 
 extern void CacheInvalidateRelcacheByRelid(Oid relid);
 
+extern void CacheInvalidateRelSync(Oid relid);
+
+extern void CacheInvalidateRelSyncAll(void);
+
 extern void CacheInvalidateSmgr(RelFileLocatorBackend rlocator);
 
 extern void CacheInvalidateRelmap(Oid databaseId);
@@ -66,8 +71,13 @@ extern void CacheRegisterSyscacheCallback(int cacheid,
 extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 										  Datum arg);
 
+extern void CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+										 Datum arg);
+
 extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue);
 
+extern void CallRelSyncCallbacks(Oid relid);
+
 extern void InvalidateSystemCaches(void);
 extern void InvalidateSystemCachesExtended(bool debug_discard);
 
-- 
2.39.1

#22Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Amit Kapila (#21)
2 attachment(s)
RE: Selectively invalidate caches in pgoutput module

Dear Amit,

Thanks, the patch looks mostly good to me. I have made a few cosmetic
changes in the attached and combined both patches.

Thanks, it looks good to me.

The existing
Alter Publication ... Rename tests don't test invalidations arising
from that command. As this patch changes that code path, it would be
good to add a few tests for the same. We can add one for individual
relations and another for ALL Tables publication.

I created a new patch which adds test code. I added it to 007_ddl.pl, but I feel
it would also be OK to introduce a new test file. What do you think?

Best regards,
Hayato Kuroda
FUJITSU LIMITED
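
For readers who want the shape of the new mechanism without reading the whole diff, here is a minimal standalone sketch of the callback registry that the attached patch adds to inval.c. It is not the patch code: `ToyCache`, `ToyEntry`, `toy_cache_invalidate_cb`, `register_relsync_callback` and `call_relsync_callbacks` are hypothetical names, `Oid` is a local stand-in typedef, and the callback argument is a `void *` rather than a `Datum`. The only things taken from the patch are the fixed-size callback list, the per-relation dispatch, and the convention that `InvalidOid` means the whole cache.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;		/* stand-in for PostgreSQL's Oid */
#define InvalidOid ((Oid) 0)
#define MAX_RELSYNC_CALLBACKS 10
#define TOY_CACHE_SIZE 4

typedef void (*RelSyncCallbackFunction) (void *arg, Oid relid);

/* Fixed-size registry, analogous to relsync_callback_list in the patch. */
static struct
{
	RelSyncCallbackFunction function;
	void	   *arg;
}			relsync_callback_list[MAX_RELSYNC_CALLBACKS];
static int	relsync_callback_count = 0;

/* Register a callback to run on every relsync invalidation. */
static void
register_relsync_callback(RelSyncCallbackFunction func, void *arg)
{
	/* The patch raises FATAL here; this sketch just asserts. */
	assert(relsync_callback_count < MAX_RELSYNC_CALLBACKS);
	relsync_callback_list[relsync_callback_count].function = func;
	relsync_callback_list[relsync_callback_count].arg = arg;
	relsync_callback_count++;
}

/* Deliver one invalidation; InvalidOid means "the whole cache". */
static void
call_relsync_callbacks(Oid relid)
{
	for (int i = 0; i < relsync_callback_count; i++)
		relsync_callback_list[i].function(relsync_callback_list[i].arg, relid);
}

/* Hypothetical cache entry, loosely modelled on a per-relation sync entry. */
typedef struct ToyEntry
{
	Oid			relid;
	bool		valid;
} ToyEntry;

typedef struct ToyCache
{
	ToyEntry	entries[TOY_CACHE_SIZE];
} ToyCache;

/* Callback: mark one entry, or every entry, as needing a rebuild. */
static void
toy_cache_invalidate_cb(void *arg, Oid relid)
{
	ToyCache   *cache = arg;

	for (size_t i = 0; i < TOY_CACHE_SIZE; i++)
	{
		if (relid == InvalidOid || cache->entries[i].relid == relid)
			cache->entries[i].valid = false;
	}
}
```

In this sketch a consumer registers `toy_cache_invalidate_cb` once and then calls `call_relsync_callbacks(relid)` for a single relation or `call_relsync_callbacks(InvalidOid)` for everything. This mirrors how the patch has `InvalidatePubRelSyncCache()` choose between per-relation messages and one whole-cache message for a FOR ALL TABLES publication.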

Attachments:

v12-0001-Avoid-invalidating-all-RelationSyncCache-entries.patch (application/octet-stream)
From 8c921192b855432783d6c09f32dcbf1f6701fee0 Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Wed, 12 Mar 2025 14:41:26 +0530
Subject: [PATCH v12 1/2] Avoid invalidating all RelationSyncCache entries on
 publication rename.

On Publication rename, we need to only invalidate the RelationSyncCache
entries corresponding to relations that are part of the publication being
renamed.

As part of this patch, we introduce a new invalidation message to
invalidate the cache maintained by the logical decoding output plugin. We
can't use existing relcache invalidation for this purpose, as that would
unnecessarily cause relcache invalidations in other backends.

Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Author: Shlok Kyal <shlok.kyal.oss@gmail.com>
Reviewed-by: Hou Zhijie <houzj.fnst@fujitsu.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/OSCPR01MB14966C09AA201EFFA706576A7F5C92@OSCPR01MB14966.jpnprd01.prod.outlook.com
---
 src/backend/access/rmgrdesc/standbydesc.c   |   2 +
 src/backend/commands/alter.c                |  16 +++
 src/backend/commands/publicationcmds.c      |  39 ++++++
 src/backend/replication/pgoutput/pgoutput.c |   8 +-
 src/backend/utils/cache/inval.c             | 125 ++++++++++++++++++++
 src/include/commands/publicationcmds.h      |   1 +
 src/include/pg_config_manual.h              |   8 +-
 src/include/storage/sinval.h                |  23 +++-
 src/include/utils/inval.h                   |  10 ++
 9 files changed, 216 insertions(+), 16 deletions(-)

diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index d849f8e54b..81eff5f31c 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -132,6 +132,8 @@ standby_desc_invalidations(StringInfo buf,
 			appendStringInfo(buf, " relmap db %u", msg->rm.dbId);
 		else if (msg->id == SHAREDINVALSNAPSHOT_ID)
 			appendStringInfo(buf, " snapshot %u", msg->sn.relId);
+		else if (msg->id == SHAREDINVALRELSYNC_ID)
+			appendStringInfo(buf, " relsync %u", msg->rs.relid);
 		else
 			appendStringInfo(buf, " unrecognized id %d", msg->id);
 	}
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 78c1d4e1b8..3b7114b859 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -338,6 +338,22 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
 
 	InvokeObjectPostAlterHook(classId, objectId, 0);
 
+	/* Do post catalog-update tasks */
+	if (classId == PublicationRelationId)
+	{
+		Form_pg_publication pub = (Form_pg_publication) GETSTRUCT(oldtup);
+
+		/*
+		 * Invalidate relsynccache entries.
+		 *
+		 * Unlike ALTER PUBLICATION ADD/SET/DROP commands, renaming a
+		 * publication does not impact the publication status of tables. So, we
+		 * don't need to invalidate relcache to rebuild the rd_pubdesc.
+		 * Instead, we invalidate only the relsynccache.
+		 */
+		InvalidatePubRelSyncCache(pub->oid, pub->puballtables);
+	}
+
 	/* Release memory */
 	pfree(values);
 	pfree(nulls);
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 150a768d16..94a5a2b6c7 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -491,6 +491,45 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
 	return *invalid_column_list || *invalid_gen_col;
 }
 
+/*
+ * Invalidate entries in the RelationSyncCache for relations included in the
+ * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
+ *
+ * If 'puballtables' is true, invalidate all cache entries.
+ */
+void
+InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
+{
+	if (puballtables)
+	{
+		CacheInvalidateRelSyncAll();
+	}
+	else
+	{
+		List	   *relids = NIL;
+		List	   *schemarelids = NIL;
+
+		/*
+		 * For partitioned tables, we must invalidate the table itself and
+		 * all of its partitions. WAL records for INSERT/UPDATE/DELETE specify
+		 * leaf tables as a target. However, WAL records for TRUNCATE specify
+		 * both a root and its leaves.
+		 */
+		relids = GetPublicationRelations(pubid,
+										 PUBLICATION_PART_ALL);
+		schemarelids = GetAllSchemaPublicationRelations(pubid,
+														PUBLICATION_PART_ALL);
+
+		relids = list_concat_unique_oid(relids, schemarelids);
+
+		/* Invalidate the relsynccache */
+		foreach_oid(relid, relids)
+			CacheInvalidateRelSync(relid);
+	}
+
+	return;
+}
+
 /* check_functions_in_node callback */
 static bool
 contain_mutable_or_user_functions_checker(Oid func_id, void *context)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9063af6e1d..ed806c5430 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -531,6 +531,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 			CacheRegisterSyscacheCallback(PUBLICATIONOID,
 										  publication_invalidation_cb,
 										  (Datum) 0);
+			CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
+										 (Datum) 0);
 			publication_callback_registered = true;
 		}
 
@@ -1789,12 +1791,6 @@ static void
 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
 	publications_valid = false;
-
-	/*
-	 * Also invalidate per-relation cache so that next time the filtering info
-	 * is checked it will be updated with the new publication settings.
-	 */
-	rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
 }
 
 /*
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 700ccb6df9..4eb6772073 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -271,6 +271,7 @@ int			debug_discard_caches = 0;
 
 #define MAX_SYSCACHE_CALLBACKS 64
 #define MAX_RELCACHE_CALLBACKS 10
+#define MAX_RELSYNC_CALLBACKS 10
 
 static struct SYSCACHECALLBACK
 {
@@ -292,6 +293,15 @@ static struct RELCACHECALLBACK
 
 static int	relcache_callback_count = 0;
 
+static struct RELSYNCCALLBACK
+{
+	RelSyncCallbackFunction function;
+	Datum		arg;
+}			relsync_callback_list[MAX_RELSYNC_CALLBACKS];
+
+static int	relsync_callback_count = 0;
+
+
 /* ----------------------------------------------------------------
  *				Invalidation subgroup support functions
  * ----------------------------------------------------------------
@@ -484,6 +494,36 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
 	AddInvalidationMessage(group, RelCacheMsgs, &msg);
 }
 
+/*
+ * Add a relsync inval entry
+ *
+ * We put these into the relcache subgroup for simplicity. This message is the
+ * same as AddRelcacheInvalidationMessage() except that it is for
+ * RelationSyncCache maintained by decoding plugin pgoutput.
+ */
+static void
+AddRelsyncInvalidationMessage(InvalidationMsgsGroup *group,
+							  Oid dbId, Oid relId)
+{
+	SharedInvalidationMessage msg;
+
+	/* Don't add a duplicate item. */
+	ProcessMessageSubGroup(group, RelCacheMsgs,
+						   if (msg->rc.id == SHAREDINVALRELSYNC_ID &&
+							   (msg->rc.relId == relId ||
+								msg->rc.relId == InvalidOid))
+						   return);
+
+	/* OK, add the item */
+	msg.rc.id = SHAREDINVALRELSYNC_ID;
+	msg.rc.dbId = dbId;
+	msg.rc.relId = relId;
+	/* check AddCatcacheInvalidationMessage() for an explanation */
+	VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+	AddInvalidationMessage(group, RelCacheMsgs, &msg);
+}
+
 /*
  * Add a snapshot inval entry
  *
@@ -611,6 +651,17 @@ RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
 		info->RelcacheInitFileInval = true;
 }
 
+/*
+ * RegisterRelsyncInvalidation
+ *
+ * As above, but register a relsynccache invalidation event.
+ */
+static void
+RegisterRelsyncInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
+{
+	AddRelsyncInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, relId);
+}
+
 /*
  * RegisterSnapshotInvalidation
  *
@@ -751,6 +802,13 @@ InvalidateSystemCachesExtended(bool debug_discard)
 
 		ccitem->function(ccitem->arg, InvalidOid);
 	}
+
+	for (i = 0; i < relsync_callback_count; i++)
+	{
+		struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+		ccitem->function(ccitem->arg, InvalidOid);
+	}
 }
 
 /*
@@ -832,6 +890,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
 		else if (msg->sn.dbId == MyDatabaseId)
 			InvalidateCatalogSnapshot();
 	}
+	else if (msg->id == SHAREDINVALRELSYNC_ID)
+	{
+		/* We only care about our own database */
+		if (msg->rs.dbId == MyDatabaseId)
+			CallRelSyncCallbacks(msg->rs.relid);
+	}
 	else
 		elog(FATAL, "unrecognized SI message ID: %d", msg->id);
 }
@@ -1621,6 +1685,32 @@ CacheInvalidateRelcacheByRelid(Oid relid)
 	ReleaseSysCache(tup);
 }
 
+/*
+ * CacheInvalidateRelSync
+ *		Register invalidation of the cache in logical decoding output plugin
+ *		for a database.
+ *
+ * This type of invalidation message is used for the specific purpose of output
+ * plugins. Processes which do not decode WAL would do nothing even when they
+ * receive the message.
+ */
+void
+CacheInvalidateRelSync(Oid relid)
+{
+	RegisterRelsyncInvalidation(PrepareInvalidationState(),
+								MyDatabaseId, relid);
+}
+
+/*
+ * CacheInvalidateRelSyncAll
+ *		Register invalidation of the whole cache in logical decoding output
+ *		plugin.
+ */
+void
+CacheInvalidateRelSyncAll(void)
+{
+	CacheInvalidateRelSync(InvalidOid);
+}
 
 /*
  * CacheInvalidateSmgr
@@ -1763,6 +1853,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 	++relcache_callback_count;
 }
 
+/*
+ * CacheRegisterRelSyncCallback
+ *		Register the specified function to be called for all future
+ *		relsynccache invalidation events.
+ *
+ * This function is intended to be called from the logical decoding output
+ * plugins.
+ */
+void
+CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+							 Datum arg)
+{
+	if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS)
+		elog(FATAL, "out of relsync_callback_list slots");
+
+	relsync_callback_list[relsync_callback_count].function = func;
+	relsync_callback_list[relsync_callback_count].arg = arg;
+
+	++relsync_callback_count;
+}
+
 /*
  * CallSyscacheCallbacks
  *
@@ -1788,6 +1899,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
 	}
 }
 
+/*
+ * CallRelSyncCallbacks
+ */
+void
+CallRelSyncCallbacks(Oid relid)
+{
+	for (int i = 0; i < relsync_callback_count; i++)
+	{
+		struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+		ccitem->function(ccitem->arg, relid);
+	}
+}
+
 /*
  * LogLogicalInvalidations
  *
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index e11a942ea0..e41df6db03 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -38,5 +38,6 @@ extern bool pub_contains_invalid_column(Oid pubid, Relation relation,
 										char pubgencols_type,
 										bool *invalid_column_list,
 										bool *invalid_gen_col);
+extern void InvalidatePubRelSyncCache(Oid pubid, bool puballtables);
 
 #endif							/* PUBLICATIONCMDS_H */
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index 449e50bd78..125d3eb5ff 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -282,10 +282,10 @@
 
 /*
  * For cache-invalidation debugging, define DISCARD_CACHES_ENABLED to enable
- * use of the debug_discard_caches GUC to aggressively flush syscache/relcache
- * entries whenever it's possible to deliver invalidations.  See
- * AcceptInvalidationMessages() in src/backend/utils/cache/inval.c for
- * details.
+ * use of the debug_discard_caches GUC to aggressively flush
+ * syscache/relcache/relsynccache entries whenever it's possible to deliver
+ * invalidations.  See AcceptInvalidationMessages() in
+ * src/backend/utils/cache/inval.c for details.
  *
  * USE_ASSERT_CHECKING builds default to enabling this.  It's possible to use
  * DISCARD_CACHES_ENABLED without a cassert build and the implied
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index 2463c0f9fa..f168b5fbf8 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -27,6 +27,7 @@
  *	* invalidate an smgr cache entry for a specific physical relation
  *	* invalidate the mapped-relation mapping for a given database
  *	* invalidate any saved snapshot that might be used to scan a given relation
+ *	* invalidate a RelationSyncCache entry for a specific relation
  * More types could be added if needed.  The message type is identified by
  * the first "int8" field of the message struct.  Zero or positive means a
  * specific-catcache inval message (and also serves as the catcache ID field).
@@ -46,12 +47,12 @@
  * catcache inval messages must be generated for each of its caches, since
  * the hash keys will generally be different.
  *
- * Catcache, relcache, and snapshot invalidations are transactional, and so
- * are sent to other backends upon commit.  Internally to the generating
- * backend, they are also processed at CommandCounterIncrement so that later
- * commands in the same transaction see the new state.  The generating backend
- * also has to process them at abort, to flush out any cache state it's loaded
- * from no-longer-valid entries.
+ * Catcache, relcache, relsynccache, and snapshot invalidations are
+ * transactional, and so are sent to other backends upon commit.  Internally
+ * to the generating backend, they are also processed at
+ * CommandCounterIncrement so that later commands in the same transaction see
+ * the new state.  The generating backend also has to process them at abort,
+ * to flush out any cache state it's loaded from no-longer-valid entries.
  *
  * smgr and relation mapping invalidations are non-transactional: they are
  * sent immediately when the underlying file change is made.
@@ -110,6 +111,15 @@ typedef struct
 	Oid			relId;			/* relation ID */
 } SharedInvalSnapshotMsg;
 
+#define SHAREDINVALRELSYNC_ID	(-6)
+
+typedef struct
+{
+	int8		id;				/* type field --- must be first */
+	Oid			dbId;			/* database ID */
+	Oid			relid;			/* relation ID, or 0 if whole RelationSyncCache */
+} SharedInvalRelSyncMsg;
+
 typedef union
 {
 	int8		id;				/* type field --- must be first */
@@ -119,6 +129,7 @@ typedef union
 	SharedInvalSmgrMsg sm;
 	SharedInvalRelmapMsg rm;
 	SharedInvalSnapshotMsg sn;
+	SharedInvalRelSyncMsg rs;
 } SharedInvalidationMessage;
 
 
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 40658ba2ff..9b871caef6 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -22,6 +22,7 @@ extern PGDLLIMPORT int debug_discard_caches;
 
 typedef void (*SyscacheCallbackFunction) (Datum arg, int cacheid, uint32 hashvalue);
 typedef void (*RelcacheCallbackFunction) (Datum arg, Oid relid);
+typedef void (*RelSyncCallbackFunction) (Datum arg, Oid relid);
 
 
 extern void AcceptInvalidationMessages(void);
@@ -55,6 +56,10 @@ extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple);
 
 extern void CacheInvalidateRelcacheByRelid(Oid relid);
 
+extern void CacheInvalidateRelSync(Oid relid);
+
+extern void CacheInvalidateRelSyncAll(void);
+
 extern void CacheInvalidateSmgr(RelFileLocatorBackend rlocator);
 
 extern void CacheInvalidateRelmap(Oid databaseId);
@@ -66,8 +71,13 @@ extern void CacheRegisterSyscacheCallback(int cacheid,
 extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 										  Datum arg);
 
+extern void CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+										 Datum arg);
+
 extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue);
 
+extern void CallRelSyncCallbacks(Oid relid);
+
 extern void InvalidateSystemCaches(void);
 extern void InvalidateSystemCachesExtended(bool debug_discard);
 
-- 
2.43.5

v12-0002-Add-testcase.patch (application/octet-stream)
From 2a5efca7adb5f91f51cb63cde13e845b454c9f34 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Wed, 12 Mar 2025 20:53:45 +0900
Subject: [PATCH v12 2/2] Add testcase

---
 src/test/subscription/t/007_ddl.pl | 81 ++++++++++++++++++++++++++++++
 1 file changed, 81 insertions(+)

diff --git a/src/test/subscription/t/007_ddl.pl b/src/test/subscription/t/007_ddl.pl
index 4d3b917ac0..e663e9f515 100644
--- a/src/test/subscription/t/007_ddl.pl
+++ b/src/test/subscription/t/007_ddl.pl
@@ -69,6 +69,87 @@ ok( $stderr =~
 	"Alter subscription set publication throws warning for non-existent publication"
 );
 
+# Cleanup
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION mypub;");
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION mysub1");
+
+#
+# Test ALTER PUBLICATION RENAME command during the replication
+#
+
+pass "renaming publications can work";
+
+# Test function for swapping the names of publications
+sub test_swap
+{
+	my ($table_name, $pubname, $appname) = @_;
+
+	# Confirms tuples can be replicated
+	$node_publisher->safe_psql('postgres', "INSERT INTO $table_name VALUES (1);");
+	$node_publisher->wait_for_catchup($appname);
+	my $result =
+		$node_subscriber->safe_psql('postgres', "SELECT a FROM $table_name");
+	is($result, qq(1), 'check replication worked well');
+
+	# Swap the name of publications; $pubname <-> pub_empty
+	$node_publisher->safe_psql('postgres', qq[
+		ALTER PUBLICATION $pubname RENAME TO tap_pub_tmp;
+		ALTER PUBLICATION pub_empty RENAME TO $pubname;
+		ALTER PUBLICATION tap_pub_tmp RENAME TO pub_empty;
+	]);
+
+	# Insert the data again
+	$node_publisher->safe_psql('postgres', "INSERT INTO $table_name VALUES (2);");
+	$node_publisher->wait_for_catchup($appname);
+
+	# Confirms the second tuple won't be replicated because $pubname does not
+	# contain relations anymore.
+	$result =
+		$node_subscriber->safe_psql('postgres', "SELECT a FROM $table_name ORDER BY a");
+	is($result, qq(1),
+		'check the tuple inserted after the RENAME was not replicated');
+
+	# Swap the name of publications again
+	$node_publisher->safe_psql('postgres', qq[
+		ALTER PUBLICATION $pubname RENAME TO tap_pub_tmp;
+		ALTER PUBLICATION pub_empty RENAME TO $pubname;
+		ALTER PUBLICATION tap_pub_tmp RENAME TO pub_empty;
+	]);
+
+	# Confirms the replication is now resumed
+	$node_publisher->safe_psql('postgres', "INSERT INTO $table_name VALUES (3);");
+	$result =
+		$node_subscriber->safe_psql('postgres', "SELECT a FROM $table_name ORDER BY a");
+	is($result, qq(1
+3), 'check replication resumed after renaming again');
+}
+
+# Create another table
+$ddl = "CREATE TABLE test2 (a int, b text);";
+$node_publisher->safe_psql('postgres', $ddl);
+$node_subscriber->safe_psql('postgres', $ddl);
+
+# Create publications and a subscription
+$node_publisher->safe_psql('postgres', qq[
+	CREATE PUBLICATION pub_empty;
+	CREATE PUBLICATION pub_for_tab FOR TABLE test1;
+	CREATE PUBLICATION pub_for_all_tables FOR ALL TABLES;
+]);
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION pub_for_tab WITH (copy_data = off)"
+);
+
+# Confirms RENAME command works well for a publication
+test_swap('test1', 'pub_for_tab', 'tap_sub');
+
+# Switches a publication which includes all tables
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub SET PUBLICATION pub_for_all_tables WITH (refresh = true, copy_data = false);"
+);
+
+# Confirms RENAME command works well for ALL TABLES publication
+test_swap('test2', 'pub_for_all_tables', 'tap_sub');
+
 $node_subscriber->stop;
 $node_publisher->stop;
 
-- 
2.43.5

#23Amit Kapila
amit.kapila16@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#22)
Re: Selectively invalidate caches in pgoutput module

On Wed, Mar 12, 2025 at 5:46 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

Thanks, the patch looks mostly good to me. I have made a few cosmetic
changes in the attached and combined both patches.

Thanks, it looks good to me.

The existing
Alter Publication ... Rename tests don't test invalidations arising
from that command. As this patch changes that code path, it would be
good to add a few tests for the same. We can add one for individual
relations and another for ALL Tables publication.

I created a new patch which adds test code.

Thanks. I have pushed the patch after minor changes in the test.

--
With Regards,
Amit Kapila.

#24Andres Freund
andres@anarazel.de
In reply to: Amit Kapila (#23)
Re: Selectively invalidate caches in pgoutput module

Hi,

On 2025-03-13 14:13:39 +0530, Amit Kapila wrote:

On Wed, Mar 12, 2025 at 5:46 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

Thanks, the patch looks mostly good to me. I have made a few cosmetic
changes in the attached and combined both patches.

Thanks, it looks good to me.

The existing
ALTER PUBLICATION ... RENAME tests don't test invalidations arising
from that command. As this patch changes that code path, it would be
good to add a few tests for it. We can add one for individual
relations and another for an ALL TABLES publication.

I created a new patch that adds test code.

Thanks. I have pushed the patch after minor changes in the test.

I think the new tests just failed in CI:
https://cirrus-ci.com/task/5602950271205376?logs=test_world#L268

[13:34:38.562] ――――――――――――――――――――――――――――――――――――― ✀ ―――――――――――――――――――――――――――――――――――――
[13:34:38.562] stderr:
[13:34:38.562] # Failed test 'check replication worked well before renaming a publication'
[13:34:38.562] # at /tmp/cirrus-ci-build/src/test/subscription/t/007_ddl.pl line 93.
[13:34:38.562] # got: ''
[13:34:38.562] # expected: '1'
[13:34:38.562] # Failed test 'check the tuple inserted after the RENAME was not replicated'
[13:34:38.562] # at /tmp/cirrus-ci-build/src/test/subscription/t/007_ddl.pl line 110.
[13:34:38.562] # got: ''
[13:34:38.562] # expected: '1'
[13:34:38.562] # Looks like you failed 2 tests of 8.
[13:34:38.562]
[13:34:38.562] (test program exited with status code 2)
[13:34:38.562] ――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――
[13:34:38.562]

Greetings,

Andres Freund

#25Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Andres Freund (#24)
RE: Selectively invalidate caches in pgoutput module

Dear Andres,

I think the new tests just failed in CI:
https://cirrus-ci.com/task/5602950271205376?logs=test_world#L268

Thanks for reporting, I'll look into it.

Best regards,
Hayato Kuroda
FUJITSU LIMITED

#26Amit Kapila
amit.kapila16@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#25)
Re: Selectively invalidate caches in pgoutput module

On Fri, Mar 28, 2025 at 6:03 AM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

Dear Andres,

I think the new tests just failed in CI:
https://cirrus-ci.com/task/5602950271205376?logs=test_world#L268

Thanks for reporting, I'll look into it.

The problem here is that after ALTER SUBSCRIPTION tap_sub SET
PUBLICATION ..., we didn't wait for the new walsender on the publisher to
start. We must use wait_for_subscription_sync both after the "CREATE
SUBSCRIPTION ..." and the "ALTER SUBSCRIPTION ..." commands and keep
copy_data=true to ensure the initial replication is set up between
publisher and subscriber. This is how we use these commands in other
places.
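
The race described above can be sketched with a toy model. This is a Python
illustration only, not PostgreSQL code: the WAL is modeled as a plain list, the
streaming start position as a list index, and replicate() and run_test() are
hypothetical names. It assumes the worst-case interleaving, in which the INSERT
is written before the new walsender starts streaming and no initial copy is done.

```python
# Toy model (assumption: NOT PostgreSQL internals). The WAL is a list of
# records; a subscriber only receives records at or after its start position.


def replicate(wal, start_pos):
    """Return the records a subscriber sees when streaming starts at start_pos."""
    return wal[start_pos:]


def run_test(wait_for_sync):
    """Model 'ALTER SUBSCRIPTION ... SET PUBLICATION' followed by an INSERT."""
    wal = []
    if wait_for_sync:
        # The test waits until streaming is established, then inserts.
        start_pos = len(wal)
        wal.append("INSERT 1")
    else:
        # Worst case without waiting: the INSERT is written first, and
        # streaming starts from a position after it.
        wal.append("INSERT 1")
        start_pos = len(wal)
    return replicate(wal, start_pos)


print(run_test(wait_for_sync=False))  # the INSERT is missed
print(run_test(wait_for_sync=True))   # the INSERT is replicated
```

In the model, the row is lost only when the start position lands after the
INSERT, which is the situation the added wait is meant to rule out.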

--
With Regards,
Amit Kapila.

#27Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Amit Kapila (#26)
1 attachment(s)
RE: Selectively invalidate caches in pgoutput module

Dear Amit,

The problem here is that after ALTER SUBSCRIPTION tap_sub SET
PUBLICATION ..., we didn't wait for the new walsender on the publisher to
start. We must use wait_for_subscription_sync both after the "CREATE
SUBSCRIPTION ..." and the "ALTER SUBSCRIPTION ..." commands and keep
copy_data=true to ensure the initial replication is set up between
publisher and subscriber. This is how we use these commands in other
places.

Agreed. PSA the patch to fix the issue.

Best regards,
Hayato Kuroda
FUJITSU LIMITED

Attachments:

0001-Stablize-tests-added-in-3abe9dc188.patch (application/octet-stream)
From 67c9a2c1a83c94b9c30454592b6b498f9bfbd41f Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Fri, 28 Mar 2025 14:02:29 +0900
Subject: [PATCH] Stabilize tests added in 3abe9dc188.

The problem is that after the ALTER SUBSCRIPTION tap_sub SET PUBLICATION
command, we didn't wait for the new walsender to start on the publisher.
Immediately after the ALTER, we performed an INSERT and expected it to replicate.
However, replication could start from a point after the INSERT location,
and as the subscription isn't copying initial data, we could miss such an
INSERT.

The fix is to wait for the connection to be established between publisher and
subscriber before starting DML operations that are expected to replicate.

As per CI.

Reported-by: Andres Freund <andres@anarazel.de>
Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Discussion: https://postgr.es/m/CALDaNm2ms1deM5EYNLFEfESv_Kw=Y4AiTB0LP=qGS-UpFwGbPg@mail.gmail.com
---
 src/test/subscription/t/007_ddl.pl | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/test/subscription/t/007_ddl.pl b/src/test/subscription/t/007_ddl.pl
index 7d4c2d51c3a..36f1817ece6 100644
--- a/src/test/subscription/t/007_ddl.pl
+++ b/src/test/subscription/t/007_ddl.pl
@@ -130,16 +130,18 @@ $node_publisher->safe_psql('postgres', qq[
 	CREATE PUBLICATION pub_for_all_tables FOR ALL TABLES;
 ]);
 $node_subscriber->safe_psql('postgres',
-	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION pub_for_tab WITH (copy_data = off)"
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION pub_for_tab"
 );
+$node_subscriber->wait_for_subscription_sync;
 
 # Confirms RENAME command works well for a publication
 test_swap('test1', 'pub_for_tab', 'tap_sub');
 
 # Switches a publication which includes all tables
 $node_subscriber->safe_psql('postgres',
-	"ALTER SUBSCRIPTION tap_sub SET PUBLICATION pub_for_all_tables WITH (refresh = true, copy_data = false);"
+	"ALTER SUBSCRIPTION tap_sub SET PUBLICATION pub_for_all_tables;"
 );
+$node_subscriber->wait_for_subscription_sync;
 
 # Confirms RENAME command works well for ALL TABLES publication
 test_swap('test2', 'pub_for_all_tables', 'tap_sub');
-- 
2.43.5

#28Amit Kapila
amit.kapila16@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#27)
Re: Selectively invalidate caches in pgoutput module

On Fri, Mar 28, 2025 at 10:46 AM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

The problem here is that after ALTER SUBSCRIPTION tap_sub SET
PUBLICATION ..., we didn't wait for the new walsender on the publisher to
start. We must use wait_for_subscription_sync both after the "CREATE
SUBSCRIPTION ..." and the "ALTER SUBSCRIPTION ..." commands and keep
copy_data=true to ensure the initial replication is set up between
publisher and subscriber. This is how we use these commands in other
places.

Agreed. PSA the patch to fix the issue.

Pushed after slight modification.

--
With Regards,
Amit Kapila.