From 2c5d347e9b6819ba59fa0e72e4227a2cb4d1230f Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Mon, 23 Aug 2021 18:25:48 -0400
Subject: [PATCH] Don't stream non-final WAL segments

Avoid setting the physical-stream replication read pointer in the middle
of a WAL record.  This can occur if a record is split in two (or more)
across segment boundaries.  The reason to avoid it is that if we stream
the segment containing the first half, and then we crash before writing
the next segment, the primary will rewrite the tail of the segment with
a new WAL record (having discarded the incomplete record), but the
replica will be stuck trying to replay a broken file (since the next
segment will never contain the now-gone data).

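To do this, make the physical walsender obtain its send-request pointer
from a new function, GetSafeFlushRecPtr(), which retreats the flush
pointer to the start of the earliest registered segment-boundary record
whenever that record begins before the current flush position.  Note
that the call site touched here registers segment boundaries only when
WAL archiving is active.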
---
 src/backend/access/transam/xlog.c   | 39 ++++++++++++++++++++++++++---
 src/backend/replication/walsender.c |  2 +-
 src/include/access/xlog.h           |  1 +
 3 files changed, 37 insertions(+), 5 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 24165ab03e..2b85d1b2ae 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -731,8 +731,10 @@ typedef struct XLogCtlData
 	 */
 	XLogSegNo	lastNotifiedSeg;
 	XLogSegNo	earliestSegBoundary;
+	XLogRecPtr	earliestSegBoundaryStartPtr;
 	XLogRecPtr	earliestSegBoundaryEndPtr;
 	XLogSegNo	latestSegBoundary;
+	XLogRecPtr	latestSegBoundaryStartPtr;
 	XLogRecPtr	latestSegBoundaryEndPtr;
 
 	slock_t		segtrack_lck;	/* locks shared variables shown above */
@@ -932,7 +934,7 @@ static void RemoveXlogFile(const char *segname, XLogSegNo recycleSegNo,
 						   XLogSegNo *endlogSegNo);
 static void UpdateLastRemovedPtr(char *filename);
 static void ValidateXLOGDirectoryStructure(void);
-static void RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr pos);
+static void RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr startpos, XLogRecPtr endpos);
 static void CleanupBackupHistory(void);
 static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
 static XLogRecord *ReadRecord(XLogReaderState *xlogreader,
@@ -1183,11 +1185,11 @@ XLogInsertRecord(XLogRecData *rdata,
 		 *
 		 * Note that we did not use XLByteToPrevSeg() for determining the
 		 * ending segment.  This is so that a record that fits perfectly into
-		 * the end of the segment causes the latter to get marked ready for
+		 * the end of the segment causes said segment to get marked ready for
 		 * archival immediately.
 		 */
 		if (StartSeg != EndSeg && XLogArchivingActive())
-			RegisterSegmentBoundary(EndSeg, EndPos);
+			RegisterSegmentBoundary(EndSeg, StartPos, EndPos);
 
 		/*
 		 * Advance LogwrtRqst.Write so that it includes new block(s).
@@ -4398,7 +4400,7 @@ ValidateXLOGDirectoryStructure(void)
  * to delay until the end segment is known flushed.
  */
 static void
-RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos)
+RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr startpos, XLogRecPtr endpos)
 {
 	XLogSegNo	segno PG_USED_FOR_ASSERTS_ONLY;
 
@@ -4415,6 +4417,7 @@ RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos)
 	if (XLogCtl->earliestSegBoundary == MaxXLogSegNo)
 	{
 		XLogCtl->earliestSegBoundary = seg;
+		XLogCtl->earliestSegBoundaryStartPtr = startpos;
 		XLogCtl->earliestSegBoundaryEndPtr = endpos;
 	}
 	else if (seg > XLogCtl->earliestSegBoundary &&
@@ -4422,6 +4425,7 @@ RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos)
 			  seg > XLogCtl->latestSegBoundary))
 	{
 		XLogCtl->latestSegBoundary = seg;
+		XLogCtl->latestSegBoundaryStartPtr = startpos;
 		XLogCtl->latestSegBoundaryEndPtr = endpos;
 	}
 
@@ -4481,15 +4485,18 @@ NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr)
 	if (keep_latest)
 	{
 		XLogCtl->earliestSegBoundary = XLogCtl->latestSegBoundary;
+		XLogCtl->earliestSegBoundaryStartPtr = XLogCtl->latestSegBoundaryStartPtr;
 		XLogCtl->earliestSegBoundaryEndPtr = XLogCtl->latestSegBoundaryEndPtr;
 	}
 	else
 	{
 		XLogCtl->earliestSegBoundary = MaxXLogSegNo;
+		XLogCtl->earliestSegBoundaryStartPtr = InvalidXLogRecPtr;
 		XLogCtl->earliestSegBoundaryEndPtr = InvalidXLogRecPtr;
 	}
 
 	XLogCtl->latestSegBoundary = MaxXLogSegNo;
+	XLogCtl->latestSegBoundaryStartPtr = InvalidXLogRecPtr;
 	XLogCtl->latestSegBoundaryEndPtr = InvalidXLogRecPtr;
 
 	SpinLockRelease(&XLogCtl->segtrack_lck);
@@ -8776,6 +8783,30 @@ GetFlushRecPtr(void)
 	return LogwrtResult.Flush;
 }
 
+/*
+ * GetSafeFlushRecPtr -- Returns a "safe" flush position.
+ *
+ * Similar to the above, except that we avoid reporting a location that might
+ * be overwritten if there's a crash before syncing the next segment.
+ */
+XLogRecPtr
+GetSafeFlushRecPtr(void)
+{
+	XLogRecPtr		flush;
+
+	SpinLockAcquire(&XLogCtl->info_lck);
+	flush = XLogCtl->LogwrtResult.Flush;
+	SpinLockRelease(&XLogCtl->info_lck);
+	/* Retreat to the start of the earliest registered boundary, if earlier. */
+	SpinLockAcquire(&XLogCtl->segtrack_lck);
+	if (XLogCtl->earliestSegBoundary != MaxXLogSegNo &&
+		XLogCtl->earliestSegBoundaryStartPtr < flush)
+		flush = XLogCtl->earliestSegBoundaryStartPtr;
+	SpinLockRelease(&XLogCtl->segtrack_lck);
+
+	return flush;
+}
+
 /*
  * GetLastImportantRecPtr -- Returns the LSN of the last important record
  * inserted. All records not explicitly marked as unimportant are considered
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3ca2a11389..4c98fecdce 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2650,7 +2650,7 @@ XLogSendPhysical(void)
 		 * primary: if the primary subsequently crashes and restarts, standbys
 		 * must not have applied any WAL that got lost on the primary.
 		 */
-		SendRqstPtr = GetFlushRecPtr();
+		SendRqstPtr = GetSafeFlushRecPtr();
 	}
 
 	/*
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 6b6ae81c2d..1af59c36d4 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -313,6 +313,7 @@ extern void GetFullPageWriteInfo(XLogRecPtr *RedoRecPtr_p, bool *doPageWrites_p)
 extern XLogRecPtr GetRedoRecPtr(void);
 extern XLogRecPtr GetInsertRecPtr(void);
 extern XLogRecPtr GetFlushRecPtr(void);
+extern XLogRecPtr GetSafeFlushRecPtr(void);
 extern XLogRecPtr GetLastImportantRecPtr(void);
 extern void RemovePromoteSignalFiles(void);
 extern void NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr);
-- 
2.30.2

