>From 47c024c1f7c2cba5a2941a72715be05b8b8c02d6 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Mon, 18 Nov 2013 14:45:11 +0100
Subject: [PATCH 4/4] Wait-free LW_SHARED lwlock acquisition

---
 src/backend/storage/lmgr/lwlock.c | 754 ++++++++++++++++++++++++++------------
 1 file changed, 523 insertions(+), 231 deletions(-)

diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 732a5d2..fe8858f 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -17,6 +17,82 @@
  * IDENTIFICATION
  *	  src/backend/storage/lmgr/lwlock.c
  *
+ * NOTES:
+ *
+ * This used to be a pretty straightforward reader-writer lock
+ * implementation, in which the internal state was protected by a
+ * spinlock. Unfortunately the overhead of taking the spinlock proved to be
+ * too high for workloads/locks that were locked in shared mode very
+ * frequently.
+ * Thus a new implementation was devised that provides wait-free shared lock
+ * acquisition for locks that aren't exclusively locked.
+ *
+ * The basic idea is to have a single atomic variable 'lockcount' instead of
+ * the formerly separate shared and exclusive counters and to use an atomic
+ * increment to acquire the lock. That's fairly easy to do for rw-spinlocks,
+ * but a lot harder for something like LWLocks that want to wait in the OS.
+ *
+ * For exclusive lock acquisition we use an atomic compare-and-exchange on the
+ * lockcount variable, swapping in EXCLUSIVE_LOCK (1 << 31, i.e. 0x80000000)
+ * if and only if the current value of lockcount is 0. If the swap was not
+ * successful, we have to wait.
+ *
+ * For shared lock acquisition we use an atomic add (lock xadd) to the
+ * lockcount variable to add 1. If the value is bigger than EXCLUSIVE_LOCK we
+ * know that somebody actually has an exclusive lock, and we back out by
+ * atomically decrementing by 1 again. If so, we have to wait for the exclusive
+ * locker to release the lock.
+ *
+ * To release the lock we use an atomic decrement. If the new value is
+ * zero (we get that atomically), we know we have to release
+ * waiters.
+ *
+ * The attentive reader might have noticed that naively doing the
+ * above has two glaring race conditions:
+ *
+ * 1) too-quick-for-queueing: We try to lock using the atomic operations and
+ * notice that we have to wait. Unfortunately until we have finished queueing,
+ * the former locker very well might have already finished its work. That's
+ * problematic because we're now stuck waiting inside the OS.
+ *
+ * 2) spurious failed locks: Due to the logic of backing out of shared
+ * locks after we unconditionally added a 1 to lockcount, we might have
+ * prevented another exclusive locker from getting the lock:
+ *   1) Session A: LWLockAcquire(LW_EXCLUSIVE) - success
+ *   2) Session B: LWLockAcquire(LW_SHARED) - lockcount += 1
+ *   3) Session B: LWLockAcquire(LW_SHARED) - oops, bigger than EXCLUSIVE_LOCK
+ *   4) Session A: LWLockRelease()
+ *   5) Session C: LWLockAcquire(LW_EXCLUSIVE) - check if lockcount = 0, no. wait.
+ *   6) Session B: LWLockAcquire(LW_SHARED) - lockcount -= 1
+ *   7) Session B: LWLockAcquire(LW_SHARED) - wait
+ *
+ * So we'd now have both B) and C) waiting on a lock that nobody is holding
+ * anymore. Not good.
+ *
+ * To mitigate those races we use a two-phased attempt at locking:
+ *   Phase 1: Try to do it atomically; if we succeed, nice
+ *   Phase 2: Add ourselves to the wait queue of the lock
+ *   Phase 3: Try to grab the lock again; if we succeed, remove ourselves from
+ *            the queue
+ *   Phase 4: Sleep till wakeup, goto Phase 1
+ *
+ * This protects us against both problems from above:
+ * 1) Nobody can release too quickly, before we're queued, because after
+ *    Phase 2 we're already queued.
+ * 2) If somebody spuriously got blocked from acquiring the lock, they will
+ *    get queued in Phase 2 and we can wake them up if necessary or they will
+ *    have gotten the lock in Phase 3.
+ *
+ * The above algorithm only works for LWLockAcquire, not directly for
+ * LWLockConditionalAcquire where we don't want to wait. In that case we just
+ * need to retry acquiring the lock until we're sure we didn't disturb anybody
+ * in doing so.
+ *
+ * TODO:
+ * - decide if we need a spinlock fallback
+ * - expand documentation
+ * - make LWLOCK_STATS do something sensible again
+ * - make LOCK_DEBUG output nicer
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -28,6 +104,7 @@
 #include "lib/ilist.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
+#include "storage/atomics.h"
 #include "storage/ipc.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
@@ -37,15 +114,20 @@
 /* We use the ShmemLock spinlock to protect LWLockAssign */
 extern slock_t *ShmemLock;
 
+#define EXCLUSIVE_LOCK ((uint32) 1 << 31)
+/* must be greater than MAX_BACKENDS */
+#define SHARED_LOCK_MASK (~EXCLUSIVE_LOCK)
 
 typedef struct LWLock
 {
-	slock_t		mutex;			/* Protects LWLock and queue of PGPROCs */
-	bool		releaseOK;		/* T if ok to release waiters */
-	char		exclusive;		/* # of exclusive holders (0 or 1) */
-	int			shared;			/* # of shared holders (0..MaxBackends) */
-	dlist_head	waiters;
-	/* tail is undefined when head is NULL */
+	slock_t		mutex;		/* Protects LWLock and queue of PGPROCs */
+	bool		releaseOK;	/* T if ok to release waiters */
+	dlist_head	waiters;	/* list of waiters */
+	pg_atomic_uint32 lockcount;	/* state of exlusive/nonexclusive lockers */
+	pg_atomic_uint32 nwaiters; /* number of waiters */
+#ifdef LWLOCK_DEBUG
+	PGPROC	   *owner;
+#endif
 } LWLock;
 
 /*
@@ -60,7 +142,7 @@ typedef struct LWLock
  * LWLock is between 16 and 32 bytes on all known platforms, so these two
  * cases are sufficient.
  */
-#define LWLOCK_PADDED_SIZE	(sizeof(LWLock) <= 16 ? 16 : 32)
+#define LWLOCK_PADDED_SIZE	64
 
 typedef union LWLockPadded
 {
@@ -75,7 +157,6 @@ typedef union LWLockPadded
  */
 NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;
 
-
 /*
  * We use this structure to keep track of locked LWLocks for release
  * during error recovery.  The maximum size could be determined at runtime
@@ -84,8 +165,14 @@ NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;
  */
 #define MAX_SIMUL_LWLOCKS	100
 
+typedef struct LWLockHandle
+{
+	LWLockId lock;
+	LWLockMode mode;
+} LWLockHandle;
+
 static int	num_held_lwlocks = 0;
-static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];
+static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];
 
 static int	lock_addin_request = 0;
 static bool lock_addin_request_allowed = true;
@@ -102,24 +189,29 @@ static int *spin_delay_counts;
 bool		Trace_lwlocks = false;
 
 inline static void
-PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock)
+PRINT_LWDEBUG(const char *where, LWLockId lockid, volatile LWLock *lock, LWLockMode mode)
 {
 	if (Trace_lwlocks)
-		elog(LOG, "%s(%d): excl %d shared %d rOK %d",
-			 where, (int) lockid,
-			 (int) lock->exclusive, lock->shared,
+	{
+		uint32 lockcount = pg_atomic_read_u32(&lock->lockcount);
+		elog(LOG, "%s(%d)%u: excl %u shared %u waiters %u rOK %d",
+			 where, (int) lockid, mode,
+			 (lockcount & EXCLUSIVE_LOCK) >> 31,
+			 (lockcount & SHARED_LOCK_MASK),
+			 pg_atomic_read_u32(&lock->nwaiters),
 			 (int) lock->releaseOK);
+	}
 }
 
 inline static void
-LOG_LWDEBUG(const char *where, LWLockId lockid, const char *msg)
+LOG_LWDEBUG(const char *where, LWLockId lockid, LWLockMode mode, const char *msg)
 {
 	if (Trace_lwlocks)
-		elog(LOG, "%s(%d): %s", where, (int) lockid, msg);
+		elog(LOG, "%s(%d)%u: %s", where, (int) lockid, mode, msg);
 }
 #else							/* not LOCK_DEBUG */
-#define PRINT_LWDEBUG(a,b,c)
-#define LOG_LWDEBUG(a,b,c)
+#define PRINT_LWDEBUG(a,b,c,d) ((void)0)
+#define LOG_LWDEBUG(a,b,c,d) ((void)0)
 #endif   /* LOCK_DEBUG */
 
 #ifdef LWLOCK_STATS
@@ -285,8 +377,8 @@ CreateLWLocks(void)
 	{
 		SpinLockInit(&lock->lock.mutex);
 		lock->lock.releaseOK = true;
-		lock->lock.exclusive = 0;
-		lock->lock.shared = 0;
+		pg_atomic_init_u32(&lock->lock.lockcount, 0);
+		pg_atomic_init_u32(&lock->lock.nwaiters, 0);
 		dlist_init(&lock->lock.waiters);
 	}
 
@@ -328,6 +420,231 @@ LWLockAssign(void)
 	return result;
 }
 
+/*
+ * Internal function handling the atomic manipulation of lock->lockcount.
+ *
+ * 'double_check' = true means that we try to check more carefully
+ * against causing somebody else to spuriously believe the lock is
+ * already taken, although we're just about to back out of it.
+ */
+static inline bool
+LWLockAttemptLock(volatile LWLock* lock, LWLockMode mode, bool double_check, bool *potentially_spurious)
+{
+	bool mustwait;
+	uint32 oldstate;
+
+	Assert(mode == LW_EXCLUSIVE || mode == LW_SHARED);
+
+	*potentially_spurious = false;
+
+	if (mode == LW_EXCLUSIVE)
+	{
+		uint32 expected = 0;
+		pg_read_barrier();
+
+		/* check without CAS first; it's way cheaper, frequently locked otherwise */
+		if (pg_atomic_read_u32(&lock->lockcount) != 0)
+			mustwait = true;
+		else if (!pg_atomic_compare_exchange_u32(&lock->lockcount,
+												 &expected, EXCLUSIVE_LOCK))
+		{
+			/*
+			 * ok, no can do. Between the pg_atomic_read() above and the
+			 * CAS somebody else acquired the lock.
+			 */
+			mustwait = true;
+		}
+		else
+		{
+			/* yipeyyahee */
+			mustwait = false;
+#ifdef LWLOCK_DEBUG
+			lock->owner = MyProc;
+#endif
+		}
+	}
+	else
+	{
+		/*
+		 * If requested by caller, do an unlocked check first.  This is useful
+		 * if potentially spurious results have a noticeable cost.
+		 */
+		if (double_check)
+		{
+			pg_read_barrier();
+			if (pg_atomic_read_u32(&lock->lockcount) >= EXCLUSIVE_LOCK)
+			{
+				mustwait = true;
+				goto out;
+			}
+		}
+
+		/*
+		 * Acquire the share lock unconditionally using an atomic addition. We
+		 * might have to back out again if it turns out somebody else has an
+		 * exclusive lock.
+		 */
+		oldstate = pg_atomic_fetch_add_u32(&lock->lockcount, 1);
+
+		if (oldstate >= EXCLUSIVE_LOCK)
+		{
+			/*
+			 * Ok, somebody else holds the lock exclusively. We need to back
+			 * away from the shared lock, since we don't actually hold it right
+			 * now.  Since there's a window between lockcount += 1 and lockcount
+			 * -= 1, the previous exclusive locker could have released and
+			 * another exclusive locker could have seen our +1. We need to
+			 * signal that to the upper layers so they can deal with the race
+			 * condition.
+			 */
+
+			/*
+			 * FIXME: check return value if (double_check), it's not
+			 * spurious if still exclusively locked.
+			 */
+			pg_atomic_fetch_sub_u32(&lock->lockcount, 1);
+
+
+			mustwait = true;
+			*potentially_spurious = true;
+		}
+		else
+		{
+			/* yipeyyahee */
+			mustwait = false;
+		}
+	}
+
+out:
+	return mustwait;
+}
+
+/*
+ * Wake up all the lockers that currently have a chance to run.
+ */
+static void
+LWLockWakeup(volatile LWLock *lock, LWLockId lockid, LWLockMode mode)
+{
+	bool		releaseOK;
+	bool		wokeup_somebody = false;
+	dlist_head	wakeup;
+	dlist_mutable_iter iter;
+
+	dlist_init(&wakeup);
+
+	/* remove the to-be-awakened PGPROCs from the queue */
+	releaseOK = true;
+
+	/* Acquire mutex.  Time spent holding mutex should be short! */
+	SpinLockAcquire(&lock->mutex);
+
+	dlist_foreach_modify(iter, (dlist_head *) &lock->waiters)
+	{
+		PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
+
+		if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
+			continue;
+
+		dlist_delete(&waiter->lwWaitLink);
+		dlist_push_tail(&wakeup, &waiter->lwWaitLink);
+
+		if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
+		{
+			/*
+			 * Prevent additional wakeups until retryer gets to run. Backends
+			 * that are just waiting for the lock to become free don't retry
+			 * automatically.
+			 */
+			releaseOK = false;
+			/*
+			 * Don't wakeup (further) exclusive locks.
+			 */
+			wokeup_somebody = true;
+		}
+
+		/*
+		 * Once we've woken up an exclusive lock, there's no point in waking
+		 * up anybody else.
+		 */
+		if(waiter->lwWaitMode == LW_EXCLUSIVE)
+			break;
+	}
+	lock->releaseOK = releaseOK;
+
+	/* We are done updating shared state of the lock queue. */
+	SpinLockRelease(&lock->mutex);
+
+	/*
+	 * Awaken any waiters I removed from the queue.
+	 */
+	dlist_foreach_modify(iter, (dlist_head *) &wakeup)
+	{
+		PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
+		LOG_LWDEBUG("LWLockRelease", lockid, mode, "release waiter");
+		dlist_delete(&waiter->lwWaitLink);
+		pg_write_barrier();
+		waiter->lwWaiting = false;
+		PGSemaphoreUnlock(&waiter->sem);
+	}
+}
+
+/*
+ * Add ourselves to the end of the queue. Mode can be LW_WAIT_UNTIL_FREE here!
+ */
+static inline void
+LWLockQueueSelf(volatile LWLock *lock, LWLockMode mode)
+{
+	/*
+	 * If we don't have a PGPROC structure, there's no way to wait. This
+	 * should never occur, since MyProc should only be null during shared
+	 * memory initialization.
+	 */
+	if (MyProc == NULL)
+		elog(PANIC, "cannot wait without a PGPROC structure");
+
+	pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
+
+	SpinLockAcquire(&lock->mutex);
+	MyProc->lwWaiting = true;
+	MyProc->lwWaitMode = mode;
+	dlist_push_tail((dlist_head *) &lock->waiters, &MyProc->lwWaitLink);
+
+	/* Can release the mutex now */
+	SpinLockRelease(&lock->mutex);
+}
+
+/*
+ * Remove ourselves from the waitlist.  This is used if we queued ourselves
+ * because we thought we needed to sleep but, after further checking, we
+ * discover that we don't actually need to do so. Somebody else might have
+ * already woken us up, in that case return false.
+ */
+static inline bool
+LWLockDequeueSelf(volatile LWLock *lock)
+{
+	bool	found = false;
+	dlist_mutable_iter iter;
+
+	SpinLockAcquire(&lock->mutex);
+
+	/* need to iterate, somebody else could have unqueued us */
+	dlist_foreach_modify(iter, (dlist_head *) &lock->waiters)
+	{
+		PGPROC *proc = dlist_container(PGPROC, lwWaitLink, iter.cur);
+		if (proc == MyProc)
+		{
+			found = true;
+			dlist_delete(&proc->lwWaitLink);
+			break;
+		}
+	}
+
+	SpinLockRelease(&lock->mutex);
+
+	if (found)
+		pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
+	return found;
+}
 
 /*
  * LWLockAcquire - acquire a lightweight lock in the specified mode
@@ -341,10 +658,13 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 {
 	volatile LWLock *lock = &(LWLockArray[lockid].lock);
 	PGPROC	   *proc = MyProc;
-	bool		retry = false;
 	int			extraWaits = 0;
+	bool		potentially_spurious;
+	uint32		iterations = 0;
 
-	PRINT_LWDEBUG("LWLockAcquire", lockid, lock);
+	AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
+
+	PRINT_LWDEBUG("LWLockAcquire", lockid, lock, mode);
 
 #ifdef LWLOCK_STATS
 	/* Set up local count state first time through in a given process */
@@ -395,58 +715,71 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 	{
 		bool		mustwait;
 
-		/* Acquire mutex.  Time spent holding mutex should be short! */
 #ifdef LWLOCK_STATS
+		/* Acquire mutex.  Time spent holding mutex should be short! */
+		/* FIXME this stuff is completely useless now.  Should consider a
+		 * different way to do accounting -- perhaps at LWLockAttemptLock? */
 		spin_delay_counts[lockid] += SpinLockAcquire(&lock->mutex);
-#else
-		SpinLockAcquire(&lock->mutex);
 #endif
 
-		/* If retrying, allow LWLockRelease to release waiters again */
-		if (retry)
-			lock->releaseOK = true;
+		/*
+		 * try to grab the lock the first time, we're not in the waitqueue yet.
+		 */
+		mustwait = LWLockAttemptLock(lock, mode, false, &potentially_spurious);
+
+		if (!mustwait)
+			break;				/* got the lock */
+
+		/*
+		 * Ok, at this point we couldn't grab the lock on the first try. We
+		 * cannot simply queue ourselves to the end of the list and wait to be
+		 * woken up because by now the lock could long have been released.
+		 * Instead add us to the queue and try to grab the lock again. If we
+		 * succeed we need to revert the queueing and be happy, otherwise we
+		 * recheck the lock. If we still couldn't grab it, we know that the
+		 * other locker will see our queue entries when releasing since they
+		 * existed before we checked for the lock.
+		 */
+
+		/* add to the queue */
+		LWLockQueueSelf(lock, mode);
 
-		/* If I can get the lock, do so quickly. */
-		if (mode == LW_EXCLUSIVE)
+		/* we're now guaranteed to be woken up if necessary */
+		mustwait = LWLockAttemptLock(lock, mode, false, &potentially_spurious);
+
+		/* ok, grabbed the lock the second time round, need to undo queueing */
+		if (!mustwait)
 		{
-			if (lock->exclusive == 0 && lock->shared == 0)
+			if (!LWLockDequeueSelf(lock))
 			{
-				lock->exclusive++;
-				mustwait = false;
+				/*
+				 * Somebody else dequeued us and has or will wake us up. Wait
+				 * for the correct wakeup, otherwise our ->lwWaiting would get
+				 * reset at some inconvenient point later, and releaseOK
+				 * wouldn't be managed correctly.
+				 */
+				for (;;)
+				{
+					PGSemaphoreLock(&proc->sem, false);
+					if (!proc->lwWaiting)
+						break;
+					extraWaits++;
+				}
+				lock->releaseOK = true;
 			}
-			else
-				mustwait = true;
+			PRINT_LWDEBUG("LWLockAcquire undo queue", lockid, lock, mode);
+			break;
 		}
 		else
 		{
-			if (lock->exclusive == 0)
-			{
-				lock->shared++;
-				mustwait = false;
-			}
-			else
-				mustwait = true;
+			PRINT_LWDEBUG("LWLockAcquire waiting 4", lockid, lock, mode);
 		}
 
-		if (!mustwait)
-			break;				/* got the lock */
-
 		/*
-		 * Add myself to wait queue.
-		 *
-		 * If we don't have a PGPROC structure, there's no way to wait. This
-		 * should never occur, since MyProc should only be null during shared
-		 * memory initialization.
+		 * NB: There's no need to deal with spurious lock attempts
+		 * here. Anyone we prevented from acquiring the lock will
+		 * enqueue themselves using the same protocol we used here.
 		 */
-		if (proc == NULL)
-			elog(PANIC, "cannot wait without a PGPROC structure");
-
-		proc->lwWaiting = true;
-		proc->lwWaitMode = mode;
-		dlist_push_head((dlist_head *) &lock->waiters, &proc->lwWaitLink);
-
-		/* Can release the mutex now */
-		SpinLockRelease(&lock->mutex);
 
 		/*
 		 * Wait until awakened.
@@ -460,7 +793,7 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 		 * so that the lock manager or signal manager will see the received
 		 * signal when it next waits.
 		 */
-		LOG_LWDEBUG("LWLockAcquire", lockid, "waiting");
+		LOG_LWDEBUG("LWLockAcquire", lockid, mode, "waiting");
 
 #ifdef LWLOCK_STATS
 		block_counts[lockid]++;
@@ -476,22 +809,22 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 				break;
 			extraWaits++;
 		}
+		lock->releaseOK = true;
 
 		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);
 
-		LOG_LWDEBUG("LWLockAcquire", lockid, "awakened");
+		LOG_LWDEBUG("LWLockAcquire", lockid, mode, "awakened");
 
 		/* Now loop back and try to acquire lock again. */
-		retry = true;
+		pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
+		iterations++;
 	}
 
-	/* We are done updating shared state of the lock itself. */
-	SpinLockRelease(&lock->mutex);
-
 	TRACE_POSTGRESQL_LWLOCK_ACQUIRE(lockid, mode);
 
 	/* Add lock to list of locks held by this backend */
-	held_lwlocks[num_held_lwlocks++] = lockid;
+	held_lwlocks[num_held_lwlocks].lock = lockid;
+	held_lwlocks[num_held_lwlocks++].mode = mode;
 
 	/*
 	 * Fix the process wait semaphore's count for any absorbed wakeups.
@@ -512,8 +845,11 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
 {
 	volatile LWLock *lock = &(LWLockArray[lockid].lock);
 	bool		mustwait;
+	bool		potentially_spurious;
+
+	AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
 
-	PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);
+	PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock, mode);
 
 	/* Ensure we will have room to remember the lock */
 	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
@@ -526,48 +862,42 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
 	 */
 	HOLD_INTERRUPTS();
 
-	/* Acquire mutex.  Time spent holding mutex should be short! */
-	SpinLockAcquire(&lock->mutex);
-
-	/* If I can get the lock, do so quickly. */
-	if (mode == LW_EXCLUSIVE)
-	{
-		if (lock->exclusive == 0 && lock->shared == 0)
-		{
-			lock->exclusive++;
-			mustwait = false;
-		}
-		else
-			mustwait = true;
-	}
-	else
-	{
-		if (lock->exclusive == 0)
-		{
-			lock->shared++;
-			mustwait = false;
-		}
-		else
-			mustwait = true;
-	}
-
-	/* We are done updating shared state of the lock itself. */
-	SpinLockRelease(&lock->mutex);
+retry:
+	/*
+	 * passing 'true' to check more carefully to avoid potential
+	 * spurious acquisitions
+	 */
+	mustwait = LWLockAttemptLock(lock, mode, true, &potentially_spurious);
 
 	if (mustwait)
 	{
 		/* Failed to get lock, so release interrupt holdoff */
 		RESUME_INTERRUPTS();
-		LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed");
+		LOG_LWDEBUG("LWLockConditionalAcquire", lockid, mode, "failed");
 		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(lockid, mode);
+
+		/*
+		 * We ran into an exclusive lock and might have blocked another
+		 * exclusive locker from taking a shot because it took us some time to
+		 * back off. Retry till we are either sure we didn't block somebody (because
+		 * somebody else certainly has the lock) or till we got it.
+		 *
+		 * We cannot rely on the two-step lock-acquisition protocol as in
+		 * LWLockAcquire because we're not using it.
+		 */
+		if (potentially_spurious)
+		{
+			SPIN_DELAY();
+			goto retry;
+		}
 	}
 	else
 	{
 		/* Add lock to list of locks held by this backend */
-		held_lwlocks[num_held_lwlocks++] = lockid;
+		held_lwlocks[num_held_lwlocks].lock = lockid;
+		held_lwlocks[num_held_lwlocks++].mode = mode;
 		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(lockid, mode);
 	}
-
 	return !mustwait;
 }
 
@@ -592,8 +922,11 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
 	PGPROC	   *proc = MyProc;
 	bool		mustwait;
 	int			extraWaits = 0;
+	bool		potentially_spurious_first;
+	bool		potentially_spurious_second;
 
-	PRINT_LWDEBUG("LWLockAcquireOrWait", lockid, lock);
+	Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
+	PRINT_LWDEBUG("LWLockAcquireOrWait", lockid, lock, mode);
 
 #ifdef LWLOCK_STATS
 	/* Set up local count state first time through in a given process */
@@ -612,79 +945,64 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
 	 */
 	HOLD_INTERRUPTS();
 
-	/* Acquire mutex.  Time spent holding mutex should be short! */
-	SpinLockAcquire(&lock->mutex);
-
-	/* If I can get the lock, do so quickly. */
-	if (mode == LW_EXCLUSIVE)
-	{
-		if (lock->exclusive == 0 && lock->shared == 0)
-		{
-			lock->exclusive++;
-			mustwait = false;
-		}
-		else
-			mustwait = true;
-	}
-	else
-	{
-		if (lock->exclusive == 0)
-		{
-			lock->shared++;
-			mustwait = false;
-		}
-		else
-			mustwait = true;
-	}
+	/*
+	 * NB: We're using nearly the same twice-in-a-row lock acquisition
+	 * protocol as LWLockAcquire(). Check its comments for details.
+	 */
+	mustwait = LWLockAttemptLock(lock, mode, false, &potentially_spurious_first);
 
 	if (mustwait)
 	{
-		/*
-		 * Add myself to wait queue.
-		 *
-		 * If we don't have a PGPROC structure, there's no way to wait.  This
-		 * should never occur, since MyProc should only be null during shared
-		 * memory initialization.
-		 */
-		if (proc == NULL)
-			elog(PANIC, "cannot wait without a PGPROC structure");
+		LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
 
-		proc->lwWaiting = true;
-		proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
-		dlist_push_head((dlist_head *) &lock->waiters, &proc->lwWaitLink);
+		mustwait = LWLockAttemptLock(lock, mode, false, &potentially_spurious_second);
 
-		/* Can release the mutex now */
-		SpinLockRelease(&lock->mutex);
-
-		/*
-		 * Wait until awakened.  Like in LWLockAcquire, be prepared for bogus
-		 * wakups, because we share the semaphore with ProcWaitForSignal.
-		 */
-		LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "waiting");
+		if (mustwait)
+		{
+			/*
+			 * Wait until awakened.  Like in LWLockAcquire, be prepared for bogus
+			 * wakeups, because we share the semaphore with ProcWaitForSignal.
+			 */
+			LOG_LWDEBUG("LWLockAcquireOrWait", lockid, mode, "waiting");
 
 #ifdef LWLOCK_STATS
-		block_counts[lockid]++;
+			block_counts[lockid]++;
 #endif
+			TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode);
 
-		TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode);
+			for (;;)
+			{
+				/* "false" means cannot accept cancel/die interrupt here. */
+				PGSemaphoreLock(&proc->sem, false);
+				if (!proc->lwWaiting)
+					break;
+				extraWaits++;
+			}
+			pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
 
-		for (;;)
-		{
-			/* "false" means cannot accept cancel/die interrupt here. */
-			PGSemaphoreLock(&proc->sem, false);
-			if (!proc->lwWaiting)
-				break;
-			extraWaits++;
-		}
+			TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);
 
-		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);
+			LOG_LWDEBUG("LWLockAcquireOrWait", lockid, mode, "awakened");
+		}
+		else
+		{
+			/* got lock in the second attempt, undo queueing */
+			if (!LWLockDequeueSelf(lock))
+			{
+				for (;;)
+				{
+					PGSemaphoreLock(&proc->sem, false);
+					if (!proc->lwWaiting)
+						break;
+					extraWaits++;
+				}
+			}
 
-		LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "awakened");
-	}
-	else
-	{
-		/* We are done updating shared state of the lock itself. */
-		SpinLockRelease(&lock->mutex);
+			/* FIXME: don't need that anymore? */
+#if 0
+			LWLockWakeup(lock, lockid, mode);
+#endif
+		}
 	}
 
 	/*
@@ -697,13 +1015,15 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
 	{
 		/* Failed to get lock, so release interrupt holdoff */
 		RESUME_INTERRUPTS();
-		LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "failed");
+		LOG_LWDEBUG("LWLockAcquireOrWait", lockid, mode, "failed");
 		TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL(lockid, mode);
 	}
 	else
 	{
+		LOG_LWDEBUG("LWLockAcquireOrWait", lockid, mode, "suceeded");
 		/* Add lock to list of locks held by this backend */
-		held_lwlocks[num_held_lwlocks++] = lockid;
+		held_lwlocks[num_held_lwlocks].lock = lockid;
+		held_lwlocks[num_held_lwlocks++].mode = mode;
 		TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(lockid, mode);
 	}
 
@@ -717,13 +1037,11 @@ void
 LWLockRelease(LWLockId lockid)
 {
 	volatile LWLock *lock = &(LWLockArray[lockid].lock);
-	dlist_head	wakeup;
-	dlist_mutable_iter iter;
 	int			i;
-
-	dlist_init(&wakeup);
-
-	PRINT_LWDEBUG("LWLockRelease", lockid, lock);
+	LWLockMode	mode;
+	uint32		lockcount;
+	bool		check_waiters;
+	bool		have_waiters = false;
 
 	/*
 	 * Remove lock from list of locks held.  Usually, but not always, it will
@@ -731,8 +1049,11 @@ LWLockRelease(LWLockId lockid)
 	 */
 	for (i = num_held_lwlocks; --i >= 0;)
 	{
-		if (lockid == held_lwlocks[i])
+		if (lockid == held_lwlocks[i].lock)
+		{
+			mode = held_lwlocks[i].mode;
 			break;
+		}
 	}
 	if (i < 0)
 		elog(ERROR, "lock %d is not held", (int) lockid);
@@ -740,78 +1061,50 @@ LWLockRelease(LWLockId lockid)
 	for (; i < num_held_lwlocks; i++)
 		held_lwlocks[i] = held_lwlocks[i + 1];
 
-	/* Acquire mutex.  Time spent holding mutex should be short! */
-	SpinLockAcquire(&lock->mutex);
+	PRINT_LWDEBUG("LWLockRelease", lockid, lock, mode);
 
-	/* Release my hold on lock */
-	if (lock->exclusive > 0)
-		lock->exclusive--;
+	pg_read_barrier();
+
+	/* Release my hold on lock, both are a full barrier */
+	if (mode == LW_EXCLUSIVE)
+		lockcount = pg_atomic_sub_fetch_u32(&lock->lockcount, EXCLUSIVE_LOCK);
 	else
-	{
-		Assert(lock->shared > 0);
-		lock->shared--;
-	}
+		lockcount = pg_atomic_sub_fetch_u32(&lock->lockcount, 1);
+
+	/* nobody else can have that kind of lock */
+	Assert(lockcount < EXCLUSIVE_LOCK);
 
 	/*
-	 * See if I need to awaken any waiters.  If I released a non-last shared
-	 * hold, there cannot be anything to do.  Also, do not awaken any waiters
-	 * if someone has already awakened waiters that haven't yet acquired the
-	 * lock.
+	 * Anybody we need to wake up needs to have started queueing before
+	 * we released our hold on the lock, and the atomic operations
+	 * above are full barriers.
 	 */
-	if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
-	{
-		/*
-		 * Remove the to-be-awakened PGPROCs from the queue.
-		 */
-		bool		releaseOK = true;
-		bool		wokeup_somebody = false;
-
-		dlist_foreach_modify(iter, (dlist_head *) &lock->waiters)
-		{
-			PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
-
-			if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
-				continue;
-
-			dlist_delete(&waiter->lwWaitLink);
-			dlist_push_tail(&wakeup, &waiter->lwWaitLink);
 
-			/*
-			 * Prevent additional wakeups until retryer gets to
-			 * run. Backends that are just waiting for the lock to become
-			 * free don't retry automatically.
-			 */
-			if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
-			{
-				releaseOK = false;
-				wokeup_somebody = true;
-			}
+	if (pg_atomic_read_u32(&lock->nwaiters) > 0)
+		have_waiters = true;
+
+	/* we're still waiting for backends to get scheduled, don't release again */
+	if (!lock->releaseOK)
+		check_waiters = false;
+	/* grant permission to run, even if a spurious share lock increases lockcount */
+	else if (mode == LW_EXCLUSIVE && have_waiters)
+		check_waiters = true;
+	/* nobody has this locked anymore, potential exclusive lockers get a chance */
+	else if (lockcount == 0 && have_waiters)
+		check_waiters = true;
+	/* nobody queued or not free */
+	else
+		check_waiters = false;
 
-			if(waiter->lwWaitMode == LW_EXCLUSIVE)
-				break;
-		}
-		lock->releaseOK = releaseOK;
+	if (check_waiters)
+	{
+		PRINT_LWDEBUG("LWLockRelease releasing", lockid, lock, mode);
+		LWLockWakeup(lock, lockid, mode);
 	}
 
-	/* We are done updating shared state of the lock itself. */
-	SpinLockRelease(&lock->mutex);
-
 	TRACE_POSTGRESQL_LWLOCK_RELEASE(lockid);
 
 	/*
-	 * Awaken any waiters I removed from the queue.
-	 */
-	dlist_foreach_modify(iter, (dlist_head *) &wakeup)
-	{
-		PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
-		LOG_LWDEBUG("LWLockRelease", lockid, "release waiter");
-		dlist_delete(&waiter->lwWaitLink);
-		pg_write_barrier();
-		waiter->lwWaiting = false;
-		PGSemaphoreUnlock(&waiter->sem);
-	}
-
-	/*
 	 * Now okay to allow cancel/die interrupts.
 	 */
 	RESUME_INTERRUPTS();
@@ -834,7 +1127,7 @@ LWLockReleaseAll(void)
 	{
 		HOLD_INTERRUPTS();		/* match the upcoming RESUME_INTERRUPTS */
 
-		LWLockRelease(held_lwlocks[num_held_lwlocks - 1]);
+		LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
 	}
 }
 
@@ -842,8 +1135,7 @@ LWLockReleaseAll(void)
 /*
  * LWLockHeldByMe - test whether my process currently holds a lock
  *
- * This is meant as debug support only.  We do not distinguish whether the
- * lock is held shared or exclusive.
+ * This is meant as debug support only.
  */
 bool
 LWLockHeldByMe(LWLockId lockid)
@@ -852,7 +1144,7 @@ LWLockHeldByMe(LWLockId lockid)
 
 	for (i = 0; i < num_held_lwlocks; i++)
 	{
-		if (held_lwlocks[i] == lockid)
+		if (held_lwlocks[i].lock == lockid)
 			return true;
 	}
 	return false;
-- 
1.8.3.251.g1462b67

