diff --git a/contrib/pg_tsdtm/Makefile b/contrib/pg_tsdtm/Makefile
new file mode 100644
index 0000000..e70dffc
--- /dev/null
+++ b/contrib/pg_tsdtm/Makefile
@@ -0,0 +1,20 @@
+MODULE_big = pg_tsdtm
+OBJS = pg_tsdtm.o
+
+EXTENSION = pg_tsdtm
+DATA = pg_tsdtm--1.0.sql
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/pg_tsdtm
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
+
+check:
+	env DESTDIR='$(abs_top_builddir)'/tmp_install make install
+	$(prove_check)
diff --git a/contrib/pg_tsdtm/dtm_recovery/dtm_recovery.cpp b/contrib/pg_tsdtm/dtm_recovery/dtm_recovery.cpp
new file mode 100644
index 0000000..38285be
--- /dev/null
+++ b/contrib/pg_tsdtm/dtm_recovery/dtm_recovery.cpp
@@ -0,0 +1,129 @@
+#include <iostream>
+#include <string>
+#include <vector>
+#include <set>
+
+#include <pqxx/connection>
+#include <pqxx/transaction>
+#include <pqxx/nontransaction>
+
+using namespace std;
+using namespace pqxx;
+
+int main (int argc, char* argv[])
+{
+    if (argc == 1){
+        printf("Use -h to show usage options\n");
+        return 1;
+    }
+    vector<string> connections;
+    set<string> prepared_xacts;
+    set<string> committed_xacts;
+    bool verbose = false;
+    for (int i = 1; i < argc; i++) {
+        if (argv[i][0] == '-') {
+            switch (argv[i][1]) {
+              case 'C':
+              case 'c':
+                connections.push_back(string(argv[++i]));
+                continue;
+              case 'v':
+                verbose = true;
+                continue;
+            }
+        }
+        printf("Perform recovery of pg_tsdtm cluster.\n"
+               "Usage: dtm_recovery {options}\n"
+               "Options:\n"
+               "\t-c STR\tdatabase connection string\n"
+               "\t-v\tverbose mode: print extra information while processing\n");
+        return 1;
+    }
+    if (verbose) {
+        cout << "Collecting information about prepared transactions...\n";
+    }
+    for (vector<string>::iterator ic = connections.begin(); ic != connections.end(); ++ic)
+    {
+        if (verbose) {
+            cout << "Connecting to " << *ic << "...\n";
+        }
+        connection con(*ic);
+        work txn(con);
+        result r = txn.exec("select gid from pg_prepared_xacts");
+        for (result::const_iterator it = r.begin(); it != r.end(); ++it)
+        {
+            string gid = it.at("gid").as(string());
+            prepared_xacts.insert(gid);
+        }
+        txn.commit();
+    }
+    if (verbose) {
+        cout << "Prepared transactions: ";
+        for (set<string>::iterator it = prepared_xacts.begin(); it != prepared_xacts.end(); ++it)
+        {
+            cout << *it << ", ";
+        }
+        cout << "\nChecking which of them are committed...\n";
+    }
+    for (vector<string>::iterator ic = connections.begin(); ic != connections.end(); ++ic)
+    {
+        if (verbose) {
+            cout << "Connecting to " << *ic << "...\n";
+        }
+        connection con(*ic);
+        work txn(con);
+        con.prepare("commit-check", "select * from pg_committed_xacts where gid=$1");
+        for (set<string>::iterator it = prepared_xacts.begin(); it != prepared_xacts.end(); ++it)
+        {
+            string gid = *it;
+            result r = txn.prepared("commit-check")(gid).exec();
+            if (!r.empty()) {
+                committed_xacts.insert(gid);
+            }
+        }
+        txn.commit();
+    }
+    if (verbose) {
+        cout << "Committed transactions: ";
+        for (set<string>::iterator it = committed_xacts.begin(); it != committed_xacts.end(); ++it)
+        {
+            cout << *it << ", ";
+        }
+        cout << "\nCommitting them at all nodes...\n";
+    }
+    for (vector<string>::iterator ic = connections.begin(); ic != connections.end(); ++ic)
+    {
+        if (verbose) {
+            cout << "Connecting to " << *ic << "...\n";
+        }
+        connection con(*ic);
+        work txn(con);
+        con.prepare("commit-check", "select * from pg_committed_xacts where gid=$1");
+        con.prepare("commit-prepared", "commit prepared $1");
+        con.prepare("rollback-prepared", "rollback prepared $1");
+        result r = txn.exec("select gid from pg_prepared_xacts");
+        for (result::const_iterator it = r.begin(); it != r.end(); ++it)
+        {
+            string gid = it.at("gid").as(string());
+            result rc = txn.prepared("commit-check")(gid).exec();
+            if (rc.empty()) {
+                if (committed_xacts.find(gid) != committed_xacts.end()) {
+                    if (verbose) {
+                        cout << "Commit transaction " << gid << "\n";
+                    }
+                    txn.prepared("commit-prepared")(gid);
+                } else {
+                    if (verbose) {
+                        cout << "Rollback transaction " << gid << "\n";
+                    }
+                    txn.prepared("rollback-prepared")(gid);
+                }
+            }
+        }
+        txn.commit();
+    }
+    if (verbose) {
+        cout << "Recovery completed\n";
+    }
+    return 0;
+}
diff --git a/contrib/pg_tsdtm/dtm_recovery/makefile b/contrib/pg_tsdtm/dtm_recovery/makefile
new file mode 100644
index 0000000..4d12c0b
--- /dev/null
+++ b/contrib/pg_tsdtm/dtm_recovery/makefile
@@ -0,0 +1,10 @@
+CXX=g++
+CXXFLAGS=-g -Wall -O0 -pthread
+
+all: dtm_recovery
+
+dtm_recovery: dtm_recovery.cpp
+	$(CXX) $(CXXFLAGS) -o dtm_recovery dtm_recovery.cpp -lpqxx
+
+clean:
+	rm -f dtm_recovery
diff --git a/contrib/pg_tsdtm/pg_tsdtm--1.0.sql b/contrib/pg_tsdtm/pg_tsdtm--1.0.sql
new file mode 100644
index 0000000..dcd81ac
--- /dev/null
+++ b/contrib/pg_tsdtm/pg_tsdtm--1.0.sql
@@ -0,0 +1,26 @@
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION pg_dtm" to load this file. \quit
+
+CREATE FUNCTION dtm_extend(gtid cstring default null) RETURNS bigint
+AS 'MODULE_PATHNAME','dtm_extend'
+LANGUAGE C;
+
+CREATE FUNCTION dtm_access(snapshot bigint, gtid cstring default null) RETURNS bigint
+AS 'MODULE_PATHNAME','dtm_access'
+LANGUAGE C;
+
+CREATE FUNCTION dtm_begin_prepare(gtid cstring) RETURNS void
+AS 'MODULE_PATHNAME','dtm_begin_prepare'
+LANGUAGE C;
+
+CREATE FUNCTION dtm_prepare(gtid cstring, csn bigint) RETURNS bigint
+AS 'MODULE_PATHNAME','dtm_prepare'
+LANGUAGE C;
+
+CREATE FUNCTION dtm_end_prepare(gtid cstring, csn bigint) RETURNS void
+AS 'MODULE_PATHNAME','dtm_end_prepare'
+LANGUAGE C;
+
+CREATE FUNCTION dtm_get_csn(xid integer) RETURNS bigint
+AS 'MODULE_PATHNAME','dtm_get_csn'
+LANGUAGE C;
diff --git a/contrib/pg_tsdtm/pg_tsdtm.c b/contrib/pg_tsdtm/pg_tsdtm.c
new file mode 100644
index 0000000..6dabe76
--- /dev/null
+++ b/contrib/pg_tsdtm/pg_tsdtm.c
@@ -0,0 +1,1021 @@
+/*
+ * pg_dtm.c
+ *
+ * Pluggable distributed transaction manager
+ *
+ */
+
+#include <unistd.h>
+#include <sys/time.h>
+#include <time.h>
+
+#include "postgres.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "storage/s_lock.h"
+#include "storage/spin.h"
+#include "storage/lmgr.h"
+#include "storage/shmem.h"
+#include "storage/ipc.h"
+#include "access/xlogdefs.h"
+#include "access/xact.h"
+#include "access/xtm.h"
+#include "access/transam.h"
+#include "access/subtrans.h"
+#include "access/xlog.h"
+#include "access/clog.h"
+#include "access/twophase.h"
+#include "executor/spi.h"
+#include "utils/hsearch.h"
+#include "utils/tqual.h"
+#include <utils/guc.h>
+
+#include "pg_tsdtm.h"
+
+#define DTM_HASH_INIT_SIZE	1000000
+#define INVALID_CID    0
+#define MIN_WAIT_TIMEOUT 1000
+#define MAX_WAIT_TIMEOUT 100000
+#define MAX_GTID_SIZE  16
+#define HASH_PER_ELEM_OVERHEAD 64
+
+#define USEC 1000000
+
+#define TRACE_SLEEP_TIME 1
+
+typedef uint64 timestamp_t;
+
+/* Distributed transaction state kept in shared memory */
+typedef struct DtmTransStatus
+{
+	TransactionId xid;
+	XidStatus	status;
+	int			nSubxids;
+	cid_t		cid;			/* CSN */
+	struct DtmTransStatus *next;/* pointer to next element in finished
+								 * transaction list */
+}	DtmTransStatus;
+
+/* State of DTM node */
+typedef struct
+{
+	cid_t		cid;			/* last assigned CSN; used to provide unique
+								 * ascending CSNs */
+	TransactionId oldest_xid;	/* XID of oldest transaction visible by any
+								 * active transaction (local or global) */
+	long		time_shift;		/* correction to system time */
+	volatile slock_t lock;		/* spinlock to protect access to hash table  */
+	DtmTransStatus *trans_list_head;	/* L1 list of finished transactions
+										 * present in xid2status hash table.
+										 * This list is used to perform
+										 * cleanup of too old transactions */
+	DtmTransStatus **trans_list_tail;
+}	DtmNodeState;
+
+/* Structure used to map global transaction identifier to XID */
+typedef struct
+{
+	char		gtid[MAX_GTID_SIZE];
+	TransactionId xid;
+	TransactionId *subxids;
+	int			nSubxids;
+}	DtmTransId;
+
+
+#define DTM_TRACE(x)
+/* #define DTM_TRACE(x) fprintf x */
+
+static shmem_startup_hook_type prev_shmem_startup_hook;
+static HTAB *xid2status;
+static HTAB *gtid2xid;
+static DtmNodeState *local;
+static DtmCurrentTrans dtm_tx;
+static uint64 totalSleepInterrupts;
+static int	DtmVacuumDelay;
+static bool DtmRecordCommits;
+
+static Snapshot DtmGetSnapshot(Snapshot snapshot);
+static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum);
+static bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
+static TransactionId DtmAdjustOldestXid(TransactionId xid);
+static bool DtmDetectGlobalDeadLock(PGPROC *proc);
+static cid_t DtmGetCsn(TransactionId xid);
+static void DtmAddSubtransactions(DtmTransStatus * ts, TransactionId *subxids, int nSubxids);
+static char const *DtmGetName(void);
+
+static TransactionManager DtmTM = {
+	PgTransactionIdGetStatus,
+	PgTransactionIdSetTreeStatus,
+	DtmGetSnapshot,
+	PgGetNewTransactionId,
+	DtmGetOldestXmin,
+	PgTransactionIdIsInProgress,
+	PgGetGlobalTransactionId,
+	DtmXidInMVCCSnapshot,
+	DtmDetectGlobalDeadLock,
+	DtmGetName
+};
+
+void		_PG_init(void);
+void		_PG_fini(void);
+
+
+static void dtm_shmem_startup(void);
+static Size dtm_memsize(void);
+static void dtm_xact_callback(XactEvent event, void *arg);
+static timestamp_t dtm_get_current_time();
+static void dtm_sleep(timestamp_t interval);
+static cid_t dtm_get_cid();
+static cid_t dtm_sync(cid_t cid);
+
+/*
+ *	Time manipulation functions
+ */
+
+/* Get current time with microscond resolution */
+static timestamp_t
+dtm_get_current_time()
+{
+	struct timeval tv;
+
+	gettimeofday(&tv, NULL);
+	return (timestamp_t) tv.tv_sec * USEC + tv.tv_usec + local->time_shift;
+}
+
+/* Sleep for specified amount of time */
+static void
+dtm_sleep(timestamp_t interval)
+{
+	struct timespec ts;
+	struct timespec rem;
+
+	ts.tv_sec = 0;
+	ts.tv_nsec = interval * 1000;
+
+	while (nanosleep(&ts, &rem) < 0)
+	{
+		totalSleepInterrupts += 1;
+		Assert(errno == EINTR);
+		ts = rem;
+	}
+}
+
+/* Get unique ascending CSN.
+ * This function is called inside critical section
+ */
+static cid_t
+dtm_get_cid()
+{
+	cid_t		cid = dtm_get_current_time();
+
+	if (cid <= local->cid)
+	{
+		cid = ++local->cid;
+	}
+	else
+	{
+		local->cid = cid;
+	}
+	return cid;
+}
+
+/*
+ * Adjust system time
+ */
+static cid_t
+dtm_sync(cid_t global_cid)
+{
+	cid_t		local_cid;
+
+	while ((local_cid = dtm_get_cid()) < global_cid)
+	{
+		local->time_shift += global_cid - local_cid;
+	}
+	return local_cid;
+}
+
+void
+_PG_init(void)
+{
+	DTM_TRACE((stderr, "DTM_PG_init \n"));
+
+	/*
+	 * In order to create our shared memory area, we have to be loaded via
+	 * shared_preload_libraries.  If not, fall out without hooking into any of
+	 * the main system.  (We don't throw error here because it seems useful to
+	 * allow the pg_stat_statements functions to be created even when the
+	 * module isn't active.  The functions must protect themselves against
+	 * being called then, however.)
+	 */
+	if (!process_shared_preload_libraries_in_progress)
+		return;
+
+	RequestAddinShmemSpace(dtm_memsize());
+
+	DefineCustomIntVariable(
+							"dtm.vacuum_delay",
+					"Minimal age of records which can be vacuumed (seconds)",
+							NULL,
+							&DtmVacuumDelay,
+							10,
+							1,
+							INT_MAX,
+							PGC_BACKEND,
+							0,
+							NULL,
+							NULL,
+							NULL
+		);
+
+	DefineCustomBoolVariable(
+							 "dtm.record_commits",
+							 "Store information about committed global transactions in pg_committed_xacts table",
+							 NULL,
+							 &DtmRecordCommits,
+							 false,
+							 PGC_BACKEND,
+							 0,
+							 NULL,
+							 NULL,
+							 NULL
+		);
+
+
+	/*
+	 * Install hooks.
+	 */
+	prev_shmem_startup_hook = shmem_startup_hook;
+	shmem_startup_hook = dtm_shmem_startup;
+}
+
+/*
+ * Module unload callback
+ */
+void
+_PG_fini(void)
+{
+	/* Uninstall hooks. */
+	shmem_startup_hook = prev_shmem_startup_hook;
+}
+
+/*
+ * Estimate shared memory space needed.
+ */
+static Size
+dtm_memsize(void)
+{
+	Size		size;
+
+	size = MAXALIGN(sizeof(DtmNodeState));
+	size = add_size(size, (sizeof(DtmTransId) + sizeof(DtmTransStatus) + HASH_PER_ELEM_OVERHEAD * 2) * DTM_HASH_INIT_SIZE);
+
+	return size;
+}
+
+
+/*
+ * shmem_startup hook: allocate or attach to shared memory,
+ * then load any pre-existing statistics from file.
+ * Also create and load the query-texts file, which is expected to exist
+ * (even if empty) while the module is enabled.
+ */
+static void
+dtm_shmem_startup(void)
+{
+	if (prev_shmem_startup_hook)
+	{
+		prev_shmem_startup_hook();
+	}
+	DtmInitialize();
+}
+
+static GlobalTransactionId
+dtm_get_global_trans_id()
+{
+	return GetLockedGlobalTransactionId();
+}
+
+static void
+dtm_xact_callback(XactEvent event, void *arg)
+{
+	DTM_TRACE((stderr, "Backend %d dtm_xact_callback %d\n", getpid(), event));
+	switch (event)
+	{
+		case XACT_EVENT_START:
+			DtmLocalBegin(&dtm_tx);
+			break;
+
+		case XACT_EVENT_ABORT:
+			DtmLocalAbort(&dtm_tx);
+			DtmLocalEnd(&dtm_tx);
+			break;
+
+		case XACT_EVENT_COMMIT:
+			DtmLocalCommit(&dtm_tx);
+			DtmLocalEnd(&dtm_tx);
+			break;
+
+		case XACT_EVENT_ABORT_PREPARED:
+			DtmLocalAbortPrepared(&dtm_tx, dtm_get_global_trans_id());
+			break;
+
+		case XACT_EVENT_COMMIT_PREPARED:
+			DtmLocalCommitPrepared(&dtm_tx, dtm_get_global_trans_id());
+			break;
+
+		case XACT_EVENT_PREPARE:
+			DtmLocalSavePreparedState(dtm_get_global_trans_id());
+			DtmLocalEnd(&dtm_tx);
+			break;
+
+		default:
+			break;
+	}
+}
+
+/*
+ *	***************************************************************************
+ */
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(dtm_extend);
+PG_FUNCTION_INFO_V1(dtm_access);
+PG_FUNCTION_INFO_V1(dtm_begin_prepare);
+PG_FUNCTION_INFO_V1(dtm_prepare);
+PG_FUNCTION_INFO_V1(dtm_end_prepare);
+PG_FUNCTION_INFO_V1(dtm_get_csn);
+
+Datum
+dtm_extend(PG_FUNCTION_ARGS)
+{
+	GlobalTransactionId gtid = PG_GETARG_CSTRING(0);
+	cid_t		cid = DtmLocalExtend(&dtm_tx, gtid);
+
+	DTM_TRACE((stderr, "Backend %d extends transaction %u(%s) to global with cid=%lu\n", getpid(), dtm_tx.xid, gtid, cid));
+	PG_RETURN_INT64(cid);
+}
+
+Datum
+dtm_access(PG_FUNCTION_ARGS)
+{
+	cid_t		cid = PG_GETARG_INT64(0);
+	GlobalTransactionId gtid = PG_GETARG_CSTRING(1);
+
+	DTM_TRACE((stderr, "Backend %d joins transaction %u(%s) with cid=%lu\n", getpid(), dtm_tx.xid, gtid, cid));
+	cid = DtmLocalAccess(&dtm_tx, gtid, cid);
+	PG_RETURN_INT64(cid);
+}
+
+Datum
+dtm_begin_prepare(PG_FUNCTION_ARGS)
+{
+	GlobalTransactionId gtid = PG_GETARG_CSTRING(0);
+
+	DtmLocalBeginPrepare(gtid);
+	DTM_TRACE((stderr, "Backend %d begins prepare of transaction %s\n", getpid(), gtid));
+	PG_RETURN_VOID();
+}
+
+Datum
+dtm_prepare(PG_FUNCTION_ARGS)
+{
+	GlobalTransactionId gtid = PG_GETARG_CSTRING(0);
+	cid_t		cid = PG_GETARG_INT64(1);
+
+	cid = DtmLocalPrepare(gtid, cid);
+	DTM_TRACE((stderr, "Backend %d prepares transaction %s with cid=%lu\n", getpid(), gtid, cid));
+	PG_RETURN_INT64(cid);
+}
+
+Datum
+dtm_end_prepare(PG_FUNCTION_ARGS)
+{
+	GlobalTransactionId gtid = PG_GETARG_CSTRING(0);
+	cid_t		cid = PG_GETARG_INT64(1);
+
+	DTM_TRACE((stderr, "Backend %d ends prepare of transactions %s with cid=%lu\n", getpid(), gtid, cid));
+	DtmLocalEndPrepare(gtid, cid);
+	PG_RETURN_VOID();
+}
+
+Datum
+dtm_get_csn(PG_FUNCTION_ARGS)
+{
+	TransactionId xid = PG_GETARG_INT32(0);
+	cid_t		csn = DtmGetCsn(xid);
+
+	PG_RETURN_INT64(csn);
+}
+
+/*
+ *	***************************************************************************
+ */
+
+static uint32
+dtm_xid_hash_fn(const void *key, Size keysize)
+{
+	return (uint32) *(TransactionId *) key;
+}
+
+static int
+dtm_xid_match_fn(const void *key1, const void *key2, Size keysize)
+{
+	return *(TransactionId *) key1 - *(TransactionId *) key2;
+}
+
+static uint32
+dtm_gtid_hash_fn(const void *key, Size keysize)
+{
+	GlobalTransactionId id = (GlobalTransactionId) key;
+	uint32		h = 0;
+
+	while (*id != 0)
+	{
+		h = h * 31 + *id++;
+	}
+	return h;
+}
+
+static void *
+dtm_gtid_keycopy_fn(void *dest, const void *src, Size keysize)
+{
+	return strcpy((char *) dest, (GlobalTransactionId) src);
+}
+
+static int
+dtm_gtid_match_fn(const void *key1, const void *key2, Size keysize)
+{
+	return strcmp((GlobalTransactionId) key1, (GlobalTransactionId) key2);
+}
+
+static char const *
+DtmGetName(void)
+{
+	return "pg_tsdtm";
+}
+
+static void
+DtmTransactionListAppend(DtmTransStatus * ts)
+{
+	ts->next = NULL;
+	*local->trans_list_tail = ts;
+	local->trans_list_tail = &ts->next;
+}
+
+static void
+DtmTransactionListInsertAfter(DtmTransStatus * after, DtmTransStatus * ts)
+{
+	ts->next = after->next;
+	after->next = ts;
+	if (local->trans_list_tail == &after->next)
+	{
+		local->trans_list_tail = &ts->next;
+	}
+}
+
+/*
+ * There can be different oldest XIDs at different cluster node.
+ * Seince we do not have centralized aribiter, we have to rely in DtmVacuumDelay.
+ * This function takes XID which PostgreSQL consider to be the latest and try to find XID which
+ * is older than it more than DtmVacuumDelay.
+ * If no such XID can be located, then return previously observed oldest XID
+ */
+static TransactionId
+DtmAdjustOldestXid(TransactionId xid)
+{
+	if (TransactionIdIsValid(xid))
+	{
+		DtmTransStatus *ts,
+				   *prev = NULL;
+		timestamp_t now = dtm_get_current_time();
+		timestamp_t cutoff_time = now - DtmVacuumDelay * USEC;
+
+		SpinLockAcquire(&local->lock);
+		ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_FIND, NULL);
+		if (ts != NULL)
+		{
+			cutoff_time = ts->cid - DtmVacuumDelay * USEC;
+
+			for (ts = local->trans_list_head; ts != NULL && ts->cid < cutoff_time; prev = ts, ts = ts->next)
+			{
+				if (prev != NULL)
+					hash_search(xid2status, &prev->xid, HASH_REMOVE, NULL);
+			}
+		}
+		if (prev != NULL)
+		{
+			local->trans_list_head = prev;
+			local->oldest_xid = xid = prev->xid;
+		}
+		else
+		{
+			xid = local->oldest_xid;
+		}
+		SpinLockRelease(&local->lock);
+	}
+	return xid;
+}
+
+Snapshot
+DtmGetSnapshot(Snapshot snapshot)
+{
+	snapshot = PgGetSnapshotData(snapshot);
+	RecentGlobalDataXmin = RecentGlobalXmin = DtmAdjustOldestXid(RecentGlobalDataXmin);
+	return snapshot;
+}
+
+TransactionId
+DtmGetOldestXmin(Relation rel, bool ignoreVacuum)
+{
+	TransactionId xmin = PgGetOldestXmin(rel, ignoreVacuum);
+
+	xmin = DtmAdjustOldestXid(xmin);
+	return xmin;
+}
+
+/*
+ * Check tuple bisibility based on CSN of current transaction.
+ * If there is no niformation about transaction with this XID, then use standard PostgreSQL visibility rules.
+ */
+bool
+DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+{
+	timestamp_t delay = MIN_WAIT_TIMEOUT;
+
+	Assert(xid != InvalidTransactionId);
+
+	SpinLockAcquire(&local->lock);
+
+	while (true)
+	{
+		DtmTransStatus *ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_FIND, NULL);
+
+		if (ts != NULL)
+		{
+			if (ts->cid > dtm_tx.snapshot)
+			{
+				DTM_TRACE((stderr, "%d: tuple with xid=%d(csn=%lld) is invisibile in snapshot %lld\n",
+						   getpid(), xid, ts->cid, dtm_tx.snapshot));
+				SpinLockRelease(&local->lock);
+				return true;
+			}
+			if (ts->status == TRANSACTION_STATUS_IN_PROGRESS)
+			{
+				DTM_TRACE((stderr, "%d: wait for in-doubt transaction %u in snapshot %lu\n", getpid(), xid, dtm_tx.snapshot));
+				SpinLockRelease(&local->lock);
+
+				dtm_sleep(delay);
+
+				if (delay * 2 <= MAX_WAIT_TIMEOUT)
+					delay *= 2;
+				SpinLockAcquire(&local->lock);
+			}
+			else
+			{
+				bool		invisible = ts->status == TRANSACTION_STATUS_ABORTED;
+
+				DTM_TRACE((stderr, "%d: tuple with xid=%d(csn= %lld) is %s in snapshot %lld\n",
+						   getpid(), xid, ts->cid, invisible ? "rollbacked" : "committed", dtm_tx.snapshot));
+				SpinLockRelease(&local->lock);
+				return invisible;
+			}
+		}
+		else
+		{
+			DTM_TRACE((stderr, "%d: visibility check is skept for transaction %u in snapshot %lu\n", getpid(), xid, dtm_tx.snapshot));
+			break;
+		}
+	}
+	SpinLockRelease(&local->lock);
+	return PgXidInMVCCSnapshot(xid, snapshot);
+}
+
+void
+DtmInitialize()
+{
+	bool		found;
+	static HASHCTL info;
+
+	info.keysize = sizeof(TransactionId);
+	info.entrysize = sizeof(DtmTransStatus);
+	info.hash = dtm_xid_hash_fn;
+	info.match = dtm_xid_match_fn;
+	xid2status = ShmemInitHash("xid2status",
+							   DTM_HASH_INIT_SIZE, DTM_HASH_INIT_SIZE,
+							   &info,
+							   HASH_ELEM | HASH_FUNCTION | HASH_COMPARE);
+
+	info.keysize = MAX_GTID_SIZE;
+	info.entrysize = sizeof(DtmTransId);
+	info.hash = dtm_gtid_hash_fn;
+	info.match = dtm_gtid_match_fn;
+	info.keycopy = dtm_gtid_keycopy_fn;
+	gtid2xid = ShmemInitHash("gtid2xid",
+							 DTM_HASH_INIT_SIZE, DTM_HASH_INIT_SIZE,
+							 &info,
+					HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_KEYCOPY);
+
+	TM = &DtmTM;
+
+	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
+	local = (DtmNodeState *) ShmemInitStruct("dtm", sizeof(DtmNodeState), &found);
+	if (!found)
+	{
+		local->time_shift = 0;
+		local->oldest_xid = FirstNormalTransactionId;
+		local->cid = dtm_get_current_time();
+		local->trans_list_head = NULL;
+		local->trans_list_tail = &local->trans_list_head;
+		SpinLockInit(&local->lock);
+		RegisterXactCallback(dtm_xact_callback, NULL);
+	}
+	LWLockRelease(AddinShmemInitLock);
+}
+
+/*
+ * Start transaction at local node.
+ * Associate local snapshot (current time) with this transaction.
+ */
+void
+DtmLocalBegin(DtmCurrentTrans * x)
+{
+	if (!TransactionIdIsValid(x->xid))
+	{
+		SpinLockAcquire(&local->lock);
+		x->xid = GetCurrentTransactionId();
+		Assert(TransactionIdIsValid(x->xid));
+		x->cid = INVALID_CID;
+		x->is_global = false;
+		x->is_prepared = false;
+		x->snapshot = dtm_get_cid();
+		SpinLockRelease(&local->lock);
+		DTM_TRACE((stderr, "DtmLocalBegin: transaction %u uses local snapshot %lu\n", x->xid, x->snapshot));
+	}
+}
+
+/*
+ * Transaction is going to be distributed.
+ * Returns snapshot of current transaction.
+ */
+cid_t
+DtmLocalExtend(DtmCurrentTrans * x, GlobalTransactionId gtid)
+{
+	if (gtid != NULL)
+	{
+		SpinLockAcquire(&local->lock);
+		{
+			DtmTransId *id = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_ENTER, NULL);
+
+			id->xid = x->xid;
+			id->nSubxids = 0;
+			id->subxids = 0;
+		}
+		SpinLockRelease(&local->lock);
+	}
+	x->is_global = true;
+	return x->snapshot;
+}
+
+/*
+ * This function is executed on all nodes joining distributed transaction.
+ * global_cid is snapshot taken from node initiated this transaction
+ */
+cid_t
+DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
+{
+	cid_t		local_cid;
+
+	SpinLockAcquire(&local->lock);
+	{
+		if (gtid != NULL)
+		{
+			DtmTransId *id = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_ENTER, NULL);
+
+			id->xid = x->xid;
+			id->nSubxids = 0;
+			id->subxids = 0;
+		}
+		local_cid = dtm_sync(global_cid);
+		x->snapshot = global_cid;
+		x->is_global = true;
+	}
+	SpinLockRelease(&local->lock);
+	if (global_cid < local_cid - DtmVacuumDelay * USEC)
+	{
+		elog(ERROR, "Too old snapshot: requested %ld, current %ld", global_cid, local_cid);
+	}
+	return global_cid;
+}
+
+/*
+ * Set transaction status to in-doubt. Now all transactions accessing tuples updated by this transaction have to
+ * wait until it is either committed either aborted
+ */
+void
+DtmLocalBeginPrepare(GlobalTransactionId gtid)
+{
+	SpinLockAcquire(&local->lock);
+	{
+		DtmTransStatus *ts;
+		DtmTransId *id;
+
+		id = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_FIND, NULL);
+		Assert(id != NULL);
+		Assert(TransactionIdIsValid(id->xid));
+		ts = (DtmTransStatus *) hash_search(xid2status, &id->xid, HASH_ENTER, NULL);
+		ts->status = TRANSACTION_STATUS_IN_PROGRESS;
+		ts->cid = dtm_get_cid();
+		ts->nSubxids = id->nSubxids;
+		DtmTransactionListAppend(ts);
+		DtmAddSubtransactions(ts, id->subxids, id->nSubxids);
+	}
+	SpinLockRelease(&local->lock);
+}
+
+/*
+ * Choose maximal CSN among all nodes.
+ * This function returns maximum of passed (global) and local (current time) CSNs.
+ */
+cid_t
+DtmLocalPrepare(GlobalTransactionId gtid, cid_t global_cid)
+{
+	cid_t		local_cid;
+
+	SpinLockAcquire(&local->lock);
+	local_cid = dtm_get_cid();
+	if (local_cid > global_cid)
+	{
+		global_cid = local_cid;
+	}
+	SpinLockRelease(&local->lock);
+	return global_cid;
+}
+
+/*
+ * Adjust system tiem according to the received maximal CSN
+ */
+void
+DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
+{
+	SpinLockAcquire(&local->lock);
+	{
+		DtmTransStatus *ts;
+		DtmTransId *id;
+		int			i;
+
+		id = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_FIND, NULL);
+		Assert(id != NULL);
+
+		ts = (DtmTransStatus *) hash_search(xid2status, &id->xid, HASH_FIND, NULL);
+		Assert(ts != NULL);
+		ts->cid = cid;
+		for (i = 0; i < ts->nSubxids; i++)
+		{
+			ts = ts->next;
+			ts->cid = cid;
+		}
+		dtm_sync(cid);
+
+		DTM_TRACE((stderr, "Prepare transaction %u(%s) with CSN %lu\n", id->xid, gtid, cid));
+	}
+	SpinLockRelease(&local->lock);
+
+	/*
+	 * Record commit in pg_committed_xact table to be make it possible to
+	 * perform recovery in case of crash of some of cluster nodes
+	 */
+	if (DtmRecordCommits)
+	{
+		char		stmt[MAX_GTID_SIZE + 64];
+		int			rc;
+
+		sprintf(stmt, "insert into pg_committed_xacts values ('%s')", gtid);
+		SPI_connect();
+		rc = SPI_execute(stmt, true, 0);
+		SPI_finish();
+		if (rc != SPI_OK_INSERT)
+		{
+			elog(ERROR, "Failed to insert GTID %s in table pg_committed_xacts", gtid);
+		}
+	}
+}
+
+/*
+ * Mark tranasction as prepared
+ */
+void
+DtmLocalCommitPrepared(DtmCurrentTrans * x, GlobalTransactionId gtid)
+{
+	Assert(gtid != NULL);
+
+	SpinLockAcquire(&local->lock);
+	{
+		DtmTransId *id = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_REMOVE, NULL);
+
+		Assert(id != NULL);
+
+		x->is_global = true;
+		x->is_prepared = true;
+		x->xid = id->xid;
+		free(id->subxids);
+
+		DTM_TRACE((stderr, "Global transaction %u(%s) is precommitted\n", x->xid, gtid));
+	}
+	SpinLockRelease(&local->lock);
+}
+
+/*
+ * Set transaction status to committed
+ */
+void
+DtmLocalCommit(DtmCurrentTrans * x)
+{
+	SpinLockAcquire(&local->lock);
+	if (TransactionIdIsValid(x->xid))
+	{
+		bool		found;
+		DtmTransStatus *ts;
+
+		ts = (DtmTransStatus *) hash_search(xid2status, &x->xid, HASH_ENTER, &found);
+		ts->status = TRANSACTION_STATUS_COMMITTED;
+		if (x->is_prepared)
+		{
+			int			i;
+			DtmTransStatus *sts = ts;
+
+			Assert(found);
+			Assert(x->is_global);
+			for (i = 0; i < ts->nSubxids; i++)
+			{
+				sts = sts->next;
+				Assert(sts->cid == ts->cid);
+				sts->status = TRANSACTION_STATUS_COMMITTED;
+			}
+		}
+		else
+		{
+			TransactionId *subxids;
+
+			Assert(!found);
+			ts->cid = dtm_get_cid();
+			DtmTransactionListAppend(ts);
+			ts->nSubxids = xactGetCommittedChildren(&subxids);
+			DtmAddSubtransactions(ts, subxids, ts->nSubxids);
+		}
+		x->cid = ts->cid;
+		DTM_TRACE((stderr, "Local transaction %u is committed at %lu\n", x->xid, x->cid));
+	}
+	SpinLockRelease(&local->lock);
+}
+
+/*
+ * Mark tranasction as prepared
+ */
+void
+DtmLocalAbortPrepared(DtmCurrentTrans * x, GlobalTransactionId gtid)
+{
+	Assert(gtid != NULL);
+
+	SpinLockAcquire(&local->lock);
+	{
+		DtmTransId *id = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_REMOVE, NULL);
+
+		Assert(id != NULL);
+
+		x->is_global = true;
+		x->is_prepared = true;
+		x->xid = id->xid;
+		free(id->subxids);
+
+		DTM_TRACE((stderr, "Global transaction %u(%s) is preaborted\n", x->xid, gtid));
+	}
+	SpinLockRelease(&local->lock);
+}
+
+/*
+ * Set transaction status to aborted
+ */
+void
+DtmLocalAbort(DtmCurrentTrans * x)
+{
+	SpinLockAcquire(&local->lock);
+	{
+		bool		found;
+		DtmTransStatus *ts;
+
+		Assert(TransactionIdIsValid(x->xid));
+		ts = (DtmTransStatus *) hash_search(xid2status, &x->xid, HASH_ENTER, &found);
+		if (x->is_prepared)
+		{
+			Assert(found);
+			Assert(x->is_global);
+		}
+		else
+		{
+			Assert(!found);
+			ts->cid = dtm_get_cid();
+			ts->nSubxids = 0;
+			DtmTransactionListAppend(ts);
+		}
+		x->cid = ts->cid;
+		ts->status = TRANSACTION_STATUS_ABORTED;
+		DTM_TRACE((stderr, "Local transaction %u is aborted at %lu\n", x->xid, x->cid));
+	}
+	SpinLockRelease(&local->lock);
+}
+
+/*
+ * Cleanup dtm_tx structure
+ */
+void
+DtmLocalEnd(DtmCurrentTrans * x)
+{
+	x->is_global = false;
+	x->is_prepared = false;
+	x->xid = InvalidTransactionId;
+	x->cid = INVALID_CID;
+}
+
+/*
+ * Now only timestapm based dealock detection is supported for pg_tsdtm.
+ * Please adjust "deadlock_timeout" parameter in postresql.conf to avoid false
+ * deadlock detection.
+ */
+bool
+DtmDetectGlobalDeadLock(PGPROC *proc)
+{
+	elog(WARNING, "Global deadlock?");
+	return true;
+}
+
+static cid_t
+DtmGetCsn(TransactionId xid)
+{
+	cid_t		csn = 0;
+
+	SpinLockAcquire(&local->lock);
+	{
+		DtmTransStatus *ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_FIND, NULL);
+
+		if (ts != NULL)
+		{
+			csn = ts->cid;
+		}
+	}
+	SpinLockRelease(&local->lock);
+	return csn;
+}
+
+/*
+ * Save state of parepared transaction
+ */
+void
+DtmLocalSavePreparedState(GlobalTransactionId gtid)
+{
+	if (gtid != NULL)
+	{
+		SpinLockAcquire(&local->lock);
+		{
+			DtmTransId *id = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_FIND, NULL);
+
+			if (id != NULL)
+			{
+				TransactionId *subxids;
+				int			nSubxids = xactGetCommittedChildren(&subxids);
+
+				if (nSubxids != 0)
+				{
+					id->subxids = (TransactionId *) malloc(nSubxids * sizeof(TransactionId));
+					id->nSubxids = nSubxids;
+					memcpy(id->subxids, subxids, nSubxids * sizeof(TransactionId));
+				}
+			}
+		}
+		SpinLockRelease(&local->lock);
+	}
+}
+
+/*
+ * Add subtransactions to finished transactions list.
+ * Copy CSN and status of parent transaction.
+ */
+static void
+DtmAddSubtransactions(DtmTransStatus * ts, TransactionId *subxids, int nSubxids)
+{
+	int			i;
+
+	for (i = 0; i < nSubxids; i++)
+	{
+		bool		found;
+		DtmTransStatus *sts;
+
+		Assert(TransactionIdIsValid(subxids[i]));
+		sts = (DtmTransStatus *) hash_search(xid2status, &subxids[i], HASH_ENTER, &found);
+		Assert(!found);
+		sts->status = ts->status;
+		sts->cid = ts->cid;
+		sts->nSubxids = 0;
+		DtmTransactionListInsertAfter(ts, sts);
+	}
+}
diff --git a/contrib/pg_tsdtm/pg_tsdtm.control b/contrib/pg_tsdtm/pg_tsdtm.control
new file mode 100644
index 0000000..f9b8215
--- /dev/null
+++ b/contrib/pg_tsdtm/pg_tsdtm.control
@@ -0,0 +1,4 @@
+comment = 'Pluggable distributed transaction manager'
+default_version = '1.0'
+module_pathname = '$libdir/pg_tsdtm'
+relocatable = true
\ No newline at end of file
diff --git a/contrib/pg_tsdtm/pg_tsdtm.h b/contrib/pg_tsdtm/pg_tsdtm.h
new file mode 100644
index 0000000..467038a
--- /dev/null
+++ b/contrib/pg_tsdtm/pg_tsdtm.h
@@ -0,0 +1,57 @@
+#ifndef DTM_BACKEND_H
+#define DTM_BACKEND_H
+
+typedef int nodeid_t;
+typedef uint64 cid_t;
+
+typedef struct
+{
+	TransactionId xid;
+	bool		is_global;
+	bool		is_prepared;
+	cid_t		cid;
+	cid_t		snapshot;
+}	DtmCurrentTrans;
+
+typedef char const *GlobalTransactionId;
+
+/* Initialize DTM extension */
+void		DtmInitialize(void);
+
+/* Invoked at start of any local or global transaction */
+void		DtmLocalBegin(DtmCurrentTrans * x);
+
+/* Extend local transaction to global by assigning upper bound CSN which is returned to coordinator */
+cid_t		DtmLocalExtend(DtmCurrentTrans * x, GlobalTransactionId gtid);
+
+/* Function called at first access to any datanode except first one involved in distributed transaction */
+cid_t		DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t snapshot);
+
+/* Mark transaction as in-doubt */
+void		DtmLocalBeginPrepare(GlobalTransactionId gtid);
+
+/* Choose CSN for global transaction */
+cid_t		DtmLocalPrepare(GlobalTransactionId gtid, cid_t cid);
+
+/* Assign CSN to global transaction */
+void		DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid);
+
+/* Do local commit of global transaction */
+void		DtmLocalCommitPrepared(DtmCurrentTrans * x, GlobalTransactionId gtid);
+
+/* Do local abort of global transaction */
+void		DtmLocalAbortPrepared(DtmCurrentTrans * x, GlobalTransactionId gtid);
+
+/* Do local commit of global transaction */
+void		DtmLocalCommit(DtmCurrentTrans * x);
+
+/* Do local abort of global transaction */
+void		DtmLocalAbort(DtmCurrentTrans * x);
+
+/* Invoked at the end of any local or global transaction: free transaction state */
+void		DtmLocalEnd(DtmCurrentTrans * x);
+
+/* Save global preapred transactoin state */
+void		DtmLocalSavePreparedState(GlobalTransactionId gtid);
+
+#endif
diff --git a/contrib/pg_tsdtm/t/001_distributed_transactions.pl b/contrib/pg_tsdtm/t/001_distributed_transactions.pl
new file mode 100644
index 0000000..b34895b
--- /dev/null
+++ b/contrib/pg_tsdtm/t/001_distributed_transactions.pl
@@ -0,0 +1,133 @@
+###############################################################################
+# Test of proper transaction isolation.
+###############################################################################
+
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 1;
+use DBI();
+use DBD::Pg();
+
+sub query_row
+{
+	my ($dbi, $sql, @keys) = @_;
+	my $sth = $dbi->prepare($sql) || die;
+	$sth->execute(@keys) || die;
+	my $ret = $sth->fetchrow_array;
+	print "query_row('$sql') -> $ret \n";
+	return $ret;
+}
+
+sub query_exec
+{
+	my ($dbi, $sql) = @_;
+	print "query_exec('$sql')\n";
+	my $rv = $dbi->do($sql) || die;
+	return $rv;
+}
+
+sub PostgresNode::psql_ok {
+	my ($self, $sql, $comment) = @_;
+
+	$self->command_ok(['psql', '-A', '-t', '--no-psqlrc',
+		'-d', $self->connstr, '-c', $sql], $comment);
+}
+
+sub PostgresNode::psql_fails {
+	my ($self, $sql, $comment) = @_;
+
+	$self->command_ok(['psql', '-A', '-t', '--no-psqlrc',
+		'-d', $self->connstr, '-c', $sql], $comment);
+}
+
+###############################################################################
+# Setup nodes
+###############################################################################
+
+# Setup first node
+my $node1 = get_new_node("node1");
+$node1->init;
+$node1->append_conf('postgresql.conf', qq(
+max_prepared_transactions = 10
+shared_preload_libraries = 'pg_tsdtm'
+));
+$node1->start;
+
+# Setup second node
+my $node2 = get_new_node("node2");
+$node2->init;
+$node2->append_conf('postgresql.conf', qq(
+max_prepared_transactions = 10
+shared_preload_libraries = 'pg_tsdtm'
+));
+$node2->start;
+
+$node1->psql('postgres', "create extension pg_tsdtm;");
+$node1->psql('postgres', "create table t(u int primary key, v int)");
+$node1->psql('postgres', "insert into t (select generate_series(0, 9), 0)");
+
+$node2->psql('postgres', "create extension pg_tsdtm;");
+$node2->psql('postgres', "create table t(u int primary key, v int)");
+$node2->psql('postgres', "insert into t (select generate_series(0, 9), 0)");
+
+# we need two connections to each node (run two simultameous global tx)
+my $conn11 = DBI->connect('DBI:Pg:' . $node1->connstr('postgres'));
+my $conn21 = DBI->connect('DBI:Pg:' . $node2->connstr('postgres'));
+my $conn12 = DBI->connect('DBI:Pg:' . $node1->connstr('postgres'));
+my $conn22 = DBI->connect('DBI:Pg:' . $node2->connstr('postgres'));
+
+sub count_total
+{
+	my ($c1, $c2) = @_;
+
+	query_exec($c1, "begin");
+	query_exec($c2, "begin");
+
+	my $snapshot = query_row($c1, "select dtm_extend()");
+	query_row($c2, "select dtm_access($snapshot)");
+
+	my $sum1 = query_row($c1, "select sum(v) from t");
+	my $sum2 = query_row($c2, "select sum(v) from t");
+
+	query_exec($c1, "commit");
+	query_exec($c2, "commit");
+
+	my $tot = $sum1 + $sum2;
+
+	print "total = $tot\n";
+	return $tot;
+}
+
+###############################################################################
+# Sanity check on dirty reads
+###############################################################################
+
+my $gtid1 = "gtx1";
+
+# start global tx
+query_exec($conn11, "begin transaction");
+query_exec($conn21, "begin transaction");
+my $snapshot = query_row($conn11, "select dtm_extend('$gtid1')");
+query_exec($conn21, "select dtm_access($snapshot, '$gtid1')");
+
+# transfer some amount of integers to different node
+query_exec($conn11, "update t set v = v - 10 where u=1");
+my $intermediate_total = count_total($conn12, $conn22);
+query_exec($conn21, "update t set v = v + 10 where u=2");
+
+# commit our global tx
+query_exec($conn11, "prepare transaction '$gtid1'");
+query_exec($conn21, "prepare transaction '$gtid1'");
+query_exec($conn11, "select dtm_begin_prepare('$gtid1')");
+query_exec($conn21, "select dtm_begin_prepare('$gtid1')");
+my $csn = query_row($conn11, "select dtm_prepare('$gtid1', 0)");
+query_exec($conn21, "select dtm_prepare('$gtid1', $csn)");
+query_exec($conn11, "select dtm_end_prepare('$gtid1', $csn)");
+query_exec($conn21, "select dtm_end_prepare('$gtid1', $csn)");
+query_exec($conn11, "commit prepared '$gtid1'");
+query_exec($conn21, "commit prepared '$gtid1'");
+
+is($intermediate_total, 0, "Check for absence of dirty reads");
+
