From 3b7b8829481e08137d42fbd660976923dc8b293a Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Tue, 11 Mar 2025 11:33:13 -0400
Subject: [PATCH v2.8 31/38] bufmgr: Implement AIO write support

As of this commit there are no users of these AIO facilities; they will be
added in later commits.
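
The completion callbacks added here terminate each written buffer's IO,
clearing BM_DIRTY on success and setting BM_IO_ERROR on failure, and, for
shared buffers, release the content lock that the issuer disowned when
starting the IO.

A sketch of how a future user (e.g. checkpointer or bgwriter) might issue
such a write, for illustration only; the issuing code is not part of this
patch and the exact signatures (e.g. smgrstartwritev(), added earlier in
this series) may still change:

    PgAioReturn ioret;
    PgAioHandle *ioh = pgaio_io_acquire(CurrentResourceOwner, &ioret);

    /* have the completion callback clean up each written buffer */
    pgaio_io_register_callbacks(ioh, PGAIO_HCB_SHARED_BUFFER_WRITEV, 0);

    /* associate the affected Buffer IDs with the handle */
    pgaio_io_set_handle_data_32(ioh, (uint32 *) buffers, nblocks);

    /*
     * Issue the write. The buffers need to be pinned, dirty and
     * share-locked, with the content locks disowned (LWLockDisown()),
     * so that the completion callback can release them.
     */
    smgrstartwritev(ioh, reln, forknum, blocknum, pages, nblocks, false);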

Author:
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/include/storage/aio.h              |   2 +
 src/include/storage/bufmgr.h           |   2 +
 src/backend/storage/aio/aio_callback.c |   2 +
 src/backend/storage/buffer/bufmgr.c    | 182 ++++++++++++++++++++++++-
 4 files changed, 187 insertions(+), 1 deletion(-)

diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h
index 3047dc77ce1..192abc5a712 100644
--- a/src/include/storage/aio.h
+++ b/src/include/storage/aio.h
@@ -186,8 +186,10 @@ typedef enum PgAioHandleCallbackID
 	PGAIO_HCB_MD_WRITEV,
 
 	PGAIO_HCB_SHARED_BUFFER_READV,
+	PGAIO_HCB_SHARED_BUFFER_WRITEV,
 
 	PGAIO_HCB_LOCAL_BUFFER_READV,
+	PGAIO_HCB_LOCAL_BUFFER_WRITEV,
 } PgAioHandleCallbackID;
 
 
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index ca4ca22c345..fffc76d11d9 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -176,7 +176,9 @@ extern PGDLLIMPORT int backend_flush_after;
 extern PGDLLIMPORT int bgwriter_flush_after;
 
 extern const PgAioHandleCallbacks aio_shared_buffer_readv_cb;
+extern const PgAioHandleCallbacks aio_shared_buffer_writev_cb;
 extern const PgAioHandleCallbacks aio_local_buffer_readv_cb;
+extern const PgAioHandleCallbacks aio_local_buffer_writev_cb;
 
 /* in buf_init.c */
 extern PGDLLIMPORT char *BufferBlocks;
diff --git a/src/backend/storage/aio/aio_callback.c b/src/backend/storage/aio/aio_callback.c
index fb6ac058a09..7162f722e3c 100644
--- a/src/backend/storage/aio/aio_callback.c
+++ b/src/backend/storage/aio/aio_callback.c
@@ -44,8 +44,10 @@ static const PgAioHandleCallbacksEntry aio_handle_cbs[] = {
 	CALLBACK_ENTRY(PGAIO_HCB_MD_WRITEV, aio_md_writev_cb),
 
 	CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_READV, aio_shared_buffer_readv_cb),
+	CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_WRITEV, aio_shared_buffer_writev_cb),
 
 	CALLBACK_ENTRY(PGAIO_HCB_LOCAL_BUFFER_READV, aio_local_buffer_readv_cb),
+	CALLBACK_ENTRY(PGAIO_HCB_LOCAL_BUFFER_WRITEV, aio_local_buffer_writev_cb),
 #undef CALLBACK_ENTRY
 };
 
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 860886d4d90..886fbaf63f3 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -5507,7 +5507,15 @@ LockBuffer(Buffer buffer, int mode)
 	else if (mode == BUFFER_LOCK_SHARE)
 		LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_SHARED);
 	else if (mode == BUFFER_LOCK_EXCLUSIVE)
+	{
+		/*
+		 * FIXME: Wait for in-progress AIO writes, otherwise there would be
+		 * a risk of deadlock. This isn't entirely trivial to do in a
+		 * race-free way; IO could be started between us checking whether
+		 * there is IO and enqueueing ourselves for the lock.
+		 */
 		LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_EXCLUSIVE);
+	}
 	else
 		elog(ERROR, "unrecognized buffer lock mode: %d", mode);
 }
@@ -5522,6 +5530,19 @@ ConditionalLockBuffer(Buffer buffer)
 {
 	BufferDesc *buf;
 
+	/*
+	 * FIXME: Wait for AIO writes. Some code does not deal well with
+	 * ConditionalLockBuffer() continuously failing; e.g.
+	 * spginsert()->spgdoinsert() ends up busy-looping (easily reproducible
+	 * by making this function always fail and running the regression tests).
+	 * While that code could be fixed, it'd be hard to find all problematic
+	 * places.
+	 *
+	 * It would be OK to wait for the IO here, as waiting for IO completion
+	 * does not require taking any locks that could lead to an undetected
+	 * deadlock.
+	 */
+
 	Assert(BufferIsPinned(buffer));
 	if (BufferIsLocal(buffer))
 		return true;			/* act as though we got it */
@@ -5594,6 +5615,11 @@ LockBufferForCleanup(Buffer buffer)
 	{
 		uint32		buf_state;
 
+		/*
+		 * FIXME: LockBuffer()'s handling of in-progress writes (once
+		 * implemented) should suffice to deal with the deadlock risk.
+		 */
+
 		/* Try to acquire lock */
 		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 		buf_state = LockBufHdr(bufHdr);
@@ -5741,6 +5767,14 @@ ConditionalLockBufferForCleanup(Buffer buffer)
 
 	Assert(BufferIsValid(buffer));
 
+	/*
+	 * FIXME: Should wait for IO for the same reason as in
+	 * ConditionalLockBuffer(). The wait needs to happen up here, before the
+	 * pin-count checks below: the buffer pin held for the duration of the
+	 * IO would otherwise prevent us from ever reaching the
+	 * ConditionalLockBuffer() call.
+	 */
+
 	if (BufferIsLocal(buffer))
 	{
 		refcount = LocalRefCount[-buffer - 1];
@@ -6802,12 +6836,131 @@ buffer_readv_report(PgAioResult result, const PgAioTargetData *target_data, int
 		);
 }
 
+/*
+ * Helper for AIO writev completion callbacks, supporting both shared and temp
+ * buffers. Gets called once for each buffer in a multi-page write.
+ */
+static pg_attribute_always_inline PgAioResult
+buffer_writev_complete_one(uint8 buf_off, Buffer buffer, uint8 flags,
+						   bool failed, bool is_temp)
+{
+	BufferDesc *buf_hdr = is_temp ?
+		GetLocalBufferDescriptor(-buffer - 1)
+		: GetBufferDescriptor(buffer - 1);
+	PgAioResult result;
+	bool		clear_dirty;
+	uint32		set_flag_bits;
+
+#ifdef USE_ASSERT_CHECKING
+	{
+		uint32		buf_state = pg_atomic_read_u32(&buf_hdr->state);
+
+		Assert(buf_state & BM_VALID);
+		Assert(buf_state & BM_TAG_VALID);
+		/* temp buffers don't use BM_IO_IN_PROGRESS */
+		if (!is_temp)
+			Assert(buf_state & BM_IO_IN_PROGRESS);
+		Assert(buf_state & BM_DIRTY);
+	}
+#endif
+
+	result.status = ARS_OK;
+
+	clear_dirty = !failed;
+	set_flag_bits = failed ? BM_IO_ERROR : 0;
+
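+	/*
+	 * Terminate the IO: this clears BM_IO_IN_PROGRESS for shared buffers
+	 * and wakes any backends waiting for the IO to finish. On success the
+	 * buffer is marked clean; on failure it remains dirty and is flagged
+	 * with BM_IO_ERROR.
+	 */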
+	if (is_temp)
+		TerminateLocalBufferIO(buf_hdr, clear_dirty, set_flag_bits, false);
+	else
+		TerminateBufferIO(buf_hdr, clear_dirty, set_flag_bits, false, false);
+
+	/*
+	 * The initiator of the IO disowned the buffer's content lock (via
+	 * LWLockDisown()); releasing it is now our responsibility.
+	 */
+	if (!is_temp)
+		LWLockReleaseDisowned(BufferDescriptorGetContentLock(buf_hdr),
+							  LW_SHARED);
+
+	/* FIXME: tracepoint */
+
+	return result;
+}
+
+/*
+ * Perform completion handling of a single AIO write. This write may cover
+ * multiple blocks / buffers.
+ *
+ * Shared between shared and local buffers, to reduce code duplication.
+ */
+static pg_attribute_always_inline PgAioResult
+buffer_writev_complete(PgAioHandle *ioh, PgAioResult prior_result,
+					   uint8 cb_data, bool is_temp)
+{
+	PgAioResult result = prior_result;
+	PgAioTargetData *td = pgaio_io_get_target_data(ioh);
+	uint64	   *io_data;
+	uint8		handle_data_len;
+
+	if (is_temp)
+	{
+		Assert(td->smgr.is_temp);
+		Assert(pgaio_io_get_owner(ioh) == MyProcNumber);
+	}
+	else
+		Assert(!td->smgr.is_temp);
+
+	/*
+	 * Iterate over all the buffers affected by this IO and call the
+	 * appropriate per-buffer completion function for each.
+	 */
+	io_data = pgaio_io_get_handle_data(ioh, &handle_data_len);
+	for (uint8 buf_off = 0; buf_off < handle_data_len; buf_off++)
+	{
+		Buffer		buf = io_data[buf_off];
+		PgAioResult buf_result;
+		bool		failed;
+
+		Assert(BufferIsValid(buf));
+
+		/*
+		 * If the entire IO failed at a lower level, each buffer needs to be
+		 * marked as failed. In the case of a partial write, the buffers
+		 * before the failure point may be ok (prior_result.result is the
+		 * number of successfully written blocks).
+		 */
+		failed =
+			prior_result.status == ARS_ERROR
+			|| prior_result.result <= buf_off;
+
+		buf_result = buffer_writev_complete_one(buf_off, buf, cb_data, failed,
+												is_temp);
+
+		/*
+		 * If there wasn't any prior error and the IO for this page failed in
+		 * some form, set the whole IO's result to this page's result.
+		 */
+		if (result.status != ARS_ERROR && buf_result.status != ARS_OK)
+		{
+			result = buf_result;
+			pgaio_result_report(result, td, LOG);
+		}
+	}
+
+	return result;
+}
+
 static void
 shared_buffer_readv_stage(PgAioHandle *ioh, uint8 cb_data)
 {
 	buffer_stage_common(ioh, false, false);
 }
 
+static void
+shared_buffer_writev_stage(PgAioHandle *ioh, uint8 cb_data)
+{
+	buffer_stage_common(ioh, true, false);
+}
+
 static PgAioResult
 shared_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result,
 							 uint8 cb_data)
@@ -6815,6 +6968,13 @@ shared_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result,
 	return buffer_readv_complete(ioh, prior_result, cb_data, false);
 }
 
+static PgAioResult
+shared_buffer_writev_complete(PgAioHandle *ioh, PgAioResult prior_result,
+							  uint8 cb_data)
+{
+	return buffer_writev_complete(ioh, prior_result, cb_data, false);
+}
+
 static void
 local_buffer_readv_stage(PgAioHandle *ioh, uint8 cb_data)
 {
@@ -6828,13 +6988,29 @@ local_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result,
 	return buffer_readv_complete(ioh, prior_result, cb_data, true);
 }
 
-/* readv callback is is passed READ_BUFFERS_* flags as callback data */
+static void
+local_buffer_writev_stage(PgAioHandle *ioh, uint8 cb_data)
+{
+	/*
+	 * Currently this is unreachable as the only write support is for
+	 * checkpointer / bgwriter, which don't deal with local buffers.
+	 */
+	elog(ERROR, "should be unreachable");
+}
+
+
+/* readv callback is passed READ_BUFFERS_* flags as callback data */
 const PgAioHandleCallbacks aio_shared_buffer_readv_cb = {
 	.stage = shared_buffer_readv_stage,
 	.complete_shared = shared_buffer_readv_complete,
 	.report = buffer_readv_report,
 };
 
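+/*
+ * Unlike the readv callbacks, the writev callbacks ignore their callback
+ * data and do not (yet) provide a ->report callback.
+ */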
+const PgAioHandleCallbacks aio_shared_buffer_writev_cb = {
+	.stage = shared_buffer_writev_stage,
+	.complete_shared = shared_buffer_writev_complete,
+};
+
 /* readv callback is passed READ_BUFFERS_* flags as callback data */
 const PgAioHandleCallbacks aio_local_buffer_readv_cb = {
 	.stage = local_buffer_readv_stage,
@@ -6848,3 +7024,7 @@ const PgAioHandleCallbacks aio_local_buffer_readv_cb = {
 	.complete_local = local_buffer_readv_complete,
 	.report = buffer_readv_report,
 };
+
+const PgAioHandleCallbacks aio_local_buffer_writev_cb = {
+	.stage = local_buffer_writev_stage,
+};
-- 
2.48.1.76.g4e746b1a31.dirty

