From ad8085d7646401f62d9b17af5274041bff77a720 Mon Sep 17 00:00:00 2001
From: Alexey Makhmutov <a.makhmutov@postgrespro.ru>
Date: Thu, 22 May 2025 04:02:50 +0300
Subject: [PATCH] Use only replayed position as target flush point for logical
 replication

For a physical walsender, GetStandbyFlushRecPtr returns the position of the
latest WAL record stored locally, as WAL can be streamed to a downstream
instance without waiting for it to be applied locally. However, for logical
replication, data can be sent to the downstream client only up to the locally
applied (replayed) position.
This distinction matters when deciding whether the walsender has sent all the
available data to the client ("caught up"). During shutdown, walsender
backends are allowed to keep running until they reach the caught-up state,
while the recovery process has already stopped, so the replayed position no
longer moves. In this case a logical walsender must only wait until it has
caught up with the latest replayed record; otherwise it gets stuck in an
infinite loop and blocks instance shutdown.
---
 src/backend/replication/walsender.c | 43 ++++++++++++++++-------------
 1 file changed, 24 insertions(+), 19 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1e0480b1d55..f91ba63454c 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -3189,37 +3189,42 @@ WalSndDone(WalSndSendDataCallback send_data)
 }
 
 /*
- * Returns the latest point in WAL that has been safely flushed to disk, and
- * can be sent to the standby. This should only be called when in recovery,
- * ie. we're streaming to a cascaded standby.
+ * Returns the latest WAL position that a cascading walsender can send to a
+ * downstream server.  For physical replication this is the latest point in
+ * WAL that has been safely flushed to disk; for logical replication it is the
+ * latest replayed position.  This should only be called when in recovery.
  *
- * As a side-effect, *tli is updated to the TLI of the last
- * replayed WAL record.
+ * As a side-effect, *tli is updated to the TLI of the last replayed WAL record.
  */
 static XLogRecPtr
 GetStandbyFlushRecPtr(TimeLineID *tli)
 {
-	XLogRecPtr	replayPtr;
 	TimeLineID	replayTLI;
-	XLogRecPtr	receivePtr;
-	TimeLineID	receiveTLI;
 	XLogRecPtr	result;
 
-	/*
-	 * We can safely send what's already been replayed. Also, if walreceiver
-	 * is streaming WAL from the same timeline, we can send anything that it
-	 * has streamed, but hasn't been replayed yet.
-	 */
-
-	receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
-	replayPtr = GetXLogReplayRecPtr(&replayTLI);
+	result = GetXLogReplayRecPtr(&replayTLI);
 
 	if (tli)
 		*tli = replayTLI;
 
-	result = replayPtr;
-	if (receiveTLI == replayTLI && receivePtr > replayPtr)
-		result = receivePtr;
+	/*
+	 * We can always send what has already been replayed.  For physical
+	 * replication, if walreceiver is streaming WAL from the same timeline, we
+	 * can also send anything it has flushed but that has not been replayed
+	 * yet.  Logical decoding, however, can only process WAL that has already
+	 * been replayed locally.  This matters during shutdown: walsenders keep
+	 * running until they have sent everything up to this position, while
+	 * replay no longer advances, so a logical walsender would wait forever if
+	 * we returned the received rather than the replayed position.
+	 */
+	if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
+	{
+		TimeLineID	receiveTLI;
+		XLogRecPtr	receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
+
+		if (receiveTLI == replayTLI && receivePtr > result)
+			result = receivePtr;
+	}
 
 	return result;
 }
-- 
2.49.0

