From bc400e7c9c8ad2ac4a0c431dfab9ec2f78bd5870 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Thu, 16 Nov 2023 19:57:33 +0100
Subject: [PATCH 2/3] experimental fix: revalidate sync entry per change via raw pg_publication_rel scan

---
 src/backend/replication/pgoutput/pgoutput.c | 54 ++++++++++++++++++++-
 1 file changed, 53 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 09f53bea0a0..adcd5688045 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -12,6 +12,8 @@
  */
 #include "postgres.h"
 
+#include "access/genam.h"
+#include "access/table.h"
 #include "access/tupconvert.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
@@ -29,6 +31,7 @@
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
 #include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -1958,6 +1961,51 @@ set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
 	MemoryContextSwitchTo(oldctx);
 }
 
+/*
+ * GetRelationPublicationsRaw
+ *		Return the OIDs of the publications that list relation 'relid' in
+ *		pg_publication_rel, read with a direct catalog index scan.
+ *
+ * Part of an experimental patch; progress is traced at LOG level.
+ */
+static List *
+GetRelationPublicationsRaw(Oid relid)
+{
+	List	   *result = NIL;
+	SysScanDesc scandesc;
+	Relation	relation;
+	ScanKeyData key[1];
+	HeapTuple	tup;
+
+	/* OIDs are unsigned, so log them with %u rather than %d */
+	elog(LOG, "GetRelationPublicationsRaw relid %u / start", relid);
+
+	relation = table_open(PublicationRelRelationId, AccessShareLock);
+
+	/* Match only the pg_publication_rel rows whose prrelid equals relid */
+	ScanKeyInit(&key[0], Anum_pg_publication_rel_prrelid,
+				BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relid));
+
+	scandesc = systable_beginscan(relation, PublicationRelPrrelidPrpubidIndexId,
+								  true, NULL, 1, key);
+
+	while (HeapTupleIsValid(tup = systable_getnext(scandesc)))
+	{
+		Form_pg_publication_rel form = (Form_pg_publication_rel) GETSTRUCT(tup);
+
+		elog(LOG, "GetRelationPublicationsRaw relid %u / tuple pub %u",
+			 relid, form->prpubid);
+		result = lappend_oid(result, form->prpubid);
+	}
+
+	systable_endscan(scandesc);
+	table_close(relation, AccessShareLock);
+
+	elog(LOG, "GetRelationPublicationsRaw relid %u / end", relid);
+
+	return result;
+}
+
 /*
  * Find or create entry in the relation schema cache.
  *
@@ -2003,11 +2051,15 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 
 	elog(LOG, "get_rel_sync_entry relation %d replicate_valid %d", RelationGetRelid(relation), entry->replicate_valid);
 
+	/* force refresh of the entry for each change */
+	entry->replicate_valid = false;
+
 	/* Validate the entry */
 	if (!entry->replicate_valid)
 	{
 		Oid			schemaId = get_rel_namespace(relid);
-		List	   *pubids = GetRelationPublications(relid);
+		List	   *pubids = GetRelationPublicationsRaw(relid);
+		// List	   *pubids = GetRelationPublications(relid);
 
 		/*
 		 * We don't acquire a lock on the namespace system table as we build
-- 
2.41.0

