diff --git a/contrib/pg_fdw_xact_resolver/Makefile b/contrib/pg_fdw_xact_resolver/Makefile
new file mode 100644
index 0000000..f8924f0
--- /dev/null
+++ b/contrib/pg_fdw_xact_resolver/Makefile
@@ -0,0 +1,15 @@
+# contrib/pg_fdw_xact_resolver/Makefile
+
+MODULES = pg_fdw_xact_resolver
+PGFILEDESC = "pg_fdw_xact_resolver - foreign transaction resolver daemon"
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/pg_fdw_xact_resolver
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/pg_fdw_xact_resolver/pg_fdw_xact_resolver.c b/contrib/pg_fdw_xact_resolver/pg_fdw_xact_resolver.c
new file mode 100644
index 0000000..6f587ae
--- /dev/null
+++ b/contrib/pg_fdw_xact_resolver/pg_fdw_xact_resolver.c
@@ -0,0 +1,364 @@
+/* -------------------------------------------------------------------------
+ *
+ * pg_fdw_xact_resolver.c
+ *
+ * Contrib module to launch foreign transaction resolver to resolve unresolved
+ * transactions prepared on foreign servers.
+ *
+ * The extension launches foreign transaction resolver launcher process as a
+ * background worker. The launcher then launches separate background worker
+ * process to resolve the foreign transaction in each database. The worker
+ * process simply connects to the database specified and calls pg_fdw_resolve()
+ * function, which tries to resolve the transactions.
+ *
+ * Copyright (C) 2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		contrib/pg_fdw_xact_resolver/pg_fdw_xact_resolver.c
+ *
+ * -------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+/* These are always necessary for a bgworker */
+#include "miscadmin.h"
+#include "postmaster/bgworker.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+
+/* these headers are used by this particular worker's code */
+#include "access/xact.h"
+#include "access/fdw_xact.h"
+#include "executor/spi.h"
+#include "fmgr.h"
+#include "lib/stringinfo.h"
+#include "pgstat.h"
+#include "utils/builtins.h"
+#include "utils/snapmgr.h"
+#include "tcop/utility.h"
+
+PG_MODULE_MAGIC;
+
+void		_PG_init(void);
+
+/*
+ * Flags set by interrupt handlers of foreign transaction resolver for later
+ * service in the main loop.
+ */
+static volatile sig_atomic_t got_sighup = false;
+static volatile sig_atomic_t got_sigterm = false;
+static volatile sig_atomic_t got_sigquit = false;
+static volatile sig_atomic_t got_sigusr1 = false;
+
+static void FDWXactResolver_worker_main(Datum dbid_datum);
+static void FDWXactResolverMain(Datum main_arg);
+
+/* How often the resolver daemon checks for unresolved transactions (WaitLatch timeout) */
+#define FDW_XACT_RESOLVE_NAP_TIME (10 * 1000L)
+
+/*
+ * SIGTERM handler for the resolver launcher
+ *		Records the request in got_sigterm, which the main loop tests, and
+ *		sets our latch so that the loop wakes up.
+ */
+static void
+FDWXactResolver_SIGTERM(SIGNAL_ARGS)
+{
+	const int	errno_on_entry = errno;
+
+	got_sigterm = true;
+	SetLatch(MyLatch);
+	/* leave errno exactly as the interrupted code saw it */
+	errno = errno_on_entry;
+}
+
+/*
+ * SIGQUIT handler for the resolver launcher
+ *		Records the request in got_sigquit, which the main loop acts on by
+ *		exiting, and sets our latch so that the loop wakes up.
+ */
+static void
+FDWXactResolver_SIGQUIT(SIGNAL_ARGS)
+{
+	const int	errno_on_entry = errno;
+
+	got_sigquit = true;
+	SetLatch(MyLatch);
+	/* leave errno exactly as the interrupted code saw it */
+	errno = errno_on_entry;
+}
+/*
+ * SIGHUP handler for the resolver launcher
+ *		Records the request in got_sighup, on which the main loop rereads the
+ *		config file, and sets our latch so that the loop wakes up.
+ */
+static void
+FDWXactResolver_SIGHUP(SIGNAL_ARGS)
+{
+	const int	errno_on_entry = errno;
+
+	got_sighup = true;
+	SetLatch(MyLatch);
+	/* leave errno exactly as the interrupted code saw it */
+	errno = errno_on_entry;
+}
+
+static void
+FDWXactResolver_SIGUSR1(SIGNAL_ARGS)
+{
+	const int	errno_on_entry = errno;
+
+	got_sigusr1 = true;		/* main loop then checks on its worker */
+	SetLatch(MyLatch);
+	/* leave errno exactly as the interrupted code saw it */
+	errno = errno_on_entry;
+}
+
+/*
+ * Entrypoint of this module.
+ *
+ * Registers the foreign transaction resolver launcher as a background worker.
+ */
+void
+_PG_init(void)
+{
+	BackgroundWorker worker;
+
+	if (!process_shared_preload_libraries_in_progress)
+		return;
+
+	/* Zero the struct so that fields we do not set are not stack garbage */
+	memset(&worker, 0, sizeof(worker));
+	/*
+	 * For some reason, unless a background worker sets
+	 * BGWORKER_BACKEND_DATABASE_CONNECTION it is not added to BackendList and
+	 * gets no notifications; so set it though we need no database connection.
+	 */
+	worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
+	worker.bgw_restart_time = 0;	/* restart immediately */
+	snprintf(worker.bgw_name, BGW_MAXLEN, "foreign transaction resolver launcher");
+	worker.bgw_main = FDWXactResolverMain;
+	worker.bgw_main_arg = (Datum) 0;	/* dummy; the launcher ignores it */
+	worker.bgw_notify_pid = 0;
+
+	RegisterBackgroundWorker(&worker);
+}
+
+/* Main function of the launcher: starts one per-database resolver worker at a
+ * time, for each database reported by get_dbids_with_unresolved_xact(). */
+static void
+FDWXactResolverMain(Datum main_arg)
+{
+	/* For launching background worker */
+	BackgroundWorker worker;
+	BackgroundWorkerHandle *handle = NULL;
+	pid_t		pid;
+	List	   *dbid_list = NIL;	/* databases still to visit; kept across iterations */
+
+	/* Properly accept or ignore signals the postmaster might send us */
+	pqsignal(SIGHUP, FDWXactResolver_SIGHUP);	/* set flag to read config file */
+	pqsignal(SIGINT, SIG_IGN);
+	pqsignal(SIGTERM, FDWXactResolver_SIGTERM);	/* request shutdown */
+	pqsignal(SIGQUIT, FDWXactResolver_SIGQUIT);	/* hard crash time */
+	pqsignal(SIGALRM, SIG_IGN);
+	pqsignal(SIGPIPE, SIG_IGN);
+	pqsignal(SIGUSR1, FDWXactResolver_SIGUSR1);
+	pqsignal(SIGUSR2, SIG_IGN);
+
+	/* Reset some signals that are accepted by postmaster but not here */
+	pqsignal(SIGCHLD, SIG_DFL);
+	pqsignal(SIGTTIN, SIG_DFL);
+	pqsignal(SIGTTOU, SIG_DFL);
+	pqsignal(SIGCONT, SIG_DFL);
+	pqsignal(SIGWINCH, SIG_DFL);
+
+	/* Unblock signals */
+	BackgroundWorkerUnblockSignals();
+
+	/* Main loop: do this until the SIGTERM handler tells us to terminate */
+	while (!got_sigterm)
+	{
+		int		rc;
+
+		/*
+		 * If no background worker is running, we can start one if there are
+		 * unresolved foreign transactions.
+		 */
+		if (!handle)
+		{
+			/*
+			 * If we do not know which databases have foreign servers with
+			 * unresolved foreign transactions, get the list.
+			 */
+			if (!dbid_list)
+				dbid_list = get_dbids_with_unresolved_xact();
+
+			if (dbid_list)
+			{
+				/* Work on the first dbid, and remove it from the list */
+				Oid			dbid = linitial_oid(dbid_list);
+
+				dbid_list = list_delete_first(dbid_list);
+				Assert(OidIsValid(dbid));
+
+				/* Zero the struct so that unset fields are not stack garbage */
+				memset(&worker, 0, sizeof(worker));
+				worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+				worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
+				/* We will start another worker if needed */
+				worker.bgw_restart_time = BGW_NEVER_RESTART;
+				worker.bgw_main = FDWXactResolver_worker_main;
+				snprintf(worker.bgw_name, BGW_MAXLEN, "foreign transaction resolver (dbid %u)", dbid);
+				worker.bgw_main_arg = ObjectIdGetDatum(dbid);
+				/* set bgw_notify_pid so that we can wait for it to finish */
+				worker.bgw_notify_pid = MyProcPid;
+				/* If registration fails, requeue the dbid and retry after the nap */
+				if (!RegisterDynamicBackgroundWorker(&worker, &handle))
+					dbid_list = lcons_oid(dbid, dbid_list);
+			}
+		}
+
+		/*
+		 * Background workers mustn't call usleep() or any direct equivalent:
+		 * instead, they may wait on their process latch, which sleeps as
+		 * necessary, but is awakened if postmaster dies.  That way the
+		 * background process goes away immediately in an emergency.
+		 */
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+					   FDW_XACT_RESOLVE_NAP_TIME);
+		ResetLatch(MyLatch);
+
+		/* emergency bailout if postmaster has died */
+		if (rc & WL_POSTMASTER_DEATH)
+			proc_exit(1);
+
+		/*
+		 * In case of a SIGHUP, just reload the configuration.
+		 */
+		if (got_sighup)
+		{
+			got_sighup = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
+
+		if (got_sigusr1)
+		{
+			got_sigusr1 = false;
+
+			/* If we had started a worker check whether it completed */
+			if (handle &&
+				GetBackgroundWorkerPid(handle, &pid) == BGWH_STOPPED)
+			{
+				/* Worker is gone: free its handle so another can be started */
+				pfree(handle);
+				handle = NULL;
+			}
+		}
+
+		/*
+		 * Postmaster wants to stop this process. Exit with non-zero code, so
+		 * that the postmaster starts this process again. The worker processes
+		 * will receive the signal and end themselves. This process will restart
+		 * them if necessary.
+		 */
+		if (got_sigquit)
+			proc_exit(2);
+	}
+
+	/* Time to exit */
+	ereport(LOG,
+			(errmsg("foreign transaction resolver shutting down")));
+
+	proc_exit(0);				/* done */
+}
+
+/* FDWXactWorker_SIGTERM
+ * Signal handler of the per-database worker (SIGTERM and SIGQUIT): exit now */
+static void
+FDWXactWorker_SIGTERM(SIGNAL_ARGS)
+{
+	/* Just terminate the current process, with exit status 1 */
+	proc_exit(1);
+}
+
+/* Per-database resolver: connect to the database and run pg_fdw_resolve() once */
+static void
+FDWXactResolver_worker_main(Datum dbid_datum)
+{
+	const char *command = "SELECT pg_fdw_resolve()";
+	Oid			dbid = DatumGetObjectId(dbid_datum);
+	int			ret;
+
+	/*
+	 * This background worker does not loop infinitely, so the only handler we
+	 * need is one that exits quickly; it serves both SIGTERM and SIGQUIT.
+	 */
+	pqsignal(SIGTERM, FDWXactWorker_SIGTERM);
+	pqsignal(SIGQUIT, FDWXactWorker_SIGTERM);
+	pqsignal(SIGINT, SIG_IGN);
+	pqsignal(SIGALRM, SIG_IGN);
+	pqsignal(SIGPIPE, SIG_IGN);
+	pqsignal(SIGUSR1, SIG_IGN);
+	pqsignal(SIGUSR2, SIG_IGN);
+
+	/* Reset some signals that are accepted by postmaster but not here */
+	pqsignal(SIGCHLD, SIG_DFL);
+	pqsignal(SIGTTIN, SIG_DFL);
+	pqsignal(SIGTTOU, SIG_DFL);
+	pqsignal(SIGCONT, SIG_DFL);
+	pqsignal(SIGWINCH, SIG_DFL);
+
+	/* Unblock signals */
+	BackgroundWorkerUnblockSignals();
+
+	/*
+	 * Run this background worker in superuser mode, so that all the foreign
+	 * server and user information is accessible.
+	 */
+	BackgroundWorkerInitializeConnectionByOid(dbid, InvalidOid);
+
+	/*
+	 * Start a transaction on which we can call resolver function.
+	 * Note that each StartTransactionCommand() call should be preceded by a
+	 * SetCurrentStatementStartTimestamp() call, which sets both the time
+	 * for the statement we're about to run, and also the transaction
+	 * start time.  Also, each other query sent to SPI should probably be
+	 * preceded by SetCurrentStatementStartTimestamp(), so that statement
+	 * start time is always up to date.
+	 *
+	 * The SPI_connect() call lets us run queries through the SPI manager,
+	 * and the PushActiveSnapshot() call creates an "active" snapshot
+	 * which is necessary for queries to have MVCC data to work on.
+	 *
+	 * The pgstat_report_activity() call makes our activity visible
+	 * through the pgstat views.
+	 */
+	SetCurrentStatementStartTimestamp();
+	StartTransactionCommand();
+	SPI_connect();
+	PushActiveSnapshot(GetTransactionSnapshot());
+	pgstat_report_activity(STATE_RUNNING, command);
+
+	/* Run the resolver function */
+	ret = SPI_execute(command, false, 0);
+
+	if (ret < 0)
+		elog(LOG, "error running pg_fdw_resolve() within database %u",
+			 dbid);
+
+	/*
+	 * And finish our transaction.
+	 */
+	SPI_finish();
+	PopActiveSnapshot();
+	CommitTransactionCommand();
+	pgstat_report_activity(STATE_IDLE, NULL);
+
+	/* Done; exit now */
+	proc_exit(0);
+}
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 1a1e5b5..341db6f 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -8,20 +8,22 @@
  * IDENTIFICATION
  *		  contrib/postgres_fdw/connection.c
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
 #include "postgres_fdw.h"
 
 #include "access/xact.h"
+#include "access/fdw_xact.h"
+#include "commands/defrem.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
 
 
 /*
  * Connection cache hash table entry
  *
  * The lookup key in this hash table is the foreign server OID plus the user
@@ -57,52 +59,59 @@ typedef struct ConnCacheEntry
 static HTAB *ConnectionHash = NULL;
 
 /* for assigning cursor numbers and prepared statement numbers */
 static unsigned int cursor_number = 0;
 static unsigned int prep_stmt_number = 0;
 
 /* tracks whether any work is needed in callback functions */
 static bool xact_got_connection = false;
 
 /* prototypes of private functions */
-static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user,
+									bool connection_error_ok);
 static void check_conn_params(const char **keywords, const char **values);
 static void configure_remote_session(PGconn *conn);
 static void do_sql_command(PGconn *conn, const char *sql);
-static void begin_remote_xact(ConnCacheEntry *entry);
+static void begin_remote_xact(ConnCacheEntry *entry, ForeignServer *server);
 static void pgfdw_xact_callback(XactEvent event, void *arg);
 static void pgfdw_subxact_callback(SubXactEvent event,
 					   SubTransactionId mySubid,
 					   SubTransactionId parentSubid,
 					   void *arg);
+static bool server_uses_two_phase_commit(ForeignServer *server);
+static void pgfdw_cleanup_after_transaction(ConnCacheEntry *entry);
 
 
 /*
  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
  * server with the user's authorization.  A new connection is established
  * if we don't already have a suitable one, and a transaction is opened at
  * the right subtransaction nesting depth if we didn't do that already.
  *
  * will_prep_stmt must be true if caller intends to create any prepared
  * statements.  Since those don't go away automatically at transaction end
  * (not even on error), we need this flag to cue manual cleanup.
  *
+ * connection_error_ok if true, indicates that caller can handle connection
+ * error by itself. If false, raise error.
+ *
  * XXX Note that caching connections theoretically requires a mechanism to
  * detect change of FDW objects to invalidate already established connections.
  * We could manage that by watching for invalidation events on the relevant
  * syscaches.  For the moment, though, it's not clear that this would really
  * be useful and not mere pedantry.  We could not flush any active connections
  * mid-transaction anyway.
  */
 PGconn *
 GetConnection(ForeignServer *server, UserMapping *user,
-			  bool will_prep_stmt)
+			  bool will_prep_stmt, bool start_transaction,
+			  bool connection_error_ok)
 {
 	bool		found;
 	ConnCacheEntry *entry;
 	ConnCacheKey key;
 
 	/* First time through, initialize connection cache hashtable */
 	if (ConnectionHash == NULL)
 	{
 		HASHCTL		ctl;
 
@@ -116,23 +125,20 @@ GetConnection(ForeignServer *server, UserMapping *user,
 									 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
 		/*
 		 * Register some callback functions that manage connection cleanup.
 		 * This should be done just once in each backend.
 		 */
 		RegisterXactCallback(pgfdw_xact_callback, NULL);
 		RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
 	}
 
-	/* Set flag that we did GetConnection during the current transaction */
-	xact_got_connection = true;
-
 	/* Create hash key for the entry.  Assume no pad bytes in key struct */
 	key.serverid = server->serverid;
 	key.userid = user->userid;
 
 	/*
 	 * Find or create cached entry for requested connection.
 	 */
 	entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
 	if (!found)
 	{
@@ -152,41 +158,64 @@ GetConnection(ForeignServer *server, UserMapping *user,
 	/*
 	 * If cache entry doesn't have a connection, we have to establish a new
 	 * connection.  (If connect_pg_server throws an error, the cache entry
 	 * will be left in a valid empty state.)
 	 */
 	if (entry->conn == NULL)
 	{
 		entry->xact_depth = 0;	/* just to be sure */
 		entry->have_prep_stmt = false;
 		entry->have_error = false;
-		entry->conn = connect_pg_server(server, user);
+		entry->conn = connect_pg_server(server, user, connection_error_ok);
+
+		/*
+		 * If the attempt to connect to the foreign server failed, we should not
+		 * come here, unless the caller has indicated so.
+		 */
+		Assert(entry->conn || connection_error_ok);
+
+		if (!entry->conn && connection_error_ok)
+		{
+			elog(DEBUG3, "attempt to connection to server \"%s\" by postgres_fdw failed",
+				 	server->servername);
+			return NULL;
+		}
+
 		elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
 			 entry->conn, server->servername);
 	}
 
 	/*
 	 * Start a new transaction or subtransaction if needed.
 	 */
-	begin_remote_xact(entry);
+	if (start_transaction)
+	{
+		begin_remote_xact(entry, server);
+		/* Set flag that we did GetConnection during the current transaction */
+		xact_got_connection = true;
+	}
+
 
 	/* Remember if caller will prepare statements */
 	entry->have_prep_stmt |= will_prep_stmt;
 
 	return entry->conn;
 }
 
 /*
  * Connect to remote server using specified server and user mapping properties.
+ * If the attempt to connect fails, and the caller can handle connection failure
+ * (connection_error_ok = true) return NULL, throw error otherwise.
  */
 static PGconn *
-connect_pg_server(ForeignServer *server, UserMapping *user)
+connect_pg_server(ForeignServer *server, UserMapping *user,
+					bool connection_error_ok)
 {
 	PGconn	   *volatile conn = NULL;
 
 	/*
 	 * Use PG_TRY block to ensure closing connection on error.
 	 */
 	PG_TRY();
 	{
 		const char **keywords;
 		const char **values;
@@ -227,25 +256,29 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 		if (!conn || PQstatus(conn) != CONNECTION_OK)
 		{
 			char	   *connmessage;
 			int			msglen;
 
 			/* libpq typically appends a newline, strip that */
 			connmessage = pstrdup(PQerrorMessage(conn));
 			msglen = strlen(connmessage);
 			if (msglen > 0 && connmessage[msglen - 1] == '\n')
 				connmessage[msglen - 1] = '\0';
-			ereport(ERROR,
-			   (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-				errmsg("could not connect to server \"%s\"",
-					   server->servername),
-				errdetail_internal("%s", connmessage)));
+
+			if (connection_error_ok)
+				return NULL;
+			else
+				ereport(ERROR,
+						(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+						errmsg("could not connect to server \"%s\"",
+					   		server->servername),
+						errdetail_internal("%s", connmessage)));
 		}
 
 		/*
 		 * Check that non-superuser has used password to establish connection;
 		 * otherwise, he's piggybacking on the postgres server's user
 		 * identity. See also dblink_security_check() in contrib/dblink.
 		 */
 		if (!superuser() && !PQconnectionUsedPassword(conn))
 			ereport(ERROR,
 				  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
@@ -362,29 +395,36 @@ do_sql_command(PGconn *conn, const char *sql)
  * Start remote transaction or subtransaction, if needed.
  *
  * Note that we always use at least REPEATABLE READ in the remote session.
  * This is so that, if a query initiates multiple scans of the same or
  * different foreign tables, we will get snapshot-consistent results from
  * those scans.  A disadvantage is that we can't provide sane emulation of
  * READ COMMITTED behavior --- it would be nice if we had some other way to
  * control which remote queries share a snapshot.
  */
 static void
-begin_remote_xact(ConnCacheEntry *entry)
+begin_remote_xact(ConnCacheEntry *entry, ForeignServer *server)
 {
 	int			curlevel = GetCurrentTransactionNestLevel();
 
 	/* Start main transaction if we haven't yet */
 	if (entry->xact_depth <= 0)
 	{
 		const char *sql;
 
+		/*
+		 * Register the new foreign server and check whether the two phase
+		 * compliance is possible. 
+		 */
+		RegisterXactForeignServer(entry->key.serverid, entry->key.userid,
+									server_uses_two_phase_commit(server));
+
 		elog(DEBUG3, "starting remote transaction on connection %p",
 			 entry->conn);
 
 		if (IsolationIsSerializable())
 			sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
 		else
 			sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
 		do_sql_command(entry->conn, sql);
 		entry->xact_depth = 1;
 	}
@@ -506,148 +546,295 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 		if (clear)
 			PQclear(res);
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
 	if (clear)
 		PQclear(res);
 }
 
 /*
- * pgfdw_xact_callback --- cleanup at main-transaction end.
+ * postgresGetPrepareId
+ * The function crafts prepared transaction identifier. PostgreSQL documentation
+ * mentions two restrictions on the name
+ * 1. String literal, less than 200 bytes long.
+ * 2. Should not be same as any other concurrent prepared transaction id.
+ *
+ * To make the prepared transaction id, we should ideally use something like
+ * UUID, which gives unique ids with high probability, but that may be expensive
+ * here and UUID extension which provides the function to generate UUID is
+ * not part of the core.
  */
-static void
-pgfdw_xact_callback(XactEvent event, void *arg)
+extern char *
+postgresGetPrepareId(Oid serverid, Oid userid, int *prep_info_len)
 {
-	HASH_SEQ_STATUS scan;
-	ConnCacheEntry *entry;
+/* Maximum length of the prepared transaction id, borrowed from twophase.c */
+#define PREP_XACT_ID_MAX_LEN 200
+	/* Result is a palloc'd string "px_<random>_<serverid>_<userid>" */
+	char	   *prep_info;
+
+	prep_info = (char *) palloc(PREP_XACT_ID_MAX_LEN);
+	/* random() is a long and Oids are unsigned: print them as such */
+	snprintf(prep_info, PREP_XACT_ID_MAX_LEN, "%s_%ld_%u_%u",
+								"px", (long) random(),
+								serverid, userid);
+	/* The reported length excludes the terminating NUL byte */
+	*prep_info_len = (int) strlen(prep_info);
+	return prep_info;
+}
 
-	/* Quick exit if no connections were touched in this transaction. */
-	if (!xact_got_connection)
-		return;
+bool
+postgresPrepareForeignTransaction(Oid serverid, Oid userid, int prep_info_len,
+									char *prep_info)
+{
+	ConnCacheKey	lookup_key;
+	ConnCacheEntry *cached;
+	bool			present;
+	PGconn		   *remote;
+	StringInfo		sql;
+	PGresult	   *outcome;
+	bool			prepared;
+
+	/*
+	 * The connection cache is expected to exist and to hold the connection
+	 * we want.
+	 */
+	Assert(ConnectionHash);
+
+	/* Build the lookup key.  Assume no pad bytes in key struct */
+	lookup_key.serverid = serverid;
+	lookup_key.userid = userid;
+
+	cached = hash_search(ConnectionHash, &lookup_key, HASH_FIND, &present);
+
+	/* Nothing to prepare without a live cached connection */
+	if (!present || cached->conn == NULL)
+		return false;
+
+	remote = cached->conn;
+	sql = makeStringInfo();
+	appendStringInfo(sql, "PREPARE TRANSACTION '%.*s'",
+					 prep_info_len, prep_info);
+	outcome = PQexec(remote, sql->data);
+	prepared = (PQresultStatus(outcome) == PGRES_COMMAND_OK);
+
+	/*
+	 * TODO: check whether we should raise an error or warning.
+	 * On failure only warn, so that the reason gets logged; the caller,
+	 * i.e. the foreign transaction manager, takes the appropriate action.
+	 */
+	if (!prepared)
+		pgfdw_report_error(WARNING, outcome, remote, false, sql->data);
+
+	PQclear(outcome);
+	pgfdw_cleanup_after_transaction(cached);
+	return prepared;
+}
+
+bool
+postgresEndForeignTransaction(Oid serverid, Oid userid, bool is_commit)
+{
+	const char	   *verb = is_commit ? "COMMIT" : "ROLLBACK";
+	ConnCacheKey	lookup_key;
+	ConnCacheEntry *cached;
+	bool			present;
+	PGconn		   *remote;
+	StringInfo		sql;
+	PGresult	   *outcome;
+	bool			ended;
+
+	/* The connection cache is expected to exist and hold the connection */
+	Assert(ConnectionHash);
+
+	/* Build the lookup key.  Assume no pad bytes in key struct */
+	lookup_key.serverid = serverid;
+	lookup_key.userid = userid;
+
+	cached = hash_search(ConnectionHash, &lookup_key, HASH_FIND, &present);
+
+	/* Without a live cached connection there is nothing to end */
+	if (!present || cached->conn == NULL)
+		return false;
+
+	remote = cached->conn;
+	sql = makeStringInfo();
+	appendStringInfo(sql, "%s TRANSACTION", verb);
+	outcome = PQexec(remote, sql->data);
+	ended = (PQresultStatus(outcome) == PGRES_COMMAND_OK);
+
+	/*
+	 * The local transaction has ended, so there is no point in raising an
+	 * error. Raise a warning so that the reason for the failure gets
+	 * logged.
+	 */
+	if (!ended)
+		pgfdw_report_error(WARNING, outcome, remote, false, sql->data);
+
+	PQclear(outcome);
+	/* Reset the cache entry's per-transaction state */
+	pgfdw_cleanup_after_transaction(cached);
+	return ended;
+}
+
+bool
+postgresResolvePreparedForeignTransaction(Oid serverid, Oid userid,
+											bool is_commit,
+											int prep_info_len, char *prep_info)
+{
+	PGconn			*conn = NULL;
 
 	/*
-	 * Scan all connection cache entries to find open remote transactions, and
-	 * close them.
+	 * If there exists a connection in the connection cache that can be used,
+	 * use it. If there is none, we need foreign server and user information
+	 * which can be obtained only when in a transaction block.
+	 * If we are resolving prepared foreign transactions immediately after
+	 * preparing them, the connection hash would have a connection. If we are
+	 * resolving them any other time, a resolver would have started a
+	 * transaction.
 	 */
-	hash_seq_init(&scan, ConnectionHash);
-	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	if (ConnectionHash)
 	{
-		PGresult   *res;
+		/* Connection hash should have a connection we want */
+		bool		found;
+		ConnCacheKey key;
+		ConnCacheEntry	*entry;
+		
+		/* Create hash key for the entry.  Assume no pad bytes in key struct */
+		key.serverid = serverid;
+		key.userid = userid;
+
+		entry = (ConnCacheEntry *)hash_search(ConnectionHash, &key, HASH_FIND, &found);
+		if (found && entry->conn)
+			conn = entry->conn;
+	}
 
-		/* Ignore cache entry if no open connection right now */
-		if (entry->conn == NULL)
-			continue;
+	if (!conn && IsTransactionState())
+	{
+		ForeignServer	*foreign_server = GetForeignServer(serverid); 
+		UserMapping		*user_mapping = GetUserMapping(userid, serverid);
 
-		/* If it has an open remote transaction, try to close it */
-		if (entry->xact_depth > 0)
-		{
-			elog(DEBUG3, "closing remote transaction on connection %p",
-				 entry->conn);
+		conn = GetConnection(foreign_server, user_mapping, false, false, true);
+	}
 
-			switch (event)
+	/* Proceed with resolution if we got a connection, else return false */
+	if (conn)
+	{
+		StringInfo		command;
+		PGresult		*res;
+		bool			result;
+
+		command = makeStringInfo();
+		appendStringInfo(command, "%s PREPARED '%.*s'",
+							is_commit ? "COMMIT" : "ROLLBACK",
+							prep_info_len, prep_info);
+		res = PQexec(conn, command->data);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			int		sqlstate;
+			char	*diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+			/*
+			 * The command failed, raise a warning to log the reason of failure.
+			 * We may not be in a transaction here, so raising error doesn't
+			 * help. Even if we are in a transaction, it would be the resolver
+			 * transaction, which will get aborted on raising error, thus
+			 * delaying resolution of other prepared foreign transactions.
+			 */
+			pgfdw_report_error(WARNING, res, conn, false, command->data);
+	
+			if (diag_sqlstate)
 			{
-				case XACT_EVENT_PARALLEL_PRE_COMMIT:
-				case XACT_EVENT_PRE_COMMIT:
-					/* Commit all remote transactions during pre-commit */
-					do_sql_command(entry->conn, "COMMIT TRANSACTION");
-
-					/*
-					 * If there were any errors in subtransactions, and we
-					 * made prepared statements, do a DEALLOCATE ALL to make
-					 * sure we get rid of all prepared statements. This is
-					 * annoying and not terribly bulletproof, but it's
-					 * probably not worth trying harder.
-					 *
-					 * DEALLOCATE ALL only exists in 8.3 and later, so this
-					 * constrains how old a server postgres_fdw can
-					 * communicate with.  We intentionally ignore errors in
-					 * the DEALLOCATE, so that we can hobble along to some
-					 * extent with older servers (leaking prepared statements
-					 * as we go; but we don't really support update operations
-					 * pre-8.3 anyway).
-					 */
-					if (entry->have_prep_stmt && entry->have_error)
-					{
-						res = PQexec(entry->conn, "DEALLOCATE ALL");
-						PQclear(res);
-					}
-					entry->have_prep_stmt = false;
-					entry->have_error = false;
-					break;
-				case XACT_EVENT_PRE_PREPARE:
-
-					/*
-					 * We disallow remote transactions that modified anything,
-					 * since it's not very reasonable to hold them open until
-					 * the prepared transaction is committed.  For the moment,
-					 * throw error unconditionally; later we might allow
-					 * read-only cases.  Note that the error will cause us to
-					 * come right back here with event == XACT_EVENT_ABORT, so
-					 * we'll clean up the connection state at that point.
-					 */
-					ereport(ERROR,
-							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-							 errmsg("cannot prepare a transaction that modified remote tables")));
-					break;
-				case XACT_EVENT_PARALLEL_COMMIT:
-				case XACT_EVENT_COMMIT:
-				case XACT_EVENT_PREPARE:
-					/* Pre-commit should have closed the open transaction */
-					elog(ERROR, "missed cleaning up connection during pre-commit");
-					break;
-				case XACT_EVENT_PARALLEL_ABORT:
-				case XACT_EVENT_ABORT:
-					/* Assume we might have lost track of prepared statements */
-					entry->have_error = true;
-					/* If we're aborting, abort all remote transactions too */
-					res = PQexec(entry->conn, "ABORT TRANSACTION");
-					/* Note: can't throw ERROR, it would be infinite loop */
-					if (PQresultStatus(res) != PGRES_COMMAND_OK)
-						pgfdw_report_error(WARNING, res, entry->conn, true,
-										   "ABORT TRANSACTION");
-					else
-					{
-						PQclear(res);
-						/* As above, make sure to clear any prepared stmts */
-						if (entry->have_prep_stmt && entry->have_error)
-						{
-							res = PQexec(entry->conn, "DEALLOCATE ALL");
-							PQclear(res);
-						}
-						entry->have_prep_stmt = false;
-						entry->have_error = false;
-					}
-					break;
+				sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
+										 diag_sqlstate[1],
+										 diag_sqlstate[2],
+										 diag_sqlstate[3],
+										 diag_sqlstate[4]);
 			}
+			else
+				sqlstate = ERRCODE_CONNECTION_FAILURE;
+	
+			/*
+			 * If we tried to COMMIT/ABORT a prepared transaction and the prepared
+			 * transaction was missing on the foreign server, it was probably
+			 * resolved by some other means. Anyway, it should be considered as resolved.
+			 */
+			result = (sqlstate == ERRCODE_UNDEFINED_OBJECT);
 		}
+		else
+			result = true;
 
-		/* Reset state to show we're out of a transaction */
-		entry->xact_depth = 0;
+		PQclear(res);
+		ReleaseConnection(conn);
+		return result;
+	}
+	else
+		return false;
+}
 
-		/*
-		 * If the connection isn't in a good idle state, discard it to
-		 * recover. Next GetConnection will open a new connection.
-		 */
-		if (PQstatus(entry->conn) != CONNECTION_OK ||
-			PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
-		{
-			elog(DEBUG3, "discarding connection %p", entry->conn);
-			PQfinish(entry->conn);
-			entry->conn = NULL;
-		}
+static void
+pgfdw_cleanup_after_transaction(ConnCacheEntry *entry)
+{
+	/*
+	 * Reset a cache entry after an attempt to prepare or end its transaction.
+	 *
+	 * If there were any errors in subtransactions, and we made prepared
+	 * statements, DEALLOCATE ALL to make sure we get rid of them all.  This is
+	 * annoying and not terribly bulletproof, but probably not worth more.
+	 *
+	 * DEALLOCATE ALL only exists in 8.3 and later, which constrains how old a
+	 * server postgres_fdw can talk to.  We intentionally ignore errors in it,
+	 * so that we can hobble along with older servers (leaking prepared
+	 * statements; we don't really support update operations pre-8.3 anyway).
+	 */
+	if (entry->have_prep_stmt && entry->have_error)
+	{
+		PGresult *res = PQexec(entry->conn, "DEALLOCATE ALL");
+		PQclear(res);
 	}
 
+	entry->have_prep_stmt = false;
+	entry->have_error = false;
+	/* Reset state to show we're out of a transaction */
+	entry->xact_depth = 0;
+
+	/*
+	 * If the connection isn't in a good idle state, discard it to
+	 * recover. Next GetConnection will open a new connection.
+	 */
+	if (PQstatus(entry->conn) != CONNECTION_OK ||
+		PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
+	{
+		elog(DEBUG3, "discarding connection %p", entry->conn);
+		PQfinish(entry->conn);
+		entry->conn = NULL;
+	}
+
+	/*
+	 * TODO: these next two statements should be moved to end of transaction
+	 * call back.
+	 * NOTE(review): they reset file-level state although this function runs
+	 * per cache entry; pgfdw_xact_callback() resets the same two variables.
+	 */
+	xact_got_connection = false;
+
+	/* Also reset cursor numbering for next transaction */
+	cursor_number = 0;
+}
+
+/*
+ * pgfdw_xact_callback --- cleanup at main-transaction end.
+ */
+static void
+pgfdw_xact_callback(XactEvent event, void *arg)
+{
 	/*
 	 * Regardless of the event type, we can now mark ourselves as out of the
-	 * transaction.  (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
-	 * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
+	 * transaction.
 	 */
 	xact_got_connection = false;
 
 	/* Also reset cursor numbering for next transaction */
 	cursor_number = 0;
 }
 
 /*
  * pgfdw_subxact_callback --- cleanup at subtransaction end.
  */
@@ -708,10 +895,33 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
 				pgfdw_report_error(WARNING, res, entry->conn, true, sql);
 			else
 				PQclear(res);
 		}
 
 		/* OK, we're outta that level of subtransaction */
 		entry->xact_depth--;
 	}
 }
+
+/*
+ * server_uses_two_phase_commit
+ *		Report the boolean value of the server's "two_phase_commit" option;
+ *		a server that does not set the option is treated as not using 2PC.
+ */
+static bool
+server_uses_two_phase_commit(ForeignServer *server)
+{
+	bool		uses_2pc = false;	/* default when option is absent */
+	ListCell   *cell;
+
+	foreach(cell, server->options)
+	{
+		DefElem    *opt = (DefElem *) lfirst(cell);
+
+		if (strcmp(opt->defname, "two_phase_commit") != 0)
+			continue;		/* some other option; keep looking */
+		uses_2pc = defGetBoolean(opt);	/* first occurrence decides */
+		break;
+	}
+	return uses_2pc;
+}
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 866a09b..0c52753 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -3779,10 +3779,348 @@ ERROR:  type "public.Colors" does not exist
 LINE 4:   "Col" public."Colors" OPTIONS (column_name 'Col')
                 ^
 QUERY:  CREATE FOREIGN TABLE t5 (
   c1 integer OPTIONS (column_name 'c1'),
   c2 text OPTIONS (column_name 'c2') COLLATE pg_catalog."C",
   "Col" public."Colors" OPTIONS (column_name 'Col')
 ) SERVER loopback
 OPTIONS (schema_name 'import_source', table_name 't5');
 CONTEXT:  importing foreign table "t5"
 ROLLBACK;
+-- This will suppress the context of errors, which contains prepared transaction
+-- IDs. Those come out to be different each time.
+\set VERBOSITY terse
+-- Test transactional consistency for multiple server case
+-- create two loopback servers for testing consistency on two connections
+DO $d$
+    BEGIN
+        EXECUTE $$CREATE SERVER loopback1 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$',
+					 two_phase_commit 'true'
+            )$$;
+    END;
+$d$;
+DO $d$
+    BEGIN
+        EXECUTE $$CREATE SERVER loopback2 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$',
+					 two_phase_commit 'true'
+            )$$;
+    END;
+$d$;
+CREATE USER MAPPING FOR CURRENT_USER SERVER loopback1;
+CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2;
+-- create a local table to refer to as foreign table. Add a row. The table has
+-- constraints which are deferred till end of transaction. This allows commit
+-- time errors to occur by inserting data which violates constraints.
+CREATE TABLE lt(val int UNIQUE DEFERRABLE INITIALLY DEFERRED);
+-- create two foreign tables each on separate server referring to the local table.
+CREATE FOREIGN TABLE ft1_lt (val int) SERVER loopback1 OPTIONS (table_name 'lt');
+CREATE FOREIGN TABLE ft2_lt (val int) SERVER loopback2 OPTIONS (table_name 'lt');
+-- test prepared transactions with foreign servers
+-- test for commit prepared
+BEGIN;
+	INSERT INTO ft1_lt VALUES (1);
+	INSERT INTO ft2_lt VALUES (3);
+PREPARE TRANSACTION 'prep_xact_with_fdw';
+-- prepared transactions should be seen in the system view
+SELECT P.database, P.gid AS "local transaction identifier",
+		"foreign server", "local user", status
+		FROM pg_fdw_xacts F
+			LEFT JOIN pg_prepared_xacts P ON F.transaction = P.transaction
+		WHERE P.database = F.database;	-- WHERE condition is actually an assertion
+      database      | local transaction identifier | foreign server | local user |  status  
+--------------------+------------------------------+----------------+------------+----------
+ contrib_regression | prep_xact_with_fdw           | loopback1      | ashutosh   | prepared
+ contrib_regression | prep_xact_with_fdw           | loopback2      | ashutosh   | prepared
+(2 rows)
+
+COMMIT PREPARED 'prep_xact_with_fdw';
+SELECT * FROM lt;
+ val 
+-----
+   1
+   3
+(2 rows)
+
+-- test for rollback prepared
+BEGIN;
+	INSERT INTO ft1_lt VALUES (10);
+	INSERT INTO ft2_lt VALUES (30);
+PREPARE TRANSACTION 'prep_xact_with_fdw';
+ROLLBACK PREPARED 'prep_xact_with_fdw';
+SELECT * FROM lt;
+ val 
+-----
+   1
+   3
+(2 rows)
+
+-- In a transaction insert two rows one each to the two foreign tables. One of
+-- the rows violates the constraint and other not. At the time of commit
+-- constraints on one of the server will rollback transaction on that server in
+-- turn rolling back the whole transaction.
+BEGIN TRANSACTION;
+	INSERT INTO ft1_lt VALUES (1); -- Violates constraint
+	INSERT INTO ft2_lt VALUES (2);
+COMMIT TRANSACTION;
+WARNING:  duplicate key value violates unique constraint "lt_val_key"
+ERROR:  can not prepare transaction on foreign server loopback1
+SELECT * FROM lt;
+ val 
+-----
+   1
+   3
+(2 rows)
+
+BEGIN TRANSACTION;
+	INSERT INTO ft1_lt VALUES (4);
+	INSERT INTO ft2_lt VALUES (3); -- Violates constraint
+COMMIT TRANSACTION;
+WARNING:  duplicate key value violates unique constraint "lt_val_key"
+ERROR:  can not prepare transaction on foreign server loopback2
+SELECT * FROM lt;
+ val 
+-----
+   1
+   3
+(2 rows)
+
+-- Transaction involving local changes and remote changes, one of them or both
+-- violating the constraints
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (1); -- violates constraints
+	INSERT INTO ft1_lt VALUES (5);
+COMMIT TRANSACTION;
+ERROR:  duplicate key value violates unique constraint "lt_val_key"
+SELECT * FROM lt;
+ val 
+-----
+   1
+   3
+(2 rows)
+
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (6);
+	INSERT INTO ft1_lt VALUES (1); -- violates constraints
+COMMIT TRANSACTION;
+WARNING:  duplicate key value violates unique constraint "lt_val_key"
+ERROR:  can not prepare transaction on foreign server loopback1
+SELECT * FROM lt;
+ val 
+-----
+   1
+   3
+(2 rows)
+
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (1); -- violates constraints 
+	INSERT INTO ft1_lt VALUES (3); -- violates constraints
+COMMIT TRANSACTION;
+ERROR:  duplicate key value violates unique constraint "lt_val_key"
+SELECT * FROM lt;
+ val 
+-----
+   1
+   3
+(2 rows)
+
+-- Multiple foreign servers with local changes
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (7);
+	INSERT INTO ft1_lt VALUES (8);
+	INSERT INTO ft2_lt VALUES (1); -- violates constraints
+COMMIT TRANSACTION;
+WARNING:  duplicate key value violates unique constraint "lt_val_key"
+ERROR:  can not prepare transaction on foreign server loopback2
+SELECT * FROM lt;
+ val 
+-----
+   1
+   3
+(2 rows)
+
+-- test for removing foreign transactions 
+BEGIN;
+	INSERT INTO ft1_lt VALUES (10);
+	INSERT INTO ft2_lt VALUES (30);
+PREPARE TRANSACTION 'prep_xact_with_fdw';
+-- get the transaction identifiers for foreign servers loopback1 and loopback2
+SELECT "foreign transaction identifier" AS lbs1_id FROM pg_fdw_xacts WHERE "foreign server" = 'loopback1'
+\gset
+SELECT "foreign transaction identifier" AS lbs2_id FROM pg_fdw_xacts WHERE "foreign server" = 'loopback2'
+\gset
+-- Rollback the transactions with identifiers collected above. The foreign
+-- servers are pointing to self, so the transactions are local.
+ROLLBACK PREPARED :'lbs1_id';
+ROLLBACK PREPARED :'lbs2_id';
+-- Get the xid of parent transaction into a variable. The foreign
+-- transactions corresponding to this xid are removed later.
+SELECT transaction AS rem_xid FROM pg_prepared_xacts
+\gset
+-- There should be 2 entries corresponding to the prepared foreign transactions
+-- on two foreign servers.
+SELECT count(*) FROM pg_fdw_xacts WHERE transaction = :rem_xid;
+ count 
+-------
+     2
+(1 row)
+
+-- Remove the prepared foreign transaction entries.
+SELECT pg_fdw_remove(:'rem_xid'::xid);
+ pg_fdw_remove 
+---------------
+ 
+(1 row)
+
+-- There should be no foreign prepared transactions now.
+SELECT count(*) FROM pg_fdw_xacts WHERE transaction = :rem_xid;
+ count 
+-------
+     0
+(1 row)
+
+-- Rollback the parent transaction to release any resources
+ROLLBACK PREPARED 'prep_xact_with_fdw';
+-- source table should be in-tact
+SELECT * FROM lt;
+ val 
+-----
+   1
+   3
+(2 rows)
+
+-- test for failing prepared transaction
+BEGIN;
+	INSERT INTO ft1_lt VALUES (1); -- violates constraint, so prepare should fail
+	INSERT INTO ft2_lt VALUES (2);
+PREPARE TRANSACTION 'prep_fdw_xact_failure'; -- should fail
+WARNING:  duplicate key value violates unique constraint "lt_val_key"
+ERROR:  can not prepare transaction on foreign server loopback1
+-- We shouldn't see anything, the transactions prepared on the foreign servers
+-- should be rolled back.
+SELECT database, "foreign server", "local user", status FROM pg_fdw_xacts;
+ database | foreign server | local user | status 
+----------+----------------+------------+--------
+(0 rows)
+
+SELECT database, gid FROM pg_prepared_xacts;
+ database | gid 
+----------+-----
+(0 rows)
+
+-- subtransactions with foreign servers
+TRUNCATE TABLE lt;
+BEGIN TRANSACTION;
+	INSERT INTO ft1_lt VALUES (1);
+	INSERT INTO ft2_lt VALUES (2);
+	SAVEPOINT sv1;
+		UPDATE ft1_lt SET val = val + 1;
+		UPDATE ft2_lt SET val = val + 1;
+	ROLLBACK TO SAVEPOINT sv1;
+	SAVEPOINT sv2;
+		UPDATE ft1_lt SET val = val + 2;
+		UPDATE ft2_lt SET val = val + 2;
+	RELEASE SAVEPOINT sv2;
+	INSERT INTO lt VALUES (10);
+COMMIT TRANSACTION;
+SELECT * FROM lt;
+ val 
+-----
+   3
+   4
+  10
+(3 rows)
+
+TRUNCATE TABLE lt;
+BEGIN TRANSACTION;
+	INSERT INTO ft1_lt VALUES (1);
+	INSERT INTO ft2_lt VALUES (2);
+	SAVEPOINT sv1;
+		UPDATE ft1_lt SET val = val + 1;
+		UPDATE ft2_lt SET val = val + 1;
+	ROLLBACK TO SAVEPOINT sv1;
+	SAVEPOINT sv2;
+		UPDATE ft1_lt SET val = val + 2;
+		UPDATE ft2_lt SET val = val + 2;
+	RELEASE SAVEPOINT sv2;
+	INSERT INTO lt VALUES (10);
+PREPARE TRANSACTION 'prep_xact_fdw_subxact';
+-- only top transaction's xid should be recorded, not that of subtransactions'
+SELECT P.database, P.gid AS "local transaction identifier",
+		"foreign server", "local user", status
+		FROM pg_fdw_xacts F
+			LEFT JOIN pg_prepared_xacts P ON F.transaction = P.transaction
+		WHERE P.database = F.database;	-- WHERE condition is actually an assertion
+      database      | local transaction identifier | foreign server | local user |  status  
+--------------------+------------------------------+----------------+------------+----------
+ contrib_regression | prep_xact_fdw_subxact        | loopback1      | ashutosh   | prepared
+ contrib_regression | prep_xact_fdw_subxact        | loopback2      | ashutosh   | prepared
+(2 rows)
+
+COMMIT PREPARED 'prep_xact_fdw_subxact';
+SELECT * FROM lt;
+ val 
+-----
+   3
+   4
+  10
+(3 rows)
+
+-- What if one of the servers involved in a transaction isn't capable of 2PC?
+-- Those servers capable of two phase commit, will commit their transactions
+-- atomically with the local transaction. The transactions on the incapable
+-- servers will be committed independent of the outcome of the other foreign
+-- transactions.
+TRUNCATE TABLE lt;
+INSERT INTO lt VALUES (1);
+ALTER SERVER loopback2 OPTIONS (SET two_phase_commit 'false'); 
+-- Changes to the local server and the loopback1 will be rolled back as prepare
+-- on loopback1 would fail because of constraint violation. But the changes on
+-- loopback2, which doesn't execute two phase commit, will be committed.
+BEGIN TRANSACTION;
+	INSERT INTO ft2_lt VALUES (2);
+	INSERT INTO lt VALUES (3);
+	INSERT INTO ft1_lt VALUES (1);
+COMMIT TRANSACTION;
+WARNING:  duplicate key value violates unique constraint "lt_val_key"
+ERROR:  can not prepare transaction on foreign server loopback1
+SELECT * FROM lt;
+ val 
+-----
+   1
+   2
+(2 rows)
+
+TRUNCATE TABLE lt;
+INSERT INTO lt VALUES (1);
+-- Changes to all the servers, local and foreign, will be rolled back as those
+-- on loopback2 (incapable of two-phase commit) could not be commited.
+BEGIN TRANSACTION;
+	INSERT INTO ft2_lt VALUES (1);
+	INSERT INTO lt VALUES (3);
+	INSERT INTO ft1_lt VALUES (2);
+COMMIT TRANSACTION;
+WARNING:  duplicate key value violates unique constraint "lt_val_key"
+WARNING:  could not commit transaction on server loopback2
+SELECT * FROM lt;
+ val 
+-----
+   1
+   3
+   2
+(3 rows)
+
+-- At the end, we should not have any foreign transaction remaining unresolved
+SELECT * FROM pg_fdw_xacts;
+ transaction | database | foreign server | local user | status | foreign transaction identifier 
+-------------+----------+----------------+------------+--------+--------------------------------
+(0 rows)
+
+DROP SERVER loopback1 CASCADE;
+NOTICE:  drop cascades to 2 other objects
+DROP SERVER loopback2 CASCADE;
+NOTICE:  drop cascades to 2 other objects
+DROP TABLE lt;
+\set VERBOSITY default
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 380ac80..32a6247 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -100,21 +100,22 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
 					(errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
 					 errmsg("invalid option \"%s\"", def->defname),
 					 errhint("Valid options in this context are: %s",
 							 buf.data)));
 		}
 
 		/*
 		 * Validate option value, when we can do so without any context.
 		 */
 		if (strcmp(def->defname, "use_remote_estimate") == 0 ||
-			strcmp(def->defname, "updatable") == 0)
+			strcmp(def->defname, "updatable") == 0 ||
+			strcmp(def->defname, "two_phase_commit") == 0)
 		{
 			/* these accept only boolean values */
 			(void) defGetBoolean(def);
 		}
 		else if (strcmp(def->defname, "fdw_startup_cost") == 0 ||
 				 strcmp(def->defname, "fdw_tuple_cost") == 0)
 		{
 			/* these must have a non-negative numeric value */
 			double		val;
 			char	   *endp;
@@ -155,20 +156,22 @@ InitPgFdwOptions(void)
 		{"use_remote_estimate", ForeignServerRelationId, false},
 		{"use_remote_estimate", ForeignTableRelationId, false},
 		/* cost factors */
 		{"fdw_startup_cost", ForeignServerRelationId, false},
 		{"fdw_tuple_cost", ForeignServerRelationId, false},
 		/* shippable extensions */
 		{"extensions", ForeignServerRelationId, false},
 		/* updatable is available on both server and table */
 		{"updatable", ForeignServerRelationId, false},
 		{"updatable", ForeignTableRelationId, false},
+		/* 2PC compatibility */
+		{"two_phase_commit", ForeignServerRelationId, false},
 		{NULL, InvalidOid, false}
 	};
 
 	/* Prevent redundant initialization. */
 	if (postgres_fdw_options)
 		return;
 
 	/*
 	 * Get list of valid libpq options.
 	 *
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index cd4ed0c..3f765e3 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -9,20 +9,22 @@
  *		  contrib/postgres_fdw/postgres_fdw.c
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
 #include "postgres_fdw.h"
 
 #include "access/htup_details.h"
 #include "access/sysattr.h"
+#include "access/fdw_xact.h"
+#include "access/xact.h"
 #include "commands/defrem.h"
 #include "commands/explain.h"
 #include "commands/vacuum.h"
 #include "foreign/fdwapi.h"
 #include "funcapi.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
 #include "optimizer/cost.h"
 #include "optimizer/pathnode.h"
@@ -332,20 +334,26 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	/* Support functions for EXPLAIN */
 	routine->ExplainForeignScan = postgresExplainForeignScan;
 	routine->ExplainForeignModify = postgresExplainForeignModify;
 
 	/* Support functions for ANALYZE */
 	routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
 
 	/* Support functions for IMPORT FOREIGN SCHEMA */
 	routine->ImportForeignSchema = postgresImportForeignSchema;
 
+	/* Support functions for foreign transactions */
+	routine->GetPrepareId = postgresGetPrepareId;
+	routine->PrepareForeignTransaction = postgresPrepareForeignTransaction;
+	routine->ResolvePreparedForeignTransaction = postgresResolvePreparedForeignTransaction;
+	routine->EndForeignTransaction = postgresEndForeignTransaction;
+
 	PG_RETURN_POINTER(routine);
 }
 
 /*
  * postgresGetForeignRelSize
  *		Estimate # of rows and width of the result of the scan
  *
  * We should consider the effect of all baserestrictinfo clauses here, but
  * not any join clauses.
  */
@@ -959,21 +967,21 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 	/* Get info about foreign table. */
 	fsstate->rel = node->ss.ss_currentRelation;
 	table = GetForeignTable(RelationGetRelid(fsstate->rel));
 	server = GetForeignServer(table->serverid);
 	user = GetUserMapping(userid, server->serverid);
 
 	/*
 	 * Get connection to the foreign server.  Connection manager will
 	 * establish new connection if necessary.
 	 */
-	fsstate->conn = GetConnection(server, user, false);
+	fsstate->conn = GetConnection(server, user, false, true, false);
 
 	/* Assign a unique ID for my cursor */
 	fsstate->cursor_number = GetCursorNumber(fsstate->conn);
 	fsstate->cursor_exists = false;
 
 	/* Get private info created by planner functions. */
 	fsstate->query = strVal(list_nth(fsplan->fdw_private,
 									 FdwScanPrivateSelectSql));
 	fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
 											   FdwScanPrivateRetrievedAttrs);
@@ -1357,21 +1365,21 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 	 */
 	rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
 
 	/* Get info about foreign table. */
 	table = GetForeignTable(RelationGetRelid(rel));
 	server = GetForeignServer(table->serverid);
 	user = GetUserMapping(userid, server->serverid);
 
 	/* Open connection; report that we'll create a prepared statement. */
-	fmstate->conn = GetConnection(server, user, true);
+	fmstate->conn = GetConnection(server, user, true, true, false);
 	fmstate->p_name = NULL;		/* prepared statement not made yet */
 
 	/* Deconstruct fdw_private data. */
 	fmstate->query = strVal(list_nth(fdw_private,
 									 FdwModifyPrivateUpdateSql));
 	fmstate->target_attrs = (List *) list_nth(fdw_private,
 											  FdwModifyPrivateTargetAttnums);
 	fmstate->has_returning = intVal(list_nth(fdw_private,
 											 FdwModifyPrivateHasReturning));
 	fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
@@ -1811,21 +1819,21 @@ estimate_path_cost_size(PlannerInfo *root,
 			appendWhereClause(&sql, root, baserel, fpinfo->remote_conds,
 							  true, NULL);
 		if (remote_join_conds)
 			appendWhereClause(&sql, root, baserel, remote_join_conds,
 							  (fpinfo->remote_conds == NIL), NULL);
 
 		if (pathkeys)
 			appendOrderByClause(&sql, root, baserel, pathkeys);
 
 		/* Get the remote estimate */
-		conn = GetConnection(fpinfo->server, fpinfo->user, false);
+		conn = GetConnection(fpinfo->server, fpinfo->user, false, true, false);
 		get_remote_estimate(sql.data, conn, &rows, &width,
 							&startup_cost, &total_cost);
 		ReleaseConnection(conn);
 
 		retrieved_rows = rows;
 
 		/* Factor in the selectivity of the locally-checked quals */
 		local_sel = clauselist_selectivity(root,
 										   local_join_conds,
 										   baserel->relid,
@@ -2390,21 +2398,21 @@ postgresAnalyzeForeignTable(Relation relation,
 	 * it's probably not worth redefining that API at this point.
 	 */
 
 	/*
 	 * Get the connection to use.  We do the remote access as the table's
 	 * owner, even if the ANALYZE was started by some other user.
 	 */
 	table = GetForeignTable(RelationGetRelid(relation));
 	server = GetForeignServer(table->serverid);
 	user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
-	conn = GetConnection(server, user, false);
+	conn = GetConnection(server, user, false, true, false);
 
 	/*
 	 * Construct command to get page count for relation.
 	 */
 	initStringInfo(&sql);
 	deparseAnalyzeSizeSql(&sql, relation);
 
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
@@ -2482,21 +2490,21 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 											ALLOCSET_SMALL_INITSIZE,
 											ALLOCSET_SMALL_MAXSIZE);
 
 	/*
 	 * Get the connection to use.  We do the remote access as the table's
 	 * owner, even if the ANALYZE was started by some other user.
 	 */
 	table = GetForeignTable(RelationGetRelid(relation));
 	server = GetForeignServer(table->serverid);
 	user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
-	conn = GetConnection(server, user, false);
+	conn = GetConnection(server, user, false, true, false);
 
 	/*
 	 * Construct cursor that retrieves whole rows from remote.
 	 */
 	cursor_number = GetCursorNumber(conn);
 	initStringInfo(&sql);
 	appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
 	deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
 
 	/* In what follows, do not risk leaking any PGresults. */
@@ -2683,21 +2691,21 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 					(errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
 					 errmsg("invalid option \"%s\"", def->defname)));
 	}
 
 	/*
 	 * Get connection to the foreign server.  Connection manager will
 	 * establish new connection if necessary.
 	 */
 	server = GetForeignServer(serverOid);
 	mapping = GetUserMapping(GetUserId(), server->serverid);
-	conn = GetConnection(server, mapping, false);
+	conn = GetConnection(server, mapping, false, true, false);
 
 	/* Don't attempt to import collation if remote server hasn't got it */
 	if (PQserverVersion(conn) < 90100)
 		import_collate = false;
 
 	/* Create workspace for strings */
 	initStringInfo(&buf);
 
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index f243de8..e61157f 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -10,20 +10,21 @@
  *
  *-------------------------------------------------------------------------
  */
 #ifndef POSTGRES_FDW_H
 #define POSTGRES_FDW_H
 
 #include "foreign/foreign.h"
 #include "lib/stringinfo.h"
 #include "nodes/relation.h"
 #include "utils/relcache.h"
+#include "access/fdw_xact.h"
 
 #include "libpq-fe.h"
 
 /*
  * FDW-specific planner information kept in RelOptInfo.fdw_private for a
  * foreign table.  This information is collected by postgresGetForeignRelSize.
  */
 typedef struct PgFdwRelationInfo
 {
 	/* baserestrictinfo clauses, broken down into safe and unsafe subsets. */
@@ -54,21 +55,22 @@ typedef struct PgFdwRelationInfo
 	ForeignServer *server;
 	UserMapping *user;			/* only set in use_remote_estimate mode */
 } PgFdwRelationInfo;
 
 /* in postgres_fdw.c */
 extern int	set_transmission_modes(void);
 extern void reset_transmission_modes(int nestlevel);
 
 /* in connection.c */
 extern PGconn *GetConnection(ForeignServer *server, UserMapping *user,
-			  bool will_prep_stmt);
+			  bool will_prep_stmt, bool start_transaction,
+			  bool connection_error_ok);
 extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
 extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 				   bool clear, const char *sql);
 
 /* in option.c */
 extern int ExtractConnectionOptions(List *defelems,
 						 const char **keywords,
 						 const char **values);
@@ -104,19 +106,26 @@ extern void deparseUpdateSql(StringInfo buf, PlannerInfo *root,
 				 List *targetAttrs, List *returningList,
 				 List **retrieved_attrs);
 extern void deparseDeleteSql(StringInfo buf, PlannerInfo *root,
 				 Index rtindex, Relation rel,
 				 List *returningList,
 				 List **retrieved_attrs);
 extern void deparseAnalyzeSizeSql(StringInfo buf, Relation rel);
 extern void deparseAnalyzeSql(StringInfo buf, Relation rel,
 				  List **retrieved_attrs);
 extern void deparseStringLiteral(StringInfo buf, const char *val);
+extern char	*postgresGetPrepareId(Oid serveroid, Oid userid, int *prep_info_len);
+extern bool postgresResolvePreparedForeignTransaction(Oid serverid, Oid userid,
+											bool is_commit,
+											int prep_info_len, char *prep_info);
+extern bool postgresEndForeignTransaction(Oid serverid, Oid userid, bool is_commit);
+extern bool postgresPrepareForeignTransaction(Oid serverid, Oid userid, int prep_info_len,
+									char *prep_info);
 extern Expr *find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel);
 extern void appendOrderByClause(StringInfo buf, PlannerInfo *root,
 					RelOptInfo *baserel, List *pathkeys);
 
 /* in shippable.c */
 extern bool is_builtin(Oid objectId);
 extern bool is_shippable(Oid objectId, Oid classId, PgFdwRelationInfo *fpinfo);
 
 #endif   /* POSTGRES_FDW_H */
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 671e38c..b6fe637 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -873,10 +873,239 @@ IMPORT FOREIGN SCHEMA nonesuch FROM SERVER nowhere INTO notthere;
 -- We can fake this by dropping the type locally in our transaction.
 CREATE TYPE "Colors" AS ENUM ('red', 'green', 'blue');
 CREATE TABLE import_source.t5 (c1 int, c2 text collate "C", "Col" "Colors");
 
 CREATE SCHEMA import_dest5;
 BEGIN;
 DROP TYPE "Colors" CASCADE;
 IMPORT FOREIGN SCHEMA import_source LIMIT TO (t5)
   FROM SERVER loopback INTO import_dest5;  -- ERROR
 ROLLBACK;
+
+-- This will suppress the context of errors, which contains prepared transaction
+-- IDs. Those come out to be different each time.
+\set VERBOSITY terse
+-- Test transactional consistency for multiple server case
+-- create two loopback servers for testing consistency on two connections
+DO $d$
+    BEGIN
+        EXECUTE $$CREATE SERVER loopback1 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$',
+					 two_phase_commit 'true'
+            )$$;
+    END;
+$d$;
+
+DO $d$
+    BEGIN
+        EXECUTE $$CREATE SERVER loopback2 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$',
+					 two_phase_commit 'true'
+            )$$;
+    END;
+$d$;
+
+CREATE USER MAPPING FOR CURRENT_USER SERVER loopback1;
+CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2;
+
+-- create a local table to refer to as foreign table. Add a row. The table has
+-- constraints which are deferred till end of transaction. This allows commit
+-- time errors to occur by inserting data which violates constraints.
+CREATE TABLE lt(val int UNIQUE DEFERRABLE INITIALLY DEFERRED);
+-- create two foreign tables each on separate server referring to the local table.
+CREATE FOREIGN TABLE ft1_lt (val int) SERVER loopback1 OPTIONS (table_name 'lt');
+CREATE FOREIGN TABLE ft2_lt (val int) SERVER loopback2 OPTIONS (table_name 'lt');
+
+-- test prepared transactions with foreign servers
+-- test for commit prepared
+BEGIN;
+	INSERT INTO ft1_lt VALUES (1);
+	INSERT INTO ft2_lt VALUES (3);
+PREPARE TRANSACTION 'prep_xact_with_fdw';
+-- prepared transactions should be seen in the system view
+SELECT P.database, P.gid AS "local transaction identifier",
+		"foreign server", "local user", status
+		FROM pg_fdw_xacts F
+			LEFT JOIN pg_prepared_xacts P ON F.transaction = P.transaction
+		WHERE P.database = F.database;	-- WHERE condition is actually an assertion
+
+COMMIT PREPARED 'prep_xact_with_fdw';
+SELECT * FROM lt;
+
+-- test for rollback prepared
+BEGIN;
+	INSERT INTO ft1_lt VALUES (10);
+	INSERT INTO ft2_lt VALUES (30);
+PREPARE TRANSACTION 'prep_xact_with_fdw';
+ROLLBACK PREPARED 'prep_xact_with_fdw';
+SELECT * FROM lt;
+
+-- In a transaction insert two rows one each to the two foreign tables. One of
+-- the rows violates the constraint and other not. At the time of commit
+-- constraints on one of the server will rollback transaction on that server in
+-- turn rolling back the whole transaction.
+BEGIN TRANSACTION;
+	INSERT INTO ft1_lt VALUES (1); -- Violates constraint
+	INSERT INTO ft2_lt VALUES (2);
+COMMIT TRANSACTION;
+SELECT * FROM lt;
+
+BEGIN TRANSACTION;
+	INSERT INTO ft1_lt VALUES (4);
+	INSERT INTO ft2_lt VALUES (3); -- Violates constraint
+COMMIT TRANSACTION;
+SELECT * FROM lt;
+
+-- Transaction involving local changes and remote changes, one of them or both
+-- violating the constraints
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (1); -- violates constraints
+	INSERT INTO ft1_lt VALUES (5);
+COMMIT TRANSACTION;
+SELECT * FROM lt;
+
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (6);
+	INSERT INTO ft1_lt VALUES (1); -- violates constraints
+COMMIT TRANSACTION;
+SELECT * FROM lt;
+
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (1); -- violates constraints 
+	INSERT INTO ft1_lt VALUES (3); -- violates constraints
+COMMIT TRANSACTION;
+SELECT * FROM lt;
+
+-- Multiple foreign servers with local changes
+BEGIN TRANSACTION;
+	INSERT INTO lt VALUES (7);
+	INSERT INTO ft1_lt VALUES (8);
+	INSERT INTO ft2_lt VALUES (1); -- violates constraints
+COMMIT TRANSACTION;
+SELECT * FROM lt;
+
+-- test for removing foreign transactions 
+BEGIN;
+	INSERT INTO ft1_lt VALUES (10);
+	INSERT INTO ft2_lt VALUES (30);
+PREPARE TRANSACTION 'prep_xact_with_fdw';
+
+-- get the transaction identifiers for foreign servers loopback1 and loopback2
+SELECT "foreign transaction identifier" AS lbs1_id FROM pg_fdw_xacts WHERE "foreign server" = 'loopback1'
+\gset
+SELECT "foreign transaction identifier" AS lbs2_id FROM pg_fdw_xacts WHERE "foreign server" = 'loopback2'
+\gset
+-- Rollback the transactions with identifiers collected above. The foreign
+-- servers are pointing to self, so the transactions are local.
+ROLLBACK PREPARED :'lbs1_id';
+ROLLBACK PREPARED :'lbs2_id';
+-- Get the xid of parent transaction into a variable. The foreign
+-- transactions corresponding to this xid are removed later.
+SELECT transaction AS rem_xid FROM pg_prepared_xacts
+\gset
+
+-- There should be 2 entries corresponding to the prepared foreign transactions
+-- on two foreign servers.
+SELECT count(*) FROM pg_fdw_xacts WHERE transaction = :rem_xid;
+
+-- Remove the prepared foreign transaction entries.
+SELECT pg_fdw_remove(:'rem_xid'::xid);
+
+-- There should be no foreign prepared transactions now.
+SELECT count(*) FROM pg_fdw_xacts WHERE transaction = :rem_xid;
+
+-- Rollback the parent transaction to release any resources
+ROLLBACK PREPARED 'prep_xact_with_fdw';
+-- source table should be in-tact
+SELECT * FROM lt;
+
+-- test for failing prepared transaction
+BEGIN;
+	INSERT INTO ft1_lt VALUES (1); -- violates constraint, so prepare should fail
+	INSERT INTO ft2_lt VALUES (2);
+PREPARE TRANSACTION 'prep_fdw_xact_failure'; -- should fail
+-- We shouldn't see anything, the transactions prepared on the foreign servers
+-- should be rolled back.
+SELECT database, "foreign server", "local user", status FROM pg_fdw_xacts;
+SELECT database, gid FROM pg_prepared_xacts;
+
+-- subtransactions with foreign servers
+TRUNCATE TABLE lt;
+BEGIN TRANSACTION;
+	INSERT INTO ft1_lt VALUES (1);
+	INSERT INTO ft2_lt VALUES (2);
+	SAVEPOINT sv1;
+		UPDATE ft1_lt SET val = val + 1;
+		UPDATE ft2_lt SET val = val + 1;
+	ROLLBACK TO SAVEPOINT sv1;
+	SAVEPOINT sv2;
+		UPDATE ft1_lt SET val = val + 2;
+		UPDATE ft2_lt SET val = val + 2;
+	RELEASE SAVEPOINT sv2;
+	INSERT INTO lt VALUES (10);
+COMMIT TRANSACTION;
+SELECT * FROM lt;
+
+TRUNCATE TABLE lt;
+BEGIN TRANSACTION;
+	INSERT INTO ft1_lt VALUES (1);
+	INSERT INTO ft2_lt VALUES (2);
+	SAVEPOINT sv1;
+		UPDATE ft1_lt SET val = val + 1;
+		UPDATE ft2_lt SET val = val + 1;
+	ROLLBACK TO SAVEPOINT sv1;
+	SAVEPOINT sv2;
+		UPDATE ft1_lt SET val = val + 2;
+		UPDATE ft2_lt SET val = val + 2;
+	RELEASE SAVEPOINT sv2;
+	INSERT INTO lt VALUES (10);
+PREPARE TRANSACTION 'prep_xact_fdw_subxact';
+-- only top transaction's xid should be recorded, not that of subtransactions'
+SELECT P.database, P.gid AS "local transaction identifier",
+		"foreign server", "local user", status
+		FROM pg_fdw_xacts F
+			LEFT JOIN pg_prepared_xacts P ON F.transaction = P.transaction
+		WHERE P.database = F.database;	-- WHERE condition is actually an assertion
+
+COMMIT PREPARED 'prep_xact_fdw_subxact';
+SELECT * FROM lt;
+
+-- What if one of the servers involved in a transaction isn't capable of 2PC?
+-- Those servers capable of two phase commit, will commit their transactions
+-- atomically with the local transaction. The transactions on the incapable
+-- servers will be committed independent of the outcome of the other foreign
+-- transactions.
+TRUNCATE TABLE lt;
+INSERT INTO lt VALUES (1);
+
+ALTER SERVER loopback2 OPTIONS (SET two_phase_commit 'false'); 
+-- Changes to the local server and the loopback1 will be rolled back as prepare
+-- on loopback1 would fail because of constraint violation. But the changes on
+-- loopback2, which doesn't execute two phase commit, will be committed.
+BEGIN TRANSACTION;
+	INSERT INTO ft2_lt VALUES (2);
+	INSERT INTO lt VALUES (3);
+	INSERT INTO ft1_lt VALUES (1);
+COMMIT TRANSACTION;
+SELECT * FROM lt;
+
+TRUNCATE TABLE lt;
+INSERT INTO lt VALUES (1);
+
+-- Changes to all the servers, local and foreign, will be rolled back as those
+-- on loopback2 (incapable of two-phase commit) could not be commited.
+BEGIN TRANSACTION;
+	INSERT INTO ft2_lt VALUES (1);
+	INSERT INTO lt VALUES (3);
+	INSERT INTO ft1_lt VALUES (2);
+COMMIT TRANSACTION;
+SELECT * FROM lt;
+
+-- At the end, we should not have any foreign transaction remaining unresolved
+SELECT * FROM pg_fdw_xacts;
+
+DROP SERVER loopback1 CASCADE;
+DROP SERVER loopback2 CASCADE;
+DROP TABLE lt;
+\set VERBOSITY default
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 5549de7..3418c81 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1411,20 +1411,62 @@ include_dir 'conf.d'
        </para>
 
        <para>
         When running a standby server, you must set this parameter to the
         same or higher value than on the master server. Otherwise, queries
         will not be allowed in the standby server.
        </para>
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-max-prepared-foreign-transactions" xreflabel="max_prepared_foreign_transactions">
+      <term><varname>max_prepared_foreign_transactions</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>max_prepared_foreign_transactions</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Sets the maximum number of foreign transactions that can be prepared
+        simultaneously.
+        If this parameter is set to zero (which is the default) and
+        <xref linkend="guc-atomic-foreign-transaction"> is enabled,
+        transactions involving foreign servers will not succeed, because foreign
+        transactions can not be prepared.
+        This parameter can only be set at server start.
+       </para>
+
+       <para>
+        When running a standby server, you must set this parameter to the
+        same or higher value than on the master server. Otherwise, queries
+        will not be allowed in the standby server.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry id="guc-atomic-foreign-transaction" xreflabel="atomic_foreign_transaction">
+      <term><varname>atomic_foreign_transaction</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>atomic_foreign_transaction</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+       When this parameter is enabled, a transaction involving one or more foreign
+       servers is guaranteed to commit either all or none of its changes to them.
+       The parameter can be set at any time during the session. The value of this
+       parameter at the time of committing the transaction is used.
+       </para>
+
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-work-mem" xreflabel="work_mem">
       <term><varname>work_mem</varname> (<type>integer</type>)
       <indexterm>
        <primary><varname>work_mem</> configuration parameter</primary>
       </indexterm>
       </term>
       <listitem>
        <para>
         Specifies the amount of memory to be used by internal sort operations
         and hash tables before writing to temporary disk files. The value
diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml
index 1533a6b..247aa09 100644
--- a/doc/src/sgml/fdwhandler.sgml
+++ b/doc/src/sgml/fdwhandler.sgml
@@ -918,20 +918,85 @@ ImportForeignSchema (ImportForeignSchemaStmt *stmt, Oid serverOid);
      useful to test whether a given foreign-table name will pass the filter.
     </para>
 
     <para>
      If the FDW does not support importing table definitions, the
      <function>ImportForeignSchema</> pointer can be set to <literal>NULL</>.
     </para>
 
    </sect2>
 
+   <sect2 id="fdw-callbacks-transactions">
+    <title>FDW Routines For Transaction Management</title>
+
+    <para>
+<programlisting>
+char *
+GetPrepareInfo (Oid serverOid, Oid userid, int *prep_info_len);
+</programlisting>
+
+     Get prepared transaction identifier for given foreign server and user.
+     This function is called when executing <xref linkend="sql-commit">, if
+     <literal>atomic_foreign_transaction</> is enabled. It should return a
+     valid transaction identifier that will be used to prepare transaction
+     on the foreign server. The <parameter>prep_info_len</> should be set
+     to the length of this identifier. The identifier should not be longer
+     than 256 bytes. The identifier should not cause conflict with existing
+     identifiers on the foreign server. It should also be unique enough that it
+     is never reused for a later transaction. It's possible that a transaction is
+     considered unresolved on <productname>PostgreSQL</> while it is resolved
+     in reality. This causes the foreign transaction resolver to keep trying to
+     resolve the transaction until it finds out that the transaction has been
+     resolved. If the identifier is the same as that of a later transaction,
+     there is a possibility of that later transaction getting resolved
+     erroneously.
+    </para>
+
+    <para>
+     If a foreign server whose Foreign Data Wrapper has a <literal>NULL</>
+     <function>GetPrepareInfo</> pointer participates in a transaction
+     with <literal>atomic_foreign_transaction</> enabled, the transaction
+     is aborted.
+    </para>
+
+    <para>
+<programlisting>
+bool
+HandleForeignTransaction (Oid serverOid, Oid userid, FDWXactAction action,
+                            int prep_id_len, char *prep_id)
+</programlisting>
+
+     Function to end a transaction on the given foreign server with given user.
+     This function is called when executing <xref linkend="sql-commit"> or
+     <xref linkend="sql-rollback">. The function should complete a transaction
+     according to the <parameter>action</> specified. The function should
+     return TRUE on successful completion of transaction and FALSE otherwise.
+     It should not throw an error in case of failure to complete the transaction.
+    </para>
+
+    <para>
+    When <parameter>action</> is FDW_XACT_COMMIT or FDW_XACT_ABORT, the function
+    should respectively commit or roll back the running transaction. When <parameter>action</>
+    is FDW_XACT_PREPARE, the running transaction should be prepared with the
+    identifier given by <parameter>prep_id</> and <parameter>prep_id_len</>.
+    When <parameter>action</> is FDW_XACT_COMMIT_PREPARED or FDW_XACT_ABORT_PREPARED,
+    the function should respectively commit or roll back the transaction identified
+    by <parameter>prep_id</> and <parameter>prep_id_len</>.
+    </para>
+
+    <para>
+    This function is usually called at the end of the transaction, when
+    access to the database may not be possible. Trying to access catalogs
+    in this function may cause an error to be thrown and can affect other foreign
+    data wrappers.
+    </para>
+   </sect2>
    </sect1>
 
    <sect1 id="fdw-helpers">
     <title>Foreign Data Wrapper Helper Functions</title>
 
     <para>
      Several helper functions are exported from the core server so that
      authors of foreign data wrappers can get easy access to attributes of
      FDW-related objects, such as FDW options.
      To use any of these functions, you need to include the header file
@@ -1318,11 +1383,93 @@ GetForeignServerByName(const char *name, bool missing_ok);
     <para>
      See <filename>src/include/nodes/lockoptions.h</>, the comments
      for <type>RowMarkType</> and <type>PlanRowMark</>
      in <filename>src/include/nodes/plannodes.h</>, and the comments for
      <type>ExecRowMark</> in <filename>src/include/nodes/execnodes.h</> for
      additional information.
     </para>
 
   </sect1>
 
+   <sect1 id="fdw-transactions">
+    <title>Transaction Manager for Foreign Data Wrappers</title>
+
+    <para>
+    <productname>PostgreSQL</> transaction manager allows FDWs to read and write
+    data on foreign server within a transaction while maintaining atomicity
+    (and hence consistency) of the foreign data. Every Foreign Data Wrapper is
+    required to register the foreign server along with the <productname>PostgreSQL</>
+    user whose user mapping is used to connect to the foreign server while starting a
+    transaction on the foreign server as part of the transaction on
+    <productname>PostgreSQL</> using <function>RegisterXactForeignServer</>.
+<programlisting>
+void
+RegisterXactForeignServer(Oid serverid,
+                            Oid userid,
+                            bool two_phase_compliant)
+</programlisting>
+    <varname>two_phase_compliant</> should be true if the foreign server supports
+    two-phase commit protocol, false otherwise.
+    </para>
+
+    <para>
+    An example of such transaction is as follows
+<programlisting>
+BEGIN;
+UPDATE ft1 SET col = 'a';
+UPDATE ft2 SET col = 'b';
+COMMIT;
+</programlisting>
+    Here ft1 and ft2 are foreign tables on different foreign servers, which may be
+    using different Foreign Data Wrappers.
+    </para>
+
+    <para>
+    When <varname>atomic_foreign_transaction</> is enabled
+    <productname>PostgreSQL</> employs Two-phase commit protocol to achieve
+    atomic distributed transaction. All the foreign servers registered should
+    support two-phase commit protocol. In Two-phase commit protocol the commit
+    is processed in two phases: prepare phase and commit phase. In prepare phase,
+    <productname>PostgreSQL</> prepares the transactions on all the foreign
+    servers registered using <function>RegisterXactForeignServer</>.
+    If any of the foreign servers fails to prepare its transaction, the prepare phase fails.
+    In commit phase, all the prepared transactions are committed if prepare
+    phase has succeeded or rolled back if prepare phase fails to prepare
+    transactions on all the foreign servers.
+    </para>
+
+    <para>
+    During prepare phase the distributed transaction manager calls
+    <function>GetPrepareInfo</> to get the prepared transaction identifier for
+    each foreign server involved. It stores this identifier along with the
+    serverid and userid for later use. It then calls
+    <function>HandleForeignTransaction</> with the same identifier with action
+    FDW_XACT_PREPARE.
+    </para>
+    
+    <para>
+    During commit phase the distributed transaction manager calls
+    <function>HandleForeignTransaction</> with the same identifier with action
+    FDW_XACT_COMMIT_PREPARED to commit the prepared transaction or
+    FDW_XACT_ABORT_PREPARED to rollback the prepared transaction. In case the
+    distributed transaction manager fails to commit or rollback a prepared
+    transaction because of connection failure, the operation can be tried again
+    through built-in <function>pg_fdw_resolve</>. One may set up a background worker
+    process to retry the operation by installing extension pg_fdw_xact_resolver
+    and including $libdir/pg_fdw_xact_resolver.so in
+    <varname>shared_preload_libraries</>.
+    </para>
+
+    <para>
+    When <varname>atomic_foreign_transaction</> is disabled, atomicity can not be
+    guaranteed across foreign servers. If the transaction on <productname>PostgreSQL</>
+    is committed, the distributed transaction manager calls
+    <function>HandleForeignTransaction</> to commit the transaction on all the
+    foreign servers registered using <function>RegisterXactForeignServer</>,
+    independent of the outcome of the same operation on other foreign servers.
+    Thus transactions on some foreign servers may be committed, while the same
+    on other foreign servers would be rolled back. If the transaction on
+    <productname>PostgreSQL</> aborts, the transactions on all the foreign
+    servers are aborted too.
+    </para>
+    </sect1>
  </chapter>
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index c72a1f2..8c1afcf 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -1,16 +1,16 @@
 #
 # Makefile for the rmgr descriptor routines
 #
 # src/backend/access/rmgrdesc/Makefile
 #
 
 subdir = src/backend/access/rmgrdesc
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o gindesc.o gistdesc.o \
-	   hashdesc.o heapdesc.o mxactdesc.o nbtdesc.o relmapdesc.o \
+OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o fdw_xactdesc.o gindesc.o \
+	   gistdesc.o hashdesc.o heapdesc.o mxactdesc.o nbtdesc.o relmapdesc.o \
 	   replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o \
 	   standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/rmgrdesc/fdw_xactdesc.c b/src/backend/access/rmgrdesc/fdw_xactdesc.c
new file mode 100644
index 0000000..0f0c899
--- /dev/null
+++ b/src/backend/access/rmgrdesc/fdw_xactdesc.c
@@ -0,0 +1,61 @@
+/*-------------------------------------------------------------------------
+ *
+ * fdw_xactdesc.c
+ *		rmgr descriptor routines for access/transam/fdw_xact.c
+ *
+ * This module describes the WAL records of the foreign transaction manager.
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/rmgrdesc/fdw_xactdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/fdw_xact.h"
+#include "access/xloginsert.h"
+#include "lib/stringinfo.h"
+
+/* Describe a foreign transaction WAL record; unknown record types add nothing. */
+extern void
+fdw_xact_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *rec = XLogRecGetData(record);
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info == XLOG_FDW_XACT_INSERT)
+	{
+		FDWXactOnDiskData *fdw_insert_xlog = (FDWXactOnDiskData *) rec;
+		appendStringInfo(buf, "Foreign server oid: %u user oid: %u database id: %u",
+						 fdw_insert_xlog->serveroid, fdw_insert_xlog->userid,
+						 fdw_insert_xlog->dboid);
+		/* TODO: This should be really interpreted by each FDW */
+		/* TODO: we also need to assess whether we want to add this information */
+		appendStringInfo(buf, " foreign transaction info: %.*s",
+						 fdw_insert_xlog->fdw_xact_id_len,
+						 fdw_insert_xlog->fdw_xact_id);
+	}
+	else if (info == XLOG_FDW_XACT_REMOVE)	/* never misread another record type */
+	{
+		FdwRemoveXlogRec *fdw_remove_xlog = (FdwRemoveXlogRec *) rec;
+		appendStringInfo(buf, "Foreign server oid: %u user oid: %u database id: %u",
+						 fdw_remove_xlog->serveroid, fdw_remove_xlog->userid,
+						 fdw_remove_xlog->dbid);
+	}
+}
+
+/* Name the foreign transaction WAL record type in info; NULL when unrecognised. */
+extern const char *
+fdw_xact_identify(uint8 info)
+{
+	const char *id = NULL;		/* stays NULL for record types we don't know */
+	uint8		rectype = info & ~XLR_INFO_MASK;
+
+	if (rectype == XLOG_FDW_XACT_INSERT)
+		id = "NEW FOREIGN TRANSACTION";
+	else if (rectype == XLOG_FDW_XACT_REMOVE)
+		id = "REMOVE FOREIGN TRANSACTION";
+	return id;
+}
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index 83cc9e8..041964a 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -104,28 +104,29 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
 			if (entry->val == xlrec.wal_level)
 			{
 				wal_level_str = entry->name;
 				break;
 			}
 		}
 
 		appendStringInfo(buf, "max_connections=%d max_worker_processes=%d "
 						 "max_prepared_xacts=%d max_locks_per_xact=%d "
 						 "wal_level=%s wal_log_hints=%s "
-						 "track_commit_timestamp=%s",
+						 "track_commit_timestamp=%s max_fdw_xacts=%d",
 						 xlrec.MaxConnections,
 						 xlrec.max_worker_processes,
 						 xlrec.max_prepared_xacts,
 						 xlrec.max_locks_per_xact,
 						 wal_level_str,
 						 xlrec.wal_log_hints ? "on" : "off",
-						 xlrec.track_commit_timestamp ? "on" : "off");
+						 xlrec.track_commit_timestamp ? "on" : "off",
+						 xlrec.max_fdw_xacts);
 	}
 	else if (info == XLOG_FPW_CHANGE)
 	{
 		bool		fpw;
 
 		memcpy(&fpw, rec, sizeof(bool));
 		appendStringInfoString(buf, fpw ? "true" : "false");
 	}
 	else if (info == XLOG_END_OF_RECOVERY)
 	{
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 94455b2..51b2efd 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -8,16 +8,17 @@
 #
 #-------------------------------------------------------------------------
 
 subdir = src/backend/access/transam
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = clog.o commit_ts.o multixact.o parallel.o rmgr.o slru.o subtrans.o \
 	timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \
 	xact.o xlog.o xlogarchive.o xlogfuncs.o \
-	xloginsert.o xlogreader.o xlogutils.o
+	xloginsert.o xlogreader.o xlogutils.o \
+	fdw_xact.o
 
 include $(top_srcdir)/src/backend/common.mk
 
 # ensure that version checks in xlog.c get recompiled when catversion.h changes
 xlog.o: xlog.c $(top_srcdir)/src/include/catalog/catversion.h
diff --git a/src/backend/access/transam/fdw_xact.c b/src/backend/access/transam/fdw_xact.c
new file mode 100644
index 0000000..9f315d9
--- /dev/null
+++ b/src/backend/access/transam/fdw_xact.c
@@ -0,0 +1,2024 @@
+/*-------------------------------------------------------------------------
+ *
+ * fdw_xact.c
+ *		PostgreSQL distributed transaction manager. 
+ *
+ * This module manages the transactions involving foreign servers. 
+ *
+ * Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ * src/backend/access/transam/fdw_xact.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "funcapi.h"
+
+#include "access/fdw_xact.h"
+#include "access/htup_details.h"
+#include "access/xlog.h"
+#include "access/xloginsert.h"
+#include "catalog/pg_type.h"
+#include "foreign/foreign.h"
+#include "foreign/fdwapi.h"
+#include "libpq/pqsignal.h"
+#include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/lock.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/shmem.h"
+#include "utils/builtins.h"
+#include "utils/memutils.h"
+#include "utils/guc.h"
+#include "utils/snapmgr.h"
+
+/*
+ * This comment summarises how the transaction manager handles transactions
+ * involving one or more foreign servers.
+ *
+ * When a foreign data wrapper starts a transaction on a foreign server, it is
+ * required to register the foreign server and user who initiated the
+ * transaction using function RegisterXactForeignServer(). A foreign server
+ * connection is identified by oid of foreign server and user.
+ *
+ * The commit is executed in two phases:
+ * First phase (executed during pre-commit processing)
+ * -----------
+ * Transactions are prepared on all the foreign servers, which can participate
+ * in two-phase commit protocol. Transactions on other foreign servers are
+ * committed in the same phase.
+ *
+ * Second phase (executed during post-commit/abort processing)
+ * ------------
+ * If first phase succeeds, foreign servers are requested to commit respective
+ * prepared transactions. If the first phase does not succeed because of any
+ * failure, the foreign servers are asked to rollback respective prepared
+ * transactions or abort the transactions if they are not prepared.
+ *
+ * Any network failure or server crash after preparing a foreign transaction
+ * leaves that prepared transaction unresolved. During the first phase, before
+ * actually preparing the transactions, enough information is persisted to the
+ * disk and logs in order to resolve such transactions.
+ */
+
+/* Shared memory entry for a prepared or being prepared foreign transaction */
+typedef struct FDWXactData	*FDWXact;
+
+/* Structure to bundle one foreign connection participating in the transaction */
+typedef struct
+{
+	Oid							serverid;	/* foreign server of the connection */
+	Oid							userid;		/* user registered with the server */
+	char						*servername;	/* saved for end-of-transaction messages */
+	FDWXact						fdw_xact;	/* foreign prepared transaction entry
+											   in case prepared, NULL otherwise */
+	bool						two_phase_commit;	/* Should use two phase commit
+													 * protocol while committing
+													 * transaction on this
+													 * server, whenever
+													 * necessary.
+													 */
+	GetPrepareId_function		prepare_id_provider;	/* FDW's GetPrepareId */
+	EndForeignTransaction_function	end_foreing_xact;	/* FDW's EndForeignTransaction (sic) */
+	PrepareForeignTransaction_function	prepare_foreign_xact;	/* FDW's PrepareForeignTransaction */
+	ResolvePreparedForeignTransaction_function	resolve_prepared_foreign_xact;	/* FDW's resolver */
+} FDWConnection;
+
+/* List of FDWConnection entries, one per foreign connection in the transaction */
+List	*MyFDWConnections = NIL;
+
+/*
+ * True as long as every foreign connection registered in this transaction can
+ * use two phase commit protocol; by default we assume that all of them can.
+ */
+bool	TwoPhaseReady = true;
+
+/*
+ * Record the server, userid participating in the transaction. Raises an error
+ * if the pair is already registered or if the FDW lacks a transaction callback
+ * it needs (prepare/resolve callbacks are needed only when two_phase_commit).
+ */
+void
+RegisterXactForeignServer(Oid serverid, Oid userid, bool two_phase_commit)
+{
+	FDWConnection	*fdw_conn;
+	ListCell		*lcell;
+	ForeignServer	*foreign_server;
+	ForeignDataWrapper	*fdw;
+	FdwRoutine 		*fdw_routine;
+	MemoryContext	old_context;
+
+	/* Check if the entry already exists, if so, raise an error */
+	foreach(lcell, MyFDWConnections)
+	{
+		fdw_conn = lfirst(lcell);
+		if (fdw_conn->serverid == serverid && fdw_conn->userid == userid)
+			ereport(ERROR,
+					(errmsg("attempt to start transaction again on server %u with user %u",
+							serverid, userid)));
+	}
+
+	/*
+	 * Make sure that the FDW has at least a transaction handler. Look it up in
+	 * the caller's context so the results don't linger in TopTransactionContext.
+	 */
+	foreign_server = GetForeignServer(serverid);
+	fdw = GetForeignDataWrapper(foreign_server->fdwid);
+	fdw_routine = GetFdwRoutine(fdw->fdwhandler);
+	if (!fdw_routine->EndForeignTransaction)
+		elog(ERROR, "no function to end a foreign transaction provided for FDW %s",
+			 fdw->fdwname);
+
+	if (two_phase_commit)
+	{
+		if (!fdw_routine->GetPrepareId)
+			elog(ERROR, "no prepared transaction identifier provider function for FDW %s",
+				 fdw->fdwname);
+		if (!fdw_routine->PrepareForeignTransaction)
+			elog(ERROR, "no function provided for preparing foreign transaction for FDW %s",
+				 fdw->fdwname);
+		if (!fdw_routine->ResolvePreparedForeignTransaction)
+			elog(ERROR, "no function provided for resolving prepared foreign transaction for FDW %s",
+				 fdw->fdwname);
+	}
+
+	/* All checks passed; only now note a connection incapable of two-phase commit */
+	TwoPhaseReady = TwoPhaseReady && two_phase_commit;
+
+	/*
+	 * The list and its entries must live in the transaction memory context. Save
+	 * what the end of transaction needs, when system caches are not available.
+	 */
+	old_context = MemoryContextSwitchTo(TopTransactionContext);
+	fdw_conn = (FDWConnection *) palloc(sizeof(FDWConnection));
+	fdw_conn->serverid = serverid;
+	fdw_conn->userid = userid;
+	fdw_conn->servername = pstrdup(foreign_server->servername);
+	fdw_conn->prepare_id_provider = fdw_routine->GetPrepareId;
+	fdw_conn->prepare_foreign_xact = fdw_routine->PrepareForeignTransaction;
+	fdw_conn->resolve_prepared_foreign_xact = fdw_routine->ResolvePreparedForeignTransaction;
+	fdw_conn->end_foreing_xact = fdw_routine->EndForeignTransaction;
+	fdw_conn->fdw_xact = NULL;
+	fdw_conn->two_phase_commit = two_phase_commit;
+	MyFDWConnections = lappend(MyFDWConnections, fdw_conn);
+	MemoryContextSwitchTo(old_context);
+}
+
+/* Prepared transaction identifier can be maximum 256 bytes long */
+#define MAX_FDW_XACT_ID_LEN	256
+
+/* Enum to track the status of a prepared foreign transaction */
+typedef enum
+{
+	FDW_XACT_PREPARING,			/* foreign transaction is (being) prepared */
+	FDW_XACT_COMMITTING_PREPARED,	/* foreign prepared transaction is to be committed */
+	FDW_XACT_ABORTING_PREPARED,	/* foreign prepared transaction is to be aborted */
+	FDW_XACT_RESOLVED			/* Status used only by pg_fdw_resolve().
+								   It doesn't appear in the in-memory entry. */
+} FDWXactStatus;
+
+/* Shared memory entry for a prepared or being prepared foreign transaction */
+typedef struct FDWXactData
+{
+	FDWXact			fx_next;	/* Next free FDWXact entry (free list link) */
+	Oid				dboid;		/* database oid where to find foreign server and
+								 * user mapping */
+	TransactionId	local_xid;	/* XID of local transaction */
+	Oid				serveroid;	/* foreign server where transaction takes place */
+	Oid				userid;		/* user who initiated the foreign transaction */
+	FDWXactStatus	fdw_xact_status;	/* The state of the foreign transaction.
+										   This doubles as the action to be
+										   taken on this entry. */
+	XLogRecPtr		fdw_xact_lsn;		/* LSN of the log record for inserting this entry */
+	bool			fdw_xact_valid;		/* Has the entry been completed and written to file? */
+	BackendId		locking_backend;	/* Backend working on this entry */
+	int				fdw_xact_id_len;	/* Length of prepared transaction identifier */
+	char			fdw_xact_id[MAX_FDW_XACT_ID_LEN];	/* prepared transaction identifier */
+} FDWXactData;
+
+/* Directory where the foreign prepared transaction files will reside */
+#define FDW_XACTS_DIR "pg_fdw_xact"
+
+/*
+ * Name of foreign prepared transaction file is 8 bytes xid, 8 bytes foreign
+ * server oid and 8 bytes user oid separated by '_'.
+ */
+#define FDW_XACT_FILE_NAME_LEN (8 + 1 + 8 + 1 + 8)
+#define FDWXactFilePath(path, xid, serveroid, userid)	\
+	snprintf(path, MAXPGPATH, FDW_XACTS_DIR "/%08X_%08X_%08X", xid, \
+							serveroid, userid)
+
+/* Shared memory layout for maintaining foreign prepared transaction entries. */
+typedef struct
+{
+	/* Head of linked list of free FDWXactData structs, chained through fx_next */
+	FDWXact		freeFDWXacts;
+
+	/* Number of valid FDW transaction entries */
+	int			num_fdw_xacts;
+
+	/* Up to max_fdw_xacts pointers; the FDWXactData structs are stored after the array */
+	FDWXact		fdw_xacts[FLEXIBLE_ARRAY_MEMBER];	/* Variable length array */
+} FDWXactGlobalData;
+
+static void AtProcExit_FDWXact(int code, Datum arg);
+static bool resolve_fdw_xact(FDWXact fdw_xact,
+							ResolvePreparedForeignTransaction_function prepared_foreign_xact_resolver);
+static FDWXact insert_fdw_xact(Oid dboid, TransactionId xid, Oid foreign_server, Oid userid,
+										int fdw_xact_id_len, char *fdw_xact_id,
+										FDWXactStatus fdw_xact_status);
+static void unlock_fdw_xact(FDWXact fdw_xact);
+static void unlock_fdw_xact_entries(void);	/* (void): a real prototype, not an old-style declaration */
+static void remove_fdw_xact(FDWXact fdw_xact);
+static FDWXact register_fdw_xact(Oid dbid, TransactionId xid, Oid serveroid,
+								Oid userid, int fdw_xact_info_len,
+								char *fdw_xact_info);
+static int GetFDWXactList(FDWXact *fdw_xacts);
+static ResolvePreparedForeignTransaction_function get_prepared_foreign_xact_resolver(FDWXact fdw_xact);
+static FDWXactOnDiskData *ReadFDWXactFile(TransactionId xid, Oid serveroid,
+											Oid userid);
+static void RemoveFDWXactFile(TransactionId xid, Oid serveroid, Oid userid,
+								bool giveWarning);
+static void prepare_foreign_transactions(void);
+bool search_fdw_xact(TransactionId xid, Oid dbid, Oid serverid, Oid userid,
+						List **qualifying_xacts);
+
+/*
+ * Maximum number of foreign prepared transaction entries at any given time
+ * GUC variable, change requires restart.
+ */
+int	max_fdw_xacts = 0;
+
+/* Keep track of registering process exit call back. */
+static bool fdwXactExitRegistered = false;
+
+/* Pointer to the shared memory holding the foreign transactions data */
+static FDWXactGlobalData	*FDWXactGlobal;
+
+/* foreign transaction entries locked by this backend */
+List	*MyLockedFDWXacts = NIL;
+
+/*
+ * FDWXactShmemSize
+ * Returns the shared memory needed for the foreign prepared transaction table:
+ * the fixed struct with its pointer array, then the MAXALIGNed entry structs.
+ */
+extern Size
+FDWXactShmemSize(void)
+{
+	Size		header_size;
+	Size		entries_size;
+
+	/* Fixed struct plus one FDWXact pointer per possible entry */
+	header_size = add_size(offsetof(FDWXactGlobalData, fdw_xacts),
+						   mul_size(max_fdw_xacts, sizeof(FDWXact)));
+
+	/* The FDWXactData structs start at the next MAXALIGN boundary */
+	entries_size = mul_size(max_fdw_xacts, sizeof(FDWXactData));
+
+	return add_size(MAXALIGN(header_size), entries_size);
+}
+
+/*
+ * FDWXactShmemInit
+ * Sets up (or attaches to) the shared memory used for foreign prepared
+ * transaction entries, laid out as defined by the FDWXactGlobalData structure.
+ * On first creation every entry is pushed onto the free list.
+ */
+extern void
+FDWXactShmemInit(void)
+{
+	bool		found;
+	FDWXact		entries;
+	int			i;
+
+	FDWXactGlobal = ShmemInitStruct("Foreign transactions table",
+									FDWXactShmemSize(),
+									&found);
+
+	if (IsUnderPostmaster)
+	{
+		/* The table is expected to exist already */
+		Assert(FDWXactGlobal);
+		Assert(found);
+		return;
+	}
+
+	/* First time through: start with an empty table */
+	Assert(!found);
+	FDWXactGlobal->freeFDWXacts = NULL;
+	FDWXactGlobal->num_fdw_xacts = 0;
+
+	/* The entries follow the MAXALIGNed header; chain them all as free */
+	entries = (FDWXact) ((char *) FDWXactGlobal +
+						 MAXALIGN(offsetof(FDWXactGlobalData, fdw_xacts) +
+								  sizeof(FDWXact) * max_fdw_xacts));
+	for (i = 0; i < max_fdw_xacts; i++)
+	{
+		entries[i].fx_next = FDWXactGlobal->freeFDWXacts;
+		FDWXactGlobal->freeFDWXacts = entries + i;
+	}
+}
+
+/*
+ * PreCommit_FDWXacts
+ * Pre-commit processing for the foreign connections of this transaction.
+ *
+ * Connections that cannot execute two-phase commit are committed right away and
+ * forgotten, so such a foreign transaction may stay committed even if the local
+ * transaction aborts afterwards. Transactions on the remaining connections are
+ * prepared here; they get committed or aborted after the current transaction
+ * has been committed or aborted.
+ */
+void
+PreCommit_FDWXacts(void)
+{
+	ListCell	*cell;
+	ListCell	*prev_kept = NULL;
+
+	/* Nothing to do unless a foreign server is involved */
+	if (MyFDWConnections == NIL)
+		return;
+
+	/* First pass: commit on the servers that cannot do two-phase commit */
+	cell = list_head(MyFDWConnections);
+	while (cell != NULL)
+	{
+		FDWConnection *conn = lfirst(cell);
+		ListCell   *following = lnext(cell);
+
+		if (conn->two_phase_commit)
+		{
+			/* Left in the list for prepare_foreign_transactions() below */
+			prev_kept = cell;
+			cell = following;
+			continue;
+		}
+
+		/*
+		 * The FDW has to make sure that the connection opened to the foreign
+		 * server is out of transaction. Should the handler report failure,
+		 * there is hardly anything we can do beyond warning about it.
+		 */
+		if (!conn->end_foreing_xact(conn->serverid, conn->userid, true))
+			elog(WARNING, "could not commit transaction on server %s",
+				 conn->servername);
+
+		/* No longer part of this transaction; unlink the cell after prev_kept */
+		MyFDWConnections = list_delete_cell(MyFDWConnections, cell, prev_kept);
+		cell = following;
+	}
+
+	/*
+	 * Second pass: prepare the transactions on the servers that can execute
+	 * two-phase-commit protocol.
+	 */
+	prepare_foreign_transactions();
+}
+
+/*
+ * Register and then prepare the transaction on every foreign server that can
+ * execute two phase commit protocol (others are ignored); ERROR if one fails.
+ */
+static void
+prepare_foreign_transactions(void)
+{
+	ListCell	*lcell;
+
+	/*
+	 * Loop over the foreign connections
+	 */
+	foreach(lcell, MyFDWConnections)
+	{
+		FDWConnection	*fdw_conn = (FDWConnection *)lfirst(lcell);
+		char			*fdw_xact_info;
+		int				fdw_xact_info_len;
+		FDWXact			fdw_xact;
+
+		if (!fdw_conn->two_phase_commit)
+			continue;
+
+		Assert(fdw_conn->prepare_id_provider);
+		fdw_xact_info = fdw_conn->prepare_id_provider(fdw_conn->serverid,
+															fdw_conn->userid,
+															&fdw_xact_info_len);
+
+		/*
+		 * Register the foreign transaction with the identifier used to prepare
+		 * it on the foreign server. Registration persists this information to
+		 * the disk and logs (that way relaying it to the standby). Thus in case
+		 * we lose connectivity to the foreign server or crash ourselves, we will
+		 * remember that we have prepared transaction on the foreign server and
+		 * try to resolve it when connectivity is restored or after crash
+		 * recovery.
+		 *
+		 * If we crash after persisting the information but before preparing the
+		 * transaction on the foreign server, we will try to resolve a
+		 * never-prepared transaction, and get an error. This is fine as long as
+		 * the FDW provides us unique prepared transaction identifiers.
+		 *
+		 * If we prepare the transaction on the foreign server before persisting
+		 * the information to the disk and crash in-between these two steps, we
+		 * will forget that we prepared the transaction on the foreign server
+		 * and will not be able to resolve it after the crash. Hence persist
+		 * first then prepare.
+		 */
+		fdw_xact = register_fdw_xact(MyDatabaseId, GetTopTransactionId(),
+											fdw_conn->serverid, fdw_conn->userid,
+											fdw_xact_info_len, fdw_xact_info);
+		/*
+		 * Between register_fdw_xact call above till this backend hears back
+		 * from foreign server, the backend may abort the local transaction (say,
+		 * because of a signal). During abort processing, it will send an ABORT
+		 * message to the foreign server. If the foreign server has not prepared
+		 * the transaction, the message will succeed. If the foreign server has
+		 * prepared the transaction, it will throw an error, which we will ignore
+		 * and the prepared foreign transaction will be resolved by the foreign
+		 * transaction resolver.
+		 */
+		if (!fdw_conn->prepare_foreign_xact(fdw_conn->serverid, fdw_conn->userid,
+											fdw_xact_info_len, fdw_xact_info))
+		{
+			/*
+			 * An error occurred, and we didn't prepare the transaction. Delete the
+			 * entry from foreign transaction table. Raise an error, so that the
+			 * local server knows that one of the foreign servers has failed to
+			 * prepare the transaction.
+			 * TODO:
+			 * FDW is expected to print the error as a warning and then we
+			 * raise actual error here. But instead, we should pull the
+			 * error text from FDW and add it here in the message or as a
+			 * context or a hint.
+			 */
+			remove_fdw_xact(fdw_xact);
+
+			/*
+			 * Delete the connection, since it doesn't require any further
+			 * processing. This deletion will invalidate current cell
+			 * pointer, but that is fine since we will not use that pointer
+			 * because the subsequent ereport will get us out of this loop.
+			 */
+			MyFDWConnections = list_delete_ptr(MyFDWConnections, fdw_conn);
+			ereport(ERROR,
+					(errmsg("can not prepare transaction on foreign server %s",
+							fdw_conn->servername)));
+		}
+
+		/* Prepare succeeded, remember the registered entry in the connection */
+		fdw_conn->fdw_xact = fdw_xact;
+	}
+	return;
+}
+/*
+ * register_fdw_xact
+ * Create a foreign transaction entry before an FDW executes the first phase of
+ * two-phase commit. The entry is inserted in shared memory, logged in WAL and
+ * persisted to a state file (see FDWXactFilePath) holding the on-disk image
+ * followed by its CRC. The entry is returned the way insert_fdw_xact() leaves
+ * it, i.e. locked by this backend.
+ */
+static FDWXact
+register_fdw_xact(Oid dbid, TransactionId xid, Oid serveroid, Oid userid,
+					int fdw_xact_id_len, char *fdw_xact_id)
+{
+	FDWXact				fdw_xact;
+	FDWXactOnDiskData	*fdw_xact_file_data;
+	int					data_len;
+	char				path[MAXPGPATH];
+	int					fd;
+	pg_crc32c			fdw_xact_crc;
+	pg_crc32c			bogus_crc;
+
+	/* Enter the foreign transaction in the shared memory structure */
+	fdw_xact = insert_fdw_xact(dbid, xid, serveroid, userid,
+									fdw_xact_id_len, fdw_xact_id,
+									FDW_XACT_PREPARING);
+	/*
+	 * Build the image of the entry. The very same bytes are written to the
+	 * state file and registered as the contents of the xlog record.
+	 */
+	data_len = offsetof(FDWXactOnDiskData, fdw_xact_id);
+	data_len = data_len + fdw_xact->fdw_xact_id_len;
+	data_len = MAXALIGN(data_len);
+	fdw_xact_file_data = (FDWXactOnDiskData *) palloc0(data_len);
+	fdw_xact_file_data->dboid = fdw_xact->dboid;
+	fdw_xact_file_data->local_xid = fdw_xact->local_xid;
+	fdw_xact_file_data->serveroid = fdw_xact->serveroid;
+	fdw_xact_file_data->userid = fdw_xact->userid;
+	fdw_xact_file_data->fdw_xact_id_len = fdw_xact->fdw_xact_id_len;
+	memcpy(fdw_xact_file_data->fdw_xact_id, fdw_xact->fdw_xact_id,
+					fdw_xact->fdw_xact_id_len);
+
+	FDWXactFilePath(path, fdw_xact->local_xid, fdw_xact->serveroid,
+						fdw_xact->userid);
+
+	/* Create the file, but error out if it already exists. */
+	fd = OpenTransientFile(path, O_EXCL | O_CREAT | PG_BINARY | O_WRONLY,
+							S_IRUSR | S_IWUSR);
+	if (fd < 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not create foreign transaction state file \"%s\": %m",
+						path)));
+
+	/* Write data to file, and calculate CRC as we pass over it */
+	INIT_CRC32C(fdw_xact_crc);
+	COMP_CRC32C(fdw_xact_crc, fdw_xact_file_data, data_len);
+	if (write(fd, fdw_xact_file_data, data_len) != data_len)
+	{
+		CloseTransientFile(fd);
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not write foreign transaction state file \"%s\": %m", path)));
+	}
+
+	FIN_CRC32C(fdw_xact_crc);
+	/*
+	 * Write a deliberately bogus CRC to the state file; this is just paranoia
+	 * to catch the case where four more bytes will run us out of disk space.
+	 */
+	bogus_crc = ~fdw_xact_crc;
+
+	if ((write(fd, &bogus_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
+	{
+		CloseTransientFile(fd);
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not write foreign transaction state file \"%s\": %m", path)));
+	}
+
+	/* Back up to prepare for rewriting the CRC */
+	if (lseek(fd, -((off_t) sizeof(pg_crc32c)), SEEK_CUR) < 0)
+	{
+		CloseTransientFile(fd);
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not seek in foreign transaction state file \"%s\": %m", path)));
+	}
+
+	/*
+	 * The state file isn't valid yet, because we haven't written the correct
+	 * CRC yet.  Before we do that, insert entry in WAL and flush it to disk.
+	 *
+	 * Between the time we have written the WAL entry and the time we write
+	 * out the correct state file CRC, we have an inconsistency: we have
+	 * recorded the foreign transaction in WAL but not on the disk. We
+	 * use a critical section to force a PANIC if we are unable to complete
+	 * the write (the ERRORs below are promoted) --- then, WAL replay should
+	 * repair the inconsistency.  The odds of a PANIC actually occurring should
+	 * be very tiny given that we were able to write the bogus CRC above.
+	 */
+	START_CRIT_SECTION();
+
+	/*
+	 * We have to set delayChkpt here, too; otherwise a checkpoint starting
+	 * immediately after the WAL record is inserted could complete without
+	 * fsync'ing our foreign transaction file. (This is essentially the same
+	 * kind of race condition as the COMMIT-to-clog-write case that
+	 * RecordTransactionCommit uses delayChkpt for; see notes there.)
+	 */
+	MyPgXact->delayChkpt = true;
+
+	/* Add the entry in the xlog and save LSN for checkpointer */
+	XLogBeginInsert();
+	XLogRegisterData((char *)fdw_xact_file_data, data_len);
+	fdw_xact->fdw_xact_lsn = XLogInsert(RM_FDW_XACT_ID, XLOG_FDW_XACT_INSERT);
+	XLogFlush(fdw_xact->fdw_xact_lsn);
+
+	/* Write the correct CRC and close the file; WAL replay repairs a crash here */
+	if ((write(fd, &fdw_xact_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
+	{
+		CloseTransientFile(fd);
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not write foreign transaction file: %m")));
+	}
+
+	if (CloseTransientFile(fd) != 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not close foreign transaction file: %m")));
+
+	/* File is written completely, checkpoint can proceed with syncing */
+	fdw_xact->fdw_xact_valid = true;
+
+	MyPgXact->delayChkpt = false;
+	END_CRIT_SECTION();
+
+	pfree(fdw_xact_file_data);
+	return fdw_xact;
+}
+
+/*
+ * insert_fdw_xact
+ * Insert a new entry for a given foreign transaction identified by transaction
+ * id, foreign server and user mapping, in the shared memory. The inserted entry
+ * is returned locked (stamped with MyBackendId, listed in MyLockedFDWXacts).
+ *
+ * Raises an error if the entry already exists or if no free entry is left.
+ */
+static FDWXact
+insert_fdw_xact(Oid dboid, TransactionId xid, Oid serveroid, Oid userid,
+					int fdw_xact_id_len, char *fdw_xact_id,
+					FDWXactStatus fdw_xact_status)
+{
+	FDWXact			fdw_xact;
+	int				cnt;
+
+	if (!fdwXactExitRegistered)
+	{
+		before_shmem_exit(AtProcExit_FDWXact, 0);
+		fdwXactExitRegistered = true;
+	}
+
+	if (fdw_xact_id_len > MAX_FDW_XACT_ID_LEN)
+		elog(ERROR, "foreign transaction identifier longer (%d) than allowed (%d)",
+				fdw_xact_id_len, MAX_FDW_XACT_ID_LEN);
+
+	LWLockAcquire(FDWXactLock, LW_EXCLUSIVE);
+	for (cnt = 0; cnt < FDWXactGlobal->num_fdw_xacts; cnt++)
+	{
+		fdw_xact = FDWXactGlobal->fdw_xacts[cnt];
+
+		if (fdw_xact->local_xid == xid &&
+			fdw_xact->serveroid == serveroid &&
+			fdw_xact->userid == userid)
+			elog(ERROR, "duplicate entry for foreign transaction with transaction id %u, serveroid %u, userid %u found",
+						xid, serveroid, userid);
+	}
+
+	/*
+	 * Get the next free foreign transaction entry. Raise error if there are
+	 * none left.
+	 */
+	if (!FDWXactGlobal->freeFDWXacts)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("maximum number of foreign transactions reached"),
+				 errhint("Increase max_prepared_foreign_transactions (currently %d).",
+						 max_fdw_xacts)));
+
+	fdw_xact = FDWXactGlobal->freeFDWXacts;
+
+	/*
+	 * Remember that we are locking this entry. lappend() may fail on
+	 * out-of-memory, so do it before touching shared memory; failing later
+	 * would leave an entry stamped with our backend id but not in our list.
+	 */
+	MyLockedFDWXacts = lappend(MyLockedFDWXacts, fdw_xact);
+
+	/* Move the entry from the free list to the active array */
+	FDWXactGlobal->freeFDWXacts = fdw_xact->fx_next;
+	Assert(FDWXactGlobal->num_fdw_xacts < max_fdw_xacts);
+	FDWXactGlobal->fdw_xacts[FDWXactGlobal->num_fdw_xacts++] = fdw_xact;
+
+	/* Stamp the entry with backend id before releasing the LWLock */
+	fdw_xact->locking_backend = MyBackendId;
+	fdw_xact->dboid = dboid;
+	fdw_xact->local_xid = xid;
+	fdw_xact->serveroid = serveroid;
+	fdw_xact->userid = userid;
+	fdw_xact->fdw_xact_status = fdw_xact_status;
+	fdw_xact->fdw_xact_lsn = 0;
+	fdw_xact->fdw_xact_valid = false;
+	fdw_xact->fdw_xact_id_len = fdw_xact_id_len;
+	memcpy(fdw_xact->fdw_xact_id, fdw_xact_id, fdw_xact_id_len);
+	LWLockRelease(FDWXactLock);
+	return fdw_xact;
+}
+
+/*
+ * remove_fdw_xact
+ * Remove the given foreign transaction entry from shared memory, log the
+ * removal in WAL and remove its file. Raises an error if it is not found.
+ */
+static void
+remove_fdw_xact(FDWXact fdw_xact)
+{
+	int cnt;
+
+	LWLockAcquire(FDWXactLock, LW_EXCLUSIVE);
+	/* Look for the slot of the active array that points to this entry */
+	for (cnt = 0; cnt < FDWXactGlobal->num_fdw_xacts; cnt++)
+	{
+		if (FDWXactGlobal->fdw_xacts[cnt] == fdw_xact)
+		{
+			FdwRemoveXlogRec	fdw_remove_xlog;
+
+			/* Copy what the log record needs before releasing the entry */
+			fdw_remove_xlog.serveroid = fdw_xact->serveroid;
+			fdw_remove_xlog.dbid = fdw_xact->dboid;
+			fdw_remove_xlog.xid = fdw_xact->local_xid;
+			fdw_remove_xlog.userid = fdw_xact->userid;
+
+			/* Drop it from the active array; the last entry fills the hole */
+			FDWXactGlobal->num_fdw_xacts--;
+			FDWXactGlobal->fdw_xacts[cnt] = FDWXactGlobal->fdw_xacts[FDWXactGlobal->num_fdw_xacts];
+
+			/* Put it back into free list */
+			fdw_xact->fx_next = FDWXactGlobal->freeFDWXacts;
+			FDWXactGlobal->freeFDWXacts = fdw_xact;
+
+			/* Unlock the entry and forget it in this backend's list */
+			fdw_xact->locking_backend = InvalidBackendId;
+			MyLockedFDWXacts = list_delete_ptr(MyLockedFDWXacts, fdw_xact);
+
+			LWLockRelease(FDWXactLock);
+
+			/*
+			 * Log the removal from the copy taken above; the entry itself is
+			 * on the free list by now, where insert_fdw_xact() may reuse it.
+			 */
+			XLogBeginInsert();
+			XLogRegisterData((char *)&fdw_remove_xlog, sizeof(fdw_remove_xlog));
+			XLogInsert(RM_FDW_XACT_ID, XLOG_FDW_XACT_REMOVE);
+
+			/* Remove the file from the disk as well. */
+			RemoveFDWXactFile(fdw_remove_xlog.xid, fdw_remove_xlog.serveroid,
+								fdw_remove_xlog.userid, true);
+			return;
+		}
+	}
+	LWLockRelease(FDWXactLock);
+
+	/* We did not find the given entry in global array */
+	elog(ERROR, "failed to find %p in FDWXactGlobal array", fdw_xact);
+}
+
+/*
+ * unlock_fdw_xact
+ * Unlock the foreign transaction entry by wiping out the locking_backend and
+ * removing it from the backend's list of locked foreign transactions.
+ */
+static void
+unlock_fdw_xact(FDWXact fdw_xact)
+{
+	/* Only the backend holding the lock is allowed to unlock */
+	Assert(fdw_xact->locking_backend == MyBackendId);
+	/*
+	 * First set the locking backend as invalid, then remove the entry from the
+	 * list of locked foreign transactions, both under the LW lock. If we
+	 * reversed the order and the process exited in between, the entry would
+	 * stay locked by this backend until the server restarts.
+	 */
+
+	LWLockAcquire(FDWXactLock, LW_EXCLUSIVE);
+	fdw_xact->locking_backend = InvalidBackendId;
+	MyLockedFDWXacts = list_delete_ptr(MyLockedFDWXacts, fdw_xact);
+	LWLockRelease(FDWXactLock);
+}
+
+/*
+ * unlock_fdw_xact_entries
+ * Unlock every foreign transaction entry locked by this backend.
+ */
+static void
+unlock_fdw_xact_entries(void)
+{
+	while (MyLockedFDWXacts)
+	{
+		/* unlock_fdw_xact() also drops the entry from the list */
+		unlock_fdw_xact((FDWXact) linitial(MyLockedFDWXacts));
+	}
+}
+
+/*
+ * AtProcExit_FDWXact
+ * Exit callback: release the entries this process still holds locked.
+ */
+static void
+AtProcExit_FDWXact(int code, Datum arg)
+{
+	unlock_fdw_xact_entries();
+}
+
+/*
+ * AtEOXact_FDWXacts
+ * Executes phase 2 of the two-phase commit protocol at the end of the local
+ * transaction. For every registered foreign connection:
+ * 1. If a foreign transaction was prepared through it, mark its entry as
+ *    COMMITTING or ABORTING according to the result of the transaction and
+ *    try to resolve it on the foreign server. On success the entry is
+ *    removed, otherwise it is unlocked.
+ * 2. Otherwise just end the transaction on the foreign server.
+ */
+extern void
+AtEOXact_FDWXacts(bool is_commit)
+{
+	ListCell	*lc;
+
+	foreach(lc, MyFDWConnections)
+	{
+		FDWConnection *conn = lfirst(lc);
+		FDWXact	entry = conn->fdw_xact;
+
+		if (!entry)
+		{
+			/*
+			 * Nothing was prepared through this connection. Servers that
+			 * could not execute two phase commit protocol had their
+			 * transactions committed during pre-commit phase, so whatever
+			 * remains at this point needs to be aborted.
+			 */
+			Assert(!is_commit);
+
+			/*
+			 * The FDW has to make sure that the connection opened to the
+			 * foreign server is out of transaction. Even if the handler
+			 * function reports failure, there is hardly anything to do
+			 * beyond warning about it.
+			 */
+			if (!conn->end_foreing_xact(conn->serverid, conn->userid,
+										is_commit))
+				elog(WARNING, "could not %s transaction on server %s",
+								is_commit ? "commit" : "abort",
+								conn->servername);
+			continue;
+		}
+
+		/* Commit/abort the prepared foreign transaction */
+		if (is_commit)
+			entry->fdw_xact_status = FDW_XACT_COMMITTING_PREPARED;
+		else
+			entry->fdw_xact_status = FDW_XACT_ABORTING_PREPARED;
+
+		/*
+		 * Try resolving it on the foreign server. If that does not work,
+		 * unlock the entry, so that someone else can take care of it.
+		 */
+		if (!resolve_fdw_xact(entry, conn->resolve_prepared_foreign_xact))
+			unlock_fdw_xact(entry);
+	}
+
+	/*
+	 * Unlock any foreign transaction still locked. The resolver might lock
+	 * entries and fail to unlock them if aborted in-between. In any case,
+	 * there is no reason for a foreign transaction entry to stay locked once
+	 * the transaction which locked it has ended.
+	 */
+	unlock_fdw_xact_entries();
+
+	/*
+	 * Forget the registered connections. The list and its nodes live in the
+	 * transaction memory context and will be freed along with it after this
+	 * call.
+	 */
+	MyFDWConnections = NIL;
+	/* Set TwoPhaseReady to its default value */
+	TwoPhaseReady = true;
+}
+
+/*
+ * AtPrepare_FDWXacts
+ * Called while preparing a transaction. Prepares the transaction on every
+ * foreign server involved in it, which requires all of them to be capable of
+ * two-phase commit, and then lets go of the entries and connections.
+ */
+extern void
+AtPrepare_FDWXacts(void)
+{
+	/* Nothing to do unless foreign servers took part in the transaction */
+	if (MyFDWConnections == NIL)
+		return;
+
+	/*
+	 * A transaction being prepared can involve only foreign servers that are
+	 * two phase compliant.
+	 */
+	if (!TwoPhaseReady)
+		ereport(ERROR,
+				(errcode(ERRCODE_T_R_INTEGRITY_CONSTRAINT_VIOLATION),
+				 errmsg("can not prepare the transaction because some foreign server/s involved in transaction can not prepare the transaction")));
+
+	/* First phase: prepare on each participating foreign server */
+	prepare_foreign_transactions();
+
+	/*
+	 * Release our locks on the foreign transaction entries, so that a backend
+	 * running COMMIT/ROLLBACK PREPARED later can lock them if required.
+	 */
+	unlock_fdw_xact_entries();
+
+	/*
+	 * Forget the registered connections. The list and its nodes live in the
+	 * transaction memory context and will be freed along with it after this
+	 * call.
+	 */
+	MyFDWConnections = NIL;
+	/* Set TwoPhaseReady to its default value */
+	TwoPhaseReady = true;
+}
+
+/*
+ * FDWXactTwoPhaseFinish
+ * Called as part of COMMIT/ROLLBACK PREPARED to commit/rollback the foreign
+ * transactions prepared along with the local prepared transaction. Looks up
+ * the foreign transaction entries whose local_xid equals the xid of the
+ * prepared transaction and tries to resolve each of them.
+ */
+extern void
+FDWXactTwoPhaseFinish(bool isCommit, TransactionId xid)
+{
+	List			*pending;
+	FDWXactStatus	new_status;
+
+	if (isCommit)
+		new_status = FDW_XACT_COMMITTING_PREPARED;
+	else
+		new_status = FDW_XACT_ABORTING_PREPARED;
+
+	/*
+	 * Lock and fetch all the entries belonging to the given transaction id.
+	 * A running foreign transaction resolver might hold some of them locked
+	 * to check whether they can be resolved; the search skips those and the
+	 * resolver will take care of them at a later point of time.
+	 */
+	search_fdw_xact(xid, InvalidOid, InvalidOid, InvalidOid, &pending);
+
+	while (pending)
+	{
+		FDWXact	entry = linitial(pending);
+
+		pending = list_delete_first(pending);
+		entry->fdw_xact_status = new_status;
+		/* Resolve it; on failure unlock it so that someone else can pick it up */
+		if (!resolve_fdw_xact(entry, get_prepared_foreign_xact_resolver(entry)))
+			unlock_fdw_xact(entry);
+	}
+}
+
+/* Look up the FDW's ResolvePreparedForeignTransaction routine for an entry. */
+static ResolvePreparedForeignTransaction_function
+get_prepared_foreign_xact_resolver(FDWXact fdw_xact)
+{
+	ForeignDataWrapper	*wrapper;
+	FdwRoutine			*routine;
+
+	/* entry -> foreign server -> foreign data wrapper -> handler routines */
+	wrapper = GetForeignDataWrapper(GetForeignServer(fdw_xact->serveroid)->fdwid);
+	routine = GetFdwRoutine(wrapper->fdwhandler);
+	if (!routine->ResolvePreparedForeignTransaction)
+		elog(ERROR, "no foreign transaction resolver routine provided for FDW %s",
+				wrapper->fdwname);
+	return routine->ResolvePreparedForeignTransaction;
+}
+
+/*
+ * resolve_fdw_xact
+ * Resolve the foreign transaction through the given handler routine of its
+ * foreign data wrapper, committing or aborting as the entry's status says.
+ * On success the entry is removed from the shared memory along with its
+ * on-disk file. Returns whether the transaction got resolved.
+ */
+static bool
+resolve_fdw_xact(FDWXact fdw_xact,
+				ResolvePreparedForeignTransaction_function fdw_xact_handler)
+{
+	bool	is_commit;
+	bool	resolved;
+
+	/* The caller must have decided the fate of the transaction already */
+	Assert(fdw_xact->fdw_xact_status == FDW_XACT_COMMITTING_PREPARED ||
+			fdw_xact->fdw_xact_status == FDW_XACT_ABORTING_PREPARED);
+
+	is_commit = (fdw_xact->fdw_xact_status == FDW_XACT_COMMITTING_PREPARED);
+
+	resolved = fdw_xact_handler(fdw_xact->serveroid, fdw_xact->userid,
+								is_commit,
+								fdw_xact->fdw_xact_id_len,
+								fdw_xact->fdw_xact_id);
+
+	/* Only a resolved transaction lets us drop the entry */
+	if (resolved)
+		remove_fdw_xact(fdw_xact);
+
+	return resolved;
+}
+
+/*
+ * fdw_xact_exists
+ * Existence check: returns what search_fdw_xact() returns for the given
+ * criteria when no list of qualifying entries is requested. See that
+ * function's prologue for how the criteria are interpreted.
+ */
+bool
+fdw_xact_exists(TransactionId xid, Oid dbid, Oid serverid, Oid userid)
+{
+	return search_fdw_xact(xid, dbid, serverid, userid, NULL);
+}
+
+/*
+ * search_fdw_xact
+ * Return true if at least one foreign transaction entry satisfies the given
+ * criteria. Only arguments carrying a valid value for their datatype take
+ * part in the match; invalid ones act as wildcards.
+ *
+ * The table below spells out every combination
+ * xid     | dbid    | serverid | userid  | search for entry with
+ * invalid | invalid | invalid  | invalid | any entry
+ * invalid | invalid | invalid  | valid   | given userid
+ * invalid | invalid | valid    | invalid | given serverid
+ * invalid | invalid | valid    | valid   | given serverid and userid
+ * invalid | valid   | invalid  | invalid | given dbid
+ * invalid | valid   | invalid  | valid   | given dbid and userid
+ * invalid | valid   | valid    | invalid | given dbid and serverid
+ * invalid | valid   | valid    | valid   | given dbid, serverid and userid
+ * valid   | invalid | invalid  | invalid | given xid
+ * valid   | invalid | invalid  | valid   | given xid and userid
+ * valid   | invalid | valid    | invalid | given xid, serverid
+ * valid   | invalid | valid    | valid   | given xid, serverid, userid
+ * valid   | valid   | invalid  | invalid | given xid and dbid
+ * valid   | valid   | invalid  | valid   | given xid, dbid and userid
+ * valid   | valid   | valid    | invalid | given xid, dbid, serverid
+ * valid   | valid   | valid    | valid   | given xid, dbid, serverid, userid
+ *
+ * With void criteria (all arguments invalid) every entry matches, so the
+ * function returns true whenever at least one entry exists.
+ *
+ * If qualifying_xacts is not NULL, the matching entries are locked by this
+ * backend and returned in a list. Entries locked by another backend are left
+ * out, so the list may come back empty even though the result is true. The
+ * list cells are allocated in the memory context current at the time of call.
+ */
+bool
+search_fdw_xact(TransactionId xid, Oid dbid, Oid serverid, Oid userid,
+				List **qualifying_xacts)
+{
+	bool		want_list = (qualifying_xacts != NULL);
+	bool		found = false;
+	LWLockMode	mode;
+	int			i;
+
+	/*
+	 * Collecting entries means locking them, which modifies shared memory
+	 * and hence needs the lock in exclusive mode. A mere existence check can
+	 * do with shared mode.
+	 */
+	if (want_list)
+	{
+		*qualifying_xacts = NIL;
+		mode = LW_EXCLUSIVE;
+	}
+	else
+		mode = LW_SHARED;
+
+	LWLockAcquire(FDWXactLock, mode);
+
+	for (i = 0; i < FDWXactGlobal->num_fdw_xacts; i++)
+	{
+		FDWXact	fdw_xact = FDWXactGlobal->fdw_xacts[i];
+
+		/*
+		 * Skip the entry as soon as it fails one of the criteria that were
+		 * specified; unspecified (invalid) criteria match anything.
+		 */
+		if (xid != InvalidTransactionId && xid != fdw_xact->local_xid)
+			continue;
+		if (OidIsValid(dbid) && dbid != fdw_xact->dboid)
+			continue;
+		if (OidIsValid(serverid) && serverid != fdw_xact->serveroid)
+			continue;
+		if (OidIsValid(userid) && userid != fdw_xact->userid)
+			continue;
+
+		/* All specified criteria are met */
+		found = true;
+
+		/*
+		 * The caller only wants to know whether a matching entry exists, and
+		 * we have found one. No need to look at the remaining entries.
+		 */
+		if (!want_list)
+			break;
+
+		/*
+		 * The caller wants the list of qualifying entries. An entry nobody
+		 * has locked gets locked by us; an entry we hold already is taken as
+		 * is; an entry locked by some other backend is ignored.
+		 */
+		if (fdw_xact->locking_backend == InvalidBackendId)
+		{
+			MemoryContext oldcontext;
+
+			fdw_xact->locking_backend = MyBackendId;
+
+			/*
+			 * The list of locked entries may be needed until the end of the
+			 * transaction, so keep it in TopTransactionContext.
+			 */
+			oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+			MyLockedFDWXacts = lappend(MyLockedFDWXacts, fdw_xact);
+			MemoryContextSwitchTo(oldcontext);
+		}
+		else if (fdw_xact->locking_backend != MyBackendId)
+			continue;
+
+		*qualifying_xacts = lappend(*qualifying_xacts, fdw_xact);
+	}
+
+	LWLockRelease(FDWXactLock);
+
+	return found;
+}
+
+/*
+ * get_dbids_with_unresolved_xact
+ * Returns the oids of the databases that have at least one foreign transaction
+ * entry not locked by any backend, each oid once. Returns NIL if there is no
+ * such entry. The function is used by pg_fdw_xact_resolver extension.
+ */
+List *
+get_dbids_with_unresolved_xact(void)
+{
+	List	*result = NIL;
+	int		i;
+
+	LWLockAcquire(FDWXactLock, LW_SHARED);
+	for (i = 0; i < FDWXactGlobal->num_fdw_xacts; i++)
+	{
+		FDWXact	entry = FDWXactGlobal->fdw_xacts[i];
+
+		/* A locked entry is being worked on by someone, leave it out */
+		if (entry->locking_backend != InvalidBackendId)
+			continue;
+
+		result = list_append_unique_oid(result, entry->dboid);
+	}
+	LWLockRelease(FDWXactLock);
+
+	return result;
+}
+
+/*
+ * fdw_xact_redo
+ * Apply the redo log for a foreign transaction: an INSERT record recreates the
+ * state file from the record contents, a REMOVE record removes the file.
+ */
+extern void
+fdw_xact_redo(XLogReaderState *record)
+{
+	char	  		*rec = XLogRecGetData(record);
+	uint8			info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+	int				rec_len = XLogRecGetDataLen(record);
+
+	if (info == XLOG_FDW_XACT_INSERT)
+	{
+		FDWXactOnDiskData	*fdw_xact_data_file = (FDWXactOnDiskData *)rec;
+		char				path[MAXPGPATH];
+		int					fd;
+		pg_crc32c	fdw_xact_crc;
+
+		/* Recompute CRC */
+		INIT_CRC32C(fdw_xact_crc);
+		COMP_CRC32C(fdw_xact_crc, rec, rec_len);
+		FIN_CRC32C(fdw_xact_crc);
+
+		/*
+		 * Name the file after the xid stored in the record, exactly as
+		 * register_fdw_xact() did. The file may exist already, possibly flushed
+		 * while it was still being crafted, so its contents can not be trusted.
+		 * Hence truncate and rewrite the file.
+		 */
+		FDWXactFilePath(path, fdw_xact_data_file->local_xid,
+							fdw_xact_data_file->serveroid, fdw_xact_data_file->userid);
+		fd = OpenTransientFile(path, O_CREAT | O_WRONLY | O_TRUNC | PG_BINARY,
+								S_IRUSR | S_IWUSR);
+		if (fd < 0)
+			ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not create/open foreign transaction state file \"%s\": %m",
+						path)));
+
+		/* The log record is exactly the contents of the file. */
+		if (write(fd, rec, rec_len) != rec_len)
+		{
+			CloseTransientFile(fd);
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not write foreign transaction state file \"%s\": %m", path)));
+		}
+
+		if (write(fd, &fdw_xact_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
+		{
+			CloseTransientFile(fd);
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not write foreign transaction state file \"%s\": %m", path)));
+		}
+
+		/*
+		 * We must fsync the file because the end-of-replay checkpoint will not do
+		 * so, there being no foreign transaction entry in shared memory yet to
+		 * tell it to.
+		 */
+		if (pg_fsync(fd) != 0)
+		{
+			CloseTransientFile(fd);
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not fsync foreign transaction state file: %m")));
+		}
+
+		if (CloseTransientFile(fd) != 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not close foreign transaction state file \"%s\": %m", path)));
+	}
+	else if (info == XLOG_FDW_XACT_REMOVE)
+	{
+		FdwRemoveXlogRec	*fdw_remove_xlog = (FdwRemoveXlogRec *)rec;
+
+		/* Remove the file from the disk. */
+		RemoveFDWXactFile(fdw_remove_xlog->xid, fdw_remove_xlog->serveroid, fdw_remove_xlog->userid,
+								true);
+	}
+	else
+		elog(ERROR, "invalid log type %d in foreign transaction log record", info);
+}
+
+/*
+ * CheckPointFDWXact
+ * Sync the foreign transaction files created between the two checkpoints.
+ *
+ * Foreign transaction entries, and hence their files, are expected to be very
+ * short-lived. Running this function at the end leaves it with possibly fewer
+ * files to fsync, saving some I/O. This is similar to CheckPointTwoPhase().
+ *
+ * To avoid disk I/O while holding a light weight lock, the function first
+ * collects the identity of the files to sync under FDWXactLock and syncs
+ * them after releasing the lock. This opens a race: between releasing the
+ * lock and syncing a file, the corresponding entry, and hence the file, might
+ * get removed. A missing file is therefore ignored when the entry is gone
+ * as well.
+ */
+void
+CheckPointFDWXact(XLogRecPtr redo_horizon)
+{
+	TransactionId	*xids;
+	Oid				*dbids;
+	Oid				*serveroids;
+	Oid				*userids;
+	int				nfiles;
+	int				i;
+	/* No entry can exist if none is allowed; get away without the lock */
+	if (max_fdw_xacts <= 0)
+		return;
+
+	LWLockAcquire(FDWXactLock, LW_SHARED);
+
+	/* Likewise get away, before allocating memory, if there are no entries */
+	if (FDWXactGlobal->num_fdw_xacts <= 0)
+	{
+		LWLockRelease(FDWXactLock);
+		return;
+	}
+
+	/*
+	 * Note down which files need to be synced. A file living beyond the
+	 * checkpoint boundaries might get synced again, but that case is rare and
+	 * may not involve much I/O.
+	 */
+	xids = (TransactionId *) palloc(FDWXactGlobal->num_fdw_xacts * sizeof(TransactionId));
+	serveroids = (Oid *) palloc(FDWXactGlobal->num_fdw_xacts * sizeof(Oid));
+	userids = (Oid *) palloc(FDWXactGlobal->num_fdw_xacts * sizeof(Oid));
+	dbids = (Oid *) palloc(FDWXactGlobal->num_fdw_xacts * sizeof(Oid));
+	nfiles = 0;
+
+	for (i = 0; i < FDWXactGlobal->num_fdw_xacts; i++)
+	{
+		FDWXact	entry = FDWXactGlobal->fdw_xacts[i];
+		if (!entry->fdw_xact_valid || entry->fdw_xact_lsn > redo_horizon)
+			continue;
+
+		xids[nfiles] = entry->local_xid;
+		serveroids[nfiles] = entry->serveroid;
+		userids[nfiles] = entry->userid;
+		dbids[nfiles] = entry->dboid;
+		nfiles++;
+	}
+
+	LWLockRelease(FDWXactLock);
+
+	for (i = 0; i < nfiles; i++)
+	{
+		char	path[MAXPGPATH];
+		int		fd;
+
+		FDWXactFilePath(path, xids[i], serveroids[i], userids[i]);
+
+		fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
+
+		if (fd < 0)
+		{
+			if (errno == ENOENT)
+			{
+				/* A missing file is fine if the entry is gone as well */
+				if (!fdw_xact_exists(xids[i], dbids[i], serveroids[i],
+										userids[i]))
+					continue;
+
+				/* The check above may have changed errno, put it back */
+				errno = ENOENT;
+			}
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not open foreign transaction state file \"%s\": %m",
+							path)));
+		}
+
+		if (pg_fsync(fd) != 0)
+		{
+			CloseTransientFile(fd);
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not fsync foreign transaction state file \"%s\": %m",
+							path)));
+		}
+
+		if (CloseTransientFile(fd) != 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not close foreign transaction state file \"%s\": %m",
+							path)));
+	}
+
+	pfree(xids);
+	pfree(serveroids);
+	pfree(userids);
+	pfree(dbids);
+}
+
+/* Built in functions */
+/*
+ * pg_fdw_xact
+ *		Produce a view with one row per prepared transaction on foreign server.
+ *
+ * This function is here so we don't have to export the
+ * FDWXactGlobalData struct definition.
+ *
+ */
+
+/*
+ * Structure to hold and iterate over the foreign transactions to be displayed
+ * by the built-in functions.
+ */
+typedef struct
+{
+	FDWXact	fdw_xacts;	/* array of entry copies, as filled by GetFDWXactList() */
+	int		num_xacts;	/* number of elements in fdw_xacts */
+	int		cur_xact;	/* index of the next element to look at */
+} WorkingStatus;
+
+/*
+ * Copies all foreign transaction entries for the user-level function
+ * pg_fdw_xact.
+ *
+ * The entries are copied into a single palloc'd array which is handed back
+ * through *fdw_xacts (NULL if there are no entries); the return value is the
+ * number of elements. Working on copies minimizes the time we need to hold
+ * the FDWXactLock.
+ *
+ * WARNING -- entries whose information is not completely filled yet are
+ * returned too. The caller should filter them out if it doesn't want them.
+ */
+static int
+GetFDWXactList(FDWXact *fdw_xacts)
+{
+	int	nentries;
+	int	i;
+
+	LWLockAcquire(FDWXactLock, LW_SHARED);
+	nentries = FDWXactGlobal->num_fdw_xacts;
+	if (nentries == 0)
+	{
+		LWLockRelease(FDWXactLock);
+		*fdw_xacts = NULL;
+		return 0;
+	}
+
+	/* One contiguous array holding a private copy of every entry */
+	*fdw_xacts = (FDWXact) palloc(sizeof(FDWXactData) * nentries);
+	for (i = 0; i < nentries; i++)
+		memcpy((*fdw_xacts) + i, FDWXactGlobal->fdw_xacts[i],
+			   sizeof(FDWXactData));
+
+	LWLockRelease(FDWXactLock);
+
+	return nentries;
+}
+
+/*
+ * pg_fdw_xact
+ *		Set-returning function producing one row per valid foreign prepared
+ *		transaction entry.
+ *
+ * The first call builds the result tuple descriptor and takes a private copy
+ * of the entries through GetFDWXactList().  Every call then returns the next
+ * copied entry whose fdw_xact_valid flag is set, until none are left.
+ */
+Datum
+pg_fdw_xact(PG_FUNCTION_ARGS)
+{
+	FuncCallContext *funcctx;
+	WorkingStatus	*status;
+
+	if (SRF_IS_FIRSTCALL())
+	{
+		MemoryContext	oldcontext;
+		TupleDesc		tupdesc;
+
+		/*
+		 * Everything set up here has to survive until the last row has been
+		 * returned, so it is allocated in the multi-call memory context.
+		 */
+		funcctx = SRF_FIRSTCALL_INIT();
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		/* Row layout; had better match pg_fdw_xacts view in system_views.sql */
+		tupdesc = CreateTemplateTupleDesc(6, false);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "dbid", OIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "transaction", XIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 3, "serverid", OIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 4, "userid", OIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 5, "status", TEXTOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 6, "identifier", TEXTOID, -1, 0);
+		funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+		/* Gather the entries that the following calls will hand out. */
+		status = (WorkingStatus *) palloc(sizeof(WorkingStatus));
+		funcctx->user_fctx = (void *) status;
+		status->cur_xact = 0;
+		status->num_xacts = GetFDWXactList(&status->fdw_xacts);
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+
+	funcctx = SRF_PERCALL_SETUP();
+	status = funcctx->user_fctx;
+
+	for (;;)
+	{
+		FDWXact		entry;
+		char	   *label;
+		Datum		values[6];
+		bool		nulls[6];
+		HeapTuple	tuple;
+		Datum		row;
+
+		if (status->cur_xact >= status->num_xacts)
+			break;
+
+		/* Consume the entry before deciding whether it is reported. */
+		entry = status->fdw_xacts + status->cur_xact;
+		status->cur_xact++;
+
+		/* Entries that are not marked valid are not shown. */
+		if (!entry->fdw_xact_valid)
+			continue;
+
+		/*
+		 * Translate the status into its textual form.
+		 *
+		 * NOTE(review): pg_fdw_resolve() reports FDW_XACT_PREPARING as
+		 * "preparing" whereas "prepared" is used here -- confirm which of
+		 * the two labels is intended.
+		 */
+		if (entry->fdw_xact_status == FDW_XACT_PREPARING)
+			label = "prepared";
+		else if (entry->fdw_xact_status == FDW_XACT_COMMITTING_PREPARED)
+			label = "committing";
+		else if (entry->fdw_xact_status == FDW_XACT_ABORTING_PREPARED)
+			label = "aborting";
+		else
+			label = "unknown";
+
+		MemSet(nulls, 0, sizeof(nulls));
+		MemSet(values, 0, sizeof(values));
+
+		values[0] = ObjectIdGetDatum(entry->dboid);
+		values[1] = TransactionIdGetDatum(entry->local_xid);
+		values[2] = ObjectIdGetDatum(entry->serveroid);
+		values[3] = ObjectIdGetDatum(entry->userid);
+		values[4] = CStringGetTextDatum(label);
+		/* should this be really interpreted by FDW */
+		values[5] = PointerGetDatum(cstring_to_text_with_len(entry->fdw_xact_id,
+															entry->fdw_xact_id_len));
+
+		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
+		row = HeapTupleGetDatum(tuple);
+		SRF_RETURN_NEXT(funcctx, row);
+	}
+
+	SRF_RETURN_DONE(funcctx);
+}
+
+/*
+ * pg_fdw_resolve
+ *		User interface to initiate foreign transaction resolution.
+ *
+ * The function tries to resolve the prepared transactions on foreign servers
+ * in the database from where it is run.  It returns one row for every foreign
+ * transaction it encountered, whether resolved or not.
+ *
+ * All the resolution work is done during the first call; that call and the
+ * following ones then hand out the collected rows one at a time.  The row of
+ * an entry is copied before its fate is decided, so an entry that could not
+ * be resolved is reported with the status it had when it was picked up, while
+ * a successfully resolved entry is reported as "resolved".
+ */
+Datum
+pg_fdw_resolve(PG_FUNCTION_ARGS)
+{
+	MemoryContext	oldcontext;
+	FuncCallContext *funcctx;
+	WorkingStatus	*status;
+	char			*xact_status;
+	List			*entries_to_resolve = NIL;
+
+	if (SRF_IS_FIRSTCALL())
+	{
+		TupleDesc	tupdesc;
+
+		/* We will be modifying the shared memory. Prepare to clean up on exit */
+		if (!fdwXactExitRegistered)
+		{
+			before_shmem_exit(AtProcExit_FDWXact, 0);
+			fdwXactExitRegistered = true;
+		}
+
+		/* Allocate space for and prepare the returning set */
+		/* create a function context for cross-call persistence */
+		funcctx = SRF_FIRSTCALL_INIT();
+		/* Switch to memory context appropriate for multiple function calls */
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		/* build tupdesc for result tuples */
+		tupdesc = CreateTemplateTupleDesc(6, false);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "dbid",
+						   OIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "transaction",
+						   XIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 3, "serverid",
+						   OIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 4, "userid",
+						   OIDOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 5, "status",
+						   TEXTOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 6, "identifier",
+						   TEXTOID, -1, 0);
+
+		funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+		/*
+		 * Set up the bookkeeping for the result set. The array of rows is
+		 * allocated further down, once we know how many entries there are.
+		 */
+		status = (WorkingStatus *) palloc(sizeof(WorkingStatus));
+		funcctx->user_fctx = (void *) status;
+		status->fdw_xacts = NULL;
+		status->num_xacts = 0;
+		status->cur_xact = 0;
+
+		/* Done preparation for the result. */
+		MemoryContextSwitchTo(oldcontext);
+
+		/*
+		 * Get entries whose foreign servers are part of the database where
+		 * this function was called. We can get information about only such
+		 * foreign servers. The function will lock the entries. The entries
+		 * which are locked by other backends and whose foreign servers belong
+		 * to this database are left out, since we can not work on those.
+		 */
+		search_fdw_xact(InvalidTransactionId, MyDatabaseId, InvalidOid, InvalidOid,
+						&entries_to_resolve);
+
+		/*
+		 * Size the result array from the list we actually got. Sizing it from
+		 * FDWXactGlobal->num_fdw_xacts before the search, without holding any
+		 * lock, would let entries registered in the meantime overflow the
+		 * array. The array has to live as long as the result set does.
+		 */
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+		status->fdw_xacts = (FDWXact) palloc(sizeof(FDWXactData) *
+											 list_length(entries_to_resolve));
+		MemoryContextSwitchTo(oldcontext);
+
+		/* Work to resolve the resolvable entries */
+		while (entries_to_resolve)
+		{
+			FDWXact	fdw_xact = linitial(entries_to_resolve);
+
+			/* Remove the entry as we will not use it again */
+			entries_to_resolve = list_delete_first(entries_to_resolve);
+
+			/* Copy the data for the sake of result. */
+			memcpy(status->fdw_xacts + status->num_xacts++,
+						fdw_xact, sizeof(FDWXactData));
+
+			if (fdw_xact->fdw_xact_status == FDW_XACT_COMMITTING_PREPARED ||
+					fdw_xact->fdw_xact_status == FDW_XACT_ABORTING_PREPARED)
+			{
+				/*
+				 * We have already decided what to do with the foreign transaction
+				 * nothing to be done.
+				 */
+			}
+			else if (TransactionIdDidCommit(fdw_xact->local_xid))
+				fdw_xact->fdw_xact_status = FDW_XACT_COMMITTING_PREPARED;
+			else if (TransactionIdDidAbort(fdw_xact->local_xid))
+				fdw_xact->fdw_xact_status = FDW_XACT_ABORTING_PREPARED;
+			else if (!TransactionIdIsInProgress(fdw_xact->local_xid))
+			{
+				/*
+				 * The transaction is neither committed nor aborted, yet it is
+				 * not running on any of the backends. So probably, it crashed
+				 * before actual abort or commit. So assume it to be aborted.
+				 */
+				fdw_xact->fdw_xact_status = FDW_XACT_ABORTING_PREPARED;
+			}
+			else
+			{
+				/*
+				 * Local transaction is in progress, should not resolve the foreign
+				 * transaction. This can happen when the foreign transaction is
+				 * prepared as part of a local prepared transaction. Just
+				 * continue with the next one.
+				 */
+				unlock_fdw_xact(fdw_xact);
+				continue;
+			}
+
+			/*
+			 * Resolve the foreign transaction. If resolution was not successful,
+			 * unlock the entry so that someone else can pick it up
+			 */
+			if (!resolve_fdw_xact(fdw_xact, get_prepared_foreign_xact_resolver(fdw_xact)))
+				unlock_fdw_xact(fdw_xact);
+			else
+				/* Update the status in the result set */
+				status->fdw_xacts[status->num_xacts - 1].fdw_xact_status = FDW_XACT_RESOLVED;
+		}
+	}
+
+	/* Print the result set */
+	funcctx = SRF_PERCALL_SETUP();
+	status = funcctx->user_fctx;
+
+	while (status->cur_xact < status->num_xacts)
+	{
+		FDWXact		fdw_xact = &status->fdw_xacts[status->cur_xact++];
+		Datum		values[6];
+		bool		nulls[6];
+		HeapTuple	tuple;
+		Datum		result;
+
+		/* Entries that are not marked valid are not reported. */
+		if (!fdw_xact->fdw_xact_valid)
+			continue;
+
+		/*
+		 * Form tuple with appropriate data.
+		 */
+		MemSet(values, 0, sizeof(values));
+		MemSet(nulls, 0, sizeof(nulls));
+
+		values[0] = ObjectIdGetDatum(fdw_xact->dboid);
+		values[1] = TransactionIdGetDatum(fdw_xact->local_xid);
+		values[2] = ObjectIdGetDatum(fdw_xact->serveroid);
+		values[3] = ObjectIdGetDatum(fdw_xact->userid);
+		switch (fdw_xact->fdw_xact_status)
+		{
+			case FDW_XACT_PREPARING:
+				xact_status = "preparing";
+				break;
+			case FDW_XACT_COMMITTING_PREPARED:
+				xact_status = "committing";
+				break;
+			case FDW_XACT_ABORTING_PREPARED:
+				xact_status = "aborting";
+				break;
+			case FDW_XACT_RESOLVED:
+				xact_status = "resolved";
+				break;
+			default:
+				xact_status = "unknown";
+				break;
+		}
+		values[4] = CStringGetTextDatum(xact_status);
+		/* should this be really interpreted by FDW? */
+		values[5] = PointerGetDatum(cstring_to_text_with_len(fdw_xact->fdw_xact_id,
+															fdw_xact->fdw_xact_id_len));
+
+		tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
+		result = HeapTupleGetDatum(tuple);
+		SRF_RETURN_NEXT(funcctx, result);
+	}
+
+	SRF_RETURN_DONE(funcctx);
+}
+
+/*
+ * pg_fdw_remove
+ *		Built-in function to forget prepared foreign transaction entry/s
+ *		without resolving them.
+ *
+ * This gives a way out when resolution is no longer possible, for example
+ * 1. The foreign server where the transaction is prepared is no longer
+ *    available
+ * 2. The user which prepared the transaction needs to be dropped
+ * 3. PITR is recovering to a point before the transaction id which created
+ *    the prepared foreign transaction
+ * 4. The database containing the entries needs to be dropped
+ *
+ * The 4 arguments -- transaction id, dbid, serverid and userid -- define the
+ * criteria in the same way as search_fdw_xact(); a NULL argument is passed on
+ * as the corresponding invalid value.  The entries matching the criteria are
+ * removed.  The function does not remove an entry which is locked by some
+ * other backend.
+ */
+Datum
+pg_fdw_remove(PG_FUNCTION_ARGS)
+{
+/* Some #defines only for this function to deal with the arguments */
+#define XID_ARGNUM	0
+#define DBID_ARGNUM 1
+#define SRVID_ARGNUM 2
+#define USRID_ARGNUM 3
+
+	TransactionId	xid = InvalidTransactionId;
+	Oid				dbid = InvalidOid;
+	Oid				serverid = InvalidOid;
+	Oid				userid = InvalidOid;
+	List			*entries_to_remove;
+
+	/* Pick up only the arguments which were actually supplied. */
+	if (!PG_ARGISNULL(XID_ARGNUM))
+		xid = PG_GETARG_TRANSACTIONID(XID_ARGNUM);
+	if (!PG_ARGISNULL(DBID_ARGNUM))
+		dbid = PG_GETARG_OID(DBID_ARGNUM);
+	if (!PG_ARGISNULL(SRVID_ARGNUM))
+		serverid = PG_GETARG_OID(SRVID_ARGNUM);
+	if (!PG_ARGISNULL(USRID_ARGNUM))
+		userid = PG_GETARG_OID(USRID_ARGNUM);
+
+	search_fdw_xact(xid, dbid, serverid, userid, &entries_to_remove);
+
+	/* Take the entries off the list one by one, removing each of them. */
+	while (entries_to_remove != NIL)
+	{
+		FDWXact	victim = linitial(entries_to_remove);
+
+		entries_to_remove = list_delete_first(entries_to_remove);
+		remove_fdw_xact(victim);
+	}
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Code dealing with the on disk files used to store foreign transaction
+ * information.
+ */
+
+/*
+ * ReadFDWXactFile
+ *		Read the foreign transaction state file for the given local
+ *		transaction, foreign server and user.
+ *
+ * Returns the contents of the file in a palloc'd structure, which the caller
+ * can pfree later.  Returns NULL if the file can not be stat'ed or read, has
+ * an implausible size, fails the CRC check or does not carry the xid, server
+ * and user it was looked up by; all of these except the CRC failure emit a
+ * WARNING first.  Failure to open the file is reported as an ERROR.
+ */
+static FDWXactOnDiskData *
+ReadFDWXactFile(TransactionId xid, Oid serveroid, Oid userid)
+{
+	char				path[MAXPGPATH];
+	int					fd;
+	FDWXactOnDiskData	*fdw_xact_file_data;
+	struct stat			stat;
+	uint32				crc_offset;
+	pg_crc32c			calc_crc;
+	pg_crc32c			file_crc;
+	char				*buf;
+	int					save_errno;
+
+	FDWXactFilePath(path, xid, serveroid, userid);
+
+	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
+	if (fd < 0)
+		ereport(ERROR,
+			(errcode_for_file_access(),
+			 errmsg("could not open FDW transaction state file \"%s\": %m",
+					path)));
+
+	/*
+	 * Check file length.  We can determine a lower bound pretty easily. We
+	 * set an upper bound to avoid palloc() failure on a corrupt file, though
+	 * we can't guarantee that we won't get an out of memory error anyway,
+	 * even on a valid file.
+	 */
+	if (fstat(fd, &stat))
+	{
+		/* Closing the file must not clobber the errno reported through %m */
+		save_errno = errno;
+		CloseTransientFile(fd);
+		errno = save_errno;
+
+		ereport(WARNING,
+					(errcode_for_file_access(),
+					 errmsg("could not stat FDW transaction state file \"%s\": %m",
+							path)));
+		return NULL;
+	}
+
+	if (stat.st_size < offsetof(FDWXactOnDiskData, fdw_xact_id) ||
+		stat.st_size > MaxAllocSize)
+	{
+		CloseTransientFile(fd);
+		/*
+		 * No system call has failed here and the file may be too small as
+		 * well as too large, so neither %m nor "too large" would be right.
+		 */
+		ereport(WARNING,
+					(errmsg("FDW transaction state file \"%s\" has invalid size",
+							path)));
+		return NULL;
+	}
+
+	buf = (char *) palloc(stat.st_size);
+	fdw_xact_file_data = (FDWXactOnDiskData *) buf;
+	/* The CRC occupies the last bytes of the file */
+	crc_offset = stat.st_size - sizeof(pg_crc32c);
+
+	/* Slurp the file */
+	if (read(fd, buf, stat.st_size) != stat.st_size)
+	{
+		save_errno = errno;
+		CloseTransientFile(fd);
+		errno = save_errno;
+
+		ereport(WARNING,
+					(errcode_for_file_access(),
+					 errmsg("could not read FDW transaction state file \"%s\": %m",
+							path)));
+		pfree(buf);
+		return NULL;
+	}
+
+	/* We are done with the file; nothing below may touch fd any more */
+	CloseTransientFile(fd);
+
+	/*
+	 * Check the CRC.
+	 */
+	INIT_CRC32C(calc_crc);
+	COMP_CRC32C(calc_crc, buf, crc_offset);
+	FIN_CRC32C(calc_crc);
+
+	/* Fetch the stored CRC without assuming that it is suitably aligned */
+	memcpy(&file_crc, buf + crc_offset, sizeof(pg_crc32c));
+
+	if (!EQ_CRC32C(calc_crc, file_crc))
+	{
+		pfree(buf);
+		return NULL;
+	}
+
+	/* The contents must describe the transaction the file is named after */
+	if (fdw_xact_file_data->serveroid != serveroid ||
+			fdw_xact_file_data->userid != userid ||
+			fdw_xact_file_data->local_xid != xid)
+	{
+		ereport(WARNING,
+				  (errmsg("removing corrupt foreign transaction state file \"%s\"",
+							  path)));
+		pfree(buf);
+		return NULL;
+	}
+
+	return fdw_xact_file_data;
+}
+
+/*
+ * PrescanFDWXacts
+ * Read the foreign prepared transactions directory for oldest active
+ * transaction. The transactions corresponding to the xids in this directory
+ * are not necessarily active per se locally. But we still need those XIDs to
+ * be alive so that
+ * 1. we can determine whether they are committed or aborted
+ * 2. the file name contains xid which shouldn't get used again to avoid
+ *    conflicting file names.
+ *
+ * The function accepts the oldest active xid determined by other functions
+ * (e.g. PrescanPreparedTransactions()). It then compares every xid it comes
+ * across while scanning foreign prepared transactions directory with the oldest
+ * active xid. It returns the oldest of those xids or oldest active xid
+ * whichever is older.
+ *
+ * If any foreign prepared transaction is part of a future transaction (PITR),
+ * the function removes the corresponding file as
+ * 1. We can not know the status of the local transaction which prepared this
+ * foreign transaction
+ * 2. The foreign server or the user may not be available as per new timeline
+ *
+ * Anyway, the local transaction which prepared the foreign prepared transaction
+ * does not exist as per the new timeline, so it's better to forget the foreign
+ * prepared transaction as well.
+ *
+ * Directory entries whose names do not parse as <xid>_<serveroid>_<userid> are
+ * left alone.
+ */
+TransactionId
+PrescanFDWXacts(TransactionId oldestActiveXid)
+{
+	TransactionId	nextXid = ShmemVariableCache->nextXid;
+	DIR		  		*cldir;
+	struct dirent	*clde;
+
+	cldir = AllocateDir(FDW_XACTS_DIR);
+	while ((clde = ReadDir(cldir, FDW_XACTS_DIR)) != NULL)
+	{
+		if (strlen(clde->d_name) == FDW_XACT_FILE_NAME_LEN &&
+			strspn(clde->d_name, "0123456789ABCDEF_") == FDW_XACT_FILE_NAME_LEN)
+		{
+			Oid					serveroid;
+			Oid					userid;
+			TransactionId		local_xid;
+
+			/*
+			 * The character check above does not guarantee that the
+			 * underscores are where we expect them. Skip names which do not
+			 * yield all three values rather than acting on uninitialized
+			 * ones.
+			 */
+			if (sscanf(clde->d_name, "%08x_%08x_%08x", &local_xid, &serveroid,
+					   &userid) != 3)
+				continue;
+
+			/*
+			 * Remove a foreign prepared transaction file corresponding
+			 * to an XID, which is too new.
+			 */
+			if (TransactionIdFollowsOrEquals(local_xid, nextXid))
+			{
+				ereport(WARNING,
+					  (errmsg("removing future foreign prepared transaction file \"%s\"",
+							  clde->d_name)));
+				RemoveFDWXactFile(local_xid, serveroid, userid, true);
+				continue;
+			}
+
+			if (TransactionIdPrecedesOrEquals(local_xid, oldestActiveXid))
+				oldestActiveXid = local_xid;
+		}
+	}
+
+	FreeDir(cldir);
+	return oldestActiveXid;
+}
+/*
+ * ReadFDWXacts
+ * Read the foreign prepared transaction information from the state files and
+ * set it up in the table of foreign transactions for further usage.
+ *
+ * A state file which ReadFDWXactFile() can not make sense of is removed after
+ * a warning. Directory entries whose names do not parse as
+ * <xid>_<serveroid>_<userid> are left alone.
+ */
+void
+ReadFDWXacts(void)
+{
+	DIR		  		*cldir;
+	struct dirent	*clde;
+
+	cldir = AllocateDir(FDW_XACTS_DIR);
+	while ((clde = ReadDir(cldir, FDW_XACTS_DIR)) != NULL)
+	{
+		if (strlen(clde->d_name) == FDW_XACT_FILE_NAME_LEN &&
+			strspn(clde->d_name, "0123456789ABCDEF_") == FDW_XACT_FILE_NAME_LEN)
+		{
+			Oid					serveroid;
+			Oid					userid;
+			TransactionId		local_xid;
+			FDWXactOnDiskData	*fdw_xact_file_data;
+			FDWXact				fdw_xact;
+
+			/*
+			 * The character check above does not guarantee that the
+			 * underscores are where we expect them. Skip names which do not
+			 * yield all three values rather than acting on uninitialized
+			 * ones.
+			 */
+			if (sscanf(clde->d_name, "%08x_%08x_%08x", &local_xid, &serveroid,
+					   &userid) != 3)
+				continue;
+
+			fdw_xact_file_data = ReadFDWXactFile(local_xid, serveroid, userid);
+
+			if (!fdw_xact_file_data)
+			{
+				ereport(WARNING,
+						(errmsg("Removing corrupt foreign transaction file \"%s\"",
+								 clde->d_name)));
+				RemoveFDWXactFile(local_xid, serveroid, userid, false);
+				continue;
+			}
+
+			ereport(LOG,
+					(errmsg("recovering foreign transaction entry for xid %u, foreign server %u and user %u",
+								local_xid, serveroid, userid)));
+
+			/*
+			 * Add this entry into the table of foreign transactions. The status
+			 * of the transaction is set as preparing, since we do not know the
+			 * exact status right now. Resolver will set it later based on the
+			 * status of local transaction which prepared this foreign
+			 * transaction.
+			 */
+			fdw_xact = insert_fdw_xact(fdw_xact_file_data->dboid, local_xid,
+										serveroid, userid,
+										fdw_xact_file_data->fdw_xact_id_len,
+										fdw_xact_file_data->fdw_xact_id,
+										FDW_XACT_PREPARING);
+			/*
+			 * The LSN is not known here, so it is set to 0.
+			 * NOTE(review): confirm that 0 is an acceptable placeholder for
+			 * the users of fdw_xact_lsn.
+			 */
+			fdw_xact->fdw_xact_lsn = 0;
+			/* Mark the entry as ready */
+			fdw_xact->fdw_xact_valid = true;
+			/* Unlock the entry as we don't need it any further */
+			unlock_fdw_xact(fdw_xact);
+			pfree(fdw_xact_file_data);
+		}
+	}
+
+	FreeDir(cldir);
+}
+
+/*
+ * RemoveFDWXactFile
+ *		Unlink the foreign transaction state file of the given entry.
+ *
+ * A failure to unlink the file is reported as a WARNING. The one exception is
+ * a file that is not present while giveWarning is false: that is an expected
+ * case during WAL replay and is passed over silently.
+ */
+void
+RemoveFDWXactFile(TransactionId xid, Oid serveroid, Oid userid, bool giveWarning)
+{
+	char		path[MAXPGPATH];
+
+	FDWXactFilePath(path, xid, serveroid, userid);
+
+	/* Nothing to say if the file is gone now. */
+	if (unlink(path) == 0)
+		return;
+
+	/* A missing file is tolerated unless the caller wants to hear of it. */
+	if (errno == ENOENT && !giveWarning)
+		return;
+
+	ereport(WARNING,
+			(errcode_for_file_access(),
+			 errmsg("could not remove foreign transaction state file \"%s\": %m",
+					path)));
+}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 7c4d773..cdbc583 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -7,20 +7,21 @@
  */
 #include "postgres.h"
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
 #include "access/gin.h"
 #include "access/gist_private.h"
 #include "access/hash.h"
 #include "access/heapam_xlog.h"
 #include "access/brin_xlog.h"
+#include "access/fdw_xact.h"
 #include "access/multixact.h"
 #include "access/nbtree.h"
 #include "access/spgist.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/storage_xlog.h"
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
 #include "replication/origin.h"
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 8c47e0f..e7c1199 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -35,20 +35,21 @@
  */
 #include "postgres.h"
 
 #include <fcntl.h>
 #include <sys/stat.h>
 #include <sys/types.h>
 #include <time.h>
 #include <unistd.h>
 
 #include "access/commit_ts.h"
+#include "access/fdw_xact.h"
 #include "access/htup_details.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/twophase_rmgr.h"
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "access/xloginsert.h"
 #include "access/xlogutils.h"
 #include "catalog/pg_type.h"
@@ -1471,20 +1472,26 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 		RelationCacheInitFilePostInvalidate();
 
 	/* And now do the callbacks */
 	if (isCommit)
 		ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
 	else
 		ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
 
 	PredicateLockTwoPhaseFinish(xid, isCommit);
 
+	/*
+	 * Commit/Rollback the foreign transactions prepared as part of this
+	 * prepared transaction.
+	 */
+	FDWXactTwoPhaseFinish(isCommit, xid);
+
 	/* Count the prepared xact as committed or aborted */
 	AtEOXact_PgStat(isCommit);
 
 	/*
 	 * And now we can clean up our mess.
 	 */
 	RemoveTwoPhaseFile(xid, true);
 
 	RemoveGXact(gxact);
 	MyLockedGxact = NULL;
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 47312f6..47a2a9b 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -14,20 +14,21 @@
  *
  *-------------------------------------------------------------------------
  */
 
 #include "postgres.h"
 
 #include <time.h>
 #include <unistd.h>
 
 #include "access/commit_ts.h"
+#include "access/fdw_xact.h"
 #include "access/multixact.h"
 #include "access/parallel.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "access/xloginsert.h"
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
@@ -179,20 +180,24 @@ typedef struct TransactionStateData
 	TransactionId *childXids;	/* subcommitted child XIDs, in XID order */
 	int			nChildXids;		/* # of subcommitted child XIDs */
 	int			maxChildXids;	/* allocated size of childXids[] */
 	Oid			prevUser;		/* previous CurrentUserId setting */
 	int			prevSecContext; /* previous SecurityRestrictionContext */
 	bool		prevXactReadOnly;		/* entry-time xact r/o state */
 	bool		startedInRecovery;		/* did we start in recovery? */
 	bool		didLogXid;		/* has xid been included in WAL record? */
 	int			parallelModeLevel;		/* Enter/ExitParallelMode counter */
 	struct TransactionStateData *parent;		/* back link to parent */
+	int			num_foreign_servers;	/* number of foreign servers participating in the transaction,
+										   Only valid for top level transaction */
+	int			can_prepare;			/* can all the foreign server involved in
+										   this transaction participate in 2PC */
 } TransactionStateData;
 
 typedef TransactionStateData *TransactionState;
 
 /*
  * CurrentTransactionState always points to the current transaction state
  * block.  It will point to TopTransactionStateData when not in a
  * transaction at all, or when in a top-level transaction.
  */
 static TransactionStateData TopTransactionStateData = {
@@ -1896,20 +1901,23 @@ StartTransaction(void)
 	/* SecurityRestrictionContext should never be set outside a transaction */
 	Assert(s->prevSecContext == 0);
 
 	/*
 	 * initialize other subsystems for new transaction
 	 */
 	AtStart_GUC();
 	AtStart_Cache();
 	AfterTriggerBeginXact();
 
+	/* Foreign transaction stuff */
+	s->num_foreign_servers = 0;
+
 	/*
 	 * done with start processing, set current transaction state to "in
 	 * progress"
 	 */
 	s->state = TRANS_INPROGRESS;
 
 	ShowTransactionState("StartTransaction");
 }
 
 
@@ -1956,20 +1964,23 @@ CommitTransaction(void)
 
 		/*
 		 * Close open portals (converting holdable ones into static portals).
 		 * If there weren't any, we are done ... otherwise loop back to check
 		 * if they queued deferred triggers.  Lather, rinse, repeat.
 		 */
 		if (!PreCommit_Portals(false))
 			break;
 	}
 
+	/* Pre-commit step for foreign transactions */
+	PreCommit_FDWXacts();
+
 	CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT
 					  : XACT_EVENT_PRE_COMMIT);
 
 	/*
 	 * The remaining actions cannot call any user-defined code, so it's safe
 	 * to start shutting down within-transaction services.  But note that most
 	 * of this stuff could still throw an error, which would switch us into
 	 * the transaction-abort path.
 	 */
 
@@ -2113,20 +2124,21 @@ CommitTransaction(void)
 	AtEOXact_GUC(true, 1);
 	AtEOXact_SPI(true);
 	AtEOXact_on_commit_actions(true);
 	AtEOXact_Namespace(true, is_parallel_worker);
 	AtEOXact_SMgr();
 	AtEOXact_Files();
 	AtEOXact_ComboCid();
 	AtEOXact_HashTables(true);
 	AtEOXact_PgStat(true);
 	AtEOXact_Snapshot(true);
+	AtEOXact_FDWXacts(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
 	ResourceOwnerDelete(TopTransactionResourceOwner);
 	s->curTransactionOwner = NULL;
 	CurTransactionResourceOwner = NULL;
 	TopTransactionResourceOwner = NULL;
 
 	AtCommit_Memory();
 
@@ -2297,20 +2309,21 @@ PrepareTransaction(void)
 	 * before or after releasing the transaction's locks.
 	 */
 	StartPrepare(gxact);
 
 	AtPrepare_Notify();
 	AtPrepare_Locks();
 	AtPrepare_PredicateLocks();
 	AtPrepare_PgStat();
 	AtPrepare_MultiXact();
 	AtPrepare_RelationMap();
+	AtPrepare_FDWXacts();
 
 	/*
 	 * Here is where we really truly prepare.
 	 *
 	 * We have to record transaction prepares even if we didn't make any
 	 * updates, because the transaction manager might get confused if we lose
 	 * a global transaction.
 	 */
 	EndPrepare(gxact);
 
@@ -2579,20 +2592,21 @@ AbortTransaction(void)
 
 		AtEOXact_GUC(false, 1);
 		AtEOXact_SPI(false);
 		AtEOXact_on_commit_actions(false);
 		AtEOXact_Namespace(false, is_parallel_worker);
 		AtEOXact_SMgr();
 		AtEOXact_Files();
 		AtEOXact_ComboCid();
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false);
+		AtEOXact_FDWXacts(false);
 		pgstat_report_xact_timestamp(0);
 	}
 
 	/*
 	 * State remains TRANS_ABORT until CleanupTransaction().
 	 */
 	RESUME_INTERRUPTS();
 }
 
 /*
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 08d1682..d9cd8cd 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -16,20 +16,21 @@
 
 #include <ctype.h>
 #include <time.h>
 #include <fcntl.h>
 #include <sys/stat.h>
 #include <sys/time.h>
 #include <unistd.h>
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
+#include "access/fdw_xact.h"
 #include "access/multixact.h"
 #include "access/rewriteheap.h"
 #include "access/subtrans.h"
 #include "access/timeline.h"
 #include "access/transam.h"
 #include "access/tuptoaster.h"
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "access/xloginsert.h"
@@ -4872,20 +4873,21 @@ BootStrapXLOG(void)
 
 	/* Set important parameter values for use when replaying WAL */
 	ControlFile->MaxConnections = MaxConnections;
 	ControlFile->max_worker_processes = max_worker_processes;
 	ControlFile->max_prepared_xacts = max_prepared_xacts;
 	ControlFile->max_locks_per_xact = max_locks_per_xact;
 	ControlFile->wal_level = wal_level;
 	ControlFile->wal_log_hints = wal_log_hints;
 	ControlFile->track_commit_timestamp = track_commit_timestamp;
 	ControlFile->data_checksum_version = bootstrap_data_checksum_version;
+	ControlFile->max_fdw_xacts = max_fdw_xacts;
 
 	/* some additional ControlFile fields are set in WriteControlFile() */
 
 	WriteControlFile();
 
 	/* Bootstrap the commit log, too */
 	BootStrapCLOG();
 	BootStrapCommitTs();
 	BootStrapSUBTRANS();
 	BootStrapMultiXact();
@@ -5865,20 +5867,23 @@ CheckRequiredParameterValues(void)
 									 ControlFile->MaxConnections);
 		RecoveryRequiresIntParameter("max_worker_processes",
 									 max_worker_processes,
 									 ControlFile->max_worker_processes);
 		RecoveryRequiresIntParameter("max_prepared_transactions",
 									 max_prepared_xacts,
 									 ControlFile->max_prepared_xacts);
 		RecoveryRequiresIntParameter("max_locks_per_transaction",
 									 max_locks_per_xact,
 									 ControlFile->max_locks_per_xact);
+		RecoveryRequiresIntParameter("max_prepared_foreign_transactions",
+									 max_fdw_xacts,
+									 ControlFile->max_fdw_xacts);
 	}
 }
 
 /*
  * This must be called ONCE during postmaster or standalone-backend startup
  */
 void
 StartupXLOG(void)
 {
 	XLogCtlInsert *Insert;
@@ -6546,21 +6551,24 @@ StartupXLOG(void)
 		{
 			TransactionId *xids;
 			int			nxids;
 
 			ereport(DEBUG1,
 					(errmsg("initializing for hot standby")));
 
 			InitRecoveryTransactionEnvironment();
 
 			if (wasShutdown)
+			{
 				oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids);
+				oldestActiveXID = PrescanFDWXacts(oldestActiveXID);
+			}
 			else
 				oldestActiveXID = checkPoint.oldestActiveXid;
 			Assert(TransactionIdIsValid(oldestActiveXID));
 
 			/* Tell procarray about the range of xids it has to deal with */
 			ProcArrayInitRecovery(ShmemVariableCache->nextXid);
 
 			/*
 			 * Startup commit log, commit timestamp and subtrans only.
 			 * MultiXact has already been started up and other SLRUs are not
@@ -7146,20 +7154,21 @@ StartupXLOG(void)
 
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
 
 	XLogCtl->LogwrtResult = LogwrtResult;
 
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
 
 	/* Pre-scan prepared transactions to find out the range of XIDs present */
 	oldestActiveXID = PrescanPreparedTransactions(NULL, NULL);
+	oldestActiveXID = PrescanFDWXacts(oldestActiveXID);
 
 	/*
 	 * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
 	 * record before resource manager writes cleanup WAL records or checkpoint
 	 * record is written.
 	 */
 	Insert->fullPageWrites = lastFullPageWrites;
 	LocalSetXLogInsertAllowed();
 	UpdateFullPageWrites();
 	LocalXLogInsertAllowed = -1;
@@ -7343,20 +7352,26 @@ StartupXLOG(void)
 	/*
 	 * Perform end of recovery actions for any SLRUs that need it.
 	 */
 	TrimCLOG();
 	TrimMultiXact();
 
 	/* Reload shared-memory state for prepared transactions */
 	RecoverPreparedTransactions();
 
 	/*
+	 * WAL replay must have created the files for prepared foreign transactions.
+	 * Reload the shared-memory foreign transaction state.
+	 */
+	ReadFDWXacts();
+
+	/*
 	 * Shutdown the recovery environment. This must occur after
 	 * RecoverPreparedTransactions(), see notes for lock_twophase_recover()
 	 */
 	if (standbyState != STANDBY_DISABLED)
 		ShutdownRecoveryTransactionEnvironment();
 
 	/* Shut down xlogreader */
 	if (readFile >= 0)
 	{
 		close(readFile);
@@ -8606,20 +8621,25 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 	CheckPointMultiXact();
 	CheckPointPredicate();
 	CheckPointRelationMap();
 	CheckPointReplicationSlots();
 	CheckPointSnapBuild();
 	CheckPointLogicalRewriteHeap();
 	CheckPointBuffers(flags);	/* performs all required fsyncs */
 	CheckPointReplicationOrigin();
 	/* We deliberately delay 2PC checkpointing as long as possible */
 	CheckPointTwoPhase(checkPointRedo);
+	/*
+	 * We deliberately delay foreign transaction checkpointing as long as
+	 * possible.
+	 */
+	CheckPointFDWXact(checkPointRedo);
 }
 
 /*
  * Save a checkpoint for recovery restart if appropriate
  *
  * This function is called each time a checkpoint record is read from XLOG.
  * It must determine whether the checkpoint represents a safe restartpoint or
  * not.  If so, the checkpoint record is stashed in shared memory so that
  * CreateRestartPoint can consult it.  (Note that the latter function is
  * executed by the checkpointer, while this one will be executed by the
@@ -9016,56 +9036,59 @@ XLogRestorePoint(const char *rpName)
  */
 static void
 XLogReportParameters(void)
 {
 	if (wal_level != ControlFile->wal_level ||
 		wal_log_hints != ControlFile->wal_log_hints ||
 		MaxConnections != ControlFile->MaxConnections ||
 		max_worker_processes != ControlFile->max_worker_processes ||
 		max_prepared_xacts != ControlFile->max_prepared_xacts ||
 		max_locks_per_xact != ControlFile->max_locks_per_xact ||
-		track_commit_timestamp != ControlFile->track_commit_timestamp)
+		track_commit_timestamp != ControlFile->track_commit_timestamp ||
+		max_fdw_xacts != ControlFile->max_fdw_xacts)
 	{
 		/*
 		 * The change in number of backend slots doesn't need to be WAL-logged
 		 * if archiving is not enabled, as you can't start archive recovery
 		 * with wal_level=minimal anyway. We don't really care about the
 		 * values in pg_control either if wal_level=minimal, but seems better
 		 * to keep them up-to-date to avoid confusion.
 		 */
 		if (wal_level != ControlFile->wal_level || XLogIsNeeded())
 		{
 			xl_parameter_change xlrec;
 			XLogRecPtr	recptr;
 
 			xlrec.MaxConnections = MaxConnections;
 			xlrec.max_worker_processes = max_worker_processes;
 			xlrec.max_prepared_xacts = max_prepared_xacts;
 			xlrec.max_locks_per_xact = max_locks_per_xact;
 			xlrec.wal_level = wal_level;
 			xlrec.wal_log_hints = wal_log_hints;
 			xlrec.track_commit_timestamp = track_commit_timestamp;
+			xlrec.max_fdw_xacts = max_fdw_xacts;
 
 			XLogBeginInsert();
 			XLogRegisterData((char *) &xlrec, sizeof(xlrec));
 
 			recptr = XLogInsert(RM_XLOG_ID, XLOG_PARAMETER_CHANGE);
 			XLogFlush(recptr);
 		}
 
 		ControlFile->MaxConnections = MaxConnections;
 		ControlFile->max_worker_processes = max_worker_processes;
 		ControlFile->max_prepared_xacts = max_prepared_xacts;
 		ControlFile->max_locks_per_xact = max_locks_per_xact;
 		ControlFile->wal_level = wal_level;
 		ControlFile->wal_log_hints = wal_log_hints;
 		ControlFile->track_commit_timestamp = track_commit_timestamp;
+		ControlFile->max_fdw_xacts = max_fdw_xacts;
 		UpdateControlFile();
 	}
 }
 
 /*
  * Update full_page_writes in shared memory, and write an
  * XLOG_FPW_CHANGE record if necessary.
  *
  * Note: this function assumes there is no other process running
  * concurrently that could update it.
@@ -9240,20 +9263,21 @@ xlog_redo(XLogReaderState *record)
 		 */
 		if (standbyState >= STANDBY_INITIALIZED)
 		{
 			TransactionId *xids;
 			int			nxids;
 			TransactionId oldestActiveXID;
 			TransactionId latestCompletedXid;
 			RunningTransactionsData running;
 
 			oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids);
+			oldestActiveXID = PrescanFDWXacts(oldestActiveXID);
 
 			/*
 			 * Construct a RunningTransactions snapshot representing a shut
 			 * down server, with only prepared transactions still alive. We're
 			 * never overflowed at this point because all subxids are listed
 			 * with their parent prepared transactions.
 			 */
 			running.xcnt = nxids;
 			running.subxcnt = 0;
 			running.subxid_overflow = false;
@@ -9432,20 +9456,21 @@ xlog_redo(XLogReaderState *record)
 		/* Update our copy of the parameters in pg_control */
 		memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
 
 		LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
 		ControlFile->MaxConnections = xlrec.MaxConnections;
 		ControlFile->max_worker_processes = xlrec.max_worker_processes;
 		ControlFile->max_prepared_xacts = xlrec.max_prepared_xacts;
 		ControlFile->max_locks_per_xact = xlrec.max_locks_per_xact;
 		ControlFile->wal_level = xlrec.wal_level;
 		ControlFile->wal_log_hints = xlrec.wal_log_hints;
+		ControlFile->max_fdw_xacts = xlrec.max_fdw_xacts;
 
 		/*
 		 * Update minRecoveryPoint to ensure that if recovery is aborted, we
 		 * recover back up to this point before allowing hot standby again.
 		 * This is particularly important if wal_level was set to 'archive'
 		 * before, and is now 'hot_standby', to ensure you don't run queries
 		 * against the WAL preceding the wal_level change. Same applies to
 		 * decreasing max_* settings.
 		 */
 		minRecoveryPoint = ControlFile->minRecoveryPoint;
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index 95d6c14..3100f50 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -11,20 +11,21 @@
  *	  src/backend/bootstrap/bootstrap.c
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
 #include <unistd.h>
 #include <signal.h>
 
 #include "access/htup_details.h"
+#include "access/fdw_xact.h"
 #include "bootstrap/bootstrap.h"
 #include "catalog/index.h"
 #include "catalog/pg_collation.h"
 #include "catalog/pg_type.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "pg_getopt.h"
 #include "postmaster/bgwriter.h"
 #include "postmaster/startup.h"
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index ccc030f..4691e66 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -236,20 +236,29 @@ CREATE VIEW pg_available_extension_versions AS
            LEFT JOIN pg_extension AS X
              ON E.name = X.extname AND E.version = X.extversion;
 
 CREATE VIEW pg_prepared_xacts AS
     SELECT P.transaction, P.gid, P.prepared,
            U.rolname AS owner, D.datname AS database
     FROM pg_prepared_xact() AS P
          LEFT JOIN pg_authid U ON P.ownerid = U.oid
          LEFT JOIN pg_database D ON P.dbid = D.oid;
 
+CREATE VIEW pg_fdw_xacts AS
+    SELECT P.transaction, D.datname AS database, S.srvname AS "foreign server",
+           U.rolname AS "local user", P.status,
+           P.identifier AS "foreign transaction identifier"
+    FROM pg_fdw_xact() AS P
+         LEFT JOIN pg_authid U ON P.userid = U.oid
+         LEFT JOIN pg_database D ON P.dbid = D.oid
+         LEFT JOIN pg_foreign_server S ON P.serverid = S.oid;
+
 CREATE VIEW pg_prepared_statements AS
     SELECT * FROM pg_prepared_statement() AS P;
 
 CREATE VIEW pg_seclabels AS
 SELECT
 	l.objoid, l.classoid, l.objsubid,
 	CASE WHEN rel.relkind = 'r' THEN 'table'::text
 		 WHEN rel.relkind = 'v' THEN 'view'::text
 		 WHEN rel.relkind = 'm' THEN 'materialized view'::text
 		 WHEN rel.relkind = 'S' THEN 'sequence'::text
@@ -933,10 +942,18 @@ LANGUAGE INTERNAL
 STRICT IMMUTABLE
 AS 'make_interval';
 
 CREATE OR REPLACE FUNCTION
   jsonb_set(jsonb_in jsonb, path text[] , replacement jsonb,
             create_if_missing boolean DEFAULT true)
 RETURNS jsonb
 LANGUAGE INTERNAL
 STRICT IMMUTABLE
 AS 'jsonb_set';
+
+CREATE OR REPLACE FUNCTION
+  pg_fdw_remove(transaction xid DEFAULT NULL, dbid oid DEFAULT NULL,
+                serverid oid DEFAULT NULL, userid oid DEFAULT NULL)
+RETURNS void
+LANGUAGE INTERNAL
+VOLATILE
+AS 'pg_fdw_remove';
diff --git a/src/backend/commands/foreigncmds.c b/src/backend/commands/foreigncmds.c
index cc912b2..3408252 100644
--- a/src/backend/commands/foreigncmds.c
+++ b/src/backend/commands/foreigncmds.c
@@ -6,20 +6,21 @@
  * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
  *
  *
  * IDENTIFICATION
  *	  src/backend/commands/foreigncmds.c
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
+#include "access/fdw_xact.h"
 #include "access/heapam.h"
 #include "access/htup_details.h"
 #include "access/reloptions.h"
 #include "access/xact.h"
 #include "catalog/dependency.h"
 #include "catalog/indexing.h"
 #include "catalog/objectaccess.h"
 #include "catalog/pg_foreign_data_wrapper.h"
 #include "catalog/pg_foreign_server.h"
 #include "catalog/pg_foreign_table.h"
@@ -1080,20 +1081,34 @@ RemoveForeignServerById(Oid srvId)
 	HeapTuple	tp;
 	Relation	rel;
 
 	rel = heap_open(ForeignServerRelationId, RowExclusiveLock);
 
 	tp = SearchSysCache1(FOREIGNSERVEROID, ObjectIdGetDatum(srvId));
 
 	if (!HeapTupleIsValid(tp))
 		elog(ERROR, "cache lookup failed for foreign server %u", srvId);
 
+	/*
+	 * Refuse to drop a foreign server that still has a foreign transaction
+	 * prepared on it.  If the server were dropped, we would not have any
+	 * chance to resolve that transaction.
+	 */
+	if (fdw_xact_exists(InvalidTransactionId, MyDatabaseId, srvId, InvalidOid))
+	{
+		Form_pg_foreign_server srvForm = (Form_pg_foreign_server) GETSTRUCT(tp);
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_IN_USE),
+				 errmsg("server \"%s\" has unresolved prepared transactions on it",
+						NameStr(srvForm->srvname))));
+	}
+
 	simple_heap_delete(rel, &tp->t_self);
 
 	ReleaseSysCache(tp);
 
 	heap_close(rel, RowExclusiveLock);
 }
 
 
 /*
  * Common routine to check permission for user-mapping-related DDL
@@ -1252,20 +1267,21 @@ AlterUserMapping(AlterUserMappingStmt *stmt)
 
 	umId = GetSysCacheOid2(USERMAPPINGUSERSERVER,
 						   ObjectIdGetDatum(useId),
 						   ObjectIdGetDatum(srv->serverid));
 	if (!OidIsValid(umId))
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
 				 errmsg("user mapping \"%s\" does not exist for the server",
 						MappingUserName(useId))));
 
+
 	user_mapping_ddl_aclcheck(useId, srv->serverid, stmt->servername);
 
 	tp = SearchSysCacheCopy1(USERMAPPINGOID, ObjectIdGetDatum(umId));
 
 	if (!HeapTupleIsValid(tp))
 		elog(ERROR, "cache lookup failed for user mapping %u", umId);
 
 	memset(repl_val, 0, sizeof(repl_val));
 	memset(repl_null, false, sizeof(repl_null));
 	memset(repl_repl, false, sizeof(repl_repl));
@@ -1378,20 +1394,31 @@ RemoveUserMapping(DropUserMappingStmt *stmt)
 		/* IF EXISTS specified, just note it */
 		ereport(NOTICE,
 		(errmsg("user mapping \"%s\" does not exist for the server, skipping",
 				MappingUserName(useId))));
 		return InvalidOid;
 	}
 
 	user_mapping_ddl_aclcheck(useId, srv->serverid, srv->servername);
 
 	/*
+	 * Dropping a user mapping that a prepared foreign transaction still
+	 * relies on might leave that transaction dangling, so refuse to drop the
+	 * mapping while such a transaction exists.
+	 */
+	if (fdw_xact_exists(InvalidTransactionId, MyDatabaseId, srv->serverid, useId))
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_IN_USE),
+				 errmsg("server \"%s\" has unresolved prepared transaction for user \"%s\"",
+						srv->servername, MappingUserName(useId))));
+
+	/*
 	 * Do the deletion
 	 */
 	object.classId = UserMappingRelationId;
 	object.objectId = umId;
 	object.objectSubId = 0;
 
 	performDeletion(&object, DROP_CASCADE, 0);
 
 	return umId;
 }
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 90c2f4a..f82a537 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -86,20 +86,21 @@
 #ifdef USE_BONJOUR
 #include <dns_sd.h>
 #endif
 
 #ifdef HAVE_PTHREAD_IS_THREADED_NP
 #include <pthread.h>
 #endif
 
 #include "access/transam.h"
 #include "access/xlog.h"
+#include "access/fdw_xact.h"
 #include "bootstrap/bootstrap.h"
 #include "catalog/pg_control.h"
 #include "lib/ilist.h"
 #include "libpq/auth.h"
 #include "libpq/ip.h"
 #include "libpq/libpq.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pg_getopt.h"
 #include "pgstat.h"
@@ -2541,21 +2542,20 @@ pmdie(SIGNAL_ARGS)
 							   BACKEND_TYPE_AUTOVAC | BACKEND_TYPE_BGWORKER);
 				/* and the autovac launcher too */
 				if (AutoVacPID != 0)
 					signal_child(AutoVacPID, SIGTERM);
 				/* and the bgwriter too */
 				if (BgWriterPID != 0)
 					signal_child(BgWriterPID, SIGTERM);
 				/* and the walwriter too */
 				if (WalWriterPID != 0)
 					signal_child(WalWriterPID, SIGTERM);
-
 				/*
 				 * If we're in recovery, we can't kill the startup process
 				 * right away, because at present doing so does not release
 				 * its locks.  We might want to change this in a future
 				 * release.  For the time being, the PM_WAIT_READONLY state
 				 * indicates that we're waiting for the regular (read only)
 				 * backends to die off; once they do, we'll kill the startup
 				 * and walreceiver processes.
 				 */
 				pmState = (pmState == PM_RUN) ?
@@ -5705,20 +5705,21 @@ PostmasterMarkPIDForWorkerNotify(int pid)
 
 	dlist_foreach(iter, &BackendList)
 	{
 		bp = dlist_container(Backend, elem, iter.cur);
 		if (bp->pid == pid)
 		{
 			bp->bgworker_notify = true;
 			return true;
 		}
 	}
+
 	return false;
 }
 
 #ifdef EXEC_BACKEND
 
 /*
  * The following need to be available to the save/restore_backend_variables
  * functions.  They are marked NON_EXEC_STATIC in their home modules.
  */
 extern slock_t *ShmemLock;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c629da3..6fdd818 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -127,20 +127,21 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 		case RM_MULTIXACT_ID:
 		case RM_RELMAP_ID:
 		case RM_BTREE_ID:
 		case RM_HASH_ID:
 		case RM_GIN_ID:
 		case RM_GIST_ID:
 		case RM_SEQ_ID:
 		case RM_SPGIST_ID:
 		case RM_BRIN_ID:
 		case RM_COMMIT_TS_ID:
+		case RM_FDW_XACT_ID:
 		case RM_REPLORIGIN_ID:
 			break;
 		case RM_NEXT_ID:
 			elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
 	}
 }
 
 /*
  * Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer().
  */
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 32ac58f..a790e5b 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -14,20 +14,21 @@
  */
 #include "postgres.h"
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
 #include "access/heapam.h"
 #include "access/multixact.h"
 #include "access/nbtree.h"
 #include "access/subtrans.h"
 #include "access/twophase.h"
+#include "access/fdw_xact.h"
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/bgwriter.h"
 #include "postmaster/postmaster.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
@@ -132,20 +133,21 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 		size = add_size(size, ProcSignalShmemSize());
 		size = add_size(size, CheckpointerShmemSize());
 		size = add_size(size, AutoVacuumShmemSize());
 		size = add_size(size, ReplicationSlotsShmemSize());
 		size = add_size(size, ReplicationOriginShmemSize());
 		size = add_size(size, WalSndShmemSize());
 		size = add_size(size, WalRcvShmemSize());
 		size = add_size(size, BTreeShmemSize());
 		size = add_size(size, SyncScanShmemSize());
 		size = add_size(size, AsyncShmemSize());
+		size = add_size(size, FDWXactShmemSize());
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
 
 		/* freeze the addin request size and include it */
 		addin_request_allowed = false;
 		size = add_size(size, total_addin_request);
 
 		/* might as well round it off to a multiple of a typical page size */
 		size = add_size(size, 8192 - (size % 8192));
@@ -243,20 +245,21 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 	ReplicationOriginShmemInit();
 	WalSndShmemInit();
 	WalRcvShmemInit();
 
 	/*
 	 * Set up other modules that need some shared memory space
 	 */
 	BTreeShmemInit();
 	SyncScanShmemInit();
 	AsyncShmemInit();
+	FDWXactShmemInit();
 
 #ifdef EXEC_BACKEND
 
 	/*
 	 * Alloc the win32 shared backend array
 	 */
 	if (!IsUnderPostmaster)
 		ShmemBackendArrayAllocation();
 #endif
 
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index c557cb6..d0f1472 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -39,10 +39,11 @@ OldSerXidLock						31
 SyncRepLock							32
 BackgroundWorkerLock				33
 DynamicSharedMemoryControlLock		34
 AutoFileLock						35
 ReplicationSlotAllocationLock		36
 ReplicationSlotControlLock			37
 CommitTsControlLock					38
 CommitTsLock						39
 ReplicationOriginLock				40
 MultiXactTruncationLock				41
+FDWXactLock							42
diff --git a/src/backend/utils/adt/xid.c b/src/backend/utils/adt/xid.c
index 6b61765..d6cba87 100644
--- a/src/backend/utils/adt/xid.c
+++ b/src/backend/utils/adt/xid.c
@@ -15,21 +15,20 @@
 #include "postgres.h"
 
 #include <limits.h>
 
 #include "access/multixact.h"
 #include "access/transam.h"
 #include "access/xact.h"
 #include "libpq/pqformat.h"
 #include "utils/builtins.h"
 
-#define PG_GETARG_TRANSACTIONID(n)	DatumGetTransactionId(PG_GETARG_DATUM(n))
 #define PG_RETURN_TRANSACTIONID(x)	return TransactionIdGetDatum(x)
 
 #define PG_GETARG_COMMANDID(n)		DatumGetCommandId(PG_GETARG_DATUM(n))
 #define PG_RETURN_COMMANDID(x)		return CommandIdGetDatum(x)
 
 
 Datum
 xidin(PG_FUNCTION_ARGS)
 {
 	char	   *str = PG_GETARG_CSTRING(0);
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index fda0fb9..1fe94bb 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -20,20 +20,21 @@
 #include <float.h>
 #include <math.h>
 #include <limits.h>
 #include <unistd.h>
 #include <sys/stat.h>
 #ifdef HAVE_SYSLOG
 #include <syslog.h>
 #endif
 
 #include "access/commit_ts.h"
+#include "access/fdw_xact.h"
 #include "access/gin.h"
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "catalog/namespace.h"
 #include "commands/async.h"
 #include "commands/prepare.h"
 #include "commands/vacuum.h"
 #include "commands/variable.h"
 #include "commands/trigger.h"
@@ -1999,20 +2000,33 @@ static struct config_int ConfigureNamesInt[] =
 	{
 		{"max_prepared_transactions", PGC_POSTMASTER, RESOURCES_MEM,
 			gettext_noop("Sets the maximum number of simultaneously prepared transactions."),
 			NULL
 		},
 		&max_prepared_xacts,
 		0, 0, MAX_BACKENDS,
 		NULL, NULL, NULL
 	},
 
+	/*
+	 * See also CheckRequiredParameterValues() if this parameter changes
+	 */
+	{
+		{"max_prepared_foreign_transactions", PGC_POSTMASTER, RESOURCES_MEM,
+			gettext_noop("Sets the maximum number of simultaneously prepared transactions on foreign servers."),
+			NULL
+		},
+		&max_fdw_xacts,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 #ifdef LOCK_DEBUG
 	{
 		{"trace_lock_oidmin", PGC_SUSET, DEVELOPER_OPTIONS,
 			gettext_noop("Sets the minimum OID of tables for tracking locks."),
 			gettext_noop("Is used to avoid output on system tables."),
 			GUC_NOT_IN_SAMPLE
 		},
 		&Trace_lock_oidmin,
 		FirstNormalObjectId, 0, INT_MAX,
 		NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index dcf929f..f7df0e2 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -116,20 +116,26 @@
 					# (change requires restart)
 #huge_pages = try			# on, off, or try
 					# (change requires restart)
 #temp_buffers = 8MB			# min 800kB
 #max_prepared_transactions = 0		# zero disables the feature
 					# (change requires restart)
 # Note:  Increasing max_prepared_transactions costs ~600 bytes of shared memory
 # per transaction slot, plus lock space (see max_locks_per_transaction).
 # It is not advisable to set max_prepared_transactions nonzero unless you
 # actively intend to use prepared transactions.
+#max_prepared_foreign_transactions = 0		# zero disables the feature
+					# (change requires restart)
+# Note:  Increasing max_prepared_foreign_transactions costs ~600(?) bytes of shared memory
+# per foreign transaction slot.
+# It is not advisable to set max_prepared_foreign_transactions nonzero unless you
+# actively intend to use the atomic foreign transactions feature.
 #work_mem = 4MB				# min 64kB
 #maintenance_work_mem = 64MB		# min 1MB
 #autovacuum_work_mem = -1		# min 1MB, or -1 to use maintenance_work_mem
 #max_stack_depth = 2MB			# min 100kB
 #dynamic_shared_memory_type = posix	# the default is the first option
 					# supported by the operating system:
 					#   posix
 					#   sysv
 					#   windows
 					#   mmap
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index feeff9e..47ecf1e 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -192,31 +192,32 @@ static const char *subdirs[] = {
 	"pg_xlog",
 	"pg_xlog/archive_status",
 	"pg_clog",
 	"pg_commit_ts",
 	"pg_dynshmem",
 	"pg_notify",
 	"pg_serial",
 	"pg_snapshots",
 	"pg_subtrans",
 	"pg_twophase",
+	"pg_fdw_xact",
 	"pg_multixact/members",
 	"pg_multixact/offsets",
 	"base",
 	"base/1",
 	"pg_replslot",
 	"pg_tblspc",
 	"pg_stat",
 	"pg_stat_tmp",
 	"pg_logical",
 	"pg_logical/snapshots",
-	"pg_logical/mappings"
+	"pg_logical/mappings",
 };
 
 
 /* path to 'initdb' binary directory */
 static char bin_path[MAXPGPATH];
 static char backend_exec[MAXPGPATH];
 
 static char **replace_token(char **lines,
 			  const char *token, const char *replacement);
 
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index 32e1d81..8e4cf86 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -327,12 +327,14 @@ main(int argc, char *argv[])
 	printf(_("Size of a large-object chunk:         %u\n"),
 		   ControlFile.loblksize);
 	printf(_("Date/time type storage:               %s\n"),
 		   (ControlFile.enableIntTimes ? _("64-bit integers") : _("floating-point numbers")));
 	printf(_("Float4 argument passing:              %s\n"),
 		   (ControlFile.float4ByVal ? _("by value") : _("by reference")));
 	printf(_("Float8 argument passing:              %s\n"),
 		   (ControlFile.float8ByVal ? _("by value") : _("by reference")));
 	printf(_("Data page checksum version:           %u\n"),
 		   ControlFile.data_checksum_version);
+	printf(_("Current max_fdw_xacts setting:        %d\n"),
+		   ControlFile.max_fdw_xacts);
 	return 0;
 }
diff --git a/src/bin/pg_resetxlog/pg_resetxlog.c b/src/bin/pg_resetxlog/pg_resetxlog.c
index d7ac2ba..1e3b4a2 100644
--- a/src/bin/pg_resetxlog/pg_resetxlog.c
+++ b/src/bin/pg_resetxlog/pg_resetxlog.c
@@ -579,20 +579,21 @@ GuessControlValues(void)
 	ControlFile.unloggedLSN = 1;
 
 	/* minRecoveryPoint, backupStartPoint and backupEndPoint can be left zero */
 
 	ControlFile.wal_level = WAL_LEVEL_MINIMAL;
 	ControlFile.wal_log_hints = false;
 	ControlFile.track_commit_timestamp = false;
 	ControlFile.MaxConnections = 100;
 	ControlFile.max_worker_processes = 8;
 	ControlFile.max_prepared_xacts = 0;
+	ControlFile.max_fdw_xacts = 0;
 	ControlFile.max_locks_per_xact = 64;
 
 	ControlFile.maxAlign = MAXIMUM_ALIGNOF;
 	ControlFile.floatFormat = FLOATFORMAT_VALUE;
 	ControlFile.blcksz = BLCKSZ;
 	ControlFile.relseg_size = RELSEG_SIZE;
 	ControlFile.xlog_blcksz = XLOG_BLCKSZ;
 	ControlFile.xlog_seg_size = XLOG_SEG_SIZE;
 	ControlFile.nameDataLen = NAMEDATALEN;
 	ControlFile.indexMaxKeys = INDEX_MAX_KEYS;
@@ -795,20 +796,21 @@ RewriteControlFile(void)
 	 * Force the defaults for max_* settings. The values don't really matter
 	 * as long as wal_level='minimal'; the postmaster will reset these fields
 	 * anyway at startup.
 	 */
 	ControlFile.wal_level = WAL_LEVEL_MINIMAL;
 	ControlFile.wal_log_hints = false;
 	ControlFile.track_commit_timestamp = false;
 	ControlFile.MaxConnections = 100;
 	ControlFile.max_worker_processes = 8;
 	ControlFile.max_prepared_xacts = 0;
+	ControlFile.max_fdw_xacts = 0;
 	ControlFile.max_locks_per_xact = 64;
 
 	/* Now we can force the recorded xlog seg size to the right thing. */
 	ControlFile.xlog_seg_size = XLogSegSize;
 
 	/* Contents are protected with a CRC */
 	INIT_CRC32C(ControlFile.crc);
 	COMP_CRC32C(ControlFile.crc,
 				(char *) &ControlFile,
 				offsetof(ControlFileData, crc));
diff --git a/src/bin/pg_xlogdump/rmgrdesc.c b/src/bin/pg_xlogdump/rmgrdesc.c
index 5b88a8d..82c6b51 100644
--- a/src/bin/pg_xlogdump/rmgrdesc.c
+++ b/src/bin/pg_xlogdump/rmgrdesc.c
@@ -14,20 +14,21 @@
 #include "access/gin.h"
 #include "access/gist_private.h"
 #include "access/hash.h"
 #include "access/heapam_xlog.h"
 #include "access/multixact.h"
 #include "access/nbtree.h"
 #include "access/rmgr.h"
 #include "access/spgist.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
+#include "access/fdw_xact.h"
 #include "catalog/storage_xlog.h"
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
 #include "replication/origin.h"
 #include "rmgrdesc.h"
 #include "storage/standby.h"
 #include "utils/relmapper.h"
 
 #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup) \
diff --git a/src/include/access/fdw_xact.h b/src/include/access/fdw_xact.h
new file mode 100644
index 0000000..664de7e
--- /dev/null
+++ b/src/include/access/fdw_xact.h
@@ -0,0 +1,73 @@
+/*
+ * fdw_xact.h 
+ *
+ * PostgreSQL distributed transaction manager
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/fdw_xact.h
+ */
+#ifndef FDW_XACT_H 
+#define FDW_XACT_H 
+
+#include "storage/backendid.h"
+#include "access/xlogreader.h"
+#include "lib/stringinfo.h"
+#include "nodes/pg_list.h"
+
+/*
+ * On-disk file structure for a prepared foreign transaction.
+ */
+typedef struct
+{
+	Oid				dboid;		/* database in which the foreign server and
+								 * user mapping are to be looked up
+								 */
+	TransactionId	local_xid;	/* NOTE(review): presumably the local transaction's xid -- confirm */
+	Oid				serveroid;			/* foreign server where transaction takes place */
+	Oid				userid;				/* user who initiated the foreign transaction */
+	uint32			fdw_xact_id_len;	/* length of the value stored in fdw_xact_id */
+	/* Flexible array member: this must always remain the last member */
+	char			fdw_xact_id[FLEXIBLE_ARRAY_MEMBER];	/* variable length array
+														 * to store foreign transaction
+														 * information.
+														 */
+} FDWXactOnDiskData;
+
+typedef struct			/* NOTE(review): looks like the XLOG_FDW_XACT_REMOVE record payload -- confirm */
+{
+	TransactionId	xid;		/* presumably corresponds to FDWXactOnDiskData.local_xid -- confirm */
+	Oid				serveroid;	/* presumably the foreign server of the entry -- confirm */
+	Oid				userid;		/* presumably the user of the entry -- confirm */
+	Oid				dbid;		/* presumably the database of the entry -- confirm */
+} FdwRemoveXlogRec;
+
+extern int	max_fdw_xacts;
+
+/* Info types for logs related to FDW transactions */
+#define XLOG_FDW_XACT_INSERT	0x00
+#define XLOG_FDW_XACT_REMOVE	0x10
+
+extern Size FDWXactShmemSize(void);
+extern void FDWXactShmemInit(void);
+extern void ReadFDWXacts(void);
+extern TransactionId PrescanFDWXacts(TransactionId oldestActiveXid);
+extern bool fdw_xact_has_usermapping(Oid serverid, Oid userid);
+extern bool fdw_xact_has_server(Oid serverid);
+extern void fdw_xact_redo(XLogReaderState *record);
+extern void fdw_xact_desc(StringInfo buf, XLogReaderState *record);
+extern const char *fdw_xact_identify(uint8 info);
+extern void AtEOXact_FDWXacts(bool is_commit);
+extern void AtPrepare_FDWXacts(void);
+extern void FDWXactTwoPhaseFinish(bool isCommit, TransactionId xid);
+extern bool fdw_xact_exists(TransactionId xid, Oid dboid, Oid serverid,
+								Oid userid);
+extern void CheckPointFDWXact(XLogRecPtr redo_horizon);
+extern void RegisterXactForeignServer(Oid serveroid, Oid userid, bool can_prepare);
+extern bool FdwTwoPhaseNeeded(void);
+extern void PreCommit_FDWXacts(void);
+/* For the sake of foreign transaction resolver */
+extern List	*get_dbids_with_unresolved_xact(void);
+
+#endif /* FDW_XACT_H */
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index c083216..7272c33 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -37,11 +37,12 @@ PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify,
 PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL)
 PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL)
 PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, NULL, NULL)
 PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL)
 PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup)
 PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup)
 PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL)
 PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup)
 PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
 PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
+PG_RMGR(RM_FDW_XACT_ID, "Foreign Transactions", fdw_xact_redo, fdw_xact_desc, fdw_xact_identify, NULL, NULL)
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index cb1c2db..d614ab6 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -296,20 +296,21 @@ typedef struct xl_xact_parsed_abort
 	RelFileNode *xnodes;
 
 	TransactionId twophase_xid; /* only for 2PC */
 } xl_xact_parsed_abort;
 
 
 /* ----------------
  *		extern definitions
  * ----------------
  */
+#define PG_GETARG_TRANSACTIONID(n)	DatumGetTransactionId(PG_GETARG_DATUM(n))
 extern bool IsTransactionState(void);
 extern bool IsAbortedTransactionBlockState(void);
 extern TransactionId GetTopTransactionId(void);
 extern TransactionId GetTopTransactionIdIfAny(void);
 extern TransactionId GetCurrentTransactionId(void);
 extern TransactionId GetCurrentTransactionIdIfAny(void);
 extern TransactionId GetStableLatestTransactionId(void);
 extern SubTransactionId GetCurrentSubTransactionId(void);
 extern void MarkCurrentTransactionIdLoggedIfAny(void);
 extern bool SubTransactionIsActive(SubTransactionId subxid);
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 86b532d..9ce64bd 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -206,20 +206,21 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
 
 /*
  * Information logged when we detect a change in one of the parameters
  * important for Hot Standby.
  */
 typedef struct xl_parameter_change
 {
 	int			MaxConnections;
 	int			max_worker_processes;
 	int			max_prepared_xacts;
+	int			max_fdw_xacts;
 	int			max_locks_per_xact;
 	int			wal_level;
 	bool		wal_log_hints;
 	bool		track_commit_timestamp;
 } xl_parameter_change;
 
 /* logs restore point */
 typedef struct xl_restore_point
 {
 	TimestampTz rp_time;
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index ad1eb4b..d168c32 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -173,20 +173,21 @@ typedef struct ControlFileData
 
 	/*
 	 * Parameter settings that determine if the WAL can be used for archival
 	 * or hot standby.
 	 */
 	int			wal_level;
 	bool		wal_log_hints;
 	int			MaxConnections;
 	int			max_worker_processes;
 	int			max_prepared_xacts;
+	int			max_fdw_xacts;
 	int			max_locks_per_xact;
 	bool		track_commit_timestamp;
 
 	/*
 	 * This data is used to check for hardware-architecture compatibility of
 	 * the database and the backend executable.  We need not check endianness
 	 * explicitly, since the pg_control version will surely look wrong to a
 	 * machine of different endianness, but we do need to worry about MAXALIGN
 	 * and floating-point format.  (Note: storage layout nominally also
 	 * depends on SHORTALIGN and INTALIGN, but in practice these are the same
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index f688454..00a119a 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5268,20 +5268,26 @@ DESCR("fractional rank of hypothetical row");
 DATA(insert OID = 3989 ( percent_rank_final PGNSP PGUID 12 1 0 2276 0 f f f f f f i s 2 0 701 "2281 2276" "{2281,2276}" "{i,v}" _null_ _null_ _null_ hypothetical_percent_rank_final _null_ _null_ _null_ ));
 DESCR("aggregate final function");
 DATA(insert OID = 3990 ( cume_dist			PGNSP PGUID 12 1 0 2276 0 t f f f f f i s 1 0 701 "2276" "{2276}" "{v}" _null_ _null_ _null_ aggregate_dummy _null_ _null_ _null_ ));
 DESCR("cumulative distribution of hypothetical row");
 DATA(insert OID = 3991 ( cume_dist_final	PGNSP PGUID 12 1 0 2276 0 f f f f f f i s 2 0 701 "2281 2276" "{2281,2276}" "{i,v}" _null_ _null_ _null_ hypothetical_cume_dist_final _null_ _null_ _null_ ));
 DESCR("aggregate final function");
 DATA(insert OID = 3992 ( dense_rank			PGNSP PGUID 12 1 0 2276 0 t f f f f f i s 1 0 20 "2276" "{2276}" "{v}" _null_ _null_ _null_	aggregate_dummy _null_ _null_ _null_ ));
 DESCR("rank of hypothetical row without gaps");
 DATA(insert OID = 3993 ( dense_rank_final	PGNSP PGUID 12 1 0 2276 0 f f f f f f i s 2 0 20 "2281 2276" "{2281,2276}" "{i,v}" _null_ _null_ _null_	hypothetical_dense_rank_final _null_ _null_ _null_ ));
 DESCR("aggregate final function");
+DATA(insert OID = 4066 ( pg_fdw_xact	PGNSP PGUID 12 1 1000 0 0 f f f f t t v u 0 0 2249 "" "{26, 28,26,26,25,25}" "{o,o,o,o,o,o}" "{dbid, transaction,serverid,userid,status,identifier}" _null_ _null_ pg_fdw_xact _null_ _null_ _null_ ));
+DESCR("view foreign transactions");
+DATA(insert OID = 4083 ( pg_fdw_resolve	PGNSP PGUID 12 1 1000 0 0 f f f f t t v u 0 0 2249 "" "{26, 28,26,26,25,25}" "{o,o,o,o,o,o}" "{dbid, transaction,serverid,userid,status,identifier}" _null_ _null_ pg_fdw_resolve _null_ _null_ _null_ ));
+DESCR("resolve foreign transactions");
+DATA(insert OID = 4099 ( pg_fdw_remove PGNSP PGUID 12 1 0 0 0 f f f f f f v u 4 0 2278 "28 26 26 26" _null_ _null_ "{transaction,dbid,serverid,userid}" _null_ _null_ pg_fdw_remove _null_ _null_ _null_ ));
+DESCR("remove foreign transactions");
 
 /* pg_upgrade support */
 DATA(insert OID = 3582 ( binary_upgrade_set_next_pg_type_oid PGNSP PGUID  12 1 0 0 0 f f f f t f v s 1 0 2278 "26" _null_ _null_ _null_ _null_ _null_ binary_upgrade_set_next_pg_type_oid _null_ _null_ _null_ ));
 DESCR("for use by pg_upgrade");
 DATA(insert OID = 3584 ( binary_upgrade_set_next_array_pg_type_oid PGNSP PGUID	12 1 0 0 0 f f f f t f v s 1 0 2278 "26" _null_ _null_ _null_ _null_ _null_ binary_upgrade_set_next_array_pg_type_oid _null_ _null_ _null_ ));
 DESCR("for use by pg_upgrade");
 DATA(insert OID = 3585 ( binary_upgrade_set_next_toast_pg_type_oid PGNSP PGUID	12 1 0 0 0 f f f f t f v s 1 0 2278 "26" _null_ _null_ _null_ _null_ _null_ binary_upgrade_set_next_toast_pg_type_oid _null_ _null_ _null_ ));
 DESCR("for use by pg_upgrade");
 DATA(insert OID = 3586 ( binary_upgrade_set_next_heap_pg_class_oid PGNSP PGUID	12 1 0 0 0 f f f f t f v s 1 0 2278 "26" _null_ _null_ _null_ _null_ _null_ binary_upgrade_set_next_heap_pg_class_oid _null_ _null_ _null_ ));
 DESCR("for use by pg_upgrade");
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 69b48b4..d1ddb4e 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -5,20 +5,21 @@
  *
  * Copyright (c) 2010-2015, PostgreSQL Global Development Group
  *
  * src/include/foreign/fdwapi.h
  *
  *-------------------------------------------------------------------------
  */
 #ifndef FDWAPI_H
 #define FDWAPI_H
 
+#include "access/xact.h"
 #include "nodes/execnodes.h"
 #include "nodes/relation.h"
 
 /* To avoid including explain.h here, reference ExplainState thus: */
 struct ExplainState;
 
 
 /*
  * Callback function signatures --- see fdwhandler.sgml for more info.
  */
@@ -110,20 +111,32 @@ typedef int (*AcquireSampleRowsFunc) (Relation relation, int elevel,
 											   HeapTuple *rows, int targrows,
 												  double *totalrows,
 												  double *totaldeadrows);
 
 typedef bool (*AnalyzeForeignTable_function) (Relation relation,
 												 AcquireSampleRowsFunc *func,
 													BlockNumber *totalpages);
 
 typedef List *(*ImportForeignSchema_function) (ImportForeignSchemaStmt *stmt,
 														   Oid serverOid);
+typedef bool (*EndForeignTransaction_function) (Oid serverOid, Oid userid,
+													bool is_commit);
+typedef bool (*PrepareForeignTransaction_function) (Oid serverOid, Oid userid,
+														int prep_info_len,
+														char *prep_info);
+typedef bool (*ResolvePreparedForeignTransaction_function) (Oid serverOid, Oid userid,
+															bool is_commit,
+														int prep_info_len,
+														char *prep_info);
+typedef char *(*GetPrepareId_function) (Oid serverOid, Oid userid,
+														int *prep_info_len);
+
 
 /*
  * FdwRoutine is the struct returned by a foreign-data wrapper's handler
  * function.  It provides pointers to the callback functions needed by the
  * planner and executor.
  *
  * More function pointers are likely to be added in the future.  Therefore
  * it's recommended that the handler initialize the struct with
  * makeNode(FdwRoutine) so that all fields are set to NULL.  This will
  * ensure that no fields are accidentally left undefined.
@@ -165,20 +178,26 @@ typedef struct FdwRoutine
 
 	/* Support functions for EXPLAIN */
 	ExplainForeignScan_function ExplainForeignScan;
 	ExplainForeignModify_function ExplainForeignModify;
 
 	/* Support functions for ANALYZE */
 	AnalyzeForeignTable_function AnalyzeForeignTable;
 
 	/* Support functions for IMPORT FOREIGN SCHEMA */
 	ImportForeignSchema_function ImportForeignSchema;
+
+	/* Support functions for foreign transactions */
+	GetPrepareId_function				GetPrepareId;
+	EndForeignTransaction_function		EndForeignTransaction;
+	PrepareForeignTransaction_function	PrepareForeignTransaction;
+	ResolvePreparedForeignTransaction_function ResolvePreparedForeignTransaction;
 } FdwRoutine;
 
 
 /* Functions in foreign/foreign.c */
 extern FdwRoutine *GetFdwRoutine(Oid fdwhandler);
 extern Oid	GetForeignServerIdByRelId(Oid relid);
 extern FdwRoutine *GetFdwRoutineByServerId(Oid serverid);
 extern FdwRoutine *GetFdwRoutineByRelId(Oid relid);
 extern FdwRoutine *GetFdwRoutineForRelation(Relation relation, bool makecopy);
 extern bool IsImportableForeignTable(const char *tablename,
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 3d68017..7458d5b 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -223,25 +223,26 @@ typedef struct PROC_HDR
 } PROC_HDR;
 
 extern PROC_HDR *ProcGlobal;
 
 extern PGPROC *PreparedXactProcs;
 
 /*
  * We set aside some extra PGPROC structures for auxiliary processes,
  * ie things that aren't full-fledged backends but need shmem access.
  *
- * Background writer, checkpointer and WAL writer run during normal operation.
- * Startup process and WAL receiver also consume 2 slots, but WAL writer is
- * launched only after startup has exited, so we only need 4 slots.
+ * Background writer, checkpointer, WAL writer and foreign transaction resolver
+ * run during normal operation. Startup process and WAL receiver also consume 2
+ * slots, but WAL writer is launched only after startup has exited, so we only
+ * need 5 slots.
  */
-#define NUM_AUXILIARY_PROCS		4
+#define NUM_AUXILIARY_PROCS		5
 
 
 /* configurable options */
 extern int	DeadlockTimeout;
 extern int	StatementTimeout;
 extern int	LockTimeout;
 extern bool log_lock_waits;
 
 
 /*
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index c193e44..56df3a5 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -1258,11 +1258,15 @@ extern Datum pg_available_extensions(PG_FUNCTION_ARGS);
 extern Datum pg_available_extension_versions(PG_FUNCTION_ARGS);
 extern Datum pg_extension_update_paths(PG_FUNCTION_ARGS);
 extern Datum pg_extension_config_dump(PG_FUNCTION_ARGS);
 
 /* commands/prepare.c */
 extern Datum pg_prepared_statement(PG_FUNCTION_ARGS);
 
 /* utils/mmgr/portalmem.c */
 extern Datum pg_cursor(PG_FUNCTION_ARGS);
 
+/* access/transam/fdw_xact.c */
+extern Datum pg_fdw_xact(PG_FUNCTION_ARGS);
+extern Datum pg_fdw_resolve(PG_FUNCTION_ARGS);
+extern Datum pg_fdw_remove(PG_FUNCTION_ARGS);
 #endif   /* BUILTINS_H */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 80374e4..e95334d 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1305,20 +1305,30 @@ pg_available_extensions| SELECT e.name,
     e.comment
    FROM (pg_available_extensions() e(name, default_version, comment)
      LEFT JOIN pg_extension x ON ((e.name = x.extname)));
 pg_cursors| SELECT c.name,
     c.statement,
     c.is_holdable,
     c.is_binary,
     c.is_scrollable,
     c.creation_time
    FROM pg_cursor() c(name, statement, is_holdable, is_binary, is_scrollable, creation_time);
+pg_fdw_xacts| SELECT p.transaction,
+    d.datname AS database,
+    s.srvname AS "foreign server",
+    u.rolname AS "local user",
+    p.status,
+    p.identifier AS "foreign transaction identifier"
+   FROM (((pg_fdw_xact() p(dbid, transaction, serverid, userid, status, identifier)
+     LEFT JOIN pg_authid u ON ((p.userid = u.oid)))
+     LEFT JOIN pg_database d ON ((p.dbid = d.oid)))
+     LEFT JOIN pg_foreign_server s ON ((p.serverid = s.oid)));
 pg_file_settings| SELECT a.sourcefile,
     a.sourceline,
     a.seqno,
     a.name,
     a.setting,
     a.applied,
     a.error
    FROM pg_show_all_file_settings() a(sourcefile, sourceline, seqno, name, setting, applied, error);
 pg_group| SELECT pg_authid.rolname AS groname,
     pg_authid.oid AS grosysid,
diff --git a/src/test/regress/pg_regress.c b/src/test/regress/pg_regress.c
index dd65ab5..3c23446 100644
--- a/src/test/regress/pg_regress.c
+++ b/src/test/regress/pg_regress.c
@@ -2224,37 +2224,40 @@ regression_main(int argc, char *argv[], init_function ifunc, test_function tfunc
 		if (system(buf))
 		{
 			fprintf(stderr, _("\n%s: initdb failed\nExamine %s/log/initdb.log for the reason.\nCommand was: %s\n"), progname, outputdir, buf);
 			exit(2);
 		}
 
 		/*
 		 * Adjust the default postgresql.conf for regression testing. The user
 		 * can specify a file to be appended; in any case we expand logging
 		 * and set max_prepared_transactions to enable testing of prepared
-		 * xacts.  (Note: to reduce the probability of unexpected shmmax
-		 * failures, don't set max_prepared_transactions any higher than
-		 * actually needed by the prepared_xacts regression test.)
+		 * xacts.  We also set max_prepared_foreign_transactions to enable
+		 * testing of atomic foreign transactions.  (Note: to reduce the
+		 * probability of unexpected shmmax failures, don't set
+		 * max_prepared_transactions or max_prepared_foreign_transactions any
+		 * higher than actually needed by the corresponding regression tests.)
 		 */
 		snprintf(buf, sizeof(buf), "%s/data/postgresql.conf", temp_instance);
 		pg_conf = fopen(buf, "a");
 		if (pg_conf == NULL)
 		{
 			fprintf(stderr, _("\n%s: could not open \"%s\" for adding extra config: %s\n"), progname, buf, strerror(errno));
 			exit(2);
 		}
 		fputs("\n# Configuration added by pg_regress\n\n", pg_conf);
 		fputs("log_autovacuum_min_duration = 0\n", pg_conf);
 		fputs("log_checkpoints = on\n", pg_conf);
 		fputs("log_lock_waits = on\n", pg_conf);
 		fputs("log_temp_files = 128kB\n", pg_conf);
-		fputs("max_prepared_transactions = 2\n", pg_conf);
+		fputs("max_prepared_transactions = 3\n", pg_conf);
+		fputs("max_prepared_foreign_transactions = 2\n", pg_conf);
 
 		if (temp_config != NULL)
 		{
 			FILE	   *extra_conf;
 			char		line_buf[1024];
 
 			extra_conf = fopen(temp_config, "r");
 			if (extra_conf == NULL)
 			{
 				fprintf(stderr, _("\n%s: could not open \"%s\" to read extra config: %s\n"), progname, temp_config, strerror(errno));
