From e52c608e377fbbb80ffe90d3225142d7b981e740 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Mon, 18 Nov 2013 14:45:11 +0100
Subject: [PATCH 3/4] Convert the PGPROC->lwWaitLink list into a dlist instead
 of open coding it.

Besides being shorter and much easier to read, it:
* fixes a bug in xlog.c:WakeupWaiters() that accidentally woke up
  exclusive lockers
* changes the logic in LWLockRelease() to release all shared lockers
  when waking up any.
* adds a pg_write_barrier() memory barrier in the wakeup paths between
  dequeuing a waiter and unsetting ->lwWaiting. On a weakly ordered
  machine, the current coding could lead to problems.
---
 src/backend/access/transam/twophase.c |   1 -
 src/backend/access/transam/xlog.c     | 127 +++++++++++++---------------------
 src/backend/storage/lmgr/lwlock.c     | 106 +++++++++++-----------------
 src/backend/storage/lmgr/proc.c       |   2 -
 src/include/storage/proc.h            |   3 +-
 5 files changed, 90 insertions(+), 149 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index e975f8d..7aeb6e8 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -326,7 +326,6 @@ MarkAsPreparing(TransactionId xid, const char *gid,
 	proc->roleId = owner;
 	proc->lwWaiting = false;
 	proc->lwWaitMode = 0;
-	proc->lwWaitLink = NULL;
 	proc->waitLock = NULL;
 	proc->waitProcLock = NULL;
 	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index a95149b..ea2fb93 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -393,8 +393,7 @@ typedef struct
 
 	bool		releaseOK;		/* T if ok to release waiters */
 	char		exclusive;		/* # of exclusive holders (0 or 1) */
-	PGPROC	   *head;			/* head of list of waiting PGPROCs */
-	PGPROC	   *tail;			/* tail of list of waiting PGPROCs */
+	dlist_head	waiters;		/* list of waiting PGPROCs */
 	/* tail is undefined when head is NULL */
 } XLogInsertSlot;
 
@@ -1626,12 +1625,7 @@ WALInsertSlotAcquireOne(int slotno)
 		 */
 		proc->lwWaiting = true;
 		proc->lwWaitMode = LW_EXCLUSIVE;
-		proc->lwWaitLink = NULL;
-		if (slot->head == NULL)
-			slot->head = proc;
-		else
-			slot->tail->lwWaitLink = proc;
-		slot->tail = proc;
+		dlist_push_tail((dlist_head *) &slot->waiters, &proc->lwWaitLink);
 
 		/* Can release the mutex now */
 		SpinLockRelease(&slot->mutex);
@@ -1746,13 +1740,8 @@ WaitOnSlot(volatile XLogInsertSlot *slot, XLogRecPtr waitptr)
 		 */
 		proc->lwWaiting = true;
 		proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
-		proc->lwWaitLink = NULL;
-
 		/* waiters are added to the front of the queue */
-		proc->lwWaitLink = slot->head;
-		if (slot->head == NULL)
-			slot->tail = proc;
-		slot->head = proc;
+		dlist_push_head((dlist_head *) &slot->waiters, &proc->lwWaitLink);
 
 		/* Can release the mutex now */
 		SpinLockRelease(&slot->mutex);
@@ -1804,9 +1793,8 @@ static void
 WakeupWaiters(XLogRecPtr EndPos)
 {
 	volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot;
-	PGPROC	   *head;
-	PGPROC	   *proc;
-	PGPROC	   *next;
+	dlist_head	wakeup;
+	dlist_mutable_iter iter;
 
 	/*
 	 * If we have already reported progress up to the same point, do nothing.
@@ -1818,6 +1806,8 @@ WakeupWaiters(XLogRecPtr EndPos)
 	/* xlogInsertingAt should not go backwards */
 	Assert(slot->xlogInsertingAt < EndPos);
 
+	dlist_init(&wakeup);
+
 	/* Acquire mutex.  Time spent holding mutex should be short! */
 	SpinLockAcquire(&slot->mutex);
 
@@ -1827,25 +1817,19 @@ WakeupWaiters(XLogRecPtr EndPos)
 	slot->xlogInsertingAt = EndPos;
 
 	/*
-	 * See if there are any waiters that need to be woken up.
+	 * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
+	 * up.
 	 */
-	head = slot->head;
-
-	if (head != NULL)
+	dlist_foreach_modify(iter, (dlist_head *) &slot->waiters)
 	{
-		proc = head;
+		PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
 
 		/* LW_WAIT_UNTIL_FREE waiters are always in the front of the queue */
-		next = proc->lwWaitLink;
-		while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE)
-		{
-			proc = next;
-			next = next->lwWaitLink;
-		}
+		if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
+			break;
 
-		/* proc is now the last PGPROC to be released */
-		slot->head = next;
-		proc->lwWaitLink = NULL;
+		dlist_delete(&waiter->lwWaitLink);
+		dlist_push_tail(&wakeup, &waiter->lwWaitLink);
 	}
 
 	/* We are done updating shared state of the lock itself. */
@@ -1854,13 +1838,13 @@ WakeupWaiters(XLogRecPtr EndPos)
 	/*
 	 * Awaken any waiters I removed from the queue.
 	 */
-	while (head != NULL)
+	dlist_foreach_modify(iter, (dlist_head *) &wakeup)
 	{
-		proc = head;
-		head = proc->lwWaitLink;
-		proc->lwWaitLink = NULL;
-		proc->lwWaiting = false;
-		PGSemaphoreUnlock(&proc->sem);
+		PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
+		dlist_delete(&waiter->lwWaitLink);
+		pg_write_barrier();
+		waiter->lwWaiting = false;
+		PGSemaphoreUnlock(&waiter->sem);
 	}
 }
 
@@ -1886,8 +1870,11 @@ static void
 WALInsertSlotReleaseOne(int slotno)
 {
 	volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[slotno].slot;
-	PGPROC	   *head;
-	PGPROC	   *proc;
+	dlist_head	wakeup;
+	dlist_mutable_iter iter;
+	bool		releaseOK = true;
+
+	dlist_init(&wakeup);
 
 	/* Acquire mutex.  Time spent holding mutex should be short! */
 	SpinLockAcquire(&slot->mutex);
@@ -1904,43 +1891,28 @@ WALInsertSlotReleaseOne(int slotno)
 	/*
 	 * See if I need to awaken any waiters..
 	 */
-	head = slot->head;
-	if (head != NULL)
+	dlist_foreach_modify(iter, (dlist_head *) &slot->waiters)
 	{
-		if (slot->releaseOK)
-		{
-			/*
-			 * Remove the to-be-awakened PGPROCs from the queue.
-			 */
-			bool		releaseOK = true;
+		PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
 
-			proc = head;
+		dlist_delete(&waiter->lwWaitLink);
+		dlist_push_tail(&wakeup, &waiter->lwWaitLink);
 
-			/*
-			 * First wake up any backends that want to be woken up without
-			 * acquiring the lock. These are always in the front of the queue.
-			 */
-			while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
-				proc = proc->lwWaitLink;
-
-			/*
-			 * Awaken the first exclusive-waiter, if any.
-			 */
-			if (proc->lwWaitLink)
-			{
-				Assert(proc->lwWaitLink->lwWaitMode == LW_EXCLUSIVE);
-				proc = proc->lwWaitLink;
-				releaseOK = false;
-			}
-			/* proc is now the last PGPROC to be released */
-			slot->head = proc->lwWaitLink;
-			proc->lwWaitLink = NULL;
+		/*
+		 * First wake up any backends that want to be woken up without
+		 * acquiring the lock. These are always in the front of the
+		 * queue. Then wake up the first exclusive-waiter, if any.
+		 */
 
-			slot->releaseOK = releaseOK;
+		/* LW_WAIT_UNTIL_FREE waiters are always in the front of the queue */
+		if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
+		{
+			Assert(waiter->lwWaitMode == LW_EXCLUSIVE);
+			releaseOK = false;
+			break;
 		}
-		else
-			head = NULL;
 	}
+	slot->releaseOK = releaseOK;
 
 	/* We are done updating shared state of the slot itself. */
 	SpinLockRelease(&slot->mutex);
@@ -1948,13 +1920,13 @@ WALInsertSlotReleaseOne(int slotno)
 	/*
 	 * Awaken any waiters I removed from the queue.
 	 */
-	while (head != NULL)
+	dlist_foreach_modify(iter, (dlist_head *) &wakeup)
 	{
-		proc = head;
-		head = proc->lwWaitLink;
-		proc->lwWaitLink = NULL;
-		proc->lwWaiting = false;
-		PGSemaphoreUnlock(&proc->sem);
+		PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
+		dlist_delete(&waiter->lwWaitLink);
+		pg_write_barrier();
+		waiter->lwWaiting = false;
+		PGSemaphoreUnlock(&waiter->sem);
 	}
 
 	/*
@@ -5104,8 +5076,7 @@ XLOGShmemInit(void)
 
 		slot->releaseOK = true;
 		slot->exclusive = 0;
-		slot->head = NULL;
-		slot->tail = NULL;
+		dlist_init(&slot->waiters);
 	}
 
 	SpinLockInit(&XLogCtl->Insert.insertpos_lck);
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 4f88d3f..732a5d2 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -25,6 +25,7 @@
 #include "access/multixact.h"
 #include "access/subtrans.h"
 #include "commands/async.h"
+#include "lib/ilist.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
 #include "storage/ipc.h"
@@ -43,8 +44,7 @@ typedef struct LWLock
 	bool		releaseOK;		/* T if ok to release waiters */
 	char		exclusive;		/* # of exclusive holders (0 or 1) */
 	int			shared;			/* # of shared holders (0..MaxBackends) */
-	PGPROC	   *head;			/* head of list of waiting PGPROCs */
-	PGPROC	   *tail;			/* tail of list of waiting PGPROCs */
+	dlist_head	waiters;		/* list of waiting PGPROCs */
 	/* tail is undefined when head is NULL */
 } LWLock;
 
@@ -105,9 +105,9 @@ inline static void
 PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock)
 {
 	if (Trace_lwlocks)
-		elog(LOG, "%s(%d): excl %d shared %d head %p rOK %d",
+		elog(LOG, "%s(%d): excl %d shared %d rOK %d",
 			 where, (int) lockid,
-			 (int) lock->exclusive, lock->shared, lock->head,
+			 (int) lock->exclusive, lock->shared,
 			 (int) lock->releaseOK);
 }
 
@@ -287,8 +287,7 @@ CreateLWLocks(void)
 		lock->lock.releaseOK = true;
 		lock->lock.exclusive = 0;
 		lock->lock.shared = 0;
-		lock->lock.head = NULL;
-		lock->lock.tail = NULL;
+		dlist_init(&lock->lock.waiters);
 	}
 
 	/*
@@ -444,12 +443,7 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 
 		proc->lwWaiting = true;
 		proc->lwWaitMode = mode;
-		proc->lwWaitLink = NULL;
-		if (lock->head == NULL)
-			lock->head = proc;
-		else
-			lock->tail->lwWaitLink = proc;
-		lock->tail = proc;
+		dlist_push_tail((dlist_head *) &lock->waiters, &proc->lwWaitLink);
 
 		/* Can release the mutex now */
 		SpinLockRelease(&lock->mutex);
@@ -657,12 +651,7 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
 
 		proc->lwWaiting = true;
 		proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
-		proc->lwWaitLink = NULL;
-		if (lock->head == NULL)
-			lock->head = proc;
-		else
-			lock->tail->lwWaitLink = proc;
-		lock->tail = proc;
+		dlist_push_head((dlist_head *) &lock->waiters, &proc->lwWaitLink);
 
 		/* Can release the mutex now */
 		SpinLockRelease(&lock->mutex);
@@ -728,10 +717,12 @@ void
 LWLockRelease(LWLockId lockid)
 {
 	volatile LWLock *lock = &(LWLockArray[lockid].lock);
-	PGPROC	   *head;
-	PGPROC	   *proc;
+	dlist_head	wakeup;
+	dlist_mutable_iter iter;
 	int			i;
 
+	dlist_init(&wakeup);
+
 	PRINT_LWDEBUG("LWLockRelease", lockid, lock);
 
 	/*
@@ -767,58 +758,39 @@ LWLockRelease(LWLockId lockid)
 	 * if someone has already awakened waiters that haven't yet acquired the
 	 * lock.
 	 */
-	head = lock->head;
-	if (head != NULL)
+	if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
 	{
-		if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
+		/*
+		 * Remove the to-be-awakened PGPROCs from the queue.
+		 */
+		bool		releaseOK = true;
+		bool		wokeup_somebody = false;
+
+		dlist_foreach_modify(iter, (dlist_head *) &lock->waiters)
 		{
-			/*
-			 * Remove the to-be-awakened PGPROCs from the queue.
-			 */
-			bool		releaseOK = true;
+			PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
 
-			proc = head;
+			if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
+				continue;
 
-			/*
-			 * First wake up any backends that want to be woken up without
-			 * acquiring the lock.
-			 */
-			while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
-				proc = proc->lwWaitLink;
+			dlist_delete(&waiter->lwWaitLink);
+			dlist_push_tail(&wakeup, &waiter->lwWaitLink);
 
 			/*
-			 * If the front waiter wants exclusive lock, awaken him only.
-			 * Otherwise awaken as many waiters as want shared access.
+			 * Prevent additional wakeups until retryer gets to
+			 * run. Backends that are just waiting for the lock to become
+			 * free don't retry automatically.
 			 */
-			if (proc->lwWaitMode != LW_EXCLUSIVE)
+			if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
 			{
-				while (proc->lwWaitLink != NULL &&
-					   proc->lwWaitLink->lwWaitMode != LW_EXCLUSIVE)
-				{
-					if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
-						releaseOK = false;
-					proc = proc->lwWaitLink;
-				}
-			}
-			/* proc is now the last PGPROC to be released */
-			lock->head = proc->lwWaitLink;
-			proc->lwWaitLink = NULL;
-
-			/*
-			 * Prevent additional wakeups until retryer gets to run. Backends
-			 * that are just waiting for the lock to become free don't retry
-			 * automatically.
-			 */
-			if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
 				releaseOK = false;
+				wokeup_somebody = true;
+			}
 
-			lock->releaseOK = releaseOK;
-		}
-		else
-		{
-			/* lock is still held, can't awaken anything */
-			head = NULL;
+			if(waiter->lwWaitMode == LW_EXCLUSIVE)
+				break;
 		}
+		lock->releaseOK = releaseOK;
 	}
 
 	/* We are done updating shared state of the lock itself. */
@@ -829,14 +801,14 @@ LWLockRelease(LWLockId lockid)
 	/*
 	 * Awaken any waiters I removed from the queue.
 	 */
-	while (head != NULL)
+	dlist_foreach_modify(iter, (dlist_head *) &wakeup)
 	{
+		PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
 		LOG_LWDEBUG("LWLockRelease", lockid, "release waiter");
-		proc = head;
-		head = proc->lwWaitLink;
-		proc->lwWaitLink = NULL;
-		proc->lwWaiting = false;
-		PGSemaphoreUnlock(&proc->sem);
+		dlist_delete(&waiter->lwWaitLink);
+		pg_write_barrier();
+		waiter->lwWaiting = false;
+		PGSemaphoreUnlock(&waiter->sem);
 	}
 
 	/*
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 222251d..75bddcd 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -370,7 +370,6 @@ InitProcess(void)
 		MyPgXact->vacuumFlags |= PROC_IS_AUTOVACUUM;
 	MyProc->lwWaiting = false;
 	MyProc->lwWaitMode = 0;
-	MyProc->lwWaitLink = NULL;
 	MyProc->waitLock = NULL;
 	MyProc->waitProcLock = NULL;
 #ifdef USE_ASSERT_CHECKING
@@ -533,7 +532,6 @@ InitAuxiliaryProcess(void)
 	MyPgXact->vacuumFlags = 0;
 	MyProc->lwWaiting = false;
 	MyProc->lwWaitMode = 0;
-	MyProc->lwWaitLink = NULL;
 	MyProc->waitLock = NULL;
 	MyProc->waitProcLock = NULL;
 #ifdef USE_ASSERT_CHECKING
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 3b04d3c..73cea1c 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -15,6 +15,7 @@
 #define _PROC_H_
 
 #include "access/xlogdefs.h"
+#include "lib/ilist.h"
 #include "storage/latch.h"
 #include "storage/lock.h"
 #include "storage/pg_sema.h"
@@ -101,7 +102,7 @@ struct PGPROC
 	/* Info about LWLock the process is currently waiting for, if any. */
 	bool		lwWaiting;		/* true if waiting for an LW lock */
 	uint8		lwWaitMode;		/* lwlock mode being waited for */
-	struct PGPROC *lwWaitLink;	/* next waiter for same LW lock */
+	dlist_node	lwWaitLink;		/* position in LW lock wait list */
 
 	/* Info about lock the process is currently waiting for, if any. */
 	/* waitLock and waitProcLock are NULL if not currently waiting. */
-- 
1.8.3.251.g1462b67

