From cff177a82f36cca1cd7e216898c29f9b2677edac Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Tue, 28 Feb 2023 13:27:14 +0200
Subject: [PATCH 4/4] Include command ID in heapam records on catalog tables.

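Logical decoding needs to track the cmin/cmax of catalog tuples, so
that it can build the "historic snapshots". The cmin/cmax are not
WAL-logged, so visibility checks on historic snapshots could not rely
on the cmin/cmax on the heap tuple itself. Instead, every modification
on a catalog table wrote an additional XLOG_HEAP2_NEW_CID record, with
the cmin/cmax of the tuple, and we would track those in the reorder
buffer.

Remove the separate XLOG_HEAP2_NEW_CID record. Instead, when a relation
is accessible in logical decoding, include the command ID directly in
the heap insert, multi-insert, update and delete records, and have the
decoder extract it from there.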
---
 src/backend/access/heap/heapam.c              | 190 ++++++------------
 src/backend/access/rmgrdesc/heapdesc.c        |  16 --
 src/backend/replication/logical/decode.c      | 185 ++++++++++++++++-
 .../replication/logical/reorderbuffer.c       |  49 ++++-
 src/backend/replication/logical/snapbuild.c   |  45 ++---
 src/include/access/heapam_xlog.h              |  33 ++-
 src/include/replication/reorderbuffer.h       |   3 +-
 src/include/replication/snapbuild.h           |   4 +-
 8 files changed, 330 insertions(+), 195 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index f82176491a1..df304eef9b4 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -75,7 +75,7 @@
 
 static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
 									 TransactionId xid, CommandId cid, int options);
-static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
+static XLogRecPtr log_heap_update(Relation reln, CommandId cid, Buffer oldbuf,
 								  Buffer newbuf, HeapTuple oldtup,
 								  HeapTuple newtup, HeapTuple old_key_tuple,
 								  bool all_visible_cleared, bool new_all_visible_cleared);
@@ -111,7 +111,6 @@ static bool ConditionalMultiXactIdWait(MultiXactId multi, MultiXactStatus status
 									   uint16 infomask, Relation rel, int *remaining);
 static void index_delete_sort(TM_IndexDeleteOp *delstate);
 static int	bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
-static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
 static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 										bool *copy);
 
@@ -1903,13 +1902,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 		Page		page = BufferGetPage(buffer);
 		uint8		info = XLOG_HEAP_INSERT;
 		int			bufflags = 0;
-
-		/*
-		 * If this is a catalog, we need to transmit combo CIDs to properly
-		 * decode, so log that as well.
-		 */
-		if (RelationIsAccessibleInLogicalDecoding(relation))
-			log_heap_new_cid(relation, heaptup);
+		bool		need_cid = RelationIsAccessibleInLogicalDecoding(relation);
 
 		/*
 		 * If this is the single and first tuple on page, we can reinit the
@@ -1929,6 +1922,9 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 			xlrec.flags |= XLH_INSERT_ALL_VISIBLE_CLEARED;
 		if (options & HEAP_INSERT_SPECULATIVE)
 			xlrec.flags |= XLH_INSERT_IS_SPECULATIVE;
+		if (need_cid)
+			xlrec.flags |= XLH_INSERT_ON_CATALOG_RELATION | XLH_INSERT_CONTAINS_CID;
+
 		Assert(ItemPointerGetBlockNumber(&heaptup->t_self) == BufferGetBlockNumber(buffer));
 
 		/*
@@ -1953,6 +1949,10 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 		xlhdr.t_infomask = heaptup->t_data->t_infomask;
 		xlhdr.t_hoff = heaptup->t_data->t_hoff;
 
+		/* include command id if needed for logical decoding */
+		if (need_cid)
+			XLogRegisterData((char *) &cid, sizeof(CommandId));
+
 		/*
 		 * note we mark xlhdr as belonging to buffer; if XLogInsert decides to
 		 * write the whole page to the xlog, we don't need to store
@@ -2075,7 +2075,7 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 	bool		needwal;
 	Size		saveFreeSpace;
 	bool		need_tuple_data = RelationIsLogicallyLogged(relation);
-	bool		need_cids = RelationIsAccessibleInLogicalDecoding(relation);
+	bool		need_cid = RelationIsAccessibleInLogicalDecoding(relation);
 
 	/* currently not needed (thus unsupported) for heap_multi_insert() */
 	Assert(!(options & HEAP_INSERT_NO_LOGICAL));
@@ -2159,13 +2159,6 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 		 */
 		RelationPutHeapTuple(relation, buffer, heaptuples[ndone], false);
 
-		/*
-		 * For logical decoding we need combo CIDs to properly decode the
-		 * catalog.
-		 */
-		if (needwal && need_cids)
-			log_heap_new_cid(relation, heaptuples[ndone]);
-
 		for (nthispage = 1; ndone + nthispage < ntuples; nthispage++)
 		{
 			HeapTuple	heaptup = heaptuples[ndone + nthispage];
@@ -2174,13 +2167,6 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 				break;
 
 			RelationPutHeapTuple(relation, buffer, heaptup, false);
-
-			/*
-			 * For logical decoding we need combo CIDs to properly decode the
-			 * catalog.
-			 */
-			if (needwal && need_cids)
-				log_heap_new_cid(relation, heaptup);
 		}
 
 		/*
@@ -2216,6 +2202,7 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 			char	   *tupledata;
 			int			totaldatalen;
 			char	   *scratchptr = scratch.data;
+			uint16	   *offsets = NULL;
 			bool		init;
 			int			bufflags = 0;
 
@@ -2229,6 +2216,13 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 			xlrec = (xl_heap_multi_insert *) scratchptr;
 			scratchptr += SizeOfHeapMultiInsert;
 
+			/* include command id if needed for logical decoding */
+			if (need_cid)
+			{
+				memcpy(scratchptr, &cid, sizeof(CommandId));
+				scratchptr += sizeof(CommandId);
+			}
+
 			/*
 			 * Allocate offsets array. Unless we're reinitializing the page,
 			 * in that case the tuples are stored in order starting at
@@ -2236,7 +2230,10 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 			 * explicitly.
 			 */
 			if (!init)
+			{
+				offsets = (uint16 *) scratchptr;
 				scratchptr += nthispage * sizeof(OffsetNumber);
+			}
 
 			/* the rest of the scratch space is used for tuple data */
 			tupledata = scratchptr;
@@ -2263,7 +2260,7 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 				int			datalen;
 
 				if (!init)
-					xlrec->offsets[i] = ItemPointerGetOffsetNumber(&heaptup->t_self);
+					offsets[i] = ItemPointerGetOffsetNumber(&heaptup->t_self);
 				/* xl_multi_insert_tuple needs two-byte alignment. */
 				tuphdr = (xl_multi_insert_tuple *) SHORTALIGN(scratchptr);
 				scratchptr = ((char *) tuphdr) + SizeOfMultiInsertTuple;
@@ -2286,6 +2283,9 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 			if (need_tuple_data)
 				xlrec->flags |= XLH_INSERT_CONTAINS_NEW_TUPLE;
 
+			if (need_cid)
+				xlrec->flags |= XLH_INSERT_ON_CATALOG_RELATION | XLH_INSERT_CONTAINS_CID;
+
 			/*
 			 * Signal that this is the last xl_heap_multi_insert record
 			 * emitted by this call to heap_multi_insert(). Needed for logical
@@ -2766,13 +2766,7 @@ l1:
 		xl_heap_delete xlrec;
 		xl_heap_header xlhdr;
 		XLogRecPtr	recptr;
-
-		/*
-		 * For logical decode we need combo CIDs to properly decode the
-		 * catalog
-		 */
-		if (RelationIsAccessibleInLogicalDecoding(relation))
-			log_heap_new_cid(relation, &tp);
+		bool		need_cid = RelationIsAccessibleInLogicalDecoding(relation);
 
 		xlrec.flags = 0;
 		if (all_visible_cleared)
@@ -2791,10 +2785,16 @@ l1:
 			else
 				xlrec.flags |= XLH_DELETE_CONTAINS_OLD_KEY;
 		}
+		if (need_cid)
+			xlrec.flags |= XLH_DELETE_ON_CATALOG_RELATION | XLH_DELETE_CONTAINS_CID;
 
 		XLogBeginInsert();
 		XLogRegisterData((char *) &xlrec, SizeOfHeapDelete);
 
+		/* include command id if needed for logical decoding */
+		if (need_cid)
+			XLogRegisterData((char *) &cid, sizeof(CommandId));
+
 		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
 
 		/*
@@ -2963,6 +2963,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 				infomask2_old_tuple,
 				infomask_new_tuple,
 				infomask2_new_tuple;
+	CommandId	orig_cid = cid;
 
 	Assert(ItemPointerIsValid(otid));
 
@@ -3728,17 +3729,7 @@ l2:
 	{
 		XLogRecPtr	recptr;
 
-		/*
-		 * For logical decoding we need combo CIDs to properly decode the
-		 * catalog.
-		 */
-		if (RelationIsAccessibleInLogicalDecoding(relation))
-		{
-			log_heap_new_cid(relation, &oldtup);
-			log_heap_new_cid(relation, heaptup);
-		}
-
-		recptr = log_heap_update(relation, buffer,
+		recptr = log_heap_update(relation, orig_cid, buffer,
 								 newbuf, &oldtup, heaptup,
 								 old_key_tuple,
 								 all_visible_cleared,
@@ -8267,9 +8258,13 @@ log_heap_visible(RelFileLocator rlocator, Buffer heap_buffer, Buffer vm_buffer,
 /*
  * Perform XLogInsert for a heap-update operation.  Caller must already
  * have modified the buffer(s) and marked them dirty.
+ *
+ * 'cid' is the command ID performing the update. On the old tuple, it
+ * might've been replaced with a combocid; 'cid' is the original, real
+ * command ID.
  */
 static XLogRecPtr
-log_heap_update(Relation reln, Buffer oldbuf,
+log_heap_update(Relation reln, CommandId cid, Buffer oldbuf,
 				Buffer newbuf, HeapTuple oldtup, HeapTuple newtup,
 				HeapTuple old_key_tuple,
 				bool all_visible_cleared, bool new_all_visible_cleared)
@@ -8284,6 +8279,7 @@ log_heap_update(Relation reln, Buffer oldbuf,
 	XLogRecPtr	recptr;
 	Page		page = BufferGetPage(newbuf);
 	bool		need_tuple_data = RelationIsLogicallyLogged(reln);
+	bool		need_cid = RelationIsAccessibleInLogicalDecoding(reln);
 	bool		init;
 	int			bufflags;
 
@@ -8359,6 +8355,9 @@ log_heap_update(Relation reln, Buffer oldbuf,
 		xlrec.flags |= XLH_UPDATE_PREFIX_FROM_OLD;
 	if (suffixlen > 0)
 		xlrec.flags |= XLH_UPDATE_SUFFIX_FROM_OLD;
+	if (need_cid)
+		xlrec.flags |= XLH_UPDATE_ON_CATALOG_RELATION | XLH_UPDATE_CONTAINS_CID;
+
 	if (need_tuple_data)
 	{
 		xlrec.flags |= XLH_UPDATE_CONTAINS_NEW_TUPLE;
@@ -8403,6 +8402,10 @@ log_heap_update(Relation reln, Buffer oldbuf,
 
 	XLogRegisterData((char *) &xlrec, SizeOfHeapUpdate);
 
+	/* include command id if needed for logical decoding */
+	if (need_cid)
+		XLogRegisterData((char *) &cid, sizeof(CommandId));
+
 	/*
 	 * Prepare WAL data for the new tuple.
 	 */
@@ -8484,78 +8487,6 @@ log_heap_update(Relation reln, Buffer oldbuf,
 	return recptr;
 }
 
-/*
- * Perform XLogInsert of an XLOG_HEAP2_NEW_CID record
- *
- * This is only used in wal_level >= WAL_LEVEL_LOGICAL, and only for catalog
- * tuples.
- */
-static XLogRecPtr
-log_heap_new_cid(Relation relation, HeapTuple tup)
-{
-	xl_heap_new_cid xlrec;
-
-	XLogRecPtr	recptr;
-	HeapTupleHeader hdr = tup->t_data;
-
-	Assert(ItemPointerIsValid(&tup->t_self));
-	Assert(tup->t_tableOid != InvalidOid);
-
-	xlrec.top_xid = GetTopTransactionId();
-	xlrec.target_locator = relation->rd_locator;
-	xlrec.target_tid = tup->t_self;
-
-	/*
-	 * If the tuple got inserted & deleted in the same TX we definitely have a
-	 * combo CID, set cmin and cmax.
-	 */
-	if (hdr->t_infomask & HEAP_COMBOCID)
-	{
-		Assert(!(hdr->t_infomask & HEAP_XMAX_INVALID));
-		Assert(!HeapTupleHeaderXminInvalid(hdr));
-		xlrec.cmin = HeapTupleHeaderGetCmin(hdr);
-		xlrec.cmax = HeapTupleHeaderGetCmax(hdr);
-	}
-	/* No combo CID, so only cmin or cmax can be set by this TX */
-	else
-	{
-		/*
-		 * Tuple inserted.
-		 *
-		 * We need to check for LOCK ONLY because multixacts might be
-		 * transferred to the new tuple in case of FOR KEY SHARE updates in
-		 * which case there will be an xmax, although the tuple just got
-		 * inserted.
-		 */
-		if (hdr->t_infomask & HEAP_XMAX_INVALID ||
-			HEAP_XMAX_IS_LOCKED_ONLY(hdr->t_infomask))
-		{
-			xlrec.cmin = HeapTupleHeaderGetRawCommandId(hdr);
-			xlrec.cmax = InvalidCommandId;
-		}
-		/* Tuple from a different tx updated or deleted. */
-		else
-		{
-			xlrec.cmin = InvalidCommandId;
-			xlrec.cmax = HeapTupleHeaderGetRawCommandId(hdr);
-		}
-	}
-
-	/*
-	 * Note that we don't need to register the buffer here, because this
-	 * operation does not modify the page. The insert/update/delete that
-	 * called us certainly did, but that's WAL-logged separately.
-	 */
-	XLogBeginInsert();
-	XLogRegisterData((char *) &xlrec, SizeOfHeapNewCid);
-
-	/* will be looked at irrespective of origin */
-
-	recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID);
-
-	return recptr;
-}
-
 /*
  * Build a heap tuple representing the configured REPLICA IDENTITY to represent
  * the old tuple in an UPDATE or DELETE.
@@ -9253,13 +9184,29 @@ heap_xlog_multi_insert(XLogReaderState *record)
 	Size		freespace = 0;
 	int			i;
 	bool		isinit = (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) != 0;
+	CommandId	cid = InvalidCommandId;
+	OffsetNumber *offsets = NULL;
 	XLogRedoAction action;
+	char	   *recdata;
 
 	/*
 	 * Insertion doesn't overwrite MVCC data, so no conflict processing is
 	 * required.
 	 */
-	xlrec = (xl_heap_multi_insert *) XLogRecGetData(record);
+	recdata = XLogRecGetData(record);
+	xlrec = (xl_heap_multi_insert *) recdata;
+	recdata += SizeOfHeapMultiInsert;
+
+	if ((xlrec->flags & XLH_INSERT_CONTAINS_CID) != 0)
+	{
+		memcpy(&cid, recdata, sizeof(CommandId));
+		recdata += sizeof(CommandId);
+	}
+	if (!isinit)
+	{
+		offsets = (OffsetNumber *) recdata;
+		recdata += xlrec->ntuples * sizeof(OffsetNumber);
+	}
 
 	XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
 
@@ -9316,7 +9263,7 @@ heap_xlog_multi_insert(XLogReaderState *record)
 			if (isinit)
 				offnum = FirstOffsetNumber + i;
 			else
-				offnum = xlrec->offsets[i];
+				offnum = offsets[i];
 			if (PageGetMaxOffsetNumber(page) + 1 < offnum)
 				elog(PANIC, "invalid max offset number");
 
@@ -9932,13 +9879,6 @@ heap2_redo(XLogReaderState *record)
 		case XLOG_HEAP2_LOCK_UPDATED:
 			heap_xlog_lock_updated(record);
 			break;
-		case XLOG_HEAP2_NEW_CID:
-
-			/*
-			 * Nothing to do on a real replay, only used during logical
-			 * decoding.
-			 */
-			break;
 		case XLOG_HEAP2_REWRITE:
 			heap_xlog_logical_rewrite(record);
 			break;
diff --git a/src/backend/access/rmgrdesc/heapdesc.c b/src/backend/access/rmgrdesc/heapdesc.c
index ae35f2c88dc..e617949cec7 100644
--- a/src/backend/access/rmgrdesc/heapdesc.c
+++ b/src/backend/access/rmgrdesc/heapdesc.c
@@ -165,19 +165,6 @@ heap2_desc(StringInfo buf, XLogReaderState *record)
 						 xlrec->offnum, xlrec->xmax, xlrec->flags);
 		out_infobits(buf, xlrec->infobits_set);
 	}
-	else if (info == XLOG_HEAP2_NEW_CID)
-	{
-		xl_heap_new_cid *xlrec = (xl_heap_new_cid *) rec;
-
-		appendStringInfo(buf, "rel %u/%u/%u; tid %u/%u",
-						 xlrec->target_locator.spcOid,
-						 xlrec->target_locator.dbOid,
-						 xlrec->target_locator.relNumber,
-						 ItemPointerGetBlockNumber(&(xlrec->target_tid)),
-						 ItemPointerGetOffsetNumber(&(xlrec->target_tid)));
-		appendStringInfo(buf, "; cmin: %u, cmax: %u",
-						 xlrec->cmin, xlrec->cmax);
-	}
 }
 
 const char *
@@ -253,9 +240,6 @@ heap2_identify(uint8 info)
 		case XLOG_HEAP2_LOCK_UPDATED:
 			id = "LOCK_UPDATED";
 			break;
-		case XLOG_HEAP2_NEW_CID:
-			id = "NEW_CID";
-			break;
 		case XLOG_HEAP2_REWRITE:
 			id = "REWRITE";
 			break;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index d8962345da4..1b4767b950e 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -51,6 +51,11 @@ static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
+static void DecodeInsertOnCatalog(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeUpdateOnCatalog(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeDeleteOnCatalog(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeMultiInsertOnCatalog(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+
 static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						 xl_xact_parsed_commit *parsed, TransactionId xid,
 						 bool two_phase);
@@ -396,16 +401,8 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		case XLOG_HEAP2_MULTI_INSERT:
 			if (SnapBuildProcessChange(builder, xid, buf->origptr))
 				DecodeMultiInsert(ctx, buf);
+			DecodeMultiInsertOnCatalog(ctx, buf);
 			break;
-		case XLOG_HEAP2_NEW_CID:
-			{
-				xl_heap_new_cid *xlrec;
-
-				xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
-				SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
-
-				break;
-			}
 		case XLOG_HEAP2_REWRITE:
 
 			/*
@@ -455,6 +452,7 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		case XLOG_HEAP_INSERT:
 			if (SnapBuildProcessChange(builder, xid, buf->origptr))
 				DecodeInsert(ctx, buf);
+			DecodeInsertOnCatalog(ctx, buf);
 			break;
 
 			/*
@@ -466,11 +464,13 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		case XLOG_HEAP_UPDATE:
 			if (SnapBuildProcessChange(builder, xid, buf->origptr))
 				DecodeUpdate(ctx, buf);
+			DecodeUpdateOnCatalog(ctx, buf);
 			break;
 
 		case XLOG_HEAP_DELETE:
 			if (SnapBuildProcessChange(builder, xid, buf->origptr))
 				DecodeDelete(ctx, buf);
+			DecodeDeleteOnCatalog(ctx, buf);
 			break;
 
 		case XLOG_HEAP_TRUNCATE:
@@ -1060,6 +1060,173 @@ DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 							 buf->origptr, change, false);
 }
 
+/* If this is an INSERT on a catalog table, extract cmin */
+static void
+DecodeInsertOnCatalog(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_heap_insert *xlrec;
+	RelFileLocator rlocator;
+	BlockNumber blknum;
+	ItemPointerData ctid;
+	CommandId	cid;
+	char	   *recdata;
+
+	recdata = XLogRecGetData(r);
+	/* the command ID, if present, follows the fixed-size xl_heap_insert */
+	xlrec = (xl_heap_insert *) recdata;
+	recdata += SizeOfHeapInsert;
+
+	if (!(xlrec->flags & XLH_INSERT_ON_CATALOG_RELATION))
+		return;
+	Assert((xlrec->flags & XLH_INSERT_CONTAINS_CID) != 0);
+
+	memcpy(&cid, recdata, sizeof(CommandId));
+	recdata += sizeof(CommandId);
+
+	XLogRecGetBlockTag(r, 0, &rlocator, NULL, &blknum);
+
+	ItemPointerSet(&ctid, blknum, xlrec->offnum);
+	/* cid is passed as the new tuple's cmin; no cmax is recorded here */
+	SnapBuildProcessNewCid(ctx->snapshot_builder,
+						   XLogRecGetXid(r),
+						   rlocator,
+						   ctid,
+						   buf->origptr, cid, InvalidCommandId);
+}
+
+/* If this is an UPDATE on a catalog table, extract cmin/cmax */
+static void
+DecodeUpdateOnCatalog(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_heap_update *xlrec;
+	RelFileLocator rlocator;
+	BlockNumber newblk;
+	BlockNumber oldblk;
+	ItemPointerData ctid;
+	CommandId	cid;
+	char	   *recdata;
+
+	recdata = XLogRecGetData(r);
+	xlrec = (xl_heap_update *) recdata;
+	recdata += SizeOfHeapUpdate;
+
+	if (!(xlrec->flags & XLH_UPDATE_ON_CATALOG_RELATION))
+		return;
+	Assert((xlrec->flags & XLH_UPDATE_CONTAINS_CID) != 0);
+	/* one command ID serves as both the new tuple's cmin and the old's cmax */
+	memcpy(&cid, recdata, sizeof(CommandId));
+	recdata += sizeof(CommandId);
+
+	/* cmin on new tuple (block 0 of the record) */
+	XLogRecGetBlockTag(r, 0, &rlocator, NULL, &newblk);
+	ItemPointerSet(&ctid, newblk, xlrec->new_offnum);
+	SnapBuildProcessNewCid(ctx->snapshot_builder,
+						   XLogRecGetXid(r),
+						   rlocator,
+						   ctid,
+						   buf->origptr, cid, InvalidCommandId);
+
+	/* cmax on old tuple; same block as new tuple if block 1 is absent */
+	if (!XLogRecGetBlockTagExtended(r, 1, NULL, NULL, &oldblk, NULL))
+		oldblk = newblk;
+	ItemPointerSet(&ctid, oldblk, xlrec->old_offnum);
+	SnapBuildProcessNewCid(ctx->snapshot_builder,
+						   XLogRecGetXid(r),
+						   rlocator,
+						   ctid,
+						   buf->origptr, InvalidCommandId, cid);
+}
+
+/* If this is a DELETE on a catalog table, extract cmax */
+static void
+DecodeDeleteOnCatalog(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_heap_delete *xlrec;
+	RelFileLocator rlocator;
+	BlockNumber blknum;
+	ItemPointerData ctid;
+	CommandId	cid;
+	char	   *recdata;
+
+	recdata = XLogRecGetData(r);
+	/* the command ID, if present, follows the fixed-size xl_heap_delete */
+	xlrec = (xl_heap_delete *) recdata;
+	recdata += SizeOfHeapDelete;
+
+	if (!(xlrec->flags & XLH_DELETE_ON_CATALOG_RELATION))
+		return;
+	Assert((xlrec->flags & XLH_DELETE_CONTAINS_CID) != 0);
+
+	memcpy(&cid, recdata, sizeof(CommandId));
+	recdata += sizeof(CommandId);
+	/* the deleted tuple is at xlrec->offnum in block 0; cid is its cmax */
+	XLogRecGetBlockTag(r, 0, &rlocator, NULL, &blknum);
+	ItemPointerSet(&ctid, blknum, xlrec->offnum);
+	SnapBuildProcessNewCid(ctx->snapshot_builder,
+						   XLogRecGetXid(r),
+						   rlocator,
+						   ctid,
+						   buf->origptr, InvalidCommandId, cid);
+}
+
+/* If this is a multi-INSERT on a catalog table, extract cmin */
+static void
+DecodeMultiInsertOnCatalog(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_heap_multi_insert *xlrec;
+	RelFileLocator rlocator;
+	BlockNumber blknum;
+	ItemPointerData ctid;
+	bool		isinit;
+	CommandId	cid;
+	OffsetNumber *offsets = NULL;
+	char	   *recdata;
+
+	recdata = XLogRecGetData(r);
+	/* fixed-size header first, then the command ID, then the offsets array */
+	xlrec = (xl_heap_multi_insert *) recdata;
+	recdata += SizeOfHeapMultiInsert;
+
+	if (!(xlrec->flags & XLH_INSERT_ON_CATALOG_RELATION))
+		return;
+	Assert((xlrec->flags & XLH_INSERT_CONTAINS_CID) != 0);
+
+	memcpy(&cid, recdata, sizeof(CommandId));
+	recdata += sizeof(CommandId);
+	/* the offsets array is present unless XLOG_HEAP_INIT_PAGE is set */
+	isinit = (XLogRecGetInfo(r) & XLOG_HEAP_INIT_PAGE) != 0;
+	if (!isinit)
+	{
+		offsets = (OffsetNumber *) recdata;
+		recdata += xlrec->ntuples * sizeof(OffsetNumber);
+	}
+
+	XLogRecGetBlockTag(r, 0, &rlocator, NULL, &blknum);
+
+	/* Record the same command ID for all the inserted tuples */
+	ItemPointerSetBlockNumber(&ctid, blknum);
+	for (int i = 0; i < xlrec->ntuples; i++)
+	{
+		OffsetNumber offnum;
+		/* with INIT_PAGE, tuples sit at consecutive offsets from the first */
+		if (isinit)
+			offnum = FirstOffsetNumber + i;
+		else
+			offnum = offsets[i];
+		ItemPointerSetOffsetNumber(&ctid, offnum);
+
+		SnapBuildProcessNewCid(ctx->snapshot_builder,
+							   XLogRecGetXid(r),
+							   rlocator,
+							   ctid,
+							   buf->origptr, cid, InvalidCommandId);
+	}
+}
+
 /*
  * Decode XLOG_HEAP2_MULTI_INSERT_insert record into multiple tuplebufs.
  *
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 1e7c7e55f45..5aeab8b40af 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1137,6 +1137,8 @@ static void
 ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
 								  ReorderBufferTXN *subtxn)
 {
+	dlist_mutable_iter iter;
+
 	Assert(subtxn->toplevel_xid == txn->xid);
 
 	if (subtxn->base_snapshot != NULL)
@@ -1181,6 +1183,28 @@ ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
 			subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
 		}
 	}
+
+	/*
+	 * Also move the tuplecids entries to the parent
+	 */
+	dlist_foreach_modify(iter, &subtxn->tuplecids)
+	{
+		ReorderBufferChange *change;
+
+		change = dlist_container(ReorderBufferChange, node, iter.cur);
+
+		/* Check we're not mixing changes from different transactions. */
+		Assert(change->txn == subtxn);
+		Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
+
+		dlist_delete(&change->node);
+		subtxn->ntuplecids--;
+
+		dlist_push_tail(&txn->tuplecids, &change->node);
+		txn->ntuplecids++;
+
+		change->txn = txn;
+	}
 }
 
 /*
@@ -1542,7 +1566,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 	/*
 	 * Cleanup the tuplecids we stored for decoding catalog snapshot access.
-	 * They are always stored in the toplevel transaction.
 	 */
 	dlist_foreach_modify(iter, &txn->tuplecids)
 	{
@@ -1672,8 +1695,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
 	{
 		/*
 		 * If this is a prepared txn, cleanup the tuplecids we stored for
-		 * decoding catalog snapshot access. They are always stored in the
-		 * toplevel transaction.
+		 * decoding catalog snapshot access.
 		 */
 		dlist_foreach_modify(iter, &txn->tuplecids)
 		{
@@ -1776,10 +1798,10 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		else
 		{
 			/*
-			 * Maybe we already saw this tuple before in this transaction, but
-			 * if so it must have the same cmin.
+			 * Maybe we already saw this tuple before in this transaction.
+			 * Cmin should be set only in the original insert, and not change afterwards.
 			 */
-			Assert(ent->cmin == change->data.tuplecid.cmin);
+			Assert(change->data.tuplecid.cmin == InvalidCommandId);
 
 			/*
 			 * cmax may be initially invalid, but once set it can only grow,
@@ -3231,7 +3253,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 }
 
 /*
- * Add new (relfilelocator, tid) -> (cmin, cmax) mappings.
+ * Add new (relfilelocator, tid) -> (cmin, cmax) mapping.
  *
  * We do not include this change type in memory accounting, because we
  * keep CIDs in a separate list and do not evict them when reaching
@@ -3248,6 +3270,12 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
 
 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
+	if (TransactionIdIsValid(txn->toplevel_xid))
+	{
+		xid = txn->toplevel_xid;
+		txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+	}
+
 	change->data.tuplecid.locator = locator;
 	change->data.tuplecid.tid = tid;
 	change->data.tuplecid.cmin = cmin;
@@ -5090,7 +5118,10 @@ ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
 			 * old records that did not yet have a cmax (e.g. pg_class' own
 			 * entry while rewriting it) during rewrites, so allow that.
 			 */
-			Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
+			if (new_ent->cmin == InvalidCommandId)
+				new_ent->cmin = ent->cmin;
+			else
+				Assert(new_ent->cmin == ent->cmin);
 			Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
 		}
 		else
@@ -5214,7 +5245,7 @@ UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
 /*
  * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
- * combo CIDs.
+ * cmin/cmax stored on the tuple.
  */
 bool
 ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index a86d5b1c74d..da081f0694f 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -41,16 +41,15 @@
  * transactions we need Snapshots that see intermediate versions of the
  * catalog in a transaction. During normal operation this is achieved by using
  * CommandIds/cmin/cmax. The problem with that however is that for space
- * efficiency reasons, the cmin and cmax are not included in WAL records. We
- * cannot read the cmin/cmax from the tuple itself, either, because it is
- * reset on crash recovery. Even if we could, we could not decode combocids
+ * efficiency reasons, the cmin and cmax are not normally included in WAL
+ * records. We cannot read the value from the tuple itself, either, because it
+ * is reset on crash recovery. Even if we could, we could not decode combocids
  * which are only tracked in the original backend's memory. To work around
- * that, heapam writes an extra WAL record (XLOG_HEAP2_NEW_CID) every time a
- * catalog row is modified, which includes the cmin and cmax of the
- * tuple. During decoding, we insert the ctid->(cmin,cmax) mappings into the
- * reorder buffer, and use them at visibility checks instead of the cmin/cmax
- * on the tuple itself. Check the reorderbuffer.c's comment above
- * ResolveCminCmaxDuringDecoding() for details.
+ * that, we do include the command ID in heap records on catalog tables, if
+ * wal_level >= logical. During decoding, we insert the ctid -> (cmin,cmax)
+ * mappings into the reorder buffer, and use them at visibility checks instead
+ * of the cmin/cmax on the tuple itself. Check the reorderbuffer.c's comment
+ * above ResolveCminCmaxDuringDecoding() for details.
  *
  * To facilitate all this we need our own visibility routine, as the normal
  * ones are optimized for different usecases.
@@ -811,13 +810,14 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 }
 
 /*
- * Do CommandId/combo CID handling after reading an xl_heap_new_cid record.
- * This implies that a transaction has done some form of write to system
- * catalogs.
+ * Do CommandId/combo CID handling after reading a heap insert/update/delete
+ * record on a catalog table. This implies that a transaction has done some
+ * form of write to system catalogs.
  */
 void
 SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
-					   XLogRecPtr lsn, xl_heap_new_cid *xlrec)
+					   RelFileLocator rlocator, ItemPointerData ctid,
+					   XLogRecPtr lsn, CommandId cmin, CommandId cmax)
 {
 	CommandId	cid;
 
@@ -827,18 +827,17 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
 	 */
 	ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
 
-	ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
-								 xlrec->target_locator, xlrec->target_tid,
-								 xlrec->cmin, xlrec->cmax);
+	ReorderBufferAddNewTupleCids(builder->reorder, xid, lsn,
+								 rlocator, ctid,
+								 cmin, cmax);
 
 	/* figure out new command id */
-	if (xlrec->cmin != InvalidCommandId &&
-		xlrec->cmax != InvalidCommandId)
-		cid = Max(xlrec->cmin, xlrec->cmax);
-	else if (xlrec->cmax != InvalidCommandId)
-		cid = xlrec->cmax;
-	else if (xlrec->cmin != InvalidCommandId)
-		cid = xlrec->cmin;
+	if (cmin != InvalidCommandId && cmax != InvalidCommandId)
+		cid = Max(cmin, cmax);
+	else if (cmax != InvalidCommandId)
+		cid = cmax;
+	else if (cmin != InvalidCommandId)
+		cid = cmin;
 	else
 	{
 		cid = InvalidCommandId; /* silence compiler */
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index d3b475e3144..13029a8bc50 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -57,7 +57,7 @@
 #define XLOG_HEAP2_VISIBLE		0x40
 #define XLOG_HEAP2_MULTI_INSERT 0x50
 #define XLOG_HEAP2_LOCK_UPDATED 0x60
-#define XLOG_HEAP2_NEW_CID		0x70
+/* 0x70 was XLOG_HEAP2_NEW_CID */
 
 /*
  * xl_heap_insert/xl_heap_multi_insert flag values, 8 bits are available.
@@ -68,12 +68,13 @@
 #define XLH_INSERT_IS_SPECULATIVE				(1<<2)
 #define XLH_INSERT_CONTAINS_NEW_TUPLE			(1<<3)
 #define XLH_INSERT_ON_TOAST_RELATION			(1<<4)
-
 /* all_frozen_set always implies all_visible_set */
 #define XLH_INSERT_ALL_FROZEN_SET				(1<<5)
+#define XLH_INSERT_CONTAINS_CID					(1<<6)
+#define XLH_INSERT_ON_CATALOG_RELATION			(1<<7)
 
 /*
- * xl_heap_update flag values, 8 bits are available.
+ * xl_heap_update flag values, 16 bits are available.
  */
 /* PD_ALL_VISIBLE was cleared */
 #define XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED		(1<<0)
@@ -84,6 +85,8 @@
 #define XLH_UPDATE_CONTAINS_NEW_TUPLE			(1<<4)
 #define XLH_UPDATE_PREFIX_FROM_OLD				(1<<5)
 #define XLH_UPDATE_SUFFIX_FROM_OLD				(1<<6)
+#define XLH_UPDATE_CONTAINS_CID					(1<<7)
+#define XLH_UPDATE_ON_CATALOG_RELATION			(1<<8)
 
 /* convenience macro for checking whether any form of old tuple was logged */
 #define XLH_UPDATE_CONTAINS_OLD						\
@@ -98,6 +101,8 @@
 #define XLH_DELETE_CONTAINS_OLD_KEY				(1<<2)
 #define XLH_DELETE_IS_SUPER						(1<<3)
 #define XLH_DELETE_IS_PARTITION_MOVE			(1<<4)
+#define XLH_DELETE_CONTAINS_CID					(1<<5)
+#define XLH_DELETE_ON_CATALOG_RELATION			(1<<6)
 
 /* convenience macro for checking whether any form of old tuple was logged */
 #define XLH_DELETE_CONTAINS_OLD						\
@@ -110,6 +115,8 @@ typedef struct xl_heap_delete
 	OffsetNumber offnum;		/* deleted tuple's offset */
 	uint8		infobits_set;	/* infomask bits */
 	uint8		flags;
+
+	/* If XLH_DELETE_CONTAINS_CID is set, command ID follows */
 } xl_heap_delete;
 
 #define SizeOfHeapDelete	(offsetof(xl_heap_delete, flags) + sizeof(uint8))
@@ -156,6 +163,8 @@ typedef struct xl_heap_insert
 	OffsetNumber offnum;		/* inserted tuple's offset */
 	uint8		flags;
 
+	/* If XLH_INSERT_CONTAINS_CID is set, command ID follows */
+
 	/* xl_heap_header & TUPLE DATA in backup block 0 */
 } xl_heap_insert;
 
@@ -176,10 +185,14 @@ typedef struct xl_heap_multi_insert
 {
 	uint8		flags;
 	uint16		ntuples;
-	OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER];
+
+	/* If XLH_INSERT_CONTAINS_CID is set, command ID follows */
+
+	/* Unless XLOG_HEAP_INIT_PAGE is set, offsets array follows */
+
 } xl_heap_multi_insert;
 
-#define SizeOfHeapMultiInsert	offsetof(xl_heap_multi_insert, offsets)
+#define SizeOfHeapMultiInsert	(offsetof(xl_heap_multi_insert, ntuples) + sizeof(uint16))
 
 typedef struct xl_multi_insert_tuple
 {
@@ -212,11 +225,13 @@ typedef struct xl_multi_insert_tuple
 typedef struct xl_heap_update
 {
 	TransactionId old_xmax;		/* xmax of the old tuple */
-	OffsetNumber old_offnum;	/* old tuple's offset */
-	uint8		old_infobits_set;	/* infomask bits to set on old tuple */
-	uint8		flags;
 	TransactionId new_xmax;		/* xmax of the new tuple */
+	OffsetNumber old_offnum;	/* old tuple's offset */
 	OffsetNumber new_offnum;	/* new tuple's offset */
+	uint16		flags;
+	uint8		old_infobits_set;	/* infomask bits to set on old tuple */
+
+	/* If XLH_UPDATE_CONTAINS_CID is set, command ID follows */
 
 	/*
 	 * If XLH_UPDATE_CONTAINS_OLD_TUPLE or XLH_UPDATE_CONTAINS_OLD_KEY flags
@@ -224,7 +239,7 @@ typedef struct xl_heap_update
 	 */
 } xl_heap_update;
 
-#define SizeOfHeapUpdate	(offsetof(xl_heap_update, new_offnum) + sizeof(OffsetNumber))
+#define SizeOfHeapUpdate	(offsetof(xl_heap_update, old_infobits_set) + sizeof(uint8))
 
 /*
  * This is what we need to know about page pruning (both during VACUUM and
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index d13cef4eae2..0fd783dc98d 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -356,8 +356,7 @@ typedef struct ReorderBufferTXN
 
 	/*
 	 * List of (relation, ctid) => (cmin, cmax) mappings for catalog tuples.
-	 * Those are always assigned to the toplevel transaction. (Keep track of
-	 * #entries to create a hash of the right size)
+	 * (Keep track of #entries to create a hash of the right size)
 	 */
 	dlist_head	tuplecids;
 	uint64		ntuplecids;
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index f49b941b53e..11cad6c5363 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -85,8 +85,8 @@ extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
 extern bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid,
 								   XLogRecPtr lsn);
 extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
-								   XLogRecPtr lsn,
-								   struct xl_heap_new_cid *xlrec);
+								   RelFileLocator rlocator, ItemPointerData ctid,
+								   XLogRecPtr lsn, CommandId cmin, CommandId cmax);
 extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn,
 										 struct xl_running_xacts *running);
 extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn);
-- 
2.30.2

