From 82366f1dbc0fc234fdd10dbc15519b3cf7104684 Mon Sep 17 00:00:00 2001
From: Martijn van Oosterhout <oosterhout@fox-it.com>
Date: Tue, 23 Jul 2019 16:49:30 +0200
Subject: [PATCH 1/3] Maintain queue of listening backends to speed up loops

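Several loops in async.c visit every one of the MaxBackends slots in the
backend array. Keep the listening backends on a linked list instead, so that
these loops only visit backends that are actually listening. This helps
most in the loop that advances the tail pointer, which is called fairly
often while holding an exclusive lock.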
---
 src/backend/commands/async.c | 45 +++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 40 insertions(+), 5 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 6e9c580ec6..ba0b1baecc 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -217,6 +217,7 @@ typedef struct QueueBackendStatus
 {
 	int32		pid;			/* either a PID or InvalidPid */
 	Oid			dboid;			/* backend's database OID, or InvalidOid */
+	int			nextListener;	/* backendId of next listener, 0=last */
 	QueuePosition pos;			/* backend has read queue up to here */
 } QueueBackendStatus;
 
@@ -247,6 +248,7 @@ typedef struct AsyncQueueControl
 	QueuePosition tail;			/* the global tail is equivalent to the pos of
 								 * the "slowest" backend */
 	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
+	int			firstListener;	/* backendId of first listener, 0=none */
 	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
 	/* backend[0] is not used; used entries are from [1] to [MaxBackends] */
 } AsyncQueueControl;
@@ -257,8 +259,11 @@ static AsyncQueueControl *asyncQueueControl;
 #define QUEUE_TAIL					(asyncQueueControl->tail)
 #define QUEUE_BACKEND_PID(i)		(asyncQueueControl->backend[i].pid)
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
+#define QUEUE_LISTENER_NEXT(i)		(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
 
+#define QUEUE_FIRST_LISTENER		(asyncQueueControl->firstListener)
+
 /*
  * The SLRU buffer area through which we access the notification queue
  */
@@ -465,11 +470,13 @@ AsyncShmemInit(void)
 		SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
 		SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
 		asyncQueueControl->lastQueueFillWarn = 0;
+		QUEUE_FIRST_LISTENER = 0;
 		/* zero'th entry won't be used, but let's initialize it anyway */
 		for (i = 0; i <= MaxBackends; i++)
 		{
 			QUEUE_BACKEND_PID(i) = InvalidPid;
 			QUEUE_BACKEND_DBOID(i) = InvalidOid;
+			QUEUE_LISTENER_NEXT(i) = InvalidBackendId;
 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
 		}
 	}
@@ -908,6 +915,7 @@ Exec_ListenPreCommit(void)
 	QueuePosition head;
 	QueuePosition max;
 	int			i;
+	int			prevListener;
 
 	/*
 	 * Nothing to do if we are already listening to something, nor if we
@@ -953,10 +961,14 @@ Exec_ListenPreCommit(void)
 	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
 	head = QUEUE_HEAD;
 	max = QUEUE_TAIL;
+	prevListener = 0;
 	if (QUEUE_POS_PAGE(max) != QUEUE_POS_PAGE(head))
 	{
-		for (i = 1; i <= MaxBackends; i++)
+		for (i = QUEUE_FIRST_LISTENER; i; i = QUEUE_LISTENER_NEXT(i))
 		{
+			/* Track the last listener with a backend ID below ours */
+			if (i < MyBackendId)
+				prevListener = i;
 			if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
 				max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
 		}
@@ -964,6 +976,15 @@ Exec_ListenPreCommit(void)
 	QUEUE_BACKEND_POS(MyBackendId) = max;
 	QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
 	QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
+
+	/* Insert backend into list of listeners */
+	if (prevListener == 0) {
+		QUEUE_LISTENER_NEXT(MyBackendId) = QUEUE_FIRST_LISTENER;
+		QUEUE_FIRST_LISTENER = MyBackendId;
+	} else {
+		QUEUE_LISTENER_NEXT(MyBackendId) = QUEUE_LISTENER_NEXT(prevListener);
+		QUEUE_LISTENER_NEXT(prevListener) = MyBackendId;
+	}
 	LWLockRelease(AsyncQueueLock);
 
 	/* Now we are listed in the global array, so remember we're listening */
@@ -1170,19 +1191,33 @@ static void
 asyncQueueUnregister(void)
 {
 	bool		advanceTail;
+	int		i;
 
 	Assert(listenChannels == NIL);	/* else caller error */
 
 	if (!amRegisteredListener)	/* nothing to do */
 		return;
 
-	LWLockAcquire(AsyncQueueLock, LW_SHARED);
+	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
 	/* check if entry is valid and oldest ... */
 	advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) &&
 		QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
 	/* ... then mark it invalid */
 	QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
 	QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
+	/* and remove it from the list */
+	if (QUEUE_FIRST_LISTENER == MyBackendId)
+		QUEUE_FIRST_LISTENER = QUEUE_LISTENER_NEXT(MyBackendId);
+	else {
+		for (i = QUEUE_FIRST_LISTENER; i; i = QUEUE_LISTENER_NEXT(i))
+		{
+			if (QUEUE_LISTENER_NEXT(i) == MyBackendId) {
+				QUEUE_LISTENER_NEXT(i) = QUEUE_LISTENER_NEXT(QUEUE_LISTENER_NEXT(i));
+				break;
+			}
+		}
+	}
+	QUEUE_LISTENER_NEXT(MyBackendId) = InvalidBackendId;
 	LWLockRelease(AsyncQueueLock);
 
 	/* mark ourselves as no longer listed in the global array */
@@ -1459,7 +1494,7 @@ asyncQueueFillWarning(void)
 		int32		minPid = InvalidPid;
 		int			i;
 
-		for (i = 1; i <= MaxBackends; i++)
+		for (i = QUEUE_FIRST_LISTENER; i; i = QUEUE_LISTENER_NEXT(i))
 		{
 			if (QUEUE_BACKEND_PID(i) != InvalidPid)
 			{
@@ -1519,7 +1554,7 @@ SignalBackends(void)
 	count = 0;
 
 	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
-	for (i = 1; i <= MaxBackends; i++)
+	for (i = QUEUE_FIRST_LISTENER; i; i = QUEUE_LISTENER_NEXT(i))
 	{
 		pid = QUEUE_BACKEND_PID(i);
 		if (pid != InvalidPid && pid != MyProcPid)
@@ -1995,7 +2030,7 @@ asyncQueueAdvanceTail(void)
 
 	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
 	min = QUEUE_HEAD;
-	for (i = 1; i <= MaxBackends; i++)
+	for (i = QUEUE_FIRST_LISTENER; i; i = QUEUE_LISTENER_NEXT(i))
 	{
 		if (QUEUE_BACKEND_PID(i) != InvalidPid)
 			min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
-- 
2.11.0

