>From ee2e01b8edfec09584822f94663e9bb2e03b7e95 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Jan 2016 16:47:31 +0900
Subject: [PATCH 2/2] PoC: Example implementation of asynchronous tuple passing

Aside from early node execution, tuples from multiple children of a
node can be received asynchronously. This patch makes ExecProcNode
report a third status, EXEC_NOT_READY, through the EState, in addition
to the two states previously conveyed by the result (a tuple, or no
more tuples). EXEC_NOT_READY means that the node may have more tuples
to return, but none is available at the moment.

As an example, this patch also modifies nodeSeqscan to return
EXEC_NOT_READY with a certain probability, and nodeAppend to skip to
the next child when a child returns it.

Conflicts:
	src/backend/executor/nodeGather.c
	src/backend/executor/nodeSeqscan.c
	src/include/nodes/execnodes.h
---
 src/backend/executor/execProcnode.c |  6 ++++
 src/backend/executor/nodeAppend.c   | 64 ++++++++++++++++++++++---------------
 src/backend/executor/nodeSeqscan.c  |  8 ++++-
 src/include/nodes/execnodes.h       | 11 +++++++
 4 files changed, 62 insertions(+), 27 deletions(-)

diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 2107ced..5398ca0 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -383,6 +383,8 @@ ExecProcNode(PlanState *node)
 	if (node->instrument)
 		InstrStartNode(node->instrument);
 
+	node->state->exec_status = EXEC_READY;
+
 	switch (nodeTag(node))
 	{
 			/*
@@ -540,6 +542,10 @@ ExecProcNode(PlanState *node)
 	if (node->instrument)
 		InstrStopNode(node->instrument, TupIsNull(result) ? 0.0 : 1.0);
 
+	if (TupIsNull(result) &&
+		node->state->exec_status == EXEC_READY)
+		node->state->exec_status = EXEC_EOT;
+
 	return result;
 }
 
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index d10364c..6ba13e9 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -121,6 +121,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 {
 	AppendState *appendstate = makeNode(AppendState);
 	PlanState **appendplanstates;
+	bool	   *stopped;
 	int			nplans;
 	int			i;
 	ListCell   *lc;
@@ -134,6 +135,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	nplans = list_length(node->appendplans);
 
 	appendplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
+	stopped = (bool *) palloc0(nplans * sizeof(bool));
 
 	/*
 	 * create new AppendState for our append node
@@ -141,6 +143,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	appendstate->ps.plan = (Plan *) node;
 	appendstate->ps.state = estate;
 	appendstate->appendplans = appendplanstates;
+	appendstate->stopped = stopped;
 	appendstate->as_nplans = nplans;
 
 	/*
@@ -195,45 +198,54 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 TupleTableSlot *
 ExecAppend(AppendState *node)
 {
-	for (;;)
+	bool		all_eot = false;
+	EState	   *estate = node->ps.state;
+	TupleTableSlot *result;
+
+	/* XXX: this node currently works only for forward-direction scans */
+	while (!all_eot)
 	{
 		PlanState  *subnode;
-		TupleTableSlot *result;
+		int i;
 
-		/*
-		 * figure out which subplan we are currently processing
-		 */
-		subnode = node->appendplans[node->as_whichplan];
+		all_eot = true;
+		/* Scan the children in registered order. */
+		for (i = node->as_whichplan ; i < node->as_nplans ; i++)
+		{
+			if (node->stopped[i])
+				continue;
 
-		/*
-		 * get a tuple from the subplan
-		 */
-		result = ExecProcNode(subnode);
+			subnode = node->appendplans[i];
+
+			result = ExecProcNode(subnode);
 
-		if (!TupIsNull(result))
-		{
 			/*
 			 * If the subplan gave us something then return it as-is. We do
 			 * NOT make use of the result slot that was set up in
 			 * ExecInitAppend; there's no need for it.
 			 */
-			return result;
+			switch (estate->exec_status)
+			{
+			case  EXEC_READY:
+				return result;
+
+			case  EXEC_NOT_READY:
+				all_eot = false;
+				break;
+
+			case EXEC_EOT:
+				node->stopped[i] = true;
+				break;
+
+			default:
+				elog(ERROR, "Unkown node status: %d", estate->exec_status);
+			}				
 		}
 
-		/*
-		 * Go on to the "next" subplan in the appropriate direction. If no
-		 * more subplans, return the empty slot set up for us by
-		 * ExecInitAppend.
-		 */
-		if (ScanDirectionIsForward(node->ps.state->es_direction))
-			node->as_whichplan++;
-		else
-			node->as_whichplan--;
-		if (!exec_append_initialize_next(node))
-			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
-
-		/* Else loop back and try to get a tuple from the new subplan */
+		/* XXX: some means of waiting for new tuples is needed here */
 	}
+
+	return NULL;
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 3ee678d..61916bf 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -136,6 +136,13 @@ SeqRecheck(SeqScanState *node, TupleTableSlot *slot)
 TupleTableSlot *
 ExecSeqScan(SeqScanState *node)
 {
+	/* Make the caller wait with some probability */
+	if (random() < RAND_MAX / 10)
+	{
+		node->ss.ps.state->exec_status = EXEC_NOT_READY;
+		return NULL;
+	}
+
 	return ExecScan((ScanState *) node,
 					(ExecScanAccessMtd) SeqNext,
 					(ExecScanRecheckMtd) SeqRecheck);
@@ -166,7 +173,6 @@ InitScanRelation(SeqScanState *node, EState *estate, int eflags)
 	ExecAssignScanType(&node->ss, RelationGetDescr(currentRelation));
 }
 
-
 /* ----------------------------------------------------------------
  *		ExecInitSeqScan
  * ----------------------------------------------------------------
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 4ffc2a8..45c6fba 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -343,6 +343,14 @@ typedef struct ResultRelInfo
 	List	   *ri_onConflictSetWhere;
 } ResultRelInfo;
 
+/* Node execution status, reported via EState.exec_status for async-aware callers */
+typedef enum NodeStatus
+{
+	EXEC_NOT_READY,
+	EXEC_READY,
+	EXEC_EOT
+} NodeStatus;
+
 /* ----------------
  *	  EState information
  *
@@ -419,6 +427,8 @@ typedef struct EState
 	HeapTuple  *es_epqTuple;	/* array of EPQ substitute tuples */
 	bool	   *es_epqTupleSet; /* true if EPQ tuple is provided */
 	bool	   *es_epqScanDone; /* true if EPQ tuple has been fetched */
+
+	NodeStatus	exec_status;
 } EState;
 
 
@@ -1147,6 +1157,7 @@ typedef struct AppendState
 {
 	PlanState	ps;				/* its first field is NodeTag */
 	PlanState **appendplans;	/* array of PlanStates for my inputs */
+	bool	   *stopped;
 	int			as_nplans;
 	int			as_whichplan;
 } AppendState;
-- 
1.8.3.1

