From 12f98aadb5a423b17b41e65d36c80355be1beb61 Mon Sep 17 00:00:00 2001
From: Alexey Makhmutov <a.makhmutov@postgrespro.ru>
Date: Thu, 22 May 2025 04:02:50 +0300
Subject: [PATCH] Use only replayed position as target flush point for logical
 replication

For a physical walsender, the GetStandbyFlushRecPtr function returns the
position of the latest stored WAL record, because we can stream data to the
downstream instance without waiting for it to be applied locally. For logical
replication, however, we can send data to the downstream client only up to
the locally applied position.
This distinction is important when we decide whether a walsender has sent all
the available data to its client (is 'caught up'). During the shutdown process,
walsender backends are allowed to work until they reach the 'caught up' state,
while the recovery process is already deactivated, so the applied position
no longer advances. In this case a walsender for logical replication should
work only until it has caught up with the latest applied record; otherwise it
will get stuck in an infinite loop and block instance shutdown.
---
 src/backend/replication/walsender.c | 36 ++++++++++++++++++-----------
 1 file changed, 22 insertions(+), 14 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9fa8beb6103..3c480d02393 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -3532,29 +3532,37 @@ WalSndDone(WalSndSendDataCallback send_data)
 XLogRecPtr
 GetStandbyFlushRecPtr(TimeLineID *tli)
 {
-	XLogRecPtr	replayPtr;
 	TimeLineID	replayTLI;
-	XLogRecPtr	receivePtr;
-	TimeLineID	receiveTLI;
 	XLogRecPtr	result;
 
 	Assert(am_cascading_walsender || IsSyncingReplicationSlots());
 
-	/*
-	 * We can safely send what's already been replayed. Also, if walreceiver
-	 * is streaming WAL from the same timeline, we can send anything that it
-	 * has streamed, but hasn't been replayed yet.
-	 */
-
-	receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
-	replayPtr = GetXLogReplayRecPtr(&replayTLI);
+	result = GetXLogReplayRecPtr(&replayTLI);
 
 	if (tli)
 		*tli = replayTLI;
 
-	result = replayPtr;
-	if (receiveTLI == replayTLI && receivePtr > replayPtr)
-		result = receivePtr;
+	/*
+	 * For physical replication we can send downstream anything that has
+	 * already been received if walreceiver is streaming WAL from the same
+	 * timeline. This is also true if this function is called from slot
+	 * synchronization routines (MyWalSnd is NULL in such case), since they
+	 * also only track the physically flushed position. However, for logical
+	 * replication the situation is different and we can only decode and send
+	 * downstream data that has already been applied to the current instance.
+	 * This distinction is especially important during shutdown, as walsender
+	 * is allowed to continue its work until data is sent to the client up to
+	 * the flush point, so it can get stuck if we return stored rather than
+	 * replayed data position.
+	 */
+	if (!MyWalSnd || MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
+	{
+		TimeLineID	receiveTLI;
+		XLogRecPtr	receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
+
+		if (receiveTLI == replayTLI && receivePtr > result)
+			result = receivePtr;
+	}
 
 	return result;
 }
-- 
2.49.0

