From bfd939b88a8dcdbc424c1e7452d70195a46910ae Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sat, 31 Aug 2024 21:55:59 -0400
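Subject: [PATCH v2.1 15/20] bufmgr: Use aio for StartReadBuffers()

StartReadBuffers() now starts the reads itself through the new
AsyncReadBuffers(), which stages one AIO per run of consecutive buffers
that need reading and then submits them.  Previously it only issued
smgrprefetch() advice and left the synchronous smgrreadv() calls to
WaitReadBuffers().  WaitReadBuffers() now only waits for the started IOs
and raises an error for failed ones.  The per-block verification loop is
removed from bufmgr.c.  Completion is handled via the
ASC_SHARED_BUFFER_READ / ASC_LOCAL_BUFFER_READ IO callbacks.
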
Author:
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/include/storage/bufmgr.h        |  25 ++-
 src/backend/storage/buffer/bufmgr.c | 259 +++++++++++++++++-----------
 2 files changed, 182 insertions(+), 102 deletions(-)

diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 6cd64b8c2b3..a075a40b2ed 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -15,6 +15,7 @@
 #define BUFMGR_H
 
 #include "port/pg_iovec.h"
+#include "storage/aio_ref.h"
 #include "storage/block.h"
 #include "storage/buf.h"
 #include "storage/bufpage.h"
@@ -107,11 +108,22 @@ typedef struct BufferManagerRelation
 #define BMR_REL(p_rel) ((BufferManagerRelation){.rel = p_rel})
 #define BMR_SMGR(p_smgr, p_relpersistence) ((BufferManagerRelation){.smgr = p_smgr, .relpersistence = p_relpersistence})
 
+
+#define MAX_IO_COMBINE_LIMIT PG_IOV_MAX
+#define DEFAULT_IO_COMBINE_LIMIT Min(MAX_IO_COMBINE_LIMIT, (128 * 1024) / BLCKSZ)
+
+
 /* Zero out page if reading fails. */
 #define READ_BUFFERS_ZERO_ON_ERROR (1 << 0)
 /* Call smgrprefetch() if I/O necessary. */
 #define READ_BUFFERS_ISSUE_ADVICE (1 << 1)
 
+/*
+ * FIXME: PgAioReturn is defined in aio.h. It'd be much better if we didn't
+ * need to include that here.  Perhaps this could live in a separate header?
+ */
+#include "storage/aio.h"
+
 struct ReadBuffersOperation
 {
 	/* The following members should be set by the caller. */
@@ -131,6 +143,17 @@ struct ReadBuffersOperation
 	int			flags;
 	int16		nblocks;
 	int16		io_buffers_len;
+
+	/*
+	 * In some rare-ish cases one operation causes multiple reads (e.g. if a
+	 * buffer was concurrently read by another backend). IO i is tracked by
+	 * refs[i] and its result lands in returns[i], for i < nios.  One IO per
+	 * operation would be better, but that's not entirely trivial, due to
+	 * having pinned victim buffers before starting IOs.
+	 */
+	int16		nios;
+	PgAioHandleRef refs[MAX_IO_COMBINE_LIMIT];
+	PgAioReturn returns[MAX_IO_COMBINE_LIMIT];
 };
 
 typedef struct ReadBuffersOperation ReadBuffersOperation;
@@ -161,8 +184,6 @@ extern PGDLLIMPORT bool track_io_timing;
 extern PGDLLIMPORT int effective_io_concurrency;
 extern PGDLLIMPORT int maintenance_io_concurrency;
 
-#define MAX_IO_COMBINE_LIMIT PG_IOV_MAX
-#define DEFAULT_IO_COMBINE_LIMIT Min(MAX_IO_COMBINE_LIMIT, (128 * 1024) / BLCKSZ)
 extern PGDLLIMPORT int io_combine_limit;
 
 extern PGDLLIMPORT int checkpoint_flush_after;
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 976ced82b6a..4914c71d41e 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1253,6 +1253,12 @@ ReadBuffer_common(Relation rel, SMgrRelation smgr, char smgr_persistence,
 	return buffer;
 }
 
+static bool AsyncReadBuffers(ReadBuffersOperation *operation,
+							 Buffer *buffers,
+							 BlockNumber blockNum,
+							 int *nblocks,
+							 int flags);
+
 static pg_attribute_always_inline bool
 StartReadBuffersImpl(ReadBuffersOperation *operation,
 					 Buffer *buffers,
@@ -1288,6 +1294,12 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 			 * so we stop here.
 			 */
 			actual_nblocks = i + 1;
+
+			ereport(DEBUG3,
+					errmsg("found buf %d, idx %i: %s, data %p",
+						   buffers[i], i, DebugPrintBufferRefcount(buffers[i]),
+						   BufferGetBlock(buffers[i])),
+					errhidestmt(true), errhidecontext(true));
 			break;
 		}
 		else
@@ -1325,27 +1337,18 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 	operation->nblocks = actual_nblocks;
 	operation->io_buffers_len = io_buffers_len;
 
-	if (flags & READ_BUFFERS_ISSUE_ADVICE)
-	{
-		/*
-		 * In theory we should only do this if PinBufferForBlock() had to
-		 * allocate new buffers above.  That way, if two calls to
-		 * StartReadBuffers() were made for the same blocks before
-		 * WaitReadBuffers(), only the first would issue the advice. That'd be
-		 * a better simulation of true asynchronous I/O, which would only
-		 * start the I/O once, but isn't done here for simplicity.  Note also
-		 * that the following call might actually issue two advice calls if we
-		 * cross a segment boundary; in a true asynchronous version we might
-		 * choose to process only one real I/O at a time in that case.
-		 */
-		smgrprefetch(operation->smgr,
-					 operation->forknum,
-					 blockNum,
-					 operation->io_buffers_len);
-	}
+	operation->nios = 0;
 
-	/* Indicate that WaitReadBuffers() should be called. */
-	return true;
+	/*
+	 * TODO: When called for synchronous IO execution, we probably should
+	 * enter a dedicated fastpath here.
+	 */
+
+	/* Start the IO(s); returns false if no IO turned out to be needed. */
+	return AsyncReadBuffers(operation,
+							buffers,
+							blockNum,
+							nblocks, flags);
 }
 
 /*
@@ -1397,12 +1400,34 @@ StartReadBuffer(ReadBuffersOperation *operation,
 }
 
 static inline bool
-WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
+ReadBuffersCanStartIO(Buffer buffer, bool nowait)
 {
 	if (BufferIsLocal(buffer))
 	{
 		BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1);
 
+		/*
+		 * The buffer could have IO in progress by another scan. Right now
+		 * localbuf.c doesn't use IO_IN_PROGRESS, which is why we need this
+		 * hack.
+		 *
+		 * After waiting, fall through to the BM_VALID check instead of
+		 * reporting the buffer as done: if that IO failed, the buffer is
+		 * still not valid and has to be read again rather than skipped.
+		 *
+		 * AFIXME: localbuf.c should use IO_IN_PROGRESS / have an equivalent
+		 * of StartBufferIO().
+		 */
+		if (pgaio_io_ref_valid(&bufHdr->io_in_progress))
+		{
+			PgAioHandleRef ior = bufHdr->io_in_progress;
+
+			ereport(DEBUG3,
+					errmsg("waiting for temp buffer IO in CSIO"),
+					errhidestmt(true), errhidecontext(true));
+			pgaio_io_ref_wait(&ior);
+		}
+
 		return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0;
 	}
 	else
@@ -1412,12 +1434,6 @@ WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
 void
 WaitReadBuffers(ReadBuffersOperation *operation)
 {
-	Buffer	   *buffers;
 	int			nblocks;
-	BlockNumber blocknum;
-	ForkNumber	forknum;
-	IOContext	io_context;
-	IOObject	io_object;
-	char		persistence;
 
 	/*
@@ -1433,11 +1449,65 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 	if (nblocks == 0)
 		return;					/* nothing to do */
 
+	Assert(operation->nios > 0);
+
+	for (int i = 0; i < operation->nios; i++)
+	{
+		PgAioReturn *aio_ret;
+
+		pgaio_io_ref_wait(&operation->refs[i]);
+
+		aio_ret = &operation->returns[i];
+
+		if (aio_ret->result.status != ARS_OK)
+			pgaio_result_log(aio_ret->result, &aio_ret->subject_data, ERROR);
+	}
+
+	/*
+	 * AsyncReadBuffers() already counted these blocks in pgBufferUsage when
+	 * the reads were started, so only the vacuum cost is charged here.
+	 */
+	if (VacuumCostActive)
+		VacuumCostBalance += VacuumCostPageMiss * nblocks;
+
+	/* FIXME: io timing */
+	/* FIXME: READ_DONE tracepoint */
+}
+
+/*
+ * AsyncReadBuffers -- start the reads needed by a ReadBuffersOperation
+ *
+ * For each buffer that still needs reading, combines it with following
+ * readable neighbors into one IO, stages that IO and remembers a reference
+ * to it in operation->refs[operation->nios++].  WaitReadBuffers() later
+ * waits for those IOs.  Returns true if at least one IO was started, false
+ * if none of the buffers needed to be read after all.
+ *
+ * NOTE(review): 'flags' is not consulted here, and READ_BUFFERS_ZERO_ON_ERROR
+ * is no longer checked in this file's read path; confirm that the read
+ * completion callbacks honor it.
+ */
+static bool
+AsyncReadBuffers(ReadBuffersOperation *operation,
+				 Buffer *buffers,
+				 BlockNumber blockNum,
+				 int *nblocks,
+				 int flags)
+{
+	int			io_buffers_len = 0;
+	BlockNumber blocknum;
+	ForkNumber	forknum;
+	IOContext	io_context;
+	IOObject	io_object;
+	char		persistence;
+	bool		did_start_io_overall = false;
+	PgAioHandle *ioh = NULL;
+
 	buffers = &operation->buffers[0];
 	blocknum = operation->blocknum;
 	forknum = operation->forknum;
 	persistence = operation->persistence;
 
 	if (persistence == RELPERSISTENCE_TEMP)
 	{
 		io_context = IOCONTEXT_NORMAL;
@@ -1458,25 +1529,43 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 	 * but another backend completed the read".
 	 */
 	if (persistence == RELPERSISTENCE_TEMP)
-		pgBufferUsage.local_blks_read += nblocks;
+		pgBufferUsage.local_blks_read += operation->io_buffers_len;
 	else
-		pgBufferUsage.shared_blks_read += nblocks;
+		pgBufferUsage.shared_blks_read += operation->io_buffers_len;
 
-	for (int i = 0; i < nblocks; ++i)
+	/*
+	 * *nblocks may include one already-valid buffer after the readable range
+	 * (the hit that ended pinning in StartReadBuffersImpl()); only the first
+	 * io_buffers_len buffers can need a read.
+	 */
+	for (int i = 0; i < operation->io_buffers_len; ++i)
 	{
-		int			io_buffers_len;
-		Buffer		io_buffers[MAX_IO_COMBINE_LIMIT];
 		void	   *io_pages[MAX_IO_COMBINE_LIMIT];
-		instr_time	io_start;
+		Buffer		io_buffers[MAX_IO_COMBINE_LIMIT];
 		BlockNumber io_first_block;
+		bool		did_start_io_this = false;
+
+		/*
+		 * Get IO before ReadBuffersCanStartIO, as pgaio_io_get() might block,
+		 * which we don't want after setting IO_IN_PROGRESS.
+		 */
+		if (likely(!ioh))
+			ioh = pgaio_io_get(CurrentResourceOwner, &operation->returns[operation->nios]);
 
 		/*
 		 * Skip this block if someone else has already completed it.  If an
 		 * I/O is already in progress in another backend, this will wait for
 		 * the outcome: either done, or something went wrong and we will
 		 * retry.
+		 *
+		 * Always wait here: skipping a buffer whose read is still in progress
+		 * elsewhere would let WaitReadBuffers() return before it is valid.
+		 * Submit our own staged IOs first, so we never block on others while
+		 * they may be waiting for IOs we have not submitted yet (deadlock).
 		 */
-		if (!WaitReadBuffersCanStartIO(buffers[i], false))
+		if (did_start_io_overall)
+			pgaio_submit_staged();
+		if (!ReadBuffersCanStartIO(buffers[i], false))
 		{
 			/*
 			 * Report this as a 'hit' for this backend, even though it must
@@ -1488,6 +1567,10 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 											  operation->smgr->smgr_rlocator.locator.relNumber,
 											  operation->smgr->smgr_rlocator.backend,
 											  true);
+
+			ereport(DEBUG3,
+					errmsg("can't start io for buffer %d, offset %d", buffers[i], i),
+					errhidestmt(true), errhidecontext(true));
 			continue;
 		}
 
@@ -1497,6 +1580,11 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 		io_first_block = blocknum + i;
 		io_buffers_len = 1;
 
+		ereport(DEBUG3,
+				errmsg("first prepped for io: %s, offset %d",
+					   DebugPrintBufferRefcount(io_buffers[0]), i),
+				errhidestmt(true), errhidecontext(true));
+
 		/*
 		 * How many neighboring-on-disk blocks can we can scatter-read into
 		 * other buffers at the same time?  In this case we don't wait if we
@@ -1504,86 +1592,57 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 		 * for the head block, so we should get on with that I/O as soon as
 		 * possible.  We'll come back to this block again, above.
 		 */
-		while ((i + 1) < nblocks &&
-			   WaitReadBuffersCanStartIO(buffers[i + 1], true))
+		while ((i + 1) < *nblocks &&
+			   ReadBuffersCanStartIO(buffers[i + 1], true))
 		{
 			/* Must be consecutive block numbers. */
 			Assert(BufferGetBlockNumber(buffers[i + 1]) ==
 				   BufferGetBlockNumber(buffers[i]) + 1);
 
+			ereport(DEBUG3,
+					errmsg("seq prepped for io: %s, offset %d",
+						   DebugPrintBufferRefcount(buffers[i + 1]),
+						   i + 1),
+					errhidestmt(true), errhidecontext(true));
+
 			io_buffers[io_buffers_len] = buffers[++i];
 			io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
 		}
 
-		io_start = pgstat_prepare_io_time(track_io_timing);
-		smgrreadv(operation->smgr, forknum, io_first_block, io_pages, io_buffers_len);
-		pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start,
-								io_buffers_len);
+		pgaio_io_get_ref(ioh, &operation->refs[operation->nios]);
 
-		/* Verify each block we read, and terminate the I/O. */
-		for (int j = 0; j < io_buffers_len; ++j)
+		pgaio_io_set_io_data_32(ioh, (uint32 *) io_buffers, io_buffers_len);
+		if (persistence == RELPERSISTENCE_TEMP)
 		{
-			BufferDesc *bufHdr;
-			Block		bufBlock;
-
-			if (persistence == RELPERSISTENCE_TEMP)
-			{
-				bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1);
-				bufBlock = LocalBufHdrGetBlock(bufHdr);
-			}
-			else
-			{
-				bufHdr = GetBufferDescriptor(io_buffers[j] - 1);
-				bufBlock = BufHdrGetBlock(bufHdr);
-			}
-
-			/* check for garbage data */
-			if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j,
-										PIV_LOG_WARNING | PIV_REPORT_STAT))
-			{
-				if ((operation->flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages)
-				{
-					ereport(WARNING,
-							(errcode(ERRCODE_DATA_CORRUPTED),
-							 errmsg("invalid page in block %u of relation %s; zeroing out page",
-									io_first_block + j,
-									relpath(operation->smgr->smgr_rlocator, forknum))));
-					memset(bufBlock, 0, BLCKSZ);
-				}
-				else
-					ereport(ERROR,
-							(errcode(ERRCODE_DATA_CORRUPTED),
-							 errmsg("invalid page in block %u of relation %s",
-									io_first_block + j,
-									relpath(operation->smgr->smgr_rlocator, forknum))));
-			}
-
-			/* Terminate I/O and set BM_VALID. */
-			if (persistence == RELPERSISTENCE_TEMP)
-			{
-				uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
-
-				buf_state |= BM_VALID;
-				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
-			}
-			else
-			{
-				/* Set BM_VALID, terminate IO, and wake up any waiters */
-				TerminateBufferIO(bufHdr, false, BM_VALID, true, true);
-			}
-
-			/* Report I/Os as completing individually. */
-			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,
-											  operation->smgr->smgr_rlocator.locator.spcOid,
-											  operation->smgr->smgr_rlocator.locator.dbOid,
-											  operation->smgr->smgr_rlocator.locator.relNumber,
-											  operation->smgr->smgr_rlocator.backend,
-											  false);
+			pgaio_io_add_shared_cb(ioh, ASC_LOCAL_BUFFER_READ);
+			pgaio_io_set_flag(ioh, AHF_REFERENCES_LOCAL);
 		}
+		else
+			pgaio_io_add_shared_cb(ioh, ASC_SHARED_BUFFER_READ);
 
-		if (VacuumCostActive)
-			VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
+		did_start_io_overall = did_start_io_this = true;
+		smgrstartreadv(ioh, operation->smgr, forknum, io_first_block,
+					   io_pages, io_buffers_len);
+		ioh = NULL;
+		operation->nios++;
+
+		/* FIXME: IO time is not tracked; the read completes asynchronously */
+		pgstat_count_io_op_n(io_object, io_context, IOOP_READ, io_buffers_len);
 	}
+
+	if (ioh)
+	{
+		pgaio_io_release(ioh);
+		ioh = NULL;
+	}
+
+	if (did_start_io_overall)
+	{
+		pgaio_submit_staged();
+		return true;
+	}
+	else
+		return false;
 }
 
 /*
-- 
2.45.2.827.g557ae147e6

