From 0e37f6acca869ea471a840f9511adf1de856b5d2 Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <postgres@jeltef.nl>
Date: Sat, 13 Dec 2025 13:05:50 +0100
Subject: [PATCH v0] POC: Use non-blocking APIs for frontend tools

POC by Claude Code, to see what would be needed to change all
fe_utils/cancel.c users to use non-blocking APIs. This code is an
extremely rough draft, only meant as a way to gauge feasibility of this
approach.
---
 src/bin/pgbench/pgbench.c     |  38 +-
 src/bin/psql/command.c        |   5 +-
 src/bin/psql/common.c         |  47 +-
 src/bin/psql/copy.c           |  54 +-
 src/bin/psql/large_obj.c      |  12 +-
 src/fe_utils/Makefile         |   2 +-
 src/fe_utils/cancel.c         | 914 ++++++++++++++++++++++++++++++----
 src/fe_utils/parallel_slot.c  |  45 +-
 src/fe_utils/query_utils.c    |   8 +-
 src/include/fe_utils/cancel.h |  34 +-
 10 files changed, 953 insertions(+), 206 deletions(-)

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1dae918cc09..8ebefceeebf 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -867,7 +867,7 @@ get_table_relkind(PGconn *con, const char *table)
 	const char *sql =
 		"SELECT relkind FROM pg_catalog.pg_class WHERE oid=$1::pg_catalog.regclass";
 
-	res = PQexecParams(con, sql, 1, NULL, params, NULL, NULL, 0);
+	res = cancellable_exec_params(con, sql, 1, NULL, params, NULL, NULL, 0);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		pg_log_error("query failed: %s", PQerrorMessage(con));
@@ -1490,13 +1490,13 @@ accumStats(StatsData *stats, bool skipped, double lat, double lag,
 	}
 }
 
-/* call PQexec() and exit() on failure */
+/* call cancellable_exec() and exit() on failure */
 static void
 executeStatement(PGconn *con, const char *sql)
 {
 	PGresult   *res;
 
-	res = PQexec(con, sql);
+	res = cancellable_exec(con, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 	{
 		pg_log_error("query failed: %s", PQerrorMessage(con));
@@ -1506,13 +1506,13 @@ executeStatement(PGconn *con, const char *sql)
 	PQclear(res);
 }
 
-/* call PQexec() and complain, but without exiting, on failure */
+/* call cancellable_exec() and complain, but without exiting, on failure */
 static void
 tryExecuteStatement(PGconn *con, const char *sql)
 {
 	PGresult   *res;
 
-	res = PQexec(con, sql);
+	res = cancellable_exec(con, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 	{
 		pg_log_error("%s", PQerrorMessage(con));
@@ -5059,7 +5059,7 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
 	else if (n == -1)
 		pg_fatal("invalid format string");
 
-	res = PQexec(con, copy_statement);
+	res = cancellable_exec(con, copy_statement);
 
 	if (PQresultStatus(res) != PGRES_COPY_IN)
 		pg_fatal("unexpected copy in result: %s", PQerrorMessage(con));
@@ -5338,7 +5338,6 @@ runInitSteps(const char *initialize_steps)
 		pg_fatal("could not create connection for initialization");
 
 	setup_cancel_handler(NULL);
-	SetCancelConn(con);
 
 	for (step = initialize_steps; *step != '\0'; step++)
 	{
@@ -5399,7 +5398,6 @@ runInitSteps(const char *initialize_steps)
 	}
 
 	fprintf(stderr, "done in %.2f s (%s).\n", run_time, stats.data);
-	ResetCancelConn();
 	PQfinish(con);
 	termPQExpBuffer(&stats);
 }
@@ -5417,7 +5415,7 @@ GetTableInfo(PGconn *con, bool scale_given)
 	 * get the scaling factor that should be same as count(*) from
 	 * pgbench_branches if this is not a custom query
 	 */
-	res = PQexec(con, "select count(*) from pgbench_branches");
+	res = cancellable_exec(con, "select count(*) from pgbench_branches");
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		char	   *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
@@ -5456,17 +5454,17 @@ GetTableInfo(PGconn *con, bool scale_given)
 	 * We assume no partitioning on any failure, so as to avoid failing on an
 	 * old version without "pg_partitioned_table".
 	 */
-	res = PQexec(con,
-				 "select o.n, p.partstrat, pg_catalog.count(i.inhparent) "
-				 "from pg_catalog.pg_class as c "
-				 "join pg_catalog.pg_namespace as n on (n.oid = c.relnamespace) "
-				 "cross join lateral (select pg_catalog.array_position(pg_catalog.current_schemas(true), n.nspname)) as o(n) "
-				 "left join pg_catalog.pg_partitioned_table as p on (p.partrelid = c.oid) "
-				 "left join pg_catalog.pg_inherits as i on (c.oid = i.inhparent) "
-				 "where c.relname = 'pgbench_accounts' and o.n is not null "
-				 "group by 1, 2 "
-				 "order by 1 asc "
-				 "limit 1");
+	res = cancellable_exec(con,
+						   "select o.n, p.partstrat, pg_catalog.count(i.inhparent) "
+						   "from pg_catalog.pg_class as c "
+						   "join pg_catalog.pg_namespace as n on (n.oid = c.relnamespace) "
+						   "cross join lateral (select pg_catalog.array_position(pg_catalog.current_schemas(true), n.nspname)) as o(n) "
+						   "left join pg_catalog.pg_partitioned_table as p on (p.partrelid = c.oid) "
+						   "left join pg_catalog.pg_inherits as i on (c.oid = i.inhparent) "
+						   "where c.relname = 'pgbench_accounts' and o.n is not null "
+						   "group by 1, 2 "
+						   "order by 1 asc "
+						   "limit 1");
 
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
diff --git a/src/bin/psql/command.c b/src/bin/psql/command.c
index e6365d823ce..238d62dc86c 100644
--- a/src/bin/psql/command.c
+++ b/src/bin/psql/command.c
@@ -4299,7 +4299,6 @@ do_connect(enum trivalue reuse_previous_specification,
 				 */
 				PQfinish(o_conn);
 				pset.db = NULL;
-				ResetCancelConn();
 				UnsyncVariables();
 			}
 
@@ -6225,7 +6224,7 @@ lookup_object_oid(EditableObjectType obj_type, const char *desc,
 		destroyPQExpBuffer(query);
 		return false;
 	}
-	res = PQexec(pset.db, query->data);
+	res = cancellable_exec(pset.db, query->data);
 	if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) == 1)
 		*obj_oid = atooid(PQgetvalue(res, 0, 0));
 	else
@@ -6307,7 +6306,7 @@ get_create_object_cmd(EditableObjectType obj_type, Oid oid,
 		destroyPQExpBuffer(query);
 		return false;
 	}
-	res = PQexec(pset.db, query->data);
+	res = cancellable_exec(pset.db, query->data);
 	if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) == 1)
 	{
 		resetPQExpBuffer(buf);
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c
index 2eadd391a9c..119ca99b418 100644
--- a/src/bin/psql/common.c
+++ b/src/bin/psql/common.c
@@ -382,7 +382,6 @@ CheckConnection(void)
 				PQfinish(pset.dead_conn);
 			pset.dead_conn = pset.db;
 			pset.db = NULL;
-			ResetCancelConn();
 			UnsyncVariables();
 		}
 		else
@@ -583,7 +582,7 @@ ClearOrSaveAllResults(void)
 {
 	PGresult   *result;
 
-	while ((result = PQgetResult(pset.db)) != NULL)
+	while ((result = cancellable_getresult(pset.db)) != NULL)
 		ClearOrSaveResult(result);
 }
 
@@ -681,11 +680,7 @@ PSQLexec(const char *query)
 			return NULL;
 	}
 
-	SetCancelConn(pset.db);
-
-	res = PQexec(pset.db, query);
-
-	ResetCancelConn();
+	res = cancellable_exec(pset.db, query);
 
 	if (!AcceptResult(res, true))
 	{
@@ -719,12 +714,8 @@ PSQLexecWatch(const char *query, const printQueryOpt *opt, FILE *printQueryFout,
 		return 0;
 	}
 
-	SetCancelConn(pset.db);
-
 	res = ExecQueryAndProcessResults(query, &elapsed_msec, NULL, true, min_rows, opt, printQueryFout);
 
-	ResetCancelConn();
-
 	/* Possible microtiming output */
 	if (timing)
 		PrintTiming(elapsed_msec);
@@ -943,8 +934,6 @@ HandleCopyResult(PGresult **resultp, FILE *copystream)
 	Assert(result_status == PGRES_COPY_OUT ||
 		   result_status == PGRES_COPY_IN);
 
-	SetCancelConn(pset.db);
-
 	if (result_status == PGRES_COPY_OUT)
 	{
 		success = handleCopyOut(pset.db,
@@ -973,7 +962,6 @@ HandleCopyResult(PGresult **resultp, FILE *copystream)
 							   PQbinaryTuples(*resultp),
 							   &copy_result);
 	}
-	ResetCancelConn();
 
 	/*
 	 * Replace the PGRES_COPY_OUT/IN result with COPY command's exit status,
@@ -1162,8 +1150,6 @@ SendQuery(const char *query)
 		fflush(pset.logfile);
 	}
 
-	SetCancelConn(pset.db);
-
 	transaction_status = PQtransactionStatus(pset.db);
 
 	if (transaction_status == PQTRANS_IDLE &&
@@ -1172,7 +1158,7 @@ SendQuery(const char *query)
 	{
 		PGresult   *result;
 
-		result = PQexec(pset.db, "BEGIN");
+		result = cancellable_exec(pset.db, "BEGIN");
 		if (PQresultStatus(result) != PGRES_COMMAND_OK)
 		{
 			pg_log_info("%s", PQerrorMessage(pset.db));
@@ -1190,7 +1176,7 @@ SendQuery(const char *query)
 	{
 		PGresult   *result;
 
-		result = PQexec(pset.db, "SAVEPOINT pg_psql_temporary_savepoint");
+		result = cancellable_exec(pset.db, "SAVEPOINT pg_psql_temporary_savepoint");
 		if (PQresultStatus(result) != PGRES_COMMAND_OK)
 		{
 			pg_log_info("%s", PQerrorMessage(pset.db));
@@ -1258,7 +1244,7 @@ SendQuery(const char *query)
 		{
 			PGresult   *svptres;
 
-			svptres = PQexec(pset.db, svptcmd);
+			svptres = cancellable_exec(pset.db, svptcmd);
 			if (PQresultStatus(svptres) != PGRES_COMMAND_OK)
 			{
 				pg_log_info("%s", PQerrorMessage(pset.db));
@@ -1293,9 +1279,6 @@ SendQuery(const char *query)
 
 sendquery_cleanup:
 
-	/* global cancellation reset */
-	ResetCancelConn();
-
 	/* reset \g's output-to-filename trigger */
 	if (pset.gfname)
 	{
@@ -1370,7 +1353,7 @@ DescribeQuery(const char *query, double *elapsed_msec)
 	 * anyway.  (So there's no great need to clear it when done, which is a
 	 * good thing because libpq provides no easy way to do that.)
 	 */
-	result = PQprepare(pset.db, "", query, 0, NULL);
+	result = cancellable_prepare(pset.db, "", query, 0, NULL);
 	if (PQresultStatus(result) != PGRES_COMMAND_OK)
 	{
 		pg_log_info("%s", PQerrorMessage(pset.db));
@@ -1380,7 +1363,7 @@ DescribeQuery(const char *query, double *elapsed_msec)
 	}
 	PQclear(result);
 
-	result = PQdescribePrepared(pset.db, "");
+	result = cancellable_describe_prepared(pset.db, "");
 	OK = AcceptResult(result, true) &&
 		(PQresultStatus(result) == PGRES_COMMAND_OK);
 	if (OK && result)
@@ -1428,7 +1411,7 @@ DescribeQuery(const char *query, double *elapsed_msec)
 			appendPQExpBufferStr(&buf, ") s(name, tp, tpm)");
 			PQclear(result);
 
-			result = PQexec(pset.db, buf.data);
+			result = cancellable_exec(pset.db, buf.data);
 			OK = AcceptResult(result, true);
 
 			if (timing)
@@ -1467,7 +1450,7 @@ discardAbortedPipelineResults(void)
 {
 	for (;;)
 	{
-		PGresult   *res = PQgetResult(pset.db);
+		PGresult   *res = cancellable_getresult(pset.db);
 		ExecStatusType result_status = PQresultStatus(res);
 
 		if (result_status == PGRES_PIPELINE_SYNC)
@@ -1491,7 +1474,7 @@ discardAbortedPipelineResults(void)
 			 * Fetch result to consume the end of the current query being
 			 * processed.
 			 */
-			fatal_res = PQgetResult(pset.db);
+			fatal_res = cancellable_getresult(pset.db);
 			Assert(fatal_res == NULL);
 			return res;
 		}
@@ -1759,8 +1742,8 @@ ExecQueryAndProcessResults(const char *query,
 		return 0;
 	}
 
-	/* first result */
-	result = PQgetResult(pset.db);
+	/* first result -- wait cancellably, since this can take a long time */
+	result = cancellable_getresult(pset.db);
 	if (min_rows > 0 && PQntuples(result) < min_rows)
 	{
 		return_early = true;
@@ -1828,7 +1811,7 @@ ExecQueryAndProcessResults(const char *query,
 				result = discardAbortedPipelineResults();
 			}
 			else
-				result = PQgetResult(pset.db);
+				result = cancellable_getresult(pset.db);
 
 			/*
 			 * Get current timing measure in case an error occurs
@@ -1987,7 +1970,7 @@ ExecQueryAndProcessResults(const char *query,
 				ClearOrSaveResult(result);
 
 				/* get the next result, loop if it's PGRES_TUPLES_CHUNK */
-				result = PQgetResult(pset.db);
+				result = cancellable_getresult(pset.db);
 			} while (PQresultStatus(result) == PGRES_TUPLES_CHUNK);
 
 			/* We expect an empty PGRES_TUPLES_OK, else there's a problem */
@@ -2081,7 +2064,7 @@ ExecQueryAndProcessResults(const char *query,
 		 * to process.  We need to do that to check whether this is the last.
 		 */
 		if (PQpipelineStatus(pset.db) == PQ_PIPELINE_OFF)
-			next_result = PQgetResult(pset.db);
+			next_result = cancellable_getresult(pset.db);
 		else
 		{
 			/*
diff --git a/src/bin/psql/copy.c b/src/bin/psql/copy.c
index 6a8a9792e7d..fc28537acc1 100644
--- a/src/bin/psql/copy.c
+++ b/src/bin/psql/copy.c
@@ -18,6 +18,7 @@
 #include "common.h"
 #include "common/logging.h"
 #include "copy.h"
+#include "fe_utils/cancel.h"
 #include "libpq-fe.h"
 #include "pqexpbuffer.h"
 #include "prompt.h"
@@ -436,10 +437,24 @@ handleCopyOut(PGconn *conn, FILE *copystream, PGresult **res)
 	bool		OK = true;
 	char	   *buf;
 	int			ret;
+	bool		cancel_sent = false;
 
 	for (;;)
 	{
-		ret = PQgetCopyData(conn, &buf, 0);
+		/* Use async mode so we can watch for cancel interrupts */
+		ret = PQgetCopyData(conn, &buf, 1);
+
+		if (ret == 0)
+		{
+			/* No data available yet, wait for socket or cancel */
+			if (!cancellable_socket_wait(conn, &cancel_sent, false))
+			{
+				OK = false;
+				break;
+			}
+			PQconsumeInput(conn);
+			continue;
+		}
 
 		if (ret < 0)
 			break;				/* done or server/connection error */
@@ -480,7 +495,7 @@ handleCopyOut(PGconn *conn, FILE *copystream, PGresult **res)
 	 * but hasn't exited COPY_OUT state internally.  So we ignore the
 	 * possibility here.
 	 */
-	*res = PQgetResult(conn);
+	*res = cancellable_getresult(conn);
 	if (PQresultStatus(*res) != PGRES_COMMAND_OK)
 	{
 		pg_log_info("%s", PQerrorMessage(conn));
@@ -513,6 +528,10 @@ handleCopyIn(PGconn *conn, FILE *copystream, bool isbinary, PGresult **res)
 	bool		OK;
 	char		buf[COPYBUFSIZ];
 	bool		showprompt;
+	bool		cancel_sent = false;
+
+	/* Set non-blocking mode so PQputCopyData/End won't block internally */
+	PQsetnonblocking(conn, 1);
 
 	/*
 	 * Establish longjmp destination for exiting from wait-for-input. (This is
@@ -523,9 +542,10 @@ handleCopyIn(PGconn *conn, FILE *copystream, bool isbinary, PGresult **res)
 		/* got here with longjmp */
 
 		/* Terminate data transfer */
-		PQputCopyEnd(conn,
-					 (PQprotocolVersion(conn) < 3) ? NULL :
-					 _("canceled by user"));
+		cancellable_put_copy_end(conn,
+								 (PQprotocolVersion(conn) < 3) ? NULL :
+								 _("canceled by user"),
+								 &cancel_sent);
 
 		OK = false;
 		goto copyin_cleanup;
@@ -569,7 +589,7 @@ handleCopyIn(PGconn *conn, FILE *copystream, bool isbinary, PGresult **res)
 			if (buflen <= 0)
 				break;
 
-			if (PQputCopyData(conn, buf, buflen) <= 0)
+			if (cancellable_put_copy_data(conn, buf, buflen, &cancel_sent) <= 0)
 			{
 				OK = false;
 				break;
@@ -667,7 +687,7 @@ handleCopyIn(PGconn *conn, FILE *copystream, bool isbinary, PGresult **res)
 			 */
 			if (buflen >= COPYBUFSIZ - 5 || (copydone && buflen > 0))
 			{
-				if (PQputCopyData(conn, buf, buflen) <= 0)
+				if (cancellable_put_copy_data(conn, buf, buflen, &cancel_sent) <= 0)
 				{
 					OK = false;
 					break;
@@ -688,9 +708,10 @@ handleCopyIn(PGconn *conn, FILE *copystream, bool isbinary, PGresult **res)
 	 * keep the version checks just in case you're using a pre-v14 libpq.so at
 	 * runtime)
 	 */
-	if (PQputCopyEnd(conn,
-					 (OK || PQprotocolVersion(conn) < 3) ? NULL :
-					 _("aborted because of read failure")) <= 0)
+	if (cancellable_put_copy_end(conn,
+								 (OK || PQprotocolVersion(conn) < 3) ? NULL :
+								 _("aborted because of read failure"),
+								 &cancel_sent) <= 0)
 		OK = false;
 
 copyin_cleanup:
@@ -717,15 +738,20 @@ copyin_cleanup:
 	 * connection is lost.  But that's fine; it will get us out of COPY_IN
 	 * state, which is what we need.)
 	 */
-	while (*res = PQgetResult(conn), PQresultStatus(*res) == PGRES_COPY_IN)
+	while (*res = cancellable_getresult(conn), PQresultStatus(*res) == PGRES_COPY_IN)
 	{
 		OK = false;
 		PQclear(*res);
 		/* We can't send an error message if we're using protocol version 2 */
-		PQputCopyEnd(conn,
-					 (PQprotocolVersion(conn) < 3) ? NULL :
-					 _("trying to exit copy mode"));
+		cancellable_put_copy_end(conn,
+								 (PQprotocolVersion(conn) < 3) ? NULL :
+								 _("trying to exit copy mode"),
+								 &cancel_sent);
 	}
+
+	/* Restore blocking mode */
+	PQsetnonblocking(conn, 0);
+
 	if (PQresultStatus(*res) != PGRES_COMMAND_OK)
 	{
 		pg_log_info("%s", PQerrorMessage(conn));
diff --git a/src/bin/psql/large_obj.c b/src/bin/psql/large_obj.c
index 021f78e0f78..949f984ff58 100644
--- a/src/bin/psql/large_obj.c
+++ b/src/bin/psql/large_obj.c
@@ -147,9 +147,7 @@ do_lo_export(const char *loid_arg, const char *filename_arg)
 	if (!start_lo_xact("\\lo_export", &own_transaction))
 		return false;
 
-	SetCancelConn(NULL);
-	status = lo_export(pset.db, atooid(loid_arg), filename_arg);
-	ResetCancelConn();
+	status = cancellable_lo_export(pset.db, atooid(loid_arg), filename_arg);
 
 	/* of course this status is documented nowhere :( */
 	if (status != 1)
@@ -183,9 +181,7 @@ do_lo_import(const char *filename_arg, const char *comment_arg)
 	if (!start_lo_xact("\\lo_import", &own_transaction))
 		return false;
 
-	SetCancelConn(NULL);
-	loid = lo_import(pset.db, filename_arg);
-	ResetCancelConn();
+	loid = cancellable_lo_import(pset.db, filename_arg);
 
 	if (loid == InvalidOid)
 	{
@@ -245,9 +241,7 @@ do_lo_unlink(const char *loid_arg)
 	if (!start_lo_xact("\\lo_unlink", &own_transaction))
 		return false;
 
-	SetCancelConn(NULL);
-	status = lo_unlink(pset.db, loid);
-	ResetCancelConn();
+	status = cancellable_lo_unlink(pset.db, loid);
 
 	if (status == -1)
 	{
diff --git a/src/fe_utils/Makefile b/src/fe_utils/Makefile
index cbfbf93ac69..809ab21cc0c 100644
--- a/src/fe_utils/Makefile
+++ b/src/fe_utils/Makefile
@@ -17,7 +17,7 @@ subdir = src/fe_utils
 top_builddir = ../..
 include $(top_builddir)/src/Makefile.global
 
-override CPPFLAGS := -DFRONTEND -I$(libpq_srcdir) $(CPPFLAGS)
+override CPPFLAGS := -DFRONTEND -I$(libpq_srcdir) -I$(top_srcdir)/src/port $(CPPFLAGS)
 
 OBJS = \
 	archive.o \
diff --git a/src/fe_utils/cancel.c b/src/fe_utils/cancel.c
index e6b75439f56..514a8f54c82 100644
--- a/src/fe_utils/cancel.c
+++ b/src/fe_utils/cancel.c
@@ -2,9 +2,47 @@
  *
  * Query cancellation support for frontend code
  *
- * Assorted utility functions to control query cancellation with signal
- * handler for SIGINT.
+ * This module provides SIGINT/Ctrl-C handling for frontend tools that need
+ * to cancel queries running on the server.  It combines three completely
+ * independent mechanisms, any combination of which can be used by a caller:
  *
+ * 1. Server cancel request -- Often what applications need. When a query is
+ *    running, and the main thread is waiting for the result of that query in a
+ *    blocking manner, we want SIGINT/Ctrl-C to cancel that query. This can be
+ *    done by waiting for the query using cancellable_getresult() or
+ *    cancellable_exec() instead of PQgetResult() or PQexec(). These functions
+ *    wait on both the server connection and a cancel interrupt simultaneously.
+ *    When SIGINT/Ctrl-C is received the cancel request is sent to the server
+ *    from the main thread, which avoids race conditions that would occur if
+ *    the cancel were sent from a different thread.
+ *
+ * 2. CancelRequested flag -- A more involved but also much more flexible way
+ *    of cancelling. A volatile sig_atomic_t CancelRequested flag is set to
+ *    true whenever SIGINT is received. This means that the application code
+ *    can fully control what it does with this flag. The primary use case for
+ *    this is when the application code is not blocked (indefinitely), but
+ *    needs to take an action when Ctrl-C is pressed, such as break out of a
+ *    long running loop.
+ *
+ * 3. Cancel callback -- The most complex way of handling SIGINT. An optional
+ *    function pointer registered via setup_cancel_handler().  If set, it is
+ *    called directly from the signal handler, so it must be async-signal-safe.
+ *    Writing async-signal-safe code is not easy, so this is only recommended
+ *    as a last resort. psql uses this to longjmp back to the main loop when no
+ *    query is active.
+ *
+ * On Unix, the SIGINT signal handler cannot call PQcancelBlocking() directly
+ * because it is not async-signal-safe.  Instead, we use a pipe (the "self-pipe
+ * trick") to interrupt the main thread's select()/poll() call.  The signal
+ * handler writes a byte to the pipe, which makes the main thread's wait
+ * return.  The main thread then notices the cancel request and sends it to the
+ * server synchronously.
+ *
+ * On Windows, the console control handler runs in a separate OS-provided
+ * thread.  We use a Windows event object as the equivalent of the self-pipe:
+ * the console handler signals the event, and the main thread uses
+ * WaitForMultipleObjects to wait on both the socket and the cancel event
+ * simultaneously, so cancellation is noticed instantly.
  *
  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -16,11 +54,18 @@
 
 #include "postgres_fe.h"
 
+#include <fcntl.h>
+#include <signal.h>
 #include <unistd.h>
 
-#include "common/connect.h"
+#ifndef WIN32
+#include <sys/select.h>
+#endif
+
+#include "common/logging.h"
 #include "fe_utils/cancel.h"
 #include "fe_utils/string_utils.h"
+#include "libpq/libpq-fs.h"
 
 
 /*
@@ -36,11 +81,6 @@
 		(void) rc_; \
 	} while (0)
 
-/*
- * Contains all the information needed to cancel a query issued from
- * a database connection to the backend.
- */
-static PGcancel *volatile cancelConn = NULL;
 
 /*
  * Predetermined localized error strings --- needed to avoid trying
@@ -58,169 +98,770 @@ static const char *cancel_not_sent_msg = NULL;
  */
 volatile sig_atomic_t CancelRequested = false;
 
-#ifdef WIN32
-static CRITICAL_SECTION cancelConnLock;
-#endif
-
 /*
  * Additional callback for cancellations.
  */
 static void (*cancel_callback) (void) = NULL;
 
+#ifndef WIN32
+/*
+ * On Unix, the SIGINT signal handler cannot call PQcancelBlocking() directly
+ * because it is not async-signal-safe.  Instead, we use a pipe to interrupt
+ * the main thread: the signal handler writes a byte to the pipe, and the main
+ * thread's select() call returns because the pipe's read end becomes readable.
+ * The main thread then sends the cancel request to the server.
+ */
+static int	cancel_pipe[2] = {-1, -1};
+#else
+/*
+ * On Windows, we use an event object to wake the main thread's
+ * WaitForMultipleObjects() call when Ctrl-C is pressed.
+ */
+static HANDLE cancel_event = NULL;
+#endif
+
 
 /*
- * SetCancelConn
- *
- * Set cancelConn to point to the current database connection.
+ * Send a cancel request to the given connection.
  */
 void
-SetCancelConn(PGconn *conn)
+send_cancel(PGconn *conn)
 {
-	PGcancel   *oldCancelConn;
+	PGcancelConn *cancelConn = PQcancelCreate(conn);
 
-#ifdef WIN32
-	EnterCriticalSection(&cancelConnLock);
-#endif
+	if (cancel_sent_msg)
+		write_stderr(cancel_sent_msg);
 
-	/* Free the old one if we have one */
-	oldCancelConn = cancelConn;
+	if (!PQcancelBlocking(cancelConn))
+	{
+		if (cancel_not_sent_msg)
+			write_stderr(cancel_not_sent_msg);
+		write_stderr(PQcancelErrorMessage(cancelConn));
+	}
 
-	/* be sure handle_sigint doesn't use pointer while freeing */
-	cancelConn = NULL;
+	PQcancelFinish(cancelConn);
+}
 
-	if (oldCancelConn != NULL)
-		PQfreeCancel(oldCancelConn);
 
-	cancelConn = PQgetCancel(conn);
+#ifndef WIN32
 
-#ifdef WIN32
-	LeaveCriticalSection(&cancelConnLock);
-#endif
+/*
+ * Drain any pending bytes from the cancel pipe.  This should be called
+ * before entering a cancellable wait, so that a stale signal from a
+ * previous query doesn't immediately trigger a cancel.
+ */
+static void
+drain_cancel_pipe(void)
+{
+	if (cancel_pipe[0] >= 0)
+	{
+		char		buf[16];
+		int			save_errno = errno;
+
+		while (read(cancel_pipe[0], buf, sizeof(buf)) > 0)
+			 /* loop */ ;
+
+		errno = save_errno;
+	}
 }
 
+#endif							/* !WIN32 */
+
+
 /*
- * ResetCancelConn
+ * cancellable_socket_wait
+ *
+ * Wait for the given connection's socket to become readable (or writable if
+ * for_write is true), while also watching for a cancel interrupt
+ * (SIGINT/Ctrl-C).
  *
- * Free the current cancel connection, if any, and set to NULL.
+ * If a cancel interrupt arrives during the wait, a cancel request is sent to
+ * the server.  It is sent only while *cancel_sent is false (the caller's
+ * initial value) and the flag is then set, so calls sharing one flag cancel at
+ * most once.  Afterwards this function keeps waiting for the socket, since the
+ * server is expected to respond with an error the caller processes normally.
+ *
+ * Returns true if the socket is ready, false on error.
  */
-void
-ResetCancelConn(void)
+bool
+cancellable_socket_wait(PGconn *conn, bool *cancel_sent, bool for_write)
 {
-	PGcancel   *oldCancelConn;
+	int			sock = PQsocket(conn);
 
-#ifdef WIN32
-	EnterCriticalSection(&cancelConnLock);
-#endif
+	if (sock < 0)
+		return false;
+
+	for (;;)
+	{
+#ifndef WIN32
+		fd_set		sock_mask;
+		fd_set		cancel_mask;
+		fd_set	   *read_set;
+		fd_set	   *write_set;
+		int			maxFd;
+		int			rc;
+
+		FD_ZERO(&sock_mask);
+		FD_SET(sock, &sock_mask);
+		maxFd = sock;
+
+		FD_ZERO(&cancel_mask);
+		if (cancel_pipe[0] >= 0)
+		{
+			FD_SET(cancel_pipe[0], &cancel_mask);
+			if (cancel_pipe[0] > maxFd)
+				maxFd = cancel_pipe[0];
+		}
+
+		if (for_write)
+		{
+			read_set = &cancel_mask;
+			write_set = &sock_mask;
+		}
+		else
+		{
+			/*
+			 * Watch both the socket and the cancel pipe for readability.
+			 * Merge them into one fd_set.
+			 */
+			if (cancel_pipe[0] >= 0)
+				FD_SET(cancel_pipe[0], &sock_mask);
+			read_set = &sock_mask;
+			write_set = NULL;
+		}
 
-	oldCancelConn = cancelConn;
+		rc = select(maxFd + 1, read_set, write_set, NULL, NULL);
 
-	/* be sure handle_sigint doesn't use pointer while freeing */
-	cancelConn = NULL;
+		if (rc < 0)
+		{
+			if (errno == EINTR)
+				continue;
+			return false;
+		}
 
-	if (oldCancelConn != NULL)
-		PQfreeCancel(oldCancelConn);
+		/* Check cancel pipe (always in read_set for write, merged for read) */
+		if (cancel_pipe[0] >= 0 &&
+			((for_write && FD_ISSET(cancel_pipe[0], &cancel_mask)) ||
+			 (!for_write && FD_ISSET(cancel_pipe[0], &sock_mask))))
+		{
+			drain_cancel_pipe();
 
-#ifdef WIN32
-	LeaveCriticalSection(&cancelConnLock);
+			if (!*cancel_sent)
+			{
+				send_cancel(conn);
+				*cancel_sent = true;
+			}
+
+			/* Check if the socket is also ready */
+			if (for_write)
+			{
+				if (!FD_ISSET(sock, &sock_mask))
+					continue;
+			}
+			else
+			{
+				if (!FD_ISSET(sock, &sock_mask))
+					continue;
+			}
+		}
+
+		if (FD_ISSET(sock, &sock_mask))
+			return true;
+#else							/* WIN32 */
+		HANDLE		events[2];
+		WSAEVENT	sock_event;
+		DWORD		ret;
+		long		net_events;
+
+		net_events = for_write ? FD_WRITE : (FD_READ | FD_CLOSE);
+
+		sock_event = WSACreateEvent();
+		if (sock_event == WSA_INVALID_EVENT)
+			return false;
+
+		if (WSAEventSelect(sock, sock_event, net_events) != 0)
+		{
+			WSACloseEvent(sock_event);
+			return false;
+		}
+
+		events[0] = sock_event;
+		events[1] = cancel_event;
+
+		ret = WaitForMultipleObjects(cancel_event ? 2 : 1,
+									 events, FALSE, INFINITE);
+
+		WSAEventSelect(sock, sock_event, 0);
+		WSACloseEvent(sock_event);
+
+		if (ret == WAIT_OBJECT_0)
+		{
+			/* Socket is ready */
+			return true;
+		}
+		else if (ret == WAIT_OBJECT_0 + 1)
+		{
+			/* Cancel event signaled */
+			ResetEvent(cancel_event);
+
+			if (!*cancel_sent)
+			{
+				send_cancel(conn);
+				*cancel_sent = true;
+			}
+
+			/* Loop back to wait for the socket */
+			continue;
+		}
+		else
+		{
+			/* WAIT_FAILED or unexpected return */
+			return false;
+		}
+#endif							/* WIN32 */
+	}
+}
+
+
+/*
+ * cancellable_getresult
+ *
+ * Like PQgetResult, but cancellable via SIGINT.  When a cancel interrupt
+ * arrives while waiting for data from the server, a cancel request is sent
+ * to the server.  The server is then expected to respond with an error,
+ * which will be returned as the PGresult.
+ */
+PGresult *
+cancellable_getresult(PGconn *conn)
+{
+	bool		cancel_sent = false;
+
+#ifndef WIN32
+	drain_cancel_pipe();
+#else
+	if (cancel_event)
+		ResetEvent(cancel_event);
 #endif
+
+	while (PQisBusy(conn))
+	{
+		if (!cancellable_socket_wait(conn, &cancel_sent, false))
+			break;
+
+		if (!PQconsumeInput(conn))
+			break;
+	}
+
+	return PQgetResult(conn);
+}
+
+/*
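+ * cancellable_exec -- like PQexec, but cancellable via SIGINT.
+ * NOTE(review): unlike PQexec, the loop does not stop at a COPY result, so
+ * COPY callers may spin on repeated PGRES_COPY_IN/OUT results -- confirm.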
+ */
+PGresult *
+cancellable_exec(PGconn *conn, const char *query)
+{
+	PGresult   *lastResult = NULL;
+	PGresult   *result;
+
+	if (!PQsendQuery(conn, query))
+		return PQgetResult(conn);
+
+	while ((result = cancellable_getresult(conn)) != NULL)
+	{
+		PQclear(lastResult);
+		lastResult = result;
+	}
+
+	return lastResult;
 }
 
 
 /*
- * Code to support query cancellation
+ * cancellable_exec_params
+ *
+ * Like PQexecParams, but cancellable via SIGINT.
+ */
+PGresult *
+cancellable_exec_params(PGconn *conn, const char *query,
+						int nParams, const Oid *paramTypes,
+						const char *const *paramValues,
+						const int *paramLengths, const int *paramFormats,
+						int resultFormat)
+{
+	PGresult   *lastResult = NULL;
+	PGresult   *result;
+
+	if (!PQsendQueryParams(conn, query, nParams, paramTypes,
+						   paramValues, paramLengths, paramFormats,
+						   resultFormat))
+		return PQgetResult(conn);
+
+	while ((result = cancellable_getresult(conn)) != NULL)
+	{
+		PQclear(lastResult);
+		lastResult = result;
+	}
+
+	return lastResult;
+}
+
+/*
+ * cancellable_prepare
  *
- * Note that sending the cancel directly from the signal handler is safe
- * because PQcancel() is written to make it so.  We use write() to report
- * to stderr because it's better to use simple facilities in a signal
- * handler.
+ * Like PQprepare, but cancellable via SIGINT.
+ */
+PGresult *
+cancellable_prepare(PGconn *conn, const char *stmtName,
+					const char *query, int nParams,
+					const Oid *paramTypes)
+{
+	PGresult   *lastResult = NULL;
+	PGresult   *result;
+
+	if (!PQsendPrepare(conn, stmtName, query, nParams, paramTypes))
+		return PQgetResult(conn);
+
+	while ((result = cancellable_getresult(conn)) != NULL)
+	{
+		PQclear(lastResult);
+		lastResult = result;
+	}
+
+	return lastResult;
+}
+
+/*
+ * cancellable_describe_prepared
  *
- * On Windows, the signal canceling happens on a separate thread, because
- * that's how SetConsoleCtrlHandler works.  The PQcancel function is safe
- * for this (unlike PQrequestCancel).  However, a CRITICAL_SECTION is required
- * to protect the PGcancel structure against being changed while the signal
- * thread is using it.
+ * Like PQdescribePrepared, but cancellable via SIGINT.
  */
+PGresult *
+cancellable_describe_prepared(PGconn *conn, const char *stmtName)
+{
+	PGresult   *lastResult = NULL;
+	PGresult   *result;
 
-#ifndef WIN32
+	if (!PQsendDescribePrepared(conn, stmtName))
+		return PQgetResult(conn);
+
+	while ((result = cancellable_getresult(conn)) != NULL)
+	{
+		PQclear(lastResult);
+		lastResult = result;
+	}
+
+	return lastResult;
+}
 
 /*
- * handle_sigint
+ * cancellable_flush
  *
- * Handle interrupt signals by canceling the current command, if cancelConn
- * is set.
+ * Flush pending output on the connection, waiting for the socket to become
+ * writable if needed.  Cancellable via SIGINT.
+ *
+ * Returns 0 on success, -1 if PQflush() or the socket wait fails.
  */
-static void
-handle_sigint(SIGNAL_ARGS)
+static int
+cancellable_flush(PGconn *conn, bool *cancel_sent)
 {
-	char		errbuf[256];
+	int			rc;
 
-	CancelRequested = true;
+	while ((rc = PQflush(conn)) > 0)
+	{
+		if (!cancellable_socket_wait(conn, cancel_sent, true))
+			return -1;
+	}
 
-	if (cancel_callback != NULL)
-		cancel_callback();
+	return rc;
+}
+
+/*
+ * cancellable_put_copy_data
+ *
+ * Like PQputCopyData, but cancellable via SIGINT.  The connection must be in
+ * non-blocking mode.
+ *
+ * Returns 1 on success, -1 if PQputCopyData() or the flush fails.
+ */
+int
+cancellable_put_copy_data(PGconn *conn, const char *buffer, int nbytes,
+						  bool *cancel_sent)
+{
+	int			rc;
 
-	/* Send QueryCancel if we are processing a database query */
-	if (cancelConn != NULL)
+	for (;;)
 	{
-		if (PQcancel(cancelConn, errbuf, sizeof(errbuf)))
+		rc = PQputCopyData(conn, buffer, nbytes);
+
+		if (rc < 0)
+			return -1;
+
+		if (rc > 0)
 		{
-			write_stderr(cancel_sent_msg);
+			/* Data queued, flush it */
+			if (cancellable_flush(conn, cancel_sent) < 0)
+				return -1;
+			return 1;
 		}
-		else
+
+		/* rc == 0: would block, flush and retry */
+		if (cancellable_flush(conn, cancel_sent) < 0)
+			return -1;
+	}
+}
+
+/*
+ * cancellable_put_copy_end
+ *
+ * Like PQputCopyEnd, but cancellable via SIGINT.  The connection must be in
+ * non-blocking mode.
+ *
+ * Returns 1 once the end message is queued and flushed, -1 on error/cancel.
+ */
+int
+cancellable_put_copy_end(PGconn *conn, const char *errormsg,
+						 bool *cancel_sent)
+{
+	int			rc;
+
+	for (;;)
+	{
+		rc = PQputCopyEnd(conn, errormsg);
+
+		if (rc < 0)
+			return -1;
+
+		if (rc > 0)
 		{
-			write_stderr(cancel_not_sent_msg);
-			write_stderr(errbuf);
+			/* End message queued, flush it */
+			if (cancellable_flush(conn, cancel_sent) < 0)
+				return -1;
+			return 1;
 		}
+
+		/* rc == 0: end message not sent (would block); flush and retry */
+		if (cancellable_flush(conn, cancel_sent) < 0)
+			return -1;
 	}
 }
 
 /*
- * setup_cancel_handler
+ * Run a lo_* SQL function via cancellable_exec_params and fetch its single
+ * integer column.  Returns true with the value in *result (parsed as 64-bit
+ * and narrowed to int, so OIDs above INT_MAX survive a cast), else false.
+ */
+static bool
+lo_exec_int(PGconn *conn, const char *query, int nParams,
+			const char *const *paramValues, int *result)
+{
+	PGresult   *res;
+
+	res = cancellable_exec_params(conn, query, nParams, NULL,
+								  paramValues, NULL, NULL, 0);
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK ||
+		PQntuples(res) != 1 || PQnfields(res) != 1)
+	{
+		PQclear(res);
+		return false;
+	}
+
+	*result = (int) strtoll(PQgetvalue(res, 0, 0), NULL, 10);
+	PQclear(res);
+	return true;
+}
+
+#define LO_BUFSIZE 8192
+
+/*
+ * cancellable_lo_export
  *
- * Register query cancellation callback for SIGINT.
+ * Like lo_export, but each server round-trip is cancellable via SIGINT.
+ * Returns 1 on success, -1 on failure.
  */
-void
-setup_cancel_handler(void (*query_cancel_callback) (void))
+int
+cancellable_lo_export(PGconn *conn, Oid lobjId, const char *filename)
 {
-	cancel_callback = query_cancel_callback;
-	cancel_sent_msg = _("Cancel request sent\n");
-	cancel_not_sent_msg = _("Could not send cancel request: ");
+	char		oidbuf[32];
+	char		fdbuf[32];
+	char		lenbuf[32];
+	const char *params[2];
+	int			lobj;
+	int			fd;
+	int			result = 1;
+
+	/* lo_open */
+	sprintf(oidbuf, "%u", lobjId);
+	sprintf(lenbuf, "%d", INV_READ);
+	params[0] = oidbuf;
+	params[1] = lenbuf;
+	if (!lo_exec_int(conn, "SELECT lo_open($1::oid, $2::int4)",
+					 2, params, &lobj))
+		return -1;
+	if (lobj == -1)
+		return -1;
+
+	sprintf(fdbuf, "%d", lobj);
+
+	/* Open local file */
+	fd = open(filename, O_CREAT | O_WRONLY | O_TRUNC | PG_BINARY, 0666);
+	if (fd < 0)
+	{
+		pg_log_error("could not open file \"%s\": %m", filename);
+		params[0] = fdbuf;	/* best-effort close, result is just freed */
+		PQclear(cancellable_exec_params(conn, "SELECT lo_close($1::int4)",
+										1, NULL, params, NULL, NULL, 0));
+		return -1;
+	}
 
-	pqsignal(SIGINT, handle_sigint);
+	/* Read loop: fetch binary chunks of up to LO_BUFSIZE bytes until empty */
+	sprintf(lenbuf, "%d", LO_BUFSIZE);
+	params[0] = fdbuf;
+	params[1] = lenbuf;
+	for (;;)
+	{
+		PGresult   *res;
+		char	   *data;
+		int			nbytes;
+		int			written;
+
+		res = cancellable_exec_params(conn,
+									  "SELECT loread($1::int4, $2::int4)",
+									  2, NULL, params, NULL, NULL, 1);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) != 1)
+		{
+			PQclear(res);
+			result = -1;
+			break;
+		}
+
+		nbytes = PQgetlength(res, 0, 0);
+		if (nbytes == 0)
+		{
+			PQclear(res);
+			break;
+		}
+
+		data = PQgetvalue(res, 0, 0);
+		written = write(fd, data, nbytes);
+		PQclear(res);
+
+		if (written != nbytes)
+		{
+			pg_log_error("could not write to file \"%s\": %m", filename);
+			result = -1;
+			break;
+		}
+	}
+
+	close(fd);
+
+	/*
+	 * If the read loop failed, we're likely in an aborted transaction, so
+	 * skip lo_close to avoid overwriting the useful error message.
+	 */
+	if (result == 1)
+	{
+		int			close_result;
+
+		params[0] = fdbuf;
+		if (!lo_exec_int(conn, "SELECT lo_close($1::int4)",
+						 1, params, &close_result) ||
+			close_result != 0)
+			result = -1;
+	}
+
+	return result;
+}
+
+/*
+ * cancellable_lo_import
+ *
+ * Like lo_import, but each server round-trip is cancellable via SIGINT.
+ * Returns the OID of the new large object, or InvalidOid on failure.
+ */
+Oid
+cancellable_lo_import(PGconn *conn, const char *filename)
+{
+	char		oidbuf[32];
+	char		fdbuf[32];
+	char		modebuf[32];
+	const char *params[2];
+	int			paramLengths[2];
+	int			paramFormats[2];
+	int			fd;
+	Oid			lobjOid;
+	int			lobj;
+	int			tmp;
+	char		buf[LO_BUFSIZE];
+	int			nbytes;
+
+	/* Open local file */
+	fd = open(filename, O_RDONLY | PG_BINARY, 0666);
+	if (fd < 0)
+	{
+		pg_log_error("could not open file \"%s\": %m", filename);
+		return InvalidOid;
+	}
+
+	/* lo_creat */
+	sprintf(modebuf, "%d", INV_READ | INV_WRITE);
+	params[0] = modebuf;
+	if (!lo_exec_int(conn, "SELECT lo_creat($1::int4)",
+					 1, params, &tmp))
+	{
+		close(fd);
+		return InvalidOid;
+	}
+	lobjOid = (Oid) tmp;
+	if (lobjOid == InvalidOid)
+	{
+		close(fd);
+		return InvalidOid;
+	}
+
+	/* lo_open */
+	sprintf(oidbuf, "%u", lobjOid);
+	sprintf(modebuf, "%d", INV_WRITE);
+	params[0] = oidbuf;
+	params[1] = modebuf;
+	if (!lo_exec_int(conn, "SELECT lo_open($1::oid, $2::int4)",
+					 2, params, &lobj))
+	{
+		close(fd);
+		return InvalidOid;
+	}
+	if (lobj == -1)
+	{
+		close(fd);
+		return InvalidOid;
+	}
+
+	sprintf(fdbuf, "%d", lobj);
+
+	/* Write loop: read from file, write to large object */
+	while ((nbytes = read(fd, buf, LO_BUFSIZE)) > 0)
+	{
+		PGresult   *res;
+
+		paramFormats[0] = 0;	/* text for fd */
+		paramFormats[1] = 1;	/* binary for bytea data */
+		paramLengths[0] = 0;
+		paramLengths[1] = nbytes;
+		params[0] = fdbuf;
+		params[1] = buf;
+
+		res = cancellable_exec_params(conn,
+									  "SELECT lowrite($1::int4, $2::bytea)",
+									  2, NULL, params,
+									  paramLengths, paramFormats, 0);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) != 1)
+		{
+			PQclear(res);
+			close(fd);
+			return InvalidOid;
+		}
+
+		tmp = atoi(PQgetvalue(res, 0, 0));
+		PQclear(res);
+
+		if (tmp != nbytes)
+		{
+			close(fd);
+			return InvalidOid;
+		}
+	}
+
+	if (nbytes < 0)
+	{
+		pg_log_error("could not read from file \"%s\": %m", filename);
+		params[0] = fdbuf;
+		/* Best-effort close; its result is only freed, not inspected */
+		PQclear(cancellable_exec_params(conn, "SELECT lo_close($1::int4)",
+										1, NULL, params, NULL, NULL, 0));
+		close(fd);
+		return InvalidOid;
+	}
+
+	close(fd);
+
+	/* lo_close */
+	params[0] = fdbuf;
+	if (!lo_exec_int(conn, "SELECT lo_close($1::int4)",
+					 1, params, &tmp) ||
+		tmp != 0)
+		return InvalidOid;
+
+	return lobjOid;
+}
+
+/*
+ * cancellable_lo_unlink
+ *
+ * SIGINT-cancellable counterpart of lo_unlink.  Passes back the value the
+ * server-side lo_unlink() produced, or -1 when lo_exec_int reports failure.
+ */
+int
+cancellable_lo_unlink(PGconn *conn, Oid lobjId)
+{
+	const char *args[1];
+	char		oid_text[32];
+	int			rc = -1;
+
+	/* The OID travels as text and is cast back to oid by the query. */
+	sprintf(oid_text, "%u", lobjId);
+	args[0] = oid_text;
+
+	if (!lo_exec_int(conn, "SELECT lo_unlink($1::oid)", 1, args, &rc))
+		rc = -1;
+
+	return rc;
+}
+
+/*
+ * cancel_pipe_fd
+ *
+ * Expose the read end of the cancel pipe so that code running its own
+ * select()/poll() loop (e.g. parallel_slot.c) can watch it too.  Yields -1
+ * on Windows, and also when the cancel handler hasn't been set up.
+ */
+int
+cancel_pipe_fd(void)
+{
+#ifdef WIN32
+	return -1;
+#else
+	return cancel_pipe[0];
+#endif
 }
 
-#else							/* WIN32 */
 
+#ifdef WIN32
+/*
+ * Console control handler for Windows.
+ *
+ * This runs in a separate thread created by the OS.  It sets
+ * CancelRequested and invokes the callback, but does not send a cancel
+ * request itself -- that is done by the main thread in the cancellable
+ * wait functions when it notices CancelRequested.
+ */
 static BOOL WINAPI
 consoleHandler(DWORD dwCtrlType)
 {
-	char		errbuf[256];
-
 	if (dwCtrlType == CTRL_C_EVENT ||
 		dwCtrlType == CTRL_BREAK_EVENT)
 	{
 		CancelRequested = true;
 
+		if (cancel_event)
+			SetEvent(cancel_event);
+
 		if (cancel_callback != NULL)
 			cancel_callback();
 
-		/* Send QueryCancel if we are processing a database query */
-		EnterCriticalSection(&cancelConnLock);
-		if (cancelConn != NULL)
-		{
-			if (PQcancel(cancelConn, errbuf, sizeof(errbuf)))
-			{
-				write_stderr(cancel_sent_msg);
-			}
-			else
-			{
-				write_stderr(cancel_not_sent_msg);
-				write_stderr(errbuf);
-			}
-		}
-
-		LeaveCriticalSection(&cancelConnLock);
-
 		return TRUE;
 	}
 	else
@@ -228,16 +869,77 @@ consoleHandler(DWORD dwCtrlType)
 		return FALSE;
 }
 
+#else							/* !WIN32 */
+
+/*
+ * Signal handler for SIGINT.  Sets CancelRequested, runs the optional
+ * cancel callback, and wakes up the main thread by writing to the pipe.
+ */
+static void
+handle_sigint(SIGNAL_ARGS)
+{
+	int			save_errno = errno;
+	char		c = 1;
+
+	CancelRequested = true;
+
+	if (cancel_callback != NULL)
+		cancel_callback();
+
+	/* Wake up the main thread's select() call; a failed write is ignored */
+	if (cancel_pipe[1] >= 0)
+		(void) write(cancel_pipe[1], &c, 1);
+
+	errno = save_errno;
+}
+
+#endif							/* WIN32 */
+
+
+/*
+ * setup_cancel_handler
+ *
+ * Set up handler for SIGINT (Unix) or console events (Windows).  The handler
+ * only sets CancelRequested and signals the cancel pipe/event.
+ *
+ * The optional callback is invoked directly from the signal handler context
+ * on every SIGINT (on Unix), so it must be async-signal-safe.
+ */
 void
-setup_cancel_handler(void (*callback) (void))
+setup_cancel_handler(void (*query_cancel_callback) (void))
 {
-	cancel_callback = callback;
+	cancel_callback = query_cancel_callback;
 	cancel_sent_msg = _("Cancel request sent\n");
 	cancel_not_sent_msg = _("Could not send cancel request: ");
 
-	InitializeCriticalSection(&cancelConnLock);
-
+#ifdef WIN32
+	cancel_event = CreateEvent(NULL, TRUE, FALSE, NULL);
+	if (cancel_event == NULL)
+	{
+		pg_log_error("could not create event for cancel: error code %lu",
+					 GetLastError());
+		exit(1);
+	}
 	SetConsoleCtrlHandler(consoleHandler, TRUE);
-}
+#else
 
-#endif							/* WIN32 */
+	/*
+	 * Create the pipe that the signal handler uses to wake the main thread.
+	 * See comment on cancel_pipe above.
+	 *
+	 * Both ends are made non-blocking: the write end so that the signal
+	 * handler won't block, and the read end so that drain_cancel_pipe()
+	 * won't block.  A pipe left in blocking mode would break both
+	 * guarantees, so a failed fcntl() is as fatal as a failed pipe().
+	 */
+	if (pipe(cancel_pipe) < 0 ||
+		fcntl(cancel_pipe[0], F_SETFL, O_NONBLOCK) < 0 ||
+		fcntl(cancel_pipe[1], F_SETFL, O_NONBLOCK) < 0)
+	{
+		pg_log_error("could not set up pipe for cancel: %m");
+		exit(1);
+	}
+
+	pqsignal(SIGINT, handle_sigint);
+#endif
+}
diff --git a/src/fe_utils/parallel_slot.c b/src/fe_utils/parallel_slot.c
index fb9e6cc4ec1..dbf958c5656 100644
--- a/src/fe_utils/parallel_slot.c
+++ b/src/fe_utils/parallel_slot.c
@@ -60,13 +60,11 @@ consumeQueryResult(ParallelSlot *slot)
 	bool		ok = true;
 	PGresult   *result;
 
-	SetCancelConn(slot->connection);
-	while ((result = PQgetResult(slot->connection)) != NULL)
+	while ((result = cancellable_getresult(slot->connection)) != NULL)
 	{
 		if (!processQueryResult(slot, result))
 			ok = false;
 	}
-	ResetCancelConn();
 	return ok;
 }
 
@@ -81,6 +79,7 @@ select_loop(int maxFd, fd_set *workerset)
 {
 	int			i;
 	fd_set		saveSet = *workerset;
+	int			cancel_fd = cancel_pipe_fd();
 
 	if (CancelRequested)
 		return -1;
@@ -89,7 +88,8 @@ select_loop(int maxFd, fd_set *workerset)
 	{
 		/*
 		 * On Windows, we need to check once in a while for cancel requests;
-		 * on other platforms we rely on select() returning when interrupted.
+		 * on other platforms the cancel pipe makes select() return
+		 * immediately when SIGINT arrives.
 		 */
 		struct timeval *tvp;
 #ifdef WIN32
@@ -101,6 +101,16 @@ select_loop(int maxFd, fd_set *workerset)
 #endif
 
 		*workerset = saveSet;
+
+#ifndef WIN32
+		if (cancel_fd >= 0)
+		{
+			FD_SET(cancel_fd, workerset);
+			if (cancel_fd > maxFd)
+				maxFd = cancel_fd;
+		}
+#endif
+
 		i = select(maxFd + 1, workerset, NULL, NULL, tvp);
 
 #ifdef WIN32
@@ -115,10 +125,25 @@ select_loop(int maxFd, fd_set *workerset)
 
 		if (i < 0 && errno == EINTR)
 			continue;			/* ignore this */
-		if (i < 0 || CancelRequested)
-			return -1;			/* but not this */
+		if (i < 0)
+			return -1;
+
+#ifndef WIN32
+		if (cancel_fd >= 0 && FD_ISSET(cancel_fd, workerset))
+			return -1;			/* cancel requested */
+#endif
+
+		if (CancelRequested)
+			return -1;
 		if (i == 0)
 			continue;			/* timeout (Win32 only) */
+
+#ifndef WIN32
+		/* Remove the cancel pipe from the returned set */
+		if (cancel_fd >= 0)
+			FD_CLR(cancel_fd, workerset);
+#endif
+
 		break;
 	}
 
@@ -235,13 +260,15 @@ wait_on_slots(ParallelSlotArray *sa)
 	if (cancelconn == NULL)
 		return false;
 
-	SetCancelConn(cancelconn);
 	i = select_loop(maxFd, &slotset);
-	ResetCancelConn();
 
-	/* failure? */
+	/* failure or cancel? */
 	if (i < 0)
+	{
+		if (CancelRequested && cancelconn != NULL)
+			send_cancel(cancelconn);
 		return false;
+	}
 
 	for (i = 0; i < sa->numslots; i++)
 	{
diff --git a/src/fe_utils/query_utils.c b/src/fe_utils/query_utils.c
index c05fd9c21df..7e5b6676102 100644
--- a/src/fe_utils/query_utils.c
+++ b/src/fe_utils/query_utils.c
@@ -26,7 +26,7 @@ executeQuery(PGconn *conn, const char *query, bool echo)
 	if (echo)
 		printf("%s\n", query);
 
-	res = PQexec(conn, query);
+	res = cancellable_exec(conn, query);
 	if (!res ||
 		PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
@@ -51,7 +51,7 @@ executeCommand(PGconn *conn, const char *query, bool echo)
 	if (echo)
 		printf("%s\n", query);
 
-	res = PQexec(conn, query);
+	res = cancellable_exec(conn, query);
 	if (!res ||
 		PQresultStatus(res) != PGRES_COMMAND_OK)
 	{
@@ -79,9 +79,7 @@ executeMaintenanceCommand(PGconn *conn, const char *query, bool echo)
 	if (echo)
 		printf("%s\n", query);
 
-	SetCancelConn(conn);
-	res = PQexec(conn, query);
-	ResetCancelConn();
+	res = cancellable_exec(conn, query);
 
 	r = (res && PQresultStatus(res) == PGRES_COMMAND_OK);
 
diff --git a/src/include/fe_utils/cancel.h b/src/include/fe_utils/cancel.h
index e174fb83b92..1664c278a97 100644
--- a/src/include/fe_utils/cancel.h
+++ b/src/include/fe_utils/cancel.h
@@ -2,6 +2,7 @@
  *
  * Query cancellation support for frontend code
  *
+ * See cancel.c for an overview of the three cancellation mechanisms.
  *
  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -20,13 +21,32 @@
 
 extern PGDLLIMPORT volatile sig_atomic_t CancelRequested;
 
-extern void SetCancelConn(PGconn *conn);
-extern void ResetCancelConn(void);
-
-/*
- * A callback can be optionally set up to be called at cancellation
- * time.
- */
 extern void setup_cancel_handler(void (*query_cancel_callback) (void));
 
+extern void send_cancel(PGconn *conn);
+extern PGresult *cancellable_exec(PGconn *conn, const char *query);
+extern PGresult *cancellable_exec_params(PGconn *conn, const char *query,
+										 int nParams, const Oid *paramTypes,
+										 const char *const *paramValues,
+										 const int *paramLengths,
+										 const int *paramFormats,
+										 int resultFormat);
+extern PGresult *cancellable_prepare(PGconn *conn, const char *stmtName,
+									 const char *query, int nParams,
+									 const Oid *paramTypes);
+extern PGresult *cancellable_describe_prepared(PGconn *conn,
+											   const char *stmtName);
+extern PGresult *cancellable_getresult(PGconn *conn);
+extern bool cancellable_socket_wait(PGconn *conn, bool *cancel_sent,
+									bool for_write);
+extern int	cancellable_put_copy_data(PGconn *conn, const char *buffer,
+									  int nbytes, bool *cancel_sent);
+extern int	cancellable_put_copy_end(PGconn *conn, const char *errormsg,
+									 bool *cancel_sent);
+extern int	cancellable_lo_export(PGconn *conn, Oid lobjId,
+								  const char *filename);
+extern Oid	cancellable_lo_import(PGconn *conn, const char *filename);
+extern int	cancellable_lo_unlink(PGconn *conn, Oid lobjId);
+extern int	cancel_pipe_fd(void);
+
 #endif							/* CANCEL_H */

base-commit: f95d73ed433207c4323802dc96e52f3e5553a86c
-- 
2.53.0

