From 90e9600286de19e2bb8c458dbb2632438542ed58 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Sat, 24 Aug 2024 00:42:08 +0300
Subject: [PATCH 06/11] Use proc number for wakeups in waitlsn.c

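Store the waiting backend's ProcNumber in WaitLSNProcInfo instead of a
pointer to its latch, and look the latch up with GetPGProcByNumber()
when waking the waiter.

Also rename WaitLSNSetLatches() to WaitLSNWakeup(), to emphasize what
it does, rather than how it does it.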
---
 src/backend/access/transam/xlog.c         |  2 +-
 src/backend/access/transam/xlogrecovery.c |  2 +-
 src/backend/commands/waitlsn.c            | 18 ++++++++++--------
 src/include/commands/waitlsn.h            |  9 +++------
 4 files changed, 15 insertions(+), 16 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 63a8ab5a29..fdc0906314 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6155,7 +6155,7 @@ StartupXLOG(void)
 	 * Wake up all waiters for replay LSN.  They need to report an error that
 	 * recovery was ended before achieving the target LSN.
 	 */
-	WaitLSNSetLatches(InvalidXLogRecPtr);
+	WaitLSNWakeup(InvalidXLogRecPtr);
 
 	/*
 	 * Shutdown the recovery environment.  This must occur after
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index ad817fbca6..35a5e31e3d 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1837,7 +1837,7 @@ PerformWalRecovery(void)
 			if (waitLSNState &&
 				(XLogRecoveryCtl->lastReplayedEndRecPtr >=
 				 pg_atomic_read_u64(&waitLSNState->minWaitedLSN)))
-				WaitLSNSetLatches(XLogRecoveryCtl->lastReplayedEndRecPtr);
+				WaitLSNWakeup(XLogRecoveryCtl->lastReplayedEndRecPtr);
 
 			/* Else, try to fetch the next WAL record */
 			record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c
index d9cf9e7d75..1937bafafc 100644
--- a/src/backend/commands/waitlsn.c
+++ b/src/backend/commands/waitlsn.c
@@ -113,7 +113,7 @@ addLSNWaiter(XLogRecPtr lsn)
 
 	Assert(!procInfo->inHeap);
 
-	procInfo->latch = MyLatch;
+	procInfo->procno = MyProcNumber;
 	procInfo->waitLSN = lsn;
 
 	pairingheap_add(&waitLSNState->waitersHeap, &procInfo->phNode);
@@ -152,19 +152,21 @@ deleteLSNWaiter(void)
  * and set latches for all waiters.
  */
 void
-WaitLSNSetLatches(XLogRecPtr currentLSN)
+WaitLSNWakeup(XLogRecPtr currentLSN)
 {
 	int			i;
-	Latch	  **wakeUpProcLatches;
+	ProcNumber *wakeUpProcs;
 	int			numWakeUpProcs = 0;
 
-	wakeUpProcLatches = palloc(sizeof(Latch *) * MaxBackends);
+	wakeUpProcs = palloc(sizeof(ProcNumber) * MaxBackends);
 
 	LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
 
 	/*
 	 * Iterate the pairing heap of waiting processes till we find LSN not yet
-	 * replayed.  Record the process latches to set them later.
+	 * replayed.  Record the process numbers to interrupt, but to avoid
+	 * holding the lock for too long, send the interrupts to them only after
+	 * releasing the lock.
 	 */
 	while (!pairingheap_is_empty(&waitLSNState->waitersHeap))
 	{
@@ -175,7 +177,7 @@ WaitLSNSetLatches(XLogRecPtr currentLSN)
 			procInfo->waitLSN > currentLSN)
 			break;
 
-		wakeUpProcLatches[numWakeUpProcs++] = procInfo->latch;
+		wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
 		(void) pairingheap_remove_first(&waitLSNState->waitersHeap);
 		procInfo->inHeap = false;
 	}
@@ -192,9 +194,9 @@ WaitLSNSetLatches(XLogRecPtr currentLSN)
 	 */
 	for (i = 0; i < numWakeUpProcs; i++)
 	{
-		SetLatch(wakeUpProcLatches[i]);
+		SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch);
 	}
-	pfree(wakeUpProcLatches);
+	pfree(wakeUpProcs);
 }
 
 /*
diff --git a/src/include/commands/waitlsn.h b/src/include/commands/waitlsn.h
index f719feadb0..339ba5d460 100644
--- a/src/include/commands/waitlsn.h
+++ b/src/include/commands/waitlsn.h
@@ -29,11 +29,8 @@ typedef struct WaitLSNProcInfo
 	/* LSN, which this process is waiting for */
 	XLogRecPtr	waitLSN;
 
-	/*
-	 * A pointer to the latch, which should be set once the waitLSN is
-	 * replayed.
-	 */
-	Latch	   *latch;
+	/* Process to be woken up once the waitLSN is replayed */
+	ProcNumber procno;
 
 	/* A pairing heap node for participation in waitLSNState->waitersHeap */
 	pairingheap_node phNode;
@@ -74,7 +71,7 @@ extern PGDLLIMPORT WaitLSNState *waitLSNState;
 
 extern Size WaitLSNShmemSize(void);
 extern void WaitLSNShmemInit(void);
-extern void WaitLSNSetLatches(XLogRecPtr currentLSN);
+extern void WaitLSNWakeup(XLogRecPtr currentLSN);
 extern void WaitLSNCleanup(void);
 
 #endif							/* WAIT_LSN_H */
-- 
2.39.2

