Any objections to implementing LogicalDecodeMessageCB for pgoutput?

Started by Dave Cramer over 5 years ago; 18 messages
#1 Dave Cramer
davecramer@gmail.com

For logical replication there is no need to implement this, but others are
using the pgoutput plugin for Change Data Capture. They use pgoutput because
it is guaranteed to be available, as it is in core postgres.

Implementing LogicalDecodeMessageCB would provide a synchronization facility
that is not easily replicated otherwise.

Thoughts?

Dave Cramer

#2 Andres Freund
andres@anarazel.de
In reply to: Dave Cramer (#1)
Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

Hi,

On 2020-07-24 11:33:52 -0400, Dave Cramer wrote:

> For logical replication there is no need to implement this, but others are
> using the pgoutput plugin for Change Data Capture. They use pgoutput because
> it is guaranteed to be available, as it is in core postgres.
>
> Implementing LogicalDecodeMessageCB would provide a synchronization facility
> that is not easily replicated otherwise.

It's definitely useful. Probably needs to be a parameter that signals
whether they should be sent out?

Greetings,

Andres Freund

#3 David Pirotte
dpirotte@gmail.com
In reply to: Dave Cramer (#1)
1 attachment(s)
Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

On Wed, Jul 29, 2020 at 9:41 PM Dave Cramer <davecramer@gmail.com> wrote:

> For logical replication there is no need to implement this, but others are
> using the pgoutput plugin for Change Data Capture. They use pgoutput because
> it is guaranteed to be available, as it is in core postgres.
>
> Implementing LogicalDecodeMessageCB would provide a synchronization facility
> that is not easily replicated otherwise.
>
> Thoughts?

Attached is a draft patch that adds this functionality into the pgoutput
plugin. A slot consumer can pass 'messages' as an option to include
logical messages from pg_logical_emit_message in the replication flow.
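A slot consumer could decode one such record along the lines of the sketch below. It assumes the byte layout that logicalrep_write_message writes in the patch attached later in this thread: Byte1('M'), a flags byte, a big-endian Int64 LSN, a NUL-terminated prefix, a big-endian Int32 length, and then the content bytes. Note that the protocol.sgml hunk in that same patch does not list the Int32 length. The struct and function names are hypothetical.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Flag bit, as defined in the patch's proto.c. */
#define MESSAGE_TRANSACTIONAL (1 << 0)

/* Hypothetical consumer-side view of one decoded 'M' record. */
typedef struct logical_msg
{
    uint8_t     flags;
    uint64_t    lsn;
    const char *prefix;   /* NUL-terminated, points into the input buffer */
    uint32_t    size;
    const char *content;  /* NOT NUL-terminated, points into the input buffer */
} logical_msg;

/* pq_sendint32/pq_sendint64 write integers in network (big-endian) order. */
static uint32_t read_be32(const unsigned char *p)
{
    return ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
           ((uint32_t) p[2] << 8) | (uint32_t) p[3];
}

static uint64_t read_be64(const unsigned char *p)
{
    return ((uint64_t) read_be32(p) << 32) | read_be32(p + 4);
}

/* Returns 0 on success, -1 if buf is not a complete 'M' record. */
static int decode_logical_msg(const unsigned char *buf, size_t len, logical_msg *out)
{
    size_t      pos;
    const unsigned char *nul;

    /* type byte + flags byte + Int64 LSN */
    if (len < 10 || buf[0] != 'M')
        return -1;
    out->flags = buf[1];
    out->lsn = read_be64(buf + 2);
    pos = 10;

    /* prefix: NUL-terminated string, as written by pq_sendstring */
    nul = memchr(buf + pos, '\0', len - pos);
    if (nul == NULL)
        return -1;
    out->prefix = (const char *) (buf + pos);
    pos = (size_t) (nul - buf) + 1;

    /* Int32 content length, then the content itself */
    if (len - pos < 4)
        return -1;
    out->size = read_be32(buf + pos);
    pos += 4;
    if (len - pos < out->size)
        return -1;
    out->content = (const char *) (buf + pos);
    return 0;
}
```

The decoder returns pointers into the caller's buffer instead of copying, so the buffer must outlive the decoded struct.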

FWIW, we have been using pg_logical_emit_message to send application-level
events alongside our change-data-capture for about two years, and we would
move this part of our stack to pgoutput if message support was available.

Looking forward to discussion and feedback.

Cheers,
Dave

Attachments:

0001-Add-logical-decoding-messages-to-pgoutput.patch.gz (application/gzip)
#4 Cary Huang
cary.huang@highgo.ca
In reply to: David Pirotte (#3)
Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

The following review has been posted through the commitfest application:
make installcheck-world: tested, passed
Implements feature: tested, passed
Spec compliant: tested, passed
Documentation: tested, passed

Hi

I have tried the patch and it functions as described. The attached TAP test case is comprehensive and passes. However, the patch does not apply cleanly on the current master; I had to check out a much earlier commit to apply it. The patch will need to be rebased onto the current master.

Thanks

Cary Huang
-------------
HighGo Software Inc. (Canada)
cary.huang@highgo.ca
www.highgo.ca

#5 Andres Freund
andres@anarazel.de
In reply to: David Pirotte (#3)
Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

Hi,

On 2020-07-29 22:26:04 -0500, David Pirotte wrote:

> FWIW, we have been using pg_logical_emit_message to send application-level
> events alongside our change-data-capture for about two years, and we would
> move this part of our stack to pgoutput if message support was available.

Yea, it's really useful for this kind of thing.

@@ -119,14 +124,16 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)

static void
parse_output_parameters(List *options, uint32 *protocol_version,
-						List **publication_names, bool *binary)
+						List **publication_names, bool *binary, bool *messages)

I think it might be time to add a PgOutputParameters struct, instead of
adding more and more output parameters to
parse_output_parameters. Alternatively, just passing PGOutputData would
make sense.
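A parameter struct could look roughly like the following minimal sketch. It assumes a simplified stand-in for DefElem (a name plus a text value) and accepts only a few boolean spellings; PgOutputParameters, option, parse_bool_text, and parse_params are hypothetical names, not pgoutput's API. As in the patch, a repeated option is rejected.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a DefElem: an option name and its text value. */
typedef struct option
{
    const char *name;
    const char *value;
} option;

/* Hypothetical struct collecting the boolean pgoutput options in one place. */
typedef struct PgOutputParameters
{
    bool        binary;
    bool        messages;
    bool        streaming;
} PgOutputParameters;

/* Accepts a few spellings of true/false; returns false on anything else. */
static bool parse_bool_text(const char *s, bool *result)
{
    if (strcmp(s, "true") == 0 || strcmp(s, "on") == 0 || strcmp(s, "1") == 0)
        *result = true;
    else if (strcmp(s, "false") == 0 || strcmp(s, "off") == 0 || strcmp(s, "0") == 0)
        *result = false;
    else
        return false;
    return true;
}

/*
 * Fills *params from opts. Returns 0 on success, or -1 for an unknown option,
 * a repeated option ("conflicting or redundant options"), or a bad value.
 */
static int parse_params(const option *opts, size_t n, PgOutputParameters *params)
{
    bool        given[3] = {false, false, false};
    const char *names[3] = {"binary", "messages", "streaming"};
    bool       *targets[3] = {&params->binary, &params->messages, &params->streaming};

    /* every option defaults to false */
    *params = (PgOutputParameters) {false, false, false};

    for (size_t i = 0; i < n; i++)
    {
        size_t      k;

        for (k = 0; k < 3; k++)
            if (strcmp(opts[i].name, names[k]) == 0)
                break;
        if (k == 3 || given[k])
            return -1;
        given[k] = true;
        if (!parse_bool_text(opts[i].value, targets[k]))
            return -1;
    }
    return 0;
}
```

With this design, adding a new option means adding one struct field and one table entry. The function signature stays the same.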

diff --git a/src/test/subscription/t/015_messages.pl b/src/test/subscription/t/015_messages.pl
new file mode 100644
index 0000000000..4709e69f4e
--- /dev/null
+++ b/src/test/subscription/t/015_messages.pl

A test verifying that a non-transactional message in a later-aborted
transaction is handled correctly would be good.

Greetings,

Andres Freund

#6 Michael Paquier
michael@paquier.xyz
In reply to: Andres Freund (#5)
Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

On Tue, Sep 08, 2020 at 12:18:23PM -0700, Andres Freund wrote:

> A test verifying that a non-transactional message in a later-aborted
> transaction is handled correctly would be good.

On top of that, the patch needs a rebase as it visibly fails to apply,
per the CF bot.
--
Michael

#7 Dave Cramer
davecramer@gmail.com
In reply to: Michael Paquier (#6)
Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

David,

On Thu, 24 Sep 2020 at 00:22, Michael Paquier <michael@paquier.xyz> wrote:

> On Tue, Sep 08, 2020 at 12:18:23PM -0700, Andres Freund wrote:
>
> > A test verifying that a non-transactional message in a later-aborted
> > transaction is handled correctly would be good.
>
> On top of that, the patch needs a rebase as it visibly fails to apply,
> per the CF bot.
> --
> Michael

Where are you with this? Are you able to work on it?
Dave Cramer

#8 David Pirotte
dpirotte@gmail.com
In reply to: Dave Cramer (#7)
1 attachment(s)
Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

On Tue, Nov 3, 2020 at 7:19 AM Dave Cramer <davecramer@gmail.com> wrote:

> David,

> Where are you with this? Are you able to work on it?
> Dave Cramer

Apologies for the delay, here.

I've attached v2 of this patch which applies cleanly to master. The patch
also now includes a test demonstrating that pg_logical_emit_message
correctly sends non-transactional messages when called inside a transaction
that is rolled back. (Thank you, Andres, for this suggestion.) The only
other change is that I added this new message type into the
LogicalRepMsgType enum added earlier this week.

Let me know what you think.

Cheers,
Dave

Attachments:

v2-0001-Add-logical-decoding-messages-to-pgoutput.patch.gz (application/gzip)
#9 Ashutosh Bapat
ashutosh.bapat.oss@gmail.com
In reply to: David Pirotte (#8)
Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

On Thu, Nov 5, 2020 at 9:16 AM David Pirotte <dpirotte@gmail.com> wrote:

> Apologies for the delay, here.
>
> I've attached v2 of this patch which applies cleanly to master. The patch
> also now includes a test demonstrating that pg_logical_emit_message
> correctly sends non-transactional messages when called inside a transaction
> that is rolled back. (Thank you, Andres, for this suggestion.) The only
> other change is that I added this new message type into the
> LogicalRepMsgType enum added earlier this week.
>
> Let me know what you think.

This feature looks useful. Here are some comments.

+/*
+ * Write MESSAGE to stream
+ */
+void
+logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn,
+                        bool transactional, const char *prefix, Size sz,
+                        const char *message)
+{
+   uint8       flags = 0;
+
+   pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
+

Similar to the UPDATE/DELETE/INSERT records decoded when streaming is being
used, we need to add the transaction id for transactional messages. Maybe we
could add it even in the non-streaming case and use it to decide whether a
message is transactional or not. That might save us a byte when we are adding
a transaction id.

+   /* encode and send message flags */
+   if (transactional)
+       flags |= MESSAGE_TRANSACTIONAL;
+
+   pq_sendint8(out, flags);

Is 8 bits enough considering future improvements? What if we need to use
more than 8 flag bits?

@@ -1936,6 +1936,9 @@ apply_dispatch(StringInfo s)
apply_handle_origin(s);
return;

+ case LOGICAL_REP_MSG_MESSAGE:

Should we add the logical message to the WAL downstream so that it flows
further down to a cascaded logical replica? Should that be controlled
by an option?

--
Best Wishes,
Ashutosh Bapat

#10 David Pirotte
dpirotte@gmail.com
In reply to: Ashutosh Bapat (#9)
2 attachment(s)
Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

On Fri, Nov 6, 2020 at 7:05 AM Ashutosh Bapat <ashutosh.bapat.oss@gmail.com> wrote:

+/*
+ * Write MESSAGE to stream
+ */
+void
+logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn,
+                        bool transactional, const char *prefix, Size sz,
+                        const char *message)
+{
+   uint8       flags = 0;
+
+   pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
+

> Similar to the UPDATE/DELETE/INSERT records decoded when streaming is being
> used, we need to add the transaction id for transactional messages. Maybe we
> could add it even in the non-streaming case and use it to decide whether a
> message is transactional or not. That might save us a byte when we are adding
> a transaction id.

My preference is to add in the xid when streaming is enabled. (1) It is a
more consistent implementation with the other message types, and (2) it
saves 3 bytes when streaming is disabled. I've attached an updated patch.
It is not a strong preference, though, if you suggest a different approach.
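The following sketch checks the byte arithmetic behind "saves 3 bytes". It assumes that the xid is a 4-byte TransactionId written in network order, that the xid precedes the flags byte when it is present, and that Ashutosh's alternative replaces the flags byte with an xid that is always sent (an invalid xid meaning "non-transactional"). The type stand-ins and function names are hypothetical.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;              /* stand-in for PostgreSQL's typedef */
#define InvalidTransactionId ((TransactionId) 0)
#define MESSAGE_TRANSACTIONAL (1 << 0)

/* Write v in network byte order, like pq_sendint32; returns bytes written. */
static size_t put_be32(unsigned char *p, uint32_t v)
{
    p[0] = (unsigned char) (v >> 24);
    p[1] = (unsigned char) (v >> 16);
    p[2] = (unsigned char) (v >> 8);
    p[3] = (unsigned char) v;
    return 4;
}

/*
 * Layout David prefers: a flags byte is always sent; the xid is sent only
 * while streaming. Returns the header length, excluding the 'M' type byte.
 */
static size_t header_flags_plus_stream_xid(unsigned char *buf, bool streaming,
                                           TransactionId xid, bool transactional)
{
    size_t      n = 0;

    if (streaming)
        n += put_be32(buf + n, xid);
    buf[n++] = transactional ? MESSAGE_TRANSACTIONAL : 0;
    return n;
}

/*
 * Alternative Ashutosh floated: always send the xid and let an invalid xid
 * mean "non-transactional", so no flags byte is needed.
 */
static size_t header_xid_only(unsigned char *buf, TransactionId xid, bool transactional)
{
    return put_be32(buf, transactional ? xid : InvalidTransactionId);
}
```

Without streaming, the flags layout uses 1 byte and the xid-only layout uses 4, which is the 3-byte saving mentioned above. While streaming, the flags layout uses 5 bytes against 4, so Ashutosh's variant would save one byte in that case.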

+   /* encode and send message flags */
+   if (transactional)
+       flags |= MESSAGE_TRANSACTIONAL;
+
+   pq_sendint8(out, flags);

> Is 8 bits enough considering future improvements? What if we need to use
> more than 8 flag bits?

Eight possible flags already sounds like a lot here, so I suspect that a byte
will be sufficient for the foreseeable future. If we needed to go beyond
that, it'd be a protocol version bump. (Well, it might first warrant
reflection as to why we had so many flags...)
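The sketch below shows the consumer side of this argument. It assumes a decoder that rejects flag bits it does not recognize. MESSAGE_KNOWN_FLAGS and decode_message_flags are hypothetical names; only MESSAGE_TRANSACTIONAL comes from the patch. Under that assumption, a new flag bit could only be sent to consumers that negotiated a protocol version defining it, which is the version bump mentioned above.

```c
#include <stdbool.h>
#include <stdint.h>

#define MESSAGE_TRANSACTIONAL (1 << 0)   /* as defined in the patch */

/* Hypothetical mask of every flag bit this protocol version defines. */
#define MESSAGE_KNOWN_FLAGS   (MESSAGE_TRANSACTIONAL)

/*
 * Decodes the one-byte flags field. Returns false if it carries bits this
 * consumer does not understand; otherwise sets *transactional.
 */
static bool decode_message_flags(uint8_t flags, bool *transactional)
{
    if (flags & ~MESSAGE_KNOWN_FLAGS)
        return false;
    *transactional = (flags & MESSAGE_TRANSACTIONAL) != 0;
    return true;
}
```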

@@ -1936,6 +1936,9 @@ apply_dispatch(StringInfo s)
apply_handle_origin(s);
return;

+ case LOGICAL_REP_MSG_MESSAGE:

> Should we add the logical message to the WAL downstream so that it flows
> further down to a cascaded logical replica? Should that be controlled
> by an option?

Hmm, I can't think of a use case for this, but perhaps someone could. Do
you, or does anyone, have something in mind? I think we provide a lot of
value with logical messages in pgoutput without supporting consumption from
a downstream replica, so perhaps this is better considered separately.

If we want this, I think we would add a "messages" option on the
subscription. If present, the subscriber will receive messages and pass
them to any downstream subscribers. I started working on this and it does
expand the change's footprint. As is, a developer would consume messages by
connecting to a pgoutput slot on the message's origin (e.g. via Debezium
or a custom client). The subscription and logical worker infrastructure
don't know about messages, but they would need to in order to support
consuming an origin's messages on a downstream logical replica. In
any case, I'll keep working on it so we can see what it looks like.

Cheers,
Dave

Attachments:

v2-0002-Add-xid-to-messages-when-streaming.patch.gz (application/x-gzip)
v2-0001-Add-logical-decoding-messages-to-pgoutput.patch.gz (application/x-gzip)
#11 Euler Taveira
euler.taveira@2ndquadrant.com
In reply to: David Pirotte (#10)
6 attachment(s)
Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

On Wed, 18 Nov 2020 at 03:04, David Pirotte <dpirotte@gmail.com> wrote:

> On Fri, Nov 6, 2020 at 7:05 AM Ashutosh Bapat <ashutosh.bapat.oss@gmail.com> wrote:

+/*
+ * Write MESSAGE to stream
+ */
+void
+logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn,
+                        bool transactional, const char *prefix, Size sz,
+                        const char *message)
+{
+   uint8       flags = 0;
+
+   pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
+

> > Similar to the UPDATE/DELETE/INSERT records decoded when streaming is being
> > used, we need to add the transaction id for transactional messages. Maybe we
> > could add it even in the non-streaming case and use it to decide whether a
> > message is transactional or not. That might save us a byte when we are adding
> > a transaction id.

I also reviewed your patch. This feature would be really useful for
replication scenarios. Supporting this feature means that you don't need to
use a table to pass messages from one node to another one. Here are a few
comments/ideas.

@@ -1936,6 +1936,9 @@ apply_dispatch(StringInfo s)
apply_handle_origin(s);
return;

+       case LOGICAL_REP_MSG_MESSAGE:
+           return;
+

I added a comment explaining that this message is not used by logical
replication but it could possibly be useful for other applications using
pgoutput. See patch 0003.

Andres mentioned in this thread [1] that we could simplify
parse_output_parameters. I refactored this function to pass only
PGOutputData to it and also moved enable_streaming into this struct. I use a
similar approach in wal2json; the options are easier to get since the struct
is available in the logical decoding context. See patch 0004.

> My preference is to add in the xid when streaming is enabled. (1) It is a
> more consistent implementation with the other message types, and (2) it
> saves 3 bytes when streaming is disabled. I've attached an updated patch.
> It is not a strong preference, though, if you suggest a different approach.

I agree with this approach. xid is available in the BEGIN message if the
MESSAGE is transactional. For non-transactional messages, xid is not
available. Your implementation is not consistent with the other pgoutput_XXX
functions, which check in_streaming in pgoutput_XXX itself and pass the
required parameters to the functions that need them. See patch 0005.
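The pattern described here can be sketched as follows: the callback, not the writer, decides whether an xid goes on the wire. Based on the remark above that non-transactional messages have no xid, the sketch assumes that only transactional messages sent while streaming carry one. The types and the function name are hypothetical stand-ins, not pgoutput's.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;              /* stand-in for PostgreSQL's typedef */
#define InvalidTransactionId ((TransactionId) 0)

/* Hypothetical plugin state: the "messages" option and the streaming status. */
typedef struct plugin_state
{
    bool        messages;
    bool        in_streaming;
} plugin_state;

/* Hypothetical stand-in for the part of ReorderBufferTXN used here. */
typedef struct txn_stub
{
    TransactionId xid;
} txn_stub;

/*
 * Returns false if the message should not be sent at all. Otherwise stores
 * in *xid the value to hand to the writer; InvalidTransactionId means
 * "do not write an xid".
 */
static bool message_xid_for_writer(const plugin_state *st, const txn_stub *txn,
                                   bool transactional, TransactionId *xid)
{
    if (!st->messages)
        return false;           /* consumer did not ask for messages */

    *xid = InvalidTransactionId;
    if (st->in_streaming && transactional && txn != NULL)
        *xid = txn->xid;
    return true;
}
```

Keeping this decision in the callback means the writer only needs an xid argument and never has to look at the streaming state.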

The last patch, 0006, overhauls your tests. I added/changed some comments,
replaced identifiers with uppercase letters, used 'pgoutput' as prefix,
checked the prefix, and avoided a checkpoint during the test. There are
possibly other improvements that I didn't mention here. Maybe you can use
encode(substr(data, 1, 1), 'escape') instead of comparing the ASCII code (77).

> > Should we add the logical message to the WAL downstream so that it flows
> > further down to a cascaded logical replica? Should that be controlled
> > by an option?

> Hmm, I can't think of a use case for this, but perhaps someone could. Do
> you, or does anyone, have something in mind? I think we provide a lot of
> value with logical messages in pgoutput without supporting consumption from
> a downstream replica, so perhaps this is better considered separately.
>
> If we want this, I think we would add a "messages" option on the
> subscription. If present, the subscriber will receive messages and pass
> them to any downstream subscribers. I started working on this and it does
> expand the change's footprint. As is, a developer would consume messages by
> connecting to a pgoutput slot on the message's origin (e.g. via Debezium
> or a custom client). The subscription and logical worker infrastructure
> don't know about messages, but they would need to in order to support
> consuming an origin's messages on a downstream logical replica. In
> any case, I'll keep working on it so we can see what it looks like.

The decision to send received messages to downstream nodes should be made
by the subscriber. If the subscriber wants to replicate messages to
downstream nodes, the worker should call LogLogicalMessage.

This does not belong to this patch, but when/if this patch is committed, I
will submit a patch to filter messages by prefix. wal2json has a similar
feature (filter-msg-prefixes / add-msg-prefixes), and it is useful for cases
where you are handling multiple output plugins like wal2json and pgoutput.
The idea is to avoid sending useless messages to nodes that (i) don't know
how to process them and (ii) have no interest in them.
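A prefix filter of this kind could be as small as the sketch below. Its semantics are an assumption, loosely modeled on the wal2json options named above: a prefix on the filter list is dropped, and if an add list is given, only prefixes on that list are sent. Exact string matching and the function names are hypothetical.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* True if prefix exactly matches one of the n entries in list. */
static bool prefix_in(const char *prefix, const char *const *list, size_t n)
{
    for (size_t i = 0; i < n; i++)
        if (strcmp(prefix, list[i]) == 0)
            return true;
    return false;
}

/*
 * Hypothetical filter: drop prefixes on the filter list; if the add list is
 * non-empty, send only prefixes that appear on it.
 */
static bool should_send_message(const char *prefix,
                                const char *const *add, size_t n_add,
                                const char *const *filter, size_t n_filter)
{
    if (prefix_in(prefix, filter, n_filter))
        return false;
    if (n_add > 0 && !prefix_in(prefix, add, n_add))
        return false;
    return true;
}
```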

PS> I'm attaching David's patches (0001 and 0002) again to keep cfbot happy.

[1]: /messages/by-id/20200908191823.pmsoobzearkrmtg4@alap3.anarazel.de

--
Euler Taveira http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

Attachments:

0001-Add-logical-decoding-messages-to-pgoutput.patch (text/x-patch; charset=US-ASCII)
From 18a619617c2cc19d0e304e33fb11e72d23731061 Mon Sep 17 00:00:00 2001
From: Dave Pirotte <dpirotte@gmail.com>
Date: Thu, 5 Nov 2020 03:14:54 +0000
Subject: [PATCH 1/6] Add logical decoding messages to pgoutput

This patch adds a "messages" option to the pgoutput output plugin. When
"messages" is true, logical decoding messages (i.e. generated via
pg_logical_emit_message) are sent to the slot consumer.
---
 doc/src/sgml/protocol.sgml                  |  65 ++++++++
 src/backend/replication/logical/proto.c     |  24 +++
 src/backend/replication/logical/worker.c    |   3 +
 src/backend/replication/pgoutput/pgoutput.c |  40 ++++-
 src/include/replication/logicalproto.h      |   3 +
 src/include/replication/pgoutput.h          |   1 +
 src/test/subscription/t/020_messages.pl     | 158 ++++++++++++++++++++
 7 files changed, 293 insertions(+), 1 deletion(-)
 create mode 100644 src/test/subscription/t/020_messages.pl

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index cee28889e1..02449bf792 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -6413,6 +6413,71 @@ Begin
 </listitem>
 </varlistentry>
 
+<varlistentry>
+<term>
+Message
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+        Byte1('M')
+</term>
+<listitem>
+<para>
+                Identifies the message as a logical decoding message.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int8
+</term>
+<listitem>
+<para>
+                Flags; Either 0 for no flags or 1 if the logical decoding
+                message is transactional.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int64
+</term>
+<listitem>
+<para>
+                The LSN of the logical decoding message.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        String
+</term>
+<listitem>
+<para>
+                The prefix of the logical decoding message.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Byte<replaceable>n</replaceable>
+</term>
+<listitem>
+<para>
+                The content of the logical decoding message.
+</para>
+</listitem>
+</varlistentry>
+
+</variablelist>
+</para>
+</listitem>
+</varlistentry>
+
 <varlistentry>
 <term>
 Commit
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index fdb31182d7..deba2a321c 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -25,6 +25,7 @@
  */
 #define LOGICALREP_IS_REPLICA_IDENTITY 1
 
+#define MESSAGE_TRANSACTIONAL (1<<0)
 #define TRUNCATE_CASCADE		(1<<0)
 #define TRUNCATE_RESTART_SEQS	(1<<1)
 
@@ -361,6 +362,29 @@ logicalrep_read_truncate(StringInfo in,
 	return relids;
 }
 
+/*
+ * Write MESSAGE to stream
+ */
+void
+logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn,
+						 bool transactional, const char *prefix, Size sz,
+						 const char *message)
+{
+	uint8		flags = 0;
+
+	pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
+
+	/* encode and send message flags */
+	if (transactional)
+		flags |= MESSAGE_TRANSACTIONAL;
+
+	pq_sendint8(out, flags);
+	pq_sendint64(out, lsn);
+	pq_sendstring(out, prefix);
+	pq_sendint32(out, sz);
+	pq_sendbytes(out, message, sz);
+}
+
 /*
  * Write relation description to the output stream.
  */
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 04684912de..6890603622 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1936,6 +1936,9 @@ apply_dispatch(StringInfo s)
 			apply_handle_origin(s);
 			return;
 
+		case LOGICAL_REP_MSG_MESSAGE:
+			return;
+
 		case LOGICAL_REP_MSG_STREAM_START:
 			apply_handle_stream_start(s);
 			return;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9c997aed83..cd849c10a4 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -45,6 +45,10 @@ static void pgoutput_change(LogicalDecodingContext *ctx,
 static void pgoutput_truncate(LogicalDecodingContext *ctx,
 							  ReorderBufferTXN *txn, int nrelations, Relation relations[],
 							  ReorderBufferChange *change);
+static void pgoutput_message(LogicalDecodingContext *ctx,
+							 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+							 bool transactional, const char *prefix,
+							 Size sz, const char *message);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
 static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
@@ -142,6 +146,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->begin_cb = pgoutput_begin_txn;
 	cb->change_cb = pgoutput_change;
 	cb->truncate_cb = pgoutput_truncate;
+	cb->message_cb = pgoutput_message;
 	cb->commit_cb = pgoutput_commit_txn;
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
 	cb->shutdown_cb = pgoutput_shutdown;
@@ -158,15 +163,17 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 static void
 parse_output_parameters(List *options, uint32 *protocol_version,
 						List **publication_names, bool *binary,
-						bool *enable_streaming)
+						bool *messages, bool *enable_streaming)
 {
 	ListCell   *lc;
 	bool		protocol_version_given = false;
 	bool		publication_names_given = false;
 	bool		binary_option_given = false;
+	bool		messages_option_given = false;
 	bool		streaming_given = false;
 
 	*binary = false;
+	*messages = false;
 
 	foreach(lc, options)
 	{
@@ -222,6 +229,16 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 
 			*binary = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "messages") == 0)
+		{
+			if (messages_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			messages_option_given = true;
+
+			*messages = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "streaming") == 0)
 		{
 			if (streaming_given)
@@ -269,6 +286,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 								&data->protocol_version,
 								&data->publication_names,
 								&data->binary,
+								&data->messages,
 								&enable_streaming);
 
 		/* Check if we support requested protocol */
@@ -683,6 +701,26 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	MemoryContextReset(data->context);
 }
 
+static void
+pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+				 XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
+				 const char *message)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	if (!data->messages)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_message(ctx->out,
+							 txn,
+							 message_lsn,
+							 transactional,
+							 prefix,
+							 sz,
+							 message);
+	OutputPluginWrite(ctx, true);
+}
+
 /*
  * Currently we always forward.
  */
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 1f2535df80..f3c8f95e2c 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -54,6 +54,7 @@ typedef enum LogicalRepMsgType
 	LOGICAL_REP_MSG_TRUNCATE = 'T',
 	LOGICAL_REP_MSG_RELATION = 'R',
 	LOGICAL_REP_MSG_TYPE = 'Y',
+	LOGICAL_REP_MSG_MESSAGE = 'M',
 	LOGICAL_REP_MSG_STREAM_START = 'S',
 	LOGICAL_REP_MSG_STREAM_END = 'E',
 	LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
@@ -151,6 +152,8 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
 									  bool cascade, bool restart_seqs);
 extern List *logicalrep_read_truncate(StringInfo in,
 									  bool *cascade, bool *restart_seqs);
+extern void logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn,
+									 bool transactional, const char *prefix, Size sz, const char *message);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
 								 Relation rel);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index a8c676ed23..3b7273bd89 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -25,6 +25,7 @@ typedef struct PGOutputData
 	List	   *publication_names;
 	List	   *publications;
 	bool		binary;
+	bool		messages;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl
new file mode 100644
index 0000000000..8e59e324e3
--- /dev/null
+++ b/src/test/subscription/t/020_messages.pl
@@ -0,0 +1,158 @@
+# Tests that logical decoding messages are emitted and that
+# they do not break subscribers
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 7;
+
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+#
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab (a int PRIMARY KEY)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab (a int PRIMARY KEY)");
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub FOR TABLE tab");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub"
+);
+
+# ensure a transactional logical decoding message shows up on the slot
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
+
+$node_publisher->safe_psql('postgres',
+	"select pg_logical_emit_message(true, 'a prefix', 'a transactional message')"
+);
+
+my $slot_codes_with_message = $node_publisher->safe_psql(
+	'postgres', qq(
+		select get_byte(data, 0)
+		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
+			'proto_version', '1',
+			'publication_names', 'pub',
+			'messages', 'true')
+));
+
+# 66 77 67 == B M C == BEGIN MESSAGE COMMIT
+is($slot_codes_with_message, "66\n77\n67",
+	'messages on slot are B M C with message option');
+
+my $transactional_message_flags = $node_publisher->safe_psql(
+	'postgres', qq(
+		select get_byte(data, 1)
+		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
+			'proto_version', '1',
+			'publication_names', 'pub',
+			'messages', 'true')
+		offset 1 limit 1
+));
+
+is($transactional_message_flags, "1",
+	"transactional message flags are set to 1");
+
+my $slot_codes_without_message = $node_publisher->safe_psql(
+	'postgres', qq(
+		select get_byte(data, 0)
+		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
+			'proto_version', '1',
+			'publication_names', 'pub')
+));
+
+# 66 67 == B C == BEGIN COMMIT
+is($slot_codes_without_message, "66\n67",
+	'messages on slot are B C without message option');
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+$node_publisher->wait_for_catchup('sub');
+
+# ensure a non-transactional logical decoding message shows up on the slot
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab VALUES (3)");
+
+my $message_lsn = $node_publisher->safe_psql('postgres',
+	"select pg_logical_emit_message(false, 'prefix', 'nontransactional')");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab VALUES (4)");
+
+my $slot_message_code = $node_publisher->safe_psql(
+	'postgres', qq(
+		select get_byte(data, 0)
+		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
+			'proto_version', '1',
+			'publication_names', 'pub',
+			'messages', 'true')
+		where lsn = '$message_lsn' and xid = 0
+));
+
+is($slot_message_code, "77", "non-transactional message on slot is M");
+
+my $nontransactional_message_flags = $node_publisher->safe_psql(
+	'postgres', qq(
+		select get_byte(data, 1)
+		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
+			'proto_version', '1',
+			'publication_names', 'pub',
+			'messages', 'true')
+		offset 1 limit 1
+));
+
+is($nontransactional_message_flags,
+	"0", "non-transactional message flags are set to 0");
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+$node_publisher->wait_for_catchup('sub');
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
+$node_subscriber->safe_psql('postgres', "checkpoint;");
+
+# wait for the replication connection to drop from the publisher
+$node_publisher->poll_query_until('postgres',
+	'SELECT count(*) from pg_catalog.pg_stat_replication', 0);
+
+# ensure a non-transactional logical decoding message shows up on the slot
+# when it is emitted within an aborted transaction. the message won't emit
+# until something advances the LSN, which we intentionally do here with a
+# checkpoint.
+$node_publisher->safe_psql(
+	'postgres', qq(
+		BEGIN;
+		SELECT pg_logical_emit_message(false, 'prefix',
+			'nontransactional aborted 1');
+		INSERT INTO tab VALUES (5);
+		SELECT pg_logical_emit_message(false, 'prefix',
+			'nontransactional aborted 2');
+		ROLLBACK;
+		CHECKPOINT;
+));
+
+my $aborted_txn_message_codes = $node_publisher->safe_psql(
+	'postgres', qq(
+		select get_byte(data, 0)
+		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
+			'proto_version', '1',
+			'publication_names', 'pub',
+			'messages', 'true')
+));
+
+is($aborted_txn_message_codes, "77\n77",
+	"non-transactional message on slot from aborted transaction is M");
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+$node_publisher->wait_for_catchup('sub');
+
+my $result =
+	$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab");
+is($result, qq(2), 'rows move');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
-- 
2.20.1

0003-Explain-why-this-message-is-ignored.patch (text/x-patch; charset=US-ASCII)
From 520eeba6d022cbd0b5fa1a7f54971ecba57fff2d Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.taveira@enterprisedb.com>
Date: Mon, 16 Nov 2020 16:19:44 -0300
Subject: [PATCH 3/6] Explain why this message is ignored

This message is ignored in the logical replication worker. However, it
could be used by applications that use pgoutput as output plugin.
---
 src/backend/replication/logical/worker.c | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 6890603622..452eb02600 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1937,6 +1937,12 @@ apply_dispatch(StringInfo s)
 			return;
 
 		case LOGICAL_REP_MSG_MESSAGE:
+
+			/*
+			 * Logical replication does not use generic logical messages yet.
+			 * Although, it could be used by other applications that use this
+			 * output plugin.
+			 */
 			return;
 
 		case LOGICAL_REP_MSG_STREAM_START:
-- 
2.20.1

0005-Adjust-in_streaming-for-messages.patch (text/x-patch; charset=US-ASCII)
From 5a39e28d9508d5ad7c3759100da5d64d5ff9c08e Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.taveira@enterprisedb.com>
Date: Thu, 19 Nov 2020 20:14:08 -0300
Subject: [PATCH 5/6] Adjust in_streaming for messages

Use the same pattern as the other pgoutput_XXX functions: check
in_streaming in pgoutput_message and then pass the xid as an argument to
logicalrep_write_message.

Since ReorderBufferTXN is not used in logicalrep_write_message, remove
it.
---
 src/backend/replication/logical/proto.c     | 16 ++++------------
 src/backend/replication/pgoutput/pgoutput.c | 11 +++++++++--
 src/include/replication/logicalproto.h      |  3 +--
 3 files changed, 14 insertions(+), 16 deletions(-)

diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index d5eeee4784..a598658a8d 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -366,28 +366,20 @@ logicalrep_read_truncate(StringInfo in,
  * Write MESSAGE to stream
  */
 void
-logicalrep_write_message(StringInfo out,
-						 bool in_streaming,
-						 ReorderBufferTXN *txn,
-						 XLogRecPtr lsn,
-						 bool transactional,
-						 const char *prefix,
-						 Size sz,
+logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+						 bool transactional, const char *prefix, Size sz,
 						 const char *message)
 {
 	uint8		flags = 0;
-	TransactionId xid = InvalidTransactionId;
 
 	pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
 
 	/* encode and send message flags */
 	if (transactional)
-	{
 		flags |= MESSAGE_TRANSACTIONAL;
-		xid = txn->xid;
-	}
 
-	if (in_streaming)
+	/* transaction ID (if not valid, we're not streaming) */
+	if (TransactionIdIsValid(xid))
 		pq_sendint32(out, xid);
 
 	pq_sendint8(out, flags);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index b25e67edcb..f9bc15da40 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -700,14 +700,21 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				 const char *message)
 {
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	TransactionId xid = InvalidTransactionId;
 
 	if (!data->messages)
 		return;
 
+	/*
+	 * Remember the xid for the message in streaming mode. See
+	 * pgoutput_change.
+	 */
+	if (in_streaming)
+		xid = txn->xid;
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_message(ctx->out,
-							 in_streaming,
-							 txn,
+							 xid,
 							 message_lsn,
 							 transactional,
 							 prefix,
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index f22c9436e0..d3d656dfb2 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -152,8 +152,7 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
 									  bool cascade, bool restart_seqs);
 extern List *logicalrep_read_truncate(StringInfo in,
 									  bool *cascade, bool *restart_seqs);
-extern void logicalrep_write_message(StringInfo out, bool in_streaming,
-									 ReorderBufferTXN *txn, XLogRecPtr lsn,
+extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 									 bool transactional, const char *prefix, Size sz, const char *message);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
 								 Relation rel);
-- 
2.20.1

0002-Add-xid-to-messages-when-streaming.patch (text/x-patch; charset=US-ASCII)
From bf991e458df1f03708450ddd76a75a9476b5809a Mon Sep 17 00:00:00 2001
From: Dave Pirotte <dpirotte@gmail.com>
Date: Tue, 17 Nov 2020 04:01:34 +0000
Subject: [PATCH 2/6] Add xid to messages when streaming

---
 src/backend/replication/logical/proto.c     | 16 ++++++++++++++--
 src/backend/replication/pgoutput/pgoutput.c |  2 ++
 src/include/replication/logicalproto.h      |  3 ++-
 3 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index deba2a321c..d5eeee4784 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -366,17 +366,29 @@ logicalrep_read_truncate(StringInfo in,
  * Write MESSAGE to stream
  */
 void
-logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn,
-						 bool transactional, const char *prefix, Size sz,
+logicalrep_write_message(StringInfo out,
+						 bool in_streaming,
+						 ReorderBufferTXN *txn,
+						 XLogRecPtr lsn,
+						 bool transactional,
+						 const char *prefix,
+						 Size sz,
 						 const char *message)
 {
 	uint8		flags = 0;
+	TransactionId xid = InvalidTransactionId;
 
 	pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
 
 	/* encode and send message flags */
 	if (transactional)
+	{
 		flags |= MESSAGE_TRANSACTIONAL;
+		xid = txn->xid;
+	}
+
+	if (in_streaming)
+		pq_sendint32(out, xid);
 
 	pq_sendint8(out, flags);
 	pq_sendint64(out, lsn);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index cd849c10a4..bd3c2a3b99 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -707,11 +707,13 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				 const char *message)
 {
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
 	if (!data->messages)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_message(ctx->out,
+							 in_streaming,
 							 txn,
 							 message_lsn,
 							 transactional,
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index f3c8f95e2c..f22c9436e0 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -152,7 +152,8 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
 									  bool cascade, bool restart_seqs);
 extern List *logicalrep_read_truncate(StringInfo in,
 									  bool *cascade, bool *restart_seqs);
-extern void logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn,
+extern void logicalrep_write_message(StringInfo out, bool in_streaming,
+									 ReorderBufferTXN *txn, XLogRecPtr lsn,
 									 bool transactional, const char *prefix, Size sz, const char *message);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
 								 Relation rel);
-- 
2.20.1

0004-Simplify-parse_output_parameters-function.patch (text/x-patch; charset=US-ASCII)
From 2cc05a67f69cab1e98c295ce23ae95592f167422 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.taveira@enterprisedb.com>
Date: Mon, 16 Nov 2020 18:49:57 -0300
Subject: [PATCH 4/6] Simplify parse_output_parameters function

Instead of individual variables, pass PGOutputData to
parse_output_parameters. It is easier to read and to maintain when
adding new options to pgoutput.
---
 src/backend/replication/pgoutput/pgoutput.c | 29 ++++++++-------------
 src/include/replication/pgoutput.h          |  1 +
 2 files changed, 12 insertions(+), 18 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index bd3c2a3b99..b25e67edcb 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -161,9 +161,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 }
 
 static void
-parse_output_parameters(List *options, uint32 *protocol_version,
-						List **publication_names, bool *binary,
-						bool *messages, bool *enable_streaming)
+parse_output_parameters(List *options, PGOutputData *data)
 {
 	ListCell   *lc;
 	bool		protocol_version_given = false;
@@ -172,8 +170,9 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 	bool		messages_option_given = false;
 	bool		streaming_given = false;
 
-	*binary = false;
-	*messages = false;
+	data->binary = false;
+	data->messages = false;
+	data->streaming = false;
 
 	foreach(lc, options)
 	{
@@ -203,7 +202,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 						 errmsg("proto_version \"%s\" out of range",
 								strVal(defel->arg))));
 
-			*protocol_version = (uint32) parsed;
+			data->protocol_version = (uint32) parsed;
 		}
 		else if (strcmp(defel->defname, "publication_names") == 0)
 		{
@@ -214,7 +213,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 			publication_names_given = true;
 
 			if (!SplitIdentifierString(strVal(defel->arg), ',',
-									   publication_names))
+									   &data->publication_names))
 				ereport(ERROR,
 						(errcode(ERRCODE_INVALID_NAME),
 						 errmsg("invalid publication_names syntax")));
@@ -227,7 +226,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 						 errmsg("conflicting or redundant options")));
 			binary_option_given = true;
 
-			*binary = defGetBoolean(defel);
+			data->binary = defGetBoolean(defel);
 		}
 		else if (strcmp(defel->defname, "messages") == 0)
 		{
@@ -237,7 +236,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 						 errmsg("conflicting or redundant options")));
 			messages_option_given = true;
 
-			*messages = defGetBoolean(defel);
+			data->messages = defGetBoolean(defel);
 		}
 		else if (strcmp(defel->defname, "streaming") == 0)
 		{
@@ -247,7 +246,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 						 errmsg("conflicting or redundant options")));
 			streaming_given = true;
 
-			*enable_streaming = defGetBoolean(defel);
+			data->streaming = defGetBoolean(defel);
 		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
@@ -261,7 +260,6 @@ static void
 pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 				 bool is_init)
 {
-	bool		enable_streaming = false;
 	PGOutputData *data = palloc0(sizeof(PGOutputData));
 
 	/* Create our memory context for private allocations. */
@@ -282,12 +280,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	if (!is_init)
 	{
 		/* Parse the params and ERROR if we see any we don't recognize */
-		parse_output_parameters(ctx->output_plugin_options,
-								&data->protocol_version,
-								&data->publication_names,
-								&data->binary,
-								&data->messages,
-								&enable_streaming);
+		parse_output_parameters(ctx->output_plugin_options, data);
 
 		/* Check if we support requested protocol */
 		if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
@@ -313,7 +306,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		 * we only allow it with sufficient version of the protocol, and when
 		 * the output plugin supports it.
 		 */
-		if (!enable_streaming)
+		if (!data->streaming)
 			ctx->streaming = false;
 		else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
 			ereport(ERROR,
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 3b7273bd89..62a7d0a6d3 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -26,6 +26,7 @@ typedef struct PGOutputData
 	List	   *publications;
 	bool		binary;
 	bool		messages;
+	bool		streaming;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
-- 
2.20.1

0006-Overhaul-tests.patch (text/x-patch; charset=US-ASCII)
From 73444c7e6979e6493043f011600a3c49bcea2aa6 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.taveira@enterprisedb.com>
Date: Thu, 19 Nov 2020 21:53:29 -0300
Subject: [PATCH 6/6] Overhaul tests

---
 src/test/subscription/t/020_messages.pl | 160 +++++++++++-------------
 1 file changed, 75 insertions(+), 85 deletions(-)

diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl
index 8e59e324e3..d9123ed3ef 100644
--- a/src/test/subscription/t/020_messages.pl
+++ b/src/test/subscription/t/020_messages.pl
@@ -1,158 +1,148 @@
-# Tests that logical decoding messages are emitted and that
-# they do not break subscribers
+# Tests for logical decoding messages
 use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 7;
+use Test::More tests => 5;
 
+# Create publisher node
 my $node_publisher = get_new_node('publisher');
 $node_publisher->init(allows_streaming => 'logical');
 $node_publisher->start;
 
+# Create subscriber node
 my $node_subscriber = get_new_node('subscriber');
 $node_subscriber->init(allows_streaming => 'logical');
 $node_subscriber->start;
 
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_test (a int primary key)");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_test (a int primary key)");
+
+# Setup logical replication
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
-#
-$node_publisher->safe_psql('postgres',
-	"CREATE TABLE tab (a int PRIMARY KEY)");
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_test");
+
 $node_subscriber->safe_psql('postgres',
-	"CREATE TABLE tab (a int PRIMARY KEY)");
-$node_publisher->safe_psql('postgres',
-	"CREATE PUBLICATION pub FOR TABLE tab");
-$node_subscriber->safe_psql('postgres',
-	"CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub"
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
 );
 
-# ensure a transactional logical decoding message shows up on the slot
-$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
+# Ensure a transactional logical decoding message shows up on the slot
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
 
 $node_publisher->safe_psql('postgres',
-	"select pg_logical_emit_message(true, 'a prefix', 'a transactional message')"
+	"SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message')"
 );
 
-my $slot_codes_with_message = $node_publisher->safe_psql(
+my $result = $node_publisher->safe_psql(
 	'postgres', qq(
-		select get_byte(data, 0)
-		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
+		SELECT get_byte(data, 0)
+		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
 			'proto_version', '1',
-			'publication_names', 'pub',
+			'publication_names', 'tap_pub',
 			'messages', 'true')
 ));
 
 # 66 77 67 == B M C == BEGIN MESSAGE COMMIT
-is($slot_codes_with_message, "66\n77\n67",
+is($result, qq(66
+77
+67),
 	'messages on slot are B M C with message option');
 
-my $transactional_message_flags = $node_publisher->safe_psql(
+$result = $node_publisher->safe_psql(
 	'postgres', qq(
-		select get_byte(data, 1)
-		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
+		SELECT get_byte(data, 1), encode(substr(data, 11, 8), 'escape')
+		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
 			'proto_version', '1',
-			'publication_names', 'pub',
+			'publication_names', 'tap_pub',
 			'messages', 'true')
-		offset 1 limit 1
+		OFFSET 1 LIMIT 1
 ));
 
-is($transactional_message_flags, "1",
-	"transactional message flags are set to 1");
+is($result, qq(1|pgoutput),
+	"flag transactional is set to 1 and prefix is pgoutput");
 
-my $slot_codes_without_message = $node_publisher->safe_psql(
+$result = $node_publisher->safe_psql(
 	'postgres', qq(
-		select get_byte(data, 0)
-		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
+		SELECT get_byte(data, 0)
+		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
 			'proto_version', '1',
-			'publication_names', 'pub')
+			'publication_names', 'tap_pub')
 ));
 
 # 66 67 == B C == BEGIN COMMIT
-is($slot_codes_without_message, "66\n67",
-	'messages on slot are B C without message option');
+is($result, qq(66
+67),
+	'option messages defaults to false so message (M) is not available on slot');
 
-$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
-$node_publisher->wait_for_catchup('sub');
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE");
+$node_publisher->wait_for_catchup('tap_sub');
 
 # ensure a non-transactional logical decoding message shows up on the slot
-$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
 
-$node_publisher->safe_psql('postgres', "INSERT INTO tab VALUES (3)");
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (1)");
 
 my $message_lsn = $node_publisher->safe_psql('postgres',
-	"select pg_logical_emit_message(false, 'prefix', 'nontransactional')");
+	"SELECT pg_logical_emit_message(false, 'pgoutput', 'a non-transactional message')");
 
-$node_publisher->safe_psql('postgres', "INSERT INTO tab VALUES (4)");
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (2)");
 
-my $slot_message_code = $node_publisher->safe_psql(
+$result = $node_publisher->safe_psql(
 	'postgres', qq(
-		select get_byte(data, 0)
-		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
+		SELECT get_byte(data, 0), get_byte(data, 1)
+		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
 			'proto_version', '1',
-			'publication_names', 'pub',
+			'publication_names', 'tap_pub',
 			'messages', 'true')
-		where lsn = '$message_lsn' and xid = 0
+		WHERE lsn = '$message_lsn' AND xid = 0
 ));
 
-is($slot_message_code, "77", "non-transactional message on slot is M");
+is($result, qq(77|0), 'non-transactional message on slot is M');
 
-my $nontransactional_message_flags = $node_publisher->safe_psql(
-	'postgres', qq(
-		select get_byte(data, 1)
-		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
-			'proto_version', '1',
-			'publication_names', 'pub',
-			'messages', 'true')
-		offset 1 limit 1
-));
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE");
+$node_publisher->wait_for_catchup('tap_sub');
 
-is($nontransactional_message_flags,
-	"0", "non-transactional message flags are set to 0");
-
-$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
-$node_publisher->wait_for_catchup('sub');
-
-$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
-$node_subscriber->safe_psql('postgres', "checkpoint;");
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
 
 # wait for the replication connection to drop from the publisher
 $node_publisher->poll_query_until('postgres',
-	'SELECT count(*) from pg_catalog.pg_stat_replication', 0);
+	'SELECT COUNT(*) FROM pg_catalog.pg_stat_replication', 0);
 
-# ensure a non-transactional logical decoding message shows up on the slot
-# when it is emitted within an aborted transaction. the message won't emit
-# until something advances the LSN, which we intentionally do here with a
-# checkpoint.
+# Ensure a non-transactional logical decoding message shows up on the slot when
+# it is emitted within an aborted transaction. The message won't be emitted
+# until something advances the LSN, hence we intentionally force the server to
+# switch to a new WAL file.
 $node_publisher->safe_psql(
 	'postgres', qq(
 		BEGIN;
-		SELECT pg_logical_emit_message(false, 'prefix',
-			'nontransactional aborted 1');
-		INSERT INTO tab VALUES (5);
-		SELECT pg_logical_emit_message(false, 'prefix',
-			'nontransactional aborted 2');
+		SELECT pg_logical_emit_message(false, 'pgoutput',
+			'a non-transactional message is available even if the transaction is aborted 1');
+		INSERT INTO tab_test VALUES (3);
+		SELECT pg_logical_emit_message(true, 'pgoutput',
+			'a transactional message is not available if the transaction is aborted');
+		SELECT pg_logical_emit_message(false, 'pgoutput',
+			'a non-transactional message is available even if the transaction is aborted 2');
 		ROLLBACK;
-		CHECKPOINT;
+		SELECT pg_switch_wal();
 ));
 
-my $aborted_txn_message_codes = $node_publisher->safe_psql(
+$result = $node_publisher->safe_psql(
 	'postgres', qq(
-		select get_byte(data, 0)
-		from pg_logical_slot_peek_binary_changes('sub', NULL, NULL,
+		SELECT get_byte(data, 0), get_byte(data, 1)
+		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
 			'proto_version', '1',
-			'publication_names', 'pub',
+			'publication_names', 'tap_pub',
 			'messages', 'true')
 ));
 
-is($aborted_txn_message_codes, "77\n77",
-	"non-transactional message on slot from aborted transaction is M");
-
-$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
-$node_publisher->wait_for_catchup('sub');
-
-my $result =
-	$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab");
-is($result, qq(2), 'rows move');
+is($result, qq(77|0
+77|0),
+	'non-transactional message on slot from aborted transaction is M');
 
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
-- 
2.20.1

#12David Steele
david@pgmasters.net
In reply to: Euler Taveira (#11)
Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

Hi David,

On 11/24/20 10:28 PM, Euler Taveira wrote:

I also reviewed your patch. This feature would be really useful for replication
scenarios. Supporting this feature means that you don't need to use a table to
pass messages from one node to another one. Here are a few comments/ideas.

Do you know when you'll have a chance to look at Euler's suggestions?
Also, have Andres' suggestions/concerns upthread been addressed?

Marked Waiting on Author.

Regards,
--
-David
david@pgmasters.net

#13Amit Kapila
amit.kapila16@gmail.com
In reply to: Euler Taveira (#11)
Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

On Wed, Nov 25, 2020 at 8:58 AM Euler Taveira
<euler.taveira@2ndquadrant.com> wrote:

On Wed, 18 Nov 2020 at 03:04, David Pirotte <dpirotte@gmail.com> wrote:

On Fri, Nov 6, 2020 at 7:05 AM Ashutosh Bapat <ashutosh.bapat.oss@gmail.com> wrote:

+/*
+ * Write MESSAGE to stream
+ */
+void
+logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn,
+                        bool transactional, const char *prefix, Size sz,
+                        const char *message)
+{
+   uint8       flags = 0;
+
+   pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
+

Similar to the UPDATE/DELETE/INSERT records decoded when streaming is being
used, we need to add the transaction id for transactional messages. Maybe we
add that even in the non-streaming case and use it to decide whether it's a
transactional message or not. That might save us a byte when we are adding a
transaction id.

I also reviewed your patch. This feature would be really useful for replication
scenarios. Supporting this feature means that you don't need to use a table to
pass messages from one node to another one. Here are a few comments/ideas.

Your ideas/suggestions look good to me. Don't we need to provide a
read function corresponding to logicalrep_write_message? We have it
for other write functions. Can you please combine all of your changes
into one patch?

--
With Regards,
Amit Kapila.

#14Euler Taveira
euler@eulerto.com
In reply to: Amit Kapila (#13)
2 attachment(s)
Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

On Thu, Apr 1, 2021, at 7:19 AM, Amit Kapila wrote:

Your ideas/suggestions look good to me. Don't we need to provide a
read function corresponding to logicalrep_write_message? We have it
for other write functions. Can you please combine all of your changes
into one patch?

Thanks for taking a look at this patch. I didn't consider a
logicalrep_read_message function because the protocol doesn't support it yet.

    /*
     * Logical replication does not use generic logical messages yet.
     * Although, it could be used by other applications that use this
     * output plugin.
     */

Anyone inspecting the code in the future can check this discussion to
understand why this function isn't available.

This new version of the patch set has 2 patches because there are 2 separate
changes: the parse_output_parameters() refactor and logical decoding message
support.

--
Euler Taveira
EDB https://www.enterprisedb.com/

Attachments:

v3-0001-Refactor-function-parse_output_parameters.patch (text/x-patch)
From d17cda24259bbaa019bedd1175960375c2905112 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.taveira@enterprisedb.com>
Date: Mon, 16 Nov 2020 18:49:57 -0300
Subject: [PATCH v3 1/2] Refactor function parse_output_parameters

Instead of using multiple parameters in the parse_output_parameters
function signature, use the struct PGOutputData, which encapsulates all
pgoutput options. This is the better approach in terms of maintainability.
---
 src/backend/replication/pgoutput/pgoutput.c | 24 ++++++++-------------
 src/include/replication/pgoutput.h          |  1 +
 2 files changed, 10 insertions(+), 15 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 1b993fb032..6146c5acdb 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -156,9 +156,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 }
 
 static void
-parse_output_parameters(List *options, uint32 *protocol_version,
-						List **publication_names, bool *binary,
-						bool *enable_streaming)
+parse_output_parameters(List *options, PGOutputData *data)
 {
 	ListCell   *lc;
 	bool		protocol_version_given = false;
@@ -166,7 +164,8 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 	bool		binary_option_given = false;
 	bool		streaming_given = false;
 
-	*binary = false;
+	data->binary = false;
+	data->streaming = false;
 
 	foreach(lc, options)
 	{
@@ -196,7 +195,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 						 errmsg("proto_version \"%s\" out of range",
 								strVal(defel->arg))));
 
-			*protocol_version = (uint32) parsed;
+			data->protocol_version = (uint32) parsed;
 		}
 		else if (strcmp(defel->defname, "publication_names") == 0)
 		{
@@ -207,7 +206,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 			publication_names_given = true;
 
 			if (!SplitIdentifierString(strVal(defel->arg), ',',
-									   publication_names))
+									   &data->publication_names))
 				ereport(ERROR,
 						(errcode(ERRCODE_INVALID_NAME),
 						 errmsg("invalid publication_names syntax")));
@@ -220,7 +219,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 						 errmsg("conflicting or redundant options")));
 			binary_option_given = true;
 
-			*binary = defGetBoolean(defel);
+			data->binary = defGetBoolean(defel);
 		}
 		else if (strcmp(defel->defname, "streaming") == 0)
 		{
@@ -230,7 +229,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 						 errmsg("conflicting or redundant options")));
 			streaming_given = true;
 
-			*enable_streaming = defGetBoolean(defel);
+			data->streaming = defGetBoolean(defel);
 		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
@@ -244,7 +243,6 @@ static void
 pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 				 bool is_init)
 {
-	bool		enable_streaming = false;
 	PGOutputData *data = palloc0(sizeof(PGOutputData));
 
 	/* Create our memory context for private allocations. */
@@ -265,11 +263,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	if (!is_init)
 	{
 		/* Parse the params and ERROR if we see any we don't recognize */
-		parse_output_parameters(ctx->output_plugin_options,
-								&data->protocol_version,
-								&data->publication_names,
-								&data->binary,
-								&enable_streaming);
+		parse_output_parameters(ctx->output_plugin_options, data);
 
 		/* Check if we support requested protocol */
 		if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
@@ -295,7 +289,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		 * we only allow it with sufficient version of the protocol, and when
 		 * the output plugin supports it.
 		 */
-		if (!enable_streaming)
+		if (!data->streaming)
 			ctx->streaming = false;
 		else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
 			ereport(ERROR,
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 4ba052fe38..bb383d523e 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -25,6 +25,7 @@ typedef struct PGOutputData
 	List	   *publication_names;
 	List	   *publications;
 	bool		binary;
+	bool		streaming;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
-- 
2.20.1

v3-0002-Logical-decoding-message-support-to-pgoutput.patch (text/x-patch)
From e80c65d04e6b68fed34e44cb32962d078db6c28a Mon Sep 17 00:00:00 2001
From: Dave Pirotte <dpirotte@gmail.com>
Date: Thu, 5 Nov 2020 03:14:54 +0000
Subject: [PATCH v3 2/2] Logical decoding message support to pgoutput

This feature allows pgoutput to send logical decoding messages. The
output plugin accepts a new parameter (messages) that controls whether
logical decoding messages are written into the replication stream. It is
useful for clients that use pgoutput as their output plugin and need
to process messages written by pg_logical_emit_message().

Although the logical streaming replication protocol now supports logical
decoding messages, logical replication does not use this feature yet.
---
 doc/src/sgml/protocol.sgml                  |  65 +++++++++
 src/backend/replication/logical/proto.c     |  28 ++++
 src/backend/replication/logical/worker.c    |   9 ++
 src/backend/replication/pgoutput/pgoutput.c |  46 ++++++
 src/include/replication/logicalproto.h      |   3 +
 src/include/replication/pgoutput.h          |   1 +
 src/test/subscription/t/020_messages.pl     | 148 ++++++++++++++++++++
 7 files changed, 300 insertions(+)
 create mode 100644 src/test/subscription/t/020_messages.pl

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 43092fe62a..7c52d5ab70 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -6433,6 +6433,71 @@ Begin
 </listitem>
 </varlistentry>
 
+<varlistentry>
+<term>
+Message
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+        Byte1('M')
+</term>
+<listitem>
+<para>
+                Identifies the message as a logical decoding message.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int8
+</term>
+<listitem>
+<para>
+                Flags; either 0 for no flags or 1 if the logical decoding
+                message is transactional.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int64
+</term>
+<listitem>
+<para>
+                The LSN of the logical decoding message.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        String
+</term>
+<listitem>
+<para>
+                The prefix of the logical decoding message.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Byte<replaceable>n</replaceable>
+</term>
+<listitem>
+<para>
+                The content of the logical decoding message.
+</para>
+</listitem>
+</varlistentry>
+
+</variablelist>
+</para>
+</listitem>
+</varlistentry>
+
 <varlistentry>
 <term>
 Commit
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index f2c85cabb5..2a1f9830e0 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -25,6 +25,7 @@
  */
 #define LOGICALREP_IS_REPLICA_IDENTITY 1
 
+#define MESSAGE_TRANSACTIONAL (1<<0)
 #define TRUNCATE_CASCADE		(1<<0)
 #define TRUNCATE_RESTART_SEQS	(1<<1)
 
@@ -361,6 +362,33 @@ logicalrep_read_truncate(StringInfo in,
 	return relids;
 }
 
+/*
+ * Write MESSAGE to stream
+ */
+void
+logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+						 bool transactional, const char *prefix, Size sz,
+						 const char *message)
+{
+	uint8		flags = 0;
+
+	pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
+
+	/* encode and send message flags */
+	if (transactional)
+		flags |= MESSAGE_TRANSACTIONAL;
+
+	/* transaction ID (if not valid, we're not streaming) */
+	if (TransactionIdIsValid(xid))
+		pq_sendint32(out, xid);
+
+	pq_sendint8(out, flags);
+	pq_sendint64(out, lsn);
+	pq_sendstring(out, prefix);
+	pq_sendint32(out, sz);
+	pq_sendbytes(out, message, sz);
+}
+
 /*
  * Write relation description to the output stream.
  */
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 354fbe4b4b..74d538b5e3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1939,6 +1939,15 @@ apply_dispatch(StringInfo s)
 			apply_handle_origin(s);
 			return;
 
+		case LOGICAL_REP_MSG_MESSAGE:
+
+			/*
+			 * Logical replication does not use generic logical messages yet.
+			 * However, other applications that use this output plugin may
+			 * consume them.
+			 */
+			return;
+
 		case LOGICAL_REP_MSG_STREAM_START:
 			apply_handle_stream_start(s);
 			return;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 6146c5acdb..2981b9c430 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -45,6 +45,10 @@ static void pgoutput_change(LogicalDecodingContext *ctx,
 static void pgoutput_truncate(LogicalDecodingContext *ctx,
 							  ReorderBufferTXN *txn, int nrelations, Relation relations[],
 							  ReorderBufferChange *change);
+static void pgoutput_message(LogicalDecodingContext *ctx,
+							 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+							 bool transactional, const char *prefix,
+							 Size sz, const char *message);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
 static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
@@ -142,6 +146,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->begin_cb = pgoutput_begin_txn;
 	cb->change_cb = pgoutput_change;
 	cb->truncate_cb = pgoutput_truncate;
+	cb->message_cb = pgoutput_message;
 	cb->commit_cb = pgoutput_commit_txn;
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
 	cb->shutdown_cb = pgoutput_shutdown;
@@ -162,10 +167,12 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		protocol_version_given = false;
 	bool		publication_names_given = false;
 	bool		binary_option_given = false;
+	bool		messages_option_given = false;
 	bool		streaming_given = false;
 
 	data->binary = false;
 	data->streaming = false;
+	data->messages = false;
 
 	foreach(lc, options)
 	{
@@ -221,6 +228,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 
 			data->binary = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "messages") == 0)
+		{
+			if (messages_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			messages_option_given = true;
+
+			data->messages = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "streaming") == 0)
 		{
 			if (streaming_given)
@@ -689,6 +706,35 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	MemoryContextReset(data->context);
 }
 
+static void
+pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+				 XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
+				 const char *message)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	TransactionId xid = InvalidTransactionId;
+
+	if (!data->messages)
+		return;
+
+	/*
+	 * Remember the xid for the message in streaming mode. See
+	 * pgoutput_change.
+	 */
+	if (in_streaming)
+		xid = txn->xid;
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_message(ctx->out,
+							 xid,
+							 message_lsn,
+							 transactional,
+							 prefix,
+							 sz,
+							 message);
+	OutputPluginWrite(ctx, true);
+}
+
 /*
  * Currently we always forward.
  */
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index fa4c37277b..55b90c03ea 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -54,6 +54,7 @@ typedef enum LogicalRepMsgType
 	LOGICAL_REP_MSG_TRUNCATE = 'T',
 	LOGICAL_REP_MSG_RELATION = 'R',
 	LOGICAL_REP_MSG_TYPE = 'Y',
+	LOGICAL_REP_MSG_MESSAGE = 'M',
 	LOGICAL_REP_MSG_STREAM_START = 'S',
 	LOGICAL_REP_MSG_STREAM_END = 'E',
 	LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
@@ -151,6 +152,8 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
 									  bool cascade, bool restart_seqs);
 extern List *logicalrep_read_truncate(StringInfo in,
 									  bool *cascade, bool *restart_seqs);
+extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+									 bool transactional, const char *prefix, Size sz, const char *message);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
 								 Relation rel);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index bb383d523e..51e7c0348d 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -26,6 +26,7 @@ typedef struct PGOutputData
 	List	   *publications;
 	bool		binary;
 	bool		streaming;
+	bool		messages;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl
new file mode 100644
index 0000000000..d9123ed3ef
--- /dev/null
+++ b/src/test/subscription/t/020_messages.pl
@@ -0,0 +1,148 @@
+# Tests for logical decoding messages
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 5;
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_test (a int primary key)");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_test (a int primary key)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_test");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
+);
+
+# Ensure a transactional logical decoding message shows up on the slot
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
+
+$node_publisher->safe_psql('postgres',
+	"SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message')"
+);
+
+my $result = $node_publisher->safe_psql(
+	'postgres', qq(
+		SELECT get_byte(data, 0)
+		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+			'proto_version', '1',
+			'publication_names', 'tap_pub',
+			'messages', 'true')
+));
+
+# 66 77 67 == B M C == BEGIN MESSAGE COMMIT
+is($result, qq(66
+77
+67),
+	'messages on slot are B M C with message option');
+
+$result = $node_publisher->safe_psql(
+	'postgres', qq(
+		SELECT get_byte(data, 1), encode(substr(data, 11, 8), 'escape')
+		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+			'proto_version', '1',
+			'publication_names', 'tap_pub',
+			'messages', 'true')
+		OFFSET 1 LIMIT 1
+));
+
+is($result, qq(1|pgoutput),
+	"flag transactional is set to 1 and prefix is pgoutput");
+
+$result = $node_publisher->safe_psql(
+	'postgres', qq(
+		SELECT get_byte(data, 0)
+		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+			'proto_version', '1',
+			'publication_names', 'tap_pub')
+));
+
+# 66 67 == B C == BEGIN COMMIT
+is($result, qq(66
+67),
+	'option messages defaults to false so message (M) is not available on slot');
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE");
+$node_publisher->wait_for_catchup('tap_sub');
+
+# ensure a non-transactional logical decoding message shows up on the slot
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (1)");
+
+my $message_lsn = $node_publisher->safe_psql('postgres',
+	"SELECT pg_logical_emit_message(false, 'pgoutput', 'a non-transactional message')");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (2)");
+
+$result = $node_publisher->safe_psql(
+	'postgres', qq(
+		SELECT get_byte(data, 0), get_byte(data, 1)
+		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+			'proto_version', '1',
+			'publication_names', 'tap_pub',
+			'messages', 'true')
+		WHERE lsn = '$message_lsn' AND xid = 0
+));
+
+is($result, qq(77|0), 'non-transactional message on slot is M');
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE");
+$node_publisher->wait_for_catchup('tap_sub');
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
+
+# wait for the replication connection to drop from the publisher
+$node_publisher->poll_query_until('postgres',
+	'SELECT COUNT(*) FROM pg_catalog.pg_stat_replication', 0);
+
+# Ensure a non-transactional logical decoding message shows up on the slot when
+# it is emitted within an aborted transaction. The message won't be emitted
+# until something advances the LSN; hence, we intentionally force the server
+# to switch to a new WAL file.
+$node_publisher->safe_psql(
+	'postgres', qq(
+		BEGIN;
+		SELECT pg_logical_emit_message(false, 'pgoutput',
+			'a non-transactional message is available even if the transaction is aborted 1');
+		INSERT INTO tab_test VALUES (3);
+		SELECT pg_logical_emit_message(true, 'pgoutput',
+			'a transactional message is not available if the transaction is aborted');
+		SELECT pg_logical_emit_message(false, 'pgoutput',
+			'a non-transactional message is available even if the transaction is aborted 2');
+		ROLLBACK;
+		SELECT pg_switch_wal();
+));
+
+$result = $node_publisher->safe_psql(
+	'postgres', qq(
+		SELECT get_byte(data, 0), get_byte(data, 1)
+		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+			'proto_version', '1',
+			'publication_names', 'tap_pub',
+			'messages', 'true')
+));
+
+is($result, qq(77|0
+77|0),
+	'non-transactional message on slot from aborted transaction is M');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
-- 
2.20.1
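For anyone writing a consumer, it may help to see the byte layout that logicalrep_write_message() above produces in the non-streaming case (no XID). The sketch below is a standalone approximation, not backend code: MsgBuf, put_be() and write_logical_message() are hypothetical names, and it assumes the pq_sendint* functions write integers in network byte order and pq_sendstring() writes a NUL-terminated string. It also shows that an Int32 content length precedes the content bytes, a field the protocol.sgml hunk in this version does not list, and it explains the offsets in the TAP test: the flags are at byte 1, the LSN at bytes 2-9, and the prefix starts at byte 10, i.e. position 11 for the 1-based substr().

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical fixed-size buffer standing in for StringInfo. */
typedef struct
{
    unsigned char data[256];
    size_t        len;
} MsgBuf;

#define MESSAGE_TRANSACTIONAL (1 << 0)

static void put_bytes(MsgBuf *b, const void *src, size_t n)
{
    assert(b->len + n <= sizeof(b->data)); /* sketch: no buffer growth */
    memcpy(b->data + b->len, src, n);
    b->len += n;
}

/* Append an unsigned integer of 'width' bytes in network (big-endian) order. */
static void put_be(MsgBuf *b, uint64_t v, int width)
{
    for (int i = width - 1; i >= 0; i--)
    {
        unsigned char byte = (unsigned char) (v >> (8 * i));
        put_bytes(b, &byte, 1);
    }
}

/* Same field order as logicalrep_write_message() when xid is invalid. */
static void write_logical_message(MsgBuf *b, uint64_t lsn, int transactional,
                                  const char *prefix, const char *content,
                                  uint32_t sz)
{
    b->len = 0;
    put_be(b, 'M', 1);                                       /* Byte1('M') */
    put_be(b, transactional ? MESSAGE_TRANSACTIONAL : 0, 1); /* Int8 flags */
    put_be(b, lsn, 8);                                       /* Int64 LSN */
    put_bytes(b, prefix, strlen(prefix) + 1);                /* String incl. NUL */
    put_be(b, sz, 4);                                        /* Int32 length */
    put_bytes(b, content, sz);                               /* Byte n content */
}
```

With prefix "pgoutput" and content "hello" this yields a 28-byte message: 10 header bytes, 9 prefix bytes including the NUL, 4 length bytes and 5 content bytes.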

#15Amit Kapila
amit.kapila16@gmail.com
In reply to: Euler Taveira (#14)
2 attachment(s)
Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

On Sat, Apr 3, 2021 at 5:26 AM Euler Taveira <euler@eulerto.com> wrote:

On Thu, Apr 1, 2021, at 7:19 AM, Amit Kapila wrote:

This new patch set version has 2 patches because there are 2 separate
changes: the parse_output_parameters() refactor and logical decoding message
support.

I have made a few minor changes in the attached: (a) initialized the
streaming message callback API; (b) updated the docs to reflect that the XID
can be sent when streaming in-progress transactions (I see that the
same information needs to be updated for a few other protocol messages,
but we can do that as a separate patch); and (c) slightly tweaked the commit
messages.

Let me know what you think. I am planning to push this tomorrow unless
you or someone else has any comments.

--
With Regards,
Amit Kapila.
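Point (b) matters for clients because the 'M' message then has no fixed layout: pgoutput_message() only passes txn->xid while in_streaming is set, and logicalrep_write_message() only writes the Int32 XID when it is valid. A consumer therefore has to track whether it is between the Stream Start ('S') and Stream End ('E') messages before parsing. A minimal parser sketch follows; ParsedLogicalMessage, get_be() and parse_logical_message() are hypothetical names, and it assumes the Int32 content length that logicalrep_write_message() writes before the content.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef struct
{
    bool                 has_xid;
    uint32_t             xid;
    bool                 transactional;
    uint64_t             lsn;
    const char          *prefix;      /* points into the input buffer */
    uint32_t             content_len;
    const unsigned char *content;     /* points into the input buffer */
} ParsedLogicalMessage;

/* Read a big-endian integer of 'width' bytes; false if the input is truncated. */
static bool get_be(const unsigned char *buf, size_t len, size_t *pos,
                   int width, uint64_t *out)
{
    if (len - *pos < (size_t) width) /* invariant: *pos <= len */
        return false;
    uint64_t v = 0;
    for (int i = 0; i < width; i++)
        v = (v << 8) | buf[(*pos)++];
    *out = v;
    return true;
}

/*
 * Parse a pgoutput 'M' message. 'in_stream' must be true when the message
 * arrived inside a streamed transaction, because only then is the Int32 XID
 * present before the flags. Returns false on malformed input.
 */
static bool parse_logical_message(const unsigned char *buf, size_t len,
                                  bool in_stream, ParsedLogicalMessage *m)
{
    size_t   pos = 0;
    uint64_t v;

    assert(buf != NULL && m != NULL);

    if (!get_be(buf, len, &pos, 1, &v) || v != 'M')
        return false;

    m->has_xid = in_stream;
    m->xid = 0;
    if (in_stream)
    {
        if (!get_be(buf, len, &pos, 4, &v))
            return false;
        m->xid = (uint32_t) v;
    }

    if (!get_be(buf, len, &pos, 1, &v))
        return false;
    m->transactional = (v & 1) != 0;

    if (!get_be(buf, len, &pos, 8, &m->lsn))
        return false;

    /* The prefix is a NUL-terminated string. */
    const unsigned char *nul = memchr(buf + pos, '\0', len - pos);
    if (nul == NULL)
        return false;
    m->prefix = (const char *) (buf + pos);
    pos = (size_t) (nul - buf) + 1;

    /* Int32 length, then exactly that many content bytes. */
    if (!get_be(buf, len, &pos, 4, &v) || len - pos < v)
        return false;
    m->content_len = (uint32_t) v;
    m->content = buf + pos;
    return true;
}
```

Passing the wrong value for in_stream shifts every later field by four bytes, so a client that ignores the streaming state will misread the flags and LSN rather than fail cleanly.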

Attachments:

v4-0001-Refactor-function-parse_output_parameters.patch
From 4366284cdf94f84897ca10c5cebdcde4a427acf4 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.taveira@enterprisedb.com>
Date: Mon, 16 Nov 2020 18:49:57 -0300
Subject: [PATCH v4 1/2] Refactor function parse_output_parameters.

Instead of passing multiple parameters in the parse_output_parameters()
function signature, use the struct PGOutputData, which encapsulates all
pgoutput options. It will be useful for future work where we need to add
other options to pgoutput.

Author: Euler Taveira
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/CADK3HHJ-+9SO7KuRLH=9Wa1rAo60Yreq1GFNkH_kd0=CdaWM+A@mail.gmail.com
---
 src/backend/replication/pgoutput/pgoutput.c | 24 +++++++++---------------
 src/include/replication/pgoutput.h          |  1 +
 2 files changed, 10 insertions(+), 15 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 1b993fb..6146c5a 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -156,9 +156,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 }
 
 static void
-parse_output_parameters(List *options, uint32 *protocol_version,
-						List **publication_names, bool *binary,
-						bool *enable_streaming)
+parse_output_parameters(List *options, PGOutputData *data)
 {
 	ListCell   *lc;
 	bool		protocol_version_given = false;
@@ -166,7 +164,8 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 	bool		binary_option_given = false;
 	bool		streaming_given = false;
 
-	*binary = false;
+	data->binary = false;
+	data->streaming = false;
 
 	foreach(lc, options)
 	{
@@ -196,7 +195,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 						 errmsg("proto_version \"%s\" out of range",
 								strVal(defel->arg))));
 
-			*protocol_version = (uint32) parsed;
+			data->protocol_version = (uint32) parsed;
 		}
 		else if (strcmp(defel->defname, "publication_names") == 0)
 		{
@@ -207,7 +206,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 			publication_names_given = true;
 
 			if (!SplitIdentifierString(strVal(defel->arg), ',',
-									   publication_names))
+									   &data->publication_names))
 				ereport(ERROR,
 						(errcode(ERRCODE_INVALID_NAME),
 						 errmsg("invalid publication_names syntax")));
@@ -220,7 +219,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 						 errmsg("conflicting or redundant options")));
 			binary_option_given = true;
 
-			*binary = defGetBoolean(defel);
+			data->binary = defGetBoolean(defel);
 		}
 		else if (strcmp(defel->defname, "streaming") == 0)
 		{
@@ -230,7 +229,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 						 errmsg("conflicting or redundant options")));
 			streaming_given = true;
 
-			*enable_streaming = defGetBoolean(defel);
+			data->streaming = defGetBoolean(defel);
 		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
@@ -244,7 +243,6 @@ static void
 pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 				 bool is_init)
 {
-	bool		enable_streaming = false;
 	PGOutputData *data = palloc0(sizeof(PGOutputData));
 
 	/* Create our memory context for private allocations. */
@@ -265,11 +263,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	if (!is_init)
 	{
 		/* Parse the params and ERROR if we see any we don't recognize */
-		parse_output_parameters(ctx->output_plugin_options,
-								&data->protocol_version,
-								&data->publication_names,
-								&data->binary,
-								&enable_streaming);
+		parse_output_parameters(ctx->output_plugin_options, data);
 
 		/* Check if we support requested protocol */
 		if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
@@ -295,7 +289,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		 * we only allow it with sufficient version of the protocol, and when
 		 * the output plugin supports it.
 		 */
-		if (!enable_streaming)
+		if (!data->streaming)
 			ctx->streaming = false;
 		else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
 			ereport(ERROR,
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 4ba052f..bb383d5 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -25,6 +25,7 @@ typedef struct PGOutputData
 	List	   *publication_names;
 	List	   *publications;
 	bool		binary;
+	bool		streaming;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
-- 
1.8.3.1
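The benefit of this refactor is that a new option needs only a struct field, a default, and one branch in the parser, which is exactly what the messages option in the second patch adds. Below is a standalone sketch of that pattern, restricted to the boolean options, with plain name/value strings standing in for the DefElem list and returned error strings standing in for ereport(). OutputOptions, Option, parse_bool() and parse_options() are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for PGOutputData: one field per option. */
typedef struct
{
    bool binary;
    bool streaming;
    bool messages;
} OutputOptions;

typedef struct
{
    const char *name;
    const char *value;
} Option;

/* Accepts only "true"/"false"; defGetBoolean() accepts more spellings. */
static bool parse_bool(const char *s, bool *out)
{
    if (strcmp(s, "true") == 0) { *out = true; return true; }
    if (strcmp(s, "false") == 0) { *out = false; return true; }
    return false;
}

/*
 * Fill 'data' from 'opts'. Returns NULL on success or an error string,
 * mirroring the "conflicting or redundant options" and unrecognized-option
 * checks in parse_output_parameters().
 */
static const char *parse_options(const Option *opts, size_t n, OutputOptions *data)
{
    bool binary_given = false, streaming_given = false, messages_given = false;

    assert(opts != NULL || n == 0);

    /* Defaults live in one place, as in the refactored function. */
    data->binary = false;
    data->streaming = false;
    data->messages = false;

    for (size_t i = 0; i < n; i++)
    {
        bool *field, *given;

        if (strcmp(opts[i].name, "binary") == 0)
        { field = &data->binary; given = &binary_given; }
        else if (strcmp(opts[i].name, "streaming") == 0)
        { field = &data->streaming; given = &streaming_given; }
        else if (strcmp(opts[i].name, "messages") == 0)
        { field = &data->messages; given = &messages_given; }
        else
            return "unrecognized option";

        if (*given)
            return "conflicting or redundant options";
        *given = true;

        if (!parse_bool(opts[i].value, field))
            return "invalid boolean value";
    }
    return NULL;
}
```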

v4-0002-Allow-pgoutput-to-send-logical-decoding-messages.patch
From e16a60ab7da33db07cdd4d6c6427ef6ec1376eef Mon Sep 17 00:00:00 2001
From: Dave Pirotte <dpirotte@gmail.com>
Date: Thu, 5 Nov 2020 03:14:54 +0000
Subject: [PATCH v4 2/2] Allow pgoutput to send logical decoding messages.

The output plugin accepts a new parameter (messages) that controls whether
logical decoding messages are written into the replication stream. It is
useful for clients that use pgoutput as an output plugin and need
to process messages that were written by pg_logical_emit_message().

Although the logical streaming replication protocol now supports logical
decoding messages, logical replication itself does not use this feature yet.

Author: David Pirotte, Euler Taveira
Reviewed-by: Euler Taveira, Andres Freund, Ashutosh Bapat, Amit Kapila
Discussion: https://postgr.es/m/CADK3HHJ-+9SO7KuRLH=9Wa1rAo60Yreq1GFNkH_kd0=CdaWM+A@mail.gmail.com
---
 doc/src/sgml/protocol.sgml                  |  76 ++++++++++++++
 src/backend/replication/logical/proto.c     |  28 ++++++
 src/backend/replication/logical/worker.c    |   9 ++
 src/backend/replication/pgoutput/pgoutput.c |  47 +++++++++
 src/include/replication/logicalproto.h      |   3 +
 src/include/replication/pgoutput.h          |   1 +
 src/test/subscription/t/020_messages.pl     | 148 ++++++++++++++++++++++++++++
 7 files changed, 312 insertions(+)
 create mode 100644 src/test/subscription/t/020_messages.pl

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 43092fe..380be5f 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -6435,6 +6435,82 @@ Begin
 
 <varlistentry>
 <term>
+Message
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+        Byte1('M')
+</term>
+<listitem>
+<para>
+                Identifies the message as a logical decoding message.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int32
+</term>
+<listitem>
+<para>
+                Xid of the transaction. The XID is sent only when the user has
+                requested streaming of in-progress transactions.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int8
+</term>
+<listitem>
+<para>
+                Flags; Either 0 for no flags or 1 if the logical decoding
+                message is transactional.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int64
+</term>
+<listitem>
+<para>
+                The LSN of the logical decoding message.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        String
+</term>
+<listitem>
+<para>
+                The prefix of the logical decoding message.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Byte<replaceable>n</replaceable>
+</term>
+<listitem>
+<para>
+                The content of the logical decoding message.
+</para>
+</listitem>
+</varlistentry>
+
+</variablelist>
+</para>
+</listitem>
+</varlistentry>
+
+<varlistentry>
+<term>
 Commit
 </term>
 <listitem>
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index f2c85ca..2a1f983 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -25,6 +25,7 @@
  */
 #define LOGICALREP_IS_REPLICA_IDENTITY 1
 
+#define MESSAGE_TRANSACTIONAL (1<<0)
 #define TRUNCATE_CASCADE		(1<<0)
 #define TRUNCATE_RESTART_SEQS	(1<<1)
 
@@ -362,6 +363,33 @@ logicalrep_read_truncate(StringInfo in,
 }
 
 /*
+ * Write MESSAGE to stream
+ */
+void
+logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+						 bool transactional, const char *prefix, Size sz,
+						 const char *message)
+{
+	uint8		flags = 0;
+
+	pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
+
+	/* encode and send message flags */
+	if (transactional)
+		flags |= MESSAGE_TRANSACTIONAL;
+
+	/* transaction ID (if not valid, we're not streaming) */
+	if (TransactionIdIsValid(xid))
+		pq_sendint32(out, xid);
+
+	pq_sendint8(out, flags);
+	pq_sendint64(out, lsn);
+	pq_sendstring(out, prefix);
+	pq_sendint32(out, sz);
+	pq_sendbytes(out, message, sz);
+}
+
+/*
  * Write relation description to the output stream.
  */
 void
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 354fbe4..74d538b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1939,6 +1939,15 @@ apply_dispatch(StringInfo s)
 			apply_handle_origin(s);
 			return;
 
+		case LOGICAL_REP_MSG_MESSAGE:
+
+			/*
+			 * Logical replication does not use generic logical messages yet.
+			 * However, other applications that use this output plugin may
+			 * consume them.
+			 */
+			return;
+
 		case LOGICAL_REP_MSG_STREAM_START:
 			apply_handle_stream_start(s);
 			return;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 6146c5a..f68348d 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -45,6 +45,10 @@ static void pgoutput_change(LogicalDecodingContext *ctx,
 static void pgoutput_truncate(LogicalDecodingContext *ctx,
 							  ReorderBufferTXN *txn, int nrelations, Relation relations[],
 							  ReorderBufferChange *change);
+static void pgoutput_message(LogicalDecodingContext *ctx,
+							 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+							 bool transactional, const char *prefix,
+							 Size sz, const char *message);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
 static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
@@ -142,6 +146,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->begin_cb = pgoutput_begin_txn;
 	cb->change_cb = pgoutput_change;
 	cb->truncate_cb = pgoutput_truncate;
+	cb->message_cb = pgoutput_message;
 	cb->commit_cb = pgoutput_commit_txn;
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
 	cb->shutdown_cb = pgoutput_shutdown;
@@ -152,6 +157,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->stream_abort_cb = pgoutput_stream_abort;
 	cb->stream_commit_cb = pgoutput_stream_commit;
 	cb->stream_change_cb = pgoutput_change;
+	cb->stream_message_cb = pgoutput_message;
 	cb->stream_truncate_cb = pgoutput_truncate;
 }
 
@@ -162,10 +168,12 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		protocol_version_given = false;
 	bool		publication_names_given = false;
 	bool		binary_option_given = false;
+	bool		messages_option_given = false;
 	bool		streaming_given = false;
 
 	data->binary = false;
 	data->streaming = false;
+	data->messages = false;
 
 	foreach(lc, options)
 	{
@@ -221,6 +229,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 
 			data->binary = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "messages") == 0)
+		{
+			if (messages_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			messages_option_given = true;
+
+			data->messages = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "streaming") == 0)
 		{
 			if (streaming_given)
@@ -689,6 +707,35 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	MemoryContextReset(data->context);
 }
 
+static void
+pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+				 XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
+				 const char *message)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	TransactionId xid = InvalidTransactionId;
+
+	if (!data->messages)
+		return;
+
+	/*
+	 * Remember the xid for the message in streaming mode. See
+	 * pgoutput_change.
+	 */
+	if (in_streaming)
+		xid = txn->xid;
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_message(ctx->out,
+							 xid,
+							 message_lsn,
+							 transactional,
+							 prefix,
+							 sz,
+							 message);
+	OutputPluginWrite(ctx, true);
+}
+
 /*
  * Currently we always forward.
  */
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index fa4c372..55b90c0 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -54,6 +54,7 @@ typedef enum LogicalRepMsgType
 	LOGICAL_REP_MSG_TRUNCATE = 'T',
 	LOGICAL_REP_MSG_RELATION = 'R',
 	LOGICAL_REP_MSG_TYPE = 'Y',
+	LOGICAL_REP_MSG_MESSAGE = 'M',
 	LOGICAL_REP_MSG_STREAM_START = 'S',
 	LOGICAL_REP_MSG_STREAM_END = 'E',
 	LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
@@ -151,6 +152,8 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
 									  bool cascade, bool restart_seqs);
 extern List *logicalrep_read_truncate(StringInfo in,
 									  bool *cascade, bool *restart_seqs);
+extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+									 bool transactional, const char *prefix, Size sz, const char *message);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
 								 Relation rel);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index bb383d5..51e7c03 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -26,6 +26,7 @@ typedef struct PGOutputData
 	List	   *publications;
 	bool		binary;
 	bool		streaming;
+	bool		messages;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl
new file mode 100644
index 0000000..d9123ed
--- /dev/null
+++ b/src/test/subscription/t/020_messages.pl
@@ -0,0 +1,148 @@
+# Tests for logical decoding messages
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 5;
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_test (a int primary key)");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_test (a int primary key)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_test");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
+);
+
+# Ensure a transactional logical decoding message shows up on the slot
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
+
+$node_publisher->safe_psql('postgres',
+	"SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message')"
+);
+
+my $result = $node_publisher->safe_psql(
+	'postgres', qq(
+		SELECT get_byte(data, 0)
+		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+			'proto_version', '1',
+			'publication_names', 'tap_pub',
+			'messages', 'true')
+));
+
+# 66 77 67 == B M C == BEGIN MESSAGE COMMIT
+is($result, qq(66
+77
+67),
+	'messages on slot are B M C with message option');
+
+$result = $node_publisher->safe_psql(
+	'postgres', qq(
+		SELECT get_byte(data, 1), encode(substr(data, 11, 8), 'escape')
+		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+			'proto_version', '1',
+			'publication_names', 'tap_pub',
+			'messages', 'true')
+		OFFSET 1 LIMIT 1
+));
+
+is($result, qq(1|pgoutput),
+	"flag transactional is set to 1 and prefix is pgoutput");
+
+$result = $node_publisher->safe_psql(
+	'postgres', qq(
+		SELECT get_byte(data, 0)
+		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+			'proto_version', '1',
+			'publication_names', 'tap_pub')
+));
+
+# 66 67 == B C == BEGIN COMMIT
+is($result, qq(66
+67),
+	'option messages defaults to false so message (M) is not available on slot');
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE");
+$node_publisher->wait_for_catchup('tap_sub');
+
+# ensure a non-transactional logical decoding message shows up on the slot
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (1)");
+
+my $message_lsn = $node_publisher->safe_psql('postgres',
+	"SELECT pg_logical_emit_message(false, 'pgoutput', 'a non-transactional message')");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (2)");
+
+$result = $node_publisher->safe_psql(
+	'postgres', qq(
+		SELECT get_byte(data, 0), get_byte(data, 1)
+		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+			'proto_version', '1',
+			'publication_names', 'tap_pub',
+			'messages', 'true')
+		WHERE lsn = '$message_lsn' AND xid = 0
+));
+
+is($result, qq(77|0), 'non-transactional message on slot is M');
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE");
+$node_publisher->wait_for_catchup('tap_sub');
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
+
+# wait for the replication connection to drop from the publisher
+$node_publisher->poll_query_until('postgres',
+	'SELECT COUNT(*) FROM pg_catalog.pg_stat_replication', 0);
+
+# Ensure a non-transactional logical decoding message shows up on the slot when
+# it is emitted within an aborted transaction. The message won't be emitted
+# until something advances the LSN; hence, we intentionally force the server
+# to switch to a new WAL file.
+$node_publisher->safe_psql(
+	'postgres', qq(
+		BEGIN;
+		SELECT pg_logical_emit_message(false, 'pgoutput',
+			'a non-transactional message is available even if the transaction is aborted 1');
+		INSERT INTO tab_test VALUES (3);
+		SELECT pg_logical_emit_message(true, 'pgoutput',
+			'a transactional message is not available if the transaction is aborted');
+		SELECT pg_logical_emit_message(false, 'pgoutput',
+			'a non-transactional message is available even if the transaction is aborted 2');
+		ROLLBACK;
+		SELECT pg_switch_wal();
+));
+
+$result = $node_publisher->safe_psql(
+	'postgres', qq(
+		SELECT get_byte(data, 0), get_byte(data, 1)
+		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+			'proto_version', '1',
+			'publication_names', 'tap_pub',
+			'messages', 'true')
+));
+
+is($result, qq(77|0
+77|0),
+	'non-transactional message on slot from aborted transaction is M');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
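The first check above inspects only the first two bytes of the decoded change. 77 is ASCII 'M', the pgoutput Message type, and 0 is the flags byte of a non-transactional message. The following is a minimal C sketch of a parser for the whole message. It assumes the layout given in the PostgreSQL logical replication message formats documentation for a message that is not part of a streamed transaction: type byte, Int8 flags, Int64 LSN, NUL-terminated prefix, Int32 content length, then the content bytes, with integers in network byte order. `PgoutputMessage`, `parse_pgoutput_message` and the sample bytes (including the LSN value) are hypothetical illustrations, not PostgreSQL code.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Decoded pgoutput logical decoding message ('M'), non-streamed layout (assumed). */
typedef struct {
    uint8_t        flags;       /* bit 0 set = transactional */
    uint64_t       lsn;         /* LSN of the logical decoding message */
    const char    *prefix;      /* NUL-terminated, points into the input buffer */
    uint32_t       content_len; /* number of content bytes */
    const uint8_t *content;     /* NOT NUL-terminated, points into the input buffer */
} PgoutputMessage;

/* Big-endian (network order) readers. */
static uint32_t read_be32(const uint8_t *p)
{
    return ((uint32_t)p[0] << 24) | ((uint32_t)p[1] << 16) |
           ((uint32_t)p[2] << 8) | (uint32_t)p[3];
}

static uint64_t read_be64(const uint8_t *p)
{
    return ((uint64_t)read_be32(p) << 32) | read_be32(p + 4);
}

/* Returns 0 on success, -1 if the buffer is not a well-formed 'M' message. */
static int parse_pgoutput_message(const uint8_t *buf, size_t len, PgoutputMessage *out)
{
    size_t pos;
    const uint8_t *nul;

    assert(buf != NULL && out != NULL);
    if (len < 1 + 1 + 8 || buf[0] != 'M')   /* type byte + flags + LSN */
        return -1;
    pos = 1;
    out->flags = buf[pos];
    pos += 1;
    out->lsn = read_be64(buf + pos);
    pos += 8;

    /* The prefix is a C string; find its terminator within the buffer. */
    nul = memchr(buf + pos, '\0', len - pos);
    if (nul == NULL)
        return -1;
    out->prefix = (const char *)(buf + pos);
    pos = (size_t)(nul - buf) + 1;

    if (len - pos < 4)
        return -1;
    out->content_len = read_be32(buf + pos);
    pos += 4;
    if (len - pos < out->content_len)
        return -1;
    out->content = buf + pos;
    return 0;
}

/* Hand-built sample: non-transactional, made-up LSN 0/16B3748, prefix "pgoutput", content "hi". */
static const uint8_t sample_message[] = {
    'M', 0x00,
    0x00, 0x00, 0x00, 0x00, 0x01, 0x6B, 0x37, 0x48,
    'p', 'g', 'o', 'u', 't', 'p', 'u', 't', '\0',
    0x00, 0x00, 0x00, 0x02,
    'h', 'i'
};
```

Parsing `sample_message` yields flags 0, prefix "pgoutput" and a two-byte content "hi"; the pointers returned in `PgoutputMessage` borrow from the input buffer rather than copying, so the buffer must outlive the parsed struct.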

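The aborted-transaction check expects exactly two rows because of how the two kinds of message are handled. A non-transactional message is passed to the output plugin as soon as it is decoded. A transactional message is queued with its transaction and dropped on ROLLBACK. The toy model below is a hypothetical C illustration of that rule only. `ToyDecoder`, `toy_emit`, `toy_commit` and `toy_rollback` are invented names. This is not how PostgreSQL's reorder buffer is implemented, and the model ignores row changes such as the INSERT.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_MAX 16

/* Hypothetical toy model of message delivery; not PostgreSQL's reorder buffer. */
typedef struct {
    const char *delivered[TOY_MAX]; /* what a slot consumer would see */
    size_t      n_delivered;
    const char *pending[TOY_MAX];   /* transactional messages held until commit */
    size_t      n_pending;
} ToyDecoder;

static void toy_emit(ToyDecoder *d, bool transactional, const char *text)
{
    if (transactional) {
        assert(d->n_pending < TOY_MAX);
        d->pending[d->n_pending++] = text;      /* wait for COMMIT */
    } else {
        assert(d->n_delivered < TOY_MAX);
        d->delivered[d->n_delivered++] = text;  /* delivered as soon as decoded */
    }
}

static void toy_commit(ToyDecoder *d)
{
    /* On commit, queued transactional messages are delivered in order. */
    for (size_t i = 0; i < d->n_pending; i++) {
        assert(d->n_delivered < TOY_MAX);
        d->delivered[d->n_delivered++] = d->pending[i];
    }
    d->n_pending = 0;
}

static void toy_rollback(ToyDecoder *d)
{
    d->n_pending = 0;  /* queued transactional messages are discarded */
}

/* Replays the message sequence of the aborted-transaction test. */
static void toy_replay_aborted_test(ToyDecoder *d)
{
    toy_emit(d, false, "a non-transactional message is available even if the transaction is aborted 1");
    toy_emit(d, true, "a transactional message is not available if the transaction is aborted");
    toy_emit(d, false, "a non-transactional message is available even if the transaction is aborted 2");
    toy_rollback(d);
}
```

After `toy_replay_aborted_test`, the model has delivered two messages and holds none pending, which mirrors the two `77|0` rows the test expects.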
#16Euler Taveira
euler@eulerto.com
In reply to: Amit Kapila (#15)
Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

On Mon, Apr 5, 2021, at 4:06 AM, Amit Kapila wrote:

I have made a few minor changes in the attached: (a) initialize the
streaming message callback API; (b) update the docs to reflect that the
XID can be sent when streaming in-progress transactions (I see that the
same information needs to be updated for a few other protocol messages,
but we can do that as a separate patch); and (c) slightly tweak the
commit messages.

Good catch. I completely forgot about streaming of in-progress
transactions. I agree that the documentation for the other protocol
messages should be updated in a separate patch, since that is beyond
the scope of this feature.
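Point (b) concerns streaming of in-progress transactions (protocol version 2 or later), where pgoutput also sends the XID of the transaction the message belongs to. The following is a minimal C encoder sketch. It assumes the Int32 XID sits between the type byte and the flags byte, and that it is present only for streamed transactions, so the flags byte moves from offset 1 to offset 5. `encode_pgoutput_message` and its parameters are hypothetical and not PostgreSQL's internal API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Big-endian (network order) writers; each returns the bytes written. */
static size_t put_be32(uint8_t *p, uint32_t v)
{
    p[0] = (uint8_t)(v >> 24);
    p[1] = (uint8_t)(v >> 16);
    p[2] = (uint8_t)(v >> 8);
    p[3] = (uint8_t)v;
    return 4;
}

static size_t put_be64(uint8_t *p, uint64_t v)
{
    put_be32(p, (uint32_t)(v >> 32));
    put_be32(p + 4, (uint32_t)v);
    return 8;
}

/*
 * Encodes an 'M' message (assumed layout). If `streamed` is true, the XID of
 * the in-progress transaction is written right after the type byte.
 * Returns the number of bytes written, or 0 if `cap` is too small.
 * `content` must be non-NULL (memcpy with NULL is undefined even for 0 bytes).
 */
static size_t encode_pgoutput_message(uint8_t *buf, size_t cap,
                                      bool streamed, uint32_t xid,
                                      bool transactional, uint64_t lsn,
                                      const char *prefix,
                                      const uint8_t *content, uint32_t content_len)
{
    size_t prefix_len = strlen(prefix) + 1;  /* include the NUL terminator */
    size_t need = 1 + (streamed ? 4 : 0) + 1 + 8 + prefix_len + 4 + content_len;
    size_t pos = 0;

    if (cap < need)
        return 0;
    buf[pos++] = 'M';
    if (streamed)
        pos += put_be32(buf + pos, xid);     /* only for streamed transactions */
    buf[pos++] = transactional ? 1 : 0;      /* flags: bit 0 = transactional */
    pos += put_be64(buf + pos, lsn);
    memcpy(buf + pos, prefix, prefix_len);
    pos += prefix_len;
    pos += put_be32(buf + pos, content_len);
    memcpy(buf + pos, content, content_len);
    pos += content_len;
    assert(pos == need);
    return pos;
}
```

With this layout, a consumer that hard-codes "flags is byte 1" would misread streamed messages. That is why the XID's presence needs to be documented alongside the message format.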

--
Euler Taveira
EDB https://www.enterprisedb.com/

#17Amit Kapila
amit.kapila16@gmail.com
In reply to: Euler Taveira (#16)
Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

On Mon, Apr 5, 2021 at 5:45 PM Euler Taveira <euler@eulerto.com> wrote:

On Mon, Apr 5, 2021, at 4:06 AM, Amit Kapila wrote:

I have made a few minor changes in the attached: (a) initialize the
streaming message callback API; (b) update the docs to reflect that the
XID can be sent when streaming in-progress transactions (I see that the
same information needs to be updated for a few other protocol messages,
but we can do that as a separate patch); and (c) slightly tweak the
commit messages.

Good catch. I completely forgot about streaming of in-progress
transactions. I agree that the documentation for the other protocol
messages should be updated in a separate patch, since that is beyond
the scope of this feature.

I have pushed this work and updated the CF entry accordingly.

--
With Regards,
Amit Kapila.

#18Euler Taveira
euler@eulerto.com
In reply to: Amit Kapila (#17)
Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

On Wed, Apr 7, 2021, at 2:20 AM, Amit Kapila wrote:

I have pushed this work and updated the CF entry accordingly.

Great. Thank you.

--
Euler Taveira
EDB https://www.enterprisedb.com/