>From 06bc3275788b494306c717019c9b1b6082ea579a Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Fri, 20 Feb 2015 12:30:57 +0100
Subject: [PATCH] Debloat and deduplicate transaction commit/abort records.

---
 src/backend/access/rmgrdesc/xactdesc.c   | 189 +++++++++-----
 src/backend/access/transam/twophase.c    |  56 ++---
 src/backend/access/transam/xact.c        | 416 +++++++++++++++++++------------
 src/backend/access/transam/xlog.c        |  16 +-
 src/backend/replication/logical/decode.c | 101 +++-----
 src/include/access/xact.h                | 162 ++++++++----
 6 files changed, 564 insertions(+), 376 deletions(-)

diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 3e87978..0686d55 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -19,48 +19,143 @@
 #include "storage/sinval.h"
 #include "utils/timestamp.h"
 
+/* Parse the WAL format of a xact commit into an easier to understand format. */
+void
+ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed)
+{
+	char	   *data = ((char *) xlrec) + MinSizeOfXactCommit;	/* past fixed part */
+
+	memset(parsed, 0, sizeof(*parsed));	/* absent sections stay zero/NULL */
+
+	parsed->xinfo = 0; /* default, if no XLOG_XACT_HAS_INFO is present */
+
+	if (info & XLOG_XACT_HAS_INFO)
+	{
+		parsed->xinfo = *(uint32 *) data;
+
+		data += sizeof(uint32);
+	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_DBINFO)
+	{
+		xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data;
+
+		parsed->dbId = xl_dbinfo->dbId;
+		parsed->tsId = xl_dbinfo->tsId;
+
+		data += sizeof(xl_xact_dbinfo);
+	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
+	{
+		xl_xact_subxacts   *xl_subxacts = (xl_xact_subxacts *) data;
+
+		parsed->nsubxacts = xl_subxacts->nsubxacts;
+		parsed->subxacts = xl_subxacts->subxacts;	/* points into the record */
+
+		data += MinSizeOfXactSubxacts;
+		data += parsed->nsubxacts * sizeof(TransactionId);
+	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_RELFILENODES)
+	{
+		xl_xact_relfilenodes *xl_relfilenodes = (xl_xact_relfilenodes *) data;
+
+		parsed->nrels = xl_relfilenodes->nrels;
+		parsed->xnodes = xl_relfilenodes->xnodes;	/* points into the record */
+
+		data += MinSizeOfXactRelfilenodes;
+		data += xl_relfilenodes->nrels * sizeof(RelFileNode);
+	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_INVALS)
+	{
+		xl_xact_invals *xl_invals = (xl_xact_invals *) data;
+
+		parsed->nmsgs = xl_invals->nmsgs;
+		parsed->msgs = xl_invals->msgs;	/* points into the record */
+
+		data += MinSizeOfXactInvals;
+		data += xl_invals->nmsgs * sizeof(SharedInvalidationMessage);
+	}
+}
+
+/* Parse the WAL format of a xact abort into an easier to understand format. */
+void
+ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
+{
+	char	   *data = ((char *) xlrec) + MinSizeOfXactAbort;	/* past fixed part */
+
+	memset(parsed, 0, sizeof(*parsed));	/* absent sections stay zero/NULL */
+
+	parsed->xinfo = 0; /* default, if no XLOG_XACT_HAS_INFO is present */
+
+	if (info & XLOG_XACT_HAS_INFO)
+	{
+		parsed->xinfo = *(uint32 *) data;
+		data += sizeof(uint32);
+	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
+	{
+		xl_xact_subxacts   *xl_subxacts = (xl_xact_subxacts *) data;
+
+		parsed->nsubxacts = xl_subxacts->nsubxacts;
+		parsed->subxacts = xl_subxacts->subxacts;	/* points into the record */
+
+		data += MinSizeOfXactSubxacts;
+		data += parsed->nsubxacts * sizeof(TransactionId);
+	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_RELFILENODES)
+	{
+		xl_xact_relfilenodes *xl_relfilenodes = (xl_xact_relfilenodes *) data;
+
+		parsed->nrels = xl_relfilenodes->nrels;
+		parsed->xnodes = xl_relfilenodes->xnodes;	/* points into the record */
+
+		data += MinSizeOfXactRelfilenodes;
+		data += xl_relfilenodes->nrels * sizeof(RelFileNode);
+	}
+}
 
 static void
-xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
+xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec)
 {
+	xl_xact_parsed_commit parsed_c;
 	int			i;
-	TransactionId *subxacts;
 
-	subxacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
+	ParseCommitRecord(info, xlrec, &parsed_c);
 
 	appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
 
-	if (xlrec->nrels > 0)
+	if (parsed_c.nrels > 0)
 	{
 		appendStringInfoString(buf, "; rels:");
-		for (i = 0; i < xlrec->nrels; i++)
+		for (i = 0; i < parsed_c.nrels; i++)
 		{
-			char	   *path = relpathperm(xlrec->xnodes[i], MAIN_FORKNUM);
+			char	   *path = relpathperm(parsed_c.xnodes[i], MAIN_FORKNUM);
 
 			appendStringInfo(buf, " %s", path);
 			pfree(path);
 		}
 	}
-	if (xlrec->nsubxacts > 0)
+	if (parsed_c.nsubxacts > 0)
 	{
 		appendStringInfoString(buf, "; subxacts:");
-		for (i = 0; i < xlrec->nsubxacts; i++)
-			appendStringInfo(buf, " %u", subxacts[i]);
+		for (i = 0; i < parsed_c.nsubxacts; i++)
+			appendStringInfo(buf, " %u", parsed_c.subxacts[i]);
 	}
-	if (xlrec->nmsgs > 0)
+	if (parsed_c.nmsgs > 0)
 	{
-		SharedInvalidationMessage *msgs;
-
-		msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
-
-		if (XactCompletionRelcacheInitFileInval(xlrec->xinfo))
+		if (XactCompletionRelcacheInitFileInval(parsed_c.xinfo))
 			appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
-							 xlrec->dbId, xlrec->tsId);
+							 parsed_c.dbId, parsed_c.tsId);
 
 		appendStringInfoString(buf, "; inval msgs:");
-		for (i = 0; i < xlrec->nmsgs; i++)
+		for (i = 0; i < parsed_c.nmsgs; i++)
 		{
-			SharedInvalidationMessage *msg = &msgs[i];
+			SharedInvalidationMessage *msg = &parsed_c.msgs[i];
 
 			if (msg->id >= 0)
 				appendStringInfo(buf, " catcache %d", msg->id);
@@ -83,45 +178,31 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
 }
 
 static void
-xact_desc_commit_compact(StringInfo buf, xl_xact_commit_compact *xlrec)
+xact_desc_abort(StringInfo buf, uint8 info, xl_xact_abort *xlrec)
 {
+	xl_xact_parsed_abort parsed_a;
 	int			i;
 
-	appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
-
-	if (xlrec->nsubxacts > 0)
-	{
-		appendStringInfoString(buf, "; subxacts:");
-		for (i = 0; i < xlrec->nsubxacts; i++)
-			appendStringInfo(buf, " %u", xlrec->subxacts[i]);
-	}
-}
-
-static void
-xact_desc_abort(StringInfo buf, xl_xact_abort *xlrec)
-{
-	int			i;
+	ParseAbortRecord(info, xlrec, &parsed_a);
 
 	appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
-	if (xlrec->nrels > 0)
+	if (parsed_a.nrels > 0)
 	{
 		appendStringInfoString(buf, "; rels:");
-		for (i = 0; i < xlrec->nrels; i++)
+		for (i = 0; i < parsed_a.nrels; i++)
 		{
-			char	   *path = relpathperm(xlrec->xnodes[i], MAIN_FORKNUM);
+			char	   *path = relpathperm(parsed_a.xnodes[i], MAIN_FORKNUM);
 
 			appendStringInfo(buf, " %s", path);
 			pfree(path);
 		}
 	}
-	if (xlrec->nsubxacts > 0)
-	{
-		TransactionId *xacts = (TransactionId *)
-		&xlrec->xnodes[xlrec->nrels];
 
+	if (parsed_a.nsubxacts > 0)
+	{
 		appendStringInfoString(buf, "; subxacts:");
-		for (i = 0; i < xlrec->nsubxacts; i++)
-			appendStringInfo(buf, " %u", xacts[i]);
+		for (i = 0; i < parsed_a.nsubxacts; i++)
+			appendStringInfo(buf, " %u", parsed_a.subxacts[i]);
 	}
 }
 
@@ -142,37 +223,33 @@ xact_desc(StringInfo buf, XLogReaderState *record)
 	char	   *rec = XLogRecGetData(record);
 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
 
-	if (info == XLOG_XACT_COMMIT_COMPACT)
-	{
-		xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) rec;
+	info &= XLOG_XACT_OPMASK;
 
-		xact_desc_commit_compact(buf, xlrec);
-	}
-	else if (info == XLOG_XACT_COMMIT)
+	if (info == XLOG_XACT_COMMIT)
 	{
 		xl_xact_commit *xlrec = (xl_xact_commit *) rec;
 
-		xact_desc_commit(buf, xlrec);
+		xact_desc_commit(buf, XLogRecGetInfo(record), xlrec);
 	}
 	else if (info == XLOG_XACT_ABORT)
 	{
 		xl_xact_abort *xlrec = (xl_xact_abort *) rec;
 
-		xact_desc_abort(buf, xlrec);
+		xact_desc_abort(buf, XLogRecGetInfo(record), xlrec);
 	}
 	else if (info == XLOG_XACT_COMMIT_PREPARED)
 	{
 		xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) rec;
 
 		appendStringInfo(buf, "%u: ", xlrec->xid);
-		xact_desc_commit(buf, &xlrec->crec);
+		xact_desc_commit(buf, XLogRecGetInfo(record), &xlrec->crec);
 	}
 	else if (info == XLOG_XACT_ABORT_PREPARED)
 	{
 		xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) rec;
 
 		appendStringInfo(buf, "%u: ", xlrec->xid);
-		xact_desc_abort(buf, &xlrec->arec);
+		xact_desc_abort(buf, XLogRecGetInfo(record), &xlrec->arec);
 	}
 	else if (info == XLOG_XACT_ASSIGNMENT)
 	{
@@ -193,7 +270,10 @@ xact_identify(uint8 info)
 {
 	const char *id = NULL;
 
-	switch (info & ~XLR_INFO_MASK)
+	info &= ~XLR_INFO_MASK;		/* as in xact_desc(): drop XLR_INFO_MASK bits */
+	info &= XLOG_XACT_OPMASK;	/* drop flag bits such as XLOG_XACT_HAS_INFO */
+
+	switch (info)
 	{
 		case XLOG_XACT_COMMIT:
 			id = "COMMIT";
@@ -213,9 +293,6 @@ xact_identify(uint8 info)
 		case XLOG_XACT_ASSIGNMENT:
 			id = "ASSIGNMENT";
 			break;
-		case XLOG_XACT_COMMIT_COMPACT:
-			id = "COMMIT_COMPACT";
-			break;
 	}
 
 	return id;
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 6c7029e..a69b9fc 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2083,6 +2083,9 @@ RecordTransactionCommitPrepared(TransactionId xid,
 								bool initfileinval)
 {
 	xl_xact_commit_prepared xlrec;
+
+	uint8 info;
+
 	XLogRecPtr	recptr;
 
 	START_CRIT_SECTION();
@@ -2090,37 +2093,20 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	/* See notes in RecordTransactionCommit */
 	MyPgXact->delayChkpt = true;
 
-	/* Emit the XLOG commit record */
 	xlrec.xid = xid;
 
-	xlrec.crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0;
-
-	xlrec.crec.dbId = MyDatabaseId;
-	xlrec.crec.tsId = MyDatabaseTableSpace;
-
-	xlrec.crec.xact_time = GetCurrentTimestamp();
-	xlrec.crec.nrels = nrels;
-	xlrec.crec.nsubxacts = nchildren;
-	xlrec.crec.nmsgs = ninvalmsgs;
-
 	XLogBeginInsert();
-	XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitPrepared);
-
-	/* dump rels to delete */
-	if (nrels > 0)
-		XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
 
-	/* dump committed child Xids */
-	if (nchildren > 0)
-		XLogRegisterData((char *) children,
-						 nchildren * sizeof(TransactionId));
+	/* emit the twophase part of the record */
+	XLogRegisterData((char *) (&xlrec), offsetof(xl_xact_commit_prepared, crec));
 
-	/* dump cache invalidation messages */
-	if (ninvalmsgs > 0)
-		XLogRegisterData((char *) invalmsgs,
-						 ninvalmsgs * sizeof(SharedInvalidationMessage));
+	/* and then the embedded commit record */
+	info = XactEmitCommitRecord(GetCurrentTimestamp(),
+								nchildren, children, nrels, rels,
+								ninvalmsgs, invalmsgs,
+								initfileinval, false);
 
-	recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED);
+	recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED | info);
 
 	/*
 	 * We don't currently try to sleep before flush here ... nor is there any
@@ -2165,6 +2151,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
 {
 	xl_xact_abort_prepared xlrec;
 	XLogRecPtr	recptr;
+	uint8 info;
 
 	/*
 	 * Catch the scenario where we aborted partway through
@@ -2178,23 +2165,18 @@ RecordTransactionAbortPrepared(TransactionId xid,
 
 	/* Emit the XLOG abort record */
 	xlrec.xid = xid;
-	xlrec.arec.xact_time = GetCurrentTimestamp();
-	xlrec.arec.nrels = nrels;
-	xlrec.arec.nsubxacts = nchildren;
 
 	XLogBeginInsert();
-	XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbortPrepared);
 
-	/* dump rels to delete */
-	if (nrels > 0)
-		XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
+	/* emit the twophase part of the record */
+	XLogRegisterData((char *) (&xlrec), offsetof(xl_xact_abort_prepared, arec));
 
-	/* dump committed child Xids */
-	if (nchildren > 0)
-		XLogRegisterData((char *) children,
-						 nchildren * sizeof(TransactionId));
+	/* and then the embedded abort record */
+	info = XactEmitAbortRecord(GetCurrentTimestamp(),
+							   nchildren, children,
+							   nrels, rels);
 
-	recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED);
+	recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED | info);
 
 	/* Always flush, since we're about to remove the 2PC state file */
 	XLogFlush(recptr);
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 97000ef..134f852 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1041,6 +1041,8 @@ RecordTransactionCommit(void)
 	}
 	else
 	{
+		uint8				info = 0;
+
 		/*
 		 * Begin commit critical section and insert the commit XLOG record.
 		 */
@@ -1069,70 +1071,14 @@ RecordTransactionCommit(void)
 
 		SetCurrentTransactionStopTimestamp();
 
-		/*
-		 * Do we need the long commit record? If not, use the compact format.
-		 *
-		 * For now always use the non-compact version if wal_level=logical, so
-		 * we can hide commits from other databases. TODO: In the future we
-		 * should merge compact and non-compact commits and use a flags
-		 * variable to determine if it contains subxacts, relations or
-		 * invalidation messages, that's more extensible and degrades more
-		 * gracefully. Till then, it's just 20 bytes of overhead.
-		 */
-		if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit ||
-			XLogLogicalInfoActive())
-		{
-			xl_xact_commit xlrec;
-
-			/*
-			 * Set flags required for recovery processing of commits.
-			 */
-			xlrec.xinfo = 0;
-			if (RelcacheInitFileInval)
-				xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
-			if (forceSyncCommit)
-				xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
-
-			xlrec.dbId = MyDatabaseId;
-			xlrec.tsId = MyDatabaseTableSpace;
-
-			xlrec.xact_time = xactStopTimestamp;
-			xlrec.nrels = nrels;
-			xlrec.nsubxacts = nchildren;
-			xlrec.nmsgs = nmsgs;
-
-			XLogBeginInsert();
-			XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommit);
-			/* dump rels to delete */
-			if (nrels > 0)
-				XLogRegisterData((char *) rels,
-								 nrels * sizeof(RelFileNode));
-			/* dump committed child Xids */
-			if (nchildren > 0)
-				XLogRegisterData((char *) children,
-								 nchildren * sizeof(TransactionId));
-			/* dump shared cache invalidation messages */
-			if (nmsgs > 0)
-				XLogRegisterData((char *) invalMessages,
-								 nmsgs * sizeof(SharedInvalidationMessage));
-			(void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT);
-		}
-		else
-		{
-			xl_xact_commit_compact xlrec;
-
-			xlrec.xact_time = xactStopTimestamp;
-			xlrec.nsubxacts = nchildren;
+		XLogBeginInsert();
 
-			XLogBeginInsert();
-			XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitCompact);
-			/* dump committed child Xids */
-			if (nchildren > 0)
-				XLogRegisterData((char *) children,
-								 nchildren * sizeof(TransactionId));
+		info = XactEmitCommitRecord(xactStopTimestamp,
+									nchildren, children, nrels, rels,
+									nmsgs, invalMessages,
+									RelcacheInitFileInval, forceSyncCommit);
 
-			(void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_COMPACT);
-		}
+		(void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT | info);
 	}
 
 	/*
@@ -1419,7 +1365,8 @@ RecordTransactionAbort(bool isSubXact)
 	RelFileNode *rels;
 	int			nchildren;
 	TransactionId *children;
-	xl_xact_abort xlrec;
+	uint8		info = 0;
+	TimestampTz xact_time;
 
 	/*
 	 * If we haven't been assigned an XID, nobody will care whether we aborted
@@ -1459,28 +1406,20 @@ RecordTransactionAbort(bool isSubXact)
 
 	/* Write the ABORT record */
 	if (isSubXact)
-		xlrec.xact_time = GetCurrentTimestamp();
+		xact_time = GetCurrentTimestamp();
 	else
 	{
 		SetCurrentTransactionStopTimestamp();
-		xlrec.xact_time = xactStopTimestamp;
+		xact_time = xactStopTimestamp;
 	}
-	xlrec.nrels = nrels;
-	xlrec.nsubxacts = nchildren;
 
 	XLogBeginInsert();
-	XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbort);
 
-	/* dump rels to delete */
-	if (nrels > 0)
-		XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
-
-	/* dump committed child Xids */
-	if (nchildren > 0)
-		XLogRegisterData((char *) children,
-						 nchildren * sizeof(TransactionId));
+	/* keep the returned flag bits: they are ORed into the record type below */
+	info = XactEmitAbortRecord(xact_time, nchildren, children,
+							   nrels, rels);
 
-	(void) XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT);
+	(void) XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT | info);
 
 	/*
 	 * Report the latest async abort LSN, so that the WAL writer knows to
@@ -4654,23 +4593,186 @@ xactGetCommittedChildren(TransactionId **ptr)
  *	XLOG support routines
  */
 
+
+/*
+ * Emit a commit record based on the passed in parameters. An xlog insertion
+ * must already have been started. The record isn't inserted here, though.
+ *
+ * Returns info bits that have to be ORed with the record type passed to
+ * XLogInsert().
+ */
+uint8
+XactEmitCommitRecord(TimestampTz commit_time,
+					 int nsubxacts, TransactionId *subxacts,
+					 int nrels, RelFileNode *rels,
+					 int nmsgs, SharedInvalidationMessage *msgs,
+					 bool relcacheInval, bool forceSync)
+{
+	uint8						info = 0;
+	/* static, so they're valid until the caller does the XLogInsert() */
+	static uint32				xinfo;
+	static xl_xact_commit		xlrec;
+	static xl_xact_dbinfo		xl_dbinfo;
+	static xl_xact_subxacts		xl_subxacts;
+	static xl_xact_relfilenodes xl_relfilenodes;
+	static xl_xact_invals		xl_invals;
+
+	xinfo = 0;
+
+	/*
+	 * First figure out and collect all the information needed
+	 */
+	xlrec.xact_time = commit_time;
+
+	if (relcacheInval)
+		xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
+	if (forceSync)
+		xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
+
+	/*
+	 * Processing invalidation messages requires the database information,
+	 * and so does logical decoding.
+	 */
+	if (nmsgs > 0 || XLogLogicalInfoActive())
+	{
+		xinfo |= XACT_XINFO_HAS_DBINFO;
+		xl_dbinfo.dbId = MyDatabaseId;
+		xl_dbinfo.tsId = MyDatabaseTableSpace;
+	}
+
+	if (nsubxacts > 0)
+	{
+		xinfo |= XACT_XINFO_HAS_SUBXACTS;
+		xl_subxacts.nsubxacts = nsubxacts;
+	}
+
+	if (nrels > 0)
+	{
+		xinfo |= XACT_XINFO_HAS_RELFILENODES;
+		xl_relfilenodes.nrels = nrels;
+	}
+
+	if (nmsgs > 0)
+	{
+		xinfo |= XACT_XINFO_HAS_INVALS;
+		xl_invals.nmsgs = nmsgs;
+	}
+
+	if (xinfo != 0)
+		info |= XLOG_XACT_HAS_INFO;
+
+	/*
+	 * Then include all the collected data, in the order the parser expects.
+	 */
+	XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommit);
+
+	if (xinfo != 0)
+		XLogRegisterData((char *) (&xinfo), sizeof(xinfo));
+
+	if (xinfo & XACT_XINFO_HAS_DBINFO)
+		XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+
+	if (xinfo & XACT_XINFO_HAS_SUBXACTS)
+	{
+		XLogRegisterData((char *) (&xl_subxacts),
+						 MinSizeOfXactSubxacts);
+		XLogRegisterData((char *) subxacts,
+						 nsubxacts * sizeof(TransactionId));
+	}
+
+	if (xinfo & XACT_XINFO_HAS_RELFILENODES)
+	{
+		XLogRegisterData((char *) (&xl_relfilenodes),
+						 MinSizeOfXactRelfilenodes);
+		XLogRegisterData((char *) rels,
+						 nrels * sizeof(RelFileNode));
+	}
+
+	if (xinfo & XACT_XINFO_HAS_INVALS)
+	{
+		XLogRegisterData((char *) (&xl_invals), MinSizeOfXactInvals);
+		XLogRegisterData((char *) msgs,
+						 nmsgs * sizeof(SharedInvalidationMessage));
+	}
+
+	return info;
+}
+
+/*
+ * Emit, but don't insert, an abort record.
+ *
+ * Returns the info bits the caller has to OR into the XLogInsert() type.
+ * See XactEmitCommitRecord for details.
+ */
+uint8
+XactEmitAbortRecord(TimestampTz abort_time,
+					int nsubxacts, TransactionId *subxacts,
+					int nrels, RelFileNode *rels)
+{
+	uint8						info = 0;
+	/* static, so they're valid until the caller does the XLogInsert() */
+	static uint32				xinfo;
+	static xl_xact_abort		xlrec;
+	static xl_xact_subxacts		xl_subxacts;
+	static xl_xact_relfilenodes xl_relfilenodes;
+
+	xinfo = 0;
+
+	/* collect data to log */
+	xlrec.xact_time = abort_time;
+
+	if (nsubxacts > 0)
+	{
+		xinfo |= XACT_XINFO_HAS_SUBXACTS;
+		xl_subxacts.nsubxacts = nsubxacts;
+	}
+
+	if (nrels > 0)
+	{
+		xinfo |= XACT_XINFO_HAS_RELFILENODES;
+		xl_relfilenodes.nrels = nrels;
+	}
+
+	if (xinfo != 0)
+		info |= XLOG_XACT_HAS_INFO;
+
+	/* and actually log data */
+	XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbort);
+
+	if (xinfo != 0)
+		XLogRegisterData((char *) (&xinfo), sizeof(xinfo));
+
+	if (xinfo & XACT_XINFO_HAS_SUBXACTS)
+	{
+		XLogRegisterData((char *) (&xl_subxacts), MinSizeOfXactSubxacts);
+		XLogRegisterData((char *) subxacts, nsubxacts * sizeof(TransactionId));
+	}
+
+	if (xinfo & XACT_XINFO_HAS_RELFILENODES)
+	{
+		XLogRegisterData((char *) (&xl_relfilenodes),
+						 MinSizeOfXactRelfilenodes);
+		XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
+	}
+
+	return info;
+}
+
 /*
  * Before 9.0 this was a fairly short function, but now it performs many
  * actions for which the order of execution is critical.
  */
 static void
-xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
-						  TimestampTz commit_time,
-						  TransactionId *sub_xids, int nsubxacts,
-						  SharedInvalidationMessage *inval_msgs, int nmsgs,
-						  RelFileNode *xnodes, int nrels,
-						  Oid dbId, Oid tsId,
-						  uint32 xinfo)
+xact_redo_commit(uint8 info, xl_xact_commit *xlrec,
+				 TransactionId xid, XLogRecPtr lsn)
 {
+	xl_xact_parsed_commit parsed_c;
 	TransactionId max_xid;
 	int			i;
 
-	max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
+	ParseCommitRecord(info, xlrec, &parsed_c);
+
+	max_xid = TransactionIdLatest(xid, parsed_c.nsubxacts, parsed_c.subxacts);
 
 	/*
 	 * Make sure nextXid is beyond any XID mentioned in the record.
@@ -4689,15 +4791,16 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
 	}
 
 	/* Set the transaction commit timestamp and metadata */
-	TransactionTreeSetCommitTsData(xid, nsubxacts, sub_xids,
-								   commit_time, InvalidCommitTsNodeId, false);
+	TransactionTreeSetCommitTsData(xid, parsed_c.nsubxacts, parsed_c.subxacts,
+								   xlrec->xact_time, InvalidCommitTsNodeId,
+								   false);
 
 	if (standbyState == STANDBY_DISABLED)
 	{
 		/*
 		 * Mark the transaction committed in pg_clog.
 		 */
-		TransactionIdCommitTree(xid, nsubxacts, sub_xids);
+		TransactionIdCommitTree(xid, parsed_c.nsubxacts, parsed_c.subxacts);
 	}
 	else
 	{
@@ -4721,21 +4824,21 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
 		 * bits set on changes made by transactions that haven't yet
 		 * recovered. It's unlikely but it's good to be safe.
 		 */
-		TransactionIdAsyncCommitTree(xid, nsubxacts, sub_xids, lsn);
+		TransactionIdAsyncCommitTree(xid, parsed_c.nsubxacts, parsed_c.subxacts, lsn);
 
 		/*
 		 * We must mark clog before we update the ProcArray.
 		 */
-		ExpireTreeKnownAssignedTransactionIds(xid, nsubxacts, sub_xids, max_xid);
+		ExpireTreeKnownAssignedTransactionIds(xid, parsed_c.nsubxacts, parsed_c.subxacts, max_xid);
 
 		/*
 		 * Send any cache invalidations attached to the commit. We must
 		 * maintain the same order of invalidation then release locks as
 		 * occurs in CommitTransaction().
 		 */
-		ProcessCommittedInvalidationMessages(inval_msgs, nmsgs,
-								  XactCompletionRelcacheInitFileInval(xinfo),
-											 dbId, tsId);
+		ProcessCommittedInvalidationMessages(parsed_c.msgs, parsed_c.nmsgs,
+											 XactCompletionRelcacheInitFileInval(parsed_c.xinfo),
+											 parsed_c.dbId, parsed_c.tsId);
 
 		/*
 		 * Release locks, if any. We do this for both two phase and normal one
@@ -4748,7 +4851,7 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
 	}
 
 	/* Make sure files supposed to be dropped are dropped */
-	if (nrels > 0)
+	if (parsed_c.nrels > 0)
 	{
 		/*
 		 * First update minimum recovery point to cover this WAL record. Once
@@ -4767,13 +4870,13 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
 		 */
 		XLogFlush(lsn);
 
-		for (i = 0; i < nrels; i++)
+		for (i = 0; i < parsed_c.nrels; i++)
 		{
-			SMgrRelation srel = smgropen(xnodes[i], InvalidBackendId);
+			SMgrRelation srel = smgropen(parsed_c.xnodes[i], InvalidBackendId);
 			ForkNumber	fork;
 
 			for (fork = 0; fork <= MAX_FORKNUM; fork++)
-				XLogDropRelation(xnodes[i], fork);
+				XLogDropRelation(parsed_c.xnodes[i], fork);
 			smgrdounlink(srel, true);
 			smgrclose(srel);
 		}
@@ -4791,52 +4894,12 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
 	 * minRecoveryPoint during recovery) helps to reduce that problem window,
 	 * for any user that requested ForceSyncCommit().
 	 */
-	if (XactCompletionForceSyncCommit(xinfo))
+	if (XactCompletionForceSyncCommit(parsed_c.xinfo))
 		XLogFlush(lsn);
 
 }
 
 /*
- * Utility function to call xact_redo_commit_internal after breaking down xlrec
- */
-static void
-xact_redo_commit(xl_xact_commit *xlrec,
-				 TransactionId xid, XLogRecPtr lsn)
-{
-	TransactionId *subxacts;
-	SharedInvalidationMessage *inval_msgs;
-
-	/* subxid array follows relfilenodes */
-	subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
-	/* invalidation messages array follows subxids */
-	inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
-
-	xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
-							  subxacts, xlrec->nsubxacts,
-							  inval_msgs, xlrec->nmsgs,
-							  xlrec->xnodes, xlrec->nrels,
-							  xlrec->dbId,
-							  xlrec->tsId,
-							  xlrec->xinfo);
-}
-
-/*
- * Utility function to call xact_redo_commit_internal  for compact form of message.
- */
-static void
-xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
-						 TransactionId xid, XLogRecPtr lsn)
-{
-	xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
-							  xlrec->subxacts, xlrec->nsubxacts,
-							  NULL, 0,	/* inval msgs */
-							  NULL, 0,	/* relfilenodes */
-							  InvalidOid,		/* dbId */
-							  InvalidOid,		/* tsId */
-							  0);		/* xinfo */
-}
-
-/*
  * Be careful with the order of execution, as with xact_redo_commit().
  * The two functions are similar but differ in key places.
  *
@@ -4846,14 +4909,49 @@ xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
  * because subtransaction commit is never WAL logged.
  */
 static void
-xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
+xact_redo_abort(uint8 info, xl_xact_abort *xlrec, TransactionId xid)
 {
+	int			i;
+	char	   *data = ((char *) xlrec) + MinSizeOfXactAbort;	/* skip fixed part */
 	TransactionId *sub_xids;
 	TransactionId max_xid;
-	int			i;
+	uint32		xinfo = 0;
+	int			nsubxacts = 0;
+	RelFileNode *xnodes = NULL;
+	int			nrels = 0;
+
+	if (info & XLOG_XACT_HAS_INFO)
+	{
+		xinfo = *(uint32 *) data;
+		data += sizeof(uint32);
+	}
+
+	if (xinfo & XACT_XINFO_HAS_SUBXACTS)
+	{
+		xl_xact_subxacts   *xl_subxacts = (xl_xact_subxacts *) data;
 
-	sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
-	max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids);
+		nsubxacts = xl_subxacts->nsubxacts;
+		sub_xids = xl_subxacts->subxacts;
+		max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
+
+		data += MinSizeOfXactSubxacts;
+		data += nsubxacts * sizeof(TransactionId);
+	}
+	else
+	{
+		/* no subxact array: give the tree functions below a defined pointer */
+		sub_xids = NULL;
+		max_xid = xid;
+	}
+
+	if (xinfo & XACT_XINFO_HAS_RELFILENODES)
+	{
+		xl_xact_relfilenodes *xl_relfilenodes = (xl_xact_relfilenodes *) data;
+
+		nrels = xl_relfilenodes->nrels;
+		xnodes = xl_relfilenodes->xnodes;
+		data += MinSizeOfXactRelfilenodes + nrels * sizeof(RelFileNode);
+	}
 
 	/*
 	 * Make sure nextXid is beyond any XID mentioned in the record.
@@ -4874,7 +4972,7 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
 	if (standbyState == STANDBY_DISABLED)
 	{
 		/* Mark the transaction aborted in pg_clog, no need for async stuff */
-		TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
+		TransactionIdAbortTree(xid, nsubxacts, sub_xids);
 	}
 	else
 	{
@@ -4890,12 +4988,12 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
 		RecordKnownAssignedTransactionIds(max_xid);
 
 		/* Mark the transaction aborted in pg_clog, no need for async stuff */
-		TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
+		TransactionIdAbortTree(xid, nsubxacts, sub_xids);
 
 		/*
 		 * We must update the ProcArray after we have marked clog.
 		 */
-		ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid);
+		ExpireTreeKnownAssignedTransactionIds(xid, nsubxacts, sub_xids, max_xid);
 
 		/*
 		 * There are no flat files that need updating, nor invalidation
@@ -4905,17 +5003,17 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
 		/*
 		 * Release locks, if any. There are no invalidations to send.
 		 */
-		StandbyReleaseLockTree(xid, xlrec->nsubxacts, sub_xids);
+		StandbyReleaseLockTree(xid, nsubxacts, sub_xids);
 	}
 
 	/* Make sure files supposed to be dropped are dropped */
-	for (i = 0; i < xlrec->nrels; i++)
+	for (i = 0; i < nrels; i++)
 	{
-		SMgrRelation srel = smgropen(xlrec->xnodes[i], InvalidBackendId);
+		SMgrRelation srel = smgropen(xnodes[i], InvalidBackendId);
 		ForkNumber	fork;
 
 		for (fork = 0; fork <= MAX_FORKNUM; fork++)
-			XLogDropRelation(xlrec->xnodes[i], fork);
+			XLogDropRelation(xnodes[i], fork);
 		smgrdounlink(srel, true);
 		smgrclose(srel);
 	}
@@ -4926,26 +5024,23 @@ xact_redo(XLogReaderState *record)
 {
 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
 
+	info &= XLOG_XACT_OPMASK;
+
 	/* Backup blocks are not used in xact records */
 	Assert(!XLogRecHasAnyBlockRefs(record));
 
-	if (info == XLOG_XACT_COMMIT_COMPACT)
-	{
-		xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) XLogRecGetData(record);
-
-		xact_redo_commit_compact(xlrec, XLogRecGetXid(record), record->EndRecPtr);
-	}
-	else if (info == XLOG_XACT_COMMIT)
+	if (info == XLOG_XACT_COMMIT)
 	{
 		xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
 
-		xact_redo_commit(xlrec, XLogRecGetXid(record), record->EndRecPtr);
+		xact_redo_commit(XLogRecGetInfo(record), xlrec,
+						 XLogRecGetXid(record), record->EndRecPtr);
 	}
 	else if (info == XLOG_XACT_ABORT)
 	{
 		xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
 
-		xact_redo_abort(xlrec, XLogRecGetXid(record));
+		xact_redo_abort(XLogRecGetInfo(record), xlrec, XLogRecGetXid(record));
 	}
 	else if (info == XLOG_XACT_PREPARE)
 	{
@@ -4957,14 +5052,15 @@ xact_redo(XLogReaderState *record)
 	{
 		xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record);
 
-		xact_redo_commit(&xlrec->crec, xlrec->xid, record->EndRecPtr);
+		xact_redo_commit(XLogRecGetInfo(record), &xlrec->crec, xlrec->xid,
+						 record->EndRecPtr);
 		RemoveTwoPhaseFile(xlrec->xid, false);
 	}
 	else if (info == XLOG_XACT_ABORT_PREPARED)
 	{
 		xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) XLogRecGetData(record);
 
-		xact_redo_abort(&xlrec->arec, xlrec->xid);
+		xact_redo_abort(XLogRecGetInfo(record), &xlrec->arec, xlrec->xid);
 		RemoveTwoPhaseFile(xlrec->xid, false);
 	}
 	else if (info == XLOG_XACT_ASSIGNMENT)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 629a457..f0e83ff 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5106,11 +5106,6 @@ getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime)
 		*recordXtime = ((xl_restore_point *) XLogRecGetData(record))->rp_time;
 		return true;
 	}
-	if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT)
-	{
-		*recordXtime = ((xl_xact_commit_compact *) XLogRecGetData(record))->xact_time;
-		return true;
-	}
 	if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
 	{
 		*recordXtime = ((xl_xact_commit *) XLogRecGetData(record))->xact_time;
@@ -5169,7 +5164,7 @@ recoveryStopsBefore(XLogReaderState *record)
 		return false;
 	record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
 
-	if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT)
+	if (record_info == XLOG_XACT_COMMIT)
 	{
 		isCommit = true;
 		recordXid = XLogRecGetXid(record);
@@ -5289,8 +5284,7 @@ recoveryStopsAfter(XLogReaderState *record)
 	}
 
 	if (rmid == RM_XACT_ID &&
-		(record_info == XLOG_XACT_COMMIT_COMPACT ||
-		 record_info == XLOG_XACT_COMMIT ||
+		(record_info == XLOG_XACT_COMMIT ||
 		 record_info == XLOG_XACT_COMMIT_PREPARED ||
 		 record_info == XLOG_XACT_ABORT ||
 		 record_info == XLOG_XACT_ABORT_PREPARED))
@@ -5326,8 +5320,7 @@ recoveryStopsAfter(XLogReaderState *record)
 			recoveryStopTime = recordXtime;
 			recoveryStopName[0] = '\0';
 
-			if (record_info == XLOG_XACT_COMMIT_COMPACT ||
-				record_info == XLOG_XACT_COMMIT ||
+			if (record_info == XLOG_XACT_COMMIT ||
 				record_info == XLOG_XACT_COMMIT_PREPARED)
 			{
 				ereport(LOG,
@@ -5443,8 +5436,7 @@ recoveryApplyDelay(XLogReaderState *record)
 	 */
 	record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
 	if (!(XLogRecGetRmid(record) == RM_XACT_ID &&
-		  (record_info == XLOG_XACT_COMMIT_COMPACT ||
-		   record_info == XLOG_XACT_COMMIT ||
+		  (record_info == XLOG_XACT_COMMIT ||
 		   record_info == XLOG_XACT_COMMIT_PREPARED)))
 		return false;
 
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 77c02ba..8211daf 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -64,12 +64,9 @@ static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
-			 TransactionId xid, Oid dboid,
-			 TimestampTz commit_time,
-			 int nsubxacts, TransactionId *sub_xids,
-			 int ninval_msgs, SharedInvalidationMessage *msg);
-static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn,
-			TransactionId xid, TransactionId *sub_xids, int nsubxacts);
+						 xl_xact_commit *xlrec, TransactionId xid);
+static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+						xl_xact_abort *xlrec, TransactionId xid);
 
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
@@ -194,23 +191,17 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
 		return;
 
+	info &= XLOG_XACT_OPMASK;
+
 	switch (info)
 	{
 		case XLOG_XACT_COMMIT:
 			{
 				xl_xact_commit *xlrec;
-				TransactionId *subxacts = NULL;
-				SharedInvalidationMessage *invals = NULL;
 
 				xlrec = (xl_xact_commit *) XLogRecGetData(r);
 
-				subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
-				invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
-
-				DecodeCommit(ctx, buf, XLogRecGetXid(r), xlrec->dbId,
-							 xlrec->xact_time,
-							 xlrec->nsubxacts, subxacts,
-							 xlrec->nmsgs, invals);
+				DecodeCommit(ctx, buf, xlrec, XLogRecGetXid(r));
 
 				break;
 			}
@@ -218,63 +209,35 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			{
 				xl_xact_commit_prepared *prec;
 				xl_xact_commit *xlrec;
-				TransactionId *subxacts;
-				SharedInvalidationMessage *invals = NULL;
 
 				/* Prepared commits contain a normal commit record... */
 				prec = (xl_xact_commit_prepared *) XLogRecGetData(r);
 				xlrec = &prec->crec;
 
-				subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
-				invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
-
-				DecodeCommit(ctx, buf, prec->xid, xlrec->dbId,
-							 xlrec->xact_time,
-							 xlrec->nsubxacts, subxacts,
-							 xlrec->nmsgs, invals);
-
-				break;
-			}
-		case XLOG_XACT_COMMIT_COMPACT:
-			{
-				xl_xact_commit_compact *xlrec;
-
-				xlrec = (xl_xact_commit_compact *) XLogRecGetData(r);
+				DecodeCommit(ctx, buf, xlrec, prec->xid);
 
-				DecodeCommit(ctx, buf, XLogRecGetXid(r), InvalidOid,
-							 xlrec->xact_time,
-							 xlrec->nsubxacts, xlrec->subxacts,
-							 0, NULL);
 				break;
 			}
 		case XLOG_XACT_ABORT:
 			{
 				xl_xact_abort *xlrec;
-				TransactionId *sub_xids;
 
 				xlrec = (xl_xact_abort *) XLogRecGetData(r);
 
-				sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
-
-				DecodeAbort(ctx, buf->origptr, XLogRecGetXid(r),
-							sub_xids, xlrec->nsubxacts);
+				DecodeAbort(ctx, buf, xlrec, XLogRecGetXid(r));
 				break;
 			}
 		case XLOG_XACT_ABORT_PREPARED:
 			{
 				xl_xact_abort_prepared *prec;
 				xl_xact_abort *xlrec;
-				TransactionId *sub_xids;
 
 				/* prepared abort contain a normal commit abort... */
 				prec = (xl_xact_abort_prepared *) XLogRecGetData(r);
 				xlrec = &prec->arec;
 
-				sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
-
 				/* r->xl_xid is committed in a separate record */
-				DecodeAbort(ctx, buf->origptr, prec->xid,
-							sub_xids, xlrec->nsubxacts);
+				DecodeAbort(ctx, buf, xlrec, prec->xid);
 				break;
 			}
 
@@ -477,27 +440,27 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  */
 static void
 DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
-			 TransactionId xid, Oid dboid,
-			 TimestampTz commit_time,
-			 int nsubxacts, TransactionId *sub_xids,
-			 int ninval_msgs, SharedInvalidationMessage *msgs)
+			 xl_xact_commit *xlrec, TransactionId xid)
 {
+	xl_xact_parsed_commit parsed_c;
 	int			i;
 
+	ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed_c);
+
 	/*
 	 * Process invalidation messages, even if we're not interested in the
 	 * transaction's contents, since the various caches need to always be
 	 * consistent.
 	 */
-	if (ninval_msgs > 0)
+	if (parsed_c.nmsgs > 0)
 	{
 		ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
-									  ninval_msgs, msgs);
+									  parsed_c.nmsgs, parsed_c.msgs);
 		ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
 	}
 
 	SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
-					   nsubxacts, sub_xids);
+					   parsed_c.nsubxacts, parsed_c.subxacts);
 
 	/* ----
 	 * Check whether we are interested in this specific transaction, and tell
@@ -524,12 +487,11 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 * ---
 	 */
 	if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
-		(dboid != InvalidOid && dboid != ctx->slot->data.database))
+		(parsed_c.dbId != InvalidOid && parsed_c.dbId != ctx->slot->data.database))
 	{
-		for (i = 0; i < nsubxacts; i++)
+		for (i = 0; i < parsed_c.nsubxacts; i++)
 		{
-			ReorderBufferForget(ctx->reorder, *sub_xids, buf->origptr);
-			sub_xids++;
+			ReorderBufferForget(ctx->reorder, parsed_c.subxacts[i], buf->origptr);
 		}
 		ReorderBufferForget(ctx->reorder, xid, buf->origptr);
 
@@ -537,16 +499,15 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	}
 
 	/* tell the reorderbuffer about the surviving subtransactions */
-	for (i = 0; i < nsubxacts; i++)
+	for (i = 0; i < parsed_c.nsubxacts; i++)
 	{
-		ReorderBufferCommitChild(ctx->reorder, xid, *sub_xids,
+		ReorderBufferCommitChild(ctx->reorder, xid, parsed_c.subxacts[i],
 								 buf->origptr, buf->endptr);
-		sub_xids++;
 	}
 
 	/* replay actions of all transaction + subtransactions in order */
 	ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
-						commit_time);
+						xlrec->xact_time);
 }
 
 /*
@@ -554,20 +515,24 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
  * snapbuild.c and reorderbuffer.c
  */
 static void
-DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
-			TransactionId *sub_xids, int nsubxacts)
+DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+			xl_xact_abort *xlrec, TransactionId xid)
 {
+	xl_xact_parsed_abort parsed_a;	/* deconstructed form of xlrec */
 	int			i;
 
-	SnapBuildAbortTxn(ctx->snapshot_builder, lsn, xid, nsubxacts, sub_xids);
+	ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed_a);
+	/* NOTE(review): callers used to pass buf->origptr as the LSN; confirm EndRecPtr is intended */
+	SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid,
+					  parsed_a.nsubxacts, parsed_a.subxacts);
 
-	for (i = 0; i < nsubxacts; i++)
+	for (i = 0; i < parsed_a.nsubxacts; i++)
 	{
-		ReorderBufferAbort(ctx->reorder, *sub_xids, lsn);
-		sub_xids++;
+		ReorderBufferAbort(ctx->reorder, parsed_a.subxacts[i],
+						   buf->record->EndRecPtr);
 	}
 
-	ReorderBufferAbort(ctx->reorder, xid, lsn);
+	ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
 }
 
 /*
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index d7e5f64..122a383 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -18,6 +18,7 @@
 #include "lib/stringinfo.h"
 #include "nodes/pg_list.h"
 #include "storage/relfilenode.h"
+#include "storage/sinval.h"
 #include "utils/datetime.h"
 
 
@@ -103,8 +104,8 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
  */
 
 /*
- * XLOG allows to store some information in high 4 bits of log
- * record xl_info field
+ * XLOG allows to store some information in high 4 bits of log record xl_info
+ * field. We use 3 bits for the opcode, and one bit to flag an optional xinfo word.
  */
 #define XLOG_XACT_COMMIT			0x00
 #define XLOG_XACT_PREPARE			0x10
@@ -112,7 +113,36 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XLOG_XACT_COMMIT_PREPARED	0x30
 #define XLOG_XACT_ABORT_PREPARED	0x40
 #define XLOG_XACT_ASSIGNMENT		0x50
-#define XLOG_XACT_COMMIT_COMPACT	0x60
+/* free opcode 0x60 */
+/* free opcode 0x70 */
+
+#define XLOG_XACT_OPMASK			0x70
+
+#define XLOG_XACT_HAS_INFO			0x80
+
+/*
+ * The following flags, stored in xinfo, determine which information is
+ * contained in commit/abort records.
+ */
+#define XACT_XINFO_HAS_DBINFO			(1U << 0)
+#define XACT_XINFO_HAS_SUBXACTS			(1U << 1)
+#define XACT_XINFO_HAS_RELFILENODES		(1U << 2)
+#define XACT_XINFO_HAS_INVALS			(1U << 3)
+
+/*
+ * Also stored in xinfo, these indicate a variety of additional actions that
+ * need to occur when emulating transaction effects during recovery.
+ *
+ * They are named XactCompletion... to differentiate them from
+ * EOXact... routines which run at the end of the original transaction
+ * completion.
+ */
+#define XACT_COMPLETION_UPDATE_RELCACHE_FILE	(1U << 30)
+#define XACT_COMPLETION_FORCE_SYNC_COMMIT		(1U << 31)
+
+/* Access macros for above flags; yield 0/1 so the result survives narrowing to a small bool */
+#define XactCompletionRelcacheInitFileInval(xinfo)	(((xinfo) & XACT_COMPLETION_UPDATE_RELCACHE_FILE) != 0)
+#define XactCompletionForceSyncCommit(xinfo)		(((xinfo) & XACT_COMPLETION_FORCE_SYNC_COMMIT) != 0)
 
 typedef struct xl_xact_assignment
 {
@@ -123,61 +153,62 @@ typedef struct xl_xact_assignment
 
 #define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub)
 
-typedef struct xl_xact_commit_compact
+/* sub-records for commit/abort */
+typedef struct xl_xact_dbinfo
+{
+	Oid			dbId;			/* MyDatabaseId */
+	Oid			tsId;			/* MyDatabaseTableSpace */
+} xl_xact_dbinfo;
+
+typedef struct xl_xact_subxacts
 {
-	TimestampTz xact_time;		/* time of commit */
 	int			nsubxacts;		/* number of subtransaction XIDs */
-	/* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
 	TransactionId subxacts[FLEXIBLE_ARRAY_MEMBER];
-} xl_xact_commit_compact;
+} xl_xact_subxacts;
 
-#define MinSizeOfXactCommitCompact offsetof(xl_xact_commit_compact, subxacts)
+#define MinSizeOfXactSubxacts offsetof(xl_xact_subxacts, subxacts)
 
-typedef struct xl_xact_commit
+typedef struct xl_xact_relfilenodes
 {
-	TimestampTz xact_time;		/* time of commit */
-	uint32		xinfo;			/* info flags */
-	int			nrels;			/* number of RelFileNodes */
-	int			nsubxacts;		/* number of subtransaction XIDs */
-	int			nmsgs;			/* number of shared inval msgs */
-	Oid			dbId;			/* MyDatabaseId */
-	Oid			tsId;			/* MyDatabaseTableSpace */
-	/* Array of RelFileNode(s) to drop at commit */
+	int			nrels;		/* number of RelFileNodes */
 	RelFileNode xnodes[FLEXIBLE_ARRAY_MEMBER];
-	/* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
-	/* ARRAY OF SHARED INVALIDATION MESSAGES FOLLOWS */
+} xl_xact_relfilenodes;
 
-#define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes)
+#define MinSizeOfXactRelfilenodes offsetof(xl_xact_relfilenodes, xnodes)
 
-/*
- * These flags are set in the xinfo fields of WAL commit records,
- * indicating a variety of additional actions that need to occur
- * when emulating transaction effects during recovery.
- * They are named XactCompletion... to differentiate them from
- * EOXact... routines which run at the end of the original
- * transaction completion.
- */
-#define XACT_COMPLETION_UPDATE_RELCACHE_FILE	0x01
-#define XACT_COMPLETION_FORCE_SYNC_COMMIT		0x02
+typedef struct xl_xact_invals
+{
+	int			nmsgs;			/* number of shared inval msgs */
+	SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
+} xl_xact_invals;
 
-/* Access macros for above flags */
-#define XactCompletionRelcacheInitFileInval(xinfo)	(xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
-#define XactCompletionForceSyncCommit(xinfo)		(xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
+#define MinSizeOfXactInvals offsetof(xl_xact_invals, msgs)
+
+typedef struct xl_xact_commit
+{
+	TimestampTz xact_time;		/* time of commit */
+
+	/* uint32 xinfo flags follow if XLOG_XACT_HAS_INFO is set in the record info */
+	/* xl_xact_dbinfo follows if XACT_XINFO_HAS_DBINFO */
+	/* xl_xact_subxacts follows if XACT_XINFO_HAS_SUBXACTS */
+	/* xl_xact_relfilenodes follows if XACT_XINFO_HAS_RELFILENODES */
+	/* xl_xact_invals follows if XACT_XINFO_HAS_INVALS */
+} xl_xact_commit;
+
+#define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz))
 
 typedef struct xl_xact_abort
 {
 	TimestampTz xact_time;		/* time of abort */
-	int			nrels;			/* number of RelFileNodes */
-	int			nsubxacts;		/* number of subtransaction XIDs */
-	/* Array of RelFileNode(s) to drop at abort */
-	RelFileNode xnodes[FLEXIBLE_ARRAY_MEMBER];
-	/* ARRAY OF ABORTED SUBTRANSACTION XIDs FOLLOWS */
+
+	/* xinfo flags follow if XLOG_XACT_HAS_INFO */
+	/* xl_xact_subxacts follows if XACT_XINFO_HAS_SUBXACTS */
+	/* xl_xact_relfilenodes follows if XACT_XINFO_HAS_RELFILENODES */
 } xl_xact_abort;
 
 /* Note the intentional lack of an invalidation message array c.f. commit */
 
-#define MinSizeOfXactAbort offsetof(xl_xact_abort, xnodes)
+#define MinSizeOfXactAbort sizeof(xl_xact_abort)
 
 /*
  * COMMIT_PREPARED and ABORT_PREPARED are identical to COMMIT/ABORT records
@@ -192,8 +223,6 @@ typedef struct xl_xact_commit_prepared
 	/* MORE DATA FOLLOWS AT END OF STRUCT */
 } xl_xact_commit_prepared;
 
-#define MinSizeOfXactCommitPrepared offsetof(xl_xact_commit_prepared, crec.xnodes)
-
 typedef struct xl_xact_abort_prepared
 {
 	TransactionId xid;			/* XID of prepared xact */
@@ -201,7 +230,39 @@ typedef struct xl_xact_abort_prepared
 	/* MORE DATA FOLLOWS AT END OF STRUCT */
 } xl_xact_abort_prepared;
 
-#define MinSizeOfXactAbortPrepared offsetof(xl_xact_abort_prepared, arec.xnodes)
+/*
+ * Commit/Abort records in the above form are a bit verbose to parse, so
+ * deconstructed versions are generated by ParseCommitRecord()/ParseAbortRecord().
+ */
+typedef struct xl_xact_parsed_commit
+{
+	TimestampTz		xact_time;	/* presumably the commit time -- TODO confirm the parser fills it in */
+	uint32			xinfo;		/* xinfo flags; 0 if the record lacks XLOG_XACT_HAS_INFO */
+
+	Oid			dbId;			/* MyDatabaseId; zero unless XACT_XINFO_HAS_DBINFO */
+	Oid			tsId;			/* MyDatabaseTableSpace; zero unless XACT_XINFO_HAS_DBINFO */
+
+	int				nsubxacts;	/* number of subtransaction XIDs */
+	TransactionId  *subxacts;	/* points into the record data, not a copy */
+
+	int				nrels;		/* number of RelFileNodes */
+	RelFileNode	   *xnodes;		/* RelFileNode(s) to drop at commit */
+
+	int				nmsgs;		/* number of shared inval msgs */
+	SharedInvalidationMessage *msgs;	/* shared invalidation messages */
+} xl_xact_parsed_commit;
+
+typedef struct xl_xact_parsed_abort
+{
+	TimestampTz		xact_time;	/* presumably the abort time -- TODO confirm the parser fills it in */
+	uint32			xinfo;		/* xinfo flags; presumably 0 without XLOG_XACT_HAS_INFO -- confirm */
+
+	int				nsubxacts;	/* number of subtransaction XIDs */
+	TransactionId  *subxacts;	/* aborted subtransaction XIDs */
+
+	int				nrels;		/* number of RelFileNodes */
+	RelFileNode	   *xnodes;		/* RelFileNode(s) to drop at abort */
+} xl_xact_parsed_abort;
 
 
 /* ----------------
@@ -256,8 +317,23 @@ extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg);
 
 extern int	xactGetCommittedChildren(TransactionId **ptr);
 
+extern uint8 XactEmitCommitRecord(TimestampTz commit_time,
+								  int nsubxacts, TransactionId *subxacts,
+								  int nrels, RelFileNode *rels,
+								  int nmsgs, SharedInvalidationMessage *msgs,
+								  bool relcacheInval, bool forceSync);
+
+extern uint8 XactEmitAbortRecord(TimestampTz abort_time,
+								 int nsubxacts, TransactionId *subxacts,
+								 int nrels, RelFileNode *rels);
 extern void xact_redo(XLogReaderState *record);
+
+/* xactdesc.c */
 extern void xact_desc(StringInfo buf, XLogReaderState *record);
 extern const char *xact_identify(uint8 info);
 
+/* also in xactdesc.c, so they can be shared between front/backend code */
+extern void ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed);
+extern void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed);
+
 #endif   /* XACT_H */
-- 
2.3.0.149.gf3f4077.dirty

