From 2a02c117e538f5085e82f211b41c3b14c37ce3ff Mon Sep 17 00:00:00 2001
From: Arseny Sher <sher-ars@yandex.ru>
Date: Tue, 27 Feb 2018 19:49:28 +0300
Subject: [PATCH] Use single snapshot for replaying transaction.

---
 src/backend/replication/logical/reorderbuffer.c | 39 +++++---------------
 src/backend/replication/logical/snapbuild.c     | 49 ++++++++++---------------
 2 files changed, 28 insertions(+), 60 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c72a611a39..04b8068ddd 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1479,36 +1479,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 					break;
 
 				case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
-					/* get rid of the old */
-					TeardownHistoricSnapshot(false);
 
-					if (snapshot_now->copied)
-					{
-						ReorderBufferFreeSnap(rb, snapshot_now);
-						snapshot_now =
-							ReorderBufferCopySnap(rb, change->data.snapshot,
-												  txn, command_id);
-					}
-
-					/*
-					 * Restored from disk, need to be careful not to double
-					 * free. We could introduce refcounting for that, but for
-					 * now this seems infrequent enough not to care.
-					 */
-					else if (change->data.snapshot->copied)
-					{
-						snapshot_now =
-							ReorderBufferCopySnap(rb, change->data.snapshot,
-												  txn, command_id);
-					}
-					else
-					{
-						snapshot_now = change->data.snapshot;
-					}
-
-
-					/* and continue with the new one */
-					SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
 					break;
 
 				case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
@@ -1831,7 +1802,15 @@ ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
 	ReorderBufferTXN *txn;
 	bool		is_new;
 
-	txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
+	/*
+	 * If the xact still doesn't exist at commit time, it is empty and had no
+	 * subxacts (if it had any, XLOG_XACT_ASSIGNMENT would have been logged
+	 * and the xact would already be present in the ReorderBuffer).
+	 */
+	if (txn == NULL)
+		return;
+
 	Assert(txn->base_snapshot == NULL);
 	Assert(snap != NULL);
 
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 4123cdebcf..2b258ff7b9 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -734,29 +734,6 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 		TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder)))
 		return false;
 
-	/*
-	 * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
-	 * be needed to decode the change we're currently processing.
-	 */
-	if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
-	{
-		/* only build a new snapshot if we don't have a prebuilt one */
-		if (builder->snapshot == NULL)
-		{
-			builder->snapshot = SnapBuildBuildSnapshot(builder);
-			/* increase refcount for the snapshot builder */
-			SnapBuildSnapIncRefcount(builder->snapshot);
-		}
-
-		/*
-		 * Increase refcount for the transaction we're handing the snapshot
-		 * out to.
-		 */
-		SnapBuildSnapIncRefcount(builder->snapshot);
-		ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
-									 builder->snapshot);
-	}
-
 	return true;
 }
 
@@ -965,6 +942,25 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 		}
 	}
 
+	/*
+	 * Snapbuilder's snapshot at the moment of COMMIT is the snapshot we will
+	 * use to decode the xact.
+	 */
+	if (builder->state >= SNAPBUILD_FULL_SNAPSHOT)
+	{
+		/* only build a new snapshot if we don't have a prebuilt one */
+		if (builder->snapshot == NULL)
+		{
+			builder->snapshot = SnapBuildBuildSnapshot(builder);
+			/* increase refcount for the snapshot builder */
+			SnapBuildSnapIncRefcount(builder->snapshot);
+		}
+
+		SnapBuildSnapIncRefcount(builder->snapshot);
+		ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
+									 builder->snapshot);
+	}
+
 	for (nxact = 0; nxact < nsubxacts; nxact++)
 	{
 		TransactionId subxid = subxacts[nxact];
@@ -1063,13 +1059,6 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 
 		builder->snapshot = SnapBuildBuildSnapshot(builder);
 
-		/* we might need to execute invalidations, add snapshot */
-		if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
-		{
-			SnapBuildSnapIncRefcount(builder->snapshot);
-			ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
-										 builder->snapshot);
-		}
 
 		/* refcount of the snapshot builder for the new snapshot */
 		SnapBuildSnapIncRefcount(builder->snapshot);
-- 
2.11.0

