From 5efc1d3f9017ed8e178aa5a3986275bcbd504f30 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Thu, 15 Oct 2015 20:45:06 +0800
Subject: [PATCH] Decode replication origin in commit

Fix an oversight in replication origins where the origin_id isn't stored
while decoding the commit record. Prior to this fix the begin and commit
decoding callbacks were always passed InvalidRepOriginId (0) in
txn->origin_id even for remotely-originated transactions.

The replication origin filter hook
OutputPluginCallbacks->filter_by_origin_cb has always been passed the
origin correctly, so test_decoding didn't notice this issue. It only
tested filtering out remote transactions. So this change adds support
for outputting the origin information on decoded transactions in
test_decoding and add tests to exercise it.

Also add tests for changing origin within a transaction.
---
 contrib/test_decoding/expected/replorigin.out | 88 ++++++++++++++++++++++++++-
 contrib/test_decoding/sql/replorigin.sql      | 35 +++++++++++
 contrib/test_decoding/test_decoding.c         | 50 ++++++++++++++-
 src/backend/replication/logical/decode.c      |  2 +-
 4 files changed, 170 insertions(+), 5 deletions(-)

diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index c0f5125..11f9a2b 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -12,11 +12,18 @@ SELECT pg_replication_origin_create('test_decoding: regression_slot');
 SELECT pg_replication_origin_create('test_decoding: regression_slot');
 ERROR:  duplicate key value violates unique constraint "pg_replication_origin_roname_index"
 DETAIL:  Key (roname)=(test_decoding: regression_slot) already exists.
+-- Create a second origin too
+SELECT pg_replication_origin_create('test_decoding: regression_slot 2');
+ pg_replication_origin_create 
+------------------------------
+                            2
+(1 row)
+
 --ensure deletions work (once)
 SELECT pg_replication_origin_create('test_decoding: temp');
  pg_replication_origin_create 
 ------------------------------
-                            2
+                            3
 (1 row)
 
 SELECT pg_replication_origin_drop('test_decoding: temp');
@@ -127,6 +134,85 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
  COMMIT
 (3 rows)
 
+-- This time we'll prepare a series of transactions to decode in one go
+-- rather than decoding one by one.
+--
+-- First, to make sure remote-originated tx's are not filtered out when only-local is unset,
+-- we need another tx with an origin. This time we'll set a different origin for each
+-- change.
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+ pg_replication_origin_session_setup 
+-------------------------------------
+ 
+(1 row)
+
+BEGIN;
+SELECT pg_replication_origin_xact_setup('0/aabbccff', '2013-01-01 00:10');
+ pg_replication_origin_xact_setup 
+----------------------------------
+ 
+(1 row)
+
+INSERT INTO origin_tbl(data) VALUES ('will be replicated even though remotely originated');
+SELECT pg_replication_origin_xact_setup('0/aabbcd00', '2013-01-01 00:11');
+ pg_replication_origin_xact_setup 
+----------------------------------
+ 
+(1 row)
+
+INSERT INTO origin_tbl(data) VALUES ('will also be replicated even though remotely originated');
+-- Change the origin replication identifier mid-transaction
+SELECT pg_replication_origin_session_reset();
+ pg_replication_origin_session_reset 
+-------------------------------------
+ 
+(1 row)
+
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot 2');
+ pg_replication_origin_session_setup 
+-------------------------------------
+ 
+(1 row)
+
+SELECT pg_replication_origin_xact_setup('0/aabbcd01', '2013-01-01 00:13');
+ pg_replication_origin_xact_setup 
+----------------------------------
+ 
+(1 row)
+
+INSERT INTO origin_tbl(data) VALUES ('from second origin');
+COMMIT;
+-- then run an empty tx, since this test will be setting skip-empty-xacts=0
+-- Note that we need to do something, just something that won't get decoded,
+-- to force a commit to be recorded.
+BEGIN;
+SELECT pg_replication_origin_xact_setup('1/aabbccff', '2013-01-01 00:20');
+ pg_replication_origin_xact_setup 
+----------------------------------
+ 
+(1 row)
+
+CREATE TEMPORARY TABLE test_empty_tx(blah integer);
+COMMIT;
+-- Decode with options not otherwise tested - skip empty xacts on, only-local off. Can't include xids since they'll change each run.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0',  'only-local', '0', 'include-origins', '1');
+                                                                                    data                                                                                     
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN -- origin:'test_decoding: regression_slot 2'@0/AABBCD01
+ table public.origin_tbl: INSERT: id[integer]:4 data[text]:'will be replicated even though remotely originated' -- origin:'test_decoding: regression_slot'@0/AABBCD01
+ table public.origin_tbl: INSERT: id[integer]:5 data[text]:'will also be replicated even though remotely originated' -- origin:'test_decoding: regression_slot'@0/AABBCD01
+ table public.origin_tbl: INSERT: id[integer]:6 data[text]:'from second origin' -- origin:'test_decoding: regression_slot 2'@0/AABBCD01
+ COMMIT -- origin:'test_decoding: regression_slot 2'@0/AABBCD01
+ BEGIN -- origin:'test_decoding: regression_slot 2'@1/AABBCCFF
+ COMMIT -- origin:'test_decoding: regression_slot 2'@1/AABBCCFF
+(7 rows)
+
+SELECT pg_replication_origin_session_reset();
+ pg_replication_origin_session_reset 
+-------------------------------------
+ 
+(1 row)
+
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
 --------------------------
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index e12404e..252c786 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -8,6 +8,9 @@ SELECT pg_replication_origin_create('test_decoding: regression_slot');
 -- ensure duplicate creations fail
 SELECT pg_replication_origin_create('test_decoding: regression_slot');
 
+-- Create a second origin too
+SELECT pg_replication_origin_create('test_decoding: regression_slot 2');
+
 --ensure deletions work (once)
 SELECT pg_replication_origin_create('test_decoding: temp');
 SELECT pg_replication_origin_drop('test_decoding: temp');
@@ -60,5 +63,37 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 INSERT INTO origin_tbl(data) VALUES ('will be replicated');
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1',  'only-local', '1');
 
+-- This time we'll prepare a series of transactions to decode in one go
+-- rather than decoding one by one.
+--
+-- First, to make sure remote-originated tx's are not filtered out when only-local is unset,
+-- we need another tx with an origin. This time we'll set a different origin for each
+-- change.
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+BEGIN;
+SELECT pg_replication_origin_xact_setup('0/aabbccff', '2013-01-01 00:10');
+INSERT INTO origin_tbl(data) VALUES ('will be replicated even though remotely originated');
+SELECT pg_replication_origin_xact_setup('0/aabbcd00', '2013-01-01 00:11');
+INSERT INTO origin_tbl(data) VALUES ('will also be replicated even though remotely originated');
+-- Change the origin replication identifier mid-transaction
+SELECT pg_replication_origin_session_reset();
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot 2');
+SELECT pg_replication_origin_xact_setup('0/aabbcd01', '2013-01-01 00:13');
+INSERT INTO origin_tbl(data) VALUES ('from second origin');
+COMMIT;
+
+-- then run an empty tx, since this test will be setting skip-empty-xacts=0
+-- Note that we need to do something, just something that won't get decoded,
+-- to force a commit to be recorded.
+BEGIN;
+SELECT pg_replication_origin_xact_setup('1/aabbccff', '2013-01-01 00:20');
+CREATE TEMPORARY TABLE test_empty_tx(blah integer);
+COMMIT;
+
+-- Decode with options not otherwise tested - skip empty xacts on, only-local off. Can't include xids since they'll change each run.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0',  'only-local', '0', 'include-origins', '1');
+
+SELECT pg_replication_origin_session_reset();
+
 SELECT pg_drop_replication_slot('regression_slot');
 SELECT pg_replication_origin_drop('test_decoding: regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 80fc5f4..4cf35fd 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -45,6 +45,7 @@ typedef struct
 	bool		skip_empty_xacts;
 	bool		xact_wrote_changes;
 	bool		only_local;
+	bool		include_origins;
 } TestDecodingData;
 
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -64,6 +65,9 @@ static void pg_decode_change(LogicalDecodingContext *ctx,
 static bool pg_decode_filter(LogicalDecodingContext *ctx,
 				 RepOriginId origin_id);
 
+static void appendOrigin(LogicalDecodingContext *ctx,
+				TestDecodingData *data, ReorderBufferTXN *txn);
+
 void
 _PG_init(void)
 {
@@ -103,6 +107,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->include_timestamp = false;
 	data->skip_empty_xacts = false;
 	data->only_local = false;
+	data->include_origins = false;
 
 	ctx->output_plugin_private = data;
 
@@ -172,6 +177,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
 						 strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "include-origins") == 0)
+		{
+
+			if (elem->arg == NULL)
+				data->include_origins = true;
+			else if (!parse_bool(strVal(elem->arg), &data->include_origins))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
+						 strVal(elem->arg), elem->defname)));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -203,6 +219,8 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 	if (data->skip_empty_xacts)
 		return;
 
+	Assert(txn->origin_id != DoNotReplicateId);
+
 	pg_output_begin(ctx, data, txn, true);
 }
 
@@ -210,10 +228,14 @@ static void
 pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 {
 	OutputPluginPrepareWrite(ctx, last_write);
+
+	appendStringInfoString(ctx->out, "BEGIN");
+
 	if (data->include_xids)
-		appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
-	else
-		appendStringInfoString(ctx->out, "BEGIN");
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	appendOrigin(ctx, data, txn);
+
 	OutputPluginWrite(ctx, last_write);
 }
 
@@ -237,6 +259,10 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		appendStringInfo(ctx->out, " (at %s)",
 						 timestamptz_to_str(txn->commit_time));
 
+	appendOrigin(ctx, data, txn);
+
+	Assert(txn->origin_id != DoNotReplicateId);
+
 	OutputPluginWrite(ctx, true);
 }
 
@@ -469,5 +495,23 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	MemoryContextSwitchTo(old);
 	MemoryContextReset(data->context);
 
+	appendOrigin(ctx, data, txn);
+
 	OutputPluginWrite(ctx, true);
 }
+
+static void
+appendOrigin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn)
+{
+	char *origin;
+
+	if (data->include_origins && txn->origin_id != InvalidRepOriginId)
+	{
+		if (!replorigin_by_oid(txn->origin_id, true, &origin))
+			origin = "[unknown]";
+		appendStringInfo(ctx->out, " -- origin:'%s'@%X/%X",
+				origin,
+				(uint32)(txn->origin_lsn >> 32),
+				(uint32)(txn->origin_lsn));
+	}
+}
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c629da3..9f60687 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -450,7 +450,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 {
 	XLogRecPtr	origin_lsn = InvalidXLogRecPtr;
 	XLogRecPtr	commit_time = InvalidXLogRecPtr;
-	XLogRecPtr	origin_id = InvalidRepOriginId;
+	XLogRecPtr	origin_id = XLogRecGetOrigin(buf->record);
 	int			i;
 
 	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
-- 
2.1.0

