From a28775fc5900cd740556a864e1826ceda268b794 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Wed, 23 Aug 2023 09:25:35 +0200
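Subject: [PATCH 1/3] Don't use LogicalRepWorker until ->in_use is verified

Make isParallelApplyWorker() and isTablesyncWorker() check ->in_use
before looking at ->type, so that they never report a worker type for
a slot that is not in use.  In logicalrep_sync_worker_count() and
logicalrep_pa_worker_count(), evaluate those macros before comparing
->subid, so that no other field of a slot is examined until ->in_use
has been verified.

Also add assertions to am_leader_apply_worker() and
am_parallel_apply_worker() that MyLogicalRepWorker is in use.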
---
 src/backend/replication/logical/launcher.c | 4 ++--
 src/include/replication/worker_internal.h  | 8 ++++++--
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 72e44d5a02..ec5156aa41 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -862,7 +862,7 @@ logicalrep_sync_worker_count(Oid subid)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (w->subid == subid && isTablesyncWorker(w))
+		if (isTablesyncWorker(w) && w->subid == subid)
 			res++;
 	}
 
@@ -889,7 +889,7 @@ logicalrep_pa_worker_count(Oid subid)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (w->subid == subid && isParallelApplyWorker(w))
+		if (isParallelApplyWorker(w) && w->subid == subid)
 			res++;
 	}
 
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index a428663859..8f4bed0958 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -327,8 +327,10 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
-#define isParallelApplyWorker(worker) ((worker)->type == WORKERTYPE_PARALLEL_APPLY)
-#define isTablesyncWorker(worker) ((worker)->type == WORKERTYPE_TABLESYNC)
+#define isParallelApplyWorker(worker) ((worker)->in_use && \
+									   (worker)->type == WORKERTYPE_PARALLEL_APPLY)
+#define isTablesyncWorker(worker) ((worker)->in_use && \
+								   (worker)->type == WORKERTYPE_TABLESYNC)
 
 static inline bool
 am_tablesync_worker(void)
@@ -339,12 +341,14 @@ am_tablesync_worker(void)
 static inline bool
 am_leader_apply_worker(void)
 {
+	Assert(MyLogicalRepWorker->in_use);
 	return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
 }
 
 static inline bool
 am_parallel_apply_worker(void)
 {
+	Assert(MyLogicalRepWorker->in_use);
 	return isParallelApplyWorker(MyLogicalRepWorker);
 }
 
-- 
2.30.2

