dblink query interruptibility

Started by Noah Misch about 2 years ago, 10 messages
#1 Noah Misch
noah@leadboat.com
1 attachment(s)

=== Background

Something as simple as the following doesn't respond to cancellation. In
v15+, any DROP DATABASE will hang as long as it's running:

SELECT dblink_exec(
$$dbname='$$||current_database()||$$' port=$$||current_setting('port'),
'SELECT pg_sleep(15)');

/messages/by-id/4B584C99.8090004@enterprisedb.com proposed a fix back in
2010. Latches and the libpqsrv facility have changed the server programming
environment since that patch. The problem statement also came up here:

On Thu, Dec 08, 2022 at 06:08:15PM -0800, Andres Freund wrote:

> dblink.c uses a lot of other blocking libpq functions, which obviously also
> isn't ok.

=== Key decisions

This patch adds to the libpqsrv facility. It dutifully follows the existing
naming scheme. For greppability, I am favoring renaming new and old functions
such that the libpq name is a substring of this facility's name. That is,
rename libpqsrv_disconnect() to srvPQfinish() or maybe libpqsrv_PQfinish(). Now
is better than later, while pgxn contains no references to libpqsrv. Does
anyone else have a preference between naming schemes? If nobody does, I'll
keep today's libpqsrv_disconnect() style.

I was tempted to add a timeout argument to each libpqsrv function, which would
allow libpqsrv_get_result_last() to replace pgfdw_get_cleanup_result(). We
can always add a timeout-accepting function later and make this thread's
function name a thin wrapper around it. Does anyone feel a mandatory timeout
argument, accepting -1 for no timeout, would be the right thing?
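
As an illustration of the "thin wrapper" idea above, here is a minimal,
self-contained sketch. It is not libpqsrv code: wait_source,
get_result_timeout() and get_result() are hypothetical stand-ins, and a poll
count merely models a timeout, with -1 meaning no timeout.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a connection whose result arrives after N polls. */
typedef struct wait_source
{
	int			polls_until_ready;	/* polls still needed before a result exists */
	const char *result;				/* what to hand back once ready */
} wait_source;

/*
 * Timeout-accepting variant (hypothetical).  max_polls models the timeout;
 * -1 means wait indefinitely.  Returns NULL if the timeout expires first.
 */
static const char *
get_result_timeout(wait_source *src, int max_polls)
{
	int			polls = 0;

	assert(src != NULL);
	while (src->polls_until_ready > 0)
	{
		if (max_polls >= 0 && polls >= max_polls)
			return NULL;		/* timed out */
		src->polls_until_ready--;
		polls++;
	}
	return src->result;
}

/* The existing, timeout-free name becomes a thin wrapper. */
static const char *
get_result(wait_source *src)
{
	return get_result_timeout(src, -1);
}
```

Under this shape, existing callers keep calling the timeout-free name, and
only callers that need a deadline switch to the timeout-accepting one.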

=== Minor topics

It would be nice to replace libpqrcv_PQgetResult() and friends with the new
functions. I refrained since they use ProcessWalRcvInterrupts(), not
CHECK_FOR_INTERRUPTS(). Since walreceiver already reaches
CHECK_FOR_INTERRUPTS() via libpqsrv_connect_params(), things might just work.

This patch contains a PQexecParams() wrapper, called nowhere in
postgresql.git. It's inessential, but twelve pgxn modules do mention
PQexecParams. Just one mentions PQexecPrepared, and none mention PQdescribe*.

The patch makes postgres_fdw adopt its functions, as part of confirming the
functions are general enough. postgres_fdw create_cursor() has been passing
the "DECLARE CURSOR FOR inner_query" string for some error messages and just
inner_query for others. I almost standardized on the longer one, but the test
suite checks that. Hence, I standardized on just inner_query.

I wrote this because pglogical needs these functions to cooperate with v15+
DROP DATABASE (https://github.com/2ndQuadrant/pglogical/issues/418).

Thanks,
nm

Attachments:

libpqsrv-exec-v1.patch (text/plain; charset=us-ascii)
Author:     Noah Misch <noah@leadboat.com>
Commit:     Noah Misch <noah@leadboat.com>

    Make dblink interruptible, via new libpqsrv APIs.
    
    This replaces dblink's blocking libpq calls, allowing cancellation and
    allowing DROP DATABASE (of a database not involved in the query).  Apart
    from explicit dblink_cancel_query() calls, dblink still doesn't cancel
    the remote side.  The replacement for the blocking calls consists of
    new, general-purpose query execution wrappers in the libpqsrv facility.
    Out-of-tree extensions should adopt these.  Use them in postgres_fdw,
    replacing a local implementation from which the libpqsrv implementation
    derives.  This is a bug fix for dblink.  Code inspection identified the
    bug at least thirteen years ago, but user complaints have not appeared.
    Hence, no back-patch for now.
    
    Reviewed by FIXME.
    
    Discussion: https://postgr.es/m/FIXME

diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 195b278..4624e53 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -133,6 +133,7 @@ static HTAB *remoteConnHash = NULL;
 /* custom wait event values, retrieved from shared memory */
 static uint32 dblink_we_connect = 0;
 static uint32 dblink_we_get_conn = 0;
+static uint32 dblink_we_get_result = 0;
 
 /*
  *	Following is list that holds multiple remote connections.
@@ -252,6 +253,9 @@ dblink_init(void)
 {
 	if (!pconn)
 	{
+		if (dblink_we_get_result == 0)
+			dblink_we_get_result = WaitEventExtensionNew("DblinkGetResult");
+
 		pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
 		pconn->conn = NULL;
 		pconn->openCursorCount = 0;
@@ -442,7 +446,7 @@ dblink_open(PG_FUNCTION_ARGS)
 	/* If we are not in a transaction, start one */
 	if (PQtransactionStatus(conn) == PQTRANS_IDLE)
 	{
-		res = PQexec(conn, "BEGIN");
+		res = libpqsrv_exec(conn, "BEGIN", dblink_we_get_result);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			dblink_res_internalerror(conn, res, "begin error");
 		PQclear(res);
@@ -461,7 +465,7 @@ dblink_open(PG_FUNCTION_ARGS)
 		(rconn->openCursorCount)++;
 
 	appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
-	res = PQexec(conn, buf.data);
+	res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
 	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
 	{
 		dblink_res_error(conn, conname, res, fail,
@@ -530,7 +534,7 @@ dblink_close(PG_FUNCTION_ARGS)
 	appendStringInfo(&buf, "CLOSE %s", curname);
 
 	/* close the cursor */
-	res = PQexec(conn, buf.data);
+	res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
 	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
 	{
 		dblink_res_error(conn, conname, res, fail,
@@ -550,7 +554,7 @@ dblink_close(PG_FUNCTION_ARGS)
 		{
 			rconn->newXactForCursor = false;
 
-			res = PQexec(conn, "COMMIT");
+			res = libpqsrv_exec(conn, "COMMIT", dblink_we_get_result);
 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
 				dblink_res_internalerror(conn, res, "commit error");
 			PQclear(res);
@@ -632,7 +636,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
 	 * PGresult will be long-lived even though we are still in a short-lived
 	 * memory context.
 	 */
-	res = PQexec(conn, buf.data);
+	res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
 	if (!res ||
 		(PQresultStatus(res) != PGRES_COMMAND_OK &&
 		 PQresultStatus(res) != PGRES_TUPLES_OK))
@@ -780,7 +784,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 		else
 		{
 			/* async result retrieval, do it the old way */
-			PGresult   *res = PQgetResult(conn);
+			PGresult   *res = libpqsrv_get_result(conn, dblink_we_get_result);
 
 			/* NULL means we're all done with the async results */
 			if (res)
@@ -1088,7 +1092,8 @@ materializeQueryResult(FunctionCallInfo fcinfo,
 		PQclear(sinfo.last_res);
 		PQclear(sinfo.cur_res);
 		/* and clear out any pending data in libpq */
-		while ((res = PQgetResult(conn)) != NULL)
+		while ((res = libpqsrv_get_result(conn, dblink_we_get_result)) !=
+			   NULL)
 			PQclear(res);
 		PG_RE_THROW();
 	}
@@ -1115,7 +1120,7 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
 	{
 		CHECK_FOR_INTERRUPTS();
 
-		sinfo->cur_res = PQgetResult(conn);
+		sinfo->cur_res = libpqsrv_get_result(conn, dblink_we_get_result);
 		if (!sinfo->cur_res)
 			break;
 
@@ -1443,7 +1448,7 @@ dblink_exec(PG_FUNCTION_ARGS)
 		if (!conn)
 			dblink_conn_not_avail(conname);
 
-		res = PQexec(conn, sql);
+		res = libpqsrv_exec(conn, sql, dblink_we_get_result);
 		if (!res ||
 			(PQresultStatus(res) != PGRES_COMMAND_OK &&
 			 PQresultStatus(res) != PGRES_TUPLES_OK))
@@ -2740,8 +2745,8 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
 
 	/*
 	 * If we don't get a message from the PGresult, try the PGconn.  This is
-	 * needed because for connection-level failures, PQexec may just return
-	 * NULL, not a PGresult at all.
+	 * needed because for connection-level failures, PQgetResult may just
+	 * return NULL, not a PGresult at all.
 	 */
 	if (message_primary == NULL)
 		message_primary = pchomp(PQerrorMessage(conn));
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 5800c6a..8755244 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -187,6 +187,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
 	{
 		HASHCTL		ctl;
 
+		if (pgfdw_we_get_result == 0)
+			pgfdw_we_get_result =
+				WaitEventExtensionNew("PostgresFdwGetResult");
+
 		ctl.keysize = sizeof(ConnCacheKey);
 		ctl.entrysize = sizeof(ConnCacheEntry);
 		ConnectionHash = hash_create("postgres_fdw connections", 8,
@@ -716,7 +720,7 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
 	 */
 	if (consume_input && !PQconsumeInput(conn))
 		pgfdw_report_error(ERROR, NULL, conn, false, sql);
-	res = pgfdw_get_result(conn, sql);
+	res = pgfdw_get_result(conn);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -819,7 +823,9 @@ GetPrepStmtNumber(PGconn *conn)
 /*
  * Submit a query and wait for the result.
  *
- * This function is interruptible by signals.
+ * Since we don't use non-blocking mode, this can't process interrupts while
+ * pushing the query text to the server.  That risk is relatively small, so we
+ * ignore that for now.
  *
  * Caller is responsible for the error handling on the result.
  */
@@ -830,81 +836,20 @@ pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
 	if (state && state->pendingAreq)
 		process_pending_request(state->pendingAreq);
 
-	/*
-	 * Submit a query.  Since we don't use non-blocking mode, this also can
-	 * block.  But its risk is relatively small, so we ignore that for now.
-	 */
 	if (!PQsendQuery(conn, query))
-		pgfdw_report_error(ERROR, NULL, conn, false, query);
-
-	/* Wait for the result. */
-	return pgfdw_get_result(conn, query);
+		return NULL;
+	return pgfdw_get_result(conn);
 }
 
 /*
- * Wait for the result from a prior asynchronous execution function call.
- *
- * This function offers quick responsiveness by checking for any interruptions.
- *
- * This function emulates PQexec()'s behavior of returning the last result
- * when there are many.
+ * Wrap libpqsrv_get_result_last(), adding wait event.
  *
  * Caller is responsible for the error handling on the result.
  */
 PGresult *
-pgfdw_get_result(PGconn *conn, const char *query)
+pgfdw_get_result(PGconn *conn)
 {
-	PGresult   *volatile last_res = NULL;
-
-	/* In what follows, do not leak any PGresults on an error. */
-	PG_TRY();
-	{
-		for (;;)
-		{
-			PGresult   *res;
-
-			while (PQisBusy(conn))
-			{
-				int			wc;
-
-				/* first time, allocate or get the custom wait event */
-				if (pgfdw_we_get_result == 0)
-					pgfdw_we_get_result = WaitEventExtensionNew("PostgresFdwGetResult");
-
-				/* Sleep until there's something to do */
-				wc = WaitLatchOrSocket(MyLatch,
-									   WL_LATCH_SET | WL_SOCKET_READABLE |
-									   WL_EXIT_ON_PM_DEATH,
-									   PQsocket(conn),
-									   -1L, pgfdw_we_get_result);
-				ResetLatch(MyLatch);
-
-				CHECK_FOR_INTERRUPTS();
-
-				/* Data available in socket? */
-				if (wc & WL_SOCKET_READABLE)
-				{
-					if (!PQconsumeInput(conn))
-						pgfdw_report_error(ERROR, NULL, conn, false, query);
-				}
-			}
-
-			res = PQgetResult(conn);
-			if (res == NULL)
-				break;			/* query is complete */
-
-			PQclear(last_res);
-			last_res = res;
-		}
-	}
-	PG_CATCH();
-	{
-		PQclear(last_res);
-		PG_RE_THROW();
-	}
-	PG_END_TRY();
-
-	return last_res;
+	return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
 }
 
 /*
@@ -945,8 +890,8 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 
 		/*
 		 * If we don't get a message from the PGresult, try the PGconn.  This
-		 * is needed because for connection-level failures, PQexec may just
-		 * return NULL, not a PGresult at all.
+		 * is needed because for connection-level failures, PQgetResult may
+		 * just return NULL, not a PGresult at all.
 		 */
 		if (message_primary == NULL)
 			message_primary = pchomp(PQerrorMessage(conn));
@@ -1046,7 +991,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					 */
 					if (entry->have_prep_stmt && entry->have_error)
 					{
-						res = PQexec(entry->conn, "DEALLOCATE ALL");
+						res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
+											   NULL);
 						PQclear(res);
 					}
 					entry->have_prep_stmt = false;
diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index 09fd489..5084aa6 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -3673,7 +3673,7 @@ appendOrderBySuffix(Oid sortop, Oid sortcoltype, bool nulls_first,
  * Print the representation of a parameter to be sent to the remote side.
  *
  * Note: we always label the Param's type explicitly rather than relying on
- * transmitting a numeric type OID in PQexecParams().  This allows us to
+ * transmitting a numeric type OID in PQsendQueryParams().  This allows us to
  * avoid assuming that types have the same OIDs on the remote side as they
  * do locally --- they need only have the same names.
  */
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 6de2bec..7d509c6 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -3759,7 +3759,7 @@ create_cursor(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(conn, buf.data);
+	res = pgfdw_get_result(conn);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
 	PQclear(res);
@@ -3809,7 +3809,7 @@ fetch_more_data(ForeignScanState *node)
 			 * The query was already sent by an earlier call to
 			 * fetch_more_data_begin.  So now we just fetch the result.
 			 */
-			res = pgfdw_get_result(conn, fsstate->query);
+			res = pgfdw_get_result(conn);
 			/* On error, report the original query, not the FETCH. */
 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
 				pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
@@ -4158,7 +4158,7 @@ execute_foreign_modify(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+	res = pgfdw_get_result(fmstate->conn);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -4228,7 +4228,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+	res = pgfdw_get_result(fmstate->conn);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
 	PQclear(res);
@@ -4570,7 +4570,7 @@ execute_dml_stmt(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
+	dmstate->result = pgfdw_get_result(dmstate->conn);
 	if (PQresultStatus(dmstate->result) !=
 		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 47157ac..3e94d51 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -158,7 +158,7 @@ extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
 extern void do_sql_command(PGconn *conn, const char *sql);
-extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
+extern PGresult *pgfdw_get_result(PGconn *conn);
 extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
 								  PgFdwConnState *state);
 extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
diff --git a/doc/src/sgml/dblink.sgml b/doc/src/sgml/dblink.sgml
index e8de5a6..81f3598 100644
--- a/doc/src/sgml/dblink.sgml
+++ b/doc/src/sgml/dblink.sgml
@@ -37,6 +37,15 @@
     </para>
    </listitem>
   </varlistentry>
+
+  <varlistentry>
+   <term><literal>DblinkGetResult</literal></term>
+   <listitem>
+    <para>
+     Waiting to receive the results of a query from a remote server.
+    </para>
+   </listitem>
+  </varlistentry>
  </variablelist>
 
  <para>
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 60d5c1f..2825001 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -648,12 +648,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
  * Send a query and wait for the results by using the asynchronous libpq
  * functions and socket readiness events.
  *
- * We must not use the regular blocking libpq functions like PQexec()
- * since they are uninterruptible by signals on some platforms, such as
- * Windows.
- *
- * The function is modeled on PQexec() in libpq, but only implements
- * those parts that are in use in the walreceiver api.
+ * The function is modeled on libpqsrv_exec(), with the behavior difference
+ * being that it calls ProcessWalRcvInterrupts().  As an optimization, it
+ * skips try/catch, since all errors terminate the process.
  *
  * May return NULL, rather than an error result, on failure.
  */
diff --git a/src/include/libpq/libpq-be-fe-helpers.h b/src/include/libpq/libpq-be-fe-helpers.h
index 41e3bb4..a4b3e80 100644
--- a/src/include/libpq/libpq-be-fe-helpers.h
+++ b/src/include/libpq/libpq-be-fe-helpers.h
@@ -49,6 +49,8 @@
 
 static inline void libpqsrv_connect_prepare(void);
 static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
+static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
+static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);
 
 
 /*
@@ -239,4 +241,129 @@ libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
 	PG_END_TRY();
 }
 
+/*
+ * PQexec() wrapper that processes interrupts.
+ *
+ * Unless PQsetnonblocking(conn, 1) is in effect, this can't process
+ * interrupts while pushing the query text to the server.  Consider that
+ * setting if query strings can be long relative to TCP buffer size.
+ *
+ * This has the preconditions of PQsendQuery(), not those of PQexec().  Most
+ * notably, PQexec() would silently discard any prior query results.
+ */
+static inline PGresult *
+libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
+{
+	if (!PQsendQuery(conn, query))
+		return NULL;
+	return libpqsrv_get_result_last(conn, wait_event_info);
+}
+
+/*
+ * PQexecParams() wrapper that processes interrupts.
+ *
+ * See notes at libpqsrv_exec().
+ */
+static inline PGresult *
+libpqsrv_exec_params(PGconn *conn,
+					 const char *command,
+					 int nParams,
+					 const Oid *paramTypes,
+					 const char *const *paramValues,
+					 const int *paramLengths,
+					 const int *paramFormats,
+					 int resultFormat,
+					 uint32 wait_event_info)
+{
+	if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues,
+						   paramLengths, paramFormats, resultFormat))
+		return NULL;
+	return libpqsrv_get_result_last(conn, wait_event_info);
+}
+
+/*
+ * Like PQexec(), loop over PQgetResult() until it returns NULL or another
+ * terminal state.  Return the last non-NULL result or the terminal state.
+ */
+static inline PGresult *
+libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
+{
+	PGresult   *volatile lastResult = NULL;
+
+	/* In what follows, do not leak any PGresults on an error. */
+	PG_TRY();
+	{
+		for (;;)
+		{
+			/* Wait for, and collect, the next PGresult. */
+			PGresult   *result;
+
+			result = libpqsrv_get_result(conn, wait_event_info);
+			if (result == NULL)
+				break;			/* query is complete, or failure */
+
+			/*
+			 * Emulate PQexec()'s behavior of returning the last result when
+			 * there are many.
+			 */
+			PQclear(lastResult);
+			lastResult = result;
+
+			if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
+				PQresultStatus(lastResult) == PGRES_COPY_OUT ||
+				PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
+				PQstatus(conn) == CONNECTION_BAD)
+				break;
+		}
+	}
+	PG_CATCH();
+	{
+		PQclear(lastResult);
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	return lastResult;
+}
+
+/*
+ * Perform the equivalent of PQgetResult(), but watch for interrupts.
+ */
+static inline PGresult *
+libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
+{
+	/*
+	 * Collect data until PQgetResult is ready to get the result without
+	 * blocking.
+	 */
+	while (PQisBusy(conn))
+	{
+		int			rc;
+
+		rc = WaitLatchOrSocket(MyLatch,
+							   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET |
+							   WL_SOCKET_READABLE,
+							   PQsocket(conn),
+							   0,
+							   wait_event_info);
+
+		/* Interrupted? */
+		if (rc & WL_LATCH_SET)
+		{
+			ResetLatch(MyLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
+
+		/* Consume whatever data is available from the socket */
+		if (PQconsumeInput(conn) == 0)
+		{
+			/* trouble; expect PQgetResult() to return NULL */
+			break;
+		}
+	}
+
+	/* Now we can collect and return the next PGresult */
+	return PQgetResult(conn);
+}
+
 #endif							/* LIBPQ_BE_FE_HELPERS_H */
#2 Fujii Masao
masao.fujii@gmail.com
In reply to: Noah Misch (#1)
Re: dblink query interruptibility

On Wed, Nov 22, 2023 at 10:29 AM Noah Misch <noah@leadboat.com> wrote:

> === Background
>
> Something as simple as the following doesn't respond to cancellation. In
> v15+, any DROP DATABASE will hang as long as it's running:
>
> SELECT dblink_exec(
> $$dbname='$$||current_database()||$$' port=$$||current_setting('port'),
> 'SELECT pg_sleep(15)');
>
> /messages/by-id/4B584C99.8090004@enterprisedb.com proposed a fix back in
> 2010. Latches and the libpqsrv facility have changed the server programming
> environment since that patch. The problem statement also came up here:
>
> On Thu, Dec 08, 2022 at 06:08:15PM -0800, Andres Freund wrote:
> > dblink.c uses a lot of other blocking libpq functions, which obviously also
> > isn't ok.
>
> === Key decisions
>
> This patch adds to libpqsrv facility.

I found that this patch was committed as d3c5f37dd5 and that it slightly
changed an error message in postgres_fdw. Here's an example:

#1. Begin a new transaction.
#2. Execute a query that accesses a foreign table, such as SELECT * FROM
<foreign table>.
#3. Terminate the *remote* session corresponding to the foreign table.
#4. Commit the transaction. The following error message is currently
output:

ERROR: FATAL: terminating connection due to administrator command
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
invalid socket

Previously, before commit d3c5f37dd5, the error message at #4 did not
include "invalid socket." Now, after the commit, it does. Is this
change intentional?

+ /* Consume whatever data is available from the socket */
+ if (PQconsumeInput(conn) == 0)
+ {
+ /* trouble; expect PQgetResult() to return NULL */
+ break;
+ }
+ }
+
+ /* Now we can collect and return the next PGresult */
+ return PQgetResult(conn);

This code appears to cause the change. When the remote session ends,
PQconsumeInput() returns 0 and marks conn->socket as invalid. The
subsequent PQgetResult() call then invokes pqWait(), which detects the
invalid socket and appends "invalid socket" to the error message.

I think the "invalid socket" message is unsuitable in this scenario,
and PQgetResult() should not be called after PQconsumeInput() returns
0. Thoughts?

Regards,

--
Fujii Masao

#3 Noah Misch
noah@leadboat.com
In reply to: Fujii Masao (#2)
Re: dblink query interruptibility

On Thu, Jan 25, 2024 at 04:23:39AM +0900, Fujii Masao wrote:

> I found that this patch was committed at d3c5f37dd5 and changed the
> error message in postgres_fdw slightly. Here's an example:
>
> #1. Begin a new transaction.
> #2. Execute a query accessing to a foreign table, like SELECT * FROM
> <foreign table>
> #3. Terminate the *remote* session corresponding to the foreign table.
> #4. Commit the transaction, and then currently the following error
> message is output.
>
> ERROR: FATAL: terminating connection due to administrator command
> server closed the connection unexpectedly
> This probably means the server terminated abnormally
> before or while processing the request.
> invalid socket
>
> Previously, before commit d3c5f37dd5, the error message at #4 did not
> include "invalid socket." Now, after the commit, it does. Is this
> change intentional?

No. It's a consequence of an intentional change in libpq call sequence, but I
was unaware I was changing an error message.

> + /* Consume whatever data is available from the socket */
> + if (PQconsumeInput(conn) == 0)
> + {
> + /* trouble; expect PQgetResult() to return NULL */
> + break;
> + }
> + }
> +
> + /* Now we can collect and return the next PGresult */
> + return PQgetResult(conn);
>
> This code appears to cause the change. When the remote session ends,
> PQconsumeInput() returns 0 and marks conn->socket as invalid.
> Subsequent PQgetResult() calls pqWait(), detecting the invalid socket
> and appending "invalid socket" to the error message.
>
> I think the "invalid socket" message is unsuitable in this scenario,
> and PQgetResult() should not be called after PQconsumeInput() returns
> 0. Thought?

The documentation is absolute about the necessity of PQgetResult():

PQsendQuery cannot be called again (on the same connection) until
PQgetResult has returned a null pointer, indicating that the command is
done.

PQgetResult must be called repeatedly until it returns a null pointer,
indicating that the command is done. (If called when no command is active,
PQgetResult will just return a null pointer at once.)

Similar statements also appear in libpq-pipeline-results,
libpq-pipeline-errors, and libpq-copy.

So, unless the documentation or my reading of it is wrong there, I think the
answer is something other than skipping PQgetResult(). Perhaps PQgetResult()
should not append "invalid socket" in this case? The extra line is a net
negative, though it's not wrong and not awful.

Thanks for reporting the change.

#4 Fujii Masao
masao.fujii@gmail.com
In reply to: Noah Misch (#3)
Re: dblink query interruptibility

On Thu, Jan 25, 2024 at 5:45 AM Noah Misch <noah@leadboat.com> wrote:

> On Thu, Jan 25, 2024 at 04:23:39AM +0900, Fujii Masao wrote:
> > I found that this patch was committed at d3c5f37dd5 and changed the
> > error message in postgres_fdw slightly. Here's an example:
> >
> > #1. Begin a new transaction.
> > #2. Execute a query accessing to a foreign table, like SELECT * FROM
> > <foreign table>
> > #3. Terminate the *remote* session corresponding to the foreign table.
> > #4. Commit the transaction, and then currently the following error
> > message is output.
> >
> > ERROR: FATAL: terminating connection due to administrator command
> > server closed the connection unexpectedly
> > This probably means the server terminated abnormally
> > before or while processing the request.
> > invalid socket
> >
> > Previously, before commit d3c5f37dd5, the error message at #4 did not
> > include "invalid socket." Now, after the commit, it does. Is this
> > change intentional?
>
> No. It's a consequence of an intentional change in libpq call sequence, but I
> was unaware I was changing an error message.
>
> > + /* Consume whatever data is available from the socket */
> > + if (PQconsumeInput(conn) == 0)
> > + {
> > + /* trouble; expect PQgetResult() to return NULL */
> > + break;
> > + }
> > + }
> > +
> > + /* Now we can collect and return the next PGresult */
> > + return PQgetResult(conn);
> >
> > This code appears to cause the change. When the remote session ends,
> > PQconsumeInput() returns 0 and marks conn->socket as invalid.
> > Subsequent PQgetResult() calls pqWait(), detecting the invalid socket
> > and appending "invalid socket" to the error message.
> >
> > I think the "invalid socket" message is unsuitable in this scenario,
> > and PQgetResult() should not be called after PQconsumeInput() returns
> > 0. Thought?
>
> The documentation is absolute about the necessity of PQgetResult():

The documentation looks unclear to me regarding what should be done
when PQconsumeInput() returns 0. So I'm not sure if PQgetResult()
must be called even in that case.

From reading some functions that use PQconsumeInput(), such as
libpqrcv_PQgetResult(), it appears that when PQconsumeInput() returns 0
they simply report the error message using PQerrorMessage(), without
calling PQgetResult().

Regards,

--
Fujii Masao

#5 Noah Misch
noah@leadboat.com
In reply to: Fujii Masao (#4)
Re: dblink query interruptibility

On Thu, Jan 25, 2024 at 12:28:43PM +0900, Fujii Masao wrote:

> On Thu, Jan 25, 2024 at 5:45 AM Noah Misch <noah@leadboat.com> wrote:
> > On Thu, Jan 25, 2024 at 04:23:39AM +0900, Fujii Masao wrote:
> > > I found that this patch was committed at d3c5f37dd5 and changed the
> > > error message in postgres_fdw slightly. Here's an example:
> > >
> > > #1. Begin a new transaction.
> > > #2. Execute a query accessing to a foreign table, like SELECT * FROM
> > > <foreign table>
> > > #3. Terminate the *remote* session corresponding to the foreign table.
> > > #4. Commit the transaction, and then currently the following error
> > > message is output.
> > >
> > > ERROR: FATAL: terminating connection due to administrator command
> > > server closed the connection unexpectedly
> > > This probably means the server terminated abnormally
> > > before or while processing the request.
> > > invalid socket
> > >
> > > Previously, before commit d3c5f37dd5, the error message at #4 did not
> > > include "invalid socket." Now, after the commit, it does.

Other clients have witnessed the extra "invalid socket" message:
https://dba.stackexchange.com/questions/335081/how-to-investigate-intermittent-postgres-connection-errors
https://stackoverflow.com/questions/77781358/pgbackrest-backup-error-with-exit-code-57
https://github.com/timescale/timescaledb/issues/102

> > > + /* Consume whatever data is available from the socket */
> > > + if (PQconsumeInput(conn) == 0)
> > > + {
> > > + /* trouble; expect PQgetResult() to return NULL */
> > > + break;
> > > + }
> > > + }
> > > +
> > > + /* Now we can collect and return the next PGresult */
> > > + return PQgetResult(conn);
> > >
> > > This code appears to cause the change. When the remote session ends,
> > > PQconsumeInput() returns 0 and marks conn->socket as invalid.
> > > Subsequent PQgetResult() calls pqWait(), detecting the invalid socket
> > > and appending "invalid socket" to the error message.

What do you think of making PQconsumeInput() set PGASYNC_READY and
CONNECTION_BAD in this case? Since libpq appended "server closed the
connection unexpectedly", it knows those indicators are correct. That way,
PQgetResult() won't issue a pointless pqWait() call.
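
To illustrate the proposal, here is a toy model of the sequence as described
in this thread. It is a sketch under stated assumptions, not libpq code:
toy_conn, toy_consume_input() and toy_get_result() are hypothetical, and the
model only encodes the behavior reported above (a failed consume invalidates
the socket, and a later wait on that socket appends "invalid socket").

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

typedef enum { TOY_IDLE, TOY_BUSY, TOY_READY } toy_async;

/* Hypothetical, heavily simplified stand-in for a PGconn. */
typedef struct toy_conn
{
	toy_async	async;			/* models the PGASYNC_* state */
	bool		bad;			/* models CONNECTION_BAD */
	bool		socket_valid;
	bool		peer_closed;	/* the remote session has gone away */
	char		errmsg[128];	/* models the accumulated error message */
} toy_conn;

static void
toy_append(toy_conn *c, const char *line)
{
	strncat(c->errmsg, line, sizeof(c->errmsg) - strlen(c->errmsg) - 1);
}

/* Models PQconsumeInput(): returns 0 on trouble, 1 otherwise. */
static int
toy_consume_input(toy_conn *c, bool mark_ready_on_close)
{
	assert(c != NULL);
	if (!c->peer_closed)
		return 1;
	toy_append(c, "server closed the connection unexpectedly\n");
	c->socket_valid = false;
	if (mark_ready_on_close)
	{
		/* the proposed change: record that nothing more will arrive */
		c->async = TOY_READY;
		c->bad = true;
	}
	return 0;
}

/* Models PQgetResult(): it tries to wait only while still busy. */
static void
toy_get_result(toy_conn *c)
{
	if (c->async == TOY_BUSY && !c->socket_valid)
		toy_append(c, "invalid socket\n");	/* the wait itself fails */
	c->async = TOY_IDLE;
}
```

In this model, calling toy_consume_input() with mark_ready_on_close set to
false and then toy_get_result() leaves both lines in errmsg, while passing
true leaves only the first, which is the caller-visible effect the proposal
aims for.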

> > > I think the "invalid socket" message is unsuitable in this scenario,
> > > and PQgetResult() should not be called after PQconsumeInput() returns
> > > 0. Thought?
> >
> > The documentation is absolute about the necessity of PQgetResult():
>
> The documentation looks unclear to me regarding what should be done
> when PQconsumeInput() returns 0. So I'm not sure if PQgetResult()
> must be called even in that case.

I agree PQconsumeInput() docs don't specify how to interpret it returning 0.

> As far as I read some functions like libpqrcv_PQgetResult() that use
> PQconsumeInput(), it appears that they basically report the error message
> using PQerrorMessage(), without calling PQgetResult(),
> when PQconsumeInput() returns 0.

libpqrcv_PQgetResult() is part of walreceiver, where any ERROR becomes FATAL.
Hence, it can't hurt anything by eagerly skipping to ERROR. I designed
libpqsrv_exec() to mimic PQexec() as closely as possible, so it would be a
drop-in replacement for arbitrary callers. Ideally, accepting interrupts
would be the only caller-visible difference.
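
To make "mimic PQexec()" concrete, here is a small self-contained model of
the last-result rule that libpqsrv_get_result_last() in the patch follows:
keep only the most recent result, and stop early on a COPY state or a bad
connection. The names (model_result, model_conn, model_get_result(),
model_get_result_last()) are hypothetical stand-ins, not libpq or libpqsrv
code, and the model skips the PQclear() calls that real results need.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum
{
	RES_COMMAND_OK, RES_TUPLES_OK, RES_COPY_IN, RES_COPY_OUT, RES_COPY_BOTH
} model_status;

typedef struct model_result
{
	model_status status;
} model_result;

/* Hypothetical connection: a fixed queue of results still to be collected. */
typedef struct model_conn
{
	model_result *pending;
	size_t		npending;
	size_t		next;			/* index of the next result to hand out */
	bool		bad;			/* models CONNECTION_BAD */
} model_conn;

/* Models PQgetResult(): the next result, or NULL once the command is done. */
static model_result *
model_get_result(model_conn *conn)
{
	assert(conn != NULL);
	if (conn->next >= conn->npending)
		return NULL;
	return &conn->pending[conn->next++];
}

/* Return the last result, stopping early at COPY states or a bad connection. */
static model_result *
model_get_result_last(model_conn *conn)
{
	model_result *last = NULL;

	for (;;)
	{
		model_result *res = model_get_result(conn);

		if (res == NULL)
			break;				/* command complete, or failure */
		last = res;				/* real code would PQclear() the old one */
		if (last->status == RES_COPY_IN ||
			last->status == RES_COPY_OUT ||
			last->status == RES_COPY_BOTH ||
			conn->bad)
			break;
	}
	return last;
}
```

For a two-statement query string, this returns the second statement's
result, as PQexec() would, and a COPY result ends the loop with later
results still uncollected.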

I know of three ways PQconsumeInput() can return 0, along with my untested
estimates of how they work:

a. Protocol violation. handleSyncLoss() sets PGASYNC_READY and
CONNECTION_BAD. PQgetResult() is optional.

b. Connection broken. PQgetResult() is optional.

c. ENOMEM. PGASYNC_ and CONNECTION_ status don't change. Applications choose
among (c1) free memory and retry, (c2) close the connection, or (c3) call
PQgetResult() to break protocol sync and set PGASYNC_IDLE:

Comparing PQconsumeInput() with the PQgetResult() block under "while
(conn->asyncStatus == PGASYNC_BUSY)", there's a key difference that
PQgetResult() sets PGASYNC_IDLE on most errors, including ENOMEM. That
prevents PQexec() subroutine PQexecFinish() from busy looping on ENOMEM, but I
suspect that also silently breaks protocol sync. While we could change it
from (c3) to (c2) by dropping the connection via handleSyncLoss() or
equivalent, I'm not confident about that being better.

libpqsrv_exec() implements (c3) by way of calling PQgetResult() after
PQconsumeInput() fails. If PQisBusy(), the same ENOMEM typically will repeat,
yielding (c3). If memory became available in that brief time, PQgetResult()
may instead block. That blocking is unwanted but unimportant.
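
To make the control flow above concrete, here is a toy model of the "call PQgetResult() even after PQconsumeInput() fails" behavior. It is only a sketch: MockConn and every mock_* function are hypothetical stand-ins invented for illustration, not libpq or libpqsrv APIs, and the latch wait and interrupt check are left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a PGconn; none of these names are libpq APIs. */
typedef struct MockConn
{
	int		polls_needed;		/* successful reads before a result is ready */
	bool	read_fails;			/* simulate PQconsumeInput() returning 0 */
	int		consume_calls;		/* how often input was consumed */
	int		get_result_calls;	/* how often a result was requested */
} MockConn;

/* Models PQisBusy(): true while more input is needed. */
static bool
mock_is_busy(const MockConn *conn)
{
	return conn->polls_needed > 0;
}

/* Models PQconsumeInput(): 1 on success, 0 on trouble. */
static int
mock_consume_input(MockConn *conn)
{
	conn->consume_calls++;
	if (conn->read_fails)
		return 0;
	conn->polls_needed--;
	return 1;
}

/* Models PQgetResult(): 1 for "a result", 0 for "NULL" after a failure. */
static int
mock_get_result(MockConn *conn)
{
	conn->get_result_calls++;
	return conn->read_fails ? 0 : 1;
}

/* Same shape as the loop in libpqsrv_get_result(), minus the latch wait. */
static int
mock_wait_and_get_result(MockConn *conn)
{
	assert(conn != NULL);

	while (mock_is_busy(conn))
	{
		if (mock_consume_input(conn) == 0)
			break;				/* trouble; still ask for the result below */
	}
	return mock_get_result(conn);
}
```

In this model, a failing read leaves the loop after one consume call, and the result is still requested exactly once, so the caller sees the same "NULL result, then inspect the connection" outcome it would get from a PQexec()-style call.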

#6Andreas Karlsson
andreas@proxel.se
In reply to: Noah Misch (#1)
1 attachment(s)
Re: dblink query interruptibility

On 11/22/23 2:29 AM, Noah Misch wrote:

Something as simple as the following doesn't respond to cancellation. In
v15+, any DROP DATABASE will hang as long as it's running:

Hi,

One of our customers ran into this bug when upgrading from PostgreSQL 14
to PostgreSQL 16. Your commit[1] fixed this issue in PostgreSQL 17 but
the bugfix was not backported with the explanation below.

Code inspection identified the bug at least thirteen years ago, but
user complaints have not appeared. Hence, no back-patch for now.

But as far as I can tell that is not the case, because at least for
CREATE DATABASE the bug was introduced in a commit[2] in PostgreSQL 15.
And now that we actually have a user complaint, what do you think about
backporting the fix?

The patch seems small and relatively safe to backport and the functions
have had no bugfixes as far as I could see. And I did a quick git
cherry-pick myself on top of PG 16 (see attached patch) and the only
conflict was related to the introduction of custom wait events which was
easy to fix.

Here is a small snippet which reproduces the bug. It hangs reliably on
PostgreSQL 15 and 16, but not 14, 17 or HEAD.

CREATE EXTENSION dblink;

CREATE FUNCTION f(text, int, text) RETURNS VOID LANGUAGE plpgsql AS $$
BEGIN
    PERFORM dblink_connect(format('host=%s port=%s user=%s dbname=postgres',
                                  $1, $2, $3));
    RAISE NOTICE 'dblink connected!';
    PERFORM dblink_exec('DROP DATABASE test');
    RAISE NOTICE 'Database dropped!';
    PERFORM dblink_disconnect();
    RAISE NOTICE 'dblink disconnected!';
END
$$;

CREATE DATABASE test;
SELECT f(:'HOST', :PORT, :'USER');

Andreas

1.
https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=d3c5f37dd543498cc7c678815d3921823beec9e9
2.
https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=e2f65f42555ff531c6d7c8f151526b4ef7c016f8

Attachments:

0001-Make-dblink-interruptible-via-new-libpqsrv-APIs.patch (text/x-patch; charset=UTF-8)
From 9fa3af45dcfc6a27d3d2aa5ae29812bbd4740751 Mon Sep 17 00:00:00 2001
From: Noah Misch <noah@leadboat.com>
Date: Mon, 8 Jan 2024 11:39:56 -0800
Subject: [PATCH] Make dblink interruptible, via new libpqsrv APIs.

This replaces dblink's blocking libpq calls, allowing cancellation and
allowing DROP DATABASE (of a database not involved in the query).  Apart
from explicit dblink_cancel_query() calls, dblink still doesn't cancel
the remote side.  The replacement for the blocking calls consists of
new, general-purpose query execution wrappers in the libpqsrv facility.
Out-of-tree extensions should adopt these.  Use them in postgres_fdw,
replacing a local implementation from which the libpqsrv implementation
derives.  This is a bug fix for dblink.  Code inspection identified the
bug at least thirteen years ago, but user complaints have not appeared.
Hence, no back-patch for now.

Discussion: https://postgr.es/m/20231122012945.74@rfd.leadboat.com
---
 contrib/dblink/dblink.c                       |  24 ++--
 contrib/postgres_fdw/connection.c             |  80 ++---------
 contrib/postgres_fdw/deparse.c                |   2 +-
 contrib/postgres_fdw/postgres_fdw.c           |  10 +-
 contrib/postgres_fdw/postgres_fdw.h           |   2 +-
 .../libpqwalreceiver/libpqwalreceiver.c       |   9 +-
 src/include/libpq/libpq-be-fe-helpers.h       | 127 ++++++++++++++++++
 7 files changed, 163 insertions(+), 91 deletions(-)

diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 1ff65d1e521..bd409d7bf1b 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -61,6 +61,7 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/varlena.h"
+#include "utils/wait_event.h"
 
 PG_MODULE_MAGIC;
 
@@ -430,7 +431,7 @@ dblink_open(PG_FUNCTION_ARGS)
 	/* If we are not in a transaction, start one */
 	if (PQtransactionStatus(conn) == PQTRANS_IDLE)
 	{
-		res = PQexec(conn, "BEGIN");
+		res = libpqsrv_exec(conn, "BEGIN", PG_WAIT_EXTENSION);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			dblink_res_internalerror(conn, res, "begin error");
 		PQclear(res);
@@ -449,7 +450,7 @@ dblink_open(PG_FUNCTION_ARGS)
 		(rconn->openCursorCount)++;
 
 	appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
-	res = PQexec(conn, buf.data);
+	res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION);
 	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
 	{
 		dblink_res_error(conn, conname, res, fail,
@@ -518,7 +519,7 @@ dblink_close(PG_FUNCTION_ARGS)
 	appendStringInfo(&buf, "CLOSE %s", curname);
 
 	/* close the cursor */
-	res = PQexec(conn, buf.data);
+	res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION);
 	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
 	{
 		dblink_res_error(conn, conname, res, fail,
@@ -538,7 +539,7 @@ dblink_close(PG_FUNCTION_ARGS)
 		{
 			rconn->newXactForCursor = false;
 
-			res = PQexec(conn, "COMMIT");
+			res = libpqsrv_exec(conn, "COMMIT", PG_WAIT_EXTENSION);
 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
 				dblink_res_internalerror(conn, res, "commit error");
 			PQclear(res);
@@ -620,7 +621,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
 	 * PGresult will be long-lived even though we are still in a short-lived
 	 * memory context.
 	 */
-	res = PQexec(conn, buf.data);
+	res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION);
 	if (!res ||
 		(PQresultStatus(res) != PGRES_COMMAND_OK &&
 		 PQresultStatus(res) != PGRES_TUPLES_OK))
@@ -768,7 +769,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 		else
 		{
 			/* async result retrieval, do it the old way */
-			PGresult   *res = PQgetResult(conn);
+			PGresult   *res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION);
 
 			/* NULL means we're all done with the async results */
 			if (res)
@@ -1076,7 +1077,8 @@ materializeQueryResult(FunctionCallInfo fcinfo,
 		PQclear(sinfo.last_res);
 		PQclear(sinfo.cur_res);
 		/* and clear out any pending data in libpq */
-		while ((res = PQgetResult(conn)) != NULL)
+		while ((res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION)) !=
+			   NULL)
 			PQclear(res);
 		PG_RE_THROW();
 	}
@@ -1103,7 +1105,7 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
 	{
 		CHECK_FOR_INTERRUPTS();
 
-		sinfo->cur_res = PQgetResult(conn);
+		sinfo->cur_res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION);
 		if (!sinfo->cur_res)
 			break;
 
@@ -1431,7 +1433,7 @@ dblink_exec(PG_FUNCTION_ARGS)
 		if (!conn)
 			dblink_conn_not_avail(conname);
 
-		res = PQexec(conn, sql);
+		res = libpqsrv_exec(conn, sql, PG_WAIT_EXTENSION);
 		if (!res ||
 			(PQresultStatus(res) != PGRES_COMMAND_OK &&
 			 PQresultStatus(res) != PGRES_TUPLES_OK))
@@ -2728,8 +2730,8 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
 
 	/*
 	 * If we don't get a message from the PGresult, try the PGconn.  This is
-	 * needed because for connection-level failures, PQexec may just return
-	 * NULL, not a PGresult at all.
+	 * needed because for connection-level failures, PQgetResult may just
+	 * return NULL, not a PGresult at all.
 	 */
 	if (message_primary == NULL)
 		message_primary = pchomp(PQerrorMessage(conn));
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 3246e18ab93..3cf0f4b148e 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -709,7 +709,7 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
 	 */
 	if (consume_input && !PQconsumeInput(conn))
 		pgfdw_report_error(ERROR, NULL, conn, false, sql);
-	res = pgfdw_get_result(conn, sql);
+	res = pgfdw_get_result(conn);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -812,7 +812,9 @@ GetPrepStmtNumber(PGconn *conn)
 /*
  * Submit a query and wait for the result.
  *
- * This function is interruptible by signals.
+ * Since we don't use non-blocking mode, this can't process interrupts while
+ * pushing the query text to the server.  That risk is relatively small, so we
+ * ignore that for now.
  *
  * Caller is responsible for the error handling on the result.
  */
@@ -823,77 +825,20 @@ pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
 	if (state && state->pendingAreq)
 		process_pending_request(state->pendingAreq);
 
-	/*
-	 * Submit a query.  Since we don't use non-blocking mode, this also can
-	 * block.  But its risk is relatively small, so we ignore that for now.
-	 */
 	if (!PQsendQuery(conn, query))
-		pgfdw_report_error(ERROR, NULL, conn, false, query);
-
-	/* Wait for the result. */
-	return pgfdw_get_result(conn, query);
+		return NULL;
+	return pgfdw_get_result(conn);
 }
 
 /*
- * Wait for the result from a prior asynchronous execution function call.
- *
- * This function offers quick responsiveness by checking for any interruptions.
- *
- * This function emulates PQexec()'s behavior of returning the last result
- * when there are many.
+ * Wrap libpqsrv_get_result_last(), adding wait event.
  *
  * Caller is responsible for the error handling on the result.
  */
 PGresult *
-pgfdw_get_result(PGconn *conn, const char *query)
+pgfdw_get_result(PGconn *conn)
 {
-	PGresult   *volatile last_res = NULL;
-
-	/* In what follows, do not leak any PGresults on an error. */
-	PG_TRY();
-	{
-		for (;;)
-		{
-			PGresult   *res;
-
-			while (PQisBusy(conn))
-			{
-				int			wc;
-
-				/* Sleep until there's something to do */
-				wc = WaitLatchOrSocket(MyLatch,
-									   WL_LATCH_SET | WL_SOCKET_READABLE |
-									   WL_EXIT_ON_PM_DEATH,
-									   PQsocket(conn),
-									   -1L, PG_WAIT_EXTENSION);
-				ResetLatch(MyLatch);
-
-				CHECK_FOR_INTERRUPTS();
-
-				/* Data available in socket? */
-				if (wc & WL_SOCKET_READABLE)
-				{
-					if (!PQconsumeInput(conn))
-						pgfdw_report_error(ERROR, NULL, conn, false, query);
-				}
-			}
-
-			res = PQgetResult(conn);
-			if (res == NULL)
-				break;			/* query is complete */
-
-			PQclear(last_res);
-			last_res = res;
-		}
-	}
-	PG_CATCH();
-	{
-		PQclear(last_res);
-		PG_RE_THROW();
-	}
-	PG_END_TRY();
-
-	return last_res;
+	return libpqsrv_get_result_last(conn, PG_WAIT_EXTENSION);
 }
 
 /*
@@ -934,8 +879,8 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 
 		/*
 		 * If we don't get a message from the PGresult, try the PGconn.  This
-		 * is needed because for connection-level failures, PQexec may just
-		 * return NULL, not a PGresult at all.
+		 * is needed because for connection-level failures, PQgetResult may
+		 * just return NULL, not a PGresult at all.
 		 */
 		if (message_primary == NULL)
 			message_primary = pchomp(PQerrorMessage(conn));
@@ -1035,7 +980,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					 */
 					if (entry->have_prep_stmt && entry->have_error)
 					{
-						res = PQexec(entry->conn, "DEALLOCATE ALL");
+						res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
+											   NULL);
 						PQclear(res);
 					}
 					entry->have_prep_stmt = false;
diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index 6cba34350af..dba4dedd932 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -3673,7 +3673,7 @@ appendOrderBySuffix(Oid sortop, Oid sortcoltype, bool nulls_first,
  * Print the representation of a parameter to be sent to the remote side.
  *
  * Note: we always label the Param's type explicitly rather than relying on
- * transmitting a numeric type OID in PQexecParams().  This allows us to
+ * transmitting a numeric type OID in PQsendQueryParams().  This allows us to
  * avoid assuming that types have the same OIDs on the remote side as they
  * do locally --- they need only have the same names.
  */
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 8657a3b0e23..b18567cf8f4 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -3767,7 +3767,7 @@ create_cursor(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(conn, buf.data);
+	res = pgfdw_get_result(conn);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
 	PQclear(res);
@@ -3817,7 +3817,7 @@ fetch_more_data(ForeignScanState *node)
 			 * The query was already sent by an earlier call to
 			 * fetch_more_data_begin.  So now we just fetch the result.
 			 */
-			res = pgfdw_get_result(conn, fsstate->query);
+			res = pgfdw_get_result(conn);
 			/* On error, report the original query, not the FETCH. */
 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
 				pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
@@ -4166,7 +4166,7 @@ execute_foreign_modify(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+	res = pgfdw_get_result(fmstate->conn);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -4236,7 +4236,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+	res = pgfdw_get_result(fmstate->conn);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
 	PQclear(res);
@@ -4578,7 +4578,7 @@ execute_dml_stmt(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
+	dmstate->result = pgfdw_get_result(dmstate->conn);
 	if (PQresultStatus(dmstate->result) !=
 		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 02c11523199..8e8a36ad14c 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -158,7 +158,7 @@ extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
 extern void do_sql_command(PGconn *conn, const char *sql);
-extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
+extern PGresult *pgfdw_get_result(PGconn *conn);
 extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
 								  PgFdwConnState *state);
 extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index b4038e114d8..568024ec974 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -705,12 +705,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
  * Send a query and wait for the results by using the asynchronous libpq
  * functions and socket readiness events.
  *
- * We must not use the regular blocking libpq functions like PQexec()
- * since they are uninterruptible by signals on some platforms, such as
- * Windows.
- *
- * The function is modeled on PQexec() in libpq, but only implements
- * those parts that are in use in the walreceiver api.
+ * The function is modeled on libpqsrv_exec(), with the behavior difference
+ * being that it calls ProcessWalRcvInterrupts().  As an optimization, it
+ * skips try/catch, since all errors terminate the process.
  *
  * May return NULL, rather than an error result, on failure.
  */
diff --git a/src/include/libpq/libpq-be-fe-helpers.h b/src/include/libpq/libpq-be-fe-helpers.h
index 41e3bb4376a..a4b3e805b9d 100644
--- a/src/include/libpq/libpq-be-fe-helpers.h
+++ b/src/include/libpq/libpq-be-fe-helpers.h
@@ -49,6 +49,8 @@
 
 static inline void libpqsrv_connect_prepare(void);
 static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
+static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
+static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);
 
 
 /*
@@ -239,4 +241,129 @@ libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
 	PG_END_TRY();
 }
 
+/*
+ * PQexec() wrapper that processes interrupts.
+ *
+ * Unless PQsetnonblocking(conn, 1) is in effect, this can't process
+ * interrupts while pushing the query text to the server.  Consider that
+ * setting if query strings can be long relative to TCP buffer size.
+ *
+ * This has the preconditions of PQsendQuery(), not those of PQexec().  Most
+ * notably, PQexec() would silently discard any prior query results.
+ */
+static inline PGresult *
+libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
+{
+	if (!PQsendQuery(conn, query))
+		return NULL;
+	return libpqsrv_get_result_last(conn, wait_event_info);
+}
+
+/*
+ * PQexecParams() wrapper that processes interrupts.
+ *
+ * See notes at libpqsrv_exec().
+ */
+static inline PGresult *
+libpqsrv_exec_params(PGconn *conn,
+					 const char *command,
+					 int nParams,
+					 const Oid *paramTypes,
+					 const char *const *paramValues,
+					 const int *paramLengths,
+					 const int *paramFormats,
+					 int resultFormat,
+					 uint32 wait_event_info)
+{
+	if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues,
+						   paramLengths, paramFormats, resultFormat))
+		return NULL;
+	return libpqsrv_get_result_last(conn, wait_event_info);
+}
+
+/*
+ * Like PQexec(), loop over PQgetResult() until it returns NULL or another
+ * terminal state.  Return the last non-NULL result or the terminal state.
+ */
+static inline PGresult *
+libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
+{
+	PGresult   *volatile lastResult = NULL;
+
+	/* In what follows, do not leak any PGresults on an error. */
+	PG_TRY();
+	{
+		for (;;)
+		{
+			/* Wait for, and collect, the next PGresult. */
+			PGresult   *result;
+
+			result = libpqsrv_get_result(conn, wait_event_info);
+			if (result == NULL)
+				break;			/* query is complete, or failure */
+
+			/*
+			 * Emulate PQexec()'s behavior of returning the last result when
+			 * there are many.
+			 */
+			PQclear(lastResult);
+			lastResult = result;
+
+			if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
+				PQresultStatus(lastResult) == PGRES_COPY_OUT ||
+				PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
+				PQstatus(conn) == CONNECTION_BAD)
+				break;
+		}
+	}
+	PG_CATCH();
+	{
+		PQclear(lastResult);
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	return lastResult;
+}
+
+/*
+ * Perform the equivalent of PQgetResult(), but watch for interrupts.
+ */
+static inline PGresult *
+libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
+{
+	/*
+	 * Collect data until PQgetResult is ready to get the result without
+	 * blocking.
+	 */
+	while (PQisBusy(conn))
+	{
+		int			rc;
+
+		rc = WaitLatchOrSocket(MyLatch,
+							   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET |
+							   WL_SOCKET_READABLE,
+							   PQsocket(conn),
+							   0,
+							   wait_event_info);
+
+		/* Interrupted? */
+		if (rc & WL_LATCH_SET)
+		{
+			ResetLatch(MyLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
+
+		/* Consume whatever data is available from the socket */
+		if (PQconsumeInput(conn) == 0)
+		{
+			/* trouble; expect PQgetResult() to return NULL */
+			break;
+		}
+	}
+
+	/* Now we can collect and return the next PGresult */
+	return PQgetResult(conn);
+}
+
 #endif							/* LIBPQ_BE_FE_HELPERS_H */
-- 
2.47.2
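
For readers skimming the patch, the "return the last result" loop in libpqsrv_get_result_last() can be modeled without any PostgreSQL headers. The following is a simplified sketch under stated assumptions: MockStatus, MockStream, and the mock_* functions are hypothetical names made up for this illustration, only one COPY state is modeled, and the PG_TRY cleanup, PQclear() calls, and CONNECTION_BAD check of the real function are omitted.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical result states; loosely mirror a few PGRES_* values. */
typedef enum MockStatus
{
	MOCK_COMMAND_OK,
	MOCK_TUPLES_OK,
	MOCK_COPY_IN
} MockStatus;

/* Hypothetical stream of pending results for one query string. */
typedef struct MockStream
{
	const MockStatus *results;	/* results the "server" will deliver */
	size_t		count;			/* number of entries in results */
	size_t		next;			/* index of the next undelivered result */
} MockStream;

/* Models libpqsrv_get_result(): next result, or NULL when none remain. */
static const MockStatus *
mock_get_result(MockStream *stream)
{
	if (stream->next >= stream->count)
		return NULL;
	return &stream->results[stream->next++];
}

/* Keep only the last result, stopping early when a COPY state appears. */
static const MockStatus *
mock_get_result_last(MockStream *stream)
{
	const MockStatus *last = NULL;

	assert(stream != NULL);

	for (;;)
	{
		const MockStatus *result = mock_get_result(stream);

		if (result == NULL)
			break;				/* query is complete, or failure */

		/* The real code frees the previous PGresult here. */
		last = result;

		if (*last == MOCK_COPY_IN)
			break;				/* caller must drive the COPY itself */
	}
	return last;
}
```

With a two-statement query string the model returns the second statement's result, and when a COPY state shows up it returns that state immediately and leaves later results unread, which is the PQexec()-like behavior the patch comment describes.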

#7Noah Misch
noah@leadboat.com
In reply to: Andreas Karlsson (#6)
Re: dblink query interruptibility

On Thu, Mar 06, 2025 at 02:57:01PM +0100, Andreas Karlsson wrote:

On 11/22/23 2:29 AM, Noah Misch wrote:

Something as simple as the following doesn't respond to cancellation. In
v15+, any DROP DATABASE will hang as long as it's running:

One of our customers ran into this bug when upgrading from PostgreSQL 14 to
PostgreSQL 16. Your commit[1] fixed this issue in PostgreSQL 17 but the
bugfix was not backported with the explanation below.

Code inspection identified the bug at least thirteen years ago, but user
complaints have not appeared. Hence, no back-patch for now.

But as far as I can tell that is not the case, because at least for CREATE
DATABASE the bug was introduced in a commit[2] in PostgreSQL 15.

1. https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=d3c5f37dd543498cc7c678815d3921823beec9e9
2. https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=e2f65f42555ff531c6d7c8f151526b4ef7c016f8

The CREATE DATABASE hang is indeed new in v15. The more general problem of
dblink not processing interrupts (e.g. a delayed response to
pg_cancel_backend) is an old bug.

And now that we actually have a user complaint what do you think about
backporting the fix?

Yes, that seems fine to do. No PGXN module refers to libpq-be-fe-helpers.h so
I'm unconcerned about a compatibility risk from adding it. In the context of
https://github.com/2ndQuadrant/pglogical/pull/454, I did test the functions
against all versions v9.4+.

Commit d3c5f37 used the new functions for postgres_fdw, not just dblink. That
caused message changes detailed in
postgr.es/m/CAHGQGwGpDTXeg8K1oTmDv9nankaKTrCD-XW-tnkzo6%3DE9p5%3Duw%40mail.gmail.com
so I'm inclined to omit postgres_fdw changes in back branches. postgres_fdw
was already interruptible, so the point of making postgres_fdw adopt the
functions was to reduce code duplication.

Overall, in the absence of objections, I will queue a task to back-patch the
non-postgres_fdw portion of commit d3c5f37 to v13-v16.

#8Andreas Karlsson
andreas@proxel.se
In reply to: Noah Misch (#7)
Re: dblink query interruptibility

On 3/12/25 12:48 AM, Noah Misch wrote:

The CREATE DATABASE hang is indeed new in v15. The more general problem of
dblink not processing interrupts (e.g. a delayed response to
pg_cancel_backend) is an old bug.

Aha, that was what you were referring to! My apologies, I was reading your
mail a bit too quickly. :)

Commit d3c5f37 used the new functions for postgres_fdw, not just dblink. That
caused message changes detailed in
postgr.es/m/CAHGQGwGpDTXeg8K1oTmDv9nankaKTrCD-XW-tnkzo6%3DE9p5%3Duw%40mail.gmail.com
so I'm inclined to omit postgres_fdw changes in back branches. postgres_fdw
was already interruptible, so the point of making postgres_fdw adopt the
functions was to reduce code duplication.

Makes sense.

Overall, in the absence of objections, I will queue a task to back-patch the
non-postgres_fdw portion of commit d3c5f37 to v13-v16.

Thanks!

Andreas

#9Noah Misch
noah@leadboat.com
In reply to: Andreas Karlsson (#8)
Re: dblink query interruptibility

On Wed, Mar 12, 2025 at 11:03:41AM +0100, Andreas Karlsson wrote:

On 3/12/25 12:48 AM, Noah Misch wrote:

Overall, in the absence of objections, I will queue a task to back-patch the
non-postgres_fdw portion of commit d3c5f37 to v13-v16.

Pushed (e.g. v16 has commit 82a8f0f). Only v16 had libpq-be-fe-helpers.h at
all, so I also back-patched 28a5917 to add it. The original use case for
libpq-be-fe-helpers.h was interrupting PQconnectdbParams(), commit e460248. I
decided not to back-patch that one, since connection-time delays are often
limited in ways query runtime is not. We could change that decision.

#10Andreas Karlsson
andreas@proxel.se
In reply to: Noah Misch (#9)
Re: dblink query interruptibility

On 4/3/25 11:29 PM, Noah Misch wrote:

Pushed (e.g. v16 has commit 82a8f0f). Only v16 had libpq-be-fe-helpers.h at
all, so I also back-patched 28a5917 to add it. The original use case for
libpq-be-fe-helpers.h was interrupting PQconnectdbParams(), commit e460248. I
decided not to back-patch that one, since connection-time delays are often
limited in ways query runtime is not. We could change that decision.

Thanks!

I am fine with that since there seem to be few real world complaints.

Andreas