From b7a7c951750b961fc16ffbd1063ee87570d77219 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Fri, 17 Dec 2021 09:00:44 +0200
Subject: [PATCH v9 5/5] Move code to apply one WAL record to a subroutine.

---
 src/backend/access/transam/xlogrecovery.c | 282 +++++++++++-----------
 1 file changed, 147 insertions(+), 135 deletions(-)

diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index b9fb61d1dbb..875188cc7a8 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -374,6 +374,8 @@ static char recoveryStopName[MAXFNAMELEN];
 static bool recoveryStopAfter;
 
 /* prototypes for local functions */
+static void ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *replayTLI);
+
 static void readRecoverySignalFile(void);
 static void validateRecoveryParameters(void);
 static bool read_backup_label(XLogRecPtr *checkPointLoc,
@@ -1569,7 +1571,6 @@ PerformWalRecovery(void)
 
 	if (record != NULL)
 	{
-		ErrorContextCallback errcallback;
 		TimestampTz xtime;
 		PGRUsage	ru0;
 
@@ -1597,8 +1598,6 @@ PerformWalRecovery(void)
 		 */
 		do
 		{
-			bool		switchedTLI = false;
-
 			if (!StandbyMode)
 				ereport_startup_progress("redo in progress, elapsed time: %ld.%02d s, current LSN: %X/%X",
 										 LSN_FORMAT_ARGS(xlogreader->ReadRecPtr));
@@ -1668,140 +1667,10 @@ PerformWalRecovery(void)
 					recoveryPausesHere(false);
 			}
 
-			/* Setup error traceback support for ereport() */
-			errcallback.callback = rm_redo_error_callback;
-			errcallback.arg = (void *) xlogreader;
-			errcallback.previous = error_context_stack;
-			error_context_stack = &errcallback;
-
-			/*
-			 * ShmemVariableCache->nextXid must be beyond record's xid.
-			 */
-			AdvanceNextFullTransactionIdPastXid(record->xl_xid);
-
-			/*
-			 * Before replaying this record, check if this record causes the
-			 * current timeline to change. The record is already considered to
-			 * be part of the new timeline, so we update ThisTimeLineID before
-			 * replaying it. That's important so that replayEndTLI, which is
-			 * recorded as the minimum recovery point's TLI if recovery stops
-			 * after this record, is set correctly.
-			 */
-			if (record->xl_rmid == RM_XLOG_ID)
-			{
-				TimeLineID	newReplayTLI = replayTLI;
-				TimeLineID	prevReplayTLI = replayTLI;
-				uint8		info = record->xl_info & ~XLR_INFO_MASK;
-
-				if (info == XLOG_CHECKPOINT_SHUTDOWN)
-				{
-					CheckPoint	checkPoint;
-
-					memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
-					newReplayTLI = checkPoint.ThisTimeLineID;
-					prevReplayTLI = checkPoint.PrevTimeLineID;
-				}
-				else if (info == XLOG_END_OF_RECOVERY)
-				{
-					xl_end_of_recovery xlrec;
-
-					memcpy(&xlrec, XLogRecGetData(xlogreader), sizeof(xl_end_of_recovery));
-					newReplayTLI = xlrec.ThisTimeLineID;
-					prevReplayTLI = xlrec.PrevTimeLineID;
-				}
-
-				if (newReplayTLI != replayTLI)
-				{
-					/* Check that it's OK to switch to this TLI */
-					checkTimeLineSwitch(xlogreader->EndRecPtr, newReplayTLI,
-										prevReplayTLI, replayTLI);
-
-					/* Following WAL records should be run with new TLI */
-					replayTLI = newReplayTLI;
-					switchedTLI = true;
-				}
-			}
-
-			/*
-			 * Update shared replayEndRecPtr before replaying this record, so
-			 * that XLogFlush will update minRecoveryPoint correctly.
-			 */
-			SpinLockAcquire(&XLogRecoveryCtl->info_lck);
-			XLogRecoveryCtl->replayEndRecPtr = xlogreader->EndRecPtr;
-			XLogRecoveryCtl->replayEndTLI = replayTLI;
-			SpinLockRelease(&XLogRecoveryCtl->info_lck);
-
-			/*
-			 * If we are attempting to enter Hot Standby mode, process XIDs we
-			 * see
-			 */
-			if (standbyState >= STANDBY_INITIALIZED &&
-				TransactionIdIsValid(record->xl_xid))
-				RecordKnownAssignedTransactionIds(record->xl_xid);
-
-			/*
-			 * Some XLOG record types that are related to recovery are
-			 * processed directly here, rather than in xlog_redo()
-			 */
-			if (record->xl_rmid == RM_XLOG_ID)
-				xlogrecovery_redo(xlogreader, replayTLI);
-
-			/* Now apply the WAL record itself */
-			RmgrTable[record->xl_rmid].rm_redo(xlogreader);
-
-			/*
-			 * After redo, check whether the backup pages associated with the
-			 * WAL record are consistent with the existing pages. This check
-			 * is done only if consistency check is enabled for this record.
-			 */
-			if ((record->xl_info & XLR_CHECK_CONSISTENCY) != 0)
-				verifyBackupPageConsistency(xlogreader);
-
-			/* Pop the error context stack */
-			error_context_stack = errcallback.previous;
-
 			/*
-			 * Update lastReplayedEndRecPtr after this record has been
-			 * successfully replayed.
+			 * Apply the record
 			 */
-			SpinLockAcquire(&XLogRecoveryCtl->info_lck);
-			XLogRecoveryCtl->lastReplayedEndRecPtr = xlogreader->EndRecPtr;
-			XLogRecoveryCtl->lastReplayedTLI = replayTLI;
-			SpinLockRelease(&XLogRecoveryCtl->info_lck);
-
-			/* Also remember its starting position. */
-			LastReplayedReadRecPtr = xlogreader->ReadRecPtr;
-
-			/*
-			 * If rm_redo called XLogRequestWalReceiverReply, then we wake up
-			 * the receiver so that it notices the updated
-			 * lastReplayedEndRecPtr and sends a reply to the primary.
-			 */
-			if (doRequestWalReceiverReply)
-			{
-				doRequestWalReceiverReply = false;
-				WalRcvForceReply();
-			}
-
-			/* Allow read-only connections if we're consistent now */
-			CheckRecoveryConsistency();
-
-			/* Is this a timeline switch? */
-			if (switchedTLI)
-			{
-				/*
-				 * Before we continue on the new timeline, clean up any
-				 * (possibly bogus) future WAL segments on the old timeline.
-				 */
-				RemoveNonParentXlogFiles(xlogreader->EndRecPtr, replayTLI);
-
-				/*
-				 * Wake up any walsenders to notice that we are on a new
-				 * timeline.
-				 */
-				if (AllowCascadeReplication())
-					WalSndWakeup();
-			}
+			ApplyWalRecord(xlogreader, record, &replayTLI);
 
 			/* Exit loop if we reached inclusive recovery target */
 			if (recoveryStopsAfter(xlogreader))
@@ -1889,6 +1758,149 @@ PerformWalRecovery(void)
 				(errmsg("recovery ended before configured recovery target was reached")));
 }
 
+/*
+ * Subroutine of PerformWalRecovery, to apply one WAL record.
+ */
+static void
+ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *replayTLI)
+{
+	ErrorContextCallback errcallback;
+	bool		switchedTLI = false;
+
+	/* Setup error traceback support for ereport() */
+	errcallback.callback = rm_redo_error_callback;
+	errcallback.arg = (void *) xlogreader;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/*
+	 * ShmemVariableCache->nextXid must be beyond record's xid.
+	 */
+	AdvanceNextFullTransactionIdPastXid(record->xl_xid);
+
+	/*
+	 * Before replaying this record, check if this record causes the current
+	 * timeline to change. The record is already considered to be part of the
+	 * new timeline, so we update replayTLI before replaying it. That's
+	 * important so that replayEndTLI, which is recorded as the minimum
+	 * recovery point's TLI if recovery stops after this record, is set
+	 * correctly.
+	 */
+	if (record->xl_rmid == RM_XLOG_ID)
+	{
+		TimeLineID	newReplayTLI = *replayTLI;
+		TimeLineID	prevReplayTLI = *replayTLI;
+		uint8		info = record->xl_info & ~XLR_INFO_MASK;
+
+		if (info == XLOG_CHECKPOINT_SHUTDOWN)
+		{
+			CheckPoint	checkPoint;
+
+			memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
+			newReplayTLI = checkPoint.ThisTimeLineID;
+			prevReplayTLI = checkPoint.PrevTimeLineID;
+		}
+		else if (info == XLOG_END_OF_RECOVERY)
+		{
+			xl_end_of_recovery xlrec;
+
+			memcpy(&xlrec, XLogRecGetData(xlogreader), sizeof(xl_end_of_recovery));
+			newReplayTLI = xlrec.ThisTimeLineID;
+			prevReplayTLI = xlrec.PrevTimeLineID;
+		}
+
+		if (newReplayTLI != *replayTLI)
+		{
+			/* Check that it's OK to switch to this TLI */
+			checkTimeLineSwitch(xlogreader->EndRecPtr,
+								newReplayTLI, prevReplayTLI, *replayTLI);
+
+			/* Following WAL records should be run with new TLI */
+			*replayTLI = newReplayTLI;
+			switchedTLI = true;
+		}
+	}
+
+	/*
+	 * Update shared replayEndRecPtr before replaying this record, so that
+	 * XLogFlush will update minRecoveryPoint correctly.
+	 */
+	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	XLogRecoveryCtl->replayEndRecPtr = xlogreader->EndRecPtr;
+	XLogRecoveryCtl->replayEndTLI = *replayTLI;
+	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+
+	/*
+	 * If we are attempting to enter Hot Standby mode, process XIDs we see
+	 */
+	if (standbyState >= STANDBY_INITIALIZED &&
+		TransactionIdIsValid(record->xl_xid))
+		RecordKnownAssignedTransactionIds(record->xl_xid);
+
+	/*
+	 * Some XLOG record types that are related to recovery are processed
+	 * directly here, rather than in xlog_redo()
+	 */
+	if (record->xl_rmid == RM_XLOG_ID)
+		xlogrecovery_redo(xlogreader, *replayTLI);
+
+	/* Now apply the WAL record itself */
+	RmgrTable[record->xl_rmid].rm_redo(xlogreader);
+
+	/*
+	 * After redo, check whether the backup pages associated with the WAL
+	 * record are consistent with the existing pages. This check is done only
+	 * if consistency check is enabled for this record.
+	 */
+	if ((record->xl_info & XLR_CHECK_CONSISTENCY) != 0)
+		verifyBackupPageConsistency(xlogreader);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+
+	/*
+	 * Update lastReplayedEndRecPtr after this record has been successfully
+	 * replayed.
+	 */
+	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	XLogRecoveryCtl->lastReplayedEndRecPtr = xlogreader->EndRecPtr;
+	XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
+	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+
+	/* Also remember its starting position. */
+	LastReplayedReadRecPtr = xlogreader->ReadRecPtr;
+
+	/*
+	 * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
+	 * receiver so that it notices the updated lastReplayedEndRecPtr and sends
+	 * a reply to the primary.
+	 */
+	if (doRequestWalReceiverReply)
+	{
+		doRequestWalReceiverReply = false;
+		WalRcvForceReply();
+	}
+
+	/* Allow read-only connections if we're consistent now */
+	CheckRecoveryConsistency();
+
+	/* Is this a timeline switch? */
+	if (switchedTLI)
+	{
+		/*
+		 * Before we continue on the new timeline, clean up any (possibly
+		 * bogus) future WAL segments on the old timeline.
+		 */
+		RemoveNonParentXlogFiles(xlogreader->EndRecPtr, *replayTLI);
+
+		/*
+		 * Wake up any walsenders to notice that we are on a new timeline.
+		 */
+		if (AllowCascadeReplication())
+			WalSndWakeup();
+	}
+}
+
 /*
  * Some XLOG RM record types that are directly related to WAL recovery are
  * handled here rather than in the xlog_redo()
-- 
2.30.2

