From 6ef218f60cab62ecbd5ad120cf535cb4e5045f45 Mon Sep 17 00:00:00 2001
From: Thomas Munro <tmunro@postgresql.org>
Date: Mon, 9 Dec 2019 17:22:07 +1300
Subject: [PATCH 3/5] Add GetWalRcvWriteRecPtr() (new definition).

A later patch will read received WAL to prefetch referenced blocks,
without waiting for the data to be flushed to disk.  To do that, it
needs to be able to see the write pointer advancing in shared memory.

The function that formerly bore the name GetWalRcvWriteRecPtr() was recently
renamed to GetWalRcvFlushRecPtr(), which better describes what it does.

Reviewed-by: Alvaro Herrera <alvherre@2ndquadrant.com>
Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com
---
 src/backend/replication/walreceiver.c      |  5 +++++
 src/backend/replication/walreceiverfuncs.c | 12 ++++++++++++
 src/include/replication/walreceiver.h      | 10 ++++++++++
 3 files changed, 27 insertions(+)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 0bdd0c3074..e250f5583c 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -245,6 +245,8 @@ WalReceiverMain(void)
 
 	SpinLockRelease(&walrcv->mutex);
 
+	pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
+
 	/* Arrange to clean up at walreceiver exit */
 	on_shmem_exit(WalRcvDie, 0);
 
@@ -985,6 +987,9 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 
 		LogstreamResult.Write = recptr;
 	}
+
+	/* Update shared-memory status */
+	pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
 }
 
 /*
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 31025f97e3..96b44e2c88 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -310,6 +310,18 @@ GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
 	return recptr;
 }
 
+/*
+ * Returns the last+1 byte position that walreceiver has written.
+ * The shared value is read atomically, without taking a lock.
+ */
+XLogRecPtr
+GetWalRcvWriteRecPtr(void)
+{
+	WalRcvData *walrcv = WalRcv;
+
+	return pg_atomic_read_u64(&walrcv->writtenUpto);
+}
+
 /*
  * Returns the replication apply delay in ms or -1
  * if the apply delay info is not available
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 9ed71139ce..914e6e3d44 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -16,6 +16,7 @@
 #include "access/xlogdefs.h"
 #include "getaddrinfo.h"		/* for NI_MAXHOST */
 #include "pgtime.h"
+#include "port/atomics.h"
 #include "replication/logicalproto.h"
 #include "replication/walsender.h"
 #include "storage/latch.h"
@@ -142,6 +143,14 @@ typedef struct
 
 	slock_t		mutex;			/* locks shared variables shown above */
 
+	/*
+	 * Like flushedUpto, but advanced after writing and before flushing,
+	 * without the need to acquire the spin lock.  Data can be read by another
+	 * process up to this point, but shouldn't be used for data integrity
+	 * purposes.
+	 */
+	pg_atomic_uint64 writtenUpto;
+
 	/*
 	 * force walreceiver reply?  This doesn't need to be locked; memory
 	 * barriers for ordering are sufficient.  But we do need atomic fetch and
@@ -323,6 +332,7 @@ extern bool WalRcvRunning(void);
 extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
 								 const char *conninfo, const char *slotname);
 extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
+extern XLogRecPtr GetWalRcvWriteRecPtr(void);
 extern int	GetReplicationApplyDelay(void);
 extern int	GetReplicationTransferLatency(void);
 extern void WalRcvForceReply(void);
-- 
2.20.1

