From 9553f1ec5885c9e2a22a0eafa4f4b489f13d22ae Mon Sep 17 00:00:00 2001
From: David Geier <geidav.pg@gmail.com>
Date: Tue, 2 Sep 2025 13:24:45 +0200
Subject: [PATCH] Parallel workers stop quicker

---
 src/backend/executor/execParallel.c  | 51 ++++++++++++++++++++++++----
 src/backend/executor/nodeGather.c    | 36 ++++++++++++++++++++
 src/backend/storage/ipc/procsignal.c |  4 +++
 src/backend/tcop/postgres.c          |  4 +++
 src/include/executor/execParallel.h  |  5 +++
 src/include/nodes/execnodes.h        |  1 +
 src/include/storage/procsignal.h     |  1 +
 7 files changed, 95 insertions(+), 7 deletions(-)

diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index f098a5557cf..3b0993fd9d6 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -1410,6 +1410,32 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
 								 pwcxt);
 }
 
+volatile sig_atomic_t	ParallelStopPending = false;	/* stop requested by leader */
+sigjmp_buf *			parallel_sigjmp_buf = NULL;	/* set only while plan runs */
+volatile bool			got_stopped = false;	/* true once execution was cut short */
+
+/* Signal-handler part: flag the stop request and wake the worker. */
+void HandleParallelStop(void)
+{
+	InterruptPending = true;
+	ParallelStopPending = true;
+	SetLatch(MyLatch);
+}
+
+/* Called from ProcessInterrupts() to act on a pending stop request. */
+void ProcessParallelStop(void)
+{
+	ParallelStopPending = false;
+	got_stopped = true;
+
+	/*
+	 * Only siglongjmp while the plan is actually running; otherwise we could
+	 * jump through a stale buffer before or after ExecutorRun().
+	 */
+	if (parallel_sigjmp_buf != NULL)
+		siglongjmp(*parallel_sigjmp_buf, 1);
+}
+
 /*
  * Main entrypoint for parallel query worker processes.
  *
@@ -1440,6 +1466,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	void	   *area_space;
 	dsa_area   *area;
 	ParallelWorkerContext pwcxt;
+	sigjmp_buf	local_sigjmp_buf;
 
 	/* Get fixed-size state. */
 	fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
@@ -1492,13 +1519,23 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	 */
 	InstrStartParallelQuery();
 
-	/*
-	 * Run the plan.  If we specified a tuple bound, be careful not to demand
-	 * more tuples than that.
-	 */
-	ExecutorRun(queryDesc,
-				ForwardScanDirection,
-				fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
+	if (!got_stopped)
+	{
+		if (sigsetjmp(local_sigjmp_buf, 1) == 0)
+		{
+			parallel_sigjmp_buf = &local_sigjmp_buf;
+
+			/*
+			 * Run the plan.  If we specified a tuple bound, be careful not to
+			 * demand more tuples than that.
+			 */
+			ExecutorRun(queryDesc,
+						ForwardScanDirection,
+						fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
+		}
+	}
+
+	parallel_sigjmp_buf = NULL;
 
 	/* Shut down the executor */
 	ExecutorFinish(queryDesc);
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index dc7d1830259..31745e86ced 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -36,6 +36,7 @@
 #include "executor/tqueue.h"
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
+#include "storage/procsignal.h"
 #include "utils/wait_event.h"
 
 
@@ -71,6 +72,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	gatherstate->need_to_scan_locally =
 		!node->single_copy && parallel_leader_participation;
 	gatherstate->tuples_needed = -1;
+	gatherstate->tuples_produced = 0;
 
 	/*
 	 * Miscellaneous initialization
@@ -126,6 +128,36 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	return gatherstate;
 }
 
+/* ----------------------------------------------------------------
+ *		Workers only stop once they themselves hit the tuple bound;
+ *		they keep running even if the other workers together have
+ *		already produced enough rows to satisfy the LIMIT.  Hence,
+ *		once the leader has gathered enough tuples, stop them here.
+ * ----------------------------------------------------------------
+ */
+static void
+StopWorkersIfLimitReached(GatherState *node)
+{
+	if (node->tuples_needed != -1 && node->tuples_produced == node->tuples_needed)
+	{
+		if (node->pei != NULL)
+		{
+			ParallelContext *pcxt = node->pei->pcxt;
+			int i;
+
+			if (pcxt->worker != NULL)
+			{
+				for (i = 0; i < pcxt->nworkers_launched; ++i)
+				{
+					pid_t pid;
+					if (GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid) == BGWH_STARTED)
+						SendProcSignal(pid, PROCSIG_PARALLEL_STOP, INVALID_PROC_NUMBER);
+				}
+			}
+		}
+	}
+}
+
 /* ----------------------------------------------------------------
  *		ExecGather(node)
  *
@@ -212,6 +244,7 @@ ExecGather(PlanState *pstate)
 		/* Run plan locally if no workers or enabled and not single-copy. */
 		node->need_to_scan_locally = (node->nreaders == 0)
 			|| (!gather->single_copy && parallel_leader_participation);
+		node->tuples_produced = 0;
 		node->initialized = true;
 	}
 
@@ -230,6 +263,9 @@ ExecGather(PlanState *pstate)
 	if (TupIsNull(slot))
 		return NULL;
 
+	node->tuples_produced++;
+	StopWorkersIfLimitReached(node);
+
 	/* If no projection is required, we're done. */
 	if (node->ps.ps_ProjInfo == NULL)
 		return slot;
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 087821311cc..8f99ecebe2f 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -19,6 +19,7 @@
 
 #include "access/parallel.h"
 #include "commands/async.h"
+#include "executor/execParallel.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "port/pg_bitutils.h"
@@ -694,6 +695,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_PARALLEL_APPLY_MESSAGE))
 		HandleParallelApplyMessageInterrupt();
 
+	if (CheckProcSignal(PROCSIG_PARALLEL_STOP))
+		HandleParallelStop();
+
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE))
 		HandleRecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE);
 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 0cecd464902..5b320e52b94 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -39,6 +39,7 @@
 #include "commands/event_trigger.h"
 #include "commands/prepare.h"
 #include "common/pg_prng.h"
+#include "executor/execParallel.h"
 #include "jit/jit.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
@@ -3536,6 +3537,9 @@ ProcessInterrupts(void)
 
 	if (ParallelApplyMessagePending)
 		ProcessParallelApplyMessages();
+
+	if (ParallelStopPending)
+		ProcessParallelStop();
 }
 
 /*
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 5e7106c397a..9e0be350694 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -37,6 +37,11 @@ typedef struct ParallelExecutorInfo
 	struct TupleQueueReader **reader;	/* tuple reader/writer support */
 } ParallelExecutorInfo;
 
+extern PGDLLIMPORT volatile sig_atomic_t ParallelStopPending;
+
+extern void HandleParallelStop(void);
+extern void ProcessParallelStop(void);
+
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
 												  EState *estate, Bitmapset *sendParams, int nworkers,
 												  int64 tuples_needed);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index de782014b2d..63962aebcd2 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2740,6 +2740,7 @@ typedef struct GatherState
 	bool		initialized;	/* workers launched? */
 	bool		need_to_scan_locally;	/* need to read from local plan? */
 	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
+	int64 		tuples_produced;	/* tuples already produced */
 	/* these fields are set up once: */
 	TupleTableSlot *funnel_slot;
 	struct ParallelExecutorInfo *pei;
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index afeeb1ca019..f7f4ee85154 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -36,6 +36,7 @@ typedef enum
 	PROCSIG_BARRIER,			/* global barrier interrupt  */
 	PROCSIG_LOG_MEMORY_CONTEXT, /* ask backend to log the memory contexts */
 	PROCSIG_PARALLEL_APPLY_MESSAGE, /* Message from parallel apply workers */
+	PROCSIG_PARALLEL_STOP, /* Instruct parallel worker to stop */
 
 	/* Recovery conflict reasons */
 	PROCSIG_RECOVERY_CONFLICT_FIRST,
-- 
2.43.0

