From 433ea40a02ab823f3aa70c18928b9862f0eb004b Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler@timbira.com.br>
Date: Fri, 8 Nov 2019 12:48:03 -0300
Subject: [PATCH] Skip empty transactions for logical replication

The current logical replication behavior is to send every transaction to
the subscriber even if the transaction is empty (because it does not
contain changes from the selected publications). It is a waste of CPU
cycles and network bandwidth to build/transmit those empty transactions.
Postpone the BEGIN message until the first change. While processing a
COMMIT message, if no change was written for that transaction, do not
send the COMMIT message. As a result, pgoutput skips the BEGIN / COMMIT
messages for transactions that did not write any changes.

Discussion:
https://postgr.es/m/CAMkU=1yohp9-dv48FLoSPrMqYEyyS5ZWkaZGD41RJr10xiNo_Q@mail.gmail.com
---
 src/backend/replication/pgoutput/pgoutput.c | 34 +++++++++++++++++++++++++++++
 src/include/replication/pgoutput.h          |  3 +++
 2 files changed, 37 insertions(+)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9c08757..eed1093 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -212,6 +212,22 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+	PGOutputData	*data = ctx->output_plugin_private;
+
+	/*
+	 * Don't send BEGIN message here. Instead, postpone it until the first
+	 * change. In logical replication, a common scenario is to replicate a set
+	 * of tables (instead of all tables), so transactions whose changes were to
+	 * table(s) that are not published will produce empty transactions. These
+	 * empty transactions will send BEGIN and COMMIT messages to subscribers,
+	 * using bandwidth on something with little/no use for logical replication.
+	 */
+	data->xact_wrote_changes = false;
+}
+
+static void
+pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
 	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
 
 	OutputPluginPrepareWrite(ctx, !send_replication_origin);
@@ -249,8 +265,14 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					XLogRecPtr commit_lsn)
 {
+	PGOutputData	*data = ctx->output_plugin_private;
+
 	OutputPluginUpdateProgress(ctx);
 
+	/* skip COMMIT message if nothing was sent */
+	if (!data->xact_wrote_changes)
+		return;
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
 	OutputPluginWrite(ctx, true);
@@ -335,6 +357,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	/* output BEGIN if we haven't yet */
+	if (!data->xact_wrote_changes)
+		pgoutput_begin(ctx, txn);
+
+	data->xact_wrote_changes = true;
+
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
@@ -415,6 +443,12 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 	if (nrelids > 0)
 	{
+		/* output BEGIN if we haven't yet */
+		if (!data->xact_wrote_changes)
+			pgoutput_begin(ctx, txn);
+
+		data->xact_wrote_changes = true;
+
 		OutputPluginPrepareWrite(ctx, true);
 		logicalrep_write_truncate(ctx->out,
 								  nrelids,
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 8870721..cb57e76 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -20,6 +20,9 @@ typedef struct PGOutputData
 	MemoryContext context;		/* private memory context for transient
 								 * allocations */
 
+	/* true once BEGIN has been sent for the current transaction */
+	bool		xact_wrote_changes;
+
 	/* client info */
 	uint32		protocol_version;
 
-- 
2.7.4

