eXtensible Transaction Manager API (v2)
Hi,
The Postgres Professional cluster team would like to propose a new version of the
eXtensible Transaction Manager (XTM) API.
Previous discussion concerning this patch can be found here:
/messages/by-id/F2766B97-555D-424F-B29F-E0CA0F6D1D74@postgrespro.ru
The API patch itself is small, but we think it would be strange to provide
just the API without any examples of its usage.
We have built several distributed transaction managers on top of this API:
pg_dtm (based on snapshot sharing) and pg_tsdtm (CSN-based, with CSNs taken
from the local system time).
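"CSNs taken from the local system time" can be illustrated with a minimal standalone sketch of the idea behind dtm_get_cid() and dtm_sync() in the attached pg_tsdtm.c:

- A CSN is the node's clock in microseconds plus a correction.
- If the clock has not moved past the last issued CSN, the CSN is bumped by one.
- Whenever a larger CSN arrives from another node, the correction is increased so this node never falls behind it.

The NodeClock type and the function names are hypothetical. The raw clock reading is passed in as a parameter so the behaviour is deterministic; pg_tsdtm itself reads gettimeofday().

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t csn_t;

/* Hypothetical per-node clock state; pg_tsdtm keeps the equivalent
 * (cid, time_shift) in its shared DtmNodeState. */
typedef struct
{
    csn_t   last_csn;   /* last CSN issued on this node */
    int64_t time_shift; /* correction in microseconds added to the raw clock */
} NodeClock;

/* Return a unique, strictly ascending CSN: the corrected clock reading,
 * or last_csn + 1 if the clock has not moved past the last issued CSN. */
static csn_t
next_csn(NodeClock *clk, csn_t raw_usec)
{
    csn_t csn = raw_usec + (csn_t) clk->time_shift;

    if (csn <= clk->last_csn)
        csn = clk->last_csn + 1;
    clk->last_csn = csn;
    return csn;
}

/* On receiving a CSN from another node, shift the local clock forward so
 * that this node never issues a CSN below it; returns the new local CSN. */
static csn_t
sync_csn(NodeClock *clk, csn_t raw_usec, csn_t global_csn)
{
    csn_t local;

    while ((local = next_csn(clk, raw_usec)) < global_csn)
        clk->time_shift += (int64_t) (global_csn - local);
    return local;
}
```

For example, with a fresh NodeClock, two calls at clock reading 100 return 100 and then 101. After sync_csn(&clk, 200, 500) the node's time_shift is 300, so a later reading of 201 yields CSN 501.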
Based on these two DTM implementations we have developed various "cluster"
implementations:
multimaster+pg_dtm, multimaster+pg_tsdtm, pg_shard+pg_dtm,
pg_shard+pg_tsdtm, postgres_fdw+pg_dtm, postgres_fdw+pg_tsdtm, ...
Multimaster is based on logical replication. It is something like BDR, but
synchronous, so it provides consistency across the cluster.
But we want to keep this patch as small as possible,
so we decided to include in it only pg_tsdtm and a patch to postgres_fdw
that allows it to be used with pg_tsdtm.
pg_tsdtm is simpler than pg_dtm, because the latter includes an arbiter
using the RAFT protocol (a centralized service)
and sockhub for efficient multiplexing of backend connections.
Also, in theory, pg_tsdtm provides better scalability, because it is
decentralized.
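The decentralized commit can be pictured as follows. In the attached patch:

- dtm_prepare(gtid, csn) returns the maximum of the CSN it is given and the node's own current CSN (DtmLocalPrepare()).
- dtm_end_prepare(gtid, csn) assigns the final CSN and moves the node's clock forward to it (DtmLocalEndPrepare()).

The sketch below models only that arithmetic. A coordinator threads a running maximum through all participants and then tells every participant the result, so agreement needs no central service. Node, node_prepare, node_end_prepare and agree_commit_csn are hypothetical names. The in-doubt marking done by dtm_begin_prepare, the actual two-phase commit and all networking are left out.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t csn_t;

/* Hypothetical participant: only its current CSN clock is modelled. */
typedef struct
{
    csn_t clock;
} Node;

/* Like dtm_prepare(): propose the larger of the received CSN and the local one. */
static csn_t
node_prepare(const Node *node, csn_t received)
{
    return node->clock > received ? node->clock : received;
}

/* Like dtm_end_prepare(): adopt the agreed CSN, moving the clock forward if needed. */
static void
node_end_prepare(Node *node, csn_t agreed)
{
    if (node->clock < agreed)
        node->clock = agreed;
}

/* Coordinator: thread a running maximum through all participants, then
 * broadcast the result. No central arbiter is involved. */
static csn_t
agree_commit_csn(Node *nodes, size_t n_nodes)
{
    csn_t csn = 0;

    for (size_t i = 0; i < n_nodes; i++)
        csn = node_prepare(&nodes[i], csn);
    for (size_t i = 0; i < n_nodes; i++)
        node_end_prepare(&nodes[i], csn);
    return csn;
}
```

For example, three nodes whose clocks read 100, 250 and 180 agree on commit CSN 250. Afterwards every node's clock is at least 250, so no node can later assign a smaller CSN.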
The architecture of DTM and tsDTM, as well as benchmark results, is described
on the wiki page:
https://wiki.postgresql.org/wiki/DTM
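One part of the tsDTM architecture worth spelling out is the visibility rule in DtmXidInMVCCSnapshot() from the attached patch:

- A tuple whose writer has a CSN greater than the reader's snapshot CSN is invisible.
- A writer that is still in doubt (marked by dtm_begin_prepare and not yet finished) forces the reader to wait.
- Otherwise the tuple is visible only if its writer committed.

The sketch below restates that decision as a pure function. TxRecord, TxStatus, Visibility and csn_visibility are hypothetical names. The real code also falls back to PostgreSQL's standard snapshot check for transactions it does not track.

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t csn_t;

typedef enum { TX_IN_PROGRESS, TX_COMMITTED, TX_ABORTED } TxStatus;

/* Hypothetical record of a tracked transaction, loosely modelled on DtmTransStatus. */
typedef struct
{
    TxStatus status;
    csn_t    csn;   /* final CSN, or the tentative one while in doubt */
} TxRecord;

typedef enum { VIS_VISIBLE, VIS_INVISIBLE, VIS_WAIT } Visibility;

/* Visibility of a tuple written by `writer` for a reader with snapshot CSN `snapshot`. */
static Visibility
csn_visibility(const TxRecord *writer, csn_t snapshot)
{
    if (writer->csn > snapshot)
        return VIS_INVISIBLE;        /* writer's CSN is after the snapshot */
    if (writer->status == TX_IN_PROGRESS)
        return VIS_WAIT;             /* in doubt: caller must wait and retry */
    return writer->status == TX_COMMITTED ? VIS_VISIBLE : VIS_INVISIBLE;
}
```

Checking the CSN before the in-doubt status appears safe in the intended call order. The tentative CSN assigned at dtm_begin_prepare is a lower bound for the final one, which is a maximum over all nodes. So if even the tentative CSN is above the snapshot, the final one will be too.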
Please note that pg_tsdtm is just a reference implementation of a DTM using
this XTM API.
The primary idea of this patch is to add the XTM API to the PostgreSQL code,
allowing custom transaction managers to be implemented as
Postgres extensions. So please review the XTM API itself first of all, and not
pg_tsdtm, which is just an example of its usage.
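The core of such an API is a table of function pointers that the core code calls through a global pointer. In the attached pg_tsdtm.c, DtmTM:

- keeps the stock implementations for most slots, such as PgTransactionIdGetStatus and PgGetNewTransactionId;
- overrides DtmGetSnapshot, DtmGetOldestXmin, DtmXidInMVCCSnapshot, DtmDetectGlobalDeadLock and DtmGetName;
- is installed with `TM = &DtmTM;` from the shared memory startup path.

The toy sketch below shows the same pattern with just two slots. Its struct layout, its visibility rule and all names except TM are illustrative assumptions, not the real access/xtm.h.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

typedef uint32_t TransactionId;

/* Toy two-slot transaction manager: core code calls only through these pointers. */
typedef struct
{
    bool        (*XidInMVCCSnapshot) (TransactionId xid, TransactionId xmax);
    const char *(*GetName) (void);
} TransactionManager;

/* Default ("Pg*") implementations; the visibility rule is a toy:
 * XIDs at or above the snapshot's xmax count as still running (invisible). */
static bool
PgXidInMVCCSnapshot(TransactionId xid, TransactionId xmax)
{
    return xid >= xmax;
}

static const char *
PgGetName(void)
{
    return "postgres";
}

static TransactionManager PgTM = {PgXidInMVCCSnapshot, PgGetName};

/* Global pointer used by core code; an extension may repoint it. */
static TransactionManager *TM = &PgTM;

/* Hypothetical extension: keeps the default visibility rule, overrides the name. */
static const char *
MyDtmGetName(void)
{
    return "my_dtm";
}

static TransactionManager MyDtmTM = {PgXidInMVCCSnapshot, MyDtmGetName};

/* Analogue of the `TM = &DtmTM;` assignment in pg_tsdtm's DtmInitialize(). */
static void
my_dtm_install(void)
{
    TM = &MyDtmTM;
}

/* Core-side helper that, like all core code, goes through TM. */
static bool
active_tm_is(const char *name)
{
    return strcmp(TM->GetName(), name) == 0;
}
```

Before my_dtm_install() is called, active_tm_is("postgres") is true. Afterwards TM->GetName() returns "my_dtm", while TM->XidInMVCCSnapshot still uses the default rule. This is why an extension only has to implement the slots it actually changes.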
The complete PostgreSQL branch with all our changes can be found here:
https://github.com/postgrespro/postgres_cluster
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
Attachments:
pg_tsdtm.patch (text/x-patch)
diff --git a/contrib/pg_tsdtm/Makefile b/contrib/pg_tsdtm/Makefile
new file mode 100644
index 0000000..e70dffc
--- /dev/null
+++ b/contrib/pg_tsdtm/Makefile
@@ -0,0 +1,20 @@
+MODULE_big = pg_tsdtm
+OBJS = pg_tsdtm.o
+
+EXTENSION = pg_tsdtm
+DATA = pg_tsdtm--1.0.sql
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/pg_tsdtm
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
+
+check:
+ env DESTDIR='$(abs_top_builddir)'/tmp_install make install
+ $(prove_check)
diff --git a/contrib/pg_tsdtm/dtm_recovery/dtm_recovery.cpp b/contrib/pg_tsdtm/dtm_recovery/dtm_recovery.cpp
new file mode 100644
index 0000000..38285be
--- /dev/null
+++ b/contrib/pg_tsdtm/dtm_recovery/dtm_recovery.cpp
@@ -0,0 +1,129 @@
+#include <iostream>
+#include <string>
+#include <vector>
+#include <set>
+
+#include <pqxx/connection>
+#include <pqxx/transaction>
+#include <pqxx/nontransaction>
+
+using namespace std;
+using namespace pqxx;
+
+int main (int argc, char* argv[])
+{
+ if (argc == 1){
+ cout << "Use -h to show usage options\n";
+ return 1;
+ }
+ vector<string> connections;
+ set<string> prepared_xacts;
+ set<string> committed_xacts;
+ bool verbose = false;
+ for (int i = 1; i < argc; i++) {
+ if (argv[i][0] == '-') {
+ switch (argv[i][1]) {
+ case 'C':
+ case 'c':
+ if (i + 1 < argc) { connections.push_back(string(argv[++i])); continue; }
+ break; /* missing connection string: show usage */
+ case 'v':
+ verbose = true;
+ continue;
+ }
+ }
+ cout << "Perform recovery of pg_tsdtm cluster.\n"
+ "Usage: dtm_recovery {options}\n"
+ "Options:\n"
+ "\t-c STR\tdatabase connection string\n"
+ "\t-v\tverbose mode: print extra information while processing\n";
+ return 1;
+ }
+ if (verbose) {
+ cout << "Collecting information about prepared transactions...\n";
+ }
+ for (vector<string>::iterator ic = connections.begin(); ic != connections.end(); ++ic)
+ {
+ if (verbose) {
+ cout << "Connecting to " << *ic << "...\n";
+ }
+ connection con(*ic);
+ work txn(con);
+ result r = txn.exec("select gid from pg_prepared_xacts");
+ for (result::const_iterator it = r.begin(); it != r.end(); ++it)
+ {
+ string gid = it.at("gid").as(string());
+ prepared_xacts.insert(gid);
+ }
+ txn.commit();
+ }
+ if (verbose) {
+ cout << "Prepared transactions: ";
+ for (set<string>::iterator it = prepared_xacts.begin(); it != prepared_xacts.end(); ++it)
+ {
+ cout << *it << ", ";
+ }
+ cout << "\nChecking which of them are committed...\n";
+ }
+ for (vector<string>::iterator ic = connections.begin(); ic != connections.end(); ++ic)
+ {
+ if (verbose) {
+ cout << "Connecting to " << *ic << "...\n";
+ }
+ connection con(*ic);
+ work txn(con);
+ con.prepare("commit-check", "select * from pg_committed_xacts where gid=$1");
+ for (set<string>::iterator it = prepared_xacts.begin(); it != prepared_xacts.end(); ++it)
+ {
+ string gid = *it;
+ result r = txn.prepared("commit-check")(gid).exec();
+ if (!r.empty()) {
+ committed_xacts.insert(gid);
+ }
+ }
+ txn.commit();
+ }
+ if (verbose) {
+ cout << "Committed transactions: ";
+ for (set<string>::iterator it = committed_xacts.begin(); it != committed_xacts.end(); ++it)
+ {
+ cout << *it << ", ";
+ }
+ cout << "\nCommitting them at all nodes...\n";
+ }
+ for (vector<string>::iterator ic = connections.begin(); ic != connections.end(); ++ic)
+ {
+ if (verbose) {
+ cout << "Connecting to " << *ic << "...\n";
+ }
+ connection con(*ic);
+ nontransaction txn(con); /* COMMIT/ROLLBACK PREPARED cannot run inside a transaction block */
+ con.prepare("commit-check", "select * from pg_committed_xacts where gid=$1");
+ /* COMMIT PREPARED and ROLLBACK PREPARED accept only a string literal, not a $n
+ * parameter, so they are built below with a quoted GID instead of being prepared */
+ result r = txn.exec("select gid from pg_prepared_xacts");
+ for (result::const_iterator it = r.begin(); it != r.end(); ++it)
+ {
+ string gid = it.at("gid").as(string());
+ result rc = txn.prepared("commit-check")(gid).exec();
+ if (rc.empty()) {
+ if (committed_xacts.find(gid) != committed_xacts.end()) {
+ if (verbose) {
+ cout << "Commit transaction " << gid << "\n";
+ }
+ txn.exec("commit prepared " + txn.quote(gid));
+ } else {
+ if (verbose) {
+ cout << "Rollback transaction " << gid << "\n";
+ }
+ txn.exec("rollback prepared " + txn.quote(gid));
+ }
+ }
+ }
+ txn.commit();
+ }
+ if (verbose) {
+ cout << "Recovery completed\n";
+ }
+ return 0;
+}
diff --git a/contrib/pg_tsdtm/dtm_recovery/makefile b/contrib/pg_tsdtm/dtm_recovery/makefile
new file mode 100644
index 0000000..4d12c0b
--- /dev/null
+++ b/contrib/pg_tsdtm/dtm_recovery/makefile
@@ -0,0 +1,10 @@
+CXX=g++
+CXXFLAGS=-g -Wall -O0 -pthread
+
+all: dtm_recovery
+
+dtm_recovery: dtm_recovery.cpp
+ $(CXX) $(CXXFLAGS) -o dtm_recovery dtm_recovery.cpp -lpqxx
+
+clean:
+ rm -f dtm_recovery
diff --git a/contrib/pg_tsdtm/pg_tsdtm--1.0.sql b/contrib/pg_tsdtm/pg_tsdtm--1.0.sql
new file mode 100644
index 0000000..dcd81ac
--- /dev/null
+++ b/contrib/pg_tsdtm/pg_tsdtm--1.0.sql
@@ -0,0 +1,26 @@
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION pg_tsdtm" to load this file. \quit
+
+CREATE FUNCTION dtm_extend(gtid cstring default null) RETURNS bigint
+AS 'MODULE_PATHNAME','dtm_extend'
+LANGUAGE C;
+
+CREATE FUNCTION dtm_access(snapshot bigint, gtid cstring default null) RETURNS bigint
+AS 'MODULE_PATHNAME','dtm_access'
+LANGUAGE C;
+
+CREATE FUNCTION dtm_begin_prepare(gtid cstring) RETURNS void
+AS 'MODULE_PATHNAME','dtm_begin_prepare'
+LANGUAGE C;
+
+CREATE FUNCTION dtm_prepare(gtid cstring, csn bigint) RETURNS bigint
+AS 'MODULE_PATHNAME','dtm_prepare'
+LANGUAGE C;
+
+CREATE FUNCTION dtm_end_prepare(gtid cstring, csn bigint) RETURNS void
+AS 'MODULE_PATHNAME','dtm_end_prepare'
+LANGUAGE C;
+
+CREATE FUNCTION dtm_get_csn(xid integer) RETURNS bigint
+AS 'MODULE_PATHNAME','dtm_get_csn'
+LANGUAGE C;
diff --git a/contrib/pg_tsdtm/pg_tsdtm.c b/contrib/pg_tsdtm/pg_tsdtm.c
new file mode 100644
index 0000000..6dabe76
--- /dev/null
+++ b/contrib/pg_tsdtm/pg_tsdtm.c
@@ -0,0 +1,1021 @@
+/*
+ * pg_tsdtm.c
+ *
+ * Pluggable distributed transaction manager
+ *
+ */
+
+#include <unistd.h>
+#include <sys/time.h>
+#include <time.h>
+
+#include "postgres.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "storage/s_lock.h"
+#include "storage/spin.h"
+#include "storage/lmgr.h"
+#include "storage/shmem.h"
+#include "storage/ipc.h"
+#include "access/xlogdefs.h"
+#include "access/xact.h"
+#include "access/xtm.h"
+#include "access/transam.h"
+#include "access/subtrans.h"
+#include "access/xlog.h"
+#include "access/clog.h"
+#include "access/twophase.h"
+#include "executor/spi.h"
+#include "utils/hsearch.h"
+#include "utils/tqual.h"
+#include "utils/guc.h"
+
+#include "pg_tsdtm.h"
+
+#define DTM_HASH_INIT_SIZE 1000000
+#define INVALID_CID 0
+#define MIN_WAIT_TIMEOUT 1000
+#define MAX_WAIT_TIMEOUT 100000
+#define MAX_GTID_SIZE 16
+#define HASH_PER_ELEM_OVERHEAD 64
+
+#define USEC UINT64CONST(1000000)
+
+#define TRACE_SLEEP_TIME 1
+
+typedef uint64 timestamp_t;
+
+/* Distributed transaction state kept in shared memory */
+typedef struct DtmTransStatus
+{
+ TransactionId xid;
+ XidStatus status;
+ int nSubxids;
+ cid_t cid; /* CSN */
+ struct DtmTransStatus *next;/* pointer to next element in finished
+ * transaction list */
+} DtmTransStatus;
+
+/* State of DTM node */
+typedef struct
+{
+ cid_t cid; /* last assigned CSN; used to provide unique
+ * ascending CSNs */
+ TransactionId oldest_xid; /* XID of oldest transaction visible by any
+ * active transaction (local or global) */
+ long time_shift; /* correction to system time */
+ volatile slock_t lock; /* spinlock to protect access to hash table */
+ DtmTransStatus *trans_list_head; /* L1 list of finished transactions
+ * present in xid2status hash table.
+ * This list is used to perform
+ * cleanup of too old transactions */
+ DtmTransStatus **trans_list_tail;
+} DtmNodeState;
+
+/* Structure used to map global transaction identifier to XID */
+typedef struct
+{
+ char gtid[MAX_GTID_SIZE];
+ TransactionId xid;
+ TransactionId *subxids;
+ int nSubxids;
+} DtmTransId;
+
+
+#define DTM_TRACE(x)
+/* #define DTM_TRACE(x) fprintf x */
+
+static shmem_startup_hook_type prev_shmem_startup_hook;
+static HTAB *xid2status;
+static HTAB *gtid2xid;
+static DtmNodeState *local;
+static DtmCurrentTrans dtm_tx;
+static uint64 totalSleepInterrupts;
+static int DtmVacuumDelay;
+static bool DtmRecordCommits;
+
+static Snapshot DtmGetSnapshot(Snapshot snapshot);
+static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum);
+static bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
+static TransactionId DtmAdjustOldestXid(TransactionId xid);
+static bool DtmDetectGlobalDeadLock(PGPROC *proc);
+static cid_t DtmGetCsn(TransactionId xid);
+static void DtmAddSubtransactions(DtmTransStatus * ts, TransactionId *subxids, int nSubxids);
+static char const *DtmGetName(void);
+
+static TransactionManager DtmTM = {
+ PgTransactionIdGetStatus,
+ PgTransactionIdSetTreeStatus,
+ DtmGetSnapshot,
+ PgGetNewTransactionId,
+ DtmGetOldestXmin,
+ PgTransactionIdIsInProgress,
+ PgGetGlobalTransactionId,
+ DtmXidInMVCCSnapshot,
+ DtmDetectGlobalDeadLock,
+ DtmGetName
+};
+
+void _PG_init(void);
+void _PG_fini(void);
+
+
+static void dtm_shmem_startup(void);
+static Size dtm_memsize(void);
+static void dtm_xact_callback(XactEvent event, void *arg);
+static timestamp_t dtm_get_current_time();
+static void dtm_sleep(timestamp_t interval);
+static cid_t dtm_get_cid();
+static cid_t dtm_sync(cid_t cid);
+
+/*
+ * Time manipulation functions
+ */
+
+/* Get current time with microsecond resolution */
+static timestamp_t
+dtm_get_current_time()
+{
+ struct timeval tv;
+
+ gettimeofday(&tv, NULL);
+ return (timestamp_t) tv.tv_sec * USEC + tv.tv_usec + local->time_shift;
+}
+
+/* Sleep for specified amount of time */
+static void
+dtm_sleep(timestamp_t interval)
+{
+ struct timespec ts;
+ struct timespec rem;
+
+ ts.tv_sec = interval / USEC;
+ ts.tv_nsec = (interval % USEC) * 1000;
+
+ while (nanosleep(&ts, &rem) < 0)
+ {
+ totalSleepInterrupts += 1;
+ Assert(errno == EINTR);
+ ts = rem;
+ }
+}
+
+/* Get unique ascending CSN.
+ * This function is called inside critical section
+ */
+static cid_t
+dtm_get_cid()
+{
+ cid_t cid = dtm_get_current_time();
+
+ if (cid <= local->cid)
+ {
+ cid = ++local->cid;
+ }
+ else
+ {
+ local->cid = cid;
+ }
+ return cid;
+}
+
+/*
+ * Adjust system time
+ */
+static cid_t
+dtm_sync(cid_t global_cid)
+{
+ cid_t local_cid;
+
+ while ((local_cid = dtm_get_cid()) < global_cid)
+ {
+ local->time_shift += global_cid - local_cid;
+ }
+ return local_cid;
+}
+
+void
+_PG_init(void)
+{
+ DTM_TRACE((stderr, "DTM_PG_init \n"));
+
+ /*
+ * In order to create our shared memory area, we have to be loaded via
+ * shared_preload_libraries. If not, fall out without hooking into any of
+ * the main system. (We don't throw error here because it seems useful to
+ * allow the pg_tsdtm functions to be created even when the
+ * module isn't active. The functions must protect themselves against
+ * being called then, however.)
+ */
+ if (!process_shared_preload_libraries_in_progress)
+ return;
+
+ RequestAddinShmemSpace(dtm_memsize());
+
+ DefineCustomIntVariable(
+ "dtm.vacuum_delay",
+ "Minimal age of records which can be vacuumed (seconds)",
+ NULL,
+ &DtmVacuumDelay,
+ 10,
+ 1,
+ INT_MAX,
+ PGC_BACKEND,
+ 0,
+ NULL,
+ NULL,
+ NULL
+ );
+
+ DefineCustomBoolVariable(
+ "dtm.record_commits",
+ "Store information about committed global transactions in pg_committed_xacts table",
+ NULL,
+ &DtmRecordCommits,
+ false,
+ PGC_BACKEND,
+ 0,
+ NULL,
+ NULL,
+ NULL
+ );
+
+
+ /*
+ * Install hooks.
+ */
+ prev_shmem_startup_hook = shmem_startup_hook;
+ shmem_startup_hook = dtm_shmem_startup;
+}
+
+/*
+ * Module unload callback
+ */
+void
+_PG_fini(void)
+{
+ /* Uninstall hooks. */
+ shmem_startup_hook = prev_shmem_startup_hook;
+}
+
+/*
+ * Estimate shared memory space needed.
+ */
+static Size
+dtm_memsize(void)
+{
+ Size size;
+
+ size = MAXALIGN(sizeof(DtmNodeState));
+ size = add_size(size, (sizeof(DtmTransId) + sizeof(DtmTransStatus) + HASH_PER_ELEM_OVERHEAD * 2) * DTM_HASH_INIT_SIZE);
+
+ return size;
+}
+
+
+/*
+ * shmem_startup hook: call any previously installed hook, then
+ * allocate or attach to the DTM shared memory (the xid2status and
+ * gtid2xid hash tables and the DtmNodeState structure) and install
+ * pg_tsdtm as the active transaction manager.
+ */
+static void
+dtm_shmem_startup(void)
+{
+ if (prev_shmem_startup_hook)
+ {
+ prev_shmem_startup_hook();
+ }
+ DtmInitialize();
+}
+
+static GlobalTransactionId
+dtm_get_global_trans_id()
+{
+ return GetLockedGlobalTransactionId();
+}
+
+static void
+dtm_xact_callback(XactEvent event, void *arg)
+{
+ DTM_TRACE((stderr, "Backend %d dtm_xact_callback %d\n", getpid(), event));
+ switch (event)
+ {
+ case XACT_EVENT_START:
+ DtmLocalBegin(&dtm_tx);
+ break;
+
+ case XACT_EVENT_ABORT:
+ DtmLocalAbort(&dtm_tx);
+ DtmLocalEnd(&dtm_tx);
+ break;
+
+ case XACT_EVENT_COMMIT:
+ DtmLocalCommit(&dtm_tx);
+ DtmLocalEnd(&dtm_tx);
+ break;
+
+ case XACT_EVENT_ABORT_PREPARED:
+ DtmLocalAbortPrepared(&dtm_tx, dtm_get_global_trans_id());
+ break;
+
+ case XACT_EVENT_COMMIT_PREPARED:
+ DtmLocalCommitPrepared(&dtm_tx, dtm_get_global_trans_id());
+ break;
+
+ case XACT_EVENT_PREPARE:
+ DtmLocalSavePreparedState(dtm_get_global_trans_id());
+ DtmLocalEnd(&dtm_tx);
+ break;
+
+ default:
+ break;
+ }
+}
+
+/*
+ * ***************************************************************************
+ */
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(dtm_extend);
+PG_FUNCTION_INFO_V1(dtm_access);
+PG_FUNCTION_INFO_V1(dtm_begin_prepare);
+PG_FUNCTION_INFO_V1(dtm_prepare);
+PG_FUNCTION_INFO_V1(dtm_end_prepare);
+PG_FUNCTION_INFO_V1(dtm_get_csn);
+
+Datum
+dtm_extend(PG_FUNCTION_ARGS)
+{
+ GlobalTransactionId gtid = PG_ARGISNULL(0) ? NULL : PG_GETARG_CSTRING(0);
+ cid_t cid = DtmLocalExtend(&dtm_tx, gtid);
+
+ DTM_TRACE((stderr, "Backend %d extends transaction %u(%s) to global with cid=%lu\n", getpid(), dtm_tx.xid, gtid, cid));
+ PG_RETURN_INT64(cid);
+}
+
+Datum
+dtm_access(PG_FUNCTION_ARGS)
+{
+ cid_t cid = PG_GETARG_INT64(0);
+ GlobalTransactionId gtid = PG_ARGISNULL(1) ? NULL : PG_GETARG_CSTRING(1);
+
+ DTM_TRACE((stderr, "Backend %d joins transaction %u(%s) with cid=%lu\n", getpid(), dtm_tx.xid, gtid, cid));
+ cid = DtmLocalAccess(&dtm_tx, gtid, cid);
+ PG_RETURN_INT64(cid);
+}
+
+Datum
+dtm_begin_prepare(PG_FUNCTION_ARGS)
+{
+ GlobalTransactionId gtid = PG_GETARG_CSTRING(0);
+
+ DtmLocalBeginPrepare(gtid);
+ DTM_TRACE((stderr, "Backend %d begins prepare of transaction %s\n", getpid(), gtid));
+ PG_RETURN_VOID();
+}
+
+Datum
+dtm_prepare(PG_FUNCTION_ARGS)
+{
+ GlobalTransactionId gtid = PG_GETARG_CSTRING(0);
+ cid_t cid = PG_GETARG_INT64(1);
+
+ cid = DtmLocalPrepare(gtid, cid);
+ DTM_TRACE((stderr, "Backend %d prepares transaction %s with cid=%lu\n", getpid(), gtid, cid));
+ PG_RETURN_INT64(cid);
+}
+
+Datum
+dtm_end_prepare(PG_FUNCTION_ARGS)
+{
+ GlobalTransactionId gtid = PG_GETARG_CSTRING(0);
+ cid_t cid = PG_GETARG_INT64(1);
+
+ DTM_TRACE((stderr, "Backend %d ends prepare of transactions %s with cid=%lu\n", getpid(), gtid, cid));
+ DtmLocalEndPrepare(gtid, cid);
+ PG_RETURN_VOID();
+}
+
+Datum
+dtm_get_csn(PG_FUNCTION_ARGS)
+{
+ TransactionId xid = PG_GETARG_INT32(0);
+ cid_t csn = DtmGetCsn(xid);
+
+ PG_RETURN_INT64(csn);
+}
+
+/*
+ * ***************************************************************************
+ */
+
+static uint32
+dtm_xid_hash_fn(const void *key, Size keysize)
+{
+ return (uint32) *(TransactionId *) key;
+}
+
+static int
+dtm_xid_match_fn(const void *key1, const void *key2, Size keysize)
+{
+ return *(TransactionId *) key1 - *(TransactionId *) key2;
+}
+
+static uint32
+dtm_gtid_hash_fn(const void *key, Size keysize)
+{
+ GlobalTransactionId id = (GlobalTransactionId) key;
+ uint32 h = 0;
+
+ while (*id != 0)
+ {
+ h = h * 31 + *id++;
+ }
+ return h;
+}
+
+static void *
+dtm_gtid_keycopy_fn(void *dest, const void *src, Size keysize)
+{
+ return strcpy((char *) dest, (GlobalTransactionId) src);
+}
+
+static int
+dtm_gtid_match_fn(const void *key1, const void *key2, Size keysize)
+{
+ return strcmp((GlobalTransactionId) key1, (GlobalTransactionId) key2);
+}
+
+static char const *
+DtmGetName(void)
+{
+ return "pg_tsdtm";
+}
+
+static void
+DtmTransactionListAppend(DtmTransStatus * ts)
+{
+ ts->next = NULL;
+ *local->trans_list_tail = ts;
+ local->trans_list_tail = &ts->next;
+}
+
+static void
+DtmTransactionListInsertAfter(DtmTransStatus * after, DtmTransStatus * ts)
+{
+ ts->next = after->next;
+ after->next = ts;
+ if (local->trans_list_tail == &after->next)
+ {
+ local->trans_list_tail = &ts->next;
+ }
+}
+
+/*
+ * Different cluster nodes can have different oldest XIDs.
+ * Since we do not have a centralized arbiter, we have to rely on DtmVacuumDelay.
+ * This function takes the XID which PostgreSQL considers to be the oldest and tries to find an XID
+ * whose CSN is older than the CSN of that transaction by more than DtmVacuumDelay.
+ * If no such XID can be located, then return the previously observed oldest XID.
+ */
+static TransactionId
+DtmAdjustOldestXid(TransactionId xid)
+{
+ if (TransactionIdIsValid(xid))
+ {
+ DtmTransStatus *ts,
+ *prev = NULL;
+ timestamp_t now = dtm_get_current_time();
+ timestamp_t cutoff_time = now - DtmVacuumDelay * USEC;
+
+ SpinLockAcquire(&local->lock);
+ ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_FIND, NULL);
+ if (ts != NULL)
+ {
+ cutoff_time = ts->cid - DtmVacuumDelay * USEC;
+
+ for (ts = local->trans_list_head; ts != NULL && ts->cid < cutoff_time; prev = ts, ts = ts->next)
+ {
+ if (prev != NULL)
+ hash_search(xid2status, &prev->xid, HASH_REMOVE, NULL);
+ }
+ }
+ if (prev != NULL)
+ {
+ local->trans_list_head = prev;
+ local->oldest_xid = xid = prev->xid;
+ }
+ else
+ {
+ xid = local->oldest_xid;
+ }
+ SpinLockRelease(&local->lock);
+ }
+ return xid;
+}
+
+Snapshot
+DtmGetSnapshot(Snapshot snapshot)
+{
+ snapshot = PgGetSnapshotData(snapshot);
+ RecentGlobalDataXmin = RecentGlobalXmin = DtmAdjustOldestXid(RecentGlobalDataXmin);
+ return snapshot;
+}
+
+TransactionId
+DtmGetOldestXmin(Relation rel, bool ignoreVacuum)
+{
+ TransactionId xmin = PgGetOldestXmin(rel, ignoreVacuum);
+
+ xmin = DtmAdjustOldestXid(xmin);
+ return xmin;
+}
+
+/*
+ * Check tuple visibility based on the CSN of the current transaction.
+ * If there is no information about the transaction with this XID, then use standard PostgreSQL visibility rules.
+ */
+bool
+DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+{
+ timestamp_t delay = MIN_WAIT_TIMEOUT;
+
+ Assert(xid != InvalidTransactionId);
+
+ SpinLockAcquire(&local->lock);
+
+ while (true)
+ {
+ DtmTransStatus *ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_FIND, NULL);
+
+ if (ts != NULL)
+ {
+ if (ts->cid > dtm_tx.snapshot)
+ {
+ DTM_TRACE((stderr, "%d: tuple with xid=%d(csn=%lld) is invisible in snapshot %lld\n",
+ getpid(), xid, ts->cid, dtm_tx.snapshot));
+ SpinLockRelease(&local->lock);
+ return true;
+ }
+ if (ts->status == TRANSACTION_STATUS_IN_PROGRESS)
+ {
+ DTM_TRACE((stderr, "%d: wait for in-doubt transaction %u in snapshot %lu\n", getpid(), xid, dtm_tx.snapshot));
+ SpinLockRelease(&local->lock);
+
+ dtm_sleep(delay);
+
+ if (delay * 2 <= MAX_WAIT_TIMEOUT)
+ delay *= 2;
+ SpinLockAcquire(&local->lock);
+ }
+ else
+ {
+ bool invisible = ts->status == TRANSACTION_STATUS_ABORTED;
+
+ DTM_TRACE((stderr, "%d: tuple with xid=%d(csn= %lld) is %s in snapshot %lld\n",
+ getpid(), xid, ts->cid, invisible ? "rolled back" : "committed", dtm_tx.snapshot));
+ SpinLockRelease(&local->lock);
+ return invisible;
+ }
+ }
+ else
+ {
+ DTM_TRACE((stderr, "%d: visibility check is skipped for transaction %u in snapshot %lu\n", getpid(), xid, dtm_tx.snapshot));
+ break;
+ }
+ }
+ SpinLockRelease(&local->lock);
+ return PgXidInMVCCSnapshot(xid, snapshot);
+}
+
+void
+DtmInitialize()
+{
+ bool found;
+ static HASHCTL info;
+
+ info.keysize = sizeof(TransactionId);
+ info.entrysize = sizeof(DtmTransStatus);
+ info.hash = dtm_xid_hash_fn;
+ info.match = dtm_xid_match_fn;
+ xid2status = ShmemInitHash("xid2status",
+ DTM_HASH_INIT_SIZE, DTM_HASH_INIT_SIZE,
+ &info,
+ HASH_ELEM | HASH_FUNCTION | HASH_COMPARE);
+
+ info.keysize = MAX_GTID_SIZE;
+ info.entrysize = sizeof(DtmTransId);
+ info.hash = dtm_gtid_hash_fn;
+ info.match = dtm_gtid_match_fn;
+ info.keycopy = dtm_gtid_keycopy_fn;
+ gtid2xid = ShmemInitHash("gtid2xid",
+ DTM_HASH_INIT_SIZE, DTM_HASH_INIT_SIZE,
+ &info,
+ HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_KEYCOPY);
+
+ TM = &DtmTM;
+
+ LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
+ local = (DtmNodeState *) ShmemInitStruct("dtm", sizeof(DtmNodeState), &found);
+ if (!found)
+ {
+ local->time_shift = 0;
+ local->oldest_xid = FirstNormalTransactionId;
+ local->cid = dtm_get_current_time();
+ local->trans_list_head = NULL;
+ local->trans_list_tail = &local->trans_list_head;
+ SpinLockInit(&local->lock);
+ RegisterXactCallback(dtm_xact_callback, NULL);
+ }
+ LWLockRelease(AddinShmemInitLock);
+}
+
+/*
+ * Start transaction at local node.
+ * Associate local snapshot (current time) with this transaction.
+ */
+void
+DtmLocalBegin(DtmCurrentTrans * x)
+{
+ if (!TransactionIdIsValid(x->xid))
+ {
+ SpinLockAcquire(&local->lock);
+ x->xid = GetCurrentTransactionId();
+ Assert(TransactionIdIsValid(x->xid));
+ x->cid = INVALID_CID;
+ x->is_global = false;
+ x->is_prepared = false;
+ x->snapshot = dtm_get_cid();
+ SpinLockRelease(&local->lock);
+ DTM_TRACE((stderr, "DtmLocalBegin: transaction %u uses local snapshot %lu\n", x->xid, x->snapshot));
+ }
+}
+
+/*
+ * Transaction is going to be distributed.
+ * Returns snapshot of current transaction.
+ */
+cid_t
+DtmLocalExtend(DtmCurrentTrans * x, GlobalTransactionId gtid)
+{
+ if (gtid != NULL)
+ {
+ SpinLockAcquire(&local->lock);
+ {
+ DtmTransId *id = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_ENTER, NULL);
+
+ id->xid = x->xid;
+ id->nSubxids = 0;
+ id->subxids = 0;
+ }
+ SpinLockRelease(&local->lock);
+ }
+ x->is_global = true;
+ return x->snapshot;
+}
+
+/*
+ * This function is executed on all nodes joining distributed transaction.
+ * global_cid is snapshot taken from node initiated this transaction
+ */
+cid_t
+DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
+{
+ cid_t local_cid;
+
+ SpinLockAcquire(&local->lock);
+ {
+ if (gtid != NULL)
+ {
+ DtmTransId *id = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_ENTER, NULL);
+
+ id->xid = x->xid;
+ id->nSubxids = 0;
+ id->subxids = 0;
+ }
+ local_cid = dtm_sync(global_cid);
+ x->snapshot = global_cid;
+ x->is_global = true;
+ }
+ SpinLockRelease(&local->lock);
+ if (global_cid < local_cid - DtmVacuumDelay * USEC)
+ {
+ elog(ERROR, "Too old snapshot: requested " UINT64_FORMAT ", current " UINT64_FORMAT, global_cid, local_cid);
+ }
+ return global_cid;
+}
+
+/*
+ * Set transaction status to in-doubt. Now all transactions accessing tuples updated by this transaction have to
+ * wait until it is either committed or aborted
+ */
+void
+DtmLocalBeginPrepare(GlobalTransactionId gtid)
+{
+ SpinLockAcquire(&local->lock);
+ {
+ DtmTransStatus *ts;
+ DtmTransId *id;
+
+ id = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_FIND, NULL);
+ Assert(id != NULL);
+ Assert(TransactionIdIsValid(id->xid));
+ ts = (DtmTransStatus *) hash_search(xid2status, &id->xid, HASH_ENTER, NULL);
+ ts->status = TRANSACTION_STATUS_IN_PROGRESS;
+ ts->cid = dtm_get_cid();
+ ts->nSubxids = id->nSubxids;
+ DtmTransactionListAppend(ts);
+ DtmAddSubtransactions(ts, id->subxids, id->nSubxids);
+ }
+ SpinLockRelease(&local->lock);
+}
+
+/*
+ * Choose maximal CSN among all nodes.
+ * This function returns maximum of passed (global) and local (current time) CSNs.
+ */
+cid_t
+DtmLocalPrepare(GlobalTransactionId gtid, cid_t global_cid)
+{
+ cid_t local_cid;
+
+ SpinLockAcquire(&local->lock);
+ local_cid = dtm_get_cid();
+ if (local_cid > global_cid)
+ {
+ global_cid = local_cid;
+ }
+ SpinLockRelease(&local->lock);
+ return global_cid;
+}
+
+/*
+ * Adjust system time according to the received maximal CSN
+ */
+void
+DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
+{
+ SpinLockAcquire(&local->lock);
+ {
+ DtmTransStatus *ts;
+ DtmTransId *id;
+ int i;
+
+ id = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_FIND, NULL);
+ Assert(id != NULL);
+
+ ts = (DtmTransStatus *) hash_search(xid2status, &id->xid, HASH_FIND, NULL);
+ Assert(ts != NULL);
+ ts->cid = cid;
+ for (i = 0; i < ts->nSubxids; i++)
+ {
+ ts = ts->next;
+ ts->cid = cid;
+ }
+ dtm_sync(cid);
+
+ DTM_TRACE((stderr, "Prepare transaction %u(%s) with CSN %lu\n", id->xid, gtid, cid));
+ }
+ SpinLockRelease(&local->lock);
+
+ /*
+ * Record the commit in the pg_committed_xacts table to make it possible to
+ * perform recovery in case some of the cluster nodes crash
+ */
+ if (DtmRecordCommits)
+ {
+ char stmt[MAX_GTID_SIZE + 64];
+ int rc;
+
+ snprintf(stmt, sizeof(stmt), "insert into pg_committed_xacts values ('%s')", gtid);
+ SPI_connect();
+ rc = SPI_execute(stmt, false, 0); /* read_only must be false: INSERT is not a read-only command */
+ SPI_finish();
+ if (rc != SPI_OK_INSERT)
+ {
+ elog(ERROR, "Failed to insert GTID %s in table pg_committed_xacts", gtid);
+ }
+ }
+}
+
+/*
+ * Mark the transaction finished by COMMIT PREPARED as a prepared global transaction
+ */
+void
+DtmLocalCommitPrepared(DtmCurrentTrans * x, GlobalTransactionId gtid)
+{
+ Assert(gtid != NULL);
+
+ SpinLockAcquire(&local->lock);
+ {
+ DtmTransId *id = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_REMOVE, NULL);
+
+ Assert(id != NULL);
+
+ x->is_global = true;
+ x->is_prepared = true;
+ x->xid = id->xid;
+ free(id->subxids);
+
+ DTM_TRACE((stderr, "Global transaction %u(%s) is precommitted\n", x->xid, gtid));
+ }
+ SpinLockRelease(&local->lock);
+}
+
+/*
+ * Set transaction status to committed
+ */
+void
+DtmLocalCommit(DtmCurrentTrans * x)
+{
+ SpinLockAcquire(&local->lock);
+ if (TransactionIdIsValid(x->xid))
+ {
+ bool found;
+ DtmTransStatus *ts;
+
+ ts = (DtmTransStatus *) hash_search(xid2status, &x->xid, HASH_ENTER, &found);
+ ts->status = TRANSACTION_STATUS_COMMITTED;
+ if (x->is_prepared)
+ {
+ int i;
+ DtmTransStatus *sts = ts;
+
+ Assert(found);
+ Assert(x->is_global);
+ for (i = 0; i < ts->nSubxids; i++)
+ {
+ sts = sts->next;
+ Assert(sts->cid == ts->cid);
+ sts->status = TRANSACTION_STATUS_COMMITTED;
+ }
+ }
+ else
+ {
+ TransactionId *subxids;
+
+ Assert(!found);
+ ts->cid = dtm_get_cid();
+ DtmTransactionListAppend(ts);
+ ts->nSubxids = xactGetCommittedChildren(&subxids);
+ DtmAddSubtransactions(ts, subxids, ts->nSubxids);
+ }
+ x->cid = ts->cid;
+ DTM_TRACE((stderr, "Local transaction %u is committed at %lu\n", x->xid, x->cid));
+ }
+ SpinLockRelease(&local->lock);
+}
+
+/*
+ * Mark the transaction finished by ROLLBACK PREPARED as a prepared global transaction
+ */
+void
+DtmLocalAbortPrepared(DtmCurrentTrans * x, GlobalTransactionId gtid)
+{
+ Assert(gtid != NULL);
+
+ SpinLockAcquire(&local->lock);
+ {
+ DtmTransId *id = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_REMOVE, NULL);
+
+ Assert(id != NULL);
+
+ x->is_global = true;
+ x->is_prepared = true;
+ x->xid = id->xid;
+ free(id->subxids);
+
+ DTM_TRACE((stderr, "Global transaction %u(%s) is preaborted\n", x->xid, gtid));
+ }
+ SpinLockRelease(&local->lock);
+}
+
+/*
+ * Set transaction status to aborted
+ */
+void
+DtmLocalAbort(DtmCurrentTrans * x)
+{
+ SpinLockAcquire(&local->lock);
+ {
+ bool found;
+ DtmTransStatus *ts;
+
+ Assert(TransactionIdIsValid(x->xid));
+ ts = (DtmTransStatus *) hash_search(xid2status, &x->xid, HASH_ENTER, &found);
+ if (x->is_prepared)
+ {
+ Assert(found);
+ Assert(x->is_global);
+ }
+ else
+ {
+ Assert(!found);
+ ts->cid = dtm_get_cid();
+ ts->nSubxids = 0;
+ DtmTransactionListAppend(ts);
+ }
+ x->cid = ts->cid;
+ ts->status = TRANSACTION_STATUS_ABORTED;
+ DTM_TRACE((stderr, "Local transaction %u is aborted at %lu\n", x->xid, x->cid));
+ }
+ SpinLockRelease(&local->lock);
+}
+
+/*
+ * Cleanup dtm_tx structure
+ */
+void
+DtmLocalEnd(DtmCurrentTrans * x)
+{
+ x->is_global = false;
+ x->is_prepared = false;
+ x->xid = InvalidTransactionId;
+ x->cid = INVALID_CID;
+}
+
+/*
+ * Currently only timestamp-based deadlock detection is supported by pg_tsdtm.
+ * Please adjust the "deadlock_timeout" parameter in postgresql.conf to avoid false
+ * deadlock detection.
+ */
+bool
+DtmDetectGlobalDeadLock(PGPROC *proc)
+{
+ elog(WARNING, "Global deadlock?");
+ return true;
+}
+
+static cid_t
+DtmGetCsn(TransactionId xid)
+{
+ cid_t csn = 0;
+
+ SpinLockAcquire(&local->lock);
+ {
+ DtmTransStatus *ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_FIND, NULL);
+
+ if (ts != NULL)
+ {
+ csn = ts->cid;
+ }
+ }
+ SpinLockRelease(&local->lock);
+ return csn;
+}
+
+/*
+ * Save state of prepared transaction
+ */
+void
+DtmLocalSavePreparedState(GlobalTransactionId gtid)
+{
+ if (gtid != NULL)
+ {
+ SpinLockAcquire(&local->lock);
+ {
+ DtmTransId *id = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_FIND, NULL);
+
+ if (id != NULL)
+ {
+ TransactionId *subxids;
+ int nSubxids = xactGetCommittedChildren(&subxids);
+
+ if (nSubxids != 0)
+ {
+ id->subxids = (TransactionId *) malloc(nSubxids * sizeof(TransactionId));
+ id->nSubxids = nSubxids;
+ memcpy(id->subxids, subxids, nSubxids * sizeof(TransactionId));
+ }
+ }
+ }
+ SpinLockRelease(&local->lock);
+ }
+}
+
+/*
+ * Add subtransactions to finished transactions list.
+ * Copy CSN and status of parent transaction.
+ */
+static void
+DtmAddSubtransactions(DtmTransStatus * ts, TransactionId *subxids, int nSubxids)
+{
+ int i;
+
+ for (i = 0; i < nSubxids; i++)
+ {
+ bool found;
+ DtmTransStatus *sts;
+
+ Assert(TransactionIdIsValid(subxids[i]));
+ sts = (DtmTransStatus *) hash_search(xid2status, &subxids[i], HASH_ENTER, &found);
+ Assert(!found);
+ sts->status = ts->status;
+ sts->cid = ts->cid;
+ sts->nSubxids = 0;
+ DtmTransactionListInsertAfter(ts, sts);
+ }
+}
diff --git a/contrib/pg_tsdtm/pg_tsdtm.control b/contrib/pg_tsdtm/pg_tsdtm.control
new file mode 100644
index 0000000..f9b8215
--- /dev/null
+++ b/contrib/pg_tsdtm/pg_tsdtm.control
@@ -0,0 +1,4 @@
+comment = 'Pluggable distributed transaction manager'
+default_version = '1.0'
+module_pathname = '$libdir/pg_tsdtm'
+relocatable = true
\ No newline at end of file
diff --git a/contrib/pg_tsdtm/pg_tsdtm.h b/contrib/pg_tsdtm/pg_tsdtm.h
new file mode 100644
index 0000000..467038a
--- /dev/null
+++ b/contrib/pg_tsdtm/pg_tsdtm.h
@@ -0,0 +1,57 @@
+#ifndef DTM_BACKEND_H
+#define DTM_BACKEND_H
+
+typedef int nodeid_t;
+typedef uint64 cid_t;
+
+typedef struct
+{
+ TransactionId xid;
+ bool is_global;
+ bool is_prepared;
+ cid_t cid;
+ cid_t snapshot;
+} DtmCurrentTrans;
+
+typedef char const *GlobalTransactionId;
+
+/* Initialize DTM extension */
+void DtmInitialize(void);
+
+/* Invoked at start of any local or global transaction */
+void DtmLocalBegin(DtmCurrentTrans * x);
+
+/* Extend a local transaction to a global one by assigning an upper-bound CSN, which is returned to the coordinator */
+cid_t DtmLocalExtend(DtmCurrentTrans * x, GlobalTransactionId gtid);
+
+/* Called on first access to each data node, other than the first one, involved in a distributed transaction */
+cid_t DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t snapshot);
+
+/* Mark transaction as in-doubt */
+void DtmLocalBeginPrepare(GlobalTransactionId gtid);
+
+/* Choose CSN for global transaction */
+cid_t DtmLocalPrepare(GlobalTransactionId gtid, cid_t cid);
+
+/* Assign CSN to global transaction */
+void DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid);
+
+/* Do local commit of global transaction */
+void DtmLocalCommitPrepared(DtmCurrentTrans * x, GlobalTransactionId gtid);
+
+/* Do local abort of global transaction */
+void DtmLocalAbortPrepared(DtmCurrentTrans * x, GlobalTransactionId gtid);
+
+/* Do local commit of global transaction */
+void DtmLocalCommit(DtmCurrentTrans * x);
+
+/* Do local abort of global transaction */
+void DtmLocalAbort(DtmCurrentTrans * x);
+
+/* Invoked at the end of any local or global transaction: free transaction state */
+void DtmLocalEnd(DtmCurrentTrans * x);
+
+/* Save state of global prepared transaction */
+void DtmLocalSavePreparedState(GlobalTransactionId gtid);
+
+#endif
diff --git a/contrib/pg_tsdtm/t/001_distributed_transactions.pl b/contrib/pg_tsdtm/t/001_distributed_transactions.pl
new file mode 100644
index 0000000..b34895b
--- /dev/null
+++ b/contrib/pg_tsdtm/t/001_distributed_transactions.pl
@@ -0,0 +1,133 @@
+###############################################################################
+# Test of proper transaction isolation.
+###############################################################################
+
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 1;
+use DBI();
+use DBD::Pg();
+
+sub query_row
+{
+ my ($dbi, $sql, @keys) = @_;
+ my $sth = $dbi->prepare($sql) || die;
+ $sth->execute(@keys) || die;
+ my $ret = $sth->fetchrow_array;
+ print "query_row('$sql') -> $ret \n";
+ return $ret;
+}
+
+sub query_exec
+{
+ my ($dbi, $sql) = @_;
+ print "query_exec('$sql')\n";
+ my $rv = $dbi->do($sql) || die;
+ return $rv;
+}
+
+sub PostgresNode::psql_ok {
+ my ($self, $sql, $comment) = @_;
+
+ $self->command_ok(['psql', '-A', '-t', '--no-psqlrc',
+ '-d', $self->connstr, '-c', $sql], $comment);
+}
+
+sub PostgresNode::psql_fails {
+ my ($self, $sql, $comment) = @_;
+
+ $self->command_fails(['psql', '-A', '-t', '--no-psqlrc',
+ '-d', $self->connstr, '-c', $sql], $comment);
+}
+
+###############################################################################
+# Setup nodes
+###############################################################################
+
+# Setup first node
+my $node1 = get_new_node("node1");
+$node1->init;
+$node1->append_conf('postgresql.conf', qq(
+max_prepared_transactions = 10
+shared_preload_libraries = 'pg_tsdtm'
+));
+$node1->start;
+
+# Setup second node
+my $node2 = get_new_node("node2");
+$node2->init;
+$node2->append_conf('postgresql.conf', qq(
+max_prepared_transactions = 10
+shared_preload_libraries = 'pg_tsdtm'
+));
+$node2->start;
+
+$node1->psql('postgres', "create extension pg_tsdtm;");
+$node1->psql('postgres', "create table t(u int primary key, v int)");
+$node1->psql('postgres', "insert into t (select generate_series(0, 9), 0)");
+
+$node2->psql('postgres', "create extension pg_tsdtm;");
+$node2->psql('postgres', "create table t(u int primary key, v int)");
+$node2->psql('postgres', "insert into t (select generate_series(0, 9), 0)");
+
+# we need two connections to each node (run two simultaneous global tx)
+my $conn11 = DBI->connect('DBI:Pg:' . $node1->connstr('postgres'));
+my $conn21 = DBI->connect('DBI:Pg:' . $node2->connstr('postgres'));
+my $conn12 = DBI->connect('DBI:Pg:' . $node1->connstr('postgres'));
+my $conn22 = DBI->connect('DBI:Pg:' . $node2->connstr('postgres'));
+
+sub count_total
+{
+ my ($c1, $c2) = @_;
+
+ query_exec($c1, "begin");
+ query_exec($c2, "begin");
+
+ my $snapshot = query_row($c1, "select dtm_extend()");
+ query_row($c2, "select dtm_access($snapshot)");
+
+ my $sum1 = query_row($c1, "select sum(v) from t");
+ my $sum2 = query_row($c2, "select sum(v) from t");
+
+ query_exec($c1, "commit");
+ query_exec($c2, "commit");
+
+ my $tot = $sum1 + $sum2;
+
+ print "total = $tot\n";
+ return $tot;
+}
+
+###############################################################################
+# Sanity check on dirty reads
+###############################################################################
+
+my $gtid1 = "gtx1";
+
+# start global tx
+query_exec($conn11, "begin transaction");
+query_exec($conn21, "begin transaction");
+my $snapshot = query_row($conn11, "select dtm_extend('$gtid1')");
+query_exec($conn21, "select dtm_access($snapshot, '$gtid1')");
+
+# transfer 10 units from a row on node1 to a row on node2
+query_exec($conn11, "update t set v = v - 10 where u=1");
+my $intermediate_total = count_total($conn12, $conn22);
+query_exec($conn21, "update t set v = v + 10 where u=2");
+
+# commit our global tx
+query_exec($conn11, "prepare transaction '$gtid1'");
+query_exec($conn21, "prepare transaction '$gtid1'");
+query_exec($conn11, "select dtm_begin_prepare('$gtid1')");
+query_exec($conn21, "select dtm_begin_prepare('$gtid1')");
+my $csn = query_row($conn11, "select dtm_prepare('$gtid1', 0)");
+query_exec($conn21, "select dtm_prepare('$gtid1', $csn)");
+query_exec($conn11, "select dtm_end_prepare('$gtid1', $csn)");
+query_exec($conn21, "select dtm_end_prepare('$gtid1', $csn)");
+query_exec($conn11, "commit prepared '$gtid1'");
+query_exec($conn21, "commit prepared '$gtid1'");
+
+is($intermediate_total, 0, "Check for absence of dirty reads");
+
postgres_fdw.patch (text/x-patch)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 3df86d1..1f36ffb 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -15,6 +15,8 @@
#include "postgres_fdw.h"
#include "access/xact.h"
+#include "access/xtm.h"
+#include "access/transam.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "utils/hsearch.h"
@@ -61,11 +63,17 @@ static unsigned int prep_stmt_number = 0;
/* tracks whether any work is needed in callback functions */
static bool xact_got_connection = false;
+typedef long long csn_t;
+static csn_t currentGlobalTransactionId = 0;
+static int currentLocalTransactionId = 0;
+
/* prototypes of private functions */
static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
static void check_conn_params(const char **keywords, const char **values);
static void configure_remote_session(PGconn *conn);
static void do_sql_command(PGconn *conn, const char *sql);
+static void do_sql_send_command(PGconn *conn, const char *sql);
+static void do_sql_wait_command(PGconn *conn, const char *sql);
static void begin_remote_xact(ConnCacheEntry *entry);
static void pgfdw_xact_callback(XactEvent event, void *arg);
static void pgfdw_subxact_callback(SubXactEvent event,
@@ -357,6 +365,32 @@ do_sql_command(PGconn *conn, const char *sql)
PQclear(res);
}
+static void
+do_sql_send_command(PGconn *conn, const char *sql)
+{
+ /* PQsendQuery returns 1 on success and 0 on failure, not an ExecStatusType */
+ if (!PQsendQuery(conn, sql))
+ {
+ elog(WARNING, "Failed to send command %s", sql);
+
+ /* No PGresult exists here; pgfdw_report_error falls back to PQerrorMessage */
+ pgfdw_report_error(ERROR, NULL, conn, false, sql);
+ }
+}
+
+static void
+do_sql_wait_command(PGconn *conn, const char *sql)
+{
+ PGresult *res;
+
+ while ((res = PQgetResult(conn)) != NULL)
+ {
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(ERROR, res, conn, true, sql);
+ PQclear(res);
+ }
+}
+
/*
* Start remote transaction or subtransaction, if needed.
*
@@ -375,17 +409,58 @@ begin_remote_xact(ConnCacheEntry *entry)
/* Start main transaction if we haven't yet */
if (entry->xact_depth <= 0)
{
+ TransactionId gxid = GetTransactionManager()->GetGlobalTransactionId();
const char *sql;
elog(DEBUG3, "starting remote transaction on connection %p",
entry->conn);
+ if (TransactionIdIsValid(gxid))
+ {
+ char stmt[64];
+ PGresult *res;
+
+ snprintf(stmt, sizeof(stmt), "select public.dtm_join_transaction(%u)", gxid);
+ res = PQexec(entry->conn, stmt);
+ PQclear(res);
+ }
+
if (IsolationIsSerializable())
sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
else
sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
do_sql_command(entry->conn, sql);
entry->xact_depth = 1;
+ if (UseTsDtmTransactions)
+ {
+ if (!currentGlobalTransactionId)
+ {
+ PGresult *res = PQexec(entry->conn, psprintf("SELECT public.dtm_extend('%d.%d')",
+ MyProcPid, ++currentLocalTransactionId));
+ char *resp;
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pgfdw_report_error(ERROR, res, entry->conn, true, sql);
+ }
+ resp = PQgetvalue(res, 0, 0);
+ if (resp == NULL || (*resp) == '\0' || sscanf(resp, "%lld", &currentGlobalTransactionId) != 1)
+ {
+ pgfdw_report_error(ERROR, res, entry->conn, true, sql);
+ }
+ PQclear(res);
+ }
+ else
+ {
+ PGresult *res = PQexec(entry->conn, psprintf("SELECT public.dtm_access(%lld, '%d.%d')", currentGlobalTransactionId, MyProcPid, currentLocalTransactionId));
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pgfdw_report_error(ERROR, res, entry->conn, true, sql);
+ }
+ PQclear(res);
+ }
+ }
}
/*
@@ -511,6 +586,78 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
PQclear(res);
}
+typedef bool (*DtmCommandResultHandler) (PGresult *result, void *arg);
+
+static bool
+RunDtmStatement(char const * sql, unsigned expectedStatus, DtmCommandResultHandler handler, void *arg)
+{
+ HASH_SEQ_STATUS scan;
+ ConnCacheEntry *entry;
+ bool allOk = true;
+
+ hash_seq_init(&scan, ConnectionHash);
+ while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+ {
+ if (entry->xact_depth > 0)
+ {
+ do_sql_send_command(entry->conn, sql);
+ }
+ }
+
+ hash_seq_init(&scan, ConnectionHash);
+ while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+ {
+ if (entry->xact_depth > 0)
+ {
+ PGresult *result = PQgetResult(entry->conn);
+
+ if (PQresultStatus(result) != expectedStatus || (handler && !handler(result, arg)))
+ {
+ elog(WARNING, "Failed command %s: status=%d, expected status=%d", sql, PQresultStatus(result), expectedStatus);
+ pgfdw_report_error(WARNING, result, entry->conn, false, sql);
+ allOk = false;
+ }
+ PQclear(result);
+ PQgetResult(entry->conn); /* consume NULL result */
+ }
+ }
+ return allOk;
+}
+
+static bool
+RunDtmCommand(char const * sql)
+{
+ return RunDtmStatement(sql, PGRES_COMMAND_OK, NULL, NULL);
+}
+
+static bool
+RunDtmFunction(char const * sql)
+{
+ return RunDtmStatement(sql, PGRES_TUPLES_OK, NULL, NULL);
+}
+
+
+static bool
+DtmMaxCSN(PGresult *result, void *arg)
+{
+ char *resp = PQgetvalue(result, 0, 0);
+ csn_t *maxCSN = (csn_t *) arg;
+ csn_t csn = 0;
+
+ if (resp == NULL || (*resp) == '\0' || sscanf(resp, "%lld", &csn) != 1)
+ {
+ return false;
+ }
+ else
+ {
+ if (*maxCSN < csn)
+ {
+ *maxCSN = csn;
+ }
+ return true;
+ }
+}
+
/*
* pgfdw_xact_callback --- cleanup at main-transaction end.
*/
@@ -524,6 +671,40 @@ pgfdw_xact_callback(XactEvent event, void *arg)
if (!xact_got_connection)
return;
+ if (currentGlobalTransactionId != 0)
+ {
+ switch (event)
+ {
+ case XACT_EVENT_PARALLEL_PRE_COMMIT:
+ case XACT_EVENT_PRE_COMMIT:
+ {
+ csn_t maxCSN = 0;
+
+ if (!RunDtmCommand(psprintf("PREPARE TRANSACTION '%d.%d'",
+ MyProcPid, currentLocalTransactionId)) ||
+ !RunDtmFunction(psprintf("SELECT public.dtm_begin_prepare('%d.%d')",
+ MyProcPid, currentLocalTransactionId)) ||
+ !RunDtmStatement(psprintf("SELECT public.dtm_prepare('%d.%d',0)",
+ MyProcPid, currentLocalTransactionId), PGRES_TUPLES_OK, DtmMaxCSN, &maxCSN) ||
+ !RunDtmFunction(psprintf("SELECT public.dtm_end_prepare('%d.%d',%lld)",
+ MyProcPid, currentLocalTransactionId, maxCSN)) ||
+ !RunDtmCommand(psprintf("COMMIT PREPARED '%d.%d'",
+ MyProcPid, currentLocalTransactionId)))
+ {
+ RunDtmCommand(psprintf("ROLLBACK PREPARED '%d.%d'",
+ MyProcPid, currentLocalTransactionId));
+ ereport(ERROR,
+ (errcode(ERRCODE_TRANSACTION_ROLLBACK),
+ errmsg("transaction was aborted at one of the shards")));
+ break;
+ }
+ return;
+ }
+ default:
+ break;
+ }
+ }
+
/*
* Scan all connection cache entries to find open remote transactions, and
* close them.
@@ -540,15 +721,40 @@ pgfdw_xact_callback(XactEvent event, void *arg)
/* If it has an open remote transaction, try to close it */
if (entry->xact_depth > 0)
{
- elog(DEBUG3, "closing remote transaction on connection %p",
- entry->conn);
+ elog(DEBUG3, "closing remote transaction on connection %p event %d",
+ entry->conn, event);
switch (event)
{
case XACT_EVENT_PARALLEL_PRE_COMMIT:
case XACT_EVENT_PRE_COMMIT:
/* Commit all remote transactions during pre-commit */
- do_sql_command(entry->conn, "COMMIT TRANSACTION");
+ do_sql_send_command(entry->conn, "COMMIT TRANSACTION");
+ continue;
+
+ case XACT_EVENT_PRE_PREPARE:
+
+ /*
+ * We disallow remote transactions that modified anything,
+ * since it's not very reasonable to hold them open until
+ * the prepared transaction is committed. For the moment,
+ * throw error unconditionally; later we might allow
+ * read-only cases. Note that the error will cause us to
+ * come right back here with event == XACT_EVENT_ABORT, so
+ * we'll clean up the connection state at that point.
+ */
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot prepare a transaction that modified remote tables")));
+ break;
+
+ case XACT_EVENT_PARALLEL_COMMIT:
+ case XACT_EVENT_COMMIT:
+ case XACT_EVENT_PREPARE:
+ if (!currentGlobalTransactionId)
+ {
+ do_sql_wait_command(entry->conn, "COMMIT TRANSACTION");
+ }
/*
* If there were any errors in subtransactions, and we
@@ -573,27 +779,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
entry->have_prep_stmt = false;
entry->have_error = false;
break;
- case XACT_EVENT_PRE_PREPARE:
- /*
- * We disallow remote transactions that modified anything,
- * since it's not very reasonable to hold them open until
- * the prepared transaction is committed. For the moment,
- * throw error unconditionally; later we might allow
- * read-only cases. Note that the error will cause us to
- * come right back here with event == XACT_EVENT_ABORT, so
- * we'll clean up the connection state at that point.
- */
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("cannot prepare a transaction that modified remote tables")));
- break;
- case XACT_EVENT_PARALLEL_COMMIT:
- case XACT_EVENT_COMMIT:
- case XACT_EVENT_PREPARE:
- /* Pre-commit should have closed the open transaction */
- elog(ERROR, "missed cleaning up connection during pre-commit");
- break;
case XACT_EVENT_PARALLEL_ABORT:
case XACT_EVENT_ABORT:
/* Assume we might have lost track of prepared statements */
@@ -617,6 +803,11 @@ pgfdw_xact_callback(XactEvent event, void *arg)
entry->have_error = false;
}
break;
+
+ case XACT_EVENT_START:
+ case XACT_EVENT_ABORT_PREPARED:
+ case XACT_EVENT_COMMIT_PREPARED:
+ break;
}
}
@@ -630,21 +821,26 @@ pgfdw_xact_callback(XactEvent event, void *arg)
if (PQstatus(entry->conn) != CONNECTION_OK ||
PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
{
- elog(DEBUG3, "discarding connection %p", entry->conn);
+ elog(WARNING, "discarding connection %p, conn status=%d, trans status=%d", entry->conn, PQstatus(entry->conn), PQtransactionStatus(entry->conn));
PQfinish(entry->conn);
entry->conn = NULL;
}
}
+ if (event != XACT_EVENT_PARALLEL_PRE_COMMIT && event != XACT_EVENT_PRE_COMMIT)
+ {
+ /*
+ * Regardless of the event type, we can now mark ourselves as out of
+ * the transaction. (Note: if we are here during PRE_COMMIT or
+ * PRE_PREPARE, this saves a useless scan of the hashtable during
+ * COMMIT or PREPARE.)
+ */
+ xact_got_connection = false;
- /*
- * Regardless of the event type, we can now mark ourselves as out of the
- * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
- * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
- */
- xact_got_connection = false;
+ /* Also reset cursor numbering for next transaction */
+ cursor_number = 0;
- /* Also reset cursor numbering for next transaction */
- cursor_number = 0;
+ currentGlobalTransactionId = 0;
+ }
}
/*
diff --git a/contrib/postgres_fdw/postgres_fdw--1.0.sql b/contrib/postgres_fdw/postgres_fdw--1.0.sql
index a0f0fc1..0ce8f0e 100644
--- a/contrib/postgres_fdw/postgres_fdw--1.0.sql
+++ b/contrib/postgres_fdw/postgres_fdw--1.0.sql
@@ -16,3 +16,8 @@ LANGUAGE C STRICT;
CREATE FOREIGN DATA WRAPPER postgres_fdw
HANDLER postgres_fdw_handler
VALIDATOR postgres_fdw_validator;
+
+CREATE FUNCTION postgres_fdw_exec(relid oid, sql cstring)
+RETURNS void
+AS 'MODULE_PATHNAME'
+LANGUAGE C STRICT;
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index d5a2af9..08b28b6 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -193,10 +193,14 @@ typedef struct
List *already_used; /* expressions already dealt with */
} ec_member_foreign_arg;
+bool UseTsDtmTransactions;
+void _PG_init(void);
+
/*
* SQL functions
*/
PG_FUNCTION_INFO_V1(postgres_fdw_handler);
+PG_FUNCTION_INFO_V1(postgres_fdw_exec);
/*
* FDW callback routines
@@ -3214,3 +3218,29 @@ find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel)
/* We didn't find any suitable equivalence class expression */
return NULL;
}
+
+Datum
+postgres_fdw_exec(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ char const *sql = PG_GETARG_CSTRING(1);
+ Oid userid = GetUserId();
+ ForeignTable *table = GetForeignTable(relid);
+ ForeignServer *server = GetForeignServer(table->serverid);
+ UserMapping *user = GetUserMapping(userid, server->serverid);
+ PGconn *conn = GetConnection(server, user, false);
+ PGresult *res = PQexec(conn, sql);
+
+ PQclear(res);
+ ReleaseConnection(conn);
+ PG_RETURN_VOID();
+}
+
+void
+_PG_init(void)
+{
+ DefineCustomBoolVariable("postgres_fdw.use_tsdtm",
+ "Use timestamp-based distributed transaction manager for FDW connections", NULL,
+ &UseTsDtmTransactions, false, PGC_USERSET, 0, NULL,
+ NULL, NULL);
+}
diff --git a/contrib/postgres_fdw/tests/dtmbench.cpp b/contrib/postgres_fdw/tests/dtmbench.cpp
new file mode 100644
index 0000000..c8e7d72
--- /dev/null
+++ b/contrib/postgres_fdw/tests/dtmbench.cpp
@@ -0,0 +1,275 @@
+#include <time.h>
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <inttypes.h>
+#include <sys/time.h>
+#include <pthread.h>
+
+#include <string>
+#include <vector>
+
+#include <pqxx/connection>
+#include <pqxx/transaction>
+#include <pqxx/nontransaction>
+#include <pqxx/pipeline>
+
+using namespace std;
+using namespace pqxx;
+
+typedef void* (*thread_proc_t)(void*);
+typedef uint32_t xid_t;
+
+struct thread
+{
+ pthread_t t;
+ size_t transactions;
+ size_t updates;
+ size_t selects;
+ size_t aborts;
+ int id;
+
+ void start(int tid, thread_proc_t proc) {
+ id = tid;
+ updates = 0;
+ selects = 0;
+ aborts = 0;
+ transactions = 0;
+ pthread_create(&t, NULL, proc, this);
+ }
+
+ void wait() {
+ pthread_join(t, NULL);
+ }
+};
+
+struct config
+{
+ int nReaders;
+ int nWriters;
+ int nIterations;
+ int nAccounts;
+ int updatePercent;
+ int nShards;
+ string connection;
+
+ config() {
+ nShards = 1;
+ nReaders = 1;
+ nWriters = 10;
+ nIterations = 1000;
+ nAccounts = 10000;
+ updatePercent = 100;
+ }
+};
+
+config cfg;
+bool running;
+
+#define USEC 1000000
+
+static time_t getCurrentTime()
+{
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+ return (time_t)tv.tv_sec*USEC + tv.tv_usec;
+}
+
+
+void exec(transaction_base& txn, char const* sql, ...)
+{
+ va_list args;
+ va_start(args, sql);
+ char buf[1024];
+ vsnprintf(buf, sizeof(buf), sql, args);
+ va_end(args);
+ txn.exec(buf);
+}
+
+template<class T>
+T execQuery( transaction_base& txn, char const* sql, ...)
+{
+ va_list args;
+ va_start(args, sql);
+ char buf[1024];
+ vsnprintf(buf, sizeof(buf), sql, args);
+ va_end(args);
+ result r = txn.exec(buf);
+ return r[0][0].as(T());
+}
+
+void* reader(void* arg)
+{
+ thread& t = *(thread*)arg;
+ connection conn(cfg.connection);
+ int64_t prevSum = 0;
+
+ while (running) {
+ work txn(conn);
+ result r = txn.exec("select sum(v) from t");
+ int64_t sum = r[0][0].as(int64_t());
+ if (sum != prevSum) {
+ printf("Total=%lld\n", (long long)sum);
+ prevSum = sum;
+ }
+ t.transactions += 1;
+ t.selects += 1;
+ txn.commit();
+ }
+ return NULL;
+}
+
+void* writer(void* arg)
+{
+ thread& t = *(thread*)arg;
+ connection conn(cfg.connection);
+ for (int i = 0; i < cfg.nIterations; i++)
+ {
+ work txn(conn);
+ int srcAcc = random() % cfg.nAccounts;
+ int dstAcc = random() % cfg.nAccounts;
+ try {
+ if (random() % 100 < cfg.updatePercent) {
+ exec(txn, "update t set v = v - 1 where u=%d", srcAcc);
+ exec(txn, "update t set v = v + 1 where u=%d", dstAcc);
+ t.updates += 2;
+ } else {
+ int64_t sum = execQuery<int64_t>(txn, "select v from t where u=%d", srcAcc)
+ + execQuery<int64_t>(txn, "select v from t where u=%d", dstAcc);
+ if (sum > cfg.nIterations*cfg.nWriters || sum < -cfg.nIterations*cfg.nWriters) {
+ printf("Wrong sum=%lld\n", (long long)sum);
+ }
+ t.selects += 2;
+ }
+ txn.commit();
+ t.transactions += 1;
+ } catch (pqxx_exception const& x) {
+ txn.abort();
+ t.aborts += 1;
+ i -= 1;
+ continue;
+ }
+ }
+ return NULL;
+}
+
+void initializeDatabase()
+{
+ connection conn(cfg.connection);
+ int accountsPerShard = (cfg.nAccounts + cfg.nShards - 1)/cfg.nShards;
+ for (int i = 0; i < cfg.nShards; i++)
+ {
+ work txn(conn);
+ exec(txn, "alter table t_fdw%i add check (u between %d and %d)", i+1, accountsPerShard*i, accountsPerShard*(i+1)-1);
+ exec(txn, "insert into t_fdw%i (select generate_series(%d,%d), %d)", i+1, accountsPerShard*i, accountsPerShard*(i+1)-1, 0);
+ txn.commit();
+ }
+}
+
+int main (int argc, char* argv[])
+{
+ bool initialize = false;
+
+ if (argc == 1){
+ printf("Use -h to show usage options\n");
+ return 1;
+ }
+
+ for (int i = 1; i < argc; i++) {
+ if (argv[i][0] == '-') {
+ switch (argv[i][1]) {
+ case 'r':
+ cfg.nReaders = atoi(argv[++i]);
+ continue;
+ case 'w':
+ cfg.nWriters = atoi(argv[++i]);
+ continue;
+ case 'a':
+ cfg.nAccounts = atoi(argv[++i]);
+ continue;
+ case 'n':
+ cfg.nIterations = atoi(argv[++i]);
+ continue;
+ case 'p':
+ cfg.updatePercent = atoi(argv[++i]);
+ continue;
+ case 'c':
+ cfg.connection = string(argv[++i]);
+ continue;
+ case 'i':
+ initialize = true;
+ cfg.nShards = atoi(argv[++i]);
+ continue;
+ }
+ }
+ printf("Options:\n"
+ "\t-r N\tnumber of readers (1)\n"
+ "\t-w N\tnumber of writers (10)\n"
+ "\t-a N\tnumber of accounts (10000)\n"
+ "\t-n N\tnumber of iterations (1000)\n"
+ "\t-p N\tupdate percent (100)\n"
+ "\t-c STR\tdatabase connection string\n"
+ "\t-i N\tinitialize N shards\n");
+ return 1;
+ }
+
+ if (initialize) {
+ initializeDatabase();
+ printf("%d accounts inserted\n", cfg.nAccounts);
+ return 0;
+ }
+
+ time_t start = getCurrentTime();
+ running = true;
+
+ vector<thread> readers(cfg.nReaders);
+ vector<thread> writers(cfg.nWriters);
+ size_t nAborts = 0;
+ size_t nUpdates = 0;
+ size_t nSelects = 0;
+ size_t nTransactions = 0;
+
+ for (int i = 0; i < cfg.nReaders; i++) {
+ readers[i].start(i, reader);
+ }
+ for (int i = 0; i < cfg.nWriters; i++) {
+ writers[i].start(i, writer);
+ }
+
+ for (int i = 0; i < cfg.nWriters; i++) {
+ writers[i].wait();
+ nUpdates += writers[i].updates;
+ nSelects += writers[i].selects;
+ nAborts += writers[i].aborts;
+ nTransactions += writers[i].transactions;
+ }
+
+ running = false;
+
+ for (int i = 0; i < cfg.nReaders; i++) {
+ readers[i].wait();
+ nSelects += readers[i].selects;
+ nTransactions += readers[i].transactions;
+ }
+
+ time_t elapsed = getCurrentTime() - start;
+
+ printf(
+ "{\"tps\":%f, \"transactions\":%zu,"
+ " \"selects\":%zu, \"updates\":%zu, \"aborts\":%zu, \"abort_percent\": %d,"
+ " \"readers\":%d, \"writers\":%d, \"update_percent\":%d, \"accounts\":%d, \"iterations\":%d ,\"shards\":%d}\n",
+ (double)(nTransactions*USEC)/elapsed,
+ nTransactions,
+ nSelects,
+ nUpdates,
+ nAborts,
+ (int)(nAborts*100/nTransactions),
+ cfg.nReaders,
+ cfg.nWriters,
+ cfg.updatePercent,
+ cfg.nAccounts,
+ cfg.nIterations,
+ cfg.nShards);
+
+ return 0;
+}
diff --git a/contrib/postgres_fdw/tests/makefile b/contrib/postgres_fdw/tests/makefile
new file mode 100644
index 0000000..766d99f
--- /dev/null
+++ b/contrib/postgres_fdw/tests/makefile
@@ -0,0 +1,10 @@
+CXX=g++
+CXXFLAGS=-g -Wall -O2 -pthread
+
+all: dtmbench
+
+dtmbench: dtmbench.cpp
+ $(CXX) $(CXXFLAGS) -o dtmbench dtmbench.cpp -lpqxx
+
+clean:
+ rm -f dtmbench
\ No newline at end of file
xtm.patch (text/x-patch)
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 94455b2..37523a1 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global
OBJS = clog.o commit_ts.o multixact.o parallel.o rmgr.o slru.o subtrans.o \
timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \
xact.o xlog.o xlogarchive.o xlogfuncs.o \
- xloginsert.o xlogreader.o xlogutils.o
+ xloginsert.o xlogreader.o xlogutils.o xtm.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c
index 06aff18..ba7b09f 100644
--- a/src/backend/access/transam/clog.c
+++ b/src/backend/access/transam/clog.c
@@ -38,6 +38,7 @@
#include "access/xlog.h"
#include "access/xloginsert.h"
#include "access/xlogutils.h"
+#include "access/xtm.h"
#include "miscadmin.h"
#include "pg_trace.h"
@@ -92,6 +93,12 @@ static void TransactionIdSetStatusBit(TransactionId xid, XidStatus status,
static void set_status_by_pages(int nsubxids, TransactionId *subxids,
XidStatus status, XLogRecPtr lsn);
+void
+TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
+ TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
+{
+ TM->SetTransactionStatus(xid, nsubxids, subxids, status, lsn);
+}
/*
* TransactionIdSetTreeStatus
@@ -145,7 +152,7 @@ static void set_status_by_pages(int nsubxids, TransactionId *subxids,
* cache yet.
*/
void
-TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
+PgTransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
{
int pageno = TransactionIdToPage(xid); /* get page of parent */
@@ -391,6 +398,12 @@ TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, i
XidStatus
TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn)
{
+ return TM->GetTransactionStatus(xid, lsn);
+}
+
+XidStatus
+PgTransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn)
+{
int pageno = TransactionIdToPage(xid);
int byteno = TransactionIdToByte(xid);
int bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 8a22836..6c2813b 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1381,15 +1381,21 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
* callbacks will release the locks the transaction held.
*/
if (isCommit)
+ {
RecordTransactionCommitPrepared(xid,
hdr->nsubxacts, children,
hdr->ncommitrels, commitrels,
hdr->ninvalmsgs, invalmsgs,
hdr->initfileinval);
+ CallXactCallbacks(XACT_EVENT_COMMIT_PREPARED);
+ }
else
+ {
RecordTransactionAbortPrepared(xid,
hdr->nsubxacts, children,
hdr->nabortrels, abortrels);
+ CallXactCallbacks(XACT_EVENT_ABORT_PREPARED);
+ }
ProcArrayRemove(proc, latestXid);
@@ -2150,3 +2156,12 @@ RecordTransactionAbortPrepared(TransactionId xid,
*/
SyncRepWaitForLSN(recptr);
}
+
+/*
+ * Return GID of the prepared transaction locked by this backend, or NULL
+ */
+const char*
+GetLockedGlobalTransactionId(void)
+{
+ return MyLockedGxact ? MyLockedGxact->gid : NULL;
+}
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 2f7e645..2f8514d 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -19,6 +19,7 @@
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog.h"
+#include "access/xtm.h"
#include "commands/dbcommands.h"
#include "miscadmin.h"
#include "postmaster/autovacuum.h"
@@ -33,6 +34,11 @@
/* pointer to "variable cache" in shared memory (set up by shmem.c) */
VariableCache ShmemVariableCache = NULL;
+TransactionId
+GetNewTransactionId(bool isSubXact)
+{
+ return TM->GetNewTransactionId(isSubXact);
+}
/*
* Allocate the next XID for a new transaction or subtransaction.
@@ -45,7 +51,7 @@ VariableCache ShmemVariableCache = NULL;
* issue a warning about XID wrap.
*/
TransactionId
-GetNewTransactionId(bool isSubXact)
+PgGetNewTransactionId(bool isSubXact)
{
TransactionId xid;
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b0d5440..c8d58d3 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -301,7 +301,6 @@ static void AtCommit_Memory(void);
static void AtStart_Cache(void);
static void AtStart_Memory(void);
static void AtStart_ResourceOwner(void);
-static void CallXactCallbacks(XactEvent event);
static void CallSubXactCallbacks(SubXactEvent event,
SubTransactionId mySubid,
SubTransactionId parentSubid);
@@ -1909,6 +1908,7 @@ StartTransaction(void)
*/
s->state = TRANS_INPROGRESS;
+ CallXactCallbacks(XACT_EVENT_START);
ShowTransactionState("StartTransaction");
}
@@ -3309,7 +3309,7 @@ UnregisterXactCallback(XactCallback callback, void *arg)
}
}
-static void
+void
CallXactCallbacks(XactEvent event)
{
XactCallbackItem *item;
@@ -5607,3 +5607,10 @@ xact_redo(XLogReaderState *record)
else
elog(PANIC, "xact_redo: unknown op code %u", info);
}
+
+void
+MarkAsAborted(void)
+{
+ CurrentTransactionState->state = TRANS_INPROGRESS;
+ CurrentTransactionState->blockState = TBLOCK_STARTED;
+}
diff --git a/src/backend/access/transam/xtm.c b/src/backend/access/transam/xtm.c
new file mode 100644
index 0000000..7915b1e
--- /dev/null
+++ b/src/backend/access/transam/xtm.c
@@ -0,0 +1,58 @@
+/*-------------------------------------------------------------------------
+ *
+ * xtm.c
+ * PostgreSQL implementation of transaction manager protocol
+ *
+ * This module defines the default implementation of the PostgreSQL transaction manager protocol
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/xtm.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/transam.h"
+#include "access/xtm.h"
+
+TransactionId
+PgGetGlobalTransactionId(void)
+{
+ return InvalidTransactionId;
+}
+
+bool
+PgDetectGlobalDeadLock(PGPROC *proc)
+{
+ return false;
+}
+
+char const *
+PgGetTransactionManagerName(void)
+{
+ return "postgres";
+}
+
+TransactionManager PgTM = {
+ PgTransactionIdGetStatus,
+ PgTransactionIdSetTreeStatus,
+ PgGetSnapshotData,
+ PgGetNewTransactionId,
+ PgGetOldestXmin,
+ PgTransactionIdIsInProgress,
+ PgGetGlobalTransactionId,
+ PgXidInMVCCSnapshot,
+ PgDetectGlobalDeadLock,
+ PgGetTransactionManagerName
+};
+
+TransactionManager *TM = &PgTM;
+
+TransactionManager *
+GetTransactionManager(void)
+{
+ return TM;
+}
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 6ded0f0..c2e878c 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -51,6 +51,7 @@
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog.h"
+#include "access/xtm.h"
#include "catalog/catalog.h"
#include "miscadmin.h"
#include "storage/proc.h"
@@ -971,6 +972,12 @@ ProcArrayApplyXidAssignment(TransactionId topxid,
LWLockRelease(ProcArrayLock);
}
+bool
+TransactionIdIsInProgress(TransactionId xid)
+{
+ return TM->IsInProgress(xid);
+}
+
/*
* TransactionIdIsInProgress -- is given transaction running in some backend
*
@@ -998,7 +1005,7 @@ ProcArrayApplyXidAssignment(TransactionId topxid,
* PGXACT again anyway; see GetNewTransactionId).
*/
bool
-TransactionIdIsInProgress(TransactionId xid)
+PgTransactionIdIsInProgress(TransactionId xid)
{
static TransactionId *xids = NULL;
int nxids = 0;
@@ -1259,6 +1266,12 @@ TransactionIdIsActive(TransactionId xid)
}
+TransactionId
+GetOldestXmin(Relation rel, bool ignoreVacuum)
+{
+ return TM->GetOldestXmin(rel, ignoreVacuum);
+}
+
/*
* GetOldestXmin -- returns oldest transaction that was running
* when any current transaction was started.
@@ -1308,7 +1321,7 @@ TransactionIdIsActive(TransactionId xid)
* GetOldestXmin() move backwards, with no consequences for data integrity.
*/
TransactionId
-GetOldestXmin(Relation rel, bool ignoreVacuum)
+PgGetOldestXmin(Relation rel, bool ignoreVacuum)
{
ProcArrayStruct *arrayP = procArray;
TransactionId result;
@@ -1470,6 +1483,12 @@ GetMaxSnapshotSubxidCount(void)
return TOTAL_MAX_CACHED_SUBXIDS;
}
+Snapshot
+GetSnapshotData(Snapshot snapshot)
+{
+ return TM->GetSnapshot(snapshot);
+}
+
/*
* GetSnapshotData -- returns information about running transactions.
*
@@ -1506,7 +1525,7 @@ GetMaxSnapshotSubxidCount(void)
* not statically allocated (see xip allocation below).
*/
Snapshot
-GetSnapshotData(Snapshot snapshot)
+PgGetSnapshotData(Snapshot snapshot)
{
ProcArrayStruct *arrayP = procArray;
TransactionId xmin;
diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c
index 69f678b..ec8b77e 100644
--- a/src/backend/storage/lmgr/deadlock.c
+++ b/src/backend/storage/lmgr/deadlock.c
@@ -30,6 +30,7 @@
#include "pgstat.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
+#include "access/xtm.h"
#include "utils/memutils.h"
@@ -272,7 +273,7 @@ DeadLockCheck(PGPROC *proc)
else if (blocking_autovacuum_proc != NULL)
return DS_BLOCKED_BY_AUTOVACUUM;
else
- return DS_NO_DEADLOCK;
+ return TM->DetectGlobalDeadLock(proc) ? DS_DISTRIBUTED_DEADLOCK : DS_NO_DEADLOCK;
}
/*
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index e3e9599..21bf32b 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -3639,6 +3639,20 @@ GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode)
return LockMethods[lockmethodid]->lockModeNames[mode];
}
+void
+EnumerateLocks(LockIterator iterator, void *arg)
+{
+ PROCLOCK *proclock;
+ HASH_SEQ_STATUS status;
+
+ hash_seq_init(&status, LockMethodProcLockHash);
+
+ while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
+ {
+ iterator(proclock, arg);
+ }
+}
+
#ifdef LOCK_DEBUG
/*
* Dump all locks in the given proc's myProcLocks lists.
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 084be5a..8ded531 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -1395,6 +1395,22 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
"Processes holding the lock: %s. Wait queue: %s.",
lockHoldersNum, lock_holders_sbuf.data, lock_waiters_sbuf.data))));
}
+ else if (deadlock_state == DS_DISTRIBUTED_DEADLOCK)
+ {
+ /*
+ * This message is a bit redundant with the error that will be
+ * reported subsequently, but in some cases the error report
+ * might not make it to the log (eg, if it's caught by an
+ * exception handler), and we want to ensure all long-wait
+ * events get logged.
+ */
+ ereport(LOG,
+ (errmsg("process %d detected distributed deadlock while waiting for %s on %s after %ld.%03d ms",
+ MyProcPid, modename, buf.data, msecs, usecs),
+ (errdetail_log_plural("Process holding the lock: %s. Wait queue: %s.",
+ "Processes holding the lock: %s. Wait queue: %s.",
+ lockHoldersNum, lock_holders_sbuf.data, lock_waiters_sbuf.data))));
+ }
if (myWaitStatus == STATUS_WAITING)
ereport(LOG,
@@ -1419,7 +1435,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
* future-proofing, print a message if it looks like someone
* else kicked us off the lock.
*/
- if (deadlock_state != DS_HARD_DEADLOCK)
+ if (deadlock_state != DS_HARD_DEADLOCK && deadlock_state != DS_DISTRIBUTED_DEADLOCK)
ereport(LOG,
(errmsg("process %d failed to acquire %s on %s after %ld.%03d ms",
MyProcPid, modename, buf.data, msecs, usecs),
@@ -1636,7 +1652,7 @@ CheckDeadLock(void)
/* Run the deadlock check, and set deadlock_state for use by ProcSleep */
deadlock_state = DeadLockCheck(MyProc);
- if (deadlock_state == DS_HARD_DEADLOCK)
+ if (deadlock_state == DS_HARD_DEADLOCK || deadlock_state == DS_DISTRIBUTED_DEADLOCK)
{
/*
* Oops. We have a deadlock.
diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c
index 465933d..7ca0e06 100644
--- a/src/backend/utils/time/tqual.c
+++ b/src/backend/utils/time/tqual.c
@@ -66,6 +66,7 @@
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/xact.h"
+#include "access/xtm.h"
#include "access/xlog.h"
#include "storage/bufmgr.h"
#include "storage/procarray.h"
@@ -1454,6 +1455,12 @@ HeapTupleIsSurelyDead(HeapTuple htup, TransactionId OldestXmin)
return TransactionIdPrecedes(HeapTupleHeaderGetRawXmax(tuple), OldestXmin);
}
+bool
+XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+{
+ return TM->IsInSnapshot(xid, snapshot);
+}
+
/*
* XidInMVCCSnapshot
* Is the given XID still-in-progress according to the snapshot?
@@ -1464,8 +1471,8 @@ HeapTupleIsSurelyDead(HeapTuple htup, TransactionId OldestXmin)
* TransactionIdIsCurrentTransactionId first, except for known-committed
* XIDs which could not be ours anyway.
*/
-static bool
-XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+bool
+PgXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
{
uint32 i;
diff --git a/src/include/access/clog.h b/src/include/access/clog.h
index 06c069a..3adf6b1 100644
--- a/src/include/access/clog.h
+++ b/src/include/access/clog.h
@@ -27,6 +27,7 @@ typedef int XidStatus;
#define TRANSACTION_STATUS_COMMITTED 0x01
#define TRANSACTION_STATUS_ABORTED 0x02
#define TRANSACTION_STATUS_SUB_COMMITTED 0x03
+#define TRANSACTION_STATUS_UNKNOWN 0x03
extern void TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index b7ce0c6..05d6aef 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -56,4 +56,6 @@ extern void CheckPointTwoPhase(XLogRecPtr redo_horizon);
extern void FinishPreparedTransaction(const char *gid, bool isCommit);
+extern const char *GetLockedGlobalTransactionId(void);
+
#endif /* TWOPHASE_H */
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index ebeb582..4a59047 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -77,6 +77,7 @@ extern bool MyXactAccessedTempRel;
*/
typedef enum
{
+ XACT_EVENT_START,
XACT_EVENT_COMMIT,
XACT_EVENT_PARALLEL_COMMIT,
XACT_EVENT_ABORT,
@@ -84,7 +85,9 @@ typedef enum
XACT_EVENT_PREPARE,
XACT_EVENT_PRE_COMMIT,
XACT_EVENT_PARALLEL_PRE_COMMIT,
- XACT_EVENT_PRE_PREPARE
+ XACT_EVENT_PRE_PREPARE,
+ XACT_EVENT_COMMIT_PREPARED,
+ XACT_EVENT_ABORT_PREPARED
} XactEvent;
typedef void (*XactCallback) (XactEvent event, void *arg);
@@ -100,6 +103,8 @@ typedef enum
typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
SubTransactionId parentSubid, void *arg);
+void CallXactCallbacks(XactEvent event);
+
/* ----------------
* transaction-related XLOG entries
@@ -380,4 +385,6 @@ extern void EnterParallelMode(void);
extern void ExitParallelMode(void);
extern bool IsInParallelMode(void);
+extern void MarkAsAborted(void);
+
#endif /* XACT_H */
diff --git a/src/include/access/xtm.h b/src/include/access/xtm.h
new file mode 100644
index 0000000..08fa259
--- /dev/null
+++ b/src/include/access/xtm.h
@@ -0,0 +1,103 @@
+/*
+ * xtm.h
+ *
+ * PostgreSQL eXtensible Transaction Manager (XTM) API
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/xtm.h
+ */
+#ifndef XTM_H
+#define XTM_H
+
+#include "storage/proc.h"
+#include "access/clog.h"
+#include "utils/snapmgr.h"
+#include "utils/relcache.h"
+
+typedef struct
+{
+ /*
+ * Get current transaction status (encapsulation of TransactionIdGetStatus
+ * in clog.c)
+ */
+ XidStatus (*GetTransactionStatus) (TransactionId xid, XLogRecPtr *lsn);
+
+ /*
+ * Set current transaction status (encapsulation of
+ * TransactionIdSetTreeStatus in clog.c)
+ */
+ void (*SetTransactionStatus) (TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
+
+ /*
+ * Get current transaction snapshot (encapsulation of GetSnapshotData in
+ * procarray.c)
+ */
+ Snapshot (*GetSnapshot) (Snapshot snapshot);
+
+ /*
+ * Assign new Xid to transaction (encapsulation of GetNewTransactionId in
+ * varsup.c)
+ */
+ TransactionId (*GetNewTransactionId) (bool isSubXact);
+
+ /*
+ * Get oldest transaction Xid that was running when any current
+ * transaction was started (encapsulation of GetOldestXmin in procarray.c)
+ */
+ TransactionId (*GetOldestXmin) (Relation rel, bool ignoreVacuum);
+
+ /*
+ * Check if the given transaction is not yet completed (encapsulation of
+ * TransactionIdIsInProgress in procarray.c)
+ */
+ bool (*IsInProgress) (TransactionId xid);
+
+ /*
+ * Get global transaction XID: returns XID of current transaction if it is
+ * global, InvalidTransactionId otherwise
+ */
+ TransactionId (*GetGlobalTransactionId) (void);
+
+ /*
+ * Is the given XID still-in-progress according to the snapshot
+ * (encapsulation of XidInMVCCSnapshot in tqual.c)
+ */
+ bool (*IsInSnapshot) (TransactionId xid, Snapshot snapshot);
+
+ /* Detect distributed deadlock */
+ bool (*DetectGlobalDeadLock) (PGPROC *proc);
+
+ char const *(*GetName) (void);
+} TransactionManager;
+
+/* Get pointer to transaction manager: actually returns content of TM variable */
+TransactionManager *GetTransactionManager(void);
+
+extern TransactionManager *TM; /* Current transaction manager (can be
+ * substituted by extensions) */
+extern TransactionManager PgTM; /* Standard PostgreSQL transaction manager */
+
+/* Standard PostgreSQL function implementing TM interface */
+extern bool PgXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
+
+extern void PgTransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
+ TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
+extern XidStatus PgTransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn);
+
+extern Snapshot PgGetSnapshotData(Snapshot snapshot);
+
+extern TransactionId PgGetOldestXmin(Relation rel, bool ignoreVacuum);
+
+extern bool PgTransactionIdIsInProgress(TransactionId xid);
+
+extern TransactionId PgGetGlobalTransactionId(void);
+
+extern TransactionId PgGetNewTransactionId(bool isSubXact);
+
+extern bool PgDetectGlobalDeadLock(PGPROC *proc);
+
+extern char const *PgGetTransactionManagerName(void);
+
+#endif
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index 6b4e365..0d45894 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -454,8 +454,9 @@ typedef enum
DS_NO_DEADLOCK, /* no deadlock detected */
DS_SOFT_DEADLOCK, /* deadlock avoided by queue rearrangement */
DS_HARD_DEADLOCK, /* deadlock, no way out but ERROR */
- DS_BLOCKED_BY_AUTOVACUUM /* no deadlock; queue blocked by autovacuum
+ DS_BLOCKED_BY_AUTOVACUUM, /* no deadlock; queue blocked by autovacuum
* worker */
+ DS_DISTRIBUTED_DEADLOCK /* distributed deadlock detected by DTM */
} DeadLockState;
/*
@@ -547,6 +548,10 @@ extern void DumpLocks(PGPROC *proc);
extern void DumpAllLocks(void);
#endif
+typedef void (*LockIterator) (PROCLOCK *lock, void *arg);
+
+extern void EnumerateLocks(LockIterator iterator, void *arg);
+
/* Lock a VXID (used to wait for a transaction to finish) */
extern void VirtualXactLockTableInsert(VirtualTransactionId vxid);
extern void VirtualXactLockTableCleanup(void);
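The patch consults the new DetectGlobalDeadLock hook only when the local check in DeadLockCheck finds nothing, and it exports EnumerateLocks so an extension can walk the local PROCLOCK table. One way a DTM might use the two together is to collect wait-for edges from every node, translate them to global transaction ids, and look for a cycle through the waiting transaction. The standalone sketch below models only that final cycle check; WaitEdge, MAX_GXID and detect_global_deadlock are hypothetical names, and gathering and exchanging the edges between nodes is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_GXID 64             /* hypothetical bound on global xact ids */

/* One edge of a merged wait-for graph: `waiter` waits for `holder`. */
typedef struct
{
    int         waiter;
    int         holder;
} WaitEdge;

/* Depth-first search: is `target` reachable from `from`? */
static bool
reaches(const WaitEdge *edges, size_t nedges, int from, int target,
        bool *visited)
{
    for (size_t i = 0; i < nedges; i++)
    {
        int         next;

        if (edges[i].waiter != from)
            continue;
        next = edges[i].holder;
        assert(next >= 0 && next < MAX_GXID);
        if (next == target)
            return true;
        if (!visited[next])
        {
            visited[next] = true;
            if (reaches(edges, nedges, next, target, visited))
                return true;
        }
    }
    return false;
}

/*
 * True if global transaction `gxid` lies on a cycle of the merged graph,
 * i.e. it takes part in a deadlock that may span several nodes.
 */
bool
detect_global_deadlock(const WaitEdge *edges, size_t nedges, int gxid)
{
    bool        visited[MAX_GXID] = {false};

    assert(gxid >= 0 && gxid < MAX_GXID);
    visited[gxid] = true;
    return reaches(edges, nedges, gxid, gxid, visited);
}
```

The point of merging is that each node's local graph can be acyclic (node A only sees "1 waits for 2", node B only sees "2 waits for 1") while the union contains the cycle.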
On 2/10/16 12:50 PM, Konstantin Knizhnik wrote:
PostgresProffesional cluster teams wants to propose new version of
eXtensible Transaction Manager API.
Previous discussion concerning this patch can be found here: /messages/by-id/F2766B97-555D-424F-B29F-E0CA0F6D1D74@postgrespro.ru
I see a lot of discussion on this thread but little in the way of consensus.
The API patch itself is small enough, but we think that it will be
strange to provide just API without examples of its usage.
It's not all that small, though it does apply cleanly even after a few
months. At least that indicates there is not a lot of churn in this area.
I'm concerned about the lack of response or reviewers for this patch.
It may be because everyone believes they had their say on the original
thread, or because it seems like a big change to go into the last CF, or
for other reasons altogether.
I think you should try to make it clear why this patch would be a win
for 9.6.
Is anyone willing to volunteer a review or make an argument for the
importance of this patch?
--
-David
david@pgmasters.net
On Fri, Mar 11, 2016 at 1:11 PM, David Steele <david@pgmasters.net> wrote:
On 2/10/16 12:50 PM, Konstantin Knizhnik wrote:
PostgresProffesional cluster teams wants to propose new version of
eXtensible Transaction Manager API.
Previous discussion concerning this patch can be found here: /messages/by-id/F2766B97-555D-424F-B29F-E0CA0F6D1D74@postgrespro.ru
I see a lot of discussion on this thread but little in the way of consensus.
The API patch itself is small enough, but we think that it will be
strange to provide just API without examples of its usage.
It's not all that small, though it does apply cleanly even after a few
months. At least that indicates there is not a lot of churn in this area.
I'm concerned about the lack of response or reviewers for this patch.
It may be because everyone believes they had their say on the original
thread, or because it seems like a big change to go into the last CF, or
for other reasons altogether.
I think you should try to make it clear why this patch would be a win
for 9.6.
Is anyone willing to volunteer a review or make an argument for the
importance of this patch?
There's been a lot of discussion on another thread about this patch.
The subject is "The plan for FDW-based sharding", but the thread kind
of got partially hijacked by this issue. The net-net of that is that
I don't think we have a clear enough idea about where we're going with
global transaction management to make it a good idea to adopt an API
like this. For example, if we later decide we want to put the
functionality in core, will we keep the hooks around for the sake of
alternative non-core implementations? I just don't believe this
technology is nearly mature enough to commit to at this point.
Konstantin does not agree with my assessment, perhaps unsurprisingly.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 3/11/16 1:30 PM, Robert Haas wrote:
There's been a lot of discussion on another thread about this patch.
The subject is "The plan for FDW-based sharding", but the thread kind
of got partially hijacked by this issue. The net-net of that is that
I don't think we have a clear enough idea about where we're going with
global transaction management to make it a good idea to adopt an API
like this. For example, if we later decide we want to put the
functionality in core, will we keep the hooks around for the sake of
alternative non-core implementations? I just don't believe this
technology is nearly mature enough to commit to at this point.
Ah yes, I forgot about the related discussion on that thread. Pasting
here for reference:
/messages/by-id/20160223164335.GA11285@momjian.us
Konstantin does not agree with my assessment, perhaps unsurprisingly.
I'm certainly no stranger to feeling strongly about a patch!
--
-David
david@pgmasters.net
On Fri, Mar 11, 2016 at 7:11 PM, David Steele <david@pgmasters.net> wrote:
On 2/10/16 12:50 PM, Konstantin Knizhnik wrote:
PostgresProffesional cluster teams wants to propose new version of
eXtensible Transaction Manager API.
Previous discussion concerning this patch can be found here: /messages/by-id/F2766B97-555D-424F-B29F-E0CA0F6D1D74@postgrespro.ru
I see a lot of discussion on this thread but little in the way of
consensus.
The API patch itself is small enough, but we think that it will be
strange to provide just API without examples of its usage.
It's not all that small, though it does apply cleanly even after a few
months. At least that indicates there is not a lot of churn in this area.
I'm concerned about the lack of response or reviewers for this patch.
It may be because everyone believes they had their say on the original
thread, or because it seems like a big change to go into the last CF, or
for other reasons altogether.
We'll prepare an easy setup for playing with our solutions, so that any
developer can see how it works. We hope to post something about it this weekend.
I think you should try to make it clear why this patch would be a win
for 9.6.
It looks like the discussion has shifted to a different thread; we'll answer here.
Is anyone willing to volunteer a review or make an argument for the
importance of this patch?
--
-David
david@pgmasters.net
On 3/11/16 2:00 PM, Oleg Bartunov wrote:
On Fri, Mar 11, 2016 at 7:11 PM, David Steele <david@pgmasters.net> wrote:
I'm concerned about the lack of response or reviewers for this patch.
It may be because everyone believes they had their say on the original
thread, or because it seems like a big change to go into the last CF, or
for other reasons altogether.
We'll prepare easy setup to play with our solutions, so any developers
could see how it works. Hope this weekend we'll post something about this.
OK, then for now I'm marking this "waiting for author." You can switch
it back to "needs review" once you have posted additional material.
--
-David
david@pgmasters.net
Robert Haas <robertmhaas@gmail.com> writes:
On Fri, Mar 11, 2016 at 1:11 PM, David Steele <david@pgmasters.net> wrote:
Is anyone willing to volunteer a review or make an argument for the
importance of this patch?
There's been a lot of discussion on another thread about this patch.
The subject is "The plan for FDW-based sharding", but the thread kind
of got partially hijacked by this issue. The net-net of that is that
I don't think we have a clear enough idea about where we're going with
global transaction management to make it a good idea to adopt an API
like this. For example, if we later decide we want to put the
functionality in core, will we keep the hooks around for the sake of
alternative non-core implementations? I just don't believe this
technology is nearly mature enough to commit to at this point.
Konstantin does not agree with my assessment, perhaps unsurprisingly.
I re-read the original thread,
/messages/by-id/F2766B97-555D-424F-B29F-E0CA0F6D1D74@postgrespro.ru
I think there is no question that this is an entirely immature patch.
Not coping with subtransactions is alone sufficient to make it not
credible for production.
Even if the extension API were complete and clearly stable, I have doubts
that there's any great value in integrating it into 9.6, rather than some
later release series. The above thread makes it clear that pg_dtm is very
much WIP and has easily a year's worth of work before anybody would think
of wanting to deploy it. So end users don't need this patch in 9.6, and
developers working on pg_dtm shouldn't really have much of a problem
applying the patch locally --- how likely is it that they'd be using a
perfectly stock build of the database apart from this patch?
But my real takeaway from that thread is that there's no great reason
to believe that this API definition *is* stable. The single existing
use-case is very far from completion, and it's hardly unlikely that
what it needs will change.
I also took a very quick look at the patch itself:
1. No documentation. For something that purports to be an API
specification, really the documentation should have been written *first*.
2. As noted in the cited thread, it's not clear that
Get/SetTransactionStatus are a useful cutpoint; they don't provide any
real atomicity guarantees.
3. Uh, how can you hook GetNewTransactionId but not ReadNewTransactionId?
4. There seems to be an intention to encapsulate snapshots, but surely
wrapping hooks around GetSnapshotData and XidInMVCCSnapshot is not nearly
enough for that. Look at all the knowledge snapmgr.c has about snapshot
representation, for example. And is a function like GetOldestXmin even
meaningful with a different notion of what snapshots are? (For that
matter, is TransactionId == uint32 still tenable for any other notion
of snapshots?)
5. BTW, why would you hook at XidInMVCCSnapshot rather than making use of
the existing capability to have a custom SnapshotSatisfiesFunc snapshot
checker function?
IMO this is not committable as-is, and I don't think that it's something
that will become committable during this 'fest. I think we'd be well
advised to boot it to the 2016-09 CF and focus our efforts on other stuff
that has a better chance of getting finished this month.
regards, tom lane
On 03/11/2016 11:35 PM, Tom Lane wrote:
Robert Haas <robertmhaas@gmail.com> writes:
On Fri, Mar 11, 2016 at 1:11 PM, David Steele <david@pgmasters.net> wrote:
Is anyone willing to volunteer a review or make an argument for the
importance of this patch?
There's been a lot of discussion on another thread about this patch.
The subject is "The plan for FDW-based sharding", but the thread kind
of got partially hijacked by this issue. The net-net of that is that
I don't think we have a clear enough idea about where we're going with
global transaction management to make it a good idea to adopt an API
like this. For example, if we later decide we want to put the
functionality in core, will we keep the hooks around for the sake of
alternative non-core implementations? I just don't believe this
technology is nearly mature enough to commit to at this point.
Konstantin does not agree with my assessment, perhaps unsurprisingly.
I re-read the original thread,
/messages/by-id/F2766B97-555D-424F-B29F-E0CA0F6D1D74@postgrespro.ru
I think there is no question that this is an entirely immature patch.
Not coping with subtransactions is alone sufficient to make it not
credible for production.
Lack of subtransaction support is not a limitation of the XTM API.
It is a limitation of the current pg_dtm implementation; the other DTM implementation, pg_tsdtm, does support subtransactions.
Even if the extension API were complete and clearly stable, I have doubts
that there's any great value in integrating it into 9.6, rather than some
later release series. The above thread makes it clear that pg_dtm is very
much WIP and has easily a year's worth of work before anybody would think
of wanting to deploy it. So end users don't need this patch in 9.6, and
developers working on pg_dtm shouldn't really have much of a problem
applying the patch locally --- how likely is it that they'd be using a
perfectly stock build of the database apart from this patch?
I agree with you that pg_dtm is very far from production.
But I want to point out two things:
1. pg_dtm and pg_tsdtm are not complete cluster solutions; they are just one (relatively small) part of such solutions.
pg_tsdtm seems to be even more "mature", maybe because it is simpler and does not have many of the limitations that pg_dtm has (like the lack of subtransaction support).
2. They can be integrated quite easily with other existing cluster solutions. We have integrated both of them with postgres_fdw and pg_shard.
postgres_fdw is not a ready-made solution either, just a mechanism that can also be used for sharding.
But pg_shard and CitusDB are quite popular solutions for distributed query execution, providing good performance for analytic and single-node OLTP queries.
Integration with a DTM adds ACID semantics for distributed transactions and makes it possible to support more complex OLTP and OLAP queries involving multiple nodes.
This integration is already done and its performance has been evaluated, so it is not quite correct to say that we need a year or more to make pg_dtm/pg_tsdtm ready to deploy.
But my real takeaway from that thread is that there's no great reason
to believe that this API definition *is* stable. The single existing
use-case is very far from completion, and it's hardly unlikely that
what it needs will change.
Sorry, maybe I am completely wrong, but I do not think it is possible to develop a stable API if nobody is using it.
It is like "fill the pool with water only after you learn how to swim".
I also took a very quick look at the patch itself:
1. No documentation. For something that purports to be an API
specification, really the documentation should have been written *first*.
Sorry, that was my fault. I have already written documentation and it will be included in the next version of the patch.
But please note that when we started working on DTM, we did not have a good understanding of which PostgreSQL TM features would have to be changed.
The current view of XTM took shape only during our work on pg_dtm, pg_tsdtm and multimaster.
One more point: we have not introduced new abstractions in XTM.
We just override existing PostgreSQL functions.
Certainly, once some internal functions become part of an API, they should be much better documented.
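To illustrate what "overriding existing PostgreSQL functions" amounts to, here is a standalone model of the dispatch pattern in the patch: a core entry point such as TransactionIdIsInProgress becomes a thin wrapper that calls through the TM pointer, and an extension copies PgTM, replaces the callbacks it cares about, and repoints TM. The two-member struct, the demo_* functions and the "xid >= 1000 is global" rule are hypothetical simplifications, not part of XTM or pg_tsdtm.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Cut-down stand-in for the patch's TransactionManager: two callbacks only. */
typedef struct
{
    bool        (*IsInProgress) (TransactionId xid);
    const char *(*GetName) (void);
} TransactionManager;

/* Stand-ins for the default Pg* implementations. */
static bool
pg_is_in_progress(TransactionId xid)
{
    return xid == 100;          /* pretend xid 100 is the only running one */
}

static const char *
pg_get_name(void)
{
    return "postgres";
}

TransactionManager PgTM = {pg_is_in_progress, pg_get_name};
TransactionManager *TM = &PgTM;

/* Former core entry point, now a thin wrapper like those the patch adds. */
bool
TransactionIdIsInProgress(TransactionId xid)
{
    return TM->IsInProgress(xid);
}

/* Hypothetical extension: override some callbacks, inherit the rest. */
static TransactionManager DemoTM;

static bool
demo_is_in_progress(TransactionId xid)
{
    if (xid >= 1000)            /* hypothetical: unresolved global xids */
        return true;
    return PgTM.IsInProgress(xid);  /* defer to the default manager */
}

static const char *
demo_get_name(void)
{
    return "demo_dtm";
}

/* Would be called from the extension's _PG_init(). */
void
demo_install(void)
{
    assert(TM == &PgTM);        /* this sketch does not chain managers */
    DemoTM = PgTM;              /* start from the default table */
    DemoTM.IsInProgress = demo_is_in_progress;
    DemoTM.GetName = demo_get_name;
    TM = &DemoTM;
}
```

Copying the whole default table first means any callback the extension does not touch keeps the stock PostgreSQL behaviour.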
2. As noted in the cited thread, it's not clear that
Get/SetTransactionStatus are a useful cutpoint; they don't provide any
real atomicity guarantees.
I wonder how such guarantees could be provided at the API level.
Atomicity means that all other transactions see this transaction either as committed or as uncommitted.
So transaction commit has to be coordinated with the visibility check.
In the case of pg_dtm, atomicity is simply enforced by the fact that the decision whether to commit a transaction is taken by the central coordinator.
When it decides that a transaction is committed, it marks it as committed in all subsequently obtained snapshots.
In the case of pg_tsdtm there is no central arbiter, so we have to introduce an "in-doubt" transaction state, in which it is not known whether the transaction is
committed or aborted; any other transaction accessing tuples updated by this transaction has to wait while its status is "in-doubt".
The main challenge of pg_tsdtm is to make this period as short as possible...
But these are details of a particular implementation which IMHO have no relation to the API itself.
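The "in-doubt" rule can be pictured as a visibility check that refuses to answer until the writer's fate is known. Below is a minimal single-threaded sketch, assuming a pluggable status lookup; XStatus, tuple_visible and the ScriptedStatus test double are hypothetical, and a real backend would block on a latch or lock instead of re-polling in a loop.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical status values; XTM itself reuses clog's XidStatus. */
typedef enum
{
    XS_IN_PROGRESS,
    XS_COMMITTED,
    XS_ABORTED,
    XS_IN_DOUBT                 /* prepared, global outcome not yet known */
} XStatus;

typedef XStatus (*StatusFn) (unsigned xid, void *ctx);

/*
 * Visibility of a tuple inserted by `xid`: while the writer is in doubt we
 * can answer neither "visible" nor "invisible", so re-read its status until
 * it is resolved.  Only a committed writer makes the tuple visible.
 */
bool
tuple_visible(unsigned xid, StatusFn get_status, void *ctx)
{
    XStatus     s;

    assert(get_status);
    while ((s = get_status(xid, ctx)) == XS_IN_DOUBT)
        continue;               /* wait for the commit decision */
    return s == XS_COMMITTED;
}

/* Test double: reports "in doubt" for the first `pending` calls. */
typedef struct
{
    int         pending;
    XStatus     final_status;
    int         calls;
} ScriptedStatus;

XStatus
scripted_status(unsigned xid, void *ctx)
{
    ScriptedStatus *st = ctx;

    (void) xid;
    st->calls++;
    if (st->pending > 0)
    {
        st->pending--;
        return XS_IN_DOUBT;
    }
    return st->final_status;
}
```

The length of that waiting loop is exactly the "in-doubt period" that pg_tsdtm tries to keep short.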
3. Uh, how can you hook GetNewTransactionId but not ReadNewTransactionId?
Uh-uh-uh:)
ReadNewTransactionId just reads the value of ShmemVariableCache->nextXid,
but unfortunately it is not the only place where nextXid is used: there are about a hundred occurrences of nextXid in the Postgres core.
This is why we decided that GetNewTransactionId should actually update ShmemVariableCache->nextXid, so that
there is no need to rewrite all this code.
Sorry, but IMHO it is a problem of the Postgres design and not of XTM ;)
We just want to find a compromise that makes XTM flexible enough while minimizing changes to the core code.
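The compromise described above (a custom GetNewTransactionId that still advances ShmemVariableCache->nextXid) might look roughly like this standalone sketch. The global xid source, shared_next_xid and read_new_transaction_id are hypothetical stand-ins; the sketch ignores XID wraparound as well as the locking and CLOG/SUBTRANS extension that the real GetNewTransactionId performs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Stand-in for ShmemVariableCache->nextXid, read directly all over core. */
static TransactionId shared_next_xid = 3;

/* Hypothetical source of globally agreed xids (e.g. a central arbiter). */
static TransactionId arbiter_next = 500;

static TransactionId
arbiter_assign_xid(void)
{
    return arbiter_next++;
}

/*
 * Custom GetNewTransactionId: take the xid from the global source, but
 * also push the local nextXid past it, so code that only reads nextXid
 * never sees a value at or below an xid that has already been handed out.
 */
TransactionId
dtm_get_new_transaction_id(bool isSubXact)
{
    TransactionId xid = arbiter_assign_xid();

    (void) isSubXact;           /* not modeled in this sketch */
    if (xid >= shared_next_xid) /* plain comparison: no wraparound */
        shared_next_xid = xid + 1;
    assert(shared_next_xid > xid);
    return xid;
}

/* What ReadNewTransactionId() would return in this model. */
TransactionId
read_new_transaction_id(void)
{
    return shared_next_xid;
}
```

Because nextXid only ever moves forward here, readers that never call into XTM keep treating it as an upper bound on assigned xids.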
4. There seems to be an intention to encapsulate snapshots, but surely
wrapping hooks around GetSnapshotData and XidInMVCCSnapshot is not nearly
enough for that. Look at all the knowledge snapmgr.c has about snapshot
representation, for example. And is a function like GetOldestXmin even
meaningful with a different notion of what snapshots are? (For that
matter, is TransactionId == uint32 still tenable for any other notion
of snapshots?)
XTM encapsulation of snapshots allows us to implement pg_dtm.
It does almost the same thing as the Postgres-XL GTM, but without a huge number of #ifdefs.
The representation of XIDs is yet another compromise point: we do not want to change the tuple header format.
So an XID is still 32 bits and has the same meaning as in PostgreSQL. If a custom TM implementation wants to use some other transaction identifiers,
like CSNs in pg_tsdtm, it has to provide a mapping between them and XIDs.
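For instance, a CSN-based manager in the spirit of pg_tsdtm could record a commit CSN per XID and answer the IsInSnapshot callback by comparing it with the snapshot's CSN. The sketch below assumes a linear lookup table and a snapshot reduced to a single CSN; XidCsnEntry, CsnSnapshot and csn_xid_in_snapshot are hypothetical names, not pg_tsdtm's actual data structures.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId; /* unchanged 32-bit XID in tuple headers */
typedef uint64_t CSN;           /* e.g. derived from local system time */

#define INVALID_CSN 0

/* Hypothetical XID -> commit CSN map maintained by the TM extension. */
typedef struct
{
    TransactionId xid;
    CSN         commit_csn;     /* INVALID_CSN while not committed */
} XidCsnEntry;

typedef struct
{
    CSN         snapshot_csn;   /* assigned when the snapshot is taken */
} CsnSnapshot;

static CSN
lookup_commit_csn(const XidCsnEntry *map, size_t n, TransactionId xid)
{
    for (size_t i = 0; i < n; i++)
        if (map[i].xid == xid)
            return map[i].commit_csn;
    return INVALID_CSN;         /* unknown: treat as not committed */
}

/*
 * Counterpart of XidInMVCCSnapshot: "should this snapshot still treat xid
 * as running?"  Yes, unless it committed with a CSN no later than the
 * snapshot's own CSN.
 */
bool
csn_xid_in_snapshot(const XidCsnEntry *map, size_t n,
                    TransactionId xid, const CsnSnapshot *snap)
{
    CSN         csn = lookup_commit_csn(map, n, xid);

    assert(snap->snapshot_csn != INVALID_CSN);
    return csn == INVALID_CSN || csn > snap->snapshot_csn;
}
```

Tuples keep their ordinary 32-bit xmin/xmax; only the visibility decision goes through the XID to CSN mapping.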
5. BTW, why would you hook at XidInMVCCSnapshot rather than making use of
the existing capability to have a custom SnapshotSatisfiesFunc snapshot
checker function?
The HeapTupleSatisfies* routines in utils/time/tqual.c implement a lot of logic for handling different kinds of snapshots, checking/setting hint bits in tuples,
caching, ... We do not want to replace all this code, or just copy and paste it, in a DTM implementation.
And XidInMVCCSnapshot is the common function ultimately used by most HeapTupleSatisfies* functions once all other checks have passed.
So it is really the most convenient place to plug in custom visibility-checking rules. And as far as I remember, a similar approach was used in Postgres-XL.
IMO this is not committable as-is, and I don't think that it's something
that will become committable during this 'fest. I think we'd be well
advised to boot it to the 2016-09 CF and focus our efforts on other stuff
that has a better chance of getting finished this month.
regards, tom lane
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
On Fri, Mar 11, 2016 at 9:35 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
IMO this is not committable as-is, and I don't think that it's something
that will become committable during this 'fest. I think we'd be well
advised to boot it to the 2016-09 CF and focus our efforts on other stuff
that has a better chance of getting finished this month.
Yeah, I believe a good first step would be to discuss this in depth
at PGCon, for folks who will be there and are
interested in the subject. It seems like good timing to brainstorm
things face to face at the developer unconference, for example, a couple of
months before the first CF of 9.7. We may (or may not) get a
cleaner picture of what kind of things are wanted in this area.
--
Michael
Michael Paquier <michael.paquier@gmail.com> writes:
On Fri, Mar 11, 2016 at 9:35 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
IMO this is not committable as-is, and I don't think that it's something
that will become committable during this 'fest. I think we'd be well
advised to boot it to the 2016-09 CF and focus our efforts on other stuff
that has a better chance of getting finished this month.
Yeah, I would believe that a good first step would be to discuss
deeply about that directly at PGCon for folks that will be there and
interested in the subject. It seems like a good timing to brainstorm
things F2F at the developer unconference for example, a couple of
months before the 1st CF of 9.7. We may perhaps (or not) get to
cleaner picture of what kind of things are wanted in this area.
Yeah, the whole area seems like a great topic for some unconference
sessions.
regards, tom lane
On Sat, Mar 12, 2016 at 11:06 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
Michael Paquier <michael.paquier@gmail.com> writes:
On Fri, Mar 11, 2016 at 9:35 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
IMO this is not committable as-is, and I don't think that it's something
that will become committable during this 'fest. I think we'd be well
advised to boot it to the 2016-09 CF and focus our efforts on other stuff
that has a better chance of getting finished this month.
Yeah, I would believe that a good first step would be to discuss it
in depth directly at PGCon for folks who will be there and are
interested in the subject. It seems like good timing to brainstorm
things F2F at the developer unconference, for example, a couple of
months before the 1st CF of 9.7. We may perhaps (or not) get to a
cleaner picture of what kind of things are wanted in this area.
Yeah, the whole area seems like a great topic for some unconference
sessions.
I agree. I think this is a problem we really need to solve, and I
think talking about it will help us figure out the best solution. I'd
also be interested in hearing Kevin Grittner's thoughts about
serializability in a distributed environment, since he's obviously
thought about the topic of serializability quite a bit.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Sat, Mar 12, 2016 at 11:21 AM, Robert Haas <robertmhaas@gmail.com> wrote:
I'd also be interested in hearing Kevin Grittner's thoughts about
serializability in a distributed environment, since he's obviously
thought about the topic of serializability quite a bit.
I haven't done a thorough search of the academic literature on
this, and I wouldn't be comfortable taking a really solid position
without that; but in thinking about it it seems like there are at
least three distinct problems which may have distinct solutions.
*Physical replication* may be best handled by leveraging the "safe
snapshot" idea already implemented in READ ONLY DEFERRABLE
transactions, and passing through information in the WAL stream to
allow the receiver to identify points where a snapshot can be taken
which cannot see an anomaly. There should probably be an option to
use the last known safe snapshot or wait for a point in the stream
where one next appears. This might take as little as a bit or two
per WAL commit record. It's not clear what the processing overhead
would be -- it wouldn't surprise me if it was "in the noise", nor
would it surprise me if it wasn't. We would need some careful
benchmarking, and, if performance was an issue, a GUC to control
whether the information was passed along (and, thus, whether
SERIALIZABLE transactions were allowed on the replica).
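To make the "bit or two per WAL commit record" idea a little more concrete, here is a minimal C sketch of the receiver side, under stated assumptions: `HYP_COMMIT_SAFE_SNAPSHOT`, `HypCommitRecord`, and both functions are hypothetical names invented for illustration and do not exist in PostgreSQL's WAL format. It only shows the two policies mentioned above: use the last known safe snapshot point, or wait for the next one to appear in the stream.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical: one spare flag bit in a commit record marks a point in the
 * WAL stream where a snapshot cannot see a serialization anomaly.
 * Neither this flag nor this struct exists in PostgreSQL.
 */
#define HYP_COMMIT_SAFE_SNAPSHOT 0x01

typedef struct
{
    uint64_t lsn;   /* position of the commit record in the stream */
    uint8_t  flags; /* hypothetical per-commit flag bits */
} HypCommitRecord;

/* Policy 1: the last safe point already received, at or before index 'upto'. */
static uint64_t
last_known_safe_lsn(const HypCommitRecord *recs, size_t n, size_t upto)
{
    assert(upto < n);
    for (size_t i = upto + 1; i-- > 0;)     /* scan backwards from 'upto' */
        if (recs[i].flags & HYP_COMMIT_SAFE_SNAPSHOT)
            return recs[i].lsn;
    return 0;                               /* no safe point seen yet */
}

/* Policy 2: the next safe point at or after index 'from'. */
static uint64_t
next_safe_lsn(const HypCommitRecord *recs, size_t n, size_t from)
{
    for (size_t i = from; i < n; i++)
        if (recs[i].flags & HYP_COMMIT_SAFE_SNAPSHOT)
            return recs[i].lsn;
    return 0;                               /* none yet: a real receiver would keep waiting */
}
```

In this sketch a replica running a SERIALIZABLE transaction would take its snapshot at the returned LSN; 0 stands for "no safe point seen yet".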
*Logical replication* (considered for the moment in a
unidirectional context) might best be handled by some reordering of
application of the commits on the replica into "apparent order of
execution" -- which is pretty well defined on the primary based on
commit order adjusted by read-write dependencies. Basically, the
"simple" implementation would be that WAL is applied normally
unless you receive a commit record which is flagged in some way to
indicate that it is for a serializable transaction which wrote data
and at the time of commit was concurrent with at least one other
serializable transaction which had not completed and was not READ
ONLY. Such a commit would await information in the WAL stream to
tell it when all such concurrent transactions completed, and would
indicate when such a transaction had a read-write dependency *in*
to the transaction with the suspended commit; commits for any such
transactions must be moved ahead of the suspended commit. This
would allow logical replication, with all the filtering and such,
to avoid ever showing a state on the replica which contained
serialization anomalies.
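One possible reading of this commit-reordering rule is sketched below as a toy C model. The event kinds, field names, and `replica_apply_order()` are hypothetical. The sketch further assumes at most one suspended commit at a time, and that commits arriving while one is held keep their place behind it unless they carry a rw-dependency into it, in which case they are moved ahead. It only computes an apply order and does not touch real WAL.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stream events; these are not real WAL record types. */
typedef enum { HYP_COMMIT, HYP_ALL_CONCURRENT_DONE } HypEventKind;

typedef struct
{
    HypEventKind kind;
    int xid;        /* committing transaction, or the suspended one being released */
    int suspend;    /* serializable writer concurrent with another incomplete,
                     * non-READ ONLY serializable transaction */
    int rw_in_to;   /* xid of the suspended txn this one has a rw-dependency
                     * *in* to (so it must appear to run first), or 0 */
} HypEvent;

#define MAX_QUEUED 64

/*
 * Writes the replica's apply order into out[] (room for nev entries) and
 * returns its length. Simplification: at most one suspended commit at a time.
 */
static size_t
replica_apply_order(const HypEvent *ev, size_t nev, int *out)
{
    size_t n = 0, nq = 0;
    int held = 0;                   /* suspended commit's xid, 0 if none */
    int queue[MAX_QUEUED];          /* commits that stay behind the held one */

    for (size_t i = 0; i < nev; i++)
    {
        const HypEvent *e = &ev[i];

        if (e->kind == HYP_ALL_CONCURRENT_DONE)
        {
            if (held != 0 && e->xid == held)
            {
                out[n++] = held;            /* apply the suspended commit... */
                for (size_t j = 0; j < nq; j++)
                    out[n++] = queue[j];    /* ...then whatever queued behind it */
                held = 0;
                nq = 0;
            }
        }
        else if (held == 0)
        {
            if (e->suspend)
                held = e->xid;              /* wait for the release event */
            else
                out[n++] = e->xid;          /* normal case: apply in WAL order */
        }
        else if (e->rw_in_to == held)
            out[n++] = e->xid;              /* moved ahead of the held commit */
        else
        {
            assert(nq < MAX_QUEUED);
            queue[nq++] = e->xid;           /* keeps its place after the held commit */
        }
    }
    return n;
}
```

For a stream of commit 1, suspended commit 2, commit 3, commit 4 (rw-dependency into 2), then the release for 2, this sketch applies 1, 4, 2, 3.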
*Logical replication with cycles* (where there is some path for
cluster A to replicate to cluster B, and some other path for
cluster B to replicate the same or related data to cluster A) has a
few options. You could opt for "eventual consistency" --
essentially giving up on the I in ACID and managing the anomalies.
In practice this seems to lead to some form of S2PL at the
application coding level, which is very bad for performance and
concurrency, so I tend to think it should not be the only option.
Built-in S2PL would probably perform better than having it pasted
on at the application layer through some locking API, but for most
workloads is still inferior to SSI in both concurrency and
performance. Unless a search of the literature turns up some new
alternative, I'm inclined to think that if you want to distribute a
"logical" database over multiple clusters and still manage race
conditions through use of SERIALIZABLE transactions, a distributed
SSI implementation may be the best bet. That requires the
transaction manager (or something like it) to track non-blocking
predicate "locks" (what the implementation calls a SIReadLock)
across the whole environment, as well as tracking rw-conflicts (our
short name for read-write dependencies) across the whole
environment. Since SSI also looks at the MVCC state, handling
checks of that without falling victim to race conditions would also
need to be handled somehow.
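Why rw-conflicts would have to be tracked across the whole environment can be illustrated with a simplified C sketch of the pivot check at the core of SSI: a transaction with both an incoming and an outgoing rw-conflict. The types and functions are hypothetical, transaction ids are assumed to be global across clusters, and the check ignores the commit-ordering refinements PostgreSQL's SSI applies, so it is deliberately conservative (it may flag structures the real implementation would let through).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_XACTS 16    /* hypothetical global xids 1..MAX_XACTS-1 */

/* rw-conflict reader -> writer: 'reader' read data that 'writer' later wrote */
typedef struct { int reader; int writer; } RwConflict;

/* Hypothetical global view merged from the conflicts each cluster reports. */
typedef struct
{
    bool in[MAX_XACTS];     /* has an incoming rw-conflict */
    bool out[MAX_XACTS];    /* has an outgoing rw-conflict */
} GlobalConflicts;

static void
merge_node_conflicts(GlobalConflicts *g, const RwConflict *c, size_t n)
{
    for (size_t i = 0; i < n; i++)
    {
        assert(c[i].reader > 0 && c[i].reader < MAX_XACTS);
        assert(c[i].writer > 0 && c[i].writer < MAX_XACTS);
        g->out[c[i].reader] = true;
        g->in[c[i].writer] = true;
    }
}

/* Returns the first pivot (both in and out conflicts), or 0 if none. */
static int
find_pivot(const GlobalConflicts *g)
{
    for (int x = 1; x < MAX_XACTS; x++)
        if (g->in[x] && g->out[x])
            return x;
    return 0;
}
```

If cluster A reports 1 -> 2 and cluster B reports 2 -> 3, neither view alone contains a pivot, but the merged view flags transaction 2.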
If I remember correctly, the patches to add the SSI implementation
of SERIALIZABLE transactions were about ten times the size of the
patches to remove S2PL and initially replace it with MVCC. I don't
have even a gut feel as to how much bigger the distributed form is
likely to be. On the one hand the *fundamental logic* is all there
and should not need to change; on the other hand the *mechanism*
for acquiring the data to be *used* in that logic would be
different and potentially complex.
--
Kevin Grittner
EDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On 12 Mar 2016, at 13:19, Michael Paquier <michael.paquier@gmail.com> wrote:
On Fri, Mar 11, 2016 at 9:35 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
IMO this is not committable as-is, and I don't think that it's something
that will become committable during this 'fest. I think we'd be well
advised to boot it to the 2016-09 CF and focus our efforts on other stuff
that has a better chance of getting finished this month.
Yeah, I would believe that a good first step would be to discuss it
in depth directly at PGCon for folks who will be there and are
interested in the subject. It seems like good timing to brainstorm
things F2F at the developer unconference, for example, a couple of
months before the 1st CF of 9.7. We may perhaps (or not) get to a
cleaner picture of what kind of things are wanted in this area.
To give an overview of XTM coupled with postgres_fdw from a user's perspective, I've packaged a patched Postgres with Docker
and provided a test case in which it is easy to spot a violation of the READ COMMITTED isolation level without XTM.
This test fills the database with users across two shards connected by postgres_fdw, whose tables inherit from the same
parent table t. It then concurrently transfers money between users on different shards:
begin;
update t set v = v - 1 where u=%d; -- this is user from t_fdw1, first shard
update t set v = v + 1 where u=%d; -- this is user from t_fdw2, second shard
commit;
The test simultaneously runs a reader thread that counts all the money in the system:
select sum(v) from t;
So in a transactional system we expect the sum to always be constant (zero in our case, as we initialize every user with a zero balance).
But we can see that without tsdtm the total amount of money fluctuates around zero.
https://github.com/kelvich/postgres_xtm_docker
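The anomaly this test exposes can be illustrated with a toy C model. This is not postgres_fdw or pg_tsdtm code, and all names are hypothetical: each shard keeps row versions tagged with a commit number on a single shared clock, and a reader sees the newest version at or below its snapshot number. If the two halves of a transfer commit at different points, a reader can land between them; if, as assumed here for a CSN-based DTM, both halves are given the same commit number, every snapshot sees both halves or neither.

```c
#include <assert.h>
#include <stddef.h>

/* Toy model: a row version tagged with the commit number that made it visible. */
typedef struct { int value; long csn; } Version;
typedef struct { Version v[8]; size_t n; } Shard;

static void
commit_version(Shard *s, int value, long csn)
{
    assert(s->n < 8);
    s->v[s->n].value = value;
    s->v[s->n].csn = csn;
    s->n++;
}

/* Value visible to a snapshot: newest version with csn <= snapshot_csn. */
static int
read_at(const Shard *s, long snapshot_csn)
{
    int value = 0;          /* users start with a zero balance */
    long best = -1;

    for (size_t i = 0; i < s->n; i++)
        if (s->v[i].csn <= snapshot_csn && s->v[i].csn > best)
        {
            best = s->v[i].csn;
            value = s->v[i].value;
        }
    return value;
}

/* The reader thread's "select sum(v) from t" over both shards. */
static int
sum_at(const Shard *s1, const Shard *s2, long snapshot_csn)
{
    return read_at(s1, snapshot_csn) + read_at(s2, snapshot_csn);
}
```

With the halves committed at 10 and 11, a snapshot at 10 sums to -1; with both halves committed at 10, snapshots at 9 and at 10 both sum to 0.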
---
Stas Kelvich
Postgres Professional: http://www.postgrespro.com
Russian Postgres Company
On 3/16/16 7:59 AM, Stas Kelvich wrote:
This is an interesting example but I don't believe it does much to
address the concerns that were raised in this thread.
As far as I can see the consensus is that this patch should not be
considered for the current CF so I have marked it "returned with feedback".
If possible please follow Michael's advice and create a session at the
PGCon unconference in May. I'm certain there will be a lot of interest.
--
-David
david@pgmasters.net