From 57402e74fac06e7c875e5f9b941cf893f3f8c6d2 Mon Sep 17 00:00:00 2001
From: Peter Smith
Date: Fri, 11 Aug 2023 18:47:29 +1000
Subject: [PATCH v1] logicalrep_worker_launch limit checks

---
 src/backend/replication/logical/launcher.c | 39 ++++++++++++++++++++++--------
 1 file changed, 29 insertions(+), 10 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index e231fa7..0fd15f6 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -312,10 +312,11 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 	int			i;
 	int			slot = 0;
 	LogicalRepWorker *worker = NULL;
-	int			nsyncworkers;
-	int			nparallelapplyworkers;
 	TimestampTz now;
+	bool		is_tablesync_worker = OidIsValid(relid);
 	bool		is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
+	bool		at_limit_of_tablesync_workers = false;
+	bool		at_limit_of_parallel_apply_workers = false;
 
 	/* Sanity check - tablesync worker cannot be a subworker */
 	Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
@@ -350,7 +351,21 @@ retry:
 		}
 	}
 
-	nsyncworkers = logicalrep_sync_worker_count(subid);
+	/* (Re)check if we are at the configured limit for the worker type */
+	if (is_tablesync_worker)
+	{
+		int			n = logicalrep_sync_worker_count(subid);
+
+		at_limit_of_tablesync_workers =
+			(n >= max_sync_workers_per_subscription);
+	}
+	else if (is_parallel_apply_worker)
+	{
+		int			n = logicalrep_pa_worker_count(subid);
+
+		at_limit_of_parallel_apply_workers =
+			(n >= max_parallel_apply_workers_per_subscription);
+	}
 
 	now = GetCurrentTimestamp();
 
@@ -358,8 +373,12 @@ retry:
 	 * If we didn't find a free slot, try to do garbage collection. The
 	 * reason we do this is because if some worker failed to start up and its
 	 * parent has crashed while waiting, the in_use state was never cleared.
+	 *
+	 * If the worker type has reached its limit we also want to trigger garbage
+	 * collection. This is in case one of those workers was previously in error
+	 * and maybe now can be re-used.
 	 */
-	if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
+	if (worker == NULL || at_limit_of_tablesync_workers || at_limit_of_parallel_apply_workers)
 	{
 		bool		did_cleanup = false;
 
@@ -393,20 +412,17 @@ retry:
 	 * sync worker limit per subscription. So, just return silently as we
 	 * might get here because of an otherwise harmless race condition.
 	 */
-	if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
+	if (is_tablesync_worker && at_limit_of_tablesync_workers)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
 		return false;
 	}
 
-	nparallelapplyworkers = logicalrep_pa_worker_count(subid);
-
 	/*
 	 * Return false if the number of parallel apply workers reached the limit
 	 * per subscription.
 	 */
-	if (is_parallel_apply_worker &&
-		nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
+	if (is_parallel_apply_worker && at_limit_of_parallel_apply_workers)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
 		return false;
@@ -842,7 +858,10 @@ logicalrep_sync_worker_count(Oid subid)
 
 	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
 
-	/* Search for attached worker for a given subscription id. */
+	/*
+	 * Scan all attached tablesync workers, only counting those which
+	 * have the given subscription id.
+	 */
 	for (i = 0; i < max_logical_replication_workers; i++)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
-- 
1.8.3.1