From 987fe6ff1bb1fb3be5b7e79d63b943d2f64a0a30 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Mon, 14 Nov 2016 12:27:17 +0800
Subject: [PATCH 04/21] PostgresNode methods to wait for node catchup

---
 src/test/perl/PostgresNode.pm         | 120 +++++++++++++++++++++++++++++++++-
 src/test/recovery/t/001_stream_rep.pl |  12 +---
 2 files changed, 120 insertions(+), 12 deletions(-)

diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index c1b16ca..28e9f0b 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -93,6 +93,7 @@ use RecursiveCopy;
 use Socket;
 use Test::More;
 use TestLib ();
+use pg_lsn qw(parse_lsn);
 use Scalar::Util qw(blessed);
 
 our @EXPORT = qw(
@@ -1121,7 +1122,6 @@ sub psql
 		my $exc_save = $@;
 		if ($exc_save)
 		{
-
 			# IPC::Run::run threw an exception. re-throw unless it's a
 			# timeout, which we'll handle by testing is_expired
 			die $exc_save
@@ -1173,7 +1173,7 @@ sub psql
 		  if $ret == 1;
 		die "connection error: '$$stderr'\nwhile running '@psql_params'"
 		  if $ret == 2;
-		die "error running SQL: '$$stderr'\nwhile running '@psql_params'"
+		die "error running SQL: '$$stderr'\nwhile running '@psql_params' with sql '$sql'"
 		  if $ret == 3;
 		die "psql returns $ret: '$$stderr'\nwhile running '@psql_params'";
 	}
@@ -1325,6 +1325,122 @@ sub run_log
 	TestLib::run_log(@_);
 }
 
+=item $node->lsn()
+
+Return this node's current xlog position: pg_last_xlog_replay_location() if
+the node is in recovery, otherwise pg_current_xlog_insert_location().
+
+=cut
+
+sub lsn
+{
+	my ($self) = @_;
+	return $self->safe_psql('postgres', 'select case when pg_is_in_recovery() then pg_last_xlog_replay_location() else pg_current_xlog_insert_location() end as lsn;');
+}
+
+=item $node->wait_for_catchup(standby_name, mode, target_lsn)
+
+Wait until the standby with application_name standby_name (usually from
+node->name) equals or passes target_lsn, as seen in this node's
+pg_stat_replication. standby_name may also be a PostgresNode object, in
+which case its name is used. If target_lsn is not passed, this node's xlog
+position ($node->lsn) at the time this function is called is used instead.
+
+By default the replay_location is waited for, but 'mode' may be specified to
+wait for any of sent|write|flush|replay.
+
+If there is no active replication connection from this peer, waits until
+poll_query_until timeout.
+
+Requires that the 'postgres' db exists and is accessible.
+
+This is not a test. It die()s on failure.
+Returns the LSN caught up to.
+
+=cut
+
+sub wait_for_catchup
+{
+	my ($self, $standby_name, $mode, $target_lsn) = @_;
+	$mode = 'replay' unless defined($mode);
+	# Fixed list so the error message is stable from run to run.
+	my @valid_modes = ('sent', 'write', 'flush', 'replay');
+	die "valid modes are " . join(', ', @valid_modes) unless grep { $_ eq $mode } @valid_modes;
+	# Accept a PostgresNode in place of its application_name.
+	if (blessed($standby_name) && $standby_name->isa("PostgresNode")) {
+		$standby_name = $standby_name->name;
+	}
+	$target_lsn = $self->lsn unless defined($target_lsn);
+	$self->poll_query_until('postgres', qq[SELECT '$target_lsn' <= ${mode}_location FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';])
+		or die "timed out waiting for catchup";
+	return $target_lsn;
+}
+
+=item $node->wait_for_slot_catchup(slot_name, mode, target_lsn)
+
+Wait for the named replication slot to equal or pass target_lsn or, if
+target_lsn is not given, this node's xlog position (see $node->lsn) at the
+time of the call. mode selects the slot position that is compared: 'restart'
+(restart_lsn, the default) or 'confirmed_flush' (confirmed_flush_lsn).
+
+Requires that the 'postgres' db exists and is accessible.
+
+This is not a test. It die()s on failure.
+
+If the slot is not active, will time out after poll_query_until's timeout.
+
+Note that for logical slots, restart_lsn is held down by the oldest in progress tx.
+
+Returns the LSN caught up to.
+
+=cut
+
+sub wait_for_slot_catchup
+{
+	my ($self, $slot_name, $mode, $target_lsn) = @_;
+	$mode = 'restart' unless defined($mode);
+	die "valid modes are restart, confirmed_flush"
+		unless $mode eq 'restart' || $mode eq 'confirmed_flush';
+	# Default target: this node's xlog position when the wait begins.
+	$target_lsn = $self->lsn unless defined($target_lsn);
+	# ${mode}_lsn names the pg_replication_slots column to compare against.
+	my $caught_up_query = qq[SELECT '$target_lsn' <= ${mode}_lsn FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name';];
+	$self->poll_query_until('postgres', $caught_up_query)
+		or die "timed out waiting for catchup";
+	return $target_lsn;
+}
+
+=item $node->slot(slot_name)
+
+Return hash-ref of replication slot data for the named slot, or a hash-ref with
+all values '' if not found. Does not differentiate between null and empty string
+for fields, no column field is ever undef.
+
+The restart_lsn and confirmed_flush_lsn fields are returned verbatim, and also,
+as restart_lsn_arr and confirmed_flush_lsn_arr, in the form produced by
+parse_lsn: a 2-list of [highword, lowword] integers. Since we rely on Perl 5.8.8
+we can't assume "use bigint" or Math::BigInt is available.
+
+=cut
+
+sub slot
+{
+	my ($self, $slot_name) = @_;
+	# confirmed_flush_lsn must be selected too: it is parsed below.
+	my @fields = ('plugin', 'slot_type', 'datoid', 'database', 'active', 'active_pid', 'xmin', 'catalog_xmin', 'restart_lsn', 'confirmed_flush_lsn');
+	my $result = $self->safe_psql('postgres', 'SELECT ' . join(', ', @fields) . " FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'");
+	# hash slice, see http://stackoverflow.com/a/16755894/398670 .
+	#
+	# A limit of -1 makes split keep trailing empty (NULL) columns, so
+	# every field is assigned a string; when no row came back, fill the
+	# hash with empty strings via x-operator element duplication.
+	my %val;
+	@val{@fields} = $result ne '' ? split(qr/\|/, $result, -1) : ('') x scalar(@fields);
+	$val{'restart_lsn_arr'} = parse_lsn($val{'restart_lsn'});
+	$val{'confirmed_flush_lsn_arr'} = parse_lsn($val{'confirmed_flush_lsn'});
+	return \%val;
+}
+
 =pod
 
 =back
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index 981c00b..5ce69bb 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -40,16 +40,8 @@ $node_master->safe_psql('postgres',
 	"CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
 
 # Wait for standbys to catch up
-my $applname_1 = $node_standby_1->name;
-my $applname_2 = $node_standby_2->name;
-my $caughtup_query =
-"SELECT pg_current_xlog_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$applname_1';";
-$node_master->poll_query_until('postgres', $caughtup_query)
-  or die "Timed out while waiting for standby 1 to catch up";
-$caughtup_query =
-"SELECT pg_last_xlog_replay_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$applname_2';";
-$node_standby_1->poll_query_until('postgres', $caughtup_query)
-  or die "Timed out while waiting for standby 2 to catch up";
+$node_master->wait_for_catchup($node_standby_1);
+$node_standby_1->wait_for_catchup($node_standby_2);
 
 my $result =
   $node_standby_1->safe_psql('postgres', "SELECT count(*) FROM tab_int");
-- 
2.5.5

