From 5a39e28d9508d5ad7c3759100da5d64d5ff9c08e Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.taveira@enterprisedb.com>
Date: Thu, 19 Nov 2020 20:14:08 -0300
Subject: [PATCH 5/6] Adjust in_streaming for messages

Use the same pattern as the other pgoutput_XXX functions, i.e., check
in_streaming in pgoutput_message and then pass the xid as an argument
to logicalrep_write_message.

Since the ReorderBufferTXN argument is no longer used in
logicalrep_write_message, remove that parameter.
---
 src/backend/replication/logical/proto.c     | 16 ++++------------
 src/backend/replication/pgoutput/pgoutput.c | 11 +++++++++--
 src/include/replication/logicalproto.h      |  3 +--
 3 files changed, 14 insertions(+), 16 deletions(-)

diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index d5eeee4784..a598658a8d 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -366,28 +366,20 @@ logicalrep_read_truncate(StringInfo in,
  * Write MESSAGE to stream
  */
 void
-logicalrep_write_message(StringInfo out,
-						 bool in_streaming,
-						 ReorderBufferTXN *txn,
-						 XLogRecPtr lsn,
-						 bool transactional,
-						 const char *prefix,
-						 Size sz,
+logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+						 bool transactional, const char *prefix, Size sz,
 						 const char *message)
 {
 	uint8		flags = 0;
-	TransactionId xid = InvalidTransactionId;
 
 	pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
 
 	/* encode and send message flags */
 	if (transactional)
-	{
 		flags |= MESSAGE_TRANSACTIONAL;
-		xid = txn->xid;
-	}
 
-	if (in_streaming)
+	/* transaction ID (if not valid, we're not streaming) */
+	if (TransactionIdIsValid(xid))
 		pq_sendint32(out, xid);
 
 	pq_sendint8(out, flags);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index b25e67edcb..f9bc15da40 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -700,14 +700,21 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				 const char *message)
 {
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	TransactionId xid = InvalidTransactionId;
 
 	if (!data->messages)
 		return;
 
+	/*
+	 * Remember the xid for the message in streaming mode. See
+	 * pgoutput_change.
+	 */
+	if (in_streaming)
+		xid = txn->xid;
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_message(ctx->out,
-							 in_streaming,
-							 txn,
+							 xid,
 							 message_lsn,
 							 transactional,
 							 prefix,
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index f22c9436e0..d3d656dfb2 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -152,8 +152,7 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
 									  bool cascade, bool restart_seqs);
 extern List *logicalrep_read_truncate(StringInfo in,
 									  bool *cascade, bool *restart_seqs);
-extern void logicalrep_write_message(StringInfo out, bool in_streaming,
-									 ReorderBufferTXN *txn, XLogRecPtr lsn,
+extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 									 bool transactional, const char *prefix, Size sz, const char *message);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
 								 Relation rel);
-- 
2.20.1

