From 65fc69bb64f4596aff0c37a2d090ddf1609120bb Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Tue, 15 May 2018 20:21:32 +0900
Subject: [PATCH v1 2/3] infrastructure for asynchronous execution

This patch add an infrastructure for asynchronous execution. As a PoC
this makes only Append capable to handle asynchronously executable
subnodes.
---
 src/backend/commands/explain.c          |  17 ++
 src/backend/executor/Makefile           |   1 +
 src/backend/executor/execAsync.c        | 145 ++++++++++++
 src/backend/executor/nodeAppend.c       | 286 +++++++++++++++++++++---
 src/backend/executor/nodeForeignscan.c  |  22 +-
 src/backend/nodes/bitmapset.c           |  72 ++++++
 src/backend/nodes/copyfuncs.c           |   2 +
 src/backend/nodes/outfuncs.c            |   2 +
 src/backend/nodes/readfuncs.c           |   2 +
 src/backend/optimizer/plan/createplan.c |  83 +++++++
 src/backend/postmaster/pgstat.c         |   3 +
 src/backend/postmaster/syslogger.c      |   2 +-
 src/backend/utils/adt/ruleutils.c       |   8 +-
 src/include/executor/execAsync.h        |  23 ++
 src/include/executor/executor.h         |   1 +
 src/include/executor/nodeForeignscan.h  |   3 +
 src/include/foreign/fdwapi.h            |  11 +
 src/include/nodes/bitmapset.h           |   1 +
 src/include/nodes/execnodes.h           |  20 +-
 src/include/nodes/plannodes.h           |   9 +
 src/include/pgstat.h                    |   3 +-
 21 files changed, 676 insertions(+), 40 deletions(-)
 create mode 100644 src/backend/executor/execAsync.c
 create mode 100644 src/include/executor/execAsync.h

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 62fb3434a3..9f06e1fbdc 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -82,6 +82,7 @@ static void show_sort_keys(SortState *sortstate, List *ancestors,
 						   ExplainState *es);
 static void show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
 								   ExplainState *es);
+static void show_append_info(AppendState *astate, ExplainState *es);
 static void show_agg_keys(AggState *astate, List *ancestors,
 						  ExplainState *es);
 static void show_grouping_sets(PlanState *planstate, Agg *agg,
@@ -1319,6 +1320,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
 		}
 		if (plan->parallel_aware)
 			appendStringInfoString(es->str, "Parallel ");
+		if (plan->async_capable)
+			appendStringInfoString(es->str, "Async ");
 		appendStringInfoString(es->str, pname);
 		es->indent++;
 	}
@@ -1860,6 +1863,11 @@ ExplainNode(PlanState *planstate, List *ancestors,
 		case T_Hash:
 			show_hash_info(castNode(HashState, planstate), es);
 			break;
+
+		case T_Append:
+			show_append_info(castNode(AppendState, planstate), es);
+			break;
+
 		default:
 			break;
 	}
@@ -2197,6 +2205,15 @@ show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
 						 ancestors, es);
 }
 
+static void
+show_append_info(AppendState *astate, ExplainState *es)
+{
+	Append *plan = (Append *) astate->ps.plan;
+
+	if (plan->nasyncplans > 0)
+		ExplainPropertyInteger("Async subplans", "", plan->nasyncplans, es);
+}
+
 /*
  * Show the grouping keys for an Agg node.
  */
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index a983800e4b..8a2d6e9961 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = \
 	execAmi.o \
+	execAsync.o \
 	execCurrent.o \
 	execExpr.o \
 	execExprInterp.o \
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
new file mode 100644
index 0000000000..db477e2cf6
--- /dev/null
+++ b/src/backend/executor/execAsync.c
@@ -0,0 +1,145 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ *	  Support routines for asynchronous execution.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/nodeAppend.h"
+#include "executor/nodeForeignscan.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+
+void ExecAsyncSetState(PlanState *pstate, AsyncState status)
+{
+	pstate->asyncstate = status;
+}
+
+bool
+ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node,
+					   void *data, bool reinit)
+{
+	switch (nodeTag(node))
+	{
+	case T_ForeignScanState:
+		return ExecForeignAsyncConfigureWait((ForeignScanState *)node,
+											 wes, data, reinit);
+		break;
+	default:
+			elog(ERROR, "unrecognized node type: %d",
+				(int) nodeTag(node));
+	}
+}
+
+/*
+ * struct for memory context callback argument used in ExecAsyncEventWait
+ */
+typedef struct {
+	int **p_refind;
+	int *p_refindsize;
+} ExecAsync_mcbarg;
+
+/*
+ * callback function to reset static variables pointing to the memory in
+ * TopTransactionContext in ExecAsyncEventWait.
+ */
+static void ExecAsyncMemoryContextCallback(void *arg)
+{
+	/* arg is the address of the variable refind in ExecAsyncEventWait */
+	ExecAsync_mcbarg *mcbarg = (ExecAsync_mcbarg *) arg;
+	*mcbarg->p_refind = NULL;
+	*mcbarg->p_refindsize = 0;
+}
+
+#define EVENT_BUFFER_SIZE 16
+
+Bitmapset *
+ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes, long timeout)
+{
+	static int *refind = NULL;
+	static int refindsize = 0;
+	WaitEventSet *wes;
+	WaitEvent   occurred_event[EVENT_BUFFER_SIZE];
+	int noccurred = 0;
+	Bitmapset *fired_events = NULL;
+	int i;
+	int n;
+
+	n = bms_num_members(waitnodes);
+	wes = CreateWaitEventSet(TopTransactionContext,
+							 TopTransactionResourceOwner, n);
+	if (refindsize < n)
+	{
+		if (refindsize == 0)
+			refindsize = EVENT_BUFFER_SIZE; /* XXX */
+		while (refindsize < n)
+			refindsize *= 2;
+		if (refind)
+			refind = (int *) repalloc(refind, refindsize * sizeof(int));
+		else
+		{
+			static ExecAsync_mcbarg mcb_arg =
+				{ &refind, &refindsize };
+			static MemoryContextCallback mcb =
+				{ ExecAsyncMemoryContextCallback, (void *)&mcb_arg, NULL };
+			MemoryContext oldctxt =
+				MemoryContextSwitchTo(TopTransactionContext);
+
+			/*
+			 * refind points to a memory block in
+			 * TopTransactionContext. Register a callback to reset it.
+			 */
+			MemoryContextRegisterResetCallback(TopTransactionContext, &mcb);
+			refind = (int *) palloc(refindsize * sizeof(int));
+			MemoryContextSwitchTo(oldctxt);
+		}
+	}
+
+	n = 0;
+	for (i = bms_next_member(waitnodes, -1) ; i >= 0 ;
+		 i = bms_next_member(waitnodes, i))
+	{
+		refind[i] = i;
+		if (ExecAsyncConfigureWait(wes, nodes[i], refind + i, true))
+			n++;
+	}
+
+	if (n == 0)
+	{
+		FreeWaitEventSet(wes);
+		return NULL;
+	}
+
+	noccurred = WaitEventSetWait(wes, timeout, occurred_event,
+								 EVENT_BUFFER_SIZE,
+								 WAIT_EVENT_ASYNC_WAIT);
+	FreeWaitEventSet(wes);
+	if (noccurred == 0)
+		return NULL;
+
+	for (i = 0 ; i < noccurred ; i++)
+	{
+		WaitEvent *w = &occurred_event[i];
+
+		if ((w->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) != 0)
+		{
+			int n = *(int*)w->user_data;
+
+			fired_events = bms_add_member(fired_events, n);
+		}
+	}
+
+	return fired_events;
+}
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 5ff986ac7d..03dec4d648 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -60,6 +60,7 @@
 #include "executor/execdebug.h"
 #include "executor/execPartition.h"
 #include "executor/nodeAppend.h"
+#include "executor/execAsync.h"
 #include "miscadmin.h"
 
 /* Shared state for parallel-aware Append. */
@@ -81,6 +82,7 @@ struct ParallelAppendState
 #define NO_MATCHING_SUBPLANS		-2
 
 static TupleTableSlot *ExecAppend(PlanState *pstate);
+static TupleTableSlot *ExecAppendAsync(PlanState *pstate);
 static bool choose_next_subplan_locally(AppendState *node);
 static bool choose_next_subplan_for_leader(AppendState *node);
 static bool choose_next_subplan_for_worker(AppendState *node);
@@ -104,22 +106,28 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	PlanState **appendplanstates;
 	Bitmapset  *validsubplans;
 	int			nplans;
+	int			nasyncplans;
 	int			firstvalid;
 	int			i,
 				j;
 
 	/* check for unsupported flags */
-	Assert(!(eflags & EXEC_FLAG_MARK));
+	Assert(!(eflags & (EXEC_FLAG_MARK | EXEC_FLAG_ASYNC)));
 
 	/*
 	 * create new AppendState for our append node
 	 */
 	appendstate->ps.plan = (Plan *) node;
 	appendstate->ps.state = estate;
-	appendstate->ps.ExecProcNode = ExecAppend;
+
+	/* choose appropriate version of Exec function */
+	if (node->nasyncplans == 0)
+		appendstate->ps.ExecProcNode = ExecAppend;
+	else
+		appendstate->ps.ExecProcNode = ExecAppendAsync;
 
 	/* Let choose_next_subplan_* function handle setting the first subplan */
-	appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
+	appendstate->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
 
 	/* If run-time partition pruning is enabled, then set that up now */
 	if (node->part_prune_info != NULL)
@@ -152,7 +160,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 			 */
 			if (bms_is_empty(validsubplans))
 			{
-				appendstate->as_whichplan = NO_MATCHING_SUBPLANS;
+				appendstate->as_whichsyncplan = NO_MATCHING_SUBPLANS;
 
 				/* Mark the first as valid so that it's initialized below */
 				validsubplans = bms_make_singleton(0);
@@ -212,10 +220,20 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	 */
 	j = 0;
 	firstvalid = nplans;
+	nasyncplans = 0;
+
 	i = -1;
 	while ((i = bms_next_member(validsubplans, i)) >= 0)
 	{
 		Plan	   *initNode = (Plan *) list_nth(node->appendplans, i);
+		int			sub_eflags = eflags;
+
+		/* Let async-capable subplans run asynchronously */
+		if (i < node->nasyncplans)
+		{
+			sub_eflags |= EXEC_FLAG_ASYNC;
+			nasyncplans++;
+		}
 
 		/*
 		 * Record the lowest appendplans index which is a valid partial plan.
@@ -223,13 +241,28 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 		if (i >= node->first_partial_plan && j < firstvalid)
 			firstvalid = j;
 
-		appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
+		appendplanstates[j++] = ExecInitNode(initNode, estate, sub_eflags);
 	}
 
 	appendstate->as_first_partial_plan = firstvalid;
 	appendstate->appendplans = appendplanstates;
 	appendstate->as_nplans = nplans;
 
+	/* fill in async stuff */
+	appendstate->as_nasyncplans = nasyncplans;
+	appendstate->as_syncdone = (nasyncplans == nplans);
+
+	if (appendstate->as_nasyncplans)
+	{
+		appendstate->as_asyncresult = (TupleTableSlot **)
+			palloc0(node->nasyncplans * sizeof(TupleTableSlot *));
+
+		/* initially, all async requests need a request */
+		for (i = 0; i < appendstate->as_nasyncplans; ++i)
+			appendstate->as_needrequest =
+				bms_add_member(appendstate->as_needrequest, i);
+	}
+
 	/*
 	 * Miscellaneous initialization
 	 */
@@ -253,21 +286,23 @@ ExecAppend(PlanState *pstate)
 {
 	AppendState *node = castNode(AppendState, pstate);
 
-	if (node->as_whichplan < 0)
+	if (node->as_whichsyncplan < 0)
 	{
 		/*
 		 * If no subplan has been chosen, we must choose one before
 		 * proceeding.
 		 */
-		if (node->as_whichplan == INVALID_SUBPLAN_INDEX &&
+		if (node->as_whichsyncplan == INVALID_SUBPLAN_INDEX &&
 			!node->choose_next_subplan(node))
 			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
 
 		/* Nothing to do if there are no matching subplans */
-		else if (node->as_whichplan == NO_MATCHING_SUBPLANS)
+		else if (node->as_whichsyncplan == NO_MATCHING_SUBPLANS)
 			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
 	}
 
+	Assert(node->as_nasyncplans == 0);
+
 	for (;;)
 	{
 		PlanState  *subnode;
@@ -278,8 +313,9 @@ ExecAppend(PlanState *pstate)
 		/*
 		 * figure out which subplan we are currently processing
 		 */
-		Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
-		subnode = node->appendplans[node->as_whichplan];
+		Assert(node->as_whichsyncplan >= 0 &&
+			   node->as_whichsyncplan < node->as_nplans);
+		subnode = node->appendplans[node->as_whichsyncplan];
 
 		/*
 		 * get a tuple from the subplan
@@ -302,6 +338,175 @@ ExecAppend(PlanState *pstate)
 	}
 }
 
+static TupleTableSlot *
+ExecAppendAsync(PlanState *pstate)
+{
+	AppendState *node = castNode(AppendState, pstate);
+	Bitmapset *needrequest;
+	int	i;
+
+	Assert(node->as_nasyncplans > 0);
+
+restart:
+	if (node->as_nasyncresult > 0)
+	{
+		--node->as_nasyncresult;
+		return node->as_asyncresult[node->as_nasyncresult];
+	}
+
+	needrequest = node->as_needrequest;
+	node->as_needrequest = NULL;
+	while ((i = bms_first_member(needrequest)) >= 0)
+	{
+		TupleTableSlot *slot;
+		PlanState *subnode = node->appendplans[i];
+
+		slot = ExecProcNode(subnode);
+		if (subnode->asyncstate == AS_AVAILABLE)
+		{
+			if (!TupIsNull(slot))
+			{
+				node->as_asyncresult[node->as_nasyncresult++] = slot;
+				node->as_needrequest = bms_add_member(node->as_needrequest, i);
+			}
+		}
+		else
+			node->as_pending_async = bms_add_member(node->as_pending_async, i);
+	}
+	bms_free(needrequest);
+
+	for (;;)
+	{
+		TupleTableSlot *result;
+
+		/* return now if a result is available */
+		if (node->as_nasyncresult > 0)
+		{
+			--node->as_nasyncresult;
+			return node->as_asyncresult[node->as_nasyncresult];
+		}
+
+		while (!bms_is_empty(node->as_pending_async))
+		{
+			long timeout = node->as_syncdone ? -1 : 0;
+			Bitmapset *fired;
+			int i;
+
+			fired = ExecAsyncEventWait(node->appendplans,
+									   node->as_pending_async,
+									   timeout);
+
+			if (bms_is_empty(fired) && node->as_syncdone)
+			{
+				/*
+				 * No subplan fired. This happens when even in normal
+				 * operation where the subnode already prepared results before
+				 * waiting. as_pending_result is storing stale information so
+				 * restart from the beginning.
+				 */
+				node->as_needrequest = node->as_pending_async;
+				node->as_pending_async = NULL;
+				goto restart;
+			}
+
+			while ((i = bms_first_member(fired)) >= 0)
+			{
+				TupleTableSlot *slot;
+				PlanState *subnode = node->appendplans[i];
+				slot = ExecProcNode(subnode);
+				if (subnode->asyncstate == AS_AVAILABLE)
+				{
+					if (!TupIsNull(slot))
+					{
+						node->as_asyncresult[node->as_nasyncresult++] = slot;
+						node->as_needrequest =
+							bms_add_member(node->as_needrequest, i);
+					}
+					node->as_pending_async =
+						bms_del_member(node->as_pending_async, i);
+				}
+			}
+			bms_free(fired);
+
+			/* return now if a result is available */
+			if (node->as_nasyncresult > 0)
+			{
+				--node->as_nasyncresult;
+				return node->as_asyncresult[node->as_nasyncresult];
+			}
+
+			if (!node->as_syncdone)
+				break;
+		}
+
+		/*
+		 * If there is no asynchronous activity still pending and the
+		 * synchronous activity is also complete, we're totally done scanning
+		 * this node.  Otherwise, we're done with the asynchronous stuff but
+		 * must continue scanning the synchronous children.
+		 */
+		if (node->as_syncdone)
+		{
+			Assert(bms_is_empty(node->as_pending_async));
+			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+		}
+
+		/*
+		 * get a tuple from the subplan
+		 */
+		
+		if (node->as_whichsyncplan < 0)
+		{
+			/*
+			 * If no subplan has been chosen, we must choose one before
+			 * proceeding.
+			 */
+			if (node->as_whichsyncplan == INVALID_SUBPLAN_INDEX &&
+				!node->choose_next_subplan(node))
+			{
+				node->as_syncdone = true;
+				return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+			}
+
+			/* Nothing to do if there are no matching subplans */
+			else if (node->as_whichsyncplan == NO_MATCHING_SUBPLANS)
+			{
+				node->as_syncdone = true;
+				return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+			}
+		}
+
+		result = ExecProcNode(node->appendplans[node->as_whichsyncplan]);
+
+		if (!TupIsNull(result))
+		{
+			/*
+			 * If the subplan gave us something then return it as-is. We do
+			 * NOT make use of the result slot that was set up in
+			 * ExecInitAppend; there's no need for it.
+			 */
+			return result;
+		}
+
+		/*
+		 * Go on to the "next" subplan. If no more subplans, return the empty
+		 * slot set up for us by ExecInitAppend, unless there are async plans
+		 * we have yet to finish.
+		 */
+		if (!node->choose_next_subplan(node))
+		{
+			node->as_syncdone = true;
+			if (bms_is_empty(node->as_pending_async))
+			{
+				Assert(bms_is_empty(node->as_needrequest));
+				return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+			}
+		}
+
+		/* Else loop back and try to get a tuple from the new subplan */
+	}
+}
+
 /* ----------------------------------------------------------------
  *		ExecEndAppend
  *
@@ -348,6 +553,15 @@ ExecReScanAppend(AppendState *node)
 		node->as_valid_subplans = NULL;
 	}
 
+	/* Reset async state. */
+	for (i = 0; i < node->as_nasyncplans; ++i)
+	{
+		ExecShutdownNode(node->appendplans[i]);
+		node->as_needrequest = bms_add_member(node->as_needrequest, i);
+	}
+	node->as_nasyncresult = 0;
+	node->as_syncdone = (node->as_nasyncplans == node->as_nplans);
+
 	for (i = 0; i < node->as_nplans; i++)
 	{
 		PlanState  *subnode = node->appendplans[i];
@@ -368,7 +582,7 @@ ExecReScanAppend(AppendState *node)
 	}
 
 	/* Let choose_next_subplan_* function handle setting the first subplan */
-	node->as_whichplan = INVALID_SUBPLAN_INDEX;
+	node->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
 }
 
 /* ----------------------------------------------------------------
@@ -456,7 +670,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
 static bool
 choose_next_subplan_locally(AppendState *node)
 {
-	int			whichplan = node->as_whichplan;
+	int			whichplan = node->as_whichsyncplan;
 	int			nextplan;
 
 	/* We should never be called when there are no subplans */
@@ -475,6 +689,10 @@ choose_next_subplan_locally(AppendState *node)
 			node->as_valid_subplans =
 				ExecFindMatchingSubPlans(node->as_prune_state);
 
+		/* Exclude async plans */
+		if (node->as_nasyncplans > 0)
+			bms_del_range(node->as_valid_subplans, 0, node->as_nasyncplans - 1);
+
 		whichplan = -1;
 	}
 
@@ -489,7 +707,7 @@ choose_next_subplan_locally(AppendState *node)
 	if (nextplan < 0)
 		return false;
 
-	node->as_whichplan = nextplan;
+	node->as_whichsyncplan = nextplan;
 
 	return true;
 }
@@ -511,19 +729,19 @@ choose_next_subplan_for_leader(AppendState *node)
 	Assert(ScanDirectionIsForward(node->ps.state->es_direction));
 
 	/* We should never be called when there are no subplans */
-	Assert(node->as_whichplan != NO_MATCHING_SUBPLANS);
+	Assert(node->as_whichsyncplan != NO_MATCHING_SUBPLANS);
 
 	LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
 
-	if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
+	if (node->as_whichsyncplan != INVALID_SUBPLAN_INDEX)
 	{
 		/* Mark just-completed subplan as finished. */
-		node->as_pstate->pa_finished[node->as_whichplan] = true;
+		node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
 	}
 	else
 	{
 		/* Start with last subplan. */
-		node->as_whichplan = node->as_nplans - 1;
+		node->as_whichsyncplan = node->as_nplans - 1;
 
 		/*
 		 * If we've yet to determine the valid subplans then do so now.  If
@@ -544,12 +762,12 @@ choose_next_subplan_for_leader(AppendState *node)
 	}
 
 	/* Loop until we find a subplan to execute. */
-	while (pstate->pa_finished[node->as_whichplan])
+	while (pstate->pa_finished[node->as_whichsyncplan])
 	{
-		if (node->as_whichplan == 0)
+		if (node->as_whichsyncplan == 0)
 		{
 			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
-			node->as_whichplan = INVALID_SUBPLAN_INDEX;
+			node->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
 			LWLockRelease(&pstate->pa_lock);
 			return false;
 		}
@@ -558,12 +776,12 @@ choose_next_subplan_for_leader(AppendState *node)
 		 * We needn't pay attention to as_valid_subplans here as all invalid
 		 * plans have been marked as finished.
 		 */
-		node->as_whichplan--;
+		node->as_whichsyncplan--;
 	}
 
 	/* If non-partial, immediately mark as finished. */
-	if (node->as_whichplan < node->as_first_partial_plan)
-		node->as_pstate->pa_finished[node->as_whichplan] = true;
+	if (node->as_whichsyncplan < node->as_first_partial_plan)
+		node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
 
 	LWLockRelease(&pstate->pa_lock);
 
@@ -592,13 +810,13 @@ choose_next_subplan_for_worker(AppendState *node)
 	Assert(ScanDirectionIsForward(node->ps.state->es_direction));
 
 	/* We should never be called when there are no subplans */
-	Assert(node->as_whichplan != NO_MATCHING_SUBPLANS);
+	Assert(node->as_whichsyncplan != NO_MATCHING_SUBPLANS);
 
 	LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
 
 	/* Mark just-completed subplan as finished. */
-	if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
-		node->as_pstate->pa_finished[node->as_whichplan] = true;
+	if (node->as_whichsyncplan != INVALID_SUBPLAN_INDEX)
+		node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
 
 	/*
 	 * If we've yet to determine the valid subplans then do so now.  If
@@ -620,7 +838,7 @@ choose_next_subplan_for_worker(AppendState *node)
 	}
 
 	/* Save the plan from which we are starting the search. */
-	node->as_whichplan = pstate->pa_next_plan;
+	node->as_whichsyncplan = pstate->pa_next_plan;
 
 	/* Loop until we find a valid subplan to execute. */
 	while (pstate->pa_finished[pstate->pa_next_plan])
@@ -634,7 +852,7 @@ choose_next_subplan_for_worker(AppendState *node)
 			/* Advance to the next valid plan. */
 			pstate->pa_next_plan = nextplan;
 		}
-		else if (node->as_whichplan > node->as_first_partial_plan)
+		else if (node->as_whichsyncplan > node->as_first_partial_plan)
 		{
 			/*
 			 * Try looping back to the first valid partial plan, if there is
@@ -643,7 +861,7 @@ choose_next_subplan_for_worker(AppendState *node)
 			nextplan = bms_next_member(node->as_valid_subplans,
 									   node->as_first_partial_plan - 1);
 			pstate->pa_next_plan =
-				nextplan < 0 ? node->as_whichplan : nextplan;
+				nextplan < 0 ? node->as_whichsyncplan : nextplan;
 		}
 		else
 		{
@@ -651,10 +869,10 @@ choose_next_subplan_for_worker(AppendState *node)
 			 * At last plan, and either there are no partial plans or we've
 			 * tried them all.  Arrange to bail out.
 			 */
-			pstate->pa_next_plan = node->as_whichplan;
+			pstate->pa_next_plan = node->as_whichsyncplan;
 		}
 
-		if (pstate->pa_next_plan == node->as_whichplan)
+		if (pstate->pa_next_plan == node->as_whichsyncplan)
 		{
 			/* We've tried everything! */
 			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
@@ -664,7 +882,7 @@ choose_next_subplan_for_worker(AppendState *node)
 	}
 
 	/* Pick the plan we found, and advance pa_next_plan one more time. */
-	node->as_whichplan = pstate->pa_next_plan;
+	node->as_whichsyncplan = pstate->pa_next_plan;
 	pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
 										   pstate->pa_next_plan);
 
@@ -691,8 +909,8 @@ choose_next_subplan_for_worker(AppendState *node)
 	}
 
 	/* If non-partial, immediately mark as finished. */
-	if (node->as_whichplan < node->as_first_partial_plan)
-		node->as_pstate->pa_finished[node->as_whichplan] = true;
+	if (node->as_whichsyncplan < node->as_first_partial_plan)
+		node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
 
 	LWLockRelease(&pstate->pa_lock);
 
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 52af1dac5c..1a54383ec8 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -117,7 +117,6 @@ ExecForeignScan(PlanState *pstate)
 					(ExecScanRecheckMtd) ForeignRecheck);
 }
 
-
 /* ----------------------------------------------------------------
  *		ExecInitForeignScan
  * ----------------------------------------------------------------
@@ -141,6 +140,10 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
 	scanstate->ss.ps.plan = (Plan *) node;
 	scanstate->ss.ps.state = estate;
 	scanstate->ss.ps.ExecProcNode = ExecForeignScan;
+	scanstate->ss.ps.asyncstate = AS_AVAILABLE;
+
+	if ((eflags & EXEC_FLAG_ASYNC) != 0)
+		scanstate->fs_async = true;
 
 	/*
 	 * Miscellaneous initialization
@@ -384,3 +387,20 @@ ExecShutdownForeignScan(ForeignScanState *node)
 	if (fdwroutine->ShutdownForeignScan)
 		fdwroutine->ShutdownForeignScan(node);
 }
+
+/* ----------------------------------------------------------------
+ *		ExecAsyncForeignScanConfigureWait
+ *
+ *		In async mode, configure for a wait
+ * ----------------------------------------------------------------
+ */
+bool
+ExecForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes,
+							  void *caller_data, bool reinit)
+{
+	FdwRoutine *fdwroutine = node->fdwroutine;
+
+	Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
+	return fdwroutine->ForeignAsyncConfigureWait(node, wes,
+												 caller_data, reinit);
+}
diff --git a/src/backend/nodes/bitmapset.c b/src/backend/nodes/bitmapset.c
index 665149defe..5d4e19a052 100644
--- a/src/backend/nodes/bitmapset.c
+++ b/src/backend/nodes/bitmapset.c
@@ -895,6 +895,78 @@ bms_add_range(Bitmapset *a, int lower, int upper)
 	return a;
 }
 
+/*
+ * bms_del_range
+ *		Delete members in the range of 'lower' to 'upper' from the set.
+ *
+ * Note this could also be done by calling bms_del_member in a loop, however,
+ * using this function will be faster when the range is large as we work at
+ * the bitmapword level rather than at bit level.
+ */
+Bitmapset *
+bms_del_range(Bitmapset *a, int lower, int upper)
+{
+	int			lwordnum,
+				lbitnum,
+				uwordnum,
+				ushiftbits,
+				wordnum;
+
+	if (lower < 0 || upper < 0)
+		elog(ERROR, "negative bitmapset member not allowed");
+	if (lower > upper)
+		elog(ERROR, "lower range must not be above upper range");
+	uwordnum = WORDNUM(upper);
+
+	if (a == NULL)
+	{
+		a = (Bitmapset *) palloc0(BITMAPSET_SIZE(uwordnum + 1));
+		a->nwords = uwordnum + 1;
+	}
+
+	/* ensure we have enough words to store the upper bit */
+	else if (uwordnum >= a->nwords)
+	{
+		int			oldnwords = a->nwords;
+		int			i;
+
+		a = (Bitmapset *) repalloc(a, BITMAPSET_SIZE(uwordnum + 1));
+		a->nwords = uwordnum + 1;
+		/* zero out the enlarged portion */
+		for (i = oldnwords; i < a->nwords; i++)
+			a->words[i] = 0;
+	}
+
+	wordnum = lwordnum = WORDNUM(lower);
+
+	lbitnum = BITNUM(lower);
+	ushiftbits = BITNUM(upper) + 1;
+
+	/*
+	 * Special case when lwordnum is the same as uwordnum we must perform the
+	 * upper and lower masking on the word.
+	 */
+	if (lwordnum == uwordnum)
+	{
+		a->words[lwordnum] &= ((bitmapword) (((bitmapword) 1 << lbitnum) - 1)
+								| (~(bitmapword) 0) << ushiftbits);
+	}
+	else
+	{
+		/* turn off lbitnum and all bits left of it */
+		a->words[wordnum++] &= (bitmapword) (((bitmapword) 1 << lbitnum) - 1);
+
+		/* turn off all bits for any intermediate words */
+		while (wordnum < uwordnum)
+			a->words[wordnum++] = (bitmapword) 0;
+
+		/* turn off upper's bit and all bits right of it. */
+		a->words[uwordnum] &= (~(bitmapword) 0) << ushiftbits;
+	}
+
+	return a;
+}
+
 /*
  * bms_int_members - like bms_intersect, but left input is recycled
  */
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index a74b56bb59..a266904010 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -244,6 +244,8 @@ _copyAppend(const Append *from)
 	COPY_NODE_FIELD(appendplans);
 	COPY_SCALAR_FIELD(first_partial_plan);
 	COPY_NODE_FIELD(part_prune_info);
+	COPY_SCALAR_FIELD(nasyncplans);
+	COPY_SCALAR_FIELD(referent);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index a80eccc2c1..bf87e721a5 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -434,6 +434,8 @@ _outAppend(StringInfo str, const Append *node)
 	WRITE_NODE_FIELD(appendplans);
 	WRITE_INT_FIELD(first_partial_plan);
 	WRITE_NODE_FIELD(part_prune_info);
+	WRITE_INT_FIELD(nasyncplans);
+	WRITE_INT_FIELD(referent);
 }
 
 static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 764e3bb90c..25b84b3f15 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1639,6 +1639,8 @@ _readAppend(void)
 	READ_NODE_FIELD(appendplans);
 	READ_INT_FIELD(first_partial_plan);
 	READ_NODE_FIELD(part_prune_info);
+	READ_INT_FIELD(nasyncplans);
+	READ_INT_FIELD(referent);
 
 	READ_DONE();
 }
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index aee81bd755..c676980a30 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -207,6 +207,9 @@ static NamedTuplestoreScan *make_namedtuplestorescan(List *qptlist, List *qpqual
 													 Index scanrelid, char *enrname);
 static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual,
 										 Index scanrelid, int wtParam);
+static Append *make_append(List *appendplans, int first_partial_plan,
+						   int nasyncplans,	int referent,
+						   List *tlist, PartitionPruneInfo *partpruneinfos);
 static RecursiveUnion *make_recursive_union(List *tlist,
 											Plan *lefttree,
 											Plan *righttree,
@@ -292,6 +295,7 @@ static ModifyTable *make_modifytable(PlannerInfo *root,
 									 List *rowMarks, OnConflictExpr *onconflict, int epqParam);
 static GatherMerge *create_gather_merge_plan(PlannerInfo *root,
 											 GatherMergePath *best_path);
+static bool is_async_capable_path(Path *path);
 
 
 /*
@@ -1069,6 +1073,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
 	bool		tlist_was_changed = false;
 	List	   *pathkeys = best_path->path.pathkeys;
 	List	   *subplans = NIL;
+	List	   *asyncplans = NIL;
+	List	   *syncplans = NIL;
 	ListCell   *subpaths;
 	RelOptInfo *rel = best_path->path.parent;
 	PartitionPruneInfo *partpruneinfo = NULL;
@@ -1077,6 +1083,9 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
 	Oid		   *nodeSortOperators = NULL;
 	Oid		   *nodeCollations = NULL;
 	bool	   *nodeNullsFirst = NULL;
+	int			nasyncplans = 0;
+	bool		first = true;
+	bool		referent_is_sync = true;
 
 	/*
 	 * The subpaths list could be empty, if every child was proven empty by
@@ -1206,6 +1215,23 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
 		}
 
 		subplans = lappend(subplans, subplan);
+
+		/*
+		 * Classify as async-capable or not. If we have decided to run the
+		 * chidlren in parallel, we cannot any one of them run asynchronously.
+		 */
+		if (!best_path->path.parallel_safe && is_async_capable_path(subpath))
+		{
+			subplan->async_capable = true;
+			asyncplans = lappend(asyncplans, subplan);
+			++nasyncplans;
+			if (first)
+				referent_is_sync = false;
+		}
+		else
+			syncplans = lappend(syncplans, subplan);
+
+		first = false;
 	}
 
 	/*
@@ -1244,6 +1270,18 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
 	plan->first_partial_plan = best_path->first_partial_path;
 	plan->part_prune_info = partpruneinfo;
 
+	/*
+	 * XXX ideally, if there's just one child, we'd not bother to generate an
+	 * Append node but just return the single child.  At the moment this does
+	 * not work because the varno of the child scan plan won't match the
+	 * parent-rel Vars it'll be asked to emit.
+	 */
+
+	plan = make_append(list_concat(asyncplans, syncplans),
+					   best_path->first_partial_path, nasyncplans,
+					   referent_is_sync ? nasyncplans : 0, tlist,
+					   partpruneinfo);
+
 	copy_generic_path_info(&plan->plan, (Path *) best_path);
 
 	/*
@@ -5462,6 +5500,27 @@ make_foreignscan(List *qptlist,
 	return node;
 }
 
+static Append *
+make_append(List *appendplans, int first_partial_plan, int nasyncplans,
+			int referent, List *tlist, PartitionPruneInfo *partpruneinfo)
+{
+	Append	   *node = makeNode(Append);
+	Plan	   *plan = &node->plan;
+
+	plan->targetlist = tlist;
+	plan->qual = NIL;
+	plan->lefttree = NULL;
+	plan->righttree = NULL;
+
+	node->appendplans = appendplans;
+	node->first_partial_plan = first_partial_plan;
+	node->part_prune_info = partpruneinfo;
+	node->nasyncplans = nasyncplans;
+	node->referent = referent;
+
+	return node;
+}
+
 static RecursiveUnion *
 make_recursive_union(List *tlist,
 					 Plan *lefttree,
@@ -6836,3 +6895,27 @@ is_projection_capable_plan(Plan *plan)
 	}
 	return true;
 }
+
+/*
+ * is_projection_capable_path
+ *		Check whether a given Path node is async-capable.
+ */
+static bool
+is_async_capable_path(Path *path)
+{
+	switch (nodeTag(path))
+	{
+		case T_ForeignPath:
+			{
+				FdwRoutine *fdwroutine = path->parent->fdwroutine;
+
+				Assert(fdwroutine != NULL);
+				if (fdwroutine->IsForeignPathAsyncCapable != NULL &&
+					fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path))
+					return true;
+			}
+		default:
+			break;
+	}
+	return false;
+}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index fabcf31de8..575ccd5def 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3853,6 +3853,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_SYNC_REP:
 			event_name = "SyncRep";
 			break;
+		case WAIT_EVENT_ASYNC_WAIT:
+			event_name = "AsyncExecWait";
+			break;
 			/* no default case, so that compiler will warn */
 	}
 
diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c
index bb2baff763..7669e6ff53 100644
--- a/src/backend/postmaster/syslogger.c
+++ b/src/backend/postmaster/syslogger.c
@@ -306,7 +306,7 @@ SysLoggerMain(int argc, char *argv[])
 	 * syslog pipe, which implies that all other backends have exited
 	 * (including the postmaster).
 	 */
-	wes = CreateWaitEventSet(CurrentMemoryContext, 2);
+	wes = CreateWaitEventSet(CurrentMemoryContext, NULL, 2);
 	AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
 #ifndef WIN32
 	AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL);
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 13685a0a0e..60b749f062 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -4640,7 +4640,7 @@ set_deparse_planstate(deparse_namespace *dpns, PlanState *ps)
 	dpns->planstate = ps;
 
 	/*
-	 * We special-case Append and MergeAppend to pretend that the first child
+	 * We special-case Append and MergeAppend to pretend that a specific child
 	 * plan is the OUTER referent; we have to interpret OUTER Vars in their
 	 * tlists according to one of the children, and the first one is the most
 	 * natural choice.  Likewise special-case ModifyTable to pretend that the
@@ -4648,7 +4648,11 @@ set_deparse_planstate(deparse_namespace *dpns, PlanState *ps)
 	 * lists containing references to non-target relations.
 	 */
 	if (IsA(ps, AppendState))
-		dpns->outer_planstate = ((AppendState *) ps)->appendplans[0];
+	{
+		AppendState *aps = (AppendState *) ps;
+		Append *app = (Append *) ps->plan;
+		dpns->outer_planstate = aps->appendplans[app->referent];
+	}
 	else if (IsA(ps, MergeAppendState))
 		dpns->outer_planstate = ((MergeAppendState *) ps)->mergeplans[0];
 	else if (IsA(ps, ModifyTableState))
diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h
new file mode 100644
index 0000000000..5fd67d9004
--- /dev/null
+++ b/src/include/executor/execAsync.h
@@ -0,0 +1,23 @@
+/*--------------------------------------------------------------------
+ * execAsync.c
+ *		Support functions for asynchronous query execution
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *		src/backend/executor/execAsync.c
+ *--------------------------------------------------------------------
+ */
+#ifndef EXECASYNC_H
+#define EXECASYNC_H
+
+#include "nodes/execnodes.h"
+#include "storage/latch.h"
+
+extern void ExecAsyncSetState(PlanState *pstate, AsyncState status);
+extern bool ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node,
+								   void *data, bool reinit);
+extern Bitmapset *ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes,
+									 long timeout);
+#endif   /* EXECASYNC_H */
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 6298c7c8ca..4adb2efe76 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -59,6 +59,7 @@
 #define EXEC_FLAG_MARK			0x0008	/* need mark/restore */
 #define EXEC_FLAG_SKIP_TRIGGERS 0x0010	/* skip AfterTrigger calls */
 #define EXEC_FLAG_WITH_NO_DATA	0x0020	/* rel scannability doesn't matter */
+#define EXEC_FLAG_ASYNC			0x0040	/* request async execution */
 
 
 /* Hook for plugins to get control in ExecutorStart() */
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index ca7723c899..81791033af 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -30,5 +30,8 @@ extern void ExecForeignScanReInitializeDSM(ForeignScanState *node,
 extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
 											ParallelWorkerContext *pwcxt);
 extern void ExecShutdownForeignScan(ForeignScanState *node);
+extern bool ExecForeignAsyncConfigureWait(ForeignScanState *node,
+										  WaitEventSet *wes,
+										  void *caller_data, bool reinit);
 
 #endif							/* NODEFOREIGNSCAN_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 822686033e..851cd15e65 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -169,6 +169,11 @@ typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root,
 typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root,
 															List *fdw_private,
 															RelOptInfo *child_rel);
+typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path);
+typedef bool (*ForeignAsyncConfigureWait_function) (ForeignScanState *node,
+													WaitEventSet *wes,
+													void *caller_data,
+													bool reinit);
 
 /*
  * FdwRoutine is the struct returned by a foreign-data wrapper's handler
@@ -190,6 +195,7 @@ typedef struct FdwRoutine
 	GetForeignPlan_function GetForeignPlan;
 	BeginForeignScan_function BeginForeignScan;
 	IterateForeignScan_function IterateForeignScan;
+	IterateForeignScan_function IterateForeignScanAsync;
 	ReScanForeignScan_function ReScanForeignScan;
 	EndForeignScan_function EndForeignScan;
 
@@ -242,6 +248,11 @@ typedef struct FdwRoutine
 	InitializeDSMForeignScan_function InitializeDSMForeignScan;
 	ReInitializeDSMForeignScan_function ReInitializeDSMForeignScan;
 	InitializeWorkerForeignScan_function InitializeWorkerForeignScan;
+
+	/* Support functions for asynchronous execution */
+	IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable;
+	ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait;
+
 	ShutdownForeignScan_function ShutdownForeignScan;
 
 	/* Support functions for path reparameterization. */
diff --git a/src/include/nodes/bitmapset.h b/src/include/nodes/bitmapset.h
index 0c645628e5..d97a4c2235 100644
--- a/src/include/nodes/bitmapset.h
+++ b/src/include/nodes/bitmapset.h
@@ -107,6 +107,7 @@ extern Bitmapset *bms_add_members(Bitmapset *a, const Bitmapset *b);
 extern Bitmapset *bms_add_range(Bitmapset *a, int lower, int upper);
 extern Bitmapset *bms_int_members(Bitmapset *a, const Bitmapset *b);
 extern Bitmapset *bms_del_members(Bitmapset *a, const Bitmapset *b);
+extern Bitmapset *bms_del_range(Bitmapset *a, int lower, int upper);
 extern Bitmapset *bms_join(Bitmapset *a, Bitmapset *b);
 
 /* support for iterating through the integer elements of a set: */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 6eb647290b..46d7fbab3a 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -932,6 +932,12 @@ typedef TupleTableSlot *(*ExecProcNodeMtd) (struct PlanState *pstate);
  * abstract superclass for all PlanState-type nodes.
  * ----------------
  */
+typedef enum AsyncState
+{
+	AS_AVAILABLE,
+	AS_WAITING
+} AsyncState;
+
 typedef struct PlanState
 {
 	NodeTag		type;
@@ -1020,6 +1026,11 @@ typedef struct PlanState
 	bool		outeropsset;
 	bool		inneropsset;
 	bool		resultopsset;
+
+	/* Async subnode execution sutff */
+	AsyncState	asyncstate;
+
+	int32		padding;			/* to keep alignment of derived types */
 } PlanState;
 
 /* ----------------
@@ -1216,14 +1227,20 @@ struct AppendState
 	PlanState	ps;				/* its first field is NodeTag */
 	PlanState **appendplans;	/* array of PlanStates for my inputs */
 	int			as_nplans;
-	int			as_whichplan;
+	int			as_whichsyncplan; /* which sync plan is being executed  */
 	int			as_first_partial_plan;	/* Index of 'appendplans' containing
 										 * the first partial plan */
+	int			as_nasyncplans;	/* # of async-capable children */
 	ParallelAppendState *as_pstate; /* parallel coordination info */
 	Size		pstate_len;		/* size of parallel coordination info */
 	struct PartitionPruneState *as_prune_state;
 	Bitmapset  *as_valid_subplans;
 	bool		(*choose_next_subplan) (AppendState *);
+	bool		as_syncdone;	/* all synchronous plans done? */
+	Bitmapset  *as_needrequest;	/* async plans needing a new request */
+	Bitmapset  *as_pending_async;	/* pending async plans */
+	TupleTableSlot **as_asyncresult;	/* unreturned results of async plans */
+	int			as_nasyncresult;	/* # of valid entries in as_asyncresult */
 };
 
 /* ----------------
@@ -1786,6 +1803,7 @@ typedef struct ForeignScanState
 	Size		pscan_len;		/* size of parallel coordination information */
 	/* use struct pointer to avoid including fdwapi.h here */
 	struct FdwRoutine *fdwroutine;
+	bool		fs_async;
 	void	   *fdw_state;		/* foreign-data wrapper can keep state here */
 } ForeignScanState;
 
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 8e6594e355..26810915e5 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -133,6 +133,11 @@ typedef struct Plan
 	bool		parallel_aware; /* engage parallel-aware logic? */
 	bool		parallel_safe;	/* OK to use as part of parallel plan? */
 
+	/*
+	 * information needed for asynchronous execution
+	 */
+	bool		async_capable;  /* engage asyncronous execution logic? */
+
 	/*
 	 * Common structural data for all Plan types.
 	 */
@@ -259,6 +264,10 @@ typedef struct Append
 
 	/* Info for run-time subplan pruning; NULL if we're not doing that */
 	struct PartitionPruneInfo *part_prune_info;
+
+	/* Async child node execution stuff */
+	int			nasyncplans;	/* # async subplans, always at start of list */
+	int			referent; 		/* index of inheritance tree referent */
 } Append;
 
 /* ----------------
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index fe076d823d..d57ef809fc 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -853,7 +853,8 @@ typedef enum
 	WAIT_EVENT_REPLICATION_ORIGIN_DROP,
 	WAIT_EVENT_REPLICATION_SLOT_DROP,
 	WAIT_EVENT_SAFE_SNAPSHOT,
-	WAIT_EVENT_SYNC_REP
+	WAIT_EVENT_SYNC_REP,
+	WAIT_EVENT_ASYNC_WAIT
 } WaitEventIPC;
 
 /* ----------
-- 
2.23.0

