From 8189689b71e5cd8129587345aa6e7b6e0231c6f8 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Tue, 24 Sep 2013 03:04:44 +0200
Subject: [PATCH] Wait-free LW_SHARED lwlock acquisition
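
Convert the LWLock state from the spinlock-protected 'exclusive' and
'shared' counts into a single 'lockcount' word manipulated with atomic
operations, so LW_SHARED (and uncontended LW_EXCLUSIVE) locks can be
acquired and released without taking the spinlock. The spinlock still
protects the wait queue; to close the race between a failed atomic
acquisition and enqueueing, backends queue themselves first and then
retry the acquisition, dequeueing again (or absorbing the wakeup) if
the retry succeeds.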

---
 src/backend/access/transam/xlog.c |   9 +-
 src/backend/storage/lmgr/lwlock.c | 863 +++++++++++++++++++++++++++-----------
 2 files changed, 617 insertions(+), 255 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index fc495d6..07e510b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1620,7 +1620,8 @@ WALInsertSlotAcquireOne(int slotno)
 			break;				/* got the lock */
 
 		Assert(slot->owner != MyProc);
-
+		Assert(!proc->lwWaiting);
+		Assert(proc->lwWaitLink == NULL);
+
 		/*
 		 * Add myself to wait queue.
 		 */
@@ -1630,7 +1631,10 @@ WALInsertSlotAcquireOne(int slotno)
 		if (slot->head == NULL)
 			slot->head = proc;
 		else
+		{
+			Assert(slot->tail->lwWaitLink == NULL);
 			slot->tail->lwWaitLink = proc;
+		}
 		slot->tail = proc;
 
 		/* Can release the mutex now */
@@ -1741,6 +1745,9 @@ WaitOnSlot(volatile XLogInsertSlot *slot, XLogRecPtr waitptr)
 
 		Assert(slot->owner != MyProc);
 
+		Assert(!proc->lwWaiting);
+		Assert(proc->lwWaitLink == NULL);
+
 		/*
 		 * Add myself to wait queue.
 		 */
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 4f88d3f..c7fffec 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -17,6 +17,14 @@
  * IDENTIFICATION
  *	  src/backend/storage/lmgr/lwlock.c
  *
+ * TODO:
+ * - convert lwWaitLink to ilist.h slist or even dlist.
+ * - remove LWLockWakeup dealing with MyProc
+ * - revive pure-spinlock implementation
+ * - abstract away atomic ops, we really only need a few.
+ *   - CAS
+ *   - LOCK XADD
+ * - overhaul the mask offsets, make SHARED/EXCLUSIVE_LOCK_MASK wider, MAX_BACKENDS
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -27,22 +35,29 @@
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
+#include "storage/barrier.h"
 #include "storage/ipc.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
 #include "storage/spin.h"
 
-
 /* We use the ShmemLock spinlock to protect LWLockAssign */
 extern slock_t *ShmemLock;
 
+#define EXCLUSIVE_LOCK_FINALIZED ((uint32)1<<31)
+#define EXCLUSIVE_LOCK_MASK (~(uint32)((1<<16)-1))
+/* XXX: must be bigger than MAX_BACKENDS */
+#define SHARED_LOCK_MASK ((uint32)((1<<16)-1))
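+
+/*
+ * Layout of lockcount, as implied by the masks above:
+ *   bits 0-15   number of shared holders (SHARED_LOCK_MASK)
+ *   bits 16-31  exclusive state (EXCLUSIVE_LOCK_MASK), of which only
+ *               bit 31 (EXCLUSIVE_LOCK_FINALIZED) is currently used
+ * E.g. lockcount == 3 means three shared holders; lockcount ==
+ * EXCLUSIVE_LOCK_FINALIZED means one exclusive holder.
+ */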
 
 typedef struct LWLock
 {
 	slock_t		mutex;			/* Protects LWLock and queue of PGPROCs */
+	uint32		lockcount;		/* shared/exclusive lock state, see *_LOCK_MASK */
 	bool		releaseOK;		/* T if ok to release waiters */
-	char		exclusive;		/* # of exclusive holders (0 or 1) */
-	int			shared;			/* # of shared holders (0..MaxBackends) */
+	uint32		waiters;
+#ifdef LWLOCK_DEBUG
+	PGPROC	   *owner;
+#endif
 	PGPROC	   *head;			/* head of list of waiting PGPROCs */
 	PGPROC	   *tail;			/* tail of list of waiting PGPROCs */
 	/* tail is undefined when head is NULL */
@@ -60,7 +75,7 @@ typedef struct LWLock
  * LWLock is between 16 and 32 bytes on all known platforms, so these two
  * cases are sufficient.
  */
-#define LWLOCK_PADDED_SIZE	(sizeof(LWLock) <= 16 ? 16 : 32)
+#define LWLOCK_PADDED_SIZE	64
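+/*
+ * XXX: presumably padded to a full cache line (64 bytes on most current
+ * platforms) to reduce false sharing between locks; the comment above
+ * still describes the old sizing.
+ */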
 
 typedef union LWLockPadded
 {
@@ -75,7 +90,6 @@ typedef union LWLockPadded
  */
 NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;
 
-
 /*
  * We use this structure to keep track of locked LWLocks for release
  * during error recovery.  The maximum size could be determined at runtime
@@ -84,12 +98,21 @@ NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;
  */
 #define MAX_SIMUL_LWLOCKS	100
 
+struct LWLockHandle
+{
+	LWLockId lock;
+	LWLockMode mode;
+};
+
 static int	num_held_lwlocks = 0;
-static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];
+static struct LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];
 
 static int	lock_addin_request = 0;
 static bool lock_addin_request_allowed = true;
 
+/* read value from memory, does *not* have any barrier semantics */
+#define pg_atomic_read(atomic)	(*(volatile uint32 *)&(atomic))
+
 #ifdef LWLOCK_STATS
 static int	counts_for_pid = 0;
 static int *sh_acquire_counts;
@@ -102,24 +125,27 @@ static int *spin_delay_counts;
 bool		Trace_lwlocks = false;
 
 inline static void
-PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock)
+PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock, LWLockMode mode)
 {
 	if (Trace_lwlocks)
-		elog(LOG, "%s(%d): excl %d shared %d head %p rOK %d",
-			 where, (int) lockid,
-			 (int) lock->exclusive, lock->shared, lock->head,
+		elog(LOG, "%s(%d)%u: excl %u shared %u head %p waiters %u rOK %d",
+		     where, (int) lockid, mode,
+		     (lock->lockcount & EXCLUSIVE_LOCK_MASK) >> 31,
+		     lock->lockcount & SHARED_LOCK_MASK,
+		     lock->head,
+		     lock->waiters,
 			 (int) lock->releaseOK);
 }
 
 inline static void
-LOG_LWDEBUG(const char *where, LWLockId lockid, const char *msg)
+LOG_LWDEBUG(const char *where, LWLockId lockid, LWLockMode mode, const char *msg)
 {
 	if (Trace_lwlocks)
-		elog(LOG, "%s(%d): %s", where, (int) lockid, msg);
+		elog(LOG, "%s(%d)%u: %s", where, (int) lockid, mode, msg);
 }
 #else							/* not LOCK_DEBUG */
-#define PRINT_LWDEBUG(a,b,c)
-#define LOG_LWDEBUG(a,b,c)
+#define PRINT_LWDEBUG(a,b,c,d) (void)0
+#define LOG_LWDEBUG(a,b,c,d) (void)0
 #endif   /* LOCK_DEBUG */
 
 #ifdef LWLOCK_STATS
@@ -285,8 +311,8 @@ CreateLWLocks(void)
 	{
 		SpinLockInit(&lock->lock.mutex);
 		lock->lock.releaseOK = true;
-		lock->lock.exclusive = 0;
-		lock->lock.shared = 0;
+		lock->lock.lockcount = 0;
+		lock->lock.waiters = 0;
 		lock->lock.head = NULL;
 		lock->lock.tail = NULL;
 	}
@@ -329,6 +355,364 @@ LWLockAssign(void)
 	return result;
 }
 
+/*
+ * Internal function handling the atomic manipulation of lock->lockcount.
+ *
+ * If 'double_check' is true, we check more carefully, to avoid making
+ * somebody else spuriously believe the lock is already taken although
+ * we're just about to back out of it.
+ */
+static inline bool
+LWLockAttemptLock(volatile LWLock* lock, LWLockMode mode, bool double_check, bool *potentially_spurious)
+{
+	bool mustwait;
+	uint32 oldstate;
+
+	if (mode == LW_EXCLUSIVE)
+	{
+		pg_read_barrier();
+		/* check without CAS first; way cheaper, as the lock is frequently already taken */
+		if (pg_atomic_read(lock->lockcount) != 0)
+			mustwait = true;
+		/*
+		 * ok, no can do. Between the pg_atomic_read() above and the
+		 * CAS somebody else acquired the lock.
+		 */
+		else if (__sync_val_compare_and_swap(&lock->lockcount,
+		                                     0, EXCLUSIVE_LOCK_FINALIZED) != 0)
+		{
+			mustwait = true;
+		}
+		/* yipeyyahee */
+		else
+		{
+			mustwait = false;
+#ifdef LWLOCK_DEBUG
+			lock->owner = MyProc;
+#endif
+		}
+	}
+	else
+	{
+		/*
+		 * Do an unlocked check first, useful if potentially spurious
+		 * results have a noticeable cost
+		 */
+		if (double_check)
+		{
+			pg_read_barrier();
+			if (pg_atomic_read(lock->lockcount) >= EXCLUSIVE_LOCK_FINALIZED)
+			{
+				mustwait = true;
+				goto out;
+			}
+		}
+
+		/*
+		 * Acquire the share lock unconditionally using an atomic
+		 * addition. We might have to back out again if it turns out
+		 * somebody else has an exclusive lock.
+		 */
+		oldstate = __sync_fetch_and_add(&lock->lockcount, 1);
+
+		/*
+		 * Ok, somebody else holds the lock exclusively. We need to
+		 * back away from the shared lock, since we don't actually
+		 * hold it right now. Since there's a window between lockcount
+		 * += 1 and lockcount -= 1, the previous exclusive locker
+		 * could have released and another exclusive locker could
+		 * have seen our +1 and failed. We need to signal that to the
+		 * upper layers so they can deal with the race condition.
+		 */
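+		/*
+		 * E.g. the race we may cause:
+		 *   backend A holds the lock exclusively
+		 *   backend B (us): lockcount += 1, sees the exclusive bit set
+		 *   backend A releases the lock, leaving lockcount == 1
+		 *   backend C attempts an exclusive lock, sees lockcount != 0
+		 *             and fails, although the lock is effectively free
+		 *   backend B (us): lockcount -= 1
+		 */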
+		if (oldstate >= EXCLUSIVE_LOCK_FINALIZED)
+		{
+			/*
+			 * FIXME: check return value if (double_check), it's not
+			 * spurious if still exclusively locked.
+			 */
+			__sync_fetch_and_sub(&lock->lockcount, 1);
+			mustwait = true;
+			*potentially_spurious = true;
+		}
+		/* yipeyyahee */
+		else
+			mustwait = false;
+	}
+
+out:
+	return mustwait;
+}
+
+/*
+ * Wake up all the waiters that currently have a chance to run.
+ */
+static void
+LWLockWakeup(volatile LWLock* lock, LWLockId lockid, LWLockMode mode)
+{
+	/*
+	 * Remove the to-be-awakened PGPROCs from the queue.
+	 */
+	bool		releaseOK = true;
+	PGPROC	   *cur;
+	PGPROC	   *last = NULL;
+	PGPROC	   *next = NULL;
+	PGPROC	  **wakeup;
+	size_t		nwakeup = 0;
+	size_t		wakeup_slots;
+	bool		woke_up_shared = false;
+	int			i;
+
+	/* Acquire mutex.  Time spent holding mutex should be short! */
+	SpinLockAcquire(&lock->mutex);
+
+	Assert((lock->head != NULL && lock->tail != NULL) ||
+	       lock->head == NULL);
+
+	cur = lock->head;
+
+	if (cur == NULL)
+	{
+#ifdef LOCK_DEBUG
+		/*
+		 * This can legitimately happen since lock->waiters is
+		 * increased in LWLockQueueSelf before actually queueing, but
+		 * it can also indicate a bug, so it's useful to display this
+		 * when tracing.
+		 */
+		if (pg_atomic_read(lock->waiters) > 0)
+		{
+			PRINT_LWDEBUG("LWLockWakeup - no head, but waiters", lockid, lock, 0);
+		}
+#endif
+		/* nothing to do, can't be out of date because of the spinlock */
+		goto unlock;
+	}
+
+	wakeup_slots = pg_atomic_read(lock->waiters);
+	wakeup = alloca(wakeup_slots * sizeof(PGPROC *));
+
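+	/*
+	 * Walk the wait queue: wake every LW_SHARED and LW_WAIT_UNTIL_FREE
+	 * waiter we find; wake an LW_EXCLUSIVE waiter, and stop the walk,
+	 * only if no shared waiter has been woken yet, otherwise skip it
+	 * and keep walking.
+	 */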
+	while (cur != NULL)
+	{
+		/* panic because others will be stuck in a semaphore */
+		if (nwakeup >= wakeup_slots)
+			elog(PANIC, "too many waiters");
+
+		/* wake up all shared waiters we find, unless we've found an exclusive waiter first */
+		if (cur != MyProc && cur->lwWaitMode == LW_SHARED)
+		{
+			woke_up_shared = true;
+			wakeup[nwakeup++] = cur;
+			/*
+			 * Don't want to wake up again before any of the woken-up
+			 * entries have been scheduled.
+			 */
+			releaseOK = false;
+		}
+		/*
+		 * wake up all UNTIL_FREE entries we encounter, even if shared
+		 * waiters came before them
+		 */
+		else if (cur != MyProc && cur->lwWaitMode == LW_WAIT_UNTIL_FREE)
+		{
+			wakeup[nwakeup++] = cur;
+		}
+
+		/*
+		 * Exclusive lock. Wake this one up, but no further ones,
+		 * since they wouldn't be able to acquire the lock anyway
+		 * until the exclusive locker releases it.
+		 */
+		else if (cur != MyProc && cur->lwWaitMode == LW_EXCLUSIVE && !woke_up_shared)
+		{
+			wakeup[nwakeup++] = cur;
+
+			/* repoint lock->head since we're removing the current one */
+			if (last == NULL)
+				lock->head = cur->lwWaitLink;
+			/* unlink current node from chain */
+			else
+				last->lwWaitLink = cur->lwWaitLink;
+
+			/*
+			 * removing the last node, we need to repoint ->tail,
+			 * otherwise appending won't work.
+			 */
+			if (cur == lock->tail)
+				lock->tail = last;
+
+			Assert((lock->head != NULL && lock->tail != NULL) ||
+			       lock->head == NULL);
+
+			cur->lwWaitLink = NULL;
+			/*
+			 * Don't want to wake up again before any of the woken-up
+			 * entries have been scheduled.
+			 */
+			releaseOK = false;
+			break;
+		}
+		/* won't be woken up, but we want to look at the later nodes */
+		else
+		{
+			last = cur;
+			cur = cur->lwWaitLink;
+			continue;
+		}
+
+		/* remove current from linked list */
+
+		/* current is the first node */
+		if (last == NULL)
+			lock->head = cur->lwWaitLink;
+		else
+			last->lwWaitLink = cur->lwWaitLink;
+
+		/* current is the end of the list, repoint tail */
+		if (cur == lock->tail)
+			lock->tail = last;
+
+		next = cur->lwWaitLink;
+		/* mark it as no longer part of the list, but do not yet wake it up */
+		cur->lwWaitLink = NULL;
+		cur = next;
+	}
+
+	Assert((lock->head != NULL && lock->tail != NULL) ||
+	       lock->head == NULL);
+
+unlock:
+	lock->releaseOK = releaseOK;
+
+	/* We are done updating shared state of the lock queue. */
+	SpinLockRelease(&lock->mutex);
+
+	/*
+	 * Awaken any waiters I removed from the queue.
+	 */
+	for (i = 0; i < nwakeup; i++)
+	{
+		PGPROC *proc = wakeup[i];
+
+		Assert(proc != MyProc);
+		Assert(lock->head != proc);
+		Assert(lock->tail != proc);
+
+		/*
+		 * unset waiting, doing so only now allows some more error
+		 * checking to be done elsewhere
+		 */
+		proc->lwWaiting = false;
+		PGSemaphoreUnlock(&proc->sem);
+	}
+}
+
+/*
+ * Add ourselves to the end of the queue. Mode can be
+ * LW_WAIT_UNTIL_FREE here!
+ */
+static inline void
+LWLockQueueSelf(volatile LWLock *lock, LWLockMode mode)
+{
+	/*
+	 * If we don't have a PGPROC structure, there's no way to wait. This
+	 * should never occur, since MyProc should only be null during shared
+	 * memory initialization.
+	 */
+	if (MyProc == NULL)
+		elog(PANIC, "cannot wait without a PGPROC structure");
+
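+	/*
+	 * Increase the waiter count before actually queueing; a releasing
+	 * backend reads lock->waiters without holding the spinlock, so it
+	 * must already be able to see that somebody is (about to be)
+	 * waiting (see LWLockRelease).
+	 */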
+	__sync_fetch_and_add(&lock->waiters, 1);
+
+	SpinLockAcquire(&lock->mutex);
+
+	/* consistent list state */
+	Assert((lock->head != NULL && lock->tail != NULL) ||
+	       lock->head == NULL);
+	/* we can only be part of one queue at a time */
+	Assert(!MyProc->lwWaiting);
+	/* quick and dirty check for repeated queueing */
+	Assert(lock->head != MyProc);
+	Assert(lock->tail != MyProc);
+
+	MyProc->lwWaiting = true;
+	MyProc->lwWaitMode = mode;
+	MyProc->lwWaitLink = NULL;
+
+	/* no list there yet */
+	if (lock->head == NULL)
+		lock->head = MyProc;
+	/* to the end with it */
+	else
+	{
+		Assert(lock->tail != NULL);
+		Assert(lock->tail->lwWaitLink == NULL);
+
+		lock->tail->lwWaitLink = MyProc;
+		Assert(lock->tail->lwWaitLink != lock->tail);
+	}
+	lock->tail = MyProc;
+
+	/* Can release the mutex now */
+	SpinLockRelease(&lock->mutex);
+}
+
+/*
+ * Remove ourselves from the waitlist, after further checking has shown
+ * that we don't actually need to be on it. Return false if somebody
+ * else has already dequeued us and has woken, or will wake, us.
+ */
+static inline bool
+LWLockUnqueueSelf(volatile LWLock *lock)
+{
+	PGPROC *proc;
+	PGPROC *prev = NULL;
+	bool	found = false;
+
+	SpinLockAcquire(&lock->mutex);
+	proc = lock->head;
+
+	while (proc != NULL)
+	{
+		if (proc == MyProc)
+		{
+			/* removing first element */
+			if (prev == NULL)
+				lock->head = proc->lwWaitLink;
+			/* any other */
+			else
+			{
+				prev->lwWaitLink = proc->lwWaitLink;
+				Assert(prev->lwWaitLink != prev);
+			}
+
+			/* removing last element, repoint tail */
+			if (proc == lock->tail)
+				lock->tail = prev;
+
+			proc->lwWaitLink = NULL;
+			proc->lwWaiting = false;
+			found = true;
+			break;
+		}
+		prev = proc;
+		proc = proc->lwWaitLink;
+	}
+
+	Assert((lock->head != NULL && lock->tail != NULL) ||
+	       lock->head == NULL);
+
+	SpinLockRelease(&lock->mutex);
+
+	/*
+	 * Even if somebody else woke us up, we cannot still be part of
+	 * the queue; we can, however, still be marked as lwWaiting.
+	 */
+	Assert(MyProc->lwWaitLink == NULL);
+
+	if (found)
+		__sync_fetch_and_sub(&lock->waiters, 1);
+	return found;
+}
 
 /*
  * LWLockAcquire - acquire a lightweight lock in the specified mode
@@ -342,10 +726,11 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 {
 	volatile LWLock *lock = &(LWLockArray[lockid].lock);
 	PGPROC	   *proc = MyProc;
-	bool		retry = false;
 	int			extraWaits = 0;
+	bool		potentially_spurious;
+	uint32		iterations = 0;
 
-	PRINT_LWDEBUG("LWLockAcquire", lockid, lock);
+	PRINT_LWDEBUG("LWLockAcquire", lockid, lock, mode);
 
 #ifdef LWLOCK_STATS
 	/* Set up local count state first time through in a given process */
@@ -395,64 +780,69 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 	for (;;)
 	{
 		bool		mustwait;
-
-		/* Acquire mutex.  Time spent holding mutex should be short! */
-#ifdef LWLOCK_STATS
-		spin_delay_counts[lockid] += SpinLockAcquire(&lock->mutex);
-#else
-		SpinLockAcquire(&lock->mutex);
-#endif
 
-		/* If retrying, allow LWLockRelease to release waiters again */
-		if (retry)
-			lock->releaseOK = true;
+		/* try to grab the lock the first time, we're not in the waitqueue yet */
+		mustwait = LWLockAttemptLock(lock, mode, false, &potentially_spurious);
+
+		if (!mustwait)
+			break;				/* got the lock */
+
+		/*
+		 * Ok, at this point we couldn't grab the lock on the first
+		 * try. We cannot simply queue ourselves to the end of the
+		 * list and wait to be woken up, because by now the lock could
+		 * long since have been released. Instead add ourselves to the
+		 * queue and try to grab the lock again. If we succeed we need
+		 * to revert the queueing and be happy; otherwise we sleep. If
+		 * we still couldn't grab it, we know the lock holder will see
+		 * our queue entry when releasing, since it existed before we
+		 * rechecked the lock.
+		 */
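+		/*
+		 * I.e. the protocol is:
+		 * 1. attempt to acquire the lock        -> failed
+		 * 2. add ourselves to the wait queue
+		 * 3. attempt to acquire the lock again
+		 * 4a. success: remove ourselves from the queue again
+		 *     (absorbing a wakeup if somebody already removed us)
+		 * 4b. failure: sleep; the releaser is guaranteed to see our
+		 *     queue entry and wake us
+		 */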
+
+		/* add to the queue */
+		LWLockQueueSelf(lock, mode);
+
+		/* we're now guaranteed to be woken up if necessary */
+		mustwait = LWLockAttemptLock(lock, mode, false, &potentially_spurious);
 
-		/* If I can get the lock, do so quickly. */
-		if (mode == LW_EXCLUSIVE)
+		/* ok, grabbed the lock the second time round, need to undo queueing */
+		if (!mustwait)
 		{
-			if (lock->exclusive == 0 && lock->shared == 0)
+			if (!LWLockUnqueueSelf(lock))
 			{
-				lock->exclusive++;
-				mustwait = false;
+				/*
+				 * Somebody else dequeued us and has woken us up, or
+				 * will do so shortly. Absorb that wakeup now;
+				 * otherwise our ->lwWaiting might get cleared at some
+				 * inconvenient point later, and releaseOK wouldn't be
+				 * managed correctly.
+				 */
+				for (;;)
+				{
+					PGSemaphoreLock(&proc->sem, false);
+					if (!proc->lwWaiting)
+						break;
+					extraWaits++;
+				}
+				lock->releaseOK = true;
 			}
-			else
-				mustwait = true;
+			PRINT_LWDEBUG("LWLockAcquire undo queue", lockid, lock, mode);
+			break;
 		}
 		else
 		{
-			if (lock->exclusive == 0)
-			{
-				lock->shared++;
-				mustwait = false;
-			}
-			else
-				mustwait = true;
+			PRINT_LWDEBUG("LWLockAcquire waiting 4", lockid, lock, mode);
 		}
 
-		if (!mustwait)
-			break;				/* got the lock */
-
 		/*
-		 * Add myself to wait queue.
-		 *
-		 * If we don't have a PGPROC structure, there's no way to wait. This
-		 * should never occur, since MyProc should only be null during shared
-		 * memory initialization.
+		 * NB: There's no need to deal with spurious lock attempts
+		 * here. Anyone we prevented from acquiring the lock will
+		 * enqueue themselves using the same protocol we used here.
 		 */
-		if (proc == NULL)
-			elog(PANIC, "cannot wait without a PGPROC structure");
-
-		proc->lwWaiting = true;
-		proc->lwWaitMode = mode;
-		proc->lwWaitLink = NULL;
-		if (lock->head == NULL)
-			lock->head = proc;
-		else
-			lock->tail->lwWaitLink = proc;
-		lock->tail = proc;
-
-		/* Can release the mutex now */
-		SpinLockRelease(&lock->mutex);
 
 		/*
 		 * Wait until awakened.
@@ -466,7 +856,7 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 		 * so that the lock manager or signal manager will see the received
 		 * signal when it next waits.
 		 */
-		LOG_LWDEBUG("LWLockAcquire", lockid, "waiting");
+		LOG_LWDEBUG("LWLockAcquire", lockid, mode, "waiting");
 
 #ifdef LWLOCK_STATS
 		block_counts[lockid]++;
@@ -482,28 +872,34 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 				break;
 			extraWaits++;
 		}
+		lock->releaseOK = true;
 
 		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);
 
-		LOG_LWDEBUG("LWLockAcquire", lockid, "awakened");
+		LOG_LWDEBUG("LWLockAcquire", lockid, mode, "awakened");
 
 		/* Now loop back and try to acquire lock again. */
-		retry = true;
+		__sync_fetch_and_sub(&lock->waiters, 1);
+		iterations++;
 	}
 
-	/* We are done updating shared state of the lock itself. */
-	SpinLockRelease(&lock->mutex);
-
 	TRACE_POSTGRESQL_LWLOCK_ACQUIRE(lockid, mode);
 
 	/* Add lock to list of locks held by this backend */
-	held_lwlocks[num_held_lwlocks++] = lockid;
+	held_lwlocks[num_held_lwlocks].lock = lockid;
+	held_lwlocks[num_held_lwlocks++].mode = mode;
 
 	/*
 	 * Fix the process wait semaphore's count for any absorbed wakeups.
 	 */
 	while (extraWaits-- > 0)
 		PGSemaphoreUnlock(&proc->sem);
+
+	/* some minor consistency checks */
+	Assert(MyProc == NULL || MyProc->lwWaitLink == NULL);
+	Assert(MyProc == NULL || !MyProc->lwWaiting);
+	Assert(MyProc == NULL || lock->head != MyProc);
+	Assert(MyProc == NULL || lock->tail != MyProc);
 }
 
 /*
@@ -518,8 +914,9 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
 {
 	volatile LWLock *lock = &(LWLockArray[lockid].lock);
 	bool		mustwait;
+	bool		potentially_spurious;
 
-	PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);
+	PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock, mode);
 
 	/* Ensure we will have room to remember the lock */
 	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
@@ -531,49 +928,51 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
 	 * manipulations of data structures in shared memory.
 	 */
 	HOLD_INTERRUPTS();
-
-	/* Acquire mutex.  Time spent holding mutex should be short! */
-	SpinLockAcquire(&lock->mutex);
-
-	/* If I can get the lock, do so quickly. */
-	if (mode == LW_EXCLUSIVE)
-	{
-		if (lock->exclusive == 0 && lock->shared == 0)
-		{
-			lock->exclusive++;
-			mustwait = false;
-		}
-		else
-			mustwait = true;
-	}
-	else
-	{
-		if (lock->exclusive == 0)
-		{
-			lock->shared++;
-			mustwait = false;
-		}
-		else
-			mustwait = true;
-	}
-
-	/* We are done updating shared state of the lock itself. */
-	SpinLockRelease(&lock->mutex);
+retry:
+	/*
+	 * Pass 'true' to check more carefully, to avoid potential
+	 * spurious acquisitions.
+	 */
+	mustwait = LWLockAttemptLock(lock, mode, true, &potentially_spurious);
 
 	if (mustwait)
 	{
 		/* Failed to get lock, so release interrupt holdoff */
 		RESUME_INTERRUPTS();
-		LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed");
+		LOG_LWDEBUG("LWLockConditionalAcquire", lockid, mode, "failed");
 		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(lockid, mode);
+
+		/*
+		 * We ran into an exclusive lock, and our transient shared
+		 * count increment might have blocked another exclusive locker
+		 * from taking a shot while we backed off. Retry until we are
+		 * either sure we didn't block somebody (because somebody else
+		 * securely holds the lock) or until we got the lock.
+		 *
+		 * We cannot rely on the two-step lock acquisition protocol as
+		 * in LWLockAcquire, because we're not using it here.
+		 */
+		if (potentially_spurious)
+		{
+			SPIN_DELAY();
+			goto retry;
+		}
 	}
 	else
 	{
 		/* Add lock to list of locks held by this backend */
-		held_lwlocks[num_held_lwlocks++] = lockid;
+		held_lwlocks[num_held_lwlocks].lock = lockid;
+		held_lwlocks[num_held_lwlocks++].mode = mode;
 		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(lockid, mode);
 	}
 
+	Assert(MyProc == NULL || MyProc->lwWaitLink == NULL);
+	Assert(MyProc == NULL || !MyProc->lwWaiting);
+	Assert(MyProc == NULL || lock->head != MyProc);
+	Assert(MyProc == NULL || lock->tail != MyProc);
+
 	return !mustwait;
 }
 
@@ -598,8 +997,10 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
 	PGPROC	   *proc = MyProc;
 	bool		mustwait;
 	int			extraWaits = 0;
+	bool		potentially_spurious_first;
+	bool		potentially_spurious_second;
 
-	PRINT_LWDEBUG("LWLockAcquireOrWait", lockid, lock);
+	PRINT_LWDEBUG("LWLockAcquireOrWait", lockid, lock, mode);
 
 #ifdef LWLOCK_STATS
 	/* Set up local count state first time through in a given process */
@@ -618,84 +1019,66 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
 	 */
 	HOLD_INTERRUPTS();
 
-	/* Acquire mutex.  Time spent holding mutex should be short! */
-	SpinLockAcquire(&lock->mutex);
-
-	/* If I can get the lock, do so quickly. */
-	if (mode == LW_EXCLUSIVE)
-	{
-		if (lock->exclusive == 0 && lock->shared == 0)
-		{
-			lock->exclusive++;
-			mustwait = false;
-		}
-		else
-			mustwait = true;
-	}
-	else
-	{
-		if (lock->exclusive == 0)
-		{
-			lock->shared++;
-			mustwait = false;
-		}
-		else
-			mustwait = true;
-	}
+	/*
+	 * NB: We're using nearly the same twice-in-a-row lock acquisition
+	 * protocol as LWLockAcquire(). Check its comments for details.
+	 */
+	mustwait = LWLockAttemptLock(lock, mode, false, &potentially_spurious_first);
 
 	if (mustwait)
 	{
-		/*
-		 * Add myself to wait queue.
-		 *
-		 * If we don't have a PGPROC structure, there's no way to wait.  This
-		 * should never occur, since MyProc should only be null during shared
-		 * memory initialization.
-		 */
-		if (proc == NULL)
-			elog(PANIC, "cannot wait without a PGPROC structure");
-
-		proc->lwWaiting = true;
-		proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
-		proc->lwWaitLink = NULL;
-		if (lock->head == NULL)
-			lock->head = proc;
-		else
-			lock->tail->lwWaitLink = proc;
-		lock->tail = proc;
+		LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
 
-		/* Can release the mutex now */
-		SpinLockRelease(&lock->mutex);
+		mustwait = LWLockAttemptLock(lock, mode, false, &potentially_spurious_second);
 
-		/*
-		 * Wait until awakened.  Like in LWLockAcquire, be prepared for bogus
-		 * wakups, because we share the semaphore with ProcWaitForSignal.
-		 */
-		LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "waiting");
+		if (mustwait)
+		{
+			/*
+			 * Wait until awakened.  Like in LWLockAcquire, be prepared for bogus
+			 * wakeups, because we share the semaphore with ProcWaitForSignal.
+			 */
+			LOG_LWDEBUG("LWLockAcquireOrWait", lockid, mode, "waiting");
 
 #ifdef LWLOCK_STATS
-		block_counts[lockid]++;
+			block_counts[lockid]++;
 #endif
+			TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode);
 
-		TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode);
+			for (;;)
+			{
+				/* "false" means cannot accept cancel/die interrupt here. */
+				PGSemaphoreLock(&proc->sem, false);
+				if (!proc->lwWaiting)
+					break;
+				extraWaits++;
+			}
 
-		for (;;)
-		{
-			/* "false" means cannot accept cancel/die interrupt here. */
-			PGSemaphoreLock(&proc->sem, false);
-			if (!proc->lwWaiting)
-				break;
-			extraWaits++;
-		}
+			Assert(lock->head != MyProc);
+			Assert(lock->tail != MyProc);
 
-		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);
+			__sync_fetch_and_sub(&lock->waiters, 1);
 
-		LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "awakened");
-	}
-	else
-	{
-		/* We are done updating shared state of the lock itself. */
-		SpinLockRelease(&lock->mutex);
+			TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);
+
+			LOG_LWDEBUG("LWLockAcquireOrWait", lockid, mode, "awakened");
+		}
+		else
+		{
+			/* got lock in the second attempt, undo queueing */
+			if (!LWLockUnqueueSelf(lock))
+			{
+				for (;;)
+				{
+					PGSemaphoreLock(&proc->sem, false);
+					if (!proc->lwWaiting)
+						break;
+					extraWaits++;
+				}
+			}
+
+			/* FIXME: don't need that anymore? */
+			//LWLockWakeup(lock, lockid, mode);
+		}
 	}
 
 	/*
@@ -708,16 +1091,23 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
 	{
 		/* Failed to get lock, so release interrupt holdoff */
 		RESUME_INTERRUPTS();
-		LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "failed");
+		LOG_LWDEBUG("LWLockAcquireOrWait", lockid, mode, "failed");
 		TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL(lockid, mode);
 	}
 	else
 	{
+		LOG_LWDEBUG("LWLockAcquireOrWait", lockid, mode, "succeeded");
 		/* Add lock to list of locks held by this backend */
-		held_lwlocks[num_held_lwlocks++] = lockid;
+		held_lwlocks[num_held_lwlocks].lock = lockid;
+		held_lwlocks[num_held_lwlocks++].mode = mode;
 		TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(lockid, mode);
 	}
 
+	Assert(MyProc == NULL || MyProc->lwWaitLink == NULL);
+	Assert(MyProc == NULL || !MyProc->lwWaiting);
+	Assert(MyProc == NULL || lock->head != MyProc);
+	Assert(MyProc == NULL || lock->tail != MyProc);
+
 	return !mustwait;
 }
 
@@ -728,11 +1118,11 @@ void
 LWLockRelease(LWLockId lockid)
 {
 	volatile LWLock *lock = &(LWLockArray[lockid].lock);
-	PGPROC	   *head;
-	PGPROC	   *proc;
 	int			i;
-
-	PRINT_LWDEBUG("LWLockRelease", lockid, lock);
+	LWLockMode	mode;
+	uint32		lockcount;
+	bool		check_waiters;
+	bool		have_waiters = false;
 
 	/*
 	 * Remove lock from list of locks held.  Usually, but not always, it will
@@ -740,8 +1130,11 @@ LWLockRelease(LWLockId lockid)
 	 */
 	for (i = num_held_lwlocks; --i >= 0;)
 	{
-		if (lockid == held_lwlocks[i])
+		if (lockid == held_lwlocks[i].lock)
+		{
+			mode = held_lwlocks[i].mode;
 			break;
+		}
 	}
 	if (i < 0)
 		elog(ERROR, "lock %d is not held", (int) lockid);
@@ -749,100 +1142,63 @@ LWLockRelease(LWLockId lockid)
 	for (; i < num_held_lwlocks; i++)
 		held_lwlocks[i] = held_lwlocks[i + 1];
 
-	/* Acquire mutex.  Time spent holding mutex should be short! */
-	SpinLockAcquire(&lock->mutex);
+	PRINT_LWDEBUG("LWLockRelease", lockid, lock, mode);
 
-	/* Release my hold on lock */
-	if (lock->exclusive > 0)
-		lock->exclusive--;
-	else
-	{
-		Assert(lock->shared > 0);
-		lock->shared--;
-	}
+	pg_read_barrier();
 
-	/*
-	 * See if I need to awaken any waiters.  If I released a non-last shared
-	 * hold, there cannot be anything to do.  Also, do not awaken any waiters
-	 * if someone has already awakened waiters that haven't yet acquired the
-	 * lock.
-	 */
-	head = lock->head;
-	if (head != NULL)
-	{
-		if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
-		{
-			/*
-			 * Remove the to-be-awakened PGPROCs from the queue.
-			 */
-			bool		releaseOK = true;
+#ifdef NOT_ANYMORE
+	if (pg_atomic_read(lock->waiters) > 0)
+		have_waiters = true;
+#endif
 
-			proc = head;
+	/* Release my hold on lock, both are a full barrier */
+	if (mode == LW_EXCLUSIVE)
+		lockcount = __sync_sub_and_fetch(&lock->lockcount, EXCLUSIVE_LOCK_FINALIZED);
+	else
+		lockcount = __sync_sub_and_fetch(&lock->lockcount, 1);
 
-			/*
-			 * First wake up any backends that want to be woken up without
-			 * acquiring the lock.
-			 */
-			while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
-				proc = proc->lwWaitLink;
+	/* nobody else can have that kind of lock */
+	Assert(lockcount < EXCLUSIVE_LOCK_FINALIZED);
 
-			/*
-			 * If the front waiter wants exclusive lock, awaken him only.
-			 * Otherwise awaken as many waiters as want shared access.
-			 */
-			if (proc->lwWaitMode != LW_EXCLUSIVE)
-			{
-				while (proc->lwWaitLink != NULL &&
-					   proc->lwWaitLink->lwWaitMode != LW_EXCLUSIVE)
-				{
-					if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
-						releaseOK = false;
-					proc = proc->lwWaitLink;
-				}
-			}
-			/* proc is now the last PGPROC to be released */
-			lock->head = proc->lwWaitLink;
-			proc->lwWaitLink = NULL;
+	/*
+	 * Anybody we need to wake up must have started queueing before we
+	 * released the lock above, and the __sync_ operations above are
+	 * full barriers, so we are guaranteed to see their increment of
+	 * lock->waiters here.
+	 */
 
-			/*
-			 * Prevent additional wakeups until retryer gets to run. Backends
-			 * that are just waiting for the lock to become free don't retry
-			 * automatically.
-			 */
-			if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
-				releaseOK = false;
+	if (pg_atomic_read(lock->waiters) > 0)
+		have_waiters = true;
+
+	/* we're still waiting for backends to get scheduled, don't release again */
+	if (!lock->releaseOK)
+		check_waiters = false;
+	/* grant permission to run, even if a spurious share lock increases lockcount */
+	else if (mode == LW_EXCLUSIVE && have_waiters)
+		check_waiters = true;
+	/* nobody has this locked anymore, potential exclusive lockers get a chance */
+	else if (lockcount == 0 && have_waiters)
+		check_waiters = true;
+	/* nobody queued or not free */
+	else
+		check_waiters = false;
 
-			lock->releaseOK = releaseOK;
-		}
-		else
-		{
-			/* lock is still held, can't awaken anything */
-			head = NULL;
-		}
+	if (check_waiters)
+	{
+		PRINT_LWDEBUG("LWLockRelease releasing", lockid, lock, mode);
+		LWLockWakeup(lock, lockid, mode);
 	}
 
-	/* We are done updating shared state of the lock itself. */
-	SpinLockRelease(&lock->mutex);
-
 	TRACE_POSTGRESQL_LWLOCK_RELEASE(lockid);
 
 	/*
-	 * Awaken any waiters I removed from the queue.
-	 */
-	while (head != NULL)
-	{
-		LOG_LWDEBUG("LWLockRelease", lockid, "release waiter");
-		proc = head;
-		head = proc->lwWaitLink;
-		proc->lwWaitLink = NULL;
-		proc->lwWaiting = false;
-		PGSemaphoreUnlock(&proc->sem);
-	}
-
-	/*
 	 * Now okay to allow cancel/die interrupts.
 	 */
 	RESUME_INTERRUPTS();
+
+	Assert(MyProc == NULL || MyProc->lwWaitLink == NULL);
+	Assert(MyProc == NULL || !MyProc->lwWaiting);
+	Assert(MyProc == NULL || lock->head != MyProc);
+	Assert(MyProc == NULL || lock->tail != MyProc);
 }
 
 
@@ -862,7 +1218,7 @@ LWLockReleaseAll(void)
 	{
 		HOLD_INTERRUPTS();		/* match the upcoming RESUME_INTERRUPTS */
 
-		LWLockRelease(held_lwlocks[num_held_lwlocks - 1]);
+		LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
 	}
 }
 
@@ -870,8 +1226,7 @@ LWLockReleaseAll(void)
 /*
  * LWLockHeldByMe - test whether my process currently holds a lock
  *
- * This is meant as debug support only.  We do not distinguish whether the
- * lock is held shared or exclusive.
+ * This is meant as debug support only.
  */
 bool
 LWLockHeldByMe(LWLockId lockid)
@@ -880,7 +1235,7 @@ LWLockHeldByMe(LWLockId lockid)
 
 	for (i = 0; i < num_held_lwlocks; i++)
 	{
-		if (held_lwlocks[i] == lockid)
+		if (held_lwlocks[i].lock == lockid)
 			return true;
 	}
 	return false;
-- 
1.8.4.21.g992c386.dirty

