patch to ensure logical decoding errors early
This patch does 2 things
1) Ensure that when the slot is created
with pg_create_physical_replication_slot if the output plugin does not
exist it will error.
2) Currently when the decoding context is created and the output plugin
does not exist the protocol will respond with CopyDone. This patch will
return an error instead and abort the copy connection.
Dave Cramer
Attachments:
0002-remove-space.patchapplication/octet-stream; name=0002-remove-space.patchDownload
From 7e9934617491916abed089475c14af12a1fb8ac4 Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Tue, 31 Jul 2018 14:46:09 -0400
Subject: [PATCH 2/2] remove space
---
src/backend/replication/logical/logical.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 9f883b9..b2512a1 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -311,7 +311,7 @@ CreateInitDecodingContext(char *plugin,
ReplicationSlotSave();
ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
- need_full_snapshot, true,
+ need_full_snapshot, true,
read_page, prepare_write, do_write,
update_progress);
--
2.6.4
0001-Ensure-that-pg_create_physical_replication_slot-erro.patchapplication/octet-stream; name=0001-Ensure-that-pg_create_physical_replication_slot-erro.patchDownload
From 290f612ab41f6cb779d44c5754b23092d139e88d Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Tue, 31 Jul 2018 14:29:09 -0400
Subject: [PATCH 1/2] Ensure that pg_create_physical_replication_slot errors if
the output plugin does not exist If the output plugin does not exist send
back an error message as opposed to CopyBoth when the starting the
replication stream
---
contrib/test_decoding/expected/slot.out | 2 ++
contrib/test_decoding/sql/slot.sql | 2 ++
src/backend/replication/logical/logical.c | 5 ++---
src/backend/replication/walsender.c | 22 ++++++++++++----------
4 files changed, 18 insertions(+), 13 deletions(-)
diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out
index 2737a8a..523621a 100644
--- a/contrib/test_decoding/expected/slot.out
+++ b/contrib/test_decoding/expected/slot.out
@@ -30,6 +30,8 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t2', 'tes
init
(1 row)
+SELECT pg_create_logical_replication_slot('foo', 'nonexistent');
+ERROR: could not access file "nonexistent": No such file or directory
-- here we want to start a new session and wait till old one is gone
select pg_backend_pid() as oldpid \gset
\c -
diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql
index 24cdf71..c8d08f8 100644
--- a/contrib/test_decoding/sql/slot.sql
+++ b/contrib/test_decoding/sql/slot.sql
@@ -9,6 +9,8 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t2', 'test_decoding', true);
+SELECT pg_create_logical_replication_slot('foo', 'nonexistent');
+
-- here we want to start a new session and wait till old one is gone
select pg_backend_pid() as oldpid \gset
\c -
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3cd4eef..9f883b9 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -143,8 +143,7 @@ StartupDecodingContext(List *output_plugin_options,
* (re-)load output plugins, so we detect a bad (removed) output plugin
* now.
*/
- if (!fast_forward)
- LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
+ LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
/*
* Now that the slot's xmin has been set, we can announce ourselves as a
@@ -312,7 +311,7 @@ CreateInitDecodingContext(char *plugin,
ReplicationSlotSave();
ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
- need_full_snapshot, true,
+ need_full_snapshot, true,
read_page, prepare_write, do_write,
update_progress);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d60026d..a196c5b 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1068,6 +1068,18 @@ StartLogicalReplication(StartReplicationCmd *cmd)
got_STOPPING = true;
}
+ /*
+ * Initialize position to the last ack'ed one, then the xlog records begin
+ * to be shipped from that position.
+ */
+ logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
+ false,
+ logical_read_xlog_page,
+ WalSndPrepareWrite,
+ WalSndWriteData,
+ WalSndUpdateProgress);
+
+
WalSndSetState(WALSNDSTATE_CATCHUP);
/* Send a CopyBothResponse message, and start streaming */
@@ -1077,16 +1089,6 @@ StartLogicalReplication(StartReplicationCmd *cmd)
pq_endmessage(&buf);
pq_flush();
- /*
- * Initialize position to the last ack'ed one, then the xlog records begin
- * to be shipped from that position.
- */
- logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
- false,
- logical_read_xlog_page,
- WalSndPrepareWrite,
- WalSndWriteData,
- WalSndUpdateProgress);
/* Start reading WAL from the oldest required WAL. */
logical_startptr = MyReplicationSlot->data.restart_lsn;
--
2.6.4
Hi,
On 2018-07-31 14:51:12 -0400, Dave Cramer wrote:
This patch does 2 things
1) Ensure that when the slot is created
with pg_create_physical_replication_slot if the output plugin does not
exist it will error.
*logical, I assume?
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3cd4eef..9f883b9 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -143,8 +143,7 @@ StartupDecodingContext(List *output_plugin_options,
* (re-)load output plugins, so we detect a bad (removed) output plugin
* now.
*/
- if (!fast_forward)
- LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
+ LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
So this actually was broken by 9c7d06d60680c7f00d931233873dee81fdb311c6
and worked before? Petr, Simon? Isn't the actual bug here that
CreateInitDecodingContext() passes true for fast_forward? Dave, could
you confirm this is the case? If so, this'll end up actually being an
open items entry...
/*
* Now that the slot's xmin has been set, we can announce ourselves as a
@@ -312,7 +311,7 @@ CreateInitDecodingContext(char *plugin,
ReplicationSlotSave();
ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
- need_full_snapshot, true,
+ need_full_snapshot, true,
read_page, prepare_write, do_write,
update_progress);
Huh?
Greetings,
Andres Freund
On 31 July 2018 at 14:58, Andres Freund <andres@anarazel.de> wrote:
Hi,
On 2018-07-31 14:51:12 -0400, Dave Cramer wrote:
This patch does 2 things
1) Ensure that when the slot is created
with pg_create_physical_replication_slot if the output plugin does not
exist it will error.
*logical, I assume?
Yes, logical.
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3cd4eef..9f883b9 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -143,8 +143,7 @@ StartupDecodingContext(List *output_plugin_options,
* (re-)load output plugins, so we detect a bad (removed) output plugin
* now.
*/
- if (!fast_forward)
- LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
+ LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
So this actually was broken by 9c7d06d60680c7f00d931233873dee81fdb311c6
and worked before? Petr, Simon? Isn't the actual bug here that
CreateInitDecodingContext() passes true for fast_forward? Dave, could
you confirm this is the case? If so, this'll end up actually being an
open items entry...
Ya, I think that is really the issue. I will redo my patch
/*
* Now that the slot's xmin has been set, we can announce ourselves as a
@@ -312,7 +311,7 @@ CreateInitDecodingContext(char *plugin,
ReplicationSlotSave();
ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
- need_full_snapshot, true,
+ need_full_snapshot, true,
read_page, prepare_write, do_write,
update_progress);
Hmmm, I guess I should read my patches more carefully before sending them
Huh?
Dave Cramer
Hi,
On 31/07/18 20:58, Andres Freund wrote:
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3cd4eef..9f883b9 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -143,8 +143,7 @@ StartupDecodingContext(List *output_plugin_options,
* (re-)load output plugins, so we detect a bad (removed) output plugin
* now.
*/
- if (!fast_forward)
- LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
+ LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
So this actually was broken by 9c7d06d60680c7f00d931233873dee81fdb311c6
and worked before? Petr, Simon? Isn't the actual bug here that
CreateInitDecodingContext() passes true for fast_forward? Dave, could
you confirm this is the case? If so, this'll end up actually being an
open items entry...
Indeed.
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On 1 August 2018 at 10:13, Petr Jelinek <petr.jelinek@2ndquadrant.com>
wrote:
Hi,
On 31/07/18 20:58, Andres Freund wrote:
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3cd4eef..9f883b9 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -143,8 +143,7 @@ StartupDecodingContext(List *output_plugin_options,
* (re-)load output plugins, so we detect a bad (removed) output plugin
* now.
*/
- if (!fast_forward)
- LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
+ LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
So this actually was broken by 9c7d06d60680c7f00d931233873dee81fdb311c6
and worked before? Petr, Simon? Isn't the actual bug here that
CreateInitDecodingContext() passes true for fast_forward? Dave, could
you confirm this is the case? If so, this'll end up actually being an
open items entry...
Indeed.
See attached patch which fixes it, and adds a test for it.
Attachments:
0001-Ensure-pg_create_logical_replication_slot-fails-if-t.patchapplication/octet-stream; name=0001-Ensure-pg_create_logical_replication_slot-fails-if-t.patchDownload
From 36e1c2fdf754d89ca817440eab87edeb87cfe96e Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Wed, 1 Aug 2018 11:04:29 -0400
Subject: [PATCH] Ensure pg_create_logical_replication_slot fails if the output
plugin does not exist Error early when creating DecodingContext before
sending CopyBoth in the event that the output plugin does not exist
---
contrib/test_decoding/expected/slot.out | 2 ++
contrib/test_decoding/sql/slot.sql | 2 ++
src/backend/replication/logical/logical.c | 2 +-
src/backend/replication/walsender.c | 24 ++++++++++++++----------
4 files changed, 19 insertions(+), 11 deletions(-)
diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out
index 2737a8a..523621a 100644
--- a/contrib/test_decoding/expected/slot.out
+++ b/contrib/test_decoding/expected/slot.out
@@ -30,6 +30,8 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t2', 'tes
init
(1 row)
+SELECT pg_create_logical_replication_slot('foo', 'nonexistent');
+ERROR: could not access file "nonexistent": No such file or directory
-- here we want to start a new session and wait till old one is gone
select pg_backend_pid() as oldpid \gset
\c -
diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql
index 24cdf71..c8d08f8 100644
--- a/contrib/test_decoding/sql/slot.sql
+++ b/contrib/test_decoding/sql/slot.sql
@@ -9,6 +9,8 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t2', 'test_decoding', true);
+SELECT pg_create_logical_replication_slot('foo', 'nonexistent');
+
-- here we want to start a new session and wait till old one is gone
select pg_backend_pid() as oldpid \gset
\c -
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3cd4eef..bb83fc9 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -312,7 +312,7 @@ CreateInitDecodingContext(char *plugin,
ReplicationSlotSave();
ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
- need_full_snapshot, true,
+ need_full_snapshot, false,
read_page, prepare_write, do_write,
update_progress);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d60026d..c0e4acc 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1068,6 +1068,20 @@ StartLogicalReplication(StartReplicationCmd *cmd)
got_STOPPING = true;
}
+ /*
+ * Initialize position to the last ack'ed one, then the xlog records begin
+ * to be shipped from that position.
+ * This is done before sending CopyBoth in the event there is an error
+ * the error message will be returned early
+ */
+ logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
+ false,
+ logical_read_xlog_page,
+ WalSndPrepareWrite,
+ WalSndWriteData,
+ WalSndUpdateProgress);
+
+
WalSndSetState(WALSNDSTATE_CATCHUP);
/* Send a CopyBothResponse message, and start streaming */
@@ -1077,16 +1091,6 @@ StartLogicalReplication(StartReplicationCmd *cmd)
pq_endmessage(&buf);
pq_flush();
- /*
- * Initialize position to the last ack'ed one, then the xlog records begin
- * to be shipped from that position.
- */
- logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
- false,
- logical_read_xlog_page,
- WalSndPrepareWrite,
- WalSndWriteData,
- WalSndUpdateProgress);
/* Start reading WAL from the oldest required WAL. */
logical_startptr = MyReplicationSlot->data.restart_lsn;
--
2.6.4
On 2018-Aug-01, Dave Cramer wrote:
See attached patch which fixes it, and adds a test for it.
Pushed, thanks.
--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On 1 August 2018 at 23:11, Alvaro Herrera <alvherre@2ndquadrant.com> wrote:
On 2018-Aug-01, Dave Cramer wrote:
See attached patch which fixes it, and adds a test for it.
Pushed, thanks.
Thanks
--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services