From 743e11495e81af1f96ca304baf130b20dba056e5 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Tue, 7 Apr 2020 22:56:27 +1200
Subject: [PATCH v8 2/3] Allow XLogReadRecord() to be non-blocking.

Extend read_local_xlog_page() to support non-blocking modes:

1. Reading as far as the WAL receiver has written so far.
2. Reading all the way to the end, when the end LSN is unknown.

Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com
---
 src/backend/access/transam/xlogreader.c |  37 ++++--
 src/backend/access/transam/xlogutils.c  | 151 +++++++++++++++++-------
 src/backend/replication/walsender.c     |   2 +-
 src/include/access/xlogreader.h         |  20 +++-
 src/include/access/xlogutils.h          |  23 ++++
 5 files changed, 178 insertions(+), 55 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 79ff976474..554b2029da 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -257,6 +257,9 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
  * If the reading fails for some other reason, NULL is also returned, and
  * *errormsg is set to a string with details of the failure.
  *
+ * If the read_page callback is one that returns XLOGPAGEREAD_WOULDBLOCK rather
+ * than waiting for WAL to arrive, NULL is also returned in that case.
+ *
  * The returned pointer (or *errormsg) points to an internal buffer that's
  * valid until the next call to XLogReadRecord.
  */
@@ -546,10 +549,11 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 err:
 
 	/*
-	 * Invalidate the read state. We might read from a different source after
-	 * failure.
+	 * Invalidate the read state, if this was an error. We might read from a
+	 * different source after failure.
 	 */
-	XLogReaderInvalReadState(state);
+	if (readOff != XLOGPAGEREAD_WOULDBLOCK)
+		XLogReaderInvalReadState(state);
 
 	if (state->errormsg_buf[0] != '\0')
 		*errormsg = state->errormsg_buf;
@@ -561,8 +565,9 @@ err:
  * Read a single xlog page including at least [pageptr, reqLen] of valid data
  * via the read_page() callback.
  *
- * Returns -1 if the required page cannot be read for some reason; errormsg_buf
- * is set in that case (unless the error occurs in the read_page callback).
+ * Returns XLOGPAGEREAD_ERROR or XLOGPAGEREAD_WOULDBLOCK if the required page
+ * cannot be read for some reason; errormsg_buf is set in the former case
+ * (unless the error occurs in the read_page callback).
  *
  * We fetch the page from a reader-local cache if we know we have the required
  * data and if there hasn't been any error since caching the data.
@@ -659,8 +664,11 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	return readLen;
 
 err:
+	if (readLen == XLOGPAGEREAD_WOULDBLOCK)
+		return XLOGPAGEREAD_WOULDBLOCK;
+
 	XLogReaderInvalReadState(state);
-	return -1;
+	return XLOGPAGEREAD_ERROR;
 }
 
 /*
@@ -939,6 +947,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 	XLogRecPtr	found = InvalidXLogRecPtr;
 	XLogPageHeader header;
 	char	   *errormsg;
+	int			readLen;
 
 	Assert(!XLogRecPtrIsInvalid(RecPtr));
 
@@ -952,7 +961,6 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 		XLogRecPtr	targetPagePtr;
 		int			targetRecOff;
 		uint32		pageHeaderSize;
-		int			readLen;
 
 		/*
 		 * Compute targetRecOff. It should typically be equal or greater than
@@ -1033,7 +1041,8 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 	}
 
 err:
-	XLogReaderInvalReadState(state);
+	if (readLen != XLOGPAGEREAD_WOULDBLOCK)
+		XLogReaderInvalReadState(state);
 
 	return InvalidXLogRecPtr;
 }
@@ -1084,13 +1093,23 @@ WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
 			tli != seg->ws_tli)
 		{
 			XLogSegNo	nextSegNo;
-
 			if (seg->ws_file >= 0)
 				close(seg->ws_file);
 
 			XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
 			seg->ws_file = openSegment(nextSegNo, segcxt, &tli);
 
+			/* callback reported that there was no such file */
+			if (seg->ws_file < 0)
+			{
+				errinfo->wre_errno = errno;
+				errinfo->wre_req = 0;
+				errinfo->wre_read = 0;
+				errinfo->wre_off = startoff;
+				errinfo->wre_seg = *seg;
+				return false;
+			}
+
 			/* Update the current segment info. */
 			seg->ws_tli = tli;
 			seg->ws_segno = nextSegNo;
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 6cb143e161..2d702437dd 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -25,6 +25,7 @@
 #include "access/xlogutils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/walreceiver.h"
 #include "storage/smgr.h"
 #include "utils/guc.h"
 #include "utils/hsearch.h"
@@ -783,6 +784,30 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
 	}
 }
 
+/* openSegment callback for WALRead; returns -1 if the file doesn't exist */
+static int
+wal_segment_try_open(XLogSegNo nextSegNo,
+					 WALSegmentContext *segcxt,
+					 TimeLineID *tli_p)
+{
+	TimeLineID	tli = *tli_p;
+	char		path[MAXPGPATH];
+	int			fd;
+
+	XLogFilePath(path, tli, nextSegNo, segcxt->ws_segsize);
+	fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+	if (fd >= 0)
+		return fd;
+
+	if (errno != ENOENT)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open file \"%s\": %m",
+						path)));
+
+	return -1;					/* file doesn't exist (ENOENT) */
+}
+
 /* openSegment callback for WALRead */
 static int
 wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt,
@@ -831,58 +856,92 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	TimeLineID	tli;
 	int			count;
 	WALReadError errinfo;
+	bool		try_read = false;
+	XLogReadLocalOptions *options =
+		(XLogReadLocalOptions *) state->read_page_data;
 
 	loc = targetPagePtr + reqLen;
 
 	/* Loop waiting for xlog to be available if necessary */
 	while (1)
 	{
-		/*
-		 * Determine the limit of xlog we can currently read to, and what the
-		 * most recent timeline is.
-		 *
-		 * RecoveryInProgress() will update ThisTimeLineID when it first
-		 * notices recovery finishes, so we only have to maintain it for the
-		 * local process until recovery ends.
-		 */
-		if (!RecoveryInProgress())
-			read_upto = GetFlushRecPtr();
-		else
-			read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
-		tli = ThisTimeLineID;
+		switch (options ? options->read_upto_policy : -1)
+		{
+		case XLRO_WALRCV_WRITTEN:
+			/*
+			 * We'll try to read as far as has been written by the WAL
+			 * receiver, on the requested timeline.  When we run out of valid
+			 * data, we'll return an error.  This is used by xlogprefetch.c
+			 * while streaming.
+			 */
+			read_upto = GetWalRcvWriteRecPtr();
+			try_read = true;
+			state->currTLI = tli = options->tli;
+			break;
 
-		/*
-		 * Check which timeline to get the record from.
-		 *
-		 * We have to do it each time through the loop because if we're in
-		 * recovery as a cascading standby, the current timeline might've
-		 * become historical. We can't rely on RecoveryInProgress() because in
-		 * a standby configuration like
-		 *
-		 * A => B => C
-		 *
-		 * if we're a logical decoding session on C, and B gets promoted, our
-		 * timeline will change while we remain in recovery.
-		 *
-		 * We can't just keep reading from the old timeline as the last WAL
-		 * archive in the timeline will get renamed to .partial by
-		 * StartupXLOG().
-		 *
-		 * If that happens after our caller updated ThisTimeLineID but before
-		 * we actually read the xlog page, we might still try to read from the
-		 * old (now renamed) segment and fail. There's not much we can do
-		 * about this, but it can only happen when we're a leaf of a cascading
-		 * standby whose master gets promoted while we're decoding, so a
-		 * one-off ERROR isn't too bad.
-		 */
-		XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+		case XLRO_END:
+			/*
+			 * We'll try to read as far as we can on one timeline.  This is
+			 * used by xlogprefetch.c for crash recovery.
+			 */
+			read_upto = (XLogRecPtr) -1;
+			try_read = true;
+			state->currTLI = tli = options->tli;
+			break;
+
+		default:
+			/*
+			 * Determine the limit of xlog we can currently read to, and what the
+			 * most recent timeline is.
+			 *
+			 * RecoveryInProgress() will update ThisTimeLineID when it first
+			 * notices recovery finishes, so we only have to maintain it for
+			 * the local process until recovery ends.
+			 */
+			if (!RecoveryInProgress())
+				read_upto = GetFlushRecPtr();
+			else
+				read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
+			tli = ThisTimeLineID;
+
+			/*
+			 * Check which timeline to get the record from.
+			 *
+			 * We have to do it each time through the loop because if we're in
+			 * recovery as a cascading standby, the current timeline might've
+			 * become historical. We can't rely on RecoveryInProgress()
+			 * because in a standby configuration like
+			 *
+			 * A => B => C
+			 *
+			 * if we're a logical decoding session on C, and B gets promoted,
+			 * our timeline will change while we remain in recovery.
+			 *
+			 * We can't just keep reading from the old timeline as the last
+			 * WAL archive in the timeline will get renamed to .partial by
+			 * StartupXLOG().
+			 *
+			 * If that happens after our caller updated ThisTimeLineID but
+			 * before we actually read the xlog page, we might still try to
+			 * read from the old (now renamed) segment and fail. There's not
+			 * much we can do about this, but it can only happen when we're a
+			 * leaf of a cascading standby whose master gets promoted while
+			 * we're decoding, so a one-off ERROR isn't too bad.
+			 */
+			XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+			break;
+		}
 
-		if (state->currTLI == ThisTimeLineID)
+		if (state->currTLI == tli)
 		{
 
 			if (loc <= read_upto)
 				break;
 
+			/* not enough data there, but we were asked not to wait */
+			if (options && options->nowait)
+				return XLOGPAGEREAD_WOULDBLOCK;
+
 			CHECK_FOR_INTERRUPTS();
 			pg_usleep(1000L);
 		}
@@ -924,7 +983,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	else if (targetPagePtr + reqLen > read_upto)
 	{
 		/* not enough data there */
-		return -1;
+		return XLOGPAGEREAD_ERROR;
 	}
 	else
 	{
@@ -938,8 +997,18 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	 * zero-padded up to the page boundary if it's incomplete.
 	 */
 	if (!WALRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg,
-				 &state->segcxt, wal_segment_open, &errinfo))
+				 &state->segcxt,
+				 try_read ? wal_segment_try_open : wal_segment_open,
+				 &errinfo))
+	{
+		/*
+		 * When on one single timeline, we may read past the end of available
+		 * segments.  Report lack of file as an error.
+		 */
+		if (try_read)
+			return XLOGPAGEREAD_ERROR;
 		WALReadRaiseError(&errinfo);
+	}
 
 	/* number of valid bytes in the buffer */
 	return count;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 122d884f3e..15ff3d35e4 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -818,7 +818,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 
 	/* fail if not (implies we are going to shut down) */
 	if (flushptr < targetPagePtr + reqLen)
-		return -1;
+		return XLOGPAGEREAD_ERROR;
 
 	if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
 		count = XLOG_BLCKSZ;	/* more than one block available */
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 4582196e18..a3ac7f414b 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -50,6 +50,10 @@ typedef struct WALSegmentContext
 
 typedef struct XLogReaderState XLogReaderState;
 
+/* Special negative return values for XLogPageReadCB functions */
+#define XLOGPAGEREAD_ERROR		-1
+#define XLOGPAGEREAD_WOULDBLOCK	-2
+
 /* Function type definition for the read_page callback */
 typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
 							   XLogRecPtr targetPagePtr,
@@ -99,10 +103,13 @@ struct XLogReaderState
 	 * This callback shall read at least reqLen valid bytes of the xlog page
 	 * starting at targetPagePtr, and store them in readBuf.  The callback
 	 * shall return the number of bytes read (never more than XLOG_BLCKSZ), or
-	 * -1 on failure.  The callback shall sleep, if necessary, to wait for the
-	 * requested bytes to become available.  The callback will not be invoked
-	 * again for the same page unless more than the returned number of bytes
-	 * are needed.
+	 * XLOGPAGEREAD_ERROR on failure.  If the requested bytes are not yet
+	 * available, the callback may either sleep until they arrive or return
+	 * XLOGPAGEREAD_WOULDBLOCK without waiting.  If a callback that can return
+	 * XLOGPAGEREAD_WOULDBLOCK is installed, the reader client must expect to
+	 * fail to read when there is not enough data.  The callback will not be
+	 * invoked again for the same page unless more than the returned number of
+	 * bytes are needed.
 	 *
 	 * targetRecPtr is the position of the WAL record we're reading.  Usually
 	 * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs
@@ -126,6 +133,11 @@ struct XLogReaderState
 	 */
 	void	   *private_data;
 
+	/*
+	 * Opaque data for callbacks to use.  Not used by XLogReader.
+	 */
+	void	   *read_page_data;
+
 	/*
 	 * Start and end point of last record read.  EndRecPtr is also used as the
 	 * position to read next.  Calling XLogBeginRead() sets EndRecPtr to the
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 5181a077d9..89c9ce90f8 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -47,6 +47,29 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
 extern void FreeFakeRelcacheEntry(Relation fakerel);
 
+/*
+ * A pointer to an XLogReadLocalOptions struct can be supplied as the
+ * read_page_data of an XLogReader to modify read_local_xlog_page()'s behavior.
+ */
+typedef struct XLogReadLocalOptions
+{
+	/* Don't block waiting for new WAL to arrive. */
+	bool		nowait;
+
+	/*
+	 * For XLRO_WALRCV_WRITTEN and XLRO_END modes, the timeline ID must be
+	 * provided.
+	 */
+	TimeLineID	tli;
+
+	/* How far to read. */
+	enum {
+		XLRO_STANDARD,
+		XLRO_WALRCV_WRITTEN,
+		XLRO_END
+	} read_upto_policy;
+} XLogReadLocalOptions;
+
 extern int	read_local_xlog_page(XLogReaderState *state,
 								 XLogRecPtr targetPagePtr, int reqLen,
 								 XLogRecPtr targetRecPtr, char *cur_page);
-- 
2.20.1

