From 1ae59996301aefd91efd10fcbee50b2c3d7c140e Mon Sep 17 00:00:00 2001
From: Thomas Munro <tmunro@postgresql.org>
Date: Mon, 9 Dec 2019 17:22:07 +1300
Subject: [PATCH v6 3/8] Add GetWalRcvWriteRecPtr() (new definition).

A later patch will read received WAL to prefetch referenced blocks,
without waiting for the data to be flushed to disk.  To do that, it
needs to be able to see the write pointer advancing in shared memory.

The function formerly bearing this name was recently renamed to
GetWalRcvFlushRecPtr(), which better describes what it does.

Reviewed-by: Alvaro Herrera <alvherre@2ndquadrant.com>
Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com
---
 src/backend/replication/walreceiver.c      |  5 +++++
 src/backend/replication/walreceiverfuncs.c | 12 ++++++++++++
 src/include/replication/walreceiver.h      | 10 ++++++++++
 3 files changed, 27 insertions(+)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 1363c3facc..d69fb90132 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -261,6 +261,8 @@ WalReceiverMain(void)
 
 	SpinLockRelease(&walrcv->mutex);
 
+	pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
+
 	/* Arrange to clean up at walreceiver exit */
 	on_shmem_exit(WalRcvDie, 0);
 
@@ -984,6 +986,9 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 
 		LogstreamResult.Write = recptr;
 	}
+
+	/* Update shared-memory status */
+	pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
 }
 
 /*
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 32260c2236..4afad83539 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -328,6 +328,18 @@ GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
 	return recptr;
 }
 
+/*
+ * Returns the last+1 byte position that walreceiver has written, which may
+ * be ahead of the flush position.  Reads shared memory without taking a lock.
+ */
+XLogRecPtr
+GetWalRcvWriteRecPtr(void)
+{
+	WalRcvData *walrcv = WalRcv;
+
+	return pg_atomic_read_u64(&walrcv->writtenUpto);
+}
+
 /*
  * Returns the replication apply delay in ms or -1
  * if the apply delay info is not available
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 6298ca07be..f1aa6e9977 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -16,6 +16,7 @@
 #include "access/xlogdefs.h"
 #include "getaddrinfo.h"		/* for NI_MAXHOST */
 #include "pgtime.h"
+#include "port/atomics.h"
 #include "replication/logicalproto.h"
 #include "replication/walsender.h"
 #include "storage/latch.h"
@@ -141,6 +142,14 @@ typedef struct
 
 	slock_t		mutex;			/* locks shared variables shown above */
 
+	/*
+	 * Like flushedUpto, but advanced after writing and before flushing,
+	 * without the need to acquire the spin lock.  Data can be read by another
+	 * process up to this point, but shouldn't be used for data integrity
+	 * purposes.
+	 */
+	pg_atomic_uint64 writtenUpto;
+
 	/*
 	 * force walreceiver reply?  This doesn't need to be locked; memory
 	 * barriers for ordering are sufficient.  But we do need atomic fetch and
@@ -323,6 +332,7 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
 								 const char *conninfo, const char *slotname,
 								 bool create_temp_slot);
 extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
+extern XLogRecPtr GetWalRcvWriteRecPtr(void);
 extern int	GetReplicationApplyDelay(void);
 extern int	GetReplicationTransferLatency(void);
 extern void WalRcvForceReply(void);
-- 
2.20.1

