doc/src/sgml/custom-scan.sgml | 12 ++++++++++++ doc/src/sgml/fdwhandler.sgml | 13 +++++++++++++ src/backend/executor/execParallel.c | 32 ++++++++++++++++++++++++++------ src/backend/executor/nodeCustom.c | 9 +++++++++ src/backend/executor/nodeForeignscan.c | 16 ++++++++++++++++ src/include/executor/nodeCustom.h | 2 ++ src/include/executor/nodeForeignscan.h | 2 ++ src/include/foreign/fdwapi.h | 3 +++ src/include/nodes/extensible.h | 2 ++ 9 files changed, 85 insertions(+), 6 deletions(-) diff --git a/doc/src/sgml/custom-scan.sgml b/doc/src/sgml/custom-scan.sgml index 1ca9247..4bd20dd 100644 --- a/doc/src/sgml/custom-scan.sgml +++ b/doc/src/sgml/custom-scan.sgml @@ -340,6 +340,18 @@ void (*InitializeWorkerCustomScan) (CustomScanState *node, +void (*ParallelFinishCustomScan) (CustomScanState *node, + ParallelContext *pcxt); + + Retrieve the custom state after all the workers have finished but prior + to the release of the DSM segment. + This callback is optional, and needs only be supplied if this custom + path wants to reference the DSM segment in the master process's context + after the workers have exited. + + + + void (*ExplainCustomScan) (CustomScanState *node, List *ancestors, ExplainState *es); diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml index 0c1db07..9ae006a 100644 --- a/doc/src/sgml/fdwhandler.sgml +++ b/doc/src/sgml/fdwhandler.sgml @@ -1254,6 +1254,19 @@ InitializeWorkerForeignScan(ForeignScanState *node, shm_toc *toc, This callback is optional, and needs only be supplied if this custom path supports parallel execution. + + + +void +ParallelFinishForeignScan(ForeignScanState *node, ParallelContext *pcxt); + + Retrieve the custom state after all the workers have finished but prior + to the release of the DSM segment. + This callback is optional, and needs only be supplied if this custom + path wants to reference the DSM segment in the master process's context + after the workers have exited.
+ + diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 98d4f1e..fee937d 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -106,7 +106,7 @@ static bool ExecParallelInitializeDSM(PlanState *node, static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize); static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, - SharedExecutorInstrumentation *instrumentation); + ParallelExecutorInfo *pei); /* Helper functions that run in the parallel worker. */ static void ParallelQueryMain(dsm_segment *seg, shm_toc *toc); @@ -533,8 +533,9 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) */ static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, - SharedExecutorInstrumentation *instrumentation) + ParallelExecutorInfo *pei) { + SharedExecutorInstrumentation *instrumentation = pei->instrumentation; Instrumentation *instrument; int i; int n; @@ -571,13 +572,33 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate, planstate->worker_instrument->num_workers = instrumentation->num_workers; memcpy(&planstate->worker_instrument->instrument, instrument, ibytes); + /* + * Allow nodes to save node-specific run-time statistics beyond + * Instrumentation. Right now, only ForeignScan and CustomScan may + * have their own private fields in the DSM area. They can use this hook + * to retrieve the run-time statistics accumulated in the DSM area, + * prior to its release. + */ + switch (nodeTag(planstate)) + { + case T_ForeignScanState: + ExecForeignScanParallelFinish((ForeignScanState *) planstate, + pei->pcxt); + break; + case T_CustomScanState: + ExecCustomScanParallelFinish((CustomScanState *) planstate, + pei->pcxt); + break; + default: + break; + } return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation, - instrumentation); + pei); } /* * Finish parallel execution.
We wait for parallel workers to finish, and - * accumulate their buffer usage and instrumentation. + * accumulate their buffer usage, instrumentation, and node-specific statistics. */ void ExecParallelFinish(ParallelExecutorInfo *pei) @@ -596,8 +617,7 @@ ExecParallelFinish(ParallelExecutorInfo *pei) /* Finally, accumulate instrumentation, if any. */ if (pei->instrumentation) - ExecParallelRetrieveInstrumentation(pei->planstate, - pei->instrumentation); + ExecParallelRetrieveInstrumentation(pei->planstate, pei); pei->finished = true; } diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c index 16343a5..d0bc015 100644 --- a/src/backend/executor/nodeCustom.c +++ b/src/backend/executor/nodeCustom.c @@ -202,3 +202,12 @@ ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc) methods->InitializeWorkerCustomScan(node, toc, coordinate); } } + +void +ExecCustomScanParallelFinish(CustomScanState *node, ParallelContext *pcxt) +{ + const CustomExecMethods *methods = node->methods; + + if (methods->ParallelFinishCustomScan) + methods->ParallelFinishCustomScan(node, pcxt); +} diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index 86a77e3..c1dcf11 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -353,3 +353,19 @@ ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc) fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate); } } + +/* ---------------------------------------------------------------- + * ExecForeignScanParallelFinish + * + * Retrieve the FDW's own run-time statistics from the parallel coordination + * information prior to its release.
+ * ---------------------------------------------------------------- + */ +void +ExecForeignScanParallelFinish(ForeignScanState *node, ParallelContext *pcxt) +{ + FdwRoutine *fdwroutine = node->fdwroutine; + + if (fdwroutine->ParallelFinishForeignScan) + fdwroutine->ParallelFinishForeignScan(node, pcxt); +} diff --git a/src/include/executor/nodeCustom.h b/src/include/executor/nodeCustom.h index 19d5d04..58eb808 100644 --- a/src/include/executor/nodeCustom.h +++ b/src/include/executor/nodeCustom.h @@ -37,5 +37,7 @@ extern void ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt); extern void ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc); +extern void ExecCustomScanParallelFinish(CustomScanState *node, + ParallelContext *pcxt); #endif /* NODECUSTOM_H */ diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h index f0e942a..65d09bf 100644 --- a/src/include/executor/nodeForeignscan.h +++ b/src/include/executor/nodeForeignscan.h @@ -28,5 +28,7 @@ extern void ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt); extern void ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc); +extern void ExecForeignScanParallelFinish(ForeignScanState *node, + ParallelContext *pcxt); #endif /* NODEFOREIGNSCAN_H */ diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 523d415..f32c1f0 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -151,6 +151,8 @@ typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node, typedef void (*InitializeWorkerForeignScan_function) (ForeignScanState *node, shm_toc *toc, void *coordinate); +typedef void (*ParallelFinishForeignScan_function) (ForeignScanState *node, + ParallelContext *pcxt); typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte); @@ -224,6 +226,7 @@ typedef struct FdwRoutine 
EstimateDSMForeignScan_function EstimateDSMForeignScan; InitializeDSMForeignScan_function InitializeDSMForeignScan; InitializeWorkerForeignScan_function InitializeWorkerForeignScan; + ParallelFinishForeignScan_function ParallelFinishForeignScan; } FdwRoutine; diff --git a/src/include/nodes/extensible.h b/src/include/nodes/extensible.h index 7e860b0..73dc89b 100644 --- a/src/include/nodes/extensible.h +++ b/src/include/nodes/extensible.h @@ -139,6 +139,8 @@ typedef struct CustomExecMethods void (*InitializeWorkerCustomScan) (CustomScanState *node, shm_toc *toc, void *coordinate); + void (*ParallelFinishCustomScan) (CustomScanState *node, + ParallelContext *pcxt); /* Optional: print additional information in EXPLAIN */ void (*ExplainCustomScan) (CustomScanState *node,