From 1e3ec40d70c2e7f8482632333001fe52527ef17a Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Date: Mon, 31 Jan 2022 15:03:33 +0900
Subject: [PATCH v4] Avoid an error while creating logical replication slot

A snapshot's xip array can hold top-level XIDs only up to the number of
server processes, but SnapBuildInitialSnapshot tries to store all
top-level and subtransaction XIDs there, so it easily fails with an
"initial slot snapshot too large" error.

Instead, create a "takenDuringRecovery" snapshot, which stores all
XIDs in the subxip array. This alone greatly reduces the chance of
getting the error. To eliminate the chance of the error occurring,
also allow creating an overflowed snapshot, by adding to the reorder
buffer a function that tells whether an XID is a known subtransaction
(as opposed to a top-level transaction).

Author: Dilip Kumar and Kyotaro Horiguchi
---
 .../replication/logical/reorderbuffer.c       | 20 +++++++
 src/backend/replication/logical/snapbuild.c   | 54 ++++++++++++++-----
 src/include/replication/reorderbuffer.h       |  1 +
 3 files changed, 63 insertions(+), 12 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 19b2ba2000..429e6296ab 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -3326,6 +3326,26 @@ ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
 }
 
 
+/*
+ * ReorderBufferXidIsKnownSubXact
+ *		True iff rb yields a txn for xid that is flagged as a known subxact.
+ */
+bool
+ReorderBufferXidIsKnownSubXact(ReorderBuffer *rb, TransactionId xid)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false,
+								NULL, InvalidXLogRecPtr, false);
+
+	/* no txn returned for xid, or txn not flagged as subxact => false */
+	if (txn && rbtxn_is_known_subxact(txn))
+		return true;
+
+	return false;
+}
+
+
 /*
  * ---------------------------------------
  * Disk serialization support
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 83fca8a77d..21f30fa1d3 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -531,8 +531,9 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
 {
 	Snapshot	snap;
 	TransactionId xid;
-	TransactionId *newxip;
-	int			newxcnt = 0;
+	TransactionId *newsubxip;
+	int			newsubxcnt;
+	bool		overflowed = false;
 
 	Assert(!FirstSnapshotSet);
 	Assert(XactIsoLevel == XACT_REPEATABLE_READ);
@@ -568,9 +569,12 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
 
 	MyProc->xmin = snap->xmin;
 
-	/* allocate in transaction context */
-	newxip = (TransactionId *)
-		palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount());
+	/*
+	 * Allocate in transaction context. We use only subxip to store xids. See
+	 * below and GetSnapshotData for details.
+	 */
+	newsubxip = (TransactionId *)
+		palloc(sizeof(TransactionId) * GetMaxSnapshotSubxidCount());
 
 	/*
 	 * snapbuild.c builds transactions in an "inverted" manner, which means it
@@ -578,6 +582,9 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
 	 * classical snapshot by marking all non-committed transactions as
 	 * in-progress. This can be expensive.
 	 */
+retry:
+	newsubxcnt = 0;
+
 	for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
 	{
 		void	   *test;
@@ -591,12 +598,28 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
 
 		if (test == NULL)
 		{
-			if (newxcnt >= GetMaxSnapshotXidCount())
-				ereport(ERROR,
-						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-						 errmsg("initial slot snapshot too large")));
+			/*
+			 * If subtransactions fit the subxid array, we fill the array and
+			 * call it a day. Otherwise we store only top-level transactions
+			 * into the same subxip array.
+			 */
+			if (!overflowed ||
+				!ReorderBufferXidIsKnownSubXact(builder->reorder, xid))
+			{
+				if (newsubxcnt >= GetMaxSnapshotSubxidCount())
+				{
+					if (overflowed)
+						ereport(ERROR,
+								(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+								 errmsg("initial slot snapshot too large")));
 
-			newxip[newxcnt++] = xid;
+					/* retry excluding subxids */
+					overflowed = true;
+					goto retry;
+				}
+
+				newsubxip[newsubxcnt++] = xid;
+			}
 		}
 
 		TransactionIdAdvance(xid);
@@ -604,8 +627,15 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
 
 	/* adjust remaining snapshot fields as needed */
 	snap->snapshot_type = SNAPSHOT_MVCC;
-	snap->xcnt = newxcnt;
-	snap->xip = newxip;
+	snap->xcnt = 0;
+	snap->subxcnt = newsubxcnt;
+	snap->subxip = newsubxip;
+	snap->suboverflowed = overflowed;
+	/*
+	 * This snapshot is not actually taken during recovery, but all XIDs
+	 * are stored in subxip whether or not it is overflowed.
+	 */
+	snap->takenDuringRecovery = true;
 
 	return snap;
 }
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index aa0a73382f..c05cf3078c 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -669,6 +669,7 @@ void		ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog
 bool		ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
 bool		ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
 
+bool		ReorderBufferXidIsKnownSubXact(ReorderBuffer *rb, TransactionId xid);
 bool		ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
 											 XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
 											 TimestampTz prepare_time,
-- 
2.27.0

