From c3ac6cb8c4d4478053686d9485b74e912949c391 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Fri, 24 May 2019 09:33:29 +0900
Subject: [PATCH 04/10] Make XLogReadRecord a state machine

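This patch moves the call sites of the read_page callback out of
XLogReadRecord and into its callers by turning XLogReadRecord into a
state machine: it returns XLREAD_NEED_DATA when it needs more WAL, and
the caller reads the requested data and calls it again.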
---
 src/backend/access/transam/twophase.c          |   8 +-
 src/backend/access/transam/xlog.c              |   8 +-
 src/backend/access/transam/xlogreader.c        | 108 ++++++++++++++++---------
 src/backend/replication/logical/logical.c      |  10 ++-
 src/backend/replication/logical/logicalfuncs.c |  10 ++-
 src/backend/replication/slotfuncs.c            |  10 ++-
 src/backend/replication/walsender.c            |  10 ++-
 src/bin/pg_rewind/parsexlog.c                  |  28 ++++++-
 src/bin/pg_waldump/pg_waldump.c                |  11 ++-
 src/include/access/xlogreader.h                |  12 +++
 10 files changed, 165 insertions(+), 50 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 653f685870..6feca69126 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1394,7 +1394,13 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
 				 errmsg("out of memory"),
 				 errdetail("Failed while allocating a WAL reading processor.")));
 
-	XLogReadRecord(xlogreader, lsn, &record, &errormsg);
+	while (XLogReadRecord(xlogreader, lsn, &record, &errormsg) ==
+		   XLREAD_NEED_DATA)	/* read requested WAL and retry */
+		xlogreader->read_page(xlogreader,
+							  xlogreader->loadPagePtr, xlogreader->loadLen,
+							  xlogreader->currRecPtr, xlogreader->readBuf,
+							  &xlogreader->readPageTLI);
+
 	if (record == NULL)
 		ereport(ERROR,
 				(errcode_for_file_access(),
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 27ab6cc815..573b49050d 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4261,7 +4261,13 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
 	{
 		char	   *errormsg;
 
-		XLogReadRecord(xlogreader, RecPtr, &record, &errormsg);
+		while (XLogReadRecord(xlogreader, RecPtr, &record, &errormsg)
+			   == XLREAD_NEED_DATA)	/* read requested WAL and retry */
+			xlogreader->read_page(xlogreader,
+								  xlogreader->loadPagePtr, xlogreader->loadLen,
+								  xlogreader->currRecPtr, xlogreader->readBuf,
+								  &xlogreader->readPageTLI);
+
 		ReadRecPtr = xlogreader->ReadRecPtr;
 		EndRecPtr = xlogreader->EndRecPtr;
 		if (record == NULL)
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 8bd0e2925d..4ab0655af5 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -282,31 +282,57 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
 /*
  * Attempt to read an XLOG record.
  *
- * If RecPtr is valid, try to read a record at that position.  Otherwise
- * try to read a record just after the last one previously read.
+ * This function runs a state machine and may need to be called several
+ * times before a record is read.
  *
- * If the read_page callback fails to read the requested data, *record is set
- * to NULL and XLREAD_FAIL is returned.  The callback is expected to have
- * reported the error; errormsg is set to NULL.
+ * In the initial state, if called with a valid pRecPtr, try to read a record
+ * at that position.  If pRecPtr is invalid, try to read a record just after
+ * the last one previously read.
  *
- * If the reading fails for some other reason, *record is also set to NULL and
- * XLREAD_FAIL is returned. *errormsg is set to a string with details of the
- * failure.
+ * When a record is successfully read, XLREAD_SUCCESS is returned with the
+ * record stored in *record, and the state machine is reset to its initial
+ * state.
+ *
+ * XLREAD_NEED_DATA is returned when more WAL data must be supplied.  In that
+ * case state->loadPagePtr and state->loadLen describe the data required.  The
+ * caller shall read the requested data into readBuf, set readLen to the
+ * length of the data actually read and readPageTLI to the TLI of that data,
+ * and then call this function again.  If reading fails, the caller shall
+ * instead set readLen to -1 and store an error message in errormsg_buf
+ * before calling again.
+ *
+ * If reading fails for any reason, including the caller-side failure
+ * described above, XLREAD_FAIL is returned with *record set to NULL and
+ * *errormsg set to a string with details of the failure.  The state machine
+ * is reset to its initial state.
  *
  * The returned pointer (or *errormsg) points to an internal buffer that's
  * valid until the next call to XLogReadRecord.
+ *
+ * Note: This function is not reentrant; its state lives in static variables
+ * shared by all readers.  Never ereport() or otherwise exit non-locally here.
  */
 XLogReadRecordResult
-XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
+XLogReadRecord(XLogReaderState *state, XLogRecPtr pRecPtr,
 			   XLogRecord **record, char **errormsg)
 {
-	XLogRecPtr	targetPagePtr;
-	bool		randAccess;
-	uint32		len,
-				total_len;
-	uint32		targetRecOff;
-	uint32		pageHeaderSize;
-	bool		gotheader;
+	/*
+	 * This function is a state machine that can exit and reenter at any place
+	 * marked as XLR_LEAVE. All internal state needs to be preserved through
+	 * multiple calls.
+	 */
+	static XLogRecPtr	targetPagePtr;
+	static bool			randAccess;
+	static uint32		len,
+						total_len;
+	static uint32		targetRecOff;
+	static uint32		pageHeaderSize;
+	static bool			gotheader;
+	static XLogRecPtr	RecPtr;
+
+	XLR_SWITCH(XLREAD_STATE_INIT);
+
+	RecPtr = pRecPtr;
 
 	/*
 	 * randAccess indicates whether to verify the previous-record pointer of
@@ -360,10 +386,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
 	 */
 	while (XLogNeedData(state, targetPagePtr,
 						Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)))
-		state->read_page(state, state->loadPagePtr, state->loadLen,
-						 state->currRecPtr, state->readBuf,
-						 &state->readPageTLI);
-	
+		XLR_LEAVE(XLREAD_STATE_PAGE, XLREAD_NEED_DATA);
+
 	if (state->readLen < 0)
 		goto err;
 
@@ -442,10 +466,10 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
 	if (total_len > len)
 	{
 		/* Need to reassemble record */
-		char	   *contdata;
-		XLogPageHeader pageHeader;
-		char	   *buffer;
-		uint32		gotlen;
+		static char	   *contdata;
+		static XLogPageHeader pageHeader;
+		static char	   *buffer;
+		static uint32	gotlen;
 
 		/*
 		 * Enlarge readRecordBuf as needed.
@@ -475,9 +499,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
 			while (XLogNeedData(state, targetPagePtr,
 								Min(total_len - gotlen + SizeOfXLogShortPHD,
 									XLOG_BLCKSZ)))
-				state->read_page(state, state->loadPagePtr, state->loadLen,
-								 state->currRecPtr, state->readBuf,
-								 &state->readPageTLI);
+				XLR_LEAVE(XLREAD_STATE_CONTPAGE, XLREAD_NEED_DATA);
 
 			if (state->readLen < 0)
 				goto err;
@@ -514,9 +536,10 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
 			if (state->readLen < pageHeaderSize)
 			{
 				while (XLogNeedData(state, targetPagePtr, pageHeaderSize))
-					state->read_page(state, state->loadPagePtr, state->loadLen,
-									 state->currRecPtr, state->readBuf,
-									 &state->readPageTLI);
+					XLR_LEAVE(XLREAD_STATE_CONTPAGE_HEADER, XLREAD_NEED_DATA);
+
+				if (state->readLen < 0)
+					goto err;
 			}
 
 			Assert(pageHeaderSize <= state->readLen);
@@ -529,9 +552,10 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
 			if (state->readLen < pageHeaderSize + len)
 			{
-				if (XLogNeedData(state, targetPagePtr, pageHeaderSize + len))
-					state->read_page(state, state->loadPagePtr, state->loadLen,
-									 state->currRecPtr, state->readBuf,
-									 &state->readPageTLI);
+				while (XLogNeedData(state, targetPagePtr, pageHeaderSize + len))
+					XLR_LEAVE(XLREAD_STATE_CONTRECORD, XLREAD_NEED_DATA);
+
+				if (state->readLen < 0)
+					goto err;
 			}
 
 			memcpy(buffer, (char *) contdata, len);
@@ -565,9 +589,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
 		/* Wait for the record data to become available */
 		while (XLogNeedData(state, targetPagePtr,
 							Min(targetRecOff + total_len, XLOG_BLCKSZ)))
-			state->read_page(state, state->loadPagePtr, state->loadLen,
-							 state->currRecPtr, state->readBuf,
-							 &state->readPageTLI);
+			XLR_LEAVE(XLREAD_STATE_RECORD, XLREAD_NEED_DATA);
 
 		if (state->readLen < 0)
 			goto err;
@@ -581,6 +603,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
 		state->ReadRecPtr = RecPtr;
 	}
 
+	XLR_SWITCH_END();
+
 	/*
 	 * Special processing if it's an XLOG SWITCH record
 	 */
@@ -593,10 +617,10 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
 	}
 
 	if (DecodeXLogRecord(state, *record, errormsg))
-		return XLREAD_SUCCESS;
+		XLR_RETURN(XLREAD_SUCCESS);
 
 	*record = NULL;
-	return XLREAD_FAIL;
+	XLR_RETURN(XLREAD_FAIL);
 
 err:
 
@@ -610,7 +634,7 @@ err:
 		*errormsg = state->errormsg_buf;
 
 	*record = NULL;
-	return XLREAD_FAIL;
+	XLR_RETURN(XLREAD_FAIL);
 }
 
 /*
@@ -1005,6 +1029,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 	XLogRecPtr	found = InvalidXLogRecPtr;
 	XLogPageHeader header;
 	XLogRecord *record;
+	XLogReadRecordResult result;
 	char	   *errormsg;
 
 	Assert(!XLogRecPtrIsInvalid(RecPtr));
@@ -1093,8 +1118,18 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 	 * because either we're at the first record after the beginning of a page
 	 * or we just jumped over the remaining data of a continuation.
 	 */
-	while (XLogReadRecord(state, tmpRecPtr, &record, &errormsg) == XLREAD_SUCCESS)
+	while ((result = XLogReadRecord(state, tmpRecPtr, &record, &errormsg)) !=
+		   XLREAD_FAIL)
 	{
+		if (result == XLREAD_NEED_DATA)
+		{
+			/* read the WAL data requested by the reader, then retry */
+			state->read_page(state, state->loadPagePtr, state->loadLen,
+							 state->currRecPtr, state->readBuf,
+							 &state->readPageTLI);
+			continue;
+		}
+
 		/* continue after the record */
 		tmpRecPtr = InvalidXLogRecPtr;
 
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 4f383721eb..06200ea2e9 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -481,7 +481,15 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 		char	   *err = NULL;
 
 		/* the read_page callback waits for new WAL */
-		XLogReadRecord(ctx->reader, startptr, &record, &err);
+		while (XLogReadRecord(ctx->reader, startptr, &record, &err) ==
+			   XLREAD_NEED_DATA)	/* read requested WAL and retry */
+			ctx->reader->read_page(ctx->reader,
+								   ctx->reader->loadPagePtr,
+								   ctx->reader->loadLen,
+								   ctx->reader->currRecPtr,
+								   ctx->reader->readBuf,
+								   &ctx->reader->readPageTLI);
+
 		if (err)
 			elog(ERROR, "%s", err);
 		if (!record)
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 6fc78d7445..4d09255504 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -289,7 +289,15 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 			XLogRecord *record;
 			char	   *errm = NULL;
 
-			XLogReadRecord(ctx->reader, startptr, &record, &errm);
+			while (XLogReadRecord(ctx->reader, startptr, &record, &errm) ==
+				   XLREAD_NEED_DATA)	/* read requested WAL and retry */
+				ctx->reader->read_page(ctx->reader,
+									   ctx->reader->loadPagePtr,
+									   ctx->reader->loadLen,
+									   ctx->reader->currRecPtr,
+									   ctx->reader->readBuf,
+									   &ctx->reader->readPageTLI);
+
 			if (errm)
 				elog(ERROR, "%s", errm);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 7db8e0d044..f4f4a907ad 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -433,7 +433,15 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 			 * Read records.  No changes are generated in fast_forward mode,
 			 * but snapbuilder/slot statuses are updated properly.
 			 */
-			XLogReadRecord(ctx->reader, startlsn, &record, &errm);
+			while (XLogReadRecord(ctx->reader, startlsn, &record, &errm) ==
+				   XLREAD_NEED_DATA)	/* read requested WAL and retry */
+				ctx->reader->read_page(ctx->reader,
+									   ctx->reader->loadPagePtr,
+									   ctx->reader->loadLen,
+									   ctx->reader->currRecPtr,
+									   ctx->reader->readBuf,
+									   &ctx->reader->readPageTLI);
+
 			if (errm)
 				elog(ERROR, "%s", errm);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 4ba43592ca..36e14ab822 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2827,7 +2827,17 @@ XLogSendLogical(void)
 	 */
 	WalSndCaughtUp = false;
 
-	XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &record, &errm);
+	while (XLogReadRecord(logical_decoding_ctx->reader,
+						  logical_startptr, &record, &errm) == XLREAD_NEED_DATA)
+	{
+		XLogReaderState *reader = logical_decoding_ctx->reader;
+
+		/* read the WAL data requested by the reader, then retry */
+		reader->read_page(reader, reader->loadPagePtr, reader->loadLen,
+						  reader->currRecPtr, reader->readBuf,
+						  &reader->readPageTLI);
+	}
+
 	logical_startptr = InvalidXLogRecPtr;
 
 	/* xlog record was invalid */
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 512005de1c..e26127206c 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -75,7 +75,14 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
 
 	do
 	{
-		XLogReadRecord(xlogreader, startpoint, &record, &errormsg);
+		while (XLogReadRecord(xlogreader, startpoint, &record, &errormsg) ==
+			   XLREAD_NEED_DATA)	/* read requested WAL and retry */
+			xlogreader->read_page(xlogreader,
+								  xlogreader->loadPagePtr,
+								  xlogreader->loadLen,
+								  xlogreader->currRecPtr,
+								  xlogreader->readBuf,
+								  &xlogreader->readPageTLI);
 
 		if (record == NULL)
 		{
@@ -127,7 +134,15 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex)
 	if (xlogreader == NULL)
 		pg_fatal("out of memory");
 
-	XLogReadRecord(xlogreader, ptr, &record, &errormsg);
+	while (XLogReadRecord(xlogreader, ptr, &record, &errormsg) ==
+		   XLREAD_NEED_DATA)	/* read requested WAL and retry */
+		xlogreader->read_page(xlogreader,
+							  xlogreader->loadPagePtr,
+							  xlogreader->loadLen,
+							  xlogreader->currRecPtr,
+							  xlogreader->readBuf,
+							  &xlogreader->readPageTLI);
+
 	if (record == NULL)
 	{
 		if (errormsg)
@@ -190,7 +205,14 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 	{
 		uint8		info;
 
-		XLogReadRecord(xlogreader, searchptr, &record, &errormsg);
+		while (XLogReadRecord(xlogreader, searchptr, &record, &errormsg) ==
+			   XLREAD_NEED_DATA)	/* read requested WAL and retry */
+			xlogreader->read_page(xlogreader,
+								  xlogreader->loadPagePtr,
+								  xlogreader->loadLen,
+								  xlogreader->currRecPtr,
+								  xlogreader->readBuf,
+								  &xlogreader->readPageTLI);
 
 		if (record == NULL)
 		{
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 41aa108215..e2e93f144a 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -1132,7 +1132,16 @@ main(int argc, char **argv)
 	for (;;)
 	{
 		/* try to read the next record */
-		XLogReadRecord(xlogreader_state, first_record, &record, &errormsg);
+		while (XLogReadRecord(xlogreader_state,
+							  first_record, &record, &errormsg) ==
+			   XLREAD_NEED_DATA)	/* read requested WAL and retry */
+			xlogreader_state->read_page(xlogreader_state,
+										xlogreader_state->loadPagePtr,
+										xlogreader_state->loadLen,
+										xlogreader_state->currRecPtr,
+										xlogreader_state->readBuf,
+										&xlogreader_state->readPageTLI);
+
 		if (!record)
 		{
 			if (!config.follow || private.endptr_reached)
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 0e734d27f1..bc0c642906 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -70,6 +70,7 @@ typedef struct
 typedef enum XLogReadRecordResult
 {
 	XLREAD_SUCCESS,		/* record is successfully read */
+	XLREAD_NEED_DATA,	/* more WAL data needed; see XLogReadRecord() */
 	XLREAD_FAIL			/* failed during reading a record */
 } XLogReadRecordResult;
 
@@ -82,6 +83,17 @@ typedef enum xlnd_stateid
 	XLND_STATE_PAGELONGHEADER
 } xlnd_stateid;
 
+/* resume points (internal states) of the XLogReadRecord() state machine */
+typedef enum xlread_stateid
+{
+	XLREAD_STATE_INIT,				/* initial state, between records */
+	XLREAD_STATE_PAGE,				/* waiting for the record header */
+	XLREAD_STATE_CONTPAGE,			/* waiting for a continuation page */
+	XLREAD_STATE_CONTPAGE_HEADER,	/* waiting for continuation page header */
+	XLREAD_STATE_CONTRECORD,		/* waiting for continuation record data */
+	XLREAD_STATE_RECORD				/* waiting for single-page record data */
+} xlread_stateid;
+
 struct XLogReaderState
 {
 	/* ----------------------------------------
-- 
2.16.3

