From ff815ec0ea2ef1b3b5e30dc44e6f952353ec4b91 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sat, 15 Mar 2025 12:29:57 -0400
Subject: [PATCH v2.9 13/30] localbuf: Track pincount in BufferDesc as well

For AIO on temporary table buffers, the AIO subsystem needs to be able to
ensure a pin on a buffer while AIO is going on, even if the IO-issuing query
errors out. Tracking such a pin in LocalRefCount does not work, as it would
cause CheckForLocalBufferLeaks() to assert out.

Instead, track the refcount in BufferDesc.state as well, not just in
LocalRefCount. This also makes local buffers behave a bit more like shared
buffers.

Note that we still don't need locking: AIO completion callbacks for local
buffers are executed in the issuing session (i.e. nobody else has access to
the BufferDesc).

Discussion: https://postgr.es/m/uvrtrknj4kdytuboidbhwclo4gxhswwcpgadptsjvjqcluzmah%40brqs62irg4dt
---
 src/backend/storage/buffer/bufmgr.c   | 22 +++++++++++++++++++++
 src/backend/storage/buffer/localbuf.c | 28 +++++++++++++++++++++++++--
 2 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 95911b51d58..a9643930974 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -5393,6 +5393,18 @@ ConditionalLockBufferForCleanup(Buffer buffer)
 		Assert(refcount > 0);
 		if (refcount != 1)
 			return false;
+
+		/*
+		 * Check that the AIO subsystem doesn't have a pin. Likely not
+		 * possible today, but better safe than sorry.
+		 */
+		bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+		buf_state = pg_atomic_read_u32(&bufHdr->state);
+		refcount = BUF_STATE_GET_REFCOUNT(buf_state);
+		Assert(refcount > 0);
+		if (refcount != 1)
+			return false;
+
 		/* Nobody else to wait for */
 		return true;
 	}
@@ -5446,6 +5458,16 @@ IsBufferCleanupOK(Buffer buffer)
 		/* There should be exactly one pin */
 		if (LocalRefCount[-buffer - 1] != 1)
 			return false;
+
+		/*
+		 * Check that the AIO subsystem doesn't have a pin. Likely not
+		 * possible today, but better safe than sorry.
+		 */
+		bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+		buf_state = pg_atomic_read_u32(&bufHdr->state);
+		if (BUF_STATE_GET_REFCOUNT(buf_state) != 1)
+			return false;
+
 		/* Nobody else to wait for */
 		return true;
 	}
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index f172a5c7820..3a722321533 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -249,6 +249,13 @@ GetLocalVictimBuffer(void)
 				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
 				trycounter = NLocBuffer;
 			}
+			else if (BUF_STATE_GET_REFCOUNT(buf_state) > 0)
+			{
+				/*
+				 * This can be reached if the backend initiated AIO for this
+				 * buffer and then errored out.
+				 */
+			}
 			else
 			{
 				/* Found a usable buffer */
@@ -570,7 +577,13 @@ InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
 
 	buf_state = pg_atomic_read_u32(&bufHdr->state);
 
-	if (check_unreferenced && LocalRefCount[bufid] != 0)
+	/*
+	 * We need to test not just LocalRefCount[bufid] but also the BufferDesc
+	 * itself, as the latter is used to represent a pin by the AIO subsystem.
+	 * This can happen if AIO is initiated and then the query errors out.
+	 */
+	if (check_unreferenced &&
+		(LocalRefCount[bufid] != 0 || BUF_STATE_GET_REFCOUNT(buf_state) != 0))
 		elog(ERROR, "block %u of %s is still referenced (local %u)",
 			 bufHdr->tag.blockNum,
 			 relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
@@ -744,12 +757,13 @@ PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
 	if (LocalRefCount[bufid] == 0)
 	{
 		NLocalPinnedBuffers++;
+		buf_state += BUF_REFCOUNT_ONE;
 		if (adjust_usagecount &&
 			BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
 		{
 			buf_state += BUF_USAGECOUNT_ONE;
-			pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
 		}
+		pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
 	}
 	LocalRefCount[bufid]++;
 	ResourceOwnerRememberBuffer(CurrentResourceOwner,
@@ -775,7 +789,17 @@ UnpinLocalBufferNoOwner(Buffer buffer)
 	Assert(NLocalPinnedBuffers > 0);
 
 	if (--LocalRefCount[buffid] == 0)
+	{
+		BufferDesc *buf_hdr = GetLocalBufferDescriptor(buffid);
+		uint32		buf_state;
+
 		NLocalPinnedBuffers--;
+
+		buf_state = pg_atomic_read_u32(&buf_hdr->state);
+		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+		buf_state -= BUF_REFCOUNT_ONE;
+		pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
+	}
 }
 
 /*
-- 
2.48.1.76.g4e746b1a31.dirty

