diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index 3543312..8054330 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -11,6 +11,7 @@ EXTENSION = postgres_fdw
 DATA = postgres_fdw--1.0.sql
 
 REGRESS = postgres_fdw
+REGRESS_OPTS= --temp-config $(top_srcdir)/contrib/postgres_fdw/pgfdw.conf
 
 ifdef USE_PGXS
 PG_CONFIG = pg_config
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 5fabc99..d89ead6 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -14,7 +14,9 @@
 
 #include "postgres_fdw.h"
 
+#include "access/fdw_xact.h"
 #include "access/xact.h"
+#include "commands/defrem.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -64,16 +66,19 @@ static unsigned int prep_stmt_number = 0;
 static bool xact_got_connection = false;
 
 /* prototypes of private functions */
-static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user,
+								 bool connection_error_ok);
 static void check_conn_params(const char **keywords, const char **values);
 static void configure_remote_session(PGconn *conn);
 static void do_sql_command(PGconn *conn, const char *sql);
-static void begin_remote_xact(ConnCacheEntry *entry);
+static void begin_remote_xact(ConnCacheEntry *entry, Oid serverid, Oid userid);
 static void pgfdw_xact_callback(XactEvent event, void *arg);
 static void pgfdw_subxact_callback(SubXactEvent event,
 					   SubTransactionId mySubid,
 					   SubTransactionId parentSubid,
 					   void *arg);
+static bool server_uses_two_phase_commit(ForeignServer *server);
+static void pgfdw_cleanup_after_transaction(ConnCacheEntry *entry);
 
 
 /*
@@ -86,6 +91,9 @@ static void pgfdw_subxact_callback(SubXactEvent event,
  * statements.  Since those don't go away automatically at transaction end
  * (not even on error), we need this flag to cue manual cleanup.
  *
+ * If connection_error_ok is true, a failure to connect is reported to the
+ * caller by returning NULL; if false, an error is raised instead.
+ *
  * XXX Note that caching connections theoretically requires a mechanism to
  * detect change of FDW objects to invalidate already established connections.
  * We could manage that by watching for invalidation events on the relevant
@@ -94,7 +102,8 @@ static void pgfdw_subxact_callback(SubXactEvent event,
  * mid-transaction anyway.
  */
 PGconn *
-GetConnection(UserMapping *user, bool will_prep_stmt)
+GetConnection(UserMapping *user, bool will_prep_stmt,
+			  bool start_transaction, bool connection_error_ok)
 {
 	bool		found;
 	ConnCacheEntry *entry;
@@ -122,9 +131,6 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 		RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
 	}
 
-	/* Set flag that we did GetConnection during the current transaction */
-	xact_got_connection = true;
-
 	/* Create hash key for the entry.  Assume no pad bytes in key struct */
 	key = user->umid;
 
@@ -159,7 +165,20 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 		entry->xact_depth = 0;	/* just to be sure */
 		entry->have_prep_stmt = false;
 		entry->have_error = false;
-		entry->conn = connect_pg_server(server, user);
+		entry->conn = connect_pg_server(server, user, connection_error_ok);
+
+		/*
+		 * If the attempt to connect to the foreign server failed, we can only
+		 * get here when the caller asked to handle connection errors itself.
+		 */
+		Assert(entry->conn || connection_error_ok);
+
+		if (!entry->conn && connection_error_ok)
+		{
+			elog(DEBUG3, "attempt to connection to server \"%s\" by postgres_fdw failed",
+				 server->servername);
+			return NULL;
+		}
 
 		elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
 			 entry->conn, server->servername, user->umid, user->userid);
@@ -168,7 +187,12 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 	/*
 	 * Start a new transaction or subtransaction if needed.
 	 */
-	begin_remote_xact(entry);
+	if (start_transaction)
+	{
+		begin_remote_xact(entry, user->serverid, user->userid);
+		/* Set flag that we did GetConnection during the current transaction */
+		xact_got_connection = true;
+	}
 
 	/* Remember if caller will prepare statements */
 	entry->have_prep_stmt |= will_prep_stmt;
@@ -178,9 +202,12 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 
 /*
  * Connect to remote server using specified server and user mapping properties.
+ * If the attempt to connect fails, return NULL when the caller can handle the
+ * failure (connection_error_ok = true); otherwise throw an error.
  */
 static PGconn *
-connect_pg_server(ForeignServer *server, UserMapping *user)
+connect_pg_server(ForeignServer *server, UserMapping *user,
+				  bool connection_error_ok)
 {
 	PGconn	   *volatile conn = NULL;
 
@@ -235,11 +262,14 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 			msglen = strlen(connmessage);
 			if (msglen > 0 && connmessage[msglen - 1] == '\n')
 				connmessage[msglen - 1] = '\0';
-			ereport(ERROR,
-			   (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-				errmsg("could not connect to server \"%s\"",
-					   server->servername),
-				errdetail_internal("%s", connmessage)));
+
+			if (connection_error_ok)
+				return NULL;
+			else
+				ereport(ERROR,
+						(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+						errmsg("could not connect to server \"%s\"", server->servername),
+						 errdetail_internal("%s", connmessage)));
 		}
 
 		/*
@@ -370,15 +400,22 @@ do_sql_command(PGconn *conn, const char *sql)
  * control which remote queries share a snapshot.
  */
 static void
-begin_remote_xact(ConnCacheEntry *entry)
+begin_remote_xact(ConnCacheEntry *entry, Oid serverid, Oid userid)
 {
 	int			curlevel = GetCurrentTransactionNestLevel();
+	ForeignServer *server = GetForeignServer(serverid);
 
 	/* Start main transaction if we haven't yet */
 	if (entry->xact_depth <= 0)
 	{
 		const char *sql;
 
+		/*
+		 * Register the foreign server, passing along whether its options
+		 * enable two-phase commit.
+		 */
+		RegisterXactForeignServer(serverid, userid, server_uses_two_phase_commit(server));
+
 		elog(DEBUG3, "starting remote transaction on connection %p",
 			 entry->conn);
 
@@ -586,158 +623,284 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 }
 
 /*
- * pgfdw_xact_callback --- cleanup at main-transaction end.
+ * postgresGetPrepareId
+ *
+ * Builds a prepared transaction identifier. The PostgreSQL documentation
+ * places two restrictions on such an identifier:
+ * 1. It must be a string literal less than 200 bytes long.
+ * 2. It must differ from the id of every other concurrent prepared transaction.
+ *
+ * Ideally we would use something like a UUID, which is unique with high
+ * probability, but generating one may be expensive here, and the extension
+ * that provides UUID generation is not part of core. Instead the id is built
+ * from a random number, the server OID and the user OID.
  */
-static void
-pgfdw_xact_callback(XactEvent event, void *arg)
+extern char *
+postgresGetPrepareId(Oid serverid, Oid userid, int *prep_info_len)
 {
-	HASH_SEQ_STATUS scan;
-	ConnCacheEntry *entry;
+/* Maximum length of the prepared transaction id, borrowed from twophase.c */
+#define PREP_XACT_ID_MAX_LEN 200
+#define RANDOM_LARGE_MULTIPLIER 1000
+	char	*prep_info;
+
+	/* palloc allocates the buffer in the current memory context */
+	prep_info = (char *)palloc(PREP_XACT_ID_MAX_LEN * sizeof(char));
+	snprintf(prep_info, PREP_XACT_ID_MAX_LEN, "%s_%4d_%d_%d",
+								"px", abs(random() * RANDOM_LARGE_MULTIPLIER),
+								serverid, userid);
+	/* Report the id length, excluding the terminating NUL byte */
+	*prep_info_len = strlen(prep_info);
+	return prep_info;
+}
 
-	/* Quick exit if no connections were touched in this transaction. */
-	if (!xact_got_connection)
-		return;
+/*
+ * postgresPrepareForeignTransaction
+ *
+ * The function prepares transaction on foreign server.
+ */
+bool
+postgresPrepareForeignTransaction(Oid serverid, Oid userid, Oid umid,
+								  int prep_info_len, char *prep_info)
+{
+	StringInfo		command;
+	PGresult		*res;
+	ConnCacheEntry	*entry = NULL;
+	ConnCacheKey	 key;
+	bool			found;
+
+	/* Create hash key for the entry.  Assume no pad bytes in key struct */
+	key = umid;
+
+	Assert(ConnectionHash);
+	entry = hash_search(ConnectionHash, &key, HASH_FIND, &found);
+
+	if (found && entry->conn)
+	{
+		bool result;
+		PGconn	*conn = entry->conn;
+
+		command = makeStringInfo();
+		appendStringInfo(command, "PREPARE TRANSACTION '%.*s'", prep_info_len,
+																	prep_info);
+		res = PQexec(conn, command->data);
+		result = (PQresultStatus(res) == PGRES_COMMAND_OK);
+
+		if (!result)
+		{
+			/*
+			 * TODO: check whether we should raise an error or warning.
+			 * The command failed, raise a warning, so that the reason for
+			 * failure gets logged. Do not raise an error, the caller i.e. foreign
+			 * transaction manager takes care of taking appropriate action.
+			 */
+			pgfdw_report_error(WARNING, res, conn, false, command->data);
+		}
+
+		PQclear(res);
+		pgfdw_cleanup_after_transaction(entry);
+		return result;
+	}
+	else
+		return false;
+}
+
+bool
+postgresEndForeignTransaction(Oid serverid, Oid userid, Oid umid, bool is_commit)
+{
+	StringInfo		command;
+	PGresult		*res;
+	ConnCacheEntry	*entry = NULL;
+	ConnCacheKey	 key;
+	bool			found;
+
+	/* Create hash key for the entry.  Assume no pad bytes in key struct */
+	key = umid;
+
+	Assert(ConnectionHash);
+	entry = hash_search(ConnectionHash, &key, HASH_FIND, &found);
+
+	if (found && entry->conn)
+	{
+		PGconn	*conn = entry->conn;
+		bool	result;
+
+		command = makeStringInfo();
+		appendStringInfo(command, "%s TRANSACTION",
+							is_commit ? "COMMIT" : "ROLLBACK");
+		res = PQexec(conn, command->data);
+		result = (PQresultStatus(res) == PGRES_COMMAND_OK);
+		if (!result)
+		{
+			/*
+			 * The local transaction has ended, so there is no point in raising
+			 * error. Raise a warning so that the reason for the failure gets
+			 * logged.
+			 */
+			pgfdw_report_error(WARNING, res, conn, false, command->data);
+		}
+
+		PQclear(res);
+		pgfdw_cleanup_after_transaction(entry);
+		return result;
+	}
+	return false;
+}
+
+/*
+ * postgresResolvePreparedForeignTransaction
+ *
+ * Commits or rolls back a prepared transaction on the foreign server.
+ * This function may be called when the connection cache holds no connection
+ * to the foreign server involved in the transaction being resolved.
+ */
+bool
+postgresResolvePreparedForeignTransaction(Oid serverid, Oid userid, Oid umid,
+										  bool is_commit,
+										  int prep_info_len, char *prep_info)
+{
+	PGconn			*conn = NULL;
 
 	/*
-	 * Scan all connection cache entries to find open remote transactions, and
-	 * close them.
+	 * If there exists a connection in the connection cache that can be used,
+	 * use it. If there is none, we need foreign server and user information
+	 * which can be obtained only when in a transaction block.
+	 * If we are resolving prepared foreign transactions immediately after
+	 * preparing them, the connection hash would have a connection. If we are
+	 * resolving them any other time, a resolver would have started a
+	 * transaction.
 	 */
-	hash_seq_init(&scan, ConnectionHash);
-	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	if (ConnectionHash)
 	{
-		PGresult   *res;
+		/* Connection hash should have a connection we want */
+		bool		found;
+		ConnCacheKey key;
+		ConnCacheEntry	*entry;
 
-		/* Ignore cache entry if no open connection right now */
-		if (entry->conn == NULL)
-			continue;
+		/* Create hash key for the entry.  Assume no pad bytes in key struct */
+		key = umid;
+
+		entry = (ConnCacheEntry *)hash_search(ConnectionHash, &key, HASH_FIND, &found);
+		if (found && entry->conn)
+			conn = entry->conn;
+	}
+
+	if (!conn && IsTransactionState())
+		conn = GetConnection(GetUserMapping(userid, serverid), false, false, true);
+
+	/* Proceed with resolution if we got a connection, else return false */
+	if (conn)
+	{
+		StringInfo		command;
+		PGresult		*res;
+		bool			result;
 
-		/* If it has an open remote transaction, try to close it */
-		if (entry->xact_depth > 0)
+		command = makeStringInfo();
+		appendStringInfo(command, "%s PREPARED '%.*s'",
+							is_commit ? "COMMIT" : "ROLLBACK",
+							prep_info_len, prep_info);
+		res = PQexec(conn, command->data);
+
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		{
-			elog(DEBUG3, "closing remote transaction on connection %p",
-				 entry->conn);
+			int		sqlstate;
+			char	*diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+			/*
+			 * The command failed, raise a warning to log the reason of failure.
+			 * We may not be in a transaction here, so raising error doesn't
+			 * help. Even if we are in a transaction, it would be the resolver
+			 * transaction, which will get aborted on raising error, thus
+			 * delaying resolution of other prepared foreign transactions.
+			 */
+			pgfdw_report_error(WARNING, res, conn, false, command->data);
 
-			switch (event)
+			if (diag_sqlstate)
 			{
-				case XACT_EVENT_PARALLEL_PRE_COMMIT:
-				case XACT_EVENT_PRE_COMMIT:
-					/* Commit all remote transactions during pre-commit */
-					do_sql_command(entry->conn, "COMMIT TRANSACTION");
-
-					/*
-					 * If there were any errors in subtransactions, and we
-					 * made prepared statements, do a DEALLOCATE ALL to make
-					 * sure we get rid of all prepared statements. This is
-					 * annoying and not terribly bulletproof, but it's
-					 * probably not worth trying harder.
-					 *
-					 * DEALLOCATE ALL only exists in 8.3 and later, so this
-					 * constrains how old a server postgres_fdw can
-					 * communicate with.  We intentionally ignore errors in
-					 * the DEALLOCATE, so that we can hobble along to some
-					 * extent with older servers (leaking prepared statements
-					 * as we go; but we don't really support update operations
-					 * pre-8.3 anyway).
-					 */
-					if (entry->have_prep_stmt && entry->have_error)
-					{
-						res = PQexec(entry->conn, "DEALLOCATE ALL");
-						PQclear(res);
-					}
-					entry->have_prep_stmt = false;
-					entry->have_error = false;
-					break;
-				case XACT_EVENT_PRE_PREPARE:
-
-					/*
-					 * We disallow remote transactions that modified anything,
-					 * since it's not very reasonable to hold them open until
-					 * the prepared transaction is committed.  For the moment,
-					 * throw error unconditionally; later we might allow
-					 * read-only cases.  Note that the error will cause us to
-					 * come right back here with event == XACT_EVENT_ABORT, so
-					 * we'll clean up the connection state at that point.
-					 */
-					ereport(ERROR,
-							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-							 errmsg("cannot prepare a transaction that modified remote tables")));
-					break;
-				case XACT_EVENT_PARALLEL_COMMIT:
-				case XACT_EVENT_COMMIT:
-				case XACT_EVENT_PREPARE:
-					/* Pre-commit should have closed the open transaction */
-					elog(ERROR, "missed cleaning up connection during pre-commit");
-					break;
-				case XACT_EVENT_PARALLEL_ABORT:
-				case XACT_EVENT_ABORT:
-					/* Assume we might have lost track of prepared statements */
-					entry->have_error = true;
-
-					/*
-					 * If a command has been submitted to the remote server by
-					 * using an asynchronous execution function, the command
-					 * might not have yet completed.  Check to see if a
-					 * command is still being processed by the remote server,
-					 * and if so, request cancellation of the command.
-					 */
-					if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
-					{
-						PGcancel   *cancel;
-						char		errbuf[256];
-
-						if ((cancel = PQgetCancel(entry->conn)))
-						{
-							if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
-								ereport(WARNING,
-										(errcode(ERRCODE_CONNECTION_FAILURE),
-								  errmsg("could not send cancel request: %s",
-										 errbuf)));
-							PQfreeCancel(cancel);
-						}
-					}
-
-					/* If we're aborting, abort all remote transactions too */
-					res = PQexec(entry->conn, "ABORT TRANSACTION");
-					/* Note: can't throw ERROR, it would be infinite loop */
-					if (PQresultStatus(res) != PGRES_COMMAND_OK)
-						pgfdw_report_error(WARNING, res, entry->conn, true,
-										   "ABORT TRANSACTION");
-					else
-					{
-						PQclear(res);
-						/* As above, make sure to clear any prepared stmts */
-						if (entry->have_prep_stmt && entry->have_error)
-						{
-							res = PQexec(entry->conn, "DEALLOCATE ALL");
-							PQclear(res);
-						}
-						entry->have_prep_stmt = false;
-						entry->have_error = false;
-					}
-					break;
+				sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
+										 diag_sqlstate[1],
+										 diag_sqlstate[2],
+										 diag_sqlstate[3],
+										 diag_sqlstate[4]);
 			}
+			else
+				sqlstate = ERRCODE_CONNECTION_FAILURE;
+
+			/*
+			 * If we tried to COMMIT/ABORT a prepared transaction and the prepared
+			 * transaction was missing on the foreign server, it was probably
+			 * resolved by some other means. Anyway, it should be considered as resolved.
+			 */
+			result = (sqlstate == ERRCODE_UNDEFINED_OBJECT);
 		}
+		else
+			result = true;
 
-		/* Reset state to show we're out of a transaction */
-		entry->xact_depth = 0;
+		PQclear(res);
+		ReleaseConnection(conn);
+		return result;
+	}
+	else
+		return false;
+}
 
-		/*
-		 * If the connection isn't in a good idle state, discard it to
-		 * recover. Next GetConnection will open a new connection.
-		 */
-		if (PQstatus(entry->conn) != CONNECTION_OK ||
-			PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
-		{
-			elog(DEBUG3, "discarding connection %p", entry->conn);
-			PQfinish(entry->conn);
-			entry->conn = NULL;
-		}
+static void
+pgfdw_cleanup_after_transaction(ConnCacheEntry *entry)
+{
+	/*
+	 * If there were any errors in subtransactions, and we made prepared
+	 * statements, do a DEALLOCATE ALL to make sure we get rid of all
+	 * prepared statements. This is annoying and not terribly bulletproof,
+	 * but it's probably not worth trying harder.
+	 *
+	 * DEALLOCATE ALL only exists in 8.3 and later, so this constrains how
+	 * old a server postgres_fdw can communicate with.	We intentionally
+	 * ignore errors in the DEALLOCATE, so that we can hobble along to some
+	 * extent with older servers (leaking prepared statements as we go;
+	 * but we don't really support update operations pre-8.3 anyway).
+	 */
+	if (entry->have_prep_stmt && entry->have_error)
+	{
+		PGresult *res = PQexec(entry->conn, "DEALLOCATE ALL");
+		PQclear(res);
 	}
 
+	entry->have_prep_stmt = false;
+	entry->have_error = false;
+	/* Reset state to show we're out of a transaction */
+	entry->xact_depth = 0;
+
 	/*
+	 * If the connection isn't in a good idle state, discard it to
+	 * recover. Next GetConnection will open a new connection.
+	 */
+	if (PQstatus(entry->conn) != CONNECTION_OK ||
+		PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
+	{
+		elog(DEBUG3, "discarding connection %p", entry->conn);
+		PQfinish(entry->conn);
+		entry->conn = NULL;
+	}
+
+	/*
+	 * TODO: the next two statements should be moved to the end-of-transaction
+	 * callback.
 	 * Regardless of the event type, we can now mark ourselves as out of the
-	 * transaction.  (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
-	 * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
+	 * transaction.
+	 */
+	xact_got_connection = false;
+
+	/* Also reset cursor numbering for next transaction */
+	cursor_number = 0;
+}
+
+/*
+ * pgfdw_xact_callback --- cleanup at main-transaction end.
+ */
+static void
+pgfdw_xact_callback(XactEvent event, void *arg)
+{
+	/*
+	 * Regardless of the event type, we can now mark ourselves as out of the
+	 * transaction.
 	 */
 	xact_got_connection = false;
 
@@ -836,3 +999,26 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 		entry->xact_depth--;
 	}
 }
+
+/*
+ * server_uses_two_phase_commit
+ * Returns true if the foreign server is configured to support 2PC.
+ */
+static bool
+server_uses_two_phase_commit(ForeignServer *server)
+{
+	ListCell		*lc;
+
+	/* Check the options for two phase compliance */
+	foreach(lc, server->options)
+	{
+		DefElem    *d = (DefElem *) lfirst(lc);
+
+		if (strcmp(d->defname, "two_phase_commit") == 0)
+		{
+			return defGetBoolean(d);
+		}
+	}
+	/* By default a server is not 2PC compliant */
+	return false;
+}
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 785f520..a9fb3f7 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -13,12 +13,17 @@ DO $d$
             OPTIONS (dbname '$$||current_database()||$$',
                      port '$$||current_setting('port')||$$'
             )$$;
+        EXECUTE $$CREATE SERVER loopback3 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$'
+            )$$;
     END;
 $d$;
 CREATE USER MAPPING FOR public SERVER testserver1
 	OPTIONS (user 'value', password 'value');
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback;
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2;
+CREATE USER MAPPING FOR CURRENT_USER SERVER loopback3;
 -- ===================================================================
 -- create objects used through FDW loopback server
 -- ===================================================================
@@ -52,6 +57,14 @@ CREATE TABLE "S 1"."T 4" (
 	c3 text,
 	CONSTRAINT t4_pkey PRIMARY KEY (c1)
 );
+CREATE TABLE "S 1"."T 5" (
+       c1 int NOT NULL,
+       CONSTRAINT t5_pkey PRIMARY KEY (c1)
+);
+CREATE TABLE "S 1"."T 6" (
+       c1 int NOT NULL,
+       CONSTRAINT t6_pkey PRIMARY KEY (c1)
+);
 INSERT INTO "S 1"."T 1"
 	SELECT id,
 	       id % 10,
@@ -78,10 +91,13 @@ INSERT INTO "S 1"."T 4"
 	       'AAA' || to_char(id, 'FM000')
 	FROM generate_series(1, 100) id;
 DELETE FROM "S 1"."T 4" WHERE c1 % 3 != 0;	-- delete for outer join tests
+INSERT INTO "S 1"."T 5"
+	SELECT generate_series(1, 100);
 ANALYZE "S 1"."T 1";
 ANALYZE "S 1"."T 2";
 ANALYZE "S 1"."T 3";
 ANALYZE "S 1"."T 4";
+ANALYZE "S 1"."T 5";
 -- ===================================================================
 -- create foreign tables
 -- ===================================================================
@@ -124,6 +140,15 @@ CREATE FOREIGN TABLE ft6 (
 	c2 int NOT NULL,
 	c3 text
 ) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 4');
+CREATE FOREIGN TABLE ft7 (
+       c1 int NOT NULL
+) SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 5');
+CREATE FOREIGN TABLE ft8 (
+       c1 int NOT NULL
+) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 5');
+CREATE FOREIGN TABLE ft9 (
+       c1 int NOT NULL
+) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 5');
 -- A table with oids. CREATE FOREIGN TABLE doesn't support the
 -- WITH OIDS option, but ALTER does.
 CREATE FOREIGN TABLE ft_pg_type (
@@ -188,8 +213,11 @@ ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
  public | ft4        | loopback  | (schema_name 'S 1', table_name 'T 3')            | 
  public | ft5        | loopback  | (schema_name 'S 1', table_name 'T 4')            | 
  public | ft6        | loopback2 | (schema_name 'S 1', table_name 'T 4')            | 
+ public | ft7        | loopback  | (schema_name 'S 1', table_name 'T 5')            | 
+ public | ft8        | loopback2 | (schema_name 'S 1', table_name 'T 5')            | 
+ public | ft9        | loopback3 | (schema_name 'S 1', table_name 'T 5')            | 
  public | ft_pg_type | loopback  | (schema_name 'pg_catalog', table_name 'pg_type') | 
-(6 rows)
+(9 rows)
 
 -- Now we should be able to run ANALYZE.
 -- To exercise multiple code paths, we use local stats on ft1
@@ -6972,3 +7000,176 @@ AND ftoptions @> array['fetch_size=60000'];
 (1 row)
 
 ROLLBACK;
+-- ===================================================================
+-- test Atomic commit across foreign servers
+-- ===================================================================
+ALTER SERVER loopback OPTIONS(ADD two_phase_commit 'of');
+ERROR:  two_phase_commit requires a Boolean value
+ALTER SERVER loopback2 OPTIONS(ADD two_phase_commit 'on');
+ALTER SERVER loopback3 OPTIONS(ADD two_phase_commit 'on');
+\des+
+                                                                                                                                                                                                                                                      List of foreign servers
+    Name     |  Owner   | Foreign-data wrapper | Access privileges | Type | Version |                                                                                                                                                                                                          FDW Options                                                                                                                                                                                                           | Description 
+-------------+----------+----------------------+-------------------+------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------
+ loopback    | masahiko | postgres_fdw         |                   |      |         | (dbname 'contrib_regression', port '50848', extensions 'postgres_fdw')                                                                                                                                                                                                                                                                                                                                                         | 
+ loopback2   | masahiko | postgres_fdw         |                   |      |         | (dbname 'contrib_regression', port '50848', two_phase_commit 'on')                                                                                                                                                                                                                                                                                                                                                             | 
+ loopback3   | masahiko | postgres_fdw         |                   |      |         | (dbname 'contrib_regression', port '50848', two_phase_commit 'on')                                                                                                                                                                                                                                                                                                                                                             | 
+ testserver1 | masahiko | postgres_fdw         |                   |      |         | (use_remote_estimate 'false', updatable 'true', fdw_startup_cost '123.456', fdw_tuple_cost '0.123', service 'value', connect_timeout 'value', dbname 'value', host 'value', hostaddr 'value', port 'value', application_name 'value', keepalives 'value', keepalives_idle 'value', keepalives_interval 'value', sslcompression 'value', sslmode 'value', sslcert 'value', sslkey 'value', sslrootcert 'value', sslcrl 'value') | 
+(4 rows)
+
+-- one not supporting server
+BEGIN;
+INSERT INTO ft7 VALUES(101);
+COMMIT;
+SELECT COUNT(*) FROM ft8;
+ count 
+-------
+   101
+(1 row)
+
+-- One not supporting server and one supporting server
+BEGIN;
+INSERT INTO ft7 VALUES(102);
+INSERT INTO ft8 VALUES(103);
+COMMIT;
+SELECT COUNT(*) FROM ft8;
+ count 
+-------
+   103
+(1 row)
+
+-- Two supporting server and one not supporting server.
+BEGIN;
+INSERT INTO ft7 VALUES(104);
+INSERT INTO ft8 VALUES(105);
+INSERT INTO ft9 VALUES(106);
+COMMIT;
+SELECT COUNT(*) FROM ft8;
+ count 
+-------
+   106
+(1 row)
+
+-- one local and one not supporting foreign server
+BEGIN;
+INSERT INTO ft7 VALUES(107);
+INSERT INTO "S 1"."T 6" VALUES (1);
+COMMIT;
+SELECT COUNT(*) FROM ft8;
+ count 
+-------
+   107
+(1 row)
+
+SELECT COUNT(*) FROM "S 1"."T 6";
+ count 
+-------
+     1
+(1 row)
+
+-- one local and one supporting foreign server and not supporting one
+BEGIN;
+INSERT INTO ft7 VALUES(108);
+INSERT INTO ft8 VALUES(109);
+INSERT INTO "S 1"."T 6" VALUES (2);
+COMMIT;
+SELECT COUNT(*) FROM ft8;
+ count 
+-------
+   109
+(1 row)
+
+SELECT COUNT(*) FROM "S 1"."T 6";
+ count 
+-------
+     2
+(1 row)
+
+-- one local and two supporting foreign server and not supporting one
+BEGIN;
+INSERT INTO ft7 VALUES(110);
+INSERT INTO ft8 VALUES(111);
+INSERT INTO ft9 VALUES(112);
+INSERT INTO "S 1"."T 6" VALUES (3);
+COMMIT;
+SELECT COUNT(*) FROM ft8;
+ count 
+-------
+   112
+(1 row)
+
+SELECT COUNT(*) FROM "S 1"."T 6";
+ count 
+-------
+     3
+(1 row)
+
+-- transaction updating on single supporting foreign server with violation on foreign server
+BEGIN;
+INSERT INTO ft8 VALUES(113);
+INSERT INTO ft8 VALUES(110); -- violation on foreign server
+ERROR:  duplicate key value violates unique constraint "t5_pkey"
+DETAIL:  Key (c1)=(110) already exists.
+CONTEXT:  Remote SQL command: INSERT INTO "S 1"."T 5"(c1) VALUES ($1)
+COMMIT;
+SELECT COUNT(*) FROM ft8;
+ count 
+-------
+   112
+(1 row)
+
+-- transaction updating on single supporting foreign server and local with violation on local
+BEGIN;
+INSERT INTO ft8 VALUES(114);
+INSERT INTO "S 1"."T 6" VALUES (4);
+INSERT INTO "S 1"."T 6" VALUES (3); -- violation on local
+ERROR:  duplicate key value violates unique constraint "t6_pkey"
+DETAIL:  Key (c1)=(3) already exists.
+COMMIT;
+SELECT COUNT(*) FROM ft8;
+ count 
+-------
+   112
+(1 row)
+
+SELECT COUNT(*) FROM "S 1"."T 6";
+ count 
+-------
+     3
+(1 row)
+
+-- violation on foreign server supporting 2PC
+BEGIN;
+INSERT INTO ft8 VALUES(115);
+INSERT INTO ft9 VALUES(116);
+INSERT INTO ft9 VALUES(110); -- violation on foreign server
+ERROR:  duplicate key value violates unique constraint "t5_pkey"
+DETAIL:  Key (c1)=(110) already exists.
+CONTEXT:  Remote SQL command: INSERT INTO "S 1"."T 5"(c1) VALUES ($1)
+COMMIT;
+SELECT COUNT(*) FROM ft8;
+ count 
+-------
+   112
+(1 row)
+
+-- transaction involing local and foreign server with violation on local server
+BEGIN;
+INSERT INTO ft8 VALUES(117);
+INSERT INTO ft9 VALUES(118);
+INSERT INTO "S 1"."T 6" VALUES (3); -- violation on local
+ERROR:  duplicate key value violates unique constraint "t6_pkey"
+DETAIL:  Key (c1)=(3) already exists.
+COMMIT;
+SELECT COUNT(*) FROM ft8;
+ count 
+-------
+   112
+(1 row)
+
+SELECT COUNT(*) FROM "S 1"."T 6";
+ count 
+-------
+     3
+(1 row)
+
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 224aed9..6a20c47 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -107,7 +107,8 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
 		 * Validate option value, when we can do so without any context.
 		 */
 		if (strcmp(def->defname, "use_remote_estimate") == 0 ||
-			strcmp(def->defname, "updatable") == 0)
+			strcmp(def->defname, "updatable") == 0 ||
+			strcmp(def->defname, "two_phase_commit") == 0)
 		{
 			/* these accept only boolean values */
 			(void) defGetBoolean(def);
@@ -176,6 +177,8 @@ InitPgFdwOptions(void)
 		/* fetch_size is available on both server and table */
 		{"fetch_size", ForeignServerRelationId, false},
 		{"fetch_size", ForeignTableRelationId, false},
+		/* two phase commit support */
+		{"two_phase_commit", ForeignServerRelationId, false},
 		{NULL, InvalidOid, false}
 	};
 
diff --git a/contrib/postgres_fdw/pg_fdw.conf b/contrib/postgres_fdw/pg_fdw.conf
new file mode 100644
index 0000000..b086227
--- /dev/null
+++ b/contrib/postgres_fdw/pg_fdw.conf
@@ -0,0 +1,2 @@
+max_prepared_foreign_transactions = 100
+max_prepared_transactions = 10
diff --git a/contrib/postgres_fdw/pgfdw.conf b/contrib/postgres_fdw/pgfdw.conf
new file mode 100644
index 0000000..2184040
--- /dev/null
+++ b/contrib/postgres_fdw/pgfdw.conf
@@ -0,0 +1,2 @@
+max_prepared_transactions = 10
+max_prepared_foreign_transactions = 10
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index fbe6929..a398498 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -14,6 +14,8 @@
 
 #include "postgres_fdw.h"
 
+#include "access/fdw_xact.h"
+#include "access/xact.h"
 #include "access/htup_details.h"
 #include "access/sysattr.h"
 #include "commands/defrem.h"
@@ -465,6 +467,12 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	/* Support functions for join push-down */
 	routine->GetForeignJoinPaths = postgresGetForeignJoinPaths;
 
+	/* Support functions for foreign transactions */
+	routine->GetPrepareId = postgresGetPrepareId;
+	routine->PrepareForeignTransaction = postgresPrepareForeignTransaction;
+	routine->ResolvePreparedForeignTransaction = postgresResolvePreparedForeignTransaction;
+	routine->EndForeignTransaction = postgresEndForeignTransaction;
+
 	/* Support functions for upper relation push-down */
 	routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
 
@@ -1321,7 +1329,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 	 * Get connection to the foreign server.  Connection manager will
 	 * establish new connection if necessary.
 	 */
-	fsstate->conn = GetConnection(user, false);
+	fsstate->conn = GetConnection(user, false, true, false);
 
 	/* Assign a unique ID for my cursor */
 	fsstate->cursor_number = GetCursorNumber(fsstate->conn);
@@ -1698,7 +1706,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 	user = GetUserMapping(userid, table->serverid);
 
 	/* Open connection; report that we'll create a prepared statement. */
-	fmstate->conn = GetConnection(user, true);
+	fmstate->conn = GetConnection(user, true, true, false);
 	fmstate->p_name = NULL;		/* prepared statement not made yet */
 
 	/* Deconstruct fdw_private data. */
@@ -2293,7 +2301,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
 	 * Get connection to the foreign server.  Connection manager will
 	 * establish new connection if necessary.
 	 */
-	dmstate->conn = GetConnection(user, false);
+	dmstate->conn = GetConnection(user, false, true, false);
 
 	/* Initialize state variable */
 	dmstate->num_tuples = -1;	/* -1 means not set yet */
@@ -2555,7 +2563,7 @@ estimate_path_cost_size(PlannerInfo *root,
 								NULL);
 
 		/* Get the remote estimate */
-		conn = GetConnection(fpinfo->user, false);
+		conn = GetConnection(fpinfo->user, false, true, false);
 		get_remote_estimate(sql.data, conn, &rows, &width,
 							&startup_cost, &total_cost);
 		ReleaseConnection(conn);
@@ -3492,7 +3500,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	 */
 	table = GetForeignTable(RelationGetRelid(relation));
 	user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
-	conn = GetConnection(user, false);
+	conn = GetConnection(user, false, true, false);
 
 	/*
 	 * Construct command to get page count for relation.
@@ -3582,7 +3590,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	table = GetForeignTable(RelationGetRelid(relation));
 	server = GetForeignServer(table->serverid);
 	user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
-	conn = GetConnection(user, false);
+	conn = GetConnection(user, false, true, false);
 
 	/*
 	 * Construct cursor that retrieves whole rows from remote.
@@ -3805,7 +3813,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 	 */
 	server = GetForeignServer(serverOid);
 	mapping = GetUserMapping(GetUserId(), server->serverid);
-	conn = GetConnection(mapping, false);
+	conn = GetConnection(mapping, false, true, false);
 
 	/* Don't attempt to import collation if remote server hasn't got it */
 	if (PQserverVersion(conn) < 90100)
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index f8c255e..8409671 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -13,6 +13,7 @@
 #ifndef POSTGRES_FDW_H
 #define POSTGRES_FDW_H
 
+#include "access/fdw_xact.h"
 #include "foreign/foreign.h"
 #include "lib/stringinfo.h"
 #include "nodes/relation.h"
@@ -102,7 +103,8 @@ extern int	set_transmission_modes(void);
 extern void reset_transmission_modes(int nestlevel);
 
 /* in connection.c */
-extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
+extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt,
+							 bool start_transaction, bool connection_error_ok);
 extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
@@ -163,6 +165,14 @@ extern void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root,
 						RelOptInfo *foreignrel, List *tlist,
 						List *remote_conds, List *pathkeys,
 						List **retrieved_attrs, List **params_list);
+extern char	*postgresGetPrepareId(Oid serveroid, Oid userid, int *prep_info_len);
+extern bool postgresPrepareForeignTransaction(Oid serverid, Oid userid,
+											  Oid umid, int prep_info_len,
+											  char *prep_info);
+extern bool postgresResolvePreparedForeignTransaction(Oid serverid, Oid userid,
+													  Oid umid, bool is_commit,
+													  int prep_info_len, char *prep_info);
+extern bool postgresEndForeignTransaction(Oid serverid, Oid userid, Oid umid, bool is_commit);
 
 /* in shippable.c */
 extern bool is_builtin(Oid objectId);
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index f48743c..4ef2e51 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -15,6 +15,10 @@ DO $d$
             OPTIONS (dbname '$$||current_database()||$$',
                      port '$$||current_setting('port')||$$'
             )$$;
+        EXECUTE $$CREATE SERVER loopback3 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$'
+            )$$;
     END;
 $d$;
 
@@ -22,6 +26,7 @@ CREATE USER MAPPING FOR public SERVER testserver1
 	OPTIONS (user 'value', password 'value');
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback;
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2;
+CREATE USER MAPPING FOR CURRENT_USER SERVER loopback3;
 
 -- ===================================================================
 -- create objects used through FDW loopback server
@@ -56,6 +61,15 @@ CREATE TABLE "S 1"."T 4" (
 	c3 text,
 	CONSTRAINT t4_pkey PRIMARY KEY (c1)
 );
+CREATE TABLE "S 1"."T 5" (
+       c1 int NOT NULL,
+       CONSTRAINT t5_pkey PRIMARY KEY (c1)
+);
+
+CREATE TABLE "S 1"."T 6" (
+       c1 int NOT NULL,
+       CONSTRAINT t6_pkey PRIMARY KEY (c1)
+);
 
 INSERT INTO "S 1"."T 1"
 	SELECT id,
@@ -83,11 +97,14 @@ INSERT INTO "S 1"."T 4"
 	       'AAA' || to_char(id, 'FM000')
 	FROM generate_series(1, 100) id;
 DELETE FROM "S 1"."T 4" WHERE c1 % 3 != 0;	-- delete for outer join tests
+INSERT INTO "S 1"."T 5"
+	SELECT generate_series(1, 100);
 
 ANALYZE "S 1"."T 1";
 ANALYZE "S 1"."T 2";
 ANALYZE "S 1"."T 3";
 ANALYZE "S 1"."T 4";
+ANALYZE "S 1"."T 5";
 
 -- ===================================================================
 -- create foreign tables
@@ -136,6 +153,19 @@ CREATE FOREIGN TABLE ft6 (
 	c3 text
 ) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 4');
 
+CREATE FOREIGN TABLE ft7 (
+       c1 int NOT NULL
+) SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 5');
+
+CREATE FOREIGN TABLE ft8 (
+       c1 int NOT NULL
+) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 5');
+
+CREATE FOREIGN TABLE ft9 (
+       c1 int NOT NULL
+) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 5');
+
+
 -- A table with oids. CREATE FOREIGN TABLE doesn't support the
 -- WITH OIDS option, but ALTER does.
 CREATE FOREIGN TABLE ft_pg_type (
@@ -1660,3 +1690,95 @@ WHERE ftrelid = 'table30000'::regclass
 AND ftoptions @> array['fetch_size=60000'];
 
 ROLLBACK;
+
+
+-- ===================================================================
+-- test Atomic commit across foreign servers
+-- ===================================================================
+
+ALTER SERVER loopback OPTIONS(ADD two_phase_commit 'off');
+ALTER SERVER loopback2 OPTIONS(ADD two_phase_commit 'on');
+ALTER SERVER loopback3 OPTIONS(ADD two_phase_commit 'on');
+
+\des+
+
+-- one not supporting server
+BEGIN;
+INSERT INTO ft7 VALUES(101);
+COMMIT;
+SELECT COUNT(*) FROM ft8;
+
+-- One not supporting server and one supporting server
+BEGIN;
+INSERT INTO ft7 VALUES(102);
+INSERT INTO ft8 VALUES(103);
+COMMIT;
+SELECT COUNT(*) FROM ft8;
+
+-- Two supporting server and one not supporting server.
+BEGIN;
+INSERT INTO ft7 VALUES(104);
+INSERT INTO ft8 VALUES(105);
+INSERT INTO ft9 VALUES(106);
+COMMIT;
+SELECT COUNT(*) FROM ft8;
+
+-- one local and one not supporting foreign server
+BEGIN;
+INSERT INTO ft7 VALUES(107);
+INSERT INTO "S 1"."T 6" VALUES (1);
+COMMIT;
+SELECT COUNT(*) FROM ft8;
+SELECT COUNT(*) FROM "S 1"."T 6";
+
+-- one local and one supporting foreign server and not supporting one
+BEGIN;
+INSERT INTO ft7 VALUES(108);
+INSERT INTO ft8 VALUES(109);
+INSERT INTO "S 1"."T 6" VALUES (2);
+COMMIT;
+SELECT COUNT(*) FROM ft8;
+SELECT COUNT(*) FROM "S 1"."T 6";
+
+-- one local and two supporting foreign server and not supporting one
+BEGIN;
+INSERT INTO ft7 VALUES(110);
+INSERT INTO ft8 VALUES(111);
+INSERT INTO ft9 VALUES(112);
+INSERT INTO "S 1"."T 6" VALUES (3);
+COMMIT;
+SELECT COUNT(*) FROM ft8;
+SELECT COUNT(*) FROM "S 1"."T 6";
+
+-- transaction updating on single supporting foreign server with violation on foreign server
+BEGIN;
+INSERT INTO ft8 VALUES(113);
+INSERT INTO ft8 VALUES(110); -- violation on foreign server
+COMMIT;
+SELECT COUNT(*) FROM ft8;
+
+-- transaction updating on single supporting foreign server and local with violation on local
+BEGIN;
+INSERT INTO ft8 VALUES(114);
+INSERT INTO "S 1"."T 6" VALUES (4);
+INSERT INTO "S 1"."T 6" VALUES (3); -- violation on local
+COMMIT;
+SELECT COUNT(*) FROM ft8;
+SELECT COUNT(*) FROM "S 1"."T 6";
+
+-- violation on foreign server supporting 2PC
+BEGIN;
+INSERT INTO ft8 VALUES(115);
+INSERT INTO ft9 VALUES(116);
+INSERT INTO ft9 VALUES(110); -- violation on foreign server
+COMMIT;
+SELECT COUNT(*) FROM ft8;
+
+-- transaction involing local and foreign server with violation on local server
+BEGIN;
+INSERT INTO ft8 VALUES(117);
+INSERT INTO ft9 VALUES(118);
+INSERT INTO "S 1"."T 6" VALUES (3); -- violation on local
+COMMIT;
+SELECT COUNT(*) FROM ft8;
+SELECT COUNT(*) FROM "S 1"."T 6";
