From 5403662d9fd2730066dd3ce3083b4f4f1a3d147a Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Fri, 14 Mar 2025 13:48:47 -0400
Subject: [PATCH v2.8 24/38] bufmgr: Use AIO in StartReadBuffers()

This finally introduces the first actual AIO user. StartReadBuffers() now uses
the AIO routines to issue IO. This converts a lot of callers to use the AIO
infrastructure.

As the implementation of StartReadBuffers() is also used by the functions for
reading individual blocks (StartReadBuffer() and through that
ReadBufferExtended()) this means all buffered read IO passes through the AIO
paths.  However, as those are synchronous reads, actually performing the IO
asynchronously would be rarely beneficial. Instead such IOs are flagged to
always be executed synchronously. This way we don't have to duplicate a fair
bit of code.

When io_method=sync is used, the IO patterns generated after this change are
the same as before, i.e. actual reads are only issued in WaitReadBuffers() and
StartReadBuffers() may issue prefetch requests.  This allows to bypass most of
the actual asynchronicity, which is important to make a change as big as this
less risky.

One thing worth calling out is that, if IO is actually executed
asynchronously, the precise meaning of what track_io_timing is measuring has
changed. Previously it tracked the time for each IO, but that does not make
sense when multiple IOs are executed concurrently. Now it only measures the
time actually spent waiting for IO.

While AIO is now actually used, the logic in read_stream.c will often prevent
using sufficiently many concurrent IOs. That will be addressed in the next
commits.

Co-authored-by: Andres Freund <andres@anarazel.de>
Co-authored-by: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/uvrtrknj4kdytuboidbhwclo4gxhswwcpgadptsjvjqcluzmah%40brqs62irg4dt
Discussion: https://postgr.es/m/20210223100344.llw5an2aklengrmn@alap3.anarazel.de
Discussion: https://postgr.es/m/stj36ea6yyhoxtqkhpieia2z4krnam7qyetc57rfezgk4zgapf@gcnactj4z56m
---
 src/include/storage/bufmgr.h        |   6 +
 src/backend/storage/buffer/bufmgr.c | 614 +++++++++++++++++++++-------
 2 files changed, 477 insertions(+), 143 deletions(-)

diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index eafc94d7ed1..ca4ca22c345 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -112,6 +112,9 @@ typedef struct BufferManagerRelation
 #define READ_BUFFERS_ZERO_ON_ERROR (1 << 0)
 /* Call smgrprefetch() if I/O necessary. */
 #define READ_BUFFERS_ISSUE_ADVICE (1 << 1)
+/* IO will immediately be waited for */
+#define READ_BUFFERS_SYNCHRONOUSLY (1 << 2)
+
 
 struct ReadBuffersOperation
 {
@@ -131,6 +134,9 @@ struct ReadBuffersOperation
 	BlockNumber blocknum;
 	int			flags;
 	int16		nblocks;
+	int16		nblocks_done;
+	PgAioWaitRef io_wref;
+	PgAioReturn io_return;
 };
 
 typedef struct ReadBuffersOperation ReadBuffersOperation;
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index d783b33f78e..2691757016c 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -528,6 +528,8 @@ static inline BufferDesc *BufferAlloc(SMgrRelation smgr,
 									  BlockNumber blockNum,
 									  BufferAccessStrategy strategy,
 									  bool *foundPtr, IOContext io_context);
+static bool AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks);
+static void CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete);
 static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 						IOObject io_object, IOContext io_context);
@@ -1228,10 +1230,14 @@ ReadBuffer_common(Relation rel, SMgrRelation smgr, char smgr_persistence,
 		return buffer;
 	}
 
+	/*
+	 * Signal that we are going to immediately wait. If we're immediately
+	 * waiting, there is no benefit in actually executing the IO
+	 * asynchronously, it would just add dispatch overhead.
+	 */
+	flags = READ_BUFFERS_SYNCHRONOUSLY;
 	if (mode == RBM_ZERO_ON_ERROR)
-		flags = READ_BUFFERS_ZERO_ON_ERROR;
-	else
-		flags = 0;
+		flags |= READ_BUFFERS_ZERO_ON_ERROR;
 	operation.smgr = smgr;
 	operation.rel = rel;
 	operation.persistence = persistence;
@@ -1256,9 +1262,11 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 {
 	int			actual_nblocks = *nblocks;
 	int			maxcombine = 0;
+	bool		did_start_io;
 
 	Assert(*nblocks > 0);
 	Assert(*nblocks <= MAX_IO_COMBINE_LIMIT);
+	Assert(*nblocks == 1 || allow_forwarding);
 
 	for (int i = 0; i < actual_nblocks; ++i)
 	{
@@ -1297,6 +1305,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 				bufHdr = GetLocalBufferDescriptor(-buffers[i] - 1);
 			else
 				bufHdr = GetBufferDescriptor(buffers[i] - 1);
+			Assert(pg_atomic_read_u32(&bufHdr->state) & BM_TAG_VALID);
 			found = pg_atomic_read_u32(&bufHdr->state) & BM_VALID;
 		}
 		else
@@ -1321,6 +1330,20 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 			if (i == 0)
 			{
 				*nblocks = 1;
+
+#ifdef USE_ASSERT_CHECKING
+
+				/*
+				 * Initialize enough of ReadBuffersOperation to make
+				 * CheckReadBuffersOperation() work. Outside of assertions
+				 * that's not necessary when no IO is issued.
+				 */
+				operation->buffers = buffers;
+				operation->blocknum = blockNum;
+				operation->nblocks = 1;
+				operation->nblocks_done = 1;
+				CheckReadBuffersOperation(operation, true);
+#endif
 				return false;
 			}
 
@@ -1363,25 +1386,74 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 	operation->blocknum = blockNum;
 	operation->flags = flags;
 	operation->nblocks = actual_nblocks;
+	operation->nblocks_done = 0;
+	pgaio_wref_clear(&operation->io_wref);
 
-	if (flags & READ_BUFFERS_ISSUE_ADVICE)
+	/*
+	 * When using AIO, start the IO in the background. If not, issue prefetch
+	 * requests if desired by the caller.
+	 *
+	 * The reason we have a dedicated path for IOMETHOD_SYNC here is to
+	 * de-risk the introduction of AIO somewhat. It's a large architectural
+	 * change, with lots of chances for unanticipated performance effects.
+	 *
+	 * Use of IOMETHOD_SYNC already leads to not actually performing IO
+	 * asynchronously, but without the check here we'd execute IO earlier than
+	 * we used to. Eventually this IOMETHOD_SYNC specific path should go away.
+	 */
+	if (io_method != IOMETHOD_SYNC)
 	{
 		/*
-		 * In theory we should only do this if PinBufferForBlock() had to
-		 * allocate new buffers above.  That way, if two calls to
-		 * StartReadBuffers() were made for the same blocks before
-		 * WaitReadBuffers(), only the first would issue the advice. That'd be
-		 * a better simulation of true asynchronous I/O, which would only
-		 * start the I/O once, but isn't done here for simplicity.
+		 * Try to start IO asynchronously. It's possible that no IO needs to
+		 * be started, if another backend already performed the IO.
+		 *
+		 * Note that if an IO is started, it might not cover the entire
+		 * requested range, e.g. because an intermediary block has been read
+		 * in by another backend.  In that case any "trailing" buffers we
+		 * already pinned above will be "forwarded" by read_stream.c to the
+		 * next call to StartReadBuffers().
+		 *
+		 * This is signalled to the caller by decrementing *nblocks *and*
+		 * reducing operation->nblocks. The latter is done here, but not below
+		 * WaitReadBuffers(), as in WaitReadBuffers() we can't "shorten" the
+		 * overall read size anymore, we need to retry until done in its
+		 * entirety or until failed.
 		 */
-		smgrprefetch(operation->smgr,
-					 operation->forknum,
-					 blockNum,
-					 actual_nblocks);
+		did_start_io = AsyncReadBuffers(operation, nblocks);
+
+		operation->nblocks = *nblocks;
+	}
+	else
+	{
+		operation->flags |= READ_BUFFERS_SYNCHRONOUSLY;
+
+		if (flags & READ_BUFFERS_ISSUE_ADVICE)
+		{
+			/*
+			 * In theory we should only do this if PinBufferForBlock() had to
+			 * allocate new buffers above.  That way, if two calls to
+			 * StartReadBuffers() were made for the same blocks before
+			 * WaitReadBuffers(), only the first would issue the advice.
+			 * That'd be a better simulation of true asynchronous I/O, which
+			 * would only start the I/O once, but isn't done here for
+			 * simplicity.
+			 */
+			smgrprefetch(operation->smgr,
+						 operation->forknum,
+						 blockNum,
+						 actual_nblocks);
+		}
+
+		/*
+		 * Indicate that WaitReadBuffers() should be called. WaitReadBuffers()
+		 * will initiate the necessary IO.
+		 */
+		did_start_io = true;
 	}
 
-	/* Indicate that WaitReadBuffers() should be called. */
-	return true;
+	CheckReadBuffersOperation(operation, !did_start_io);
+
+	return did_start_io;
 }
 
 /*
@@ -1448,8 +1520,35 @@ StartReadBuffer(ReadBuffersOperation *operation,
 	return result;
 }
 
+/*
+ * Perform sanity checks on the ReadBuffersOperation.
+ */
+static void
+CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete)
+{
+#ifdef USE_ASSERT_CHECKING
+	Assert(operation->nblocks_done <= operation->nblocks);
+	Assert(!is_complete || operation->nblocks == operation->nblocks_done);
+
+	for (int i = 0; i < operation->nblocks; i++)
+	{
+		Buffer		buffer = operation->buffers[i];
+		BufferDesc *buf_hdr = BufferIsLocal(buffer) ?
+			GetLocalBufferDescriptor(-buffer - 1) :
+			GetBufferDescriptor(buffer - 1);
+
+		Assert(BufferGetBlockNumber(buffer) == operation->blocknum + i);
+		Assert(pg_atomic_read_u32(&buf_hdr->state) & BM_TAG_VALID);
+
+		if (i < operation->nblocks_done)
+			Assert(pg_atomic_read_u32(&buf_hdr->state) & BM_VALID);
+	}
+#endif
+}
+
+/* helper for ReadBuffersCanStartIO(), to avoid repetition */
 static inline bool
-WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
+ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait)
 {
 	if (BufferIsLocal(buffer))
 		return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1),
@@ -1458,31 +1557,243 @@ WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
 		return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
 }
 
-void
-WaitReadBuffers(ReadBuffersOperation *operation)
+/*
+ * Helper for AsyncReadBuffers that tries to initiate IO on the buffer.
+ */
+static inline bool
+ReadBuffersCanStartIO(Buffer buffer, bool nowait)
 {
-	Buffer	   *buffers;
-	int			nblocks;
-	BlockNumber blocknum;
-	ForkNumber	forknum;
-	IOContext	io_context;
-	IOObject	io_object;
-	char		persistence;
-
-	/* Find the range of the physical read we need to perform. */
-	nblocks = operation->nblocks;
-	buffers = &operation->buffers[0];
-	blocknum = operation->blocknum;
-	forknum = operation->forknum;
-	persistence = operation->persistence;
+	/*
+	 * If this backend currently has staged IO, we we need to submit the
+	 * pending IO before waiting for the right to issue IO, to avoid the
+	 * potential for deadlocks (and, more commonly, unnecessary delays for
+	 * other backends).
+	 */
+	if (!nowait && pgaio_have_staged())
+	{
+		if (ReadBuffersCanStartIOOnce(buffer, true))
+			return true;
+
+		/*
+		 * Unfortunately a false returned StartBufferIO() doesn't allow to
+		 * distinguish between the the buffer already being valid and IO
+		 * already being in progress. Since IO already being in progress is
+		 * quite rare, this approach seems fine.
+		 */
+		pgaio_submit_staged();
+	}
+
+	return ReadBuffersCanStartIOOnce(buffer, nowait);
+}
+
+/*
+ * Helper for WaitReadBuffers() that processes the results of a readv
+ * operation, raising an error if necessary.
+ */
+static void
+ProcessReadBuffersResult(ReadBuffersOperation *operation)
+{
+	PgAioReturn *aio_ret = &operation->io_return;
+	int			nblocks = 0;
+
+	Assert(pgaio_wref_valid(&operation->io_wref));
+	Assert(aio_ret->result.status != ARS_UNKNOWN);
+
+	/*
+	 * SMGR reports the number of blocks sucessfully read as the result of the
+	 * IO operation. Thus we can simply add that to ->nblocks_done.
+	 */
+
+	if (likely(aio_ret->result.status == ARS_OK))
+		nblocks = aio_ret->result.result;
+	else if (aio_ret->result.status == ARS_PARTIAL)
+	{
+		/*
+		 * We'll retry, so we just emit a debug message to the server log (or
+		 * not even that in prod scenarios).
+		 */
+		pgaio_result_report(aio_ret->result, &aio_ret->target_data, DEBUG1);
+		nblocks = aio_ret->result.result;
+
+		elog(DEBUG3, "partial read, will retry");
+
+	}
+	else if (aio_ret->result.status == ARS_ERROR)
+	{
+		pgaio_result_report(aio_ret->result, &aio_ret->target_data, ERROR);
+		nblocks = 0;			/* silence compiler */
+	}
 
 	Assert(nblocks > 0);
 	Assert(nblocks <= MAX_IO_COMBINE_LIMIT);
 
+	operation->nblocks_done += nblocks;
+
+	Assert(operation->nblocks_done <= operation->nblocks);
+}
+
+void
+WaitReadBuffers(ReadBuffersOperation *operation)
+{
+	PgAioReturn *aio_ret = &operation->io_return;
+	IOContext	io_context;
+	IOObject	io_object;
+
+	if (operation->persistence == RELPERSISTENCE_TEMP)
+	{
+		io_context = IOCONTEXT_NORMAL;
+		io_object = IOOBJECT_TEMP_RELATION;
+	}
+	else
+	{
+		io_context = IOContextForStrategy(operation->strategy);
+		io_object = IOOBJECT_RELATION;
+	}
+
+	/*
+	 * If we get here without an IO operation having been issued, the
+	 * io_method == IOMETHOD_SYNC path must have been used. Otherwise the
+	 * caller should not have called WaitReadBuffers().
+	 *
+	 * In the case of IOMETHOD_SYNC, we start - as we used to before the
+	 * introducing of AIO - the IO in WaitReadBuffers(). This is done as part
+	 * of the retry logic below, no extra code is required.
+	 *
+	 * This path is expected to eventually go away.
+	 */
+	if (!pgaio_wref_valid(&operation->io_wref) && io_method != IOMETHOD_SYNC)
+		elog(ERROR, "waiting for read operation that didn't read");
+
+	/*
+	 * To handle partial reads, and IOMETHOD_SYNC, we re-issue IO until we're
+	 * done. We may need multiple retries, not just because we could get
+	 * multiple short reads, but also because some of the remaining to-be-read
+	 * buffers may have been read in by other backends, limiting the IO size.
+	 */
+	while (true)
+	{
+		int			nblocks;
+
+		CheckReadBuffersOperation(operation, false);
+
+		/*
+		 * If there is an IO associated with the operation, we may need to
+		 * wait for it. It's possible for there to be no IO if
+		 */
+		if (pgaio_wref_valid(&operation->io_wref))
+		{
+			/*
+			 * Track the time spent waiting for the IO to complete. As
+			 * tracking a wait even if we don't actually need to wait
+			 *
+			 * a) is not cheap, due to the timestamping overhead
+			 *
+			 * b) reports some time as waiting, even if we never waited
+			 *
+			 * we first check if we already know the IO is complete.
+			 */
+			if (aio_ret->result.status == ARS_UNKNOWN &&
+				!pgaio_wref_check_done(&operation->io_wref))
+			{
+				instr_time	io_start = pgstat_prepare_io_time(track_io_timing);
+
+				pgaio_wref_wait(&operation->io_wref);
+
+				/*
+				 * The IO operation itself was already counted earlier, in
+				 * AsyncReadBuffers(), this just accounts for the wait time.
+				 */
+				pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
+										io_start, 0, 0);
+			}
+			else
+			{
+				Assert(pgaio_wref_check_done(&operation->io_wref));
+			}
+
+			/*
+			 * We now are sure the IO completed. Check the results. This
+			 * includes reporting on errors if there were any.
+			 */
+			ProcessReadBuffersResult(operation);
+		}
+
+		/*
+		 * Most of the the time the one IO we started will read in everything.
+		 * But we need to deal with short reads and buffers not needing IO
+		 * anymore.
+		 */
+		if (operation->nblocks_done == operation->nblocks)
+			break;
+
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * This may only complete the IO partially, either because some
+		 * buffers were already valid, or because of a short read.
+		 *
+		 * NB: In contrast to after the AsyncReadBuffers() call in
+		 * StartReadBuffers(), we do *not* reduce
+		 * ReadBuffersOperation->nblocks here, callers expect the full
+		 * operation to be completed at this point (as more operations may
+		 * have been queued).
+		 */
+		AsyncReadBuffers(operation, &nblocks);
+	}
+
+	CheckReadBuffersOperation(operation, true);
+
+	/* NB: READ_DONE tracepoint was already executed in completion callback */
+}
+
+/*
+ * Initiate IO for the ReadBuffersOperation
+ *
+ * This function only starts a single IO at a time. The size of the IO may be
+ * limited to below the to-be-read blocks if one of the buffers has
+ * concurrently been read in. If the first to-be-read buffer is already valid,
+ * no IO will be issued.
+ *
+ * To support retries after short reads, the first operation->nblocks_done is
+ * buffers are skipped.
+ *
+ * On return *nblocks_progres is updated to reflect the number of buffers
+ * affected by the call. If the first buffer is valid, *nblocks_progress is
+ * set to 1 and operation->nblocks_done is incremented.
+ *
+ * Returns true if IO was initiated, false if no IO was necessary.
+ */
+static bool
+AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
+{
+	Buffer	   *buffers = &operation->buffers[0];
+	int			flags = operation->flags;
+	BlockNumber blocknum = operation->blocknum;
+	ForkNumber	forknum = operation->forknum;
+	char		persistence = operation->persistence;
+	int16		nblocks_done = operation->nblocks_done;
+	Buffer	   *io_buffers = &operation->buffers[nblocks_done];
+	int			io_buffers_len = 0;
+	PgAioHandle *ioh;
+	uint32		ioh_flags = 0;
+	void	   *io_pages[MAX_IO_COMBINE_LIMIT];
+	IOContext	io_context;
+	IOObject	io_object;
+	bool		did_start_io;
+
+	/*
+	 * When this IO is executed synchronously, either because the caller will
+	 * immediately block waiting for the IO or because IOMETHOD_SYNC is used,
+	 * the AIO subsystem needs to know.
+	 */
+	if (flags & READ_BUFFERS_SYNCHRONOUSLY)
+		ioh_flags |= PGAIO_HF_SYNCHRONOUS;
+
 	if (persistence == RELPERSISTENCE_TEMP)
 	{
 		io_context = IOCONTEXT_NORMAL;
 		io_object = IOOBJECT_TEMP_RELATION;
+		ioh_flags |= PGAIO_HF_REFERENCES_LOCAL;
 	}
 	else
 	{
@@ -1490,58 +1801,90 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 		io_object = IOOBJECT_RELATION;
 	}
 
-	for (int i = 0; i < nblocks; ++i)
+	/*
+	 * Get IO before ReadBuffersCanStartIO, as pgaio_io_acquire() might block,
+	 * which we don't want after setting IO_IN_PROGRESS.
+	 *
+	 * If IO needs to be submitted before we can get a handle, submit already
+	 * staged IO first, so that other backends don't need to wait. There
+	 * wouldn't be a deadlock risk, as pgaio_io_acquire() just needs to wait
+	 * for already submitted IO, which doesn't require additional locks, but
+	 * it could still cause undesirable waits.
+	 *
+	 * A secondary benefit is that this would allows us to measure the time in
+	 * pgaio_io_acquire() without causing undue timer overhead in the common,
+	 * non-blocking, case.  However, currently the pgstats infrastructure
+	 * doesn't really allow that, as it a) asserts that an operation can't
+	 * have time without operations b) doesn't have an API to report
+	 * "accumulated" time.
+	 */
+	ioh = pgaio_io_acquire_nb(CurrentResourceOwner, &operation->io_return);
+	if (unlikely(!ioh))
+	{
+		pgaio_submit_staged();
+
+		ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return);
+	}
+
+	/*
+	 * Check if we can start IO on the buffer.
+	 *
+	 * If an I/O is already in progress in another backend, we want to wait
+	 * for the outcome: either done, or something went wrong and we will
+	 * retry.
+	 */
+	if (!ReadBuffersCanStartIO(buffers[nblocks_done], false))
+	{
+		/*
+		 * Someone else has already completed this block, we're done.
+		 *
+		 * When IO is necessary, ->nblocks_done is updated in
+		 * ProcessReadBuffersResult(), but that is not called if no IO is
+		 * necessary. Thus update here.
+		 */
+		operation->nblocks_done += 1;
+		*nblocks_progress = 1;
+
+		pgaio_io_release(ioh);
+		pgaio_wref_clear(&operation->io_wref);
+		did_start_io = false;
+
+		/*
+		 * Report and track this as a 'hit' for this backend, even though it
+		 * must have started out as a miss in PinBufferForBlock().
+		 *
+		 * Some of the accesses would otherwise never be counted (e.g.
+		 * pgBufferUsage) or counted as a miss (e.g.
+		 * pgstat_count_buffer_hit(), as we always call
+		 * pgstat_count_buffer_read()).
+		 */
+		TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + operation->nblocks_done,
+										  operation->smgr->smgr_rlocator.locator.spcOid,
+										  operation->smgr->smgr_rlocator.locator.dbOid,
+										  operation->smgr->smgr_rlocator.locator.relNumber,
+										  operation->smgr->smgr_rlocator.backend,
+										  true);
+
+		if (persistence == RELPERSISTENCE_TEMP)
+			pgBufferUsage.local_blks_hit += 1;
+		else
+			pgBufferUsage.shared_blks_hit += 1;
+
+		if (operation->rel)
+			pgstat_count_buffer_hit(operation->rel);
+
+		pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0);
+
+		if (VacuumCostActive)
+			VacuumCostBalance += VacuumCostPageHit;
+	}
+	else
 	{
-		int			io_buffers_len;
-		Buffer		io_buffers[MAX_IO_COMBINE_LIMIT];
-		void	   *io_pages[MAX_IO_COMBINE_LIMIT];
 		instr_time	io_start;
-		BlockNumber io_first_block;
-
-		/*
-		 * Skip this block if someone else has already completed it.  If an
-		 * I/O is already in progress in another backend, this will wait for
-		 * the outcome: either done, or something went wrong and we will
-		 * retry.
-		 */
-		if (!WaitReadBuffersCanStartIO(buffers[i], false))
-		{
-			/*
-			 * Report and track this as a 'hit' for this backend, even though
-			 * it must have started out as a miss in PinBufferForBlock().
-			 *
-			 * Some of the accesses would otherwise never be counted (e.g.
-			 * pgBufferUsage) or counted as a miss (e.g.
-			 * pgstat_count_buffer_hit(), as we always call
-			 * pgstat_count_buffer_read()).
-			 */
-			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + i,
-											  operation->smgr->smgr_rlocator.locator.spcOid,
-											  operation->smgr->smgr_rlocator.locator.dbOid,
-											  operation->smgr->smgr_rlocator.locator.relNumber,
-											  operation->smgr->smgr_rlocator.backend,
-											  true);
-
-			if (persistence == RELPERSISTENCE_TEMP)
-				pgBufferUsage.local_blks_hit += 1;
-			else
-				pgBufferUsage.shared_blks_hit += 1;
-
-			if (operation->rel)
-				pgstat_count_buffer_hit(operation->rel);
-
-			pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0);
-
-			if (VacuumCostActive)
-				VacuumCostBalance += VacuumCostPageHit;
-
-			continue;
-		}
 
 		/* We found a buffer that we need to read in. */
-		io_buffers[0] = buffers[i];
-		io_pages[0] = BufferGetBlock(buffers[i]);
-		io_first_block = blocknum + i;
+		Assert(io_buffers[0] == buffers[nblocks_done]);
+		io_pages[0] = BufferGetBlock(buffers[nblocks_done]);
 		io_buffers_len = 1;
 
 		/*
@@ -1549,85 +1892,70 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 		 * buffers at the same time?  In this case we don't wait if we see an
 		 * I/O already in progress.  We already hold BM_IO_IN_PROGRESS for the
 		 * head block, so we should get on with that I/O as soon as possible.
-		 * We'll come back to this block again, above.
+		 *
+		 * We'll come back to this block in the next call to
+		 * StartReadBuffers() -> AsyncReadBuffers().
 		 */
-		while ((i + 1) < nblocks &&
-			   WaitReadBuffersCanStartIO(buffers[i + 1], true))
+		for (int i = nblocks_done + 1; i < operation->nblocks; i++)
 		{
+			if (!ReadBuffersCanStartIO(buffers[i], true))
+				break;
 			/* Must be consecutive block numbers. */
-			Assert(BufferGetBlockNumber(buffers[i + 1]) ==
-				   BufferGetBlockNumber(buffers[i]) + 1);
+			Assert(BufferGetBlockNumber(buffers[i - 1]) ==
+				   BufferGetBlockNumber(buffers[i]) - 1);
+			Assert(io_buffers[io_buffers_len] == buffers[i]);
 
-			io_buffers[io_buffers_len] = buffers[++i];
 			io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
 		}
 
+		/* get a reference to wait for in WaitReadBuffers() */
+		pgaio_io_get_wref(ioh, &operation->io_wref);
+
+		/* provide the list of buffers to the completion callbacks */
+		pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len);
+
+		pgaio_io_register_callbacks(ioh,
+									persistence == RELPERSISTENCE_TEMP ?
+									PGAIO_HCB_LOCAL_BUFFER_READV :
+									PGAIO_HCB_SHARED_BUFFER_READV,
+									flags);
+
+		pgaio_io_set_flag(ioh, ioh_flags);
+
+		/* ---
+		 * Even though we're trying to issue IO asynchronously, track the time
+		 * in smgrstartreadv():
+		 * - if io_method == IOMETHOD_SYNC, we will always perform the IO
+		 *   immediately
+		 * - the io method might not support the IO (e.g. worker IO for a temp
+		 *   table)
+		 * ---
+		 */
 		io_start = pgstat_prepare_io_time(track_io_timing);
-		smgrreadv(operation->smgr, forknum, io_first_block, io_pages, io_buffers_len);
-		pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start,
-								1, io_buffers_len * BLCKSZ);
-
-		/* Verify each block we read, and terminate the I/O. */
-		for (int j = 0; j < io_buffers_len; ++j)
-		{
-			BufferDesc *bufHdr;
-			Block		bufBlock;
-
-			if (persistence == RELPERSISTENCE_TEMP)
-			{
-				bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1);
-				bufBlock = LocalBufHdrGetBlock(bufHdr);
-			}
-			else
-			{
-				bufHdr = GetBufferDescriptor(io_buffers[j] - 1);
-				bufBlock = BufHdrGetBlock(bufHdr);
-			}
-
-			/* check for garbage data */
-			if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j,
-										PIV_LOG_WARNING | PIV_REPORT_STAT))
-			{
-				if ((operation->flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages)
-				{
-					ereport(WARNING,
-							(errcode(ERRCODE_DATA_CORRUPTED),
-							 errmsg("invalid page in block %u of relation %s; zeroing out page",
-									io_first_block + j,
-									relpath(operation->smgr->smgr_rlocator, forknum).str)));
-					memset(bufBlock, 0, BLCKSZ);
-				}
-				else
-					ereport(ERROR,
-							(errcode(ERRCODE_DATA_CORRUPTED),
-							 errmsg("invalid page in block %u of relation %s",
-									io_first_block + j,
-									relpath(operation->smgr->smgr_rlocator, forknum).str)));
-			}
-
-			/* Set BM_VALID, terminate IO, and wake up any waiters */
-			if (persistence == RELPERSISTENCE_TEMP)
-				TerminateLocalBufferIO(bufHdr, false, BM_VALID, true);
-			else
-				TerminateBufferIO(bufHdr, false, BM_VALID, true, true);
-
-			/* Report I/Os as completing individually. */
-			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,
-											  operation->smgr->smgr_rlocator.locator.spcOid,
-											  operation->smgr->smgr_rlocator.locator.dbOid,
-											  operation->smgr->smgr_rlocator.locator.relNumber,
-											  operation->smgr->smgr_rlocator.backend,
-											  false);
-		}
+		smgrstartreadv(ioh, operation->smgr, forknum,
+					   blocknum + nblocks_done,
+					   io_pages, io_buffers_len);
+		pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
+								io_start, 1, *nblocks_progress * BLCKSZ);
 
 		if (persistence == RELPERSISTENCE_TEMP)
 			pgBufferUsage.local_blks_read += io_buffers_len;
 		else
 			pgBufferUsage.shared_blks_read += io_buffers_len;
 
+		/*
+		 * Track vacuum cost when issuing IO, not after waiting for it.
+		 * Otherwise we could end up issuing a lot of IO in a short timespan,
+		 * despite a low cost limit.
+		 */
 		if (VacuumCostActive)
 			VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
+
+		*nblocks_progress = io_buffers_len;
+		did_start_io = true;
 	}
+
+	return did_start_io;
 }
 
 /*
-- 
2.48.1.76.g4e746b1a31.dirty

