doc/src/sgml/custom-scan.sgml | 12 ++++++++++++
doc/src/sgml/fdwhandler.sgml | 13 +++++++++++++
src/backend/executor/execParallel.c | 32 ++++++++++++++++++++++++++------
src/backend/executor/nodeCustom.c | 9 +++++++++
src/backend/executor/nodeForeignscan.c | 16 ++++++++++++++++
src/include/executor/nodeCustom.h | 2 ++
src/include/executor/nodeForeignscan.h | 2 ++
src/include/foreign/fdwapi.h | 3 +++
src/include/nodes/extensible.h | 2 ++
9 files changed, 85 insertions(+), 6 deletions(-)
diff --git a/doc/src/sgml/custom-scan.sgml b/doc/src/sgml/custom-scan.sgml
index 1ca9247..4bd20dd 100644
--- a/doc/src/sgml/custom-scan.sgml
+++ b/doc/src/sgml/custom-scan.sgml
@@ -340,6 +340,18 @@ void (*InitializeWorkerCustomScan) (CustomScanState *node,
+void (*ParallelFinishCustomScan) (CustomScanState *node,
+ ParallelContext *pcxt);
+
+ Retrieve the custom state after all the workers have finished but
+ before the DSM segment is released.
+ This callback is optional, and need only be supplied if this custom
+ path wants to reference the DSM segment in the master process's context
+ after the workers have exited.
+
+
+
+
void (*ExplainCustomScan) (CustomScanState *node,
List *ancestors,
ExplainState *es);
diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml
index 0c1db07..9ae006a 100644
--- a/doc/src/sgml/fdwhandler.sgml
+++ b/doc/src/sgml/fdwhandler.sgml
@@ -1254,6 +1254,19 @@ InitializeWorkerForeignScan(ForeignScanState *node, shm_toc *toc,
This callback is optional, and needs only be supplied if this
custom path supports parallel execution.
+
+
+
+void
+ParallelFinishForeignScan(ForeignScanState *node, ParallelContext *pcxt);
+
+ Retrieve the FDW's state after all the workers have finished but
+ before the DSM segment is released.
+ This callback is optional, and need only be supplied if this foreign
+ scan wants to reference the DSM segment in the master process's context
+ after the workers have exited.
+
+
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 98d4f1e..fee937d 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -106,7 +106,7 @@ static bool ExecParallelInitializeDSM(PlanState *node,
static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
bool reinitialize);
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
- SharedExecutorInstrumentation *instrumentation);
+ ParallelExecutorInfo *pei);
/* Helper functions that run in the parallel worker. */
static void ParallelQueryMain(dsm_segment *seg, shm_toc *toc);
@@ -533,8 +533,9 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
*/
static bool
ExecParallelRetrieveInstrumentation(PlanState *planstate,
- SharedExecutorInstrumentation *instrumentation)
+ ParallelExecutorInfo *pei)
{
+ SharedExecutorInstrumentation *instrumentation = pei->instrumentation;
Instrumentation *instrument;
int i;
int n;
@@ -571,13 +572,33 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
planstate->worker_instrument->num_workers = instrumentation->num_workers;
memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
+ /*
+ * Give the node a chance to collect node-specific run-time statistics
+ * beyond Instrumentation. Right now, only ForeignScan and CustomScan
+ * may have their own private fields in the DSM area. They can use this
+ * hook to retrieve the run-time statistics accumulated in the DSM area
+ * before it is released.
+ */
+ switch (nodeTag(planstate))
+ {
+ case T_ForeignScanState:
+ ExecForeignScanParallelFinish((ForeignScanState *) planstate,
+ pei->pcxt);
+ break;
+ case T_CustomScanState:
+ ExecCustomScanParallelFinish((CustomScanState *) planstate,
+ pei->pcxt);
+ break;
+ default:
+ break;
+ }
return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
- instrumentation);
+ pei);
}
/*
* Finish parallel execution. We wait for parallel workers to finish, and
- * accumulate their buffer usage and instrumentation.
+ * accumulate their buffer usage, instrumentation, and node-specific statistics.
*/
void
ExecParallelFinish(ParallelExecutorInfo *pei)
@@ -596,8 +617,7 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
/* Finally, accumulate instrumentation, if any. */
if (pei->instrumentation)
- ExecParallelRetrieveInstrumentation(pei->planstate,
- pei->instrumentation);
+ ExecParallelRetrieveInstrumentation(pei->planstate, pei);
pei->finished = true;
}
diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c
index 16343a5..d0bc015 100644
--- a/src/backend/executor/nodeCustom.c
+++ b/src/backend/executor/nodeCustom.c
@@ -202,3 +202,12 @@ ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc)
methods->InitializeWorkerCustomScan(node, toc, coordinate);
}
}
+
+void
+ExecCustomScanParallelFinish(CustomScanState *node, ParallelContext *pcxt)
+{
+ const CustomExecMethods *methods = node->methods;
+
+ if (methods->ParallelFinishCustomScan)
+ methods->ParallelFinishCustomScan(node, pcxt);
+}
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 86a77e3..c1dcf11 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -353,3 +353,19 @@ ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc)
fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate);
}
}
+
+/* ----------------------------------------------------------------
+ * ExecForeignScanParallelFinish
+ *
+ * Retrieve FDW's own run-time statistics from the parallel coordination
+ * information prior to its release.
+ * ----------------------------------------------------------------
+ */
+void
+ExecForeignScanParallelFinish(ForeignScanState *node, ParallelContext *pcxt)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ if (fdwroutine->ParallelFinishForeignScan)
+ fdwroutine->ParallelFinishForeignScan(node, pcxt);
+}
diff --git a/src/include/executor/nodeCustom.h b/src/include/executor/nodeCustom.h
index 19d5d04..58eb808 100644
--- a/src/include/executor/nodeCustom.h
+++ b/src/include/executor/nodeCustom.h
@@ -37,5 +37,7 @@ extern void ExecCustomScanInitializeDSM(CustomScanState *node,
ParallelContext *pcxt);
extern void ExecCustomScanInitializeWorker(CustomScanState *node,
shm_toc *toc);
+extern void ExecCustomScanParallelFinish(CustomScanState *node,
+ ParallelContext *pcxt);
#endif /* NODECUSTOM_H */
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index f0e942a..65d09bf 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -28,5 +28,7 @@ extern void ExecForeignScanInitializeDSM(ForeignScanState *node,
ParallelContext *pcxt);
extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
shm_toc *toc);
+extern void ExecForeignScanParallelFinish(ForeignScanState *node,
+ ParallelContext *pcxt);
#endif /* NODEFOREIGNSCAN_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 523d415..f32c1f0 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -151,6 +151,8 @@ typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node,
typedef void (*InitializeWorkerForeignScan_function) (ForeignScanState *node,
shm_toc *toc,
void *coordinate);
+typedef void (*ParallelFinishForeignScan_function) (ForeignScanState *node,
+ ParallelContext *pcxt);
typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root,
RelOptInfo *rel,
RangeTblEntry *rte);
@@ -224,6 +226,7 @@ typedef struct FdwRoutine
EstimateDSMForeignScan_function EstimateDSMForeignScan;
InitializeDSMForeignScan_function InitializeDSMForeignScan;
InitializeWorkerForeignScan_function InitializeWorkerForeignScan;
+ ParallelFinishForeignScan_function ParallelFinishForeignScan;
} FdwRoutine;
diff --git a/src/include/nodes/extensible.h b/src/include/nodes/extensible.h
index 7e860b0..73dc89b 100644
--- a/src/include/nodes/extensible.h
+++ b/src/include/nodes/extensible.h
@@ -139,6 +139,8 @@ typedef struct CustomExecMethods
void (*InitializeWorkerCustomScan) (CustomScanState *node,
shm_toc *toc,
void *coordinate);
+ void (*ParallelFinishCustomScan) (CustomScanState *node,
+ ParallelContext *pcxt);
/* Optional: print additional information in EXPLAIN */
void (*ExplainCustomScan) (CustomScanState *node,