From 22b2c474b476ca91b579408f73ebdc014bb76083 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Mon, 24 Feb 2020 23:48:29 +1300
Subject: [PATCH v8 2/2] Use FeBeWaitSet for walsender.c.

This avoids the need to set up and tear down a new WaitEventSet every
time we wait.

Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com
---
 src/backend/replication/walsender.c | 43 +++++++++++++++++------------
 1 file changed, 25 insertions(+), 18 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 81244541e2..eb3f18ed48 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -244,6 +244,7 @@ static void WalSndKeepalive(bool requestReply);
 static void WalSndKeepaliveIfNecessary(void);
 static void WalSndCheckTimeOut(void);
 static long WalSndComputeSleeptime(TimestampTz now);
+static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
@@ -1287,7 +1288,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	/* If we have pending write here, go to slow path */
 	for (;;)
 	{
-		int			wakeEvents;
 		long		sleeptime;
 
 		/* Check for input from the client */
@@ -1304,13 +1304,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 
 		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
 
-		wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
-			WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
-
 		/* Sleep until something happens or we time out */
-		(void) WaitLatchOrSocket(MyLatch, wakeEvents,
-								 MyProcPort->sock, sleeptime,
-								 WAIT_EVENT_WAL_SENDER_WRITE_DATA);
+		WalSndWait(WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE, sleeptime,
+				   WAIT_EVENT_WAL_SENDER_WRITE_DATA);
 
 		/* Clear any already-pending wakeups */
 		ResetLatch(MyLatch);
@@ -1480,15 +1476,12 @@ WalSndWaitForWal(XLogRecPtr loc)
 		 */
 		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
 
-		wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
-			WL_SOCKET_READABLE | WL_TIMEOUT;
+		wakeEvents = WL_SOCKET_READABLE;
 
 		if (pq_is_send_pending())
 			wakeEvents |= WL_SOCKET_WRITEABLE;
 
-		(void) WaitLatchOrSocket(MyLatch, wakeEvents,
-								 MyProcPort->sock, sleeptime,
-								 WAIT_EVENT_WAL_SENDER_WAIT_WAL);
+		WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
 	}
 
 	/* reactivate latch so WalSndLoop knows to continue */
@@ -2348,10 +2341,10 @@ WalSndLoop(WalSndSendDataCallback send_data)
 			long		sleeptime;
 			int			wakeEvents;
 
-			wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT;
-
 			if (!streamingDoneReceiving)
-				wakeEvents |= WL_SOCKET_READABLE;
+				wakeEvents = WL_SOCKET_READABLE;
+			else
+				wakeEvents = 0;
 
 			/*
 			 * Use fresh timestamp, not last_processing, to reduce the chance
@@ -2363,9 +2356,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
 				wakeEvents |= WL_SOCKET_WRITEABLE;
 
 			/* Sleep until something happens or we time out */
-			(void) WaitLatchOrSocket(MyLatch, wakeEvents,
-									 MyProcPort->sock, sleeptime,
-									 WAIT_EVENT_WAL_SENDER_MAIN);
+			WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
 		}
 	}
 }
@@ -3121,6 +3112,22 @@ WalSndWakeup(void)
 	}
 }
 
+/*
+ * Wait for readiness on the FeBe socket, or a timeout.  The mask should be
+ * composed of optional WL_SOCKET_WRITEABLE and WL_SOCKET_READABLE flags.  Exit
+ * on postmaster death.
+ */
+static void
+WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
+{
+	WaitEvent event;
+
+	ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
+	if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
+		(event.events & WL_POSTMASTER_DEATH))
+		proc_exit(1);
+}
+
 /*
  * Signal all walsenders to move to stopping state.
  *
-- 
2.30.0

