From 1337546fee26e5a80372c090acebc8bc53de3508 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Mon, 17 Oct 2016 16:00:56 +0900
Subject: [PATCH 4/4] Make postgres_fdw async-capable

---
 contrib/postgres_fdw/connection.c              |  79 ++--
 contrib/postgres_fdw/expected/postgres_fdw.out |  64 ++--
 contrib/postgres_fdw/postgres_fdw.c            | 483 +++++++++++++++++++++----
 contrib/postgres_fdw/postgres_fdw.h            |   2 +
 contrib/postgres_fdw/sql/postgres_fdw.sql      |   4 +-
 src/backend/executor/execProcnode.c            |   9 +
 src/include/foreign/fdwapi.h                   |   2 +
 7 files changed, 510 insertions(+), 133 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index bcdddc2..ebc9417 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -49,6 +49,7 @@ typedef struct ConnCacheEntry
 								 * one level of subxact open, etc */
 	bool		have_prep_stmt; /* have we prepared any stmts in this xact? */
 	bool		have_error;		/* have any subxacts aborted in this xact? */
+	void		*storage;		/* connection specific storage */
 } ConnCacheEntry;
 
 /*
@@ -64,6 +65,7 @@ static unsigned int prep_stmt_number = 0;
 static bool xact_got_connection = false;
 
 /* prototypes of private functions */
+static ConnCacheEntry *get_connection_entry(Oid umid);
 static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
 static void check_conn_params(const char **keywords, const char **values);
 static void configure_remote_session(PGconn *conn);
@@ -75,26 +77,12 @@ static void pgfdw_subxact_callback(SubXactEvent event,
 					   SubTransactionId parentSubid,
 					   void *arg);
 
-
 /*
- * Get a PGconn which can be used to execute queries on the remote PostgreSQL
- * server with the user's authorization.  A new connection is established
- * if we don't already have a suitable one, and a transaction is opened at
- * the right subtransaction nesting depth if we didn't do that already.
- *
- * will_prep_stmt must be true if caller intends to create any prepared
- * statements.  Since those don't go away automatically at transaction end
- * (not even on error), we need this flag to cue manual cleanup.
- *
- * XXX Note that caching connections theoretically requires a mechanism to
- * detect change of FDW objects to invalidate already established connections.
- * We could manage that by watching for invalidation events on the relevant
- * syscaches.  For the moment, though, it's not clear that this would really
- * be useful and not mere pedantry.  We could not flush any active connections
- * mid-transaction anyway.
+ * Common function to acquire or create a connection cache entry.
  */
-PGconn *
-GetConnection(UserMapping *user, bool will_prep_stmt)
+
+static ConnCacheEntry *
+get_connection_entry(Oid umid)
 {
 	bool		found;
 	ConnCacheEntry *entry;
@@ -122,11 +110,8 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 		RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
 	}
 
-	/* Set flag that we did GetConnection during the current transaction */
-	xact_got_connection = true;
-
 	/* Create hash key for the entry.  Assume no pad bytes in key struct */
-	key = user->umid;
+	key = umid;
 
 	/*
 	 * Find or create cached entry for requested connection.
@@ -139,8 +124,39 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 		entry->xact_depth = 0;
 		entry->have_prep_stmt = false;
 		entry->have_error = false;
+		entry->storage = NULL;
 	}
 
+	return entry;
+}
+
+/*
+ * Get a PGconn which can be used to execute queries on the remote PostgreSQL
+ * server with the user's authorization.  A new connection is established
+ * if we don't already have a suitable one, and a transaction is opened at
+ * the right subtransaction nesting depth if we didn't do that already.
+ *
+ * will_prep_stmt must be true if caller intends to create any prepared
+ * statements.  Since those don't go away automatically at transaction end
+ * (not even on error), we need this flag to cue manual cleanup.
+ *
+ * XXX Note that caching connections theoretically requires a mechanism to
+ * detect change of FDW objects to invalidate already established connections.
+ * We could manage that by watching for invalidation events on the relevant
+ * syscaches.  For the moment, though, it's not clear that this would really
+ * be useful and not mere pedantry.  We could not flush any active connections
+ * mid-transaction anyway.
+ */
+PGconn *
+GetConnection(UserMapping *user, bool will_prep_stmt)
+{
+	ConnCacheEntry *entry;
+
+	/* Set flag that we did GetConnection during the current transaction */
+	xact_got_connection = true;
+
+	entry = get_connection_entry(user->umid);
+
 	/*
 	 * We don't check the health of cached connection here, because it would
 	 * require some overhead.  Broken connection will be detected when the
@@ -177,6 +193,25 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 }
 
 /*
+ * Returns the connection-specific storage for this user.  If it does not
+ * exist yet, it is allocated with initsize bytes and zero-filled.
+ */
+void *
+GetConnectionSpecificStorage(UserMapping *user, size_t initsize)
+{
+	ConnCacheEntry *cached = get_connection_entry(user->umid);
+
+	/* An earlier call already set the area up: hand it back unchanged. */
+	if (cached->storage != NULL)
+		return cached->storage;
+
+	/* First request for this entry: create a zero-filled area. */
+	cached->storage = MemoryContextAlloc(CacheMemoryContext, initsize);
+	memset(cached->storage, 0, initsize);
+	return cached->storage;
+}
+
+/*
  * Connect to remote server using specified server and user mapping properties.
  */
 static PGconn *
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index d429790..a53fff4 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -5082,12 +5082,12 @@ INSERT INTO b(aa) VALUES('bbbbb');
 SELECT tableoid::regclass, * FROM a;
  tableoid |  aa   
 ----------+-------
- b        | bbb
- b        | bbbb
- b        | bbbbb
  a        | aaa
  a        | aaaa
  a        | aaaaa
+ b        | bbb
+ b        | bbbb
+ b        | bbbbb
 (6 rows)
 
 SELECT tableoid::regclass, * FROM b;
@@ -5110,12 +5110,12 @@ UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
 SELECT tableoid::regclass, * FROM a;
  tableoid |   aa   
 ----------+--------
- b        | bbb
- b        | bbbb
- b        | bbbbb
  a        | aaa
  a        | zzzzzz
  a        | zzzzzz
+ b        | bbb
+ b        | bbbb
+ b        | bbbbb
 (6 rows)
 
 SELECT tableoid::regclass, * FROM b;
@@ -5138,12 +5138,12 @@ UPDATE b SET aa = 'new';
 SELECT tableoid::regclass, * FROM a;
  tableoid |   aa   
 ----------+--------
- b        | new
- b        | new
- b        | new
  a        | aaa
  a        | zzzzzz
  a        | zzzzzz
+ b        | new
+ b        | new
+ b        | new
 (6 rows)
 
 SELECT tableoid::regclass, * FROM b;
@@ -5166,12 +5166,12 @@ UPDATE a SET aa = 'newtoo';
 SELECT tableoid::regclass, * FROM a;
  tableoid |   aa   
 ----------+--------
- b        | newtoo
- b        | newtoo
- b        | newtoo
  a        | newtoo
  a        | newtoo
  a        | newtoo
+ b        | newtoo
+ b        | newtoo
+ b        | newtoo
 (6 rows)
 
 SELECT tableoid::regclass, * FROM b;
@@ -5259,9 +5259,9 @@ select * from bar where f1 in (select f1 from foo) for update;
 select * from bar where f1 in (select f1 from foo) for update;
  f1 | f2 
 ----+----
+  1 | 11
   3 | 33
   4 | 44
-  1 | 11
   2 | 22
 (4 rows)
 
@@ -5296,9 +5296,9 @@ select * from bar where f1 in (select f1 from foo) for share;
 select * from bar where f1 in (select f1 from foo) for share;
  f1 | f2 
 ----+----
+  1 | 11
   3 | 33
   4 | 44
-  1 | 11
   2 | 22
 (4 rows)
 
@@ -5561,27 +5561,33 @@ delete from foo where f1 < 5 returning *;
 (5 rows)
 
 explain (verbose, costs off)
-update bar set f2 = f2 + 100 returning *;
-                                  QUERY PLAN                                  
-------------------------------------------------------------------------------
- Update on public.bar
-   Output: bar.f1, bar.f2
-   Update on public.bar
-   Foreign Update on public.bar2
-   ->  Seq Scan on public.bar
-         Output: bar.f1, (bar.f2 + 100), bar.ctid
-   ->  Foreign Update on public.bar2
-         Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2
-(8 rows)
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
+                                      QUERY PLAN                                      
+--------------------------------------------------------------------------------------
+ Sort
+   Output: u.f1, u.f2
+   Sort Key: u.f1
+   CTE u
+     ->  Update on public.bar
+           Output: bar.f1, bar.f2
+           Update on public.bar
+           Foreign Update on public.bar2
+           ->  Seq Scan on public.bar
+                 Output: bar.f1, (bar.f2 + 100), bar.ctid
+           ->  Foreign Update on public.bar2
+                 Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2
+   ->  CTE Scan on u
+         Output: u.f1, u.f2
+(14 rows)
 
-update bar set f2 = f2 + 100 returning *;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
  f1 | f2  
 ----+-----
-  2 | 322
   1 | 311
-  6 | 266
+  2 | 322
   3 | 333
   4 | 344
+  6 | 266
   7 | 277
 (6 rows)
 
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 997bd6c..c2b5b17 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -34,6 +34,7 @@
 #include "optimizer/var.h"
 #include "optimizer/tlist.h"
 #include "parser/parsetree.h"
+#include "pgstat.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/lsyscache.h"
@@ -52,6 +53,9 @@ PG_MODULE_MAGIC;
 /* If no remote estimates, assume a sort costs 20% extra */
 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
 
+/* Retrieve PgFdwScanState struct from ForeignScanState */
+#define GetPgFdwScanState(n) ((PgFdwScanState *)(n)->fdw_state)
+
 /*
  * Indexes of FDW-private information stored in fdw_private lists.
  *
@@ -121,10 +125,27 @@ enum FdwDirectModifyPrivateIndex
 };
 
 /*
+ * Connection private area structure.
+ */
+ typedef struct PgFdwConnspecate
+{
+	ForeignScanState *current_owner;	/* The node whose query is in flight on
+										 * this connection; NULL if vacant */
+} PgFdwConnspecate;
+
+/* Common leading member of the scan, modify and direct-modify state structs */
+typedef struct PgFdwState
+{
+	PGconn	   *conn;			/* connection to the foreign server */
+	PgFdwConnspecate *connspec;	/* storage attached to that connection */
+} PgFdwState;
+
+/*
  * Execution state of a foreign scan using postgres_fdw.
  */
 typedef struct PgFdwScanState
 {
+	PgFdwState	s;				/* common structure */
 	Relation	rel;			/* relcache entry for the foreign table. NULL
 								 * for a foreign join scan. */
 	TupleDesc	tupdesc;		/* tuple descriptor of scan */
@@ -135,7 +156,7 @@ typedef struct PgFdwScanState
 	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
+	bool		result_ready;
 	unsigned int cursor_number; /* quasi-unique ID for my cursor */
 	bool		cursor_exists;	/* have we created the cursor? */
 	int			numParams;		/* number of parameters passed to query */
@@ -151,6 +172,13 @@ typedef struct PgFdwScanState
 	/* batch-level state, for optimizing rewinds and avoiding useless fetch */
 	int			fetch_ct_2;		/* Min(# of fetches done, 2) */
 	bool		eof_reached;	/* true if last fetch reached EOF */
+	bool		run_async;		/* true if run asynchronously */
+	bool		async_waiting;	/* true if requesting the parent to wait */
+	ForeignScanState *waiter;	/* Next node to run a query among nodes
+								 * sharing the same connection */
+	ForeignScanState *last_waiter;	/* A waiting node at the end of a waiting
+								 * list. Maintained only by the current
+									 * owner of the connection */
 
 	/* working memory contexts */
 	MemoryContext batch_cxt;	/* context holding current batch of tuples */
@@ -164,11 +192,11 @@ typedef struct PgFdwScanState
  */
 typedef struct PgFdwModifyState
 {
+	PgFdwState	s;				/* common structure */
 	Relation	rel;			/* relcache entry for the foreign table */
 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
 	char	   *p_name;			/* name of prepared statement, if created */
 
 	/* extracted fdw_private data */
@@ -191,6 +219,7 @@ typedef struct PgFdwModifyState
  */
 typedef struct PgFdwDirectModifyState
 {
+	PgFdwState	s;				/* common structure */
 	Relation	rel;			/* relcache entry for the foreign table */
 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
 
@@ -289,6 +318,7 @@ static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
 static void postgresReScanForeignScan(ForeignScanState *node);
 static void postgresEndForeignScan(ForeignScanState *node);
+static void postgresShutdownForeignScan(ForeignScanState *node);
 static void postgresAddForeignUpdateTargets(Query *parsetree,
 								RangeTblEntry *target_rte,
 								Relation target_relation);
@@ -349,8 +379,8 @@ static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
 static void postgresForeignAsyncRequest(EState *estate,
 							PendingAsyncRequest *areq);
 static bool postgresForeignAsyncConfigureWait(EState *estate,
-								  PendingAsyncRequest *areq,
-								  bool reinit);
+						    PendingAsyncRequest *areq,
+						    bool reinit);
 static void postgresForeignAsyncNotify(EState *estate,
 						   PendingAsyncRequest *areq);
 
@@ -373,7 +403,10 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 						  EquivalenceClass *ec, EquivalenceMember *em,
 						  void *arg);
 static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
+static void request_more_data(ForeignScanState *node);
+static void fetch_received_data(ForeignScanState *node);
+static void vacate_connection(PgFdwState *fdwconn);
+static void absorb_current_result(ForeignScanState *node);
 static void close_cursor(PGconn *conn, unsigned int cursor_number);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
@@ -434,6 +467,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	routine->IterateForeignScan = postgresIterateForeignScan;
 	routine->ReScanForeignScan = postgresReScanForeignScan;
 	routine->EndForeignScan = postgresEndForeignScan;
+	routine->ShutdownForeignScan = postgresShutdownForeignScan;
 
 	/* Functions for updating foreign tables */
 	routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
@@ -1314,12 +1348,21 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 	 * Get connection to the foreign server.  Connection manager will
 	 * establish new connection if necessary.
 	 */
-	fsstate->conn = GetConnection(user, false);
+	fsstate->s.conn = GetConnection(user, false);
+	fsstate->s.connspec = (PgFdwConnspecate *)
+		GetConnectionSpecificStorage(user, sizeof(PgFdwConnspecate));
+	fsstate->s.connspec->current_owner = NULL;
+	fsstate->waiter = NULL;
+	fsstate->last_waiter = node;
 
 	/* Assign a unique ID for my cursor */
-	fsstate->cursor_number = GetCursorNumber(fsstate->conn);
+	fsstate->cursor_number = GetCursorNumber(fsstate->s.conn);
 	fsstate->cursor_exists = false;
 
+	/* Initialize async execution status */
+	fsstate->run_async = false;
+	fsstate->async_waiting = false;
+
 	/* Get private info created by planner functions. */
 	fsstate->query = strVal(list_nth(fsplan->fdw_private,
 									 FdwScanPrivateSelectSql));
@@ -1375,32 +1418,126 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 static TupleTableSlot *
 postgresIterateForeignScan(ForeignScanState *node)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
 
 	/*
-	 * If this is the first call after Begin or ReScan, we need to create the
-	 * cursor on the remote side.
-	 */
-	if (!fsstate->cursor_exists)
-		create_cursor(node);
-
-	/*
 	 * Get some more tuples, if we've run out.
 	 */
 	if (fsstate->next_tuple >= fsstate->num_tuples)
 	{
-		/* No point in another fetch if we already detected EOF, though. */
-		if (!fsstate->eof_reached)
-			fetch_more_data(node);
-		/* If we didn't get any tuples, must be end of data. */
+		ForeignScanState *next_conn_owner = node;
+
+		/* This node has sent a query on this connection */
+		if (fsstate->s.connspec->current_owner == node)
+		{
+			/* Check if the result is available */
+			if (PQisBusy(fsstate->s.conn))
+			{
+				int rc = WaitLatchOrSocket(NULL,
+										   WL_SOCKET_READABLE | WL_TIMEOUT,
+										   PQsocket(fsstate->s.conn), 0,
+										   WAIT_EVENT_ASYNC_WAIT);
+				if (fsstate->run_async && !(rc & WL_SOCKET_READABLE))
+				{
+					/*
+					 * This node is not ready yet. Tell the caller to wait.
+					 */
+					fsstate->result_ready = false;
+					return ExecClearTuple(slot);
+				}
+			}
+
+			Assert(fsstate->async_waiting);
+			fsstate->async_waiting = false;
+			fetch_received_data(node);
+
+			/*
+			 * If another node is waiting for this one on the same connection,
+			 * let the first waiter be the next owner of this connection.
+			 */
+			if (fsstate->waiter)
+			{
+				PgFdwScanState *next_owner_state;
+
+				next_conn_owner = fsstate->waiter;
+				next_owner_state = GetPgFdwScanState(next_conn_owner);
+				fsstate->waiter = NULL;
+
+				/*
+				 * only the current owner is responsible to maintain the shortcut
+				 * to the last waiter
+				 */
+				next_owner_state->last_waiter = fsstate->last_waiter;
+
+				/*
+				 * for simplicity, last_waiter points itself on a node that no one
+				 * is waiting for.
+				 */
+				fsstate->last_waiter = node;
+			}
+		}
+		else if (fsstate->s.connspec->current_owner)
+		{
+			/*
+			 * Some other node holds this connection. Add myself to the tail
+			 * of the waiters' list then return not-ready.  To avoid scanning
+			 * through the waiters' list, the current owner is to maintain the
+			 * shortcut to the last waiter.
+			 */
+			PgFdwScanState *conn_owner_state =
+				GetPgFdwScanState(fsstate->s.connspec->current_owner);
+			ForeignScanState *last_waiter = conn_owner_state->last_waiter;
+			PgFdwScanState *last_waiter_state = GetPgFdwScanState(last_waiter);
+
+			last_waiter_state->waiter = node;
+			conn_owner_state->last_waiter = node;
+
+			/* Register the node to the async-waiting node list */
+			Assert(!GetPgFdwScanState(node)->async_waiting);
+
+			GetPgFdwScanState(node)->async_waiting = true;
+
+			fsstate->result_ready = fsstate->eof_reached;
+			return ExecClearTuple(slot);
+		}
+
+		/*
+		 * Send the next request for the next owner of this connection if
+		 * needed.
+		 */
+
+		if (!GetPgFdwScanState(next_conn_owner)->eof_reached)
+		{
+			PgFdwScanState *next_owner_state =
+				GetPgFdwScanState(next_conn_owner);
+
+			request_more_data(next_conn_owner);
+
+			/* Register the node to the async-waiting node list */
+			if (!next_owner_state->async_waiting)
+				next_owner_state->async_waiting = true;
+
+			if (!next_owner_state->run_async)
+				fetch_received_data(next_conn_owner);
+		}
+
+
+		/*
+		 * If we haven't received a result for the given node this time,
+		 * return with no tuple to give way to other nodes.
+		 */
 		if (fsstate->next_tuple >= fsstate->num_tuples)
+		{
+			fsstate->result_ready = fsstate->eof_reached;
 			return ExecClearTuple(slot);
+		}
 	}
 
 	/*
 	 * Return the next tuple.
 	 */
+	fsstate->result_ready = true;
 	ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
 				   slot,
 				   InvalidBuffer,
@@ -1416,7 +1553,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 static void
 postgresReScanForeignScan(ForeignScanState *node)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
 	char		sql[64];
 	PGresult   *res;
 
@@ -1424,6 +1561,9 @@ postgresReScanForeignScan(ForeignScanState *node)
 	if (!fsstate->cursor_exists)
 		return;
 
+	/* Absorb the remaining result */
+	absorb_current_result(node);
+
 	/*
 	 * If any internal parameters affecting this node have changed, we'd
 	 * better destroy and recreate the cursor.  Otherwise, rewinding it should
@@ -1452,9 +1592,9 @@ postgresReScanForeignScan(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_exec_query(fsstate->conn, sql);
+	res = pgfdw_exec_query(fsstate->s.conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
-		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
+		pgfdw_report_error(ERROR, res, fsstate->s.conn, true, sql);
 	PQclear(res);
 
 	/* Now force a fresh FETCH. */
@@ -1472,7 +1612,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 static void
 postgresEndForeignScan(ForeignScanState *node)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
 
 	/* if fsstate is NULL, we are in EXPLAIN; nothing to do */
 	if (fsstate == NULL)
@@ -1480,16 +1620,32 @@ postgresEndForeignScan(ForeignScanState *node)
 
 	/* Close the cursor if open, to prevent accumulation of cursors */
 	if (fsstate->cursor_exists)
-		close_cursor(fsstate->conn, fsstate->cursor_number);
+		close_cursor(fsstate->s.conn, fsstate->cursor_number);
 
 	/* Release remote connection */
-	ReleaseConnection(fsstate->conn);
-	fsstate->conn = NULL;
+	ReleaseConnection(fsstate->s.conn);
+	fsstate->s.conn = NULL;
 
 	/* MemoryContexts will be deleted automatically. */
 }
 
 /*
+ * postgresShutdownForeignScan
+ *		Remove asynchrony stuff and cleanup garbage on the connection.
+ */
+static void
+postgresShutdownForeignScan(ForeignScanState *node)
+{
+	ForeignScan *fscan = (ForeignScan *) node->ss.ps.plan;
+
+	/*
+	 * Absorb the remaining result, but only for SELECT scans.
+	 */
+	if (fscan->operation == CMD_SELECT)
+		absorb_current_result(node);
+}
+
+/*
  * postgresAddForeignUpdateTargets
  *		Add resjunk column(s) needed for update/delete on a foreign table
  */
@@ -1691,7 +1847,9 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 	user = GetUserMapping(userid, table->serverid);
 
 	/* Open connection; report that we'll create a prepared statement. */
-	fmstate->conn = GetConnection(user, true);
+	fmstate->s.conn = GetConnection(user, true);
+	fmstate->s.connspec = (PgFdwConnspecate *)
+		GetConnectionSpecificStorage(user, sizeof(PgFdwConnspecate));
 	fmstate->p_name = NULL;		/* prepared statement not made yet */
 
 	/* Deconstruct fdw_private data. */
@@ -1770,6 +1928,8 @@ postgresExecForeignInsert(EState *estate,
 	PGresult   *res;
 	int			n_rows;
 
+	vacate_connection((PgFdwState *)fmstate);
+
 	/* Set up the prepared statement on the remote server, if we didn't yet */
 	if (!fmstate->p_name)
 		prepare_foreign_modify(fmstate);
@@ -1780,14 +1940,14 @@ postgresExecForeignInsert(EState *estate,
 	/*
 	 * Execute the prepared statement.
 	 */
-	if (!PQsendQueryPrepared(fmstate->conn,
+	if (!PQsendQueryPrepared(fmstate->s.conn,
 							 fmstate->p_name,
 							 fmstate->p_nums,
 							 p_values,
 							 NULL,
 							 NULL,
 							 0))
-		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+		pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
 
 	/*
 	 * Get the result, and check for success.
@@ -1795,10 +1955,10 @@ postgresExecForeignInsert(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+	res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+		pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
 
 	/* Check number of rows affected, and fetch RETURNING tuple if any */
 	if (fmstate->has_returning)
@@ -1836,6 +1996,8 @@ postgresExecForeignUpdate(EState *estate,
 	PGresult   *res;
 	int			n_rows;
 
+	vacate_connection((PgFdwState *)fmstate);
+
 	/* Set up the prepared statement on the remote server, if we didn't yet */
 	if (!fmstate->p_name)
 		prepare_foreign_modify(fmstate);
@@ -1856,14 +2018,14 @@ postgresExecForeignUpdate(EState *estate,
 	/*
 	 * Execute the prepared statement.
 	 */
-	if (!PQsendQueryPrepared(fmstate->conn,
+	if (!PQsendQueryPrepared(fmstate->s.conn,
 							 fmstate->p_name,
 							 fmstate->p_nums,
 							 p_values,
 							 NULL,
 							 NULL,
 							 0))
-		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+		pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
 
 	/*
 	 * Get the result, and check for success.
@@ -1871,10 +2033,10 @@ postgresExecForeignUpdate(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+	res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+		pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
 
 	/* Check number of rows affected, and fetch RETURNING tuple if any */
 	if (fmstate->has_returning)
@@ -1912,6 +2074,8 @@ postgresExecForeignDelete(EState *estate,
 	PGresult   *res;
 	int			n_rows;
 
+	vacate_connection((PgFdwState *)fmstate);
+
 	/* Set up the prepared statement on the remote server, if we didn't yet */
 	if (!fmstate->p_name)
 		prepare_foreign_modify(fmstate);
@@ -1932,14 +2096,14 @@ postgresExecForeignDelete(EState *estate,
 	/*
 	 * Execute the prepared statement.
 	 */
-	if (!PQsendQueryPrepared(fmstate->conn,
+	if (!PQsendQueryPrepared(fmstate->s.conn,
 							 fmstate->p_name,
 							 fmstate->p_nums,
 							 p_values,
 							 NULL,
 							 NULL,
 							 0))
-		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+		pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
 
 	/*
 	 * Get the result, and check for success.
@@ -1947,10 +2111,10 @@ postgresExecForeignDelete(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+	res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+		pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
 
 	/* Check number of rows affected, and fetch RETURNING tuple if any */
 	if (fmstate->has_returning)
@@ -1997,16 +2161,16 @@ postgresEndForeignModify(EState *estate,
 		 * We don't use a PG_TRY block here, so be careful not to throw error
 		 * without releasing the PGresult.
 		 */
-		res = pgfdw_exec_query(fmstate->conn, sql);
+		res = pgfdw_exec_query(fmstate->s.conn, sql);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
-			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
+			pgfdw_report_error(ERROR, res, fmstate->s.conn, true, sql);
 		PQclear(res);
 		fmstate->p_name = NULL;
 	}
 
 	/* Release remote connection */
-	ReleaseConnection(fmstate->conn);
-	fmstate->conn = NULL;
+	ReleaseConnection(fmstate->s.conn);
+	fmstate->s.conn = NULL;
 }
 
 /*
@@ -2286,7 +2450,9 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
 	 * Get connection to the foreign server.  Connection manager will
 	 * establish new connection if necessary.
 	 */
-	dmstate->conn = GetConnection(user, false);
+	dmstate->s.conn = GetConnection(user, false);
+	dmstate->s.connspec = (PgFdwConnspecate *)
+		GetConnectionSpecificStorage(user, sizeof(PgFdwConnspecate));
 
 	/* Initialize state variable */
 	dmstate->num_tuples = -1;	/* -1 means not set yet */
@@ -2339,7 +2505,10 @@ postgresIterateDirectModify(ForeignScanState *node)
 	 * If this is the first call after Begin, execute the statement.
 	 */
 	if (dmstate->num_tuples == -1)
+	{
+		vacate_connection((PgFdwState *)dmstate);
 		execute_dml_stmt(node);
+	}
 
 	/*
 	 * If the local query doesn't specify RETURNING, just clear tuple slot.
@@ -2386,8 +2555,8 @@ postgresEndDirectModify(ForeignScanState *node)
 		PQclear(dmstate->result);
 
 	/* Release remote connection */
-	ReleaseConnection(dmstate->conn);
-	dmstate->conn = NULL;
+	ReleaseConnection(dmstate->s.conn);
+	dmstate->s.conn = NULL;
 
 	/* MemoryContext will be deleted automatically. */
 }
@@ -2505,6 +2674,7 @@ estimate_path_cost_size(PlannerInfo *root,
 		List	   *local_param_join_conds;
 		StringInfoData sql;
 		PGconn	   *conn;
+		PgFdwConnspecate *connspec;
 		Selectivity local_sel;
 		QualCost	local_cost;
 		List	   *fdw_scan_tlist = NIL;
@@ -2547,6 +2717,16 @@ estimate_path_cost_size(PlannerInfo *root,
 
 		/* Get the remote estimate */
 		conn = GetConnection(fpinfo->user, false);
+		connspec = GetConnectionSpecificStorage(fpinfo->user,
+												sizeof(PgFdwConnspecate));
+		if (connspec)
+		{
+			PgFdwState tmpstate;
+			tmpstate.conn = conn;
+			tmpstate.connspec = connspec;
+			vacate_connection(&tmpstate);
+		}
+
 		get_remote_estimate(sql.data, conn, &rows, &width,
 							&startup_cost, &total_cost);
 		ReleaseConnection(conn);
@@ -2826,11 +3006,11 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 static void
 create_cursor(ForeignScanState *node)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
 	int			numParams = fsstate->numParams;
 	const char **values = fsstate->param_values;
-	PGconn	   *conn = fsstate->conn;
+	PGconn	   *conn = fsstate->s.conn;
 	StringInfoData buf;
 	PGresult   *res;
 
@@ -2896,47 +3076,96 @@ create_cursor(ForeignScanState *node)
  * Fetch some more rows from the node's cursor.
  */
 static void
-fetch_more_data(ForeignScanState *node)
+request_more_data(ForeignScanState *node)
+{
+	PgFdwScanState *scan = GetPgFdwScanState(node);
+	PgFdwConnspecate *shared = scan->s.connspec;
+	char		fetch_sql[64];
+
+	/* Nobody may have a query in flight on the connection at this point. */
+	Assert(shared->current_owner == NULL);
+
+	/* The remote cursor is created lazily, after Begin or ReScan. */
+	if (!scan->cursor_exists)
+		create_cursor(node);
+
+	/* Send the FETCH without waiting for its result. */
+	snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
+			 scan->fetch_size, scan->cursor_number);
+	if (PQsendQuery(scan->s.conn, fetch_sql) == 0)
+		pgfdw_report_error(ERROR, NULL, scan->s.conn, false, fetch_sql);
+
+	/*
+	 * This node now owns the connection until the result is consumed.
+	 */
+	shared->current_owner = node;
+}
+
+/*
+ * Receive the result of the FETCH sent by request_more_data().
+ */
+static void
+fetch_received_data(ForeignScanState *node)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
 	PGresult   *volatile res = NULL;
 	MemoryContext oldcontext;
 
+	/* I should be the current connection owner */
+	Assert(fsstate->s.connspec->current_owner == node);
+
 	/*
 	 * We'll store the tuples in the batch_cxt.  First, flush the previous
-	 * batch.
+	 * batch if no tuple is remaining
 	 */
-	fsstate->tuples = NULL;
-	MemoryContextReset(fsstate->batch_cxt);
+	if (fsstate->next_tuple >= fsstate->num_tuples)
+	{
+		fsstate->tuples = NULL;
+		fsstate->num_tuples = 0;
+		MemoryContextReset(fsstate->batch_cxt);
+	}
+	else if (fsstate->next_tuple > 0)
+	{
+		/* move the remaining tuples to the beginning of the store */
+		int n = 0;
+
+		while(fsstate->next_tuple < fsstate->num_tuples)
+			fsstate->tuples[n++] = fsstate->tuples[fsstate->next_tuple++];
+		fsstate->num_tuples = n;
+	}
+
 	oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
 
 	/* PGresult must be released before leaving this function. */
 	PG_TRY();
 	{
-		PGconn	   *conn = fsstate->conn;
+		PGconn	   *conn = fsstate->s.conn;
 		char		sql[64];
-		int			numrows;
+		int			addrows;
+		size_t		newsize;
 		int			i;
 
 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
 				 fsstate->fetch_size, fsstate->cursor_number);
 
-		res = pgfdw_exec_query(conn, sql);
+		res = pgfdw_get_result(conn, sql);
 		/* On error, report the original query, not the FETCH. */
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 
 		/* Convert the data into HeapTuples */
-		numrows = PQntuples(res);
-		fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
-		fsstate->num_tuples = numrows;
-		fsstate->next_tuple = 0;
+		addrows = PQntuples(res);
+		newsize = (fsstate->num_tuples + addrows) * sizeof(HeapTuple);
+		if (fsstate->tuples)
+			fsstate->tuples = (HeapTuple *) repalloc(fsstate->tuples, newsize);
+		else
+			fsstate->tuples = (HeapTuple *) palloc(newsize);
 
-		for (i = 0; i < numrows; i++)
+		for (i = 0; i < addrows; i++)
 		{
 			Assert(IsA(node->ss.ps.plan, ForeignScan));
 
-			fsstate->tuples[i] =
+			fsstate->tuples[fsstate->num_tuples + i] =
 				make_tuple_from_result_row(res, i,
 										   fsstate->rel,
 										   fsstate->attinmeta,
@@ -2946,27 +3175,82 @@ fetch_more_data(ForeignScanState *node)
 		}
 
 		/* Update fetch_ct_2 */
-		if (fsstate->fetch_ct_2 < 2)
+		if (fsstate->fetch_ct_2 < 2 && fsstate->next_tuple == 0)
 			fsstate->fetch_ct_2++;
 
+		fsstate->next_tuple = 0;
+		fsstate->num_tuples += addrows;
+
 		/* Must be EOF if we didn't get as many tuples as we asked for. */
-		fsstate->eof_reached = (numrows < fsstate->fetch_size);
+		fsstate->eof_reached = (addrows < fsstate->fetch_size);
 
 		PQclear(res);
 		res = NULL;
 	}
 	PG_CATCH();
 	{
+		fsstate->s.connspec->current_owner = NULL;
 		if (res)
 			PQclear(res);
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
 
+	fsstate->s.connspec->current_owner = NULL;
+
 	MemoryContextSwitchTo(oldcontext);
 }
 
 /*
+ * Vacate a connection so that this node can send the next query
+ */
+static void
+vacate_connection(PgFdwState *fdwstate)
+{
+	PgFdwConnspecate *connspec = fdwstate->connspec;
+	ForeignScanState *owner;
+
+	if (connspec == NULL || connspec->current_owner == NULL)
+		return;
+
+	/*
+	 * Let the current connection owner read the result of the running query.
+	 */
+	owner = connspec->current_owner;
+	fetch_received_data(owner);
+
+	/* Clear the waiting list */
+	while (owner)
+	{
+		PgFdwScanState *fsstate = GetPgFdwScanState(owner);
+
+		fsstate->last_waiter = NULL;
+		owner = fsstate->waiter;
+		fsstate->waiter = NULL;
+	}
+}
+
+/*
+ * Absorb the result of the current query.
+ */
+static void
+absorb_current_result(ForeignScanState *node)
+{
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
+	ForeignScanState *owner = fsstate->s.connspec->current_owner;
+
+	if (owner)
+	{
+		PgFdwScanState *target_state = GetPgFdwScanState(owner);
+		PGconn *conn = target_state->s.conn;
+
+		while(PQisBusy(conn))
+			PQclear(PQgetResult(conn));
+		fsstate->s.connspec->current_owner = NULL;
+		fsstate->async_waiting = false;
+	}
+}
+/*
  * Force assorted GUC parameters to settings that ensure that we'll output
  * data values in a form that is unambiguous to the remote server.
  *
@@ -3050,7 +3334,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 
 	/* Construct name we'll use for the prepared statement. */
 	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
-			 GetPrepStmtNumber(fmstate->conn));
+			 GetPrepStmtNumber(fmstate->s.conn));
 	p_name = pstrdup(prep_name);
 
 	/*
@@ -3060,12 +3344,12 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	 * the prepared statements we use in this module are simple enough that
 	 * the remote server will make the right choices.
 	 */
-	if (!PQsendPrepare(fmstate->conn,
+	if (!PQsendPrepare(fmstate->s.conn,
 					   p_name,
 					   fmstate->query,
 					   0,
 					   NULL))
-		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+		pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
 
 	/*
 	 * Get the result, and check for success.
@@ -3073,9 +3357,9 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+	res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
-		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+		pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
 	PQclear(res);
 
 	/* This action shows that the prepare has been done. */
@@ -3206,9 +3490,9 @@ execute_dml_stmt(ForeignScanState *node)
 	 * the desired result.  This allows us to avoid assuming that the remote
 	 * server has the same OIDs we do for the parameters' types.
 	 */
-	if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
+	if (!PQsendQueryParams(dmstate->s.conn, dmstate->query, numParams,
 						   NULL, values, NULL, NULL, 0))
-		pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
+		pgfdw_report_error(ERROR, NULL, dmstate->s.conn, false, dmstate->query);
 
 	/*
 	 * Get the result, and check for success.
@@ -3216,10 +3500,10 @@ execute_dml_stmt(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
+	dmstate->result = pgfdw_get_result(dmstate->s.conn, dmstate->query);
 	if (PQresultStatus(dmstate->result) !=
 		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
+		pgfdw_report_error(ERROR, dmstate->result, dmstate->s.conn, true,
 						   dmstate->query);
 
 	/* Get the number of rows affected. */
@@ -4365,8 +4649,10 @@ postgresIsForeignPathAsyncCapable(ForeignPath *path)
 }
 
 /*
- * XXX. Just for testing purposes, let's run everything through the async
- * mechanism but return tuples synchronously.
+ * Accept an async request. Notify the caller if the next tuple is immediately
+ * available. ExecForeignScan does additional work to finish the returned
+ * tuple, so call it instead of postgresIterateForeignScan to acquire a tuple
+ * in the expected shape.
  */
 static void
 postgresForeignAsyncRequest(EState *estate, PendingAsyncRequest *areq)
@@ -4375,22 +4661,59 @@ postgresForeignAsyncRequest(EState *estate, PendingAsyncRequest *areq)
 	TupleTableSlot *slot;
 
 	Assert(IsA(node, ForeignScanState));
+	GetPgFdwScanState(node)->run_async = true;
 	slot = ExecForeignScan(node);
-	ExecAsyncRequestDone(estate, areq, (Node *) slot);
+	if (GetPgFdwScanState(node)->result_ready)
+		ExecAsyncRequestDone(estate, areq, (Node *) slot);
+	else
+		ExecAsyncSetRequiredEvents(estate, areq, 1, false, false);
 }
 
+/*
+ * Configure the wait event.
+ *
+ * Add a wait event only when the node is the connection owner. Otherwise
+ * another node on this connection is the owner and no event is added.
+ */
 static bool
 postgresForeignAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq,
-								  bool reinit)
+						   bool reinit)
 {
-	elog(ERROR, "postgresForeignAsyncConfigureWait");
+	ForeignScanState *node = (ForeignScanState *) areq->requestee;
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
+
+
+	/* If the caller didn't reinit, this event is already in the event set */
+	if (!reinit)
+		return true;
+
+	if (fsstate->s.connspec->current_owner == node)
+	{
+		AddWaitEventToSet(estate->es_wait_event_set,
+						  WL_SOCKET_READABLE, PQsocket(fsstate->s.conn),
+						  NULL, areq);
+		return true;
+	}
+
 	return false;
 }
 
+/*
+ * Process a notification from the async mechanism. ExecForeignScan does
+ * additional work to complete the returned tuple, so call it instead of
+ * postgresIterateForeignScan to acquire a completed tuple.
+ */
 static void
 postgresForeignAsyncNotify(EState *estate, PendingAsyncRequest *areq)
 {
-	elog(ERROR, "postgresForeignAsyncNotify");
+	ForeignScanState *node = (ForeignScanState *) areq->requestee;
+	TupleTableSlot *slot;
+
+	Assert(IsA(node, ForeignScanState));
+	slot = ExecForeignScan(node);
+	Assert(GetPgFdwScanState(node)->result_ready);
+
+	ExecAsyncRequestDone(estate, areq, (Node *) slot);
 }
 
 /*
@@ -4438,7 +4761,7 @@ make_tuple_from_result_row(PGresult *res,
 		PgFdwScanState *fdw_sstate;
 
 		Assert(fsstate);
-		fdw_sstate = (PgFdwScanState *) fsstate->fdw_state;
+		fdw_sstate = GetPgFdwScanState(fsstate);
 		tupdesc = fdw_sstate->tupdesc;
 	}
 
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 67126bc..9eff0ba 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -79,6 +79,7 @@ typedef struct PgFdwRelationInfo
 	UserMapping *user;			/* only set in use_remote_estimate mode */
 
 	int			fetch_size;		/* fetch size for this remote table */
+	bool		allow_prefetch;	/* true to allow overlapped fetching  */
 
 	/*
 	 * Name of the relation while EXPLAINing ForeignScan. It is used for join
@@ -100,6 +101,7 @@ extern void reset_transmission_modes(int nestlevel);
 
 /* in connection.c */
 extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
+void *GetConnectionSpecificStorage(UserMapping *user, size_t initsize);
 extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 4f68e89..de1d96e 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -1248,8 +1248,8 @@ explain (verbose, costs off)
 delete from foo where f1 < 5 returning *;
 delete from foo where f1 < 5 returning *;
 explain (verbose, costs off)
-update bar set f2 = f2 + 100 returning *;
-update bar set f2 = f2 + 100 returning *;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
 
 drop table foo cascade;
 drop table bar cascade;
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 554244f..f864abe 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -114,6 +114,7 @@
 #include "executor/nodeValuesscan.h"
 #include "executor/nodeWindowAgg.h"
 #include "executor/nodeWorktablescan.h"
+#include "foreign/fdwapi.h"
 #include "nodes/nodeFuncs.h"
 #include "miscadmin.h"
 
@@ -806,6 +807,14 @@ ExecShutdownNode(PlanState *node)
 		case T_GatherState:
 			ExecShutdownGather((GatherState *) node);
 			break;
+		case T_ForeignScanState:
+		{
+			ForeignScanState *fsstate = (ForeignScanState *)node;
+			FdwRoutine *fdwroutine = fsstate->fdwroutine;
+			if (fdwroutine->ShutdownForeignScan)
+				fdwroutine->ShutdownForeignScan((ForeignScanState *) node);
+		}
+		break;
 		default:
 			break;
 	}
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 65517fd..e40db0e 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -163,6 +163,7 @@ typedef bool (*ForeignAsyncConfigureWait_function) (EState *estate,
 											bool reinit);
 typedef void (*ForeignAsyncNotify_function) (EState *estate,
 											PendingAsyncRequest *areq);
+typedef void (*ShutdownForeignScan_function) (ForeignScanState *node);
 
 /*
  * FdwRoutine is the struct returned by a foreign-data wrapper's handler
@@ -239,6 +240,7 @@ typedef struct FdwRoutine
 	ForeignAsyncRequest_function ForeignAsyncRequest;
 	ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait;
 	ForeignAsyncNotify_function ForeignAsyncNotify;
+	ShutdownForeignScan_function ShutdownForeignScan;
 } FdwRoutine;
 
 
-- 
2.9.2

