Questions about logicalrep_worker_launch()

Started by Fujii Masao · 9 months ago · 6 messages
#1 Fujii Masao
masao.fujii@oss.nttdata.com
1 attachment(s)

Hi,

While reading the code of logicalrep_worker_launch(), I had two questions:

(1)
When the sync worker limit per subscription is reached, logicalrep_worker_launch()
runs garbage collection to try to free up slots before checking the limit again.
That makes sense.

But should we do the same when the parallel apply worker limit is reached?
Currently, if we've hit the parallel apply worker limit but not the sync worker limit
and we find an unused worker slot, garbage collection doesn't run. Would it
make sense to also run garbage collection in that case?
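
For reference, a simplified sketch of the current (pre-patch) flow, paraphrased
from launcher.c rather than quoted verbatim:

    nsyncworkers = logicalrep_sync_worker_count(subid);

    /* Garbage collection is triggered only by these two conditions... */
    if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
    {
        /* ...clean up stale in_use slots, then goto retry... */
    }

    /*
     * ...while the parallel apply limit is checked only afterwards, so
     * hitting it never triggers the cleanup above.
     */
    nparallelapplyworkers = logicalrep_pa_worker_count(subid);
    if (nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
    {
        LWLockRelease(LogicalRepWorkerLock);
        return false;
    }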

(2)
If garbage collection removes at least one worker, logicalrep_worker_launch()
scans all worker slots again to look for a free one. But since we know at least one
slot was freed, this retry might be unnecessary. We could just reuse the freed
slot directly. Is that correct?
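
In code terms, the idea would be roughly the following sketch of the cleanup
loop: since we still hold LogicalRepWorkerLock in exclusive mode, no one else
can claim the slot before we do.

    logicalrep_worker_cleanup(w);
    did_cleanup = true;

    if (worker == NULL)
    {
        /* Reuse the slot we just freed instead of rescanning. */
        worker = w;
        slot = i;
    }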

The attached patch addresses both points. Since logicalrep_worker_launch()
isn't a performance-critical path, this might not be a high-priority change.
But if my understanding is correct, I'm a bit tempted to apply it as a refactoring.

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION

Attachments:

v1-0001-Refactor-logicalrep_worker_launch.patch (text/plain; charset=UTF-8)
From 3e541cbaacfde2c212dd95c7d53329399511a9ec Mon Sep 17 00:00:00 2001
From: Fujii Masao <fujii@postgresql.org>
Date: Sat, 26 Apr 2025 00:06:40 +0900
Subject: [PATCH v1] Refactor logicalrep_worker_launch().

---
 src/backend/replication/logical/launcher.c | 64 +++++++++++-----------
 1 file changed, 31 insertions(+), 33 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 10677da56b2..e2af6ccbdeb 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -96,7 +96,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
-static int	logicalrep_pa_worker_count(Oid subid);
+static void logicalrep_worker_count(Oid subid, int *nsync, int *npa);
 static void logicalrep_launcher_attach_dshmem(void);
 static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
 static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
@@ -336,7 +336,6 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
 	 */
 	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
-retry:
 	/* Find unused worker slot. */
 	for (i = 0; i < max_logical_replication_workers; i++)
 	{
@@ -350,7 +349,7 @@ retry:
 		}
 	}
 
-	nsyncworkers = logicalrep_sync_worker_count(subid);
+	logicalrep_worker_count(subid, &nsyncworkers, &nparallelapplyworkers);
 
 	now = GetCurrentTimestamp();
 
@@ -359,7 +358,8 @@ retry:
 	 * reason we do this is because if some worker failed to start up and its
 	 * parent has crashed while waiting, the in_use state was never cleared.
 	 */
-	if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
+	if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription ||
+		nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
 	{
 		bool		did_cleanup = false;
 
@@ -381,11 +381,17 @@ retry:
 
 				logicalrep_worker_cleanup(w);
 				did_cleanup = true;
+
+				if (worker == NULL)
+				{
+					worker = w;
+					slot = i;
+				}
 			}
 		}
 
 		if (did_cleanup)
-			goto retry;
+			logicalrep_worker_count(subid, &nsyncworkers, &nparallelapplyworkers);
 	}
 
 	/*
@@ -399,8 +405,6 @@ retry:
 		return false;
 	}
 
-	nparallelapplyworkers = logicalrep_pa_worker_count(subid);
-
 	/*
 	 * Return false if the number of parallel apply workers reached the limit
 	 * per subscription.
@@ -844,48 +848,42 @@ logicalrep_worker_onexit(int code, Datum arg)
 int
 logicalrep_sync_worker_count(Oid subid)
 {
-	int			i;
 	int			res = 0;
 
-	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
-
-	/* Search for attached worker for a given subscription id. */
-	for (i = 0; i < max_logical_replication_workers; i++)
-	{
-		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
-
-		if (isTablesyncWorker(w) && w->subid == subid)
-			res++;
-	}
-
+	logicalrep_worker_count(subid, &res, NULL);
 	return res;
 }
 
 /*
- * Count the number of registered (but not necessarily running) parallel apply
- * workers for a subscription.
+ * Count the number of registered (but not necessarily running) sync workers
+ * and parallel apply workers for a subscription.
  */
-static int
-logicalrep_pa_worker_count(Oid subid)
+static void
+logicalrep_worker_count(Oid subid, int *nsync, int *npa)
 {
-	int			i;
-	int			res = 0;
-
 	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
 
+	if (nsync != NULL)
+		*nsync = 0;
+	if (npa != NULL)
+		*npa = 0;
+
 	/*
-	 * Scan all attached parallel apply workers, only counting those which
-	 * have the given subscription id.
+	 * Scan all attached sync and parallel apply workers, only counting those
+	 * which have the given subscription id.
 	 */
-	for (i = 0; i < max_logical_replication_workers; i++)
+	for (int i = 0; i < max_logical_replication_workers; i++)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (isParallelApplyWorker(w) && w->subid == subid)
-			res++;
+		if (w->subid == subid)
+		{
+			if (nsync != NULL && isTablesyncWorker(w))
+				(*nsync)++;
+			if (npa != NULL && isParallelApplyWorker(w))
+				(*npa)++;
+		}
 	}
-
-	return res;
 }
 
 /*
-- 
2.49.0

#2 Masahiko Sawada
sawada.mshk@gmail.com
In reply to: Fujii Masao (#1)
Re: Questions about logicalrep_worker_launch()

Hi,

On Fri, Apr 25, 2025 at 9:10 AM Fujii Masao <masao.fujii@oss.nttdata.com> wrote:

> Hi,
>
> While reading the code of logicalrep_worker_launch(), I had two questions:
>
> (1)
> When the sync worker limit per subscription is reached, logicalrep_worker_launch()
> runs garbage collection to try to free up slots before checking the limit again.
> That makes sense.
>
> But should we do the same when the parallel apply worker limit is reached?
> Currently, if we've hit the parallel apply worker limit but not the sync worker limit
> and we find an unused worker slot, garbage collection doesn't run. Would it
> make sense to also run garbage collection in that case?

Good point. In that case, we would end up unable to launch parallel
apply workers for the subscription until we either used up all worker
slots or reached the sync worker limit, which is bad.

> (2)
> If garbage collection removes at least one worker, logicalrep_worker_launch()
> scans all worker slots again to look for a free one. But since we know at least one
> slot was freed, this retry might be unnecessary. We could just reuse the freed
> slot directly. Is that correct?

Agreed. Since this code is protected by LogicalRepWorkerLock held in
exclusive mode, we should be able to reuse the worker slot that we just
cleaned up.

> The attached patch addresses both points. Since logicalrep_worker_launch()
> isn't a performance-critical path, this might not be a high-priority change.
> But if my understanding is correct, I'm a bit tempted to apply it as a refactoring.

I agree with these changes.

I think that while the changes for (2) should be for v19, the changes
for (1) might be treated as a bug fix?

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

#3 Fujii Masao
masao.fujii@oss.nttdata.com
In reply to: Masahiko Sawada (#2)
3 attachment(s)
Re: Questions about logicalrep_worker_launch()

On 2025/04/26 3:03, Masahiko Sawada wrote:

> I agree with these changes.
>
> I think that while the changes for (2) should be for v19, the changes
> for (1) might be treated as a bug fix?

Agreed. I've split the patch into two parts:

0001 is for (1) and is a bug fix that should be back-patched to v16,
where parallel apply workers were introduced. Since it didn't apply
cleanly to v16, I also created a separate patch specifically for v16.

0002 is for (2) and is intended for v19.

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION

Attachments:

v2-0001-master-PG17-Fix-bug-that-could-block-the-startup-of-parallel-.patch (text/plain; charset=UTF-8)
From 711df86e4b90d2578f15606abeaa3db558434b92 Mon Sep 17 00:00:00 2001
From: Fujii Masao <fujii@postgresql.org>
Date: Mon, 28 Apr 2025 14:29:26 +0900
Subject: [PATCH v2 1/2] Fix bug that could block the startup of parallel apply
 workers.

If a logical replication worker fails to start and its parent crashes
while waiting, its worker slot can remain marked as "in use".
This can prevent new workers from starting, as the launcher may not
find a free slot or may incorrectly think the sync or parallel apply
worker limits have been reached.

To handle this, the launcher already performs garbage collection
when no free slot is found or when the sync worker limit is hit,
and then retries launching workers. However, it previously did not
trigger garbage collection when the parallel apply worker limit
was reached. As a result, stale slots could block new parallel apply
workers from starting, even though they could have been launched
after cleanup.

This commit fixes the issue by triggering garbage collection
when the parallel apply worker limit is reached as well. If stale slots
are cleared and the number of parallel apply workers drops below
the limit, new parallel apply workers can then be started successfully.

Back-patch to v16, where parallel apply workers were introduced.
---
 src/backend/replication/logical/launcher.c | 65 +++++++++++-----------
 1 file changed, 31 insertions(+), 34 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 10677da56b2..ac95afe4bae 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -96,7 +96,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
-static int	logicalrep_pa_worker_count(Oid subid);
+static void logicalrep_worker_count(Oid subid, int *nsync, int *nparallelapply);
 static void logicalrep_launcher_attach_dshmem(void);
 static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
 static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
@@ -350,16 +350,21 @@ retry:
 		}
 	}
 
-	nsyncworkers = logicalrep_sync_worker_count(subid);
+	logicalrep_worker_count(subid, &nsyncworkers, &nparallelapplyworkers);
 
 	now = GetCurrentTimestamp();
 
 	/*
-	 * If we didn't find a free slot, try to do garbage collection.  The
-	 * reason we do this is because if some worker failed to start up and its
-	 * parent has crashed while waiting, the in_use state was never cleared.
+	 * If we can't start a new logical replication background worker because
+	 * no free slot is available, or because the number of sync workers or
+	 * parallel apply workers has reached the limit per subscription, try
+	 * running garbage collection. The reason we do this is because if some
+	 * workers failed to start up and their parent has crashed while waiting,
+	 * the in_use state was never cleared. By freeing up these stale worker
+	 * slots, we may be able to start a new worker.
 	 */
-	if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
+	if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription ||
+		nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
 	{
 		bool		did_cleanup = false;
 
@@ -399,8 +404,6 @@ retry:
 		return false;
 	}
 
-	nparallelapplyworkers = logicalrep_pa_worker_count(subid);
-
 	/*
 	 * Return false if the number of parallel apply workers reached the limit
 	 * per subscription.
@@ -844,48 +847,42 @@ logicalrep_worker_onexit(int code, Datum arg)
 int
 logicalrep_sync_worker_count(Oid subid)
 {
-	int			i;
 	int			res = 0;
 
-	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
-
-	/* Search for attached worker for a given subscription id. */
-	for (i = 0; i < max_logical_replication_workers; i++)
-	{
-		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
-
-		if (isTablesyncWorker(w) && w->subid == subid)
-			res++;
-	}
-
+	logicalrep_worker_count(subid, &res, NULL);
 	return res;
 }
 
 /*
- * Count the number of registered (but not necessarily running) parallel apply
- * workers for a subscription.
+ * Count the number of registered (but not necessarily running) sync workers
+ * and parallel apply workers for a subscription.
  */
-static int
-logicalrep_pa_worker_count(Oid subid)
+static void
+logicalrep_worker_count(Oid subid, int *nsync, int *nparallelapply)
 {
-	int			i;
-	int			res = 0;
-
 	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
 
+	if (nsync != NULL)
+		*nsync = 0;
+	if (nparallelapply != NULL)
+		*nparallelapply = 0;
+
 	/*
-	 * Scan all attached parallel apply workers, only counting those which
-	 * have the given subscription id.
+	 * Scan all attached sync and parallel apply workers, only counting those
+	 * which have the given subscription id.
 	 */
-	for (i = 0; i < max_logical_replication_workers; i++)
+	for (int i = 0; i < max_logical_replication_workers; i++)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (isParallelApplyWorker(w) && w->subid == subid)
-			res++;
+		if (w->subid == subid)
+		{
+			if (nsync != NULL && isTablesyncWorker(w))
+				(*nsync)++;
+			if (nparallelapply != NULL && isParallelApplyWorker(w))
+				(*nparallelapply)++;
+		}
 	}
-
-	return res;
 }
 
 /*
-- 
2.49.0

v2-0001-PG16-Fix-bug-that-could-block-the-startup-of-parallel-.patch (text/plain; charset=UTF-8)
From e5bdcc64258e4b68f42d868b13612b771e153268 Mon Sep 17 00:00:00 2001
From: Fujii Masao <fujii@postgresql.org>
Date: Mon, 28 Apr 2025 14:29:26 +0900
Subject: [PATCH v2] Fix bug that could block the startup of parallel apply
 workers.

If a logical replication worker fails to start and its parent crashes
while waiting, its worker slot can remain marked as "in use".
This can prevent new workers from starting, as the launcher may not
find a free slot or may incorrectly think the sync or parallel apply
worker limits have been reached.

To handle this, the launcher already performs garbage collection
when no free slot is found or when the sync worker limit is hit,
and then retries launching workers. However, it previously did not
trigger garbage collection when the parallel apply worker limit
was reached. As a result, stale slots could block new parallel apply
workers from starting, even though they could have been launched
after cleanup.

This commit fixes the issue by triggering garbage collection
when the parallel apply worker limit is reached as well. If stale slots
are cleared and the number of parallel apply workers drops below
the limit, new parallel apply workers can then be started successfully.

Back-patch to v16, where parallel apply workers were introduced.
---
 src/backend/replication/logical/launcher.c | 65 +++++++++++-----------
 1 file changed, 31 insertions(+), 34 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 8395ae7b23c..ebf3220da01 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -102,7 +102,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
-static int	logicalrep_pa_worker_count(Oid subid);
+static void logicalrep_worker_count(Oid subid, int *nsync, int *nparallelapply);
 static void logicalrep_launcher_attach_dshmem(void);
 static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
 static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
@@ -350,16 +350,21 @@ retry:
 		}
 	}
 
-	nsyncworkers = logicalrep_sync_worker_count(subid);
+	logicalrep_worker_count(subid, &nsyncworkers, &nparallelapplyworkers);
 
 	now = GetCurrentTimestamp();
 
 	/*
-	 * If we didn't find a free slot, try to do garbage collection.  The
-	 * reason we do this is because if some worker failed to start up and its
-	 * parent has crashed while waiting, the in_use state was never cleared.
+	 * If we can't start a new logical replication background worker because
+	 * no free slot is available, or because the number of sync workers or
+	 * parallel apply workers has reached the limit per subscription, try
+	 * running garbage collection. The reason we do this is because if some
+	 * workers failed to start up and their parent has crashed while waiting,
+	 * the in_use state was never cleared. By freeing up these stale worker
+	 * slots, we may be able to start a new worker.
 	 */
-	if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
+	if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription ||
+		nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
 	{
 		bool		did_cleanup = false;
 
@@ -399,8 +404,6 @@ retry:
 		return false;
 	}
 
-	nparallelapplyworkers = logicalrep_pa_worker_count(subid);
-
 	/*
 	 * Return false if the number of parallel apply workers reached the limit
 	 * per subscription.
@@ -831,48 +834,42 @@ logicalrep_worker_onexit(int code, Datum arg)
 int
 logicalrep_sync_worker_count(Oid subid)
 {
-	int			i;
 	int			res = 0;
 
-	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
-
-	/* Search for attached worker for a given subscription id. */
-	for (i = 0; i < max_logical_replication_workers; i++)
-	{
-		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
-
-		if (w->subid == subid && OidIsValid(w->relid))
-			res++;
-	}
-
+	logicalrep_worker_count(subid, &res, NULL);
 	return res;
 }
 
 /*
- * Count the number of registered (but not necessarily running) parallel apply
- * workers for a subscription.
+ * Count the number of registered (but not necessarily running) sync workers
+ * and parallel apply workers for a subscription.
  */
-static int
-logicalrep_pa_worker_count(Oid subid)
+static void
+logicalrep_worker_count(Oid subid, int *nsync, int *nparallelapply)
 {
-	int			i;
-	int			res = 0;
-
 	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
 
+	if (nsync != NULL)
+		*nsync = 0;
+	if (nparallelapply != NULL)
+		*nparallelapply = 0;
+
 	/*
-	 * Scan all attached parallel apply workers, only counting those which
-	 * have the given subscription id.
+	 * Scan all attached sync and parallel apply workers, only counting those
+	 * which have the given subscription id.
 	 */
-	for (i = 0; i < max_logical_replication_workers; i++)
+	for (int i = 0; i < max_logical_replication_workers; i++)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (w->subid == subid && isParallelApplyWorker(w))
-			res++;
+		if (w->subid == subid)
+		{
+			if (nsync != NULL && OidIsValid(w->relid))
+				(*nsync)++;
+			if (nparallelapply != NULL && isParallelApplyWorker(w))
+				(*nparallelapply)++;
+		}
 	}
-
-	return res;
 }
 
 /*
-- 
2.49.0

v2-0002-Optimize-slot-reuse-after-garbage-collection-in-l.patch (text/plain; charset=UTF-8)
From e3e8335fede955bda99fc896f1f44a8249113e39 Mon Sep 17 00:00:00 2001
From: Fujii Masao <fujii@postgresql.org>
Date: Mon, 28 Apr 2025 17:12:47 +0900
Subject: [PATCH v2 2/2] Optimize slot reuse after garbage collection in
 logicalrep_worker_launch().

Previously, when logicalrep_worker_launch() ran garbage collection and
cleaned up at least one worker slot, it would rescan all worker slots to
find a free one. However, since it is guaranteed that at least one slot was
freed in this case, this additional scan was unnecessary.

This commit removes the redundant scan and makes logicalrep_worker_launch()
immediately reuse the freed slot.
---
 src/backend/replication/logical/launcher.c | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index ac95afe4bae..400f06de5af 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -336,7 +336,6 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
 	 */
 	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
-retry:
 	/* Find unused worker slot. */
 	for (i = 0; i < max_logical_replication_workers; i++)
 	{
@@ -386,11 +385,21 @@ retry:
 
 				logicalrep_worker_cleanup(w);
 				did_cleanup = true;
+
+				if (worker == NULL)
+				{
+					worker = w;
+					slot = i;
+				}
 			}
 		}
 
+		/*
+		 * Count the current number of sync and parallel apply workers again,
+		 * since garbage collection may have changed it.
+		 */
 		if (did_cleanup)
-			goto retry;
+			logicalrep_worker_count(subid, &nsyncworkers, &nparallelapplyworkers);
 	}
 
 	/*
-- 
2.49.0

#4 Amit Kapila
amit.kapila16@gmail.com
In reply to: Fujii Masao (#3)
Re: Questions about logicalrep_worker_launch()

On Mon, Apr 28, 2025 at 2:37 PM Fujii Masao <masao.fujii@oss.nttdata.com> wrote:

> On 2025/04/26 3:03, Masahiko Sawada wrote:
>
>> I agree with these changes.
>>
>> I think that while the changes for (2) should be for v19, the changes
>> for (1) might be treated as a bug fix?
>
> Agreed. I've split the patch into two parts:
>
> 0001 is for (1) and is a bug fix that should be back-patched to v16,
> where parallel apply workers were introduced. Since it didn't apply
> cleanly to v16, I also created a separate patch specifically for v16.

The situation for parallel apply workers is not as bad as for
tablesync workers: even if a parallel apply worker is not available,
the apply worker will apply the changes by itself. OTOH, if a
tablesync worker is not available, the table sync remains pending
until a worker becomes available. So I am not sure this is a
clear-cut bug that requires a fix in back-branches.

Additionally, shall we try to reproduce this case for parallel apply workers?

--
With Regards,
Amit Kapila.

#5 Fujii Masao
masao.fujii@oss.nttdata.com
In reply to: Amit Kapila (#4)
2 attachment(s)
Re: Questions about logicalrep_worker_launch()

On 2025/04/29 21:21, Amit Kapila wrote:

> On Mon, Apr 28, 2025 at 2:37 PM Fujii Masao <masao.fujii@oss.nttdata.com> wrote:
>
>> On 2025/04/26 3:03, Masahiko Sawada wrote:
>>
>>> I agree with these changes.
>>>
>>> I think that while the changes for (2) should be for v19, the changes
>>> for (1) might be treated as a bug fix?
>>
>> Agreed. I've split the patch into two parts:
>>
>> 0001 is for (1) and is a bug fix that should be back-patched to v16,
>> where parallel apply workers were introduced. Since it didn't apply
>> cleanly to v16, I also created a separate patch specifically for v16.
>
> The situation for parallel apply workers is not as bad as for
> tablesync workers: even if a parallel apply worker is not available,
> the apply worker will apply the changes by itself. OTOH, if a
> tablesync worker is not available, the table sync remains pending
> until a worker becomes available. So I am not sure this is a
> clear-cut bug that requires a fix in back-branches.

I'm fine with treating this as an improvement rather than a bug fix.
In any case, I've registered the patches for the next CommitFest.

The attached patches are the same as before, just rebased for the master branch.

> Additionally, shall we try to reproduce this case for parallel apply workers?

I noticed this issue while reading the code, so I haven't actually reproduced it.
Are you saying it's not possible to reproduce this in practice?

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION

Attachments:

v3-0001-Fix-bug-that-could-block-the-startup-of-parallel-.patch (text/plain; charset=UTF-8)
From 134331b89c0c413b20866e25c332b23c253bfa54 Mon Sep 17 00:00:00 2001
From: Fujii Masao <fujii@postgresql.org>
Date: Mon, 28 Apr 2025 14:29:26 +0900
Subject: [PATCH v3 1/2] Fix bug that could block the startup of parallel apply
 workers.

If a logical replication worker fails to start and its parent crashes
while waiting, its worker slot can remain marked as "in use".
This can prevent new workers from starting, as the launcher may not
find a free slot or may incorrectly think the sync or parallel apply
worker limits have been reached.

To handle this, the launcher already performs garbage collection
when no free slot is found or when the sync worker limit is hit,
and then retries launching workers. However, it previously did not
trigger garbage collection when the parallel apply worker limit
was reached. As a result, stale slots could block new parallel apply
workers from starting, even though they could have been launched
after cleanup.

This commit fixes the issue by triggering garbage collection
when the parallel apply worker limit is reached as well. If stale slots
are cleared and the number of parallel apply workers drops below
the limit, new parallel apply workers can then be started successfully.
---
 src/backend/replication/logical/launcher.c | 65 +++++++++++-----------
 1 file changed, 31 insertions(+), 34 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 10677da56b2..ac95afe4bae 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -96,7 +96,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
-static int	logicalrep_pa_worker_count(Oid subid);
+static void logicalrep_worker_count(Oid subid, int *nsync, int *nparallelapply);
 static void logicalrep_launcher_attach_dshmem(void);
 static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
 static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
@@ -350,16 +350,21 @@ retry:
 		}
 	}
 
-	nsyncworkers = logicalrep_sync_worker_count(subid);
+	logicalrep_worker_count(subid, &nsyncworkers, &nparallelapplyworkers);
 
 	now = GetCurrentTimestamp();
 
 	/*
-	 * If we didn't find a free slot, try to do garbage collection.  The
-	 * reason we do this is because if some worker failed to start up and its
-	 * parent has crashed while waiting, the in_use state was never cleared.
+	 * If we can't start a new logical replication background worker because
+	 * no free slot is available, or because the number of sync workers or
+	 * parallel apply workers has reached the limit per subscription, try
+	 * running garbage collection. The reason we do this is because if some
+	 * workers failed to start up and their parent has crashed while waiting,
+	 * the in_use state was never cleared. By freeing up these stale worker
+	 * slots, we may be able to start a new worker.
 	 */
-	if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
+	if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription ||
+		nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
 	{
 		bool		did_cleanup = false;
 
@@ -399,8 +404,6 @@ retry:
 		return false;
 	}
 
-	nparallelapplyworkers = logicalrep_pa_worker_count(subid);
-
 	/*
 	 * Return false if the number of parallel apply workers reached the limit
 	 * per subscription.
@@ -844,48 +847,42 @@ logicalrep_worker_onexit(int code, Datum arg)
 int
 logicalrep_sync_worker_count(Oid subid)
 {
-	int			i;
 	int			res = 0;
 
-	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
-
-	/* Search for attached worker for a given subscription id. */
-	for (i = 0; i < max_logical_replication_workers; i++)
-	{
-		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
-
-		if (isTablesyncWorker(w) && w->subid == subid)
-			res++;
-	}
-
+	logicalrep_worker_count(subid, &res, NULL);
 	return res;
 }
 
 /*
- * Count the number of registered (but not necessarily running) parallel apply
- * workers for a subscription.
+ * Count the number of registered (but not necessarily running) sync workers
+ * and parallel apply workers for a subscription.
  */
-static int
-logicalrep_pa_worker_count(Oid subid)
+static void
+logicalrep_worker_count(Oid subid, int *nsync, int *nparallelapply)
 {
-	int			i;
-	int			res = 0;
-
 	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
 
+	if (nsync != NULL)
+		*nsync = 0;
+	if (nparallelapply != NULL)
+		*nparallelapply = 0;
+
 	/*
-	 * Scan all attached parallel apply workers, only counting those which
-	 * have the given subscription id.
+	 * Scan all attached sync and parallel apply workers, only counting those
+	 * which have the given subscription id.
 	 */
-	for (i = 0; i < max_logical_replication_workers; i++)
+	for (int i = 0; i < max_logical_replication_workers; i++)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (isParallelApplyWorker(w) && w->subid == subid)
-			res++;
+		if (w->subid == subid)
+		{
+			if (nsync != NULL && isTablesyncWorker(w))
+				(*nsync)++;
+			if (nparallelapply != NULL && isParallelApplyWorker(w))
+				(*nparallelapply)++;
+		}
 	}
-
-	return res;
 }
 
 /*
-- 
2.49.0

v3-0002-Optimize-slot-reuse-after-garbage-collection-in-l.patch (text/plain; charset=UTF-8)
From b43a2ae74c6f6ae5f3a2fcba1d9e722735b55b35 Mon Sep 17 00:00:00 2001
From: Fujii Masao <fujii@postgresql.org>
Date: Mon, 28 Apr 2025 17:12:47 +0900
Subject: [PATCH v3 2/2] Optimize slot reuse after garbage collection in
 logicalrep_worker_launch().

Previously, when logicalrep_worker_launch() ran garbage collection and
cleaned up at least one worker slot, it would rescan all worker slots to
find a free one. However, since it is guaranteed that at least one slot was
freed in this case, this additional scan was unnecessary.

This commit removes the redundant scan and makes logicalrep_worker_launch()
immediately reuse the freed slot.
---
 src/backend/replication/logical/launcher.c | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index ac95afe4bae..400f06de5af 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -336,7 +336,6 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
 	 */
 	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
-retry:
 	/* Find unused worker slot. */
 	for (i = 0; i < max_logical_replication_workers; i++)
 	{
@@ -386,11 +385,21 @@ retry:
 
 				logicalrep_worker_cleanup(w);
 				did_cleanup = true;
+
+				if (worker == NULL)
+				{
+					worker = w;
+					slot = i;
+				}
 			}
 		}
 
+		/*
+		 * Count the current number of sync and parallel apply workers again,
+		 * since garbage collection may have changed it.
+		 */
 		if (did_cleanup)
-			goto retry;
+			logicalrep_worker_count(subid, &nsyncworkers, &nparallelapplyworkers);
 	}
 
 	/*
-- 
2.49.0

#6 Amit Kapila
amit.kapila16@gmail.com
In reply to: Fujii Masao (#5)
Re: Questions about logicalrep_worker_launch()

On Thu, May 1, 2025 at 10:27 AM Fujii Masao <masao.fujii@oss.nttdata.com> wrote:

>> Additionally, shall we try to reproduce this case for parallel apply workers?
>
> I noticed this issue while reading the code, so I haven't actually reproduced it.
> Are you saying it's not possible to reproduce this in practice?

No, the idea was that if we are changing the code, it is better to
verify the change with a test case rather than relying on assumptions.

--
With Regards,
Amit Kapila.