From 00e9c3d575b0a72729980877e24b8d125c73071f Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Sat, 24 Aug 2024 20:12:08 +0300
Subject: [PATCH 10/11] Fix lost wakeup issue in logical replication launcher

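Use a dedicated interrupt reason, INTERRUPT_SUBSCRIPTION_CHANGE, to
notify the launcher of subscription changes. The launcher clears it
before scanning the subscription list, so a change made while the scan
is in progress leaves the interrupt set and is picked up on the next
iteration instead of being lost. The launcher is now tracked by its
ProcNumber rather than its PID, since SendInterrupt() addresses the
target process by ProcNumber.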

https://www.postgresql.org/message-id/flat/ff0663d9-8011-420f-a169-efbf57327cb5%40iki.fi#bef984f8c43d6b8a9428d2c5547fe72b
---
 src/backend/replication/logical/launcher.c | 23 ++++++++++++++--------
 src/include/storage/interrupt.h            |  2 ++
 2 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 78244af7c9..206733681a 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -57,7 +57,7 @@ LogicalRepWorker *MyLogicalRepWorker = NULL;
 typedef struct LogicalRepCtxStruct
 {
 	/* Supervisor process. */
-	pid_t		launcher_pid;
+	ProcNumber	launcher_procno;	/* INVALID_PROC_NUMBER if not running */
 
 	/* Hash table holding last start times of subscriptions' apply workers. */
 	dsa_handle	last_start_dsa;
@@ -813,7 +813,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
 static void
 logicalrep_launcher_onexit(int code, Datum arg)
 {
-	LogicalRepCtx->launcher_pid = 0;
+	LogicalRepCtx->launcher_procno = INVALID_PROC_NUMBER;	/* not running */
 }
 
 /*
@@ -973,6 +973,7 @@ ApplyLauncherShmemInit(void)
 
 		memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
 
+		LogicalRepCtx->launcher_procno = INVALID_PROC_NUMBER;	/* none yet */
 		LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
 		LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
 
@@ -1118,8 +1119,12 @@ ApplyLauncherWakeupAtCommit(void)
 static void
 ApplyLauncherWakeup(void)
 {
-	if (LogicalRepCtx->launcher_pid != 0)
-		kill(LogicalRepCtx->launcher_pid, SIGUSR1);
+	ProcNumber	procno;
+
+	/* Fetch once: the launcher may clear this concurrently on exit. */
+	procno = ((volatile LogicalRepCtxStruct *) LogicalRepCtx)->launcher_procno;
+	if (procno != INVALID_PROC_NUMBER)
+		SendInterrupt(INTERRUPT_SUBSCRIPTION_CHANGE, procno);
 }
 
 /*
@@ -1133,8 +1138,8 @@ ApplyLauncherMain(Datum main_arg)
 
 	before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
 
-	Assert(LogicalRepCtx->launcher_pid == 0);
-	LogicalRepCtx->launcher_pid = MyProcPid;
+	Assert(LogicalRepCtx->launcher_procno == INVALID_PROC_NUMBER);
+	LogicalRepCtx->launcher_procno = MyProcNumber;	/* advertise ourselves */
 
 	/* Establish signal handlers. */
 	pqsignal(SIGHUP, SignalHandlerForConfigReload);
@@ -1166,6 +1171,7 @@ ApplyLauncherMain(Datum main_arg)
 		oldctx = MemoryContextSwitchTo(subctx);
 
 		/* Start any missing workers for enabled subscriptions. */
+		ClearInterrupt(INTERRUPT_SUBSCRIPTION_CHANGE);	/* avoid lost wakeup */
 		sublist = get_subscription_list();
 		foreach(lc, sublist)
 		{
@@ -1222,7 +1228,8 @@ ApplyLauncherMain(Datum main_arg)
 		MemoryContextDelete(subctx);
 
 		/* Wait for more work. */
-		rc = WaitInterrupt(1 << INTERRUPT_GENERAL_WAKEUP,
+		rc = WaitInterrupt((1 << INTERRUPT_GENERAL_WAKEUP) |
+						   (1 << INTERRUPT_SUBSCRIPTION_CHANGE),
 						   WL_INTERRUPT | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
 						   wait_time,
 						   WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
@@ -1249,7 +1256,7 @@ ApplyLauncherMain(Datum main_arg)
 bool
 IsLogicalLauncher(void)
 {
-	return LogicalRepCtx->launcher_pid == MyProcPid;
+	return MyProcNumber != INVALID_PROC_NUMBER && LogicalRepCtx->launcher_procno == MyProcNumber;
 }
 
 /*
diff --git a/src/include/storage/interrupt.h b/src/include/storage/interrupt.h
index f0ed128526..ef18164c24 100644
--- a/src/include/storage/interrupt.h
+++ b/src/include/storage/interrupt.h
@@ -88,6 +88,8 @@ typedef enum
 {
 	INTERRUPT_GENERAL_WAKEUP,
 	INTERRUPT_RECOVERY_WAKEUP,
+	INTERRUPT_SUBSCRIPTION_CHANGE,		/* sent to the logical replication
+										 * launcher when a subscription changes */
 } InterruptType;
 
 /*
-- 
2.39.2

