From 84b94c7c49ca7aae737ffb1451eee9098c483578 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Fri, 10 Jan 2020 18:03:27 -0300
Subject: [PATCH v6 05/12] Implement streaming mode in ReorderBuffer

Instead of serializing the transaction to disk after reaching the
maximum number of changes in memory (4096 changes), we consume the
changes we have in memory and invoke new stream API methods. This
happens in ReorderBufferStreamTXN() using about the same logic as
ReorderBufferCommit().

We can do this incremental processing thanks to having assignments
(associating subxact with toplevel xacts) in WAL right away, and
thanks to logging the invalidation messages.

This adds a second iterator for the streaming case, without the
spill-to-disk functionality and only processing changes currently
in memory.

Theoretically, we could get rid of the k-way merge, and append the
changes to the toplevel xact directly (and remember the position
in the list in case the subxact gets aborted later).

It also adds ReorderBufferTXN pointer to two places:

* ReorderBufferChange, so that we know which xact it belongs to
* ReorderBufferTXN, pointing to toplevel xact (from subxact)

The output plugin can use this to decide which changes to discard
in case of stream_abort_cb (e.g. when a subxact gets discarded).
---
 src/backend/access/heap/heapam_visibility.c   |  38 +-
 .../replication/logical/reorderbuffer.c       | 691 +++++++++++++++---
 src/include/replication/reorderbuffer.h       |  36 +
 3 files changed, 672 insertions(+), 93 deletions(-)

diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c
index dba10890aa..160b167adb 100644
--- a/src/backend/access/heap/heapam_visibility.c
+++ b/src/backend/access/heap/heapam_visibility.c
@@ -1571,8 +1571,23 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot,
 												 htup, buffer,
 												 &cmin, &cmax);
 
+		/*
+		 * If we haven't resolved the combocid to cmin/cmax, that means
+		 * we have not decoded the combocid yet. That means the cmin is
+		 * definitely in the future, and we're not supposed to see the
+		 * tuple yet.
+		 *
+		 * XXX This only applies to decoding of in-progress transactions.
+		 * In regular logical decoding we only execute this code at commit
+		 * time, at which point we should have seen all relevant combocids.
+		 * So we should error out in this case.
+		 *
+		 * XXX For the streaming case, we can track the largest combocid
+		 * assigned, and error out based on this (when unable to resolve
+		 * combocid below that observed maximum value).
+		 */
 		if (!resolved)
-			elog(ERROR, "could not resolve cmin/cmax of catalog tuple");
+			return false;
 
 		Assert(cmin != InvalidCommandId);
 
@@ -1642,10 +1657,23 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot,
 												 htup, buffer,
 												 &cmin, &cmax);
 
-		if (!resolved)
-			elog(ERROR, "could not resolve combocid to cmax");
-
-		Assert(cmax != InvalidCommandId);
+		/*
+		 * If we haven't resolved the combocid to cmin/cmax, that means
+		 * we have not decoded the combocid yet. That means the cmax is
+		 * definitely in the future, and we're still supposed to see the
+		 * tuple.
+		 *
+		 * XXX This only applies to decoding of in-progress transactions.
+		 * In regular logical decoding we only execute this code at commit
+		 * time, at which point we should have seen all relevant combocids.
+		 * So we should error out in this case.
+		 *
+		 * XXX For the streaming case, we can track the largest combocid
+		 * assigned, and error out based on this (when unable to resolve
+		 * combocid below that observed maximum value).
+		 */
+		if (!resolved || cmax == InvalidCommandId)
+			return true;
 
 		if (cmax >= snapshot->curcid)
 			return true;		/* deleted after scan started */
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 2da0a23a7e..50341a6d9e 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -236,6 +236,7 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 									   char *change);
 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
 static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
 										TransactionId xid, XLogSegNo segno);
@@ -244,6 +245,15 @@ static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
 static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
 									  ReorderBufferTXN *txn, CommandId cid);
 
+/*
+ * ---------------------------------------
+ * Streaming support functions
+ * ---------------------------------------
+ */
+static bool ReorderBufferCanStream(ReorderBuffer *rb);
+static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn);
+
 /* ---------------------------------------
  * toast reassembly support
  * ---------------------------------------
@@ -371,6 +381,9 @@ ReorderBufferGetTXN(ReorderBuffer *rb)
 	dlist_init(&txn->tuplecids);
 	dlist_init(&txn->subtxns);
 
+	/* InvalidCommandId is not zero, so set it explicitly */
+	txn->command_id = InvalidCommandId;
+
 	return txn;
 }
 
@@ -768,6 +781,38 @@ AssertTXNLsnOrder(ReorderBuffer *rb)
 #endif
 }
 
+/*
+ * AssertChangeLsnOrder
+ *
+ * Check ordering of changes in the toplevel transaction.
+ */
+static void
+AssertChangeLsnOrder(ReorderBufferTXN *txn)
+{
+#ifdef USE_ASSERT_CHECKING
+	dlist_iter	iter;
+	XLogRecPtr	prev_lsn = txn->first_lsn;
+
+	dlist_foreach(iter, &txn->changes)
+	{
+		ReorderBufferChange *cur_change;
+
+		cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
+
+		Assert(txn->first_lsn != InvalidXLogRecPtr);
+		Assert(cur_change->lsn != InvalidXLogRecPtr);
+		Assert(txn->first_lsn <= cur_change->lsn);
+
+		if (txn->end_lsn != InvalidXLogRecPtr)
+			Assert(cur_change->lsn <= txn->end_lsn);
+
+		Assert(prev_lsn <= cur_change->lsn);
+
+		prev_lsn = cur_change->lsn;
+	}
+#endif
+}
+
 /*
  * ReorderBufferGetOldestTXN
  *		Return oldest transaction in reorderbuffer
@@ -864,6 +909,9 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
 	subtxn->toplevel_xid = xid;
 	Assert(subtxn->nsubtxns == 0);
 
+	/* set the reference to toplevel transaction */
+	subtxn->toptxn = txn;
+
 	/* add to subtransaction list */
 	dlist_push_tail(&txn->subtxns, &subtxn->node);
 	txn->nsubtxns++;
@@ -987,7 +1035,7 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
  */
 
 /*
- * Binary heap comparison function.
+ * Binary heap comparison function (regular non-streaming iterator).
  */
 static int
 ReorderBufferIterCompare(Datum a, Datum b, void *arg)
@@ -1023,6 +1071,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 	*iter_state = NULL;
 
+	/* Check ordering of changes in the toplevel transaction. */
+	AssertChangeLsnOrder(txn);
+
 	/*
 	 * Calculate the size of our heap: one element for every transaction that
 	 * contains changes.  (Besides the transactions already in the reorder
@@ -1037,6 +1088,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 		cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
 
+		/* Check ordering of changes in this subtransaction. */
+		AssertChangeLsnOrder(cur_txn);
+
 		if (cur_txn->nentries > 0)
 			nr_txns++;
 	}
@@ -1319,6 +1373,15 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		dlist_delete(&txn->base_snapshot_node);
 	}
 
+	/*
+	 * Cleanup the snapshot for the last streamed run.
+	 */
+	if (txn->snapshot_now != NULL)
+	{
+		Assert(rbtxn_is_streamed(txn));
+		ReorderBufferFreeSnap(rb, txn->snapshot_now);
+	}
+
 	/*
 	 * Remove TXN from its containing list.
 	 *
@@ -1344,9 +1407,94 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	ReorderBufferReturnTXN(rb, txn);
 }
 
+/*
+ * Discard changes from a transaction (and subtransactions), after streaming
+ * them. Keep the remaining info - transactions, tuplecids and snapshots.
+ */
+static void
+ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+	dlist_mutable_iter iter;
+
+	/* cleanup subtransactions & their changes */
+	dlist_foreach_modify(iter, &txn->subtxns)
+	{
+		ReorderBufferTXN *subtxn;
+
+		subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
+
+		/*
+		 * Subtransactions are always associated to the toplevel TXN, even if
+		 * they originally were happening inside another subtxn, so we won't
+		 * ever recurse more than one level deep here.
+		 */
+		Assert(rbtxn_is_known_subxact(subtxn));
+		Assert(subtxn->nsubtxns == 0);
+
+		ReorderBufferTruncateTXN(rb, subtxn);
+	}
+
+	/* cleanup changes in the toplevel txn */
+	dlist_foreach_modify(iter, &txn->changes)
+	{
+		ReorderBufferChange *change;
+
+		change = dlist_container(ReorderBufferChange, node, iter.cur);
+
+		/* remove the change from its containing list */
+		dlist_delete(&change->node);
+
+		ReorderBufferReturnChange(rb, change);
+	}
+
+	/*
+	 * Mark the transaction as streamed.
+	 *
+	 * The toplevel transaction, identified by (toptxn==NULL), is marked
+	 * as streamed always, even if it does not contain any changes (that
+	 * is, when all the changes are in subtransactions).
+	 *
+	 * For subtransactions, we only mark them as streamed when there are
+	 * any changes in them.
+	 *
+	 * We do it this way because of aborts - we don't want to send aborts
+	 * for XIDs the downstream is not aware of. And of course, it always
+	 * knows about the toplevel xact (we send the XID in all messages),
+	 * but we never stream XIDs of empty subxacts.
+	 */
+	if ((!txn->toptxn) || (txn->nentries_mem != 0))
+		txn->txn_flags |= RBTXN_IS_STREAMED;
+
+	/*
+	 * Destroy the (relfilenode, ctid) hashtable, so that we don't leak
+	 * any memory. We could also keep the hash table and update it with
+	 * new ctid values, but this seems simpler and good enough for now.
+	 */
+	if (txn->tuplecid_hash != NULL)
+	{
+		hash_destroy(txn->tuplecid_hash);
+		txn->tuplecid_hash = NULL;
+	}
+
+	/* also reset the number of entries in the transaction */
+	txn->nentries_mem = 0;
+	txn->nentries = 0;
+}
+
 /*
  * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
- * HeapTupleSatisfiesHistoricMVCC.
+ * heapam_visibility.c's HeapTupleSatisfiesHistoricMVCC.
+ *
+ * We do build the hash table even if there are no CIDs. That's
+ * because when streaming in-progress transactions we may run into
+ * tuples with the CID before actually decoding them. Think e.g. about
+ * INSERT followed by TRUNCATE, where the TRUNCATE may not be decoded
+ * yet when applying the INSERT. So we build a hash table so that
+ * ResolveCminCmaxDuringDecoding does not segfault in this case.
+ *
+ * XXX We might limit this behavior to streaming mode, and just bail
+ * out when decoding transaction at commit time (at which point it's
+ * guaranteed to see all CIDs).
  */
 static void
 ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
@@ -1354,9 +1502,6 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	dlist_iter	iter;
 	HASHCTL		hash_ctl;
 
-	if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids))
-		return;
-
 	memset(&hash_ctl, 0, sizeof(hash_ctl));
 
 	hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
@@ -1495,63 +1640,48 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
 }
 
 /*
- * Perform the replay of a transaction and its non-aborted subtransactions.
- *
- * Subtransactions previously have to be processed by
- * ReorderBufferCommitChild(), even if previously assigned to the toplevel
- * transaction with ReorderBufferAssignChild.
- *
- * We currently can only decode a transaction's contents when its commit
- * record is read because that's the only place where we know about cache
- * invalidations. Thus, once a toplevel commit is read, we iterate over the top
- * and subtransactions (using a k-way merge) and replay the changes in lsn
- * order.
+ * If the transaction was (partially) streamed, we need to commit it in a
+ * 'streamed' way. That is, we first stream the remaining part of the
+ * transaction, and then invoke stream_commit message.
  */
-void
-ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
-					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
-					TimestampTz commit_time,
-					RepOriginId origin_id, XLogRecPtr origin_lsn)
+static void
+ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+	/* we should only call this for previously streamed transactions */
+	Assert(rbtxn_is_streamed(txn));
+
+	ReorderBufferStreamTXN(rb, txn);
+
+	rb->stream_commit(rb, txn, txn->final_lsn);
+
+	ReorderBufferCleanupTXN(rb, txn);
+}
+
+/*
+ * Helper function for ReorderBufferCommit and ReorderBufferStreamTXN
+ *
+ * Send data of a transaction (and its subtransactions) to the
+ * output plugin. If streaming is true then data will be sent using stream API.
+ */
+static void
+ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
+						XLogRecPtr commit_lsn,
+						volatile Snapshot snapshot_now,
+						volatile CommandId command_id,
+						bool streaming)
 {
-	ReorderBufferTXN *txn;
-	volatile Snapshot snapshot_now;
-	volatile CommandId command_id = FirstCommandId;
 	bool		using_subtxn;
+	MemoryContext ccxt = CurrentMemoryContext;
 	ReorderBufferIterTXNState *volatile iterstate = NULL;
-
-	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
-								false);
-
-	/* unknown transaction, nothing to replay */
-	if (txn == NULL)
-		return;
-
-	txn->final_lsn = commit_lsn;
-	txn->end_lsn = end_lsn;
-	txn->commit_time = commit_time;
-	txn->origin_id = origin_id;
-	txn->origin_lsn = origin_lsn;
+	volatile XLogRecPtr	prev_lsn = InvalidXLogRecPtr;
 
 	/*
-	 * If this transaction has no snapshot, it didn't make any changes to the
-	 * database, so there's nothing to decode.  Note that
-	 * ReorderBufferCommitChild will have transferred any snapshots from
-	 * subtransactions if there were any.
+	 * build data to be able to lookup the CommandIds of catalog tuples
 	 */
-	if (txn->base_snapshot == NULL)
-	{
-		Assert(txn->ninvalidations == 0);
-		ReorderBufferCleanupTXN(rb, txn);
-		return;
-	}
-
-	snapshot_now = txn->base_snapshot;
-
-	/* build data to be able to lookup the CommandIds of catalog tuples */
 	ReorderBufferBuildTupleCidHash(rb, txn);
 
 	/* setup the initial snapshot */
-	SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid);
+	SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, txn->xid);
 
 	/*
 	 * Decoding needs access to syscaches et al., which in turn use
@@ -1567,15 +1697,20 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 
 	PG_TRY();
 	{
+		XLogRecPtr	prev_lsn = InvalidXLogRecPtr;
 		ReorderBufferChange *change;
 		ReorderBufferChange *specinsert = NULL;
 
 		if (using_subtxn)
-			BeginInternalSubTransaction("replay");
+			BeginInternalSubTransaction("stream");
 		else
 			StartTransactionCommand();
 
-		rb->begin(rb, txn);
+		/* start streaming this chunk of transaction */
+		if (streaming)
+			rb->stream_start(rb, txn);
+		else
+			rb->begin(rb, txn);
 
 		ReorderBufferIterTXNInit(rb, txn, &iterstate);
 		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
@@ -1583,6 +1718,35 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 			Relation	relation = NULL;
 			Oid			reloid;
 
+			/*
+			 * Enforce correct ordering of changes, merged from multiple
+			 * subtransactions. The changes may have the same LSN due to
+			 * MULTI_INSERT xlog records.
+			 */
+			Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
+
+			prev_lsn = change->lsn;
+
+			if (streaming)
+			{
+				/*
+				 * While streaming an in-progress transaction there is a
+				 * possibility that the (sub)transaction might get aborted
+				 * concurrently.  In that case, if the (sub)transaction has
+				 * catalog updates, we might decode the tuple using the wrong
+				 * catalog version.  So, to detect a concurrent abort, we set
+				 * CheckXidAlive to the xid of the (sub)transaction to which
+				 * this change belongs.  Then, during a catalog scan, we can
+				 * check the status of the xid, and if it is aborted we will
+				 * report a specific error which we can ignore.  We might have
+				 * already streamed some of the changes for the aborted
+				 * (sub)transaction, but that is fine because when we decode the
+				 * abort we will stream abort message to truncate the changes in
+				 * the subscriber.
+				 */
+				CheckXidAlive = change->txn->xid;
+			}
+
 			switch (change->action)
 			{
 				case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -1592,8 +1756,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 					 * use as a normal record. It'll be cleaned up at the end
 					 * of INSERT processing.
 					 */
-					if (specinsert == NULL)
-						elog(ERROR, "invalid ordering of speculative insertion changes");
 					Assert(specinsert->data.tp.oldtuple == NULL);
 					change = specinsert;
 					change->action = REORDER_BUFFER_CHANGE_INSERT;
@@ -1659,7 +1821,15 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 					if (!IsToastRelation(relation))
 					{
 						ReorderBufferToastReplace(rb, txn, relation, change);
-						rb->apply_change(rb, txn, relation, change);
+						if (streaming)
+						{
+							rb->stream_change(rb, txn, relation, change);
+
+							/* Remember that we have sent some data for this xid. */
+							change->txn->any_data_sent = true;
+						}
+						else
+							rb->apply_change(rb, txn, relation, change);
 
 						/*
 						 * Only clear reassembled toast chunks if we're sure
@@ -1680,8 +1850,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 						 * freed/reused while restoring spooled data from
 						 * disk.
 						 */
-						Assert(change->data.tp.newtuple != NULL);
-
 						dlist_delete(&change->node);
 						ReorderBufferToastAppendChunk(rb, txn, relation,
 													  change);
@@ -1699,7 +1867,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 						specinsert = NULL;
 					}
 
-					if (relation != NULL)
+					if (RelationIsValid(relation))
 					{
 						RelationClose(relation);
 						relation = NULL;
@@ -1757,7 +1925,15 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 							relations[nrelations++] = relation;
 						}
 
-						rb->apply_truncate(rb, txn, nrelations, relations, change);
+						if (streaming)
+						{
+							rb->stream_truncate(rb, txn, nrelations, relations, change);
+
+							/* Remember that we have sent some data. */
+							change->txn->any_data_sent = true;
+						}
+						else
+							rb->apply_truncate(rb, txn, nrelations, relations, change);
 
 						for (i = 0; i < nrelations; i++)
 							RelationClose(relations[i]);
@@ -1766,10 +1942,16 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 					}
 
 				case REORDER_BUFFER_CHANGE_MESSAGE:
-					rb->message(rb, txn, change->lsn, true,
-								change->data.msg.prefix,
-								change->data.msg.message_size,
-								change->data.msg.message);
+					if (streaming)
+						rb->stream_message(rb, txn, change->lsn, true,
+										   change->data.msg.prefix,
+										   change->data.msg.message_size,
+										   change->data.msg.message);
+					else
+						rb->message(rb, txn, change->lsn, true,
+										   change->data.msg.prefix,
+										   change->data.msg.message_size,
+										   change->data.msg.message);
 					break;
 
 				case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
@@ -1800,9 +1982,9 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 						snapshot_now = change->data.snapshot;
 					}
 
-
 					/* and continue with the new one */
-					SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid);
+					SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash,
+										  txn->xid);
 					break;
 
 				case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
@@ -1822,7 +2004,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 						snapshot_now->curcid = command_id;
 
 						TeardownHistoricSnapshot(false);
-						SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid);
+						SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash,
+											  txn->xid);
 					}
 
 					break;
@@ -1860,14 +2043,40 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		ReorderBufferIterTXNFinish(rb, iterstate);
 		iterstate = NULL;
 
-		/* call commit callback */
-		rb->commit(rb, txn, commit_lsn);
+		/*
+		 * Done with current changes, call stream_stop callback for streaming
+		 * transaction, commit callback otherwise.
+		 */
+		if (streaming)
+		{
+			/*
+			 * Set the last LSN of the stream as the final lsn before calling
+			 * stream stop.
+			 */
+			txn->final_lsn = prev_lsn;
+			rb->stream_stop(rb, txn);
+		}
+		else
+			rb->commit(rb, txn, commit_lsn);
 
 		/* this is just a sanity check against bad output plugin behaviour */
 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
 			elog(ERROR, "output plugin used XID %u",
 				 GetCurrentTransactionId());
 
+		/*
+		 * Remember the command ID and snapshot if transaction is streaming
+		 * otherwise free the snapshot if we have copied it.
+		 */
+		if (streaming)
+		{
+			txn->command_id = command_id;
+			txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
+													  txn, command_id);
+		}
+		else if (snapshot_now->copied)
+			ReorderBufferFreeSnap(rb, snapshot_now);
+
 		/* cleanup */
 		TeardownHistoricSnapshot(false);
 
@@ -1885,14 +2094,22 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		if (using_subtxn)
 			RollbackAndReleaseCurrentSubTransaction();
 
-		if (snapshot_now->copied)
-			ReorderBufferFreeSnap(rb, snapshot_now);
-
-		/* remove potential on-disk data, and deallocate */
-		ReorderBufferCleanupTXN(rb, txn);
+		/*
+		 * If we are streaming the in-progress transaction then discard the
+		 * changes that we just streamed, and mark the transactions as streamed
+		 * (if they contained changes). Otherwise, remove all the changes and
+		 * deallocate the ReorderBufferTXN.
+		 */
+		if (streaming)
+			ReorderBufferTruncateTXN(rb, txn);
+		else
+			ReorderBufferCleanupTXN(rb, txn);
 	}
 	PG_CATCH();
 	{
+		MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
+		ErrorData  *errdata = CopyErrorData();
+
 		/* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
 		if (iterstate)
 			ReorderBufferIterTXNFinish(rb, iterstate);
@@ -1911,17 +2128,116 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		if (using_subtxn)
 			RollbackAndReleaseCurrentSubTransaction();
 
-		if (snapshot_now->copied)
-			ReorderBufferFreeSnap(rb, snapshot_now);
+		if (streaming)
+		{
+			/* Discard the changes that we just streamed. */
+			ReorderBufferTruncateTXN(rb, txn);
 
-		/* remove potential on-disk data, and deallocate */
-		ReorderBufferCleanupTXN(rb, txn);
+			/* Re-throw only if it's not an abort. */
+			if (errdata->sqlerrcode != ERRCODE_TRANSACTION_ROLLBACK)
+			{
+				MemoryContextSwitchTo(ecxt);
+				PG_RE_THROW();
+			}
+			else
+			{
+				/* remember the command ID and snapshot for the streaming run */
+				txn->command_id = command_id;
+				txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
+														  txn, command_id);
+				/*
+				 * Set the last LSN of the stream as the final lsn before
+				 * calling stream stop.
+				 */
+				txn->final_lsn = prev_lsn;
+				rb->stream_stop(rb, txn);
 
-		PG_RE_THROW();
+				FlushErrorState();
+			}
+		}
+		else
+		{
+			ReorderBufferCleanupTXN(rb, txn);
+			PG_RE_THROW();
+		}
 	}
 	PG_END_TRY();
 }
 
+/*
+ * Perform the replay of a transaction and its non-aborted subtransactions.
+ *
+ * Subtransactions previously have to be processed by
+ * ReorderBufferCommitChild(), even if previously assigned to the toplevel
+ * transaction with ReorderBufferAssignChild.
+ *
+ * We currently can only decode a transaction's contents when its commit
+ * record is read because that's the only place where we know about cache
+ * invalidations. Thus, once a toplevel commit is read, we iterate over the top
+ * and subtransactions (using a k-way merge) and replay the changes in lsn
+ * order.
+ */
+void
+ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+	ReorderBufferTXN *txn;
+	volatile Snapshot snapshot_now;
+	volatile CommandId command_id = FirstCommandId;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+								false);
+
+	/* unknown transaction, nothing to replay */
+	if (txn == NULL)
+		return;
+
+	txn->final_lsn = commit_lsn;
+	txn->end_lsn = end_lsn;
+	txn->commit_time = commit_time;
+	txn->origin_id = origin_id;
+	txn->origin_lsn = origin_lsn;
+
+	/*
+	 * If the transaction was (partially) streamed, we need to commit it in a
+	 * 'streamed' way. That is, we first stream the remaining part of the
+	 * transaction, and then invoke stream_commit message.
+	 *
+	 * XXX Called after everything (origin ID and LSN, ...) is stored in the
+	 * transaction, so we don't pass that directly.
+	 *
+	 * XXX Somewhat hackish redirection, perhaps needs to be refactored?
+	 */
+	if (rbtxn_is_streamed(txn))
+	{
+		ReorderBufferStreamCommit(rb, txn);
+		return;
+	}
+
+	/*
+	 * If this transaction has no snapshot, it didn't make any changes to the
+	 * database, so there's nothing to decode.  Note that
+	 * ReorderBufferCommitChild will have transferred any snapshots from
+	 * subtransactions if there were any.
+	 */
+	if (txn->base_snapshot == NULL)
+	{
+		Assert(txn->ninvalidations == 0);
+		ReorderBufferCleanupTXN(rb, txn);
+		return;
+	}
+
+	snapshot_now = txn->base_snapshot;
+
+	/*
+	 * Access the main routine to decode the changes and send to output plugin.
+	 */
+	ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
+							command_id, false);
+}
+
 /*
  * Abort a transaction that possibly has previous changes. Needs to be first
  * called for subtransactions and then for the toplevel xid.
@@ -1946,6 +2262,13 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	if (txn == NULL)
 		return;
 
+	/*
+	 * When the (sub)transaction was streamed, notify the remote node
+	 * about the abort only if we have sent any data for this transaction.
+	 */
+	if (rbtxn_is_streamed(txn) && txn->any_data_sent)
+		rb->stream_abort(rb, txn, lsn);
+
 	/* cosmetic... */
 	txn->final_lsn = lsn;
 
@@ -2030,6 +2353,13 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	if (txn == NULL)
 		return;
 
+	/*
+	 * When the (sub)transaction was streamed, notify the remote node
+	 * about the abort.
+	 */
+	if (rbtxn_is_streamed(txn))
+		rb->stream_abort(rb, txn, lsn);
+
 	/* cosmetic... */
 	txn->final_lsn = lsn;
 
@@ -2165,8 +2495,17 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
 }
 
 /*
- * Update the memory accounting info. We track memory used by the whole
- * reorder buffer and the transaction containing the change.
+ * Update memory counters to account for the new or removed change.
+ *
+ * We update two counters - in the reorder buffer, and in the transaction
+ * containing the change. The reorder buffer counter allows us to quickly
+ * decide if we reached the memory limit, the transaction counter allows
+ * us to quickly pick the largest transaction for eviction.
+ *
+ * When streaming is enabled, we need to update the toplevel transaction
+ * counters instead - we don't really care about subtransactions as we
+ * can't stream them individually anyway, and we only pick toplevel
+ * transactions for eviction. So only toplevel transactions matter.
  */
 static void
 ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
@@ -2174,6 +2513,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 								bool addition)
 {
 	Size		sz;
+	ReorderBufferTXN *txn;
 
 	Assert(change->txn);
 
@@ -2185,19 +2525,28 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 	if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
 		return;
 
+	txn = change->txn;
+
+	/* if subxact, and streaming supported, use the toplevel instead */
+	if (txn->toptxn && ReorderBufferCanStream(rb))
+		txn = txn->toptxn;
+
 	sz = ReorderBufferChangeSize(change);
 
 	if (addition)
 	{
-		change->txn->size += sz;
+		txn->size += sz;
 		rb->size += sz;
 	}
 	else
 	{
-		Assert((rb->size >= sz) && (change->txn->size >= sz));
-		change->txn->size -= sz;
+		Assert((rb->size >= sz) && (txn->size >= sz));
+		txn->size -= sz;
 		rb->size -= sz;
 	}
+
+	Assert(txn->size <= rb->size);
+	Assert((txn->size >= 0) && (rb->size >= 0));
 }
 
 /*
@@ -2226,6 +2575,7 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
 	change->lsn = lsn;
 	change->txn = txn;
 	change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
+	change->txn = txn;
 
 	dlist_push_tail(&txn->tuplecids, &change->node);
 	txn->ntuplecids++;
@@ -2315,6 +2665,13 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
 	txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+
+	/*
+	 * TOCHECK: Mark toplevel transaction as having catalog changes too
+	 * if one of its children has.
+	 */
+	if (txn->toptxn != NULL)
+		txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
 }
 
 /*
@@ -2418,6 +2775,38 @@ ReorderBufferLargestTXN(ReorderBuffer *rb)
 	return largest;
 }
 
+/*
+ * Find the largest toplevel transaction to evict (by streaming).
+ *
+ * This can be seen as an optimized version of ReorderBufferLargestTXN, which
+ * should give us the same transaction (because with streaming we don't update
+ * the memory accounting of subtransactions, so their size is always 0). But
+ * we can simply iterate over the limited number of toplevel transactions.
+ */
+static ReorderBufferTXN *
+ReorderBufferLargestTopTXN(ReorderBuffer *rb)
+{
+	dlist_iter	iter;
+	ReorderBufferTXN *largest = NULL;
+
+	dlist_foreach(iter, &rb->toplevel_by_lsn)
+	{
+		ReorderBufferTXN *txn;
+
+		txn = dlist_container(ReorderBufferTXN, node, iter.cur);
+
+		/* if the current transaction is larger, remember it */
+		if ((!largest) || (txn->size > largest->size))
+			largest = txn;
+	}
+
+	Assert(largest);
+	Assert(largest->size > 0);
+	Assert(largest->size <= rb->size);
+
+	return largest;
+}
+
 /*
  * Check whether the logical_decoding_work_mem limit was reached, and if yes
  * pick the transaction to evict and spill the changes to disk.
@@ -2438,15 +2827,46 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 
 	/*
 	 * Pick the largest transaction (or subtransaction) and evict it from
-	 * memory by serializing it to disk.
+	 * memory by streaming, if supported. Otherwise spill to disk.
 	 */
-	txn = ReorderBufferLargestTXN(rb);
+	if (ReorderBufferCanStream(rb))
+	{
+		/*
+		 * Pick the largest toplevel transaction and evict it from memory by
+		 * streaming the already decoded part.
+		 */
+		txn = ReorderBufferLargestTopTXN(rb);
 
-	ReorderBufferSerializeTXN(rb, txn);
+		/* we know there has to be one, because the size is not zero */
+		Assert(txn && !txn->toptxn);
+		Assert(txn->size > 0);
+		Assert(rb->size >= txn->size);
+
+		ReorderBufferStreamTXN(rb, txn);
+	}
+	else
+	{
+		/*
+		 * Pick the largest transaction (or subtransaction) and evict it from
+		 * memory by serializing it to disk.
+		 */
+		txn = ReorderBufferLargestTXN(rb);
+
+		/* we know there has to be one, because the size is not zero */
+		Assert(txn);
+		Assert(txn->size > 0);
+		Assert(rb->size >= txn->size);
+
+		ReorderBufferSerializeTXN(rb, txn);
+	}
 
 	/*
 	 * After eviction, the transaction should have no entries in memory, and
 	 * should use 0 bytes for changes.
+	 *
+	 * XXX Checking the size is fine for both cases - spill to disk and
+	 * streaming. But for streaming we should really check nentries_mem for
+	 * all subtransactions too.
 	 */
 	Assert(txn->size == 0);
 	Assert(txn->nentries_mem == 0);
@@ -2739,6 +3159,101 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	Assert(ondisk->change.action == change->action);
 }
 
+static bool
+ReorderBufferCanStream(ReorderBuffer *rb)
+{
+	LogicalDecodingContext *ctx = rb->private_data;
+
+	return ctx->streaming;
+}
+
+/*
+ * Send data of a large transaction (and its subtransactions) to the
+ * output plugin, but using the stream API.
+ *
+ * XXX Do we need to check if the transaction has some changes to stream
+ * (maybe it got streamed right before the commit, and the commit then
+ * attempts to stream it again)?
+ */
+static void
+ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+	volatile Snapshot snapshot_now;
+	volatile CommandId command_id;
+
+	/* We can never reach here for a sub transaction. */
+	Assert(txn->toptxn == NULL);
+
+	/*
+	 * XXX Not sure if we can make any assumptions about base snapshot here,
+	 * similarly to what ReorderBufferCommit() does. That relies on
+	 * base_snapshot getting transferred from subxact in
+	 * ReorderBufferCommitChild(), but that was not yet called as the
+	 * transaction is in-progress.
+	 *
+	 * So just walk the subxacts and use the same logic here. But we only need
+	 * to do that once, when the transaction is streamed for the first time.
+	 * After that we need to reuse the snapshot from the previous run.
+	 */
+	if (txn->snapshot_now == NULL)
+	{
+		dlist_iter	subxact_i;
+
+		/* make sure this transaction is streamed for the first time */
+		Assert(!rbtxn_is_streamed(txn));
+
+		/* at the beginning we should have invalid command ID */
+		Assert(txn->command_id == InvalidCommandId);
+
+		dlist_foreach(subxact_i, &txn->subtxns)
+		{
+			ReorderBufferTXN *subtxn;
+
+			subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
+			ReorderBufferTransferSnapToParent(txn, subtxn);
+		}
+
+		command_id = FirstCommandId;
+		snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
+											 txn, command_id);
+	}
+	else
+	{
+		/* the transaction must have been already streamed */
+		Assert(rbtxn_is_streamed(txn));
+
+		/*
+		 * We already have a snapshot from the previous streaming run. We
+		 * assume new subxacts can't move the LSN backwards, and so can't beat
+		 * the LSN condition in the previous branch (so no need to walk
+		 * through subxacts again). In fact, we must not do that as we may be
+		 * using a snapshot half-way through the subxact.
+		 */
+		command_id = txn->command_id;
+
+		/*
+		 * We can not use txn->snapshot_now directly because after the last
+		 * streaming run, we might have got some new sub-transactions. So we
+		 * need to add them to the snapshot.
+		 */
+		snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
+											 txn, command_id);
+
+		/* Free the previously copied snapshot. */
+		ReorderBufferFreeSnap(rb, txn->snapshot_now);
+	}
+
+	/*
+	 * Access the main routine to decode the changes and send to output plugin.
+	 */
+	ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
+							command_id, true);
+
+	Assert(dlist_is_empty(&txn->changes));
+	Assert(txn->nentries == 0);
+	Assert(txn->nentries_mem == 0);
+}
+
 /*
  * Size of a change in memory.
  */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 79ea33cd26..629eeca7f6 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -173,6 +173,7 @@ typedef struct ReorderBufferChange
 #define RBTXN_HAS_CATALOG_CHANGES 0x0001
 #define RBTXN_IS_SUBXACT          0x0002
 #define RBTXN_IS_SERIALIZED       0x0004
+#define RBTXN_IS_STREAMED         0x0008
 
 /* Does the transaction have catalog changes? */
 #define rbtxn_has_catalog_changes(txn) \
@@ -192,6 +193,24 @@ typedef struct ReorderBufferChange
 	((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
 )
 
+/*
+ * Has this transaction been streamed to downstream?
+ *
+ * (It's not possible to deduce this from nentries and nentries_mem for
+ * various reasons. For example, all changes may be in subtransactions in
+ * which case we'd have nentries==0 for the toplevel one, which would say
+ * nothing about the streaming. So we maintain this flag, but only for the
+ * toplevel transaction.)
+ *
+ * Note: We never do both stream and serialize a transaction (we only spill
+ * to disk when streaming is not supported by the plugin), so only one of
+ * those two flags may be set at any given time.
+ */
+#define rbtxn_is_streamed(txn) \
+( \
+	((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
+)
+
 typedef struct ReorderBufferTXN
 {
 	/* See above */
@@ -225,6 +244,16 @@ typedef struct ReorderBufferTXN
 	 */
 	XLogRecPtr	final_lsn;
 
+	/*
+	 * Have we sent any changes for this transaction to the output plugin?
+	 */
+	bool		any_data_sent;
+
+	/*
+	 * Toplevel transaction for this subxact (NULL for top-level).
+	 */
+	struct ReorderBufferTXN *toptxn;
+
 	/*
 	 * LSN pointing to the end of the commit record + 1.
 	 */
@@ -255,6 +284,13 @@ typedef struct ReorderBufferTXN
 	XLogRecPtr	base_snapshot_lsn;
 	dlist_node	base_snapshot_node; /* link in txns_by_base_snapshot_lsn */
 
+	/*
+	 * Snapshot/CID from the previous streaming run. Only valid for already
+	 * streamed transactions (NULL/InvalidCommandId otherwise).
+	 */
+	Snapshot	snapshot_now;
+	CommandId	command_id;
+
 	/*
 	 * How many ReorderBufferChange's do we have in this txn.
 	 *
-- 
2.20.1

