From bf991e458df1f03708450ddd76a75a9476b5809a Mon Sep 17 00:00:00 2001
From: Dave Pirotte <dpirotte@gmail.com>
Date: Tue, 17 Nov 2020 04:01:34 +0000
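Subject: [PATCH 2/6] Add xid to messages when streaming

When in_streaming is true, logicalrep_write_message() now writes a
32-bit xid right after the message-type byte and before the flags
byte.  Transactional messages carry txn->xid; non-transactional
messages carry InvalidTransactionId.  When in_streaming is false the
message layout is unchanged.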
---
 src/backend/replication/logical/proto.c     | 16 ++++++++++++++--
 src/backend/replication/pgoutput/pgoutput.c |  2 ++
 src/include/replication/logicalproto.h      |  3 ++-
 3 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index deba2a321c..d5eeee4784 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -366,17 +366,29 @@ logicalrep_read_truncate(StringInfo in,
  * Write MESSAGE to stream
  */
 void
-logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn,
-						 bool transactional, const char *prefix, Size sz,
+logicalrep_write_message(StringInfo out,
+						 bool in_streaming,
+						 ReorderBufferTXN *txn,
+						 XLogRecPtr lsn,
+						 bool transactional,
+						 const char *prefix,
+						 Size sz,
 						 const char *message)
 {
 	uint8		flags = 0;
+	TransactionId xid = InvalidTransactionId;
 
 	pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
 
 	/* encode and send message flags */
 	if (transactional)
+	{
 		flags |= MESSAGE_TRANSACTIONAL;
+		xid = txn->xid;		/* else xid stays InvalidTransactionId */
+	}
+
+	if (in_streaming)
+		pq_sendint32(out, xid);	/* xid is written before the flags byte */
 
 	pq_sendint8(out, flags);
 	pq_sendint64(out, lsn);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index cd849c10a4..bd3c2a3b99 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -707,11 +707,13 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				 const char *message)
 {
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
 	if (!data->messages)
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_message(ctx->out,
+							 in_streaming,
 							 txn,
 							 message_lsn,
 							 transactional,
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index f3c8f95e2c..f22c9436e0 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -152,7 +152,8 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
 									  bool cascade, bool restart_seqs);
 extern List *logicalrep_read_truncate(StringInfo in,
 									  bool *cascade, bool *restart_seqs);
-extern void logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn,
+extern void logicalrep_write_message(StringInfo out, bool in_streaming,
+									 ReorderBufferTXN *txn, XLogRecPtr lsn,
 									 bool transactional, const char *prefix, Size sz, const char *message);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
 								 Relation rel);
-- 
2.20.1

