From 90b8df330df049bd1d7e881dc6e9b108c17b0924 Mon Sep 17 00:00:00 2001
From: "yizhi.fzh" <yizhi.fzh@alibaba-inc.com>
Date: Wed, 21 Feb 2024 18:40:03 +0800
Subject: [PATCH v1 1/1] Make get_rel_sync_entry less depending on transaction
 state.

get_rel_sync_entry needs transaction only a replicate_valid = false
entry is found, this should be some rare case. However the caller can't
know if a entry is valid, so they have to prepare the transaction state
before calling this function. Such preparation is expensive.

This patch makes the get_rel_sync_entry can manage a transaction stage
only if necessary. so the callers don't need to prepare it blindly.
---
 src/backend/replication/pgoutput/pgoutput.c | 60 ++++++++++++++++++++-
 1 file changed, 59 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 998f92d671..25e55590a2 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -13,6 +13,7 @@
 #include "postgres.h"
 
 #include "access/tupconvert.h"
+#include "access/relation.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
@@ -214,6 +215,11 @@ static void init_rel_sync_cache(MemoryContext cachectx);
 static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
 static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
 											 Relation relation);
+static RelationSyncEntry *get_rel_sync_entry_by_relid(PGOutputData *data,
+													  Oid relid);
+static RelationSyncEntry *get_rel_sync_entry_internal(PGOutputData *data,
+													  Relation relation,
+													  Oid relid);
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
 										  uint32 hashvalue);
@@ -1962,11 +1968,29 @@ set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
  */
 static RelationSyncEntry *
 get_rel_sync_entry(PGOutputData *data, Relation relation)
+{
+	return get_rel_sync_entry_internal(data, relation, InvalidOid);
+}
+
+static RelationSyncEntry *
+__attribute__ ((unused))
+get_rel_sync_entry_by_relid(PGOutputData *data, Oid relid)
+{
+	return get_rel_sync_entry_internal(data, NULL, relid);
+}
+
+static RelationSyncEntry *
+get_rel_sync_entry_internal(PGOutputData *data, Relation relation, Oid oid)
 {
 	RelationSyncEntry *entry;
 	bool		found;
 	MemoryContext oldctx;
-	Oid			relid = RelationGetRelid(relation);
+	Oid			relid = OidIsValid(oid) ? oid: RelationGetRelid(relation);
+	bool		started_xact = false, using_subtxn;
+
+	/* either oid or relation is provided. */
+	Assert(OidIsValid(oid) || RelationIsValid(relation));
+	Assert(!(OidIsValid(oid) && RelationIsValid(relation)));
 
 	Assert(RelationSyncCache != NULL);
 
@@ -1993,6 +2017,23 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->attrmap = NULL;
 	}
 
+	if (!entry->replicate_valid && !IsTransactionOrTransactionBlock())
+	{
+		/*
+		 * Validating the entry needs to access syscache, which must
+		 * be in a transaction state, if that's not ready, start one.
+		 */
+
+		using_subtxn = IsTransactionOrTransactionBlock();
+
+		if (using_subtxn)
+			BeginInternalSubTransaction(__func__);
+		else
+			StartTransactionCommand();
+
+		started_xact = true;
+	}
+
 	/* Validate the entry */
 	if (!entry->replicate_valid)
 	{
@@ -2198,9 +2239,19 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
 			entry->pubactions.pubdelete)
 		{
+			bool rel_opened = false;
+
+			if (!RelationIsValid(relation))
+			{
+				relation = relation_open(oid, AccessShareLock);
+				rel_opened = true;
+			}
 			/* Initialize the tuple slot and map */
 			init_tuple_slot(data, relation, entry);
 
+			if (rel_opened)
+				relation_close(relation, AccessShareLock);
+
 			/* Initialize the row filter */
 			pgoutput_row_filter_init(data, rel_publications, entry);
 
@@ -2215,6 +2266,13 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->replicate_valid = true;
 	}
 
+	if (started_xact)
+	{
+		AbortCurrentTransaction();
+		if (using_subtxn)
+			RollbackAndReleaseCurrentSubTransaction();
+	}
+
 	return entry;
 }
 
-- 
2.34.1

