From f00da4131ee46e0bab5d7fc715378eb6c2a7f6d1 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Fri, 17 Dec 2021 10:47:30 +0200
Subject: [PATCH v9 4/5] Handle some XLOG record types directly in
 xlogrecovery.c

This eliminates the need for the HandleBackupEndRecord() callback from
xlog.c to xlogrecovery.c.
---
 src/backend/access/transam/xlog.c         |  59 ++----------
 src/backend/access/transam/xlogrecovery.c | 111 +++++++++++++++++-----
 src/include/access/xlogrecovery.h         |   2 -
 3 files changed, 93 insertions(+), 79 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1a53b0d571d..5adb55e1bfd 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -662,8 +662,6 @@ static void CleanupAfterArchiveRecovery(TimeLineID EndOfLogTLI,
 										TimeLineID newTLI);
 static void CheckRequiredParameterValues(void);
 static void XLogReportParameters(void);
-static void VerifyOverwriteContrecord(xl_overwrite_contrecord *xlrec,
-									  XLogReaderState *state);
 static int LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn,
@@ -7493,6 +7491,9 @@ UpdateFullPageWrites(void)
  *
  * Definitions of info values are in include/catalog/pg_control.h, though
  * not all record types are related to control file updates.
+ *
+ * NOTE: Some XLOG record types that are directly related to WAL recovery
+ * are handled in xlogrecovery_redo().
  */
 void
 xlog_redo(XLogReaderState *record)
@@ -7681,33 +7682,11 @@ xlog_redo(XLogReaderState *record)
 	}
 	else if (info == XLOG_OVERWRITE_CONTRECORD)
 	{
-		xl_overwrite_contrecord xlrec;
-
-		memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_overwrite_contrecord));
-		VerifyOverwriteContrecord(&xlrec, record);
+		/* nothing to do here, handled in xlogrecovery_redo() */
 	}
 	else if (info == XLOG_END_OF_RECOVERY)
 	{
-		xl_end_of_recovery xlrec;
-		TimeLineID	replayTLI;
-
-		memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_end_of_recovery));
-
-		/*
-		 * For Hot Standby, we could treat this like a Shutdown Checkpoint,
-		 * but this case is rarer and harder to test, so the benefit doesn't
-		 * outweigh the potential extra cost of maintenance.
-		 */
-
-		/*
-		 * We should've already switched to the new TLI before replaying this
-		 * record.
-		 */
-		(void) GetCurrentReplayRecPtr(&replayTLI);
-		if (xlrec.ThisTimeLineID != replayTLI)
-			ereport(PANIC,
-					(errmsg("unexpected timeline ID %u (should be %u) in checkpoint record",
-							xlrec.ThisTimeLineID, replayTLI)));
+		/* nothing to do here, handled in xlogrecovery_redo() */
 	}
 	else if (info == XLOG_NOOP)
 	{
@@ -7719,7 +7698,7 @@ xlog_redo(XLogReaderState *record)
 	}
 	else if (info == XLOG_RESTORE_POINT)
 	{
-		/* nothing to do here */
+		/* nothing to do here, handled in xlogrecovery.c */
 	}
 	else if (info == XLOG_FPI || info == XLOG_FPI_FOR_HINT)
 	{
@@ -7757,11 +7736,7 @@ xlog_redo(XLogReaderState *record)
 	}
 	else if (info == XLOG_BACKUP_END)
 	{
-		XLogRecPtr	startpoint;
-
-		memcpy(&startpoint, XLogRecGetData(record), sizeof(startpoint));
-
-		HandleBackupEndRecord(startpoint, lsn);
+		/* nothing to do here, handled in xlogrecovery_redo() */
 	}
 	else if (info == XLOG_PARAMETER_CHANGE)
 	{
@@ -7835,26 +7810,6 @@ xlog_redo(XLogReaderState *record)
 	}
 }
 
-/*
- * Verify the payload of a XLOG_OVERWRITE_CONTRECORD record.
- */
-static void
-VerifyOverwriteContrecord(xl_overwrite_contrecord *xlrec, XLogReaderState *state)
-{
-	if (xlrec->overwritten_lsn != state->overwrittenRecPtr)
-		elog(FATAL, "mismatching overwritten LSN %X/%X -> %X/%X",
-			 LSN_FORMAT_ARGS(xlrec->overwritten_lsn),
-			 LSN_FORMAT_ARGS(state->overwrittenRecPtr));
-
-	ereport(LOG,
-			(errmsg("successfully skipped missing contrecord at %X/%X, overwritten at %s",
-					LSN_FORMAT_ARGS(xlrec->overwritten_lsn),
-					timestamptz_to_str(xlrec->overwrite_time))));
-
-	/* Verifying the record should only happen once */
-	state->overwrittenRecPtr = InvalidXLogRecPtr;
-}
-
 /*
  * Return the (possible) sync flag used for opening a file, depending on the
  * value of the GUC wal_sync_method.
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index e3925480aa4..b9fb61d1dbb 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -381,6 +381,7 @@ static bool read_backup_label(XLogRecPtr *checkPointLoc,
 							  bool *backupEndRequired, bool *backupFromStandby);
 static bool read_tablespace_map(List **tablespaces);
 
+static void xlogrecovery_redo(XLogReaderState *record, TimeLineID replayTLI);
 static void CheckRecoveryConsistency(void);
 static void rm_redo_error_callback(void *arg);
 #ifdef WAL_DEBUG
@@ -1738,6 +1739,13 @@ PerformWalRecovery(void)
 				TransactionIdIsValid(record->xl_xid))
 				RecordKnownAssignedTransactionIds(record->xl_xid);
 
+			/*
+			 * Some XLOG record types that are related to recovery are
+			 * processed directly here, rather than in xlog_redo()
+			 */
+			if (record->xl_rmid == RM_XLOG_ID)
+				xlogrecovery_redo(xlogreader, replayTLI);
+
 			/* Now apply the WAL record itself */
 			RmgrTable[record->xl_rmid].rm_redo(xlogreader);
 
@@ -1881,6 +1889,84 @@ PerformWalRecovery(void)
 				(errmsg("recovery ended before configured recovery target was reached")));
 }
 
+/*
+ * Some XLOG RM record types that are directly related to WAL recovery are
+ * handled here rather than in xlog_redo().
+ */
+static void
+xlogrecovery_redo(XLogReaderState *record, TimeLineID replayTLI)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+	XLogRecPtr	lsn = record->EndRecPtr;
+
+	Assert(XLogRecGetRmid(record) == RM_XLOG_ID);
+
+	if (info == XLOG_OVERWRITE_CONTRECORD)
+	{
+		/* Verify the payload of a XLOG_OVERWRITE_CONTRECORD record. */
+		xl_overwrite_contrecord xlrec;
+
+		memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_overwrite_contrecord));
+		if (xlrec.overwritten_lsn != record->overwrittenRecPtr)
+			elog(FATAL, "mismatching overwritten LSN %X/%X -> %X/%X",
+				 LSN_FORMAT_ARGS(xlrec.overwritten_lsn),
+				 LSN_FORMAT_ARGS(record->overwrittenRecPtr));
+
+		ereport(LOG,
+				(errmsg("successfully skipped missing contrecord at %X/%X, overwritten at %s",
+						LSN_FORMAT_ARGS(xlrec.overwritten_lsn),
+						timestamptz_to_str(xlrec.overwrite_time))));
+
+		/* Verifying the record should only happen once */
+		record->overwrittenRecPtr = InvalidXLogRecPtr;
+	}
+	else if (info == XLOG_END_OF_RECOVERY)
+	{
+		xl_end_of_recovery xlrec;
+
+		memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_end_of_recovery));
+
+		/*
+		 * For Hot Standby, we could treat this like a Shutdown Checkpoint,
+		 * but this case is rarer and harder to test, so the benefit doesn't
+		 * outweigh the potential extra cost of maintenance.
+		 */
+
+		/*
+		 * We should've already switched to the new TLI before replaying this
+		 * record.
+		 */
+		if (xlrec.ThisTimeLineID != replayTLI)
+			ereport(PANIC,
+					(errmsg("unexpected timeline ID %u (should be %u) in checkpoint record",
+							xlrec.ThisTimeLineID, replayTLI)));
+	}
+	else if (info == XLOG_BACKUP_END)
+	{
+		XLogRecPtr	startpoint;
+
+		memcpy(&startpoint, XLogRecGetData(record), sizeof(startpoint));
+
+		if (backupStartPoint == startpoint)
+		{
+			/*
+			 * We have reached the end of base backup, the point where
+			 * pg_stop_backup() was done.  The data on disk is now consistent
+			 * (assuming we have also reached minRecoveryPoint).  Set
+			 * backupEndPoint to the current LSN, so that the next call to
+			 * CheckRecoveryConsistency() will notice it and do the
+			 * end-of-backup processing.
+			 */
+			elog(DEBUG1, "end of backup record reached");
+
+			backupEndPoint = lsn;
+		}
+		else
+			elog(DEBUG1, "saw end-of-backup record for backup starting at %X/%X, waiting for %X/%X",
+				 LSN_FORMAT_ARGS(startpoint), LSN_FORMAT_ARGS(backupStartPoint));
+	}
+}
+
 /*
  * Checks if recovery has reached a consistent state. When consistency is
  * reached and we have a valid starting standby snapshot, tell postmaster
@@ -3780,31 +3866,6 @@ ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
 	return record;
 }
 
-/*
- * Called when we see an end-of-backup record.
- */
-void
-HandleBackupEndRecord(XLogRecPtr startpoint, XLogRecPtr endLsn)
-{
-	if (backupStartPoint == startpoint)
-	{
-		/*
-		 * We have reached the end of base backup, the point where
-		 * pg_stop_backup() was done.  The data on disk is now consistent
-		 * (assuming we have also reached minRecoveryPoint).  Set
-		 * backupEndPoint to the current LSN, so that the next call to
-		 * CheckRecoveryConsistency() will notice it and do the end-of-backup
-		 * processing.
-		 */
-		elog(DEBUG1, "end of backup record reached");
-
-		backupEndPoint = endLsn;
-	}
-	else
-		elog(DEBUG1, "saw end-of-backup record for backup starting at %X/%X, waiting for %X/%X",
-			 LSN_FORMAT_ARGS(startpoint), LSN_FORMAT_ARGS(backupStartPoint));
-}
-
 /*
  * Scan for new timelines that might have appeared in the archive since we
  * started recovery.
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 566e264a5ce..06e5f4b954e 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -127,8 +127,6 @@ extern EndOfWalRecoveryInfo *FinishWalRecovery(void);
 extern void ShutdownWalRecovery(void);
 extern void RemovePromoteSignalFiles(void);
 
-extern void HandleBackupEndRecord(XLogRecPtr startpoint, XLogRecPtr endLsn);
-
 extern bool HotStandbyActive(void);
 extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI);
 extern RecoveryPauseState GetRecoveryPauseState(void);
-- 
2.30.2

