From 9fba4ed4c5fcc686c5b574dd85f918b2e0e1feff Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot.pg@gmail.com>
Date: Mon, 3 Apr 2023 16:46:09 +0000
Subject: [PATCH va65 6/9] For cascading replication, wake up physical
 walsenders separately from logical walsenders.

Physical walsenders can't send data until it's been flushed; logical
walsenders can't decode and send data until it's been applied. On the
standby, the WAL is flushed first, which will only wake up physical
walsenders; and then applied, which will only wake up logical
walsenders.

Previously, all walsenders were awakened when the WAL was flushed. That
was fine for logical walsenders on the primary; but on the standby the
flushed WAL would have been not applied yet, so logical walsenders were
awakened too early.

Author: Bertrand Drouvot per idea from Jeff Davis and Amit Kapila.
Reviewed-By: Sawada Masahiko, Robert Haas.
---
 src/include/replication/walsender.h         | 22 +++++------
 src/include/replication/walsender_private.h |  3 ++
 src/backend/access/transam/xlog.c           |  6 +--
 src/backend/access/transam/xlogarchive.c    |  2 +-
 src/backend/access/transam/xlogrecovery.c   | 30 +++++++++++---
 src/backend/replication/walreceiver.c       |  2 +-
 src/backend/replication/walsender.c         | 43 +++++++++++++++++----
 7 files changed, 79 insertions(+), 29 deletions(-)

diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 52bb3e2aae3..9df7e50f943 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -42,7 +42,7 @@ extern void WalSndResourceCleanup(bool isCommit);
 extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
 extern void WalSndShmemInit(void);
-extern void WalSndWakeup(void);
+extern void WalSndWakeup(bool physical, bool logical);
 extern void WalSndInitStopping(void);
 extern void WalSndWaitStopping(void);
 extern void HandleWalSndInitStopping(void);
@@ -60,15 +60,15 @@ extern void WalSndRqstFileReload(void);
 /*
  * wakeup walsenders if there is work to be done
  */
-#define WalSndWakeupProcessRequests()		\
-	do										\
-	{										\
-		if (wake_wal_senders)				\
-		{									\
-			wake_wal_senders = false;		\
-			if (max_wal_senders > 0)		\
-				WalSndWakeup();				\
-		}									\
-	} while (0)
+static inline void
+WalSndWakeupProcessRequests(bool physical, bool logical)
+{
+	if (wake_wal_senders)
+	{
+		wake_wal_senders = false;
+		if (max_wal_senders > 0)
+			WalSndWakeup(physical, logical);
+	}
+}
 
 #endif							/* _WALSENDER_H */
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 5310e054c48..ff25aa70a89 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -15,6 +15,7 @@
 #include "access/xlog.h"
 #include "lib/ilist.h"
 #include "nodes/nodes.h"
+#include "nodes/replnodes.h"
 #include "replication/syncrep.h"
 #include "storage/latch.h"
 #include "storage/shmem.h"
@@ -79,6 +80,8 @@ typedef struct WalSnd
 	 * Timestamp of the last message received from standby.
 	 */
 	TimestampTz replyTime;
+
+	ReplicationKind kind;
 } WalSnd;
 
 extern PGDLLIMPORT WalSnd *MyWalSnd;
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 80a7cd8948f..d30575d8741 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2645,7 +2645,7 @@ XLogFlush(XLogRecPtr record)
 	END_CRIT_SECTION();
 
 	/* wake up walsenders now that we've released heavily contended locks */
-	WalSndWakeupProcessRequests();
+	WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
 	/*
 	 * If we still haven't flushed to the request point then we have a
@@ -2816,7 +2816,7 @@ XLogBackgroundFlush(void)
 	END_CRIT_SECTION();
 
 	/* wake up walsenders now that we've released heavily contended locks */
-	WalSndWakeupProcessRequests();
+	WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
 	/*
 	 * Great, done. To take some work off the critical path, try to initialize
@@ -5762,7 +5762,7 @@ StartupXLOG(void)
 	 * If there were cascading standby servers connected to us, nudge any wal
 	 * sender processes to notice that we've been promoted.
 	 */
-	WalSndWakeup();
+	WalSndWakeup(true, true);
 
 	/*
 	 * If this was a promotion, request an (online) checkpoint now. This isn't
diff --git a/src/backend/access/transam/xlogarchive.c b/src/backend/access/transam/xlogarchive.c
index a0f5aa24b58..f3fb92c8f96 100644
--- a/src/backend/access/transam/xlogarchive.c
+++ b/src/backend/access/transam/xlogarchive.c
@@ -421,7 +421,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname)
 	 * if we restored something other than a WAL segment, but it does no harm
 	 * either.
 	 */
-	WalSndWakeup();
+	WalSndWakeup(true, false);
 }
 
 /*
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index dbe93947627..e6427c54c57 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1935,6 +1935,30 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 	XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
 	SpinLockRelease(&XLogRecoveryCtl->info_lck);
 
+	/*
+	 * Wakeup walsenders:
+	 *
+	 * On the standby, the WAL is flushed first (which will only wake up
+	 * physical walsenders) and then applied, which will only wake up logical
+	 * walsenders.
+	 *
+	 * Indeed, logical walsenders on standby can't decode and send data until
+	 * it's been applied.
+	 *
+	 * Physical walsenders don't need to be woken up during replay unless
+	 * cascading replication is allowed and time line change occured (so that
+	 * they can notice that they are on a new time line).
+	 *
+	 * That's why the wake up conditions are for:
+	 *
+	 *  - physical walsenders in case of new time line and cascade
+	 *  replication is allowed.
+	 *  - logical walsenders in case cascade replication is allowed (could not
+	 *  be created otherwise).
+	 */
+	if (AllowCascadeReplication())
+		WalSndWakeup(switchedTLI, true);
+
 	/*
 	 * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
 	 * receiver so that it notices the updated lastReplayedEndRecPtr and sends
@@ -1958,12 +1982,6 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 		 */
 		RemoveNonParentXlogFiles(xlogreader->EndRecPtr, *replayTLI);
 
-		/*
-		 * Wake up any walsenders to notice that we are on a new timeline.
-		 */
-		if (AllowCascadeReplication())
-			WalSndWakeup();
-
 		/* Reset the prefetcher. */
 		XLogPrefetchReconfigure();
 	}
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 685af51d5d3..feff7094351 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
 		/* Signal the startup process and walsender that new WAL has arrived */
 		WakeupRecovery();
 		if (AllowCascadeReplication())
-			WalSndWakeup();
+			WalSndWakeup(true, false);
 
 		/* Report XLOG streaming progress in PS display */
 		if (update_process_title)
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e40a9b1ba7b..66493b6e896 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2603,6 +2603,23 @@ InitWalSenderSlot(void)
 			walsnd->sync_standby_priority = 0;
 			walsnd->latch = &MyProc->procLatch;
 			walsnd->replyTime = 0;
+
+			/*
+			 * The kind assignment is done here and not in StartReplication()
+			 * and StartLogicalReplication(). Indeed, the logical walsender
+			 * needs to read WAL records (like snapshot of running
+			 * transactions) during the slot creation. So it needs to be woken
+			 * up based on its kind.
+			 *
+			 * The kind assignment could also be done in StartReplication(),
+			 * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
+			 * seems better to set it on one place.
+			 */
+			if (MyDatabaseId == InvalidOid)
+				walsnd->kind = REPLICATION_KIND_PHYSICAL;
+			else
+				walsnd->kind = REPLICATION_KIND_LOGICAL;
+
 			SpinLockRelease(&walsnd->mutex);
 			/* don't need the lock anymore */
 			MyWalSnd = (WalSnd *) walsnd;
@@ -3280,30 +3297,42 @@ WalSndShmemInit(void)
 }
 
 /*
- * Wake up all walsenders
+ * Wake up physical, logical or both walsenders kind
+ *
+ * The distinction between physical and logical walsenders is done, because:
+ * - physical walsenders can't send data until it's been flushed
+ * - logical walsenders on standby can't decode and send data until it's been
+ * applied
+ *
+ * For cascading replication we need to wake up physical
+ * walsenders separately from logical walsenders (see the comment before calling
+ * WalSndWakeup() in ApplyWalRecord() for more details).
  *
  * This will be called inside critical sections, so throwing an error is not
  * advisable.
  */
 void
-WalSndWakeup(void)
+WalSndWakeup(bool physical, bool logical)
 {
 	int			i;
 
 	for (i = 0; i < max_wal_senders; i++)
 	{
 		Latch	   *latch;
+		ReplicationKind kind;
 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
 
-		/*
-		 * Get latch pointer with spinlock held, for the unlikely case that
-		 * pointer reads aren't atomic (as they're 8 bytes).
-		 */
+		/* get latch pointer and kind with spinlock helds */
 		SpinLockAcquire(&walsnd->mutex);
 		latch = walsnd->latch;
+		kind = walsnd->kind;
 		SpinLockRelease(&walsnd->mutex);
 
-		if (latch != NULL)
+		if (latch == NULL)
+			continue;
+
+		if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
+			(logical && kind == REPLICATION_KIND_LOGICAL))
 			SetLatch(latch);
 	}
 }
-- 
2.38.0

