From a1f0fd69a34d146294bd4398bd5a5712cdc002ce Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Tue, 5 Jan 2021 10:10:36 -0800
Subject: [PATCH v2.0 02/17] Allow lwlocks to be unowned

This is required for AIO so that the lock held during a write can be released
in another backend, which in turn is required to avoid the potential for
deadlocks.
---
 src/include/storage/lwlock.h      |  2 +
 src/backend/storage/lmgr/lwlock.c | 96 +++++++++++++++++++++----------
 2 files changed, 68 insertions(+), 30 deletions(-)

diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index d70e6d37e09..00e8022fbad 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -129,6 +129,8 @@ extern bool LWLockAcquireOrWait(LWLock *lock, LWLockMode mode);
 extern void LWLockRelease(LWLock *lock);
 extern void LWLockReleaseClearVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 val);
 extern void LWLockReleaseAll(void);
+extern LWLockMode LWLockReleaseOwnership(LWLock *lock);
+extern void LWLockReleaseUnowned(LWLock *lock, LWLockMode mode);
 extern bool LWLockHeldByMe(LWLock *lock);
 extern bool LWLockAnyHeldByMe(LWLock *lock, int nlocks, size_t stride);
 extern bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode);
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index e765754d805..f3d3435b1f5 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -1773,52 +1773,36 @@ LWLockUpdateVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 val)
 	}
 }
 
-
-/*
- * LWLockRelease - release a previously acquired lock
- */
-void
-LWLockRelease(LWLock *lock)
+static void
+LWLockReleaseInternal(LWLock *lock, LWLockMode mode)
 {
-	LWLockMode	mode;
 	uint32		oldstate;
 	bool		check_waiters;
-	int			i;
-
-	/*
-	 * Remove lock from list of locks held.  Usually, but not always, it will
-	 * be the latest-acquired lock; so search array backwards.
-	 */
-	for (i = num_held_lwlocks; --i >= 0;)
-		if (lock == held_lwlocks[i].lock)
-			break;
-
-	if (i < 0)
-		elog(ERROR, "lock %s is not held", T_NAME(lock));
-
-	mode = held_lwlocks[i].mode;
-
-	num_held_lwlocks--;
-	for (; i < num_held_lwlocks; i++)
-		held_lwlocks[i] = held_lwlocks[i + 1];
-
-	PRINT_LWDEBUG("LWLockRelease", lock, mode);
 
 	/*
 	 * Release my hold on lock, after that it can immediately be acquired by
 	 * others, even if we still have to wakeup other waiters.
 	 */
 	if (mode == LW_EXCLUSIVE)
-		oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
+		oldstate = pg_atomic_fetch_sub_u32(&lock->state, LW_VAL_EXCLUSIVE);
 	else
-		oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);
+		oldstate = pg_atomic_fetch_sub_u32(&lock->state, LW_VAL_SHARED);
 
 	/* nobody else can have that kind of lock */
-	Assert(!(oldstate & LW_VAL_EXCLUSIVE));
+	if (mode == LW_EXCLUSIVE)
+		Assert((oldstate & LW_LOCK_MASK) == LW_VAL_EXCLUSIVE);
+	else
+		Assert((oldstate & LW_LOCK_MASK) < LW_VAL_EXCLUSIVE &&
+			   (oldstate & LW_LOCK_MASK) >= LW_VAL_SHARED);
 
 	if (TRACE_POSTGRESQL_LWLOCK_RELEASE_ENABLED())
 		TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock));
 
+	if (mode == LW_EXCLUSIVE)
+		oldstate -= LW_VAL_EXCLUSIVE;
+	else
+		oldstate -= LW_VAL_SHARED;
+
 	/*
 	 * We're still waiting for backends to get scheduled, don't wake them up
 	 * again.
@@ -1841,6 +1825,68 @@ LWLockRelease(LWLock *lock)
 		LWLockWakeup(lock);
 	}
 
+}
+
+/*
+ * LWLockReleaseUnowned - release a lock that this backend does not track
+ *
+ * Releases lock in the given mode.  This function itself does not look at
+ * held_lwlocks; the caller supplies the mode, e.g. the value returned by
+ * LWLockReleaseOwnership().
+ */
+void
+LWLockReleaseUnowned(LWLock *lock, LWLockMode mode)
+{
+	LWLockReleaseInternal(lock, mode);
+}
+
+/*
+ * LWLockReleaseOwnership - stop tracking lock as held by this backend
+ *
+ * Removes lock from held_lwlocks and returns the mode it was held in.  The
+ * lock itself is not released here.  Errors out if lock is not in the list.
+ *
+ * XXX: this doesn't do a RESUME_INTERRUPTS(), responsibility of the caller.
+ */
+LWLockMode
+LWLockReleaseOwnership(LWLock *lock)
+{
+	LWLockMode	mode;
+	int			i;
+
+	/*
+	 * Remove lock from list of locks held.  Usually, but not always, it will
+	 * be the latest-acquired lock; so search array backwards.
+	 */
+	for (i = num_held_lwlocks; --i >= 0;)
+		if (lock == held_lwlocks[i].lock)
+			break;
+
+	if (i < 0)
+		elog(ERROR, "lock %s is not held", T_NAME(lock));
+
+	mode = held_lwlocks[i].mode;
+
+	num_held_lwlocks--;
+	for (; i < num_held_lwlocks; i++)
+		held_lwlocks[i] = held_lwlocks[i + 1];
+
+	return mode;
+}
+
+/*
+ * LWLockRelease - release a previously acquired lock
+ */
+void
+LWLockRelease(LWLock *lock)
+{
+	LWLockMode	mode;
+
+	mode = LWLockReleaseOwnership(lock);
+
+	PRINT_LWDEBUG("LWLockRelease", lock, mode);
+
+	LWLockReleaseInternal(lock, mode);
+
 	/*
 	 * Now okay to allow cancel/die interrupts.
 	 */
-- 
2.45.2.827.g557ae147e6

