>From 8a65bfc57897d7be07d9bb3506550c50cf99b957 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Jan 2016 16:47:31 +0900
Subject: [PATCH 2/2] PoC: Example implementation of asynchronous tuple passing

Aside from early node execution, tuples from multiple children of a
node can also be received asynchronously. Until now a node could only
signal "here is a tuple" or "no more tuples" through its result; this
patch makes ExecProcNode report a third status, EXEC_NOT_READY, via a
new EState field, exec_status. It means that the node may have more
tuples to return, but none is available at the moment.

As an example, this patch also modifies nodeSeqscan to return
EXEC_NOT_READY with a certain probability, and makes nodeAppend skip
to the next child when a child returns it.
---
 src/backend/executor/execProcnode.c |  6 ++++
 src/backend/executor/nodeAppend.c   | 64 ++++++++++++++++++++++---------------
 src/backend/executor/nodeGather.c   | 10 +++---
 src/backend/executor/nodeSeqscan.c  | 11 +++++--
 src/include/nodes/execnodes.h       | 13 ++++++++
 5 files changed, 71 insertions(+), 33 deletions(-)

diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index df9e533..febc41a 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -383,6 +383,8 @@ ExecProcNode(PlanState *node)
 	if (node->instrument)
 		InstrStartNode(node->instrument);
 
+	node->state->exec_status = EXEC_READY;
+
 	switch (nodeTag(node))
 	{
 			/*
@@ -540,6 +542,10 @@ ExecProcNode(PlanState *node)
 	if (node->instrument)
 		InstrStopNode(node->instrument, TupIsNull(result) ? 0.0 : 1.0);
 
+	if (TupIsNull(result) &&
+		node->state->exec_status == EXEC_READY)
+		node->state->exec_status = EXEC_EOT;
+
 	return result;
 }
 
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index d10364c..6ba13e9 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -121,6 +121,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 {
 	AppendState *appendstate = makeNode(AppendState);
 	PlanState **appendplanstates;
+	bool	   *stopped;
 	int			nplans;
 	int			i;
 	ListCell   *lc;
@@ -134,6 +135,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	nplans = list_length(node->appendplans);
 
 	appendplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
+	stopped = (bool *) palloc0(nplans * sizeof(bool));
 
 	/*
 	 * create new AppendState for our append node
@@ -141,6 +143,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	appendstate->ps.plan = (Plan *) node;
 	appendstate->ps.state = estate;
 	appendstate->appendplans = appendplanstates;
+	appendstate->stopped = stopped;
 	appendstate->as_nplans = nplans;
 
 	/*
@@ -195,45 +198,54 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 TupleTableSlot *
 ExecAppend(AppendState *node)
 {
-	for (;;)
+	bool		all_eot = false;
+	EState	   *estate = node->ps.state;
+	TupleTableSlot *result;
+
+	/* XXX: only forward scans are supported; es_direction is ignored. */
+	while (!all_eot)
 	{
 		PlanState  *subnode;
-		TupleTableSlot *result;
+		int			i;
 
-		/*
-		 * figure out which subplan we are currently processing
-		 */
-		subnode = node->appendplans[node->as_whichplan];
+		all_eot = true;
+		/* Poll each child not yet exhausted, in registered order. */
+		for (i = node->as_whichplan; i < node->as_nplans; i++)
+		{
+			if (node->stopped[i])
+				continue;
 
-		/*
-		 * get a tuple from the subplan
-		 */
-		result = ExecProcNode(subnode);
+			subnode = node->appendplans[i];
+
+			result = ExecProcNode(subnode);
 
-		if (!TupIsNull(result))
-		{
 			/*
 			 * If the subplan gave us something then return it as-is. We do
 			 * NOT make use of the result slot that was set up in
 			 * ExecInitAppend; there's no need for it.
 			 */
-			return result;
+			switch (estate->exec_status)
+			{
+			case EXEC_READY:
+				return result;
+
+			case EXEC_NOT_READY:
+				all_eot = false;	/* child may produce tuples later */
+				break;
+
+			case EXEC_EOT:
+				node->stopped[i] = true;
+				break;
+
+			default:
+				elog(ERROR, "Unknown node status: %d", estate->exec_status);
+			}
 		}
 
-		/*
-		 * Go on to the "next" subplan in the appropriate direction. If no
-		 * more subplans, return the empty slot set up for us by
-		 * ExecInitAppend.
-		 */
-		if (ScanDirectionIsForward(node->ps.state->es_direction))
-			node->as_whichplan++;
-		else
-			node->as_whichplan--;
-		if (!exec_append_initialize_next(node))
-			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
-
-		/* Else loop back and try to get a tuple from the new subplan */
+		/* XXX: spins while any child is not ready; needs a real wait. */
 	}
+
+	return ExecClearTuple(node->ps.ps_ResultTupleSlot);
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 3f9b8b0..1b990b4 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -45,7 +45,7 @@
 static TupleTableSlot *gather_getnext(GatherState *gatherstate);
 static HeapTuple gather_readnext(GatherState *gatherstate);
 static void ExecShutdownGatherWorkers(GatherState *node);
-static bool StartGather(PlanState *psnode);
+static void StartGather(PlanState *psnode);
 
 /* ----------------------------------------------------------------
  *		StartGather
@@ -54,7 +54,7 @@ static bool StartGather(PlanState *psnode);
  *		cases because of its startup cost.
  *		----------------------------------------------------------------
  */
-static bool
+static void
 StartGather(PlanState *psnode)
 {
 	GatherState   *node = (GatherState *)psnode;
@@ -65,7 +65,7 @@ StartGather(PlanState *psnode)
 
 	/* Don't start if already started or explicitly inhibited by the upper */
 	if (node->initialized)
-		return false;
+		return;
 
 	/*
 	 * Initialize the parallel context and workers on first execution. We do
@@ -127,7 +127,7 @@ StartGather(PlanState *psnode)
 		|| !gather->single_copy;
 
 	node->initialized = true;
-	return true;
+	return;
 }
 
 /* ----------------------------------------------------------------
@@ -186,7 +186,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	 * asynchronously in this process
 	 */
 	child_eflags = eflags & ~EXEC_FLAG_ASYNC;
-	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
+	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, child_eflags);
 
 	gatherstate->ps.ps_TupFromTlist = false;
 
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 2ae598d..f345d8c 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -130,6 +130,13 @@ SeqRecheck(SeqScanState *node, TupleTableSlot *slot)
 TupleTableSlot *
 ExecSeqScan(SeqScanState *node)
 {
+	/* Make the caller wait by some probability */
+	if (random() < RAND_MAX / 10)
+	{
+		node->ss.ps.state->exec_status = EXEC_NOT_READY;
+		return NULL;
+	}
+
 	return ExecScan((ScanState *) node,
 					(ExecScanAccessMtd) SeqNext,
 					(ExecScanRecheckMtd) SeqRecheck);
@@ -160,7 +167,6 @@ InitScanRelation(SeqScanState *node, EState *estate, int eflags)
 	ExecAssignScanType(&node->ss, RelationGetDescr(currentRelation));
 }
 
-
 /* ----------------------------------------------------------------
  *		ExecInitSeqScan
  * ----------------------------------------------------------------
@@ -221,7 +227,8 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
 	ExecAssignScanProjectionInfo(&scanstate->ss);
 
 	/*  Register dummy async callback if requested */
-	RegisterAsyncCallback(estate, dummy_async_cb, scanstate, eflags);
+	RegisterAsyncCallback(estate, dummy_async_cb,
+						  (PlanState *)scanstate, eflags);
 
 	return scanstate;
 }
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 1e8936c..714178a 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -354,6 +354,15 @@ typedef struct AsyncStartListItem
 	PlanState			   *node;	/* parameter to give the callback */
 } AsyncStartListItem;
 
+/* Status of the last ExecProcNode call, passed back via EState.exec_status */
+typedef enum NodeStatus
+{
+	EXEC_NOT_READY,				/* no tuple now, but more may follow */
+	EXEC_READY,					/* a tuple was returned */
+	EXEC_EOT					/* no more tuples: the node is exhausted */
+} NodeStatus;
+
+
 /* ----------------
  *	  EState information
  *
@@ -437,6 +446,8 @@ typedef struct EState
 	 */
 	List	*es_async_cb_list;
 	List	*es_private_async_cb_list;
+
+	NodeStatus exec_status;
 } EState;
 
 /* in execUtils.c */
@@ -1078,6 +1089,7 @@ typedef struct PlanState
 	ProjectionInfo *ps_ProjInfo;	/* info for doing tuple projection */
 	bool		ps_TupFromTlist;/* state flag for processing set-valued
 								 * functions in targetlist */
+	bool		ps_async_tuple;	/* tuple is passed semi-asynchronously */
 } PlanState;
 
 /* ----------------
@@ -1168,6 +1180,7 @@ typedef struct AppendState
 {
 	PlanState	ps;				/* its first field is NodeTag */
 	PlanState **appendplans;	/* array of PlanStates for my inputs */
+	bool	   *stopped;
 	int			as_nplans;
 	int			as_whichplan;
 } AppendState;
-- 
1.8.3.1

