From 664ece95655bfba9ed565c77e17a1ca73b5fe11c Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Tue, 7 Apr 2020 22:56:27 +1200
Subject: [PATCH v6 7/8] Allow XLogReadRecord() to be non-blocking.

Extend read_local_xlog_page() to support non-blocking modes:

1. Reading as far as the WAL receiver has written so far.
2. Reading all the way to the end, when the end LSN is unknown.

Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com
---
 src/backend/access/transam/xlogreader.c | 37 +++++++++++----
 src/backend/access/transam/xlogutils.c  | 61 +++++++++++++++++++++++--
 src/backend/replication/walsender.c     |  2 +-
 src/include/access/xlogreader.h         |  4 ++
 src/include/access/xlogutils.h          | 16 +++++++
 5 files changed, 107 insertions(+), 13 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index f3fea5132f..e2f2998911 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -254,6 +254,9 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
  * If the reading fails for some other reason, NULL is also returned, and
  * *errormsg is set to a string with details of the failure.
  *
+ * If the read_page callback is one that returns XLOGPAGEREAD_WOULDBLOCK rather
+ * than waiting for WAL to arrive, NULL is also returned in that case.
+ *
  * The returned pointer (or *errormsg) points to an internal buffer that's
  * valid until the next call to XLogReadRecord.
  */
@@ -543,10 +546,11 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 err:
 
 	/*
-	 * Invalidate the read state. We might read from a different source after
-	 * failure.
+	 * Invalidate the read state, if this was an error. We might read from a
+	 * different source after failure.
 	 */
-	XLogReaderInvalReadState(state);
+	if (readOff != XLOGPAGEREAD_WOULDBLOCK)
+		XLogReaderInvalReadState(state);
 
 	if (state->errormsg_buf[0] != '\0')
 		*errormsg = state->errormsg_buf;
@@ -558,8 +562,9 @@ err:
  * Read a single xlog page including at least [pageptr, reqLen] of valid data
  * via the read_page() callback.
  *
- * Returns -1 if the required page cannot be read for some reason; errormsg_buf
- * is set in that case (unless the error occurs in the read_page callback).
+ * Returns XLOGPAGEREAD_ERROR or XLOGPAGEREAD_WOULDBLOCK if the required page
+ * cannot be read for some reason; errormsg_buf is set in the former case
+ * (unless the error occurs in the read_page callback).
  *
  * We fetch the page from a reader-local cache if we know we have the required
  * data and if there hasn't been any error since caching the data.
@@ -656,8 +661,11 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	return readLen;
 
 err:
+	if (readLen == XLOGPAGEREAD_WOULDBLOCK)
+		return XLOGPAGEREAD_WOULDBLOCK;
+
 	XLogReaderInvalReadState(state);
-	return -1;
+	return XLOGPAGEREAD_ERROR;
 }
 
 /*
@@ -936,6 +944,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 	XLogRecPtr	found = InvalidXLogRecPtr;
 	XLogPageHeader header;
 	char	   *errormsg;
+	int			readLen;
 
 	Assert(!XLogRecPtrIsInvalid(RecPtr));
 
@@ -949,7 +958,6 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 		XLogRecPtr	targetPagePtr;
 		int			targetRecOff;
 		uint32		pageHeaderSize;
-		int			readLen;
 
 		/*
 		 * Compute targetRecOff. It should typically be equal or greater than
@@ -1030,7 +1038,8 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 	}
 
 err:
-	XLogReaderInvalReadState(state);
+	if (readLen != XLOGPAGEREAD_WOULDBLOCK)
+		XLogReaderInvalReadState(state);
 
 	return InvalidXLogRecPtr;
 }
@@ -1081,13 +1090,23 @@ WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
 			tli != seg->ws_tli)
 		{
 			XLogSegNo	nextSegNo;
-
 			if (seg->ws_file >= 0)
 				close(seg->ws_file);
 
 			XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
 			seg->ws_file = openSegment(nextSegNo, segcxt, &tli);
 
+			/* callback reported that there was no such file */
+			if (seg->ws_file < 0)
+			{
+				errinfo->wre_errno = errno;
+				errinfo->wre_req = segbytes;
+				errinfo->wre_read = readbytes;
+				errinfo->wre_off = startoff;
+				errinfo->wre_seg = *seg;
+				return false;
+			}
+
 			/* Update the current segment info. */
 			seg->ws_tli = tli;
 			seg->ws_segno = nextSegNo;
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 6cb143e161..5031877e7c 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -25,6 +25,7 @@
 #include "access/xlogutils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/walreceiver.h"
 #include "storage/smgr.h"
 #include "utils/guc.h"
 #include "utils/hsearch.h"
@@ -783,6 +784,30 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
 	}
 }
 
+/* openSegment callback for WALRead: returns -1 if the file does not exist */
+static int
+wal_segment_try_open(XLogSegNo nextSegNo,
+					 WALSegmentContext *segcxt,
+					 TimeLineID *tli_p)
+{
+	TimeLineID	tli = *tli_p;
+	char		path[MAXPGPATH];
+	int			fd;
+
+	XLogFilePath(path, tli, nextSegNo, segcxt->ws_segsize);
+	fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+	if (fd >= 0)
+		return fd;
+
+	if (errno != ENOENT)		/* only a missing file is tolerated */
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open file \"%s\": %m",
+						path)));
+
+	return -1;					/* errno is ENOENT: tell caller there is no file */
+}
+
 /* openSegment callback for WALRead */
 static int
 wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt,
@@ -831,6 +856,8 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	TimeLineID	tli;
 	int			count;
 	WALReadError errinfo;
+	XLogReadLocalOptions *options = (XLogReadLocalOptions *) state->private_data;
+	bool		try_read = false;
 
 	loc = targetPagePtr + reqLen;
 
@@ -845,7 +872,24 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 		 * notices recovery finishes, so we only have to maintain it for the
 		 * local process until recovery ends.
 		 */
-		if (!RecoveryInProgress())
+		if (options)
+		{
+			switch (options->read_upto_policy)
+			{
+			case XLRO_WALRCV_WRITTEN:
+				read_upto = GetWalRcvWriteRecPtr();
+				break;
+			case XLRO_END:
+				read_upto = (XLogRecPtr) -1;
+				try_read = true;
+				break;
+			default:
+				read_upto = 0;
+				elog(ERROR, "unknown read_upto_policy value");
+				break;
+			}
+		}
+		else if (!RecoveryInProgress())
 			read_upto = GetFlushRecPtr();
 		else
 			read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
@@ -883,6 +927,10 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 			if (loc <= read_upto)
 				break;
 
+			/* not enough data there, but we were asked not to wait */
+			if (options && options->nowait)
+				return XLOGPAGEREAD_WOULDBLOCK;
+
 			CHECK_FOR_INTERRUPTS();
 			pg_usleep(1000L);
 		}
@@ -924,7 +972,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	else if (targetPagePtr + reqLen > read_upto)
 	{
 		/* not enough data there */
-		return -1;
+		return XLOGPAGEREAD_ERROR;
 	}
 	else
 	{
@@ -938,8 +986,15 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	 * zero-padded up to the page boundary if it's incomplete.
 	 */
 	if (!WALRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg,
-				 &state->segcxt, wal_segment_open, &errinfo))
+				 &state->segcxt,
+				 try_read ? wal_segment_try_open : wal_segment_open,
+				 &errinfo))
+	{
+		/* Caller asked for XLRO_END, so there may be no file at all. */
+		if (try_read)
+			return XLOGPAGEREAD_ERROR;
 		WALReadRaiseError(&errinfo);
+	}
 
 	/* number of valid bytes in the buffer */
 	return count;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 414cf67d3d..37ec3ddc7b 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -818,7 +818,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 
 	/* fail if not (implies we are going to shut down) */
 	if (flushptr < targetPagePtr + reqLen)
-		return -1;
+		return XLOGPAGEREAD_ERROR;
 
 	if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
 		count = XLOG_BLCKSZ;	/* more than one block available */
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 4582196e18..dc99d02b60 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -50,6 +50,10 @@ typedef struct WALSegmentContext
 
 typedef struct XLogReaderState XLogReaderState;
 
+/* Special negative return values for XLogPageReadCB functions */
+#define XLOGPAGEREAD_ERROR		-1
+#define XLOGPAGEREAD_WOULDBLOCK	-2
+
 /* Function type definition for the read_page callback */
 typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
 							   XLogRecPtr targetPagePtr,
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 5181a077d9..440dffac1a 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -47,6 +47,22 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
 extern void FreeFakeRelcacheEntry(Relation fakerel);
 
+/*
+ * A pointer to an XLogReadLocalOptions struct can be supplied as the private
+ * data for an XLogReader to modify the behavior of read_local_xlog_page().
+ */
+typedef struct XLogReadLocalOptions
+{
+	/* Return XLOGPAGEREAD_WOULDBLOCK instead of waiting for new WAL to arrive. */
+	bool		nowait;
+
+	/* How far to read. */
+	enum {
+		XLRO_WALRCV_WRITTEN,	/* as far as the WAL receiver has written */
+		XLRO_END				/* all the way to the end; end LSN unknown */
+	} read_upto_policy;
+} XLogReadLocalOptions;
+
 extern int	read_local_xlog_page(XLogReaderState *state,
 								 XLogRecPtr targetPagePtr, int reqLen,
 								 XLogRecPtr targetRecPtr, char *cur_page);
-- 
2.20.1

