From 9c9bbb42fb561fb2cf7d6d5183db5359d37e004e Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Fri, 8 Nov 2024 12:38:41 -0500
Subject: [PATCH v2 06/20] aio: Add worker method
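
Add a new io_method, "worker", where a pool of IO worker processes
executes IOs on behalf of other backends.  Client backends submit IOs
by pushing them into a shared memory submission queue and, if
necessary, wait for the results using condition variables.  Workers
dequeue the IOs, perform them with plain synchronous system calls, and
run shared completion handling immediately.

IOs that cannot be executed in another process (e.g. because the file
cannot be reopened there) are instead executed synchronously by the
submitting backend, as are IOs that do not fit into the queue.  So
that a backend submitting a batch needs to make only one system call,
worker wakeups fan out: each woken worker may wake up to two more
while the queue remains non-empty.

As this method works in every build on every operating system, make it
the default io_method, with io_workers = 3 worker processes by
default.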

---
 src/include/storage/aio.h                     |   5 +-
 src/include/storage/aio_internal.h            |   1 +
 src/include/storage/lwlocklist.h              |   1 +
 src/backend/storage/aio/aio.c                 |   2 +
 src/backend/storage/aio/aio_init.c            |  12 +-
 src/backend/storage/aio/method_worker.c       | 406 +++++++++++++++++-
 .../utils/activity/wait_event_names.txt       |   1 +
 src/backend/utils/misc/postgresql.conf.sample |   2 +-
 src/tools/pgindent/typedefs.list              |   3 +
 9 files changed, 423 insertions(+), 10 deletions(-)

diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h
index b386dabc921..2e84abfea2d 100644
--- a/src/include/storage/aio.h
+++ b/src/include/storage/aio.h
@@ -322,11 +322,12 @@ extern void assign_io_method(int newval, void *extra);
 typedef enum IoMethod
 {
 	IOMETHOD_SYNC = 0,
+	IOMETHOD_WORKER,
 } IoMethod;
 
 
-/* We'll default to synchronous execution. */
-#define DEFAULT_IO_METHOD IOMETHOD_SYNC
+/* We'll default to worker based execution. */
+#define DEFAULT_IO_METHOD IOMETHOD_WORKER
 
 
 /* GUCs */
diff --git a/src/include/storage/aio_internal.h b/src/include/storage/aio_internal.h
index d600d45b4fd..f974c4accf5 100644
--- a/src/include/storage/aio_internal.h
+++ b/src/include/storage/aio_internal.h
@@ -234,6 +234,7 @@ extern const char *pgaio_io_get_state_name(PgAioHandle *ioh);
 
 /* Declarations for the tables of function pointers exposed by each IO method. */
 extern const IoMethodOps pgaio_sync_ops;
+extern const IoMethodOps pgaio_worker_ops;
 
 extern const IoMethodOps *pgaio_impl;
 extern PgAioCtl *aio_ctl;
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 6a2f64c54fb..8d00d62e208 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -83,3 +83,4 @@ PG_LWLOCK(49, WALSummarizer)
 PG_LWLOCK(50, DSMRegistry)
 PG_LWLOCK(51, InjectionPoint)
 PG_LWLOCK(52, SerialControl)
+PG_LWLOCK(53, AioWorkerSubmissionQueue)
diff --git a/src/backend/storage/aio/aio.c b/src/backend/storage/aio/aio.c
index 3e2ff9718ca..e4c9d439ddd 100644
--- a/src/backend/storage/aio/aio.c
+++ b/src/backend/storage/aio/aio.c
@@ -57,6 +57,7 @@ static PgAioHandle *pgaio_io_from_ref(PgAioHandleRef *ior, uint64 *ref_generatio
 /* Options for io_method. */
 const struct config_enum_entry io_method_options[] = {
 	{"sync", IOMETHOD_SYNC, false},
+	{"worker", IOMETHOD_WORKER, false},
 	{NULL, 0, false}
 };
 
@@ -73,6 +74,7 @@ PgAioPerBackend *my_aio;
 
 static const IoMethodOps *pgaio_ops_table[] = {
 	[IOMETHOD_SYNC] = &pgaio_sync_ops,
+	[IOMETHOD_WORKER] = &pgaio_worker_ops,
 };
 
 
diff --git a/src/backend/storage/aio/aio_init.c b/src/backend/storage/aio/aio_init.c
index 0c2d77ec8ab..23adc5308e5 100644
--- a/src/backend/storage/aio/aio_init.c
+++ b/src/backend/storage/aio/aio_init.c
@@ -19,6 +19,7 @@
 #include "storage/aio_init.h"
 #include "storage/aio_internal.h"
 #include "storage/bufmgr.h"
+#include "storage/io_worker.h"
 #include "storage/proc.h"
 #include "storage/shmem.h"
 
@@ -37,6 +38,11 @@ AioCtlShmemSize(void)
 static uint32
 AioProcs(void)
 {
+	/*
+	 * While AIO workers don't need their own AIO context, we can't currently
+	 * guarantee that nothing gets assigned to a ProcNumber for an IO worker if
+	 * we just subtracted MAX_IO_WORKERS.
+	 */
 	return MaxBackends + NUM_AUXILIARY_PROCS;
 }
 
@@ -209,6 +215,9 @@ pgaio_init_backend(void)
 	/* shouldn't be initialized twice */
 	Assert(!my_aio);
 
+	if (MyBackendType == B_IO_WORKER)
+		return;
+
 	if (MyProc == NULL || MyProcNumber >= AioProcs())
 		elog(ERROR, "aio requires a normal PGPROC");
 
@@ -221,6 +230,5 @@ pgaio_init_backend(void)
 bool
 pgaio_workers_enabled(void)
 {
-	/* placeholder for future commit */
-	return false;
+	return io_method == IOMETHOD_WORKER;
 }
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index 0ea749a8ba8..a508f53ebd4 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -1,7 +1,22 @@
 /*-------------------------------------------------------------------------
  *
  * method_worker.c
- *    AIO implementation using workers
+ *    AIO - perform AIO using worker processes
+ *
+ * Worker processes consume IOs from a shared memory submission queue, run
+ * traditional synchronous system calls, and perform the shared completion
+ * handling immediately.  Client code submits most requests by pushing IOs
+ * into the submission queue, and waits (if necessary) using condition
+ * variables.  Some IOs cannot be performed in another process due to lack of
+ * infrastructure for reopening the file, and must be processed synchronously by
+ * the client code when submitted.
+ *
+ * So that the submitter can make just one system call when submitting a batch
+ * of IOs, wakeups "fan out"; each woken IO worker can wake two more.  XXX
+ * This could be improved by using futexes instead of latches to wake N
+ * waiters.
+ *
+ * This method of AIO is available in all builds on all operating systems, and
+ * is the default.
  *
  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -16,23 +31,323 @@
 
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
+#include "port/pg_bitutils.h"
 #include "postmaster/auxprocess.h"
 #include "postmaster/interrupt.h"
+#include "storage/aio.h"
+#include "storage/aio_internal.h"
 #include "storage/io_worker.h"
 #include "storage/ipc.h"
 #include "storage/latch.h"
 #include "storage/proc.h"
 #include "tcop/tcopprot.h"
+#include "utils/ps_status.h"
 #include "utils/wait_event.h"
 
 
+/* How many workers should each worker wake up if needed? */
+#define IO_WORKER_WAKEUP_FANOUT 2
+
+
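+/*
+ * A ring of IO handle indexes, sized to a power of two so that positions can
+ * be wrapped with a simple mask.  Protected by AioWorkerSubmissionQueueLock.
+ */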
+typedef struct AioWorkerSubmissionQueue
+{
+	uint32		size;
+	uint32		mask;
+	uint32		head;
+	uint32		tail;
+	uint32		ios[FLEXIBLE_ARRAY_MEMBER];
+} AioWorkerSubmissionQueue;
+
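+/* Per-worker state: the latch to wake it with, and whether the slot is taken. */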
+typedef struct AioWorkerSlot
+{
+	Latch	   *latch;
+	bool		in_use;
+} AioWorkerSlot;
+
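+/*
+ * Shared control state for the worker pool: a bitmask of currently idle
+ * workers plus one slot per worker, protected by
+ * AioWorkerSubmissionQueueLock.
+ */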
+typedef struct AioWorkerControl
+{
+	uint64		idle_worker_mask;
+	AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
+} AioWorkerControl;
+
+
+static size_t pgaio_worker_shmem_size(void);
+static void pgaio_worker_shmem_init(bool first_time);
+
+static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
+static int	pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
+
+
+const IoMethodOps pgaio_worker_ops = {
+	.shmem_size = pgaio_worker_shmem_size,
+	.shmem_init = pgaio_worker_shmem_init,
+
+	.needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
+	.submit = pgaio_worker_submit,
+};
+
+
 int			io_workers = 3;
+static int	io_worker_queue_size = 64;
 
+static int	MyIoWorkerId;
+
+
+static AioWorkerSubmissionQueue *io_worker_submission_queue;
+static AioWorkerControl *io_worker_control;
+
+
+static size_t
+pgaio_worker_shmem_size(void)
+{
+	/* Matches the power-of-two rounding in pgaio_worker_shmem_init(). */
+	return
+		offsetof(AioWorkerSubmissionQueue, ios) +
+		sizeof(uint32) * pg_nextpower2_32(io_worker_queue_size) +
+		offsetof(AioWorkerControl, workers) +
+		sizeof(AioWorkerSlot) * io_workers;
+}
+
+static void
+pgaio_worker_shmem_init(bool first_time)
+{
+	bool		found;
+	int			size;
+
+	/* Round size up to next power of two so we can make a mask. */
+	size = pg_nextpower2_32(io_worker_queue_size);
+
+	io_worker_submission_queue =
+		ShmemInitStruct("AioWorkerSubmissionQueue",
+						offsetof(AioWorkerSubmissionQueue, ios) +
+						sizeof(uint32) * size,
+						&found);
+	if (!found)
+	{
+		io_worker_submission_queue->size = size;
+		io_worker_submission_queue->mask = size - 1;
+		io_worker_submission_queue->head = 0;
+		io_worker_submission_queue->tail = 0;
+	}
+
+	io_worker_control =
+		ShmemInitStruct("AioWorkerControl",
+						offsetof(AioWorkerControl, workers) +
+						sizeof(AioWorkerSlot) * io_workers,
+						&found);
+	if (!found)
+	{
+		io_worker_control->idle_worker_mask = 0;
+		for (int i = 0; i < io_workers; ++i)
+		{
+			io_worker_control->workers[i].latch = NULL;
+			io_worker_control->workers[i].in_use = false;
+		}
+	}
+}
+
+
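+/*
+ * Choose an idle worker and clear its bit in idle_worker_mask, returning its
+ * index, or -1 if no worker is idle.  Caller must hold
+ * AioWorkerSubmissionQueueLock.
+ */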
+static int
+pgaio_choose_idle_worker(void)
+{
+	int			worker;
+
+	if (io_worker_control->idle_worker_mask == 0)
+		return -1;
+
+	/* Find the lowest bit position, and clear it. */
+	worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
+	io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
+
+	return worker;
+}
+
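+/*
+ * Try to enqueue one IO for a worker to execute.  Returns false if the queue
+ * is full.  Caller must hold AioWorkerSubmissionQueueLock.
+ */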
+static bool
+pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
+{
+	AioWorkerSubmissionQueue *queue;
+	uint32		new_head;
+
+	queue = io_worker_submission_queue;
+	new_head = (queue->head + 1) & queue->mask;
+	if (new_head == queue->tail)
+	{
+		elog(DEBUG1, "io worker submission queue is full");
+		return false;			/* full */
+	}
+
+	queue->ios[queue->head] = pgaio_io_get_id(ioh);
+	queue->head = new_head;
+
+	return true;
+}
+
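+/*
+ * Dequeue the index of the next IO to execute, or UINT32_MAX if the queue is
+ * empty.  Caller must hold AioWorkerSubmissionQueueLock.
+ */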
+static uint32
+pgaio_worker_submission_queue_consume(void)
+{
+	AioWorkerSubmissionQueue *queue;
+	uint32		result;
+
+	queue = io_worker_submission_queue;
+	if (queue->tail == queue->head)
+		return UINT32_MAX;		/* empty */
+
+	result = queue->ios[queue->tail];
+	queue->tail = (queue->tail + 1) & queue->mask;
+
+	return result;
+}
+
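+/*
+ * Number of IOs currently waiting in the queue, accounting for wraparound.
+ * Caller must hold AioWorkerSubmissionQueueLock.
+ */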
+static uint32
+pgaio_worker_submission_queue_depth(void)
+{
+	uint32		head;
+	uint32		tail;
+
+	head = io_worker_submission_queue->head;
+	tail = io_worker_submission_queue->tail;
+
+	if (tail > head)
+		head += io_worker_submission_queue->size;
+
+	Assert(head >= tail);
+
+	return head - tail;
+}
+
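+/*
+ * Push IOs into the submission queue and wake one idle worker; any further
+ * wakeups fan out from the workers themselves (see IoWorkerMain()).  IOs that
+ * don't fit into the queue are executed synchronously by the submitter, after
+ * the queue lock has been released.
+ */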
+static void
+pgaio_worker_submit_internal(int nios, PgAioHandle *ios[])
+{
+	PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
+	int			nsync = 0;
+	Latch	   *wakeup = NULL;
+	int			worker;
+
+	Assert(nios <= PGAIO_SUBMIT_BATCH_SIZE);
+
+	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+	for (int i = 0; i < nios; ++i)
+	{
+		Assert(!pgaio_worker_needs_synchronous_execution(ios[i]));
+		if (!pgaio_worker_submission_queue_insert(ios[i]))
+		{
+			/*
+			 * We'll do it synchronously, but only after we've sent as many as
+			 * we can to workers, to maximize concurrency.
+			 */
+			synchronous_ios[nsync++] = ios[i];
+			continue;
+		}
+
+		if (wakeup == NULL)
+		{
+			/* Choose an idle worker to wake up if we haven't already. */
+			worker = pgaio_choose_idle_worker();
+			if (worker >= 0)
+				wakeup = io_worker_control->workers[worker].latch;
+
+			ereport(DEBUG3,
+					errmsg("submission for io:%d choosing worker %d, latch %p",
+						   pgaio_io_get_id(ios[i]), worker, wakeup),
+					errhidestmt(true), errhidecontext(true));
+		}
+	}
+	LWLockRelease(AioWorkerSubmissionQueueLock);
+
+	if (wakeup)
+		SetLatch(wakeup);
+
+	/* Run whatever is left synchronously. */
+	if (nsync > 0)
+	{
+		for (int i = 0; i < nsync; ++i)
+		{
+			pgaio_io_perform_synchronously(synchronous_ios[i]);
+		}
+	}
+}
+
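+/*
+ * An IO has to be executed synchronously by the submitter itself if there
+ * are no worker processes (!IsUnderPostmaster), if it references
+ * backend-local state (AHF_REFERENCES_LOCAL), or if the target file can't be
+ * reopened in another process.
+ */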
+static bool
+pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
+{
+	return
+		!IsUnderPostmaster
+		|| (ioh->flags & AHF_REFERENCES_LOCAL) != 0
+		|| !pgaio_io_can_reopen(ioh);
+}
+
+static int
+pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
+{
+	for (int i = 0; i < num_staged_ios; i++)
+	{
+		PgAioHandle *ioh = staged_ios[i];
+
+		pgaio_io_prepare_submit(ioh);
+	}
+
+	pgaio_worker_submit_internal(num_staged_ios, staged_ios);
+
+	return num_staged_ios;
+}
+
+/*
+ * shmem_exit() callback that releases the worker's slot in io_worker_control.
+ */
+static void
+pgaio_worker_die(int code, Datum arg)
+{
+	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+	Assert(io_worker_control->workers[MyIoWorkerId].in_use);
+	Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
+
+	io_worker_control->workers[MyIoWorkerId].in_use = false;
+	io_worker_control->workers[MyIoWorkerId].latch = NULL;
+	LWLockRelease(AioWorkerSubmissionQueueLock);
+}
+
+/*
+ * Register the worker in shared memory, assign MyIoWorkerId and register a
+ * shutdown callback to release the registration.
+ */
+static void
+pgaio_worker_register(void)
+{
+	MyIoWorkerId = -1;
+
+	/*
+	 * XXX: This could do with more fine-grained locking, but at the moment
+	 * the set of registered workers rarely changes, so it hardly matters.
+	 */
+	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+
+	for (int i = 0; i < io_workers; ++i)
+	{
+		if (!io_worker_control->workers[i].in_use)
+		{
+			Assert(io_worker_control->workers[i].latch == NULL);
+			io_worker_control->workers[i].in_use = true;
+			MyIoWorkerId = i;
+			break;
+		}
+		else
+			Assert(io_worker_control->workers[i].latch != NULL);
+	}
+
+	if (MyIoWorkerId == -1)
+		elog(ERROR, "couldn't find a free worker slot");
+
+	io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
+	io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
+	LWLockRelease(AioWorkerSubmissionQueueLock);
+
+	on_shmem_exit(pgaio_worker_die, 0);
+}
 
 void
 IoWorkerMain(char *startup_data, size_t startup_data_len)
 {
 	sigjmp_buf	local_sigjmp_buf;
+	volatile PgAioHandle *ioh = NULL;
+	char		cmd[128];
 
 	MyBackendType = B_IO_WORKER;
 	AuxiliaryProcessMainCommon();
@@ -53,6 +368,11 @@ IoWorkerMain(char *startup_data, size_t startup_data_len)
 	pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
 	sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
 
+	pgaio_worker_register();
+
+	snprintf(cmd, sizeof(cmd), "io worker: %d", MyIoWorkerId);
+	set_ps_display(cmd);
+
 	/* see PostgresMain() */
 	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
 	{
@@ -66,8 +386,26 @@ IoWorkerMain(char *startup_data, size_t startup_data_len)
 		LWLockReleaseAll();
 
 		/* TODO: recover from IO errors */
+		if (ioh != NULL)
+		{
+#if 0
+			/* EINTR is treated as a retryable error */
+			pgaio_process_io_completion(unvolatize(PgAioHandle *, ioh),
+										EINTR);
+#endif
+		}
 
 		EmitErrorReport();
+
+		/* FIXME: should probably be a before-shmem-exit instead */
+		LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+		Assert(io_worker_control->workers[MyIoWorkerId].in_use);
+		Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
+
+		io_worker_control->workers[MyIoWorkerId].in_use = false;
+		io_worker_control->workers[MyIoWorkerId].latch = NULL;
+		LWLockRelease(AioWorkerSubmissionQueueLock);
+
 		proc_exit(1);
 	}
 
@@ -76,10 +414,68 @@ IoWorkerMain(char *startup_data, size_t startup_data_len)
 
 	while (!ShutdownRequestPending)
 	{
-		WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
-				  WAIT_EVENT_IO_WORKER_MAIN);
-		ResetLatch(MyLatch);
-		CHECK_FOR_INTERRUPTS();
+		uint32		io_index;
+		Latch	   *latches[IO_WORKER_WAKEUP_FANOUT];
+		int			nlatches = 0;
+		int			nwakeups = 0;
+		int			worker;
+
+		/* Try to get a job to do. */
+		LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+		if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
+		{
+			/*
+			 * Nothing to do: mark self idle.
+			 *
+			 * XXX: Invent some kind of back pressure to reduce useless
+			 * wakeups?
+			 */
+			io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
+		}
+		else
+		{
+			/* Got one.  Clear idle flag. */
+			io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
+
+			/* See if we can wake up some peers. */
+			nwakeups = Min(pgaio_worker_submission_queue_depth(),
+						   IO_WORKER_WAKEUP_FANOUT);
+			for (int i = 0; i < nwakeups; ++i)
+			{
+				if ((worker = pgaio_choose_idle_worker()) < 0)
+					break;
+				latches[nlatches++] = io_worker_control->workers[worker].latch;
+			}
+#if 0
+			if (nwakeups > 0)
+				elog(LOG, "wake %d", nwakeups);
+#endif
+		}
+		LWLockRelease(AioWorkerSubmissionQueueLock);
+
+		for (int i = 0; i < nlatches; ++i)
+			SetLatch(latches[i]);
+
+		if (io_index != UINT32_MAX)
+		{
+			ioh = &aio_ctl->io_handles[io_index];
+
+			ereport(DEBUG3,
+					errmsg("worker processing io:%d",
+						   pgaio_io_get_id(unvolatize(PgAioHandle *, ioh))),
+					errhidestmt(true), errhidecontext(true));
+
+			pgaio_io_reopen(unvolatize(PgAioHandle *, ioh));
+			pgaio_io_perform_synchronously(unvolatize(PgAioHandle *, ioh));
+
+			ioh = NULL;
+		}
+		else
+		{
+			WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
+					  WAIT_EVENT_IO_WORKER_MAIN);
+			ResetLatch(MyLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
 	}
 
 	proc_exit(0);
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 330a32a90ce..8c3aafd8a18 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -349,6 +349,7 @@ WALSummarizer	"Waiting to read or update WAL summarization state."
 DSMRegistry	"Waiting to read or update the dynamic shared memory registry."
 InjectionPoint	"Waiting to read or update information related to injection points."
 SerialControl	"Waiting to read or update shared <filename>pg_serial</filename> state."
+AioWorkerSubmissionQueue	"Waiting to access AIO worker submission queue."
 
 #
 # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 0f80a0680ec..5893eb29228 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -842,7 +842,7 @@
 # WIP AIO GUC docs
 #------------------------------------------------------------------------------
 
-#io_method = sync			# (change requires restart)
+#io_method = worker			# (change requires restart)
 #io_workers = 3				# 1-32;
 
 #io_max_concurrency = 32		# Max number of IOs that may be in
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index bc1acbb98ee..9b9c8f0d1fc 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -54,6 +54,9 @@ AggStrategy
 AggTransInfo
 Aggref
 AggregateInstrumentation
+AioWorkerControl
+AioWorkerSlot
+AioWorkerSubmissionQueue
 AlenState
 Alias
 AllocBlock
-- 
2.45.2.746.g06e570c0df.dirty

