PATCH: 9.5 replication origins fix for logical decoding
Hi all
There's an oversight in replication origins support in logical
decoding, where the origin node ID isn't passed correctly to callbacks
except for the origin filter callback. All other callbacks see it as
InvalidRepOriginId.
It's a one-line fix, but I've added support in test_decoding to
validate the fix and expanded the test script to cover it.
Should be applied to 9.5 and 9.6.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Attachments:
0001-Expose-origin_id-to-logical-decoding-callacks.patch (text/x-patch; charset=US-ASCII)
From d59282209327a4def92d10284d1ff9a08819fb9f Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Thu, 15 Oct 2015 15:50:04 +0800
Subject: [PATCH] Expose origin_id to logical decoding callbacks
Fix an oversight in replication origins where the origin_id isn't stored
while decoding the commit record. Prior to this fix the begin and commit
decoding callbacks were always passed InvalidRepOriginId (0) in
txn->origin_id even for remotely-originated transactions.
The replication origin filter hook
OutputPluginCallbacks->filter_by_origin_cb has always been passed the
origin correctly, so test_decoding didn't notice this issue. It only
tested filtering out remote transactions. So this change adds support
for outputting the origin information on decoded transactions in
test_decoding and adds tests to exercise it.
---
contrib/test_decoding/expected/replorigin.out | 43 +++++++++++++++++++++++++++
contrib/test_decoding/sql/replorigin.sql | 18 +++++++++++
contrib/test_decoding/test_decoding.c | 37 +++++++++++++++++++++--
src/backend/replication/logical/decode.c | 1 +
4 files changed, 96 insertions(+), 3 deletions(-)
diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index c0f5125..eba8da2 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -127,6 +127,49 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
COMMIT
(3 rows)
+-- Make sure remote-originated tx's are not filtered out when only-local is unset
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+ pg_replication_origin_session_setup
+-------------------------------------
+
+(1 row)
+
+BEGIN;
+SELECT pg_replication_origin_xact_setup('0/aabbccff', '2013-01-01 00:10');
+ pg_replication_origin_xact_setup
+----------------------------------
+
+(1 row)
+
+INSERT INTO origin_tbl(data) VALUES ('will be replicated even though remotely originated');
+COMMIT;
+-- and empty tx's
+BEGIN;
+SELECT pg_replication_origin_xact_setup('1/aabbccff', '2013-01-01 00:20');
+ pg_replication_origin_xact_setup
+----------------------------------
+
+(1 row)
+
+CREATE TEMPORARY TABLE test_empty_tx(blah integer);
+COMMIT;
+-- Decode with options not otherwise tested - skip empty xacts on, only-local off. Can't include xids since they'll change each run.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'only-local', '0', 'include-origins', '1');
+ data
+----------------------------------------------------------------------------------------------------------------
+ BEGIN (from "test_decoding: regression_slot" at lsn 0/AABBCCFF)
+ table public.origin_tbl: INSERT: id[integer]:4 data[text]:'will be replicated even though remotely originated'
+ COMMIT
+ BEGIN (from "test_decoding: regression_slot" at lsn 1/AABBCCFF)
+ COMMIT
+(5 rows)
+
+SELECT pg_replication_origin_session_reset();
+ pg_replication_origin_session_reset
+-------------------------------------
+
+(1 row)
+
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
--------------------------
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index e12404e..9115d34 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -60,5 +60,23 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
INSERT INTO origin_tbl(data) VALUES ('will be replicated');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+-- Make sure remote-originated tx's are not filtered out when only-local is unset
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+BEGIN;
+SELECT pg_replication_origin_xact_setup('0/aabbccff', '2013-01-01 00:10');
+INSERT INTO origin_tbl(data) VALUES ('will be replicated even though remotely originated');
+COMMIT;
+
+-- and empty tx's
+BEGIN;
+SELECT pg_replication_origin_xact_setup('1/aabbccff', '2013-01-01 00:20');
+CREATE TEMPORARY TABLE test_empty_tx(blah integer);
+COMMIT;
+
+-- Decode with options not otherwise tested - skip empty xacts on, only-local off. Can't include xids since they'll change each run.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'only-local', '0', 'include-origins', '1');
+
+SELECT pg_replication_origin_session_reset();
+
SELECT pg_drop_replication_slot('regression_slot');
SELECT pg_replication_origin_drop('test_decoding: regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 80fc5f4..e041154 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -45,6 +45,7 @@ typedef struct
bool skip_empty_xacts;
bool xact_wrote_changes;
bool only_local;
+ bool include_origins;
} TestDecodingData;
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -103,6 +104,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
data->include_timestamp = false;
data->skip_empty_xacts = false;
data->only_local = false;
+ data->include_origins = false;
ctx->output_plugin_private = data;
@@ -172,6 +174,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
+ else if (strcmp(elem->defname, "include-origins") == 0)
+ {
+
+ if (elem->arg == NULL)
+ data->include_origins = true;
+ else if (!parse_bool(strVal(elem->arg), &data->include_origins))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not parse value \"%s\" for parameter \"%s\"",
+ strVal(elem->arg), elem->defname)));
+ }
else
{
ereport(ERROR,
@@ -203,17 +216,33 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
if (data->skip_empty_xacts)
return;
+ Assert(txn->origin_id != DoNotReplicateId);
+
pg_output_begin(ctx, data, txn, true);
}
static void
pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
{
+ char *origin = NULL;
+
OutputPluginPrepareWrite(ctx, last_write);
+
+ appendStringInfoString(ctx->out, "BEGIN");
+
if (data->include_xids)
- appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
- else
- appendStringInfoString(ctx->out, "BEGIN");
+ appendStringInfo(ctx->out, " %u", txn->xid);
+
+ if (data->include_origins && txn->origin_id != InvalidRepOriginId)
+ {
+ if (!replorigin_by_oid(txn->origin_id, true, &origin))
+ origin = "unknown node";
+ appendStringInfo(ctx->out, " (from \"%s\" at lsn %X/%X)",
+ origin,
+ (uint32)(txn->origin_lsn >> 32),
+ (uint32)(txn->origin_lsn));
+ }
+
OutputPluginWrite(ctx, last_write);
}
@@ -237,6 +266,8 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
appendStringInfo(ctx->out, " (at %s)",
timestamptz_to_str(txn->commit_time));
+ Assert(txn->origin_id != DoNotReplicateId);
+
OutputPluginWrite(ctx, true);
}
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c629da3..9044226 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -455,6 +455,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
{
+ origin_id = XLogRecGetOrigin(buf->record);
origin_lsn = parsed->origin_lsn;
commit_time = parsed->origin_timestamp;
}
--
2.1.0
On 2015-10-15 16:02:23 +0800, Craig Ringer wrote:
There's an oversight in replication origins support in logical
decoding, where the origin node ID isn't passed correctly to callbacks
except for the origin filter callback. All other callbacks see it as
InvalidRepOriginId.
Only for the transaction, right? I.e. the stuff on changes should be
correct? Your description sounds like it's more than that?
I don't think your fix is entirely correct, the
XLogRecGetOrigin(buf->record) shouldn't be in the XACT_XINFO_HAS_ORIGIN
block.
Your test prints the origins from the transaction instead of the changes -
why?
Greetings,
Andres Freund
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 15 October 2015 at 16:48, Andres Freund <andres@anarazel.de> wrote:
On 2015-10-15 16:02:23 +0800, Craig Ringer wrote:
There's an oversight in replication origins support in logical
decoding, where the origin node ID isn't passed correctly to callbacks
except for the origin filter callback. All other callbacks see it as
InvalidRepOriginId.
Only for the transaction, right? I.e. the stuff on changes should be
correct? Your description sounds like it's more than that?
I don't think your fix is entirely correct, the
XLogRecGetOrigin(buf->record) shouldn't be in the XACT_XINFO_HAS_ORIGIN
block.
I guess since it'll be InvalidRepOriginId otherwise, that makes sense.
Your test prints the origins from the transaction instead the changes -
why?
I don't understand this part.
Do you mean:
+ BEGIN (from "test_decoding: regression_slot" at lsn 0/AABBCCFF)
?
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On 2015-10-15 17:05:33 +0800, Craig Ringer wrote:
On 15 October 2015 at 16:48, Andres Freund <andres@anarazel.de> wrote:
On 2015-10-15 16:02:23 +0800, Craig Ringer wrote:
There's an oversight in replication origins support in logical
decoding, where the origin node ID isn't passed correctly to callbacks
except for the origin filter callback. All other callbacks see it as
InvalidRepOriginId.
Only for the transaction, right? I.e. the stuff on changes should be
correct? Your description sounds like it's more than that?
I don't think your fix is entirely correct, the
XLogRecGetOrigin(buf->record) shouldn't be in the XACT_XINFO_HAS_ORIGIN
block.
I guess since it'll be InvalidRepOriginId otherwise, that makes sense.
That's not the point. XACT_XINFO_HAS_ORIGIN is about
origin_lsn/timestamp, it doesn't have anything to do with the record
origin (which is included in many more types of record than just
commits).
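A minimal sketch of the distinction drawn above, using hypothetical stand-in types rather than the real PostgreSQL definitions: the origin id is read from the record unconditionally, while the flag only gates the remote LSN (and timestamp, omitted here). The names ModelCommitRecord, ModelTxnOrigin, MODEL_XINFO_HAS_ORIGIN and model_decode_commit are assumptions made for illustration only.

```c
#include <stdint.h>

/* Hypothetical stand-ins; not the real PostgreSQL typedefs. */
typedef uint16_t RepOriginId;
typedef uint64_t XLogRecPtr;

#define MODEL_XINFO_HAS_ORIGIN (1u << 0)    /* models XACT_XINFO_HAS_ORIGIN */

typedef struct ModelCommitRecord
{
    RepOriginId record_origin;  /* models XLogRecGetOrigin(buf->record) */
    uint32_t    xinfo;          /* flag bits of the parsed commit */
    XLogRecPtr  origin_lsn;     /* only meaningful when the flag is set */
} ModelCommitRecord;

typedef struct ModelTxnOrigin
{
    RepOriginId origin_id;
    XLogRecPtr  origin_lsn;
} ModelTxnOrigin;

/* Model of the corrected commit decoding: origin id is not gated by the flag. */
ModelTxnOrigin
model_decode_commit(const ModelCommitRecord *rec)
{
    ModelTxnOrigin out;

    /* The record origin is taken from the record itself, always. */
    out.origin_id = rec->record_origin;
    out.origin_lsn = 0;

    /* The flag only says whether a remote LSN was stored with the commit. */
    if (rec->xinfo & MODEL_XINFO_HAS_ORIGIN)
        out.origin_lsn = rec->origin_lsn;

    return out;
}
```

In this model a commit record without the flag still reports its origin id; only the LSN stays unset.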
Your test prints the origins from the transaction instead the changes -
why?
I don't understand this part.
Your test prints origin in commits - but changes can have individual
origins.
Greetings,
Andres Freund
On 15 October 2015 at 17:43, Andres Freund <andres@anarazel.de> wrote:
I guess since it'll be InvalidRepOriginId otherwise, that makes sense.
That's not the point. XACT_XINFO_HAS_ORIGIN is about
origin_lsn/timestamp, it doesn't have anything to do with the record
origin (which is included in many more types of record than just
commits).
Ok, I think I see. That's also why it wasn't incorporated into
xl_xact_parsed_commit.
I'll check which records can contain it and assign it in the
appropriate decoding calls. I'll follow up in a while with an updated
patch.
Your test prints the origins from the transaction instead the changes -
why?
I don't understand this part.
Your test prints origin in commits - but changes can have individual
origins.
I was completely unaware of that, so thank you. I'll change the tests
to exercise that. Any preferences on the output format?
Maybe:
table public.origin_tbl: INSERT: id[integer]:6 data[text]:'from second
origin' -- origin:'some_origin' origin_lsn:'0/1234'
?
It's cluttered, but really I'm not sure there's a pretty way to pack
that in, and it's only test output.
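For illustration, a small self-contained sketch of how a suffix in the format proposed above could be built from an origin name and a 64-bit LSN, printing the LSN as two 32-bit hex halves in the same way as the `%X/%X` output in the patch. The helper name format_origin_suffix and its signature are hypothetical, not part of test_decoding.

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper: writes " -- origin:'<name>' origin_lsn:'<hi>/<lo>'"
 * into buf. Returns what snprintf returns (the length that would have
 * been written, excluding the terminating NUL).
 */
int
format_origin_suffix(char *buf, size_t buflen,
                     const char *origin, uint64_t origin_lsn)
{
    /* Split the 64-bit LSN into its high and low 32-bit halves. */
    uint32_t hi = (uint32_t) (origin_lsn >> 32);
    uint32_t lo = (uint32_t) origin_lsn;

    return snprintf(buf, buflen,
                    " -- origin:'%s' origin_lsn:'%" PRIX32 "/%" PRIX32 "'",
                    origin, hi, lo);
}
```

The result would simply be appended to the existing change line, so output produced without the option stays byte-for-byte unchanged.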
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On October 15, 2015 1:02:04 PM GMT+02:00, Craig Ringer <craig@2ndquadrant.com> wrote:
On 15 October 2015 at 17:43, Andres Freund <andres@anarazel.de> wrote:
I guess since it'll be InvalidRepOriginId otherwise, that makes
sense.
That's not the point. XACT_XINFO_HAS_ORIGIN is about
origin_lsn/timestamp, it doesn't have anything to do with the record
origin (which is included in many more types of record than just
commits).
Ok, I think I see. That's also why it wasn't incorporated into
xl_xact_parsed_commit.
I'll check which records can contain it and assign it in the
appropriate decoding calls. I'll follow up in a while with an updated
patch.
As far as I can see all the other places have it assigned.
Your test prints the origins from the transaction instead the
changes -
why?
I don't understand this part.
Your test prints origin in commits - but changes can have individual
origins.
I was completely unaware of that, so thankyou. I'll change the tests
to exercise that. Any preferences on the output format?
Maybe:
table public.origin_tbl: INSERT: id[integer]:6 data[text]:'from second
origin' -- origin:'some_origin' origin_lsn:'0/1234'
?
it's cluttered, but really I'm not sure there's a pretty way to pack
that in, and it's only test output.
I'm inclined not to commit this part - it seems to add too much complication for the amount of coverage. But please use it for testing.
Andres
---
Please excuse brevity and formatting - I am writing this on my mobile phone.
On 15 October 2015 at 19:04, Andres Freund <andres@anarazel.de> wrote:
As far as I can see all the other places have it assigned.
Ok, thanks. Not much need for a followup patch then, if you're not
using the test changes part.
table public.origin_tbl: INSERT: id[integer]:6 data[text]:'from second
origin' -- origin:'some_origin' origin_lsn:'0/1234'
?
it's cluttered, but really I'm not sure there's a pretty way to pack
that in, and it's only test output.
I'm inclined not to commit this part - seems to add too much complications for the amount of coverage. But please use it for testing.
It doesn't seem like this will be particularly vulnerable to
regressions or have new record types added that need a check for them.
I'd be inclined to add the info, but I have a higher tolerance for
verbosity than you ;)
I think it's worth adding a test for the change of origin mid-tx. I
had no idea that was even possible.
Testing forwarding of empty tx's is simple and should probably be there too.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On 15 October 2015 at 20:11, Craig Ringer <craig@2ndquadrant.com> wrote:
On 15 October 2015 at 19:04, Andres Freund <andres@anarazel.de> wrote:
As far as I can see all the other places have it assigned.
Ok, thanks. Not much need for a followup patch then, if you're not
using the test changes part.
Here's what I used for my tests, anyway, including an updated fix.
You'll note that the tests fail. When the replication origin is reset
and set again with pg_replication_origin_xact_setup mid-xact, the
origin identity exposed to the decoding plugin callbacks for all
records (including those created before the origin change) is the
latter origin, the one active at COMMIT time.
Is that the intended behaviour? That the session identifier is
determined by what was active at commit time, and only the lsn and
timestamp vary throughout the xact? It looks like it from the code.
Should pg_replication_origin_xact_reset() and
pg_replication_origin_xact_setup() be permitted within a transaction?
Or is this just a "well, don't do that"?
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Attachments:
0001-Decode-replication-origin-in-commit.patch (text/x-patch; charset=US-ASCII)
From 5efc1d3f9017ed8e178aa5a3986275bcbd504f30 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Thu, 15 Oct 2015 20:45:06 +0800
Subject: [PATCH] Decode replication origin in commit
Fix an oversight in replication origins where the origin_id isn't stored
while decoding the commit record. Prior to this fix the begin and commit
decoding callbacks were always passed InvalidRepOriginId (0) in
txn->origin_id even for remotely-originated transactions.
The replication origin filter hook
OutputPluginCallbacks->filter_by_origin_cb has always been passed the
origin correctly, so test_decoding didn't notice this issue. It only
tested filtering out remote transactions. So this change adds support
for outputting the origin information on decoded transactions in
test_decoding and adds tests to exercise it.
Also add tests for changing origin within a transaction.
---
contrib/test_decoding/expected/replorigin.out | 88 ++++++++++++++++++++++++++-
contrib/test_decoding/sql/replorigin.sql | 35 +++++++++++
contrib/test_decoding/test_decoding.c | 50 ++++++++++++++-
src/backend/replication/logical/decode.c | 2 +-
4 files changed, 170 insertions(+), 5 deletions(-)
diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index c0f5125..11f9a2b 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -12,11 +12,18 @@ SELECT pg_replication_origin_create('test_decoding: regression_slot');
SELECT pg_replication_origin_create('test_decoding: regression_slot');
ERROR: duplicate key value violates unique constraint "pg_replication_origin_roname_index"
DETAIL: Key (roname)=(test_decoding: regression_slot) already exists.
+-- Create a second origin too
+SELECT pg_replication_origin_create('test_decoding: regression_slot 2');
+ pg_replication_origin_create
+------------------------------
+ 2
+(1 row)
+
--ensure deletions work (once)
SELECT pg_replication_origin_create('test_decoding: temp');
pg_replication_origin_create
------------------------------
- 2
+ 3
(1 row)
SELECT pg_replication_origin_drop('test_decoding: temp');
@@ -127,6 +134,85 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
COMMIT
(3 rows)
+-- This time we'll prepare a series of transactions to decode in one go
+-- rather than decoding one by one.
+--
+-- First, to make sure remote-originated tx's are not filtered out when only-local is unset,
+-- we need another tx with an origin. This time we'll set a different origin for each
+-- change.
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+ pg_replication_origin_session_setup
+-------------------------------------
+
+(1 row)
+
+BEGIN;
+SELECT pg_replication_origin_xact_setup('0/aabbccff', '2013-01-01 00:10');
+ pg_replication_origin_xact_setup
+----------------------------------
+
+(1 row)
+
+INSERT INTO origin_tbl(data) VALUES ('will be replicated even though remotely originated');
+SELECT pg_replication_origin_xact_setup('0/aabbcd00', '2013-01-01 00:11');
+ pg_replication_origin_xact_setup
+----------------------------------
+
+(1 row)
+
+INSERT INTO origin_tbl(data) VALUES ('will also be replicated even though remotely originated');
+-- Change the origin replication identifier mid-transaction
+SELECT pg_replication_origin_session_reset();
+ pg_replication_origin_session_reset
+-------------------------------------
+
+(1 row)
+
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot 2');
+ pg_replication_origin_session_setup
+-------------------------------------
+
+(1 row)
+
+SELECT pg_replication_origin_xact_setup('0/aabbcd01', '2013-01-01 00:13');
+ pg_replication_origin_xact_setup
+----------------------------------
+
+(1 row)
+
+INSERT INTO origin_tbl(data) VALUES ('from second origin');
+COMMIT;
+-- then run an empty tx, since this test will be setting skip-empty-xacts=0
+-- Note that we need to do something, just something that won't get decoded,
+-- to force a commit to be recorded.
+BEGIN;
+SELECT pg_replication_origin_xact_setup('1/aabbccff', '2013-01-01 00:20');
+ pg_replication_origin_xact_setup
+----------------------------------
+
+(1 row)
+
+CREATE TEMPORARY TABLE test_empty_tx(blah integer);
+COMMIT;
+-- Decode with options not otherwise tested - skip empty xacts on, only-local off. Can't include xids since they'll change each run.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'only-local', '0', 'include-origins', '1');
+ data
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN -- origin:'test_decoding: regression_slot 2'@0/AABBCD01
+ table public.origin_tbl: INSERT: id[integer]:4 data[text]:'will be replicated even though remotely originated' -- origin:'test_decoding: regression_slot'@0/AABBCD01
+ table public.origin_tbl: INSERT: id[integer]:5 data[text]:'will also be replicated even though remotely originated' -- origin:'test_decoding: regression_slot'@0/AABBCD01
+ table public.origin_tbl: INSERT: id[integer]:6 data[text]:'from second origin' -- origin:'test_decoding: regression_slot 2'@0/AABBCD01
+ COMMIT -- origin:'test_decoding: regression_slot 2'@0/AABBCD01
+ BEGIN -- origin:'test_decoding: regression_slot 2'@1/AABBCCFF
+ COMMIT -- origin:'test_decoding: regression_slot 2'@1/AABBCCFF
+(7 rows)
+
+SELECT pg_replication_origin_session_reset();
+ pg_replication_origin_session_reset
+-------------------------------------
+
+(1 row)
+
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
--------------------------
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index e12404e..252c786 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -8,6 +8,9 @@ SELECT pg_replication_origin_create('test_decoding: regression_slot');
-- ensure duplicate creations fail
SELECT pg_replication_origin_create('test_decoding: regression_slot');
+-- Create a second origin too
+SELECT pg_replication_origin_create('test_decoding: regression_slot 2');
+
--ensure deletions work (once)
SELECT pg_replication_origin_create('test_decoding: temp');
SELECT pg_replication_origin_drop('test_decoding: temp');
@@ -60,5 +63,37 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
INSERT INTO origin_tbl(data) VALUES ('will be replicated');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+-- This time we'll prepare a series of transactions to decode in one go
+-- rather than decoding one by one.
+--
+-- First, to make sure remote-originated tx's are not filtered out when only-local is unset,
+-- we need another tx with an origin. This time we'll set a different origin for each
+-- change.
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+BEGIN;
+SELECT pg_replication_origin_xact_setup('0/aabbccff', '2013-01-01 00:10');
+INSERT INTO origin_tbl(data) VALUES ('will be replicated even though remotely originated');
+SELECT pg_replication_origin_xact_setup('0/aabbcd00', '2013-01-01 00:11');
+INSERT INTO origin_tbl(data) VALUES ('will also be replicated even though remotely originated');
+-- Change the origin replication identifier mid-transaction
+SELECT pg_replication_origin_session_reset();
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot 2');
+SELECT pg_replication_origin_xact_setup('0/aabbcd01', '2013-01-01 00:13');
+INSERT INTO origin_tbl(data) VALUES ('from second origin');
+COMMIT;
+
+-- then run an empty tx, since this test will be setting skip-empty-xacts=0
+-- Note that we need to do something, just something that won't get decoded,
+-- to force a commit to be recorded.
+BEGIN;
+SELECT pg_replication_origin_xact_setup('1/aabbccff', '2013-01-01 00:20');
+CREATE TEMPORARY TABLE test_empty_tx(blah integer);
+COMMIT;
+
+-- Decode with options not otherwise tested - skip empty xacts on, only-local off. Can't include xids since they'll change each run.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0', 'only-local', '0', 'include-origins', '1');
+
+SELECT pg_replication_origin_session_reset();
+
SELECT pg_drop_replication_slot('regression_slot');
SELECT pg_replication_origin_drop('test_decoding: regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 80fc5f4..4cf35fd 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -45,6 +45,7 @@ typedef struct
bool skip_empty_xacts;
bool xact_wrote_changes;
bool only_local;
+ bool include_origins;
} TestDecodingData;
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -64,6 +65,9 @@ static void pg_decode_change(LogicalDecodingContext *ctx,
static bool pg_decode_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
+static void appendOrigin(LogicalDecodingContext *ctx,
+ TestDecodingData *data, ReorderBufferTXN *txn);
+
void
_PG_init(void)
{
@@ -103,6 +107,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
data->include_timestamp = false;
data->skip_empty_xacts = false;
data->only_local = false;
+ data->include_origins = false;
ctx->output_plugin_private = data;
@@ -172,6 +177,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
+ else if (strcmp(elem->defname, "include-origins") == 0)
+ {
+
+ if (elem->arg == NULL)
+ data->include_origins = true;
+ else if (!parse_bool(strVal(elem->arg), &data->include_origins))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not parse value \"%s\" for parameter \"%s\"",
+ strVal(elem->arg), elem->defname)));
+ }
else
{
ereport(ERROR,
@@ -203,6 +219,8 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
if (data->skip_empty_xacts)
return;
+ Assert(txn->origin_id != DoNotReplicateId);
+
pg_output_begin(ctx, data, txn, true);
}
@@ -210,10 +228,14 @@ static void
pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
{
OutputPluginPrepareWrite(ctx, last_write);
+
+ appendStringInfoString(ctx->out, "BEGIN");
+
if (data->include_xids)
- appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
- else
- appendStringInfoString(ctx->out, "BEGIN");
+ appendStringInfo(ctx->out, " %u", txn->xid);
+
+ appendOrigin(ctx, data, txn);
+
OutputPluginWrite(ctx, last_write);
}
@@ -237,6 +259,10 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
appendStringInfo(ctx->out, " (at %s)",
timestamptz_to_str(txn->commit_time));
+ appendOrigin(ctx, data, txn);
+
+ Assert(txn->origin_id != DoNotReplicateId);
+
OutputPluginWrite(ctx, true);
}
@@ -469,5 +495,23 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
+ appendOrigin(ctx, data, txn);
+
OutputPluginWrite(ctx, true);
}
+
+static void
+appendOrigin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn)
+{
+ char *origin;
+
+ if (data->include_origins && txn->origin_id != InvalidRepOriginId)
+ {
+ if (!replorigin_by_oid(txn->origin_id, true, &origin))
+ origin = "[unknown]";
+ appendStringInfo(ctx->out, " -- origin:'%s'@%X/%X",
+ origin,
+ (uint32)(txn->origin_lsn >> 32),
+ (uint32)(txn->origin_lsn));
+ }
+}
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c629da3..9f60687 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -450,7 +450,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
{
XLogRecPtr origin_lsn = InvalidXLogRecPtr;
XLogRecPtr commit_time = InvalidXLogRecPtr;
- XLogRecPtr origin_id = InvalidRepOriginId;
+ XLogRecPtr origin_id = XLogRecGetOrigin(buf->record);
int i;
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
--
2.1.0
On 2015-10-15 20:52:41 +0800, Craig Ringer wrote:
You'll note that the tests fail. When the replication origin is reset
and set again with pg_replication_origin_xact_setup mid-xact, the
origin identity exposed to the decoding plugin callbacks for all
records (including those created before the origin change) is the
latter origin, the one active at COMMIT time.
Is that the intended behaviour? That the session identifier is
determined by what was active at commit time, and only the lsn and
timestamp vary throughout the xact? It looks like it from the code.
Uh. Isn't that just because you looked at txn->origin_id instead of the
change's origin_id?
Andres
On 15 October 2015 at 20:55, Andres Freund <andres@anarazel.de> wrote:
On 2015-10-15 20:52:41 +0800, Craig Ringer wrote:
You'll note that the tests fail. When the replication origin is reset
and set again with pg_replication_origin_xact_setup mid-xact, the
origin identity exposed to the decoding plugin callbacks for all
records (including those created before the origin change) is the
latter origin, the one active at COMMIT time.
Is that the intended behaviour? That the session identifier is
determined by what was active at commit time, and only the lsn and
timestamp vary throughout the xact? It looks like it from the code.
Uh. Isn't that just because you looked at txn->origin_id instead of the
change's origin_id?
Yes, it is. I didn't realise that the individual changes had their own
origins, rather than changing the origin in the txn, though I can see
that now that I know to look.
Either I'm confused (likely) or the concept behind allowing this is
critically flawed.
Say some client code does
set session origin=1
begin
set xact lsn=0/123, ts=13:00
do some inserts
set session origin=2
set xact lsn=0/199, ts=14:00
do some more inserts
commit
it seems to be decoded as:
begin origin=2 lsn=0/199
inserts origin=1 lsn=0/199
more inserts origin=2 lsn=0/199
commit origin=2 lsn=0/199
i.e. the begin and commit have the final session origin. Individual
changes have the session origin in effect at the time the change was
created. The last-set origin commit timestamp and origin lsn override
all prior ones; they aren't recorded per-change, only on the commit.
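The behaviour described above can be modelled in a few lines of plain C. This is only a toy model under the assumptions stated in this message (each change is stamped with the session origin in effect when it is made; the origin id and origin LSN on the transaction are whatever is set at commit). ModelSession, ModelTxn, model_insert and model_commit are hypothetical names, not PostgreSQL APIs.

```c
#include <stddef.h>
#include <stdint.h>

typedef uint16_t RepOriginId;   /* stand-in typedefs for the model */
typedef uint64_t XLogRecPtr;

#define MODEL_MAX_CHANGES 8

typedef struct ModelSession
{
    RepOriginId session_origin;     /* "set session origin=N" */
    XLogRecPtr  xact_origin_lsn;    /* "set xact lsn=..." */
} ModelSession;

typedef struct ModelTxn
{
    RepOriginId change_origin[MODEL_MAX_CHANGES];  /* stamped per change */
    size_t      nchanges;
    RepOriginId origin_id;          /* recorded once, at commit */
    XLogRecPtr  origin_lsn;         /* recorded once, at commit */
} ModelTxn;

/* A change carries the session origin active at the time it is created. */
void
model_insert(const ModelSession *s, ModelTxn *txn)
{
    if (txn->nchanges < MODEL_MAX_CHANGES)
        txn->change_origin[txn->nchanges++] = s->session_origin;
}

/* The commit carries the final session origin and the last-set origin LSN. */
void
model_commit(const ModelSession *s, ModelTxn *txn)
{
    txn->origin_id = s->session_origin;
    txn->origin_lsn = s->xact_origin_lsn;
}
```

Replaying the client sequence above against this model (origin 1 at lsn 0/123, one insert, then origin 2 at lsn 0/199, another insert, commit) leaves the first change with origin 1, the second with origin 2, and the transaction with origin 2 and lsn 0/199.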
This means you have change records with a change->origin_id that's
from a completely different node, which makes no sense at all with the
txn->origin_lsn. It matches the txn->origin_id, which is the same
throughout, but then why even have the change->origin_id?
I find the idea of each change having its own origin node - but not
its own origin LSN - very confusing. For one thing the origin filter
callback can't know about that, and can only filter based on the txn's
origin. I guess that's the output plugin's problem - if it wants to
cope with arbitrary mixed-origin txs it can't use the origin filter
and has to check each message.
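A minimal sketch of what "check each message" could look like, as a standalone C model. All names here (Sketch* types, sketch_* functions) are hypothetical and this is not the output plugin API; the only fact borrowed from PostgreSQL is that InvalidRepOriginId is 0, which marks a locally-originated change. The transaction-level filter sees a single origin, so a plugin that wants only local changes out of a mixed-origin transaction would have to test every change itself.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for RepOriginId; 0 plays the role of InvalidRepOriginId. */
typedef uint16_t SketchOriginId;
#define SKETCH_INVALID_ORIGIN ((SketchOriginId) 0)

/*
 * Transaction-level filter: it is given one origin id only, so it cannot
 * tell whether individual changes came from somewhere else.
 * Returns true when the transaction should be skipped.
 */
static bool
sketch_filter_by_origin(SketchOriginId txn_origin)
{
    return txn_origin != SKETCH_INVALID_ORIGIN;
}

/*
 * Per-change check: count the changes a "local only" plugin would emit
 * when it inspects each change's own origin instead of the txn's.
 */
static size_t
sketch_count_local_changes(const SketchOriginId *change_origins, size_t n)
{
    size_t kept = 0;

    assert(change_origins != NULL || n == 0);
    for (size_t i = 0; i < n; i++)
    {
        if (change_origins[i] == SKETCH_INVALID_ORIGIN)
            kept++;
    }
    return kept;
}
```

For a transaction whose changes carry origins {0, 1, 0, 2}, the per-change check keeps two changes, whereas the transaction-level filter makes one all-or-nothing decision based on the commit-time origin.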
I really don't see how it makes any sense to allow the origin_id to
change mid-tx. I can see how sending the origin_id for each change
could make sense to allow future support for transaction streaming
where decoding starts before we receive the commit record, but
changing the origin_id within the tx doesn't make any sense.
IMO changing the origin should be disallowed within a tx. Otherwise
there needs to be some way to record the origin lsn and commit
timestamp changing within the tx too.
I was going to just send a patch to disallow changing the origin
mid-tx, but I'm not sure I see a good way to do that since the origin
is a session-level global, not part of the xact info.
Document it as a "don't do that, if you do it you get to keep the pieces"?
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On 16 October 2015 at 11:51, Craig Ringer <craig@2ndquadrant.com> wrote:
Document it as a "don't do that, if you do it you get to keep the pieces"?
Thinking about this some more, having per-change origins makes sense
when you're not using origin LSNs, such as when you're not replaying
from another PostgreSQL instance. So I _can_ see why it exists.
I guess this is mostly a matter of adding some comments and/or some
notes in the functions' docs to explain how it all fits together -
that origins can be per-change, that the txn origin is the origin that
was in effect at commit time, and that the lsn and commit timestamp
are always those that were set at commit time, so you cannot use a
per-change origin with the txn's lsn and expect it to make sense.
Reasonable?
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Hi all
Patch revision 3 attached. It's a one-liner, with just the fix, and an
explanatory note in the patch header.
I'll leave the test changes for now, per Andres's feedback.
I'll write up a proposed addition to the replication origin docs that
explains the existence of separate origins for each change and for the
tx as a whole, and explains how replication origins interact with
transaction commit timestamps. That'll come as a followup; I think
it's worth getting the bug fix in now.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Attachments:
0001-Send-replication-origin-on-commit.patch (text/x-patch; charset=US-ASCII)
From 8c4285402706329a64a3f180c66a04357a932b3d Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Mon, 19 Oct 2015 21:37:19 +0800
Subject: [PATCH] Send replication origin on commit
The transaction replication origin is not set when decoding a commit.
Origins for individual replication changes are passed correctly, as is
the origin when calling the transaction filter. Fix that, supplying
the transaction origin to logical decoding callbacks.
Prior to this change the ReorderBufferTXN->origin_id (usually:
txn->origin_id) was always InvalidRepOriginId in decoding callbacks
other than the origin filter.
See the thread
http://www.postgresql.org/message-id/CAMsr+YFhBJLp=qfSz3-J+0P1zLkE8zNXM2otycn20QRMx380gw@mail.gmail.com
for details.
---
src/backend/replication/logical/decode.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c629da3..9f60687 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -450,7 +450,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
{
XLogRecPtr origin_lsn = InvalidXLogRecPtr;
XLogRecPtr commit_time = InvalidXLogRecPtr;
- XLogRecPtr origin_id = InvalidRepOriginId;
+ XLogRecPtr origin_id = XLogRecGetOrigin(buf->record);
int i;
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
--
2.1.0
On 19 October 2015 at 21:43, Craig Ringer <craig@2ndquadrant.com> wrote:
Hi all
Patch revision 3 attached. It's a one-liner, with just the fix, and an
explanatory note in the patch header.
I'm bumping this because I think it's important not to miss it for
9.5, so it can't wait for the commitfest.
It's just the one-liner with the fix itself.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Hi,
On 2015-10-19 21:43:32 +0800, Craig Ringer wrote:
Patch revision 3 attached. It's a one-liner, with just the fix, and an
explanatory note in the patch header.
Pushed to 9.5 and master.
Thanks for noticing the issue,
Andres Freund
On 9 November 2015 at 07:04, Andres Freund <andres@anarazel.de> wrote:
Hi,
On 2015-10-19 21:43:32 +0800, Craig Ringer wrote:
Patch revision 3 attached. It's a one-liner, with just the fix, and an
explanatory note in the patch header.
Pushed to 9.5 and master.
Thanks for noticing the issue,
Thanks for taking the time to sanity-check and apply the fix.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services