From 9680b22ab7c28343c1c2d1de7d318bae2d667e71 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sat, 19 Nov 2022 15:18:28 -0800
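Subject: [PATCH v2 4/9] Use dlist for syncrep queue

Convert WalSndCtl->SyncRepQueue[] and PGPROC->syncRepLinks from SHM_QUEUE
to dlist_head/dlist_node, and replace the open-coded SHMQueueNext() /
SHMQueuePrev() loops in syncrep.c with the dlist iteration macros. Queue
ordering and wakeup logic are unchanged.
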
---
 src/include/replication/walsender_private.h |  3 +-
 src/include/storage/proc.h                  |  2 +-
 src/backend/replication/syncrep.c           | 89 +++++++++------------
 src/backend/replication/walsender.c         |  2 +-
 src/backend/storage/lmgr/proc.c             |  2 +-
 5 files changed, 41 insertions(+), 57 deletions(-)

diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 7897c74589e..db801e9f5cf 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -13,6 +13,7 @@
 #define _WALSENDER_PRIVATE_H
 
 #include "access/xlog.h"
+#include "lib/ilist.h"
 #include "nodes/nodes.h"
 #include "replication/syncrep.h"
 #include "storage/latch.h"
@@ -89,7 +90,7 @@ typedef struct
 	 * Synchronous replication queue with one queue per request type.
 	 * Protected by SyncRepLock.
 	 */
-	SHM_QUEUE	SyncRepQueue[NUM_SYNC_REP_WAIT_MODE];
+	dlist_head	SyncRepQueue[NUM_SYNC_REP_WAIT_MODE];
 
 	/*
 	 * Current location of the head of the queue. All waiters should have a
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 7005770da79..ba9bf1c9508 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -248,7 +248,7 @@ struct PGPROC
 	 */
 	XLogRecPtr	waitLSN;		/* waiting for this LSN or higher */
 	int			syncRepState;	/* wait state for sync rep */
-	SHM_QUEUE	syncRepLinks;	/* list link if process is in syncrep queue */
+	dlist_node	syncRepLinks;	/* list link if process is in syncrep queue */
 
 	/*
 	 * All PROCLOCK objects for locks held or awaited by this backend are
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 1a022b11a06..2c2f1082e97 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -182,7 +182,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 	else
 		mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
 
-	Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
+	Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
 	Assert(WalSndCtl != NULL);
 
 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
@@ -318,7 +318,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 	 * assertions, but better safe than sorry).
 	 */
 	pg_read_barrier();
-	Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
+	Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
 	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
 	MyProc->waitLSN = 0;
 
@@ -339,31 +339,32 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 static void
 SyncRepQueueInsert(int mode)
 {
-	PGPROC	   *proc;
+	dlist_head *queue;
+	dlist_iter iter;
 
 	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
-	proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
-								   &(WalSndCtl->SyncRepQueue[mode]),
-								   offsetof(PGPROC, syncRepLinks));
+	queue = &WalSndCtl->SyncRepQueue[mode];
 
-	while (proc)
+	dlist_reverse_foreach(iter, queue)
 	{
+		PGPROC	   *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
+
 		/*
-		 * Stop at the queue element that we should after to ensure the queue
-		 * is ordered by LSN.
+		 * Stop at the queue element that we should insert after to ensure the
+		 * queue is ordered by LSN.
 		 */
 		if (proc->waitLSN < MyProc->waitLSN)
-			break;
-
-		proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
-									   &(proc->syncRepLinks),
-									   offsetof(PGPROC, syncRepLinks));
+		{
+			dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks);
+			return;
+		}
 	}
 
-	if (proc)
-		SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
-	else
-		SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
+	/*
+	 * If we get here, the list was either empty, or no queued waiter has a
+	 * smaller waitLSN than ours, so this process needs to be at the head.
+	 */
+	dlist_push_head(queue, &MyProc->syncRepLinks);
 }
 
 /*
@@ -373,8 +374,8 @@ static void
 SyncRepCancelWait(void)
 {
 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
-	if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
-		SHMQueueDelete(&(MyProc->syncRepLinks));
+	if (!dlist_node_is_detached(&MyProc->syncRepLinks))
+		dlist_delete_thoroughly(&MyProc->syncRepLinks);
 	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
 	LWLockRelease(SyncRepLock);
 }
@@ -386,13 +387,13 @@ SyncRepCleanupAtProcExit(void)
 	 * First check if we are removed from the queue without the lock to not
 	 * slow down backend exit.
 	 */
-	if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
+	if (!dlist_node_is_detached(&MyProc->syncRepLinks))
 	{
 		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
 
 		/* maybe we have just been removed, so recheck */
-		if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
-			SHMQueueDelete(&(MyProc->syncRepLinks));
+		if (!dlist_node_is_detached(&MyProc->syncRepLinks))
+			dlist_delete_thoroughly(&MyProc->syncRepLinks);
 
 		LWLockRelease(SyncRepLock);
 	}
@@ -879,20 +880,17 @@ static int
 SyncRepWakeQueue(bool all, int mode)
 {
 	volatile WalSndCtlData *walsndctl = WalSndCtl;
-	PGPROC	   *proc = NULL;
-	PGPROC	   *thisproc = NULL;
 	int			numprocs = 0;
+	dlist_mutable_iter iter;
 
 	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
 	Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
 	Assert(SyncRepQueueIsOrderedByLSN(mode));
 
-	proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
-								   &(WalSndCtl->SyncRepQueue[mode]),
-								   offsetof(PGPROC, syncRepLinks));
-
-	while (proc)
+	dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode])
 	{
+		PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
+
 		/*
 		 * Assume the queue is ordered by LSN
 		 */
@@ -900,18 +898,9 @@ SyncRepWakeQueue(bool all, int mode)
 			return numprocs;
 
 		/*
-		 * Move to next proc, so we can delete thisproc from the queue.
-		 * thisproc is valid, proc may be NULL after this.
+		 * Remove proc from the queue.
 		 */
-		thisproc = proc;
-		proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
-									   &(proc->syncRepLinks),
-									   offsetof(PGPROC, syncRepLinks));
-
-		/*
-		 * Remove thisproc from queue.
-		 */
-		SHMQueueDelete(&(thisproc->syncRepLinks));
+		dlist_delete_thoroughly(&proc->syncRepLinks);
 
 		/*
 		 * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
@@ -924,12 +913,12 @@ SyncRepWakeQueue(bool all, int mode)
 		 * Set state to complete; see SyncRepWaitForLSN() for discussion of
 		 * the various states.
 		 */
-		thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
+		proc->syncRepState = SYNC_REP_WAIT_COMPLETE;
 
 		/*
 		 * Wake only when we have set state and removed from queue.
 		 */
-		SetLatch(&(thisproc->procLatch));
+		SetLatch(&(proc->procLatch));
 
 		numprocs++;
 	}
@@ -983,19 +972,17 @@ SyncRepUpdateSyncStandbysDefined(void)
 static bool
 SyncRepQueueIsOrderedByLSN(int mode)
 {
-	PGPROC	   *proc = NULL;
 	XLogRecPtr	lastLSN;
+	dlist_iter	iter;
 
 	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
 
 	lastLSN = 0;
 
-	proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
-								   &(WalSndCtl->SyncRepQueue[mode]),
-								   offsetof(PGPROC, syncRepLinks));
-
-	while (proc)
+	dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode])
 	{
+		PGPROC	   *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
+
 		/*
 		 * Check the queue is ordered by LSN and that multiple procs don't
 		 * have matching LSNs
@@ -1004,10 +991,6 @@ SyncRepQueueIsOrderedByLSN(int mode)
 			return false;
 
 		lastLSN = proc->waitLSN;
-
-		proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
-									   &(proc->syncRepLinks),
-									   offsetof(PGPROC, syncRepLinks));
 	}
 
 	return true;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index a81ef6a2014..b5a40d2c439 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -3270,7 +3270,7 @@ WalSndShmemInit(void)
 		MemSet(WalSndCtl, 0, WalSndShmemSize());
 
 		for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
-			SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
+			dlist_init(&(WalSndCtl->SyncRepQueue[i]));
 
 		for (i = 0; i < max_wal_senders; i++)
 		{
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 5ffbd7b8195..39c6685f467 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -410,7 +410,7 @@ InitProcess(void)
 	/* Initialize fields for sync rep */
 	MyProc->waitLSN = 0;
 	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
-	SHMQueueElemInit(&(MyProc->syncRepLinks));
+	dlist_node_init(&MyProc->syncRepLinks);
 
 	/* Initialize fields for group XID clearing. */
 	MyProc->procArrayGroupMember = false;
-- 
2.38.0

