From f2cd3a14f9513b880becedd95c933dad53c7d9e3 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilip.kumar@enterprisedb.com>
Date: Mon, 18 Nov 2019 09:53:08 +0530
Subject: [PATCH v6 01/12] Immediately WAL-log assignments

The logical decoding infrastructure needs to know which top-level
transaction a subxact belongs to, in order to decode all the
changes. Until now that information might be delayed until commit, due
to the caching (PGPROC_MAX_CACHED_SUBXIDS), preventing features that
require incremental decoding.

So we also write the assignment info into WAL immediately, as part
of the next WAL record (to minimize overhead).  However, we cannot
remove the existing XLOG_XACT_ASSIGNMENT WAL record, as that is required
to avoid overflow in the hot standby snapshot.
---
 src/backend/access/transam/xact.c        | 45 ++++++++++++++++++++++++
 src/backend/access/transam/xloginsert.c  | 22 ++++++++++--
 src/backend/access/transam/xlogreader.c  |  5 +++
 src/backend/replication/logical/decode.c | 39 ++++++++++----------
 src/include/access/xact.h                |  3 ++
 src/include/access/xlog.h                |  1 +
 src/include/access/xlogreader.h          |  3 ++
 src/include/access/xlogrecord.h          |  1 +
 8 files changed, 98 insertions(+), 21 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 017f03b6d8..51557e2951 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -190,6 +190,7 @@ typedef struct TransactionStateData
 	bool		didLogXid;		/* has xid been included in WAL record? */
 	int			parallelModeLevel;	/* Enter/ExitParallelMode counter */
 	bool		chain;			/* start a new block after this one */
+	bool		assigned;		/* assigned to toplevel XID */
 	struct TransactionStateData *parent;	/* back link to parent */
 } TransactionStateData;
 
@@ -5096,6 +5097,7 @@ PushTransaction(void)
 	GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);
 	s->prevXactReadOnly = XactReadOnly;
 	s->parallelModeLevel = 0;
+	s->assigned = false;
 
 	CurrentTransactionState = s;
 
@@ -6002,3 +6004,46 @@ xact_redo(XLogReaderState *record)
 	else
 		elog(PANIC, "xact_redo: unknown op code %u", info);
 }
+
+/*
+ *	IsSubTransactionAssignmentPending
+ *
+ *	Returns true if logical WAL info is active and we are inside a valid
+ *	subtransaction with an XID whose assignment is not yet in any WAL record.
+ */
+bool
+IsSubTransactionAssignmentPending(void)
+{
+	if (!XLogLogicalInfoActive())
+		return false;
+
+	/* we need to be in a valid transaction state */
+	if (!IsTransactionState())
+		return false;
+
+	/* it has to be a subtransaction */
+	if (!IsSubTransaction())
+		return false;
+
+	/* the subtransaction has to have an XID assigned */
+	if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
+		return false;
+
+	/* and its assignment must not have been marked as completed yet */
+	return !CurrentTransactionState->assigned;
+
+}
+
+/*
+ *	MarkSubTransactionAssigned
+ *
+ *	Mark the current subtransaction's toplevel assignment as WAL-logged.
+ */
+void
+MarkSubTransactionAssigned(void)
+{
+	Assert(IsSubTransactionAssignmentPending());
+
+	CurrentTransactionState->assigned = true;
+
+}
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index 2fa0a7f667..b11b0c2940 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -88,11 +88,13 @@ static XLogRecData hdr_rdt;
 static char *hdr_scratch = NULL;
 
 #define SizeOfXlogOrigin	(sizeof(RepOriginId) + sizeof(char))
+#define SizeOfTransactionId	(sizeof(TransactionId) + sizeof(char))
 
 #define HEADER_SCRATCH_SIZE \
 	(SizeOfXLogRecord + \
 	 MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
-	 SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)
+	 SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
+	 SizeOfTransactionId)
 
 /*
  * An array of XLogRecData structs, to hold registered data.
@@ -194,6 +196,10 @@ XLogResetInsertion(void)
 {
 	int			i;
 
+	/* mark the subxact as assigned if the toplevel XID was included */
+	if (curinsert_flags & XLOG_INCLUDE_XID)
+		MarkSubTransactionAssigned();
+
 	for (i = 0; i < max_registered_block_id; i++)
 		registered_buffers[i].in_use = false;
 
@@ -397,7 +403,7 @@ void
 XLogSetRecordFlags(uint8 flags)
 {
 	Assert(begininsert_called);
-	curinsert_flags = flags;
+	curinsert_flags |= flags;
 }
 
 /*
@@ -743,6 +749,18 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		scratch += sizeof(replorigin_session_origin);
 	}
 
+	/* followed by toplevel XID, if not already included in previous record */
+	if (IsSubTransactionAssignmentPending())
+	{
+		TransactionId	xid = GetTopTransactionIdIfAny();
+
+		/* update the flag (later checked by XLogResetInsertion) */
+		curinsert_flags |= XLOG_INCLUDE_XID;
+		*(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
+		memcpy(scratch, &xid, sizeof(TransactionId));
+		scratch += sizeof(TransactionId);
+	}
+
 	/* followed by main data, if any */
 	if (mainrdata_len > 0)
 	{
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 3aa68127a3..8c281e821a 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1165,6 +1165,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 
 	state->decoded_record = record;
 	state->record_origin = InvalidRepOriginId;
+	state->toplevel_xid = InvalidTransactionId;
 
 	ptr = (char *) record;
 	ptr += SizeOfXLogRecord;
@@ -1203,6 +1204,10 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 		{
 			COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
 		}
+		else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID)
+		{
+			COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId));
+		}
 		else if (block_id <= XLR_MAX_BLOCK_ID)
 		{
 			/* XLogRecordBlockHeader */
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5e1dc8a651..a99fcaf0a1 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -93,12 +93,28 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 void
 LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
 {
-	XLogRecordBuffer buf;
+	XLogRecordBuffer	buf;
+	TransactionId		txid;
 
 	buf.origptr = ctx->reader->ReadRecPtr;
 	buf.endptr = ctx->reader->EndRecPtr;
 	buf.record = record;
 
+	txid = XLogRecGetTopXid(record);
+
+	/*
+	 * If the toplevel_xid is valid, we need to assign the subxact to the
+	 * toplevel transaction. We need to do this for all records, hence we
+	 * do it before the switch.
+	 */
+	if (TransactionIdIsValid(txid))
+	{
+		ReorderBufferAssignChild(ctx->reorder,
+								 record->toplevel_xid,
+								 record->decoded_record->xl_xid,
+								 buf.origptr);
+	}
+
 	/* cast so we get a warning when new rmgrs are added */
 	switch ((RmgrIds) XLogRecGetRmid(record))
 	{
@@ -217,12 +233,12 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	 * If the snapshot isn't yet fully built, we cannot decode anything, so
 	 * bail out.
 	 *
-	 * However, it's critical to process XLOG_XACT_ASSIGNMENT records even
+	 * However, it's critical to process records with subxid assignment even
 	 * when the snapshot is being built: it is possible to get later records
 	 * that require subxids to be properly assigned.
 	 */
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT &&
-		info != XLOG_XACT_ASSIGNMENT)
+		!TransactionIdIsValid(r->toplevel_xid))
 		return;
 
 	switch (info)
@@ -264,22 +280,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				break;
 			}
 		case XLOG_XACT_ASSIGNMENT:
-			{
-				xl_xact_assignment *xlrec;
-				int			i;
-				TransactionId *sub_xid;
-
-				xlrec = (xl_xact_assignment *) XLogRecGetData(r);
-
-				sub_xid = &xlrec->xsub[0];
-
-				for (i = 0; i < xlrec->nsubxacts; i++)
-				{
-					ReorderBufferAssignChild(reorder, xlrec->xtop,
-											 *(sub_xid++), buf->origptr);
-				}
-				break;
-			}
+			break;
 		case XLOG_XACT_PREPARE:
 
 			/*
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 7ee04babc2..8645b3816c 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -428,6 +428,9 @@ extern void UnregisterXactCallback(XactCallback callback, void *arg);
 extern void RegisterSubXactCallback(SubXactCallback callback, void *arg);
 extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg);
 
+extern bool IsSubTransactionAssignmentPending(void);
+extern void MarkSubTransactionAssigned(void);
+
 extern int	xactGetCommittedChildren(TransactionId **ptr);
 
 extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 98b033fc20..e23892ab87 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -227,6 +227,7 @@ extern bool XLOG_DEBUG;
  */
 #define XLOG_INCLUDE_ORIGIN		0x01	/* include the replication origin */
 #define XLOG_MARK_UNIMPORTANT	0x02	/* record not important for durability */
+#define XLOG_INCLUDE_XID		0x04	/* include XID of toplevel xact */
 
 
 /* Checkpoint statistics */
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index f7cc8c4e1d..4805caec67 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -147,6 +147,8 @@ struct XLogReaderState
 
 	RepOriginId record_origin;
 
+	TransactionId	toplevel_xid;	/* XID of toplevel transaction */
+
 	/* information about blocks referenced by the record. */
 	DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1];
 
@@ -280,6 +282,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
 #define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid)
 #define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid)
 #define XLogRecGetOrigin(decoder) ((decoder)->record_origin)
+#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid)
 #define XLogRecGetData(decoder) ((decoder)->main_data)
 #define XLogRecGetDataLen(decoder) ((decoder)->main_data_len)
 #define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0)
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index acd9af0194..2f0c8bf589 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -223,5 +223,6 @@ typedef struct XLogRecordDataHeaderLong
 #define XLR_BLOCK_ID_DATA_SHORT		255
 #define XLR_BLOCK_ID_DATA_LONG		254
 #define XLR_BLOCK_ID_ORIGIN			253
+#define XLR_BLOCK_ID_TOPLEVEL_XID	252
 
 #endif							/* XLOGRECORD_H */
-- 
2.20.1

