From 59debb27c5aefedb8add2925e983815a5e0b9bd6 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Thu, 6 Jul 2017 23:57:05 +0200
Subject: [PATCH 4/6] Only kill sync workers at commit time in SUBSCRIPTION DDL

---
 src/backend/access/transam/xact.c          |  9 +++++
 src/backend/commands/subscriptioncmds.c    | 26 +++++++++++--
 src/backend/replication/logical/launcher.c | 60 +++++++++++++++++++++++++++++-
 src/include/replication/logicallauncher.h  |  1 +
 src/include/replication/worker_internal.h  |  1 +
 5 files changed, 91 insertions(+), 6 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b0aa69f..322502d 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2277,6 +2277,15 @@ PrepareTransaction(void)
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("cannot PREPARE a transaction that has exported snapshots")));
 
+	/*
+	 * Similar to above, don't allow PREPARE for a transaction that has
+	 * scheduled logical replication workers to be stopped at commit.
+	 */
+	if (XactManipulatesLogicalReplicationWorkers())
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("cannot PREPARE a transaction that has manipulated logical replication workers")));
+
 	/* Prevent cancel/die interrupt while cleaning up */
 	HOLD_INTERRUPTS();
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 8d144ab..f25a79f 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -597,7 +597,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 
 			RemoveSubscriptionRel(sub->oid, relid);
 
-			logicalrep_worker_stop(sub->oid, relid);
+			logicalrep_worker_stop_at_commit(sub->oid, relid);
 
 			namespace = get_namespace_name(get_rel_namespace(relid));
 			ereport(NOTICE,
@@ -909,15 +909,33 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 
 	ReleaseSysCache(tup);
 
+	/*
+	 * If we are dropping the slot, stop all the subscription workers
+	 * immediately so that the slot is accessible; otherwise just schedule
+	 * the stop for the end of the transaction.
+	 *
+	 * New workers won't be started because we hold exclusive lock on the
+	 * subscription till the end of transaction.
+	 */
+	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+	subworkers = logicalrep_sub_workers_find(subid, false);
+	LWLockRelease(LogicalRepWorkerLock);
+	foreach (lc, subworkers)
+	{
+		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+		if (sub->slotname)
+			logicalrep_worker_stop(w->subid, w->relid);
+		else
+			logicalrep_worker_stop_at_commit(w->subid, w->relid);
+	}
+	list_free(subworkers);
+
 	/* Clean up dependencies */
 	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
 	/* Remove any associated relation synchronization states. */
 	RemoveSubscriptionRel(subid, InvalidOid);
 
-	/* Kill the apply worker so that the slot becomes accessible. */
-	logicalrep_worker_stop(subid, InvalidOid);
-
 	/* Remove the origin tracking if exists. */
 	snprintf(originname, sizeof(originname), "pg_%u", subid);
 	originid = replorigin_by_name(originname, true);
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index d165d51..caf4844 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -73,6 +73,14 @@ typedef struct LogicalRepCtxStruct
 
 LogicalRepCtxStruct *LogicalRepCtx;
 
+typedef struct LogicalRepWorkerId
+{
+	Oid	subid;	/* subid later passed to logicalrep_worker_stop() */
+	Oid relid;	/* relid later passed to logicalrep_worker_stop() */
+} LogicalRepWorkerId;
+
+static List *on_commit_stop_workers = NIL;
+
 static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
@@ -514,6 +522,27 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 }
 
 /*
+ * Queue a worker stop for sub/rel; performed at commit, dropped on abort.
+ */
+void
+logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
+{
+	LogicalRepWorkerId *wid;
+	MemoryContext		oldctx;
+
+	/* Allocate in TopTransactionContext so the entry survives until commit. */
+	oldctx = MemoryContextSwitchTo(TopTransactionContext);
+
+	wid = palloc(sizeof(LogicalRepWorkerId));
+	wid->subid = subid;
+	wid->relid = relid;
+
+	on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+
+	MemoryContextSwitchTo(oldctx);
+}
+
+/*
  * Wake up (using latch) any logical replication worker for specified sub/rel.
  */
 void
@@ -754,14 +783,41 @@ ApplyLauncherShmemInit(void)
 }
 
 /*
+ * XactManipulatesLogicalReplicationWorkers
+ *		Report whether the current transaction has queued any logical
+ *		replication worker stops (on_commit_stop_workers is non-empty).
+ */
+bool
+XactManipulatesLogicalReplicationWorkers(void)
+{
+	return (on_commit_stop_workers != NIL);
+}
+
+/*
  * Wakeup the launcher on commit if requested.
  */
 void
 AtEOXact_ApplyLauncher(bool isCommit)
 {
-	if (isCommit && on_commit_launcher_wakeup)
-		ApplyLauncherWakeup();
+	ListCell *lc;
 
+	if (isCommit)
+	{
+		foreach (lc, on_commit_stop_workers)
+		{
+			LogicalRepWorkerId *wid = lfirst(lc);
+			logicalrep_worker_stop(wid->subid, wid->relid);
+		}
+
+		if (on_commit_launcher_wakeup)
+			ApplyLauncherWakeup();
+	}
+
+	/*
+	 * No need to pfree on_commit_stop_workers; it was allocated in the
+	 * transaction memory context, which is going to be cleaned up soon.
+	 */
+	on_commit_stop_workers = NIL;
 	on_commit_launcher_wakeup = false;
 }
 
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index aac7d32..78016c4 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -22,6 +22,7 @@ extern Size ApplyLauncherShmemSize(void);
 extern void ApplyLauncherShmemInit(void);
 
 extern void ApplyLauncherWakeupAtCommit(void);
+extern bool XactManipulatesLogicalReplicationWorkers(void);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
 
 extern bool IsLogicalLauncher(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 494a3a3..402b166 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -74,6 +74,7 @@ extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
 						 Oid userid, Oid relid);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
-- 
2.7.4

