From 81d3e58a1f017f34bb654cc4f66a4b9646469349 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 5 Sep 2019 20:21:55 +0900
Subject: [PATCH v6 1/4] Move callback-call from ReadPageInternal to
 XLogReadRecord.

The current WAL record reader reads page data using a callback
function.  Although that is not problematic by itself, it becomes a
problem if we want to add tasks such as encryption, which must be
performed on page data before the WAL reader reads it. To avoid
requiring a new code path in the record reader facility for every new
callback, this patch separates the page reader from the WAL record
reading facility by turning the current WAL record reader into a state
machine.

As the first step of that change, this patch moves the call to the page
reader function out of ReadPageInternal; the remaining tasks of that
function are taken over by the new function XLogNeedData. As a
result, XLogReadRecord directly calls the page reader callback function
according to the feedback from XLogNeedData.
---
 src/backend/access/transam/xlog.c              |  16 +-
 src/backend/access/transam/xlogreader.c        | 336 +++++++++++++++----------
 src/backend/access/transam/xlogutils.c         |  10 +-
 src/backend/replication/logical/logicalfuncs.c |   4 +-
 src/backend/replication/walsender.c            |  10 +-
 src/bin/pg_rewind/parsexlog.c                  |  17 +-
 src/bin/pg_waldump/pg_waldump.c                |   8 +-
 src/include/access/xlogreader.h                |  26 +-
 src/include/access/xlogutils.h                 |   2 +-
 src/include/replication/logicalfuncs.h         |   2 +-
 10 files changed, 265 insertions(+), 166 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6876537b62..d7f899e738 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -884,7 +884,7 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
 static int	XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
 						 int source, bool notfoundOk);
 static int	XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
-static int	XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
+static bool	XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
 						 int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
 						 TimeLineID *readTLI);
 static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
@@ -4249,7 +4249,6 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
 	XLogRecord *record;
 	XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
 
-	/* Pass through parameters to XLogPageRead */
 	private->fetching_ckpt = fetching_ckpt;
 	private->emode = emode;
 	private->randAccess = (RecPtr != InvalidXLogRecPtr);
@@ -11521,7 +11520,7 @@ CancelBackup(void)
  * XLogPageRead() to try fetching the record from another source, or to
  * sleep and retry.
  */
-static int
+static bool
 XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
 			 XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI)
 {
@@ -11580,7 +11579,8 @@ retry:
 			readLen = 0;
 			readSource = 0;
 
-			return -1;
+			xlogreader->readLen = -1;
+			return false;
 		}
 	}
 
@@ -11675,7 +11675,8 @@ retry:
 		goto next_record_is_invalid;
 	}
 
-	return readLen;
+	xlogreader->readLen = readLen;
+	return true;
 
 next_record_is_invalid:
 	lastSourceFailed = true;
@@ -11689,8 +11690,9 @@ next_record_is_invalid:
 	/* In standby-mode, keep trying */
 	if (StandbyMode)
 		goto retry;
-	else
-		return -1;
+
+	xlogreader->readLen = -1;
+	return false;
 }
 
 /*
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index a66e3324b1..12a52159a9 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -34,9 +34,9 @@
 static void report_invalid_record(XLogReaderState *state, const char *fmt,...)
 			pg_attribute_printf(2, 3);
 static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
-static int	ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
-							 int reqLen);
-static void XLogReaderInvalReadState(XLogReaderState *state);
+static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr,
+						 int reqLen, bool header_inclusive);
+static void XLogReaderDiscardReadingPage(XLogReaderState *state);
 static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 								  XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
 static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
@@ -101,7 +101,7 @@ XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc,
 	/* system_identifier initialized to zeroes above */
 	state->private_data = private_data;
 	/* ReadRecPtr and EndRecPtr initialized to zeroes above */
-	/* readSegNo, readOff, readLen, readPageTLI initialized to zeroes above */
+	/* readSegNo, readLen, readPageTLI initialized to zeroes above */
 	state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
 										  MCXT_ALLOC_NO_OOM);
 	if (!state->errormsg_buf)
@@ -225,7 +225,6 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	uint32		targetRecOff;
 	uint32		pageHeaderSize;
 	bool		gotheader;
-	int			readOff;
 
 	/*
 	 * randAccess indicates whether to verify the previous-record pointer of
@@ -277,15 +276,21 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	 * byte to cover the whole record header, or at least the part of it that
 	 * fits on the same page.
 	 */
-	readOff = ReadPageInternal(state,
-							   targetPagePtr,
-							   Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
-	if (readOff < 0)
+	while (XLogNeedData(state, targetPagePtr,
+						Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ),
+						targetRecOff != 0))
+	{
+		if (!state->read_page(state, state->readPagePtr, state->readLen,
+							  RecPtr, state->readBuf,
+							  &state->readPageTLI))
+			break;
+	}
+
+	if (!state->page_verified)
 		goto err;
 
 	/*
-	 * ReadPageInternal always returns at least the page header, so we can
-	 * examine it now.
+	 * We have loaded at least the page header, so we can examine it now.
 	 */
 	pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
 	if (targetRecOff == 0)
@@ -311,8 +316,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 		goto err;
 	}
 
-	/* ReadPageInternal has verified the page header */
-	Assert(pageHeaderSize <= readOff);
+	/* XLogNeedData has verified the page header */
+	Assert(pageHeaderSize <= state->readLen);
 
 	/*
 	 * Read the record length.
@@ -385,18 +390,26 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 
 		do
 		{
+			int rest_len = total_len - gotlen;
+
 			/* Calculate pointer to beginning of next page */
 			targetPagePtr += XLOG_BLCKSZ;
 
 			/* Wait for the next page to become available */
-			readOff = ReadPageInternal(state, targetPagePtr,
-									   Min(total_len - gotlen + SizeOfXLogShortPHD,
-										   XLOG_BLCKSZ));
+			while (XLogNeedData(state, targetPagePtr,
+								Min(rest_len, XLOG_BLCKSZ),
+								false))
+			{
+				if (!state->read_page(state, state->readPagePtr, state->readLen,
+									  state->ReadRecPtr, state->readBuf,
+									  &state->readPageTLI))
+					break;
+			}
 
-			if (readOff < 0)
+			if (!state->page_verified)
 				goto err;
 
-			Assert(SizeOfXLogShortPHD <= readOff);
+			Assert(SizeOfXLogShortPHD <= state->readLen);
 
 			/* Check that the continuation on next page looks valid */
 			pageHeader = (XLogPageHeader) state->readBuf;
@@ -425,21 +438,14 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 			/* Append the continuation from this page to the buffer */
 			pageHeaderSize = XLogPageHeaderSize(pageHeader);
 
-			if (readOff < pageHeaderSize)
-				readOff = ReadPageInternal(state, targetPagePtr,
-										   pageHeaderSize);
-
-			Assert(pageHeaderSize <= readOff);
+			Assert (pageHeaderSize <= state->readLen);
 
 			contdata = (char *) state->readBuf + pageHeaderSize;
 			len = XLOG_BLCKSZ - pageHeaderSize;
 			if (pageHeader->xlp_rem_len < len)
 				len = pageHeader->xlp_rem_len;
 
-			if (readOff < pageHeaderSize + len)
-				readOff = ReadPageInternal(state, targetPagePtr,
-										   pageHeaderSize + len);
-
+			Assert (pageHeaderSize + len <= state->readLen);
 			memcpy(buffer, (char *) contdata, len);
 			buffer += len;
 			gotlen += len;
@@ -469,9 +475,16 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	else
 	{
 		/* Wait for the record data to become available */
-		readOff = ReadPageInternal(state, targetPagePtr,
-								   Min(targetRecOff + total_len, XLOG_BLCKSZ));
-		if (readOff < 0)
+		while (XLogNeedData(state, targetPagePtr,
+							Min(targetRecOff + total_len, XLOG_BLCKSZ), true))
+		{
+			if (!state->read_page(state, state->readPagePtr, state->readLen,
+								  state->ReadRecPtr, state->readBuf,
+								  &state->readPageTLI))
+				break;
+		}
+
+		if (!state->page_verified)
 			goto err;
 
 		/* Record does not cross a page boundary */
@@ -505,7 +518,7 @@ err:
 	 * Invalidate the read state. We might read from a different source after
 	 * failure.
 	 */
-	XLogReaderInvalReadState(state);
+	XLogReaderDiscardReadingPage(state);
 
 	if (state->errormsg_buf[0] != '\0')
 		*errormsg = state->errormsg_buf;
@@ -514,120 +527,183 @@ err:
 }
 
 /*
- * Read a single xlog page including at least [pageptr, reqLen] of valid data
- * via the read_page() callback.
+ * Checks that the xlog page loaded in state->readBuf includes at least
+ * [pageptr, reqLen] and that the page is valid. header_inclusive indicates
+ * that reqLen is calculated including the page header length.
  *
- * Returns -1 if the required page cannot be read for some reason; errormsg_buf
- * is set in that case (unless the error occurs in the read_page callback).
+ * Returns false if the buffer already contains the requested data, or if an
+ * error was found. state->page_verified is set to true for the former and
+ * false for the latter.
  *
- * We fetch the page from a reader-local cache if we know we have the required
- * data and if there hasn't been any error since caching the data.
+ * Otherwise returns true and requests that data be loaded into state->readBuf,
+ * as specified by state->readPagePtr and state->readLen. The caller shall fill
+ * the buffer with at least that portion of data, set state->readLen to the
+ * length of the data actually loaded, and then call this function again.
+ *
+ * If header_inclusive is false, corrects reqLen internally by adding the
+ * actual page header length and may request new data from the caller.
  */
-static int
-ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
+static bool
+XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen,
+			 bool header_inclusive)
 {
-	int			readLen;
 	uint32		targetPageOff;
 	XLogSegNo	targetSegNo;
-	XLogPageHeader hdr;
-
-	Assert((pageptr % XLOG_BLCKSZ) == 0);
-
-	XLByteToSeg(pageptr, targetSegNo, state->wal_segment_size);
-	targetPageOff = XLogSegmentOffset(pageptr, state->wal_segment_size);
+	uint32		addLen = 0;
 
 	/* check whether we have all the requested data already */
-	if (targetSegNo == state->readSegNo && targetPageOff == state->readOff &&
-		reqLen <= state->readLen)
-		return state->readLen;
+	if (state->page_verified &&	pageptr == state->readPagePtr)
+	{
+		if (!header_inclusive)
+		{
+			/*
+			 * calculate additional length for page header so that the total
+			 * length doesn't exceed the block size.
+			 */
+			uint32 pageHeaderSize =
+				XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+
+			addLen = pageHeaderSize;
+			if (reqLen + pageHeaderSize <= XLOG_BLCKSZ)
+				addLen = pageHeaderSize;
+			else
+				addLen = XLOG_BLCKSZ - reqLen;
+		}
+
+		if (reqLen + addLen <= state->readLen)
+			return false;
+	}
+
+	/* Haven't loaded any data yet? Then request it. */
+	if (XLogRecPtrIsInvalid(state->readPagePtr) ||
+		state->readPagePtr != pageptr || state->readLen < 0)
+	{
+		state->readPagePtr = pageptr;
+		state->readLen = Max(reqLen, SizeOfXLogShortPHD);
+		state->page_verified = false;
+		return true;
+	}
+
+	if (!state->page_verified)
+	{
+		uint32	pageHeaderSize;
+		uint32	addLen = 0;
+
+		/* just loaded new data so needs to verify page header */
+
+		/* The caller must have loaded at least page header */
+		Assert(state->readLen >= SizeOfXLogShortPHD);
+
+		/*
+		 * We have enough data to check the header length. Recheck the loaded
+		 * length in case this is a long header.
+		 */
+		pageHeaderSize =  XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+
+		/*
+		 * If we have not loaded a page so far, readLen is zero, which is
+		 * shorter than pageHeaderSize here.
+		 */
+		if (state->readLen < pageHeaderSize)
+		{
+			state->readPagePtr = pageptr;
+			state->readLen = pageHeaderSize;
+			return true;
+		}
+
+		/*
+		 * Now that we know we have the full header, validate it.
+		 */
+		if (!XLogReaderValidatePageHeader(state, state->readPagePtr,
+										  (char *) state->readBuf))
+		{
+			/* force reading the page again. */
+			XLogReaderDiscardReadingPage(state);
+
+			return false;
+		}
+
+		state->page_verified = true;
+
+		XLByteToSeg(state->readPagePtr, state->readSegNo,
+					state->wal_segment_size);
+
+		/*
+		 * calculate additional length for page header so that the total
+		 * length doesn't exceed the block size.
+		 */
+		if (!header_inclusive)
+		{
+			addLen = pageHeaderSize;
+			if (reqLen + pageHeaderSize <= XLOG_BLCKSZ)
+				addLen = pageHeaderSize;
+			else
+				addLen = XLOG_BLCKSZ - reqLen;
+
+			Assert(addLen >= 0);
+		}
+
+		/* Usually we have requested data loaded in buffer here. */
+		if (pageptr == state->readPagePtr && reqLen + addLen <= state->readLen)
+			return false;
+
+		/*
+		 * If the page was requested for the first record in the page, the
+		 * previous request was made without counting the page header length.
+		 * Request the same page again with the length now known to be
+		 * needed. Otherwise we don't know the header length of the new page.
+		 */
+		if (pageptr != state->readPagePtr)
+			addLen = 0;
+	}
+
+	/* Data is not in our buffer, make a new load request to the caller. */
+	XLByteToSeg(pageptr, targetSegNo, state->wal_segment_size);
+	targetPageOff = XLogSegmentOffset(pageptr, state->wal_segment_size);
+	Assert((pageptr % XLOG_BLCKSZ) == 0);
+
+	/*
+	 * Every time we ask the caller to load new data of a page, even if we
+	 * looked at a part of it before, we need to do verification on the next
+	 * invocation as the caller might now be rereading data from a different
+	 * source.
+	 */
+	state->page_verified = false;
 
 	/*
-	 * Data is not in our buffer.
-	 *
-	 * Every time we actually read the page, even if we looked at parts of it
-	 * before, we need to do verification as the read_page callback might now
-	 * be rereading data from a different source.
-	 *
 	 * Whenever switching to a new WAL segment, we read the first page of the
 	 * file and validate its header, even if that's not where the target
 	 * record is.  This is so that we can check the additional identification
 	 * info that is present in the first page's "long" header.
+	 * Don't do this if the caller requested the first page in the segment.
 	 */
 	if (targetSegNo != state->readSegNo && targetPageOff != 0)
 	{
-		XLogRecPtr	targetSegmentPtr = pageptr - targetPageOff;
-
-		readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
-								   state->currRecPtr,
-								   state->readBuf, &state->readPageTLI);
-		if (readLen < 0)
-			goto err;
-
-		/* we can be sure to have enough WAL available, we scrolled back */
-		Assert(readLen == XLOG_BLCKSZ);
-
-		if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
-										  state->readBuf))
-			goto err;
+		/*
+		 * Then we'll see that the targetSegNo now matches the readSegNo, and
+		 * will not come back here, but will request the actual target page.
+		 */
+		state->readPagePtr = pageptr - targetPageOff;
+		state->readLen = XLOG_BLCKSZ;
+		return true;
 	}
 
 	/*
-	 * First, read the requested data length, but at least a short page header
-	 * so that we can validate it.
+	 * Request the caller to load the requested page. We need at least a short
+	 * page header so that we can validate it.
 	 */
-	readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
-							   state->currRecPtr,
-							   state->readBuf, &state->readPageTLI);
-	if (readLen < 0)
-		goto err;
-
-	Assert(readLen <= XLOG_BLCKSZ);
-
-	/* Do we have enough data to check the header length? */
-	if (readLen <= SizeOfXLogShortPHD)
-		goto err;
-
-	Assert(readLen >= reqLen);
-
-	hdr = (XLogPageHeader) state->readBuf;
-
-	/* still not enough */
-	if (readLen < XLogPageHeaderSize(hdr))
-	{
-		readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
-								   state->currRecPtr,
-								   state->readBuf, &state->readPageTLI);
-		if (readLen < 0)
-			goto err;
-	}
-
-	/*
-	 * Now that we know we have the full header, validate it.
-	 */
-	if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
-		goto err;
-
-	/* update read state information */
-	state->readSegNo = targetSegNo;
-	state->readOff = targetPageOff;
-	state->readLen = readLen;
-
-	return readLen;
-
-err:
-	XLogReaderInvalReadState(state);
-	return -1;
+	state->readPagePtr = pageptr;
+	state->readLen = Max(reqLen + addLen, SizeOfXLogShortPHD);
+	return true;
 }
 
 /*
- * Invalidate the xlogreader's read state to force a re-read.
+ * Invalidate current reading page buffer
  */
 static void
-XLogReaderInvalReadState(XLogReaderState *state)
+XLogReaderDiscardReadingPage(XLogReaderState *state)
 {
-	state->readSegNo = 0;
-	state->readOff = 0;
-	state->readLen = 0;
+	state->readPagePtr = InvalidXLogRecPtr;
 }
 
 /*
@@ -905,7 +981,6 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 		XLogRecPtr	targetPagePtr;
 		int			targetRecOff;
 		uint32		pageHeaderSize;
-		int			readLen;
 
 		/*
 		 * Compute targetRecOff. It should typically be equal or greater than
@@ -913,7 +988,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 		 * that, except when caller has explicitly specified the offset that
 		 * falls somewhere there or when we are skipping multi-page
 		 * continuation record. It doesn't matter though because
-		 * ReadPageInternal() is prepared to handle that and will read at
+		 * XLogNeedData() is prepared to handle that and will read at
 		 * least short page-header worth of data
 		 */
 		targetRecOff = tmpRecPtr % XLOG_BLCKSZ;
@@ -921,19 +996,24 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 		/* scroll back to page boundary */
 		targetPagePtr = tmpRecPtr - targetRecOff;
 
-		/* Read the page containing the record */
-		readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
-		if (readLen < 0)
+		while(XLogNeedData(state, targetPagePtr, targetRecOff,
+						   targetRecOff != 0))
+		{
+			if (!state->read_page(state, state->readPagePtr, state->readLen,
+								  state->ReadRecPtr, state->readBuf,
+								  &state->readPageTLI))
+				break;
+		}
+
+		if (!state->page_verified)
 			goto err;
 
 		header = (XLogPageHeader) state->readBuf;
 
 		pageHeaderSize = XLogPageHeaderSize(header);
 
-		/* make sure we have enough data for the page header */
-		readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
-		if (readLen < 0)
-			goto err;
+		/* we should have read the page header */
+		Assert (state->readLen >= pageHeaderSize);
 
 		/* skip over potential continuation data */
 		if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
@@ -990,7 +1070,7 @@ out:
 	/* Reset state to what we had before finding the record */
 	state->ReadRecPtr = saved_state.ReadRecPtr;
 	state->EndRecPtr = saved_state.EndRecPtr;
-	XLogReaderInvalReadState(state);
+	XLogReaderDiscardReadingPage(state);
 
 	return found;
 }
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 1fc39333f1..dad9074b9f 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -803,7 +803,7 @@ void
 XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
 {
 	const XLogRecPtr lastReadPage = state->readSegNo *
-	state->wal_segment_size + state->readOff;
+	state->wal_segment_size + state->readLen;
 
 	Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
 	Assert(wantLength <= XLOG_BLCKSZ);
@@ -907,7 +907,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
  * exists for normal backends, so we have to do a check/sleep/repeat style of
  * loop for now.
  */
-int
+bool
 read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 					 int reqLen, XLogRecPtr targetRecPtr, char *cur_page,
 					 TimeLineID *pageTLI)
@@ -1009,7 +1009,8 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	else if (targetPagePtr + reqLen > read_upto)
 	{
 		/* not enough data there */
-		return -1;
+		state->readLen = -1;
+		return false;
 	}
 	else
 	{
@@ -1026,5 +1027,6 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 			 XLOG_BLCKSZ);
 
 	/* number of valid bytes in the buffer */
-	return count;
+	state->readLen = count;
+	return true;
 }
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index d974400d6e..7210a940bd 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -114,12 +114,12 @@ check_permissions(void)
 				 (errmsg("must be superuser or replication role to use replication slots"))));
 }
 
-int
+bool
 logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 							 int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
 {
 	return read_local_xlog_page(state, targetPagePtr, reqLen,
-								targetRecPtr, cur_page, pageTLI);
+						 targetRecPtr, cur_page, pageTLI);
 }
 
 /*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 23870a25a5..cc35e2a04d 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -761,7 +761,7 @@ StartReplication(StartReplicationCmd *cmd)
  * which has to do a plain sleep/busy loop, because the walsender's latch gets
  * set every time WAL is flushed.
  */
-static int
+static bool
 logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 					   XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
 {
@@ -779,7 +779,10 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 
 	/* fail if not (implies we are going to shut down) */
 	if (flushptr < targetPagePtr + reqLen)
-		return -1;
+	{
+		state->readLen = -1;
+		return false;
+	}
 
 	if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
 		count = XLOG_BLCKSZ;	/* more than one block available */
@@ -789,7 +792,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	/* now actually read the data, we know it's there */
 	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
 
-	return count;
+	state->readLen = count;
+	return true;
 }
 
 /*
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 63c3879ead..4df53964e4 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -47,7 +47,7 @@ typedef struct XLogPageReadPrivate
 	int			tliIndex;
 } XLogPageReadPrivate;
 
-static int	SimpleXLogPageRead(XLogReaderState *xlogreader,
+static bool	SimpleXLogPageRead(XLogReaderState *xlogreader,
 							   XLogRecPtr targetPagePtr,
 							   int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
 							   TimeLineID *pageTLI);
@@ -235,7 +235,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 }
 
 /* XLogReader callback function, to read a WAL page */
-static int
+static bool
 SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
 				   int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
 				   TimeLineID *pageTLI)
@@ -290,7 +290,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
 		if (xlogreadfd < 0)
 		{
 			pg_log_error("could not open file \"%s\": %m", xlogfpath);
-			return -1;
+			xlogreader->readLen = -1;
+			return false;
 		}
 	}
 
@@ -303,7 +304,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
 	if (lseek(xlogreadfd, (off_t) targetPageOff, SEEK_SET) < 0)
 	{
 		pg_log_error("could not seek in file \"%s\": %m", xlogfpath);
-		return -1;
+		xlogreader->readLen = -1;
+		return false;
 	}
 
 
@@ -316,13 +318,16 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
 			pg_log_error("could not read file \"%s\": read %d of %zu",
 						 xlogfpath, r, (Size) XLOG_BLCKSZ);
 
-		return -1;
+		xlogreader->readLen = -1;
+		return false;
 	}
 
 	Assert(targetSegNo == xlogreadsegno);
 
 	*pageTLI = targetHistory[private->tliIndex].tli;
-	return XLOG_BLCKSZ;
+
+	xlogreader->readLen = XLOG_BLCKSZ;
+	return true;
 }
 
 /*
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index b95d467805..96d1f36ebc 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -421,7 +421,7 @@ XLogDumpXLogRead(const char *directory, TimeLineID timeline_id,
 /*
  * XLogReader read_page callback
  */
-static int
+static bool
 XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 				 XLogRecPtr targetPtr, char *readBuff, TimeLineID *curFileTLI)
 {
@@ -437,14 +437,16 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 		else
 		{
 			private->endptr_reached = true;
-			return -1;
+			state->readLen = -1;
+			return false;
 		}
 	}
 
 	XLogDumpXLogRead(private->inpath, private->timeline, targetPagePtr,
 					 readBuff, count);
 
-	return count;
+	state->readLen = count;
+	return true;
 }
 
 /*
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 735b1bd2fd..6de7c19a2a 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -34,7 +34,7 @@
 typedef struct XLogReaderState XLogReaderState;
 
 /* Function type definition for the read_page callback */
-typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
+typedef bool (*XLogPageReadCB) (XLogReaderState *xlogreader,
 							   XLogRecPtr targetPagePtr,
 							   int reqLen,
 							   XLogRecPtr targetRecPtr,
@@ -123,6 +123,18 @@ struct XLogReaderState
 	XLogRecPtr	ReadRecPtr;		/* start of last record read */
 	XLogRecPtr	EndRecPtr;		/* end+1 of last record read */
 
+	/* ----------------------------------------
+	 * Communication with page reader
+	 * readBuf is XLOG_BLCKSZ bytes, valid up to at least readLen bytes.
+	 *  ----------------------------------------
+	 */
+	/* variables to communicate with page reader */
+	XLogRecPtr	readPagePtr;	/* page pointer to read */
+	int32		readLen;		/* bytes requested to reader, or actual bytes
+								 * read by reader, which must be larger than
+								 * the request, or -1 on error */
+	TimeLineID	readPageTLI;	/* TLI for data currently in readBuf */
+	char	   *readBuf;		/* buffer to store data */
 
 	/* ----------------------------------------
 	 * Decoded representation of current record
@@ -149,17 +161,9 @@ struct XLogReaderState
 	 * ----------------------------------------
 	 */
 
-	/*
-	 * Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to at least
-	 * readLen bytes)
-	 */
-	char	   *readBuf;
-	uint32		readLen;
-
-	/* last read segment, segment offset, TLI for data currently in readBuf */
+	/* whether the page in readBuf has been verified, and its segment number */
+	bool		page_verified;
 	XLogSegNo	readSegNo;
-	uint32		readOff;
-	TimeLineID	readPageTLI;
 
 	/*
 	 * beginning of prior page read, and its TLI.  Doesn't necessarily
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 4105b59904..0842af9f95 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -47,7 +47,7 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
 extern void FreeFakeRelcacheEntry(Relation fakerel);
 
-extern int	read_local_xlog_page(XLogReaderState *state,
+extern bool	read_local_xlog_page(XLogReaderState *state,
 								 XLogRecPtr targetPagePtr, int reqLen,
 								 XLogRecPtr targetRecPtr, char *cur_page,
 								 TimeLineID *pageTLI);
diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h
index a9c178a9e6..8e52b1f4aa 100644
--- a/src/include/replication/logicalfuncs.h
+++ b/src/include/replication/logicalfuncs.h
@@ -11,7 +11,7 @@
 
 #include "replication/logical.h"
 
-extern int	logical_read_local_xlog_page(XLogReaderState *state,
+extern bool	logical_read_local_xlog_page(XLogReaderState *state,
 										 XLogRecPtr targetPagePtr,
 										 int reqLen, XLogRecPtr targetRecPtr,
 										 char *cur_page, TimeLineID *pageTLI);
-- 
2.16.3

