>From bee3f8a971a1c86a7c53929a17e358a763348193 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 9 Jul 2015 19:34:15 +0900
Subject: [PATCH 3/5] Add a feature to start node asynchronously

Add a feature to start child nodes asynchronously in join nodes and
Append/MergeAppend nodes.  At this point, no node can be started
asynchronously so the behavior is not changed.
---
 src/backend/executor/execProcnode.c     | 106 ++++++++++++++++++++++++
 src/backend/executor/nodeAgg.c          |  22 +++++
 src/backend/executor/nodeAppend.c       |  29 +++++++
 src/backend/executor/nodeCtescan.c      |  21 +++++
 src/backend/executor/nodeGather.c       | 141 +++++++++++++++++++-------------
 src/backend/executor/nodeGroup.c        |  22 +++++
 src/backend/executor/nodeHash.c         |  22 +++++
 src/backend/executor/nodeHashjoin.c     |  54 ++++++++++++
 src/backend/executor/nodeLimit.c        |  22 +++++
 src/backend/executor/nodeLockRows.c     |  22 +++++
 src/backend/executor/nodeMaterial.c     |  23 ++++++
 src/backend/executor/nodeMergeAppend.c  |  30 +++++++
 src/backend/executor/nodeMergejoin.c    |  29 +++++++
 src/backend/executor/nodeNestloop.c     |  34 ++++++++
 src/backend/executor/nodeResult.c       |  24 ++++++
 src/backend/executor/nodeSetOp.c        |  22 +++++
 src/backend/executor/nodeSort.c         |  22 +++++
 src/backend/executor/nodeSubqueryscan.c |  22 +++++
 src/backend/executor/nodeUnique.c       |  22 +++++
 src/backend/executor/nodeWindowAgg.c    |  22 +++++
 src/include/executor/executor.h         |   1 +
 src/include/executor/nodeAgg.h          |   1 +
 src/include/executor/nodeAppend.h       |   1 +
 src/include/executor/nodeCtescan.h      |   1 +
 src/include/executor/nodeGather.h       |   1 +
 src/include/executor/nodeGroup.h        |   1 +
 src/include/executor/nodeHash.h         |   1 +
 src/include/executor/nodeHashjoin.h     |   1 +
 src/include/executor/nodeLimit.h        |   1 +
 src/include/executor/nodeLockRows.h     |   1 +
 src/include/executor/nodeMaterial.h     |   1 +
 src/include/executor/nodeMergeAppend.h  |   1 +
 src/include/executor/nodeMergejoin.h    |   1 +
 src/include/executor/nodeNestloop.h     |   1 +
 src/include/executor/nodeResult.h       |   1 +
 src/include/executor/nodeSetOp.h        |   1 +
 src/include/executor/nodeSort.h         |   1 +
 src/include/executor/nodeSubqueryscan.h |   1 +
 src/include/executor/nodeUnique.h       |   1 +
 src/include/executor/nodeWindowAgg.h    |   1 +
 40 files changed, 672 insertions(+), 59 deletions(-)

diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 6f5c554..a9d973d 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -786,6 +786,112 @@ ExecEndNode(PlanState *node)
 }
 
 /*
+ * StartProcNode - asynchronously execute nodes underneath if possible
+ *
+ * Returns true if the node has been started asynchronously. Some of the
+ * underlying nodes may have been started even if this returns false.
+ */
+bool
+StartProcNode(PlanState *node)
+{
+	/*
+	 * Refuse a duplicate start.  This can happen for children that were
+	 * skipped on rescan by nodes such as MergeAppend.
+	 */
+	if (node->runstate > ERunState_Started)
+		return false;
+
+	switch (nodeTag(node))
+	{
+	case T_ResultState:
+		return StartResult((ResultState *)node);
+
+	case T_AppendState:
+		return StartAppend((AppendState *)node);
+
+	case T_MergeAppendState:
+		return StartMergeAppend((MergeAppendState *)node);
+
+	case T_SubqueryScanState:
+		return StartSubqueryScan((SubqueryScanState *)node);
+
+	case T_CteScanState:
+		return StartCteScan((CteScanState *)node);
+
+		/*
+		 * join nodes
+		 */
+	case T_NestLoopState:
+		return StartNestLoop((NestLoopState *)node);
+
+	case T_MergeJoinState:
+		return StartMergeJoin((MergeJoinState *)node);
+
+	case T_HashJoinState:
+		return StartHashJoin((HashJoinState *)node);
+
+		/*
+		 * materialization nodes
+		 */
+	case T_MaterialState:
+		return StartMaterial((MaterialState *)node);
+
+	case T_SortState:
+		return StartSort((SortState *)node);
+
+	case T_GroupState:
+		return StartGroup((GroupState *)node);
+
+	case T_AggState:
+		return StartAgg((AggState *)node);
+
+	case T_WindowAggState:
+		return StartWindowAgg((WindowAggState *)node);
+
+	case T_UniqueState:
+		return StartUnique((UniqueState *)node);
+
+	case T_HashState:
+		return StartHash((HashState *)node);
+
+	case T_SetOpState:
+		return StartSetOp((SetOpState *)node);
+
+	case T_LockRowsState:
+		return StartLockRows((LockRowsState *)node);
+
+	case T_LimitState:
+		return StartLimit((LimitState *)node);
+
+	case T_GatherState:
+		return StartGather((GatherState *)node);
+
+	/* These nodes cannot run asynchronously */
+	case T_ForeignScanState:
+	case T_WorkTableScanState:
+	case T_CustomScanState:
+	case T_FunctionScanState:
+	case T_ValuesScanState:
+	case T_SeqScanState:
+	case T_SampleScanState:
+	case T_IndexScanState:
+	case T_IndexOnlyScanState:
+	case T_BitmapIndexScanState:
+	case T_BitmapHeapScanState:
+	case T_TidScanState:
+	case T_ModifyTableState:
+	case T_RecursiveUnionState:
+	case T_BitmapAndState:
+	case T_BitmapOrState:
+		return false;
+
+	default:
+		elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
+		return false;			/* keep compiler quiet */
+	}
+}
+
+/*
  * ExecShutdownNode
  *
  * Give execution nodes a chance to stop asynchronous resource consumption
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index ed29e3a..a40cb48 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -1569,6 +1569,28 @@ ExecAgg(AggState *node)
 }
 
 /*
+ * StartAgg - Try asynchronous execution of this node
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ */
+bool
+StartAgg(AggState *node)
+{
+	/* Bail out unless ExecNode_is_inited() holds for this node. */
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	/* This node counts as started only if its outer plan was started. */
+	if (!StartProcNode(outerPlanState(node)))
+		return false;
+
+	SetNodeRunState(node, Started);
+	return true;
+}
+
+
+/*
  * ExecAgg for non-hashed case
  */
 static TupleTableSlot *
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 03b3b66..2b918b2 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -194,6 +194,10 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 TupleTableSlot *
 ExecAppend(AppendState *node)
 {
+	/* start child nodes asynchronously if possible */
+	if (ExecNode_is_inited(node))
+		StartAppend(node);
+
 	SetNodeRunState(node, Running);
 
 	for (;;)
@@ -241,6 +245,31 @@ ExecAppend(AppendState *node)
 }
 
 /* ----------------------------------------------------------------
+ *		StartAppend
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartAppend(AppendState *node)
+{
+	bool		any_started = false;
+	int			idx = 0;
+
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	/* Try every child; "|=" keeps going after the first success. */
+	while (idx < node->as_nplans)
+		any_started |= StartProcNode(node->appendplans[idx++]);
+
+	if (any_started)
+		SetNodeRunState(node, Started);
+	return any_started;
+}
+
+/* ----------------------------------------------------------------
  *		ExecEndAppend
  *
  *		Shuts down the subscans of the append node.
diff --git a/src/backend/executor/nodeCtescan.c b/src/backend/executor/nodeCtescan.c
index d237370..cae0aca 100644
--- a/src/backend/executor/nodeCtescan.c
+++ b/src/backend/executor/nodeCtescan.c
@@ -166,6 +166,27 @@ ExecCteScan(CteScanState *node)
 	return slot;
 }
 
+/* ----------------------------------------------------------------
+ *		StartCteScan
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartCteScan(CteScanState *node)
+{
+	/* Like the other Start functions, proceed only from the inited state. */
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	/* This node counts as started only if the CTE plan was started. */
+	if (!StartProcNode(node->cteplanstate))
+		return false;
+
+	SetNodeRunState(node, Started);
+	return true;
+}
 
 /* ----------------------------------------------------------------
  *		ExecInitCteScan
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index aceb358..5364acb 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -69,6 +69,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	gatherstate->ps.state = estate;
 	SetNodeRunState(gatherstate, Inited);
 	gatherstate->need_to_scan_locally = !node->single_copy;
+	SetNodeRunState(gatherstate, Inited);
 
 	/*
 	 * Miscellaneous initialization
@@ -119,23 +120,24 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 }
 
 /* ----------------------------------------------------------------
- *		ExecGather(node)
+ *		StartGather
  *
- *		Scans the relation via multiple workers and returns
- *		the next qualifying tuple.
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
  * ----------------------------------------------------------------
  */
-TupleTableSlot *
-ExecGather(GatherState *node)
+bool
+StartGather(GatherState *node)
 {
+	EState	   *estate = node->ps.state;
+	Gather	   *gather = (Gather *) node->ps.plan;
 	TupleTableSlot *fslot = node->funnel_slot;
-	int			i;
-	TupleTableSlot *slot;
-	TupleTableSlot *resultSlot;
-	ExprDoneCond isDone;
-	ExprContext *econtext;
+	int i;
 
-	SetNodeRunState(node, Running);
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	SetNodeRunState(node, Started);
 
 	/*
 	 * Initialize the parallel context and workers on first execution. We do
@@ -143,66 +145,85 @@ ExecGather(GatherState *node)
 	 * needs to allocate large dynamic segement, so it is better to do if it
 	 * is really needed.
 	 */
-	if (ExecNode_is_inited(node))
+
+	/*
+	 * Sometimes we might have to run without parallelism; but if
+	 * parallel mode is active then we can try to fire up some workers.
+	 */
+	if (gather->num_workers > 0 && IsInParallelMode())
 	{
-		EState	   *estate = node->ps.state;
-		Gather	   *gather = (Gather *) node->ps.plan;
+		ParallelContext *pcxt;
+		bool	got_any_worker = false;
+
+		/* Initialize the workers required to execute Gather node. */
+		if (!node->pei)
+			node->pei = ExecInitParallelPlan(node->ps.lefttree,
+											 estate,
+											 gather->num_workers);
 
 		/*
-		 * Sometimes we might have to run without parallelism; but if
-		 * parallel mode is active then we can try to fire up some workers.
+		 * Register backend workers. We might not get as many as we
+		 * requested, or indeed any at all.
 		 */
-		if (gather->num_workers > 0 && IsInParallelMode())
-		{
-			ParallelContext *pcxt;
-			bool	got_any_worker = false;
+		pcxt = node->pei->pcxt;
+		LaunchParallelWorkers(pcxt);
 
-			/* Initialize the workers required to execute Gather node. */
-			if (!node->pei)
-				node->pei = ExecInitParallelPlan(node->ps.lefttree,
-												 estate,
-												 gather->num_workers);
-
-			/*
-			 * Register backend workers. We might not get as many as we
-			 * requested, or indeed any at all.
-			 */
-			pcxt = node->pei->pcxt;
-			LaunchParallelWorkers(pcxt);
+		/* Set up tuple queue readers to read the results. */
+		if (pcxt->nworkers > 0)
+		{
+			node->nreaders = 0;
+			node->reader =
+				palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
 
-			/* Set up tuple queue readers to read the results. */
-			if (pcxt->nworkers > 0)
+			for (i = 0; i < pcxt->nworkers; ++i)
 			{
-				node->nreaders = 0;
-				node->reader =
-					palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
-
-				for (i = 0; i < pcxt->nworkers; ++i)
-				{
-					if (pcxt->worker[i].bgwhandle == NULL)
-						continue;
-
-					shm_mq_set_handle(node->pei->tqueue[i],
-									  pcxt->worker[i].bgwhandle);
-					node->reader[node->nreaders++] =
-						CreateTupleQueueReader(node->pei->tqueue[i],
-											   fslot->tts_tupleDescriptor);
-					got_any_worker = true;
-				}
+				if (pcxt->worker[i].bgwhandle == NULL)
+					continue;
+
+				shm_mq_set_handle(node->pei->tqueue[i],
+								  pcxt->worker[i].bgwhandle);
+				node->reader[node->nreaders++] =
+					CreateTupleQueueReader(node->pei->tqueue[i],
+										   fslot->tts_tupleDescriptor);
+				got_any_worker = true;
 			}
-
-			/* No workers?  Then never mind. */
-			if (!got_any_worker)
-				ExecShutdownGatherWorkers(node);
 		}
 
-		/* Run plan locally if no workers or not single-copy. */
-		node->need_to_scan_locally = (node->reader == NULL)
-			|| !gather->single_copy;
-
-		SetNodeRunState(node, Running);
+		/* No workers?  Then never mind. */
+		if (!got_any_worker)
+			ExecShutdownGatherWorkers(node);
 	}
 
+	/* Run plan locally if no workers or not single-copy. */
+	node->need_to_scan_locally = (node->reader == NULL)
+		|| !gather->single_copy;
+
+	/* Plans on worker are always executed asynchronously */
+	SetNodeRunState(node, Started);
+	return true;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecGather(node)
+ *
+ *		Scans the relation via multiple workers and returns
+ *		the next qualifying tuple.
+ * ----------------------------------------------------------------
+ */
+TupleTableSlot *
+ExecGather(GatherState *node)
+{
+	TupleTableSlot *slot;
+	TupleTableSlot *resultSlot;
+	ExprDoneCond isDone;
+	ExprContext *econtext;
+
+	/* Execute children asynchronously if possible */
+	if (ExecNode_is_inited(node))
+		StartGather(node);
+
+	SetNodeRunState(node, Running);
+
 	/*
 	 * Check to see if we're still projecting out tuples from a previous scan
 	 * tuple (because there is a function-returning-set in the projection
@@ -462,6 +483,8 @@ ExecReScanGather(GatherState *node)
 	 */
 	ExecShutdownGatherWorkers(node);
 
+	SetNodeRunState(node, Inited);
+
 	if (node->pei)
 		ExecParallelReinitialize(node->pei);
 
diff --git a/src/backend/executor/nodeGroup.c b/src/backend/executor/nodeGroup.c
index a593d9f..ea947b9 100644
--- a/src/backend/executor/nodeGroup.c
+++ b/src/backend/executor/nodeGroup.c
@@ -190,6 +190,28 @@ ExecGroup(GroupState *node)
 }
 
 /* -----------------
+ * StartGroup
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * -----------------
+ */
+bool
+StartGroup(GroupState *node)
+{
+	/* Bail out unless ExecNode_is_inited() holds for this node. */
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	/* This node counts as started only if its outer plan was started. */
+	if (!StartProcNode(outerPlanState(node)))
+		return false;
+
+	SetNodeRunState(node, Started);
+	return true;
+}
+
+/* -----------------
  * ExecInitGroup
  *
  *	Creates the run-time information for the group node produced by the
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 5a71fe3..e04a78f 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -157,6 +157,28 @@ MultiExecHash(HashState *node)
 }
 
 /* ----------------------------------------------------------------
+ *		StartHash
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartHash(HashState *node)
+{
+	/* Bail out unless ExecNode_is_inited() holds for this node. */
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	/* This node counts as started only if its outer plan was started. */
+	if (!StartProcNode(outerPlanState(node)))
+		return false;
+
+	SetNodeRunState(node, Started);
+	return true;
+}
+
+/* ----------------------------------------------------------------
  *		ExecInitHash
  *
  *		Init routine for Hash node
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index dbaabc4..ada9290 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -72,6 +72,10 @@ ExecHashJoin(HashJoinState *node)
 	uint32		hashvalue;
 	int			batchno;
 
+	/* Try to start asynchronously */
+	if (ExecNode_is_inited(node))
+		StartHashJoin(node);
+
 	SetNodeRunState(node, Running);
 
 	/*
@@ -435,6 +439,56 @@ ExecHashJoin(HashJoinState *node)
 }
 
 /* ----------------------------------------------------------------
+ *		StartHashJoin
+ *
+ * This function behaves a bit differently from the Start functions of other
+ * nodes, in order to follow the behavior of ExecHashJoin.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartHashJoin(HashJoinState *node)
+{
+	PlanState  *outer_ps = outerPlanState(node);
+	HashState  *hash_ps = (HashState *) innerPlanState(node);
+	bool		started;
+	bool		peek_outer_first;
+
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	/* The outer side is always tried. */
+	started = StartProcNode(outer_ps);
+
+	/*
+	 * Mirror the test made in the HJ_BUILD_HASHTABLE step of ExecHashJoin:
+	 * when it holds, the first outer tuple is needed to judge whether the
+	 * inner hash has to be built at all.
+	 */
+	peek_outer_first =
+		!HJ_FILL_INNER(node) &&
+		(HJ_FILL_OUTER(node) ||
+		 (outer_ps->plan->startup_cost < hash_ps->ps.plan->total_cost &&
+		  !node->hj_OuterNotEmpty));
+
+	/*
+	 * If the outer side is peeked first, the inner plan is not started
+	 * here. That decision depends on the outer startup and hash creation
+	 * costs, a balance asynchronous execution will disturb; the formula is
+	 * kept for now for lack of an appropriate alternative.
+	 *
+	 * Otherwise the hash will be created, so start the inner node too.
+	 */
+	if (!peek_outer_first)
+		started |= StartProcNode((PlanState *) hash_ps);
+
+	if (started)
+		SetNodeRunState(node, Started);
+
+	return started;
+}
+
+/* ----------------------------------------------------------------
  *		ExecInitHashJoin
  *
  *		Init routine for HashJoin node.
diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c
index e59d71f..21b2c37 100644
--- a/src/backend/executor/nodeLimit.c
+++ b/src/backend/executor/nodeLimit.c
@@ -243,6 +243,28 @@ ExecLimit(LimitState *node)
 	return slot;
 }
 
+/* ----------------------------------------------------------------
+ *		StartLimit
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartLimit(LimitState *node)
+{
+	/* Bail out unless ExecNode_is_inited() holds for this node. */
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	/* This node counts as started only if its outer plan was started. */
+	if (!StartProcNode(outerPlanState(node)))
+		return false;
+
+	SetNodeRunState(node, Started);
+	return true;
+}
+
 /*
  * Evaluate the limit/offset expressions --- done at startup or rescan.
  *
diff --git a/src/backend/executor/nodeLockRows.c b/src/backend/executor/nodeLockRows.c
index 2ccf05d..e741a93 100644
--- a/src/backend/executor/nodeLockRows.c
+++ b/src/backend/executor/nodeLockRows.c
@@ -347,6 +347,28 @@ lnext:
 }
 
 /* ----------------------------------------------------------------
+ *		StartLockRows
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartLockRows(LockRowsState *node)
+{
+	/* Bail out unless ExecNode_is_inited() holds for this node. */
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	/* This node counts as started only if its outer plan was started. */
+	if (!StartProcNode(outerPlanState(node)))
+		return false;
+
+	SetNodeRunState(node, Started);
+	return true;
+}
+
+/* ----------------------------------------------------------------
  *		ExecInitLockRows
  *
  *		This initializes the LockRows node state structures and
diff --git a/src/backend/executor/nodeMaterial.c b/src/backend/executor/nodeMaterial.c
index 981398a..4e41c1c 100644
--- a/src/backend/executor/nodeMaterial.c
+++ b/src/backend/executor/nodeMaterial.c
@@ -160,6 +160,28 @@ ExecMaterial(MaterialState *node)
 }
 
 /* ----------------------------------------------------------------
+ *		StartMaterial
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartMaterial(MaterialState *node)
+{
+	/* Bail out unless ExecNode_is_inited() holds for this node. */
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	/* This node counts as started only if its outer plan was started. */
+	if (!StartProcNode(outerPlanState(node)))
+		return false;
+
+	SetNodeRunState(node, Started);
+	return true;
+}
+
+/* ----------------------------------------------------------------
  *		ExecInitMaterial
  * ----------------------------------------------------------------
  */
@@ -330,6 +352,7 @@ ExecReScanMaterial(MaterialState *node)
 	SetNodeRunState(node, Inited);
 
 	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
+	SetNodeRunState(node, Inited);
 
 	if (node->eflags != 0)
 	{
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index 4678d7c..ab6c304 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -170,6 +170,10 @@ ExecMergeAppend(MergeAppendState *node)
 	TupleTableSlot *result;
 	SlotNumber	i;
 
+	/* start child nodes asynchronously if possible */
+	if (ExecNode_is_inited(node))
+		StartMergeAppend(node);
+	
 	SetNodeRunState(node, Running);
 
 	if (!node->ms_initialized)
@@ -220,6 +224,32 @@ ExecMergeAppend(MergeAppendState *node)
 	return result;
 }
 
+/* ----------------------------------------------------------------
+ *		StartMergeAppend
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartMergeAppend(MergeAppendState *node)
+{
+	int i;
+	bool async = false;
+
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	/* Try every child; "|=" keeps going after the first success. */
+	for (i = 0 ; i < node->ms_nplans ; i++)
+		async |= StartProcNode(node->mergeplans[i]);
+
+	if (async)
+		SetNodeRunState(node, Started);
+	return async;
+}
+
+
 /*
  * Compare the tuples in the two given slots.
  */
diff --git a/src/backend/executor/nodeMergejoin.c b/src/backend/executor/nodeMergejoin.c
index 74ceaa2..32bd8a5 100644
--- a/src/backend/executor/nodeMergejoin.c
+++ b/src/backend/executor/nodeMergejoin.c
@@ -630,6 +630,10 @@ ExecMergeJoin(MergeJoinState *node)
 	bool		doFillOuter;
 	bool		doFillInner;
 
+	/* Execute children asynchronously if possible */
+	if (ExecNode_is_inited(node))
+		StartMergeJoin(node);
+
 	SetNodeRunState(node, Running);
 
 	/*
@@ -1475,6 +1479,31 @@ ExecMergeJoin(MergeJoinState *node)
 }
 
 /* ----------------------------------------------------------------
+ *		StartMergeJoin
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartMergeJoin(MergeJoinState *node)
+{
+	bool		inner_started;
+	bool		outer_started;
+
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	/* Both inputs are tried unconditionally, inner side first. */
+	inner_started = StartProcNode(innerPlanState(node));
+	outer_started = StartProcNode(outerPlanState(node));
+	if (!inner_started && !outer_started)
+		return false;
+	SetNodeRunState(node, Started);
+	return true;
+}
+
+/* ----------------------------------------------------------------
  *		ExecInitMergeJoin
  * ----------------------------------------------------------------
  */
diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c
index ae69176..cb95a3d 100644
--- a/src/backend/executor/nodeNestloop.c
+++ b/src/backend/executor/nodeNestloop.c
@@ -69,6 +69,10 @@ ExecNestLoop(NestLoopState *node)
 	ExprContext *econtext;
 	ListCell   *lc;
 
+	/* Execute children asynchronously if possible */
+	if (ExecNode_is_inited(node))
+		StartNestLoop(node);
+
 	SetNodeRunState(node, Running);
 
 	/*
@@ -292,6 +296,36 @@ ExecNestLoop(NestLoopState *node)
 }
 
 /* ----------------------------------------------------------------
+ *		StartNestLoop
+ *
+ * The inner plan of nest loop won't be executed asynchronously if it is
+ * parameterized.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartNestLoop(NestLoopState *node)
+{
+	NestLoop   *nl = (NestLoop *) node->js.ps.plan;
+	bool async;
+
+	/* Nothing is started here, so report false like the other nodes. */
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	/* Always try async execution of the outer plan */
+	async = StartProcNode(outerPlanState(node));
+
+	/* The inner node cannot be asynchronous if it is parameterized */
+	if (list_length(nl->nestParams) < 1)
+		async |= StartProcNode(innerPlanState(node));
+
+	if (async)
+		SetNodeRunState(node, Started);
+	return async;
+}
+
+/* ----------------------------------------------------------------
  *		ExecInitNestLoop
  * ----------------------------------------------------------------
  */
diff --git a/src/backend/executor/nodeResult.c b/src/backend/executor/nodeResult.c
index ec81eda..c33443a 100644
--- a/src/backend/executor/nodeResult.c
+++ b/src/backend/executor/nodeResult.c
@@ -171,6 +171,30 @@ ExecResult(ResultState *node)
 }
 
 /* ----------------------------------------------------------------
+ *		StartResult
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartResult(ResultState * node)
+{
+	PlanState  *child = outerPlanState(node);
+
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	/* Nothing to report when there is no outer plan or it was not started. */
+	if (child == NULL || !StartProcNode(child))
+		return false;
+
+	/* The outer plan was started, so mark this node as started too. */
+	SetNodeRunState(node, Started);
+	return true;
+}
+
+/* ----------------------------------------------------------------
  *		ExecResultMarkPos
  * ----------------------------------------------------------------
  */
diff --git a/src/backend/executor/nodeSetOp.c b/src/backend/executor/nodeSetOp.c
index c248ff3..a0dbec6 100644
--- a/src/backend/executor/nodeSetOp.c
+++ b/src/backend/executor/nodeSetOp.c
@@ -225,6 +225,28 @@ ExecSetOp(SetOpState *node)
 		return setop_retrieve_direct(node);
 }
 
+/* ----------------------------------------------------------------
+ *		StartSetOp
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartSetOp(SetOpState *node)
+{
+	/* Bail out unless ExecNode_is_inited() holds for this node. */
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	/* This node counts as started only if its outer plan was started. */
+	if (!StartProcNode(outerPlanState(node)))
+		return false;
+
+	SetNodeRunState(node, Started);
+	return true;
+}
+
 /*
  * ExecSetOp for non-hashed case
  */
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index a2abec7..f0c9a63 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -148,6 +148,28 @@ ExecSort(SortState *node)
 }
 
 /* ----------------------------------------------------------------
+ *		StartSort
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartSort(SortState *node)
+{
+	/* Bail out unless ExecNode_is_inited() holds for this node. */
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	/* This node counts as started only if its outer plan was started. */
+	if (!StartProcNode(outerPlanState(node)))
+		return false;
+
+	SetNodeRunState(node, Started);
+	return true;
+}
+
+/* ----------------------------------------------------------------
  *		ExecInitSort
  *
  *		Creates the run-time state information for the sort node
diff --git a/src/backend/executor/nodeSubqueryscan.c b/src/backend/executor/nodeSubqueryscan.c
index d8799d1..4899d93 100644
--- a/src/backend/executor/nodeSubqueryscan.c
+++ b/src/backend/executor/nodeSubqueryscan.c
@@ -104,6 +104,28 @@ ExecSubqueryScan(SubqueryScanState *node)
 }
 
 /* ----------------------------------------------------------------
+ *		StartSubqueryScan
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartSubqueryScan(SubqueryScanState *node)
+{
+	/* Like the other Start functions, proceed only from the inited state. */
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	/* This node counts as started only if its subplan was started. */
+	if (!StartProcNode(node->subplan))
+		return false;
+
+	SetNodeRunState(node, Started);
+	return true;
+}
+
+/* ----------------------------------------------------------------
  *		ExecInitSubqueryScan
  * ----------------------------------------------------------------
  */
diff --git a/src/backend/executor/nodeUnique.c b/src/backend/executor/nodeUnique.c
index 1f7ca10..53da967 100644
--- a/src/backend/executor/nodeUnique.c
+++ b/src/backend/executor/nodeUnique.c
@@ -105,6 +105,28 @@ ExecUnique(UniqueState *node)
 }
 
 /* ----------------------------------------------------------------
+ *		StartUnique
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * ----------------------------------------------------------------
+ */
+bool
+StartUnique(UniqueState *node)
+{
+	/* Bail out unless ExecNode_is_inited() holds for this node. */
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	/* This node counts as started only if its outer plan was started. */
+	if (!StartProcNode(outerPlanState(node)))
+		return false;
+
+	SetNodeRunState(node, Started);
+	return true;
+}
+
+/* ----------------------------------------------------------------
  *		ExecInitUnique
  *
  *		This initializes the unique node state structures and
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index 0d127b4..6327d55 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -1760,6 +1760,28 @@ restart:
 }
 
 /* -----------------
+ * StartWindowAgg
+ *
+ * Try to start asynchronously.
+ * Returns true if any of underlying nodes started asynchronously
+ * -----------------
+ */
+bool
+StartWindowAgg(WindowAggState *node)
+{
+	/* Bail out unless ExecNode_is_inited() holds for this node. */
+	if (!ExecNode_is_inited(node))
+		return false;
+
+	/* This node counts as started only if its outer plan was started. */
+	if (!StartProcNode(outerPlanState(node)))
+		return false;
+
+	SetNodeRunState(node, Started);
+	return true;
+}
+
+/* -----------------
  * ExecInitWindowAgg
  *
  *	Creates the run-time information for the WindowAgg node produced by the
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 4f77692..230c9af 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -223,6 +223,7 @@ extern void EvalPlanQualEnd(EPQState *epqstate);
  */
 extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecProcNode(PlanState *node);
+extern bool StartProcNode(PlanState *node);
 extern Node *MultiExecProcNode(PlanState *node);
 extern void ExecEndNode(PlanState *node);
 extern bool ExecShutdownNode(PlanState *node);
diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h
index fe3b81a..7fb0a6f 100644
--- a/src/include/executor/nodeAgg.h
+++ b/src/include/executor/nodeAgg.h
@@ -18,6 +18,7 @@
 
 extern AggState *ExecInitAgg(Agg *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecAgg(AggState *node);
+extern bool StartAgg(AggState *node);
 extern void ExecEndAgg(AggState *node);
 extern void ExecReScanAgg(AggState *node);
 
diff --git a/src/include/executor/nodeAppend.h b/src/include/executor/nodeAppend.h
index f2d920b..d77b70e 100644
--- a/src/include/executor/nodeAppend.h
+++ b/src/include/executor/nodeAppend.h
@@ -18,6 +18,7 @@
 
 extern AppendState *ExecInitAppend(Append *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecAppend(AppendState *node);
+extern bool StartAppend(AppendState *node);
 extern void ExecEndAppend(AppendState *node);
 extern void ExecReScanAppend(AppendState *node);
 
diff --git a/src/include/executor/nodeCtescan.h b/src/include/executor/nodeCtescan.h
index 369dafa..e418786 100644
--- a/src/include/executor/nodeCtescan.h
+++ b/src/include/executor/nodeCtescan.h
@@ -18,6 +18,7 @@
 
 extern CteScanState *ExecInitCteScan(CteScan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecCteScan(CteScanState *node);
+extern bool StartCteScan(CteScanState *node);
 extern void ExecEndCteScan(CteScanState *node);
 extern void ExecReScanCteScan(CteScanState *node);
 
diff --git a/src/include/executor/nodeGather.h b/src/include/executor/nodeGather.h
index 9e5d8fc..e7cbe21 100644
--- a/src/include/executor/nodeGather.h
+++ b/src/include/executor/nodeGather.h
@@ -17,6 +17,7 @@
 #include "nodes/execnodes.h"
 
 extern GatherState *ExecInitGather(Gather *node, EState *estate, int eflags);
+extern bool StartGather(GatherState *node);
 extern TupleTableSlot *ExecGather(GatherState *node);
 extern void ExecEndGather(GatherState *node);
 extern void ExecShutdownGather(GatherState *node);
diff --git a/src/include/executor/nodeGroup.h b/src/include/executor/nodeGroup.h
index 3485fe8..bfc75cd 100644
--- a/src/include/executor/nodeGroup.h
+++ b/src/include/executor/nodeGroup.h
@@ -18,6 +18,7 @@
 
 extern GroupState *ExecInitGroup(Group *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecGroup(GroupState *node);
+extern bool StartGroup(GroupState *node);
 extern void ExecEndGroup(GroupState *node);
 extern void ExecReScanGroup(GroupState *node);
 
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index acc28438..b0855d3 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -19,6 +19,7 @@
 extern HashState *ExecInitHash(Hash *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecHash(HashState *node);
 extern Node *MultiExecHash(HashState *node);
+extern bool StartHash(HashState *node);
 extern void ExecEndHash(HashState *node);
 extern void ExecReScanHash(HashState *node);
 
diff --git a/src/include/executor/nodeHashjoin.h b/src/include/executor/nodeHashjoin.h
index c35a51c..826f639 100644
--- a/src/include/executor/nodeHashjoin.h
+++ b/src/include/executor/nodeHashjoin.h
@@ -19,6 +19,7 @@
 
 extern HashJoinState *ExecInitHashJoin(HashJoin *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecHashJoin(HashJoinState *node);
+extern bool StartHashJoin(HashJoinState *node);
 extern void ExecEndHashJoin(HashJoinState *node);
 extern void ExecReScanHashJoin(HashJoinState *node);
 
diff --git a/src/include/executor/nodeLimit.h b/src/include/executor/nodeLimit.h
index 44f2936..5e8d2ea 100644
--- a/src/include/executor/nodeLimit.h
+++ b/src/include/executor/nodeLimit.h
@@ -18,6 +18,7 @@
 
 extern LimitState *ExecInitLimit(Limit *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecLimit(LimitState *node);
+extern bool StartLimit(LimitState *node);
 extern void ExecEndLimit(LimitState *node);
 extern void ExecReScanLimit(LimitState *node);
 
diff --git a/src/include/executor/nodeLockRows.h b/src/include/executor/nodeLockRows.h
index 41764a1..c450233 100644
--- a/src/include/executor/nodeLockRows.h
+++ b/src/include/executor/nodeLockRows.h
@@ -18,6 +18,7 @@
 
 extern LockRowsState *ExecInitLockRows(LockRows *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecLockRows(LockRowsState *node);
+extern bool StartLockRows(LockRowsState *node);
 extern void ExecEndLockRows(LockRowsState *node);
 extern void ExecReScanLockRows(LockRowsState *node);
 
diff --git a/src/include/executor/nodeMaterial.h b/src/include/executor/nodeMaterial.h
index cfb7a13..0392d29 100644
--- a/src/include/executor/nodeMaterial.h
+++ b/src/include/executor/nodeMaterial.h
@@ -18,6 +18,7 @@
 
 extern MaterialState *ExecInitMaterial(Material *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecMaterial(MaterialState *node);
+extern bool StartMaterial(MaterialState *node);
 extern void ExecEndMaterial(MaterialState *node);
 extern void ExecMaterialMarkPos(MaterialState *node);
 extern void ExecMaterialRestrPos(MaterialState *node);
diff --git a/src/include/executor/nodeMergeAppend.h b/src/include/executor/nodeMergeAppend.h
index 3c5068c..2f637dc 100644
--- a/src/include/executor/nodeMergeAppend.h
+++ b/src/include/executor/nodeMergeAppend.h
@@ -18,6 +18,7 @@
 
 extern MergeAppendState *ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecMergeAppend(MergeAppendState *node);
+extern bool StartMergeAppend(MergeAppendState *node);
 extern void ExecEndMergeAppend(MergeAppendState *node);
 extern void ExecReScanMergeAppend(MergeAppendState *node);
 
diff --git a/src/include/executor/nodeMergejoin.h b/src/include/executor/nodeMergejoin.h
index bee5367..ead6898 100644
--- a/src/include/executor/nodeMergejoin.h
+++ b/src/include/executor/nodeMergejoin.h
@@ -18,6 +18,7 @@
 
 extern MergeJoinState *ExecInitMergeJoin(MergeJoin *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecMergeJoin(MergeJoinState *node);
+extern bool StartMergeJoin(MergeJoinState *node);
 extern void ExecEndMergeJoin(MergeJoinState *node);
 extern void ExecReScanMergeJoin(MergeJoinState *node);
 
diff --git a/src/include/executor/nodeNestloop.h b/src/include/executor/nodeNestloop.h
index ff0720f..f79a002 100644
--- a/src/include/executor/nodeNestloop.h
+++ b/src/include/executor/nodeNestloop.h
@@ -18,6 +18,7 @@
 
 extern NestLoopState *ExecInitNestLoop(NestLoop *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecNestLoop(NestLoopState *node);
+extern bool StartNestLoop(NestLoopState *node);
 extern void ExecEndNestLoop(NestLoopState *node);
 extern void ExecReScanNestLoop(NestLoopState *node);
 
diff --git a/src/include/executor/nodeResult.h b/src/include/executor/nodeResult.h
index 17a7bb6..84b375d 100644
--- a/src/include/executor/nodeResult.h
+++ b/src/include/executor/nodeResult.h
@@ -18,6 +18,7 @@
 
 extern ResultState *ExecInitResult(Result *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecResult(ResultState *node);
+extern bool StartResult(ResultState *node);
 extern void ExecEndResult(ResultState *node);
 extern void ExecResultMarkPos(ResultState *node);
 extern void ExecResultRestrPos(ResultState *node);
diff --git a/src/include/executor/nodeSetOp.h b/src/include/executor/nodeSetOp.h
index ed6c96a..f960dda 100644
--- a/src/include/executor/nodeSetOp.h
+++ b/src/include/executor/nodeSetOp.h
@@ -18,6 +18,7 @@
 
 extern SetOpState *ExecInitSetOp(SetOp *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecSetOp(SetOpState *node);
+extern bool StartSetOp(SetOpState *node);
 extern void ExecEndSetOp(SetOpState *node);
 extern void ExecReScanSetOp(SetOpState *node);
 
diff --git a/src/include/executor/nodeSort.h b/src/include/executor/nodeSort.h
index 20d909b..0c6d12d 100644
--- a/src/include/executor/nodeSort.h
+++ b/src/include/executor/nodeSort.h
@@ -18,6 +18,7 @@
 
 extern SortState *ExecInitSort(Sort *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecSort(SortState *node);
+extern bool StartSort(SortState *node);
 extern void ExecEndSort(SortState *node);
 extern void ExecSortMarkPos(SortState *node);
 extern void ExecSortRestrPos(SortState *node);
diff --git a/src/include/executor/nodeSubqueryscan.h b/src/include/executor/nodeSubqueryscan.h
index 56e3aec..0301edd 100644
--- a/src/include/executor/nodeSubqueryscan.h
+++ b/src/include/executor/nodeSubqueryscan.h
@@ -18,6 +18,7 @@
 
 extern SubqueryScanState *ExecInitSubqueryScan(SubqueryScan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecSubqueryScan(SubqueryScanState *node);
+extern bool StartSubqueryScan(SubqueryScanState *node);
 extern void ExecEndSubqueryScan(SubqueryScanState *node);
 extern void ExecReScanSubqueryScan(SubqueryScanState *node);
 
diff --git a/src/include/executor/nodeUnique.h b/src/include/executor/nodeUnique.h
index ec2df59..76727aa 100644
--- a/src/include/executor/nodeUnique.h
+++ b/src/include/executor/nodeUnique.h
@@ -18,6 +18,7 @@
 
 extern UniqueState *ExecInitUnique(Unique *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecUnique(UniqueState *node);
+extern bool StartUnique(UniqueState *node);
 extern void ExecEndUnique(UniqueState *node);
 extern void ExecReScanUnique(UniqueState *node);
 
diff --git a/src/include/executor/nodeWindowAgg.h b/src/include/executor/nodeWindowAgg.h
index 8a7b1fa..e9699b0 100644
--- a/src/include/executor/nodeWindowAgg.h
+++ b/src/include/executor/nodeWindowAgg.h
@@ -18,6 +18,7 @@
 
 extern WindowAggState *ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecWindowAgg(WindowAggState *node);
+extern bool StartWindowAgg(WindowAggState *node);
 extern void ExecEndWindowAgg(WindowAggState *node);
 extern void ExecReScanWindowAgg(WindowAggState *node);
 
-- 
1.8.3.1

