From 18a619617c2cc19d0e304e33fb11e72d23731061 Mon Sep 17 00:00:00 2001
From: Dave Pirotte <dpirotte@gmail.com>
Date: Thu, 5 Nov 2020 03:14:54 +0000
Subject: [PATCH 1/6] Add logical decoding messages to pgoutput

This patch adds a "messages" option to the pgoutput output plugin. When
"messages" is true, logical decoding messages (i.e. generated via
pg_logical_emit_message) are sent to the slot consumer.
---
 doc/src/sgml/protocol.sgml                  |  65 ++++++++
 src/backend/replication/logical/proto.c     |  24 +++
 src/backend/replication/logical/worker.c    |   3 +
 src/backend/replication/pgoutput/pgoutput.c |  40 ++++-
 src/include/replication/logicalproto.h      |   3 +
 src/include/replication/pgoutput.h          |   1 +
 src/test/subscription/t/020_messages.pl     | 158 ++++++++++++++++++++
 7 files changed, 293 insertions(+), 1 deletion(-)
 create mode 100644 src/test/subscription/t/020_messages.pl

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index cee28889e1..02449bf792 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -6413,6 +6413,81 @@ Begin
 </listitem>
 </varlistentry>
 
+<varlistentry>
+<term>
+Message
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+        Byte1('M')
+</term>
+<listitem>
+<para>
+                Identifies the message as a logical decoding message.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int8
+</term>
+<listitem>
+<para>
+                Flags; either 0 for no flags or 1 if the logical decoding
+                message is transactional.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int64
+</term>
+<listitem>
+<para>
+                The LSN of the logical decoding message.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        String
+</term>
+<listitem>
+<para>
+                The prefix of the logical decoding message.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int32
+</term>
+<listitem>
+<para>
+                Length of the content.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Byte<replaceable>n</replaceable>
+</term>
+<listitem>
+<para>
+                The content of the logical decoding message.
+</para>
+</listitem>
+</varlistentry>
+
+</variablelist>
+</para>
+</listitem>
+</varlistentry>
+
 <varlistentry>
 <term>
 Commit
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index fdb31182d7..deba2a321c 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -25,6 +25,7 @@
  */
 #define LOGICALREP_IS_REPLICA_IDENTITY 1
 
+#define MESSAGE_TRANSACTIONAL	(1<<0)	/* flag bit of the MESSAGE message */
 #define TRUNCATE_CASCADE		(1<<0)
 #define TRUNCATE_RESTART_SEQS	(1<<1)
 
@@ -361,6 +362,29 @@ logicalrep_read_truncate(StringInfo in,
 	return relids;
 }
 
+/*
+ * Write a logical decoding MESSAGE to the output stream.
+ *
+ * Layout: Byte1('M'), Int8 flags, Int64 lsn, String prefix, Int32 length of
+ * the content, then the content itself.  txn is not used here.
+ */
+void
+logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn,
+						 bool transactional, const char *prefix, Size sz,
+						 const char *message)
+{
+	uint8		flags = transactional ? MESSAGE_TRANSACTIONAL : 0;
+
+	pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
+	pq_sendint8(out, flags);
+	pq_sendint64(out, lsn);
+	pq_sendstring(out, prefix);
+
+	/* the content is sent as sz raw bytes, preceded by its length */
+	pq_sendint32(out, sz);
+	pq_sendbytes(out, message, sz);
+}
+
 /*
  * Write relation description to the output stream.
  */
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 04684912de..6890603622 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1936,6 +1936,9 @@ apply_dispatch(StringInfo s)
 			apply_handle_origin(s);
 			return;
 
+		case LOGICAL_REP_MSG_MESSAGE:
+			return;	/* logical decoding messages are not applied; skip */
+
 		case LOGICAL_REP_MSG_STREAM_START:
 			apply_handle_stream_start(s);
 			return;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9c997aed83..cd849c10a4 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -45,6 +45,10 @@ static void pgoutput_change(LogicalDecodingContext *ctx,
 static void pgoutput_truncate(LogicalDecodingContext *ctx,
 							  ReorderBufferTXN *txn, int nrelations, Relation relations[],
 							  ReorderBufferChange *change);
+static void pgoutput_message(LogicalDecodingContext *ctx,
+							 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+							 bool transactional, const char *prefix,
+							 Size sz, const char *message);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
 static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
@@ -142,6 +146,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->begin_cb = pgoutput_begin_txn;
 	cb->change_cb = pgoutput_change;
 	cb->truncate_cb = pgoutput_truncate;
+	cb->message_cb = pgoutput_message;
 	cb->commit_cb = pgoutput_commit_txn;
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
 	cb->shutdown_cb = pgoutput_shutdown;
@@ -158,15 +163,17 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 static void
 parse_output_parameters(List *options, uint32 *protocol_version,
 						List **publication_names, bool *binary,
-						bool *enable_streaming)
+						bool *messages, bool *enable_streaming)
 {
 	ListCell   *lc;
 	bool		protocol_version_given = false;
 	bool		publication_names_given = false;
 	bool		binary_option_given = false;
+	bool		messages_option_given = false;
 	bool		streaming_given = false;
 
 	*binary = false;
+	*messages = false;
 
 	foreach(lc, options)
 	{
@@ -222,6 +229,16 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 
 			*binary = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "messages") == 0)
+		{
+			if (messages_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			messages_option_given = true;
+
+			*messages = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "streaming") == 0)
 		{
 			if (streaming_given)
@@ -269,6 +286,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 								&data->protocol_version,
 								&data->publication_names,
 								&data->binary,
+								&data->messages,
 								&enable_streaming);
 
 		/* Check if we support requested protocol */
@@ -683,6 +701,26 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	MemoryContextReset(data->context);
 }
 
+/*
+ * Forward a logical decoding message to the client, but only when the
+ * "messages" output plugin option was enabled at startup.
+ */
+static void
+pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+				 XLogRecPtr message_lsn, bool transactional,
+				 const char *prefix, Size sz, const char *message)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+	if (!data->messages)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_message(ctx->out, txn, message_lsn, transactional,
+							 prefix, sz, message);
+	OutputPluginWrite(ctx, true);
+}
+
 /*
  * Currently we always forward.
  */
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 1f2535df80..f3c8f95e2c 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -54,6 +54,7 @@ typedef enum LogicalRepMsgType
 	LOGICAL_REP_MSG_TRUNCATE = 'T',
 	LOGICAL_REP_MSG_RELATION = 'R',
 	LOGICAL_REP_MSG_TYPE = 'Y',
+	LOGICAL_REP_MSG_MESSAGE = 'M',
 	LOGICAL_REP_MSG_STREAM_START = 'S',
 	LOGICAL_REP_MSG_STREAM_END = 'E',
 	LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
@@ -151,6 +152,8 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
 									  bool cascade, bool restart_seqs);
 extern List *logicalrep_read_truncate(StringInfo in,
 									  bool *cascade, bool *restart_seqs);
+extern void logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn,
+									 bool transactional, const char *prefix, Size sz, const char *message);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
 								 Relation rel);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index a8c676ed23..3b7273bd89 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -25,6 +25,7 @@ typedef struct PGOutputData
 	List	   *publication_names;
 	List	   *publications;
 	bool		binary;
+	bool		messages;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl
new file mode 100644
index 0000000000..8e59e324e3
--- /dev/null
+++ b/src/test/subscription/t/020_messages.pl
@@ -0,0 +1,172 @@
+# Tests that logical decoding messages are emitted and that
+# they do not break subscribers
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 7;
+
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab (a int PRIMARY KEY)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab (a int PRIMARY KEY)");
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub FOR TABLE tab");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub"
+);
+
+# Disable the subscription and wait until the publisher no longer reports
+# the slot as active.  The SQL-level slot functions used below cannot
+# acquire a slot that a walsender is still holding, so peeking right after
+# ALTER SUBSCRIPTION ... DISABLE would otherwise be racy.
+sub disable_subscription
+{
+	$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
+	$node_publisher->poll_query_until('postgres',
+		"SELECT count(*) FROM pg_catalog.pg_replication_slots "
+		  . "WHERE slot_name = 'sub' AND active = 'f'",
+		1)
+	  or die "Timed out while waiting for slot sub to become inactive";
+	return;
+}
+
+# ensure a transactional logical decoding message shows up on the slot
+disable_subscription();
+
+$node_publisher->safe_psql('postgres',
+	"select pg_logical_emit_message(true, 'a prefix', 'a transactional message')"
+);
+
+my $slot_codes_with_message = $node_publisher->safe_psql(
+	'postgres', qq(
+		select get_byte(data, 0)
+		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
+			'proto_version', '1',
+			'publication_names', 'pub',
+			'messages', 'true')
+));
+
+# 66 77 67 == B M C == BEGIN MESSAGE COMMIT
+is($slot_codes_with_message, "66\n77\n67",
+	'messages on slot are B M C with message option');
+
+my $transactional_message_flags = $node_publisher->safe_psql(
+	'postgres', qq(
+		select get_byte(data, 1)
+		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
+			'proto_version', '1',
+			'publication_names', 'pub',
+			'messages', 'true')
+		offset 1 limit 1
+));
+
+is($transactional_message_flags, "1",
+	"transactional message flags are set to 1");
+
+my $slot_codes_without_message = $node_publisher->safe_psql(
+	'postgres', qq(
+		select get_byte(data, 0)
+		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
+			'proto_version', '1',
+			'publication_names', 'pub')
+));
+
+# 66 67 == B C == BEGIN COMMIT
+is($slot_codes_without_message, "66\n67",
+	'messages on slot are B C without message option');
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+$node_publisher->wait_for_catchup('sub');
+
+# ensure a non-transactional logical decoding message shows up on the slot
+disable_subscription();
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab VALUES (3)");
+
+my $message_lsn = $node_publisher->safe_psql('postgres',
+	"select pg_logical_emit_message(false, 'prefix', 'nontransactional')");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab VALUES (4)");
+
+my $slot_message_code = $node_publisher->safe_psql(
+	'postgres', qq(
+		select get_byte(data, 0)
+		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
+			'proto_version', '1',
+			'publication_names', 'pub',
+			'messages', 'true')
+		where lsn = '$message_lsn' and xid = 0
+));
+
+is($slot_message_code, "77", "non-transactional message on slot is M");
+
+# Select the message by its LSN as above: here it is surrounded by the two
+# INSERT transactions, so a fixed row offset would read a byte of some other
+# protocol message rather than the MESSAGE flags.
+my $nontransactional_message_flags = $node_publisher->safe_psql(
+	'postgres', qq(
+		select get_byte(data, 1)
+		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
+			'proto_version', '1',
+			'publication_names', 'pub',
+			'messages', 'true')
+		where lsn = '$message_lsn' and xid = 0
+));
+
+is($nontransactional_message_flags,
+	"0", "non-transactional message flags are set to 0");
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+$node_publisher->wait_for_catchup('sub');
+
+disable_subscription();
+$node_subscriber->safe_psql('postgres', "checkpoint;");
+
+# ensure a non-transactional logical decoding message shows up on the slot
+# when it is emitted within an aborted transaction. the message won't emit
+# until something advances the LSN, which we intentionally do here with a
+# checkpoint.
+$node_publisher->safe_psql(
+	'postgres', qq(
+		BEGIN;
+		SELECT pg_logical_emit_message(false, 'prefix',
+			'nontransactional aborted 1');
+		INSERT INTO tab VALUES (5);
+		SELECT pg_logical_emit_message(false, 'prefix',
+			'nontransactional aborted 2');
+		ROLLBACK;
+		CHECKPOINT;
+));
+
+my $aborted_txn_message_codes = $node_publisher->safe_psql(
+	'postgres', qq(
+		select get_byte(data, 0)
+		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
+			'proto_version', '1',
+			'publication_names', 'pub',
+			'messages', 'true')
+));
+
+is($aborted_txn_message_codes, "77\n77",
+	"non-transactional message on slot from aborted transaction is M");
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+$node_publisher->wait_for_catchup('sub');
+
+my $result =
+	$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab");
+is($result, qq(2), 'rows move');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
-- 
2.20.1

