diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 6259070..5f85851 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -38,7 +38,29 @@ int vacuum_defer_cleanup_age; int max_standby_archive_delay = 30 * 1000; int max_standby_streaming_delay = 30 * 1000; -static List *RecoveryLockList; +/* + * RecoveryLockTable is a poor man's hash table that allows us to partition + * the stored locks. Which partition a lock is stored in is determined by the + * xid which the lock belongs to. The hash function is very simplistic and + * merely performs a binary AND against the final 0-based partition number. + * Splitting into partitions in this way avoids having to look through all + * locks to find one specific to a given xid. + */ +static List **RecoveryLockTable; + +/* + * Number of items in RecoveryLockTable. We can make us of this so we can + * fast-path out from any RecoveryLockTable lookups when there's no locks held + */ +static uint64 RecoveryLockCount; + +/* + * Number of buckets to split RecoveryLockTable into. + * This must be a power of two. + */ +#define RECOVERYLOCKTABLE_SIZE 1024 +#define RECOVERYLOCKTABLE_MASK (RECOVERYLOCKTABLE_SIZE - 1) + static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist, ProcSignalReason reason); @@ -64,6 +86,12 @@ InitRecoveryTransactionEnvironment(void) { VirtualTransactionId vxid; + /* Setup the recovery lock table */ + RecoveryLockTable = (List **) + palloc0(RECOVERYLOCKTABLE_SIZE * sizeof(List *)); + + RecoveryLockCount = 0; + /* * Initialize shared invalidation management for Startup process, being * careful to register ourselves as a sendOnly process so we don't need to @@ -109,6 +137,9 @@ ShutdownRecoveryTransactionEnvironment(void) /* Cleanup our VirtualTransaction */ VirtualXactLockTableCleanup(); + + /* Free memory used by the RecoveryLockTable */ + pfree(RecoveryLockTable); } @@ -586,9 +617,10 @@ StandbyLockTimeoutHandler(void) * We only keep track of AccessExclusiveLocks, which are only ever held by * one transaction on one relation. * - * We keep a single dynamically expandible list of locks in local memory, - * RelationLockList, so we can keep track of the various entries made by - * the Startup process's virtual xid in the shared lock table. + * We maintain a simple hash table to store locks. This allows quick lookups + * to find locks which belong to a given xid and allows us to keep track of + * the various entries made by the Startup process's virtual xid in the shared + * lock table. * * We record the lock against the top-level xid, rather than individual * subtransaction xids. This means AccessExclusiveLocks held by aborted @@ -607,6 +639,8 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid) { xl_standby_lock *newlock; LOCKTAG locktag; + size_t xidhash; + /* Already processed? */ if (!TransactionIdIsValid(xid) || @@ -624,7 +658,11 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid) newlock->xid = xid; newlock->dbOid = dbOid; newlock->relOid = relOid; - RecoveryLockList = lappend(RecoveryLockList, newlock); + + /* add the newlock to the hash table */ + xidhash = xid & RECOVERYLOCKTABLE_MASK; + RecoveryLockTable[xidhash] = lappend(RecoveryLockTable[xidhash], newlock); + RecoveryLockCount++; SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid); @@ -637,18 +675,31 @@ StandbyReleaseLocks(TransactionId xid) ListCell *cell, *prev, *next; + size_t xidhash; + + /* fast path out if there's no locks */ + if (RecoveryLockCount == 0) + return; + + if (!TransactionIdIsValid(xid)) + { + StandbyReleaseAllLocks(); + return; + } /* - * Release all matching locks and remove them from list + * Release all matching locks and remove them from recover lock table */ + + xidhash = xid & RECOVERYLOCKTABLE_MASK; prev = NULL; - for (cell = list_head(RecoveryLockList); cell; cell = next) + for (cell = list_head(RecoveryLockTable[xidhash]); cell; cell = next) { xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell); next = lnext(cell); - if (!TransactionIdIsValid(xid) || lock->xid == xid) + if (lock->xid == xid) { LOCKTAG locktag; @@ -658,10 +709,12 @@ StandbyReleaseLocks(TransactionId xid) SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid); if (!LockRelease(&locktag, AccessExclusiveLock, true)) elog(LOG, - "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u", + "RecoveryLockTable contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u", lock->xid, lock->dbOid, lock->relOid); - RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev); + RecoveryLockTable[xidhash] = + list_delete_cell(RecoveryLockTable[xidhash], cell, prev); + RecoveryLockCount--; pfree(lock); } else @@ -671,7 +724,7 @@ StandbyReleaseLocks(TransactionId xid) /* * Release locks for a transaction tree, starting at xid down, from - * RecoveryLockList. + * RecoveryLockTable. * * Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode, * to remove any AccessExclusiveLocks requested by a transaction. @@ -681,6 +734,10 @@ StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids) { int i; + /* we needn't bother trying if there's no locks */ + if (RecoveryLockCount == 0) + return; + StandbyReleaseLocks(xid); for (i = 0; i < nsubxids; i++) @@ -697,27 +754,37 @@ StandbyReleaseAllLocks(void) *prev, *next; LOCKTAG locktag; + size_t i; elog(trace_recovery(DEBUG2), "release all standby locks"); - prev = NULL; - for (cell = list_head(RecoveryLockList); cell; cell = next) + if (RecoveryLockCount == 0) + return; + + for (i = 0; i < RECOVERYLOCKTABLE_SIZE; i++) { - xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell); + prev = NULL; + for (cell = list_head(RecoveryLockTable[i]); cell; cell = next) + { + xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell); - next = lnext(cell); + next = lnext(cell); - elog(trace_recovery(DEBUG4), - "releasing recovery lock: xid %u db %u rel %u", - lock->xid, lock->dbOid, lock->relOid); - SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid); - if (!LockRelease(&locktag, AccessExclusiveLock, true)) - elog(LOG, - "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u", + elog(trace_recovery(DEBUG4), + "releasing recovery lock: xid %u db %u rel %u", lock->xid, lock->dbOid, lock->relOid); - RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev); - pfree(lock); + SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid); + if (!LockRelease(&locktag, AccessExclusiveLock, true)) + elog(LOG, + "RecoveryLockTable contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u", + lock->xid, lock->dbOid, lock->relOid); + RecoveryLockTable[i] = + list_delete_cell(RecoveryLockTable[i], cell, prev); + RecoveryLockCount--; + pfree(lock); + } } + Assert(RecoveryLockCount == 0); } /* @@ -732,55 +799,64 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids) *prev, *next; LOCKTAG locktag; + size_t i; - prev = NULL; - for (cell = list_head(RecoveryLockList); cell; cell = next) - { - xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell); - bool remove = false; + if (RecoveryLockCount == 0) + return; - next = lnext(cell); + for (i = 0; i < RECOVERYLOCKTABLE_SIZE; i++) + { + prev = NULL; + for (cell = list_head(RecoveryLockTable[i]); cell; cell = next) + { + xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell); + bool remove = false; - Assert(TransactionIdIsValid(lock->xid)); + next = lnext(cell); - if (StandbyTransactionIdIsPrepared(lock->xid)) - remove = false; - else - { - int i; - bool found = false; + Assert(TransactionIdIsValid(lock->xid)); - for (i = 0; i < nxids; i++) + if (StandbyTransactionIdIsPrepared(lock->xid)) + remove = false; + else { - if (lock->xid == xids[i]) + int i; + bool found = false; + + for (i = 0; i < nxids; i++) { - found = true; - break; + if (lock->xid == xids[i]) + { + found = true; + break; + } } - } - /* - * If its not a running transaction, remove it. - */ - if (!found) - remove = true; - } + /* + * If its not a running transaction, remove it. + */ + if (!found) + remove = true; + } - if (remove) - { - elog(trace_recovery(DEBUG4), - "releasing recovery lock: xid %u db %u rel %u", - lock->xid, lock->dbOid, lock->relOid); - SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid); - if (!LockRelease(&locktag, AccessExclusiveLock, true)) - elog(LOG, - "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u", + if (remove) + { + elog(trace_recovery(DEBUG4), + "releasing recovery lock: xid %u db %u rel %u", lock->xid, lock->dbOid, lock->relOid); - RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev); - pfree(lock); + SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid); + if (!LockRelease(&locktag, AccessExclusiveLock, true)) + elog(LOG, + "RecoveryLockTable contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u", + lock->xid, lock->dbOid, lock->relOid); + RecoveryLockTable[i] = + list_delete_cell(RecoveryLockTable[i], cell, prev); + RecoveryLockCount--; + pfree(lock); + } + else + prev = cell; } - else - prev = cell; } }