>From 87e5c9eb6f230b9682fe300bc1592cb9f4fcadb5 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Jan 2016 09:37:25 +0900
Subject: [PATCH 1/2] PoC: Async start callback for executor.

This patch allows async-capable nodes to register callbacks that start
the node before the first ExecProcNode() call.  A new eflags bit,
EXEC_FLAG_ASYNC, lets a parent node request asynchronous execution from
its children during the ExecInit phase.

As an example, nodeSeqscan registers a dummy callback when requested,
and nodeAppend unconditionally requests async execution from its
children, so a plan Append(SeqScan, SeqScan) runs the callbacks and
emits LOG messages.  Gather's worker start-up is also moved into such a
callback (StartGather).
---
 src/backend/executor/execMain.c        |   2 +
 src/backend/executor/execProcnode.c    |   9 ++
 src/backend/executor/execUtils.c       |  39 ++++++++
 src/backend/executor/nodeAppend.c      |   2 +
 src/backend/executor/nodeGather.c      | 166 ++++++++++++++++++++-------------
 src/backend/executor/nodeMergeAppend.c |   3 +
 src/backend/executor/nodeNestloop.c    |  13 +++
 src/backend/executor/nodeSeqscan.c     |   9 ++
 src/include/executor/executor.h        |   2 +
 src/include/nodes/execnodes.h          |  23 ++++-
 src/include/nodes/plannodes.h          |   1 -
 11 files changed, 202 insertions(+), 67 deletions(-)

diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 76f7297..7fe188a 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1552,6 +1552,8 @@ ExecutePlan(EState *estate,
 	if (use_parallel_mode)
 		EnterParallelMode();
 
+	AsyncStartNode(planstate);
+
 	/*
 	 * Loop until we've processed the proper number of tuples from the plan.
 	 */
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index a31dbc9..df9e533 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -786,6 +786,15 @@ ExecEndNode(PlanState *node)
 }
 
 /*
+ * AsyncStartNode - run the early-start callbacks registered in node's EState
+ */
+void
+AsyncStartNode(PlanState *node)
+{
+	RunAsyncCallbacks(node->state->es_async_cb_list);
+}
+
+/*
  * ExecShutdownNode
  *
  * Give execution nodes a chance to stop asynchronous resource consumption
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index e937cf8..0627772 100644
--- a/src/backend/executor/execUtils.c
+++ b/src/backend/executor/execUtils.c
@@ -964,3 +964,42 @@ ShutdownExprContext(ExprContext *econtext, bool isCommit)
 
 	MemoryContextSwitchTo(oldcontext);
 }
+
+/*
+ * Register an asynchronous-start callback in the EState.
+ *
+ * Nothing is registered unless eflags contains EXEC_FLAG_ASYNC.  Callbacks
+ * are appended, so RunAsyncCallbacks runs them in registration order, which
+ * is safer in case some of them depend on earlier ones having run.
+ */
+void
+RegisterAsyncCallback(EState *estate, AsyncStartCallback func, PlanState *node,
+					  int eflags)
+{
+	AsyncStartListItem *elem;
+
+	/* Don't allocate anything unless async execution was requested. */
+	if ((eflags & EXEC_FLAG_ASYNC) == 0)
+		return;
+
+	elem = palloc(sizeof(AsyncStartListItem));
+	elem->cbfunc = func;
+	elem->node = node;
+	estate->es_async_cb_list = lappend(estate->es_async_cb_list, elem);
+}
+
+/*
+ * Run the callbacks in the given list, first to last.
+ */
+void
+RunAsyncCallbacks(List *list)
+{
+	ListCell   *lc;
+
+	foreach(lc, list)
+	{
+		AsyncStartListItem *cb = (AsyncStartListItem *) lfirst(lc);
+
+		cb->cbfunc(cb->node);
+	}
+}
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index a26bd63..d10364c 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -165,6 +165,8 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	{
 		Plan	   *initNode = (Plan *) lfirst(lc);
 
+		/* always request asynchronous execution from the children */
+		eflags |= EXEC_FLAG_ASYNC;
 		appendplanstates[i] = ExecInitNode(initNode, estate, eflags);
 		i++;
 	}
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 16c981b..3f9b8b0 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -45,7 +45,90 @@
 static TupleTableSlot *gather_getnext(GatherState *gatherstate);
 static HeapTuple gather_readnext(GatherState *gatherstate);
 static void ExecShutdownGatherWorkers(GatherState *node);
+static void StartGather(PlanState *psnode);
 
+/* ----------------------------------------------------------------
+ *		StartGather
+ *
+ *		Launch parallel workers for a Gather node, set up the tuple queue
+ *		readers, and decide whether to also scan locally.  Gather benefits
+ *		from an early (asynchronous) start because of its startup cost.
+ * ----------------------------------------------------------------
+ */
+static void
+StartGather(PlanState *psnode)
+{
+	GatherState *node = (GatherState *) psnode;
+	EState	   *estate = node->ps.state;
+	Gather	   *gather = (Gather *) node->ps.plan;
+	TupleTableSlot *fslot = node->funnel_slot;
+	int			i;
+
+	/* Nothing to do if this node has already been initialized. */
+	if (node->initialized)
+		return;
+
+	/*
+	 * Initialize the parallel context and workers.  This is deferred from node
+	 * initialization to execution time because it allocates a large dynamic
+	 * shared memory segment.  We get here either from the async-start
+	 * callback registered by ExecInitGather (when the parent requested async
+	 * execution) or from the first ExecGather() call, whichever comes first;
+	 * the initialized flag makes any later call a no-op.
+	 *
+	 * Sometimes we might have to run without parallelism; but if parallel
+	 * mode is active then we can try to fire up some workers.
+	 */
+	if (gather->num_workers > 0 && IsInParallelMode())
+	{
+		ParallelContext *pcxt;
+		bool	got_any_worker = false;
+
+		/* Initialize the workers required to execute Gather node. */
+		if (!node->pei)
+			node->pei = ExecInitParallelPlan(node->ps.lefttree,
+											 estate,
+											 gather->num_workers);
+
+		/*
+		 * Register backend workers. We might not get as many as we
+		 * requested, or indeed any at all.
+		 */
+		pcxt = node->pei->pcxt;
+		LaunchParallelWorkers(pcxt);
+
+		/* Set up tuple queue readers to read the results. */
+		if (pcxt->nworkers > 0)
+		{
+			node->nreaders = 0;
+			node->reader =
+				palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
+
+			for (i = 0; i < pcxt->nworkers; ++i)
+			{
+				if (pcxt->worker[i].bgwhandle == NULL)
+					continue;
+
+				shm_mq_set_handle(node->pei->tqueue[i],
+								  pcxt->worker[i].bgwhandle);
+				node->reader[node->nreaders++] =
+					CreateTupleQueueReader(node->pei->tqueue[i],
+										   fslot->tts_tupleDescriptor);
+				got_any_worker = true;
+			}
+		}
+
+		/* No workers?  Then never mind. */
+		if (!got_any_worker)
+			ExecShutdownGatherWorkers(node);
+	}
+
+	/* Run plan locally if no workers or not single-copy. */
+	node->need_to_scan_locally = (node->reader == NULL)
+		|| !gather->single_copy;
+
+	node->initialized = true;
+}
 
 /* ----------------------------------------------------------------
  *		ExecInitGather
@@ -58,6 +141,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	Plan	   *outerNode;
 	bool		hasoid;
 	TupleDesc	tupDesc;
+	int			child_eflags;
 
 	/* Gather node doesn't have innerPlan node. */
 	Assert(innerPlan(node) == NULL);
@@ -97,6 +181,11 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	 * now initialize outer plan
 	 */
 	outerNode = outerPlan(node);
-	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
+	/*
+	 * The outer plan is run by the parallel workers (and possibly also
+	 * locally), so don't request asynchronous start-up of it here.
+	 */
+	child_eflags = eflags & ~EXEC_FLAG_ASYNC;
+	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, child_eflags);
 
 	gatherstate->ps.ps_TupFromTlist = false;
@@ -115,6 +204,16 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
 	ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);
 
+	/*
+	 * Register the async-start callback for this node; RegisterAsyncCallback
+	 * ignores it unless the parent passed EXEC_FLAG_ASYNC.  Starting the
+	 * workers early allocates a large dynamic shared memory segment before
+	 * we know that the node will really be executed, so ideally this would
+	 * be decided case by case; for now we always register.
+	 */
+	RegisterAsyncCallback(estate, StartGather, (PlanState *)gatherstate,
+						  eflags);
+
 	return gatherstate;
 }
 
@@ -128,77 +227,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 TupleTableSlot *
 ExecGather(GatherState *node)
 {
-	TupleTableSlot *fslot = node->funnel_slot;
-	int			i;
 	TupleTableSlot *slot;
 	TupleTableSlot *resultSlot;
 	ExprDoneCond isDone;
 	ExprContext *econtext;
 
-	/*
-	 * Initialize the parallel context and workers on first execution. We do
-	 * this on first execution rather than during node initialization, as it
-	 * needs to allocate large dynamic segment, so it is better to do if it
-	 * is really needed.
-	 */
+	/* Initialize workers if not yet. */
 	if (!node->initialized)
-	{
-		EState	   *estate = node->ps.state;
-		Gather	   *gather = (Gather *) node->ps.plan;
-
-		/*
-		 * Sometimes we might have to run without parallelism; but if
-		 * parallel mode is active then we can try to fire up some workers.
-		 */
-		if (gather->num_workers > 0 && IsInParallelMode())
-		{
-			ParallelContext *pcxt;
-			bool	got_any_worker = false;
-
-			/* Initialize the workers required to execute Gather node. */
-			if (!node->pei)
-				node->pei = ExecInitParallelPlan(node->ps.lefttree,
-												 estate,
-												 gather->num_workers);
-
-			/*
-			 * Register backend workers. We might not get as many as we
-			 * requested, or indeed any at all.
-			 */
-			pcxt = node->pei->pcxt;
-			LaunchParallelWorkers(pcxt);
-
-			/* Set up tuple queue readers to read the results. */
-			if (pcxt->nworkers > 0)
-			{
-				node->nreaders = 0;
-				node->reader =
-					palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
-
-				for (i = 0; i < pcxt->nworkers; ++i)
-				{
-					if (pcxt->worker[i].bgwhandle == NULL)
-						continue;
-
-					shm_mq_set_handle(node->pei->tqueue[i],
-									  pcxt->worker[i].bgwhandle);
-					node->reader[node->nreaders++] =
-						CreateTupleQueueReader(node->pei->tqueue[i],
-											   fslot->tts_tupleDescriptor);
-					got_any_worker = true;
-				}
-			}
-
-			/* No workers?  Then never mind. */
-			if (!got_any_worker)
-				ExecShutdownGatherWorkers(node);
-		}
-
-		/* Run plan locally if no workers or not single-copy. */
-		node->need_to_scan_locally = (node->reader == NULL)
-			|| !gather->single_copy;
-		node->initialized = true;
-	}
+		StartGather((PlanState *)node);
 
 	/*
 	 * Check to see if we're still projecting out tuples from a previous scan
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index e271927..65ef13b 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -112,6 +112,9 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 	{
 		Plan	   *initNode = (Plan *) lfirst(lc);
 
+		/* always request asynchronous execution from the children, for now */
+		eflags |= EXEC_FLAG_ASYNC;
+
 		mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
 		i++;
 	}
diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c
index 555fa09..16c317c 100644
--- a/src/backend/executor/nodeNestloop.c
+++ b/src/backend/executor/nodeNestloop.c
@@ -340,11 +340,24 @@ ExecInitNestLoop(NestLoop *node, EState *estate, int eflags)
 	 * inner child, because it will always be rescanned with fresh parameter
 	 * values.
 	 */
+
+	/*
+	 * Asynchronous start-up of the outer plan is beneficial if this join was
+	 * itself asked to start asynchronously, so pass eflags through as is.
+	 */
 	outerPlanState(nlstate) = ExecInitNode(outerPlan(node), estate, eflags);
 	if (node->nestParams == NIL)
 		eflags |= EXEC_FLAG_REWIND;
 	else
 		eflags &= ~EXEC_FLAG_REWIND;
+
+	/*
+	 * Don't start the inner plan asynchronously when it is parameterized by
+	 * the outer plan: it can only run once outer tuples supply the values.
+	 */
+	if (node->nestParams != NIL)
+		eflags &= ~EXEC_FLAG_ASYNC;
+
 	innerPlanState(nlstate) = ExecInitNode(innerPlan(node), estate, eflags);
 
 	/*
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index f12921d..2ae598d 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -39,6 +39,12 @@ static TupleTableSlot *SeqNext(SeqScanState *node);
  * ----------------------------------------------------------------
  */
 
+static void
+dummy_async_cb(PlanState *ps)
+{
+	elog(LOG, "dummy_async_cb is called for %p", (void *) ps);
+}
+
 /* ----------------------------------------------------------------
  *		SeqNext
  *
@@ -214,6 +220,9 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
 	ExecAssignResultTypeFromTL(&scanstate->ss.ps);
 	ExecAssignScanProjectionInfo(&scanstate->ss);
 
+	/* Register the dummy async-start callback (ignored unless requested) */
+	RegisterAsyncCallback(estate, dummy_async_cb, &scanstate->ss.ps, eflags);
+
 	return scanstate;
 }
 
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 1a44085..b1a17eb 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -62,6 +62,7 @@
 #define EXEC_FLAG_WITH_OIDS		0x0020	/* force OIDs in returned tuples */
 #define EXEC_FLAG_WITHOUT_OIDS	0x0040	/* force no OIDs in returned tuples */
 #define EXEC_FLAG_WITH_NO_DATA	0x0080	/* rel scannability doesn't matter */
+#define EXEC_FLAG_ASYNC			0x0100	/* request asynchronous execution */
 
 
 /*
@@ -225,6 +226,7 @@ extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecProcNode(PlanState *node);
 extern Node *MultiExecProcNode(PlanState *node);
 extern void ExecEndNode(PlanState *node);
+extern void AsyncStartNode(PlanState *node);
 extern bool ExecShutdownNode(PlanState *node);
 
 /*
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 07cd20a..1e8936c 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -343,6 +343,17 @@ typedef struct ResultRelInfo
 	List	   *ri_onConflictSetWhere;
 } ResultRelInfo;
 
+/* ----------------
+ *	  Callbacks for asynchronous (early) start-up of plan nodes
+ */
+struct PlanState;				/* forward declaration */
+typedef void (*AsyncStartCallback) (struct PlanState *node);
+typedef struct AsyncStartListItem
+{
+	AsyncStartCallback cbfunc;	/* the callback function */
+	struct PlanState *node;		/* argument passed to cbfunc */
+} AsyncStartListItem;
+
 /* ----------------
  *	  EState information
  *
@@ -419,9 +430,19 @@ typedef struct EState
 	HeapTuple  *es_epqTuple;	/* array of EPQ substitute tuples */
 	bool	   *es_epqTupleSet; /* true if EPQ tuple is provided */
 	bool	   *es_epqScanDone; /* true if EPQ tuple has been fetched */
-} EState;
 
+	/*
+	 * Early-start callbacks (list of AsyncStartListItem), run in list order
+	 * by AsyncStartNode() before ExecutePlan() starts fetching tuples.
+	 */
+	List	   *es_async_cb_list;
+} EState;
 
+/* in execUtils.c */
+extern void RegisterAsyncCallback(EState *estate, AsyncStartCallback func,
+								  struct PlanState *node, int eflags);
+extern void RunAsyncCallbacks(List *list);
+
 /*
  * ExecRowMark -
  *	   runtime representation of FOR [KEY] UPDATE/SHARE clauses
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index e823c83..cbd58cb 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -79,7 +79,6 @@ typedef struct PlannedStmt
 #define exec_subplan_get_plan(plannedstmt, subplan) \
 	((Plan *) list_nth((plannedstmt)->subplans, (subplan)->plan_id - 1))
 
-
 /* ----------------
  *		Plan node
  *
-- 
1.8.3.1

