diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 49cce38..ec33357 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -194,6 +194,15 @@ static volatile sig_atomic_t replication_active = false;
 static LogicalDecodingContext *logical_decoding_ctx = NULL;
 static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
 
+/*
+ * For logical replication, WalSndWriteData must process client replies to
+ * decide whether to send a keepalive.  Its fast path may skip them only while
+ * check_replies_needed is false; the timeout handler sets it asynchronously.
+ */
+#define REPLY_SEND_TIMEOUT USER_TIMEOUT
+static bool	keepalive_timeout_initialized = false;
+static volatile sig_atomic_t check_replies_needed = false;
+
 /* A sample associating a WAL location with the time it was written. */
 typedef struct
 {
@@ -1175,12 +1184,12 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
 		   tmpbuf.data, sizeof(int64));
 
-	/* fast path */
 	/* Try to flush pending output to the client */
 	if (pq_flush_if_writable() != 0)
 		WalSndShutdown();
 
-	if (!pq_is_send_pending())
+	/* fast path: return immediately if possible */
+	if (!check_replies_needed && !pq_is_send_pending())
 		return;
 
 	for (;;)
@@ -1216,10 +1225,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 		if (pq_flush_if_writable() != 0)
 			WalSndShutdown();
 
-		/* If we finished clearing the buffered data, we're done here. */
-		if (!pq_is_send_pending())
-			break;
-
 		now = GetCurrentTimestamp();
 
 		/* die if timeout was reached */
@@ -1228,6 +1233,10 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 		/* Send keepalive if the time has come */
 		WalSndKeepaliveIfNecessary(now);
 
+		/* If we finished clearing the buffered data, we're done here. */
+		if (!pq_is_send_pending())
+			break;
+
 		sleeptime = WalSndComputeSleeptime(now);
 
 		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
@@ -1562,6 +1571,13 @@ exec_replication_command(const char *cmd_string)
 	return true;
 }
 
+/* Timeout callback: tell the WAL send path to process client replies. */
+static void
+LogicalDecodeReplyTimeoutHandler(void)
+{
+	check_replies_needed = true;
+}
+
 /*
  * Process any incoming messages while streaming. Also checks if the remote
  * end has closed the connection.
@@ -1662,6 +1678,22 @@ ProcessRepliesIfAny(void)
 	{
 		last_reply_timestamp = GetCurrentTimestamp();
 		waiting_for_ping_response = false;
+
+		if (wal_sender_timeout > 0)
+		{
+			if (!keepalive_timeout_initialized)
+			{
+				RegisterTimeout(REPLY_SEND_TIMEOUT,
+								LogicalDecodeReplyTimeoutHandler);
+				keepalive_timeout_initialized = true;
+			}
+
+			check_replies_needed = false;
+			enable_timeout_at(REPLY_SEND_TIMEOUT,
+						  TimestampTzPlusMilliseconds(last_reply_timestamp,
+													  wal_sender_timeout / 2));
+		}
+		
 	}
 }
 
