From 2204f65f216c93c6eb3ca9366fd687420b8b4fcf Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Tue, 15 Nov 2016 16:06:16 +0800
Subject: [PATCH 02/21] Add a pg_recvlogical wrapper to PostgresNode

---
 src/test/perl/PostgresNode.pm               | 75 ++++++++++++++++++++++++++++-
 src/test/recovery/t/006_logical_decoding.pl | 31 +++++++++++-
 2 files changed, 104 insertions(+), 2 deletions(-)

diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index c1b16ca..b2e4813 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -1125,7 +1125,7 @@ sub psql
 			# IPC::Run::run threw an exception. re-throw unless it's a
 			# timeout, which we'll handle by testing is_expired
 			die $exc_save
-			  if (blessed($exc_save) || $exc_save ne $timeout_exception);
+			  if (blessed($exc_save) || $exc_save !~ qr/$timeout_exception/);
 
 			$ret = undef;
 
@@ -1325,6 +1325,79 @@ sub run_log
 	TestLib::run_log(@_);
 }
 
+=pod $node->pg_recvlogical_upto(self, dbname, slot_name, endpos, timeout_secs, ...)
+
+Invoke pg_recvlogical to read from slot_name on dbname until LSN endpos, which
+corresponds to pg_recvlogical --endpos.  Gives up after timeout (if nonzero).
+
+Disallows pg_recvlogial from internally retrying on error by passing --no-loop.
+
+Plugin options are passed as additional keyword arguments.
+
+If called in scalar context, returns stdout, and die()s on timeout or nonzero return.
+
+If called in array context, returns a tuple of (retval, stdout, stderr, timeout).
+timeout is the IPC::Run::Timeout object whose is_expired method can be tested
+to check for timeout. retval is undef on timeout.
+
+=cut
+
+sub pg_recvlogical_upto
+{
+	my ($self, $dbname, $slot_name, $endpos, $timeout_secs, %plugin_options) = @_;
+	my ($stdout, $stderr);
+
+	my $timeout_exception = 'pg_recvlogical timed out';
+
+	my @cmd = ('pg_recvlogical', '-S', $slot_name, '--dbname', $self->connstr($dbname));
+	push @cmd, '--endpos', $endpos if ($endpos);
+	push @cmd, '-f', '-', '--no-loop', '--start';
+
+	while (my ($k, $v) = each %plugin_options)
+	{
+		die "= is not permitted to appear in replication option name" if ($k =~ qr/=/);
+		push @cmd, "-o", "$k=$v";
+	}
+
+	my $timeout;
+	$timeout = IPC::Run::timeout($timeout_secs, exception => $timeout_exception ) if $timeout_secs;
+	my $ret = 0;
+
+	do {
+		local $@;
+		eval {
+			IPC::Run::run(\@cmd, ">", \$stdout, "2>", \$stderr, $timeout);
+			$ret = $?;
+		};
+		my $exc_save = $@;
+		if ($exc_save)
+		{
+			# IPC::Run::run threw an exception. re-throw unless it's a
+			# timeout, which we'll handle by testing is_expired
+			die $exc_save
+			  if (blessed($exc_save) || $exc_save !~ qr/$timeout_exception/);
+
+			$ret = undef;
+
+			die "Got timeout exception '$exc_save' but timer not expired?!"
+			  unless $timeout->is_expired;
+
+			die "$exc_save waiting for endpos $endpos with stdout '$stdout', stderr '$stderr'"
+				unless wantarray;
+		}
+	};
+
+	if (wantarray)
+	{
+		return ($ret, $stdout, $stderr, $timeout);
+	}
+	else
+	{
+		die "pg_recvlogical exited with code '$ret', stdout '$stdout' and stderr '$stderr'" if $ret;
+		return $stdout;
+	}
+}
+
 =pod
 
 =back
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index b80a9a9..d8cc8d3 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -1,9 +1,13 @@
 # Testing of logical decoding using SQL interface and/or pg_recvlogical
+#
+# Most logical decoding tests are in contrib/test_decoding. This module
+# is for work that doesn't fit well there, like where server restarts
+# are required.
 use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 2;
+use Test::More tests => 5;
 
 # Initialize master node
 my $node_master = get_new_node('master');
@@ -36,5 +40,30 @@ $result = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_chan
 chomp($result);
 is($result, '', 'Decoding after fast restart repeats no rows');
 
+# Insert some rows and verify that we get the same results from pg_recvlogical
+# and the SQL interface.
+$node_master->safe_psql('postgres', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]);
+
+my $expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT};
+
+my $stdout_sql = $node_master->safe_psql('postgres', qq[SELECT data FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]);
+is($stdout_sql, $expected, 'got expected output from SQL decoding session');
+
+my $endpos = $node_master->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;");
+diag "waiting to replay $endpos";
+
+my $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, $expected, 'got same expected output from pg_recvlogical decoding session');
+
+$stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot');
+
 # done with the node
 $node_master->stop;
-- 
2.5.5

