From 00eb469bb537dfb134d36d79b93c337e92308557 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Fri, 13 Mar 2020 14:18:05 +0900
Subject: [PATCH 07/11] Reuse a WaitEventSet in libpqwalreceiver.c.

To avoid repeatedly setting up and tearing down WaitEventSet objects
and associated kernel objects, reuse a single WaitEventSet for the
lifetime of each connection.  Export a wait function, walrcv_wait(),
that transparently handles changes to the underlying socket, so that
physical and logical replication can use it.
---
 .../libpqwalreceiver/libpqwalreceiver.c       | 157 ++++++++++++++----
 src/include/replication/walreceiver.h         |   5 +
 2 files changed, 127 insertions(+), 35 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index e4fd1f9bb6..319d5b970d 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -23,6 +23,7 @@
 #include "catalog/pg_type.h"
 #include "funcapi.h"
 #include "libpq-fe.h"
+#include "libpq-events.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -41,6 +42,10 @@ struct WalReceiverConn
 {
 	/* Current connection to the primary, if any */
 	PGconn	   *streamConn;
+	/* Wait event set for our socket, latch and postmaster death */
+	WaitEventSet *wes;
+	/* Position of the socket's event in wes, or -1 if it has been removed */
+	int			wes_socket_position;
 	/* Used to remember if the connection is logical or physical */
 	bool		logical;
 	/* Buffer for currently read records */
@@ -80,6 +85,7 @@ static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
 									   const int nRetTypes,
 									   const Oid *retTypes);
 static void libpqrcv_disconnect(WalReceiverConn *conn);
+static int libpqrcv_wait(WalReceiverConn *conn, long timeout, int wait_event);
 
 static WalReceiverFunctionsType PQWalReceiverFunctions = {
 	libpqrcv_connect,
@@ -96,13 +102,20 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
 	libpqrcv_create_slot,
 	libpqrcv_get_backend_pid,
 	libpqrcv_exec,
-	libpqrcv_disconnect
+	libpqrcv_disconnect,
+	libpqrcv_wait
 };
 
 /* Prototypes for private functions */
-static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
-static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
+static int walrcv_libpqcallback(PGEventId evtId, void *evtInfo,
+								void *passThrough);
+static PGresult *libpqrcv_PQexec(WalReceiverConn *conn, const char *query);
+static PGresult *libpqrcv_PQgetResult(WalReceiverConn *conn);
 static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
+static int libpqrcv_wait_for_socket(WalReceiverConn *conn, int io_flags,
+									long timeout,
+									WaitEvent *occurred_events,
+									WaitEventClient event_type);
 
 /*
  * Module initialization function
@@ -115,6 +128,27 @@ _PG_init(void)
 	WalReceiverFunctions = &PQWalReceiverFunctions;
 }
 
+/*
+ * libpq event procedure: forget our socket event when libpq closes the socket.
+ */
+static int
+walrcv_libpqcallback(PGEventId evtId, void *evtInfo, void *passThrough)
+{
+	WalReceiverConn *walrcvconn = (WalReceiverConn *) passThrough;
+
+	/* Ignore other events, or if we have no socket event to remove. */
+	if (evtId != PGEVT_CONNDISCONNECTION ||
+		walrcvconn->wes == NULL || walrcvconn->wes_socket_position < 0)
+		return true;
+
+	Assert(walrcvconn->streamConn ==
+		   ((PGEventConnDisconnection *) evtInfo)->conn);
+	/* The socket is already closed, so just forget its event. */
+	RemoveWaitEvent(walrcvconn->wes, walrcvconn->wes_socket_position, true);
+	walrcvconn->wes_socket_position = -1;
+	return true;
+}
+
 /*
  * Establish the connection to the primary server for XLOG streaming
  *
@@ -129,6 +163,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 	const char *keys[5];
 	const char *vals[5];
 	int			i = 0;
+	PGRegEventResult regresult;
 
 	/*
 	 * We use the expand_dbname parameter to process the connection string (or
@@ -168,6 +203,23 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 		return NULL;
 	}
 
+	/* Watch for socket closure, then set up a per-connection WaitEventSet. */
+	Assert(conn->streamConn);
+	regresult = PQregisterEventProc(conn->streamConn, walrcv_libpqcallback,
+									"libpqwalrcv disconnect callback", conn);
+	if (regresult != PGEVTREG_SUCCESS)
+	{
+		*err = pchomp(PQerrorMessage(conn->streamConn));
+		return NULL;
+	}
+
+	conn->wes = CreateWaitEventSet(TopMemoryContext, 3);
+	conn->wes_socket_position =
+		AddWaitEventToSet(conn->wes, WL_SOCKET_READABLE,
+						  PQsocket(conn->streamConn), NULL, NULL);
+	AddWaitEventToSet(conn->wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
+	AddWaitEventToSet(conn->wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL);
+
 	/*
 	 * Poll connection until we have OK or FAILED status.
 	 *
@@ -177,7 +229,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 	do
 	{
 		int			io_flag;
-		int			rc;
+		WaitEvent	event;
 
 		if (status == PGRES_POLLING_READING)
 			io_flag = WL_SOCKET_READABLE;
@@ -189,21 +241,18 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 		else
 			io_flag = WL_SOCKET_WRITEABLE;
 
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
-							   PQsocket(conn->streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
+		(void) libpqrcv_wait_for_socket(conn, io_flag, -1, &event,
+										WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
 
 		/* Interrupted? */
-		if (rc & WL_LATCH_SET)
+		if (event.events & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
 			ProcessWalRcvInterrupts();
 		}
 
 		/* If socket is ready, advance the libpq state machine */
-		if (rc & io_flag)
+		if (event.events & io_flag)
 			status = PQconnectPoll(conn->streamConn);
 	} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
 
@@ -322,7 +371,7 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
 	 * Get the system identifier and timeline ID as a DataRow message from the
 	 * primary server.
 	 */
-	res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
+	res = libpqrcv_PQexec(conn, "IDENTIFY_SYSTEM");
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -431,7 +480,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 						 options->proto.physical.startpointTLI);
 
 	/* Start streaming. */
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqrcv_PQexec(conn, cmd.data);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -479,7 +528,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	 * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
 	 * also possible in case we aborted the copy in mid-stream.
 	 */
-	res = libpqrcv_PQgetResult(conn->streamConn);
+	res = libpqrcv_PQgetResult(conn);
 	if (PQresultStatus(res) == PGRES_TUPLES_OK)
 	{
 		/*
@@ -493,7 +542,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 		PQclear(res);
 
 		/* the result set should be followed by CommandComplete */
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqrcv_PQgetResult(conn);
 	}
 	else if (PQresultStatus(res) == PGRES_COPY_OUT)
 	{
@@ -506,7 +555,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 							pchomp(PQerrorMessage(conn->streamConn)))));
 
 		/* CommandComplete should follow */
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqrcv_PQgetResult(conn);
 	}
 
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -516,7 +565,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	PQclear(res);
 
 	/* Verify that there are no more results */
-	res = libpqrcv_PQgetResult(conn->streamConn);
+	res = libpqrcv_PQgetResult(conn);
 	if (res != NULL)
 		ereport(ERROR,
 				(errmsg("unexpected result after CommandComplete: %s",
@@ -540,7 +589,7 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 	 * Request the primary to send over the history file for given timeline.
 	 */
 	snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
-	res = libpqrcv_PQexec(conn->streamConn, cmd);
+	res = libpqrcv_PQexec(conn, cmd);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -582,8 +631,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
  * May return NULL, rather than an error result, on failure.
  */
 static PGresult *
-libpqrcv_PQexec(PGconn *streamConn, const char *query)
+libpqrcv_PQexec(WalReceiverConn *conn, const char *query)
 {
+	PGconn	   *streamConn = conn->streamConn;
 	PGresult   *lastResult = NULL;
 
 	/*
@@ -606,7 +656,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
 		/* Wait for, and collect, the next PGresult. */
 		PGresult   *result;
 
-		result = libpqrcv_PQgetResult(streamConn);
+		result = libpqrcv_PQgetResult(conn);
 		if (result == NULL)
 			break;				/* query is complete, or failure */
 
@@ -631,30 +681,28 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
  * Perform the equivalent of PQgetResult(), but watch for interrupts.
  */
 static PGresult *
-libpqrcv_PQgetResult(PGconn *streamConn)
+libpqrcv_PQgetResult(WalReceiverConn *conn)
 {
+	PGconn	   *streamConn = conn->streamConn;
+
 	/*
 	 * Collect data until PQgetResult is ready to get the result without
 	 * blocking.
 	 */
 	while (PQisBusy(streamConn))
 	{
-		int			rc;
+		WaitEvent	event;
 
 		/*
 		 * We don't need to break down the sleep into smaller increments,
 		 * since we'll get interrupted by signals and can handle any
 		 * interrupts here.
 		 */
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
-							   WL_LATCH_SET,
-							   PQsocket(streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
+		(void) libpqrcv_wait_for_socket(conn, WL_SOCKET_READABLE, -1, &event,
+										WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 
 		/* Interrupted? */
-		if (rc & WL_LATCH_SET)
+		if (event.events & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
 			ProcessWalRcvInterrupts();
@@ -681,9 +729,27 @@ libpqrcv_disconnect(WalReceiverConn *conn)
 	PQfinish(conn->streamConn);
 	if (conn->recvBuf != NULL)
 		PQfreemem(conn->recvBuf);
+	FreeWaitEventSet(conn->wes);
 	pfree(conn);
 }
 
+/*
+ * Wait until our socket becomes readable, the latch is set, or the timeout
+ * expires.  Returns the WL_* flags reported for the event that occurred, or
+ * WL_TIMEOUT if nothing happened before the timeout.
+ */
+static int
+libpqrcv_wait(WalReceiverConn *conn, long timeout, int wait_event)
+{
+	WaitEvent	occurred;
+	int			nevents;
+
+	nevents = libpqrcv_wait_for_socket(conn, WL_SOCKET_READABLE, timeout,
+									   &occurred, wait_event);
+
+	return (nevents == 0) ? WL_TIMEOUT : occurred.events;
+}
+
 /*
  * Receive a message available from XLOG stream.
  *
@@ -701,8 +767,7 @@ libpqrcv_disconnect(WalReceiverConn *conn)
  * ereports on error.
  */
 static int
-libpqrcv_receive(WalReceiverConn *conn, char **buffer,
-				 pgsocket *wait_fd)
+libpqrcv_receive(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
 {
 	int			rawlen;
 
@@ -733,13 +798,13 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
 	{
 		PGresult   *res;
 
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqrcv_PQgetResult(conn);
 		if (PQresultStatus(res) == PGRES_COMMAND_OK)
 		{
 			PQclear(res);
 
 			/* Verify that there are no more results. */
-			res = libpqrcv_PQgetResult(conn->streamConn);
+			res = libpqrcv_PQgetResult(conn);
 			if (res != NULL)
 			{
 				PQclear(res);
@@ -839,7 +904,7 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 		appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
 	}
 
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqrcv_PQexec(conn, cmd.data);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -963,7 +1028,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("the query interface requires a database connection")));
 
-	pgres = libpqrcv_PQexec(conn->streamConn, query);
+	pgres = libpqrcv_PQexec(conn, query);
 
 	switch (PQresultStatus(pgres))
 	{
@@ -1047,3 +1112,25 @@ stringlist_to_identifierstr(PGconn *conn, List *strings)
 
 	return res.data;
 }
+
+/*
+ * Wait on the connection's WaitEventSet for 'io_flags' on our socket (or any
+ * other registered event), coping with libpq having replaced the socket since
+ * the last wait.  Returns WaitEventSetWait()'s result: the number of events
+ * stored in *occurred_events (at most 1), or 0 on timeout.
+ */
+static int
+libpqrcv_wait_for_socket(WalReceiverConn *conn, int io_flags, long timeout,
+						 WaitEvent *occurred_events,
+						 WaitEventClient event_type)
+{
+	/* The socket event is dropped on disconnection, so re-add it if needed. */
+	if (conn->wes_socket_position < 0)
+		conn->wes_socket_position =
+			AddWaitEventToSet(conn->wes, io_flags,
+							  PQsocket(conn->streamConn), NULL, NULL);
+	else
+		ModifyWaitEvent(conn->wes, conn->wes_socket_position, io_flags, NULL);
+
+	return WaitEventSetWait(conn->wes, timeout, occurred_events, 1, event_type);
+}
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index e08afc6548..115d6acf18 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -239,6 +239,8 @@ typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
 											 const int nRetTypes,
 											 const Oid *retTypes);
 typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
+typedef int (*walrcv_wait_fn) (WalReceiverConn *conn, long timeout,
+							   int wait_event);
 
 typedef struct WalReceiverFunctionsType
 {
@@ -257,6 +259,7 @@ typedef struct WalReceiverFunctionsType
 	walrcv_get_backend_pid_fn walrcv_get_backend_pid;
 	walrcv_exec_fn walrcv_exec;
 	walrcv_disconnect_fn walrcv_disconnect;
+	walrcv_wait_fn walrcv_wait;
 } WalReceiverFunctionsType;
 
 extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
@@ -291,6 +294,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes)
 #define walrcv_disconnect(conn) \
 	WalReceiverFunctions->walrcv_disconnect(conn)
+#define walrcv_wait(conn, timeout, wait_event) \
+	WalReceiverFunctions->walrcv_wait(conn, timeout, wait_event)
 
 static inline void
 walrcv_clear_result(WalRcvExecResult *walres)
-- 
2.18.2

