From 1e28b9c21a953e66c48f78a90192f3a0ca83d0aa Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Tue, 2 Mar 2021 11:08:06 +1300
Subject: [PATCH v6 1/3] Add condition variable for walreceiver state.

Broadcast this new CV whenever walRcvState changes, and use it in
ShutdownWalRcv() to wait for walreceiver shutdown without a sleep/poll
loop, while also benefiting from standard postmaster death handling.

Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Reviewed-by: Fujii Masao <masao.fujii@oss.nttdata.com>
Reviewed-by: Michael Paquier <michael@paquier.xyz>
Discussion: https://postgr.es/m/CA%2BhUKGK1607VmtrDUHQXrsooU%3Dap4g4R2yaoByWOOA3m8xevUQ%40mail.gmail.com
---
 doc/src/sgml/monitoring.sgml               |  4 +++
 src/backend/postmaster/pgstat.c            |  3 ++
 src/backend/replication/walreceiver.c      | 10 ++++++
 src/backend/replication/walreceiverfuncs.c | 42 ++++++++++++++--------
 src/include/pgstat.h                       |  1 +
 src/include/replication/walreceiver.h      |  2 ++
 6 files changed, 48 insertions(+), 14 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 3513e127b7..cf00210cb3 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1757,6 +1757,10 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       <entry>Waiting for confirmation from a remote server during synchronous
        replication.</entry>
      </row>
+     <row>
+      <entry><literal>WalrcvExit</literal></entry>
+      <entry>Waiting for the WAL receiver to exit.</entry>
+     </row>
      <row>
       <entry><literal>XactGroupUpdate</literal></entry>
       <entry>Waiting for the group leader to update transaction status at
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index f75b52719d..2dbfb81e40 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -4122,6 +4122,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_SYNC_REP:
 			event_name = "SyncRep";
 			break;
+		case WAIT_EVENT_WALRCV_EXIT:
+			event_name = "WalrcvExit";
+			break;
 		case WAIT_EVENT_XACT_GROUP_UPDATE:
 			event_name = "XactGroupUpdate";
 			break;
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index e5f8a06fea..397a94d7af 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -207,6 +207,8 @@ WalReceiverMain(void)
 
 		case WALRCV_STOPPED:
 			SpinLockRelease(&walrcv->mutex);
+			/* We might have changed state and fallen through above. */
+			ConditionVariableBroadcast(&walrcv->walRcvStateChanged);
 			proc_exit(1);
 			break;
 
@@ -249,6 +251,8 @@ WalReceiverMain(void)
 
 	SpinLockRelease(&walrcv->mutex);
 
+	ConditionVariableBroadcast(&walrcv->walRcvStateChanged);
+
 	pg_atomic_write_u64(&WalRcv->writtenUpto, 0);
 
 	/* Arrange to clean up at walreceiver exit */
@@ -647,6 +651,8 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 	walrcv->receiveStartTLI = 0;
 	SpinLockRelease(&walrcv->mutex);
 
+	ConditionVariableBroadcast(&walrcv->walRcvStateChanged);
+
 	set_ps_display("idle");
 
 	/*
@@ -675,6 +681,8 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 			*startpointTLI = walrcv->receiveStartTLI;
 			walrcv->walRcvState = WALRCV_STREAMING;
 			SpinLockRelease(&walrcv->mutex);
+
+			ConditionVariableBroadcast(&walrcv->walRcvStateChanged);
 			break;
 		}
 		if (walrcv->walRcvState == WALRCV_STOPPING)
@@ -784,6 +792,8 @@ WalRcvDie(int code, Datum arg)
 	walrcv->latch = NULL;
 	SpinLockRelease(&walrcv->mutex);
 
+	ConditionVariableBroadcast(&walrcv->walRcvStateChanged);
+
 	/* Terminate the connection gracefully. */
 	if (wrconn != NULL)
 		walrcv_disconnect(wrconn);
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 63e60478ea..9106f43d51 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -23,8 +23,10 @@
 #include <signal.h>
 
 #include "access/xlog_internal.h"
+#include "pgstat.h"
 #include "postmaster/startup.h"
 #include "replication/walreceiver.h"
+#include "storage/latch.h"
 #include "storage/pmsignal.h"
 #include "storage/shmem.h"
 #include "utils/timestamp.h"
@@ -62,6 +64,7 @@ WalRcvShmemInit(void)
 		/* First time through, so initialize */
 		MemSet(WalRcv, 0, WalRcvShmemSize());
 		WalRcv->walRcvState = WALRCV_STOPPED;
+		ConditionVariableInit(&WalRcv->walRcvStateChanged);
 		SpinLockInit(&WalRcv->mutex);
 		pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
 		WalRcv->latch = NULL;
@@ -95,12 +98,18 @@ WalRcvRunning(void)
 
 		if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
 		{
-			SpinLockAcquire(&walrcv->mutex);
+			bool		state_changed = false;
 
+			SpinLockAcquire(&walrcv->mutex);
 			if (walrcv->walRcvState == WALRCV_STARTING)
+			{
 				state = walrcv->walRcvState = WALRCV_STOPPED;
-
+				state_changed = true;
+			}
 			SpinLockRelease(&walrcv->mutex);
+
+			if (state_changed)
+				ConditionVariableBroadcast(&walrcv->walRcvStateChanged);
 		}
 	}
 
@@ -140,12 +149,18 @@ WalRcvStreaming(void)
 
 		if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
 		{
-			SpinLockAcquire(&walrcv->mutex);
+			bool		state_changed = false;
 
+			SpinLockAcquire(&walrcv->mutex);
 			if (walrcv->walRcvState == WALRCV_STARTING)
+			{
 				state = walrcv->walRcvState = WALRCV_STOPPED;
-
+				state_changed = true;
+			}
 			SpinLockRelease(&walrcv->mutex);
+
+			if (state_changed)
+				ConditionVariableBroadcast(&walrcv->walRcvStateChanged);
 		}
 	}
 
@@ -191,6 +206,8 @@ ShutdownWalRcv(void)
 	}
 	SpinLockRelease(&walrcv->mutex);
 
+	ConditionVariableBroadcast(&walrcv->walRcvStateChanged);
+
 	/*
 	 * Signal walreceiver process if it was still running.
 	 */
@@ -199,18 +216,13 @@ ShutdownWalRcv(void)
 
 	/*
 	 * Wait for walreceiver to acknowledge its death by setting state to
-	 * WALRCV_STOPPED.
+	 * WALRCV_STOPPED.  This wait contains a standard CHECK_FOR_INTERRUPTS().
 	 */
+	ConditionVariablePrepareToSleep(&walrcv->walRcvStateChanged);
 	while (WalRcvRunning())
-	{
-		/*
-		 * This possibly-long loop needs to handle interrupts of startup
-		 * process.
-		 */
-		HandleStartupProcInterrupts();
-
-		pg_usleep(100000);		/* 100ms */
-	}
+		ConditionVariableSleep(&walrcv->walRcvStateChanged,
+							   WAIT_EVENT_WALRCV_EXIT);
+	ConditionVariableCancelSleep();
 }
 
 /*
@@ -298,6 +310,8 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
 
 	SpinLockRelease(&walrcv->mutex);
 
+	ConditionVariableBroadcast(&walrcv->walRcvStateChanged);
+
 	if (launch)
 		SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
 	else if (latch)
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 724068cf87..222e38037c 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -998,6 +998,7 @@ typedef enum
 	WAIT_EVENT_REPLICATION_SLOT_DROP,
 	WAIT_EVENT_SAFE_SNAPSHOT,
 	WAIT_EVENT_SYNC_REP,
+	WAIT_EVENT_WALRCV_EXIT,
 	WAIT_EVENT_XACT_GROUP_UPDATE
 } WaitEventIPC;
 
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index a97a59a6a3..bddea21a30 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -19,6 +19,7 @@
 #include "port/atomics.h"
 #include "replication/logicalproto.h"
 #include "replication/walsender.h"
+#include "storage/condition_variable.h"
 #include "storage/latch.h"
 #include "storage/spin.h"
 #include "utils/tuplestore.h"
@@ -62,6 +63,7 @@ typedef struct
 	 */
 	pid_t		pid;
 	WalRcvState walRcvState;
+	ConditionVariable walRcvStateChanged;
 	pg_time_t	startTime;
 
 	/*
-- 
2.30.1

