From d59282209327a4def92d10284d1ff9a08819fb9f Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Thu, 15 Oct 2015 15:50:04 +0800
Subject: [PATCH] Expose origin_id to logical decoding callbacks

Fix an oversight in replication origins where the origin_id isn't stored
while decoding the commit record. Prior to this fix the begin and commit
decoding callbacks were always passed InvalidRepOriginId (0) in
txn->origin_id even for remotely-originated transactions.

The replication origin filter hook
OutputPluginCallbacks->filter_by_origin_cb has always been passed the
origin correctly, so test_decoding didn't notice this issue. It only
tested filtering out remote transactions. So this change adds support
for outputting the origin information on decoded transactions in
test_decoding and adds tests to exercise it.
---
 contrib/test_decoding/expected/replorigin.out | 43 +++++++++++++++++++++++++++
 contrib/test_decoding/sql/replorigin.sql      | 18 +++++++++++
 contrib/test_decoding/test_decoding.c         | 37 +++++++++++++++++++++--
 src/backend/replication/logical/decode.c      |  1 +
 4 files changed, 96 insertions(+), 3 deletions(-)

diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index c0f5125..eba8da2 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -127,6 +127,49 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
  COMMIT
 (3 rows)
 
+-- Make sure remote-originated tx's are not filtered out when only-local is unset
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+ pg_replication_origin_session_setup 
+-------------------------------------
+ 
+(1 row)
+
+BEGIN;
+SELECT pg_replication_origin_xact_setup('0/aabbccff', '2013-01-01 00:10');
+ pg_replication_origin_xact_setup 
+----------------------------------
+ 
+(1 row)
+
+INSERT INTO origin_tbl(data) VALUES ('will be replicated even though remotely originated');
+COMMIT;
+-- and empty tx's
+BEGIN;
+SELECT pg_replication_origin_xact_setup('1/aabbccff', '2013-01-01 00:20');
+ pg_replication_origin_xact_setup 
+----------------------------------
+ 
+(1 row)
+
+CREATE TEMPORARY TABLE test_empty_tx(blah integer);
+COMMIT;
+-- Decode with options not otherwise tested - skip empty xacts on, only-local off. Can't include xids since they'll change each run.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0',  'only-local', '0', 'include-origins', '1');
+                                                      data                                                      
+----------------------------------------------------------------------------------------------------------------
+ BEGIN (from "test_decoding: regression_slot" at lsn 0/AABBCCFF)
+ table public.origin_tbl: INSERT: id[integer]:4 data[text]:'will be replicated even though remotely originated'
+ COMMIT
+ BEGIN (from "test_decoding: regression_slot" at lsn 1/AABBCCFF)
+ COMMIT
+(5 rows)
+
+SELECT pg_replication_origin_session_reset();
+ pg_replication_origin_session_reset 
+-------------------------------------
+ 
+(1 row)
+
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
 --------------------------
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index e12404e..9115d34 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -60,5 +60,23 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 INSERT INTO origin_tbl(data) VALUES ('will be replicated');
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1',  'only-local', '1');
 
+-- Make sure remote-originated tx's are not filtered out when only-local is unset
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+BEGIN;
+SELECT pg_replication_origin_xact_setup('0/aabbccff', '2013-01-01 00:10');
+INSERT INTO origin_tbl(data) VALUES ('will be replicated even though remotely originated');
+COMMIT;
+
+-- and empty tx's
+BEGIN;
+SELECT pg_replication_origin_xact_setup('1/aabbccff', '2013-01-01 00:20');
+CREATE TEMPORARY TABLE test_empty_tx(blah integer);
+COMMIT;
+
+-- Decode with options not otherwise tested - skip empty xacts on, only-local off. Can't include xids since they'll change each run.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0',  'only-local', '0', 'include-origins', '1');
+
+SELECT pg_replication_origin_session_reset();
+
 SELECT pg_drop_replication_slot('regression_slot');
 SELECT pg_replication_origin_drop('test_decoding: regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 80fc5f4..e041154 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -45,6 +45,7 @@ typedef struct
 	bool		skip_empty_xacts;
 	bool		xact_wrote_changes;
 	bool		only_local;
+	bool		include_origins;
 } TestDecodingData;
 
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -103,6 +104,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->include_timestamp = false;
 	data->skip_empty_xacts = false;
 	data->only_local = false;
+	data->include_origins = false;
 
 	ctx->output_plugin_private = data;
 
@@ -172,6 +174,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
 						 strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "include-origins") == 0)
+		{
+
+			if (elem->arg == NULL)
+				data->include_origins = true;
+			else if (!parse_bool(strVal(elem->arg), &data->include_origins))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
+						 strVal(elem->arg), elem->defname)));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -203,17 +216,33 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 	if (data->skip_empty_xacts)
 		return;
 
+	Assert(txn->origin_id != DoNotReplicateId);
+
 	pg_output_begin(ctx, data, txn, true);
 }
 
 static void
 pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 {
+	char *origin = NULL;
+
 	OutputPluginPrepareWrite(ctx, last_write);
+
+	appendStringInfoString(ctx->out, "BEGIN");
+
 	if (data->include_xids)
-		appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
-	else
-		appendStringInfoString(ctx->out, "BEGIN");
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_origins && txn->origin_id != InvalidRepOriginId)
+	{
+		if (!replorigin_by_oid(txn->origin_id, true, &origin))
+			origin = "unknown node";
+		appendStringInfo(ctx->out, " (from \"%s\" at lsn %X/%X)",
+				origin,
+				(uint32)(txn->origin_lsn >> 32),
+				(uint32)(txn->origin_lsn));
+	}
+
 	OutputPluginWrite(ctx, last_write);
 }
 
@@ -237,6 +266,8 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		appendStringInfo(ctx->out, " (at %s)",
 						 timestamptz_to_str(txn->commit_time));
 
+	Assert(txn->origin_id != DoNotReplicateId);
+
 	OutputPluginWrite(ctx, true);
 }
 
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c629da3..9044226 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -455,6 +455,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 
 	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
 	{
+		origin_id = XLogRecGetOrigin(buf->record);
 		origin_lsn = parsed->origin_lsn;
 		commit_time = parsed->origin_timestamp;
 	}
-- 
2.1.0

