Manipulate LWLock state with compare-and-swap where available.

The separate "exclusive" and "shared" counters in LWLock are replaced by a
single int32 "state" word: 0 means unlocked, -2 means exclusively locked, and
each shared holder adds 2.  When HAVE_COMPARE_AND_SWAP is defined (GCC/ICC on
i386 and x86_64, via the new storage/atomic.h), the low bit of the state marks
the presence of waiters and the state is updated with cas_int32(), so the
spinlock is taken only when a backend must queue or waiters must be woken.
Without compare-and-swap the state word is still updated under the spinlock.

diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 32cb819..dae33c6 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -27,6 +27,7 @@
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
+#include "storage/atomic.h"
 #include "storage/ipc.h"
 #include "storage/proc.h"
 #include "storage/spin.h"
@@ -36,18 +37,58 @@
 extern slock_t *ShmemLock;
 
 
+/*
+ * If compare-and-swap is not available, the mutex protects the entire LWLock.
+ * But when compare-and-swap is available, the state is manipulated using
+ * cas_int32() and only the remaining members are protected by the mutex.
+ */
 typedef struct LWLock
 {
-	slock_t		mutex;			/* Protects LWLock and queue of PGPROCs */
+	slock_t		mutex;			/* See notes above */
+	int32		state;			/* Lock state */
 	bool		releaseOK;		/* T if ok to release waiters */
-	char		exclusive;		/* # of exclusive holders (0 or 1) */
-	int			shared;			/* # of shared holders (0..MaxBackends) */
 	PGPROC	   *head;			/* head of list of waiting PGPROCs */
 	PGPROC	   *tail;			/* tail of list of waiting PGPROCs */
 	/* tail is undefined when head is NULL */
 } LWLock;
 
 /*
+ * A lightweight lock can be unlocked, exclusively locked (by exactly one
+ * locker), or share-locked (by one or more lockers).  We maintain the state of
+ * the lock in a single integer variable so that it can be atomically
+ * updated using compare-and-swap, where available.
+ */
+#define LWSTATE_UNLOCKED			0
+#define LWSTATE_EXCLUSIVE			(-2)
+#define LWSTATE_SHARED				2
+#define LWSTATE_IS_EXCLUSIVE(n)		((n) < 0)
+#define LWSTATE_IS_SHARED(n)		((n) > 1)
+#ifdef HAVE_COMPARE_AND_SWAP
+#define LWSTATE_IS_UNLOCKED(n)		(((n) & ~1) == 0)
+#else
+#define LWSTATE_IS_UNLOCKED(n)		((n) == 0)
+#endif
+
+#ifdef HAVE_COMPARE_AND_SWAP
+/*
+ * When HAVE_COMPARE_AND_SWAP is set, we use the low bit of the state to
+ * track whether waiters are present.  This allows the lwlock to be released
+ * without taking the mutex, unless there are actually waiters that need to
+ * be woken up.
+ */
+#define LWSTATE_WAITERS				1
+#define LWSTATE_HAS_WAITERS(n)		((n) & 1)
+/* Last shared or exclusive lock requires a wakeup. */
+#define LWSTATE_NEEDS_WAKEUP(n)		((n) == -1 || (n) == 3)
+
+static bool LWLockCASAcquire(volatile LWLock *lock, LWLockMode mode,
+				 int wait_ok);
+static bool LWLockCASRelease(volatile LWLock *lock, LWLockMode mode,
+				 int wake_ok);
+static void LWLockCASClearWaitMark(volatile LWLock *lock);
+#endif
+
+/*
  * All the LWLock structs are allocated as an array in shared memory.
  * (LWLockIds are indexes into the array.)	We force the array stride to
  * be a power of 2, which saves a few cycles in indexing, but more
@@ -85,6 +126,9 @@ NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;
 
 static int	num_held_lwlocks = 0;
 static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];
+#ifdef HAVE_COMPARE_AND_SWAP
+static LWLockMode held_lwlock_modes[MAX_SIMUL_LWLOCKS];
+#endif
 
 static int	lock_addin_request = 0;
 static bool lock_addin_request_allowed = true;
@@ -260,8 +304,7 @@ CreateLWLocks(void)
 	{
 		SpinLockInit(&lock->lock.mutex);
 		lock->lock.releaseOK = true;
-		lock->lock.exclusive = 0;
-		lock->lock.shared = 0;
+		lock->lock.state = LWSTATE_UNLOCKED;
 		lock->lock.head = NULL;
 		lock->lock.tail = NULL;
 	}
@@ -360,6 +403,15 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 	 */
 	HOLD_INTERRUPTS();
 
+#ifdef HAVE_COMPARE_AND_SWAP
+	/*
+	 * If we have compare-and-swap, we can try to grab the lock using an atomic
+	 * operation, without actually taking the mutex lock.
+	 */
+	if (LWLockCASAcquire(lock, mode, false))
+		goto lock_acquired;
+#endif
+
 	/*
 	 * Loop here to try to acquire lock after each time we are signaled by
 	 * LWLockRelease.
@@ -378,7 +430,7 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 	 */
 	for (;;)
 	{
-		bool		mustwait;
+		bool		acquired;
 
 		/* Acquire mutex.  Time spent holding mutex should be short! */
 		SpinLockAcquire(&lock->mutex);
@@ -387,30 +439,36 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 		if (retry)
 			lock->releaseOK = true;
 
+#ifdef HAVE_COMPARE_AND_SWAP
+		/* Either acquire the lock, or mark it as having waiters. */
+		acquired = LWLockCASAcquire(lock, mode, true);
+#else
+
 		/* If I can get the lock, do so quickly. */
 		if (mode == LW_EXCLUSIVE)
 		{
-			if (lock->exclusive == 0 && lock->shared == 0)
+			if (LWSTATE_IS_UNLOCKED(lock->state))
 			{
-				lock->exclusive++;
-				mustwait = false;
+				lock->state = LWSTATE_EXCLUSIVE;
+				acquired = true;
 			}
 			else
-				mustwait = true;
+				acquired = false;
 		}
 		else
 		{
-			if (lock->exclusive == 0)
+			if (!LWSTATE_IS_EXCLUSIVE(lock->state))
 			{
-				lock->shared++;
-				mustwait = false;
+				lock->state += LWSTATE_SHARED;
+				acquired = true;
 			}
 			else
-				mustwait = true;
+				acquired = false;
 		}
+#endif
 
-		if (!mustwait)
-			break;				/* got the lock */
+		if (acquired)
+			break;				/* got the lock */
 
 		/*
 		 * Add myself to wait queue.
@@ -474,9 +532,17 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 	/* We are done updating shared state of the lock itself. */
 	SpinLockRelease(&lock->mutex);
 
+#ifdef HAVE_COMPARE_AND_SWAP
+	/* The mutex-free fast-path jumps here if it acquires the lock. */
+lock_acquired:
+#endif
+
 	TRACE_POSTGRESQL_LWLOCK_ACQUIRE(lockid, mode);
 
 	/* Add lock to list of locks held by this backend */
+#ifdef HAVE_COMPARE_AND_SWAP
+	held_lwlock_modes[num_held_lwlocks] = mode;
+#endif
 	held_lwlocks[num_held_lwlocks++] = lockid;
 
 	/*
@@ -497,7 +563,7 @@ bool
 LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
 {
 	volatile LWLock *lock = &(LWLockArray[lockid].lock);
-	bool		mustwait;
+	bool		acquired;
 
 	PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);
 
@@ -512,35 +578,50 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
 	 */
 	HOLD_INTERRUPTS();
 
-	/* Acquire mutex.  Time spent holding mutex should be short! */
+#ifdef HAVE_COMPARE_AND_SWAP
+
+	/*
+	 * If we're using compare-and-swap, LWLockCASAcquire() does all the heavy
+	 * lifting on our behalf.
+	 */
+	acquired = LWLockCASAcquire(lock, mode, false);
+
+#else
+
+	/*
+	 * We don't have compare-and-swap, and must acquire the mutex to check
+	 * and update the lock state.  Time spent holding mutex should be short!
+	 */
 	SpinLockAcquire(&lock->mutex);
 
 	/* If I can get the lock, do so quickly. */
 	if (mode == LW_EXCLUSIVE)
 	{
-		if (lock->exclusive == 0 && lock->shared == 0)
+		if (LWSTATE_IS_UNLOCKED(lock->state))
 		{
-			lock->exclusive++;
-			mustwait = false;
+			lock->state = LWSTATE_EXCLUSIVE;
+			acquired = true;
 		}
 		else
-			mustwait = true;
+			acquired = false;
 	}
 	else
 	{
-		if (lock->exclusive == 0)
+		if (!LWSTATE_IS_EXCLUSIVE(lock->state))
 		{
-			lock->shared++;
-			mustwait = false;
+			lock->state += LWSTATE_SHARED;
+			acquired = true;
 		}
 		else
-			mustwait = true;
+			acquired = false;
 	}
 
 	/* We are done updating shared state of the lock itself. */
 	SpinLockRelease(&lock->mutex);
 
-	if (mustwait)
+#endif
+
+	if (!acquired)
 	{
 		/* Failed to get lock, so release interrupt holdoff */
 		RESUME_INTERRUPTS();
@@ -550,11 +631,14 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
 	else
 	{
 		/* Add lock to list of locks held by this backend */
+#ifdef HAVE_COMPARE_AND_SWAP
+		held_lwlock_modes[num_held_lwlocks] = mode;
+#endif
 		held_lwlocks[num_held_lwlocks++] = lockid;
 		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(lockid, mode);
 	}
 
-	return !mustwait;
+	return acquired;
 }
 
 /*
@@ -567,6 +651,10 @@ LWLockRelease(LWLockId lockid)
 	PGPROC	   *head;
 	PGPROC	   *proc;
 	int			i;
+	bool		wakeup;
+#ifdef HAVE_COMPARE_AND_SWAP
+	LWLockMode	mode;
+#endif
 
 	PRINT_LWDEBUG("LWLockRelease", lockid, lock);
 
@@ -581,21 +669,47 @@ LWLockRelease(LWLockId lockid)
 	}
 	if (i < 0)
 		elog(ERROR, "lock %d is not held", (int) lockid);
+#ifdef HAVE_COMPARE_AND_SWAP
+	mode = held_lwlock_modes[i];
+#endif
 	num_held_lwlocks--;
 	for (; i < num_held_lwlocks; i++)
+	{
 		held_lwlocks[i] = held_lwlocks[i + 1];
+#ifdef HAVE_COMPARE_AND_SWAP
+		held_lwlock_modes[i] = held_lwlock_modes[i + 1];
+#endif
+	}
+
+#ifdef HAVE_COMPARE_AND_SWAP
+	/*
+	 * If no waiters need to be awoken, we can drop the LWLock without needing
+	 * to acquire the spinlock.
+	 */
+	if (!LWLockCASRelease(lock, mode, false))
+	{
+		head = NULL;
+		goto fast_exit;
+	}
+#endif
 
 	/* Acquire mutex.  Time spent holding mutex should be short! */
 	SpinLockAcquire(&lock->mutex);
 
+#ifdef HAVE_COMPARE_AND_SWAP
+	/* LWLockCASRelease does all the hard work. */
+	wakeup = LWLockCASRelease(lock, mode, true);
+#else
 	/* Release my hold on lock */
-	if (lock->exclusive > 0)
-		lock->exclusive--;
+	if (LWSTATE_IS_EXCLUSIVE(lock->state))
+		lock->state = LWSTATE_UNLOCKED;
 	else
 	{
-		Assert(lock->shared > 0);
-		lock->shared--;
+		Assert(LWSTATE_IS_SHARED(lock->state));
+		lock->state -= LWSTATE_SHARED;
 	}
+	wakeup = LWSTATE_IS_UNLOCKED(lock->state);
+#endif
 
 	/*
 	 * See if I need to awaken any waiters.  If I released a non-last shared
@@ -606,7 +720,7 @@ LWLockRelease(LWLockId lockid)
 	head = lock->head;
 	if (head != NULL)
 	{
-		if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
+		if (wakeup && lock->releaseOK)
 		{
 			/*
 			 * Remove the to-be-awakened PGPROCs from the queue.  If the front
@@ -625,6 +739,14 @@ LWLockRelease(LWLockId lockid)
 			proc->lwWaitLink = NULL;
 			/* prevent additional wakeups until retryer gets to run */
 			lock->releaseOK = false;
+
+#ifdef HAVE_COMPARE_AND_SWAP
+			/*
+			 * Clear the waiters bit if we removed all of the waiters.
+			 */
+			if (lock->head == NULL)
+				LWLockCASClearWaitMark(lock);
+#endif
 		}
 		else
 		{
@@ -636,6 +758,9 @@ LWLockRelease(LWLockId lockid)
 	/* We are done updating shared state of the lock itself. */
 	SpinLockRelease(&lock->mutex);
 
+#ifdef HAVE_COMPARE_AND_SWAP
+fast_exit:
+#endif
 	TRACE_POSTGRESQL_LWLOCK_RELEASE(lockid);
 
 	/*
@@ -697,3 +822,131 @@ LWLockHeldByMe(LWLockId lockid)
 	}
 	return false;
 }
+
+#ifdef HAVE_COMPARE_AND_SWAP
+
+/*
+ * LWLockCASAcquire - attempt to acquire an LWLock using compare-and-swap
+ *
+ * Returns true if the LWLock has been acquired, false if not.
+ *
+ * If wait_ok is true, and the lock cannot be acquired, try to set the waiters
+ * bit instead.
+ */
+static bool
+LWLockCASAcquire(volatile LWLock *lock, LWLockMode mode, int wait_ok)
+{
+	int		current_guess = LWSTATE_UNLOCKED;
+
+	while (1)
+	{
+		int		delta;
+		int		result;
+
+		/* Compute delta to apply, or give up! */
+		if (mode == LW_EXCLUSIVE)
+		{
+			if (LWSTATE_IS_UNLOCKED(current_guess))
+				delta = LWSTATE_EXCLUSIVE;
+			else if (LWSTATE_HAS_WAITERS(current_guess) || !wait_ok)
+				return false;
+			else
+				delta = LWSTATE_WAITERS;
+		}
+		else
+		{
+			if (!LWSTATE_IS_EXCLUSIVE(current_guess))
+				delta = LWSTATE_SHARED;
+			else if (LWSTATE_HAS_WAITERS(current_guess) || !wait_ok)
+				return false;
+			else
+				delta = LWSTATE_WAITERS;
+		}
+
+		/* Attempt to add delta. */
+		result = cas_int32(&lock->state, current_guess, current_guess + delta);
+
+		/* If it worked, we're done. */
+		if (current_guess == result)
+			return (delta != LWSTATE_WAITERS);
+
+		/* We are unlucky and must retry. */
+		current_guess = result;
+	}
+}
+
+/*
+ * LWLockCASRelease - release an LWLock using compare-and-swap
+ *
+ * If wake_ok is true, the lock is released always; if false, it's released
+ * only when no wakeups are required.  The return value is true if wakeups
+ * are required and false otherwise.
+ */
+static bool
+LWLockCASRelease(volatile LWLock *lock, LWLockMode mode, int wake_ok)
+{
+	int		current_guess;
+	int		delta;
+
+	if (mode == LW_EXCLUSIVE)
+	{
+		current_guess = LWSTATE_EXCLUSIVE;
+		delta = LWSTATE_EXCLUSIVE;
+	}
+	else
+	{
+		current_guess = LWSTATE_SHARED;
+		delta = LWSTATE_SHARED;
+	}
+
+	while (1)
+	{
+		int		result;
+
+		/* Attempt to subtract delta. */
+		result = cas_int32(&lock->state, current_guess, current_guess - delta);
+		if (current_guess == result)
+			return LWSTATE_NEEDS_WAKEUP(result);
+
+		/* Be careful about releasing the last lock. */
+		if (!wake_ok && LWSTATE_NEEDS_WAKEUP(result))
+			return true;
+
+#ifdef USE_ASSERT_CHECKING
+		if (mode == LW_EXCLUSIVE)
+			Assert(LWSTATE_IS_EXCLUSIVE(result));
+		else
+			Assert(LWSTATE_IS_SHARED(result));
+#endif
+
+		/* We are unlucky and must retry. */
+		current_guess = result;
+	}
+}
+
+/*
+ * LWLockCASClearWaitMark - remove waiters mark from lock state
+ */
+static void
+LWLockCASClearWaitMark(volatile LWLock *lock)
+{
+	int		current_guess = LWSTATE_UNLOCKED + LWSTATE_WAITERS;
+
+	while (1)
+	{
+		int		result;
+
+		/* Attempt to clear the waiters bit. */
+		result = cas_int32(&lock->state, current_guess,
+						   current_guess - LWSTATE_WAITERS);
+		if (current_guess == result)
+			return;
+
+		Assert(LWSTATE_HAS_WAITERS(result));
+
+		/* We are unlucky and must retry. */
+		current_guess = result;
+	}
+}
+
+#endif
diff --git a/src/include/storage/atomic.h b/src/include/storage/atomic.h
new file mode 100644
index 0000000..a1f909f
--- /dev/null
+++ b/src/include/storage/atomic.h
@@ -0,0 +1,65 @@
+/*-------------------------------------------------------------------------
+ *
+ * atomic.h
+ *	   Hardware-dependent atomic primitives.
+ *
+ *	NOTE: No code must rely absolutely on the availability of the
+ *	primitives in this file.  On architectures where we don't have a
+ *	working implementation, or where they are not supported, use
+ *	spinlocks or some other mechanism.
+ *
+ *	CAUTION: Be sure that any operation defined here represents a sequence
+ *	point - ie, loads and stores of other values must not be moved across
+ *	a lock or unlock.  In most cases it suffices to make the operation be
+ *	done through a "volatile" pointer.
+ *
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *	  src/include/storage/atomic.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef ATOMIC_H
+#define ATOMIC_H
+
+#if defined(__GNUC__) || defined(__INTEL_COMPILER)
+
+/* 32-bit i386, AMD Opteron, Intel EM64T */
+#if defined(__i386__) || defined(__x86_64__)
+
+#define HAVE_COMPARE_AND_SWAP 1
+
+/*
+ * We might someday want to have more than one CAS variant (e.g. CAS is
+ * commonly used with pointers to build lock-free data structures), but int32
+ * is sufficient for now.
+ */
+static __inline__ int32
+cas_int32(volatile int32 *var, volatile int32 old, volatile int32 new)
+{
+	/*
+	 * cmpxchg compares EAX to its second operand (which must be a memory
+	 * location).  If they are equal, it writes its first operand to that
+	 * address; otherwise, it writes the original value back to that address.
+	 * In either case, the original value of the memory location is left in
+	 * EAX.  We therefore force GCC to allocate old in EAX (using the "a"
+	 * constraints).  We mark memory and the condition code register as
+	 * clobbered.  The former is not true but prevents GCC from reordering
+	 * other instructions around our atomic primitive; the latter is correct
+	 * (ZF will be set if old was equal to *var).
+	 */
+	__asm__ __volatile__(
+		"	lock		\n"
+		"	cmpxchg	%2,%1	\n"
+:		"+a" (old), "+m" (*var)
+:		"q" (new), "a" (old)
+:		"memory", "cc");
+	return old;
+}
+
+#endif   /* defined(__i386__) || defined(__x86_64__) */
+
+#endif   /* defined(__GNUC__) || defined(__INTEL_COMPILER) */
+
+#endif   /* ATOMIC_H */