[PoC] Asynchronous execution again (which is not parallel)
Hello, now that parallel scan works, I'd like to repropose
the 'asynchronous execution' or 'early execution'.
In the previous proposal, I had only foreign scan as a workable
example, but now I can use parallel execution instead, which makes
this distinct from parallel execution itself.
I could have put more work into this before proposing it, but I'd like to
show it at this point in order to judge whether it deserves
further work.
==== Overview of asynchronous execution
"Asynchronous execution" is a feature to start the substantial work
of nodes before doing Exec*. This can reduce total startup time
by overlapping the startup time of multiple execution nodes. It is
especially effective for combinations of joins or appends with
multiple children that take a long time to start up.
This patch does that by inserting another phase "Start*" between
ExecInit* and Exec* to launch parallel processing, including
bgworkers and FDWs, before requesting the very first tuple of the
result.
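To illustrate why an extra start phase can help, here is a minimal sketch. It is a toy model, not code from the patches: ToyNode, toy_init, toy_start, toy_exec_first and first_tuple_time are hypothetical names, and each child's startup work is reduced to a fixed cost in abstract time units that keeps running in the background once launched.

```c
#include <assert.h>

/* Hypothetical toy node; NOT PostgreSQL's PlanState. */
typedef struct ToyNode
{
    int startup_cost;   /* time units between launch and first tuple */
    int started_at;     /* clock value at launch, or -1 if not launched */
} ToyNode;

/* "Init" phase: set up the node but do no real work. */
static void
toy_init(ToyNode *node, int startup_cost)
{
    node->startup_cost = startup_cost;
    node->started_at = -1;
}

/* "Start" phase: launch the startup work and return without waiting. */
static void
toy_start(ToyNode *node, int now)
{
    if (node->started_at < 0)
        node->started_at = now;
}

/*
 * "Exec" phase: wait for the first tuple and return the clock value at
 * which it is available.  A node that was never started is launched here.
 */
static int
toy_exec_first(ToyNode *node, int now)
{
    int ready;

    toy_start(node, now);
    ready = node->started_at + node->startup_cost;
    return (ready > now) ? ready : now;
}

#define TOY_MAX_CHILDREN 8

/*
 * Clock value at which a parent has pulled the first tuple from every
 * child in order, with or without a separate start phase.
 */
int
first_tuple_time(const int *startup_costs, int nchildren, int use_start_phase)
{
    ToyNode children[TOY_MAX_CHILDREN];
    int now = 0;
    int i;

    assert(nchildren >= 0 && nchildren <= TOY_MAX_CHILDREN);

    for (i = 0; i < nchildren; i++)
        toy_init(&children[i], startup_costs[i]);

    /* With the start phase, every child is launched before any Exec. */
    if (use_start_phase)
        for (i = 0; i < nchildren; i++)
            toy_start(&children[i], now);

    for (i = 0; i < nchildren; i++)
        now = toy_exec_first(&children[i], now);

    return now;
}
```

With startup costs 3, 2 and 4, first_tuple_time(costs, 3, 0) returns 9 (the sum of the costs), while first_tuple_time(costs, 3, 1) returns 4 (the largest single cost). That difference is the "folding" of startup time described above, under the assumption that the children really can make progress concurrently.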
==== About this patch
As a proof of concept, the first three patches add such a start
phase to the executor and add a facility to track node status for
almost all kinds of executor nodes (part of this may turn out to be
useless, though). The last two then implement an example usage of
the infrastructure.
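The node status tracking can be pictured with the following sketch. It only mirrors the four states named in the 0001 commit message (Inited, Started, Running, Done); ToyScan and the toy_scan_* functions are hypothetical stand-ins, not the macros or structs from the patch, and the forward-only advance is my reading of what AdvanceNodeRunStateTo is meant to do.

```c
#include <assert.h>

/* Hypothetical names modeled on the four run states; not the patch's code. */
typedef enum ToyRunState
{
    TOY_INITED,     /* initialized, nothing executed yet */
    TOY_STARTED,    /* started, first tuple not yet returned */
    TOY_RUNNING,    /* returning tuples */
    TOY_DONE        /* no more tuples to return */
} ToyRunState;

typedef struct ToyScan
{
    ToyRunState runstate;
    int ntuples;    /* total tuples this toy node will produce */
    int returned;   /* tuples produced so far */
} ToyScan;

void
toy_scan_init(ToyScan *scan, int ntuples)
{
    scan->runstate = TOY_INITED;
    scan->ntuples = ntuples;
    scan->returned = 0;
}

/* Move the state forward only; never move it backward. */
static void
toy_advance_to(ToyScan *scan, ToyRunState target)
{
    if (scan->runstate < target)
        scan->runstate = target;
}

/* Start phase: mark the node as started before the first Exec call. */
void
toy_scan_start(ToyScan *scan)
{
    toy_advance_to(scan, TOY_STARTED);
}

/* Exec phase: return 1 if a tuple was produced, 0 if the node is done. */
int
toy_scan_exec(ToyScan *scan)
{
    toy_advance_to(scan, TOY_RUNNING);  /* no-op once Running or Done */

    if (scan->runstate == TOY_DONE)
        return 0;

    if (scan->returned == scan->ntuples)
    {
        scan->runstate = TOY_DONE;
        return 0;
    }

    scan->returned++;
    return 1;
}

/* Rescan: go back to the initial state so the node can run again. */
void
toy_scan_rescan(ToyScan *scan)
{
    scan->returned = 0;
    scan->runstate = TOY_INITED;
}
```

A node initialized with two tuples goes Inited, Started (after toy_scan_start), Running (after the first toy_scan_exec), and Done once a third toy_scan_exec finds nothing left. Only toy_scan_rescan moves the state backward.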
The two newly introduced GUCs, enable_parasortmerge and
enable_asyncexec, respectively control whether to use Gather for
sorts under a merge join and whether to enable asynchronous
execution.
For evaluation, I made merge join use bgworkers under some
conditions as an example. It is merely a mock implementation, but it
is enough to show the difference between parallel execution and async
execution (more appropriate names are welcome) and its
effectiveness. Thanks to Amit for his great work.
==== Performance test
Apply all the patches, then run the following in order. Of course,
this test is artificially constructed so that this patch wins :)
CREATE TABLE t1 (a int, b int);
CREATE TABLE t2 (a int, b int);
CREATE TABLE t3 (a int, b int);
INSERT INTO t1 (SELECT (a / 1000) + (a % 1000) * 1000, a FROM generate_series(0, 999999) a);
INSERT INTO t2 (SELECT (a / 1000) + (a % 1000) * 1000, a FROM generate_series(0, 999999) a);
INSERT INTO t3 (SELECT (a / 1000) + (a % 1000) * 1000, a FROM generate_series(0, 999999) a);
ANALYZE t1;
ANALYZE t2;
ANALYZE t3;
SET enable_nestloop TO true;
SET enable_hashjoin TO true;
SET enable_material TO true;
SET enable_parasortmerge TO false;
SET enable_asyncexec TO false;
EXPLAIN (COSTS off, ANALYZE) SELECT * FROM t1 JOIN t2 ON (t1.a = t2.a) JOIN t3 on (t1.a = t3.a) ORDER BY t1.a LIMIT 10;
SET enable_nestloop TO false;
SET enable_hashjoin TO false;
SET enable_material TO false;
EXPLAIN (COSTS off, ANALYZE) SELECT * FROM t1 JOIN t2 ON (t1.a = t2.a) JOIN t3 on (t1.a = t3.a) ORDER BY t1.a LIMIT 10;
SET enable_parasortmerge TO true;
EXPLAIN (COSTS off, ANALYZE) SELECT * FROM t1 JOIN t2 ON (t1.a = t2.a) JOIN t3 on (t1.a = t3.a) ORDER BY t1.a LIMIT 10;
SET enable_asyncexec TO true;
EXPLAIN (COSTS off, ANALYZE) SELECT * FROM t1 JOIN t2 ON (t1.a = t2.a) JOIN t3 on (t1.a = t3.a) ORDER BY t1.a LIMIT 10;
==== Test results
In my environment, I got the following results.
- First run: the planner chooses a hash join plan, which takes about 3.3s.
- Second run: merge joins are done in a single backend, taking about 5.1s.
- Third run: merge joins simply use parallel execution, taking about 5.8s.
- Fourth run: merge joins start execution asynchronously, taking about 3.0s.
So asynchronous execution at least accelerates parallel execution
in this case, and is even faster than the (probably) current fastest plan.
====== TODO or random thoughts, not restricted to this patch.
- This patch doesn't contain a planner part; the planner must be aware of
async execution for this to be effective.
- Some measure to control execution on bgworkers would be
needed. At least merge join requires position mark/reset
functions.
- Currently, the more tuples there are, the less effective parallel
execution becomes. Some method to transfer tuples in larger units would
be needed, or would it be good to have shared workmem?
- The term "asynchronous execution" is easily confused with
parallel execution. Early execution/start might be usable but
I'm not so confident.
Any suggestions or thoughts?
I must apologize for the incomplete proposal and cluttered thoughts.
regards,
--
Kyotaro Horiguchi
NTT Open Source Software Center
Attachments:
0001-Add-infrastructure-for-executor-node-run-state.patch (text/x-patch; charset=us-ascii)
From 415d4d0784e45c066b727a0a18716dd449aea044 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Wed, 8 Jul 2015 11:48:12 +0900
Subject: [PATCH 1/5] Add infrastructure for executor node run state.
This infrastructure expands the node state from what ResultNode did to
a general form having four states.
The states are Inited, Started, Running and Done. Running and Done are
the same as what rs_done of ResultNode indicated. The Inited state
indicates that the node has been initialized but not executed. The Started
state indicates that the node has been executed but the first tuple
has not been received yet. Running indicates that the node is returning
tuples and Done indicates that the node has no more tuples to return.
The nodes Group, ModifyTable, SetOp and WindowAgg had their own
run-state management, so they are moved to this infrastructure by this
patch.
---
src/backend/commands/explain.c | 2 +-
src/backend/executor/nodeAgg.c | 1 +
src/backend/executor/nodeAppend.c | 1 +
src/backend/executor/nodeBitmapAnd.c | 1 +
src/backend/executor/nodeBitmapHeapscan.c | 1 +
src/backend/executor/nodeBitmapIndexscan.c | 1 +
src/backend/executor/nodeBitmapOr.c | 1 +
src/backend/executor/nodeCtescan.c | 1 +
src/backend/executor/nodeCustom.c | 1 +
src/backend/executor/nodeForeignscan.c | 1 +
src/backend/executor/nodeFunctionscan.c | 1 +
src/backend/executor/nodeGather.c | 10 ++++++----
src/backend/executor/nodeGroup.c | 14 +++++++++-----
src/backend/executor/nodeHash.c | 1 +
src/backend/executor/nodeHashjoin.c | 1 +
src/backend/executor/nodeIndexonlyscan.c | 1 +
src/backend/executor/nodeIndexscan.c | 1 +
src/backend/executor/nodeLimit.c | 1 +
src/backend/executor/nodeLockRows.c | 1 +
src/backend/executor/nodeMaterial.c | 1 +
src/backend/executor/nodeMergeAppend.c | 1 +
src/backend/executor/nodeMergejoin.c | 1 +
src/backend/executor/nodeModifyTable.c | 9 ++++++---
src/backend/executor/nodeNestloop.c | 1 +
src/backend/executor/nodeRecursiveunion.c | 1 +
src/backend/executor/nodeResult.c | 1 +
src/backend/executor/nodeSamplescan.c | 1 +
src/backend/executor/nodeSeqscan.c | 1 +
src/backend/executor/nodeSetOp.c | 20 ++++++++++++--------
src/backend/executor/nodeSort.c | 1 +
src/backend/executor/nodeSubqueryscan.c | 1 +
src/backend/executor/nodeTidscan.c | 1 +
src/backend/executor/nodeUnique.c | 1 +
src/backend/executor/nodeValuesscan.c | 1 +
src/backend/executor/nodeWindowAgg.c | 12 ++++++++----
src/backend/executor/nodeWorktablescan.c | 1 +
src/include/nodes/execnodes.h | 27 ++++++++++++++++++++++-----
37 files changed, 94 insertions(+), 30 deletions(-)
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 183d3d9..fb07213 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -2107,7 +2107,7 @@ static void
show_sort_info(SortState *sortstate, ExplainState *es)
{
Assert(IsA(sortstate, SortState));
- if (es->analyze && sortstate->sort_Done &&
+ if (es->analyze && ExecNode_is_done(sortstate) &&
sortstate->tuplesortstate != NULL)
{
Tuplesortstate *state = (Tuplesortstate *) sortstate->tuplesortstate;
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 2e36855..2ef3bdf 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -2039,6 +2039,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
aggstate = makeNode(AggState);
aggstate->ss.ps.plan = (Plan *) node;
aggstate->ss.ps.state = estate;
+ SetNodeRunState(aggstate, Inited);
aggstate->aggs = NIL;
aggstate->numaggs = 0;
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 2cffef8..4718c0f 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -140,6 +140,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
*/
appendstate->ps.plan = (Plan *) node;
appendstate->ps.state = estate;
+ SetNodeRunState(appendstate, Inited);
appendstate->appendplans = appendplanstates;
appendstate->as_nplans = nplans;
diff --git a/src/backend/executor/nodeBitmapAnd.c b/src/backend/executor/nodeBitmapAnd.c
index 205980e..8bc5bbe 100644
--- a/src/backend/executor/nodeBitmapAnd.c
+++ b/src/backend/executor/nodeBitmapAnd.c
@@ -63,6 +63,7 @@ ExecInitBitmapAnd(BitmapAnd *node, EState *estate, int eflags)
*/
bitmapandstate->ps.plan = (Plan *) node;
bitmapandstate->ps.state = estate;
+ SetNodeRunState(bitmapandstate, Inited);
bitmapandstate->bitmapplans = bitmapplanstates;
bitmapandstate->nplans = nplans;
diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c
index c784b9e..04ce35a 100644
--- a/src/backend/executor/nodeBitmapHeapscan.c
+++ b/src/backend/executor/nodeBitmapHeapscan.c
@@ -556,6 +556,7 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
scanstate = makeNode(BitmapHeapScanState);
scanstate->ss.ps.plan = (Plan *) node;
scanstate->ss.ps.state = estate;
+ SetNodeRunState(scanstate, Inited);
scanstate->tbm = NULL;
scanstate->tbmiterator = NULL;
diff --git a/src/backend/executor/nodeBitmapIndexscan.c b/src/backend/executor/nodeBitmapIndexscan.c
index 77fc1e5..613054f 100644
--- a/src/backend/executor/nodeBitmapIndexscan.c
+++ b/src/backend/executor/nodeBitmapIndexscan.c
@@ -206,6 +206,7 @@ ExecInitBitmapIndexScan(BitmapIndexScan *node, EState *estate, int eflags)
indexstate = makeNode(BitmapIndexScanState);
indexstate->ss.ps.plan = (Plan *) node;
indexstate->ss.ps.state = estate;
+ SetNodeRunState(indexstate, Inited);
/* normally we don't make the result bitmap till runtime */
indexstate->biss_result = NULL;
diff --git a/src/backend/executor/nodeBitmapOr.c b/src/backend/executor/nodeBitmapOr.c
index 353a5b6..fcdaeaf 100644
--- a/src/backend/executor/nodeBitmapOr.c
+++ b/src/backend/executor/nodeBitmapOr.c
@@ -64,6 +64,7 @@ ExecInitBitmapOr(BitmapOr *node, EState *estate, int eflags)
*/
bitmaporstate->ps.plan = (Plan *) node;
bitmaporstate->ps.state = estate;
+ SetNodeRunState(bitmaporstate, Inited);
bitmaporstate->bitmapplans = bitmapplanstates;
bitmaporstate->nplans = nplans;
diff --git a/src/backend/executor/nodeCtescan.c b/src/backend/executor/nodeCtescan.c
index 75c1ab3..666ef91 100644
--- a/src/backend/executor/nodeCtescan.c
+++ b/src/backend/executor/nodeCtescan.c
@@ -191,6 +191,7 @@ ExecInitCteScan(CteScan *node, EState *estate, int eflags)
scanstate = makeNode(CteScanState);
scanstate->ss.ps.plan = (Plan *) node;
scanstate->ss.ps.state = estate;
+ SetNodeRunState(scanstate, Inited);
scanstate->eflags = eflags;
scanstate->cte_table = NULL;
scanstate->eof_cte = false;
diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c
index 0a022df..e7e3b17 100644
--- a/src/backend/executor/nodeCustom.c
+++ b/src/backend/executor/nodeCustom.c
@@ -43,6 +43,7 @@ ExecInitCustomScan(CustomScan *cscan, EState *estate, int eflags)
/* fill up fields of ScanState */
css->ss.ps.plan = &cscan->scan.plan;
css->ss.ps.state = estate;
+ SetNodeRunState(css, Inited);
/* create expression context for node */
ExecAssignExprContext(estate, &css->ss.ps);
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 6165e4a..90483e4 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -128,6 +128,7 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
scanstate = makeNode(ForeignScanState);
scanstate->ss.ps.plan = (Plan *) node;
scanstate->ss.ps.state = estate;
+ SetNodeRunState(scanstate, Inited);
/*
* Miscellaneous initialization
diff --git a/src/backend/executor/nodeFunctionscan.c b/src/backend/executor/nodeFunctionscan.c
index f5fa2b3..849b54f 100644
--- a/src/backend/executor/nodeFunctionscan.c
+++ b/src/backend/executor/nodeFunctionscan.c
@@ -299,6 +299,7 @@ ExecInitFunctionScan(FunctionScan *node, EState *estate, int eflags)
scanstate = makeNode(FunctionScanState);
scanstate->ss.ps.plan = (Plan *) node;
scanstate->ss.ps.state = estate;
+ SetNodeRunState(scanstate, Inited);
scanstate->eflags = eflags;
/*
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index f8c1ba6..0e71dfc 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -67,6 +67,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
gatherstate = makeNode(GatherState);
gatherstate->ps.plan = (Plan *) node;
gatherstate->ps.state = estate;
+ SetNodeRunState(gatherstate, Inited);
gatherstate->need_to_scan_locally = !node->single_copy;
/*
@@ -140,7 +141,7 @@ ExecGather(GatherState *node)
* needs to allocate large dynamic segement, so it is better to do if it
* is really needed.
*/
- if (!node->initialized)
+ if (!ExecNode_is_running(node))
{
EState *estate = node->ps.state;
Gather *gather = (Gather *) node->ps.plan;
@@ -196,7 +197,8 @@ ExecGather(GatherState *node)
/* Run plan locally if no workers or not single-copy. */
node->need_to_scan_locally = (node->reader == NULL)
|| !gather->single_copy;
- node->initialized = true;
+
+ SetNodeRunState(node, Running);
}
/*
@@ -455,10 +457,10 @@ ExecReScanGather(GatherState *node)
*/
ExecShutdownGatherWorkers(node);
- node->initialized = false;
-
if (node->pei)
ExecParallelReinitialize(node->pei);
+ SetNodeRunState(node, Inited);
+
ExecReScan(node->ps.lefttree);
}
diff --git a/src/backend/executor/nodeGroup.c b/src/backend/executor/nodeGroup.c
index 5e47854..1a8f669 100644
--- a/src/backend/executor/nodeGroup.c
+++ b/src/backend/executor/nodeGroup.c
@@ -40,10 +40,13 @@ ExecGroup(GroupState *node)
TupleTableSlot *firsttupleslot;
TupleTableSlot *outerslot;
+ /* Advance the state to running if just after initialized */
+ AdvanceNodeRunStateTo(node, Running);
+
/*
* get state info from node
*/
- if (node->grp_done)
+ if (ExecNode_is_done(node))
return NULL;
econtext = node->ss.ps.ps_ExprContext;
numCols = ((Group *) node->ss.ps.plan)->numCols;
@@ -86,7 +89,7 @@ ExecGroup(GroupState *node)
if (TupIsNull(outerslot))
{
/* empty input, so return nothing */
- node->grp_done = TRUE;
+ SetNodeRunState(node, Done);
return NULL;
}
/* Copy tuple into firsttupleslot */
@@ -138,7 +141,7 @@ ExecGroup(GroupState *node)
if (TupIsNull(outerslot))
{
/* no more groups, so we're done */
- node->grp_done = TRUE;
+ SetNodeRunState(node, Done);
return NULL;
}
@@ -207,7 +210,7 @@ ExecInitGroup(Group *node, EState *estate, int eflags)
grpstate = makeNode(GroupState);
grpstate->ss.ps.plan = (Plan *) node;
grpstate->ss.ps.state = estate;
- grpstate->grp_done = FALSE;
+ SetNodeRunState(grpstate, Inited);
/*
* create expression context
@@ -282,7 +285,6 @@ ExecReScanGroup(GroupState *node)
{
PlanState *outerPlan = outerPlanState(node);
- node->grp_done = FALSE;
node->ss.ps.ps_TupFromTlist = false;
/* must clear first tuple */
ExecClearTuple(node->ss.ss_ScanTupleSlot);
@@ -293,4 +295,6 @@ ExecReScanGroup(GroupState *node)
*/
if (outerPlan->chgParam == NULL)
ExecReScan(outerPlan);
+
+ SetNodeRunState(node, Inited);
}
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 5e05ec3..fcbc44e 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -172,6 +172,7 @@ ExecInitHash(Hash *node, EState *estate, int eflags)
hashstate = makeNode(HashState);
hashstate->ps.plan = (Plan *) node;
hashstate->ps.state = estate;
+ SetNodeRunState(hashstate, Inited);
hashstate->hashtable = NULL;
hashstate->hashkeys = NIL; /* will be set by parent HashJoin */
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 1d78cdf..064421e 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -451,6 +451,7 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
hjstate = makeNode(HashJoinState);
hjstate->js.ps.plan = (Plan *) node;
hjstate->js.ps.state = estate;
+ SetNodeRunState(hjstate, Inited);
/*
* Miscellaneous initialization
diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c
index 9f54c46..0e84314 100644
--- a/src/backend/executor/nodeIndexonlyscan.c
+++ b/src/backend/executor/nodeIndexonlyscan.c
@@ -403,6 +403,7 @@ ExecInitIndexOnlyScan(IndexOnlyScan *node, EState *estate, int eflags)
indexstate = makeNode(IndexOnlyScanState);
indexstate->ss.ps.plan = (Plan *) node;
indexstate->ss.ps.state = estate;
+ SetNodeRunState(indexstate, Inited);
indexstate->ioss_HeapFetches = 0;
/*
diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c
index c0f14db..534d2f4 100644
--- a/src/backend/executor/nodeIndexscan.c
+++ b/src/backend/executor/nodeIndexscan.c
@@ -828,6 +828,7 @@ ExecInitIndexScan(IndexScan *node, EState *estate, int eflags)
indexstate = makeNode(IndexScanState);
indexstate->ss.ps.plan = (Plan *) node;
indexstate->ss.ps.state = estate;
+ SetNodeRunState(indexstate, Inited);
/*
* Miscellaneous initialization
diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c
index 40ac0d7..1b675d4 100644
--- a/src/backend/executor/nodeLimit.c
+++ b/src/backend/executor/nodeLimit.c
@@ -384,6 +384,7 @@ ExecInitLimit(Limit *node, EState *estate, int eflags)
limitstate = makeNode(LimitState);
limitstate->ps.plan = (Plan *) node;
limitstate->ps.state = estate;
+ SetNodeRunState(limitstate, Inited);
limitstate->lstate = LIMIT_INITIAL;
diff --git a/src/backend/executor/nodeLockRows.c b/src/backend/executor/nodeLockRows.c
index b9b0f06..eeeca0b 100644
--- a/src/backend/executor/nodeLockRows.c
+++ b/src/backend/executor/nodeLockRows.c
@@ -365,6 +365,7 @@ ExecInitLockRows(LockRows *node, EState *estate, int eflags)
lrstate = makeNode(LockRowsState);
lrstate->ps.plan = (Plan *) node;
lrstate->ps.state = estate;
+ SetNodeRunState(lrstate, Inited);
/*
* Miscellaneous initialization
diff --git a/src/backend/executor/nodeMaterial.c b/src/backend/executor/nodeMaterial.c
index b2b5aa7..d9a67f4 100644
--- a/src/backend/executor/nodeMaterial.c
+++ b/src/backend/executor/nodeMaterial.c
@@ -171,6 +171,7 @@ ExecInitMaterial(Material *node, EState *estate, int eflags)
matstate = makeNode(MaterialState);
matstate->ss.ps.plan = (Plan *) node;
matstate->ss.ps.state = estate;
+ SetNodeRunState(matstate, Inited);
/*
* We must have a tuplestore buffering the subplan output to do backward
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index bdf7680..3901255 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -83,6 +83,7 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
*/
mergestate->ps.plan = (Plan *) node;
mergestate->ps.state = estate;
+ SetNodeRunState(mergestate, Inited);
mergestate->mergeplans = mergeplanstates;
mergestate->ms_nplans = nplans;
diff --git a/src/backend/executor/nodeMergejoin.c b/src/backend/executor/nodeMergejoin.c
index 34b6cf6..9970db1 100644
--- a/src/backend/executor/nodeMergejoin.c
+++ b/src/backend/executor/nodeMergejoin.c
@@ -1485,6 +1485,7 @@ ExecInitMergeJoin(MergeJoin *node, EState *estate, int eflags)
mergestate = makeNode(MergeJoinState);
mergestate->js.ps.plan = (Plan *) node;
mergestate->js.ps.state = estate;
+ SetNodeRunState(mergestate, Inited);
/*
* Miscellaneous initialization
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index dabaea9..b59c94e 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -1268,6 +1268,9 @@ ExecModifyTable(ModifyTableState *node)
HeapTupleData oldtupdata;
HeapTuple oldtuple;
+ /* Advance the state to running if just after initialized */
+ AdvanceNodeRunStateTo(node, Running);
+
/*
* This should NOT get called during EvalPlanQual; we should have passed a
* subplan tree to EvalPlanQual, instead. Use a runtime test not just
@@ -1286,7 +1289,7 @@ ExecModifyTable(ModifyTableState *node)
* our subplan's nodes aren't necessarily robust against being called
* extra times.
*/
- if (node->mt_done)
+ if (ExecNode_is_done(node))
return NULL;
/*
@@ -1463,7 +1466,7 @@ ExecModifyTable(ModifyTableState *node)
*/
fireASTriggers(node);
- node->mt_done = true;
+ SetNodeRunState(node, Done);
return NULL;
}
@@ -1494,11 +1497,11 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
mtstate = makeNode(ModifyTableState);
mtstate->ps.plan = (Plan *) node;
mtstate->ps.state = estate;
+ SetNodeRunState(mtstate, Inited);
mtstate->ps.targetlist = NIL; /* not actually used */
mtstate->operation = operation;
mtstate->canSetTag = node->canSetTag;
- mtstate->mt_done = false;
mtstate->mt_plans = (PlanState **) palloc0(sizeof(PlanState *) * nplans);
mtstate->resultRelInfo = estate->es_result_relations + node->resultRelIndex;
diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c
index e66bcda..b4c2f26 100644
--- a/src/backend/executor/nodeNestloop.c
+++ b/src/backend/executor/nodeNestloop.c
@@ -309,6 +309,7 @@ ExecInitNestLoop(NestLoop *node, EState *estate, int eflags)
nlstate = makeNode(NestLoopState);
nlstate->js.ps.plan = (Plan *) node;
nlstate->js.ps.state = estate;
+ SetNodeRunState(nlstate, Inited);
/*
* Miscellaneous initialization
diff --git a/src/backend/executor/nodeRecursiveunion.c b/src/backend/executor/nodeRecursiveunion.c
index 8df1639..118496e 100644
--- a/src/backend/executor/nodeRecursiveunion.c
+++ b/src/backend/executor/nodeRecursiveunion.c
@@ -176,6 +176,7 @@ ExecInitRecursiveUnion(RecursiveUnion *node, EState *estate, int eflags)
rustate = makeNode(RecursiveUnionState);
rustate->ps.plan = (Plan *) node;
rustate->ps.state = estate;
+ SetNodeRunState(rustate, Inited);
rustate->eqfunctions = NULL;
rustate->hashfunctions = NULL;
diff --git a/src/backend/executor/nodeResult.c b/src/backend/executor/nodeResult.c
index 8d3dde0..b4ee402 100644
--- a/src/backend/executor/nodeResult.c
+++ b/src/backend/executor/nodeResult.c
@@ -217,6 +217,7 @@ ExecInitResult(Result *node, EState *estate, int eflags)
resstate = makeNode(ResultState);
resstate->ps.plan = (Plan *) node;
resstate->ps.state = estate;
+ SetNodeRunState(resstate, Inited);
resstate->rs_done = false;
resstate->rs_checkqual = (node->resconstantqual == NULL) ? false : true;
diff --git a/src/backend/executor/nodeSamplescan.c b/src/backend/executor/nodeSamplescan.c
index dbe84b0..0f75110 100644
--- a/src/backend/executor/nodeSamplescan.c
+++ b/src/backend/executor/nodeSamplescan.c
@@ -152,6 +152,7 @@ ExecInitSampleScan(SampleScan *node, EState *estate, int eflags)
scanstate = makeNode(SampleScanState);
scanstate->ss.ps.plan = (Plan *) node;
scanstate->ss.ps.state = estate;
+ SetNodeRunState(scanstate, Inited);
/*
* Miscellaneous initialization
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index b858f2f..0ee33ed 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -177,6 +177,7 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
scanstate = makeNode(SeqScanState);
scanstate->ss.ps.plan = (Plan *) node;
scanstate->ss.ps.state = estate;
+ SetNodeRunState(scanstate, Inited);
/*
* Miscellaneous initialization
diff --git a/src/backend/executor/nodeSetOp.c b/src/backend/executor/nodeSetOp.c
index 7d00cc5..123d051 100644
--- a/src/backend/executor/nodeSetOp.c
+++ b/src/backend/executor/nodeSetOp.c
@@ -197,6 +197,9 @@ ExecSetOp(SetOpState *node)
SetOp *plannode = (SetOp *) node->ps.plan;
TupleTableSlot *resultTupleSlot = node->ps.ps_ResultTupleSlot;
+ /* Advance the state to running if just after initialized */
+ AdvanceNodeRunStateTo(node, Running);
+
/*
* If the previously-returned tuple needs to be returned more than once,
* keep returning it.
@@ -208,7 +211,7 @@ ExecSetOp(SetOpState *node)
}
/* Otherwise, we're done if we are out of groups */
- if (node->setop_done)
+ if (ExecNode_is_done(node))
return NULL;
/* Fetch the next tuple group according to the correct strategy */
@@ -244,7 +247,7 @@ setop_retrieve_direct(SetOpState *setopstate)
/*
* We loop retrieving groups until we find one we should return
*/
- while (!setopstate->setop_done)
+ while (ExecNode_is_running(setopstate))
{
/*
* If we don't already have the first tuple of the new group, fetch it
@@ -261,7 +264,7 @@ setop_retrieve_direct(SetOpState *setopstate)
else
{
/* outer plan produced no tuples at all */
- setopstate->setop_done = true;
+ SetNodeRunState(setopstate, Done);
return NULL;
}
}
@@ -293,7 +296,7 @@ setop_retrieve_direct(SetOpState *setopstate)
if (TupIsNull(outerslot))
{
/* no more outer-plan tuples available */
- setopstate->setop_done = true;
+ SetNodeRunState(setopstate, Done);
break;
}
@@ -433,7 +436,7 @@ setop_retrieve_hash_table(SetOpState *setopstate)
/*
* We loop retrieving groups until we find one we should return
*/
- while (!setopstate->setop_done)
+ while (ExecNode_is_running(setopstate))
{
/*
* Find the next entry in the hash table
@@ -442,7 +445,7 @@ setop_retrieve_hash_table(SetOpState *setopstate)
if (entry == NULL)
{
/* No more entries in hashtable, so done */
- setopstate->setop_done = true;
+ SetNodeRunState(setopstate, Done);
return NULL;
}
@@ -490,7 +493,7 @@ ExecInitSetOp(SetOp *node, EState *estate, int eflags)
setopstate->eqfunctions = NULL;
setopstate->hashfunctions = NULL;
- setopstate->setop_done = false;
+ SetNodeRunState(setopstate, Inited);
setopstate->numOutput = 0;
setopstate->pergroup = NULL;
setopstate->grp_firstTuple = NULL;
@@ -601,7 +604,6 @@ void
ExecReScanSetOp(SetOpState *node)
{
ExecClearTuple(node->ps.ps_ResultTupleSlot);
- node->setop_done = false;
node->numOutput = 0;
if (((SetOp *) node->ps.plan)->strategy == SETOP_HASHED)
@@ -651,4 +653,6 @@ ExecReScanSetOp(SetOpState *node)
*/
if (node->ps.lefttree->chgParam == NULL)
ExecReScan(node->ps.lefttree);
+
+ SetNodeRunState(node, Inited);
}
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index af1dccf..3ae5b89 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -162,6 +162,7 @@ ExecInitSort(Sort *node, EState *estate, int eflags)
sortstate = makeNode(SortState);
sortstate->ss.ps.plan = (Plan *) node;
sortstate->ss.ps.state = estate;
+ SetNodeRunState(sortstate, Inited);
/*
* We must have random access to the sort output to do backward scan or
diff --git a/src/backend/executor/nodeSubqueryscan.c b/src/backend/executor/nodeSubqueryscan.c
index e5d1e54..497d6df 100644
--- a/src/backend/executor/nodeSubqueryscan.c
+++ b/src/backend/executor/nodeSubqueryscan.c
@@ -117,6 +117,7 @@ ExecInitSubqueryScan(SubqueryScan *node, EState *estate, int eflags)
subquerystate = makeNode(SubqueryScanState);
subquerystate->ss.ps.plan = (Plan *) node;
subquerystate->ss.ps.state = estate;
+ SetNodeRunState(subquerystate, Inited);
/*
* Miscellaneous initialization
diff --git a/src/backend/executor/nodeTidscan.c b/src/backend/executor/nodeTidscan.c
index 203f1ac..f19e735 100644
--- a/src/backend/executor/nodeTidscan.c
+++ b/src/backend/executor/nodeTidscan.c
@@ -461,6 +461,7 @@ ExecInitTidScan(TidScan *node, EState *estate, int eflags)
tidstate = makeNode(TidScanState);
tidstate->ss.ps.plan = (Plan *) node;
tidstate->ss.ps.state = estate;
+ SetNodeRunState(tidstate, Inited);
/*
* Miscellaneous initialization
diff --git a/src/backend/executor/nodeUnique.c b/src/backend/executor/nodeUnique.c
index 1cb4a8a..f259f32 100644
--- a/src/backend/executor/nodeUnique.c
+++ b/src/backend/executor/nodeUnique.c
@@ -122,6 +122,7 @@ ExecInitUnique(Unique *node, EState *estate, int eflags)
uniquestate = makeNode(UniqueState);
uniquestate->ps.plan = (Plan *) node;
uniquestate->ps.state = estate;
+ SetNodeRunState(uniquestate, Inited);
/*
* Miscellaneous initialization
diff --git a/src/backend/executor/nodeValuesscan.c b/src/backend/executor/nodeValuesscan.c
index a39695a..c56199c 100644
--- a/src/backend/executor/nodeValuesscan.c
+++ b/src/backend/executor/nodeValuesscan.c
@@ -205,6 +205,7 @@ ExecInitValuesScan(ValuesScan *node, EState *estate, int eflags)
scanstate = makeNode(ValuesScanState);
scanstate->ss.ps.plan = (Plan *) node;
scanstate->ss.ps.state = estate;
+ SetNodeRunState(scanstate, Inited);
/*
* Miscellaneous initialization
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index c371d4d..8734e8e 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -1564,7 +1564,10 @@ ExecWindowAgg(WindowAggState *winstate)
int i;
int numfuncs;
- if (winstate->all_done)
+ /* Advance the state to running if just after initialized */
+ AdvanceNodeRunStateTo(winstate, Running);
+
+ if (ExecNode_is_done(winstate))
return NULL;
/*
@@ -1686,7 +1689,7 @@ restart:
}
else
{
- winstate->all_done = true;
+ SetNodeRunState(winstate, Done);
return NULL;
}
}
@@ -1787,6 +1790,7 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
winstate = makeNode(WindowAggState);
winstate->ss.ps.plan = (Plan *) node;
winstate->ss.ps.state = estate;
+ SetNodeRunState(winstate, Inited);
/*
* Create expression contexts. We need two, one for per-input-tuple
@@ -2060,8 +2064,6 @@ ExecReScanWindowAgg(WindowAggState *node)
PlanState *outerPlan = outerPlanState(node);
ExprContext *econtext = node->ss.ps.ps_ExprContext;
- node->all_done = false;
-
node->ss.ps.ps_TupFromTlist = false;
node->all_first = true;
@@ -2085,6 +2087,8 @@ ExecReScanWindowAgg(WindowAggState *node)
*/
if (outerPlan->chgParam == NULL)
ExecReScan(outerPlan);
+
+ SetNodeRunState(node, Inited);
}
/*
diff --git a/src/backend/executor/nodeWorktablescan.c b/src/backend/executor/nodeWorktablescan.c
index 618508e..799e96b 100644
--- a/src/backend/executor/nodeWorktablescan.c
+++ b/src/backend/executor/nodeWorktablescan.c
@@ -144,6 +144,7 @@ ExecInitWorkTableScan(WorkTableScan *node, EState *estate, int eflags)
scanstate = makeNode(WorkTableScanState);
scanstate->ss.ps.plan = (Plan *) node;
scanstate->ss.ps.state = estate;
+ SetNodeRunState(scanstate, Inited);
scanstate->rustate = NULL; /* we'll set this later */
/*
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index eb3591a..4527a98 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -344,6 +344,27 @@ typedef struct ResultRelInfo
} ResultRelInfo;
/* ----------------
+ * Enumeration and macros for executor node running state.
+ */
+typedef enum ExecNodeRunState
+{
+ ERunState_Inited, /* Just after initialized */
+ ERunState_Started, /* Execution started but needs one more call
+ * for the first tuple */
+ ERunState_Running, /* Returning the next tuple */
+ ERunState_Done /* No tuple to return */
+} ExecNodeRunState;
+
+#define SetNodeRunState(nd,st) (((PlanState*)nd)->runstate = (ERunState_##st))
+#define AdvanceNodeRunStateTo(nd,st) \
+ do {\
+ if (((PlanState*)nd)->runstate < (ERunState_##st))\
+ ((PlanState*)nd)->runstate = (ERunState_##st);\
+ } while(0)
+#define ExecNode_is_running(nd) (((PlanState*)nd)->runstate == ERunState_Running)
+#define ExecNode_is_done(nd) (((PlanState*)nd)->runstate == ERunState_Done)
+
+/* ----------------
* EState information
*
* Master working state for an Executor invocation
@@ -1056,6 +1077,7 @@ typedef struct PlanState
ProjectionInfo *ps_ProjInfo; /* info for doing tuple projection */
bool ps_TupFromTlist;/* state flag for processing set-valued
* functions in targetlist */
+ ExecNodeRunState runstate; /* Execution state of this node */
} PlanState;
/* ----------------
@@ -1117,7 +1139,6 @@ typedef struct ModifyTableState
PlanState ps; /* its first field is NodeTag */
CmdType operation; /* INSERT, UPDATE, or DELETE */
bool canSetTag; /* do we set the command tag/es_processed? */
- bool mt_done; /* are we done? */
PlanState **mt_plans; /* subplans (one per target rel) */
int mt_nplans; /* number of plans in the array */
int mt_whichplan; /* which one is being executed (0..n-1) */
@@ -1812,7 +1833,6 @@ typedef struct GroupState
{
ScanState ss; /* its first field is NodeTag */
FmgrInfo *eqfunctions; /* per-field lookup data for equality fns */
- bool grp_done; /* indicates completion of Group scan */
} GroupState;
/* ---------------------
@@ -1917,7 +1937,6 @@ typedef struct WindowAggState
ExprContext *tmpcontext; /* short-term evaluation context */
bool all_first; /* true if the scan is starting */
- bool all_done; /* true if the scan is finished */
bool partition_spooled; /* true if all tuples in current
* partition have been spooled into
* tuplestore */
@@ -1965,7 +1984,6 @@ typedef struct UniqueState
typedef struct GatherState
{
PlanState ps; /* its first field is NodeTag */
- bool initialized;
struct ParallelExecutorInfo *pei;
int nreaders;
int nextreader;
@@ -2003,7 +2021,6 @@ typedef struct SetOpState
PlanState ps; /* its first field is NodeTag */
FmgrInfo *eqfunctions; /* per-grouping-field equality fns */
FmgrInfo *hashfunctions; /* per-grouping-field hash fns */
- bool setop_done; /* indicates completion of output scan */
long numOutput; /* number of dups left to output */
MemoryContext tempContext; /* short-term context for comparisons */
/* these fields are used in SETOP_SORTED mode: */
--
1.8.3.1
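
As a reading aid for the run-state macros that the patch above adds to
execnodes.h, here is a minimal standalone sketch of how they are meant to
behave. PlanState is reduced to the single field the macros touch, and
DemoNodeState, demo_advance_to_started() and demo_reset() are hypothetical
names used only for illustration; none of them exist in the patch. The macro
bodies are restated in the usual do { } while (0) form with parenthesized
arguments, so that each one expands to a single statement.

```c
#include <assert.h>

/* The four states from the patch. */
typedef enum ExecNodeRunState
{
    ERunState_Inited,   /* just after initialization */
    ERunState_Started,  /* started, needs one more call for the first tuple */
    ERunState_Running,  /* returning tuples */
    ERunState_Done      /* no tuple to return */
} ExecNodeRunState;

/* Stand-in for the real PlanState: only the field the macros touch. */
typedef struct PlanState
{
    ExecNodeRunState runstate;
} PlanState;

/* Hypothetical node type; PlanState must be its first member for the casts. */
typedef struct DemoNodeState
{
    PlanState   ps;
} DemoNodeState;

#define SetNodeRunState(nd, st) \
    (((PlanState *) (nd))->runstate = (ERunState_##st))
#define AdvanceNodeRunStateTo(nd, st) \
    do { \
        if (((PlanState *) (nd))->runstate < (ERunState_##st)) \
            ((PlanState *) (nd))->runstate = (ERunState_##st); \
    } while (0)
#define ExecNode_is_running(nd) \
    (((PlanState *) (nd))->runstate == ERunState_Running)
#define ExecNode_is_done(nd) \
    (((PlanState *) (nd))->runstate == ERunState_Done)

/* Try to advance a node that is currently in state `from` to Started. */
ExecNodeRunState
demo_advance_to_started(ExecNodeRunState from)
{
    DemoNodeState node;

    node.ps.runstate = from;
    AdvanceNodeRunStateTo(&node, Started);  /* never moves a node backwards */
    return node.ps.runstate;
}

/* Force a node in state `from` back to Inited, as the ReScan functions do. */
ExecNodeRunState
demo_reset(ExecNodeRunState from)
{
    DemoNodeState node;

    node.ps.runstate = from;
    SetNodeRunState(&node, Inited);         /* unconditional assignment */
    assert(!ExecNode_is_running(&node) && !ExecNode_is_done(&node));
    return node.ps.runstate;
}
```

AdvanceNodeRunStateTo() only moves a node forward in enum order, so calling it
on a node that is already Running or Done changes nothing. SetNodeRunState()
assigns unconditionally, which is what lets a rescan go back to Inited.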
0002-Change-all-tuple-returning-execution-nodes-to-mainta.patch (text/x-patch; charset=us-ascii)
From e3f764239f6494616137c2c23f8246bec9c52cd4 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Wed, 8 Jul 2015 17:39:47 +0900
Subject: [PATCH 2/5] Change all tuple-returning execution nodes to maintain
run-state appropriately.
This doesn't change any behavior; it only maintains the run-state so
that it stays consistent with whether the tuple returned at
ExecProcNode is null or not.
---
src/backend/executor/nodeAgg.c | 6 ++++++
src/backend/executor/nodeAppend.c | 7 +++++++
src/backend/executor/nodeBitmapAnd.c | 6 ++++++
src/backend/executor/nodeBitmapHeapscan.c | 13 ++++++++++++-
src/backend/executor/nodeBitmapIndexscan.c | 6 ++++++
src/backend/executor/nodeBitmapOr.c | 6 ++++++
src/backend/executor/nodeCtescan.c | 13 ++++++++++++-
src/backend/executor/nodeCustom.c | 15 ++++++++++++++-
src/backend/executor/nodeForeignscan.c | 12 +++++++++++-
src/backend/executor/nodeFunctionscan.c | 13 ++++++++++++-
src/backend/executor/nodeGather.c | 9 +++++++--
src/backend/executor/nodeGroup.c | 4 ++--
src/backend/executor/nodeHash.c | 6 ++++++
src/backend/executor/nodeHashjoin.c | 11 +++++++++++
src/backend/executor/nodeIndexonlyscan.c | 16 +++++++++++++++-
src/backend/executor/nodeIndexscan.c | 18 ++++++++++++++++--
src/backend/executor/nodeLimit.c | 22 ++++++++++++++++++++++
src/backend/executor/nodeLockRows.c | 7 +++++++
src/backend/executor/nodeMaterial.c | 9 +++++++++
src/backend/executor/nodeMergeAppend.c | 5 +++++
src/backend/executor/nodeMergejoin.c | 12 +++++++++++-
src/backend/executor/nodeNestloop.c | 5 +++++
src/backend/executor/nodeRecursiveunion.c | 5 +++++
src/backend/executor/nodeResult.c | 12 ++++++++++++
src/backend/executor/nodeSamplescan.c | 10 +++++++++-
src/backend/executor/nodeSeqscan.c | 11 ++++++++++-
src/backend/executor/nodeSetOp.c | 3 +--
src/backend/executor/nodeSort.c | 11 +++++++++++
src/backend/executor/nodeSubqueryscan.c | 11 ++++++++++-
src/backend/executor/nodeTidscan.c | 12 +++++++++++-
src/backend/executor/nodeUnique.c | 5 +++++
src/backend/executor/nodeValuesscan.c | 12 +++++++++++-
src/backend/executor/nodeWindowAgg.c | 3 +--
src/backend/executor/nodeWorktablescan.c | 12 +++++++++++-
src/include/nodes/execnodes.h | 12 +++++++-----
35 files changed, 312 insertions(+), 28 deletions(-)
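
Most of the hunks below repeat one pattern: set the node to Running on entry
to its Exec function, set it to Done when the returned slot is empty, and set
it back to Inited in the ReScan function. A minimal standalone sketch of that
pattern follows. It is not PostgreSQL code: DemoScanState and all demo_*
functions are hypothetical, "tuples" are plain ints, and a NULL pointer stands
in for TupIsNull(slot).

```c
#include <assert.h>
#include <stddef.h>

typedef enum ExecNodeRunState
{
    ERunState_Inited,
    ERunState_Started,
    ERunState_Running,
    ERunState_Done
} ExecNodeRunState;

/* Hypothetical toy scan node: rows are ints, NULL means end of scan. */
typedef struct DemoScanState
{
    ExecNodeRunState runstate;
    const int  *rows;
    int         nrows;
    int         next;       /* index of the next row to return */
} DemoScanState;

/* Plays the role of ExecScan(): next row, or NULL when exhausted. */
const int *
demo_scan_next(DemoScanState *node)
{
    if (node->next >= node->nrows)
        return NULL;
    return &node->rows[node->next++];
}

/* Plays the role of a patched Exec*Scan() function. */
const int *
demo_exec_scan(DemoScanState *node)
{
    const int  *row;

    node->runstate = ERunState_Running;     /* SetNodeRunState(node, Running) */
    row = demo_scan_next(node);
    if (row == NULL)                        /* if (TupIsNull(slot)) */
        node->runstate = ERunState_Done;    /* SetNodeRunState(node, Done) */
    return row;
}

/* Plays the role of a patched ExecReScan*() function. */
void
demo_rescan(DemoScanState *node)
{
    node->runstate = ERunState_Inited;      /* SetNodeRunState(node, Inited) */
    node->next = 0;
}

/*
 * Call the scan ncalls times over nrows rows (at most 4), optionally
 * rescan afterwards, and report the run state the node ends up in.
 */
ExecNodeRunState
demo_state_after_calls(int nrows, int ncalls, int rescan)
{
    static const int rows[4] = {10, 20, 30, 40};
    DemoScanState node = {ERunState_Inited, rows, nrows, 0};
    int         i;

    assert(nrows >= 0 && nrows <= 4);
    for (i = 0; i < ncalls; i++)
        (void) demo_exec_scan(&node);
    if (rescan)
        demo_rescan(&node);
    return node.runstate;
}
```

With two rows, the node stays Running for the two calls that return a row and
becomes Done on the third call, which is the consistency between run-state and
a null result that the commit message describes.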
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 2ef3bdf..ed29e3a 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -1522,6 +1522,8 @@ ExecAgg(AggState *node)
{
TupleTableSlot *result;
+ SetNodeRunState(node, Running);
+
/*
* Check to see if we're still projecting out tuples from a previous agg
* tuple (because there is a function-returning-set in the projection
@@ -1562,6 +1564,7 @@ ExecAgg(AggState *node)
return result;
}
+ SetNodeRunState(node, Done);
return NULL;
}
@@ -2961,6 +2964,9 @@ ExecReScanAgg(AggState *node)
int numGroupingSets = Max(node->maxsets, 1);
int setno;
+
+ SetNodeRunState(node, Inited);
+
node->agg_done = false;
node->ss.ps.ps_TupFromTlist = false;
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 4718c0f..03b3b66 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -194,6 +194,8 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
TupleTableSlot *
ExecAppend(AppendState *node)
{
+ SetNodeRunState(node, Running);
+
for (;;)
{
PlanState *subnode;
@@ -229,7 +231,10 @@ ExecAppend(AppendState *node)
else
node->as_whichplan--;
if (!exec_append_initialize_next(node))
+ {
+ SetNodeRunState(node, Done);
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ }
/* Else loop back and try to get a tuple from the new subplan */
}
@@ -268,6 +273,8 @@ ExecReScanAppend(AppendState *node)
{
int i;
+ SetNodeRunState(node, Inited);
+
for (i = 0; i < node->as_nplans; i++)
{
PlanState *subnode = node->appendplans[i];
diff --git a/src/backend/executor/nodeBitmapAnd.c b/src/backend/executor/nodeBitmapAnd.c
index 8bc5bbe..64f202e 100644
--- a/src/backend/executor/nodeBitmapAnd.c
+++ b/src/backend/executor/nodeBitmapAnd.c
@@ -105,6 +105,8 @@ MultiExecBitmapAnd(BitmapAndState *node)
if (node->ps.instrument)
InstrStartNode(node->ps.instrument);
+ SetNodeRunState(node, Running);
+
/*
* get information from the node
*/
@@ -146,6 +148,8 @@ MultiExecBitmapAnd(BitmapAndState *node)
if (result == NULL)
elog(ERROR, "BitmapAnd doesn't support zero inputs");
+ SetNodeRunState(node, Done);
+
/* must provide our own instrumentation support */
if (node->ps.instrument)
InstrStopNode(node->ps.instrument, 0 /* XXX */ );
@@ -189,6 +193,8 @@ ExecReScanBitmapAnd(BitmapAndState *node)
{
int i;
+ SetNodeRunState(node, Inited);
+
for (i = 0; i < node->nplans; i++)
{
PlanState *subnode = node->bitmapplans[i];
diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c
index 04ce35a..0dc61ba 100644
--- a/src/backend/executor/nodeBitmapHeapscan.c
+++ b/src/backend/executor/nodeBitmapHeapscan.c
@@ -437,9 +437,18 @@ BitmapHeapRecheck(BitmapHeapScanState *node, TupleTableSlot *slot)
TupleTableSlot *
ExecBitmapHeapScan(BitmapHeapScanState *node)
{
- return ExecScan(&node->ss,
+ TupleTableSlot *slot;
+
+ SetNodeRunState(node, Running);
+
+ slot = ExecScan(&node->ss,
(ExecScanAccessMtd) BitmapHeapNext,
(ExecScanRecheckMtd) BitmapHeapRecheck);
+
+ if (TupIsNull(slot))
+ SetNodeRunState(node, Done);
+
+ return slot;
}
/* ----------------------------------------------------------------
@@ -451,6 +460,8 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node)
{
PlanState *outerPlan = outerPlanState(node);
+ SetNodeRunState(node, Inited);
+
/* rescan to release any page pin */
heap_rescan(node->ss.ss_currentScanDesc, NULL);
diff --git a/src/backend/executor/nodeBitmapIndexscan.c b/src/backend/executor/nodeBitmapIndexscan.c
index 613054f..acfce3d 100644
--- a/src/backend/executor/nodeBitmapIndexscan.c
+++ b/src/backend/executor/nodeBitmapIndexscan.c
@@ -44,6 +44,8 @@ MultiExecBitmapIndexScan(BitmapIndexScanState *node)
if (node->ss.ps.instrument)
InstrStartNode(node->ss.ps.instrument);
+ SetNodeRunState(node, Running);
+
/*
* extract necessary information from index scan node
*/
@@ -98,6 +100,8 @@ MultiExecBitmapIndexScan(BitmapIndexScanState *node)
NULL, 0);
}
+ SetNodeRunState(node, Done);
+
/* must provide our own instrumentation support */
if (node->ss.ps.instrument)
InstrStopNode(node->ss.ps.instrument, nTuples);
@@ -117,6 +121,8 @@ ExecReScanBitmapIndexScan(BitmapIndexScanState *node)
{
ExprContext *econtext = node->biss_RuntimeContext;
+ SetNodeRunState(node, Inited);
+
/*
* Reset the runtime-key context so we don't leak memory as each outer
* tuple is scanned. Note this assumes that we will recalculate *all*
diff --git a/src/backend/executor/nodeBitmapOr.c b/src/backend/executor/nodeBitmapOr.c
index fcdaeaf..7a5bcf5 100644
--- a/src/backend/executor/nodeBitmapOr.c
+++ b/src/backend/executor/nodeBitmapOr.c
@@ -106,6 +106,8 @@ MultiExecBitmapOr(BitmapOrState *node)
if (node->ps.instrument)
InstrStartNode(node->ps.instrument);
+ SetNodeRunState(node, Running);
+
/*
* get information from the node
*/
@@ -162,6 +164,8 @@ MultiExecBitmapOr(BitmapOrState *node)
if (result == NULL)
elog(ERROR, "BitmapOr doesn't support zero inputs");
+ SetNodeRunState(node, Done);
+
/* must provide our own instrumentation support */
if (node->ps.instrument)
InstrStopNode(node->ps.instrument, 0 /* XXX */ );
@@ -205,6 +209,8 @@ ExecReScanBitmapOr(BitmapOrState *node)
{
int i;
+ SetNodeRunState(node, Inited);
+
for (i = 0; i < node->nplans; i++)
{
PlanState *subnode = node->bitmapplans[i];
diff --git a/src/backend/executor/nodeCtescan.c b/src/backend/executor/nodeCtescan.c
index 666ef91..d237370 100644
--- a/src/backend/executor/nodeCtescan.c
+++ b/src/backend/executor/nodeCtescan.c
@@ -152,9 +152,18 @@ CteScanRecheck(CteScanState *node, TupleTableSlot *slot)
TupleTableSlot *
ExecCteScan(CteScanState *node)
{
- return ExecScan(&node->ss,
+ TupleTableSlot *slot;
+
+ SetNodeRunState(node, Running);
+
+ slot = ExecScan(&node->ss,
(ExecScanAccessMtd) CteScanNext,
(ExecScanRecheckMtd) CteScanRecheck);
+
+ if (TupIsNull(slot))
+ SetNodeRunState(node, Done);
+
+ return slot;
}
@@ -312,6 +321,8 @@ ExecReScanCteScan(CteScanState *node)
{
Tuplestorestate *tuplestorestate = node->leader->cte_table;
+ SetNodeRunState(node, Inited);
+
ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
ExecScanReScan(&node->ss);
diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c
index e7e3b17..9f85fd7 100644
--- a/src/backend/executor/nodeCustom.c
+++ b/src/backend/executor/nodeCustom.c
@@ -110,8 +110,16 @@ ExecInitCustomScan(CustomScan *cscan, EState *estate, int eflags)
TupleTableSlot *
ExecCustomScan(CustomScanState *node)
{
+ TupleTableSlot *slot;
+
Assert(node->methods->ExecCustomScan != NULL);
- return node->methods->ExecCustomScan(node);
+ SetNodeRunState(node, Running);
+ slot = node->methods->ExecCustomScan(node);
+
+ if (TupIsNull(slot))
+ SetNodeRunState(node, Done);
+
+ return slot;
}
void
@@ -136,6 +144,7 @@ void
ExecReScanCustomScan(CustomScanState *node)
{
Assert(node->methods->ReScanCustomScan != NULL);
+ SetNodeRunState(node, Inited);
node->methods->ReScanCustomScan(node);
}
@@ -158,5 +167,9 @@ ExecCustomRestrPos(CustomScanState *node)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("custom-scan \"%s\" does not support MarkPos",
node->methods->CustomName)));
+
node->methods->RestrPosCustomScan(node);
+
+ /* Restoring position in turn restores run state */
+ SetNodeRunState(node, Running);
}
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 90483e4..895af86 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -100,9 +100,17 @@ ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot)
TupleTableSlot *
ExecForeignScan(ForeignScanState *node)
{
- return ExecScan((ScanState *) node,
+ TupleTableSlot *slot;
+
+ SetNodeRunState(node, Running);
+ slot = ExecScan((ScanState *) node,
(ExecScanAccessMtd) ForeignNext,
(ExecScanRecheckMtd) ForeignRecheck);
+
+ if (TupIsNull(slot))
+ SetNodeRunState(node, Done);
+
+ return slot;
}
@@ -247,6 +255,8 @@ ExecEndForeignScan(ForeignScanState *node)
void
ExecReScanForeignScan(ForeignScanState *node)
{
+ SetNodeRunState(node, Inited);
+
node->fdwroutine->ReScanForeignScan(node);
ExecScanReScan(&node->ss);
diff --git a/src/backend/executor/nodeFunctionscan.c b/src/backend/executor/nodeFunctionscan.c
index 849b54f..08f9bbf 100644
--- a/src/backend/executor/nodeFunctionscan.c
+++ b/src/backend/executor/nodeFunctionscan.c
@@ -265,9 +265,18 @@ FunctionRecheck(FunctionScanState *node, TupleTableSlot *slot)
TupleTableSlot *
ExecFunctionScan(FunctionScanState *node)
{
- return ExecScan(&node->ss,
+ TupleTableSlot *slot;
+
+ SetNodeRunState(node, Running);
+
+ slot = ExecScan(&node->ss,
(ExecScanAccessMtd) FunctionNext,
(ExecScanRecheckMtd) FunctionRecheck);
+
+ if (TupIsNull(slot))
+ SetNodeRunState(node, Done);
+
+ return slot;
}
/* ----------------------------------------------------------------
@@ -569,6 +578,8 @@ ExecReScanFunctionScan(FunctionScanState *node)
int i;
Bitmapset *chgparam = node->ss.ps.chgParam;
+ SetNodeRunState(node, Inited);
+
ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
for (i = 0; i < node->nfuncs; i++)
{
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 0e71dfc..aceb358 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -135,13 +135,15 @@ ExecGather(GatherState *node)
ExprDoneCond isDone;
ExprContext *econtext;
+ SetNodeRunState(node, Running);
+
/*
* Initialize the parallel context and workers on first execution. We do
* this on first execution rather than during node initialization, as it
* needs to allocate large dynamic segement, so it is better to do if it
* is really needed.
*/
- if (!ExecNode_is_running(node))
+ if (ExecNode_is_inited(node))
{
EState *estate = node->ps.state;
Gather *gather = (Gather *) node->ps.plan;
@@ -232,7 +234,10 @@ ExecGather(GatherState *node)
*/
slot = gather_getnext(node);
if (TupIsNull(slot))
+ {
+ SetNodeRunState(node, Done);
return NULL;
+ }
/*
* form the result tuple using ExecProject(), and return it --- unless
@@ -461,6 +466,6 @@ ExecReScanGather(GatherState *node)
ExecParallelReinitialize(node->pei);
SetNodeRunState(node, Inited);
-
+
ExecReScan(node->ps.lefttree);
}
diff --git a/src/backend/executor/nodeGroup.c b/src/backend/executor/nodeGroup.c
index 1a8f669..a593d9f 100644
--- a/src/backend/executor/nodeGroup.c
+++ b/src/backend/executor/nodeGroup.c
@@ -285,6 +285,8 @@ ExecReScanGroup(GroupState *node)
{
PlanState *outerPlan = outerPlanState(node);
+ SetNodeRunState(node, Inited);
+
node->ss.ps.ps_TupFromTlist = false;
/* must clear first tuple */
ExecClearTuple(node->ss.ss_ScanTupleSlot);
@@ -295,6 +297,4 @@ ExecReScanGroup(GroupState *node)
*/
if (outerPlan->chgParam == NULL)
ExecReScan(outerPlan);
-
- SetNodeRunState(node, Inited);
}
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index fcbc44e..5a71fe3 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -84,6 +84,8 @@ MultiExecHash(HashState *node)
if (node->ps.instrument)
InstrStartNode(node->ps.instrument);
+ SetNodeRunState(node, Running);
+
/*
* get state info from node
*/
@@ -138,6 +140,8 @@ MultiExecHash(HashState *node)
if (hashtable->spaceUsed > hashtable->spacePeak)
hashtable->spacePeak = hashtable->spaceUsed;
+ SetNodeRunState(node, Done);
+
/* must provide our own instrumentation support */
if (node->ps.instrument)
InstrStopNode(node->ps.instrument, hashtable->totalTuples);
@@ -1270,6 +1274,8 @@ ExecHashTableResetMatchFlags(HashJoinTable hashtable)
void
ExecReScanHash(HashState *node)
{
+ SetNodeRunState(node, Inited);
+
/*
* if chgParam of subnode is not null then plan will be re-scanned by
* first ExecProcNode.
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 064421e..dbaabc4 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -72,6 +72,8 @@ ExecHashJoin(HashJoinState *node)
uint32 hashvalue;
int batchno;
+ SetNodeRunState(node, Running);
+
/*
* get information from HashJoin node
*/
@@ -155,6 +157,7 @@ ExecHashJoin(HashJoinState *node)
if (TupIsNull(node->hj_FirstOuterTupleSlot))
{
node->hj_OuterNotEmpty = false;
+ SetNodeRunState(node, Done);
return NULL;
}
else
@@ -183,7 +186,10 @@ ExecHashJoin(HashJoinState *node)
* outer relation.
*/
if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
+ {
+ SetNodeRunState(node, Done);
return NULL;
+ }
/*
* need to remember whether nbatch has increased since we
@@ -414,7 +420,10 @@ ExecHashJoin(HashJoinState *node)
* Try to advance to next batch. Done if there are no more.
*/
if (!ExecHashJoinNewBatch(node))
+ {
+ SetNodeRunState(node, Done);
return NULL; /* end of join */
+ }
node->hj_JoinState = HJ_NEED_NEW_OUTER;
break;
@@ -944,6 +953,8 @@ ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
void
ExecReScanHashJoin(HashJoinState *node)
{
+ SetNodeRunState(node, Inited);
+
/*
* In a multi-batch join, we currently have to do rescans the hard way,
* primarily because batch temp files may have already been released. But
diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c
index 0e84314..b3676c9 100644
--- a/src/backend/executor/nodeIndexonlyscan.c
+++ b/src/backend/executor/nodeIndexonlyscan.c
@@ -252,15 +252,24 @@ IndexOnlyRecheck(IndexOnlyScanState *node, TupleTableSlot *slot)
TupleTableSlot *
ExecIndexOnlyScan(IndexOnlyScanState *node)
{
+ TupleTableSlot *slot;
+
/*
* If we have runtime keys and they've not already been set up, do it now.
*/
if (node->ioss_NumRuntimeKeys != 0 && !node->ioss_RuntimeKeysReady)
ExecReScan((PlanState *) node);
- return ExecScan(&node->ss,
+ SetNodeRunState(node, Running);
+
+ slot = ExecScan(&node->ss,
(ExecScanAccessMtd) IndexOnlyNext,
(ExecScanRecheckMtd) IndexOnlyRecheck);
+
+ if (TupIsNull(slot))
+ SetNodeRunState(node, Done);
+
+ return slot;
}
/* ----------------------------------------------------------------
@@ -277,6 +286,8 @@ ExecIndexOnlyScan(IndexOnlyScanState *node)
void
ExecReScanIndexOnlyScan(IndexOnlyScanState *node)
{
+ SetNodeRunState(node, Inited);
+
/*
* If we are doing runtime key calculations (ie, any of the index key
* values weren't simple Consts), compute the new key values. But first,
@@ -376,6 +387,9 @@ void
ExecIndexOnlyRestrPos(IndexOnlyScanState *node)
{
index_restrpos(node->ioss_ScanDesc);
+
+ /* Restoring position in turn restores run state */
+ SetNodeRunState(node, Running);
}
/* ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c
index 534d2f4..a343c5c 100644
--- a/src/backend/executor/nodeIndexscan.c
+++ b/src/backend/executor/nodeIndexscan.c
@@ -484,20 +484,29 @@ reorderqueue_pop(IndexScanState *node)
TupleTableSlot *
ExecIndexScan(IndexScanState *node)
{
+ TupleTableSlot *slot;
+
/*
* If we have runtime keys and they've not already been set up, do it now.
*/
if (node->iss_NumRuntimeKeys != 0 && !node->iss_RuntimeKeysReady)
ExecReScan((PlanState *) node);
+ SetNodeRunState(node, Running);
+
if (node->iss_NumOrderByKeys > 0)
- return ExecScan(&node->ss,
+ slot = ExecScan(&node->ss,
(ExecScanAccessMtd) IndexNextWithReorder,
(ExecScanRecheckMtd) IndexRecheck);
else
- return ExecScan(&node->ss,
+ slot = ExecScan(&node->ss,
(ExecScanAccessMtd) IndexNext,
(ExecScanRecheckMtd) IndexRecheck);
+
+ if (TupIsNull(slot))
+ SetNodeRunState(node, Done);
+
+ return slot;
}
/* ----------------------------------------------------------------
@@ -514,6 +523,8 @@ ExecIndexScan(IndexScanState *node)
void
ExecReScanIndexScan(IndexScanState *node)
{
+ SetNodeRunState(node, Inited);
+
/*
* If we are doing runtime key calculations (ie, any of the index key
* values weren't simple Consts), compute the new key values. But first,
@@ -802,6 +813,9 @@ void
ExecIndexRestrPos(IndexScanState *node)
{
index_restrpos(node->iss_ScanDesc);
+
+ /* Restoring position in turn restores run state */
+ SetNodeRunState(node, Running);
}
/* ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c
index 1b675d4..e59d71f 100644
--- a/src/backend/executor/nodeLimit.c
+++ b/src/backend/executor/nodeLimit.c
@@ -43,6 +43,8 @@ ExecLimit(LimitState *node)
TupleTableSlot *slot;
PlanState *outerPlan;
+ SetNodeRunState(node, Running);
+
/*
* get information from the node
*/
@@ -72,7 +74,10 @@ ExecLimit(LimitState *node)
* If backwards scan, just return NULL without changing state.
*/
if (!ScanDirectionIsForward(direction))
+ {
+ SetNodeRunState(node, Done);
return NULL;
+ }
/*
* Check for empty window; if so, treat like empty subplan.
@@ -80,6 +85,7 @@ ExecLimit(LimitState *node)
if (node->count <= 0 && !node->noCount)
{
node->lstate = LIMIT_EMPTY;
+ SetNodeRunState(node, Done);
return NULL;
}
@@ -96,6 +102,7 @@ ExecLimit(LimitState *node)
* any output at all.
*/
node->lstate = LIMIT_EMPTY;
+ SetNodeRunState(node, Done);
return NULL;
}
node->subSlot = slot;
@@ -115,6 +122,7 @@ ExecLimit(LimitState *node)
* The subplan is known to return no tuples (or not more than
* OFFSET tuples, in general). So we return no tuples.
*/
+ SetNodeRunState(node, Done);
return NULL;
case LIMIT_INWINDOW:
@@ -130,6 +138,7 @@ ExecLimit(LimitState *node)
node->position - node->offset >= node->count)
{
node->lstate = LIMIT_WINDOWEND;
+ SetNodeRunState(node, Done);
return NULL;
}
@@ -140,6 +149,7 @@ ExecLimit(LimitState *node)
if (TupIsNull(slot))
{
node->lstate = LIMIT_SUBPLANEOF;
+ SetNodeRunState(node, Done);
return NULL;
}
node->subSlot = slot;
@@ -154,6 +164,7 @@ ExecLimit(LimitState *node)
if (node->position <= node->offset + 1)
{
node->lstate = LIMIT_WINDOWSTART;
+ SetNodeRunState(node, Done);
return NULL;
}
@@ -170,7 +181,10 @@ ExecLimit(LimitState *node)
case LIMIT_SUBPLANEOF:
if (ScanDirectionIsForward(direction))
+ {
+ SetNodeRunState(node, Done);
return NULL;
+ }
/*
* Backing up from subplan EOF, so re-fetch previous tuple; there
@@ -186,7 +200,10 @@ ExecLimit(LimitState *node)
case LIMIT_WINDOWEND:
if (ScanDirectionIsForward(direction))
+ {
+ SetNodeRunState(node, Done);
return NULL;
+ }
/*
* Backing up from window end: simply re-return the last tuple
@@ -199,7 +216,10 @@ ExecLimit(LimitState *node)
case LIMIT_WINDOWSTART:
if (!ScanDirectionIsForward(direction))
+ {
+ SetNodeRunState(node, Done);
return NULL;
+ }
/*
* Advancing after having backed off window start: simply
@@ -443,6 +463,8 @@ ExecEndLimit(LimitState *node)
void
ExecReScanLimit(LimitState *node)
{
+ SetNodeRunState(node, Inited);
+
/*
* Recompute limit/offset in case parameters changed, and reset the state
* machine. We must do this before rescanning our child node, in case
diff --git a/src/backend/executor/nodeLockRows.c b/src/backend/executor/nodeLockRows.c
index eeeca0b..2ccf05d 100644
--- a/src/backend/executor/nodeLockRows.c
+++ b/src/backend/executor/nodeLockRows.c
@@ -44,6 +44,8 @@ ExecLockRows(LockRowsState *node)
bool epq_needed;
ListCell *lc;
+ SetNodeRunState(node, Running);
+
/*
* get information from the node
*/
@@ -57,7 +59,10 @@ lnext:
slot = ExecProcNode(outerPlan);
if (TupIsNull(slot))
+ {
+ SetNodeRunState(node, Done);
return NULL;
+ }
/* We don't need EvalPlanQual unless we get updated tuple version(s) */
epq_needed = false;
@@ -460,6 +465,8 @@ ExecEndLockRows(LockRowsState *node)
void
ExecReScanLockRows(LockRowsState *node)
{
+ SetNodeRunState(node, Inited);
+
/*
* if chgParam of subnode is not null then plan will be re-scanned by
* first ExecProcNode.
diff --git a/src/backend/executor/nodeMaterial.c b/src/backend/executor/nodeMaterial.c
index d9a67f4..981398a 100644
--- a/src/backend/executor/nodeMaterial.c
+++ b/src/backend/executor/nodeMaterial.c
@@ -45,6 +45,8 @@ ExecMaterial(MaterialState *node)
bool eof_tuplestore;
TupleTableSlot *slot;
+ SetNodeRunState(node, Running);
+
/*
* get state info from node
*/
@@ -132,6 +134,7 @@ ExecMaterial(MaterialState *node)
if (TupIsNull(outerslot))
{
node->eof_underlying = true;
+ SetNodeRunState(node, Done);
return NULL;
}
@@ -152,6 +155,7 @@ ExecMaterial(MaterialState *node)
/*
* Nothing left ...
*/
+ SetNodeRunState(node, Done);
return ExecClearTuple(slot);
}
@@ -307,6 +311,9 @@ ExecMaterialRestrPos(MaterialState *node)
* copy the mark to the active read pointer.
*/
tuplestore_copy_read_pointer(node->tuplestorestate, 1, 0);
+
+ /* Restoring position in turn restores run state */
+ SetNodeRunState(node, Running);
}
/* ----------------------------------------------------------------
@@ -320,6 +327,8 @@ ExecReScanMaterial(MaterialState *node)
{
PlanState *outerPlan = outerPlanState(node);
+ SetNodeRunState(node, Inited);
+
ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
if (node->eflags != 0)
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index 3901255..4678d7c 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -170,6 +170,8 @@ ExecMergeAppend(MergeAppendState *node)
TupleTableSlot *result;
SlotNumber i;
+ SetNodeRunState(node, Running);
+
if (!node->ms_initialized)
{
/*
@@ -207,6 +209,7 @@ ExecMergeAppend(MergeAppendState *node)
{
/* All the subplans are exhausted, and so is the heap */
result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ SetNodeRunState(node, Done);
}
else
{
@@ -289,6 +292,8 @@ ExecReScanMergeAppend(MergeAppendState *node)
{
int i;
+ SetNodeRunState(node, Inited);
+
for (i = 0; i < node->ms_nplans; i++)
{
PlanState *subnode = node->mergeplans[i];
diff --git a/src/backend/executor/nodeMergejoin.c b/src/backend/executor/nodeMergejoin.c
index 9970db1..74ceaa2 100644
--- a/src/backend/executor/nodeMergejoin.c
+++ b/src/backend/executor/nodeMergejoin.c
@@ -630,6 +630,8 @@ ExecMergeJoin(MergeJoinState *node)
bool doFillOuter;
bool doFillInner;
+ SetNodeRunState(node, Running);
+
/*
* get information from node
*/
@@ -728,6 +730,7 @@ ExecMergeJoin(MergeJoinState *node)
break;
}
/* Otherwise we're done. */
+ SetNodeRunState(node, Done);
return NULL;
}
break;
@@ -785,6 +788,7 @@ ExecMergeJoin(MergeJoinState *node)
break;
}
/* Otherwise we're done. */
+ SetNodeRunState(node, Done);
return NULL;
}
break;
@@ -1039,6 +1043,7 @@ ExecMergeJoin(MergeJoinState *node)
break;
}
/* Otherwise we're done. */
+ SetNodeRunState(node, Done);
return NULL;
}
break;
@@ -1174,6 +1179,7 @@ ExecMergeJoin(MergeJoinState *node)
break;
}
/* Otherwise we're done. */
+ SetNodeRunState(node, Done);
return NULL;
}
}
@@ -1292,6 +1298,7 @@ ExecMergeJoin(MergeJoinState *node)
break;
}
/* Otherwise we're done. */
+ SetNodeRunState(node, Done);
return NULL;
}
break;
@@ -1362,6 +1369,7 @@ ExecMergeJoin(MergeJoinState *node)
break;
}
/* Otherwise we're done. */
+ SetNodeRunState(node, Done);
return NULL;
}
break;
@@ -1406,6 +1414,7 @@ ExecMergeJoin(MergeJoinState *node)
if (TupIsNull(innerTupleSlot))
{
MJ_printf("ExecMergeJoin: end of inner subplan\n");
+ SetNodeRunState(node, Done);
return NULL;
}
@@ -1448,6 +1457,7 @@ ExecMergeJoin(MergeJoinState *node)
if (TupIsNull(outerTupleSlot))
{
MJ_printf("ExecMergeJoin: end of outer subplan\n");
+ SetNodeRunState(node, Done);
return NULL;
}
@@ -1682,6 +1692,7 @@ ExecEndMergeJoin(MergeJoinState *node)
void
ExecReScanMergeJoin(MergeJoinState *node)
{
+ SetNodeRunState(node, Inited);
ExecClearTuple(node->mj_MarkedTupleSlot);
node->mj_JoinState = EXEC_MJ_INITIALIZE_OUTER;
@@ -1699,5 +1710,4 @@ ExecReScanMergeJoin(MergeJoinState *node)
ExecReScan(node->js.ps.lefttree);
if (node->js.ps.righttree->chgParam == NULL)
ExecReScan(node->js.ps.righttree);
-
}
diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c
index b4c2f26..ae69176 100644
--- a/src/backend/executor/nodeNestloop.c
+++ b/src/backend/executor/nodeNestloop.c
@@ -69,6 +69,8 @@ ExecNestLoop(NestLoopState *node)
ExprContext *econtext;
ListCell *lc;
+ SetNodeRunState(node, Running);
+
/*
* get information from the node
*/
@@ -128,6 +130,7 @@ ExecNestLoop(NestLoopState *node)
if (TupIsNull(outerTupleSlot))
{
ENL1_printf("no outer tuple, ending join");
+ SetNodeRunState(node, Done);
return NULL;
}
@@ -429,6 +432,8 @@ ExecReScanNestLoop(NestLoopState *node)
{
PlanState *outerPlan = outerPlanState(node);
+ SetNodeRunState(node, Inited);
+
/*
* If outerPlan->chgParam is not null then plan will be automatically
* re-scanned by first ExecProcNode.
diff --git a/src/backend/executor/nodeRecursiveunion.c b/src/backend/executor/nodeRecursiveunion.c
index 118496e..27a86d3 100644
--- a/src/backend/executor/nodeRecursiveunion.c
+++ b/src/backend/executor/nodeRecursiveunion.c
@@ -81,6 +81,8 @@ ExecRecursiveUnion(RecursiveUnionState *node)
TupleTableSlot *slot;
bool isnew;
+ SetNodeRunState(node, Running);
+
/* 1. Evaluate non-recursive term */
if (!node->recursing)
{
@@ -154,6 +156,7 @@ ExecRecursiveUnion(RecursiveUnionState *node)
return slot;
}
+ SetNodeRunState(node, Done);
return NULL;
}
@@ -309,6 +312,8 @@ ExecReScanRecursiveUnion(RecursiveUnionState *node)
PlanState *innerPlan = innerPlanState(node);
RecursiveUnion *plan = (RecursiveUnion *) node->ps.plan;
+ SetNodeRunState(node, Inited);
+
/*
* Set recursive term's chgParam to tell it that we'll modify the working
* table and therefore it has to rescan.
diff --git a/src/backend/executor/nodeResult.c b/src/backend/executor/nodeResult.c
index b4ee402..ec81eda 100644
--- a/src/backend/executor/nodeResult.c
+++ b/src/backend/executor/nodeResult.c
@@ -72,6 +72,7 @@ ExecResult(ResultState *node)
ExprContext *econtext;
ExprDoneCond isDone;
+ SetNodeRunState(node, Running);
econtext = node->ps.ps_ExprContext;
/*
@@ -87,6 +88,7 @@ ExecResult(ResultState *node)
if (!qualResult)
{
node->rs_done = true;
+ SetNodeRunState(node, Done);
return NULL;
}
}
@@ -130,7 +132,10 @@ ExecResult(ResultState *node)
outerTupleSlot = ExecProcNode(outerPlan);
if (TupIsNull(outerTupleSlot))
+ {
+ SetNodeRunState(node, Done);
return NULL;
+ }
/*
* prepare to compute projection expressions, which will expect to
@@ -161,6 +166,7 @@ ExecResult(ResultState *node)
}
}
+ SetNodeRunState(node, Done);
return NULL;
}
@@ -189,7 +195,12 @@ ExecResultRestrPos(ResultState *node)
PlanState *outerPlan = outerPlanState(node);
if (outerPlan != NULL)
+ {
ExecRestrPos(outerPlan);
+
+ /* Restoring position in turn restores run state */
+ SetNodeRunState(node, Running);
+ }
else
elog(ERROR, "Result nodes do not support mark/restore");
}
@@ -295,6 +306,7 @@ ExecEndResult(ResultState *node)
void
ExecReScanResult(ResultState *node)
{
+ SetNodeRunState(node, Inited);
node->rs_done = false;
node->ps.ps_TupFromTlist = false;
node->rs_checkqual = (node->resconstantqual == NULL) ? false : true;
diff --git a/src/backend/executor/nodeSamplescan.c b/src/backend/executor/nodeSamplescan.c
index 0f75110..f46569e 100644
--- a/src/backend/executor/nodeSamplescan.c
+++ b/src/backend/executor/nodeSamplescan.c
@@ -98,9 +98,17 @@ SampleRecheck(SampleScanState *node, TupleTableSlot *slot)
TupleTableSlot *
ExecSampleScan(SampleScanState *node)
{
- return ExecScan((ScanState *) node,
+ TupleTableSlot *slot;
+
+ SetNodeRunState(node, Running);
+ slot = ExecScan((ScanState *) node,
(ExecScanAccessMtd) SampleNext,
(ExecScanRecheckMtd) SampleRecheck);
+
+ if (TupIsNull(slot))
+ SetNodeRunState(node, Done);
+
+ return slot;
}
/* ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 0ee33ed..d443c09 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -124,9 +124,17 @@ SeqRecheck(SeqScanState *node, TupleTableSlot *slot)
TupleTableSlot *
ExecSeqScan(SeqScanState *node)
{
- return ExecScan((ScanState *) node,
+ TupleTableSlot *slot;
+
+ SetNodeRunState(node, Running);
+ slot = ExecScan((ScanState *) node,
(ExecScanAccessMtd) SeqNext,
(ExecScanRecheckMtd) SeqRecheck);
+
+ if (TupIsNull(slot))
+ SetNodeRunState(node, Done);
+
+ return slot;
}
/* ----------------------------------------------------------------
@@ -275,6 +283,7 @@ ExecReScanSeqScan(SeqScanState *node)
{
HeapScanDesc scan;
+ SetNodeRunState(node, Inited);
scan = node->ss.ss_currentScanDesc;
if (scan != NULL)
diff --git a/src/backend/executor/nodeSetOp.c b/src/backend/executor/nodeSetOp.c
index 123d051..c248ff3 100644
--- a/src/backend/executor/nodeSetOp.c
+++ b/src/backend/executor/nodeSetOp.c
@@ -603,6 +603,7 @@ ExecEndSetOp(SetOpState *node)
void
ExecReScanSetOp(SetOpState *node)
{
+ SetNodeRunState(node, Inited);
ExecClearTuple(node->ps.ps_ResultTupleSlot);
node->numOutput = 0;
@@ -653,6 +654,4 @@ ExecReScanSetOp(SetOpState *node)
*/
if (node->ps.lefttree->chgParam == NULL)
ExecReScan(node->ps.lefttree);
-
- SetNodeRunState(node, Inited);
}
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index 3ae5b89..a2abec7 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -49,6 +49,8 @@ ExecSort(SortState *node)
SO1_printf("ExecSort: %s\n",
"entering routine");
+ SetNodeRunState(node, Running);
+
estate = node->ss.ps.state;
dir = estate->es_direction;
tuplesortstate = (Tuplesortstate *) node->tuplesortstate;
@@ -138,6 +140,10 @@ ExecSort(SortState *node)
(void) tuplesort_gettupleslot(tuplesortstate,
ScanDirectionIsForward(dir),
slot);
+
+ if (TupIsNull(slot))
+ SetNodeRunState(node, Done);
+
return slot;
}
@@ -282,6 +288,9 @@ ExecSortRestrPos(SortState *node)
if (!node->sort_Done)
return;
+ /* Restoring position in turn restores run state */
+ SetNodeRunState(node, Running);
+
/*
* restore the scan to the previously marked position
*/
@@ -293,6 +302,8 @@ ExecReScanSort(SortState *node)
{
PlanState *outerPlan = outerPlanState(node);
+ SetNodeRunState(node, Inited);
+
/*
* If we haven't sorted yet, just return. If outerplan's chgParam is not
* NULL then it will be re-scanned by ExecProcNode, else no reason to
diff --git a/src/backend/executor/nodeSubqueryscan.c b/src/backend/executor/nodeSubqueryscan.c
index 497d6df..d8799d1 100644
--- a/src/backend/executor/nodeSubqueryscan.c
+++ b/src/backend/executor/nodeSubqueryscan.c
@@ -90,9 +90,17 @@ SubqueryRecheck(SubqueryScanState *node, TupleTableSlot *slot)
TupleTableSlot *
ExecSubqueryScan(SubqueryScanState *node)
{
- return ExecScan(&node->ss,
+ TupleTableSlot *slot;
+
+ SetNodeRunState(node, Running);
+ slot = ExecScan(&node->ss,
(ExecScanAccessMtd) SubqueryNext,
(ExecScanRecheckMtd) SubqueryRecheck);
+
+ if (TupIsNull(slot))
+ SetNodeRunState(node, Done);
+
+ return slot;
}
/* ----------------------------------------------------------------
@@ -199,6 +207,7 @@ ExecEndSubqueryScan(SubqueryScanState *node)
void
ExecReScanSubqueryScan(SubqueryScanState *node)
{
+ SetNodeRunState(node, Inited);
ExecScanReScan(&node->ss);
/*
diff --git a/src/backend/executor/nodeTidscan.c b/src/backend/executor/nodeTidscan.c
index f19e735..724016d 100644
--- a/src/backend/executor/nodeTidscan.c
+++ b/src/backend/executor/nodeTidscan.c
@@ -390,9 +390,17 @@ TidRecheck(TidScanState *node, TupleTableSlot *slot)
TupleTableSlot *
ExecTidScan(TidScanState *node)
{
- return ExecScan(&node->ss,
+ TupleTableSlot *slot;
+
+ SetNodeRunState(node, Running);
+ slot = ExecScan(&node->ss,
(ExecScanAccessMtd) TidNext,
(ExecScanRecheckMtd) TidRecheck);
+
+ if (TupIsNull(slot))
+ SetNodeRunState(node, Done);
+
+ return slot;
}
/* ----------------------------------------------------------------
@@ -402,6 +410,8 @@ ExecTidScan(TidScanState *node)
void
ExecReScanTidScan(TidScanState *node)
{
+ SetNodeRunState(node, Inited);
+
if (node->tss_TidList)
pfree(node->tss_TidList);
node->tss_TidList = NULL;
diff --git a/src/backend/executor/nodeUnique.c b/src/backend/executor/nodeUnique.c
index f259f32..1f7ca10 100644
--- a/src/backend/executor/nodeUnique.c
+++ b/src/backend/executor/nodeUnique.c
@@ -50,6 +50,8 @@ ExecUnique(UniqueState *node)
TupleTableSlot *slot;
PlanState *outerPlan;
+ SetNodeRunState(node, Running);
+
/*
* get information from the node
*/
@@ -71,6 +73,7 @@ ExecUnique(UniqueState *node)
{
/* end of subplan, so we're done */
ExecClearTuple(resultTupleSlot);
+ SetNodeRunState(node, Done);
return NULL;
}
@@ -187,6 +190,8 @@ ExecEndUnique(UniqueState *node)
void
ExecReScanUnique(UniqueState *node)
{
+ SetNodeRunState(node, Inited);
+
/* must clear result tuple so first input tuple is returned */
ExecClearTuple(node->ps.ps_ResultTupleSlot);
diff --git a/src/backend/executor/nodeValuesscan.c b/src/backend/executor/nodeValuesscan.c
index c56199c..48d1ad8 100644
--- a/src/backend/executor/nodeValuesscan.c
+++ b/src/backend/executor/nodeValuesscan.c
@@ -175,9 +175,18 @@ ValuesRecheck(ValuesScanState *node, TupleTableSlot *slot)
TupleTableSlot *
ExecValuesScan(ValuesScanState *node)
{
- return ExecScan(&node->ss,
+ TupleTableSlot *slot;
+
+ /* Advance the state to Running if the node was just initialized */
+ SetNodeRunState(node, Running);
+ slot = ExecScan(&node->ss,
(ExecScanAccessMtd) ValuesNext,
(ExecScanRecheckMtd) ValuesRecheck);
+
+ if (TupIsNull(slot))
+ SetNodeRunState(node, Done);
+
+ return slot;
}
/* ----------------------------------------------------------------
@@ -302,6 +311,7 @@ ExecEndValuesScan(ValuesScanState *node)
void
ExecReScanValuesScan(ValuesScanState *node)
{
+ SetNodeRunState(node, Inited);
ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
ExecScanReScan(&node->ss);
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index 8734e8e..0d127b4 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -2064,6 +2064,7 @@ ExecReScanWindowAgg(WindowAggState *node)
PlanState *outerPlan = outerPlanState(node);
ExprContext *econtext = node->ss.ps.ps_ExprContext;
+ SetNodeRunState(node, Inited);
node->ss.ps.ps_TupFromTlist = false;
node->all_first = true;
@@ -2087,8 +2088,6 @@ ExecReScanWindowAgg(WindowAggState *node)
*/
if (outerPlan->chgParam == NULL)
ExecReScan(outerPlan);
-
- SetNodeRunState(node, Inited);
}
/*
diff --git a/src/backend/executor/nodeWorktablescan.c b/src/backend/executor/nodeWorktablescan.c
index 799e96b..46350ab 100644
--- a/src/backend/executor/nodeWorktablescan.c
+++ b/src/backend/executor/nodeWorktablescan.c
@@ -80,6 +80,10 @@ WorkTableScanRecheck(WorkTableScanState *node, TupleTableSlot *slot)
TupleTableSlot *
ExecWorkTableScan(WorkTableScanState *node)
{
+ TupleTableSlot *slot;
+
+ SetNodeRunState(node, Running);
+
/*
* On the first call, find the ancestor RecursiveUnion's state via the
* Param slot reserved for it. (We can't do this during node init because
@@ -114,9 +118,14 @@ ExecWorkTableScan(WorkTableScanState *node)
ExecAssignScanProjectionInfo(&node->ss);
}
- return ExecScan(&node->ss,
+ slot = ExecScan(&node->ss,
(ExecScanAccessMtd) WorkTableScanNext,
(ExecScanRecheckMtd) WorkTableScanRecheck);
+
+ if (TupIsNull(slot))
+ SetNodeRunState(node, Done);
+
+ return slot;
}
@@ -210,6 +219,7 @@ ExecEndWorkTableScan(WorkTableScanState *node)
void
ExecReScanWorkTableScan(WorkTableScanState *node)
{
+ SetNodeRunState(node, Inited);
ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
ExecScanReScan(&node->ss);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 4527a98..ee706fc 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -355,14 +355,16 @@ typedef enum ExecNodeRunState
ERunState_Done /* No tuple to return */
} ExecNodeRunState;
-#define SetNodeRunState(nd,st) (((PlanState*)nd)->runstate = (ERunState_##st))
+#define SetNodeRunState(nd,st) (((PlanState*)(nd))->runstate = (ERunState_##st))
+#define ExecNode_is(nd,st) (((PlanState*)(nd))->runstate == (ERunState_##st))
+#define ExecNode_is_inited(nd) (ExecNode_is((nd),Inited))
+#define ExecNode_is_running(nd) (ExecNode_is((nd),Running))
+#define ExecNode_is_done(nd) (ExecNode_is((nd),Done))
#define AdvanceNodeRunStateTo(nd,st) \
do {\
- if (((PlanState*)nd)->runstate < (ERunState_##st))\
- ((PlanState*)nd)->runstate = (ERunState_##st);\
+ if (((PlanState*)(nd))->runstate < (ERunState_##st))\
+ ((PlanState*)(nd))->runstate = (ERunState_##st);\
} while(0);
-#define ExecNode_is_running(nd) (((PlanState*)nd)->runstate == ERunState_Running)
-#define ExecNode_is_done(nd) (((PlanState*)nd)->runstate == ERunState_Done)
/* ----------------
* EState information
--
1.8.3.1
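A note on the execnodes.h hunk above, which wraps the macro argument as (nd) before casting. The following is a minimal standalone sketch of why that matters, not code from the patch: DemoSortState and mark_second_done are hypothetical names, PlanState is reduced to the single field the macros touch, and the enum ordering is an assumption (only ERunState_Done is visible in the hunk).

```c
#include <assert.h>
#include <stdbool.h>

/* Assumed ordering; only ERunState_Done appears in the hunk itself. */
typedef enum ExecNodeRunState
{
	ERunState_Inited,
	ERunState_Started,
	ERunState_Running,
	ERunState_Done
} ExecNodeRunState;

/* Stand-in for PlanState: just the field the macros touch. */
typedef struct PlanState
{
	ExecNodeRunState runstate;
} PlanState;

/* Hypothetical concrete node type embedding PlanState as first member. */
typedef struct DemoSortState
{
	PlanState	ps;
	int			padding[4];	/* sizeof(DemoSortState) > sizeof(PlanState) */
} DemoSortState;

/* The parenthesized forms, as in the "+" lines of the hunk. */
#define SetNodeRunState(nd,st) (((PlanState*)(nd))->runstate = (ERunState_##st))
#define ExecNode_is(nd,st) (((PlanState*)(nd))->runstate == (ERunState_##st))

/*
 * Mark the second node of a two-element array Done, passing a pointer
 * expression rather than a plain variable.
 *
 * With the old "(PlanState*)nd" spelling the cast would bind to "nodes"
 * alone, and "+ 1" would then advance by sizeof(PlanState) instead of
 * sizeof(DemoSortState), addressing the wrong memory.
 */
bool
mark_second_done(DemoSortState *nodes)
{
	assert(nodes != 0);

	SetNodeRunState(nodes + 1, Done);

	return ExecNode_is(nodes + 1, Done) && !ExecNode_is(nodes, Done);
}
```

Every caller in these patches passes a plain variable, so the old spelling probably never misbehaved in practice. The parentheses only make the macros safe for arbitrary argument expressions.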
0003-Add-a-feature-to-start-node-asynchronously.patch (text/x-patch; charset=us-ascii)
From bee3f8a971a1c86a7c53929a17e358a763348193 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 9 Jul 2015 19:34:15 +0900
Subject: [PATCH 3/5] Add a feature to start node asynchronously
Add a feature to start child nodes asynchronously in join nodes and
Append/MergeAppend nodes. At this point, no node can be started
asynchronously so the behavior is not changed.
---
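As a rough illustration of the propagation rule the Start* functions in this patch share, here is a simplified standalone model. It is a sketch under assumptions, not the executor code: Node, RunState, can_start_async and start_node are hypothetical names, and a leaf flagged can_start_async stands in for a node such as Gather that can really begin work early.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical run states, mirroring Inited/Started/Running/Done. */
typedef enum RunState
{
	RS_Inited,
	RS_Started,
	RS_Running,
	RS_Done
} RunState;

/* Hypothetical plan-state node with at most two children. */
typedef struct Node
{
	RunState	runstate;
	bool		can_start_async;	/* leaf that can begin work early */
	struct Node *children[2];
	int			nchildren;
} Node;

/*
 * Try to start a node asynchronously.
 *
 * Returns true if this node, or anything underneath it, was started.
 * A node that is not freshly initialized is left alone.
 */
bool
start_node(Node *node)
{
	bool		async = false;
	int			i;

	assert(node != NULL);

	if (node->runstate != RS_Inited)
		return false;

	if (node->nchildren == 0)
		async = node->can_start_async;
	else
	{
		/* "|=" does not short-circuit, so every child gets its chance. */
		for (i = 0; i < node->nchildren; i++)
			async |= start_node(node->children[i]);
	}

	if (async)
		node->runstate = RS_Started;

	return async;
}
```

In this model a join over one startable child and one plain scan ends up Started, the scan stays Inited, and a second start_node call on the join is refused.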
src/backend/executor/execProcnode.c | 106 ++++++++++++++++++++++++
src/backend/executor/nodeAgg.c | 22 +++++
src/backend/executor/nodeAppend.c | 29 +++++++
src/backend/executor/nodeCtescan.c | 21 +++++
src/backend/executor/nodeGather.c | 141 +++++++++++++++++++-------------
src/backend/executor/nodeGroup.c | 22 +++++
src/backend/executor/nodeHash.c | 22 +++++
src/backend/executor/nodeHashjoin.c | 54 ++++++++++++
src/backend/executor/nodeLimit.c | 22 +++++
src/backend/executor/nodeLockRows.c | 22 +++++
src/backend/executor/nodeMaterial.c | 23 ++++++
src/backend/executor/nodeMergeAppend.c | 30 +++++++
src/backend/executor/nodeMergejoin.c | 29 +++++++
src/backend/executor/nodeNestloop.c | 34 ++++++++
src/backend/executor/nodeResult.c | 24 ++++++
src/backend/executor/nodeSetOp.c | 22 +++++
src/backend/executor/nodeSort.c | 22 +++++
src/backend/executor/nodeSubqueryscan.c | 22 +++++
src/backend/executor/nodeUnique.c | 22 +++++
src/backend/executor/nodeWindowAgg.c | 22 +++++
src/include/executor/executor.h | 1 +
src/include/executor/nodeAgg.h | 1 +
src/include/executor/nodeAppend.h | 1 +
src/include/executor/nodeCtescan.h | 1 +
src/include/executor/nodeGather.h | 1 +
src/include/executor/nodeGroup.h | 1 +
src/include/executor/nodeHash.h | 1 +
src/include/executor/nodeHashjoin.h | 1 +
src/include/executor/nodeLimit.h | 1 +
src/include/executor/nodeLockRows.h | 1 +
src/include/executor/nodeMaterial.h | 1 +
src/include/executor/nodeMergeAppend.h | 1 +
src/include/executor/nodeMergejoin.h | 1 +
src/include/executor/nodeNestloop.h | 1 +
src/include/executor/nodeResult.h | 1 +
src/include/executor/nodeSetOp.h | 1 +
src/include/executor/nodeSort.h | 1 +
src/include/executor/nodeSubqueryscan.h | 1 +
src/include/executor/nodeUnique.h | 1 +
src/include/executor/nodeWindowAgg.h | 1 +
40 files changed, 672 insertions(+), 59 deletions(-)
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 6f5c554..a9d973d 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -786,6 +786,112 @@ ExecEndNode(PlanState *node)
}
/*
+ * StartProcNode - asynchronously execute the nodes underneath if possible
+ *
+ * Returns true if the node has been started asynchronously. Some child nodes
+ * may have been started even if this returns false.
+ */
+bool
+StartProcNode(PlanState *node)
+{
+ /*
+ * Refuse a duplicate start. This happens for skipped children on rescan of
+ * nodes such as MergeAppend.
+ */
+ if (node->runstate > ERunState_Started)
+ return false;
+
+ switch (nodeTag(node))
+ {
+ case T_ResultState:
+ return StartResult((ResultState *)node);
+
+ case T_AppendState:
+ return StartAppend((AppendState *)node);
+
+ case T_MergeAppendState:
+ return StartMergeAppend((MergeAppendState *)node);
+
+ case T_SubqueryScanState:
+ return StartSubqueryScan((SubqueryScanState *)node);
+
+ case T_CteScanState:
+ return StartCteScan((CteScanState *)node);
+
+ /*
+ * join nodes
+ */
+ case T_NestLoopState:
+ return StartNestLoop((NestLoopState *)node);
+
+ case T_MergeJoinState:
+ return StartMergeJoin((MergeJoinState *)node);
+
+ case T_HashJoinState:
+ return StartHashJoin((HashJoinState *)node);
+
+ /*
+ * materialization nodes
+ */
+ case T_MaterialState:
+ return StartMaterial((MaterialState *)node);
+
+ case T_SortState:
+ return StartSort((SortState *)node);
+
+ case T_GroupState:
+ return StartGroup((GroupState *)node);
+
+ case T_AggState:
+ return StartAgg((AggState *)node);
+
+ case T_WindowAggState:
+ return StartWindowAgg((WindowAggState *)node);
+
+ case T_UniqueState:
+ return StartUnique((UniqueState *)node);
+
+ case T_HashState:
+ return StartHash((HashState *)node);
+
+ case T_SetOpState:
+ return StartSetOp((SetOpState *)node);
+
+ case T_LockRowsState:
+ return StartLockRows((LockRowsState *)node);
+
+ case T_LimitState:
+ return StartLimit((LimitState *)node);
+
+ case T_GatherState:
+ return StartGather((GatherState *)node);
+
+ /* These nodes cannot run asynchronously */
+ case T_ForeignScanState:
+ case T_WorkTableScanState:
+ case T_CustomScanState:
+ case T_FunctionScanState:
+ case T_ValuesScanState:
+ case T_SeqScanState:
+ case T_SampleScanState:
+ case T_IndexScanState:
+ case T_IndexOnlyScanState:
+ case T_BitmapIndexScanState:
+ case T_BitmapHeapScanState:
+ case T_TidScanState:
+ case T_ModifyTableState:
+ case T_RecursiveUnionState:
+ case T_BitmapAndState:
+ case T_BitmapOrState:
+ return false;
+
+ default:
+ elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
+ break;
+ }
+}
+
+/*
* ExecShutdownNode
*
* Give execution nodes a chance to stop asynchronous resource consumption
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index ed29e3a..a40cb48 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -1569,6 +1569,28 @@ ExecAgg(AggState *node)
}
/*
+ * StartAgg - Try asynchronous execution of this node
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ */
+bool
+StartAgg(AggState *node)
+{
+ if (!ExecNode_is_inited(node))
+ return false;
+
+ if (StartProcNode(outerPlanState(node)))
+ {
+ SetNodeRunState(node, Started);
+ return true;
+ }
+
+ return false;
+}
+
+
+/*
* ExecAgg for non-hashed case
*/
static TupleTableSlot *
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 03b3b66..2b918b2 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -194,6 +194,10 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
TupleTableSlot *
ExecAppend(AppendState *node)
{
+ /* start child nodes asynchronously if possible */
+ if (ExecNode_is_inited(node))
+ StartAppend(node);
+
SetNodeRunState(node, Running);
for (;;)
@@ -241,6 +245,31 @@ ExecAppend(AppendState *node)
}
/* ----------------------------------------------------------------
+ * StartAppend
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartAppend(AppendState *node)
+{
+ int i;
+ bool async = false;
+
+ if (!ExecNode_is_inited(node))
+ return false;
+
+ for (i = 0 ; i < node->as_nplans ; i++)
+ async |= StartProcNode(node->appendplans[i]);
+
+ if (async)
+ SetNodeRunState(node, Started);
+
+ return async;
+}
+
+/* ----------------------------------------------------------------
* ExecEndAppend
*
* Shuts down the subscans of the append node.
diff --git a/src/backend/executor/nodeCtescan.c b/src/backend/executor/nodeCtescan.c
index d237370..cae0aca 100644
--- a/src/backend/executor/nodeCtescan.c
+++ b/src/backend/executor/nodeCtescan.c
@@ -166,6 +166,27 @@ ExecCteScan(CteScanState *node)
return slot;
}
+/* ----------------------------------------------------------------
+ * StartCteScan
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartCteScan(CteScanState *node)
+{
+ if (!ExecNode_is_inited(node))
+ return false;
+
+ if (StartProcNode(node->cteplanstate))
+ {
+ SetNodeRunState(node, Started);
+ return true;
+ }
+
+ return false;
+}
/* ----------------------------------------------------------------
* ExecInitCteScan
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index aceb358..5364acb 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -69,6 +69,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
gatherstate->ps.state = estate;
SetNodeRunState(gatherstate, Inited);
gatherstate->need_to_scan_locally = !node->single_copy;
+ SetNodeRunState(gatherstate, Inited);
/*
* Miscellaneous initialization
@@ -119,23 +120,24 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
}
/* ----------------------------------------------------------------
- * ExecGather(node)
+ * StartGather
*
- * Scans the relation via multiple workers and returns
- * the next qualifying tuple.
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
* ----------------------------------------------------------------
*/
-TupleTableSlot *
-ExecGather(GatherState *node)
+bool
+StartGather(GatherState *node)
{
+ EState *estate = node->ps.state;
+ Gather *gather = (Gather *) node->ps.plan;
TupleTableSlot *fslot = node->funnel_slot;
- int i;
- TupleTableSlot *slot;
- TupleTableSlot *resultSlot;
- ExprDoneCond isDone;
- ExprContext *econtext;
+ int i;
- SetNodeRunState(node, Running);
+ if (!ExecNode_is_inited(node))
+ return false;
+
+ SetNodeRunState(node, Started);
/*
* Initialize the parallel context and workers on first execution. We do
@@ -143,66 +145,85 @@ ExecGather(GatherState *node)
* needs to allocate large dynamic segement, so it is better to do if it
* is really needed.
*/
- if (ExecNode_is_inited(node))
+
+ /*
+ * Sometimes we might have to run without parallelism; but if
+ * parallel mode is active then we can try to fire up some workers.
+ */
+ if (gather->num_workers > 0 && IsInParallelMode())
{
- EState *estate = node->ps.state;
- Gather *gather = (Gather *) node->ps.plan;
+ ParallelContext *pcxt;
+ bool got_any_worker = false;
+
+ /* Initialize the workers required to execute Gather node. */
+ if (!node->pei)
+ node->pei = ExecInitParallelPlan(node->ps.lefttree,
+ estate,
+ gather->num_workers);
/*
- * Sometimes we might have to run without parallelism; but if
- * parallel mode is active then we can try to fire up some workers.
+ * Register backend workers. We might not get as many as we
+ * requested, or indeed any at all.
*/
- if (gather->num_workers > 0 && IsInParallelMode())
- {
- ParallelContext *pcxt;
- bool got_any_worker = false;
+ pcxt = node->pei->pcxt;
+ LaunchParallelWorkers(pcxt);
- /* Initialize the workers required to execute Gather node. */
- if (!node->pei)
- node->pei = ExecInitParallelPlan(node->ps.lefttree,
- estate,
- gather->num_workers);
-
- /*
- * Register backend workers. We might not get as many as we
- * requested, or indeed any at all.
- */
- pcxt = node->pei->pcxt;
- LaunchParallelWorkers(pcxt);
+ /* Set up tuple queue readers to read the results. */
+ if (pcxt->nworkers > 0)
+ {
+ node->nreaders = 0;
+ node->reader =
+ palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
- /* Set up tuple queue readers to read the results. */
- if (pcxt->nworkers > 0)
+ for (i = 0; i < pcxt->nworkers; ++i)
{
- node->nreaders = 0;
- node->reader =
- palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
-
- for (i = 0; i < pcxt->nworkers; ++i)
- {
- if (pcxt->worker[i].bgwhandle == NULL)
- continue;
-
- shm_mq_set_handle(node->pei->tqueue[i],
- pcxt->worker[i].bgwhandle);
- node->reader[node->nreaders++] =
- CreateTupleQueueReader(node->pei->tqueue[i],
- fslot->tts_tupleDescriptor);
- got_any_worker = true;
- }
+ if (pcxt->worker[i].bgwhandle == NULL)
+ continue;
+
+ shm_mq_set_handle(node->pei->tqueue[i],
+ pcxt->worker[i].bgwhandle);
+ node->reader[node->nreaders++] =
+ CreateTupleQueueReader(node->pei->tqueue[i],
+ fslot->tts_tupleDescriptor);
+ got_any_worker = true;
}
-
- /* No workers? Then never mind. */
- if (!got_any_worker)
- ExecShutdownGatherWorkers(node);
}
- /* Run plan locally if no workers or not single-copy. */
- node->need_to_scan_locally = (node->reader == NULL)
- || !gather->single_copy;
-
- SetNodeRunState(node, Running);
+ /* No workers? Then never mind. */
+ if (!got_any_worker)
+ ExecShutdownGatherWorkers(node);
}
+ /* Run plan locally if no workers or not single-copy. */
+ node->need_to_scan_locally = (node->reader == NULL)
+ || !gather->single_copy;
+
+ /* Plans on workers are always executed asynchronously */
+ SetNodeRunState(node, Started);
+ return true;
+}
+
+/* ----------------------------------------------------------------
+ * ExecGather(node)
+ *
+ * Scans the relation via multiple workers and returns
+ * the next qualifying tuple.
+ * ----------------------------------------------------------------
+ */
+TupleTableSlot *
+ExecGather(GatherState *node)
+{
+ TupleTableSlot *slot;
+ TupleTableSlot *resultSlot;
+ ExprDoneCond isDone;
+ ExprContext *econtext;
+
+ /* Execute children asynchronously if possible */
+ if (ExecNode_is_inited(node))
+ StartGather(node);
+
+ SetNodeRunState(node, Running);
+
/*
* Check to see if we're still projecting out tuples from a previous scan
* tuple (because there is a function-returning-set in the projection
@@ -462,6 +483,8 @@ ExecReScanGather(GatherState *node)
*/
ExecShutdownGatherWorkers(node);
+ SetNodeRunState(node, Inited);
+
if (node->pei)
ExecParallelReinitialize(node->pei);
diff --git a/src/backend/executor/nodeGroup.c b/src/backend/executor/nodeGroup.c
index a593d9f..ea947b9 100644
--- a/src/backend/executor/nodeGroup.c
+++ b/src/backend/executor/nodeGroup.c
@@ -190,6 +190,28 @@ ExecGroup(GroupState *node)
}
/* -----------------
+ * StartGroup
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * -----------------
+ */
+bool
+StartGroup(GroupState *node)
+{
+ if (!ExecNode_is_inited(node))
+ return false;
+
+ if (StartProcNode(outerPlanState(node)))
+ {
+ SetNodeRunState(node, Started);
+ return true;
+ }
+
+ return false;
+}
+
+/* -----------------
* ExecInitGroup
*
* Creates the run-time information for the group node produced by the
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 5a71fe3..e04a78f 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -157,6 +157,28 @@ MultiExecHash(HashState *node)
}
/* ----------------------------------------------------------------
+ * StartHash
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartHash(HashState *node)
+{
+ if (!ExecNode_is_inited(node))
+ return false;
+
+ if (StartProcNode(outerPlanState(node)))
+ {
+ SetNodeRunState(node, Started);
+ return true;
+ }
+
+ return false;
+}
+
+/* ----------------------------------------------------------------
* ExecInitHash
*
* Init routine for Hash node
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index dbaabc4..ada9290 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -72,6 +72,10 @@ ExecHashJoin(HashJoinState *node)
uint32 hashvalue;
int batchno;
+ /* Try to start asynchronously */
+ if (ExecNode_is_inited(node))
+ StartHashJoin(node);
+
SetNodeRunState(node, Running);
/*
@@ -435,6 +439,56 @@ ExecHashJoin(HashJoinState *node)
}
/* ----------------------------------------------------------------
+ * StartHashJoin
+ *
+ * This function behaves a bit differently from the Start* functions of other
+ * nodes, because of the way ExecHashJoin works.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartHashJoin(HashJoinState *node)
+{
+ PlanState *outerNode = outerPlanState(node);
+ HashState *hashNode = (HashState *) innerPlanState(node);
+ bool async;
+
+ if (!ExecNode_is_inited(node))
+ return false;
+
+ async = StartProcNode(outerNode);
+
+ /*
+ * This condition is the same as the one that checks whether the inner hash
+ * is needed, at HJ_BUILD_HASHTABLE in ExecHashJoin.
+ */
+ if (!HJ_FILL_INNER(node) &&
+ (HJ_FILL_OUTER(node) ||
+ (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
+ !node->hj_OuterNotEmpty)))
+ {
+ /*
+ * The first tuple of the outer plan is needed to decide whether the
+ * inner hash is necessary, so don't start the inner plan here.
+ * The condition that brings us here depends on the costs of outer
+ * startup and hash creation, and asynchronous execution will upset
+ * that balance.
+ * Still, we continue to rely on this formula for now, for lack of
+ * an appropriate alternative.
+ }
+ else
+ {
+ /* Hash will be created. Start the inner node. */
+ async |= StartProcNode((PlanState *)hashNode);
+ }
+
+ if (async)
+ SetNodeRunState(node, Started);
+
+ return async;
+}
+
+/* ----------------------------------------------------------------
* ExecInitHashJoin
*
* Init routine for HashJoin node.
diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c
index e59d71f..21b2c37 100644
--- a/src/backend/executor/nodeLimit.c
+++ b/src/backend/executor/nodeLimit.c
@@ -243,6 +243,28 @@ ExecLimit(LimitState *node)
return slot;
}
+/* ----------------------------------------------------------------
+ * StartLimit
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartLimit(LimitState *node)
+{
+ if (!ExecNode_is_inited(node))
+ return false;
+
+ if (StartProcNode(outerPlanState(node)))
+ {
+ SetNodeRunState(node, Started);
+ return true;
+ }
+
+ return false;
+}
+
/*
* Evaluate the limit/offset expressions --- done at startup or rescan.
*
diff --git a/src/backend/executor/nodeLockRows.c b/src/backend/executor/nodeLockRows.c
index 2ccf05d..e741a93 100644
--- a/src/backend/executor/nodeLockRows.c
+++ b/src/backend/executor/nodeLockRows.c
@@ -347,6 +347,28 @@ lnext:
}
/* ----------------------------------------------------------------
+ * StartLockRows
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartLockRows(LockRowsState *node)
+{
+ if (!ExecNode_is_inited(node))
+ return false;
+
+ if (StartProcNode(outerPlanState(node)))
+ {
+ SetNodeRunState(node, Started);
+ return true;
+ }
+
+ return false;
+}
+
+/* ----------------------------------------------------------------
* ExecInitLockRows
*
* This initializes the LockRows node state structures and
diff --git a/src/backend/executor/nodeMaterial.c b/src/backend/executor/nodeMaterial.c
index 981398a..4e41c1c 100644
--- a/src/backend/executor/nodeMaterial.c
+++ b/src/backend/executor/nodeMaterial.c
@@ -160,6 +160,28 @@ ExecMaterial(MaterialState *node)
}
/* ----------------------------------------------------------------
+ * StartMaterial
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartMaterial(MaterialState *node)
+{
+ if (!ExecNode_is_inited(node))
+ return false;
+
+ if (StartProcNode(outerPlanState(node)))
+ {
+ SetNodeRunState(node, Started);
+ return true;
+ }
+
+ return false;
+}
+
+/* ----------------------------------------------------------------
* ExecInitMaterial
* ----------------------------------------------------------------
*/
@@ -330,6 +352,7 @@ ExecReScanMaterial(MaterialState *node)
SetNodeRunState(node, Inited);
ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
+ SetNodeRunState(node, Inited);
if (node->eflags != 0)
{
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index 4678d7c..ab6c304 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -170,6 +170,10 @@ ExecMergeAppend(MergeAppendState *node)
TupleTableSlot *result;
SlotNumber i;
+ /* start child nodes asynchronously if possible */
+ if (ExecNode_is_inited(node))
+ StartMergeAppend(node);
+
SetNodeRunState(node, Running);
if (!node->ms_initialized)
@@ -220,6 +224,32 @@ ExecMergeAppend(MergeAppendState *node)
return result;
}
+/* ----------------------------------------------------------------
+ * StartMergeAppend
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartMergeAppend(MergeAppendState *node)
+{
+ int i;
+ bool async = false;
+
+ if (!ExecNode_is_inited(node))
+ return false;
+
+ for (i = 0 ; i < node->ms_nplans ; i++)
+ async |= StartProcNode(node->mergeplans[i]);
+
+ if (async)
+ SetNodeRunState(node, Started);
+
+ return async;
+}
+
+
/*
* Compare the tuples in the two given slots.
*/
diff --git a/src/backend/executor/nodeMergejoin.c b/src/backend/executor/nodeMergejoin.c
index 74ceaa2..32bd8a5 100644
--- a/src/backend/executor/nodeMergejoin.c
+++ b/src/backend/executor/nodeMergejoin.c
@@ -630,6 +630,10 @@ ExecMergeJoin(MergeJoinState *node)
bool doFillOuter;
bool doFillInner;
+ /* Execute children asynchronously if possible */
+ if (ExecNode_is_inited(node))
+ StartMergeJoin(node);
+
SetNodeRunState(node, Running);
/*
@@ -1475,6 +1479,31 @@ ExecMergeJoin(MergeJoinState *node)
}
/* ----------------------------------------------------------------
+ * StartMergeJoin
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartMergeJoin(MergeJoinState *node)
+{
+ bool async;
+
+ if (!ExecNode_is_inited(node))
+ return false;
+
+ /* Merge join can unconditionally start child nodes asynchronously */
+ async = StartProcNode(innerPlanState(node));
+ async |= StartProcNode(outerPlanState(node));
+
+ if (async)
+ SetNodeRunState(node, Started);
+
+ return async;
+}
+
+/* ----------------------------------------------------------------
* ExecInitMergeJoin
* ----------------------------------------------------------------
*/
diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c
index ae69176..cb95a3d 100644
--- a/src/backend/executor/nodeNestloop.c
+++ b/src/backend/executor/nodeNestloop.c
@@ -69,6 +69,10 @@ ExecNestLoop(NestLoopState *node)
ExprContext *econtext;
ListCell *lc;
+ /* Execute children asynchronously if possible */
+ if (ExecNode_is_inited(node))
+ StartNestLoop(node);
+
SetNodeRunState(node, Running);
/*
@@ -292,6 +296,36 @@ ExecNestLoop(NestLoopState *node)
}
/* ----------------------------------------------------------------
+ * StartNestLoop
+ *
+ * The inner plan of nest loop won't be executed asynchronously if it is
+ * parameterized.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartNestLoop(NestLoopState *node)
+{
+ NestLoop *nl = (NestLoop *) node->js.ps.plan;
+ bool async;
+
+ if (!ExecNode_is_inited(node))
+ return false;
+
+ /* Always try async execution of outer plan */
+ async = StartProcNode(outerPlanState(node));
+
+ /* This inner node cannot be asynchronous if it is parameterized */
+ if (list_length(nl->nestParams) < 1)
+ async |= StartProcNode(innerPlanState(node));
+
+ if (async)
+ SetNodeRunState(node, Started);
+
+ return async;
+}
+
+/* ----------------------------------------------------------------
* ExecInitNestLoop
* ----------------------------------------------------------------
*/
diff --git a/src/backend/executor/nodeResult.c b/src/backend/executor/nodeResult.c
index ec81eda..c33443a 100644
--- a/src/backend/executor/nodeResult.c
+++ b/src/backend/executor/nodeResult.c
@@ -171,6 +171,30 @@ ExecResult(ResultState *node)
}
/* ----------------------------------------------------------------
+ * StartResult
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartResult(ResultState * node)
+{
+ PlanState *subnode = outerPlanState(node);
+
+ if (!ExecNode_is_inited(node))
+ return false;
+
+ if (subnode && StartProcNode(subnode))
+ {
+ SetNodeRunState(node, Started);
+ return true;
+ }
+
+ return false;
+}
+
+/* ----------------------------------------------------------------
* ExecResultMarkPos
* ----------------------------------------------------------------
*/
diff --git a/src/backend/executor/nodeSetOp.c b/src/backend/executor/nodeSetOp.c
index c248ff3..a0dbec6 100644
--- a/src/backend/executor/nodeSetOp.c
+++ b/src/backend/executor/nodeSetOp.c
@@ -225,6 +225,28 @@ ExecSetOp(SetOpState *node)
return setop_retrieve_direct(node);
}
+/* ----------------------------------------------------------------
+ * StartSetOp
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartSetOp(SetOpState *node)
+{
+ if (!ExecNode_is_inited(node))
+ return false;
+
+ if (StartProcNode(outerPlanState(node)))
+ {
+ SetNodeRunState(node, Started);
+ return true;
+ }
+
+ return false;
+}
+
/*
* ExecSetOp for non-hashed case
*/
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index a2abec7..f0c9a63 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -148,6 +148,28 @@ ExecSort(SortState *node)
}
/* ----------------------------------------------------------------
+ * StartSort
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartSort(SortState *node)
+{
+ if (!ExecNode_is_inited(node))
+ return false;
+
+ if (StartProcNode(outerPlanState(node)))
+ {
+ SetNodeRunState(node, Started);
+ return true;
+ }
+
+ return false;
+}
+
+/* ----------------------------------------------------------------
* ExecInitSort
*
* Creates the run-time state information for the sort node
diff --git a/src/backend/executor/nodeSubqueryscan.c b/src/backend/executor/nodeSubqueryscan.c
index d8799d1..4899d93 100644
--- a/src/backend/executor/nodeSubqueryscan.c
+++ b/src/backend/executor/nodeSubqueryscan.c
@@ -104,6 +104,28 @@ ExecSubqueryScan(SubqueryScanState *node)
}
/* ----------------------------------------------------------------
+ * StartSubqueryScan
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartSubqueryScan(SubqueryScanState *node)
+{
+ if (!ExecNode_is_inited(node))
+ return false;
+
+ if (StartProcNode(node->subplan))
+ {
+ SetNodeRunState(node, Started);
+ return true;
+ }
+
+ return false;
+}
+
+/* ----------------------------------------------------------------
* ExecInitSubqueryScan
* ----------------------------------------------------------------
*/
diff --git a/src/backend/executor/nodeUnique.c b/src/backend/executor/nodeUnique.c
index 1f7ca10..53da967 100644
--- a/src/backend/executor/nodeUnique.c
+++ b/src/backend/executor/nodeUnique.c
@@ -105,6 +105,28 @@ ExecUnique(UniqueState *node)
}
/* ----------------------------------------------------------------
+ * StartUnique
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartUnique(UniqueState *node)
+{
+ if (!ExecNode_is_inited(node))
+ return false;
+
+ if (StartProcNode(outerPlanState(node)))
+ {
+ SetNodeRunState(node, Started);
+ return true;
+ }
+
+ return false;
+}
+
+/* ----------------------------------------------------------------
* ExecInitUnique
*
* This initializes the unique node state structures and
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index 0d127b4..6327d55 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -1760,6 +1760,28 @@ restart:
}
/* -----------------
+ * StartWindowAgg
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * -----------------
+ */
+bool
+StartWindowAgg(WindowAggState *node)
+{
+ if (!ExecNode_is_inited(node))
+ return false;
+
+ if (StartProcNode(outerPlanState(node)))
+ {
+ SetNodeRunState(node, Started);
+ return true;
+ }
+
+ return false;
+}
+
+/* -----------------
* ExecInitWindowAgg
*
* Creates the run-time information for the WindowAgg node produced by the
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 4f77692..230c9af 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -223,6 +223,7 @@ extern void EvalPlanQualEnd(EPQState *epqstate);
*/
extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
extern TupleTableSlot *ExecProcNode(PlanState *node);
+extern bool StartProcNode(PlanState *node);
extern Node *MultiExecProcNode(PlanState *node);
extern void ExecEndNode(PlanState *node);
extern bool ExecShutdownNode(PlanState *node);
diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h
index fe3b81a..7fb0a6f 100644
--- a/src/include/executor/nodeAgg.h
+++ b/src/include/executor/nodeAgg.h
@@ -18,6 +18,7 @@
extern AggState *ExecInitAgg(Agg *node, EState *estate, int eflags);
extern TupleTableSlot *ExecAgg(AggState *node);
+extern bool StartAgg(AggState *node);
extern void ExecEndAgg(AggState *node);
extern void ExecReScanAgg(AggState *node);
diff --git a/src/include/executor/nodeAppend.h b/src/include/executor/nodeAppend.h
index f2d920b..d77b70e 100644
--- a/src/include/executor/nodeAppend.h
+++ b/src/include/executor/nodeAppend.h
@@ -18,6 +18,7 @@
extern AppendState *ExecInitAppend(Append *node, EState *estate, int eflags);
extern TupleTableSlot *ExecAppend(AppendState *node);
+extern bool StartAppend(AppendState *node);
extern void ExecEndAppend(AppendState *node);
extern void ExecReScanAppend(AppendState *node);
diff --git a/src/include/executor/nodeCtescan.h b/src/include/executor/nodeCtescan.h
index 369dafa..e418786 100644
--- a/src/include/executor/nodeCtescan.h
+++ b/src/include/executor/nodeCtescan.h
@@ -18,6 +18,7 @@
extern CteScanState *ExecInitCteScan(CteScan *node, EState *estate, int eflags);
extern TupleTableSlot *ExecCteScan(CteScanState *node);
+extern bool StartCteScan(CteScanState *node);
extern void ExecEndCteScan(CteScanState *node);
extern void ExecReScanCteScan(CteScanState *node);
diff --git a/src/include/executor/nodeGather.h b/src/include/executor/nodeGather.h
index 9e5d8fc..e7cbe21 100644
--- a/src/include/executor/nodeGather.h
+++ b/src/include/executor/nodeGather.h
@@ -17,6 +17,7 @@
#include "nodes/execnodes.h"
extern GatherState *ExecInitGather(Gather *node, EState *estate, int eflags);
+extern bool StartGather(GatherState *node);
extern TupleTableSlot *ExecGather(GatherState *node);
extern void ExecEndGather(GatherState *node);
extern void ExecShutdownGather(GatherState *node);
diff --git a/src/include/executor/nodeGroup.h b/src/include/executor/nodeGroup.h
index 3485fe8..bfc75cd 100644
--- a/src/include/executor/nodeGroup.h
+++ b/src/include/executor/nodeGroup.h
@@ -18,6 +18,7 @@
extern GroupState *ExecInitGroup(Group *node, EState *estate, int eflags);
extern TupleTableSlot *ExecGroup(GroupState *node);
+extern bool StartGroup(GroupState *node);
extern void ExecEndGroup(GroupState *node);
extern void ExecReScanGroup(GroupState *node);
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index acc28438..b0855d3 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -19,6 +19,7 @@
extern HashState *ExecInitHash(Hash *node, EState *estate, int eflags);
extern TupleTableSlot *ExecHash(HashState *node);
extern Node *MultiExecHash(HashState *node);
+extern bool StartHash(HashState *node);
extern void ExecEndHash(HashState *node);
extern void ExecReScanHash(HashState *node);
diff --git a/src/include/executor/nodeHashjoin.h b/src/include/executor/nodeHashjoin.h
index c35a51c..826f639 100644
--- a/src/include/executor/nodeHashjoin.h
+++ b/src/include/executor/nodeHashjoin.h
@@ -19,6 +19,7 @@
extern HashJoinState *ExecInitHashJoin(HashJoin *node, EState *estate, int eflags);
extern TupleTableSlot *ExecHashJoin(HashJoinState *node);
+extern bool StartHashJoin(HashJoinState *node);
extern void ExecEndHashJoin(HashJoinState *node);
extern void ExecReScanHashJoin(HashJoinState *node);
diff --git a/src/include/executor/nodeLimit.h b/src/include/executor/nodeLimit.h
index 44f2936..5e8d2ea 100644
--- a/src/include/executor/nodeLimit.h
+++ b/src/include/executor/nodeLimit.h
@@ -18,6 +18,7 @@
extern LimitState *ExecInitLimit(Limit *node, EState *estate, int eflags);
extern TupleTableSlot *ExecLimit(LimitState *node);
+extern bool StartLimit(LimitState *node);
extern void ExecEndLimit(LimitState *node);
extern void ExecReScanLimit(LimitState *node);
diff --git a/src/include/executor/nodeLockRows.h b/src/include/executor/nodeLockRows.h
index 41764a1..c450233 100644
--- a/src/include/executor/nodeLockRows.h
+++ b/src/include/executor/nodeLockRows.h
@@ -18,6 +18,7 @@
extern LockRowsState *ExecInitLockRows(LockRows *node, EState *estate, int eflags);
extern TupleTableSlot *ExecLockRows(LockRowsState *node);
+extern bool StartLockRows(LockRowsState *node);
extern void ExecEndLockRows(LockRowsState *node);
extern void ExecReScanLockRows(LockRowsState *node);
diff --git a/src/include/executor/nodeMaterial.h b/src/include/executor/nodeMaterial.h
index cfb7a13..0392d29 100644
--- a/src/include/executor/nodeMaterial.h
+++ b/src/include/executor/nodeMaterial.h
@@ -18,6 +18,7 @@
extern MaterialState *ExecInitMaterial(Material *node, EState *estate, int eflags);
extern TupleTableSlot *ExecMaterial(MaterialState *node);
+extern bool StartMaterial(MaterialState *node);
extern void ExecEndMaterial(MaterialState *node);
extern void ExecMaterialMarkPos(MaterialState *node);
extern void ExecMaterialRestrPos(MaterialState *node);
diff --git a/src/include/executor/nodeMergeAppend.h b/src/include/executor/nodeMergeAppend.h
index 3c5068c..2f637dc 100644
--- a/src/include/executor/nodeMergeAppend.h
+++ b/src/include/executor/nodeMergeAppend.h
@@ -18,6 +18,7 @@
extern MergeAppendState *ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags);
extern TupleTableSlot *ExecMergeAppend(MergeAppendState *node);
+extern bool StartMergeAppend(MergeAppendState *node);
extern void ExecEndMergeAppend(MergeAppendState *node);
extern void ExecReScanMergeAppend(MergeAppendState *node);
diff --git a/src/include/executor/nodeMergejoin.h b/src/include/executor/nodeMergejoin.h
index bee5367..ead6898 100644
--- a/src/include/executor/nodeMergejoin.h
+++ b/src/include/executor/nodeMergejoin.h
@@ -18,6 +18,7 @@
extern MergeJoinState *ExecInitMergeJoin(MergeJoin *node, EState *estate, int eflags);
extern TupleTableSlot *ExecMergeJoin(MergeJoinState *node);
+extern bool StartMergeJoin(MergeJoinState *node);
extern void ExecEndMergeJoin(MergeJoinState *node);
extern void ExecReScanMergeJoin(MergeJoinState *node);
diff --git a/src/include/executor/nodeNestloop.h b/src/include/executor/nodeNestloop.h
index ff0720f..f79a002 100644
--- a/src/include/executor/nodeNestloop.h
+++ b/src/include/executor/nodeNestloop.h
@@ -18,6 +18,7 @@
extern NestLoopState *ExecInitNestLoop(NestLoop *node, EState *estate, int eflags);
extern TupleTableSlot *ExecNestLoop(NestLoopState *node);
+extern bool StartNestLoop(NestLoopState *node);
extern void ExecEndNestLoop(NestLoopState *node);
extern void ExecReScanNestLoop(NestLoopState *node);
diff --git a/src/include/executor/nodeResult.h b/src/include/executor/nodeResult.h
index 17a7bb6..84b375d 100644
--- a/src/include/executor/nodeResult.h
+++ b/src/include/executor/nodeResult.h
@@ -18,6 +18,7 @@
extern ResultState *ExecInitResult(Result *node, EState *estate, int eflags);
extern TupleTableSlot *ExecResult(ResultState *node);
+extern bool StartResult(ResultState *node);
extern void ExecEndResult(ResultState *node);
extern void ExecResultMarkPos(ResultState *node);
extern void ExecResultRestrPos(ResultState *node);
diff --git a/src/include/executor/nodeSetOp.h b/src/include/executor/nodeSetOp.h
index ed6c96a..f960dda 100644
--- a/src/include/executor/nodeSetOp.h
+++ b/src/include/executor/nodeSetOp.h
@@ -18,6 +18,7 @@
extern SetOpState *ExecInitSetOp(SetOp *node, EState *estate, int eflags);
extern TupleTableSlot *ExecSetOp(SetOpState *node);
+extern bool StartSetOp(SetOpState *node);
extern void ExecEndSetOp(SetOpState *node);
extern void ExecReScanSetOp(SetOpState *node);
diff --git a/src/include/executor/nodeSort.h b/src/include/executor/nodeSort.h
index 20d909b..0c6d12d 100644
--- a/src/include/executor/nodeSort.h
+++ b/src/include/executor/nodeSort.h
@@ -18,6 +18,7 @@
extern SortState *ExecInitSort(Sort *node, EState *estate, int eflags);
extern TupleTableSlot *ExecSort(SortState *node);
+extern bool StartSort(SortState *node);
extern void ExecEndSort(SortState *node);
extern void ExecSortMarkPos(SortState *node);
extern void ExecSortRestrPos(SortState *node);
diff --git a/src/include/executor/nodeSubqueryscan.h b/src/include/executor/nodeSubqueryscan.h
index 56e3aec..0301edd 100644
--- a/src/include/executor/nodeSubqueryscan.h
+++ b/src/include/executor/nodeSubqueryscan.h
@@ -18,6 +18,7 @@
extern SubqueryScanState *ExecInitSubqueryScan(SubqueryScan *node, EState *estate, int eflags);
extern TupleTableSlot *ExecSubqueryScan(SubqueryScanState *node);
+extern bool StartSubqueryScan(SubqueryScanState *node);
extern void ExecEndSubqueryScan(SubqueryScanState *node);
extern void ExecReScanSubqueryScan(SubqueryScanState *node);
diff --git a/src/include/executor/nodeUnique.h b/src/include/executor/nodeUnique.h
index ec2df59..76727aa 100644
--- a/src/include/executor/nodeUnique.h
+++ b/src/include/executor/nodeUnique.h
@@ -18,6 +18,7 @@
extern UniqueState *ExecInitUnique(Unique *node, EState *estate, int eflags);
extern TupleTableSlot *ExecUnique(UniqueState *node);
+extern bool StartUnique(UniqueState *node);
extern void ExecEndUnique(UniqueState *node);
extern void ExecReScanUnique(UniqueState *node);
diff --git a/src/include/executor/nodeWindowAgg.h b/src/include/executor/nodeWindowAgg.h
index 8a7b1fa..e9699b0 100644
--- a/src/include/executor/nodeWindowAgg.h
+++ b/src/include/executor/nodeWindowAgg.h
@@ -18,6 +18,7 @@
extern WindowAggState *ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags);
extern TupleTableSlot *ExecWindowAgg(WindowAggState *node);
+extern bool StartWindowAgg(WindowAggState *node);
extern void ExecEndWindowAgg(WindowAggState *node);
extern void ExecReScanWindowAgg(WindowAggState *node);
--
1.8.3.1
0004-temporarily-introduce-a-guc-enable_asyncexec-for-tes.patch
From d2033f22bd45231caf2c9ea2f0af964b7b052c32 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Mon, 30 Nov 2015 17:41:03 +0900
Subject: [PATCH 4/5] temporarily introduce a guc enable_asyncexec for test
usage.
enable_asyncexec = false inhibits asynchronous execution of
nodes. This is added for the purpose of comparing performance with and
without async execution.
---
src/backend/executor/execMain.c | 2 ++
src/backend/executor/nodeAppend.c | 3 ++-
src/backend/executor/nodeGather.c | 1 +
src/backend/executor/nodeHashjoin.c | 4 +++-
src/backend/executor/nodeMergeAppend.c | 3 ++-
src/backend/executor/nodeMergejoin.c | 3 ++-
src/backend/executor/nodeNestloop.c | 3 ++-
src/backend/utils/misc/guc.c | 9 +++++++++
src/include/optimizer/cost.h | 1 +
9 files changed, 24 insertions(+), 5 deletions(-)
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 9f2af6d..a58cced 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -95,6 +95,8 @@ static char *ExecBuildSlotValueDescription(Oid reloid,
static void EvalPlanQualStart(EPQState *epqstate, EState *parentestate,
Plan *planTree);
+bool enable_asyncexec = false;
+
/*
* Note that GetUpdatedColumns() also exists in commands/trigger.c. There does
* not appear to be any good header to put it into, given the structures that
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 2b918b2..4a7d781 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -62,6 +62,7 @@
static bool exec_append_initialize_next(AppendState *appendstate);
+extern bool enable_asyncexec;
/* ----------------------------------------------------------------
* exec_append_initialize_next
@@ -195,7 +196,7 @@ TupleTableSlot *
ExecAppend(AppendState *node)
{
/* start child nodes asynchronously if possible */
- if (ExecNode_is_inited(node))
+ if (enable_asyncexec && ExecNode_is_inited(node))
StartAppend(node);
SetNodeRunState(node, Running);
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 5364acb..b642d51 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -40,6 +40,7 @@
#include "utils/memutils.h"
#include "utils/rel.h"
+extern bool enable_asyncexec;
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
static HeapTuple gather_readnext(GatherState *gatherstate);
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index ada9290..cb54aba 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -39,6 +39,8 @@
/* Returns true if doing null-fill on inner relation */
#define HJ_FILL_INNER(hjstate) ((hjstate)->hj_NullOuterTupleSlot != NULL)
+extern bool enable_asyncexec;
+
static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
HashJoinState *hjstate,
uint32 *hashvalue);
@@ -73,7 +75,7 @@ ExecHashJoin(HashJoinState *node)
int batchno;
/* Try to start asynchronously */
- if (ExecNode_is_inited(node))
+ if (enable_asyncexec && ExecNode_is_inited(node))
StartHashJoin(node);
SetNodeRunState(node, Running);
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index ab6c304..fd5abb8 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -52,6 +52,7 @@ typedef int32 SlotNumber;
static int heap_compare_slots(Datum a, Datum b, void *arg);
+extern bool enable_asyncexec;
/* ----------------------------------------------------------------
* ExecInitMergeAppend
@@ -171,7 +172,7 @@ ExecMergeAppend(MergeAppendState *node)
SlotNumber i;
/* start child nodes asynchronously if possible */
- if (ExecNode_is_inited(node))
+ if (enable_asyncexec && ExecNode_is_inited(node))
StartMergeAppend(node);
SetNodeRunState(node, Running);
diff --git a/src/backend/executor/nodeMergejoin.c b/src/backend/executor/nodeMergejoin.c
index 32bd8a5..d2b141b 100644
--- a/src/backend/executor/nodeMergejoin.c
+++ b/src/backend/executor/nodeMergejoin.c
@@ -151,6 +151,7 @@ typedef enum
#define MarkInnerTuple(innerTupleSlot, mergestate) \
ExecCopySlot((mergestate)->mj_MarkedTupleSlot, (innerTupleSlot))
+extern bool enable_asyncexec;
/*
* MJExamineQuals
@@ -631,7 +632,7 @@ ExecMergeJoin(MergeJoinState *node)
bool doFillInner;
/* Execute childs asynchronously if possible */
- if (ExecNode_is_inited(node))
+ if (enable_asyncexec && ExecNode_is_inited(node))
StartMergeJoin(node);
SetNodeRunState(node, Running);
diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c
index cb95a3d..f55cd87 100644
--- a/src/backend/executor/nodeNestloop.c
+++ b/src/backend/executor/nodeNestloop.c
@@ -25,6 +25,7 @@
#include "executor/nodeNestloop.h"
#include "utils/memutils.h"
+extern bool enable_asyncexec;
/* ----------------------------------------------------------------
* ExecNestLoop(node)
@@ -70,7 +71,7 @@ ExecNestLoop(NestLoopState *node)
ListCell *lc;
/* Execute childs asynchronously if possible */
- if (ExecNode_is_inited(node))
+ if (enable_asyncexec && ExecNode_is_inited(node))
StartNestLoop(node);
SetNodeRunState(node, Running);
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index a185749..16e04d2 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -855,6 +855,15 @@ static struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},
{
+ {"enable_asyncexec", PGC_USERSET, QUERY_TUNING_METHOD,
+ gettext_noop("Enable early execution."),
+ NULL
+ },
+ &enable_asyncexec,
+ false,
+ NULL, NULL, NULL
+ },
+ {
{"enable_hashjoin", PGC_USERSET, QUERY_TUNING_METHOD,
gettext_noop("Enables the planner's use of hash join plans."),
NULL
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index ac21a3a..e9b7595 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -65,6 +65,7 @@ extern bool enable_hashagg;
extern bool enable_nestloop;
extern bool enable_material;
extern bool enable_mergejoin;
+extern bool enable_asyncexec;
extern bool enable_hashjoin;
extern int constraint_exclusion;
--
1.8.3.1
0005-Temporary-implement-of-merge-join-with-parallel-sort.patch
From fb77b5fb53c5aa86fd227c00d7bdd06bdce66dc1 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Mon, 30 Nov 2015 17:46:19 +0900
Subject: [PATCH 5/5] Temporary implement of merge join with parallel sort
This patch enables asynchronous startup of the parallel execution of
both subtrees of a merge join if both sides are explicitly sorted. This is
quite artificial behavior, but it is convenient for seeing the difference in
performance.
---
src/backend/executor/execAmi.c | 8 ++++++
src/backend/executor/nodeGather.c | 35 ++++++++++++++++++++++++-
src/backend/optimizer/plan/createplan.c | 45 ++++++++++++++++++++++++++++++++
src/backend/utils/misc/guc.c | 9 +++++++
src/include/executor/nodeGather.h | 2 ++
src/include/optimizer/cost.h | 1 +
src/test/regress/expected/rangefuncs.out | 4 ++-
7 files changed, 102 insertions(+), 2 deletions(-)
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index b969fc0..8289a26 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -315,6 +315,10 @@ ExecMarkPos(PlanState *node)
ExecSortMarkPos((SortState *) node);
break;
+ case T_GatherState:
+ ExecGatherMarkPos((GatherState *) node);
+ break;
+
case T_ResultState:
ExecResultMarkPos((ResultState *) node);
break;
@@ -364,6 +368,10 @@ ExecRestrPos(PlanState *node)
ExecSortRestrPos((SortState *) node);
break;
+ case T_GatherState:
+ ExecGatherRestrPos((GatherState *) node);
+ break;
+
case T_ResultState:
ExecResultRestrPos((ResultState *) node);
break;
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index b642d51..ec45622 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -8,7 +8,7 @@
*
* A Gather executor launches parallel workers to run multiple copies of a
* plan. It can also run the plan itself, if the workers are not available
- * or have not started up yet. It then merges all of the results it produces
+ * or have not stated up yet. It then merges all of the results it produces
* and the results from the workers into a single output stream. Therefore,
* it will normally be used with a plan where running multiple copies of the
* same plan does not produce duplicate output, such as PartialSeqScan.
@@ -127,6 +127,8 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
* Returns true if any of underlying nodes started asynchronously
* ----------------------------------------------------------------
*/
+/* for test */
+static bool from_execgather = false;
bool
StartGather(GatherState *node)
{
@@ -138,6 +140,10 @@ StartGather(GatherState *node)
if (!ExecNode_is_inited(node))
return false;
+ elog(DEBUG1, "nodeGather executed %s",
+ from_execgather ? "synchronously" : "asynchronously");
+ from_execgather = false;
+
SetNodeRunState(node, Started);
/*
@@ -220,6 +226,7 @@ ExecGather(GatherState *node)
ExprContext *econtext;
/* Execute childs asynchronously if possible */
+ from_execgather = true;
if (ExecNode_is_inited(node))
StartGather(node);
@@ -468,6 +475,32 @@ ExecShutdownGather(GatherState *node)
*/
/* ----------------------------------------------------------------
+ * ExecGatherMarkPos
+ *
+ * Calls MarkPos of the top node on the worker
+ * ----------------------------------------------------------------
+ */
+void
+ExecGatherMarkPos(GatherState *node)
+{
+ /* There's no means to command worker? */
+ elog(DEBUG1, "MarkPos on Gather is not implemented");
+}
+
+/* ----------------------------------------------------------------
+ * ExecGatherRestrPos
+ *
+ * Would call RestrPos of the top node on the worker; not implemented yet.
+ * ----------------------------------------------------------------
+ */
+void
+ExecGatherRestrPos(GatherState *node)
+{
+ /* There's no means to command worker? */
+ elog(ERROR, "RestrPos on Gather is not implemented");
+}
+
+/* ----------------------------------------------------------------
* ExecReScanGather
*
* Re-initialize the workers and rescans a relation via them.
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 411b36c..e3d3456 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -844,10 +844,24 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path)
/* Now, insert a Sort node if subplan isn't sufficiently ordered */
if (!pathkeys_contained_in(pathkeys, subpath->pathkeys))
+ {
subplan = (Plan *) make_sort(root, subplan, numsortkeys,
sortColIdx, sortOperators,
collations, nullsFirst,
best_path->limit_tuples);
+ if (enable_parasortmerge)
+ {
+ Gather *gather;
+
+ gather = make_gather(subplan->targetlist,
+ NIL,
+ 1, /* num_workers */
+ true, /* single_copy */
+ subplan);
+ subplan = (Plan *)gather;
+ root->glob->parallelModeNeeded = true;
+ }
+ }
subplans = lappend(subplans, subplan);
}
@@ -2360,6 +2374,9 @@ create_nestloop_plan(PlannerInfo *root,
return join_plan;
}
+bool enable_parasortmerge = false;
+extern bool enable_asyncexec;
+
static MergeJoin *
create_mergejoin_plan(PlannerInfo *root,
MergePath *best_path,
@@ -2637,6 +2654,34 @@ create_mergejoin_plan(PlannerInfo *root,
/*
* Now we can build the mergejoin node.
*/
+
+ /* Try to sort in a bgworker on each side that is a Sort, even if the other side is not */
+ if (enable_parasortmerge)
+ {
+ Gather *gather;
+
+ if (IsA(outer_plan, Sort))
+ {
+ gather = make_gather(outer_plan->targetlist,
+ NIL,
+ 1, /* num_workers */
+ true, /* single_copy */
+ outer_plan);
+ outer_plan = (Plan *)gather;
+ root->glob->parallelModeNeeded = true;
+ }
+
+ if (IsA(inner_plan, Sort))
+ {
+ gather = make_gather(inner_plan->targetlist,
+ NIL,
+ 1, /* num_workers */
+ true, /* single_copy */
+ inner_plan);
+ inner_plan = (Plan *)gather;
+ root->glob->parallelModeNeeded = true;
+ }
+ }
join_plan = make_mergejoin(tlist,
joinclauses,
otherclauses,
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 16e04d2..61afd1e 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -855,6 +855,15 @@ static struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},
{
+ {"enable_parasortmerge", PGC_USERSET, QUERY_TUNING_METHOD,
+ gettext_noop("Enables parallel sort if both side of mergejoin need to be sorted."),
+ NULL
+ },
+ &enable_parasortmerge,
+ false,
+ NULL, NULL, NULL
+ },
+ {
{"enable_asyncexec", PGC_USERSET, QUERY_TUNING_METHOD,
gettext_noop("Enable early execution."),
NULL
diff --git a/src/include/executor/nodeGather.h b/src/include/executor/nodeGather.h
index e7cbe21..d724a2e 100644
--- a/src/include/executor/nodeGather.h
+++ b/src/include/executor/nodeGather.h
@@ -21,6 +21,8 @@ extern bool StartGather(GatherState *node);
extern TupleTableSlot *ExecGather(GatherState *node);
extern void ExecEndGather(GatherState *node);
extern void ExecShutdownGather(GatherState *node);
+extern void ExecGatherMarkPos(GatherState *node);
+extern void ExecGatherRestrPos(GatherState *node);
extern void ExecReScanGather(GatherState *node);
#endif /* NODEGATHER_H */
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index e9b7595..4f9de21 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -65,6 +65,7 @@ extern bool enable_hashagg;
extern bool enable_nestloop;
extern bool enable_material;
extern bool enable_mergejoin;
+extern bool enable_parasortmerge;
extern bool enable_asyncexec;
extern bool enable_hashjoin;
extern int constraint_exclusion;
diff --git a/src/test/regress/expected/rangefuncs.out b/src/test/regress/expected/rangefuncs.out
index 00ef421..73b5aaa 100644
--- a/src/test/regress/expected/rangefuncs.out
+++ b/src/test/regress/expected/rangefuncs.out
@@ -1,6 +1,7 @@
SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%';
name | setting
----------------------+---------
+ enable_asyncexec | off
enable_bitmapscan | on
enable_hashagg | on
enable_hashjoin | on
@@ -9,10 +10,11 @@ SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%';
enable_material | on
enable_mergejoin | on
enable_nestloop | on
+ enable_parasortmerge | off
enable_seqscan | on
enable_sort | on
enable_tidscan | on
-(11 rows)
+(13 rows)
CREATE TABLE foo2(fooid int, f2 int);
INSERT INTO foo2 VALUES(1, 11);
--
1.8.3.1
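For readers skimming the diffs, here is a minimal, self-contained sketch of the pattern that the Start* functions above share. It is not part of the patch series. MockNode, MockRunState and mock_start_node are hypothetical stand-ins for PlanState, the node run state and StartProcNode. Only the control flow is taken from the patches: start the children first, mark the node Started if any child started, and skip the inner side of a parameterized nest loop. The Gather case is assumed to always start asynchronously, which simplifies whatever StartGather really does.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-ins for the executor's node states. */
typedef enum { Inited, Started, Running, Done } MockRunState;
typedef enum { MOCK_GATHER, MOCK_SORT, MOCK_NESTLOOP, MOCK_SEQSCAN } MockTag;

typedef struct MockNode
{
	MockTag		tag;
	MockRunState state;
	struct MockNode *outer;		/* outer (left) child, or NULL */
	struct MockNode *inner;		/* inner (right) child, or NULL */
	int			nparams;		/* nest loop only: number of nestParams */
} MockNode;

/*
 * Returns true if this node, or any node below it, started asynchronously.
 * A node that is not in the Inited state is left alone.
 */
static bool
mock_start_node(MockNode *node)
{
	bool		async = false;

	if (node == NULL || node->state != Inited)
		return false;

	switch (node->tag)
	{
		case MOCK_GATHER:
			/* Assumption: stands in for launching workers right away. */
			async = true;
			break;
		case MOCK_SORT:
			/* Single-child nodes just pass the request down. */
			async = mock_start_node(node->outer);
			break;
		case MOCK_NESTLOOP:
			/* Always try the outer side. */
			async = mock_start_node(node->outer);
			/* The inner side cannot start early if it is parameterized. */
			if (node->nparams < 1)
				async |= mock_start_node(node->inner);
			break;
		case MOCK_SEQSCAN:
			/* Nothing to start early in this sketch. */
			break;
	}

	if (async)
		node->state = Started;

	return async;
}
```

With a parameterized nest loop over Sort -> Gather (outer) and Gather (inner), the sketch starts the outer Gather and marks Sort and the nest loop as Started, while the inner Gather stays Inited.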
On Mon, Nov 30, 2015 at 6:17 PM, Kyotaro HORIGUCHI <
horiguchi.kyotaro@lab.ntt.co.jp> wrote:
====== TODO or random thoughts, not restricted to this patch.
- This patch doesn't contain the planner part; the planner must be aware of
async execution for this to be effective.
How will you decide whether sync-execution is cheaper than parallel
execution. Do you have some specific cases in mind where async
execution will be more useful than parallel execution?
- Some measure to control execution on bgworker would be
needed. At least merge join requires position mark/reset
functions.
- Currently, more tuples reduce the effectiveness of parallel
execution; some method to transfer tuples in larger units would
be needed, or would it be good to have shared workmem?
Yeah, I think here one thing we need to figure out is whether the
performance bottleneck is due to the amount of data that is transferred
between worker and master or something else. One idea could be to pass
TID and maybe keep the buffer pin (which will be released by master
backend), but on the other hand if we have to perform costly target list
evaluation by bgworker, then it might be beneficial to pass the projected
list back.
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
Thank you for picking this up.
At Tue, 1 Dec 2015 20:33:02 +0530, Amit Kapila <amit.kapila16@gmail.com> wrote in <CAA4eK1LBwj7heY8pxRmMCOLhuMFr81TLHck-+ByBFuUADgeu+A@mail.gmail.com>
On Mon, Nov 30, 2015 at 6:17 PM, Kyotaro HORIGUCHI <
horiguchi.kyotaro@lab.ntt.co.jp> wrote:
====== TODO or random thoughts, not restricted to this patch.
- This patch doesn't contain the planner part; the planner must be aware of
async execution for this to be effective.
How will you decide whether sync-execution is cheaper than parallel
execution. Do you have some specific cases in mind where async
execution will be more useful than parallel execution?
Mmm.. Some confusion in wording? Sync-async is a distinction
about when to start execution of a node (and its
descendants). Parallel-serial (sequential) is a distinction about whether
multiple nodes can execute simultaneously. Async execution
presupposes parallel execution in some form, bgworker or FDW.
As I wrote in the previous mail, async execution reduces the startup
time of parallel execution. So async execution is
not more useful than parallel execution; rather, it accelerates parallel
execution. It is effective when the startup time of every parallel
execution node is rather long. We have enough numbers to cost it.
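A rough way to picture this is the following idealized model. It is an assumption for illustration, not something measured in this thread. Without async execution the startup costs of the children add up, because each child begins its startup only when its first tuple is requested. With async execution the startups overlap, so the total approaches the longest single startup. The function names are hypothetical, and the model ignores contention between workers.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Synchronous case: children start one after another, so the total
 * startup time is the sum of the individual startup times.
 */
double
startup_sync(const double *startup, size_t n)
{
	double		total = 0.0;

	assert(startup != NULL || n == 0);
	for (size_t i = 0; i < n; i++)
		total += startup[i];
	return total;
}

/*
 * Asynchronous case (idealized): all children are kicked off up front
 * and run concurrently, so the total is bounded by the slowest child.
 */
double
startup_async(const double *startup, size_t n)
{
	double		longest = 0.0;

	assert(startup != NULL || n == 0);
	for (size_t i = 0; i < n; i++)
	{
		if (startup[i] > longest)
			longest = startup[i];
	}
	return longest;
}
```

For three children with startup times of 3, 2 and 4 (arbitrary units), startup_sync gives 9 and startup_async gives 4. The longer and more evenly sized the startups are, the more the async case gains.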
- Some measure to control execution on bgworker would be
needed. At least merge join requires position mark/reset
functions.
- Currently, more tuples reduce the effectiveness of parallel
execution; some method to transfer tuples in larger units would
be needed, or would it be good to have shared workmem?
Yeah, I think here one thing we need to figure out is whether the
performance bottleneck is due to the amount of data that is transferred
between worker and master or something else. One idea could be to pass
TID and may be keep the buffer pin (which will be released by master
backend), but on the other hand if we have to perform costly target list
evaluation by bgworker, then it might be beneficial to pass the projected
list back.
One possible bottleneck is signalling between backends. Current
parallel execution uses signals to make the producer-consumer world go
round. Conveying TIDs won't make it faster if the bottleneck is
the inter-process communication. I brought up bulk transfer
or shared workmem as example means to reduce IPC frequency.
regards,
--
Kyotaro Horiguchi
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Wed, Dec 2, 2015 at 7:45 AM, Kyotaro HORIGUCHI <
horiguchi.kyotaro@lab.ntt.co.jp> wrote:
Thank you for picking this up.
At Tue, 1 Dec 2015 20:33:02 +0530, Amit Kapila <amit.kapila16@gmail.com>
wrote in <CAA4eK1LBwj7heY8pxRmMCOLhuMFr81TLHck-+ByBFuUADgeu+A@mail.gmail.com>
On Mon, Nov 30, 2015 at 6:17 PM, Kyotaro HORIGUCHI <
horiguchi.kyotaro@lab.ntt.co.jp> wrote:
====== TODO or random thoughts, not restricted on this patch.
- This patch doesn't contain a planner part; the planner must be aware of
async execution in order for this to be effective.
How will you decide whether sync-execution is cheaper than parallel
execution. Do you have some specific cases in mind where async
execution will be more useful than parallel execution?
Mmm.. Some confusion in wording? Sync versus async is a distinction
about when to start execution of a node (and its
descendants). Parallel versus serial (sequential) is about whether
multiple nodes can execute simultaneously. Async execution
presupposes parallel execution in some form, bgworker or FDW.
As I wrote in the previous mail, async execution reduces the startup
time of parallel execution.
Could you please explain in more detail how async execution reduces
the startup time for parallel execution? Can you share both
execution plans (with and without async execution)? That might
help to understand the reason.
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Mon, Nov 30, 2015 at 7:47 AM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp> wrote:
"Asynchronous execution" is a feature to start substantial work
of nodes before doing Exec*. This can reduce total startup time
by folding startup time of multiple execution nodes. Especially
effective for the combination of joins or appends and their
multiple children that needs long time to startup.
This patch does that by inserting another phase "Start*" between
ExecInit* and Exec* to launch parallel processing including
pgworker and FDWs before requesting the very first tuple of the
result.
I have thought about this, too, but I'm not very convinced that this
is the right model. In a typical case involving parallelism, you hope
to have the Gather node as close to the top of the plan tree as
possible. Therefore, the start phase will not happen much before the
first execution of the node, and you don't get much benefit.
Moreover, I think that prefetching can be useful not only at the start
of the query - which is the only thing that your model supports - but
also in mid-query. For example, consider an Append of two ForeignScan
nodes. Ideally we'd like to return the results in the order that they
become available, rather than serially. This model might help with
that for the first batch of rows you fetch, but not after that.
There are a couple of other problems here that are specific to this
example. You get a benefit here because you've got two Gather nodes
that both get kicked off before we try to read tuples from either, but
that's generally something to avoid - you can only use 3 processes and
typically at most 2 of those will actually be running (as opposed to
sleeping) at the same time: the workers will run to completion, and
then the leader will wake up and do its thing. I'm not saying our
current implementation of parallel query scales well to a large number
of workers (it doesn't) but I think that's more about improving the
implementation than any theoretical problem, so this seems a little
worse. Also, currently, both merge and hash joins have an
optimization wherein if the outer side of the join turns out to be
empty, we avoid paying the startup cost for the inner side of the
join; kicking off the work on the inner side of the merge join
asynchronously before we've gotten any tuples from the outer side
loses the benefit of that optimization.
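To make the trade-off above concrete, here is a minimal toy sketch in C, not executor code. The Side struct, side_start, side_next and join_first_outer are hypothetical names invented for the illustration; the only thing it tries to show is that starting the inner side eagerly pays its startup cost even when the outer side turns out to be empty.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy join input: a row array plus a counter of (expensive) startups. */
typedef struct Side
{
    const int *rows;
    int nrows;
    int next;
    bool started;
    int startups;           /* how many times the startup cost was paid */
} Side;

/* Pay the startup cost once (think: sort the input, launch workers). */
static void
side_start(Side *s)
{
    if (!s->started)
    {
        s->started = true;
        s->startups++;
    }
}

/* Return a pointer to the next row, or NULL at end of input. */
static const int *
side_next(Side *s)
{
    side_start(s);
    return (s->next < s->nrows) ? &s->rows[s->next++] : NULL;
}

/*
 * Fetch the first outer row of a join. With eager_inner set, the inner
 * side is kicked off up front, as a separate start phase would do;
 * otherwise it is started only after the outer side has produced a row.
 */
static const int *
join_first_outer(Side *outer, Side *inner, bool eager_inner)
{
    const int *row;

    if (eager_inner)
        side_start(inner);
    row = side_next(outer);
    if (row != NULL)
        side_start(inner);
    return row;
}
```

With an empty outer input, the lazy variant leaves inner.startups at 0 while the eager variant raises it to 1, which is the lost optimization in miniature.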
I suspect there is no single paradigm that will help with all of the
cases where asynchronous execution is useful. We're going to need a
series of changes that are targeted at specific problems. For
example, here it would be useful to have one side of the join confirm
at the earliest possible stage that it will definitely return at least
one tuple eventually, but then return control to the caller so that we
can kick off the other side of the join. The sort node never
eliminates anything, so as soon as the sequential scan underneath it
coughs up a tuple, we're definitely getting a return value eventually.
At that point it's safe to kick off the other Gather node. I don't
quite know how to design a signalling system for that, but it could be
done.
But is it important enough to be worthwhile? Maybe, maybe not. I
think we should be working toward a world where the Gather is at the
top of the plan tree as often as possible, in which case
asynchronously kicking off a Gather node won't be that exciting any
more - see notes on the "parallelism + sorting" thread where I talk
about primitives that would allow massively parallel merge joins,
rather than 2 or 3 way parallel. From my point of view, the case
where we really need some kind of asynchronous execution solution is a
ForeignScan, and in particular a ForeignScan which is the child of an
Append. In that case it's obviously really useful to be able to kick
off all the foreign scans and then return a tuple from whichever one
coughs it up first. Is that the ONLY case where asynchronous
execution is useful? Probably not, but I bet it's the big one.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Tue, Dec 8, 2015 at 9:10 PM, Robert Haas <robertmhaas@gmail.com> wrote:
On Mon, Nov 30, 2015 at 7:47 AM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp> wrote:
But is it important enough to be worthwhile? Maybe, maybe not. I
think we should be working toward a world where the Gather is at the
top of the plan tree as often as possible, in which case
asynchronously kicking off a Gather node won't be that exciting any
more - see notes on the "parallelism + sorting" thread where I talk
about primitives that would allow massively parallel merge joins,
rather than 2 or 3 way parallel. From my point of view, the case
where we really need some kind of asynchronous execution solution is a
ForeignScan, and in particular a ForeignScan which is the child of an
Append. In that case it's obviously really useful to be able to kick
off all the foreign scans and then return a tuple from whichever one
coughs it up first.
How would this be better than doing the same thing the way we did
for Parallel Sequential Scan, at ExecutorRun() time?
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
Hello, thank you for the comment.
At Tue, 8 Dec 2015 10:40:20 -0500, Robert Haas <robertmhaas@gmail.com> wrote in <CA+TgmobLEaho40e9puy3pLbeUx_a6hKBoDUqDNQO4rwORUM-eA@mail.gmail.com>
On Mon, Nov 30, 2015 at 7:47 AM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp> wrote:
"Asynchronous execution" is a feature to start substantial work
of nodes before doing Exec*. This can reduce total startup time
by folding startup time of multiple execution nodes. Especially
effective for the combination of joins or appends and their
multiple children that needs long time to startup.
This patch does that by inserting another phase "Start*" between
ExecInit* and Exec* to launch parallel processing including
pgworker and FDWs before requesting the very first tuple of the
result.
I have thought about this, too, but I'm not very convinced that this
is the right model. In a typical case involving parallelism, you hope
to have the Gather node as close to the top of the plan tree as
possible. Therefore, the start phase will not happen much before the
first execution of the node, and you don't get much benefit.
Obeying the Init-Exec semantics, a Gather node cannot execute an
underlying node, say a Sort, before the upper node requests the
first tuple. Async execution potentially works for that case too.
On the other hand, the patch is currently designed considering
Gather as a driven-all-time node. Since it has the same
characteristic as Append or MergeAppend in the sense that it
potentially executes multiple (and various kinds of) underlying
nodes, the patch should be redesigned following that. But as far
as I can see for now, Gather executes multiple identical (or
divided) scan nodes, so I haven't made Gather
"async-aware". (If I didn't take it wrongly.)
And if necessary, we can mark the query as 'async requested' in
the planning phase.
Moreover, I think that prefetching can be useful not only at the start
of the query - which is the only thing that your model supports - but
also in mid-query. For example, consider an Append of two ForeignScan
nodes. Ideally we'd like to return the results in the order that they
become available, rather than serially. This model might help with
that for the first batch of rows you fetch, but not after that.
Yeah, async-exec can have a mechanism similar to Gather's to
fetch tuples from underlying nodes.
There are a couple of other problems here that are specific to this
example. You get a benefit here because you've got two Gather nodes
that both get kicked off before we try to read tuples from either, but
that's generally something to avoid - you can only use 3 processes and
typically at most 2 of those will actually be running (as opposed to
Yes, that is one of the reasons why I called the example artificial.
sleeping) at the same time: the workers will run to completion, and
then the leader will wake up and do its thing. I'm not saying our
current implementation of parallel query scales well to a large number
of workers (it doesn't) but I think that's more about improving the
implementation than any theoretical problem, so this seems a little
worse. Also, currently, both merge and hash joins have an
optimization wherein if the outer side of the join turns out to be
empty, we avoid paying the startup cost for the inner side of the
join; kicking off the work on the inner side of the merge join
asynchronously before we've gotten any tuples from the outer side
loses the benefit of that optimization.
It is a matter of comparison: async wins if the startup time of
the outer is longer (to some extent) than the time to build the
inner hash. But it requires a planner part. I'll take it into
account if async exec itself is found to be useful.
I suspect there is no single paradigm that will help with all of the
cases where asynchronous execution is useful. We're going to need a
series of changes that are targeted at specific problems. For
example, here it would be useful to have one side of the join confirm
at the earliest possible stage that it will definitely return at least
one tuple eventually, but then return control to the caller so that we
can kick off the other side of the join. The sort node never
eliminates anything, so as soon as the sequential scan underneath it
coughs up a tuple, we're definitely getting a return value eventually.
It's quite impressive. But it might be the planner's business.
At that point it's safe to kick off the other Gather node. I don't
quite know how to design a signalling system for that, but it could be
done.
I agree. I'll give that further consideration.
But is it important enough to be worthwhile? Maybe, maybe not. I
think we should be working toward a world where the Gather is at the
top of the plan tree as often as possible, in which case
asynchronously kicking off a Gather node won't be that exciting any
more - see notes on the "parallelism + sorting" thread where I talk
about primitives that would allow massively parallel merge joins,
rather than 2 or 3 way parallel.
Could you give me the subject of the thread? Or an important message
from it.
From my point of view, the case
where we really need some kind of asynchronous execution solution is a
ForeignScan, and in particular a ForeignScan which is the child of an
Append. In that case it's obviously really useful to be able to kick
off all the foreign scans and then return a tuple from whichever one
coughs it up first. Is that the ONLY case where asynchronous
execution is useful? Probably not, but I bet it's the big one.
Yes, the most significant and obvious (but hard to estimate the
benefit) target of async execution is (Merge)Append-ForeignScan,
which is narrow but frequently used. And this patch started
from it.
It is because of the startup-heavy nature of FDW. So I added
sort as a target later, then redesigned it to give the ability to all
nodes. If it is obviously overdone for the (currently) expected
benefit and if it is preferable to shrink this patch so as to
touch only the portion where async-exec has a benefit, I'll do
so.
regards,
--
Kyotaro Horiguchi
NTT Open Source Software Center
Hi,
On 2015/12/14 17:34, Kyotaro HORIGUCHI wrote:
At Tue, 8 Dec 2015 10:40:20 -0500, Robert Haas <robertmhaas@gmail.com> wrote
But is it important enough to be worthwhile? Maybe, maybe not. I
think we should be working toward a world where the Gather is at the
top of the plan tree as often as possible, in which case
asynchronously kicking off a Gather node won't be that exciting any
more - see notes on the "parallelism + sorting" thread where I talk
about primitives that would allow massively parallel merge joins,
rather than 2 or 3 way parallel.
Could you give me the subject of the thread? Or important message
of that.
I think that would be the following thread:
* parallelism and sorting *
/messages/by-id/CA+TgmoYh4zsQMgqiyra7zO1RBBvG1qHn1fJT5q0Fpw+Q0xAjrg@mail.gmail.com
Thanks,
Amit
Thank you a lot!
At Mon, 14 Dec 2015 17:51:41 +0900, Amit Langote <Langote_Amit_f8@lab.ntt.co.jp> wrote in <566E831D.1050703@lab.ntt.co.jp>
Hi,
On 2015/12/14 17:34, Kyotaro HORIGUCHI wrote:
At Tue, 8 Dec 2015 10:40:20 -0500, Robert Haas <robertmhaas@gmail.com> wrote
But is it important enough to be worthwhile? Maybe, maybe not. I
think we should be working toward a world where the Gather is at the
top of the plan tree as often as possible, in which case
asynchronously kicking off a Gather node won't be that exciting any
more - see notes on the "parallelism + sorting" thread where I talk
about primitives that would allow massively parallel merge joins,
rather than 2 or 3 way parallel.
Could you give me the subject of the thread? Or important message
of that.
I think that would be the following thread:
* parallelism and sorting *
/messages/by-id/CA+TgmoYh4zsQMgqiyra7zO1RBBvG1qHn1fJT5q0Fpw+Q0xAjrg@mail.gmail.com
Thank you for the pointer. I'll read it.
# It's hard for me to do eyeball-grepping on English texts..
regards,
--
Kyotaro Horiguchi
NTT Open Source Software Center
On Fri, Dec 11, 2015 at 11:49 PM, Amit Kapila <amit.kapila16@gmail.com> wrote:
But is it important enough to be worthwhile? Maybe, maybe not. I
think we should be working toward a world where the Gather is at the
top of the plan tree as often as possible, in which case
asynchronously kicking off a Gather node won't be that exciting any
more - see notes on the "parallelism + sorting" thread where I talk
about primitives that would allow massively parallel merge joins,
rather than 2 or 3 way parallel. From my point of view, the case
where we really need some kind of asynchronous execution solution is a
ForeignScan, and in particular a ForeignScan which is the child of an
Append. In that case it's obviously really useful to be able to kick
off all the foreign scans and then return a tuple from whichever one
coughs it up first.
How will this be better than doing the same thing in a way we have done
Parallel Sequential Scan at ExecutorRun() time?
I'm not sure if this is what you are asking, but I think it probably
should be done at ExecutorRun() time, rather than a separate phase.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Mon, Dec 14, 2015 at 3:34 AM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp> wrote:
Yes, the most significant and obvious (but hard to estimate the
benefit) target of async execution is (Merge)Append-ForeignScan,
which is narrow but frequently used. And this patch has started
from it.
It is because of the startup-heavy nature of FDW. So I involved
sort as a target later then redesigned to give the ability on all
nodes. If it is obviously over-done for the (currently) expected
benefit and if it is preferable to shrink this patch so as to
touch only the portion where async-exec has a benefit, I'll do
so.
Suppose we equip each EState with the ability to fire "callbacks".
Callbacks have the signature:
typedef bool (*ExecCallback)(PlanState *planstate, TupleTableSlot
*slot, void *context);
Executor nodes can register immediate callbacks to be run at the
earliest possible opportunity using a function like
ExecRegisterCallback(estate, callback, planstate, slot, context).
They can register deferred callbacks that will be called when a file
descriptor becomes ready for I/O, or when the process latch is set,
using a call like ExecRegisterFileCallback(estate, fd, event,
callback, planstate, slot, context) or
ExecRegisterLatchCallback(estate, callback, planstate, slot, context).
To execute callbacks, an executor node can call ExecFireCallbacks(),
which will fire immediate callbacks in order of registration, and wait
for the file descriptors for which callbacks have been registered and
for the process latch when no immediate callbacks remain but there are
still deferred callbacks. It will return when (1) there are no
remaining immediate or deferred callbacks or (2) one of the callbacks
returns "true".
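For illustration only, here is a minimal sketch of the immediate-callback half of this idea, using toy stand-in types rather than the real executor structs. The ring buffer, MAX_CALLBACKS, the EState layout, the estate argument to ExecFireCallbacks, FireLog and the two sample callbacks are all assumptions made for the sketch; deferred (fd and latch) callbacks are left out entirely.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy stand-ins for the executor types; only what the sketch needs. */
typedef struct PlanState { int id; } PlanState;
typedef struct TupleTableSlot { int value; } TupleTableSlot;

typedef bool (*ExecCallback)(PlanState *planstate, TupleTableSlot *slot,
                             void *context);

typedef struct PendingCallback
{
    ExecCallback callback;
    PlanState *planstate;
    TupleTableSlot *slot;
    void *context;
} PendingCallback;

#define MAX_CALLBACKS 16

/* Hypothetical EState: just a FIFO ring of immediate callbacks. */
typedef struct EState
{
    PendingCallback queue[MAX_CALLBACKS];
    size_t head;            /* index of the next callback to fire */
    size_t count;           /* number of callbacks currently queued */
} EState;

static void
ExecRegisterCallback(EState *estate, ExecCallback callback,
                     PlanState *planstate, TupleTableSlot *slot, void *context)
{
    size_t tail;

    assert(estate->count < MAX_CALLBACKS);
    tail = (estate->head + estate->count) % MAX_CALLBACKS;
    estate->queue[tail] = (PendingCallback){callback, planstate, slot, context};
    estate->count++;
}

/*
 * Fire immediate callbacks in registration order. Returns true as soon as
 * one callback returns true, false once the queue has been drained.
 * Callbacks may register further callbacks while this loop is running.
 */
static bool
ExecFireCallbacks(EState *estate)
{
    while (estate->count > 0)
    {
        PendingCallback cb = estate->queue[estate->head];

        estate->head = (estate->head + 1) % MAX_CALLBACKS;
        estate->count--;
        if (cb.callback(cb.planstate, cb.slot, cb.context))
            return true;
    }
    return false;
}

/* Sample callbacks: record which node fired, then continue or stop. */
typedef struct FireLog { int ids[MAX_CALLBACKS]; int n; } FireLog;

static bool
log_and_continue(PlanState *planstate, TupleTableSlot *slot, void *context)
{
    FireLog *log = context;

    (void) slot;
    log->ids[log->n++] = planstate->id;
    return false;
}

static bool
log_and_stop(PlanState *planstate, TupleTableSlot *slot, void *context)
{
    log_and_continue(planstate, slot, context);
    return true;
}
```

Registering log_and_continue, log_and_stop and log_and_continue in that order and calling ExecFireCallbacks twice fires the first two on the first call (which returns true) and the third on the second call (which returns false).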
Then, suppose we add a function bool ExecStartAsync(PlanState *target,
ExecCallback callback, PlanState *cb_planstate, void *cb_context).
For non-async-aware plan nodes, this just returns false. async-aware
plan nodes should initiate some work, register some callbacks, and
return. The callbacks that get registered should arrange in turn to
register the callback passed as an argument when a tuple becomes
available, passing the planstate and context provided by
ExecStartAsync's caller, plus the TupleTableSlot containing the tuple.
So, in response to ExecStartAsync, if there's no tuple currently
available, postgres_fdw can send a query to the remote server and
request a callback when the fd becomes read-ready. It must save the
callback passed to ExecStartAsync inside the PlanState someplace so
that when a tuple becomes available it can register that callback.
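As a rough sketch of the deferred, fd-driven case, the following toy uses plain POSIX poll(2) as a stand-in for whatever wait primitive the executor would really use. Everything except the ExecCallback signature and the names ExecRegisterFileCallback and ExecFireCallbacks is an assumption for illustration: the Deferred struct, MAX_DEFERRED, the one-shot semantics, and read_tuple, which pretends that an int arriving on the fd is a tuple from the remote server.

```c
#include <assert.h>
#include <poll.h>
#include <stdbool.h>
#include <unistd.h>

/* Toy stand-ins for executor types; not real PostgreSQL definitions. */
typedef struct PlanState { int tuples_seen; } PlanState;
typedef struct TupleTableSlot { int value; } TupleTableSlot;
typedef bool (*ExecCallback)(PlanState *planstate, TupleTableSlot *slot,
                             void *context);

typedef struct Deferred
{
    int fd;
    short event;            /* poll(2) event mask, e.g. POLLIN */
    ExecCallback cb;
    PlanState *ps;
    TupleTableSlot *slot;
    void *ctx;
} Deferred;

#define MAX_DEFERRED 8
typedef struct EState { Deferred d[MAX_DEFERRED]; int n; } EState;

static void
ExecRegisterFileCallback(EState *es, int fd, short event, ExecCallback cb,
                         PlanState *ps, TupleTableSlot *slot, void *ctx)
{
    assert(es->n < MAX_DEFERRED);
    es->d[es->n] = (Deferred){fd, event, cb, ps, slot, ctx};
    es->n++;
}

/*
 * Block until a registered fd is ready, then fire its callback (one-shot).
 * Returns true as soon as a callback returns true, false when no deferred
 * callbacks remain or poll() fails.
 */
static bool
ExecFireCallbacks(EState *es)
{
    while (es->n > 0)
    {
        struct pollfd pfds[MAX_DEFERRED];
        int n = es->n;

        for (int i = 0; i < n; i++)
            pfds[i] = (struct pollfd){.fd = es->d[i].fd,
                                      .events = es->d[i].event};
        if (poll(pfds, (nfds_t) n, -1) < 0)
            return false;
        for (int i = 0; i < n; i++)
        {
            Deferred ready;

            if (pfds[i].revents == 0)
                continue;
            ready = es->d[i];
            es->n--;
            es->d[i] = es->d[es->n];    /* drop the entry before firing it */
            if (ready.cb(ready.ps, ready.slot, ready.ctx))
                return true;
            break;                      /* registrations changed: poll again */
        }
    }
    return false;
}

/* Example callback: read one int "tuple" from the fd passed as context. */
static bool
read_tuple(PlanState *ps, TupleTableSlot *slot, void *ctx)
{
    int fd = *(int *) ctx;
    ssize_t got = read(fd, &slot->value, sizeof slot->value);

    if (got != (ssize_t) sizeof slot->value)
        return false;
    ps->tuples_seen++;
    return true;
}
```

A pipe is enough to try it: register read_tuple on the read end with POLLIN, write an int to the write end, and ExecFireCallbacks returns true with the value in the slot.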
ExecAppend can call ExecStartAsync on each of its subplans. For any
subplan where ExecStartAsync returns false, ExecAppend will just
execute it normally, by calling ExecProcNode repeatedly until no more
tuples are returned. But for async-capable subplans, it can call
ExecStartAsync on all of them, and then call ExecFireCallbacks. The
tuple-ready callback it passes to its child plans will take the tuple
provided by the child plan and store it into the Append node's slot.
It will then return true if, and only if, ExecFireCallbacks is being
invoked from ExecAppend (which it can figure out via some kind of
signalling either through its own PlanState or centralized signalling
through the EState). That way, if ExecAppend were itself invoked
asynchronously, its tuple-ready callback could simply populate a slot
appropriately and register its invoker's tuple-ready callback. Whether
called synchronously or asynchronously, each invocation of an
asynchronous append after the first would just need to again call
ExecStartAsync on the child that last returned a tuple.
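The Append flow described above can be sketched as a toy model. All struct layouts here are invented for the illustration, the child "scan" completes its work immediately instead of waiting on an fd, and Append is only ever called synchronously, so its tuple-ready callback always returns true. Children for which ExecStartAsync returns false are simply skipped; the plain ExecProcNode loop for them is omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy stand-ins for executor types; none of this is real PostgreSQL code. */
typedef struct PlanState PlanState;
typedef struct TupleTableSlot { int value; } TupleTableSlot;
typedef bool (*ExecCallback)(PlanState *planstate, TupleTableSlot *slot,
                             void *context);
typedef struct Pending { ExecCallback cb; PlanState *ps;
                         TupleTableSlot *slot; void *ctx; } Pending;
#define MAX_PENDING 16
typedef struct EState { Pending q[MAX_PENDING]; size_t head, count; } EState;

struct PlanState
{
    EState *estate;
    TupleTableSlot slot;
    bool async_aware;               /* toy async child scan state: */
    const int *rows; int nrows, next;
    ExecCallback parent_cb; PlanState *parent_ps; void *parent_ctx;
    PlanState *children[4]; int nchildren;  /* Append state: */
    bool started;
    PlanState *last_child;          /* child that supplied the last tuple */
};

static void ExecRegisterCallback(EState *es, ExecCallback cb, PlanState *ps,
                                 TupleTableSlot *slot, void *ctx)
{
    assert(es->count < MAX_PENDING);
    es->q[(es->head + es->count) % MAX_PENDING] = (Pending){cb, ps, slot, ctx};
    es->count++;
}

/* Fire queued callbacks in order; stop early when one returns true. */
static bool ExecFireCallbacks(EState *es)
{
    while (es->count > 0)
    {
        Pending p = es->q[es->head];
        es->head = (es->head + 1) % MAX_PENDING;
        es->count--;
        if (p.cb(p.ps, p.slot, p.ctx))
            return true;
    }
    return false;
}

/* Child's own callback; stands in for "the fd became read-ready". */
static bool child_ready(PlanState *child, TupleTableSlot *slot, void *ctx)
{
    if (child->next < child->nrows)
    {
        child->slot.value = child->rows[child->next++];
        ExecRegisterCallback(child->estate, child->parent_cb, child->parent_ps,
                             &child->slot, child->parent_ctx);
    }
    return false;
}

static bool ExecStartAsync(PlanState *target, ExecCallback callback,
                           PlanState *cb_planstate, void *cb_context)
{
    if (!target->async_aware)
        return false;
    target->parent_cb = callback;
    target->parent_ps = cb_planstate;
    target->parent_ctx = cb_context;
    /* Real code would wait on an fd; here the work "completes" at once. */
    ExecRegisterCallback(target->estate, child_ready, target, NULL, NULL);
    return true;
}

/* Append's tuple-ready callback: copy the tuple and stop the firing loop. */
static bool append_tuple_ready(PlanState *append, TupleTableSlot *slot,
                               void *ctx)
{
    append->slot = *slot;
    append->last_child = ctx;
    return true;
}

/* Returns the next tuple, or NULL once no child has anything left. */
static TupleTableSlot *ExecAppend(PlanState *append)
{
    if (!append->started)
    {
        for (int i = 0; i < append->nchildren; i++)
            ExecStartAsync(append->children[i], append_tuple_ready, append,
                           append->children[i]);
        append->started = true;
    }
    else if (append->last_child != NULL)
        ExecStartAsync(append->last_child, append_tuple_ready, append,
                       append->last_child);
    return ExecFireCallbacks(append->estate) ? &append->slot : NULL;
}
```

With two children holding rows {1, 2} and {10}, successive ExecAppend calls in this toy return 1, 10, 2 and then NULL, so tuples interleave across children instead of draining the first child before touching the second.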
It seems pretty straightforward to fit Gather into this infrastructure.
It is unclear to me how useful this is beyond ForeignScan, Gather, and
Append. MergeAppend's ordering constraint makes it less useful; we
can asynchronously kick off the request for the next tuple before
returning the previous one, but we're going to need to have that tuple
before we can return the next one. But it could be done. It could
potentially even be applied to seq scans or index scans using some set
of asynchronous I/O interfaces, but I don't see how it could be
applied to joins or aggregates, which typically can't really proceed
until they get the next tuple. They could be plugged into this
interface easily enough but it would only help to the extent that it
enabled asynchrony elsewhere in the plan tree to be pulled up towards
the root.
Thoughts?
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Wed, Dec 16, 2015 at 4:54 AM, Robert Haas <robertmhaas@gmail.com> wrote:
On Fri, Dec 11, 2015 at 11:49 PM, Amit Kapila <amit.kapila16@gmail.com>
wrote:
But is it important enough to be worthwhile? Maybe, maybe not. I
think we should be working toward a world where the Gather is at the
top of the plan tree as often as possible, in which case
asynchronously kicking off a Gather node won't be that exciting any
more - see notes on the "parallelism + sorting" thread where I talk
about primitives that would allow massively parallel merge joins,
rather than 2 or 3 way parallel. From my point of view, the case
where we really need some kind of asynchronous execution solution is a
ForeignScan, and in particular a ForeignScan which is the child of an
Append. In that case it's obviously really useful to be able to kick
off all the foreign scans and then return a tuple from whichever one
coughs it up first.
How will this be better than doing the same thing in a way we have done
Parallel Sequential Scan at ExecutorRun() time?
I'm not sure if this is what you are asking, but I think it probably
should be done at ExecutorRun() time, rather than a separate phase.
Yes, that's one thing I wanted to know. Another point that is not
clear to me about this async infrastructure is why the current
parallelism infrastructure can't be used to achieve the async benefits
of ForeignScan.
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Wed, Dec 16, 2015 at 1:34 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:
Yes, thats one thing I wanted to know, yet another point which is not
clear to me about this Async infrastructure is why the current
infrastructure
of Parallelism can't be used to achieve the Async benefits of ForeignScan?
Well, all a ForeignScan by postgres_fdw does is read the tuples that
are generated remotely. Turning around and sticking those into a
Funnel doesn't seem like it gains much: now instead of having to read
tuples from someplace, the leader has to read tuples from some other
place. Yeah, there are cases where it could win, like when there's a
selective nonpushable qual, but that's not that exciting.
There's another, more serious problem: if the leader has a connection
open to the remote server and that connection is in mid-transaction,
you can't have a worker open a new connection without changing the
semantics. Working around that problem looks hard to me.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Wed, Dec 16, 2015 at 6:04 PM, Robert Haas <robertmhaas@gmail.com> wrote:
On Wed, Dec 16, 2015 at 1:34 AM, Amit Kapila <amit.kapila16@gmail.com>
wrote:
Yes, thats one thing I wanted to know, yet another point which is not
clear to me about this Async infrastructure is why the current
infrastructure
of Parallelism can't be used to achieve the Async benefits of
ForeignScan?
Well, all a ForeignScan by postgres_fdw does is read the tuples that
are generated remotely. Turning around and sticking those into a
Funnel doesn't seem like it gains much: now instead of having to read
tuples from someplace, the leader has to read tuples from some other
place. Yeah, there are cases where it could win, like when there's a
selective nonpushable qual, but that's not that exciting.
There's another, more serious problem: if the leader has a connection
open to the remote server and that connection is in mid-transaction,
you can't have a worker open a new connection without changing the
semantics. Working around that problem looks hard to me.
Okay. It seems there are cases where it could benefit from Async
execution infrastructure instead of directly using Gather node kind
of infrastructure. I am not the right person to judge whether there
are enough cases that we need a new infrastructure for such executions,
but I think it is a point to consider and I am sure you will make the
right move.
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
Hi Robert and others,
First, I don't know the PostgreSQL code well enough yet. I still
hope my thoughts are useful.
Robert Haas wrote:
It is unclear to me how useful this is beyond ForeignScan, Gather, and
Append. MergeAppend's ordering constraint makes it less useful; we
can asynchronously kick off the request for the next tuple before
returning the previous one, but we're going to need to have that tuple
before we can return the next one. But it could be done. It could
potentially even be applied to seq scans or index scans using some set
of asynchronous I/O interfaces, but I don't see how it could be
applied to joins or aggregates, which typically can't really proceed
until they get the next tuple. They could be plugged into this
interface easily enough but it would only help to the extent that it
enabled asynchrony elsewhere in the plan tree to be pulled up towards
the root.
As far as I understand, this comes down to a number of subplans. A subplan
can be asked to return a tuple directly or at some later point in time
(asynchronously). Conceptually, the subplan goes into a "tuples wanted" state
and starts the work that needs to be done to produce that tuple. Later, it
either returns the tuple or a message that there are no more tuples.
I see opportunities to use this feature to make some query plans less memory
intensive without increasing the total amount of work to be done. I think
the same subplan can be placed at several places in the execution plan. If
the subplan ends with a tuple store, then if a row is requested, it can
either return it from the store or generate it in the subplan. If both outputs
of the subplan request tuples at around the same rate, the tuple store size
is limited, whereas we currently need to save all the tuples or generate the
tuples multiple times. I think this can be potentially beneficial for
CTEs.
I also think the planner can predict what the chances are that a subplan can
be reused. If both subplans are on different sides of a hash join, it can
know that one output will be exhausted before the second output requests
its first row. That would mean that everything needs to be stored. Also,
not every callback needs to be invoked at the same rate: tuple storage might
be avoided by choosing the callback to invoke wisely.
I am a little bit worried about the performance as a result of context
switching. I think it is a good idea to only register the callback if it
actually hits a point where the tuple can't be generated directly.
Thoughts?
Regards,
Mart
Thank you for the comment.
At Tue, 15 Dec 2015 21:01:27 -0500, Robert Haas <robertmhaas@gmail.com> wrote in <CA+TgmoZuAqVDJQ14YHCa3izbdaaaUSuwrG1YbtJD0rKO5EmeKQ@mail.gmail.com>
On Mon, Dec 14, 2015 at 3:34 AM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp> wrote:
Yes, the most significant and obvious (but hard to estimate the
benefit) target of async execution is (Merge)Append-ForeignScan,
which is narrow but frequently used. And this patch has started
from it.
It is because of the startup-heavy nature of FDW. So I involved
sort as a target later then redesigned to give the ability on all
nodes. If it is obviously over-done for the (currently) expected
benefit and if it is preferable to shrink this patch so as to
touch only the portion where async-exec has a benefit, I'll do
so.
Suppose we equip each EState with the ability to fire "callbacks".
Callbacks have the signature:
typedef bool (*ExecCallback)(PlanState *planstate, TupleTableSlot
*slot, void *context);
Executor nodes can register immediate callbacks to be run at the
earliest possible opportunity using a function like
ExecRegisterCallback(estate, callback, planstate, slot, context).
They can register deferred callbacks that will be called when a file
descriptor becomes ready for I/O, or when the process latch is set,
using a call like ExecRegisterFileCallback(estate, fd, event,
callback, planstate, slot, context) or
ExecRegisterLatchCallback(estate, callback, planstate, slot, context).
To execute callbacks, an executor node can call ExecFireCallbacks(),
which will fire immediate callbacks in order of registration, and wait
for the file descriptors for which callbacks have been registered and
for the process latch when no immediate callbacks remain but there are
still deferred callbacks. It will return when (1) there are no
remaining immediate or deferred callbacks or (2) one of the callbacks
returns "true".
Excellent! I unconsciously excluded the case of callbacks because
I supposed (without solid grounds) that all executor nodes could have a
chance to gain from this. Such a callback is a good choice to do
what Start*Node did in the latest patch.
Then, suppose we add a function bool ExecStartAsync(PlanState *target,
ExecCallback callback, PlanState *cb_planstate, void *cb_context).
For non-async-aware plan nodes, this just returns false. async-aware
plan nodes should initiate some work, register some callbacks, and
return. The callbacks that get registered should arrange in turn to
register the callback passed as an argument when a tuple becomes
available, passing the planstate and context provided by
ExecStartAsync's caller, plus the TupleTableSlot containing the tuple.
Although I can't clearly picture the case of async-aware nodes
under non-aware nodes, this seems to have a high affinity with
the (true) parallel execution framework.
So, in response to ExecStartAsync, if there's no tuple currently
available, postgres_fdw can send a query to the remote server and
request a callback when the fd becomes read-ready. It must save the
callback passed to ExecStartAsync inside the PlanState someplace so
that when a tuple becomes available it can register that callback.
ExecAppend can call ExecStartAsync on each of its subplans. For any
subplan where ExecStartAsync returns false, ExecAppend will just
execute it normally, by calling ExecProcNode repeatedly until no more
tuples are returned. But for async-capable subplans, it can call
ExecStartAsync on all of them, and then call ExecFireCallbacks. The
tuple-ready callback it passes to its child plans will take the tuple
provided by the child plan and store it into the Append node's slot.
It will then return true if, and only if, ExecFireCallbacks is being
invoked from ExecAppend (which it can figure out via some kind of
signalling either through its own PlanState or centralized signalling
through the EState). That way, if ExecAppend were itself invoked
asynchronously, its tuple-ready callback could simply populate a slot
appropriately and register its invoker's tuple-ready callback. Whether
called synchronously or asynchronously, each invocation of an
asynchronous append after the first would just need to again call
ExecStartAsync on the child that last returned a tuple.
Thanks for the attentive explanation. My concern is the latency
of synchronizing the producer and the consumer one tuple at a
time. My previous patch is not asynchronous on every tuple, so it
gives a pure gain without any loss from tuple-wise
synchronization. But this looks clean and I like it, so I'll
consider it.
It seems pretty straightforward to fit Gather into this infrastructure.
Yes.
It is unclear to me how useful this is beyond ForeignScan, Gather, and
Append. MergeAppend's ordering constraint makes it less useful; we
can asynchronously kick off the request for the next tuple before
returning the previous one, but we're going to need to have that tuple
before we can return the next one. But it could be done. It could
potentially even be applied to seq scans or index scans using some set
of asynchronous I/O interfaces, but I don't see how it could be
applied to joins or aggregates, which typically can't really proceed
until they get the next tuple. They could be plugged into this
interface easily enough but it would only help to the extent that it
enabled asynchrony elsewhere in the plan tree to be pulled up towards
the root.
This is mainly an argument about "asynchronous tuple-passing"
rather than "asynchronous execution/start". As I showed before, a
merge join on asynchronous and parallel children running sort
*can* win over a hash join (if the planner foresees that). Where
asynchronous tuple-passing is not very effective, as with
MergeAppend, we can simply refrain from doing it. But cost
modeling for it is a difficult problem.
Thoughts?
I'll try the callback framework and in-process asynchronous
tuple-passing (like select(2)). Please wait for a while.
regards,
--
Kyotaro Horiguchi
NTT Open Source Software Center
Hello,
I gave callbacks some thought and tried them out as a means of
async (early) execution.
Suppose we equip each EState with the ability to fire "callbacks".
Callbacks have the signature:
typedef bool (*ExecCallback)(PlanState *planstate, TupleTableSlot
*slot, void *context);
Executor nodes can register immediate callbacks to be run at the
earliest possible opportunity using a function like
ExecRegisterCallback(estate, callback, planstate, slot, context).
They can register deferred callbacks that will be called when a file
descriptor becomes ready for I/O, or when the process latch is set,
using a call like ExecRegisterFileCallback(estate, fd, event,
callback, planstate, slot, context) or
ExecRegisterLatchCallback(estate, callback, planstate, slot, context).
I considered this. The immediate callbacks seem fine, but using a
latch or fds to signal tuple availability doesn't seem to fit
callbacks stored in the EState. Such events can be deferred until
the parent requests a tuple and handled at that time, as
ExecGather does now. However, some kind of synchronization/waiting
mechanism such as a latch or select() is needed anyway.
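As a rough illustration of the kind of waiting mechanism meant here, the following standalone sketch blocks on several file descriptors with plain POSIX poll(2) and reports which one became readable. It is not PostgreSQL's latch machinery; wait_for_any_readable, read_one and MAX_WAIT_FDS are hypothetical names, and the descriptors stand in for connections to tuple producers.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <stdbool.h>
#include <stddef.h>
#include <unistd.h>

#define MAX_WAIT_FDS 8

/*
 * Block until at least one descriptor is readable (or hung up).
 * Returns the index of the first such descriptor, or -1 on error.
 */
static int
wait_for_any_readable(const int *fds, size_t nfds)
{
    struct pollfd pfds[MAX_WAIT_FDS];
    size_t        i;
    int           rc;

    assert(nfds > 0 && nfds <= MAX_WAIT_FDS);

    for (i = 0; i < nfds; i++)
    {
        pfds[i].fd = fds[i];
        pfds[i].events = POLLIN;
        pfds[i].revents = 0;
    }

    /* Retry if a signal interrupts the wait. */
    do
    {
        rc = poll(pfds, (nfds_t) nfds, -1);
    } while (rc < 0 && errno == EINTR);

    if (rc < 0)
        return -1;

    for (i = 0; i < nfds; i++)
    {
        if (pfds[i].revents & (POLLIN | POLLHUP))
            return (int) i;
    }
    return -1;
}

/* Consume one byte from a ready descriptor; true on success. */
static bool
read_one(int fd, char *out)
{
    return read(fd, out, 1) == 1;
}
```

A consumer with several children would call wait_for_any_readable() only after every child has reported that it has nothing available, then fetch from the child whose descriptor was returned.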
To execute callbacks, an executor node can call ExecFireCallbacks(),
which will fire immediate callbacks in order of registration, and wait
for the file descriptors for which callbacks have been registered and
for the process latch when no immediate callbacks remain but there are
still deferred callbacks. It will return when (1) there are no
remaining immediate or deferred callbacks or (2) one of the callbacks
returns "true".
Excellent! I had unconsciously ruled out callbacks because I
assumed (without solid grounds) that every executor node could
benefit from this. Such a callback is a good way to do what
Start*Node did in the latest patch.
The previous code added a large amount of garbage: the
async-execution mechanism required additional code for an
ExecStartNode phase, in the same manner as ExecProcNode and
ExecEndNode, and most of it was totally useless for most node
types.
A callback suits operations that are not so common and are invoked
once for a given event, such as error handling. In this case the
operation is the asynchronous execution of a node, and the event
is the moment just before ExecProcNode on the topmost node. The
first attached patch allows async-capable nodes to register
callbacks in the Init phase and executes them just before the
Exec phase on the topmost node. As a result it greatly reduces
the additional code. This is my first impression of the word
"callbacks".
The following operation yields LOG messages from dummy callback
with this patch.
CREATE TABLE t1 (a int, b int);
INSERT INTO t1 (SELECT a, 1 FROM generate_series(0, 99) a);
CREATE TABLE t2 (a int, b int);
INSERT INTO t2 (SELECT a, 2 FROM generate_series(0, 99) a);
CREATE TABLE t3 (a int, b int);
INSERT INTO t3 (SELECT a, 3 FROM generate_series(0, 99) a);
SELECT * FROM t1 UNION ALL SELECT * FROM t2 UNION ALL SELECT * FROM t3;
===
LOG: dummy_async_cb is called for 0x2783a98
LOG: dummy_async_cb is called for 0x2784248
LOG: dummy_async_cb is called for 0x2784ad0
What my previous patch could do can be done by this first patch
with a far smaller diff.
If this design is not bad, I'll do the postgres_fdw part.
Next is discussion about async tuple fetching.
Then, suppose we add a function bool ExecStartAsync(PlanState *target,
ExecCallback callback, PlanState *cb_planstate, void *cb_context).
For non-async-aware plan nodes, this just returns false. async-aware
plan nodes should initiate some work, register some callbacks, and
return. The callback that gets registered should arrange in turn to
register the callback passed as an argument when a tuple becomes
available, passing the planstate and context provided by
ExecStartAsync's caller, plus the TupleTableSlot containing the tuple.
Although I can't clearly picture the case of async-aware nodes
under non-aware nodes, this seems to have a high affinity with
the (true) parallel execution framework.
ExecStartAsync is similar to ExecStartNode in my old patch. One
of the most annoying things about that is that it needs to walk
down to the descendants, which in turn requires ugly additional
code for every node type that can have children.
Instead, in the second patch, I modified ExecProcNode to return
an async status in the EState. It will be EXEC_READY or EXEC_EOT
(end of table / no more tuples?) for non-async-capable nodes, and
async-capable nodes can set it to EXEC_NOT_READY, which indicates
that there could be more tuples but none is available yet.
Async-aware nodes such as Append can go on to the next child if
the previous one returned EXEC_NOT_READY. If all !EXEC_EOT nodes
returned EXEC_NOT_READY, Append will wait using some signaling
mechanism (for now it busy-loops instead). As an example, the
second patch modifies ExecAppend to handle this and modifies
ExecSeqScan to return EXEC_NOT_READY with a certain probability,
emulating asynchronous tuple fetching. As a result, the UNION ALL
query above returns rows interleaved among the three tables.
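The three-state protocol described above can be modelled outside the executor. The following is only a sketch under stated assumptions: DemoStatus, DemoChild, DemoAppend, child_fetch and append_fetch are hypothetical names, children are plain integer arrays, and "not ready" is simulated deterministically (every Nth fetch attempt) instead of with random() as in the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Mirrors EXEC_READY / EXEC_NOT_READY / EXEC_EOT from the description. */
typedef enum { DEMO_READY, DEMO_NOT_READY, DEMO_EOT } DemoStatus;

typedef struct DemoChild
{
    const int *rows;        /* rows this child will produce */
    size_t     nrows;
    size_t     next;        /* index of the next row to return */
    unsigned   calls;       /* fetch attempts so far */
    unsigned   stall_every; /* every Nth attempt stalls; 0 = never */
} DemoChild;

typedef struct DemoAppend
{
    DemoChild *children;
    bool      *stopped;     /* true once a child has reported EOT */
    size_t     nchildren;
} DemoAppend;

/* Fetch one row from a child, or report that it is stalled or exhausted. */
static DemoStatus
child_fetch(DemoChild *c, int *out)
{
    /* stall_every == 1 would stall forever and hang the busy loop below */
    assert(c->stall_every != 1);

    c->calls++;
    if (c->next >= c->nrows)
        return DEMO_EOT;
    if (c->stall_every != 0 && c->calls % c->stall_every == 0)
        return DEMO_NOT_READY;
    *out = c->rows[c->next++];
    return DEMO_READY;
}

/*
 * Return the next available row from any child.  A stalled child is
 * skipped in favour of the next one; the outer loop repeats (busily,
 * as in the PoC) until every child has reached EOT.
 */
static DemoStatus
append_fetch(DemoAppend *a, int *out)
{
    bool all_eot = false;

    while (!all_eot)
    {
        all_eot = true;
        for (size_t i = 0; i < a->nchildren; i++)
        {
            if (a->stopped[i])
                continue;

            switch (child_fetch(&a->children[i], out))
            {
                case DEMO_READY:
                    return DEMO_READY;
                case DEMO_NOT_READY:
                    all_eot = false;    /* more rows may come later */
                    break;
                case DEMO_EOT:
                    a->stopped[i] = true;
                    break;
            }
        }
        /* A real implementation would wait here instead of spinning. */
    }
    return DEMO_EOT;
}
```

With one child holding {1, 2, 3} that stalls on every second attempt and another holding {10, 20} that never stalls, repeated calls to append_fetch() yield 1, 10, 2, 20, 3 and then DEMO_EOT, which is the interleaving effect described for the UNION ALL query.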
So, in response to ExecStartAsync, if there's no tuple currently
available, postgres_fdw can send a query to the remote server and
request a callback when the fd becomes read-ready. It must save the
callback passed to ExecStartAsync inside the PlanState someplace so
that when a tuple becomes available it can register that callback.
ExecAppend can call ExecStartAsync on each of its subplans. For any
subplan where ExecStartAsync returns false, ExecAppend will just
execute it normally, by calling ExecProcNode repeatedly until no more
tuples are returned. But for async-capable subplans, it can call
ExecStartAsync on all of them, and then call ExecFireCallbacks. The
tuple-ready callback it passes to its child plans will take the tuple
provided by the child plan and store it into the Append node's slot.
It will then return true if, and only if, ExecFireCallbacks is being
invoked from ExecAppend (which it can figure out via some kind of
signalling either through its own PlanState or centralized signalling
through the EState). That way, if ExecAppend were itself invoked
asynchronously, its tuple-ready callback could simply populate a slot
appropriately and register its invoker's tuple-ready callback. Whether
called synchronously or asynchronously, each invocation of an
asynchronous append after the first would just need to again call
ExecStartAsync on the child that last returned a tuple.
Thanks for the attentive explanation. My concern is the latency
of synchronizing the producer and the consumer one tuple at a
time. My previous patch is not asynchronous on every tuple, so it
gives a pure gain without any loss from tuple-wise
synchronization. But this looks clean and I like it, so I'll
consider it.
It seems pretty straightforward to fit Gather into this infrastructure.
Yes.
If Gather's children became regular node structs with a name
like Worker(Node), instead of the non-Node structures they are
now, we could generalize the tuple-synchronization mechanism so
that it can be used by other nodes such as ForeignScan.
Append(ForeignScan, ForeignScan, ...) with async tuple passing
can even out the load among multiple foreign servers, so I
suppose it is preferable if it carries no penalty.
It is unclear to me how useful this is beyond ForeignScan, Gather, and
Append. MergeAppend's ordering constraint makes it less useful; we
can asynchronously kick off the request for the next tuple before
returning the previous one, but we're going to need to have that tuple
before we can return the next one. But it could be done. It could
potentially even be applied to seq scans or index scans using some set
of asynchronous I/O interfaces, but I don't see how it could be
applied to joins or aggregates, which typically can't really proceed
until they get the next tuple. They could be plugged into this
interface easily enough but it would only help to the extent that it
enabled asynchrony elsewhere in the plan tree to be pulled up towards
the root.
This is mainly an argument about "asynchronous tuple-passing"
rather than "asynchronous execution/start". As I showed before, a
merge join on asynchronous and parallel children running sort
*can* win over a hash join (if the planner foresees that). Where
asynchronous tuple-passing is not very effective, as with
MergeAppend, we can simply refrain from doing it. But cost
modeling for it is a difficult problem.
Thoughts?
I'll try the callback framework and in-process asynchronous
tuple-passing (like select(2)). Please wait for a while.
In the end, the two attached patches turned out somewhat
different from Robert's suggestion and lack a synchronization
feature. However, if this approach is not so bad, I'll build the
feature on top of it.
Suggestions? Thoughts?
regards,
--
Kyotaro Horiguchi
NTT Open Source Software Center
Attachments:
0001-PoC-Async-start-callback-for-executor.patch (text/x-patch; charset=us-ascii)
From 87e5c9eb6f230b9682fe300bc1592cb9f4fcadb5 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Jan 2016 09:37:25 +0900
Subject: [PATCH 1/2] PoC: Async start callback for executor.
This patch allows async-capable nodes to register callbacks to run the
node before ExecProcNode(). eflags has new bit EXEC_FLAG_ASYNC to
request asynchronous execution to children on ExecInit phase.
As an example, nodeSeqscan registers dummy callback if requested, and
nodeAppend unconditionally requests to its children. So a plan
Append(SeqScan, SeqScan) runs the callback and yields LOG messages.
---
src/backend/executor/execMain.c | 2 +
src/backend/executor/execProcnode.c | 9 ++
src/backend/executor/execUtils.c | 39 ++++++++
src/backend/executor/nodeAppend.c | 2 +
src/backend/executor/nodeGather.c | 166 ++++++++++++++++++++-------------
src/backend/executor/nodeMergeAppend.c | 3 +
src/backend/executor/nodeNestloop.c | 13 +++
src/backend/executor/nodeSeqscan.c | 9 ++
src/include/executor/executor.h | 2 +
src/include/nodes/execnodes.h | 23 ++++-
src/include/nodes/plannodes.h | 1 -
11 files changed, 202 insertions(+), 67 deletions(-)
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 76f7297..7fe188a 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1552,6 +1552,8 @@ ExecutePlan(EState *estate,
if (use_parallel_mode)
EnterParallelMode();
+ AsyncStartNode(planstate);
+
/*
* Loop until we've processed the proper number of tuples from the plan.
*/
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index a31dbc9..df9e533 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -786,6 +786,15 @@ ExecEndNode(PlanState *node)
}
/*
+ * AsyncStartNode - execute registered early-startup callbacks
+ */
+void
+AsyncStartNode(PlanState *node)
+{
+ RunAsyncCallbacks(node->state->es_async_cb_list);
+}
+
+/*
* ExecShutdownNode
*
* Give execution nodes a chance to stop asynchronous resource consumption
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index e937cf8..0627772 100644
--- a/src/backend/executor/execUtils.c
+++ b/src/backend/executor/execUtils.c
@@ -964,3 +964,42 @@ ShutdownExprContext(ExprContext *econtext, bool isCommit)
MemoryContextSwitchTo(oldcontext);
}
+
+/*
+ * Register an async startup callback to EState.
+ *
+ * The callbacks are executed from the first of the list and this function
+ * puts the callbacks in registered order. This is not necessary if they are
+ * truly asynchronous and independent but the ordering is safer if some of
+ * them have an execution order in back.
+ */
+void
+RegisterAsyncCallback(EState *estate, AsyncStartCallback func, PlanState *node,
+ int eflags)
+{
+ AsyncStartListItem *elem = palloc(sizeof(AsyncStartListItem));
+ elem->cbfunc = func;
+ elem->node = node;
+
+ if (eflags & EXEC_FLAG_ASYNC)
+ estate->es_async_cb_list =
+ lappend(estate->es_async_cb_list, elem);
+}
+
+/*
+ * Run callbacks in the list
+ */
+void
+RunAsyncCallbacks(List *list)
+{
+ ListCell *lc;
+
+ foreach (lc, list)
+ {
+ AsyncStartListItem *cb = (AsyncStartListItem *) lfirst(lc);
+
+ cb->cbfunc(cb->node);
+ }
+
+ return;
+}
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index a26bd63..d10364c 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -165,6 +165,8 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
{
Plan *initNode = (Plan *) lfirst(lc);
+ /* always request async-execution for children */
+ eflags |= EXEC_FLAG_ASYNC;
appendplanstates[i] = ExecInitNode(initNode, estate, eflags);
i++;
}
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 16c981b..3f9b8b0 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -45,7 +45,90 @@
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
static HeapTuple gather_readnext(GatherState *gatherstate);
static void ExecShutdownGatherWorkers(GatherState *node);
+static bool StartGather(PlanState *psnode);
+/* ----------------------------------------------------------------
+ * StartGather
+ *
+ * Gather node can have an advantage from asynchronous execution in most
+ * cases because of its startup cost.
+ * ----------------------------------------------------------------
+ */
+static bool
+StartGather(PlanState *psnode)
+{
+ GatherState *node = (GatherState *)psnode;
+ EState *estate = node->ps.state;
+ Gather *gather = (Gather *) node->ps.plan;
+ TupleTableSlot *fslot = node->funnel_slot;
+ int i;
+
+ /* Don't start if already started or explicitly inhibited by the upper */
+ if (node->initialized)
+ return false;
+
+ /*
+ * Initialize the parallel context and workers on first execution. We do
+ * this on first execution rather than during node initialization, as it
+ * needs to allocate large dynamic segment, so it is better to do if it
+ * is really needed.
+ */
+
+ /*
+ * Sometimes we might have to run without parallelism; but if
+ * parallel mode is active then we can try to fire up some workers.
+ */
+ if (gather->num_workers > 0 && IsInParallelMode())
+ {
+ ParallelContext *pcxt;
+ bool got_any_worker = false;
+
+ /* Initialize the workers required to execute Gather node. */
+ if (!node->pei)
+ node->pei = ExecInitParallelPlan(node->ps.lefttree,
+ estate,
+ gather->num_workers);
+
+ /*
+ * Register backend workers. We might not get as many as we
+ * requested, or indeed any at all.
+ */
+ pcxt = node->pei->pcxt;
+ LaunchParallelWorkers(pcxt);
+
+ /* Set up tuple queue readers to read the results. */
+ if (pcxt->nworkers > 0)
+ {
+ node->nreaders = 0;
+ node->reader =
+ palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
+
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ if (pcxt->worker[i].bgwhandle == NULL)
+ continue;
+
+ shm_mq_set_handle(node->pei->tqueue[i],
+ pcxt->worker[i].bgwhandle);
+ node->reader[node->nreaders++] =
+ CreateTupleQueueReader(node->pei->tqueue[i],
+ fslot->tts_tupleDescriptor);
+ got_any_worker = true;
+ }
+ }
+
+ /* No workers? Then never mind. */
+ if (!got_any_worker)
+ ExecShutdownGatherWorkers(node);
+ }
+
+ /* Run plan locally if no workers or not single-copy. */
+ node->need_to_scan_locally = (node->reader == NULL)
+ || !gather->single_copy;
+
+ node->initialized = true;
+ return true;
+}
/* ----------------------------------------------------------------
* ExecInitGather
@@ -58,6 +141,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
Plan *outerNode;
bool hasoid;
TupleDesc tupDesc;
+ int child_eflags;
/* Gather node doesn't have innerPlan node. */
Assert(innerPlan(node) == NULL);
@@ -97,6 +181,11 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
* now initialize outer plan
*/
outerNode = outerPlan(node);
+ /*
+ * This outer plan is executed in another process so don't start
+ * asynchronously in this process
+ */
+ child_eflags = eflags & ~EXEC_FLAG_ASYNC;
outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
gatherstate->ps.ps_TupFromTlist = false;
@@ -115,6 +204,16 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);
+ /*
+ * Register asynchronous execution callback for this node. Backend workers
+ * needs to allocate large dynamic segment, and it is better to execute
+ * them at the time of first execution from this aspect. So asynchronous
+ * execution should be decided considering that but we omit the aspect for
+ * now.
+ */
+ RegisterAsyncCallback(estate, StartGather, (PlanState *)gatherstate,
+ eflags);
+
return gatherstate;
}
@@ -128,77 +227,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
TupleTableSlot *
ExecGather(GatherState *node)
{
- TupleTableSlot *fslot = node->funnel_slot;
- int i;
TupleTableSlot *slot;
TupleTableSlot *resultSlot;
ExprDoneCond isDone;
ExprContext *econtext;
- /*
- * Initialize the parallel context and workers on first execution. We do
- * this on first execution rather than during node initialization, as it
- * needs to allocate large dynamic segment, so it is better to do if it
- * is really needed.
- */
+ /* Initialize workers if not yet. */
if (!node->initialized)
- {
- EState *estate = node->ps.state;
- Gather *gather = (Gather *) node->ps.plan;
-
- /*
- * Sometimes we might have to run without parallelism; but if
- * parallel mode is active then we can try to fire up some workers.
- */
- if (gather->num_workers > 0 && IsInParallelMode())
- {
- ParallelContext *pcxt;
- bool got_any_worker = false;
-
- /* Initialize the workers required to execute Gather node. */
- if (!node->pei)
- node->pei = ExecInitParallelPlan(node->ps.lefttree,
- estate,
- gather->num_workers);
-
- /*
- * Register backend workers. We might not get as many as we
- * requested, or indeed any at all.
- */
- pcxt = node->pei->pcxt;
- LaunchParallelWorkers(pcxt);
-
- /* Set up tuple queue readers to read the results. */
- if (pcxt->nworkers > 0)
- {
- node->nreaders = 0;
- node->reader =
- palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
-
- for (i = 0; i < pcxt->nworkers; ++i)
- {
- if (pcxt->worker[i].bgwhandle == NULL)
- continue;
-
- shm_mq_set_handle(node->pei->tqueue[i],
- pcxt->worker[i].bgwhandle);
- node->reader[node->nreaders++] =
- CreateTupleQueueReader(node->pei->tqueue[i],
- fslot->tts_tupleDescriptor);
- got_any_worker = true;
- }
- }
-
- /* No workers? Then never mind. */
- if (!got_any_worker)
- ExecShutdownGatherWorkers(node);
- }
-
- /* Run plan locally if no workers or not single-copy. */
- node->need_to_scan_locally = (node->reader == NULL)
- || !gather->single_copy;
- node->initialized = true;
- }
+ StartGather((PlanState *)node);
/*
* Check to see if we're still projecting out tuples from a previous scan
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index e271927..65ef13b 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -112,6 +112,9 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
{
Plan *initNode = (Plan *) lfirst(lc);
+ /* always request async execution for now */
+ eflags = eflags | EXEC_FLAG_ASYNC;
+
mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
i++;
}
diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c
index 555fa09..16c317c 100644
--- a/src/backend/executor/nodeNestloop.c
+++ b/src/backend/executor/nodeNestloop.c
@@ -340,11 +340,24 @@ ExecInitNestLoop(NestLoop *node, EState *estate, int eflags)
* inner child, because it will always be rescanned with fresh parameter
* values.
*/
+
+ /*
+ * async execution of outer plan is beneficial if this join is requested
+ * as async
+ */
outerPlanState(nlstate) = ExecInitNode(outerPlan(node), estate, eflags);
if (node->nestParams == NIL)
eflags |= EXEC_FLAG_REWIND;
else
eflags &= ~EXEC_FLAG_REWIND;
+
+ /*
+ * Async execution of the inner is inhibited if parameterized by the
+ * outer
+ */
+ if (list_length(node->nestParams) > 0)
+ eflags &= ~ EXEC_FLAG_ASYNC;
+
innerPlanState(nlstate) = ExecInitNode(innerPlan(node), estate, eflags);
/*
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index f12921d..2ae598d 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -39,6 +39,12 @@ static TupleTableSlot *SeqNext(SeqScanState *node);
* ----------------------------------------------------------------
*/
+static void
+dummy_async_cb(PlanState *ps)
+{
+ elog(LOG, "dummy_async_cb is called for %p", ps);
+}
+
/* ----------------------------------------------------------------
* SeqNext
*
@@ -214,6 +220,9 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
ExecAssignResultTypeFromTL(&scanstate->ss.ps);
ExecAssignScanProjectionInfo(&scanstate->ss);
+ /* Register dummy async callback if requested */
+ RegisterAsyncCallback(estate, dummy_async_cb, scanstate, eflags);
+
return scanstate;
}
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 1a44085..b1a17eb 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -62,6 +62,7 @@
#define EXEC_FLAG_WITH_OIDS 0x0020 /* force OIDs in returned tuples */
#define EXEC_FLAG_WITHOUT_OIDS 0x0040 /* force no OIDs in returned tuples */
#define EXEC_FLAG_WITH_NO_DATA 0x0080 /* rel scannability doesn't matter */
+#define EXEC_FLAG_ASYNC 0x0100 /* request asynchronous execution */
/*
@@ -225,6 +226,7 @@ extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
extern TupleTableSlot *ExecProcNode(PlanState *node);
extern Node *MultiExecProcNode(PlanState *node);
extern void ExecEndNode(PlanState *node);
+extern void AsyncStartNode(PlanState *node);
extern bool ExecShutdownNode(PlanState *node);
/*
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 07cd20a..1e8936c 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -343,6 +343,17 @@ typedef struct ResultRelInfo
List *ri_onConflictSetWhere;
} ResultRelInfo;
+/* ---------------
+ * Struct and enum for async-execution
+ */
+typedef struct PlanState PlanState;
+typedef void (*AsyncStartCallback)(PlanState *node);
+typedef struct AsyncStartListItem
+{
+ AsyncStartCallback cbfunc; /* the callback function */
+ PlanState *node; /* parameter to give the callback */
+} AsyncStartListItem;
+
/* ----------------
* EState information
*
@@ -419,9 +430,19 @@ typedef struct EState
HeapTuple *es_epqTuple; /* array of EPQ substitute tuples */
bool *es_epqTupleSet; /* true if EPQ tuple is provided */
bool *es_epqScanDone; /* true if EPQ tuple has been fetched */
-} EState;
+ /*
+ * Early-start callback list. These functions are executed just before
+ * ExecProcNode of the top-node.
+ */
+ List *es_async_cb_list;
+ List *es_private_async_cb_list;
+} EState;
+/* in execUtils.c */
+void RegisterAsyncCallback(EState *estate, AsyncStartCallback func,
+ PlanState *node, int eflags);
+void RunAsyncCallbacks(List *list);
/*
* ExecRowMark -
* runtime representation of FOR [KEY] UPDATE/SHARE clauses
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index e823c83..cbd58cb 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -79,7 +79,6 @@ typedef struct PlannedStmt
#define exec_subplan_get_plan(plannedstmt, subplan) \
((Plan *) list_nth((plannedstmt)->subplans, (subplan)->plan_id - 1))
-
/* ----------------
* Plan node
*
--
1.8.3.1
0002-PoC-Example-implement-of-asynchronous-tuple-passing.patch (text/x-patch; charset=us-ascii)
From 8a65bfc57897d7be07d9bb3506550c50cf99b957 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Jan 2016 16:47:31 +0900
Subject: [PATCH 2/2] PoC: Example implement of asynchronous tuple passing
Aside from early node execution, tuples from multiple children of a
node can be received asynchronously. This patch makes ExecProcNode
report a third status, EXEC_NOT_READY, through the EState, in addition
to what was previously signalled via the result. It means that the
node may have more tuples to return, but none is available at the moment.
As an example, this patch also modifies nodeSeqscan to return
EXEC_NOT_READY with a certain probability, and nodeAppend skips to the
next child when that status is returned.
---
src/backend/executor/execProcnode.c | 6 ++++
src/backend/executor/nodeAppend.c | 64 ++++++++++++++++++++++---------------
src/backend/executor/nodeGather.c | 10 +++---
src/backend/executor/nodeSeqscan.c | 11 +++++--
src/include/nodes/execnodes.h | 13 ++++++++
5 files changed, 71 insertions(+), 33 deletions(-)
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index df9e533..febc41a 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -383,6 +383,8 @@ ExecProcNode(PlanState *node)
if (node->instrument)
InstrStartNode(node->instrument);
+ node->state->exec_status = EXEC_READY;
+
switch (nodeTag(node))
{
/*
@@ -540,6 +542,10 @@ ExecProcNode(PlanState *node)
if (node->instrument)
InstrStopNode(node->instrument, TupIsNull(result) ? 0.0 : 1.0);
+ if (TupIsNull(result) &&
+ node->state->exec_status == EXEC_READY)
+ node->state->exec_status = EXEC_EOT;
+
return result;
}
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index d10364c..6ba13e9 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -121,6 +121,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
{
AppendState *appendstate = makeNode(AppendState);
PlanState **appendplanstates;
+ bool *stopped;
int nplans;
int i;
ListCell *lc;
@@ -134,6 +135,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
nplans = list_length(node->appendplans);
appendplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
+ stopped = (bool *) palloc0(nplans * sizeof(bool));
/*
* create new AppendState for our append node
@@ -141,6 +143,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
appendstate->ps.plan = (Plan *) node;
appendstate->ps.state = estate;
appendstate->appendplans = appendplanstates;
+ appendstate->stopped = stopped;
appendstate->as_nplans = nplans;
/*
@@ -195,45 +198,54 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
TupleTableSlot *
ExecAppend(AppendState *node)
{
- for (;;)
+ bool all_eot = false;
+ EState *estate = node->ps.state;
+ TupleTableSlot *result;
+
+ /*!!!! This node currently works only for monotonic-forwarding scan */
+ while (!all_eot)
{
PlanState *subnode;
- TupleTableSlot *result;
+ int i;
- /*
- * figure out which subplan we are currently processing
- */
- subnode = node->appendplans[node->as_whichplan];
+ all_eot = true;
+ /* Scan the children in registered order. */
+ for (i = node->as_whichplan ; i < node->as_nplans ; i++)
+ {
+ if (node->stopped[i])
+ continue;
- /*
- * get a tuple from the subplan
- */
- result = ExecProcNode(subnode);
+ subnode = node->appendplans[i];
+
+ result = ExecProcNode(subnode);
- if (!TupIsNull(result))
- {
/*
* If the subplan gave us something then return it as-is. We do
* NOT make use of the result slot that was set up in
* ExecInitAppend; there's no need for it.
*/
- return result;
+ switch (estate->exec_status)
+ {
+ case EXEC_READY:
+ return result;
+
+ case EXEC_NOT_READY:
+ all_eot = false;
+ break;
+
+ case EXEC_EOT:
+ node->stopped[i] = true;
+ break;
+
+ default:
+ elog(ERROR, "Unknown node status: %d", estate->exec_status);
+ }
}
- /*
- * Go on to the "next" subplan in the appropriate direction. If no
- * more subplans, return the empty slot set up for us by
- * ExecInitAppend.
- */
- if (ScanDirectionIsForward(node->ps.state->es_direction))
- node->as_whichplan++;
- else
- node->as_whichplan--;
- if (!exec_append_initialize_next(node))
- return ExecClearTuple(node->ps.ps_ResultTupleSlot);
-
- /* Else loop back and try to get a tuple from the new subplan */
+ /* XXXXX: some waiting measure is needed to wait new tuple */
}
+
+ return NULL;
}
/* ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 3f9b8b0..1b990b4 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -45,7 +45,7 @@
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
static HeapTuple gather_readnext(GatherState *gatherstate);
static void ExecShutdownGatherWorkers(GatherState *node);
-static bool StartGather(PlanState *psnode);
+static void StartGather(PlanState *psnode);
/* ----------------------------------------------------------------
* StartGather
@@ -54,7 +54,7 @@ static bool StartGather(PlanState *psnode);
* cases because of its startup cost.
* ----------------------------------------------------------------
*/
-static bool
+static void
StartGather(PlanState *psnode)
{
GatherState *node = (GatherState *)psnode;
@@ -65,7 +65,7 @@ StartGather(PlanState *psnode)
/* Don't start if already started or explicitly inhibited by the upper */
if (node->initialized)
- return false;
+ return;
/*
* Initialize the parallel context and workers on first execution. We do
@@ -127,7 +127,7 @@ StartGather(PlanState *psnode)
|| !gather->single_copy;
node->initialized = true;
- return true;
+ return;
}
/* ----------------------------------------------------------------
@@ -186,7 +186,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
* asynchronously in this process
*/
child_eflags = eflags & ~EXEC_FLAG_ASYNC;
- outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
+ outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, child_eflags);
gatherstate->ps.ps_TupFromTlist = false;
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 2ae598d..f345d8c 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -130,6 +130,13 @@ SeqRecheck(SeqScanState *node, TupleTableSlot *slot)
TupleTableSlot *
ExecSeqScan(SeqScanState *node)
{
+ /* Make the caller wait by some probability */
+ if (random() < RAND_MAX / 10)
+ {
+ node->ss.ps.state->exec_status = EXEC_NOT_READY;
+ return NULL;
+ }
+
return ExecScan((ScanState *) node,
(ExecScanAccessMtd) SeqNext,
(ExecScanRecheckMtd) SeqRecheck);
@@ -160,7 +167,6 @@ InitScanRelation(SeqScanState *node, EState *estate, int eflags)
ExecAssignScanType(&node->ss, RelationGetDescr(currentRelation));
}
-
/* ----------------------------------------------------------------
* ExecInitSeqScan
* ----------------------------------------------------------------
@@ -221,7 +227,8 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
ExecAssignScanProjectionInfo(&scanstate->ss);
/* Register dummy async callback if requested */
- RegisterAsyncCallback(estate, dummy_async_cb, scanstate, eflags);
+ RegisterAsyncCallback(estate, dummy_async_cb,
+ (PlanState *)scanstate, eflags);
return scanstate;
}
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 1e8936c..714178a 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -354,6 +354,15 @@ typedef struct AsyncStartListItem
PlanState *node; /* parameter to give the callback */
} AsyncStartListItem;
+/* Enum for the return of AsyncExecNode */
+typedef enum NodeStatus
+{
+ EXEC_NOT_READY,
+ EXEC_READY,
+ EXEC_EOT
+} NodeStatus;
+
+
/* ----------------
* EState information
*
@@ -437,6 +446,8 @@ typedef struct EState
*/
List *es_async_cb_list;
List *es_private_async_cb_list;
+
+ NodeStatus exec_status;
} EState;
/* in execUtils.c */
@@ -1078,6 +1089,7 @@ typedef struct PlanState
ProjectionInfo *ps_ProjInfo; /* info for doing tuple projection */
bool ps_TupFromTlist;/* state flag for processing set-valued
* functions in targetlist */
+ bool ps_async_tuple; /* tuple is passed semi-asynchronously */
} PlanState;
/* ----------------
@@ -1168,6 +1180,7 @@ typedef struct AppendState
{
PlanState ps; /* its first field is NodeTag */
PlanState **appendplans; /* array of PlanStates for my inputs */
+ bool *stopped;
int as_nplans;
int as_whichplan;
} AppendState;
--
1.8.3.1
Hi!
On 2016/01/21 18:26, Kyotaro HORIGUCHI wrote:
Then, suppose we add a function bool ExecStartAsync(PlanState *target,
ExecCallback callback, PlanState *cb_planstate, void *cb_context).
For non-async-aware plan nodes, this just returns false. async-aware
plan nodes should initiate some work, register some callbacks, and
return. The callback that get registered should arrange in turn to
register the callback passed as an argument when a tuple becomes
available, passing the planstate and context provided by
ExecStartAsync's caller, plus the TupleTableSlot containing the tuple.
Although I don't imagine clearly about the case of
async-aware-nodes under non-aware-nodes, it seems to have a high
affinity with (true) parallel execution framework.
The ExecStartAsync is similar to ExecStartNode of my old
patch. One of the most annoying things of that is that it needs
to walk down to their descendents and in turn it needs garbageous
corresponding additional codes for all type of nodes which can
have children.
Unless I am missing something, I wonder if this is where
planstate_tree_walker() introduced by commit 8dd401aa is useful. For
example, I see that it's used in ExecShutdownNode() in a manner that looks
interesting for this discussion.
Thanks,
Amit
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
Hi.
At Thu, 21 Jan 2016 19:09:19 +0900, Amit Langote <Langote_Amit_f8@lab.ntt.co.jp> wrote in <56A0AE4F.9000508@lab.ntt.co.jp>
Hi!
On 2016/01/21 18:26, Kyotaro HORIGUCHI wrote:
Then, suppose we add a function bool ExecStartAsync(PlanState *target,
ExecCallback callback, PlanState *cb_planstate, void *cb_context).
For non-async-aware plan nodes, this just returns false. async-aware
plan nodes should initiate some work, register some callbacks, and
return. The callback that get registered should arrange in turn to
register the callback passed as an argument when a tuple becomes
available, passing the planstate and context provided by
ExecStartAsync's caller, plus the TupleTableSlot containing the tuple.
Although I don't imagine clearly about the case of
async-aware-nodes under non-aware-nodes, it seems to have a high
affinity with (true) parallel execution framework.
The ExecStartAsync is similar to ExecStartNode of my old
patch. One of the most annoying things of that is that it needs
to walk down to their descendents and in turn it needs garbageous
corresponding additional codes for all type of nodes which can
have children.
Unless I am missing something, I wonder if this is where
planstate_tree_walker() introduced by commit 8dd401aa is useful. For
example, I see that it's used in ExecShutdownNode() in a manner that looks
interesting for this discussion.
Oh, that's part of the parallel execution stuff. Thanks for letting
me know about it. The walker approach also suits kicking functions
that are irrelevant to most node types. Only one (or two, including
ForeignScan) type of node is involved.
The attached patches have the same functionality but use
planstate_tree_walker instead of callbacks. This seems even
simpler than the callbacks.
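A minimal sketch of the walker-based start phase described above, written against a simplified stand-in rather than the real executor structures. MockPlanState, mock_tree_walker and mock_start_node are hypothetical names invented for illustration; only the shape (a start function that handles the node types it cares about and otherwise recurses through a generic walker, returning false to continue) mirrors ExecStartNode in the attached patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for PlanState; not the real struct. */
typedef struct MockPlanState
{
    bool        early_start;    /* set at init time when async was requested */
    bool        started;        /* set once the start hook has run */
    struct MockPlanState *left;
    struct MockPlanState *right;
} MockPlanState;

typedef bool (*mock_walker_fn) (MockPlanState *node, void *context);

/* Visit both children; stop early if the walker returns true. */
static bool
mock_tree_walker(MockPlanState *node, mock_walker_fn walker, void *context)
{
    assert(walker != NULL);

    if (node->left && walker(node->left, context))
        return true;
    if (node->right && walker(node->right, context))
        return true;
    return false;
}

/* Start phase: kick nodes that asked for it, then recurse into children. */
static bool
mock_start_node(MockPlanState *node, void *context)
{
    int        *nstarted = (int *) context;

    if (node == NULL)
        return false;

    if (node->early_start && !node->started)
    {
        node->started = true;
        (*nstarted)++;
    }

    /* Returning false everywhere means "keep walking". */
    return mock_tree_walker(node, mock_start_node, context);
}
```

Calling mock_start_node() on the root with a pointer to an int counter marks every node whose early_start flag is set, without any per-node-type code for walking down to the children.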
regards,
--
Kyotaro Horiguchi
NTT Open Source Software Center
Attachments:
0001-PoC-Async-start-callback-for-executor-using-planstat.patch (text/x-patch; charset=us-ascii)
From a7f0f1f9077b474dd212db1fb690413dd7c4ef79 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Jan 2016 09:37:25 +0900
Subject: [PATCH 1/2] PoC: Async start callback for executor using
planstate_tree_walker
This patch allows async-capable nodes to start work before
ExecProcNode() is called. eflags gets a new bit, EXEC_FLAG_ASYNC, which
requests asynchronous execution from children in the ExecInit phase.
As an example, nodeSeqscan registers a dummy callback if requested, and
nodeAppend unconditionally requests it from its children. So a plan
Append(SeqScan, SeqScan) runs the callback and yields LOG messages.
---
src/backend/executor/execMain.c | 2 +
src/backend/executor/execProcnode.c | 24 +++++
src/backend/executor/nodeAppend.c | 2 +
src/backend/executor/nodeGather.c | 167 ++++++++++++++++++++-------------
src/backend/executor/nodeMergeAppend.c | 3 +
src/backend/executor/nodeNestloop.c | 13 +++
src/backend/executor/nodeSeqscan.c | 16 ++++
src/include/executor/executor.h | 2 +
src/include/executor/nodeGather.h | 1 +
src/include/executor/nodeSeqscan.h | 1 +
src/include/nodes/execnodes.h | 2 +
11 files changed, 167 insertions(+), 66 deletions(-)
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 76f7297..32b7bc3 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1552,6 +1552,8 @@ ExecutePlan(EState *estate,
if (use_parallel_mode)
EnterParallelMode();
+ ExecStartNode(planstate);
+
/*
* Loop until we've processed the proper number of tuples from the plan.
*/
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index a31dbc9..2107ced 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -786,6 +786,30 @@ ExecEndNode(PlanState *node)
}
/*
+ * ExecStartNode - execute registered early-startup callbacks
+ */
+bool
+ExecStartNode(PlanState *node)
+{
+ if (node == NULL)
+ return false;
+
+ switch (nodeTag(node))
+ {
+ case T_GatherState:
+ return ExecStartGather((GatherState *)node);
+ break;
+ case T_SeqScanState:
+ return ExecStartSeqScan((SeqScanState *)node);
+ break;
+ default:
+ break;
+ }
+
+ return planstate_tree_walker(node, ExecStartNode, NULL);
+}
+
+/*
* ExecShutdownNode
*
* Give execution nodes a chance to stop asynchronous resource consumption
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index a26bd63..d10364c 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -165,6 +165,8 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
{
Plan *initNode = (Plan *) lfirst(lc);
+ /* always request async-execution for children */
+ eflags |= EXEC_FLAG_ASYNC;
appendplanstates[i] = ExecInitNode(initNode, estate, eflags);
i++;
}
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 16c981b..097f4bb 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -46,6 +46,88 @@ static TupleTableSlot *gather_getnext(GatherState *gatherstate);
static HeapTuple gather_readnext(GatherState *gatherstate);
static void ExecShutdownGatherWorkers(GatherState *node);
+/* ----------------------------------------------------------------
+ * ExecStartGather
+ *
+ * Gather node can have an advantage from asynchronous execution in most
+ * cases because of its startup cost.
+ * ----------------------------------------------------------------
+ */
+bool
+ExecStartGather(GatherState *node)
+{
+ EState *estate = node->ps.state;
+ Gather *gather = (Gather *) node->ps.plan;
+ TupleTableSlot *fslot = node->funnel_slot;
+ int i;
+
+ /* Don't start if already started or explicitly inhibited by the upper */
+ if (node->initialized || !node->early_start)
+ return false;
+
+ /*
+ * Initialize the parallel context and workers on first execution. We do
+ * this on first execution rather than during node initialization, as it
+ * needs to allocate large dynamic segment, so it is better to do if it
+ * is really needed.
+ */
+
+ /*
+ * Sometimes we might have to run without parallelism; but if
+ * parallel mode is active then we can try to fire up some workers.
+ */
+ if (gather->num_workers > 0 && IsInParallelMode())
+ {
+ ParallelContext *pcxt;
+ bool got_any_worker = false;
+
+ /* Initialize the workers required to execute Gather node. */
+ if (!node->pei)
+ node->pei = ExecInitParallelPlan(node->ps.lefttree,
+ estate,
+ gather->num_workers);
+
+ /*
+ * Register backend workers. We might not get as many as we
+ * requested, or indeed any at all.
+ */
+ pcxt = node->pei->pcxt;
+ LaunchParallelWorkers(pcxt);
+
+ /* Set up tuple queue readers to read the results. */
+ if (pcxt->nworkers > 0)
+ {
+ node->nreaders = 0;
+ node->reader =
+ palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
+
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ if (pcxt->worker[i].bgwhandle == NULL)
+ continue;
+
+ shm_mq_set_handle(node->pei->tqueue[i],
+ pcxt->worker[i].bgwhandle);
+ node->reader[node->nreaders++] =
+ CreateTupleQueueReader(node->pei->tqueue[i],
+ fslot->tts_tupleDescriptor);
+ got_any_worker = true;
+ }
+ }
+
+ /* No workers? Then never mind. */
+ if (!got_any_worker)
+ ExecShutdownGatherWorkers(node);
+ }
+
+ /* Run plan locally if no workers or not single-copy. */
+ node->need_to_scan_locally = (node->reader == NULL)
+ || !gather->single_copy;
+
+ node->early_start = false;
+ node->initialized = true;
+ return false;
+}
/* ----------------------------------------------------------------
* ExecInitGather
@@ -58,6 +140,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
Plan *outerNode;
bool hasoid;
TupleDesc tupDesc;
+ int child_eflags;
/* Gather node doesn't have innerPlan node. */
Assert(innerPlan(node) == NULL);
@@ -97,7 +180,12 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
* now initialize outer plan
*/
outerNode = outerPlan(node);
- outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
+ /*
+ * This outer plan is executed in another process so don't start
+ * asynchronously in this process
+ */
+ child_eflags = eflags & ~EXEC_FLAG_ASYNC;
+ outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, child_eflags);
gatherstate->ps.ps_TupFromTlist = false;
@@ -115,6 +203,16 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);
+ /*
+ * Register asynchronous execution callback for this node. Backend workers
+ * needs to allocate large dynamic segment, and it is better to execute
+ * them at the time of first execution from this aspect. So asynchronous
+ * execution should be decided considering that but we omit the aspect for
+ * now.
+ */
+ if (eflags & EXEC_FLAG_ASYNC)
+ gatherstate->early_start = true;
+
return gatherstate;
}
@@ -128,77 +226,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
TupleTableSlot *
ExecGather(GatherState *node)
{
- TupleTableSlot *fslot = node->funnel_slot;
- int i;
TupleTableSlot *slot;
TupleTableSlot *resultSlot;
ExprDoneCond isDone;
ExprContext *econtext;
- /*
- * Initialize the parallel context and workers on first execution. We do
- * this on first execution rather than during node initialization, as it
- * needs to allocate large dynamic segment, so it is better to do if it
- * is really needed.
- */
+ /* Initialize workers if not yet. */
if (!node->initialized)
- {
- EState *estate = node->ps.state;
- Gather *gather = (Gather *) node->ps.plan;
-
- /*
- * Sometimes we might have to run without parallelism; but if
- * parallel mode is active then we can try to fire up some workers.
- */
- if (gather->num_workers > 0 && IsInParallelMode())
- {
- ParallelContext *pcxt;
- bool got_any_worker = false;
-
- /* Initialize the workers required to execute Gather node. */
- if (!node->pei)
- node->pei = ExecInitParallelPlan(node->ps.lefttree,
- estate,
- gather->num_workers);
-
- /*
- * Register backend workers. We might not get as many as we
- * requested, or indeed any at all.
- */
- pcxt = node->pei->pcxt;
- LaunchParallelWorkers(pcxt);
-
- /* Set up tuple queue readers to read the results. */
- if (pcxt->nworkers > 0)
- {
- node->nreaders = 0;
- node->reader =
- palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
-
- for (i = 0; i < pcxt->nworkers; ++i)
- {
- if (pcxt->worker[i].bgwhandle == NULL)
- continue;
-
- shm_mq_set_handle(node->pei->tqueue[i],
- pcxt->worker[i].bgwhandle);
- node->reader[node->nreaders++] =
- CreateTupleQueueReader(node->pei->tqueue[i],
- fslot->tts_tupleDescriptor);
- got_any_worker = true;
- }
- }
-
- /* No workers? Then never mind. */
- if (!got_any_worker)
- ExecShutdownGatherWorkers(node);
- }
-
- /* Run plan locally if no workers or not single-copy. */
- node->need_to_scan_locally = (node->reader == NULL)
- || !gather->single_copy;
- node->initialized = true;
- }
+ ExecStartGather(node);
/*
* Check to see if we're still projecting out tuples from a previous scan
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index e271927..65ef13b 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -112,6 +112,9 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
{
Plan *initNode = (Plan *) lfirst(lc);
+ /* always request async execution for now */
+ eflags = eflags | EXEC_FLAG_ASYNC;
+
mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
i++;
}
diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c
index 555fa09..16c317c 100644
--- a/src/backend/executor/nodeNestloop.c
+++ b/src/backend/executor/nodeNestloop.c
@@ -340,11 +340,24 @@ ExecInitNestLoop(NestLoop *node, EState *estate, int eflags)
* inner child, because it will always be rescanned with fresh parameter
* values.
*/
+
+ /*
+ * async execution of outer plan is beneficial if this join is requested
+ * as async
+ */
outerPlanState(nlstate) = ExecInitNode(outerPlan(node), estate, eflags);
if (node->nestParams == NIL)
eflags |= EXEC_FLAG_REWIND;
else
eflags &= ~EXEC_FLAG_REWIND;
+
+ /*
+ * Async execution of the inner is inhibited if parameterized by the
+ * outer
+ */
+ if (list_length(node->nestParams) > 0)
+ eflags &= ~ EXEC_FLAG_ASYNC;
+
innerPlanState(nlstate) = ExecInitNode(innerPlan(node), estate, eflags);
/*
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index f12921d..3ee678d 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -39,6 +39,18 @@ static TupleTableSlot *SeqNext(SeqScanState *node);
* ----------------------------------------------------------------
*/
+bool
+ExecStartSeqScan(SeqScanState *node)
+{
+ if (node->early_start)
+ {
+ elog(LOG, "dummy_async_cb is called for %p", node);
+ node->early_start = false;
+ }
+
+ return false;
+}
+
/* ----------------------------------------------------------------
* SeqNext
*
@@ -214,6 +226,10 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
ExecAssignResultTypeFromTL(&scanstate->ss.ps);
ExecAssignScanProjectionInfo(&scanstate->ss);
+ /* Do early-start when requested */
+ if (eflags & EXEC_FLAG_ASYNC)
+ scanstate->early_start = true;
+
return scanstate;
}
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 1a44085..3d13217 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -62,6 +62,7 @@
#define EXEC_FLAG_WITH_OIDS 0x0020 /* force OIDs in returned tuples */
#define EXEC_FLAG_WITHOUT_OIDS 0x0040 /* force no OIDs in returned tuples */
#define EXEC_FLAG_WITH_NO_DATA 0x0080 /* rel scannability doesn't matter */
+#define EXEC_FLAG_ASYNC 0x0100 /* request asynchronous execution */
/*
@@ -224,6 +225,7 @@ extern void EvalPlanQualEnd(EPQState *epqstate);
extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
extern TupleTableSlot *ExecProcNode(PlanState *node);
extern Node *MultiExecProcNode(PlanState *node);
+extern bool ExecStartNode(PlanState *node);
extern void ExecEndNode(PlanState *node);
extern bool ExecShutdownNode(PlanState *node);
diff --git a/src/include/executor/nodeGather.h b/src/include/executor/nodeGather.h
index f76d9be..0a48a03 100644
--- a/src/include/executor/nodeGather.h
+++ b/src/include/executor/nodeGather.h
@@ -18,6 +18,7 @@
extern GatherState *ExecInitGather(Gather *node, EState *estate, int eflags);
extern TupleTableSlot *ExecGather(GatherState *node);
+extern bool ExecStartGather(GatherState *node);
extern void ExecEndGather(GatherState *node);
extern void ExecShutdownGather(GatherState *node);
extern void ExecReScanGather(GatherState *node);
diff --git a/src/include/executor/nodeSeqscan.h b/src/include/executor/nodeSeqscan.h
index f2e61ff..daf54ac 100644
--- a/src/include/executor/nodeSeqscan.h
+++ b/src/include/executor/nodeSeqscan.h
@@ -19,6 +19,7 @@
extern SeqScanState *ExecInitSeqScan(SeqScan *node, EState *estate, int eflags);
extern TupleTableSlot *ExecSeqScan(SeqScanState *node);
+extern bool ExecStartSeqScan(SeqScanState *node);
extern void ExecEndSeqScan(SeqScanState *node);
extern void ExecReScanSeqScan(SeqScanState *node);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 07cd20a..4ffc2a8 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1257,6 +1257,7 @@ typedef struct SeqScanState
{
ScanState ss; /* its first field is NodeTag */
Size pscan_len; /* size of parallel heap scan descriptor */
+ bool early_start;
} SeqScanState;
/* ----------------
@@ -1968,6 +1969,7 @@ typedef struct UniqueState
typedef struct GatherState
{
PlanState ps; /* its first field is NodeTag */
+ bool early_start;
bool initialized;
struct ParallelExecutorInfo *pei;
int nreaders;
--
1.8.3.1
0002-PoC-Example-implement-of-asynchronous-tuple-passing.patch (text/x-patch; charset=us-ascii)
From ee2e01b8edfec09584822f94663e9bb2e03b7e95 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Jan 2016 16:47:31 +0900
Subject: [PATCH 2/2] PoC: Example implement of asynchronous tuple passing
Aside from early node execution, tuples from multiple children of a
node can be received asynchronously. This patch makes ExecProcNode
return a third status, EXEC_NOT_READY, through the EState, in addition
to the statuses previously conveyed by the result slot. It means that
the node may have more tuples to return but none is available yet.
As an example, this patch also modifies nodeSeqscan to return
EXEC_NOT_READY with a certain probability, and nodeAppend skips to the
next child when it is returned.
Conflicts:
src/backend/executor/nodeGather.c
src/backend/executor/nodeSeqscan.c
src/include/nodes/execnodes.h
---
src/backend/executor/execProcnode.c | 6 ++++
src/backend/executor/nodeAppend.c | 64 ++++++++++++++++++++++---------------
src/backend/executor/nodeSeqscan.c | 8 ++++-
src/include/nodes/execnodes.h | 11 +++++++
4 files changed, 62 insertions(+), 27 deletions(-)
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 2107ced..5398ca0 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -383,6 +383,8 @@ ExecProcNode(PlanState *node)
if (node->instrument)
InstrStartNode(node->instrument);
+ node->state->exec_status = EXEC_READY;
+
switch (nodeTag(node))
{
/*
@@ -540,6 +542,10 @@ ExecProcNode(PlanState *node)
if (node->instrument)
InstrStopNode(node->instrument, TupIsNull(result) ? 0.0 : 1.0);
+ if (TupIsNull(result) &&
+ node->state->exec_status == EXEC_READY)
+ node->state->exec_status = EXEC_EOT;
+
return result;
}
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index d10364c..6ba13e9 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -121,6 +121,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
{
AppendState *appendstate = makeNode(AppendState);
PlanState **appendplanstates;
+ bool *stopped;
int nplans;
int i;
ListCell *lc;
@@ -134,6 +135,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
nplans = list_length(node->appendplans);
appendplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
+ stopped = (bool *) palloc0(nplans * sizeof(bool));
/*
* create new AppendState for our append node
@@ -141,6 +143,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
appendstate->ps.plan = (Plan *) node;
appendstate->ps.state = estate;
appendstate->appendplans = appendplanstates;
+ appendstate->stopped = stopped;
appendstate->as_nplans = nplans;
/*
@@ -195,45 +198,54 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
TupleTableSlot *
ExecAppend(AppendState *node)
{
- for (;;)
+ bool all_eot = false;
+ EState *estate = node->ps.state;
+ TupleTableSlot *result;
+
+ /*!!!! This node currently works only for monotonic-forwarding scan */
+ while (!all_eot)
{
PlanState *subnode;
- TupleTableSlot *result;
+ int i;
- /*
- * figure out which subplan we are currently processing
- */
- subnode = node->appendplans[node->as_whichplan];
+ all_eot = true;
+ /* Scan the children in registered order. */
+ for (i = node->as_whichplan ; i < node->as_nplans ; i++)
+ {
+ if (node->stopped[i])
+ continue;
- /*
- * get a tuple from the subplan
- */
- result = ExecProcNode(subnode);
+ subnode = node->appendplans[i];
+
+ result = ExecProcNode(subnode);
- if (!TupIsNull(result))
- {
/*
* If the subplan gave us something then return it as-is. We do
* NOT make use of the result slot that was set up in
* ExecInitAppend; there's no need for it.
*/
- return result;
+ switch (estate->exec_status)
+ {
+ case EXEC_READY:
+ return result;
+
+ case EXEC_NOT_READY:
+ all_eot = false;
+ break;
+
+ case EXEC_EOT:
+ node->stopped[i] = true;
+ break;
+
+ default:
+ elog(ERROR, "Unknown node status: %d", estate->exec_status);
+ }
}
- /*
- * Go on to the "next" subplan in the appropriate direction. If no
- * more subplans, return the empty slot set up for us by
- * ExecInitAppend.
- */
- if (ScanDirectionIsForward(node->ps.state->es_direction))
- node->as_whichplan++;
- else
- node->as_whichplan--;
- if (!exec_append_initialize_next(node))
- return ExecClearTuple(node->ps.ps_ResultTupleSlot);
-
- /* Else loop back and try to get a tuple from the new subplan */
+ /* XXXXX: some waiting measure is needed to wait new tuple */
}
+
+ return NULL;
}
/* ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 3ee678d..61916bf 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -136,6 +136,13 @@ SeqRecheck(SeqScanState *node, TupleTableSlot *slot)
TupleTableSlot *
ExecSeqScan(SeqScanState *node)
{
+ /* Make the caller wait by some probability */
+ if (random() < RAND_MAX / 10)
+ {
+ node->ss.ps.state->exec_status = EXEC_NOT_READY;
+ return NULL;
+ }
+
return ExecScan((ScanState *) node,
(ExecScanAccessMtd) SeqNext,
(ExecScanRecheckMtd) SeqRecheck);
@@ -166,7 +173,6 @@ InitScanRelation(SeqScanState *node, EState *estate, int eflags)
ExecAssignScanType(&node->ss, RelationGetDescr(currentRelation));
}
-
/* ----------------------------------------------------------------
* ExecInitSeqScan
* ----------------------------------------------------------------
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 4ffc2a8..45c6fba 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -343,6 +343,14 @@ typedef struct ResultRelInfo
List *ri_onConflictSetWhere;
} ResultRelInfo;
+/* Enum for async awareness */
+typedef enum NodeStatus
+{
+ EXEC_NOT_READY,
+ EXEC_READY,
+ EXEC_EOT
+} NodeStatus;
+
/* ----------------
* EState information
*
@@ -419,6 +427,8 @@ typedef struct EState
HeapTuple *es_epqTuple; /* array of EPQ substitute tuples */
bool *es_epqTupleSet; /* true if EPQ tuple is provided */
bool *es_epqScanDone; /* true if EPQ tuple has been fetched */
+
+ NodeStatus exec_status;
} EState;
@@ -1147,6 +1157,7 @@ typedef struct AppendState
{
PlanState ps; /* its first field is NodeTag */
PlanState **appendplans; /* array of PlanStates for my inputs */
+ bool *stopped;
int as_nplans;
int as_whichplan;
} AppendState;
--
1.8.3.1
On Thu, Jan 21, 2016 at 4:26 AM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp> wrote:
I gave some thought to callbacks, and experimented with them, as a
means of async (early) execution.
Thanks for working on this.
Suppose we equip each EState with the ability to fire "callbacks".
Callbacks have the signature:
typedef bool (*ExecCallback)(PlanState *planstate, TupleTableSlot
*slot, void *context);
Executor nodes can register immediate callbacks to be run at the
earliest possible opportunity using a function like
ExecRegisterCallback(estate, callback, planstate, slot, context).
They can register deferred callbacks that will be called when a file
descriptor becomes ready for I/O, or when the process latch is set,
using a call like ExecRegisterFileCallback(estate, fd, event,
callback, planstate, slot, context) or
ExecRegisterLatchCallback(estate, callback, planstate, slot, context).
I considered this. The immediate callbacks seem fine, but
using a latch or fds to signal tuple availability doesn't seem to
fit callbacks stored in the estate. They are deferrable until the
parent's tuple request, and such events can be handled at
that time, as ExecGather does now. However, some kind of
synchronization/waiting mechanism like a latch or select() is needed
anyway.
I am not entirely sure I understand what you are trying to say here,
but if I do understand it then I disagree. Consider an Append node
with 1000 children, each a ForeignScan. What we want to do is fire
off all 1000 remote queries and then return a tuple from whichever
ForeignScan becomes ready first. What we do NOT want to do is fire
off all 1000 remote queries and have to either (a) iterate through all
1000 subplans checking repeatedly whether each is ready or (b) pick
one of those 1000 subplans to wait for and ignore the other 999 until
the one we pick is ready. I don't see any way to achieve what we need
here without some way to convince the executor to do a select() across
all 1000 fds and take some action based on which one becomes
read-ready.
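A minimal sketch of the "wait on all descriptors at once" idea, assuming each child exposes one readable file descriptor. It uses POSIX poll() rather than select(), because select() is limited to FD_SETSIZE descriptors; that substitution is a choice made here, not something proposed in the thread. wait_for_any_ready and read_from_first_ready are hypothetical helper names.

```c
#include <assert.h>
#include <poll.h>
#include <stdlib.h>
#include <unistd.h>

/*
 * Block until at least one descriptor is readable, then return its index
 * in fds[]. Returns -1 on timeout or error.
 */
static int
wait_for_any_ready(const int *fds, int nfds, int timeout_ms)
{
    struct pollfd *pfds;
    int         ready = -1;
    int         i;

    assert(fds != NULL);
    if (nfds <= 0)
        return -1;

    pfds = calloc((size_t) nfds, sizeof(struct pollfd));
    if (pfds == NULL)
        return -1;

    for (i = 0; i < nfds; i++)
    {
        pfds[i].fd = fds[i];
        pfds[i].events = POLLIN;
    }

    /* One system call covers every child, however many there are. */
    if (poll(pfds, (nfds_t) nfds, timeout_ms) > 0)
    {
        for (i = 0; i < nfds; i++)
        {
            if (pfds[i].revents & POLLIN)
            {
                ready = i;
                break;
            }
        }
    }

    free(pfds);
    return ready;
}

/* Read one byte from whichever descriptor became ready first. */
static int
read_from_first_ready(const int *fds, int nfds, int timeout_ms, char *out)
{
    int         idx = wait_for_any_ready(fds, nfds, timeout_ms);

    if (idx < 0 || read(fds[idx], out, 1) != 1)
        return -1;
    return idx;
}
```

The caller neither spins over all children nor commits to a single child in advance; it sleeps in the kernel and wakes up with the index of a child that has data.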
Callbacks are suited to not-so-common operations invoked once for an
event, such as error handling. In this case, the operation can be
the asynchronous execution of a node, and the event can be the moment
just before ExecProcNode on the topmost node. The first patch
attached allows async-capable nodes to register callbacks in the Init
phase and executes them just before the Exec phase on the topmost
node. It greatly reduces the additional code as a result. This is my
first impression of the word "callbacks".
This strikes me as pretty much uninteresting. I bet if you test this
you'll find that it doesn't make anything faster. You're kicking
things off asynchronously only a tiny fraction of a second before you
would have started them anyway. What we need is a way to proceed
asynchronously through the entire execution of the query, not just at
the beginning. I understand that your merge-join-double-gather
example can benefit from this, but it does so by kicking off both
subplans before we know for sure that we'll want to read from both
subplans. I admit that optimization rarely kicks in, but it's a
potentially huge savings of effort when it does.
Instead, in the second patch, I modified ExecProcNode to return
async status in the EState. It will be EXEC_READY or EXEC_EOT (end of
table / no more tuples?) for non-async-capable nodes, and
async-capable nodes can set it to EXEC_NOT_READY, which indicates
that there could be more tuples but none is available yet.
Async-aware nodes such as Append can go to the next child if the
predecessor returned EXEC_NOT_READY. If all !EXEC_EOT nodes
returned EXEC_NOT_READY, Append will wait using some signaling
mechanism (it busy-loops for now instead). As an example, the
second patch modifies ExecAppend to handle it and modifies
ExecSeqScan to return EXEC_NOT_READY with a certain probability as an
emulation of asynchronous tuple fetching. The UNION ALL query
above returns results interleaved among the three tables as a result.
I think the idea of distinguishing between "end of tuples" and "no
tuples currently ready" is a good one. I am not particularly excited
about this particular signalling mechanism. I am not sure why you
want to put this in the EState - isn't that shared between all nodes
in the plan tree, and doesn't that create therefore the risk of
confusion? What I might imagine is giving ExecProcNode a second
argument that functions as an out parameter and is only meaningful
when TupIsNull(result). It could for example be a boolean indicating
whether more tuples might become available later. Maybe an enum is
better, but let's suppose a Boolean for now. At the top of
ExecProcNode we would do *async_pending = false. Then, we'd pass
async_pending to ExecWhatever function that is potentially
async-capable and it could set *async_pending = true if it wants.
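A minimal sketch of the out-parameter idea, using a mock node instead of the real PlanState and TupleTableSlot. MockNode, mock_exec_scan and mock_exec_proc_node are hypothetical names; the only part taken from the description above is the protocol: the dispatcher clears *async_pending, the node-specific function may set it, and the flag is meaningful only when the result is NULL.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for an async-capable scan node. */
typedef struct MockNode
{
    int         remaining;      /* tuples still to produce */
    int         stall_every;    /* report "not ready" on every Nth call; 0 = never */
    int         calls;          /* number of calls so far */
    int         current;        /* storage for the tuple being returned */
} MockNode;

/* Node-specific function: returns a tuple, or NULL with the reason in the flag. */
static const int *
mock_exec_scan(MockNode *node, bool *async_pending)
{
    node->calls++;

    /* Emulate a source that has more data but cannot deliver it right now. */
    if (node->remaining > 0 && node->stall_every > 0 &&
        node->calls % node->stall_every == 0)
    {
        *async_pending = true;
        return NULL;
    }

    /* True end of data: the flag stays false. */
    if (node->remaining == 0)
        return NULL;

    node->current = node->remaining--;
    return &node->current;
}

/* Dispatcher: resets the flag so non-async-aware nodes need no changes. */
static const int *
mock_exec_proc_node(MockNode *node, bool *async_pending)
{
    assert(async_pending != NULL);

    *async_pending = false;
    return mock_exec_scan(node, async_pending);
}
```

Because the flag belongs to the call rather than to shared state, a parent calling several children cannot confuse one child's status with another's.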
Thanks for the attentive explanation. My concern about this is
the latency of synchronizing one by one for every tuple
between the producer and the consumer. My previous patch is not
asynchronous on every tuple, so it can give a pure gain without
loss from tuple-wise synchronization. But it looks clean and I
like it, so I'll consider this.
It seems pretty straightforward to fit Gather into this infrastructure.
Yes.
If Gather's children become a regular node struct with a name
like Worker(Node), instead of the non-Node structure they are now, we
can generalize the tuple-synchronization mechanism so that it can
be used by other nodes such as ForeignScan. Append(ForeignScan,
ForeignScan,...) with async tuple passing can average the load across
multiple foreign servers, so I suppose that it is preferable if no
penalty exists.
I don't quite understand what you mean by saying that Gather's
children are not a "regular node struct". Which struct are you
talking about?
I do agree that transferring tuples one by one seems like it might be
inefficient. Gather suffered from a problem of this nature which was
repaired, at least partially, by commit
bc7fcab5e36b9597857fa7e3fa6d9ba54aaea167. I'm wondering if we need
some kind of tuple-buffering abstraction. A tuple-buffer could have
an on-receipt-of-tuples callback; if it doesn't, then the tuples can
be read out synchronously one by one.
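A minimal sketch of such a tuple-buffering abstraction, with plain ints standing in for tuples and a fixed-size ring standing in for whatever storage a real design would use. TupleBuffer and the tupbuf_* functions are hypothetical names; nothing here reflects an existing PostgreSQL API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TUPBUF_CAPACITY 8

typedef void (*tupbuf_notify_fn) (void *arg);

/* Hypothetical buffer sitting between a producer and a consumer node. */
typedef struct TupleBuffer
{
    int         items[TUPBUF_CAPACITY];     /* ring storage */
    int         head;                       /* index of the oldest item */
    int         count;                      /* number of buffered items */
    tupbuf_notify_fn on_receipt;            /* optional; may be NULL */
    void       *arg;                        /* passed to on_receipt */
} TupleBuffer;

static void
tupbuf_init(TupleBuffer *buf, tupbuf_notify_fn on_receipt, void *arg)
{
    assert(buf != NULL);
    buf->head = 0;
    buf->count = 0;
    buf->on_receipt = on_receipt;
    buf->arg = arg;
}

/* Producer side: returns false if the buffer is full. */
static bool
tupbuf_put(TupleBuffer *buf, int tuple)
{
    if (buf->count == TUPBUF_CAPACITY)
        return false;
    buf->items[(buf->head + buf->count) % TUPBUF_CAPACITY] = tuple;
    buf->count++;

    /* With a callback the consumer is notified; without one it just polls. */
    if (buf->on_receipt != NULL)
        buf->on_receipt(buf->arg);
    return true;
}

/* Consumer side: synchronous read, one tuple at a time, in FIFO order. */
static bool
tupbuf_get(TupleBuffer *buf, int *tuple)
{
    if (buf->count == 0)
        return false;
    *tuple = buf->items[buf->head];
    buf->head = (buf->head + 1) % TUPBUF_CAPACITY;
    buf->count--;
    return true;
}

/* Example callback: counts how many tuples have arrived. */
static void
tupbuf_count_receipts(void *arg)
{
    (*(int *) arg)++;
}
```

A consumer that registers no callback simply calls tupbuf_get() until it returns false, which corresponds to reading the tuples out synchronously one by one.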
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company