From bc2016ad468094ccc09507d3ddd755f5c7692d4b Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Wed, 4 Sep 2024 15:27:00 -0400
Subject: [PATCH v2.1 09/20] aio: Add worker method
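
The worker method executes IOs in a pool of IO worker processes:
client backends push IOs into a shared memory submission queue and
wake an idle worker, and workers execute the IOs with synchronous
system calls, performing shared completion handling immediately.
IOs that cannot be executed in another process, e.g. because the
file cannot be reopened there, are still executed synchronously by
the submitting backend.

The pool is enabled with io_method = worker (now the default; the
setting requires a restart) and sized with io_workers.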

---
 src/include/storage/aio.h                     |   5 +-
 src/include/storage/aio_internal.h            |   1 +
 src/include/storage/lwlocklist.h              |   1 +
 src/backend/postmaster/postmaster.c           |   3 +-
 src/backend/storage/aio/aio.c                 |   2 +
 src/backend/storage/aio/aio_init.c            |  15 +
 src/backend/storage/aio/method_worker.c       | 404 +++++++++++++++++-
 .../utils/activity/wait_event_names.txt       |   1 +
 src/backend/utils/misc/postgresql.conf.sample |   2 +-
 src/tools/pgindent/typedefs.list              |   3 +
 10 files changed, 428 insertions(+), 9 deletions(-)

diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h
index c0a59f47bc0..1e4c8807c71 100644
--- a/src/include/storage/aio.h
+++ b/src/include/storage/aio.h
@@ -332,11 +332,12 @@ extern void assign_io_method(int newval, void *extra);
 typedef enum IoMethod
 {
 	IOMETHOD_SYNC = 0,
+	IOMETHOD_WORKER,
 } IoMethod;
 
 
-/* We'll default to synchronous execution. */
-#define DEFAULT_IO_METHOD IOMETHOD_SYNC
+/* We'll default to worker-based execution. */
+#define DEFAULT_IO_METHOD IOMETHOD_WORKER
 
 
 /* GUCs */
diff --git a/src/include/storage/aio_internal.h b/src/include/storage/aio_internal.h
index 82bce1cf27c..b6f44a875dd 100644
--- a/src/include/storage/aio_internal.h
+++ b/src/include/storage/aio_internal.h
@@ -264,6 +264,7 @@ extern const char *pgaio_io_get_state_name(PgAioHandle *ioh);
 
 /* Declarations for the tables of function pointers exposed by each IO method. */
 extern const IoMethodOps pgaio_sync_ops;
+extern const IoMethodOps pgaio_worker_ops;
 
 extern const IoMethodOps *pgaio_impl;
 extern PgAioCtl *aio_ctl;
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 88dc79b2bd6..7aaccf69d1e 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -84,3 +84,4 @@ PG_LWLOCK(50, DSMRegistry)
 PG_LWLOCK(51, InjectionPoint)
 PG_LWLOCK(52, SerialControl)
 PG_LWLOCK(53, WaitLSN)
+PG_LWLOCK(54, AioWorkerSubmissionQueue)
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 3d970374733..76440321d18 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -4222,7 +4222,8 @@ maybe_reap_io_worker(int pid)
 static void
 maybe_adjust_io_workers(void)
 {
-	/* ATODO: This will need to check if io_method == worker */
+	if (!pgaio_workers_enabled())
+		return;
 
 	/*
 	 * If we're in final shutting down state, then we're just waiting for all
diff --git a/src/backend/storage/aio/aio.c b/src/backend/storage/aio/aio.c
index b5370330620..0ca641d9322 100644
--- a/src/backend/storage/aio/aio.c
+++ b/src/backend/storage/aio/aio.c
@@ -36,6 +36,7 @@ static void pgaio_bounce_buffer_wait_for_free(void);
 /* Options for io_method. */
 const struct config_enum_entry io_method_options[] = {
 	{"sync", IOMETHOD_SYNC, false},
+	{"worker", IOMETHOD_WORKER, false},
 	{NULL, 0, false}
 };
 
@@ -53,6 +54,7 @@ PgAioPerBackend *my_aio;
 
 static const IoMethodOps *pgaio_ops_table[] = {
 	[IOMETHOD_SYNC] = &pgaio_sync_ops,
+	[IOMETHOD_WORKER] = &pgaio_worker_ops,
 };
 
 
diff --git a/src/backend/storage/aio/aio_init.c b/src/backend/storage/aio/aio_init.c
index e25bdf1dba0..ca3513019a6 100644
--- a/src/backend/storage/aio/aio_init.c
+++ b/src/backend/storage/aio/aio_init.c
@@ -19,6 +19,7 @@
 #include "storage/aio_init.h"
 #include "storage/aio_internal.h"
 #include "storage/bufmgr.h"
+#include "storage/io_worker.h"
 #include "storage/proc.h"
 #include "storage/shmem.h"
 
@@ -37,6 +38,11 @@ AioCtlShmemSize(void)
 static uint32
 AioProcs(void)
 {
+	/*
+	 * While AIO workers don't need their own AIO context, we can't currently
+	 * guarantee that nothing gets assigned to a ProcNumber for an IO worker
+	 * if we just subtracted MAX_IO_WORKERS.
+	 */
 	return MaxBackends + NUM_AUXILIARY_PROCS;
 }
 
@@ -333,6 +339,9 @@ pgaio_postmaster_child_init(void)
 	/* shouldn't be initialized twice */
 	Assert(!my_aio);
 
+	if (MyBackendType == B_IO_WORKER)
+		return;
+
 	if (MyProc == NULL || MyProcNumber >= AioProcs())
 		elog(ERROR, "aio requires a normal PGPROC");
 
@@ -348,3 +357,9 @@ pgaio_postmaster_child_init_local(void)
 	if (pgaio_impl->postmaster_child_init_local)
 		pgaio_impl->postmaster_child_init_local();
 }
+
+bool
+pgaio_workers_enabled(void)
+{
+	return io_method == IOMETHOD_WORKER;
+}
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index 5df2eea4a03..a6c21df2ea5 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -3,6 +3,21 @@
  * method_worker.c
  *    AIO implementation using workers
  *
+ * Worker processes consume IOs from a shared memory submission queue, run
+ * traditional synchronous system calls, and perform the shared completion
+ * handling immediately.  Client code submits most requests by pushing IOs
+ * into the submission queue, and waits (if necessary) using condition
+ * variables.  Some IOs cannot be performed in another process due to lack of
+ * infrastructure for reopening the file, and must be processed synchronously by
+ * the client code when submitted.
+ *
+ * So that the submitter can make just one system call when submitting a batch
+ * of IOs, wakeups "fan out"; each woken backend can wake two more.  XXX This
+ * could be improved by using futexes instead of latches to wake N waiters.
+ *
+ * This method of AIO is available in all builds on all operating systems, and
+ * is the default.
+ *
  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
@@ -16,24 +31,290 @@
 
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
+#include "port/pg_bitutils.h"
+#include "postmaster/auxprocess.h"
 #include "postmaster/interrupt.h"
+#include "storage/aio.h"
+#include "storage/aio_internal.h"
 #include "storage/io_worker.h"
 #include "storage/ipc.h"
 #include "storage/latch.h"
 #include "storage/proc.h"
 #include "tcop/tcopprot.h"
 #include "utils/wait_event.h"
+#include "utils/ps_status.h"
+
+
+/* How many workers should each worker wake up if needed? */
+#define IO_WORKER_WAKEUP_FANOUT 2
+
+
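+/*
+ * A fixed-size ring buffer of IO handle IDs.  The queue size is rounded up
+ * to a power of two so that head and tail can wrap with a simple mask.
+ * Protected by AioWorkerSubmissionQueueLock.
+ */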
+typedef struct AioWorkerSubmissionQueue
+{
+	uint32		size;
+	uint32		mask;
+	uint32		head;
+	uint32		tail;
+	uint32		ios[FLEXIBLE_ARRAY_MEMBER];
+} AioWorkerSubmissionQueue;
+
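+/* Per-worker state: whether the slot is claimed, and the worker's latch. */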
+typedef struct AioWorkerSlot
+{
+	Latch	   *latch;
+	bool		in_use;
+} AioWorkerSlot;
+
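+/*
+ * Shared state for the worker pool: a bitmask of idle workers (the uint64
+ * limits the pool to 64 workers) and one slot per worker.  Protected by
+ * AioWorkerSubmissionQueueLock.
+ */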
+typedef struct AioWorkerControl
+{
+	uint64		idle_worker_mask;
+	AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
+} AioWorkerControl;
+
+
+static size_t pgaio_worker_shmem_size(void);
+static void pgaio_worker_shmem_init(bool first_time);
+static void pgaio_worker_postmaster_child_init_local(void);
+
+static bool	pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
+static int	pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
+
+
+const IoMethodOps pgaio_worker_ops = {
+	.shmem_size = pgaio_worker_shmem_size,
+	.shmem_init = pgaio_worker_shmem_init,
+	.postmaster_child_init_local = pgaio_worker_postmaster_child_init_local,
+
+	.needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
+	.submit = pgaio_worker_submit,
+#if 0
+	.wait_one = pgaio_worker_wait_one,
+	.retry = pgaio_worker_io_retry,
+	.drain = pgaio_worker_drain,
+#endif
+
+	.can_scatter_gather_direct = true,
+	.can_scatter_gather_buffered = true
+};
 
 
 int			io_workers = 3;
+static int	io_worker_queue_size = 64;
 
+static int	MyIoWorkerId;
+
+
+static AioWorkerSubmissionQueue *io_worker_submission_queue;
+static AioWorkerControl *io_worker_control;
+
+
+static size_t
+pgaio_worker_shmem_size(void)
+{
+	/* Match the power-of-two rounding applied in pgaio_worker_shmem_init(). */
+	return
+		offsetof(AioWorkerSubmissionQueue, ios) +
+		sizeof(uint32) * pg_nextpower2_32(io_worker_queue_size) +
+		offsetof(AioWorkerControl, workers) +
+		sizeof(AioWorkerSlot) * io_workers;
+}
+
+static void
+pgaio_worker_shmem_init(bool first_time)
+{
+	bool		found;
+	int			size;
+
+	/* Round size up to next power of two so we can make a mask. */
+	size = pg_nextpower2_32(io_worker_queue_size);
+
+	io_worker_submission_queue =
+		ShmemInitStruct("AioWorkerSubmissionQueue",
+						offsetof(AioWorkerSubmissionQueue, ios) +
+						sizeof(uint32) * size,
+						&found);
+	if (!found)
+	{
+		io_worker_submission_queue->size = size;
+		io_worker_submission_queue->mask = size - 1;
+		io_worker_submission_queue->head = 0;
+		io_worker_submission_queue->tail = 0;
+	}
+
+	io_worker_control =
+		ShmemInitStruct("AioWorkerControl",
+						offsetof(AioWorkerControl, workers) +
+						sizeof(AioWorkerSlot) * io_workers,
+						&found);
+	if (!found)
+	{
+		io_worker_control->idle_worker_mask = 0;
+		for (int i = 0; i < io_workers; ++i)
+		{
+			io_worker_control->workers[i].latch = NULL;
+			io_worker_control->workers[i].in_use = false;
+		}
+	}
+}
+
+static void
+pgaio_worker_postmaster_child_init_local(void)
+{
+}
+
+
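+/*
+ * Pick an idle worker and clear its idle bit, returning its ID, or -1 if no
+ * workers are idle.  Caller must hold AioWorkerSubmissionQueueLock.
+ */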
+static int
+pgaio_choose_idle_worker(void)
+{
+	int			worker;
+
+	if (io_worker_control->idle_worker_mask == 0)
+		return -1;
+
+	/* Find the lowest bit position, and clear it. */
+	worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
+	io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
+
+	return worker;
+}
+
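+/*
+ * Try to enqueue an IO for a worker to execute.  Returns false if the queue
+ * is full.  Caller must hold AioWorkerSubmissionQueueLock.
+ */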
+static bool
+pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
+{
+	AioWorkerSubmissionQueue *queue;
+	uint32		new_head;
+
+	queue = io_worker_submission_queue;
+	new_head = (queue->head + 1) & queue->mask;
+	if (new_head == queue->tail)
+	{
+		elog(DEBUG1, "AIO worker submission queue is full");
+		return false;			/* full */
+	}
+
+	queue->ios[queue->head] = pgaio_io_get_id(ioh);
+	queue->head = new_head;
+
+	return true;
+}
+
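+/*
+ * Dequeue the ID of the next IO to execute, or UINT32_MAX if the queue is
+ * empty.  Caller must hold AioWorkerSubmissionQueueLock.
+ */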
+static uint32
+pgaio_worker_submission_queue_consume(void)
+{
+	AioWorkerSubmissionQueue *queue;
+	uint32		result;
+
+	queue = io_worker_submission_queue;
+	if (queue->tail == queue->head)
+		return UINT32_MAX;		/* empty */
+
+	result = queue->ios[queue->tail];
+	queue->tail = (queue->tail + 1) & queue->mask;
+
+	return result;
+}
+
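+/* Number of IOs currently waiting in the queue.  Caller must hold the lock. */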
+static uint32
+pgaio_worker_submission_queue_depth(void)
+{
+	uint32		head;
+	uint32		tail;
+
+	head = io_worker_submission_queue->head;
+	tail = io_worker_submission_queue->tail;
+
+	if (tail > head)
+		head += io_worker_submission_queue->size;
+
+	Assert(head >= tail);
+
+	return head - tail;
+}
+
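+/*
+ * Queue as many of the IOs as possible for workers, waking at most one idle
+ * worker (further wakeups fan out from worker to worker), and execute the
+ * overflow synchronously in this backend.
+ */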
+static void
+pgaio_worker_submit_internal(int nios, PgAioHandle *ios[])
+{
+	PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
+	int			nsync = 0;
+	Latch	   *wakeup = NULL;
+	int			worker;
+
+	Assert(nios <= PGAIO_SUBMIT_BATCH_SIZE);
+
+	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+	for (int i = 0; i < nios; ++i)
+	{
+		Assert(!pgaio_worker_needs_synchronous_execution(ios[i]));
+		if (!pgaio_worker_submission_queue_insert(ios[i]))
+		{
+			/*
+			 * We'll do it synchronously, but only after we've sent as many as
+			 * we can to workers, to maximize concurrency.
+			 */
+			synchronous_ios[nsync++] = ios[i];
+			continue;
+		}
+
+		if (wakeup == NULL)
+		{
+			/* Choose an idle worker to wake up if we haven't already. */
+			worker = pgaio_choose_idle_worker();
+			if (worker >= 0)
+				wakeup = io_worker_control->workers[worker].latch;
+
+			ereport(DEBUG3,
+					errmsg("submission for io:%d choosing worker %d, latch %p",
+						   pgaio_io_get_id(ios[i]), worker, wakeup),
+					errhidestmt(true), errhidecontext(true));
+		}
+	}
+	LWLockRelease(AioWorkerSubmissionQueueLock);
+
+	if (wakeup)
+		SetLatch(wakeup);
+
+	/* Run whatever is left synchronously. */
+	if (nsync > 0)
+	{
+		for (int i = 0; i < nsync; ++i)
+		{
+			pgaio_io_perform_synchronously(synchronous_ios[i]);
+		}
+	}
+}
+
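+/*
+ * An IO needs to be executed synchronously by the submitter when there are
+ * no IO workers to hand it to (e.g. in single-user mode), when it references
+ * state local to this backend, or when the file cannot be reopened in
+ * another process.
+ */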
+static bool
+pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
+{
+	return
+		!IsUnderPostmaster
+		|| (ioh->flags & AHF_REFERENCES_LOCAL)
+		|| !pgaio_io_can_reopen(ioh);
+}
+
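+/*
+ * Prepare all staged IOs for submission before any of them become visible
+ * to workers, which may start executing them immediately, then hand the
+ * batch over to the submission queue.
+ */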
+static int
+pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
+{
+	for (int i = 0; i < num_staged_ios; i++)
+	{
+		PgAioHandle *ioh = staged_ios[i];
+
+		pgaio_io_prepare_submit(ioh);
+	}
+
+	pgaio_worker_submit_internal(num_staged_ios, staged_ios);
+
+	return num_staged_ios;
+}
 
 void
 IoWorkerMain(char *startup_data, size_t startup_data_len)
 {
 	sigjmp_buf	local_sigjmp_buf;
+	volatile PgAioHandle *ioh = NULL;
+	char		cmd[128];
 
 	MyBackendType = B_IO_WORKER;
+	AuxiliaryProcessMainCommon();
 
 	/* TODO review all signals */
 	pqsignal(SIGHUP, SignalHandlerForConfigReload);
@@ -49,7 +330,34 @@ IoWorkerMain(char *startup_data, size_t startup_data_len)
 	pqsignal(SIGPIPE, SIG_IGN);
 	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
 	pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
-	sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
+
+	MyIoWorkerId = -1;
+
+	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+
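+	/* Claim the first free worker slot. */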
+	for (int i = 0; i < io_workers; ++i)
+	{
+		if (!io_worker_control->workers[i].in_use)
+		{
+			Assert(io_worker_control->workers[i].latch == NULL);
+			io_worker_control->workers[i].in_use = true;
+			MyIoWorkerId = i;
+			break;
+		}
+		else
+			Assert(io_worker_control->workers[i].latch != NULL);
+	}
+
+	if (MyIoWorkerId == -1)
+		elog(ERROR, "couldn't find a free worker slot");
+
+	io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
+	io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
+	LWLockRelease(AioWorkerSubmissionQueueLock);
+
+	snprintf(cmd, sizeof(cmd), "worker: %d", MyIoWorkerId);
+	set_ps_display(cmd);
 
 	/* see PostgresMain() */
 	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
@@ -64,21 +372,107 @@ IoWorkerMain(char *startup_data, size_t startup_data_len)
 		LWLockReleaseAll();
 
 		/* TODO: recover from IO errors */
+		if (ioh != NULL)
+		{
+#if 0
+			/* EINTR is treated as a retryable error */
+			pgaio_process_io_completion(unvolatize(PgAioHandle *, ioh),
+										EINTR);
+#endif
+		}
 
 		EmitErrorReport();
+
+		/* FIXME: should probably be a before-shmem-exit instead */
+		LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+		Assert(io_worker_control->workers[MyIoWorkerId].in_use);
+		Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
+
+		io_worker_control->workers[MyIoWorkerId].in_use = false;
+		io_worker_control->workers[MyIoWorkerId].latch = NULL;
+		LWLockRelease(AioWorkerSubmissionQueueLock);
+
 		proc_exit(1);
 	}
 
 	/* We can now handle ereport(ERROR) */
 	PG_exception_stack = &local_sigjmp_buf;
 
+	sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
+
 	while (!ShutdownRequestPending)
 	{
-		WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
-				  WAIT_EVENT_IO_WORKER_MAIN);
-		ResetLatch(MyLatch);
-		CHECK_FOR_INTERRUPTS();
+		uint32		io_index;
+		Latch	   *latches[IO_WORKER_WAKEUP_FANOUT];
+		int			nlatches = 0;
+		int			nwakeups = 0;
+		int			worker;
+
+		/* Try to get a job to do. */
+		LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+		if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
+		{
+			/*
+			 * Nothing to do.  Mark self idle.
+			 *
+			 * XXX: Invent some kind of back pressure to reduce useless
+			 * wakeups?
+			 */
+			io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
+		}
+		else
+		{
+			/* Got one.  Clear idle flag. */
+			io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
+
+			/* See if we can wake up some peers. */
+			nwakeups = Min(pgaio_worker_submission_queue_depth(),
+						   IO_WORKER_WAKEUP_FANOUT);
+			for (int i = 0; i < nwakeups; ++i)
+			{
+				if ((worker = pgaio_choose_idle_worker()) < 0)
+					break;
+				latches[nlatches++] = io_worker_control->workers[worker].latch;
+			}
+#if 0
+			if (nwakeups > 0)
+				elog(LOG, "wake %d", nwakeups);
+#endif
+		}
+		LWLockRelease(AioWorkerSubmissionQueueLock);
+
+		for (int i = 0; i < nlatches; ++i)
+			SetLatch(latches[i]);
+
+		if (io_index != UINT32_MAX)
+		{
+			ioh = &aio_ctl->io_handles[io_index];
+
+			ereport(DEBUG3,
+					errmsg("worker processing io:%d",
+						   pgaio_io_get_id(unvolatize(PgAioHandle *, ioh))),
+					errhidestmt(true), errhidecontext(true));
+
+			pgaio_io_reopen(unvolatize(PgAioHandle *, ioh));
+			pgaio_io_perform_synchronously(unvolatize(PgAioHandle *, ioh));
+
+			ioh = NULL;
+		}
+		else
+		{
+			WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
+					  WAIT_EVENT_IO_WORKER_MAIN);
+			ResetLatch(MyLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
 	}
 
+	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+	Assert(io_worker_control->workers[MyIoWorkerId].in_use);
+	Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
+
+	io_worker_control->workers[MyIoWorkerId].in_use = false;
+	io_worker_control->workers[MyIoWorkerId].latch = NULL;
+	LWLockRelease(AioWorkerSubmissionQueueLock);
+
 	proc_exit(0);
 }
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index ecc513aa7bd..3678f2b3e43 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -351,6 +351,7 @@ DSMRegistry	"Waiting to read or update the dynamic shared memory registry."
 InjectionPoint	"Waiting to read or update information related to injection points."
 SerialControl	"Waiting to read or update shared <filename>pg_serial</filename> state."
 WaitLSN	"Waiting to read or update shared Wait-for-LSN state."
+AioWorkerSubmissionQueue	"Waiting to access AIO worker submission queue."
 
 #
 # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 8c062240373..1fc8336496c 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -839,7 +839,7 @@
 # WIP AIO GUC docs
 #------------------------------------------------------------------------------
 
-#io_method = sync			# (change requires restart)
+#io_method = worker			# (change requires restart)
 #io_workers = 3				# 1-32;
 
 #io_max_concurrency = 32		# Max number of IOs that may be in
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 2f463d29ca1..f1cac7aa5bf 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -54,6 +54,9 @@ AggStrategy
 AggTransInfo
 Aggref
 AggregateInstrumentation
+AioWorkerControl
+AioWorkerSlot
+AioWorkerSubmissionQueue
 AlenState
 Alias
 AllocBlock
-- 
2.45.2.827.g557ae147e6

