From 2b7e89d347c154a2bdb2e07b8c962df5e1a70e25 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 18 Apr 2019 15:35:10 +0900
Subject: [PATCH 07/10] Make logical rep stuff not use callback but call the
 function directly

This is a bit different from the two before. This patch moves the
callback from XLogReaderState to LogicalDecodingContext. Then
invalidate the parameters callback and private for XLogReaderAllocate.
---
 src/backend/replication/logical/logical.c      | 19 +++++++++----------
 src/backend/replication/logical/logicalfuncs.c | 19 +++++--------------
 src/backend/replication/slotfuncs.c            |  9 +++------
 src/backend/replication/walsender.c            | 17 +++++++++--------
 src/include/replication/logical.h              | 13 +++++++++----
 src/include/replication/logicalfuncs.h         |  5 +----
 6 files changed, 36 insertions(+), 46 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1740753b76..6dc87207b0 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -123,7 +123,7 @@ StartupDecodingContext(List *output_plugin_options,
 					   TransactionId xmin_horizon,
 					   bool need_full_snapshot,
 					   bool fast_forward,
-					   XLogPageReadCB read_page,
+					   LogicalDecodingXLogReadPageCB read_page,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
 					   LogicalOutputPluginWriterWrite do_write,
 					   LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -172,12 +172,14 @@ StartupDecodingContext(List *output_plugin_options,
 
 	ctx->slot = slot;
 
-	ctx->reader = XLogReaderAllocate(wal_segment_size, read_page, ctx);
+	ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, NULL);
 	if (!ctx->reader)
 		ereport(ERROR,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
 				 errmsg("out of memory")));
 
+	ctx->read_page = read_page;
+
 	ctx->reorder = ReorderBufferAllocate();
 	ctx->snapshot_builder =
 		AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
@@ -231,7 +233,7 @@ CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  bool need_full_snapshot,
 						  XLogRecPtr restart_lsn,
-						  XLogPageReadCB read_page,
+						  LogicalDecodingXLogReadPageCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
 						  LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -373,7 +375,7 @@ LogicalDecodingContext *
 CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  bool fast_forward,
-					  XLogPageReadCB read_page,
+					  LogicalDecodingXLogReadPageCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write,
 					  LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -474,6 +476,8 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 		 (uint32) (slot->data.restart_lsn >> 32),
 		 (uint32) slot->data.restart_lsn);
 
+	XLREAD_RESET(ctx->reader);
+
 	/* Wait for a consistent starting point */
 	for (;;)
 	{
@@ -483,12 +487,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 		/* the read_page callback waits for new WAL */
 		while (XLogReadRecord(ctx->reader, startptr, &record, &err) ==
 			   XLREAD_NEED_DATA)
-			ctx->reader->read_page(ctx->reader,
-								   ctx->reader->loadPagePtr,
-								   ctx->reader->loadLen,
-								   ctx->reader->currRecPtr,
-								   ctx->reader->readBuf,
-								   &ctx->reader->readPageTLI);
+			ctx->read_page(ctx);
 
 		if (err)
 			elog(ERROR, "%s", err);
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 240a375d8f..0c4bc62cfa 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -115,15 +115,9 @@ check_permissions(void)
 }
 
 void
-logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
-							 int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
+logical_read_local_xlog_page(LogicalDecodingContext *ctx)
 {
-	Assert(targetPagePtr == state->loadPagePtr &&
-		   reqLen == state->loadLen &&
-		   targetRecPtr == state->currRecPtr &&
-		   cur_page == state->readBuf &&
-		   pageTLI == &state->readPageTLI);
-	read_local_xlog_page(state);
+	read_local_xlog_page(ctx->reader);
 }
 
 /*
@@ -286,6 +280,8 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 		/* invalidate non-timetravel entries */
 		InvalidateSystemCaches();
 
+		XLREAD_RESET(ctx->reader);
+
 		/* Decode until we run out of records */
 		while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
 			   (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
@@ -295,12 +291,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 
 			while (XLogReadRecord(ctx->reader, startptr, &record, &errm) ==
 				   XLREAD_NEED_DATA)
-				ctx->reader->read_page(ctx->reader,
-									   ctx->reader->loadPagePtr,
-									   ctx->reader->loadLen,
-									   ctx->reader->currRecPtr,
-									   ctx->reader->readBuf,
-									   &ctx->reader->readPageTLI);
+				ctx->read_page(ctx);
 
 			if (errm)
 				elog(ERROR, "%s", errm);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 4a8952931d..afdd3dea37 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -420,6 +420,8 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 		/* invalidate non-timetravel entries */
 		InvalidateSystemCaches();
 
+		XLREAD_RESET(ctx->reader);
+
 		/* Decode at least one record, until we run out of records */
 		while ((!XLogRecPtrIsInvalid(startlsn) &&
 				startlsn < moveto) ||
@@ -435,12 +437,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 			 */
 			while (XLogReadRecord(ctx->reader, startlsn, &record, &errm) ==
 				   XLREAD_NEED_DATA)
-				ctx->reader->read_page(ctx->reader,
-									   ctx->reader->loadPagePtr,
-									   ctx->reader->loadLen,
-									   ctx->reader->currRecPtr,
-									   ctx->reader->readBuf,
-									   &ctx->reader->readPageTLI);
+				ctx->read_page(ctx);
 
 			if (errm)
 				elog(ERROR, "%s", errm);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index cb85ba3abf..5ac3ef4c66 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -762,9 +762,13 @@ StartReplication(StartReplicationCmd *cmd)
  * set every time WAL is flushed.
  */
 static void
-logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
-					   XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
+logical_read_xlog_page(LogicalDecodingContext *ctx)
 {
+	XLogReaderState *state = ctx->reader;
+	XLogRecPtr		targetPagePtr = state->loadPagePtr;
+	int				reqLen		  = state->loadLen;
+	char		   *cur_page	  = state->readBuf;
+
 	XLogRecPtr	flushptr;
 	int			count;
 
@@ -2821,14 +2825,11 @@ XLogSendLogical(void)
 	 */
 	WalSndCaughtUp = false;
 
+	XLREAD_RESET(logical_decoding_ctx->reader);
+
 	while (XLogReadRecord(logical_decoding_ctx->reader,
 						  logical_startptr, &record, &errm) == XLREAD_NEED_DATA)
-		logical_decoding_ctx->reader->read_page(logical_decoding_ctx->reader,
-								   logical_decoding_ctx->reader->loadPagePtr,
-								   logical_decoding_ctx->reader->loadLen,
-								   logical_decoding_ctx->reader->currRecPtr,
-								   logical_decoding_ctx->reader->readBuf,
-								   &logical_decoding_ctx->reader->readPageTLI);
+		logical_decoding_ctx->read_page(logical_decoding_ctx);
 
 	logical_startptr = InvalidXLogRecPtr;
 
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 0a2a63a48c..70339eb8be 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -32,7 +32,11 @@ typedef void (*LogicalOutputPluginWriterUpdateProgress) (
 														 TransactionId xid
 );
 
-typedef struct LogicalDecodingContext
+typedef struct LogicalDecodingContext LogicalDecodingContext;
+
+typedef void (*LogicalDecodingXLogReadPageCB)(LogicalDecodingContext *ctx);
+
+struct LogicalDecodingContext
 {
 	/* memory context this is all allocated in */
 	MemoryContext context;
@@ -42,6 +46,7 @@ typedef struct LogicalDecodingContext
 
 	/* infrastructure pieces for decoding */
 	XLogReaderState *reader;
+	LogicalDecodingXLogReadPageCB read_page;
 	struct ReorderBuffer *reorder;
 	struct SnapBuild *snapshot_builder;
 
@@ -89,7 +94,7 @@ typedef struct LogicalDecodingContext
 	bool		prepared_write;
 	XLogRecPtr	write_location;
 	TransactionId write_xid;
-} LogicalDecodingContext;
+};
 
 
 extern void CheckLogicalDecodingRequirements(void);
@@ -98,7 +103,7 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  bool need_full_snapshot,
 						  XLogRecPtr restart_lsn,
-						  XLogPageReadCB read_page,
+						  LogicalDecodingXLogReadPageCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
 						  LogicalOutputPluginWriterUpdateProgress update_progress);
@@ -106,7 +111,7 @@ extern LogicalDecodingContext *CreateDecodingContext(
 					  XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  bool fast_forward,
-					  XLogPageReadCB read_page,
+					  LogicalDecodingXLogReadPageCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write,
 					  LogicalOutputPluginWriterUpdateProgress update_progress);
diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h
index 3d00dee067..04a9fe10fa 100644
--- a/src/include/replication/logicalfuncs.h
+++ b/src/include/replication/logicalfuncs.h
@@ -11,9 +11,6 @@
 
 #include "replication/logical.h"
 
-extern void logical_read_local_xlog_page(XLogReaderState *state,
-							 XLogRecPtr targetPagePtr,
-							 int reqLen, XLogRecPtr targetRecPtr,
-							 char *cur_page, TimeLineID *pageTLI);
+extern void logical_read_local_xlog_page(LogicalDecodingContext *ctx);
 
 #endif
-- 
2.16.3

