From 8881170d9a7e01cb1ace337e3392ad7a74725211 Mon Sep 17 00:00:00 2001
From: Stas Kelvich <stanconn@gmail.com>
Date: Wed, 25 Apr 2018 16:39:09 +0300
Subject: [PATCH 3/3] postgres_fdw support for global snapshots

---
 contrib/postgres_fdw/Makefile                  |   9 +
 contrib/postgres_fdw/connection.c              | 292 ++++++++++++++++++++++---
 contrib/postgres_fdw/postgres_fdw.c            |  12 +
 contrib/postgres_fdw/postgres_fdw.h            |   2 +
 contrib/postgres_fdw/t/001_bank_coordinator.pl | 264 ++++++++++++++++++++++
 contrib/postgres_fdw/t/002_bank_participant.pl | 240 ++++++++++++++++++++
 src/test/perl/PostgresNode.pm                  |  31 +++
 7 files changed, 823 insertions(+), 27 deletions(-)
 create mode 100644 contrib/postgres_fdw/t/001_bank_coordinator.pl
 create mode 100644 contrib/postgres_fdw/t/002_bank_participant.pl

diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index 85394b4f1f..02ae067cd0 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -23,3 +23,12 @@ top_builddir = ../..
 include $(top_builddir)/src/Makefile.global
 include $(top_srcdir)/contrib/contrib-global.mk
 endif
+
+# Global makefile will do temp-install for 'check'. Since REGRESS is defined,
+# PGXS (included from contrib-global.mk or directly) will care to add
+# postgres_fdw to it as EXTRA_INSTALL and build pg_regress. It will also
+# actually run pg_regress, so the only thing left is tap tests.
+check: tapcheck
+
+tapcheck: temp-install
+	$(prove_check)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index fe4893a8e0..3759eaa8e1 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -14,9 +14,11 @@
 
 #include "postgres_fdw.h"
 
+#include "access/global_snapshot.h"
 #include "access/htup_details.h"
 #include "catalog/pg_user_mapping.h"
 #include "access/xact.h"
+#include "access/xlog.h" /* GetSystemIdentifier() */
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -24,6 +26,8 @@
 #include "utils/hsearch.h"
 #include "utils/inval.h"
 #include "utils/memutils.h"
+#include "utils/snapmgr.h"
+#include "utils/snapshot.h"
 #include "utils/syscache.h"
 
 
@@ -65,6 +69,21 @@ typedef struct ConnCacheEntry
  */
 static HTAB *ConnectionHash = NULL;
 
+/*
+ * FdwTransactionState
+ *
+ * Holds number of open remote transactions and shared state needed for all
+ * connection entries.  Zeroed at the end of pgfdw_xact_callback().
+ */
+typedef struct FdwTransactionState
+{
+	char		*gid;				/* gid of the remote prepared xacts */
+	int			nparticipants;		/* nodes taking part in this xact */
+	GlobalCSN	global_csn;			/* exported snapshot, 0 if none yet */
+	bool		two_phase_commit;	/* commit is done via 2PC */
+} FdwTransactionState;
+static FdwTransactionState *fdwTransState;	/* allocated in GetConnection() */
+
 /* for assigning cursor numbers and prepared statement numbers */
 static unsigned int cursor_number = 0;
 static unsigned int prep_stmt_number = 0;
@@ -72,6 +91,9 @@ static unsigned int prep_stmt_number = 0;
 /* tracks whether any work is needed in callback functions */
 static bool xact_got_connection = false;
 
+/* counter of prepared tx made by this backend */
+static int two_phase_xact_count = 0;
+
 /* prototypes of private functions */
 static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
 static void disconnect_pg_server(ConnCacheEntry *entry);
@@ -80,6 +102,7 @@ static void configure_remote_session(PGconn *conn);
 static void do_sql_command(PGconn *conn, const char *sql);
 static void begin_remote_xact(ConnCacheEntry *entry);
 static void pgfdw_xact_callback(XactEvent event, void *arg);
+static void deallocate_prepared_stmts(ConnCacheEntry *entry);
 static void pgfdw_subxact_callback(SubXactEvent event,
 					   SubTransactionId mySubid,
 					   SubTransactionId parentSubid,
@@ -136,6 +159,15 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 									  pgfdw_inval_callback, (Datum) 0);
 	}
 
+	/* allocate FdwTransactionState */
+	if (fdwTransState == NULL)
+	{
+		MemoryContext oldcxt;
+		oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+		fdwTransState = palloc0(sizeof(FdwTransactionState));
+		MemoryContextSwitchTo(oldcxt);
+	}
+
 	/* Set flag that we did GetConnection during the current transaction */
 	xact_got_connection = true;
 
@@ -388,7 +420,8 @@ configure_remote_session(PGconn *conn)
 }
 
 /*
- * Convenience subroutine to issue a non-data-returning SQL command to remote
+ * Convenience subroutine to issue a non-data-returning SQL command or
+ * statement to remote node.
  */
 static void
 do_sql_command(PGconn *conn, const char *sql)
@@ -398,7 +431,8 @@ do_sql_command(PGconn *conn, const char *sql)
 	if (!PQsendQuery(conn, sql))
 		pgfdw_report_error(ERROR, NULL, conn, false, sql);
 	res = pgfdw_get_result(conn, sql);
-	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	if (PQresultStatus(res) != PGRES_COMMAND_OK &&
+			PQresultStatus(res) != PGRES_TUPLES_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
 }
@@ -426,6 +460,10 @@ begin_remote_xact(ConnCacheEntry *entry)
 		elog(DEBUG3, "starting remote transaction on connection %p",
 			 entry->conn);
 
+		if (UseGlobalSnapshots && (!IsolationUsesXactSnapshot() ||
+								   IsolationIsSerializable()))
+			elog(ERROR, "Global snapshots support only REPEATABLE READ");
+
 		if (IsolationIsSerializable())
 			sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
 		else
@@ -434,6 +472,23 @@ begin_remote_xact(ConnCacheEntry *entry)
 		do_sql_command(entry->conn, sql);
 		entry->xact_depth = 1;
 		entry->changing_xact_state = false;
+
+		if (UseGlobalSnapshots)
+		{
+			char import_sql[128];
+
+			/* Export our snapshot */
+			if (fdwTransState->global_csn == 0)
+				fdwTransState->global_csn = ExportGlobalSnapshot();
+
+			snprintf(import_sql, sizeof(import_sql),
+				"SELECT pg_global_snapshot_import("UINT64_FORMAT")",
+				fdwTransState->global_csn);
+
+			do_sql_command(entry->conn, import_sql);
+		}
+
+		fdwTransState->nparticipants += 1;
 	}
 
 	/*
@@ -643,6 +698,94 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 		PQclear(res);
 }
 
+/* BroadcastStmt result callback; returning false counts as a failure */
+typedef bool (*BroadcastCmdResHandler) (PGresult *result, void *arg);
+
+/*
+ * Broadcast sql in parallel to all ConnectionHash entries with an open remote
+ * transaction.  Returns false, after WARNINGs, if any reply is unexpected.
+ */
+static bool
+BroadcastStmt(char const * sql, unsigned expectedStatus,
+				BroadcastCmdResHandler handler, void *arg)
+{
+	HASH_SEQ_STATUS scan;
+	ConnCacheEntry *entry;
+	bool		allOk = true;
+
+	/* Broadcast sql */
+	hash_seq_init(&scan, ConnectionHash);
+	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	{
+		pgfdw_reject_incomplete_xact_state_change(entry);
+
+		if (entry->xact_depth > 0 && entry->conn != NULL &&
+			!PQsendQuery(entry->conn, sql))
+		{
+			/* clear = true: pgfdw_report_error() frees the result itself */
+			elog(WARNING, "Failed to send command %s", sql);
+			pgfdw_report_error(WARNING, PQgetResult(entry->conn),
+							   entry->conn, true, sql);
+		}
+	}
+
+	/* Collect responses; keep going on failure so every reply is consumed */
+	hash_seq_init(&scan, ConnectionHash);
+	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	{
+		if (entry->xact_depth > 0 && entry->conn != NULL)
+		{
+			PGresult   *result = PQgetResult(entry->conn);
+
+			if (PQresultStatus(result) != expectedStatus ||
+				(handler && !handler(result, arg)))
+			{
+				elog(WARNING, "Failed command %s: status=%d, expected status=%d", sql, PQresultStatus(result), expectedStatus);
+				pgfdw_report_error(WARNING, result, entry->conn, false, sql);
+				allOk = false;
+			}
+			PQclear(result);	/* not cleared by pgfdw_report_error() */
+			PQclear(PQgetResult(entry->conn));	/* consume NULL result */
+		}
+	}
+
+	return allOk;
+}
+
+/* Broadcast a command whose expected reply status is PGRES_COMMAND_OK */
+static bool
+BroadcastCmd(char const *sql)
+{
+	return BroadcastStmt(sql, PGRES_COMMAND_OK, NULL, NULL);
+}
+
+/* Broadcast a statement whose expected reply status is PGRES_TUPLES_OK */
+static bool
+BroadcastFunc(char const *sql)
+{
+	return BroadcastStmt(sql, PGRES_TUPLES_OK, NULL, NULL);
+}
+
+/*
+ * BroadcastStmt callback selecting maximal csn: *arg (a GlobalCSN) is raised
+ * to the value in the first row of result.  Returns false if none is readable.
+ */
+static bool
+MaxCsnCB(PGresult *result, void *arg)
+{
+	GlobalCSN	   *max_csn = (GlobalCSN *) arg;
+	GlobalCSN		csn = 0;
+
+	/* PQgetvalue() must not be called for a row that does not exist */
+	if (PQntuples(result) < 1 || PQnfields(result) < 1 ||
+		sscanf(PQgetvalue(result, 0, 0), UINT64_FORMAT, &csn) != 1)
+		return false;
+
+	if (*max_csn < csn)
+		*max_csn = csn;
+	return true;
+}
+
 /*
  * pgfdw_xact_callback --- cleanup at main-transaction end.
  */
@@ -656,6 +799,86 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 	if (!xact_got_connection)
 		return;
 
+	/* Handle possible two-phase commit */
+	if (event == XACT_EVENT_PARALLEL_PRE_COMMIT || event == XACT_EVENT_PRE_COMMIT)
+	{
+		bool include_local_tx = false;
+
+		/* The local node is a participant too if it has an xid assigned */
+		if (TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
+		{
+			include_local_tx = true;
+			fdwTransState->nparticipants += 1;
+		}
+
+		/* Switch to 2PC mode if there is more than one participant */
+		if (UseGlobalSnapshots && fdwTransState->nparticipants > 1)
+			fdwTransState->two_phase_commit = true;
+
+		if (fdwTransState->two_phase_commit)
+		{
+			GlobalCSN	max_csn = InProgressGlobalCSN,
+						my_csn = InProgressGlobalCSN;
+			bool	res;
+			char   *sql;
+
+			fdwTransState->gid = psprintf("pgfdw:%lld:%llu:%d:%u:%d:%d",
+										  (long long) GetCurrentTimestamp(),
+										  (unsigned long long) GetSystemIdentifier(),
+										  MyProcPid,
+										  GetCurrentTransactionIdIfAny(),
+										  ++two_phase_xact_count,
+										  fdwTransState->nparticipants);
+
+			/* Broadcast PREPARE */
+			sql = psprintf("PREPARE TRANSACTION '%s'", fdwTransState->gid);
+			res = BroadcastCmd(sql);
+			if (!res)
+				goto error;
+
+			/* Prepare locally, then broadcast pg_global_snapshot_prepare() */
+			if (include_local_tx)
+				my_csn = GlobalSnapshotPrepareCurrent();
+
+			sql = psprintf("SELECT pg_global_snapshot_prepare('%s')",
+														fdwTransState->gid);
+			res = BroadcastStmt(sql, PGRES_TUPLES_OK, MaxCsnCB, &max_csn);
+			if (!res)
+				goto error;
+
+			/* select maximal global csn */
+			if (include_local_tx && my_csn > max_csn)
+				max_csn = my_csn;
+
+			/* Assign locally, then broadcast pg_global_snapshot_assign() */
+			if (include_local_tx)
+				GlobalSnapshotAssignCsnCurrent(max_csn);
+			sql = psprintf("SELECT pg_global_snapshot_assign('%s',"UINT64_FORMAT")",
+							fdwTransState->gid, max_csn);
+			res = BroadcastFunc(sql);
+
+error:
+			if (!res)
+			{
+				sql = psprintf("ROLLBACK PREPARED '%s'", fdwTransState->gid);
+				BroadcastCmd(sql);
+				elog(ERROR, "Failed to PREPARE transaction on remote node");
+			}
+
+			/*
+			 * Do not fall through. Subsequent COMMIT event will clean things up.
+			 */
+			return;
+		}
+	}
+
+	/* COMMIT open transaction if we were doing 2PC */
+	if (fdwTransState->two_phase_commit &&
+		(event == XACT_EVENT_PARALLEL_COMMIT || event == XACT_EVENT_COMMIT))
+	{
+		BroadcastCmd(psprintf("COMMIT PREPARED '%s'", fdwTransState->gid));
+	}
+
 	/*
 	 * Scan all connection cache entries to find open remote transactions, and
 	 * close them.
@@ -663,8 +886,6 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 	hash_seq_init(&scan, ConnectionHash);
 	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
 	{
-		PGresult   *res;
-
 		/* Ignore cache entry if no open connection right now */
 		if (entry->conn == NULL)
 			continue;
@@ -681,6 +902,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 			{
 				case XACT_EVENT_PARALLEL_PRE_COMMIT:
 				case XACT_EVENT_PRE_COMMIT:
+					Assert(!fdwTransState->two_phase_commit);
 
 					/*
 					 * If abort cleanup previously failed for this connection,
@@ -693,28 +915,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					do_sql_command(entry->conn, "COMMIT TRANSACTION");
 					entry->changing_xact_state = false;
 
-					/*
-					 * If there were any errors in subtransactions, and we
-					 * made prepared statements, do a DEALLOCATE ALL to make
-					 * sure we get rid of all prepared statements. This is
-					 * annoying and not terribly bulletproof, but it's
-					 * probably not worth trying harder.
-					 *
-					 * DEALLOCATE ALL only exists in 8.3 and later, so this
-					 * constrains how old a server postgres_fdw can
-					 * communicate with.  We intentionally ignore errors in
-					 * the DEALLOCATE, so that we can hobble along to some
-					 * extent with older servers (leaking prepared statements
-					 * as we go; but we don't really support update operations
-					 * pre-8.3 anyway).
-					 */
-					if (entry->have_prep_stmt && entry->have_error)
-					{
-						res = PQexec(entry->conn, "DEALLOCATE ALL");
-						PQclear(res);
-					}
-					entry->have_prep_stmt = false;
-					entry->have_error = false;
+					deallocate_prepared_stmts(entry);
 					break;
 				case XACT_EVENT_PRE_PREPARE:
 
@@ -729,10 +930,15 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					 */
 					ereport(ERROR,
 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-							 errmsg("cannot prepare a transaction that modified remote tables")));
+								errmsg("cannot prepare a transaction that modified remote tables")));
 					break;
 				case XACT_EVENT_PARALLEL_COMMIT:
 				case XACT_EVENT_COMMIT:
+					if (fdwTransState->two_phase_commit)
+						deallocate_prepared_stmts(entry);
+					else /* Pre-commit should have closed the open transaction */
+						elog(ERROR, "missed cleaning up connection during pre-commit");
+					break;
 				case XACT_EVENT_PREPARE:
 					/* Pre-commit should have closed the open transaction */
 					elog(ERROR, "missed cleaning up connection during pre-commit");
@@ -828,6 +1034,38 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 
 	/* Also reset cursor numbering for next transaction */
 	cursor_number = 0;
+
+	/* Reset fdwTransState */
+	memset(fdwTransState, '\0', sizeof(FdwTransactionState));
+}
+
+/*
+ * deallocate_prepared_stmts --- best-effort DEALLOCATE ALL on one connection.
+ *
+ * If there were any errors in subtransactions, and we made prepared
+ * statements, do a DEALLOCATE ALL to make sure we get rid of all prepared
+ * statements. This is annoying and not terribly bulletproof, but it's
+ * probably not worth trying harder.
+ *
+ * DEALLOCATE ALL only exists in 8.3 and later, so this constrains how old a
+ * server postgres_fdw can communicate with.  We intentionally ignore errors
+ * in the DEALLOCATE, so that we can hobble along to some extent with older
+ * servers (leaking prepared statements as we go; but we don't really support
+ * update operations pre-8.3 anyway).
+ *
+ * In every case both per-connection flags are reset.
+ */
+static void
+deallocate_prepared_stmts(ConnCacheEntry *entry)
+{
+	bool		need_dealloc = entry->have_prep_stmt && entry->have_error;
+
+	/* Result is discarded on purpose; PQclear() accepts NULL */
+	if (need_dealloc)
+		PQclear(PQexec(entry->conn, "DEALLOCATE ALL"));
+
+	entry->have_error = false;
+	entry->have_prep_stmt = false;
 }
 
 /*
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 5699252091..64279c8664 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -268,6 +268,9 @@ typedef struct
 	List	   *already_used;	/* expressions already dealt with */
 } ec_member_foreign_arg;
 
+bool		UseGlobalSnapshots;
+void		_PG_init(void);
+
 /*
  * SQL functions
  */
@@ -5806,3 +5809,12 @@ find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel)
 	/* We didn't find any suitable equivalence class expression */
 	return NULL;
 }
+
+void
+_PG_init(void)
+{
+	DefineCustomBoolVariable("postgres_fdw.use_global_snapshots",
+							 "Use global snapshots for FDW transactions", NULL,
+							 &UseGlobalSnapshots, false, PGC_USERSET, 0, NULL,
+							 NULL, NULL);	/* value lives in UseGlobalSnapshots */
+}
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 70b538e2f9..8cf5b12798 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -186,4 +186,6 @@ extern const char *get_jointype_name(JoinType jointype);
 extern bool is_builtin(Oid objectId);
 extern bool is_shippable(Oid objectId, Oid classId, PgFdwRelationInfo *fpinfo);
 
+extern bool UseGlobalSnapshots;
+
 #endif							/* POSTGRES_FDW_H */
diff --git a/contrib/postgres_fdw/t/001_bank_coordinator.pl b/contrib/postgres_fdw/t/001_bank_coordinator.pl
new file mode 100644
index 0000000000..1e31f33349
--- /dev/null
+++ b/contrib/postgres_fdw/t/001_bank_coordinator.pl
@@ -0,0 +1,264 @@
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+my $master = get_new_node("master");
+$master->init;
+$master->append_conf('postgresql.conf', qq(
+	max_prepared_transactions = 30
+	log_checkpoints = true
+	postgres_fdw.use_global_snapshots = on
+	track_global_snapshots = on
+	default_transaction_isolation = 'REPEATABLE READ'
+));
+$master->start;
+
+my $shard1 = get_new_node("shard1");
+$shard1->init;
+$shard1->append_conf('postgresql.conf', qq(
+	max_prepared_transactions = 30
+	global_snapshot_defer_time = 15
+	track_global_snapshots = on
+));
+$shard1->start;
+
+my $shard2 = get_new_node("shard2");
+$shard2->init;
+$shard2->append_conf('postgresql.conf', qq(
+	max_prepared_transactions = 30
+	global_snapshot_defer_time = 15
+	track_global_snapshots = on
+));
+$shard2->start;
+
+###############################################################################
+# Prepare nodes
+###############################################################################
+
+$master->safe_psql('postgres', qq[
+	CREATE EXTENSION postgres_fdw;
+	CREATE TABLE accounts(id integer primary key, amount integer);
+	CREATE TABLE global_transactions(tx_time timestamp);
+]);
+
+foreach my $node ($shard1, $shard2)
+{
+	my $port = $node->port;
+	my $host = $node->host;
+
+	$node->safe_psql('postgres',
+			"CREATE TABLE accounts(id integer primary key, amount integer)");
+
+	$master->safe_psql('postgres', qq[
+		CREATE SERVER shard_$port FOREIGN DATA WRAPPER postgres_fdw options(dbname 'postgres', host '$host', port '$port');
+		CREATE FOREIGN TABLE accounts_fdw_$port() inherits (accounts) server shard_$port options(table_name 'accounts');
+		CREATE USER MAPPING for CURRENT_USER SERVER shard_$port;
+	])
+}
+
+$shard1->safe_psql('postgres', qq[
+	insert into accounts select 2*id-1, 0 from generate_series(1, 10010) as id;
+	CREATE TABLE local_transactions(tx_time timestamp);
+]);
+
+$shard2->safe_psql('postgres', qq[
+	insert into accounts select 2*id, 0 from generate_series(1, 10010) as id;
+	CREATE TABLE local_transactions(tx_time timestamp);
+]);
+
+diag("master: @{[$master->connstr('postgres')]}");
+diag("shard1: @{[$shard1->connstr('postgres')]}");
+diag("shard2: @{[$shard2->connstr('postgres')]}");
+
+###############################################################################
+# pgbench scripts
+###############################################################################
+
+my $bank = File::Temp->new();
+append_to_file($bank, q{
+	\set id random(1, 20000)
+	BEGIN;
+	WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = :id RETURNING *)
+		INSERT into global_transactions SELECT now() FROM upd;
+	UPDATE accounts SET amount = amount + 1 WHERE id = (:id + 1);
+	COMMIT;
+});
+
+my $bank1 = File::Temp->new();
+append_to_file($bank1, q{
+	\set id random(1, 10000)
+	BEGIN;
+	WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = (2*:id + 1) RETURNING *)
+		INSERT into local_transactions SELECT now() FROM upd;
+	UPDATE accounts SET amount = amount + 1 WHERE id = (2*:id + 3);
+	COMMIT;
+});
+
+my $bank2 = File::Temp->new();
+append_to_file($bank2, q{
+	\set id random(1, 10000)
+
+	BEGIN;
+	WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = 2*:id RETURNING *)
+		INSERT into local_transactions SELECT now() FROM upd;
+	UPDATE accounts SET amount = amount + 1 WHERE id = (2*:id + 2);
+	COMMIT;
+});
+
+###############################################################################
+# Helpers
+###############################################################################
+
+# Return the number of rows in $table on $node, then empty the table.
+sub count_and_delete_rows
+{
+	my ($node, $table) = @_;
+	my $nrows = $node->safe_psql('postgres',"select count(*) from $table");
+
+	$node->safe_psql('postgres',"delete from $table");
+	diag($node->name, ": completed $nrows transactions");
+	return $nrows;
+}
+
+###############################################################################
+# Concurrent global transactions
+###############################################################################
+
+my ($err, $rc);
+my $started;
+my $seconds = 30;
+my $selects;
+my $total = '0';
+my $oldtotal = '0';
+my $isolation_errors = 0;
+
+
+my $pgb_handle;
+
+$pgb_handle = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+$started = time();
+$selects = 0;
+while (time() - $started < $seconds)
+{
+	$total = $master->safe_psql('postgres', "select sum(amount) from accounts");
+	if ( ($total ne $oldtotal) and ($total ne '') )
+	{
+		$isolation_errors++;
+		$oldtotal = $total;
+		diag("Isolation error. Total = $total");
+	}
+	if ($total ne '') { $selects++; }
+}
+
+$master->pgbench_await($pgb_handle);
+
+# sanity check
+diag("completed $selects selects");
+die "no actual transactions happend" unless ( $selects > 0 &&
+	count_and_delete_rows($master, 'global_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global transaction');
+
+###############################################################################
+# Concurrent global and local transactions
+###############################################################################
+
+my ($pgb_handle1, $pgb_handle2, $pgb_handle3);
+
+# global txses
+$pgb_handle1 = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+# concurrent local
+$pgb_handle2 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank1, 'postgres' );
+$pgb_handle3 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank2, 'postgres' );
+
+$started = time();
+$selects = 0;
+$oldtotal = 0;
+while (time() - $started < $seconds)
+{
+	$total = $master->safe_psql('postgres', "select sum(amount) from accounts");
+	if ( ($total ne $oldtotal) and ($total ne '') )
+	{
+		$isolation_errors++;
+		$oldtotal = $total;
+		diag("Isolation error. Total = $total");
+	}
+	if ($total ne '') { $selects++; }
+}
+
+diag("selects = $selects");
+$master->pgbench_await($pgb_handle1);
+$shard1->pgbench_await($pgb_handle2);
+$shard2->pgbench_await($pgb_handle3);
+
+diag("completed $selects selects");
+die "" unless ( $selects > 0 &&
+	count_and_delete_rows($master, 'global_transactions') > 0 &&
+	count_and_delete_rows($shard1, 'local_transactions') > 0 &&
+	count_and_delete_rows($shard2, 'local_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global and local transactions');
+
+
+###############################################################################
+# Snapshot stability
+###############################################################################
+
+my ($hashes, $hash1, $hash2);
+my $stability_errors = 0;
+
+# global txses
+$pgb_handle1 = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+# concurrent local
+$pgb_handle2 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank1, 'postgres' );
+$pgb_handle3 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank2, 'postgres' );
+
+$selects = 0;
+$started = time();
+while (time() - $started < $seconds)
+{
+	foreach my $node ($master, $shard1, $shard2)
+	{
+		($hash1, $_, $hash2) = split "\n", $node->safe_psql('postgres', qq[
+			begin isolation level repeatable read;
+			select md5(array_agg((t.*)::text)::text) from (select * from accounts order by id) as t;
+			select pg_sleep(3);
+			select md5(array_agg((t.*)::text)::text) from (select * from accounts order by id) as t;
+			commit;
+		]);
+
+		if ($hash1 ne $hash2)
+		{
+			diag("oops");
+			$stability_errors++;
+		}
+		elsif ($hash1 eq '' or $hash2 eq '')
+		{
+			die;
+		}
+		else
+		{
+			$selects++;
+		}
+	}
+}
+
+$master->pgbench_await($pgb_handle1);
+$shard1->pgbench_await($pgb_handle2);
+$shard2->pgbench_await($pgb_handle3);
+
+die "" unless ( $selects > 0 &&
+	count_and_delete_rows($master, 'global_transactions') > 0 &&
+	count_and_delete_rows($shard1, 'local_transactions') > 0 &&
+	count_and_delete_rows($shard2, 'local_transactions') > 0);
+
+is($stability_errors, 0, 'snapshot is stable during concurrent global and local transactions');
+
+$master->stop;
+$shard1->stop;
+$shard2->stop;
diff --git a/contrib/postgres_fdw/t/002_bank_participant.pl b/contrib/postgres_fdw/t/002_bank_participant.pl
new file mode 100644
index 0000000000..bf80d21d7a
--- /dev/null
+++ b/contrib/postgres_fdw/t/002_bank_participant.pl
@@ -0,0 +1,240 @@
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+my $shard1 = get_new_node("shard1");
+$shard1->init;
+$shard1->append_conf('postgresql.conf', qq(
+	max_prepared_transactions = 30
+	postgres_fdw.use_global_snapshots = on
+	global_snapshot_defer_time = 15
+	track_global_snapshots = on
+    default_transaction_isolation = 'REPEATABLE READ'
+));
+$shard1->start;
+
+my $shard2 = get_new_node("shard2");
+$shard2->init;
+$shard2->append_conf('postgresql.conf', qq(
+	max_prepared_transactions = 30
+	postgres_fdw.use_global_snapshots = on
+	global_snapshot_defer_time = 15
+	track_global_snapshots = on
+	default_transaction_isolation = 'REPEATABLE READ'
+));
+$shard2->start;
+
+###############################################################################
+# Prepare nodes
+###############################################################################
+
+my @shards = ($shard1, $shard2);
+
+foreach my $node (@shards)
+{
+	$node->safe_psql('postgres', qq[
+		CREATE EXTENSION postgres_fdw;
+		CREATE TABLE accounts(id integer primary key, amount integer);
+		CREATE TABLE accounts_local() inherits(accounts);
+		CREATE TABLE global_transactions(tx_time timestamp);
+		CREATE TABLE local_transactions(tx_time timestamp);
+	]);
+
+	foreach my $neighbor (@shards)
+	{
+		next if ($neighbor eq $node);
+
+		my $port = $neighbor->port;
+		my $host = $neighbor->host;
+
+		$node->safe_psql('postgres', qq[
+			CREATE SERVER shard_$port FOREIGN DATA WRAPPER postgres_fdw
+					options(dbname 'postgres', host '$host', port '$port');
+			CREATE FOREIGN TABLE accounts_fdw_$port() inherits (accounts)
+					server shard_$port options(table_name 'accounts_local');
+			CREATE USER MAPPING for CURRENT_USER SERVER shard_$port;
+		]);
+	}
+}
+
+$shard1->safe_psql('postgres', "insert into accounts_local select 2*id-1, 0 from generate_series(1, 10010) as id;");
+$shard2->safe_psql('postgres', "insert into accounts_local select 2*id,   0 from generate_series(1, 10010) as id;");
+
+###############################################################################
+# pgbench scripts
+###############################################################################
+
+my $bank = File::Temp->new();
+append_to_file($bank, q{
+	\set id random(1, 20000)
+	BEGIN;
+	WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = :id RETURNING *)
+		INSERT into global_transactions SELECT now() FROM upd;
+	UPDATE accounts SET amount = amount + 1 WHERE id = (:id + 1);
+	COMMIT;
+});
+
+###############################################################################
+# Helpers
+###############################################################################
+
+# Report how many rows $table holds on $node, emptying the table afterwards.
+sub count_and_delete_rows
+{
+	my ($node, $table) = @_;
+	my $nrows = $node->safe_psql('postgres',"select count(*) from $table");
+
+	$node->safe_psql('postgres',"delete from $table");
+	diag($node->name, ": completed $nrows transactions");
+	return $nrows;
+}
+
+###############################################################################
+# Concurrent global transactions
+###############################################################################
+
+my ($err, $rc);
+my $started;
+my $seconds = 30;
+my $selects;
+my $total = '0';
+my $oldtotal = '0';
+my $isolation_errors = 0;
+my $i;
+
+
+my ($pgb_handle1, $pgb_handle2);
+
+$pgb_handle1 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+$pgb_handle2 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+$started = time();
+$selects = 0;
+$i = 0;
+while (time() - $started < $seconds)
+{
+	# Transfers keep sum(amount) constant, so any change is an isolation error.
+	foreach my $shard (@shards)
+	{
+		$total = $shard->safe_psql('postgres', "select sum(amount) from accounts");
+		if ( ($total ne $oldtotal) and ($total ne '') )
+		{
+			$isolation_errors++;
+			$oldtotal = $total;
+			diag("$i: Isolation error. Total = $total");
+		}
+		if ($total ne '') { $selects++; }
+	}
+	$i++;
+}
+
+$shard1->pgbench_await($pgb_handle1);
+$shard2->pgbench_await($pgb_handle2);
+
+# sanity check
+diag("completed $selects selects");
+die "no actual transactions happend" unless ( $selects > 0 &&
+	count_and_delete_rows($shard1, 'global_transactions') > 0 &&
+	count_and_delete_rows($shard2, 'global_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global transaction');
+
+###############################################################################
+# And do the same after soft restart
+###############################################################################
+
+$shard1->restart;
+$shard2->restart;
+$shard1->poll_query_until('postgres', "select 't'")
+	or die "Timed out waiting for shard1 to became online";
+$shard2->poll_query_until('postgres', "select 't'")
+	or die "Timed out waiting for shard2 to became online";
+
+$seconds = 15;
+$pgb_handle1 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+$pgb_handle2 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+$started = time();
+$selects = 0;
+$i = 0;
+
+while (time() - $started < $seconds)
+{
+	# Transfers keep sum(amount) constant, so any change is an isolation error.
+	foreach my $shard (@shards)
+	{
+		$total = $shard->safe_psql('postgres', "select sum(amount) from accounts");
+		if ( ($total ne $oldtotal) and ($total ne '') )
+		{
+			$isolation_errors++;
+			$oldtotal = $total;
+			diag("$i: Isolation error. Total = $total");
+		}
+		if ($total ne '') { $selects++; }
+	}
+	$i++;
+}
+
+$shard1->pgbench_await($pgb_handle1);
+$shard2->pgbench_await($pgb_handle2);
+
+# sanity check
+diag("completed $selects selects");
+die "no actual transactions happend" unless ( $selects > 0 &&
+	count_and_delete_rows($shard1, 'global_transactions') > 0 &&
+	count_and_delete_rows($shard2, 'global_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global transaction after restart');
+
+###############################################################################
+# And do the same after hard restart
+###############################################################################
+
+$shard1->teardown_node;
+$shard2->teardown_node;
+$shard1->start;
+$shard2->start;
+$shard1->poll_query_until('postgres', "select 't'")
+	or die "Timed out waiting for shard1 to became online";
+$shard2->poll_query_until('postgres', "select 't'")
+	or die "Timed out waiting for shard2 to became online";
+
+
+$seconds = 15;
+$pgb_handle1 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+$pgb_handle2 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' );
+
+$started = time();
+$selects = 0;
+$i = 0;
+
+while (time() - $started < $seconds)
+{
+	# Transfers keep sum(amount) constant, so any change is an isolation error.
+	foreach my $shard (@shards)
+	{
+		$total = $shard->safe_psql('postgres', "select sum(amount) from accounts");
+		if ( ($total ne $oldtotal) and ($total ne '') )
+		{
+			$isolation_errors++;
+			$oldtotal = $total;
+			diag("$i: Isolation error. Total = $total");
+		}
+		if ($total ne '') { $selects++; }
+	}
+	$i++;
+}
+
+$shard1->pgbench_await($pgb_handle1);
+$shard2->pgbench_await($pgb_handle2);
+
+# sanity check
+diag("completed $selects selects");
+die "no actual transactions happend" unless ( $selects > 0 &&
+	count_and_delete_rows($shard1, 'global_transactions') > 0 &&
+	count_and_delete_rows($shard2, 'global_transactions') > 0);
+
+is($isolation_errors, 0, 'isolation between concurrent global transaction after hard restart');
diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index 79fb457075..d72a28f661 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -1796,6 +1796,37 @@ sub pg_recvlogical_upto
 	}
 }
 
+sub pgbench
+{
+	my ($self, @args) = @_;
+	# Run pgbench to completion; @args are handed to pgbench_async unchanged.
+	return $self->pgbench_await($self->pgbench_async(@args));
+}
+
+sub pgbench_async
+{
+	my ($self, @args) = @_;
+
+	# Start pgbench against this node without waiting for it; @args are
+	# appended after the -h/-p options.  Returns the handle produced by
+	# IPC::Run::start, to be passed to pgbench_await.
+	my ($in, $out) = ('', '');
+	my @pgbench_command = (
+		'pgbench',
+		-h => $self->host,
+		-p => $self->port,
+		@args
+	);
+	my $handle = IPC::Run::start(\@pgbench_command, $in, $out);
+	return $handle;
+}
+
+sub pgbench_await
+{
+	my ($self, $pgbench_handle) = @_;	# handle comes from pgbench_async
+	IPC::Run::finish($pgbench_handle) || BAIL_OUT("pgbench exited with $?");
+}
+
 =pod
 
 =back
-- 
2.11.0

