From 14c0ac1aacfacb3b6b42bf0c5e453ab4e989aa2c Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Tue, 10 May 2016 10:34:10 +0800
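Subject: [PATCH] Respect client-initiated CopyDone in walsender

When the client sends CopyDone, stop waiting for more WAL, stop calling
send_data, and suppress keepalives, while still flushing whatever is
already queued in libpq's send buffer.
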
---
 src/backend/replication/walsender.c | 32 +++++++++++++++++++++++++++-----
 1 file changed, 27 insertions(+), 5 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 926a247..4e2164e 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -759,6 +759,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	/* make sure we have enough WAL available */
 	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
 
+	/*
+	 * If the client sent CopyDone while we were waiting,
+	 * bail out so we can wind up the decoding session.
+	 */
+	if (streamingDoneSending)
+		return -1;
+
 	/* more than one block available */
 	if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
 		count = XLOG_BLCKSZ;
@@ -1220,8 +1227,11 @@ WalSndWaitForWal(XLogRecPtr loc)
 		 * It's important to do this check after the recomputation of
 		 * RecentFlushPtr, so we can send all remaining data before shutting
 		 * down.
+		 *
+		 * We'll also exit here if the client sent CopyDone because it wants
+		 * to return to command mode.
 		 */
-		if (walsender_ready_to_stop)
+		if (walsender_ready_to_stop || streamingDoneReceiving)
 			break;
 
 		/*
@@ -1789,7 +1799,14 @@ WalSndCheckTimeOut(TimestampTz now)
 	}
 }
 
-/* Main loop of walsender process that streams the WAL over Copy messages. */
+/*
+ * Main loop of walsender process that streams the WAL over Copy messages.
+ *
+ * The send_data callback must enqueue complete CopyData messages to libpq
+ * using pq_putmessage_noblock or similar, because between calls to send_data
+ * the walsender loop may respond to a client CopyDone by sending CopyDone,
+ * then exiting and returning to command mode.
+ */
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
@@ -1852,10 +1869,15 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		 * some more.  If there is some, we don't bother to call send_data
 		 * again until we've flushed it ... but we'd better assume we are not
 		 * caught up.
+		 *
+		 * If we're trying to finish sending and exit, we shouldn't enqueue
+		 * more data to libpq. We still need to keep looping until whatever is
+		 * already in libpq's send buffer has been flushed, so that protocol
+		 * sync is maintained.
 		 */
-		if (!pq_is_send_pending())
+		if (!pq_is_send_pending() && !streamingDoneSending)
 			send_data();
-		else
+		else if (!streamingDoneSending)
 			WalSndCaughtUp = false;
 
 		/* Try to flush pending output to the client */
@@ -2911,7 +2933,7 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
 	if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
 		return;
 
-	if (waiting_for_ping_response)
+	if (waiting_for_ping_response || streamingDoneSending)
 		return;
 
 	/*
-- 
2.5.5

