From 036b1a50ad79ebfde990e178bead9ba03c753d02 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Wed, 17 Apr 2019 14:15:16 +0900
Subject: [PATCH 02/10] Make ReadPageInternal a state machine

This patch set aims to remove the read_page callback from
XLogReaderState. This is done in two steps, and this patch does the
first one: it turns ReadPageInternal, which currently calls the
read_page callback, into a state machine (renamed XLogNeedData) and
moves the callback invocations out to XLogReadRecord and the other
callers of that function.
---
 src/backend/access/transam/xlog.c              |  15 ++-
 src/backend/access/transam/xlogreader.c        | 180 +++++++++++++++----------
 src/backend/access/transam/xlogutils.c         |   8 +-
 src/backend/replication/logical/logicalfuncs.c |   6 +-
 src/backend/replication/walsender.c            |  10 +-
 src/bin/pg_rewind/parsexlog.c                  |  17 ++-
 src/bin/pg_waldump/pg_waldump.c                |   8 +-
 src/include/access/xlogreader.h                |  39 ++++--
 src/include/access/xlogutils.h                 |   2 +-
 src/include/replication/logicalfuncs.h         |   2 +-
 10 files changed, 177 insertions(+), 110 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1c7dd51b9f..d1a29fe5cd 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -885,7 +885,7 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
 static int	XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
 						 int source, bool notfoundOk);
 static int	XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
-static int	XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
+static void	XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
 						 int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
 						 TimeLineID *readTLI);
 static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
@@ -11507,7 +11507,7 @@ CancelBackup(void)
  * XLogPageRead() to try fetching the record from another source, or to
  * sleep and retry.
  */
-static int
+static void
 XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
 			 XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI)
 {
@@ -11566,7 +11566,8 @@ retry:
 			readLen = 0;
 			readSource = 0;
 
-			return -1;
+			xlogreader->readLen = -1;
+			return;
 		}
 	}
 
@@ -11661,7 +11662,8 @@ retry:
 		goto next_record_is_invalid;
 	}
 
-	return readLen;
+	xlogreader->readLen = readLen;
+	return;
 
 next_record_is_invalid:
 	lastSourceFailed = true;
@@ -11675,8 +11677,9 @@ next_record_is_invalid:
 	/* In standby-mode, keep trying */
 	if (StandbyMode)
 		goto retry;
-	else
-		return -1;
+
+	xlogreader->readLen = -1;
+	return;
 }
 
 /*
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index baf04b9f1f..f7499aff5e 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -118,8 +118,8 @@ static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 								  XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
 static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
 							XLogRecPtr recptr);
-static int	ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
-							 int reqLen);
+static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr,
+						 int reqLen);
 static void report_invalid_record(XLogReaderState *state, const char *fmt,...) pg_attribute_printf(2, 3);
 
 static void ResetDecoder(XLogReaderState *state);
@@ -306,7 +306,6 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	uint32		targetRecOff;
 	uint32		pageHeaderSize;
 	bool		gotheader;
-	int			readOff;
 
 	/*
 	 * randAccess indicates whether to verify the previous-record pointer of
@@ -358,15 +357,17 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	 * byte to cover the whole record header, or at least the part of it that
 	 * fits on the same page.
 	 */
-	readOff = ReadPageInternal(state,
-							   targetPagePtr,
-							   Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
-	if (readOff < 0)
+	while (XLogNeedData(state, targetPagePtr,
+						Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)))
+		state->read_page(state, state->loadPagePtr, state->loadLen,
+						 state->currRecPtr, state->readBuf,
+						 &state->readPageTLI);
+	
+	if (state->readLen < 0)
 		goto err;
 
 	/*
-	 * ReadPageInternal always returns at least the page header, so we can
-	 * examine it now.
+	 * We have loaded at least the page header, so we can examine it now.
 	 */
 	pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
 	if (targetRecOff == 0)
@@ -392,8 +393,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 		goto err;
 	}
 
-	/* ReadPageInternal has verified the page header */
-	Assert(pageHeaderSize <= readOff);
+	/* XLogNeedData has verified the page header */
+	Assert(pageHeaderSize <= state->readLen);
 
 	/*
 	 * Read the record length.
@@ -470,14 +471,17 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 			targetPagePtr += XLOG_BLCKSZ;
 
 			/* Wait for the next page to become available */
-			readOff = ReadPageInternal(state, targetPagePtr,
-									   Min(total_len - gotlen + SizeOfXLogShortPHD,
-										   XLOG_BLCKSZ));
+			while (XLogNeedData(state, targetPagePtr,
+								Min(total_len - gotlen + SizeOfXLogShortPHD,
+									XLOG_BLCKSZ)))
+				state->read_page(state, state->loadPagePtr, state->loadLen,
+								 state->currRecPtr, state->readBuf,
+								 &state->readPageTLI);
 
-			if (readOff < 0)
+			if (state->readLen < 0)
 				goto err;
 
-			Assert(SizeOfXLogShortPHD <= readOff);
+			Assert(SizeOfXLogShortPHD <= state->readLen);
 
 			/* Check that the continuation on next page looks valid */
 			pageHeader = (XLogPageHeader) state->readBuf;
@@ -506,20 +510,28 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 			/* Append the continuation from this page to the buffer */
 			pageHeaderSize = XLogPageHeaderSize(pageHeader);
 
-			if (readOff < pageHeaderSize)
-				readOff = ReadPageInternal(state, targetPagePtr,
-										   pageHeaderSize);
+			if (state->readLen < pageHeaderSize)
+			{
+				while (XLogNeedData(state, targetPagePtr, pageHeaderSize))
+					state->read_page(state, state->loadPagePtr, state->loadLen,
+									 state->currRecPtr, state->readBuf,
+									 &state->readPageTLI);
+			}
 
-			Assert(pageHeaderSize <= readOff);
+			Assert(pageHeaderSize <= state->readLen);
 
 			contdata = (char *) state->readBuf + pageHeaderSize;
 			len = XLOG_BLCKSZ - pageHeaderSize;
 			if (pageHeader->xlp_rem_len < len)
 				len = pageHeader->xlp_rem_len;
 
-			if (readOff < pageHeaderSize + len)
-				readOff = ReadPageInternal(state, targetPagePtr,
-										   pageHeaderSize + len);
+			if (state->readLen < pageHeaderSize + len)
+			{
+				if (XLogNeedData(state, targetPagePtr, pageHeaderSize + len))
+					state->read_page(state, state->loadPagePtr, state->loadLen,
+									 state->currRecPtr, state->readBuf,
+									 &state->readPageTLI);
+			}
 
 			memcpy(buffer, (char *) contdata, len);
 			buffer += len;
@@ -550,9 +562,13 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	else
 	{
 		/* Wait for the record data to become available */
-		readOff = ReadPageInternal(state, targetPagePtr,
-								   Min(targetRecOff + total_len, XLOG_BLCKSZ));
-		if (readOff < 0)
+		while (XLogNeedData(state, targetPagePtr,
+							Min(targetRecOff + total_len, XLOG_BLCKSZ)))
+			state->read_page(state, state->loadPagePtr, state->loadLen,
+							 state->currRecPtr, state->readBuf,
+							 &state->readPageTLI);
+
+		if (state->readLen < 0)
 			goto err;
 
 		/* Record does not cross a page boundary */
@@ -595,39 +611,48 @@ err:
 }
 
 /*
- * Read a single xlog page including at least [pageptr, reqLen] of valid data
- * via the read_page() callback.
+ * Checks that the xlog page loaded in state->readBuf includes at least
+ * [pageptr, reqLen] and that the page is valid.
  *
- * Returns -1 if the required page cannot be read for some reason; errormsg_buf
- * is set in that case (unless the error occurs in the read_page callback).
+ * Returns false if the required data is fully loaded, or if the loaded data
+ * is found to be invalid, in which case state->readLen is set to -1.
  *
- * We fetch the page from a reader-local cache if we know we have the required
- * data and if there hasn't been any error since caching the data.
+ * Otherwise, returns true and requests data using state->loadPagePtr and
+ * state->loadLen. The caller should load the region to state->readBuf, set
+ * state->readLen accordingly, and call this function again.
+ *
+ * Note: This function is not reentrant. The state is maintained internally in
+ * the function. DO NOT ereport(ERROR)-out from inside of this function.
  */
-static int
-ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
+static bool
+XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 {
-	int			readLen;
-	uint32		targetPageOff;
-	XLogSegNo	targetSegNo;
-	XLogPageHeader hdr;
+	/*
+	 * This function is a state machine that can exit and reenter at any place
+	 * marked as XLR_LEAVE. All internal state is preserved through multiple
+	 * calls.
+	 */
+	static uint32		targetPageOff;
+	static XLogSegNo	targetSegNo;
+	static XLogPageHeader hdr;
 
-	Assert((pageptr % XLOG_BLCKSZ) == 0);
+	XLR_SWITCH (XLND_STATE_INIT);
 
 	XLByteToSeg(pageptr, targetSegNo, state->wal_segment_size);
 	targetPageOff = XLogSegmentOffset(pageptr, state->wal_segment_size);
+	Assert((pageptr % XLOG_BLCKSZ) == 0);
 
 	/* check whether we have all the requested data already */
 	if (targetSegNo == state->readSegNo && targetPageOff == state->readOff &&
 		reqLen <= state->readLen)
-		return state->readLen;
+		XLR_RETURN(false);
 
 	/*
 	 * Data is not in our buffer.
 	 *
 	 * Every time we actually read the page, even if we looked at parts of it
-	 * before, we need to do verification as the read_page callback might now
-	 * be rereading data from a different source.
+	 * before, we need to do verification as the caller might now be rereading
+	 * data from a different source.
 	 *
 	 * Whenever switching to a new WAL segment, we read the first page of the
 	 * file and validate its header, even if that's not where the target
@@ -636,18 +661,17 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	 */
 	if (targetSegNo != state->readSegNo && targetPageOff != 0)
 	{
-		XLogRecPtr	targetSegmentPtr = pageptr - targetPageOff;
+		state->loadPagePtr = pageptr - targetPageOff;
+		state->loadLen = XLOG_BLCKSZ;
+		XLR_LEAVE(XLND_STATE_SEGHEADER, true);
 
-		readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
-								   state->currRecPtr,
-								   state->readBuf, &state->readPageTLI);
-		if (readLen < 0)
+		if (state->readLen < 0)
 			goto err;
 
 		/* we can be sure to have enough WAL available, we scrolled back */
-		Assert(readLen == XLOG_BLCKSZ);
+		Assert(state->readLen == XLOG_BLCKSZ);
 
-		if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
+		if (!XLogReaderValidatePageHeader(state, state->loadPagePtr,
 										  state->readBuf))
 			goto err;
 	}
@@ -656,48 +680,53 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	 * First, read the requested data length, but at least a short page header
 	 * so that we can validate it.
 	 */
-	readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
-							   state->currRecPtr,
-							   state->readBuf, &state->readPageTLI);
-	if (readLen < 0)
+	state->loadPagePtr = pageptr;
+	state->loadLen = Max(reqLen, SizeOfXLogShortPHD);
+	XLR_LEAVE(XLND_STATE_PAGEHEADER, true);
+
+	if (state->readLen < 0)
 		goto err;
 
-	Assert(readLen <= XLOG_BLCKSZ);
+	Assert(state->readLen <= XLOG_BLCKSZ);
 
 	/* Do we have enough data to check the header length? */
-	if (readLen <= SizeOfXLogShortPHD)
+	if (state->readLen <= SizeOfXLogShortPHD)
 		goto err;
 
-	Assert(readLen >= reqLen);
+	Assert(state->readLen >= state->loadLen);
 
 	hdr = (XLogPageHeader) state->readBuf;
 
 	/* still not enough */
-	if (readLen < XLogPageHeaderSize(hdr))
+	if (state->readLen < XLogPageHeaderSize(hdr))
 	{
-		readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
-								   state->currRecPtr,
-								   state->readBuf, &state->readPageTLI);
-		if (readLen < 0)
+		state->loadPagePtr = pageptr;
+		state->loadLen = XLogPageHeaderSize(hdr);
+		XLR_LEAVE(XLND_STATE_PAGELONGHEADER, true);
+
+		if (state->readLen < 0)
 			goto err;
 	}
 
+	XLR_SWITCH_END();
+
 	/*
 	 * Now that we know we have the full header, validate it.
 	 */
-	if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
-		goto err;
+	if (!XLogReaderValidatePageHeader(state, pageptr, (char *) state->readBuf))
+			goto err;
 
 	/* update read state information */
 	state->readSegNo = targetSegNo;
 	state->readOff = targetPageOff;
-	state->readLen = readLen;
 
-	return readLen;
+	XLR_RETURN(false);
 
 err:
 	XLogReaderInvalReadState(state);
-	return -1;
+	state->readLen = -1;
+
+	XLR_RETURN(false);
 }
 
 /*
@@ -996,7 +1025,6 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 		XLogRecPtr	targetPagePtr;
 		int			targetRecOff;
 		uint32		pageHeaderSize;
-		int			readLen;
 
 		/*
 		 * Compute targetRecOff. It should typically be equal or greater than
@@ -1004,7 +1032,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 		 * that, except when caller has explicitly specified the offset that
 		 * falls somewhere there or when we are skipping multi-page
 		 * continuation record. It doesn't matter though because
-		 * ReadPageInternal() is prepared to handle that and will read at
+		 * XLogNeedData() is prepared to handle that and will request at
 		 * least short page-header worth of data
 		 */
 		targetRecOff = tmpRecPtr % XLOG_BLCKSZ;
@@ -1013,8 +1041,12 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 		targetPagePtr = tmpRecPtr - targetRecOff;
 
 		/* Read the page containing the record */
-		readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
-		if (readLen < 0)
+		while(XLogNeedData(state, targetPagePtr, targetRecOff))
+			state->read_page(state, state->loadPagePtr, state->loadLen,
+							 state->currRecPtr, state->readBuf,
+							 &state->readPageTLI);
+
+		if (state->readLen < 0)
 			goto err;
 
 		header = (XLogPageHeader) state->readBuf;
@@ -1022,8 +1054,12 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 		pageHeaderSize = XLogPageHeaderSize(header);
 
 		/* make sure we have enough data for the page header */
-		readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
-		if (readLen < 0)
+		while (XLogNeedData(state, targetPagePtr, pageHeaderSize))
+			state->read_page(state, state->loadPagePtr, state->loadLen,
+							 state->currRecPtr, state->readBuf,
+							 &state->readPageTLI);
+
+		if (state->readLen < 0)
 			goto err;
 
 		/* skip over potential continuation data */
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 10a663bae6..c853e1f0e3 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -907,7 +907,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
  * exists for normal backends, so we have to do a check/sleep/repeat style of
  * loop for now.
  */
-int
+void
 read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 					 int reqLen, XLogRecPtr targetRecPtr, char *cur_page,
 					 TimeLineID *pageTLI)
@@ -1009,7 +1009,8 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	else if (targetPagePtr + reqLen > read_upto)
 	{
 		/* not enough data there */
-		return -1;
+		state->readLen = -1;
+		return;
 	}
 	else
 	{
@@ -1026,5 +1027,6 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 			 XLOG_BLCKSZ);
 
 	/* number of valid bytes in the buffer */
-	return count;
+	state->readLen = count;
+	return;
 }
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index d974400d6e..16c6095179 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -114,12 +114,12 @@ check_permissions(void)
 				 (errmsg("must be superuser or replication role to use replication slots"))));
 }
 
-int
+void
 logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 							 int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
 {
-	return read_local_xlog_page(state, targetPagePtr, reqLen,
-								targetRecPtr, cur_page, pageTLI);
+	read_local_xlog_page(state, targetPagePtr, reqLen,
+						 targetRecPtr, cur_page, pageTLI);
 }
 
 /*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3f31368022..082c98b2ec 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -761,7 +761,7 @@ StartReplication(StartReplicationCmd *cmd)
  * which has to do a plain sleep/busy loop, because the walsender's latch gets
  * set every time WAL is flushed.
  */
-static int
+static void
 logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 					   XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
 {
@@ -779,7 +779,10 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 
 	/* fail if not (implies we are going to shut down) */
 	if (flushptr < targetPagePtr + reqLen)
-		return -1;
+	{
+		state->readLen = -1;
+		return;
+	}
 
 	if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
 		count = XLOG_BLCKSZ;	/* more than one block available */
@@ -789,7 +792,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	/* now actually read the data, we know it's there */
 	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
 
-	return count;
+	state->readLen = count;
+	return;
 }
 
 /*
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 287af60c4e..0bfbea8f09 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -47,7 +47,7 @@ typedef struct XLogPageReadPrivate
 	int			tliIndex;
 } XLogPageReadPrivate;
 
-static int	SimpleXLogPageRead(XLogReaderState *xlogreader,
+static void	SimpleXLogPageRead(XLogReaderState *xlogreader,
 							   XLogRecPtr targetPagePtr,
 							   int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
 							   TimeLineID *pageTLI);
@@ -236,7 +236,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 }
 
 /* XLogreader callback function, to read a WAL page */
-static int
+static void
 SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
 				   int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
 				   TimeLineID *pageTLI)
@@ -291,7 +291,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
 		if (xlogreadfd < 0)
 		{
 			pg_log_error("could not open file \"%s\": %m", xlogfpath);
-			return -1;
+			xlogreader->readLen = -1;
+			return;
 		}
 	}
 
@@ -304,7 +305,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
 	if (lseek(xlogreadfd, (off_t) targetPageOff, SEEK_SET) < 0)
 	{
 		pg_log_error("could not seek in file \"%s\": %m", xlogfpath);
-		return -1;
+		xlogreader->readLen = -1;
+		return;
 	}
 
 
@@ -317,13 +319,16 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
 			pg_log_error("could not read file \"%s\": read %d of %zu",
 						 xlogfpath, r, (Size) XLOG_BLCKSZ);
 
-		return -1;
+		xlogreader->readLen = -1;
+		return;
 	}
 
 	Assert(targetSegNo == xlogreadsegno);
 
 	*pageTLI = targetHistory[private->tliIndex].tli;
-	return XLOG_BLCKSZ;
+
+	xlogreader->readLen = XLOG_BLCKSZ;
+	return;
 }
 
 /*
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index c40014d483..c748820a5f 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -421,7 +421,7 @@ XLogDumpXLogRead(const char *directory, TimeLineID timeline_id,
 /*
  * XLogReader read_page callback
  */
-static int
+static void
 XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 				 XLogRecPtr targetPtr, char *readBuff, TimeLineID *curFileTLI)
 {
@@ -437,14 +437,16 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 		else
 		{
 			private->endptr_reached = true;
-			return -1;
+			state->readLen = -1;
+			return;
 		}
 	}
 
 	XLogDumpXLogRead(private->inpath, private->timeline, targetPagePtr,
 					 readBuff, count);
 
-	return count;
+	state->readLen = count;
+	return;
 }
 
 /*
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 04228e2a87..d3b3e4b7ba 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -30,7 +30,7 @@
 typedef struct XLogReaderState XLogReaderState;
 
 /* Function type definition for the read_page callback */
-typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
+typedef void (*XLogPageReadCB) (XLogReaderState *xlogreader,
 							   XLogRecPtr targetPagePtr,
 							   int reqLen,
 							   XLogRecPtr targetRecPtr,
@@ -66,6 +66,15 @@ typedef struct
 	uint16		data_bufsz;
 } DecodedBkpBlock;
 
+/* internal state of XLogNeedData() */
+typedef enum xlnd_stateid
+{
+	XLND_STATE_INIT,
+	XLND_STATE_SEGHEADER,
+	XLND_STATE_PAGEHEADER,
+	XLND_STATE_PAGELONGHEADER
+} xlnd_stateid;
+
 struct XLogReaderState
 {
 	/* ----------------------------------------
@@ -120,6 +129,22 @@ struct XLogReaderState
 	XLogRecPtr	EndRecPtr;		/* end+1 of last record read */
 
 
+	/* ----------------------------------------
+	 * Communication with page reader
+	 * readBuf is XLOG_BLCKSZ bytes, valid up to at least readLen bytes.
+	 * ----------------------------------------
+	 */
+	/* parameters to page reader */
+	XLogRecPtr	loadPagePtr;	/* Pointer to the page  */
+	int			loadLen;		/* wanted length in bytes */
+	char	   *readBuf;		/* buffer to store data */
+	XLogRecPtr	currRecPtr;		/* beginning of the WAL record being read */
+
+	/* return from page reader */
+	int32		readLen;		/* bytes actually read, must be at least
+								 * loadLen. -1 on error. */
+	TimeLineID	readPageTLI;	/* TLI for data currently in readBuf */
+
 	/* ----------------------------------------
 	 * Decoded representation of current record
 	 *
@@ -145,17 +170,9 @@ struct XLogReaderState
 	 * ----------------------------------------
 	 */
 
-	/*
-	 * Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to at least
-	 * readLen bytes)
-	 */
-	char	   *readBuf;
-	uint32		readLen;
-
-	/* last read segment, segment offset, TLI for data currently in readBuf */
+	/* last read segment and segment offset for data currently in readBuf */
 	XLogSegNo	readSegNo;
 	uint32		readOff;
-	TimeLineID	readPageTLI;
 
 	/*
 	 * beginning of prior page read, and its TLI.  Doesn't necessarily
@@ -164,8 +181,6 @@ struct XLogReaderState
 	XLogRecPtr	latestPagePtr;
 	TimeLineID	latestPageTLI;
 
-	/* beginning of the WAL record being read. */
-	XLogRecPtr	currRecPtr;
 	/* timeline to read it from, 0 if a lookup is required */
 	TimeLineID	currTLI;
 
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 4105b59904..be0c79d18c 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -47,7 +47,7 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
 extern void FreeFakeRelcacheEntry(Relation fakerel);
 
-extern int	read_local_xlog_page(XLogReaderState *state,
+extern void	read_local_xlog_page(XLogReaderState *state,
 								 XLogRecPtr targetPagePtr, int reqLen,
 								 XLogRecPtr targetRecPtr, char *cur_page,
 								 TimeLineID *pageTLI);
diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h
index a9c178a9e6..b0306c54ab 100644
--- a/src/include/replication/logicalfuncs.h
+++ b/src/include/replication/logicalfuncs.h
@@ -11,7 +11,7 @@
 
 #include "replication/logical.h"
 
-extern int	logical_read_local_xlog_page(XLogReaderState *state,
+extern void	logical_read_local_xlog_page(XLogReaderState *state,
 										 XLogRecPtr targetPagePtr,
 										 int reqLen, XLogRecPtr targetRecPtr,
 										 char *cur_page, TimeLineID *pageTLI);
-- 
2.16.3

