logicalrep_worker_launch -- counting/checking the worker limits
While reviewing other threads I have been looking more closely at the
logicalrep_worker_launch() function. IMO the logic of that function
seems not quite right.
Here are a few things that seemed strange to me:
1. The function knows exactly what type of worker it is launching, yet
it calls the worker-counting functions logicalrep_sync_worker_count()
and logicalrep_pa_worker_count() even when launching a worker of a
*different* type.
1a. I think it should only count/check the tablesync worker limit when
trying to launch a tablesync worker.
1b. I think it should only count/check the parallel apply worker limit
when trying to launch a parallel apply worker.
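For example, the idea is something like this (just a sketch, not the
exact patch; the flag name here is only illustrative):

/* Sketch: only count/check the limit for the type of worker being launched */
if (is_tablesync_worker)
{
    int n = logicalrep_sync_worker_count(subid);

    at_limit_for_type = (n >= max_sync_workers_per_subscription);
}
else if (is_parallel_apply_worker)
{
    int n = logicalrep_pa_worker_count(subid);

    at_limit_for_type = (n >= max_parallel_apply_workers_per_subscription);
}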
~
2. There is a condition for attempting the garbage collection of workers:
/*
* If we didn't find a free slot, try to do garbage collection. The
* reason we do this is because if some worker failed to start up and its
* parent has crashed while waiting, the in_use state was never cleared.
*/
if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
The inclusion of that nsyncworkers check here has very subtle
importance. AFAICT it means that even if we *did* find a free
worker slot, we still need to do the garbage collection just in case
one of those 'in_use' tablesync workers was in error (e.g. it crashed
after being marked in_use). By garbage-collecting (and then
re-counting nsyncworkers) we might be able to launch the tablesync
worker successfully instead of just returning false because the limit
appeared to be reached.
2a. IIUC that is all fine. The problem is that I think exactly the same
logic should also apply to the parallel apply workers here.
2b. The comment above should explain more about the reason for the
nsyncworkers condition -- the existing comment doesn't really cover
it.
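In other words, the garbage-collection trigger would become something
like this (sketch only; 'at_limit_for_type' is the illustrative flag
from the sketch above, covering whichever limit applies to the worker
being launched):

/*
 * If we didn't find a free slot, try to do garbage collection.  Also try
 * it if the launching worker's own type appears to be at its limit,
 * because a stale 'in_use' entry might be what is pushing us over it.
 */
if (worker == NULL || at_limit_for_type)
{
    /* ... existing cleanup loop; goto retry if anything was cleaned up ... */
}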
~
3. There is an incorrect cut/paste comment in the body of
logicalrep_sync_worker_count().
That comment should be changed to read similarly to the equivalent
comment in logicalrep_pa_worker_count().
------
PSA a patch to address all these items.
This patch is about making the function logically consistent. Removing
some of the redundant counting should also be more efficient in
theory, but in practice, I think the unnecessary worker loops are too
short (only max_logical_replication_workers iterations) for any
performance improvement to be noticeable.
Thoughts?
------
Kind Regards,
Peter Smith.
Fujitsu Australia
Attachments:
v1-0001-logicalrep_worker_launch-limit-checks.patch
From 57402e74fac06e7c875e5f9b941cf893f3f8c6d2 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Fri, 11 Aug 2023 18:47:29 +1000
Subject: [PATCH v1] logicalrep_worker_launch limit checks
---
src/backend/replication/logical/launcher.c | 39 ++++++++++++++++++++++--------
1 file changed, 29 insertions(+), 10 deletions(-)
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index e231fa7..0fd15f6 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -312,10 +312,11 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
int i;
int slot = 0;
LogicalRepWorker *worker = NULL;
- int nsyncworkers;
- int nparallelapplyworkers;
TimestampTz now;
+ bool is_tablesync_worker = OidIsValid(relid);
bool is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
+ bool at_limit_of_tablesync_workers = false;
+ bool at_limit_of_parallel_apply_workers = false;
/* Sanity check - tablesync worker cannot be a subworker */
Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
@@ -350,7 +351,21 @@ retry:
}
}
- nsyncworkers = logicalrep_sync_worker_count(subid);
+ /* Check if we are at the configured limit for the worker type */
+ if (is_tablesync_worker)
+ {
+ int n = logicalrep_sync_worker_count(subid);
+
+ if (n >= max_sync_workers_per_subscription)
+ at_limit_of_tablesync_workers = true;
+ }
+ else if (is_parallel_apply_worker)
+ {
+ int n = logicalrep_pa_worker_count(subid);
+
+ if (n >= max_parallel_apply_workers_per_subscription)
+ at_limit_of_parallel_apply_workers = true;
+ }
now = GetCurrentTimestamp();
@@ -358,8 +373,12 @@ retry:
* If we didn't find a free slot, try to do garbage collection. The
* reason we do this is because if some worker failed to start up and its
* parent has crashed while waiting, the in_use state was never cleared.
+ *
+ * If the worker type has reached its limit we also want to trigger garbage
+ * collection. This is in case one of those workers was previously in error
+ * and maybe now can be re-used.
*/
- if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
+ if (worker == NULL || at_limit_of_tablesync_workers || at_limit_of_parallel_apply_workers)
{
bool did_cleanup = false;
@@ -393,20 +412,17 @@ retry:
* sync worker limit per subscription. So, just return silently as we
* might get here because of an otherwise harmless race condition.
*/
- if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
+ if (is_tablesync_worker && at_limit_of_tablesync_workers)
{
LWLockRelease(LogicalRepWorkerLock);
return false;
}
- nparallelapplyworkers = logicalrep_pa_worker_count(subid);
-
/*
* Return false if the number of parallel apply workers reached the limit
* per subscription.
*/
- if (is_parallel_apply_worker &&
- nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
+ if (is_parallel_apply_worker && at_limit_of_parallel_apply_workers)
{
LWLockRelease(LogicalRepWorkerLock);
return false;
@@ -842,7 +858,10 @@ logicalrep_sync_worker_count(Oid subid)
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
- /* Search for attached worker for a given subscription id. */
+ /*
+ * Scan all attached tablesync workers, only counting those which
+ * have the given subscription id.
+ */
for (i = 0; i < max_logical_replication_workers; i++)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
--
1.8.3.1
The previous patch accidentally did not reset the boolean limit
flags to false on retries.
Fixed in v2.
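For reference, the difference between v1 and v2 boils down to assigning
the flag on every pass through the retry logic instead of only ever
setting it to true (a sketch of the two approaches, not the exact hunks):

/* v1 (buggy): the flag could only ever go from false to true, so after
 * a successful cleanup and 'goto retry' it stayed stuck at true */
if (n >= max_sync_workers_per_subscription)
    at_limit_of_tablesync_workers = true;

/* v2: recomputed from scratch each time we reach this check */
reached_limit_for_type = (n >= max_sync_workers_per_subscription);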
------
Kind Regards,
Peter Smith.
Fujitsu Australia
Attachments:
v2-0001-logicalrep_worker_launch-limit-checks.patch
From bc4838daa127a95eebaca6e5a4420b1d475e94c6 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Sat, 12 Aug 2023 09:44:25 +1000
Subject: [PATCH v2] logicalrep_worker_launch limit checks
---
src/backend/replication/logical/launcher.c | 36 +++++++++++++++++++++---------
1 file changed, 26 insertions(+), 10 deletions(-)
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index e231fa7..35a4428 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -312,10 +312,10 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
int i;
int slot = 0;
LogicalRepWorker *worker = NULL;
- int nsyncworkers;
- int nparallelapplyworkers;
TimestampTz now;
+ bool is_tablesync_worker = OidIsValid(relid);
bool is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
+ bool reached_limit_for_type = false;
/* Sanity check - tablesync worker cannot be a subworker */
Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
@@ -350,7 +350,19 @@ retry:
}
}
- nsyncworkers = logicalrep_sync_worker_count(subid);
+ /* Check if we are at the configured limit for the worker type */
+ if (is_tablesync_worker)
+ {
+ int n = logicalrep_sync_worker_count(subid);
+
+ reached_limit_for_type = (n >= max_sync_workers_per_subscription);
+ }
+ else if (is_parallel_apply_worker)
+ {
+ int n = logicalrep_pa_worker_count(subid);
+
+ reached_limit_for_type = (n >= max_parallel_apply_workers_per_subscription);
+ }
now = GetCurrentTimestamp();
@@ -358,8 +370,12 @@ retry:
* If we didn't find a free slot, try to do garbage collection. The
* reason we do this is because if some worker failed to start up and its
* parent has crashed while waiting, the in_use state was never cleared.
+ *
+ * If the worker type has reached its limit we also want to trigger garbage
+ * collection. This is in case one of those workers was previously in error
+ * and maybe now can be re-used.
*/
- if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
+ if (worker == NULL || reached_limit_for_type)
{
bool did_cleanup = false;
@@ -393,20 +409,17 @@ retry:
* sync worker limit per subscription. So, just return silently as we
* might get here because of an otherwise harmless race condition.
*/
- if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
+ if (is_tablesync_worker && reached_limit_for_type)
{
LWLockRelease(LogicalRepWorkerLock);
return false;
}
- nparallelapplyworkers = logicalrep_pa_worker_count(subid);
-
/*
* Return false if the number of parallel apply workers reached the limit
* per subscription.
*/
- if (is_parallel_apply_worker &&
- nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
+ if (is_parallel_apply_worker && reached_limit_for_type)
{
LWLockRelease(LogicalRepWorkerLock);
return false;
@@ -842,7 +855,10 @@ logicalrep_sync_worker_count(Oid subid)
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
- /* Search for attached worker for a given subscription id. */
+ /*
+ * Scan all attached tablesync workers, only counting those which
+ * have the given subscription id.
+ */
for (i = 0; i < max_logical_replication_workers; i++)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
--
1.8.3.1
On Fri, Aug 11, 2023 at 2:29 PM Peter Smith <smithpb2250@gmail.com> wrote:
While reviewing other threads I have been looking more closely at the
logicalrep_worker_launch() function. IMO the logic of that function
seems not quite right.
Here are a few things that seemed strange to me:
...
2. There is some condition for attempting the garbage-collection of workers:
/*
* If we didn't find a free slot, try to do garbage collection. The
* reason we do this is because if some worker failed to start up and its
* parent has crashed while waiting, the in_use state was never cleared.
*/
if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
The inclusion of that nsyncworkers check here has very subtle
importance. AFAICT it means that even if we *did* find a free
worker slot, we still need to do the garbage collection just in case
one of those 'in_use' tablesync workers was in error (e.g. it crashed
after being marked in_use). By garbage-collecting (and then
re-counting nsyncworkers) we might be able to launch the tablesync
worker successfully instead of just returning false because the limit
appeared to be reached.
2a. IIUC that is all fine. The problem is that I think exactly the same
logic should also apply to the parallel apply workers here.
Did you try to reproduce this condition? If not, can you please try it
once? I wonder, if the leader worker crashed, won't it lead to a
restart of the server?
--
With Regards,
Amit Kapila.
A rebase was needed due to a recent push [1].
PSA v3.
------
[1]: https://github.com/postgres/postgres/commit/2a8b40e3681921943a2989fd4ec6cdbf8766566c
Kind Regards,
Peter Smith.
Fujitsu Australia
Attachments:
v3-0001-logicalrep_worker_launch-limit-checks.patch
From d9a8efd395daf14f7ac945c0106a289902958653 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Tue, 15 Aug 2023 12:35:30 +1000
Subject: [PATCH v3] logicalrep_worker_launch limit checks
---
src/backend/replication/logical/launcher.c | 35 +++++++++++++++++++++---------
1 file changed, 25 insertions(+), 10 deletions(-)
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 7cc0a16..81f7af4 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -313,11 +313,10 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
int i;
int slot = 0;
LogicalRepWorker *worker = NULL;
- int nsyncworkers;
- int nparallelapplyworkers;
TimestampTz now;
bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
+ bool reached_limit_for_type = false;
/*----------
* Sanity checks:
@@ -359,7 +358,19 @@ retry:
}
}
- nsyncworkers = logicalrep_sync_worker_count(subid);
+ /* Check if we are at the configured limit for the worker type */
+ if (is_tablesync_worker)
+ {
+ int n = logicalrep_sync_worker_count(subid);
+
+ reached_limit_for_type = (n >= max_sync_workers_per_subscription);
+ }
+ else if (is_parallel_apply_worker)
+ {
+ int n = logicalrep_pa_worker_count(subid);
+
+ reached_limit_for_type = (n >= max_parallel_apply_workers_per_subscription);
+ }
now = GetCurrentTimestamp();
@@ -367,8 +378,12 @@ retry:
* If we didn't find a free slot, try to do garbage collection. The
* reason we do this is because if some worker failed to start up and its
* parent has crashed while waiting, the in_use state was never cleared.
+ *
+ * If the worker type has reached its limit we also want to trigger garbage
+ * collection. This is in case one of those workers was previously in error
+ * and maybe now can be re-used.
*/
- if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
+ if (worker == NULL || reached_limit_for_type)
{
bool did_cleanup = false;
@@ -402,20 +417,17 @@ retry:
* sync worker limit per subscription. So, just return silently as we
* might get here because of an otherwise harmless race condition.
*/
- if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
+ if (is_tablesync_worker && reached_limit_for_type)
{
LWLockRelease(LogicalRepWorkerLock);
return false;
}
- nparallelapplyworkers = logicalrep_pa_worker_count(subid);
-
/*
* Return false if the number of parallel apply workers reached the limit
* per subscription.
*/
- if (is_parallel_apply_worker &&
- nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
+ if (is_parallel_apply_worker && reached_limit_for_type)
{
LWLockRelease(LogicalRepWorkerLock);
return false;
@@ -852,7 +864,10 @@ logicalrep_sync_worker_count(Oid subid)
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
- /* Search for attached worker for a given subscription id. */
+ /*
+ * Scan all attached tablesync workers, only counting those which
+ * have the given subscription id.
+ */
for (i = 0; i < max_logical_replication_workers; i++)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
--
1.8.3.1
On Tue, 15 Aug 2023 at 08:09, Peter Smith <smithpb2250@gmail.com> wrote:
A rebase was needed due to a recent push [1].
I have changed the status of the patch to "Waiting on Author" as
Amit's queries at [1] have not been verified and concluded. Please
feel free to address them and change the status back again.
[1]: /messages/by-id/CAA4eK1LtFyiMV6e9+Rr66pe5e-MX7Pk6N3iHd4JgcBW1X4kS6Q@mail.gmail.com
Regards,
Vignesh
On 15 Aug 2023, at 07:38, Peter Smith <smithpb2250@gmail.com> wrote:
A rebase was needed due to a recent push [1].
PSA v3.
On 14 Jan 2024, at 10:43, vignesh C <vignesh21@gmail.com> wrote:
I have changed the status of the patch to "Waiting on Author" as
Amit's queries at [1] have not been verified and concluded. Please
feel free to address them and change the status back again.
Hi Peter!
Are you still interested in this thread? If so - please post an answer to Amit's question.
If you are not interested - please withdraw the CF entry [0].
Thanks!
[0]: https://commitfest.postgresql.org/47/4499/
Best regards, Andrey Borodin.
On Sun, Mar 31, 2024 at 8:12 PM Andrey M. Borodin <x4mmm@yandex-team.ru> wrote:
On 15 Aug 2023, at 07:38, Peter Smith <smithpb2250@gmail.com> wrote:
A rebase was needed due to a recent push [1].
PSA v3.
On 14 Jan 2024, at 10:43, vignesh C <vignesh21@gmail.com> wrote:
I have changed the status of the patch to "Waiting on Author" as
Amit's queries at [1] have not been verified and concluded. Please
feel free to address them and change the status back again.
Hi Peter!
Are you still interested in this thread? If so - please post an answer to Amit's question.
If you are not interested - please withdraw the CF entry [0].
Thanks!
Yeah, sorry for the long period of inactivity on this thread. Although
I still have some interest in it, I don't know when I'll get back to
it again, so in the meantime I've withdrawn this from the CF as requested.
Kind regards,
Peter Smith
Fujitsu Australia