diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 54d9ea7..bdece27 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -133,6 +133,7 @@ static const struct
 };
 
 /* Private functions. */
+static void CheckParallelWorkerAttachedToEQ(ParallelContext *pcxt, int i);
 static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
 static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
 static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
@@ -437,10 +438,11 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
 		WaitForParallelWorkersToFinish(pcxt);
 		WaitForParallelWorkersToExit(pcxt);
 		pcxt->nworkers_launched = 0;
-		if (pcxt->any_message_received)
+		if (pcxt->known_started_workers)
 		{
-			pfree(pcxt->any_message_received);
-			pcxt->any_message_received = NULL;
+			pfree(pcxt->known_started_workers);
+			pcxt->known_started_workers = NULL;
+			pcxt->nknown_started_workers = 0;
 		}
 	}
 
@@ -542,17 +544,128 @@ LaunchParallelWorkers(ParallelContext *pcxt)
 
 	/*
 	 * Now that nworkers_launched has taken its final value, we can initialize
-	 * any_message_received.
+	 * known_started_workers.
 	 */
 	if (pcxt->nworkers_launched > 0)
-		pcxt->any_message_received =
+	{
+		pcxt->known_started_workers =
 			palloc0(sizeof(bool) * pcxt->nworkers_launched);
+		pcxt->nknown_started_workers = 0;
+	}
 
 	/* Restore previous memory context. */
 	MemoryContextSwitchTo(oldcontext);
 }
 
 /*
+ * Wait for all launched workers to attach to their error queues, and throw
+ * an error if any worker fails to do so.
+ *
+ * This must be called after LaunchParallelWorkers().  When it returns
+ * successfully, each of the pcxt->nworkers_launched workers has either
+ * attached to its error queue or already exited cleanly.  A worker that
+ * stopped without ever attaching (because the postmaster was unable to fork
+ * it, or it exited without initializing properly) causes an ERROR here, just
+ * as WaitForParallelWorkersToFinish would eventually report.  Unlike that
+ * function, this one does not wait for the workers to finish computing.
+ *
+ * The leader sits idle while waiting here, so callers should arrange to do
+ * as much useful work as possible before calling this.
+ */
+void
+WaitForParallelWorkersToAttach(ParallelContext *pcxt)
+{
+	int			i;
+
+	/* Skip this if we have no launched workers. */
+	if (pcxt->nworkers_launched == 0)
+		return;
+
+	for (;;)
+	{
+		int			rc;
+
+		/*
+		 * This will process any parallel messages that are pending and it may
+		 * also throw an error propagated from a worker.
+		 */
+		CHECK_FOR_INTERRUPTS();
+
+		for (i = 0; i < pcxt->nworkers_launched; ++i)
+		{
+			BgwHandleStatus status;
+			shm_mq	   *mq;
+			pid_t		pid;
+
+			if (pcxt->known_started_workers[i])
+				continue;
+
+			/*
+			 * If error_mqh is NULL, then the worker has already exited
+			 * cleanly.
+			 */
+			if (pcxt->worker[i].error_mqh == NULL)
+			{
+				pcxt->known_started_workers[i] = true;
+				++pcxt->nknown_started_workers;
+				continue;
+			}
+
+			/*
+			 * A started worker counts once it has attached to its error
+			 * queue; a stopped one must have attached before exiting.  A
+			 * worker that has not started yet is rechecked after the wait
+			 * below.
+			 */
+			status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
+			if (status == BGWH_STARTED)
+			{
+				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
+				if (shm_mq_get_sender(mq) != NULL)
+				{
+					pcxt->known_started_workers[i] = true;
+					++pcxt->nknown_started_workers;
+				}
+			}
+			else if (status == BGWH_STOPPED)
+			{
+				/* Throws ERROR if the worker never attached. */
+				CheckParallelWorkerAttachedToEQ(pcxt, i);
+
+				pcxt->known_started_workers[i] = true;
+				++pcxt->nknown_started_workers;
+			}
+		}
+
+		/* If all workers are known to have started, we're done. */
+		if (pcxt->nknown_started_workers >= pcxt->nworkers_launched)
+		{
+			Assert(pcxt->nknown_started_workers == pcxt->nworkers_launched);
+			break;
+		}
+
+		/*
+		 * Some workers have not started yet, or have started but not yet
+		 * attached to their error queues.  Sleep rather than busy-loop: the
+		 * postmaster signals us when a worker starts or stops, and a worker
+		 * attaching to its error queue sets our latch as the queue's
+		 * receiver.  A wakeup for any other reason merely costs one more
+		 * pass over the workers.
+		 */
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_POSTMASTER_DEATH,
+					   -1, WAIT_EVENT_BGWORKER_STARTUP);
+
+		/* emergency bailout if postmaster has died */
+		if (rc & WL_POSTMASTER_DEATH)
+			proc_exit(1);
+
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+	}
+}
+
+/*
  * Wait for all workers to finish computing.
  *
  * Even if the parallel operation seems to have completed successfully, it's
@@ -589,7 +702,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
 			 */
 			if (pcxt->worker[i].error_mqh == NULL)
 				++nfinished;
-			else if (pcxt->any_message_received[i])
+			else if (pcxt->known_started_workers[i])
 			{
 				anyone_alive = true;
 				break;
@@ -614,7 +727,6 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
 			for (i = 0; i < pcxt->nworkers_launched; ++i)
 			{
 				pid_t		pid;
-				shm_mq	   *mq;
 
 				/*
 				 * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
@@ -627,20 +739,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
 										   &pid) != BGWH_STOPPED)
 					continue;
 
-				/*
-				 * Check whether the worker ended up stopped without ever
-				 * attaching to the error queue.  If so, the postmaster was
-				 * unable to fork the worker or it exited without initializing
-				 * properly.  We must throw an error, since the caller may
-				 * have been expecting the worker to do some work before
-				 * exiting.
-				 */
-				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
-				if (shm_mq_get_sender(mq) == NULL)
-					ereport(ERROR,
-							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-							 errmsg("parallel worker failed to initialize"),
-							 errhint("More details may be available in the server log.")));
+				CheckParallelWorkerAttachedToEQ(pcxt, i);
 
 				/*
 				 * The worker is stopped, but is attached to the error queue.
@@ -789,6 +888,28 @@ DestroyParallelContext(ParallelContext *pcxt)
 }
 
 /*
+ * Throw an ERROR if parallel worker i never attached to its error queue.
+ *
+ * Callers use this once the worker is known to have stopped: a stopped
+ * worker that never attached means the postmaster was unable to fork it or
+ * it exited without initializing properly.  We must throw an error, since
+ * the caller may have been expecting the worker to do some work before
+ * exiting.
+ */
+static void
+CheckParallelWorkerAttachedToEQ(ParallelContext *pcxt, int i)
+{
+	shm_mq	   *mq;
+
+	mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
+	if (shm_mq_get_sender(mq) == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("parallel worker failed to initialize"),
+				 errhint("More details may be available in the server log.")));
+}
+
+/*
  * Are there any parallel contexts currently active?
  */
 bool
@@ -909,8 +1030,12 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
 {
 	char		msgtype;
 
-	if (pcxt->any_message_received != NULL)
-		pcxt->any_message_received[i] = true;
+	if (pcxt->known_started_workers != NULL &&
+		!pcxt->known_started_workers[i])
+	{
+		pcxt->known_started_workers[i] = true;
+		pcxt->nknown_started_workers++;
+	}
 
 	msgtype = pq_getmsgbyte(msg);
 
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 32c2e32..0d4a7ed 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -43,7 +43,8 @@ typedef struct ParallelContext
 	void	   *private_memory;
 	shm_toc    *toc;
 	ParallelWorkerInfo *worker;
-	bool	   *any_message_received;
+	int			nknown_started_workers;
+	bool	   *known_started_workers;
 } ParallelContext;
 
 typedef struct ParallelWorkerContext
@@ -62,6 +63,7 @@ extern ParallelContext *CreateParallelContext(const char *library_name, const ch
 extern void InitializeParallelDSM(ParallelContext *pcxt);
 extern void ReinitializeParallelDSM(ParallelContext *pcxt);
 extern void LaunchParallelWorkers(ParallelContext *pcxt);
+extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt);
 extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt);
 extern void DestroyParallelContext(ParallelContext *pcxt);
 extern bool ParallelContextActive(void);