Automatically sizing the IO worker pool
It's hard to know how to set io_workers=3. If it's too small,
io_method=worker's small submission queue overflows and it silently
falls back to synchronous IO. If it's too high, it generates a lot of
pointless wakeups and scheduling overhead, which might be considered
an independent problem or not, but having the right size pool
certainly mitigates it. Here's a patch to replace that GUC with:
io_min_workers=1
io_max_workers=8
io_worker_idle_timeout=60s
io_worker_launch_interval=500ms
It grows the pool when a backlog is detected (better ideas for this
logic welcome), and lets idle workers time out. IO jobs were already
concentrated into the lowest numbered workers, partly because that
seemed to have marginally better latency than anything else tried so
far due to latch collapsing with lucky timing, and partly in
anticipation of this.
The patch also reduces bogus wakeups a bit by being a bit more
cautious about fanout. That could probably be improved a lot more and
needs more research. It's quite tricky to figure out how to suppress
wakeups without throwing potential concurrency away.
The first couple of patches are independent of this topic, and might
be potential cleanups/fixes for master/v18. The last is a simple
latency test.
Ideas, testing, flames etc welcome.
Attachments:
0001-aio-Regularize-io_method-worker-naming-conventions.patchtext/x-patch; charset=US-ASCII; name=0001-aio-Regularize-io_method-worker-naming-conventions.patchDownload
From 1dbba36f67df5d3d34a990613d6d68d15caf1b17 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 29 Mar 2025 13:25:27 +1300
Subject: [PATCH 1/5] aio: Regularize io_method=worker naming conventions.
method_worker.c didn't keep up with the pattern of PgAioXXX for type
names in the pgaio module.  Add the missing "Pg" prefix used elsewhere.
Likewise for pgaio_choose_idle_worker() which alone failed to use a
pgaio_worker_XXX() name reflecting its submodule.  Rename.
Standardize on parameter names num_staged_ios, staged_ios for the
internal submission function.
Rename the array of handle IDs in PgAioSubmissionQueue to sqes,
since that's a term of art seen in many of these types of systems.
---
src/backend/storage/aio/method_worker.c | 54 ++++++++++++-------------
src/tools/pgindent/typedefs.list | 6 +--
2 files changed, 30 insertions(+), 30 deletions(-)
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index 8ad17ec1ef7..ba5bc5e44ba 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -51,26 +51,26 @@
#define IO_WORKER_WAKEUP_FANOUT 2
-typedef struct AioWorkerSubmissionQueue
+typedef struct PgAioWorkerSubmissionQueue
{
uint32 size;
uint32 mask;
uint32 head;
uint32 tail;
- uint32 ios[FLEXIBLE_ARRAY_MEMBER];
-} AioWorkerSubmissionQueue;
+ uint32 sqes[FLEXIBLE_ARRAY_MEMBER];
+} PgAioWorkerSubmissionQueue;
-typedef struct AioWorkerSlot
+typedef struct PgAioWorkerSlot
{
Latch *latch;
bool in_use;
-} AioWorkerSlot;
+} PgAioWorkerSlot;
-typedef struct AioWorkerControl
+typedef struct PgAioWorkerControl
{
uint64 idle_worker_mask;
- AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
-} AioWorkerControl;
+ PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
+} PgAioWorkerControl;
static size_t pgaio_worker_shmem_size(void);
@@ -95,8 +95,8 @@ int io_workers = 3;
static int io_worker_queue_size = 64;
static int MyIoWorkerId;
-static AioWorkerSubmissionQueue *io_worker_submission_queue;
-static AioWorkerControl *io_worker_control;
+static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
+static PgAioWorkerControl *io_worker_control;
static size_t
@@ -105,15 +105,15 @@ pgaio_worker_queue_shmem_size(int *queue_size)
/* Round size up to next power of two so we can make a mask. */
*queue_size = pg_nextpower2_32(io_worker_queue_size);
- return offsetof(AioWorkerSubmissionQueue, ios) +
+ return offsetof(PgAioWorkerSubmissionQueue, sqes) +
sizeof(uint32) * *queue_size;
}
static size_t
pgaio_worker_control_shmem_size(void)
{
- return offsetof(AioWorkerControl, workers) +
- sizeof(AioWorkerSlot) * MAX_IO_WORKERS;
+ return offsetof(PgAioWorkerControl, workers) +
+ sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS;
}
static size_t
@@ -161,7 +161,7 @@ pgaio_worker_shmem_init(bool first_time)
}
static int
-pgaio_choose_idle_worker(void)
+pgaio_worker_choose_idle(void)
{
int worker;
@@ -178,7 +178,7 @@ pgaio_choose_idle_worker(void)
static bool
pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
{
- AioWorkerSubmissionQueue *queue;
+ PgAioWorkerSubmissionQueue *queue;
uint32 new_head;
queue = io_worker_submission_queue;
@@ -190,7 +190,7 @@ pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
return false; /* full */
}
- queue->ios[queue->head] = pgaio_io_get_id(ioh);
+ queue->sqes[queue->head] = pgaio_io_get_id(ioh);
queue->head = new_head;
return true;
@@ -199,14 +199,14 @@ pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
static uint32
pgaio_worker_submission_queue_consume(void)
{
- AioWorkerSubmissionQueue *queue;
+ PgAioWorkerSubmissionQueue *queue;
uint32 result;
queue = io_worker_submission_queue;
if (queue->tail == queue->head)
return UINT32_MAX; /* empty */
- result = queue->ios[queue->tail];
+ result = queue->sqes[queue->tail];
queue->tail = (queue->tail + 1) & (queue->size - 1);
return result;
@@ -239,37 +239,37 @@ pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
}
static void
-pgaio_worker_submit_internal(int nios, PgAioHandle *ios[])
+pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
{
PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
int nsync = 0;
Latch *wakeup = NULL;
int worker;
- Assert(nios <= PGAIO_SUBMIT_BATCH_SIZE);
+ Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
- for (int i = 0; i < nios; ++i)
+ for (int i = 0; i < num_staged_ios; ++i)
{
- Assert(!pgaio_worker_needs_synchronous_execution(ios[i]));
- if (!pgaio_worker_submission_queue_insert(ios[i]))
+ Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
+ if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
{
/*
* We'll do it synchronously, but only after we've sent as many as
* we can to workers, to maximize concurrency.
*/
- synchronous_ios[nsync++] = ios[i];
+ synchronous_ios[nsync++] = staged_ios[i];
continue;
}
if (wakeup == NULL)
{
/* Choose an idle worker to wake up if we haven't already. */
- worker = pgaio_choose_idle_worker();
+ worker = pgaio_worker_choose_idle();
if (worker >= 0)
wakeup = io_worker_control->workers[worker].latch;
- pgaio_debug_io(DEBUG4, ios[i],
+ pgaio_debug_io(DEBUG4, staged_ios[i],
"choosing worker %d",
worker);
}
@@ -482,7 +482,7 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
IO_WORKER_WAKEUP_FANOUT);
for (int i = 0; i < nwakeups; ++i)
{
- if ((worker = pgaio_choose_idle_worker()) < 0)
+ if ((worker = pgaio_worker_choose_idle()) < 0)
break;
latches[nlatches++] = io_worker_control->workers[worker].latch;
}
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d16bc208654..9946cfcec41 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -55,9 +55,6 @@ AggStrategy
AggTransInfo
Aggref
AggregateInstrumentation
-AioWorkerControl
-AioWorkerSlot
-AioWorkerSubmissionQueue
AlenState
Alias
AllocBlock
@@ -2175,6 +2172,9 @@ PgAioTargetID
PgAioTargetInfo
PgAioUringContext
PgAioWaitRef
+PgAioWorkerControl
+PgAioWorkerSlot
+PgAioWorkerSubmissionQueue
PgArchData
PgBackendGSSStatus
PgBackendSSLStatus
--
2.39.5
0002-aio-Remove-IO-worker-ID-references-from-postmaster.c.patchtext/x-patch; charset=US-ASCII; name=0002-aio-Remove-IO-worker-ID-references-from-postmaster.c.patchDownload
From 99c9a303d37d7e2232d3c28ee091aed82fe5b8eb Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Fri, 11 Apr 2025 23:10:10 +1200
Subject: [PATCH 2/5] aio: Remove IO worker ID references from postmaster.c.
An ancient ancestor of this code had the postmaster assign IDs to IO
workers. Now it tracks them in an unordered array, and it might be
confusing to readers that it refers to their indexes as IDs in various
places. Fix.
---
src/backend/postmaster/postmaster.c | 28 ++++++++++++++--------------
1 file changed, 14 insertions(+), 14 deletions(-)
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 17fed96fe20..0e8623dea18 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -4337,15 +4337,15 @@ maybe_start_bgworkers(void)
static bool
maybe_reap_io_worker(int pid)
{
- for (int id = 0; id < MAX_IO_WORKERS; ++id)
+ for (int i = 0; i < MAX_IO_WORKERS; ++i)
{
- if (io_worker_children[id] &&
- io_worker_children[id]->pid == pid)
+ if (io_worker_children[i] &&
+ io_worker_children[i]->pid == pid)
{
- ReleasePostmasterChildSlot(io_worker_children[id]);
+ ReleasePostmasterChildSlot(io_worker_children[i]);
--io_worker_count;
- io_worker_children[id] = NULL;
+ io_worker_children[i] = NULL;
return true;
}
}
@@ -4389,22 +4389,22 @@ maybe_adjust_io_workers(void)
while (io_worker_count < io_workers)
{
PMChild *child;
- int id;
+ int i;
/* find unused entry in io_worker_children array */
- for (id = 0; id < MAX_IO_WORKERS; ++id)
+ for (i = 0; i < MAX_IO_WORKERS; ++i)
{
- if (io_worker_children[id] == NULL)
+ if (io_worker_children[i] == NULL)
break;
}
- if (id == MAX_IO_WORKERS)
- elog(ERROR, "could not find a free IO worker ID");
+ if (i == MAX_IO_WORKERS)
+ elog(ERROR, "could not find a free IO worker slot");
/* Try to launch one. */
child = StartChildProcess(B_IO_WORKER);
if (child != NULL)
{
- io_worker_children[id] = child;
+ io_worker_children[i] = child;
++io_worker_count;
}
else
@@ -4415,11 +4415,11 @@ maybe_adjust_io_workers(void)
if (io_worker_count > io_workers)
{
/* ask the IO worker in the highest slot to exit */
- for (int id = MAX_IO_WORKERS - 1; id >= 0; --id)
+ for (int i = MAX_IO_WORKERS - 1; i >= 0; --i)
{
- if (io_worker_children[id] != NULL)
+ if (io_worker_children[i] != NULL)
{
- kill(io_worker_children[id]->pid, SIGUSR2);
+ kill(io_worker_children[i]->pid, SIGUSR2);
break;
}
}
--
2.39.5
0003-aio-Try-repeatedly-to-give-batched-IOs-to-workers.patchtext/x-patch; charset=US-ASCII; name=0003-aio-Try-repeatedly-to-give-batched-IOs-to-workers.patchDownload
From a90a692725eedd692f934bf3ed56a2e3a7f3fc2c Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Fri, 11 Apr 2025 21:17:26 +1200
Subject: [PATCH 3/5] aio: Try repeatedly to give batched IOs to workers.
Previously, if the first of a batch of IOs didn't fit in the submission queue we'd
run all of them synchronously. Andres rightly pointed out that we
should really try again between synchronous IOs, since the workers might
have made progress.
Suggested-by: Andres Freund <andres@anarazel.de>
---
src/backend/storage/aio/method_worker.c | 30 ++++++++++++++++++++++---
1 file changed, 27 insertions(+), 3 deletions(-)
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index ba5bc5e44ba..c20d6d0f18b 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -280,12 +280,36 @@ pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
SetLatch(wakeup);
/* Run whatever is left synchronously. */
- if (nsync > 0)
+ for (int i = 0; i < nsync; ++i)
{
- for (int i = 0; i < nsync; ++i)
+ wakeup = NULL;
+
+ /*
+ * Between synchronous IO operations, try again to enqueue as many as
+ * we can.
+ */
+ if (i > 0)
{
- pgaio_io_perform_synchronously(synchronous_ios[i]);
+ wakeup = NULL;
+
+ LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+ while (i < nsync &&
+ pgaio_worker_submission_queue_insert(synchronous_ios[i]))
+ {
+ if (wakeup == NULL && (worker = pgaio_worker_choose_idle()) >= 0)
+ wakeup = io_worker_control->workers[worker].latch;
+ i++;
+ }
+ LWLockRelease(AioWorkerSubmissionQueueLock);
+
+ if (wakeup)
+ SetLatch(wakeup);
+
+ if (i == nsync)
+ break;
}
+
+ pgaio_io_perform_synchronously(synchronous_ios[i]);
}
}
--
2.39.5
0004-aio-Adjust-IO-worker-pool-size-automatically.patchtext/x-patch; charset=US-ASCII; name=0004-aio-Adjust-IO-worker-pool-size-automatically.patchDownload
From 02325442bea440e65b5f3817c3fb8bd4681bbd25 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 22 Mar 2025 00:36:49 +1300
Subject: [PATCH 4/5] aio: Adjust IO worker pool size automatically.
Replace the simple io_workers setting with:
io_min_workers=1
io_max_workers=8
io_worker_idle_timeout=60s
io_worker_launch_interval=500ms
The pool is automatically sized within the configured range according
to demand.
XXX WIP
---
doc/src/sgml/config.sgml | 70 ++-
src/backend/postmaster/postmaster.c | 64 ++-
src/backend/storage/aio/method_worker.c | 450 ++++++++++++++----
.../utils/activity/wait_event_names.txt | 1 +
src/backend/utils/misc/guc_tables.c | 46 +-
src/backend/utils/misc/postgresql.conf.sample | 5 +-
src/include/storage/io_worker.h | 9 +-
src/include/storage/lwlocklist.h | 1 +
src/include/storage/pmsignal.h | 1 +
src/test/modules/test_aio/t/002_io_workers.pl | 15 +-
10 files changed, 541 insertions(+), 121 deletions(-)
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index c1674c22cb2..9f2e7ae6785 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2769,16 +2769,76 @@ include_dir 'conf.d'
</listitem>
</varlistentry>
- <varlistentry id="guc-io-workers" xreflabel="io_workers">
- <term><varname>io_workers</varname> (<type>int</type>)
+ <varlistentry id="guc-io-min-workers" xreflabel="io_min_workers">
+ <term><varname>io_min_workers</varname> (<type>int</type>)
<indexterm>
- <primary><varname>io_workers</varname> configuration parameter</primary>
+ <primary><varname>io_min_workers</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
- Selects the number of I/O worker processes to use. The default is
- 3. This parameter can only be set in the
+ Sets the minimum number of I/O worker processes to use. The default is
+ 1. This parameter can only be set in the
+ <filename>postgresql.conf</filename> file or on the server command
+ line.
+ </para>
+ <para>
+ Only has an effect if <xref linkend="guc-io-method"/> is set to
+ <literal>worker</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry id="guc-io-max-workers" xreflabel="io_max_workers">
+ <term><varname>io_max_workers</varname> (<type>int</type>)
+ <indexterm>
+ <primary><varname>io_max_workers</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the maximum number of I/O worker processes to use. The default is
+ 8. This parameter can only be set in the
+ <filename>postgresql.conf</filename> file or on the server command
+ line.
+ </para>
+ <para>
+ Only has an effect if <xref linkend="guc-io-method"/> is set to
+ <literal>worker</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry id="guc-io-worker-idle-timeout" xreflabel="io_worker_idle_timeout">
+ <term><varname>io_worker_idle_timeout</varname> (<type>int</type>)
+ <indexterm>
+ <primary><varname>io_worker_idle_timeout</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the time after which idle I/O worker processes will exit, reducing the
+ maximum size of the I/O worker pool towards the minimum. The default
+ is 1 minute.
+ This parameter can only be set in the
+ <filename>postgresql.conf</filename> file or on the server command
+ line.
+ </para>
+ <para>
+ Only has an effect if <xref linkend="guc-io-method"/> is set to
+ <literal>worker</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry id="guc-io-worker-launch-interval" xreflabel="io_worker_launch_interval">
+ <term><varname>io_worker_launch_interval</varname> (<type>int</type>)
+ <indexterm>
+ <primary><varname>io_worker_launch_interval</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+       Sets the minimum time between launching new I/O workers. This can be used to avoid
+       sudden bursts of new I/O workers. The default is 500ms.
+ This parameter can only be set in the
<filename>postgresql.conf</filename> file or on the server command
line.
</para>
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 0e8623dea18..b3f68897194 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -408,6 +408,7 @@ static DNSServiceRef bonjour_sdref = NULL;
#endif
/* State for IO worker management. */
+static TimestampTz io_worker_launch_delay_until = 0;
static int io_worker_count = 0;
static PMChild *io_worker_children[MAX_IO_WORKERS];
@@ -1569,6 +1570,15 @@ DetermineSleepTime(void)
if (StartWorkerNeeded)
return 0;
+ /* If we need a new IO worker, defer until launch delay expires. */
+ if (pgaio_worker_test_new_worker_needed() &&
+ io_worker_count < io_max_workers)
+ {
+ if (io_worker_launch_delay_until == 0)
+ return 0;
+ next_wakeup = io_worker_launch_delay_until;
+ }
+
if (HaveCrashedWorker)
{
dlist_mutable_iter iter;
@@ -3750,6 +3760,15 @@ process_pm_pmsignal(void)
StartWorkerNeeded = true;
}
+ /* Process IO worker start requests. */
+ if (CheckPostmasterSignal(PMSIGNAL_IO_WORKER_CHANGE))
+ {
+ /*
+ * No local flag, as the state is exposed through pgaio_worker_*()
+ * functions. This signal is received on potentially actionable level
+ * changes, so that maybe_adjust_io_workers() will run.
+ */
+ }
/* Process background worker state changes. */
if (CheckPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE))
{
@@ -4355,8 +4374,9 @@ maybe_reap_io_worker(int pid)
/*
* Start or stop IO workers, to close the gap between the number of running
* workers and the number of configured workers. Used to respond to change of
- * the io_workers GUC (by increasing and decreasing the number of workers), as
- * well as workers terminating in response to errors (by starting
+ * the io_{min,max}_workers GUCs (by increasing and decreasing the number of
+ * workers) and requests to start a new one due to submission queue backlog,
+ * as well as workers terminating in response to errors (by starting
* "replacement" workers).
*/
static void
@@ -4385,8 +4405,16 @@ maybe_adjust_io_workers(void)
Assert(pmState < PM_WAIT_IO_WORKERS);
- /* Not enough running? */
- while (io_worker_count < io_workers)
+ /* Cancel the launch delay when it expires to minimize clock access. */
+ if (io_worker_launch_delay_until != 0 &&
+ io_worker_launch_delay_until <= GetCurrentTimestamp())
+ io_worker_launch_delay_until = 0;
+
+ /* Not enough workers running? */
+ while (io_worker_launch_delay_until == 0 &&
+ io_worker_count < io_max_workers &&
+ ((io_worker_count < io_min_workers ||
+ pgaio_worker_clear_new_worker_needed())))
{
PMChild *child;
int i;
@@ -4400,6 +4428,16 @@ maybe_adjust_io_workers(void)
if (i == MAX_IO_WORKERS)
elog(ERROR, "could not find a free IO worker slot");
+ /*
+ * Apply launch delay even for failures to avoid retrying too fast on
+ * fork() failure, but not while we're still building the minimum pool
+ * size.
+ */
+ if (io_worker_count >= io_min_workers)
+ io_worker_launch_delay_until =
+ TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ io_worker_launch_interval);
+
/* Try to launch one. */
child = StartChildProcess(B_IO_WORKER);
if (child != NULL)
@@ -4411,19 +4449,11 @@ maybe_adjust_io_workers(void)
break; /* try again next time */
}
- /* Too many running? */
- if (io_worker_count > io_workers)
- {
- /* ask the IO worker in the highest slot to exit */
- for (int i = MAX_IO_WORKERS - 1; i >= 0; --i)
- {
- if (io_worker_children[i] != NULL)
- {
- kill(io_worker_children[i]->pid, SIGUSR2);
- break;
- }
- }
- }
+ /*
+ * If there are too many running because io_max_workers changed, that will
+ * be handled by the IO workers themselves so they can shut down in
+ * preferred order.
+ */
}
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index c20d6d0f18b..78817bb4196 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -11,9 +11,10 @@
* infrastructure for reopening the file, and must processed synchronously by
* the client code when submitted.
*
- * So that the submitter can make just one system call when submitting a batch
- * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
- * could be improved by using futexes instead of latches to wake N waiters.
+ * When a batch of IOs is submitted, the lowest numbered idle worker is woken
+ * up. If it sees more work in the queue it wakes a peer to help, and so on
+ * in a chain. When a backlog is detected, the pool size is increased, and
+ * the highest numbered worker times out after a period of inactivity.
*
* This method of AIO is available in all builds on all operating systems, and
* is the default.
@@ -40,16 +41,16 @@
#include "storage/io_worker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
+#include "storage/lwlock.h"
+#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/memdebug.h"
#include "utils/ps_status.h"
#include "utils/wait_event.h"
-
-/* How many workers should each worker wake up if needed? */
-#define IO_WORKER_WAKEUP_FANOUT 2
-
+/* Saturation for stats counters used to estimate wakeup:work ratio. */
+#define PGAIO_WORKER_STATS_MAX 64
typedef struct PgAioWorkerSubmissionQueue
{
@@ -62,17 +63,25 @@ typedef struct PgAioWorkerSubmissionQueue
typedef struct PgAioWorkerSlot
{
- Latch *latch;
- bool in_use;
+ ProcNumber proc_number;
} PgAioWorkerSlot;
typedef struct PgAioWorkerControl
{
+ /* Seen by postmaster */
+ volatile bool new_worker_needed;
+
+ /* Protected by AioWorkerSubmissionQueueLock. */
uint64 idle_worker_mask;
+
+ /* Protected by AioWorkerControlLock. */
+ uint64 worker_set;
+ int nworkers;
+
+ /* Protected by AioWorkerControlLock. */
PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
} PgAioWorkerControl;
-
static size_t pgaio_worker_shmem_size(void);
static void pgaio_worker_shmem_init(bool first_time);
@@ -90,11 +99,14 @@ const IoMethodOps pgaio_worker_ops = {
/* GUCs */
-int io_workers = 3;
+int io_min_workers = 1;
+int io_max_workers = 8;
+int io_worker_idle_timeout = 60000;
+int io_worker_launch_interval = 500;
static int io_worker_queue_size = 64;
-static int MyIoWorkerId;
+static int MyIoWorkerId = -1;
static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
static PgAioWorkerControl *io_worker_control;
@@ -151,36 +163,171 @@ pgaio_worker_shmem_init(bool first_time)
&found);
if (!found)
{
- io_worker_control->idle_worker_mask = 0;
+ io_worker_control->new_worker_needed = false;
+ io_worker_control->worker_set = 0;
for (int i = 0; i < MAX_IO_WORKERS; ++i)
- {
- io_worker_control->workers[i].latch = NULL;
- io_worker_control->workers[i].in_use = false;
- }
+ io_worker_control->workers[i].proc_number = INVALID_PROC_NUMBER;
+ }
+}
+
+static void
+pgaio_worker_consider_new_worker(uint32 queue_depth)
+{
+ /*
+ * This is called from sites that don't hold AioWorkerControlLock, but it
+ * changes infrequently and an up to date value is not required for this
+ * heuristic purpose.
+ */
+ if (!io_worker_control->new_worker_needed &&
+ queue_depth >= io_worker_control->nworkers)
+ {
+ io_worker_control->new_worker_needed = true;
+ SendPostmasterSignal(PMSIGNAL_IO_WORKER_CHANGE);
}
}
+/*
+ * Called by a worker when the queue is empty, to try to prevent a delayed
+ * reaction to a brief burst. This races against the postmaster acting on the
+ * old value if it was recently set to true, but that's OK, the ordering would
+ * be indeterminate anyway even if we could use locks in the postmaster.
+ */
+static void
+pgaio_worker_cancel_new_worker(void)
+{
+ io_worker_control->new_worker_needed = false;
+}
+
+/*
+ * Called by the postmaster to check if a new worker is needed.
+ */
+bool
+pgaio_worker_test_new_worker_needed(void)
+{
+ return io_worker_control->new_worker_needed;
+}
+
+/*
+ * Called by the postmaster to check if a new worker is needed when it's ready
+ * to launch one, and clear the flag.
+ */
+bool
+pgaio_worker_clear_new_worker_needed(void)
+{
+ bool result;
+
+ result = io_worker_control->new_worker_needed;
+ if (result)
+ io_worker_control->new_worker_needed = false;
+
+ return result;
+}
+
+static uint64
+pgaio_worker_mask(int worker)
+{
+ return UINT64_C(1) << worker;
+}
+
+static void
+pgaio_worker_add(uint64 *set, int worker)
+{
+ *set |= pgaio_worker_mask(worker);
+}
+
+static void
+pgaio_worker_remove(uint64 *set, int worker)
+{
+ *set &= ~pgaio_worker_mask(worker);
+}
+
+#ifdef USE_ASSERT_CHECKING
+static bool
+pgaio_worker_in(uint64 set, int worker)
+{
+ return (set & pgaio_worker_mask(worker)) != 0;
+}
+#endif
+
+static uint64
+pgaio_worker_highest(uint64 set)
+{
+ return pg_leftmost_one_pos64(set);
+}
+
+static uint64
+pgaio_worker_lowest(uint64 set)
+{
+ return pg_rightmost_one_pos64(set);
+}
+
+static int
+pgaio_worker_pop(uint64 *set)
+{
+ int worker;
+
+ Assert(set != 0);
+ worker = pgaio_worker_lowest(*set);
+ pgaio_worker_remove(set, worker);
+ return worker;
+}
+
static int
pgaio_worker_choose_idle(void)
{
+ uint64 idle_worker_mask;
int worker;
- if (io_worker_control->idle_worker_mask == 0)
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
+ /*
+ * Workers only wake higher numbered workers, to try to encourage an
+ * ordering of wakeup:work ratios, reducing spurious wakeups in lower
+ * numbered workers.
+ */
+ idle_worker_mask = io_worker_control->idle_worker_mask;
+ if (MyIoWorkerId != -1)
+ idle_worker_mask &= ~(pgaio_worker_mask(MyIoWorkerId) - 1);
+
+ if (idle_worker_mask == 0)
return -1;
/* Find the lowest bit position, and clear it. */
- worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
- io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
+ worker = pgaio_worker_lowest(idle_worker_mask);
+ pgaio_worker_remove(&io_worker_control->idle_worker_mask, worker);
return worker;
}
+/*
+ * Try to wake a worker by setting its latch, to tell it there are IOs to
+ * process in the submission queue.
+ */
+static void
+pgaio_worker_wake(int worker)
+{
+ ProcNumber proc_number;
+
+ /*
+ * If the selected worker is concurrently exiting, then pgaio_worker_die()
+ * had not yet removed it as of when we saw it in idle_worker_mask. That's
+ * OK, because it will wake all remaining workers to close wakeup-vs-exit
+ * races: *someone* will see the queued IO. If there are no workers
+ * running, the postmaster will start a new one.
+ */
+ proc_number = io_worker_control->workers[worker].proc_number;
+ if (proc_number != INVALID_PROC_NUMBER)
+ SetLatch(&GetPGProcByNumber(proc_number)->procLatch);
+}
+
static bool
pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
{
PgAioWorkerSubmissionQueue *queue;
uint32 new_head;
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
queue = io_worker_submission_queue;
new_head = (queue->head + 1) & (queue->size - 1);
if (new_head == queue->tail)
@@ -202,6 +349,8 @@ pgaio_worker_submission_queue_consume(void)
PgAioWorkerSubmissionQueue *queue;
uint32 result;
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
queue = io_worker_submission_queue;
if (queue->tail == queue->head)
return UINT32_MAX; /* empty */
@@ -218,6 +367,8 @@ pgaio_worker_submission_queue_depth(void)
uint32 head;
uint32 tail;
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
head = io_worker_submission_queue->head;
tail = io_worker_submission_queue->tail;
@@ -242,9 +393,9 @@ static void
pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
{
PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
+ uint32 queue_depth;
+ int worker = -1;
int nsync = 0;
- Latch *wakeup = NULL;
- int worker;
Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
@@ -259,51 +410,48 @@ pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
* we can to workers, to maximize concurrency.
*/
synchronous_ios[nsync++] = staged_ios[i];
- continue;
}
-
- if (wakeup == NULL)
+ else if (worker == -1)
{
/* Choose an idle worker to wake up if we haven't already. */
worker = pgaio_worker_choose_idle();
- if (worker >= 0)
- wakeup = io_worker_control->workers[worker].latch;
pgaio_debug_io(DEBUG4, staged_ios[i],
"choosing worker %d",
worker);
}
}
+ queue_depth = pgaio_worker_submission_queue_depth();
LWLockRelease(AioWorkerSubmissionQueueLock);
- if (wakeup)
- SetLatch(wakeup);
+ if (worker != -1)
+ pgaio_worker_wake(worker);
+ else
+ pgaio_worker_consider_new_worker(queue_depth);
/* Run whatever is left synchronously. */
for (int i = 0; i < nsync; ++i)
{
- wakeup = NULL;
-
/*
* Between synchronous IO operations, try again to enqueue as many as
* we can.
*/
if (i > 0)
{
- wakeup = NULL;
+ worker = -1;
LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
while (i < nsync &&
pgaio_worker_submission_queue_insert(synchronous_ios[i]))
{
- if (wakeup == NULL && (worker = pgaio_worker_choose_idle()) >= 0)
- wakeup = io_worker_control->workers[worker].latch;
+ if (worker == -1)
+ worker = pgaio_worker_choose_idle();
i++;
}
LWLockRelease(AioWorkerSubmissionQueueLock);
- if (wakeup)
- SetLatch(wakeup);
+ if (worker != -1)
+ pgaio_worker_wake(worker);
if (i == nsync)
break;
@@ -335,13 +483,27 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
static void
pgaio_worker_die(int code, Datum arg)
{
- LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
- Assert(io_worker_control->workers[MyIoWorkerId].in_use);
- Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
+ uint64 notify_set;
- io_worker_control->workers[MyIoWorkerId].in_use = false;
- io_worker_control->workers[MyIoWorkerId].latch = NULL;
+ LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+ pgaio_worker_remove(&io_worker_control->idle_worker_mask, MyIoWorkerId);
LWLockRelease(AioWorkerSubmissionQueueLock);
+
+ LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
+ Assert(io_worker_control->workers[MyIoWorkerId].proc_number == MyProcNumber);
+ io_worker_control->workers[MyIoWorkerId].proc_number = INVALID_PROC_NUMBER;
+ Assert(pgaio_worker_in(io_worker_control->worker_set, MyIoWorkerId));
+ pgaio_worker_remove(&io_worker_control->worker_set, MyIoWorkerId);
+ notify_set = io_worker_control->worker_set;
+ Assert(io_worker_control->nworkers > 0);
+ io_worker_control->nworkers--;
+ Assert(pg_popcount64(io_worker_control->worker_set) ==
+ io_worker_control->nworkers);
+ LWLockRelease(AioWorkerControlLock);
+
+ /* Notify other workers on pool change. */
+ while (notify_set != 0)
+ pgaio_worker_wake(pgaio_worker_pop(&notify_set));
}
/*
@@ -351,33 +513,37 @@ pgaio_worker_die(int code, Datum arg)
static void
pgaio_worker_register(void)
{
- MyIoWorkerId = -1;
+ uint64 worker_set_inverted;
+ uint64 old_worker_set;
- /*
- * XXX: This could do with more fine-grained locking. But it's also not
- * very common for the number of workers to change at the moment...
- */
- LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+ MyIoWorkerId = -1;
- for (int i = 0; i < MAX_IO_WORKERS; ++i)
+ LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
+ worker_set_inverted = ~io_worker_control->worker_set;
+ if (worker_set_inverted != 0)
{
- if (!io_worker_control->workers[i].in_use)
- {
- Assert(io_worker_control->workers[i].latch == NULL);
- io_worker_control->workers[i].in_use = true;
- MyIoWorkerId = i;
- break;
- }
- else
- Assert(io_worker_control->workers[i].latch != NULL);
+ MyIoWorkerId = pgaio_worker_lowest(worker_set_inverted);
+ if (MyIoWorkerId >= MAX_IO_WORKERS)
+ MyIoWorkerId = -1;
}
-
if (MyIoWorkerId == -1)
elog(ERROR, "couldn't find a free worker slot");
- io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
- io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
- LWLockRelease(AioWorkerSubmissionQueueLock);
+ Assert(io_worker_control->workers[MyIoWorkerId].proc_number ==
+ INVALID_PROC_NUMBER);
+ io_worker_control->workers[MyIoWorkerId].proc_number = MyProcNumber;
+
+ old_worker_set = io_worker_control->worker_set;
+ Assert(!pgaio_worker_in(old_worker_set, MyIoWorkerId));
+ pgaio_worker_add(&io_worker_control->worker_set, MyIoWorkerId);
+ io_worker_control->nworkers++;
+ Assert(pg_popcount64(io_worker_control->worker_set) ==
+ io_worker_control->nworkers);
+ LWLockRelease(AioWorkerControlLock);
+
+ /* Notify other workers on pool change. */
+ while (old_worker_set != 0)
+ pgaio_worker_wake(pgaio_worker_pop(&old_worker_set));
on_shmem_exit(pgaio_worker_die, 0);
}
@@ -403,14 +569,47 @@ pgaio_worker_error_callback(void *arg)
errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
}
+/*
+ * Check if this backend is allowed to time out, and thus should use a
+ * non-infinite sleep time. Only the highest-numbered worker is allowed to
+ * time out, and only if the pool is above io_min_workers. Serializing
+ * timeouts keeps IDs in a range 0..N without gaps, and avoids undershooting
+ * io_min_workers.
+ *
+ * The result is only instantaneously true and may be temporarily inconsistent
+ * in different workers around transitions, but all workers are woken up on
+ * pool size or GUC changes making the result eventually consistent.
+ */
+static bool
+pgaio_worker_can_timeout(void)
+{
+ uint64 worker_set;
+
+ /* Serialize against pool sized changes. */
+ LWLockAcquire(AioWorkerControlLock, LW_SHARED);
+ worker_set = io_worker_control->worker_set;
+ LWLockRelease(AioWorkerControlLock);
+
+ if (MyIoWorkerId != pgaio_worker_highest(worker_set))
+ return false;
+ if (MyIoWorkerId < io_min_workers)
+ return false;
+
+ return true;
+}
+
void
IoWorkerMain(const void *startup_data, size_t startup_data_len)
{
sigjmp_buf local_sigjmp_buf;
+ TimestampTz idle_timeout_abs = 0;
+ int timeout_guc_used = 0;
PgAioHandle *volatile error_ioh = NULL;
ErrorContextCallback errcallback = {0};
volatile int error_errno = 0;
char cmd[128];
+ int ios = 0;
+ int wakeups = 0;
MyBackendType = B_IO_WORKER;
AuxiliaryProcessMainCommon();
@@ -479,47 +678,53 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
while (!ShutdownRequestPending)
{
uint32 io_index;
- Latch *latches[IO_WORKER_WAKEUP_FANOUT];
- int nlatches = 0;
- int nwakeups = 0;
- int worker;
+ uint32 queue_depth;
+ int worker = -1;
/* Try to get a job to do. */
LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
- if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
+ io_index = pgaio_worker_submission_queue_consume();
+ queue_depth = pgaio_worker_submission_queue_depth();
+ if (io_index == UINT32_MAX)
{
- /*
- * Nothing to do. Mark self idle.
- *
- * XXX: Invent some kind of back pressure to reduce useless
- * wakeups?
- */
- io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
+ /* Nothing to do. Mark self idle. */
+ pgaio_worker_add(&io_worker_control->idle_worker_mask,
+ MyIoWorkerId);
}
else
{
/* Got one. Clear idle flag. */
- io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
+ pgaio_worker_remove(&io_worker_control->idle_worker_mask,
+ MyIoWorkerId);
- /* See if we can wake up some peers. */
- nwakeups = Min(pgaio_worker_submission_queue_depth(),
- IO_WORKER_WAKEUP_FANOUT);
- for (int i = 0; i < nwakeups; ++i)
- {
- if ((worker = pgaio_worker_choose_idle()) < 0)
- break;
- latches[nlatches++] = io_worker_control->workers[worker].latch;
- }
+ /*
+ * See if we should wake up a peer. Only do this if this worker
+ * is not experiencing spurious wakeups itself, to end a chain of
+ * wasted scheduling.
+ */
+ if (queue_depth > 0 && wakeups <= ios)
+ worker = pgaio_worker_choose_idle();
}
LWLockRelease(AioWorkerSubmissionQueueLock);
- for (int i = 0; i < nlatches; ++i)
- SetLatch(latches[i]);
+ /* Propagate wakeups. */
+ if (worker != -1)
+ pgaio_worker_wake(worker);
+ else if (wakeups <= ios)
+ pgaio_worker_consider_new_worker(queue_depth);
if (io_index != UINT32_MAX)
{
PgAioHandle *ioh = NULL;
+ /* Cancel timeout and update wakeup:work ratio. */
+ idle_timeout_abs = 0;
+ if (++ios == PGAIO_WORKER_STATS_MAX)
+ {
+ ios /= 2;
+ wakeups /= 2;
+ }
+
ioh = &pgaio_ctl->io_handles[io_index];
error_ioh = ioh;
errcallback.arg = ioh;
@@ -585,12 +790,83 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
}
else
{
- WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
- WAIT_EVENT_IO_WORKER_MAIN);
+ int timeout_ms;
+
+ /* Cancel new worker if pending. */
+ pgaio_worker_cancel_new_worker();
+
+ /* Compute the remaining allowed idle time. */
+ if (io_worker_idle_timeout == -1)
+ {
+ /* Never time out. */
+ timeout_ms = -1;
+ }
+ else
+ {
+ TimestampTz now = GetCurrentTimestamp();
+
+ /* If the GUC changes, reset timer. */
+ if (idle_timeout_abs != 0 &&
+ io_worker_idle_timeout != timeout_guc_used)
+ idle_timeout_abs = 0;
+
+ /* On first sleep, compute absolute timeout. */
+ if (idle_timeout_abs == 0)
+ {
+ idle_timeout_abs =
+ TimestampTzPlusMilliseconds(now,
+ io_worker_idle_timeout);
+ timeout_guc_used = io_worker_idle_timeout;
+ }
+
+ /*
+ * All workers maintain the absolute timeout value, but only
+ * the highest worker can actually time out and only if
+ * io_min_workers is exceeded. All others wait only for
+ * explicit wakeups caused by queue insertion, wakeup
+ * propagation, change of pool size (possibly making them
+ * highest), or GUC reload.
+ */
+ if (pgaio_worker_can_timeout())
+ timeout_ms =
+ TimestampDifferenceMilliseconds(now,
+ idle_timeout_abs);
+ else
+ timeout_ms = -1;
+ }
+
+ if (WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
+ timeout_ms,
+ WAIT_EVENT_IO_WORKER_MAIN) == WL_TIMEOUT)
+ {
+ /* WL_TIMEOUT */
+ if (pgaio_worker_can_timeout())
+ if (GetCurrentTimestamp() >= idle_timeout_abs)
+ break;
+ }
+ else
+ {
+ /* WL_LATCH_SET */
+ if (++wakeups == PGAIO_WORKER_STATS_MAX)
+ {
+ ios /= 2;
+ wakeups /= 2;
+ }
+ }
ResetLatch(MyLatch);
}
CHECK_FOR_INTERRUPTS();
+
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+
+ /* If io_max_workers has been decreased, exit highest first. */
+ if (MyIoWorkerId >= io_max_workers)
+ break;
+ }
}
error_context_stack = errcallback.previous;
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 930321905f1..067a3a1bb21 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -353,6 +353,7 @@ DSMRegistry "Waiting to read or update the dynamic shared memory registry."
InjectionPoint "Waiting to read or update information related to injection points."
SerialControl "Waiting to read or update shared <filename>pg_serial</filename> state."
AioWorkerSubmissionQueue "Waiting to access AIO worker submission queue."
+AioWorkerControl "Waiting to update AIO worker information."
#
# END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 60b12446a1c..bbb8855b12d 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -3306,14 +3306,52 @@ struct config_int ConfigureNamesInt[] =
},
{
- {"io_workers",
+ {"io_max_workers",
PGC_SIGHUP,
RESOURCES_IO,
- gettext_noop("Number of IO worker processes, for io_method=worker."),
+ gettext_noop("Maximum number of IO worker processes, for io_method=worker."),
NULL,
},
- &io_workers,
- 3, 1, MAX_IO_WORKERS,
+ &io_max_workers,
+ 8, 1, MAX_IO_WORKERS,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"io_min_workers",
+ PGC_SIGHUP,
+ RESOURCES_IO,
+ gettext_noop("Minimum number of IO worker processes, for io_method=worker."),
+ NULL,
+ },
+ &io_min_workers,
+ 1, 1, MAX_IO_WORKERS,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"io_worker_idle_timeout",
+ PGC_SIGHUP,
+ RESOURCES_IO,
+ gettext_noop("Maximum idle time before IO workers exit, for io_method=worker."),
+ NULL,
+ GUC_UNIT_MS
+ },
+ &io_worker_idle_timeout,
+ 60 * 1000, -1, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"io_worker_launch_interval",
+ PGC_SIGHUP,
+ RESOURCES_IO,
+ gettext_noop("Maximum idle time between launching IO workers, for io_method=worker."),
+ NULL,
+ GUC_UNIT_MS
+ },
+ &io_worker_launch_interval,
+ 500, 0, INT_MAX,
NULL, NULL, NULL
},
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 34826d01380..4370f673821 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -214,7 +214,10 @@
# can execute simultaneously
# -1 sets based on shared_buffers
# (change requires restart)
-#io_workers = 3 # 1-32;
+#io_min_workers = 1 # 1-32;
+#io_max_workers = 8 # 1-32;
+#io_worker_idle_timeout = 60s # min 100ms
+#io_worker_launch_interval = 500ms # min 0ms
# - Worker Processes -
diff --git a/src/include/storage/io_worker.h b/src/include/storage/io_worker.h
index 7bde7e89c8a..de9c80109e0 100644
--- a/src/include/storage/io_worker.h
+++ b/src/include/storage/io_worker.h
@@ -17,6 +17,13 @@
pg_noreturn extern void IoWorkerMain(const void *startup_data, size_t startup_data_len);
-extern PGDLLIMPORT int io_workers;
+extern PGDLLIMPORT int io_min_workers;
+extern PGDLLIMPORT int io_max_workers;
+extern PGDLLIMPORT int io_worker_idle_timeout;
+extern PGDLLIMPORT int io_worker_launch_interval;
+
+/* Interfaces visible to the postmaster. */
+extern bool pgaio_worker_test_new_worker_needed(void);
+extern bool pgaio_worker_clear_new_worker_needed(void);
#endif /* IO_WORKER_H */
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index a9681738146..c1801d08833 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -84,3 +84,4 @@ PG_LWLOCK(50, DSMRegistry)
PG_LWLOCK(51, InjectionPoint)
PG_LWLOCK(52, SerialControl)
PG_LWLOCK(53, AioWorkerSubmissionQueue)
+PG_LWLOCK(54, AioWorkerControl)
diff --git a/src/include/storage/pmsignal.h b/src/include/storage/pmsignal.h
index 67fa9ac06e1..10a967f6739 100644
--- a/src/include/storage/pmsignal.h
+++ b/src/include/storage/pmsignal.h
@@ -38,6 +38,7 @@ typedef enum
PMSIGNAL_ROTATE_LOGFILE, /* send SIGUSR1 to syslogger to rotate logfile */
PMSIGNAL_START_AUTOVAC_LAUNCHER, /* start an autovacuum launcher */
PMSIGNAL_START_AUTOVAC_WORKER, /* start an autovacuum worker */
+ PMSIGNAL_IO_WORKER_CHANGE, /* IO worker pool change */
PMSIGNAL_BACKGROUND_WORKER_CHANGE, /* background worker state change */
PMSIGNAL_START_WALRECEIVER, /* start a walreceiver */
PMSIGNAL_ADVANCE_STATE_MACHINE, /* advance postmaster's state machine */
diff --git a/src/test/modules/test_aio/t/002_io_workers.pl b/src/test/modules/test_aio/t/002_io_workers.pl
index af5fae15ea7..a0252857798 100644
--- a/src/test/modules/test_aio/t/002_io_workers.pl
+++ b/src/test/modules/test_aio/t/002_io_workers.pl
@@ -14,6 +14,9 @@ $node->init();
$node->append_conf(
'postgresql.conf', qq(
io_method=worker
+io_worker_idle_timeout=0ms
+io_worker_launch_interval=0ms
+io_max_workers=32
));
$node->start();
@@ -31,7 +34,7 @@ sub test_number_of_io_workers_dynamic
{
my $node = shift;
- my $prev_worker_count = $node->safe_psql('postgres', 'SHOW io_workers');
+ my $prev_worker_count = $node->safe_psql('postgres', 'SHOW io_min_workers');
# Verify that worker count can't be set to 0
change_number_of_io_workers($node, 0, $prev_worker_count, 1);
@@ -62,23 +65,23 @@ sub change_number_of_io_workers
my ($result, $stdout, $stderr);
($result, $stdout, $stderr) =
- $node->psql('postgres', "ALTER SYSTEM SET io_workers = $worker_count");
+ $node->psql('postgres', "ALTER SYSTEM SET io_min_workers = $worker_count");
$node->safe_psql('postgres', 'SELECT pg_reload_conf()');
if ($expect_failure)
{
ok( $stderr =~
- /$worker_count is outside the valid range for parameter "io_workers"/,
- "updating number of io_workers to $worker_count failed, as expected"
+ /$worker_count is outside the valid range for parameter "io_min_workers"/,
+ "updating number of io_min_workers to $worker_count failed, as expected"
);
return $prev_worker_count;
}
else
{
- is( $node->safe_psql('postgres', 'SHOW io_workers'),
+ is( $node->safe_psql('postgres', 'SHOW io_min_workers'),
$worker_count,
- "updating number of io_workers from $prev_worker_count to $worker_count"
+ "updating number of io_min_workers from $prev_worker_count to $worker_count"
);
check_io_worker_count($node, $worker_count);
--
2.39.5
0005-XXX-read_buffer_loop.patchtext/x-patch; charset=US-ASCII; name=0005-XXX-read_buffer_loop.patchDownload
From 43fea48f5f6e9b3301a0216f0402b2558862d632 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 5 Apr 2025 11:14:26 +1300
Subject: [PATCH 5/5] XXX read_buffer_loop
select read_buffer_loop(n) with different values of n in each
session to test latency of reading one block.
---
src/test/modules/test_aio/test_aio--1.0.sql | 4 ++
src/test/modules/test_aio/test_aio.c | 59 +++++++++++++++++++++
2 files changed, 63 insertions(+)
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index e495481c41e..c37b38afcb0 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -106,3 +106,7 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
CREATE FUNCTION inj_io_reopen_detach()
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION read_buffer_loop(block int)
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index 1d776010ef4..2654302a13c 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -18,6 +18,8 @@
#include "postgres.h"
+#include <math.h>
+
#include "access/relation.h"
#include "fmgr.h"
#include "storage/aio.h"
@@ -27,6 +29,7 @@
#include "storage/checksum.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
+#include "storage/read_stream.h"
#include "utils/builtins.h"
#include "utils/injection_point.h"
#include "utils/rel.h"
@@ -806,3 +809,59 @@ inj_io_reopen_detach(PG_FUNCTION_ARGS)
#endif
PG_RETURN_VOID();
}
+
+static BlockNumber
+zero_callback(ReadStream *stream, void *user_data, void *pbd)
+{
+ return *(BlockNumber *) user_data;
+}
+
+PG_FUNCTION_INFO_V1(read_buffer_loop);
+Datum
+read_buffer_loop(PG_FUNCTION_ARGS)
+{
+ BlockNumber block = PG_GETARG_UINT32(0);
+ Relation rel;
+ ReadStream *stream;
+ Buffer buffer;
+ TimestampTz start;
+
+ rel = relation_open(TypeRelationId, AccessShareLock);
+ stream = read_stream_begin_relation(0, NULL, rel, MAIN_FORKNUM, zero_callback, &block, 0);
+ for (int loop = 0; loop < 10; loop++)
+ {
+ double samples[25000];
+ double avg = 0;
+ double sum = 0;
+ double var = 0;
+ double dev;
+ double stddev;
+
+ for (int i = 0; i < lengthof(samples); ++i)
+ {
+ bool flushed;
+
+ start = GetCurrentTimestamp();
+ buffer = read_stream_next_buffer(stream, NULL);
+ samples[i] = GetCurrentTimestamp() - start;
+ sum += samples[i];
+
+ ReleaseBuffer(buffer);
+ read_stream_reset(stream);
+ EvictUnpinnedBuffer(buffer, &flushed);
+ }
+ avg = sum / lengthof(samples);
+ for (int i = 0; i < lengthof(samples); i++)
+ {
+ dev = samples[i] - avg;
+ var += dev * dev;
+ }
+ stddev = sqrt(var / lengthof(samples));
+
+ elog(NOTICE, "n = %zu, avg = %.1fus, stddev = %.1f", lengthof(samples), avg, stddev);
+ }
+ read_stream_end(stream);
+ relation_close(rel, AccessShareLock);
+
+ PG_RETURN_VOID();
+}
--
2.39.5
On 12/4/25 18:59, Thomas Munro wrote:
It's hard to know how to set io_workers=3.
Hmmm.... enable the below behaviour if "io_workers=auto" (default) ?
Sometimes being able to set this kind of parameters manually helps
tremendously with specific workloads... :S
[snip]
Here's a patch to replace that GUC with:io_min_workers=1
io_max_workers=8
io_worker_idle_timeout=60s
io_worker_launch_interval=500ms
Great as defaults / backwards compat with io_workers=auto. Sounds more
user-friendly to me, at least....
[snip]
Ideas, testing, flames etc welcome.
Logic seems sound, if a bit daunting for inexperienced users --- well,
maybe just a bit more than it is now, but ISTM evolution should try and
flatten novices' learning curve, right?
Just .02€, though.
Thanks,
--
Parkinson's Law: Work expands to fill the time alloted to it.
On Mon, Apr 14, 2025 at 5:45 AM Jose Luis Tallon
<jltallon@adv-solutions.net> wrote:
On 12/4/25 18:59, Thomas Munro wrote:
It's hard to know how to set io_workers=3.
Hmmm.... enable the below behaviour if "io_workers=auto" (default) ?
Why not just delete io_workers? If you really want a fixed number,
you can set io_min_workers==io_max_workers.
What should io_max_workers default to? I guess it could be pretty
large without much danger, but I'm not sure. If it's a small value,
an overloaded storage system goes through two stages: first it fills
the queue up with a backlog of requests until it overflows because the
configured maximum of workers isn't keeping up, and then new
submissions start falling back to synchronous IO, sort of jumping
ahead of the queued backlog, but also stalling if the real reason is
that the storage itself isn't keeping up. Whether it'd be better for
the IO worker pool to balloon all the way up to 32 processes (an
internal limit) if required to try to avoid that with default
settings, I'm not entirely sure. Maybe? Why not at least try to get
all the concurrency possible, before falling back to synchronous?
Queued but not running IOs seem to be strictly worse than queued but
not even trying to run. I'd be interested to hear people's thoughts
and experiences actually trying different kinds of workloads on
different kinds of storage. Whether adding more concurrency actually
helps or just generates a lot of useless new processes before the
backpressure kicks in depends on why it's not keeping up, eg hitting
IOPS, throughput or concurrency limits in the storage. In later work
I hope we can make higher levels smarter about understanding whether
requesting more concurrency helps or hurts with feedback (that's quite
a hard problem that some of my colleagues have been looking into), but
the simpler question here seems to be: should this fairly low level
system-wide setting ship with a default that includes any preconceived
assumptions about that?
It's superficially like max_parallel_workers, which ships with a
default of 8, and that's basically where I plucked that 8 from in the
current patch for lack of a serious idea to propose yet. But it's
also more complex than CPU: you know how many cores you have and you
know things about your workload, but even really small "on the metal"
systems probably have a lot more concurrent I/O capacity -- perhaps
depending on the type of operation! (and so far we only have reads) --
than CPU cores. Especially once you completely abandon the idea that
anyone runs databases on spinning rust in modern times, even on low
end systems, which I think we've more or less agreed to assume these
days with related changes such as the recent *_io_concurrency default
change (1->16). It's actually pretty hard to drive a laptop up to
needing more half a dozen or a dozen or a dozen or so workers with
this patch for especially without debug_io_direct=data ie with fast
double-buffered I/O, but cloud environments may also be where most
databases run these days, and low end cloud configurations have
arbitrary made up limits that may be pretty low, so it all depends....
I really don't know, but one idea is that we could leave it open as
possible, and let users worry about that with higher-level settings
and the query concurrency they choose to generate...
io_method=io_uring is effectively open, so why should io_method=worker
be any different by default? Just some thoughts. I'm not sure.
On Sun, Apr 13, 2025 at 04:59:54AM GMT, Thomas Munro wrote:
It's hard to know how to set io_workers=3. If it's too small,
io_method=worker's small submission queue overflows and it silently
falls back to synchronous IO. If it's too high, it generates a lot of
pointless wakeups and scheduling overhead, which might be considered
an independent problem or not, but having the right size pool
certainly mitigates it. Here's a patch to replace that GUC with:io_min_workers=1
io_max_workers=8
io_worker_idle_timeout=60s
io_worker_launch_interval=500msIt grows the pool when a backlog is detected (better ideas for this
logic welcome), and lets idle workers time out.
I like the idea. In fact, I've been pondering about something like a
"smart" configuration for quite some time, and convinced that a similar
approach needs to be applied to many performance-related GUCs.
Idle timeout and launch interval serving as a measure of sensitivity
makes sense to me, growing the pool when a backlog (queue_depth >
nworkers, so even a slightest backlog?) is detected seems to be somewhat
arbitrary. From what I understand the pool growing velocity is constant
and do not depend on the worker demand (i.e. queue_depth)? It may sounds
fancy, but I've got an impression it should be possible to apply what's
called a "low-pass filter" in the control theory (sort of a transfer
function with an exponential decay) to smooth out the demand and adjust
the worker pool based on that.
As a side note, it might be far fetched, but there are instruments in
queueing theory to figure out how much workers are needed to guarantee a
certain low queueing probability, but for that one needs to have an
average arrival rate (in our case, average number of IO operations
dispatched to workers) and an average service rate (average number of IO
operations performed by workers).
HI
On Sun, Apr 13, 2025 at 04:59:54AM GMT, Thomas Munro wrote:
It's hard to know how to set io_workers=3. If it's too small,
io_method=worker's small submission queue overflows and it silently
falls back to synchronous IO. If it's too high, it generates a lot of
pointless wakeups and scheduling overhead, which might be considered
an independent problem or not, but having the right size pool
certainly mitigates it. Here's a patch to replace that GUC with:io_min_workers=1
io_max_workers=8
io_worker_idle_timeout=60s
io_worker_launch_interval=500msIt grows the pool when a backlog is detected (better ideas for this
logic welcome), and lets idle workers time out.
I also like the idea, can we set a
io_workers= 3
io_max_workers= cpu/4
io_workers_oversubscribe = 3 (range 1-8)
io_workers * io_workers_oversubscribe <=io_max_workers
On Sun, May 25, 2025 at 3:20 AM Dmitry Dolgov <9erthalion6@gmail.com> wrote:
Show quoted text
On Sun, Apr 13, 2025 at 04:59:54AM GMT, Thomas Munro wrote:
It's hard to know how to set io_workers=3. If it's too small,
io_method=worker's small submission queue overflows and it silently
falls back to synchronous IO. If it's too high, it generates a lot of
pointless wakeups and scheduling overhead, which might be considered
an independent problem or not, but having the right size pool
certainly mitigates it. Here's a patch to replace that GUC with:io_min_workers=1
io_max_workers=8
io_worker_idle_timeout=60s
io_worker_launch_interval=500msIt grows the pool when a backlog is detected (better ideas for this
logic welcome), and lets idle workers time out.I like the idea. In fact, I've been pondering about something like a
"smart" configuration for quite some time, and convinced that a similar
approach needs to be applied to many performance-related GUCs.Idle timeout and launch interval serving as a measure of sensitivity
makes sense to me, growing the pool when a backlog (queue_depth >
nworkers, so even a slightest backlog?) is detected seems to be somewhat
arbitrary. From what I understand the pool growing velocity is constant
and do not depend on the worker demand (i.e. queue_depth)? It may sounds
fancy, but I've got an impression it should be possible to apply what's
called a "low-pass filter" in the control theory (sort of a transfer
function with an exponential decay) to smooth out the demand and adjust
the worker pool based on that.As a side note, it might be far fetched, but there are instruments in
queueing theory to figure out how much workers are needed to guarantee a
certain low queueing probability, but for that one needs to have an
average arrival rate (in our case, average number of IO operations
dispatched to workers) and an average service rate (average number of IO
operations performed by workers).
On Sun, May 25, 2025 at 7:20 AM Dmitry Dolgov <9erthalion6@gmail.com> wrote:
On Sun, Apr 13, 2025 at 04:59:54AM GMT, Thomas Munro wrote:
It's hard to know how to set io_workers=3. If it's too small,
io_method=worker's small submission queue overflows and it silently
falls back to synchronous IO. If it's too high, it generates a lot of
pointless wakeups and scheduling overhead, which might be considered
an independent problem or not, but having the right size pool
certainly mitigates it. Here's a patch to replace that GUC with:io_min_workers=1
io_max_workers=8
io_worker_idle_timeout=60s
io_worker_launch_interval=500msIt grows the pool when a backlog is detected (better ideas for this
logic welcome), and lets idle workers time out.I like the idea. In fact, I've been pondering about something like a
"smart" configuration for quite some time, and convinced that a similar
approach needs to be applied to many performance-related GUCs.Idle timeout and launch interval serving as a measure of sensitivity
makes sense to me, growing the pool when a backlog (queue_depth >
nworkers, so even a slightest backlog?) is detected seems to be somewhat
arbitrary. From what I understand the pool growing velocity is constant
and do not depend on the worker demand (i.e. queue_depth)? It may sounds
fancy, but I've got an impression it should be possible to apply what's
called a "low-pass filter" in the control theory (sort of a transfer
function with an exponential decay) to smooth out the demand and adjust
the worker pool based on that.As a side note, it might be far fetched, but there are instruments in
queueing theory to figure out how much workers are needed to guarantee a
certain low queueing probability, but for that one needs to have an
average arrival rate (in our case, average number of IO operations
dispatched to workers) and an average service rate (average number of IO
operations performed by workers).
Hi Dmitry,
Thanks for looking, and yeah these are definitely the right sort of
questions. I will be both unsurprised and delighted if someone can
bring some more science to this problem. I did read up on Erlang's
formula C ("This formula is used to determine the number of agents or
customer service representatives needed to staff a call centre, for a
specified desired probability of queuing" according to Wikipedia) and
a bunch of related textbook stuff. And yeah I had a bunch of
exponential moving averages of various values using scaled fixed point
arithmetic (just a bunch of shifts and adds) to smooth inputs, in
various attempts. But ... I'm not even sure if we can say that our
I/O arrivals have a Poisson distribution, since they are not all
independent. I tried more things too, while I was still unsure what I
should even be optimising for. My current answer to that is: low
latency with low variance, as seen with io_uring.
In this version I went back to basics and built something that looks
more like the controls of a classic process/thread pool (think Apache)
or connection pool (think JDBC), with a couple of additions based on
intuition: (1) a launch interval, which acts as a bit of damping
against overshooting on brief bursts that are too far apart, and (2)
the queue length > workers * k as a simple way to determine that
latency is being introduced by not having enough workers. Perhaps
there is a good way to compute an adaptive value for k with some fancy
theories, but k=1 seems to have *some* basis: that's the lowest number
which the pool is too small and *certainly* introducing latency, but
any lower constant is harder to defend because we don't know how many
workers are already awake and about to consume tasks. Something from
queuing theory might provide an adaptive value, but in the end, I
figured we really just want to know if the queue is growing ie in
danger of overflowing (note: the queue is small! 64, and not
currently changeable, more on that later, and the overflow behaviour
is synchronous I/O as back-pressure). You seem to be suggesting that
k=1 sounds too low, not too high, but there is that separate
time-based defence against overshoot in response to rare bursts.
You could get more certainty about jobs already about to be consumed
by a worker that is about to dequeue, by doing a lot more book
keeping: assigning them to workers on submission (separate states,
separate queues, various other ideas I guess). But everything I tried
like that caused latency or latency variance to go up, because it
missed out on the chance for another worker to pick it up sooner
opportunistically. This arrangement has the most stable and
predictable pool size and lowest avg latency and stddev(latency) I
have managed to come up with so far. That said, we have plenty of
time to experiment with better ideas if you want to give it a shot or
propose concrete ideas, given that I missed v18 :-)
About control theory... yeah. That's an interesting bag of tricks.
FWIW Melanie and (more recently) I have looked into textbook control
algorithms at a higher level of the I/O stack (and Melanie gave a talk
about other applications in eg VACUUM at pgconf.dev). In
read_stream.c, where I/O demand is created, we've been trying to set
the desired I/O concurrency level and thus lookahead distance with
adaptive feedback. We've tried a lot of stuff. I hope we can share
some concept patches some time soon, well, maybe in this cycle. Some
interesting recent experiments produced graphs that look a lot like
the ones in the book "Feedback Control for Computer Systems" (an easy
software-person book I found for people without an engineering/control
theory background where the problems match our world more closely, cf
typical texts that are about controlling motors and other mechanical
stuff...). Experimental goals are: find the the smallest concurrent
I/O request level (and thus lookahead distance and thus speculative
work done and buffers pinned) that keeps the I/O stall probability
near zero (and keep adapting, since other queries and applications are
sharing system I/O queues), and if that's not even possible, find the
highest concurrent I/O request level that doesn't cause extra latency
due to queuing in lower levels (I/O workers, kernel, ..., disks).
That second part is quite hard. In other words, if higher levels own
that problem and bring the adaptivity, then perhaps io_method=worker
can get away with being quite stupid. Just a thought...
BTW I would like to push 0001 and 0002 to master/18. They are not
behaviour changes, they just fix up a bunch of inconsistent (0001) and
misleading (0002) variable naming and comments to reflect reality (in
AIO v1 the postmaster used to assign those I/O worker numbers, now the
postmaster has its own array of slots to track them that is *not*
aligned with the ID numbers/slots in shared memory ie those numbers
you see in the ps status, so that's bound to confuse people
maintaining this code). I just happened to notice that when working
on this dynamic worker pool stuff. If there are no objections I will
go ahead and do that soon.
On Mon, May 26, 2025, 8:01 AM Thomas Munro <thomas.munro@gmail.com> wrote:
But ... I'm not even sure if we can say that our
I/O arrivals have a Poisson distribution, since they are not all
independent.
Yeah, a good point, one has to be careful with assumptions about
distribution -- from what I've read many processes in computer systems are
better described by a Pareto. But the beauty of the queuing theory is that
many results are independent from the distribution (not sure about
dependencies though).
In this version I went back to basics and built something that looks
more like the controls of a classic process/thread pool (think Apache)
or connection pool (think JDBC), with a couple of additions based on
intuition: (1) a launch interval, which acts as a bit of damping
against overshooting on brief bursts that are too far apart, and (2)
the queue length > workers * k as a simple way to determine that
latency is being introduced by not having enough workers. Perhaps
there is a good way to compute an adaptive value for k with some fancy
theories, but k=1 seems to have *some* basis: that's the lowest number
which the pool is too small and *certainly* introducing latency, but
any lower constant is harder to defend because we don't know how many
workers are already awake and about to consume tasks. Something from
queuing theory might provide an adaptive value, but in the end, I
figured we really just want to know if the queue is growing ie in
danger of overflowing (note: the queue is small! 64, and not
currently changeable, more on that later, and the overflow behaviour
is synchronous I/O as back-pressure). You seem to be suggesting that
k=1 sounds too low, not too high, but there is that separate
time-based defence against overshoot in response to rare bursts.
I probably had to start with a statement that I find the current approach
reasonable, and I'm only curious if there is more to get out of it. I
haven't benchmarked the patch yet (plan getting to it when I'll get back),
and can imagine practical considerations significantly impacting any
potential solution.
About control theory... yeah. That's an interesting bag of tricks.
FWIW Melanie and (more recently) I have looked into textbook control
algorithms at a higher level of the I/O stack (and Melanie gave a talk
about other applications in eg VACUUM at pgconf.dev). In
read_stream.c, where I/O demand is created, we've been trying to set
the desired I/O concurrency level and thus lookahead distance with
adaptive feedback. We've tried a lot of stuff. I hope we can share
some concept patches some time soon, well, maybe in this cycle. Some
interesting recent experiments produced graphs that look a lot like
the ones in the book "Feedback Control for Computer Systems" (an easy
software-person book I found for people without an engineering/control
theory background where the problems match our world more closely, cf
typical texts that are about controlling motors and other mechanical
stuff...). Experimental goals are: find the smallest concurrent
I/O request level (and thus lookahead distance and thus speculative
work done and buffers pinned) that keeps the I/O stall probability
near zero (and keep adapting, since other queries and applications are
sharing system I/O queues), and if that's not even possible, find the
highest concurrent I/O request level that doesn't cause extra latency
due to queuing in lower levels (I/O workers, kernel, ..., disks).
That second part is quite hard. In other words, if higher levels own
that problem and bring the adaptivity, then perhaps io_method=worker
can get away with being quite stupid. Just a thought...
Looking forward to it. And thanks for the reminder about the talk, wanted
to watch it already long time ago, but somehow didn't managed yet.
Show quoted text
On Wed, May 28, 2025 at 5:55 AM Dmitry Dolgov <9erthalion6@gmail.com> wrote:
I probably had to start with a statement that I find the current approach reasonable, and I'm only curious if there is more to get out of it. I haven't benchmarked the patch yet (plan getting to it when I'll get back), and can imagine practical considerations significantly impacting any potential solution.
Here's a rebase.
Attachments:
v2-0001-aio-Try-repeatedly-to-give-batched-IOs-to-workers.patchtext/x-patch; charset=US-ASCII; name=v2-0001-aio-Try-repeatedly-to-give-batched-IOs-to-workers.patchDownload
From fa7aac1bc9c0a47fbdbd9459424f08fa61b71ce2 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Fri, 11 Apr 2025 21:17:26 +1200
Subject: [PATCH v2 1/2] aio: Try repeatedly to give batched IOs to workers.
Previously, when the submission queue was full we'd run all remaining
IOs in a batched submission synchronously. Andres rightly pointed out
that we should really try again between synchronous IOs, since the
workers might have made progress in draining the queue.
Suggested-by: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/CA%2BhUKG%2Bm4xV0LMoH2c%3DoRAdEXuCnh%2BtGBTWa7uFeFMGgTLAw%2BQ%40mail.gmail.com
---
src/backend/storage/aio/method_worker.c | 30 ++++++++++++++++++++++---
1 file changed, 27 insertions(+), 3 deletions(-)
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index bf8f77e6ff6..9a82d5f847d 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -282,12 +282,36 @@ pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
SetLatch(wakeup);
/* Run whatever is left synchronously. */
- if (nsync > 0)
+ for (int i = 0; i < nsync; ++i)
{
- for (int i = 0; i < nsync; ++i)
+ wakeup = NULL;
+
+ /*
+ * Between synchronous IO operations, try again to enqueue as many as
+ * we can.
+ */
+ if (i > 0)
{
- pgaio_io_perform_synchronously(synchronous_ios[i]);
+ wakeup = NULL;
+
+ LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+ while (i < nsync &&
+ pgaio_worker_submission_queue_insert(synchronous_ios[i]))
+ {
+ if (wakeup == NULL && (worker = pgaio_worker_choose_idle()) >= 0)
+ wakeup = io_worker_control->workers[worker].latch;
+ i++;
+ }
+ LWLockRelease(AioWorkerSubmissionQueueLock);
+
+ if (wakeup)
+ SetLatch(wakeup);
+
+ if (i == nsync)
+ break;
}
+
+ pgaio_io_perform_synchronously(synchronous_ios[i]);
}
}
--
2.47.2
v2-0002-aio-Adjust-IO-worker-pool-size-automatically.patchtext/x-patch; charset=US-ASCII; name=v2-0002-aio-Adjust-IO-worker-pool-size-automatically.patchDownload
From a0a5fff1f1d21c002bf68d36de9aff21bdf61783 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 22 Mar 2025 00:36:49 +1300
Subject: [PATCH v2 2/2] aio: Adjust IO worker pool size automatically.
Replace the simple io_workers setting with:
io_min_workers=1
io_max_workers=8
io_worker_idle_timeout=60s
io_worker_launch_interval=500ms
The pool is automatically sized within the configured range according
to demand.
Discussion: https://postgr.es/m/CA%2BhUKG%2Bm4xV0LMoH2c%3DoRAdEXuCnh%2BtGBTWa7uFeFMGgTLAw%2BQ%40mail.gmail.com
---
doc/src/sgml/config.sgml | 70 ++-
src/backend/postmaster/postmaster.c | 64 ++-
src/backend/storage/aio/method_worker.c | 445 ++++++++++++++----
.../utils/activity/wait_event_names.txt | 1 +
src/backend/utils/misc/guc_tables.c | 46 +-
src/backend/utils/misc/postgresql.conf.sample | 5 +-
src/include/storage/io_worker.h | 9 +-
src/include/storage/lwlocklist.h | 1 +
src/include/storage/pmsignal.h | 1 +
src/test/modules/test_aio/t/002_io_workers.pl | 15 +-
10 files changed, 535 insertions(+), 122 deletions(-)
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index c7acc0f182f..98532e55041 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2787,16 +2787,76 @@ include_dir 'conf.d'
</listitem>
</varlistentry>
- <varlistentry id="guc-io-workers" xreflabel="io_workers">
- <term><varname>io_workers</varname> (<type>integer</type>)
+ <varlistentry id="guc-io-min-workers" xreflabel="io_min_workers">
+ <term><varname>io_min_workers</varname> (<type>integer</type>)
<indexterm>
- <primary><varname>io_workers</varname> configuration parameter</primary>
+ <primary><varname>io_min_workers</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
- Selects the number of I/O worker processes to use. The default is
- 3. This parameter can only be set in the
+ Sets the minimum number of I/O worker processes to use. The default is
+ 1. This parameter can only be set in the
+ <filename>postgresql.conf</filename> file or on the server command
+ line.
+ </para>
+ <para>
+ Only has an effect if <xref linkend="guc-io-method"/> is set to
+ <literal>worker</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry id="guc-io-max-workers" xreflabel="io_max_workers">
+ <term><varname>io_max_workers</varname> (<type>int</type>)
+ <indexterm>
+ <primary><varname>io_max_workers</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the maximum number of I/O worker processes to use. The default is
+ 8. This parameter can only be set in the
+ <filename>postgresql.conf</filename> file or on the server command
+ line.
+ </para>
+ <para>
+ Only has an effect if <xref linkend="guc-io-method"/> is set to
+ <literal>worker</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry id="guc-io-worker-idle-timeout" xreflabel="io_worker_idle_timeout">
+ <term><varname>io_worker_idle_timeout</varname> (<type>int</type>)
+ <indexterm>
+ <primary><varname>io_worker_idle_timeout</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the time after which idle I/O worker processes will exit, reducing the
+ size of the I/O worker pool towards the configured minimum. The default
+ is 1 minute.
+ This parameter can only be set in the
+ <filename>postgresql.conf</filename> file or on the server command
+ line.
+ </para>
+ <para>
+ Only has an effect if <xref linkend="guc-io-method"/> is set to
+ <literal>worker</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry id="guc-io-worker-launch-interval" xreflabel="io_worker_launch_interval">
+ <term><varname>io_worker_launch_interval</varname> (<type>int</type>)
+ <indexterm>
+ <primary><varname>io_worker_launch_interval</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the minimum time between launching new I/O workers. This can be used to avoid
+ sudden bursts of new I/O workers. The default is 500ms.
+ This parameter can only be set in the
<filename>postgresql.conf</filename> file or on the server command
line.
</para>
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index cca9b946e53..a5438fa079d 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -408,6 +408,7 @@ static DNSServiceRef bonjour_sdref = NULL;
#endif
/* State for IO worker management. */
+static TimestampTz io_worker_launch_delay_until = 0;
static int io_worker_count = 0;
static PMChild *io_worker_children[MAX_IO_WORKERS];
@@ -1569,6 +1570,15 @@ DetermineSleepTime(void)
if (StartWorkerNeeded)
return 0;
+ /* If we need a new IO worker, defer until launch delay expires. */
+ if (pgaio_worker_test_new_worker_needed() &&
+ io_worker_count < io_max_workers)
+ {
+ if (io_worker_launch_delay_until == 0)
+ return 0;
+ next_wakeup = io_worker_launch_delay_until;
+ }
+
if (HaveCrashedWorker)
{
dlist_mutable_iter iter;
@@ -3750,6 +3760,15 @@ process_pm_pmsignal(void)
StartWorkerNeeded = true;
}
+ /* Process IO worker start requests. */
+ if (CheckPostmasterSignal(PMSIGNAL_IO_WORKER_CHANGE))
+ {
+ /*
+ * No local flag, as the state is exposed through pgaio_worker_*()
+ * functions. This signal is received on potentially actionable level
+ * changes, so that maybe_adjust_io_workers() will run.
+ */
+ }
/* Process background worker state changes. */
if (CheckPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE))
{
@@ -4355,8 +4374,9 @@ maybe_reap_io_worker(int pid)
/*
* Start or stop IO workers, to close the gap between the number of running
* workers and the number of configured workers. Used to respond to change of
- * the io_workers GUC (by increasing and decreasing the number of workers), as
- * well as workers terminating in response to errors (by starting
+ * the io_{min,max}_workers GUCs (by increasing and decreasing the number of
+ * workers) and requests to start a new one due to submission queue backlog,
+ * as well as workers terminating in response to errors (by starting
* "replacement" workers).
*/
static void
@@ -4385,8 +4405,16 @@ maybe_adjust_io_workers(void)
Assert(pmState < PM_WAIT_IO_WORKERS);
- /* Not enough running? */
- while (io_worker_count < io_workers)
+ /* Cancel the launch delay when it expires to minimize clock access. */
+ if (io_worker_launch_delay_until != 0 &&
+ io_worker_launch_delay_until <= GetCurrentTimestamp())
+ io_worker_launch_delay_until = 0;
+
+ /* Not enough workers running? */
+ while (io_worker_launch_delay_until == 0 &&
+ io_worker_count < io_max_workers &&
+ ((io_worker_count < io_min_workers ||
+ pgaio_worker_clear_new_worker_needed())))
{
PMChild *child;
int i;
@@ -4400,6 +4428,16 @@ maybe_adjust_io_workers(void)
if (i == MAX_IO_WORKERS)
elog(ERROR, "could not find a free IO worker slot");
+ /*
+ * Apply launch delay even for failures to avoid retrying too fast on
+ * fork() failure, but not while we're still building the minimum pool
+ * size.
+ */
+ if (io_worker_count >= io_min_workers)
+ io_worker_launch_delay_until =
+ TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ io_worker_launch_interval);
+
/* Try to launch one. */
child = StartChildProcess(B_IO_WORKER);
if (child != NULL)
@@ -4411,19 +4449,11 @@ maybe_adjust_io_workers(void)
break; /* try again next time */
}
- /* Too many running? */
- if (io_worker_count > io_workers)
- {
- /* ask the IO worker in the highest slot to exit */
- for (int i = MAX_IO_WORKERS - 1; i >= 0; --i)
- {
- if (io_worker_children[i] != NULL)
- {
- kill(io_worker_children[i]->pid, SIGUSR2);
- break;
- }
- }
- }
+ /*
+ * If there are too many running because io_max_workers changed, that will
+ * be handled by the IO workers themselves so they can shut down in
+ * preferred order.
+ */
}
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index 9a82d5f847d..6d3f5289e18 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -11,9 +11,10 @@
* infrastructure for reopening the file, and must processed synchronously by
* the client code when submitted.
*
- * So that the submitter can make just one system call when submitting a batch
- * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
- * could be improved by using futexes instead of latches to wake N waiters.
+ * When a batch of IOs is submitted, the lowest numbered idle worker is woken
+ * up. If it sees more work in the queue it wakes a peer to help, and so on
+ * in a chain. When a backlog is detected, the pool size is increased. The pool
+ * shrinks when the highest numbered worker times out after a period of inactivity.
*
* This method of AIO is available in all builds on all operating systems, and
* is the default.
@@ -40,6 +41,8 @@
#include "storage/io_worker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
+#include "storage/lwlock.h"
+#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/injection_point.h"
@@ -47,10 +50,8 @@
#include "utils/ps_status.h"
#include "utils/wait_event.h"
-
-/* How many workers should each worker wake up if needed? */
-#define IO_WORKER_WAKEUP_FANOUT 2
-
+/* Saturation for stats counters used to estimate wakeup:work ratio. */
+#define PGAIO_WORKER_STATS_MAX 64
typedef struct PgAioWorkerSubmissionQueue
{
@@ -63,17 +64,25 @@ typedef struct PgAioWorkerSubmissionQueue
typedef struct PgAioWorkerSlot
{
- Latch *latch;
- bool in_use;
+ ProcNumber proc_number;
} PgAioWorkerSlot;
typedef struct PgAioWorkerControl
{
+ /* Seen by postmaster */
+ volatile bool new_worker_needed;
+
+ /* Protected by AioWorkerSubmissionQueueLock. */
uint64 idle_worker_mask;
+
+ /* Protected by AioWorkerControlLock. */
+ uint64 worker_set;
+ int nworkers;
+
+ /* Protected by AioWorkerControlLock. */
PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
} PgAioWorkerControl;
-
static size_t pgaio_worker_shmem_size(void);
static void pgaio_worker_shmem_init(bool first_time);
@@ -91,11 +100,14 @@ const IoMethodOps pgaio_worker_ops = {
/* GUCs */
-int io_workers = 3;
+int io_min_workers = 1;
+int io_max_workers = 8;
+int io_worker_idle_timeout = 60000;
+int io_worker_launch_interval = 500;
static int io_worker_queue_size = 64;
-static int MyIoWorkerId;
+static int MyIoWorkerId = -1;
static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
static PgAioWorkerControl *io_worker_control;
@@ -152,37 +164,172 @@ pgaio_worker_shmem_init(bool first_time)
&found);
if (!found)
{
+ io_worker_control->new_worker_needed = false;
+ io_worker_control->worker_set = 0;
io_worker_control->idle_worker_mask = 0;
for (int i = 0; i < MAX_IO_WORKERS; ++i)
- {
- io_worker_control->workers[i].latch = NULL;
- io_worker_control->workers[i].in_use = false;
- }
+ io_worker_control->workers[i].proc_number = INVALID_PROC_NUMBER;
}
}
+static void
+pgaio_worker_consider_new_worker(uint32 queue_depth)
+{
+ /*
+ * This is called from sites that don't hold AioWorkerControlLock, but it
+ * changes infrequently and an up to date value is not required for this
+ * heuristic purpose.
+ */
+ if (!io_worker_control->new_worker_needed &&
+ queue_depth >= io_worker_control->nworkers)
+ {
+ io_worker_control->new_worker_needed = true;
+ SendPostmasterSignal(PMSIGNAL_IO_WORKER_CHANGE);
+ }
+}
+
+/*
+ * Called by a worker when the queue is empty, to try to prevent a delayed
+ * reaction to a brief burst. This races against the postmaster acting on the
+ * old value if it was recently set to true, but that's OK, the ordering would
+ * be indeterminate anyway even if we could use locks in the postmaster.
+ */
+static void
+pgaio_worker_cancel_new_worker(void)
+{
+ io_worker_control->new_worker_needed = false;
+}
+
+/*
+ * Called by the postmaster to check if a new worker is needed.
+ */
+bool
+pgaio_worker_test_new_worker_needed(void)
+{
+ return io_worker_control->new_worker_needed;
+}
+
+/*
+ * Called by the postmaster to check if a new worker is needed when it's ready
+ * to launch one, and clear the flag.
+ */
+bool
+pgaio_worker_clear_new_worker_needed(void)
+{
+ bool result;
+
+ result = io_worker_control->new_worker_needed;
+ if (result)
+ io_worker_control->new_worker_needed = false;
+
+ return result;
+}
+
+static uint64
+pgaio_worker_mask(int worker)
+{
+ return UINT64_C(1) << worker;
+}
+
+static void
+pgaio_worker_add(uint64 *set, int worker)
+{
+ *set |= pgaio_worker_mask(worker);
+}
+
+static void
+pgaio_worker_remove(uint64 *set, int worker)
+{
+ *set &= ~pgaio_worker_mask(worker);
+}
+
+#ifdef USE_ASSERT_CHECKING
+static bool
+pgaio_worker_in(uint64 set, int worker)
+{
+ return (set & pgaio_worker_mask(worker)) != 0;
+}
+#endif
+
+static uint64
+pgaio_worker_highest(uint64 set)
+{
+ return pg_leftmost_one_pos64(set);
+}
+
+static uint64
+pgaio_worker_lowest(uint64 set)
+{
+ return pg_rightmost_one_pos64(set);
+}
+
+static int
+pgaio_worker_pop(uint64 *set)
+{
+ int worker;
+
+ Assert(set != 0);
+ worker = pgaio_worker_lowest(*set);
+ pgaio_worker_remove(set, worker);
+ return worker;
+}
+
static int
pgaio_worker_choose_idle(void)
{
+ uint64 idle_worker_mask;
int worker;
- if (io_worker_control->idle_worker_mask == 0)
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
+ /*
+ * Workers only wake higher numbered workers, to try to encourage an
+ * ordering of wakeup:work ratios, reducing spurious wakeups in lower
+ * numbered workers.
+ */
+ idle_worker_mask = io_worker_control->idle_worker_mask;
+ if (MyIoWorkerId != -1)
+ idle_worker_mask &= ~(pgaio_worker_mask(MyIoWorkerId) - 1);
+
+ if (idle_worker_mask == 0)
return -1;
/* Find the lowest bit position, and clear it. */
- worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
- io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
- Assert(io_worker_control->workers[worker].in_use);
+ worker = pgaio_worker_lowest(idle_worker_mask);
+ pgaio_worker_remove(&io_worker_control->idle_worker_mask, worker);
return worker;
}
+/*
+ * Try to wake a worker by setting its latch, to tell it there are IOs to
+ * process in the submission queue.
+ */
+static void
+pgaio_worker_wake(int worker)
+{
+ ProcNumber proc_number;
+
+ /*
+ * If the selected worker is concurrently exiting, then pgaio_worker_die()
+ * had not yet removed it as of when we saw it in idle_worker_mask. That's
+ * OK, because it will wake all remaining workers to close wakeup-vs-exit
+ * races: *someone* will see the queued IO. If there are no workers
+ * running, the postmaster will start a new one.
+ */
+ proc_number = io_worker_control->workers[worker].proc_number;
+ if (proc_number != INVALID_PROC_NUMBER)
+ SetLatch(&GetPGProcByNumber(proc_number)->procLatch);
+}
+
static bool
pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
{
PgAioWorkerSubmissionQueue *queue;
uint32 new_head;
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
queue = io_worker_submission_queue;
new_head = (queue->head + 1) & (queue->size - 1);
if (new_head == queue->tail)
@@ -204,6 +351,8 @@ pgaio_worker_submission_queue_consume(void)
PgAioWorkerSubmissionQueue *queue;
uint32 result;
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
queue = io_worker_submission_queue;
if (queue->tail == queue->head)
return UINT32_MAX; /* empty */
@@ -220,6 +369,8 @@ pgaio_worker_submission_queue_depth(void)
uint32 head;
uint32 tail;
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
head = io_worker_submission_queue->head;
tail = io_worker_submission_queue->tail;
@@ -244,9 +395,9 @@ static void
pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
{
PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
+ uint32 queue_depth;
+ int worker = -1;
int nsync = 0;
- Latch *wakeup = NULL;
- int worker;
Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
@@ -261,51 +412,48 @@ pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
* we can to workers, to maximize concurrency.
*/
synchronous_ios[nsync++] = staged_ios[i];
- continue;
}
-
- if (wakeup == NULL)
+ else if (worker == -1)
{
/* Choose an idle worker to wake up if we haven't already. */
worker = pgaio_worker_choose_idle();
- if (worker >= 0)
- wakeup = io_worker_control->workers[worker].latch;
pgaio_debug_io(DEBUG4, staged_ios[i],
"choosing worker %d",
worker);
}
}
+ queue_depth = pgaio_worker_submission_queue_depth();
LWLockRelease(AioWorkerSubmissionQueueLock);
- if (wakeup)
- SetLatch(wakeup);
+ if (worker != -1)
+ pgaio_worker_wake(worker);
+ else
+ pgaio_worker_consider_new_worker(queue_depth);
/* Run whatever is left synchronously. */
for (int i = 0; i < nsync; ++i)
{
- wakeup = NULL;
-
/*
* Between synchronous IO operations, try again to enqueue as many as
* we can.
*/
if (i > 0)
{
- wakeup = NULL;
+ worker = -1;
LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
while (i < nsync &&
pgaio_worker_submission_queue_insert(synchronous_ios[i]))
{
- if (wakeup == NULL && (worker = pgaio_worker_choose_idle()) >= 0)
- wakeup = io_worker_control->workers[worker].latch;
+ if (worker == -1)
+ worker = pgaio_worker_choose_idle();
i++;
}
LWLockRelease(AioWorkerSubmissionQueueLock);
- if (wakeup)
- SetLatch(wakeup);
+ if (worker != -1)
+ pgaio_worker_wake(worker);
if (i == nsync)
break;
@@ -337,14 +485,27 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
static void
pgaio_worker_die(int code, Datum arg)
{
- LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
- Assert(io_worker_control->workers[MyIoWorkerId].in_use);
- Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
+ uint64 notify_set;
- io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
- io_worker_control->workers[MyIoWorkerId].in_use = false;
- io_worker_control->workers[MyIoWorkerId].latch = NULL;
+ LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+ pgaio_worker_remove(&io_worker_control->idle_worker_mask, MyIoWorkerId);
LWLockRelease(AioWorkerSubmissionQueueLock);
+
+ LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
+ Assert(io_worker_control->workers[MyIoWorkerId].proc_number == MyProcNumber);
+ io_worker_control->workers[MyIoWorkerId].proc_number = INVALID_PROC_NUMBER;
+ Assert(pgaio_worker_in(io_worker_control->worker_set, MyIoWorkerId));
+ pgaio_worker_remove(&io_worker_control->worker_set, MyIoWorkerId);
+ notify_set = io_worker_control->worker_set;
+ Assert(io_worker_control->nworkers > 0);
+ io_worker_control->nworkers--;
+ Assert(pg_popcount64(io_worker_control->worker_set) ==
+ io_worker_control->nworkers);
+ LWLockRelease(AioWorkerControlLock);
+
+ /* Notify other workers on pool change. */
+ while (notify_set != 0)
+ pgaio_worker_wake(pgaio_worker_pop(¬ify_set));
}
/*
@@ -354,33 +515,37 @@ pgaio_worker_die(int code, Datum arg)
static void
pgaio_worker_register(void)
{
- MyIoWorkerId = -1;
+ uint64 worker_set_inverted;
+ uint64 old_worker_set;
- /*
- * XXX: This could do with more fine-grained locking. But it's also not
- * very common for the number of workers to change at the moment...
- */
- LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+ MyIoWorkerId = -1;
- for (int i = 0; i < MAX_IO_WORKERS; ++i)
+ LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
+ worker_set_inverted = ~io_worker_control->worker_set;
+ if (worker_set_inverted != 0)
{
- if (!io_worker_control->workers[i].in_use)
- {
- Assert(io_worker_control->workers[i].latch == NULL);
- io_worker_control->workers[i].in_use = true;
- MyIoWorkerId = i;
- break;
- }
- else
- Assert(io_worker_control->workers[i].latch != NULL);
+ MyIoWorkerId = pgaio_worker_lowest(worker_set_inverted);
+ if (MyIoWorkerId >= MAX_IO_WORKERS)
+ MyIoWorkerId = -1;
}
-
if (MyIoWorkerId == -1)
elog(ERROR, "couldn't find a free worker slot");
- io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
- io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
- LWLockRelease(AioWorkerSubmissionQueueLock);
+ Assert(io_worker_control->workers[MyIoWorkerId].proc_number ==
+ INVALID_PROC_NUMBER);
+ io_worker_control->workers[MyIoWorkerId].proc_number = MyProcNumber;
+
+ old_worker_set = io_worker_control->worker_set;
+ Assert(!pgaio_worker_in(old_worker_set, MyIoWorkerId));
+ pgaio_worker_add(&io_worker_control->worker_set, MyIoWorkerId);
+ io_worker_control->nworkers++;
+ Assert(pg_popcount64(io_worker_control->worker_set) ==
+ io_worker_control->nworkers);
+ LWLockRelease(AioWorkerControlLock);
+
+ /* Notify other workers on pool change. */
+ while (old_worker_set != 0)
+ pgaio_worker_wake(pgaio_worker_pop(&old_worker_set));
on_shmem_exit(pgaio_worker_die, 0);
}
@@ -406,14 +571,47 @@ pgaio_worker_error_callback(void *arg)
errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
}
+/*
+ * Check if this backend is allowed to time out, and thus should use a
+ * non-infinite sleep time. Only the highest-numbered worker is allowed to
+ * time out, and only if the pool is above io_min_workers. Serializing
+ * timeouts keeps IDs in a range 0..N without gaps, and avoids undershooting
+ * io_min_workers.
+ *
+ * The result is only instantaneously true and may be temporarily inconsistent
+ * in different workers around transitions, but all workers are woken up on
+ * pool size or GUC changes making the result eventually consistent.
+ */
+static bool
+pgaio_worker_can_timeout(void)
+{
+ uint64 worker_set;
+
+ /* Serialize against pool size changes. */
+ LWLockAcquire(AioWorkerControlLock, LW_SHARED);
+ worker_set = io_worker_control->worker_set;
+ LWLockRelease(AioWorkerControlLock);
+
+ if (MyIoWorkerId != pgaio_worker_highest(worker_set))
+ return false;
+ if (MyIoWorkerId < io_min_workers)
+ return false;
+
+ return true;
+}
+
void
IoWorkerMain(const void *startup_data, size_t startup_data_len)
{
sigjmp_buf local_sigjmp_buf;
+ TimestampTz idle_timeout_abs = 0;
+ int timeout_guc_used = 0;
PgAioHandle *volatile error_ioh = NULL;
ErrorContextCallback errcallback = {0};
volatile int error_errno = 0;
char cmd[128];
+ int ios = 0;
+ int wakeups = 0;
MyBackendType = B_IO_WORKER;
AuxiliaryProcessMainCommon();
@@ -482,10 +680,8 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
while (!ShutdownRequestPending)
{
uint32 io_index;
- Latch *latches[IO_WORKER_WAKEUP_FANOUT];
- int nlatches = 0;
- int nwakeups = 0;
- int worker;
+ uint32 queue_depth;
+ int worker = -1;
/*
* Try to get a job to do.
@@ -494,40 +690,48 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
* to ensure that we don't see an outdated data in the handle.
*/
LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
- if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
+ io_index = pgaio_worker_submission_queue_consume();
+ queue_depth = pgaio_worker_submission_queue_depth();
+ if (io_index == UINT32_MAX)
{
- /*
- * Nothing to do. Mark self idle.
- *
- * XXX: Invent some kind of back pressure to reduce useless
- * wakeups?
- */
- io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
+ /* Nothing to do. Mark self idle. */
+ pgaio_worker_add(&io_worker_control->idle_worker_mask,
+ MyIoWorkerId);
}
else
{
/* Got one. Clear idle flag. */
- io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
+ pgaio_worker_remove(&io_worker_control->idle_worker_mask,
+ MyIoWorkerId);
- /* See if we can wake up some peers. */
- nwakeups = Min(pgaio_worker_submission_queue_depth(),
- IO_WORKER_WAKEUP_FANOUT);
- for (int i = 0; i < nwakeups; ++i)
- {
- if ((worker = pgaio_worker_choose_idle()) < 0)
- break;
- latches[nlatches++] = io_worker_control->workers[worker].latch;
- }
+ /*
+ * See if we should wake up a peer. Only do this if this worker
+ * is not experiencing spurious wakeups itself, to end a chain of
+ * wasted scheduling.
+ */
+ if (queue_depth > 0 && wakeups <= ios)
+ worker = pgaio_worker_choose_idle();
}
LWLockRelease(AioWorkerSubmissionQueueLock);
- for (int i = 0; i < nlatches; ++i)
- SetLatch(latches[i]);
+ /* Propagate wakeups. */
+ if (worker != -1)
+ pgaio_worker_wake(worker);
+ else if (wakeups <= ios)
+ pgaio_worker_consider_new_worker(queue_depth);
if (io_index != UINT32_MAX)
{
PgAioHandle *ioh = NULL;
+ /* Cancel timeout and update wakeup:work ratio. */
+ idle_timeout_abs = 0;
+ if (++ios == PGAIO_WORKER_STATS_MAX)
+ {
+ ios /= 2;
+ wakeups /= 2;
+ }
+
ioh = &pgaio_ctl->io_handles[io_index];
error_ioh = ioh;
errcallback.arg = ioh;
@@ -593,8 +797,69 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
}
else
{
- WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
- WAIT_EVENT_IO_WORKER_MAIN);
+ int timeout_ms;
+
+ /* Cancel new worker if pending. */
+ pgaio_worker_cancel_new_worker();
+
+ /* Compute the remaining allowed idle time. */
+ if (io_worker_idle_timeout == -1)
+ {
+ /* Never time out. */
+ timeout_ms = -1;
+ }
+ else
+ {
+ TimestampTz now = GetCurrentTimestamp();
+
+ /* If the GUC changes, reset timer. */
+ if (idle_timeout_abs != 0 &&
+ io_worker_idle_timeout != timeout_guc_used)
+ idle_timeout_abs = 0;
+
+ /* On first sleep, compute absolute timeout. */
+ if (idle_timeout_abs == 0)
+ {
+ idle_timeout_abs =
+ TimestampTzPlusMilliseconds(now,
+ io_worker_idle_timeout);
+ timeout_guc_used = io_worker_idle_timeout;
+ }
+
+ /*
+ * All workers maintain the absolute timeout value, but only
+ * the highest worker can actually time out and only if
+ * io_min_workers is exceeded. All others wait only for
+ * explicit wakeups caused by queue insertion, wakeup
+ * propagation, change of pool size (possibly making them
+ * highest), or GUC reload.
+ */
+ if (pgaio_worker_can_timeout())
+ timeout_ms =
+ TimestampDifferenceMilliseconds(now,
+ idle_timeout_abs);
+ else
+ timeout_ms = -1;
+ }
+
+ if (WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
+ timeout_ms,
+ WAIT_EVENT_IO_WORKER_MAIN) == WL_TIMEOUT)
+ {
+ /* WL_TIMEOUT */
+ if (pgaio_worker_can_timeout())
+ if (GetCurrentTimestamp() >= idle_timeout_abs)
+ break;
+ }
+ else
+ {
+ /* WL_LATCH_SET */
+ if (++wakeups == PGAIO_WORKER_STATS_MAX)
+ {
+ ios /= 2;
+ wakeups /= 2;
+ }
+ }
ResetLatch(MyLatch);
}
@@ -604,6 +869,10 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
+
+ /* If io_max_workers has been decreased, exit highest first. */
+ if (MyIoWorkerId >= io_max_workers)
+ break;
}
}
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 4da68312b5f..c6c8107fe33 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -352,6 +352,7 @@ DSMRegistry "Waiting to read or update the dynamic shared memory registry."
InjectionPoint "Waiting to read or update information related to injection points."
SerialControl "Waiting to read or update shared <filename>pg_serial</filename> state."
AioWorkerSubmissionQueue "Waiting to access AIO worker submission queue."
+AioWorkerControl "Waiting to update AIO worker information."
#
# END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index d14b1678e7f..ecb16facb67 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -3306,14 +3306,52 @@ struct config_int ConfigureNamesInt[] =
},
{
- {"io_workers",
+ {"io_max_workers",
PGC_SIGHUP,
RESOURCES_IO,
- gettext_noop("Number of IO worker processes, for io_method=worker."),
+ gettext_noop("Maximum number of IO worker processes, for io_method=worker."),
NULL,
},
- &io_workers,
- 3, 1, MAX_IO_WORKERS,
+ &io_max_workers,
+ 8, 1, MAX_IO_WORKERS,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"io_min_workers",
+ PGC_SIGHUP,
+ RESOURCES_IO,
+ gettext_noop("Minimum number of IO worker processes, for io_method=worker."),
+ NULL,
+ },
+ &io_min_workers,
+ 1, 1, MAX_IO_WORKERS,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"io_worker_idle_timeout",
+ PGC_SIGHUP,
+ RESOURCES_IO,
+ gettext_noop("Maximum idle time before IO workers exit, for io_method=worker."),
+ NULL,
+ GUC_UNIT_MS
+ },
+ &io_worker_idle_timeout,
+ 60 * 1000, -1, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"io_worker_launch_interval",
+ PGC_SIGHUP,
+ RESOURCES_IO,
+ gettext_noop("Maximum idle time between launching IO workers, for io_method=worker."),
+ NULL,
+ GUC_UNIT_MS
+ },
+ &io_worker_launch_interval,
+ 500, 0, INT_MAX,
NULL, NULL, NULL
},
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index a9d8293474a..1da6345ad7a 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -210,7 +210,10 @@
# can execute simultaneously
# -1 sets based on shared_buffers
# (change requires restart)
-#io_workers = 3 # 1-32;
+#io_min_workers = 1 # 1-32;
+#io_max_workers = 8 # 1-32;
+#io_worker_idle_timeout = 60s # -1 disables the idle timeout
+#io_worker_launch_interval = 500ms # min 0ms
# - Worker Processes -
diff --git a/src/include/storage/io_worker.h b/src/include/storage/io_worker.h
index 7bde7e89c8a..de9c80109e0 100644
--- a/src/include/storage/io_worker.h
+++ b/src/include/storage/io_worker.h
@@ -17,6 +17,13 @@
pg_noreturn extern void IoWorkerMain(const void *startup_data, size_t startup_data_len);
-extern PGDLLIMPORT int io_workers;
+extern PGDLLIMPORT int io_min_workers;
+extern PGDLLIMPORT int io_max_workers;
+extern PGDLLIMPORT int io_worker_idle_timeout;
+extern PGDLLIMPORT int io_worker_launch_interval;
+
+/* Interfaces visible to the postmaster. */
+extern bool pgaio_worker_test_new_worker_needed(void);
+extern bool pgaio_worker_clear_new_worker_needed(void);
#endif /* IO_WORKER_H */
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index a9681738146..c1801d08833 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -84,3 +84,4 @@ PG_LWLOCK(50, DSMRegistry)
PG_LWLOCK(51, InjectionPoint)
PG_LWLOCK(52, SerialControl)
PG_LWLOCK(53, AioWorkerSubmissionQueue)
+PG_LWLOCK(54, AioWorkerControl)
diff --git a/src/include/storage/pmsignal.h b/src/include/storage/pmsignal.h
index 428aa3fd68a..2859a636349 100644
--- a/src/include/storage/pmsignal.h
+++ b/src/include/storage/pmsignal.h
@@ -38,6 +38,7 @@ typedef enum
PMSIGNAL_ROTATE_LOGFILE, /* send SIGUSR1 to syslogger to rotate logfile */
PMSIGNAL_START_AUTOVAC_LAUNCHER, /* start an autovacuum launcher */
PMSIGNAL_START_AUTOVAC_WORKER, /* start an autovacuum worker */
+ PMSIGNAL_IO_WORKER_CHANGE, /* IO worker pool change */
PMSIGNAL_BACKGROUND_WORKER_CHANGE, /* background worker state change */
PMSIGNAL_START_WALRECEIVER, /* start a walreceiver */
PMSIGNAL_ADVANCE_STATE_MACHINE, /* advance postmaster's state machine */
diff --git a/src/test/modules/test_aio/t/002_io_workers.pl b/src/test/modules/test_aio/t/002_io_workers.pl
index af5fae15ea7..a0252857798 100644
--- a/src/test/modules/test_aio/t/002_io_workers.pl
+++ b/src/test/modules/test_aio/t/002_io_workers.pl
@@ -14,6 +14,9 @@ $node->init();
$node->append_conf(
'postgresql.conf', qq(
io_method=worker
+io_worker_idle_timeout=0ms
+io_worker_launch_interval=0ms
+io_max_workers=32
));
$node->start();
@@ -31,7 +34,7 @@ sub test_number_of_io_workers_dynamic
{
my $node = shift;
- my $prev_worker_count = $node->safe_psql('postgres', 'SHOW io_workers');
+ my $prev_worker_count = $node->safe_psql('postgres', 'SHOW io_min_workers');
# Verify that worker count can't be set to 0
change_number_of_io_workers($node, 0, $prev_worker_count, 1);
@@ -62,23 +65,23 @@ sub change_number_of_io_workers
my ($result, $stdout, $stderr);
($result, $stdout, $stderr) =
- $node->psql('postgres', "ALTER SYSTEM SET io_workers = $worker_count");
+ $node->psql('postgres', "ALTER SYSTEM SET io_min_workers = $worker_count");
$node->safe_psql('postgres', 'SELECT pg_reload_conf()');
if ($expect_failure)
{
ok( $stderr =~
- /$worker_count is outside the valid range for parameter "io_workers"/,
- "updating number of io_workers to $worker_count failed, as expected"
+ /$worker_count is outside the valid range for parameter "io_min_workers"/,
+ "updating number of io_min_workers to $worker_count failed, as expected"
);
return $prev_worker_count;
}
else
{
- is( $node->safe_psql('postgres', 'SHOW io_workers'),
+ is( $node->safe_psql('postgres', 'SHOW io_min_workers'),
$worker_count,
- "updating number of io_workers from $prev_worker_count to $worker_count"
+ "updating number of io_min_workers from $prev_worker_count to $worker_count"
);
check_io_worker_count($node, $worker_count);
--
2.47.2
On Sat, Jul 12, 2025 at 05:08:29PM +1200, Thomas Munro wrote:
On Wed, May 28, 2025 at 5:55 AM Dmitry Dolgov <9erthalion6@gmail.com> wrote:
> I probably should have started with a statement that I find the current
> approach reasonable, and I'm only curious whether there is more to get
> out of it. I haven't benchmarked the patch yet (I plan to get to it
> when I'm back), and can imagine practical considerations
> significantly impacting any potential solution.

Here's a rebase.
Thanks. I was experimenting with this approach, and realized there aren't
many metrics exposed about workers and the IO queue so far. Since the
worker pool growth is based on the queue size and workers try to share
the load uniformly, it makes sense to have a system view to show those
numbers — let's say a system view for worker handles and a function to
get the current queue size? E.g. worker load in my testing was quite
varying, see "Load distribution between workers" graph, which shows a
quick profiling run including currently running io workers.
Regarding the worker pool growth approach, it sounds reasonable to me.
With static number of workers one needs to somehow find a number
suitable for all types of workload, where with this patch one needs only
to fiddle with the launch interval to handle possible spikes. It would
be interesting to investigate, how this approach would react to
different dynamics of the queue size. I've plotted one "spike" scenario
in the "Worker pool size response to queue depth", where there is a
pretty artificial burst of IO, making the queue size look like a step
function. If I understand the patch implementation correctly, it would
respond linearly over time (green line), one could also think about
applying a first order butterworth low pass filter to respond quicker
but still smooth (orange line).
But in reality the queue size would be of course much more volatile even
on stable workloads, like in "Queue depth over time" (one can see
general oscillation, as well as different modes, e.g. where data is in
the page cache vs where it isn't). Even more, there is a feedback where
increasing number of workers would accelerate queue size decrease --
based on [1], the system utilization for M/M/k depends on the arrival
rate, processing rate and number of processors, where pretty intuitively
more processors reduce utilization. But alas, as you've mentioned this
result exists for Poisson distribution only.
Btw, I assume something similar could be done to other methods as well?
I'm not up to date on io uring, can one change the ring depth on the
fly?
As a side note, I was trying to experiment with this patch using
dm-mapper's delay feature to introduce an arbitrary large io latency and
see how the io queue is growing. But strangely enough, even though the
pure io latency was high, the queue growth was smaller than e.g. on a
real hardware under the same conditions without any artificial delay. Is
there anything obvious I'm missing that could have explained that?
[1]: Harchol-Balter, Mor. Performance Modeling and Design of Computer
Systems: Queueing Theory in Action. Cambridge University Press, 2013.
Attachments:
load.pngimage/pngDownload
�PNG
IHDR � � 5��� :tEXtSoftware Matplotlib version3.10.3, https://matplotlib.org/f%�� pHYs a a�?�i g(IDATx���y\T����0�:����n�����&�[e���Vz��-5s�Vje����u+�RK+355TpI� \�\PTe������r����g`>��k^1�<s����s���B�������R������~1 �@""""3� HDDDdf ����� ��a $"""23�DDDDf�������0 �@""""3� HDDDdf ����� ��a $"""23�DDDDf�������0 �@""""3� HDDDdf ����� ��a $"""23�DDDDf�������0 �@""""3� HDDDdf ����� ��a $"""23�DDDDf�������0 �@�z����B��������9s�P(��������g-Uxgg���B���e��i���?aoo_����P(0g��z[_���Nz0~~~<x��e�4@j��-[�B���]������d R�0��j����
�4Q}��� "z0iiiP*k�o��~�
�-�Q����EII ���������������[�/���[���%"�� 5pj��NCYee%����P(`mm
�:[��X[[7� H�����^�STTT/�!�K�D7����0` 4
�����o_��������W1m�4�k�����h40` :t��.\��!C�����2e
�����g�����������?���v��������s��EX[[���={�D||<p�?��E��}��^����G}����#00j�����`�3g� &&vvv����[o�!�4�N}o]��j�� �{����`���pww����q\�|������ �z=������X[[����=��]�f�;;v�v��!mc������|��'R�+W�@�T����`]/�������o�>���������E�^�����/^���c��� �Z���|���m�~g�W������f�����}����S����>�B�_�U�����B���;�0` ����-^�!!!P�����Fll,�������m��Err2"##akk��^{��5-_����x��Wk�����?~#G����3z�� �����1c��Y3��j4i��=���={��Cd
�Oi"