From d0993b0c0ea3a1a4e65ed2b83a51d192f81b7c39 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Wed, 17 Apr 2019 09:55:58 +0900
Subject: [PATCH 04/10] Make XLogReadRecord a state machine

This patch turns XLogReadRecord into a state machine so that the read_page
callback is invoked by the callers of XLogReadRecord instead of from
inside it.
---
 src/backend/access/transam/twophase.c          |   8 +-
 src/backend/access/transam/xlog.c              |  10 ++-
 src/backend/access/transam/xlogreader.c        | 109 ++++++++++++++++---------
 src/backend/replication/logical/logical.c      |  10 ++-
 src/backend/replication/logical/logicalfuncs.c |  10 ++-
 src/backend/replication/slotfuncs.c            |  10 ++-
 src/backend/replication/walsender.c            |  10 ++-
 src/bin/pg_rewind/parsexlog.c                  |  28 ++++++-
 src/bin/pg_waldump/pg_waldump.c                |  11 ++-
 src/include/access/xlogreader.h                |  20 ++++-
 10 files changed, 175 insertions(+), 51 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 5dba27e5dd..a3573ad0af 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1394,7 +1394,13 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
 				 errmsg("out of memory"),
 				 errdetail("Failed while allocating a WAL reading processor.")));
 
-	XLogReadRecord(xlogreader, lsn, &record, &errormsg);
+	while (XLogReadRecord(xlogreader, lsn, &record, &errormsg) ==
+		   XLREAD_NEED_DATA)
+			xlogreader->read_page(xlogreader,
+								  xlogreader->loadPagePtr, xlogreader->loadLen,
+								  xlogreader->currRecPtr, xlogreader->readBuf,
+								  &xlogreader->readPageTLI);
+
 	if (record == NULL)
 		ereport(ERROR,
 				(errcode_for_file_access(),
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index b6cb4111b8..59fd12153a 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4258,11 +4258,19 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
 	/* This is the first attempt to read this page. */
 	lastSourceFailed = false;
 
+	XLREAD_RESET(xlogreader);
+
 	for (;;)
 	{
 		char	   *errormsg;
 
-		XLogReadRecord(xlogreader, RecPtr, &record, &errormsg);
+		while (XLogReadRecord(xlogreader, RecPtr, &record, &errormsg)
+			   == XLREAD_NEED_DATA)
+			xlogreader->read_page(xlogreader,
+								  xlogreader->loadPagePtr, xlogreader->loadLen,
+								  xlogreader->currRecPtr, xlogreader->readBuf,
+								  &xlogreader->readPageTLI);
+
 		ReadRecPtr = xlogreader->ReadRecPtr;
 		EndRecPtr = xlogreader->EndRecPtr;
 		if (record == NULL)
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 7d9438f0ea..05a57a1ebd 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -261,31 +261,56 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
 /*
  * Attempt to read an XLOG record.
  *
- * If RecPtr is valid, try to read a record at that position.  Otherwise
- * try to read a record just after the last one previously read.
+ * This function runs a state machine and may need to be called several
+ * times until a record is read.
  *
- * If the read_page callback fails to read the requested data, *record is set
- * to NULL and XLREAD_FAIL is returned.  The callback is expected to have
- * reported the error; errormsg is set to NULL.
+ * At the initial state, if called with valid pRecPtr, try to read a
+ * record at that position.  Otherwise try to read a record just after
+ * the last one previously read.
  *
- * If the reading fails for some other reason, *record is also set to NULL and
- * XLREAD_FAIL is returned. *errormsg is set to a string with details of the
- * failure.
+ * When a record is successfully read, XLREAD_SUCCESS is returned, the
+ * record is stored in *record, and the state is reset to initial state.
+ *
+ * Returns XLREAD_NEED_DATA if more data is needed.  In that case
+ * loadPagePtr and loadLen in state are set to describe the required
+ * WAL data.  The caller shall read the requested data into readBuf,
+ * set readLen to the length of the data actually read and
+ * readPageTLI to the TLI of the data read in, and then call this
+ * function again.  In case of failure, the caller shall set readLen
+ * to -1 to report the error and store an error message in
+ * errormsg_buf.
+ *
+ * If reading fails, *record is set to NULL and XLREAD_FAIL is
+ * returned.  *errormsg is set to a string with details of the
+ * failure, and the state is reset to initial state.
  *
  * The returned pointer (or *errormsg) points to an internal buffer that's
  * valid until the next call to XLogReadRecord.
  */
 XLogReadRecordResult
-XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
+XLogReadRecord(XLogReaderState *state, XLogRecPtr pRecPtr,
 			   XLogRecord **record, char **errormsg)
 {
-	XLogRecPtr	targetPagePtr;
-	bool		randAccess;
-	uint32		len,
-				total_len;
-	uint32		targetRecOff;
-	uint32		pageHeaderSize;
-	bool		gotheader;
+	/*
+	 * This function is a state machine that can exit and reenter at any place
+	 * marked as XLR_LEAVE. All internal state needs to be preserved across
+	 * multiple calls.
+	 */
+	static XLogRecPtr	targetPagePtr;
+	static bool			randAccess;
+	static uint32		len,
+						total_len;
+	static uint32		targetRecOff;
+	static uint32		pageHeaderSize;
+	static bool			gotheader;
+	static XLogRecPtr	RecPtr;
+
+#define XLR_STATE state->xlread_state
+#define XLR_INIT_STATE XLREAD_STATE_INIT
+
+	XLR_SWITCH();
+
+	RecPtr = pRecPtr;
 
 	/*
 	 * randAccess indicates whether to verify the previous-record pointer of
@@ -339,10 +364,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
 	 */
 	while (XLogNeedData(state, targetPagePtr,
 						Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)))
-		state->read_page(state, state->loadPagePtr, state->loadLen,
-						 state->currRecPtr, state->readBuf,
-						 &state->readPageTLI);
-	
+		XLR_LEAVE(XLREAD_STATE_READ_PAGE, XLREAD_NEED_DATA);
+
 	if (state->readLen < 0)
 		goto err;
 
@@ -421,10 +444,10 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
 	if (total_len > len)
 	{
 		/* Need to reassemble record */
-		char	   *contdata;
-		XLogPageHeader pageHeader;
-		char	   *buffer;
-		uint32		gotlen;
+		static char	   *contdata;
+		static XLogPageHeader pageHeader;
+		static char	   *buffer;
+		static uint32	gotlen;
 
 		/*
 		 * Enlarge readRecordBuf as needed.
@@ -454,9 +477,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
 			while (XLogNeedData(state, targetPagePtr,
 								Min(total_len - gotlen + SizeOfXLogShortPHD,
 									XLOG_BLCKSZ)))
-				state->read_page(state, state->loadPagePtr, state->loadLen,
-								 state->currRecPtr, state->readBuf,
-								 &state->readPageTLI);
+				XLR_LEAVE(XLREAD_STATE_READ_NEXT_PAGE, XLREAD_NEED_DATA);
 
 			if (state->readLen < 0)
 				goto err;
@@ -493,9 +514,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
 			if (state->readLen < pageHeaderSize)
 			{
 				while (XLogNeedData(state, targetPagePtr, pageHeaderSize))
-					state->read_page(state, state->loadPagePtr, state->loadLen,
-									 state->currRecPtr, state->readBuf,
-									 &state->readPageTLI);
+					XLR_LEAVE(XLREAD_STATE_READ_PAGE_HADER, XLREAD_NEED_DATA);
 			}
 
 			Assert(pageHeaderSize <= state->readLen);
@@ -508,9 +527,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
 			if (state->readLen < pageHeaderSize + len)
 			{
 				if (XLogNeedData(state, targetPagePtr, pageHeaderSize + len))
-					state->read_page(state, state->loadPagePtr, state->loadLen,
-									 state->currRecPtr, state->readBuf,
-									 &state->readPageTLI);
+					XLR_LEAVE(XLREAD_STATE_READ_CONTRECORD, XLREAD_NEED_DATA);
 			}
 
 			memcpy(buffer, (char *) contdata, len);
@@ -544,9 +561,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
 		/* Wait for the record data to become available */
 		while (XLogNeedData(state, targetPagePtr,
 							Min(targetRecOff + total_len, XLOG_BLCKSZ)))
-			state->read_page(state, state->loadPagePtr, state->loadLen,
-							 state->currRecPtr, state->readBuf,
-							 &state->readPageTLI);
+			XLR_LEAVE(XLREAD_STATE_READ_RECORD, XLREAD_NEED_DATA);
 
 		if (state->readLen < 0)
 			goto err;
@@ -560,6 +575,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
 		state->ReadRecPtr = RecPtr;
 	}
 
+	XLR_SWITCH_END();
+
 	/*
 	 * Special processing if it's an XLOG SWITCH record
 	 */
@@ -572,10 +589,10 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
 	}
 
 	if (DecodeXLogRecord(state, *record, errormsg))
-		return XLREAD_SUCCESS;
+		XLR_RETURN(XLREAD_SUCCESS);
 
 	*record = NULL;
-	return XLREAD_FAIL;
+	XLR_RETURN(XLREAD_FAIL);
 
 err:
 
@@ -589,7 +606,7 @@ err:
 		*errormsg = state->errormsg_buf;
 
 	*record = NULL;
-	return XLREAD_FAIL;
+	XLR_RETURN(XLREAD_FAIL);
 }
 
 /*
@@ -615,6 +632,8 @@ XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	static XLogSegNo	targetSegNo;
 	static XLogPageHeader hdr;
 
+#undef XLR_STATE
+#undef XLR_INIT_STATE
 #define XLR_STATE state->xlnd_state
 #define XLR_INIT_STATE XLND_STATE_INIT
 
@@ -994,6 +1013,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 	XLogRecPtr	found = InvalidXLogRecPtr;
 	XLogPageHeader header;
 	XLogRecord *record;
+	XLogReadRecordResult result;
 	char	   *errormsg;
 
 	Assert(!XLogRecPtrIsInvalid(RecPtr));
@@ -1082,8 +1102,17 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 	 * because either we're at the first record after the beginning of a page
 	 * or we just jumped over the remaining data of a continuation.
 	 */
-	while (XLogReadRecord(state, tmpRecPtr, &record, &errormsg) == XLREAD_SUCCESS)
+	while ((result = XLogReadRecord(state, tmpRecPtr, &record, &errormsg)) !=
+		   XLREAD_FAIL)
 	{
+		if (result == XLREAD_NEED_DATA)
+		{
+			state->read_page(state, state->loadPagePtr, state->loadLen,
+							 state->currRecPtr,	state->readBuf,
+							 &state->readPageTLI);
+			continue;
+		}
+
 		/* continue after the record */
 		tmpRecPtr = InvalidXLogRecPtr;
 
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index d86feb0a0e..1740753b76 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -481,7 +481,15 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 		char	   *err = NULL;
 
 		/* the read_page callback waits for new WAL */
-		XLogReadRecord(ctx->reader, startptr, &record, &err);
+		while (XLogReadRecord(ctx->reader, startptr, &record, &err) ==
+			   XLREAD_NEED_DATA)
+			ctx->reader->read_page(ctx->reader,
+								   ctx->reader->loadPagePtr,
+								   ctx->reader->loadLen,
+								   ctx->reader->currRecPtr,
+								   ctx->reader->readBuf,
+								   &ctx->reader->readPageTLI);
+
 		if (err)
 			elog(ERROR, "%s", err);
 		if (!record)
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 6fc78d7445..4d09255504 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -289,7 +289,15 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 			XLogRecord *record;
 			char	   *errm = NULL;
 
-			XLogReadRecord(ctx->reader, startptr, &record, &errm);
+			while (XLogReadRecord(ctx->reader, startptr, &record, &errm) ==
+				   XLREAD_NEED_DATA)
+				ctx->reader->read_page(ctx->reader,
+									   ctx->reader->loadPagePtr,
+									   ctx->reader->loadLen,
+									   ctx->reader->currRecPtr,
+									   ctx->reader->readBuf,
+									   &ctx->reader->readPageTLI);
+
 			if (errm)
 				elog(ERROR, "%s", errm);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d261b402eb..4a8952931d 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -433,7 +433,15 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 			 * Read records.  No changes are generated in fast_forward mode,
 			 * but snapbuilder/slot statuses are updated properly.
 			 */
-			XLogReadRecord(ctx->reader, startlsn, &record, &errm);
+			while (XLogReadRecord(ctx->reader, startlsn, &record, &errm) ==
+				   XLREAD_NEED_DATA)
+				ctx->reader->read_page(ctx->reader,
+									   ctx->reader->loadPagePtr,
+									   ctx->reader->loadLen,
+									   ctx->reader->currRecPtr,
+									   ctx->reader->readBuf,
+									   &ctx->reader->readPageTLI);
+
 			if (errm)
 				elog(ERROR, "%s", errm);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 7ca1536eaf..cb85ba3abf 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2821,7 +2821,15 @@ XLogSendLogical(void)
 	 */
 	WalSndCaughtUp = false;
 
-	XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &record, &errm);
+	while (XLogReadRecord(logical_decoding_ctx->reader,
+						  logical_startptr, &record, &errm) == XLREAD_NEED_DATA)
+		logical_decoding_ctx->reader->read_page(logical_decoding_ctx->reader,
+								   logical_decoding_ctx->reader->loadPagePtr,
+								   logical_decoding_ctx->reader->loadLen,
+								   logical_decoding_ctx->reader->currRecPtr,
+								   logical_decoding_ctx->reader->readBuf,
+								   &logical_decoding_ctx->reader->readPageTLI);
+
 	logical_startptr = InvalidXLogRecPtr;
 
 	/* xlog record was invalid */
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index b110559e63..21b638ed34 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -76,7 +76,14 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
 
 	do
 	{
-		XLogReadRecord(xlogreader, startpoint, &record, &errormsg);
+		while (XLogReadRecord(xlogreader, startpoint, &record, &errormsg) ==
+			   XLREAD_NEED_DATA)
+			xlogreader->read_page(xlogreader,
+								  xlogreader->loadPagePtr,
+								  xlogreader->loadLen,
+								  xlogreader->currRecPtr,
+								  xlogreader->readBuf,
+								  &xlogreader->readPageTLI);
 
 		if (record == NULL)
 		{
@@ -128,7 +135,15 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex)
 	if (xlogreader == NULL)
 		pg_fatal("out of memory");
 
-	XLogReadRecord(xlogreader, ptr, &record, &errormsg);
+	while (XLogReadRecord(xlogreader, ptr, &record, &errormsg) ==
+		   XLREAD_NEED_DATA)
+				xlogreader->read_page(xlogreader,
+									  xlogreader->loadPagePtr,
+									  xlogreader->loadLen,
+									  xlogreader->currRecPtr,
+									  xlogreader->readBuf,
+									  &xlogreader->readPageTLI);
+
 	if (record == NULL)
 	{
 		if (errormsg)
@@ -191,7 +206,14 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 	{
 		uint8		info;
 
-		XLogReadRecord(xlogreader, searchptr, &record, &errormsg);
+		while (XLogReadRecord(xlogreader, searchptr, &record, &errormsg) ==
+			   XLREAD_NEED_DATA)
+			xlogreader->read_page(xlogreader,
+								  xlogreader->loadPagePtr,
+								  xlogreader->loadLen,
+								  xlogreader->currRecPtr,
+								  xlogreader->readBuf,
+								  &xlogreader->readPageTLI);
 
 		if (record == NULL)
 		{
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 280e4754ca..acee7ae199 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -1125,7 +1125,16 @@ main(int argc, char **argv)
 	for (;;)
 	{
 		/* try to read the next record */
-		XLogReadRecord(xlogreader_state, first_record, &record, &errormsg);
+		while (XLogReadRecord(xlogreader_state,
+							  first_record, &record, &errormsg) ==
+			   XLREAD_NEED_DATA)
+			xlogreader_state->read_page(xlogreader_state,
+										xlogreader_state->loadPagePtr,
+										xlogreader_state->loadLen,
+										xlogreader_state->currRecPtr,
+										xlogreader_state->readBuf,
+										&xlogreader_state->readPageTLI);
+
 		if (!record)
 		{
 			if (!config.follow || private.endptr_reached)
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index c8bd7afebe..338dc2c14d 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -70,6 +70,7 @@ typedef struct
 typedef enum XLogReadRecordResult
 {
 	XLREAD_SUCCESS,		/* record is successfully read */
+	XLREAD_NEED_DATA,	/* need more data. see XLogReadRecord. */
 	XLREAD_FAIL			/* failed during reading a record */
 } XLogReadRecordResult;
 
@@ -82,6 +83,17 @@ typedef enum xlnd_stateid
 	XLND_STATE_PAGEFULLHEAD
 } xlnd_stateid;
 
+/* internal state of XLogReadRecord() */
+typedef enum xlread_stateid
+{
+	XLREAD_STATE_INIT,
+	XLREAD_STATE_READ_PAGE,
+	XLREAD_STATE_READ_NEXT_PAGE,
+	XLREAD_STATE_READ_PAGE_HADER,
+	XLREAD_STATE_READ_CONTRECORD,
+	XLREAD_STATE_READ_RECORD
+} xlread_stateid;
+
 struct XLogReaderState
 {
 	/* ----------------------------------------
@@ -217,12 +229,18 @@ struct XLogReaderState
 	/* Buffer to hold error message */
 	char	   *errormsg_buf;
 
-	/* Internal state of XLogNeedData */
+	/* Internal state of XLogNeedData and XLogReadRecord */
 	union
 	{
 		xlnd_stateid	c;
 		void 	 	   *j;
 	} xlnd_state;
+
+	union
+	{
+		xlread_stateid c;
+		void 		  *j;
+	} xlread_state;
 };
 
 /* Get a new XLogReader */
-- 
2.16.3

