From df330045ef8b7a303f01b38c90e5d1805a71acac Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Tue, 25 Feb 2020 15:34:12 +1300
Subject: [PATCH 08/11] Use a WaitEventSet for postgres_fdw.

The same WaitEventSet object will be reused for the life of the
backend.

Author: Thomas Munro
Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com
---
 contrib/postgres_fdw/connection.c | 98 +++++++++++++++++++++++++++----
 1 file changed, 88 insertions(+), 10 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index e45647f3ea..0397a16702 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -65,6 +65,12 @@ typedef struct ConnCacheEntry
  */
 static HTAB *ConnectionHash = NULL;
 
+/* Reusable WaitEventSet. */
+static WaitEventSet *ConnectionWaitSet = NULL;
+static int64 ConnectionWaitSetSocketChangeCount = -1;
+static PGconn *ConnectionWaitSetConn = NULL;
+static int ConnectionWaitSetPosition = -1;
+
 /* for assigning cursor numbers and prepared statement numbers */
 static unsigned int cursor_number = 0;
 static unsigned int prep_stmt_number = 0;
@@ -92,6 +98,7 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
 static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
 									 PGresult **result);
 static bool UserMappingPasswordRequired(UserMapping *user);
+static int pgfdw_wait_for_socket(PGconn *conn, long timeout);
 
 /*
  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
@@ -115,6 +122,17 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 	{
 		HASHCTL		ctl;
 
+		/*
+		 * We'll use a single WaitEventSet for the lifetime of this backend,
+		 * and add and remove sockets as appropriate.  Only one socket will
+		 * be in it at a time.
+		 */
+		Assert(ConnectionWaitSet == NULL);
+		ConnectionWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
+		AddWaitEventToSet(ConnectionWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
+		AddWaitEventToSet(ConnectionWaitSet, WL_EXIT_ON_PM_DEATH, -1, NULL,
+						  NULL);
+
 		MemSet(&ctl, 0, sizeof(ctl));
 		ctl.keysize = sizeof(ConnCacheKey);
 		ctl.entrysize = sizeof(ConnCacheEntry);
@@ -344,6 +362,14 @@ disconnect_pg_server(ConnCacheEntry *entry)
 	if (entry->conn != NULL)
 	{
 		PQfinish(entry->conn);
+		if (ConnectionWaitSetConn == entry->conn)
+		{
+			/* We do this after PQfinish, so we know the socket is closed. */
+			RemoveWaitEvent(ConnectionWaitSet,
+							ConnectionWaitSetPosition,
+							true);
+			ConnectionWaitSetConn = NULL;
+		}
 		entry->conn = NULL;
 		ReleaseExternalFD();
 	}
@@ -603,11 +629,7 @@ pgfdw_get_result(PGconn *conn, const char *query)
 				int			wc;
 
 				/* Sleep until there's something to do */
-				wc = WaitLatchOrSocket(MyLatch,
-									   WL_LATCH_SET | WL_SOCKET_READABLE |
-									   WL_EXIT_ON_PM_DEATH,
-									   PQsocket(conn),
-									   -1L, PG_WAIT_EXTENSION);
+				wc = pgfdw_wait_for_socket(conn, -1);
 				ResetLatch(MyLatch);
 
 				CHECK_FOR_INTERRUPTS();
@@ -1207,11 +1229,7 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
 				cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs);
 
 				/* Sleep until there's something to do */
-				wc = WaitLatchOrSocket(MyLatch,
-									   WL_LATCH_SET | WL_SOCKET_READABLE |
-									   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
-									   PQsocket(conn),
-									   cur_timeout, PG_WAIT_EXTENSION);
+				wc = pgfdw_wait_for_socket(conn, cur_timeout);
 				ResetLatch(MyLatch);
 
 				CHECK_FOR_INTERRUPTS();
@@ -1250,3 +1268,63 @@ exit:	;
 		*result = last_res;
 	return timed_out;
 }
+
+/*
+ * Sleep until the latch is set, the socket of "conn" becomes readable, or
+ * "timeout" expires (callers pass -1 when they want no timeout).
+ *
+ * Returns WL_TIMEOUT if WaitEventSetWait() reported no event, otherwise the
+ * event mask of the single event it returned.
+ *
+ * The backend-wide ConnectionWaitSet holds at most one socket, so a socket
+ * left over from an earlier call is swapped out here when necessary.
+ *
+ * NOTE(review): this passes ConnectionWaitSetConn to libpq, so it presumably
+ * relies on every PQfinish() of a registered connection also resetting that
+ * pointer, as disconnect_pg_server() does -- confirm for other call sites.
+ */
+static int
+pgfdw_wait_for_socket(PGconn *conn, long timeout)
+{
+	WaitEvent	occurred;
+	int			nevents;
+
+	if (ConnectionWaitSetConn != NULL)
+	{
+		/* Has the registered connection's socket changed since we added it? */
+		bool		stale_socket =
+			(PQsocketChangeCount(ConnectionWaitSetConn) !=
+			 ConnectionWaitSetSocketChangeCount);
+
+		/*
+		 * Evict the registered socket if it belongs to another connection or
+		 * is no longer the one our connection uses.  A changed socket is
+		 * reported as already closed, which affects whether the fd gets
+		 * unregistered with the kernel; an unchanged socket of another
+		 * connection is reported as still open.
+		 */
+		if (ConnectionWaitSetConn != conn || stale_socket)
+		{
+			RemoveWaitEvent(ConnectionWaitSet,
+							ConnectionWaitSetPosition,
+							stale_socket);
+			ConnectionWaitSetConn = NULL;
+		}
+	}
+
+	/* Register our socket unless it is still in the set from last time. */
+	if (ConnectionWaitSetConn == NULL)
+	{
+		ConnectionWaitSetPosition =
+			AddWaitEventToSet(ConnectionWaitSet, WL_SOCKET_READABLE,
+							  PQsocket(conn), NULL, NULL);
+		ConnectionWaitSetConn = conn;
+		ConnectionWaitSetSocketChangeCount = PQsocketChangeCount(conn);
+	}
+
+	/* Ask for at most one event; zero events back means we timed out. */
+	nevents = WaitEventSetWait(ConnectionWaitSet, timeout, &occurred, 1,
+							   PG_WAIT_EXTENSION);
+
+	return (nevents == 0) ? WL_TIMEOUT : occurred.events;
+}
-- 
2.18.2

