Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk

Started by Dave Cramer over 7 years ago, 16 messages
#1 Dave Cramer
davecramer@gmail.com
4 attachment(s)

Back in 2016 a patch was proposed that seems to have died on the vine. See
/messages/by-id/CAFgjRd3hdYOa33m69TbeOfNNer2BZbwa8FFjt2V5VFzTBvUU3w@mail.gmail.com

for the history and https://commitfest.postgresql.org/10/621/ for the
commitfest entry.

I've rebased the patches and attached them for consideration.

The JDBC tests at
https://github.com/pgjdbc/pgjdbc/blob/master/pgjdbc/src/test/java/org/postgresql/replication/LogicalReplicationTest.java
all pass.

Regards,

Dave Cramer

Attachments:

0004-Add-test-for-pg_recvlogical-to-stop-replication.patch (application/octet-stream)
From 74a0ddc336d810a7fd2258dd05b1ba6e7ee7255e Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Tue, 24 Jul 2018 11:30:57 -0400
Subject: [PATCH 4/4] Add test for pg_recvlogical to stop replication

---
 src/test/recovery/t/0016_pg_recvlogical.pl | 226 +++++++++++++++++++++++++++++
 1 file changed, 226 insertions(+)
 create mode 100644 src/test/recovery/t/0016_pg_recvlogical.pl

diff --git a/src/test/recovery/t/0016_pg_recvlogical.pl b/src/test/recovery/t/0016_pg_recvlogical.pl
new file mode 100644
index 0000000..1dd41df
--- /dev/null
+++ b/src/test/recovery/t/0016_pg_recvlogical.pl
@@ -0,0 +1,226 @@
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 8;
+use IPC::Run qw( start timeout ) ;
+
+
+my $verbose = $ENV{PG_TAP_VERBOSE};
+
+# Launch pg_recvlogical as a background process and return its IPC::Run handle;
+# in list context, also return refs to its stdout and stderr and the timeout object
+sub start_pg_recvlogical
+{
+    my ($node, $slotname, %params) = @_;
+    my $stdout = my $stderr = '';
+    my $timeout           = undef;
+    my $timeout_exception = 'pg_recvlogical timed out';
+
+    $timeout =
+        IPC::Run::timeout($params{timeout}, exception => $timeout_exception)
+        if (defined($params{timeout}));
+
+    my @cmd = ("pg_recvlogical", "--verbose", "-S", "$slotname", "--no-loop", "--dbname", $node->connstr, "--start", "-f", "-");
+
+    push @cmd, @{ $params{option} }
+        if defined $params{option};
+
+    diag "Running '@cmd'" if $verbose;
+
+    my $proc = start \@cmd, '<', \undef, '2>', \$stderr, '>', \$stdout, $timeout;
+
+    die $! unless defined($proc);
+
+    sleep 5;
+
+    if ($stdout ne "")
+    {
+        diag "#### Begin standard out\n" if $verbose;
+        diag $stdout if $verbose;
+        diag "\n#### End standard out\n" if $verbose;
+    }
+
+    if ($stderr ne "")
+    {
+        diag "#### Begin standard error\n" if $verbose;
+        diag $stderr if $verbose;
+        diag "\n#### End standard error\n" if $verbose;
+    }
+
+    if (wantarray)
+    {
+        return ($proc, \$stdout, \$stderr, $timeout);
+    }
+    else
+    {
+        return $proc;
+    }
+}
+
+sub wait_for_start_streaming
+{
+    my ($node, $slotname) = @_;
+
+    diag "waiting for " . $node->name . " to start streaming from slot " . $slotname if $verbose;
+    $node->poll_query_until('postgres', "select active from pg_replication_slots where slot_name = '$slotname';");
+}
+
+sub wait_for_stop_streaming
+{
+    my ($node, $slotname) = @_;
+
+    diag "waiting for " . $node->name . " to stop streaming from slot " . $slotname if $verbose;
+    $node->poll_query_until('postgres', "select not(active) from pg_replication_slots where slot_name = '$slotname';");
+}
+
+sub create_logical_replication_slot
+{
+    my ($node, $slotname, $outplugin) = @_;
+
+    $node->safe_psql(
+        "postgres",
+        "select pg_drop_replication_slot('$slotname') where exists (select 1 from pg_replication_slots where slot_name = '$slotname');");
+
+    $node->safe_psql('postgres',
+        "SELECT pg_create_logical_replication_slot('$slotname', '$outplugin');"
+    );
+}
+
+my ($proc);
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 12\n");
+$node_master->append_conf('postgresql.conf', "max_wal_senders = 12\n");
+$node_master->append_conf('postgresql.conf', "max_connections = 20\n");
+$node_master->dump_info;
+$node_master->start;
+
+
+# Test case 1: the client stops logical replication while the database has no new changes (idle state)
+{
+    my $slotname = 'calm_state_slot';
+
+    create_logical_replication_slot($node_master, $slotname, "test_decoding");
+
+    my ($stdout, $stderr, $timeout);
+    ($proc, $stdout, $stderr, $timeout) = start_pg_recvlogical(
+        $node_master,
+        $slotname,
+        timeout => 60,
+        option => ['-o', 'include-xids=false', '-o', 'skip-empty-xacts=true']
+    );
+
+    wait_for_start_streaming($node_master, $slotname);
+
+    my $cancelTime = time();
+    $proc->signal("SIGINT");
+
+    $proc->pump while $proc->pumpable;
+
+    wait_for_stop_streaming($node_master, $slotname);
+
+    my $spendTime = time() - $cancelTime;
+
+    my $timed_out = 0;
+    eval {
+        $proc->finish;
+    };
+    if ($@)
+    {
+        my $x = $@;
+        if ($timeout->is_expired)
+        {
+            diag "whoops, pg_recvlogical timed out" if $verbose;
+            $timed_out = 1;
+        }
+        else
+        {
+            die $x;
+        }
+    }
+
+    if ($verbose)
+    {
+        diag "#--- pg_recvlogical stderr ---";
+        diag $$stderr;
+        diag "#--- end stderr ---";
+    }
+
+    ok(!$timed_out, "pg_recvlogical exited before timeout when idle");
+    like($$stderr, qr/stopping write up to/, 'pg_recvlogical responded to sigint when idle');
+    like($$stderr, qr/streaming ended by user request/, 'idle wait ended due to client copydone');
+    diag "decoding when idle stopped after ${spendTime}s";
+    ok((time() - $cancelTime) <= 3, # allow extra time for slow machines
+        "pg_recvlogical exited promptly on sigint when idle"
+    );
+}
+
+
+# Test case 2: the client stops logical replication while a huge transaction (200000 inserted rows) is being decoded
+{
+    my $slotname = 'huge_tx_state_slot';
+
+    create_logical_replication_slot($node_master, $slotname, "test_decoding");
+
+    $node_master->safe_psql('postgres',
+        "create table test_logic_table(pk serial primary key, name varchar(100));");
+
+    diag 'Insert huge amount of data to table test_logic_table' if $verbose;
+    $node_master->safe_psql('postgres',
+        "insert into test_logic_table select id, md5(random()::text) as name from generate_series(1, 200000) as id;");
+
+    my ($stdout, $stderr, $timeout);
+    ($proc, $stdout, $stderr, $timeout) = start_pg_recvlogical(
+        $node_master,
+        $slotname,
+        timeout => 60,
+        option => ['-o', 'include-xids=false', '-o', 'skip-empty-xacts=true']
+    );
+
+    wait_for_start_streaming($node_master, $slotname);
+
+    my $cancelTime = time();
+    $proc->signal("SIGINT");
+
+    $proc->pump while $proc->pumpable;
+
+    wait_for_stop_streaming($node_master, $slotname);
+
+    my $spendTime = time() - $cancelTime;
+
+    my $timed_out = 0;
+    eval {
+        $proc->finish;
+    };
+    if ($@)
+    {
+        my $x = $@;
+        if ($timeout->is_expired)
+        {
+            diag "whoops, pg_recvlogical timed out" if $verbose;
+            $timed_out = 1;
+        }
+        else
+        {
+            die $x;
+        }
+    }
+
+    if ($verbose)
+    {
+        diag "#--- pg_recvlogical stderr ---";
+        diag $$stderr;
+        diag "#--- end stderr ---";
+    }
+
+    ok(!$timed_out, "pg_recvlogical exited before timeout when streaming");
+    like($$stderr, qr/stopping write up to/, 'pg_recvlogical responded to sigint when streaming');
+    like($$stderr, qr/streaming ended by user request/, 'streaming ended due to client copydone');
+    diag "decoding of big xact stopped after ${spendTime}s";
+    ok($spendTime <= 5, # allow extra time for slow machines
+        "pg_recvlogical exited promptly on signal when decoding");
+}
-- 
2.6.4
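The two test cases above each send a single SIGINT and expect pg_recvlogical to send CopyDone, discard incoming data until the server answers with its own CopyDone, and then exit. The client-side state that patch 0003 adds for this can be sketched standalone in C as below. The two flags and the handler body follow the patch; the LoopAction enum and next_loop_action() are hypothetical helpers that only summarize what one iteration of the patched StreamLogicalLog() loop does, and installing the handler is omitted.

```c
#include <assert.h>
#include <signal.h>
#include <stdbool.h>

/* First SIGINT: request a graceful stop (send CopyDone, wait for the server's). */
static volatile sig_atomic_t time_to_abort = false;
/* Second SIGINT: stop waiting for the server's CopyDone and exit right away. */
static volatile sig_atomic_t force_time_to_abort = false;

/* Same logic as the patched sigint_handler() in pg_recvlogical.c. */
static void
sigint_handler(int signum)
{
	(void) signum;
	if (time_to_abort)
		force_time_to_abort = true;
	time_to_abort = true;
}

/* Hypothetical summary of what one pass of the streaming loop should do. */
typedef enum
{
	KEEP_STREAMING,				/* no stop requested yet */
	SEND_COPY_DONE,				/* fsync, send feedback, then PQputCopyEnd() */
	DRAIN_UNTIL_COPY_DONE,		/* discard XLogData until the server's CopyDone */
	EXIT_NOW					/* second SIGINT: leave the loop immediately */
} LoopAction;

/* Hypothetical helper: map the flags plus copyDoneSent onto a LoopAction. */
static LoopAction
next_loop_action(bool copy_done_sent)
{
	if (force_time_to_abort)
		return EXIT_NOW;
	if (!time_to_abort)
		return KEEP_STREAMING;
	return copy_done_sent ? DRAIN_UNTIL_COPY_DONE : SEND_COPY_DONE;
}
```

Calling sigint_handler() twice, as a user pressing Ctrl+C twice would, moves next_loop_action() from SEND_COPY_DONE to EXIT_NOW. Keeping the forced stop in a separate flag is what lets a second SIGINT retain the old "stop immediately" behaviour.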

0003-Add-ability-for-pg_recvlogical-to-stop-replication-f.patch (application/octet-stream)
From 0fcfbc84a0a81d5220bbf904fd2d021f1963df2b Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Tue, 24 Jul 2018 11:24:44 -0400
Subject: [PATCH 3/4] Add ability for pg_recvlogical to stop replication from 
 client side

---
 src/bin/pg_basebackup/pg_recvlogical.c | 490 +++++++++++++++++++--------------
 1 file changed, 290 insertions(+), 200 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index a242e0b..2d69aa7 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -56,8 +56,12 @@ static const char *plugin = "test_decoding";
 /* Global State */
 static int	outfd = -1;
 static volatile sig_atomic_t time_to_abort = false;
+static volatile sig_atomic_t force_time_to_abort = false;
 static volatile sig_atomic_t output_reopen = false;
+static bool copyDoneSent;
+static bool copyDoneReceived;
 static bool output_isfile;
+static int64 last_status_time;
 static TimestampTz output_last_fsync = -1;
 static bool output_needs_fsync = false;
 static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
@@ -205,6 +209,222 @@ OutputFsync(TimestampTz now)
 	return true;
 }
 
+static bool
+ProcessKeepalive(PGconn *conn, char *msgBuf, int msgLength)
+{
+	int			pos;
+	bool		replyRequested;
+	XLogRecPtr	walEnd;
+
+	/*
+	 * Parse the keepalive message, enclosed in the CopyData message.
+	 * We just check if the server requested a reply, and ignore the
+	 * rest.
+	 */
+	pos = 1;			/* skip msgtype 'k' */
+
+	/* read walEnd */
+	walEnd = fe_recvint64(&msgBuf[pos]);
+	output_written_lsn = Max(walEnd, output_written_lsn);
+
+	pos += 8 + 8;		/* skip walEnd and sendTime */
+
+	if (msgLength < pos + 1)
+	{
+		fprintf(stderr, _("%s: streaming header too small: %d\n"),
+				progname, msgLength);
+		return false;
+	}
+	replyRequested = msgBuf[pos];
+
+	/* If the server requested an immediate reply, send one. */
+	if (replyRequested)
+	{
+		int64 now = feGetCurrentTimestamp();
+
+		/* fsync data, so we send a recent flush pointer */
+		if (!OutputFsync(now))
+		{
+			return false;
+		}
+
+		if (!sendFeedback(conn, now, true, false))
+		{
+			return false;
+		}
+		last_status_time = now;
+	}
+
+	return true;
+}
+
+static bool
+ProcessXLogData(PGconn *conn, char *msgBuf, int msgLength)
+{
+	int bytes_left;
+	int bytes_written;
+
+	/*
+	 * Read the header of the XLogData message, enclosed in the CopyData
+	 * message. We only need the WAL location field (dataStart), the rest
+	 * of the header is ignored.
+	 */
+	int hdr_len = 1;			/* msgtype 'w' */
+	hdr_len += 8;			/* dataStart */
+	hdr_len += 8;			/* walEnd */
+	hdr_len += 8;			/* sendTime */
+	if (msgLength < hdr_len + 1)
+	{
+		fprintf(stderr, _("%s: streaming header too small: %d\n"),
+				progname, msgLength);
+		return false;
+	}
+
+	if (time_to_abort && copyDoneSent)
+	{
+		/*
+		 * We've sent feedback and sent CopyDone, so we are now discarding
+		 * xlog data input to find the server's reply CopyDone. That way when
+		 * another client connects to the slot later they start replay exactly
+		 * where we left off - or at least at the last commit we flushed to
+		 * disk. This is not an error condition.
+		 */
+		return true;
+	}
+
+	/* Extract WAL location for this block */
+	{
+		XLogRecPtr	temp = fe_recvint64(&msgBuf[1]);
+
+		output_written_lsn = Max(temp, output_written_lsn);
+	}
+
+	bytes_left = msgLength - hdr_len;
+	bytes_written = 0;
+
+	/* signal that a fsync is needed */
+	output_needs_fsync = true;
+
+	while (bytes_left)
+	{
+		int			ret;
+
+		ret = write(outfd,
+					msgBuf + hdr_len + bytes_written,
+					bytes_left);
+
+		if (ret < 0)
+		{
+			fprintf(stderr,
+			  _("%s: could not write %u bytes to log file \"%s\": %s\n"),
+					progname, bytes_left, outfile,
+					strerror(errno));
+			return false;
+		}
+
+		/* Write was successful, advance our position */
+		bytes_written += ret;
+		bytes_left -= ret;
+	}
+
+	if (write(outfd, "\n", 1) != 1)
+	{
+		fprintf(stderr,
+			  _("%s: could not write %u bytes to log file \"%s\": %s\n"),
+				progname, 1, outfile,
+				strerror(errno));
+		return false;
+	}
+
+	return true;
+}
+
+static bool
+ProcessReceiveMsg(PGconn *conn, unsigned char type, char *msgBuf, int msgLength)
+{
+	bool success = false;
+	switch (type)
+	{
+		case 'k':
+			success = ProcessKeepalive(conn, msgBuf, msgLength);
+			break;
+		case 'w':
+			success = ProcessXLogData(conn, msgBuf, msgLength);
+			break;
+		default:
+			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+					progname, type);
+	}
+
+	return success;
+}
+
+/*
+ * Wait for socket activity, waking up early if an fsync or keepalive is due.
+ * Returns the number of ready descriptors, or -1 for errors.
+ */
+static int
+WaitSocketActivity(PGconn *conn, int64 now)
+{
+	/*
+	 * We are in async mode and no data is available. Block on reading,
+	 * but for no longer than the computed timeout, so that we can send
+	 * feedback to the server.
+	 */
+	fd_set		input_mask;
+	int64		message_target = 0;
+	int64		fsync_target = 0;
+	struct timeval timeout;
+	struct timeval *timeoutptr = NULL;
+
+	if (PQsocket(conn) < 0)
+	{
+		fprintf(stderr,
+				_("%s: invalid socket: %s"),
+				progname, PQerrorMessage(conn));
+		return -1;
+	}
+
+	FD_ZERO(&input_mask);
+	FD_SET(PQsocket(conn), &input_mask);
+
+	/* Compute when we need to wakeup to send a keepalive message. */
+	if (standby_message_timeout)
+		message_target = last_status_time + (standby_message_timeout - 1) *
+			((int64) 1000);
+
+	/* Compute when we need to wakeup to fsync the output file. */
+	if (fsync_interval > 0 && output_needs_fsync)
+		fsync_target = output_last_fsync + (fsync_interval - 1) *
+			((int64) 1000);
+
+	/* Now compute when to wakeup. */
+	if (message_target > 0 || fsync_target > 0)
+	{
+		int64		targettime;
+		long		secs;
+		int			usecs;
+
+		targettime = message_target;
+
+		if (fsync_target > 0 && fsync_target < targettime)
+			targettime = fsync_target;
+
+		feTimestampDifference(now,
+							  targettime,
+							  &secs,
+							  &usecs);
+		if (secs <= 0)
+			timeout.tv_sec = 1; /* Always sleep at least 1 sec */
+		else
+			timeout.tv_sec = secs;
+		timeout.tv_usec = usecs;
+		timeoutptr = &timeout;
+	}
+
+	return select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+}
+
 /*
  * Start the log streaming
  */
@@ -213,13 +433,14 @@ StreamLogicalLog(void)
 {
 	PGresult   *res;
 	char	   *copybuf = NULL;
-	TimestampTz last_status = -1;
 	int			i;
 	PQExpBuffer query;
 
 	output_written_lsn = InvalidXLogRecPtr;
 	output_fsync_lsn = InvalidXLogRecPtr;
-
+	last_status_time = -1;
+	copyDoneReceived = false;
+	copyDoneSent = false;
 	query = createPQExpBuffer();
 
 	/*
@@ -281,13 +502,10 @@ StreamLogicalLog(void)
 				_("%s: streaming initiated\n"),
 				progname);
 
-	while (!time_to_abort)
+	while (!force_time_to_abort)
 	{
 		int			r;
-		int			bytes_left;
-		int			bytes_written;
 		TimestampTz now;
-		int			hdr_len;
 		XLogRecPtr	cur_record_lsn = InvalidXLogRecPtr;
 
 		if (copybuf != NULL)
@@ -309,15 +527,50 @@ StreamLogicalLog(void)
 				goto error;
 		}
 
-		if (standby_message_timeout > 0 &&
-			feTimestampDifferenceExceeds(last_status, now,
+		if (standby_message_timeout > 0 && !time_to_abort &&
+			feTimestampDifferenceExceeds(last_status_time, now,
 										 standby_message_timeout))
 		{
 			/* Time to send feedback! */
 			if (!sendFeedback(conn, now, true, false))
 				goto error;
 
-			last_status = now;
+			last_status_time = now;
+		}
+
+		if (time_to_abort && !copyDoneSent)
+		{
+			if (verbose)
+			{
+				fprintf(stderr,
+						_("%s: stopping write up to %X/%X, flush to %X/%X (slot %s)\n"),
+						progname,
+						(uint32) (output_written_lsn >> 32), (uint32) output_written_lsn,
+						(uint32) (output_fsync_lsn >> 32), (uint32) output_fsync_lsn,
+						replication_slot);
+			}
+
+			/*
+			 * Force fsync and send feedback just before we send CopyDone to
+			 * make sure the server knows exactly what we replayed up to. We'll
+			 * discard data received after we request the end of COPY BOTH mode
+			 * so we know we've written everything we're going to.
+			 */
+			if (!OutputFsync(now))
+				goto error;
+
+			if (!sendFeedback(conn, now, true, false))
+				goto error;
+
+			last_status_time = now;
+
+			if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+			{
+				fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+						progname, PQerrorMessage(conn));
+				goto error;
+			}
+			copyDoneSent = true;
 		}
 
 		/* got SIGHUP, close output file */
@@ -360,64 +613,9 @@ StreamLogicalLog(void)
 		r = PQgetCopyData(conn, &copybuf, 1);
 		if (r == 0)
 		{
-			/*
-			 * In async mode, and no data available. We block on reading but
-			 * not more than the specified timeout, so that we can send a
-			 * response back to the client.
-			 */
-			fd_set		input_mask;
-			TimestampTz message_target = 0;
-			TimestampTz fsync_target = 0;
-			struct timeval timeout;
-			struct timeval *timeoutptr = NULL;
-
-			if (PQsocket(conn) < 0)
-			{
-				fprintf(stderr,
-						_("%s: invalid socket: %s"),
-						progname, PQerrorMessage(conn));
-				goto error;
-			}
-
-			FD_ZERO(&input_mask);
-			FD_SET(PQsocket(conn), &input_mask);
-
-			/* Compute when we need to wakeup to send a keepalive message. */
-			if (standby_message_timeout)
-				message_target = last_status + (standby_message_timeout - 1) *
-					((int64) 1000);
-
-			/* Compute when we need to wakeup to fsync the output file. */
-			if (fsync_interval > 0 && output_needs_fsync)
-				fsync_target = output_last_fsync + (fsync_interval - 1) *
-					((int64) 1000);
-
-			/* Now compute when to wakeup. */
-			if (message_target > 0 || fsync_target > 0)
-			{
-				TimestampTz targettime;
-				long		secs;
-				int			usecs;
-
-				targettime = message_target;
-
-				if (fsync_target > 0 && fsync_target < targettime)
-					targettime = fsync_target;
-
-				feTimestampDifference(now,
-									  targettime,
-									  &secs,
-									  &usecs);
-				if (secs <= 0)
-					timeout.tv_sec = 1; /* Always sleep at least 1 sec */
-				else
-					timeout.tv_sec = secs;
-				timeout.tv_usec = usecs;
-				timeoutptr = &timeout;
-			}
-
-			r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
-			if (r == 0 || (r < 0 && errno == EINTR))
+			int readyMsg = WaitSocketActivity(conn, now);
+
+			if (readyMsg == 0 || (readyMsg < 0 && errno == EINTR))
 			{
 				/*
 				 * Got a timeout or signal. Continue the loop and either
@@ -426,7 +624,7 @@ StreamLogicalLog(void)
 				 */
 				continue;
 			}
-			else if (r < 0)
+			else if (readyMsg < 0)
 			{
 				fprintf(stderr, _("%s: select() failed: %s\n"),
 						progname, strerror(errno));
@@ -441,12 +639,26 @@ StreamLogicalLog(void)
 						progname, PQerrorMessage(conn));
 				goto error;
 			}
+
 			continue;
 		}
 
-		/* End of copy stream */
+		/*
+		 * End of copy stream (server sent CopyDone)
+		 *
+		 * This is where we exit on normal time_to_abort because our own
+		 * CopyDone caused the server to shut down streaming on its end.
+		 */
 		if (r == -1)
+		{
+			copyDoneReceived = true;
+			if (verbose && time_to_abort && copyDoneSent)
+			{
+				fprintf(stderr,
+						_("%s: streaming ended by user request\n"), progname);
+			}
 			break;
+		}
 
 		/* Failure while reading the copy stream */
 		if (r == -2)
@@ -456,138 +668,8 @@ StreamLogicalLog(void)
 			goto error;
 		}
 
-		/* Check the message type. */
-		if (copybuf[0] == 'k')
-		{
-			int			pos;
-			bool		replyRequested;
-			XLogRecPtr	walEnd;
-			bool		endposReached = false;
-
-			/*
-			 * Parse the keepalive message, enclosed in the CopyData message.
-			 * We just check if the server requested a reply, and ignore the
-			 * rest.
-			 */
-			pos = 1;			/* skip msgtype 'k' */
-			walEnd = fe_recvint64(&copybuf[pos]);
-			output_written_lsn = Max(walEnd, output_written_lsn);
-
-			pos += 8;			/* read walEnd */
-
-			pos += 8;			/* skip sendTime */
-
-			if (r < pos + 1)
-			{
-				fprintf(stderr, _("%s: streaming header too small: %d\n"),
-						progname, r);
-				goto error;
-			}
-			replyRequested = copybuf[pos];
-
-			if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
-			{
-				/*
-				 * If there's nothing to read on the socket until a keepalive
-				 * we know that the server has nothing to send us; and if
-				 * walEnd has passed endpos, we know nothing else can have
-				 * committed before endpos.  So we can bail out now.
-				 */
-				endposReached = true;
-			}
-
-			/* Send a reply, if necessary */
-			if (replyRequested || endposReached)
-			{
-				if (!flushAndSendFeedback(conn, &now))
-					goto error;
-				last_status = now;
-			}
-
-			if (endposReached)
-			{
-				prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
-				time_to_abort = true;
-				break;
-			}
-
-			continue;
-		}
-		else if (copybuf[0] != 'w')
-		{
-			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
-					progname, copybuf[0]);
-			goto error;
-		}
-
-		/*
-		 * Read the header of the XLogData message, enclosed in the CopyData
-		 * message. We only need the WAL location field (dataStart), the rest
-		 * of the header is ignored.
-		 */
-		hdr_len = 1;			/* msgtype 'w' */
-		hdr_len += 8;			/* dataStart */
-		hdr_len += 8;			/* walEnd */
-		hdr_len += 8;			/* sendTime */
-		if (r < hdr_len + 1)
+		if (!ProcessReceiveMsg(conn, copybuf[0], copybuf, r))
 		{
-			fprintf(stderr, _("%s: streaming header too small: %d\n"),
-					progname, r);
-			goto error;
-		}
-
-		/* Extract WAL location for this block */
-		cur_record_lsn = fe_recvint64(&copybuf[1]);
-
-		if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
-		{
-			/*
-			 * We've read past our endpoint, so prepare to go away being
-			 * cautious about what happens to our output data.
-			 */
-			if (!flushAndSendFeedback(conn, &now))
-				goto error;
-			prepareToTerminate(conn, endpos, false, cur_record_lsn);
-			time_to_abort = true;
-			break;
-		}
-
-		output_written_lsn = Max(cur_record_lsn, output_written_lsn);
-
-		bytes_left = r - hdr_len;
-		bytes_written = 0;
-
-		/* signal that a fsync is needed */
-		output_needs_fsync = true;
-
-		while (bytes_left)
-		{
-			int			ret;
-
-			ret = write(outfd,
-						copybuf + hdr_len + bytes_written,
-						bytes_left);
-
-			if (ret < 0)
-			{
-				fprintf(stderr,
-						_("%s: could not write %u bytes to log file \"%s\": %s\n"),
-						progname, bytes_left, outfile,
-						strerror(errno));
-				goto error;
-			}
-
-			/* Write was successful, advance our position */
-			bytes_written += ret;
-			bytes_left -= ret;
-		}
-
-		if (write(outfd, "\n", 1) != 1)
-		{
-			fprintf(stderr,
-					_("%s: could not write %u bytes to log file \"%s\": %s\n"),
-					progname, 1, outfile,
-					strerror(errno));
 			goto error;
 		}
 
@@ -656,6 +738,14 @@ error:
 static void
 sigint_handler(int signum)
 {
+	/*
+	 * For backward compatibility, a second SIGINT forces an immediate stop
+	 * without waiting for the server's CopyDone.
+	 */
+	if (time_to_abort)
+	{
+		force_time_to_abort = true;
+	}
 	time_to_abort = true;
 }
 
-- 
2.6.4
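ProcessKeepalive() and ProcessXLogData() in patch 0003 above parse the CopyData sub-messages at fixed byte offsets. A minimal standalone sketch of that parsing, assuming the Primary keepalive ('k') and XLogData ('w') layouts from the streaming replication protocol, is below. read_be64(), parse_keepalive() and xlogdata_payload_offset() are hypothetical helpers, not pg_recvlogical functions; read_be64() plays the role of fe_recvint64(). In the keepalive, the reply-requested flag follows both walEnd and sendTime, so it sits at offset 17.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Decode a big-endian (network order) 64-bit integer. */
static uint64_t
read_be64(const unsigned char *p)
{
	uint64_t	v = 0;

	for (int i = 0; i < 8; i++)
		v = (v << 8) | p[i];
	return v;
}

/*
 * Keepalive: byte 'k', int64 walEnd, int64 sendTime, byte replyRequested
 * (18 bytes). Returns false if the message is too short.
 */
static bool
parse_keepalive(const unsigned char *buf, size_t len,
				uint64_t *wal_end, bool *reply_requested)
{
	size_t		pos = 1;		/* skip msgtype 'k' */

	if (len < 1 + 8 + 8 + 1)
		return false;
	assert(buf[0] == 'k');		/* caller dispatched on the type byte */
	*wal_end = read_be64(&buf[pos]);
	pos += 8;					/* skip walEnd */
	pos += 8;					/* skip sendTime */
	*reply_requested = (buf[pos] != 0);
	return true;
}

/*
 * XLogData: byte 'w', int64 dataStart, int64 walEnd, int64 sendTime, then
 * the payload. Returns the payload offset (25), or 0 if there is no payload.
 */
static size_t
xlogdata_payload_offset(const unsigned char *buf, size_t len,
						uint64_t *data_start)
{
	const size_t hdr_len = 1 + 8 + 8 + 8;

	if (len < hdr_len + 1)
		return 0;
	assert(buf[0] == 'w');		/* caller dispatched on the type byte */
	*data_start = read_be64(&buf[1]);
	return hdr_len;
}
```

For example, an 18-byte keepalive whose walEnd bytes are 00 00 00 00 00 00 01 00 and whose last byte is 1 yields wal_end = 256 and reply_requested = true, while a 17-byte message is rejected as too short.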

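Patch 0002 below threads a ContinueDecodingCB through the decoding context so that ReorderBufferCommit() can stop replaying a large transaction once the walsender sees the client's CopyDone. The control flow can be sketched standalone as follows. MiniReorderBuffer, mini_commit() and stop_after_three() are hypothetical stand-ins for ReorderBuffer, ReorderBufferCommit() and IsStreamingActive(); the ContinueDecodingCB typedef is an assumed signature (no arguments, returns bool), matching how IsStreamingActive() is declared in walsender.c.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Assumed signature: no arguments, returns true while decoding may continue. */
typedef bool (*ContinueDecodingCB) (void);

/* Hypothetical, heavily simplified stand-in for ReorderBuffer. */
typedef struct
{
	ContinueDecodingCB continue_decoding_cb;	/* NULL: never stop early */
	int			changes_applied;
	bool		commit_called;
} MiniReorderBuffer;

/* Same test as the patch: a NULL callback always means "continue". */
static bool
should_continue(const MiniReorderBuffer *rb)
{
	return rb->continue_decoding_cb == NULL || rb->continue_decoding_cb();
}

/* Replay nchanges changes, checking the callback before each one and before commit. */
static void
mini_commit(MiniReorderBuffer *rb, int nchanges)
{
	for (int i = 0; i < nchanges && should_continue(rb); i++)
		rb->changes_applied++;

	/* As in the patch, skip the commit callback once decoding was cut short. */
	if (should_continue(rb))
		rb->commit_called = true;
}

/* Hypothetical IsStreamingActive(): the client's CopyDone "arrives" after 3 checks. */
static int	checks_before_copydone = 3;

static bool
stop_after_three(void)
{
	return checks_before_copydone-- > 0;
}
```

With a NULL callback all ten changes and the commit are applied; with stop_after_three() only three changes are applied and the commit step is skipped. Passing NULL, as the patch does for the SQL-level callers in logicalfuncs.c and slotfuncs.c, keeps their behaviour unchanged.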
0002-Client-initiated-CopyDone-during-transaction-decodin.patch (application/octet-stream)
From 099a56985968be9a18f46dce66b4ceada571183d Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Tue, 24 Jul 2018 10:03:34 -0400
Subject: [PATCH 2/4] Client-initiated CopyDone during transaction decoding in 
 walsender

---
 src/backend/replication/logical/logical.c       | 14 ++++++---
 src/backend/replication/logical/logicalfuncs.c  |  2 +-
 src/backend/replication/logical/reorderbuffer.c | 11 +++++--
 src/backend/replication/slotfuncs.c             |  4 +--
 src/backend/replication/walsender.c             | 42 ++++++++++++++++---------
 src/include/replication/logical.h               |  6 ++--
 src/include/replication/reorderbuffer.h         | 13 ++++++++
 7 files changed, 65 insertions(+), 27 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3cd4eef..2dc02dc 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -121,7 +121,8 @@ StartupDecodingContext(List *output_plugin_options,
 					   XLogPageReadCB read_page,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
 					   LogicalOutputPluginWriterWrite do_write,
-					   LogicalOutputPluginWriterUpdateProgress update_progress)
+					   LogicalOutputPluginWriterUpdateProgress update_progress,
+					   ContinueDecodingCB continue_decoding_cb)
 {
 	ReplicationSlot *slot;
 	MemoryContext context,
@@ -188,6 +189,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_truncate = truncate_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
+	ctx->reorder->continue_decoding_cb = continue_decoding_cb;
 
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
@@ -226,7 +228,8 @@ CreateInitDecodingContext(char *plugin,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
-						  LogicalOutputPluginWriterUpdateProgress update_progress)
+						  LogicalOutputPluginWriterUpdateProgress update_progress,
+						  ContinueDecodingCB continue_decoding_cb)
 {
 	TransactionId xmin_horizon = InvalidTransactionId;
 	ReplicationSlot *slot;
@@ -314,7 +317,7 @@ CreateInitDecodingContext(char *plugin,
 	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
 								 need_full_snapshot, true,
 								 read_page, prepare_write, do_write,
-								 update_progress);
+								 update_progress, continue_decoding_cb);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -361,7 +364,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write,
-					  LogicalOutputPluginWriterUpdateProgress update_progress)
+					  LogicalOutputPluginWriterUpdateProgress update_progress,
+					  ContinueDecodingCB continue_decoding_cb)
 {
 	LogicalDecodingContext *ctx;
 	ReplicationSlot *slot;
@@ -412,7 +416,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId, false,
 								 fast_forward, read_page, prepare_write,
-								 do_write, update_progress);
+								 do_write, update_progress, continue_decoding_cb);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 45aae71..3d8e61d 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -254,7 +254,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 									false,
 									logical_read_local_xlog_page,
 									LogicalOutputPrepareWrite,
-									LogicalOutputWrite, NULL);
+									LogicalOutputWrite, NULL, NULL);
 
 		MemoryContextSwitchTo(oldcontext);
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 9b55b94..aa024fa 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1460,7 +1460,9 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		rb->begin(rb, txn);
 
 		iterstate = ReorderBufferIterTXNInit(rb, txn);
-		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
+		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL &&
+			   (rb->continue_decoding_cb == NULL ||
+				rb->continue_decoding_cb()))
 		{
 			Relation	relation = NULL;
 			Oid			reloid;
@@ -1727,8 +1729,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		ReorderBufferIterTXNFinish(rb, iterstate);
 		iterstate = NULL;
 
-		/* call commit callback */
-		rb->commit(rb, txn, commit_lsn);
+		if (rb->continue_decoding_cb == NULL || rb->continue_decoding_cb())
+		{
+			/* call commit callback */
+			rb->commit(rb, txn, commit_lsn);
+		}
 
 		/* this is just a sanity check against bad output plugin behaviour */
 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 8782bad..82e8f37 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -137,7 +137,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
 									false,	/* do not build snapshot */
 									logical_read_local_xlog_page, NULL, NULL,
-									NULL);
+									NULL, NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
@@ -370,7 +370,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 									NIL,
 									true,	/* fast_forward */
 									logical_read_local_xlog_page,
-									NULL, NULL, NULL);
+									NULL, NULL, NULL, NULL);
 
 		/*
 		 * Start reading at the slot's restart_lsn, which we know to point to
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index f624048..23ab992 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -253,6 +253,23 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
+/*
+ * Return true until either the client or the server side has requested that we
+ * wind up COPY BOTH mode by sending a CopyDone.
+ *
+ * If we receive a CopyDone from the client, we should avoid sending any further
+ * CopyData messages and return to command mode as promptly as possible.
+ *
+ * While in the middle of sending data to a client, we notice a client-initiated
+ * CopyDone when WalSndWriteData() calls ProcessRepliesIfAny(), which sets
+ * streamingDoneReceiving and streamingDoneSending.
+ */
+static bool
+IsStreamingActive(void)
+{
+	return !streamingDoneReceiving && !streamingDoneSending;
+}
+
 
 /* Initialize walsender process before entering the main command loop */
 void
@@ -935,7 +952,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
 										logical_read_xlog_page,
 										WalSndPrepareWrite, WalSndWriteData,
-										WalSndUpdateProgress);
+										WalSndUpdateProgress, IsStreamingActive);
 
 		/*
 		 * Signal that we don't need the timeout mechanism. We're just
@@ -1094,7 +1111,8 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 												 logical_read_xlog_page,
 												 WalSndPrepareWrite,
 												 WalSndWriteData,
-												 WalSndUpdateProgress);
+												 WalSndUpdateProgress,
+												 IsStreamingActive);
 
 	/* Start reading WAL from the oldest required WAL. */
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
@@ -1186,17 +1204,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 
 	CHECK_FOR_INTERRUPTS();
 
-	/* Try to flush pending output to the client */
-	if (pq_flush_if_writable() != 0)
-		WalSndShutdown();
-
-	/* Try taking fast path unless we get too close to walsender timeout. */
-	if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
-										  wal_sender_timeout / 2) &&
-		!pq_is_send_pending())
-	{
-		return;
-	}
 
 	/* If we have pending write here, go to slow path */
 	for (;;)
@@ -1367,7 +1374,14 @@ WalSndWaitForWal(XLogRecPtr loc)
 			break;
 
 		/*
-		 * We only send regular messages to the client for full decoded
+		 * If we have received CopyDone from the client or sent CopyDone
+		 * ourselves, it's time to exit streaming.
+		 */
+		if (!IsStreamingActive())
+			break;
+
+		/*
+		 * We only send regular messages to the client for full decoded
 		 * transactions, but a synchronous replication and walsender shutdown
 		 * possibly are waiting for a later location. So we send pings
 		 * containing the flush location every now and then.
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c25ac1f..4822bfc 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -100,7 +100,8 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
-						  LogicalOutputPluginWriterUpdateProgress update_progress);
+						  LogicalOutputPluginWriterUpdateProgress update_progress,
+						  ContinueDecodingCB continue_decoding_cb);
 extern LogicalDecodingContext *CreateDecodingContext(
 					  XLogRecPtr start_lsn,
 					  List *output_plugin_options,
@@ -108,7 +109,8 @@ extern LogicalDecodingContext *CreateDecodingContext(
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write,
-					  LogicalOutputPluginWriterUpdateProgress update_progress);
+					  LogicalOutputPluginWriterUpdateProgress update_progress,
+					  ContinueDecodingCB continue_decoding_cb);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 1f52f6b..0a14c81 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -326,6 +326,14 @@ typedef void (*ReorderBufferMessageCB) (
 										const char *prefix, Size sz,
 										const char *message);
 
+
+/*
+ * Callback function that allows logical decoding to be interrupted. It returns
+ * true if decoding should continue; if it returns false,
+ * logical decoding will stop as soon as possible.
+ */
+typedef bool (*ContinueDecodingCB) (void);
+
 struct ReorderBuffer
 {
 	/*
@@ -365,6 +373,11 @@ struct ReorderBuffer
 	ReorderBufferMessageCB message;
 
 	/*
+	 * Callback to check whether decoding should continue; returns false to stop.
+	 */
+	ContinueDecodingCB continue_decoding_cb;
+
+	/*
 	 * Pointer that will be passed untouched to the callbacks.
 	 */
 	void	   *private_data;
-- 
2.6.4
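To illustrate the intent of the new `ContinueDecodingCB` hook, here is a minimal, self-contained C sketch (not code from the patch) of a replay loop that polls such a callback before emitting each change, so that a client CopyDone can cut a large transaction short. `Change`, `ReplayChanges`, `DemoIsStreamingActive` and the `client_sent_copydone` flag are hypothetical stand-ins for the reorder buffer internals and for `IsStreamingActive()`. Treating a NULL callback as "never stop" is an assumption of this sketch, not something the patch excerpt shows.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Same shape as the ContinueDecodingCB typedef added to reorderbuffer.h. */
typedef bool (*ContinueDecodingCB) (void);

/* Hypothetical stand-in for one decoded change of a large transaction. */
typedef struct Change
{
	int			id;
} Change;

/* Hypothetical flag playing the role of the walsender's CopyDone state. */
static bool client_sent_copydone = false;

/* Hypothetical analogue of IsStreamingActive(): continue until CopyDone. */
static bool
DemoIsStreamingActive(void)
{
	return !client_sent_copydone;
}

/*
 * Emit up to nchanges changes, asking continue_cb before each one whether
 * to keep going.  stop_after simulates a client CopyDone arriving once that
 * many changes have been emitted (negative: never).  In this sketch a NULL
 * callback means "never interrupt".  Returns the number of changes emitted.
 */
static size_t
ReplayChanges(const Change *changes, size_t nchanges,
			  ContinueDecodingCB continue_cb, long stop_after)
{
	size_t		emitted = 0;

	assert(changes != NULL || nchanges == 0);

	for (size_t i = 0; i < nchanges; i++)
	{
		if (continue_cb != NULL && !continue_cb())
			break;				/* stop as soon as possible */

		/* ... the output plugin's change callback would run here ... */
		emitted++;

		if (stop_after >= 0 && (long) emitted == stop_after)
			client_sent_copydone = true;
	}
	return emitted;
}
```

For example, with `stop_after = 3` the loop emits three changes and stops at the next callback check. With a NULL callback it emits every change regardless of the flag. Checking once per change bounds how much extra output is produced after a CopyDone, which is the latency the JDBC and TAP tests care about.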

0001-Respect-client-initiated-CopyDone-in-walsender.patch (application/octet-stream)
From dfcc30566b53c378f03dff907d2c2a73c992f0f8 Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Mon, 23 Jul 2018 16:34:20 -0400
Subject: [PATCH 1/4] Respect client-initiated CopyDone in walsender

---
 src/backend/replication/walsender.c | 36 ++++++++++++++++++++++++++++++------
 1 file changed, 30 insertions(+), 6 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d60026d..f624048 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -765,6 +765,14 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	sendTimeLineValidUpto = state->currTLIValidUntil;
 	sendTimeLineNextTLI = state->nextTLI;
 
+	/*
+	 * If the client sent CopyDone while we were waiting,
+	 * bail out so we can wind up the decoding session.
+	 */
+	if (streamingDoneSending)
+		return -1;
+
+	 /* more than one block available */
 	/* make sure we have enough WAL available */
 	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
 
@@ -1350,8 +1358,12 @@ WalSndWaitForWal(XLogRecPtr loc)
 		 * It's important to do this check after the recomputation of
 		 * RecentFlushPtr, so we can send all remaining data before shutting
 		 * down.
-		 */
-		if (got_STOPPING)
+		 *
+		 * We'll also exit here if the client sent CopyDone because it wants
+		 * to return to command mode.
+		 */
+
+		if (got_STOPPING || streamingDoneReceiving)
 			break;
 
 		/*
@@ -2096,7 +2108,14 @@ WalSndCheckTimeOut(TimestampTz now)
 	}
 }
 
-/* Main loop of walsender process that streams the WAL over Copy messages. */
+/*
+ * Main loop of walsender process that streams the WAL over Copy messages.
+ *
+ * The send_data callback must enqueue complete CopyData messages to libpq
+ * using pq_putmessage_noblock or similar, since the walsender loop may send
+ * CopyDone then exit and return to command mode in response to a client
+ * CopyDone between calls to send_data.
+ */
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
@@ -2152,10 +2171,15 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		 * some more.  If there is some, we don't bother to call send_data
 		 * again until we've flushed it ... but we'd better assume we are not
 		 * caught up.
+		 *
+		 * If we're trying to finish sending and exit we shouldn't enqueue more
+		 * data to libpq. We need to finish writing out whatever we already
+		 * have in libpq's send buffer to maintain protocol sync so we still
+		 * need to loop until it's flushed.
 		 */
-		if (!pq_is_send_pending())
+		if (!pq_is_send_pending() && !streamingDoneSending)
 			send_data();
-		else
+		else if (!streamingDoneSending)
 			WalSndCaughtUp = false;
 
 		/* Try to flush pending output to the client */
@@ -3387,7 +3411,7 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
 	if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
 		return;
 
-	if (waiting_for_ping_response)
+	if (waiting_for_ping_response || streamingDoneSending)
 		return;
 
 	/*
-- 
2.6.4

#2Dmitry Dolgov
9erthalion6@gmail.com
In reply to: Dave Cramer (#1)
Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk

On Tue, Jul 24, 2018 at 5:52 PM Dave Cramer <davecramer@gmail.com> wrote:

Back in 2016 a patch was proposed that seems to have died on the vine. See /messages/by-id/CAFgjRd3hdYOa33m69TbeOfNNer2BZbwa8FFjt2V5VFzTBvUU3w@mail.gmail.com
for the history and https://commitfest.postgresql.org/10/621/ for the commitfest entry.
I've rebased the patches and attached them for consideration.
JDBC tests here https://github.com/pgjdbc/pgjdbc/blob/master/pgjdbc/src/test/java/org/postgresql/replication/LogicalReplicationTest.java all pass

Unfortunately the second patch from the series can't be applied anymore. Could
you please rebase it one more time? Other than that, it seems strange to me
that the corresponding discussion stopped when it was quite close to being in
good shape, bouncing from "rejected with feedback" to "needs review". Can
someone among those involved clarify the current status of this patch?

#3Dmitry Dolgov
9erthalion6@gmail.com
In reply to: Dmitry Dolgov (#2)
Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk

On Mon, Nov 19, 2018 at 4:58 PM Dmitry Dolgov <9erthalion6@gmail.com> wrote:

On Tue, Jul 24, 2018 at 5:52 PM Dave Cramer <davecramer@gmail.com> wrote:

Back in 2016 a patch was proposed that seems to have died on the vine. See /messages/by-id/CAFgjRd3hdYOa33m69TbeOfNNer2BZbwa8FFjt2V5VFzTBvUU3w@mail.gmail.com
for the history and https://commitfest.postgresql.org/10/621/ for the commitfest entry.
I've rebased the patches and attached them for consideration.
JDBC tests here https://github.com/pgjdbc/pgjdbc/blob/master/pgjdbc/src/test/java/org/postgresql/replication/LogicalReplicationTest.java all pass

Unfortunately the second patch from the series can't be applied anymore, could
you please rebase it one more time? Other than that it looks strange for me
that the corresponding discussion stopped when it was quite close to be in a
good shape, bouncing from "rejected with feedback" to "needs review". Can
someone from involved people clarify what's the current status of this patch?

Looks like I'm a failure as a neuromancer, since revival didn't happen. I'm
marking it as returned with feedback.

#4Dave Cramer
davecramer@gmail.com
In reply to: Dmitry Dolgov (#3)
Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk

Why is this being closed? I did not see the first email looking for
clarification.

The history is that the original author dropped off the planet (no idea where
he is).

I can certainly rebase it.

Dave Cramer

On Fri, 30 Nov 2018 at 18:00, Dmitry Dolgov <9erthalion6@gmail.com> wrote:

On Mon, Nov 19, 2018 at 4:58 PM Dmitry Dolgov <9erthalion6@gmail.com> wrote:

On Tue, Jul 24, 2018 at 5:52 PM Dave Cramer <davecramer@gmail.com> wrote:

Back in 2016 a patch was proposed that seems to have died on the vine. See
/messages/by-id/CAFgjRd3hdYOa33m69TbeOfNNer2BZbwa8FFjt2V5VFzTBvUU3w@mail.gmail.com
for the history and https://commitfest.postgresql.org/10/621/ for the
commitfest entry.
I've rebased the patches and attached them for consideration.
JDBC tests here
https://github.com/pgjdbc/pgjdbc/blob/master/pgjdbc/src/test/java/org/postgresql/replication/LogicalReplicationTest.java
all pass

Unfortunately the second patch from the series can't be applied anymore, could
you please rebase it one more time? Other than that it looks strange for me
that the corresponding discussion stopped when it was quite close to be in a
good shape, bouncing from "rejected with feedback" to "needs review". Can
someone from involved people clarify what's the current status of this patch?

Looks like I'm a failure as a neuromancer, since revival didn't happen. I'm
marking it as returned with feedback.

#5Dmitry Dolgov
9erthalion6@gmail.com
In reply to: Dave Cramer (#4)
Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk

On Sat, Dec 1, 2018 at 12:17 AM Dave Cramer <davecramer@gmail.com> wrote:

Why is this being closed? I did not see the first email looking for clarification.

Well, mostly due to the total absence of a response and a broken mind-reading crystal ball.

I can certainly rebase it.

Yes, please do. I'll change the CF item status back.

#6Dave Cramer
davecramer@gmail.com
In reply to: Dmitry Dolgov (#5)
Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk

Dmitry,

Thanks, I have done a preliminary check and it seems pretty straightforward.

I will clean it up for Monday

Thanks for your patience!

Dave Cramer

On Fri, 30 Nov 2018 at 18:22, Dmitry Dolgov <9erthalion6@gmail.com> wrote:

On Sat, Dec 1, 2018 at 12:17 AM Dave Cramer <davecramer@gmail.com> wrote:

Why is this being closed? I did not see the first email looking for clarification.

Well, mostly due total absence of response and broken mind reading crystal ball.

I can certainly rebase it.

Yes, please do. I'll change the CF item status back.

#7Dmitry Dolgov
9erthalion6@gmail.com
In reply to: Dave Cramer (#6)
Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk

On Sat, Dec 1, 2018 at 12:49 AM Dave Cramer <davecramer@gmail.com> wrote:

Thanks, I have done a preliminary check and it seems pretty straightforward.

I will clean it up for Monday

Great, thank you!

#8Dave Cramer
davecramer@gmail.com
In reply to: Dmitry Dolgov (#7)
4 attachment(s)
Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk

Dmitry,

Please see attached rebased patches

Dave Cramer

On Fri, 30 Nov 2018 at 18:52, Dmitry Dolgov <9erthalion6@gmail.com> wrote:

On Sat, Dec 1, 2018 at 12:49 AM Dave Cramer <davecramer@gmail.com> wrote:

Thanks, I have done a preliminary check and it seems pretty straightforward.

I will clean it up for Monday

Great, thank you!

Attachments:

0004-Add-test-for-pg_recvlogical-to-stop-replication.patch (application/octet-stream)
From 4008ddd3c797e5684833f51a9cd667e74cf95b2c Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Mon, 3 Dec 2018 06:29:08 -0500
Subject: [PATCH 4/4] Add test for pg_recvlogical to stop replication

---
 src/test/recovery/t/0016_pg_recvlogical.pl | 226 +++++++++++++++++++++++++++++
 1 file changed, 226 insertions(+)
 create mode 100644 src/test/recovery/t/0016_pg_recvlogical.pl

diff --git a/src/test/recovery/t/0016_pg_recvlogical.pl b/src/test/recovery/t/0016_pg_recvlogical.pl
new file mode 100644
index 0000000..1dd41df
--- /dev/null
+++ b/src/test/recovery/t/0016_pg_recvlogical.pl
@@ -0,0 +1,226 @@
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 8;
+use IPC::Run qw( start timeout ) ;
+
+
+my $verbose = $ENV{PG_TAP_VERBOSE};
+
+# Launch pg_recvlogical as a background process and return its IPC::Run handle;
+# in list context, also return refs to its stdout/stderr buffers and the timeout.
+sub start_pg_recvlogical
+{
+    my ($node, $slotname, %params) = @_;
+    my $stdout = my $stderr = '';
+    my $timeout           = undef;
+    my $timeout_exception = 'pg_recvlogical timed out';
+
+    $timeout =
+        IPC::Run::timeout($params{timeout}, exception => $timeout_exception)
+        if (defined($params{timeout}));
+
+    my @cmd = ("pg_recvlogical", "--verbose", "-S", "$slotname", "--no-loop", "--dbname", $node->connstr, "--start", "-f", "-");
+
+    push @cmd, map { split ' ', $_ } @{ $params{extra_params} }
+        if defined $params{extra_params};
+
+    diag "Running '@cmd'" if $verbose;
+
+    my $proc = start \@cmd, '<', \undef, '2>', \$stderr, '>', \$stdout, $timeout;
+
+    die $! unless defined($proc);
+
+    sleep 5;
+
+    if ($stdout ne "")
+    {
+        diag "#### Begin standard out\n" if $verbose;
+        diag $stdout if $verbose;
+        diag "\n#### End standard out\n" if $verbose;
+    }
+
+    if ($stderr ne "")
+    {
+        diag "#### Begin standard error\n" if $verbose;
+        diag $stderr if $verbose;
+        diag "\n#### End standard error\n" if $verbose;
+    }
+
+    if (wantarray)
+    {
+        return ($proc, \$stdout, \$stderr, $timeout);
+    }
+    else
+    {
+        return $proc;
+    }
+}
+
+sub wait_for_start_streaming
+{
+    my ($node, $slotname) = @_;
+
+    diag "waiting for " . $node->name . " to start streaming on slot " . $slotname if $verbose;
+    $node->poll_query_until('postgres', "select active from pg_replication_slots where slot_name = '$slotname';");
+}
+
+sub wait_for_stop_streaming
+{
+    my ($node, $slotname) = @_;
+
+    diag "waiting for streaming on slot " . $slotname . " of " . $node->name . " to stop" if $verbose;
+    $node->poll_query_until('postgres', "select not(active) from pg_replication_slots where slot_name = '$slotname';");
+}
+
+sub create_logical_replication_slot
+{
+    my ($node, $slotname, $outplugin) = @_;
+
+    $node->safe_psql(
+        "postgres",
+        "select pg_drop_replication_slot('$slotname') where exists (select 1 from pg_replication_slots where slot_name = '$slotname');");
+
+    $node->safe_psql('postgres',
+        "SELECT pg_create_logical_replication_slot('$slotname', '$outplugin');"
+    );
+}
+
+my ($proc);
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 12\n");
+$node_master->append_conf('postgresql.conf', "max_wal_senders = 12\n");
+$node_master->append_conf('postgresql.conf', "max_connections = 20\n");
+$node_master->dump_info;
+$node_master->start;
+
+
+# Test case 1: client stops logical replication while the database has no new changes (idle state)
+{
+    my $slotname = 'calm_state_slot';
+
+    create_logical_replication_slot($node_master, $slotname, "test_decoding");
+
+    my ($stdout, $stderr, $timeout);
+    ($proc, $stdout, $stderr, $timeout) = start_pg_recvlogical(
+        $node_master,
+        $slotname,
+        timeout => 60,
+        extra_params => ['-o include-xids=false', '-o skip-empty-xacts=true']
+    );
+
+    wait_for_start_streaming($node_master, $slotname);
+
+    my $cancelTime = time();
+    $proc->signal("SIGINT");
+
+    $proc->pump while $proc->pumpable;
+
+    wait_for_stop_streaming($node_master, $slotname);
+
+    my $spendTime = time() - $cancelTime;
+
+    my $timed_out = 0;
+    eval {
+        $proc->finish;
+    };
+    if ($@)
+    {
+        my $x = $@;
+        if ($timeout->is_expired)
+        {
+            diag "whoops, pg_recvlogical timed out" if $verbose;
+            $timed_out = 1;
+        }
+        else
+        {
+            die $x;
+        }
+    }
+
+    if ($verbose)
+    {
+        diag "#--- pg_recvlogical stderr ---";
+        diag $$stderr;
+        diag "#--- end stderr ---";
+    }
+
+    ok(!$timed_out, "pg_recvlogical exited before timeout when idle");
+    like($$stderr, qr/stopping write up to/, 'pg_recvlogical responded to sigint when idle');
+    like($$stderr, qr/streaming ended by user request/, 'idle wait ended due to client copydone');
+    diag "decoding when idle stopped after ${spendTime}s";
+    ok($spendTime <= 3, # allow extra time for slow machines
+        "pg_recvlogical exited promptly on sigint when idle"
+    );
+}
+
+
+# Test case 2: client stops logical replication while decoding a huge transaction (200000 inserted rows)
+{
+    my $slotname = 'huge_tx_state_slot';
+
+    create_logical_replication_slot($node_master, $slotname, "test_decoding");
+
+    $node_master->safe_psql('postgres',
+        "create table test_logic_table(pk serial primary key, name varchar(100));");
+
+    diag 'Insert huge amount of data to table test_logic_table' if $verbose;
+    $node_master->safe_psql('postgres',
+        "insert into test_logic_table select id, md5(random()::text) as name from generate_series(1, 200000) as id;");
+
+    my ($stdout, $stderr, $timeout);
+    ($proc, $stdout, $stderr, $timeout) = start_pg_recvlogical(
+        $node_master,
+        $slotname,
+        timeout => 60,
+        extra_params => ['-o include-xids=false', '-o skip-empty-xacts=true']
+    );
+
+    wait_for_start_streaming($node_master, $slotname);
+
+    my $cancelTime = time();
+    $proc->signal("SIGINT");
+
+    $proc->pump while $proc->pumpable;
+
+    wait_for_stop_streaming($node_master, $slotname);
+
+    my $spendTime = time() - $cancelTime;
+
+    my $timed_out = 0;
+    eval {
+        $proc->finish;
+    };
+    if ($@)
+    {
+        my $x = $@;
+        if ($timeout->is_expired)
+        {
+            diag "whoops, pg_recvlogical timed out" if $verbose;
+            $timed_out = 1;
+        }
+        else
+        {
+            die $x;
+        }
+    }
+
+    if ($verbose)
+    {
+        diag "#--- pg_recvlogical stderr ---";
+        diag $$stderr;
+        diag "#--- end stderr ---";
+    }
+
+    ok(!$timed_out, "pg_recvlogical exited before timeout when streaming");
+    like($$stderr, qr/stopping write up to/, 'pg_recvlogical responded to sigint when streaming');
+    like($$stderr, qr/streaming ended by user request/, 'streaming ended due to client copydone');
+    diag "decoding of big xact stopped after ${spendTime}s";
+    ok($spendTime <= 5, # allow extra time for slow machines
+        "pg_recvlogical exited promptly on signal when decoding");
+}
-- 
2.6.4
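Patch 3/4 splits pg_recvlogical's message handling into `ProcessKeepalive()` and `ProcessXLogData()`. For reference, here is a self-contained sketch of the two CopyData payload layouts they parse, as described for the streaming replication protocol. A keepalive is `'k'`, Int64 walEnd, Int64 sendTime, Byte1 replyRequested, so the reply flag sits at offset 17, after both 64-bit fields. An XLogData header is `'w'` plus three Int64 fields (25 bytes). `read_be64()`, `parse_keepalive()` and `parse_xlogdata()` are hypothetical helpers, with `read_be64()` playing the role of `fe_recvint64()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Decode a big-endian (network order) 64-bit integer, like fe_recvint64(). */
static uint64_t
read_be64(const unsigned char *p)
{
	uint64_t	v = 0;

	for (int i = 0; i < 8; i++)
		v = (v << 8) | p[i];
	return v;
}

/*
 * Parse a primary keepalive message:
 *   Byte1 'k' | Int64 walEnd | Int64 sendTime | Byte1 replyRequested
 * Returns false if the buffer is too short or is not a keepalive.
 */
static bool
parse_keepalive(const unsigned char *buf, size_t len,
				uint64_t *walEnd, bool *replyRequested)
{
	size_t		pos = 1;		/* skip msgtype 'k' */

	assert(buf != NULL);
	if (len < 1 + 8 + 8 + 1 || buf[0] != 'k')
		return false;

	*walEnd = read_be64(&buf[pos]);
	pos += 8;					/* skip walEnd */
	pos += 8;					/* skip sendTime */
	*replyRequested = buf[pos] != 0;	/* offset 17 */
	return true;
}

/*
 * Parse the header of an XLogData message:
 *   Byte1 'w' | Int64 dataStart | Int64 walEnd | Int64 sendTime | data...
 * Like the patch, require at least one payload byte after the header.
 */
static bool
parse_xlogdata(const unsigned char *buf, size_t len,
			   uint64_t *dataStart, size_t *payload_len)
{
	const size_t hdr_len = 1 + 8 + 8 + 8;

	assert(buf != NULL);
	if (len < hdr_len + 1 || buf[0] != 'w')
		return false;

	*dataStart = read_be64(&buf[1]);
	*payload_len = len - hdr_len;
	return true;
}
```

Skipping only one Int64 before reading the flag would read the first byte of sendTime instead of replyRequested. The client would then reply to keepalives at effectively random times.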

0003-Add-ability-for-pg_recvlogical-to-stop-replication-f.patch (application/octet-stream)
From 867251ede310384ec3c9c72487411e634bd006fe Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Mon, 3 Dec 2018 06:28:31 -0500
Subject: [PATCH 3/4] Add ability for pg_recvlogical to stop replication from
 client side

---
 src/bin/pg_basebackup/pg_recvlogical.c | 490 +++++++++++++++++++--------------
 1 file changed, 290 insertions(+), 200 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index a242e0b..2d69aa7 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -56,8 +56,12 @@ static const char *plugin = "test_decoding";
 /* Global State */
 static int	outfd = -1;
 static volatile sig_atomic_t time_to_abort = false;
+static volatile sig_atomic_t force_time_to_abort = false;
 static volatile sig_atomic_t output_reopen = false;
+static bool copyDoneSent;
+static bool copyDoneReceived;
 static bool output_isfile;
+static int64 last_status_time;
 static TimestampTz output_last_fsync = -1;
 static bool output_needs_fsync = false;
 static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
@@ -205,6 +209,222 @@ OutputFsync(TimestampTz now)
 	return true;
 }
 
+static bool
+ProcessKeepalive(PGconn *conn, char *msgBuf, int msgLength)
+{
+	int			pos;
+	bool		replyRequested;
+	XLogRecPtr	walEnd;
+
+	/*
+	 * Parse the keepalive message, enclosed in the CopyData message.
+	 * We just check if the server requested a reply, and ignore the
+	 * rest.
+	 */
+	pos = 1;			/* skip msgtype 'k' */
+
+	/* read walEnd */
+	walEnd = fe_recvint64(&msgBuf[pos]);
+	output_written_lsn = Max(walEnd, output_written_lsn);
+	pos += 8;			/* skip walEnd */
+	pos += 8;			/* skip sendTime */
+
+	if (msgLength < pos + 1)
+	{
+		fprintf(stderr, _("%s: streaming header too small: %d\n"),
+				progname, msgLength);
+		return false;
+	}
+	replyRequested = msgBuf[pos];
+
+	/* If the server requested an immediate reply, send one. */
+	if (replyRequested)
+	{
+		int64 now = feGetCurrentTimestamp();
+
+		/* fsync data, so we send a recent flush pointer */
+		if (!OutputFsync(now))
+		{
+			return false;
+		}
+
+		if (!sendFeedback(conn, now, true, false))
+		{
+			return false;
+		}
+		last_status_time = now;
+	}
+
+	return true;
+}
+
+static bool
+ProcessXLogData(PGconn *conn, char *msgBuf, int msgLength)
+{
+	int bytes_left;
+	int bytes_written;
+
+	/*
+	 * Read the header of the XLogData message, enclosed in the CopyData
+	 * message. We only need the WAL location field (dataStart), the rest
+	 * of the header is ignored.
+	 */
+	int hdr_len = 1;			/* msgtype 'w' */
+	hdr_len += 8;			/* dataStart */
+	hdr_len += 8;			/* walEnd */
+	hdr_len += 8;			/* sendTime */
+	if (msgLength < hdr_len + 1)
+	{
+		fprintf(stderr, _("%s: streaming header too small: %d\n"),
+				progname, msgLength);
+		return false;
+	}
+
+	if (time_to_abort && copyDoneSent)
+	{
+		/*
+		 * We've sent feedback and sent CopyDone, so we are now discarding
+		 * xlog data input to find the server's reply CopyDone. That way when
+		 * another client connects to the slot later they start replay exactly
+		 * where we left off - or at least at the last commit we flushed to
+		 * disk. This is not an error condition.
+		 */
+		return true;
+	}
+
+	/* Extract WAL location for this block */
+	{
+		XLogRecPtr	temp = fe_recvint64(&msgBuf[1]);
+
+		output_written_lsn = Max(temp, output_written_lsn);
+	}
+
+	bytes_left = msgLength - hdr_len;
+	bytes_written = 0;
+
+	/* signal that a fsync is needed */
+	output_needs_fsync = true;
+
+	while (bytes_left)
+	{
+		int			ret;
+
+		ret = write(outfd,
+					msgBuf + hdr_len + bytes_written,
+					bytes_left);
+
+		if (ret < 0)
+		{
+			fprintf(stderr,
+			  _("%s: could not write %u bytes to log file \"%s\": %s\n"),
+					progname, bytes_left, outfile,
+					strerror(errno));
+			return false;
+		}
+
+		/* Write was successful, advance our position */
+		bytes_written += ret;
+		bytes_left -= ret;
+	}
+
+	if (write(outfd, "\n", 1) != 1)
+	{
+		fprintf(stderr,
+			  _("%s: could not write %u bytes to log file \"%s\": %s\n"),
+				progname, 1, outfile,
+				strerror(errno));
+		return false;
+	}
+
+	return true;
+}
+
+static bool
+ProcessReceiveMsg(PGconn *conn, unsigned char type, char *msgBuf, int msgLength)
+{
+	bool success = false;
+	switch (type)
+	{
+		case 'k':
+			success = ProcessKeepalive(conn, msgBuf, msgLength);
+			break;
+		case 'w':
+			success = ProcessXLogData(conn, msgBuf, msgLength);
+			break;
+		default:
+			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+					progname, type);
+	}
+
+	return success;
+}
+
+/*
+ * Wait for socket activity, but no longer than the next keepalive or fsync deadline.
+ * Returns the number of ready descriptors, or -1 for errors.
+ */
+static int
+WaitSocketActivity(PGconn *conn, int64 now)
+{
+	/*
+	 * In async mode, and no data available. We block on reading but
+	 * not more than the specified timeout, so that we can send a
+	 * response back to the server.
+	 */
+	fd_set		input_mask;
+	int64		message_target = 0;
+	int64		fsync_target = 0;
+	struct timeval timeout;
+	struct timeval *timeoutptr = NULL;
+
+	if (PQsocket(conn) < 0)
+	{
+		fprintf(stderr,
+				_("%s: invalid socket: %s"),
+				progname, PQerrorMessage(conn));
+		return -1;
+	}
+
+	FD_ZERO(&input_mask);
+	FD_SET(PQsocket(conn), &input_mask);
+
+	/* Compute when we need to wakeup to send a keepalive message. */
+	if (standby_message_timeout)
+		message_target = last_status_time + (standby_message_timeout - 1) *
+			((int64) 1000);
+
+	/* Compute when we need to wakeup to fsync the output file. */
+	if (fsync_interval > 0 && output_needs_fsync)
+		fsync_target = output_last_fsync + (fsync_interval - 1) *
+			((int64) 1000);
+
+	/* Now compute when to wakeup. */
+	if (message_target > 0 || fsync_target > 0)
+	{
+		int64		targettime;
+		long		secs;
+		int			usecs;
+
+		targettime = message_target;
+
+		if (fsync_target > 0 && fsync_target < targettime)
+			targettime = fsync_target;
+
+		feTimestampDifference(now,
+							  targettime,
+							  &secs,
+							  &usecs);
+		if (secs <= 0)
+			timeout.tv_sec = 1; /* Always sleep at least 1 sec */
+		else
+			timeout.tv_sec = secs;
+		timeout.tv_usec = usecs;
+		timeoutptr = &timeout;
+	}
+
+	return select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+}
+
 /*
  * Start the log streaming
  */
@@ -213,13 +433,14 @@ StreamLogicalLog(void)
 {
 	PGresult   *res;
 	char	   *copybuf = NULL;
-	TimestampTz last_status = -1;
 	int			i;
 	PQExpBuffer query;
 
 	output_written_lsn = InvalidXLogRecPtr;
 	output_fsync_lsn = InvalidXLogRecPtr;
-
+	last_status_time = -1;
+	copyDoneReceived = false;
+	copyDoneSent = false;
 	query = createPQExpBuffer();
 
 	/*
@@ -281,13 +502,10 @@ StreamLogicalLog(void)
 				_("%s: streaming initiated\n"),
 				progname);
 
-	while (!time_to_abort)
+	while (!force_time_to_abort)
 	{
 		int			r;
-		int			bytes_left;
-		int			bytes_written;
 		TimestampTz now;
-		int			hdr_len;
 		XLogRecPtr	cur_record_lsn = InvalidXLogRecPtr;
 
 		if (copybuf != NULL)
@@ -309,15 +527,50 @@ StreamLogicalLog(void)
 				goto error;
 		}
 
-		if (standby_message_timeout > 0 &&
-			feTimestampDifferenceExceeds(last_status, now,
+		if (standby_message_timeout > 0 && !time_to_abort &&
+			feTimestampDifferenceExceeds(last_status_time, now,
 										 standby_message_timeout))
 		{
 			/* Time to send feedback! */
 			if (!sendFeedback(conn, now, true, false))
 				goto error;
 
-			last_status = now;
+			last_status_time = now;
+		}
+
+		if (time_to_abort && !copyDoneSent)
+		{
+			if (verbose)
+			{
+				fprintf(stderr,
+						_("%s: stopping write up to %X/%X, flush to %X/%X (slot %s)\n"),
+						progname,
+						(uint32) (output_written_lsn >> 32), (uint32) output_written_lsn,
+						(uint32) (output_fsync_lsn >> 32), (uint32) output_fsync_lsn,
+						replication_slot);
+			}
+
+			/*
+			 * Force fsync and send feedback just before we send CopyDone to
+			 * make sure the server knows exactly what we replayed up to. We'll
+			 * discard data received after we request the end of COPY BOTH mode
+			 * so we know we've written everything we're going to.
+			 */
+			if (!OutputFsync(now))
+				goto error;
+
+			if (!sendFeedback(conn, now, true, false))
+				goto error;
+
+			last_status_time = now;
+
+			if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+			{
+				fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+						progname, PQerrorMessage(conn));
+				goto error;
+			}
+			copyDoneSent = true;
 		}
 
 		/* got SIGHUP, close output file */
@@ -360,64 +613,9 @@ StreamLogicalLog(void)
 		r = PQgetCopyData(conn, &copybuf, 1);
 		if (r == 0)
 		{
-			/*
-			 * In async mode, and no data available. We block on reading but
-			 * not more than the specified timeout, so that we can send a
-			 * response back to the client.
-			 */
-			fd_set		input_mask;
-			TimestampTz message_target = 0;
-			TimestampTz fsync_target = 0;
-			struct timeval timeout;
-			struct timeval *timeoutptr = NULL;
-
-			if (PQsocket(conn) < 0)
-			{
-				fprintf(stderr,
-						_("%s: invalid socket: %s"),
-						progname, PQerrorMessage(conn));
-				goto error;
-			}
-
-			FD_ZERO(&input_mask);
-			FD_SET(PQsocket(conn), &input_mask);
-
-			/* Compute when we need to wakeup to send a keepalive message. */
-			if (standby_message_timeout)
-				message_target = last_status + (standby_message_timeout - 1) *
-					((int64) 1000);
-
-			/* Compute when we need to wakeup to fsync the output file. */
-			if (fsync_interval > 0 && output_needs_fsync)
-				fsync_target = output_last_fsync + (fsync_interval - 1) *
-					((int64) 1000);
-
-			/* Now compute when to wakeup. */
-			if (message_target > 0 || fsync_target > 0)
-			{
-				TimestampTz targettime;
-				long		secs;
-				int			usecs;
-
-				targettime = message_target;
-
-				if (fsync_target > 0 && fsync_target < targettime)
-					targettime = fsync_target;
-
-				feTimestampDifference(now,
-									  targettime,
-									  &secs,
-									  &usecs);
-				if (secs <= 0)
-					timeout.tv_sec = 1; /* Always sleep at least 1 sec */
-				else
-					timeout.tv_sec = secs;
-				timeout.tv_usec = usecs;
-				timeoutptr = &timeout;
-			}
-
-			r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
-			if (r == 0 || (r < 0 && errno == EINTR))
+			int readyMsg = WaitSocketActivity(conn, now);
+
+			if (readyMsg == 0 || (readyMsg < 0 && errno == EINTR))
 			{
 				/*
 				 * Got a timeout or signal. Continue the loop and either
@@ -426,7 +624,7 @@ StreamLogicalLog(void)
 				 */
 				continue;
 			}
-			else if (r < 0)
+			else if (readyMsg < 0)
 			{
 				fprintf(stderr, _("%s: select() failed: %s\n"),
 						progname, strerror(errno));
@@ -441,12 +639,26 @@ StreamLogicalLog(void)
 						progname, PQerrorMessage(conn));
 				goto error;
 			}
+
 			continue;
 		}
 
-		/* End of copy stream */
+		/*
+		 * End of copy stream (server sent CopyDone)
+		 *
+		 * This is where we exit on normal time_to_abort because our own
+		 * CopyDone caused the server to shut down streaming on its end.
+		 */
 		if (r == -1)
+		{
+			copyDoneReceived = true;
+			if (verbose && time_to_abort && copyDoneSent)
+			{
+				fprintf(stderr,
+						_("%s: streaming ended by user request\n"), progname);
+			}
 			break;
+		}
 
 		/* Failure while reading the copy stream */
 		if (r == -2)
@@ -456,138 +668,8 @@ StreamLogicalLog(void)
 			goto error;
 		}
 
-		/* Check the message type. */
-		if (copybuf[0] == 'k')
-		{
-			int			pos;
-			bool		replyRequested;
-			XLogRecPtr	walEnd;
-			bool		endposReached = false;
-
-			/*
-			 * Parse the keepalive message, enclosed in the CopyData message.
-			 * We just check if the server requested a reply, and ignore the
-			 * rest.
-			 */
-			pos = 1;			/* skip msgtype 'k' */
-			walEnd = fe_recvint64(&copybuf[pos]);
-			output_written_lsn = Max(walEnd, output_written_lsn);
-
-			pos += 8;			/* read walEnd */
-
-			pos += 8;			/* skip sendTime */
-
-			if (r < pos + 1)
-			{
-				fprintf(stderr, _("%s: streaming header too small: %d\n"),
-						progname, r);
-				goto error;
-			}
-			replyRequested = copybuf[pos];
-
-			if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
-			{
-				/*
-				 * If there's nothing to read on the socket until a keepalive
-				 * we know that the server has nothing to send us; and if
-				 * walEnd has passed endpos, we know nothing else can have
-				 * committed before endpos.  So we can bail out now.
-				 */
-				endposReached = true;
-			}
-
-			/* Send a reply, if necessary */
-			if (replyRequested || endposReached)
-			{
-				if (!flushAndSendFeedback(conn, &now))
-					goto error;
-				last_status = now;
-			}
-
-			if (endposReached)
-			{
-				prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
-				time_to_abort = true;
-				break;
-			}
-
-			continue;
-		}
-		else if (copybuf[0] != 'w')
-		{
-			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
-					progname, copybuf[0]);
-			goto error;
-		}
-
-		/*
-		 * Read the header of the XLogData message, enclosed in the CopyData
-		 * message. We only need the WAL location field (dataStart), the rest
-		 * of the header is ignored.
-		 */
-		hdr_len = 1;			/* msgtype 'w' */
-		hdr_len += 8;			/* dataStart */
-		hdr_len += 8;			/* walEnd */
-		hdr_len += 8;			/* sendTime */
-		if (r < hdr_len + 1)
+		if (!ProcessReceiveMsg(conn, copybuf[0], copybuf, r))
 		{
-			fprintf(stderr, _("%s: streaming header too small: %d\n"),
-					progname, r);
-			goto error;
-		}
-
-		/* Extract WAL location for this block */
-		cur_record_lsn = fe_recvint64(&copybuf[1]);
-
-		if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
-		{
-			/*
-			 * We've read past our endpoint, so prepare to go away being
-			 * cautious about what happens to our output data.
-			 */
-			if (!flushAndSendFeedback(conn, &now))
-				goto error;
-			prepareToTerminate(conn, endpos, false, cur_record_lsn);
-			time_to_abort = true;
-			break;
-		}
-
-		output_written_lsn = Max(cur_record_lsn, output_written_lsn);
-
-		bytes_left = r - hdr_len;
-		bytes_written = 0;
-
-		/* signal that a fsync is needed */
-		output_needs_fsync = true;
-
-		while (bytes_left)
-		{
-			int			ret;
-
-			ret = write(outfd,
-						copybuf + hdr_len + bytes_written,
-						bytes_left);
-
-			if (ret < 0)
-			{
-				fprintf(stderr,
-						_("%s: could not write %u bytes to log file \"%s\": %s\n"),
-						progname, bytes_left, outfile,
-						strerror(errno));
-				goto error;
-			}
-
-			/* Write was successful, advance our position */
-			bytes_written += ret;
-			bytes_left -= ret;
-		}
-
-		if (write(outfd, "\n", 1) != 1)
-		{
-			fprintf(stderr,
-					_("%s: could not write %u bytes to log file \"%s\": %s\n"),
-					progname, 1, outfile,
-					strerror(errno));
 			goto error;
 		}
 
@@ -656,6 +738,14 @@ error:
 static void
 sigint_handler(int signum)
 {
+	/*
+	 * For backward compatibility, a second SIGINT forces logical replication
+	 * to stop without waiting for CopyDone from the server.
+	 */
+	if (time_to_abort)
+	{
+		force_time_to_abort = true;
+	}
 	time_to_abort = true;
 }
 
-- 
2.6.4
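
The two-stage SIGINT behavior added to pg_recvlogical above can be reduced to a small, self-contained sketch. It is a toy, not pg_recvlogical code: only the `time_to_abort`/`force_time_to_abort` flags mirror the patch, while `install_sigint_handler()`, `AbortAction` and `current_abort_action()` are hypothetical names. The first signal asks the main loop to send CopyDone and wait for the server's CopyDone; a second signal requests an immediate exit. The handler is installed with `sigaction()` without `SA_RESETHAND`, so it stays armed for the second signal.

```c
#define _POSIX_C_SOURCE 200809L
#include <signal.h>
#include <string.h>

/* First SIGINT: send CopyDone and wait for the server's CopyDone. */
static volatile sig_atomic_t time_to_abort = 0;

/* Second SIGINT: stop immediately without waiting for the server. */
static volatile sig_atomic_t force_time_to_abort = 0;

static void
sigint_handler(int signum)
{
	(void) signum;

	/* A repeated SIGINT while we are already aborting forces the exit. */
	if (time_to_abort)
		force_time_to_abort = 1;
	time_to_abort = 1;
}

/* Install the handler; without SA_RESETHAND it stays armed for a second SIGINT. */
static int
install_sigint_handler(void)
{
	struct sigaction sa;

	memset(&sa, 0, sizeof(sa));
	sa.sa_handler = sigint_handler;
	sigemptyset(&sa.sa_mask);
	sa.sa_flags = 0;
	return sigaction(SIGINT, &sa, NULL);
}

/* Hypothetical: what the streaming loop should do on its next iteration. */
typedef enum
{
	KEEP_STREAMING,
	SEND_COPYDONE_AND_WAIT,
	EXIT_NOW
} AbortAction;

static AbortAction
current_abort_action(void)
{
	if (force_time_to_abort)
		return EXIT_NOW;
	if (time_to_abort)
		return SEND_COPYDONE_AND_WAIT;
	return KEEP_STREAMING;
}
```

The handler only writes `volatile sig_atomic_t` flags, which keeps it async-signal-safe; the main loop is expected to check `current_abort_action()` between reads.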

0001-Respect-client-initiated-CopyDone-in-walsender.patchapplication/octet-stream; name=0001-Respect-client-initiated-CopyDone-in-walsender.patchDownload
From 4d023cfc1fed0b5852b4da1aad6a32549b03ce26 Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Fri, 30 Nov 2018 18:23:49 -0500
Subject: [PATCH 1/5] Respect client initiated CopyDone in walsender

---
 src/backend/replication/walsender.c | 36 ++++++++++++++++++++++++++++++------
 1 file changed, 30 insertions(+), 6 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 46edb52..93f2648 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -770,6 +770,14 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	sendTimeLineValidUpto = state->currTLIValidUntil;
 	sendTimeLineNextTLI = state->nextTLI;
 
+	/*
+	* If the client sent CopyDone while we were waiting,
+	* bail out so we can wind up the decoding session.
+	*/
+	if (streamingDoneSending)
+		return -1;
+
+	 /* more than one block available */
 	/* make sure we have enough WAL available */
 	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
 
@@ -1341,8 +1349,12 @@ WalSndWaitForWal(XLogRecPtr loc)
 		 * It's important to do this check after the recomputation of
 		 * RecentFlushPtr, so we can send all remaining data before shutting
 		 * down.
-		 */
-		if (got_STOPPING)
+		 *
+		 * We'll also exit here if the client sent CopyDone because it wants
+		 * to return to command mode.
+		*/
+
+		if (got_STOPPING || streamingDoneReceiving)
 			break;
 
 		/*
@@ -2095,7 +2107,14 @@ WalSndCheckTimeOut(void)
 	}
 }
 
-/* Main loop of walsender process that streams the WAL over Copy messages. */
+/*
+ * Main loop of walsender process that streams the WAL over Copy messages.
+ *
+ * The send_data callback must enqueue complete CopyData messages to libpq
+ * using pq_putmessage_noblock or similar, since the walsender loop may send
+ * CopyDone then exit and return to command mode in response to a client
+ * CopyDone between calls to send_data.
+ */
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
@@ -2142,10 +2161,15 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		 * some more.  If there is some, we don't bother to call send_data
 		 * again until we've flushed it ... but we'd better assume we are not
 		 * caught up.
+		 *
+		 * If we're trying to finish sending and exit we shouldn't enqueue more
+		 * data to libpq. We need to finish writing out whatever we already
+		 * have in libpq's send buffer to maintain protocol sync so we still
+		 * need to loop until it's flushed.
 		 */
-		if (!pq_is_send_pending())
+		if (!pq_is_send_pending() && !streamingDoneSending)
 			send_data();
-		else
+		else if (!streamingDoneSending)
 			WalSndCaughtUp = false;
 
 		/* Try to flush pending output to the client */
@@ -3375,7 +3399,7 @@ WalSndKeepaliveIfNecessary(void)
 	if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
 		return;
 
-	if (waiting_for_ping_response)
+	if (waiting_for_ping_response || streamingDoneSending)
 		return;
 
 	/*
-- 
2.6.4
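
A toy model may help show what the `WalSndLoop` change in patch 0001 is aiming for. Once a client CopyDone has been answered, no further CopyData is enqueued, but whatever is already in the send buffer is still flushed, and the server's CopyDone reaches the client only after the complete messages queued ahead of it. Everything below (`ToySender`, `toy_loop_step()`, one-byte message tags, flushing one message per call) is a hypothetical simplification, not walsender or libpq code.

```c
#include <stdbool.h>
#include <string.h>

#define TOY_MAX_MSGS 16

/*
 * Toy stand-in for libpq's send buffer: a queue of one-byte message tags
 * ('d' = complete CopyData, 'c' = CopyDone) plus what has reached the client.
 */
typedef struct
{
	char		queue[TOY_MAX_MSGS];
	int			head;			/* next queued message to flush */
	int			tail;			/* next free queue slot */
	char		wire[TOY_MAX_MSGS];	/* messages the client has received */
	int			wire_len;
	bool		done_sending;	/* analogue of streamingDoneSending */
	bool		done_receiving;	/* analogue of streamingDoneReceiving */
} ToySender;

static void
toy_enqueue(ToySender *s, char tag)
{
	if (s->tail < TOY_MAX_MSGS)
		s->queue[s->tail++] = tag;
}

static bool
toy_send_pending(const ToySender *s)
{
	return s->head < s->tail;
}

/* Flush at most one message per call, as if the socket were only partly writable. */
static void
toy_flush_one(ToySender *s)
{
	if (toy_send_pending(s))
		s->wire[s->wire_len++] = s->queue[s->head++];
}

/* Client CopyDone: queue our own CopyDone behind the complete messages already queued. */
static void
toy_on_client_copydone(ToySender *s)
{
	s->done_receiving = true;
	if (!s->done_sending)
	{
		toy_enqueue(s, 'c');
		s->done_sending = true;
	}
}

/* One loop iteration; returns false once it is safe to leave COPY BOTH mode. */
static bool
toy_loop_step(ToySender *s)
{
	/* Enqueue new data only when nothing is pending and we have not sent CopyDone. */
	if (!toy_send_pending(s) && !s->done_sending)
		toy_enqueue(s, 'd');

	/* Keep flushing even after CopyDone so the protocol stays in sync. */
	toy_flush_one(s);

	return !(s->done_sending && !toy_send_pending(s));
}
```

With two CopyData messages still queued when the client's CopyDone arrives, the client receives both of them, then the server's CopyDone, and nothing after it.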

0002-Client-initiated-CopyDone-during-transaction-decodin.patchapplication/octet-stream; name=0002-Client-initiated-CopyDone-during-transaction-decodin.patchDownload
From 6d784e7e678268af1eadb7b50f96308e5bd3d15c Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Fri, 30 Nov 2018 18:36:58 -0500
Subject: [PATCH 2/5] Client-initiated CopyDone during transaction decoding in 
 walsender

---
 src/backend/replication/logical/logical.c       | 14 ++++++---
 src/backend/replication/logical/logicalfuncs.c  |  2 +-
 src/backend/replication/logical/reorderbuffer.c | 11 +++++--
 src/backend/replication/slotfuncs.c             |  4 +--
 src/backend/replication/walsender.c             | 42 ++++++++++++++++---------
 src/include/replication/logical.h               |  6 ++--
 src/include/replication/reorderbuffer.h         | 13 ++++++++
 7 files changed, 65 insertions(+), 27 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 9f99e4f..9ce1f6e 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -126,7 +126,8 @@ StartupDecodingContext(List *output_plugin_options,
 					   XLogPageReadCB read_page,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
 					   LogicalOutputPluginWriterWrite do_write,
-					   LogicalOutputPluginWriterUpdateProgress update_progress)
+					   LogicalOutputPluginWriterUpdateProgress update_progress,
+					   ContinueDecodingCB continue_decoding_cb)
 {
 	ReplicationSlot *slot;
 	MemoryContext context,
@@ -193,6 +194,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_truncate = truncate_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
+	ctx->reorder->continue_decoding_cb = continue_decoding_cb;
 
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
@@ -231,7 +233,8 @@ CreateInitDecodingContext(char *plugin,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
-						  LogicalOutputPluginWriterUpdateProgress update_progress)
+						  LogicalOutputPluginWriterUpdateProgress update_progress,
+						  ContinueDecodingCB continue_decoding_cb)
 {
 	TransactionId xmin_horizon = InvalidTransactionId;
 	ReplicationSlot *slot;
@@ -319,7 +322,7 @@ CreateInitDecodingContext(char *plugin,
 	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
 								 need_full_snapshot, false,
 								 read_page, prepare_write, do_write,
-								 update_progress);
+								 update_progress, continue_decoding_cb);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -366,7 +369,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write,
-					  LogicalOutputPluginWriterUpdateProgress update_progress)
+					  LogicalOutputPluginWriterUpdateProgress update_progress,
+					  ContinueDecodingCB continue_decoding_cb)
 {
 	LogicalDecodingContext *ctx;
 	ReplicationSlot *slot;
@@ -417,7 +421,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId, false,
 								 fast_forward, read_page, prepare_write,
-								 do_write, update_progress);
+								 do_write, update_progress, continue_decoding_cb);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 45aae71..3d8e61d 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -254,7 +254,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 									false,
 									logical_read_local_xlog_page,
 									LogicalOutputPrepareWrite,
-									LogicalOutputWrite, NULL);
+									LogicalOutputWrite, NULL, NULL);
 
 		MemoryContextSwitchTo(oldcontext);
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 23466ba..66b6e90 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1497,7 +1497,9 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		rb->begin(rb, txn);
 
 		iterstate = ReorderBufferIterTXNInit(rb, txn);
-		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
+		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL &&
+			   (rb->continue_decoding_cb == NULL ||
+				rb->continue_decoding_cb()))
 		{
 			Relation	relation = NULL;
 			Oid			reloid;
@@ -1774,8 +1776,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		ReorderBufferIterTXNFinish(rb, iterstate);
 		iterstate = NULL;
 
-		/* call commit callback */
-		rb->commit(rb, txn, commit_lsn);
+		if (rb->continue_decoding_cb == NULL || rb->continue_decoding_cb())
+		{
+			/* call commit callback */
+			rb->commit(rb, txn, commit_lsn);
+		}
 
 		/* this is just a sanity check against bad output plugin behaviour */
 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 8782bad..82e8f37 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -137,7 +137,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
 									false,	/* do not build snapshot */
 									logical_read_local_xlog_page, NULL, NULL,
-									NULL);
+									NULL, NULL);
 
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
@@ -370,7 +370,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 									NIL,
 									true,	/* fast_forward */
 									logical_read_local_xlog_page,
-									NULL, NULL, NULL);
+									NULL, NULL, NULL, NULL);
 
 		/*
 		 * Start reading at the slot's restart_lsn, which we know to point to
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 93f2648..f0f0390 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -258,6 +258,23 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
+/*
+ * Return true until either the client or server side has requested that we wind
+ * up COPY BOTH mode by sending a CopyDone.
+ *
+ * If we receive a CopyDone from the client we should avoid sending any further
+ * CopyData messages and return to command mode as promptly as possible.
+ *
+ * While in the middle of sending data to a client we notice a client-initiated
+ * CopyDone when WalSndWriteData() calls ProcessRepliesIfAny() and it
+ * sets streamingDoneSending.
+ */
+static bool
+IsStreamingActive(void)
+{
+	return !streamingDoneReceiving && !streamingDoneSending;
+}
+
 
 /* Initialize walsender process before entering the main command loop */
 void
@@ -940,7 +957,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
 										logical_read_xlog_page,
 										WalSndPrepareWrite, WalSndWriteData,
-										WalSndUpdateProgress);
+										WalSndUpdateProgress, IsStreamingActive);
 
 		/*
 		 * Signal that we don't need the timeout mechanism. We're just
@@ -1091,7 +1108,8 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 		CreateDecodingContext(cmd->startpoint, cmd->options, false,
 							  logical_read_xlog_page,
 							  WalSndPrepareWrite, WalSndWriteData,
-							  WalSndUpdateProgress);
+							  WalSndUpdateProgress,
+							  IsStreamingActive);
 
 
 	WalSndSetState(WALSNDSTATE_CATCHUP);
@@ -1194,17 +1212,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 
 	CHECK_FOR_INTERRUPTS();
 
-	/* Try to flush pending output to the client */
-	if (pq_flush_if_writable() != 0)
-		WalSndShutdown();
-
-	/* Try taking fast path unless we get too close to walsender timeout. */
-	if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
-										  wal_sender_timeout / 2) &&
-		!pq_is_send_pending())
-	{
-		return;
-	}
 
 	/* If we have pending write here, go to slow path */
 	for (;;)
@@ -1358,7 +1365,14 @@ WalSndWaitForWal(XLogRecPtr loc)
 			break;
 
 		/*
-		 * We only send regular messages to the client for full decoded
+		 * If we have received CopyDone from the client, sent CopyDone
+		 * ourselves, it's time to exit streaming.
+		 */
+		if (!IsStreamingActive()) {
+			break;
+		}
+
+		/* We only send regular messages to the client for full decoded
 		 * transactions, but a synchronous replication and walsender shutdown
 		 * possibly are waiting for a later location. So we send pings
 		 * containing the flush location every now and then.
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c25ac1f..4822bfc 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -100,7 +100,8 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 						  XLogPageReadCB read_page,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
-						  LogicalOutputPluginWriterUpdateProgress update_progress);
+						  LogicalOutputPluginWriterUpdateProgress update_progress,
+						  ContinueDecodingCB continue_decoding_cb);
 extern LogicalDecodingContext *CreateDecodingContext(
 					  XLogRecPtr start_lsn,
 					  List *output_plugin_options,
@@ -108,7 +109,8 @@ extern LogicalDecodingContext *CreateDecodingContext(
 					  XLogPageReadCB read_page,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write,
-					  LogicalOutputPluginWriterUpdateProgress update_progress);
+					  LogicalOutputPluginWriterUpdateProgress update_progress,
+					  ContinueDecodingCB continue_decoding_cb);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 7787edf..a3f7e8d 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -326,6 +326,14 @@ typedef void (*ReorderBufferMessageCB) (
 										const char *prefix, Size sz,
 										const char *message);
 
+
+/*
+ * Callback function that allows interrupting logical replication during decoding.
+ * It returns true if decoding can continue; if it returns false, logical
+ * decoding will stop as soon as possible.
+ */
+typedef bool (*ContinueDecodingCB) (void);
+
 struct ReorderBuffer
 {
 	/*
@@ -365,6 +373,11 @@ struct ReorderBuffer
 	ReorderBufferMessageCB message;
 
 	/*
+	 * Callback to check whether decoding should continue; returns false to stop.
+	 */
+	ContinueDecodingCB continue_decoding_cb;
+
+	/*
 	 * Pointer that will be passed untouched to the callbacks.
 	 */
 	void	   *private_data;
-- 
2.6.4
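
The contract of the new `ContinueDecodingCB` in patch 0002 can be illustrated with a minimal sketch, assuming two boolean flags that stand in for the walsender's `streamingDoneSending`/`streamingDoneReceiving`. A NULL callback (as passed by SQL-level callers such as `pg_logical_slot_get_changes_guts()`) means decoding is never interrupted; otherwise the callback is consulted before each change. `toy_is_streaming_active()` and `toy_replay_changes()` are hypothetical, and changes are plain counters rather than reorder-buffer entries.

```c
#include <stdbool.h>
#include <stddef.h>

/* Same shape as the ContinueDecodingCB typedef added by patch 0002. */
typedef bool (*ContinueDecodingCB) (void);

/* Hypothetical stand-ins for streamingDoneSending / streamingDoneReceiving. */
static bool toy_done_sending = false;
static bool toy_done_receiving = false;

/* Analogue of IsStreamingActive(): keep decoding until either side sent CopyDone. */
static bool
toy_is_streaming_active(void)
{
	return !toy_done_receiving && !toy_done_sending;
}

/* A NULL callback means decoding is never interrupted. */
static bool
toy_should_continue(ContinueDecodingCB cb)
{
	return cb == NULL || cb();
}

/*
 * Emit up to nchanges changes, consulting the callback before each one.
 * copydone_after simulates a client CopyDone being noticed after that many
 * changes have been written (0 = never).
 */
static int
toy_replay_changes(int nchanges, ContinueDecodingCB cb, int copydone_after)
{
	int			emitted = 0;

	for (int i = 0; i < nchanges && toy_should_continue(cb); i++)
	{
		emitted++;
		if (emitted == copydone_after)
			toy_done_sending = true;
	}
	return emitted;
}
```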

#9Craig Ringer
craig@2ndquadrant.com
In reply to: Dave Cramer (#8)
Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk

On Mon, 3 Dec 2018 at 19:38, Dave Cramer <davecramer@gmail.com> wrote:

Dmitry,

Please see attached rebased patches

I'm fine with patch 0001, though I find this comment a bit hard to follow:

+ * The send_data callback must enqueue complete CopyData messages to libpq
+ * using pq_putmessage_noblock or similar, since the walsender loop may send
+ * CopyDone then exit and return to command mode in response to a client
+ * CopyDone between calls to send_data.

I think it needs splitting up into a couple of sentences.

In patch 0002, stopping during a txn. It's important that once we skip any
action, we continue skipping. In patch 0002 I'd like it to be clearer that
we will *always* skip the rb->commit callback if rb->continue_decoding_cb()
returned false and aborted the while loop. I suggest storing the result of
(rb->continue_decoding_cb == NULL || rb->continue_decoding_cb()) in an
assignment within the while loop, and testing that result later.

e.g.

(continue_decoding = (rb->continue_decoding_cb == NULL ||
rb->continue_decoding_cb()))

and later

if (continue_decoding) {
rb->commit(rb, txn, commit_lsn);
}

I don't actually think it's necessary to re-test the continue_decoding
callback and skip commit here. If we've sent all of the txn
except the commit, we might as well send the commit, it's tiny and might
save some hassle later.

I think a comment on the skipped commit would be good too:

/*
* If we skipped any changes due to a client CopyDone we must not send a
commit
* otherwise the client would incorrectly think it received a complete
transaction.
*/
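
The suggestion above can be sketched by putting the two control flows side by side. This is a toy under stated assumptions: `toy_continue()` is a hypothetical callback that answers true a fixed number of times, and changes are counters rather than reorder-buffer entries. The patch-style version re-tests the callback before the commit, so a CopyDone noticed after the last change suppresses the commit; the stored-flag version skips the commit only when the loop itself was cut short.

```c
#include <stdbool.h>
#include <stddef.h>

typedef bool (*ContinueDecodingCB) (void);

typedef struct
{
	int			changes_sent;
	bool		commit_sent;
} ToyReplayResult;

/* Hypothetical callback: answers true answers_left times, then false. */
static int	answers_left = 0;

static bool
toy_continue(void)
{
	return answers_left-- > 0;
}

/* Patch 0002 as posted: the callback is tested again before the commit. */
static ToyReplayResult
replay_retest_at_commit(int nchanges, ContinueDecodingCB cb)
{
	ToyReplayResult r = {0, false};
	int			i = 0;

	/* Like "Next() != NULL && cb()": cb is not called once changes run out. */
	while (i < nchanges && (cb == NULL || cb()))
	{
		r.changes_sent++;
		i++;
	}
	if (cb == NULL || cb())
		r.commit_sent = true;
	return r;
}

/* Stored-flag variant: remember the loop's last answer and reuse it for the commit. */
static ToyReplayResult
replay_stored_flag(int nchanges, ContinueDecodingCB cb)
{
	ToyReplayResult r = {0, false};
	bool		continue_decoding = true;

	for (int i = 0; i < nchanges; i++)
	{
		continue_decoding = (cb == NULL || cb());
		if (!continue_decoding)
			break;
		r.changes_sent++;
	}

	/* Only a loop cut short by the callback suppresses the commit. */
	if (continue_decoding)
		r.commit_sent = true;
	return r;
}
```

With `answers_left = 3` and three changes, the patch-style flow sends all three changes but no commit, while the stored-flag flow sends the commit as well.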

I notice that the fast-path logic in WalSndWriteData is removed by this
patch. It isn't moved; there's no comparison of last_reply_timestamp
and wal_sender_timeout now. What's the rationale there? You want to ensure
that we reach ProcessRepliesIfAny() ? Can we cheaply test for a readable
client socket then still take the fast-path if it's not readable?
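
One cheap readability test is a zero-timeout `poll()` on the client socket, which returns immediately. The sketch below assumes a plain POSIX file descriptor; `fd_is_readable()` and `can_take_fast_path()` are hypothetical helpers. The walsender itself works through libpq's buffering and latches, so a message already pulled into an in-process receive buffer would not be visible to `poll()`.

```c
#define _POSIX_C_SOURCE 200809L
#include <poll.h>
#include <stdbool.h>
#include <unistd.h>				/* pipe(), write(), close() for the usage example */

/*
 * Non-blocking readability probe: poll() with a zero timeout returns at once.
 * POLLHUP/POLLERR count as "readable" so a closed or broken connection is
 * not mistaken for an idle one.
 */
static bool
fd_is_readable(int fd)
{
	struct pollfd pfd;

	pfd.fd = fd;
	pfd.events = POLLIN;
	pfd.revents = 0;

	int			rc = poll(&pfd, 1, 0);

	if (rc < 0)
		return true;			/* be conservative: let the slow path handle errors */
	if (rc == 0)
		return false;
	return (pfd.revents & (POLLIN | POLLHUP | POLLERR)) != 0;
}

/*
 * Hypothetical fast-path test in the spirit of the removed WalSndWriteData()
 * code: skip the slow path only if nothing is pending, we are not close to
 * wal_sender_timeout, and the client has not sent anything we might need to
 * process (such as CopyDone).
 */
static bool
can_take_fast_path(bool send_pending, bool near_timeout, int client_fd)
{
	return !send_pending && !near_timeout && !fd_is_readable(client_fd);
}
```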

--
Craig Ringer http://www.2ndQuadrant.com/
2ndQuadrant - PostgreSQL Solutions for the Enterprise

#10Dave Cramer
davecramer@gmail.com
In reply to: Craig Ringer (#9)
Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk

Dave Cramer

On Sun, 13 Jan 2019 at 23:19, Craig Ringer <craig@2ndquadrant.com> wrote:

On Mon, 3 Dec 2018 at 19:38, Dave Cramer <davecramer@gmail.com> wrote:

Dmitry,

Please see attached rebased patches

I'm fine with patch 0001, though I find this comment a bit hard to follow:

+ * The send_data callback must enqueue complete CopyData messages to libpq
+ * using pq_putmessage_noblock or similar, since the walsender loop may send
+ * CopyDone then exit and return to command mode in response to a client
+ * CopyDone between calls to send_data.

I think it needs splitting up into a couple of sentences.

Fair point, remember it was originally written by a non-English speaker

In patch 0002, stopping during a txn. It's important that once we skip any
action, we continue skipping. In patch 0002 I'd like it to be clearer that
we will *always* skip the rb->commit callback if rb->continue_decoding_cb()
returned false and aborted the while loop. I suggest storing the result of
(rb->continue_decoding_cb == NULL || rb->continue_decoding_cb()) in an
assignment within the while loop, and testing that result later.

e.g.

(continue_decoding = (rb->continue_decoding_cb == NULL ||
rb->continue_decoding_cb()))

and later

if (continue_decoding) {
rb->commit(rb, txn, commit_lsn);
}

Will do

I don't actually think it's necessary to re-test the continue_decoding
callback and skip commit here. If we've sent all of the txn
except the commit, we might as well send the commit, it's tiny and might
save some hassle later.

I think a comment on the skipped commit would be good too:

/*
 * If we skipped any changes due to a client CopyDone we must not send a
 * commit otherwise the client would incorrectly think it received a
 * complete transaction.
 */

I notice that the fast-path logic in WalSndWriteData is removed by this
patch. It isn't moved; there's no comparison of last_reply_timestamp
and wal_sender_timeout now. What's the rationale there? You want to ensure
that we reach ProcessRepliesIfAny() ? Can we cheaply test for a readable
client socket then still take the fast-path if it's not readable?

This may have been a mistake as I am taking this over from a very old patch
that I did not originally write. I'll have a look at this

I


--
Craig Ringer http://www.2ndQuadrant.com/
2ndQuadrant - PostgreSQL Solutions for the Enterprise

#11Dave Cramer
davecramer@gmail.com
In reply to: Dave Cramer (#10)
Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk

Dave Cramer

On Tue, 15 Jan 2019 at 07:53, Dave Cramer <davecramer@gmail.com> wrote:

Dave Cramer

On Sun, 13 Jan 2019 at 23:19, Craig Ringer <craig@2ndquadrant.com> wrote:

On Mon, 3 Dec 2018 at 19:38, Dave Cramer <davecramer@gmail.com> wrote:

Dmitry,

Please see attached rebased patches

I'm fine with patch 0001, though I find this comment a bit hard to follow:

+ * The send_data callback must enqueue complete CopyData messages to libpq
+ * using pq_putmessage_noblock or similar, since the walsender loop may send
+ * CopyDone then exit and return to command mode in response to a client
+ * CopyDone between calls to send_data.

I think it needs splitting up into a couple of sentences.

Fair point, remember it was originally written by a non-English speaker

Thoughts on below ?

+/*
+ * Main loop of walsender process that streams the WAL over Copy messages.
+ *
+ * The send_data callback must enqueue complete CopyData messages to the client
+ * using pq_putmessage_noblock or similar.
+ * In order to preserve the protocol it is necessary to send all of the existing
+ * messages still in the buffer as the WalSender loop may send
+ * CopyDone then exit and return to command mode in response to a client
+ * CopyDone between calls to send_data.
+ */

In patch 0002, stopping during a txn. It's important that once we skip
any action, we continue skipping. In patch 0002 I'd like it to be clearer
that we will *always* skip the rb->commit callback
if rb->continue_decoding_cb() returned false and aborted the while loop. I
suggest storing the result of (rb->continue_decoding_cb == NULL ||
rb->continue_decoding_cb()) in an assignment within the while loop, and
testing that result later.

e.g.

(continue_decoding = (rb->continue_decoding_cb == NULL ||
rb->continue_decoding_cb()))

and later

if (continue_decoding) {
rb->commit(rb, txn, commit_lsn);
}

Will do

Hmmm... I don't actually see how this is any different from what we have in
the patch now. Where below would the test occur?

I don't actually think it's necessary to re-test the continue_decoding
callback and skip commit here. If we've sent all of the txn
except the commit, we might as well send the commit, it's tiny and might
save some hassle later.

I think a comment on the skipped commit would be good too:

/*
 * If we skipped any changes due to a client CopyDone we must not send a
 * commit otherwise the client would incorrectly think it received a
 * complete transaction.
 */

I notice that the fast-path logic in WalSndWriteData is removed by this
patch. It isn't moved; there's no comparison of last_reply_timestamp
and wal_sender_timeout now. What's the rationale there? You want to ensure
that we reach ProcessRepliesIfAny() ? Can we cheaply test for a readable
client socket then still take the fast-path if it's not readable?

This may have been a mistake as I am taking this over from a very old
patch that I did not originally write. I'll have a look at this

OK, I'm trying to decipher the original intent of this patch as well as I
didn't write it.

There are some hints here
/messages/by-id/CAFgjRd1LgVbtH=9O9_xvKQjvUP7aRF-edxqwKfaNs9hMFW_4gw@mail.gmail.com

As to why the fast path logic was removed: does it make sense to you?

Dave

#12Andres Freund
andres@anarazel.de
In reply to: Dave Cramer (#8)
Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk

Hi,

On 2018-12-03 06:38:43 -0500, Dave Cramer wrote:

From 4d023cfc1fed0b5852b4da1aad6a32549b03ce26 Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Fri, 30 Nov 2018 18:23:49 -0500
Subject: [PATCH 1/5] Respect client initiated CopyDone in walsender

---
src/backend/replication/walsender.c | 36 ++++++++++++++++++++++++++++++------
1 file changed, 30 insertions(+), 6 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 46edb52..93f2648 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -770,6 +770,14 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
sendTimeLineValidUpto = state->currTLIValidUntil;
sendTimeLineNextTLI = state->nextTLI;
+	/*
+	* If the client sent CopyDone while we were waiting,
+	* bail out so we can wind up the decoding session.
+	*/
+	if (streamingDoneSending)
+		return -1;
+
+	 /* more than one block available */
/* make sure we have enough WAL available */
flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
@@ -1341,8 +1349,12 @@ WalSndWaitForWal(XLogRecPtr loc)
* It's important to do this check after the recomputation of
* RecentFlushPtr, so we can send all remaining data before shutting
* down.
-		 */
-		if (got_STOPPING)
+		 *
+		 * We'll also exit here if the client sent CopyDone because it wants
+		 * to return to command mode.
+		*/
+
+		if (got_STOPPING || streamingDoneReceiving)
break;

/*
@@ -2095,7 +2107,14 @@ WalSndCheckTimeOut(void)
}
}

-/* Main loop of walsender process that streams the WAL over Copy messages. */
+/*
+ * Main loop of walsender process that streams the WAL over Copy messages.
+ *
+ * The send_data callback must enqueue complete CopyData messages to libpq
+ * using pq_putmessage_noblock or similar, since the walsender loop may send
+ * CopyDone then exit and return to command mode in response to a client
+ * CopyDone between calls to send_data.
+ */

Wait, how is it ok to send CopyDone before all the pending data has been
sent out?

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 23466ba..66b6e90 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1497,7 +1497,9 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
rb->begin(rb, txn);
iterstate = ReorderBufferIterTXNInit(rb, txn);
-		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
+		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL &&
+			   (rb->continue_decoding_cb == NULL ||
+				rb->continue_decoding_cb()))
{
Relation	relation = NULL;
Oid			reloid;

@@ -1774,8 +1776,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
ReorderBufferIterTXNFinish(rb, iterstate);
iterstate = NULL;

-		/* call commit callback */
-		rb->commit(rb, txn, commit_lsn);
+		if (rb->continue_decoding_cb == NULL || rb->continue_decoding_cb())
+		{
+			/* call commit callback */
+			rb->commit(rb, txn, commit_lsn);
+		}

I'm doubtful it's ok to simply stop in the middle of a transaction.

@@ -1194,17 +1212,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,

CHECK_FOR_INTERRUPTS();

- /* Try to flush pending output to the client */
- if (pq_flush_if_writable() != 0)
- WalSndShutdown();
-
- /* Try taking fast path unless we get too close to walsender timeout. */
- if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
- wal_sender_timeout / 2) &&
- !pq_is_send_pending())
- {
- return;
- }

As somebody else commented on the thread, I'm also doubtful this is
ok. This'll introduce significant additional blocking unless I'm missing
something?

/* If we have pending write here, go to slow path */
for (;;)
@@ -1358,7 +1365,14 @@ WalSndWaitForWal(XLogRecPtr loc)
break;

/*
-		 * We only send regular messages to the client for full decoded
+		 * If we have received CopyDone from the client, sent CopyDone
+		 * ourselves, it's time to exit streaming.
+		 */
+		if (!IsStreamingActive()) {
+			break;
+		}

Wrong formatting.

I wonder if the saner approach here isn't to support query cancellations
or something of that vein, and then handle the error.

Greetings,

Andres Freund

#13Dave Cramer
davecramer@gmail.com
In reply to: Andres Freund (#12)
Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk

Andres,

Thanks for looking at this. FYI, I did not originally write this; the
original author has not replied to requests.
JDBC could use this, I assume others could as well.

That said I'm certainly open to suggestions on how to do this.

Craig, do you have any other ideas?

Dave Cramer

On Fri, 15 Feb 2019 at 22:01, Andres Freund <andres@anarazel.de> wrote:


#14David Steele
david@pgmasters.net
In reply to: Dave Cramer (#13)
Re: Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk

On 2/16/19 10:38 PM, Dave Cramer wrote:
> Thanks for looking at this. FYI, I did not originally write this, rather
> the original author has not replied to requests.
> JDBC could use this, I assume others could as well.
>
> That said I'm certainly open to suggestions on how to do this.
>
> Craig, do you have any other ideas?

This patch appears to be stalled. Are there any ideas on how to proceed?

Regards,
--
-David
david@pgmasters.net

#15Thomas Munro
thomas.munro@gmail.com
In reply to: David Steele (#14)
Re: Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk

On Thu, Mar 7, 2019 at 8:19 PM David Steele <david@pgmasters.net> wrote:
> On 2/16/19 10:38 PM, Dave Cramer wrote:
>> Thanks for looking at this. FYI, I did not originally write this, rather
>> the original author has not replied to requests.
>> JDBC could use this, I assume others could as well.
>>
>> That said I'm certainly open to suggestions on how to do this.
>>
>> Craig, do you have any other ideas?
>
> This patch appears to be stalled. Are there any ideas on how to proceed?

Hi Dave (C),

It sounds like this should be withdrawn for now, and potentially
revived in some future CF if someone has time to work on it?

--
Thomas Munro
https://enterprisedb.com

#16Dave Cramer
davecramer@gmail.com
In reply to: Thomas Munro (#15)
Re: Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk

On Mon, 8 Jul 2019 at 06:40, Thomas Munro <thomas.munro@gmail.com> wrote:
> On Thu, Mar 7, 2019 at 8:19 PM David Steele <david@pgmasters.net> wrote:
>> On 2/16/19 10:38 PM, Dave Cramer wrote:
>>> Thanks for looking at this. FYI, I did not originally write this, rather
>>> the original author has not replied to requests.
>>> JDBC could use this, I assume others could as well.
>>>
>>> That said I'm certainly open to suggestions on how to do this.
>>>
>>> Craig, do you have any other ideas?
>>
>> This patch appears to be stalled. Are there any ideas on how to proceed?
>
> Hi Dave (C),
>
> It sounds like this should be withdrawn for now, and potentially
> revived in some future CF if someone has time to work on it?

Hi Thomas,

I'm fine with that decision.

Thanks,

Dave Cramer
