From 56c94091614a884f08ca2aa11e96a1718605e1d9 Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Fri, 31 Jan 2025 08:47:24 +0900
Subject: [PATCH v2 2/2] Fix issues with 2PC file handling at recovery

This addresses two issues:
- Avoid CLOG file lookups until we are sure that this is safe.  This is
now done at the end of recovery.
- Avoid modifying the 2PC shared memory state (TwoPhaseState) while
  scanning it, as PrepareRedoRemove() manipulates the array being scanned.

Tests are added to demonstrate these problems.

Backpatch-through: 13
---
 src/backend/access/transam/twophase.c | 125 ++++++++++++-----------
 src/test/recovery/t/009_twophase.pl   | 140 ++++++++++++++++++++++++++
 2 files changed, 209 insertions(+), 56 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 4f5b4542662..c8a8d774b10 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1893,13 +1893,16 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
  * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
  * This is called once at the beginning of recovery, saving any extra
  * lookups in the future.  Two-phase files that are newer than the
- * minimum XID horizon are discarded on the way.
+ * minimum XID horizon are discarded on the way, as well as files that
+ * are older than the oldest XID horizon.
  */
 void
 restoreTwoPhaseData(void)
 {
 	DIR		   *cldir;
 	struct dirent *clde;
+	FullTransactionId nextXid = TransamVariables->nextXid;
+	FullTransactionId oldestXid = AdjustToFullTransactionId(TransamVariables->oldestXid);
 
 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 	cldir = AllocateDir(TWOPHASE_DIR);
@@ -1913,10 +1916,26 @@ restoreTwoPhaseData(void)
 
 			fxid = FullTransactionIdFromU64(strtou64(clde->d_name, NULL, 16));
 
+			/* Reject XID if too new or too old */
+			if (FullTransactionIdFollowsOrEquals(fxid, nextXid) ||
+				FullTransactionIdPrecedes(fxid, oldestXid))
+			{
+				if (FullTransactionIdFollowsOrEquals(fxid, nextXid))
+					ereport(WARNING,
+							(errmsg("removing future two-phase state file for transaction %u of epoch %u",
+									XidFromFullTransactionId(fxid),
+									EpochFromFullTransactionId(fxid))));
+				else
+					ereport(WARNING,
+							(errmsg("removing past two-phase state file for transaction %u of epoch %u",
+									XidFromFullTransactionId(fxid),
+									EpochFromFullTransactionId(fxid))));
+				RemoveTwoPhaseFile(fxid, true);
+				continue;
+			}
+
 			buf = ProcessTwoPhaseBuffer(fxid, InvalidXLogRecPtr,
 										true, false, false);
-			if (buf == NULL)
-				continue;
 
 			PrepareRedoAdd(fxid, buf, InvalidXLogRecPtr,
 						   InvalidXLogRecPtr, InvalidRepOriginId);
@@ -1982,9 +2001,6 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
 									gxact->prepare_start_lsn,
 									gxact->ondisk, false, true);
 
-		if (buf == NULL)
-			continue;
-
 		/*
 		 * OK, we think this file is valid.  Incorporate xid into the
 		 * running-minimum result.
@@ -2052,8 +2068,7 @@ StandbyRecoverPreparedTransactions(void)
 		buf = ProcessTwoPhaseBuffer(gxact->fxid,
 									gxact->prepare_start_lsn,
 									gxact->ondisk, true, false);
-		if (buf != NULL)
-			pfree(buf);
+		pfree(buf);
 	}
 	LWLockRelease(TwoPhaseStateLock);
 }
@@ -2078,8 +2093,21 @@ void
 RecoverPreparedTransactions(void)
 {
 	int			i;
+	FullTransactionId *remove_fxids;
+	int			remove_fxids_cnt;
 
 	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+
+	/*
+	 * Track the XIDs of transactions found as already committed or aborted,
+	 * to remove them once the first scan through TwoPhaseState is done.  This
+	 * cannot happen while going through the entries in TwoPhaseState as
+	 * PrepareRedoRemove() manipulates it.
+	 */
+	remove_fxids_cnt = 0;
+	remove_fxids = (FullTransactionId *) palloc(TwoPhaseState->numPrepXacts *
+												sizeof(FullTransactionId));
+
 	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 	{
 		char	   *buf;
@@ -2090,6 +2118,27 @@ RecoverPreparedTransactions(void)
 		TransactionId *subxids;
 		const char *gid;
 
+		/*
+		 * Is this transaction already aborted or committed?  If yes, mark it
+		 * for removal.
+		 *
+		 * Checking CLOGs if these transactions have been already aborted or
+		 * committed is safe at this stage; we are at the end of recovery and
+		 * all WAL has been replayed, all 2PC transactions are reinstated and
+		 * should be tracked in TwoPhaseState.
+		 */
+		if (TransactionIdDidCommit(XidFromFullTransactionId(fxid)) ||
+			TransactionIdDidAbort(XidFromFullTransactionId(fxid)))
+		{
+			/*
+			 * Track this transaction ID for its removal from the shared
+			 * memory state at the end.
+			 */
+			remove_fxids[remove_fxids_cnt] = fxid;
+			remove_fxids_cnt++;
+			continue;
+		}
+
 		/*
 		 * Reconstruct subtrans state for the transaction --- needed because
 		 * pg_subtrans is not preserved over a restart.  Note that we are
@@ -2102,8 +2151,6 @@ RecoverPreparedTransactions(void)
 		buf = ProcessTwoPhaseBuffer(gxact->fxid,
 									gxact->prepare_start_lsn,
 									gxact->ondisk, true, false);
-		if (buf == NULL)
-			continue;
 
 		ereport(LOG,
 				(errmsg("recovering prepared transaction %u of epoch %u from shared memory",
@@ -2164,6 +2211,18 @@ RecoverPreparedTransactions(void)
 		LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 	}
 
+	for (i = 0; i < remove_fxids_cnt; i++)
+	{
+		FullTransactionId fxid = remove_fxids[i];
+
+		ereport(WARNING,
+				(errmsg("removing stale two-phase state from memory for transaction %u of epoch %u",
+						XidFromFullTransactionId(fxid),
+						EpochFromFullTransactionId(fxid))));
+
+		PrepareRedoRemoveFull(fxid, true);
+	}
+
 	LWLockRelease(TwoPhaseStateLock);
 }
 
@@ -2184,7 +2243,6 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid,
 					  bool fromdisk,
 					  bool setParent, bool setNextXid)
 {
-	FullTransactionId nextXid = TransamVariables->nextXid;
 	TransactionId *subxids;
 	char	   *buf;
 	TwoPhaseFileHeader *hdr;
@@ -2195,51 +2253,6 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid,
 	if (!fromdisk)
 		Assert(prepare_start_lsn != InvalidXLogRecPtr);
 
-	/* Already processed? */
-	if (TransactionIdDidCommit(XidFromFullTransactionId(fxid)) ||
-		TransactionIdDidAbort(XidFromFullTransactionId(fxid)))
-	{
-		if (fromdisk)
-		{
-			ereport(WARNING,
-					(errmsg("removing stale two-phase state file for transaction %u of epoch %u",
-							XidFromFullTransactionId(fxid),
-							EpochFromFullTransactionId(fxid))));
-			RemoveTwoPhaseFile(fxid, true);
-		}
-		else
-		{
-			ereport(WARNING,
-					(errmsg("removing stale two-phase state from memory for transaction %u of epoch %u",
-							XidFromFullTransactionId(fxid),
-							EpochFromFullTransactionId(fxid))));
-			PrepareRedoRemoveFull(fxid, true);
-		}
-		return NULL;
-	}
-
-	/* Reject XID if too new */
-	if (FullTransactionIdFollowsOrEquals(fxid, nextXid))
-	{
-		if (fromdisk)
-		{
-			ereport(WARNING,
-					(errmsg("removing future two-phase state file for transaction %u of epoch %u",
-							XidFromFullTransactionId(fxid),
-							EpochFromFullTransactionId(fxid))));
-			RemoveTwoPhaseFile(fxid, true);
-		}
-		else
-		{
-			ereport(WARNING,
-					(errmsg("removing future two-phase state from memory for transaction %u of epoch %u",
-							XidFromFullTransactionId(fxid),
-							EpochFromFullTransactionId(fxid))));
-			PrepareRedoRemoveFull(fxid, true);
-		}
-		return NULL;
-	}
-
 	if (fromdisk)
 	{
 		/* Read and validate file */
diff --git a/src/test/recovery/t/009_twophase.pl b/src/test/recovery/t/009_twophase.pl
index 1a662ebe499..3a3714a2d8b 100644
--- a/src/test/recovery/t/009_twophase.pl
+++ b/src/test/recovery/t/009_twophase.pl
@@ -5,6 +5,7 @@
 use strict;
 use warnings FATAL => 'all';
 
+use File::Copy;
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
@@ -28,6 +29,15 @@ sub configure_and_reload
 	return;
 }
 
+sub twophase_file_name
+{
+	local $Test::Builder::Level = $Test::Builder::Level + 1;
+
+	# Name is the epoch then the XID, each as 8 uppercase hex digits.
+	my ($epoch, $xid) = @_;
+	return sprintf("%08X%08X", $epoch, $xid);
+}
+
 # Set up two nodes, which will alternately be primary and replication standby.
 
 # Setup london node
@@ -572,4 +582,134 @@ my $nsubtrans = $cur_primary->safe_psql('postgres',
 );
 isnt($osubtrans, $nsubtrans, "contents of pg_subtrans/ have changed");
 
+###############################################################################
+# Check handling of already committed or aborted 2PC files at recovery.
+# This test does a manual copy of 2PC files created in a running server,
+# to cheaply emulate situations that could be found in base backups.
+###############################################################################
+
+# Issue a set of transactions that will be used for this portion of the test:
+# - One transaction to hold the minimum xid horizon back.
+# - One transaction that will be found as already committed at recovery.
+# - One transaction that will be found as already rolled back at recovery.
+$cur_primary->psql(
+	'postgres', "
+	BEGIN;
+	INSERT INTO t_009_tbl VALUES (40, 'transaction: xid horizon');
+	PREPARE TRANSACTION 'xact_009_40';
+	BEGIN;
+	INSERT INTO t_009_tbl VALUES (41, 'transaction: commit-prepared');
+	PREPARE TRANSACTION 'xact_009_41';
+	BEGIN;
+	INSERT INTO t_009_tbl VALUES (42, 'transaction: rollback-prepared');
+	PREPARE TRANSACTION 'xact_009_42';");
+
+# Issue a checkpoint, fixing the XID horizon based on the first transaction
+# and flushing the two-phase state files to disk.
+$cur_primary->psql('postgres', "CHECKPOINT");
+
+# Get the transaction IDs of the two 2PC files to manipulate.
+my $commit_prepared_xid = int(
+	$cur_primary->safe_psql(
+		'postgres',
+		"SELECT transaction FROM pg_prepared_xacts WHERE gid = 'xact_009_41'")
+);
+my $abort_prepared_xid = int(
+	$cur_primary->safe_psql(
+		'postgres',
+		"SELECT transaction FROM pg_prepared_xacts WHERE gid = 'xact_009_42'")
+);
+
+# Copy the two-phase files that will be put back later.  Assume an
+# epoch of 0.
+my $commit_prepared_name = twophase_file_name(0, $commit_prepared_xid);
+my $abort_prepared_name = twophase_file_name(0, $abort_prepared_xid);
+
+my $twophase_tmpdir = $PostgreSQL::Test::Utils::tmp_check . '/' . "2pc_files";
+mkdir($twophase_tmpdir);
+my $primary_twophase_folder = $cur_primary->data_dir . '/pg_twophase/';
+copy("$primary_twophase_folder/$commit_prepared_name", $twophase_tmpdir);
+copy("$primary_twophase_folder/$abort_prepared_name", $twophase_tmpdir);
+
+# Issue abort/commit prepared.
+$cur_primary->psql('postgres', "COMMIT PREPARED 'xact_009_41'");
+$cur_primary->psql('postgres', "ROLLBACK PREPARED 'xact_009_42'");
+
+# Checkpoint again, so that recovery starts past the WAL records of the two
+# previous transactions, which will therefore not be replayed.
+$cur_primary->psql('postgres', "CHECKPOINT");
+
+# Take down node.
+$cur_primary->teardown_node;
+
+# Copy back the two two-phase files saved earlier.
+copy("$twophase_tmpdir/$commit_prepared_name", $primary_twophase_folder);
+copy("$twophase_tmpdir/$abort_prepared_name", $primary_twophase_folder);
+
+# Grab the current position in the primary's log file.
+my $log_offset = -s $cur_primary->logfile;
+
+# Start node and check that the two previous files are removed by checking the
+# server logs, following the CLOG lookup done at the end of recovery.
+$cur_primary->start;
+
+$cur_primary->log_check(
+	"two-phase files of committed transactions removed at recovery",
+	$log_offset,
+	log_like => [
+		qr/removing stale two-phase state from memory for transaction $commit_prepared_xid of epoch 0/,
+		qr/removing stale two-phase state from memory for transaction $abort_prepared_xid of epoch 0/
+	]);
+
+# Commit the transaction holding back the XID horizon.
+$cur_primary->psql('postgres', "COMMIT PREPARED 'xact_009_40'");
+# After replay, there should be no 2PC transactions left.
+$cur_primary->psql(
+	'postgres',
+	"SELECT * FROM pg_prepared_xacts",
+	stdout => \$psql_out);
+is($psql_out, qq{}, "Check expected pg_prepared_xacts data on primary");
+# Only the data of the committed transactions should be visible.
+$cur_primary->psql(
+	'postgres',
+	"SELECT * FROM t_009_tbl WHERE id IN (40, 41, 42) ORDER BY id;",
+	stdout => \$psql_out);
+is( $psql_out, qq{40|transaction: xid horizon
+41|transaction: commit-prepared},
+	"Check expected table data on primary");
+
+###############################################################################
+# Check handling of orphaned 2PC files at recovery.
+###############################################################################
+
+$cur_standby->teardown_node;
+$cur_primary->teardown_node;
+
+# Grab the current position in the primary's log file.
+$log_offset = -s $cur_primary->logfile;
+
+# Create fake files with a transaction ID large or low enough to be in the
+# future or the past, in different epochs, then check that the primary is able
+# to start and remove these files at recovery.
+
+# First bump the epoch with pg_resetwal.
+$cur_primary->command_ok(
+	[ 'pg_resetwal', '-e', 256, '-f', $cur_primary->data_dir ],
+	'bump epoch of primary');
+
+my $future_2pc_file =
+  $cur_primary->data_dir . '/pg_twophase/000001FF00000FFF';
+append_to_file $future_2pc_file, "";
+my $past_2pc_file = $cur_primary->data_dir . '/pg_twophase/000000EE00000FFF';
+append_to_file $past_2pc_file, "";
+
+$cur_primary->start;
+$cur_primary->log_check(
+	"two-phase files removed at recovery",
+	$log_offset,
+	log_like => [
+		qr/removing past two-phase state file for transaction 4095 of epoch 238/,
+		qr/removing future two-phase state file for transaction 4095 of epoch 511/
+	]);
+
 done_testing();
-- 
2.47.2

