From fb9ba6b67df5060bcd788cbd72988734718c6a7d Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Fri, 23 Jan 2026 14:00:31 -0500
Subject: [PATCH v3 5/5] Don't wait for already in-progress IO
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When a backend attempts to start a read on a buffer and finds that I/O
is already in progress, it previously waited for that I/O to complete
before initiating reads for any other buffers. Although the backend must
still wait for the I/O to finish when later acquiring the buffer, it
should not need to wait at read start time. Other buffers may be
available for I/O, and in some workloads this waiting significantly
reduces concurrency.

For example, index scans may repeatedly request the same heap block. If
the backend waits each time it encounters an in-progress read, the
access pattern effectively degenerates into synchronous I/O. By
introducing the concept of foreign I/O operations, a backend can record
the buffer’s wait reference and defer waiting until WaitReadBuffers()
when it actually acquires the buffer.

In rare cases, a backend may still need to wait when starting a read if
it encounters a buffer after another backend has set BM_IO_IN_PROGRESS
but before the buffer descriptor’s wait reference has been set. Such
windows should be brief and uncommon.
---
 src/backend/storage/buffer/bufmgr.c | 481 ++++++++++++++++++----------
 src/include/storage/bufmgr.h        |   1 +
 src/tools/pgindent/typedefs.list    |   1 +
 3 files changed, 320 insertions(+), 163 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index bad8894011a..55c77e10a81 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -169,6 +169,21 @@ typedef struct SMgrSortArray
 	SMgrRelation srel;
 } SMgrSortArray;
 
+
+/*
+ * In AsyncReadBuffers(), when preparing a buffer for reading and setting
+ * BM_IO_IN_PROGRESS, the buffer may already have I/O in progress or may
+ * already contain the desired block. AsyncReadBuffers() must distinguish
+ * between these cases (and the case where it should initiate I/O) so it can
+ * mark an in-progress buffer as foreign I/O rather than waiting on it.
+ */
+typedef enum PrepareReadBuffer_Status
+{
+	READ_BUFFER_ALREADY_DONE,	/* buffer is already valid, nothing to read */
+	READ_BUFFER_IN_PROGRESS,	/* I/O already in flight; wait ref recorded */
+	READ_BUFFER_READY_FOR_IO,	/* caller issues the I/O; must sort last */
+} PrepareReadBuffer_Status;
+
 /* GUC variables */
 bool		zero_damaged_pages = false;
 int			bgwriter_lru_maxpages = 100;
@@ -1618,45 +1633,6 @@ CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete)
 #endif
 }
 
-/* helper for ReadBuffersCanStartIO(), to avoid repetition */
-static inline bool
-ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait)
-{
-	if (BufferIsLocal(buffer))
-		return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1),
-								  true, nowait);
-	else
-		return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
-}
-
-/*
- * Helper for AsyncReadBuffers that tries to get the buffer ready for IO.
- */
-static inline bool
-ReadBuffersCanStartIO(Buffer buffer, bool nowait)
-{
-	/*
-	 * If this backend currently has staged IO, we need to submit the pending
-	 * IO before waiting for the right to issue IO, to avoid the potential for
-	 * deadlocks (and, more commonly, unnecessary delays for other backends).
-	 */
-	if (!nowait && pgaio_have_staged())
-	{
-		if (ReadBuffersCanStartIOOnce(buffer, true))
-			return true;
-
-		/*
-		 * Unfortunately StartBufferIO() returning false doesn't allow to
-		 * distinguish between the buffer already being valid and IO already
-		 * being in progress. Since IO already being in progress is quite
-		 * rare, this approach seems fine.
-		 */
-		pgaio_submit_staged();
-	}
-
-	return ReadBuffersCanStartIOOnce(buffer, nowait);
-}
-
 /*
  * We track various stats related to buffer hits. Because this is done in a
  * few separate places, this helper exists for convenience.
@@ -1806,7 +1782,7 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 			 *
 			 * we first check if we already know the IO is complete.
 			 */
-			if (aio_ret->result.status == PGAIO_RS_UNKNOWN &&
+			if ((operation->foreign_io || aio_ret->result.status == PGAIO_RS_UNKNOWN) &&
 				!pgaio_wref_check_done(&operation->io_wref))
 			{
 				instr_time	io_start = pgstat_prepare_io_time(track_io_timing);
@@ -1825,11 +1801,33 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 				Assert(pgaio_wref_check_done(&operation->io_wref));
 			}
 
-			/*
-			 * We now are sure the IO completed. Check the results. This
-			 * includes reporting on errors if there were any.
-			 */
-			ProcessReadBuffersResult(operation);
+			if (unlikely(operation->foreign_io))
+			{
+				Buffer		buffer = operation->buffers[operation->nblocks_done];
+				BufferDesc *desc = BufferIsLocal(buffer) ?
+					GetLocalBufferDescriptor(-buffer - 1) :
+					GetBufferDescriptor(buffer - 1);
+				uint64		buf_state = pg_atomic_read_u64(&desc->state);
+
+				if (buf_state & BM_VALID)
+				{
+					operation->nblocks_done += 1;
+					Assert(operation->nblocks_done <= operation->nblocks);
+					/* Foreign I/O made the block valid; count it as a hit */
+					ProcessBufferHit(operation->strategy,
+									 operation->rel, operation->persistence,
+									 operation->smgr, operation->forknum,
+									 operation->blocknum + operation->nblocks_done);
+				}
+			}
+			else
+			{
+				/*
+				 * We now are sure the IO completed. Check the results. This
+				 * includes reporting on errors if there were any.
+				 */
+				ProcessReadBuffersResult(operation);
+			}
 		}
 
 		/*
@@ -1860,6 +1858,159 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 	/* NB: READ_DONE tracepoint was already executed in completion callback */
 }
 
+/*
+ * Local-buffer counterpart of PrepareNewReadBufferIO(). It lives here rather
+ * than in localbuf.c to avoid an external function call.
+ */
+static PrepareReadBuffer_Status
+PrepareNewLocalReadBufferIO(ReadBuffersOperation *operation,
+							Buffer buffer)
+{
+	BufferDesc *desc = GetLocalBufferDescriptor(-buffer - 1);
+	uint64		buf_state = pg_atomic_read_u64(&desc->state);
+
+	/* Already valid: no I/O needed, so clear the operation's wait ref */
+	if (buf_state & BM_VALID)
+	{
+		pgaio_wref_clear(&operation->io_wref);
+		return READ_BUFFER_ALREADY_DONE;
+	}
+
+	pgaio_submit_staged();
+
+	if (pgaio_wref_valid(&desc->io_wref))
+	{
+		operation->io_wref = desc->io_wref;
+		operation->foreign_io = true;
+		return READ_BUFFER_IN_PROGRESS;
+	}
+
+	return READ_BUFFER_READY_FOR_IO;
+}
+
+/*
+ * Try to start IO on the first buffer in a new run of blocks. If AIO is in
+ * progress, be it in this backend or another backend, we just associate the
+ * wait reference with the operation and wait in WaitReadBuffers(). This turns
+ * out to be important for performance in two workloads:
+ *
+ * 1) A read stream that has to read the same block multiple times within the
+ *    readahead distance. This can happen e.g. for the table accesses of an
+ *    index scan.
+ *
+ * 2) Concurrent scans by multiple backends on the same relation.
+ *
+ * If we were to synchronously wait for the in-progress IO, we'd not be able
+ * to keep enough I/O in flight.
+ *
+ * If we do find there is ongoing I/O for the buffer, we set up a 1-block
+ * ReadBuffersOperation that WaitReadBuffers() can then wait on.
+ *
+ * It's possible that another backend has started IO on the buffer but not yet
+ * set its wait reference. In this case, we have no choice but to wait for
+ * either the wait reference to be valid or the IO to be done.
+ */
+static PrepareReadBuffer_Status
+PrepareNewReadBufferIO(ReadBuffersOperation *operation,
+					   Buffer buffer)
+{
+	uint64		buf_state;
+	BufferDesc *desc;
+
+	if (BufferIsLocal(buffer))
+		return PrepareNewLocalReadBufferIO(operation, buffer);
+
+	ResourceOwnerEnlarge(CurrentResourceOwner);
+	desc = GetBufferDescriptor(buffer - 1);
+
+	for (;;)
+	{
+		buf_state = LockBufHdr(desc);
+
+		/* Already valid, no work to do */
+		if (buf_state & BM_VALID)
+		{
+			UnlockBufHdr(desc);
+			pgaio_wref_clear(&operation->io_wref);
+			return READ_BUFFER_ALREADY_DONE;
+		}
+
+		if (buf_state & BM_IO_IN_PROGRESS)
+		{
+			/* Join existing read */
+			if (pgaio_wref_valid(&desc->io_wref))
+			{
+				operation->io_wref = desc->io_wref;
+				operation->foreign_io = true;
+				UnlockBufHdr(desc);
+				return READ_BUFFER_IN_PROGRESS;
+			}
+
+			/*
+			 * If the wait ref is not valid but the IO is in progress, someone
+			 * else started IO but hasn't set the wait ref yet. We have no
+			 * choice but to wait until the wait ref is set or the IO
+			 * completes.
+			 */
+			UnlockBufHdr(desc);
+			pgaio_submit_staged();
+			WaitIO(desc);
+			continue;
+		}
+
+		/*
+		 * No IO in progress and not already valid; we will start IO. It's
+		 * possible that the IO was in progress and never became valid because
+		 * the IO errored out. We'll do the IO ourselves.
+		 */
+		UnlockBufHdrExt(desc, buf_state, BM_IO_IN_PROGRESS, 0, 0);
+		ResourceOwnerRememberBufferIO(CurrentResourceOwner,
+									  BufferDescriptorGetBuffer(desc));
+
+		return READ_BUFFER_READY_FOR_IO;
+	}
+}
+
+
+/*
+ * When building a new IO from multiple buffers, we won't include buffers
+ * that are already valid or already in progress. This function should only be
+ * used for additional adjacent buffers following the head buffer in a new IO.
+ *
+ * Returns true if the buffer was successfully prepared for IO and false if it
+ * is rejected and the read IO should not include this buffer.
+ */
+static bool
+PrepareAdditionalReadBuffer(Buffer buffer)
+{
+	uint64		buf_state;
+	BufferDesc *desc;
+
+	if (BufferIsLocal(buffer))
+	{
+		desc = GetLocalBufferDescriptor(-buffer - 1);
+		buf_state = pg_atomic_read_u64(&desc->state);
+		/* Local buffers don't use BM_IO_IN_PROGRESS */
+		if (buf_state & BM_VALID || pgaio_wref_valid(&desc->io_wref))
+			return false;
+	}
+	else
+	{
+		ResourceOwnerEnlarge(CurrentResourceOwner);
+		desc = GetBufferDescriptor(buffer - 1);
+		buf_state = LockBufHdr(desc);
+		if (buf_state & (BM_VALID | BM_IO_IN_PROGRESS))
+		{
+			UnlockBufHdr(desc);
+			return false;
+		}
+		UnlockBufHdrExt(desc, buf_state, BM_IO_IN_PROGRESS, 0, 0);
+		ResourceOwnerRememberBufferIO(CurrentResourceOwner, buffer);
+	}
+
+	return true;
+}
+
 /*
  * Initiate IO for the ReadBuffersOperation
  *
@@ -1893,7 +2044,75 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	void	   *io_pages[MAX_IO_COMBINE_LIMIT];
 	IOContext	io_context;
 	IOObject	io_object;
-	bool		did_start_io;
+	instr_time	io_start;
+	PrepareReadBuffer_Status status;
+
+	/*
+	 * We must get an IO handle before PrepareNewReadBufferIO(), as
+	 * pgaio_io_acquire() might block, which we don't want after setting
+	 * IO_IN_PROGRESS. If we don't need to do the IO, we'll release the
+	 * handle.
+	 *
+	 * If we need to wait for IO before we can get a handle, submit
+	 * already-staged IO first, so that other backends don't need to wait.
+	 * There wouldn't be a deadlock risk, as pgaio_io_acquire() just needs to
+	 * wait for already submitted IO, which doesn't require additional locks,
+	 * but it could still cause undesirable waits.
+	 *
+	 * A secondary benefit is that this would allow us to measure the time in
+	 * pgaio_io_acquire() without causing undue timer overhead in the common,
+	 * non-blocking, case.  However, currently the pgstats infrastructure
+	 * doesn't really allow that, as it a) asserts that an operation can't
+	 * have time without operations b) doesn't have an API to report
+	 * "accumulated" time.
+	 */
+	ioh = pgaio_io_acquire_nb(CurrentResourceOwner, &operation->io_return);
+	if (unlikely(!ioh))
+	{
+		pgaio_submit_staged();
+		ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return);
+	}
+
+	operation->foreign_io = false;
+
+	/* Check if we can start IO on the first to-be-read buffer */
+	if ((status = PrepareNewReadBufferIO(operation, buffers[nblocks_done])) <
+		READ_BUFFER_READY_FOR_IO)
+	{
+		pgaio_io_release(ioh);
+		*nblocks_progress = 1;
+		if (status == READ_BUFFER_ALREADY_DONE)
+		{
+			/*
+			 * Someone else has already completed this block, we're done.
+			 *
+			 * When IO is necessary, ->nblocks_done is updated in
+			 * ProcessReadBuffersResult(), but that is not called if no IO is
+			 * necessary. Thus update here.
+			 */
+			operation->nblocks_done += 1;
+			Assert(operation->nblocks_done <= operation->nblocks);
+
+			/*
+			 * Report and track this as a 'hit' for this backend, even though
+			 * it must have started out as a miss in PinBufferForBlock(). The
+			 * other backend will track this as a 'read'.
+			 */
+			ProcessBufferHit(operation->strategy,
+							 operation->rel, operation->persistence,
+							 operation->smgr, operation->forknum,
+							 operation->blocknum + operation->nblocks_done);
+			return false;
+		}
+
+		/* The IO is already in-progress */
+		Assert(status == READ_BUFFER_IN_PROGRESS);
+		CheckReadBuffersOperation(operation, false);
+		return true;
+	}
+
+	/* We can read in at least the head buffer. */
+	Assert(status == READ_BUFFER_READY_FOR_IO);
 
 	/*
 	 * When this IO is executed synchronously, either because the caller will
@@ -1944,138 +2163,74 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	 */
 	pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid);
 
-	/*
-	 * Get IO handle before ReadBuffersCanStartIO(), as pgaio_io_acquire()
-	 * might block, which we don't want after setting IO_IN_PROGRESS.
-	 *
-	 * If we need to wait for IO before we can get a handle, submit
-	 * already-staged IO first, so that other backends don't need to wait.
-	 * There wouldn't be a deadlock risk, as pgaio_io_acquire() just needs to
-	 * wait for already submitted IO, which doesn't require additional locks,
-	 * but it could still cause undesirable waits.
-	 *
-	 * A secondary benefit is that this would allow us to measure the time in
-	 * pgaio_io_acquire() without causing undue timer overhead in the common,
-	 * non-blocking, case.  However, currently the pgstats infrastructure
-	 * doesn't really allow that, as it a) asserts that an operation can't
-	 * have time without operations b) doesn't have an API to report
-	 * "accumulated" time.
-	 */
-	ioh = pgaio_io_acquire_nb(CurrentResourceOwner, &operation->io_return);
-	if (unlikely(!ioh))
-	{
-		pgaio_submit_staged();
-
-		ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return);
-	}
+	Assert(io_buffers[0] == buffers[nblocks_done]);
+	io_pages[0] = BufferGetBlock(buffers[nblocks_done]);
+	io_buffers_len = 1;
 
 	/*
-	 * Check if we can start IO on the first to-be-read buffer.
-	 *
-	 * If an I/O is already in progress in another backend, we want to wait
-	 * for the outcome: either done, or something went wrong and we will
-	 * retry.
+	 * How many neighboring-on-disk blocks can we scatter-read into other
+	 * buffers at the same time?  In this case we don't wait if we see an I/O
+	 * already in progress.  We already set BM_IO_IN_PROGRESS for the head
+	 * block, so we should get on with that I/O as soon as possible.
 	 */
-	if (!ReadBuffersCanStartIO(buffers[nblocks_done], false))
+	for (int i = nblocks_done + 1; i < operation->nblocks; i++)
 	{
-		/*
-		 * Someone else has already completed this block, we're done.
-		 *
-		 * When IO is necessary, ->nblocks_done is updated in
-		 * ProcessReadBuffersResult(), but that is not called if no IO is
-		 * necessary. Thus update here.
-		 */
-		operation->nblocks_done += 1;
-		*nblocks_progress = 1;
-
-		pgaio_io_release(ioh);
-		pgaio_wref_clear(&operation->io_wref);
-		did_start_io = false;
+		if (!PrepareAdditionalReadBuffer(buffers[i]))
+			break;
+		/* Must be consecutive block numbers. */
+		Assert(BufferGetBlockNumber(buffers[i - 1]) ==
+			   BufferGetBlockNumber(buffers[i]) - 1);
+		Assert(io_buffers[io_buffers_len] == buffers[i]);
 
-		/*
-		 * Report and track this as a 'hit' for this backend, even though it
-		 * must have started out as a miss in PinBufferForBlock(). The other
-		 * backend will track this as a 'read'.
-		 */
-		ProcessBufferHit(operation->strategy, operation->rel, persistence,
-						 operation->smgr, forknum,
-						 blocknum + operation->nblocks_done);
+		io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
 	}
-	else
-	{
-		instr_time	io_start;
-
-		/* We found a buffer that we need to read in. */
-		Assert(io_buffers[0] == buffers[nblocks_done]);
-		io_pages[0] = BufferGetBlock(buffers[nblocks_done]);
-		io_buffers_len = 1;
-
-		/*
-		 * How many neighboring-on-disk blocks can we scatter-read into other
-		 * buffers at the same time?  In this case we don't wait if we see an
-		 * I/O already in progress.  We already set BM_IO_IN_PROGRESS for the
-		 * head block, so we should get on with that I/O as soon as possible.
-		 */
-		for (int i = nblocks_done + 1; i < operation->nblocks; i++)
-		{
-			if (!ReadBuffersCanStartIO(buffers[i], true))
-				break;
-			/* Must be consecutive block numbers. */
-			Assert(BufferGetBlockNumber(buffers[i - 1]) ==
-				   BufferGetBlockNumber(buffers[i]) - 1);
-			Assert(io_buffers[io_buffers_len] == buffers[i]);
 
-			io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
-		}
+	/* get a reference to wait for in WaitReadBuffers() */
+	pgaio_io_get_wref(ioh, &operation->io_wref);
 
-		/* get a reference to wait for in WaitReadBuffers() */
-		pgaio_io_get_wref(ioh, &operation->io_wref);
+	/* provide the list of buffers to the completion callbacks */
+	pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len);
 
-		/* provide the list of buffers to the completion callbacks */
-		pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len);
+	pgaio_io_register_callbacks(ioh,
+								persistence == RELPERSISTENCE_TEMP ?
+								PGAIO_HCB_LOCAL_BUFFER_READV :
+								PGAIO_HCB_SHARED_BUFFER_READV,
+								flags);
 
-		pgaio_io_register_callbacks(ioh,
-									persistence == RELPERSISTENCE_TEMP ?
-									PGAIO_HCB_LOCAL_BUFFER_READV :
-									PGAIO_HCB_SHARED_BUFFER_READV,
-									flags);
+	pgaio_io_set_flag(ioh, ioh_flags);
 
-		pgaio_io_set_flag(ioh, ioh_flags);
+	/* ---
+	 * Even though we're trying to issue IO asynchronously, track the time
+	 * in smgrstartreadv():
+	 * - if io_method == IOMETHOD_SYNC, we will always perform the IO
+	 *   immediately
+	 * - the io method might not support the IO (e.g. worker IO for a temp
+	 *   table)
+	 * ---
+	 */
+	io_start = pgstat_prepare_io_time(track_io_timing);
+	smgrstartreadv(ioh, operation->smgr, forknum,
+				   blocknum + nblocks_done,
+				   io_pages, io_buffers_len);
+	pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
+							io_start, 1, io_buffers_len * BLCKSZ);
 
-		/* ---
-		 * Even though we're trying to issue IO asynchronously, track the time
-		 * in smgrstartreadv():
-		 * - if io_method == IOMETHOD_SYNC, we will always perform the IO
-		 *   immediately
-		 * - the io method might not support the IO (e.g. worker IO for a temp
-		 *   table)
-		 * ---
-		 */
-		io_start = pgstat_prepare_io_time(track_io_timing);
-		smgrstartreadv(ioh, operation->smgr, forknum,
-					   blocknum + nblocks_done,
-					   io_pages, io_buffers_len);
-		pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
-								io_start, 1, io_buffers_len * BLCKSZ);
-
-		if (persistence == RELPERSISTENCE_TEMP)
-			pgBufferUsage.local_blks_read += io_buffers_len;
-		else
-			pgBufferUsage.shared_blks_read += io_buffers_len;
+	if (persistence == RELPERSISTENCE_TEMP)
+		pgBufferUsage.local_blks_read += io_buffers_len;
+	else
+		pgBufferUsage.shared_blks_read += io_buffers_len;
 
-		/*
-		 * Track vacuum cost when issuing IO, not after waiting for it.
-		 * Otherwise we could end up issuing a lot of IO in a short timespan,
-		 * despite a low cost limit.
-		 */
-		if (VacuumCostActive)
-			VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
+	/*
+	 * Track vacuum cost when issuing IO, not after waiting for it. Otherwise
+	 * we could end up issuing a lot of IO in a short timespan, despite a low
+	 * cost limit.
+	 */
+	if (VacuumCostActive)
+		VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
 
-		*nblocks_progress = io_buffers_len;
-		did_start_io = true;
-	}
+	*nblocks_progress = io_buffers_len;
 
-	return did_start_io;
+	return true;
 }
 
 /*
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index a40adf6b2a8..1358fc7fa64 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -147,6 +147,7 @@ struct ReadBuffersOperation
 	int			flags;
 	int16		nblocks;
 	int16		nblocks_done;
+	bool		foreign_io;
 	PgAioWaitRef io_wref;
 	PgAioReturn io_return;
 };
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index db583985813..6c6bdc8ac4f 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2341,6 +2341,7 @@ PredicateLockData
 PredicateLockTargetType
 PrefetchBufferResult
 PrepParallelRestorePtrType
+PrepareReadBuffer_Status
 PrepareStmt
 PreparedStatement
 PresortedKeyData
-- 
2.43.0

