From dbe0e37d71bf32853f63ee5ca5961d0b2a7d827c Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Thu, 3 Nov 2016 16:18:36 +0800
Subject: [PATCH 15/21] Tests for logical decoding on standby

---
 .../recovery/t/010_logical_decoding_on_replica.pl  | 168 +++++++++++++++++++++
 1 file changed, 168 insertions(+)
 create mode 100644 src/test/recovery/t/010_logical_decoding_on_replica.pl

diff --git a/src/test/recovery/t/010_logical_decoding_on_replica.pl b/src/test/recovery/t/010_logical_decoding_on_replica.pl
new file mode 100644
index 0000000..0d869e4
--- /dev/null
+++ b/src/test/recovery/t/010_logical_decoding_on_replica.pl
@@ -0,0 +1,168 @@
+# Demonstrate that logical can follow timeline switches.
+#
+# Test logical decoding on a standby.
+#
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 28;
+use RecursiveCopy;
+use File::Copy;
+
+my ($stdout, $stderr, $ret);
+my $backup_name;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 4\n");
+$node_master->append_conf('postgresql.conf', "max_wal_senders = 4\n");
+$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n");
+$node_master->append_conf('postgresql.conf', "log_error_verbosity = verbose\n");
+$node_master->append_conf('postgresql.conf', "hot_standby_feedback = on\n");
+# send status rapidly so we promptly advance xmin on master
+$node_master->append_conf('postgresql.conf', "wal_receiver_status_interval = 1\n");
+$node_master->dump_info;
+$node_master->start;
+
+$node_master->safe_psql('postgres', q[SELECT * FROM pg_create_physical_replication_slot('decoding_standby');]);
+$backup_name = 'b1';
+my $backup_dir = $node_master->backup_dir . "/" . $backup_name;
+TestLib::system_or_bail('pg_basebackup', '-D', $backup_dir, '-d', $node_master->connstr('postgres'), '--xlog-method=stream', '--write-recovery-conf', '--slot=decoding_standby');
+
+open(my $fh, "<", $backup_dir . "/recovery.conf")
+  or die "can't open recovery.conf";
+
+my $found = 0;
+while (my $line = <$fh>)
+{
+	chomp($line);
+	if ($line eq "primary_slot_name = 'decoding_standby'")
+	{
+		$found = 1;
+		last;
+	}
+}
+ok($found, "using physical slot for standby");
+
+sub print_phys_xmin
+{
+    my ($xmin, $catalog_xmin) = split(qr/\|/, $node_master->safe_psql('postgres', q[SELECT xmin, catalog_xmin FROM pg_replication_slots WHERE slot_name = 'decoding_standby';]));
+	return ($xmin, $catalog_xmin);
+}
+
+my ($xmin, $catalog_xmin) = print_phys_xmin();
+# without the catalog_xmin hot standby feedback patch, catalog_xmin is always null
+# and xmin is the min(xmin, catalog_xmin) of all slots on the standby + anything else
+# holding down xmin.
+ok(!$xmin, "xmin null");
+ok(!$catalog_xmin, "catalog_xmin null");
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+	$node_master, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+
+$node_replica->start;
+
+$node_master->wait_for_catchup($node_replica);
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+ok($xmin, "xmin not null");
+ok(!$catalog_xmin, "catalog_xmin null");
+
+# Create new slots on the replica, ignoring the ones on the master completely.
+is($node_replica->psql('postgres', qq[SELECT * FROM pg_create_logical_replication_slot('standby_logical', 'test_decoding')]),
+   0, 'logical slot creation on standby succeeded');
+
+sub print_logical_xmin
+{
+    my ($xmin, $catalog_xmin) = split(qr/\|/, $node_replica->safe_psql('postgres', q[SELECT xmin, catalog_xmin FROM pg_replication_slots WHERE slot_name = 'standby_logical';]));
+	return ($xmin, $catalog_xmin);
+}
+
+$node_master->wait_for_catchup($node_replica);
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+isnt($catalog_xmin, '', "physical catalog_xmin not null");
+
+($xmin, $catalog_xmin) = print_logical_xmin();
+is($xmin, '', "logical xmin null");
+isnt($catalog_xmin, '', "logical catalog_xmin not null");
+
+$node_master->safe_psql('postgres', 'CREATE TABLE test_table(id serial primary key, blah text)');
+$node_master->safe_psql('postgres', q[INSERT INTO test_table(blah) values ('itworks')]);
+
+$node_master->wait_for_catchup($node_replica);
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+isnt($catalog_xmin, '', "physical catalog_xmin not null");
+
+$node_master->wait_for_catchup($node_replica);
+sleep(2); # ensure walreceiver feedback sent
+
+($ret, $stdout, $stderr) = $node_replica->psql('postgres', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($ret, 0, 'replay from slot succeeded');
+is($stdout, q{BEGIN
+table public.test_table: INSERT: id[integer]:1 blah[text]:'itworks'
+COMMIT}, 'replay results match');
+is($stderr, 'psql:<stdin>:1: WARNING:  logical decoding during recovery is experimental', 'stderr is warning');
+
+$node_master->wait_for_catchup($node_replica);
+sleep(2); # ensure walreceiver feedback sent
+
+my ($physical_xmin, $physical_catalog_xmin) = print_phys_xmin();
+isnt($physical_xmin, '', "physical xmin not null");
+isnt($physical_catalog_xmin, '', "physical catalog_xmin not null");
+
+my ($logical_xmin, $logical_catalog_xmin) = print_logical_xmin();
+is($logical_xmin, '', "logical xmin null");
+isnt($logical_catalog_xmin, '', "logical catalog_xmin not null");
+
+# Ok, do a pile of tx's and make sure xmin advances.
+# Ideally we'd just hold catalog_xmin, but since hs_feedback currently uses the slot,
+# we hold down xmin.
+for my $i (0 .. 1000)
+{
+    $node_master->safe_psql('postgres', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]);
+}
+$node_master->safe_psql('postgres', 'VACUUM');
+
+($ret, $stdout, $stderr) = $node_replica->psql('postgres', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($ret, 0, 'replay of big series succeeded');
+
+$node_master->wait_for_catchup($node_replica);
+sleep(2); # ensure walreceiver feedback sent
+
+my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+is($new_logical_xmin, '', "logical xmin null");
+isnt($new_logical_catalog_xmin, '', "logical catalog_xmin not null");
+isnt($new_logical_catalog_xmin, $logical_catalog_xmin, "logical catalog_xmin changed");
+
+$node_master->wait_for_catchup($node_replica);
+sleep(2); # ensure walreceiver feedback sent
+
+my ($new_physical_xmin, $new_physical_catalog_xmin) = print_phys_xmin();
+isnt($new_physical_xmin, '', "physical xmin not null");
+# hot standby feedback should advance phys xmin now the standby's slot doesn't
+# hold it down as far.
+isnt($new_physical_xmin, $physical_xmin, "physical xmin changed");
+isnt($new_physical_catalog_xmin, '', "physical catalog_xmin not null");
+
+$node_replica->psql('postgres', q[SELECT pg_drop_replication_slot('standby_logical')]);
+
+$node_master->wait_for_catchup($node_replica);
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+is($catalog_xmin, '', "physical catalog_xmin null");
-- 
2.5.5

