diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index 91ad1ef..640d37f 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -11,6 +11,7 @@ include $(top_builddir)/src/Makefile.global OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o genericdesc.o \ gindesc.o gistdesc.o hashdesc.o heapdesc.o logicalmsgdesc.o \ mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o seqdesc.o \ - smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o undologdesc.o xactdesc.o xlogdesc.o + smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o undoactiondesc.o \ + undologdesc.o xactdesc.o xlogdesc.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/rmgrdesc/undoactiondesc.c b/src/backend/access/rmgrdesc/undoactiondesc.c new file mode 100644 index 0000000..c396582 --- /dev/null +++ b/src/backend/access/rmgrdesc/undoactiondesc.c @@ -0,0 +1,59 @@ +/*------------------------------------------------------------------------- + * + * undoactiondesc.c + * rmgr descriptor routines for access/undo/undoactionxlog.c + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/rmgrdesc/undoactiondesc.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/undoaction_xlog.h" + +/* + * undoaction_desc + * Append a human-readable description of an undo-action WAL record to buf. + */ +void +undoaction_desc(StringInfo buf, XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == XLOG_UNDO_APPLY_PROGRESS) + { + xl_undoapply_progress *xlrec = (xl_undoapply_progress *) rec; + + /* + * urec_ptr is an UndoRecPtr (presumably uint64); %lu is not 64 bits + * wide on every platform, so use the portable UINT64_FORMAT. + */ + appendStringInfo(buf, "urec_ptr " UINT64_FORMAT " progress %u", + xlrec->urec_ptr, xlrec->progress); + } +} + +/* + * undoaction_identify + * Return the name of an undo-action WAL record type, or NULL if unknown. + */ +const char * +undoaction_identify(uint8 info) +{ + const char *id = NULL; + + switch (info & ~XLR_INFO_MASK) + { + case XLOG_UNDO_APPLY_PROGRESS: + id = 
"UNDO_APPLY_PROGRESS"; + break; + } + + return id; +} diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c index 33060f3..2ff2b86 100644 --- a/src/backend/access/rmgrdesc/xlogdesc.c +++ b/src/backend/access/rmgrdesc/xlogdesc.c @@ -48,7 +48,8 @@ xlog_desc(StringInfo buf, XLogReaderState *record) "tli %u; prev tli %u; fpw %s; xid %u:%u; oid %u; multi %u; offset %u; " "oldest xid %u in DB %u; oldest multi %u in DB %u; " "oldest/newest commit timestamp xid: %u/%u; " - "oldest running xid %u; %s", + "oldest running xid %u; " + "oldest xid with epoch having undo " UINT64_FORMAT "; %s", (uint32) (checkpoint->redo >> 32), (uint32) checkpoint->redo, checkpoint->ThisTimeLineID, checkpoint->PrevTimeLineID, @@ -65,6 +66,7 @@ xlog_desc(StringInfo buf, XLogReaderState *record) checkpoint->oldestCommitTsXid, checkpoint->newestCommitTsXid, checkpoint->oldestActiveXid, + checkpoint->oldestXidWithEpochHavingUndo, (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online"); } else if (info == XLOG_NEXTOID) diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 8b05374..6238240 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -18,6 +18,7 @@ #include "access/multixact.h" #include "access/nbtxlog.h" #include "access/spgxlog.h" +#include "access/undoaction_xlog.h" #include "access/undolog_xlog.h" #include "access/xact.h" #include "access/xlog_internal.h" @@ -31,8 +32,8 @@ #include "utils/relmapper.h" /* must be kept in sync with RmgrData definition in xlog_internal.h */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ - { name, redo, desc, identify, startup, cleanup, mask }, +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undo_desc) \ + { name, redo, desc, identify, startup, cleanup, mask, undo, undo_desc }, const RmgrData RmgrTable[RM_MAX_ID + 1] = { #include "access/rmgrlist.h" diff --git
a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index ecc01f7..00dcd59 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -927,6 +927,16 @@ typedef struct TwoPhaseFileHeader uint16 gidlen; /* length of the GID - GID follows the header */ XLogRecPtr origin_lsn; /* lsn of this record at origin node */ TimestampTz origin_timestamp; /* time of prepare at origin node */ + + /* + * We need the locations of start and end undo record pointers when + * rollbacks are to be performed for prepared transactions using undo-based + * relations. We store this information in the 2PC file because the + * user might roll back the prepared transaction after recovery, and for + * that we need its start and end undo locations. + */ + UndoRecPtr start_urec_ptr[UndoPersistenceLevels]; + UndoRecPtr end_urec_ptr[UndoPersistenceLevels]; } TwoPhaseFileHeader; /* @@ -1001,7 +1011,8 @@ save_state_data(const void *data, uint32 len) * Initializes data structure and inserts the 2PC file header record.
*/ void -StartPrepare(GlobalTransaction gxact) +StartPrepare(GlobalTransaction gxact, UndoRecPtr *start_urec_ptr, + UndoRecPtr *end_urec_ptr) { PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno]; PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; @@ -1032,6 +1043,11 @@ StartPrepare(GlobalTransaction gxact) hdr.database = proc->databaseId; hdr.prepared_at = gxact->prepared_at; hdr.owner = gxact->owner; + + /* save the start and end undo record pointers */ + memcpy(hdr.start_urec_ptr, start_urec_ptr, sizeof(hdr.start_urec_ptr)); + memcpy(hdr.end_urec_ptr, end_urec_ptr, sizeof(hdr.end_urec_ptr)); + hdr.nsubxacts = xactGetCommittedChildren(&children); hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels); hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels); @@ -1468,6 +1484,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit) RelFileNode *delrels; int ndelrels; SharedInvalidationMessage *invalmsgs; + int i; + UndoRecPtr start_urec_ptr[UndoPersistenceLevels]; + UndoRecPtr end_urec_ptr[UndoPersistenceLevels]; /* * Validate the GID, and lock the GXACT to ensure that two backends do not @@ -1505,6 +1524,10 @@ FinishPreparedTransaction(const char *gid, bool isCommit) invalmsgs = (SharedInvalidationMessage *) bufptr; bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage)); + /* save the start and end undo record pointers */ + memcpy(start_urec_ptr, hdr->start_urec_ptr, sizeof(start_urec_ptr)); + memcpy(end_urec_ptr, hdr->end_urec_ptr, sizeof(end_urec_ptr)); + /* compute latestXid among all children */ latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children); @@ -1612,6 +1635,83 @@ FinishPreparedTransaction(const char *gid, bool isCommit) MyLockedGxact = NULL; + /* + * Perform undo actions, if there are undologs for this transaction. We + * need to perform undo actions while we are still in transaction. Never + * push rollbacks of temp tables to undo worker.
+ */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + volatile UndoRequestInfo urinfo; + uint32 epoch; + FullTransactionId full_xid; + + /* + * We don't allow XIDs with an age of more than 2 billion in undo, so + * we can infer the epoch here. This assumption is not good, but + * we can fix it by storing full transaction id in TwoPhaseFileHeader. + */ + epoch = GetEpochForXid(hdr->xid); + full_xid = FullTransactionIdFromEpochAndXid(epoch, hdr->xid); + + if (end_urec_ptr[i] != InvalidUndoRecPtr && !isCommit) + { + uint32 save_holdoff; + + save_holdoff = InterruptHoldoffCount; + PG_TRY(); + { + bool result = false; + + /* + * Prepare the undo request info, which PG_CATCH also needs. + */ + ResetUndoRequestInfo(&urinfo); + urinfo.dbid = MyDatabaseId; + urinfo.full_xid = full_xid; + urinfo.start_urec_ptr = start_urec_ptr[i]; + + if (i != UNDO_TEMP) + result = RegisterRollbackReq(end_urec_ptr[i], + start_urec_ptr[i], + hdr->database, + full_xid); + if (!result) + execute_undo_actions(full_xid, end_urec_ptr[i], + start_urec_ptr[i], true); + } + PG_CATCH(); + { + if (i == UNDO_TEMP) + pg_rethrow_as_fatal(); + + /* + * Add the request into an error queue so that it can be + * processed in a timely fashion. + * + * If we fail to add the request in an error queue, then + * remove the entry from the hash table and continue to + * process the remaining undo requests if any. This request + * will be later processed by discard worker. + */ + if (!InsertRequestIntoErrorUndoQueue(&urinfo)) + RollbackHTRemoveEntry(urinfo.full_xid, urinfo.start_urec_ptr); + + /* + * Errors can reset holdoff count, so restore back. This is + * required because this function can be called after holding + * interrupts. + */ + InterruptHoldoffCount = save_holdoff; + + /* Send the error only to server log.
*/ + err_out_to_client(false); + EmitErrorReport(); + } + PG_END_TRY(); + } + } + RESUME_INTERRUPTS(); pfree(buf); diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index 8c3d84f..9e90b51 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -300,6 +300,30 @@ AdvanceNextFullTransactionIdPastXid(TransactionId xid) } /* + * Get epoch for xid, assuming it is less than 2^32 xids older than nextXid. + */ +uint32 +GetEpochForXid(TransactionId xid) +{ + FullTransactionId next_fxid; + TransactionId next_xid; + uint32 epoch; + + next_fxid = ReadNextFullTransactionId(); + next_xid = XidFromFullTransactionId(next_fxid); + epoch = EpochFromFullTransactionId(next_fxid); + + /* + * If xid is numerically bigger than next_xid, it has to be from the last + * epoch. + */ + if (unlikely(xid > next_xid)) + epoch--; + + return epoch; +} + +/* * Advance the cluster-wide value for the oldest valid clog entry. * * We must acquire CLogTruncationLock to advance the oldestClogXid. It's not @@ -333,10 +357,23 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid) TransactionId xidStopLimit; TransactionId xidWrapLimit; TransactionId curXid; + TransactionId oldestXidHavingUndo; Assert(TransactionIdIsNormal(oldest_datfrozenxid)); /* + * To determine the last safe xid that can be allocated, we must also + * consider oldestXidHavingUndo, the oldest xid whose undo is not yet + * discarded (it is only valid for zheap). Use a wraparound-aware + * comparison rather than Min(), since xids are compared modulo 2^32. + */ + oldestXidHavingUndo = GetXidFromEpochXid( + pg_atomic_read_u64(&ProcGlobal->oldestXidWithEpochHavingUndo)); + if (TransactionIdIsValid(oldestXidHavingUndo) && + TransactionIdPrecedes(oldestXidHavingUndo, oldest_datfrozenxid)) + oldest_datfrozenxid = oldestXidHavingUndo; + + /* * The place where we actually get into deep trouble is halfway around * from the oldest potentially-existing XID.
(This calculation is * probably off by one or two counts, because the special XIDs reduce the @@ -403,6 +440,13 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid) curXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid); LWLockRelease(XidGenLock); + /* + * FIXME: The messages emitted below need some adjustment for zheap; they + * should reflect that the system needs to discard the undo. We can add + * that once we have a pluggable storage API which might provide us some + * way to distinguish among different storage engines. + */ + /* Log the info */ ereport(DEBUG1, (errmsg("transaction ID wrap limit is %u, limited by database with OID %u", diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 47a8f0d..5996997 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -26,11 +26,13 @@ #include "access/subtrans.h" #include "access/transam.h" #include "access/twophase.h" +#include "access/undodiscard.h" #include "access/xact.h" #include "access/xlog.h" #include "access/xloginsert.h" #include "access/xlogutils.h" #include "access/undoinsert.h" +#include "access/undorequest.h" #include "catalog/namespace.h" #include "catalog/pg_enum.h" #include "catalog/storage.h" @@ -129,7 +131,8 @@ typedef enum TransState TRANS_INPROGRESS, /* inside a valid transaction */ TRANS_COMMIT, /* commit in progress */ TRANS_ABORT, /* abort in progress */ - TRANS_PREPARE /* prepare in progress */ + TRANS_PREPARE, /* prepare in progress */ + TRANS_UNDO /* undo apply in progress */ } TransState; /* @@ -154,6 +157,7 @@ typedef enum TBlockState TBLOCK_ABORT_END, /* failed xact, ROLLBACK received */ TBLOCK_ABORT_PENDING, /* live xact, ROLLBACK received */ TBLOCK_PREPARE, /* live xact, PREPARE received */ + TBLOCK_UNDO, /* failed xact, awaiting undo to be applied */ /* subtransaction states */ TBLOCK_SUBBEGIN, /* starting a subtransaction */ @@ -164,7 +168,8 @@ typedef enum TBlockState
TBLOCK_SUBABORT_END, /* failed subxact, ROLLBACK received */ TBLOCK_SUBABORT_PENDING, /* live subxact, ROLLBACK received */ TBLOCK_SUBRESTART, /* live subxact, ROLLBACK TO received */ - TBLOCK_SUBABORT_RESTART /* failed subxact, ROLLBACK TO received */ + TBLOCK_SUBABORT_RESTART, /* failed subxact, ROLLBACK TO received */ + TBLOCK_SUBUNDO /* failed subxact, awaiting undo to be applied */ } TBlockState; /* @@ -192,6 +197,14 @@ typedef struct TransactionStateData bool didLogXid; /* has xid been included in WAL record? */ int parallelModeLevel; /* Enter/ExitParallelMode counter */ bool chain; /* start a new block after this one */ + bool subXactLock; /* has lock created for subtransaction? */ + + /* start and end undo record location for each persistence level */ + UndoRecPtr start_urec_ptr[UndoPersistenceLevels]; /* this is 'to' location */ + UndoRecPtr latest_urec_ptr[UndoPersistenceLevels]; /* this is 'from' + * location */ + bool performUndoActions; /* are there undo actions to apply? */ + struct TransactionStateData *parent; /* back link to parent */ } TransactionStateData; @@ -363,9 +376,9 @@ IsTransactionState(void) * also reject the startup/shutdown states TRANS_START, TRANS_COMMIT, * TRANS_PREPARE since it might be too soon or too late within those * transition states to do anything interesting. Hence, the only "valid" - * state is TRANS_INPROGRESS. + * state is TRANS_INPROGRESS or TRANS_UNDO. */ - return (s->state == TRANS_INPROGRESS); + return (s->state == TRANS_INPROGRESS || s->state == TRANS_UNDO); } /* @@ -724,9 +737,14 @@ SubTransactionIsActive(SubTransactionId subxid) { TransactionState s; + /* + * The subtransaction is not considered active if it is being aborted or + * in undo apply state, even though it may still have an entry on the + * state stack.
+ */ for (s = CurrentTransactionState; s != NULL; s = s->parent) { - if (s->state == TRANS_ABORT) + if (s->state == TRANS_ABORT || s->state == TRANS_UNDO) continue; if (s->subTransactionId == subxid) return true; @@ -906,15 +924,15 @@ TransactionIdIsCurrentTransactionId(TransactionId xid) * We will return true for the Xid of the current subtransaction, any of * its subcommitted children, any of its parents, or any of their * previously subcommitted children. However, a transaction being aborted - * is no longer "current", even though it may still have an entry on the - * state stack. + * or in undo apply state is no longer "current", even though it may still + * have an entry on the state stack. */ for (s = CurrentTransactionState; s != NULL; s = s->parent) { int low, high; - if (s->state == TRANS_ABORT) + if (s->state == TRANS_ABORT || s->state == TRANS_UNDO) continue; if (!FullTransactionIdIsValid(s->fullTransactionId)) continue; /* it can't have any child XIDs either */ @@ -998,6 +1016,22 @@ IsInParallelMode(void) } /* + * SetCurrentUndoLocation - track undo written by the current (sub)xact + */ +void +SetCurrentUndoLocation(UndoRecPtr urec_ptr, UndoPersistence upersistence) +{ + /* + * Set the start undo record pointer for first undo record in a + * subtransaction.
+ */ + if (!UndoRecPtrIsValid(CurrentTransactionState->start_urec_ptr[upersistence])) + CurrentTransactionState->start_urec_ptr[upersistence] = urec_ptr; + CurrentTransactionState->latest_urec_ptr[upersistence] = urec_ptr; + +} + +/* * CommandCounterIncrement */ void @@ -1886,6 +1920,7 @@ StartTransaction(void) { TransactionState s; VirtualTransactionId vxid; + int i; /* * Let's just make sure the state stack is empty @@ -1969,6 +2004,15 @@ StartTransaction(void) nUnreportedXids = 0; s->didLogXid = false; + /* initialize undo record locations for the transaction */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + s->start_urec_ptr[i] = InvalidUndoRecPtr; + s->latest_urec_ptr[i] = InvalidUndoRecPtr; + } + s->performUndoActions = false; + s->subXactLock = false; + /* * must initialize resource-management stuff first */ @@ -2246,6 +2290,10 @@ CommitTransaction(void) AtEOXact_ApplyLauncher(true); pgstat_report_xact_timestamp(0); + /* In single-user mode, discard all the undo logs once committed. */ + if (!IsUnderPostmaster) + UndoLogDiscardAll(); + CurrentResourceOwner = NULL; ResourceOwnerDelete(TopTransactionResourceOwner); s->curTransactionOwner = NULL; @@ -2265,6 +2313,8 @@ CommitTransaction(void) XactTopFullTransactionId = InvalidFullTransactionId; nParallelCurrentXids = 0; + ResetUndoActionsInfo(); + /* * done with commit processing, set current transaction state back to * default @@ -2281,7 +2331,7 @@ CommitTransaction(void) * NB: if you change this routine, better look at CommitTransaction too! */ static void -PrepareTransaction(void) +PrepareTransaction(UndoRecPtr *start_urec_ptr, UndoRecPtr *end_urec_ptr) { TransactionState s = CurrentTransactionState; TransactionId xid = GetCurrentTransactionId(); @@ -2434,7 +2484,7 @@ PrepareTransaction(void) * PREPARED; in particular, pay attention to whether things should happen * before or after releasing the transaction's locks.
*/ - StartPrepare(gxact); + StartPrepare(gxact, start_urec_ptr, end_urec_ptr); AtPrepare_Notify(); AtPrepare_Locks(); @@ -2629,7 +2679,9 @@ AbortTransaction(void) * check the current transaction state */ is_parallel_worker = (s->blockState == TBLOCK_PARALLEL_INPROGRESS); - if (s->state != TRANS_INPROGRESS && s->state != TRANS_PREPARE) + if (s->state != TRANS_INPROGRESS && + s->state != TRANS_PREPARE && + s->state != TRANS_UNDO) elog(WARNING, "AbortTransaction while in %s state", TransStateAsString(s->state)); Assert(s->parent == NULL); @@ -2788,6 +2840,8 @@ CleanupTransaction(void) XactTopFullTransactionId = InvalidFullTransactionId; nParallelCurrentXids = 0; + ResetUndoActionsInfo(); + /* * done with abort processing, set current transaction state back to * default @@ -2853,6 +2907,8 @@ StartTransactionCommand(void) case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: case TBLOCK_PREPARE: + case TBLOCK_UNDO: + case TBLOCK_SUBUNDO: elog(ERROR, "StartTransactionCommand: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -2914,9 +2970,17 @@ CommitTransactionCommand(void) * StartTransactionCommand didn't set the STARTED state * appropriately, while TBLOCK_PARALLEL_INPROGRESS should be ended * by EndParallelWorkerTransaction(), not this function. + * + * TBLOCK_(SUB)UNDO means the error has occurred while applying + * undo for a (sub)transaction. We can't reach here: an error + * while applying undo for a top-level transaction is handled by + * ApplyUndoActions, and for a subtransaction the error is + * promoted to FATAL. */ case TBLOCK_DEFAULT: case TBLOCK_PARALLEL_INPROGRESS: + case TBLOCK_UNDO: + case TBLOCK_SUBUNDO: elog(FATAL, "CommitTransactionCommand: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -2995,11 +3059,13 @@ CommitTransactionCommand(void) /* * Here we were in a perfectly good transaction block but the user - * told us to ROLLBACK anyway.
We have to abort the transaction - * and then clean up. + * told us to ROLLBACK anyway. We have to abort the transaction, + * apply the undo actions if any and then clean up. */ case TBLOCK_ABORT_PENDING: + SetUndoActionsInfo(); AbortTransaction(); + ApplyUndoActions(); CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; if (s->chain) @@ -3016,7 +3082,7 @@ CommitTransactionCommand(void) * return to the idle state. */ case TBLOCK_PREPARE: - PrepareTransaction(); + PrepareTransaction(s->start_urec_ptr, s->latest_urec_ptr); s->blockState = TBLOCK_DEFAULT; break; @@ -3060,6 +3126,24 @@ CommitTransactionCommand(void) case TBLOCK_SUBCOMMIT: do { + int i; + + /* + * Before cleaning up the current sub transaction state, + * overwrite parent transaction's latest_urec_ptr with current + * transaction's latest_urec_ptr so that, if the parent + * transaction gets aborted, we do not skip performing undo + * for this transaction. Also set the start_urec_ptr if + * parent start_urec_ptr is not valid. + */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (UndoRecPtrIsValid(s->latest_urec_ptr[i])) + s->parent->latest_urec_ptr[i] = s->latest_urec_ptr[i]; + if (!UndoRecPtrIsValid(s->parent->start_urec_ptr[i])) + s->parent->start_urec_ptr[i] = s->start_urec_ptr[i]; + } + CommitSubTransaction(); s = CurrentTransactionState; /* changed by pop */ } while (s->blockState == TBLOCK_SUBCOMMIT); @@ -3073,7 +3157,7 @@ CommitTransactionCommand(void) else if (s->blockState == TBLOCK_PREPARE) { Assert(s->parent == NULL); - PrepareTransaction(); + PrepareTransaction(s->start_urec_ptr, s->latest_urec_ptr); s->blockState = TBLOCK_DEFAULT; } else @@ -3095,7 +3179,9 @@ CommitTransactionCommand(void) * As above, but it's not dead yet, so abort first.
*/ case TBLOCK_SUBABORT_PENDING: + SetUndoActionsInfo(); AbortSubTransaction(); + ApplyUndoActions(); CleanupSubTransaction(); CommitTransactionCommand(); break; @@ -3115,7 +3201,9 @@ CommitTransactionCommand(void) s->name = NULL; savepointLevel = s->savepointLevel; + SetUndoActionsInfo(); AbortSubTransaction(); + ApplyUndoActions(); CleanupSubTransaction(); DefineSavepoint(NULL); @@ -3168,6 +3256,14 @@ AbortCurrentTransaction(void) { TransactionState s = CurrentTransactionState; + /* + * Here, we just detect whether there are any pending undo actions so that + * we can skip releasing the locks during abort transaction. We don't + * release the locks until we execute undo actions; otherwise, there is a + * risk of deadlock. + */ + SetUndoActionsInfo(); + switch (s->blockState) { case TBLOCK_DEFAULT: @@ -3183,7 +3279,11 @@ AbortCurrentTransaction(void) * incompletely started transaction. First, adjust the * low-level state to suppress warning message from * AbortTransaction. + * + * In this state, we must not have performed any operation + * which can generate undo. */ + Assert(!s->performUndoActions); if (s->state == TRANS_START) s->state = TRANS_INPROGRESS; AbortTransaction(); @@ -3199,6 +3299,7 @@ AbortCurrentTransaction(void) case TBLOCK_STARTED: case TBLOCK_IMPLICIT_INPROGRESS: AbortTransaction(); + ApplyUndoActions(); CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; break; @@ -3209,8 +3310,12 @@ AbortCurrentTransaction(void) * will interpret the error as meaning the BEGIN failed to get him * into a transaction block, so we should abort and return to idle * state. + * + * In this state, we must not have performed any operation which + * can generate undo.
+ */ case TBLOCK_BEGIN: + Assert(!s->performUndoActions); AbortTransaction(); CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; @@ -3224,6 +3329,7 @@ AbortCurrentTransaction(void) case TBLOCK_INPROGRESS: case TBLOCK_PARALLEL_INPROGRESS: AbortTransaction(); + ApplyUndoActions(); s->blockState = TBLOCK_ABORT; /* CleanupTransaction happens when we exit TBLOCK_ABORT_END */ break; @@ -3235,6 +3341,7 @@ AbortCurrentTransaction(void) */ case TBLOCK_END: AbortTransaction(); + ApplyUndoActions(); CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; break; @@ -3264,6 +3371,7 @@ AbortCurrentTransaction(void) */ case TBLOCK_ABORT_PENDING: AbortTransaction(); + ApplyUndoActions(); CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; break; @@ -3275,6 +3383,7 @@ AbortCurrentTransaction(void) */ case TBLOCK_PREPARE: AbortTransaction(); + ApplyUndoActions(); CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; break; @@ -3286,6 +3395,7 @@ AbortCurrentTransaction(void) */ case TBLOCK_SUBINPROGRESS: AbortSubTransaction(); + ApplyUndoActions(); s->blockState = TBLOCK_SUBABORT; break; @@ -3300,6 +3410,7 @@ AbortCurrentTransaction(void) case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: AbortSubTransaction(); + ApplyUndoActions(); CleanupSubTransaction(); AbortCurrentTransaction(); break; @@ -3312,7 +3423,210 @@ AbortCurrentTransaction(void) CleanupSubTransaction(); AbortCurrentTransaction(); break; + + /* + * The error occurred while applying undo for a (sub)transaction. + * We can't reach here as while applying undo via top-level + * transaction, if we get an error, then it is handled by + * ApplyUndoActions and for subtransaction, we promote the error + * to fatal in such a situation. + */ + case TBLOCK_UNDO: + case TBLOCK_SUBUNDO: + elog(FATAL, "AbortCurrentTransaction: unexpected state %s", + BlockStateAsString(s->blockState)); + break; + } +} + +/* + * ApplyUndoActions - Execute undo actions for current (sub)xact.
+ * + * To execute undo actions during abort, we bring the transaction to a clean + * state by releasing the required resources and put it in a new state + * TRANS_UNDO. + * + * Note that we release locks after applying undo actions. We skip them + * during Abort(Sub)Transaction as otherwise there is always a risk of + * deadlock when we need to re-take them during processing of undo actions. + */ +void +ApplyUndoActions(void) +{ + TransactionState s = CurrentTransactionState; + uint32 save_holdoff; + volatile UndoRequestInfo urinfo; + volatile int per_level; + bool error = false; + + if (!s->performUndoActions) + return; + + /* + * State should still be TRANS_ABORT from AbortTransaction(). + */ + if (s->state != TRANS_ABORT) + elog(FATAL, "ApplyUndoActions: unexpected state %s", + TransStateAsString(s->state)); + + /* + * We promote the error level to FATAL if we get an error while applying + * undo for the subtransaction. See errstart. So, we should never reach + * here for such a case. + */ + Assert(!applying_subxact_undo); + + /* + * Do abort cleanup processing before applying the undo actions. We must + * do this before applying the undo actions to remove the effects of + * failed transaction. + */ + if (IsSubTransaction()) + { + AtSubCleanup_Portals(s->subTransactionId); + s->blockState = TBLOCK_SUBUNDO; + applying_subxact_undo = true; + + /* We can't afford to allow cancel of subtransaction's rollback.
*/ + HOLD_CANCEL_INTERRUPTS(); + } + else + { + AtCleanup_Portals(); /* now safe to release portal memory */ + AtEOXact_Snapshot(false, true); /* and release the transaction's + * snapshots */ + s->fullTransactionId = InvalidFullTransactionId; + s->subTransactionId = TopSubTransactionId; + s->blockState = TBLOCK_UNDO; + } + + s->state = TRANS_UNDO; + + for (per_level = 0; per_level < UndoPersistenceLevels; per_level++) + { + if (UndoRecPtrIsValid(s->latest_urec_ptr[per_level])) + { + save_holdoff = InterruptHoldoffCount; + + PG_TRY(); + { + bool result = false; + + /* + * Prepare the undo request info, which PG_CATCH also needs. + */ + ResetUndoRequestInfo(&urinfo); + urinfo.dbid = MyDatabaseId; + urinfo.full_xid = GetTopFullTransactionId(); + urinfo.start_urec_ptr = s->start_urec_ptr[per_level]; + + /* + * If this request is not for a temp table and not aborting + * subtransaction and the request size is greater than some + * threshold then push it to undo-worker through RollbackHT, + * undo-worker will perform the corresponding undo actions + * later. + * + * We can't push the undo actions for temp table to background + * workers as the temp tables are only accessible in the + * backend that has created them. We can't postpone applying + * undo actions for subtransactions as the modifications made + * by aborted subtransaction must not be visible even if the + * main transaction commits. It is not advisable to apply the + * undo actions of a very large transaction as that can lead + * to a delay in returning the control back to user after + * abort. + */ + if (per_level != UNDO_TEMP && + !IsSubTransaction()) + result = RegisterRollbackReq(s->latest_urec_ptr[per_level], + s->start_urec_ptr[per_level], + MyDatabaseId, + urinfo.full_xid); + if (!result) + { + /* for subtransactions, we do partial rollback.
*/ + execute_undo_actions(urinfo.full_xid, + s->latest_urec_ptr[per_level], + s->start_urec_ptr[per_level], + !IsSubTransaction()); + } + } + PG_CATCH(); + { + if (per_level == UNDO_TEMP) + pg_rethrow_as_fatal(); + + /* + * Add the request into an error queue so that it can be + * processed in a timely fashion. + * + * If we fail to add the request in an error queue, then + * remove the entry from the hash table and continue to + * process the remaining undo requests if any. This request + * will be later processed by discard worker. + */ + if (!InsertRequestIntoErrorUndoQueue(&urinfo)) + RollbackHTRemoveEntry(urinfo.full_xid, urinfo.start_urec_ptr); + + /* + * Errors can reset holdoff count, so restore back. This is + * required because this function can be called after holding + * interrupts. + */ + InterruptHoldoffCount = save_holdoff; + + /* Send the error only to server log. */ + err_out_to_client(false); + EmitErrorReport(); + + error = true; + + /* + * We promote the error level to FATAL if we get an error + * while applying undo for the subtransaction. See errstart. + * So, we should never reach here for such a case. + */ + Assert(!applying_subxact_undo); + } + PG_END_TRY(); + } + } + + if (error) + { + /* + * This should take care of releasing the locks held under + * TopTransactionResourceOwner. + */ + AbortTransaction(); } + + /* Reset undo information */ + ResetUndoActionsInfo(); + + applying_subxact_undo = false; + + /* Release the locks after applying undo actions. */ + if (IsSubTransaction()) + { + ResourceOwnerRelease(s->curTransactionOwner, + RESOURCE_RELEASE_LOCKS, + false, false); + RESUME_CANCEL_INTERRUPTS(); + } + else + { + ResourceOwnerRelease(s->curTransactionOwner, + RESOURCE_RELEASE_LOCKS, + false, true); + } + + /* + * Here we again put back the transaction in abort state so that callers + * can proceed with the cleanup work.
+ */ + s->state = TRANS_ABORT; } /* @@ -3641,6 +3955,8 @@ BeginTransactionBlock(void) case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: case TBLOCK_PREPARE: + case TBLOCK_UNDO: + case TBLOCK_SUBUNDO: elog(FATAL, "BeginTransactionBlock: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -3833,6 +4149,8 @@ EndTransactionBlock(bool chain) case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: case TBLOCK_PREPARE: + case TBLOCK_UNDO: + case TBLOCK_SUBUNDO: elog(FATAL, "EndTransactionBlock: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -3949,6 +4267,8 @@ UserAbortTransactionBlock(bool chain) case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: case TBLOCK_PREPARE: + case TBLOCK_UNDO: + case TBLOCK_SUBUNDO: elog(FATAL, "UserAbortTransactionBlock: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -4089,6 +4409,8 @@ DefineSavepoint(const char *name) case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: case TBLOCK_PREPARE: + case TBLOCK_UNDO: + case TBLOCK_SUBUNDO: elog(FATAL, "DefineSavepoint: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -4107,6 +4429,18 @@ ReleaseSavepoint(const char *name) TransactionState s = CurrentTransactionState; TransactionState target, xact; + UndoRecPtr latest_urec_ptr[UndoPersistenceLevels]; + UndoRecPtr start_urec_ptr[UndoPersistenceLevels]; + int i = 0; + + /* + * Remember the 'from' and 'to' locations of the current transaction so + * that we can propagate them to the parent transaction. This is required + * because, in case the parent transaction gets aborted, we must not skip + * performing undo for this transaction.
+ */ + memcpy(latest_urec_ptr, s->latest_urec_ptr, sizeof(latest_urec_ptr)); + memcpy(start_urec_ptr, s->start_urec_ptr, sizeof(start_urec_ptr)); /* * Workers synchronize transaction state at the beginning of each parallel @@ -4165,6 +4499,8 @@ ReleaseSavepoint(const char *name) case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: case TBLOCK_PREPARE: + case TBLOCK_UNDO: + case TBLOCK_SUBUNDO: elog(FATAL, "ReleaseSavepoint: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -4200,8 +4536,37 @@ ReleaseSavepoint(const char *name) if (xact == target) break; xact = xact->parent; Assert(PointerIsValid(xact)); + + /* + * Propagate the 'from' and 'to' undo locations to parent transaction. + * xact is checked by the Assert above before we dereference it here. + */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (!UndoRecPtrIsValid(latest_urec_ptr[i])) + latest_urec_ptr[i] = xact->latest_urec_ptr[i]; + + if (UndoRecPtrIsValid(xact->start_urec_ptr[i])) + start_urec_ptr[i] = xact->start_urec_ptr[i]; + } } + + /* + * Before cleaning up the current sub transaction state, overwrite parent + * transaction's latest_urec_ptr with current transaction's + * latest_urec_ptr so that if the parent transaction gets aborted we will + * not skip performing undo for this transaction. Also set the + * start_urec_ptr if parent start_urec_ptr is not valid.
+ */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (UndoRecPtrIsValid(latest_urec_ptr[i])) + xact->parent->latest_urec_ptr[i] = latest_urec_ptr[i]; + if (!UndoRecPtrIsValid(xact->parent->start_urec_ptr[i])) + xact->parent->start_urec_ptr[i] = start_urec_ptr[i]; + } } /* @@ -4274,6 +4639,8 @@ RollbackToSavepoint(const char *name) case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: case TBLOCK_PREPARE: + case TBLOCK_UNDO: + case TBLOCK_SUBUNDO: elog(FATAL, "RollbackToSavepoint: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -4392,6 +4759,8 @@ BeginInternalSubTransaction(const char *name) case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_UNDO: + case TBLOCK_SUBUNDO: elog(FATAL, "BeginInternalSubTransaction: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -4412,6 +4781,7 @@ void ReleaseCurrentSubTransaction(void) { TransactionState s = CurrentTransactionState; + int i; /* * Workers synchronize transaction state at the beginning of each parallel @@ -4430,6 +4800,22 @@ ReleaseCurrentSubTransaction(void) BlockStateAsString(s->blockState)); Assert(s->state == TRANS_INPROGRESS); MemoryContextSwitchTo(CurTransactionContext); + + /* + * Before cleaning up the current sub transaction state, overwrite parent + * transaction's latest_urec_ptr with current transaction's + * latest_urec_ptr so that if the parent transaction gets aborted we will + * not skip performing undo for this transaction.
+ */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (UndoRecPtrIsValid(s->latest_urec_ptr[i])) + s->parent->latest_urec_ptr[i] = s->latest_urec_ptr[i]; + + if (!UndoRecPtrIsValid(s->parent->start_urec_ptr[i])) + s->parent->start_urec_ptr[i] = s->start_urec_ptr[i]; + } + CommitSubTransaction(); s = CurrentTransactionState; /* changed by pop */ Assert(s->state == TRANS_INPROGRESS); @@ -4481,17 +4867,29 @@ RollbackAndReleaseCurrentSubTransaction(void) case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: case TBLOCK_PREPARE: + case TBLOCK_UNDO: + case TBLOCK_SUBUNDO: elog(FATAL, "RollbackAndReleaseCurrentSubTransaction: unexpected state %s", BlockStateAsString(s->blockState)); break; } /* + * Set the information required to perform undo actions. Note that, it + * must be done before AbortSubTransaction as we need to skip releasing + * locks if that is the case. See ApplyUndoActions. + */ + SetUndoActionsInfo(); + + /* * Abort the current subtransaction, if needed. */ if (s->blockState == TBLOCK_SUBINPROGRESS) AbortSubTransaction(); + /* Execute undo actions */ + ApplyUndoActions(); + /* And clean it up, too */ CleanupSubTransaction(); @@ -4518,6 +4916,14 @@ AbortOutOfAnyTransaction(void) AtAbort_Memory(); /* + * Here, we just detect whether there are any pending undo actions so that + * we can skip releasing the locks during abort transaction. We don't + * release the locks till we execute undo actions otherwise, there is a + * risk of deadlock. + */ + SetUndoActionsInfo(); + + /* * Get out of any transaction or nested transaction */ do @@ -4537,7 +4943,11 @@ AbortOutOfAnyTransaction(void) * incompletely started transaction. First, adjust the * low-level state to suppress warning message from * AbortTransaction. + * + * In this state, we must not have performed any operation + * which can generate undo. 
*/ + Assert(!s->performUndoActions); if (s->state == TRANS_START) s->state = TRANS_INPROGRESS; AbortTransaction(); @@ -4554,6 +4964,19 @@ AbortOutOfAnyTransaction(void) case TBLOCK_PREPARE: /* In a transaction, so clean up */ AbortTransaction(); + ApplyUndoActions(); + CleanupTransaction(); + s->blockState = TBLOCK_DEFAULT; + break; + case TBLOCK_UNDO: + + /* + * We reach here when we got error while applying undo + * actions, so we don't want to again start applying it. Undo + * workers can take care of it. + */ + ResetUndoActionsInfo(); + AbortTransaction(); CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; break; @@ -4581,6 +5004,19 @@ AbortOutOfAnyTransaction(void) case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: AbortSubTransaction(); + ApplyUndoActions(); + CleanupSubTransaction(); + s = CurrentTransactionState; /* changed by pop */ + break; + case TBLOCK_SUBUNDO: + + /* + * We reach here when we got error while applying undo + * actions, so we don't want to again start applying it. Undo + * workers can take care of it. 
+ */ + ResetUndoActionsInfo(); + AbortSubTransaction(); CleanupSubTransaction(); s = CurrentTransactionState; /* changed by pop */ break; @@ -4674,6 +5110,8 @@ TransactionBlockStatusCode(void) case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_UNDO: + case TBLOCK_SUBUNDO: return 'E'; /* in failed transaction */ } @@ -4713,6 +5151,7 @@ static void StartSubTransaction(void) { TransactionState s = CurrentTransactionState; + int i; if (s->state != TRANS_DEFAULT) elog(WARNING, "StartSubTransaction while in %s state", @@ -4730,6 +5169,15 @@ StartSubTransaction(void) AtSubStart_Notify(); AfterTriggerBeginSubXact(); + /* initialize undo record locations for the transaction */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + s->start_urec_ptr[i] = InvalidUndoRecPtr; + s->latest_urec_ptr[i] = InvalidUndoRecPtr; + } + s->performUndoActions = false; + + s->subXactLock = false; s->state = TRANS_INPROGRESS; /* @@ -4924,7 +5372,8 @@ AbortSubTransaction(void) */ ShowTransactionState("AbortSubTransaction"); - if (s->state != TRANS_INPROGRESS) + if (s->state != TRANS_INPROGRESS && + s->state != TRANS_UNDO) elog(WARNING, "AbortSubTransaction while in %s state", TransStateAsString(s->state)); @@ -5352,6 +5801,8 @@ BlockStateAsString(TBlockState blockState) return "ABORT_PENDING"; case TBLOCK_PREPARE: return "PREPARE"; + case TBLOCK_UNDO: + return "UNDO"; case TBLOCK_SUBBEGIN: return "SUBBEGIN"; case TBLOCK_SUBINPROGRESS: @@ -5370,6 +5821,8 @@ BlockStateAsString(TBlockState blockState) return "SUBRESTART"; case TBLOCK_SUBABORT_RESTART: return "SUBABORT_RESTART"; + case TBLOCK_SUBUNDO: + return "SUBUNDO"; } return "UNRECOGNIZED"; } @@ -5395,6 +5848,8 @@ TransStateAsString(TransState state) return "ABORT"; case TRANS_PREPARE: return "PREPARE"; + case TRANS_UNDO: + return "UNDO"; } return "UNRECOGNIZED"; } @@ -5994,3 +6449,52 @@ xact_redo(XLogReaderState *record) else elog(PANIC, "xact_redo: unknown op code %u", info); } + +/* + * 
SetUndoActionsInfo - set the start and end undo record pointers before + * performing the undo actions. + */ +void +SetUndoActionsInfo(void) +{ + TransactionState s = CurrentTransactionState; + int i; + + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (s->latest_urec_ptr[i]) + { + s->performUndoActions = true; + break; + } + } +} + +/* + * ResetUndoActionsInfo - reset the start and end undo record pointers. + */ +void +ResetUndoActionsInfo(void) +{ + TransactionState s = CurrentTransactionState; + int i; + + s->performUndoActions = false; + for (i = 0; i < UndoPersistenceLevels; i++) + { + s->start_urec_ptr[i] = InvalidUndoRecPtr; + s->latest_urec_ptr[i] = InvalidUndoRecPtr; + } +} + +/* + * CanPerformUndoActions - Returns true, if the current transaction can + * perform undo actions, false otherwise. + */ +bool +CanPerformUndoActions(void) +{ + TransactionState s = CurrentTransactionState; + + return s->performUndoActions; +} diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 28fdaaf..007911c 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -5162,6 +5162,7 @@ BootStrapXLOG(void) checkPoint.newestCommitTsXid = InvalidTransactionId; checkPoint.time = (pg_time_t) time(NULL); checkPoint.oldestActiveXid = InvalidTransactionId; + checkPoint.oldestXidWithEpochHavingUndo = InvalidTransactionId; ShmemVariableCache->nextFullXid = checkPoint.nextFullXid; ShmemVariableCache->nextOid = checkPoint.nextOid; @@ -6613,6 +6614,9 @@ StartupXLOG(void) (errmsg_internal("commit timestamp Xid oldest/newest: %u/%u", checkPoint.oldestCommitTsXid, checkPoint.newestCommitTsXid))); + ereport(DEBUG1, + (errmsg_internal("oldest xid with epoch having undo: " UINT64_FORMAT, + checkPoint.oldestXidWithEpochHavingUndo))); if (!TransactionIdIsNormal(XidFromFullTransactionId(checkPoint.nextFullXid))) ereport(PANIC, (errmsg("invalid next transaction ID"))); @@ -6629,6 +6633,10 @@ StartupXLOG(void) 
checkPoint.newestCommitTsXid); XLogCtl->ckptFullXid = checkPoint.nextFullXid; + /* Read oldest xid having undo from checkpoint and set in proc global. */ + pg_atomic_write_u64(&ProcGlobal->oldestXidWithEpochHavingUndo, + checkPoint.oldestXidWithEpochHavingUndo); + /* * Initialize replication slots, before there's a chance to remove * required resources. @@ -7317,7 +7325,13 @@ StartupXLOG(void) * end-of-recovery steps fail. */ if (InRecovery) + { ResetUnloggedRelations(UNLOGGED_RELATION_INIT); + ResetUndoLogs(UNDO_UNLOGGED); + } + + /* Always reset temporary undo logs. */ + ResetUndoLogs(UNDO_TEMP); /* * We don't need the latch anymore. It's not strictly necessary to disown @@ -8713,6 +8727,10 @@ CreateCheckPoint(int flags) if (!shutdown) checkPoint.nextOid += ShmemVariableCache->oidCount; LWLockRelease(OidGenLock); + + checkPoint.oldestXidWithEpochHavingUndo = + pg_atomic_read_u64(&ProcGlobal->oldestXidWithEpochHavingUndo); + MultiXactGetCheckptMulti(shutdown, &checkPoint.nextMulti, @@ -9625,6 +9643,9 @@ xlog_redo(XLogReaderState *record) MultiXactAdvanceOldest(checkPoint.oldestMulti, checkPoint.oldestMultiDB); + + pg_atomic_write_u64(&ProcGlobal->oldestXidWithEpochHavingUndo, + checkPoint.oldestXidWithEpochHavingUndo); /* * No need to set oldestClogXid here as well; it'll be set when we @@ -9683,12 +9704,17 @@ xlog_redo(XLogReaderState *record) /* ControlFile->checkPointCopy always tracks the latest ckpt XID */ ControlFile->checkPointCopy.nextFullXid = checkPoint.nextFullXid; + ControlFile->checkPointCopy.oldestXidWithEpochHavingUndo = + checkPoint.oldestXidWithEpochHavingUndo; /* Update shared-memory copy of checkpoint XID/epoch */ SpinLockAcquire(&XLogCtl->info_lck); XLogCtl->ckptFullXid = checkPoint.nextFullXid; SpinLockRelease(&XLogCtl->info_lck); + ControlFile->checkPointCopy.oldestXidWithEpochHavingUndo = + checkPoint.oldestXidWithEpochHavingUndo; + /* * We should've already switched to the new TLI before replaying this * record. 
@@ -9727,6 +9753,9 @@ xlog_redo(XLogReaderState *record) /* Handle multixact */ MultiXactAdvanceNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset); + + pg_atomic_write_u64(&ProcGlobal->oldestXidWithEpochHavingUndo, + checkPoint.oldestXidWithEpochHavingUndo); /* * NB: This may perform multixact truncation when replaying WAL diff --git a/src/backend/access/undo/Makefile b/src/backend/access/undo/Makefile index f41e8f7..6e4c890 100644 --- a/src/backend/access/undo/Makefile +++ b/src/backend/access/undo/Makefile @@ -12,6 +12,7 @@ subdir = src/backend/access/undo top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = undoinsert.o undolog.o undorecord.o +OBJS = discardworker.o undoaction.o undoactionxlog.o undodiscard.o undoinsert.o \ + undolog.o undorecord.o undorequest.o undoworker.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/undo/README.UndoProcessing b/src/backend/access/undo/README.UndoProcessing new file mode 100644 index 0000000..a0e0f96 --- /dev/null +++ b/src/backend/access/undo/README.UndoProcessing @@ -0,0 +1,96 @@ +src/backend/access/undo/README.UndoProcessing + +Transaction Rollbacks and Undo Processing +------------------------------------------ +We always perform rollback actions after cleaning up the current +(sub)transaction. This will ensure that we perform the actions immediately +after error rather than when user issues Rollback command at some later point +of time. We are releasing the locks after the undo actions are applied. The +reason to delay lock release is that if we release locks before applying undo +actions, then the parallel session can acquire the lock before us which can +lead to deadlock. To execute undo actions during abort, we bring the +transaction to a clean state by releasing the required resources and put it in +a new state TRANS_UNDO which indicates that undo apply is in progress. 
This +state is considered a valid state, which means that it is safe to initiate +database access, acquire heavyweight locks, etc. in this state. We have also +introduced new block states TBLOCK_UNDO and TBLOCK_SUBUNDO, so that if we get +an error while applying undo, we don't start applying it again but instead +just perform Abort/Cleanup of the transaction. + +We promote the error to a FATAL error if it occurs while applying undo for a +subtransaction. The reason we can't proceed without applying a subtransaction's +undo is that its modifications must not be visible even if +the main transaction commits. Normally, the backend that receives the request +to perform Rollback (To Savepoint) applies the undo actions, but there are +cases where it is preferable to push the requests to background workers. The +main reasons to push the requests to background workers are: (a) the request is for +a very large rollback; deferring it allows us to return control to users quickly. +The GUC rollback_overflow_size specifies that rollbacks larger +than the configured size are performed lazily by background workers. (b) If an error +occurs while applying the undo actions, we push such a request to +background workers. + +We do have some restrictions on which requests can be pushed to the background +workers. In single-user mode, all requests are performed in the foreground. +We can't push the undo actions for temporary tables to background workers, as temporary +tables are only accessible in the backend that created them. We can't +postpone applying undo actions for subtransactions, as the modifications +made by an aborted subtransaction must not be visible even if the main transaction +commits. + +Undo Requests and Undo workers +------------------------------- +To improve the efficiency of rollbacks, we create three queues and a hash +table for the rollback requests.
An XID-based priority queue allows +us to process the requests of older transactions first and helps us move +oldestXidHavingUndo (the xid horizon below which all transactions are +visible) forward. A size-based queue helps us perform the rollbacks +of larger aborts in a timely fashion, so that we don't get stuck while processing +them during discard of the logs. An error queue holds the requests for +transactions that failed to apply their undo. The rollback hash table is used to +avoid duplicate undo requests by backends and the discard worker. The table must be +able to accommodate all active undo requests. An undo request must appear in +both the xid and size request queues or in neither. Currently, we process the requests +from these queues in a round-robin fashion to give equal priority to all three +types of requests. + +Note that if the request queues are full, we put backpressure on backends +to complete the requests by themselves. The exception is when the +error queue becomes full: we just remove the request from the hash table and +continue to process other requests, if any. The discard worker will find this +errored transaction at a later point in time and add it to the request +queues again. + +To process a request, we get it from one of the queues, look it up in the +hash table, mark it as in-progress, and then remove it from the respective queue. +After that, we perform the request, which means applying the undo actions and +removing it from the hash table. + +The undo launcher is responsible for launching workers if and only if there is some work +available in one of the work queues and more workers are available. A +worker is launched to handle requests for a particular database. Each undo +worker then starts reading, from one of the queues, the requests for that +particular database.
A worker peeks into each queue for requests from +a particular database if it would otherwise need to switch to another database less than +undo_worker_quantum ms (10s by default) after starting. Also, if there is no +work, it lingers for UNDO_WORKER_LINGER_MS (10s by default). This avoids +restarting the workers too frequently. + +Discard Worker +--------------- +The discard worker is responsible for discarding the undo log of transactions +that are committed and all-visible or are rolled back. It also registers the +requests for aborted transactions in the work queues. It iterates through all +the active logs one by one and tries to discard the transactions that are old +enough to matter. + +For transactions that span multiple logs, the undo of committed and +all-visible transactions is discarded separately for each log. This is +possible because transactions that span logs have a separate transaction +header in each log. For aborted transactions, we try to process the actions +of the entire transaction in one shot, as we need to perform the actions +from the end location back to the start location. However, it is possible that the +later portion of a transaction that overflowed into a separate log is +processed separately if we encounter the corresponding log first. If needed, +we could combine the logs for processing in that case as well, but there is no +clear advantage in doing so. diff --git a/src/backend/access/undo/discardworker.c b/src/backend/access/undo/discardworker.c new file mode 100644 index 0000000..4476769 --- /dev/null +++ b/src/backend/access/undo/discardworker.c @@ -0,0 +1,196 @@ +/*------------------------------------------------------------------------- + * + * discardworker.c + * The undo discard worker for asynchronous undo management.
+ * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/undo/discardworker.c + * + * The main responsibility of the discard worker is to discard the undo log + * of transactions that are committed and all-visible or are rolled back. It + * also registers the requests for aborted transactions in the work queues. + * To know more about work queues, see undorequest.c. It iterates through all + * the active logs one by one and tries to discard the transactions that are old + * enough to matter. + * + * For transactions that span multiple logs, the undo of committed and + * all-visible transactions is discarded separately for each log. This is + * possible because transactions that span logs have a separate transaction + * header in each log. For aborted transactions, we try to process the actions + * of the entire transaction in one shot, as we need to perform the actions + * from the end location back to the start location. However, it is possible that + * the later portion of a transaction that overflowed into a separate log is + * processed separately if we encounter the corresponding log first. If needed, + * we could combine the logs for processing in that case as well, but there is no + * clear advantage in doing so.
+ *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include + +#include "access/undodiscard.h" +#include "access/discardworker.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/lwlock.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/shmem.h" +#include "tcop/tcopprot.h" +#include "utils/guc.h" +#include "utils/resowner.h" + +static void undoworker_sigterm_handler(SIGNAL_ARGS); + +/* max sleep time between cycles (100 milliseconds) */ +#define MIN_NAPTIME_PER_CYCLE 100L +#define DELAYED_NAPTIME 10 * MIN_NAPTIME_PER_CYCLE +#define MAX_NAPTIME_PER_CYCLE 100 * MIN_NAPTIME_PER_CYCLE + +static bool got_SIGTERM = false; +static bool hibernate = false; +static long wait_time = MIN_NAPTIME_PER_CYCLE; +static bool am_discard_worker = false; + +/* SIGTERM: set flag to exit at next convenient time */ +static void +undoworker_sigterm_handler(SIGNAL_ARGS) +{ + got_SIGTERM = true; + + /* Waken anything waiting on the process latch */ + SetLatch(MyLatch); +} + +/* + * DiscardWorkerRegister -- Register a undo discard worker. + */ +void +DiscardWorkerRegister(void) +{ + BackgroundWorker bgw; + + /* TODO: This should be configurable. */ + + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + snprintf(bgw.bgw_name, BGW_MAXLEN, "discard worker"); + sprintf(bgw.bgw_library_name, "postgres"); + sprintf(bgw.bgw_function_name, "DiscardWorkerMain"); + bgw.bgw_restart_time = 5; + bgw.bgw_notify_pid = 0; + bgw.bgw_main_arg = (Datum) 0; + + RegisterBackgroundWorker(&bgw); +} + +/* + * DiscardWorkerMain -- Main loop for the undo discard worker. + */ +void +DiscardWorkerMain(Datum main_arg) +{ + ereport(LOG, + (errmsg("discard worker started"))); + + /* Establish signal handlers. 
*/ + pqsignal(SIGTERM, undoworker_sigterm_handler); + BackgroundWorkerUnblockSignals(); + + am_discard_worker = true; + + /* Make it easy to identify our processes. */ + SetConfigOption("application_name", MyBgworkerEntry->bgw_name, + PGC_USERSET, PGC_S_SESSION); + + /* Establish connection to nailed catalogs. */ + BackgroundWorkerInitializeConnection(NULL, NULL, 0); + + /* Enter main loop */ + while (!got_SIGTERM) + { + int rc; + + TransactionId OldestXmin, + oldestXidHavingUndo; + + /* + * It is okay to ignore vacuum transaction here, as we can discard the + * undo of the vacuuming transaction if the transaction is committed. + * We don't need to hold its undo for the visibility purpose. + */ + OldestXmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_AUTOVACUUM | + PROCARRAY_FLAGS_VACUUM); + + oldestXidHavingUndo = GetXidFromEpochXid( + pg_atomic_read_u64(&ProcGlobal->oldestXidWithEpochHavingUndo)); + + /* + * Call the discard routine if there oldestXidHavingUndo is lagging + * behind OldestXmin. + */ + if (OldestXmin != InvalidTransactionId && + TransactionIdPrecedes(oldestXidHavingUndo, OldestXmin)) + { + UndoDiscard(OldestXmin, &hibernate); + + /* + * If we got some undo logs to discard or discarded something, + * then reset the wait_time as we have got work to do. Note that + * if there are some undologs that cannot be discarded, then above + * condition will remain unsatisfied till oldestXmin remains + * unchanged and the wait_time will not reset in that case. + */ + if (!hibernate) + wait_time = MIN_NAPTIME_PER_CYCLE; + } + + /* Wait for more work. */ + rc = WaitLatch(&MyProc->procLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + wait_time, + WAIT_EVENT_UNDO_DISCARD_WORKER_MAIN); + + ResetLatch(&MyProc->procLatch); + + /* + * Increase the wait_time based on the length of inactivity. If + * wait_time is within one second, then increment it by 100 ms at a + * time. Henceforth, increment it one second at a time, till it + * reaches ten seconds. 
Never increase the wait_time more than ten + * seconds, it will be too much of waiting otherwise. + */ + if (rc & WL_TIMEOUT && hibernate) + { + wait_time += (wait_time < DELAYED_NAPTIME ? + MIN_NAPTIME_PER_CYCLE : DELAYED_NAPTIME); + if (wait_time > MAX_NAPTIME_PER_CYCLE) + wait_time = MAX_NAPTIME_PER_CYCLE; + } + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + } + + /* we're done */ + ereport(LOG, + (errmsg("discard worker shutting down"))); + + proc_exit(0); +} + +bool +IsDiscardProcess(void) +{ + return am_discard_worker; +} diff --git a/src/backend/access/undo/undoaction.c b/src/backend/access/undo/undoaction.c new file mode 100644 index 0000000..31ee8e8 --- /dev/null +++ b/src/backend/access/undo/undoaction.c @@ -0,0 +1,303 @@ +/*------------------------------------------------------------------------- + * + * undoaction.c + * execute undo actions + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/access/undo/undoaction.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/table.h" +#include "access/undoaction_xlog.h" +#include "access/undolog.h" +#include "access/undorequest.h" +#include "access/xact.h" +#include "access/xloginsert.h" +#include "access/xlog_internal.h" +#include "nodes/pg_list.h" +#include "pgstat.h" +#include "storage/block.h" +#include "storage/buf.h" +#include "storage/bufmgr.h" +#include "utils/relfilenodemap.h" +#include "utils/syscache.h" +#include "miscadmin.h" +#include "storage/shmem.h" +#include "access/undodiscard.h" + +/* + * undo_record_comparator + * + * qsort comparator to handle undo record for applying undo actions of the + * transaction. 
+ */ +static int +undo_record_comparator(const void *left, const void *right) +{ + UnpackedUndoRecord *luur = ((UndoRecInfo *) left)->uur; + UnpackedUndoRecord *ruur = ((UndoRecInfo *) right)->uur; + + if (luur->uur_reloid < ruur->uur_reloid) + return -1; + else if (luur->uur_reloid > ruur->uur_reloid) + return 1; + else if (luur->uur_block == ruur->uur_block) + { + /* + * If records are for the same block then maintain their existing + * order by comparing their index in the array. Because for single + * block we need to maintain the order for applying undo action. + */ + if (((UndoRecInfo *) left)->index < ((UndoRecInfo *) right)->index) + return -1; + else + return 1; + } + else if (luur->uur_block < ruur->uur_block) + return -1; + else + return 1; +} + +/* + * execute_undo_actions - Execute the undo actions + * + * xid - Transaction id that is getting rolled back. + * from_urecptr - undo record pointer from where to start applying undo action. + * to_urecptr - undo record pointer upto which point apply undo action. + * nopartial - true if rollback is for complete transaction. + */ +void +execute_undo_actions(FullTransactionId full_xid, UndoRecPtr from_urecptr, + UndoRecPtr to_urecptr, bool nopartial) +{ + UnpackedUndoRecord *uur = NULL; + UndoRecInfo *urp_array; + UndoRecPtr urec_ptr; + ForkNumber prev_fork = InvalidForkNumber; + BlockNumber prev_block = InvalidBlockNumber; + int undo_apply_size = maintenance_work_mem * 1024L; + TransactionId xid = XidFromFullTransactionId(full_xid); + + + /* 'from' and 'to' pointers must be valid. */ + Assert(from_urecptr != InvalidUndoRecPtr); + Assert(to_urecptr != InvalidUndoRecPtr); + + urec_ptr = from_urecptr; + + if (nopartial) + { + /* + * It is important here to fetch the latest undo record and validate if + * the actions are already executed. The reason is that it is possible + * that discard worker or backend might try to execute the rollback + * request which is already executed. 
For ex., after discard worker + * fetches the record and found that this transaction need to be + * rolledback, backend might concurrently execute the actions and + * remove the request from rollback hash table. The similar problem + * can happen if the discard worker first pushes the request, the undo + * worker processed it and backend tries to process it some later point. + */ + uur = UndoFetchRecord(to_urecptr, InvalidBlockNumber, InvalidOffsetNumber, + InvalidTransactionId, NULL, NULL); + + /* already processed. */ + if (uur == NULL) + return; + + /* + * We don't need to execute the undo actions if they are already + * executed. + */ + if (uur->uur_progress != 0) + return; + + Assert(xid == uur->uur_xid); + + UndoRecordRelease(uur); + uur = NULL; + } + + /* + * Fetch the multiple undo records which can fit into uur_segment; sort + * them in order of reloid and block number then apply them together + * page-wise. Repeat this until we get invalid undo record pointer. + */ + do + { + Oid prev_reloid = InvalidOid; + bool blk_chain_complete; + int i; + int nrecords; + int last_index = 0; + int prefetch_pages = 0; + + /* + * If urec_ptr is not valid means we have complete all undo actions + * for this transaction, otherwise we need to fetch the next batch of + * the undo records. + */ + if (!UndoRecPtrIsValid(urec_ptr)) + break; + + /* + * Fetch multiple undo record in bulk. This will return the array of + * undo record which will holds undo record pointers and the pointers + * to the actual unpacked undo record. This will also update the + * number of undo records it has copied in the urp_array. Also, for + * prefetching the target block ahead of applying undo actions it will + * update undo_blkinfo which will contains the information of the data + * blocks for which undo actions are going to applied for this undo + * record batch. 
+ */ + urp_array = UndoRecordBulkFetch(&urec_ptr, to_urecptr, undo_apply_size, + &nrecords, false); + if (nrecords == 0) + break; + + Assert(TransactionIdEquals(xid, urp_array[0].uur->uur_xid)); + + /* Sort the undo record array in order of target blocks. */ + qsort((void *) urp_array, nrecords, sizeof(UndoRecInfo), + undo_record_comparator); + + if (nopartial && !UndoRecPtrIsValid(urec_ptr)) + blk_chain_complete = true; + else + blk_chain_complete = false; + + /* + * Now we have urp_array which is sorted in the block order so + * traverse this array and apply the undo action block by block. + */ + for (i = last_index; i < nrecords; i++) + { + UnpackedUndoRecord *uur = urp_array[i].uur; + + /* + * If this undo is not for the same block then apply all undo + * actions for the previous block. + */ + if (OidIsValid(prev_reloid) && + (prev_reloid != uur->uur_reloid || + prev_fork != uur->uur_fork || + prev_block != uur->uur_block)) + { + execute_undo_actions_page(urp_array, last_index, i - 1, + prev_reloid, full_xid, prev_block, + blk_chain_complete); + last_index = i; + + /* We have consumed one prefetched page. */ + if (prefetch_pages > 0) + prefetch_pages--; + } + + prev_reloid = uur->uur_reloid; + prev_fork = uur->uur_fork; + prev_block = uur->uur_block; + } + + /* Apply the last set of the actions. */ + execute_undo_actions_page(urp_array, last_index, i - 1, + prev_reloid, full_xid, prev_block, + blk_chain_complete); + + /* Free all undo records. */ + for (i = 0; i < nrecords; i++) + UndoRecordRelease(urp_array[i].uur); + + /* + * Free urp array and undo_blkinfo array for the current batch of undo + * records. + */ + pfree(urp_array); + } while (true); + + /* + * Set undo action apply progress as completed in the transaction header + * if this is a main transaction. + */ + if (nopartial) + { + /* + * Prepare and update the progress of the undo action apply in the + * transaction header. 
+ */ + PrepareUpdateUndoActionProgress(NULL, to_urecptr, 1); + + START_CRIT_SECTION(); + + /* Update the progress in the transaction header. */ + UndoRecordUpdateTransInfo(0); + + /* WAL log the undo apply progress. */ + { + xl_undoapply_progress xlrec; + + xlrec.urec_ptr = to_urecptr; + xlrec.progress = 1; + + /* + * FIXME : We need to register undo buffers and set LSN for + * them that will be required for FPW of the undo buffers. + * This is currently not possible as undolog API only allows + * register buffer via backends and we can reach here via + * backend. + */ + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xlrec)); + + /* RegisterUndoLogBuffers(2); */ + (void) XLogInsert(RM_UNDOACTION_ID, XLOG_UNDO_APPLY_PROGRESS); + /* UndoLogBuffersSetLSN(recptr); */ + } + + END_CRIT_SECTION(); + UnlockReleaseUndoBuffers(); + + /* + * Undo action is applied so delete the hash table entry. + */ + Assert(TransactionIdIsValid(xid)); + RollbackHTRemoveEntry(full_xid, to_urecptr); + } +} + +/* + * execute_undo_actions_page - Execute the undo actions for a page + * + * urp_array - array of undo records (along with their location) for which undo + * action needs to be applied. + * first_idx - index in the urp_array of the first undo action to be applied + * last_idx - index in the urp_array of the last undo action to be applied + * (inclusive: the caller passes the index of the last record for the block). + * reloid - OID of relation on which undo actions need to be applied. + * full_xid - full transaction id of the transaction whose undo is applied. + * blkno - block number on which undo actions need to be applied. + * blk_chain_complete - indicates whether the undo chain for block is + * complete. + * + * All records in the range are expected to come from the same resource + * manager; the work is dispatched to that rmgr's rm_undo callback, looked up + * from the first record. + * + * Returns the result of rm_undo: true if the undo actions were applied + * successfully, otherwise false. + */ +bool +execute_undo_actions_page(UndoRecInfo * urp_array, int first_idx, int last_idx, + Oid reloid, FullTransactionId full_xid, BlockNumber blkno, + bool blk_chain_complete) +{ + /* + * All records passed to us are for the same RMGR, so we just use the + * first record to dispatch.
+ */ + Assert(urp_array != NULL); + + return RmgrTable[urp_array[0].uur->uur_rmid].rm_undo(urp_array, first_idx, + last_idx, reloid, + full_xid, blkno, + blk_chain_complete); +} diff --git a/src/backend/access/undo/undoactionxlog.c b/src/backend/access/undo/undoactionxlog.c new file mode 100644 index 0000000..087e737 --- /dev/null +++ b/src/backend/access/undo/undoactionxlog.c @@ -0,0 +1,49 @@ +/*------------------------------------------------------------------------- + * + * undoactionxlog.c + * WAL replay logic for undo actions. + * + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/undo/undoactionxlog.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/undoaction_xlog.h" +#include "access/undoinsert.h" +#include "access/xlog.h" +#include "access/xlogutils.h" + +/* + * Replay of undo apply progress: write the progress value carried by the + * WAL record into the transaction header of the undo record at + * xlrec->urec_ptr. This uses the same prepare / update / release sequence + * as the code that logged the record. + */ +static void +undo_xlog_apply_progress(XLogReaderState *record) +{ + xl_undoapply_progress *xlrec = (xl_undoapply_progress *) XLogRecGetData(record); + + /* Update the progress in the transaction header.
*/ + PrepareUpdateUndoActionProgress(record, xlrec->urec_ptr, xlrec->progress); + UndoRecordUpdateTransInfo(0); + UnlockReleaseUndoBuffers(); +} + +void +undoaction_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + switch (info) + { + case XLOG_UNDO_APPLY_PROGRESS: + undo_xlog_apply_progress(record); + break; + default: + elog(PANIC, "undoaction_redo: unknown op code %u", info); + } +} diff --git a/src/backend/access/undo/undodiscard.c b/src/backend/access/undo/undodiscard.c new file mode 100644 index 0000000..cf304ba --- /dev/null +++ b/src/backend/access/undo/undodiscard.c @@ -0,0 +1,348 @@ +/*------------------------------------------------------------------------- + * + * undodiscard.c + * discard undo records + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/access/undo/undodiscard.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xact.h" +#include "access/xlog.h" +#include "access/undolog.h" +#include "access/undodiscard.h" +#include "access/undorequest.h" +#include "catalog/pg_tablespace.h" +#include "miscadmin.h" +#include "storage/block.h" +#include "storage/buf.h" +#include "storage/bufmgr.h" +#include "storage/shmem.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "utils/resowner.h" + +/* + * Discard the undo for the given log + * + * Search the undo log, get the start record for each transaction until we get + * the transaction with xid >= xmin or an invalid xid. Then call undolog + * routine to discard up to that point and update the memory structure for the + * log slot. We set the hibernate flag if we do not have any undo data that + * can be discarded; this flag is passed to the discard worker wherein it + * determines if system is idle and it should sleep for some time.
+ * + * Return the oldest xid remaining in this undo log (which should be >= xmin, + * since we'll discard everything older). Return InvalidFullTransactionId if + * the undo log is empty. + */ +static FullTransactionId +UndoDiscardOneLog(UndoLogControl *log, TransactionId xmin, bool *hibernate) +{ + UndoRecPtr undo_recptr, next_insert; + UndoRecPtr next_urecptr = InvalidUndoRecPtr; + UnpackedUndoRecord *uur = NULL; + bool need_discard = false; + bool log_complete = false; + TransactionId undoxid = InvalidTransactionId; + TransactionId latest_discardxid = InvalidTransactionId; + uint32 epoch = 0; + + if (UndoRecPtrIsValid(log->oldest_data)) + undo_recptr = log->oldest_data; + else + undo_recptr = UndoLogGetFirstValidRecord(log, NULL); + + /* There might not be any undo log and hibernation might be needed. */ + *hibernate = true; + + StartTransactionCommand(); + + /* Loop until we run out of discardable transactions in the given log. */ + do + { + bool pending_abort = false; + + next_insert = UndoLogGetNextInsertPtr(log->logno, InvalidTransactionId); + + if (next_insert == undo_recptr) + { + /* + * The caller of this function must have ensured that there is + * something to discard. + */ + Assert(undo_recptr != log->oldest_data); + + /* Indicate that we have processed all the log. */ + log_complete = true; + } + else + { + /* Fetch the undo record for given undo_recptr. */ + uur = UndoFetchRecord(undo_recptr, InvalidBlockNumber, + InvalidOffsetNumber, InvalidTransactionId, + NULL, NULL); + + if (uur != NULL) + { + /* + * Add the aborted transaction to the rollback request queues. + * + * If the undo actions for the aborted transaction are already + * applied then continue discarding the undo log, otherwise, + * discard till current point and stop processing this undo + * log. + * + * We can ignore the abort for transactions whose + * corresponding database doesn't exist.
+ * + * XXX: We've added the transaction-in-progress check + * to avoid xids of in-progress autovacuum. Note that, + * while calculating xmin, we ignore the vacuum and + * autovacuum xids in DiscardWorkerMain. But, when a + * backend performs VACUUM, we forcefully clear the + * vacuum flag from MyPgXact in lazy_vacuum_zheap_rel. + * Hence, the problem arises only for autovacuum xids. + * We should fix this behaviour. Perhaps, discard worker + * should consider vacuum and autovacuum xid to calculate + * the xmin. But, in that case, a long-running autovacuum + * might block the discard worker from moving ahead. + */ + if (!TransactionIdDidCommit(uur->uur_xid) && + !TransactionIdIsInProgress(uur->uur_xid) && + TransactionIdPrecedes(uur->uur_xid, xmin) && + uur->uur_progress == 0 && + dbid_exists(uur->uur_dbid)) + { + FullTransactionId full_xid; + + full_xid = FullTransactionIdFromEpochAndXid(uur->uur_xidepoch, + uur->uur_xid); + (void) RegisterRollbackReq(InvalidUndoRecPtr, + undo_recptr, + uur->uur_dbid, + full_xid); + + pending_abort = true; + } + + next_urecptr = uur->uur_next; + undoxid = uur->uur_xid; + epoch = uur->uur_xidepoch; + + UndoRecordRelease(uur); + uur = NULL; + } + } + + /* + * We can discard up to this point when one of following conditions is + * met: (a) the next transaction is not all-visible. (b) there is no + * more log to process. (c) the transaction undo in current log is + * finished. (d) there is a pending abort. + */ + if ((TransactionIdIsValid(undoxid) && + TransactionIdFollowsOrEquals(undoxid, xmin)) || + next_urecptr == InvalidUndoRecPtr || + log_complete || + UndoRecPtrGetLogNo(next_urecptr) != log->logno || + pending_abort) + { + /* Hey, I got some undo log to discard, can not hibernate now. */ + *hibernate = false; + + /* + * If the transaction id is smaller than the xmin, it means this + * must be the last transaction in this undo log, so we need to + * get the last insert point in this undo log and discard till + * that point.
+ * + * Also, if the transaction has pending abort, stop discarding + * further. + */ + if (TransactionIdPrecedes(undoxid, xmin) && !pending_abort) + { + /* + * If more undo has been inserted since we last checked, then + * we can process that as well. next_insert is recomputed at + * the top of every loop iteration, so reusing it here (rather + * than shadowing it with a local) is safe. + */ + next_insert = UndoLogGetNextInsertPtr(log->logno, undoxid); + if (!UndoRecPtrIsValid(next_insert)) + continue; + + undo_recptr = next_insert; + need_discard = true; + epoch = 0; + latest_discardxid = undoxid; + undoxid = InvalidTransactionId; + } + + /* Update the shared memory state. */ + LWLockAcquire(&log->discard_lock, LW_EXCLUSIVE); + + /* + * If no more pending undo logs then set the oldest transaction to + * InvalidTransactionId. + */ + if (log_complete) + { + log->oldest_xid = InvalidTransactionId; + log->oldest_xidepoch = 0; + } + else + { + log->oldest_xid = undoxid; + log->oldest_xidepoch = epoch; + } + + log->oldest_data = undo_recptr; + + LWLockRelease(&log->discard_lock); + + if (need_discard) + { + LWLockAcquire(&log->discard_update_lock, LW_EXCLUSIVE); + UndoLogDiscard(undo_recptr, latest_discardxid); + LWLockRelease(&log->discard_update_lock); + } + + break; + } + + /* + * This transaction is smaller than the xmin so let's jump to the next + * transaction. + */ + undo_recptr = next_urecptr; + latest_discardxid = undoxid; + + Assert(uur == NULL); + + need_discard = true; + } while (true); + + CommitTransactionCommand(); + + return FullTransactionIdFromEpochAndXid(epoch, undoxid); +} + +/* + * Discard the undo for all the transactions whose xid is smaller than + * oldestXmin. + * + * *hibernate is set to true only if none of the processed logs had any undo + * that could be discarded (or no log was processed at all), so that the + * caller can decide whether it may sleep. + */ +void +UndoDiscard(TransactionId oldestXmin, bool *hibernate) +{ + FullTransactionId oldestXidHavingUndo = InvalidFullTransactionId; + UndoLogControl *log = NULL; + bool all_hibernate = true; + + /* + * Iterate through all the active logs and one-by-one try to discard the + * transactions that are old enough to matter.
+ * + * XXX Ideally we can arrange undo logs so that we can efficiently find + * those with oldest_xid < oldestXmin, but for now we'll just scan all of + * them. + */ + while ((log = UndoLogNext(log))) + { + FullTransactionId oldest_xid = InvalidFullTransactionId; + + /* + * If the log is already discarded, then we are done. It is important + * to first check this to ensure that tablespace containing this log + * doesn't get dropped concurrently. + */ + LWLockAcquire(&log->mutex, LW_SHARED); + if (log->meta.discard == log->meta.unlogged.insert) + { + LWLockRelease(&log->mutex); + continue; + } + LWLockRelease(&log->mutex); + + /* We can't process temporary undo logs. */ + if (log->meta.persistence == UNDO_TEMP) + continue; + + /* + * If the first xid of the undo log is smaller than the xmin then try + * to discard the undo log. + */ + if (!TransactionIdIsValid(log->oldest_xid) || + TransactionIdPrecedes(log->oldest_xid, oldestXmin)) + { + bool log_hibernate; + + /* Process the undo log. */ + oldest_xid = UndoDiscardOneLog(log, oldestXmin, &log_hibernate); + + /* + * UndoDiscardOneLog resets the flag on every call, so accumulate + * it here: if any log had something to discard, the caller must + * not hibernate. + */ + if (!log_hibernate) + all_hibernate = false; + } + else + { + /* + * Nothing is discardable in this log yet, but it still holds undo + * starting at its oldest xid, so that xid must also bound + * oldestXidHavingUndo. + */ + oldest_xid = FullTransactionIdFromEpochAndXid(log->oldest_xidepoch, + log->oldest_xid); + } + + /* If oldestXidHavingUndo is not yet initialized, initialize it. */ + if (!FullTransactionIdIsValid(oldestXidHavingUndo)) + oldestXidHavingUndo = oldest_xid; + else if (FullTransactionIdIsValid(oldest_xid) && + FullTransactionIdPrecedes(oldest_xid, oldestXidHavingUndo)) + oldestXidHavingUndo = oldest_xid; + } + + *hibernate = all_hibernate; + + /* + * Update the oldestXidWithEpochHavingUndo in the shared memory. + * + * XXX In future if multiple worker can perform discard then we may need + * to use compare and swap for updating the shared memory value. + */ + if (FullTransactionIdIsValid(oldestXidHavingUndo)) + pg_atomic_write_u64(&ProcGlobal->oldestXidWithEpochHavingUndo, + U64FromFullTransactionId(oldestXidHavingUndo)); +} + +/* + * Discard all the undo logs up to their current insert pointers. This is + * particularly required in single-user mode, where all the undo logs are + * discarded at commit time.
+ */ +void +UndoLogDiscardAll(void) +{ + UndoLogControl *log = NULL; + + Assert(!IsUnderPostmaster); + + while ((log = UndoLogNext(log))) + { + /* + * Process the undo log. No locks are required for discard, since + * this is called only in single-user mode. Similarly, no transaction + * id is required here because WAL-logging the xid up to which the undo + * is discarded is not required in single-user mode. + */ + UndoLogDiscard(MakeUndoRecPtr(log->logno, log->meta.unlogged.insert), + InvalidTransactionId); + } + +} + +/* + * Discard the given temporary undo log up to its current insert pointer. + */ +void +TempUndoDiscard(UndoLogNumber logno) +{ + UndoLogControl *log = UndoLogGet(logno, false); + + /* + * This must only be used for the undo logs of temporary tables. Note that + * the assertion below checks only the persistence level; it does not + * verify that there is anything left to discard. + */ + Assert (log->meta.persistence == UNDO_TEMP); + + /* Process the undo log. */ + UndoLogDiscard(MakeUndoRecPtr(log->logno, log->meta.unlogged.insert), + InvalidTransactionId); +} diff --git a/src/backend/access/undo/undoinsert.c b/src/backend/access/undo/undoinsert.c index 69f85e2..aa61372 100644 --- a/src/backend/access/undo/undoinsert.c +++ b/src/backend/access/undo/undoinsert.c @@ -173,7 +173,6 @@ static UnpackedUndoRecord *UndoGetOneRecord(UnpackedUndoRecord *urec, static void UndoRecordPrepareTransInfo(UndoRecPtr urecptr, UndoRecPtr xact_urp, XLogReaderState *xlog_record); -static void UndoRecordUpdateTransInfo(int idx); static int UndoGetBufferSlot(RelFileNode rnode, BlockNumber blk, ReadBufferMode rbm, UndoPersistence persistence, XLogReaderState *xlog_record); @@ -258,19 +257,18 @@ UndoRecordPrepareTransInfo(UndoRecPtr urecptr, UndoRecPtr xact_urp, return; /* - * Acquire the discard lock before accessing the undo record so that - * discard worker doesn't remove the record while we are in process of + * Acquire the discard_update_lock before accessing the undo record so + * that discard worker can't remove the record while we are in process of + * reading it.
+ */ - LWLockAcquire(&log->discard_lock, LW_SHARED); - - /* - * The absence of previous transaction's undo indicate that this backend - * is preparing its first undo in which case we have nothing to update. - * UndoRecordIsValid will release the lock if it returns false. - */ - if (!UndoRecordIsValid(log, xact_urp)) + LWLockAcquire(&log->discard_update_lock, LW_SHARED); + /* Check if it is already discarded. */ + if (UndoLogIsDiscarded(xact_urp)) + { + /* Release lock and return. */ + LWLockRelease(&log->discard_update_lock); return; + } UndoRecPtrAssignRelFileNode(rnode, xact_urp); cur_blk = UndoRecPtrGetBlockNum(xact_urp); @@ -305,7 +303,63 @@ UndoRecordPrepareTransInfo(UndoRecPtr urecptr, UndoRecPtr xact_urp, xact_urec_info[xact_urec_info_idx].urecptr = xact_urp; xact_urec_info_idx++; - LWLockRelease(&log->discard_lock); + LWLockRelease(&log->discard_update_lock); +} + +/* + * Prepare to update the undo-action apply progress stored in the transaction + * header of the undo record at urecptr. + * + * The record is decoded up to its transaction header into a new + * xact_urec_info slot, using buffers obtained through UndoGetBufferSlot, and + * the new progress value is stored in that slot. The caller then writes it + * out with UndoRecordUpdateTransInfo() inside a critical section and calls + * UnlockReleaseUndoBuffers() afterwards. xlog_record is passed through to + * UndoGetBufferSlot: execute_undo_actions passes NULL, redo passes the WAL + * record. Temporary undo logs are skipped: nothing is prepared for them. + * + * NOTE(review): callers invoke UndoRecordUpdateTransInfo(0) unconditionally, + * so confirm they can never reach here for a temporary undo log. Also, + * index is not bounds-checked against the size of idx_undo_buffers. + */ +void +PrepareUpdateUndoActionProgress(XLogReaderState *xlog_record, + UndoRecPtr urecptr, int progress) +{ + Buffer buffer = InvalidBuffer; + BlockNumber cur_blk; + RelFileNode rnode; + UndoLogNumber logno = UndoRecPtrGetLogNo(urecptr); + UndoLogControl *log; + Page page; + int starting_byte; + int bufidx; + int index = 0; + UnpackUndoContext ucontext = {0}; + + log = UndoLogGet(logno, false); + + if (log->meta.persistence == UNDO_TEMP) + return; + + UndoRecPtrAssignRelFileNode(rnode, urecptr); + cur_blk = UndoRecPtrGetBlockNum(urecptr); + starting_byte = UndoRecPtrGetPageOffset(urecptr); + + /* Initiate reading the undo record.
*/ + BeginUnpackUndo(&ucontext); + while (true) + { + bufidx = UndoGetBufferSlot(rnode, cur_blk, + RBM_NORMAL, + log->meta.persistence, + xlog_record); + + xact_urec_info[xact_urec_info_idx].idx_undo_buffers[index++] = bufidx; + buffer = undo_buffer[bufidx].buf; + page = BufferGetPage(buffer); + + UnpackUndoData(&ucontext, page, starting_byte); + + /* We only need to decode up to the transaction header, so stop after that. */ + if (ucontext.stage > UNDO_DECODE_STAGE_TRANSACTION) + break; + + starting_byte = UndoLogBlockHeaderSize; + cur_blk++; + } + FinishUnpackUndo(&ucontext, &xact_urec_info[xact_urec_info_idx].uur); + + xact_urec_info[xact_urec_info_idx].urecptr = urecptr; + xact_urec_info[xact_urec_info_idx].uur.uur_progress = progress; + xact_urec_info_idx++; } @@ -315,7 +369,7 @@ UndoRecordPrepareTransInfo(UndoRecPtr urecptr, UndoRecPtr xact_urp, * UndoRecordPrepareTransInfo. This must be called under the critical section. * This will just overwrite the undo header not the data. */ -static void +void UndoRecordUpdateTransInfo(int idx) { Page page = NULL; @@ -326,6 +380,12 @@ UndoRecordUpdateTransInfo(int idx) urec_ptr = xact_urec_info[idx].urecptr; /* + * We've pinned the undo buffers in UndoRecordPrepareTransInfo (or + * PrepareUpdateUndoActionProgress), so it shouldn't be discarded. + */ + Assert(!UndoLogIsDiscarded(urec_ptr)); + + /* * Update the next transactions start urecptr in the transaction header. */ starting_byte = UndoRecPtrGetPageOffset(urec_ptr); @@ -731,7 +791,7 @@ PrepareUndoInsert(UnpackedUndoRecord *urec, FullTransactionId fxid, * criticalsection; it should never fail. */ void -InsertPreparedUndo(void) +InsertPreparedUndo(UndoPersistence upersistence) { Page page = NULL; int starting_byte; @@ -825,6 +885,12 @@ InsertPreparedUndo(void) Assert(bufidx < MAX_BUFFER_PER_UNDO); } while (true); + /* + * Set the current undo location for a transaction. This is required + * to perform rollback during abort of transaction.
+ */ + SetCurrentUndoLocation(urp, upersistence); + /* Advance the insert pointer past this record. */ UndoLogAdvance(urp, size); } diff --git a/src/backend/access/undo/undolog.c b/src/backend/access/undo/undolog.c index 028b282..1b58f7d 100644 --- a/src/backend/access/undo/undolog.c +++ b/src/backend/access/undo/undolog.c @@ -185,6 +185,8 @@ UndoLogShmemInit(void) LWTRANCHE_UNDOLOG); LWLockInitialize(&shared->logs[i].discard_lock, LWTRANCHE_UNDODISCARD); + LWLockInitialize(&shared->logs[i].discard_update_lock, + LWTRANCHE_DISCARD_UPDATE); LWLockInitialize(&shared->logs[i].rewind_lock, LWTRANCHE_REWIND); } @@ -1230,7 +1232,8 @@ UndoLogGetFirstValidRecord(UndoLogControl *log, bool *full) result = InvalidUndoRecPtr; else result = MakeUndoRecPtr(log->logno, log->meta.discard); - *full = log->meta.status == UNDO_LOG_STATUS_FULL; + if (full) + *full = log->meta.status == UNDO_LOG_STATUS_FULL; LWLockRelease(&log->mutex); return result; @@ -1778,6 +1781,8 @@ get_undo_log(UndoLogNumber logno, bool locked) * If the calling backend is currently attached to the undo log, that is not * possible, because logs can only reach UNDO_LOG_STATUS_DISCARDED after first * reaching UNDO_LOG_STATUS_FULL, and that only happens while detaching. + * + * TODO: analyze the missing_ok cases. */ UndoLogControl * UndoLogGet(UndoLogNumber logno, bool missing_ok) diff --git a/src/backend/access/undo/undorequest.c b/src/backend/access/undo/undorequest.c new file mode 100644 index 0000000..5d27882 --- /dev/null +++ b/src/backend/access/undo/undorequest.c @@ -0,0 +1,1491 @@ +/*------------------------------------------------------------------------- + * + * undorequest.c + * This contains routines to register and fetch undo action requests.
+ * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/access/undo/undorequest.c + * + * To increase the efficiency of the rollbacks, we create three queues and + * a hash table for the rollback requests. An xid-based priority queue + * allows us to process the requests of older transactions first and helps + * us move oldestXidHavingUndo forward. A size-based queue helps us perform + * the rollbacks of larger aborts in a timely fashion, so that we don't get + * stuck processing them during discard of the logs. An error queue holds + * the requests for transactions that failed to apply their undo. The + * rollback hash table is used to avoid duplicate undo requests by backends + * and the discard worker. The table must be able to accommodate all active + * undo requests. The undo requests must appear in both the xid and size + * request queues or in neither. As of now, we process the requests from + * these queues in a round-robin fashion to give equal priority to all three + * types of requests. + * + * The rollback requests exceeding a certain threshold are pushed into both + * xid and size based queues. They are also registered in the hash table. + * + * To process a request, we get it from one of the queues, look it up in + * the hash table, mark it as in-progress and then remove it from the + * respective queue. Once we process all the actions, the request is removed + * from the hash table. If another worker finds the same request in the other + * queue, it can just ignore the request (and remove it from that queue) if + * the request is not found in the hash table or is marked as in-progress. + * + * Also note that, if the work queues are full, then we put backpressure on + * backends to complete the requests by themselves.
+ *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "miscadmin.h" + +#include "access/genam.h" +#include "access/heapam.h" +#include "access/transam.h" +#include "access/discardworker.h" +#include "access/undodiscard.h" +#include "access/undorequest.h" +#include "access/undoworker.h" +#include "access/xact.h" +#include "catalog/indexing.h" +#include "catalog/pg_database.h" +#include "lib/binaryheap.h" +#include "storage/shmem.h" +#include "storage/procarray.h" +#include "utils/fmgroids.h" +#include "access/xlog.h" + +#define ROLLBACK_REQUEST_QUEUE_SIZE 1024 +#define MAX_UNDO_WORK_QUEUES 3 +#define UNDO_PEEK_DEPTH 10 + +/* + * A rollback request queue: a binary heap whose nodes point into the element + * array referenced by q_choice (which member is used depends on the queue). + */ +typedef struct +{ + binaryheap *bh; + union + { + UndoXidQueue *xid_elems; + UndoSizeQueue *size_elems; + UndoErrorQueue *error_elems; + } q_choice; +} UndoWorkerQueue; + +/* This is the hash table to store all the rollback requests. */ +static HTAB *RollbackHT; +static UndoWorkerQueue UndoWorkerQueues[MAX_UNDO_WORK_QUEUES]; + +static uint32 cur_undo_queue = 0;
+ +/* + * Different operations for XID queue. + * + * The Init*Queue macro parameters are deliberately not named "bh": the + * preprocessor would otherwise also substitute the argument into the ".bh" + * member access, and the macro would only compile when the caller's variable + * happened to be called bh. + */ +#define InitXidQueue(e_bh, e_elems) \ +( \ + UndoWorkerQueues[XID_QUEUE].bh = (e_bh), \ + UndoWorkerQueues[XID_QUEUE].q_choice.xid_elems = (e_elems) \ +) + +#define XidQueueIsEmpty() \ + (binaryheap_empty(UndoWorkerQueues[XID_QUEUE].bh)) + +#define GetXidQueueSize() \ + (binaryheap_cur_size(UndoWorkerQueues[XID_QUEUE].bh)) + +#define GetXidQueueElem(elem) \ + (UndoWorkerQueues[XID_QUEUE].q_choice.xid_elems[(elem)]) + +#define GetXidQueueTopElem() \ +( \ + AssertMacro(!XidQueueIsEmpty()), \ + DatumGetPointer(binaryheap_first(UndoWorkerQueues[XID_QUEUE].bh)) \ +) + +#define GetXidQueueNthElem(n) \ +( \ + AssertMacro(!XidQueueIsEmpty()), \ + DatumGetPointer(binaryheap_nth(UndoWorkerQueues[XID_QUEUE].bh, (n))) \ +) + +#define SetXidQueueElem(elem, e_dbid, e_full_xid, e_start_urec_ptr) \ +( \ + GetXidQueueElem(elem).dbid = (e_dbid), \ + GetXidQueueElem(elem).full_xid = (e_full_xid), \ + GetXidQueueElem(elem).start_urec_ptr = (e_start_urec_ptr) \ +)
+ +/* Different operations for SIZE queue */ +#define InitSizeQueue(e_bh, e_elems) \ +( \ + UndoWorkerQueues[SIZE_QUEUE].bh = (e_bh), \ + UndoWorkerQueues[SIZE_QUEUE].q_choice.size_elems = (e_elems) \ +) + +#define SizeQueueIsEmpty() \ + (binaryheap_empty(UndoWorkerQueues[SIZE_QUEUE].bh)) + +#define GetSizeQueueSize() \ + (binaryheap_cur_size(UndoWorkerQueues[SIZE_QUEUE].bh)) + +#define GetSizeQueueElem(elem) \ + (UndoWorkerQueues[SIZE_QUEUE].q_choice.size_elems[(elem)]) + +#define GetSizeQueueTopElem() \ +( \ + AssertMacro(!SizeQueueIsEmpty()), \ + DatumGetPointer(binaryheap_first(UndoWorkerQueues[SIZE_QUEUE].bh)) \ +) + +#define GetSizeQueueNthElem(n) \ +( \ + AssertMacro(!SizeQueueIsEmpty()), \ + DatumGetPointer(binaryheap_nth(UndoWorkerQueues[SIZE_QUEUE].bh, (n))) \ +) + +#define SetSizeQueueElem(elem, e_dbid, e_full_xid, e_size, e_start_urec_ptr) \ +( \ + GetSizeQueueElem(elem).dbid = (e_dbid), \ + GetSizeQueueElem(elem).full_xid = (e_full_xid), \ + GetSizeQueueElem(elem).request_size = (e_size), \ + GetSizeQueueElem(elem).start_urec_ptr = (e_start_urec_ptr) \ +)
+ +/* Different operations for Error queue */ +#define InitErrorQueue(e_bh, e_elems) \ +( \ + UndoWorkerQueues[ERROR_QUEUE].bh = (e_bh), \ + UndoWorkerQueues[ERROR_QUEUE].q_choice.error_elems = (e_elems) \ +) + +#define ErrorQueueIsEmpty() \ + (binaryheap_empty(UndoWorkerQueues[ERROR_QUEUE].bh)) + +#define GetErrorQueueSize() \ + (binaryheap_cur_size(UndoWorkerQueues[ERROR_QUEUE].bh)) + +#define GetErrorQueueElem(elem) \ + (UndoWorkerQueues[ERROR_QUEUE].q_choice.error_elems[(elem)]) + +#define GetErrorQueueTopElem() \ +( \ + AssertMacro(!ErrorQueueIsEmpty()), \ + DatumGetPointer(binaryheap_first(UndoWorkerQueues[ERROR_QUEUE].bh)) \ +) + +#define GetErrorQueueNthElem(n) \ +( \ + AssertMacro(!ErrorQueueIsEmpty()), \ + DatumGetPointer(binaryheap_nth(UndoWorkerQueues[ERROR_QUEUE].bh, (n))) \ +) + +/* + * Note: this does not set start_urec_ptr of the error queue element; callers + * must fill that field separately. + */ +#define SetErrorQueueElem(elem, e_dbid, e_full_xid, e_occurred_at) \ +( \ + GetErrorQueueElem(elem).dbid = (e_dbid), \ + GetErrorQueueElem(elem).full_xid = (e_full_xid), \ + GetErrorQueueElem(elem).err_occurred_at = (e_occurred_at) \ +)
+ +/* + * Binary heap comparison function to compare the age of transactions. + * + * Returns a positive value when a's full_xid precedes b's. binaryheap keeps + * the element that compares greatest at the top, so the oldest transaction + * is served first. + */ +static int +undo_age_comparator(Datum a, Datum b, void *arg) +{ + UndoXidQueue *xidQueueElem1 = (UndoXidQueue *) DatumGetPointer(a); + UndoXidQueue *xidQueueElem2 = (UndoXidQueue *) DatumGetPointer(b); + + if (FullTransactionIdPrecedes(xidQueueElem1->full_xid, + xidQueueElem2->full_xid)) + return 1; + else if (FullTransactionIdEquals(xidQueueElem1->full_xid, + xidQueueElem2->full_xid)) + return 0; + return -1; +} + +/* + * Binary heap comparison function to compare the size of transactions. + * + * Returns a positive value when a's request_size is larger, so the largest + * request is at the top of the heap. + */ +static int +undo_size_comparator(Datum a, Datum b, void *arg) +{ + UndoSizeQueue *sizeQueueElem1 = (UndoSizeQueue *) DatumGetPointer(a); + UndoSizeQueue *sizeQueueElem2 = (UndoSizeQueue *) DatumGetPointer(b); + + if (sizeQueueElem1->request_size > sizeQueueElem2->request_size) + return 1; + else if (sizeQueueElem1->request_size == sizeQueueElem2->request_size) + return 0; + return -1; +} + +/* + * Binary heap comparison function to compare the time at which an error + * occurred for transactions.
+ * + * Returns a positive value when a's error occurred earlier, so the request + * that failed first is at the top of the heap. + */ +static int +undo_err_time_comparator(Datum a, Datum b, void *arg) +{ + UndoErrorQueue *errQueueElem1 = (UndoErrorQueue *) DatumGetPointer(a); + UndoErrorQueue *errQueueElem2 = (UndoErrorQueue *) DatumGetPointer(b); + + if (errQueueElem1->err_occurred_at < errQueueElem2->err_occurred_at) + return 1; + else if (errQueueElem1->err_occurred_at == errQueueElem2->err_occurred_at) + return 0; + return -1; +} + +/* Bytes needed for the xid based request queue's element array. */ +static Size +UndoXidQueueElemsShmSize(void) +{ + return mul_size(ROLLBACK_REQUEST_QUEUE_SIZE, sizeof(UndoXidQueue)); +} + +/* Bytes needed for the size based request queue's element array. */ +static Size +UndoSizeQueueElemsShmSize(void) +{ + return mul_size(ROLLBACK_REQUEST_QUEUE_SIZE, sizeof(UndoSizeQueue)); +} + +/* Bytes needed for the error queue's element array. */ +static Size +UndoErrorQueueElemsShmSize(void) +{ + return mul_size(ROLLBACK_REQUEST_QUEUE_SIZE, sizeof(UndoErrorQueue)); +} + +/* Number of entries the rollback hash table must be able to hold. */ +static int +UndoRollbackHashTableSize(void) +{ + /* + * The rollback hash table is used to avoid duplicate undo requests by + * backends and discard worker. The table must be able to accommodate all + * active undo requests. The undo requests must appear in both xid and + * size requests queues or neither. In same transaction, there can be two + * requests one for logged relations and another for unlogged relations. + * So, the rollback hash table size should be equal to two request queues, + * an error queue (currently this is same as request queue) and max + * backends. This will ensure that it won't get filled. + */ + return ((2 * ROLLBACK_REQUEST_QUEUE_SIZE) + ROLLBACK_REQUEST_QUEUE_SIZE + + MaxBackends); +} + +/* Get the first free element of xid based request array. */ +static int +UndoXidQueueGetFreeElem(void) +{ + int i; + + for (i = 0; i < ROLLBACK_REQUEST_QUEUE_SIZE; i++) + { + if (FullTransactionIdEquals(GetXidQueueElem(i).full_xid, + InvalidFullTransactionId)) + return i; + } + + /* we should never call this function when the request queue is full. */ + Assert(false); + + /* silence compiler. */ + return -1; +}
+ +/* Push an element in the xid based request queue */ +static void +PushXidQueueElem(UndoRequestInfo * urinfo) +{ + int elem = UndoXidQueueGetFreeElem(); + + SetXidQueueElem(elem, urinfo->dbid, urinfo->full_xid, urinfo->start_urec_ptr); + + binaryheap_add(UndoWorkerQueues[XID_QUEUE].bh, + PointerGetDatum(&GetXidQueueElem(elem))); +} + +/* Pop nth element from the xid based request queue */ +static UndoXidQueue * +PopXidQueueNthElem(int n) +{ + Datum elem; + + Assert(!XidQueueIsEmpty()); + elem = binaryheap_remove_nth(UndoWorkerQueues[XID_QUEUE].bh, n); + + return (UndoXidQueue *) (DatumGetPointer(elem)); +} + +/* Get the first free element of size based request array. */ +static inline int +UndoSizeQueueGetFreeElem(void) +{ + int i; + + for (i = 0; i < ROLLBACK_REQUEST_QUEUE_SIZE; i++) + { + if (FullTransactionIdEquals(GetSizeQueueElem(i).full_xid, + InvalidFullTransactionId)) + return i; + } + + /* we should never call this function when the request queue is full. */ + Assert(false); + + /* silence compiler. */ + return -1; +} + +/* + * Traverse the xid queue and remove dangling entries, if any. A queue entry + * is considered dangling if the hash table doesn't contain the corresponding + * entry or that entry is already marked in-progress. The caller must hold + * RollbackRequestLock in exclusive mode. Returns the number of entries + * removed. + */ +static int +RemoveOldElemsFromXidQueue(void) +{ + int nCleaned = 0; + int i = 0; + + Assert(LWLockHeldByMeInMode(RollbackRequestLock, LW_EXCLUSIVE)); + + while (i < GetXidQueueSize()) + { + RollbackHashEntry *rh; + RollbackHashKey hkey; + UndoXidQueue *elem = (UndoXidQueue *) GetXidQueueNthElem(i); + + hkey.full_xid = elem->full_xid; + hkey.start_urec_ptr = elem->start_urec_ptr; + rh = (RollbackHashEntry *) hash_search(RollbackHT, + (void *) &hkey, + HASH_FIND, NULL); + + /* + * If some undo worker is already processing the rollback request or + * it is already processed, then we drop that request from the queue. + */ + if (!rh || rh->in_progress) + { + elem->dbid = InvalidOid; + elem->full_xid = InvalidFullTransactionId; + nCleaned++; + binaryheap_remove_nth_unordered(UndoWorkerQueues[XID_QUEUE].bh, i); + + continue; + } + + i++; + } + + binaryheap_build(UndoWorkerQueues[XID_QUEUE].bh); + + return nCleaned; +}
+ +/* Push an element in the size based request queue */ +static void +PushSizeQueueElem(UndoRequestInfo * urinfo) +{ + int elem = UndoSizeQueueGetFreeElem(); + + SetSizeQueueElem(elem, urinfo->dbid, urinfo->full_xid, urinfo->request_size, + urinfo->start_urec_ptr); + + binaryheap_add(UndoWorkerQueues[SIZE_QUEUE].bh, + PointerGetDatum(&GetSizeQueueElem(elem))); +} + +/* Pop nth element from the size based request queue */ +static UndoSizeQueue * +PopSizeQueueNthElem(int n) +{ + Datum elem; + + Assert(!binaryheap_empty(UndoWorkerQueues[SIZE_QUEUE].bh)); + elem = binaryheap_remove_nth(UndoWorkerQueues[SIZE_QUEUE].bh, n); + + return (UndoSizeQueue *) DatumGetPointer(elem); +} + +/* + * Traverse the size queue and remove dangling entries, if any. A queue entry + * is considered dangling if the hash table doesn't contain the corresponding + * entry or that entry is already marked in-progress. The caller must hold + * RollbackRequestLock in exclusive mode. Returns the number of entries + * removed. + */ +static int +RemoveOldElemsFromSizeQueue(void) +{ + int nCleaned = 0; + int i = 0; + + Assert(LWLockHeldByMeInMode(RollbackRequestLock, LW_EXCLUSIVE)); + + while (i < GetSizeQueueSize()) + { + RollbackHashEntry *rh; + RollbackHashKey hkey; + UndoSizeQueue *elem = (UndoSizeQueue *) GetSizeQueueNthElem(i); + + hkey.full_xid = elem->full_xid; + hkey.start_urec_ptr = elem->start_urec_ptr; + rh = (RollbackHashEntry *) hash_search(RollbackHT, + (void *) &hkey, + HASH_FIND, NULL); + + /* + * If some undo worker is already processing the rollback request or + * it is already processed, then we drop that request from the queue. + */ + if (!rh || rh->in_progress) + { + elem->dbid = InvalidOid; + elem->full_xid = InvalidFullTransactionId; + elem->request_size = 0; + binaryheap_remove_nth_unordered(UndoWorkerQueues[SIZE_QUEUE].bh, i); + nCleaned++; + continue; + } + + i++; + } + + binaryheap_build(UndoWorkerQueues[SIZE_QUEUE].bh); + + return nCleaned; +}
+ +/* Get the first free element of error time based request array. */ +static int +UndoErrorQueueGetFreeElem(void) +{ + int i; + + for (i = 0; i < ROLLBACK_REQUEST_QUEUE_SIZE; i++) + { + if (FullTransactionIdEquals(GetErrorQueueElem(i).full_xid, + InvalidFullTransactionId)) + return i; + } + + /* we should never call this function when the request queue is full. */ + Assert(false); + + /* silence compiler. */ + return -1; +} + +/* Push an element in the error time based request queue */ +static void +PushErrorQueueElem(volatile UndoRequestInfo * urinfo) +{ + int elem = UndoErrorQueueGetFreeElem(); + TimestampTz now = GetCurrentTimestamp(); + + SetErrorQueueElem(elem, urinfo->dbid, urinfo->full_xid, now); + + /* + * SetErrorQueueElem doesn't fill start_urec_ptr, but it is part of the + * rollback hash key that RemoveOldElemsFromErrorQueue builds from queue + * entries. Without this, the entry would carry whatever value a previous + * user of this slot left behind and its hash lookup would fail. + */ + GetErrorQueueElem(elem).start_urec_ptr = urinfo->start_urec_ptr; + + binaryheap_add(UndoWorkerQueues[ERROR_QUEUE].bh, + PointerGetDatum(&GetErrorQueueElem(elem))); +} + +/* Pop nth element from the error time based request queue */ +static UndoErrorQueue * +PopErrorQueueNthElem(int n) +{ + Datum elem; + + Assert(!ErrorQueueIsEmpty()); + elem = binaryheap_remove_nth(UndoWorkerQueues[ERROR_QUEUE].bh, n); + + return (UndoErrorQueue *) (DatumGetPointer(elem)); +} + +/* + * Traverse the error queue and remove dangling entries, if any. A queue + * entry is considered dangling if the hash table doesn't contain the + * corresponding entry or that entry is already marked in-progress. The + * caller must hold RollbackRequestLock in exclusive mode. Returns the number + * of entries removed.
+ *
+ * Returns the number of entries removed.  Caller must hold
+ * RollbackRequestLock in exclusive mode.
+ */
+static int
+RemoveOldElemsFromErrorQueue(void)
+{
+	int			nCleaned = 0;
+	int			i = 0;
+
+	Assert(LWLockHeldByMeInMode(RollbackRequestLock, LW_EXCLUSIVE));
+
+	while (i < GetErrorQueueSize())
+	{
+		RollbackHashEntry *rh;
+		RollbackHashKey hkey;
+		UndoErrorQueue *elem = (UndoErrorQueue *) GetErrorQueueNthElem(i);
+
+		/*
+		 * NOTE(review): PushErrorQueueElem() calls SetErrorQueueElem() without
+		 * a start_urec_ptr argument; confirm that macro fills in
+		 * elem->start_urec_ptr, otherwise this lookup uses a stale key.
+		 */
+		hkey.full_xid = elem->full_xid;
+		hkey.start_urec_ptr = elem->start_urec_ptr;
+		rh = (RollbackHashEntry *) hash_search(RollbackHT,
+											   (void *) &hkey,
+											   HASH_FIND, NULL);
+
+		/*
+		 * If some undo worker is already processing the rollback request or
+		 * it is already processed, then we drop that request from the queue.
+		 */
+		if (!rh || rh->in_progress)
+		{
+			elem->dbid = InvalidOid;
+			elem->full_xid = InvalidFullTransactionId;
+			elem->err_occurred_at = 0;
+
+			/* Don't advance i; slot i must be re-examined after removal. */
+			binaryheap_remove_nth_unordered(UndoWorkerQueues[ERROR_QUEUE].bh, i);
+			nCleaned++;
+			continue;
+		}
+
+		i++;
+	}
+
+	/* Restore the heap property after the unordered removals. */
+	binaryheap_build(UndoWorkerQueues[ERROR_QUEUE].bh);
+
+	return nCleaned;
+}
+
+/*
+ * Remove nth work item from queue and clear the array element as well from
+ * the corresponding queue.
+ */
+static void
+RemoveRequestFromQueue(UndoWorkerQueueType type, int n)
+{
+	if (type == XID_QUEUE)
+	{
+		UndoXidQueue *uXidQueueElem = (UndoXidQueue *) PopXidQueueNthElem(n);
+
+		Assert(FullTransactionIdIsValid(uXidQueueElem->full_xid));
+		uXidQueueElem->dbid = InvalidOid;
+		uXidQueueElem->full_xid = InvalidFullTransactionId;
+	}
+	else if (type == SIZE_QUEUE)
+	{
+		UndoSizeQueue *uSizeQueueElem = (UndoSizeQueue *) PopSizeQueueNthElem(n);
+
+		Assert(FullTransactionIdIsValid(uSizeQueueElem->full_xid));
+		uSizeQueueElem->dbid = InvalidOid;
+		uSizeQueueElem->full_xid = InvalidFullTransactionId;
+		uSizeQueueElem->request_size = 0;
+	}
+	else
+	{
+		UndoErrorQueue *uErrorQueueElem = (UndoErrorQueue *) PopErrorQueueNthElem(n);
+
+		Assert(type == ERROR_QUEUE);
+		Assert(FullTransactionIdIsValid(uErrorQueueElem->full_xid));
+		uErrorQueueElem->dbid = InvalidOid;
+		uErrorQueueElem->full_xid = InvalidFullTransactionId;
+		uErrorQueueElem->err_occurred_at = 0;
+	}
+}
+
+/*
+ * Fill *hkey with the hash key corresponding to the nth element of the
+ * specified queue.
+ *
+ * Returns false, leaving *hkey untouched, if the queue has no nth element.
+ * This function does not touch the round-robin cursor cur_undo_queue; the
+ * callers advance it themselves.  (Advancing it here as well used to make
+ * UndoGetWork() skip a queue whenever the error queue was empty.)
+ */
+static bool
+GetRollbackHashKeyFromQueue(UndoWorkerQueueType cur_queue, int n,
+							RollbackHashKey * hkey)
+{
+	if (cur_queue == XID_QUEUE)
+	{
+		UndoXidQueue *elem;
+
+		/* check if the queue has an nth element */
+		if (GetXidQueueSize() <= n)
+			return false;
+
+		elem = (UndoXidQueue *) GetXidQueueNthElem(n);
+		hkey->full_xid = elem->full_xid;
+		hkey->start_urec_ptr = elem->start_urec_ptr;
+	}
+	else if (cur_queue == SIZE_QUEUE)
+	{
+		UndoSizeQueue *elem;
+
+		/* check if the queue has an nth element */
+		if (GetSizeQueueSize() <= n)
+			return false;
+
+		elem = (UndoSizeQueue *) GetSizeQueueNthElem(n);
+		hkey->full_xid = elem->full_xid;
+		hkey->start_urec_ptr = elem->start_urec_ptr;
+	}
+	else
+	{
+		UndoErrorQueue *elem;
+
+		/* It must be an error queue. */
+		Assert(cur_queue == ERROR_QUEUE);
+
+		/* check if the queue has an nth element */
+		if (GetErrorQueueSize() <= n)
+			return false;
+
+		elem = (UndoErrorQueue *) GetErrorQueueNthElem(n);
+		hkey->full_xid = elem->full_xid;
+		hkey->start_urec_ptr = elem->start_urec_ptr;
+	}
+
+	return true;
+}
+
+/*
+ * Fetch the end urec pointer for the transaction and the undo request size.
+ *
+ * end_urecptr_out - This is an INOUT parameter and must not be NULL.  If a
+ * valid end undo pointer is passed in, we use it to calculate the size.
+ * Otherwise, we calculate the end undo pointer and return it here; it is
+ * left invalid if the transaction's undo got rolled back and rewound.
+ *
+ * XXX: We don't calculate the exact undo size. We always skip the size of
+ * the last undo record (if not already discarded) from the calculation. This
+ * optimization allows us to skip fetching an undo record for the most
+ * frequent cases where the end pointer and current start pointer belong to
+ * the same log. A simple subtraction between them gives us the size. In
+ * future this function can be modified if someone needs the exact undo size.
+ * As of now, we use this function to calculate the undo size for inserting
+ * in the pending undo actions in undo worker's size queue.
+ */
+static uint64
+FindUndoEndLocationAndSize(UndoRecPtr start_urecptr,
+						   UndoRecPtr *end_urecptr_out,
+						   FullTransactionId full_xid)
+{
+	UnpackedUndoRecord *uur = NULL;
+	UndoLogControl *log = NULL;
+	UndoRecPtr	urecptr = start_urecptr;
+	UndoRecPtr	end_urecptr = InvalidUndoRecPtr;
+	uint64		sz = 0;
+	UndoPersistence persistence;
+
+	Assert(urecptr != InvalidUndoRecPtr);
+	Assert(end_urecptr_out != NULL);
+	Assert(!TransactionIdIsInProgress(XidFromFullTransactionId(full_xid)));
+
+	while (true)
+	{
+		UndoRecPtr	next_urecptr = InvalidUndoRecPtr;
+
+		if (*end_urecptr_out != InvalidUndoRecPtr)
+		{
+			/*
+			 * Check whether end pointer and the current pointer belong to
+			 * same log. In that case, we can get the size easily.
+			 */
+			if (UndoRecPtrGetLogNo(urecptr) == UndoRecPtrGetLogNo(*end_urecptr_out))
+			{
+				sz += (*end_urecptr_out - urecptr);
+				break;
+			}
+		}
+
+		/*
+		 * Fetch the log and undo record corresponding to the current undo
+		 * pointer.
+		 */
+		if ((log == NULL) || (UndoRecPtrGetLogNo(urecptr) != log->logno))
+			log = UndoLogGet(UndoRecPtrGetLogNo(urecptr), false);
+		Assert(log != NULL);
+		persistence = log->meta.persistence;
+
+		/* The corresponding log's insert point must be ahead of urecptr. */
+		Assert(MakeUndoRecPtr(log->logno, log->meta.unlogged.insert) >= urecptr);
+
+		uur = UndoFetchRecord(urecptr,
+							  InvalidBlockNumber,
+							  InvalidOffsetNumber,
+							  InvalidTransactionId,
+							  NULL, NULL);
+
+		/*
+		 * If the corresponding undo record got rolled back and rewound, we
+		 * stop here; the end pointer stays invalid.
+		 */
+		if (uur == NULL)
+			break;
+
+		/* The undo must belong to the same transaction. */
+		Assert(FullTransactionIdEquals(full_xid,
+									   FullTransactionIdFromEpochAndXid(uur->uur_xidepoch,
+																		uur->uur_xid)));
+
+		/*
+		 * Since this is the first undo record of this transaction in this
+		 * log, this must include the transaction header.
+		 */
+		Assert(uur->uur_info & UREC_INFO_TRANSACTION);
+		next_urecptr = uur->uur_next;
+
+		/*
+		 * Case 1: If this is the last transaction in the log then calculate
+		 * the latest urec pointer using next insert location of the undo log.
+		 */
+		if (!UndoRecPtrIsValid(next_urecptr))
+		{
+			UndoLogOffset next_insert;
+
+			/*
+			 * While fetching the next insert location if the new transaction
+			 * has already started in this log then let's re-fetch the undo
+			 * record.
+			 */
+			next_insert = UndoLogGetNextInsertPtr(log->logno, uur->uur_xid);
+			if (!UndoRecPtrIsValid(next_insert))
+			{
+				UndoRecordRelease(uur);
+				uur = NULL;
+				continue;
+			}
+
+			/*
+			 * If next_insert location points to the starting location of a
+			 * new page, we should subtract the page header size from the
+			 * insert location.
+			 */
+			if (UndoRecPtrGetPageOffset(next_insert) == UndoLogBlockHeaderSize)
+				next_insert -= UndoLogBlockHeaderSize;
+
+			end_urecptr = UndoGetPrevUndoRecptr(next_insert, InvalidUndoRecPtr,
+												InvalidBuffer, persistence);
+			sz += (end_urecptr - urecptr);
+			Assert(UndoRecPtrIsValid(end_urecptr));
+			break;
+		}
+
+		/*
+		 * Case 2: The transaction ended in the same undo log, but this is not
+		 * the last transaction.
+		 */
+		if (UndoRecPtrGetLogNo(next_urecptr) == log->logno)
+		{
+			end_urecptr =
+				UndoGetPrevUndoRecptr(next_urecptr, InvalidUndoRecPtr,
+									  InvalidBuffer, persistence);
+			sz += (end_urecptr - urecptr);
+			Assert(UndoRecPtrIsValid(end_urecptr));
+			break;
+		}
+
+		/*
+		 * Case 3: If transaction is overflowed to a different undolog and
+		 * it's already discarded, it means that the undo actions for this
+		 * transaction which are in the next log have already been executed.
+		 */
+		if (UndoLogIsDiscarded(next_urecptr))
+		{
+			UndoLogOffset next_insert;
+
+			next_insert = UndoLogGetNextInsertPtr(log->logno, uur->uur_xid);
+			Assert(UndoRecPtrIsValid(next_insert));
+
+			end_urecptr = UndoGetPrevUndoRecptr(next_insert, InvalidUndoRecPtr,
+												InvalidBuffer, persistence);
+			sz += (next_insert - urecptr);
+			Assert(UndoRecPtrIsValid(end_urecptr));
+			break;
+		}
+
+		/*
+		 * Case 4: The transaction is overflowed to a different log, so
+		 * restart the processing from the next log.
+		 */
+		{
+			UndoLogOffset next_insert;
+
+			next_insert = UndoLogGetNextInsertPtr(log->logno, uur->uur_xid);
+			Assert(UndoRecPtrIsValid(next_insert));
+
+			end_urecptr = UndoGetPrevUndoRecptr(next_insert, InvalidUndoRecPtr,
+												InvalidBuffer, persistence);
+			sz += (next_insert - urecptr);
+
+			UndoRecordRelease(uur);
+			uur = NULL;
+		}
+
+		/* Follow the undo chain */
+		urecptr = next_urecptr;
+	}
+
+	if (uur != NULL)
+		UndoRecordRelease(uur);
+
+	if (*end_urecptr_out == InvalidUndoRecPtr)
+		*end_urecptr_out = end_urecptr;
+
+	return sz;
+}
+
+
+/*
+ * Returns true, if we can push the rollback request to undo workers, false,
+ * otherwise.
+ */
+static bool
+CanPushReqToUndoWorker(UndoRecPtr start_urec_ptr, UndoRecPtr end_urec_ptr,
+					   uint64 req_size)
+{
+	/*
+	 * This must be called after acquiring RollbackRequestLock as we will
+	 * check the binary heaps which can change.
+	 */
+	Assert(LWLockHeldByMeInMode(RollbackRequestLock, LW_EXCLUSIVE));
+
+	/*
+	 * We normally push the rollback request to undo workers if the size of
+	 * same is above a certain threshold. However, discard worker is allowed
+	 * to push any size request provided there is a space in rollback request
+	 * queue. This is mainly because discard worker can be processing the
+	 * rollback requests after crash recovery when no backend is alive.
+	 *
+	 * We have a race condition where discard worker can process the request
+	 * before the backend which has aborted the transaction in which case
+	 * backend won't do anything. Normally, this won't happen because
+	 * backends try to apply the undo actions immediately after marking the
+	 * transaction as aborted in the clog. One way to avoid this race
+	 * condition is that we register the request by backend in hash table but
+	 * not in rollback queues before marking abort in clog and then later add
+	 * them in rollback queues. However, we are not sure how important it is
+	 * to avoid such a race as this won't lead to any problem and OTOH, we
+	 * might need some more trickery in the code to avoid such a race
+	 * condition.
+	 *
+	 * The threshold is computed in uint64 so that a large
+	 * rollback_overflow_size cannot overflow signed int arithmetic.
+	 */
+	if (req_size >= (uint64) rollback_overflow_size * 1024 * 1024 ||
+		IsDiscardProcess())
+	{
+		if (GetXidQueueSize() >= ROLLBACK_REQUEST_QUEUE_SIZE ||
+			GetSizeQueueSize() >= ROLLBACK_REQUEST_QUEUE_SIZE)
+		{
+			/*
+			 * If one of the queues is full traverse both the queues and
+			 * remove dangling entries, if any. The queue entry is considered
+			 * dangling if the hash table doesn't contain the corresponding
+			 * entry. It can happen due to two reasons (a) we have processed
+			 * the entry from one of the queues, but not from the other. (b)
+			 * the corresponding database has been dropped due to which we
+			 * have removed the entries from hash table, but not from the
+			 * queues. This is just a lazy cleanup, if we want we can remove
+			 * the entries from the queues when we detect that the database is
+			 * dropped and remove the corresponding entries from hash table.
+			 */
+			if (GetXidQueueSize() >= ROLLBACK_REQUEST_QUEUE_SIZE)
+				RemoveOldElemsFromXidQueue();
+			if (GetSizeQueueSize() >= ROLLBACK_REQUEST_QUEUE_SIZE)
+				RemoveOldElemsFromSizeQueue();
+		}
+
+		if (GetXidQueueSize() < ROLLBACK_REQUEST_QUEUE_SIZE)
+		{
+			Assert(GetSizeQueueSize() < ROLLBACK_REQUEST_QUEUE_SIZE);
+			return true;
+		}
+	}
+
+	return false;
+}
+
+/*
+ * Return the size of shared memory needed for the rollback request queues
+ * and the rollback hash table.
+ *
+ * NOTE(review): the result is accumulated as Size but returned as int;
+ * consider changing the return type together with its header declaration.
+ */
+int
+PendingUndoShmemSize(void)
+{
+	Size		size;
+
+	size = hash_estimate_size(UndoRollbackHashTableSize(), sizeof(RollbackHashEntry));
+	size = add_size(size, mul_size(MAX_UNDO_WORK_QUEUES,
+								   binaryheap_shmem_size(ROLLBACK_REQUEST_QUEUE_SIZE)));
+	size = add_size(size, UndoXidQueueElemsShmSize());
+	size = add_size(size, UndoSizeQueueElemsShmSize());
+	size = add_size(size, UndoErrorQueueElemsShmSize());
+
+	return size;
+}
+
+/*
+ * Initialize the hash-table and priority heap based queues for rollback
+ * requests in shared memory.
+ *
+ * In the postmaster (!IsUnderPostmaster) the queue element arrays are zeroed
+ * in full, so every slot starts out free (invalid full_xid).
+ */
+void
+PendingUndoShmemInit(void)
+{
+	HASHCTL		info;
+	bool		foundXidQueue = false;
+	bool		foundSizeQueue = false;
+	bool		foundErrorQueue = false;
+	binaryheap *bh;
+	UndoXidQueue *xid_elems;
+	UndoSizeQueue *size_elems;
+	UndoErrorQueue *error_elems;
+
+	MemSet(&info, 0, sizeof(info));
+
+	/*
+	 * Every lookup passes a complete RollbackHashKey (full_xid plus
+	 * start_urec_ptr), so the whole key must take part in hashing and
+	 * comparison; sizeof(TransactionId) would compare only part of full_xid.
+	 * NOTE(review): this requires RollbackHashEntry to begin with the
+	 * RollbackHashKey fields in the same order -- confirm in the header.
+	 */
+	info.keysize = sizeof(RollbackHashKey);
+	info.entrysize = sizeof(RollbackHashEntry);
+	info.hash = tag_hash;
+
+	RollbackHT = ShmemInitHash("Undo Actions Lookup Table",
+							   UndoRollbackHashTableSize(),
+							   UndoRollbackHashTableSize(), &info,
+							   HASH_ELEM | HASH_FUNCTION | HASH_FIXED_SIZE);
+
+	bh = binaryheap_allocate_shm("Undo Xid Binary Heap",
+								 ROLLBACK_REQUEST_QUEUE_SIZE,
+								 undo_age_comparator,
+								 NULL);
+
+	xid_elems = (UndoXidQueue *) ShmemInitStruct("Undo Xid Queue Elements",
+												 UndoXidQueueElemsShmSize(),
+												 &foundXidQueue);
+
+	Assert(foundXidQueue || !IsUnderPostmaster);
+
+	/* Clear the whole element array, not just its first element. */
+	if (!IsUnderPostmaster)
+		memset(xid_elems, 0, UndoXidQueueElemsShmSize());
+
+	InitXidQueue(bh, xid_elems);
+
+	bh = binaryheap_allocate_shm("Undo Size Binary Heap",
+								 ROLLBACK_REQUEST_QUEUE_SIZE,
+								 undo_size_comparator,
+								 NULL);
+	size_elems = (UndoSizeQueue *) ShmemInitStruct("Undo Size Queue Elements",
+												   UndoSizeQueueElemsShmSize(),
+												   &foundSizeQueue);
+	Assert(foundSizeQueue || !IsUnderPostmaster);
+
+	if (!IsUnderPostmaster)
+		memset(size_elems, 0, UndoSizeQueueElemsShmSize());
+
+	InitSizeQueue(bh, size_elems);
+
+	bh = binaryheap_allocate_shm("Undo Error Binary Heap",
+								 ROLLBACK_REQUEST_QUEUE_SIZE,
+								 undo_err_time_comparator,
+								 NULL);
+
+	/*
+	 * The error queue needs its own ShmemIndex name; reusing the size
+	 * queue's name would alias the two arrays (or fail the size check).
+	 */
+	error_elems = (UndoErrorQueue *) ShmemInitStruct("Undo Error Queue Elements",
+													 UndoErrorQueueElemsShmSize(),
+													 &foundErrorQueue);
+	Assert(foundErrorQueue || !IsUnderPostmaster);
+
+	if (!IsUnderPostmaster)
+		memset(error_elems, 0, UndoErrorQueueElemsShmSize());
+
+	InitErrorQueue(bh, error_elems);
+}
+
+/*
+ * Returns true, if there is no pending undo apply work, false, otherwise.
+ * Only the xid and size queues are consulted.
+ */
+bool
+UndoWorkerQueuesEmpty(void)
+{
+	return XidQueueIsEmpty() && SizeQueueIsEmpty();
+}
+
+/* Insert the request in both xid and size based queues. */
+void
+InsertRequestIntoUndoQueues(UndoRequestInfo * urinfo)
+{
+	/*
+	 * This must be called after acquiring RollbackRequestLock as we will
+	 * insert into the binary heaps which can change.
+	 */
+	Assert(LWLockHeldByMeInMode(RollbackRequestLock, LW_EXCLUSIVE));
+	PushXidQueueElem(urinfo);
+	PushSizeQueueElem(urinfo);
+
+	elog(DEBUG1, "Undo action pushed Xid: " UINT64_FORMAT ", Size: " UINT64_FORMAT ", "
+		 "Start: " UndoRecPtrFormat ", End: " UndoRecPtrFormat "",
+		 U64FromFullTransactionId(urinfo->full_xid), urinfo->request_size,
+		 urinfo->start_urec_ptr, urinfo->end_urec_ptr);
+}
+
+/*
+ * Insert the request into an error queue.
+ *
+ * Returns false if the error queue is full and no dangling entries could be
+ * removed from it, true otherwise.  Acquires RollbackRequestLock itself.
+ */
+bool
+InsertRequestIntoErrorUndoQueue(volatile UndoRequestInfo * urinfo)
+{
+	RollbackHashEntry *rh;
+	RollbackHashKey hkey;
+
+	LWLockAcquire(RollbackRequestLock, LW_EXCLUSIVE);
+
+	/* We can't insert into an error queue if it is already full. */
+	if (GetErrorQueueSize() >= ROLLBACK_REQUEST_QUEUE_SIZE)
+	{
+		int			num_removed;
+
+		/* Try to remove few elements */
+		num_removed = RemoveOldElemsFromErrorQueue();
+
+		if (num_removed == 0)
+		{
+			LWLockRelease(RollbackRequestLock);
+			return false;
+		}
+	}
+
+	/*
+	 * Mark the undo request in hash table as not in_progress so that undo
+	 * launcher or other undo worker don't remove the entry from queues.
+	 *
+	 * The hash key is the (full_xid, start_urec_ptr) pair; build it
+	 * explicitly instead of relying on the field layout of UndoRequestInfo.
+	 */
+	hkey.full_xid = urinfo->full_xid;
+	hkey.start_urec_ptr = urinfo->start_urec_ptr;
+	rh = (RollbackHashEntry *) hash_search(RollbackHT, (void *) &hkey,
+										   HASH_FIND, NULL);
+	Assert(rh != NULL);
+	rh->in_progress = false;
+
+	/* Insert the request into error queue for processing it later. */
+	PushErrorQueueElem(urinfo);
+	LWLockRelease(RollbackRequestLock);
+
+	elog(DEBUG1, "Undo action pushed(error) Xid: " UINT64_FORMAT ", Size: " UINT64_FORMAT ", "
+		 "Start: " UndoRecPtrFormat ", End: " UndoRecPtrFormat "",
+		 U64FromFullTransactionId(urinfo->full_xid), urinfo->request_size,
+		 urinfo->start_urec_ptr, urinfo->end_urec_ptr);
+
+	return true;
+}
+
+/*
+ * Set the undo worker queue from which the undo worker should start looking
+ * for work.
+ */
+void
+SetUndoWorkerQueueStart(UndoWorkerQueueType undo_worker_queue)
+{
+	cur_undo_queue = undo_worker_queue;
+}
+
+/*
+ * Get the next set of pending rollback request for undo worker.
+ *
+ * allow_peek - if true, peeks a few element from each queue to check whether
+ * any request matches current dbid.
+ * remove_from_queue - if true, picks an element from the queue whose dbid matches
+ * current dbid and remove it from the queue before returning the same to
+ * caller.
+ * urinfo - this is an OUT parameter that returns the details of undo request
+ * whose undo action is still pending.
+ * in_other_db_out - this is an OUT parameter. If we've not found any work for
+ * current database, but there are work for some other database, we set this
+ * parameter as true.
+ */
+bool
+UndoGetWork(bool allow_peek, bool remove_from_queue, UndoRequestInfo * urinfo,
+			bool *in_other_db_out)
+{
+	int			i;
+	bool		found_work = false;
+	bool		in_other_db = false;
+
+	/* Reset the undo request info */
+	ResetUndoRequestInfo(urinfo);
+
+	/* Search the queues under lock as they can be modified concurrently. */
+	LWLockAcquire(RollbackRequestLock, LW_EXCLUSIVE);
+
+	/* Here, we check each of the work queues in a round-robin way. */
+	for (i = 0; i < MAX_UNDO_WORK_QUEUES; i++)
+	{
+		RollbackHashKey hkey;
+		RollbackHashEntry *rh;
+		int			cur_queue = (int) (cur_undo_queue % MAX_UNDO_WORK_QUEUES);
+
+		if (!GetRollbackHashKeyFromQueue(cur_queue, 0, &hkey))
+		{
+			cur_undo_queue++;
+			continue;
+		}
+
+		rh = (RollbackHashEntry *) hash_search(RollbackHT,
+											   (void *) &hkey,
+											   HASH_FIND, NULL);
+
+		/*
+		 * If some undo worker is already processing the rollback request or
+		 * it is already processed, then we drop that request from the queue
+		 * and fetch the next entry from the queue.
+		 */
+		if (!rh || rh->in_progress)
+		{
+			RemoveRequestFromQueue(cur_queue, 0);
+			cur_undo_queue++;
+			continue;
+		}
+
+		found_work = true;
+
+		/*
+		 * We've found a work for some database. If we don't want to remove
+		 * the request, we return from here and spawn a worker process to
+		 * apply the same.
+		 */
+		if (!remove_from_queue)
+		{
+			bool		exists;
+
+			/*
+			 * NOTE(review): this runs a transaction (catalog access) while
+			 * RollbackRequestLock is held; confirm that is acceptable.
+			 */
+			StartTransactionCommand();
+			exists = dbid_exists(rh->dbid);
+			CommitTransactionCommand();
+
+			/*
+			 * If the database doesn't exist, just remove the request since we
+			 * no longer need to apply the undo actions.
+			 */
+			if (!exists)
+			{
+				RemoveRequestFromQueue(cur_queue, 0);
+
+				/*
+				 * Remove the hash entry directly.  RollbackHTRemoveEntry()
+				 * acquires RollbackRequestLock, which we already hold, and
+				 * LWLocks are not reentrant, so calling it here would wait on
+				 * ourselves forever.
+				 */
+				hash_search(RollbackHT, (void *) &hkey, HASH_REMOVE, NULL);
+				cur_undo_queue++;
+				continue;
+			}
+
+			/* set the undo request info to process */
+			SetUndoRequestInfoFromRHEntry(urinfo, rh, cur_queue);
+
+			cur_undo_queue++;
+			LWLockRelease(RollbackRequestLock);
+			return true;
+		}
+
+		/*
+		 * The worker can perform this request if it is either not connected
+		 * to any database or the request belongs to the same database to
+		 * which it is connected.
+		 */
+		if (MyDatabaseId == InvalidOid || MyDatabaseId == rh->dbid)
+		{
+			/* found a work for current database */
+			if (in_other_db_out)
+				*in_other_db_out = false;
+
+			/*
+			 * Mark the undo request in hash table as in_progress so that
+			 * other undo worker doesn't pick the same entry for rollback.
+			 */
+			rh->in_progress = true;
+
+			/* set the undo request info to process */
+			SetUndoRequestInfoFromRHEntry(urinfo, rh, cur_queue);
+
+			/*
+			 * Remove the request from queue so that other undo worker doesn't
+			 * process the same entry.
+			 */
+			RemoveRequestFromQueue(cur_queue, 0);
+
+			cur_undo_queue++;
+			LWLockRelease(RollbackRequestLock);
+			return true;
+		}
+		else
+			in_other_db = true;
+
+		cur_undo_queue++;
+	}
+
+	/*
+	 * Iff a worker would need to switch databases less than
+	 * undo_worker_quantum ms after starting, it peeks a few entries deep into
+	 * each queue to see whether there's work for that database. This ensures
+	 * that one worker doesn't have to restart quickly to switch databases.
+	 */
+	if (allow_peek)
+	{
+		int			depth,
+					cur_queue;
+		RollbackHashKey hkey;
+		RollbackHashEntry *rh;
+
+		/*
+		 * We shouldn't have come here if we've found a work above for our
+		 * database.
+		 */
+		Assert(!found_work || in_other_db);
+
+		for (depth = 0; depth < UNDO_PEEK_DEPTH; depth++)
+		{
+			for (cur_queue = 0; cur_queue < MAX_UNDO_WORK_QUEUES; cur_queue++)
+			{
+				if (!GetRollbackHashKeyFromQueue(cur_queue, depth, &hkey))
+					continue;
+
+				rh = (RollbackHashEntry *) hash_search(RollbackHT,
+													   (void *) &hkey,
+													   HASH_FIND, NULL);
+
+				/*
+				 * If some undo worker is already processing the rollback
+				 * request or it is already processed, then fetch the next
+				 * entry from the queue.
+				 */
+				if (!rh || rh->in_progress)
+					continue;
+
+				found_work = true;
+
+				/*
+				 * The worker can perform this request if it is either not
+				 * connected to any database or the request belongs to the
+				 * same database to which it is connected.
+				 */
+				if (MyDatabaseId == InvalidOid || MyDatabaseId == rh->dbid)
+				{
+					/* found a work for current database */
+					if (in_other_db_out)
+						*in_other_db_out = false;
+
+					/*
+					 * Mark the undo request in hash table as in_progress so
+					 * that other undo worker doesn't pick the same entry for
+					 * rollback.
+					 */
+					rh->in_progress = true;
+
+					/* set the undo request info to process */
+					SetUndoRequestInfoFromRHEntry(urinfo, rh, cur_queue);
+
+					/*
+					 * Remove the request from queue so that other undo worker
+					 * doesn't process the same entry.
+					 */
+					RemoveRequestFromQueue(cur_queue, depth);
+					LWLockRelease(RollbackRequestLock);
+					return true;
+				}
+				else
+					in_other_db = true;
+			}
+		}
+	}
+
+	LWLockRelease(RollbackRequestLock);
+
+	if (in_other_db_out)
+		*in_other_db_out = in_other_db;
+
+	return found_work;
+}
+
+/*
+ * This function registers the rollback requests.
+ *
+ * Returns true, if the request is registered and will be processed by undo
+ * worker at some later point of time, false, otherwise in which case caller
+ * can process the undo request by itself.
+ *
+ * The caller may execute undo actions itself (a) if the entry is not already
+ * present in rollback hash table and can't be pushed to pending undo request
+ * queues, (b) if the entry is present, but the size is small enough that
+ * backend can execute by itself and undo worker hasn't started processing it
+ * yet.
+ */
+bool
+RegisterRollbackReq(UndoRecPtr end_urec_ptr, UndoRecPtr start_urec_ptr,
+					Oid dbid, FullTransactionId full_xid)
+{
+	bool		found = false;
+	bool		can_push;
+	bool		pushed = false;
+	bool		request_registered = false;
+	RollbackHashEntry *rh;
+	uint64		req_size = 0;
+
+	/* Do not push any rollback request if working in single user-mode */
+	if (!IsUnderPostmaster)
+		return false;
+
+	Assert(UndoRecPtrIsValid(start_urec_ptr));
+	Assert(dbid != InvalidOid);
+
+	/*
+	 * There must be space to accommodate the new request. See
+	 * UndoRollbackHashTableSize.
+	 */
+	Assert(!RollbackHTIsFull());
+
+	req_size = FindUndoEndLocationAndSize(start_urec_ptr, &end_urec_ptr, full_xid);
+
+	/* The transaction got rolled back and rewound. */
+	if (!UndoRecPtrIsValid(end_urec_ptr))
+		return false;
+
+	LWLockAcquire(RollbackRequestLock, LW_EXCLUSIVE);
+
+	/*
+	 * Check whether we can push the rollback request to the undo worker. This
+	 * must be done under lock, see CanPushReqToUndoWorker.
+	 */
+	can_push = CanPushReqToUndoWorker(start_urec_ptr, end_urec_ptr, req_size);
+
+	/*
+	 * Backends always register the rollback request in the rollback hash
+	 * table irrespective of whether we push it to undo worker. This ensures
+	 * that discard worker won't try to process the request on which backend
+	 * is working. OTOH, discard worker won't add an entry to the hash table
+	 * unless it can push the request to undo worker. This is because
+	 * otherwise backends might not process the request by themselves even
+	 * though no undo worker is going to process such a request.
+	 */
+	if (can_push || !IsDiscardProcess())
+	{
+		RollbackHashKey hkey;
+
+		hkey.full_xid = full_xid;
+		hkey.start_urec_ptr = start_urec_ptr;
+
+		rh = (RollbackHashEntry *) hash_search(RollbackHT, &hkey,
+											   HASH_ENTER_NULL, &found);
+		if (!rh)
+		{
+			LWLockRelease(RollbackRequestLock);
+			return false;
+		}
+
+		/* We shouldn't try to add the same rollback request again. */
+		if (!found)
+		{
+			rh->start_urec_ptr = start_urec_ptr;
+			rh->end_urec_ptr = end_urec_ptr;
+			rh->dbid = dbid;
+			rh->full_xid = full_xid;
+			rh->in_progress = false;
+
+			if (can_push)
+			{
+				UndoRequestInfo urinfo;
+
+				ResetUndoRequestInfo(&urinfo);
+
+				urinfo.full_xid = rh->full_xid;
+				urinfo.start_urec_ptr = rh->start_urec_ptr;
+				urinfo.end_urec_ptr = rh->end_urec_ptr;
+				urinfo.dbid = rh->dbid;
+				urinfo.request_size = req_size;
+
+				InsertRequestIntoUndoQueues(&urinfo);
+
+				/*
+				 * Indicates that the request will be processed by undo
+				 * worker.
+				 */
+				request_registered = true;
+				pushed = true;
+			}
+			else
+			{
+				/* Indicates that the request can be processed by backend. */
+				request_registered = false;
+			}
+		}
+		else if (!rh->in_progress && !can_push)
+		{
+			/*
+			 * Indicates that the request can be processed by backend. This is
+			 * the case where discard worker would have pushed the request of
+			 * smaller size which backend itself can process. Mark the request
+			 * as in-progress, so that discard worker doesn't try to process
+			 * it.
+			 */
+			rh->in_progress = true;
+			request_registered = false;
+		}
+		else
+		{
+			/* Indicates that the request will be processed by undo worker. */
+			request_registered = true;
+		}
+	}
+
+	LWLockRelease(RollbackRequestLock);
+
+	/*
+	 * If we are able to successfully push the request, wakeup the undo worker
+	 * so that it can be processed in a timely fashion.
+	 */
+	if (pushed)
+		WakeupUndoWorker(dbid);
+
+	return request_registered;
+}
+
+/*
+ * Remove the rollback request entry from the rollback hash table.
+ *
+ * This acquires RollbackRequestLock itself, so callers must not already hold
+ * it.
+ */
+void
+RollbackHTRemoveEntry(FullTransactionId full_xid, UndoRecPtr start_urec_ptr)
+{
+	RollbackHashKey hkey;
+
+	hkey.full_xid = full_xid;
+	hkey.start_urec_ptr = start_urec_ptr;
+
+	LWLockAcquire(RollbackRequestLock, LW_EXCLUSIVE);
+
+	hash_search(RollbackHT, &hkey, HASH_REMOVE, NULL);
+
+	LWLockRelease(RollbackRequestLock);
+}
+
+/*
+ * Returns true if the rollback hash table has no room for another entry.
+ *
+ * This wrapper exists because we don't want to expose RollbackHT in xact.c,
+ * which uses it to ensure that we push requests only when there is some
+ * space in the hash table.
+ */
+bool
+RollbackHTIsFull(void)
+{
+	bool		result = false;
+
+	LWLockAcquire(RollbackRequestLock, LW_SHARED);
+
+	if (hash_get_num_entries(RollbackHT) >= UndoRollbackHashTableSize())
+		result = true;
+
+	LWLockRelease(RollbackRequestLock);
+
+	return result;
+}
+
+/*
+ * Remove all the entries for the given dbid. This is required in cases when
+ * the database is dropped and there were rollback requests pushed to the
+ * hash-table.
+ */
+void
+RollbackHTCleanup(Oid dbid)
+{
+	RollbackHashEntry *rh;
+	HASH_SEQ_STATUS status;
+
+	/*
+	 * Entries are removed during the scan, so the lock must be taken in
+	 * exclusive mode; a shared lock would let other backends read the table
+	 * while it is being modified.
+	 */
+	LWLockAcquire(RollbackRequestLock, LW_EXCLUSIVE);
+
+	Assert(hash_get_num_entries(RollbackHT) <= UndoRollbackHashTableSize());
+	hash_seq_init(&status, RollbackHT);
+	while ((rh = (RollbackHashEntry *) hash_seq_search(&status)) != NULL)
+	{
+		if (rh->dbid == dbid)
+		{
+			RollbackHashKey hkey;
+
+			/* dynahash permits deleting the element just returned. */
+			hkey.full_xid = rh->full_xid;
+			hkey.start_urec_ptr = rh->start_urec_ptr;
+
+			hash_search(RollbackHT, &hkey, HASH_REMOVE, NULL);
+		}
+	}
+
+	LWLockRelease(RollbackRequestLock);
+}
diff --git a/src/backend/access/undo/undoworker.c b/src/backend/access/undo/undoworker.c
new file mode 100644
index 0000000..ce1956d
--- /dev/null
+++ b/src/backend/access/undo/undoworker.c
@@ -0,0 +1,788 @@
+/*-------------------------------------------------------------------------
+ *
+ * undoworker.c
+ *	  undo launcher and undo worker process.
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/undo/undoworker.c
+ *
+ * Undo launcher is responsible for launching the workers iff there is some
+ * work available in one of the work queues and there are more workers
+ * available.  To know more about work queues, see undorequest.c.  The worker
+ * is launched to handle requests for a particular database.
+ *
+ * Each undo worker then starts reading the requests for that particular
+ * database from one of the queues.  A worker would peek into each queue for
+ * the requests from a particular database, if it needs to switch a database
+ * in less than undo_worker_quantum ms after starting.  Also, if there is no
+ * work, it lingers for UNDO_WORKER_LINGER_MS.  This avoids restarting
+ * the workers too frequently.
+ *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "funcapi.h" +#include "miscadmin.h" +#include "pgstat.h" + +#include "access/genam.h" +#include "access/table.h" +#include "access/xact.h" +#include "access/undorequest.h" +#include "access/undoworker.h" + +#include "libpq/pqsignal.h" + +#include "postmaster/bgworker.h" +#include "postmaster/fork_process.h" +#include "postmaster/postmaster.h" + +#include "replication/slot.h" +#include "replication/worker_internal.h" + +#include "storage/ipc.h" +#include "storage/lmgr.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/procsignal.h" + +#include "tcop/tcopprot.h" + +#include "utils/hsearch.h" +#include "utils/memutils.h" +#include "utils/resowner.h" + + +/* + * GUC parameters + */ +int max_undo_workers = 5; + +/* + * If, a worker would need to switch databases less than undo_worker_quantum + * (10s as default) after starting, it peeks a few entries deep into each + * queue to see whether there's work for that database. + */ +int undo_worker_quantum_ms = 10000; + +/* max sleep time between cycles (100 milliseconds) */ +#define DEFAULT_NAPTIME_PER_CYCLE 100L + +/* + * Time for which undo worker can linger if there is no work, in + * milliseconds. + */ +#define UNDO_WORKER_LINGER_MS 10000 + +/* Flags set by signal handlers */ +static volatile sig_atomic_t got_SIGHUP = false; + +static TimestampTz last_xact_processed_at; + +typedef struct UndoApplyWorker +{ + /* Indicates if this slot is used or free. */ + bool in_use; + + /* Increased every time the slot is taken by new worker. */ + uint16 generation; + + /* Pointer to proc array. NULL if not running. */ + PGPROC *proc; + + /* Database id this worker is connected to. */ + Oid dbid; + + /* this tells whether worker is lingering. */ + bool lingering; + + /* + * This tells the undo worker from which undo worker queue it should start + * processing. 
+ */ + UndoWorkerQueueType undo_worker_queue; +} UndoApplyWorker; + +UndoApplyWorker *MyUndoWorker = NULL; + +typedef struct UndoApplyCtxStruct +{ + /* Supervisor process. */ + pid_t launcher_pid; + + /* latch to wake up undo launcher. */ + Latch *undo_launcher_latch; + + /* Background workers. */ + UndoApplyWorker workers[FLEXIBLE_ARRAY_MEMBER]; +} UndoApplyCtxStruct; + +UndoApplyCtxStruct *UndoApplyCtx; + +static void UndoWorkerOnExit(int code, Datum arg); +static void UndoWorkerCleanup(UndoApplyWorker * worker); +static void UndoWorkerIsLingering(bool sleep); +static void UndoWorkerGetSlotInfo(int slot, UndoRequestInfo * urinfo); + +/* + * Cleanup function for undo worker launcher. + * + * Called on undo worker launcher exit. + */ +static void +UndoLauncherOnExit(int code, Datum arg) +{ + UndoApplyCtx->launcher_pid = 0; + UndoApplyCtx->undo_launcher_latch = NULL; +} + +/* SIGHUP: set flag to reload configuration at next convenient time */ +static void +UndoLauncherSighup(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGHUP = true; + + /* Waken anything waiting on the process latch */ + SetLatch(MyLatch); + + errno = save_errno; +} + +/* + * Wait for a background worker to start up and attach to the shmem context. + * + * This is only needed for cleaning up the shared memory in case the worker + * fails to attach. + */ +static void +WaitForUndoWorkerAttach(UndoApplyWorker * worker, + uint16 generation, + BackgroundWorkerHandle *handle) +{ + BgwHandleStatus status; + int rc; + + for (;;) + { + pid_t pid; + + CHECK_FOR_INTERRUPTS(); + + LWLockAcquire(UndoWorkerLock, LW_SHARED); + + /* Worker either died or has started; no need to do anything. */ + if (!worker->in_use || worker->proc) + { + LWLockRelease(UndoWorkerLock); + return; + } + + LWLockRelease(UndoWorkerLock); + + /* Check if worker has died before attaching, and clean up after it. 
*/ + status = GetBackgroundWorkerPid(handle, &pid); + + if (status == BGWH_STOPPED) + { + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + /* Ensure that this was indeed the worker we waited for. */ + if (generation == worker->generation) + UndoWorkerCleanup(worker); + LWLockRelease(UndoWorkerLock); + return; + } + + /* + * We need timeout because we generally don't get notified via latch + * about the worker attach. But we don't expect to have to wait long. + */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + 10L, WAIT_EVENT_BGWORKER_STARTUP); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + } + + return; +} + +/* + * Attach to a slot. + */ +static void +UndoWorkerAttach(int slot) +{ + /* Block concurrent access. */ + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + + MyUndoWorker = &UndoApplyCtx->workers[slot]; + + if (!MyUndoWorker->in_use) + { + LWLockRelease(UndoWorkerLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("undo worker slot %d is empty, cannot attach", + slot))); + } + + if (MyUndoWorker->proc) + { + LWLockRelease(UndoWorkerLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("undo worker slot %d is already used by " + "another worker, cannot attach", slot))); + } + + MyUndoWorker->proc = MyProc; + before_shmem_exit(UndoWorkerOnExit, (Datum) 0); + + LWLockRelease(UndoWorkerLock); +} + +/* + * Returns whether an undo worker is available. + */ +static int +IsUndoWorkerAvailable(void) +{ + int i; + int alive_workers = 0; + + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + + /* Search for attached worker for a given db id. 
*/ + for (i = 0; i < max_undo_workers; i++) + { + UndoApplyWorker *w = &UndoApplyCtx->workers[i]; + + if (w->in_use) + alive_workers++; + } + + LWLockRelease(UndoWorkerLock); + + return (alive_workers < max_undo_workers); +} + +static void +UndoWorkerIsLingering(bool sleep) +{ + /* Block concurrent access. */ + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + + MyUndoWorker->lingering = sleep; + + LWLockRelease(UndoWorkerLock); +} + +/* Get the dbid and undo worker queue set by the undo launcher. */ +static void +UndoWorkerGetSlotInfo(int slot, UndoRequestInfo * urinfo) +{ + /* Block concurrent access. */ + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + + MyUndoWorker = &UndoApplyCtx->workers[slot]; + + if (!MyUndoWorker->in_use) + { + LWLockRelease(UndoWorkerLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("undo worker slot %d is empty", + slot))); + } + + urinfo->dbid = MyUndoWorker->dbid; + urinfo->undo_worker_queue = MyUndoWorker->undo_worker_queue; + + LWLockRelease(UndoWorkerLock); +} + +/* + * Start new undo apply background worker, if possible otherwise return false. + */ +static bool +UndoWorkerLaunch(UndoRequestInfo urinfo) +{ + BackgroundWorker bgw; + BackgroundWorkerHandle *bgw_handle; + uint16 generation; + int i; + int slot = 0; + UndoApplyWorker *worker = NULL; + + /* + * We need to do the modification of the shared memory under lock so that + * we have consistent view. + */ + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + + /* Find unused worker slot. */ + for (i = 0; i < max_undo_workers; i++) + { + UndoApplyWorker *w = &UndoApplyCtx->workers[i]; + + if (!w->in_use) + { + worker = w; + slot = i; + break; + } + } + + /* We must not try to start a worker if there are no available workers. */ + Assert(worker != NULL); + + /* Prepare the worker slot. 
*/ + worker->in_use = true; + worker->proc = NULL; + worker->dbid = urinfo.dbid; + worker->lingering = false; + worker->undo_worker_queue = urinfo.undo_worker_queue; + worker->generation++; + + generation = worker->generation; + LWLockRelease(UndoWorkerLock); + + /* Register the new dynamic worker. */ + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "UndoWorkerMain"); + snprintf(bgw.bgw_type, BGW_MAXLEN, "undo apply worker"); + snprintf(bgw.bgw_name, BGW_MAXLEN, "undo apply worker"); + + bgw.bgw_restart_time = BGW_NEVER_RESTART; + bgw.bgw_notify_pid = MyProcPid; + bgw.bgw_main_arg = Int32GetDatum(slot); + + if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)) + { + /* Failed to start worker, so clean up the worker slot. */ + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + UndoWorkerCleanup(worker); + LWLockRelease(UndoWorkerLock); + + CommitTransactionCommand(); + + return false; + } + + /* Now wait until it attaches. */ + WaitForUndoWorkerAttach(worker, generation, bgw_handle); + + return true; +} + +/* + * Detach the worker (cleans up the worker info). + */ +static void +UndoWorkerDetach(void) +{ + /* Block concurrent access. */ + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + + UndoWorkerCleanup(MyUndoWorker); + + LWLockRelease(UndoWorkerLock); +} + +/* + * Clean up worker info. + */ +static void +UndoWorkerCleanup(UndoApplyWorker * worker) +{ + Assert(LWLockHeldByMeInMode(UndoWorkerLock, LW_EXCLUSIVE)); + + worker->in_use = false; + worker->proc = NULL; + worker->dbid = InvalidOid; + worker->lingering = false; + worker->undo_worker_queue = InvalidUndoWorkerQueue; +} + +/* + * Cleanup function. + * + * Called on logical replication worker exit. 
+ */ +static void +UndoWorkerOnExit(int code, Datum arg) +{ + UndoWorkerDetach(); +} + +/* + * Perform rollback request. We need to connect to the database for first + * request and that is required because we access system tables while + * performing undo actions. + */ +static void +UndoWorkerPerformRequest(UndoRequestInfo * urinfo) +{ + /* Should be connected to the database. */ + Assert(MyDatabaseId != InvalidOid); + + StartTransactionCommand(); + PG_TRY(); + { + execute_undo_actions(urinfo->full_xid, urinfo->end_urec_ptr, + urinfo->start_urec_ptr, true); + } + PG_CATCH(); + { + /* + * Register the unprocessed request in an error queue, so that it can + * be processed in a timely fashion. + */ + if (InsertRequestIntoErrorUndoQueue(urinfo)) + RollbackHTRemoveEntry(urinfo->full_xid, urinfo->start_urec_ptr); + + /* Send the error only to server log. */ + err_out_to_client(false); + EmitErrorReport(); + } + PG_END_TRY(); + CommitTransactionCommand(); +} + +/* + * UndoLauncherShmemSize + * Compute space needed for undo launcher shared memory + */ +Size +UndoLauncherShmemSize(void) +{ + Size size; + + /* + * Need the fixed struct and the array of LogicalRepWorker. + */ + size = sizeof(UndoApplyCtxStruct); + size = MAXALIGN(size); + size = add_size(size, mul_size(max_undo_workers, + sizeof(UndoApplyWorker))); + return size; +} + +/* + * UndoLauncherShmemInit + * Allocate and initialize undo worker launcher shared memory + */ +void +UndoLauncherShmemInit(void) +{ + bool found; + + UndoApplyCtx = (UndoApplyCtxStruct *) + ShmemInitStruct("Undo Worker Launcher Data", + UndoLauncherShmemSize(), + &found); + + if (!found) + memset(UndoApplyCtx, 0, UndoLauncherShmemSize()); +} + +/* + * UndoLauncherRegister + * Register a background worker running the undo worker launcher. 
+ */ +void +UndoLauncherRegister(void) +{ + BackgroundWorker bgw; + + if (max_undo_workers == 0) + return; + + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "UndoLauncherMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "undo worker launcher"); + snprintf(bgw.bgw_type, BGW_MAXLEN, + "undo worker launcher"); + bgw.bgw_restart_time = 5; + bgw.bgw_notify_pid = 0; + bgw.bgw_main_arg = (Datum)0; + + RegisterBackgroundWorker(&bgw); +} + +/* + * Main loop for the undo worker launcher process. + */ +void +UndoLauncherMain(Datum main_arg) +{ + UndoRequestInfo urinfo; + + ereport(DEBUG1, + (errmsg("undo launcher started"))); + + before_shmem_exit(UndoLauncherOnExit, (Datum) 0); + + Assert(UndoApplyCtx->launcher_pid == 0); + UndoApplyCtx->launcher_pid = MyProcPid; + + /* Establish signal handlers. */ + pqsignal(SIGHUP, UndoLauncherSighup); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* Establish connection to nailed catalogs. */ + BackgroundWorkerInitializeConnection(NULL, NULL, 0); + + /* + * Advertise our latch that undo request enqueuer can use to wake us up + * while we're sleeping. + */ + UndoApplyCtx->undo_launcher_latch = &MyProc->procLatch; + + /* Enter main loop */ + for (;;) + { + int rc; + + CHECK_FOR_INTERRUPTS(); + + ResetUndoRequestInfo(&urinfo); + + if (UndoGetWork(false, false, &urinfo, NULL) && + IsUndoWorkerAvailable()) + UndoWorkerLaunch(urinfo); + + /* Wait for more work. 
*/ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + DEFAULT_NAPTIME_PER_CYCLE, + WAIT_EVENT_UNDO_LAUNCHER_MAIN); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + } + } +} + +/* + * UndoWorkerMain -- Main loop for the undo apply worker. + */ +void +UndoWorkerMain(Datum main_arg) +{ + UndoRequestInfo urinfo; + int worker_slot = DatumGetInt32(main_arg); + bool in_other_db; + bool found_work; + TimestampTz started_at; + + /* Setup signal handling */ + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + ResetUndoRequestInfo(&urinfo); + started_at = GetCurrentTimestamp(); + + /* + * Get the dbid where the wroker should connect to and get the worker + * request queue from which the worker should start looking for an undo + * request. + */ + UndoWorkerGetSlotInfo(worker_slot, &urinfo); + + /* Connect to the requested database. */ + BackgroundWorkerInitializeConnectionByOid(urinfo.dbid, 0, 0); + + /* + * Set the undo worker request queue from which the undo worker start + * looking for a work. + */ + SetUndoWorkerQueueStart(urinfo.undo_worker_queue); + + /* + * Before attaching the worker, fetch and remove the undo request for + * which the undo launcher has launched this worker. This restricts the + * undo launcher from launching multiple workers for the same request. + * But, it's possible that the undo request has already been processed by + * other in-progress undo worker. In that case, we enter the undo worker + * main loop and fetch the next request. + */ + found_work = UndoGetWork(false, true, &urinfo, &in_other_db); + + /* Attach to slot */ + UndoWorkerAttach(worker_slot); + + if (found_work && !in_other_db) + { + /* We must have got the pending undo request. 
*/ + Assert(FullTransactionIdIsValid(urinfo.full_xid)); + UndoWorkerPerformRequest(&urinfo); + last_xact_processed_at = GetCurrentTimestamp(); + } + + for (;;) + { + int rc; + bool allow_peek; + + allow_peek = !TimestampDifferenceExceeds(started_at, + GetCurrentTimestamp(), + undo_worker_quantum_ms); + + found_work = UndoGetWork(allow_peek, true, &urinfo, &in_other_db); + + if (found_work && in_other_db) + { + proc_exit(0); + } + else if (found_work) + { + /* We must have got the pending undo request. */ + Assert(FullTransactionIdIsValid(urinfo.full_xid)); + UndoWorkerPerformRequest(&urinfo); + last_xact_processed_at = GetCurrentTimestamp(); + } + else + { + TimestampTz timeout = 0; + + timeout = TimestampTzPlusMilliseconds(last_xact_processed_at, + UNDO_WORKER_LINGER_MS); + + /* + * We don't need to linger if we have already spent + * UNDO_WORKER_LINGER_MS since last transaction has processed. + */ + if (timeout <= GetCurrentTimestamp()) + { + proc_exit(0); + } + + /* + * Update the shared state to reflect that this worker is + * lingering so that if there is new work request, requester can + * wake us up. + */ + UndoWorkerIsLingering(true); + + /* Wait for more work. */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + DEFAULT_NAPTIME_PER_CYCLE, + WAIT_EVENT_UNDO_WORKER_MAIN); + + /* reset the shared state. */ + UndoWorkerIsLingering(false); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + } + } + } + + proc_exit(0); +} + +/* + * Wake up undo worker so that undo requests can be processed in a timely + * fashion. + * + * We first try to wake up the lingering worker in the given database. If we + * found even one such worker, we are done. 
+ * + * Next, we try to stop some worker which is lingering, but doesn't belong to + * the given database. We know that any worker which is lingering doesn't have + * any pending work, so it is fine to stop it when we know that there is going + * to be some work in the other database. + * + * Finally, we wakeup launcher so that it can either restart the worker we have + * stopped or find some other worker who can take up this request. + */ +void +WakeupUndoWorker(Oid dbid) +{ + int i; + + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + + /* wake up lingering worker in the given database. */ + for (i = 0; i < max_undo_workers; i++) + { + UndoApplyWorker *w = &UndoApplyCtx->workers[i]; + + if (w->in_use && w->lingering && w->dbid == dbid) + { + SetLatch(&w->proc->procLatch); + + LWLockRelease(UndoWorkerLock); + return; + } + } + + /* + * Stop one of the lingering worker which is not processing the requests + * in the given database. + */ + for (i = 0; i < max_undo_workers; i++) + { + UndoApplyWorker *w = &UndoApplyCtx->workers[i]; + + if (w->in_use && w->lingering && w->dbid != dbid) + kill(w->proc->pid, SIGTERM); + } + + if (UndoApplyCtx->undo_launcher_latch) + SetLatch(UndoApplyCtx->undo_launcher_latch); + + LWLockRelease(UndoWorkerLock); + + return; +} diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 7855954..4ba173a 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -14370,6 +14370,10 @@ PreCommit_on_commit_actions(void) case ONCOMMIT_DROP: oids_to_drop = lappend_oid(oids_to_drop, oc->relid); break; + case ONCOMMIT_TEMP_DISCARD: + /* Discard temp table undo logs for temp tables. 
*/ + TempUndoDiscard(oc->relid); + break; } } diff --git a/src/backend/lib/binaryheap.c b/src/backend/lib/binaryheap.c index a2c8967..6a41f2b 100644 --- a/src/backend/lib/binaryheap.c +++ b/src/backend/lib/binaryheap.c @@ -16,6 +16,7 @@ #include #include "lib/binaryheap.h" +#include "storage/shmem.h" static void sift_down(binaryheap *heap, int node_off); static void sift_up(binaryheap *heap, int node_off); @@ -48,6 +49,36 @@ binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg) } /* + * binaryheap_allocate_shm + * + * It works same as binaryheap_allocate except that the heap will be created + * in shared memory. + */ +binaryheap * +binaryheap_allocate_shm(const char *name, int capacity, + binaryheap_comparator compare, void *arg) +{ + Size sz; + binaryheap *heap; + bool foundBHeap; + + sz = binaryheap_shmem_size(capacity); + heap = (binaryheap *) ShmemInitStruct(name, sz, &foundBHeap); + + if (!foundBHeap) + { + heap->bh_space = capacity; + heap->bh_compare = compare; + heap->bh_arg = arg; + + heap->bh_size = 0; + heap->bh_has_heap_property = true; + } + + return heap; +} + +/* * binaryheap_reset * * Resets the heap to an empty state, losing its data content but not the @@ -163,6 +194,79 @@ binaryheap_first(binaryheap *heap) } /* + * binaryheap_nth + * + * Returns a pointer to the nth (0-based) node in the heap without modifying + * the heap. The caller must ensure that this routine is not used on an empty + * heap and is not called with n greater than or equal to the heap size. Always + * O(1). + */ +Datum +binaryheap_nth(binaryheap *heap, int n) +{ + Assert(!binaryheap_empty(heap)); + Assert(n < heap->bh_size); + return heap->bh_nodes[n]; +} + +/* + * binaryheap_remove_nth + * + * Removes the nth node (0-based) in the heap and returns a + * pointer to it after rebalancing the heap. The caller must ensure + * that this routine is not used on an empty heap. O(log n) worst + * case. 
+ */
+Datum
+binaryheap_remove_nth(binaryheap *heap, int n)
+{
+	Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
+	Assert(n >= 0 && n < heap->bh_size);
+
+	if (n == heap->bh_size - 1)
+	{
+		heap->bh_size--;
+		return heap->bh_nodes[heap->bh_size];
+	}
+
+	swap_nodes(heap, n, heap->bh_size - 1);
+	heap->bh_size--;
+	/* The node moved into slot n may belong above or below it. */
+	sift_up(heap, n);
+	sift_down(heap, n);
+	return heap->bh_nodes[heap->bh_size];
+}
+
+/*
+ * binaryheap_remove_nth_unordered
+ *
+ * Removes the nth node (0-based) in the heap and returns a pointer
+ * to it in O(1) without preserving the heap property. This is a
+ * convenience to remove elements quickly. To obtain a valid heap,
+ * one must call binaryheap_build() afterwards. The caller must ensure
+ * that this routine is not used on an empty heap.
+ */
+Datum
+binaryheap_remove_nth_unordered(binaryheap *heap, int n)
+{
+	Assert(!binaryheap_empty(heap));
+	Assert(n >= 0 && n < heap->bh_size);
+
+	heap->bh_has_heap_property = false;
+
+	if (n == heap->bh_size - 1)
+	{
+		heap->bh_size--;
+		return heap->bh_nodes[heap->bh_size];
+	}
+
+	swap_nodes(heap, n, heap->bh_size - 1);
+	heap->bh_size--;
+
+	return heap->bh_nodes[heap->bh_size];
+}
+
+/*
  * binaryheap_remove_first
  *
  * Removes the first (root, topmost) node in the heap and returns a
@@ -305,3 +409,17 @@ sift_down(binaryheap *heap, int node_off)
 		node_off = swap_off;
 	}
 }
+
+/*
+ * Compute the size required by binary heap structure.
+ */
+Size
+binaryheap_shmem_size(int capacity)
+{
+	Size		sz;				/* header up to bh_nodes plus capacity Datums */
+
+	sz = add_size(offsetof(binaryheap, bh_nodes),
+				  mul_size(sizeof(Datum), capacity));
+
+	return sz;
+}
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index f5db5a8..0550d2c 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -15,7 +15,9 @@
 #include
 
 #include "libpq/pqsignal.h"
+#include "access/discardworker.h"
 #include "access/parallel.h"
+#include "access/undoworker.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "port/atomics.h"
@@ -129,6 +131,15 @@ static const struct
 	},
 	{
 		"ApplyWorkerMain", ApplyWorkerMain
+	},
+	{
+		"UndoLauncherMain", UndoLauncherMain
+	},
+	{
+		"UndoWorkerMain", UndoWorkerMain
+	},
+	{
+		"DiscardWorkerMain", DiscardWorkerMain
 	}
 };
 
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 6b4e8ec..72fa92c 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3595,6 +3595,15 @@ pgstat_get_wait_activity(WaitEventActivity w)
 		case WAIT_EVENT_WAL_WRITER_MAIN:
 			event_name = "WalWriterMain";
 			break;
+		case WAIT_EVENT_UNDO_DISCARD_WORKER_MAIN:
+			event_name = "UndoDiscardWorkerMain";
+			break;
+		case WAIT_EVENT_UNDO_LAUNCHER_MAIN:
+			event_name = "UndoLauncherMain";
+			break;
+		case WAIT_EVENT_UNDO_WORKER_MAIN:
+			event_name = "UndoWorkerMain";
+			break;
 			/* no default case, so that compiler will warn */
 	}
 
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 067487f..cd4c26c 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -93,7 +93,9 @@
 #include
 #endif
 
+#include "access/discardworker.h"
 #include "access/transam.h"
+#include "access/undoworker.h"
 #include "access/xlog.h"
 #include "bootstrap/bootstrap.h"
 #include "catalog/pg_control.h"
@@ -246,6 +248,8 @@ bool		enable_bonjour = false;
 char	   *bonjour_name;
 bool		restart_after_crash = true;
+
+bool		disable_undo_launcher;
+
 /* PIDs
of special child processes; 0 when not running */ static pid_t StartupPID = 0, BgWriterPID = 0, @@ -981,6 +985,13 @@ PostmasterMain(int argc, char *argv[]) */ ApplyLauncherRegister(); + /* Register the Undo worker launcher. */ + if (!disable_undo_launcher) + UndoLauncherRegister(); + + /* Register the Undo Discard worker. */ + DiscardWorkerRegister(); + /* * process any libraries that should be preloaded at postmaster start */ diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 905acf2..4658113 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -159,6 +159,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), buf.origptr); break; + case RM_UNDOACTION_ID: + /* Logical decoding is not yet implemented for undoactions. */ + Assert(0); + break; case RM_NEXT_ID: elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record)); } diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 7a4f452..d2fe91a 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -22,6 +22,8 @@ #include "access/subtrans.h" #include "access/twophase.h" #include "access/undolog.h" +#include "access/undorequest.h" +#include "access/undoworker.h" #include "commands/async.h" #include "miscadmin.h" #include "pgstat.h" @@ -152,6 +154,8 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, BTreeShmemSize()); size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); + size = add_size(size, PendingUndoShmemSize()); + size = add_size(size, UndoLauncherShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif @@ -226,6 +230,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) SUBTRANSShmemInit(); MultiXactShmemInit(); InitBufferPool(); + 
PendingUndoShmemInit(); /* * Set up lock manager @@ -264,6 +269,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) WalSndShmemInit(); WalRcvShmemInit(); ApplyLauncherShmemInit(); + UndoLauncherShmemInit(); /* * Set up other modules that need some shared memory space diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index 4b42a1c..1f65056 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -50,3 +50,5 @@ OldSnapshotTimeMapLock 42 LogicalRepWorkerLock 43 CLogTruncationLock 44 UndoLogLock 45 +RollbackRequestLock 46 +UndoWorkerLock 47 diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 0da5b19..a725854 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -296,6 +296,8 @@ InitProcGlobal(void) /* Create ProcStructLock spinlock, too */ ProcStructLock = (slock_t *) ShmemAlloc(sizeof(slock_t)); SpinLockInit(ProcStructLock); + + pg_atomic_init_u64(&ProcGlobal->oldestXidWithEpochHavingUndo, 0); } /* diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c index 8b4720e..c665269 100644 --- a/src/backend/utils/error/elog.c +++ b/src/backend/utils/error/elog.c @@ -259,12 +259,18 @@ errstart(int elevel, const char *filename, int lineno, * 3. the error occurred after proc_exit has begun to run. (It's * proc_exit's responsibility to see that this doesn't turn into * infinite recursion!) + * + * 4. the error occurred while applying undo for a subtransaction. (We + * can't proceed without applying subtransaction's undo as the + * modifications made in that case must not be visible even if the + * main transaction commits.) 
*/ if (elevel == ERROR) { if (PG_exception_stack == NULL || ExitOnAnyError || - proc_exit_inprogress) + proc_exit_inprogress || + applying_subxact_undo) elevel = FATAL; } @@ -1165,6 +1171,22 @@ internalerrquery(const char *query) } /* + * err_out_to_client --- sets whether to send error output to client or not. + */ +int +err_out_to_client(bool out_to_client) +{ + ErrorData *edata = &errordata[errordata_stack_depth]; + + /* we don't bother incrementing recursion_depth */ + CHECK_STACK_DEPTH(); + + edata->output_to_client = out_to_client; + + return 0; /* return value does not matter */ +} + +/* * err_generic_string -- used to set individual ErrorData string fields * identified by PG_DIAG_xxx codes. * @@ -1762,6 +1784,18 @@ pg_re_throw(void) __FILE__, __LINE__); } +/* + * pg_rethrow_as_fatal - Promote the error level to fatal. + */ +void +pg_rethrow_as_fatal(void) +{ + ErrorData *edata = &errordata[errordata_stack_depth]; + + Assert(errordata_stack_depth >= 0); + edata->elevel = FATAL; + PG_RE_THROW(); +} /* * GetErrorContextStack - Return the context stack, for display/diags diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index a5950c1..323e1e3 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -121,6 +121,13 @@ bool allowSystemTableMods = false; int work_mem = 1024; int maintenance_work_mem = 16384; int max_parallel_maintenance_workers = 2; +int rollback_overflow_size = 64; + +/* + * We need this variable primarily to promote the error level to FATAL if we + * get any error while performing undo actions for a subtransaction. + */ +bool applying_subxact_undo = false; /* * Primary determinants of sizes of shared-memory structures. 
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 4eba98b..27f6c62 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -453,6 +453,20 @@ InitCommunication(void) } } +/* + * Check whether the dbid exist or not. + */ +bool +dbid_exists(Oid dboid) +{ + bool result = false; + + Assert(IsTransactionState()); + result = (GetDatabaseTupleByOid(dboid) != NULL); + + return result; +} + /* * pg_split_opts -- split a string of options and append it to an argv array diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 9ba8ed7..6084b0a 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -32,6 +32,7 @@ #include "access/tableam.h" #include "access/transam.h" #include "access/twophase.h" +#include "access/undoworker.h" #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/namespace.h" @@ -1954,6 +1955,17 @@ static struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"disable_undo_launcher", PGC_POSTMASTER, DEVELOPER_OPTIONS, + gettext_noop("Decides whether to launch an undo worker."), + NULL, + GUC_NOT_IN_SAMPLE + }, + &disable_undo_launcher, + false, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL @@ -2923,6 +2935,16 @@ static struct config_int ConfigureNamesInt[] = 5000, 1, INT_MAX, NULL, NULL, NULL }, + { + {"rollback_overflow_size", PGC_USERSET, RESOURCES_MEM, + gettext_noop("Rollbacks greater than this size are done lazily"), + NULL, + GUC_UNIT_MB + }, + &rollback_overflow_size, + 64, 0, MAX_KILOBYTES, + NULL, NULL, NULL + }, { {"wal_segment_size", PGC_INTERNAL, PRESET_OPTIONS, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 77bb7c2..9779082 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -749,4 +749,10 @@ # CUSTOMIZED 
OPTIONS #------------------------------------------------------------------------------ +# If often there are large transactions requiring rollbacks, then we can push +# them to undo-workers for better performance. The size specifeid by the +# parameter below, determines the minimum size of the rollback requests to be +# sent to the undo-worker. +# +#rollback_overflow_size = 64 # Add settings for extensions here diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c index 64aafef..467e4e9 100644 --- a/src/backend/utils/resowner/resowner.c +++ b/src/backend/utils/resowner/resowner.c @@ -20,6 +20,7 @@ */ #include "postgres.h" +#include "access/xact.h" #include "jit/jit.h" #include "storage/bufmgr.h" #include "storage/ipc.h" @@ -556,6 +557,11 @@ ResourceOwnerReleaseInternal(ResourceOwner owner, } else if (phase == RESOURCE_RELEASE_LOCKS) { + /* + * For aborts, we don't want to release the locks immediately if we have + * some pending undo actions to perform. Instead, we release them after + * applying undo actions. See ApplyUndoActions. + */ if (isTopLevel) { /* @@ -565,7 +571,8 @@ ResourceOwnerReleaseInternal(ResourceOwner owner, */ if (owner == TopTransactionResourceOwner) { - ProcReleaseLocks(isCommit); + if (!CanPerformUndoActions()) + ProcReleaseLocks(isCommit); ReleasePredicateLocks(isCommit, false); } } @@ -598,7 +605,7 @@ ResourceOwnerReleaseInternal(ResourceOwner owner, if (isCommit) LockReassignCurrentOwner(locks, nlocks); - else + else if (!CanPerformUndoActions()) LockReleaseCurrentOwner(locks, nlocks); } } diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 7f0a179..f079d69 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -29,7 +29,7 @@ * RmgrNames is an array of resource manager names, to make error messages * a bit nicer. 
*/ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undo_desc) \ name, static const char *RmgrNames[RM_MAX_ID + 1] = { diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 938150d..396193b 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -20,6 +20,7 @@ #include "access/nbtxlog.h" #include "access/rmgr.h" #include "access/spgxlog.h" +#include "access/undoaction_xlog.h" #include "access/undolog_xlog.h" #include "access/xact.h" #include "access/xlog_internal.h" @@ -33,7 +34,7 @@ #include "storage/standbydefs.h" #include "utils/relmapper.h" -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undo_desc) \ { name, desc, identify}, const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = { diff --git a/src/include/access/discardworker.h b/src/include/access/discardworker.h new file mode 100644 index 0000000..5b065bf --- /dev/null +++ b/src/include/access/discardworker.h @@ -0,0 +1,20 @@ +/*------------------------------------------------------------------------- + * + * discardworker.h + * Exports from access/undo/discardworker.c. 
+ * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/discardworker.h + * + *------------------------------------------------------------------------- + */ +#ifndef _DISCARDWORKER_H +#define _DISCARDWORKER_H + +extern void DiscardWorkerRegister(void); +extern void DiscardWorkerMain(Datum main_arg) pg_attribute_noreturn(); +extern bool IsDiscardProcess(void); + +#endif /* _DISCARDWORKER_H */ diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h index c9b5c56..e1fb42a 100644 --- a/src/include/access/rmgr.h +++ b/src/include/access/rmgr.h @@ -19,7 +19,7 @@ typedef uint8 RmgrId; * Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG * file format. */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undo_desc) \ symname, typedef enum RmgrIds diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 6945e3e..ef0f6ac 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -25,26 +25,27 @@ */ /* symbol name, textual name, redo, desc, identify, startup, cleanup */ -PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL) -PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL) -PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL) -PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL) -PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL) -PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL) -PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL) -PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, 
NULL, NULL, NULL) -PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL) -PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask) -PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask) -PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, NULL, NULL, btree_mask) -PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask) -PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask) -PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask) -PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask) -PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask) -PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask) -PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL) -PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL) -PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask) -PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL) -PG_RMGR(RM_UNDOLOG_ID, "UndoLog", undolog_redo, undolog_desc, undolog_identify, NULL, NULL, NULL) +PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, 
dbase_desc, dbase_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, NULL, NULL) +PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, NULL, NULL) +PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, NULL, NULL, btree_mask, NULL, NULL) +PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL, NULL) +PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL, NULL) +PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL, NULL) +PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL, NULL) +PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL, NULL) +PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL, NULL) +PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL, NULL) +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, NULL, NULL) 
+PG_RMGR(RM_UNDOLOG_ID, "UndoLog", undolog_redo, undolog_desc, undolog_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_UNDOACTION_ID, "UndoAction", undoaction_redo, undoaction_desc, undoaction_identify, NULL, NULL, NULL, NULL, NULL) diff --git a/src/include/access/transam.h b/src/include/access/transam.h index 592c338..1477ddc 100644 --- a/src/include/access/transam.h +++ b/src/include/access/transam.h @@ -49,6 +49,7 @@ #define U64FromFullTransactionId(x) ((x).value) #define FullTransactionIdEquals(a, b) ((a).value == (b).value) #define FullTransactionIdPrecedes(a, b) ((a).value < (b).value) +#define FullTransactionIdFollows(a, b) ((a).value > (b).value) #define FullTransactionIdIsValid(x) TransactionIdIsValid(XidFromFullTransactionId(x)) #define InvalidFullTransactionId FullTransactionIdFromEpochAndXid(0, InvalidTransactionId) @@ -72,6 +73,16 @@ FullTransactionIdFromEpochAndXid(uint32 epoch, TransactionId xid) return result; } +static inline FullTransactionId +FullTransactionIdFromU64(uint64 fxid) +{ + FullTransactionId result; + + result.value = fxid; + + return result; +} + /* advance a transaction ID variable, handling wraparound correctly */ #define TransactionIdAdvance(dest) \ do { \ @@ -105,6 +116,14 @@ FullTransactionIdAdvance(FullTransactionId *dest) (AssertMacro(TransactionIdIsNormal(id1) && TransactionIdIsNormal(id2)), \ (int32) ((id1) - (id2)) > 0) +/* Extract xid from a value comprised of epoch and xid */ +#define GetXidFromEpochXid(epochxid) \ + ((uint32) (epochxid) & 0XFFFFFFFF) + +/* Extract epoch from a value comprised of epoch and xid */ +#define GetEpochFromEpochXid(epochxid) \ + ((uint32) ((epochxid) >> 32)) + /* ---------- * Object ID (OID) zero is InvalidOid. 
* @@ -225,6 +244,7 @@ extern XLogRecPtr TransactionIdGetCommitLSN(TransactionId xid); /* in transam/varsup.c */ extern FullTransactionId GetNewTransactionId(bool isSubXact); extern void AdvanceNextFullTransactionIdPastXid(TransactionId xid); +extern uint32 GetEpochForXid(TransactionId xid); extern FullTransactionId ReadNextFullTransactionId(void); extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid); diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index fcd1913..fdf4470 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -14,10 +14,12 @@ #ifndef TWOPHASE_H #define TWOPHASE_H +#include "access/undorequest.h" #include "access/xlogdefs.h" #include "access/xact.h" #include "datatype/timestamp.h" #include "storage/lock.h" +#include "access/undolog.h" /* * GlobalTransactionData is defined in twophase.c; other places have no @@ -41,7 +43,7 @@ extern GlobalTransaction MarkAsPreparing(TransactionId xid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid); -extern void StartPrepare(GlobalTransaction gxact); +extern void StartPrepare(GlobalTransaction gxact, UndoRecPtr *, UndoRecPtr *); extern void EndPrepare(GlobalTransaction gxact); extern bool StandbyTransactionIdIsPrepared(TransactionId xid); diff --git a/src/include/access/undoaction_xlog.h b/src/include/access/undoaction_xlog.h new file mode 100644 index 0000000..b9e65d1 --- /dev/null +++ b/src/include/access/undoaction_xlog.h @@ -0,0 +1,39 @@ +/*------------------------------------------------------------------------- + * + * undoaction_xlog.h + * undo action XLOG definitions + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/undoaction_xlog.h + * + *------------------------------------------------------------------------- + */ +#ifndef UNDOACTION_XLOG_H +#define UNDOACTION_XLOG_H + 
+#include "access/undolog.h" +#include "access/xlogreader.h" +#include "lib/stringinfo.h" +#include "storage/off.h" + +/* + * WAL record definitions for undoactions.c's WAL operations + */ +#define XLOG_UNDO_APPLY_PROGRESS 0x00 + +/* This is what we need to know about undo apply progress */ +typedef struct xl_undoapply_progress +{ + UndoRecPtr urec_ptr; + uint32 progress; +} xl_undoapply_progress; + +#define SizeOfUndoActionProgress (offsetof(xl_undoapply_progress, progress) + sizeof(uint32)) + +extern void undoaction_redo(XLogReaderState *record); +extern void undoaction_desc(StringInfo buf, XLogReaderState *record); +extern const char *undoaction_identify(uint8 info); + +#endif /* UNDOACTION_XLOG_H */ diff --git a/src/include/access/undodiscard.h b/src/include/access/undodiscard.h new file mode 100644 index 0000000..652712c --- /dev/null +++ b/src/include/access/undodiscard.h @@ -0,0 +1,25 @@ +/*------------------------------------------------------------------------- + * + * undodiscard.h + * undo discard definitions + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/undodiscard.h + * + *------------------------------------------------------------------------- + */ +#ifndef UNDODISCARD_H +#define UNDODISCARD_H + +#include "access/undolog.h" +#include "access/xlogdefs.h" +#include "catalog/pg_class.h" +#include "storage/lwlock.h" + +extern void UndoDiscard(TransactionId xmin, bool *hibernate); +extern void UndoLogDiscardAll(void); +extern void TempUndoDiscard(UndoLogNumber); + +#endif /* UNDODISCARD_H */ diff --git a/src/include/access/undoinsert.h b/src/include/access/undoinsert.h index 0a6df73..25a80bb 100644 --- a/src/include/access/undoinsert.h +++ b/src/include/access/undoinsert.h @@ -39,7 +39,7 @@ typedef bool (*SatisfyUndoRecordCallback) (UnpackedUndoRecord *urec, extern UndoRecPtr PrepareUndoInsert(UnpackedUndoRecord *,
FullTransactionId fxid, UndoPersistence upersistence, XLogReaderState *xlog_record); -extern void InsertPreparedUndo(void); +extern void InsertPreparedUndo(UndoPersistence upersistence); extern void RegisterUndoLogBuffers(uint8 first_block_id); extern void UndoLogBuffersSetLSN(XLogRecPtr recptr); @@ -59,6 +59,9 @@ extern void UndoSetPrepareSize(int nrecords); extern UndoRecPtr UndoGetPrevUndoRecptr(UndoRecPtr urp, UndoRecPtr prevurp, Buffer buffer, UndoPersistence upersistence); +extern void PrepareUpdateUndoActionProgress(XLogReaderState *xlog_record, + UndoRecPtr urecptr, int progress); +extern void UndoRecordUpdateTransInfo(int idx); extern void AtAbort_ResetUndoBuffers(void); #endif /* UNDOINSERT_H */ diff --git a/src/include/access/undolog.h b/src/include/access/undolog.h index 2cfce8c..7b08a18 100644 --- a/src/include/access/undolog.h +++ b/src/include/access/undolog.h @@ -242,6 +242,19 @@ typedef struct UndoLogMetaData /* * The in-memory control object for an undo log. We have a fixed-sized array * of these. + * + * The following two locks are used to manage the discard process + * discard_lock - should be acquired for undo read to protect it from discard and + * discard worker will acquire this lock to update oldest_data. + * + * discard_update_lock - This lock will be acquired in exclusive mode by discard + * worker during the discard process and in shared mode to update the + * next_urp in previous transaction's start header. + * + * Two different locks are used so that the readers are not blocked during the + * actual discard but only during the update of shared memory variable which + * influences the visibility decision but the updaters need to be blocked for + * the entire discard process to ensure proper ordering of WAL records. 
*/ typedef struct UndoLogControl { @@ -265,6 +278,7 @@ typedef struct UndoLogControl uint32 oldest_xidepoch; UndoRecPtr oldest_data; LWLock discard_lock; /* prevents discarding while reading */ + LWLock discard_update_lock; /* block updaters during discard */ LWLock rewind_lock; /* prevent rewinding while reading */ } UndoLogControl; diff --git a/src/include/access/undorequest.h b/src/include/access/undorequest.h new file mode 100644 index 0000000..b7f68c3 --- /dev/null +++ b/src/include/access/undorequest.h @@ -0,0 +1,135 @@ +/*------------------------------------------------------------------------- + * + * undorequest.h + * Exports from undo/undorequest.c. + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * + * src/include/access/undorequest.h + * + *------------------------------------------------------------------------- + */ +#ifndef _UNDOREQUEST_H +#define _UNDOREQUEST_H + +#include "access/transam.h" +#include "access/undoinsert.h" +#include "datatype/timestamp.h" +#include "utils/relcache.h" + + +/* different types of undo worker */ +typedef enum +{ + XID_QUEUE = 0, + SIZE_QUEUE = 1, + ERROR_QUEUE +} UndoWorkerQueueType; + +#define InvalidUndoWorkerQueue (-1) + +/* Remembers the last seen RecentGlobalXmin.  NOTE(review): this is a definition, not a declaration, so every file that includes this header gets its own tentative definition; that only links with -fcommon (GCC 10+ and Clang 11+ default to -fno-common).  It should be declared extern here and defined in exactly one .c file. */ +TransactionId latestRecentGlobalXmin; + +/* This is the data structure for each hash table entry for rollbacks. */ +typedef struct RollbackHashEntry +{ + FullTransactionId full_xid; /* must be first entry */ + UndoRecPtr start_urec_ptr; + UndoRecPtr end_urec_ptr; + Oid dbid; + bool in_progress; /* indicates that undo actions are being processed */ +} RollbackHashEntry; + +/* This is the data structure for each hash table key for rollbacks. */ +typedef struct RollbackHashKey +{ + FullTransactionId full_xid; + UndoRecPtr start_urec_ptr; +} RollbackHashKey; + +/* This is an entry for undo request queue that is sorted by xid.
*/ +typedef struct UndoXidQueue +{ + FullTransactionId full_xid; + UndoRecPtr start_urec_ptr; + Oid dbid; +} UndoXidQueue; + +/* This is an entry for undo request queue that is sorted by size. */ +typedef struct UndoSizeQueue +{ + FullTransactionId full_xid; + UndoRecPtr start_urec_ptr; + Oid dbid; + uint64 request_size; +} UndoSizeQueue; + +/* + * This is an entry for undo request queue that is sorted by time at which an + * error has occurred. + */ +typedef struct UndoErrorQueue +{ + FullTransactionId full_xid; + UndoRecPtr start_urec_ptr; + Oid dbid; + TimestampTz err_occurred_at; +} UndoErrorQueue; + +/* undo request information */ +typedef struct UndoRequestInfo +{ + FullTransactionId full_xid; + UndoRecPtr start_urec_ptr; + UndoRecPtr end_urec_ptr; + Oid dbid; + uint64 request_size; + UndoWorkerQueueType undo_worker_queue; +} UndoRequestInfo; + +/* Reset the undo request info */ +#define ResetUndoRequestInfo(urinfo) \ +( \ + (urinfo)->full_xid = InvalidFullTransactionId, \ + (urinfo)->start_urec_ptr = InvalidUndoRecPtr, \ + (urinfo)->end_urec_ptr = InvalidUndoRecPtr, \ + (urinfo)->dbid = InvalidOid, \ + (urinfo)->request_size = 0, \ + (urinfo)->undo_worker_queue = InvalidUndoWorkerQueue \ +) + +/* Set the undo request info from a rollback hash entry; request_size is left unchanged. */ +#define SetUndoRequestInfoFromRHEntry(urinfo, rh, cur_queue) \ +( \ + (urinfo)->full_xid = (rh)->full_xid, \ + (urinfo)->start_urec_ptr = (rh)->start_urec_ptr, \ + (urinfo)->end_urec_ptr = (rh)->end_urec_ptr, \ + (urinfo)->dbid = (rh)->dbid, \ + (urinfo)->undo_worker_queue = (cur_queue) \ +) + +/* Exposed functions for rollback request queues.
*/ +extern int PendingUndoShmemSize(void); +extern void PendingUndoShmemInit(void); +extern bool UndoWorkerQueuesEmpty(void); +extern void InsertRequestIntoUndoQueues(UndoRequestInfo *urinfo); +extern bool InsertRequestIntoErrorUndoQueue(volatile UndoRequestInfo *urinfo); +extern void SetUndoWorkerQueueStart(UndoWorkerQueueType undo_worker_queue); +extern bool UndoGetWork(bool allow_peek, bool is_undo_launcher, + UndoRequestInfo *urinfo, bool *in_other_db); +/* Exposed functions for rollback hash table. */ +extern bool RegisterRollbackReq(UndoRecPtr end_urec_ptr, UndoRecPtr start_urec_ptr, + Oid dbid, FullTransactionId full_xid); +extern void RollbackHTRemoveEntry(FullTransactionId full_xid, UndoRecPtr start_urec_ptr); +extern bool RollbackHTIsFull(void); +extern void RollbackHTCleanup(Oid dbid); + +/* functions exposed from undoaction.c */ +extern void execute_undo_actions(FullTransactionId full_xid, UndoRecPtr from_urecptr, + UndoRecPtr to_urecptr, bool nopartial); +extern bool execute_undo_actions_page(UndoRecInfo *urp_array, int first_idx, + int last_idx, Oid reloid, FullTransactionId full_xid, + BlockNumber blkno, bool blk_chain_complete); + +#endif /* _UNDOREQUEST_H */ diff --git a/src/include/access/undoworker.h b/src/include/access/undoworker.h new file mode 100644 index 0000000..e2a61ff --- /dev/null +++ b/src/include/access/undoworker.h @@ -0,0 +1,27 @@ +/*------------------------------------------------------------------------- + * + * undoworker.h + * Exports from undoworker.c. 
+ * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/undoworker.h + * + *------------------------------------------------------------------------- + */ +#ifndef _UNDOWORKER_H +#define _UNDOWORKER_H + +/* GUC options */ +/* undo worker sleep time between rounds */ +extern int UndoWorkerDelay; + +extern Size UndoLauncherShmemSize(void); +extern void UndoLauncherShmemInit(void); +extern void UndoLauncherRegister(void); +extern void UndoLauncherMain(Datum main_arg); +extern void UndoWorkerMain(Datum main_arg) pg_attribute_noreturn(); +extern void WakeupUndoWorker(Oid dbid); + +#endif /* _UNDOWORKER_H */ diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 61df235..ca7924e 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -22,6 +22,7 @@ #include "storage/relfilenode.h" #include "storage/sinval.h" #include "utils/datetime.h" +#include "utils/resowner.h" /* * Maximum size of Global Transaction ID (including '\0'). 
@@ -429,6 +430,11 @@ extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, const char *twophase_gid); extern void xact_redo(XLogReaderState *record); +extern void ApplyUndoActions(void); +extern void SetUndoActionsInfo(void); +extern void ResetUndoActionsInfo(void); +extern bool CanPerformUndoActions(void); + /* xactdesc.c */ extern void xact_desc(StringInfo buf, XLogReaderState *record); extern const char *xact_identify(uint8 info); @@ -440,5 +446,7 @@ extern void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_ab extern void EnterParallelMode(void); extern void ExitParallelMode(void); extern bool IsInParallelMode(void); +extern void SetCurrentUndoLocation(UndoRecPtr urec_ptr, + UndoPersistence upersistence); #endif /* XACT_H */ diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 8b1348c..04440ed 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -21,8 +21,11 @@ #include "access/xlogdefs.h" #include "access/xlogreader.h" +#include "access/undorecord.h" +#include "access/undorequest.h" #include "datatype/timestamp.h" #include "lib/stringinfo.h" +#include "nodes/pg_list.h" #include "pgtime.h" #include "storage/block.h" #include "storage/relfilenode.h" @@ -295,9 +298,13 @@ typedef struct RmgrData void (*rm_startup) (void); void (*rm_cleanup) (void); void (*rm_mask) (char *pagedata, BlockNumber blkno); + bool (*rm_undo) (UndoRecInfo *urp_array, int first_idx, int last_idx, + Oid reloid, FullTransactionId full_xid, BlockNumber blkno, + bool blk_chain_complete); + void (*rm_undo_desc) (StringInfo buf, UnpackedUndoRecord *record); } RmgrData; -extern const RmgrData RmgrTable[]; +extern PGDLLIMPORT const RmgrData RmgrTable[]; /* * Exported to support xlog switching from checkpointer diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h index 9375e54..08584fc 100644 --- a/src/include/access/xlogrecord.h +++ b/src/include/access/xlogrecord.h @@ 
-13,6 +13,7 @@ #include "access/rmgr.h" #include "access/xlogdefs.h" +#include "access/transam.h" #include "port/pg_crc32c.h" #include "storage/block.h" #include "storage/relfilenode.h" diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index ff98d9e..0fdd2f8 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -61,6 +61,13 @@ typedef struct CheckPoint * set to InvalidTransactionId. */ TransactionId oldestActiveXid; + + /* + * Oldest transaction id (with epoch) that has undo. It is included in + * the checkpoint record so that the proper value is available whenever + * the server starts. + */ + uint64 oldestXidWithEpochHavingUndo; } CheckPoint; /* XLOG info values for XLOG rmgr */ diff --git a/src/include/lib/binaryheap.h b/src/include/lib/binaryheap.h index 92dab66..c6fa89b 100644 --- a/src/include/lib/binaryheap.h +++ b/src/include/lib/binaryheap.h @@ -40,6 +40,9 @@ typedef struct binaryheap extern binaryheap *binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg); +extern binaryheap *binaryheap_allocate_shm(const char *name, int capacity, + binaryheap_comparator compare, + void *arg); extern void binaryheap_reset(binaryheap *heap); extern void binaryheap_free(binaryheap *heap); extern void binaryheap_add_unordered(binaryheap *heap, Datum d); @@ -48,7 +51,12 @@ extern void binaryheap_add(binaryheap *heap, Datum d); extern Datum binaryheap_first(binaryheap *heap); extern Datum binaryheap_remove_first(binaryheap *heap); extern void binaryheap_replace_first(binaryheap *heap, Datum d); +extern Size binaryheap_shmem_size(int capacity); +extern Datum binaryheap_nth(binaryheap *heap, int n); +extern Datum binaryheap_remove_nth(binaryheap *heap, int n); +extern Datum binaryheap_remove_nth_unordered(binaryheap *heap, int n); #define binaryheap_empty(h) ((h)->bh_size == 0) +#define binaryheap_cur_size(h) ((h)->bh_size) #endif /* BINARYHEAP_H */ diff --git a/src/include/miscadmin.h
b/src/include/miscadmin.h index b677c7e..acc7323 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -170,6 +170,8 @@ extern PGDLLIMPORT struct Latch *MyLatch; extern int32 MyCancelKey; extern int MyPMChildSlot; +extern bool applying_subxact_undo; + extern char OutputFileName[]; extern PGDLLIMPORT char my_exec_path[]; extern char pkglib_path[]; @@ -245,6 +247,7 @@ extern PGDLLIMPORT bool allowSystemTableMods; extern PGDLLIMPORT int work_mem; extern PGDLLIMPORT int maintenance_work_mem; extern PGDLLIMPORT int max_parallel_maintenance_workers; +extern PGDLLIMPORT int rollback_overflow_size; extern int VacuumCostPageHit; extern int VacuumCostPageMiss; @@ -422,6 +425,7 @@ extern AuxProcType MyAuxProcType; *****************************************************************************/ /* in utils/init/postinit.c */ +extern bool dbid_exists(Oid dboid); extern void pg_split_opts(char **argv, int *argcp, const char *optstr); extern void InitializeMaxBackends(void); extern void InitPostgres(const char *in_dbname, Oid dboid, const char *username, diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h index f9b1cf2..3cda35d 100644 --- a/src/include/nodes/primnodes.h +++ b/src/include/nodes/primnodes.h @@ -49,7 +49,8 @@ typedef enum OnCommitAction ONCOMMIT_NOOP, /* No ON COMMIT clause (do nothing) */ ONCOMMIT_PRESERVE_ROWS, /* ON COMMIT PRESERVE ROWS (do nothing) */ ONCOMMIT_DELETE_ROWS, /* ON COMMIT DELETE ROWS */ - ONCOMMIT_DROP /* ON COMMIT DROP */ + ONCOMMIT_DROP, /* ON COMMIT DROP */ + ONCOMMIT_TEMP_DISCARD /* ON COMMIT discard temp table undo logs */ } OnCommitAction; /* diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 7cd71bd..013c0eb 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -781,7 +781,10 @@ typedef enum WAIT_EVENT_SYSLOGGER_MAIN, WAIT_EVENT_WAL_RECEIVER_MAIN, WAIT_EVENT_WAL_SENDER_MAIN, - WAIT_EVENT_WAL_WRITER_MAIN + WAIT_EVENT_WAL_WRITER_MAIN, + WAIT_EVENT_UNDO_DISCARD_WORKER_MAIN, + 
WAIT_EVENT_UNDO_LAUNCHER_MAIN, + WAIT_EVENT_UNDO_WORKER_MAIN } WaitEventActivity; /* ---------- diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h index 8ccd2af..6212a18 100644 --- a/src/include/postmaster/postmaster.h +++ b/src/include/postmaster/postmaster.h @@ -29,6 +29,8 @@ extern bool log_hostname; extern bool enable_bonjour; extern char *bonjour_name; extern bool restart_after_crash; +extern bool disable_undo_launcher; + #ifdef WIN32 extern HANDLE PostmasterHandle; diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index 4abb344..118ff6b 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -222,6 +222,7 @@ typedef enum BuiltinTrancheIds LWTRANCHE_SXACT, LWTRANCHE_UNDOLOG, LWTRANCHE_UNDODISCARD, + LWTRANCHE_DISCARD_UPDATE, LWTRANCHE_REWIND, LWTRANCHE_FIRST_USER_DEFINED, } BuiltinTrancheIds; diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 1cee7db..a656991 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -272,6 +272,8 @@ typedef struct PROC_HDR int startupProcPid; /* Buffer id of the buffer that Startup process waits for pin on, or -1 */ int startupBufferPinWaitBufId; + /* Oldest transaction id (with epoch) that has undo.
*/ + pg_atomic_uint64 oldestXidWithEpochHavingUndo; } PROC_HDR; extern PGDLLIMPORT PROC_HDR *ProcGlobal; diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index bd24850..8aa3249 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -27,6 +27,8 @@ * to avoid forcing to include proc.h when including procarray.h. So if you modify * PROC_XXX flags, you need to modify these flags. */ +#define PROCARRAY_AUTOVACUUM_FLAG 0x01 /* currently running + * autovacuum */ #define PROCARRAY_VACUUM_FLAG 0x02 /* currently running lazy * vacuum */ #define PROCARRAY_ANALYZE_FLAG 0x04 /* currently running @@ -41,7 +43,8 @@ * PGXACT->vacuumFlags. Other flags are used for different purposes and * have no corresponding PROC flag equivalent. */ -#define PROCARRAY_PROC_FLAGS_MASK (PROCARRAY_VACUUM_FLAG | \ +#define PROCARRAY_PROC_FLAGS_MASK (PROCARRAY_AUTOVACUUM_FLAG | \ + PROCARRAY_VACUUM_FLAG | \ PROCARRAY_ANALYZE_FLAG | \ PROCARRAY_LOGICAL_DECODING_FLAG) @@ -50,6 +53,8 @@ #define PROCARRAY_FLAGS_DEFAULT PROCARRAY_LOGICAL_DECODING_FLAG /* Ignore vacuum backends */ #define PROCARRAY_FLAGS_VACUUM PROCARRAY_FLAGS_DEFAULT | PROCARRAY_VACUUM_FLAG +/* Ignore autovacuum worker and backends running vacuum */ +#define PROCARRAY_FLAGS_AUTOVACUUM PROCARRAY_FLAGS_DEFAULT | PROCARRAY_AUTOVACUUM_FLAG /* Ignore analyze backends */ #define PROCARRAY_FLAGS_ANALYZE PROCARRAY_FLAGS_DEFAULT | PROCARRAY_ANALYZE_FLAG /* Ignore both vacuum and analyze backends */ diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h index 7ac37fd..ce302bb 100644 --- a/src/include/utils/elog.h +++ b/src/include/utils/elog.h @@ -195,6 +195,8 @@ extern int errposition(int cursorpos); extern int internalerrposition(int cursorpos); extern int internalerrquery(const char *query); +extern int err_out_to_client(bool out_to_client); + extern int err_generic_string(int field, const char *str); extern int geterrcode(void); @@ -384,6 +386,7 @@ extern void 
FlushErrorState(void); extern void ReThrowError(ErrorData *edata) pg_attribute_noreturn(); extern void ThrowErrorData(ErrorData *edata); extern void pg_re_throw(void) pg_attribute_noreturn(); +extern void pg_rethrow_as_fatal(void); extern char *GetErrorContextStack(void);