From 75eec490d5fa5e7272066ab35bba30c8c00e87cf Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Mon, 17 Oct 2016 15:54:32 +0900
Subject: [PATCH 3/7] Modify async execution infrastructure.

---
 contrib/postgres_fdw/expected/postgres_fdw.out |  68 ++++++++--------
 contrib/postgres_fdw/postgres_fdw.c            |   5 +-
 src/backend/executor/execAsync.c               | 105 ++++++++++++++-----------
 src/backend/executor/nodeAppend.c              |  50 ++++++------
 src/backend/executor/nodeForeignscan.c         |   4 +-
 src/backend/nodes/copyfuncs.c                  |   1 +
 src/backend/nodes/outfuncs.c                   |   1 +
 src/backend/nodes/readfuncs.c                  |   1 +
 src/backend/optimizer/plan/createplan.c        |  24 +++++-
 src/backend/utils/adt/ruleutils.c              |   6 +-
 src/include/executor/nodeForeignscan.h         |   2 +-
 src/include/foreign/fdwapi.h                   |   2 +-
 src/include/nodes/execnodes.h                  |  10 ++-
 src/include/nodes/plannodes.h                  |   1 +
 14 files changed, 167 insertions(+), 113 deletions(-)

diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 1b36579..a98e138 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -6321,13 +6321,13 @@ insert into bar2 values(4,44,44);
 insert into bar2 values(7,77,77);
 explain (verbose, costs off)
 select * from bar where f1 in (select f1 from foo) for update;
-                                                       QUERY PLAN                                                       
-------------------------------------------------------------------------------------------------------------------------
+                                          QUERY PLAN                                          
+----------------------------------------------------------------------------------------------
  LockRows
-   Output: bar2.f1, bar2.f2, bar2.ctid, ((bar2.*)::bar), bar2.tableoid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid
+   Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid
    ->  Hash Join
-         Output: bar2.f1, bar2.f2, bar2.ctid, ((bar2.*)::bar), bar2.tableoid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid
-         Hash Cond: (bar2.f1 = foo2.f1)
+         Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid
+         Hash Cond: (bar.f1 = foo.f1)
          ->  Append
                ->  Foreign Scan on public.bar2
                      Output: bar2.f1, bar2.f2, bar2.ctid, bar2.*, bar2.tableoid
@@ -6335,10 +6335,10 @@ select * from bar where f1 in (select f1 from foo) for update;
                ->  Seq Scan on public.bar
                      Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
          ->  Hash
-               Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
+               Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                ->  HashAggregate
-                     Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
-                     Group Key: foo2.f1
+                     Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+                     Group Key: foo.f1
                      ->  Append
                            ->  Foreign Scan on public.foo2
                                  Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
@@ -6358,13 +6358,13 @@ select * from bar where f1 in (select f1 from foo) for update;
 
 explain (verbose, costs off)
 select * from bar where f1 in (select f1 from foo) for share;
-                                                       QUERY PLAN                                                       
-------------------------------------------------------------------------------------------------------------------------
+                                          QUERY PLAN                                          
+----------------------------------------------------------------------------------------------
  LockRows
-   Output: bar2.f1, bar2.f2, bar2.ctid, ((bar2.*)::bar), bar2.tableoid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid
+   Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid
    ->  Hash Join
-         Output: bar2.f1, bar2.f2, bar2.ctid, ((bar2.*)::bar), bar2.tableoid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid
-         Hash Cond: (bar2.f1 = foo2.f1)
+         Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid
+         Hash Cond: (bar.f1 = foo.f1)
          ->  Append
                ->  Foreign Scan on public.bar2
                      Output: bar2.f1, bar2.f2, bar2.ctid, bar2.*, bar2.tableoid
@@ -6372,10 +6372,10 @@ select * from bar where f1 in (select f1 from foo) for share;
                ->  Seq Scan on public.bar
                      Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
          ->  Hash
-               Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
+               Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                ->  HashAggregate
-                     Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
-                     Group Key: foo2.f1
+                     Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+                     Group Key: foo.f1
                      ->  Append
                            ->  Foreign Scan on public.foo2
                                  Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
@@ -6396,22 +6396,22 @@ select * from bar where f1 in (select f1 from foo) for share;
 -- Check UPDATE with inherited target and an inherited source table
 explain (verbose, costs off)
 update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
-                                               QUERY PLAN                                                
----------------------------------------------------------------------------------------------------------
+                                         QUERY PLAN                                          
+---------------------------------------------------------------------------------------------
  Update on public.bar
    Update on public.bar
    Foreign Update on public.bar2
      Remote SQL: UPDATE public.loct2 SET f2 = $2 WHERE ctid = $1
    ->  Hash Join
-         Output: bar.f1, (bar.f2 + 100), bar.ctid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid
-         Hash Cond: (bar.f1 = foo2.f1)
+         Output: bar.f1, (bar.f2 + 100), bar.ctid, foo.ctid, foo.*, foo.tableoid
+         Hash Cond: (bar.f1 = foo.f1)
          ->  Seq Scan on public.bar
                Output: bar.f1, bar.f2, bar.ctid
          ->  Hash
-               Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
+               Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                ->  HashAggregate
-                     Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
-                     Group Key: foo2.f1
+                     Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+                     Group Key: foo.f1
                      ->  Append
                            ->  Foreign Scan on public.foo2
                                  Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
@@ -6419,16 +6419,16 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
                            ->  Seq Scan on public.foo
                                  Output: foo.ctid, foo.*, foo.tableoid, foo.f1
    ->  Hash Join
-         Output: bar2.f1, (bar2.f2 + 100), bar2.f3, bar2.ctid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid
-         Hash Cond: (bar2.f1 = foo2.f1)
+         Output: bar2.f1, (bar2.f2 + 100), bar2.f3, bar2.ctid, foo.ctid, foo.*, foo.tableoid
+         Hash Cond: (bar2.f1 = foo.f1)
          ->  Foreign Scan on public.bar2
                Output: bar2.f1, bar2.f2, bar2.f3, bar2.ctid
                Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE
          ->  Hash
-               Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
+               Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                ->  HashAggregate
-                     Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
-                     Group Key: foo2.f1
+                     Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+                     Group Key: foo.f1
                      ->  Append
                            ->  Foreign Scan on public.foo2
                                  Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
@@ -6462,8 +6462,8 @@ where bar.f1 = ss.f1;
    Foreign Update on public.bar2
      Remote SQL: UPDATE public.loct2 SET f2 = $2 WHERE ctid = $1
    ->  Hash Join
-         Output: bar.f1, (bar.f2 + 100), bar.ctid, (ROW(foo2.f1))
-         Hash Cond: (foo2.f1 = bar.f1)
+         Output: bar.f1, (bar.f2 + 100), bar.ctid, (ROW(foo.f1))
+         Hash Cond: (foo.f1 = bar.f1)
          ->  Append
                ->  Foreign Scan on public.foo2
                      Output: ROW(foo2.f1), foo2.f1
@@ -6480,8 +6480,8 @@ where bar.f1 = ss.f1;
                ->  Seq Scan on public.bar
                      Output: bar.f1, bar.f2, bar.ctid
    ->  Merge Join
-         Output: bar2.f1, (bar2.f2 + 100), bar2.f3, bar2.ctid, (ROW(foo2.f1))
-         Merge Cond: (bar2.f1 = foo2.f1)
+         Output: bar2.f1, (bar2.f2 + 100), bar2.f3, bar2.ctid, (ROW(foo.f1))
+         Merge Cond: (bar2.f1 = foo.f1)
          ->  Sort
                Output: bar2.f1, bar2.f2, bar2.f3, bar2.ctid
                Sort Key: bar2.f1
@@ -6489,8 +6489,8 @@ where bar.f1 = ss.f1;
                      Output: bar2.f1, bar2.f2, bar2.f3, bar2.ctid
                      Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE
          ->  Sort
-               Output: (ROW(foo2.f1)), foo2.f1
-               Sort Key: foo2.f1
+               Output: (ROW(foo.f1)), foo.f1
+               Sort Key: foo.f1
                ->  Append
                      ->  Foreign Scan on public.foo2
                            Output: ROW(foo2.f1), foo2.f1
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index e75f8a1..830212f 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -354,7 +354,7 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
 static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
 static void postgresForeignAsyncRequest(EState *estate,
 							PendingAsyncRequest *areq);
-static void postgresForeignAsyncConfigureWait(EState *estate,
+static bool postgresForeignAsyncConfigureWait(EState *estate,
 								  PendingAsyncRequest *areq,
 								  bool reinit);
 static void postgresForeignAsyncNotify(EState *estate,
@@ -4479,11 +4479,12 @@ postgresForeignAsyncRequest(EState *estate, PendingAsyncRequest *areq)
 	ExecAsyncRequestDone(estate, areq, (Node *) slot);
 }
 
-static void
+static bool
 postgresForeignAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq,
 								  bool reinit)
 {
 	elog(ERROR, "postgresForeignAsyncConfigureWait");
+	return false;
 }
 
 static void
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
index e070c26..33496a9 100644
--- a/src/backend/executor/execAsync.c
+++ b/src/backend/executor/execAsync.c
@@ -22,7 +22,7 @@
 #include "storage/latch.h"
 
 static bool ExecAsyncEventWait(EState *estate, long timeout);
-static void ExecAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq,
+static bool ExecAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq,
 	bool reinit);
 static void ExecAsyncNotify(EState *estate, PendingAsyncRequest *areq);
 static void ExecAsyncResponse(EState *estate, PendingAsyncRequest *areq);
@@ -43,7 +43,7 @@ ExecAsyncRequest(EState *estate, PlanState *requestor, int request_index,
 				 PlanState *requestee)
 {
 	PendingAsyncRequest *areq = NULL;
-	int		i = estate->es_num_pending_async;
+	int		nasync = estate->es_num_pending_async;
 
 	/*
 	 * If the number of pending asynchronous nodes exceeds the number of
@@ -51,7 +51,7 @@ ExecAsyncRequest(EState *estate, PlanState *requestor, int request_index,
 	 * We start with 16 slots, and thereafter double the array size each
 	 * time we run out of slots.
 	 */
-	if (i >= estate->es_max_pending_async)
+	if (nasync >= estate->es_max_pending_async)
 	{
 		int	newmax;
 
@@ -81,25 +81,28 @@ ExecAsyncRequest(EState *estate, PlanState *requestor, int request_index,
 	 * PendingAsyncRequest if there is one.  If not, we must allocate a new
 	 * one.
 	 */
-	if (estate->es_pending_async[i] == NULL)
+	if (estate->es_pending_async[nasync] == NULL)
 	{
 		areq = MemoryContextAllocZero(estate->es_query_cxt,
 									  sizeof(PendingAsyncRequest));
-		estate->es_pending_async[i] = areq;
+		estate->es_pending_async[nasync] = areq;
 	}
 	else
 	{
-		areq = estate->es_pending_async[i];
+		areq = estate->es_pending_async[nasync];
 		MemSet(areq, 0, sizeof(PendingAsyncRequest));
 	}
-	areq->myindex = estate->es_num_pending_async++;
+	areq->myindex = estate->es_num_pending_async;
 
 	/* Initialize the new request. */
 	areq->requestor = requestor;
 	areq->request_index = request_index;
 	areq->requestee = requestee;
 
-	/* Give the requestee a chance to do whatever it wants. */
+	/*
+	 * Give the requestee a chance to do whatever it wants.
+	 * It marks the request ASYNC_COMPLETE if a result is immediately available.
+	 */
 	switch (nodeTag(requestee))
 	{
 		case T_ForeignScanState:
@@ -110,6 +113,20 @@ ExecAsyncRequest(EState *estate, PlanState *requestor, int request_index,
 			elog(ERROR, "unrecognized node type: %d",
 				(int) nodeTag(requestee));
 	}
+
+	/*
+	 * If a result is available, complete it immediately.
+	 */
+	if (areq->state == ASYNC_COMPLETE)
+	{
+		Assert(areq->result == NULL || IsA(areq->result, TupleTableSlot));
+		ExecAsyncResponse(estate, areq);
+
+		return;
+	}
+
+	/* No result available now, make this node pending */
+	estate->es_num_pending_async++;
 }
 
 /*
@@ -175,22 +192,19 @@ ExecAsyncEventLoop(EState *estate, PlanState *requestor, long timeout)
 		{
 			PendingAsyncRequest *areq = estate->es_pending_async[i];
 
-			/* Skip it if no callback is pending. */
-			if (!areq->callback_pending)
-				continue;
-
-			/*
-			 * Mark it as no longer needing a callback.  We must do this
-			 * before dispatching the callback in case the callback resets
-			 * the flag.
-			 */
-			areq->callback_pending = false;
-			estate->es_async_callback_pending--;
-
-			/* Perform the actual callback; set request_done if appropraite. */
-			if (!areq->request_complete)
+			/* Dispatch the callback if one is pending. */
+			if (areq->state == ASYNC_CALLBACK_PENDING)
+			{
+				/*
+				 * Drop it from the pending-callback count before dispatching;
+				 * the callback should move areq out of this state (e.g. via
+				 * ExecAsyncSetRequiredEvents or ExecAsyncRequestDone).
+				 */
+				estate->es_async_callback_pending--;
 				ExecAsyncNotify(estate, areq);
-			else
+			}
+
+			if (areq->state == ASYNC_COMPLETE)
 			{
 				any_node_done = true;
 				if (requestor == areq->requestor)
@@ -214,7 +228,7 @@ ExecAsyncEventLoop(EState *estate, PlanState *requestor, long timeout)
 				PendingAsyncRequest *head;
 				PendingAsyncRequest *tail = estate->es_pending_async[tidx];
 
-				if (!tail->callback_pending && tail->request_complete)
+				if (tail->state == ASYNC_COMPLETE)
 					continue;
 				head = estate->es_pending_async[hidx];
 				estate->es_pending_async[tidx] = head;
@@ -247,7 +261,8 @@ ExecAsyncEventLoop(EState *estate, PlanState *requestor, long timeout)
  * means wait forever, 0 means don't wait at all, and >0 means wait for the
  * indicated number of milliseconds.
  *
- * Returns true if we found some events and false if we timed out.
+ * Returns true if we found some events and false if we timed out.  At least
+ * one pending request must register a wait event (this is asserted below).
  */
 static bool
 ExecAsyncEventWait(EState *estate, long timeout)
@@ -258,6 +273,7 @@ ExecAsyncEventWait(EState *estate, long timeout)
 	int		n;
 	bool	reinit = false;
 	bool	process_latch_set = false;
+	bool	added = false;
 
 	if (estate->es_wait_event_set == NULL)
 	{
@@ -282,13 +298,16 @@ ExecAsyncEventWait(EState *estate, long timeout)
 		PendingAsyncRequest *areq = estate->es_pending_async[i];
 
 		if (areq->num_fd_events > 0)
-			ExecAsyncConfigureWait(estate, areq, reinit);
+			added |= ExecAsyncConfigureWait(estate, areq, reinit);
 	}
 
+	Assert(added);
+
 	/* Wait for at least one event to occur. */
 	noccurred = WaitEventSetWait(estate->es_wait_event_set, timeout,
 								 occurred_event, EVENT_BUFFER_SIZE,
 								 WAIT_EVENT_ASYNC_WAIT);
+
 	if (noccurred == 0)
 		return false;
 
@@ -312,12 +331,10 @@ ExecAsyncEventWait(EState *estate, long timeout)
 		{
 			PendingAsyncRequest *areq = w->user_data;
 
-			if (!areq->callback_pending)
-			{
-				Assert(!areq->request_complete);
-				areq->callback_pending = true;
-				estate->es_async_callback_pending++;
-			}
+			Assert(areq->state == ASYNC_WAITING);
+
+			areq->state = ASYNC_CALLBACK_PENDING;
+			estate->es_async_callback_pending++;
 		}
 	}
 
@@ -333,8 +350,8 @@ ExecAsyncEventWait(EState *estate, long timeout)
 
 			if (areq->wants_process_latch)
 			{
-				Assert(!areq->request_complete);
-				areq->callback_pending = true;
+				Assert(areq->state == ASYNC_WAITING);
+				areq->state = ASYNC_CALLBACK_PENDING;
 			}
 		}
 	}
@@ -352,15 +369,19 @@ ExecAsyncEventWait(EState *estate, long timeout)
  * The events should include only WL_SOCKET_READABLE or WL_SOCKET_WRITEABLE,
  * and the number of calls should not exceed areq->num_fd_events (as
  * prevously set via ExecAsyncSetRequiredEvents).
+ *
+ * Individual requests may decline to register an event (returning false),
+ * but the node driver is responsible for registering at least one event
+ * per requestor.
  */
-static void
+static bool
 ExecAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq,
 					   bool reinit)
 {
 	switch (nodeTag(areq->requestee))
 	{
 		case T_ForeignScanState:
-			ExecAsyncForeignScanConfigureWait(estate, areq, reinit);
+			return ExecAsyncForeignScanConfigureWait(estate, areq, reinit);
 			break;
 		default:
 			elog(ERROR, "unrecognized node type: %d",
@@ -419,6 +440,7 @@ ExecAsyncSetRequiredEvents(EState *estate, PendingAsyncRequest *areq,
 	estate->es_total_fd_events += num_fd_events - areq->num_fd_events;
 	areq->num_fd_events = num_fd_events;
 	areq->wants_process_latch = wants_process_latch;
+	areq->state = ASYNC_WAITING;
 
 	if (force_reset && estate->es_wait_event_set != NULL)
 	{
@@ -448,17 +470,12 @@ ExecAsyncRequestDone(EState *estate, PendingAsyncRequest *areq, Node *result)
 	 * need a callback to remove registered wait events.  It's not clear
 	 * that we would come out ahead, so use brute force for now.
 	 */
+	Assert(areq->state == ASYNC_IDLE || areq->state == ASYNC_CALLBACK_PENDING);
+
 	if (areq->num_fd_events > 0 || areq->wants_process_latch)
 		ExecAsyncSetRequiredEvents(estate, areq, 0, false, true);
 
 	/* Save result and mark request as complete. */
 	areq->result = result;
-	areq->request_complete = true;
-
-	/* Make sure this request is flagged for a callback. */
-	if (!areq->callback_pending)
-	{
-		areq->callback_pending = true;
-		estate->es_async_callback_pending++;
-	}
+	areq->state = ASYNC_COMPLETE;
 }
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index bb06569..c234f1f 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -229,9 +229,15 @@ ExecAppend(AppendState *node)
 		 */
 		while ((i = bms_first_member(node->as_needrequest)) >= 0)
 		{
-			ExecAsyncRequest(estate, &node->ps, i, node->appendplans[i]);
 			node->as_nasyncpending++;
+
+			ExecAsyncRequest(estate, &node->ps, i, node->appendplans[i]);
+			/* If this request immediately gives a result, take it. */
+			if (node->as_nasyncresult > 0)
+				return node->as_asyncresult[--node->as_nasyncresult];
 		}
+		if (node->as_nasyncpending == 0 && node->as_syncdone)
+				return ExecClearTuple(node->ps.ps_ResultTupleSlot);
 	}
 
 	for (;;)
@@ -246,32 +252,32 @@ ExecAppend(AppendState *node)
 		{
 			long	timeout = node->as_syncdone ? -1 : 0;
 
-			for (;;)
+			while (node->as_nasyncpending > 0)
 			{
-				if (node->as_nasyncpending == 0)
-				{
-					/*
-					 * If there is no asynchronous activity still pending
-					 * and the synchronous activity is also complete, we're
-					 * totally done scanning this node.  Otherwise, we're
-					 * done with the asynchronous stuff but must continue
-					 * scanning the synchronous children.
-					 */
-					if (node->as_syncdone)
-						return ExecClearTuple(node->ps.ps_ResultTupleSlot);
-					break;
-				}
-				if (!ExecAsyncEventLoop(node->ps.state, &node->ps, timeout))
-				{
-					/* Timeout reached. */
-					break;
-				}
-				if (node->as_nasyncresult > 0)
+				if (ExecAsyncEventLoop(node->ps.state, &node->ps, timeout) &&
+					node->as_nasyncresult > 0)
 				{
 					/* Asynchronous subplan returned a tuple! */
 					--node->as_nasyncresult;
 					return node->as_asyncresult[node->as_nasyncresult];
 				}
+
+				/* No async result yet; go on to the sync children if any remain. */
+				if (!node->as_syncdone)
+					break;
+			}
+
+			/*
+			 * If there is no asynchronous activity still pending and the
+			 * synchronous activity is also complete, we're totally done
+			 * scanning this node.  Otherwise, we're done with the
+			 * asynchronous stuff but must continue scanning the synchronous
+			 * children.
+			 */
+			if (node->as_syncdone)
+			{
+				Assert(node->as_nasyncpending == 0);
+				return ExecClearTuple(node->ps.ps_ResultTupleSlot);
 			}
 		}
 
@@ -397,7 +403,7 @@ ExecAsyncAppendResponse(EState *estate, PendingAsyncRequest *areq)
 	TupleTableSlot *slot;
 
 	/* We shouldn't be called until the request is complete. */
-	Assert(areq->request_complete);
+	Assert(areq->state == ASYNC_COMPLETE);
 
 	/* Our result slot shouldn't already be occupied. */
 	Assert(TupIsNull(node->ps.ps_ResultTupleSlot));
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 85d436f..d3567bb 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -378,7 +378,7 @@ ExecAsyncForeignScanRequest(EState *estate, PendingAsyncRequest *areq)
  *		In async mode, configure for a wait
  * ----------------------------------------------------------------
  */
-void
+bool
 ExecAsyncForeignScanConfigureWait(EState *estate,
 	PendingAsyncRequest *areq, bool reinit)
 {
@@ -386,7 +386,7 @@ ExecAsyncForeignScanConfigureWait(EState *estate,
 	FdwRoutine *fdwroutine = node->fdwroutine;
 
 	Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
-	fdwroutine->ForeignAsyncConfigureWait(estate, areq, reinit);
+	return fdwroutine->ForeignAsyncConfigureWait(estate, areq, reinit);
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 23b4e18..72d8cd6 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -219,6 +219,7 @@ _copyAppend(const Append *from)
 	 */
 	COPY_NODE_FIELD(appendplans);
 	COPY_SCALAR_FIELD(nasyncplans);
+	COPY_SCALAR_FIELD(referent);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index dc5b938..1ebdc48 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -360,6 +360,7 @@ _outAppend(StringInfo str, const Append *node)
 
 	WRITE_NODE_FIELD(appendplans);
 	WRITE_INT_FIELD(nasyncplans);
+	WRITE_INT_FIELD(referent);
 }
 
 static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 69453b5..8443a62 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1520,6 +1520,7 @@ _readAppend(void)
 
 	READ_NODE_FIELD(appendplans);
 	READ_INT_FIELD(nasyncplans);
+	READ_INT_FIELD(referent);
 
 	READ_DONE();
 }
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 7caa8d3..ff1d663 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -193,7 +193,8 @@ static CteScan *make_ctescan(List *qptlist, List *qpqual,
 			 Index scanrelid, int ctePlanId, int cteParam);
 static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual,
 				   Index scanrelid, int wtParam);
-static Append *make_append(List *asyncplans, int nasyncplans, List *tlist);
+static Append *make_append(List *asyncplans, int nasyncplans,
+						   int referent, List *tlist);
 static RecursiveUnion *make_recursive_union(List *tlist,
 					 Plan *lefttree,
 					 Plan *righttree,
@@ -960,6 +961,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
 	List	   *syncplans = NIL;
 	ListCell   *subpaths;
 	int			nasyncplans = 0;
+	bool		first = true;
+	bool		referent_is_sync = true;
 
 	/*
 	 * The subpaths list could be empty, if every child was proven empty by
@@ -985,7 +988,14 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
 		return plan;
 	}
 
-	/* Build the plan for each child */
+	/*
+	 * Build the plan for each child.
+	 *
+	 * The first child in an inheritance set is the representative used when
+	 * deparsing tlist entries (see set_deparse_planstate).  Async children are
+	 * placed ahead of sync ones in the subplan list, so record where the
+	 * first child ends up as the Append's referent.
+	 */
 	foreach(subpaths, best_path->subpaths)
 	{
 		Path	   *subpath = (Path *) lfirst(subpaths);
@@ -999,9 +1009,13 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
 		{
 			asyncplans = lappend(asyncplans, subplan);
 			++nasyncplans;
+			if (first)
+				referent_is_sync = false;
 		}
 		else
 			syncplans = lappend(syncplans, subplan);
+
+		first = false;
 	}
 
 	/*
@@ -1011,7 +1025,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
 	 * parent-rel Vars it'll be asked to emit.
 	 */
 
-	plan = make_append(list_concat(asyncplans, syncplans), nasyncplans, tlist);
+	plan = make_append(list_concat(asyncplans, syncplans), nasyncplans,
+					   referent_is_sync ? nasyncplans : 0, tlist);
 
 	copy_generic_path_info(&plan->plan, (Path *) best_path);
 
@@ -4951,7 +4966,7 @@ make_foreignscan(List *qptlist,
 }
 
 static Append *
-make_append(List *appendplans, int nasyncplans, List *tlist)
+make_append(List *appendplans, int nasyncplans,	int referent, List *tlist)
 {
 	Append	   *node = makeNode(Append);
 	Plan	   *plan = &node->plan;
@@ -4962,6 +4977,7 @@ make_append(List *appendplans, int nasyncplans, List *tlist)
 	plan->righttree = NULL;
 	node->appendplans = appendplans;
 	node->nasyncplans = nasyncplans;
+	node->referent = referent;
 
 	return node;
 }
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 8a81d7a..de0e96c 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -4056,7 +4056,11 @@ set_deparse_planstate(deparse_namespace *dpns, PlanState *ps)
 	 * lists containing references to non-target relations.
 	 */
 	if (IsA(ps, AppendState))
-		dpns->outer_planstate = ((AppendState *) ps)->appendplans[0];
+	{
+		int idx = ((Append*)(((AppendState *) ps)->ps.plan))->referent;
+		dpns->outer_planstate =
+			((AppendState *) ps)->appendplans[idx];
+	}
 	else if (IsA(ps, MergeAppendState))
 		dpns->outer_planstate = ((MergeAppendState *) ps)->mergeplans[0];
 	else if (IsA(ps, ModifyTableState))
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index 3e69ab0..47a3920 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -31,7 +31,7 @@ extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
 
 extern void ExecAsyncForeignScanRequest(EState *estate,
 	PendingAsyncRequest *areq);
-extern void ExecAsyncForeignScanConfigureWait(EState *estate,
+extern bool ExecAsyncForeignScanConfigureWait(EState *estate,
 	PendingAsyncRequest *areq, bool reinit);
 extern void ExecAsyncForeignScanNotify(EState *estate,
 	PendingAsyncRequest *areq);
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 88feb9a..65517fd 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -158,7 +158,7 @@ typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root,
 typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path);
 typedef void (*ForeignAsyncRequest_function) (EState *estate,
 											PendingAsyncRequest *areq);
-typedef void (*ForeignAsyncConfigureWait_function) (EState *estate,
+typedef bool (*ForeignAsyncConfigureWait_function) (EState *estate,
 											PendingAsyncRequest *areq,
 											bool reinit);
 typedef void (*ForeignAsyncNotify_function) (EState *estate,
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index b50b41c..0c6af86 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -352,6 +352,13 @@ typedef struct ResultRelInfo
  * State for an asynchronous tuple request.
  * ----------------
  */
+typedef enum AsyncRequestState
+{
+	ASYNC_IDLE,
+	ASYNC_WAITING,
+	ASYNC_CALLBACK_PENDING,
+	ASYNC_COMPLETE
+} AsyncRequestState;
 typedef struct PendingAsyncRequest
 {
 	int			myindex;			/* Index in es_pending_async. */
@@ -360,8 +367,7 @@ typedef struct PendingAsyncRequest
 	int			request_index;	/* Scratch space for requestor. */
 	int			num_fd_events;	/* Max number of FD events requestee needs. */
 	bool		wants_process_latch;	/* Requestee cares about MyLatch. */
-	bool		callback_pending;			/* Callback is needed. */
-	bool		request_complete;		/* Request complete, result valid. */
+	AsyncRequestState state;
 	Node	   *result;			/* Result (NULL if no more tuples). */
 } PendingAsyncRequest;
 
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 327119b..1df6693 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -209,6 +209,7 @@ typedef struct Append
 	Plan		plan;
 	List	   *appendplans;
 	int			nasyncplans;	/* # of async plans, always at start of list */
+	int			referent;		/* index in appendplans of the first child */
 } Append;
 
 /* ----------------
-- 
2.9.2

