From 4000408863fca43b1e79c55d638525c0cd04f92a Mon Sep 17 00:00:00 2001
From: Stas Kelvich <stanconn@gmail.com>
Date: Wed, 25 Apr 2018 16:22:30 +0300
Subject: [PATCH 2/3] Global snapshots

---
 src/backend/access/transam/Makefile           |   2 +-
 src/backend/access/transam/global_snapshot.c  | 754 ++++++++++++++++++++++++++
 src/backend/access/transam/twophase.c         | 156 ++++++
 src/backend/access/transam/xact.c             |  29 +
 src/backend/access/transam/xlog.c             |   2 +
 src/backend/storage/ipc/ipci.c                |   3 +
 src/backend/storage/ipc/procarray.c           |  93 +++-
 src/backend/storage/lmgr/lwlocknames.txt      |   1 +
 src/backend/storage/lmgr/proc.c               |   5 +
 src/backend/utils/misc/guc.c                  |  13 +-
 src/backend/utils/misc/postgresql.conf.sample |   2 +
 src/backend/utils/time/snapmgr.c              | 103 ++++
 src/backend/utils/time/tqual.c                |  65 ++-
 src/include/access/global_snapshot.h          |  72 +++
 src/include/access/twophase.h                 |   1 +
 src/include/catalog/pg_proc.dat               |  14 +
 src/include/datatype/timestamp.h              |   3 +
 src/include/fmgr.h                            |   1 +
 src/include/portability/instr_time.h          |  10 +
 src/include/storage/proc.h                    |  15 +
 src/include/storage/procarray.h               |   8 +
 src/include/utils/snapmgr.h                   |   3 +
 src/include/utils/snapshot.h                  |   8 +
 23 files changed, 1355 insertions(+), 8 deletions(-)
 create mode 100644 src/backend/access/transam/global_snapshot.c
 create mode 100644 src/include/access/global_snapshot.h

diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 03aa360ea3..8ef677cada 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -12,7 +12,7 @@ subdir = src/backend/access/transam
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = clog.o commit_ts.o global_csn_log.o generic_xlog.o \
+OBJS = clog.o commit_ts.o global_csn_log.o global_snapshot.o generic_xlog.o \
 	multixact.o parallel.o rmgr.o slru.o \
 	subtrans.o timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \
 	xact.o xlog.o xlogarchive.o xlogfuncs.o \
diff --git a/src/backend/access/transam/global_snapshot.c b/src/backend/access/transam/global_snapshot.c
new file mode 100644
index 0000000000..b9d6c56334
--- /dev/null
+++ b/src/backend/access/transam/global_snapshot.c
@@ -0,0 +1,754 @@
+/*-------------------------------------------------------------------------
+ *
+ * global_snapshot.c
+ *		Support for cross-node snapshot isolation.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/global_snapshot.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/global_csn_log.h"
+#include "access/global_snapshot.h"
+#include "access/transam.h"
+#include "access/twophase.h"
+#include "access/xact.h"
+#include "portability/instr_time.h"
+#include "storage/lmgr.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/snapmgr.h"
+#include "miscadmin.h"
+
+/* Raise a warning if imported global_csn exceeds ours by this value. */
+#define SNAP_DESYNC_COMPLAIN (1*NSECS_PER_SEC) /* 1 second */
+
+/*
+ * GlobalSnapshotState
+ *
+ * Local clocks are not trusted to be strictly monotonic, so the last acquired
+ * value is saved and every next timestamp is compared against it. Accessed
+ * through GlobalSnapshotGenerate() and GlobalSnapshotSync().
+ */
+typedef struct
+{
+	GlobalCSN		 last_global_csn;
+	volatile slock_t lock;
+} GlobalSnapshotState;
+
+static GlobalSnapshotState *gsState;
+
+
+/*
+ * GUC to delay advance of oldestXid for this amount of time. Also determines
+ * the size of the GlobalSnapshotXidMap circular buffer (one slot per second).
+ */
+int global_snapshot_defer_time;
+
+/*
+ * Enables this module.
+ */
+extern bool track_global_snapshots;
+
+/*
+ * GlobalSnapshotXidMap
+ *
+ * To install a global snapshot that points to the past we need to keep old
+ * versions of tuples and therefore delay the advance of oldestXid.  Here we
+ * keep track of the correspondence between a snapshot's global_csn and the
+ * oldestXid that was set at the time when the snapshot was taken.  Much like
+ * the snapshot too old's OldSnapshotControlData does, but with a finer
+ * granularity of seconds.
+ *
+ * Different strategies can be employed to hold oldestXid (e.g. we can track
+ * the oldest global_csn-based snapshot among cluster nodes and map it to
+ * oldestXid on each node), but the one implemented here tries to avoid
+ * cross-node communication, which is tricky in case of postgres_fdw.
+ *
+ * On each snapshot acquisition GlobalSnapshotMapXmin() is called and stores
+ * the correspondence between current global_csn and oldestXmin in a sparse
+ * way: global_csn is rounded to seconds (here we use the fact that global_csn
+ * is just a timestamp) and oldestXmin is stored in the circular buffer where
+ * rounded global_csn acts as an offset from current circular buffer head.
+ * Size of the circular buffer is controlled by global_snapshot_defer_time GUC.
+ *
+ * When a global snapshot arrives from a different node we check that its
+ * global_csn is still in our map, otherwise we'll error out with "snapshot too
+ * old" message.  If global_csn is successfully mapped to oldestXid we move
+ * backend's pgxact->xmin to proc->originalXmin and set pgxact->xmin to the
+ * mapped oldestXid.  That way GetOldestXmin() can take into account backends
+ * with imported global snapshot and old tuple versions will be preserved.
+ *
+ * Also while calculating oldestXmin for our map in presence of imported
+ * global snapshots we should use proc->originalXmin instead of pgxact->xmin
+ * that was set during import.  Otherwise, we can create a feedback loop:
+ * xmins of imported global snapshots were calculated using our map and new
+ * entries in the map are going to be calculated based on those xmins, and
+ * there is a risk of getting stuck forever with one non-increasing oldestXmin.
+ * All other callers of GetOldestXmin() are using pgxact->xmin so the old tuple
+ * versions are preserved.
+ */
+typedef struct GlobalSnapshotXidMap
+{
+	int				 head;				/* offset of current freshest value */
+	int				 size;				/* total size of circular buffer */
+	GlobalCSN_atomic last_csn_seconds;	/* last rounded global_csn that changed
+										 * xmin_by_second[] */
+	TransactionId   *xmin_by_second;	/* circular buffer of oldestXmin's */
+}
+GlobalSnapshotXidMap;
+
+static GlobalSnapshotXidMap *gsXidMap;
+
+
+/* Estimate shared memory space needed by GlobalSnapshotShmemInit() */
+Size
+GlobalSnapshotShmemSize(void)
+{
+	Size	size = 0;
+
+	/* gsState; same condition as in GlobalSnapshotShmemInit() */
+	if (track_global_snapshots || global_snapshot_defer_time > 0)
+		size = add_size(size, MAXALIGN(sizeof(GlobalSnapshotState)));
+
+	/* gsXidMap header and its xmin_by_second[] array are allocated separately */
+	if (global_snapshot_defer_time > 0)
+	{
+		size = add_size(size, MAXALIGN(sizeof(GlobalSnapshotXidMap)));
+		size = add_size(size, MAXALIGN(mul_size(sizeof(TransactionId),
+												global_snapshot_defer_time)));
+	}
+
+	return size;
+}
+
+/* Init shared memory structures accounted for in GlobalSnapshotShmemSize() */
+void
+GlobalSnapshotShmemInit(void)
+{
+	bool found;
+
+	if (track_global_snapshots || global_snapshot_defer_time > 0)
+	{
+		gsState = ShmemInitStruct("gsState",
+								sizeof(GlobalSnapshotState),
+								&found);
+		if (!found)
+		{
+			gsState->last_global_csn = 0;
+			SpinLockInit(&gsState->lock);
+		}
+	}
+
+	if (global_snapshot_defer_time > 0)
+	{
+		gsXidMap = ShmemInitStruct("gsXidMap",
+								   sizeof(GlobalSnapshotXidMap),
+								   &found);
+		if (!found)
+		{
+			int i;
+
+			pg_atomic_init_u64(&gsXidMap->last_csn_seconds, 0);
+			gsXidMap->head = 0;
+			gsXidMap->size = global_snapshot_defer_time;
+			gsXidMap->xmin_by_second =
+							ShmemAlloc(sizeof(TransactionId)*gsXidMap->size);
+
+			for (i = 0; i < gsXidMap->size; i++)
+				gsXidMap->xmin_by_second[i] = InvalidTransactionId;
+		}
+	}
+}
+
+/*
+ * GlobalSnapshotStartup
+ *
+ * Set gsXidMap entries and global snapshot xmin to oldestActiveXID at startup.
+ */
+void
+GlobalSnapshotStartup(TransactionId oldestActiveXID)
+{
+	/*
+	 * Run only if we have initialized shared memory and gsXidMap
+	 * is enabled.
+	 */
+	if (IsNormalProcessingMode() && global_snapshot_defer_time > 0)
+	{
+		int i;
+
+		Assert(TransactionIdIsValid(oldestActiveXID));
+		for (i = 0; i < gsXidMap->size; i++)
+			gsXidMap->xmin_by_second[i] = oldestActiveXID;
+		ProcArraySetGlobalSnapshotXmin(oldestActiveXID);
+	}
+}
+
+/*
+ * GlobalSnapshotMapXmin
+ *
+ * Maintain a circular buffer of oldestXmins for several seconds in the past.
+ * This buffer allows shifting oldestXmin into the past when a backend imports
+ * a global transaction. Otherwise old versions of tuples that were needed for
+ * this transaction can be recycled by other processes (vacuum, HOT, etc).
+ *
+ * Locking here is not trivial. Called upon each snapshot creation after
+ * ProcArrayLock is released. Such usage creates several race conditions. It
+ * is possible that a backend that got global_csn calls GlobalSnapshotMapXmin()
+ * only after other backends managed to get a snapshot and complete their
+ * GlobalSnapshotMapXmin() call, or even committed. This is safe because
+ *
+ *		* We already hold our xmin in MyPgXact, so our snapshot will not be
+ *		  harmed even though ProcArrayLock is released.
+ *
+ *		* snapshot_global_csn is always pessimistically rounded up to the next
+ *		  second.
+ *
+ *		* For performance reasons, xmin value for particular second is filled
+ *		  only once. Because of that instead of writing to buffer just our
+ *		  xmin (which is enough for our snapshot), we bump oldestXmin there --
+ *		  it mitigates the possibility of damaging someone else's snapshot by
+ *		  writing to the buffer too advanced value in case of slowness of
+ *		  another backend who generated csn earlier, but didn't manage to
+ *		  insert it before us.
+ *
+ *		* If GlobalSnapshotMapXmin() finds a gap of several seconds between
+ *		  current call and latest completed call then it should fill that gap
+ *		  with latest known values instead of new one. Otherwise it is
+ *		  possible (however highly unlikely) that this gap also happened
+ *		  between taking snapshot and call to GlobalSnapshotMapXmin() for some
+ *		  backend. And we are at risk to fill circular buffer with
+ *		  oldestXmin's that are bigger than they actually were.
+ */
+void
+GlobalSnapshotMapXmin(GlobalCSN snapshot_global_csn)
+{
+	int offset, gap, i;
+	GlobalCSN csn_seconds;
+	GlobalCSN last_csn_seconds, csn_delta;
+	volatile TransactionId oldest_deferred_xmin;
+	TransactionId current_oldest_xmin, previous_oldest_xmin;
+
+	/* Callers should check config values */
+	Assert(global_snapshot_defer_time > 0);
+	Assert(gsXidMap != NULL);
+
+	/*
+	 * Round up global_csn to the next second -- pessimistically and safely.
+	 */
+	csn_seconds = (snapshot_global_csn / NSECS_PER_SEC + 1);
+
+	/*
+	 * Fast-path check. Avoid taking exclusive GlobalSnapshotXidMapLock lock
+	 * if oldestXid was already written to xmin_by_second[] for this rounded
+	 * global_csn.
+	 */
+	if (pg_atomic_read_u64(&gsXidMap->last_csn_seconds) >= csn_seconds)
+		return;
+
+	/* Ok, we have new entry (or entries) */
+	LWLockAcquire(GlobalSnapshotXidMapLock, LW_EXCLUSIVE);
+
+	/* Re-check last_csn_seconds under lock */
+	last_csn_seconds = pg_atomic_read_u64(&gsXidMap->last_csn_seconds);
+	if (last_csn_seconds >= csn_seconds)
+	{
+		LWLockRelease(GlobalSnapshotXidMapLock);
+		return;
+	}
+	pg_atomic_write_u64(&gsXidMap->last_csn_seconds, csn_seconds);
+
+	/*
+	 * Count oldest_xmin.
+	 *
+	 * It was possible to calculate oldest_xmin during corresponding snapshot
+	 * creation, but GetSnapshotData() intentionally reads only PgXact, but not
+	 * PgProc. And we need info about originalXmin (see comment to gsXidMap)
+	 * which is stored in PgProc because of threats in comments around PgXact
+	 * about extending it with new fields. So just calculate oldest_xmin again,
+	 * that anyway happens quite rarely.
+	 */
+	current_oldest_xmin = GetOldestXmin(NULL, PROCARRAY_NON_IMPORTED_XMIN);
+
+	previous_oldest_xmin = gsXidMap->xmin_by_second[gsXidMap->head];
+
+	Assert(TransactionIdIsNormal(current_oldest_xmin));
+	Assert(TransactionIdIsNormal(previous_oldest_xmin));
+
+	csn_delta = csn_seconds - last_csn_seconds;
+	offset = csn_seconds % gsXidMap->size;
+
+	/* Sanity check in 64 bits, before the gap is clamped and narrowed to int */
+	Assert(csn_delta >= 1);
+	Assert((gsXidMap->head + csn_delta) % gsXidMap->size == offset);
+
+	gap = (int) Min(csn_delta, (GlobalCSN) gsXidMap->size);
+	gsXidMap->head = offset;
+
+	/* Fill new entry with current_oldest_xmin */
+	gsXidMap->xmin_by_second[offset] = current_oldest_xmin;
+
+	/*
+	 * If we have gap then fill it with previous_oldest_xmin for reasons
+	 * outlined in comment above this function.
+	 */
+	for (i = 1; i < gap; i++)
+	{
+		offset = (offset + gsXidMap->size - 1) % gsXidMap->size;
+		gsXidMap->xmin_by_second[offset] = previous_oldest_xmin;
+	}
+
+	oldest_deferred_xmin =
+		gsXidMap->xmin_by_second[ (gsXidMap->head + 1) % gsXidMap->size ];
+
+	LWLockRelease(GlobalSnapshotXidMapLock);
+
+	/*
+	 * Advance procArray->global_snapshot_xmin after we released
+	 * GlobalSnapshotXidMapLock. Since we gather not xmin but oldestXmin, it
+	 * never goes backwards regardless of how slow we can do that.
+	 */
+	Assert(TransactionIdFollowsOrEquals(oldest_deferred_xmin,
+										ProcArrayGetGlobalSnapshotXmin()));
+	ProcArraySetGlobalSnapshotXmin(oldest_deferred_xmin);
+}
+
+
+/*
+ * GlobalSnapshotToXmin
+ *
+ * Get oldestXmin for snapshot_global_csn (InvalidTransactionId if too old).
+ */
+TransactionId
+GlobalSnapshotToXmin(GlobalCSN snapshot_global_csn)
+{
+	TransactionId xmin;
+	GlobalCSN csn_seconds;
+	volatile GlobalCSN last_csn_seconds;
+
+	/* Callers should check config values */
+	Assert(global_snapshot_defer_time > 0);
+	Assert(gsXidMap != NULL);
+
+	/* Round down to get a conservative (older) estimate */
+	csn_seconds = (snapshot_global_csn / NSECS_PER_SEC);
+
+	LWLockAcquire(GlobalSnapshotXidMapLock, LW_SHARED);
+	last_csn_seconds = pg_atomic_read_u64(&gsXidMap->last_csn_seconds);
+	if (csn_seconds > last_csn_seconds)
+	{
+		/* we don't have entry for this global_csn yet, return latest known */
+		xmin = gsXidMap->xmin_by_second[gsXidMap->head];
+	}
+	else if (last_csn_seconds - csn_seconds < gsXidMap->size)
+	{
+		/* we are good, retrieve value from our map */
+		Assert(last_csn_seconds % gsXidMap->size == gsXidMap->head);
+		xmin = gsXidMap->xmin_by_second[csn_seconds % gsXidMap->size];
+	}
+	else
+	{
+		/* requested global_csn is too old, let caller know */
+		xmin = InvalidTransactionId;
+	}
+	LWLockRelease(GlobalSnapshotXidMapLock);
+
+	return xmin;
+}
+
+/*
+ * GlobalSnapshotGenerate
+ *
+ * Generate GlobalCSN which is actually a local time, forced to be always
+ * increasing.  Nanoseconds are used, if such time resolution is available,
+ * since millions of read transactions per second are not uncommon nowadays.
+ * Pass locked = true only when the caller already holds gsState->lock.
+ */
+GlobalCSN
+GlobalSnapshotGenerate(bool locked)
+{
+	instr_time	current_time;
+	GlobalCSN	global_csn;
+
+	Assert(track_global_snapshots || global_snapshot_defer_time > 0);
+
+	/*
+	 * TODO: create some macro that adds a small random shift to current time.
+	 */
+	INSTR_TIME_SET_CURRENT(current_time);
+	global_csn = (GlobalCSN) INSTR_TIME_GET_NANOSEC(current_time);
+
+	/* TODO: change to atomics? */
+	if (!locked)
+		SpinLockAcquire(&gsState->lock);
+
+	if (global_csn <= gsState->last_global_csn)
+		global_csn = ++gsState->last_global_csn;
+	else
+		gsState->last_global_csn = global_csn;
+
+	if (!locked)
+		SpinLockRelease(&gsState->lock);
+
+	return global_csn;
+}
+
+/*
+ * GlobalSnapshotSync
+ *
+ * Due to time desynchronization on different nodes we can receive global_csn
+ * which is greater than global_csn on this node. To preserve proper isolation
+ * this node needs to wait until the local clock reaches such global_csn.
+ *
+ * This should happen relatively rarely if nodes are running NTP/PTP/etc.
+ * Complain if wait time is more than SNAP_DESYNC_COMPLAIN.
+ */
+void
+GlobalSnapshotSync(GlobalCSN remote_gcsn)
+{
+	GlobalCSN	local_gcsn;
+	GlobalCSN	delta;
+
+	Assert(track_global_snapshots);
+
+	for(;;)
+	{
+		SpinLockAcquire(&gsState->lock);
+		if (gsState->last_global_csn > remote_gcsn)
+		{
+			/* Everything is fine */
+			SpinLockRelease(&gsState->lock);
+			return;
+		}
+		else if ((local_gcsn = GlobalSnapshotGenerate(true)) >= remote_gcsn)
+		{
+			/*
+			 * Everything is fine too, but last_global_csn wasn't updated for
+			 * some time.
+			 */
+			SpinLockRelease(&gsState->lock);
+			return;
+		}
+		SpinLockRelease(&gsState->lock);
+
+		/* Okay we need to sleep now */
+		delta = remote_gcsn - local_gcsn;
+		if (delta > SNAP_DESYNC_COMPLAIN)
+			ereport(WARNING,
+				(errmsg("remote global snapshot exceeds ours by more than a second"),
+				 errhint("Consider running NTPd on servers participating in global transaction")));
+
+		/* TODO: report this sleeptime somewhere? */
+		pg_usleep((long) (delta/NSECS_PER_USEC));
+
+		/*
+		 * Loop back and re-check, to ensure that we actually slept for the
+		 * specified amount of time.
+		 */
+	}
+
+	Assert(false); /* Should not happen */
+	return;
+}
+
+/*
+ * TransactionIdGetGlobalCSN
+ *
+ * Get GlobalCSN for specified TransactionId taking care about special xids,
+ * xids beyond TransactionXmin and InDoubt states.
+ */
+GlobalCSN
+TransactionIdGetGlobalCSN(TransactionId xid)
+{
+	GlobalCSN global_csn;
+
+	Assert(track_global_snapshots);
+
+	/* Handle permanent TransactionId's for which we don't have mapping */
+	if (!TransactionIdIsNormal(xid))
+	{
+		if (xid == InvalidTransactionId)
+			return AbortedGlobalCSN;
+		if (xid == FrozenTransactionId || xid == BootstrapTransactionId)
+			return FrozenGlobalCSN;
+		Assert(false); /* Should not happen */
+	}
+
+	/*
+	 * For xids that precede TransactionXmin GlobalCSNLog can be already
+	 * trimmed but we know that such transaction is definitely not concurrently
+	 * running according to any snapshot including timetravel ones. Callers
+	 * should check TransactionDidCommit after.
+	 */
+	if (TransactionIdPrecedes(xid, TransactionXmin))
+		return FrozenGlobalCSN;
+
+	/* Read GlobalCSN from SLRU */
+	global_csn = GlobalCSNLogGetCSN(xid);
+
+	/*
+	 * If we faced InDoubt state then transaction is being committed and we
+	 * should wait until GlobalCSN will be assigned so that visibility check
+	 * could decide whether tuple is in snapshot. See also comments in
+	 * GlobalSnapshotPrecommit().
+	 */
+	if (GlobalCSNIsInDoubt(global_csn))
+	{
+		XactLockTableWait(xid, NULL, NULL, XLTW_None);
+		global_csn = GlobalCSNLogGetCSN(xid);
+		Assert(GlobalCSNIsNormal(global_csn) ||
+				GlobalCSNIsAborted(global_csn));
+	}
+
+	Assert(GlobalCSNIsNormal(global_csn) ||
+			GlobalCSNIsInProgress(global_csn) ||
+			GlobalCSNIsAborted(global_csn));
+
+	return global_csn;
+}
+
+/*
+ * XidInvisibleInGlobalSnapshot
+ *
+ * Version of XidInMVCCSnapshot for global transactions. For non-imported
+ * global snapshots this should give the same results as XidInLocalMVCCSnapshot
+ * (except that aborts will be shown as invisible without going to clog). To
+ * ensure such behaviour XidInMVCCSnapshot is coated with asserts that check
+ * the equivalence of XidInvisibleInGlobalSnapshot/XidInLocalMVCCSnapshot in
+ * case of an ordinary snapshot.
+ */
+bool
+XidInvisibleInGlobalSnapshot(TransactionId xid, Snapshot snapshot)
+{
+	GlobalCSN csn;
+
+	Assert(track_global_snapshots);
+
+	csn = TransactionIdGetGlobalCSN(xid);
+
+	if (GlobalCSNIsNormal(csn))
+	{
+		if (csn < snapshot->global_csn)
+			return false;
+		else
+			return true;
+	}
+	else if (GlobalCSNIsFrozen(csn))
+	{
+		/* It is a bootstrap or frozen transaction, hence visible */
+		return false;
+	}
+	else
+	{
+		/* It is aborted or in-progress, hence invisible */
+		Assert(GlobalCSNIsAborted(csn) || GlobalCSNIsInProgress(csn));
+		if (GlobalCSNIsAborted(csn))
+			Assert(TransactionIdDidAbort(xid));
+		return true;
+	}
+}
+
+
+/*****************************************************************************
+ * Functions to handle distributed commit on transaction coordinator:
+ * GlobalSnapshotPrepareCurrent() / GlobalSnapshotAssignCsnCurrent().
+ * Corresponding functions for remote nodes are defined in twophase.c:
+ * pg_global_snapshot_prepare/pg_global_snapshot_assign.
+ *****************************************************************************/
+
+
+/*
+ * GlobalSnapshotPrepareCurrent
+ *
+ * Set InDoubt state for currently active transaction and return commit's
+ * global snapshot.
+ */
+GlobalCSN
+GlobalSnapshotPrepareCurrent(void)
+{
+	TransactionId xid = GetCurrentTransactionIdIfAny();
+
+	if (!track_global_snapshots)
+		ereport(ERROR,
+			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("could not prepare transaction for global commit"),
+				errhint("Make sure the configuration parameter \"%s\" is enabled.",
+						"track_global_snapshots")));
+
+	if (TransactionIdIsValid(xid))
+	{
+		TransactionId *subxids;
+		int nsubxids = xactGetCommittedChildren(&subxids);
+		GlobalCSNLogSetCSN(xid, nsubxids,
+									subxids, InDoubtGlobalCSN);
+	}
+
+	/* Nothing to write if we don't have xid */
+
+	return GlobalSnapshotGenerate(false);
+}
+
+/*
+ * GlobalSnapshotAssignCsnCurrent
+ *
+ * Assign GlobalCSN for currently active transaction. GlobalCSN is supposedly
+ * the maximum of the values returned by GlobalSnapshotPrepareCurrent and
+ * pg_global_snapshot_prepare.
+ */
+void
+GlobalSnapshotAssignCsnCurrent(GlobalCSN global_csn)
+{
+	if (!track_global_snapshots)
+		ereport(ERROR,
+			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("could not prepare transaction for global commit"),
+				errhint("Make sure the configuration parameter \"%s\" is enabled.",
+						"track_global_snapshots")));
+
+	if (!GlobalCSNIsNormal(global_csn))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("pg_global_snapshot_assign expects normal global_csn")));
+
+	/* Skip empty transactions */
+	if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
+		return;
+
+	/* Set global_csn and defuse ProcArrayEndTransaction from assigning one */
+	pg_atomic_write_u64(&MyProc->assignedGlobalCsn, global_csn);
+}
+
+
+/*****************************************************************************
+ * Functions to handle global and local transactions commit.
+ *
+ * For local transactions GlobalSnapshotPrecommit sets InDoubt state before
+ * ProcArrayEndTransaction is called and transaction data potentially becomes
+ * visible to other backends. ProcArrayEndTransaction (or ProcArrayRemove in
+ * twophase case) then acquires global_csn under ProcArray lock and stores it
+ * in proc->assignedGlobalCsn. It's important that global_csn for commit is
+ * generated under ProcArray lock, otherwise global and local snapshots won't
+ * be equivalent. A subsequent call to GlobalSnapshotCommit will write
+ * proc->assignedGlobalCsn to GlobalCSNLog.
+ *
+ * The same rules apply to global transactions, except that global_csn is
+ * already assigned by GlobalSnapshotAssignCsnCurrent/pg_global_snapshot_assign
+ * and GlobalSnapshotPrecommit is basically a no-op.
+ *
+ * GlobalSnapshotAbort is slightly different compared to commit because abort
+ * can skip InDoubt phase and can be called for a transaction subtree.
+ *****************************************************************************/
+
+
+/*
+ * GlobalSnapshotAbort
+ *
+ * Abort transaction in GlobalCsnLog. We can skip InDoubt state for aborts
+ * since no concurrent transactions are allowed to see aborted data anyway.
+ */
+void
+GlobalSnapshotAbort(PGPROC *proc, TransactionId xid,
+					int nsubxids, TransactionId *subxids)
+{
+	if (!track_global_snapshots)
+		return;
+
+	GlobalCSNLogSetCSN(xid, nsubxids, subxids, AbortedGlobalCSN);
+
+	/*
+	 * Reset assignedGlobalCsn anyway, as it was possibly set in
+	 * GlobalSnapshotAssignCsnCurrent.
+	 */
+	pg_atomic_write_u64(&proc->assignedGlobalCsn, InProgressGlobalCSN);
+}
+
+/*
+ * GlobalSnapshotPrecommit
+ *
+ * Set InDoubt status for local transaction that we are going to commit.
+ * This step is needed to achieve consistency between local snapshots and
+ * global csn-based snapshots. We don't hold ProcArray lock while writing
+ * csn for transaction in SLRU but instead we set InDoubt status before
+ * transaction is deleted from ProcArray so the readers who will read csn
+ * in the gap between ProcArray removal and GlobalCSN assignment can wait
+ * until GlobalCSN is finally assigned. See also TransactionIdGetGlobalCSN().
+ *
+ * For global transaction this does nothing as InDoubt state was written
+ * earlier.
+ *
+ * This should be called only from parallel group leader before backend is
+ * deleted from ProcArray.
+ */
+void
+GlobalSnapshotPrecommit(PGPROC *proc, TransactionId xid,
+					int nsubxids, TransactionId *subxids)
+{
+	GlobalCSN oldAssignedGlobalCsn = InProgressGlobalCSN;
+	bool in_progress;
+
+	if (!track_global_snapshots)
+		return;
+
+	/* Set InDoubt status if it is a local transaction (no CSN assigned yet) */
+	in_progress = pg_atomic_compare_exchange_u64(&proc->assignedGlobalCsn,
+												 &oldAssignedGlobalCsn,
+												 InDoubtGlobalCSN);
+	if (in_progress)
+	{
+		Assert(GlobalCSNIsInProgress(oldAssignedGlobalCsn));
+		GlobalCSNLogSetCSN(xid, nsubxids,
+						   subxids, InDoubtGlobalCSN);
+	}
+	else
+	{
+		/* Otherwise we should have valid GlobalCSN by this time */
+		Assert(GlobalCSNIsNormal(oldAssignedGlobalCsn));
+		/* Also global transaction should already be in InDoubt state */
+		Assert(GlobalCSNIsInDoubt(GlobalCSNLogGetCSN(xid)));
+	}
+}
+
+/*
+ * GlobalSnapshotCommit
+ *
+ * Write GlobalCSN that was acquired earlier to GlobalCsnLog. Should be
+ * preceded by GlobalSnapshotPrecommit() so readers can wait until we have
+ * finished writing to SLRU.
+ *
+ * Should be called after ProcArrayEndTransaction, but before releasing
+ * transaction locks, so that TransactionIdGetGlobalCSN can wait on this
+ * lock for GlobalCSN.
+ */
+void
+GlobalSnapshotCommit(PGPROC *proc, TransactionId xid,
+					int nsubxids, TransactionId *subxids)
+{
+	volatile GlobalCSN assigned_global_csn;
+
+	if (!track_global_snapshots)
+		return;
+
+	if (!TransactionIdIsValid(xid))
+	{
+		assigned_global_csn = pg_atomic_read_u64(&proc->assignedGlobalCsn);
+		Assert(GlobalCSNIsInProgress(assigned_global_csn));
+		return;
+	}
+
+	/* Finally write resulting GlobalCSN in SLRU */
+	assigned_global_csn = pg_atomic_read_u64(&proc->assignedGlobalCsn);
+	Assert(GlobalCSNIsNormal(assigned_global_csn));
+	GlobalCSNLogSetCSN(xid, nsubxids,
+						   subxids, assigned_global_csn);
+
+	/* Reset for next transaction */
+	pg_atomic_write_u64(&proc->assignedGlobalCsn, InProgressGlobalCSN);
+}
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 3aee5e50c5..2fe8b93617 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -77,6 +77,7 @@
 #include <unistd.h>
 
 #include "access/commit_ts.h"
+#include "access/global_snapshot.h"
 #include "access/global_csn_log.h"
 #include "access/htup_details.h"
 #include "access/subtrans.h"
@@ -1526,9 +1527,35 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 									   hdr->nabortrels, abortrels,
 									   gid);
 
+	/*
+	 * GlobalSnapshot callbacks that should be called right before we are
+	 * going to become visible. See the comments on these functions for details.
+	 */
+	if (isCommit)
+		GlobalSnapshotPrecommit(proc, xid, hdr->nsubxacts, children);
+	else
+		GlobalSnapshotAbort(proc, xid, hdr->nsubxacts, children);
+
+
 	ProcArrayRemove(proc, latestXid);
 
 	/*
+	 * Stamp our transaction with GlobalCSN in GlobalCsnLog.
+	 * Should be called after ProcArrayRemove, but before releasing
+	 * transaction locks, since TransactionIdGetGlobalCSN relies on
+	 * XactLockTableWait to await global_csn.
+	 */
+	if (isCommit)
+	{
+		GlobalSnapshotCommit(proc, xid, hdr->nsubxacts, children);
+	}
+	else
+	{
+		Assert(GlobalCSNIsInProgress(
+				   pg_atomic_read_u64(&proc->assignedGlobalCsn)));
+	}
+
+	/*
 	 * In case we fail while running the callbacks, mark the gxact invalid so
 	 * no one else will try to commit/rollback, and so it will be recycled if
 	 * we fail after this point.  It is still locked by our backend so it
@@ -2513,3 +2540,132 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
 
 	return;
 }
+
+/*
+ * GlobalSnapshotPrepareTwophase
+ *
+ * Set InDoubt state for the prepared transaction identified by gid and return
+ * commit's global snapshot.
+ *
+ * Twophase counterpart of GlobalSnapshotPrepareCurrent().
+ */
+static GlobalCSN
+GlobalSnapshotPrepareTwophase(const char *gid)
+{
+	GlobalTransaction gxact;
+	PGXACT	   *pgxact;
+	char	   *buf;
+	TransactionId xid;
+	xl_xact_parsed_prepare parsed;
+
+	if (!track_global_snapshots)
+		ereport(ERROR,
+			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("could not prepare transaction for global commit"),
+				errhint("Make sure the configuration parameter \"%s\" is enabled.",
+						"track_global_snapshots")));
+
+	/*
+	 * Validate the GID, and lock the GXACT to ensure that two backends do not
+	 * try to access the same GID at once.
+	 */
+	gxact = LockGXact(gid, GetUserId());
+	pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+	xid = pgxact->xid;
+
+	if (gxact->ondisk)
+		buf = ReadTwoPhaseFile(xid, true);
+	else
+		XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
+
+	ParsePrepareRecord(0, buf, &parsed);
+
+	GlobalCSNLogSetCSN(xid, parsed.nsubxacts,
+					parsed.subxacts, InDoubtGlobalCSN);
+
+	/* Unlock our GXACT and forget it, so abort/exit cleanup won't touch it */
+	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+	gxact->locking_backend = InvalidBackendId;
+	LWLockRelease(TwoPhaseStateLock);
+	MyLockedGxact = NULL;
+
+	pfree(buf);
+
+	return GlobalSnapshotGenerate(false);
+}
+
+/*
+ * SQL interface to GlobalSnapshotPrepareTwophase(gid); returns the GlobalCSN.
+ *
+ * TODO: Rewrite this as PREPARE TRANSACTION 'gid' RETURNING SNAPSHOT
+ */
+Datum
+pg_global_snapshot_prepare(PG_FUNCTION_ARGS)
+{
+	const char *gid = text_to_cstring(PG_GETARG_TEXT_PP(0));
+	GlobalCSN	global_csn;
+
+	global_csn = GlobalSnapshotPrepareTwophase(gid);
+
+	PG_RETURN_INT64(global_csn);
+}
+
+
+/*
+ * GlobalSnapshotAssignCsnTwoPhase
+ *
+ * Assign GlobalCSN to the prepared transaction identified by gid. GlobalCSN is
+ * supposedly the maximum of the values returned by
+ * GlobalSnapshotPrepareCurrent and pg_global_snapshot_prepare.
+ *
+ * Twophase counterpart of GlobalSnapshotAssignCsnCurrent().
+ */
+static void
+GlobalSnapshotAssignCsnTwoPhase(const char *gid, GlobalCSN global_csn)
+{
+	GlobalTransaction gxact;
+	PGPROC	   *proc;
+
+	if (!track_global_snapshots)
+		ereport(ERROR,
+			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("could not prepare transaction for global commit"),
+				errhint("Make sure the configuration parameter \"%s\" is enabled.",
+						"track_global_snapshots")));
+
+	if (!GlobalCSNIsNormal(global_csn))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("pg_global_snapshot_assign expects normal global_csn")));
+
+	/*
+	 * Validate the GID, and lock the GXACT to ensure that two backends do not
+	 * try to access the same GID at once.
+	 */
+	gxact = LockGXact(gid, GetUserId());
+	proc = &ProcGlobal->allProcs[gxact->pgprocno];
+
+	/* Set global_csn and defuse ProcArrayRemove from assigning one. */
+	pg_atomic_write_u64(&proc->assignedGlobalCsn, global_csn);
+
+	/* Unlock our GXACT and forget it, so abort/exit cleanup won't touch it */
+	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+	gxact->locking_backend = InvalidBackendId;
+	LWLockRelease(TwoPhaseStateLock);
+	MyLockedGxact = NULL;
+}
+
+/*
+ * SQL interface to GlobalSnapshotAssignCsnTwoPhase(gid, global_csn).
+ *
+ * TODO: Rewrite this as COMMIT PREPARED 'gid' SNAPSHOT 'global_csn'
+ */
+Datum
+pg_global_snapshot_assign(PG_FUNCTION_ARGS)
+{
+	const char *gid = text_to_cstring(PG_GETARG_TEXT_PP(0));
+	GlobalCSN	global_csn = PG_GETARG_INT64(1);
+
+	GlobalSnapshotAssignCsnTwoPhase(gid, global_csn);
+	PG_RETURN_VOID();
+}
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 9aa63c8792..0086adadf1 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -21,6 +21,7 @@
 #include <unistd.h>
 
 #include "access/commit_ts.h"
+#include "access/global_snapshot.h"
 #include "access/multixact.h"
 #include "access/parallel.h"
 #include "access/subtrans.h"
@@ -1341,6 +1342,14 @@ RecordTransactionCommit(void)
 
 	/* Reset XactLastRecEnd until the next transaction writes something */
 	XactLastRecEnd = 0;
+
+	/*
+	 * Mark our transaction as InDoubt in GlobalCsnLog and get ready for
+	 * commit.
+	 */
+	if (markXidCommitted)
+		GlobalSnapshotPrecommit(MyProc, xid, nchildren, children);
+
 cleanup:
 	/* Clean up local data */
 	if (rels)
@@ -1602,6 +1611,11 @@ RecordTransactionAbort(bool isSubXact)
 	 */
 	TransactionIdAbortTree(xid, nchildren, children);
 
+	/*
+	 * Mark our transaction as Aborted in GlobalCsnLog.
+	 */
+	GlobalSnapshotAbort(MyProc, xid, nchildren, children);
+
 	END_CRIT_SECTION();
 
 	/* Compute latestXid while we have the child XIDs handy */
@@ -2060,6 +2074,21 @@ CommitTransaction(void)
 	ProcArrayEndTransaction(MyProc, latestXid);
 
 	/*
+	 * Stamp our transaction with GlobalCSN in GlobalCsnLog.
+	 * Should be called after ProcArrayEndTransaction, but before releasing
+	 * transaction locks.
+	 */
+	if (!is_parallel_worker)
+	{
+		TransactionId  xid = GetTopTransactionIdIfAny();
+		TransactionId *subxids;
+		int			   nsubxids;
+
+		nsubxids = xactGetCommittedChildren(&subxids);
+		GlobalSnapshotCommit(MyProc, xid, nsubxids, subxids);
+	}
+
+	/*
 	 * This is all post-commit cleanup.  Note that if an error is raised here,
 	 * it's too late to abort the transaction.  This should be just
 	 * noncritical resource releasing.
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index ca0d934c76..edb0d07aca 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7070,6 +7070,7 @@ StartupXLOG(void)
 			StartupCLOG();
 			StartupGlobalCSNLog(oldestActiveXID);
 			StartupSUBTRANS(oldestActiveXID);
+			GlobalSnapshotStartup(oldestActiveXID);
 
 			/*
 			 * If we're beginning at a shutdown checkpoint, we know that
@@ -7869,6 +7870,7 @@ StartupXLOG(void)
 		StartupCLOG();
 		StartupGlobalCSNLog(oldestActiveXID);
 		StartupSUBTRANS(oldestActiveXID);
+		GlobalSnapshotStartup(oldestActiveXID);
 	}
 
 	/*
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2af468fc6a..3a04f6a824 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
 #include "access/nbtree.h"
 #include "access/subtrans.h"
 #include "access/twophase.h"
+#include "access/global_snapshot.h"
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -147,6 +148,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 		size = add_size(size, WalSndShmemSize());
 		size = add_size(size, WalRcvShmemSize());
 		size = add_size(size, ApplyLauncherShmemSize());
+		size = add_size(size, GlobalSnapshotShmemSize());
 		size = add_size(size, SnapMgrShmemSize());
 		size = add_size(size, BTreeShmemSize());
 		size = add_size(size, SyncScanShmemSize());
@@ -273,6 +275,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 	SyncScanShmemInit();
 	AsyncShmemInit();
 	BackendRandomShmemInit();
+	GlobalSnapshotShmemInit();
 
 #ifdef EXEC_BACKEND
 
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 64ab249615..fb7f74d4cd 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -47,6 +47,7 @@
 
 #include "access/clog.h"
 #include "access/global_csn_log.h"
+#include "access/global_snapshot.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/twophase.h"
@@ -91,6 +92,8 @@ typedef struct ProcArrayStruct
 	TransactionId replication_slot_xmin;
 	/* oldest catalog xmin of any replication slot */
 	TransactionId replication_slot_catalog_xmin;
+	/* xmin of oldest active global snapshot */
+	TransactionId global_snapshot_xmin;
 
 	/* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */
 	int			pgprocnos[FLEXIBLE_ARRAY_MEMBER];
@@ -246,6 +249,7 @@ CreateSharedProcArray(void)
 		procArray->lastOverflowedXid = InvalidTransactionId;
 		procArray->replication_slot_xmin = InvalidTransactionId;
 		procArray->replication_slot_catalog_xmin = InvalidTransactionId;
+		procArray->global_snapshot_xmin = InvalidTransactionId;
 	}
 
 	allProcs = ProcGlobal->allProcs;
@@ -352,6 +356,17 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
 		if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
 								  latestXid))
 			ShmemVariableCache->latestCompletedXid = latestXid;
+
+		/*
+		 * Assign global csn while holding ProcArrayLock for non-global
+		 * COMMIT PREPARED. After the lock is released, a subsequent
+		 * GlobalSnapshotCommit() will write this value to GlobalCsnLog.
+		 *
+		 * In case of a global commit, proc->assignedGlobalCsn has already
+		 * been set by a prior GlobalSnapshotAssignCsnTwoPhase().
+		 */
+		if (GlobalCSNIsInDoubt(pg_atomic_read_u64(&proc->assignedGlobalCsn)))
+			pg_atomic_write_u64(&proc->assignedGlobalCsn, GlobalSnapshotGenerate(false));
 	}
 	else
 	{
@@ -432,6 +447,8 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
 
 		proc->lxid = InvalidLocalTransactionId;
 		pgxact->xmin = InvalidTransactionId;
+		proc->originalXmin = InvalidTransactionId;
+
 		/* must be cleared with xid/xmin: */
 		pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
 		pgxact->delayChkpt = false; /* be sure this is cleared in abort */
@@ -454,6 +471,8 @@ ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact,
 	pgxact->xid = InvalidTransactionId;
 	proc->lxid = InvalidLocalTransactionId;
 	pgxact->xmin = InvalidTransactionId;
+	proc->originalXmin = InvalidTransactionId;
+
 	/* must be cleared with xid/xmin: */
 	pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
 	pgxact->delayChkpt = false; /* be sure this is cleared in abort */
@@ -467,6 +486,20 @@ ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact,
 	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
 							  latestXid))
 		ShmemVariableCache->latestCompletedXid = latestXid;
+
+	/*
+	 * Assign global csn while holding ProcArrayLock for non-global
+	 * COMMIT. After the lock is released, a subsequent
+	 * GlobalSnapshotCommit() will write this value to GlobalCsnLog.
+	 *
+	 * In case of a global commit, proc->assignedGlobalCsn has already been
+	 * set by a prior GlobalSnapshotAssignCsnCurrent().
+	 *
+	 * TODO: in case of group commit we can generate one GlobalSnapshot for
+	 * the whole group to save time on timestamp acquisition.
+	 */
+	if (GlobalCSNIsInDoubt(pg_atomic_read_u64(&proc->assignedGlobalCsn)))
+		pg_atomic_write_u64(&proc->assignedGlobalCsn, GlobalSnapshotGenerate(false));
 }
 
 /*
@@ -616,6 +649,7 @@ ProcArrayClearTransaction(PGPROC *proc)
 	pgxact->xid = InvalidTransactionId;
 	proc->lxid = InvalidLocalTransactionId;
 	pgxact->xmin = InvalidTransactionId;
+	proc->originalXmin = InvalidTransactionId;
 	proc->recoveryConflictPending = false;
 
 	/* redundant, but just in case */
@@ -1320,6 +1354,7 @@ GetOldestXmin(Relation rel, int flags)
 
 	volatile TransactionId replication_slot_xmin = InvalidTransactionId;
 	volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
+	volatile TransactionId global_snapshot_xmin = InvalidTransactionId;
 
 	/*
 	 * If we're not computing a relation specific limit, or if a shared
@@ -1356,8 +1391,9 @@ GetOldestXmin(Relation rel, int flags)
 			proc->databaseId == MyDatabaseId ||
 			proc->databaseId == 0)	/* always include WalSender */
 		{
-			/* Fetch xid just once - see GetNewTransactionId */
+			/* Fetch both xids just once - see GetNewTransactionId */
 			TransactionId xid = pgxact->xid;
+			TransactionId original_xmin = proc->originalXmin;
 
 			/* First consider the transaction's own Xid, if any */
 			if (TransactionIdIsNormal(xid) &&
@@ -1370,8 +1406,17 @@ GetOldestXmin(Relation rel, int flags)
 			 * We must check both Xid and Xmin because a transaction might
 			 * have an Xmin but not (yet) an Xid; conversely, if it has an
 			 * Xid, that could determine some not-yet-set Xmin.
+			 *
+			 * When oldestXmin is calculated for GlobalSnapshotMapXmin(),
+			 * proc->originalXmin (if valid) is used instead of pgxact->xmin.
+			 * See the comments on GlobalSnapshotMapXmin() for details.
 			 */
-			xid = pgxact->xmin; /* Fetch just once */
+			if ((flags & PROCARRAY_NON_IMPORTED_XMIN) &&
+					TransactionIdIsValid(original_xmin))
+				xid = original_xmin;
+			else
+				xid = pgxact->xmin; /* Fetch just once */
+
 			if (TransactionIdIsNormal(xid) &&
 				TransactionIdPrecedes(xid, result))
 				result = xid;
@@ -1381,6 +1426,7 @@ GetOldestXmin(Relation rel, int flags)
 	/* fetch into volatile var while ProcArrayLock is held */
 	replication_slot_xmin = procArray->replication_slot_xmin;
 	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
+	global_snapshot_xmin = procArray->global_snapshot_xmin;
 
 	if (RecoveryInProgress())
 	{
@@ -1422,6 +1468,11 @@ GetOldestXmin(Relation rel, int flags)
 			result = FirstNormalTransactionId;
 	}
 
+	if (!(flags & PROCARRAY_NON_IMPORTED_XMIN) &&
+		TransactionIdIsValid(global_snapshot_xmin) &&
+		NormalTransactionIdPrecedes(global_snapshot_xmin, result))
+		result = global_snapshot_xmin;
+
 	/*
 	 * Check whether there are replication slots requiring an older xmin.
 	 */
@@ -1517,6 +1568,8 @@ GetSnapshotData(Snapshot snapshot)
 	bool		suboverflowed = false;
 	volatile TransactionId replication_slot_xmin = InvalidTransactionId;
 	volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
+	volatile TransactionId global_snapshot_xmin = InvalidTransactionId;
+	volatile GlobalCSN	   global_csn = FrozenGlobalCSN;
 
 	Assert(snapshot != NULL);
 
@@ -1705,10 +1758,18 @@ GetSnapshotData(Snapshot snapshot)
 	/* fetch into volatile var while ProcArrayLock is held */
 	replication_slot_xmin = procArray->replication_slot_xmin;
 	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
+	global_snapshot_xmin = procArray->global_snapshot_xmin;
 
 	if (!TransactionIdIsValid(MyPgXact->xmin))
 		MyPgXact->xmin = TransactionXmin = xmin;
 
+	/*
+	 * Take GlobalCSN under ProcArrayLock so the local/global snapshot stays
+	 * synchronized.
+	 */
+	if (track_global_snapshots)
+		global_csn = GlobalSnapshotGenerate(false);
+
 	LWLockRelease(ProcArrayLock);
 
 	/*
@@ -1724,6 +1785,10 @@ GetSnapshotData(Snapshot snapshot)
 	if (!TransactionIdIsNormal(RecentGlobalXmin))
 		RecentGlobalXmin = FirstNormalTransactionId;
 
+	if (TransactionIdIsValid(global_snapshot_xmin) &&
+		TransactionIdPrecedes(global_snapshot_xmin, RecentGlobalXmin))
+		RecentGlobalXmin = global_snapshot_xmin;
+
 	/* Check whether there's a replication slot requiring an older xmin. */
 	if (TransactionIdIsValid(replication_slot_xmin) &&
 		NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
@@ -1779,6 +1844,12 @@ GetSnapshotData(Snapshot snapshot)
 		MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin);
 	}
 
+	snapshot->imported_global_csn = false;
+	snapshot->global_csn = global_csn;
+	/* if (global_snapshot_defer_time > 0 && IsNormalProcessingMode()) */
+	if (global_snapshot_defer_time > 0 && IsUnderPostmaster)
+		GlobalSnapshotMapXmin(snapshot->global_csn);
+
 	return snapshot;
 }
 
@@ -3007,6 +3078,24 @@ ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
 	LWLockRelease(ProcArrayLock);
 }
 
+/*
+ * ProcArraySetGlobalSnapshotXmin -- set xmin of oldest active global snapshot
+ */
+void
+ProcArraySetGlobalSnapshotXmin(TransactionId xmin)
+{
+	/* No lock is taken here: we rely on atomic fetch/store of xid */
+	procArray->global_snapshot_xmin = xmin;
+}
+
+/*
+ * ProcArrayGetGlobalSnapshotXmin -- read xmin of oldest active global snapshot
+ */
+TransactionId
+ProcArrayGetGlobalSnapshotXmin(void)
+{
+	return procArray->global_snapshot_xmin;
+}
 
 #define XidCacheRemove(i) \
 	do { \
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index 9615058f29..8ef731d560 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -51,3 +51,4 @@ BackendRandomLock					43
 LogicalRepWorkerLock				44
 CLogTruncationLock					45
 GlobalCSNLogControlLock				46
+GlobalSnapshotXidMapLock			47
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 6f30e082b2..ed497185be 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -37,6 +37,7 @@
 
 #include "access/transam.h"
 #include "access/twophase.h"
+#include "access/global_snapshot.h"
 #include "access/xact.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -417,6 +418,9 @@ InitProcess(void)
 	MyProc->clogGroupMemberLsn = InvalidXLogRecPtr;
 	pg_atomic_init_u32(&MyProc->clogGroupNext, INVALID_PGPROCNO);
 
+	MyProc->originalXmin = InvalidTransactionId;
+	pg_atomic_init_u64(&MyProc->assignedGlobalCsn, InProgressGlobalCSN);
+
 	/*
 	 * Acquire ownership of the PGPROC's latch, so that we can use WaitLatch
 	 * on it.  That allows us to repoint the process latch, which so far
@@ -559,6 +563,7 @@ InitAuxiliaryProcess(void)
 	MyProc->lwWaitMode = 0;
 	MyProc->waitLock = NULL;
 	MyProc->waitProcLock = NULL;
+	MyProc->originalXmin = InvalidTransactionId;
 #ifdef USE_ASSERT_CHECKING
 	{
 		int			i;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 2701528c55..434b672ee2 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -28,6 +28,7 @@
 
 #include "access/commit_ts.h"
 #include "access/gin.h"
+#include "access/global_snapshot.h"
 #include "access/rmgr.h"
 #include "access/transam.h"
 #include "access/twophase.h"
@@ -1024,7 +1025,7 @@ static struct config_bool ConfigureNamesBool[] =
 			gettext_noop("Used to achieve REPEATEBLE READ isolation level for postgres_fdw transactions.")
 		},
 		&track_global_snapshots,
-		true, /* XXX: set true to simplify tesing. XXX2: Seems that RESOURCES_MEM isn't the best catagory */
+		false, /* XXX: Seems that RESOURCES_MEM isn't the best catagory */
 		NULL, NULL, NULL
 	},
 	{
@@ -2349,6 +2350,16 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"global_snapshot_defer_time", PGC_POSTMASTER, REPLICATION_MASTER,
+			gettext_noop("Minimal age of records which allowed to be vacuumed, in seconds."),
+			NULL
+		},
+		&global_snapshot_defer_time,
+		5, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	/*
 	 * See also CheckRequiredParameterValues() if this parameter changes
 	 */
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index c0d3fb8491..ed1237f5c0 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -254,6 +254,8 @@
 				# and comma-separated list of application_name
 				# from standby(s); '*' = all
 #vacuum_defer_cleanup_age = 0	# number of xacts by which cleanup is delayed
+#global_snapshot_defer_time = 5	# minimal age of records that are allowed to
+				# be vacuumed, in seconds
 
 # - Standby Servers -
 
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index edf59efc29..d4cf0710fc 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -48,6 +48,7 @@
 #include <sys/stat.h>
 #include <unistd.h>
 
+#include "access/global_snapshot.h"
 #include "access/transam.h"
 #include "access/xact.h"
 #include "access/xlog.h"
@@ -245,6 +246,8 @@ typedef struct SerializedSnapshotData
 	CommandId	curcid;
 	TimestampTz whenTaken;
 	XLogRecPtr	lsn;
+	GlobalCSN	global_csn;
+	bool		imported_global_csn;
 } SerializedSnapshotData;
 
 Size
@@ -992,7 +995,9 @@ SnapshotResetXmin(void)
 										pairingheap_first(&RegisteredSnapshots));
 
 	if (TransactionIdPrecedes(MyPgXact->xmin, minSnapshot->xmin))
+	{
 		MyPgXact->xmin = minSnapshot->xmin;
+	}
 }
 
 /*
@@ -2081,6 +2086,8 @@ SerializeSnapshot(Snapshot snapshot, char *start_address)
 	serialized_snapshot.curcid = snapshot->curcid;
 	serialized_snapshot.whenTaken = snapshot->whenTaken;
 	serialized_snapshot.lsn = snapshot->lsn;
+	serialized_snapshot.global_csn = snapshot->global_csn;
+	serialized_snapshot.imported_global_csn = snapshot->imported_global_csn;
 
 	/*
 	 * Ignore the SubXID array if it has overflowed, unless the snapshot was
@@ -2155,6 +2162,8 @@ RestoreSnapshot(char *start_address)
 	snapshot->curcid = serialized_snapshot.curcid;
 	snapshot->whenTaken = serialized_snapshot.whenTaken;
 	snapshot->lsn = serialized_snapshot.lsn;
+	snapshot->global_csn = serialized_snapshot.global_csn;
+	snapshot->imported_global_csn = serialized_snapshot.imported_global_csn;
 
 	/* Copy XIDs, if present. */
 	if (serialized_snapshot.xcnt > 0)
@@ -2192,3 +2201,97 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
 {
 	SetTransactionSnapshot(snapshot, NULL, InvalidPid, master_pgproc);
 }
+
+/*
+ * ExportGlobalSnapshot
+ *
+ * Return CurrentSnapshot's global_csn so the caller can extend this
+ * transaction to other nodes; errors unless track_global_snapshots is on.
+ *
+ * TODO: it's better to do this through EXPORT/IMPORT SNAPSHOT syntax and
+ * add some additional checks that the transaction has not yet acquired an
+ * xid, but for this iteration of the patch I don't want to hack on parser.
+ */
+GlobalCSN
+ExportGlobalSnapshot(void)
+{
+	if (!track_global_snapshots)
+		ereport(ERROR,
+			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+			 errmsg("could not export global snapshot"),
+			 errhint("Make sure the configuration parameter \"%s\" is enabled.",
+					 "track_global_snapshots")));
+
+	return CurrentSnapshot->global_csn;
+}
+
+/* SQL-callable wrapper: returns the global_csn from ExportGlobalSnapshot() */
+Datum
+pg_global_snapshot_export(PG_FUNCTION_ARGS)
+{
+	GlobalCSN	global_csn = ExportGlobalSnapshot();
+	PG_RETURN_UINT64(global_csn);
+}
+
+/*
+ * ImportGlobalSnapshot
+ *
+ * Import global_csn and retract this backend's xmin to the value that
+ * GlobalSnapshotToXmin() maps that global_csn to.
+ *
+ * TODO: it's better to do this through EXPORT/IMPORT SNAPSHOT syntax and
+ * add some additional checks that the transaction has not yet acquired an
+ * xid, but for this iteration of the patch I don't want to hack on parser.
+ */
+void
+ImportGlobalSnapshot(GlobalCSN snap_global_csn)
+{
+	volatile TransactionId xmin;
+
+	if (!track_global_snapshots)
+		ereport(ERROR,
+			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+			 errmsg("could not import global snapshot"),
+			 errhint("Make sure the configuration parameter \"%s\" is enabled.",
+					 "track_global_snapshots")));
+
+	if (global_snapshot_defer_time <= 0)
+		ereport(ERROR,
+			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+			 errmsg("could not import global snapshot"),
+			 errhint("Make sure the configuration parameter \"%s\" is positive.",
+					 "global_snapshot_defer_time")));
+
+	/*
+	 * Call GlobalSnapshotToXmin under ProcArrayLock so that the resulting
+	 * xmin cannot be evicted from the map before we have installed it as
+	 * our backend's xmin.
+	 */
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+	xmin = GlobalSnapshotToXmin(snap_global_csn);
+	if (!TransactionIdIsValid(xmin))
+	{
+		LWLockRelease(ProcArrayLock);
+		elog(ERROR, "GlobalSnapshotToXmin: global snapshot too old");
+	}
+	MyProc->originalXmin = MyPgXact->xmin;	/* remember pre-import xmin */
+	MyPgXact->xmin = TransactionXmin = xmin;
+	LWLockRelease(ProcArrayLock);
+
+	CurrentSnapshot->xmin = xmin; /* defuse SnapshotResetXmin() */
+	CurrentSnapshot->global_csn = snap_global_csn;
+	CurrentSnapshot->imported_global_csn = true;
+	GlobalSnapshotSync(snap_global_csn);
+
+	Assert(TransactionIdPrecedesOrEquals(RecentGlobalXmin, xmin));
+	Assert(TransactionIdPrecedesOrEquals(RecentGlobalDataXmin, xmin));
+}
+
+/* SQL-callable wrapper: passes argument 0 to ImportGlobalSnapshot() */
+Datum
+pg_global_snapshot_import(PG_FUNCTION_ARGS)
+{
+	GlobalCSN	global_csn = PG_GETARG_UINT64(0);
+	ImportGlobalSnapshot(global_csn);
+	PG_RETURN_VOID();
+}
diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c
index f7c4c9188c..f2fbc77fa8 100644
--- a/src/backend/utils/time/tqual.c
+++ b/src/backend/utils/time/tqual.c
@@ -63,6 +63,7 @@
 
 #include "postgres.h"
 
+#include "access/global_snapshot.h"
 #include "access/htup_details.h"
 #include "access/multixact.h"
 #include "access/subtrans.h"
@@ -1462,8 +1463,8 @@ HeapTupleIsSurelyDead(HeapTuple htup, TransactionId OldestXmin)
 }
 
 /*
- * XidInMVCCSnapshot
- *		Is the given XID still-in-progress according to the snapshot?
+ * XidInLocalMVCCSnapshot
+ *		Is the given XID still-in-progress according to the local snapshot?
  *
  * Note: GetSnapshotData never stores either top xid or subxids of our own
  * backend into a snapshot, so these xids will not be reported as "running"
@@ -1471,8 +1472,8 @@ HeapTupleIsSurelyDead(HeapTuple htup, TransactionId OldestXmin)
  * TransactionIdIsCurrentTransactionId first, except when it's known the
  * XID could not be ours anyway.
  */
-bool
-XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+static bool
+XidInLocalMVCCSnapshot(TransactionId xid, Snapshot snapshot)
 {
 	uint32		i;
 
@@ -1584,6 +1585,62 @@ XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
 }
 
 /*
+ * XidInMVCCSnapshot
+ *
+ * Check whether this xid is in the snapshot, taking into account the fact
+ * that the snapshot can be global. When track_global_snapshots is switched
+ * off, this just calls XidInLocalMVCCSnapshot().
+ */
+bool
+XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+{
+	bool in_snapshot;
+
+	if (snapshot->imported_global_csn)
+	{
+		Assert(track_global_snapshots);
+		/* No point in using snapshot info other than the CSN */
+		return XidInvisibleInGlobalSnapshot(xid, snapshot);
+	}
+
+	in_snapshot = XidInLocalMVCCSnapshot(xid, snapshot);
+
+	if (!track_global_snapshots)
+	{
+		Assert(GlobalCSNIsFrozen(snapshot->global_csn));
+		return in_snapshot;
+	}
+
+	if (in_snapshot)
+	{
+		/*
+		 * This xid may already be in an unknown state, and in that case
+		 * we must wait and recheck.
+		 *
+		 * TODO: this check can be skipped if we know for sure that there were
+		 * no global transactions when this snapshot was taken. That requires
+		 * some changes to mechanisms of global snapshots export/import (if
+		 * backend set xmin then we should have a-priori knowledge that this
+		 * transaction is going to be global or local -- right now this is not
+		 * enforced). Leave that for the future; don't complicate this patch.
+		 */
+		return XidInvisibleInGlobalSnapshot(xid, snapshot);
+	}
+	else
+	{
+#ifdef USE_ASSERT_CHECKING
+		/* Cross-check: global snapshot may disagree only for aborted xids */
+		if (XidInvisibleInGlobalSnapshot(xid, snapshot))
+		{
+			GlobalCSN gcsn = TransactionIdGetGlobalCSN(xid);
+			Assert(GlobalCSNIsAborted(gcsn));
+		}
+#endif
+		return false;
+	}
+}
+
+/*
  * Is the tuple really only locked?  That is, is it not updated?
  *
  * It's easy to check just infomask bits if the locker is not a multi; but
diff --git a/src/include/access/global_snapshot.h b/src/include/access/global_snapshot.h
new file mode 100644
index 0000000000..246b180cfd
--- /dev/null
+++ b/src/include/access/global_snapshot.h
@@ -0,0 +1,72 @@
+/*-------------------------------------------------------------------------
+ *
+ * global_snapshot.h
+ *	  Support for cross-node snapshot isolation.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/global_snapshot.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef GLOBAL_SNAPSHOT_H
+#define GLOBAL_SNAPSHOT_H
+
+#include "port/atomics.h"
+#include "storage/lock.h"
+#include "utils/snapshot.h"
+#include "utils/guc.h"
+
+/*
+ * snapshot.h is used in frontend code so atomic variant of GlobalCSN type
+ * is defined here.
+ */
+typedef pg_atomic_uint64 GlobalCSN_atomic;
+
+#define InProgressGlobalCSN	 UINT64CONST(0x0)
+#define AbortedGlobalCSN	 UINT64CONST(0x1)
+#define FrozenGlobalCSN		 UINT64CONST(0x2)
+#define InDoubtGlobalCSN	 UINT64CONST(0x3)
+#define FirstNormalGlobalCSN UINT64CONST(0x4)
+
+#define GlobalCSNIsInProgress(csn)	((csn) == InProgressGlobalCSN)
+#define GlobalCSNIsAborted(csn)		((csn) == AbortedGlobalCSN)
+#define GlobalCSNIsFrozen(csn)		((csn) == FrozenGlobalCSN)
+#define GlobalCSNIsInDoubt(csn)		((csn) == InDoubtGlobalCSN)
+#define GlobalCSNIsNormal(csn)		((csn) >= FirstNormalGlobalCSN)
+
+
+extern int global_snapshot_defer_time;
+
+
+extern Size GlobalSnapshotShmemSize(void);
+extern void GlobalSnapshotShmemInit(void);
+extern void GlobalSnapshotStartup(TransactionId oldestActiveXID);
+
+extern void GlobalSnapshotMapXmin(GlobalCSN snapshot_global_csn);
+extern TransactionId GlobalSnapshotToXmin(GlobalCSN snapshot_global_csn);
+
+extern GlobalCSN GlobalSnapshotGenerate(bool locked);
+
+extern bool XidInvisibleInGlobalSnapshot(TransactionId xid, Snapshot snapshot);
+
+extern void GlobalSnapshotSync(GlobalCSN remote_gcsn);
+
+extern GlobalCSN TransactionIdGetGlobalCSN(TransactionId xid);
+
+extern GlobalCSN GlobalSnapshotPrepareGlobal(const char *gid);
+extern void GlobalSnapshotAssignCsnGlobal(const char *gid,
+										  GlobalCSN global_csn);
+
+extern GlobalCSN GlobalSnapshotPrepareCurrent(void);
+extern void GlobalSnapshotAssignCsnCurrent(GlobalCSN global_csn);
+
+extern void GlobalSnapshotAbort(PGPROC *proc, TransactionId xid, int nsubxids,
+								TransactionId *subxids);
+extern void GlobalSnapshotPrecommit(PGPROC *proc, TransactionId xid, int nsubxids,
+									TransactionId *subxids);
+extern void GlobalSnapshotCommit(PGPROC *proc, TransactionId xid, int nsubxids,
+									TransactionId *subxids);
+
+#endif							/* GLOBAL_SNAPSHOT_H */
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index 0e932daa48..f8b774f393 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -18,6 +18,7 @@
 #include "access/xact.h"
 #include "datatype/timestamp.h"
 #include "storage/lock.h"
+#include "utils/snapshot.h"
 
 /*
  * GlobalTransactionData is defined in twophase.c; other places have no
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index a14651010f..e0e20c2e6c 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10206,4 +10206,18 @@
   proisstrict => 'f', prorettype => 'bool', proargtypes => 'oid int4 int4 any',
   proargmodes => '{i,i,i,v}', prosrc => 'satisfies_hash_partition' },
 
+# global transaction handling
+{ oid => '3430', descr => 'export global transaction snapshot',
+  proname => 'pg_global_snapshot_export', provolatile => 'v', proparallel => 'u',
+  prorettype => 'int8', proargtypes => '', prosrc => 'pg_global_snapshot_export' },
+{ oid => '3431', descr => 'import global transaction snapshot',
+  proname => 'pg_global_snapshot_import', provolatile => 'v', proparallel => 'u',
+  prorettype => 'void', proargtypes => 'int8', prosrc => 'pg_global_snapshot_import' },
+{ oid => '3432', descr => 'prepare distributed transaction for commit, get global_csn',
+  proname => 'pg_global_snapshot_prepare', provolatile => 'v', proparallel => 'u',
+  prorettype => 'int8', proargtypes => 'text', prosrc => 'pg_global_snapshot_prepare' },
+{ oid => '3433', descr => 'assign global_csn to distributed transaction',
+  proname => 'pg_global_snapshot_assign', provolatile => 'v', proparallel => 'u',
+  prorettype => 'void', proargtypes => 'text int8', prosrc => 'pg_global_snapshot_assign' },
+
 ]
diff --git a/src/include/datatype/timestamp.h b/src/include/datatype/timestamp.h
index f5b6026ef5..75ec93b46b 100644
--- a/src/include/datatype/timestamp.h
+++ b/src/include/datatype/timestamp.h
@@ -93,6 +93,9 @@ typedef struct
 #define USECS_PER_MINUTE INT64CONST(60000000)
 #define USECS_PER_SEC	INT64CONST(1000000)
 
+#define NSECS_PER_SEC	INT64CONST(1000000000)
+#define NSECS_PER_USEC	INT64CONST(1000)
+
 /*
  * We allow numeric timezone offsets up to 15:59:59 either way from Greenwich.
  * Currently, the record holders for wackiest offsets in actual use are zones
diff --git a/src/include/fmgr.h b/src/include/fmgr.h
index 101f513ba6..3026e71f83 100644
--- a/src/include/fmgr.h
+++ b/src/include/fmgr.h
@@ -250,6 +250,7 @@ extern struct varlena *pg_detoast_datum_packed(struct varlena *datum);
 #define PG_GETARG_FLOAT4(n)  DatumGetFloat4(PG_GETARG_DATUM(n))
 #define PG_GETARG_FLOAT8(n)  DatumGetFloat8(PG_GETARG_DATUM(n))
 #define PG_GETARG_INT64(n)	 DatumGetInt64(PG_GETARG_DATUM(n))
+#define PG_GETARG_UINT64(n)	 DatumGetUInt64(PG_GETARG_DATUM(n))
 /* use this if you want the raw, possibly-toasted input datum: */
 #define PG_GETARG_RAW_VARLENA_P(n)	((struct varlena *) PG_GETARG_POINTER(n))
 /* use this if you want the input datum de-toasted: */
diff --git a/src/include/portability/instr_time.h b/src/include/portability/instr_time.h
index f968444671..ebc3836c9c 100644
--- a/src/include/portability/instr_time.h
+++ b/src/include/portability/instr_time.h
@@ -138,6 +138,9 @@ typedef struct timespec instr_time;
 #define INSTR_TIME_GET_MICROSEC(t) \
 	(((uint64) (t).tv_sec * (uint64) 1000000) + (uint64) ((t).tv_nsec / 1000))
 
+#define INSTR_TIME_GET_NANOSEC(t) \
+	(((uint64) (t).tv_sec * (uint64) 1000000000) + (uint64) ((t).tv_nsec))
+
 #else							/* !HAVE_CLOCK_GETTIME */
 
 /* Use gettimeofday() */
@@ -202,6 +205,10 @@ typedef struct timeval instr_time;
 #define INSTR_TIME_GET_MICROSEC(t) \
 	(((uint64) (t).tv_sec * (uint64) 1000000) + (uint64) (t).tv_usec)
 
+#define INSTR_TIME_GET_NANOSEC(t) \
+	(((uint64) (t).tv_sec * (uint64) 1000000000) + \
+		(uint64) (t).tv_usec * (uint64) 1000)
+
 #endif							/* HAVE_CLOCK_GETTIME */
 
 #else							/* WIN32 */
@@ -234,6 +241,9 @@ typedef LARGE_INTEGER instr_time;
 #define INSTR_TIME_GET_MICROSEC(t) \
 	((uint64) (((double) (t).QuadPart * 1000000.0) / GetTimerFrequency()))
 
+#define INSTR_TIME_GET_NANOSEC(t) \
+	((uint64) (((double) (t).QuadPart * 1000000000.0) / GetTimerFrequency()))
+
 static inline double
 GetTimerFrequency(void)
 {
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 5c19a61dcf..7e24d539cc 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -15,8 +15,10 @@
 #define _PROC_H_
 
 #include "access/clog.h"
+#include "access/global_snapshot.h"
 #include "access/xlogdefs.h"
 #include "lib/ilist.h"
+#include "utils/snapshot.h"
 #include "storage/latch.h"
 #include "storage/lock.h"
 #include "storage/pg_sema.h"
@@ -57,6 +59,7 @@ struct XidCache
 #define		PROC_IN_LOGICAL_DECODING	0x10	/* currently doing logical
 												 * decoding outside xact */
 #define		PROC_RESERVED				0x20	/* reserved for procarray */
+#define		PROC_RESERVED2				0x40	/* reserved for procarray */
 
 /* flags reset at EOXact */
 #define		PROC_VACUUM_STATE_MASK \
@@ -200,6 +203,18 @@ struct PGPROC
 	PGPROC	   *lockGroupLeader;	/* lock group leader, if I'm a member */
 	dlist_head	lockGroupMembers;	/* list of members, if I'm a leader */
 	dlist_node	lockGroupLink;	/* my member link, if I'm a member */
+
+	/*
+	 * assignedGlobalCsn holds GlobalCSN for this transaction.  It is generated
+	 * under ProcArrayLock and is later written to the GlobalCSNLog.  This
+	 * variable is defined as atomic only for the case of group commit; in all
+	 * other scenarios only the backend responsible for this proc entry works
+	 * with this variable.
+	 */
+	GlobalCSN_atomic assignedGlobalCsn;
+
+	/* Original xmin of this backend before global snapshot was imported */
+	TransactionId originalXmin;
 };
 
 /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 75bab2985f..e68a87575e 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -36,6 +36,10 @@
 
 #define		PROCARRAY_SLOTS_XMIN			0x20	/* replication slot xmin,
 													 * catalog_xmin */
+
+#define		PROCARRAY_NON_IMPORTED_XMIN		0x40	/* use originalXmin instead
+													 * of xmin to properly
+													 * maintain gsXidMap */
 /*
  * Only flags in PROCARRAY_PROC_FLAGS_MASK are considered when matching
  * PGXACT->vacuumFlags. Other flags are used for different purposes and
@@ -124,4 +128,8 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
 extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
 								TransactionId *catalog_xmin);
 
+extern void ProcArraySetGlobalSnapshotXmin(TransactionId xmin);
+
+extern TransactionId ProcArrayGetGlobalSnapshotXmin(void);
+
 #endif							/* PROCARRAY_H */
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index 83806f3040..1a066fd8d8 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -87,6 +87,9 @@ extern void AtSubCommit_Snapshot(int level);
 extern void AtSubAbort_Snapshot(int level);
 extern void AtEOXact_Snapshot(bool isCommit, bool resetXmin);
 
+extern GlobalCSN ExportGlobalSnapshot(void);
+extern void ImportGlobalSnapshot(GlobalCSN snap_global_csn);
+
 extern void ImportSnapshot(const char *idstr);
 extern bool XactHasExportedSnapshots(void);
 extern void DeleteAllExportedSnapshotFiles(void);
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 318d41e6f7..1563407824 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -115,6 +115,14 @@ typedef struct SnapshotData
 
 	TimestampTz whenTaken;		/* timestamp when snapshot was taken */
 	XLogRecPtr	lsn;			/* position in the WAL stream when taken */
+
+	/*
+	 * GlobalCSN for cross-node snapshot isolation support.
+	 * Will be used only if track_global_snapshots is enabled.
+	 */
+	GlobalCSN	global_csn;
+	/* Did we have our own global_csn or imported one from different node */
+	bool		imported_global_csn;
 } SnapshotData;
 
 /*
-- 
2.11.0

