From e2bda9fa26f9d92059490d2b9ea7c37ae45f81b1 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Wed, 16 Mar 2016 15:12:34 +0800
Subject: [PATCH] Dirty replication slots when confirmed_flush is changed

---
 src/backend/replication/logical/logical.c | 62 +++++++++++++++++++++----------
 1 file changed, 42 insertions(+), 20 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2c7b749..d3fb1a5 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -440,6 +440,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 	}
 
 	ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+	ReplicationSlotMarkDirty();
 }
 
 /*
@@ -850,10 +851,15 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 	{
 		bool		updated_xmin = false;
 		bool		updated_restart = false;
+		bool		updated_confirm = false;
 
 		SpinLockAcquire(&MyReplicationSlot->mutex);
 
-		MyReplicationSlot->data.confirmed_flush = lsn;
+		if (MyReplicationSlot->data.confirmed_flush != lsn)
+		{
+			MyReplicationSlot->data.confirmed_flush = lsn;
+			updated_confirm = true;
+		}
 
 		/* if were past the location required for bumping xmin, do so */
 		if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
@@ -891,34 +897,50 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 
 		SpinLockRelease(&MyReplicationSlot->mutex);
 
-		/* first write new xmin to disk, so we know whats up after a crash */
-		if (updated_xmin || updated_restart)
+		if (updated_xmin || updated_restart || updated_confirm)
 		{
 			ReplicationSlotMarkDirty();
-			ReplicationSlotSave();
-			elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
-		}
 
-		/*
-		 * Now the new xmin is safely on disk, we can let the global value
-		 * advance. We do not take ProcArrayLock or similar since we only
-		 * advance xmin here and there's not much harm done by a concurrent
-		 * computation missing that.
-		 */
-		if (updated_xmin)
-		{
-			SpinLockAcquire(&MyReplicationSlot->mutex);
-			MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
-			SpinLockRelease(&MyReplicationSlot->mutex);
+			/*
+			 * First write the new xmin to disk, so we know what's up
+			 * after a crash.
+			 */
+			if (updated_xmin || updated_restart)
+			{
+				ReplicationSlotSave();
+				elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
+			}
 
-			ReplicationSlotsUpdateRequiredXmin(false);
-			ReplicationSlotsUpdateRequiredLSN();
+			/*
+			 * Now the new xmin is safely on disk, we can let the global value
+			 * advance. We do not take ProcArrayLock or similar since we only
+			 * advance xmin here and there's not much harm done by a concurrent
+			 * computation missing that.
+			 */
+			if (updated_xmin)
+			{
+				SpinLockAcquire(&MyReplicationSlot->mutex);
+				MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
+				SpinLockRelease(&MyReplicationSlot->mutex);
+
+				ReplicationSlotsUpdateRequiredXmin(false);
+				ReplicationSlotsUpdateRequiredLSN();
+			}
 		}
 	}
 	else
 	{
+		bool dirtied = false;
+
 		SpinLockAcquire(&MyReplicationSlot->mutex);
-		MyReplicationSlot->data.confirmed_flush = lsn;
+		if (MyReplicationSlot->data.confirmed_flush != lsn)
+		{
+			MyReplicationSlot->data.confirmed_flush = lsn;
+			dirtied = true;
+		}
 		SpinLockRelease(&MyReplicationSlot->mutex);
+
+		if (dirtied)
+			ReplicationSlotMarkDirty();
 	}
 }
-- 
2.1.0

