Wait for parallel workers to attach
During the recent development of parallel operation (parallel create
index)[1], a need has arisen for $SUBJECT. The idea is to allow the
leader backend to rely on the number of workers that have successfully
started. This API allows the leader to wait until all the workers have
either started or failed, and it raises an error if any worker fails
to attach. We consider workers started/attached once they are attached
to the error queue. This ensures that any error raised after the
workers are attached won't be silently ignored by the leader.
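To make the intended calling pattern concrete, here is a minimal usage
sketch (run_parallel_operation is a hypothetical caller, and the usual
setup via CreateParallelContext/InitializeParallelDSM is elided; only
the API names come from the attached patch):

/*
 * Hypothetical caller: launch workers, make nworkers_launched reliable,
 * then do the normal wait-to-finish dance.
 */
static void
run_parallel_operation(ParallelContext *pcxt)
{
	LaunchParallelWorkers(pcxt);

	/*
	 * Returns only once every launched worker is attached to its error
	 * queue; raises an error if any worker failed to start. After this,
	 * worker failures reach the leader as parallel messages instead of
	 * being silently ignored.
	 */
	WaitForParallelWorkersToAttach(pcxt);

	/* ... the leader can now safely wait on the workers ... */

	WaitForParallelWorkersToFinish(pcxt);
}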
I have used WAIT_EVENT_BGWORKER_STARTUP as the wait event, similar to
WaitForReplicationWorkerAttach, but we might want to change it.
I have tested this patch by calling this API in nodeGather.c and then
introducing failures at various places: (a) induce a fork failure for
background workers (force_fork_failure_v1.patch), (b) exit the parallel
worker before attaching to the error queue
(exit_parallel_worker_before_error_queue_attach_v1.patch), and (c) exit
the parallel worker after attaching to the error queue
(exit_parallel_worker_after_error_queue_attach_v1.patch).
In all of the above cases, I got the errors as expected.
[1]: /messages/by-id/CAA4eK1KgmdT3ivm1vG+rJzKOKeYQU2XLhp6ma5LMHxaG89brsA@mail.gmail.com
--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
Attachments:
WaitForParallelWorkersToAttach_v1.patch (application/octet-stream)
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 54d9ea7..8920134 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -553,6 +553,114 @@ LaunchParallelWorkers(ParallelContext *pcxt)
}
/*
+ * Wait for all workers to start or fail even if one of the workers fails to
+ * attach.
+ */
+void
+WaitForParallelWorkersToAttach(ParallelContext *pcxt)
+{
+ int i;
+ int nknown_started_workers = 0;
+ bool *known_started_workers;
+
+ known_started_workers =
+ (bool *) palloc0(pcxt->nworkers_launched * sizeof(bool));
+
+ for (;;)
+ {
+ /*
+ * This will process any parallel messages that are pending and it may
+ * also throw an error propagated from a worker.
+ */
+ CHECK_FOR_INTERRUPTS();
+
+ for (i = 0; i < pcxt->nworkers_launched; ++i)
+ {
+ BgwHandleStatus status;
+ shm_mq *mq;
+ int rc;
+ pid_t pid;
+
+ if (!known_started_workers[i])
+ {
+ /*
+ * If error_mqh is NULL, then the worker has already exited
+ * cleanly.
+ */
+ if (pcxt->worker[i].error_mqh == NULL)
+ {
+ known_started_workers[i] = true;
+ ++nknown_started_workers;
+ continue;
+ }
+
+ status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
+ if (status == BGWH_STARTED)
+ {
+ /* Check if worker is attached to error queue? */
+ mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
+ if (shm_mq_get_sender(mq) != NULL)
+ {
+ known_started_workers[i] = true;
+ ++nknown_started_workers;
+ }
+ }
+ else if (status == BGWH_STOPPED)
+ {
+ /*
+ * Check whether the worker ended up stopped without ever
+ * attaching to the error queue. If so, the postmaster
+ * was unable to fork the worker or it exited without
+ * initializing properly. We must throw an error, since
+ * the caller may have been expecting the worker to do
+ * some work before exiting.
+ */
+ mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
+ if (shm_mq_get_sender(mq) == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("parallel worker failed to initialize"),
+ errhint("More details may be available in the server log.")));
+
+ known_started_workers[i] = true;
+ ++nknown_started_workers;
+ }
+ else
+ {
+ /*
+ * Worker not yet started, so it is better to wait for
+ * sometime rather than busy looping. We need timeout
+ * because we generally don't get notified via latch about
+ * the worker attach. But we don't expect to have to wait
+ * long.
+ */
+ rc = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+ 10L, WAIT_EVENT_BGWORKER_STARTUP);
+
+ /* emergency bailout if postmaster has died */
+ if (rc & WL_POSTMASTER_DEATH)
+ proc_exit(1);
+
+ if (rc & WL_LATCH_SET)
+ ResetLatch(MyLatch);
+ }
+ }
+ }
+
+ /* If all workers are known to have started, we're done. */
+ if (nknown_started_workers >= pcxt->nworkers_launched)
+ {
+ Assert(nknown_started_workers == pcxt->nworkers_launched);
+ break;
+ }
+ }
+
+ /* be tidy */
+ pfree(known_started_workers);
+}
+
+/*
* Wait for all workers to finish computing.
*
* Even if the parallel operation seems to have completed successfully, it's
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 32c2e32..98b5f76 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -62,6 +62,7 @@ extern ParallelContext *CreateParallelContext(const char *library_name, const ch
extern void InitializeParallelDSM(ParallelContext *pcxt);
extern void ReinitializeParallelDSM(ParallelContext *pcxt);
extern void LaunchParallelWorkers(ParallelContext *pcxt);
+extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt);
extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt);
extern void DestroyParallelContext(ParallelContext *pcxt);
extern bool ParallelContextActive(void);
modify_gather_to_wait_for_attach_v1.patch (application/octet-stream)
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 89266b5..384817c 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -173,6 +173,7 @@ ExecGather(PlanState *pstate)
*/
pcxt = node->pei->pcxt;
LaunchParallelWorkers(pcxt);
+ WaitForParallelWorkersToAttach(pcxt);
/* We save # workers launched for the benefit of EXPLAIN */
node->nworkers_launched = pcxt->nworkers_launched;
force_fork_failure_v1.patch (application/octet-stream)
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index f3ddf82..2a0a60d 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -5699,9 +5699,9 @@ do_start_bgworker(RegisteredBgWorker *rw)
rw->rw_worker.bgw_name)));
#ifdef EXEC_BACKEND
- switch ((worker_pid = bgworker_forkexec(rw->rw_shmem_slot)))
+ switch (worker_pid = -1)
#else
- switch ((worker_pid = fork_process()))
+ switch (worker_pid = -1)
#endif
{
case -1:
exit_parallel_worker_before_error_queue_attach_v1.patch (application/octet-stream)
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 8920134..138c926 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -1221,6 +1221,8 @@ ParallelWorkerMain(Datum main_arg)
ParallelMasterBackendId = fps->parallel_master_backend_id;
on_shmem_exit(ParallelWorkerShutdown, (Datum) 0);
+ proc_exit(1);
+
/*
* Now we can find and attach to the error queue provided for us. That's
* good, because until we do that, any errors that happen here will not be
exit_parallel_worker_after_error_queue_attach_v1.patch (application/octet-stream)
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 8920134..886f03c 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -1236,6 +1236,8 @@ ParallelWorkerMain(Datum main_arg)
pq_set_parallel_master(fps->parallel_master_pid,
fps->parallel_master_backend_id);
+ proc_exit(1);
+
/*
* Send a BackendKeyData message to the process that initiated parallelism
* so that it has access to our PID before it receives any other messages
On Sat, Jan 27, 2018 at 12:14 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:
During the recent development of parallel operation (parallel create
index)[1], a need has arisen for $SUBJECT. The idea is to allow the
leader backend to rely on the number of workers that have successfully
started. This API allows the leader to wait until all the workers have
either started or failed, and it raises an error if any worker fails
to attach. We consider workers started/attached once they are attached
to the error queue. This ensures that any error raised after the
workers are attached won't be silently ignored by the leader.
I've modified my working copy of the parallel CREATE INDEX patch to
call WaitForParallelWorkersToAttach(), just after the leader
participates as a worker.
I have tested this patch by calling this API in nodeGather.c and then
introducing failures at various places: (a) induce a fork failure for
background workers (force_fork_failure_v1.patch), (b) exit the parallel
worker before attaching to the error queue
(exit_parallel_worker_before_error_queue_attach_v1.patch), and (c) exit
the parallel worker after attaching to the error queue
(exit_parallel_worker_after_error_queue_attach_v1.patch).
In all of the above cases, I got the errors as expected.
I also found that all of these errors were propagated. The patch helps
parallel CREATE INDEX as expected/designed.
Some small things that I noticed about the patch:
* Maybe "if (!known_started_workers[i])" should be converted to "if
(known_started_workers[i]) continue;", to decrease the indentation
level of the WaitForParallelWorkersToAttach() loop.
* There might be some opportunity to share some of the new code with
the code recently committed to WaitForParallelWorkersToFinish(). For
one thing, the logic in this block could be refactored into a
dedicated function that is called by both
WaitForParallelWorkersToAttach() and WaitForParallelWorkersToFinish():
+ else if (status == BGWH_STOPPED)
+ {
+ /*
+ * Check whether the worker ended up stopped without ever
+ * attaching to the error queue. If so, the postmaster
+ * was unable to fork the worker or it exited without
+ * initializing properly. We must throw an error, since
+ * the caller may have been expecting the worker to do
+ * some work before exiting.
+ */
+ mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
+ if (shm_mq_get_sender(mq) == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("parallel worker failed to initialize"),
+ errhint("More details may be available in the server log.")));
+
+ known_started_workers[i] = true;
+ ++nknown_started_workers;
+ }
* If we don't actually commit the patch to make nodeGather.c call
WaitForParallelWorkersToAttach(), which I suspect will happen, then I
think we should instead at least have a comment saying why it's okay
that we don't call WaitForParallelWorkersToAttach(). If we go this
way, the comment should directly replace the
modify_gather_to_wait_for_attach_v1.patch call to
WaitForParallelWorkersToAttach() -- this comment should go in
ExecGather().
* Maybe the comments at the top of WaitForParallelWorkersToAttach()
should at least allude to the ExecGather() special case I just
mentioned.
* Maybe the comments at the top of WaitForParallelWorkersToAttach()
should also advise callers that it's a good idea to try to do other
leader-only work that doesn't involve a WaitLatch() before they call.
In general, WaitForParallelWorkersToAttach() needs to be called before
any WaitLatch() (or barrier wait, or condition variable wait) that
waits on workers, and after workers are first launched, but callers
may be able to arrange to do plenty of other work, just like nbtsort.c
does. To be clear: IMHO calling WaitForParallelWorkersToAttach()
should be the rule, not the exception.
* Finally, perhaps the comments at the top of
WaitForParallelWorkersToAttach() should also describe how it relates
to WaitForParallelWorkersToFinish().
ISTM that WaitForParallelWorkersToAttach() is a subset of
WaitForParallelWorkersToFinish(), that does all that is needed to rely
on nworkers_launched actually being the number of worker processes
that are attached to the error queue. As such, caller can expect
propagation of errors from workers using standard parallel message
interrupts once WaitForParallelWorkersToAttach() returns. You probably
shouldn't directly reference nworkers_launched, though, since that
doesn't necessarily have to be involved for parallel client code to
run into trouble. (You only need to wait on workers changing something
in shared memory, and failing to actively inform leader of failure
through a parallel message -- this might not involve testing
nworkers_launched in the way parallel CREATE INDEX happens to.)
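To make that hazard concrete, here is a minimal sketch (shared,
workers_done, cv, and WAIT_EVENT_SOMETHING are hypothetical names, not
from the patch) of a leader that would sleep forever on a never-started
worker if the attach check were skipped:

LaunchParallelWorkers(pcxt);

/* Do leader-only work that doesn't involve waiting on workers. */

/*
 * Without this call, a worker that died at fork() would never bump
 * workers_done, and the sleep below would never be woken for it.
 */
WaitForParallelWorkersToAttach(pcxt);

ConditionVariablePrepareToSleep(&shared->cv);
while (shared->workers_done < pcxt->nworkers_launched)
	ConditionVariableSleep(&shared->cv, WAIT_EVENT_SOMETHING);
ConditionVariableCancelSleep();

Once the attach call has returned, a worker that errors out later will
still wake the leader: the error arrives as a parallel message, and
ConditionVariableSleep() checks for interrupts while waiting.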
All in all, I'm very pleased with where this leaves parallel CREATE
INDEX. I hope that Robert can review and commit this patch quickly, so
that I can use the new infrastructure. I can then post what I hope to
be the final revision of parallel CREATE INDEX. ISTM that the question
of handling things like parallel worker fork() failure is the only
real blocker to committing parallel CREATE INDEX, since we've reached
agreement on all other issues.
Thanks
--
Peter Geoghegan
On Sat, Jan 27, 2018 at 3:14 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:
During the recent development of parallel operation (parallel create
index)[1], a need has arisen for $SUBJECT. The idea is to allow the
leader backend to rely on the number of workers that have successfully
started. This API allows the leader to wait until all the workers have
either started or failed, and it raises an error if any worker fails
to attach. We consider workers started/attached once they are attached
to the error queue. This ensures that any error raised after the
workers are attached won't be silently ignored by the leader.
known_started_workers looks a lot like any_message_received. Perhaps
any_message_received should be renamed to known_started_workers and
reused here. After all, if we know that a worker was started, there's
no need for WaitForParallelWorkersToFinish to again call
GetBackgroundWorkerPid() for it.
I think that you shouldn't need the 10ms delay loop; waiting forever
should work. If a worker fails to start, the postmaster should send
SIGUSR1, which should set our latch.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Mon, Jan 29, 2018 at 8:25 PM, Robert Haas <robertmhaas@gmail.com> wrote:
On Sat, Jan 27, 2018 at 3:14 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:
During the recent development of parallel operation (parallel create
index)[1], a need has arisen for $SUBJECT. The idea is to allow the
leader backend to rely on the number of workers that have successfully
started. This API allows the leader to wait until all the workers have
either started or failed, and it raises an error if any worker fails
to attach. We consider workers started/attached once they are attached
to the error queue. This ensures that any error raised after the
workers are attached won't be silently ignored by the leader.

known_started_workers looks a lot like any_message_received. Perhaps
any_message_received should be renamed to known_started_workers and
reused here.
Sure, that sounds good to me. Do you prefer a separate patch for
renaming any_message_received to known_started_workers or is it okay
to have it along with the main patch?
After all, if we know that a worker was started, there's
no need for WaitForParallelWorkersToFinish to again call
GetBackgroundWorkerPid() for it.
I think in the above sentence you wanted to say
WaitForParallelWorkersToAttach, not WaitForParallelWorkersToFinish.
Am I right?
I think that you shouldn't need the 10ms delay loop; waiting forever
should work. If a worker fails to start, the postmaster should send
SIGUSR1, which should set our latch.
I am not getting what exactly you are suggesting here. The wait loop
is intended for the case when some workers are not started. We want
to wait for some time before checking again whether the workers have
started. I wanted to avoid busy looping while waiting for a worker to
start. I think in most cases we don't need to wait, but for some
corner cases where the postmaster didn't get a chance to start a
worker, we should avoid busy looping waiting for it to start.
--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Tue, Jan 30, 2018 at 10:10 PM, Amit Kapila <amit.kapila16@gmail.com> wrote:
known_started_workers looks a lot like any_message_received. Perhaps
any_message_received should be renamed to known_started_workers and
reused here.

Sure, that sounds good to me. Do you prefer a separate patch for
renaming any_message_received to known_started_workers or is it okay
to have it along with the main patch?
A single patch sounds OK.
After all, if we know that a worker was started, there's
no need for WaitForParallelWorkersToFinish to again call
GetBackgroundWorkerPid() for it.

I think in the above sentence you wanted to say
WaitForParallelWorkersToAttach, not WaitForParallelWorkersToFinish.
Am I right?
Yes.
I think that you shouldn't need the 10ms delay loop; waiting forever
should work. If a worker fails to start, the postmaster should send
SIGUSR1, which should set our latch.

I am not getting what exactly you are suggesting here. The wait loop
is intended for the case when some workers are not started. We want
to wait for some time before checking again whether the workers have
started. I wanted to avoid busy looping while waiting for a worker to
start. I think in most cases we don't need to wait, but for some
corner cases where the postmaster didn't get a chance to start a
worker, we should avoid busy looping waiting for it to start.
I agree we need to avoid busy-looping. What I'm saying is that we
don't need a timeout. Why do you think we need a timeout? We have
bgw_notify_pid so that we will get a signal instead of having to poll.
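For reference, here is a sketch of the registration side that makes a
no-timeout wait safe (this mirrors what LaunchParallelWorkers already
sets up; the struct initialization is abbreviated):

BackgroundWorker worker;
BackgroundWorkerHandle *handle;

memset(&worker, 0, sizeof(worker));
/* ... bgw_name, bgw_flags, bgw_start_time, bgw_library_name, etc. ... */
worker.bgw_notify_pid = MyProcPid;	/* postmaster sends us SIGUSR1 on
									 * worker start/stop, which sets our
									 * latch */

if (!RegisterDynamicBackgroundWorker(&worker, &handle))
{
	/* Registration can fail (e.g. too many workers); caller must cope. */
}

/*
 * Because bgw_notify_pid is set, a WaitLatch() in the leader can block
 * with no timeout: any worker state change is guaranteed to set the
 * latch.
 */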
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Wed, Jan 31, 2018 at 8:46 AM, Robert Haas <robertmhaas@gmail.com> wrote:
On Tue, Jan 30, 2018 at 10:10 PM, Amit Kapila <amit.kapila16@gmail.com> wrote:
I am not getting what exactly you are suggesting here. The wait loop
is intended for the case when some workers are not started. We want
to wait for some time before checking again whether the workers have
started. I wanted to avoid busy looping while waiting for a worker to
start. I think in most cases we don't need to wait, but for some
corner cases where the postmaster didn't get a chance to start a
worker, we should avoid busy looping waiting for it to start.

I agree we need to avoid busy-looping. What I'm saying is that we
don't need a timeout. Why do you think we need a timeout?
I thought we needed it for worker startup, but after looking at the
code again, it seems we do notify at worker startup as well. So, we
don't need a timeout.
--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Mon, Jan 29, 2018 at 4:01 AM, Peter Geoghegan <pg@bowt.ie> wrote:
On Sat, Jan 27, 2018 at 12:14 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:
I also found that all of these errors were propagated. The patch helps
parallel CREATE INDEX as expected/designed.
Great!
Some small things that I noticed about the patch:
* Maybe "if (!known_started_workers[i])" should be converted to "if
(known_started_workers[i]) continue;", to decrease the indentation
level of the WaitForParallelWorkersToAttach() loop.
Changed as per suggestion.
* There might be some opportunity to share some of the new code with
the code recently committed to WaitForParallelWorkersToFinish(). For
one thing, the logic in this block could be refactored into a
dedicated function that is called by both
WaitForParallelWorkersToAttach() and WaitForParallelWorkersToFinish():

+ else if (status == BGWH_STOPPED)
+ {
+ /*
+ * Check whether the worker ended up stopped without ever
+ * attaching to the error queue. If so, the postmaster
+ * was unable to fork the worker or it exited without
+ * initializing properly. We must throw an error, since
+ * the caller may have been expecting the worker to do
+ * some work before exiting.
+ */
+ mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
+ if (shm_mq_get_sender(mq) == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("parallel worker failed to initialize"),
+ errhint("More details may be available in the server log.")));
+
+ known_started_workers[i] = true;
+ ++nknown_started_workers;
+ }
I had thought about this earlier but left it as-is since there was too
little common code; however, as you have pointed out, I have now
extracted the common code into a separate function.
* If we don't actually commit the patch to make nodeGather.c call
WaitForParallelWorkersToAttach(), which I suspect will happen, then I
think we should instead at least have a comment saying why it's okay
that we don't call WaitForParallelWorkersToAttach(). If we go this
way, the comment should directly replace the
modify_gather_to_wait_for_attach_v1.patch call to
WaitForParallelWorkersToAttach() -- this comment should go in
ExecGather().

* Maybe the comments at the top of WaitForParallelWorkersToAttach()
should at least allude to the ExecGather() special case I just
mentioned.
I think we should not touch anything related to Gather (merge) as they
don't need it for the purpose of correctness. However, we might want
to improve them by using this new API at a certain point if the need
arises. I guess we can use this API to detect failures early.
* Maybe the comments at the top of WaitForParallelWorkersToAttach()
should also advise callers that it's a good idea to try to do other
leader-only work that doesn't involve a WaitLatch() before they call.

In general, WaitForParallelWorkersToAttach() needs to be called before
any WaitLatch() (or barrier wait, or condition variable wait) that
waits on workers, and after workers are first launched, but callers
may be able to arrange to do plenty of other work, just like nbtsort.c
does. To be clear: IMHO calling WaitForParallelWorkersToAttach()
should be the rule, not the exception.

* Finally, perhaps the comments at the top of
WaitForParallelWorkersToAttach() should also describe how it relates
to WaitForParallelWorkersToFinish().

ISTM that WaitForParallelWorkersToAttach() is a subset of
WaitForParallelWorkersToFinish(), that does all that is needed to rely
on nworkers_launched actually being the number of worker processes
that are attached to the error queue. As such, caller can expect
propagation of errors from workers using standard parallel message
interrupts once WaitForParallelWorkersToAttach() returns. You probably
shouldn't directly reference nworkers_launched, though, since that
doesn't necessarily have to be involved for parallel client code to
run into trouble. (You only need to wait on workers changing something
in shared memory, and failing to actively inform leader of failure
through a parallel message -- this might not involve testing
nworkers_launched in the way parallel CREATE INDEX happens to.)
I have updated the comments atop WaitForParallelWorkersToAttach() to
address your above two points.
known_started_workers looks a lot like any_message_received. Perhaps any_message_received should be renamed to known_started_workers and reused here. After all, if we know that a
worker was started, there's no need for WaitForParallelWorkersToFinish to again call GetBackgroundWorkerPid() for it.
Changed as per above suggestion.
I think that you shouldn't need the 10ms delay loop; waiting forever should work.
As discussed, I have changed the code as per your suggestion.
--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
Attachments:
WaitForParallelWorkersToAttach_v2.patch (application/octet-stream)
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 54d9ea7..bdece27 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -133,6 +133,7 @@ static const struct
};
/* Private functions. */
+static bool CheckParallelWorkerAttachedToEQ(ParallelContext *pcxt, int i);
static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
@@ -437,10 +438,11 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
WaitForParallelWorkersToFinish(pcxt);
WaitForParallelWorkersToExit(pcxt);
pcxt->nworkers_launched = 0;
- if (pcxt->any_message_received)
+ if (pcxt->known_started_workers)
{
- pfree(pcxt->any_message_received);
- pcxt->any_message_received = NULL;
+ pfree(pcxt->known_started_workers);
+ pcxt->known_started_workers = NULL;
+ pcxt->nknown_started_workers = 0;
}
}
@@ -542,17 +544,119 @@ LaunchParallelWorkers(ParallelContext *pcxt)
/*
* Now that nworkers_launched has taken its final value, we can initialize
- * any_message_received.
+ * known_started_workers.
*/
if (pcxt->nworkers_launched > 0)
- pcxt->any_message_received =
+ {
+ pcxt->known_started_workers =
palloc0(sizeof(bool) * pcxt->nworkers_launched);
+ pcxt->nknown_started_workers = 0;
+ }
/* Restore previous memory context. */
MemoryContextSwitchTo(oldcontext);
}
/*
+ * Wait for all workers to start or fail even if one of the workers fails to
+ * attach. This is a performance sensitive API as here we wait for all the
+ * launched workers to start. So caller should use this API cautiously.
+ * For example, the caller can arrange to do a lot of work in the leader which
+ * can be done without doing any sort of wait.
+ *
+ * This API must be used after the caller has launched parallel workers. This
+ * has some similarity with WaitForParallelWorkersToFinish such that if any
+ * worker fails to attach to the error queue, it will return an error.
+ * However, unlike WaitForParallelWorkersToFinish which waits for workers to
+ * finish computing, this just waits for workers to start and attach to the
+ * error queue.
+ */
+void
+WaitForParallelWorkersToAttach(ParallelContext *pcxt)
+{
+ int i;
+
+ /* Skip this if we have no launched workers. */
+ if (pcxt->nworkers_launched == 0)
+ return;
+
+ for (;;)
+ {
+ /*
+ * This will process any parallel messages that are pending and it may
+ * also throw an error propagated from a worker.
+ */
+ CHECK_FOR_INTERRUPTS();
+
+ for (i = 0; i < pcxt->nworkers_launched; ++i)
+ {
+ BgwHandleStatus status;
+ shm_mq *mq;
+ int rc;
+ pid_t pid;
+
+ if (pcxt->known_started_workers[i])
+ continue;
+
+ /*
+ * If error_mqh is NULL, then the worker has already exited
+ * cleanly.
+ */
+ if (pcxt->worker[i].error_mqh == NULL)
+ {
+ pcxt->known_started_workers[i] = true;
+ ++pcxt->nknown_started_workers;
+ continue;
+ }
+
+ status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
+ if (status == BGWH_STARTED)
+ {
+ /* Check if worker is attached to error queue? */
+ mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
+ if (shm_mq_get_sender(mq) != NULL)
+ {
+ pcxt->known_started_workers[i] = true;
+ ++pcxt->nknown_started_workers;
+ }
+ }
+ else if (status == BGWH_STOPPED)
+ {
+ CheckParallelWorkerAttachedToEQ(pcxt, i);
+
+ pcxt->known_started_workers[i] = true;
+ ++pcxt->nknown_started_workers;
+ }
+ else
+ {
+ /*
+ * Worker not yet started, so it is better to wait rather than
+ * busy looping. We don't need timeout because we get
+ * notified via latch about the worker attach.
+ */
+ rc = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_POSTMASTER_DEATH,
+ -1, WAIT_EVENT_BGWORKER_STARTUP);
+
+ /* emergency bailout if postmaster has died */
+ if (rc & WL_POSTMASTER_DEATH)
+ proc_exit(1);
+
+ if (rc & WL_LATCH_SET)
+ ResetLatch(MyLatch);
+ }
+ }
+
+ /* If all workers are known to have started, we're done. */
+ if (pcxt->nknown_started_workers >= pcxt->nworkers_launched)
+ {
+ Assert(pcxt->nknown_started_workers == pcxt->nworkers_launched);
+ break;
+ }
+ }
+}
+
+/*
* Wait for all workers to finish computing.
*
* Even if the parallel operation seems to have completed successfully, it's
@@ -589,7 +693,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
*/
if (pcxt->worker[i].error_mqh == NULL)
++nfinished;
- else if (pcxt->any_message_received[i])
+ else if (pcxt->known_started_workers[i])
{
anyone_alive = true;
break;
@@ -614,7 +718,6 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
for (i = 0; i < pcxt->nworkers_launched; ++i)
{
pid_t pid;
- shm_mq *mq;
/*
* If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
@@ -627,20 +730,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
&pid) != BGWH_STOPPED)
continue;
- /*
- * Check whether the worker ended up stopped without ever
- * attaching to the error queue. If so, the postmaster was
- * unable to fork the worker or it exited without initializing
- * properly. We must throw an error, since the caller may
- * have been expecting the worker to do some work before
- * exiting.
- */
- mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
- if (shm_mq_get_sender(mq) == NULL)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("parallel worker failed to initialize"),
- errhint("More details may be available in the server log.")));
+ CheckParallelWorkerAttachedToEQ(pcxt, i);
/*
* The worker is stopped, but is attached to the error queue.
@@ -789,6 +879,29 @@ DestroyParallelContext(ParallelContext *pcxt)
}
/*
+ * Check whether the worker ended up stopped without ever
+ * attaching to the error queue. If so, the postmaster was
+ * unable to fork the worker or it exited without initializing
+ * properly. We must throw an error, since the caller may
+ * have been expecting the worker to do some work before
+ * exiting.
+ */
+static bool
+CheckParallelWorkerAttachedToEQ(ParallelContext *pcxt, int i)
+{
+ shm_mq *mq;
+
+ mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
+ if (shm_mq_get_sender(mq) == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("parallel worker failed to initialize"),
+ errhint("More details may be available in the server log.")));
+
+ return true;
+}
+
+/*
* Are there any parallel contexts currently active?
*/
bool
@@ -909,8 +1022,12 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
{
char msgtype;
- if (pcxt->any_message_received != NULL)
- pcxt->any_message_received[i] = true;
+ if (pcxt->known_started_workers != NULL &&
+ !pcxt->known_started_workers[i])
+ {
+ pcxt->known_started_workers[i] = true;
+ pcxt->nknown_started_workers++;
+ }
msgtype = pq_getmsgbyte(msg);
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 32c2e32..0d4a7ed 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -43,7 +43,8 @@ typedef struct ParallelContext
void *private_memory;
shm_toc *toc;
ParallelWorkerInfo *worker;
- bool *any_message_received;
+ int nknown_started_workers;
+ bool *known_started_workers;
} ParallelContext;
typedef struct ParallelWorkerContext
@@ -62,6 +63,7 @@ extern ParallelContext *CreateParallelContext(const char *library_name, const ch
extern void InitializeParallelDSM(ParallelContext *pcxt);
extern void ReinitializeParallelDSM(ParallelContext *pcxt);
extern void LaunchParallelWorkers(ParallelContext *pcxt);
+extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt);
extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt);
extern void DestroyParallelContext(ParallelContext *pcxt);
extern bool ParallelContextActive(void);
On Wed, Jan 31, 2018 at 3:57 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:
* There might be some opportunity to share some of the new code with
the code recently committed to WaitForParallelWorkersToFinish(). For
one thing, the logic in this block could be refactored into a
dedicated function that is called by both
WaitForParallelWorkersToAttach() and WaitForParallelWorkersToFinish():

I had thought about this earlier but left it as-is since there was too
little common code; however, as you have pointed out, I have now
extracted the common code into a separate function.
I like it better the other way, so I've changed it back in the
attached version, which also works over the comments fairly heavily.
I think we should not touch anything related to Gather (merge) as they
don't need it for the purpose of correctness. However, we might want
to improve them by using this new API at a certain point if the need
arises. I guess we can use this API to detect failures early.
I added a comment in this version explaining why it works, so that we
don't forget (again). If we decide to change it in the future then we
can remove or update the comment.
Another thing I did was rename known_started_workers to
known_attached_workers, which I think is more precisely correct.
Please let me know your thoughts about this version. If it looks OK,
I'll commit it.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
Attachments:
wait-for-attach-rmh.patch (application/octet-stream)
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 54d9ea7be0..5b45b07e7c 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -437,10 +437,11 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
WaitForParallelWorkersToFinish(pcxt);
WaitForParallelWorkersToExit(pcxt);
pcxt->nworkers_launched = 0;
- if (pcxt->any_message_received)
+ if (pcxt->known_attached_workers)
{
- pfree(pcxt->any_message_received);
- pcxt->any_message_received = NULL;
+ pfree(pcxt->known_attached_workers);
+ pcxt->known_attached_workers = NULL;
+ pcxt->nknown_attached_workers = 0;
}
}
@@ -542,17 +543,148 @@ LaunchParallelWorkers(ParallelContext *pcxt)
/*
* Now that nworkers_launched has taken its final value, we can initialize
- * any_message_received.
+ * known_attached_workers.
*/
if (pcxt->nworkers_launched > 0)
- pcxt->any_message_received =
+ {
+ pcxt->known_attached_workers =
palloc0(sizeof(bool) * pcxt->nworkers_launched);
+ pcxt->nknown_attached_workers = 0;
+ }
/* Restore previous memory context. */
MemoryContextSwitchTo(oldcontext);
}
/*
+ * Wait for all workers to attach to their error queues, and throw an error if
+ * any worker fails to do this.
+ *
+ * Callers can assume that if this function returns successfully, then the
+ * number of workers given by pcxt->nworkers_launched have initialized and
+ * attached to their error queues. Whether or not these workers are guaranteed
+ * to still be running depends on what code the caller asked them to run;
+ * this function does not guarantee that they have not exited. However, it
+ * does guarantee that any workers which exited must have done so cleanly and
+ * after successfully performing the work with which they were tasked.
+ *
+ * If this function is not called, then some of the workers that were launched
+ * may not have been started due to a fork() failure, or may have exited during
+ * early startup prior to attaching to the error queue, so nworkers_launched
+ * cannot be viewed as completely reliable. It will never be less than the
+ * number of workers which actually started, but it might be more. Any workers
+ * that failed to start will still be discovered by
+ * WaitForParallelWorkersToFinish and an error will be thrown at that time,
+ * provided that function is eventually reached.
+ *
+ * In general, the leader process should do as much work as possible before
+ * calling this function. fork() failures and other early-startup failures
+ * are very uncommon, and having the leader sit idle when it could be doing
+ * useful work is undesirable. However, if the leader needs to wait for
+ * all of its workers or for a specific worker, it may want to call this
+ * function before doing so. If not, it must make some other provision for
+ * the failure-to-start case, lest it wait forever. On the other hand, a
+ * leader which never waits for a worker that might not be started yet, or
+ * at least never does so prior to WaitForParallelWorkersToFinish(), need not
+ * call this function at all.
+ */
+void
+WaitForParallelWorkersToAttach(ParallelContext *pcxt)
+{
+ int i;
+
+ /* Skip this if we have no launched workers. */
+ if (pcxt->nworkers_launched == 0)
+ return;
+
+ for (;;)
+ {
+ /*
+ * This will process any parallel messages that are pending and it may
+ * also throw an error propagated from a worker.
+ */
+ CHECK_FOR_INTERRUPTS();
+
+ for (i = 0; i < pcxt->nworkers_launched; ++i)
+ {
+ BgwHandleStatus status;
+ shm_mq *mq;
+ int rc;
+ pid_t pid;
+
+ if (pcxt->known_attached_workers[i])
+ continue;
+
+ /*
+ * If error_mqh is NULL, then the worker has already exited
+ * cleanly.
+ */
+ if (pcxt->worker[i].error_mqh == NULL)
+ {
+ pcxt->known_attached_workers[i] = true;
+ ++pcxt->nknown_attached_workers;
+ continue;
+ }
+
+ status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
+ if (status == BGWH_STARTED)
+ {
+ /* Has the worker attached to the error queue? */
+ mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
+ if (shm_mq_get_sender(mq) != NULL)
+ {
+ /* Yes, so it is known to be attached. */
+ pcxt->known_attached_workers[i] = true;
+ ++pcxt->nknown_attached_workers;
+ }
+ }
+ else if (status == BGWH_STOPPED)
+ {
+ /*
+ * If the worker stopped without attaching to the error queue,
+ * throw an error.
+ */
+ mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
+ if (shm_mq_get_sender(mq) == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("parallel worker failed to initialize"),
+ errhint("More details may be available in the server log.")));
+
+ pcxt->known_attached_workers[i] = true;
+ ++pcxt->nknown_attached_workers;
+ }
+ else
+ {
+ /*
+ * Worker not yet started, so we must wait. The postmaster
+ * will notify us if the worker's state changes. Our latch
+ * might also get set for some other reason, but if so we'll
+ * just end up waiting for the same worker again.
+ */
+ rc = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_POSTMASTER_DEATH,
+ -1, WAIT_EVENT_BGWORKER_STARTUP);
+
+ /* emergency bailout if postmaster has died */
+ if (rc & WL_POSTMASTER_DEATH)
+ proc_exit(1);
+
+ if (rc & WL_LATCH_SET)
+ ResetLatch(MyLatch);
+ }
+ }
+
+ /* If all workers are known to have started, we're done. */
+ if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
+ {
+ Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
+ break;
+ }
+ }
+}
+
+/*
* Wait for all workers to finish computing.
*
* Even if the parallel operation seems to have completed successfully, it's
@@ -589,7 +721,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
*/
if (pcxt->worker[i].error_mqh == NULL)
++nfinished;
- else if (pcxt->any_message_received[i])
+ else if (pcxt->known_attached_workers[i])
{
anyone_alive = true;
break;
@@ -909,8 +1041,12 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
{
char msgtype;
- if (pcxt->any_message_received != NULL)
- pcxt->any_message_received[i] = true;
+ if (pcxt->known_attached_workers != NULL &&
+ !pcxt->known_attached_workers[i])
+ {
+ pcxt->known_attached_workers[i] = true;
+ pcxt->nknown_attached_workers++;
+ }
msgtype = pq_getmsgbyte(msg);
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 89266b5371..58eadd45b8 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -312,7 +312,14 @@ gather_readnext(GatherState *gatherstate)
/* Check for async events, particularly messages from workers. */
CHECK_FOR_INTERRUPTS();
- /* Attempt to read a tuple, but don't block if none is available. */
+ /*
+ * Attempt to read a tuple, but don't block if none is available.
+ *
+ * Note that TupleQueueReaderNext will just return NULL for a worker
+ * which fails to initialize. We'll treat that worker as having
+ * produced no tuples; WaitForParallelWorkersToFinish will error out
+ * when we get there.
+ */
Assert(gatherstate->nextreader < gatherstate->nreaders);
reader = gatherstate->reader[gatherstate->nextreader];
tup = TupleQueueReaderNext(reader, true, &readerdone);
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index a3e34c6980..6858c91e8c 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -710,7 +710,14 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
/* Check for async events, particularly messages from workers. */
CHECK_FOR_INTERRUPTS();
- /* Attempt to read a tuple. */
+ /*
+ * Attempt to read a tuple.
+ *
+ * Note that TupleQueueReaderNext will just return NULL for a worker which
+ * fails to initialize. We'll treat that worker as having produced no
+ * tuples; WaitForParallelWorkersToFinish will error out when we get
+ * there.
+ */
reader = gm_state->reader[nreader - 1];
tup = TupleQueueReaderNext(reader, nowait, done);
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 32c2e32bea..d0c218b185 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -43,7 +43,8 @@ typedef struct ParallelContext
void *private_memory;
shm_toc *toc;
ParallelWorkerInfo *worker;
- bool *any_message_received;
+ int nknown_attached_workers;
+ bool *known_attached_workers;
} ParallelContext;
typedef struct ParallelWorkerContext
@@ -62,6 +63,7 @@ extern ParallelContext *CreateParallelContext(const char *library_name, const ch
extern void InitializeParallelDSM(ParallelContext *pcxt);
extern void ReinitializeParallelDSM(ParallelContext *pcxt);
extern void LaunchParallelWorkers(ParallelContext *pcxt);
+extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt);
extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt);
extern void DestroyParallelContext(ParallelContext *pcxt);
extern bool ParallelContextActive(void);
On Wed, Jan 31, 2018 at 9:53 PM, Robert Haas <robertmhaas@gmail.com> wrote:
On Wed, Jan 31, 2018 at 3:57 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:
* There might be some opportunity to share some of the new code with
the code recently committed to WaitForParallelWorkersToFinish(). For
one thing, the logic in this block could be refactored into a
dedicated function that is called by both
WaitForParallelWorkersToAttach() and WaitForParallelWorkersToFinish():

I had thought about this earlier but left it as-is since there was too
little common code; however, as you have pointed out, I have now
extracted the common code into a separate function.

I like it better the other way, so I've changed it back in the
attached version,
Okay, no problem.
which also works over the comments fairly heavily.
+ * However, if the leader needs to wait for
+ * all of its workers or for a specific worker, it may want to call this
+ * function before doing so.
I think suggesting to use this API to wait "for a specific worker"
doesn't seem like a good idea as it doesn't have any such provision.
Other than that the patch looks good.
--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Wed, Jan 31, 2018 at 10:08 PM, Amit Kapila <amit.kapila16@gmail.com> wrote:
I think suggesting to use this API to wait "for a specific worker"
doesn't seem like a good idea as it doesn't have any such provision.
I see your point, but in the absence of a more specific API it could
be used that way, and it wouldn't be unreasonable. Just might wait a
little longer than absolutely necessary.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Thu, Feb 1, 2018 at 9:09 PM, Robert Haas <robertmhaas@gmail.com> wrote:
On Wed, Jan 31, 2018 at 10:08 PM, Amit Kapila <amit.kapila16@gmail.com> wrote:
I think suggesting to use this API to wait "for a specific worker"
doesn't seem like a good idea as it doesn't have any such provision.

I see your point, but in the absence of a more specific API it could
be used that way, and it wouldn't be unreasonable. Just might wait a
little longer than absolutely necessary.
Fair enough, you can proceed with the patch.
--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Thu, Feb 1, 2018 at 9:48 PM, Amit Kapila <amit.kapila16@gmail.com> wrote:
On Thu, Feb 1, 2018 at 9:09 PM, Robert Haas <robertmhaas@gmail.com> wrote:
On Wed, Jan 31, 2018 at 10:08 PM, Amit Kapila <amit.kapila16@gmail.com> wrote:
I think suggesting to use this API to wait "for a specific worker"
doesn't seem like a good idea as it doesn't have any such provision.

I see your point, but in the absence of a more specific API it could
be used that way, and it wouldn't be unreasonable. Just might wait a
little longer than absolutely necessary.

Fair enough, you can proceed with the patch.
Committed. Now, on to the main event!
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company