From 67a44702ff146756b33e8d15e91a02f5d9e86792 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Fri, 24 Feb 2017 21:39:03 +0100
Subject: [PATCH 1/4] Reserve global xmin for create slot snapshot export

Otherwise VACUUM or pruning might remove tuples still needed by the
exported snapshot.
---
 src/backend/replication/logical/logical.c | 31 +++++++++++++++++++++++++++----
 1 file changed, 27 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5529ac8..9062244 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -267,12 +267,18 @@ CreateInitDecodingContext(char *plugin,
 	 * the slot machinery about the new limit. Once that's done the
 	 * ProcArrayLock can be released as the slot machinery now is
 	 * protecting against vacuum.
+	 *
+	 * Note that we only store the global xmin temporarily so that the initial
+	 * snapshot can be exported. Once the initial snapshot is done, the global
+	 * xmin should be reset and no longer tracked.
 	 * ----
 	 */
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
 	slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId();
 	slot->data.catalog_xmin = slot->effective_catalog_xmin;
+	slot->effective_xmin = slot->effective_catalog_xmin;
+	slot->data.xmin = slot->effective_catalog_xmin;
 
 	ReplicationSlotsComputeRequiredXmin(true);
 
@@ -282,7 +288,7 @@ CreateInitDecodingContext(char *plugin,
 	 * tell the snapshot builder to only assemble snapshot once reaching the
 	 * running_xact's record with the respective xmin.
 	 */
-	xmin_horizon = slot->data.catalog_xmin;
+	xmin_horizon = slot->effective_xmin;
 
 	ReplicationSlotMarkDirty();
 	ReplicationSlotSave();
@@ -456,12 +462,29 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
void
FreeDecodingContext(LogicalDecodingContext *ctx)
{
+	ReplicationSlot *slot = MyReplicationSlot;
+
 	if (ctx->callbacks.shutdown_cb != NULL)
 		shutdown_cb_wrapper(ctx);

-	ReorderBufferFree(ctx->reorder);
-	FreeSnapshotBuilder(ctx->snapshot_builder);
-	XLogReaderFree(ctx->reader);
+	/*
+	 * Clear the slot's global xmin that CreateInitDecodingContext() may have
+	 * set. Only the slot spinlock is taken, not ProcArrayLock: xmin is only
+	 * reset here, so little harm is done if a concurrent computation misses
+	 * it. NOTE(review): slot is not marked dirty after this - confirm intent.
+	 */
+	SpinLockAcquire(&slot->mutex);
+	slot->effective_xmin = InvalidTransactionId;
+	slot->data.xmin = InvalidTransactionId;
+	SpinLockRelease(&slot->mutex);
+	ReplicationSlotsComputeRequiredXmin(false);
+
+	if (ctx->reorder)
+		ReorderBufferFree(ctx->reorder);
+	if (ctx->snapshot_builder)
+		FreeSnapshotBuilder(ctx->snapshot_builder);
+	if (ctx->reader)
+		XLogReaderFree(ctx->reader);
 	MemoryContextDelete(ctx->context);
}
 
-- 
2.7.4

