>From 58757978b5625aa3ae9a99fdcd9d6db393e62a5a Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Tue, 13 Jan 2015 19:20:35 +0900
Subject: [PATCH] Asynchronous execution of postgres_fdw v3

This is a revised version of the patch implementing asynchronous
execution for postgres_fdw.

- Remove PgFdwAsyncState.
- Move PgFdwConn into a separate module.
---
 contrib/postgres_fdw/Makefile       |   2 +-
 contrib/postgres_fdw/PgFdwConn.c    | 200 +++++++++++++++++++++++++++++++
 contrib/postgres_fdw/PgFdwConn.h    |  62 ++++++++++
 contrib/postgres_fdw/connection.c   |  86 +++++++------
 contrib/postgres_fdw/postgres_fdw.c | 232 ++++++++++++++++++++++++++----------
 contrib/postgres_fdw/postgres_fdw.h |  16 ++-
 6 files changed, 490 insertions(+), 108 deletions(-)
 create mode 100644 contrib/postgres_fdw/PgFdwConn.c
 create mode 100644 contrib/postgres_fdw/PgFdwConn.h

diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index d2b98e1..d0913e2 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -1,7 +1,7 @@
 # contrib/postgres_fdw/Makefile
 
 MODULE_big = postgres_fdw
-OBJS = postgres_fdw.o option.o deparse.o connection.o $(WIN32RES)
+OBJS = postgres_fdw.o PgFdwConn.o option.o deparse.o connection.o $(WIN32RES)
 PGFILEDESC = "postgres_fdw - foreign data wrapper for PostgreSQL"
 
 PG_CPPFLAGS = -I$(libpq_srcdir)
diff --git a/contrib/postgres_fdw/PgFdwConn.c b/contrib/postgres_fdw/PgFdwConn.c
new file mode 100644
index 0000000..b13b597
--- /dev/null
+++ b/contrib/postgres_fdw/PgFdwConn.c
@@ -0,0 +1,245 @@
+/*-------------------------------------------------------------------------
+ *
+ * PgFdwConn.c
+ *		  PGconn extending wrapper to enable asynchronous query.
+ *
+ * Portions Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  contrib/postgres_fdw/PgFdwConn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "PgFdwConn.h"
+
+/*
+ * The wrapper is allocated with malloc(), like the PGconn it wraps, so it is
+ * not tied to any memory context.
+ */
+#define PFC_ALLOCATE()	((PgFdwConn *)malloc(sizeof(PgFdwConn)))
+#define PFC_FREE(c)		free(c)
+
+struct pgfdw_conn
+{
+	PGconn *pgconn;				/* libpq connection for this connection */
+	int		nscans;				/* number of scans using this connection */
+	struct PgFdwScanState *async_scan; /* the scan currently running an
+										* async query on this connection */
+};
+
+/* Record the scan owning the in-flight async query (NULL for none). */
+void
+PFCsetAsyncScan(PgFdwConn *conn, struct PgFdwScanState *scan)
+{
+	conn->async_scan = scan;
+}
+
+/* Return the scan owning the in-flight async query, or NULL. */
+struct PgFdwScanState *
+PFCgetAsyncScan(PgFdwConn *conn)
+{
+	return conn->async_scan;
+}
+
+/* Nonzero while some scan is registered as running an async query. */
+int
+PFCisAsyncRunning(PgFdwConn *conn)
+{
+	return conn->async_scan != NULL;
+}
+
+/* Return the underlying libpq connection. */
+PGconn *
+PFCgetPGconn(PgFdwConn *conn)
+{
+	return conn->pgconn;
+}
+
+/* Return the number of scans currently using this connection. */
+int
+PFCgetNscans(PgFdwConn *conn)
+{
+	return conn->nscans;
+}
+
+/* Count one more scan on this connection; returns the new count. */
+int
+PFCincrementNscans(PgFdwConn *conn)
+{
+	return ++conn->nscans;
+}
+
+/* Count one scan fewer on this connection; returns the new count. */
+int
+PFCdecrementNscans(PgFdwConn *conn)
+{
+	Assert(conn->nscans > 0);
+	return --conn->nscans;
+}
+
+/*
+ * If an async query is registered, let libpq absorb any input that has
+ * already arrived.  This neither waits for nor discards pending results.
+ * A NULL conn (cache entry without an open connection) is ignored.
+ */
+void
+PFCcancelAsync(PgFdwConn *conn)
+{
+	if (conn != NULL && PFCisAsyncRunning(conn))
+		PFCconsumeInput(conn);
+}
+
+/* Reset the scan bookkeeping; the libpq connection itself is untouched. */
+void
+PFCinit(PgFdwConn *conn)
+{
+	conn->async_scan = NULL;
+	conn->nscans = 0;
+}
+
+/*
+ * The functions below are thin wrappers over the libpq function of the same
+ * name (PFCxxx -> PQxxx) applied to the wrapped PGconn.
+ */
+
+int
+PFCsendQuery(PgFdwConn *conn, const char *query)
+{
+	return PQsendQuery(conn->pgconn, query);
+}
+
+PGresult *
+PFCexec(PgFdwConn *conn, const char *query)
+{
+	return PQexec(conn->pgconn, query);
+}
+
+PGresult *
+PFCexecParams(PgFdwConn *conn,
+			  const char *command,
+			  int nParams,
+			  const Oid *paramTypes,
+			  const char *const * paramValues,
+			  const int *paramLengths,
+			  const int *paramFormats,
+			  int resultFormat)
+{
+	return PQexecParams(conn->pgconn,
+						command, nParams, paramTypes, paramValues,
+						paramLengths, paramFormats, resultFormat);
+}
+
+PGresult *
+PFCprepare(PgFdwConn *conn,
+		   const char *stmtName, const char *query,
+		   int nParams, const Oid *paramTypes)
+{
+	return PQprepare(conn->pgconn, stmtName, query, nParams, paramTypes);
+}
+
+PGresult *
+PFCexecPrepared(PgFdwConn *conn,
+				const char *stmtName,
+				int nParams,
+				const char *const * paramValues,
+				const int *paramLengths,
+				const int *paramFormats,
+				int resultFormat)
+{
+	return PQexecPrepared(conn->pgconn,
+						  stmtName, nParams, paramValues, paramLengths,
+						  paramFormats, resultFormat);
+}
+
+PGresult *
+PFCgetResult(PgFdwConn *conn)
+{
+	return PQgetResult(conn->pgconn);
+}
+
+int
+PFCconsumeInput(PgFdwConn *conn)
+{
+	return PQconsumeInput(conn->pgconn);
+}
+
+int
+PFCisBusy(PgFdwConn *conn)
+{
+	return PQisBusy(conn->pgconn);
+}
+
+/* A NULL conn is passed on as a NULL PGconn, which libpq reports as bad. */
+ConnStatusType
+PFCstatus(const PgFdwConn *conn)
+{
+	return PQstatus(conn != NULL ? conn->pgconn : NULL);
+}
+
+PGTransactionStatusType
+PFCtransactionStatus(const PgFdwConn *conn)
+{
+	return PQtransactionStatus(conn->pgconn);
+}
+
+int
+PFCserverVersion(const PgFdwConn *conn)
+{
+	return PQserverVersion(conn->pgconn);
+}
+
+/*
+ * A NULL conn is passed to libpq as a NULL PGconn, so the connection-failure
+ * path in connect_pg_server() still gets a message when the wrapper itself
+ * could not be allocated.
+ */
+char *
+PFCerrorMessage(const PgFdwConn *conn)
+{
+	return PQerrorMessage(conn != NULL ? conn->pgconn : NULL);
+}
+
+int
+PFCconnectionUsedPassword(const PgFdwConn *conn)
+{
+	return PQconnectionUsedPassword(conn->pgconn);
+}
+
+/*
+ * Close the libpq connection and free the wrapper itself.  The pointer must
+ * not be used afterwards.  NULL is ignored.
+ */
+void
+PFCfinish(PgFdwConn *conn)
+{
+	if (conn == NULL)
+		return;
+
+	PQfinish(conn->pgconn);
+	PFC_FREE(conn);
+}
+
+/*
+ * Allocate a wrapper and open a connection with PQconnectdbParams().
+ *
+ * Returns NULL if the wrapper itself cannot be allocated.  Otherwise the
+ * caller must check PFCstatus() and release the result with PFCfinish(),
+ * even when the connection attempt failed.
+ */
+PgFdwConn *
+PFCconnectdbParams(const char *const * keywords,
+				   const char *const * values, int expand_dbname)
+{
+	PgFdwConn *ret = PFC_ALLOCATE();
+
+	if (ret == NULL)
+		return NULL;
+
+	PFCinit(ret);
+	ret->pgconn = PQconnectdbParams(keywords, values, expand_dbname);
+
+	return ret;
+}
diff --git a/contrib/postgres_fdw/PgFdwConn.h b/contrib/postgres_fdw/PgFdwConn.h
new file mode 100644
index 0000000..2771de5
--- /dev/null
+++ b/contrib/postgres_fdw/PgFdwConn.h
@@ -0,0 +1,62 @@
+/*-------------------------------------------------------------------------
+ *
+ * PgFdwConn.h
+ *		  PGconn wrapper tracking the scans and async query on a connection.
+ *
+ * Portions Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  contrib/postgres_fdw/PgFdwConn.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PGFDWCONN_H
+#define PGFDWCONN_H
+
+#include "libpq-fe.h"
+
+typedef struct pgfdw_conn PgFdwConn;	/* opaque; see PgFdwConn.c */
+struct PgFdwScanState;					/* defined in postgres_fdw.c */
+
+extern void PFCsetAsyncScan(PgFdwConn *conn, struct PgFdwScanState *scan);
+extern struct PgFdwScanState *PFCgetAsyncScan(PgFdwConn *conn);
+extern int PFCisAsyncRunning(PgFdwConn *conn);	/* async scan registered? */
+extern PGconn *PFCgetPGconn(PgFdwConn *conn);
+extern int PFCgetNscans(PgFdwConn *conn);	/* # of scans using conn */
+extern int PFCincrementNscans(PgFdwConn *conn);	/* returns new count */
+extern int PFCdecrementNscans(PgFdwConn *conn);	/* returns new count */
+extern void PFCcancelAsync(PgFdwConn *conn);
+extern void PFCinit(PgFdwConn *conn);	/* reset scan bookkeeping */
+extern int PFCsendQuery(PgFdwConn *conn, const char *query);
+extern PGresult *PFCexec(PgFdwConn *conn, const char *query);
+extern PGresult *PFCexecParams(PgFdwConn *conn,
+								const char *command,
+								int nParams,
+								const Oid *paramTypes,
+								const char *const * paramValues,
+								const int *paramLengths,
+								const int *paramFormats,
+								int resultFormat);
+extern PGresult *PFCprepare(PgFdwConn *conn,
+							const char *stmtName, const char *query,
+							int nParams, const Oid *paramTypes);
+extern PGresult *PFCexecPrepared(PgFdwConn *conn,
+								 const char *stmtName,
+								 int nParams,
+								 const char *const * paramValues,
+								 const int *paramLengths,
+								 const int *paramFormats,
+								 int resultFormat);
+extern PGresult *PFCgetResult(PgFdwConn *conn);
+extern int PFCconsumeInput(PgFdwConn *conn);
+extern int PFCisBusy(PgFdwConn *conn);
+extern ConnStatusType PFCstatus(const PgFdwConn *conn);
+extern PGTransactionStatusType PFCtransactionStatus(const PgFdwConn *conn);
+extern int PFCserverVersion(const PgFdwConn *conn);
+extern char *PFCerrorMessage(const PgFdwConn *conn);
+extern int PFCconnectionUsedPassword(const PgFdwConn *conn);
+extern void PFCfinish(PgFdwConn *conn);
+extern PgFdwConn *PFCconnectdbParams(const char *const * keywords,
+			 const char *const * values, int expand_dbname);
+
+#endif   /* PGFDWCONN_H */
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 4e02cb2..5bf08ec 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -44,7 +44,7 @@ typedef struct ConnCacheKey
 typedef struct ConnCacheEntry
 {
 	ConnCacheKey key;			/* hash key (must be first) */
-	PGconn	   *conn;			/* connection to foreign server, or NULL */
+	PgFdwConn	*conn;			/* connection to foreign server, or NULL */
 	int			xact_depth;		/* 0 = no xact open, 1 = main xact open, 2 =
 								 * one level of subxact open, etc */
 	bool		have_prep_stmt; /* have we prepared any stmts in this xact? */
@@ -64,10 +64,10 @@ static unsigned int prep_stmt_number = 0;
 static bool xact_got_connection = false;
 
 /* prototypes of private functions */
-static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+static PgFdwConn *connect_pg_server(ForeignServer *server, UserMapping *user);
 static void check_conn_params(const char **keywords, const char **values);
-static void configure_remote_session(PGconn *conn);
-static void do_sql_command(PGconn *conn, const char *sql);
+static void configure_remote_session(PgFdwConn *conn);
+static void do_sql_command(PgFdwConn *conn, const char *sql);
 static void begin_remote_xact(ConnCacheEntry *entry);
 static void pgfdw_xact_callback(XactEvent event, void *arg);
 static void pgfdw_subxact_callback(SubXactEvent event,
@@ -93,7 +93,7 @@ static void pgfdw_subxact_callback(SubXactEvent event,
  * be useful and not mere pedantry.  We could not flush any active connections
  * mid-transaction anyway.
  */
-PGconn *
+PgFdwConn *
 GetConnection(ForeignServer *server, UserMapping *user,
 			  bool will_prep_stmt)
 {
@@ -161,7 +161,7 @@ GetConnection(ForeignServer *server, UserMapping *user,
 		entry->have_error = false;
 		entry->conn = connect_pg_server(server, user);
 		elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
-			 entry->conn, server->servername);
+			 PFCgetPGconn(entry->conn), server->servername);
 	}
 
 	/*
@@ -169,6 +169,17 @@ GetConnection(ForeignServer *server, UserMapping *user,
-	 */
-	begin_remote_xact(entry);
+	 *
+	 * Before that, if a scan sharing this connection has an asynchronous
+	 * FETCH in flight, collect its result: sending any other command first
+	 * (begin_remote_xact() may send START TRANSACTION, for instance) before
+	 * that result has been read would lose it.
+	 *
+	 * NOTE(review): fetch_more_data() seems to replace that scan's current
+	 * batch, so rows of it not yet returned would be dropped.  TODO confirm.
+	 */
+	if (PFCincrementNscans(entry->conn) > 1 && PFCisAsyncRunning(entry->conn))
+		fetch_more_data(PFCgetAsyncScan(entry->conn));
+
+	begin_remote_xact(entry);
 
 	/* Remember if caller will prepare statements */
 	entry->have_prep_stmt |= will_prep_stmt;
 
@@ -178,10 +185,10 @@ GetConnection(ForeignServer *server, UserMapping *user,
 /*
  * Connect to remote server using specified server and user mapping properties.
  */
-static PGconn *
+static PgFdwConn *
 connect_pg_server(ForeignServer *server, UserMapping *user)
 {
-	PGconn	   *volatile conn = NULL;
+	PgFdwConn   *volatile conn = NULL;
 
 	/*
 	 * Use PG_TRY block to ensure closing connection on error.
@@ -223,14 +230,14 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 		/* verify connection parameters and make connection */
 		check_conn_params(keywords, values);
 
-		conn = PQconnectdbParams(keywords, values, false);
-		if (!conn || PQstatus(conn) != CONNECTION_OK)
+		conn = PFCconnectdbParams(keywords, values, false);
+		if (!conn || PFCstatus(conn) != CONNECTION_OK)
 		{
 			char	   *connmessage;
 			int			msglen;
 
 			/* libpq typically appends a newline, strip that */
-			connmessage = pstrdup(PQerrorMessage(conn));
+			connmessage = pstrdup(PFCerrorMessage(conn));
 			msglen = strlen(connmessage);
 			if (msglen > 0 && connmessage[msglen - 1] == '\n')
 				connmessage[msglen - 1] = '\0';
@@ -246,7 +253,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 		 * otherwise, he's piggybacking on the postgres server's user
 		 * identity. See also dblink_security_check() in contrib/dblink.
 		 */
-		if (!superuser() && !PQconnectionUsedPassword(conn))
+		if (!superuser() && !PFCconnectionUsedPassword(conn))
 			ereport(ERROR,
 				  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
 				   errmsg("password is required"),
@@ -263,7 +270,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 	{
 		/* Release PGconn data structure if we managed to create one */
 		if (conn)
-			PQfinish(conn);
+			PFCfinish(conn);
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
@@ -312,9 +319,9 @@ check_conn_params(const char **keywords, const char **values)
  * there are any number of ways to break things.
  */
 static void
-configure_remote_session(PGconn *conn)
+configure_remote_session(PgFdwConn *conn)
 {
-	int			remoteversion = PQserverVersion(conn);
+	int			remoteversion = PFCserverVersion(conn);
 
 	/* Force the search path to contain only pg_catalog (see deparse.c) */
 	do_sql_command(conn, "SET search_path = pg_catalog");
@@ -348,11 +355,11 @@ configure_remote_session(PGconn *conn)
  * Convenience subroutine to issue a non-data-returning SQL command to remote
  */
 static void
-do_sql_command(PGconn *conn, const char *sql)
+do_sql_command(PgFdwConn *conn, const char *sql)
 {
 	PGresult   *res;
 
-	res = PQexec(conn, sql);
+	res = PFCexec(conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -379,7 +386,7 @@ begin_remote_xact(ConnCacheEntry *entry)
 		const char *sql;
 
 		elog(DEBUG3, "starting remote transaction on connection %p",
-			 entry->conn);
+			 PFCgetPGconn(entry->conn));
 
 		if (IsolationIsSerializable())
 			sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
@@ -408,13 +415,11 @@ begin_remote_xact(ConnCacheEntry *entry)
  * Release connection reference count created by calling GetConnection.
  */
 void
-ReleaseConnection(PGconn *conn)
+ReleaseConnection(PgFdwConn *conn)
 {
-	/*
-	 * Currently, we don't actually track connection references because all
-	 * cleanup is managed on a transaction or subtransaction basis instead. So
-	 * there's nothing to do here.
-	 */
+	/* Last scan released: abandon any async FETCH still pending on it */
+	if (PFCdecrementNscans(conn) == 0)
+		finish_async_connection(conn);
 }
 
 /*
@@ -429,7 +434,7 @@ ReleaseConnection(PGconn *conn)
  * collisions are highly improbable; just be sure to use %u not %d to print.
  */
 unsigned int
-GetCursorNumber(PGconn *conn)
+GetCursorNumber(PgFdwConn *conn)
 {
 	return ++cursor_number;
 }
@@ -443,7 +448,7 @@ GetCursorNumber(PGconn *conn)
  * increasing the risk of prepared-statement name collisions by resetting.
  */
 unsigned int
-GetPrepStmtNumber(PGconn *conn)
+GetPrepStmtNumber(PgFdwConn *conn)
 {
 	return ++prep_stmt_number;
 }
@@ -462,7 +467,7 @@ GetPrepStmtNumber(PGconn *conn)
  * marked with have_error = true.
  */
 void
-pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+pgfdw_report_error(int elevel, PGresult *res, PgFdwConn *conn,
 				   bool clear, const char *sql)
 {
 	/* If requested, PGresult must be released before leaving this function. */
@@ -490,7 +495,7 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 		 * return NULL, not a PGresult at all.
 		 */
 		if (message_primary == NULL)
-			message_primary = PQerrorMessage(conn);
+			message_primary = PFCerrorMessage(conn);
 
 		ereport(elevel,
 				(errcode(sqlstate),
@@ -542,7 +547,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 		if (entry->xact_depth > 0)
 		{
 			elog(DEBUG3, "closing remote transaction on connection %p",
-				 entry->conn);
+				 PFCgetPGconn(entry->conn));
 
 			switch (event)
 			{
@@ -567,7 +572,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					 */
 					if (entry->have_prep_stmt && entry->have_error)
 					{
-						res = PQexec(entry->conn, "DEALLOCATE ALL");
+						res = PFCexec(entry->conn, "DEALLOCATE ALL");
 						PQclear(res);
 					}
 					entry->have_prep_stmt = false;
@@ -597,7 +602,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					/* Assume we might have lost track of prepared statements */
 					entry->have_error = true;
 					/* If we're aborting, abort all remote transactions too */
-					res = PQexec(entry->conn, "ABORT TRANSACTION");
+					res = PFCexec(entry->conn, "ABORT TRANSACTION");
 					/* Note: can't throw ERROR, it would be infinite loop */
 					if (PQresultStatus(res) != PGRES_COMMAND_OK)
 						pgfdw_report_error(WARNING, res, entry->conn, true,
@@ -608,7 +613,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 						/* As above, make sure to clear any prepared stmts */
 						if (entry->have_prep_stmt && entry->have_error)
 						{
-							res = PQexec(entry->conn, "DEALLOCATE ALL");
+							res = PFCexec(entry->conn, "DEALLOCATE ALL");
 							PQclear(res);
 						}
 						entry->have_prep_stmt = false;
@@ -620,17 +625,21 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 
 		/* Reset state to show we're out of a transaction */
 		entry->xact_depth = 0;
+		/* Also forget the scan count and async scan of that transaction */
+		PFCcancelAsync(entry->conn);
+		PFCinit(entry->conn);
 
 		/*
 		 * If the connection isn't in a good idle state, discard it to
 		 * recover. Next GetConnection will open a new connection.
 		 */
-		if (PQstatus(entry->conn) != CONNECTION_OK ||
-			PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
+		if (PFCstatus(entry->conn) != CONNECTION_OK ||
+			PFCtransactionStatus(entry->conn) != PQTRANS_IDLE)
 		{
-			elog(DEBUG3, "discarding connection %p", entry->conn);
-			PQfinish(entry->conn);
+			elog(DEBUG3, "discarding connection %p",
+				 PFCgetPGconn(entry->conn));
+			PFCfinish(entry->conn);
 			entry->conn = NULL;
 		}
 	}
 
@@ -676,6 +683,14 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 		PGresult   *res;
 		char		sql[100];
 
+		/*
+		 * Shut down asynchronous scan if running.  Restrict this to the
+		 * connections dealt with below (open connection with a remote
+		 * subtransaction at the current level); entry->conn may be NULL.
+		 */
+		if (entry->conn != NULL && entry->xact_depth >= curlevel)
+			PFCcancelAsync(entry->conn);
+
 		/*
 		 * We only care about connections with open remote subtransactions of
 		 * the current level.
@@ -701,7 +711,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 			snprintf(sql, sizeof(sql),
 					 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
 					 curlevel, curlevel);
-			res = PQexec(entry->conn, sql);
+			res = PFCexec(entry->conn, sql);
 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
 				pgfdw_report_error(WARNING, res, entry->conn, true, sql);
 			else
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index d76e739..08d9ca6 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -136,7 +136,7 @@ typedef struct PgFdwScanState
 	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
+	PgFdwConn  *conn;			/* connection for the scan */
 	unsigned int cursor_number; /* quasi-unique ID for my cursor */
 	bool		cursor_exists;	/* have we created the cursor? */
 	int			numParams;		/* number of parameters passed to query */
@@ -156,6 +156,7 @@ typedef struct PgFdwScanState
 	/* working memory contexts */
 	MemoryContext batch_cxt;	/* context holding current batch of tuples */
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
+	ExprContext	 *econtext;		/* node's ps_ExprContext, for create_cursor() */
 } PgFdwScanState;
 
 /*
@@ -167,7 +168,7 @@ typedef struct PgFdwModifyState
 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
+	PgFdwConn  *conn;			/* connection for the scan */
 	char	   *p_name;			/* name of prepared statement, if created */
 
 	/* extracted fdw_private data */
@@ -298,7 +299,7 @@ static void estimate_path_cost_size(PlannerInfo *root,
 						double *p_rows, int *p_width,
 						Cost *p_startup_cost, Cost *p_total_cost);
 static void get_remote_estimate(const char *sql,
-					PGconn *conn,
+					PgFdwConn *conn,
 					double *rows,
 					int *width,
 					Cost *startup_cost,
@@ -306,9 +307,8 @@ static void get_remote_estimate(const char *sql,
 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 						  EquivalenceClass *ec, EquivalenceMember *em,
 						  void *arg);
-static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
-static void close_cursor(PGconn *conn, unsigned int cursor_number);
+static void create_cursor(PgFdwScanState *node);
+static void close_cursor(PgFdwConn *conn, unsigned int cursor_number);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
 						 ItemPointer tupleid,
@@ -329,7 +329,6 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
 						   MemoryContext temp_context);
 static void conversion_error_callback(void *arg);
 
-
 /*
  * Foreign-data wrapper handler function: return a struct with pointers
  * to my callback routines.
@@ -982,6 +981,19 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 		fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
 	else
 		fsstate->param_values = NULL;
+
+	fsstate->econtext = node->ss.ps.ps_ExprContext;
+
+	/*
+	 * Start scanning asynchronously if it is the first scan on this
+	 * connection.  This is only done for queries without parameters:
+	 * create_cursor() computes parameter values from the node's
+	 * ExprContext, and values supplied by outer plan nodes are not set
+	 * yet at executor startup.  Parameterized scans still create their
+	 * cursor on the first postgresIterateForeignScan() call.
+	 */
+	if (fsstate->numParams == 0 && PFCgetNscans(fsstate->conn) == 1)
+		create_cursor(fsstate);
 }
 
 /*
@@ -1000,7 +1008,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 	 * cursor on the remote side.
 	 */
 	if (!fsstate->cursor_exists)
-		create_cursor(node);
+		create_cursor(fsstate);
 
 	/*
 	 * Get some more tuples, if we've run out.
@@ -1009,7 +1017,12 @@ postgresIterateForeignScan(ForeignScanState *node)
 	{
-		/* No point in another fetch if we already detected EOF, though. */
-		if (!fsstate->eof_reached)
-			fetch_more_data(node);
+		/*
+		 * No point in another fetch if we already detected EOF, though.
+		 * fetch_more_data() may return without any rows when it has only
+		 * sent an asynchronous FETCH, so repeat until we have rows or EOF.
+		 */
+		while (fsstate->next_tuple >= fsstate->num_tuples &&
+			   !fsstate->eof_reached)
+			fetch_more_data(fsstate);
 		/* If we didn't get any tuples, must be end of data. */
 		if (fsstate->next_tuple >= fsstate->num_tuples)
 			return ExecClearTuple(slot);
@@ -1069,7 +1077,22 @@ postgresReScanForeignScan(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(fsstate->conn, sql);
+	/*
+	 * If this scan still has an asynchronous FETCH in flight, discard it:
+	 * its rows are obsolete after the rescan, and the PFCexec() below would
+	 * otherwise swallow that result while this scan stays registered as
+	 * running an async query.
+	 */
+	if (PFCgetAsyncScan(fsstate->conn) == fsstate)
+	{
+		PGresult   *pending;
+
+		while ((pending = PFCgetResult(fsstate->conn)) != NULL)
+			PQclear(pending);
+		PFCsetAsyncScan(fsstate->conn, NULL);
+	}
+
+	res = PFCexec(fsstate->conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
 	PQclear(res);
@@ -1398,13 +1406,13 @@ postgresExecForeignInsert(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = PFCexecPrepared(fmstate->conn,
+							 fmstate->p_name,
+							 fmstate->p_nums,
+							 p_values,
+							 NULL,
+							 NULL,
+							 0);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1468,13 +1476,13 @@ postgresExecForeignUpdate(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = PFCexecPrepared(fmstate->conn,
+						   fmstate->p_name,
+						   fmstate->p_nums,
+						   p_values,
+						   NULL,
+						   NULL,
+						   0);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1538,13 +1546,13 @@ postgresExecForeignDelete(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	res = PFCexecPrepared(fmstate->conn,
+						   fmstate->p_name,
+						   fmstate->p_nums,
+						   p_values,
+						   NULL,
+						   NULL,
+						   0);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1594,7 +1602,7 @@ postgresEndForeignModify(EState *estate,
 		 * We don't use a PG_TRY block here, so be careful not to throw error
 		 * without releasing the PGresult.
 		 */
-		res = PQexec(fmstate->conn, sql);
+		res = PFCexec(fmstate->conn, sql);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
 		PQclear(res);
@@ -1726,7 +1734,7 @@ estimate_path_cost_size(PlannerInfo *root,
 		List	   *local_join_conds;
 		StringInfoData sql;
 		List	   *retrieved_attrs;
-		PGconn	   *conn;
+		PgFdwConn  *conn;
 		Selectivity local_sel;
 		QualCost	local_cost;
 
@@ -1836,7 +1844,7 @@ estimate_path_cost_size(PlannerInfo *root,
  * The given "sql" must be an EXPLAIN command.
  */
 static void
-get_remote_estimate(const char *sql, PGconn *conn,
+get_remote_estimate(const char *sql, PgFdwConn *conn,
 					double *rows, int *width,
 					Cost *startup_cost, Cost *total_cost)
 {
@@ -1852,7 +1860,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
 		/*
 		 * Execute EXPLAIN remotely.
 		 */
-		res = PQexec(conn, sql);
+		res = PFCexec(conn, sql);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql);
 
@@ -1917,13 +1925,12 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
  * Create cursor for node's query with current parameter values.
  */
 static void
-create_cursor(ForeignScanState *node)
+create_cursor(PgFdwScanState *fsstate)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
-	ExprContext *econtext = node->ss.ps.ps_ExprContext;
+	ExprContext *econtext = fsstate->econtext;
 	int			numParams = fsstate->numParams;
 	const char **values = fsstate->param_values;
-	PGconn	   *conn = fsstate->conn;
+	PgFdwConn	*conn = fsstate->conn;
 	StringInfoData buf;
 	PGresult   *res;
 
@@ -1985,8 +1992,8 @@ create_cursor(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecParams(conn, buf.data, numParams, NULL, values,
-					   NULL, NULL, 0);
+	res = PFCexecParams(conn, buf.data, numParams, NULL, values,
+						 NULL, NULL, 0);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
 	PQclear(res);
@@ -2001,15 +2008,21 @@ create_cursor(ForeignScanState *node)
 
 	/* Clean up */
 	pfree(buf.data);
+
+	/*
+	 * Start async scan if this is the only scan on the connection.  See
+	 * fetch_more_data() for details.
+	 */
+	if (PFCgetNscans(conn) == 1)
+		fetch_more_data(fsstate);
 }
 
 /*
  * Fetch some more rows from the node's cursor.
  */
-static void
-fetch_more_data(ForeignScanState *node)
+void
+fetch_more_data(PgFdwScanState *fsstate)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
 	PGresult   *volatile res = NULL;
 	MemoryContext oldcontext;
 
@@ -2024,7 +2037,7 @@ fetch_more_data(ForeignScanState *node)
 	/* PGresult must be released before leaving this function. */
 	PG_TRY();
 	{
-		PGconn	   *conn = fsstate->conn;
+		PgFdwConn  *conn = fsstate->conn;
 		char		sql[64];
 		int			fetch_size;
 		int			numrows;
@@ -2036,9 +2049,72 @@ fetch_more_data(ForeignScanState *node)
 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
 				 fetch_size, fsstate->cursor_number);
 
-		res = PQexec(conn, sql);
+		if (PFCisAsyncRunning(conn))
+		{
+			/* Get result of running async fetch */
+			res = PFCgetResult(conn);
+
+			/*
+			 * Connection state doesn't go to IDLE even if all data has been
+			 * sent to client for asynchronous query: one more PQgetResult()
+			 * returning NULL is needed, whatever the number of rows we got,
+			 * before another query can be sent.  See PQexecFinish() for
+			 * details.  Anything other than NULL is unexpected; free it
+			 * before raising the error so that it is not leaked.
+			 *
+			 * A NULL res means another command has already consumed the
+			 * result; it is let through, and PQntuples() reports no rows.
+			 */
+			if (res != NULL)
+			{
+				PGresult   *extra = PFCgetResult(conn);
+
+				if (extra != NULL)
+				{
+					PQclear(extra);
+					elog(ERROR, "Connection status error.");
+				}
+			}
+
+			/*
+			 * On the current postgres_fdw implementation, multiple
+			 * PgFdwScanState on the same foreign server and mapped user share
+			 * the same connection to the remote server (see GetConnection() in
+			 * connection.c) and individual scans on it are separated using
+			 * cursors. Since one connection cannot accept two or more
+			 * asynchronous queries simultaneously, we should stop the async
+			 * fetching unless this scan is the only one on the connection.
+			 */
+			if (PFCgetNscans(conn) != 1)
+				PFCsetAsyncScan(conn, NULL);
+		}
+		else
+		{
+			/*
+			 * If no async scan is running and the number of scans running on
+			 * this connection is 1, start async fetch.
+			 */
+			if (PFCgetNscans(conn) == 1)
+			{
+				if (!PFCsendQuery(conn, sql))
+					pgfdw_report_error(ERROR, res, conn, false,
+									   fsstate->query);
+
+				PFCsetAsyncScan(conn, fsstate);
+				goto end_of_fetch;
+			}
+
+			/* Otherwise do synchronous query execution */
+			PFCsetAsyncScan(conn, NULL);
+			res = PFCexec(conn, sql);
+			/* Unlike the async case, a NULL result is a failure here */
+			if (res == NULL)
+				pgfdw_report_error(ERROR, NULL, conn, false,
+								   fsstate->query);
+		}
+
 		/* On error, report the original query, not the FETCH. */
-		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		if (res != NULL && PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 
 		/* Convert the data into HeapTuples */
@@ -2066,6 +2127,26 @@ fetch_more_data(ForeignScanState *node)
 
 		PQclear(res);
 		res = NULL;
+
+		if (PFCisAsyncRunning(conn))
+		{
+			if (!fsstate->eof_reached)
+			{
+				/*
+				 * Async fetching is still on for this scan: request the next
+				 * batch now, so the remote side prepares it meanwhile.
+				 */
+				if (!PFCsendQuery(conn, sql))
+					pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+			}
+			else
+			{
+				PFCsetAsyncScan(conn, NULL);
+			}
+		}
+
+end_of_fetch:
+		;	/* a label must be followed by a statement */
 	}
 	PG_CATCH();
 	{
@@ -2079,6 +2160,36 @@ fetch_more_data(ForeignScanState *node)
 }
 
 /*
+ * Abandon the asynchronous FETCH, if any, registered on the connection.
+ *
+ * Called from ReleaseConnection() when the last scan using the connection
+ * is released.  Any pending results are read and thrown away without
+ * sending a new command, so libpq is idle again afterwards, and the scan
+ * that owned the FETCH is left with no buffered tuples.
+ *
+ * fetch_more_data() is deliberately not used here: nobody is going to read
+ * the rows it would convert, and it decides by itself whether to FETCH more.
+ */
+void
+finish_async_connection(PgFdwConn *conn)
+{
+	PgFdwScanState *fsstate = PFCgetAsyncScan(conn);
+	PGresult   *res;
+
+	/* Nothing to do if no async connection */
+	if (fsstate == NULL)
+		return;
+
+	while ((res = PFCgetResult(conn)) != NULL)
+		PQclear(res);
+	PFCsetAsyncScan(conn, NULL);
+
+	/* Immediately discard the result */
+	fsstate->next_tuple = 0;
+	fsstate->num_tuples = 0;
+}
+
+/*
  * Force assorted GUC parameters to settings that ensure that we'll output
  * data values in a form that is unambiguous to the remote server.
  *
@@ -2132,7 +2238,7 @@ reset_transmission_modes(int nestlevel)
  * Utility routine to close a cursor.
  */
 static void
-close_cursor(PGconn *conn, unsigned int cursor_number)
+close_cursor(PgFdwConn *conn, unsigned int cursor_number)
 {
 	char		sql[64];
 	PGresult   *res;
@@ -2143,7 +2249,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(conn, sql);
+	res = PFCexec(conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -2175,11 +2281,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQprepare(fmstate->conn,
-					p_name,
-					fmstate->query,
-					0,
-					NULL);
+	res = PFCprepare(fmstate->conn,
+					 p_name,
+					 fmstate->query,
+					 0,
+					 NULL);
 
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -2297,7 +2403,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	ForeignTable *table;
 	ForeignServer *server;
 	UserMapping *user;
-	PGconn	   *conn;
+	PgFdwConn	*conn;
 	StringInfoData sql;
 	PGresult   *volatile res = NULL;
 
@@ -2329,7 +2435,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = PQexec(conn, sql.data);
+		res = PFCexec(conn, sql.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 
@@ -2379,7 +2485,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	ForeignTable *table;
 	ForeignServer *server;
 	UserMapping *user;
-	PGconn	   *conn;
+	PgFdwConn	*conn;
 	unsigned int cursor_number;
 	StringInfoData sql;
 	PGresult   *volatile res = NULL;
@@ -2423,7 +2529,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = PQexec(conn, sql.data);
+		res = PFCexec(conn, sql.data);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 		PQclear(res);
@@ -2453,7 +2559,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 			snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
 					 fetch_size, cursor_number);
 
-			res = PQexec(conn, fetch_sql);
+			res = PFCexec(conn, fetch_sql);
 			/* On error, report the original query, not the FETCH. */
 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
 				pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -2582,7 +2688,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 	bool		import_not_null = true;
 	ForeignServer *server;
 	UserMapping *mapping;
-	PGconn	   *conn;
+	PgFdwConn   *conn;
 	StringInfoData buf;
 	PGresult   *volatile res = NULL;
 	int			numrows,
@@ -2615,7 +2721,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 	conn = GetConnection(server, mapping, false);
 
 	/* Don't attempt to import collation if remote server hasn't got it */
-	if (PQserverVersion(conn) < 90100)
+	if (PFCserverVersion(conn) < 90100)
 		import_collate = false;
 
 	/* Create workspace for strings */
@@ -2628,7 +2734,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
 		deparseStringLiteral(&buf, stmt->remote_schema);
 
-		res = PQexec(conn, buf.data);
+		res = PFCexec(conn, buf.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
@@ -2723,7 +2829,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfo(&buf, " ORDER BY c.relname, a.attnum");
 
 		/* Fetch the data */
-		res = PQexec(conn, buf.data);
+		res = PFCexec(conn, buf.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 950c6f7..cac7dfc 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -18,19 +18,23 @@
 #include "nodes/relation.h"
 #include "utils/relcache.h"
 
-#include "libpq-fe.h"
+#include "PgFdwConn.h"
+
+struct PgFdwScanState;
 
 /* in postgres_fdw.c */
 extern int	set_transmission_modes(void);
 extern void reset_transmission_modes(int nestlevel);
+extern void fetch_more_data(struct PgFdwScanState *node);
+extern void finish_async_connection(PgFdwConn *conn);
 
 /* in connection.c */
-extern PGconn *GetConnection(ForeignServer *server, UserMapping *user,
+extern PgFdwConn *GetConnection(ForeignServer *server, UserMapping *user,
 			  bool will_prep_stmt);
-extern void ReleaseConnection(PGconn *conn);
-extern unsigned int GetCursorNumber(PGconn *conn);
-extern unsigned int GetPrepStmtNumber(PGconn *conn);
-extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+extern void ReleaseConnection(PgFdwConn *conn);
+extern unsigned int GetCursorNumber(PgFdwConn *conn);
+extern unsigned int GetPrepStmtNumber(PgFdwConn *conn);
+extern void pgfdw_report_error(int elevel, PGresult *res, PgFdwConn *conn,
 				   bool clear, const char *sql);
 
 /* in option.c */
-- 
2.1.0.GIT

