From 02a03ee9767fbb2ef6fc62bdf1e64c0fe24eccfa Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Tue, 17 Mar 2020 15:28:08 +1300
Subject: [PATCH 2/5] Rename GetWalRcvWriteRecPtr() to GetWalRcvFlushRecPtr().

The new name better reflects the fact that the value it returns is
updated only when received data has been flushed to disk.  Also rename a
couple of variables relating to this value.

An upcoming patch will make use of the latest data that was written
without waiting for it to be flushed, so let's use more precise function
names.

Reviewed-by: Alvaro Herrera <alvherre@2ndquadrant.com>
Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com
---
 src/backend/access/transam/xlog.c          | 20 ++++++++++----------
 src/backend/access/transam/xlogfuncs.c     |  2 +-
 src/backend/replication/README             |  2 +-
 src/backend/replication/walreceiver.c      | 10 +++++-----
 src/backend/replication/walreceiverfuncs.c | 12 ++++++------
 src/backend/replication/walsender.c        |  2 +-
 src/include/replication/walreceiver.h      |  8 ++++----
 7 files changed, 28 insertions(+), 28 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index de2d4ee582..abb227ce66 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -205,8 +205,8 @@ HotStandbyState standbyState = STANDBY_DISABLED;
 
 static XLogRecPtr LastRec;
 
-/* Local copy of WalRcv->receivedUpto */
-static XLogRecPtr receivedUpto = 0;
+/* Local copy of WalRcv->flushedUpto */
+static XLogRecPtr flushedUpto = 0;
 static TimeLineID receiveTLI = 0;
 
 /*
@@ -9288,7 +9288,7 @@ CreateRestartPoint(int flags)
 	 * Retreat _logSegNo using the current end of xlog replayed or received,
 	 * whichever is later.
 	 */
-	receivePtr = GetWalRcvWriteRecPtr(NULL, NULL);
+	receivePtr = GetWalRcvFlushRecPtr(NULL, NULL);
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 	endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
 	KeepLogSeg(endptr, &_logSegNo);
@@ -11682,7 +11682,7 @@ retry:
 	/* See if we need to retrieve more data */
 	if (readFile < 0 ||
 		(readSource == XLOG_FROM_STREAM &&
-		 receivedUpto < targetPagePtr + reqLen))
+		 flushedUpto < targetPagePtr + reqLen))
 	{
 		if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
 										 private->randAccess,
@@ -11713,10 +11713,10 @@ retry:
 	 */
 	if (readSource == XLOG_FROM_STREAM)
 	{
-		if (((targetPagePtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ))
+		if (((targetPagePtr) / XLOG_BLCKSZ) != (flushedUpto / XLOG_BLCKSZ))
 			readLen = XLOG_BLCKSZ;
 		else
-			readLen = XLogSegmentOffset(receivedUpto, wal_segment_size) -
+			readLen = XLogSegmentOffset(flushedUpto, wal_segment_size) -
 				targetPageOff;
 	}
 	else
@@ -11952,7 +11952,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 						curFileTLI = tli;
 						RequestXLogStreaming(tli, ptr, PrimaryConnInfo,
 											 PrimarySlotName);
-						receivedUpto = 0;
+						flushedUpto = 0;
 					}
 
 					/*
@@ -12132,14 +12132,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 					 * XLogReceiptTime will not advance, so the grace time
 					 * allotted to conflicting queries will decrease.
 					 */
-					if (RecPtr < receivedUpto)
+					if (RecPtr < flushedUpto)
 						havedata = true;
 					else
 					{
 						XLogRecPtr	latestChunkStart;
 
-						receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI);
-						if (RecPtr < receivedUpto && receiveTLI == curFileTLI)
+						flushedUpto = GetWalRcvFlushRecPtr(&latestChunkStart, &receiveTLI);
+						if (RecPtr < flushedUpto && receiveTLI == curFileTLI)
 						{
 							havedata = true;
 							if (latestChunkStart <= RecPtr)
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index 20316539b6..e075c1c71b 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -398,7 +398,7 @@ pg_last_wal_receive_lsn(PG_FUNCTION_ARGS)
 {
 	XLogRecPtr	recptr;
 
-	recptr = GetWalRcvWriteRecPtr(NULL, NULL);
+	recptr = GetWalRcvFlushRecPtr(NULL, NULL);
 
 	if (recptr == 0)
 		PG_RETURN_NULL();
diff --git a/src/backend/replication/README b/src/backend/replication/README
index 0cbb990613..8ccdd86e74 100644
--- a/src/backend/replication/README
+++ b/src/backend/replication/README
@@ -54,7 +54,7 @@ and WalRcvData->slotname, and initializes the starting point in
 WalRcvData->receiveStart.
 
 As walreceiver receives WAL from the master server, and writes and flushes
-it to disk (in pg_wal), it updates WalRcvData->receivedUpto and signals
+it to disk (in pg_wal), it updates WalRcvData->flushedUpto and signals
 the startup process to know how far WAL replay can advance.
 
 Walreceiver sends information about replication progress to the master server
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 25e0333c9e..0bdd0c3074 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -12,7 +12,7 @@
  * in the primary server), and then keeps receiving XLOG records and
  * writing them to the disk as long as the connection is alive. As XLOG
  * records are received and flushed to disk, it updates the
- * WalRcv->receivedUpto variable in shared memory, to inform the startup
+ * WalRcv->flushedUpto variable in shared memory, to inform the startup
  * process of how far it can proceed with XLOG replay.
  *
  * If the primary server ends streaming, but doesn't disconnect, walreceiver
@@ -1006,10 +1006,10 @@ XLogWalRcvFlush(bool dying)
 
 		/* Update shared-memory status */
 		SpinLockAcquire(&walrcv->mutex);
-		if (walrcv->receivedUpto < LogstreamResult.Flush)
+		if (walrcv->flushedUpto < LogstreamResult.Flush)
 		{
-			walrcv->latestChunkStart = walrcv->receivedUpto;
-			walrcv->receivedUpto = LogstreamResult.Flush;
+			walrcv->latestChunkStart = walrcv->flushedUpto;
+			walrcv->flushedUpto = LogstreamResult.Flush;
 			walrcv->receivedTLI = ThisTimeLineID;
 		}
 		SpinLockRelease(&walrcv->mutex);
@@ -1362,7 +1362,7 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
 	state = WalRcv->walRcvState;
 	receive_start_lsn = WalRcv->receiveStart;
 	receive_start_tli = WalRcv->receiveStartTLI;
-	received_lsn = WalRcv->receivedUpto;
+	received_lsn = WalRcv->flushedUpto;
 	received_tli = WalRcv->receivedTLI;
 	last_send_time = WalRcv->lastMsgSendTime;
 	last_receipt_time = WalRcv->lastMsgReceiptTime;
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 89c903e45a..31025f97e3 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -264,11 +264,11 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
 
 	/*
 	 * If this is the first startup of walreceiver (on this timeline),
-	 * initialize receivedUpto and latestChunkStart to the starting point.
+	 * initialize flushedUpto and latestChunkStart to the starting point.
 	 */
 	if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
 	{
-		walrcv->receivedUpto = recptr;
+		walrcv->flushedUpto = recptr;
 		walrcv->receivedTLI = tli;
 		walrcv->latestChunkStart = recptr;
 	}
@@ -286,7 +286,7 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
 }
 
 /*
- * Returns the last+1 byte position that walreceiver has written.
+ * Returns the last+1 byte position that walreceiver has flushed.
  *
  * Optionally, returns the previous chunk start, that is the first byte
  * written in the most recent walreceiver flush cycle.  Callers not
@@ -294,13 +294,13 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
  * receiveTLI.
  */
 XLogRecPtr
-GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
+GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
 {
 	WalRcvData *walrcv = WalRcv;
 	XLogRecPtr	recptr;
 
 	SpinLockAcquire(&walrcv->mutex);
-	recptr = walrcv->receivedUpto;
+	recptr = walrcv->flushedUpto;
 	if (latestChunkStart)
 		*latestChunkStart = walrcv->latestChunkStart;
 	if (receiveTLI)
@@ -327,7 +327,7 @@ GetReplicationApplyDelay(void)
 	TimestampTz chunkReplayStartTime;
 
 	SpinLockAcquire(&walrcv->mutex);
-	receivePtr = walrcv->receivedUpto;
+	receivePtr = walrcv->flushedUpto;
 	SpinLockRelease(&walrcv->mutex);
 
 	replayPtr = GetXLogReplayRecPtr(NULL);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 76ec3c7dd0..928a27dbaf 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2913,7 +2913,7 @@ GetStandbyFlushRecPtr(void)
 	 * has streamed, but hasn't been replayed yet.
 	 */
 
-	receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
+	receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 
 	ThisTimeLineID = replayTLI;
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index e08afc6548..9ed71139ce 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -74,19 +74,19 @@ typedef struct
 	TimeLineID	receiveStartTLI;
 
 	/*
-	 * receivedUpto-1 is the last byte position that has already been
+	 * flushedUpto-1 is the last byte position that has already been
 	 * received, and receivedTLI is the timeline it came from.  At the first
 	 * startup of walreceiver, these are set to receiveStart and
 	 * receiveStartTLI. After that, walreceiver updates these whenever it
 	 * flushes the received WAL to disk.
 	 */
-	XLogRecPtr	receivedUpto;
+	XLogRecPtr	flushedUpto;
 	TimeLineID	receivedTLI;
 
 	/*
 	 * latestChunkStart is the starting byte position of the current "batch"
 	 * of received WAL.  It's actually the same as the previous value of
-	 * receivedUpto before the last flush to disk.  Startup process can use
+	 * flushedUpto before the last flush to disk.  Startup process can use
 	 * this to detect whether it's keeping up or not.
 	 */
 	XLogRecPtr	latestChunkStart;
@@ -322,7 +322,7 @@ extern bool WalRcvStreaming(void);
 extern bool WalRcvRunning(void);
 extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
 								 const char *conninfo, const char *slotname);
-extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
+extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
 extern int	GetReplicationApplyDelay(void);
 extern int	GetReplicationTransferLatency(void);
 extern void WalRcvForceReply(void);
-- 
2.20.1

