From 260563b1400f32a94ee4cc5e4552d17fecfb4ea3 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Tue, 17 Mar 2020 15:28:08 +1300
Subject: [PATCH v6 2/8] Rename GetWalRcvWriteRecPtr() to
 GetWalRcvFlushRecPtr().

The new name better reflects the fact that the value it returns is
updated only when received data has been flushed to disk.  Also rename a
couple of variables relating to this value.

An upcoming patch will make use of the latest data that was written
without waiting for it to be flushed, so let's use more precise function
names.

Reviewed-by: Alvaro Herrera <alvherre@2ndquadrant.com>
Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com
---
 src/backend/access/transam/xlog.c          | 20 ++++++++++----------
 src/backend/access/transam/xlogfuncs.c     |  2 +-
 src/backend/replication/README             |  2 +-
 src/backend/replication/walreceiver.c      | 10 +++++-----
 src/backend/replication/walreceiverfuncs.c | 12 ++++++------
 src/backend/replication/walsender.c        |  2 +-
 src/include/replication/walreceiver.h      |  8 ++++----
 7 files changed, 28 insertions(+), 28 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index abf954ba39..658af40816 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -207,8 +207,8 @@ HotStandbyState standbyState = STANDBY_DISABLED;
 
 static XLogRecPtr LastRec;
 
-/* Local copy of WalRcv->receivedUpto */
-static XLogRecPtr receivedUpto = 0;
+/* Local copy of WalRcv->flushedUpto */
+static XLogRecPtr flushedUpto = 0;
 static TimeLineID receiveTLI = 0;
 
 /*
@@ -9335,7 +9335,7 @@ CreateRestartPoint(int flags)
 	 * Retreat _logSegNo using the current end of xlog replayed or received,
 	 * whichever is later.
 	 */
-	receivePtr = GetWalRcvWriteRecPtr(NULL, NULL);
+	receivePtr = GetWalRcvFlushRecPtr(NULL, NULL);
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 	endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
 	KeepLogSeg(endptr, &_logSegNo);
@@ -11732,7 +11732,7 @@ retry:
 	/* See if we need to retrieve more data */
 	if (readFile < 0 ||
 		(readSource == XLOG_FROM_STREAM &&
-		 receivedUpto < targetPagePtr + reqLen))
+		 flushedUpto < targetPagePtr + reqLen))
 	{
 		if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
 										 private->randAccess,
@@ -11763,10 +11763,10 @@ retry:
 	 */
 	if (readSource == XLOG_FROM_STREAM)
 	{
-		if (((targetPagePtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ))
+		if (((targetPagePtr) / XLOG_BLCKSZ) != (flushedUpto / XLOG_BLCKSZ))
 			readLen = XLOG_BLCKSZ;
 		else
-			readLen = XLogSegmentOffset(receivedUpto, wal_segment_size) -
+			readLen = XLogSegmentOffset(flushedUpto, wal_segment_size) -
 				targetPageOff;
 	}
 	else
@@ -12181,7 +12181,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 						RequestXLogStreaming(tli, ptr, PrimaryConnInfo,
 											 PrimarySlotName,
 											 wal_receiver_create_temp_slot);
-						receivedUpto = 0;
+						flushedUpto = 0;
 					}
 
 					/*
@@ -12205,14 +12205,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 					 * XLogReceiptTime will not advance, so the grace time
 					 * allotted to conflicting queries will decrease.
 					 */
-					if (RecPtr < receivedUpto)
+					if (RecPtr < flushedUpto)
 						havedata = true;
 					else
 					{
 						XLogRecPtr	latestChunkStart;
 
-						receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI);
-						if (RecPtr < receivedUpto && receiveTLI == curFileTLI)
+						flushedUpto = GetWalRcvFlushRecPtr(&latestChunkStart, &receiveTLI);
+						if (RecPtr < flushedUpto && receiveTLI == curFileTLI)
 						{
 							havedata = true;
 							if (latestChunkStart <= RecPtr)
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index b84ba57259..00e1b33ed5 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -398,7 +398,7 @@ pg_last_wal_receive_lsn(PG_FUNCTION_ARGS)
 {
 	XLogRecPtr	recptr;
 
-	recptr = GetWalRcvWriteRecPtr(NULL, NULL);
+	recptr = GetWalRcvFlushRecPtr(NULL, NULL);
 
 	if (recptr == 0)
 		PG_RETURN_NULL();
diff --git a/src/backend/replication/README b/src/backend/replication/README
index 0cbb990613..8ccdd86e74 100644
--- a/src/backend/replication/README
+++ b/src/backend/replication/README
@@ -54,7 +54,7 @@ and WalRcvData->slotname, and initializes the starting point in
 WalRcvData->receiveStart.
 
 As walreceiver receives WAL from the master server, and writes and flushes
-it to disk (in pg_wal), it updates WalRcvData->receivedUpto and signals
+it to disk (in pg_wal), it updates WalRcvData->flushedUpto and signals
 the startup process to know how far WAL replay can advance.
 
 Walreceiver sends information about replication progress to the master server
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index aee67c61aa..1363c3facc 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -12,7 +12,7 @@
  * in the primary server), and then keeps receiving XLOG records and
  * writing them to the disk as long as the connection is alive. As XLOG
  * records are received and flushed to disk, it updates the
- * WalRcv->receivedUpto variable in shared memory, to inform the startup
+ * WalRcv->flushedUpto variable in shared memory, to inform the startup
  * process of how far it can proceed with XLOG replay.
  *
  * A WAL receiver cannot directly load GUC parameters used when establishing
@@ -1005,10 +1005,10 @@ XLogWalRcvFlush(bool dying)
 
 		/* Update shared-memory status */
 		SpinLockAcquire(&walrcv->mutex);
-		if (walrcv->receivedUpto < LogstreamResult.Flush)
+		if (walrcv->flushedUpto < LogstreamResult.Flush)
 		{
-			walrcv->latestChunkStart = walrcv->receivedUpto;
-			walrcv->receivedUpto = LogstreamResult.Flush;
+			walrcv->latestChunkStart = walrcv->flushedUpto;
+			walrcv->flushedUpto = LogstreamResult.Flush;
 			walrcv->receivedTLI = ThisTimeLineID;
 		}
 		SpinLockRelease(&walrcv->mutex);
@@ -1361,7 +1361,7 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
 	state = WalRcv->walRcvState;
 	receive_start_lsn = WalRcv->receiveStart;
 	receive_start_tli = WalRcv->receiveStartTLI;
-	received_lsn = WalRcv->receivedUpto;
+	received_lsn = WalRcv->flushedUpto;
 	received_tli = WalRcv->receivedTLI;
 	last_send_time = WalRcv->lastMsgSendTime;
 	last_receipt_time = WalRcv->lastMsgReceiptTime;
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 21d1823607..32260c2236 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -282,11 +282,11 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
 
 	/*
 	 * If this is the first startup of walreceiver (on this timeline),
-	 * initialize receivedUpto and latestChunkStart to the starting point.
+	 * initialize flushedUpto and latestChunkStart to the starting point.
 	 */
 	if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
 	{
-		walrcv->receivedUpto = recptr;
+		walrcv->flushedUpto = recptr;
 		walrcv->receivedTLI = tli;
 		walrcv->latestChunkStart = recptr;
 	}
@@ -304,7 +304,7 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
 }
 
 /*
- * Returns the last+1 byte position that walreceiver has written.
+ * Returns the last+1 byte position that walreceiver has flushed.
  *
  * Optionally, returns the previous chunk start, that is the first byte
  * written in the most recent walreceiver flush cycle.  Callers not
@@ -312,13 +312,13 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
  * receiveTLI.
  */
 XLogRecPtr
-GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
+GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
 {
 	WalRcvData *walrcv = WalRcv;
 	XLogRecPtr	recptr;
 
 	SpinLockAcquire(&walrcv->mutex);
-	recptr = walrcv->receivedUpto;
+	recptr = walrcv->flushedUpto;
 	if (latestChunkStart)
 		*latestChunkStart = walrcv->latestChunkStart;
 	if (receiveTLI)
@@ -345,7 +345,7 @@ GetReplicationApplyDelay(void)
 	TimestampTz chunkReplayStartTime;
 
 	SpinLockAcquire(&walrcv->mutex);
-	receivePtr = walrcv->receivedUpto;
+	receivePtr = walrcv->flushedUpto;
 	SpinLockRelease(&walrcv->mutex);
 
 	replayPtr = GetXLogReplayRecPtr(NULL);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9e5611574c..414cf67d3d 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2949,7 +2949,7 @@ GetStandbyFlushRecPtr(void)
 	 * has streamed, but hasn't been replayed yet.
 	 */
 
-	receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
+	receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 
 	ThisTimeLineID = replayTLI;
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index cf3e43128c..6298ca07be 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -73,19 +73,19 @@ typedef struct
 	TimeLineID	receiveStartTLI;
 
 	/*
-	 * receivedUpto-1 is the last byte position that has already been
+	 * flushedUpto-1 is the last byte position that has already been
 	 * received, and receivedTLI is the timeline it came from.  At the first
 	 * startup of walreceiver, these are set to receiveStart and
 	 * receiveStartTLI. After that, walreceiver updates these whenever it
 	 * flushes the received WAL to disk.
 	 */
-	XLogRecPtr	receivedUpto;
+	XLogRecPtr	flushedUpto;
 	TimeLineID	receivedTLI;
 
 	/*
 	 * latestChunkStart is the starting byte position of the current "batch"
 	 * of received WAL.  It's actually the same as the previous value of
-	 * receivedUpto before the last flush to disk.  Startup process can use
+	 * flushedUpto before the last flush to disk.  Startup process can use
 	 * this to detect whether it's keeping up or not.
 	 */
 	XLogRecPtr	latestChunkStart;
@@ -322,7 +322,7 @@ extern bool WalRcvRunning(void);
 extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
 								 const char *conninfo, const char *slotname,
 								 bool create_temp_slot);
-extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
+extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
 extern int	GetReplicationApplyDelay(void);
 extern int	GetReplicationTransferLatency(void);
 extern void WalRcvForceReply(void);
-- 
2.20.1

