From 52fe2c5094c3a65ff8bafced4b0f522007068ebb Mon Sep 17 00:00:00 2001
From: Alexey Makhmutov <a.makhmutov@postgrespro.ru>
Date: Thu, 22 May 2025 04:02:50 +0300
Subject: [PATCH] Use only replayed position as target flush point for logical
 replication

For a physical walsender, the GetStandbyFlushRecPtr function returns the
position of the latest stored WAL record, as we can stream data to a
downstream instance without waiting for it to be applied locally. However,
for logical replication we can send data to a downstream client only up to
the locally applied position. This distinction is important when we decide
whether a walsender has sent all the available data to the client ('caught
up'). During the shutdown process, walsender backends are allowed to work
until they reach the 'caught up' state, while the recovery process is
already deactivated, so the applied position won't be moving. In this case,
a walsender for logical replication should work only until it has caught up
with the latest applied record; otherwise it will get stuck in an infinite
loop and inhibit instance shutdown.
---
 src/backend/replication/walsender.c | 49 ++++++++++++++++-------------
 1 file changed, 28 insertions(+), 21 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9fa8beb6103..49d5b3fea5a 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -3519,42 +3519,49 @@ WalSndDone(WalSndSendDataCallback send_data)
 }
 
 /*
- * Returns the latest point in WAL that has been safely flushed to disk.
- * This should only be called when in recovery.
- *
  * This is called either by cascading walsender to find WAL position to be sent
- * to a cascaded standby or by slot synchronization operation to validate remote
- * slot's lsn before syncing it locally.
+ * to a downstream server or by slot synchronization operation to validate remote
+ * slot's lsn before syncing it locally. For physical replication it returns
+ * the latest point in WAL that has been safely flushed to disk; for logical
+ * replication it returns the latest replayed position. This should only be
+ * called when in recovery.
  *
- * As a side-effect, *tli is updated to the TLI of the last
- * replayed WAL record.
+ * As a side-effect, *tli is updated to the TLI of the last replayed WAL record.
  */
 XLogRecPtr
 GetStandbyFlushRecPtr(TimeLineID *tli)
 {
-	XLogRecPtr	replayPtr;
 	TimeLineID	replayTLI;
-	XLogRecPtr	receivePtr;
-	TimeLineID	receiveTLI;
 	XLogRecPtr	result;
 
 	Assert(am_cascading_walsender || IsSyncingReplicationSlots());
 
-	/*
-	 * We can safely send what's already been replayed. Also, if walreceiver
-	 * is streaming WAL from the same timeline, we can send anything that it
-	 * has streamed, but hasn't been replayed yet.
-	 */
-
-	receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
-	replayPtr = GetXLogReplayRecPtr(&replayTLI);
+	result = GetXLogReplayRecPtr(&replayTLI);
 
 	if (tli)
 		*tli = replayTLI;
 
-	result = replayPtr;
-	if (receiveTLI == replayTLI && receivePtr > replayPtr)
-		result = receivePtr;
+	/*
+	 * For physical replication we can send downstream anything that has
+	 * already been received if walreceiver is streaming WAL from the same
+	 * timeline. This is also true if this function is called from slot
+	 * synchronization routines (MyWalSnd is NULL in such case), since they
+	 * also only track the physically flushed position. However, for logical
+	 * replication the situation is different and we can only decode and send
+	 * downstream data that has already been applied to the current instance.
+	 * This distinction is especially important during shutdown, as walsender
+	 * is allowed to continue its work until data is sent to the client up to
+	 * the flush point, so it can get stuck if we return stored rather than
+	 * replayed data position.
+	 */
+	if (!MyWalSnd || MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
+	{
+		TimeLineID	receiveTLI;
+		XLogRecPtr	receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
+
+		if (receiveTLI == replayTLI && receivePtr > result)
+			result = receivePtr;
+	}
 
 	return result;
 }
-- 
2.49.0

