From 34e5ea311322b3bc3bb0f8c925f9ade1a59c6f09 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Tue, 3 Jan 2023 11:31:04 -0800
Subject: [PATCH v2] wip: don't block inside PQconnectdb et al

---
 src/include/libpq/libpq-be-fe-helpers.h       | 233 ++++++++++++++++++
 .../libpqwalreceiver/libpqwalreceiver.c       |  53 +---
 contrib/dblink/dblink.c                       |  80 +-----
 contrib/postgres_fdw/connection.c             |  42 +---
 4 files changed, 256 insertions(+), 152 deletions(-)
 create mode 100644 src/include/libpq/libpq-be-fe-helpers.h

diff --git a/src/include/libpq/libpq-be-fe-helpers.h b/src/include/libpq/libpq-be-fe-helpers.h
new file mode 100644
index 00000000000..4f3d3b821f0
--- /dev/null
+++ b/src/include/libpq/libpq-be-fe-helpers.h
@@ -0,0 +1,233 @@
+/*-------------------------------------------------------------------------
+ *
+ * libpq-be-fe-helpers.h
+ *	  Helper functions for using libpq in extensions
+ *
+ * Code built directly into the backend is not allowed to link to libpq
+ * directly. Extension code is allowed to use libpq however. However, libpq
+ * used in extensions has to be careful not to block inside libpq, otherwise
+ * interrupts will not be processed, leading to issues like unresolvable
+ * deadlocks. Backend code also needs to take care to acquire/release an
+ * external fd for the connection, otherwise fd.c's accounting of fd's is
+ * broken.
+ *
+ * This file provides helper functions to make it easier to comply with these
+ * rules. It is a header only library as it needs to be linked into each
+ * extension using libpq, and it seems too small to be worth adding a
+ * dedicated static library for.
+ *
+ * TODO: For historical reasons the connections established here are not put
+ * into non-blocking mode. That can lead to blocking even when only the async
+ * libpq functions are used. This should be fixed.
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/libpq/libpq-be-fe-helpers.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LIBPQ_BE_FE_HELPERS_H
+#define LIBPQ_BE_FE_HELPERS_H
+
+/*
+ * Despite the name, BUILDING_DLL is set only when building code directly part
+ * of the backend. Which also is where libpq isn't allowed to be
+ * used. Obviously this doesn't protect against libpq-fe.h getting included
+ * otherwise, but perhaps still protects against a few mistakes...
+ */
+#ifdef BUILDING_DLL
+#error "libpq may not be used code directly built into the backend"
+#endif
+
+#include "libpq-fe.h"
+#include "miscadmin.h"
+#include "storage/fd.h"
+#include "storage/latch.h"
+
+
+static inline void libpqsrv_connect_prepare(void);
+static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
+
+/*
+ * Close a connection obtained from libpqsrv_connect*(): give back the
+ * reserved external file descriptor, then PQfinish() the connection.
+ *
+ * A NULL conn is accepted (and ignored) iff it came from libpqsrv_connect*().
+ */
+static inline void
+libpqsrv_disconnect(PGconn *conn)
+{
+	/*
+	 * A NULL conn has no FD reserved on its behalf: either none was ever
+	 * kept for it, or it has been released already.  Tolerating NULL makes
+	 * it easier for users of this facility to write PG_CATCH() handlers.
+	 *
+	 * See also libpqsrv_connect_internal().
+	 */
+	if (conn != NULL)
+	{
+		ReleaseExternalFD();
+		PQfinish(conn);
+	}
+}
+
+/*
+ * PQconnectdb() replacement: reserves a file descriptor for the connection
+ * and processes interrupts while waiting for it to be established.
+ */
+static inline PGconn *
+libpqsrv_connect(const char *conninfo, uint32 wait_event_info)
+{
+	PGconn	   *pgconn;
+
+	/* Reserve the FD first; this raises ERROR if none is available. */
+	libpqsrv_connect_prepare();
+
+	pgconn = PQconnectStart(conninfo);
+	libpqsrv_connect_internal(pgconn, wait_event_info);
+
+	return pgconn;
+}
+
+/*
+ * PQconnectdbParams() replacement: reserves a file descriptor for the
+ * connection and processes interrupts while waiting for it to be established.
+ */
+static inline PGconn *
+libpqsrv_connect_params(const char *const *keywords,
+						const char *const *values,
+						int expand_dbname,
+						uint32 wait_event_info)
+{
+	PGconn	   *pgconn;
+
+	/* Reserve the FD first; this raises ERROR if none is available. */
+	libpqsrv_connect_prepare();
+
+	pgconn = PQconnectStartParams(keywords, values, expand_dbname);
+	libpqsrv_connect_internal(pgconn, wait_event_info);
+
+	return pgconn;
+}
+
+/*
+ * Reserve the external FD for a new connection; raises ERROR if unavailable.
+ */
+static inline void
+libpqsrv_connect_prepare(void)
+{
+	/*
+	 * fd.c limits the number of non-virtual file descriptors, and a PGconn is
+	 * assumed to stand for one long-lived FD.  (Acquiring it here also makes
+	 * sure that VFDs are closed if needed to make room.)
+	 */
+	if (AcquireExternalFD())
+		return;
+
+	/* No FD could be reserved; only the hint text differs per platform. */
+#ifndef WIN32					/* can't write #if within ereport() macro */
+	ereport(ERROR,
+			(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+			 errmsg("could not establish connection"),
+			 errdetail("There are too many open files on the local server."),
+			 errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
+#else
+	ereport(ERROR,
+			(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+			 errmsg("could not establish connection"),
+			 errdetail("There are too many open files on the local server."),
+			 errhint("Raise the server's max_files_per_process setting.")));
+#endif
+}
+
+/*
+ * Wait, interruptibly, until the connection attempt has succeeded or failed.
+ */
+static inline void
+libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
+{
+	/*
+	 * With conn == NULL libpqsrv_disconnect() wouldn't release the FD. So do
+	 * that here.
+	 */
+	if (conn == NULL)
+	{
+		ReleaseExternalFD();
+		return;
+	}
+
+	/*
+	 * Can't wait without a socket. Note that we don't want to close the libpq
+	 * connection yet, so callers can emit a useful error.
+	 */
+	if (PQstatus(conn) == CONNECTION_BAD)
+		return;
+
+	/*
+	 * WaitLatchOrSocket() can conceivably fail, handle this case here instead
+	 * of requiring all callers to do so.
+	 */
+	PG_TRY();
+	{
+		PostgresPollingStatusType status;
+
+		/*
+		 * Poll connection until we have OK or FAILED status.
+		 *
+		 * Per spec for PQconnectPoll, first wait till socket is write-ready.
+		 */
+		status = PGRES_POLLING_WRITING;
+		while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED)
+		{
+			int			io_flag;
+			int			rc;
+
+			if (status == PGRES_POLLING_READING)
+				io_flag = WL_SOCKET_READABLE;
+#ifdef WIN32
+
+			/*
+			 * Windows needs a different test while waiting for
+			 * connection-made
+			 */
+			else if (PQstatus(conn) == CONNECTION_STARTED)
+				io_flag = WL_SOCKET_CONNECTED;
+#endif
+			else
+				io_flag = WL_SOCKET_WRITEABLE;
+
+			rc = WaitLatchOrSocket(MyLatch,
+								   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
+								   PQsocket(conn),
+								   0,
+								   wait_event_info);
+
+			/* Latch was set: reset it and process any pending interrupts */
+			if (rc & WL_LATCH_SET)
+			{
+				ResetLatch(MyLatch);
+				CHECK_FOR_INTERRUPTS();
+			}
+
+			/* If socket is ready, advance the libpq state machine */
+			if (rc & io_flag)
+				status = PQconnectPoll(conn);
+		}
+	}
+	PG_CATCH();
+	{
+		/*
+		 * If an error is thrown here, the callers won't call
+		 * libpqsrv_disconnect() with a conn, so release resources
+		 * immediately.
+		 */
+		ReleaseExternalFD();
+		PQfinish(conn);
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+}
+
+#endif							/* LIBPQ_BE_FE_HELPERS_H */
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 219cd73b7fc..bab2fe29562 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -24,6 +24,7 @@
 #include "common/connect.h"
 #include "funcapi.h"
 #include "libpq-fe.h"
+#include "libpq/libpq-be-fe-helpers.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -125,7 +126,6 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 				 char **err)
 {
 	WalReceiverConn *conn;
-	PostgresPollingStatusType status;
 	const char *keys[6];
 	const char *vals[6];
 	int			i = 0;
@@ -172,52 +172,9 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 	Assert(i < sizeof(keys));
 
 	conn = palloc0(sizeof(WalReceiverConn));
-	conn->streamConn = PQconnectStartParams(keys, vals,
-											 /* expand_dbname = */ true);
-	if (PQstatus(conn->streamConn) == CONNECTION_BAD)
-	{
-		*err = pchomp(PQerrorMessage(conn->streamConn));
-		return NULL;
-	}
-
-	/*
-	 * Poll connection until we have OK or FAILED status.
-	 *
-	 * Per spec for PQconnectPoll, first wait till socket is write-ready.
-	 */
-	status = PGRES_POLLING_WRITING;
-	do
-	{
-		int			io_flag;
-		int			rc;
-
-		if (status == PGRES_POLLING_READING)
-			io_flag = WL_SOCKET_READABLE;
-#ifdef WIN32
-		/* Windows needs a different test while waiting for connection-made */
-		else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
-			io_flag = WL_SOCKET_CONNECTED;
-#endif
-		else
-			io_flag = WL_SOCKET_WRITEABLE;
-
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
-							   PQsocket(conn->streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
-
-		/* Interrupted? */
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			ProcessWalRcvInterrupts();
-		}
-
-		/* If socket is ready, advance the libpq state machine */
-		if (rc & io_flag)
-			status = PQconnectPoll(conn->streamConn);
-	} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
+	conn->streamConn = libpqsrv_connect_params(keys, vals,
+											    /* expand_dbname = */ true,
+											   WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
 
 	if (PQstatus(conn->streamConn) != CONNECTION_OK)
 	{
@@ -740,7 +697,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 static void
 libpqrcv_disconnect(WalReceiverConn *conn)
 {
-	PQfinish(conn->streamConn);
+	libpqsrv_disconnect(conn->streamConn);
 	PQfreemem(conn->recvBuf);
 	pfree(conn);
 }
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 8dd122042b4..130320db571 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -48,6 +48,7 @@
 #include "funcapi.h"
 #include "lib/stringinfo.h"
 #include "libpq-fe.h"
+#include "libpq/libpq-be-fe-helpers.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "parser/scansup.h"
@@ -59,6 +60,7 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/varlena.h"
+#include "utils/wait_event.h"
 
 PG_MODULE_MAGIC;
 
@@ -199,37 +201,14 @@ dblink_get_conn(char *conname_or_str,
 			connstr = conname_or_str;
 		dblink_connstr_check(connstr);
 
-		/*
-		 * We must obey fd.c's limit on non-virtual file descriptors.  Assume
-		 * that a PGconn represents one long-lived FD.  (Doing this here also
-		 * ensures that VFDs are closed if needed to make room.)
-		 */
-		if (!AcquireExternalFD())
-		{
-#ifndef WIN32					/* can't write #if within ereport() macro */
-			ereport(ERROR,
-					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-					 errmsg("could not establish connection"),
-					 errdetail("There are too many open files on the local server."),
-					 errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
-#else
-			ereport(ERROR,
-					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-					 errmsg("could not establish connection"),
-					 errdetail("There are too many open files on the local server."),
-					 errhint("Raise the server's max_files_per_process setting.")));
-#endif
-		}
-
 		/* OK to make connection */
-		conn = PQconnectdb(connstr);
+		conn = libpqsrv_connect(connstr, PG_WAIT_EXTENSION);
 
 		if (PQstatus(conn) == CONNECTION_BAD)
 		{
 			char	   *msg = pchomp(PQerrorMessage(conn));
 
-			PQfinish(conn);
-			ReleaseExternalFD();
+			libpqsrv_disconnect(conn);
 			ereport(ERROR,
 					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
 					 errmsg("could not establish connection"),
@@ -312,36 +291,13 @@ dblink_connect(PG_FUNCTION_ARGS)
 	/* check password in connection string if not superuser */
 	dblink_connstr_check(connstr);
 
-	/*
-	 * We must obey fd.c's limit on non-virtual file descriptors.  Assume that
-	 * a PGconn represents one long-lived FD.  (Doing this here also ensures
-	 * that VFDs are closed if needed to make room.)
-	 */
-	if (!AcquireExternalFD())
-	{
-#ifndef WIN32					/* can't write #if within ereport() macro */
-		ereport(ERROR,
-				(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-				 errmsg("could not establish connection"),
-				 errdetail("There are too many open files on the local server."),
-				 errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
-#else
-		ereport(ERROR,
-				(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-				 errmsg("could not establish connection"),
-				 errdetail("There are too many open files on the local server."),
-				 errhint("Raise the server's max_files_per_process setting.")));
-#endif
-	}
-
 	/* OK to make connection */
-	conn = PQconnectdb(connstr);
+	conn = libpqsrv_connect(connstr, PG_WAIT_EXTENSION);
 
 	if (PQstatus(conn) == CONNECTION_BAD)
 	{
 		msg = pchomp(PQerrorMessage(conn));
-		PQfinish(conn);
-		ReleaseExternalFD();
+		libpqsrv_disconnect(conn);
 		if (rconn)
 			pfree(rconn);
 
@@ -366,10 +322,7 @@ dblink_connect(PG_FUNCTION_ARGS)
 	else
 	{
 		if (pconn->conn)
-		{
-			PQfinish(pconn->conn);
-			ReleaseExternalFD();
-		}
+			libpqsrv_disconnect(pconn->conn);
 		pconn->conn = conn;
 	}
 
@@ -402,8 +355,7 @@ dblink_disconnect(PG_FUNCTION_ARGS)
 	if (!conn)
 		dblink_conn_not_avail(conname);
 
-	PQfinish(conn);
-	ReleaseExternalFD();
+	libpqsrv_disconnect(conn);
 	if (rconn)
 	{
 		deleteConnection(conname);
@@ -838,10 +790,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 	{
 		/* if needed, close the connection to the database */
 		if (freeconn)
-		{
-			PQfinish(conn);
-			ReleaseExternalFD();
-		}
+			libpqsrv_disconnect(conn);
 	}
 	PG_END_TRY();
 
@@ -1516,10 +1465,7 @@ dblink_exec(PG_FUNCTION_ARGS)
 	{
 		/* if needed, close the connection to the database */
 		if (freeconn)
-		{
-			PQfinish(conn);
-			ReleaseExternalFD();
-		}
+			libpqsrv_disconnect(conn);
 	}
 	PG_END_TRY();
 
@@ -2606,8 +2552,7 @@ createNewConnection(const char *name, remoteConn *rconn)
 
 	if (found)
 	{
-		PQfinish(rconn->conn);
-		ReleaseExternalFD();
+		libpqsrv_disconnect(rconn->conn);
 		pfree(rconn);
 
 		ereport(ERROR,
@@ -2647,8 +2592,7 @@ dblink_security_check(PGconn *conn, remoteConn *rconn)
 	{
 		if (!PQconnectionUsedPassword(conn))
 		{
-			PQfinish(conn);
-			ReleaseExternalFD();
+			libpqsrv_disconnect(conn);
 			if (rconn)
 				pfree(rconn);
 
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index ed75ce3f79c..7760380f00d 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -17,6 +17,7 @@
 #include "catalog/pg_user_mapping.h"
 #include "commands/defrem.h"
 #include "funcapi.h"
+#include "libpq/libpq-be-fe-helpers.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -446,35 +447,10 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 		/* verify the set of connection parameters */
 		check_conn_params(keywords, values, user);
 
-		/*
-		 * We must obey fd.c's limit on non-virtual file descriptors.  Assume
-		 * that a PGconn represents one long-lived FD.  (Doing this here also
-		 * ensures that VFDs are closed if needed to make room.)
-		 */
-		if (!AcquireExternalFD())
-		{
-#ifndef WIN32					/* can't write #if within ereport() macro */
-			ereport(ERROR,
-					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-					 errmsg("could not connect to server \"%s\"",
-							server->servername),
-					 errdetail("There are too many open files on the local server."),
-					 errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
-#else
-			ereport(ERROR,
-					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-					 errmsg("could not connect to server \"%s\"",
-							server->servername),
-					 errdetail("There are too many open files on the local server."),
-					 errhint("Raise the server's max_files_per_process setting.")));
-#endif
-		}
-
 		/* OK to make connection */
-		conn = PQconnectdbParams(keywords, values, false);
-
-		if (!conn)
-			ReleaseExternalFD();	/* because the PG_CATCH block won't */
+		conn = libpqsrv_connect_params(keywords, values,
+									    /* expand_dbname = */ false,
+									   PG_WAIT_EXTENSION);
 
 		if (!conn || PQstatus(conn) != CONNECTION_OK)
 			ereport(ERROR,
@@ -507,12 +483,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 	}
 	PG_CATCH();
 	{
-		/* Release PGconn data structure if we managed to create one */
-		if (conn)
-		{
-			PQfinish(conn);
-			ReleaseExternalFD();
-		}
+		libpqsrv_disconnect(conn);
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
@@ -528,9 +499,8 @@ disconnect_pg_server(ConnCacheEntry *entry)
 {
 	if (entry->conn != NULL)
 	{
-		PQfinish(entry->conn);
+		libpqsrv_disconnect(entry->conn);
 		entry->conn = NULL;
-		ReleaseExternalFD();
 	}
 }
 
-- 
2.38.0

