>From a7f0f1f9077b474dd212db1fb690413dd7c4ef79 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Jan 2016 09:37:25 +0900
Subject: [PATCH 1/2] PoC: Async start callback for executor using
 planstate_tree_walker

This patch allows async-capable nodes to start running before the first
ExecProcNode() call. eflags gains a new bit, EXEC_FLAG_ASYNC, with which a
node requests asynchronous execution from its children during ExecInit.

As an example, nodeSeqscan registers a dummy callback when requested, and
nodeAppend unconditionally requests it from its children. So the plan
Append(SeqScan, SeqScan) runs the callback and emits LOG messages.
---
 src/backend/executor/execMain.c        |   2 +
 src/backend/executor/execProcnode.c    |  24 +++++
 src/backend/executor/nodeAppend.c      |   2 +
 src/backend/executor/nodeGather.c      | 167 ++++++++++++++++++++-------------
 src/backend/executor/nodeMergeAppend.c |   3 +
 src/backend/executor/nodeNestloop.c    |  13 +++
 src/backend/executor/nodeSeqscan.c     |  16 ++++
 src/include/executor/executor.h        |   2 +
 src/include/executor/nodeGather.h      |   1 +
 src/include/executor/nodeSeqscan.h     |   1 +
 src/include/nodes/execnodes.h          |   2 +
 11 files changed, 167 insertions(+), 66 deletions(-)

diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 76f7297..32b7bc3 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1552,6 +1552,8 @@ ExecutePlan(EState *estate,
 	if (use_parallel_mode)
 		EnterParallelMode();
 
+	ExecStartNode(planstate);
+
 	/*
 	 * Loop until we've processed the proper number of tuples from the plan.
 	 */
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index a31dbc9..2107ced 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -786,6 +786,30 @@ ExecEndNode(PlanState *node)
 }
 
 /*
+ * ExecStartNode - execute registered early-startup callbacks
+ */
+bool
+ExecStartNode(PlanState *node)
+{
+	if (node == NULL)
+		return false;
+
+	switch (nodeTag(node))
+	{
+		case T_GatherState:
+			/* start early only on request; children are not visited */
+			return ((GatherState *) node)->early_start &&
+				ExecStartGather((GatherState *) node);
+		case T_SeqScanState:
+			return ExecStartSeqScan((SeqScanState *) node);
+		default:
+			break;
+	}
+
+	return planstate_tree_walker(node, ExecStartNode, NULL);
+}
+
+/*
  * ExecShutdownNode
  *
  * Give execution nodes a chance to stop asynchronous resource consumption
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index a26bd63..d10364c 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -165,6 +165,8 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	{
 		Plan	   *initNode = (Plan *) lfirst(lc);
 
+		/* always request async execution for children */
+		eflags |= EXEC_FLAG_ASYNC;
 		appendplanstates[i] = ExecInitNode(initNode, estate, eflags);
 		i++;
 	}
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 16c981b..097f4bb 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -46,6 +46,88 @@ static TupleTableSlot *gather_getnext(GatherState *gatherstate);
 static HeapTuple gather_readnext(GatherState *gatherstate);
 static void ExecShutdownGatherWorkers(GatherState *node);
 
+/* ----------------------------------------------------------------
+ *		ExecStartGather
+ *
+ *		Launch the workers of a Gather node unless that was already done; its
+ *		startup cost makes an early start worthwhile. Always returns false.
+ * ----------------------------------------------------------------
+ */
+bool
+ExecStartGather(GatherState *node)
+{
+	EState	   *estate = node->ps.state;
+	Gather	   *gather = (Gather *) node->ps.plan;
+	TupleTableSlot *fslot = node->funnel_slot;
+	int			i;
+
+	/* Start only once. ExecGather() depends on this even without early_start */
+	if (node->initialized)
+		return false;
+
+	/*
+	 * Set up the parallel context and the workers. This needs to allocate a
+	 * large dynamic segment, so it is not done during node initialization
+	 * but either ahead of time via ExecStartNode() or, at the latest, by the
+	 * first ExecGather() call.
+	 */
+
+	/*
+	 * Sometimes we might have to run without parallelism; but if
+	 * parallel mode is active then we can try to fire up some workers.
+	 */
+	if (gather->num_workers > 0 && IsInParallelMode())
+	{
+		ParallelContext *pcxt;
+		bool	got_any_worker = false;
+
+		/* Initialize the workers required to execute Gather node. */
+		if (!node->pei)
+			node->pei = ExecInitParallelPlan(node->ps.lefttree,
+											 estate,
+											 gather->num_workers);
+
+		/*
+		 * Register backend workers. We might not get as many as we
+		 * requested, or indeed any at all.
+		 */
+		pcxt = node->pei->pcxt;
+		LaunchParallelWorkers(pcxt);
+
+		/* Set up tuple queue readers to read the results. */
+		if (pcxt->nworkers > 0)
+		{
+			node->nreaders = 0;
+			node->reader =
+				palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
+
+			for (i = 0; i < pcxt->nworkers; ++i)
+			{
+				if (pcxt->worker[i].bgwhandle == NULL)
+					continue;
+
+				shm_mq_set_handle(node->pei->tqueue[i],
+								  pcxt->worker[i].bgwhandle);
+				node->reader[node->nreaders++] =
+					CreateTupleQueueReader(node->pei->tqueue[i],
+										   fslot->tts_tupleDescriptor);
+				got_any_worker = true;
+			}
+		}
+
+		/* No workers?  Then never mind. */
+		if (!got_any_worker)
+			ExecShutdownGatherWorkers(node);
+	}
+
+	/* Run plan locally if no workers or not single-copy. */
+	node->need_to_scan_locally = (node->reader == NULL)
+		|| !gather->single_copy;
+
+	node->early_start = false;
+	node->initialized = true;
+	return false;
+}
 
 /* ----------------------------------------------------------------
  *		ExecInitGather
@@ -58,6 +140,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	Plan	   *outerNode;
 	bool		hasoid;
 	TupleDesc	tupDesc;
+	int			child_eflags;
 
 	/* Gather node doesn't have innerPlan node. */
 	Assert(innerPlan(node) == NULL);
@@ -97,7 +180,12 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	 * now initialize outer plan
 	 */
 	outerNode = outerPlan(node);
-	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
+	/*
+	 * This outer plan is executed in another process so don't start
+	 * asynchronously in this process
+	 */
+	child_eflags = eflags & ~EXEC_FLAG_ASYNC;
+	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, child_eflags);
 
 	gatherstate->ps.ps_TupFromTlist = false;
 
@@ -115,6 +203,16 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
 	ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);
 
+	/*
+	 * Request an early start for this node if the parent asked for one.
+	 * Launching backend workers allocates a large dynamic segment, which
+	 * argues for deferring it until first execution. The decision to start
+	 * early should take that into account, but we ignore this aspect for
+	 * now.
+	 */
+	if (eflags & EXEC_FLAG_ASYNC)
+		gatherstate->early_start = true;
+
 	return gatherstate;
 }
 
@@ -128,77 +226,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 TupleTableSlot *
 ExecGather(GatherState *node)
 {
-	TupleTableSlot *fslot = node->funnel_slot;
-	int			i;
 	TupleTableSlot *slot;
 	TupleTableSlot *resultSlot;
 	ExprDoneCond isDone;
 	ExprContext *econtext;
 
-	/*
-	 * Initialize the parallel context and workers on first execution. We do
-	 * this on first execution rather than during node initialization, as it
-	 * needs to allocate large dynamic segment, so it is better to do if it
-	 * is really needed.
-	 */
+	/* Initialize workers if not done yet. */
 	if (!node->initialized)
-	{
-		EState	   *estate = node->ps.state;
-		Gather	   *gather = (Gather *) node->ps.plan;
-
-		/*
-		 * Sometimes we might have to run without parallelism; but if
-		 * parallel mode is active then we can try to fire up some workers.
-		 */
-		if (gather->num_workers > 0 && IsInParallelMode())
-		{
-			ParallelContext *pcxt;
-			bool	got_any_worker = false;
-
-			/* Initialize the workers required to execute Gather node. */
-			if (!node->pei)
-				node->pei = ExecInitParallelPlan(node->ps.lefttree,
-												 estate,
-												 gather->num_workers);
-
-			/*
-			 * Register backend workers. We might not get as many as we
-			 * requested, or indeed any at all.
-			 */
-			pcxt = node->pei->pcxt;
-			LaunchParallelWorkers(pcxt);
-
-			/* Set up tuple queue readers to read the results. */
-			if (pcxt->nworkers > 0)
-			{
-				node->nreaders = 0;
-				node->reader =
-					palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
-
-				for (i = 0; i < pcxt->nworkers; ++i)
-				{
-					if (pcxt->worker[i].bgwhandle == NULL)
-						continue;
-
-					shm_mq_set_handle(node->pei->tqueue[i],
-									  pcxt->worker[i].bgwhandle);
-					node->reader[node->nreaders++] =
-						CreateTupleQueueReader(node->pei->tqueue[i],
-											   fslot->tts_tupleDescriptor);
-					got_any_worker = true;
-				}
-			}
-
-			/* No workers?  Then never mind. */
-			if (!got_any_worker)
-				ExecShutdownGatherWorkers(node);
-		}
-
-		/* Run plan locally if no workers or not single-copy. */
-		node->need_to_scan_locally = (node->reader == NULL)
-			|| !gather->single_copy;
-		node->initialized = true;
-	}
+		ExecStartGather(node);
 
 	/*
 	 * Check to see if we're still projecting out tuples from a previous scan
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index e271927..65ef13b 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -112,6 +112,9 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 	{
 		Plan	   *initNode = (Plan *) lfirst(lc);
 
+		/* always request async execution for now */
+		eflags = eflags | EXEC_FLAG_ASYNC;
+
 		mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
 		i++;
 	}
diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c
index 555fa09..16c317c 100644
--- a/src/backend/executor/nodeNestloop.c
+++ b/src/backend/executor/nodeNestloop.c
@@ -340,11 +340,24 @@ ExecInitNestLoop(NestLoop *node, EState *estate, int eflags)
 	 * inner child, because it will always be rescanned with fresh parameter
 	 * values.
 	 */
+
+	/*
+	 * Async execution of the outer plan is beneficial if this join itself
+	 * was requested to run asynchronously.
+	 */
 	outerPlanState(nlstate) = ExecInitNode(outerPlan(node), estate, eflags);
 	if (node->nestParams == NIL)
 		eflags |= EXEC_FLAG_REWIND;
 	else
 		eflags &= ~EXEC_FLAG_REWIND;
+
+	/*
+	 * Async execution of the inner is inhibited if parameterized by the
+	 * outer
+	 */
+	if (list_length(node->nestParams) > 0)
+		eflags &= ~ EXEC_FLAG_ASYNC;
+
 	innerPlanState(nlstate) = ExecInitNode(innerPlan(node), estate, eflags);
 
 	/*
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index f12921d..3ee678d 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -39,6 +39,18 @@ static TupleTableSlot *SeqNext(SeqScanState *node);
  * ----------------------------------------------------------------
  */
 
+bool
+ExecStartSeqScan(SeqScanState *node)
+{
+	/* PoC stub: report each early-start request once, then disarm it */
+	if (!node->early_start)
+		return false;
+
+	elog(LOG, "dummy_async_cb is called for %p", (void *) node);
+	node->early_start = false;
+	return false;
+}
+
 /* ----------------------------------------------------------------
  *		SeqNext
  *
@@ -214,6 +226,10 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
 	ExecAssignResultTypeFromTL(&scanstate->ss.ps);
 	ExecAssignScanProjectionInfo(&scanstate->ss);
 
+	/* Do early-start when requested */
+	if (eflags & EXEC_FLAG_ASYNC)
+		scanstate->early_start = true;
+
 	return scanstate;
 }
 
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 1a44085..3d13217 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -62,6 +62,7 @@
 #define EXEC_FLAG_WITH_OIDS		0x0020	/* force OIDs in returned tuples */
 #define EXEC_FLAG_WITHOUT_OIDS	0x0040	/* force no OIDs in returned tuples */
 #define EXEC_FLAG_WITH_NO_DATA	0x0080	/* rel scannability doesn't matter */
+#define EXEC_FLAG_ASYNC			0x0100	/* request asynchronous execution */
 
 
 /*
@@ -224,6 +225,7 @@ extern void EvalPlanQualEnd(EPQState *epqstate);
 extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecProcNode(PlanState *node);
 extern Node *MultiExecProcNode(PlanState *node);
+extern bool ExecStartNode(PlanState *node);
 extern void ExecEndNode(PlanState *node);
 extern bool ExecShutdownNode(PlanState *node);
 
diff --git a/src/include/executor/nodeGather.h b/src/include/executor/nodeGather.h
index f76d9be..0a48a03 100644
--- a/src/include/executor/nodeGather.h
+++ b/src/include/executor/nodeGather.h
@@ -18,6 +18,7 @@
 
 extern GatherState *ExecInitGather(Gather *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecGather(GatherState *node);
+extern bool ExecStartGather(GatherState *node);
 extern void ExecEndGather(GatherState *node);
 extern void ExecShutdownGather(GatherState *node);
 extern void ExecReScanGather(GatherState *node);
diff --git a/src/include/executor/nodeSeqscan.h b/src/include/executor/nodeSeqscan.h
index f2e61ff..daf54ac 100644
--- a/src/include/executor/nodeSeqscan.h
+++ b/src/include/executor/nodeSeqscan.h
@@ -19,6 +19,7 @@
 
 extern SeqScanState *ExecInitSeqScan(SeqScan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecSeqScan(SeqScanState *node);
+extern bool ExecStartSeqScan(SeqScanState *node);
 extern void ExecEndSeqScan(SeqScanState *node);
 extern void ExecReScanSeqScan(SeqScanState *node);
 
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 07cd20a..4ffc2a8 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1257,6 +1257,7 @@ typedef struct SeqScanState
 {
 	ScanState	ss;				/* its first field is NodeTag */
 	Size		pscan_len;		/* size of parallel heap scan descriptor */
+	bool		early_start;
 } SeqScanState;
 
 /* ----------------
@@ -1968,6 +1969,7 @@ typedef struct UniqueState
 typedef struct GatherState
 {
 	PlanState	ps;				/* its first field is NodeTag */
+	bool		early_start;
 	bool		initialized;
 	struct ParallelExecutorInfo *pei;
 	int			nreaders;
-- 
1.8.3.1

