From 949d47fe75f33a2da7cbd44c6fd3e7116381e0a6 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Fri, 24 May 2019 09:22:44 +0900
Subject: [PATCH 03/10] Change interface of XLogReadRecord

As a preparation to the second step, this patch changes the interface
of XLogReadRecord so that the function can request the callers for
reading more data.
---
 src/backend/access/transam/twophase.c          |  2 +-
 src/backend/access/transam/xlog.c              |  2 +-
 src/backend/access/transam/xlogreader.c        | 52 ++++++++++++++------------
 src/backend/replication/logical/logical.c      |  2 +-
 src/backend/replication/logical/logicalfuncs.c |  2 +-
 src/backend/replication/slotfuncs.c            |  2 +-
 src/backend/replication/walsender.c            |  2 +-
 src/bin/pg_rewind/parsexlog.c                  |  6 +--
 src/bin/pg_waldump/pg_waldump.c                |  2 +-
 src/include/access/xlogreader.h                | 13 ++++++-
 10 files changed, 49 insertions(+), 36 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 477709bbc2..653f685870 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1394,7 +1394,7 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
 				 errmsg("out of memory"),
 				 errdetail("Failed while allocating a WAL reading processor.")));
 
-	record = XLogReadRecord(xlogreader, lsn, &errormsg);
+	XLogReadRecord(xlogreader, lsn, &record, &errormsg);
 	if (record == NULL)
 		ereport(ERROR,
 				(errcode_for_file_access(),
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f517dec16c..27ab6cc815 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4261,7 +4261,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
 	{
 		char	   *errormsg;
 
-		record = XLogReadRecord(xlogreader, RecPtr, &errormsg);
+		XLogReadRecord(xlogreader, RecPtr, &record, &errormsg);
 		ReadRecPtr = xlogreader->ReadRecPtr;
 		EndRecPtr = xlogreader->EndRecPtr;
 		if (record == NULL)
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 45d201fc23..8bd0e2925d 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -285,20 +285,21 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
  * If RecPtr is valid, try to read a record at that position.  Otherwise
  * try to read a record just after the last one previously read.
  *
- * If the read_page callback fails to read the requested data, NULL is
- * returned.  The callback is expected to have reported the error; errormsg
- * is set to NULL.
+ * If the read_page callback fails to read the requested data, *record is set
+ * to NULL and XLREAD_FAIL is returned.  The callback is expected to have
+ * reported the error; errormsg is set to NULL.
  *
- * If the reading fails for some other reason, NULL is also returned, and
- * *errormsg is set to a string with details of the failure.
+ * If the reading fails for some other reason, *record is also set to NULL and
+ * XLREAD_FAIL is returned. *errormsg is set to a string with details of the
+ * failure.
  *
  * The returned pointer (or *errormsg) points to an internal buffer that's
  * valid until the next call to XLogReadRecord.
  */
-XLogRecord *
-XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
+XLogReadRecordResult
+XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
+			   XLogRecord **record, char **errormsg)
 {
-	XLogRecord *record;
 	XLogRecPtr	targetPagePtr;
 	bool		randAccess;
 	uint32		len,
@@ -405,8 +406,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	 * cannot access any other fields until we've verified that we got the
 	 * whole header.
 	 */
-	record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
-	total_len = record->xl_tot_len;
+	*record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
+	total_len = (*record)->xl_tot_len;
 
 	/*
 	 * If the whole record header is on this page, validate it immediately.
@@ -418,7 +419,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	 */
 	if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
 	{
-		if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
+		if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, *record,
 								   randAccess))
 			goto err;
 		gotheader = true;
@@ -540,9 +541,9 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 			/* If we just reassembled the record header, validate it. */
 			if (!gotheader)
 			{
-				record = (XLogRecord *) state->readRecordBuf;
+				*record = (XLogRecord *) state->readRecordBuf;
 				if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
-										   record, randAccess))
+										   *record, randAccess))
 					goto err;
 				gotheader = true;
 			}
@@ -550,8 +551,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 
 		Assert(gotheader);
 
-		record = (XLogRecord *) state->readRecordBuf;
-		if (!ValidXLogRecord(state, record, RecPtr))
+		*record = (XLogRecord *) state->readRecordBuf;
+		if (!ValidXLogRecord(state, *record, RecPtr))
 			goto err;
 
 		pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
@@ -572,7 +573,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 			goto err;
 
 		/* Record does not cross a page boundary */
-		if (!ValidXLogRecord(state, record, RecPtr))
+		if (!ValidXLogRecord(state, *record, RecPtr))
 			goto err;
 
 		state->EndRecPtr = RecPtr + MAXALIGN(total_len);
@@ -583,18 +584,19 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	/*
 	 * Special processing if it's an XLOG SWITCH record
 	 */
-	if (record->xl_rmid == RM_XLOG_ID &&
-		(record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
+	if ((*record)->xl_rmid == RM_XLOG_ID &&
+		((*record)->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
 	{
 		/* Pretend it extends to end of segment */
 		state->EndRecPtr += state->wal_segment_size - 1;
 		state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->wal_segment_size);
 	}
 
-	if (DecodeXLogRecord(state, record, errormsg))
-		return record;
-	else
-		return NULL;
+	if (DecodeXLogRecord(state, *record, errormsg))
+		return XLREAD_SUCCESS;
+
+	*record = NULL;
+	return XLREAD_FAIL;
 
 err:
 
@@ -607,7 +609,8 @@ err:
 	if (state->errormsg_buf[0] != '\0')
 		*errormsg = state->errormsg_buf;
 
-	return NULL;
+	*record = NULL;
+	return XLREAD_FAIL;
 }
 
 /*
@@ -1001,6 +1004,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 	XLogRecPtr	tmpRecPtr;
 	XLogRecPtr	found = InvalidXLogRecPtr;
 	XLogPageHeader header;
+	XLogRecord *record;
 	char	   *errormsg;
 
 	Assert(!XLogRecPtrIsInvalid(RecPtr));
@@ -1089,7 +1093,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 	 * because either we're at the first record after the beginning of a page
 	 * or we just jumped over the remaining data of a continuation.
 	 */
-	while (XLogReadRecord(state, tmpRecPtr, &errormsg) != NULL)
+	while (XLogReadRecord(state, tmpRecPtr, &record, &errormsg) == XLREAD_SUCCESS)
 	{
 		/* continue after the record */
 		tmpRecPtr = InvalidXLogRecPtr;
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 9853be6d1c..4f383721eb 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -481,7 +481,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 		char	   *err = NULL;
 
 		/* the read_page callback waits for new WAL */
-		record = XLogReadRecord(ctx->reader, startptr, &err);
+		XLogReadRecord(ctx->reader, startptr, &record, &err);
 		if (err)
 			elog(ERROR, "%s", err);
 		if (!record)
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 16c6095179..6fc78d7445 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -289,7 +289,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 			XLogRecord *record;
 			char	   *errm = NULL;
 
-			record = XLogReadRecord(ctx->reader, startptr, &errm);
+			XLogReadRecord(ctx->reader, startptr, &record, &errm);
 			if (errm)
 				elog(ERROR, "%s", errm);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 808a6f5b83..7db8e0d044 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -433,7 +433,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 			 * Read records.  No changes are generated in fast_forward mode,
 			 * but snapbuilder/slot statuses are updated properly.
 			 */
-			record = XLogReadRecord(ctx->reader, startlsn, &errm);
+			XLogReadRecord(ctx->reader, startlsn, &record, &errm);
 			if (errm)
 				elog(ERROR, "%s", errm);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 130656e5b1..4ba43592ca 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2827,7 +2827,7 @@ XLogSendLogical(void)
 	 */
 	WalSndCaughtUp = false;
 
-	record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
+	XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &record, &errm);
 	logical_startptr = InvalidXLogRecPtr;
 
 	/* xlog record was invalid */
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 0bfbea8f09..512005de1c 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -75,7 +75,7 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
 
 	do
 	{
-		record = XLogReadRecord(xlogreader, startpoint, &errormsg);
+		XLogReadRecord(xlogreader, startpoint, &record, &errormsg);
 
 		if (record == NULL)
 		{
@@ -127,7 +127,7 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex)
 	if (xlogreader == NULL)
 		pg_fatal("out of memory");
 
-	record = XLogReadRecord(xlogreader, ptr, &errormsg);
+	XLogReadRecord(xlogreader, ptr, &record, &errormsg);
 	if (record == NULL)
 	{
 		if (errormsg)
@@ -190,7 +190,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 	{
 		uint8		info;
 
-		record = XLogReadRecord(xlogreader, searchptr, &errormsg);
+		XLogReadRecord(xlogreader, searchptr, &record, &errormsg);
 
 		if (record == NULL)
 		{
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index f4dfdd12ca..41aa108215 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -1132,7 +1132,7 @@ main(int argc, char **argv)
 	for (;;)
 	{
 		/* try to read the next record */
-		record = XLogReadRecord(xlogreader_state, first_record, &errormsg);
+		XLogReadRecord(xlogreader_state, first_record, &record, &errormsg);
 		if (!record)
 		{
 			if (!config.follow || private.endptr_reached)
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index d3b3e4b7ba..0e734d27f1 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -66,6 +66,13 @@ typedef struct
 	uint16		data_bufsz;
 } DecodedBkpBlock;
 
+/* Return code from XLogReadRecord */
+typedef enum XLogReadRecordResult
+{
+	XLREAD_SUCCESS,		/* record is successfully read */
+	XLREAD_FAIL			/* failed during reading a record */
+} XLogReadRecordResult;
+
 /* internal state of XLogNeedData() */
 typedef enum xlnd_stateid
 {
@@ -220,8 +227,10 @@ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
 extern void XLogReaderFree(XLogReaderState *state);
 
 /* Read the next XLog record. Returns NULL on end-of-WAL or failure */
-extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
-										 XLogRecPtr recptr, char **errormsg);
+extern XLogReadRecordResult XLogReadRecord(XLogReaderState *state,
+										   XLogRecPtr recptr,
+										   XLogRecord **record,
+										   char **errormsg);
 
 /* Validate a page */
 extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
-- 
2.16.3

