From 81b3f61b0235e1b936e3336e43ce55c00bc230c4 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Tue, 10 May 2016 10:34:10 +0800
Subject: [PATCH 1/2] Respect client-initiated CopyDone in walsender

The walsender never looked for CopyDone sent by the client unless it
had already decided it was done sending data and dispatched its own
CopyDone message.

Check for client-initiated CopyDone when in COPY BOTH mode, returning to
command mode. In logical decoding this will allow the client to end a logical
decoding session between transactions without just unilaterally closing its
connection.

This change does not allow a client to end COPY BOTH session in the middle of
processing a logical decoding block.

TODO effect on physical walsender?
---
 src/backend/replication/walsender.c | 32 +++++++++++++++++++++++++++-----
 1 file changed, 27 insertions(+), 5 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1ea2a5c..c43310c 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -759,6 +759,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	/* make sure we have enough WAL available */
 	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
 
+	/*
+	 * If the client sent CopyDone while we were waiting,
+	 * bail out so we can wind up the decoding session.
+	 */
+	if (streamingDoneSending)
+		return -1;
+
 	/* more than one block available */
 	if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
 		count = XLOG_BLCKSZ;
@@ -1220,8 +1227,11 @@ WalSndWaitForWal(XLogRecPtr loc)
 		 * It's important to do this check after the recomputation of
 		 * RecentFlushPtr, so we can send all remaining data before shutting
 		 * down.
+		 *
+		 * We'll also exit here if the client sent CopyDone because it wants
+		 * to return to command mode.
 		 */
-		if (walsender_ready_to_stop)
+		if (walsender_ready_to_stop || streamingDoneReceiving)
 			break;
 
 		/*
@@ -1787,7 +1797,14 @@ WalSndCheckTimeOut(TimestampTz now)
 	}
 }
 
-/* Main loop of walsender process that streams the WAL over Copy messages. */
+/*
+ * Main loop of walsender process that streams the WAL over Copy messages.
+ *
+ * The send_data callback must enqueue complete CopyData messages to libpq
+ * using pq_putmessage_noblock or similar, since the walsender loop may send
+ * CopyDone then exit and return to command mode in response to a client
+ * CopyDone between calls to send_data.
+ */
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
@@ -1850,10 +1867,15 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		 * some more.  If there is some, we don't bother to call send_data
 		 * again until we've flushed it ... but we'd better assume we are not
 		 * caught up.
+		 *
+		 * If we're trying to finish sending and exit we shouldn't enqueue more
+		 * data to libpq. We need to finish writing out whatever we already
+		 * have in libpq's send buffer to maintain protocol sync so we still
+		 * need to loop until it's flushed.
 		 */
-		if (!pq_is_send_pending())
+		if (!pq_is_send_pending() && !streamingDoneSending)
 			send_data();
-		else
+		else if (!streamingDoneSending)
 			WalSndCaughtUp = false;
 
 		/* Try to flush pending output to the client */
@@ -2909,7 +2931,7 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
 	if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
 		return;
 
-	if (waiting_for_ping_response)
+	if (waiting_for_ping_response || streamingDoneSending)
 		return;
 
 	/*
-- 
2.5.5

