From 80e9bcf6ce4f169386faacbe661752cf5cae9ba1 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Thu, 16 Sep 2021 11:07:45 +0300
Subject: [PATCH v7 5/5] Move code to apply one WAL record to a subroutine.

---
 src/backend/access/transam/xlogrecovery.c | 284 +++++++++++-----------
 1 file changed, 147 insertions(+), 137 deletions(-)

diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 5b9d928a8ab..fe6b215b9c5 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -367,6 +367,7 @@ static char recoveryStopName[MAXFNAMELEN];
 static bool recoveryStopAfter;
 
 /* prototypes for local functions */
+static void ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *replayTLI);
 static void xlog_block_info(StringInfo buf, XLogReaderState *record);
 
 static void readRecoverySignalFile(void);
@@ -1396,11 +1397,8 @@ PerformWalRecovery(void)
 
 	if (record != NULL)
 	{
-		ErrorContextCallback errcallback;
 		TimestampTz xtime;
 		PGRUsage	ru0;
-		XLogRecPtr	ReadRecPtr;
-		XLogRecPtr	EndRecPtr;
 
 		pg_rusage_init(&ru0);
 
@@ -1426,14 +1424,9 @@ PerformWalRecovery(void)
 		 */
 		do
 		{
-			bool		switchedTLI = false;
-
-			ReadRecPtr = xlogreader->ReadRecPtr;
-			EndRecPtr = xlogreader->EndRecPtr;
-
 			if (!StandbyMode)
 				ereport_startup_progress("redo in progress, elapsed time: %ld.%02d s, current LSN: %X/%X",
-										 LSN_FORMAT_ARGS(ReadRecPtr));
+										 LSN_FORMAT_ARGS(xlogreader->ReadRecPtr));
 
 #ifdef WAL_DEBUG
 			if (XLOG_DEBUG ||
@@ -1444,8 +1437,8 @@ PerformWalRecovery(void)
 
 				initStringInfo(&buf);
 				appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",
-								 LSN_FORMAT_ARGS(ReadRecPtr),
-								 LSN_FORMAT_ARGS(EndRecPtr));
+								 LSN_FORMAT_ARGS(xlogreader->ReadRecPtr),
+								 LSN_FORMAT_ARGS(xlogreader->EndRecPtr));
 				xlog_outrec(&buf, xlogreader);
 				appendStringInfoString(&buf, " - ");
 				xlog_outdesc(&buf, xlogreader);
@@ -1500,133 +1493,10 @@ PerformWalRecovery(void)
 					recoveryPausesHere(false);
 			}
 
-			/* Setup error traceback support for ereport() */
-			errcallback.callback = rm_redo_error_callback;
-			errcallback.arg = (void *) xlogreader;
-			errcallback.previous = error_context_stack;
-			error_context_stack = &errcallback;
-
-			/*
-			 * ShmemVariableCache->nextXid must be beyond record's xid.
-			 */
-			AdvanceNextFullTransactionIdPastXid(record->xl_xid);
-
-			/*
-			 * Before replaying this record, check if this record causes the
-			 * current timeline to change. The record is already considered to
-			 * be part of the new timeline, so we update ThisTimeLineID before
-			 * replaying it. That's important so that replayEndTLI, which is
-			 * recorded as the minimum recovery point's TLI if recovery stops
-			 * after this record, is set correctly.
-			 */
-			if (record->xl_rmid == RM_XLOG_ID)
-			{
-				TimeLineID	newReplayTLI = replayTLI;
-				TimeLineID	prevReplayTLI = replayTLI;
-				uint8		info = record->xl_info & ~XLR_INFO_MASK;
-
-				if (info == XLOG_CHECKPOINT_SHUTDOWN)
-				{
-					CheckPoint	checkPoint;
-
-					memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
-					newReplayTLI = checkPoint.ThisTimeLineID;
-					prevReplayTLI = checkPoint.PrevTimeLineID;
-				}
-				else if (info == XLOG_END_OF_RECOVERY)
-				{
-					xl_end_of_recovery xlrec;
-
-					memcpy(&xlrec, XLogRecGetData(xlogreader), sizeof(xl_end_of_recovery));
-					newReplayTLI = xlrec.ThisTimeLineID;
-					prevReplayTLI = xlrec.PrevTimeLineID;
-				}
-
-				if (newReplayTLI != replayTLI)
-				{
-					/* Check that it's OK to switch to this TLI */
-					checkTimeLineSwitch(EndRecPtr, newReplayTLI,
-										prevReplayTLI, replayTLI);
-
-					/* Following WAL records should be run with new TLI */
-					replayTLI = newReplayTLI;
-					switchedTLI = true;
-				}
-			}
-
-			/*
-			 * Update shared replayEndRecPtr before replaying this record, so
-			 * that XLogFlush will update minRecoveryPoint correctly.
-			 */
-			SpinLockAcquire(&XLogRecoveryCtl->info_lck);
-			XLogRecoveryCtl->replayEndRecPtr = EndRecPtr;
-			XLogRecoveryCtl->replayEndTLI = replayTLI;
-			SpinLockRelease(&XLogRecoveryCtl->info_lck);
-
-			/*
-			 * If we are attempting to enter Hot Standby mode, process XIDs we
-			 * see
-			 */
-			if (standbyState >= STANDBY_INITIALIZED &&
-				TransactionIdIsValid(record->xl_xid))
-				RecordKnownAssignedTransactionIds(record->xl_xid);
-
-			/* Now apply the WAL record itself */
-			RmgrTable[record->xl_rmid].rm_redo(xlogreader);
-
-			/*
-			 * After redo, check whether the backup pages associated with the
-			 * WAL record are consistent with the existing pages. This check
-			 * is done only if consistency check is enabled for this record.
-			 */
-			if ((record->xl_info & XLR_CHECK_CONSISTENCY) != 0)
-				checkXLogConsistency(xlogreader);
-
-			/* Pop the error context stack */
-			error_context_stack = errcallback.previous;
-
-			/*
-			 * Update lastReplayedEndRecPtr after this record has been
-			 * successfully replayed.
-			 */
-			SpinLockAcquire(&XLogRecoveryCtl->info_lck);
-			XLogRecoveryCtl->lastReplayedEndRecPtr = EndRecPtr;
-			XLogRecoveryCtl->lastReplayedTLI = replayTLI;
-			SpinLockRelease(&XLogRecoveryCtl->info_lck);
-
-			/* Also remember its starting position. */
-			LastReplayedReadRecPtr = ReadRecPtr;
-
 			/*
-			 * If rm_redo called XLogRequestWalReceiverReply, then we wake up
-			 * the receiver so that it notices the updated
-			 * lastReplayedEndRecPtr and sends a reply to the primary.
+			 * Apply the record
 			 */
-			if (doRequestWalReceiverReply)
-			{
-				doRequestWalReceiverReply = false;
-				WalRcvForceReply();
-			}
-
-			/* Allow read-only connections if we're consistent now */
-			CheckRecoveryConsistency();
-
-			/* Is this a timeline switch? */
-			if (switchedTLI)
-			{
-				/*
-				 * Before we continue on the new timeline, clean up any
-				 * (possibly bogus) future WAL segments on the old timeline.
-				 */
-				RemoveNonParentXlogFiles(EndRecPtr, replayTLI);
-
-				/*
-				 * Wake up any walsenders to notice that we are on a new
-				 * timeline.
-				 */
-				if (AllowCascadeReplication())
-					WalSndWakeup();
-			}
+			ApplyWalRecord(xlogreader, record, &replayTLI);
 
 			/* Exit loop if we reached inclusive recovery target */
 			if (recoveryStopsAfter(xlogreader))
@@ -1685,7 +1555,7 @@ PerformWalRecovery(void)
 
 		ereport(LOG,
 				(errmsg("redo done at %X/%X system usage: %s",
-						LSN_FORMAT_ARGS(ReadRecPtr),
+						LSN_FORMAT_ARGS(xlogreader->ReadRecPtr),
 						pg_rusage_show(&ru0))));
 		xtime = GetLatestXTime();
 		if (xtime)
@@ -1714,6 +1584,146 @@ PerformWalRecovery(void)
 				(errmsg("recovery ended before configured recovery target was reached")));
 }
 
+/*
+ * Subroutine of PerformWalRecovery, to apply one WAL record.
+ *
+ * The xid, rmgr id and info flags come from 'record'; the record's start and
+ * end LSNs and its data are taken from 'xlogreader', which is also what the
+ * rmgr's rm_redo routine receives.  If the record switches timelines,
+ * *replayTLI is set to the new timeline before rm_redo is called.  The
+ * replay positions in XLogRecoveryCtl are updated before and after redo.
+ */
+static void
+ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *replayTLI)
+{
+	XLogRecPtr	ReadRecPtr;
+	XLogRecPtr	EndRecPtr;
+	ErrorContextCallback errcallback;
+	bool		switchedTLI = false;
+
+	ReadRecPtr = xlogreader->ReadRecPtr;
+	EndRecPtr = xlogreader->EndRecPtr;
+
+	/* Set up error traceback support for ereport() */
+	errcallback.callback = rm_redo_error_callback;
+	errcallback.arg = (void *) xlogreader;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* ShmemVariableCache->nextXid must be beyond record's xid. */
+	AdvanceNextFullTransactionIdPastXid(record->xl_xid);
+
+	/*
+	 * Before replaying this record, check if this record causes the current
+	 * timeline to change. The record is already considered to be part of the
+	 * new timeline, so we update replayTLI before replaying it. That's
+	 * important so that replayEndTLI, which is recorded as the minimum
+	 * recovery point's TLI if recovery stops after this record, is set
+	 * correctly.
+	 */
+	if (record->xl_rmid == RM_XLOG_ID)
+	{
+		TimeLineID	newReplayTLI = *replayTLI;
+		TimeLineID	prevReplayTLI = *replayTLI;
+		uint8		info = record->xl_info & ~XLR_INFO_MASK;
+
+		if (info == XLOG_CHECKPOINT_SHUTDOWN)
+		{
+			CheckPoint	checkPoint;
+
+			memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
+			newReplayTLI = checkPoint.ThisTimeLineID;
+			prevReplayTLI = checkPoint.PrevTimeLineID;
+		}
+		else if (info == XLOG_END_OF_RECOVERY)
+		{
+			xl_end_of_recovery xlrec;
+
+			memcpy(&xlrec, XLogRecGetData(xlogreader), sizeof(xl_end_of_recovery));
+			newReplayTLI = xlrec.ThisTimeLineID;
+			prevReplayTLI = xlrec.PrevTimeLineID;
+		}
+
+		if (newReplayTLI != *replayTLI)
+		{
+			/* Check that it's OK to switch to this TLI */
+			checkTimeLineSwitch(EndRecPtr, newReplayTLI, prevReplayTLI, *replayTLI);
+
+			/* Following WAL records should be run with new TLI */
+			*replayTLI = newReplayTLI;
+			switchedTLI = true;
+		}
+	}
+
+	/*
+	 * Update shared replayEndRecPtr before replaying this record, so that
+	 * XLogFlush will update minRecoveryPoint correctly.
+	 */
+	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	XLogRecoveryCtl->replayEndRecPtr = EndRecPtr;
+	XLogRecoveryCtl->replayEndTLI = *replayTLI;
+	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+
+	/* If we are attempting to enter Hot Standby mode, process XIDs we see */
+	if (standbyState >= STANDBY_INITIALIZED &&
+		TransactionIdIsValid(record->xl_xid))
+		RecordKnownAssignedTransactionIds(record->xl_xid);
+
+	/* Now apply the WAL record itself */
+	RmgrTable[record->xl_rmid].rm_redo(xlogreader);
+
+	/*
+	 * After redo, check whether the backup pages associated with the WAL
+	 * record are consistent with the existing pages. This check is done only
+	 * if consistency check is enabled for this record.
+	 */
+	if ((record->xl_info & XLR_CHECK_CONSISTENCY) != 0)
+		checkXLogConsistency(xlogreader);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+
+	/*
+	 * Update lastReplayedEndRecPtr after this record has been successfully
+	 * replayed.
+	 */
+	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	XLogRecoveryCtl->lastReplayedEndRecPtr = EndRecPtr;
+	XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
+	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+
+	/* Also remember its starting position. */
+	LastReplayedReadRecPtr = ReadRecPtr;
+
+	/*
+	 * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
+	 * receiver so that it notices the updated lastReplayedEndRecPtr and sends
+	 * a reply to the primary.
+	 */
+	if (doRequestWalReceiverReply)
+	{
+		doRequestWalReceiverReply = false;
+		WalRcvForceReply();
+	}
+
+	/* Allow read-only connections if we're consistent now */
+	CheckRecoveryConsistency();
+
+	/* Is this a timeline switch? */
+	if (switchedTLI)
+	{
+		/*
+		 * Before we continue on the new timeline, clean up any (possibly
+		 * bogus) future WAL segments on the old timeline.
+		 */
+		RemoveNonParentXlogFiles(EndRecPtr, *replayTLI);
+
+		/* Wake up any walsenders to notice that we are on a new timeline. */
+		if (AllowCascadeReplication())
+			WalSndWakeup();
+	}
+}
+
 /*
  * Error context callback for errors occurring during rm_redo().
  */
-- 
2.30.2

