pg_createsubscriber: drop pre-existing subscriptions from the converted node
Dear Hackers,
This is a follow-up thread for pg_createsubscriber [1]. I started a new thread
since there has been no activity on the original one.
## Problem
Assume a cascading replication setup like the one below:
node A --(logical replication)--> node B --(streaming replication)--> node C
In this case, the subscriptions also exist on node C, but node C does not try to
connect to node A because the logical replication launcher and workers are not
launched while it is a standby. After the conversion, node C becomes a subscriber
of node B, but the subscriptions toward node A remain. Therefore, another worker
that tries to connect to node A will be launched, raising an ERROR [2]. This
failure may occur even during the conversion.
## Solution
The easiest solution is to drop the pre-existing subscriptions from the converted
node. To avoid establishing connections during the conversion, slot_name is first
set to NONE on the primary, and then the subscriptions are dropped on the standby.
The original setting is restored on the primary afterwards.
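For readers following along, the order of operations described above can be sketched as the SQL each phase would issue for one subscription. This is a hypothetical illustration, not code from the patch: `quote_with()` and `build_v1_plan()` are invented names, and quoting every name (identifiers with double quotes, the slot name as a single-quoted literal, assuming standard_conforming_strings = on) is an assumption the patch itself does not make. Phase 1 runs on the primary before promotion, so the slot_name = NONE change is expected to reach the standby through streaming replication; because the subscription then has no slot, DROP SUBSCRIPTION on the converted node does not need to contact the publisher.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: copy name into buf surrounded by the quote character q,
 * doubling any embedded q. Use '"' for identifiers and '\'' for string
 * literals (assumes standard_conforming_strings = on). Returns 0 on success,
 * -1 if buf is too small.
 */
static int
quote_with(const char *name, char q, char *buf, size_t len)
{
	size_t		j = 0;

	assert(name != NULL && buf != NULL);
	if (len < 3)
		return -1;
	buf[j++] = q;
	for (const char *p = name; *p != '\0'; p++)
	{
		/* keep room for up to two chars, the closing quote and the NUL */
		if (j + 3 >= len)
			return -1;
		if (*p == q)
			buf[j++] = q;
		buf[j++] = *p;
	}
	buf[j++] = q;
	buf[j] = '\0';
	return 0;
}

/*
 * Hypothetical: write the three phases for one subscription into out.
 * Returns 0 on success, -1 if a buffer is too small.
 */
static int
build_v1_plan(const char *subname, const char *slotname,
			  char *out, size_t outlen)
{
	char		qsub[256];
	char		qslot[256];
	int			n;

	if (quote_with(subname, '"', qsub, sizeof(qsub)) != 0 ||
		quote_with(slotname, '\'', qslot, sizeof(qslot)) != 0)
		return -1;

	n = snprintf(out, outlen,
				 /* 1. primary: stop the worker and detach the slot */
				 "ALTER SUBSCRIPTION %s DISABLE;\n"
				 "ALTER SUBSCRIPTION %s SET (slot_name = NONE);\n"
				 /* 2. converted node: drop; no slot, so no remote connection */
				 "DROP SUBSCRIPTION %s;\n"
				 /* 3. primary: restore the slot, then restart the worker */
				 "ALTER SUBSCRIPTION %s SET (slot_name = %s);\n"
				 "ALTER SUBSCRIPTION %s ENABLE;\n",
				 qsub, qsub, qsub, qsub, qslot, qsub);
	return (n < 0 || (size_t) n >= outlen) ? -1 : 0;
}
```

Restoring slot_name before ENABLE matters: ALTER SUBSCRIPTION ... SET (slot_name = ...) is only accepted while the subscription is disabled.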
The attached patch implements the idea. A test script is also included, but I am
not sure whether it should be committed to HEAD.
BTW, I found that LogicalRepInfo.oid is never used. If needed, I can create
another patch to remove the attribute.
What do you think?
[1]: /messages/by-id/CAA4eK1J22UEfrqx222h5j9DQ7nxGrTbAa_BC+=mQXdXs-RCsew@mail.gmail.com
[2]: /messages/by-id/CANhcyEWvimA1-f6hSrA=9qkfR5SonFb56b36M++vT=LiFj=76g@mail.gmail.com
Best Regards,
Hayato Kuroda
FUJITSU LIMITED
https://www.fujitsu.com/
Attachments:
0001-pg_createsubscriber-Drop-pre-existing-subscriptions-.patch (application/octet-stream)
From 683c4fc3e767a068eca0c85561186c1322287d69 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Fri, 21 Jun 2024 10:55:46 +0000
Subject: [PATCH] pg_createsubscriber: Drop pre-existing subscriptions from the
converted instance
Previously, we did nothing for pre-existing subscriptions on the streaming
replication cluster. However, after the conversion, the downstream node will try
to connect to another publisher node specified by the pre-existing subscriptions,
which will cause an ERROR. To avoid failure, drop such subscriptions from the
converted node at the end of this command.
---
src/bin/pg_basebackup/pg_createsubscriber.c | 167 +++++++++++++++++-
.../t/041_pg_createsubscriber_added.pl | 118 +++++++++++++
2 files changed, 283 insertions(+), 2 deletions(-)
create mode 100644 src/bin/pg_basebackup/t/041_pg_createsubscriber_added.pl
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 1138c20e56..b5f62c7c54 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -57,6 +57,9 @@ struct LogicalRepInfo
bool made_replslot; /* replication slot was created */
bool made_publication; /* publication was created */
+ int num_subscriptions; /* number of pre-existing subscriptions */
+ char **pre_subnames; /* subscription name */
+ char **pre_slotnames; /* used replication slots name */
};
static void cleanup_objects_atexit(void);
@@ -78,7 +81,7 @@ static bool server_is_in_recovery(PGconn *conn);
static char *generate_object_name(PGconn *conn);
static void check_publisher(const struct LogicalRepInfo *dbinfo);
static char *setup_publisher(struct LogicalRepInfo *dbinfo);
-static void check_subscriber(const struct LogicalRepInfo *dbinfo);
+static void check_subscriber(struct LogicalRepInfo *dbinfo);
static void setup_subscriber(struct LogicalRepInfo *dbinfo,
const char *consistent_lsn);
static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
@@ -103,6 +106,11 @@ static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *
const char *lsn);
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
+static void obtain_and_disable_pre_existing_subscriptions(struct LogicalRepInfo *dbinfo);
+static void disable_subscription(PGconn *conn, const char *subname);
+static void enable_subscirptions_on_publisher(struct LogicalRepInfo *dbinfo);
+static void drop_pre_existing_subscriptions(struct LogicalRepInfo *dbinfo);
+
#define USEC_PER_SEC 1000000
#define WAIT_INTERVAL 1 /* 1 second */
@@ -912,6 +920,152 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
exit(1);
}
+/*
+ * Connect to the primary to obtain a list of subscriptions and disable them.
+ * They will be enabled again on the primary and dropped on the converted node.
+ */
+static void
+obtain_and_disable_pre_existing_subscriptions(struct LogicalRepInfo *dbinfo)
+{
+ PQExpBuffer query = createPQExpBuffer();
+
+ for (int i = 0; i < num_dbs; i++)
+ {
+ PGconn *conn;
+ PGresult *res;
+ int ntups;
+
+ /* Connect to publisher */
+ conn = connect_database(dbinfo[i].pubconninfo, true);
+
+ appendPQExpBuffer(query,
+ "SELECT s.subname, s.subslotname FROM pg_catalog.pg_subscription s "
+ "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
+ "WHERE d.datname = '%s'",
+ dbinfo[i].dbname);
+
+ res = PQexec(conn, query->data);
+ ntups = PQntuples(res);
+
+ dbinfo[i].num_subscriptions = ntups;
+
+ if (ntups > 0)
+ {
+ dbinfo[i].pre_subnames = pg_malloc_array(char *, ntups);
+ dbinfo[i].pre_slotnames = pg_malloc_array(char *, ntups);
+
+ for (int j = 0; j < ntups; j++)
+ {
+ /*
+ * Store name of subscriptions and specified slots. They are
+ * used to enable them again on the primary node.
+ */
+ dbinfo[i].pre_subnames[j] = pg_strdup(PQgetvalue(res, j, 0));
+ dbinfo[i].pre_slotnames[j] = pg_strdup(PQgetvalue(res, j, 1));
+
+ disable_subscription(conn, dbinfo[i].pre_subnames[j]);
+ }
+ }
+
+ resetPQExpBuffer(query);
+ PQclear(res);
+ disconnect_database(conn, false);
+ }
+
+ destroyPQExpBuffer(query);
+}
+
+/*
+ * Disable the given subscription. After that, the subscription will be altered
+ * to set slot_name = NONE. This is needed for the standby server to allow
+ * subscription drops.
+ */
+static void
+disable_subscription(PGconn *conn, const char *subname)
+{
+ PQExpBuffer query = createPQExpBuffer();
+
+ Assert(conn != NULL);
+
+ /* Disable the given subscription */
+ appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE", subname);
+ PQclear(PQexec(conn, query->data));
+ resetPQExpBuffer(query);
+
+ /* ...and alter the slot_name to NONE */
+ appendPQExpBuffer(query,
+ "ALTER SUBSCRIPTION %s SET (slot_name = NONE)",
+ subname);
+ PQclear(PQexec(conn, query->data));
+
+ destroyPQExpBuffer(query);
+}
+
+/*
+ * Enable pre-existing subscriptions on the primary
+ */
+static void
+enable_subscirptions_on_publisher(struct LogicalRepInfo *dbinfo)
+{
+ PQExpBuffer query = createPQExpBuffer();
+
+ for (int i = 0; i < num_dbs; i++)
+ {
+ PGconn *conn;
+ struct LogicalRepInfo info = dbinfo[i];
+
+ /* Connect to publisher */
+ conn = connect_database(info.pubconninfo, false);
+
+ for (int j = 0; j < info.num_subscriptions; j++)
+ {
+ /* Restore the slot_name parameter */
+ appendPQExpBuffer(query,
+ "ALTER SUBSCRIPTION %s SET (slot_name = %s)",
+ info.pre_subnames[j], info.pre_slotnames[j]);
+ PQclear(PQexec(conn, query->data));
+ resetPQExpBuffer(query);
+
+ /* ...and them enable the subscription */
+ appendPQExpBuffer(query,
+ "ALTER SUBSCRIPTION %s ENABLE",
+ info.pre_subnames[j]);
+ PQclear(PQexec(conn, query->data));
+ resetPQExpBuffer(query);
+ }
+ }
+
+ destroyPQExpBuffer(query);
+}
+
+/*
+ * Drop pre-existing subscriptions on the standby
+ */
+static void
+drop_pre_existing_subscriptions(struct LogicalRepInfo *dbinfo)
+{
+ PQExpBuffer query = createPQExpBuffer();
+
+ for (int i = 0; i < num_dbs; i++)
+ {
+ PGconn *conn;
+ struct LogicalRepInfo info = dbinfo[i];
+
+ /* Connect to subscriber */
+ conn = connect_database(info.subconninfo, false);
+
+ for (int j = 0; j < info.num_subscriptions; j++)
+ {
+ appendPQExpBuffer(query,
+ "DROP SUBSCRIPTION %s;", info.pre_subnames[j]);
+ PQexec(conn, query->data);
+ resetPQExpBuffer(query);
+ }
+ }
+
+ destroyPQExpBuffer(query);
+}
+
/*
* Is the standby server ready for logical replication?
*
@@ -924,7 +1078,7 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
* will be broken at the end of this process (due to pg_resetwal).
*/
static void
-check_subscriber(const struct LogicalRepInfo *dbinfo)
+check_subscriber(struct LogicalRepInfo *dbinfo)
{
PGconn *conn;
PGresult *res;
@@ -2067,6 +2221,9 @@ main(int argc, char **argv)
/* Check if the primary server is ready for logical replication */
check_publisher(dbinfo);
+ /* Check the existence of pre-existing subscriptions, and disable once */
+ obtain_and_disable_pre_existing_subscriptions(dbinfo);
+
/*
* Stop the target server. The recovery process requires that the server
* reaches a consistent state before targeting the recovery stop point.
@@ -2109,9 +2266,15 @@ main(int argc, char **argv)
/* Remove primary_slot_name if it exists on primary */
drop_primary_replication_slot(dbinfo, primary_slot_name);
+ /* Enable pre-subscriptions on primary */
+ enable_subscirptions_on_publisher(dbinfo);
+
/* Remove failover replication slots if they exist on subscriber */
drop_failover_replication_slots(dbinfo);
+ /* Drop pre-subscriptions on standby */
+ drop_pre_existing_subscriptions(dbinfo);
+
/* Stop the subscriber */
pg_log_info("stopping the subscriber");
stop_standby_server(subscriber_dir);
diff --git a/src/bin/pg_basebackup/t/041_pg_createsubscriber_added.pl b/src/bin/pg_basebackup/t/041_pg_createsubscriber_added.pl
new file mode 100644
index 0000000000..59cddb2fc1
--- /dev/null
+++ b/src/bin/pg_basebackup/t/041_pg_createsubscriber_added.pl
@@ -0,0 +1,118 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Construct a cascading replication system like:
+#
+# node_a --(logical replication)--> node_b --(streaming replication)--> node_c
+#
+
+# Set up node A as publisher
+my $node_a = PostgreSQL::Test::Cluster->new('node_a');
+my $aconnstr = $node_a->connstr;
+$node_a->init(allows_streaming => 'logical');
+$node_a->start;
+
+# On node A
+# - create databases
+# - create test tables
+# - insert a row
+# - create publications
+$node_a->safe_psql(
+ 'postgres', q(
+ CREATE DATABASE pg1;
+ CREATE DATABASE pg2;
+));
+$node_a->safe_psql('pg1', 'CREATE TABLE tbl1 (a text)');
+$node_a->safe_psql('pg1', "INSERT INTO tbl1 VALUES('first row')");
+$node_a->safe_psql('pg2', 'CREATE TABLE tbl2 (a text)');
+$node_a->safe_psql('pg1', 'CREATE PUBLICATION pub1_pg1 FOR ALL TABLES');
+$node_a->safe_psql('pg1', 'CREATE PUBLICATION pub2_pg1');
+$node_a->safe_psql('pg2', 'CREATE PUBLICATION pub1_pg2 FOR ALL TABLES');
+$node_a->safe_psql('pg2', 'CREATE PUBLICATION pub2_pg2');
+
+# Set up node B as subscriber/primary
+my $node_b = PostgreSQL::Test::Cluster->new('node_b');
+my $bconnstr = $node_b->connstr;
+$node_b->init(allows_streaming => 'logical');
+$node_b->start;
+
+# On node B
+# - create databases
+# - create subscriptions
+$node_b->safe_psql(
+ 'postgres', q(
+ CREATE DATABASE pg1;
+ CREATE DATABASE pg2;
+));
+$node_b->safe_psql('pg1', 'CREATE TABLE tbl1 (a text)');
+$node_b->safe_psql('pg2', 'CREATE TABLE tbl2 (a text)');
+$node_b->safe_psql('pg1',
+ "CREATE SUBSCRIPTION sub1_pg1 CONNECTION '$aconnstr dbname=pg1' PUBLICATION pub1_pg1");
+$node_b->safe_psql('pg1',
+ "CREATE SUBSCRIPTION sub2_pg1 CONNECTION '$aconnstr dbname=pg1' PUBLICATION pub2_pg1");
+$node_b->safe_psql('pg2',
+ "CREATE SUBSCRIPTION sub1_pg2 CONNECTION '$aconnstr dbname=pg2' PUBLICATION pub1_pg2");
+$node_b->safe_psql('pg2',
+ "CREATE SUBSCRIPTION sub2_pg2 CONNECTION '$aconnstr dbname=pg2' PUBLICATION pub2_pg2");
+
+$node_b->wait_for_subscription_sync($node_a, 'sub1_pg1');
+$node_b->wait_for_subscription_sync($node_a, 'sub1_pg2');
+
+# Confirms logical replication works well
+my $result = $node_b->safe_psql('pg1', 'SELECT * FROM tbl1;');
+is($result, 'first row', 'check logical replication works well');
+
+# Set up node C as standby
+$node_b->backup('backup_1');
+my $node_c = PostgreSQL::Test::Cluster->new('node_c');
+$node_c->init_from_backup($node_b, 'backup_1', has_streaming => 1);
+$node_c->append_conf(
+ 'postgresql.conf', qq[
+primary_conninfo = '$bconnstr'
+]);
+$node_c->set_standby_mode();
+$node_c->start;
+
+$node_b->wait_for_replay_catchup($node_c);
+
+# Confirms streaming replication works well
+$result = $node_c->safe_psql('pg1', 'SELECT * FROM tbl1;');
+is($result, 'first row', 'check streaming replication works well');
+
+$node_c->stop;
+
+# Run pg_createsubscriber
+command_ok(
+ [
+ 'pg_createsubscriber', '--verbose',
+ '--recovery-timeout', "$PostgreSQL::Test::Utils::timeout_default",
+ '--verbose',
+ '--pgdata', $node_c->data_dir,
+ '--publisher-server', $bconnstr,
+ '--socket-directory', $node_c->host,
+ '--subscriber-port', $node_c->port,
+ '--database', 'pg1',
+ '--database', 'pg2'
+ ],
+ 'run pg_createsubscriber on node S');
+
+# Confirms pre-existing subscriptions are removed from the converted node
+$node_c->start;
+$result = $node_c->safe_psql('pg1',
+ "SELECT subname FROM pg_subscription WHERE subname NOT LIKE 'pg_createsubscriber%';");
+is($result, '', 'check subscriptions are removed');
+
+# Confirms pre-existing subscriptions still exist on the primary node
+$result = $node_b->safe_psql('pg1',
+ "SELECT subname, subenabled, subslotname FROM pg_subscription WHERE subname NOT LIKE 'pg_createsubscriber%' ORDER BY subname;");
+is($result, 'sub1_pg1|t|sub1_pg1
+sub1_pg2|t|sub1_pg2
+sub2_pg1|t|sub2_pg1
+sub2_pg2|t|sub2_pg2', 'check subscriptions still exist');
+
+done_testing();
--
2.43.0
On Fri, 21 Jun 2024 at 16:51, Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
> Dear Hackers,
> This is a follow-up thread for pg_createsubscriber [1]. I started a new thread
> since there is no activity around here.
> ## Problem
> Assuming that there is a cascading replication like below:
> node A --(logical replication)--> node B --(streaming replication)--> node C
> In this case, subscriptions exist even on node C, but it does not try to connect
> to node A because the logical replication launcher/worker won't be launched.
> After the conversion, node C becomes a subscriber for node B, and the subscription
> toward node A remains. Therefore, another worker that tries to connect to node A
> will be launched, raising an ERROR [2]. This failure may occur even during the
> conversion.
> ## Solution
> The easiest solution is to drop pre-existing subscriptions from the converted node.
> To avoid establishing connections during the conversion, slot_name is set to NONE
> on the primary first, then drop on the standby. The setting will be restored on the
> primary node.
> Attached patch implements the idea. Test script is also included, but not sure it should
> be on the HEAD
A few comments:
1) Should we do this only for enabled subscriptions? Otherwise, subscriptions that
were disabled will be enabled after running pg_createsubscriber:
> +obtain_and_disable_pre_existing_subscriptions(struct LogicalRepInfo *dbinfo)
> +{
> + PQExpBuffer query = createPQExpBuffer();
> +
> + for (int i = 0; i < num_dbs; i++)
> + {
> + PGconn *conn;
> + PGresult *res;
> + int ntups;
> +
> + /* Connect to publisher */
> + conn = connect_database(dbinfo[i].pubconninfo, true);
> +
> + appendPQExpBuffer(query,
> + "SELECT s.subname, s.subslotname FROM pg_catalog.pg_subscription s "
> + "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
> + "WHERE d.datname = '%s'",
> + dbinfo[i].dbname);
> +
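One possible way to address comment 1, sketched under assumptions: fetch `s.subenabled` together with `subname` and `subslotname` in the catalog query above, remember it per subscription, and when restoring on the primary always put slot_name back but re-enable only the subscriptions that were enabled to begin with. Disabled subscriptions still need slot_name = NONE so that they can be dropped on the standby without contacting the publisher, which is why the slot is restored for all of them. `struct PreSub`, `append()` and `build_restore_commands()` are hypothetical names, and subscription names are assumed not to need quoting.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical record of one pre-existing subscription on the primary. */
struct PreSub
{
	const char *subname;
	const char *slotname;
	bool		enabled;		/* pg_subscription.subenabled before the tool ran */
};

/* Append formatted text at out + *used; returns -1 if it would not fit. */
static int
append(char *out, size_t outlen, size_t *used, const char *fmt, ...)
{
	va_list		ap;
	int			n;

	va_start(ap, fmt);
	n = vsnprintf(out + *used, outlen - *used, fmt, ap);
	va_end(ap);
	if (n < 0 || (size_t) n >= outlen - *used)
		return -1;
	*used += (size_t) n;
	return 0;
}

/*
 * Build the commands that restore pre-existing subscriptions on the primary.
 * slot_name is restored for every subscription, but ENABLE is emitted only
 * for those that were enabled originally. Returns the number of ENABLE
 * commands written, or -1 if out is too small.
 */
static int
build_restore_commands(const struct PreSub *subs, int nsubs,
					   char *out, size_t outlen)
{
	size_t		used = 0;
	int			nenabled = 0;

	if (outlen == 0)
		return -1;
	out[0] = '\0';
	for (int i = 0; i < nsubs; i++)
	{
		if (append(out, outlen, &used,
				   "ALTER SUBSCRIPTION %s SET (slot_name = '%s');\n",
				   subs[i].subname, subs[i].slotname) != 0)
			return -1;
		if (!subs[i].enabled)
			continue;			/* leave user-disabled subscriptions disabled */
		if (append(out, outlen, &used, "ALTER SUBSCRIPTION %s ENABLE;\n",
				   subs[i].subname) != 0)
			return -1;
		nenabled++;
	}
	return nenabled;
}
```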
2) disconnect_database() is not called here; shouldn't the connection be closed?
> +drop_pre_existing_subscriptions(struct LogicalRepInfo *dbinfo)
> +{
> + PQExpBuffer query = createPQExpBuffer();
> +
> + for (int i = 0; i < num_dbs; i++)
> + {
> + PGconn *conn;
> + struct LogicalRepInfo info = dbinfo[i];
> +
> + /* Connect to subscriber */
> + conn = connect_database(info.subconninfo, false);
> +
> + for (int j = 0; j < info.num_subscriptions; j++)
> + {
> + appendPQExpBuffer(query,
> + "DROP SUBSCRIPTION %s;", info.pre_subnames[j]);
> + PQexec(conn, query->data);
> + resetPQExpBuffer(query);
> + }
> + }
3) Similarly here too:
> +static void
> +enable_subscirptions_on_publisher(struct LogicalRepInfo *dbinfo)
> +{
> + PQExpBuffer query = createPQExpBuffer();
> +
> + for (int i = 0; i < num_dbs; i++)
> + {
> + PGconn *conn;
> + struct LogicalRepInfo info = dbinfo[i];
> +
> + /* Connect to publisher */
> + conn = connect_database(info.pubconninfo, false);
4) "them" should be "then" here:
> + /* ...and them enable the subscription */
> + appendPQExpBuffer(query,
> + "ALTER SUBSCRIPTION %s ENABLE",
> + info.pre_subnames[j]);
> + PQclear(PQexec(conn, query->data));
> + resetPQExpBuffer(query);
> BTW, I found that LogicalRepInfo.oid won't be used. If needed, I can create
> another patch to remove the attribute.
I was able to compile without it, so I think it can be removed.
Regards,
Vignesh
On Fri, Jun 21, 2024 at 4:51 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
> This is a follow-up thread for pg_createsubscriber [1]. I started a new thread
> since there is no activity around here.
> ## Problem
> Assuming that there is a cascading replication like below:
> node A --(logical replication)--> node B --(streaming replication)--> node C
> In this case, subscriptions exist even on node C, but it does not try to connect
> to node A because the logical replication launcher/worker won't be launched.
> After the conversion, node C becomes a subscriber for node B, and the subscription
> toward node A remains. Therefore, another worker that tries to connect to node A
> will be launched, raising an ERROR [2]. This failure may occur even during the
> conversion.
> ## Solution
> The easiest solution is to drop pre-existing subscriptions from the converted node.
> To avoid establishing connections during the conversion, slot_name is set to NONE
> on the primary first, then drop on the standby. The setting will be restored on the
> primary node.
It seems disabling subscriptions on the primary can make the primary
stop functioning for some duration of time. I feel we need some
solution where after converting to subscriber, we disable and drop
pre-existing subscriptions. One idea could be that we use the list of
new subscriptions created by the tool such that any subscription not
existing in that list will be dropped.
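The idea above amounts to a set difference: list the subscriptions of each target database on the converted node and drop every one the tool did not create itself. The helper below is a hypothetical sketch, not pg_createsubscriber code; in practice `existing` would come from pg_subscription on the converted node and `created` from the names the tool generated, and the names used in the test are made up.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Return true if name appears in list[0..n-1]. */
static bool
name_in_list(const char *name, const char *const *list, size_t n)
{
	for (size_t i = 0; i < n; i++)
	{
		if (strcmp(name, list[i]) == 0)
			return true;
	}
	return false;
}

/*
 * Hypothetical: copy into to_drop every subscription found on the converted
 * node that is not in the list of subscriptions created by the tool, and
 * return how many were copied. to_drop must have room for nexisting entries.
 */
static size_t
subscriptions_to_drop(const char *const *existing, size_t nexisting,
					  const char *const *created, size_t ncreated,
					  const char **to_drop)
{
	size_t		ndrop = 0;

	for (size_t i = 0; i < nexisting; i++)
	{
		if (!name_in_list(existing[i], created, ncreated))
			to_drop[ndrop++] = existing[i];
	}
	return ndrop;
}
```

A linear scan is used because the number of subscriptions per database is expected to be small; a sorted list or hash set would only matter at much larger counts.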
Shouldn't this be an open item for PG17?
--
With Regards,
Amit Kapila.
Dear Amit, Vignesh,
Thanks for the comments!
> It seems disabling subscriptions on the primary can make the primary
> stop functioning for some duration of time. I feel we need some
> solution where after converting to subscriber, we disable and drop
> pre-existing subscriptions. One idea could be that we use the list of
> new subscriptions created by the tool such that any subscription not
> existing in that list will be dropped.
Previously I avoided an approach like yours because there was a chance that the
converted node could connect to another publisher. But per off-list discussion,
we can prevent this by setting max_logical_replication_workers = 0, so I
refactored the patch with that approach. Note that the GUC is checked during the
verification phase, so a parameter is added to start_standby_server() to choose
whether logical replication workers may start.
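A simplified sketch of the refactoring described above: the second start of the standby passes `-c max_logical_replication_workers=0` through pg_ctl's `-o` option (which pg_ctl accepts more than once), so no logical replication worker can be launched while the pre-existing subscriptions still exist. The first start keeps the user's setting because the GUC is validated during the verification phase. `build_start_cmd()` is a hypothetical stand-in for start_standby_server(), and the real command line carries more options than shown here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical, simplified version of the pg_ctl command built by
 * start_standby_server(). When restricted_worker is true, the server is
 * started with max_logical_replication_workers=0 so that no logical
 * replication worker can be launched. Returns 0 on success, -1 if out is
 * too small.
 */
static int
build_start_cmd(const char *pg_ctl_path, const char *datadir,
				bool restricted_worker, char *out, size_t outlen)
{
	int			n;

	n = snprintf(out, outlen, "\"%s\" start -D \"%s\"%s",
				 pg_ctl_path, datadir,
				 restricted_worker
				 ? " -o \"-c max_logical_replication_workers=0\""
				 : "");
	return (n < 0 || (size_t) n >= outlen) ? -1 : 0;
}
```

Usage mirrors the v2 patch: the verification start uses `restricted_worker = false` and the start that waits for promotion uses `restricted_worker = true`.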
Most of Vignesh's comments no longer apply due to the code change, but I checked
that the issues he pointed out do not recur in the new version. Also, 0001 was
created to remove an unused attribute.
> Shouldn't this be an open item for PG17?
Added this thread to the wiki page.
Best Regards,
Hayato Kuroda
FUJITSU LIMITED
https://www.fujitsu.com/
Attachments:
v2-0001-pg_createsubscriber-remove-unused-attribute.patch (application/octet-stream)
From a7ac110316909222dee9d92651ca0eb440a5376d Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Thu, 27 Jun 2024 06:04:55 +0000
Subject: [PATCH v2 1/2] pg_createsubscriber: remove unused attribute
---
src/bin/pg_basebackup/pg_createsubscriber.c | 1 -
1 file changed, 1 deletion(-)
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 1138c20e56..63317791f8 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -47,7 +47,6 @@ struct CreateSubscriberOptions
struct LogicalRepInfo
{
- Oid oid; /* database OID */
char *dbname; /* database name */
char *pubconninfo; /* publisher connection string */
char *subconninfo; /* subscriber connection string */
--
2.43.0
v2-0002-pg_createsubscriber-Drop-pre-existing-subscriptio.patch (application/octet-stream)
From 8a1e7c8a8eff38068eeda86ce4ccce9f5e209ba8 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Fri, 21 Jun 2024 10:55:46 +0000
Subject: [PATCH v2 2/2] pg_createsubscriber: Drop pre-existing subscriptions
from the converted instance
Previously, we did nothing for pre-existing subscriptions on the streaming
replication cluster. However, after the conversion, the downstream node will try
to connect to another publisher node specified by the pre-existing subscriptions,
which will cause an ERROR. To avoid failure, drop such subscriptions from the
converted node.
---
src/bin/pg_basebackup/pg_createsubscriber.c | 96 +++++++++++++-
.../t/041_pg_createsubscriber_added.pl | 118 ++++++++++++++++++
2 files changed, 210 insertions(+), 4 deletions(-)
create mode 100644 src/bin/pg_basebackup/t/041_pg_createsubscriber_added.pl
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 63317791f8..5f34f2d501 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -91,7 +91,8 @@ static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
const char *slot_name);
static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
static void start_standby_server(const struct CreateSubscriberOptions *opt,
- bool restricted_access);
+ bool restricted_access,
+ bool restricted_worker);
static void stop_standby_server(const char *datadir);
static void wait_for_end_recovery(const char *conninfo,
const struct CreateSubscriberOptions *opt);
@@ -101,6 +102,9 @@ static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinf
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
const char *lsn);
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
+static void check_and_drop_existing_subscriptions(PGconn *conn,
+ const struct LogicalRepInfo *dbinfo);
+static void drop_existing_subscriptions(PGconn *conn, const char *subname);
#define USEC_PER_SEC 1000000
#define WAIT_INTERVAL 1 /* 1 second */
@@ -1017,6 +1021,75 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
exit(1);
}
+/*
+ * Drop a specified subscription. This is used for resolving the duplication of
+ * subscriptions on the primary and standby, so no need to drop the replication
+ * slot. It will be used by the publisher node.
+ */
+static void
+drop_existing_subscriptions(PGconn *conn, const char *subname)
+{
+ PQExpBuffer query = createPQExpBuffer();
+ PGresult *res;
+
+ Assert(conn != NULL);
+
+ /*
+ * Construct a query string. These commands are allowed to be executed
+ * within a transaction.
+ */
+ appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
+ subname);
+ appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
+ subname);
+ appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
+
+ res = PQexec(conn, query->data);
+
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ pg_log_error("could not drop a subscription \"%s\" settings: %s",
+ subname, PQresultErrorMessage(res));
+ disconnect_database(conn, true);
+ }
+
+ PQclear(res);
+ destroyPQExpBuffer(query);
+}
+
+/*
+ * Find out subscriptions not created by pg_createsubscriber, and drop them
+ */
+static void
+check_and_drop_existing_subscriptions(PGconn *conn,
+ const struct LogicalRepInfo *dbinfo)
+{
+ PQExpBuffer query = createPQExpBuffer();
+ PGresult *res;
+
+ Assert(conn != NULL);
+
+ appendPQExpBuffer(query,
+ "SELECT s.subname FROM pg_catalog.pg_subscription s "
+ "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
+ "WHERE d.datname = '%s'",
+ dbinfo->dbname);
+ res = PQexec(conn, query->data);
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not obtain pre-existing subscriptions: %s",
+ PQresultErrorMessage(res));
+ disconnect_database(conn, true);
+ }
+
+ for (int i = 0; i < PQntuples(res); i++)
+ drop_existing_subscriptions(conn, PQgetvalue(res, i, 0));
+
+ PQclear(res);
+ destroyPQExpBuffer(query);
+}
+
/*
* Create the subscriptions, adjust the initial location for logical
* replication and enable the subscriptions. That's the last step for logical
@@ -1032,6 +1105,15 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
/* Connect to subscriber. */
conn = connect_database(dbinfo[i].subconninfo, true);
+ /*
+ * If the streaming replication cluster has subscriptions, a converted
+ * node may connect to another publisher node. This could cause an
+ * ERROR due to a slot acquirement miss or data inconsistency because
+ * only the converted node receives changes. To avoid it, drop
+ * pre-existing subscriptions from the converted node.
+ */
+ check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
+
/*
* Since the publication was created before the consistent LSN, it is
* available on the subscriber when the physical replica is promoted.
@@ -1306,7 +1388,8 @@ pg_ctl_status(const char *pg_ctl_cmd, int rc)
}
static void
-start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access)
+start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
+ bool restricted_worker)
{
PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
int rc;
@@ -1334,6 +1417,11 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
if (opt->config_file != NULL)
appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
opt->config_file);
+
+ /* Suppress to start logical replication if requested */
+ if (restricted_worker)
+ appendPQExpBuffer(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
+
pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
rc = system(pg_ctl_cmd->data);
pg_ctl_status(pg_ctl_cmd->data, rc);
@@ -2058,7 +2146,7 @@ main(int argc, char **argv)
* transformation steps.
*/
pg_log_info("starting the standby with command-line options");
- start_standby_server(&opt, true);
+ start_standby_server(&opt, true, false);
/* Check if the standby server is ready for logical replication */
check_subscriber(dbinfo);
@@ -2092,7 +2180,7 @@ main(int argc, char **argv)
* until accepting connections.
*/
pg_log_info("starting the subscriber");
- start_standby_server(&opt, true);
+ start_standby_server(&opt, true, true);
/* Waiting the subscriber to be promoted */
wait_for_end_recovery(dbinfo[0].subconninfo, &opt);
diff --git a/src/bin/pg_basebackup/t/041_pg_createsubscriber_added.pl b/src/bin/pg_basebackup/t/041_pg_createsubscriber_added.pl
new file mode 100644
index 0000000000..59cddb2fc1
--- /dev/null
+++ b/src/bin/pg_basebackup/t/041_pg_createsubscriber_added.pl
@@ -0,0 +1,118 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Construct a cascading replication system like:
+#
+# node_a --(logical replication)--> node_b --(streaming replication)--> node_c
+#
+
+# Set up node A as publisher
+my $node_a = PostgreSQL::Test::Cluster->new('node_a');
+my $aconnstr = $node_a->connstr;
+$node_a->init(allows_streaming => 'logical');
+$node_a->start;
+
+# On node A
+# - create databases
+# - create test tables
+# - insert a row
+# - create publications
+$node_a->safe_psql(
+ 'postgres', q(
+ CREATE DATABASE pg1;
+ CREATE DATABASE pg2;
+));
+$node_a->safe_psql('pg1', 'CREATE TABLE tbl1 (a text)');
+$node_a->safe_psql('pg1', "INSERT INTO tbl1 VALUES('first row')");
+$node_a->safe_psql('pg2', 'CREATE TABLE tbl2 (a text)');
+$node_a->safe_psql('pg1', 'CREATE PUBLICATION pub1_pg1 FOR ALL TABLES');
+$node_a->safe_psql('pg1', 'CREATE PUBLICATION pub2_pg1');
+$node_a->safe_psql('pg2', 'CREATE PUBLICATION pub1_pg2 FOR ALL TABLES');
+$node_a->safe_psql('pg2', 'CREATE PUBLICATION pub2_pg2');
+
+# Set up node B as subscriber/primary
+my $node_b = PostgreSQL::Test::Cluster->new('node_b');
+my $bconnstr = $node_b->connstr;
+$node_b->init(allows_streaming => 'logical');
+$node_b->start;
+
+# On node B
+# - create databases
+# - create subscriptions
+$node_b->safe_psql(
+ 'postgres', q(
+ CREATE DATABASE pg1;
+ CREATE DATABASE pg2;
+));
+$node_b->safe_psql('pg1', 'CREATE TABLE tbl1 (a text)');
+$node_b->safe_psql('pg2', 'CREATE TABLE tbl2 (a text)');
+$node_b->safe_psql('pg1',
+ "CREATE SUBSCRIPTION sub1_pg1 CONNECTION '$aconnstr dbname=pg1' PUBLICATION pub1_pg1");
+$node_b->safe_psql('pg1',
+ "CREATE SUBSCRIPTION sub2_pg1 CONNECTION '$aconnstr dbname=pg1' PUBLICATION pub2_pg1");
+$node_b->safe_psql('pg2',
+ "CREATE SUBSCRIPTION sub1_pg2 CONNECTION '$aconnstr dbname=pg2' PUBLICATION pub1_pg2");
+$node_b->safe_psql('pg2',
+ "CREATE SUBSCRIPTION sub2_pg2 CONNECTION '$aconnstr dbname=pg2' PUBLICATION pub2_pg2");
+
+$node_b->wait_for_subscription_sync($node_a, 'sub1_pg1');
+$node_b->wait_for_subscription_sync($node_a, 'sub1_pg2');
+
+# Confirms logical replication works well
+my $result = $node_b->safe_psql('pg1', 'SELECT * FROM tbl1;');
+is($result, 'first row', 'check logical replication works well');
+
+# Set up node C as standby
+$node_b->backup('backup_1');
+my $node_c = PostgreSQL::Test::Cluster->new('node_c');
+$node_c->init_from_backup($node_b, 'backup_1', has_streaming => 1);
+$node_c->append_conf(
+ 'postgresql.conf', qq[
+primary_conninfo = '$bconnstr'
+]);
+$node_c->set_standby_mode();
+$node_c->start;
+
+$node_b->wait_for_replay_catchup($node_c);
+
+# Confirms streaming replication works well
+$result = $node_c->safe_psql('pg1', 'SELECT * FROM tbl1;');
+is($result, 'first row', 'check streaming replication works well');
+
+$node_c->stop;
+
+# Run pg_createsubscriber
+command_ok(
+ [
+ 'pg_createsubscriber', '--verbose',
+ '--recovery-timeout', "$PostgreSQL::Test::Utils::timeout_default",
+ '--verbose',
+ '--pgdata', $node_c->data_dir,
+ '--publisher-server', $bconnstr,
+ '--socket-directory', $node_c->host,
+ '--subscriber-port', $node_c->port,
+ '--database', 'pg1',
+ '--database', 'pg2'
+ ],
+ 'run pg_createsubscriber on node S');
+
+# Confirms pre-existing subscriptions are removed from the converted node
+$node_c->start;
+$result = $node_c->safe_psql('pg1',
+ "SELECT subname FROM pg_subscription WHERE subname NOT LIKE 'pg_createsubscriber%';");
+is($result, '', 'check subscriptions are removed');
+
+# Confirms pre-existing subscriptions still exist on the primary node
+$result = $node_b->safe_psql('pg1',
+ "SELECT subname, subenabled, subslotname FROM pg_subscription WHERE subname NOT LIKE 'pg_createsubscriber%' ORDER BY subname;");
+is($result, 'sub1_pg1|t|sub1_pg1
+sub1_pg2|t|sub1_pg2
+sub2_pg1|t|sub2_pg1
+sub2_pg2|t|sub2_pg2', 'check subscriptions still exist');
+
+done_testing();
--
2.43.0
On Thu, Jun 27, 2024 at 11:47 AM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
It seems disabling subscriptions on the primary can make the primary
stop functioning for some duration of time. I feel we need some
solution where, after converting to a subscriber, we disable and drop
pre-existing subscriptions. One idea could be that we use the list of
new subscriptions created by the tool such that any subscription not
existing in that list will be dropped.
Previously I avoided an approach like yours, because there was a risk that the
converted node could connect to another publisher. But per off-list discussion, we
can avoid that by setting max_logical_replication_workers = 0. I refactored the patch
with that approach. Note that the GUC is checked at the verification phase, so a
parameter is added to start_standby_server() to select the behavior.
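The refactored start-up logic can be sketched in C as below. This is a simplified, hypothetical helper (`build_start_command` is not a function in pg_createsubscriber.c): the real `start_standby_server()` builds its command with a `PQExpBuffer` and adds further options (port, socket directory, log file, config file) that are omitted here. The sketch only shows how a boolean like the patch's `restrict_logical_worker` could append `-o "-c max_logical_replication_workers=0"` for the conversion start while leaving the verification start untouched. It assumes, as the pg_ctl documentation describes, that `-o` may be given more than once.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical sketch: build a "pg_ctl start" command line for the standby.
 * When restrict_logical_worker is true, pass max_logical_replication_workers=0
 * so that no logical replication workers can be launched while the tool is
 * still setting up the new subscriber.
 * Returns the command length, or -1 on formatting error or truncation.
 */
static int
build_start_command(char *buf, size_t buflen, const char *pg_ctl_path,
                    const char *datadir, bool restrict_logical_worker)
{
    int         n;

    assert(buf != NULL && pg_ctl_path != NULL && datadir != NULL);

    n = snprintf(buf, buflen, "\"%s\" start -D \"%s\" -s%s",
                 pg_ctl_path, datadir,
                 restrict_logical_worker
                 ? " -o \"-c max_logical_replication_workers=0\""
                 : "");

    /* Treat truncation as an error rather than running a partial command. */
    return (n < 0 || (size_t) n >= buflen) ? -1 : n;
}
```

With `restrict_logical_worker` set to false this produces `"pg_ctl" start -D "/tmp/standby" -s`; with true, the same string is followed by ` -o "-c max_logical_replication_workers=0"`. Keeping the override off for the first start matters because the GUC is validated during the verification phase, so forcing it to 0 there would presumably make that check fail.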
Thanks, this is a better approach. I have changed a few comments and
made some other cosmetic changes. See attached.
Euler, Peter E., and others, do you have any comments/suggestions?
BTW, why have you created a separate test file for this test? I think
we should add a new test to one of the existing tests in
040_pg_createsubscriber. You can create a dummy subscription on node_p
and do a test similar to what we are doing in "# Create failover slot
to test its removal".
--
With Regards,
Amit Kapila.
Attachments:
v3-0001-pg_createsubscriber-Drop-pre-existing-subscriptio.patch
From d78c22f3caf3a72d4fe16d1b4346acf7800ad5ed Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Fri, 21 Jun 2024 10:55:46 +0000
Subject: [PATCH v3] pg_createsubscriber: Drop pre-existing subscriptions from
the converted instance
Previously, we did nothing for pre-existing subscriptions on the streaming
replication cluster. However, after the conversion, the downstream node will try
to connect to another publisher node specified by the pre-existing subscriptions,
which will cause an ERROR. To avoid failure, drop such subscriptions from the
converted node.
---
src/bin/pg_basebackup/pg_createsubscriber.c | 99 ++++++++++++++-
.../t/041_pg_createsubscriber_added.pl | 118 ++++++++++++++++++
2 files changed, 212 insertions(+), 5 deletions(-)
create mode 100644 src/bin/pg_basebackup/t/041_pg_createsubscriber_added.pl
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 1138c20e56..ec33d478a5 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -92,7 +92,8 @@ static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
const char *slot_name);
static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
static void start_standby_server(const struct CreateSubscriberOptions *opt,
- bool restricted_access);
+ bool restricted_access,
+ bool restrict_logical_worker);
static void stop_standby_server(const char *datadir);
static void wait_for_end_recovery(const char *conninfo,
const struct CreateSubscriberOptions *opt);
@@ -102,6 +103,9 @@ static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinf
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
const char *lsn);
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
+static void check_and_drop_existing_subscriptions(PGconn *conn,
+ const struct LogicalRepInfo *dbinfo);
+static void drop_existing_subscriptions(PGconn *conn, const char *subname);
#define USEC_PER_SEC 1000000
#define WAIT_INTERVAL 1 /* 1 second */
@@ -1018,6 +1022,76 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
exit(1);
}
+/*
+ * Drop a specified subscription. This is to avoid duplicate subscriptions on
+ * the primary (publisher node) and the newly created subscriber. We
+ * shouldn't drop the associated slot as that would be used by the publisher
+ * node.
+ */
+static void
+drop_existing_subscriptions(PGconn *conn, const char *subname)
+{
+ PQExpBuffer query = createPQExpBuffer();
+ PGresult *res;
+
+ Assert(conn != NULL);
+
+ /*
+ * Construct a query string. These commands are allowed to be executed
+ * within a transaction.
+ */
+ appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
+ subname);
+ appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
+ subname);
+ appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
+
+ res = PQexec(conn, query->data);
+
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ pg_log_error("could not drop a subscription \"%s\" settings: %s",
+ subname, PQresultErrorMessage(res));
+ disconnect_database(conn, true);
+ }
+
+ PQclear(res);
+ destroyPQExpBuffer(query);
+}
+
+/*
+ * Retrieve and drop the pre-existing subscriptions.
+ */
+static void
+check_and_drop_existing_subscriptions(PGconn *conn,
+ const struct LogicalRepInfo *dbinfo)
+{
+ PQExpBuffer query = createPQExpBuffer();
+ PGresult *res;
+
+ Assert(conn != NULL);
+
+ appendPQExpBuffer(query,
+ "SELECT s.subname FROM pg_catalog.pg_subscription s "
+ "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
+ "WHERE d.datname = '%s'",
+ dbinfo->dbname);
+ res = PQexec(conn, query->data);
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not obtain pre-existing subscriptions: %s",
+ PQresultErrorMessage(res));
+ disconnect_database(conn, true);
+ }
+
+ for (int i = 0; i < PQntuples(res); i++)
+ drop_existing_subscriptions(conn, PQgetvalue(res, i, 0));
+
+ PQclear(res);
+ destroyPQExpBuffer(query);
+}
+
/*
* Create the subscriptions, adjust the initial location for logical
* replication and enable the subscriptions. That's the last step for logical
@@ -1033,6 +1107,14 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
/* Connect to subscriber. */
conn = connect_database(dbinfo[i].subconninfo, true);
+ /*
+ * We don't need the pre-existing subscriptions on the newly formed
+ * subscriber. They can connect to other publisher nodes and either
+ * get some unwarranted data or can lead to ERRORs in connecting to
+ * such nodes.
+ */
+ check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
+
/*
* Since the publication was created before the consistent LSN, it is
* available on the subscriber when the physical replica is promoted.
@@ -1307,7 +1389,8 @@ pg_ctl_status(const char *pg_ctl_cmd, int rc)
}
static void
-start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access)
+start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
+ bool restrict_logical_worker)
{
PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
int rc;
@@ -1335,6 +1418,11 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
if (opt->config_file != NULL)
appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
opt->config_file);
+
+ /* Suppress to start logical replication if requested */
+ if (restrict_logical_worker)
+ appendPQExpBuffer(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
+
pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
rc = system(pg_ctl_cmd->data);
pg_ctl_status(pg_ctl_cmd->data, rc);
@@ -2059,7 +2147,7 @@ main(int argc, char **argv)
* transformation steps.
*/
pg_log_info("starting the standby with command-line options");
- start_standby_server(&opt, true);
+ start_standby_server(&opt, true, false);
/* Check if the standby server is ready for logical replication */
check_subscriber(dbinfo);
@@ -2090,10 +2178,11 @@ main(int argc, char **argv)
/*
* Start subscriber so the recovery parameters will take effect. Wait
- * until accepting connections.
+ * until accepting connections. We don't want to start logical replication
+ * during setup.
*/
pg_log_info("starting the subscriber");
- start_standby_server(&opt, true);
+ start_standby_server(&opt, true, true);
/* Waiting the subscriber to be promoted */
wait_for_end_recovery(dbinfo[0].subconninfo, &opt);
diff --git a/src/bin/pg_basebackup/t/041_pg_createsubscriber_added.pl b/src/bin/pg_basebackup/t/041_pg_createsubscriber_added.pl
new file mode 100644
index 0000000000..59cddb2fc1
--- /dev/null
+++ b/src/bin/pg_basebackup/t/041_pg_createsubscriber_added.pl
@@ -0,0 +1,118 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Construct a cascading replication system like:
+#
+# node_a --(logical replication)--> node_b --(streaming replication)--> node_c
+#
+
+# Set up node A as publisher
+my $node_a = PostgreSQL::Test::Cluster->new('node_a');
+my $aconnstr = $node_a->connstr;
+$node_a->init(allows_streaming => 'logical');
+$node_a->start;
+
+# On node A
+# - create databases
+# - create test tables
+# - insert a row
+# - create publications
+$node_a->safe_psql(
+ 'postgres', q(
+ CREATE DATABASE pg1;
+ CREATE DATABASE pg2;
+));
+$node_a->safe_psql('pg1', 'CREATE TABLE tbl1 (a text)');
+$node_a->safe_psql('pg1', "INSERT INTO tbl1 VALUES('first row')");
+$node_a->safe_psql('pg2', 'CREATE TABLE tbl2 (a text)');
+$node_a->safe_psql('pg1', 'CREATE PUBLICATION pub1_pg1 FOR ALL TABLES');
+$node_a->safe_psql('pg1', 'CREATE PUBLICATION pub2_pg1');
+$node_a->safe_psql('pg2', 'CREATE PUBLICATION pub1_pg2 FOR ALL TABLES');
+$node_a->safe_psql('pg2', 'CREATE PUBLICATION pub2_pg2');
+
+# Set up node B as subscriber/primary
+my $node_b = PostgreSQL::Test::Cluster->new('node_b');
+my $bconnstr = $node_b->connstr;
+$node_b->init(allows_streaming => 'logical');
+$node_b->start;
+
+# On node B
+# - create databases
+# - create subscriptions
+$node_b->safe_psql(
+ 'postgres', q(
+ CREATE DATABASE pg1;
+ CREATE DATABASE pg2;
+));
+$node_b->safe_psql('pg1', 'CREATE TABLE tbl1 (a text)');
+$node_b->safe_psql('pg2', 'CREATE TABLE tbl2 (a text)');
+$node_b->safe_psql('pg1',
+ "CREATE SUBSCRIPTION sub1_pg1 CONNECTION '$aconnstr dbname=pg1' PUBLICATION pub1_pg1");
+$node_b->safe_psql('pg1',
+ "CREATE SUBSCRIPTION sub2_pg1 CONNECTION '$aconnstr dbname=pg1' PUBLICATION pub2_pg1");
+$node_b->safe_psql('pg2',
+ "CREATE SUBSCRIPTION sub1_pg2 CONNECTION '$aconnstr dbname=pg2' PUBLICATION pub1_pg2");
+$node_b->safe_psql('pg2',
+ "CREATE SUBSCRIPTION sub2_pg2 CONNECTION '$aconnstr dbname=pg2' PUBLICATION pub2_pg2");
+
+$node_b->wait_for_subscription_sync($node_a, 'sub1_pg1');
+$node_b->wait_for_subscription_sync($node_a, 'sub1_pg2');
+
+# Confirms logical replication works well
+my $result = $node_b->safe_psql('pg1', 'SELECT * FROM tbl1;');
+is($result, 'first row', 'check logical replication works well');
+
+# Set up node C as standby
+$node_b->backup('backup_1');
+my $node_c = PostgreSQL::Test::Cluster->new('node_c');
+$node_c->init_from_backup($node_b, 'backup_1', has_streaming => 1);
+$node_c->append_conf(
+ 'postgresql.conf', qq[
+primary_conninfo = '$bconnstr'
+]);
+$node_c->set_standby_mode();
+$node_c->start;
+
+$node_b->wait_for_replay_catchup($node_c);
+
+# Confirms streaming replication works well
+$result = $node_c->safe_psql('pg1', 'SELECT * FROM tbl1;');
+is($result, 'first row', 'check streaming replication works well');
+
+$node_c->stop;
+
+# Run pg_createsubscriber
+command_ok(
+ [
+ 'pg_createsubscriber', '--verbose',
+ '--recovery-timeout', "$PostgreSQL::Test::Utils::timeout_default",
+ '--verbose',
+ '--pgdata', $node_c->data_dir,
+ '--publisher-server', $bconnstr,
+ '--socket-directory', $node_c->host,
+ '--subscriber-port', $node_c->port,
+ '--database', 'pg1',
+ '--database', 'pg2'
+ ],
+ 'run pg_createsubscriber on node S');
+
+# Confirms pre-existing subscriptions are removed from the converted node
+$node_c->start;
+$result = $node_c->safe_psql('pg1',
+ "SELECT subname FROM pg_subscription WHERE subname NOT LIKE 'pg_createsubscriber%';");
+is($result, '', 'check subscriptions are removed');
+
+# Confirms pre-existing subscriptions still exist on the primary node
+$result = $node_b->safe_psql('pg1',
+ "SELECT subname, subenabled, subslotname FROM pg_subscription WHERE subname NOT LIKE 'pg_createsubscriber%' ORDER BY subname;");
+is($result, 'sub1_pg1|t|sub1_pg1
+sub1_pg2|t|sub1_pg2
+sub2_pg1|t|sub2_pg1
+sub2_pg2|t|sub2_pg2', 'check subscriptions still exist');
+
+done_testing();
--
2.28.0.windows.1
Dear Amit,
Thanks for giving comments! PSA new version.
Thanks, this is a better approach. I have changed a few comments and
made some other cosmetic changes. See attached.
I checked your attached patch and it LGTM. Based on that, I added some changes:
- Escaped dbname when listing pre-existing subscriptions.
  The previous version could not pass the tests after recent commits.
- Skipped dropping subscriptions in dry_run mode.
  I found the issue while porting the test to 040_pg_createsubscriber.pl.
- Added info-level output to follow other drop_XXX functions.
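The first change in the list above can be illustrated with a self-contained C sketch. v3 interpolated the database name as `'%s'`, whereas v4 passes it through libpq's `PQescapeLiteral()`. To avoid a libpq dependency, the sketch uses a hypothetical `quote_literal_simple()` that only wraps the value in single quotes and doubles embedded quotes. It is not equivalent to `PQescapeLiteral()`, which also handles backslashes and encoding, and is shown only to make the effect on the catalog query visible. `build_existing_subscriptions_query()` is likewise a hypothetical name.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical, simplified literal quoting (NOT a replacement for
 * PQescapeLiteral): wrap the value in single quotes and double any
 * embedded single quote. Backslashes and encodings are not handled.
 * Returns the number of characters written (excluding the terminator),
 * or -1 if out is too small.
 */
static int
quote_literal_simple(char *out, size_t outlen, const char *in)
{
    size_t      j = 0;

    assert(out != NULL && in != NULL);
    if (outlen < 3)             /* room for '' and the terminator */
        return -1;

    out[j++] = '\'';
    for (const char *p = in; *p != '\0'; p++)
    {
        size_t      need = (*p == '\'') ? 2 : 1;

        /* keep room for the closing quote and the terminator */
        if (j + need + 2 > outlen)
            return -1;
        if (*p == '\'')
            out[j++] = '\'';    /* double the embedded quote */
        out[j++] = *p;
    }
    out[j++] = '\'';
    out[j] = '\0';
    return (int) j;
}

/*
 * Hypothetical: build the query listing subscriptions that belong to dbname,
 * in the same shape as the one in check_and_drop_existing_subscriptions().
 * Returns the query length, or -1 on quoting failure or truncation.
 */
static int
build_existing_subscriptions_query(char *buf, size_t buflen, const char *dbname)
{
    char        lit[256];
    int         n;

    if (quote_literal_simple(lit, sizeof(lit), dbname) < 0)
        return -1;

    n = snprintf(buf, buflen,
                 "SELECT s.subname FROM pg_catalog.pg_subscription s "
                 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
                 "WHERE d.datname = %s",
                 lit);
    return (n < 0 || (size_t) n >= buflen) ? -1 : n;
}
```

For a database named `my'db`, the generated clause becomes `WHERE d.datname = 'my''db'`, so the name stays inside a single literal instead of terminating it early. The dry_run change is orthogonal: the patch still builds and logs the DROP commands but skips `PQexec()` when `dry_run` is set.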
BTW, why have you created a separate test file for this test? I think
we should add a new test to one of the existing tests in
040_pg_createsubscriber.
I separated the test into its own file just for confirmation purposes; I had planned to merge it.
The new patch set does that.
You can create a dummy subscription on node_p
and do a test similar to what we are doing in "# Create failover slot
to test its removal".
Your approach looks better than mine. I followed the approach.
Best Regards,
Hayato Kuroda
FUJITSU LIMITED
https://www.fujitsu.com/
Attachments:
v4-0001-pg_createsubscriber-remove-unused-attribute.patch
From 7724a29b6fabd7ef7d7aef516587be1f58c6330d Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Thu, 27 Jun 2024 06:04:55 +0000
Subject: [PATCH v4 1/2] pg_createsubscriber: remove unused attribute
---
src/bin/pg_basebackup/pg_createsubscriber.c | 1 -
1 file changed, 1 deletion(-)
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index fecf5db365..fb57737f7c 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -48,7 +48,6 @@ struct CreateSubscriberOptions
struct LogicalRepInfo
{
- Oid oid; /* database OID */
char *dbname; /* database name */
char *pubconninfo; /* publisher connection string */
char *subconninfo; /* subscriber connection string */
--
2.43.0
v4-0002-pg_createsubscriber-Drop-pre-existing-subscriptio.patch
From fc3b0148848b1a6891b079fb449b3fb9223f5a07 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Fri, 21 Jun 2024 10:55:46 +0000
Subject: [PATCH v4 2/2] pg_createsubscriber: Drop pre-existing subscriptions
from the converted instance
Previously, we did nothing for pre-existing subscriptions on the streaming
replication cluster. However, after the conversion, the downstream node will try
to connect to another publisher node specified by the pre-existing subscriptions,
which will cause an ERROR. To avoid failure, drop such subscriptions from the
converted node.
---
src/bin/pg_basebackup/pg_createsubscriber.c | 111 +++++++++++++++++-
.../t/040_pg_createsubscriber.pl | 14 +++
2 files changed, 120 insertions(+), 5 deletions(-)
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index fb57737f7c..21dd50f808 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -92,7 +92,8 @@ static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
const char *slot_name);
static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
static void start_standby_server(const struct CreateSubscriberOptions *opt,
- bool restricted_access);
+ bool restricted_access,
+ bool restrict_logical_worker);
static void stop_standby_server(const char *datadir);
static void wait_for_end_recovery(const char *conninfo,
const struct CreateSubscriberOptions *opt);
@@ -102,6 +103,10 @@ static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinf
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
const char *lsn);
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
+static void check_and_drop_existing_subscriptions(PGconn *conn,
+ const struct LogicalRepInfo *dbinfo);
+static void drop_existing_subscriptions(PGconn *conn, const char *subname,
+ const char *dbname);
#define USEC_PER_SEC 1000000
#define WAIT_INTERVAL 1 /* 1 second */
@@ -1025,6 +1030,87 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
exit(1);
}
+/*
+ * Drop a specified subscription. This is to avoid duplicate subscriptions on
+ * the primary (publisher node) and the newly created subscriber. We
+ * shouldn't drop the associated slot as that would be used by the publisher
+ * node.
+ */
+static void
+drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname)
+{
+ PQExpBuffer query = createPQExpBuffer();
+ PGresult *res;
+
+ Assert(conn != NULL);
+
+ /*
+ * Construct a query string. These commands are allowed to be executed
+ * within a transaction.
+ */
+ appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
+ subname);
+ appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
+ subname);
+ appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
+
+ pg_log_info("dropping subscription \"%s\" on database \"%s\"",
+ subname, dbname);
+
+ if (!dry_run)
+ {
+ res = PQexec(conn, query->data);
+
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ pg_log_error("could not drop a subscription \"%s\" settings: %s",
+ subname, PQresultErrorMessage(res));
+ disconnect_database(conn, true);
+ }
+
+ PQclear(res);
+ }
+
+ destroyPQExpBuffer(query);
+}
+
+/*
+ * Retrieve and drop the pre-existing subscriptions.
+ */
+static void
+check_and_drop_existing_subscriptions(PGconn *conn,
+ const struct LogicalRepInfo *dbinfo)
+{
+ PQExpBuffer query = createPQExpBuffer();
+ char *dbname;
+ PGresult *res;
+
+ Assert(conn != NULL);
+
+ dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
+
+ appendPQExpBuffer(query,
+ "SELECT s.subname FROM pg_catalog.pg_subscription s "
+ "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
+ "WHERE d.datname = %s",
+ dbname);
+ res = PQexec(conn, query->data);
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not obtain pre-existing subscriptions: %s",
+ PQresultErrorMessage(res));
+ disconnect_database(conn, true);
+ }
+
+ for (int i = 0; i < PQntuples(res); i++)
+ drop_existing_subscriptions(conn, PQgetvalue(res, i, 0),
+ dbinfo->dbname);
+
+ PQclear(res);
+ destroyPQExpBuffer(query);
+}
+
/*
* Create the subscriptions, adjust the initial location for logical
* replication and enable the subscriptions. That's the last step for logical
@@ -1040,6 +1126,14 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
/* Connect to subscriber. */
conn = connect_database(dbinfo[i].subconninfo, true);
+ /*
+ * We don't need the pre-existing subscriptions on the newly formed
+ * subscriber. They can connect to other publisher nodes and either
+ * get some unwarranted data or can lead to ERRORs in connecting to
+ * such nodes.
+ */
+ check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
+
/*
* Since the publication was created before the consistent LSN, it is
* available on the subscriber when the physical replica is promoted.
@@ -1314,7 +1408,8 @@ pg_ctl_status(const char *pg_ctl_cmd, int rc)
}
static void
-start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access)
+start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
+ bool restrict_logical_worker)
{
PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
int rc;
@@ -1343,6 +1438,11 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
if (opt->config_file != NULL)
appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
opt->config_file);
+
+ /* Suppress to start logical replication if requested */
+ if (restrict_logical_worker)
+ appendPQExpBuffer(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
+
pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
rc = system(pg_ctl_cmd->data);
pg_ctl_status(pg_ctl_cmd->data, rc);
@@ -2067,7 +2167,7 @@ main(int argc, char **argv)
* transformation steps.
*/
pg_log_info("starting the standby with command-line options");
- start_standby_server(&opt, true);
+ start_standby_server(&opt, true, false);
/* Check if the standby server is ready for logical replication */
check_subscriber(dbinfo);
@@ -2098,10 +2198,11 @@ main(int argc, char **argv)
/*
* Start subscriber so the recovery parameters will take effect. Wait
- * until accepting connections.
+ * until accepting connections. We don't want to start logical replication
+ * during setup.
*/
pg_log_info("starting the subscriber");
- start_standby_server(&opt, true);
+ start_standby_server(&opt, true, true);
/* Waiting the subscriber to be promoted */
wait_for_end_recovery(dbinfo[0].subconninfo, &opt);
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index 1241bf6c6a..80002c5a17 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -298,6 +298,13 @@ my $result = $node_s->safe_psql('postgres',
"SELECT slot_name FROM pg_replication_slots WHERE slot_name = '$fslotname' AND synced AND NOT temporary"
);
is($result, 'failover_slot', 'failover slot is synced');
+
+# Create subscription to test its removal
+my $dummy_sub = 'regress_sub_dummy';
+$node_p->safe_psql($db1,
+ "CREATE SUBSCRIPTION $dummy_sub CONNECTION 'dbname=dummy' PUBLICATION pub_dummy WITH (connect=false)"
+);
+$node_p->wait_for_replay_catchup($node_s);
$node_s->stop;
# dry run mode on node S
@@ -372,6 +379,13 @@ $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");
# Start subscriber
$node_s->start;
+# Confirm the pre-existing subscription has been removed
+$result = $node_s->safe_psql(
+ 'postgres', qq(
+ SELECT count(*) FROM pg_subscription WHERE subname = '$dummy_sub'
+));
+is($result, qq(0), 'pre-existing subscription was dropped');
+
# Get subscription names
$result = $node_s->safe_psql(
'postgres', qq(
--
2.43.0
On Mon, Jul 1, 2024 at 11:44 AM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
You can create a dummy subscription on node_p
and do a test similar to what we are doing in "# Create failover slot
to test its removal".
Your approach looks better than mine. I followed the approach.
LGTM.
--
With Regards,
Amit Kapila.
On Mon, 1 Jul 2024 at 11:44, Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
Dear Amit,
Thanks for giving comments! PSA new version.
Thanks, this is a better approach. I have changed a few comments and
made some other cosmetic changes. See attached.
I checked your attached patch and it LGTM. Based on that, I added some changes:
- Escaped dbname when listing pre-existing subscriptions.
  The previous version could not pass the tests after recent commits.
- Skipped dropping subscriptions in dry_run mode.
  I found the issue while porting the test to 040_pg_createsubscriber.pl.
- Added info-level output to follow other drop_XXX functions.
BTW, why have you created a separate test file for this test? I think
we should add a new test to one of the existing tests in
040_pg_createsubscriber.
I separated the test into its own file just for confirmation purposes; I had planned to merge it.
The new patch set does that.
You can create a dummy subscription on node_p
and do a test similar to what we are doing in "# Create failover slot
to test its removal".
Your approach looks better than mine. I followed the approach.
Hi Kuroda-san,
I tested the patches on Linux and Windows and confirm that they
successfully fix the issue [1].
[1]: /messages/by-id/CANhcyEWvimA1-f6hSrA=9qkfR5SonFb56b36M++vT=LiFj=76g@mail.gmail.com
Thanks and Regards,
Shlok Kyal
On Tue, Jul 2, 2024 at 9:57 AM Shlok Kyal <shlok.kyal.oss@gmail.com> wrote:
On Mon, 1 Jul 2024 at 11:44, Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
Dear Amit,
Thanks for giving comments! PSA new version.
Thanks, this is a better approach. I have changed a few comments and
made some other cosmetic changes. See attached.
I checked your attached patch and it LGTM. Based on that, I added some changes:
- Escaped dbname when listing pre-existing subscriptions.
  The previous version could not pass the tests after recent commits.
- Skipped dropping subscriptions in dry_run mode.
  I found the issue while porting the test to 040_pg_createsubscriber.pl.
- Added info-level output to follow other drop_XXX functions.
BTW, why have you created a separate test file for this test? I think
we should add a new test to one of the existing tests in
040_pg_createsubscriber.
I separated the test into its own file just for confirmation purposes; I had planned to merge it.
The new patch set does that.
You can create a dummy subscription on node_p
and do a test similar to what we are doing in "# Create failover slot
to test its removal".
Your approach looks better than mine. I followed the approach.
Hi Kuroda-san,
I tested the patches on Linux and Windows and confirm that they
successfully fix the issue [1].
Thanks for the verification. I have pushed the patch.
--
With Regards,
Amit Kapila.