From 6a2475aec9a871def5f194058f62f3f6991777e9 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Thu, 25 Jun 2020 08:50:54 +0900
Subject: [PATCH] Avoid archiving immature records

For a segment-spanning record, if the primary crashes after the first
segment is archived but before the full record is written, crash
recovery overwrites the last record of the first segment, and the
history diverges between pg_wal and the archive. Avoid that corruption
by preventing immature records from being archived.  Prevent walsender
from sending immature records for the same reason.
---
 src/backend/access/transam/xlog.c   | 124 +++++++++++++++++++++++++++-
 src/backend/replication/walsender.c |  14 ++--
 src/include/access/xlog.h           |   1 +
 3 files changed, 131 insertions(+), 8 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index a1256a103b..b3d49fbc8b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -724,6 +724,16 @@ typedef struct XLogCtlData
 	 */
 	XLogRecPtr	lastFpwDisableRecPtr;
 
+	/* The last segment notified to be archived. Protected by WALWriteLock */
+	XLogSegNo	lastNotifiedSeg;
+
+	/*
+	 * Remember the range of the last segment-spanning record. Protected by
+	 * info_lck
+	 */
+	XLogRecPtr	lastSegContRecStart;
+	XLogRecPtr	lastSegContRecEnd;
+
 	slock_t		info_lck;		/* locks shared variables shown above */
 } XLogCtlData;
 
@@ -1158,6 +1168,9 @@ XLogInsertRecord(XLogRecData *rdata,
 	 */
 	if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
 	{
+		XLogSegNo startseg;
+		XLogSegNo endseg;
+
 		SpinLockAcquire(&XLogCtl->info_lck);
 		/* advance global request to include new block(s) */
 		if (XLogCtl->LogwrtRqst.Write < EndPos)
@@ -1165,6 +1178,21 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* update local result copy while I have the chance */
 		LogwrtResult = XLogCtl->LogwrtResult;
 		SpinLockRelease(&XLogCtl->info_lck);
+
+		/* Remember the range of the record if it spans over segments */
+		XLByteToSeg(StartPos, startseg, wal_segment_size);
+		XLByteToPrevSeg(EndPos, endseg, wal_segment_size);
+
+		if (startseg != endseg)
+		{
+			SpinLockAcquire(&XLogCtl->info_lck);
+			if (XLogCtl->lastSegContRecEnd < StartPos)
+			{
+				XLogCtl->lastSegContRecStart = StartPos;
+				XLogCtl->lastSegContRecEnd = EndPos;
+			}
+			SpinLockRelease(&XLogCtl->info_lck);
+		}
 	}
 
 	/*
@@ -2396,6 +2424,56 @@ XLogCheckpointNeeded(XLogSegNo new_segno)
 	return false;
 }
 
+/*
+ * Notify segments that are surely stable.
+ *
+ * If the last segment in pg_wal is complete and ends with a continuation
+ * record, crash recovery results in a history diverged from the archive.
+ * Don't archive a segment until the whole record has been written.
+ */
+static void
+NotifyStableSegments(XLogSegNo notifySegNo)
+{
+	XLogRecPtr	archiveTargetRecPtr;
+	XLogSegNo i;
+
+	if (XLogCtl->lastNotifiedSeg < notifySegNo)
+	{
+		XLogRecPtr lastSegContRecStart;
+		XLogRecPtr lastSegContRecEnd;
+		XLogSegNo	notifyUpTo = 0;
+
+		SpinLockAcquire(&XLogCtl->info_lck);
+		lastSegContRecStart = XLogCtl->lastSegContRecStart;
+		lastSegContRecEnd = XLogCtl->lastSegContRecEnd;
+		SpinLockRelease(&XLogCtl->info_lck);
+
+		/*
+		 * Use the start position of the last segment-spanning continuation
+		 * record when the record is not flushed completely.
+		 */
+		if (lastSegContRecStart < LogwrtResult.Flush &&
+			LogwrtResult.Flush <= lastSegContRecEnd)
+			archiveTargetRecPtr = lastSegContRecStart;
+		else
+			archiveTargetRecPtr = LogwrtResult.Flush;
+
+		XLByteToSeg(archiveTargetRecPtr, notifyUpTo, wal_segment_size);
+
+		/* back to the last complete segment */
+		notifyUpTo--;
+
+		/* cap by given segment */
+		if (notifyUpTo > notifySegNo)
+			notifyUpTo = notifySegNo;
+
+		for (i = XLogCtl->lastNotifiedSeg + 1 ; i <= notifyUpTo ; i++)
+			XLogArchiveNotifySeg(i);
+
+		XLogCtl->lastNotifiedSeg = notifyUpTo;
+	}
+}
+
 /*
  * Write and/or fsync the log at least as far as WriteRqst indicates.
  *
@@ -2583,7 +2661,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 				LogwrtResult.Flush = LogwrtResult.Write;	/* end of page */
 
 				if (XLogArchivingActive())
-					XLogArchiveNotifySeg(openLogSegNo);
+					NotifyStableSegments(openLogSegNo);
 
 				XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
 				XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush;
@@ -2653,6 +2731,10 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 		WalSndWakeupRequest();
 
 		LogwrtResult.Flush = LogwrtResult.Write;
+
+		/* Now the record is fully written, try to notify stable segments */
+		if (XLogArchivingActive())
+			NotifyStableSegments(openLogSegNo - 1);
 	}
 
 	/*
@@ -7703,6 +7785,18 @@ StartupXLOG(void)
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
 
+	/*
+	 * We have archived up to the previous segment of EndOfLog so far.
+	 * Initialize lastNotifiedSeg if needed.
+	 */
+	if (XLogArchivingActive())
+	{
+		XLogSegNo	endLogSegNo;
+
+		XLByteToSeg(EndOfLog, endLogSegNo, wal_segment_size);
+		XLogCtl->lastNotifiedSeg = endLogSegNo - 1;
+	}
+
 	/*
 	 * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
 	 * record before resource manager writes cleanup WAL records or checkpoint
@@ -8426,6 +8520,34 @@ GetFlushRecPtr(void)
 	return LogwrtResult.Flush;
 }
 
+/*
+ * GetReplicationTargetRecPtr -- Returns the latest position that can be
+ * replicated.  WAL records up to this position won't be overwritten even after
+ * a crash of primary.
+ */
+XLogRecPtr
+GetReplicationTargetRecPtr(void)
+{
+	XLogRecPtr lastSegContRecStart;
+	XLogRecPtr lastSegContRecEnd;
+
+	SpinLockAcquire(&XLogCtl->info_lck);
+	LogwrtResult = XLogCtl->LogwrtResult;
+	lastSegContRecStart = XLogCtl->lastSegContRecStart;
+	lastSegContRecEnd = XLogCtl->lastSegContRecEnd;
+	SpinLockRelease(&XLogCtl->info_lck);
+
+	/*
+	 * Use the start position of the last segment-spanning continuation
+	 * record when the record is not flushed completely.
+	 */
+	if (lastSegContRecStart < LogwrtResult.Flush &&
+		LogwrtResult.Flush <= lastSegContRecEnd)
+		return lastSegContRecStart;
+
+	return LogwrtResult.Flush;
+}
+
 /*
  * GetLastImportantRecPtr -- Returns the LSN of the last important record
  * inserted. All records not explicitly marked as unimportant are considered
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e2477c47e0..c5682a8836 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2630,14 +2630,14 @@ XLogSendPhysical(void)
 		/*
 		 * Streaming the current timeline on a master.
 		 *
-		 * Attempt to send all data that's already been written out and
-		 * fsync'd to disk.  We cannot go further than what's been written out
-		 * given the current implementation of WALRead().  And in any case
-		 * it's unsafe to send WAL that is not securely down to disk on the
-		 * master: if the master subsequently crashes and restarts, standbys
-		 * must not have applied any WAL that got lost on the master.
+		 * Attempt to send all data that can be replicated.  We cannot go
+		 * further than what's been written out given the current
+		 * implementation of WALRead().  And in any case it's unsafe to send
+		 * WAL that is not securely down to disk on the master: if the master
+		 * subsequently crashes and restarts, standbys must not have applied
+		 * any WAL that got lost on the master.
 		 */
-		SendRqstPtr = GetFlushRecPtr();
+		SendRqstPtr = GetReplicationTargetRecPtr();
 	}
 
 	/*
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 347a38f57c..ef21418093 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -335,6 +335,7 @@ extern void GetFullPageWriteInfo(XLogRecPtr *RedoRecPtr_p, bool *doPageWrites_p)
 extern XLogRecPtr GetRedoRecPtr(void);
 extern XLogRecPtr GetInsertRecPtr(void);
 extern XLogRecPtr GetFlushRecPtr(void);
+extern XLogRecPtr GetReplicationTargetRecPtr(void);
 extern XLogRecPtr GetLastImportantRecPtr(void);
 extern void RemovePromoteSignalFiles(void);
 
-- 
2.18.4

