From 40e15609a95f6733a7fe0e202c5ec4add3044bad Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sat, 31 Aug 2024 21:39:01 -0400
Subject: [PATCH v2 15/20] bufmgr: Implement AIO write support

As of this commit there are no users of these AIO facilities; those will come
in later commits.

Author:
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/include/storage/aio.h             |  2 +
 src/include/storage/bufmgr.h          |  2 +
 src/backend/storage/aio/aio_subject.c |  2 +
 src/backend/storage/buffer/bufmgr.c   | 85 +++++++++++++++++++++++++++
 4 files changed, 91 insertions(+)

diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h
index 1bef475b0a9..caa52d2aaba 100644
--- a/src/include/storage/aio.h
+++ b/src/include/storage/aio.h
@@ -106,8 +106,10 @@ typedef enum PgAioHandleSharedCallbackID
 	ASC_MD_WRITEV,
 
 	ASC_SHARED_BUFFER_READ,
+	ASC_SHARED_BUFFER_WRITE,
 
 	ASC_LOCAL_BUFFER_READ,
+	ASC_LOCAL_BUFFER_WRITE,
 } PgAioHandleSharedCallbackID;
 
 
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 2a836cf98c6..2e88b19619c 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -205,7 +205,9 @@ extern PGDLLIMPORT int32 *LocalRefCount;
 
 struct PgAioHandleSharedCallbacks;
 extern const struct PgAioHandleSharedCallbacks aio_shared_buffer_readv_cb;
+extern const struct PgAioHandleSharedCallbacks aio_shared_buffer_writev_cb;
 extern const struct PgAioHandleSharedCallbacks aio_local_buffer_readv_cb;
+extern const struct PgAioHandleSharedCallbacks aio_local_buffer_writev_cb;
 
 
 /* upper limit for effective_io_concurrency */
diff --git a/src/backend/storage/aio/aio_subject.c b/src/backend/storage/aio/aio_subject.c
index 21341aae425..b2bd0c235e7 100644
--- a/src/backend/storage/aio/aio_subject.c
+++ b/src/backend/storage/aio/aio_subject.c
@@ -52,8 +52,10 @@ static const PgAioHandleSharedCallbacksEntry aio_shared_cbs[] = {
 	CALLBACK_ENTRY(ASC_MD_WRITEV, aio_md_writev_cb),
 
 	CALLBACK_ENTRY(ASC_SHARED_BUFFER_READ, aio_shared_buffer_readv_cb),
+	CALLBACK_ENTRY(ASC_SHARED_BUFFER_WRITE, aio_shared_buffer_writev_cb),
 
 	CALLBACK_ENTRY(ASC_LOCAL_BUFFER_READ, aio_local_buffer_readv_cb),
+	CALLBACK_ENTRY(ASC_LOCAL_BUFFER_WRITE, aio_local_buffer_writev_cb),
 #undef CALLBACK_ENTRY
 };
 
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 722e73eb7d0..0f94db19f9d 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -6437,6 +6437,62 @@ ReadBufferCompleteReadShared(Buffer buffer, int mode, bool failed)
 	return buf_failed;
 }
 
+/*
+ * Complete an asynchronous write of a single shared buffer.
+ *
+ * Ends the buffer's in-progress IO via TerminateBufferIO().  If the write
+ * succeeded, TerminateBufferIO() is asked to clear the dirty flag; if 'failed'
+ * is set, the buffer is left dirty and BM_IO_ERROR is set instead.
+ *
+ * If 'release_lock' is true, the buffer's content lock is released here on
+ * behalf of the IO initiator, which is expected to have disowned it.
+ *
+ * The return value is currently always 0 (false).
+ */
+static uint64
+ReadBufferCompleteWriteShared(Buffer buffer, bool release_lock, bool failed)
+{
+	BufferDesc *bufHdr;
+	bool		result = false;
+
+	Assert(BufferIsValid(buffer));
+
+	bufHdr = GetBufferDescriptor(buffer - 1);
+
+	/* A buffer undergoing a write must be valid, tagged, in IO and dirty. */
+#ifdef USE_ASSERT_CHECKING
+	{
+		uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+		Assert(buf_state & BM_VALID);
+		Assert(buf_state & BM_TAG_VALID);
+		Assert(buf_state & BM_IO_IN_PROGRESS);
+		Assert(buf_state & BM_DIRTY);
+	}
+#endif
+
+	/* AFIXME: implement track_io_timing */
+
+	/*
+	 * Only a successful write makes the on-disk page match the buffer, so
+	 * the dirty flag may only be cleared in that case.  After a failure the
+	 * buffer has to stay dirty and is flagged with BM_IO_ERROR instead.
+	 */
+	TerminateBufferIO(bufHdr, /* clear_dirty = */ !failed,
+					  failed ? BM_IO_ERROR : 0,
+					  /* forget_owner = */ false,
+					  /* syncio = */ false);
+
+	/*
+	 * The initiator of IO is not managing the lock (i.e. called
+	 * LWLockDisown()), we are.
+	 */
+	if (release_lock)
+		LWLockReleaseUnowned(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
+
+	return result;
+}
+
 /*
  * Helper to prepare IO on shared buffers for execution, shared between reads
  * and writes.
@@ -6518,6 +6556,18 @@ shared_buffer_readv_prepare(PgAioHandle *ioh)
 	shared_buffer_prepare_common(ioh, false);
 }
 
+/*
+ * AIO "prepare" callback for writes of shared buffers.
+ *
+ * Thin wrapper around shared_buffer_prepare_common(); the second argument is
+ * true here, whereas shared_buffer_readv_prepare() passes false.
+ */
+static void
+shared_buffer_writev_prepare(PgAioHandle *ioh)
+{
+	shared_buffer_prepare_common(ioh, true);
+}
+
 static PgAioResult
 shared_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result)
 {
@@ -6586,6 +6630,38 @@ buffer_readv_error(PgAioResult result, const PgAioSubjectData *subject_data, int
 	MemoryContextSwitchTo(oldContext);
 }
 
+/*
+ * AIO "complete" callback for writes of shared buffers.
+ *
+ * Finishes the write on every buffer recorded in the handle's IO data by
+ * calling ReadBufferCompleteWriteShared().  For now each buffer is treated as
+ * successfully written and its content lock is always released; see the
+ * FIXMEs below.  prior_result is logged and returned unchanged.
+ */
+static PgAioResult
+shared_buffer_writev_complete(PgAioHandle *ioh, PgAioResult prior_result)
+{
+	uint8		nbuffers;
+	uint64	   *buffers;
+
+	elog(DEBUG3, "%s: %d %d", __func__, prior_result.status, prior_result.result);
+
+	buffers = pgaio_io_get_io_data(ioh, &nbuffers);
+
+	/* FIXME: handle outright errors */
+
+	for (int i = 0; i < nbuffers; i++)
+	{
+		/* FIXME: handle short writes / failures */
+		/* FIXME: ioh->scb_data.shared_buffer.release_lock */
+		ReadBufferCompleteWriteShared((Buffer) buffers[i],
+									  /* release_lock = */ true,
+									  /* failed = */ false);
+	}
+
+	return prior_result;
+}
+
 /*
  * Helper to prepare IO on local buffers for execution, shared between reads
  * and writes.
@@ -6655,14 +6727,34 @@ local_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result)
 	return result;
 }
 
+/*
+ * AIO "prepare" callback for writes of local buffers.
+ *
+ * Not implemented yet: unconditionally raises an ERROR.
+ */
+static void
+local_buffer_writev_prepare(PgAioHandle *ioh)
+{
+	elog(ERROR, "not yet");
+}
+
 
 const struct PgAioHandleSharedCallbacks aio_shared_buffer_readv_cb = {
 	.prepare = shared_buffer_readv_prepare,
 	.complete = shared_buffer_readv_complete,
 	.error = buffer_readv_error,
 };
+/* NOTE(review): unlike the readv callbacks, no .error callback is set here. */
+const struct PgAioHandleSharedCallbacks aio_shared_buffer_writev_cb = {
+	.prepare = shared_buffer_writev_prepare,
+	.complete = shared_buffer_writev_complete,
+};
 const struct PgAioHandleSharedCallbacks aio_local_buffer_readv_cb = {
 	.prepare = local_buffer_readv_prepare,
 	.complete = local_buffer_readv_complete,
 	.error = buffer_readv_error,
 };
+/* Only .prepare is set; local_buffer_writev_prepare() always raises an ERROR. */
+const struct PgAioHandleSharedCallbacks aio_local_buffer_writev_cb = {
+	.prepare = local_buffer_writev_prepare,
+};
-- 
2.45.2.746.g06e570c0df.dirty

