From 200af0d589054f8d015a1ed4ae347c684149bde8 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Wed, 18 Mar 2026 11:13:25 -0400
Subject: [PATCH v6 7/8] Introduce PrepareHeadBufferReadIO() and
 PrepareAdditionalBufferReadIO()

Replace ReadBuffersCanStartIO() and ReadBuffersCanStartIOOnce() with
new explicit helper functions that inline the logic from
StartBufferIO() and StartLocalBufferIO().

Besides the inlined logic being easier to reason about, StartBufferIO()
doesn't distinguish between 'already valid' and 'IO in progress' (and
explicitly states it does not want to). Making that distinction is
required to defer waiting for in-progress IO, which a future commit
will implement.
---
 src/backend/storage/buffer/bufmgr.c | 171 +++++++++++++++++++++++-----
 1 file changed, 141 insertions(+), 30 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index a9995b75917..2179ade07cc 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1659,43 +1659,150 @@ CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete)
 #endif
 }
 
-/* helper for ReadBuffersCanStartIO(), to avoid repetition */
-static inline bool
-ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait)
+/*
+ * Local-buffer version of PrepareHeadBufferReadIO(). It lives here instead
+ * of in localbuf.c to avoid an external function call.
+ */
+static bool
+PrepareHeadLocalBufferReadIO(Buffer buffer)
 {
-	if (BufferIsLocal(buffer))
-		return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1),
-								  true, nowait);
-	else
-		return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
+	BufferDesc *desc = GetLocalBufferDescriptor(-buffer - 1);
+	uint64		buf_state = pg_atomic_read_u64(&desc->state);
+
+	/*
+	 * The buffer could already be valid if a prior IO by this backend was
+	 * completed and reclaimed incidentally (e.g. while acquiring a new AIO
+	 * handle). Only the owning backend can set BM_VALID on a local buffer.
+	 */
+	if (buf_state & BM_VALID)
+		return false;
+
+	/*
+	 * Submit any staged IO before checking for in-progress IO. Without this,
+	 * the wref check below could find IO that this backend staged but hasn't
+	 * submitted yet. Waiting on that would PANIC because the owner can't wait
+	 * on its own staged IO.
+	 */
+	pgaio_submit_staged();
+
+	/* Wait for in-progress IO */
+	if (pgaio_wref_valid(&desc->io_wref))
+	{
+		PgAioWaitRef iow = desc->io_wref;
+
+		pgaio_wref_wait(&iow);
+
+		buf_state = pg_atomic_read_u64(&desc->state);
+	}
+
+	/*
+	 * If BM_VALID is set, we waited on IO and it completed successfully.
+	 * Otherwise, we'll initiate IO on the buffer.
+	 */
+	return !(buf_state & BM_VALID);
 }
 
 /*
- * Helper for AsyncReadBuffers that tries to get the buffer ready for IO.
+ * Try to start IO on the first buffer in a new run of blocks. If AIO is in
+ * progress, be it in this backend or another backend, we wait for it to
+ * finish and then check the result.
+ *
+ * Returns true if the buffer is ready for IO, false if the buffer is already
+ * valid.
  */
-static inline bool
-ReadBuffersCanStartIO(Buffer buffer, bool nowait)
+static bool
+PrepareHeadBufferReadIO(Buffer buffer)
 {
-	/*
-	 * If this backend currently has staged IO, we need to submit the pending
-	 * IO before waiting for the right to issue IO, to avoid the potential for
-	 * deadlocks (and, more commonly, unnecessary delays for other backends).
-	 */
-	if (!nowait && pgaio_have_staged())
+	uint64		buf_state;
+	BufferDesc *desc;
+
+	if (BufferIsLocal(buffer))
+		return PrepareHeadLocalBufferReadIO(buffer);
+
+	ResourceOwnerEnlarge(CurrentResourceOwner);
+	desc = GetBufferDescriptor(buffer - 1);
+
+	for (;;)
 	{
-		if (ReadBuffersCanStartIOOnce(buffer, true))
-			return true;
+		buf_state = LockBufHdr(desc);
+
+		Assert(buf_state & BM_TAG_VALID);
+
+		/* Already valid, no work to do */
+		if (buf_state & BM_VALID)
+		{
+			UnlockBufHdr(desc);
+			return false;
+		}
+
+		if (buf_state & BM_IO_IN_PROGRESS)
+		{
+			UnlockBufHdr(desc);
+
+			/*
+			 * If this backend currently has staged IO, submit it before
+			 * waiting for in-progress IO, to avoid potential deadlocks and
+			 * unnecessary delays.
+			 */
+			pgaio_submit_staged();
+			WaitIO(desc);
+			continue;
+		}
 
 		/*
-		 * Unfortunately StartBufferIO() returning false doesn't allow to
-		 * distinguish between the buffer already being valid and IO already
-		 * being in progress. Since IO already being in progress is quite
-		 * rare, this approach seems fine.
+		 * No IO in progress and not already valid; we will start IO. It's
+		 * possible that IO was in progress and the buffer never became valid
+		 * because that IO errored out. Either way, we'll do the IO ourselves.
 		 */
-		pgaio_submit_staged();
+		UnlockBufHdrExt(desc, buf_state, BM_IO_IN_PROGRESS, 0, 0);
+		ResourceOwnerRememberBufferIO(CurrentResourceOwner,
+									  BufferDescriptorGetBuffer(desc));
+
+		return true;
+	}
+}
+
+/*
+ * When building a new IO from multiple buffers, we won't include buffers that
+ * are already valid or already have IO in progress. This function should only
+ * be used for additional adjacent buffers after the head buffer in a new IO.
+ *
+ * To avoid deadlocks, this function must never wait for IO. The head buffer
+ * already has BM_IO_IN_PROGRESS set, so we'll just issue that IO and come
+ * back in lieu of waiting here.
+ *
+ * Returns true if the buffer was successfully prepared for IO and false if it
+ * is rejected and the read IO should not include this buffer.
+ */
+static bool
+PrepareAdditionalBufferReadIO(Buffer buffer)
+{
+	uint64		buf_state;
+	BufferDesc *desc;
+
+	if (BufferIsLocal(buffer))
+	{
+		desc = GetLocalBufferDescriptor(-buffer - 1);
+		buf_state = pg_atomic_read_u64(&desc->state);
+		/* Local buffers don't use BM_IO_IN_PROGRESS */
+		if (buf_state & BM_VALID || pgaio_wref_valid(&desc->io_wref))
+			return false;
+	}
+	else
+	{
+		ResourceOwnerEnlarge(CurrentResourceOwner);
+		desc = GetBufferDescriptor(buffer - 1);
+		buf_state = LockBufHdr(desc);
+		if (buf_state & (BM_VALID | BM_IO_IN_PROGRESS))
+		{
+			UnlockBufHdr(desc);
+			return false;
+		}
+		UnlockBufHdrExt(desc, buf_state, BM_IO_IN_PROGRESS, 0, 0);
+		ResourceOwnerRememberBufferIO(CurrentResourceOwner, buffer);
 	}
 
-	return ReadBuffersCanStartIOOnce(buffer, nowait);
+	return true;
 }
 
 /*
@@ -1934,8 +2041,10 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	}
 
 	/*
-	 * Get IO handle before ReadBuffersCanStartIO(), as pgaio_io_acquire()
-	 * might block, which we don't want after setting IO_IN_PROGRESS.
+	 * We must get an IO handle before PrepareHeadBufferReadIO(), as
+	 * pgaio_io_acquire() might block, which we don't want after setting
+	 * IO_IN_PROGRESS. If we don't need to do the IO, we'll release the
+	 * handle.
 	 *
 	 * If we need to wait for IO before we can get a handle, submit
 	 * already-staged IO first, so that other backends don't need to wait.
@@ -1957,6 +2066,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 		ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return);
 	}
 
+	pgaio_wref_clear(&operation->io_wref);
+
 	/*
 	 * Check if we can start IO on the first to-be-read buffer.
 	 *
@@ -1964,10 +2075,10 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	 * for the outcome: either done, or something went wrong and we will
 	 * retry.
 	 */
-	if (!ReadBuffersCanStartIO(buffers[nblocks_done], false))
+	if (!PrepareHeadBufferReadIO(buffers[nblocks_done]))
 	{
 		/*
-		 * Someone else has already completed this block, we're done.
+		 * Someone has already completed this block, we're done.
 		 *
 		 * When IO is necessary, ->nblocks_done is updated in
 		 * ProcessReadBuffersResult(), but that is not called if no IO is
@@ -2046,7 +2157,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 		Assert(BufferGetBlockNumber(buffers[i - 1]) ==
 			   BufferGetBlockNumber(buffers[i]) - 1);
 
-		if (!ReadBuffersCanStartIO(buffers[i], true))
+		if (!PrepareAdditionalBufferReadIO(buffers[i]))
 			break;
 
 		Assert(io_buffers[io_buffers_len] == buffers[i]);
-- 
2.43.0

