From 16a86cbc1e3cfd62b1c94ced1a67338ed12223cd Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Fri, 17 Dec 2021 12:04:24 +0200
Subject: [PATCH v9 1/5] Refactor setting XLP_FIRST_IS_OVERWRITE_CONTRECORD.

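Set the page-header flag directly in CreateOverwriteContrecordRecord(),
which now receives the start of the affected page and the timeline from
StartupXLOG().  That way, AdvanceXLInsertBuffer() no longer needs to
consult the missingContrecPtr global variable.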
---
 src/backend/access/transam/xlog.c | 73 ++++++++++++++++++++++---------
 1 file changed, 53 insertions(+), 20 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1e1fbe957fa..61b79fed30f 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -913,7 +913,9 @@ static void VerifyOverwriteContrecord(xl_overwrite_contrecord *xlrec,
 									  XLogReaderState *state);
 static int LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
-static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn);
+static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn,
+												  XLogRecPtr pagePtr,
+												  TimeLineID newTLI);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
@@ -2295,18 +2297,6 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 		if (!Insert->forcePageWrites)
 			NewPage->xlp_info |= XLP_BKP_REMOVABLE;
 
-		/*
-		 * If a record was found to be broken at the end of recovery, and
-		 * we're going to write on the page where its first contrecord was
-		 * lost, set the XLP_FIRST_IS_OVERWRITE_CONTRECORD flag on the page
-		 * header.  See CreateOverwriteContrecordRecord().
-		 */
-		if (missingContrecPtr == NewPageBeginPtr)
-		{
-			NewPage->xlp_info |= XLP_FIRST_IS_OVERWRITE_CONTRECORD;
-			missingContrecPtr = InvalidXLogRecPtr;
-		}
-
 		/*
 		 * If first page of an XLOG segment file, make it a long header.
 		 */
@@ -8150,7 +8140,7 @@ StartupXLOG(void)
 	if (!XLogRecPtrIsInvalid(abortedRecPtr))
 	{
 		Assert(!XLogRecPtrIsInvalid(missingContrecPtr));
-		CreateOverwriteContrecordRecord(abortedRecPtr);
+		CreateOverwriteContrecordRecord(abortedRecPtr, missingContrecPtr, newTLI);
 		abortedRecPtr = InvalidXLogRecPtr;
 		missingContrecPtr = InvalidXLogRecPtr;
 	}
@@ -9531,27 +9521,70 @@ CreateEndOfRecoveryRecord(void)
  * skip the record it was reading, and pass back the LSN of the skipped
  * record, so that its caller can verify (on "replay" of that record) that the
  * XLOG_OVERWRITE_CONTRECORD matches what was effectively overwritten.
+ *
+ * 'aborted_lsn' is the beginning position of the record that was incomplete.
+ * It is included in the WAL record.  'pagePtr' is the beginning of the XLOG
+ * page on which the record is to be inserted, and 'newTLI' is the timeline
+ * passed along with it when fetching that page's buffer.  The current WAL
+ * insert position must be right after that page's header; this is verified.
  */
 static XLogRecPtr
-CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn)
+CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn, XLogRecPtr pagePtr,
+								TimeLineID newTLI)
 {
 	xl_overwrite_contrecord xlrec;
 	XLogRecPtr	recptr;
+	XLogPageHeader pagehdr;
+	XLogRecPtr	startPos;
 
-	/* sanity check */
+	/* sanity checks */
 	if (!RecoveryInProgress())
 		elog(ERROR, "can only be used at end of recovery");
-
-	xlrec.overwritten_lsn = aborted_lsn;
-	xlrec.overwrite_time = GetCurrentTimestamp();
+	if (pagePtr % XLOG_BLCKSZ != 0)
+		elog(ERROR, "invalid position for missing continuation record %X/%X",
+			 LSN_FORMAT_ARGS(pagePtr));
+
+	/* The current WAL insert position should be right after the page header */
+	startPos = pagePtr;
+	if (XLogSegmentOffset(startPos, wal_segment_size) == 0)
+		startPos += SizeOfXLogLongPHD;
+	else
+		startPos += SizeOfXLogShortPHD;
+	recptr = GetXLogInsertRecPtr();
+	if (recptr != startPos)
+		elog(ERROR, "invalid WAL insert position %X/%X for OVERWRITE_CONTRECORD",
+			 LSN_FORMAT_ARGS(recptr));
 
 	START_CRIT_SECTION();
 
+	/*
+	 * Initialize the XLOG page header (by GetXLogBuffer), and set the
+	 * XLP_FIRST_IS_OVERWRITE_CONTRECORD flag.
+	 *
+	 * No other backend is allowed to write WAL yet, so acquiring the WAL
+	 * insertion lock is just pro forma.
+	 */
+	WALInsertLockAcquire();
+	pagehdr = (XLogPageHeader) GetXLogBuffer(pagePtr, newTLI);
+	pagehdr->xlp_info |= XLP_FIRST_IS_OVERWRITE_CONTRECORD;
+	WALInsertLockRelease();
+
+	/*
+	 * Insert the XLOG_OVERWRITE_CONTRECORD record as the first record on the
+	 * page.  We know it becomes the first record, because no other backend is
+	 * allowed to write WAL yet.
+	 */
 	XLogBeginInsert();
+	xlrec.overwritten_lsn = aborted_lsn;
+	xlrec.overwrite_time = GetCurrentTimestamp();
 	XLogRegisterData((char *) &xlrec, sizeof(xl_overwrite_contrecord));
-
 	recptr = XLogInsert(RM_XLOG_ID, XLOG_OVERWRITE_CONTRECORD);
 
+	/* check that the record was inserted at the expected position */
+	if (ProcLastRecPtr != startPos)
+		elog(ERROR, "OVERWRITE_CONTRECORD was inserted to unexpected position %X/%X",
+			 LSN_FORMAT_ARGS(ProcLastRecPtr));
+
 	XLogFlush(recptr);
 
 	END_CRIT_SECTION();
-- 
2.30.2

