From f023c9acaea4d40b5d2d71a543d1d8563e0d5079 Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Wed, 25 Apr 2018 16:22:30 +0300 Subject: [PATCH 2/3] Global snapshots --- src/backend/access/transam/Makefile | 2 +- src/backend/access/transam/global_snapshot.c | 747 ++++++++++++++++++++++++++ src/backend/access/transam/twophase.c | 156 ++++++ src/backend/access/transam/xact.c | 29 + src/backend/access/transam/xlog.c | 2 + src/backend/storage/ipc/ipci.c | 3 + src/backend/storage/ipc/procarray.c | 103 +++- src/backend/storage/lmgr/lwlocknames.txt | 1 + src/backend/storage/lmgr/proc.c | 5 + src/backend/utils/misc/guc.c | 11 + src/backend/utils/misc/postgresql.conf.sample | 2 + src/backend/utils/time/snapmgr.c | 98 ++++ src/backend/utils/time/tqual.c | 65 ++- src/include/access/global_snapshot.h | 72 +++ src/include/access/twophase.h | 1 + src/include/catalog/pg_proc.dat | 14 + src/include/datatype/timestamp.h | 3 + src/include/fmgr.h | 1 + src/include/portability/instr_time.h | 10 + src/include/storage/proc.h | 15 + src/include/storage/procarray.h | 8 + src/include/utils/snapmgr.h | 3 + src/include/utils/snapshot.h | 8 + 23 files changed, 1352 insertions(+), 7 deletions(-) create mode 100644 src/backend/access/transam/global_snapshot.c create mode 100644 src/include/access/global_snapshot.h diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 03aa360ea3..8ef677cada 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -12,7 +12,7 @@ subdir = src/backend/access/transam top_builddir = ../../../.. 
include $(top_builddir)/src/Makefile.global -OBJS = clog.o commit_ts.o global_csn_log.o generic_xlog.o \ +OBJS = clog.o commit_ts.o global_csn_log.o global_snapshot.o generic_xlog.o \ multixact.o parallel.o rmgr.o slru.o \ subtrans.o timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \ xact.o xlog.o xlogarchive.o xlogfuncs.o \ diff --git a/src/backend/access/transam/global_snapshot.c b/src/backend/access/transam/global_snapshot.c new file mode 100644 index 0000000000..b0801b5b3c --- /dev/null +++ b/src/backend/access/transam/global_snapshot.c @@ -0,0 +1,747 @@ +/*------------------------------------------------------------------------- + * + * global_snapshot.c + * Support for cross-node snapshot isolation. + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/access/transam/global_snapshot.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/global_csn_log.h" +#include "access/global_snapshot.h" +#include "access/transam.h" +#include "access/twophase.h" +#include "access/xact.h" +#include "portability/instr_time.h" +#include "storage/lmgr.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/shmem.h" +#include "storage/spin.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/snapmgr.h" +#include "miscadmin.h" + +/* Raise a warning if imported global_csn exceeds ours by this value. */ +#define SNAP_DESYNC_COMPLAIN (1*NSECS_PER_SEC) /* 1 second */ + +/* + * GlobalSnapshotState + * + * Do not trust local clocks to be strictly monotonical and save last acquired + * value so later we can compare next timestamp with it. Accessed through + * GlobalSnapshotGenerate() and GlobalSnapshotSync(). 
+ */ +typedef struct +{ + GlobalCSN last_global_csn; + volatile slock_t lock; +} GlobalSnapshotState; + +static GlobalSnapshotState *gsState; + + +/* + * GUC to delay advance of oldestXid for this amount of time. Also determines + * the size of the GlobalSnapshotXidMap circular buffer (one slot per second). + */ +int global_snapshot_defer_time; + +/* + * Enables this module. + */ +extern bool track_global_snapshots; + +/* + * GlobalSnapshotXidMap + * + * To be able to install global snapshot that points to past we need to keep + * old versions of tuples and therefore delay advance of oldestXid. Here we + * keep track of correspondence between snapshot's global_csn and oldestXid + * that was set at the time when the snapshot was taken. Much like the + * snapshot too old's OldSnapshotControlData does, but with finer granularity + * to seconds. + * + * Different strategies can be employed to hold oldestXid (e.g. we can track + * oldest global_csn-based snapshot among cluster nodes and map it oldestXid + * on each node) but here implemented one that tries to avoid cross-node + * communications which are tricky in case of postgres_fdw. + * + * On each snapshot acquisition GlobalSnapshotMapXmin() is called and stores + * correspondence between current global_csn and oldestXmin in a sparse way: + * global_csn is rounded to seconds (and here we use the fact that global_csn + * is just a timestamp) and oldestXmin is stored in the circular buffer where + * rounded global_csn acts as an offset from current circular buffer head. + * Size of the circular buffer is controlled by global_snapshot_defer_time GUC. + * + * When global snapshot arrives from different node we check that its + * global_csn is still in our map, otherwise we'll error out with "snapshot too + * old" message. If global_csn is successfully mapped to oldestXid we move + * backend's pgxact->xmin to proc->originalXmin and fill pgxact->xmin to + * mapped oldestXid. 
That way GetOldestXmin() can take into account backends + * with imported global snapshot and old tuple versions will be preserved. + * + * Also while calculating oldestXmin for our map in presence of imported + * global snapshots we should use proc->originalXmin instead of pgxact->xmin + * that was set during import. Otherwise, we can create a feedback loop: + * xmin's of imported global snapshots were calculated using our map and new + * entries in map going to be calculated based on that xmin's, and there is + * a risk to stuck forever with one non-increasing oldestXmin. All other + * callers of GetOldestXmin() are using pgxact->xmin so the old tuple versions + * are preserved. + */ +typedef struct GlobalSnapshotXidMap +{ + int head; /* offset of current freshest value */ + int size; /* total size of circular buffer */ + GlobalCSN_atomic last_csn_seconds; /* last rounded global_csn that changed + * xmin_by_second[] */ + TransactionId *xmin_by_second; /* circular buffer of oldestXmin's */ +} +GlobalSnapshotXidMap; + +static GlobalSnapshotXidMap *gsXidMap; + + +/* Estimate shared memory space needed for gsState and, when enabled, gsXidMap */ +Size +GlobalSnapshotShmemSize(void) +{ + Size size = 0; + + if (track_global_snapshots || global_snapshot_defer_time > 0) + { + size += MAXALIGN(sizeof(GlobalSnapshotState)); + } + + if (global_snapshot_defer_time > 0) + { + size += sizeof(GlobalSnapshotXidMap); + size += global_snapshot_defer_time*sizeof(TransactionId); + size = MAXALIGN(size); + } + + return size; +} + +/* Init shared memory structures (gsState and, when enabled, gsXidMap) */ +void +GlobalSnapshotShmemInit() +{ + bool found; + + if (track_global_snapshots || global_snapshot_defer_time > 0) + { + gsState = ShmemInitStruct("gsState", + sizeof(GlobalSnapshotState), + &found); + if (!found) + { + gsState->last_global_csn = 0; + SpinLockInit(&gsState->lock); + } + } + + if (global_snapshot_defer_time > 0) + { + gsXidMap = ShmemInitStruct("gsXidMap", + sizeof(GlobalSnapshotXidMap), + &found); + if (!found) + { + int i; + + 
pg_atomic_init_u64(&gsXidMap->last_csn_seconds, 0); + gsXidMap->head = 0; + gsXidMap->size = global_snapshot_defer_time; + gsXidMap->xmin_by_second = + ShmemAlloc(sizeof(TransactionId)*gsXidMap->size); + + for (i = 0; i < gsXidMap->size; i++) + gsXidMap->xmin_by_second[i] = InvalidTransactionId; + } + } +} + +/* + * GlobalSnapshotStartup + * + * Set gsXidMap entries to oldestActiveXID during startup. + */ +void +GlobalSnapshotStartup(TransactionId oldestActiveXID) +{ + /* + * Run only if we have initialized shared memory and gsXidMap + * is enabled. + */ + if (IsNormalProcessingMode() && global_snapshot_defer_time > 0) + { + int i; + + Assert(TransactionIdIsValid(oldestActiveXID)); + for (i = 0; i < gsXidMap->size; i++) + gsXidMap->xmin_by_second[i] = oldestActiveXID; + ProcArraySetGlobalSnapshotXmin(oldestActiveXID); + } +} + +/* + * GlobalSnapshotMapXmin + * + * Maintain circular buffer of oldestXmins for several seconds in past. This + * buffer allows to shift oldestXmin in the past when backend is importing + * global transaction. Otherwise old versions of tuples that were needed for + * this transaction can be recycled by other processes (vacuum, HOT, etc). + * + * Called upon each snapshot creation after ProcArrayLock is released. Such + * usage creates a race condition. It is possible that backend who got + * global_csn called GlobalSnapshotMapXmin() only after other backends managed + * to get snapshot and complete GlobalSnapshotMapXmin() call. To address that + * race we do two things: + * + * * snapshot_global_csn is always rounded up to next second. So that is + * okay if call to GlobalSnapshotMapXmin() with later global_csn will + * succeed first -- it anyway will be taken into account for a next + * second. + * + * * if GlobalSnapshotMapXmin() finds a gap of several seconds between + * current call and latest completed call then it should fill that gap + * with latest known values instead of new one. 
Otherwise it is possible + * (however highly unlikely) that this gap also happened between taking + * snapshot and call to GlobalSnapshotMapXmin() for some backend. And we + * are at risk to fill circular buffer with oldestXmin's that are + * bigger than they actually were. + */ +void +GlobalSnapshotMapXmin(GlobalCSN snapshot_global_csn) +{ + int offset, gap, i; + GlobalCSN csn_seconds; + GlobalCSN last_csn_seconds; + volatile TransactionId oldest_deferred_xmin; + TransactionId current_oldest_xmin, previous_oldest_xmin; + + /* Callers should check config values */ + Assert(global_snapshot_defer_time > 0); + Assert(gsXidMap != NULL); + + /* + * We don't have guarantee that process who called us first for this + * csn_seconds is actually one who took snapshot first in this second. + * So just round up global_csn to the next second -- snapshots for next + * second would have oldestXmin greater than or equal to ours anyway. + */ + csn_seconds = (snapshot_global_csn / NSECS_PER_SEC + 1); + + /* + * Fast-path check. Avoid taking exclusive GlobalSnapshotXidMapLock lock + * if oldestXid was already written to xmin_by_second[] for this rounded + * global_csn. + */ + if (pg_atomic_read_u64(&gsXidMap->last_csn_seconds) >= csn_seconds) + return; + + /* Ok, we have new entry (or entries) */ + LWLockAcquire(GlobalSnapshotXidMapLock, LW_EXCLUSIVE); + + /* Re-check last_csn_seconds under lock */ + last_csn_seconds = pg_atomic_read_u64(&gsXidMap->last_csn_seconds); + if (last_csn_seconds >= csn_seconds) + { + LWLockRelease(GlobalSnapshotXidMapLock); + return; + } + pg_atomic_write_u64(&gsXidMap->last_csn_seconds, csn_seconds); + + /* + * Count oldest_xmin. + * + * It was possible to calculate oldest_xmin during corresponding snapshot + * creation, but GetSnapshotData() intentionally reads only PgXact, but not + * PgProc. 
And we need info about originalXmin (see comment to gsXidMap) + * which is stored in PgProc because of threats in comments around PgXact + * about extending it with new fields. So just calculate oldest_xmin again, + * that anyway happens quite rarely. + */ + current_oldest_xmin = GetOldestXmin(NULL, PROCARRAY_NON_IMPORTED_XMIN); + + previous_oldest_xmin = gsXidMap->xmin_by_second[gsXidMap->head]; + + Assert(TransactionIdIsNormal(current_oldest_xmin)); + Assert(TransactionIdIsNormal(previous_oldest_xmin)); + Assert(TransactionIdFollowsOrEquals(current_oldest_xmin, previous_oldest_xmin)); + + gap = csn_seconds - last_csn_seconds; + offset = csn_seconds % gsXidMap->size; + + /* Sanity check before we update head and gap. NOTE(review): gap is an int holding a 64-bit difference; on the first call last_csn_seconds is 0, so gap equals csn_seconds -- confirm this cannot overflow. */ + Assert( gap >= 1 ); + Assert( (gsXidMap->head + gap) % gsXidMap->size == offset ); + + gap = gap > gsXidMap->size ? gsXidMap->size : gap; + gsXidMap->head = offset; + + /* Fill new entry with current_oldest_xmin */ + gsXidMap->xmin_by_second[offset] = current_oldest_xmin; + offset = (offset + gsXidMap->size - 1) % gsXidMap->size; + + /* + * If we have gap then fill it with previous_oldest_xmin for reasons + * outlined in comment above this function. + */ + for (i = 1; i < gap; i++) + { + gsXidMap->xmin_by_second[offset] = previous_oldest_xmin; + offset = (offset + gsXidMap->size - 1) % gsXidMap->size; + } + + oldest_deferred_xmin = + gsXidMap->xmin_by_second[ (gsXidMap->head + 1) % gsXidMap->size ]; + + LWLockRelease(GlobalSnapshotXidMapLock); + + /* + * Advance procArray->global_snapshot_xmin after we released + * GlobalSnapshotXidMapLock. + */ + ProcArraySetGlobalSnapshotXmin(oldest_deferred_xmin); +} + + +/* + * GlobalSnapshotToXmin + * + * Get oldestXmin that took place when snapshot_global_csn was taken. 
+ */ +TransactionId +GlobalSnapshotToXmin(GlobalCSN snapshot_global_csn) +{ + TransactionId xmin; + GlobalCSN csn_seconds; + volatile GlobalCSN last_csn_seconds; + + /* Callers should check config values */ + Assert(global_snapshot_defer_time > 0); + Assert(gsXidMap != NULL); + + /* Round down to get conservative estimates */ + csn_seconds = (snapshot_global_csn / NSECS_PER_SEC); + + LWLockAcquire(GlobalSnapshotXidMapLock, LW_SHARED); + last_csn_seconds = pg_atomic_read_u64(&gsXidMap->last_csn_seconds); + if (csn_seconds > last_csn_seconds) + { + /* we don't have entry for this global_csn yet, return latest known */ + xmin = gsXidMap->xmin_by_second[gsXidMap->head]; + } + else if (last_csn_seconds - csn_seconds < gsXidMap->size) + { + /* we are good, retrieve value from our map */ + Assert(last_csn_seconds % gsXidMap->size == gsXidMap->head); + xmin = gsXidMap->xmin_by_second[csn_seconds % gsXidMap->size]; + } + else + { + /* requested global_csn is too old, let caller know by returning InvalidTransactionId */ + xmin = InvalidTransactionId; + } + LWLockRelease(GlobalSnapshotXidMapLock); + + return xmin; +} + +/* + * GlobalSnapshotGenerate + * + * Generate GlobalCSN which is actually a local time. Also we are forcing + * this time to be always increasing. Since now it is not uncommon to have + * millions of read transactions per second we are trying to use nanoseconds + * if such time resolution is available. Pass locked = true when the caller already holds gsState->lock. + */ +GlobalCSN +GlobalSnapshotGenerate(bool locked) +{ + instr_time current_time; + GlobalCSN global_csn; + + Assert(track_global_snapshots || global_snapshot_defer_time > 0); + + /* + * TODO: create some macro that adds a small random shift to current time. + */ + INSTR_TIME_SET_CURRENT(current_time); + global_csn = (GlobalCSN) INSTR_TIME_GET_NANOSEC(current_time); + + /* TODO: change to atomics? 
*/ + if (!locked) + SpinLockAcquire(&gsState->lock); + + if (global_csn <= gsState->last_global_csn) + global_csn = ++gsState->last_global_csn; + else + gsState->last_global_csn = global_csn; + + if (!locked) + SpinLockRelease(&gsState->lock); + + return global_csn; +} + +/* + * GlobalSnapshotSync + * + * Due to time desynchronization on different nodes we can receive global_csn + * which is greater than global_csn on this node. To preserve proper isolation + * this node needs to wait when such global_csn comes on local clock. + * + * This should happen relatively rarely if nodes have running NTP/PTP/etc. + * Complain if wait time is more than SNAP_DESYNC_COMPLAIN. + */ +void +GlobalSnapshotSync(GlobalCSN remote_gcsn) +{ + GlobalCSN local_gcsn; + GlobalCSN delta; + + Assert(track_global_snapshots); + + for(;;) + { + SpinLockAcquire(&gsState->lock); + if (gsState->last_global_csn > remote_gcsn) + { + /* Everything is fine */ + SpinLockRelease(&gsState->lock); + return; + } + else if ((local_gcsn = GlobalSnapshotGenerate(true)) >= remote_gcsn) + { + /* + * Everything is fine too, but last_global_csn wasn't updated for + * some time. + */ + SpinLockRelease(&gsState->lock); + return; + } + SpinLockRelease(&gsState->lock); + + /* Okay we need to sleep now */ + delta = remote_gcsn - local_gcsn; + if (delta > SNAP_DESYNC_COMPLAIN) + ereport(WARNING, + (errmsg("remote global snapshot exceeds ours by more than a second"), + errhint("Consider running NTPd on servers participating in global transaction"))); + + /* TODO: report this sleeptime somewhere? */ + pg_usleep((long) (delta/NSECS_PER_USEC)); + + /* + * Loop that checks to ensure that we actually slept for specified + * amount of time. + */ + } + + Assert(false); /* Should not happen */ + return; +} + +/* + * TransactionIdGetGlobalCSN + * + * Get GlobalCSN for specified TransactionId taking care about special xids, + * xids beyond TransactionXmin and InDoubt states. 
+ */ +GlobalCSN +TransactionIdGetGlobalCSN(TransactionId xid) +{ + GlobalCSN global_csn; + + Assert(track_global_snapshots); + + /* Handle permanent TransactionId's for which we don't have mapping */ + if (!TransactionIdIsNormal(xid)) + { + if (xid == InvalidTransactionId) + return AbortedGlobalCSN; + if (xid == FrozenTransactionId || xid == BootstrapTransactionId) + return FrozenGlobalCSN; + Assert(false); /* Should not happen */ + } + + /* + * For xids which are less than TransactionXmin GlobalCSNLog can be already + * trimmed but we know that such transaction is definitely not concurrently + * running according to any snapshot including timetravel ones. Callers + * should check TransactionDidCommit after. + */ + if (TransactionIdPrecedes(xid, TransactionXmin)) + return FrozenGlobalCSN; + + /* Read GlobalCSN from SLRU */ + global_csn = GlobalCSNLogGetCSN(xid); + + /* + * If we faced InDoubt state then transaction is being committed and we + * should wait until GlobalCSN will be assigned so that visibility check + * could decide whether tuple is in snapshot. See also comments in + * GlobalSnapshotPrecommit(). + */ + if (GlobalCSNIsInDoubt(global_csn)) + { + XactLockTableWait(xid, NULL, NULL, XLTW_None); + global_csn = GlobalCSNLogGetCSN(xid); + Assert(GlobalCSNIsNormal(global_csn) || + GlobalCSNIsAborted(global_csn)); + } + + Assert(GlobalCSNIsNormal(global_csn) || + GlobalCSNIsInProgress(global_csn) || + GlobalCSNIsAborted(global_csn)); + + return global_csn; +} + +/* + * XidInvisibleInGlobalSnapshot + * + * Version of XidInMVCCSnapshot for global transactions. For non-imported + * global snapshots this should give same results as XidInLocalMVCCSnapshot + * (except that aborts will be shown as invisible without going to clog) and to + * ensure such behaviour XidInMVCCSnapshot is coated with asserts that checks + * identicalness of XidInvisibleInGlobalSnapshot/XidInLocalMVCCSnapshot in + * case of ordinary snapshot. 
+ */ +bool +XidInvisibleInGlobalSnapshot(TransactionId xid, Snapshot snapshot) +{ + GlobalCSN csn; + + Assert(track_global_snapshots); + + csn = TransactionIdGetGlobalCSN(xid); + + if (GlobalCSNIsNormal(csn)) + { + if (csn < snapshot->global_csn) + return false; + else + return true; + } + else if (GlobalCSNIsFrozen(csn)) + { + /* It is bootstrap or frozen transaction */ + return false; + } + else + { + /* It is aborted or in-progress */ + Assert(GlobalCSNIsAborted(csn) || GlobalCSNIsInProgress(csn)); + if (GlobalCSNIsAborted(csn)) + Assert(TransactionIdDidAbort(xid)); + return true; + } +} + + +/***************************************************************************** + * Functions to handle distributed commit on transaction coordinator: + * GlobalSnapshotPrepareCurrent() / GlobalSnapshotAssignCsnCurrent(). + * Corresponding functions for remote nodes are defined in twophase.c: + * pg_global_snaphot_prepare/pg_global_snaphot_assign. + *****************************************************************************/ + + +/* + * GlobalSnapshotPrepareCurrent + * + * Set InDoubt state for currently active transaction and return commit's + * global snapshot. + */ +GlobalCSN +GlobalSnapshotPrepareCurrent() +{ + TransactionId xid = GetCurrentTransactionIdIfAny(); + + if (!track_global_snapshots) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not prepare transaction for global commit"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "track_global_snapshots"))); + + if (TransactionIdIsValid(xid)) + { + TransactionId *subxids; + int nsubxids = xactGetCommittedChildren(&subxids); + GlobalCSNLogSetCSN(xid, nsubxids, + subxids, InDoubtGlobalCSN); + } + + /* Nothing to write if we don't have xid */ + + return GlobalSnapshotGenerate(false); +} + +/* + * GlobalSnapshotAssignCsnCurrent + * + * Asign GlobalCSN for currently active transaction. 
GlobalCSN is supposedly + * maximal among of values returned by GlobalSnapshotPrepareCurrent and + * pg_global_snaphot_prepare. + */ +void +GlobalSnapshotAssignCsnCurrent(GlobalCSN global_csn) +{ + if (!track_global_snapshots) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not prepare transaction for global commit"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "track_global_snapshots"))); + + if (!GlobalCSNIsNormal(global_csn)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("pg_global_snaphot_assign expects normal global_csn"))); + + /* Skip empty transactions */ + if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny())) + return; + + /* Set global_csn and defuse ProcArrayEndTransaction from assigning one */ + pg_atomic_write_u64(&MyProc->assignedGlobalCsn, global_csn); +} + + +/***************************************************************************** + * Functions to handle global and local transactions commit. + * + * For local transactions GlobalSnapshotPrecommit sets InDoubt state before + * ProcArrayEndTransaction is called and transaction data potentially becomes + * visible to other backends. ProcArrayEndTransaction (or ProcArrayRemove in + * twophase case) then acquires global_csn under ProcArray lock and stores it + * in proc->assignedGlobalCsn. It's important that global_csn for commit is + * generated under ProcArray lock, otherwise global and local snapshots won't + * be equivalent. Subsequent call to GlobalSnapshotCommit will write + * proc->assignedGlobalCsn to GlobalCSNLog. + * + * The same rules apply to global transaction, except that global_csn is already + * assigned by GlobalSnapshotAssignCsnCurrent/pg_global_snaphot_assign and + * GlobalSnapshotPrecommit is basically no-op. + * + * GlobalSnapshotAbort is slightly different compared to commit because abort + * can skip InDoubt phase and can be called for transaction subtree. 
+ *****************************************************************************/ + + +/* + * GlobalSnapshotAbort + * + * Abort transaction in GlobalCsnLog. We can skip InDoubt state for aborts + * since no concurrent transactions are allowed to see aborted data anyway. + */ +void +GlobalSnapshotAbort(PGPROC *proc, TransactionId xid, + int nsubxids, TransactionId *subxids) +{ + if (!track_global_snapshots) + return; + + GlobalCSNLogSetCSN(xid, nsubxids, subxids, AbortedGlobalCSN); + + /* + * Clean assignedGlobalCsn anyway, as it was possibly set in + * GlobalSnapshotAssignCsnCurrent. + */ + pg_atomic_write_u64(&proc->assignedGlobalCsn, InProgressGlobalCSN); +} + +/* + * GlobalSnapshotPrecommit + * + * Set InDoubt status for local transaction that we are going to commit. + * This step is needed to achieve consistency between local snapshots and + * global csn-based snapshots. We don't hold ProcArray lock while writing + * csn for transaction in SLRU but instead we set InDoubt status before + * transaction is deleted from ProcArray so the readers who will read csn + * in the gap between ProcArray removal and GlobalCSN assignment can wait + * until GlobalCSN is finally assigned. See also TransactionIdGetGlobalCSN(). + * + * For global transaction this does nothing as InDoubt state was written + * earlier. + * + * This should be called only from parallel group leader before backend is + * deleted from ProcArray. 
+ */ +void +GlobalSnapshotPrecommit(PGPROC *proc, TransactionId xid, + int nsubxids, TransactionId *subxids) +{ + GlobalCSN oldAssignedGlobalCsn = InProgressGlobalCSN; + bool in_progress; + + if (!track_global_snapshots) + return; + + /* Set InDoubt status if it is a local transaction, i.e. the compare-exchange against InProgressGlobalCSN succeeds */ + in_progress = pg_atomic_compare_exchange_u64(&proc->assignedGlobalCsn, + &oldAssignedGlobalCsn, + InDoubtGlobalCSN); + if (in_progress) + { + Assert(GlobalCSNIsInProgress(oldAssignedGlobalCsn)); + GlobalCSNLogSetCSN(xid, nsubxids, + subxids, InDoubtGlobalCSN); + } + else + { + /* Otherwise (global transaction) we should have valid GlobalCSN by this time */ + Assert(GlobalCSNIsNormal(oldAssignedGlobalCsn)); + /* Also global transaction should already be in InDoubt state */ + Assert(GlobalCSNIsInDoubt(GlobalCSNLogGetCSN(xid))); + } +} + +/* + * GlobalSnapshotCommit + * + * Write GlobalCSN that were acquired earlier to GlobalCsnLog. Should be + * preceded by GlobalSnapshotPrecommit() so readers can wait until we finally + * finished writing to SLRU. + * + * Should be called after ProcArrayEndTransaction, but before releasing + * transaction locks, so that TransactionIdGetGlobalCSN can wait on this + * lock for GlobalCSN. 
+ */ +void +GlobalSnapshotCommit(PGPROC *proc, TransactionId xid, + int nsubxids, TransactionId *subxids) +{ + volatile GlobalCSN assigned_global_csn; + + if (!track_global_snapshots) + return; + + if (!TransactionIdIsValid(xid)) + { + assigned_global_csn = pg_atomic_read_u64(&proc->assignedGlobalCsn); + Assert(GlobalCSNIsInProgress(assigned_global_csn)); + return; + } + + /* Finally write resulting GlobalCSN for xid and its subxids in SLRU */ + assigned_global_csn = pg_atomic_read_u64(&proc->assignedGlobalCsn); + Assert(GlobalCSNIsNormal(assigned_global_csn)); + GlobalCSNLogSetCSN(xid, nsubxids, + subxids, assigned_global_csn); + + /* Reset for next transaction */ + pg_atomic_write_u64(&proc->assignedGlobalCsn, InProgressGlobalCSN); +} diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index a144e2d6a9..f3fa3283f8 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -77,6 +77,7 @@ #include #include "access/commit_ts.h" +#include "access/global_snapshot.h" #include "access/global_csn_log.h" #include "access/htup_details.h" #include "access/subtrans.h" @@ -1508,8 +1509,34 @@ FinishPreparedTransaction(const char *gid, bool isCommit) hdr->nabortrels, abortrels, gid); + /* + * GlobalSnapshot callbacks that should be called right before we are + * going to become visible. Details in comments to these functions. + */ + if (isCommit) + GlobalSnapshotPrecommit(proc, xid, hdr->nsubxacts, children); + else + GlobalSnapshotAbort(proc, xid, hdr->nsubxacts, children); + + ProcArrayRemove(proc, latestXid); + /* + * Stamp our transaction with GlobalCSN in GlobalCsnLog. + * Should be called after ProcArrayEndTransaction, but before releasing + * transaction locks, since TransactionIdGetGlobalCSN relies on + * XactLockTableWait to await global_csn. 
+ */ + if (isCommit) + { + GlobalSnapshotCommit(proc, xid, hdr->nsubxacts, children); + } + else + { + GlobalCSN global_csn = pg_atomic_read_u64(&proc->assignedGlobalCsn); + Assert(GlobalCSNIsInProgress(global_csn)); + } + /* * In case we fail while running the callbacks, mark the gxact invalid so * no one else will try to commit/rollback, and so it will be recycled if @@ -2486,3 +2513,132 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) return; } + +/* + * GlobalSnapshotPrepareTwophase + * + * Set InDoubt state for the prepared transaction identified by gid and return commit's + * global snapshot. + * + * This function is a counterpart of GlobalSnapshotPrepareCurrent() for + * twophase transactions. + */ +static GlobalCSN +GlobalSnapshotPrepareTwophase(const char *gid) +{ + GlobalTransaction gxact; + PGXACT *pgxact; + char *buf; + TransactionId xid; + xl_xact_parsed_prepare parsed; + + if (!track_global_snapshots) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not prepare transaction for global commit"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "track_global_snapshots"))); + + /* + * Validate the GID, and lock the GXACT to ensure that two backends do not + * try to access the same GID at once. 
+ */ + gxact = LockGXact(gid, GetUserId()); + pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; + xid = pgxact->xid; + + if (gxact->ondisk) + buf = ReadTwoPhaseFile(xid, true); + else + XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL); + + ParsePrepareRecord(0, buf, &parsed); + + GlobalCSNLogSetCSN(xid, parsed.nsubxacts, + parsed.subxacts, InDoubtGlobalCSN); + + /* Unlock our GXACT */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + gxact->locking_backend = InvalidBackendId; + LWLockRelease(TwoPhaseStateLock); + + pfree(buf); + + return GlobalSnapshotGenerate(false); +} + +/* + * SQL interface to GlobalSnapshotPrepareTwophase() + * + * TODO: Rewrite this as PREPARE TRANSACTION 'gid' RETURNING SNAPSHOT + */ +Datum +pg_global_snaphot_prepare(PG_FUNCTION_ARGS) +{ + const char *gid = text_to_cstring(PG_GETARG_TEXT_PP(0)); + GlobalCSN global_csn; + + global_csn = GlobalSnapshotPrepareTwophase(gid); + + PG_RETURN_INT64(global_csn); +} + + +/* + * GlobalSnapshotAssignCsnTwoPhase + * + * Assign GlobalCSN for the prepared transaction identified by gid. GlobalCSN is supposedly + * the maximum among the values returned by GlobalSnapshotPrepareCurrent and + * pg_global_snaphot_prepare. + * + * This function is a counterpart of GlobalSnapshotAssignCsnCurrent() for + * twophase transactions. + */ +static void +GlobalSnapshotAssignCsnTwoPhase(const char *gid, GlobalCSN global_csn) +{ + GlobalTransaction gxact; + PGPROC *proc; + + if (!track_global_snapshots) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not prepare transaction for global commit"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "track_global_snapshots"))); + + if (!GlobalCSNIsNormal(global_csn)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("pg_global_snaphot_assign expects normal global_csn"))); + + /* + * Validate the GID, and lock the GXACT to ensure that two backends do not + * try to access the same GID at once. 
+ */ + gxact = LockGXact(gid, GetUserId()); + proc = &ProcGlobal->allProcs[gxact->pgprocno]; + + /* Set global_csn and defuse ProcArrayRemove from assigning one. */ + pg_atomic_write_u64(&proc->assignedGlobalCsn, global_csn); + + /* Unlock our GXACT */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + gxact->locking_backend = InvalidBackendId; + LWLockRelease(TwoPhaseStateLock); +} + +/* + * SQL interface to GlobalSnapshotAssignCsnTwoPhase() + * + * TODO: Rewrite this as COMMIT PREPARED 'gid' SNAPSHOT 'global_csn' + */ +Datum +pg_global_snaphot_assign(PG_FUNCTION_ARGS) +{ + const char *gid = text_to_cstring(PG_GETARG_TEXT_PP(0)); + GlobalCSN global_csn = PG_GETARG_INT64(1); + + GlobalSnapshotAssignCsnTwoPhase(gid, global_csn); + PG_RETURN_VOID(); +} diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index f4e5ea84b9..44d57e2031 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -21,6 +21,7 @@ #include #include "access/commit_ts.h" +#include "access/global_snapshot.h" #include "access/multixact.h" #include "access/parallel.h" #include "access/subtrans.h" @@ -1341,6 +1342,14 @@ RecordTransactionCommit(void) /* Reset XactLastRecEnd until the next transaction writes something */ XactLastRecEnd = 0; + + /* + * Mark our transaction as InDoubt in GlobalCsnLog and get ready for + * commit. + */ + if (markXidCommitted) + GlobalSnapshotPrecommit(MyProc, xid, nchildren, children); + cleanup: /* Clean up local data */ if (rels) @@ -1602,6 +1611,11 @@ RecordTransactionAbort(bool isSubXact) */ TransactionIdAbortTree(xid, nchildren, children); + /* + * Mark our transaction as Aborted in GlobalCsnLog. + */ + GlobalSnapshotAbort(MyProc, xid, nchildren, children); + END_CRIT_SECTION(); /* Compute latestXid while we have the child XIDs handy */ @@ -2059,6 +2073,21 @@ CommitTransaction(void) */ ProcArrayEndTransaction(MyProc, latestXid); + /* + * Stamp our transaction with GlobalCSN in GlobalCsnLog. 
+ * Should be called after ProcArrayEndTransaction, but before releasing + * transaction locks. + */ + if (!is_parallel_worker) + { + TransactionId xid = GetTopTransactionIdIfAny(); + TransactionId *subxids; + int nsubxids; + + nsubxids = xactGetCommittedChildren(&subxids); + GlobalSnapshotCommit(MyProc, xid, nsubxids, subxids); + } + /* * This is all post-commit cleanup. Note that if an error is raised here, * it's too late to abort the transaction. This should be just diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 749b0c1ce2..71151fc3df 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6967,6 +6967,7 @@ StartupXLOG(void) StartupCLOG(); StartupGlobalCSNLog(oldestActiveXID); StartupSUBTRANS(oldestActiveXID); + GlobalSnapshotStartup(oldestActiveXID); /* * If we're beginning at a shutdown checkpoint, we know that @@ -7753,6 +7754,7 @@ StartupXLOG(void) StartupCLOG(); StartupGlobalCSNLog(oldestActiveXID); StartupSUBTRANS(oldestActiveXID); + GlobalSnapshotStartup(oldestActiveXID); } /* diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 2af468fc6a..3a04f6a824 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -22,6 +22,7 @@ #include "access/nbtree.h" #include "access/subtrans.h" #include "access/twophase.h" +#include "access/global_snapshot.h" #include "commands/async.h" #include "miscadmin.h" #include "pgstat.h" @@ -147,6 +148,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, WalSndShmemSize()); size = add_size(size, WalRcvShmemSize()); size = add_size(size, ApplyLauncherShmemSize()); + size = add_size(size, GlobalSnapshotShmemSize()); size = add_size(size, SnapMgrShmemSize()); size = add_size(size, BTreeShmemSize()); size = add_size(size, SyncScanShmemSize()); @@ -273,6 +275,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) SyncScanShmemInit(); AsyncShmemInit(); 
BackendRandomShmemInit(); + GlobalSnapshotShmemInit(); #ifdef EXEC_BACKEND diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index bc59fca21c..c6cb63fe97 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -47,6 +47,7 @@ #include "access/clog.h" #include "access/global_csn_log.h" +#include "access/global_snapshot.h" #include "access/subtrans.h" #include "access/transam.h" #include "access/twophase.h" @@ -91,6 +92,8 @@ typedef struct ProcArrayStruct TransactionId replication_slot_xmin; /* oldest catalog xmin of any replication slot */ TransactionId replication_slot_catalog_xmin; + /* xmin of oldest active global snapshot */ + TransactionId global_snapshot_xmin; /* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */ int pgprocnos[FLEXIBLE_ARRAY_MEMBER]; @@ -246,6 +249,7 @@ CreateSharedProcArray(void) procArray->lastOverflowedXid = InvalidTransactionId; procArray->replication_slot_xmin = InvalidTransactionId; procArray->replication_slot_catalog_xmin = InvalidTransactionId; + procArray->global_snapshot_xmin = InvalidTransactionId; } allProcs = ProcGlobal->allProcs; @@ -352,6 +356,17 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid) if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, latestXid)) ShmemVariableCache->latestCompletedXid = latestXid; + + /* + * Assign global csn while holding ProcArrayLock for non-global + * COMMIT PREPARED. After lock is released consequent + * GlobalSnapshotCommit() will write this value to GlobalCsnLog. + * + * In case of global commit proc->assignedGlobalCsn is already set + * by prior AssignGlobalCsn(). 
+ */ + if (GlobalCSNIsInDoubt(pg_atomic_read_u64(&proc->assignedGlobalCsn))) + pg_atomic_write_u64(&proc->assignedGlobalCsn, GlobalSnapshotGenerate(false)); } else { @@ -432,6 +447,8 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid) proc->lxid = InvalidLocalTransactionId; pgxact->xmin = InvalidTransactionId; + proc->originalXmin = InvalidTransactionId; + /* must be cleared with xid/xmin: */ pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK; pgxact->delayChkpt = false; /* be sure this is cleared in abort */ @@ -454,6 +471,8 @@ ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact, pgxact->xid = InvalidTransactionId; proc->lxid = InvalidLocalTransactionId; pgxact->xmin = InvalidTransactionId; + proc->originalXmin = InvalidTransactionId; + /* must be cleared with xid/xmin: */ pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK; pgxact->delayChkpt = false; /* be sure this is cleared in abort */ @@ -467,6 +486,20 @@ ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact, if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, latestXid)) ShmemVariableCache->latestCompletedXid = latestXid; + + /* + * Assign global csn while holding ProcArrayLock for non-global + * COMMIT. After the lock is released, the subsequent GlobalSnapshotCommit() + * will write this value to GlobalCsnLog. + * + * In case of global commit MyProc->assignedGlobalCsn is already set + * by prior AssignGlobalCsn(). + * + * TODO: in case of group commit we can generate one GlobalSnapshot for the + * whole group to save time on timestamp acquisition.
+ */ + if (GlobalCSNIsInDoubt(pg_atomic_read_u64(&proc->assignedGlobalCsn))) + pg_atomic_write_u64(&proc->assignedGlobalCsn, GlobalSnapshotGenerate(false)); } /* @@ -616,6 +649,7 @@ ProcArrayClearTransaction(PGPROC *proc) pgxact->xid = InvalidTransactionId; proc->lxid = InvalidLocalTransactionId; pgxact->xmin = InvalidTransactionId; + proc->originalXmin = InvalidTransactionId; proc->recoveryConflictPending = false; /* redundant, but just in case */ @@ -1323,6 +1357,7 @@ GetOldestXmin(Relation rel, int flags) volatile TransactionId replication_slot_xmin = InvalidTransactionId; volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId; + volatile TransactionId global_snapshot_xmin = InvalidTransactionId; /* * If we're not computing a relation specific limit, or if a shared @@ -1359,8 +1394,9 @@ GetOldestXmin(Relation rel, int flags) proc->databaseId == MyDatabaseId || proc->databaseId == 0) /* always include WalSender */ { - /* Fetch xid just once - see GetNewTransactionId */ + /* Fetch both xids just once - see GetNewTransactionId */ TransactionId xid = pgxact->xid; + TransactionId original_xmin = proc->originalXmin; /* First consider the transaction's own Xid, if any */ if (TransactionIdIsNormal(xid) && @@ -1373,8 +1409,17 @@ GetOldestXmin(Relation rel, int flags) * We must check both Xid and Xmin because a transaction might * have an Xmin but not (yet) an Xid; conversely, if it has an * Xid, that could determine some not-yet-set Xmin. + * + * When oldestXmin is calculated for GlobalSnapshotMapXmin(), a valid + * proc->originalXmin is used in place of pgxact->xmin. Details are in the + * comments to GlobalSnapshotMapXmin.
*/ - xid = pgxact->xmin; /* Fetch just once */ + if ((flags & PROCARRAY_NON_IMPORTED_XMIN) && + TransactionIdIsValid(original_xmin)) + xid = original_xmin; + else + xid = pgxact->xmin; /* Fetch just once */ + if (TransactionIdIsNormal(xid) && TransactionIdPrecedes(xid, result)) result = xid; @@ -1384,6 +1429,7 @@ GetOldestXmin(Relation rel, int flags) /* fetch into volatile var while ProcArrayLock is held */ replication_slot_xmin = procArray->replication_slot_xmin; replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin; + global_snapshot_xmin = procArray->global_snapshot_xmin; if (RecoveryInProgress()) { @@ -1425,6 +1471,11 @@ GetOldestXmin(Relation rel, int flags) result = FirstNormalTransactionId; } + if (!(flags & PROCARRAY_NON_IMPORTED_XMIN) && + TransactionIdIsValid(global_snapshot_xmin) && + NormalTransactionIdPrecedes(global_snapshot_xmin, result)) + result = global_snapshot_xmin; + /* * Check whether there are replication slots requiring an older xmin. */ @@ -1520,6 +1571,8 @@ GetSnapshotData(Snapshot snapshot) bool suboverflowed = false; volatile TransactionId replication_slot_xmin = InvalidTransactionId; volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId; + volatile TransactionId global_snapshot_xmin = InvalidTransactionId; + volatile GlobalCSN global_csn = FrozenGlobalCSN; Assert(snapshot != NULL); @@ -1708,10 +1761,18 @@ GetSnapshotData(Snapshot snapshot) /* fetch into volatile var while ProcArrayLock is held */ replication_slot_xmin = procArray->replication_slot_xmin; replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin; + global_snapshot_xmin = procArray->global_snapshot_xmin; if (!TransactionIdIsValid(MyPgXact->xmin)) MyPgXact->xmin = TransactionXmin = xmin; + /* + * Take GlobalCSN under ProcArrayLock so the local/global snapshot stays + * synchronized. 
+ */ + if (track_global_snapshots || global_snapshot_defer_time > 0) + global_csn = GlobalSnapshotGenerate(false); + LWLockRelease(ProcArrayLock); /* @@ -1727,6 +1788,10 @@ GetSnapshotData(Snapshot snapshot) if (!TransactionIdIsNormal(RecentGlobalXmin)) RecentGlobalXmin = FirstNormalTransactionId; + if (TransactionIdIsValid(global_snapshot_xmin) && + TransactionIdPrecedes(global_snapshot_xmin, RecentGlobalXmin)) + RecentGlobalXmin = global_snapshot_xmin; + /* Check whether there's a replication slot requiring an older xmin. */ if (TransactionIdIsValid(replication_slot_xmin) && NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin)) @@ -1782,6 +1847,11 @@ GetSnapshotData(Snapshot snapshot) MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin); } + snapshot->imported_global_csn = false; + snapshot->global_csn = global_csn; + if (global_snapshot_defer_time > 0) + GlobalSnapshotMapXmin(snapshot->global_csn); + return snapshot; } @@ -2999,6 +3069,35 @@ ProcArrayGetReplicationSlotXmin(TransactionId *xmin, LWLockRelease(ProcArrayLock); } +/* + * ProcArraySetGlobalSnapshotXmin + */ +void +ProcArraySetGlobalSnapshotXmin(TransactionId xmin) +{ + LWLockAcquire(ProcArrayLock, LW_SHARED); + + if (TransactionIdFollows(xmin, procArray->global_snapshot_xmin)) + procArray->global_snapshot_xmin = xmin; + + LWLockRelease(ProcArrayLock); +} + +/* + * ProcArrayGetGlobalSnapshotXmin + */ +TransactionId +ProcArrayGetGlobalSnapshotXmin(bool locked) +{ + volatile TransactionId xmin; + + if (!locked) + LWLockAcquire(ProcArrayLock, LW_SHARED); + xmin = procArray->global_snapshot_xmin; + if (!locked) + LWLockRelease(ProcArrayLock); + return xmin; +} #define XidCacheRemove(i) \ do { \ diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index 9615058f29..8ef731d560 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -51,3 +51,4 @@ BackendRandomLock 43 LogicalRepWorkerLock 44 
CLogTruncationLock 45 GlobalCSNLogControlLock 46 +GlobalSnapshotXidMapLock 47 diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 6f30e082b2..ed497185be 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -37,6 +37,7 @@ #include "access/transam.h" #include "access/twophase.h" +#include "access/global_snapshot.h" #include "access/xact.h" #include "miscadmin.h" #include "pgstat.h" @@ -417,6 +418,9 @@ InitProcess(void) MyProc->clogGroupMemberLsn = InvalidXLogRecPtr; pg_atomic_init_u32(&MyProc->clogGroupNext, INVALID_PGPROCNO); + MyProc->originalXmin = InvalidTransactionId; + pg_atomic_init_u64(&MyProc->assignedGlobalCsn, InProgressGlobalCSN); + /* * Acquire ownership of the PGPROC's latch, so that we can use WaitLatch * on it. That allows us to repoint the process latch, which so far @@ -559,6 +563,7 @@ InitAuxiliaryProcess(void) MyProc->lwWaitMode = 0; MyProc->waitLock = NULL; MyProc->waitProcLock = NULL; + MyProc->originalXmin = InvalidTransactionId; #ifdef USE_ASSERT_CHECKING { int i; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 2699a7e6b4..2abc4be86c 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -28,6 +28,7 @@ #include "access/commit_ts.h" #include "access/gin.h" +#include "access/global_snapshot.h" #include "access/rmgr.h" #include "access/transam.h" #include "access/twophase.h" @@ -2332,6 +2333,16 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"global_snapshot_defer_time", PGC_POSTMASTER, REPLICATION_MASTER, + gettext_noop("Minimal age of records which allowed to be vacuumed, in seconds."), + NULL + }, + &global_snapshot_defer_time, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + /* * See also CheckRequiredParameterValues() if this parameter changes */ diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 3d88e80a20..206c1806e0 100644 --- 
a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -255,6 +255,8 @@ # and comma-separated list of application_name # from standby(s); '*' = all #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed +#global_snapshot_defer_time = 0 # minimal age of records which allowed to be + # vacuumed, in seconds # - Standby Servers - diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 4b45d3cccd..135a5a6881 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -48,6 +48,7 @@ #include #include +#include "access/global_snapshot.h" #include "access/transam.h" #include "access/xact.h" #include "access/xlog.h" @@ -245,6 +246,8 @@ typedef struct SerializedSnapshotData CommandId curcid; TimestampTz whenTaken; XLogRecPtr lsn; + GlobalCSN global_csn; + bool imported_global_csn; } SerializedSnapshotData; Size @@ -992,7 +995,9 @@ SnapshotResetXmin(void) pairingheap_first(&RegisteredSnapshots)); if (TransactionIdPrecedes(MyPgXact->xmin, minSnapshot->xmin)) + { MyPgXact->xmin = minSnapshot->xmin; + } } /* @@ -2081,6 +2086,8 @@ SerializeSnapshot(Snapshot snapshot, char *start_address) serialized_snapshot.curcid = snapshot->curcid; serialized_snapshot.whenTaken = snapshot->whenTaken; serialized_snapshot.lsn = snapshot->lsn; + serialized_snapshot.global_csn = snapshot->global_csn; + serialized_snapshot.imported_global_csn = snapshot->imported_global_csn; /* * Ignore the SubXID array if it has overflowed, unless the snapshot was @@ -2155,6 +2162,8 @@ RestoreSnapshot(char *start_address) snapshot->curcid = serialized_snapshot.curcid; snapshot->whenTaken = serialized_snapshot.whenTaken; snapshot->lsn = serialized_snapshot.lsn; + snapshot->global_csn = serialized_snapshot.global_csn; + snapshot->imported_global_csn = serialized_snapshot.imported_global_csn; /* Copy XIDs, if present. 
*/ if (serialized_snapshot.xcnt > 0) @@ -2192,3 +2201,92 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc) { SetTransactionSnapshot(snapshot, NULL, InvalidPid, master_pgproc); } + +/* + * ExportGlobalSnapshot + * + * Export global_csn so that caller can expand this transaction to other + * nodes. + * + * TODO: it's better to do this through EXPORT/IMPORT SNAPSHOT syntax and + * add some additional checks that transaction did not yet acquired xid, but + * for current iteration of this patch I don't want to hack on parser. + */ +GlobalCSN +ExportGlobalSnapshot() +{ + if (!track_global_snapshots) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not export global snapshot"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "track_global_snapshots"))); + + return CurrentSnapshot->global_csn; +} + +/* SQL accessor to ExportGlobalSnapshot() */ +Datum +pg_global_snaphot_export(PG_FUNCTION_ARGS) +{ + GlobalCSN global_csn = ExportGlobalSnapshot(); + PG_RETURN_UINT64(global_csn); +} + +/* + * ImportGlobalSnapshot + * + * Import global_csn and retract this backends xmin to the value that was + * actual when we had such global_csn. + * + * TODO: it's better to do this through EXPORT/IMPORT SNAPSHOT syntax and + * add some additional checks that transaction did not yet acquired xid, but + * for current iteration of this patch I don't want to hack on parser. + */ +void +ImportGlobalSnapshot(GlobalCSN snap_global_csn) +{ + volatile TransactionId xmin; + + if (!track_global_snapshots) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not import global snapshot"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "track_global_snapshots"))); + + /* + * Call GlobalSnapshotToXmin under ProcArrayLock to avoid situation that + * resulting xmin will be evicted from map before we will set it into our + * backend's xmin. 
+ */ + LWLockAcquire(ProcArrayLock, LW_SHARED); + xmin = GlobalSnapshotToXmin(snap_global_csn); + if (!TransactionIdIsValid(xmin)) + { + LWLockRelease(ProcArrayLock); + elog(ERROR, "GlobalSnapshotToXmin: global snapshot too old"); + } + MyProc->originalXmin = MyPgXact->xmin; + MyPgXact->xmin = TransactionXmin = xmin; + LWLockRelease(ProcArrayLock); + + CurrentSnapshot->xmin = xmin; /* defuse SnapshotResetXmin() */ + CurrentSnapshot->global_csn = snap_global_csn; + CurrentSnapshot->imported_global_csn = true; + GlobalSnapshotSync(snap_global_csn); + + Assert(TransactionIdPrecedesOrEquals(RecentGlobalXmin, xmin)); + Assert(TransactionIdPrecedesOrEquals(RecentGlobalDataXmin, xmin)); +} + +/* SQL accessor to ImportGlobalSnapshot() */ +Datum +pg_global_snaphot_import(PG_FUNCTION_ARGS) +{ + GlobalCSN global_csn = PG_GETARG_UINT64(0); + ImportGlobalSnapshot(global_csn); + PG_RETURN_VOID(); +} + + diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c index f7c4c9188c..f2fbc77fa8 100644 --- a/src/backend/utils/time/tqual.c +++ b/src/backend/utils/time/tqual.c @@ -63,6 +63,7 @@ #include "postgres.h" +#include "access/global_snapshot.h" #include "access/htup_details.h" #include "access/multixact.h" #include "access/subtrans.h" @@ -1462,8 +1463,8 @@ HeapTupleIsSurelyDead(HeapTuple htup, TransactionId OldestXmin) } /* - * XidInMVCCSnapshot - * Is the given XID still-in-progress according to the snapshot? + * XidInLocalMVCCSnapshot + * Is the given XID still-in-progress according to the local snapshot? * * Note: GetSnapshotData never stores either top xid or subxids of our own * backend into a snapshot, so these xids will not be reported as "running" @@ -1471,8 +1472,8 @@ HeapTupleIsSurelyDead(HeapTuple htup, TransactionId OldestXmin) * TransactionIdIsCurrentTransactionId first, except when it's known the * XID could not be ours anyway. 
*/ -bool -XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot) +static bool +XidInLocalMVCCSnapshot(TransactionId xid, Snapshot snapshot) { uint32 i; @@ -1583,6 +1584,62 @@ XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot) return false; } +/* + * XidInMVCCSnapshot + * + * Check whether this xid is in the snapshot, taking into account the fact that + * the snapshot can be global. When track_global_snapshots is switched off + * just call XidInLocalMVCCSnapshot(). + */ +bool +XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot) +{ + bool in_snapshot; + + if (snapshot->imported_global_csn) + { + Assert(track_global_snapshots); + /* No point in using snapshot info except CSN */ + return XidInvisibleInGlobalSnapshot(xid, snapshot); + } + + in_snapshot = XidInLocalMVCCSnapshot(xid, snapshot); + + if (!track_global_snapshots) + { + Assert(GlobalCSNIsFrozen(snapshot->global_csn)); + return in_snapshot; + } + + if (in_snapshot) + { + /* + * This xid may be already in unknown state and in that case + * we must wait and recheck. + * + * TODO: this check can be skipped if we know for sure that there were + * no global transactions when this snapshot was taken. That requires + * some changes to mechanisms of global snapshots export/import (if + * backend set xmin then we should have a-priori knowledge that this + * transaction is going to be global or local -- right now this is not + * enforced). Leave that for future and don't complicate this patch. + */ + return XidInvisibleInGlobalSnapshot(xid, snapshot); + } + else + { +#ifdef USE_ASSERT_CHECKING + /* Check that global snapshot gives the same results as local one */ + if (XidInvisibleInGlobalSnapshot(xid, snapshot)) + { + GlobalCSN gcsn = TransactionIdGetGlobalCSN(xid); + Assert(GlobalCSNIsAborted(gcsn)); + } +#endif + return false; + } +} + /* * Is the tuple really only locked? That is, is it not updated?
* diff --git a/src/include/access/global_snapshot.h b/src/include/access/global_snapshot.h new file mode 100644 index 0000000000..246b180cfd --- /dev/null +++ b/src/include/access/global_snapshot.h @@ -0,0 +1,72 @@ +/*------------------------------------------------------------------------- + * + * global_snapshot.h + * Support for cross-node snapshot isolation. + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/global_snapshot.h + * + *------------------------------------------------------------------------- + */ +#ifndef GLOBAL_SNAPSHOT_H +#define GLOBAL_SNAPSHOT_H + +#include "port/atomics.h" +#include "storage/lock.h" +#include "utils/snapshot.h" +#include "utils/guc.h" + +/* + * snapshot.h is used in frontend code so atomic variant of GlobalCSN type + * is defined here. + */ +typedef pg_atomic_uint64 GlobalCSN_atomic; + +#define InProgressGlobalCSN UINT64CONST(0x0) +#define AbortedGlobalCSN UINT64CONST(0x1) +#define FrozenGlobalCSN UINT64CONST(0x2) +#define InDoubtGlobalCSN UINT64CONST(0x3) +#define FirstNormalGlobalCSN UINT64CONST(0x4) + +#define GlobalCSNIsInProgress(csn) ((csn) == InProgressGlobalCSN) +#define GlobalCSNIsAborted(csn) ((csn) == AbortedGlobalCSN) +#define GlobalCSNIsFrozen(csn) ((csn) == FrozenGlobalCSN) +#define GlobalCSNIsInDoubt(csn) ((csn) == InDoubtGlobalCSN) +#define GlobalCSNIsNormal(csn) ((csn) >= FirstNormalGlobalCSN) + + +extern int global_snapshot_defer_time; + + +extern Size GlobalSnapshotShmemSize(void); +extern void GlobalSnapshotShmemInit(void); +extern void GlobalSnapshotStartup(TransactionId oldestActiveXID); + +extern void GlobalSnapshotMapXmin(GlobalCSN snapshot_global_csn); +extern TransactionId GlobalSnapshotToXmin(GlobalCSN snapshot_global_csn); + +extern GlobalCSN GlobalSnapshotGenerate(bool locked); + +extern bool XidInvisibleInGlobalSnapshot(TransactionId xid, Snapshot snapshot); + +extern 
void GlobalSnapshotSync(GlobalCSN remote_gcsn); + +extern GlobalCSN TransactionIdGetGlobalCSN(TransactionId xid); + +extern GlobalCSN GlobalSnapshotPrepareGlobal(const char *gid); +extern void GlobalSnapshotAssignCsnGlobal(const char *gid, + GlobalCSN global_csn); + +extern GlobalCSN GlobalSnapshotPrepareCurrent(void); +extern void GlobalSnapshotAssignCsnCurrent(GlobalCSN global_csn); + +extern void GlobalSnapshotAbort(PGPROC *proc, TransactionId xid, int nsubxids, + TransactionId *subxids); +extern void GlobalSnapshotPrecommit(PGPROC *proc, TransactionId xid, int nsubxids, + TransactionId *subxids); +extern void GlobalSnapshotCommit(PGPROC *proc, TransactionId xid, int nsubxids, + TransactionId *subxids); + +#endif /* GLOBAL_SNAPSHOT_H */ diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 0e932daa48..f8b774f393 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -18,6 +18,7 @@ #include "access/xact.h" #include "datatype/timestamp.h" #include "storage/lock.h" +#include "utils/snapshot.h" /* * GlobalTransactionData is defined in twophase.c; other places have no diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 66c6c224a8..6d9774901f 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -10202,4 +10202,18 @@ proisstrict => 'f', prorettype => 'bool', proargtypes => 'oid int4 int4 any', proargmodes => '{i,i,i,v}', prosrc => 'satisfies_hash_partition' }, +# global transaction handling +{ oid => '3430', descr => 'export global transaction snapshot', + proname => 'pg_global_snaphot_export', provolatile => 'v', proparallel => 'u', + prorettype => 'int8', proargtypes => '', prosrc => 'pg_global_snaphot_export' }, +{ oid => '3431', descr => 'import global transaction snapshot', + proname => 'pg_global_snaphot_import', provolatile => 'v', proparallel => 'u', + prorettype => 'void', proargtypes => 'int8', prosrc => 'pg_global_snaphot_import' }, +{ oid 
=> '3432', descr => 'prepare distributed transaction for commit, get global_csn', + proname => 'pg_global_snaphot_prepare', provolatile => 'v', proparallel => 'u', + prorettype => 'int8', proargtypes => 'text', prosrc => 'pg_global_snaphot_prepare' }, +{ oid => '3433', descr => 'assign global_csn to distributed transaction', + proname => 'pg_global_snaphot_assign', provolatile => 'v', proparallel => 'u', + prorettype => 'void', proargtypes => 'text int8', prosrc => 'pg_global_snaphot_assign' }, + ] diff --git a/src/include/datatype/timestamp.h b/src/include/datatype/timestamp.h index f5b6026ef5..75ec93b46b 100644 --- a/src/include/datatype/timestamp.h +++ b/src/include/datatype/timestamp.h @@ -93,6 +93,9 @@ typedef struct #define USECS_PER_MINUTE INT64CONST(60000000) #define USECS_PER_SEC INT64CONST(1000000) +#define NSECS_PER_SEC INT64CONST(1000000000) +#define NSECS_PER_USEC INT64CONST(1000) + /* * We allow numeric timezone offsets up to 15:59:59 either way from Greenwich. * Currently, the record holders for wackiest offsets in actual use are zones diff --git a/src/include/fmgr.h b/src/include/fmgr.h index 101f513ba6..3026e71f83 100644 --- a/src/include/fmgr.h +++ b/src/include/fmgr.h @@ -250,6 +250,7 @@ extern struct varlena *pg_detoast_datum_packed(struct varlena *datum); #define PG_GETARG_FLOAT4(n) DatumGetFloat4(PG_GETARG_DATUM(n)) #define PG_GETARG_FLOAT8(n) DatumGetFloat8(PG_GETARG_DATUM(n)) #define PG_GETARG_INT64(n) DatumGetInt64(PG_GETARG_DATUM(n)) +#define PG_GETARG_UINT64(n) DatumGetUInt64(PG_GETARG_DATUM(n)) /* use this if you want the raw, possibly-toasted input datum: */ #define PG_GETARG_RAW_VARLENA_P(n) ((struct varlena *) PG_GETARG_POINTER(n)) /* use this if you want the input datum de-toasted: */ diff --git a/src/include/portability/instr_time.h b/src/include/portability/instr_time.h index f968444671..ebc3836c9c 100644 --- a/src/include/portability/instr_time.h +++ b/src/include/portability/instr_time.h @@ -138,6 +138,9 @@ typedef struct 
timespec instr_time; #define INSTR_TIME_GET_MICROSEC(t) \ (((uint64) (t).tv_sec * (uint64) 1000000) + (uint64) ((t).tv_nsec / 1000)) +#define INSTR_TIME_GET_NANOSEC(t) \ + (((uint64) (t).tv_sec * (uint64) 1000000000) + (uint64) ((t).tv_nsec)) + #else /* !HAVE_CLOCK_GETTIME */ /* Use gettimeofday() */ @@ -202,6 +205,10 @@ typedef struct timeval instr_time; #define INSTR_TIME_GET_MICROSEC(t) \ (((uint64) (t).tv_sec * (uint64) 1000000) + (uint64) (t).tv_usec) +#define INSTR_TIME_GET_NANOSEC(t) \ + (((uint64) (t).tv_sec * (uint64) 1000000000) + \ + (uint64) (t).tv_usec * (uint64) 1000) + #endif /* HAVE_CLOCK_GETTIME */ #else /* WIN32 */ @@ -234,6 +241,9 @@ typedef LARGE_INTEGER instr_time; #define INSTR_TIME_GET_MICROSEC(t) \ ((uint64) (((double) (t).QuadPart * 1000000.0) / GetTimerFrequency())) +#define INSTR_TIME_GET_NANOSEC(t) \ + ((uint64) (((double) (t).QuadPart * 1000000000.0) / GetTimerFrequency())) + static inline double GetTimerFrequency(void) { diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 5c19a61dcf..7e24d539cc 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -15,8 +15,10 @@ #define _PROC_H_ #include "access/clog.h" +#include "access/global_snapshot.h" #include "access/xlogdefs.h" #include "lib/ilist.h" +#include "utils/snapshot.h" #include "storage/latch.h" #include "storage/lock.h" #include "storage/pg_sema.h" @@ -57,6 +59,7 @@ struct XidCache #define PROC_IN_LOGICAL_DECODING 0x10 /* currently doing logical * decoding outside xact */ #define PROC_RESERVED 0x20 /* reserved for procarray */ +#define PROC_RESERVED2 0x40 /* reserved for procarray */ /* flags reset at EOXact */ #define PROC_VACUUM_STATE_MASK \ @@ -200,6 +203,18 @@ struct PGPROC PGPROC *lockGroupLeader; /* lock group leader, if I'm a member */ dlist_head lockGroupMembers; /* list of members, if I'm a leader */ dlist_node lockGroupLink; /* my member link, if I'm a member */ + + /* + * assignedGlobalCsn holds GlobalCSN for this transaction. 
It is generated + * under a ProcArray lock and is later written to the GlobalCSNLog. This + * variable is defined as atomic only for the case of group commit; in all + * other scenarios only the backend responsible for this proc entry works + * with this variable. + */ + GlobalCSN_atomic assignedGlobalCsn; + + /* Original xmin of this backend before global snapshot was imported */ + TransactionId originalXmin; }; /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */ diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index 75bab2985f..4c749d6123 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -36,6 +36,10 @@ #define PROCARRAY_SLOTS_XMIN 0x20 /* replication slot xmin, * catalog_xmin */ + +#define PROCARRAY_NON_IMPORTED_XMIN 0x40 /* use originalXmin instead + * of xmin to properly + * maintain gsXidMap */ /* * Only flags in PROCARRAY_PROC_FLAGS_MASK are considered when matching * PGXACT->vacuumFlags. Other flags are used for different purposes and @@ -124,4 +128,8 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin, extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin); +extern void ProcArraySetGlobalSnapshotXmin(TransactionId xmin); + +extern TransactionId ProcArrayGetGlobalSnapshotXmin(bool locked); + #endif /* PROCARRAY_H */ diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h index 83806f3040..1a066fd8d8 100644 --- a/src/include/utils/snapmgr.h +++ b/src/include/utils/snapmgr.h @@ -87,6 +87,9 @@ extern void AtSubCommit_Snapshot(int level); extern void AtSubAbort_Snapshot(int level); extern void AtEOXact_Snapshot(bool isCommit, bool resetXmin); +extern GlobalCSN ExportGlobalSnapshot(void); +extern void ImportGlobalSnapshot(GlobalCSN snap_global_csn); + extern void ImportSnapshot(const char *idstr); extern bool XactHasExportedSnapshots(void); extern void DeleteAllExportedSnapshotFiles(void); diff --git
a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h index 318d41e6f7..1563407824 100644 --- a/src/include/utils/snapshot.h +++ b/src/include/utils/snapshot.h @@ -115,6 +115,14 @@ typedef struct SnapshotData TimestampTz whenTaken; /* timestamp when snapshot was taken */ XLogRecPtr lsn; /* position in the WAL stream when taken */ + + /* + * GlobalCSN for cross-node snapshot isolation support. + * Will be used only if track_global_snapshots is enabled. + */ + GlobalCSN global_csn; + /* Did we have our own global_csn or imported one from different node */ + bool imported_global_csn; } SnapshotData; /* -- 2.16.2