From 1d91e067fad0bc68b55d6d771cc03078681de780 Mon Sep 17 00:00:00 2001
From: Alexey Makhmutov <a.makhmutov@postgrespro.ru>
Date: Wed, 28 May 2025 17:45:52 +0300
Subject: [PATCH] Use only replayed position as target flush point for cascaded
 logical replication

For a physical walsender, GetStandbyFlushRecPtr returns the position of the
latest stored WAL record, because data can be streamed to a downstream
instance without waiting for it to be applied locally. For cascaded logical
replication, however, we can send data to the downstream client only up to
the locally applied position. This distinction matters when we decide
whether the walsender has sent all available data to the client ('caught
up'). During shutdown, walsender backends are allowed to keep working until
they reach the 'caught up' state, but the recovery process is already
stopped, so the applied position no longer advances. In this case a
walsender for logical replication should work only until it has caught up
with the latest applied record; otherwise it gets stuck in an infinite loop
and blocks instance shutdown. To avoid this, XLogSendLogical should use
GetXLogReplayRecPtr rather than GetStandbyFlushRecPtr.
---
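Note for reviewers (not part of the commit message): the hang described
above can be illustrated with a small standalone model. This is only a
sketch under simplifying assumptions, not code from walsender.c. The names
ModelLsn, ModelStandby, model_caught_up and model_can_finish_shutdown are
hypothetical, and the model assumes that decoding on a standby can never
advance past the replayed position.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for an LSN; not the PostgreSQL XLogRecPtr type. */
typedef uint64_t ModelLsn;

/*
 * Simplified standby state at shutdown: recovery has already stopped, so
 * replayed_lsn no longer advances, while WAL up to flushed_lsn is on disk.
 */
typedef struct ModelStandby
{
	ModelLsn	flushed_lsn;
	ModelLsn	replayed_lsn;
} ModelStandby;

/* The sender counts as caught up once decoding has reached the target. */
bool
model_caught_up(ModelLsn decoded_end, ModelLsn target)
{
	return decoded_end >= target;
}

/*
 * Returns true if a cascading logical sender can ever reach the caught-up
 * state during shutdown, for the chosen target position.
 */
bool
model_can_finish_shutdown(const ModelStandby *standby, bool use_replay_target)
{
	ModelLsn	target;
	ModelLsn	decoded_end;

	/* WAL cannot be replayed before it has been stored locally. */
	assert(standby->replayed_lsn <= standby->flushed_lsn);

	target = use_replay_target ? standby->replayed_lsn
							   : standby->flushed_lsn;

	/* Assumption: the decoder can advance at most to the replayed LSN. */
	decoded_end = standby->replayed_lsn;

	return model_caught_up(decoded_end, target);
}
```

With flushed_lsn ahead of replayed_lsn, the flush-based target is never
reached in this model, which corresponds to the shutdown hang, while the
replay-based target is reached as soon as decoding consumes all replayed
records.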
 src/backend/replication/walsender.c | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9fa8beb6103..c7d475b99b6 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -3449,8 +3449,17 @@ XLogSendLogical(void)
 	if (flushPtr == InvalidXLogRecPtr ||
 		logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
 	{
+		/*
+		 * For cascaded logical replication we must use the replay position
+		 * rather than the flush position, as we cannot decode and stream data
+		 * which has not yet been applied locally. This distinction is
+		 * especially important during the shutdown sequence, as the caught-up
+		 * state is calculated based on flushPtr, and using the flush position
+		 * here would cause an indefinite wait in walsender for records which
+		 * won't be applied due to shutdown.
+		 */
 		if (am_cascading_walsender)
-			flushPtr = GetStandbyFlushRecPtr(NULL);
+			flushPtr = GetXLogReplayRecPtr(NULL);
 		else
 			flushPtr = GetFlushRecPtr(NULL);
 	}
-- 
2.49.0

