From 43de23ddafb17950add6ffff585e501e5ce4c54b Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Mon, 5 Sep 2016 22:19:03 +0800
Subject: [PATCH 2/2] Second part of Vladimir Gordiychuk's patch

This component allows streaming to be interrupted during ReorderBufferCommit.

TODO: explain, write up, document
---
 src/backend/replication/logical/logical.c       | 15 ++++++----
 src/backend/replication/logical/logicalfuncs.c  | 15 +++++++++-
 src/backend/replication/logical/reorderbuffer.c |  9 ++++--
 src/backend/replication/slotfuncs.c             |  2 +-
 src/backend/replication/walsender.c             | 39 +++++++++++++------------
 src/include/replication/logical.h               | 13 +++++++--
 src/include/replication/reorderbuffer.h         |  8 +++++
 7 files changed, 70 insertions(+), 31 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1512be5..818b7cf 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -116,7 +116,8 @@ StartupDecodingContext(List *output_plugin_options,
 					   TransactionId xmin_horizon,
 					   XLogPageReadCB read_page,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
-					   LogicalOutputPluginWriterWrite do_write)
+					   LogicalOutputPluginWriterWrite do_write,
+					   ReorderBufferIsActive is_active)
 {
 	ReplicationSlot *slot;
 	MemoryContext context,
@@ -180,6 +181,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_change = change_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
+	ctx->reorder->is_active = is_active;
 
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
@@ -212,7 +214,8 @@ CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write)
+						  LogicalOutputPluginWriterWrite do_write,
+						  LogicalDecondingContextIsActive is_active)
 {
 	TransactionId xmin_horizon = InvalidTransactionId;
 	ReplicationSlot *slot;
@@ -288,7 +291,7 @@ CreateInitDecodingContext(char *plugin,
 	ReplicationSlotSave();
 
 	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
-								 read_page, prepare_write, do_write);
+								 read_page, prepare_write, do_write, is_active);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -328,7 +331,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write)
+					  LogicalOutputPluginWriterWrite do_write,
+					  LogicalDecondingContextIsActive is_active)
 {
 	LogicalDecodingContext *ctx;
 	ReplicationSlot *slot;
@@ -378,7 +382,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId,
-								 read_page, prepare_write, do_write);
+								 read_page, prepare_write, do_write,
+								 is_active);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 4e4c8cd..e0efd4c 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -104,6 +104,17 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
 	p->returned_rows++;
 }
 
+/**
+ * Stab function that necessary for LogicalDecondign context.
+ * Function always return true and it means that decoding WALs
+ * can't be interrupt in contrast of logical replication.
+ */
+static bool
+LogicalContextAlwaysActive(void)
+{
+	return true;
+}
+
 static void
 check_permissions(void)
 {
@@ -249,7 +260,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 									options,
 									logical_read_local_xlog_page,
 									LogicalOutputPrepareWrite,
-									LogicalOutputWrite);
+									LogicalOutputWrite,
+									LogicalContextAlwaysActive /* converting to Datum(sql api) can't be interrupted in contrast of replication*/
+									);
 
 		MemoryContextSwitchTo(oldcontext);
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 9b430b9..26b8e71 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1417,7 +1417,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		rb->begin(rb, txn);
 
 		iterstate = ReorderBufferIterTXNInit(rb, txn);
-		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
+		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL && rb->is_active())
 		{
 			Relation	relation = NULL;
 			Oid			reloid;
@@ -1643,8 +1643,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		ReorderBufferIterTXNFinish(rb, iterstate);
 		iterstate = NULL;
 
-		/* call commit callback */
-		rb->commit(rb, txn, commit_lsn);
+		if(rb->is_active())
+		{
+			/* call commit callback */
+			rb->commit(rb, txn, commit_lsn);
+		}
 
 		/* this is just a sanity check against bad output plugin behaviour */
 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index f908761..976bc0c 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -127,7 +127,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	 */
 	ctx = CreateInitDecodingContext(
 									NameStr(*plugin), NIL,
-									logical_read_local_xlog_page, NULL, NULL);
+									logical_read_local_xlog_page, NULL, NULL, NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index c43310c..9f4c7b5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -218,6 +218,8 @@ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
+static bool IsStreamingActive(void);
+
 
 /* Initialize walsender process before entering the main command loop */
 void
@@ -823,7 +825,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL,
 										logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+										WalSndPrepareWrite, WalSndWriteData,
+										IsStreamingActive);
 
 		/*
 		 * Signal that we don't need the timeout mechanism. We're just
@@ -1002,7 +1005,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	logical_decoding_ctx = CreateDecodingContext(
 											   cmd->startpoint, cmd->options,
 												 logical_read_xlog_page,
-										WalSndPrepareWrite, WalSndWriteData);
+										WalSndPrepareWrite, WalSndWriteData, IsStreamingActive);
 
 	/* Start reading WAL from the oldest required WAL. */
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
@@ -1093,14 +1096,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
 		   tmpbuf.data, sizeof(int64));
 
-	/* fast path */
-	/* Try to flush pending output to the client */
-	if (pq_flush_if_writable() != 0)
-		WalSndShutdown();
-
-	if (!pq_is_send_pending())
-		return;
-
 	for (;;)
 	{
 		int			wakeEvents;
@@ -1235,7 +1230,14 @@ WalSndWaitForWal(XLogRecPtr loc)
 			break;
 
 		/*
-		 * We only send regular messages to the client for full decoded
+		 * If we have received CopyDone from the client, sent CopyDone
+		 * ourselves, it's time to exit streaming.
+		 */
+		if (!IsStreamingActive()) {
+			break;
+		}
+
+		/* We only send regular messages to the client for full decoded
 		 * transactions, but a synchronous replication and walsender shutdown
 		 * possibly are waiting for a later location. So we send pings
 		 * containing the flush location every now and then.
@@ -1797,14 +1799,7 @@ WalSndCheckTimeOut(TimestampTz now)
 	}
 }
 
-/*
- * Main loop of walsender process that streams the WAL over Copy messages.
- *
- * The send_data callback must enqueue complete CopyData messages to libpq
- * using pq_putmessage_noblock or similar, since the walsender loop may send
- * CopyDone then exit and return to command mode in response to a client
- * CopyDone between calls to send_data.
- */
+/* Main loop of walsender process that streams the WAL over Copy messages. */
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
@@ -2951,3 +2946,9 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
 			WalSndShutdown();
 	}
 }
+
+static
+bool IsStreamingActive(void)
+{
+	return !streamingDoneReceiving && !streamingDoneSending;
+}
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 947000e..ba54ddc 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -26,6 +26,13 @@ typedef void (*LogicalOutputPluginWriterWrite) (
 
 typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 
+/*
+ * Callback function that allow interrupt logical replication during decoding.
+ * Function return true if decoding can be continue decode, but if function return false
+ * logical decoding will stop as soon as possible.
+ */
+typedef ReorderBufferIsActive LogicalDecondingContextIsActive;
+
 typedef struct LogicalDecodingContext
 {
 	/* memory context this is all allocated in */
@@ -81,13 +88,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
-						  LogicalOutputPluginWriterWrite do_write);
+						  LogicalOutputPluginWriterWrite do_write,
+						  LogicalDecondingContextIsActive is_active);
 extern LogicalDecodingContext *CreateDecodingContext(
 					  XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
-					  LogicalOutputPluginWriterWrite do_write);
+					  LogicalOutputPluginWriterWrite do_write,
+					  LogicalDecondingContextIsActive is_active);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 9e209ae..b0916d7 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -292,6 +292,9 @@ typedef void (*ReorderBufferMessageCB) (
 												 const char *prefix, Size sz,
 													const char *message);
 
+/* callback signature for check decoding status */
+typedef bool (*ReorderBufferIsActive) (void);
+
 struct ReorderBuffer
 {
 	/*
@@ -321,6 +324,11 @@ struct ReorderBuffer
 	ReorderBufferMessageCB message;
 
 	/*
+	 * Callback to define status of decoding. Return false if decoding not necessary continue
+	 */
+	ReorderBufferIsActive is_active;
+
+	/*
 	 * Pointer that will be passed untouched to the callbacks.
 	 */
 	void	   *private_data;
-- 
2.5.5

