From f2df8156500f55a4db433dc385da5b4020fe8fa9 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandbossart@gmail.com>
Date: Wed, 4 Jan 2023 11:38:20 -0800
Subject: [PATCH v12 2/2] bypass wal_retrieve_retry_interval for logical
 workers as appropriate

The logical replication launcher presently uses a single last_start_time
to enforce wal_retrieve_retry_interval between any two apply worker
starts, so restarting the worker for one subscription can delay starts
for all the others.  Track the last start time per subscription in a
backend-local hash table instead, and add a restart_immediately flag
that an apply worker can set before exiting (e.g., after a parameter
change or so that two_phase can be enabled) to ask the launcher to
restart it without waiting for wal_retrieve_retry_interval to elapse.
Also wake up the launcher when a subscription is disabled so that it
can clear the subscription's entry right away.
---
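Notes:
    The per-subscription start times live in a backend-local dynahash
    table.  As a quick reference, below is a minimal sketch of the
    hash_create()/hash_search()/TimestampDifferenceExceeds() pattern the
    launcher relies on.  The struct and function names in the sketch are
    hypothetical and are not taken from the patch itself.

        #include "postgres.h"
        #include "utils/hsearch.h"
        #include "utils/timestamp.h"

        /* hypothetical stand-in for the struct declared in ApplyLauncherMain() */
        typedef struct SubStartEntry
        {
            Oid         subid;          /* hash key: subscription OID */
            TimestampTz last_start_time;
        } SubStartEntry;

        static HTAB *
        create_start_time_table(void)
        {
            HASHCTL     ctl;

            /* key is the leading Oid field, hashed bytewise via HASH_BLOBS */
            ctl.keysize = sizeof(Oid);
            ctl.entrysize = sizeof(SubStartEntry);
            return hash_create("apply worker start times", 256, &ctl,
                               HASH_ELEM | HASH_BLOBS);
        }

        /* true if no entry existed or enough time has passed to start a worker */
        static bool
        may_start_worker(HTAB *htab, Oid subid, TimestampTz now, int interval_ms)
        {
            SubStartEntry *entry;
            bool        found;

            /* HASH_ENTER creates the entry on first use for this subscription */
            entry = hash_search(htab, &subid, HASH_ENTER, &found);
            if (!found ||
                TimestampDifferenceExceeds(entry->last_start_time, now,
                                           interval_ms))
            {
                entry->last_start_time = now;
                return true;
            }
            return false;
        }
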
 src/backend/commands/subscriptioncmds.c     |   9 +-
 src/backend/replication/logical/launcher.c  | 155 +++++++++++++++-----
 src/backend/replication/logical/tablesync.c |   1 +
 src/backend/replication/logical/worker.c    |   2 +-
 src/include/replication/worker_internal.h   |   3 +
 5 files changed, 128 insertions(+), 42 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b9bbb2cf4e..3293ec2043 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1137,8 +1137,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					BoolGetDatum(opts.enabled);
 				replaces[Anum_pg_subscription_subenabled - 1] = true;
 
-				if (opts.enabled)
-					ApplyLauncherWakeupAtCommit();
+				/*
+				 * Even if the subscription is disabled, we wake up the
+				 * launcher so that it clears its last start time.  This
+				 * ensures the workers will be able to start up right away when
+				 * the subscription is enabled again.
+				 */
+				ApplyLauncherWakeupAtCommit();
 
 				update_tuple = true;
 				break;
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index a69e371c05..544e73dd62 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -297,7 +297,7 @@ retry:
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (!w->in_use)
+		if (!w->in_use && !w->restart_immediately)
 		{
 			worker = w;
 			slot = i;
@@ -620,8 +620,14 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
 	worker->proc = NULL;
 	worker->dbid = InvalidOid;
 	worker->userid = InvalidOid;
-	worker->subid = InvalidOid;
 	worker->relid = InvalidOid;
+
+	/*
+	 * If restart_immediately was set, retain the subscription ID so that the
+	 * launcher knows which worker it should restart right away.
+	 */
+	if (!worker->restart_immediately)
+		worker->subid = InvalidOid;
 }
 
 /*
@@ -801,7 +807,13 @@ ApplyLauncherWakeup(void)
 void
 ApplyLauncherMain(Datum main_arg)
 {
-	TimestampTz last_start_time = 0;
+	struct launcher_start_time_mapping
+	{
+		Oid			subid;
+		TimestampTz last_start_time;
+	};
+	HTAB	   *last_start_times = NULL;
+	HASHCTL		ctl;
 
 	ereport(DEBUG1,
 			(errmsg_internal("logical replication launcher started")));
@@ -822,6 +834,18 @@ ApplyLauncherMain(Datum main_arg)
 	 */
 	BackgroundWorkerInitializeConnection(NULL, NULL, 0);
 
+	/*
+	 * Prepare a hash table for tracking last start times of workers, to avoid
+	 * immediate restarts.  Ideally, this hash table would be created in shared
+	 * memory so that other backends could adjust it directly to avoid race
+	 * conditions, but it would need to be a fixed size, and the number of
+	 * subscriptions is virtually unbounded.
+	 */
+	ctl.keysize = sizeof(Oid);
+	ctl.entrysize = sizeof(struct launcher_start_time_mapping);
+	last_start_times = hash_create("Logical replication apply worker start times",
+								   256, &ctl, HASH_ELEM | HASH_BLOBS);
+
 	/* Enter main loop */
 	for (;;)
 	{
@@ -832,63 +856,116 @@ ApplyLauncherMain(Datum main_arg)
 		MemoryContext oldctx;
 		TimestampTz now;
 		long		wait_time = DEFAULT_NAPTIME_PER_CYCLE;
+		HASH_SEQ_STATUS status;
+		struct launcher_start_time_mapping *hentry;
 
 		CHECK_FOR_INTERRUPTS();
 
 		now = GetCurrentTimestamp();
 
-		/* Limit the start retry to once a wal_retrieve_retry_interval */
-		if (TimestampDifferenceExceeds(last_start_time, now,
-									   wal_retrieve_retry_interval))
+		/* Use temporary context for the database list and worker info. */
+		subctx = AllocSetContextCreate(TopMemoryContext,
+									   "Logical Replication Launcher sublist",
+									   ALLOCSET_DEFAULT_SIZES);
+		oldctx = MemoryContextSwitchTo(subctx);
+
+		sublist = get_subscription_list();
+		foreach(lc, sublist)
 		{
-			/* Use temporary context for the database list and worker info. */
-			subctx = AllocSetContextCreate(TopMemoryContext,
-										   "Logical Replication Launcher sublist",
-										   ALLOCSET_DEFAULT_SIZES);
-			oldctx = MemoryContextSwitchTo(subctx);
+			Subscription *sub = (Subscription *) lfirst(lc);
+			bool		bypass_retry_interval = false;
+			LogicalRepWorker *w;
 
-			/* search for subscriptions to start or stop. */
-			sublist = get_subscription_list();
+			LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
-			/* Start the missing workers for enabled subscriptions. */
-			foreach(lc, sublist)
+			/*
+			 * Bypass wal_retrieve_retry_interval if the worker set
+			 * restart_immediately.  We do this before checking if the
+			 * subscription is enabled so that the slot can be freed
+			 * regardless.
+			 */
+			for (int i = 0; i < max_logical_replication_workers; i++)
 			{
-				Subscription *sub = (Subscription *) lfirst(lc);
-				LogicalRepWorker *w;
+				w = &LogicalRepCtx->workers[i];
 
-				if (!sub->enabled)
-					continue;
+				if (!w->in_use && !w->proc && w->subid == sub->oid &&
+					w->relid == InvalidOid && w->restart_immediately)
+				{
+					w->restart_immediately = false;
+					logicalrep_worker_cleanup(w);
+					bypass_retry_interval = true;
+					break;
+				}
+			}
 
-				LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-				w = logicalrep_worker_find(sub->oid, InvalidOid, false);
-				LWLockRelease(LogicalRepWorkerLock);
+			w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+			LWLockRelease(LogicalRepWorkerLock);
 
-				if (w == NULL)
-				{
-					last_start_time = now;
-					wait_time = wal_retrieve_retry_interval;
+			/*
+			 * If the subscription isn't enabled, clear its entry in
+			 * last_start_times so that its apply worker is restarted right
+			 * away when it is enabled again.  The subscription might be
+			 * re-enabled before we have had an opportunity to clear its
+			 * entry, in which case we'll wait a little bit before starting
+			 * the worker.
+			 */
+			if (!sub->enabled)
+			{
+				hash_search(last_start_times, &sub->oid, HASH_REMOVE, NULL);
+				continue;
+			}
+
+			/*
+			 * If it's okay to start the worker now, do so.  Otherwise, adjust
+			 * wait_time so that we wake up when we can start it.
+			 */
+			if (w == NULL)
+			{
+				bool		found;
 
+				hentry = hash_search(last_start_times, &sub->oid,
+									 HASH_ENTER, &found);
+
+				if (bypass_retry_interval || !found ||
+					TimestampDifferenceExceeds(hentry->last_start_time, now,
+											   wal_retrieve_retry_interval))
+				{
+					hentry->last_start_time = now;
 					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
 											 sub->owner, InvalidOid);
 				}
-			}
+				else
+				{
+					long		elapsed;
 
-			/* Switch back to original memory context. */
-			MemoryContextSwitchTo(oldctx);
-			/* Clean the temporary memory. */
-			MemoryContextDelete(subctx);
+					elapsed = TimestampDifferenceMilliseconds(hentry->last_start_time, now);
+					wait_time = Min(wait_time, wal_retrieve_retry_interval - elapsed);
+				}
+			}
 		}
-		else
+
+		/*
+		 * Do garbage collection on the last_start_times hash table.  In
+		 * theory, a subscription OID could be reused before its entry is
+		 * removed, but the risk of that seems low, and at worst the launcher
+		 * will wait a bit longer before starting the new subscription's apply
+		 * worker.  This risk could be reduced by removing entries for
+		 * subscriptions that aren't in sublist, but it doesn't seem worth the
+		 * trouble.
+		 */
+		hash_seq_init(&status, last_start_times);
+		while ((hentry = (struct launcher_start_time_mapping *) hash_seq_search(&status)) != NULL)
 		{
-			/*
-			 * The wait in previous cycle was interrupted in less than
-			 * wal_retrieve_retry_interval since last worker was started, this
-			 * usually means crash of the worker, so we should retry in
-			 * wal_retrieve_retry_interval again.
-			 */
-			wait_time = wal_retrieve_retry_interval;
+			if (TimestampDifferenceExceeds(hentry->last_start_time, now,
+										   wal_retrieve_retry_interval))
+				hash_search(last_start_times, &hentry->subid, HASH_REMOVE, NULL);
 		}
 
+		/* Switch back to original memory context. */
+		MemoryContextSwitchTo(oldctx);
+		/* Clean the temporary memory. */
+		MemoryContextDelete(subctx);
+
 		/* Wait for more work. */
 		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 85e2a2861f..d6213c55fc 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -618,6 +618,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
 							MySubscription->name)));
 
+			MyLogicalRepWorker->restart_immediately = true;
 			should_exit = true;
 		}
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 3e2ea32e1e..0521459898 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3130,7 +3130,7 @@ maybe_reread_subscription(void)
 		ereport(LOG,
 				(errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
 						MySubscription->name)));
-
+		MyLogicalRepWorker->restart_immediately = true;
 		proc_exit(0);
 	}
 
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 2a3ec5c2d8..68a56c175b 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -66,6 +66,9 @@ typedef struct LogicalRepWorker
 	TimestampTz last_recv_time;
 	XLogRecPtr	reply_lsn;
 	TimestampTz reply_time;
+
+	/* Should the launcher restart the worker immediately? */
+	bool		restart_immediately;
 } LogicalRepWorker;
 
 /* Main memory context for apply worker. Permanent during worker lifetime. */
-- 
2.25.1

