From 2abdbcb71484af0f0c6e62f8fc9ce2be7d99ffaa Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 8 Feb 2020 09:14:29 +1300
Subject: [PATCH 06/11] Reuse a WaitEventSet in libpqwalreceiver.c.

To avoid repeatedly setting up and tearing down WaitEventSet objects
and associated kernel objects, reuse a WaitEventSet.  Export a wait
function that is smart enough to handle socket changes under the
covers, and then use that for physical and logical replication.

Author: Thomas Munro
Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com
---
 .../libpqwalreceiver/libpqwalreceiver.c       | 126 +++++++++++++-----
 src/backend/replication/logical/tablesync.c   |   6 +-
 src/backend/replication/logical/worker.c      |   6 +-
 src/backend/replication/walreceiver.c         |  21 +--
 src/include/replication/walreceiver.h         |   5 +
 5 files changed, 101 insertions(+), 63 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index e4fd1f9bb6..e54581f37f 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -41,6 +41,11 @@ struct WalReceiverConn
 {
 	/* Current connection to the primary, if any */
 	PGconn	   *streamConn;
+	/* Wait event set used to wait for I/O */
+	WaitEventSet *wes;
+	/* Used to handle changes in the underlying socket */
+	int64		wes_fd_change_count;
+	int			wes_socket_position;
 	/* Used to remember if the connection is logical or physical */
 	bool		logical;
 	/* Buffer for currently read records */
@@ -80,6 +85,7 @@ static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
 									   const int nRetTypes,
 									   const Oid *retTypes);
 static void libpqrcv_disconnect(WalReceiverConn *conn);
+static int libpqrcv_wait(WalReceiverConn *conn, long timeout, int wait_event);
 
 static WalReceiverFunctionsType PQWalReceiverFunctions = {
 	libpqrcv_connect,
@@ -96,13 +102,16 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
 	libpqrcv_create_slot,
 	libpqrcv_get_backend_pid,
 	libpqrcv_exec,
-	libpqrcv_disconnect
+	libpqrcv_disconnect,
+	libpqrcv_wait
 };
 
 /* Prototypes for private functions */
-static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
-static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
+static PGresult *libpqrcv_PQexec(WalReceiverConn *conn, const char *query);
+static PGresult *libpqrcv_PQgetResult(WalReceiverConn *conn);
 static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
+static void libpqrcv_prepare_to_wait_for_socket(WalReceiverConn *conn,
+												int io_flags);
 
 /*
  * Module initialization function
@@ -168,6 +177,15 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 		return NULL;
 	}
 
+	/* Create a WaitEventSet that will last as long as the connection. */
+	conn->wes = CreateWaitEventSet(TopMemoryContext, 3);
+	conn->wes_socket_position =
+		AddWaitEventToSet(conn->wes, WL_SOCKET_READABLE,
+						  PQsocket(conn->streamConn), NULL, NULL);
+	conn->wes_fd_change_count = PQsocketChangeCount(conn->streamConn);
+	AddWaitEventToSet(conn->wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
+	AddWaitEventToSet(conn->wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL);
+
 	/*
 	 * Poll connection until we have OK or FAILED status.
 	 *
@@ -177,7 +195,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 	do
 	{
 		int			io_flag;
-		int			rc;
+		WaitEvent	event;
 
 		if (status == PGRES_POLLING_READING)
 			io_flag = WL_SOCKET_READABLE;
@@ -189,21 +207,19 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 		else
 			io_flag = WL_SOCKET_WRITEABLE;
 
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
-							   PQsocket(conn->streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
+		libpqrcv_prepare_to_wait_for_socket(conn, io_flag);
+		(void) WaitEventSetWait(conn->wes, -1, &event, 1,
+								WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
 
 		/* Interrupted? */
-		if (rc & WL_LATCH_SET)
+		if (event.events == WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
 			ProcessWalRcvInterrupts();
 		}
 
 		/* If socket is ready, advance the libpq state machine */
-		if (rc & io_flag)
+		if (event.events == io_flag)
 			status = PQconnectPoll(conn->streamConn);
 	} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
 
@@ -322,7 +338,7 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
 	 * Get the system identifier and timeline ID as a DataRow message from the
 	 * primary server.
 	 */
-	res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
+	res = libpqrcv_PQexec(conn, "IDENTIFY_SYSTEM");
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -431,7 +447,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 						 options->proto.physical.startpointTLI);
 
 	/* Start streaming. */
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqrcv_PQexec(conn, cmd.data);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -479,7 +495,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	 * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
 	 * also possible in case we aborted the copy in mid-stream.
 	 */
-	res = libpqrcv_PQgetResult(conn->streamConn);
+	res = libpqrcv_PQgetResult(conn);
 	if (PQresultStatus(res) == PGRES_TUPLES_OK)
 	{
 		/*
@@ -493,7 +509,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 		PQclear(res);
 
 		/* the result set should be followed by CommandComplete */
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqrcv_PQgetResult(conn);
 	}
 	else if (PQresultStatus(res) == PGRES_COPY_OUT)
 	{
@@ -506,7 +522,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 							pchomp(PQerrorMessage(conn->streamConn)))));
 
 		/* CommandComplete should follow */
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqrcv_PQgetResult(conn);
 	}
 
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -516,7 +532,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	PQclear(res);
 
 	/* Verify that there are no more results */
-	res = libpqrcv_PQgetResult(conn->streamConn);
+	res = libpqrcv_PQgetResult(conn);
 	if (res != NULL)
 		ereport(ERROR,
 				(errmsg("unexpected result after CommandComplete: %s",
@@ -540,7 +556,7 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 	 * Request the primary to send over the history file for given timeline.
 	 */
 	snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
-	res = libpqrcv_PQexec(conn->streamConn, cmd);
+	res = libpqrcv_PQexec(conn, cmd);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -582,8 +598,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
  * May return NULL, rather than an error result, on failure.
  */
 static PGresult *
-libpqrcv_PQexec(PGconn *streamConn, const char *query)
+libpqrcv_PQexec(WalReceiverConn *conn, const char *query)
 {
+	PGconn	   *streamConn = conn->streamConn;
 	PGresult   *lastResult = NULL;
 
 	/*
@@ -606,7 +623,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
 		/* Wait for, and collect, the next PGresult. */
 		PGresult   *result;
 
-		result = libpqrcv_PQgetResult(streamConn);
+		result = libpqrcv_PQgetResult(conn);
 		if (result == NULL)
 			break;				/* query is complete, or failure */
 
@@ -631,30 +648,29 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
  * Perform the equivalent of PQgetResult(), but watch for interrupts.
  */
 static PGresult *
-libpqrcv_PQgetResult(PGconn *streamConn)
+libpqrcv_PQgetResult(WalReceiverConn *conn)
 {
+	PGconn	   *streamConn = conn->streamConn;
+
 	/*
 	 * Collect data until PQgetResult is ready to get the result without
 	 * blocking.
 	 */
 	while (PQisBusy(streamConn))
 	{
-		int			rc;
+		WaitEvent	event;
 
 		/*
 		 * We don't need to break down the sleep into smaller increments,
 		 * since we'll get interrupted by signals and can handle any
 		 * interrupts here.
 		 */
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
-							   WL_LATCH_SET,
-							   PQsocket(streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
+		libpqrcv_prepare_to_wait_for_socket(conn, WL_SOCKET_READABLE);
+		(void) WaitEventSetWait(conn->wes, -1, &event, 1,
+								WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 
 		/* Interrupted? */
-		if (rc & WL_LATCH_SET)
+		if (event.events == WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
 			ProcessWalRcvInterrupts();
@@ -681,9 +697,27 @@ libpqrcv_disconnect(WalReceiverConn *conn)
 	PQfinish(conn->streamConn);
 	if (conn->recvBuf != NULL)
 		PQfreemem(conn->recvBuf);
+	FreeWaitEventSet(conn->wes);
 	pfree(conn);
 }
 
+/*
+ * Wait for readable socket, latch or timeout; returns WL_* event flags.
+ */
+static int
+libpqrcv_wait(WalReceiverConn *conn, long timeout, int wait_event)
+{
+	WaitEvent	event;			/* only read when rc != 0 */
+	int			rc;
+
+	libpqrcv_prepare_to_wait_for_socket(conn, WL_SOCKET_READABLE);
+	rc = WaitEventSetWait(conn->wes, timeout, &event, 1, wait_event);
+	if (rc == 0)				/* reported to callers as a timeout */
+		return WL_TIMEOUT;
+
+	return event.events;		/* flags of the one event we asked for */
+}
+
 /*
  * Receive a message available from XLOG stream.
  *
@@ -701,8 +735,7 @@ libpqrcv_disconnect(WalReceiverConn *conn)
  * ereports on error.
  */
 static int
-libpqrcv_receive(WalReceiverConn *conn, char **buffer,
-				 pgsocket *wait_fd)
+libpqrcv_receive(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
 {
 	int			rawlen;
 
@@ -733,13 +766,13 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
 	{
 		PGresult   *res;
 
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqrcv_PQgetResult(conn);
 		if (PQresultStatus(res) == PGRES_COMMAND_OK)
 		{
 			PQclear(res);
 
 			/* Verify that there are no more results. */
-			res = libpqrcv_PQgetResult(conn->streamConn);
+			res = libpqrcv_PQgetResult(conn);
 			if (res != NULL)
 			{
 				PQclear(res);
@@ -839,7 +872,7 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 		appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
 	}
 
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqrcv_PQexec(conn, cmd.data);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -963,7 +996,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("the query interface requires a database connection")));
 
-	pgres = libpqrcv_PQexec(conn->streamConn, query);
+	pgres = libpqrcv_PQexec(conn, query);
 
 	switch (PQresultStatus(pgres))
 	{
@@ -1047,3 +1080,26 @@ stringlist_to_identifierstr(PGconn *conn, List *strings)
 
 	return res.data;
 }
+
+/*
+ * Arm conn->wes to wait for 'io_flags' on the connection's current socket,
+ * re-registering it if PQsocketChangeCount() says libpq replaced the socket.
+ */
+static void
+libpqrcv_prepare_to_wait_for_socket(WalReceiverConn *conn, int io_flags)
+{
+	if (conn->wes_fd_change_count != PQsocketChangeCount(conn->streamConn))
+	{
+		/* Socket was replaced: drop the stale entry, register the new one. */
+		RemoveWaitEvent(conn->wes, conn->wes_socket_position, true);
+		conn->wes_socket_position =
+			AddWaitEventToSet(conn->wes, io_flags,
+							  PQsocket(conn->streamConn), NULL, NULL);
+		conn->wes_fd_change_count = PQsocketChangeCount(conn->streamConn);
+	}
+	else
+	{
+		/* Same socket as before: only the awaited event mask is updated. */
+		ModifyWaitEvent(conn->wes, conn->wes_socket_position, io_flags, NULL);
+	}
+}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f8183cd488..6500d7275f 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -616,11 +616,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
 		/*
 		 * Wait for more data or latch.
 		 */
-		(void) WaitLatchOrSocket(MyLatch,
-								 WL_SOCKET_READABLE | WL_LATCH_SET |
-								 WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
-								 fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
-
+		(void) walrcv_wait(wrconn, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
 		ResetLatch(MyLatch);
 	}
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ad4a732fd2..f12763f54a 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1278,11 +1278,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		else
 			wait_time = NAPTIME_PER_CYCLE;
 
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_SOCKET_READABLE | WL_LATCH_SET |
-							   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
-							   fd, wait_time,
-							   WAIT_EVENT_LOGICAL_APPLY_MAIN);
+		rc = walrcv_wait(wrconn, wait_time, WAIT_EVENT_LOGICAL_APPLY_MAIN);
 
 		if (rc & WL_LATCH_SET)
 		{
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 2ab15c3cbb..e5d054d1a1 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -503,24 +503,9 @@ WalReceiverMain(void)
 				if (endofwal)
 					break;
 
-				/*
-				 * Ideally we would reuse a WaitEventSet object repeatedly
-				 * here to avoid the overheads of WaitLatchOrSocket on epoll
-				 * systems, but we can't be sure that libpq (or any other
-				 * walreceiver implementation) has the same socket (even if
-				 * the fd is the same number, it may have been closed and
-				 * reopened since the last time).  In future, if there is a
-				 * function for removing sockets from WaitEventSet, then we
-				 * could add and remove just the socket each time, potentially
-				 * avoiding some system calls.
-				 */
-				Assert(wait_fd != PGINVALID_SOCKET);
-				rc = WaitLatchOrSocket(walrcv->latch,
-									   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
-									   WL_TIMEOUT | WL_LATCH_SET,
-									   wait_fd,
-									   NAPTIME_PER_CYCLE,
-									   WAIT_EVENT_WAL_RECEIVER_MAIN);
+				/* Wait for latch or data. */
+				rc = walrcv_wait(wrconn, NAPTIME_PER_CYCLE,
+								 WAIT_EVENT_WAL_RECEIVER_MAIN);
 				if (rc & WL_LATCH_SET)
 				{
 					ResetLatch(walrcv->latch);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index e08afc6548..115d6acf18 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -239,6 +239,8 @@ typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
 											 const int nRetTypes,
 											 const Oid *retTypes);
 typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
+typedef int (*walrcv_wait_fn) (WalReceiverConn *conn, long timeout,
+							   int wait_event);
 
 typedef struct WalReceiverFunctionsType
 {
@@ -257,6 +259,7 @@ typedef struct WalReceiverFunctionsType
 	walrcv_get_backend_pid_fn walrcv_get_backend_pid;
 	walrcv_exec_fn walrcv_exec;
 	walrcv_disconnect_fn walrcv_disconnect;
+	walrcv_wait_fn walrcv_wait;
 } WalReceiverFunctionsType;
 
 extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
@@ -291,6 +294,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes)
 #define walrcv_disconnect(conn) \
 	WalReceiverFunctions->walrcv_disconnect(conn)
+#define walrcv_wait(conn, timeout, wait_event) \
+	WalReceiverFunctions->walrcv_wait(conn, timeout, wait_event)
 
 static inline void
 walrcv_clear_result(WalRcvExecResult *walres)
-- 
2.20.1

