From 55b6d87b3cf032a65efdce260f5422fdee6426f1 Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Thu, 16 Jan 2025 16:45:33 +0900
Subject: [PATCH v2 2/2] Integrate FullTransactionIds deeper into twophase.c

---
 src/backend/access/transam/twophase.c | 115 ++++++++++++--------------
 1 file changed, 55 insertions(+), 60 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 7a162938f4..75008fb949 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -159,7 +159,7 @@ typedef struct GlobalTransactionData
 	 */
 	XLogRecPtr	prepare_start_lsn;	/* XLOG offset of prepare record start */
 	XLogRecPtr	prepare_end_lsn;	/* XLOG offset of prepare record end */
-	TransactionId xid;			/* The GXACT id */
+	FullTransactionId fxid;		/* The GXACT full xid */
 
 	Oid			owner;			/* ID of user that executed the xact */
 	ProcNumber	locking_backend;	/* backend currently working on the xact */
@@ -224,11 +224,11 @@ static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
 static char *ProcessTwoPhaseBuffer(FullTransactionId xid,
 								   XLogRecPtr prepare_start_lsn,
 								   bool fromdisk, bool setParent, bool setNextXid);
-static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
+static void MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId fxid,
 								const char *gid, TimestampTz prepared_at, Oid owner,
 								Oid databaseid);
 static void RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning);
-static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
+static void RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int len);
 
 /*
  * Initialization of shared memory
@@ -360,6 +360,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
 				TimestampTz prepared_at, Oid owner, Oid databaseid)
 {
 	GlobalTransaction gxact;
+	FullTransactionId fxid;
 	int			i;
 
 	if (strlen(gid) >= GIDSIZE)
@@ -407,7 +408,9 @@ MarkAsPreparing(TransactionId xid, const char *gid,
 	gxact = TwoPhaseState->freeGXacts;
 	TwoPhaseState->freeGXacts = gxact->next;
 
-	MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
+	fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(),
+											xid);
+	MarkAsPreparingGuts(gxact, fxid, gid, prepared_at, owner, databaseid);
 
 	gxact->ondisk = false;
 
@@ -430,11 +433,13 @@ MarkAsPreparing(TransactionId xid, const char *gid,
  * Note: This function should be called with appropriate locks held.
  */
 static void
-MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
-					TimestampTz prepared_at, Oid owner, Oid databaseid)
+MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId fxid,
+					const char *gid, TimestampTz prepared_at, Oid owner,
+					Oid databaseid)
 {
 	PGPROC	   *proc;
 	int			i;
+	TransactionId xid = XidFromFullTransactionId(fxid);
 
 	Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
 
@@ -479,7 +484,7 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
 	proc->subxidStatus.count = 0;
 
 	gxact->prepared_at = prepared_at;
-	gxact->xid = xid;
+	gxact->fxid = fxid;
 	gxact->owner = owner;
 	gxact->locking_backend = MyProcNumber;
 	gxact->valid = false;
@@ -800,6 +805,7 @@ static GlobalTransaction
 TwoPhaseGetGXact(TransactionId xid, bool lock_held)
 {
 	GlobalTransaction result = NULL;
+	FullTransactionId fxid;
 	int			i;
 
 	static TransactionId cached_xid = InvalidTransactionId;
@@ -817,11 +823,14 @@ TwoPhaseGetGXact(TransactionId xid, bool lock_held)
 	if (!lock_held)
 		LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
 
+	fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(),
+											xid);
+
 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 	{
 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
 
-		if (gxact->xid == xid)
+		if (FullTransactionIdEquals(gxact->fxid, fxid))
 		{
 			result = gxact;
 			break;
@@ -881,7 +890,7 @@ TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
 				*have_more = true;
 				break;
 			}
-			result = gxact->xid;
+			result = XidFromFullTransactionId(gxact->fxid);
 		}
 	}
 
@@ -1032,7 +1041,7 @@ void
 StartPrepare(GlobalTransaction gxact)
 {
 	PGPROC	   *proc = GetPGProcByNumber(gxact->pgprocno);
-	TransactionId xid = gxact->xid;
+	TransactionId xid = XidFromFullTransactionId(gxact->fxid);
 	TwoPhaseFileHeader hdr;
 	TransactionId *children;
 	RelFileLocator *commitrels;
@@ -1268,7 +1277,7 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
  * after discarding two-phase files from frozen epochs.
  */
 static char *
-ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
+ReadTwoPhaseFile(FullTransactionId fxid, bool missing_ok)
 {
 	char		path[MAXPGPATH];
 	char	   *buf;
@@ -1279,9 +1288,7 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
 	pg_crc32c	calc_crc,
 				file_crc;
 	int			r;
-	FullTransactionId fxid;
 
-	fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid);
 	TwoPhaseFilePath(path, fxid);
 
 	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
@@ -1447,6 +1454,7 @@ StandbyTransactionIdIsPrepared(TransactionId xid)
 	char	   *buf;
 	TwoPhaseFileHeader *hdr;
 	bool		result;
+	FullTransactionId fxid;
 
 	Assert(TransactionIdIsValid(xid));
 
@@ -1454,7 +1462,8 @@ StandbyTransactionIdIsPrepared(TransactionId xid)
 		return false;			/* nothing to do */
 
 	/* Read and validate file */
-	buf = ReadTwoPhaseFile(xid, true);
+	fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid);
+	buf = ReadTwoPhaseFile(fxid, true);
 	if (buf == NULL)
 		return false;
 
@@ -1474,6 +1483,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 {
 	GlobalTransaction gxact;
 	PGPROC	   *proc;
+	FullTransactionId fxid;
 	TransactionId xid;
 	bool		ondisk;
 	char	   *buf;
@@ -1495,7 +1505,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	 */
 	gxact = LockGXact(gid, GetUserId());
 	proc = GetPGProcByNumber(gxact->pgprocno);
-	xid = gxact->xid;
+	fxid = gxact->fxid;
+	xid = XidFromFullTransactionId(fxid);
 
 	/*
 	 * Read and validate 2PC state data. State data will typically be stored
@@ -1503,7 +1514,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	 * to disk if for some reason they have lived for a long time.
 	 */
 	if (gxact->ondisk)
-		buf = ReadTwoPhaseFile(xid, false);
+		buf = ReadTwoPhaseFile(fxid, false);
 	else
 		XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
 
@@ -1648,13 +1659,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 
 	/* And now we can clean up any files we may have left. */
 	if (ondisk)
-	{
-		FullTransactionId fxid;
-
-		fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(),
-												xid);
 		RemoveTwoPhaseFile(fxid, true);
-	}
 
 	MyLockedGxact = NULL;
 
@@ -1717,19 +1722,17 @@ RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning)
  * Note: content and len don't include CRC.
  */
 static void
-RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
+RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int len)
 {
 	char		path[MAXPGPATH];
 	pg_crc32c	statefile_crc;
 	int			fd;
-	FullTransactionId fxid;
 
 	/* Recompute CRC */
 	INIT_CRC32C(statefile_crc);
 	COMP_CRC32C(statefile_crc, content, len);
 	FIN_CRC32C(statefile_crc);
 
-	fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid);
 	TwoPhaseFilePath(path, fxid);
 
 	fd = OpenTransientFile(path,
@@ -1842,7 +1845,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
 			int			len;
 
 			XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
-			RecreateTwoPhaseFile(gxact->xid, buf, len);
+			RecreateTwoPhaseFile(gxact->fxid, buf, len);
 			gxact->ondisk = true;
 			gxact->prepare_start_lsn = InvalidXLogRecPtr;
 			gxact->prepare_end_lsn = InvalidXLogRecPtr;
@@ -1964,9 +1967,8 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
 
 		Assert(gxact->inredo);
 
-		xid = gxact->xid;
-
-		fxid = FullTransactionIdFromAllowableAt(nextXid, xid);
+		fxid = gxact->fxid;
+		xid = XidFromFullTransactionId(fxid);
 		buf = ProcessTwoPhaseBuffer(fxid,
 									gxact->prepare_start_lsn,
 									gxact->ondisk, false, true);
@@ -2029,24 +2031,16 @@ void
 StandbyRecoverPreparedTransactions(void)
 {
 	int			i;
-	FullTransactionId nextFullXid;
-
-	nextFullXid = ReadNextFullTransactionId();
 
 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 	{
-		TransactionId xid;
-		FullTransactionId fxid;
 		char	   *buf;
 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
 
 		Assert(gxact->inredo);
 
-		xid = gxact->xid;
-
-		fxid = FullTransactionIdFromAllowableAt(nextFullXid, xid);
-		buf = ProcessTwoPhaseBuffer(fxid,
+		buf = ProcessTwoPhaseBuffer(gxact->fxid,
 									gxact->prepare_start_lsn,
 									gxact->ondisk, true, false);
 		if (buf != NULL)
@@ -2075,15 +2069,11 @@ void
 RecoverPreparedTransactions(void)
 {
 	int			i;
-	FullTransactionId nextFullXid;
-
-	nextFullXid = ReadNextFullTransactionId();
 
 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 	{
 		TransactionId xid;
-		FullTransactionId fxid;
 		char	   *buf;
 		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
 		char	   *bufptr;
@@ -2091,8 +2081,6 @@ RecoverPreparedTransactions(void)
 		TransactionId *subxids;
 		const char *gid;
 
-		xid = gxact->xid;
-
 		/*
 		 * Reconstruct subtrans state for the transaction --- needed because
 		 * pg_subtrans is not preserved over a restart.  Note that we are
@@ -2102,15 +2090,17 @@ RecoverPreparedTransactions(void)
 		 * SubTransSetParent has been set before, if the prepared transaction
 		 * generated xid assignment records.
 		 */
-		fxid = FullTransactionIdFromAllowableAt(nextFullXid, xid);
-		buf = ProcessTwoPhaseBuffer(fxid,
+		buf = ProcessTwoPhaseBuffer(gxact->fxid,
 									gxact->prepare_start_lsn,
 									gxact->ondisk, true, false);
 		if (buf == NULL)
 			continue;
 
+		xid = XidFromFullTransactionId(gxact->fxid);
 		ereport(LOG,
-				(errmsg("recovering prepared transaction %u from shared memory", xid)));
+				(errmsg("recovering prepared transaction %u of epoch %u from shared memory",
+						XidFromFullTransactionId(gxact->fxid),
+						EpochFromFullTransactionId(gxact->fxid))));
 
 		hdr = (TwoPhaseFileHeader *) buf;
 		Assert(TransactionIdEquals(hdr->xid, xid));
@@ -2129,7 +2119,7 @@ RecoverPreparedTransactions(void)
 		 * Recreate its GXACT and dummy PGPROC. But, check whether it was
 		 * added in redo and already has a shmem entry for it.
 		 */
-		MarkAsPreparingGuts(gxact, xid, gid,
+		MarkAsPreparingGuts(gxact, gxact->fxid, gid,
 							hdr->prepared_at,
 							hdr->owner, hdr->database);
 
@@ -2264,7 +2254,7 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid,
 	if (fromdisk)
 	{
 		/* Read and validate file */
-		buf = ReadTwoPhaseFile(xid, false);
+		buf = ReadTwoPhaseFile(fxid, false);
 	}
 	else
 	{
@@ -2570,7 +2560,8 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
 	gxact->prepared_at = hdr->prepared_at;
 	gxact->prepare_start_lsn = start_lsn;
 	gxact->prepare_end_lsn = end_lsn;
-	gxact->xid = hdr->xid;
+	gxact->fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(),
+												   hdr->xid);
 	gxact->owner = hdr->owner;
 	gxact->locking_backend = INVALID_PROC_NUMBER;
 	gxact->valid = false;
@@ -2589,7 +2580,9 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
 						   false /* backward */ , false /* WAL */ );
 	}
 
-	elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
+	elog(DEBUG2, "added 2PC data in shared memory for transaction %u of epoch %u",
+		 XidFromFullTransactionId(gxact->fxid),
+		 EpochFromFullTransactionId(gxact->fxid));
 }
 
 /*
@@ -2605,17 +2598,21 @@ void
 PrepareRedoRemove(TransactionId xid, bool giveWarning)
 {
 	GlobalTransaction gxact = NULL;
+	FullTransactionId fxid;
 	int			i;
 	bool		found = false;
 
 	Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
 	Assert(RecoveryInProgress());
 
+	fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(),
+											xid);
+
 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 	{
 		gxact = TwoPhaseState->prepXacts[i];
 
-		if (gxact->xid == xid)
+		if (FullTransactionIdEquals(gxact->fxid, fxid))
 		{
 			Assert(gxact->inredo);
 			found = true;
@@ -2632,15 +2629,13 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
 	/*
 	 * And now we can clean up any files we may have left.
 	 */
-	elog(DEBUG2, "removing 2PC data for transaction %u", xid);
-	if (gxact->ondisk)
-	{
-		FullTransactionId fxid;
+	elog(DEBUG2, "removing 2PC data for transaction %u of epoch %u",
+		 XidFromFullTransactionId(fxid),
+		 EpochFromFullTransactionId(fxid));
 
-		fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(),
-												xid);
+	if (gxact->ondisk)
 		RemoveTwoPhaseFile(fxid, giveWarning);
-	}
+
 	RemoveGXact(gxact);
 }
 
@@ -2688,7 +2683,7 @@ LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
 			 * between publisher and subscriber.
 			 */
 			if (gxact->ondisk)
-				buf = ReadTwoPhaseFile(gxact->xid, false);
+				buf = ReadTwoPhaseFile(gxact->fxid, false);
 			else
 			{
 				Assert(gxact->prepare_start_lsn);
-- 
2.47.1

