diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 4f3c5c9..b3b7efe 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -167,6 +167,10 @@ static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray,
 static TransactionId KnownAssignedXidsGetOldestXmin(void);
 static void KnownAssignedXidsDisplay(int trace_level);
 static void KnownAssignedXidsReset(void);
+static inline void ProcArrayEndTransactionInternal(PGPROC *proc,
+				PGXACT *pgxact, TransactionId latestXid);
+static bool PushProcAndWaitForXidClear(PGPROC *proc, TransactionId latestXid);
+static void PopProcsAndClearXids(void);
 
 /*
  * Report shared-memory space needed by CreateSharedProcArray.
@@ -399,26 +403,20 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
 		 */
 		Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
 
-		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
-
-		pgxact->xid = InvalidTransactionId;
-		proc->lxid = InvalidLocalTransactionId;
-		pgxact->xmin = InvalidTransactionId;
-		/* must be cleared with xid/xmin: */
-		pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
-		pgxact->delayChkpt = false;		/* be sure this is cleared in abort */
-		proc->recoveryConflictPending = false;
-
-		/* Clear the subtransaction-XID cache too while holding the lock */
-		pgxact->nxids = 0;
-		pgxact->overflowed = false;
-
-		/* Also advance global latestCompletedXid while holding the lock */
-		if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
-					  latestXid))
-			ShmemVariableCache->latestCompletedXid = latestXid;
-
-		LWLockRelease(ProcArrayLock);
+		/*
+		 * Clear our advertised XID.  If ProcArrayLock is free, take it and do
+		 * the work ourselves.  Otherwise join the list of procs waiting for
+		 * their XIDs to be cleared: the first proc to join an empty list
+		 * becomes the leader and clears the XIDs of the whole list under a
+		 * single lock acquisition, while the others sleep until it is done.
+		 */
+		if (LWLockConditionalAcquire(ProcArrayLock, LW_EXCLUSIVE))
+		{
+			ProcArrayEndTransactionInternal(proc, pgxact, latestXid);
+			LWLockRelease(ProcArrayLock);
+		}
+		else if (!PushProcAndWaitForXidClear(proc, latestXid))
+			PopProcsAndClearXids();
 	}
 	else
 	{
@@ -441,6 +439,160 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
 	}
 }
 
+/*
+ * ProcArrayEndTransactionInternal -- clear one proc's transaction fields
+ *
+ * Resets the advertised XID, xmin and related fields of the given
+ * PGPROC/PGXACT pair, and advances ShmemVariableCache->latestCompletedXid
+ * to latestXid if that is newer.
+ *
+ * Caller must hold ProcArrayLock in exclusive mode.
+ */
+static inline void
+ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact,
+				TransactionId latestXid)
+{
+	pgxact->xid = InvalidTransactionId;
+	proc->lxid = InvalidLocalTransactionId;
+	pgxact->xmin = InvalidTransactionId;
+	/* must be cleared with xid/xmin: */
+	pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
+	pgxact->delayChkpt = false;		/* be sure this is cleared in abort */
+	proc->recoveryConflictPending = false;
+
+	/* Clear the subtransaction-XID cache too while holding the lock */
+	pgxact->nxids = 0;
+	pgxact->overflowed = false;
+
+	/* Also advance global latestCompletedXid while holding the lock */
+	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
+				  latestXid))
+		ShmemVariableCache->latestCompletedXid = latestXid;
+}
+
+/*
+ * PushProcAndWaitForXidClear -- queue proc for group XID clearing
+ *
+ * Advertises latestXid in proc->backendLatestXid, flags the proc as needing
+ * its XID cleared, and pushes it onto the lock-free list headed by
+ * ProcGlobal->nextClearXidElem.  List links hold pgprocno + 1, so that 0
+ * unambiguously means "end of list" even though 0 is a valid pgprocno.
+ *
+ * If the list was already non-empty, another proc is the leader for this
+ * batch: sleep until it has cleared our XID, then return true.  If the list
+ * was empty, we are the leader: return false immediately; the caller must
+ * then call PopProcsAndClearXids() to clear the XIDs of the whole batch.
+ */
+static bool
+PushProcAndWaitForXidClear(PGPROC *proc, TransactionId latestXid)
+{
+	PROC_HDR   *procglobal = ProcGlobal;
+	uint32		head;
+	int			extraWaits = 0;
+
+	/*
+	 * Fill in everything the leader will read before publishing ourselves.
+	 * The compare-and-exchange below is a full memory barrier, so these
+	 * stores are visible to whichever proc pops us off the list.
+	 */
+	proc->clearXid = true;
+	proc->backendLatestXid = latestXid;
+
+	head = pg_atomic_read_u32(&procglobal->nextClearXidElem);
+	for (;;)
+	{
+		proc->nextClearXidElem = head;
+
+		/* On failure, head is refreshed with the current list head. */
+		if (pg_atomic_compare_exchange_u32(&procglobal->nextClearXidElem,
+						   &head,
+						   (uint32) proc->pgprocno + 1))
+			break;
+	}
+
+	/* The proc that found the list empty clears the XIDs for everyone. */
+	if (head == 0)
+		return false;
+
+	/*
+	 * Wait for the leader to reset clearXid.  The semaphore may also be
+	 * signalled for other reasons, so loop until the flag is reset and then
+	 * re-post any wakeups absorbed along the way.
+	 */
+	for (;;)
+	{
+		PGSemaphoreLock(&proc->sem);
+		if (!proc->clearXid)
+			break;
+		extraWaits++;
+	}
+
+	/* Pairs with the leader's barrier before it reset clearXid. */
+	pg_read_barrier();
+
+	/* Fix the process wait semaphore's count for any absorbed wakeups. */
+	while (extraWaits-- > 0)
+		PGSemaphoreUnlock(&proc->sem);
+
+	return true;
+}
+
+/*
+ * PopProcsAndClearXids -- clear the XIDs of all queued procs
+ *
+ * Called by the leader chosen in PushProcAndWaitForXidClear.  Acquires
+ * ProcArrayLock, atomically detaches the whole pending list, clears the
+ * XID of every proc on it (our own included) and advances
+ * latestCompletedXid; then releases the lock and wakes the waiting procs.
+ */
+static void
+PopProcsAndClearXids(void)
+{
+	PROC_HDR   *procglobal = ProcGlobal;
+	uint32		nextidx;
+	uint32		wakeidx;
+
+	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+	/*
+	 * Detach the entire list in one step.  Procs that arrive after this find
+	 * an empty list and form a new batch with a leader of their own.
+	 */
+	nextidx = pg_atomic_exchange_u32(&procglobal->nextClearXidElem, 0);
+
+	/* Remember the head, so that we can wake everyone afterwards. */
+	wakeidx = nextidx;
+
+	while (nextidx != 0)
+	{
+		PGPROC	   *proc = &allProcs[nextidx - 1];
+		PGXACT	   *pgxact = &allPgXact[proc->pgprocno];
+
+		ProcArrayEndTransactionInternal(proc, pgxact, proc->backendLatestXid);
+		nextidx = proc->nextClearXidElem;
+	}
+
+	LWLockRelease(ProcArrayLock);
+
+	/*
+	 * Wake the waiting procs.  As soon as a proc sees clearXid reset it may
+	 * return, start a new transaction and push itself onto the list again,
+	 * overwriting its nextClearXidElem.  So read the link first, and put a
+	 * full barrier between that read (and all our earlier stores) and the
+	 * store that releases the proc.
+	 */
+	while (wakeidx != 0)
+	{
+		PGPROC	   *proc = &allProcs[wakeidx - 1];
+
+		wakeidx = proc->nextClearXidElem;
+		pg_memory_barrier();
+		proc->clearXid = false;
+
+		if (proc != MyProc)
+			PGSemaphoreUnlock(&proc->sem);
+	}
+}
 
 /*
  * ProcArrayClearTransaction -- clear the transaction fields
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 884e91b..fea1edd 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -264,6 +264,8 @@ InitProcGlobal(void)
 			SHMQueueInit(&(procs[i].myProcLocks[j]));
 	}
 
+	pg_atomic_init_u32(&ProcGlobal->nextClearXidElem, 0);
+
 	/*
 	 * Save pointers to the blocks of PGPROC structures reserved for auxiliary
 	 * processes and prepared transactions.
@@ -393,6 +395,11 @@ InitProcess(void)
 	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
 	SHMQueueElemInit(&(MyProc->syncRepLinks));
 
+	/* Initialize fields for group XID clearing; 0 means "no next proc" */
+	MyProc->clearXid = false;
+	MyProc->backendLatestXid = InvalidTransactionId;
+	MyProc->nextClearXidElem = 0;
+
 	/*
 	 * Acquire ownership of the PGPROC's latch, so that we can use WaitLatch
 	 * on it.  That allows us to repoint the process latch, which so far
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 202a672..9981b61 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -135,6 +135,18 @@ struct PGPROC
 
 	struct XidCache subxids;	/* cache for subtransaction XIDs */
 
+	/*
+	 * Group XID clearing at transaction end: clearXid is set while we wait
+	 * for another backend holding ProcArrayLock to clear our XID.
+	 */
+	bool		clearXid;
+	TransactionId backendLatestXid; /* our latestXid, for that backend */
+	/*
+	 * Link to the next proc on the list of procs waiting for their XIDs to
+	 * be cleared, stored as pgprocno + 1; 0 means end of list.
+	 */
+	uint32		nextClearXidElem;
+
 	/* Per-backend LWLock.  Protects fields below. */
 	LWLock	   *backendLock;	/* protects the fields below */
 
@@ -196,6 +208,11 @@ typedef struct PROC_HDR
 	PGPROC	   *autovacFreeProcs;
 	/* Head of list of bgworker free PGPROC structures */
 	PGPROC	   *bgworkerFreeProcs;
+	/*
+	 * Head of the list of procs waiting for their XIDs to be cleared, stored
+	 * as pgprocno + 1; 0 means the list is empty.
+	 */
+	pg_atomic_uint32 nextClearXidElem;
 	/* WALWriter process's latch */
 	Latch	   *walwriterLatch;
 	/* Checkpointer process's latch */