From 9c9745754dc88502e050b5822d90d20b517a052b Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Wed, 22 Jan 2025 13:44:44 -0500
Subject: [PATCH v2.3 20/30] WIP: localbuf: Track pincount in BufferDesc as
 well

For AIO on temp tables the AIO subsystem needs to be able to ensure a pin on a
buffer while AIO is going on, even if the IO issuing query errors out. To do
so, track the refcount in BufferDesc.state, not just LocalRefCount.

Note that we still don't need locking, as AIO completion callbacks for local
buffers are executed in the issuing session (nobody else has access to the
BufferDesc).
---
 src/backend/storage/buffer/bufmgr.c   |  40 ++++++++--
 src/backend/storage/buffer/localbuf.c | 108 ++++++++++++++++----------
 2 files changed, 101 insertions(+), 47 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 169829e8031..fe871691350 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -5356,8 +5356,20 @@ ConditionalLockBufferForCleanup(Buffer buffer)
 		Assert(refcount > 0);
 		if (refcount != 1)
 			return false;
-		/* Nobody else to wait for */
-		return true;
+
+		bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+		buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+		/*
+		 * Check that the AIO subsystem doesn't have a pin. Likely not
+		 * possible today, but better safe than sorry.
+		 */
+		refcount = BUF_STATE_GET_REFCOUNT(buf_state);
+		Assert(refcount > 0);
+		if (refcount == 1)
+			return true;
+
+		return false;
 	}
 
 	/* There should be exactly one local pin */
@@ -5409,8 +5421,18 @@ IsBufferCleanupOK(Buffer buffer)
 		/* There should be exactly one pin */
 		if (LocalRefCount[-buffer - 1] != 1)
 			return false;
-		/* Nobody else to wait for */
-		return true;
+
+		bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+		buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+		/*
+		 * Check that the AIO subsystem doesn't have a pin. Likely not
+		 * possible today, but better safe than sorry.
+		 */
+		if (BUF_STATE_GET_REFCOUNT(buf_state) == 1)
+			return true;
+
+		return false;
 	}
 
 	/* There should be exactly one local pin */
@@ -6388,9 +6410,15 @@ local_buffer_readv_stage(PgAioHandle *ioh)
 		buf_state = pg_atomic_read_u32(&bufHdr->state);
 
 		bufHdr->io_wref = io_wref;
-		LocalRefCount[-buf - 1] += 1;
 
-		UnlockBufHdr(bufHdr, buf_state);
+		/*
+		 * Track pin by AIO subsystem in BufferDesc, not in LocalRefCount as
+		 * one might initially think. This is necessary to handle this backend
+		 * erroring out while AIO is still in progress.
+		 */
+		buf_state += BUF_REFCOUNT_ONE;
+
+		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
 	}
 }
 
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index b3805c1ff94..72c93ae15a2 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -208,10 +208,19 @@ GetLocalVictimBuffer(void)
 				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
 				trycounter = NLocBuffer;
 			}
+			else if (BUF_STATE_GET_REFCOUNT(buf_state) > 0)
+			{
+				/*
+				 * This can be reached if the backend initiated AIO for this
+				 * buffer and then errored out.
+				 */
+			}
 			else
 			{
 				/* Found a usable buffer */
 				PinLocalBuffer(bufHdr, false);
+				/* the buf_state may be modified inside PinLocalBuffer */
+				buf_state = pg_atomic_read_u32(&bufHdr->state);
 				break;
 			}
 		}
@@ -476,6 +485,44 @@ MarkLocalBufferDirty(Buffer buffer)
 	pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
 }
 
+static void
+InvalidateLocalBuffer(BufferDesc *bufHdr)
+{
+	Buffer		buffer = BufferDescriptorGetBuffer(bufHdr);
+	int			bufid = -buffer - 1;
+	uint32		buf_state;
+	LocalBufferLookupEnt *hresult;
+
+	buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+	/*
+	 * Error out if still pinned. Check not just LocalRefCount[bufid] but also
+	 * the refcount in the BufferDesc, which represents a pin by the AIO
+	 * subsystem; that can remain if AIO is initiated and the query errors out.
+	 */
+	if (LocalRefCount[bufid] != 0 ||
+		BUF_STATE_GET_REFCOUNT(buf_state) > 0)
+		elog(ERROR, "block %u of %s is still referenced (local %u)",
+			 bufHdr->tag.blockNum,
+			 relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
+							MyProcNumber,
+							BufTagGetForkNum(&bufHdr->tag)),
+			 LocalRefCount[bufid]);
+
+	/* Remove the buffer's tag from the local buffer lookup hashtable */
+	hresult = (LocalBufferLookupEnt *)
+		hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
+	if (!hresult)				/* shouldn't happen */
+		elog(ERROR, "local buffer hash table corrupted");
+	/* Mark buffer invalid: clear the tag, then the flag and usagecount bits */
+	ClearBufferTag(&bufHdr->tag);
+
+	buf_state &= ~BUF_FLAG_MASK;
+	buf_state &= ~BUF_USAGECOUNT_MASK;
+	pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+
+}
+
 /*
  * DropRelationLocalBuffers
  *		This function removes from the buffer pool all the pages of the
@@ -496,7 +543,6 @@ DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber forkNum,
 	for (i = 0; i < NLocBuffer; i++)
 	{
 		BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
-		LocalBufferLookupEnt *hresult;
 		uint32		buf_state;
 
 		buf_state = pg_atomic_read_u32(&bufHdr->state);
@@ -506,24 +552,7 @@ DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber forkNum,
 			BufTagGetForkNum(&bufHdr->tag) == forkNum &&
 			bufHdr->tag.blockNum >= firstDelBlock)
 		{
-			if (LocalRefCount[i] != 0)
-				elog(ERROR, "block %u of %s is still referenced (local %u)",
-					 bufHdr->tag.blockNum,
-					 relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
-									MyProcNumber,
-									BufTagGetForkNum(&bufHdr->tag)),
-					 LocalRefCount[i]);
-
-			/* Remove entry from hashtable */
-			hresult = (LocalBufferLookupEnt *)
-				hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
-			if (!hresult)		/* shouldn't happen */
-				elog(ERROR, "local buffer hash table corrupted");
-			/* Mark buffer invalid */
-			ClearBufferTag(&bufHdr->tag);
-			buf_state &= ~BUF_FLAG_MASK;
-			buf_state &= ~BUF_USAGECOUNT_MASK;
-			pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+			InvalidateLocalBuffer(bufHdr);
 		}
 	}
 }
@@ -543,7 +572,6 @@ DropRelationAllLocalBuffers(RelFileLocator rlocator)
 	for (i = 0; i < NLocBuffer; i++)
 	{
 		BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
-		LocalBufferLookupEnt *hresult;
 		uint32		buf_state;
 
 		buf_state = pg_atomic_read_u32(&bufHdr->state);
@@ -551,23 +579,7 @@ DropRelationAllLocalBuffers(RelFileLocator rlocator)
 		if ((buf_state & BM_TAG_VALID) &&
 			BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
 		{
-			if (LocalRefCount[i] != 0)
-				elog(ERROR, "block %u of %s is still referenced (local %u)",
-					 bufHdr->tag.blockNum,
-					 relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
-									MyProcNumber,
-									BufTagGetForkNum(&bufHdr->tag)),
-					 LocalRefCount[i]);
-			/* Remove entry from hashtable */
-			hresult = (LocalBufferLookupEnt *)
-				hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
-			if (!hresult)		/* shouldn't happen */
-				elog(ERROR, "local buffer hash table corrupted");
-			/* Mark buffer invalid */
-			ClearBufferTag(&bufHdr->tag);
-			buf_state &= ~BUF_FLAG_MASK;
-			buf_state &= ~BUF_USAGECOUNT_MASK;
-			pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+			InvalidateLocalBuffer(bufHdr);
 		}
 	}
 }
@@ -667,12 +679,13 @@ PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
 	if (LocalRefCount[bufid] == 0)
 	{
 		NLocalPinnedBuffers++;
+		buf_state += BUF_REFCOUNT_ONE;
 		if (adjust_usagecount &&
 			BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
 		{
 			buf_state += BUF_USAGECOUNT_ONE;
-			pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
 		}
+		pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
 	}
 	LocalRefCount[bufid]++;
 	ResourceOwnerRememberBuffer(CurrentResourceOwner,
@@ -698,7 +711,17 @@ UnpinLocalBufferNoOwner(Buffer buffer)
 	Assert(NLocalPinnedBuffers > 0);
 
 	if (--LocalRefCount[buffid] == 0)
+	{
+		BufferDesc *buf_hdr = GetLocalBufferDescriptor(buffid);
+		uint32		buf_state;
+
 		NLocalPinnedBuffers--;
+
+		buf_state = pg_atomic_read_u32(&buf_hdr->state);
+		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+		buf_state -= BUF_REFCOUNT_ONE;
+		pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
+	}
 }
 
 /*
@@ -894,11 +917,14 @@ ReadBufferCompleteReadLocal(Buffer buffer, int mode, bool failed)
 
 		buf_state = pg_atomic_read_u32(&buf_hdr->state);
 		buf_state |= BM_VALID;
+
+		/*
+		 * Release pin held by IO subsystem, see also
+		 * local_buffer_readv_stage().
+		 */
+		buf_state -= BUF_REFCOUNT_ONE;
 		pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
 	}
 
-	/* release pin held by IO subsystem */
-	LocalRefCount[-buffer - 1] -= 1;
-
 	return buf_failed;
 }
-- 
2.48.1.76.g4e746b1a31.dirty

