From 1dca369e595f1259736a015efcf13b1041d7780d Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Wed, 26 Oct 2022 16:43:31 +1300
Subject: [PATCH 3/8] Use SetLatches() for heavyweight locks.

Collect wakeups into a LatchBatch so that the latches can all be set
together after the LockManager's internal partition lock has been
released.  This avoids issuing system calls while holding busy locks.

Currently, a waiter tries to acquire that partition lock itself as soon
as it is woken, so deferring the wakeups until the lock has been
released avoids contention (though a later patch may remove that lock
acquisition).
---
 src/backend/storage/lmgr/deadlock.c |  4 ++--
 src/backend/storage/lmgr/lock.c     | 36 +++++++++++++++++++----------
 src/backend/storage/lmgr/proc.c     | 29 ++++++++++++++---------
 src/include/storage/lock.h          |  5 ++--
 src/include/storage/proc.h          |  5 ++--
 5 files changed, 49 insertions(+), 30 deletions(-)

diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c
index cd9c0418ec..1b7454a8fe 100644
--- a/src/backend/storage/lmgr/deadlock.c
+++ b/src/backend/storage/lmgr/deadlock.c
@@ -214,7 +214,7 @@ InitDeadLockChecking(void)
  * and (b) we are typically invoked inside a signal handler.
  */
 DeadLockState
-DeadLockCheck(PGPROC *proc)
+DeadLockCheck(PGPROC *proc, LatchBatch *wakeups)
 {
 	int			i,
 				j;
@@ -272,7 +272,7 @@ DeadLockCheck(PGPROC *proc)
 #endif
 
 		/* See if any waiters for the lock can be woken up now */
-		ProcLockWakeup(GetLocksMethodTable(lock), lock);
+		ProcLockWakeup(GetLocksMethodTable(lock), lock, wakeups);
 	}
 
 	/* Return code tells caller if we had to escape a deadlock or not */
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 3d1049cf75..f59ae20069 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -374,14 +374,14 @@ static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
 static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
 static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode);
 static void FinishStrongLockAcquire(void);
-static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
+static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner, LatchBatch *wakeups);
 static void ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock);
 static void LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent);
 static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
 						PROCLOCK *proclock, LockMethod lockMethodTable);
 static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
 						LockMethod lockMethodTable, uint32 hashcode,
-						bool wakeupNeeded);
+						bool wakeupNeeded, LatchBatch *wakeups);
 static void LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
 								 LOCKTAG *locktag, LOCKMODE lockmode,
 								 bool decrement_strong_lock_count);
@@ -787,6 +787,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
 	LWLock	   *partitionLock;
 	bool		found_conflict;
 	bool		log_lock = false;
+	LatchBatch	wakeups = {0};
 
 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
@@ -1098,7 +1099,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
 										 locktag->locktag_type,
 										 lockmode);
 
-		WaitOnLock(locallock, owner);
+		WaitOnLock(locallock, owner, &wakeups);
 
 		TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1,
 										locktag->locktag_field2,
@@ -1138,6 +1139,8 @@ LockAcquireExtended(const LOCKTAG *locktag,
 
 	LWLockRelease(partitionLock);
 
+	SetLatches(&wakeups);
+
 	/*
 	 * Emit a WAL record if acquisition of this lock needs to be replayed in a
 	 * standby server.
@@ -1634,7 +1637,7 @@ UnGrantLock(LOCK *lock, LOCKMODE lockmode,
 static void
 CleanUpLock(LOCK *lock, PROCLOCK *proclock,
 			LockMethod lockMethodTable, uint32 hashcode,
-			bool wakeupNeeded)
+			bool wakeupNeeded, LatchBatch *wakeups)
 {
 	/*
 	 * If this was my last hold on this lock, delete my entry in the proclock
@@ -1674,7 +1677,7 @@ CleanUpLock(LOCK *lock, PROCLOCK *proclock,
 	else if (wakeupNeeded)
 	{
 		/* There are waiters on this lock, so wake them up. */
-		ProcLockWakeup(lockMethodTable, lock);
+		ProcLockWakeup(lockMethodTable, lock, wakeups);
 	}
 }
 
@@ -1811,7 +1814,7 @@ MarkLockClear(LOCALLOCK *locallock)
  * The appropriate partition lock must be held at entry.
  */
 static void
-WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
+WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner, LatchBatch *wakeups)
 {
 	LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
 	LockMethod	lockMethodTable = LockMethods[lockmethodid];
@@ -1856,7 +1859,7 @@ WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
 	 */
 	PG_TRY();
 	{
-		if (ProcSleep(locallock, lockMethodTable) != PROC_WAIT_STATUS_OK)
+		if (ProcSleep(locallock, lockMethodTable, wakeups) != PROC_WAIT_STATUS_OK)
 		{
 			/*
 			 * We failed as a result of a deadlock, see CheckDeadLock(). Quit
@@ -1915,7 +1918,7 @@ WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
  * NB: this does not clean up any locallock object that may exist for the lock.
  */
 void
-RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode)
+RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode, LatchBatch *wakeups)
 {
 	LOCK	   *waitLock = proc->waitLock;
 	PROCLOCK   *proclock = proc->waitProcLock;
@@ -1957,7 +1960,7 @@ RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode)
 	 */
 	CleanUpLock(waitLock, proclock,
 				LockMethods[lockmethodid], hashcode,
-				true);
+				true, wakeups);
 }
 
 /*
@@ -1982,6 +1985,7 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
 	PROCLOCK   *proclock;
 	LWLock	   *partitionLock;
 	bool		wakeupNeeded;
+	LatchBatch	wakeups = {0};
 
 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
@@ -2160,10 +2164,12 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
 
 	CleanUpLock(lock, proclock,
 				lockMethodTable, locallock->hashcode,
-				wakeupNeeded);
+				wakeupNeeded, &wakeups);
 
 	LWLockRelease(partitionLock);
 
+	SetLatches(&wakeups);
+
 	RemoveLocalLock(locallock);
 	return true;
 }
@@ -2188,6 +2194,7 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
 	PROCLOCK   *proclock;
 	int			partition;
 	bool		have_fast_path_lwlock = false;
+	LatchBatch	wakeups = {0};
 
 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
@@ -2434,10 +2441,12 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
 			CleanUpLock(lock, proclock,
 						lockMethodTable,
 						LockTagHashCode(&lock->tag),
-						wakeupNeeded);
+						wakeupNeeded, &wakeups);
 		}						/* loop over PROCLOCKs within this partition */
 
 		LWLockRelease(partitionLock);
+
+		SetLatches(&wakeups);
 	}							/* loop over partitions */
 
 #ifdef LOCK_DEBUG
@@ -3137,6 +3146,7 @@ LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
 	uint32		proclock_hashcode;
 	LWLock	   *partitionLock;
 	bool		wakeupNeeded;
+	LatchBatch	wakeups = {0};
 
 	hashcode = LockTagHashCode(locktag);
 	partitionLock = LockHashPartitionLock(hashcode);
@@ -3190,10 +3200,12 @@ LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
 
 	CleanUpLock(lock, proclock,
 				lockMethodTable, hashcode,
-				wakeupNeeded);
+				wakeupNeeded, &wakeups);
 
 	LWLockRelease(partitionLock);
 
+	SetLatches(&wakeups);
+
 	/*
 	 * Decrement strong lock count.  This logic is needed only for 2PC.
 	 */
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 13fa07b0ff..f0e19b74a7 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -700,6 +700,7 @@ LockErrorCleanup(void)
 {
 	LWLock	   *partitionLock;
 	DisableTimeoutParams timeouts[2];
+	LatchBatch	wakeups = {0};
 
 	HOLD_INTERRUPTS();
 
@@ -733,7 +734,7 @@ LockErrorCleanup(void)
 	if (MyProc->links.next != NULL)
 	{
 		/* We could not have been granted the lock yet */
-		RemoveFromWaitQueue(MyProc, lockAwaited->hashcode);
+		RemoveFromWaitQueue(MyProc, lockAwaited->hashcode, &wakeups);
 	}
 	else
 	{
@@ -750,6 +751,7 @@ LockErrorCleanup(void)
 	lockAwaited = NULL;
 
 	LWLockRelease(partitionLock);
+	SetLatches(&wakeups);
 
 	RESUME_INTERRUPTS();
 }
@@ -1042,7 +1044,7 @@ ProcQueueInit(PROC_QUEUE *queue)
  * NOTES: The process queue is now a priority queue for locking.
  */
 ProcWaitStatus
-ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
+ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable, LatchBatch *wakeups)
 {
 	LOCKMODE	lockmode = locallock->tag.mode;
 	LOCK	   *lock = locallock->lock;
@@ -1188,7 +1190,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
 	 */
 	if (early_deadlock)
 	{
-		RemoveFromWaitQueue(MyProc, hashcode);
+		RemoveFromWaitQueue(MyProc, hashcode, wakeups);
 		return PROC_WAIT_STATUS_ERROR;
 	}
 
@@ -1204,6 +1206,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
 	 * LockErrorCleanup will clean up if cancel/die happens.
 	 */
 	LWLockRelease(partitionLock);
+	SetLatches(wakeups);
 
 	/*
 	 * Also, now that we will successfully clean up after an ereport, it's
@@ -1662,8 +1665,8 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
  * to twiddle the lock's request counts too --- see RemoveFromWaitQueue.
  * Hence, in practice the waitStatus parameter must be PROC_WAIT_STATUS_OK.
  */
-PGPROC *
-ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus)
+static PGPROC *
+ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus, LatchBatch *wakeups)
 {
 	PGPROC	   *retProc;
 
@@ -1686,8 +1689,8 @@ ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus)
 	proc->waitStatus = waitStatus;
 	pg_atomic_write_u64(&MyProc->waitStart, 0);
 
-	/* And awaken it */
-	SetLatch(&proc->procLatch);
+	/* Schedule it to be awoken */
+	AddLatch(wakeups, &proc->procLatch);
 
 	return retProc;
 }
@@ -1700,7 +1703,7 @@ ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus)
  * The appropriate lock partition lock must be held by caller.
  */
 void
-ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock)
+ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock, LatchBatch *wakeups)
 {
 	PROC_QUEUE *waitQueue = &(lock->waitProcs);
 	int			queue_size = waitQueue->size;
@@ -1728,7 +1731,7 @@ ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock)
 		{
 			/* OK to waken */
 			GrantLock(lock, proc->waitProcLock, lockmode);
-			proc = ProcWakeup(proc, PROC_WAIT_STATUS_OK);
+			proc = ProcWakeup(proc, PROC_WAIT_STATUS_OK, wakeups);
 
 			/*
 			 * ProcWakeup removes proc from the lock's waiting process queue
@@ -1762,6 +1765,7 @@ static void
 CheckDeadLock(void)
 {
 	int			i;
+	LatchBatch	wakeups = {0};
 
 	/*
 	 * Acquire exclusive lock on the entire shared lock data structures. Must
@@ -1796,7 +1800,7 @@ CheckDeadLock(void)
 #endif
 
 	/* Run the deadlock check, and set deadlock_state for use by ProcSleep */
-	deadlock_state = DeadLockCheck(MyProc);
+	deadlock_state = DeadLockCheck(MyProc, &wakeups);
 
 	if (deadlock_state == DS_HARD_DEADLOCK)
 	{
@@ -1813,7 +1817,8 @@ CheckDeadLock(void)
 		 * return from the signal handler.
 		 */
 		Assert(MyProc->waitLock != NULL);
-		RemoveFromWaitQueue(MyProc, LockTagHashCode(&(MyProc->waitLock->tag)));
+		RemoveFromWaitQueue(MyProc, LockTagHashCode(&(MyProc->waitLock->tag)),
+							&wakeups);
 
 		/*
 		 * We're done here.  Transaction abort caused by the error that
@@ -1837,6 +1842,8 @@ CheckDeadLock(void)
 check_done:
 	for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
 		LWLockRelease(LockHashPartitionLockByIndex(i));
+
+	SetLatches(&wakeups);
 }
 
 /*
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index e4e1495b24..163bf41293 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -19,6 +19,7 @@
 #endif
 
 #include "storage/backendid.h"
+#include "storage/latch.h"
 #include "storage/lockdefs.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
@@ -575,7 +576,7 @@ extern bool LockCheckConflicts(LockMethod lockMethodTable,
 							   LOCK *lock, PROCLOCK *proclock);
 extern void GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode);
 extern void GrantAwaitedLock(void);
-extern void RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode);
+extern void RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode, LatchBatch *wakeups);
 extern Size LockShmemSize(void);
 extern LockData *GetLockStatusData(void);
 extern BlockedProcsData *GetBlockerStatusData(int blocked_pid);
@@ -592,7 +593,7 @@ extern void lock_twophase_postabort(TransactionId xid, uint16 info,
 extern void lock_twophase_standby_recover(TransactionId xid, uint16 info,
 										  void *recdata, uint32 len);
 
-extern DeadLockState DeadLockCheck(PGPROC *proc);
+extern DeadLockState DeadLockCheck(PGPROC *proc, LatchBatch *wakeups);
 extern PGPROC *GetBlockingAutoVacuumPgproc(void);
 extern void DeadLockReport(void) pg_attribute_noreturn();
 extern void RememberSimpleDeadLock(PGPROC *proc1,
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 8d096fdeeb..4dba46cfcb 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -449,9 +449,8 @@ extern bool HaveNFreeProcs(int n);
 extern void ProcReleaseLocks(bool isCommit);
 
 extern void ProcQueueInit(PROC_QUEUE *queue);
-extern ProcWaitStatus ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable);
-extern PGPROC *ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus);
-extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock);
+extern ProcWaitStatus ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable, LatchBatch *wakeups);
+extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock, LatchBatch *wakeups);
 extern void CheckDeadLockAlert(void);
 extern bool IsWaitingForLock(void);
 extern void LockErrorCleanup(void);
-- 
2.35.1

