Performance issues with parallelism and LIMIT

Started by David Geier · almost 3 years ago · 25 messages
#1David Geier
geidav.pg@gmail.com

Hi hackers,

While migrating from PostgreSQL 14 to 15, we encountered the following
performance degradation caused by commit 46846433a03dff: "shm_mq: Update
mq_bytes_written less often", discussion in [1].

The batching can make queries with a LIMIT clause run significantly
slower compared to PostgreSQL 14, because neither the ring buffer write
position is updated, nor the latch to inform the leader that there's
data available is set, before a worker's queue is 1/4th full. This can
be seen in the number of rows produced by a parallel worker. Worst-case,
the data set is large and all rows to answer the query appear early, but
are not big enough to fill the queue to 1/4th (e.g. when the LIMIT and
the tuple sizes are small). Here is an example to reproduce the problem.

CREATE TABLE t(id1 INT, id2 INT, id3 INT, id4 INT, id5 INT);
INSERT INTO t(id1, id2, id3, id4, id5) SELECT i%1000, i, i, i, i FROM
generate_series(1, 10000000) AS i;
ANALYZE t;
SET parallel_tuple_cost = 0;
SET parallel_setup_cost = 0;
SET min_parallel_table_scan_size = 0;
SET max_parallel_workers_per_gather = 8;
EXPLAIN ANALYZE VERBOSE SELECT id2 FROM t WHERE id1 = 100 LIMIT 100;

PostgreSQL 15:

 Limit  (cost=0.00..797.43 rows=100 width=4) (actual
time=65.083..69.207 rows=100 loops=1)
   Output: id2
   ->  Gather  (cost=0.00..79320.18 rows=9947 width=4) (actual
time=65.073..68.417 rows=100 loops=1)
         Output: id2
         Workers Planned: 8
         Workers Launched: 7
         ->  Parallel Seq Scan on public.t (cost=0.00..79320.18
rows=1243 width=4) (actual time=0.204..33.049 rows=100 loops=7)
               Output: id2
               Filter: (t.id1 = 100)
               Rows Removed by Filter: 99345
               Worker 0:  actual time=0.334..32.284 rows=100 loops=1
               Worker 1:  actual time=0.060..32.680 rows=100 loops=1
               Worker 2:  actual time=0.637..33.954 rows=98 loops=1
               Worker 3:  actual time=0.136..33.301 rows=100 loops=1
               Worker 4:  actual time=0.140..31.942 rows=100 loops=1
               Worker 5:  actual time=0.062..33.673 rows=100 loops=1
               Worker 6:  actual time=0.062..33.512 rows=100 loops=1
 Planning Time: 0.113 ms
 Execution Time: 69.772 ms

PostgreSQL 14:

 Limit  (cost=0.00..797.75 rows=100 width=4) (actual
time=30.602..38.459 rows=100 loops=1)
   Output: id2
   ->  Gather  (cost=0.00..79320.18 rows=9943 width=4) (actual
time=30.592..37.669 rows=100 loops=1)
         Output: id2
         Workers Planned: 8
         Workers Launched: 7
         ->  Parallel Seq Scan on public.t (cost=0.00..79320.18
rows=1243 width=4) (actual time=0.221..5.181 rows=15 loops=7)
               Output: id2
               Filter: (t.id1 = 100)
               Rows Removed by Filter: 15241
               Worker 0:  actual time=0.129..4.840 rows=15 loops=1
               Worker 1:  actual time=0.125..4.924 rows=15 loops=1
               Worker 2:  actual time=0.314..5.249 rows=17 loops=1
               Worker 3:  actual time=0.252..5.341 rows=15 loops=1
               Worker 4:  actual time=0.163..5.179 rows=15 loops=1
               Worker 5:  actual time=0.422..5.248 rows=15 loops=1
               Worker 6:  actual time=0.139..5.489 rows=16 loops=1
 Planning Time: 0.084 ms
 Execution Time: 38.880 ms

I had a quick look at the code and I started wondering if we can't
achieve the same performance improvement without batching by e.g.:

- Only set the latch if new data is written to an empty queue.
Otherwise, the leader should keep trying to read from the queues anyway,
without waiting for the latch, so there is no need to set the latch again.

- Reorganize struct shm_mq. There seems to be false sharing happening
between at least mq_ring_size and the atomics, and potentially also
between the atomics. I'm wondering if that's not the root cause of
the "slow atomics" observed in [1]? I'm happy to do some profiling.

Alternatively, we could always set the latch if numberTuples in
ExecutePlan() is reasonably low. To do so, the DestReceiver's receive()
method would only need an additional "force flush" argument.

A slightly different but related problem is when some workers have
already produced enough rows to answer the LIMIT query, but other
workers are still running without producing any new rows. In that case
the "already done" workers will stop running even though they haven't
reached 1/4th of the queue size, because the for-loop in execMain.c
bails out in the following condition:

        if (numberTuples && numberTuples == current_tuple_count)
            break;

Subsequently, the leader will end the plan and then wait in the Gather
node for all workers to shutdown. However, workers still running but not
producing any new rows will never reach the following condition in
execMain.c to check if they're supposed to stop (the shared memory queue
dest receiver will return false on detached queues):

            /*
             * If we are not able to send the tuple, we assume the
             * destination has closed and no more tuples can be sent.
             * If that's the case, end the loop.
             */
            if (!dest->receiveSlot(slot, dest))
                break;

Reproduction steps for this problem are below. Here the worker getting
the first table page will be done right away, but the query takes as
long as it takes to scan all pages of the entire table.

CREATE TABLE bar (col INT);
INSERT INTO bar SELECT generate_series(1, 5000000);
SET max_parallel_workers_per_gather = 8;
EXPLAIN ANALYZE VERBOSE SELECT col FROM bar WHERE col = 1 LIMIT 1;

 Limit  (cost=0.00..1.10 rows=1 width=4) (actual time=32.289..196.200
rows=1 loops=1)
   Output: col
   ->  Gather  (cost=0.00..30939.03 rows=28208 width=4) (actual
time=32.278..196.176 rows=1 loops=1)
         Output: col
         Workers Planned: 8
         Workers Launched: 7
         ->  Parallel Seq Scan on public.bar (cost=0.00..30939.03
rows=3526 width=4) (actual time=137.251..137.255 rows=0 loops=7)
               Output: col
               Filter: (bar.col = 1)
               Rows Removed by Filter: 713769
               Worker 0:  actual time=160.177..160.181 rows=0 loops=1
               Worker 1:  actual time=160.111..160.115 rows=0 loops=1
               Worker 2:  actual time=0.043..0.047 rows=1 loops=1
               Worker 3:  actual time=160.040..160.044 rows=0 loops=1
               Worker 4:  actual time=160.167..160.171 rows=0 loops=1
               Worker 5:  actual time=160.018..160.022 rows=0 loops=1
               Worker 6:  actual time=160.201..160.205 rows=0 loops=1
 Planning Time: 0.087 ms
 Execution Time: 196.247 ms

We would need something similar to CHECK_FOR_INTERRUPTS() which returns
a NULL slot if a parallel worker is supposed to stop execution (we could
e.g. check if the queue got detached). Or could we amend
CHECK_FOR_INTERRUPTS() to just stop the worker gracefully if the queue
got detached?

Jasper Smit, Spiros Agathos and Dimos Stamatakis helped with this work.

[1]: /messages/by-id/CAFiTN-tVXqn_OG7tHNeSkBbN+iiCZTiQ83uakax43y1sQb2OBA@mail.gmail.com

--
David Geier
(ServiceNow)

#2Tomas Vondra
tomas.vondra@enterprisedb.com
In reply to: David Geier (#1)
Re: Performance issues with parallelism and LIMIT

On 2/1/23 14:41, David Geier wrote:

Hi hackers,

While migrating from PostgreSQL 14 to 15, we encountered the following
performance degradation caused by commit 46846433a03dff: "shm_mq: Update
mq_bytes_written less often", discussion in [1].

The batching can make queries with a LIMIT clause run significantly
slower compared to PostgreSQL 14, because neither the ring buffer write
position is updated, nor the latch to inform the leader that there's
data available is set, before a worker's queue is 1/4th full. This can
be seen in the number of rows produced by a parallel worker. Worst-case,
the data set is large and all rows to answer the query appear early, but
are not big enough to fill the queue to 1/4th (e.g. when the LIMIT and
the tuple sizes are small). Here is an example to reproduce the problem.

Yeah, this is a pretty annoying regression. We already can hit poor
behavior when matching rows are not distributed uniformly in the tables
(which is what LIMIT costing assumes), and this makes it more likely to
hit similar issues. A bit like how doing many HTTP requests makes it
more likely to hit at least one 99th-percentile outlier.

...

I had a quick look at the code and I started wondering if we can't
achieve the same performance improvement without batching by e.g.:

- Only set the latch if new data is written to an empty queue.
Otherwise, the leader should keep trying to read from the queues anyway,
without waiting for the latch, so there is no need to set the latch again.

- Reorganize struct shm_mq. There seems to be false sharing happening
between at least mq_ring_size and the atomics, and potentially also
between the atomics. I'm wondering if that's not the root cause of
the "slow atomics" observed in [1]? I'm happy to do some profiling.

Alternatively, we could always set the latch if numberTuples in
ExecutePlan() is reasonably low. To do so, the DestReceiver's receive()
method would only need an additional "force flush" argument.

No opinion on these options, but worth a try. Alternatively, we could
try the usual doubling approach - start with a low threshold (and set
the latch frequently), and then gradually increase it up to the 1/4.

That should work both for queries expecting only few rows and those
producing a lot of data.

A slightly different but related problem is when some workers have
already produced enough rows to answer the LIMIT query, but other
workers are still running without producing any new rows. In that case
the "already done" workers will stop running even though they haven't
reached 1/4th of the queue size, because the for-loop in execMain.c
bails out in the following condition:

        if (numberTuples && numberTuples == current_tuple_count)
            break;

Subsequently, the leader will end the plan and then wait in the Gather
node for all workers to shutdown. However, workers still running but not
producing any new rows will never reach the following condition in
execMain.c to check if they're supposed to stop (the shared memory queue
dest receiver will return false on detached queues):

            /*
             * If we are not able to send the tuple, we assume the
             * destination has closed and no more tuples can be sent.
             * If that's the case, end the loop.
             */
            if (!dest->receiveSlot(slot, dest))
                break;

Reproduction steps for this problem are below. Here the worker getting
the first table page will be done right away, but the query takes as
long as it takes to scan all pages of the entire table.

Ouch!

...

We would need something similar to CHECK_FOR_INTERRUPTS() which returns
a NULL slot if a parallel worker is supposed to stop execution (we could
e.g. check if the queue got detached). Or could we amend
CHECK_FOR_INTERRUPTS() to just stop the worker gracefully if the queue
got detached?

That sounds reasonable, but I'm not very familiar with the leader-worker
communication, so no opinion on how it should be done.

regards

--
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

#3David Geier
geidav.pg@gmail.com
In reply to: Tomas Vondra (#2)
Re: Performance issues with parallelism and LIMIT

Hi,

On 2/8/23 11:42, Tomas Vondra wrote:

On 2/1/23 14:41, David Geier wrote:

Yeah, this is a pretty annoying regression. We already can hit poor
behavior when matching rows are not distributed uniformly in the tables
(which is what LIMIT costing assumes), and this makes it more likely to
hit similar issues. A bit like how doing many HTTP requests makes it
more likely to hit at least one 99th-percentile outlier.

Are you talking about the use of ordering vs filtering indexes in
queries where there's both an ORDER BY and a filter present (e.g. using
an ordering index but then all rows passing the filter are at the end of
the table)? If not, can you elaborate a bit more on that and maybe give
an example.

No opinion on these options, but worth a try. Alternatively, we could
try the usual doubling approach - start with a low threshold (and set
the latch frequently), and then gradually increase it up to the 1/4.

That should work both for queries expecting only few rows and those
producing a lot of data.

I was thinking about this variant as well. One more alternative would be
latching the leader once a worker has produced 1/Nth of the LIMIT where
N is the number of workers. Both variants have the disadvantage that
there are still corner cases where the latch is set too late; but it
would for sure be much better than what we have today.

I also did some profiling and - at least on my development laptop with 8
physical cores - the original example motivating the batching change is
slower with batching enabled than with it disabled by commenting out:

    if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))

SET parallel_tuple_cost TO 0;
CREATE TABLE b (a int);
INSERT INTO b SELECT generate_series(1, 200000000);
ANALYZE b;
EXPLAIN (ANALYZE, TIMING OFF) SELECT * FROM b;

 Gather  (cost=1000.00..1200284.61 rows=200375424 width=4) (actual
rows=200000000 loops=1)
   Workers Planned: 7
   Workers Launched: 7
   ->  Parallel Seq Scan on b  (cost=0.00..1199284.61 rows=28625061
width=4) (actual rows=25000000 loops=8)

Always latch: 19055 ms
Batching:     19575 ms

If I find some time, I'll play around a bit more and maybe propose a patch.

...

We would need something similar to CHECK_FOR_INTERRUPTS() which returns
a NULL slot if a parallel worker is supposed to stop execution (we could
e.g. check if the queue got detached). Or could we amend
CHECK_FOR_INTERRUPTS() to just stop the worker gracefully if the queue
got detached?

That sounds reasonable, but I'm not very familiar with the leader-worker
communication, so no opinion on how it should be done.

I think an extra macro that needs to be called from dozens of places to
check if parallel execution is supposed to end is the least preferred
approach. I'll read up more on how CHECK_FOR_INTERRUPTS() works and if
we cannot actively signal the workers that they should stop.

--
David Geier
(ServiceNow)

#4Tomas Vondra
tomas.vondra@enterprisedb.com
In reply to: David Geier (#3)
Re: Performance issues with parallelism and LIMIT

On 2/20/23 19:18, David Geier wrote:

Hi,

On 2/8/23 11:42, Tomas Vondra wrote:

On 2/1/23 14:41, David Geier wrote:

Yeah, this is a pretty annoying regression. We already can hit poor
behavior when matching rows are not distributed uniformly in the tables
(which is what LIMIT costing assumes), and this makes it more likely to
hit similar issues. A bit like how doing many HTTP requests makes it
more likely to hit at least one 99th-percentile outlier.

Are you talking about the use of ordering vs filtering indexes in
queries where there's both an ORDER BY and a filter present (e.g. using
an ordering index but then all rows passing the filter are at the end of
the table)? If not, can you elaborate a bit more on that and maybe give
an example.

Yeah, roughly. I don't think the explicit ORDER BY is a requirement for
this to happen - it's enough when the part of the plan below LIMIT
produces many rows, but the matching rows are at the end.

No opinion on these options, but worth a try. Alternatively, we could
try the usual doubling approach - start with a low threshold (and set
the latch frequently), and then gradually increase it up to the 1/4.

That should work both for queries expecting only few rows and those
producing a lot of data.

I was thinking about this variant as well. One more alternative would be
latching the leader once a worker has produced 1/Nth of the LIMIT where
N is the number of workers. Both variants have the disadvantage that
there are still corner cases where the latch is set too late; but it
would for sure be much better than what we have today.

I also did some profiling and - at least on my development laptop with 8
physical cores - the original example motivating the batching change is
slower with batching enabled than with it disabled by commenting out:

    if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))

SET parallel_tuple_cost TO 0;
CREATE TABLE b (a int);
INSERT INTO b SELECT generate_series(1, 200000000);
ANALYZE b;
EXPLAIN (ANALYZE, TIMING OFF) SELECT * FROM b;

 Gather  (cost=1000.00..1200284.61 rows=200375424 width=4) (actual
rows=200000000 loops=1)
   Workers Planned: 7
   Workers Launched: 7
   ->  Parallel Seq Scan on b  (cost=0.00..1199284.61 rows=28625061
width=4) (actual rows=25000000 loops=8)

Always latch: 19055 ms
Batching:     19575 ms

If I find some time, I'll play around a bit more and maybe propose a patch.

OK. Once you have a WIP patch maybe share it and I'll try to do some
profiling too.

...

We would need something similar to CHECK_FOR_INTERRUPTS() which returns
a NULL slot if a parallel worker is supposed to stop execution (we could
e.g. check if the queue got detached). Or could we amend
CHECK_FOR_INTERRUPTS() to just stop the worker gracefully if the queue
got detached?

That sounds reasonable, but I'm not very familiar with the leader-worker
communication, so no opinion on how it should be done.

I think an extra macro that needs to be called from dozens of places to
check if parallel execution is supposed to end is the least preferred
approach. I'll read up more on how CHECK_FOR_INTERRUPTS() works and if
we cannot actively signal the workers that they should stop.

IMHO, having to add another macro to a bunch of ad hoc places would be
rather inconvenient. It'd be much better to fix this in a localized
manner (especially as it seems related to a fairly specific place).

regards

--
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

#5Tomas Vondra
tomas.vondra@enterprisedb.com
In reply to: Tomas Vondra (#4)
Re: Performance issues with parallelism and LIMIT

On 2/22/23 13:22, Tomas Vondra wrote:

...

No opinion on these options, but worth a try. Alternatively, we could
try the usual doubling approach - start with a low threshold (and set
the latch frequently), and then gradually increase it up to the 1/4.

That should work both for queries expecting only few rows and those
producing a lot of data.

I was thinking about this variant as well. One more alternative would be
latching the leader once a worker has produced 1/Nth of the LIMIT where
N is the number of workers. Both variants have the disadvantage that
there are still corner cases where the latch is set too late; but it
would for sure be much better than what we have today.

I also did some profiling and - at least on my development laptop with 8
physical cores - the original example motivating the batching change is
slower with batching enabled than with it disabled by commenting out:

    if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))

SET parallel_tuple_cost TO 0;
CREATE TABLE b (a int);
INSERT INTO b SELECT generate_series(1, 200000000);
ANALYZE b;
EXPLAIN (ANALYZE, TIMING OFF) SELECT * FROM b;

 Gather  (cost=1000.00..1200284.61 rows=200375424 width=4) (actual
rows=200000000 loops=1)
   Workers Planned: 7
   Workers Launched: 7
   ->  Parallel Seq Scan on b  (cost=0.00..1199284.61 rows=28625061
width=4) (actual rows=25000000 loops=8)

Always latch: 19055 ms
Batching:     19575 ms

If I find some time, I'll play around a bit more and maybe propose a patch.

OK. Once you have a WIP patch maybe share it and I'll try to do some
profiling too.

...

We would need something similar to CHECK_FOR_INTERRUPTS() which returns
a NULL slot if a parallel worker is supposed to stop execution (we could
e.g. check if the queue got detached). Or could we amend
CHECK_FOR_INTERRUPTS() to just stop the worker gracefully if the queue
got detached?

That sounds reasonable, but I'm not very familiar with the leader-worker
communication, so no opinion on how it should be done.

I think an extra macro that needs to be called from dozens of places to
check if parallel execution is supposed to end is the least preferred
approach. I'll read up more on how CHECK_FOR_INTERRUPTS() works and if
we cannot actively signal the workers that they should stop.

IMHO, having to add another macro to a bunch of ad hoc places would be
rather inconvenient. It'd be much better to fix this in a localized
manner (especially as it seems related to a fairly specific place).

David, do you still plan to try fixing these issues? I have a feeling
those issues may be fairly common but often undetected, or just brushed
off as a "slow query" (AFAICS it was only caught thanks to comparing
timings before/after upgrade). Would be great to improve this.

regards

--
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

#6David Geier
geidav.pg@gmail.com
In reply to: Tomas Vondra (#5)
1 attachment(s)
Re: Performance issues with parallelism and LIMIT

Hi Tomas!

I've finally got time again to work on PostgreSQL.

On 03.11.2023 21:48, Tomas Vondra wrote:

On 2/22/23 13:22, Tomas Vondra wrote:

...

No opinion on these options, but worth a try. Alternatively, we could
try the usual doubling approach - start with a low threshold (and set
the latch frequently), and then gradually increase it up to the 1/4.

That should work both for queries expecting only few rows and those
producing a lot of data.

I was thinking about this variant as well. One more alternative would be
latching the leader once a worker has produced 1/Nth of the LIMIT where
N is the number of workers. Both variants have the disadvantage that
there are still corner cases where the latch is set too late; but it
would for sure be much better than what we have today.

Or always set the latch when a LIMIT is present. In that case it's much
more likely that the latency hurts than that it doesn't.

I also did some profiling and - at least on my development laptop with 8
physical cores - the original example, motivating the batching change is
slower than when it's disabled by commenting out:

    if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))

SET parallel_tuple_cost TO 0;
CREATE TABLE b (a int);
INSERT INTO b SELECT generate_series(1, 200000000);
ANALYZE b;
EXPLAIN (ANALYZE, TIMING OFF) SELECT * FROM b;

 Gather  (cost=1000.00..1200284.61 rows=200375424 width=4) (actual
rows=200000000 loops=1)
   Workers Planned: 7
   Workers Launched: 7
   ->  Parallel Seq Scan on b  (cost=0.00..1199284.61 rows=28625061
width=4) (actual rows=25000000 loops=8)

Always latch: 19055 ms
Batching:     19575 ms

If I find some time, I'll play around a bit more and maybe propose a patch.

I've also remeasured the shared memory latching with and without the
1/4th check, using the original example from [1]. Apart from the code
line mentioned by you, I also commented out the check on the consumer side:

if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)

On my dev laptop (i9-13950HX) the runtimes are pretty much the same with
8 workers (16-17 seconds with some variance). It would be great to
understand when this truly helps, if at all, and whether we need some
smarter heuristic for the consumer-side latch or can just remove the
1/4th check.

If this turns out to be more involved we could also move this discussion
into a separate thread and have this thread focus on stopping the
parallel workers early, see below.

OK. Once you have a WIP patch maybe share it and I'll try to do some
profiling too.

...

We would need something similar to CHECK_FOR_INTERRUPTS() which returns
a NULL slot if a parallel worker is supposed to stop execution (we could
e.g. check if the queue got detached). Or could we amend
CHECK_FOR_INTERRUPTS() to just stop the worker gracefully if the queue
got detached?

That sounds reasonable, but I'm not very familiar the leader-worker
communication, so no opinion on how it should be done.

I think an extra macro that needs to be called from dozens of places to
check if parallel execution is supposed to end is the least preferred
approach. I'll read up more on how CHECK_FOR_INTERRUPTS() works and if
we cannot actively signal the workers that they should stop.

IMHO if this requires adding another macro to a bunch of ad hoc places
is rather inconvenient. It'd be much better to fix this in a localized
manner (especially as it seems related to a fairly specific place).

I've written up a draft patch that instructs workers to stop, once the
leader has gotten enough rows according to the LIMIT clause. I'm using
SendProcSignal() to inform the workers to take action and stop executing
ExecutePlan(). I've implemented the stopping via sigsetjmp. I cannot see
a good way of doing this differently which is not much more intrusive.
The patch is incomplete (comments, support for Gather Merge, more
testing, etc.) but I'm mostly interested at this point if the overall
approach is deemed fine.

I first tried to use TerminateBackgroundWorker(), but then postmaster.c
logs the worker termination, and some of the cleanup code needed for
proper instrumentation no longer runs in the parallel workers.

With the patch applied, the query from the first mail of this thread
runs in a few milliseconds. That it still takes that long is because
forking, plan (de-)serialization and the remaining initialization are
fairly heavyweight. With threads, the "fork" time would already be much
lower and no (de-)serialization would be necessary. In the process-based
architecture it would be interesting to think about adding a parallel
worker pool.

David, do you still plan to try fixing these issues? I have a feeling
those issues may be fairly common but often undetected, or just brushed
of as "slow query" (AFAICS it was only caught thanks to comparing
timings before/after upgrade). Would be great to improve this.

I completely agree. And while they look like corner cases, they will be
encountered if the workload is diverse enough (both findings are from
the field). If such a query then runs frequently enough, it causes a
real issue that is hard for the DBA to diagnose.

[1]: /messages/by-id/CAFiTN-tVXqn_OG7tHNeSkBbN+iiCZTiQ83uakax43y1sQb2OBA@mail.gmail.com

--
David Geier

Attachments:

0001-Parallel-workers-stop-quicker.patch (text/x-patch; charset=UTF-8)
From 9553f1ec5885c9e2a22a0eafa4f4b489f13d22ae Mon Sep 17 00:00:00 2001
From: David Geier <geidav.pg@gmail.com>
Date: Tue, 2 Sep 2025 13:24:45 +0200
Subject: [PATCH] Parallel workers stop quicker

---
 src/backend/executor/execParallel.c  | 51 ++++++++++++++++++++++++----
 src/backend/executor/nodeGather.c    | 36 ++++++++++++++++++++
 src/backend/storage/ipc/procsignal.c |  4 +++
 src/backend/tcop/postgres.c          |  4 +++
 src/include/executor/execParallel.h  |  5 +++
 src/include/nodes/execnodes.h        |  1 +
 src/include/storage/procsignal.h     |  1 +
 7 files changed, 95 insertions(+), 7 deletions(-)

diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index f098a5557cf..3b0993fd9d6 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -1410,6 +1410,32 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
 								 pwcxt);
 }
 
+volatile sig_atomic_t	ParallelStopPending = false;
+sigjmp_buf *			parallel_sigjmp_buf = NULL;
+volatile bool			got_stopped = false;
+
+void HandleParallelStop(void)
+{
+	InterruptPending = true;
+	ParallelStopPending = true;
+	SetLatch(MyLatch);	
+}
+
+void ProcessParallelStop(void)
+{
+	ParallelStopPending = false;
+	got_stopped = true;
+
+	/*
+	 * Only allow siglongjmp if we are executing the plan.
+	 * Otherwise, we might jump back right after ExecutePlan() even
+	 * though we are not yet executing the plan or we're already done.
+	 */
+	if (parallel_sigjmp_buf != NULL)
+		siglongjmp(*parallel_sigjmp_buf, 1);
+}
+
+
 /*
  * Main entrypoint for parallel query worker processes.
  *
@@ -1440,6 +1466,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	void	   *area_space;
 	dsa_area   *area;
 	ParallelWorkerContext pwcxt;
+	sigjmp_buf	local_sigjmp_buf;
 
 	/* Get fixed-size state. */
 	fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
@@ -1492,13 +1519,23 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	 */
 	InstrStartParallelQuery();
 
-	/*
-	 * Run the plan.  If we specified a tuple bound, be careful not to demand
-	 * more tuples than that.
-	 */
-	ExecutorRun(queryDesc,
-				ForwardScanDirection,
-				fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
+	if (!got_stopped)
+	{
+		if (sigsetjmp(local_sigjmp_buf, 1) == 0)
+		{
+			parallel_sigjmp_buf = &local_sigjmp_buf;
+
+			/*
+			* Run the plan.  If we specified a tuple bound, be careful not to demand
+			* more tuples than that.
+			*/
+			ExecutorRun(queryDesc,
+						ForwardScanDirection,
+						fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
+		}
+	}
+
+	parallel_sigjmp_buf = NULL;
 
 	/* Shut down the executor */
 	ExecutorFinish(queryDesc);
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index dc7d1830259..31745e86ced 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -36,6 +36,7 @@
 #include "executor/tqueue.h"
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
+#include "storage/procsignal.h"
 #include "utils/wait_event.h"
 
 
@@ -71,6 +72,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	gatherstate->need_to_scan_locally =
 		!node->single_copy && parallel_leader_participation;
 	gatherstate->tuples_needed = -1;
+	gatherstate->tuples_produced = 0;
 
 	/*
 	 * Miscellaneous initialization
@@ -126,6 +128,36 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	return gatherstate;
 }
 
+/* ----------------------------------------------------------------
+ *		Workers only stop when they themselves reach the LIMIT.
+ * 		They don't stop if other workers in total produced already
+ * 		enough rows to reach the LIMIT. Hence, we need to stop them
+ * 		explicitly.
+ * ----------------------------------------------------------------
+ */
+static void
+StopWorkersIfLimitReached(GatherState *node)
+{
+	if (node->tuples_needed != -1 && node->tuples_produced == node->tuples_needed)
+	{
+		if (node->pei != NULL)
+		{
+			ParallelContext *pcxt = node->pei->pcxt;
+			int i;
+
+			if (pcxt->worker != NULL)
+			{
+				for (i = 0; i < pcxt->nworkers_launched; ++i)
+				{
+					pid_t pid;
+					GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
+					SendProcSignal(pid, PROCSIG_PARALLEL_STOP, INVALID_PROC_NUMBER);
+				}
+			}
+		}
+	}
+}
+
 /* ----------------------------------------------------------------
  *		ExecGather(node)
  *
@@ -212,6 +244,7 @@ ExecGather(PlanState *pstate)
 		/* Run plan locally if no workers or enabled and not single-copy. */
 		node->need_to_scan_locally = (node->nreaders == 0)
 			|| (!gather->single_copy && parallel_leader_participation);
+		node->tuples_produced = 0;
 		node->initialized = true;
 	}
 
@@ -230,6 +263,9 @@ ExecGather(PlanState *pstate)
 	if (TupIsNull(slot))
 		return NULL;
 
+	node->tuples_produced++;
+	StopWorkersIfLimitReached(node);
+
 	/* If no projection is required, we're done. */
 	if (node->ps.ps_ProjInfo == NULL)
 		return slot;
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 087821311cc..8f99ecebe2f 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -19,6 +19,7 @@
 
 #include "access/parallel.h"
 #include "commands/async.h"
+#include "executor/execParallel.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "port/pg_bitutils.h"
@@ -694,6 +695,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_PARALLEL_APPLY_MESSAGE))
 		HandleParallelApplyMessageInterrupt();
 
+	if (CheckProcSignal(PROCSIG_PARALLEL_STOP))
+		HandleParallelStop();
+
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE))
 		HandleRecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE);
 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 0cecd464902..5b320e52b94 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -39,6 +39,7 @@
 #include "commands/event_trigger.h"
 #include "commands/prepare.h"
 #include "common/pg_prng.h"
+#include "executor/execParallel.h"
 #include "jit/jit.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
@@ -3536,6 +3537,9 @@ ProcessInterrupts(void)
 
 	if (ParallelApplyMessagePending)
 		ProcessParallelApplyMessages();
+
+	if (ParallelStopPending)
+		ProcessParallelStop();
 }
 
 /*
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 5e7106c397a..9e0be350694 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -37,6 +37,11 @@ typedef struct ParallelExecutorInfo
 	struct TupleQueueReader **reader;	/* tuple reader/writer support */
 } ParallelExecutorInfo;
 
+extern PGDLLIMPORT volatile sig_atomic_t ParallelStopPending;
+
+extern void HandleParallelStop(void);
+extern void ProcessParallelStop(void);
+
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
 												  EState *estate, Bitmapset *sendParams, int nworkers,
 												  int64 tuples_needed);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index de782014b2d..63962aebcd2 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2740,6 +2740,7 @@ typedef struct GatherState
 	bool		initialized;	/* workers launched? */
 	bool		need_to_scan_locally;	/* need to read from local plan? */
 	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
+	int64 		tuples_produced;	/* tuples already produced */
 	/* these fields are set up once: */
 	TupleTableSlot *funnel_slot;
 	struct ParallelExecutorInfo *pei;
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index afeeb1ca019..f7f4ee85154 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -36,6 +36,7 @@ typedef enum
 	PROCSIG_BARRIER,			/* global barrier interrupt  */
 	PROCSIG_LOG_MEMORY_CONTEXT, /* ask backend to log the memory contexts */
 	PROCSIG_PARALLEL_APPLY_MESSAGE, /* Message from parallel apply workers */
+	PROCSIG_PARALLEL_STOP, /* Instruct parallel worker to stop */
 
 	/* Recovery conflict reasons */
 	PROCSIG_RECOVERY_CONFLICT_FIRST,
-- 
2.43.0

#7Tomas Vondra
tomas@vondra.me
In reply to: David Geier (#6)
Re: Performance issues with parallelism and LIMIT

Hi David,

Sorry for not responding to this thread earlier.

On 9/2/25 13:38, David Geier wrote:

Hi Tomas!

I've finally got time again to work on PostgreSQL.

On 03.11.2023 21:48, Tomas Vondra wrote:

On 2/22/23 13:22, Tomas Vondra wrote:

...

No opinion on these options, but worth a try. Alternatively, we could
try the usual doubling approach - start with a low threshold (and set
the latch frequently), and then gradually increase it up to the 1/4.

That should work both for queries expecting only a few rows and those
producing a lot of data.

I was thinking about this variant as well. One more alternative would be
latching the leader once a worker has produced 1/Nth of the LIMIT where
N is the number of workers. Both variants have the disadvantage that
there are still corner cases where the latch is set too late; but it
would for sure be much better than what we have today.

Or always latching when a LIMIT is present. When a LIMIT is present,
it's much more likely that the latency hurts than that it doesn't.

I think something like this is probably the way to go ...

I also did some profiling and - at least on my development laptop with 8
physical cores - the original example motivating the batching change is
slower than when it's disabled by commenting out:

    if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))

SET parallel_tuple_cost TO 0;
CREATE TABLE b (a int);
INSERT INTO b SELECT generate_series(1, 200000000);
ANALYZE b;
EXPLAIN (ANALYZE, TIMING OFF) SELECT * FROM b;

 Gather  (cost=1000.00..1200284.61 rows=200375424 width=4) (actual
rows=200000000 loops=1)
   Workers Planned: 7
   Workers Launched: 7
   ->  Parallel Seq Scan on b  (cost=0.00..1199284.61 rows=28625061
width=4) (actual rows=25000000 loops=8)

Always latch: 19055 ms
Batching:     19575 ms

If I find some time, I'll play around a bit more and maybe propose a patch.

I've also remeasured the shared memory latching with and without the
1/4th check using the original example from [1]. Apart from the code
line mentioned by you, I also commented out the check on the consumer side:

if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)

On my dev laptop (i9-13950HX) the runtimes are pretty much the same with
8 workers (16-17 seconds with some variance). It would be great to
understand when this truly helps, if at all, to see if we need some
smartness to latch the consumer or if we can just remove the 1/4th check.

If this turns out to be more involved we could also move this discussion
into a separate thread and have this thread focus on stopping the
parallel workers early, see below.

Yeah. It's a good question if this is really necessary. The number of
signals we can send between processes is not huge (like ~200k/s per
process). I'd guess it might matter for data sets that fit into shared
buffers, with very narrow rows. Or something like that.

But as you say, better to move that discussion into a separate thread.

OK. Once you have a WIP patch maybe share it and I'll try to do some
profiling too.

...

We would need something similar to CHECK_FOR_INTERRUPTS() which returns
a NULL slot if a parallel worker is supposed to stop execution (we could
e.g. check if the queue got detached). Or could we amend
CHECK_FOR_INTERRUPTS() to just stop the worker gracefully if the queue
got detached?

That sounds reasonable, but I'm not very familiar the leader-worker
communication, so no opinion on how it should be done.

I think an extra macro that needs to be called from dozens of places to
check if parallel execution is supposed to end is the least preferred
approach. I'll read up more on how CHECK_FOR_INTERRUPTS() works and if
we cannot actively signal the workers that they should stop.

IMHO having to add another macro to a bunch of ad hoc places
is rather inconvenient. It'd be much better to fix this in a localized
manner (especially as it seems related to a fairly specific place).

I've written up a draft patch that instructs workers to stop, once the
leader has gotten enough rows according to the LIMIT clause. I'm using
SendProcSignal() to inform the workers to take action and stop executing
ExecutePlan(). I've implemented the stopping via sigsetjmp. I cannot see
a good way of doing this differently which is not much more intrusive.
The patch is incomplete (comments, support for Gather Merge, more
testing, etc.) but I'm mostly interested at this point if the overall
approach is deemed fine.

I first tried to use TerminateBackgroundWorker() but postmaster.c then
logs the worker termination and also some of the cleanup code needed for
proper instrumentation doesn't run any longer in the parallel workers.

With the patch applied, the query from the first mail of this thread
runs in a few milliseconds. That it still takes that long is because
forking, plan (de-)serialization and remaining initialization are fairly
heavy weight. With threads, the "fork" time would already be much lower and
no (de-)serialization would be necessary. In the process-based
architecture it would be interesting to think about adding a parallel
worker pool.

Unfortunately, I don't think this patch is the way to go. When I apply
it, I get:

ERROR: InstrEndLoop called on running node
CONTEXT: parallel worker

And I very much doubt inventing a new ad hoc way to signal workers is
the right solution (even if there wasn't the InstrEndLoop issue).

What I think we should do is much simpler - make the threshold in shm_mq
dynamic, start with a very low value and gradually ramp up (up to 1/4).
So we'd have

if (mqh->mqh_consume_pending > threshold)

We might start with

threshold = (mq->mq_ring_size / 1024)

or maybe some fixed value, like

threshold = 128

And on every signal we'd double it, capping it to 1/4 of mq_ring_size.

threshold = Min(threshold * 2, mq->mq_ring_size / 4);

This is very similar to other places doing this gradual ramp up, like in
the prefetching / read_stream, etc. It allows fast termination for low
LIMIT values, but quickly amortizes the cost for high LIMIT values.
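To make the trade-off concrete, here is a small standalone model (plain C, not shm_mq code; all names are made up) that counts how often the sender side would set the receiver's latch under the fixed 1/4 threshold versus the doubling scheme:

```c
#include <stddef.h>

/* Mirrors PostgreSQL's Min() for this standalone model. */
#define Min(a, b) ((a) < (b) ? (a) : (b))

/*
 * Count how often the sender would set the receiver's latch while
 * writing `total` bytes in `chunk`-byte messages into a ring of
 * `ring_size` bytes.  With ramp_up, the threshold starts at 128 bytes
 * and doubles on every signal, capped at 1/4 of the ring.
 */
static int
count_latch_sets(size_t total, size_t chunk, size_t ring_size, int ramp_up)
{
	size_t		threshold = ramp_up ? 128 : ring_size / 4;
	size_t		pending = 0;
	int			nsignals = 0;

	for (size_t sent = 0; sent < total; sent += chunk)
	{
		pending += chunk;
		if (pending > threshold)
		{
			nsignals++;			/* SetLatch() in the real code */
			pending = 0;
			if (ramp_up)
				threshold = Min(threshold * 2, ring_size / 4);
		}
	}
	return nsignals;
}
```

For a 64kB ring and 8-byte tuples, the ramped variant signals the receiver after ~136 bytes instead of ~16kB, while over a 1MB transfer it only adds a handful of extra signals compared to the fixed threshold.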

regards

--
Tomas Vondra

#8Tomas Vondra
tomas@vondra.me
In reply to: Tomas Vondra (#7)
2 attachment(s)
Re: Performance issues with parallelism and LIMIT

On 11/13/25 23:36, Tomas Vondra wrote:

...

What I think we should do is much simpler - make the threshold in shm_mq
dynamic, start with a very low value and gradually ramp up (up to 1/4).
So we'd have

if (mqh->mqh_consume_pending > threshold)

We might start with

threshold = (mq->mq_ring_size / 1024)

or maybe some fixed value, like

threshold = 128

And on every signal we'd double it, capping it to 1/4 of mq_ring_size.

threshold = Min(threshold * 2, mq->mq_ring_size / 4);

This is very similar to other places doing this gradual ramp up, like in
the prefetching / read_stream, etc. It allows fast termination for low
LIMIT values, but quickly amortizes the cost for high LIMIT values.

I gave this a try today, to see if it can actually solve the regression.
Attached is a WIP patch, and a set of benchmarking scripts. On my ryzen
machine I got this (timings of the queries):

fill dataset | 14 15 16 17 18 patched
-----------------------------------------------------------------
10 random | 64.1 319.3 328.7 340.5 344.3 79.5
sequential | 54.6 323.4 347.5 350.5 399.2 78.3
100 random | 11.8 42.9 42.3 42.3 68.5 18.6
sequential | 10.0 44.3 45.0 44.3 60.6 20.0

Clearly 15 is a significant regression, with timings ~4x higher. And the
patch improves that quite a bit. It's not down all the way back to 14,
there's still ~10ms regression, for some reason.

Also, I didn't measure if this patch causes some other regressions for
other queries. I don't think it does, but maybe there's some weird
corner case affected.

regards

--
Tomas Vondra

Attachments:

scripts.tgz (application/x-compressed-tar)
mq-threshold-wip.patch (text/x-patch; charset=UTF-8)
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index 2c79a649f46..71733612fb5 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -142,7 +142,9 @@ struct shm_mq_handle
 	char	   *mqh_buffer;
 	Size		mqh_buflen;
 	Size		mqh_consume_pending;
+	Size		mqh_consume_threshold;
 	Size		mqh_send_pending;
+	Size		mqh_send_threshold;
 	Size		mqh_partial_bytes;
 	Size		mqh_expected_bytes;
 	bool		mqh_length_word_complete;
@@ -305,6 +307,10 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 	mqh->mqh_counterparty_attached = false;
 	mqh->mqh_context = CurrentMemoryContext;
 
+	/* start at 64B, then gradually ramp up */
+	mqh->mqh_consume_threshold = Min(64, mq->mq_ring_size / 4);
+	mqh->mqh_send_threshold = Min(64, mq->mq_ring_size / 4);
+
 	if (seg != NULL)
 		on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
 
@@ -535,12 +541,16 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
 	 * 1/4 of the ring size, mark it as written in shared memory and notify
 	 * the receiver.
 	 */
-	if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
+	if (force_flush || mqh->mqh_send_pending > mqh->mqh_send_threshold)
 	{
 		shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
 		if (receiver != NULL)
 			SetLatch(&receiver->procLatch);
 		mqh->mqh_send_pending = 0;
+
+		/* ramp up, up to (mq_ring_size / 4) */
+		mqh->mqh_send_threshold = Min(mqh->mqh_send_threshold * 2,
+									  mq->mq_ring_size / 4);
 	}
 
 	return SHM_MQ_SUCCESS;
@@ -622,10 +632,14 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 	 * because SetLatch() is fairly expensive and we don't want to do it too
 	 * often.
 	 */
-	if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
+	if (mqh->mqh_consume_pending > mqh->mqh_consume_threshold)
 	{
 		shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
 		mqh->mqh_consume_pending = 0;
+
+		/* ramp up, up to (mq_ring_size / 4) */
+		mqh->mqh_consume_threshold = Min(mqh->mqh_consume_threshold * 2,
+										 mq->mq_ring_size / 4);
 	}
 
 	/* Try to read, or finish reading, the length word from the buffer. */
#9David Geier
geidav.pg@gmail.com
In reply to: Tomas Vondra (#7)
Re: Performance issues with parallelism and LIMIT

Hi Tomas!

On 13.11.2025 23:36, Tomas Vondra wrote:

Sorry for not responding to this thread earlier.

No worries. Thanks for looking at it!

IMHO having to add another macro to a bunch of ad hoc places
is rather inconvenient. It'd be much better to fix this in a localized
manner (especially as it seems related to a fairly specific place).

I've written up a draft patch that instructs workers to stop, once the
leader has gotten enough rows according to the LIMIT clause. I'm using
SendProcSignal() to inform the workers to take action and stop executing
ExecutePlan(). I've implemented the stopping via sigsetjmp. I cannot see
a good way of doing this differently which is not much more intrusive.
The patch is incomplete (comments, support for Gather Merge, more
testing, etc.) but I'm mostly interested at this point if the overall
approach is deemed fine.

I first tried to use TerminateBackgroundWorker() but postmaster.c then
logs the worker termination and also some of the cleanup code needed for
proper instrumentation doesn't run any longer in the parallel workers.

With the patch applied, the query from the first mail of this thread
runs in a few milliseconds. That it still takes that long is because
forking, plan (de-)serialization and remaining initialization are fairly
heavy weight. With threads, the "fork" time would already much lower and
no (de-)serialization would be necessary. In the process-based
architecture it would be interesting to think about adding a parallel
worker pool.

Unfortunately, I don't think this patch is the way to go. When I apply
it, I get:

ERROR: InstrEndLoop called on running node
CONTEXT: parallel worker

Ooops. That can likely be fixed.

And I very much doubt inventing a new ad hoc way to signal workers is
the right solution (even if there wasn't the InstrEndLoop issue).

What I think we should do is much simpler - make the threshold in shm_mq
dynamic, start with a very low value and gradually ramp up (up to 1/4).
So we'd have

if (mqh->mqh_consume_pending > threshold)

We might start with

threshold = (mq->mq_ring_size / 1024)

or maybe some fixed value, like

threshold = 128

And on every signal we'd double it, capping it to 1/4 of mq_ring_size.

threshold = Min(threshold * 2, mq->mq_ring_size / 4);

This is very similar to other places doing this gradual ramp up, like in
the prefetching / read_stream, etc. It allows fast termination for low
LIMIT values, but quickly amortizes the cost for high LIMIT values.

That's a different problem though, isn't it? The original thread
contained two problems: (1) signaling the queue too late and (2) workers
stopping too late in the presence of LIMIT if they're not finding any
rows in their portion of the data.

Changing the queue thresholds is a solution for (1) but not for (2). For
(2) we need a mechanism to instruct the parallel workers to stop when we
find that other parallel workers have already produced enough rows to
answer the query.

An alternative mechanism that might work is using some stop_worker
boolean in shared memory that we check in CHECK_FOR_INTERRUPTS().
stop_worker is set to true by the leader as soon as it has collected
enough tuples. But then CHECK_FOR_INTERRUPTS() would have to have access
to the parallel context, which might also be a bit ugly.
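To sketch what I mean (a standalone C model, not backend code; the names are invented and the flag would really live in the DSM segment):

```c
#include <stdatomic.h>
#include <stdbool.h>

/*
 * Toy model of a stop flag in shared memory that workers poll from a
 * CHECK_FOR_INTERRUPTS()-style checkpoint.
 */
static atomic_bool stop_workers;

/* Leader side: called once enough tuples have arrived. */
static void
leader_signal_stop(void)
{
	atomic_store_explicit(&stop_workers, true, memory_order_release);
}

/* Worker side: returns the number of rows scanned before stopping. */
static long
worker_scan(long nrows, long stop_after)
{
	long		scanned = 0;

	for (long i = 0; i < nrows; i++)
	{
		/* stand-in for the CHECK_FOR_INTERRUPTS() call site */
		if (atomic_load_explicit(&stop_workers, memory_order_acquire))
			break;
		scanned++;
		if (scanned == stop_after)	/* simulate the leader reacting */
			leader_signal_stop();
	}
	return scanned;
}
```

The worker bails out on the next checkpoint after the flag is set, regardless of how deep in the scan it currently is.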

--
David Geier

#10Tomas Vondra
tomas@vondra.me
In reply to: David Geier (#9)
Re: Performance issues with parallelism and LIMIT

On 11/14/25 19:20, David Geier wrote:

Hi Tomas!

On 13.11.2025 23:36, Tomas Vondra wrote:

...

Unfortunately, I don't think this patch is the way to go. When I apply
it, I get:

ERROR: InstrEndLoop called on running node
CONTEXT: parallel worker

Ooops. That can likely be fixed.

And I very much doubt inventing a new ad hoc way to signal workers is
the right solution (even if there wasn't the InstrEndLoop issue).

What I think we should do is much simpler - make the threshold in shm_mq
dynamic, start with a very low value and gradually ramp up (up to 1/4).
So we'd have

if (mqh->mqh_consume_pending > threshold)

We might start with

threshold = (mq->mq_ring_size / 1024)

or maybe some fixed value, like

threshold = 128

And on every signal we'd double it, capping it to 1/4 of mq_ring_size.

threshold = Min(threshold * 2, mq->mq_ring_size / 4);

This is very similar to other places doing this gradual ramp up, like in
the prefetching / read_stream, etc. It allows fast termination for low
LIMIT values, but quickly amortizes the cost for high LIMIT values.

That's a different problem though, isn't it? The original thread
contained two problems: (1) signaling the queue too late and (2) workers
stopping too late in the presence of LIMIT if they're not finding any
rows in their portion of the data.

Changing the queue thresholds is a solution for (1) but not for (2). For
(2) we need a mechanism to instruct the parallel workers to stop when we
find that other parallel workers have already produced enough rows to
answer the query.

Good point, I completely forgot about (2).

An alternative mechanism that might work is using some stop_worker
boolean in shared memory that we check in CHECK_FOR_INTERRUPTS().
stop_worker is set to true by the leader as soon as it has collected
enough tuples. But then CHECK_FOR_INTERRUPTS() would have to have access
to the parallel context, which might also be a bit ugly.

Hmmm, yeah. We already do have shared state for the parallel scan. Do
you think we could maybe integrate this into that? So the scan would
just be "finished" for all the workers ...

regards

--
Tomas Vondra

#11David Geier
geidav.pg@gmail.com
In reply to: Tomas Vondra (#8)
Re: Performance issues with parallelism and LIMIT

Hi Tomas!

On 14.11.2025 17:00, Tomas Vondra wrote:

On 11/13/25 23:36, Tomas Vondra wrote:

...

What I think we should do is much simpler - make the threshold in shm_mq
dynamic, start with a very low value and gradually ramp up (up to 1/4).
So we'd have

if (mqh->mqh_consume_pending > threshold)

We might start with

threshold = (mq->mq_ring_size / 1024)

or maybe some fixed value, like

threshold = 128

And on every signal we'd double it, capping it to 1/4 of mq_ring_size.

threshold = Min(threshold * 2, mq->mq_ring_size / 4);

This is very similar to other places doing this gradual ramp up, like in
the prefetching / read_stream, etc. It allows fast termination for low
LIMIT values, but quickly amortizes the cost for high LIMIT values.

I gave this a try today, to see if it can actually solve the regression.
Attached is a WIP patch, and a set of benchmarking scripts. On my ryzen
machine I got this (timings of the queries):

fill dataset | 14 15 16 17 18 patched
-----------------------------------------------------------------
10 random | 64.1 319.3 328.7 340.5 344.3 79.5
sequential | 54.6 323.4 347.5 350.5 399.2 78.3
100 random | 11.8 42.9 42.3 42.3 68.5 18.6
sequential | 10.0 44.3 45.0 44.3 60.6 20.0

Clearly 15 is a significant regression, with timings ~4x higher. And the
patch improves that quite a bit. It's not down all the way back to 14,
there's still ~10ms regression, for some reason.

Also, I didn't measure if this patch causes some other regressions for
other queries. I don't think it does, but maybe there's some weird
corner case affected.

regards

Thanks for working on that. This is certainly an improvement. It doesn't
always work, though. You can still get into the situation where enough
data is waiting in the queues to satisfy the limit but the threshold
hasn't been reached and also won't be reached anymore because no more
rows will match. I'm especially passionate about that case because
currently you can get arbitrarily bad query runtimes with big data sets
and small LIMITs.

As shared previously in this thread, I cannot reproduce any slowdown
when deactivating the late latching. The test used a very narrow row
(single INT) and the data set fit into shared memory. I've only tried
with 8 parallel workers. Could you test if you can reproduce the
slowdown, in case you have a machine with more cores at hand? If we can
somehow reproduce the original problem, I would also like to check if
there aren't other issues at play that can be fixed differently (e.g.
false sharing).

If that optimization is truly necessary, how about always latching if a
LIMIT clause is present? Or in the presence of a LIMIT clause, keeping
the row count of totally produced rows in shared memory and latching in
all workers once the LIMIT has been reached? The overhead of changing
the shared atomic should be negligible for reasonable LIMITs. Another
alternative would be periodically latching. Given that the minimum
runtime of any parallel query is a few dozen milliseconds due to forking
and plan (de-)serialization, we could live with latching only say every
millisecond or so.
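The shared-row-count variant could look roughly like this (a standalone C model, not backend code; in PostgreSQL the counter would presumably be a pg_atomic_uint64 in the DSM segment):

```c
#include <stdatomic.h>
#include <stdbool.h>

/*
 * Toy model of the "shared row count" idea: every worker bumps one
 * counter in shared memory per emitted row and stops once the total
 * across all workers reaches the LIMIT.
 */
static atomic_long rows_produced;

/* Returns true if the calling worker may emit one more row. */
static bool
may_emit_row(long limit)
{
	/* fetch_add returns the value from before the increment */
	return atomic_fetch_add_explicit(&rows_produced, 1,
									 memory_order_relaxed) < limit;
}
```

Note the counter can overshoot the LIMIT by up to one increment per worker, which is harmless here: the leader discards surplus tuples anyway.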

--
David Geier

#12David Geier
geidav.pg@gmail.com
In reply to: Tomas Vondra (#10)
Re: Performance issues with parallelism and LIMIT

Hi Tomas!

On 15.11.2025 00:00, Tomas Vondra wrote:

On 11/14/25 19:20, David Geier wrote:

Ooops. That can likely be fixed.

I'll take a look at why this happens the next days, if you think this
approach generally has a chance to be accepted. See below.

And I very much doubt inventing a new ad hoc way to signal workers is
the right solution (even if there wasn't the InstrEndLoop issue).

Good point, I completely forgot about (2).

In that light, could you take another look at my patch?

Some clarifications: I'm not inventing a new way to signal workers but
I'm using the existing SendProcSignal() machinery to inform parallel
workers to stop. I just added another signal PROCSIG_PARALLEL_STOP and
the corresponding functions to handle it from ProcessInterrupts().

What is "new" is how I'm stopping the parallel workers once they've
received the stop signal: the challenge is that the workers need to
actually jump out of whatever they are doing - even if they aren't
producing any rows at this point; but e.g. are scanning a table
somewhere deep down in ExecScan() / SeqNext().

The only way I can see to make this work, without a huge patch that adds
new code all over the place, is to instruct process termination from
inside ProcessInterrupts(). I'm siglongjmp-ing out of the ExecutorRun()
function so that all parallel worker cleanup code still runs as if the
worker processed to completion. I've tried to end the process without
it, but that caused all sorts of fallout (instrumentation not collected,
postmaster thinking the process stopped unexpectedly, ...).

Instead of siglongjmp-ing we could maybe call some parallel worker
shutdown function but that would require access to the parallel worker
state variables, which are currently not globally accessible.

--
David Geier

#13Tomas Vondra
tomas@vondra.me
In reply to: David Geier (#12)
Re: Performance issues with parallelism and LIMIT

On 11/18/25 15:06, David Geier wrote:

Hi Tomas!

On 15.11.2025 00:00, Tomas Vondra wrote:

On 11/14/25 19:20, David Geier wrote:

Ooops. That can likely be fixed.

I'll take a look at why this happens the next days, if you think this
approach generally has a chance to be accepted. See below.

And I very much doubt inventing a new ad hoc way to signal workers is
the right solution (even if there wasn't the InstrEndLoop issue).

Good point, I completely forgot about (2).

In that light, could you take another look at my patch?

Some clarifications: I'm not inventing a new way to signal workers but
I'm using the existing SendProcSignal() machinery to inform parallel
workers to stop. I just added another signal PROCSIG_PARALLEL_STOP and
the corresponding functions to handle it from ProcessInterrupts().

Sure, but I still don't quite see the need to do all this.

What is "new" is how I'm stopping the parallel workers once they've
received the stop signal: the challenge is that the workers need to
actually jump out of whatever they are doing - even if they aren't
producing any rows at this point; but e.g. are scanning a table
somewhere deep down in ExecScan() / SeqNext().

The only way I can see to make this work, without a huge patch that adds
new code all over the place, is to instruct process termination from
inside ProcessInterrupts(). I'm siglongjmp-ing out of the ExecutorRun()
function so that all parallel worker cleanup code still runs as if the
worker processed to completion. I've tried to end the process without
but that caused all sorts of fallout (instrumentation not collected,
postmaster thinking the process stopped unexpectedly, ...).

Instead of siglongjmp-ing we could maybe call some parallel worker
shutdown function but that would require access to the parallel worker
state variables, which are currently not globally accessible.

But why? The leader and workers already share state - the parallel scan
state (for the parallel-aware scan on the "driving" table). Why couldn't
the leader set a flag in the scan, and force it to end in workers? Which
AFAICS should lead to workers terminating shortly after that.

All the code / handling is already in place. It will need a bit of new
code in the parallel scans, but not much, I think.

regards

--
Tomas Vondra

#14David Geier
geidav.pg@gmail.com
In reply to: Tomas Vondra (#13)
Re: Performance issues with parallelism and LIMIT

Hi Tomas!

On 18.11.2025 15:59, Tomas Vondra wrote:

Some clarifications: I'm not inventing a new way to signal workers but
I'm using the existing SendProcSignal() machinery to inform parallel
workers to stop. I just added another signal PROCSIG_PARALLEL_STOP and
the corresponding functions to handle it from ProcessInterrupts().

Sure, but I still don't quite see the need to do all this.

What is "new" is how I'm stopping the parallel workers once they've
received the stop signal: the challenge is that the workers need to
actually jump out of whatever they are doing - even if they aren't
producing any rows at this point; but e.g. are scanning a table
somewhere deep down in ExecScan() / SeqNext().

The only way I can see to make this work, without a huge patch that adds
new code all over the place, is to instruct process termination from
inside ProcessInterrupts(). I'm siglongjmp-ing out of the ExecutorRun()
function so that all parallel worker cleanup code still runs as if the
worker processed to completion. I've tried to end the process without
it, but that caused all sorts of fallout (instrumentation not collected,
postmaster thinking the process stopped unexpectedly, ...).

Instead of siglongjmp-ing we could maybe call some parallel worker
shutdown function but that would require access to the parallel worker
state variables, which are currently not globally accessible.

But why? The leader and workers already share state - the parallel scan
state (for the parallel-aware scan on the "driving" table). Why couldn't
the leader set a flag in the scan, and force it to end in workers? Which
AFAICS should lead to workers terminating shortly after that.

All the code / handling is already in place. It will need a bit of new
code in the parallel scans, but not much, I think.

But this would only work for the SeqScan case, wouldn't it? The parallel
worker might equally well be executing other code which doesn't produce
tuples, such as parallel index scan, a big sort, building a hash table, etc.

I thought this is not a viable solution because it would need changes in
all these places.

--
David Geier

#15Tomas Vondra
tomas@vondra.me
In reply to: David Geier (#14)
Re: Performance issues with parallelism and LIMIT

On 11/18/25 16:07, David Geier wrote:

Hi Tomas!

On 18.11.2025 15:59, Tomas Vondra wrote:

Some clarifications: I'm not inventing a new way to signal workers but
I'm using the existing SendProcSignal() machinery to inform parallel
workers to stop. I just added another signal PROCSIG_PARALLEL_STOP and
the corresponding functions to handle it from ProcessInterrupts().

Sure, but I still don't quite see the need to do all this.

What is "new" is how I'm stopping the parallel workers once they've
received the stop signal: the challenge is that the workers need to
actually jump out of whatever they are doing - even if they aren't
producing any rows at this point; but e.g. are scanning a table
somewhere deep down in ExecScan() / SeqNext().

The only way I can see to make this work, without a huge patch that adds
new code all over the place, is to instruct process termination from
inside ProcessInterrupts(). I'm siglongjmp-ing out of the ExecutorRun()
function so that all parallel worker cleanup code still runs as if the
worker processed to completion. I've tried to end the process without
it, but that caused all sorts of fallout (instrumentation not collected,
postmaster thinking the process stopped unexpectedly, ...).

Instead of siglongjmp-ing we could maybe call some parallel worker
shutdown function but that would require access to the parallel worker
state variables, which are currently not globally accessible.

But why? The leader and workers already share state - the parallel scan
state (for the parallel-aware scan on the "driving" table). Why couldn't
the leader set a flag in the scan, and force it to end in workers? Which
AFAICS should lead to workers terminating shortly after that.

All the code / handling is already in place. It will need a bit of new
code in the parallel scans, but not much, I think.

But this would only work for the SeqScan case, wouldn't it? The parallel
worker might equally well be executing other code which doesn't produce
tuples, such as parallel index scan, a big sort, building a hash table, etc.

I thought this is not a viable solution because it would need changes in
all these places.

It'd need code in the parallel-aware scans, i.e. seqscan, bitmap, index.
I don't think you'd need code in other plans, because all parallel plans
have one "driving" table.

Maybe that's not enough, not sure. If we want to terminate "immediately"
(and not when getting the next tuple from a scan on the driving table),
we'd need a different solution.

regards

--
Tomas Vondra

#16David Geier
geidav.pg@gmail.com
In reply to: Tomas Vondra (#15)
Re: Performance issues with parallelism and LIMIT

On 18.11.2025 16:40, Tomas Vondra wrote:

But why? The leader and workers already share state - the parallel scan
state (for the parallel-aware scan on the "driving" table). Why couldn't
the leader set a flag in the scan, and force it to end in workers? Which
AFAICS should lead to workers terminating shortly after that.

All the code / handling is already in place. It will need a bit of new
code in the parallel scans, but not much I think.

But this would only work for the SeqScan case, wouldn't it? The parallel
worker might equally well be executing other code which doesn't produce
tuples, such as parallel index scan, a big sort, building a hash table, etc.

I thought this is not a viable solution because it would need changes in
all these places.

It'd need code in the parallel-aware scans, i.e. seqscan, bitmap, index.
I don't think you'd need code in other plans, because all parallel plans
have one "driving" table.

Maybe that's not enough, not sure. If we want to terminate "immediately"
(and not when getting the next tuple from a scan on the driving table),
we'd need a different solution.

A sort node for example makes this no longer work. As soon as the sort
node pulled all rows from its driving table, the sort node becomes the
driving table for its parent nodes. If no more tables are involved in
the plan from that point on, early termination no longer works.

--
David Geier

#17Tom Lane
tgl@sss.pgh.pa.us
In reply to: David Geier (#16)
Re: Performance issues with parallelism and LIMIT

David Geier <geidav.pg@gmail.com> writes:

On 18.11.2025 16:40, Tomas Vondra wrote:

It'd need code in the parallel-aware scans, i.e. seqscan, bitmap, index.
I don't think you'd need code in other plans, because all parallel plans
have one "driving" table.

A sort node for example makes this no longer work. As soon as the sort
node pulled all rows from its driving table, the sort node becomes the
driving table for its parent nodes. If no more tables are involved in
the plan from that point on, early termination no longer works.

You're assuming that the planner will insert Gather nodes at arbitrary
places in the plan, which isn't true. If it does generate plans that
are problematic from this standpoint, maybe the answer is "don't
parallelize in exactly that way".

regards, tom lane

#18Tomas Vondra
tomas@vondra.me
In reply to: Tom Lane (#17)
Re: Performance issues with parallelism and LIMIT

On 11/18/25 17:51, Tom Lane wrote:

David Geier <geidav.pg@gmail.com> writes:

On 18.11.2025 16:40, Tomas Vondra wrote:

It'd need code in the parallel-aware scans, i.e. seqscan, bitmap, index.
I don't think you'd need code in other plans, because all parallel plans
have one "driving" table.

A sort node for example makes this no longer work. As soon as the sort
node pulled all rows from its driving table, the sort node becomes the
driving table for its parent nodes. If no more tables are involved in
the plan from that point on, early termination no longer works.

You're assuming that the planner will insert Gather nodes at arbitrary
places in the plan, which isn't true. If it does generate plans that
are problematic from this standpoint, maybe the answer is "don't
parallelize in exactly that way".

I think David has a point that nodes that "buffer" tuples (like Sort or
HashAgg) would break the approach making this the responsibility of the
parallel-aware scan. I don't see anything particularly wrong with such
plans - plans with partial aggregation often look like that.

Maybe this should be the responsibility of execProcnode.c, not the
various nodes?

It'd be nice to show this in EXPLAIN (that some of the workers were
terminated early, before processing all the data).

regards

--
Tomas Vondra

#19David Geier
geidav.pg@gmail.com
In reply to: Tomas Vondra (#18)
Re: Performance issues with parallelism and LIMIT

On 18.11.2025 18:31, Tomas Vondra wrote:

On 11/18/25 17:51, Tom Lane wrote:

David Geier <geidav.pg@gmail.com> writes:

On 18.11.2025 16:40, Tomas Vondra wrote:

It'd need code in the parallel-aware scans, i.e. seqscan, bitmap, index.
I don't think you'd need code in other plans, because all parallel plans
have one "driving" table.

You're assuming that the planner will insert Gather nodes at arbitrary
places in the plan, which isn't true. If it does generate plans that
are problematic from this standpoint, maybe the answer is "don't
parallelize in exactly that way".

I think David has a point that nodes that "buffer" tuples (like Sort or
HashAgg) would break the approach making this the responsibility of the
parallel-aware scan. I don't see anything particularly wrong with such
plans - plans with partial aggregation often look like that.

Maybe this should be the responsibility of execProcnode.c, not the
various nodes?

I like that idea, even though it would still not work while a node is
doing the crunching. That is after it has pulled all rows and before it
can return the first row. During this time the node won't call
ExecProcNode().

But that seems like an acceptable limitation. At least it keeps working
above "buffer" nodes.

I'll give this idea a try. Then we can contrast this approach with the
approach in my initial patch.
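As a toy model of the execProcnode.c idea (hypothetical names, greatly simplified): a single check in the central dispatch covers every node that is actively returning tuples, without touching the individual node implementations.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Worker-local stop flag, standing in for "leader told us to stop". */
static bool worker_stop_requested = false;

typedef struct NodeModel
{
    int remaining;      /* tuples this toy node would still produce */
} NodeModel;

static int *
node_next(NodeModel *node, int *tuple)
{
    if (node->remaining <= 0)
        return NULL;
    *tuple = node->remaining--;
    return tuple;
}

/* Central dispatch, standing in for ExecProcNode(). */
static int *
exec_proc_node(NodeModel *node, int *tuple)
{
    if (worker_stop_requested)
        return NULL;            /* behave as if the plan were exhausted */
    return node_next(node, tuple);
}
```

Once the flag is set, every subsequent tuple request through the dispatch returns NULL, so the plan winds down as if it had run out of rows.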

It'd be nice to show this in EXPLAIN (that some of the workers were
terminated early, before processing all the data).

Inspectability on that end seems useful. Maybe only with VERBOSE,
similarly to the extended per-worker information.

--
David Geier

#20Tomas Vondra
tomas@vondra.me
In reply to: David Geier (#19)
Re: Performance issues with parallelism and LIMIT

On 11/18/25 19:35, David Geier wrote:

On 18.11.2025 18:31, Tomas Vondra wrote:

On 11/18/25 17:51, Tom Lane wrote:

David Geier <geidav.pg@gmail.com> writes:

On 18.11.2025 16:40, Tomas Vondra wrote:

It'd need code in the parallel-aware scans, i.e. seqscan, bitmap, index.
I don't think you'd need code in other plans, because all parallel plans
have one "driving" table.

You're assuming that the planner will insert Gather nodes at arbitrary
places in the plan, which isn't true. If it does generate plans that
are problematic from this standpoint, maybe the answer is "don't
parallelize in exactly that way".

I think David has a point that nodes that "buffer" tuples (like Sort or
HashAgg) would break the approach making this the responsibility of the
parallel-aware scan. I don't see anything particularly wrong with such
plans - plans with partial aggregation often look like that.

Maybe this should be the responsibility of execProcnode.c, not the
various nodes?

I like that idea, even though it would still not work while a node is
doing the crunching. That is after it has pulled all rows and before it
can return the first row. During this time the node won't call
ExecProcNode().

True. Perhaps we could provide a function nodes could call in suitable
places to check whether to end?

Actually, how does canceling queries with parallel workers work? Is that
done similarly to what your patch did?

But that seems like an acceptable limitation. At least it keeps working
above "buffer" nodes.

I'll give this idea a try. Then we can contrast this approach with the
approach in my initial patch.

It'd be nice to show this in EXPLAIN (that some of the workers were
terminated early, before processing all the data).

Inspectability on that end seems useful. Maybe only with VERBOSE,
similarly to the extended per-worker information.

Maybe, no opinion. But it probably needs to apply to all nodes in the
parallel worker, right? Or maybe it's even a per-worker detail.

regards

--
Tomas Vondra

#21David Geier
geidav.pg@gmail.com
In reply to: Tomas Vondra (#20)
Re: Performance issues with parallelism and LIMIT

On 18.11.2025 20:37, Tomas Vondra wrote:

On 11/18/25 19:35, David Geier wrote:

On 18.11.2025 18:31, Tomas Vondra wrote:

On 11/18/25 17:51, Tom Lane wrote:

David Geier <geidav.pg@gmail.com> writes:

On 18.11.2025 16:40, Tomas Vondra wrote:

It'd need code in the parallel-aware scans, i.e. seqscan, bitmap, index.
I don't think you'd need code in other plans, because all parallel plans
have one "driving" table.

You're assuming that the planner will insert Gather nodes at arbitrary
places in the plan, which isn't true. If it does generate plans that
are problematic from this standpoint, maybe the answer is "don't
parallelize in exactly that way".

I think David has a point that nodes that "buffer" tuples (like Sort or
HashAgg) would break the approach making this the responsibility of the
parallel-aware scan. I don't see anything particularly wrong with such
plans - plans with partial aggregation often look like that.

Maybe this should be the responsibility of execProcnode.c, not the
various nodes?

I hadn't realized that this approach has the same limitation:
ExecProcNode() is only called when e.g. heap_getnextslot() or
index_getnext_slot() have found a qualifying tuple. Otherwise, they just
keep crunching without returning.

I like that idea, even though it would still not work while a node is
doing the crunching. That is after it has pulled all rows and before it
can return the first row. During this time the node won't call
ExecProcNode().

True. Perhaps we could provide a function nodes could call in suitable
places to check whether to end?

This function would then also be required by the base relation scans.
And we would have to call it more or less in the same places
CHECK_FOR_INTERRUPTS() is called today.

Beyond that, functions such as heap_getnextslot() or index_getnext_slot() don't
have access to the PlanState which would contain the stop flag. So that
would have to be propagated downwards as well.

All of that would make for a fairly big patch, which the initial patch
avoids.

Actually, how does canceling queries with parallel workers work? Is that
done similarly to what your patch did?

Parallel workers use the same mechanism as normal backends, except that
parallel workers quit instead of waiting for the next query.

The flow is as follows: parallel workers catch SIGINT via
StatementCancelHandler() which sets QueryCancelPending = true. When
ProcessInterrupts() is called the next time, it will elog(ERROR) out.
BackgroundWorkerMain() will catch the error and proc_exit().

This mechanism is very similar to what I have in my patch, with the
difference that (1) I use SendProcSignal() to inform the workers to stop
and (2) that I added another sigsetjmp target around ExecutorRun() to be
able to bail but still call all the shutdown code.

(1) is necessary to be able to distinguish between query cancel and
early stopping but not cancelling.

(2) is necessary because the parallel shutdown code needs to be called
before exiting. I tried to piggy back on the existing error handling
mechanism via siglongjmp(*PG_exception_stack, 1) and then not calling
EmitErrorReport() if got_stopped == true. That gave me the following error:

ERROR: lost connection to parallel worker

Inspectability on that end seems useful. Maybe only with VERBOSE,
similarly to the extended per-worker information.

Maybe, no opinion. But it probably needs to apply to all nodes in the
parallel worker, right? Or maybe it's even a per-worker detail.

I thought to make it a per-worker detail such as time or number of rows
returned. Let's discuss this again, once we've settled on a solution.

--
David Geier

#22Tomas Vondra
tomas@vondra.me
In reply to: David Geier (#21)
Re: Performance issues with parallelism and LIMIT

On 11/19/25 13:28, David Geier wrote:

On 18.11.2025 20:37, Tomas Vondra wrote:

On 11/18/25 19:35, David Geier wrote:

On 18.11.2025 18:31, Tomas Vondra wrote:

On 11/18/25 17:51, Tom Lane wrote:

David Geier <geidav.pg@gmail.com> writes:

On 18.11.2025 16:40, Tomas Vondra wrote:

It'd need code in the parallel-aware scans, i.e. seqscan, bitmap, index.
I don't think you'd need code in other plans, because all parallel plans
have one "driving" table.

You're assuming that the planner will insert Gather nodes at arbitrary
places in the plan, which isn't true. If it does generate plans that
are problematic from this standpoint, maybe the answer is "don't
parallelize in exactly that way".

I think David has a point that nodes that "buffer" tuples (like Sort or
HashAgg) would break the approach making this the responsibility of the
parallel-aware scan. I don't see anything particularly wrong with such
plans - plans with partial aggregation often look like that.

Maybe this should be the responsibility of execProcnode.c, not the
various nodes?

I hadn't realized that this approach has the same limitation:
ExecProcNode() is only called when e.g. heap_getnextslot() or
index_getnext_slot() have found a qualifying tuple. Otherwise, they just
keep crunching without returning.

Right, that's why I suggested to have a function the nodes would call in
suitable places.

I like that idea, even though it would still not work while a node is
doing the crunching. That is after it has pulled all rows and before it
can return the first row. During this time the node won't call
ExecProcNode().

True. Perhaps we could provide a function nodes could call in suitable
places to check whether to end?

This function would then also be required by the base relation scans.
And we would have to call it more or less in the same places
CHECK_FOR_INTERRUPTS() is called today.

Yes, but I don't think CHECK_FOR_INTERRUPTS() would be a good place to
manipulate the executor state. Maybe you could do some magic with
siglongjmp(), but I have "funny" feeling about that - I wouldn't be
surprised if that interfered with elog(), which is the only other place
using siglongjmp() AFAICS.

Which is why I suggested maybe it should be handled in execProcnode
(which would take care of cases where we produce a tuple), and then let
nodes to optionally check the flag too (through a new function).

I haven't tried doing this, so maybe I'm missing something ...

Beyond that, functions such as heap_getnextslot() or index_getnext_slot() don't
have access to the PlanState which would contain the stop flag. So that
would have to be propagated downwards as well.

All of that would make for a fairly big patch, which the initial patch
avoids.

Right. I don't think we can set the flag in plan/executor state, because
that's not available in signal handler / ProcessInterrupts() ... It'd
need to be a global variable, I guess.

Actually, how does canceling queries with parallel workers work? Is that
done similarly to what your patch did?

Parallel workers use the same mechanism as normal backends, except that
parallel workers quit instead of waiting for the next query.

The flow is as follows: parallel workers catch SIGINT via
StatementCancelHandler() which sets QueryCancelPending = true. When
ProcessInterrupts() is called the next time, it will elog(ERROR) out.
BackgroundWorkerMain() will catch the error and proc_exit().

This mechanism is very similar to what I have in my patch, with the
difference that (1) I use SendProcSignal() to inform the workers to stop
and (2) that I added another sigsetjmp target around ExecutorRun() to be
able to bail but still call all the shutdown code.

OK

(1) is necessary to be able to distinguish between query cancel and
early stopping but not cancelling.

(2) is necessary because the parallel shutdown code needs to be called
before exiting. I tried to piggy back on the existing error handling
mechanism via siglongjmp(*PG_exception_stack, 1) and then not calling
EmitErrorReport() if got_stopped == true. That gave me the following error:

ERROR: lost connection to parallel worker

Not sure. I have my doubts about using siglongjmp() for this.

Inspectability on that end seems useful. Maybe only with VERBOSE,
similarly to the extended per-worker information.

Maybe, no opinion. But it probably needs to apply to all nodes in the
parallel worker, right? Or maybe it's even a per-worker detail.

I thought to make it a per-worker detail such as time or number of rows
returned. Let's discuss this again, once we've settled on a solution.

OK

regards

--
Tomas Vondra

#23David Geier
geidav.pg@gmail.com
In reply to: Tomas Vondra (#22)
1 attachment(s)
Re: Performance issues with parallelism and LIMIT

On 19.11.2025 21:03, Tomas Vondra wrote:

Right, that's why I suggested to have a function the nodes would call in
suitable places.

I like that idea, even though it would still not work while a node is
doing the crunching. That is after it has pulled all rows and before it
can return the first row. During this time the node won't call
ExecProcNode().

True. Perhaps we could provide a function nodes could call in suitable
places to check whether to end?

This function would then also be required by the base relation scans.
And we would have to call it more or less in the same places
CHECK_FOR_INTERRUPTS() is called today.

Yes, but I don't think CHECK_FOR_INTERRUPTS() would be a good place to
manipulate the executor state. Maybe you could do some magic with
siglongjmp(), but I have "funny" feeling about that - I wouldn't be
surprised if that interfered with elog(), which is the only other place
using siglongjmp() AFAICS.

You had the right intuition. siglongjmp-ing out leaves the per-node
instrumentation state and CurrentMemoryContext in an unexpected state.

Example: jumping out of the executor, after we've called
InstrStartNode(), but before we call InstrStopNode(). Subsequently
calling InstrEndLoop() will give the error you encountered. A similar
problem exists for CurrentMemoryContext which is not properly reset.

I didn't encounter these issues during my testing because they're
largely timing dependent. Execution can end before the other workers
have started executing. So the stopping logic didn't kick in.

Both issues can be accounted for when jumping out but this seems
somewhat hacky.

Which is why I suggested maybe it should be handled in execProcnode
(which would take care of cases where we produce a tuple), and then let
nodes to optionally check the flag too (through a new function).

I haven't tried doing this, so maybe I'm missing something ...

Beyond that, code such as heap_nextslot() or index_getnext_slot() don't
have access to the PlanState which would contain the stop flag. So that
would have to be propagated downwards as well.

All of that would make for a fairly big patch, which the initial patch
avoids.

This turned out to be false. See below.

Right. I don't think we can set the flag in plan/executor state, because
that's not available in signal handler / ProcessInterrupts() ... It'd
need to be a global variable, I guess.

What we can do is use a global variable. That also makes checking the
flag a lot easier because we don't need to pass it around through
multiple abstraction layers.

What needs to be taken care of though is to only bail from scans that
are actually initiated from plan nodes. There are many places in the
code that use e.g. the table AM API directly. We don't want to bail from
these scans. Without flagging if a scan should bail or not, e.g.
ScanPgRelation() will return no tuple and therefore relcache code starts
failing.

The new patch accounts for that by introducing a new TableScanDescData
flag SO_OBEY_PARALLEL_WORKER_STOP, which indicates if the scan should
adhere to the parallel worker stop or not.

Stopping is broadcast to all parallel workers via SendProcSignal().
The stop variable is checked with the new
IS_PARALLEL_WORKER_STOP() macro.

In the PoC implementation I've for now only changed nodeSeqscan.c. If
there's agreement to pursue this approach, I'll change the other places
as well. Naming can also likely be still improved.

--
David Geier

Attachments:

v2-0001-Parallel-workers-stop-quicker.patch (text/x-patch)
From d056801ec55eaf95829f9f09e1675a47e8d57b5b Mon Sep 17 00:00:00 2001
From: David Geier <geidav.pg@gmail.com>
Date: Tue, 2 Sep 2025 13:24:45 +0200
Subject: [PATCH] Parallel workers stop quicker

In the presence of a LIMIT N clause, the executor stops as soon as it got
enough rows and shuts down the plan. In the serial case the query ends
immediately. If the query happens to be parallel, the workers only
exit if they produced N rows, regardless of how many rows got already
produced by other participants.

Worst-case example: a query has a LIMIT 1 clause and scans a large table
where only a single row qualifies. The first worker that finds the
matching row will return that row and terminate. All other workers will
keep running until the table is scanned to completion.

This change signals all workers to stop execution once the leader got
enough rows.
---
 src/backend/access/heap/heapam.c     |  7 ++++++
 src/backend/executor/execParallel.c  | 23 +++++++++++++++---
 src/backend/executor/nodeGather.c    | 36 ++++++++++++++++++++++++++++
 src/backend/executor/nodeSeqscan.c   |  2 ++
 src/backend/storage/ipc/procsignal.c |  4 ++++
 src/backend/tcop/postgres.c          |  4 ++++
 src/include/access/tableam.h         |  3 +++
 src/include/executor/execParallel.h  |  8 +++++++
 src/include/nodes/execnodes.h        |  1 +
 src/include/storage/procsignal.h     |  1 +
 10 files changed, 86 insertions(+), 3 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 4b0c49f4bb0..b3bb17c7aad 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -39,6 +39,7 @@
 #include "access/syncscan.h"
 #include "access/valid.h"
 #include "access/visibilitymap.h"
+#include "executor/execParallel.h"
 #include "access/xloginsert.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_database_d.h"
@@ -917,6 +918,9 @@ heapgettup(HeapScanDesc scan,
 	 */
 	while (true)
 	{
+		if ((scan->rs_base.rs_flags & SO_OBEY_PARALLEL_WORKER_STOP) != 0 && IS_PARALLEL_WORKER_STOP())
+			break;
+
 		heap_fetch_next_buffer(scan, dir);
 
 		/* did we run out of blocks to scan? */
@@ -1034,6 +1038,9 @@ heapgettup_pagemode(HeapScanDesc scan,
 	 */
 	while (true)
 	{
+		if ((scan->rs_base.rs_flags & SO_OBEY_PARALLEL_WORKER_STOP) != 0 && IS_PARALLEL_WORKER_STOP())
+			break;
+
 		heap_fetch_next_buffer(scan, dir);
 
 		/* did we run out of blocks to scan? */
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index f098a5557cf..5e20e33efcf 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -1410,6 +1410,23 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
 								 pwcxt);
 }
 
+volatile sig_atomic_t	ParallelStopPending = false;
+volatile bool			parallel_worker_stop = false;
+
+void HandleParallelStop(void)
+{
+	InterruptPending = true;
+	ParallelStopPending = true;
+	SetLatch(MyLatch);	
+}
+
+void ProcessParallelStop(void)
+{
+	ParallelStopPending = false;
+	parallel_worker_stop = true;
+}
+
+
 /*
  * Main entrypoint for parallel query worker processes.
  *
@@ -1493,9 +1510,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	InstrStartParallelQuery();
 
 	/*
-	 * Run the plan.  If we specified a tuple bound, be careful not to demand
-	 * more tuples than that.
-	 */
+	* Run the plan.  If we specified a tuple bound, be careful not to demand
+	* more tuples than that.
+	*/
 	ExecutorRun(queryDesc,
 				ForwardScanDirection,
 				fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index dc7d1830259..31745e86ced 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -36,6 +36,7 @@
 #include "executor/tqueue.h"
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
+#include "storage/procsignal.h"
 #include "utils/wait_event.h"
 
 
@@ -71,6 +72,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	gatherstate->need_to_scan_locally =
 		!node->single_copy && parallel_leader_participation;
 	gatherstate->tuples_needed = -1;
+	gatherstate->tuples_produced = 0;
 
 	/*
 	 * Miscellaneous initialization
@@ -126,6 +128,36 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	return gatherstate;
 }
 
+/* ----------------------------------------------------------------
+ *		Workers only stop when they themselves reach the LIMIT.
+ * 		They don't stop if other workers in total produced already
+ * 		enough rows to reach the LIMIT. Hence, we need to stop them
+ * 		explicitly.
+ * ----------------------------------------------------------------
+ */
+static void
+StopWorkersIfLimitReached(GatherState *node)
+{
+	if (node->tuples_needed != -1 && node->tuples_produced == node->tuples_needed)
+	{
+		if (node->pei != NULL)
+		{
+			ParallelContext *pcxt = node->pei->pcxt;
+			int i;
+
+			if (pcxt->worker != NULL)
+			{
+				for (i = 0; i < pcxt->nworkers_launched; ++i)
+				{
+					pid_t pid;
+					GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
+					SendProcSignal(pid, PROCSIG_PARALLEL_STOP, INVALID_PROC_NUMBER);
+				}
+			}
+		}
+	}
+}
+
 /* ----------------------------------------------------------------
  *		ExecGather(node)
  *
@@ -212,6 +244,7 @@ ExecGather(PlanState *pstate)
 		/* Run plan locally if no workers or enabled and not single-copy. */
 		node->need_to_scan_locally = (node->nreaders == 0)
 			|| (!gather->single_copy && parallel_leader_participation);
+		node->tuples_produced = 0;
 		node->initialized = true;
 	}
 
@@ -230,6 +263,9 @@ ExecGather(PlanState *pstate)
 	if (TupIsNull(slot))
 		return NULL;
 
+	node->tuples_produced++;
+	StopWorkersIfLimitReached(node);
+
 	/* If no projection is required, we're done. */
 	if (node->ps.ps_ProjInfo == NULL)
 		return slot;
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 94047d29430..cb31ee8bbd3 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -375,6 +375,7 @@ ExecSeqScanInitializeDSM(SeqScanState *node,
 	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
 	node->ss.ss_currentScanDesc =
 		table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
+	node->ss.ss_currentScanDesc->rs_flags |= SO_OBEY_PARALLEL_WORKER_STOP;
 }
 
 /* ----------------------------------------------------------------
@@ -408,4 +409,5 @@ ExecSeqScanInitializeWorker(SeqScanState *node,
 	pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
 	node->ss.ss_currentScanDesc =
 		table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
+	node->ss.ss_currentScanDesc->rs_flags |= SO_OBEY_PARALLEL_WORKER_STOP;
 }
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 087821311cc..8f99ecebe2f 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -19,6 +19,7 @@
 
 #include "access/parallel.h"
 #include "commands/async.h"
+#include "executor/execParallel.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "port/pg_bitutils.h"
@@ -694,6 +695,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_PARALLEL_APPLY_MESSAGE))
 		HandleParallelApplyMessageInterrupt();
 
+	if (CheckProcSignal(PROCSIG_PARALLEL_STOP))
+		HandleParallelStop();
+
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE))
 		HandleRecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE);
 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 7dd75a490aa..bd74d381d67 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -40,6 +40,7 @@
 #include "commands/explain_state.h"
 #include "commands/prepare.h"
 #include "common/pg_prng.h"
+#include "executor/execParallel.h"
 #include "jit/jit.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
@@ -3541,6 +3542,9 @@ ProcessInterrupts(void)
 
 	if (ParallelApplyMessagePending)
 		ProcessParallelApplyMessages();
+
+	if (ParallelStopPending)
+		ProcessParallelStop();
 }
 
 /*
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index e16bf025692..239852627fc 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -63,6 +63,9 @@ typedef enum ScanOptions
 
 	/* unregister snapshot at scan end? */
 	SO_TEMP_SNAPSHOT = 1 << 9,
+
+	/* Bail out from this scan if parallel bailing activated */
+	SO_OBEY_PARALLEL_WORKER_STOP = 1 << 10,
 }			ScanOptions;
 
 /*
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 5e7106c397a..45d86b804f0 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -37,6 +37,14 @@ typedef struct ParallelExecutorInfo
 	struct TupleQueueReader **reader;	/* tuple reader/writer support */
 } ParallelExecutorInfo;
 
+extern PGDLLIMPORT volatile sig_atomic_t ParallelStopPending;
+extern PGDLLIMPORT volatile bool		 parallel_worker_stop;
+
+#define IS_PARALLEL_WORKER_STOP() 		(parallel_worker_stop)
+
+extern void HandleParallelStop(void);
+extern void ProcessParallelStop(void);
+
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
 												  EState *estate, Bitmapset *sendParams, int nworkers,
 												  int64 tuples_needed);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 18ae8f0d4bb..9315fa6c942 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2743,6 +2743,7 @@ typedef struct GatherState
 	bool		initialized;	/* workers launched? */
 	bool		need_to_scan_locally;	/* need to read from local plan? */
 	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
+	int64 		tuples_produced;	/* tuples already produced */
 	/* these fields are set up once: */
 	TupleTableSlot *funnel_slot;
 	struct ParallelExecutorInfo *pei;
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index afeeb1ca019..f7f4ee85154 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -36,6 +36,7 @@ typedef enum
 	PROCSIG_BARRIER,			/* global barrier interrupt  */
 	PROCSIG_LOG_MEMORY_CONTEXT, /* ask backend to log the memory contexts */
 	PROCSIG_PARALLEL_APPLY_MESSAGE, /* Message from parallel apply workers */
+	PROCSIG_PARALLEL_STOP, /* Instruct parallel worker to stop */
 
 	/* Recovery conflict reasons */
 	PROCSIG_RECOVERY_CONFLICT_FIRST,
-- 
2.51.0

#24David Geier
geidav.pg@gmail.com
In reply to: David Geier (#23)
Re: Performance issues with parallelism and LIMIT

On 26.11.2025 09:15, David Geier wrote:

What we can do is use a global variable. That also makes checking the
flag a lot easier because we don't need to pass it around through
multiple abstraction layers.

What needs to be taken care of though is to only bail from scans that
are actually initiated from plan nodes. There are many places in the
code that use e.g. the table AM API directly. We don't want to bail from
these scans. Without flagging if a scan should bail or not, e.g.
ScanPgRelation() will return no tuple and therefore relcache code starts
failing.

The new patch accounts for that by introducing a new TableScanDescData
flag SO_OBEY_PARALLEL_WORKER_STOP, which indicates if the scan should
adhere to the parallel worker stop or not.

Stopping is broadcast to all parallel workers via SendProcSignal().
The stop variable is checked with the new
IS_PARALLEL_WORKER_STOP() macro.

In the PoC implementation I've for now only changed nodeSeqscan.c. If
there's agreement to pursue this approach, I'll change the other places
as well. Naming can also likely be still improved.

I missed attaching the example I used for testing.

CREATE TABLE bar (col INT);
INSERT INTO bar SELECT generate_series(1, 50000000);
ANALYZE bar;
SET parallel_leader_participation = OFF;
SET synchronize_seqscans = OFF;
EXPLAIN ANALYZE VERBOSE SELECT col FROM bar WHERE col = 1 LIMIT 1;

I disabled synchronize_seqscans to make the test deterministic. I
disabled parallel_leader_participation to make sure one of the workers
finds the first row and quits.

Running with parallel_leader_participation enabled revealed one more
issue: if the leader finds the row before the parallel workers have
started up, the stop signal is lost and the workers don't stop early.

Instead of SendProcSignal() we can store a flag in shared memory that
indicates that the leader already has enough rows. I'll give this
approach a try.
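The race and the proposed fix can be contrasted in toy form (hypothetical names): a signal only reaches workers that already exist, while a flag in shared memory is seen even by workers that start later.

```c
#include <assert.h>
#include <stdbool.h>

typedef struct WorkerModel { bool started; bool got_signal; } WorkerModel;
typedef struct ShmModel    { bool enough_rows; } ShmModel;

/* SendProcSignal()-style broadcast: misses not-yet-started workers. */
static void
broadcast_stop_signal(WorkerModel *workers, int n)
{
    for (int i = 0; i < n; i++)
        if (workers[i].started)
            workers[i].got_signal = true;
}

/* With the shared-memory flag, a worker checks both at startup and
 * periodically while scanning, so a late starter cannot miss the stop. */
static bool
worker_should_stop(const WorkerModel *w, const ShmModel *shm)
{
    return w->got_signal || shm->enough_rows;
}
```

In the signal-only scheme the second worker below never learns about the stop; with the shared flag it picks it up as soon as it looks.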

--
David Geier

#25Tomas Vondra
tomas@vondra.me
In reply to: David Geier (#23)
Re: Performance issues with parallelism and LIMIT

On 11/26/25 09:15, David Geier wrote:

On 19.11.2025 21:03, Tomas Vondra wrote:

Right, that's why I suggested to have a function the nodes would call in
suitable places.

I like that idea, even though it would still not work while a node is
doing the crunching. That is after it has pulled all rows and before it
can return the first row. During this time the node won't call
ExecProcNode().

True. Perhaps we could provide a function nodes could call in suitable
places to check whether to end?

This function would then also be required by the base relation scans.
And we would have to call it more or less in the same places
CHECK_FOR_INTERRUPTS() is called today.

Yes, but I don't think CHECK_FOR_INTERRUPTS() would be a good place to
manipulate the executor state. Maybe you could do some magic with
siglongjmp(), but I have "funny" feeling about that - I wouldn't be
surprised if that interfered with elog(), which is the only other place
using siglongjmp() AFAICS.

You had the right intuition. siglongjmp-ing out leaves the per-node
instrumentation state and CurrentMemoryContext in an unexpected state.

Example: jumping out of the executor after we've called
InstrStartNode() but before we've called InstrStopNode(). Subsequently
calling InstrEndLoop() will give the error you encountered. A similar
problem exists for CurrentMemoryContext, which is not properly reset.

I didn't encounter these issues during my testing because they're
largely timing-dependent: execution can end before the other workers
have started executing, so the stopping logic didn't kick in.

Both issues can be accounted for when jumping out, but this seems
somewhat hacky.

The question is if these are the only two such issues possible, and I'm
afraid the answer is "no" :-(

The question is if "exiting" from any place calling CFI leaves the
execution state in a valid state. Valid enough so that we can call
ExecEndNode() on all the nodes, including the one from which we exited.
But I don't think we can rely on that. A node can do multiple steps,
interleaved with CFI, not expecting that only some of them happen. I
assume this would cause a lot of issues ...

Which is why I suggested maybe it should be handled in execProcnode
(which would take care of cases where we produce a tuple), and then let
nodes optionally check the flag too (through a new function).

I haven't tried doing this, so maybe I'm missing something ...

Beyond that, code such as heap_getnextslot() or index_getnext_slot()
doesn't have access to the PlanState which would contain the stop flag,
so that would have to be propagated downwards as well.

All of that would make for a fairly big patch, which the initial patch
avoids.

This turned out to be false. See below.

Right. I don't think we can set the flag in plan/executor state, because
that's not available in signal handler / ProcessInterrupts() ... It'd
need to be a global variable, I guess.

What we can do is use a global variable. That also makes checking the
flag a lot easier because we don't need to pass it around through
multiple abstraction layers.

What needs to be taken care of though is to only bail from scans that
are actually initiated from plan nodes. There are many places in the
code that use e.g. the table AM API directly. We don't want to bail from
these scans. Without a flag indicating whether a scan should bail, e.g.
ScanPgRelation() would return no tuple and the relcache code would start
failing.

The new patch accounts for that by introducing a new TableScanDescData
flag, SO_OBEY_PARALLEL_WORKER_STOP, which indicates whether the scan
should honor the parallel worker stop.

Stopping is broadcast to all parallel workers via SendProcSignal().
The stop variable is checked with the new
CHECK_FOR_PARALLEL_WORKER_STOP() macro.

In the PoC implementation I've only changed nodeSeqScan.c for now. If
there's agreement to pursue this approach, I'll change the other places
as well. Naming can likely still be improved.

This assumes we need to "exit" only from a heapam scan. That's true for
the example, but is that enough in general? What if the worker has
already finished its plan, and is now busy doing something else
expensive? Could be a big sort, aggregation, ... Can we do something
about these cases too?

regards

--
Tomas Vondra