From 7d5b48c8cb80e7c867b2096c999d08feda50b197 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Fri, 24 Feb 2017 21:39:03 +0100
Subject: [PATCH 1/5] Reserve global xmin for create slot snasphot export

Otherwise the VACUUM or pruning might remove tuples still needed by the
exported snapshot.
---
 src/backend/replication/logical/logical.c | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5529ac8..57c392c 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -267,12 +267,18 @@ CreateInitDecodingContext(char *plugin,
 	 * the slot machinery about the new limit. Once that's done the
 	 * ProcArrayLock can be released as the slot machinery now is
 	 * protecting against vacuum.
+	 *
+	 * Note that we only store the global xmin temporarily in the in-memory
+	 * state so that the initial snapshot can be exported. After initial
+	 * snapshot is done global xmin should be reset and not tracked anymore
+	 * so we are fine with losing the global xmin after crash.
 	 * ----
 	 */
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
 	slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId();
 	slot->data.catalog_xmin = slot->effective_catalog_xmin;
+	slot->effective_xmin = slot->effective_catalog_xmin;
 
 	ReplicationSlotsComputeRequiredXmin(true);
 
@@ -282,7 +288,7 @@ CreateInitDecodingContext(char *plugin,
 	 * tell the snapshot builder to only assemble snapshot once reaching the
 	 * running_xact's record with the respective xmin.
 	 */
-	xmin_horizon = slot->data.catalog_xmin;
+	xmin_horizon = slot->effective_xmin;
 
 	ReplicationSlotMarkDirty();
 	ReplicationSlotSave();
@@ -456,9 +462,22 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 void
 FreeDecodingContext(LogicalDecodingContext *ctx)
 {
+	ReplicationSlot *slot = MyReplicationSlot;
+
 	if (ctx->callbacks.shutdown_cb != NULL)
 		shutdown_cb_wrapper(ctx);
 
+	/*
+	 * Cleanup global xmin for the slot that we may have set in
+	 * CreateInitDecodingContext(). We do not take ProcArrayLock or similar
+	 * since we only reset xmin here and there's not much harm done by a
+	 * concurrent computation missing that.
+	 */
+	SpinLockAcquire(&slot->mutex);
+	slot->effective_xmin = InvalidTransactionId;
+	SpinLockRelease(&slot->mutex);
+	ReplicationSlotsComputeRequiredXmin(false);
+
 	ReorderBufferFree(ctx->reorder);
 	FreeSnapshotBuilder(ctx->snapshot_builder);
 	XLogReaderFree(ctx->reader);
-- 
2.7.4

