From 1236dc75d5609f14d359afb66a0a5f8bbfbf353e Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tv@fuzzy.cz>
Date: Thu, 26 Sep 2019 17:26:31 +0200
Subject: [PATCH v6 03/12] Extend the output plugin API with stream methods

This adds seven methods to the output plugin API, adding support
for streaming changes for large transactions.

* stream_message
* stream_change
* stream_truncate
* stream_abort
* stream_commit
* stream_start
* stream_stop

Most of this is a simple extension of the existing methods, with
the semantic difference that the transaction (or subtransaction)
is incomplete and may be aborted later (which is something the
regular API does not really need to deal with).

This also extends the 'test_decoding' plugin, implementing these
new stream methods.

The stream_start/stream_stop callbacks are used to demarcate a chunk
of changes streamed for a particular toplevel transaction.
---
 contrib/test_decoding/test_decoding.c     | 100 ++++++
 doc/src/sgml/logicaldecoding.sgml         | 209 +++++++++++++
 src/backend/replication/logical/logical.c | 361 ++++++++++++++++++++++
 src/include/replication/logical.h         |   5 +
 src/include/replication/output_plugin.h   |  69 +++++
 src/include/replication/reorderbuffer.h   |  57 ++++
 6 files changed, 801 insertions(+)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index cd105d91e0..a3efbfdd76 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -62,6 +62,28 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 							  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 							  bool transactional, const char *prefix,
 							  Size sz, const char *message);
+static void pg_decode_stream_message(LogicalDecodingContext *ctx,
+									 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+									 bool transactional, const char *prefix,
+									 Size sz, const char *message);
+static void pg_decode_stream_change(LogicalDecodingContext *ctx,
+									ReorderBufferTXN *txn,
+									Relation relation,
+									ReorderBufferChange *change);
+static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
+									  ReorderBufferTXN *txn,
+									  int nrelations, Relation relations[],
+									  ReorderBufferChange *change);
+static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
+								   ReorderBufferTXN *txn,
+								   XLogRecPtr abort_lsn);
+static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
+									ReorderBufferTXN *txn,
+									XLogRecPtr apply_lsn);
+static void pg_decode_stream_start(LogicalDecodingContext *ctx,
+								   ReorderBufferTXN *txn);
+static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
+								  ReorderBufferTXN *txn);
 
 void
 _PG_init(void)
@@ -83,6 +105,13 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
+	cb->stream_message_cb = pg_decode_stream_message;
+	cb->stream_change_cb = pg_decode_stream_change;
+	cb->stream_truncate_cb = pg_decode_stream_truncate;
+	cb->stream_abort_cb = pg_decode_stream_abort;
+	cb->stream_commit_cb = pg_decode_stream_commit;
+	cb->stream_start_cb = pg_decode_stream_start;
+	cb->stream_stop_cb = pg_decode_stream_stop;
 }
 
 
@@ -542,3 +571,74 @@ pg_decode_message(LogicalDecodingContext *ctx,
 	appendBinaryStringInfo(ctx->out, message, sz);
 	OutputPluginWrite(ctx, true);
 }
+
+static void
+pg_decode_stream_message(LogicalDecodingContext *ctx,
+						 ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
+						 const char *prefix, Size sz, const char *message)
+{
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
+					 transactional, prefix, sz);
+	appendBinaryStringInfo(ctx->out, message, sz);
+	OutputPluginWrite(ctx, true);
+}
+
+static void
+pg_decode_stream_change(LogicalDecodingContext *ctx,
+						ReorderBufferTXN *txn,
+						Relation relation,
+						ReorderBufferChange *change)
+{
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
+	OutputPluginWrite(ctx, true);
+}
+
+static void
+pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+						  int nrelations, Relation relations[],
+						  ReorderBufferChange *change)
+{
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
+	OutputPluginWrite(ctx, true);
+}
+
+static void
+pg_decode_stream_abort(LogicalDecodingContext *ctx,
+					   ReorderBufferTXN *txn,
+					   XLogRecPtr abort_lsn)
+{
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
+	OutputPluginWrite(ctx, true);
+}
+
+static void
+pg_decode_stream_commit(LogicalDecodingContext *ctx,
+						ReorderBufferTXN *txn,
+						XLogRecPtr apply_lsn)
+{
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
+	OutputPluginWrite(ctx, true);
+}
+
+static void
+pg_decode_stream_start(LogicalDecodingContext *ctx,
+					   ReorderBufferTXN *txn)
+{
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
+	OutputPluginWrite(ctx, true);
+}
+
+static void
+pg_decode_stream_stop(LogicalDecodingContext *ctx,
+					  ReorderBufferTXN *txn)
+{
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
+	OutputPluginWrite(ctx, true);
+}
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 8db968641e..ace21ec8e5 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -388,6 +388,13 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeMessageCB message_cb;
     LogicalDecodeFilterByOriginCB filter_by_origin_cb;
     LogicalDecodeShutdownCB shutdown_cb;
+    LogicalDecodeStreamChangeCB stream_change_cb;
+    LogicalDecodeStreamTruncateCB stream_truncate_cb;
+    LogicalDecodeStreamMessageCB stream_message_cb;
+    LogicalDecodeStreamCommitCB stream_commit_cb;
+    LogicalDecodeStreamAbortCB stream_abort_cb;
+    LogicalDecodeStreamStartCB stream_start_cb;
+    LogicalDecodeStreamStopCB stream_stop_cb;
 } OutputPluginCallbacks;
 
 typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
@@ -400,6 +407,15 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
      If <function>truncate_cb</function> is not set but a
      <command>TRUNCATE</command> is to be decoded, the action will be ignored.
     </para>
+
+    <para>
+     An output plugin may also define functions to support streaming of large,
+     in-progress transactions. The <function>stream_change_cb</function>,
+     <function>stream_commit_cb</function>, <function>stream_abort_cb</function>,
+     <function>stream_start_cb</function> and <function>stream_stop_cb</function>
+     are required, while <function>stream_message_cb</function> and
+     <function>stream_truncate_cb</function> are optional.
+    </para>
    </sect2>
 
    <sect2 id="logicaldecoding-capabilities">
@@ -678,6 +694,112 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
      </para>
     </sect3>
 
+    <sect3 id="logicaldecoding-output-plugin-stream-start">
+     <title>Stream Start Callback</title>
+     <para>
+      The <function>stream_start_cb</function> callback is called when opening
+      a block of streamed changes from an in-progress transaction.
+<programlisting>
+typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
+                                        ReorderBufferTXN *txn);
+</programlisting>
+     </para>
+    </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-stream-stop">
+     <title>Stream Stop Callback</title>
+     <para>
+      The <function>stream_stop_cb</function> callback is called when closing
+      a block of streamed changes from an in-progress transaction.
+<programlisting>
+typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
+                                        ReorderBufferTXN *txn);
+</programlisting>
+     </para>
+    </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-stream-change">
+     <title>Stream Change Callback</title>
+     <para>
+      The <function>stream_change_cb</function> callback is called when sending
+      a change in a block of streamed changes (demarcated by
+      <function>stream_start_cb</function> and <function>stream_stop_cb</function> calls).
+<programlisting>
+typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
+                                        ReorderBufferTXN *txn,
+                                        Relation relation,
+                                        ReorderBufferChange *change);
+</programlisting>
+     </para>
+    </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-stream-truncate">
+     <title>Stream Truncate Callback</title>
+     <para>
+      The <function>stream_truncate_cb</function> callback is called for a
+      <command>TRUNCATE</command> command in a block of streamed changes
+      (demarcated by <function>stream_start_cb</function> and
+      <function>stream_stop_cb</function> calls).
+<programlisting>
+typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
+                                         ReorderBufferTXN *txn,
+                                         int nrelations,
+                                         Relation relations[],
+                                         ReorderBufferChange *change);
+</programlisting>
+      The parameters are analogous to the <function>stream_change_cb</function>
+      callback.  However, because <command>TRUNCATE</command> actions on
+      tables connected by foreign keys need to be executed together, this
+      callback receives an array of relations instead of just a single one.
+      See the description of the <xref linkend="sql-truncate"/> statement for
+      details.
+     </para>
+    </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-stream-message">
+     <title>Stream Message Callback</title>
+     <para>
+      The <function>stream_message_cb</function> callback is called when sending
+      a generic message in a block of streamed changes (demarcated by
+      <function>stream_start_cb</function> and <function>stream_stop_cb</function> calls).
+<programlisting>
+typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
+                                        ReorderBufferTXN *txn,
+                                        XLogRecPtr message_lsn,
+                                        bool transactional,
+                                        const char *prefix,
+                                        Size message_size,
+                                        const char *message);
+</programlisting>
+     </para>
+    </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-stream-commit">
+     <title>Stream Commit Callback</title>
+     <para>
+      The <function>stream_commit_cb</function> callback is called to commit
+      a previously streamed transaction.
+<programlisting>
+typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
+                                        ReorderBufferTXN *txn,
+                                        XLogRecPtr commit_lsn);
+</programlisting>
+     </para>
+    </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-stream-abort">
+     <title>Stream Abort Callback</title>
+     <para>
+      The <function>stream_abort_cb</function> callback is called to abort
+      a previously streamed transaction.
+<programlisting>
+typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
+                                        ReorderBufferTXN *txn,
+                                        XLogRecPtr abort_lsn);
+</programlisting>
+     </para>
+    </sect3>
+
    </sect2>
 
    <sect2 id="logicaldecoding-output-plugin-output">
@@ -746,4 +868,91 @@ OutputPluginWrite(ctx, true);
      </para>
    </note>
   </sect1>
+
+  <sect1 id="logicaldecoding-streaming">
+   <title>Streaming of Large Transactions for Logical Decoding</title>
+
+   <para>
+    The basic output plugin callbacks (e.g. <function>begin_cb</function>,
+    <function>change_cb</function>, <function>commit_cb</function> and
+    <function>message_cb</function>) are only invoked when the transaction
+    actually commits. The changes are still decoded from the transaction
+    log, but are only passed to the output plugin at commit (and discarded
+    if the transaction aborts).
+   </para>
+
+   <para>
+    This means that while the decoding happens incrementally, and may spill
+    to disk to keep memory usage under control, all the decoded changes have
+    to be transmitted when the transaction finally commits (or more precisely,
+    when the commit is decoded from the transaction log). Depending on the
+    size of the transaction and network bandwidth, the transfer time may
+    significantly increase the apply lag.
+   </para>
+
+   <para>
+    To reduce the apply lag caused by large transactions, an output plugin
+    may provide additional callbacks to support incremental streaming of
+    in-progress transactions. There are multiple required streaming callbacks
+    (<function>stream_change_cb</function>, <function>stream_commit_cb</function>,
+    <function>stream_abort_cb</function>, <function>stream_start_cb</function>
+    and <function>stream_stop_cb</function>) and two optional callbacks
+    (<function>stream_message_cb</function> and <function>stream_truncate_cb</function>).
+   </para>
+
+   <para>
+    When streaming an in-progress transaction, the changes (and messages) are
+    streamed in blocks demarcated by <function>stream_start_cb</function>
+    and <function>stream_stop_cb</function> callbacks. Once all the decoded
+    changes are transmitted, the transaction is committed using the
+    <function>stream_commit_cb</function> callback (or possibly aborted using
+    the <function>stream_abort_cb</function> callback).
+   </para>
+
+   <para>
+    One example sequence of streaming callback calls for one transaction may
+    look like this:
+<programlisting>
+stream_start_cb(...);   &lt;-- start of first block of changes
+  stream_change_cb(...);
+  stream_change_cb(...);
+  stream_message_cb(...);
+  stream_change_cb(...);
+  ...
+  stream_change_cb(...);
+stream_stop_cb(...);    &lt;-- end of first block of changes
+
+stream_start_cb(...);   &lt;-- start of second block of changes
+  stream_change_cb(...);
+  stream_change_cb(...);
+  stream_change_cb(...);
+  ...
+  stream_message_cb(...);
+  stream_change_cb(...);
+stream_stop_cb(...);    &lt;-- end of second block of changes
+
+stream_commit_cb(...);  &lt;-- commit of the streamed transaction
+</programlisting>
+   </para>
+
+   <para>
+    The actual sequence of callback calls may be more complicated, of course.
+    There may be blocks for multiple streamed transactions, some of the
+    transactions may get aborted, etc.
+   </para>
+
+   <para>
+    Similar to spill-to-disk behavior, streaming is triggered when the total
+    amount of changes decoded from the WAL (for all in-progress transactions)
+    exceeds the limit defined by the <varname>logical_decoding_work_mem</varname> setting.
+    At that point the largest toplevel transaction (measured by amount of memory
+    currently used for decoded changes) is selected and streamed.
+   </para>
+
+   <para>
+    Even when streaming large transactions, the changes are still applied in
+    commit order, preserving the same guarantees as the non-streaming mode.
+   </para>
+
+  </sect1>
  </chapter>
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index bdf4389a57..9c95fc1ed8 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -65,6 +65,21 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							   XLogRecPtr message_lsn, bool transactional,
 							   const char *prefix, Size message_size, const char *message);
 
+/* streaming callbacks */
+static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						 Relation relation, ReorderBufferChange *change);
+static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						   int nrelations, Relation relations[], ReorderBufferChange *change);
+static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						  XLogRecPtr message_lsn, bool transactional,
+						  const char *prefix, Size message_size, const char *message);
+static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						XLogRecPtr abort_lsn);
+static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						 XLogRecPtr apply_lsn);
+static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
+static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
+
 static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
 
 /*
@@ -189,6 +204,39 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
 
+	/*
+	 * To support streaming, we require start/stop/change/commit/abort
+	 * callbacks. The message and truncate callbacks are optional, similarly
+	 * to regular output plugins. We however enable streaming when at least
+	 * one of the methods is set, so that we can easily identify missing ones.
+	 *
+	 * We decide it here, but only check it later in the wrappers.
+	 */
+	ctx->streaming = (ctx->callbacks.stream_change_cb != NULL) ||
+		(ctx->callbacks.stream_abort_cb != NULL) ||
+		(ctx->callbacks.stream_message_cb != NULL) ||
+		(ctx->callbacks.stream_truncate_cb != NULL) ||
+		(ctx->callbacks.stream_commit_cb != NULL) ||
+		(ctx->callbacks.stream_start_cb != NULL) ||
+		(ctx->callbacks.stream_stop_cb != NULL);
+
+	/*
+	 * streaming callbacks
+	 *
+	 * stream_message and stream_truncate callbacks are optional,
+	 * so we do not fail with ERROR when missing, but the wrappers
+	 * simply do nothing. We must set the ReorderBuffer callbacks
+	 * to something, otherwise the calls from there will crash (we
+	 * don't want to move the checks there).
+	 */
+	ctx->reorder->stream_change = stream_change_cb_wrapper;
+	ctx->reorder->stream_abort = stream_abort_cb_wrapper;
+	ctx->reorder->stream_commit = stream_commit_cb_wrapper;
+	ctx->reorder->stream_start = stream_start_cb_wrapper;
+	ctx->reorder->stream_stop = stream_stop_cb_wrapper;
+	ctx->reorder->stream_message = stream_message_cb_wrapper;
+	ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
+
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
 	ctx->write = do_write;
@@ -863,6 +911,319 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static void
+stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						 Relation relation, ReorderBufferChange *change)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	/* We're only supposed to call this when streaming is supported. */
+	Assert(ctx->streaming);
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "stream_change";
+	state.report_location = change->lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+
+	/*
+	 * report this change's lsn so replies from clients can give an up2date
+	 * answer. This won't ever be enough (and shouldn't be!) to confirm
+	 * receipt of this transaction, but it might allow another transaction's
+	 * commit to be confirmed with one message.
+	 */
+	ctx->write_location = change->lsn;
+
+	/* in streaming mode, stream_change_cb is required */
+	if (ctx->callbacks.stream_change_cb == NULL)
+		ereport(ERROR,
+				(errmsg("Output plugin supports streaming, but has not registered "
+						"stream_change_cb callback.")));
+
+	ctx->callbacks.stream_change_cb(ctx, txn, relation, change);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						   int nrelations, Relation relations[],
+						   ReorderBufferChange *change)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	/* We're only supposed to call this when streaming is supported. */
+	Assert(ctx->streaming);
+
+	/* this callback is optional */
+	if (!ctx->callbacks.stream_truncate_cb)
+		return;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "stream_truncate";
+	state.report_location = change->lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+
+	/*
+	 * report this change's lsn so replies from clients can give an up2date
+	 * answer. This won't ever be enough (and shouldn't be!) to confirm
+	 * receipt of this transaction, but it might allow another transaction's
+	 * commit to be confirmed with one message.
+	 */
+	ctx->write_location = change->lsn;
+
+	ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						  XLogRecPtr message_lsn, bool transactional,
+						  const char *prefix, Size message_size, const char *message)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	/* We're only supposed to call this when streaming is supported. */
+	Assert(ctx->streaming);
+
+	/* this callback is optional */
+	if (ctx->callbacks.stream_message_cb == NULL)
+		return;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "stream_message";
+	state.report_location = message_lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+	ctx->write_location = message_lsn;
+
+	/* do the actual work: call callback */
+	ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
+									 message_size, message);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						XLogRecPtr abort_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	/* We're only supposed to call this when streaming is supported. */
+	Assert(ctx->streaming);
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "stream_abort";
+	state.report_location = abort_lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+
+	/*
+	 * report this change's lsn so replies from clients can give an up2date
+	 * answer. This won't ever be enough (and shouldn't be!) to confirm
+	 * receipt of this transaction, but it might allow another transaction's
+	 * commit to be confirmed with one message.
+	 */
+	ctx->write_location = abort_lsn;
+
+	/* in streaming mode, stream_abort_cb is required */
+	if (ctx->callbacks.stream_abort_cb == NULL)
+		ereport(ERROR,
+				(errmsg("Output plugin supports streaming, but has not registered "
+						"stream_abort_cb callback.")));
+
+	ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						 XLogRecPtr apply_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	/* We're only supposed to call this when streaming is supported. */
+	Assert(ctx->streaming);
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "stream_commit";
+	state.report_location = apply_lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+
+	/*
+	 * report this change's lsn so replies from clients can give an up2date
+	 * answer. This won't ever be enough (and shouldn't be!) to confirm
+	 * receipt of this transaction, but it might allow another transaction's
+	 * commit to be confirmed with one message.
+	 */
+	ctx->write_location = apply_lsn;
+
+	/* in streaming mode, stream_commit_cb is required */
+	if (ctx->callbacks.stream_commit_cb == NULL)
+		ereport(ERROR,
+				(errmsg("Output plugin supports streaming, but has not registered "
+						"stream_commit_cb callback.")));
+
+	ctx->callbacks.stream_commit_cb(ctx, txn, apply_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	/* We're only supposed to call this when streaming is supported. */
+	Assert(ctx->streaming);
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "stream_start";
+	state.report_location = txn->first_lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+
+	/*
+	 * Use the transaction's first LSN as the write location for this
+	 * block-start marker (the same LSN is stored in the error callback
+	 * state above, which must not be left uninitialized once the error
+	 * context callback has been installed).
+	 */
+	ctx->write_location = txn->first_lsn;
+
+	/* in streaming mode, stream_start_cb is required */
+	if (ctx->callbacks.stream_start_cb == NULL)
+		ereport(ERROR,
+				(errmsg("Output plugin supports streaming, but has not registered "
+						"stream_start_cb callback.")));
+
+	ctx->callbacks.stream_start_cb(ctx, txn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	/* We're only supposed to call this when streaming is supported. */
+	Assert(ctx->streaming);
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "stream_stop";
+	state.report_location = txn->final_lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+
+	/*
+	 * Use the transaction's final LSN as the write location for this
+	 * block-end marker (the same LSN is stored in the error callback
+	 * state above, which must not be left uninitialized once the error
+	 * context callback has been installed).
+	 */
+	ctx->write_location = txn->final_lsn;
+
+	/* in streaming mode, stream_stop_cb is required */
+	if (ctx->callbacks.stream_stop_cb == NULL)
+		ereport(ERROR,
+				(errmsg("Output plugin supports streaming, but has not registered "
+						"stream_stop_cb callback.")));
+
+	ctx->callbacks.stream_stop_cb(ctx, txn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
 /*
  * Set the required catalog xmin horizon for historic snapshots in the current
  * replication slot.
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 3b7ca7f1da..f24e2468ac 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -79,6 +79,11 @@ typedef struct LogicalDecodingContext
 	 */
 	void	   *output_writer_private;
 
+	/*
+	 * Does the output plugin support streaming, and is it enabled?
+	 */
+	bool		streaming;
+
 	/*
 	 * State for writing output.
 	 */
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 3dd9236c57..0d0a94a648 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -99,6 +99,67 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
  */
 typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
 
+/*
+ * Callback for streaming individual changes from in-progress transactions.
+ */
+typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
+											 ReorderBufferTXN *txn,
+											 Relation relation,
+											 ReorderBufferChange *change);
+
+/*
+ * Callback for streaming truncates from in-progress transactions.
+ */
+typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
+										 ReorderBufferTXN *txn,
+										 int nrelations,
+										 Relation relations[],
+										 ReorderBufferChange *change);
+
+/*
+ * Callback for streaming generic logical decoding messages from in-progress
+ * transactions.
+ */
+typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
+											  ReorderBufferTXN *txn,
+											  XLogRecPtr message_lsn,
+											  bool transactional,
+											  const char *prefix,
+											  Size message_size,
+											  const char *message);
+
+/*
+ * Called to discard changes streamed to remote node from in-progress
+ * transaction.
+ */
+typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
+											ReorderBufferTXN *txn,
+											XLogRecPtr abort_lsn);
+
+/*
+ * Called to apply changes streamed to remote node from in-progress
+ * transaction.
+ */
+typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
+											 ReorderBufferTXN *txn,
+											 XLogRecPtr commit_lsn);
+
+/*
+ * Called when starting to stream a block of changes from in-progress
+ * transaction (may be called repeatedly, if it's streamed in multiple
+ * chunks).
+ */
+typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
+											ReorderBufferTXN *txn);
+
+/*
+ * Called when stopping to stream a block of changes from in-progress
+ * transaction to a remote node (may be called repeatedly, if it's streamed
+ * in multiple chunks).
+ */
+typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
+										   ReorderBufferTXN *txn);
+
 /*
  * Output plugin callbacks
  */
@@ -112,6 +173,14 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeMessageCB message_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
+	/* streaming of changes */
+	LogicalDecodeStreamChangeCB stream_change_cb;
+	LogicalDecodeStreamTruncateCB stream_truncate_cb;
+	LogicalDecodeStreamMessageCB stream_message_cb;
+	LogicalDecodeStreamAbortCB stream_abort_cb;
+	LogicalDecodeStreamCommitCB stream_commit_cb;
+	LogicalDecodeStreamStartCB stream_start_cb;
+	LogicalDecodeStreamStopCB stream_stop_cb;
 } OutputPluginCallbacks;
 
 /* Functions in replication/logical/logical.c */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index fa41115db9..79ea33cd26 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -355,6 +355,52 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
 										const char *prefix, Size sz,
 										const char *message);
 
+/* stream change callback signature */
+typedef void (*ReorderBufferStreamChangeCB) (
+											 ReorderBuffer *rb,
+											 ReorderBufferTXN *txn,
+											 Relation relation,
+											 ReorderBufferChange *change);
+
+/* stream truncate callback signature */
+typedef void (*ReorderBufferStreamTruncateCB) (
+											  ReorderBuffer *rb,
+											  ReorderBufferTXN *txn,
+											  int nrelations,
+											  Relation relations[],
+											  ReorderBufferChange *change);
+
+/* stream message callback signature */
+typedef void (*ReorderBufferStreamMessageCB) (
+											  ReorderBuffer *rb,
+											  ReorderBufferTXN *txn,
+											  XLogRecPtr message_lsn,
+											  bool transactional,
+											  const char *prefix, Size sz,
+											  const char *message);
+
+/* discard streamed transaction callback signature */
+typedef void (*ReorderBufferStreamAbortCB) (
+											ReorderBuffer *rb,
+											ReorderBufferTXN *txn,
+											XLogRecPtr abort_lsn);
+
+/* commit streamed transaction callback signature */
+typedef void (*ReorderBufferStreamCommitCB) (
+											 ReorderBuffer *rb,
+											 ReorderBufferTXN *txn,
+											 XLogRecPtr commit_lsn);
+
+/* start streaming transaction callback signature */
+typedef void (*ReorderBufferStreamStartCB) (
+											ReorderBuffer *rb,
+											ReorderBufferTXN *txn);
+
+/* stop streaming transaction callback signature */
+typedef void (*ReorderBufferStreamStopCB) (
+										   ReorderBuffer *rb,
+										   ReorderBufferTXN *txn);
+
 struct ReorderBuffer
 {
 	/*
@@ -393,6 +439,17 @@ struct ReorderBuffer
 	ReorderBufferCommitCB commit;
 	ReorderBufferMessageCB message;
 
+	/*
+	 * Callbacks to be called when streaming a transaction.
+	 */
+	ReorderBufferStreamStartCB stream_start;
+	ReorderBufferStreamStopCB stream_stop;
+	ReorderBufferStreamChangeCB stream_change;
+	ReorderBufferStreamTruncateCB stream_truncate;
+	ReorderBufferStreamMessageCB stream_message;
+	ReorderBufferStreamAbortCB stream_abort;
+	ReorderBufferStreamCommitCB stream_commit;
+
 	/*
 	 * Pointer that will be passed untouched to the callbacks.
 	 */
-- 
2.20.1

