From 4b56fcd0687172e3cccb329bc17e78935657f58f Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Fri, 28 Nov 2014 10:52:41 +0900
Subject: [PATCH 1/2] Async exec of postgres_fdw.

---
 contrib/postgres_fdw/connection.c   | 102 ++++++++++++-------
 contrib/postgres_fdw/postgres_fdw.c | 191 ++++++++++++++++++++++++++++--------
 contrib/postgres_fdw/postgres_fdw.h |  28 +++++-
 3 files changed, 242 insertions(+), 79 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 116be7d..8b1c738 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -44,7 +44,7 @@ typedef struct ConnCacheKey
 typedef struct ConnCacheEntry
 {
 	ConnCacheKey key;			/* hash key (must be first) */
-	PGconn	   *conn;			/* connection to foreign server, or NULL */
+	PgFdwConn	*conn;			/* wrapped connection to foreign server, or NULL */
 	int			xact_depth;		/* 0 = no xact open, 1 = main xact open, 2 =
 								 * one level of subxact open, etc */
 	bool		have_prep_stmt; /* have we prepared any stmts in this xact? */
@@ -93,7 +93,7 @@ static void pgfdw_subxact_callback(SubXactEvent event,
  * be useful and not mere pedantry.  We could not flush any active connections
  * mid-transaction anyway.
  */
-PGconn *
+PgFdwConn *
 GetConnection(ForeignServer *server, UserMapping *user,
 			  bool will_prep_stmt)
 {
@@ -160,16 +160,36 @@ GetConnection(ForeignServer *server, UserMapping *user,
 		entry->xact_depth = 0;	/* just to be sure */
 		entry->have_prep_stmt = false;
 		entry->have_error = false;
-		entry->conn = connect_pg_server(server, user);
-		elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
-			 entry->conn, server->servername);
+
+		/* Allocate in the same memory context as the hashtable */
+		entry->conn =
+			(PgFdwConn *) MemoryContextAllocZero(CacheMemoryContext,
+												 sizeof(PgFdwConn));
 	}
 
+	if (entry->conn->conn == NULL)
+	{
+		entry->conn->conn = connect_pg_server(server, user);
+		entry->conn->nscans = 0;
+		entry->conn->async_state = PGFDW_CONN_IDLE;
+		entry->conn->async_scan = NULL;
+		elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
+			 entry->conn->conn, server->servername);
+	}
 	/*
 	 * Start a new transaction or subtransaction if needed.
 	 */
 	begin_remote_xact(entry);
 
+	/*
+	 * If another user starts sharing this connection while an async FETCH
+	 * is in flight, collect that result into its owning scan now.  XXX:
+	 * confirm this cannot discard tuples the owner has not returned yet.
+	 */
+	if (++entry->conn->nscans > 1 &&
+		entry->conn->async_state == PGFDW_CONN_ASYNC_RUNNING)
+		fetch_more_data(entry->conn->async_scan);
+
 	/* Remember if caller will prepare statements */
 	entry->have_prep_stmt |= will_prep_stmt;
 
@@ -182,7 +202,7 @@ GetConnection(ForeignServer *server, UserMapping *user,
 static PGconn *
 connect_pg_server(ForeignServer *server, UserMapping *user)
 {
-	PGconn	   *volatile conn = NULL;
+	PGconn   *volatile conn = NULL;
 
 	/*
 	 * Use PG_TRY block to ensure closing connection on error.
@@ -355,7 +375,12 @@ do_sql_command(PGconn *conn, const char *sql)
 
 	res = PQexec(conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
-		pgfdw_report_error(ERROR, res, conn, true, sql);
+	{
+		PgFdwConn	tmpfdwconn = {0};
+
+		tmpfdwconn.conn = conn;
+		pgfdw_report_error(ERROR, res, &tmpfdwconn, true, sql);
+	}
 	PQclear(res);
 }
 
@@ -380,13 +405,13 @@ begin_remote_xact(ConnCacheEntry *entry)
 		const char *sql;
 
 		elog(DEBUG3, "starting remote transaction on connection %p",
-			 entry->conn);
+			 entry->conn->conn);
 
 		if (IsolationIsSerializable())
 			sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
 		else
 			sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
-		do_sql_command(entry->conn, sql);
+		do_sql_command(entry->conn->conn, sql);
 		entry->xact_depth = 1;
 	}
 
@@ -400,7 +425,7 @@ begin_remote_xact(ConnCacheEntry *entry)
 		char		sql[64];
 
 		snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
-		do_sql_command(entry->conn, sql);
+		do_sql_command(entry->conn->conn, sql);
 		entry->xact_depth++;
 	}
 }
@@ -409,13 +434,13 @@ begin_remote_xact(ConnCacheEntry *entry)
  * Release connection reference count created by calling GetConnection.
  */
 void
-ReleaseConnection(PGconn *conn)
+ReleaseConnection(PgFdwConn *conn)
 {
-	/*
-	 * Currently, we don't actually track connection references because all
-	 * cleanup is managed on a transaction or subtransaction basis instead. So
-	 * there's nothing to do here.
-	 */
+	/* When the last user releases the connection, finish off its async scan */
+	if (--conn->nscans != 0)
+		return;
+	if (conn->async_scan != NULL)
+		finish_async_connection(conn->async_scan);
 }
 
 /*
@@ -430,7 +455,7 @@ ReleaseConnection(PGconn *conn)
  * collisions are highly improbable; just be sure to use %u not %d to print.
  */
 unsigned int
-GetCursorNumber(PGconn *conn)
+GetCursorNumber(PgFdwConn *conn)
 {
 	return ++cursor_number;
 }
@@ -444,7 +469,7 @@ GetCursorNumber(PGconn *conn)
  * increasing the risk of prepared-statement name collisions by resetting.
  */
 unsigned int
-GetPrepStmtNumber(PGconn *conn)
+GetPrepStmtNumber(PgFdwConn *conn)
 {
 	return ++prep_stmt_number;
 }
@@ -463,7 +488,7 @@ GetPrepStmtNumber(PGconn *conn)
  * marked with have_error = true.
  */
 void
-pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+pgfdw_report_error(int elevel, PGresult *res, PgFdwConn *conn,
 				   bool clear, const char *sql)
 {
 	/* If requested, PGresult must be released before leaving this function. */
@@ -491,7 +516,7 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 		 * return NULL, not a PGresult at all.
 		 */
 		if (message_primary == NULL)
-			message_primary = PQerrorMessage(conn);
+			message_primary = PQerrorMessage(conn->conn);
 
 		ereport(elevel,
 				(errcode(sqlstate),
@@ -536,20 +561,20 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 		PGresult   *res;
 
 		/* Ignore cache entry if no open connection right now */
-		if (entry->conn == NULL)
+		if (entry->conn == NULL || entry->conn->conn == NULL)
 			continue;
 
 		/* If it has an open remote transaction, try to close it */
 		if (entry->xact_depth > 0)
 		{
 			elog(DEBUG3, "closing remote transaction on connection %p",
-				 entry->conn);
+				 entry->conn->conn);
 
 			switch (event)
 			{
 				case XACT_EVENT_PRE_COMMIT:
 					/* Commit all remote transactions during pre-commit */
-					do_sql_command(entry->conn, "COMMIT TRANSACTION");
+					do_sql_command(entry->conn->conn, "COMMIT TRANSACTION");
 
 					/*
 					 * If there were any errors in subtransactions, and we
@@ -568,7 +593,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					 */
 					if (entry->have_prep_stmt && entry->have_error)
 					{
-						res = PQexec(entry->conn, "DEALLOCATE ALL");
+						res = PQexec(entry->conn->conn, "DEALLOCATE ALL");
 						PQclear(res);
 					}
 					entry->have_prep_stmt = false;
@@ -598,7 +623,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					/* Assume we might have lost track of prepared statements */
 					entry->have_error = true;
 					/* If we're aborting, abort all remote transactions too */
-					res = PQexec(entry->conn, "ABORT TRANSACTION");
+					res = PQexec(entry->conn->conn, "ABORT TRANSACTION");
 					/* Note: can't throw ERROR, it would be infinite loop */
 					if (PQresultStatus(res) != PGRES_COMMAND_OK)
 						pgfdw_report_error(WARNING, res, entry->conn, true,
@@ -609,7 +634,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 						/* As above, make sure to clear any prepared stmts */
 						if (entry->have_prep_stmt && entry->have_error)
 						{
-							res = PQexec(entry->conn, "DEALLOCATE ALL");
+							res = PQexec(entry->conn->conn, "DEALLOCATE ALL");
 							PQclear(res);
 						}
 						entry->have_prep_stmt = false;
@@ -621,17 +646,19 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 
 		/* Reset state to show we're out of a transaction */
 		entry->xact_depth = 0;
-
+		entry->conn->nscans = 0;
+		entry->conn->async_state = PGFDW_CONN_IDLE;
+		entry->conn->async_scan = NULL;
 		/*
 		 * If the connection isn't in a good idle state, discard it to
 		 * recover. Next GetConnection will open a new connection.
 		 */
-		if (PQstatus(entry->conn) != CONNECTION_OK ||
-			PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
+		if (PQstatus(entry->conn->conn) != CONNECTION_OK ||
+			PQtransactionStatus(entry->conn->conn) != PQTRANS_IDLE)
 		{
-			elog(DEBUG3, "discarding connection %p", entry->conn);
-			PQfinish(entry->conn);
-			entry->conn = NULL;
+			elog(DEBUG3, "discarding connection %p", entry->conn->conn);
+			PQfinish(entry->conn->conn);
+			entry->conn->conn = NULL;
 		}
 	}
 
@@ -677,11 +704,15 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 		PGresult   *res;
 		char		sql[100];
 
+		/* Ignore cache entry whose connection wrapper was never allocated */
+		if (entry->conn == NULL)
+			continue;
+
 		/*
 		 * We only care about connections with open remote subtransactions of
 		 * the current level.
 		 */
-		if (entry->conn == NULL || entry->xact_depth < curlevel)
+		if (entry->conn->conn == NULL || entry->xact_depth < curlevel)
 			continue;
 
 		if (entry->xact_depth > curlevel)
@@ -692,7 +723,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 		{
 			/* Commit all remote subtransactions during pre-commit */
 			snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
-			do_sql_command(entry->conn, sql);
+			do_sql_command(entry->conn->conn, sql);
 		}
 		else
 		{
@@ -702,7 +733,10 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 			snprintf(sql, sizeof(sql),
 					 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
 					 curlevel, curlevel);
-			res = PQexec(entry->conn, sql);
+			/* Any async FETCH is abandoned; PQexec() drops its unread result */
+			entry->conn->async_state = PGFDW_CONN_IDLE;
+			entry->conn->async_scan = NULL;
+			res = PQexec(entry->conn->conn, sql);
 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
 				pgfdw_report_error(WARNING, res, entry->conn, true, sql);
 			else
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index c3039a6..b912091 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -136,7 +136,7 @@ typedef struct PgFdwScanState
 	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
+	PgFdwConn  *conn;			/* connection for the scan */
 	unsigned int cursor_number; /* quasi-unique ID for my cursor */
 	bool		cursor_exists;	/* have we created the cursor? */
 	int			numParams;		/* number of parameters passed to query */
@@ -156,6 +156,7 @@ typedef struct PgFdwScanState
 	/* working memory contexts */
 	MemoryContext batch_cxt;	/* context holding current batch of tuples */
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
+	ExprContext	 *econtext;		/* node's ps_ExprContext, used by create_cursor() */
 } PgFdwScanState;
 
 /*
@@ -167,7 +168,7 @@ typedef struct PgFdwModifyState
 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
+	PgFdwConn  *conn;			/* connection for the scan */
 	char	   *p_name;			/* name of prepared statement, if created */
 
 	/* extracted fdw_private data */
@@ -298,7 +299,7 @@ static void estimate_path_cost_size(PlannerInfo *root,
 						double *p_rows, int *p_width,
 						Cost *p_startup_cost, Cost *p_total_cost);
 static void get_remote_estimate(const char *sql,
-					PGconn *conn,
+					PgFdwConn *conn,
 					double *rows,
 					int *width,
 					Cost *startup_cost,
@@ -306,9 +307,8 @@ static void get_remote_estimate(const char *sql,
 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 						  EquivalenceClass *ec, EquivalenceMember *em,
 						  void *arg);
-static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
-static void close_cursor(PGconn *conn, unsigned int cursor_number);
+static void create_cursor(PgFdwScanState *node);
+static void close_cursor(PgFdwConn *conn, unsigned int cursor_number);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
 						 ItemPointer tupleid,
@@ -329,7 +329,6 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
 						   MemoryContext temp_context);
 static void conversion_error_callback(void *arg);
 
-
 /*
  * Foreign-data wrapper handler function: return a struct with pointers
  * to my callback routines.
@@ -982,6 +981,15 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 		fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
 	else
 		fsstate->param_values = NULL;
+
+	fsstate->econtext = node->ss.ps.ps_ExprContext;
+
+	/*
+	 * Start scanning asynchronously if this is the first scan on this
+	 * connection, unless it has parameters: their values aren't known yet.
+	 */
+	if (fsstate->conn->nscans == 1 && numParams == 0)
+		create_cursor(fsstate);
 }
 
 /*
@@ -1000,7 +1008,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 	 * cursor on the remote side.
 	 */
 	if (!fsstate->cursor_exists)
-		create_cursor(node);
+		create_cursor(fsstate);
 
 	/*
 	 * Get some more tuples, if we've run out.
@@ -1009,7 +1017,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 	{
-		/* No point in another fetch if we already detected EOF, though. */
-		if (!fsstate->eof_reached)
-			fetch_more_data(node);
-		/* If we didn't get any tuples, must be end of data. */
+		/* A call may only send an async FETCH, so loop until tuples or EOF */
+		while (!fsstate->eof_reached &&
+			   fsstate->next_tuple >= fsstate->num_tuples)
+			fetch_more_data(fsstate);
 		if (fsstate->next_tuple >= fsstate->num_tuples)
 			return ExecClearTuple(slot);
@@ -1069,7 +1077,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(fsstate->conn, sql);
+	res = PQexec(fsstate->conn->conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
 	PQclear(res);
@@ -1398,7 +1406,7 @@ postgresExecForeignInsert(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
+	res = PQexecPrepared(fmstate->conn->conn,
 						 fmstate->p_name,
 						 fmstate->p_nums,
 						 p_values,
@@ -1468,7 +1476,7 @@ postgresExecForeignUpdate(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
+	res = PQexecPrepared(fmstate->conn->conn,
 						 fmstate->p_name,
 						 fmstate->p_nums,
 						 p_values,
@@ -1538,7 +1546,7 @@ postgresExecForeignDelete(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
+	res = PQexecPrepared(fmstate->conn->conn,
 						 fmstate->p_name,
 						 fmstate->p_nums,
 						 p_values,
@@ -1594,7 +1602,7 @@ postgresEndForeignModify(EState *estate,
 		 * We don't use a PG_TRY block here, so be careful not to throw error
 		 * without releasing the PGresult.
 		 */
-		res = PQexec(fmstate->conn, sql);
+		res = PQexec(fmstate->conn->conn, sql);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
 		PQclear(res);
@@ -1726,7 +1734,7 @@ estimate_path_cost_size(PlannerInfo *root,
 		List	   *local_join_conds;
 		StringInfoData sql;
 		List	   *retrieved_attrs;
-		PGconn	   *conn;
+		PgFdwConn  *conn;
 		Selectivity local_sel;
 		QualCost	local_cost;
 
@@ -1836,7 +1844,7 @@ estimate_path_cost_size(PlannerInfo *root,
  * The given "sql" must be an EXPLAIN command.
  */
 static void
-get_remote_estimate(const char *sql, PGconn *conn,
+get_remote_estimate(const char *sql, PgFdwConn *conn,
 					double *rows, int *width,
 					Cost *startup_cost, Cost *total_cost)
 {
@@ -1852,7 +1860,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
 		/*
 		 * Execute EXPLAIN remotely.
 		 */
-		res = PQexec(conn, sql);
+		res = PQexec(conn->conn, sql);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql);
 
@@ -1917,13 +1925,12 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
  * Create cursor for node's query with current parameter values.
  */
 static void
-create_cursor(ForeignScanState *node)
+create_cursor(PgFdwScanState *fsstate)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
-	ExprContext *econtext = node->ss.ps.ps_ExprContext;
+	ExprContext *econtext = fsstate->econtext;
 	int			numParams = fsstate->numParams;
 	const char **values = fsstate->param_values;
-	PGconn	   *conn = fsstate->conn;
+	PgFdwConn	*conn = fsstate->conn;
 	StringInfoData buf;
 	PGresult   *res;
 
@@ -1985,7 +1992,7 @@ create_cursor(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecParams(conn, buf.data, numParams, NULL, values,
+	res = PQexecParams(conn->conn, buf.data, numParams, NULL, values,
 					   NULL, NULL, 0);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
@@ -2001,15 +2008,18 @@ create_cursor(ForeignScanState *node)
 
 	/* Clean up */
 	pfree(buf.data);
+
+	/* If we're the connection's only user, send the first FETCH now */
+	if (fsstate->conn->nscans == 1)
+		fetch_more_data(fsstate);
 }
 
 /*
  * Fetch some more rows from the node's cursor.
  */
-static void
-fetch_more_data(ForeignScanState *node)
+void
+fetch_more_data(PgFdwScanState *fsstate)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
 	PGresult   *volatile res = NULL;
 	MemoryContext oldcontext;
 
@@ -2024,7 +2034,7 @@ fetch_more_data(ForeignScanState *node)
 	/* PGresult must be released before leaving this function. */
 	PG_TRY();
 	{
-		PGconn	   *conn = fsstate->conn;
+		PgFdwConn  *conn = fsstate->conn;
 		char		sql[64];
 		int			fetch_size;
 		int			numrows;
@@ -2036,9 +2046,63 @@ fetch_more_data(ForeignScanState *node)
 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
 				 fetch_size, fsstate->cursor_number);
 
-		res = PQexec(conn, sql);
+		switch (conn->async_state)
+		{
+		case PGFDW_CONN_IDLE:
+			Assert(conn->async_scan == NULL);
+
+			if (conn->nscans == 1)
+			{
+				conn->async_scan = fsstate;
+
+				if (!PQsendQuery(conn->conn, sql))
+					pgfdw_report_error(ERROR, res, conn, false,
+									   fsstate->query);
+
+				conn->async_state = PGFDW_CONN_ASYNC_RUNNING;
+				goto end_of_fetch;
+			}
+
+			/* Synchronous query execution */
+			conn->async_state = PGFDW_CONN_SYNC_RUNNING;
+			res = PQexec(conn->conn, sql);
+			break;
+
+		case PGFDW_CONN_ASYNC_RUNNING:
+			Assert(conn->async_scan != NULL);
+
+			res = PQgetResult(conn->conn);
+			if (res != NULL)
+			{
+				/*
+				 * libpq accepts no new command until PQgetResult() returns
+				 * NULL, so always read that, however many rows came back.
+				 */
+				if (PQgetResult(conn->conn) != NULL)
+					elog(ERROR, "Connection status error.");
+			}
+			else if (conn->nscans > 0)
+			{
+				/*
+				 * Another command has consumed our pending result (PQexec()
+				 * discards unread results), but the rows are still needed, so
+				 * redo the FETCH synchronously.
+				 */
+				res = PQexec(conn->conn, sql);
+			}
+
+			/* Stay asynchronous only while this is the sole user */
+			conn->async_state = (conn->nscans == 1) ?
+				PGFDW_CONN_ASYNC_RUNNING : PGFDW_CONN_SYNC_RUNNING;
+			break;
+
+		default:
+			elog(ERROR, "unexpected async state : %d", conn->async_state);
+			break;
+		}
+
 		/* On error, report the original query, not the FETCH. */
-		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		if ((res || conn->nscans > 0) && PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 
 		/* Convert the data into HeapTuples */
@@ -2066,6 +2130,36 @@ fetch_more_data(ForeignScanState *node)
 
 		PQclear(res);
 		res = NULL;
+
+		switch (conn->async_state)
+		{
+		case PGFDW_CONN_ASYNC_RUNNING:
+			if (!fsstate->eof_reached)
+			{
+				/* On an async connection, request the next batch right away */
+				if (!PQsendQuery(conn->conn, sql))
+					pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+			}
+			else
+			{
+				/* EOF: this scan no longer owns the connection */
+				conn->async_state = PGFDW_CONN_IDLE;
+				conn->async_scan = NULL;
+			}
+			break;
+
+		case PGFDW_CONN_SYNC_RUNNING:
+			conn->async_state = PGFDW_CONN_IDLE;
+			conn->async_scan = NULL;
+			break;
+
+		default:
+			elog(ERROR, "unexpected async state: %d", conn->async_state);
+			break;
+		}
+
+end_of_fetch:
+		;	/* Nothing to do here but needed to make compiler quiet. */
 	}
 	PG_CATCH();
 	{
@@ -2079,6 +2173,23 @@ fetch_more_data(ForeignScanState *node)
 }
 
 /*
+ * Finish any in-flight async FETCH and discard fsstate's buffered tuples.
+ */
+void
+finish_async_connection(PgFdwScanState *fsstate)
+{
+	/* Collect the pending result, if any; the connection must end up idle */
+	if (fsstate->conn->async_state == PGFDW_CONN_ASYNC_RUNNING)
+		fetch_more_data(fsstate->conn->async_scan);
+	fsstate->conn->async_scan = NULL;
+	Assert(fsstate->conn->async_state == PGFDW_CONN_IDLE);
+
+	/* Immediately discard the result */
+	fsstate->next_tuple = 0;
+	fsstate->num_tuples = 0;
+}
+
+/*
  * Force assorted GUC parameters to settings that ensure that we'll output
  * data values in a form that is unambiguous to the remote server.
  *
@@ -2132,7 +2243,7 @@ reset_transmission_modes(int nestlevel)
  * Utility routine to close a cursor.
  */
 static void
-close_cursor(PGconn *conn, unsigned int cursor_number)
+close_cursor(PgFdwConn *conn, unsigned int cursor_number)
 {
 	char		sql[64];
 	PGresult   *res;
@@ -2143,7 +2254,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(conn, sql);
+	res = PQexec(conn->conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -2175,7 +2286,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQprepare(fmstate->conn,
+	res = PQprepare(fmstate->conn->conn,
 					p_name,
 					fmstate->query,
 					0,
@@ -2297,7 +2408,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	ForeignTable *table;
 	ForeignServer *server;
 	UserMapping *user;
-	PGconn	   *conn;
+	PgFdwConn	*conn;
 	StringInfoData sql;
 	PGresult   *volatile res = NULL;
 
@@ -2329,7 +2440,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = PQexec(conn, sql.data);
+		res = PQexec(conn->conn, sql.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 
@@ -2379,7 +2490,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	ForeignTable *table;
 	ForeignServer *server;
 	UserMapping *user;
-	PGconn	   *conn;
+	PgFdwConn	*conn;
 	unsigned int cursor_number;
 	StringInfoData sql;
 	PGresult   *volatile res = NULL;
@@ -2423,7 +2534,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = PQexec(conn, sql.data);
+		res = PQexec(conn->conn, sql.data);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 		PQclear(res);
@@ -2453,7 +2564,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 			snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
 					 fetch_size, cursor_number);
 
-			res = PQexec(conn, fetch_sql);
+			res = PQexec(conn->conn, fetch_sql);
 			/* On error, report the original query, not the FETCH. */
 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
 				pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -2582,7 +2693,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 	bool		import_not_null = true;
 	ForeignServer *server;
 	UserMapping *mapping;
-	PGconn	   *conn;
+	PgFdwConn   *conn;
 	StringInfoData buf;
 	PGresult   *volatile res = NULL;
 	int			numrows,
@@ -2615,7 +2726,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 	conn = GetConnection(server, mapping, false);
 
 	/* Don't attempt to import collation if remote server hasn't got it */
-	if (PQserverVersion(conn) < 90100)
+	if (PQserverVersion(conn->conn) < 90100)
 		import_collate = false;
 
 	/* Create workspace for strings */
@@ -2628,7 +2739,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
 		deparseStringLiteral(&buf, stmt->remote_schema);
 
-		res = PQexec(conn, buf.data);
+		res = PQexec(conn->conn, buf.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
@@ -2723,7 +2834,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfo(&buf, " ORDER BY c.relname, a.attnum");
 
 		/* Fetch the data */
-		res = PQexec(conn, buf.data);
+		res = PQexec(conn->conn, buf.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 0382c55..2472451 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -20,17 +20,35 @@
 
 #include "libpq-fe.h"
 
+/* State of the FETCH, if any, running on a PgFdwConn */
+typedef enum PgFdwConnState
+{
+	PGFDW_CONN_IDLE,			/* no FETCH in progress */
+	PGFDW_CONN_ASYNC_RUNNING,	/* FETCH sent asynchronously, result pending */
+	PGFDW_CONN_SYNC_RUNNING		/* FETCH being completed synchronously */
+} PgFdwConnState;
+
+typedef struct PgFdwConn
+{
+	PGconn	   *conn;			/* libpq connection, or NULL if none */
+	int			nscans;			/* GetConnection() users not yet released */
+	PgFdwConnState async_state;	/* state of the FETCH on this connection */
+	struct PgFdwScanState *async_scan;	/* scan that sent the async FETCH */
+} PgFdwConn;
+
 /* in postgres_fdw.c */
 extern int	set_transmission_modes(void);
 extern void reset_transmission_modes(int nestlevel);
+extern void fetch_more_data(struct PgFdwScanState *fsstate);
+extern void finish_async_connection(struct PgFdwScanState *fsstate);
 
 /* in connection.c */
-extern PGconn *GetConnection(ForeignServer *server, UserMapping *user,
+extern PgFdwConn *GetConnection(ForeignServer *server, UserMapping *user,
 			  bool will_prep_stmt);
-extern void ReleaseConnection(PGconn *conn);
-extern unsigned int GetCursorNumber(PGconn *conn);
-extern unsigned int GetPrepStmtNumber(PGconn *conn);
-extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+extern void ReleaseConnection(PgFdwConn *conn);
+extern unsigned int GetCursorNumber(PgFdwConn *conn);
+extern unsigned int GetPrepStmtNumber(PgFdwConn *conn);
+extern void pgfdw_report_error(int elevel, PGresult *res, PgFdwConn *conn,
 				   bool clear, const char *sql);
 
 /* in option.c */
-- 
2.1.0.GIT

