From 73444c7e6979e6493043f011600a3c49bcea2aa6 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.taveira@enterprisedb.com>
Date: Thu, 19 Nov 2020 21:53:29 -0300
Subject: [PATCH 6/6] Overhaul tests

---
 src/test/subscription/t/020_messages.pl | 160 +++++++++++-------------
 1 file changed, 75 insertions(+), 85 deletions(-)

diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl
index 8e59e324e3..d9123ed3ef 100644
--- a/src/test/subscription/t/020_messages.pl
+++ b/src/test/subscription/t/020_messages.pl
@@ -1,158 +1,148 @@
-# Tests that logical decoding messages are emitted and that
-# they do not break subscribers
+# Tests for logical decoding messages
 use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 7;
+use Test::More tests => 5;
 
+# Create publisher node
 my $node_publisher = get_new_node('publisher');
 $node_publisher->init(allows_streaming => 'logical');
 $node_publisher->start;
 
+# Create subscriber node
 my $node_subscriber = get_new_node('subscriber');
 $node_subscriber->init(allows_streaming => 'logical');
 $node_subscriber->start;
 
+# Create the test table on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_test (a int primary key)");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_test (a int primary key)");
+
+# Setup logical replication
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
-#
-$node_publisher->safe_psql('postgres',
-	"CREATE TABLE tab (a int PRIMARY KEY)");
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_test");
+
 $node_subscriber->safe_psql('postgres',
-	"CREATE TABLE tab (a int PRIMARY KEY)");
-$node_publisher->safe_psql('postgres',
-	"CREATE PUBLICATION pub FOR TABLE tab");
-$node_subscriber->safe_psql('postgres',
-	"CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub"
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
 );
 
-# ensure a transactional logical decoding message shows up on the slot
-$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
+# Ensure a transactional logical decoding message shows up on the slot
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
 
 $node_publisher->safe_psql('postgres',
-	"select pg_logical_emit_message(true, 'a prefix', 'a transactional message')"
+	"SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message')"
 );
 
-my $slot_codes_with_message = $node_publisher->safe_psql(
+my $result = $node_publisher->safe_psql(
 	'postgres', qq(
-		select get_byte(data, 0)
-		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
+		SELECT get_byte(data, 0)
+		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
 			'proto_version', '1',
-			'publication_names', 'pub',
+			'publication_names', 'tap_pub',
 			'messages', 'true')
 ));
 
 # 66 77 67 == B M C == BEGIN MESSAGE COMMIT
-is($slot_codes_with_message, "66\n77\n67",
+is($result, qq(66
+77
+67),
 	'messages on slot are B M C with message option');
 
-my $transactional_message_flags = $node_publisher->safe_psql(
+$result = $node_publisher->safe_psql(
 	'postgres', qq(
-		select get_byte(data, 1)
-		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
+		SELECT get_byte(data, 1), encode(substr(data, 11, 8), 'escape')
+		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
 			'proto_version', '1',
-			'publication_names', 'pub',
+			'publication_names', 'tap_pub',
 			'messages', 'true')
-		offset 1 limit 1
+		OFFSET 1 LIMIT 1
 ));
 
-is($transactional_message_flags, "1",
-	"transactional message flags are set to 1");
+is($result, qq(1|pgoutput),
+	"flag transactional is set to 1 and prefix is pgoutput");
 
-my $slot_codes_without_message = $node_publisher->safe_psql(
+$result = $node_publisher->safe_psql(
 	'postgres', qq(
-		select get_byte(data, 0)
-		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
+		SELECT get_byte(data, 0)
+		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
 			'proto_version', '1',
-			'publication_names', 'pub')
+			'publication_names', 'tap_pub')
 ));
 
 # 66 67 == B C == BEGIN COMMIT
-is($slot_codes_without_message, "66\n67",
-	'messages on slot are B C without message option');
+is($result, qq(66
+67),
+	'option messages defaults to false so message (M) is not available on slot');
 
-$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
-$node_publisher->wait_for_catchup('sub');
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE");
+$node_publisher->wait_for_catchup('tap_sub');
 
 # ensure a non-transactional logical decoding message shows up on the slot
-$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
 
-$node_publisher->safe_psql('postgres', "INSERT INTO tab VALUES (3)");
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (1)");
 
 my $message_lsn = $node_publisher->safe_psql('postgres',
-	"select pg_logical_emit_message(false, 'prefix', 'nontransactional')");
+	"SELECT pg_logical_emit_message(false, 'pgoutput', 'a non-transactional message')");
 
-$node_publisher->safe_psql('postgres', "INSERT INTO tab VALUES (4)");
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (2)");
 
-my $slot_message_code = $node_publisher->safe_psql(
+$result = $node_publisher->safe_psql(
 	'postgres', qq(
-		select get_byte(data, 0)
-		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
+		SELECT get_byte(data, 0), get_byte(data, 1)
+		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
 			'proto_version', '1',
-			'publication_names', 'pub',
+			'publication_names', 'tap_pub',
 			'messages', 'true')
-		where lsn = '$message_lsn' and xid = 0
+		WHERE lsn = '$message_lsn' AND xid = 0
 ));
 
-is($slot_message_code, "77", "non-transactional message on slot is M");
+is($result, qq(77|0), 'non-transactional message on slot is M');
 
-my $nontransactional_message_flags = $node_publisher->safe_psql(
-	'postgres', qq(
-		select get_byte(data, 1)
-		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
-			'proto_version', '1',
-			'publication_names', 'pub',
-			'messages', 'true')
-		offset 1 limit 1
-));
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE");
+$node_publisher->wait_for_catchup('tap_sub');
 
-is($nontransactional_message_flags,
-	"0", "non-transactional message flags are set to 0");
-
-$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
-$node_publisher->wait_for_catchup('sub');
-
-$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
-$node_subscriber->safe_psql('postgres', "checkpoint;");
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
 
 # wait for the replication connection to drop from the publisher
 $node_publisher->poll_query_until('postgres',
-	'SELECT count(*) from pg_catalog.pg_stat_replication', 0);
+	'SELECT COUNT(*) FROM pg_catalog.pg_stat_replication', 0);
 
-# ensure a non-transactional logical decoding message shows up on the slot
-# when it is emitted within an aborted transaction. the message won't emit
-# until something advances the LSN, which we intentionally do here with a
-# checkpoint.
+# Ensure a non-transactional logical decoding message shows up on the slot when
+# it is emitted within an aborted transaction. The message won't emit until
+# something advances the LSN, hence we intentionally force the server to
+# switch to a new WAL file.
 $node_publisher->safe_psql(
 	'postgres', qq(
 		BEGIN;
-		SELECT pg_logical_emit_message(false, 'prefix',
-			'nontransactional aborted 1');
-		INSERT INTO tab VALUES (5);
-		SELECT pg_logical_emit_message(false, 'prefix',
-			'nontransactional aborted 2');
+		SELECT pg_logical_emit_message(false, 'pgoutput',
+			'a non-transactional message is available even if the transaction is aborted 1');
+		INSERT INTO tab_test VALUES (3);
+		SELECT pg_logical_emit_message(true, 'pgoutput',
+			'a transactional message is not available if the transaction is aborted');
+		SELECT pg_logical_emit_message(false, 'pgoutput',
+			'a non-transactional message is available even if the transaction is aborted 2');
 		ROLLBACK;
-		CHECKPOINT;
+		SELECT pg_switch_wal();
 ));
 
-my $aborted_txn_message_codes = $node_publisher->safe_psql(
+$result = $node_publisher->safe_psql(
 	'postgres', qq(
-		select get_byte(data, 0)
-		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
+		SELECT get_byte(data, 0), get_byte(data, 1)
+		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
 			'proto_version', '1',
-			'publication_names', 'pub',
+			'publication_names', 'tap_pub',
 			'messages', 'true')
 ));
 
-is($aborted_txn_message_codes, "77\n77",
-	"non-transactional message on slot from aborted transaction is M");
-
-$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
-$node_publisher->wait_for_catchup('sub');
-
-my $result =
-	$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab");
-is($result, qq(2), 'rows move');
+is($result, qq(77|0
+77|0),
+	'non-transactional message on slot from aborted transaction is M');
 
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
-- 
2.20.1

