From 12c863ddd9721e87c81ad755d1729d8241ab2204 Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Thu, 22 May 2025 10:09:36 +0900
Subject: [PATCH v3 2/2] Improve handling of 2PC files during recovery

This addresses two issues with 2PC files.

First, at the beginning of recovery, 2PC file data is loaded from
pg_twophase/.  The previous code used CLOG lookups to determine if
transactions were committed and/or aborted, which is unsafe at this
stage of recovery as a consistent state may not have been reached.  The
code is changed so as files are discarded based on the horizon seen by
the checkpoint record from where recovery begins:
- If 2PC files are older than the checkpoint XID horizon, these files
are useless, and can be safely removed.
- If 2PC files are newer than the next XID available, these files should
not exist yet, and can be discarded.  If a 2PC transaction exists with
the same transaction ID, WAL replay will take care of adding it.

Second, at the end of recovery, it may be possible that dangling files
exist for transactions already committed and/or aborted, when these are
loaded into memory based on the XID horizon available at the beginning
of recovery.  At this stage of recovery, CLOG lookups are safe as
recovery finishes so the state of the on-disk CLOG data is consistent.

The first step is now done by restoreTwoPhaseData(), which is the code
in charge of loading the 2PC state files from pg_twophase at the
beginning of recovery.  The second step is done in
RecoverPreparedTransactions(), called at the end of recovery to fully
restore the state of existing 2PC transactions based on their in-memory
data filled during recovery.  Previously, ProcessTwoPhaseBuffer() was in
charge of doing such checks, based on CLOG lookups, which was unreliable
at the beginning of recovery.

Tests are added to show these two problems.
---
 src/backend/access/transam/twophase.c | 140 +++++++++++++----------
 src/test/recovery/t/009_twophase.pl   | 153 ++++++++++++++++++++++++++
 2 files changed, 236 insertions(+), 57 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 82324395f5a1..644642e80df0 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -54,7 +54,9 @@
  *		* At the beginning of recovery, pg_twophase is scanned once, filling
  *		  TwoPhaseState with entries marked with gxact->inredo and
  *		  gxact->ondisk.  Two-phase file data older than the XID horizon of
- *		  the redo position are discarded.
+ *		  the redo position and two-phase file data newer than the next XID
+ *		  are discarded, based on the information of the checkpoint record
+ *		  retrieved at the beginning of recovery.
  *		* On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
  *		  gxact->inredo is set to true for such entries.
  *		* On Checkpoint we iterate through TwoPhaseState->prepXacts entries
@@ -66,6 +68,10 @@
  *		* RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
  *		  and PrescanPreparedTransactions() have been modified to go through
  *		  gxact->inredo entries that have not made it to disk.
+ *		* At the end of recovery, RecoverPreparedTransactions() performs an
+ *		  extra check for transactions that could be found as already
+ *		  committed or aborted.  This is safe to do as recovery is done,
+ *		  making CLOG lookups a safe operation.
  *
  *-------------------------------------------------------------------------
  */
@@ -1893,13 +1899,16 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
  * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
  * This is called once at the beginning of recovery, saving any extra
  * lookups in the future.  Two-phase files that are newer than the
- * minimum XID horizon are discarded on the way.
+ * next XID and two-phase files that are older than the oldest
+ * XID horizon are discarded on the way.
  */
 void
 restoreTwoPhaseData(void)
 {
 	DIR		   *cldir;
 	struct dirent *clde;
+	FullTransactionId nextXid = TransamVariables->nextXid;
+	FullTransactionId oldestXid = AdjustToFullTransactionId(TransamVariables->oldestXid);
 
 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 	cldir = AllocateDir(TWOPHASE_DIR);
@@ -1913,10 +1922,26 @@ restoreTwoPhaseData(void)
 
 			fxid = FullTransactionIdFromU64(strtou64(clde->d_name, NULL, 16));
 
+			/* Reject XID if too new or too old */
+			if (FullTransactionIdFollowsOrEquals(fxid, nextXid) ||
+				FullTransactionIdPrecedes(fxid, oldestXid))
+			{
+				if (FullTransactionIdFollowsOrEquals(fxid, nextXid))
+					ereport(WARNING,
+							(errmsg("removing future two-phase state file for transaction %u of epoch %u",
+									XidFromFullTransactionId(fxid),
+									EpochFromFullTransactionId(fxid))));
+				else
+					ereport(WARNING,
+							(errmsg("removing past two-phase state file for transaction %u of epoch %u",
+									XidFromFullTransactionId(fxid),
+									EpochFromFullTransactionId(fxid))));
+				RemoveTwoPhaseFile(fxid, true);
+				continue;
+			}
+
 			buf = ProcessTwoPhaseBuffer(fxid, InvalidXLogRecPtr,
 										true, false, false);
-			if (buf == NULL)
-				continue;
 
 			PrepareRedoAdd(fxid, buf, InvalidXLogRecPtr,
 						   InvalidXLogRecPtr, InvalidRepOriginId);
@@ -1981,9 +2006,6 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
 									gxact->prepare_start_lsn,
 									gxact->ondisk, false, true);
 
-		if (buf == NULL)
-			continue;
-
 		/*
 		 * OK, we think this file is valid.  Incorporate xid into the
 		 * running-minimum result.
@@ -2052,8 +2074,7 @@ StandbyRecoverPreparedTransactions(void)
 		buf = ProcessTwoPhaseBuffer(gxact->fxid,
 									gxact->prepare_start_lsn,
 									gxact->ondisk, true, false);
-		if (buf != NULL)
-			pfree(buf);
+		pfree(buf);
 	}
 	LWLockRelease(TwoPhaseStateLock);
 }
@@ -2078,8 +2099,28 @@ void
 RecoverPreparedTransactions(void)
 {
 	int			i;
+	FullTransactionId *remove_fxids;
+	int			remove_fxids_cnt;
 
 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+
+	/* Fast-exit path if no work is required */
+	if (TwoPhaseState->numPrepXacts == 0)
+	{
+		LWLockRelease(TwoPhaseStateLock);
+		return;
+	}
+
+	/*
+	 * Track XIDs candidate for removal if found as already committed or
+	 * aborted, once the first scan through TwoPhaseState is done.  This
+	 * cannot happen while going through the entries in TwoPhaseState as
+	 * PrepareRedoRemove() manipulates it.
+	 */
+	remove_fxids_cnt = 0;
+	remove_fxids = (FullTransactionId *) palloc(TwoPhaseState->numPrepXacts *
+												sizeof(FullTransactionId));
+
 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 	{
 		char	   *buf;
@@ -2090,6 +2131,27 @@ RecoverPreparedTransactions(void)
 		TransactionId *subxids;
 		const char *gid;
 
+		/*
+		 * Is this transaction already aborted or committed?  If yes, mark it
+		 * for removal.
+		 *
+		 * Checking CLOGs if these transactions have been already aborted or
+		 * committed is safe at this stage; we are at the end of recovery and
+		 * all WAL has been replayed, all 2PC transactions are reinstated and
+		 * are tracked in TwoPhaseState.
+		 */
+		if (TransactionIdDidCommit(XidFromFullTransactionId(fxid)) ||
+			TransactionIdDidAbort(XidFromFullTransactionId(fxid)))
+		{
+			/*
+			 * Track this transaction ID for its removal from the shared
+			 * memory state at the end.
+			 */
+			remove_fxids[remove_fxids_cnt] = fxid;
+			remove_fxids_cnt++;
+			continue;
+		}
+
 		/*
 		 * Reconstruct subtrans state for the transaction --- needed because
 		 * pg_subtrans is not preserved over a restart.  Note that we are
@@ -2102,8 +2164,6 @@ RecoverPreparedTransactions(void)
 		buf = ProcessTwoPhaseBuffer(gxact->fxid,
 									gxact->prepare_start_lsn,
 									gxact->ondisk, true, false);
-		if (buf == NULL)
-			continue;
 
 		ereport(LOG,
 				(errmsg("recovering prepared transaction %u of epoch %u from shared memory",
@@ -2164,6 +2224,18 @@ RecoverPreparedTransactions(void)
 		LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 	}
 
+	for (i = 0; i < remove_fxids_cnt; i++)
+	{
+		FullTransactionId fxid = remove_fxids[i];
+
+		ereport(WARNING,
+				(errmsg("removing stale two-phase state from memory for transaction %u of epoch %u",
+						XidFromFullTransactionId(fxid),
+						EpochFromFullTransactionId(fxid))));
+
+		PrepareRedoRemoveFull(fxid, true);
+	}
+
 	LWLockRelease(TwoPhaseStateLock);
 }
 
@@ -2184,7 +2256,6 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid,
 					  bool fromdisk,
 					  bool setParent, bool setNextXid)
 {
-	FullTransactionId nextXid = TransamVariables->nextXid;
 	TransactionId *subxids;
 	char	   *buf;
 	TwoPhaseFileHeader *hdr;
@@ -2195,51 +2266,6 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid,
 	if (!fromdisk)
 		Assert(prepare_start_lsn != InvalidXLogRecPtr);
 
-	/* Already processed? */
-	if (TransactionIdDidCommit(XidFromFullTransactionId(fxid)) ||
-		TransactionIdDidAbort(XidFromFullTransactionId(fxid)))
-	{
-		if (fromdisk)
-		{
-			ereport(WARNING,
-					(errmsg("removing stale two-phase state file for transaction %u of epoch %u",
-							XidFromFullTransactionId(fxid),
-							EpochFromFullTransactionId(fxid))));
-			RemoveTwoPhaseFile(fxid, true);
-		}
-		else
-		{
-			ereport(WARNING,
-					(errmsg("removing stale two-phase state from memory for transaction %u of epoch %u",
-							XidFromFullTransactionId(fxid),
-							EpochFromFullTransactionId(fxid))));
-			PrepareRedoRemoveFull(fxid, true);
-		}
-		return NULL;
-	}
-
-	/* Reject XID if too new */
-	if (FullTransactionIdFollowsOrEquals(fxid, nextXid))
-	{
-		if (fromdisk)
-		{
-			ereport(WARNING,
-					(errmsg("removing future two-phase state file for transaction %u of epoch %u",
-							XidFromFullTransactionId(fxid),
-							EpochFromFullTransactionId(fxid))));
-			RemoveTwoPhaseFile(fxid, true);
-		}
-		else
-		{
-			ereport(WARNING,
-					(errmsg("removing future two-phase state from memory for transaction %u of epoch %u",
-							XidFromFullTransactionId(fxid),
-							EpochFromFullTransactionId(fxid))));
-			PrepareRedoRemoveFull(fxid, true);
-		}
-		return NULL;
-	}
-
 	if (fromdisk)
 	{
 		/* Read and validate file */
diff --git a/src/test/recovery/t/009_twophase.pl b/src/test/recovery/t/009_twophase.pl
index 1a662ebe499d..2d0bf63a27c0 100644
--- a/src/test/recovery/t/009_twophase.pl
+++ b/src/test/recovery/t/009_twophase.pl
@@ -5,6 +5,7 @@
 use strict;
 use warnings FATAL => 'all';
 
+use File::Copy;
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
@@ -28,6 +29,15 @@ sub configure_and_reload
 	return;
 }
 
+sub twophase_file_name
+{
+	local $Test::Builder::Level = $Test::Builder::Level + 1;
+	# Arguments: epoch, then XID.  Returns both as zero-padded uppercase hex.
+	my $epoch = shift;
+	my $xid = shift;
+	return sprintf("%08X%08X", $epoch, $xid);
+}
+
 # Set up two nodes, which will alternately be primary and replication standby.
 
 # Setup london node
@@ -572,4 +582,147 @@ my $nsubtrans = $cur_primary->safe_psql('postgres',
 );
 isnt($osubtrans, $nsubtrans, "contents of pg_subtrans/ have changed");
 
+###############################################################################
+# Check handling of already committed or aborted 2PC files at recovery.
+# This test does a manual copy of 2PC files created in a running server,
+# to cheaply emulate situations that could be found in base backups.
+###############################################################################
+
+# Issue a set of transactions that will be used for this portion of the test:
+# - One transaction to hold the minimum xid horizon at bay.
+# - One transaction that will be found as already committed at recovery.
+# - One transaction that will be found as already rolled back at recovery.
+$cur_primary->psql(
+	'postgres', "
+	BEGIN;
+	INSERT INTO t_009_tbl VALUES (40, 'transaction: xid horizon');
+	PREPARE TRANSACTION 'xact_009_40';
+	BEGIN;
+	INSERT INTO t_009_tbl VALUES (41, 'transaction: commit-prepared');
+	PREPARE TRANSACTION 'xact_009_41';
+	BEGIN;
+	INSERT INTO t_009_tbl VALUES (42, 'transaction: rollback-prepared');
+	PREPARE TRANSACTION 'xact_009_42';");
+
+# Issue a checkpoint, defining the XID horizon based on the first transaction
+# and flushing to disk the two-phase files that are used later in this test.
+$cur_primary->psql('postgres', "CHECKPOINT");
+
+# Get the transaction IDs of the 2PC files.
+my $horizon_xid = int(
+	$cur_primary->safe_psql(
+		'postgres',
+		"SELECT transaction FROM pg_prepared_xacts WHERE gid = 'xact_009_40'")
+);
+my $commit_prepared_xid = int(
+	$cur_primary->safe_psql(
+		'postgres',
+		"SELECT transaction FROM pg_prepared_xacts WHERE gid = 'xact_009_41'")
+);
+my $abort_prepared_xid = int(
+	$cur_primary->safe_psql(
+		'postgres',
+		"SELECT transaction FROM pg_prepared_xacts WHERE gid = 'xact_009_42'")
+);
+
+# Copy the two-phase files that will be put back later.  Assume an
+# epoch of 0.
+my $commit_prepared_name = twophase_file_name(0, $commit_prepared_xid);
+my $abort_prepared_name = twophase_file_name(0, $abort_prepared_xid);
+
+my $twophase_tmpdir = $PostgreSQL::Test::Utils::tmp_check . '/' . "2pc_files";
+mkdir($twophase_tmpdir);
+my $primary_twophase_folder = $cur_primary->data_dir . '/pg_twophase/';
+copy("$primary_twophase_folder/$commit_prepared_name", $twophase_tmpdir);
+copy("$primary_twophase_folder/$abort_prepared_name", $twophase_tmpdir);
+
+# Issue abort/commit prepared.
+$cur_primary->psql('postgres', "COMMIT PREPARED 'xact_009_41'");
+$cur_primary->psql('postgres', "ROLLBACK PREPARED 'xact_009_42'");
+
+# Again checkpoint, to advance the LSN past the point where the two previous
+# transaction records would be replayed.
+$cur_primary->psql('postgres', "CHECKPOINT");
+
+# Take down node.
+$cur_primary->teardown_node;
+
+# Move back the two twophase files.
+copy("$twophase_tmpdir/$commit_prepared_name", $primary_twophase_folder);
+copy("$twophase_tmpdir/$abort_prepared_name", $primary_twophase_folder);
+
+# Grab location in logs of primary
+my $log_offset = -s $cur_primary->logfile;
+
+# Start node and check how the two previous files are handled during recovery
+# by checking the server logs: they are loaded when the contents of
+# pg_twophase/ are initially scanned, then removed at the end of recovery once
+# CLOG lookups are safe to do.
+$cur_primary->start;
+
+$cur_primary->log_check(
+	"two-phase file of transaction recovered at end of recovery",
+	$log_offset,
+	log_like => [
+		qr/recovering prepared transaction $horizon_xid of epoch 0 from shared memory/,
+	]);
+$cur_primary->log_check(
+	"two-phase files of transactions removed at end of recovery",
+	$log_offset,
+	log_like => [
+		qr/removing stale two-phase state from memory for transaction $commit_prepared_xid of epoch 0/,
+		qr/removing stale two-phase state from memory for transaction $abort_prepared_xid of epoch 0/,
+	]);
+
+# Commit the first transaction.
+$cur_primary->psql('postgres', "COMMIT PREPARED 'xact_009_40'");
+# After replay, no 2PC transactions should remain in pg_prepared_xacts.
+$cur_primary->psql(
+	'postgres',
+	"SELECT * FROM pg_prepared_xacts",
+	stdout => \$psql_out);
+is($psql_out, qq{}, "Check expected pg_prepared_xact data on primary");
+# Data from transactions should be around.
+$cur_primary->psql(
+	'postgres',
+	"SELECT * FROM t_009_tbl WHERE id IN (40, 41, 42);",
+	stdout => \$psql_out);
+is( $psql_out, qq{40|transaction: xid horizon
+41|transaction: commit-prepared},
+	"Check expected table data on primary");
+
+###############################################################################
+# Check handling of orphaned 2PC files at recovery.
+###############################################################################
+
+$cur_standby->teardown_node;
+$cur_primary->teardown_node;
+
+# Grab location in logs of primary
+$log_offset = -s $cur_primary->logfile;
+
+# Create fake files with a transaction ID large or low enough to be in the
+# future or the past, in different epochs, then check that the primary is able
+# to start and remove these files at recovery.
+
+# First bump the epoch with pg_resetwal.
+$cur_primary->command_ok(
+	[ 'pg_resetwal', '-e', 256, '-f', $cur_primary->data_dir ],
+	'bump epoch of primary');
+
+my $future_2pc_file =
+  $cur_primary->data_dir . '/pg_twophase/000001FF00000FFF';
+append_to_file $future_2pc_file, "";
+my $past_2pc_file = $cur_primary->data_dir . '/pg_twophase/000000EE00000FFF';
+append_to_file $past_2pc_file, "";
+
+$cur_primary->start;
+$cur_primary->log_check(
+	"two-phase files removed at recovery",
+	$log_offset,
+	log_like => [
+		qr/removing past two-phase state file for transaction 4095 of epoch 238/,
+		qr/removing future two-phase state file for transaction 4095 of epoch 511/
+	]);
+
 done_testing();
-- 
2.49.0

