From fa8ab3e2455ced1975199ba512f7a4e73b4bb23e Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Wed, 17 Apr 2019 09:57:07 +0900
Subject: [PATCH 03/10] Change interface of XLogReadRecord

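In preparation for the second step, this patch changes the interface
of XLogReadRecord so that the function can ask its caller to read more
data.  The record is now returned through a new output parameter, and
the return value (XLREAD_SUCCESS or XLREAD_FAIL) reports the outcome.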
---
 src/backend/access/transam/twophase.c          |  2 +-
 src/backend/access/transam/xlog.c              |  2 +-
 src/backend/access/transam/xlogreader.c        | 52 ++++++++++++++------------
 src/backend/replication/logical/logical.c      |  2 +-
 src/backend/replication/logical/logicalfuncs.c |  2 +-
 src/backend/replication/slotfuncs.c            |  2 +-
 src/backend/replication/walsender.c            |  2 +-
 src/bin/pg_rewind/parsexlog.c                  |  6 +--
 src/bin/pg_waldump/pg_waldump.c                |  2 +-
 src/include/access/xlogreader.h                | 11 +++++-
 10 files changed, 47 insertions(+), 36 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index a399c0052d..5dba27e5dd 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1394,7 +1394,7 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
 				 errmsg("out of memory"),
 				 errdetail("Failed while allocating a WAL reading processor.")));
 
-	record = XLogReadRecord(xlogreader, lsn, &errormsg);
+	XLogReadRecord(xlogreader, lsn, &record, &errormsg);
 	if (record == NULL)
 		ereport(ERROR,
 				(errcode_for_file_access(),
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 2130671d36..b6cb4111b8 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4262,7 +4262,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
 	{
 		char	   *errormsg;
 
-		record = XLogReadRecord(xlogreader, RecPtr, &errormsg);
+		XLogReadRecord(xlogreader, RecPtr, &record, &errormsg);
 		ReadRecPtr = xlogreader->ReadRecPtr;
 		EndRecPtr = xlogreader->EndRecPtr;
 		if (record == NULL)
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 311606c5cb..2c6109a254 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -254,20 +254,21 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
  * If RecPtr is valid, try to read a record at that position.  Otherwise
  * try to read a record just after the last one previously read.
  *
- * If the read_page callback fails to read the requested data, NULL is
- * returned.  The callback is expected to have reported the error; errormsg
- * is set to NULL.
+ * If the read_page callback fails to read the requested data, *record is set
+ * to NULL and XLREAD_FAIL is returned.  The callback is expected to have
+ * reported the error; *errormsg is set to NULL.
  *
- * If the reading fails for some other reason, NULL is also returned, and
- * *errormsg is set to a string with details of the failure.
+ * If the reading fails for some other reason, *record is also set to NULL and
+ * XLREAD_FAIL is returned.  *errormsg is set to a string with details of the
+ * failure.
  *
  * The returned pointer (or *errormsg) points to an internal buffer that's
  * valid until the next call to XLogReadRecord.
  */
-XLogRecord *
-XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
+XLogReadRecordResult
+XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
+			   XLogRecord **record, char **errormsg)
 {
-	XLogRecord *record;
 	XLogRecPtr	targetPagePtr;
 	bool		randAccess;
 	uint32		len,
@@ -374,8 +375,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	 * cannot access any other fields until we've verified that we got the
 	 * whole header.
 	 */
-	record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
-	total_len = record->xl_tot_len;
+	*record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
+	total_len = (*record)->xl_tot_len;
 
 	/*
 	 * If the whole record header is on this page, validate it immediately.
@@ -387,7 +388,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	 */
 	if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
 	{
-		if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
+		if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, *record,
 								   randAccess))
 			goto err;
 		gotheader = true;
@@ -509,9 +510,9 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 			/* If we just reassembled the record header, validate it. */
 			if (!gotheader)
 			{
-				record = (XLogRecord *) state->readRecordBuf;
+				*record = (XLogRecord *) state->readRecordBuf;
 				if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
-										   record, randAccess))
+										   *record, randAccess))
 					goto err;
 				gotheader = true;
 			}
@@ -519,8 +520,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 
 		Assert(gotheader);
 
-		record = (XLogRecord *) state->readRecordBuf;
-		if (!ValidXLogRecord(state, record, RecPtr))
+		*record = (XLogRecord *) state->readRecordBuf;
+		if (!ValidXLogRecord(state, *record, RecPtr))
 			goto err;
 
 		pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
@@ -541,7 +542,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 			goto err;
 
 		/* Record does not cross a page boundary */
-		if (!ValidXLogRecord(state, record, RecPtr))
+		if (!ValidXLogRecord(state, *record, RecPtr))
 			goto err;
 
 		state->EndRecPtr = RecPtr + MAXALIGN(total_len);
@@ -552,18 +553,19 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	/*
 	 * Special processing if it's an XLOG SWITCH record
 	 */
-	if (record->xl_rmid == RM_XLOG_ID &&
-		(record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
+	if ((*record)->xl_rmid == RM_XLOG_ID &&
+		((*record)->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
 	{
 		/* Pretend it extends to end of segment */
 		state->EndRecPtr += state->wal_segment_size - 1;
 		state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->wal_segment_size);
 	}
 
-	if (DecodeXLogRecord(state, record, errormsg))
-		return record;
-	else
-		return NULL;
+	if (DecodeXLogRecord(state, *record, errormsg))
+		return XLREAD_SUCCESS;
+
+	*record = NULL;
+	return XLREAD_FAIL;
 
 err:
 
@@ -576,7 +578,8 @@ err:
 	if (state->errormsg_buf[0] != '\0')
 		*errormsg = state->errormsg_buf;
 
-	return NULL;
+	*record = NULL;
+	return XLREAD_FAIL;
 }
 
 /*
@@ -980,6 +983,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 	XLogRecPtr	tmpRecPtr;
 	XLogRecPtr	found = InvalidXLogRecPtr;
 	XLogPageHeader header;
+	XLogRecord *record;
 	char	   *errormsg;
 
 	Assert(!XLogRecPtrIsInvalid(RecPtr));
@@ -1068,7 +1072,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 	 * because either we're at the first record after the beginning of a page
 	 * or we just jumped over the remaining data of a continuation.
 	 */
-	while (XLogReadRecord(state, tmpRecPtr, &errormsg) != NULL)
+	while (XLogReadRecord(state, tmpRecPtr, &record, &errormsg) == XLREAD_SUCCESS)
 	{
 		/* continue after the record */
 		tmpRecPtr = InvalidXLogRecPtr;
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index acb4d9a106..d86feb0a0e 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -481,7 +481,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 		char	   *err = NULL;
 
 		/* the read_page callback waits for new WAL */
-		record = XLogReadRecord(ctx->reader, startptr, &err);
+		XLogReadRecord(ctx->reader, startptr, &record, &err);
 		if (err)
 			elog(ERROR, "%s", err);
 		if (!record)
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 16c6095179..6fc78d7445 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -289,7 +289,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 			XLogRecord *record;
 			char	   *errm = NULL;
 
-			record = XLogReadRecord(ctx->reader, startptr, &errm);
+			XLogReadRecord(ctx->reader, startptr, &record, &errm);
 			if (errm)
 				elog(ERROR, "%s", errm);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 182fe5bc82..d261b402eb 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -433,7 +433,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 			 * Read records.  No changes are generated in fast_forward mode,
 			 * but snapbuilder/slot statuses are updated properly.
 			 */
-			record = XLogReadRecord(ctx->reader, startlsn, &errm);
+			XLogReadRecord(ctx->reader, startlsn, &record, &errm);
 			if (errm)
 				elog(ERROR, "%s", errm);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1ef952b39a..7ca1536eaf 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2821,7 +2821,7 @@ XLogSendLogical(void)
 	 */
 	WalSndCaughtUp = false;
 
-	record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
+	XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &record, &errm);
 	logical_startptr = InvalidXLogRecPtr;
 
 	/* xlog record was invalid */
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index c3b9e738f7..b110559e63 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -76,7 +76,7 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
 
 	do
 	{
-		record = XLogReadRecord(xlogreader, startpoint, &errormsg);
+		XLogReadRecord(xlogreader, startpoint, &record, &errormsg);
 
 		if (record == NULL)
 		{
@@ -128,7 +128,7 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex)
 	if (xlogreader == NULL)
 		pg_fatal("out of memory");
 
-	record = XLogReadRecord(xlogreader, ptr, &errormsg);
+	XLogReadRecord(xlogreader, ptr, &record, &errormsg);
 	if (record == NULL)
 	{
 		if (errormsg)
@@ -191,7 +191,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 	{
 		uint8		info;
 
-		record = XLogReadRecord(xlogreader, searchptr, &errormsg);
+		XLogReadRecord(xlogreader, searchptr, &record, &errormsg);
 
 		if (record == NULL)
 		{
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 9ad70a2f5c..280e4754ca 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -1125,7 +1125,7 @@ main(int argc, char **argv)
 	for (;;)
 	{
 		/* try to read the next record */
-		record = XLogReadRecord(xlogreader_state, first_record, &errormsg);
+		XLogReadRecord(xlogreader_state, first_record, &record, &errormsg);
 		if (!record)
 		{
 			if (!config.follow || private.endptr_reached)
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index f23d68c030..17ee30fa36 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -66,6 +66,13 @@ typedef struct
 	uint16		data_bufsz;
 } DecodedBkpBlock;
 
+/* Result code of XLogReadRecord(); *record is set to NULL on XLREAD_FAIL */
+typedef enum XLogReadRecordResult
+{
+	XLREAD_SUCCESS,		/* a record was read successfully */
+	XLREAD_FAIL			/* failed to read a record */
+} XLogReadRecordResult;
+
 /* internal state of XLogNeedData() */
 typedef enum xlnd_stateid
 {
@@ -227,8 +234,8 @@ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
 extern void XLogReaderFree(XLogReaderState *state);
 
 /* Read the next XLog record. Returns NULL on end-of-WAL or failure */
-extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
-			   XLogRecPtr recptr, char **errormsg);
+extern XLogReadRecordResult XLogReadRecord(XLogReaderState *state,
+					XLogRecPtr recptr, XLogRecord **record, char **errormsg);
 
 /* Validate a page */
 extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
-- 
2.16.3

