From 1998e14fbad74aa2deb4c4bead40dd019b5a6706 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Tue, 10 Sep 2019 12:58:27 +0900
Subject: [PATCH v6 2/4] Move page-reader out of XLogReadRecord

This is the second step of removing callbacks from WAL record reader.
Since it is essential to take in additional data while reading a
record, the function has to ask the caller for new data while keeping
working state. Thus the function is turned into a state machine.
---
 src/backend/access/transam/twophase.c          |  11 +-
 src/backend/access/transam/xlog.c              |  50 +-
 src/backend/access/transam/xlogreader.c        | 708 +++++++++++++++----------
 src/backend/access/transam/xlogutils.c         |  14 +-
 src/backend/replication/logical/logical.c      |  17 +-
 src/backend/replication/logical/logicalfuncs.c |  14 +-
 src/backend/replication/slotfuncs.c            |   8 +-
 src/backend/replication/walsender.c            |  14 +-
 src/bin/pg_rewind/parsexlog.c                  |  83 ++-
 src/bin/pg_waldump/pg_waldump.c                |  24 +-
 src/include/access/xlogreader.h                |  90 ++--
 src/include/access/xlogutils.h                 |   5 +-
 src/include/replication/logical.h              |   9 +-
 src/include/replication/logicalfuncs.h         |   5 +-
 14 files changed, 617 insertions(+), 435 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 477709bbc2..a736504d62 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1386,15 +1386,20 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
 	XLogReaderState *xlogreader;
 	char	   *errormsg;
 
-	xlogreader = XLogReaderAllocate(wal_segment_size, &read_local_xlog_page,
-									NULL);
+	xlogreader = XLogReaderAllocate(wal_segment_size);
 	if (!xlogreader)
 		ereport(ERROR,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
 				 errmsg("out of memory"),
 				 errdetail("Failed while allocating a WAL reading processor.")));
 
-	record = XLogReadRecord(xlogreader, lsn, &errormsg);
+	while (XLogReadRecord(xlogreader, lsn, &record, &errormsg) ==
+		   XLREAD_NEED_DATA)
+	{
+		if (!read_local_xlog_page(xlogreader))
+			break;
+	}
+
 	if (record == NULL)
 		ereport(ERROR,
 				(errcode_for_file_access(),
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index d7f899e738..34ab4ea359 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -803,13 +803,6 @@ static XLogSource readSource = 0;	/* XLOG_FROM_* code */
 static XLogSource currentSource = 0;	/* XLOG_FROM_* code */
 static bool lastSourceFailed = false;
 
-typedef struct XLogPageReadPrivate
-{
-	int			emode;
-	bool		fetching_ckpt;	/* are we fetching a checkpoint record? */
-	bool		randAccess;
-} XLogPageReadPrivate;
-
 /*
  * These variables track when we last obtained some WAL data to process,
  * and where we got it from.  (XLogReceiptSource is initially the same as
@@ -884,9 +877,8 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
 static int	XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
 						 int source, bool notfoundOk);
 static int	XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
-static bool	XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
-						 int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
-						 TimeLineID *readTLI);
+static bool XLogPageRead(XLogReaderState *xlogreader,
+						 bool fetching_ckpt, int emode, bool randAccess);
 static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 										bool fetching_ckpt, XLogRecPtr tliRecPtr);
 static int	emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
@@ -1195,7 +1187,7 @@ XLogInsertRecord(XLogRecData *rdata,
 			appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len);
 
 		if (!debug_reader)
-			debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL);
+			debug_reader = XLogReaderAllocate(wal_segment_size);
 
 		if (!debug_reader)
 		{
@@ -4247,11 +4239,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
 		   bool fetching_ckpt)
 {
 	XLogRecord *record;
-	XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
-
-	private->fetching_ckpt = fetching_ckpt;
-	private->emode = emode;
-	private->randAccess = (RecPtr != InvalidXLogRecPtr);
+	bool		randAccess = (RecPtr != InvalidXLogRecPtr);
 
 	/* This is the first attempt to read this page. */
 	lastSourceFailed = false;
@@ -4259,8 +4247,15 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
 	for (;;)
 	{
 		char	   *errormsg;
+		XLogReadRecordResult result;
+
+		while ((result = XLogReadRecord(xlogreader, RecPtr, &record, &errormsg))
+			   == XLREAD_NEED_DATA)
+		{
+			if (!XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess))
+				break;
+		}
 
-		record = XLogReadRecord(xlogreader, RecPtr, &errormsg);
 		ReadRecPtr = xlogreader->ReadRecPtr;
 		EndRecPtr = xlogreader->EndRecPtr;
 		if (record == NULL)
@@ -6210,7 +6205,6 @@ StartupXLOG(void)
 	bool		backupFromStandby = false;
 	DBState		dbstate_at_startup;
 	XLogReaderState *xlogreader;
-	XLogPageReadPrivate private;
 	bool		fast_promoted = false;
 	struct stat st;
 
@@ -6351,8 +6345,7 @@ StartupXLOG(void)
 		OwnLatch(&XLogCtl->recoveryWakeupLatch);
 
 	/* Set up XLOG reader facility */
-	MemSet(&private, 0, sizeof(XLogPageReadPrivate));
-	xlogreader = XLogReaderAllocate(wal_segment_size, &XLogPageRead, &private);
+	xlogreader = XLogReaderAllocate(wal_segment_size);
 	if (!xlogreader)
 		ereport(ERROR,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -11521,12 +11514,14 @@ CancelBackup(void)
  * sleep and retry.
  */
 static bool
-XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
-			 XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI)
+XLogPageRead(XLogReaderState *xlogreader,
+			 bool fetching_ckpt, int emode, bool randAccess)
 {
-	XLogPageReadPrivate *private =
-	(XLogPageReadPrivate *) xlogreader->private_data;
-	int			emode = private->emode;
+	char *readBuf				= xlogreader->readBuf;
+	XLogRecPtr targetPagePtr	= xlogreader->readPagePtr;
+	int reqLen					= xlogreader->readLen;
+	XLogRecPtr targetRecPtr		= xlogreader->ReadRecPtr;
+	TimeLineID *readTLI			= &xlogreader->readPageTLI;
 	uint32		targetPageOff;
 	XLogSegNo	targetSegNo PG_USED_FOR_ASSERTS_ONLY;
 	int			r;
@@ -11569,8 +11564,8 @@ retry:
 		 receivedUpto < targetPagePtr + reqLen))
 	{
 		if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
-										 private->randAccess,
-										 private->fetching_ckpt,
+										 randAccess,
+										 fetching_ckpt,
 										 targetRecPtr))
 		{
 			if (readFile >= 0)
@@ -11675,6 +11670,7 @@ retry:
 		goto next_record_is_invalid;
 	}
 
+	Assert(xlogreader->readPagePtr == targetPagePtr);
 	xlogreader->readLen = readLen;
 	return true;
 
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 12a52159a9..e00fa270ae 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -38,7 +38,7 @@ static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr,
 						 int reqLen, bool header_inclusive);
 static void XLogReaderDiscardReadingPage(XLogReaderState *state);
 static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
-								  XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
+								  XLogRecPtr PrevRecPtr, XLogRecord *record);
 static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
 							XLogRecPtr recptr);
 static void ResetDecoder(XLogReaderState *state);
@@ -68,8 +68,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
  * Returns NULL if the xlogreader couldn't be allocated.
  */
 XLogReaderState *
-XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc,
-				   void *private_data)
+XLogReaderAllocate(int wal_segment_size)
 {
 	XLogReaderState *state;
 
@@ -97,11 +96,6 @@ XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc,
 	}
 
 	state->wal_segment_size = wal_segment_size;
-	state->read_page = pagereadfunc;
-	/* system_identifier initialized to zeroes above */
-	state->private_data = private_data;
-	/* ReadRecPtr and EndRecPtr initialized to zeroes above */
-	/* readSegNo, readLen, readPageTLI initialized to zeroes above */
 	state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
 										  MCXT_ALLOC_NO_OOM);
 	if (!state->errormsg_buf)
@@ -201,321 +195,464 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
 /*
  * Attempt to read an XLOG record.
  *
- * If RecPtr is valid, try to read a record at that position.  Otherwise
- * try to read a record just after the last one previously read.
+ * When starting a new record, a valid RecPtr starts reading the record at
+ * that position; an invalid RecPtr starts reading the record just after the
+ * last one previously read.  In any internal state, passing a valid RecPtr
+ * that differs from the current ReadRecPtr restarts reading at that position.
+ * This function may return XLREAD_NEED_DATA several times before returning a
+ * result record.  Each time, the caller shall read in the requested data and
+ * then call this function again with the same parameters.
  *
- * If the read_page callback fails to read the requested data, NULL is
- * returned.  The callback is expected to have reported the error; errormsg
- * is set to NULL.
+ * When a record is successfully read, returns XLREAD_SUCCESS with result
+ * record being stored in *record. Otherwise *record is NULL.
  *
- * If the reading fails for some other reason, NULL is also returned, and
- * *errormsg is set to a string with details of the failure.
+ * Returns XLREAD_NEED_DATA if more data is needed to finish reading the
+ * current record.  In that case, state->readPagePtr and state->readLen inform
+ * the desired position and minimum length of data needed. The caller shall
+ * read in the requested data and set state->readBuf to point to a buffer
+ * containing it. The caller must also set state->readPageTLI and
+ * state->readLen to indicate the timeline that it was read from, and the
+ * length of data that is now available (which must be >= given readLen),
+ * respectively.
  *
- * The returned pointer (or *errormsg) points to an internal buffer that's
- * valid until the next call to XLogReadRecord.
- */
-XLogRecord *
-XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
-{
-	XLogRecord *record;
-	XLogRecPtr	targetPagePtr;
-	bool		randAccess;
-	uint32		len,
-				total_len;
-	uint32		targetRecOff;
-	uint32		pageHeaderSize;
-	bool		gotheader;
+ * If invalid data is encountered, returns XLREAD_FAIL with *record being set to
+ * NULL. *errormsg is set to a string with details of the failure.
+ * The returned pointer (or *errormsg) points to an internal buffer that's valid
+ * until the next call to XLogReadRecord.
+ *
+ *
+ * This function runs a state machine consisting of the following states.
+ *
+ * XLREAD_NEXT_RECORD :
+ *    The initial state, if called with valid RecPtr, try to read a record at
+ *    that position.  If invalid RecPtr is given try to read a record just after
+ *    the last one previously read.
+ *    This state ends after setting ReadRecPtr. Then goes to XLREAD_TOT_LEN.
+ *
+ * XLREAD_TOT_LEN:
+ *    Examining record header. Ends after reading record total
+ *    length. recordRemainLen and recordGotLen are initialized.
+ *
+ * XLREAD_FIRST_FRAGMENT:
+ *    Reading the first fragment. Ends with finishing reading a single
+ *    record. Goes to XLREAD_NEXT_RECORD if that's all or
+ *    XLREAD_CONTINUATION if we have continuation.
 
-	/*
-	 * randAccess indicates whether to verify the previous-record pointer of
-	 * the record we're reading.  We only do this if we're reading
-	 * sequentially, which is what we initially assume.
-	 */
-	randAccess = false;
+ * XLREAD_CONTINUATION:
+ *    Reading continuation of record. Ends with finishing the whole record then
+ *    goes to XLREAD_NEXT_RECORD. During this state, recordRemainLen indicates
+ *    how much is left and readRecordBuf holds the partially assembled
+ *    record. recordContRecPtr points to the beginning of the next page where
+ *    to continue.
+ *
+ * If invalid data is found in any state, the state machine stays in the
+ * current state. This allows the caller to continue reading a record while
+ * switching among different sources during streaming replication.
+ */
+XLogReadRecordResult
+XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, XLogRecord **record,
+			   char **errormsg)
+{
+	XLogRecord *prec;
+
+	*record = NULL;
 
 	/* reset error state */
 	*errormsg = NULL;
 	state->errormsg_buf[0] = '\0';
 
-	ResetDecoder(state);
-
-	if (RecPtr == InvalidXLogRecPtr)
-	{
-		/* No explicit start point; read the record after the one we just read */
-		RecPtr = state->EndRecPtr;
-
-		if (state->ReadRecPtr == InvalidXLogRecPtr)
-			randAccess = true;
-
-		/*
-		 * RecPtr is pointing to end+1 of the previous WAL record.  If we're
-		 * at a page boundary, no more records can fit on the current page. We
-		 * must skip over the page header, but we can't do that until we've
-		 * read in the page, since the header size is variable.
-		 */
-	}
-	else
-	{
-		/*
-		 * Caller supplied a position to start at.
-		 *
-		 * In this case, the passed-in record pointer should already be
-		 * pointing to a valid record starting position.
-		 */
-		Assert(XRecOffIsValid(RecPtr));
-		randAccess = true;
-	}
-
-	state->currRecPtr = RecPtr;
-
-	targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
-	targetRecOff = RecPtr % XLOG_BLCKSZ;
-
 	/*
-	 * Read the page containing the record into state->readBuf. Request enough
-	 * byte to cover the whole record header, or at least the part of it that
-	 * fits on the same page.
+	 * Reset to the initial state anytime the caller requested new record.
 	 */
-	while (XLogNeedData(state, targetPagePtr,
-						Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ),
-						targetRecOff != 0))
+	if (RecPtr != InvalidXLogRecPtr && RecPtr != state->ReadRecPtr)
+		state->readRecordState = XLREAD_NEXT_RECORD;
+
+	switch (state->readRecordState)
 	{
-		if (!state->read_page(state, state->readPagePtr, state->readLen,
-							  RecPtr, state->readBuf,
-							  &state->readPageTLI))
-			break;
-	}
+		case XLREAD_NEXT_RECORD:
+			ResetDecoder(state);
 
-	if (!state->page_verified)
-		goto err;
-
-	/*
-	 * We have loaded at least the page header, so we can examine it now.
-	 */
-	pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
-	if (targetRecOff == 0)
-	{
-		/*
-		 * At page start, so skip over page header.
-		 */
-		RecPtr += pageHeaderSize;
-		targetRecOff = pageHeaderSize;
-	}
-	else if (targetRecOff < pageHeaderSize)
-	{
-		report_invalid_record(state, "invalid record offset at %X/%X",
-							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
-		goto err;
-	}
-
-	if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
-		targetRecOff == pageHeaderSize)
-	{
-		report_invalid_record(state, "contrecord is requested by %X/%X",
-							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
-		goto err;
-	}
-
-	/* XLogNeedData has verified the page header */
-	Assert(pageHeaderSize <= state->readLen);
-
-	/*
-	 * Read the record length.
-	 *
-	 * NB: Even though we use an XLogRecord pointer here, the whole record
-	 * header might not fit on this page. xl_tot_len is the first field of the
-	 * struct, so it must be on this page (the records are MAXALIGNed), but we
-	 * cannot access any other fields until we've verified that we got the
-	 * whole header.
-	 */
-	record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
-	total_len = record->xl_tot_len;
-
-	/*
-	 * If the whole record header is on this page, validate it immediately.
-	 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
-	 * rest of the header after reading it from the next page.  The xl_tot_len
-	 * check is necessary here to ensure that we enter the "Need to reassemble
-	 * record" code path below; otherwise we might fail to apply
-	 * ValidXLogRecordHeader at all.
-	 */
-	if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
-	{
-		if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
-								   randAccess))
-			goto err;
-		gotheader = true;
-	}
-	else
-	{
-		/* XXX: more validation should be done here */
-		if (total_len < SizeOfXLogRecord)
-		{
-			report_invalid_record(state,
-								  "invalid record length at %X/%X: wanted %u, got %u",
-								  (uint32) (RecPtr >> 32), (uint32) RecPtr,
-								  (uint32) SizeOfXLogRecord, total_len);
-			goto err;
-		}
-		gotheader = false;
-	}
-
-	len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
-	if (total_len > len)
-	{
-		/* Need to reassemble record */
-		char	   *contdata;
-		XLogPageHeader pageHeader;
-		char	   *buffer;
-		uint32		gotlen;
-
-		/*
-		 * Enlarge readRecordBuf as needed.
-		 */
-		if (total_len > state->readRecordBufSize &&
-			!allocate_recordbuf(state, total_len))
-		{
-			/* We treat this as a "bogus data" condition */
-			report_invalid_record(state, "record length %u at %X/%X too long",
-								  total_len,
-								  (uint32) (RecPtr >> 32), (uint32) RecPtr);
-			goto err;
-		}
-
-		/* Copy the first fragment of the record from the first page. */
-		memcpy(state->readRecordBuf,
-			   state->readBuf + RecPtr % XLOG_BLCKSZ, len);
-		buffer = state->readRecordBuf + len;
-		gotlen = len;
-
-		do
-		{
-			int rest_len = total_len - gotlen;
-
-			/* Calculate pointer to beginning of next page */
-			targetPagePtr += XLOG_BLCKSZ;
-
-			/* Wait for the next page to become available */
-			while (XLogNeedData(state, targetPagePtr,
-								Min(rest_len, XLOG_BLCKSZ),
-								false))
+			if (RecPtr != InvalidXLogRecPtr)
 			{
-				if (!state->read_page(state, state->readPagePtr, state->readLen,
-									  state->ReadRecPtr, state->readBuf,
-									  &state->readPageTLI))
-					break;
+				/*
+				 * Caller supplied a position to start at.
+				 *
+				 * In this case, the passed-in record pointer should already be
+				 * pointing to a valid record starting position.
+				 */
+				state->ReadRecPtr = RecPtr;
+
+				/*
+				 * We cannot verify the previous-record pointer when we're
+				 * seeking to a particular record. Reset PrevRecPtr so that we
+				 * won't try doing that.
+				 */
+				state->PrevRecPtr = InvalidXLogRecPtr;
+				state->EndRecPtr = InvalidXLogRecPtr; 		/* to be tidy */
+			}
+			else
+			{
+				/*
+				 * Otherwise, read the record after the one we just read. (Or
+				 * the first record, if this is the first call. In that case,
+				 * EndRecPtr was set to the desired starting point above.)
+				 *
+				 * EndRecPtr is pointing to end+1 of the previous WAL record.
+				 * If we're at a page boundary, no more records can fit on the
+				 * current page. We must skip over the page header on the next
+				 * page, but we can't do that until we've read in the page,
+				 * since the header size is variable.
+				 */
+				state->PrevRecPtr = state->ReadRecPtr;
+				state->ReadRecPtr = state->EndRecPtr;
 			}
 
+			state->record_verified = false;
+			state->readRecordState = XLREAD_TOT_LEN;
+			/* fall through */
+
+		case XLREAD_TOT_LEN:
+		{
+			uint32		total_len;
+			uint32		pageHeaderSize;
+			XLogRecPtr	targetPagePtr;
+			uint32		targetRecOff;
+			XLogPageHeader pageHeader;
+
+			targetPagePtr =
+				state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ);
+			targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ;
+
+			/*
+			 * Check if we have enough data. For the first record in the page,
+			 * the requesting length doesn't contain page header.
+			 */
+			if (XLogNeedData(state, targetPagePtr,
+							 Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ),
+							 targetRecOff != 0))
+				return XLREAD_NEED_DATA;
+
+			/* error out if caller supplied bogus page */
 			if (!state->page_verified)
 				goto err;
 
-			Assert(SizeOfXLogShortPHD <= state->readLen);
-
-			/* Check that the continuation on next page looks valid */
-			pageHeader = (XLogPageHeader) state->readBuf;
-			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
+			/* examine page header now. */
+			pageHeaderSize =
+				XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+			if (targetRecOff == 0)
 			{
-				report_invalid_record(state,
-									  "there is no contrecord flag at %X/%X",
-									  (uint32) (RecPtr >> 32), (uint32) RecPtr);
+				/* At page start, so skip over page header. */
+				state->ReadRecPtr += pageHeaderSize;
+				targetRecOff = pageHeaderSize;
+			}
+			else if (targetRecOff < pageHeaderSize)
+			{
+				report_invalid_record(state, "invalid record offset at %X/%X",
+									  (uint32) (state->ReadRecPtr >> 32),
+									  (uint32) state->ReadRecPtr);
 				goto err;
 			}
 
+			pageHeader = (XLogPageHeader) state->readBuf;
+			if ((pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
+				targetRecOff == pageHeaderSize)
+			{
+				report_invalid_record(state, "contrecord is requested by %X/%X",
+									  (uint32) (state->ReadRecPtr >> 32),
+									  (uint32) state->ReadRecPtr);
+				goto err;
+			}
+
+			/* XLogNeedData has verified the page header */
+			Assert(pageHeaderSize <= state->readLen);
+
+			/*
+			 * Read the record length.
+			 *
+			 * NB: Even though we use an XLogRecord pointer here, the whole
+			 * record header might not fit on this page. xl_tot_len is the first
+			 * field of the struct, so it must be on this page (the records are
+			 * MAXALIGNed), but we cannot access any other fields until we've
+			 * verified that we got the whole header.
+			 */
+			prec = (XLogRecord *) (state->readBuf +
+								   state->ReadRecPtr % XLOG_BLCKSZ);
+			total_len = prec->xl_tot_len;
+
+			/*
+			 * If the whole record header is on this page, validate it
+			 * immediately.  Otherwise do just a basic sanity check on
+			 * xl_tot_len, and validate the rest of the header after reading it
+			 * from the next page.  The xl_tot_len check is necessary here to
+			 * ensure that we enter the XLREAD_CONTINUATION state below;
+			 * otherwise we might fail to apply ValidXLogRecordHeader at all.
+			 */
+			if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
+			{
+				if (!ValidXLogRecordHeader(state, state->ReadRecPtr,
+										   state->PrevRecPtr, prec))
+					goto err;
+
+				state->record_verified = true;
+			}
+			else
+			{
+				/* XXX: more validation should be done here */
+				if (total_len < SizeOfXLogRecord)
+				{
+					report_invalid_record(state,
+										  "invalid record length at %X/%X: wanted %u, got %u",
+										  (uint32) (state->ReadRecPtr >> 32),
+										  (uint32) state->ReadRecPtr,
+										  (uint32) SizeOfXLogRecord, total_len);
+					goto err;
+				}
+			}
+
 			/*
-			 * Cross-check that xlp_rem_len agrees with how much of the record
-			 * we expect there to be left.
+			 * Wait for the rest of the record, or the part of the record that
+			 * fit on the first page if crossed a page boundary, to become
+			 * available.
 			 */
-			if (pageHeader->xlp_rem_len == 0 ||
-				total_len != (pageHeader->xlp_rem_len + gotlen))
+			state->recordGotLen = 0;
+			state->recordRemainLen = total_len;
+			state->readRecordState = XLREAD_FIRST_FRAGMENT;
+		}
+		/* fall through */
+
+		case XLREAD_FIRST_FRAGMENT:
+		{
+			uint32		total_len = state->recordRemainLen;
+			uint32		request_len;
+			uint32		record_len;
+			XLogRecPtr	targetPagePtr;
+			uint32		targetRecOff;
+
+			/*
+			 * Wait for the rest of the record on the first page to become
+			 * available
+			 */
+			targetPagePtr =
+				state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ);
+			targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ;
+
+			request_len = Min(targetRecOff + total_len, XLOG_BLCKSZ);
+			record_len = request_len - targetRecOff;
+
+			/* ReadRecPtr contains page header */
+			Assert (targetRecOff != 0);
+			if (XLogNeedData(state, targetPagePtr, request_len, true))
+				return XLREAD_NEED_DATA;
+
+			/* error out if caller supplied bogus page */
+			if (!state->page_verified)
+				goto err;
+
+			prec = (XLogRecord *) (state->readBuf + targetRecOff);
+
+			/* validate record header if not yet */
+			if (!state->record_verified && record_len >= SizeOfXLogRecord)
 			{
+				if (!ValidXLogRecordHeader(state, state->ReadRecPtr,
+										   state->PrevRecPtr, prec))
+					goto err;
+
+				state->record_verified = true;
+			}
+
+
+			if (total_len == record_len)
+			{
+				/* Record does not cross a page boundary */
+				Assert(state->record_verified);
+
+				if (!ValidXLogRecord(state, prec, state->ReadRecPtr))
+					goto err;
+
+				state->record_verified = true;  /* to be tidy */
+
+				/* We already checked the header earlier */
+				state->EndRecPtr = state->ReadRecPtr + MAXALIGN(record_len);
+
+				*record = prec;
+				state->readRecordState = XLREAD_NEXT_RECORD;
+				break;
+			}
+
+			/*
+			 * The record continues on the next page. Need to reassemble
+			 * record
+			 */
+			Assert(total_len > record_len);
+
+			/* Enlarge readRecordBuf as needed. */
+			if (total_len > state->readRecordBufSize &&
+				!allocate_recordbuf(state, total_len))
+			{
+				/* We treat this as a "bogus data" condition */
 				report_invalid_record(state,
-									  "invalid contrecord length %u at %X/%X",
-									  pageHeader->xlp_rem_len,
-									  (uint32) (RecPtr >> 32), (uint32) RecPtr);
+									  "record length %u at %X/%X too long",
+									  total_len,
+									  (uint32) (state->ReadRecPtr >> 32),
+									  (uint32) state->ReadRecPtr);
 				goto err;
 			}
 
-			/* Append the continuation from this page to the buffer */
-			pageHeaderSize = XLogPageHeaderSize(pageHeader);
+			/* Copy the first fragment of the record from the first page. */
+			memcpy(state->readRecordBuf, state->readBuf + targetRecOff,
+				   record_len);
+			state->recordGotLen += record_len;
+			state->recordRemainLen -= record_len;
 
-			Assert (pageHeaderSize <= state->readLen);
+			/* Calculate pointer to beginning of next page */
+			state->recordContRecPtr = state->ReadRecPtr + record_len;
+			Assert(state->recordContRecPtr % XLOG_BLCKSZ == 0);
 
-			contdata = (char *) state->readBuf + pageHeaderSize;
-			len = XLOG_BLCKSZ - pageHeaderSize;
-			if (pageHeader->xlp_rem_len < len)
-				len = pageHeader->xlp_rem_len;
-
-			Assert (pageHeaderSize + len <= state->readLen);
-			memcpy(buffer, (char *) contdata, len);
-			buffer += len;
-			gotlen += len;
-
-			/* If we just reassembled the record header, validate it. */
-			if (!gotheader)
-			{
-				record = (XLogRecord *) state->readRecordBuf;
-				if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
-										   record, randAccess))
-					goto err;
-				gotheader = true;
-			}
-		} while (gotlen < total_len);
-
-		Assert(gotheader);
-
-		record = (XLogRecord *) state->readRecordBuf;
-		if (!ValidXLogRecord(state, record, RecPtr))
-			goto err;
-
-		pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
-		state->ReadRecPtr = RecPtr;
-		state->EndRecPtr = targetPagePtr + pageHeaderSize
-			+ MAXALIGN(pageHeader->xlp_rem_len);
-	}
-	else
-	{
-		/* Wait for the record data to become available */
-		while (XLogNeedData(state, targetPagePtr,
-							Min(targetRecOff + total_len, XLOG_BLCKSZ), true))
-		{
-			if (!state->read_page(state, state->readPagePtr, state->readLen,
-								  state->ReadRecPtr, state->readBuf,
-								  &state->readPageTLI))
-				break;
+			state->readRecordState = XLREAD_CONTINUATION;
 		}
+		/* fall through */
 
-		if (!state->page_verified)
-			goto err;
+		case XLREAD_CONTINUATION:
+		{
+			XLogPageHeader pageHeader;
+			uint32		pageHeaderSize;
+			XLogRecPtr	targetPagePtr;
 
-		/* Record does not cross a page boundary */
-		if (!ValidXLogRecord(state, record, RecPtr))
-			goto err;
+			/* we enter this state only if we haven't read the whole record. */
+			Assert (state->recordRemainLen > 0);
 
-		state->EndRecPtr = RecPtr + MAXALIGN(total_len);
+			while(state->recordRemainLen > 0)
+			{
+				char	   *contdata;
+				uint32		request_len;
+				uint32		record_len;
 
-		state->ReadRecPtr = RecPtr;
+				/* Wait for the next page to become available */
+				targetPagePtr = state->recordContRecPtr;
+
+				/* this request contains page header */
+				Assert (targetPagePtr != 0);
+				if (XLogNeedData(state, targetPagePtr,
+								 Min(state->recordRemainLen, XLOG_BLCKSZ),
+								 false))
+					return XLREAD_NEED_DATA;
+
+				if (!state->page_verified)
+					goto err;
+
+				Assert(SizeOfXLogShortPHD <= state->readLen);
+
+				/* Check that the continuation on next page looks valid */
+				pageHeader = (XLogPageHeader) state->readBuf;
+				if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
+				{
+					report_invalid_record(
+						state,
+						"there is no contrecord flag at %X/%X reading %X/%X",
+						(uint32) (state->recordContRecPtr >> 32),
+						(uint32) state->recordContRecPtr,
+						(uint32) (state->ReadRecPtr >> 32),
+						(uint32) state->ReadRecPtr);
+					goto err;
+				}
+
+				/*
+				 * Cross-check that xlp_rem_len agrees with how much of the
+				 * record we expect there to be left.
+				 */
+				if (pageHeader->xlp_rem_len == 0 ||
+					pageHeader->xlp_rem_len != state->recordRemainLen)
+				{
+					report_invalid_record(
+						state,
+						"invalid contrecord length %u at %X/%X reading %X/%X, expected %u",
+						pageHeader->xlp_rem_len,
+						(uint32) (state->recordContRecPtr >> 32),
+						(uint32) state->recordContRecPtr,
+						(uint32) (state->ReadRecPtr >> 32),
+						(uint32) state->ReadRecPtr,
+						state->recordRemainLen);
+					goto err;
+				}
+
+				/* Append the continuation from this page to the buffer */
+				pageHeaderSize = XLogPageHeaderSize(pageHeader);
+
+				/*
+				 * XLogNeedData should have ensured that the whole page header
+				 * was read
+				 */
+				Assert(state->readLen >= pageHeaderSize);
+
+				contdata = (char *) state->readBuf + pageHeaderSize;
+				record_len = XLOG_BLCKSZ - pageHeaderSize;
+				if (pageHeader->xlp_rem_len < record_len)
+					record_len = pageHeader->xlp_rem_len;
+
+				request_len = record_len + pageHeaderSize;
+
+				/* XLogNeedData should have ensured all needed data was read */
+				Assert (state->readLen >= request_len);
+
+				memcpy(state->readRecordBuf + state->recordGotLen,
+					   (char *) contdata, record_len);
+				state->recordGotLen += record_len;
+				state->recordRemainLen -= record_len;
+
+				/* If we just reassembled the record header, validate it. */
+				if (!state->record_verified)
+				{
+					Assert(state->recordGotLen >= SizeOfXLogRecord);
+					if (!ValidXLogRecordHeader(state, state->ReadRecPtr,
+											   state->PrevRecPtr,
+											   (XLogRecord *) state->readRecordBuf))
+						goto err;
+
+					state->record_verified = true;
+				}
+
+				/* Calculate pointer to beginning of next page, and continue */
+				state->recordContRecPtr += XLOG_BLCKSZ;
+			}
+
+			/* targetPagePtr is pointing to the last-read page here */
+			prec = (XLogRecord *) state->readRecordBuf;
+			if (!ValidXLogRecord(state, prec, state->ReadRecPtr))
+				goto err;
+
+			pageHeaderSize =
+				XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+			state->EndRecPtr = targetPagePtr + pageHeaderSize
+				+ MAXALIGN(pageHeader->xlp_rem_len);
+
+			*record = prec;
+			state->readRecordState = XLREAD_NEXT_RECORD;
+			break;
+		}
 	}
 
 	/*
 	 * Special processing if it's an XLOG SWITCH record
 	 */
-	if (record->xl_rmid == RM_XLOG_ID &&
-		(record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
+	if ((*record)->xl_rmid == RM_XLOG_ID &&
+		((*record)->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
 	{
 		/* Pretend it extends to end of segment */
 		state->EndRecPtr += state->wal_segment_size - 1;
 		state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->wal_segment_size);
 	}
 
-	if (DecodeXLogRecord(state, record, errormsg))
-		return record;
-	else
-		return NULL;
+	Assert (!*record || state->readLen >= 0);
+	if (DecodeXLogRecord(state, *record, errormsg))
+		return XLREAD_SUCCESS;
+
+	*record = NULL;
+	return XLREAD_FAIL;
 
 err:
 
 	/*
-	 * Invalidate the read state. We might read from a different source after
+	 * Invalidate the read page. We might read from a different source after
 	 * failure.
 	 */
 	XLogReaderDiscardReadingPage(state);
@@ -523,7 +660,8 @@ err:
 	if (state->errormsg_buf[0] != '\0')
 		*errormsg = state->errormsg_buf;
 
-	return NULL;
+	*record = NULL;
+	return XLREAD_FAIL;
 }
 
 /*
@@ -711,11 +849,12 @@ XLogReaderDiscardReadingPage(XLogReaderState *state)
  *
  * This is just a convenience subroutine to avoid duplicated code in
  * XLogReadRecord.  It's not intended for use from anywhere else.
+ *
+ * If PrevRecPtr is valid, xl_prev is cross-checked against it.
  */
 static bool
 ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
-					  XLogRecPtr PrevRecPtr, XLogRecord *record,
-					  bool randAccess)
+					  XLogRecPtr PrevRecPtr, XLogRecord *record)
 {
 	if (record->xl_tot_len < SizeOfXLogRecord)
 	{
@@ -733,7 +872,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 							  (uint32) RecPtr);
 		return false;
 	}
-	if (randAccess)
+	if (PrevRecPtr == InvalidXLogRecPtr)
 	{
 		/*
 		 * We can't exactly verify the prev-link, but surely it should be less
@@ -961,12 +1100,15 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
  * debugging purposes.
  */
 XLogRecPtr
-XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
+XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr,
+				   XLogFindNextRecordCB read_page, void *private)
 {
 	XLogReaderState saved_state = *state;
 	XLogRecPtr	tmpRecPtr;
 	XLogRecPtr	found = InvalidXLogRecPtr;
 	XLogPageHeader header;
+	XLogRecord *record;
+	XLogReadRecordResult result;
 	char	   *errormsg;
 
 	Assert(!XLogRecPtrIsInvalid(RecPtr));
@@ -999,9 +1141,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 		while(XLogNeedData(state, targetPagePtr, targetRecOff,
 						   targetRecOff != 0))
 		{
-			if (!state->read_page(state, state->readPagePtr, state->readLen,
-								  state->ReadRecPtr, state->readBuf,
-								  &state->readPageTLI))
+			if (!read_page(state, private))
 				break;
 		}
 
@@ -1052,11 +1192,19 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 	 * because either we're at the first record after the beginning of a page
 	 * or we just jumped over the remaining data of a continuation.
 	 */
-	while (XLogReadRecord(state, tmpRecPtr, &errormsg) != NULL)
+	while ((result = XLogReadRecord(state, tmpRecPtr, &record, &errormsg)) !=
+		   XLREAD_FAIL)
 	{
 		/* continue after the record */
 		tmpRecPtr = InvalidXLogRecPtr;
 
+		if (result == XLREAD_NEED_DATA)
+		{
+			if (!read_page(state, private))
+				goto err;
+			continue;
+		}
+
 		/* past the record we've found, break out */
 		if (RecPtr <= state->ReadRecPtr)
 		{
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index dad9074b9f..d50a0ca187 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -802,8 +802,7 @@ XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
 void
 XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
 {
-	const XLogRecPtr lastReadPage = state->readSegNo *
-	state->wal_segment_size + state->readLen;
+	const XLogRecPtr lastReadPage = state->readPagePtr;
 
 	Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
 	Assert(wantLength <= XLOG_BLCKSZ);
@@ -818,7 +817,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
 	 * current TLI has since become historical.
 	 */
 	if (lastReadPage == wantPage &&
-		state->readLen != 0 &&
+		state->page_verified &&
 		lastReadPage + state->readLen >= wantPage + Min(wantLength, XLOG_BLCKSZ - 1))
 		return;
 
@@ -908,10 +907,12 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
  * loop for now.
  */
 bool
-read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
-					 int reqLen, XLogRecPtr targetRecPtr, char *cur_page,
-					 TimeLineID *pageTLI)
+read_local_xlog_page(XLogReaderState *state)
 {
+	XLogRecPtr	targetPagePtr = state->readPagePtr;
+	int			reqLen		  = state->readLen;
+	char	   *cur_page	  = state->readBuf;
+	TimeLineID *pageTLI		  = &state->readPageTLI;
 	XLogRecPtr	read_upto,
 				loc;
 	int			count;
@@ -1027,6 +1028,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 			 XLOG_BLCKSZ);
 
 	/* number of valid bytes in the buffer */
+	state->readPagePtr = targetPagePtr;
 	state->readLen = count;
 	return true;
 }
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index f8b9020081..11e52e4c01 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -124,7 +124,7 @@ StartupDecodingContext(List *output_plugin_options,
 					   TransactionId xmin_horizon,
 					   bool need_full_snapshot,
 					   bool fast_forward,
-					   XLogPageReadCB read_page,
+					   LogicalDecodingXLogReadPageCB read_page,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
 					   LogicalOutputPluginWriterWrite do_write,
 					   LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -173,11 +173,12 @@ StartupDecodingContext(List *output_plugin_options,
 
 	ctx->slot = slot;
 
-	ctx->reader = XLogReaderAllocate(wal_segment_size, read_page, ctx);
+	ctx->reader = XLogReaderAllocate(wal_segment_size);
 	if (!ctx->reader)
 		ereport(ERROR,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
 				 errmsg("out of memory")));
+	ctx->read_page = read_page;
 
 	ctx->reorder = ReorderBufferAllocate();
 	ctx->snapshot_builder =
@@ -232,7 +233,7 @@ CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  bool need_full_snapshot,
 						  XLogRecPtr restart_lsn,
-						  XLogPageReadCB read_page,
+						  LogicalDecodingXLogReadPageCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
 						  LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -374,7 +375,7 @@ LogicalDecodingContext *
 CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  bool fast_forward,
-					  XLogPageReadCB read_page,
+					  LogicalDecodingXLogReadPageCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write,
 					  LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -482,7 +483,13 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 		char	   *err = NULL;
 
 		/* the read_page callback waits for new WAL */
-		record = XLogReadRecord(ctx->reader, startptr, &err);
+		while (XLogReadRecord(ctx->reader, startptr, &record, &err) ==
+			   XLREAD_NEED_DATA)
+		{
+			if (!ctx->read_page(ctx))
+				break;
+		}
+
 		if (err)
 			elog(ERROR, "%s", err);
 		if (!record)
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 7210a940bd..5270f646bd 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -115,11 +115,9 @@ check_permissions(void)
 }
 
 bool
-logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
-							 int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
+logical_read_local_xlog_page(LogicalDecodingContext *ctx)
 {
-	return read_local_xlog_page(state, targetPagePtr, reqLen,
-						 targetRecPtr, cur_page, pageTLI);
+	return read_local_xlog_page(ctx->reader);
 }
 
 /*
@@ -289,7 +287,13 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 			XLogRecord *record;
 			char	   *errm = NULL;
 
-			record = XLogReadRecord(ctx->reader, startptr, &errm);
+			while (XLogReadRecord(ctx->reader, startptr, &record, &errm) ==
+				   XLREAD_NEED_DATA)
+			{
+				if (!ctx->read_page(ctx))
+					break;
+			}
+
 			if (errm)
 				elog(ERROR, "%s", errm);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 808a6f5b83..fb5c0a702d 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -433,7 +433,13 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 			 * Read records.  No changes are generated in fast_forward mode,
 			 * but snapbuilder/slot statuses are updated properly.
 			 */
-			record = XLogReadRecord(ctx->reader, startlsn, &errm);
+			while (XLogReadRecord(ctx->reader, startlsn, &record, &errm) ==
+				   XLREAD_NEED_DATA)
+			{
+				if (!ctx->read_page(ctx))
+					break;
+			}
+
 			if (errm)
 				elog(ERROR, "%s", errm);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index cc35e2a04d..a4518f5b55 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -762,9 +762,12 @@ StartReplication(StartReplicationCmd *cmd)
  * set every time WAL is flushed.
  */
 static bool
-logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
-					   XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
+logical_read_xlog_page(LogicalDecodingContext *ctx)
 {
+	XLogReaderState *state = ctx->reader;
+	XLogRecPtr		targetPagePtr = state->readPagePtr;
+	int				reqLen		  = state->readLen;
+	char		   *cur_page	  = state->readBuf;
 	XLogRecPtr	flushptr;
 	int			count;
 
@@ -2827,7 +2830,12 @@ XLogSendLogical(void)
 	 */
 	WalSndCaughtUp = false;
 
-	record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
+	while (XLogReadRecord(logical_decoding_ctx->reader,
+						  logical_startptr, &record, &errm) == XLREAD_NEED_DATA)
+	{
+		if (!logical_decoding_ctx->read_page(logical_decoding_ctx))
+			break;
+	}
 	logical_startptr = InvalidXLogRecPtr;
 
 	/* xlog record was invalid */
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 4df53964e4..ff26b30f82 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -41,16 +41,8 @@ static int	xlogreadfd = -1;
 static XLogSegNo xlogreadsegno = -1;
 static char xlogfpath[MAXPGPATH];
 
-typedef struct XLogPageReadPrivate
-{
-	const char *datadir;
-	int			tliIndex;
-} XLogPageReadPrivate;
-
-static bool	SimpleXLogPageRead(XLogReaderState *xlogreader,
-							   XLogRecPtr targetPagePtr,
-							   int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
-							   TimeLineID *pageTLI);
+static bool SimpleXLogPageRead(XLogReaderState *xlogreader,
+							   const char *datadir, int *tliIndex);
 
 /*
  * Read WAL from the datadir/pg_wal, starting from 'startpoint' on timeline
@@ -64,24 +56,26 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
 	XLogRecord *record;
 	XLogReaderState *xlogreader;
 	char	   *errormsg;
-	XLogPageReadPrivate private;
 
-	private.datadir = datadir;
-	private.tliIndex = tliIndex;
-	xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead,
-									&private);
+	xlogreader = XLogReaderAllocate(WalSegSz);
 	if (xlogreader == NULL)
 		pg_fatal("out of memory");
 
 	do
 	{
-		record = XLogReadRecord(xlogreader, startpoint, &errormsg);
+		while (XLogReadRecord(xlogreader, startpoint, &record, &errormsg) ==
+			   XLREAD_NEED_DATA)
+		{
+			if (!SimpleXLogPageRead(xlogreader, datadir, &tliIndex))
+				break;
+		}
 
 		if (record == NULL)
 		{
-			XLogRecPtr	errptr;
+			XLogRecPtr	errptr = xlogreader->EndRecPtr;
 
-			errptr = startpoint ? startpoint : xlogreader->EndRecPtr;
+			if (startpoint)
+				errptr = startpoint;
 
 			if (errormsg)
 				pg_fatal("could not read WAL record at %X/%X: %s",
@@ -116,17 +110,18 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex)
 	XLogRecord *record;
 	XLogReaderState *xlogreader;
 	char	   *errormsg;
-	XLogPageReadPrivate private;
 	XLogRecPtr	endptr;
 
-	private.datadir = datadir;
-	private.tliIndex = tliIndex;
-	xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead,
-									&private);
+	xlogreader = XLogReaderAllocate(WalSegSz);
 	if (xlogreader == NULL)
 		pg_fatal("out of memory");
 
-	record = XLogReadRecord(xlogreader, ptr, &errormsg);
+	while (XLogReadRecord(xlogreader, ptr, &record, &errormsg) ==
+		   XLREAD_NEED_DATA)
+	{
+		if (!SimpleXLogPageRead(xlogreader, datadir, &tliIndex))
+			break;
+	}
 	if (record == NULL)
 	{
 		if (errormsg)
@@ -161,7 +156,6 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 	XLogRecPtr	searchptr;
 	XLogReaderState *xlogreader;
 	char	   *errormsg;
-	XLogPageReadPrivate private;
 
 	/*
 	 * The given fork pointer points to the end of the last common record,
@@ -177,10 +171,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 			forkptr += SizeOfXLogShortPHD;
 	}
 
-	private.datadir = datadir;
-	private.tliIndex = tliIndex;
-	xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead,
-									&private);
+	xlogreader = XLogReaderAllocate(WalSegSz);
 	if (xlogreader == NULL)
 		pg_fatal("out of memory");
 
@@ -189,7 +180,12 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 	{
 		uint8		info;
 
-		record = XLogReadRecord(xlogreader, searchptr, &errormsg);
+		while (XLogReadRecord(xlogreader, searchptr, &record, &errormsg) ==
+			   XLREAD_NEED_DATA)
+		{
+			if (!SimpleXLogPageRead(xlogreader, datadir, &tliIndex))
+				break;
+		}
 
 		if (record == NULL)
 		{
@@ -236,11 +232,12 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 
 /* XLogReader callback function, to read a WAL page */
 static bool
-SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
-				   int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
-				   TimeLineID *pageTLI)
+SimpleXLogPageRead(XLogReaderState *xlogreader,
+				   const char*datadir, int *tliIndex)
 {
-	XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
+	XLogRecPtr	targetPagePtr = xlogreader->readPagePtr;
+	char	   *readBuf		  = xlogreader->readBuf;
+	TimeLineID *pageTLI		  = &xlogreader->readPageTLI;
 	uint32		targetPageOff;
 	XLogRecPtr	targetSegEnd;
 	XLogSegNo	targetSegNo;
@@ -273,17 +270,17 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
 		 * be done both forward and backward, consider also switching timeline
 		 * accordingly.
 		 */
-		while (private->tliIndex < targetNentries - 1 &&
-			   targetHistory[private->tliIndex].end < targetSegEnd)
-			private->tliIndex++;
-		while (private->tliIndex > 0 &&
-			   targetHistory[private->tliIndex].begin >= targetSegEnd)
-			private->tliIndex--;
+		while (*tliIndex < targetNentries - 1 &&
+			   targetHistory[*tliIndex].end < targetSegEnd)
+			(*tliIndex)++;
+		while (*tliIndex > 0 &&
+			   targetHistory[*tliIndex].begin >= targetSegEnd)
+			(*tliIndex)--;
 
-		XLogFileName(xlogfname, targetHistory[private->tliIndex].tli,
+		XLogFileName(xlogfname, targetHistory[*tliIndex].tli,
 					 xlogreadsegno, WalSegSz);
 
-		snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s", private->datadir, xlogfname);
+		snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s", datadir, xlogfname);
 
 		xlogreadfd = open(xlogfpath, O_RDONLY | PG_BINARY, 0);
 
@@ -324,7 +321,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
 
 	Assert(targetSegNo == xlogreadsegno);
 
-	*pageTLI = targetHistory[private->tliIndex].tli;
+	*pageTLI = targetHistory[*tliIndex].tli;
 
 	xlogreader->readLen = XLOG_BLCKSZ;
 	return true;
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 96d1f36ebc..a61a5a91cb 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -422,10 +422,12 @@ XLogDumpXLogRead(const char *directory, TimeLineID timeline_id,
  * XLogReader read_page callback
  */
 static bool
-XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
-				 XLogRecPtr targetPtr, char *readBuff, TimeLineID *curFileTLI)
+XLogDumpReadPage(XLogReaderState *state, void *priv)
 {
-	XLogDumpPrivate *private = state->private_data;
+	XLogRecPtr	targetPagePtr = state->readPagePtr;
+	int			reqLen		  = state->readLen;
+	char	   *readBuff	  = state->readBuf;
+	XLogDumpPrivate *private  = (XLogDumpPrivate *) priv;
 	int			count = XLOG_BLCKSZ;
 
 	if (private->endptr != InvalidXLogRecPtr)
@@ -445,6 +447,7 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 	XLogDumpXLogRead(private->inpath, private->timeline, targetPagePtr,
 					 readBuff, count);
 
+	Assert(count >= state->readLen);
 	state->readLen = count;
 	return true;
 }
@@ -1102,13 +1105,13 @@ main(int argc, char **argv)
 	/* done with argument parsing, do the actual work */
 
 	/* we have everything we need, start reading */
-	xlogreader_state = XLogReaderAllocate(WalSegSz, XLogDumpReadPage,
-										  &private);
+	xlogreader_state = XLogReaderAllocate(WalSegSz);
 	if (!xlogreader_state)
 		fatal_error("out of memory");
 
 	/* first find a valid recptr to start from */
-	first_record = XLogFindNextRecord(xlogreader_state, private.startptr);
+	first_record = XLogFindNextRecord(xlogreader_state, private.startptr,
+									  &XLogDumpReadPage, (void*) &private);
 
 	if (first_record == InvalidXLogRecPtr)
 		fatal_error("could not find a valid record after %X/%X",
@@ -1132,7 +1135,14 @@ main(int argc, char **argv)
 	for (;;)
 	{
 		/* try to read the next record */
-		record = XLogReadRecord(xlogreader_state, first_record, &errormsg);
+		while (XLogReadRecord(xlogreader_state,
+							  first_record, &record, &errormsg) ==
+			   XLREAD_NEED_DATA)
+		{
+			if (!XLogDumpReadPage(xlogreader_state, (void *) &private))
+				break;
+		}
+
 		if (!record)
 		{
 			if (!config.follow || private.endptr_reached)
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 6de7c19a2a..11ae82b96d 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -33,14 +33,6 @@
 
 typedef struct XLogReaderState XLogReaderState;
 
-/* Function type definition for the read_page callback */
-typedef bool (*XLogPageReadCB) (XLogReaderState *xlogreader,
-							   XLogRecPtr targetPagePtr,
-							   int reqLen,
-							   XLogRecPtr targetRecPtr,
-							   char *readBuf,
-							   TimeLineID *pageTLI);
-
 typedef struct
 {
 	/* Is this block ref in use? */
@@ -70,6 +62,29 @@ typedef struct
 	uint16		data_bufsz;
 } DecodedBkpBlock;
 
+/* Result codes returned by XLogReadRecord */
+typedef enum XLogReadRecordResult
+{
+	XLREAD_SUCCESS,		/* a record was read and decoded successfully */
+	XLREAD_NEED_DATA,	/* caller must load more page data and call again */
+	XLREAD_FAIL			/* reading failed; *record is set to NULL */
+} XLogReadRecordResult;
+
+/*
+ * internal state of XLogReadRecord
+ *
+ * XLogReadRecord runs a state machine while reading a record. These states
+ * are not seen outside the function. Each state may be entered several
+ * times, because the function returns to ask the caller for more data. See
+ * the comment of XLogReadRecord for details.
+ */
+typedef enum XLogReadRecordState {
+	XLREAD_NEXT_RECORD,
+	XLREAD_TOT_LEN,
+	XLREAD_FIRST_FRAGMENT,
+	XLREAD_CONTINUATION
+} XLogReadRecordState;
+
 struct XLogReaderState
 {
 	/* ----------------------------------------
@@ -82,46 +97,19 @@ struct XLogReaderState
 	 */
 	int			wal_segment_size;
 
-	/*
-	 * Data input callback (mandatory).
-	 *
-	 * This callback shall read at least reqLen valid bytes of the xlog page
-	 * starting at targetPagePtr, and store them in readBuf.  The callback
-	 * shall return the number of bytes read (never more than XLOG_BLCKSZ), or
-	 * -1 on failure.  The callback shall sleep, if necessary, to wait for the
-	 * requested bytes to become available.  The callback will not be invoked
-	 * again for the same page unless more than the returned number of bytes
-	 * are needed.
-	 *
-	 * targetRecPtr is the position of the WAL record we're reading.  Usually
-	 * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs
-	 * to read and verify the page or segment header, before it reads the
-	 * actual WAL record it's interested in.  In that case, targetRecPtr can
-	 * be used to determine which timeline to read the page from.
-	 *
-	 * The callback shall set *pageTLI to the TLI of the file the page was
-	 * read from.  It is currently used only for error reporting purposes, to
-	 * reconstruct the name of the WAL file where an error occurred.
-	 */
-	XLogPageReadCB read_page;
-
 	/*
 	 * System identifier of the xlog files we're about to read.  Set to zero
 	 * (the default value) if unknown or unimportant.
 	 */
 	uint64		system_identifier;
 
-	/*
-	 * Opaque data for callbacks to use.  Not used by XLogReader.
-	 */
-	void	   *private_data;
-
 	/*
 	 * Start and end point of last record read.  EndRecPtr is also used as the
 	 * position to read next, if XLogReadRecord receives an invalid recptr.
 	 */
-	XLogRecPtr	ReadRecPtr;		/* start of last record read */
+	XLogRecPtr	ReadRecPtr;		/* start of last record read or being read */
 	XLogRecPtr	EndRecPtr;		/* end+1 of last record read */
+	XLogRecPtr	PrevRecPtr;		/* start of previous record read */
 
 	/* ----------------------------------------
 	 * Communication with page reader
@@ -163,6 +151,7 @@ struct XLogReaderState
 
 	/* last read segment and segment offset for data currently in readBuf */
 	bool		page_verified;
+	bool		record_verified;
 	XLogSegNo	readSegNo;
 
 	/*
@@ -172,8 +161,6 @@ struct XLogReaderState
 	XLogRecPtr	latestPagePtr;
 	TimeLineID	latestPageTLI;
 
-	/* beginning of the WAL record being read. */
-	XLogRecPtr	currRecPtr;
 	/* timeline to read it from, 0 if a lookup is required */
 	TimeLineID	currTLI;
 
@@ -200,28 +187,41 @@ struct XLogReaderState
 	char	   *readRecordBuf;
 	uint32		readRecordBufSize;
 
+	/*
+	 * XLogReadRecord() state
+	 */
+	XLogReadRecordState	readRecordState;/* state machine state */
+	int			recordGotLen;		/* amount of current record that has
+									 * already been read */
+	int			recordRemainLen;	/* length of current record that remains */
+	XLogRecPtr	recordContRecPtr;	/* where the current record continues */
+
 	/* Buffer to hold error message */
 	char	   *errormsg_buf;
 };
 
 /* Get a new XLogReader */
-extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
-										   XLogPageReadCB pagereadfunc,
-										   void *private_data);
+extern XLogReaderState *XLogReaderAllocate(int wal_segment_size);
 
 /* Free an XLogReader */
 extern void XLogReaderFree(XLogReaderState *state);
 
-/* Read the next XLog record. Returns NULL on end-of-WAL or failure */
+/* Read the next XLog record into *record; see XLogReadRecordResult */
-extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
-										 XLogRecPtr recptr, char **errormsg);
+extern XLogReadRecordResult XLogReadRecord(XLogReaderState *state,
+										   XLogRecPtr recptr,
+										   XLogRecord **record,
+										   char **errormsg);
 
 /* Validate a page */
 extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
 										 XLogRecPtr recptr, char *phdr);
 
 #ifdef FRONTEND
-extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
+/* Function type definition for the read_page callback */
+typedef bool (*XLogFindNextRecordCB) (XLogReaderState *xlogreader,
+									  void *private);
+extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr,
+									 XLogFindNextRecordCB read_page, void *private);
 #endif							/* FRONTEND */
 /* Functions for decoding an XLogRecord */
 
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 0842af9f95..55a9b6237a 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -47,10 +47,7 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
 extern void FreeFakeRelcacheEntry(Relation fakerel);
 
-extern bool	read_local_xlog_page(XLogReaderState *state,
-								 XLogRecPtr targetPagePtr, int reqLen,
-								 XLogRecPtr targetRecPtr, char *cur_page,
-								 TimeLineID *pageTLI);
+extern bool read_local_xlog_page(XLogReaderState *state);
 
 extern void XLogReadDetermineTimeline(XLogReaderState *state,
 									  XLogRecPtr wantPage, uint32 wantLength);
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 31c796b765..482d3d311c 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -30,6 +30,10 @@ typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingC
 														 TransactionId xid
 );
 
+typedef struct LogicalDecodingContext LogicalDecodingContext;
+
+typedef bool (*LogicalDecodingXLogReadPageCB)(LogicalDecodingContext *ctx);
+
 typedef struct LogicalDecodingContext
 {
 	/* memory context this is all allocated in */
@@ -40,6 +44,7 @@ typedef struct LogicalDecodingContext
 
 	/* infrastructure pieces for decoding */
 	XLogReaderState *reader;
+	LogicalDecodingXLogReadPageCB read_page;
 	struct ReorderBuffer *reorder;
 	struct SnapBuild *snapshot_builder;
 
@@ -96,14 +101,14 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 														 List *output_plugin_options,
 														 bool need_full_snapshot,
 														 XLogRecPtr restart_lsn,
-														 XLogPageReadCB read_page,
+														 LogicalDecodingXLogReadPageCB read_page,
 														 LogicalOutputPluginWriterPrepareWrite prepare_write,
 														 LogicalOutputPluginWriterWrite do_write,
 														 LogicalOutputPluginWriterUpdateProgress update_progress);
 extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
 													 List *output_plugin_options,
 													 bool fast_forward,
-													 XLogPageReadCB read_page,
+													 LogicalDecodingXLogReadPageCB read_page,
 													 LogicalOutputPluginWriterPrepareWrite prepare_write,
 													 LogicalOutputPluginWriterWrite do_write,
 													 LogicalOutputPluginWriterUpdateProgress update_progress);
diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h
index 8e52b1f4aa..25fa68d5b9 100644
--- a/src/include/replication/logicalfuncs.h
+++ b/src/include/replication/logicalfuncs.h
@@ -11,9 +11,6 @@
 
 #include "replication/logical.h"
 
-extern bool	logical_read_local_xlog_page(XLogReaderState *state,
-										 XLogRecPtr targetPagePtr,
-										 int reqLen, XLogRecPtr targetRecPtr,
-										 char *cur_page, TimeLineID *pageTLI);
+extern bool logical_read_local_xlog_page(LogicalDecodingContext *ctx);
 
 #endif
-- 
2.16.3

