From 7378b26f9ee6e5eebf2756b174f101059d2c670e Mon Sep 17 00:00:00 2001 From: Khanna Date: Thu, 23 Jan 2025 12:39:03 +0530 Subject: [PATCH v1] Support for dropping all publications in 'pg_createsubscriber' This patch introduces a new '--clean-publisher-objects' option in the 'pg_createsubscriber' utility. This feature ensures a clean and streamlined setup of logical replication by removing stale or unnecessary publications from the subscriber node. These publications, replicated during streaming replication, become redundant after converting to logical replication and serve no further purpose. By ensuring that outdated publications are removed, it helps avoid potential conflicts and simplifies replication management. A new 'drop_all_publications()' function is added that, for each target database on the subscriber node, fetches all publications and drops them one at a time; a failure to drop one publication is reported as a warning and does not prevent dropping the rest. Since this cleanup is not required when upgrading streaming replication clusters, this feature is enabled only when the '--clean-publisher-objects' option is specified, allowing users to choose accordingly. --- doc/src/sgml/ref/pg_createsubscriber.sgml | 12 +++ src/bin/pg_basebackup/pg_createsubscriber.c | 73 ++++++++++++++++++- .../t/040_pg_createsubscriber.pl | 65 +++++++++++++++++ 3 files changed, 149 insertions(+), 1 deletion(-) diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml index 26b8e64a4e..c922cc1ed0 100644 --- a/doc/src/sgml/ref/pg_createsubscriber.sgml +++ b/doc/src/sgml/ref/pg_createsubscriber.sgml @@ -87,6 +87,18 @@ PostgreSQL documentation command-line arguments: + + + + + + Drop all publications on the subscriber node, in each database in which + a subscription is created. Publications copied from the primary by + physical replication serve no purpose on the logical replica.
+ + + + diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index faf18ccf13..900045c00a 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -43,6 +43,7 @@ struct CreateSubscriberOptions SimpleStringList sub_names; /* list of subscription names */ SimpleStringList replslot_names; /* list of replication slot names */ int recovery_timeout; /* stop recovery after this time */ + bool clean_publisher_objects; /* drop all publications on subscriber */ }; struct LogicalRepInfo @@ -98,6 +99,7 @@ static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt); static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo); static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo); +static void drop_all_publications(const struct LogicalRepInfo *dbinfo); static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo); static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn); @@ -220,6 +222,7 @@ usage(void) printf(_("Usage:\n")); printf(_(" %s [OPTION]...\n"), progname); printf(_("\nOptions:\n")); + printf(_(" -C, --clean-publisher-objects drop all publications on the logical replica\n")); printf(_(" -d, --database=DBNAME database in which to create a subscription\n")); printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n")); printf(_(" -n, --dry-run dry run, just show what would be done\n")); @@ -1860,11 +1863,93 @@ enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo) destroyPQExpBuffer(str); } +/* + * Drop all publications in each target database on the subscriber node. + * + * Such publications were inherited from the primary and are of no use on the + * logical replica. A failure to query or drop publications is reported as a + * warning and does not stop the run; in dry-run mode nothing is dropped. + */ +static void +drop_all_publications(const struct LogicalRepInfo *dbinfo) +{ + const char *search_query = "SELECT pubname FROM pg_catalog.pg_publication;"; + + for (int i = 0; i < num_dbs; i++) + { + PGconn *conn; + PGresult *res; + int num_rows; + PQExpBuffer query; + + /* Connect to the subscriber */ + conn
= connect_database(dbinfo[i].subconninfo, true); + + /* Fetch all publications */ + res = PQexec(conn, search_query); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_warning("could not obtain publication information: %s", + PQresultErrorMessage(res)); + + PQclear(res); + disconnect_database(conn, false); + continue; + } + + num_rows = PQntuples(res); + query = createPQExpBuffer(); + + for (int j = 0; j < num_rows; j++) + { + char *pubname = PQgetvalue(res, j, 0); + char *pubname_esc; + PGresult *res_for_drop; + + pg_log_debug("dropping publication \"%s\"", pubname); + + /* Quote the name, which may need double quotes (e.g. mixed case) */ + pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname)); + if (pubname_esc == NULL) + { + pg_log_warning("could not escape publication name \"%s\": %s", + pubname, PQerrorMessage(conn)); + continue; + } + + appendPQExpBuffer(query, "DROP PUBLICATION %s;", pubname_esc); + PQfreemem(pubname_esc); + + if (!dry_run) + { + res_for_drop = PQexec(conn, query->data); + + if (PQresultStatus(res_for_drop) != PGRES_COMMAND_OK) + { + pg_log_warning("could not drop publication \"%s\": %s", + pubname, PQresultErrorMessage(res_for_drop)); + } + + PQclear(res_for_drop); + } + + resetPQExpBuffer(query); + } + + /* pubname points into res, so release res only after the loop */ + PQclear(res); + destroyPQExpBuffer(query); + disconnect_database(conn, false); + } +} + int main(int argc, char **argv) { static struct option long_options[] = { + {"clean-publisher-objects", no_argument, NULL, 'C'}, {"database", required_argument, NULL, 'd'}, {"pgdata", required_argument, NULL, 'D'}, {"dry-run", no_argument, NULL, 'n'}, @@ -1927,6 +2012,7 @@ main(int argc, char **argv) opt.socket_dir = NULL; opt.sub_port = DEFAULT_SUB_PORT; opt.sub_username = NULL; + opt.clean_publisher_objects = false; opt.database_names = (SimpleStringList) { 0 @@ -1949,11 +2035,14 @@ main(int argc, char **argv) get_restricted_token(); - while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v", + while ((c = getopt_long(argc, argv, "Cd:D:np:P:s:t:U:v", long_options, &option_index)) != -1) { switch (c) { + case 'C': + opt.clean_publisher_objects = true; + break; case 'd': if (!simple_string_list_member(&opt.database_names, optarg)) { @@ -2237,6 +2326,10 @@ main(int argc, char **argv) /* Remove failover replication slots if they exist on subscriber */ drop_failover_replication_slots(dbinfo); + /* Drop publications from the
subscriber if requested */ + if (opt.clean_publisher_objects) + drop_all_publications(dbinfo); + + /* Stop the subscriber */ pg_log_info("stopping the subscriber"); stop_standby_server(subscriber_dir); diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl index c8dbdb7e9b..2a95962cbd 100644 --- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl +++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl @@ -448,10 +448,75 @@ my $sysid_s = $node_s->safe_psql('postgres', 'SELECT system_identifier FROM pg_control_system()'); ok($sysid_p != $sysid_s, 'system identifier was changed'); +# Set up node A as primary +my $node_a = PostgreSQL::Test::Cluster->new('node_a'); +my $aconnstr = $node_a->connstr; +$node_a->init(allows_streaming => 'logical'); +$node_a->append_conf('postgresql.conf', 'autovacuum = off'); +$node_a->start; + +# Set up node B as standby linking to node A +$node_a->backup('backup_3'); +my $node_b = PostgreSQL::Test::Cluster->new('node_b'); +$node_b->init_from_backup($node_a, 'backup_3', has_streaming => 1); +$node_b->append_conf( + 'postgresql.conf', qq[ + primary_conninfo = '$aconnstr' + hot_standby_feedback = on + max_logical_replication_workers = 5 + ]); +$node_b->set_standby_mode(); +$node_b->start; + +# Ensure there are some user databases on the publisher +my $db3 = generate_db($node_a, 'regression', 91, 127, ''); + +# Create publications to test their removal +$node_a->safe_psql($db3, "CREATE PUBLICATION test_pub FOR ALL TABLES;"); +$node_a->safe_psql($db3, "CREATE PUBLICATION test_pub2 FOR ALL TABLES;"); + +# Verify the existing publications +my $pub_count_before = + $node_a->safe_psql($db3, "SELECT COUNT(*) FROM pg_publication;"); +is($pub_count_before, '2', + 'two publications created before --clean-publisher-objects is run'); + +$node_b->stop; + +# Run pg_createsubscriber on node B (publisher node A) using +# --clean-publisher-objects. --verbose is used twice to show more information.
+command_ok( + [ + 'pg_createsubscriber', + '--verbose', '--verbose', + '--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default, + '--pgdata' => $node_b->data_dir, + '--publisher-server' => $node_a->connstr($db3), + '--socketdir' => $node_b->host, + '--subscriber-port' => $node_b->port, + '--database' => $db3, + '--clean-publisher-objects', + ], + 'run pg_createsubscriber with --clean-publisher-objects on node B'); + +$node_b->start; + +# Confirm the publications were removed from the subscriber (node B) +my $pub_count_after = + $node_b->safe_psql($db3, "SELECT COUNT(*) FROM pg_publication;"); +is($pub_count_after, '0', + 'all publications dropped after --clean-publisher-objects is run'); + +# Drop the publications created on the publisher (node A) for this test +$node_a->safe_psql($db3, "DROP PUBLICATION IF EXISTS test_pub;"); +$node_a->safe_psql($db3, "DROP PUBLICATION IF EXISTS test_pub2;"); + # clean up $node_p->teardown_node; $node_s->teardown_node; $node_t->teardown_node; $node_f->teardown_node; +$node_a->teardown_node; +$node_b->teardown_node; done_testing(); -- 2.41.0.windows.3