From faa1ebbb5a4d0959e72ecd750fecf803d4acbe59 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Sat, 31 Jul 2021 15:06:39 +0300
Subject: [PATCH v4 3/3] Move code to apply one WAL record to a subroutine.

---
 src/backend/access/transam/xlogrecovery.c | 283 +++++++++++-----------
 1 file changed, 148 insertions(+), 135 deletions(-)

diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 6030d6fe819..85909c9b686 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -366,6 +366,7 @@ static char recoveryStopName[MAXFNAMELEN];
 static bool recoveryStopAfter;
 
 /* prototypes for local functions */
+static void ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record);
 static void xlog_block_info(StringInfo buf, XLogReaderState *record);
 
 static void readRecoverySignalFile(void);
@@ -1374,11 +1375,8 @@ PerformWalRecovery(void)
 
 	if (record != NULL)
 	{
-		ErrorContextCallback errcallback;
 		TimestampTz xtime;
 		PGRUsage	ru0;
-		XLogRecPtr	ReadRecPtr;
-		XLogRecPtr	EndRecPtr;
 
 		pg_rusage_init(&ru0);
 
@@ -1400,11 +1398,6 @@ PerformWalRecovery(void)
 		 */
 		do
 		{
-			bool		switchedTLI = false;
-
-			ReadRecPtr = xlogreader->ReadRecPtr;
-			EndRecPtr = xlogreader->EndRecPtr;
-
 #ifdef WAL_DEBUG
 			if (XLOG_DEBUG ||
 				(rmid == RM_XACT_ID && trace_recovery_messages <= DEBUG2) ||
@@ -1414,8 +1407,8 @@ PerformWalRecovery(void)
 
 				initStringInfo(&buf);
 				appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",
-								 LSN_FORMAT_ARGS(ReadRecPtr),
-								 LSN_FORMAT_ARGS(EndRecPtr));
+								 LSN_FORMAT_ARGS(xlogreader->ReadRecPtr),
+								 LSN_FORMAT_ARGS(xlogreader->EndRecPtr));
 				xlog_outrec(&buf, xlogreader);
 				appendStringInfoString(&buf, " - ");
 				xlog_outdesc(&buf, xlogreader);
@@ -1470,132 +1463,10 @@ PerformWalRecovery(void)
 					recoveryPausesHere(false);
 			}
 
-			/* Setup error traceback support for ereport() */
-			errcallback.callback = rm_redo_error_callback;
-			errcallback.arg = (void *) xlogreader;
-			errcallback.previous = error_context_stack;
-			error_context_stack = &errcallback;
-
 			/*
-			 * ShmemVariableCache->nextXid must be beyond record's xid.
+			 * Apply the record
 			 */
-			AdvanceNextFullTransactionIdPastXid(record->xl_xid);
-
-			/*
-			 * Before replaying this record, check if this record causes the
-			 * current timeline to change. The record is already considered to
-			 * be part of the new timeline, so we update ThisTimeLineID before
-			 * replaying it. That's important so that replayEndTLI, which is
-			 * recorded as the minimum recovery point's TLI if recovery stops
-			 * after this record, is set correctly.
-			 */
-			if (record->xl_rmid == RM_XLOG_ID)
-			{
-				TimeLineID	newTLI = ThisTimeLineID;
-				TimeLineID	prevTLI = ThisTimeLineID;
-				uint8		info = record->xl_info & ~XLR_INFO_MASK;
-
-				if (info == XLOG_CHECKPOINT_SHUTDOWN)
-				{
-					CheckPoint	checkPoint;
-
-					memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
-					newTLI = checkPoint.ThisTimeLineID;
-					prevTLI = checkPoint.PrevTimeLineID;
-				}
-				else if (info == XLOG_END_OF_RECOVERY)
-				{
-					xl_end_of_recovery xlrec;
-
-					memcpy(&xlrec, XLogRecGetData(xlogreader), sizeof(xl_end_of_recovery));
-					newTLI = xlrec.ThisTimeLineID;
-					prevTLI = xlrec.PrevTimeLineID;
-				}
-
-				if (newTLI != ThisTimeLineID)
-				{
-					/* Check that it's OK to switch to this TLI */
-					checkTimeLineSwitch(EndRecPtr, newTLI, prevTLI);
-
-					/* Following WAL records should be run with new TLI */
-					ThisTimeLineID = newTLI;
-					switchedTLI = true;
-				}
-			}
-
-			/*
-			 * Update shared replayEndRecPtr before replaying this record, so
-			 * that XLogFlush will update minRecoveryPoint correctly.
-			 */
-			SpinLockAcquire(&XLogRecCtl->info_lck);
-			XLogRecCtl->replayEndRecPtr = EndRecPtr;
-			XLogRecCtl->replayEndTLI = ThisTimeLineID;
-			SpinLockRelease(&XLogRecCtl->info_lck);
-
-			/*
-			 * If we are attempting to enter Hot Standby mode, process XIDs we
-			 * see
-			 */
-			if (standbyState >= STANDBY_INITIALIZED &&
-				TransactionIdIsValid(record->xl_xid))
-				RecordKnownAssignedTransactionIds(record->xl_xid);
-
-			/* Now apply the WAL record itself */
-			RmgrTable[record->xl_rmid].rm_redo(xlogreader);
-
-			/*
-			 * After redo, check whether the backup pages associated with the
-			 * WAL record are consistent with the existing pages. This check
-			 * is done only if consistency check is enabled for this record.
-			 */
-			if ((record->xl_info & XLR_CHECK_CONSISTENCY) != 0)
-				checkXLogConsistency(xlogreader);
-
-			/* Pop the error context stack */
-			error_context_stack = errcallback.previous;
-
-			/*
-			 * Update lastReplayedEndRecPtr after this record has been
-			 * successfully replayed.
-			 */
-			SpinLockAcquire(&XLogRecCtl->info_lck);
-			XLogRecCtl->lastReplayedEndRecPtr = EndRecPtr;
-			XLogRecCtl->lastReplayedTLI = ThisTimeLineID;
-			SpinLockRelease(&XLogRecCtl->info_lck);
-
-			/* Also remember its starting position. */
-			LastReplayedReadRecPtr = ReadRecPtr;
-
-			/*
-			 * If rm_redo called XLogRequestWalReceiverReply, then we wake up
-			 * the receiver so that it notices the updated
-			 * lastReplayedEndRecPtr and sends a reply to the primary.
-			 */
-			if (doRequestWalReceiverReply)
-			{
-				doRequestWalReceiverReply = false;
-				WalRcvForceReply();
-			}
-
-			/* Allow read-only connections if we're consistent now */
-			CheckRecoveryConsistency();
-
-			/* Is this a timeline switch? */
-			if (switchedTLI)
-			{
-				/*
-				 * Before we continue on the new timeline, clean up any
-				 * (possibly bogus) future WAL segments on the old timeline.
-				 */
-				RemoveNonParentXlogFiles(EndRecPtr, ThisTimeLineID);
-
-				/*
-				 * Wake up any walsenders to notice that we are on a new
-				 * timeline.
-				 */
-				if (AllowCascadeReplication())
-					WalSndWakeup();
-			}
+			ApplyWalRecord(xlogreader, record);
 
 			/* Exit loop if we reached inclusive recovery target */
 			if (recoveryStopsAfter(xlogreader))
@@ -1654,7 +1525,7 @@ PerformWalRecovery(void)
 
 		ereport(LOG,
 				(errmsg("redo done at %X/%X system usage: %s",
-						LSN_FORMAT_ARGS(ReadRecPtr),
+						LSN_FORMAT_ARGS(xlogreader->ReadRecPtr),
 						pg_rusage_show(&ru0))));
 		xtime = GetLatestXTime();
 		if (xtime)
@@ -1683,6 +1554,148 @@ PerformWalRecovery(void)
 				(errmsg("recovery ended before configured recovery target was reached")));
 }
 
+/*
+ * Subroutine of PerformWalRecovery, to apply one WAL record.
+ *
+ * 'xlogreader' supplies the record's start and end positions (ReadRecPtr
+ * and EndRecPtr) and is what gets passed to rm_redo; 'record' supplies the
+ * xl_xid, xl_rmid and xl_info fields consulted here.  May change
+ * ThisTimeLineID, and updates the replay positions in XLogRecCtl.
+ */
+static void
+ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record)
+{
+	XLogRecPtr	ReadRecPtr;
+	XLogRecPtr	EndRecPtr;
+	ErrorContextCallback errcallback;
+	bool		switchedTLI = false;
+
+	/* Capture the record's start and end positions from the reader */
+	ReadRecPtr = xlogreader->ReadRecPtr;
+	EndRecPtr = xlogreader->EndRecPtr;
+
+	/* Setup error traceback support for ereport() */
+	errcallback.callback = rm_redo_error_callback;
+	errcallback.arg = (void *) xlogreader;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* ShmemVariableCache->nextXid must be beyond record's xid. */
+	AdvanceNextFullTransactionIdPastXid(record->xl_xid);
+
+	/*
+	 * Before replaying this record, check if this record causes the
+	 * current timeline to change. The record is already considered to
+	 * be part of the new timeline, so we update ThisTimeLineID before
+	 * replaying it. That's important so that replayEndTLI, which is
+	 * recorded as the minimum recovery point's TLI if recovery stops
+	 * after this record, is set correctly.
+	 */
+	if (record->xl_rmid == RM_XLOG_ID)
+	{
+		TimeLineID	newTLI = ThisTimeLineID;
+		TimeLineID	prevTLI = ThisTimeLineID;
+		uint8		info = record->xl_info & ~XLR_INFO_MASK;
+
+		if (info == XLOG_CHECKPOINT_SHUTDOWN)
+		{
+			CheckPoint	checkPoint;
+
+			memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
+			newTLI = checkPoint.ThisTimeLineID;
+			prevTLI = checkPoint.PrevTimeLineID;
+		}
+		else if (info == XLOG_END_OF_RECOVERY)
+		{
+			xl_end_of_recovery xlrec;
+
+			memcpy(&xlrec, XLogRecGetData(xlogreader), sizeof(xl_end_of_recovery));
+			newTLI = xlrec.ThisTimeLineID;
+			prevTLI = xlrec.PrevTimeLineID;
+		}
+
+		if (newTLI != ThisTimeLineID)
+		{
+			/* Check that it's OK to switch to this TLI */
+			checkTimeLineSwitch(EndRecPtr, newTLI, prevTLI);
+
+			/* Following WAL records should be run with new TLI */
+			ThisTimeLineID = newTLI;
+			switchedTLI = true;
+		}
+	}
+
+	/*
+	 * Update shared replayEndRecPtr before replaying this record, so
+	 * that XLogFlush will update minRecoveryPoint correctly.
+	 */
+	SpinLockAcquire(&XLogRecCtl->info_lck);
+	XLogRecCtl->replayEndRecPtr = EndRecPtr;
+	XLogRecCtl->replayEndTLI = ThisTimeLineID;
+	SpinLockRelease(&XLogRecCtl->info_lck);
+
+	/* If we are attempting to enter Hot Standby mode, process XIDs we see */
+	if (standbyState >= STANDBY_INITIALIZED &&
+		TransactionIdIsValid(record->xl_xid))
+		RecordKnownAssignedTransactionIds(record->xl_xid);
+
+	/* Now apply the WAL record itself */
+	RmgrTable[record->xl_rmid].rm_redo(xlogreader);
+
+	/*
+	 * After redo, check whether the backup pages associated with the
+	 * WAL record are consistent with the existing pages. This check
+	 * is done only if consistency check is enabled for this record.
+	 */
+	if ((record->xl_info & XLR_CHECK_CONSISTENCY) != 0)
+		checkXLogConsistency(xlogreader);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+
+	/*
+	 * Update lastReplayedEndRecPtr after this record has been
+	 * successfully replayed.
+	 */
+	SpinLockAcquire(&XLogRecCtl->info_lck);
+	XLogRecCtl->lastReplayedEndRecPtr = EndRecPtr;
+	XLogRecCtl->lastReplayedTLI = ThisTimeLineID;
+	SpinLockRelease(&XLogRecCtl->info_lck);
+
+	/* Also remember its starting position. */
+	LastReplayedReadRecPtr = ReadRecPtr;
+
+	/*
+	 * If rm_redo called XLogRequestWalReceiverReply, then we wake up
+	 * the receiver so that it notices the updated
+	 * lastReplayedEndRecPtr and sends a reply to the primary.
+	 */
+	if (doRequestWalReceiverReply)
+	{
+		doRequestWalReceiverReply = false;
+		WalRcvForceReply();
+	}
+
+	/* Allow read-only connections if we're consistent now */
+	CheckRecoveryConsistency();
+
+	/* Is this a timeline switch? */
+	if (switchedTLI)
+	{
+		/*
+		 * Before we continue on the new timeline, clean up any
+		 * (possibly bogus) future WAL segments on the old timeline.
+		 */
+		RemoveNonParentXlogFiles(EndRecPtr, ThisTimeLineID);
+
+		/*
+		 * Wake up any walsenders to notice that we are on a new timeline.
+		 */
+		if (AllowCascadeReplication())
+			WalSndWakeup();
+	}
+}
+
 /*
  * Error context callback for errors occurring during rm_redo().
  */
-- 
2.30.2

