From c05006a7cb1be1c1949bf0413e5ca363177ece02 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Fri, 7 Jul 2017 16:27:17 +0200
Subject: [PATCH 6/6] Improve locking for subscriptions and subscribed
 relations

Remove the exclusive lock on the pg_subscription catalog that DROP
SUBSCRIPTION was taking, and use more granular locking of individual
subscriptions instead.

This should make the overall behavior of subscriptions and their workers
saner in terms of concurrency and race conditions.
---
 src/backend/commands/subscriptioncmds.c     |  59 +++----
 src/backend/replication/logical/launcher.c  | 253 +++++++++++++---------------
 src/backend/replication/logical/tablesync.c |  63 ++++---
 src/include/replication/worker_internal.h   |   5 +-
 4 files changed, 172 insertions(+), 208 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f25a79f..3dc1f4c 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -636,12 +636,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				 errmsg("subscription \"%s\" does not exist",
 						stmt->subname)));
 
+	subid = HeapTupleGetOid(tup);
+
 	/* must be owner */
-	if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
+	if (!pg_subscription_ownercheck(subid, GetUserId()))
 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
 					   stmt->subname);
 
-	subid = HeapTupleGetOid(tup);
 	sub = GetSubscription(subid, false);
 
 	/* Lock the subscription so nobody else can do anything with it. */
@@ -814,22 +815,18 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	ObjectAddress myself;
 	HeapTuple	tup;
 	Oid			subid;
-	Datum		datum;
-	bool		isnull;
-	char	   *subname;
-	char	   *conninfo;
-	char	   *slotname;
 	char		originname[NAMEDATALEN];
-	char	   *err = NULL;
 	RepOriginId originid;
+	char	   *err = NULL;
+	Subscription *sub;
 	WalReceiverConn *wrconn = NULL;
 	StringInfoData cmd;
 
 	/*
-	 * Lock pg_subscription with AccessExclusiveLock to ensure that the
-	 * launcher doesn't restart new worker during dropping the subscription
+	 * RowExclusiveLock on pg_subscription suffices: concurrent DDL and worker
+	 * launches for this subscription are blocked by the lock taken below.
 	 */
-	rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);
+	rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
 
 	tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
 						  CStringGetDatum(stmt->subname));
@@ -863,29 +860,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 
 	/*
 	 * Lock the subscription so nobody else can do anything with it (including
-	 * the replication workers).
+	 * the replication workers).  Only read its definition after that, so we
+	 * don't act on a slot name or conninfo changed by a concurrent ALTER.
 	 */
 	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
 
-	/* Get subname */
-	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
-							Anum_pg_subscription_subname, &isnull);
-	Assert(!isnull);
-	subname = pstrdup(NameStr(*DatumGetName(datum)));
-
-	/* Get conninfo */
-	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
-							Anum_pg_subscription_subconninfo, &isnull);
-	Assert(!isnull);
-	conninfo = TextDatumGetCString(datum);
-
-	/* Get slotname */
-	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
-							Anum_pg_subscription_subslotname, &isnull);
-	if (!isnull)
-		slotname = pstrdup(NameStr(*DatumGetName(datum)));
-	else
-		slotname = NULL;
+	sub = GetSubscription(subid, false);
 
 	/*
 	 * Since dropping a replication slot is not transactional, the replication
@@ -897,7 +877,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * of a subscription that is associated with a replication slot", but we
 	 * don't have the proper facilities for that.
 	 */
-	if (slotname)
+	if (sub->slotname)
 		PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
 
 
@@ -946,7 +926,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * If there is no slot associated with the subscription, we can finish
 	 * here.
 	 */
-	if (!slotname)
+	if (!sub->slotname)
 	{
 		heap_close(rel, NoLock);
 		return;
@@ -959,13 +939,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	load_file("libpqwalreceiver", false);
 
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", quote_identifier(slotname));
+	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s",
+					 quote_identifier(sub->slotname));
 
-	wrconn = walrcv_connect(conninfo, true, subname, &err);
+	wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
 	if (wrconn == NULL)
 		ereport(ERROR,
 				(errmsg("could not connect to publisher when attempting to "
-						"drop the replication slot \"%s\"", slotname),
+						"drop the replication slot \"%s\"", sub->slotname),
 				 errdetail("The error was: %s", err),
 				 errhint("Use ALTER SUBSCRIPTION ... SET (slot_name = NONE) "
 						 "to disassociate the subscription from the slot.")));
@@ -979,12 +960,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 		if (res->status != WALRCV_OK_COMMAND)
 			ereport(ERROR,
 					(errmsg("could not drop the replication slot \"%s\" on publisher",
-							slotname),
+							sub->slotname),
 					 errdetail("The error was: %s", res->err)));
 		else
 			ereport(NOTICE,
 					(errmsg("dropped replication slot \"%s\" on publisher",
-							slotname)));
+							sub->slotname)));
 
 		walrcv_clear_result(res);
 	}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index caf4844..eea125b 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -42,6 +42,7 @@
 #include "replication/worker_internal.h"
 
 #include "storage/ipc.h"
+#include "storage/lmgr.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "storage/procsignal.h"
@@ -94,33 +95,23 @@ static bool on_commit_launcher_wakeup = false;
 
 Datum		pg_stat_get_subscription(PG_FUNCTION_ARGS);
 
-
 /*
- * Load the list of subscriptions.
- *
- * Only the fields interesting for worker start/stop functions are filled for
- * each subscription.
+ * Get OIDs of all enabled subscriptions.  Must be called inside a transaction.
  */
 static List *
-get_subscription_list(void)
+get_subscription_oids(void)
 {
 	List	   *res = NIL;
 	Relation	rel;
 	HeapScanDesc scan;
 	HeapTuple	tup;
-	MemoryContext resultcxt;
-
-	/* This is the context that we will allocate our output data in */
-	resultcxt = CurrentMemoryContext;
 
 	/*
-	 * Start a transaction so we can access pg_database, and get a snapshot.
-	 * We don't have a use for the snapshot itself, but we're interested in
+	 * Get a snapshot.  We have no use for it itself, but we're interested in
 	 * the secondary effect that it sets RecentGlobalXmin.  (This is critical
 	 * for anything that reads heap pages, because HOT may decide to prune
 	 * them even if the process doesn't attempt to modify any tuples.)
 	 */
-	StartTransactionCommand();
 	(void) GetTransactionSnapshot();
 
 	rel = heap_open(SubscriptionRelationId, AccessShareLock);
@@ -129,34 +120,17 @@ get_subscription_list(void)
 	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
 	{
 		Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
-		Subscription *sub;
-		MemoryContext oldcxt;
 
-		/*
-		 * Allocate our results in the caller's context, not the
-		 * transaction's. We do this inside the loop, and restore the original
-		 * context at the end, so that leaky things like heap_getnext() are
-		 * not called in a potentially long-lived context.
-		 */
-		oldcxt = MemoryContextSwitchTo(resultcxt);
-
-		sub = (Subscription *) palloc0(sizeof(Subscription));
-		sub->oid = HeapTupleGetOid(tup);
-		sub->dbid = subform->subdbid;
-		sub->owner = subform->subowner;
-		sub->enabled = subform->subenabled;
-		sub->name = pstrdup(NameStr(subform->subname));
-		/* We don't fill fields we are not interested in. */
-
-		res = lappend(res, sub);
-		MemoryContextSwitchTo(oldcxt);
+		/* We only care about enabled subscriptions. */
+		if (!subform->subenabled)
+			continue;
+
+		res = lappend_oid(res, HeapTupleGetOid(tup));
 	}
 
 	heap_endscan(scan);
 	heap_close(rel, AccessShareLock);
 
-	CommitTransactionCommand();
-
 	return res;
 }
 
@@ -258,23 +232,62 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
 }
 
 /*
- * Start new apply background worker.
+ * Like logicalrep_worker_find(), but return a list of all workers of the
+ * subscription instead of just one.  Caller must hold LogicalRepWorkerLock.
+ */
+List *
+logicalrep_sub_workers_find(Oid subid, bool only_running)
+{
+	int			i;
+	List	   *res = NIL;
+
+	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
+	/* Collect the in-use worker slots that belong to this subscription. */
+	for (i = 0; i < max_logical_replication_workers; i++)
+	{
+		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+		if (w->in_use && w->subid == subid && (!only_running || w->proc))
+			res = lappend(res, w);
+	}
+
+	return res;
+}
+
+/*
+ * Start a new logical replication background worker for the subscription.
  */
 void
-logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
-						 Oid relid)
+logicalrep_worker_launch(Oid subid, Oid relid)
 {
 	BackgroundWorker bgw;
 	BackgroundWorkerHandle *bgw_handle;
 	int			i;
 	int			slot = 0;
-	LogicalRepWorker *worker = NULL;
-	int			nsyncworkers;
+	List	   *subworkers;
+	ListCell   *lc;
 	TimestampTz now;
+	int			nsyncworkers = 0;
+	Subscription *sub;
+	LogicalRepWorker *worker = NULL;
 
 	ereport(DEBUG1,
-			(errmsg("starting logical replication worker for subscription \"%s\"",
-					subname)));
+			(errmsg("starting logical replication worker for subscription %u",
+					subid)));
+
+	/* Block any concurrent DDL on the subscription. */
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+	/* Get info about subscription. */
+	sub = GetSubscription(subid, true);
+	if (!sub)
+	{
+		UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+		ereport(DEBUG1,
+				(errmsg("subscription %u not found, not starting worker for it", subid)));
+		return;
+	}
 
 	/* Report this after the initial starting message for consistency. */
 	if (max_replication_slots == 0)
@@ -302,7 +315,14 @@ retry:
 		}
 	}
 
-	nsyncworkers = logicalrep_sync_worker_count(subid);
+	nsyncworkers = 0;			/* "goto retry" may bring us back here */
+	subworkers = logicalrep_sub_workers_find(subid, false);
+	foreach(lc, subworkers)
+	{
+		if (OidIsValid(((LogicalRepWorker *) lfirst(lc))->relid))
+			nsyncworkers++;
+	}
+	list_free(subworkers);
 
 	now = GetCurrentTimestamp();
 
@@ -348,6 +368,7 @@ retry:
 	if (nsyncworkers >= max_sync_workers_per_subscription)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
+		UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 		return;
 	}
 
@@ -358,6 +379,7 @@ retry:
 	if (worker == NULL)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
+		UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 		ereport(WARNING,
 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
 				 errmsg("out of logical replication worker slots"),
@@ -370,8 +392,8 @@ retry:
 	worker->in_use = true;
 	worker->generation++;
 	worker->proc = NULL;
-	worker->dbid = dbid;
-	worker->userid = userid;
+	worker->dbid = sub->dbid;
+	worker->userid = sub->owner;
 	worker->subid = subid;
 	worker->relid = relid;
 	worker->relstate = SUBREL_STATE_UNKNOWN;
@@ -382,8 +404,6 @@ retry:
 	worker->reply_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->reply_time);
 
-	LWLockRelease(LogicalRepWorkerLock);
-
 	/* Register the new dynamic worker. */
 	memset(&bgw, 0, sizeof(bgw));
 	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
@@ -402,8 +422,13 @@ retry:
 	bgw.bgw_notify_pid = MyProcPid;
 	bgw.bgw_main_arg = Int32GetDatum(slot);
 
+	/* Register the worker; on failure, release the slot we reserved. */
 	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
 	{
+		logicalrep_worker_cleanup(worker);
+		LWLockRelease(LogicalRepWorkerLock);
+		UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
 		ereport(WARNING,
 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
 				 errmsg("out of background worker slots"),
@@ -411,13 +436,24 @@ retry:
 		return;
 	}
 
+	/* Done with the worker array. */
+	LWLockRelease(LogicalRepWorkerLock);
+
 	/* Now wait until it attaches. */
 	WaitForReplicationWorkerAttach(worker, bgw_handle);
+
+	/*
+	 * The worker has either attached or died; either way we are done with
+	 * the subscription.
+	 */
+	UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 }
 
 /*
  * Stop the logical replication worker for subid/relid, if any, and wait until
  * it detaches from the slot.
+ *
+ * The caller is expected to hold an exclusive lock on the subscription.
  */
 void
 logicalrep_worker_stop(Oid subid, Oid relid)
@@ -425,7 +461,8 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 	LogicalRepWorker *worker;
 	uint16		generation;
 
-	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+	/* Exclusive mode, as we may need to call logicalrep_worker_cleanup(). */
+	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
 	worker = logicalrep_worker_find(subid, relid, false);
 
@@ -436,56 +473,20 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 		return;
 	}
 
+	/* No process is attached to the slot, so nothing to signal; free it. */
+	if (!worker->proc)
+	{
+		logicalrep_worker_cleanup(worker);
+		LWLockRelease(LogicalRepWorkerLock);
+		return;
+	}
+
 	/*
 	 * Remember which generation was our worker so we can check if what we see
 	 * is still the same one.
 	 */
 	generation = worker->generation;
 
-	/*
-	 * If we found a worker but it does not have proc set then it is still
-	 * starting up; wait for it to finish starting and then kill it.
-	 */
-	while (worker->in_use && !worker->proc)
-	{
-		int			rc;
-
-		LWLockRelease(LogicalRepWorkerLock);
-
-		/* Wait a bit --- we don't expect to have to wait long. */
-		rc = WaitLatch(MyLatch,
-					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
-					   10L, WAIT_EVENT_BGWORKER_STARTUP);
-
-		/* emergency bailout if postmaster has died */
-		if (rc & WL_POSTMASTER_DEATH)
-			proc_exit(1);
-
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			CHECK_FOR_INTERRUPTS();
-		}
-
-		/* Recheck worker status. */
-		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-
-		/*
-		 * Check whether the worker slot is no longer used, which would mean
-		 * that the worker has exited, or whether the worker generation is
-		 * different, meaning that a different worker has taken the slot.
-		 */
-		if (!worker->in_use || worker->generation != generation)
-		{
-			LWLockRelease(LogicalRepWorkerLock);
-			return;
-		}
-
-		/* Worker has assigned proc, so it has started. */
-		if (worker->proc)
-			break;
-	}
-
 	/* Now terminate the worker ... */
 	kill(worker->proc->pid, SIGTERM);
 
@@ -515,6 +516,11 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 			CHECK_FOR_INTERRUPTS();
 		}
 
+		/*
+		 * A shared lock is enough from here on: the worker is known to have
+		 * attached, so it cleans up its own slot when it detaches and this
+		 * loop never needs to call logicalrep_worker_cleanup().
+		 */
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 	}
 
@@ -682,30 +688,6 @@ logicalrep_launcher_sighup(SIGNAL_ARGS)
 }
 
 /*
- * Count the number of registered (not necessarily running) sync workers
- * for a subscription.
- */
-int
-logicalrep_sync_worker_count(Oid subid)
-{
-	int			i;
-	int			res = 0;
-
-	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
-
-	/* Search for attached worker for a given subscription id. */
-	for (i = 0; i < max_logical_replication_workers; i++)
-	{
-		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
-
-		if (w->subid == subid && OidIsValid(w->relid))
-			res++;
-	}
-
-	return res;
-}
-
-/*
  * ApplyLauncherShmemSize
  *		Compute space needed for replication launcher shared memory
  */
@@ -875,8 +857,6 @@ ApplyLauncherMain(Datum main_arg)
 		int			rc;
 		List	   *sublist;
 		ListCell   *lc;
-		MemoryContext subctx;
-		MemoryContext oldctx;
 		TimestampTz now;
 		long		wait_time = DEFAULT_NAPTIME_PER_CYCLE;
 
@@ -888,41 +868,38 @@ ApplyLauncherMain(Datum main_arg)
 		if (TimestampDifferenceExceeds(last_start_time, now,
 									   wal_retrieve_retry_interval))
 		{
-			/* Use temporary context for the database list and worker info. */
-			subctx = AllocSetContextCreate(TopMemoryContext,
-										   "Logical Replication Launcher sublist",
-										   ALLOCSET_DEFAULT_MINSIZE,
-										   ALLOCSET_DEFAULT_INITSIZE,
-										   ALLOCSET_DEFAULT_MAXSIZE);
-			oldctx = MemoryContextSwitchTo(subctx);
-
-			/* search for subscriptions to start or stop. */
-			sublist = get_subscription_list();
-
-			/* Start the missing workers for enabled subscriptions. */
+			/*
+			 * Start a transaction so that we can take locks and snapshots.
+			 *
+			 * Everything allocated until the commit below, including the
+			 * subscription list, lives in the transaction's memory context.
+			 */
+			StartTransactionCommand();
+
+			/* Fetch the OIDs of all enabled subscriptions. */
+			sublist = get_subscription_oids();
+
+			/* Start an apply worker for each one that lacks it. */
 			foreach(lc, sublist)
 			{
-				Subscription *sub = (Subscription *) lfirst(lc);
+				Oid			subid = lfirst_oid(lc);
 				LogicalRepWorker *w;
 
 				LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-				w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+				w = logicalrep_worker_find(subid, InvalidOid, false);
 				LWLockRelease(LogicalRepWorkerLock);
 
-				if (sub->enabled && w == NULL)
+				if (w == NULL)
 				{
 					last_start_time = now;
 					wait_time = wal_retrieve_retry_interval;
 
-					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
-											 sub->owner, InvalidOid);
+					/* InvalidOid relid: the apply worker, not a sync worker. */
+					logicalrep_worker_launch(subid, InvalidOid);
 				}
 			}
 
-			/* Switch back to original memory context. */
-			MemoryContextSwitchTo(oldctx);
-			/* Clean the temporary memory. */
-			MemoryContextDelete(subctx);
+			CommitTransactionCommand();
 		}
 		else
 		{
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 2f6c7b4..d49d186 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -102,12 +102,13 @@
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
 
-#include "utils/snapmgr.h"
 #include "storage/ipc.h"
+#include "storage/lmgr.h"
 
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/snapmgr.h"
 
 static bool table_states_valid = false;
 
@@ -212,9 +213,9 @@ wait_for_relation_state_change(Oid relid, char expected_state)
  *
  * Used when transitioning from SYNCWAIT state to CATCHUP.
  *
- * Returns false if the apply worker has disappeared.
+ * Exits the process with FATAL if the apply worker has disappeared.
  */
-static bool
+static void
 wait_for_worker_state_change(char expected_state)
 {
 	int			rc;
@@ -230,7 +231,7 @@ wait_for_worker_state_change(char expected_state)
 		 * enough to not give a misleading answer if we do it with no lock.)
 		 */
 		if (MyLogicalRepWorker->relstate == expected_state)
-			return true;
+			return;
 
 		/*
 		 * Bail out if the apply worker has died, else signal it we're
@@ -243,7 +244,10 @@ wait_for_worker_state_change(char expected_state)
 			logicalrep_worker_wakeup_ptr(worker);
 		LWLockRelease(LogicalRepWorkerLock);
 		if (!worker)
-			break;
+			ereport(FATAL,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("terminating logical replication synchronization "
+							"worker due to subscription apply worker exit")));
 
 		/*
 		 * Wait.  We expect to get a latch signal back from the apply worker,
@@ -260,8 +264,6 @@ wait_for_worker_state_change(char expected_state)
 		if (rc & WL_LATCH_SET)
 			ResetLatch(MyLatch);
 	}
-
-	return false;
 }
 
 /*
@@ -346,6 +348,21 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 	Assert(!IsTransactionState());
 
+	/*
+	 * Start a transaction unless we already did, and take AccessShareLock on
+	 * our subscription in it, so that DROP SUBSCRIPTION has to wait for us.
+	 */
+#define ensure_transaction_and_lock() \
+	do { \
+		if (!started_tx) \
+		{ \
+			StartTransactionCommand(); \
+			LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0, \
+							 AccessShareLock); \
+			started_tx = true; \
+		} \
+	} while (0)
+
 	/* We need up-to-date sync state info for subscription tables here. */
 	if (!table_states_valid)
 	{
@@ -358,8 +375,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		list_free_deep(table_states);
 		table_states = NIL;
 
-		StartTransactionCommand();
-		started_tx = true;
+		ensure_transaction_and_lock();
 
 		/* Fetch all non-ready tables. */
 		rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
@@ -421,11 +437,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			{
 				rstate->state = SUBREL_STATE_READY;
 				rstate->lsn = current_lsn;
-				if (!started_tx)
-				{
-					StartTransactionCommand();
-					started_tx = true;
-				}
+
+				ensure_transaction_and_lock();
 
 				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 										   rstate->relid, rstate->state,
@@ -476,12 +489,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					 * Enter busy loop and wait for synchronization worker to
 					 * reach expected state (or die trying).
 					 */
-					if (!started_tx)
-					{
-						StartTransactionCommand();
-						started_tx = true;
-					}
-
+					ensure_transaction_and_lock();
 					wait_for_relation_state_change(rstate->relid,
 												   SUBREL_STATE_SYNCDONE);
 				}
@@ -495,8 +503,20 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 * running sync workers for this subscription, while we have
 				 * the lock.
 				 */
-				int			nsyncworkers =
-				logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+				List	   *subworkers;
+				ListCell   *wlc;
+				int			nsyncworkers = 0;
+
+				subworkers = logicalrep_sub_workers_find(MySubscription->oid,
+														 false);
+				foreach(wlc, subworkers)
+				{
+					LogicalRepWorker *w = (LogicalRepWorker *) lfirst(wlc);
+
+					if (OidIsValid(w->relid))
+						nsyncworkers++;
+				}
+				list_free(subworkers);
 
 				/* Now safe to release the LWLock */
 				LWLockRelease(LogicalRepWorkerLock);
@@ -518,10 +538,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 						TimestampDifferenceExceeds(hentry->last_start_time, now,
 												   wal_retrieve_retry_interval))
 					{
-						logicalrep_worker_launch(MyLogicalRepWorker->dbid,
-												 MySubscription->oid,
-												 MySubscription->name,
-												 MyLogicalRepWorker->userid,
+						ensure_transaction_and_lock();
+						logicalrep_worker_launch(MySubscription->oid,
 												 rstate->relid);
 						hentry->last_start_time = now;
 					}
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 402b166..add7841 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -71,14 +71,14 @@ extern bool in_remote_transaction;
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 					   bool only_running);
-extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
-						 Oid userid, Oid relid);
+extern void logicalrep_worker_launch(Oid subid, Oid relid);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
 extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
-extern int	logicalrep_sync_worker_count(Oid subid);
+/* Caller must hold LogicalRepWorkerLock. */
+extern List *logicalrep_sub_workers_find(Oid subid, bool only_running);
 
 extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
 void		process_syncing_tables(XLogRecPtr current_lsn);
-- 
2.7.4

