From ed356428e1058b156a158c2e319668611f3539cc Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Tue, 18 Mar 2025 14:40:06 -0400
Subject: [PATCH v2.11 09/27] bufmgr: Implement AIO read support

This commit implements the infrastructure to perform asynchronous reads into
the buffer pool.

To do so, it:

- Adds readv AIO callbacks for shared and local buffers

  Note that shared buffer completions may be run in a different backend than
  the one that started the IO.

- Adds an AIO wait reference to BufferDesc, to allow backends to wait for
  in-progress asynchronous IOs

- Adapts StartBufferIO(), WaitIO(), TerminateBufferIO(), and their localbuf.c
  equivalents, to be able to deal with AIO

- Moves the code to handle BM_PIN_COUNT_WAITER into a helper function, as it
  now also needs to be called on IO completion

As of this commit, nothing issues AIO on shared buffers. A future commit will
update StartReadBuffers() to do so.

Reviewed-by: Noah Misch <noah@leadboat.com>
Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Nazir Bilal Yavuz <byavuz81@gmail.com>
Discussion: https://postgr.es/m/uvrtrknj4kdytuboidbhwclo4gxhswwcpgadptsjvjqcluzmah%40brqs62irg4dt
Discussion: https://postgr.es/m/20210223100344.llw5an2aklengrmn@alap3.anarazel.de
Discussion: https://postgr.es/m/stj36ea6yyhoxtqkhpieia2z4krnam7qyetc57rfezgk4zgapf@gcnactj4z56m
---
 src/include/storage/aio.h              |   4 +
 src/include/storage/buf_internals.h    |   7 +-
 src/include/storage/bufmgr.h           |   4 +
 src/backend/storage/aio/aio_callback.c |   5 +
 src/backend/storage/buffer/README      |   9 +-
 src/backend/storage/buffer/buf_init.c  |   3 +
 src/backend/storage/buffer/bufmgr.c    | 496 +++++++++++++++++++++++--
 src/backend/storage/buffer/localbuf.c  |  47 ++-
 8 files changed, 523 insertions(+), 52 deletions(-)

diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h
index 1591a6c2fca..9a0868a270c 100644
--- a/src/include/storage/aio.h
+++ b/src/include/storage/aio.h
@@ -195,6 +195,10 @@ typedef enum PgAioHandleCallbackID
 
 	PGAIO_HCB_MD_READV,
 	PGAIO_HCB_MD_WRITEV,
+
+	PGAIO_HCB_SHARED_BUFFER_READV,
+
+	PGAIO_HCB_LOCAL_BUFFER_READV,
 } PgAioHandleCallbackID;
 
 
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 9327f60c44c..8a52ace37cd 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -17,6 +17,7 @@
 
 #include "pgstat.h"
 #include "port/atomics.h"
+#include "storage/aio_types.h"
 #include "storage/buf.h"
 #include "storage/bufmgr.h"
 #include "storage/condition_variable.h"
@@ -264,6 +265,8 @@ typedef struct BufferDesc
 
 	int			wait_backend_pgprocno;	/* backend of pin-count waiter */
 	int			freeNext;		/* link in freelist chain */
+
+	PgAioWaitRef io_wref;		/* set iff AIO is in progress */
 	LWLock		content_lock;	/* to lock access to buffer contents */
 } BufferDesc;
 
@@ -472,8 +475,8 @@ extern BlockNumber ExtendBufferedRelLocal(BufferManagerRelation bmr,
 										  uint32 *extended_by);
 extern void MarkLocalBufferDirty(Buffer buffer);
 extern void TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty,
-								   uint32 set_flag_bits);
-extern bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput);
+								   uint32 set_flag_bits, bool syncio);
+extern bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait);
 extern void FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln);
 extern void DropRelationLocalBuffers(RelFileLocator rlocator,
 									 ForkNumber forkNum,
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index ecc1c3a909b..b8c28fe79f4 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -15,6 +15,7 @@
 #define BUFMGR_H
 
 #include "port/pg_iovec.h"
+#include "storage/aio_types.h"
 #include "storage/block.h"
 #include "storage/buf.h"
 #include "storage/bufpage.h"
@@ -171,6 +172,9 @@ extern PGDLLIMPORT int checkpoint_flush_after;
 extern PGDLLIMPORT int backend_flush_after;
 extern PGDLLIMPORT int bgwriter_flush_after;
 
+extern const PgAioHandleCallbacks aio_shared_buffer_readv_cb;
+extern const PgAioHandleCallbacks aio_local_buffer_readv_cb;
+
 /* in buf_init.c */
 extern PGDLLIMPORT char *BufferBlocks;
 
diff --git a/src/backend/storage/aio/aio_callback.c b/src/backend/storage/aio/aio_callback.c
index a92fa8f678e..3cdd2a6b195 100644
--- a/src/backend/storage/aio/aio_callback.c
+++ b/src/backend/storage/aio/aio_callback.c
@@ -18,6 +18,7 @@
 #include "miscadmin.h"
 #include "storage/aio.h"
 #include "storage/aio_internal.h"
+#include "storage/bufmgr.h"
 #include "storage/md.h"
 
 
@@ -40,6 +41,10 @@ static const PgAioHandleCallbacksEntry aio_handle_cbs[] = {
 	CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb),
 
 	CALLBACK_ENTRY(PGAIO_HCB_MD_READV, aio_md_readv_cb),
+
+	CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_READV, aio_shared_buffer_readv_cb),
+
+	CALLBACK_ENTRY(PGAIO_HCB_LOCAL_BUFFER_READV, aio_local_buffer_readv_cb),
 #undef CALLBACK_ENTRY
 };
 
diff --git a/src/backend/storage/buffer/README b/src/backend/storage/buffer/README
index 011af7aff3e..a182fcd660c 100644
--- a/src/backend/storage/buffer/README
+++ b/src/backend/storage/buffer/README
@@ -147,9 +147,12 @@ in the buffer.  It is used per the rules above.
 
 * The BM_IO_IN_PROGRESS flag acts as a kind of lock, used to wait for I/O on a
 buffer to complete (and in releases before 14, it was accompanied by a
-per-buffer LWLock).  The process doing a read or write sets the flag for the
-duration, and processes that need to wait for it to be cleared sleep on a
-condition variable.
+per-buffer LWLock).  The process starting a read or write sets the flag. When
+the I/O is completed, be it by the process that initiated the I/O or by
+another process, the flag is removed and the buffer's condition variable is
+signalled.  Processes that need to wait for the I/O to complete can wait for
+asynchronous I/O via BufferDesc->io_wref, and can wait for BM_IO_IN_PROGRESS
+to be unset by sleeping on the buffer's condition variable.
 
 
 Normal Buffer Replacement Strategy
diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c
index ed1f8e03190..ed1dc488a42 100644
--- a/src/backend/storage/buffer/buf_init.c
+++ b/src/backend/storage/buffer/buf_init.c
@@ -14,6 +14,7 @@
  */
 #include "postgres.h"
 
+#include "storage/aio.h"
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 
@@ -125,6 +126,8 @@ BufferManagerShmemInit(void)
 
 			buf->buf_id = i;
 
+			pgaio_wref_clear(&buf->io_wref);
+
 			/*
 			 * Initially link all the buffers together as unused. Subsequent
 			 * management of this list is done by freelist.c.
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index aea08b242ac..ea5f504e38a 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -48,6 +48,7 @@
 #include "pg_trace.h"
 #include "pgstat.h"
 #include "postmaster/bgwriter.h"
+#include "storage/aio.h"
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
@@ -519,7 +520,8 @@ static int	SyncOneBuffer(int buf_id, bool skip_recently_used,
 static void WaitIO(BufferDesc *buf);
 static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
 static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
-							  uint32 set_flag_bits, bool forget_owner);
+							  uint32 set_flag_bits, bool forget_owner,
+							  bool syncio);
 static void AbortBufferIO(Buffer buffer);
 static void shared_buffer_write_error_callback(void *arg);
 static void local_buffer_write_error_callback(void *arg);
@@ -1041,7 +1043,7 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
 	{
 		/* Simple case for non-shared buffers. */
 		bufHdr = GetLocalBufferDescriptor(-buffer - 1);
-		need_to_zero = StartLocalBufferIO(bufHdr, true);
+		need_to_zero = StartLocalBufferIO(bufHdr, true, false);
 	}
 	else
 	{
@@ -1077,9 +1079,9 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
 
 		/* Set BM_VALID, terminate IO, and wake up any waiters */
 		if (isLocalBuf)
-			TerminateLocalBufferIO(bufHdr, false, BM_VALID);
+			TerminateLocalBufferIO(bufHdr, false, BM_VALID, true);
 		else
-			TerminateBufferIO(bufHdr, false, BM_VALID, true);
+			TerminateBufferIO(bufHdr, false, BM_VALID, true, true);
 	}
 	else if (!isLocalBuf)
 	{
@@ -1391,7 +1393,8 @@ static inline bool
 WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
 {
 	if (BufferIsLocal(buffer))
-		return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1), true);
+		return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1),
+								  true, nowait);
 	else
 		return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
 }
@@ -1549,9 +1552,9 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 
 			/* Set BM_VALID, terminate IO, and wake up any waiters */
 			if (persistence == RELPERSISTENCE_TEMP)
-				TerminateLocalBufferIO(bufHdr, false, BM_VALID);
+				TerminateLocalBufferIO(bufHdr, false, BM_VALID, true);
 			else
-				TerminateBufferIO(bufHdr, false, BM_VALID, true);
+				TerminateBufferIO(bufHdr, false, BM_VALID, true, true);
 
 			/* Report I/Os as completing individually. */
 			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,
@@ -2459,7 +2462,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 		if (lock)
 			LWLockAcquire(BufferDescriptorGetContentLock(buf_hdr), LW_EXCLUSIVE);
 
-		TerminateBufferIO(buf_hdr, false, BM_VALID, true);
+		TerminateBufferIO(buf_hdr, false, BM_VALID, true, true);
 	}
 
 	pgBufferUsage.shared_blks_written += extend_by;
@@ -2805,6 +2808,44 @@ PinBuffer_Locked(BufferDesc *buf)
 	ResourceOwnerRememberBuffer(CurrentResourceOwner, b);
 }
 
+/*
+ * Support for waking up another backend that is waiting for the cleanup lock
+ * to be released using BM_PIN_COUNT_WAITER.
+ *
+ * See LockBufferForCleanup().
+ *
+ * Expected to be called just after releasing a buffer pin (in a BufferDesc,
+ * not just reducing the backend-local pincount for the buffer).
+ */
+static void
+WakePinCountWaiter(BufferDesc *buf)
+{
+	/*
+	 * Acquire the buffer header lock, re-check that there's a waiter. Another
+	 * backend could have unpinned this buffer, and already woken up the
+	 * waiter.
+	 *
+	 * There's no danger of the buffer being replaced after the caller
+	 * unpinned it, as it's pinned by the waiter. The waiter removes
+	 * BM_PIN_COUNT_WAITER if it stops waiting for a reason other than this
+	 * backend waking it up.
+	 */
+	uint32		buf_state = LockBufHdr(buf);
+
+	if ((buf_state & BM_PIN_COUNT_WAITER) &&
+		BUF_STATE_GET_REFCOUNT(buf_state) == 1)
+	{
+		/* we just released the last pin other than the waiter's */
+		int			wait_backend_pgprocno = buf->wait_backend_pgprocno;
+
+		buf_state &= ~BM_PIN_COUNT_WAITER;
+		UnlockBufHdr(buf, buf_state);
+		ProcSendSignal(wait_backend_pgprocno);
+	}
+	else
+		UnlockBufHdr(buf, buf_state);
+}
+
 /*
  * UnpinBuffer -- make buffer available for replacement.
  *
@@ -2873,29 +2914,8 @@ UnpinBufferNoOwner(BufferDesc *buf)
 
 		/* Support LockBufferForCleanup() */
 		if (buf_state & BM_PIN_COUNT_WAITER)
-		{
-			/*
-			 * Acquire the buffer header lock, re-check that there's a waiter.
-			 * Another backend could have unpinned this buffer, and already
-			 * woken up the waiter.  There's no danger of the buffer being
-			 * replaced after we unpinned it above, as it's pinned by the
-			 * waiter.
-			 */
-			buf_state = LockBufHdr(buf);
+			WakePinCountWaiter(buf);
 
-			if ((buf_state & BM_PIN_COUNT_WAITER) &&
-				BUF_STATE_GET_REFCOUNT(buf_state) == 1)
-			{
-				/* we just released the last pin other than the waiter's */
-				int			wait_backend_pgprocno = buf->wait_backend_pgprocno;
-
-				buf_state &= ~BM_PIN_COUNT_WAITER;
-				UnlockBufHdr(buf, buf_state);
-				ProcSendSignal(wait_backend_pgprocno);
-			}
-			else
-				UnlockBufHdr(buf, buf_state);
-		}
 		ForgetPrivateRefCountEntry(ref);
 	}
 }
@@ -3916,7 +3936,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	 * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
 	 * end the BM_IO_IN_PROGRESS state.
 	 */
-	TerminateBufferIO(buf, true, 0, true);
+	TerminateBufferIO(buf, true, 0, true, true);
 
 	TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(BufTagGetForkNum(&buf->tag),
 									   buf->tag.blockNum,
@@ -5472,6 +5492,7 @@ WaitIO(BufferDesc *buf)
 	for (;;)
 	{
 		uint32		buf_state;
+		PgAioWaitRef iow;
 
 		/*
 		 * It may not be necessary to acquire the spinlock to check the flag
@@ -5479,10 +5500,40 @@ WaitIO(BufferDesc *buf)
 		 * play it safe.
 		 */
 		buf_state = LockBufHdr(buf);
+
+		/*
+		 * Copy the wait reference while holding the spinlock. This prevents
+		 * a concurrent TerminateBufferIO() in another backend from clearing
+		 * the wref while it's being read.
+		 */
+		iow = buf->io_wref;
 		UnlockBufHdr(buf, buf_state);
 
+		/* no IO in progress, we don't need to wait */
 		if (!(buf_state & BM_IO_IN_PROGRESS))
 			break;
+
+		/*
+		 * The buffer has asynchronous IO in progress, wait for it to
+		 * complete.
+		 */
+		if (pgaio_wref_valid(&iow))
+		{
+			pgaio_wref_wait(&iow);
+
+			/*
+			 * The AIO subsystem internally uses condition variables and thus
+			 * might remove this backend from the BufferDesc's CV. While that
+			 * wouldn't cause a correctness issue (the first CV sleep just
+			 * immediately returns if not already registered), it seems worth
+			 * avoiding unnecessary loop iterations, given that we take care
+			 * to do so at the start of the function.
+			 */
+			ConditionVariablePrepareToSleep(cv);
+			continue;
+		}
+
+		/* wait on BufferDesc->cv, e.g. for concurrent synchronous IO */
 		ConditionVariableSleep(cv, WAIT_EVENT_BUFFER_IO);
 	}
 	ConditionVariableCancelSleep();
@@ -5491,13 +5542,12 @@ WaitIO(BufferDesc *buf)
 /*
  * StartBufferIO: begin I/O on this buffer
  *	(Assumptions)
- *	My process is executing no IO
+ *	My process is executing no IO on this buffer
  *	The buffer is Pinned
  *
- * In some scenarios there are race conditions in which multiple backends
- * could attempt the same I/O operation concurrently.  If someone else
- * has already started I/O on this buffer then we will block on the
- * I/O condition variable until he's done.
+ * In some scenarios multiple backends could attempt the same I/O operation
+ * concurrently.  If someone else has already started I/O on this buffer then
+ * we will wait for completion of the IO using WaitIO().
  *
  * Input operations are only attempted on buffers that are not BM_VALID,
  * and output operations only on buffers that are BM_VALID and BM_DIRTY,
@@ -5533,9 +5583,9 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
 
 	/* Once we get here, there is definitely no I/O active on this buffer */
 
+	/* Check if someone else already did the I/O */
 	if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
 	{
-		/* someone else already did the I/O */
 		UnlockBufHdr(buf, buf_state);
 		return false;
 	}
@@ -5571,7 +5621,7 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
  */
 static void
 TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
-				  bool forget_owner)
+				  bool forget_owner, bool syncio)
 {
 	uint32		buf_state;
 
@@ -5586,6 +5636,14 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
 	if (clear_dirty && !(buf_state & BM_JUST_DIRTIED))
 		buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
 
+	if (!syncio)
+	{
+		/* release ownership by the AIO subsystem */
+		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+		buf_state -= BUF_REFCOUNT_ONE;
+		pgaio_wref_clear(&buf->io_wref);
+	}
+
 	buf_state |= set_flag_bits;
 	UnlockBufHdr(buf, buf_state);
 
@@ -5594,6 +5652,17 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
 									BufferDescriptorGetBuffer(buf));
 
 	ConditionVariableBroadcast(BufferDescriptorGetIOCV(buf));
+
+	/*
+	 * Support LockBufferForCleanup()
+	 *
+	 * We may have just released the last pin other than the waiter's. In most
+	 * cases, this backend holds another pin on the buffer. But, if, for
+	 * example, this backend is completing an IO issued by another backend, it
+	 * may be time to wake the waiter.
+	 */
+	if (buf_state & BM_PIN_COUNT_WAITER)
+		WakePinCountWaiter(buf);
 }
 
 /*
@@ -5642,7 +5711,7 @@ AbortBufferIO(Buffer buffer)
 		}
 	}
 
-	TerminateBufferIO(buf_hdr, false, BM_IO_ERROR, false);
+	TerminateBufferIO(buf_hdr, false, BM_IO_ERROR, false, true);
 }
 
 /*
@@ -6093,3 +6162,352 @@ EvictUnpinnedBuffer(Buffer buf)
 
 	return result;
 }
+
+/*
+ * Generic implementation of the AIO handle staging callback for readv/writev
+ * on local/shared buffers.
+ *
+ * Each readv/writev can target multiple buffers. The buffers have already
+ * been registered with the IO handle.
+ *
+ * To make the IO ready for execution ("staging"), we need to ensure that the
+ * targeted buffers are in an appropriate state while the IO is ongoing. For
+ * that the AIO subsystem needs to have its own buffer pin, otherwise an error
+ * in this backend could lead to this backend's buffer pin being released as
+ * part of error handling, which in turn could lead to the buffer being
+ * replaced while IO is ongoing.
+ */
+static pg_attribute_always_inline void
+buffer_stage_common(PgAioHandle *ioh, bool is_write, bool is_temp)
+{
+	uint64	   *io_data;
+	uint8		handle_data_len;
+	PgAioWaitRef io_ref;
+	BufferTag	first PG_USED_FOR_ASSERTS_ONLY = {0};
+
+	io_data = pgaio_io_get_handle_data(ioh, &handle_data_len);
+
+	pgaio_io_get_wref(ioh, &io_ref);
+
+	/* iterate over all buffers affected by the vectored readv/writev */
+	for (int i = 0; i < handle_data_len; i++)
+	{
+		Buffer		buffer = (Buffer) io_data[i];
+		BufferDesc *buf_hdr = is_temp ?
+			GetLocalBufferDescriptor(-buffer - 1)
+			: GetBufferDescriptor(buffer - 1);
+		uint32		buf_state;
+
+		/*
+		 * Check that all the buffers are actually ones that could conceivably
+		 * be done in one IO, i.e. are sequential. This is the last
+		 * buffer-aware code before IO is actually executed and confusion
+		 * about which buffers are targeted by IO can be hard to debug, making
+		 * it worth doing extra-paranoid checks.
+		 */
+		if (i == 0)
+			first = buf_hdr->tag;
+		else
+		{
+			Assert(buf_hdr->tag.relNumber == first.relNumber);
+			Assert(buf_hdr->tag.blockNum == first.blockNum + i);
+		}
+
+		if (is_temp)
+			buf_state = pg_atomic_read_u32(&buf_hdr->state);
+		else
+			buf_state = LockBufHdr(buf_hdr);
+
+		/* verify the buffer is in the expected state */
+		Assert(buf_state & BM_TAG_VALID);
+		if (is_write)
+		{
+			Assert(buf_state & BM_VALID);
+			Assert(buf_state & BM_DIRTY);
+		}
+		else
+		{
+			Assert(!(buf_state & BM_VALID));
+			Assert(!(buf_state & BM_DIRTY));
+		}
+
+		/* temp buffers don't use BM_IO_IN_PROGRESS */
+		if (!is_temp)
+			Assert(buf_state & BM_IO_IN_PROGRESS);
+
+		Assert(BUF_STATE_GET_REFCOUNT(buf_state) >= 1);
+
+		/*
+		 * Reflect that the buffer is now owned by the AIO subsystem.
+		 *
+		 * For local buffers: This can't be done just via LocalRefCount, as
+		 * one might initially think, as this backend could error out while
+		 * AIO is still in progress, releasing all the pins by the backend
+		 * itself.
+		 *
+		 * This pin is released again in Terminate[Local]BufferIO().
+		 */
+		buf_state += BUF_REFCOUNT_ONE;
+		buf_hdr->io_wref = io_ref;
+
+		if (is_temp)
+			pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
+		else
+			UnlockBufHdr(buf_hdr, buf_state);
+
+		/*
+		 * Ensure the content lock that prevents buffer modifications while
+		 * the buffer is being written out is not released early due to an
+		 * error.
+		 */
+		if (is_write && !is_temp)
+		{
+			LWLock	   *content_lock;
+
+			content_lock = BufferDescriptorGetContentLock(buf_hdr);
+
+			Assert(LWLockHeldByMe(content_lock));
+
+			/*
+			 * Lock is now owned by AIO subsystem.
+			 */
+			LWLockDisown(content_lock);
+		}
+
+		/*
+		 * Stop tracking this buffer via the resowner - the AIO system now
+		 * keeps track.
+		 */
+		if (!is_temp)
+			ResourceOwnerForgetBufferIO(CurrentResourceOwner, buffer);
+	}
+}
+
+/*
+ * Helper for AIO readv completion callbacks, supporting both shared and temp
+ * buffers. Gets called once for each buffer in a multi-page read.
+ */
+static pg_attribute_always_inline PgAioResult
+buffer_readv_complete_one(uint8 buf_off, Buffer buffer, uint8 flags,
+						  bool failed, bool is_temp)
+{
+	BufferDesc *buf_hdr = is_temp ?
+		GetLocalBufferDescriptor(-buffer - 1)
+		: GetBufferDescriptor(buffer - 1);
+	BufferTag	tag = buf_hdr->tag;
+	char	   *bufdata = BufferGetBlock(buffer);
+	PgAioResult result;
+	uint32		set_flag_bits;
+
+	/* check that the buffer is in the expected state for a read */
+#ifdef USE_ASSERT_CHECKING
+	{
+		uint32		buf_state = pg_atomic_read_u32(&buf_hdr->state);
+
+		Assert(buf_state & BM_TAG_VALID);
+		Assert(!(buf_state & BM_VALID));
+		/* temp buffers don't use BM_IO_IN_PROGRESS */
+		if (!is_temp)
+			Assert(buf_state & BM_IO_IN_PROGRESS);
+		Assert(!(buf_state & BM_DIRTY));
+	}
+#endif
+
+	result.status = PGAIO_RS_OK;
+
+	/* check for garbage data */
+	if (!failed &&
+		!PageIsVerifiedExtended((Page) bufdata, tag.blockNum,
+								PIV_LOG_WARNING | PIV_REPORT_STAT))
+	{
+		RelFileLocator rlocator = BufTagGetRelFileLocator(&tag);
+
+		if ((flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages)
+		{
+			ereport(WARNING,
+					(errcode(ERRCODE_DATA_CORRUPTED),
+					 errmsg("invalid page in block %u of relation %s; zeroing out page",
+							tag.blockNum,
+							relpathbackend(rlocator,
+										   is_temp ? MyProcNumber : INVALID_PROC_NUMBER,
+										   tag.forkNum).str)));
+			memset(bufdata, 0, BLCKSZ);
+		}
+		else
+		{
+			/* mark buffer as having failed */
+			failed = true;
+
+			/* encode error for buffer_readv_report */
+			result.status = PGAIO_RS_ERROR;
+			if (is_temp)
+				result.id = PGAIO_HCB_LOCAL_BUFFER_READV;
+			else
+				result.id = PGAIO_HCB_SHARED_BUFFER_READV;
+			result.error_data = buf_off;
+		}
+	}
+
+	/* Terminate I/O and set BM_VALID, or BM_IO_ERROR on failure. */
+	set_flag_bits = failed ? BM_IO_ERROR : BM_VALID;
+	if (is_temp)
+		TerminateLocalBufferIO(buf_hdr, false, set_flag_bits, false);
+	else
+		TerminateBufferIO(buf_hdr, false, set_flag_bits, false, false);
+
+	/*
+	 * Call the BUFFER_READ_DONE tracepoint in the callback, even though the
+	 * callback may not be executed in the same backend that called
+	 * BUFFER_READ_START. The alternative would be to defer calling the
+	 * tracepoint to a later point (e.g. the local completion callback for
+	 * shared buffer reads), which seems even less helpful and would also
+	 * imply some overhead.
+	 */
+	TRACE_POSTGRESQL_BUFFER_READ_DONE(tag.forkNum,
+									  tag.blockNum,
+									  tag.spcOid,
+									  tag.dbOid,
+									  tag.relNumber,
+									  is_temp ? MyProcNumber : INVALID_PROC_NUMBER,
+									  false);
+
+	return result;
+}
+
+/*
+ * Perform completion handling of a single AIO read. This read may cover
+ * multiple blocks / buffers.
+ *
+ * Shared between shared and local buffers, to reduce code duplication.
+ */
+static pg_attribute_always_inline PgAioResult
+buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result,
+					  uint8 cb_data, bool is_temp)
+{
+	PgAioResult result = prior_result;
+	PgAioTargetData *td = pgaio_io_get_target_data(ioh);
+	uint64	   *io_data;
+	uint8		handle_data_len;
+
+	if (is_temp)
+	{
+		Assert(td->smgr.is_temp);
+		Assert(pgaio_io_get_owner(ioh) == MyProcNumber);
+	}
+	else
+		Assert(!td->smgr.is_temp);
+
+	/*
+	 * Iterate over all the buffers affected by this IO and call the
+	 * per-buffer completion function for each buffer.
+	 */
+	io_data = pgaio_io_get_handle_data(ioh, &handle_data_len);
+	for (uint8 buf_off = 0; buf_off < handle_data_len; buf_off++)
+	{
+		Buffer		buf = io_data[buf_off];
+		PgAioResult buf_result;
+		bool		failed;
+
+		Assert(BufferIsValid(buf));
+
+		/*
+		 * If the entire I/O failed at a lower level, each buffer needs to be
+		 * marked as failed. In case of a partial read, some buffers may be
+		 * ok.
+		 */
+		failed =
+			prior_result.status == PGAIO_RS_ERROR
+			|| prior_result.result <= buf_off;
+
+		buf_result = buffer_readv_complete_one(buf_off, buf, cb_data, failed,
+											   is_temp);
+
+		/*
+		 * If there wasn't any prior error and page verification failed in
+		 * some form, set the whole IO's result to the page's result.
+		 */
+		if (result.status != PGAIO_RS_ERROR
+			&& buf_result.status != PGAIO_RS_OK)
+		{
+			result = buf_result;
+			pgaio_result_report(result, td, LOG);
+		}
+	}
+
+	return result;
+}
+
+/*
+ * AIO error reporting callback for aio_shared_buffer_readv_cb and
+ * aio_local_buffer_readv_cb.
+ *
+ * Errors are encoded as follows:
+ * - the only error is page verification failing
+ * - result.error_data is the offset of the first page that failed
+ *   verification in a larger IO
+ */
+static void
+buffer_readv_report(PgAioResult result, const PgAioTargetData *target_data,
+					int elevel)
+{
+	ProcNumber	errProc;
+
+	if (target_data->smgr.is_temp)
+		errProc = MyProcNumber;
+	else
+		errProc = INVALID_PROC_NUMBER;
+
+	ereport(elevel,
+			errcode(ERRCODE_DATA_CORRUPTED),
+			errmsg("invalid page in block %u of relation %s",
+				   target_data->smgr.blockNum + result.error_data,
+				   relpathbackend(target_data->smgr.rlocator, errProc,
+								  target_data->smgr.forkNum).str));
+}
+
+static void
+shared_buffer_readv_stage(PgAioHandle *ioh, uint8 cb_data)
+{
+	buffer_stage_common(ioh, false, false);
+}
+
+static PgAioResult
+shared_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result,
+							 uint8 cb_data)
+{
+	return buffer_readv_complete(ioh, prior_result, cb_data, false);
+}
+
+static void
+local_buffer_readv_stage(PgAioHandle *ioh, uint8 cb_data)
+{
+	buffer_stage_common(ioh, false, true);
+}
+
+static PgAioResult
+local_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result,
+							uint8 cb_data)
+{
+	return buffer_readv_complete(ioh, prior_result, cb_data, true);
+}
+
+/* readv callback is passed READ_BUFFERS_* flags as callback data */
+const PgAioHandleCallbacks aio_shared_buffer_readv_cb = {
+	.stage = shared_buffer_readv_stage,
+	.complete_shared = shared_buffer_readv_complete,
+	.report = buffer_readv_report,
+};
+
+/* readv callback is passed READ_BUFFERS_* flags as callback data */
+const PgAioHandleCallbacks aio_local_buffer_readv_cb = {
+	.stage = local_buffer_readv_stage,
+
+	/*
+	 * Note that this, in contrast to the shared_buffers case, uses
+	 * complete_local, as only the issuing backend has access to the required
+	 * data structures. This is important in case the IO completion is
+	 * consumed incidentally by another backend.
+	 */
+	.complete_local = local_buffer_readv_complete,
+	.report = buffer_readv_report,
+};
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 3a722321533..8b6cd54d09e 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -17,7 +17,9 @@
 
 #include "access/parallel.h"
 #include "executor/instrument.h"
+#include "pg_trace.h"
 #include "pgstat.h"
+#include "storage/aio.h"
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
@@ -187,7 +189,7 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
 	 * Try to start an I/O operation.  There currently are no reasons for
 	 * StartLocalBufferIO to return false, so we raise an error in that case.
 	 */
-	if (!StartLocalBufferIO(bufHdr, false))
+	if (!StartLocalBufferIO(bufHdr, false, false))
 		elog(ERROR, "failed to start write IO on local buffer");
 
 	/* Find smgr relation for buffer */
@@ -211,7 +213,7 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
 							IOOP_WRITE, io_start, 1, BLCKSZ);
 
 	/* Mark not-dirty */
-	TerminateLocalBufferIO(bufHdr, true, 0);
+	TerminateLocalBufferIO(bufHdr, true, 0, true);
 
 	pgBufferUsage.local_blks_written++;
 }
@@ -430,7 +432,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
 			pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
 
 			/* no need to loop for local buffers */
-			StartLocalBufferIO(existing_hdr, true);
+			StartLocalBufferIO(existing_hdr, true, false);
 		}
 		else
 		{
@@ -446,7 +448,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
 
 			hresult->id = victim_buf_id;
 
-			StartLocalBufferIO(victim_buf_hdr, true);
+			StartLocalBufferIO(victim_buf_hdr, true, false);
 		}
 	}
 
@@ -515,13 +517,31 @@ MarkLocalBufferDirty(Buffer buffer)
  * Like StartBufferIO, but for local buffers
  */
 bool
-StartLocalBufferIO(BufferDesc *bufHdr, bool forInput)
+StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
 {
-	uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
+	uint32		buf_state;
 
+	/*
+	 * With AIO the buffer could have IO in progress, e.g. when there are two
+	 * scans of the same relation. Either wait for the other IO or return
+	 * false.
+	 */
+	if (pgaio_wref_valid(&bufHdr->io_wref))
+	{
+		PgAioWaitRef iow = bufHdr->io_wref;
+
+		if (nowait)
+			return false;
+
+		pgaio_wref_wait(&iow);
+	}
+
+	/* Once we get here, there is definitely no I/O active on this buffer */
+
+	/* Check if someone else already did the I/O */
+	buf_state = pg_atomic_read_u32(&bufHdr->state);
 	if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
 	{
-		/* someone else already did the I/O */
 		return false;
 	}
 
@@ -536,7 +556,8 @@ StartLocalBufferIO(BufferDesc *bufHdr, bool forInput)
  * Like TerminateBufferIO, but for local buffers
  */
 void
-TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits)
+TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits,
+					   bool syncio)
 {
 	/* Only need to adjust flags */
 	uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
@@ -549,6 +570,14 @@ TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bit
 	if (clear_dirty)
 		buf_state &= ~BM_DIRTY;
 
+	if (!syncio)
+	{
+		/* release pin held by IO subsystem, see also buffer_stage_common() */
+		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+		buf_state -= BUF_REFCOUNT_ONE;
+		pgaio_wref_clear(&bufHdr->io_wref);
+	}
+
 	buf_state |= set_flag_bits;
 	pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
 
@@ -714,6 +743,8 @@ InitLocalBuffers(void)
 		 */
 		buf->buf_id = -i - 2;
 
+		pgaio_wref_clear(&buf->io_wref);
+
 		/*
 		 * Intentionally do not initialize the buffer's atomic variable
 		 * (besides zeroing the underlying memory above). That way we get
-- 
2.48.1.76.g4e746b1a31.dirty

