From 0e1538fa72ede3566ea3dd03958540b80e9fd15c Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Thu, 16 Jan 2025 15:14:06 +0900
Subject: [PATCH v2 1/2] Fix twophase.c XID epoch tracking

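Half the time after epoch 0, allowable XIDs span two epochs, yet twophase.c
assumed every XID belonged to the current epoch.  This would have no
user-visible consequences during epoch 0, but I expect (unconfirmed)
two-phase breakage during other epochs.  Derive the epoch from the next
full XID instead, via a new helper shared with xlogreader.c and
xid8funcs.c.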

FIXME: likely rework this in favor of broader FullTransactionId use in
twophase.c.
---
 src/include/access/transam.h            | 29 ++++++++++
 src/backend/access/transam/twophase.c   | 75 ++++++-------------------
 src/backend/access/transam/xlogreader.c | 18 +-----
 src/backend/utils/adt/xid8funcs.c       | 43 ++++----------
 4 files changed, 58 insertions(+), 107 deletions(-)

diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index 0cab8653f1..48fce16aeb 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -77,6 +77,35 @@ FullTransactionIdFromEpochAndXid(uint32 epoch, TransactionId xid)
 	return result;
 }
 
+/*
+ * Widen xid to a FullTransactionId, using nextFullXid as the reference for
+ * the epoch.  This assumes xid was in [oldestXid, nextXid] at the time when
+ * TransamVariables->nextXid was nextFullXid.  Special XIDs get epoch 0.
+ */
+static inline FullTransactionId
+FullTransactionIdFromAllowableAt(FullTransactionId nextFullXid,
+								 TransactionId xid)
+{
+	uint32		epoch;
+
+	/* Special (non-normal) XIDs are always reported with epoch 0. */
+	if (!TransactionIdIsNormal(xid))
+		return FullTransactionIdFromEpochAndXid(0, xid);
+
+	/*
+	 * The 64 bit result must be <= nextFullXid, since nextFullXid hadn't been
+	 * issued yet when xid was in the past.  The xid must therefore be from
+	 * the epoch of nextFullXid or the epoch before, because we must remove
+	 * (by freezing) an XID before assigning the XID half an epoch ahead of it.
+	 * NOTE(review): if epoch is 0 here, epoch-- wraps; nothing asserts it.
+	 */
+	epoch = EpochFromFullTransactionId(nextFullXid);
+	if (xid > XidFromFullTransactionId(nextFullXid))
+		epoch--;
+
+	return FullTransactionIdFromEpochAndXid(epoch, xid);
+}
+
 static inline FullTransactionId
 FullTransactionIdFromU64(uint64 value)
 {
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index a3190dc4f1..7a162938f4 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -926,24 +926,6 @@ TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
 /* State file support													*/
 /************************************************************************/
 
-/*
- * Compute FullTransactionId for the given TransactionId, using the current
- * epoch.
- */
-static inline FullTransactionId
-FullTransactionIdFromCurrentEpoch(TransactionId xid)
-{
-	FullTransactionId fxid;
-	FullTransactionId nextFullXid;
-	uint32		epoch;
-
-	nextFullXid = ReadNextFullTransactionId();
-	epoch = EpochFromFullTransactionId(nextFullXid);
-
-	fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
-	return fxid;
-}
-
 static inline int
 TwoPhaseFilePath(char *path, FullTransactionId fxid)
 {
@@ -1283,7 +1265,7 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
  * contents of the file, issuing an error when finding corrupted data.  If
  * missing_ok is true, which indicates that missing files can be safely
  * ignored, then return NULL.  This state can be reached when doing recovery
- * after discarding two-phase files from other epochs.
+ * after discarding two-phase files from frozen epochs.
  */
 static char *
 ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
@@ -1299,7 +1281,7 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
 	int			r;
 	FullTransactionId fxid;
 
-	fxid = FullTransactionIdFromCurrentEpoch(xid);
+	fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid);
 	TwoPhaseFilePath(path, fxid);
 
 	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
@@ -1664,15 +1646,13 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	/* Count the prepared xact as committed or aborted */
 	AtEOXact_PgStat(isCommit, false);
 
-	/*
-	 * And now we can clean up any files we may have left.  These should be
-	 * from the current epoch.
-	 */
+	/* And now we can clean up any files we may have left. */
 	if (ondisk)
 	{
 		FullTransactionId fxid;
 
-		fxid = FullTransactionIdFromCurrentEpoch(xid);
+		fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(),
+												xid);
 		RemoveTwoPhaseFile(fxid, true);
 	}
 
@@ -1749,8 +1729,7 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
 	COMP_CRC32C(statefile_crc, content, len);
 	FIN_CRC32C(statefile_crc);
 
-	/* Use current epoch */
-	fxid = FullTransactionIdFromCurrentEpoch(xid);
+	fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid);
 	TwoPhaseFilePath(path, fxid);
 
 	fd = OpenTransientFile(path,
@@ -1900,7 +1879,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
  * This is called once at the beginning of recovery, saving any extra
  * lookups in the future.  Two-phase files that are newer than the
  * minimum XID horizon are discarded on the way.  Two-phase files with
- * an epoch older or newer than the current checkpoint's record epoch
+ * an epoch frozen relative to the current checkpoint's record epoch
  * are also discarded.
  */
 void
@@ -1971,7 +1950,6 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
 	TransactionId origNextXid = XidFromFullTransactionId(nextXid);
 	TransactionId result = origNextXid;
 	TransactionId *xids = NULL;
-	uint32		epoch = EpochFromFullTransactionId(nextXid);
 	int			nxids = 0;
 	int			allocsize = 0;
 	int			i;
@@ -1988,11 +1966,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
 
 		xid = gxact->xid;
 
-		/*
-		 * All two-phase files with past and future epoch in pg_twophase are
-		 * gone at this point, so we're OK to rely on only the current epoch.
-		 */
-		fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+		fxid = FullTransactionIdFromAllowableAt(nextXid, xid);
 		buf = ProcessTwoPhaseBuffer(fxid,
 									gxact->prepare_start_lsn,
 									gxact->ondisk, false, true);
@@ -2055,12 +2029,9 @@ void
 StandbyRecoverPreparedTransactions(void)
 {
 	int			i;
-	uint32		epoch;
 	FullTransactionId nextFullXid;
 
-	/* get current epoch */
 	nextFullXid = ReadNextFullTransactionId();
-	epoch = EpochFromFullTransactionId(nextFullXid);
 
 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
@@ -2074,11 +2045,7 @@ StandbyRecoverPreparedTransactions(void)
 
 		xid = gxact->xid;
 
-		/*
-		 * At this stage, we're OK to work with the current epoch as all past
-		 * and future files have been already discarded.
-		 */
-		fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+		fxid = FullTransactionIdFromAllowableAt(nextFullXid, xid);
 		buf = ProcessTwoPhaseBuffer(fxid,
 									gxact->prepare_start_lsn,
 									gxact->ondisk, true, false);
@@ -2108,12 +2075,9 @@ void
 RecoverPreparedTransactions(void)
 {
 	int			i;
-	uint32		epoch;
 	FullTransactionId nextFullXid;
 
-	/* get current epoch */
 	nextFullXid = ReadNextFullTransactionId();
-	epoch = EpochFromFullTransactionId(nextFullXid);
 
 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
@@ -2127,10 +2091,6 @@ RecoverPreparedTransactions(void)
 		TransactionId *subxids;
 		const char *gid;
 
-		/*
-		 * At this stage, we're OK to work with the current epoch as all past
-		 * and future files have been already discarded.
-		 */
 		xid = gxact->xid;
 
 		/*
@@ -2142,7 +2102,7 @@ RecoverPreparedTransactions(void)
 		 * SubTransSetParent has been set before, if the prepared transaction
 		 * generated xid assignment records.
 		 */
-		fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+		fxid = FullTransactionIdFromAllowableAt(nextFullXid, xid);
 		buf = ProcessTwoPhaseBuffer(fxid,
 									gxact->prepare_start_lsn,
 									gxact->ondisk, true, false);
@@ -2260,8 +2220,9 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid,
 		return NULL;
 	}
 
-	/* Discard files from past epochs */
-	if (EpochFromFullTransactionId(fxid) < EpochFromFullTransactionId(nextXid))
+	/* Discard files from frozen epochs */
+	if (EpochFromFullTransactionId(fxid) + 1 <
+		EpochFromFullTransactionId(nextXid))
 	{
 		if (fromdisk)
 		{
@@ -2576,8 +2537,8 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
 		char		path[MAXPGPATH];
 		FullTransactionId fxid;
 
-		/* Use current epoch */
-		fxid = FullTransactionIdFromCurrentEpoch(hdr->xid);
+		fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(),
+												hdr->xid);
 		TwoPhaseFilePath(path, fxid);
 
 		if (access(path, F_OK) == 0)
@@ -2676,10 +2637,8 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
 	{
 		FullTransactionId fxid;
 
-		/*
-		 * We should deal with a file at the current epoch here.
-		 */
-		fxid = FullTransactionIdFromCurrentEpoch(xid);
+		fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(),
+												xid);
 		RemoveTwoPhaseFile(fxid, giveWarning);
 	}
 	RemoveGXact(gxact);
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 3596af0617..91b6a91767 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -2166,28 +2166,14 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 FullTransactionId
 XLogRecGetFullXid(XLogReaderState *record)
 {
-	TransactionId xid,
-				next_xid;
-	uint32		epoch;
-
 	/*
 	 * This function is only safe during replay, because it depends on the
 	 * replay state.  See AdvanceNextFullTransactionIdPastXid() for more.
 	 */
 	Assert(AmStartupProcess() || !IsUnderPostmaster);
 
-	xid = XLogRecGetXid(record);
-	next_xid = XidFromFullTransactionId(TransamVariables->nextXid);
-	epoch = EpochFromFullTransactionId(TransamVariables->nextXid);
-
-	/*
-	 * If xid is numerically greater than next_xid, it has to be from the last
-	 * epoch.
-	 */
-	if (unlikely(xid > next_xid))
-		--epoch;
-
-	return FullTransactionIdFromEpochAndXid(epoch, xid);
+	return FullTransactionIdFromAllowableAt(TransamVariables->nextXid,
+											XLogRecGetXid(record));
 }
 
 #endif
diff --git a/src/backend/utils/adt/xid8funcs.c b/src/backend/utils/adt/xid8funcs.c
index 4736755b29..b17395617f 100644
--- a/src/backend/utils/adt/xid8funcs.c
+++ b/src/backend/utils/adt/xid8funcs.c
@@ -154,35 +154,6 @@ TransactionIdInRecentPast(FullTransactionId fxid, TransactionId *extracted_xid)
 	return !FullTransactionIdPrecedes(fxid, oldest_fxid);
 }
 
-/*
- * Convert a TransactionId obtained from a snapshot held by the caller to a
- * FullTransactionId.  Use next_fxid as a reference FullTransactionId, so that
- * we can compute the high order bits.  It must have been obtained by the
- * caller with ReadNextFullTransactionId() after the snapshot was created.
- */
-static FullTransactionId
-widen_snapshot_xid(TransactionId xid, FullTransactionId next_fxid)
-{
-	TransactionId next_xid = XidFromFullTransactionId(next_fxid);
-	uint32		epoch = EpochFromFullTransactionId(next_fxid);
-
-	/* Special transaction ID. */
-	if (!TransactionIdIsNormal(xid))
-		return FullTransactionIdFromEpochAndXid(0, xid);
-
-	/*
-	 * The 64 bit result must be <= next_fxid, since next_fxid hadn't been
-	 * issued yet when the snapshot was created.  Every TransactionId in the
-	 * snapshot must therefore be from the same epoch as next_fxid, or the
-	 * epoch before.  We know this because next_fxid is never allow to get
-	 * more than one epoch ahead of the TransactionIds in any snapshot.
-	 */
-	if (xid > next_xid)
-		epoch--;
-
-	return FullTransactionIdFromEpochAndXid(epoch, xid);
-}
-
 /*
  * txid comparator for qsort/bsearch
  */
@@ -420,12 +391,18 @@ pg_current_snapshot(PG_FUNCTION_ARGS)
 	nxip = cur->xcnt;
 	snap = palloc(PG_SNAPSHOT_SIZE(nxip));
 
-	/* fill */
-	snap->xmin = widen_snapshot_xid(cur->xmin, next_fxid);
-	snap->xmax = widen_snapshot_xid(cur->xmax, next_fxid);
+	/*
+	 * Fill.  This is the current backend's active snapshot, so MyProc->xmin
+	 * is <= all these XIDs.  As long as that remains so, oldestXid can't
+	 * advance past any of these XIDs.  Hence, these XIDs remain allowable
+	 * relative to next_fxid.
+	 */
+	snap->xmin = FullTransactionIdFromAllowableAt(next_fxid, cur->xmin);
+	snap->xmax = FullTransactionIdFromAllowableAt(next_fxid, cur->xmax);
 	snap->nxip = nxip;
 	for (i = 0; i < nxip; i++)
-		snap->xip[i] = widen_snapshot_xid(cur->xip[i], next_fxid);
+		snap->xip[i] =
+			FullTransactionIdFromAllowableAt(next_fxid, cur->xip[i]);
 
 	/*
 	 * We want them guaranteed to be in ascending order.  This also removes
-- 
2.47.1

