From 7d68b9bad4e172fc5d56df8630af2c22986b4f81 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Wed, 26 Oct 2022 17:36:34 +1300
Subject: [PATCH 2/8] Use SetLatches() for condition variables.

Drain condition variable wait lists in larger batches, not one at a
time, while broadcasting.  This is an idea from Yura Sokolov, which I've
now combined with SetLatches() for slightly more efficiency.

Since we're now performing loops, change the internal spinlock to an
lwlock.

XXX There is probably a better data structure/arrangement that would
allow us to hold the lock for a shorter time.  This patch is not very
satisfying yet.

Discussion: https://postgr.es/m/1edbb61981fe1d99c3f20e3d56d6c88999f4227c.camel%40postgrespro.ru
---
 src/backend/storage/lmgr/condition_variable.c | 97 +++++++++----------
 src/backend/storage/lmgr/lwlock.c             |  2 +
 src/include/storage/condition_variable.h      |  4 +-
 src/include/storage/lwlock.h                  |  1 +
 4 files changed, 49 insertions(+), 55 deletions(-)

diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index de65dac3ae..4b9749ecdd 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -36,7 +36,7 @@ static ConditionVariable *cv_sleep_target = NULL;
 void
 ConditionVariableInit(ConditionVariable *cv)
 {
-	SpinLockInit(&cv->mutex);
+	LWLockInitialize(&cv->mutex, LWTRANCHE_CONDITION_VARIABLE);
 	proclist_init(&cv->wakeup);
 }
 
@@ -74,9 +74,9 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
 	cv_sleep_target = cv;
 
 	/* Add myself to the wait queue. */
-	SpinLockAcquire(&cv->mutex);
+	LWLockAcquire(&cv->mutex, LW_EXCLUSIVE);
 	proclist_push_tail(&cv->wakeup, pgprocno, cvWaitLink);
-	SpinLockRelease(&cv->mutex);
+	LWLockRelease(&cv->mutex);
 }
 
 /*
@@ -180,13 +180,13 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
 		 * by something other than ConditionVariableSignal; though we don't
 		 * guarantee not to return spuriously, we'll avoid this obvious case.
 		 */
-		SpinLockAcquire(&cv->mutex);
+		LWLockAcquire(&cv->mutex, LW_EXCLUSIVE);
 		if (!proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
 		{
 			done = true;
 			proclist_push_tail(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
 		}
-		SpinLockRelease(&cv->mutex);
+		LWLockRelease(&cv->mutex);
 
 		/*
 		 * Check for interrupts, and return spuriously if that caused the
@@ -233,12 +233,12 @@ ConditionVariableCancelSleep(void)
 	if (cv == NULL)
 		return;
 
-	SpinLockAcquire(&cv->mutex);
+	LWLockAcquire(&cv->mutex, LW_EXCLUSIVE);
 	if (proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
 		proclist_delete(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
 	else
 		signaled = true;
-	SpinLockRelease(&cv->mutex);
+	LWLockRelease(&cv->mutex);
 
 	/*
 	 * If we've received a signal, pass it on to another waiting process, if
@@ -265,10 +265,10 @@ ConditionVariableSignal(ConditionVariable *cv)
 	PGPROC	   *proc = NULL;
 
 	/* Remove the first process from the wakeup queue (if any). */
-	SpinLockAcquire(&cv->mutex);
+	LWLockAcquire(&cv->mutex, LW_EXCLUSIVE);
 	if (!proclist_is_empty(&cv->wakeup))
 		proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
-	SpinLockRelease(&cv->mutex);
+	LWLockRelease(&cv->mutex);
 
 	/* If we found someone sleeping, set their latch to wake them up. */
 	if (proc != NULL)
@@ -287,6 +287,7 @@ ConditionVariableBroadcast(ConditionVariable *cv)
 {
 	int			pgprocno = MyProc->pgprocno;
 	PGPROC	   *proc = NULL;
+	bool		inserted_sentinel = false;
 	bool		have_sentinel = false;
 
 	/*
@@ -313,52 +314,42 @@ ConditionVariableBroadcast(ConditionVariable *cv)
 	if (cv_sleep_target != NULL)
 		ConditionVariableCancelSleep();
 
-	/*
-	 * Inspect the state of the queue.  If it's empty, we have nothing to do.
-	 * If there's exactly one entry, we need only remove and signal that
-	 * entry.  Otherwise, remove the first entry and insert our sentinel.
-	 */
-	SpinLockAcquire(&cv->mutex);
-	/* While we're here, let's assert we're not in the list. */
-	Assert(!proclist_contains(&cv->wakeup, pgprocno, cvWaitLink));
-
-	if (!proclist_is_empty(&cv->wakeup))
+	do
 	{
-		proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
-		if (!proclist_is_empty(&cv->wakeup))
-		{
-			proclist_push_tail(&cv->wakeup, pgprocno, cvWaitLink);
-			have_sentinel = true;
-		}
-	}
-	SpinLockRelease(&cv->mutex);
-
-	/* Awaken first waiter, if there was one. */
-	if (proc != NULL)
-		SetLatch(&proc->procLatch);
+		int max_batch_size = Min(LATCH_BATCH_SIZE, 8);	/* XXX */
+		LatchBatch batch = {0};
 
-	while (have_sentinel)
-	{
-		/*
-		 * Each time through the loop, remove the first wakeup list entry, and
-		 * signal it unless it's our sentinel.  Repeat as long as the sentinel
-		 * remains in the list.
-		 *
-		 * Notice that if someone else removes our sentinel, we will waken one
-		 * additional process before exiting.  That's intentional, because if
-		 * someone else signals the CV, they may be intending to waken some
-		 * third process that added itself to the list after we added the
-		 * sentinel.  Better to give a spurious wakeup (which should be
-		 * harmless beyond wasting some cycles) than to lose a wakeup.
-		 */
-		proc = NULL;
-		SpinLockAcquire(&cv->mutex);
-		if (!proclist_is_empty(&cv->wakeup))
+		LWLockAcquire(&cv->mutex, LW_EXCLUSIVE);
+		while (!proclist_is_empty(&cv->wakeup))
+		{
+			if (batch.size == max_batch_size)
+			{
+				if (!inserted_sentinel)
+				{
+					/* First batch of many.  Add sentinel. */
+					proclist_push_tail(&cv->wakeup, pgprocno, cvWaitLink);
+					inserted_sentinel = true;
+					have_sentinel = true;
+				}
+				break;
+			}
 			proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
-		have_sentinel = proclist_contains(&cv->wakeup, pgprocno, cvWaitLink);
-		SpinLockRelease(&cv->mutex);
+			if (proc == MyProc)
+			{
+				/* We hit our sentinel.  We're done. */
+				have_sentinel = false;
+				break;
+			}
+			else if (!proclist_contains(&cv->wakeup, pgprocno, cvWaitLink))
+			{
+				/* Someone else hit our sentinel.  We're done. */
+				have_sentinel = false;
+			}
+			AddLatch(&batch, &proc->procLatch);
+		}
+		LWLockRelease(&cv->mutex);
 
-		if (proc != NULL && proc != MyProc)
-			SetLatch(&proc->procLatch);
-	}
+		/* Awaken this batch of waiters, if there were some. */
+		SetLatches(&batch);
+	} while (have_sentinel);
 }
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index d274c9b1dc..03186da0a3 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -183,6 +183,8 @@ static const char *const BuiltinTrancheNames[] = {
 	"PgStatsHash",
 	/* LWTRANCHE_PGSTATS_DATA: */
 	"PgStatsData",
+	/* LWTRANCHE_CONDITION_VARIABLE: */
+	"ConditionVariable",
 };
 
 StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
index e89175ebd5..82b8eeabf5 100644
--- a/src/include/storage/condition_variable.h
+++ b/src/include/storage/condition_variable.h
@@ -22,12 +22,12 @@
 #ifndef CONDITION_VARIABLE_H
 #define CONDITION_VARIABLE_H
 
+#include "storage/lwlock.h"
 #include "storage/proclist_types.h"
-#include "storage/spin.h"
 
 typedef struct
 {
-	slock_t		mutex;			/* spinlock protecting the wakeup list */
+	LWLock		mutex;			/* lock protecting the wakeup list */
 	proclist_head wakeup;		/* list of wake-able processes */
 } ConditionVariable;
 
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index ca4eca76f4..dcbffe11a2 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -193,6 +193,7 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_PGSTATS_DSA,
 	LWTRANCHE_PGSTATS_HASH,
 	LWTRANCHE_PGSTATS_DATA,
+	LWTRANCHE_CONDITION_VARIABLE,
 	LWTRANCHE_FIRST_USER_DEFINED
 }			BuiltinTrancheIds;
 
-- 
2.35.1

