From 549f809fa122ca0842ec4bfc775afd08feee0d80 Mon Sep 17 00:00:00 2001
From: Vitaly Davydov <v.davydov@postgrespro.ru>
Date: Tue, 27 Feb 2024 14:02:23 +0300
Subject: [PATCH] Add asynchronous commit support for 2PC

---
 src/backend/access/transam/twophase.c | 111 +++++++++++++++++++++++++-
 1 file changed, 108 insertions(+), 3 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index c6af8cfd7e..52f0853db8 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -109,6 +109,8 @@
 #include "utils/memutils.h"
 #include "utils/timestamp.h"
 
+#define POSTGRESQL_TWOPHASE_SUPPORT_ASYNC_COMMIT
+
 /*
  * Directory where Two-phase commit files reside within PGDATA
  */
@@ -163,6 +165,9 @@ typedef struct GlobalTransactionData
 	 */
 	XLogRecPtr	prepare_start_lsn;	/* XLOG offset of prepare record start */
 	XLogRecPtr	prepare_end_lsn;	/* XLOG offset of prepare record end */
+	void*       prepare_2pc_mem_data;
+	size_t      prepare_2pc_mem_len;
+	pid_t       prepare_2pc_proc;
 	TransactionId xid;			/* The GXACT id */
 
 	Oid			owner;			/* ID of user that executed the xact */
@@ -427,6 +432,9 @@ MarkAsPreparing(TransactionId xid, const char *gid,
 
 	MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
 
+	Assert(gxact->prepare_2pc_mem_data == NULL);
+	Assert(gxact->prepare_2pc_proc == 0);
+
 	gxact->ondisk = false;
 
 	/* And insert it into the active array */
@@ -1129,6 +1137,8 @@ StartPrepare(GlobalTransaction gxact)
 	}
 }
 
+extern bool IsLogicalWorker(void);
+
 /*
  * Finish preparing state data and writing it to WAL.
  */
@@ -1167,6 +1177,37 @@ EndPrepare(GlobalTransaction gxact)
 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
 				 errmsg("two-phase state file maximum length exceeded")));
 
+	Assert(gxact->prepare_2pc_mem_data == NULL);
+	Assert(gxact->prepare_2pc_proc == 0);
+
+	if (IsLogicalWorker())
+	{
+		size_t			len = 0;
+		size_t			offset = 0;
+
+		for (record = records.head; record != NULL; record = record->next)
+			len += record->len;
+
+		if (len > 0)
+		{
+			MemoryContext	oldmemctx;
+
+			oldmemctx = MemoryContextSwitchTo(TopMemoryContext);
+
+			gxact->prepare_2pc_mem_data = palloc(len);
+			gxact->prepare_2pc_mem_len = len;
+			gxact->prepare_2pc_proc = getpid();
+
+			for (record = records.head; record != NULL; record = record->next)
+			{
+				memcpy((char *)gxact->prepare_2pc_mem_data + offset, record->data, record->len);
+				offset += record->len;
+			}
+
+			MemoryContextSwitchTo(oldmemctx);
+		}
+	}
+
 	/*
 	 * Now writing 2PC state data to WAL. We let the WAL's CRC protection
 	 * cover us, so no need to calculate a separate CRC.
@@ -1202,8 +1243,24 @@ EndPrepare(GlobalTransaction gxact)
 								   gxact->prepare_end_lsn);
 	}
 
+#if !defined(POSTGRESQL_TWOPHASE_SUPPORT_ASYNC_COMMIT)
+
 	XLogFlush(gxact->prepare_end_lsn);
 
+#else
+
+	if (synchronous_commit > SYNCHRONOUS_COMMIT_OFF)
+	{
+		/* Flush XLOG to disk */
+		XLogFlush(gxact->prepare_end_lsn);
+	}
+	else
+	{
+		XLogSetAsyncXactLSN(gxact->prepare_end_lsn);
+	}
+
+#endif
+
 	/* If we crash now, we have prepared: WAL replay will fix things */
 
 	/* Store record's start location to read that later on Commit */
@@ -1495,6 +1552,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	xl_xact_stats_item *commitstats;
 	xl_xact_stats_item *abortstats;
 	SharedInvalidationMessage *invalmsgs;
+	bool		is_local_2pc_buf = false;
 
 	/*
 	 * Validate the GID, and lock the GXACT to ensure that two backends do not
@@ -1509,12 +1567,19 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	 * in WAL files if the LSN is after the last checkpoint record, or moved
 	 * to disk if for some reason they have lived for a long time.
 	 */
-	if (gxact->ondisk)
+	if (gxact->prepare_2pc_mem_data != NULL && gxact->prepare_2pc_proc == getpid())
+	{
+		Assert(IsLogicalWorker());
+		Assert(gxact->prepare_2pc_proc == getpid());
+		buf = gxact->prepare_2pc_mem_data;
+		/* ereport(LOG, errmsg("%s:%d", __FUNCTION__, __LINE__)); */
+		is_local_2pc_buf = true;
+	}
+	else if (gxact->ondisk)
 		buf = ReadTwoPhaseFile(xid, false);
 	else
 		XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
 
-
 	/*
 	 * Disassemble the header area
 	 */
@@ -1638,6 +1703,19 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	/* Clear shared memory state */
 	RemoveGXact(gxact);
 
+	if (gxact->prepare_2pc_mem_data != NULL)
+	{
+		MemoryContext		memctx;
+		//Assert(gxact->prepare_2pc_proc == getpid());
+		memctx = MemoryContextSwitchTo(TopMemoryContext);
+		if (gxact->prepare_2pc_proc == getpid())
+			pfree(gxact->prepare_2pc_mem_data);
+		gxact->prepare_2pc_mem_data = NULL;
+		gxact->prepare_2pc_mem_len = 0;
+		gxact->prepare_2pc_proc = 0;
+		MemoryContextSwitchTo(memctx);
+	}
+
 	/*
 	 * Release the lock as all callbacks are called and shared memory cleanup
 	 * is done.
@@ -1657,7 +1735,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 
 	RESUME_INTERRUPTS();
 
-	pfree(buf);
+	if (!is_local_2pc_buf)
+		pfree(buf);
 }
 
 /*
@@ -2349,12 +2428,38 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	 * a contradiction)
 	 */
 
+#if !defined(POSTGRESQL_TWOPHASE_SUPPORT_ASYNC_COMMIT)
+
 	/* Flush XLOG to disk */
 	XLogFlush(recptr);
 
 	/* Mark the transaction committed in pg_xact */
 	TransactionIdCommitTree(xid, nchildren, children);
 
+#else
+
+	if (synchronous_commit > SYNCHRONOUS_COMMIT_OFF)
+	{
+		/* Flush XLOG to disk */
+		XLogFlush(recptr);
+
+		/* Mark the transaction committed in pg_xact */
+		TransactionIdCommitTree(xid, nchildren, children);
+	}
+	else
+	{
+		XLogSetAsyncXactLSN(recptr);
+
+		/*
+		 * We must not immediately update the CLOG, since we didn't flush the
+		 * XLOG. Instead, we store the LSN up to which the XLOG must be
+		 * flushed before the CLOG may be updated.
+		 */
+		TransactionIdAsyncCommitTree(xid, nchildren, children, recptr);
+	}
+
+#endif
+
 	/* Checkpoint can proceed now */
 	MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
 
-- 
2.34.1

