From b72ea52c865b2d7f0d7d29d0834d71e1ec33d54a Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Thu, 6 Jul 2017 18:16:44 +0200
Subject: [PATCH] Wait for slot to become free when dropping it

---
 src/backend/replication/logical/logicalfuncs.c |  2 +-
 src/backend/replication/slot.c                 | 43 +++++++++++++++++++++-----
 src/backend/replication/slotfuncs.c            |  2 +-
 src/backend/replication/walsender.c            |  6 ++--
 src/include/replication/slot.h                 |  8 +++--
 5 files changed, 46 insertions(+), 15 deletions(-)

diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 363ca82..a3ba2b1 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -244,7 +244,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
 
-	ReplicationSlotAcquire(NameStr(*name));
+	ReplicationSlotAcquire(NameStr(*name), true);	/* nowait */
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index dc7de20..2993bb9 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -157,6 +157,7 @@ ReplicationSlotsShmemInit(void)
 			/* everything else is zeroed by the memset above */
 			SpinLockInit(&slot->mutex);
 			LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS);
+			ConditionVariableInit(&slot->active_cv);
 		}
 	}
 }
@@ -323,7 +324,10 @@ ReplicationSlotCreate(const char *name, bool db_specific,
  * Find a previously created slot and mark it as used by this backend.
+ *
+ * If the slot is currently in use by another backend, error out if nowait
+ * is true; otherwise wait until it is released and try again.
  */
 void
-ReplicationSlotAcquire(const char *name)
+ReplicationSlotAcquire(const char *name, bool nowait)
 {
 	ReplicationSlot *slot = NULL;
 	int			i;
@@ -331,6 +335,11 @@ ReplicationSlotAcquire(const char *name)
 
 	Assert(MyReplicationSlot == NULL);
 
+retry:
+	/* Forget any previous pass's result; the slot may have been dropped. */
+	slot = NULL;
+	active_pid = 0;
+
 	/* Search for the named slot and mark it active if we find it. */
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 	for (i = 0; i < max_replication_slots; i++)
@@ -339,10 +348,18 @@ ReplicationSlotAcquire(const char *name)
 
 		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
 		{
+			/*
+			 * If we may have to wait, get ready to sleep on the slot before
+			 * checking active_pid, so that a release happening right after
+			 * the check cannot be missed.  This must be done before taking
+			 * the spinlock, never while holding it.
+			 */
+			if (!nowait)
+				ConditionVariablePrepareToSleep(&s->active_cv);
 			SpinLockAcquire(&s->mutex);
 			active_pid = s->active_pid;
 			if (active_pid == 0)
 				active_pid = s->active_pid = MyProcPid;
 			SpinLockRelease(&s->mutex);
 			slot = s;
 			break;
@@ -350,16 +367,40 @@ ReplicationSlotAcquire(const char *name)
 	}
 	LWLockRelease(ReplicationSlotControlLock);
 
-	/* If we did not find the slot or it was already active, error out. */
+	/* If we did not find the slot, error out. */
 	if (slot == NULL)
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
 				 errmsg("replication slot \"%s\" does not exist", name)));
+
+	/*
+	 * If we did find the slot but it's already acquired by another backend,
+	 * we either error out or wait for it to be released and retry, depending
+	 * on what the caller requested.
+	 */
 	if (active_pid != MyProcPid)
-		ereport(ERROR,
-				(errcode(ERRCODE_OBJECT_IN_USE),
-				 errmsg("replication slot \"%s\" is active for PID %d",
-						name, active_pid)));
+	{
+		if (nowait)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_IN_USE),
+					 errmsg("replication slot \"%s\" is active for PID %d",
+							name, active_pid)));
+
+		/*
+		 * We prepared to sleep before looking at active_pid, so this really
+		 * blocks until ReplicationSlotRelease (or a drop) broadcasts on the
+		 * slot's condition variable.  Then search again from scratch, as the
+		 * slot may have been dropped or taken by someone else meanwhile.
+		 */
+		ConditionVariableSleep(&slot->active_cv, PG_WAIT_LOCK);
+		ConditionVariableCancelSleep();
+		goto retry;
+	}
+
+	/* The slot is ours: cancel the unneeded sleep and wake anyone watching. */
+	if (!nowait)
+		ConditionVariableCancelSleep();
+	ConditionVariableBroadcast(&slot->active_cv);
 
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = slot;
@@ -393,6 +434,7 @@ ReplicationSlotRelease(void)
 		 */
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
 		SpinLockRelease(&slot->mutex);
+		ConditionVariableBroadcast(&slot->active_cv);
 	}
 
@@ -451,11 +493,13 @@ ReplicationSlotCleanup(void)
  * Permanently drop replication slot identified by the passed in name.
+ *
+ * nowait has the same meaning as for ReplicationSlotAcquire.
  */
 void
-ReplicationSlotDrop(const char *name)
+ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name);
+	ReplicationSlotAcquire(name, nowait);
 
 	ReplicationSlotDropAcquired();
 }
@@ -525,6 +569,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
 		SpinLockRelease(&slot->mutex);
+		ConditionVariableBroadcast(&slot->active_cv);
 
 		ereport(fail_softly ? WARNING : ERROR,
@@ -543,6 +588,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 	slot->active_pid = 0;
 	slot->in_use = false;
 	LWLockRelease(ReplicationSlotControlLock);
+	ConditionVariableBroadcast(&slot->active_cv);
 
 	/*
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6dc8088..a5ecc85 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -171,7 +171,8 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 
 	CheckSlotRequirements();
 
-	ReplicationSlotDrop(NameStr(*name));
+	/* As before, error out rather than block if the slot is in use. */
+	ReplicationSlotDrop(NameStr(*name), true);
 
 	PG_RETURN_VOID();
 }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 002143b..9a2babe 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -541,7 +541,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		ReplicationSlotAcquire(cmd->slotname);
+		ReplicationSlotAcquire(cmd->slotname, true);	/* nowait */
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1028,7 +1028,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 static void
 DropReplicationSlot(DropReplicationSlotCmd *cmd)
 {
-	ReplicationSlotDrop(cmd->slotname);
+	ReplicationSlotDrop(cmd->slotname, false);	/* wait until slot is free */
 	EndCommand("DROP_REPLICATION_SLOT", DestRemote);
 }
 
@@ -1046,7 +1046,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	ReplicationSlotAcquire(cmd->slotname);
+	ReplicationSlotAcquire(cmd->slotname, true);	/* nowait */
 
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a283f4e..f97679e 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -12,6 +12,7 @@
 #include "fmgr.h"
 #include "access/xlog.h"
 #include "access/xlogreader.h"
+#include "storage/condition_variable.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
@@ -93,6 +94,9 @@ typedef struct ReplicationSlot
 	/* Who is streaming out changes for this slot? 0 in unused slots. */
 	pid_t		active_pid;
 
+	/* Condition variable signalled whenever active_pid changes. */
+	ConditionVariable active_cv;
+
 	/* any outstanding modifications? */
 	bool		just_dirtied;
 	bool		dirty;
@@ -162,9 +166,9 @@ extern void ReplicationSlotsShmemInit(void);
 extern void ReplicationSlotCreate(const char *name, bool db_specific,
 					  ReplicationSlotPersistency p);
 extern void ReplicationSlotPersist(void);
-extern void ReplicationSlotDrop(const char *name);
+extern void ReplicationSlotDrop(const char *name, bool nowait);
 
-extern void ReplicationSlotAcquire(const char *name);
+extern void ReplicationSlotAcquire(const char *name, bool nowait);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(void);
 extern void ReplicationSlotSave(void);
-- 
2.7.4

