diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 54d9ea7..8920134 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -553,6 +553,128 @@ LaunchParallelWorkers(ParallelContext *pcxt)
 }
 
 /*
+ * Wait for all launched workers to attach to their error queues, and throw an
+ * error if any worker stops without ever having attached.
+ *
+ * A worker is accounted for once it is seen attached to its error queue
+ * (whether still running or already stopped), or once its error_mqh is NULL,
+ * meaning it has already exited cleanly.  Each pass calls
+ * CHECK_FOR_INTERRUPTS(), so pending parallel messages are processed here and
+ * an error propagated from a worker may be thrown from this function.
+ */
+void
+WaitForParallelWorkersToAttach(ParallelContext *pcxt)
+{
+	int			i;
+	int			nknown_attached_workers = 0;
+	bool	   *known_attached_workers;
+
+	/* Nothing to wait for if no workers were launched. */
+	if (pcxt->nworkers_launched == 0)
+		return;
+
+	known_attached_workers =
+		(bool *) palloc0(pcxt->nworkers_launched * sizeof(bool));
+
+	for (;;)
+	{
+		int			rc;
+
+		/*
+		 * This will process any parallel messages that are pending and it may
+		 * also throw an error propagated from a worker.
+		 */
+		CHECK_FOR_INTERRUPTS();
+
+		for (i = 0; i < pcxt->nworkers_launched; ++i)
+		{
+			BgwHandleStatus status;
+			shm_mq	   *mq;
+			pid_t		pid;
+
+			if (known_attached_workers[i])
+				continue;
+
+			/*
+			 * If error_mqh is NULL, then the worker has already exited
+			 * cleanly.
+			 */
+			if (pcxt->worker[i].error_mqh == NULL)
+			{
+				known_attached_workers[i] = true;
+				++nknown_attached_workers;
+				continue;
+			}
+
+			status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
+			if (status == BGWH_STARTED)
+			{
+				/* Has the worker attached to its error queue yet? */
+				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
+				if (shm_mq_get_sender(mq) != NULL)
+				{
+					known_attached_workers[i] = true;
+					++nknown_attached_workers;
+				}
+			}
+			else if (status == BGWH_STOPPED)
+			{
+				/*
+				 * Check whether the worker ended up stopped without ever
+				 * attaching to the error queue.  If so, the postmaster
+				 * was unable to fork the worker or it exited without
+				 * initializing properly.  We must throw an error, since
+				 * the caller may have been expecting the worker to do
+				 * some work before exiting.
+				 */
+				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
+				if (shm_mq_get_sender(mq) == NULL)
+					ereport(ERROR,
+							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							 errmsg("parallel worker failed to initialize"),
+							 errhint("More details may be available in the server log.")));
+
+				known_attached_workers[i] = true;
+				++nknown_attached_workers;
+			}
+
+			/*
+			 * Otherwise the worker has not started yet, or has started but
+			 * not yet attached; it is re-checked on the next pass.
+			 */
+		}
+
+		/* If all workers are known to have attached, we're done. */
+		if (nknown_attached_workers >= pcxt->nworkers_launched)
+		{
+			Assert(nknown_attached_workers == pcxt->nworkers_launched);
+			break;
+		}
+
+		/*
+		 * Some worker is still pending.  Sleep once per pass -- rather than
+		 * once per pending worker, and rather than spinning while a worker
+		 * has started but not yet attached.  The short timeout makes us
+		 * re-check even if nothing sets our latch; we don't expect to wait
+		 * long.
+		 */
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+					   10L, WAIT_EVENT_BGWORKER_STARTUP);
+
+		/* emergency bailout if postmaster has died */
+		if (rc & WL_POSTMASTER_DEATH)
+			proc_exit(1);
+
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+	}
+
+	/* be tidy */
+	pfree(known_attached_workers);
+}
+
+/*
  * Wait for all workers to finish computing.
  *
  * Even if the parallel operation seems to have completed successfully, it's
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 32c2e32..98b5f76 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -62,6 +62,7 @@ extern ParallelContext *CreateParallelContext(const char *library_name, const ch
 extern void InitializeParallelDSM(ParallelContext *pcxt);
 extern void ReinitializeParallelDSM(ParallelContext *pcxt);
 extern void LaunchParallelWorkers(ParallelContext *pcxt);
+extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt);
 extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt);
 extern void DestroyParallelContext(ParallelContext *pcxt);
 extern bool ParallelContextActive(void);