From 8654ea7f2ed61de7ab3f0b305e37d190932ad97c Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Tue, 17 Mar 2020 15:28:08 +1300
Subject: [PATCH v7 1/4] Rationalize GetWalRcv{Write,Flush}RecPtr().

GetWalRcvWriteRecPtr() previously reported the latest *flushed*
location.  Adopt the conventional terminology used elsewhere in the tree
by renaming it to GetWalRcvFlushRecPtr(), and likewise for some related
variables that used the term "received".

Add a new definition of GetWalRcvWriteRecPtr(), which returns the latest
*written* value.  This will allow later patches to use the value for
non-data-integrity purposes, without having to wait for the flush
pointer to advance.

Reviewed-by: Alvaro Herrera <alvherre@2ndquadrant.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com
---
 src/backend/access/transam/xlog.c          | 20 +++++++++---------
 src/backend/access/transam/xlogfuncs.c     |  2 +-
 src/backend/replication/README             |  2 +-
 src/backend/replication/walreceiver.c      | 15 +++++++++-----
 src/backend/replication/walreceiverfuncs.c | 24 ++++++++++++++++------
 src/backend/replication/walsender.c        |  2 +-
 src/include/replication/walreceiver.h      | 18 ++++++++++++----
 7 files changed, 55 insertions(+), 28 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 8a4c1743e5..c60842ea03 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -209,8 +209,8 @@ HotStandbyState standbyState = STANDBY_DISABLED;
 
 static XLogRecPtr LastRec;
 
-/* Local copy of WalRcv->receivedUpto */
-static XLogRecPtr receivedUpto = 0;
+/* Local copy of WalRcv->flushedUpto */
+static XLogRecPtr flushedUpto = 0;
 static TimeLineID receiveTLI = 0;
 
 /*
@@ -9376,7 +9376,7 @@ CreateRestartPoint(int flags)
 	 * Retreat _logSegNo using the current end of xlog replayed or received,
 	 * whichever is later.
 	 */
-	receivePtr = GetWalRcvWriteRecPtr(NULL, NULL);
+	receivePtr = GetWalRcvFlushRecPtr(NULL, NULL);
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 	endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
 	KeepLogSeg(endptr, &_logSegNo);
@@ -11869,7 +11869,7 @@ retry:
 	/* See if we need to retrieve more data */
 	if (readFile < 0 ||
 		(readSource == XLOG_FROM_STREAM &&
-		 receivedUpto < targetPagePtr + reqLen))
+		 flushedUpto < targetPagePtr + reqLen))
 	{
 		if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
 										 private->randAccess,
@@ -11900,10 +11900,10 @@ retry:
 	 */
 	if (readSource == XLOG_FROM_STREAM)
 	{
-		if (((targetPagePtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ))
+		if (((targetPagePtr) / XLOG_BLCKSZ) != (flushedUpto / XLOG_BLCKSZ))
 			readLen = XLOG_BLCKSZ;
 		else
-			readLen = XLogSegmentOffset(receivedUpto, wal_segment_size) -
+			readLen = XLogSegmentOffset(flushedUpto, wal_segment_size) -
 				targetPageOff;
 	}
 	else
@@ -12318,7 +12318,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 						RequestXLogStreaming(tli, ptr, PrimaryConnInfo,
 											 PrimarySlotName,
 											 wal_receiver_create_temp_slot);
-						receivedUpto = 0;
+						flushedUpto = 0;
 					}
 
 					/*
@@ -12342,14 +12342,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 					 * XLogReceiptTime will not advance, so the grace time
 					 * allotted to conflicting queries will decrease.
 					 */
-					if (RecPtr < receivedUpto)
+					if (RecPtr < flushedUpto)
 						havedata = true;
 					else
 					{
 						XLogRecPtr	latestChunkStart;
 
-						receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI);
-						if (RecPtr < receivedUpto && receiveTLI == curFileTLI)
+						flushedUpto = GetWalRcvFlushRecPtr(&latestChunkStart, &receiveTLI);
+						if (RecPtr < flushedUpto && receiveTLI == curFileTLI)
 						{
 							havedata = true;
 							if (latestChunkStart <= RecPtr)
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index b84ba57259..00e1b33ed5 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -398,7 +398,7 @@ pg_last_wal_receive_lsn(PG_FUNCTION_ARGS)
 {
 	XLogRecPtr	recptr;
 
-	recptr = GetWalRcvWriteRecPtr(NULL, NULL);
+	recptr = GetWalRcvFlushRecPtr(NULL, NULL);
 
 	if (recptr == 0)
 		PG_RETURN_NULL();
diff --git a/src/backend/replication/README b/src/backend/replication/README
index 0cbb990613..8ccdd86e74 100644
--- a/src/backend/replication/README
+++ b/src/backend/replication/README
@@ -54,7 +54,7 @@ and WalRcvData->slotname, and initializes the starting point in
 WalRcvData->receiveStart.
 
 As walreceiver receives WAL from the master server, and writes and flushes
-it to disk (in pg_wal), it updates WalRcvData->receivedUpto and signals
+it to disk (in pg_wal), it updates WalRcvData->flushedUpto and signals
 the startup process to know how far WAL replay can advance.
 
 Walreceiver sends information about replication progress to the master server
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index aee67c61aa..d69fb90132 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -12,7 +12,7 @@
  * in the primary server), and then keeps receiving XLOG records and
  * writing them to the disk as long as the connection is alive. As XLOG
  * records are received and flushed to disk, it updates the
- * WalRcv->receivedUpto variable in shared memory, to inform the startup
+ * WalRcv->flushedUpto variable in shared memory, to inform the startup
  * process of how far it can proceed with XLOG replay.
  *
  * A WAL receiver cannot directly load GUC parameters used when establishing
@@ -261,6 +261,8 @@ WalReceiverMain(void)
 
 	SpinLockRelease(&walrcv->mutex);
 
+	pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
+
 	/* Arrange to clean up at walreceiver exit */
 	on_shmem_exit(WalRcvDie, 0);
 
@@ -984,6 +986,9 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 
 		LogstreamResult.Write = recptr;
 	}
+
+	/* Update shared-memory status */
+	pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
 }
 
 /*
@@ -1005,10 +1010,10 @@ XLogWalRcvFlush(bool dying)
 
 		/* Update shared-memory status */
 		SpinLockAcquire(&walrcv->mutex);
-		if (walrcv->receivedUpto < LogstreamResult.Flush)
+		if (walrcv->flushedUpto < LogstreamResult.Flush)
 		{
-			walrcv->latestChunkStart = walrcv->receivedUpto;
-			walrcv->receivedUpto = LogstreamResult.Flush;
+			walrcv->latestChunkStart = walrcv->flushedUpto;
+			walrcv->flushedUpto = LogstreamResult.Flush;
 			walrcv->receivedTLI = ThisTimeLineID;
 		}
 		SpinLockRelease(&walrcv->mutex);
@@ -1361,7 +1366,7 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
 	state = WalRcv->walRcvState;
 	receive_start_lsn = WalRcv->receiveStart;
 	receive_start_tli = WalRcv->receiveStartTLI;
-	received_lsn = WalRcv->receivedUpto;
+	received_lsn = WalRcv->flushedUpto;
 	received_tli = WalRcv->receivedTLI;
 	last_send_time = WalRcv->lastMsgSendTime;
 	last_receipt_time = WalRcv->lastMsgReceiptTime;
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 21d1823607..4afad83539 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -282,11 +282,11 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
 
 	/*
 	 * If this is the first startup of walreceiver (on this timeline),
-	 * initialize receivedUpto and latestChunkStart to the starting point.
+	 * initialize flushedUpto and latestChunkStart to the starting point.
 	 */
 	if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
 	{
-		walrcv->receivedUpto = recptr;
+		walrcv->flushedUpto = recptr;
 		walrcv->receivedTLI = tli;
 		walrcv->latestChunkStart = recptr;
 	}
@@ -304,7 +304,7 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
 }
 
 /*
- * Returns the last+1 byte position that walreceiver has written.
+ * Returns the last+1 byte position that walreceiver has flushed.
  *
  * Optionally, returns the previous chunk start, that is the first byte
  * written in the most recent walreceiver flush cycle.  Callers not
@@ -312,13 +312,13 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
  * receiveTLI.
  */
 XLogRecPtr
-GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
+GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
 {
 	WalRcvData *walrcv = WalRcv;
 	XLogRecPtr	recptr;
 
 	SpinLockAcquire(&walrcv->mutex);
-	recptr = walrcv->receivedUpto;
+	recptr = walrcv->flushedUpto;
 	if (latestChunkStart)
 		*latestChunkStart = walrcv->latestChunkStart;
 	if (receiveTLI)
@@ -328,6 +328,18 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
 	return recptr;
 }
 
+/*
+ * Returns the last+1 byte position that walreceiver has written.
+ * This returns a recently written value without taking a lock.
+ */
+XLogRecPtr
+GetWalRcvWriteRecPtr(void)
+{
+	WalRcvData *walrcv = WalRcv;
+
+	return pg_atomic_read_u64(&walrcv->writtenUpto);
+}
+
 /*
  * Returns the replication apply delay in ms or -1
  * if the apply delay info is not available
@@ -345,7 +357,7 @@ GetReplicationApplyDelay(void)
 	TimestampTz chunkReplayStartTime;
 
 	SpinLockAcquire(&walrcv->mutex);
-	receivePtr = walrcv->receivedUpto;
+	receivePtr = walrcv->flushedUpto;
 	SpinLockRelease(&walrcv->mutex);
 
 	replayPtr = GetXLogReplayRecPtr(NULL);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 06e8b79036..122d884f3e 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2949,7 +2949,7 @@ GetStandbyFlushRecPtr(void)
 	 * has streamed, but hasn't been replayed yet.
 	 */
 
-	receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
+	receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 
 	ThisTimeLineID = replayTLI;
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index cf3e43128c..f1aa6e9977 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -16,6 +16,7 @@
 #include "access/xlogdefs.h"
 #include "getaddrinfo.h"		/* for NI_MAXHOST */
 #include "pgtime.h"
+#include "port/atomics.h"
 #include "replication/logicalproto.h"
 #include "replication/walsender.h"
 #include "storage/latch.h"
@@ -73,19 +74,19 @@ typedef struct
 	TimeLineID	receiveStartTLI;
 
 	/*
-	 * receivedUpto-1 is the last byte position that has already been
+	 * flushedUpto-1 is the last byte position that has already been
 	 * received, and receivedTLI is the timeline it came from.  At the first
 	 * startup of walreceiver, these are set to receiveStart and
 	 * receiveStartTLI. After that, walreceiver updates these whenever it
 	 * flushes the received WAL to disk.
 	 */
-	XLogRecPtr	receivedUpto;
+	XLogRecPtr	flushedUpto;
 	TimeLineID	receivedTLI;
 
 	/*
 	 * latestChunkStart is the starting byte position of the current "batch"
 	 * of received WAL.  It's actually the same as the previous value of
-	 * receivedUpto before the last flush to disk.  Startup process can use
+	 * flushedUpto before the last flush to disk.  Startup process can use
 	 * this to detect whether it's keeping up or not.
 	 */
 	XLogRecPtr	latestChunkStart;
@@ -141,6 +142,14 @@ typedef struct
 
 	slock_t		mutex;			/* locks shared variables shown above */
 
+	/*
+	 * Like flushedUpto, but advanced after writing and before flushing,
+	 * without the need to acquire the spin lock.  Data can be read by another
+	 * process up to this point, but shouldn't be used for data integrity
+	 * purposes.
+	 */
+	pg_atomic_uint64 writtenUpto;
+
 	/*
 	 * force walreceiver reply?  This doesn't need to be locked; memory
 	 * barriers for ordering are sufficient.  But we do need atomic fetch and
@@ -322,7 +331,8 @@ extern bool WalRcvRunning(void);
 extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
 								 const char *conninfo, const char *slotname,
 								 bool create_temp_slot);
-extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
+extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
+extern XLogRecPtr GetWalRcvWriteRecPtr(void);
 extern int	GetReplicationApplyDelay(void);
 extern int	GetReplicationTransferLatency(void);
 extern void WalRcvForceReply(void);
-- 
2.20.1

