From 9d8c6210e3a5e39d585d0a8ebebeac8a9e9b62a2 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sat, 31 Aug 2024 21:39:01 -0400
Subject: [PATCH v2.0 11/17] bufmgr: Implement AIO support

As of this commit there are no users of these AIO facilities; they will be
added in later commits.
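
Specifically, this adds shared completion callbacks for shared-buffer reads
and writes and for local-buffer reads (local-buffer writes are not
implemented yet), gives each buffer descriptor a reference to the AIO
handle of an IO in flight, so that WaitIO() can also wait for asynchronous
IO, and extends TerminateBufferIO() to release the pin and handle reference
owned by the AIO subsystem, including waking pin-count waiters when
completing IO that another backend started.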

Author:
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/include/storage/aio.h             |   6 +
 src/include/storage/buf_internals.h   |   6 +
 src/include/storage/bufmgr.h          |  10 +
 src/backend/storage/aio/aio_subject.c |   5 +
 src/backend/storage/buffer/buf_init.c |   3 +
 src/backend/storage/buffer/bufmgr.c   | 431 +++++++++++++++++++++++++-
 src/backend/storage/buffer/localbuf.c |  65 ++++
 7 files changed, 519 insertions(+), 7 deletions(-)

diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h
index acfd50c587c..40c80a2fed4 100644
--- a/src/include/storage/aio.h
+++ b/src/include/storage/aio.h
@@ -93,6 +93,12 @@ typedef enum PgAioHandleSharedCallbackID
 {
 	ASC_MD_READV,
 	ASC_MD_WRITEV,
+
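+	/* shared and local buffer IO, see bufmgr.c and localbuf.c */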
+	ASC_SHARED_BUFFER_READ,
+	ASC_SHARED_BUFFER_WRITE,
+
+	ASC_LOCAL_BUFFER_READ,
+	ASC_LOCAL_BUFFER_WRITE,
 } PgAioHandleSharedCallbackID;
 
 
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index f190e6e5e46..5cfa7dbd1f1 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -17,6 +17,7 @@
 
 #include "pgstat.h"
 #include "port/atomics.h"
+#include "storage/aio_ref.h"
 #include "storage/buf.h"
 #include "storage/bufmgr.h"
 #include "storage/condition_variable.h"
@@ -252,6 +253,8 @@ typedef struct BufferDesc
 
 	int			wait_backend_pgprocno;	/* backend of pin-count waiter */
 	int			freeNext;		/* link in freelist chain */
+
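+	/* AIO handle of an IO in flight for this buffer, if any */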
+	PgAioHandleRef io_in_progress;
 	LWLock		content_lock;	/* to lock access to buffer contents */
 } BufferDesc;
 
@@ -465,4 +468,7 @@ extern void DropRelationLocalBuffers(RelFileLocator rlocator,
 extern void DropRelationAllLocalBuffers(RelFileLocator rlocator);
 extern void AtEOXact_LocalBuffers(bool isCommit);
 
+
+extern bool ReadBufferCompleteReadLocal(Buffer buffer, int mode, bool failed);
+
 #endif							/* BUFMGR_INTERNALS_H */
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index eb0fba4230b..6cd64b8c2b3 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -177,6 +177,14 @@ extern PGDLLIMPORT int NLocBuffer;
 extern PGDLLIMPORT Block *LocalBufferBlockPointers;
 extern PGDLLIMPORT int32 *LocalRefCount;
 
+
+struct PgAioHandleSharedCallbacks;
+extern const struct PgAioHandleSharedCallbacks aio_shared_buffer_read_cb;
+extern const struct PgAioHandleSharedCallbacks aio_shared_buffer_write_cb;
+extern const struct PgAioHandleSharedCallbacks aio_local_buffer_read_cb;
+extern const struct PgAioHandleSharedCallbacks aio_local_buffer_write_cb;
+
+
 /* upper limit for effective_io_concurrency */
 #define MAX_IO_CONCURRENCY 1000
 
@@ -194,6 +202,8 @@ extern PGDLLIMPORT int32 *LocalRefCount;
 /*
  * prototypes for functions in bufmgr.c
  */
+struct PgAioHandle;
+
 extern PrefetchBufferResult PrefetchSharedBuffer(struct SMgrRelationData *smgr_reln,
 												 ForkNumber forkNum,
 												 BlockNumber blockNum);
diff --git a/src/backend/storage/aio/aio_subject.c b/src/backend/storage/aio/aio_subject.c
index 12ab1730f49..0676f3d3a66 100644
--- a/src/backend/storage/aio/aio_subject.c
+++ b/src/backend/storage/aio/aio_subject.c
@@ -35,6 +35,11 @@ static const PgAioSubjectInfo *aio_subject_info[] = {
 static const PgAioHandleSharedCallbacks *aio_shared_cbs[] = {
 	[ASC_MD_READV] = &aio_md_readv_cb,
 	[ASC_MD_WRITEV] = &aio_md_writev_cb,
+
+	[ASC_SHARED_BUFFER_READ] = &aio_shared_buffer_read_cb,
+	[ASC_SHARED_BUFFER_WRITE] = &aio_shared_buffer_write_cb,
+	[ASC_LOCAL_BUFFER_READ] = &aio_local_buffer_read_cb,
+	[ASC_LOCAL_BUFFER_WRITE] = &aio_local_buffer_write_cb,
 };
 
 
diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c
index 09bec6449b6..059a80dfb13 100644
--- a/src/backend/storage/buffer/buf_init.c
+++ b/src/backend/storage/buffer/buf_init.c
@@ -14,6 +14,7 @@
  */
 #include "postgres.h"
 
+#include "storage/aio.h"
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/proc.h"
@@ -126,6 +127,8 @@ BufferManagerShmemInit(void)
 
 			buf->buf_id = i;
 
+			pgaio_io_ref_clear(&buf->io_in_progress);
+
 			/*
 			 * Initially link all the buffers together as unused. Subsequent
 			 * management of this list is done by freelist.c.
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index f2e608f597d..8feafd6e53c 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -48,6 +48,7 @@
 #include "pg_trace.h"
 #include "pgstat.h"
 #include "postmaster/bgwriter.h"
+#include "storage/aio.h"
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
@@ -58,6 +59,7 @@
 #include "storage/smgr.h"
 #include "storage/standby.h"
 #include "utils/memdebug.h"
+#include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/rel.h"
 #include "utils/resowner.h"
@@ -541,7 +543,8 @@ static int	SyncOneBuffer(int buf_id, bool skip_recently_used,
 static void WaitIO(BufferDesc *buf);
 static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
 static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
-							  uint32 set_flag_bits, bool forget_owner);
+							  uint32 set_flag_bits, bool forget_owner,
+							  bool syncio);
 static void AbortBufferIO(Buffer buffer);
 static void shared_buffer_write_error_callback(void *arg);
 static void local_buffer_write_error_callback(void *arg);
@@ -1108,7 +1111,7 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
 		else
 		{
 			/* Set BM_VALID, terminate IO, and wake up any waiters */
-			TerminateBufferIO(bufHdr, false, BM_VALID, true);
+			TerminateBufferIO(bufHdr, false, BM_VALID, true, true);
 		}
 	}
 	else if (!isLocalBuf)
@@ -1593,7 +1596,7 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 			else
 			{
 				/* Set BM_VALID, terminate IO, and wake up any waiters */
-				TerminateBufferIO(bufHdr, false, BM_VALID, true);
+				TerminateBufferIO(bufHdr, false, BM_VALID, true, true);
 			}
 
 			/* Report I/Os as completing individually. */
@@ -2477,7 +2480,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 		if (lock)
 			LWLockAcquire(BufferDescriptorGetContentLock(buf_hdr), LW_EXCLUSIVE);
 
-		TerminateBufferIO(buf_hdr, false, BM_VALID, true);
+		TerminateBufferIO(buf_hdr, false, BM_VALID, true, true);
 	}
 
 	pgBufferUsage.shared_blks_written += extend_by;
@@ -3926,7 +3929,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	 * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
 	 * end the BM_IO_IN_PROGRESS state.
 	 */
-	TerminateBufferIO(buf, true, 0, true);
+	TerminateBufferIO(buf, true, 0, true, true);
 
 	TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(BufTagGetForkNum(&buf->tag),
 									   buf->tag.blockNum,
@@ -5541,6 +5544,7 @@ WaitIO(BufferDesc *buf)
 	for (;;)
 	{
 		uint32		buf_state;
+		PgAioHandleRef ior;
 
 		/*
 		 * It may not be necessary to acquire the spinlock to check the flag
@@ -5548,10 +5552,19 @@ WaitIO(BufferDesc *buf)
 		 * play it safe.
 		 */
 		buf_state = LockBufHdr(buf);
+		ior = buf->io_in_progress;
 		UnlockBufHdr(buf, buf_state);
 
 		if (!(buf_state & BM_IO_IN_PROGRESS))
 			break;
+
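+		/*
+		 * If the IO is being executed asynchronously, wait for the AIO
+		 * handle to complete instead of sleeping on the condition variable.
+		 * Waiting on the handle may itself have used condition variables,
+		 * so re-prepare to sleep before checking the flag again.
+		 */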
+		if (pgaio_io_ref_valid(&ior))
+		{
+			pgaio_io_ref_wait(&ior);
+			ConditionVariablePrepareToSleep(cv);
+			continue;
+		}
+
 		ConditionVariableSleep(cv, WAIT_EVENT_BUFFER_IO);
 	}
 	ConditionVariableCancelSleep();
@@ -5640,7 +5653,7 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
  */
 static void
 TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
-				  bool forget_owner)
+				  bool forget_owner, bool syncio)
 {
 	uint32		buf_state;
 
@@ -5652,6 +5665,13 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
 	if (clear_dirty && !(buf_state & BM_JUST_DIRTIED))
 		buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
 
+	if (!syncio)
+	{
+		/* release the pin and handle reference owned by the AIO subsystem */
+		buf_state -= BUF_REFCOUNT_ONE;
+		pgaio_io_ref_clear(&buf->io_in_progress);
+	}
+
 	buf_state |= set_flag_bits;
 	UnlockBufHdr(buf, buf_state);
 
@@ -5660,6 +5680,40 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
 									BufferDescriptorGetBuffer(buf));
 
 	ConditionVariableBroadcast(BufferDescriptorGetIOCV(buf));
+
+	/*
+	 * If we just released a pin, we need to do BM_PIN_COUNT_WAITER handling.
+	 * Most of the time the current backend will hold another pin preventing
+	 * that from happening, but that's e.g. not the case when completing an IO
+	 * another backend started.
+	 *
+	 * AFIXME: Deduplicate with UnpinBufferNoOwner() or just replace
+	 * BM_PIN_COUNT_WAITER with something saner.
+	 */
+	/* Support LockBufferForCleanup() */
+	if (buf_state & BM_PIN_COUNT_WAITER)
+	{
+		/*
+		 * Acquire the buffer header lock, re-check that there's a waiter.
+		 * Another backend could have unpinned this buffer, and already woken
+		 * up the waiter.  There's no danger of the buffer being replaced
+		 * after we unpinned it above, as it's pinned by the waiter.
+		 */
+		buf_state = LockBufHdr(buf);
+
+		if ((buf_state & BM_PIN_COUNT_WAITER) &&
+			BUF_STATE_GET_REFCOUNT(buf_state) == 1)
+		{
+			/* we just released the last pin other than the waiter's */
+			int			wait_backend_pgprocno = buf->wait_backend_pgprocno;
+
+			buf_state &= ~BM_PIN_COUNT_WAITER;
+			UnlockBufHdr(buf, buf_state);
+			ProcSendSignal(wait_backend_pgprocno);
+		}
+		else
+			UnlockBufHdr(buf, buf_state);
+	}
 }
 
 /*
@@ -5711,7 +5765,7 @@ AbortBufferIO(Buffer buffer)
 		}
 	}
 
-	TerminateBufferIO(buf_hdr, false, BM_IO_ERROR, false);
+	TerminateBufferIO(buf_hdr, false, BM_IO_ERROR, false, true);
 }
 
 /*
@@ -6170,3 +6224,366 @@ EvictUnpinnedBuffer(Buffer buf)
 
 	return result;
 }
+
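+/*
+ * Complete the read of a single shared buffer, verifying the page contents
+ * and terminating the buffer's IO. Called once per buffer from the AIO read
+ * completion callback. Returns whether the buffer failed verification.
+ */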
+static bool
+ReadBufferCompleteReadShared(Buffer buffer, int mode, bool failed)
+{
+	BufferDesc *bufHdr = NULL;
+	BlockNumber blockno;
+	bool		buf_failed = false;
+	char	   *bufdata = BufferGetBlock(buffer);
+
+	Assert(BufferIsValid(buffer));
+
+	bufHdr = GetBufferDescriptor(buffer - 1);
+	blockno = bufHdr->tag.blockNum;
+
+#ifdef USE_ASSERT_CHECKING
+	{
+		uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+		Assert(buf_state & BM_TAG_VALID);
+		Assert(!(buf_state & BM_VALID));
+		Assert(buf_state & BM_IO_IN_PROGRESS);
+		Assert(!(buf_state & BM_DIRTY));
+	}
+#endif
+
+	/* check for garbage data */
+	if (!failed &&
+		!PageIsVerifiedExtended((Page) bufdata, blockno,
+								PIV_LOG_WARNING | PIV_REPORT_STAT))
+	{
+		RelFileLocator rlocator = BufTagGetRelFileLocator(&bufHdr->tag);
+		ForkNumber	forkNum = bufHdr->tag.forkNum;
+
+		/* AFIXME: relpathperm allocates memory */
+		MemoryContextSwitchTo(ErrorContext);
+		if (mode == READ_BUFFERS_ZERO_ON_ERROR || zero_damaged_pages)
+		{
+			ereport(LOG,
+					(errcode(ERRCODE_DATA_CORRUPTED),
+					 errmsg("invalid page in block %u of relation %s; zeroing out page",
+							blockno,
+							relpathperm(rlocator, forkNum))));
+			memset(bufdata, 0, BLCKSZ);
+		}
+		else
+		{
+			ereport(LOG,
+					(errcode(ERRCODE_DATA_CORRUPTED),
+					 errmsg("invalid page in block %u of relation %s",
+							blockno,
+							relpathperm(rlocator, forkNum))));
+			failed = true;
+			buf_failed = true;
+		}
+	}
+
+	/* Terminate I/O and set BM_VALID. */
+	TerminateBufferIO(bufHdr, false,
+					  failed ? BM_IO_ERROR : BM_VALID,
+					  false, false);
+
+	/*
+	 * FIXME: Should we report I/Os as completing individually here, i.e. do
+	 * TRACE_POSTGRESQL_BUFFER_READ_DONE?
+	 */
+	return buf_failed;
+}
+
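+/*
+ * Complete the write of a single shared buffer: mark the buffer clean and
+ * terminate its IO, releasing the content lock owned by the IO if
+ * release_lock is set.
+ */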
+static bool
+ReadBufferCompleteWriteShared(Buffer buffer, bool release_lock, bool failed)
+{
+	BufferDesc *bufHdr;
+	bool		result = false;
+
+	Assert(BufferIsValid(buffer));
+
+	bufHdr = GetBufferDescriptor(buffer - 1);
+
+#ifdef USE_ASSERT_CHECKING
+	{
+		uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+		Assert(buf_state & BM_VALID);
+		Assert(buf_state & BM_TAG_VALID);
+		Assert(buf_state & BM_IO_IN_PROGRESS);
+		Assert(buf_state & BM_DIRTY);
+	}
+#endif
+
+	/* AFIXME: implement track_io_timing */
+
+	TerminateBufferIO(bufHdr, /* clear_dirty = */ true,
+					  failed ? BM_IO_ERROR : 0,
+					   /* forget_owner = */ false,
+					   /* syncio = */ false);
+
+	/*
+	 * The initiator of the IO is no longer managing the lock (it called
+	 * LWLockReleaseOwnership()); we are.
+	 */
+	if (release_lock)
+		LWLockReleaseUnowned(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
+
+	return result;
+}
+
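+/*
+ * Common helper for the shared buffer read/write prepare callbacks. For
+ * each buffer covered by the IO, transfer ownership of the pin (and, for
+ * writes, of the content lock) from the issuing backend to the AIO
+ * subsystem and point the buffer at the IO handle, so that other backends
+ * can wait for the IO in WaitIO().
+ */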
+static void
+shared_buffer_prepare_common(PgAioHandle *ioh, bool is_write)
+{
+	uint64	   *io_data;
+	uint8		io_data_len;
+	PgAioHandleRef io_ref;
+	BufferTag	first PG_USED_FOR_ASSERTS_ONLY = {0};
+
+	io_data = pgaio_io_get_io_data(ioh, &io_data_len);
+
+	pgaio_io_get_ref(ioh, &io_ref);
+
+	for (int i = 0; i < io_data_len; i++)
+	{
+		Buffer		buf = (Buffer) io_data[i];
+		BufferDesc *bufHdr;
+		uint32		buf_state;
+
+		bufHdr = GetBufferDescriptor(buf - 1);
+
+		if (i == 0)
+			first = bufHdr->tag;
+		else
+		{
+			Assert(bufHdr->tag.relNumber == first.relNumber);
+			Assert(bufHdr->tag.blockNum == first.blockNum + i);
+		}
+
+		buf_state = LockBufHdr(bufHdr);
+
+		Assert(buf_state & BM_TAG_VALID);
+		if (is_write)
+		{
+			Assert(buf_state & BM_VALID);
+			Assert(buf_state & BM_DIRTY);
+		}
+		else
+			Assert(!(buf_state & BM_VALID));
+
+		Assert(buf_state & BM_IO_IN_PROGRESS);
+		Assert(BUF_STATE_GET_REFCOUNT(buf_state) >= 1);
+
+		buf_state += BUF_REFCOUNT_ONE;
+		bufHdr->io_in_progress = io_ref;
+
+		UnlockBufHdr(bufHdr, buf_state);
+
+		if (is_write)
+		{
+			LWLock	   *content_lock;
+
+			content_lock = BufferDescriptorGetContentLock(bufHdr);
+
+			Assert(LWLockHeldByMe(content_lock));
+
+			/*
+			 * Lock now owned by IO.
+			 */
+			LWLockReleaseOwnership(content_lock);
+		}
+
+		/*
+		 * Stop tracking this buffer via the resowner - the AIO system now
+		 * keeps track.
+		 */
+		ResourceOwnerForgetBufferIO(CurrentResourceOwner, buf);
+	}
+}
+
+static void
+shared_buffer_read_prepare(PgAioHandle *ioh)
+{
+	shared_buffer_prepare_common(ioh, false);
+}
+
+static void
+shared_buffer_write_prepare(PgAioHandle *ioh)
+{
+	shared_buffer_prepare_common(ioh, true);
+}
+
+
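+/*
+ * AIO completion callback for shared buffer reads. Completes each buffer
+ * covered by the IO, treating buffers past the end of a short read as
+ * failed, and flags the first verification failure in the result.
+ */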
+static PgAioResult
+shared_buffer_read_complete(PgAioHandle *ioh, PgAioResult prior_result)
+{
+	PgAioResult result = prior_result;
+	int			mode = pgaio_io_get_subject_data(ioh)->smgr.mode;
+	uint64	   *io_data;
+	uint8		io_data_len;
+
+	elog(DEBUG3, "%s: %d %d", __func__, prior_result.status, prior_result.result);
+
+	io_data = pgaio_io_get_io_data(ioh, &io_data_len);
+
+	for (int io_data_off = 0; io_data_off < io_data_len; io_data_off++)
+	{
+		Buffer		buf = (Buffer) io_data[io_data_off];
+		bool		buf_failed;
+		bool		failed;
+
+		failed =
+			prior_result.status == ARS_ERROR
+			|| prior_result.result <= io_data_off;
+
+		elog(DEBUG3, "calling ReadBufferCompleteReadShared() for buf %d with failed %d, error: %d, result: %d, data_off: %d",
+			 buf, failed, prior_result.status, prior_result.result, io_data_off);
+
+		/*
+		 * AFIXME: It'd probably be better to not set BM_IO_ERROR (which is
+		 * what failed = true leads to) when it's just a short read...
+		 */
+		buf_failed = ReadBufferCompleteReadShared(buf,
+												  mode,
+												  failed);
+
+		if (result.status != ARS_ERROR && buf_failed)
+		{
+			result.status = ARS_ERROR;
+			result.id = ASC_SHARED_BUFFER_READ;
+			result.error_data = io_data_off + 1;
+		}
+	}
+
+	return result;
+}
+
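+/*
+ * AIO error-reporting callback for shared buffer reads; error_data
+ * identifies the failed buffer within the IO.
+ */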
+static void
+shared_buffer_read_error(PgAioResult result, const PgAioSubjectData *subject_data, int elevel)
+{
+	MemoryContext oldContext;
+
+	/* AFIXME: */
+	oldContext = MemoryContextSwitchTo(ErrorContext);
+
+	ereport(elevel,
+			errcode(ERRCODE_DATA_CORRUPTED),
+			errmsg("invalid page in block %u of relation %s",
+				   subject_data->smgr.blockNum + result.error_data,
+				   relpathperm(subject_data->smgr.rlocator, subject_data->smgr.forkNum)
+				   )
+		);
+	MemoryContextSwitchTo(oldContext);
+}
+
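+/*
+ * AIO completion callback for shared buffer writes.
+ */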
+static PgAioResult
+shared_buffer_write_complete(PgAioHandle *ioh, PgAioResult prior_result)
+{
+	PgAioResult result = prior_result;
+	uint64	   *io_data;
+	uint8		io_data_len;
+
+	elog(DEBUG3, "%s: %d %d", __func__, prior_result.status, prior_result.result);
+
+	io_data = pgaio_io_get_io_data(ioh, &io_data_len);
+
+	/* FIXME: handle outright errors */
+
+	for (int io_data_off = 0; io_data_off < io_data_len; io_data_off++)
+	{
+		Buffer		buf = (Buffer) io_data[io_data_off];
+
+		/* FIXME: handle short writes / failures */
+		/* FIXME: ioh->scb_data.shared_buffer.release_lock */
+		ReadBufferCompleteWriteShared(buf,
+									  true,
+									  false);
+
+	}
+
+	return result;
+}
+
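+/*
+ * Prepare callback for local buffer reads: pin each buffer on behalf of
+ * the AIO subsystem and associate it with the IO handle.
+ */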
+static void
+local_buffer_read_prepare(PgAioHandle *ioh)
+{
+	uint64	   *io_data;
+	uint8		io_data_len;
+	PgAioHandleRef io_ref;
+
+	io_data = pgaio_io_get_io_data(ioh, &io_data_len);
+
+	pgaio_io_get_ref(ioh, &io_ref);
+
+	for (int i = 0; i < io_data_len; i++)
+	{
+		Buffer		buf = (Buffer) io_data[i];
+		BufferDesc *bufHdr;
+
+		bufHdr = GetLocalBufferDescriptor(-buf - 1);
+
+		/*
+		 * Local buffers are backend-private, so no buffer header lock is
+		 * needed to associate the IO and take a pin.
+		 */
+		bufHdr->io_in_progress = io_ref;
+		LocalRefCount[-buf - 1] += 1;
+	}
+}
+
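+/*
+ * AIO completion callback for local buffer reads, completing each buffer
+ * covered by the IO via ReadBufferCompleteReadLocal().
+ */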
+static PgAioResult
+local_buffer_read_complete(PgAioHandle *ioh, PgAioResult prior_result)
+{
+	PgAioResult result = prior_result;
+	int			mode = pgaio_io_get_subject_data(ioh)->smgr.mode;
+	uint64	   *io_data;
+	uint8		io_data_len;
+
+	elog(DEBUG3, "%s: %d %d", __func__, prior_result.status, prior_result.result);
+
+	io_data = pgaio_io_get_io_data(ioh, &io_data_len);
+
+	/* FIXME: error handling */
+
+	for (int io_data_off = 0; io_data_off < io_data_len; io_data_off++)
+	{
+		Buffer		buf = (Buffer) io_data[io_data_off];
+		bool		buf_failed;
+
+		buf_failed = ReadBufferCompleteReadLocal(buf,
+												 mode,
+												 false);
+
+		if (result.status != ARS_ERROR && buf_failed)
+		{
+			result.status = ARS_ERROR;
+			result.id = ASC_LOCAL_BUFFER_READ;
+			result.error_data = io_data_off + 1;
+		}
+	}
+
+	return result;
+}
+
+static void
+local_buffer_write_prepare(PgAioHandle *ioh)
+{
+	elog(ERROR, "not yet");
+}
+
+
+const struct PgAioHandleSharedCallbacks aio_shared_buffer_read_cb = {
+	.prepare = shared_buffer_read_prepare,
+	.complete = shared_buffer_read_complete,
+	.error = shared_buffer_read_error,
+};
+const struct PgAioHandleSharedCallbacks aio_shared_buffer_write_cb = {
+	.prepare = shared_buffer_write_prepare,
+	.complete = shared_buffer_write_complete,
+};
+const struct PgAioHandleSharedCallbacks aio_local_buffer_read_cb = {
+	.prepare = local_buffer_read_prepare,
+	.complete = local_buffer_read_complete,
+};
+const struct PgAioHandleSharedCallbacks aio_local_buffer_write_cb = {
+	.prepare = local_buffer_write_prepare,
+};
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 8da7dd6c98a..a7eb723f1e9 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -18,6 +18,7 @@
 #include "access/parallel.h"
 #include "executor/instrument.h"
 #include "pgstat.h"
+#include "storage/aio.h"
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
@@ -620,6 +621,8 @@ InitLocalBuffers(void)
 		 */
 		buf->buf_id = -i - 2;
 
+		pgaio_io_ref_clear(&buf->io_in_progress);
+
 		/*
 		 * Intentionally do not initialize the buffer's atomic variable
 		 * (besides zeroing the underlying memory above). That way we get
@@ -836,3 +839,65 @@ AtProcExit_LocalBuffers(void)
 	 */
 	CheckForLocalBufferLeaks();
 }
+
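+/*
+ * Complete the read of a single local buffer, verifying the page contents,
+ * marking the buffer valid and releasing the pin held by the AIO
+ * subsystem. Returns whether the buffer failed verification.
+ */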
+bool
+ReadBufferCompleteReadLocal(Buffer buffer, int mode, bool failed)
+{
+	BufferDesc *buf_hdr = NULL;
+	BlockNumber blockno;
+	bool		buf_failed = false;
+	char	   *bufdata = BufferGetBlock(buffer);
+
+	Assert(BufferIsValid(buffer));
+
+	buf_hdr = GetLocalBufferDescriptor(-buffer - 1);
+	blockno = buf_hdr->tag.blockNum;
+
+	/* check for garbage data */
+	if (!failed &&
+		!PageIsVerifiedExtended((Page) bufdata, blockno,
+								PIV_LOG_WARNING | PIV_REPORT_STAT))
+	{
+		RelFileLocator rlocator = BufTagGetRelFileLocator(&buf_hdr->tag);
+		ForkNumber	forkNum = buf_hdr->tag.forkNum;
+
+		MemoryContextSwitchTo(ErrorContext);
+
+		if (mode == READ_BUFFERS_ZERO_ON_ERROR || zero_damaged_pages)
+		{
+			ereport(WARNING,
+					(errcode(ERRCODE_DATA_CORRUPTED),
+					 errmsg("invalid page in block %u of relation %s; zeroing out page",
+							blockno,
+							relpathperm(rlocator, forkNum))));
+			memset(bufdata, 0, BLCKSZ);
+		}
+		else
+		{
+			ereport(LOG,
+					(errcode(ERRCODE_DATA_CORRUPTED),
+					 errmsg("invalid page in block %u of relation %s",
+							blockno,
+							relpathperm(rlocator, forkNum))));
+			failed = true;
+			buf_failed = true;
+		}
+	}
+
+	/* Terminate I/O and set BM_VALID. */
+	pgaio_io_ref_clear(&buf_hdr->io_in_progress);
+
+	{
+		uint32		buf_state;
+
+		buf_state = pg_atomic_read_u32(&buf_hdr->state);
+		buf_state |= BM_VALID;
+		pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
+	}
+
+	/* release pin held by IO subsystem */
+	LocalRefCount[-buffer - 1] -= 1;
+
+	return buf_failed;
+}
-- 
2.45.2.827.g557ae147e6

