From 15678a86b82207bd91aa0b596e112ebaed1be915 Mon Sep 17 00:00:00 2001 From: Khanna Date: Thu, 26 Jun 2025 11:11:48 +0530 Subject: [PATCH v3 1/2] Support tables via pg_createsubscriber This patch adds support for specifying tables to be included in logical replication publications via pg_createsubscriber. Users can now pass multiple '--database' and '--table' options to define which tables should be published and subscribed for each database. Features: 1. Supports per-database table mapping using multiple '--database'/'--table' pairs. 2. Allows optional column lists and row filters. 3. If '--table' is omitted for a database, a 'FOR ALL TABLES' publication is created. 4. Adds TAP tests to validate combinations of database and table arguments. This improves fine-grained control over logical replication setup and aligns pg_createsubscriber CLI design with other tools like vacuumdb and pg_restore. --- doc/src/sgml/ref/pg_createsubscriber.sgml | 57 +++++ src/bin/pg_basebackup/pg_createsubscriber.c | 236 +++++++++++++++++- .../t/040_pg_createsubscriber.pl | 83 ++++++ 3 files changed, 366 insertions(+), 10 deletions(-) diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml index bb9cc72576c..ddc8777e138 100644 --- a/doc/src/sgml/ref/pg_createsubscriber.sgml +++ b/doc/src/sgml/ref/pg_createsubscriber.sgml @@ -39,6 +39,10 @@ PostgreSQL documentation connstr + + + + table-name @@ -321,6 +325,59 @@ PostgreSQL documentation + + + + + Adds one or more specific tables to the publication for the most recently + specified --database. This option can be given multiple + times to include additional tables. + + + + The argument must be a fully qualified table name in one of the + following forms: + schema.table + db.schema.table + If the database name is provided, it must match the most recent + --database argument. + + + + A table specification may also include an optional column list and/or + row filter: + + + + schema.table(col1, col2, ...) 
— publishes + only the specified columns. + + + + + schema.table WHERE (predicate) — publishes + only rows that satisfy the given condition. + + + + + Both forms can be combined, e.g. + schema.table(col1, col2) WHERE (id > 100). + + + + + + + When --table is specified, only the listed tables are + included in the publication. It cannot be combined with + --all (which publishes all databases and all tables). + Within a database, if no --table options are given, all + tables are included by default. + + + + diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index 3986882f042..0017a740b72 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -31,6 +31,21 @@ #define DEFAULT_SUB_PORT "50432" #define OBJECTTYPE_PUBLICATIONS 0x0001 +typedef struct TableSpec +{ + char *spec; + char *dbname; + char *pattern_regex; + char *pattern_part1_regex; + char *pattern_part2_regex; + char *pattern_part3_regex; + struct TableSpec *next; +} TableSpec; + +static TableSpec * table_list_head = NULL; +static TableSpec * table_list_tail = NULL; +static char *current_dbname = NULL; + /* Command-line options */ struct CreateSubscriberOptions { @@ -61,6 +76,7 @@ struct LogicalRepInfo bool made_replslot; /* replication slot was created */ bool made_publication; /* publication was created */ + TableSpec *tables; /* list of tables to be subscribed */ }; /* @@ -161,7 +177,6 @@ enum WaitPMResult POSTMASTER_STILL_STARTING }; - /* * Cleanup objects that were created by pg_createsubscriber if there is an * error. 
@@ -265,6 +280,7 @@ usage(void) printf(_(" --publication=NAME publication name\n")); printf(_(" --replication-slot=NAME replication slot name\n")); printf(_(" --subscription=NAME subscription name\n")); + printf(_(" --table table to subscribe to; can be specified multiple times\n")); printf(_(" -V, --version output version information, then exit\n")); printf(_(" -?, --help show this help, then exit\n")); printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT); @@ -505,6 +521,7 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt, else dbinfo[i].subname = NULL; /* Other fields will be filled later */ + dbinfo[i].tables = NULL; pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i, dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)", @@ -525,6 +542,40 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt, i++; } + for (i = 0; i < num_dbs; i++) + { + TableSpec *prev = NULL; + TableSpec *cur = table_list_head; + TableSpec *filtered_head = NULL; + TableSpec *filtered_tail = NULL; + + while (cur != NULL) + { + TableSpec *next = cur->next; + + if (strcmp(cur->dbname, dbinfo[i].dbname) == 0) + { + if (prev) + prev->next = next; + else + table_list_head = next; + + cur->next = NULL; + if (!filtered_head) + filtered_head = filtered_tail = cur; + else + { + filtered_tail->next = cur; + filtered_tail = cur; + } + } + else + prev = cur; + cur = next; + } + dbinfo[i].tables = filtered_head; + } + return dbinfo; } @@ -1615,6 +1666,7 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo) PGresult *res; char *ipubname_esc; char *spubname_esc; + bool first_table = true; Assert(conn != NULL); @@ -1654,12 +1706,80 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo) pg_log_info("creating publication \"%s\" in database \"%s\"", dbinfo->pubname, dbinfo->dbname); - appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES", - ipubname_esc); + if (dbinfo->tables == NULL) + appendPQExpBuffer(str, 
"CREATE PUBLICATION %s FOR ALL TABLES", ipubname_esc); + else + { + appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR TABLE ", ipubname_esc); + + for (TableSpec * tbl = dbinfo->tables; tbl != NULL; tbl = tbl->next) + { + const char *params[2] = { + tbl->pattern_part2_regex, + tbl->pattern_part3_regex + }; + + PGresult *tres = PQexecParams(conn, "SELECT n.nspname, c.relname " + "FROM pg_class c " + "JOIN pg_namespace n ON n.oid = c.relnamespace " + "WHERE n.nspname ~ $1 " + "AND c.relname ~ $2 " + "AND c.relkind IN ('r','p') " + "ORDER BY 1, 2", + 2, NULL, params, NULL, NULL, 0); + + if (PQresultStatus(tres) != PGRES_TUPLES_OK) + pg_fatal("could not fetch tables for pattern \"%s\": %s", + tbl->spec, PQerrorMessage(conn)); + + if (PQntuples(tres) == 0) + pg_fatal("no matching tables found for pattern \"%s\"", tbl->spec); + + for (int i = 0; i < PQntuples(tres); i++) + { + char *sch = PQgetvalue(tres, i, 0); + char *relname = PQgetvalue(tres, i, 1); + char *escaped_schema = PQescapeIdentifier(conn, sch, strlen(sch)); + char *escaped_table = PQescapeIdentifier(conn, relname, strlen(relname)); + + appendPQExpBuffer(str, "%s%s.%s", + first_table ? 
"" : ", ", + escaped_schema, escaped_table); + + first_table = false; + + PQfreemem(escaped_schema); + PQfreemem(escaped_table); + } + PQclear(tres); + } + } pg_log_debug("command is: %s", str->data); - if (!dry_run) + if (dry_run) + { + res = PQexec(conn, "BEGIN"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + pg_log_error("could not begin transaction: %s", PQerrorMessage(conn)); + disconnect_database(conn, true); + } + PQclear(res); + + res = PQexec(conn, str->data); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + pg_log_error("could not create publication \"%s\" in database \"%s\": %s", + dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res)); + disconnect_database(conn, true); + } + PQclear(res); + + res = PQexec(conn, "ROLLBACK"); + PQclear(res); + } + else { res = PQexec(conn, str->data); if (PQresultStatus(res) != PGRES_COMMAND_OK) @@ -2047,6 +2167,7 @@ main(int argc, char **argv) {"replication-slot", required_argument, NULL, 3}, {"subscription", required_argument, NULL, 4}, {"clean", required_argument, NULL, 5}, + {"table", required_argument, NULL, 6}, {NULL, 0, NULL, 0} }; @@ -2127,14 +2248,20 @@ main(int argc, char **argv) opt.all_dbs = true; break; case 'd': - if (!simple_string_list_member(&opt.database_names, optarg)) { - simple_string_list_append(&opt.database_names, optarg); - num_dbs++; + if (current_dbname) + pg_free(current_dbname); + current_dbname = pg_strdup(optarg); + + if (!simple_string_list_member(&opt.database_names, optarg)) + { + simple_string_list_append(&opt.database_names, optarg); + num_dbs++; + } + else + pg_fatal("database \"%s\" specified more than once for -d/--database", optarg); + break; } - else - pg_fatal("database \"%s\" specified more than once for -d/--database", optarg); - break; case 'D': subscriber_dir = pg_strdup(optarg); canonicalize_path(subscriber_dir); @@ -2200,6 +2327,95 @@ main(int argc, char **argv) else pg_fatal("object type \"%s\" specified more than once for --clean", optarg); break; + case 
6: + { + char *copy_arg; + char *first_dot; + char *second_dot; + char *dbname_arg = NULL; + char *schema_table_part; + TableSpec *ts; + PQExpBuffer dbbuf; + PQExpBuffer schemabuf; + PQExpBuffer namebuf; + int encoding; + int dotcnt; + + if (!current_dbname) + pg_fatal("--table specified without a preceding --database"); + + copy_arg = pg_strdup(optarg); + + first_dot = strchr(copy_arg, '.'); + if (first_dot != NULL) + second_dot = strchr(first_dot + 1, '.'); + else + second_dot = NULL; + + if (second_dot != NULL) + { + *first_dot = '\0'; + dbname_arg = copy_arg; + schema_table_part = first_dot + 1; + } + else + { + dbname_arg = NULL; + schema_table_part = copy_arg; + } + + if (dbname_arg != NULL && strcmp(dbname_arg, current_dbname) != 0) + pg_fatal("database name in --table argument \"%s\" does not match most recent --database \"%s\"", + dbname_arg, current_dbname); + + ts = pg_malloc0(sizeof(TableSpec)); + dbbuf = createPQExpBuffer(); + schemabuf = createPQExpBuffer(); + namebuf = createPQExpBuffer(); + encoding = pg_get_encoding_from_locale(NULL, false); + dotcnt = 0; + + ts->spec = pg_strdup(optarg); + ts->dbname = pg_strdup(current_dbname); + + patternToSQLRegex(encoding, dbbuf, schemabuf, namebuf, + schema_table_part, false, false, &dotcnt); + + if (dbname_arg != NULL) + dotcnt++; + + if (dotcnt == 2) + { + ts->pattern_part1_regex = pg_strdup(dbbuf->data); + ts->pattern_part2_regex = pg_strdup(schemabuf->data); + ts->pattern_part3_regex = namebuf->len > 0 ? 
pg_strdup(namebuf->data) : NULL; + } + else if (dotcnt == 1) /* "schema.table" form: patternToSQLRegex() fills schemabuf/namebuf and leaves dbbuf empty */ + { + ts->pattern_part1_regex = NULL; + ts->pattern_part2_regex = pg_strdup(schemabuf->data); + ts->pattern_part3_regex = namebuf->len > 0 ? pg_strdup(namebuf->data) : NULL; + } + else + pg_fatal("invalid table specification \"%s\"", optarg); + + destroyPQExpBuffer(dbbuf); + destroyPQExpBuffer(schemabuf); + destroyPQExpBuffer(namebuf); + pg_free(copy_arg); + + ts->next = NULL; + + if (!table_list_head) + table_list_head = table_list_tail = ts; + else + table_list_tail->next = ts; + + table_list_tail = ts; + + break; + } + default: /* getopt_long already emitted a complaint */ pg_log_error_hint("Try \"%s --help\" for more information.", progname); diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl index 229fef5b3b5..62e75af4bb5 100644 --- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl +++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl @@ -537,9 +537,92 @@ my $sysid_s = $node_s->safe_psql('postgres', 'SELECT system_identifier FROM pg_control_system()'); ok($sysid_p != $sysid_s, 'system identifier was changed'); +# Declare database names +my $db3 = 'db3'; +my $db4 = 'db4'; + +# Create databases +$node_p->safe_psql('postgres', "CREATE DATABASE $db3"); +$node_p->safe_psql('postgres', "CREATE DATABASE $db4"); + +# Create additional schemas +$node_p->safe_psql($db3, "CREATE SCHEMA myschema"); +$node_p->safe_psql($db4, "CREATE SCHEMA otherschema"); + +# Test: Table-level publication creation +$node_p->safe_psql($db3, "CREATE TABLE public.t1 (id int, val text)"); +$node_p->safe_psql($db3, "CREATE TABLE public.t2 (id int, val text)"); +$node_p->safe_psql($db3, "CREATE TABLE myschema.t4 (id int, val text)"); + +$node_p->safe_psql($db4, + "CREATE TABLE public.t3 (id int, val text, extra int)"); +$node_p->safe_psql($db4, + "CREATE TABLE otherschema.t5 (id serial primary key, info text)"); + +# Create explicit publications +my $pubname1 = 'pub1'; +my $pubname2 = 'pub2'; + +# Initialize 
node_s2 as a fresh standby of node_p for table-level +# publication test. +$node_p->backup('backup_tablepub'); +my $node_s2 = PostgreSQL::Test::Cluster->new('node_s2'); +$node_s2->init_from_backup($node_p, 'backup_tablepub', has_streaming => 1); +$node_s2->start; +$node_s2->stop; + +# Run pg_createsubscriber with table-level options +command_ok( + [ + 'pg_createsubscriber', + '--verbose', + '--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default, + '--pgdata' => $node_s2->data_dir, + '--publisher-server' => $node_p->connstr($db3), + '--socketdir' => $node_s2->host, + '--subscriber-port' => $node_s2->port, + '--publication' => $pubname1, + '--publication' => $pubname2, + '--database' => $db3, + '--table' => "$db3.public.t1", + '--table' => "$db3.public.t2", + '--table' => "$db3.myschema.t4", + '--database' => $db4, + '--table' => "$db4.public.t3", + '--table' => "$db4.otherschema.t5", + ], + 'pg_createsubscriber runs with table-level publication (existing nodes)'); + +# Check publication tables for db3 with public schema first +my $actual1 = $node_p->safe_psql( + $db3, qq( + SELECT pubname || '|' || schemaname || '|' || tablename + FROM pg_publication_tables + WHERE pubname = '$pubname1' + ORDER BY schemaname, tablename + ) +); +is( $actual1, + "$pubname1|myschema|t4\n$pubname1|public|t1\n$pubname1|public|t2", + 'publication includes tables in public and myschema schemas on db3'); + +# Check publication tables for db4, with public schema first +my $actual2 = $node_p->safe_psql( + $db4, qq( + SELECT pubname || '|' || schemaname || '|' || tablename + FROM pg_publication_tables + WHERE pubname = '$pubname2' + ORDER BY schemaname, tablename + ) +); +is( $actual2, + "$pubname2|otherschema|t5\n$pubname2|public|t3", + 'publication includes tables in public and otherschema schemas on db4'); + # clean up $node_p->teardown_node; $node_s->teardown_node; +$node_s2->teardown_node; $node_t->teardown_node; $node_f->teardown_node; -- 2.41.0.windows.3