From b153f4c8c7cf10171dd7390920ef38e079be1c87 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Wed, 22 Jan 2025 13:44:45 -0500
Subject: [PATCH v2.3 21/30] bufmgr: Use aio for StartReadBuffers()

Author:
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/include/storage/bufmgr.h        |  25 +-
 src/backend/storage/buffer/bufmgr.c | 377 ++++++++++++++++++++--------
 2 files changed, 298 insertions(+), 104 deletions(-)

diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 5cff4e223f9..46ee957e99c 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -15,6 +15,7 @@
 #define BUFMGR_H
 
 #include "port/pg_iovec.h"
+#include "storage/aio_types.h"
 #include "storage/block.h"
 #include "storage/buf.h"
 #include "storage/bufpage.h"
@@ -107,10 +108,18 @@ typedef struct BufferManagerRelation
 #define BMR_REL(p_rel) ((BufferManagerRelation){.rel = p_rel})
 #define BMR_SMGR(p_smgr, p_relpersistence) ((BufferManagerRelation){.smgr = p_smgr, .relpersistence = p_relpersistence})
 
+
+#define MAX_IO_COMBINE_LIMIT PG_IOV_MAX
+#define DEFAULT_IO_COMBINE_LIMIT Min(MAX_IO_COMBINE_LIMIT, (128 * 1024) / BLCKSZ)
+
+
 /* Zero out page if reading fails. */
 #define READ_BUFFERS_ZERO_ON_ERROR (1 << 0)
 /* Call smgrprefetch() if I/O necessary. */
 #define READ_BUFFERS_ISSUE_ADVICE (1 << 1)
+/* The IO will be waited for immediately. */
+#define READ_BUFFERS_SYNCHRONOUSLY (1 << 2)
+
 
 struct ReadBuffersOperation
 {
@@ -131,6 +140,20 @@ struct ReadBuffersOperation
 	int			flags;
 	int16		nblocks;
 	int16		io_buffers_len;
+
+	/*
+	 * In some rare-ish cases one operation causes multiple reads (e.g. if a
+	 * buffer was concurrently read by another backend). It'd be much better
+	 * if we ensured that each ReadBuffersOperation covered only one IO - but
+	 * that's not entirely trivial, due to having pinned victim buffers before
+	 * starting IOs.
+	 *
+	 * TODO: Change the API of StartReadBuffers() to ensure we only ever need
+	 * one IO.
+	 */
+	int16		nios;
+	PgAioWaitRef wrefs[MAX_IO_COMBINE_LIMIT];
+	PgAioReturn returns[MAX_IO_COMBINE_LIMIT];
 };
 
 typedef struct ReadBuffersOperation ReadBuffersOperation;
@@ -161,8 +184,6 @@ extern PGDLLIMPORT bool track_io_timing;
 extern PGDLLIMPORT int effective_io_concurrency;
 extern PGDLLIMPORT int maintenance_io_concurrency;
 
-#define MAX_IO_COMBINE_LIMIT PG_IOV_MAX
-#define DEFAULT_IO_COMBINE_LIMIT Min(MAX_IO_COMBINE_LIMIT, (128 * 1024) / BLCKSZ)
 extern PGDLLIMPORT int io_combine_limit;
 
 extern PGDLLIMPORT int checkpoint_flush_after;
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index fe871691350..70f1da84083 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1235,10 +1235,9 @@ ReadBuffer_common(Relation rel, SMgrRelation smgr, char smgr_persistence,
 		return buffer;
 	}
 
+	flags = READ_BUFFERS_SYNCHRONOUSLY;
 	if (mode == RBM_ZERO_ON_ERROR)
-		flags = READ_BUFFERS_ZERO_ON_ERROR;
-	else
-		flags = 0;
+		flags |= READ_BUFFERS_ZERO_ON_ERROR;
 	operation.smgr = smgr;
 	operation.rel = rel;
 	operation.persistence = persistence;
@@ -1253,6 +1252,9 @@ ReadBuffer_common(Relation rel, SMgrRelation smgr, char smgr_persistence,
 	return buffer;
 }
 
+static bool AsyncReadBuffers(ReadBuffersOperation *operation,
+							 int nblocks);
+
 static pg_attribute_always_inline bool
 StartReadBuffersImpl(ReadBuffersOperation *operation,
 					 Buffer *buffers,
@@ -1288,6 +1290,11 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 			 * so we stop here.
 			 */
 			actual_nblocks = i + 1;
+
+			ereport(DEBUG3,
+					errmsg("found buf at idx %i: %s",
+						   i, DebugPrintBufferRefcount(buffers[i])),
+					errhidestmt(true), errhidecontext(true));
 			break;
 		}
 		else
@@ -1324,28 +1331,51 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 	operation->flags = flags;
 	operation->nblocks = actual_nblocks;
 	operation->io_buffers_len = io_buffers_len;
+	operation->nios = 0;
 
-	if (flags & READ_BUFFERS_ISSUE_ADVICE)
+	/*
+	 * When using AIO, start the IO in the background. If not, issue prefetch
+	 * requests if desired by the caller.
+	 *
+	 * The reason we have a dedicated path for IOMETHOD_SYNC here is to derisk
+	 * the introduction of AIO somewhat. It's a large architectural change,
+	 * with lots of chances for unanticipated performance effects.  Use of
+	 * IOMETHOD_SYNC already leads to not actually performing IO
+	 * asynchronously, but without the check here we'd execute IO earlier than
+	 * we used to.
+	 */
+	if (io_method != IOMETHOD_SYNC)
 	{
-		/*
-		 * In theory we should only do this if PinBufferForBlock() had to
-		 * allocate new buffers above.  That way, if two calls to
-		 * StartReadBuffers() were made for the same blocks before
-		 * WaitReadBuffers(), only the first would issue the advice. That'd be
-		 * a better simulation of true asynchronous I/O, which would only
-		 * start the I/O once, but isn't done here for simplicity.  Note also
-		 * that the following call might actually issue two advice calls if we
-		 * cross a segment boundary; in a true asynchronous version we might
-		 * choose to process only one real I/O at a time in that case.
-		 */
-		smgrprefetch(operation->smgr,
-					 operation->forknum,
-					 blockNum,
-					 operation->io_buffers_len);
+		/* initiate the IO asynchronously */
+		return AsyncReadBuffers(operation, io_buffers_len);
 	}
+	else
+	{
+		operation->flags |= READ_BUFFERS_SYNCHRONOUSLY;
+
+		if (flags & READ_BUFFERS_ISSUE_ADVICE)
+		{
+			/*
+			 * In theory we should only do this if PinBufferForBlock() had to
+			 * allocate new buffers above.  That way, if two calls to
+			 * StartReadBuffers() were made for the same blocks before
+			 * WaitReadBuffers(), only the first would issue the advice.
+			 * That'd be a better simulation of true asynchronous I/O, which
+			 * would only start the I/O once, but isn't done here for
+			 * simplicity.  Note also that the following call might actually
+			 * issue two advice calls if we cross a segment boundary; in a
+			 * true asynchronous version we might choose to process only one
+			 * real I/O at a time in that case.
+			 */
+			smgrprefetch(operation->smgr,
+						 operation->forknum,
+						 blockNum,
+						 operation->io_buffers_len);
+		}
 
-	/* Indicate that WaitReadBuffers() should be called. */
-	return true;
+		/* Indicate that WaitReadBuffers() should be called. */
+		return true;
+	}
 }
 
 /*
@@ -1397,12 +1427,31 @@ StartReadBuffer(ReadBuffersOperation *operation,
 }
 
 static inline bool
-WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
+ReadBuffersCanStartIO(Buffer buffer, bool nowait)
 {
 	if (BufferIsLocal(buffer))
 	{
 		BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1);
 
+		/*
+		 * The buffer could have IO in progress by another scan. Right now
+		 * localbuf.c doesn't use IO_IN_PROGRESS, which is why we need this
+		 * hack.
+		 *
+		 * TODO: localbuf.c should use IO_IN_PROGRESS / have an equivalent of
+		 * StartBufferIO().
+		 */
+		if (pgaio_wref_valid(&bufHdr->io_wref))
+		{
+			PgAioWaitRef iow = bufHdr->io_wref;
+
+			ereport(DEBUG3,
+					errmsg("waiting for temp buffer IO in CSIO"),
+					errhidestmt(true), errhidecontext(true));
+			pgaio_wref_wait(&iow);
+			return false;
+		}
+
 		return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0;
 	}
 	else
@@ -1412,13 +1461,38 @@ WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
 void
 WaitReadBuffers(ReadBuffersOperation *operation)
 {
-	Buffer	   *buffers;
+	IOContext	io_context;
+	IOObject	io_object;
 	int			nblocks;
-	BlockNumber blocknum;
-	ForkNumber	forknum;
-	IOContext	io_context;
-	IOObject	io_object;
-	char		persistence;
+	bool		have_retryable_failure;
+
+	/*
+	 * If we get here without any IO operations having been issued, the
+	 * io_method == IOMETHOD_SYNC path must have been used. In that case, we
+	 * start - as we used to before - the IO now, just before waiting.
+	 */
+	if (operation->nios == 0)
+	{
+		Assert(io_method == IOMETHOD_SYNC);
+		if (!AsyncReadBuffers(operation, operation->io_buffers_len))
+		{
+			/* all blocks were already read in concurrently */
+			return;
+		}
+	}
+
+	if (operation->persistence == RELPERSISTENCE_TEMP)
+	{
+		io_context = IOCONTEXT_NORMAL;
+		io_object = IOOBJECT_TEMP_RELATION;
+	}
+	else
+	{
+		io_context = IOContextForStrategy(operation->strategy);
+		io_object = IOOBJECT_RELATION;
+	}
+
+restart:
 
 	/*
 	 * Currently operations are only allowed to include a read of some range,
@@ -1433,15 +1507,101 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 	if (nblocks == 0)
 		return;					/* nothing to do */
 
-	buffers = &operation->buffers[0];
-	blocknum = operation->blocknum;
-	forknum = operation->forknum;
-	persistence = operation->persistence;
+	Assert(operation->nios > 0);
 
+	/*
+	 * For IO timing we just count the time spent waiting for the IO.
+	 *
+	 * XXX: We probably should track the IO operation, rather than its time,
+	 * separately, when initiating the IO. But right now that's not quite
+	 * allowed by the interface.
+	 */
+	have_retryable_failure = false;
+	for (int i = 0; i < operation->nios; i++)
+	{
+		PgAioReturn *aio_ret = &operation->returns[i];
+
+		/*
+		 * Tracking a wait even if we don't actually need to wait a) is not
+		 * cheap and b) reports time as waiting even though we never waited.
+		 */
+		if (aio_ret->result.status == ARS_UNKNOWN &&
+			!pgaio_wref_check_done(&operation->wrefs[i]))
+		{
+			instr_time	io_start = pgstat_prepare_io_time(track_io_timing);
+
+			pgaio_wref_wait(&operation->wrefs[i]);
+
+			/*
+			 * The IO operation itself was already counted earlier, in
+			 * AsyncReadBuffers().
+			 */
+			pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
+									io_start, 0, 0);
+		}
+		else
+		{
+			Assert(pgaio_wref_check_done(&operation->wrefs[i]));
+		}
+
+		if (aio_ret->result.status == ARS_PARTIAL)
+		{
+			/*
+			 * We'll retry below, so we just emit a debug message to the
+			 * server log (or not even that in prod scenarios).
+			 */
+			pgaio_result_report(aio_ret->result, &aio_ret->target_data, DEBUG1);
+			have_retryable_failure = true;
+		}
+		else if (aio_ret->result.status != ARS_OK)
+			pgaio_result_report(aio_ret->result, &aio_ret->target_data, ERROR);
+	}
+
+	/*
+	 * If any of the associated IOs failed, try again to issue IOs. Buffers
+	 * for which IO has completed successfully will be discovered as such and
+	 * not retried.
+	 */
+	if (have_retryable_failure)
+	{
+		nblocks = operation->io_buffers_len;
+
+		elog(DEBUG3, "retrying IO after partial failure");
+		CHECK_FOR_INTERRUPTS();
+		AsyncReadBuffers(operation, nblocks);
+		goto restart;
+	}
+
+	if (VacuumCostActive)
+		VacuumCostBalance += VacuumCostPageMiss * nblocks;
+
+	/* FIXME: READ_DONE tracepoint */
+}
+
+static bool
+AsyncReadBuffers(ReadBuffersOperation *operation,
+				 int nblocks)
+{
+	int			io_buffers_len = 0;
+	Buffer	   *buffers = &operation->buffers[0];
+	int			flags = operation->flags;
+	BlockNumber blocknum = operation->blocknum;
+	ForkNumber	forknum = operation->forknum;
+	IOContext	io_context;
+	IOObject	io_object;
+	char		persistence;
+	bool		did_start_io_overall = false;
+	PgAioHandle *ioh = NULL;
+	uint32		ioh_flags = 0;
+
+	persistence = operation->rel
+		? operation->rel->rd_rel->relpersistence
+		: RELPERSISTENCE_PERMANENT;
 	if (persistence == RELPERSISTENCE_TEMP)
 	{
 		io_context = IOCONTEXT_NORMAL;
 		io_object = IOOBJECT_TEMP_RELATION;
+		ioh_flags |= PGAIO_HF_REFERENCES_LOCAL;
 	}
 	else
 	{
@@ -1449,6 +1609,16 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 		io_object = IOOBJECT_RELATION;
 	}
 
+	/*
+	 * When this IO is executed synchronously, either because the caller will
+	 * immediately block waiting for the IO or because IOMETHOD_SYNC is used,
+	 * the AIO subsystem needs to know.
+	 */
+	if (flags & READ_BUFFERS_SYNCHRONOUSLY)
+		ioh_flags |= PGAIO_HF_SYNCHRONOUS;
+
+	operation->nios = 0;
+
 	/*
 	 * We count all these blocks as read by this backend.  This is traditional
 	 * behavior, but might turn out to be not true if we find that someone
@@ -1464,19 +1634,39 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 
 	for (int i = 0; i < nblocks; ++i)
 	{
-		int			io_buffers_len;
-		Buffer		io_buffers[MAX_IO_COMBINE_LIMIT];
 		void	   *io_pages[MAX_IO_COMBINE_LIMIT];
-		instr_time	io_start;
+		Buffer		io_buffers[MAX_IO_COMBINE_LIMIT];
 		BlockNumber io_first_block;
+		bool		did_start_io_this = false;
 
 		/*
-		 * Skip this block if someone else has already completed it.  If an
-		 * I/O is already in progress in another backend, this will wait for
-		 * the outcome: either done, or something went wrong and we will
-		 * retry.
+		 * Get IO before ReadBuffersCanStartIO, as pgaio_io_acquire() might
+		 * block, which we don't want after setting IO_IN_PROGRESS.
+		 *
+		 * XXX: Should we attribute the time spent in here to the IO? If there
+		 * already are a lot of IO operations in progress, getting an IO
+		 * handle will block waiting for some other IO operation to finish.
+		 *
+		 * In most cases it'll be free to get the IO, so a timer would be
+		 * overhead. Perhaps we should use pgaio_io_acquire_nb() and only
+		 * account IO time when pgaio_io_acquire_nb() returned false?
 		 */
-		if (!WaitReadBuffersCanStartIO(buffers[i], false))
+		if (likely(!ioh))
+			ioh = pgaio_io_acquire(CurrentResourceOwner,
+								   &operation->returns[operation->nios]);
+
+		/*
+		 * Skip this block if someone else has already completed it.
+		 *
+		 * If an I/O is already in progress in another backend, this will wait
+		 * for the outcome: either done, or something went wrong and we will
+		 * retry. But don't wait if we have staged, but haven't issued,
+		 * another IO.
+		 *
+		 * XXX: If we can't start IO due to unsubmitted IO, it might be worth
+		 * submitting and then trying to start IO again.
+		 */
+		if (!ReadBuffersCanStartIO(buffers[i], did_start_io_overall))
 		{
 			/*
 			 * Report this as a 'hit' for this backend, even though it must
@@ -1488,6 +1678,11 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 											  operation->smgr->smgr_rlocator.locator.relNumber,
 											  operation->smgr->smgr_rlocator.backend,
 											  true);
+
+			ereport(DEBUG3,
+					errmsg("can't start io for first buffer %u: %s",
+						   buffers[i], DebugPrintBufferRefcount(buffers[i])),
+					errhidestmt(true), errhidecontext(true));
 			continue;
 		}
 
@@ -1497,6 +1692,11 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 		io_first_block = blocknum + i;
 		io_buffers_len = 1;
 
+		ereport(DEBUG5,
+				errmsg("first prepped for io: %s, offset %d",
+					   DebugPrintBufferRefcount(io_buffers[0]), i),
+				errhidestmt(true), errhidecontext(true));
+
 		/*
 		 * How many neighboring-on-disk blocks can we scatter-read into other
 		 * buffers at the same time?  In this case we don't wait if we see an
@@ -1505,85 +1705,58 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 		 * We'll come back to this block again, above.
 		 */
 		while ((i + 1) < nblocks &&
-			   WaitReadBuffersCanStartIO(buffers[i + 1], true))
+			   ReadBuffersCanStartIO(buffers[i + 1], true))
 		{
 			/* Must be consecutive block numbers. */
 			Assert(BufferGetBlockNumber(buffers[i + 1]) ==
 				   BufferGetBlockNumber(buffers[i]) + 1);
 
+			ereport(DEBUG5,
+					errmsg("seq prepped for io: %s, offset %d",
+						   DebugPrintBufferRefcount(buffers[i + 1]),
+						   i + 1),
+					errhidestmt(true), errhidecontext(true));
+
 			io_buffers[io_buffers_len] = buffers[++i];
 			io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
 		}
 
-		io_start = pgstat_prepare_io_time(track_io_timing);
-		smgrreadv(operation->smgr, forknum, io_first_block, io_pages, io_buffers_len);
-		pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start,
-								1, io_buffers_len * BLCKSZ);
+		pgaio_io_get_wref(ioh, &operation->wrefs[operation->nios]);
 
-		/* Verify each block we read, and terminate the I/O. */
-		for (int j = 0; j < io_buffers_len; ++j)
-		{
-			BufferDesc *bufHdr;
-			Block		bufBlock;
+		pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len);
 
-			if (persistence == RELPERSISTENCE_TEMP)
-			{
-				bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1);
-				bufBlock = LocalBufHdrGetBlock(bufHdr);
-			}
-			else
-			{
-				bufHdr = GetBufferDescriptor(io_buffers[j] - 1);
-				bufBlock = BufHdrGetBlock(bufHdr);
-			}
 
-			/* check for garbage data */
-			if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j,
-										PIV_LOG_WARNING | PIV_REPORT_STAT))
-			{
-				if ((operation->flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages)
-				{
-					ereport(WARNING,
-							(errcode(ERRCODE_DATA_CORRUPTED),
-							 errmsg("invalid page in block %u of relation %s; zeroing out page",
-									io_first_block + j,
-									relpath(operation->smgr->smgr_rlocator, forknum))));
-					memset(bufBlock, 0, BLCKSZ);
-				}
-				else
-					ereport(ERROR,
-							(errcode(ERRCODE_DATA_CORRUPTED),
-							 errmsg("invalid page in block %u of relation %s",
-									io_first_block + j,
-									relpath(operation->smgr->smgr_rlocator, forknum))));
-			}
+		if (persistence == RELPERSISTENCE_TEMP)
+			pgaio_io_register_callbacks(ioh, PGAIO_HCB_LOCAL_BUFFER_READV);
+		else
+			pgaio_io_register_callbacks(ioh, PGAIO_HCB_SHARED_BUFFER_READV);
 
-			/* Terminate I/O and set BM_VALID. */
-			if (persistence == RELPERSISTENCE_TEMP)
-			{
-				uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
+		pgaio_io_set_flag(ioh, ioh_flags);
 
-				buf_state |= BM_VALID;
-				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
-			}
-			else
-			{
-				/* Set BM_VALID, terminate IO, and wake up any waiters */
-				TerminateBufferIO(bufHdr, false, BM_VALID, true, true);
-			}
+		did_start_io_overall = did_start_io_this = true;
+		smgrstartreadv(ioh, operation->smgr, forknum, io_first_block,
+					   io_pages, io_buffers_len);
+		ioh = NULL;
+		operation->nios++;
 
-			/* Report I/Os as completing individually. */
-			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,
-											  operation->smgr->smgr_rlocator.locator.spcOid,
-											  operation->smgr->smgr_rlocator.locator.dbOid,
-											  operation->smgr->smgr_rlocator.locator.relNumber,
-											  operation->smgr->smgr_rlocator.backend,
-											  false);
-		}
+		/* not obvious what we'd use for time */
+		pgstat_count_io_op(io_object, io_context, IOOP_READ,
+						   1, io_buffers_len * BLCKSZ);
+	}
+
+	if (ioh)
+	{
+		pgaio_io_release(ioh);
+		ioh = NULL;
+	}
 
-		if (VacuumCostActive)
-			VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
+	if (did_start_io_overall)
+	{
+		pgaio_submit_staged();
+		return true;
 	}
+	else
+		return false;
 }
 
 /*
-- 
2.48.1.76.g4e746b1a31.dirty

