From 5cb30d2b2af6064684247d75e00b414f537a21b9 Mon Sep 17 00:00:00 2001 From: Khanna Date: Thu, 26 Jun 2025 11:11:48 +0530 Subject: [PATCH v2] Support tables via pg_createsubscriber This patch adds support for specifying tables to be included in logical replication publications via pg_createsubscriber. Users can now pass multiple '--database' and '--table' options to define which tables should be published and subscribed for each database. Features: 1. Supports per-database table mapping using multiple '--database'/'--table' pairs. 2. Accepts each table as a 'schema.table' or 'database.schema.table' pattern; every matching table is added to the publication. 3. If '--table' is omitted for a database, a 'FOR ALL TABLES' publication is created. 4. Adds TAP tests to validate combinations of database and table arguments. This improves fine-grained control over logical replication setup and aligns pg_createsubscriber CLI design with other tools like vacuumdb and pg_restore. --- doc/src/sgml/ref/pg_createsubscriber.sgml | 11 ++ src/bin/pg_basebackup/pg_createsubscriber.c | 183 +++++++++++++++++- .../t/040_pg_createsubscriber.pl | 80 ++++++++ 3 files changed, 272 insertions(+), 2 deletions(-) diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml index bb9cc72576c..2b70fc8851f 100644 --- a/doc/src/sgml/ref/pg_createsubscriber.sgml +++ b/doc/src/sgml/ref/pg_createsubscriber.sgml @@ -321,6 +321,17 @@ PostgreSQL documentation + + + + + Adds a table to be included in the publication for the most recently + specified database. Can be specified multiple times. The table is given + as a pattern of the form schema.table or database.schema.table. 
+ + + + diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index 3986882f042..7fe71ece6e9 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -31,6 +31,30 @@ #define DEFAULT_SUB_PORT "50432" #define OBJECTTYPE_PUBLICATIONS 0x0001 +/* One --table argument, split into schema and table regular expressions */ +typedef struct TableSpec +{ + char *spec; /* argument as given on the command line */ + char *pattern_regex; + char *pattern_db_regex; /* currently always NULL */ + char *pattern_schema_regex; /* regex matched against nspname */ + char *pattern_table_regex; /* regex matched against relname */ + struct TableSpec *next; +} TableSpec; + +/* Tables requested for one --database argument */ +typedef struct TableListPerDB +{ + char *dbname; + TableSpec *tables; + struct TableListPerDB *next; +} TableListPerDB; + +/* Per-database table lists in command-line order; dblist_cur is the latest --database */ +static TableListPerDB *dblist_head = NULL; +static TableListPerDB *dblist_tail = NULL; +static TableListPerDB *dblist_cur = NULL; + /* Command-line options */ struct CreateSubscriberOptions { @@ -61,6 +85,7 @@ struct LogicalRepInfo bool made_replslot; /* replication slot was created */ bool made_publication; /* publication was created */ + TableSpec *tables; /* list of tables to be subscribed */ }; /* @@ -265,6 +290,7 @@ usage(void) printf(_(" --publication=NAME publication name\n")); printf(_(" --replication-slot=NAME replication slot name\n")); printf(_(" --subscription=NAME subscription name\n")); + printf(_(" --table table to subscribe to; can be specified multiple times\n")); printf(_(" -V, --version output version information, then exit\n")); printf(_(" -?, --help show this help, then exit\n")); printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT); @@ -505,6 +531,7 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt, else dbinfo[i].subname = NULL; /* Other fields will be filled later */ + dbinfo[i].tables = NULL; pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i, dbinfo[i].pubname ? 
dbinfo[i].pubname : "(auto)", @@ -525,6 +552,21 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt, i++; } + /* Attach the table lists collected from --table to their databases */ + for (int j = 0; j < num_dbs; j++) + { + const char *dbname = dbinfo[j].dbname; + + for (TableListPerDB *cur = dblist_head; cur != NULL; cur = cur->next) + { + if (strcmp(cur->dbname, dbname) == 0) + { + dbinfo[j].tables = cur->tables; + break; + } + } + } + return dbinfo; } @@ -1654,11 +1696,81 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo) pg_log_info("creating publication \"%s\" in database \"%s\"", dbinfo->pubname, dbinfo->dbname); - appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES", - ipubname_esc); + if (dbinfo->tables == NULL) + appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES", ipubname_esc); + else + { + bool first = true; + + appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR TABLE ", ipubname_esc); + /* Resolve each --table pattern to the tables it matches */ + for (TableSpec *tbl = dbinfo->tables; tbl != NULL; tbl = tbl->next) + { + const char *params[2] = { + tbl->pattern_schema_regex, + tbl->pattern_table_regex + }; + + PGresult *tres = PQexecParams(conn, "SELECT n.nspname, c.relname " + "FROM pg_class c " + "JOIN pg_namespace n ON n.oid = c.relnamespace " + "WHERE n.nspname ~ $1 " + "AND c.relname ~ $2 " + "AND c.relkind IN ('r','p') " + "ORDER BY 1, 2", + 2, NULL, params, NULL, NULL, 0); + + if (PQresultStatus(tres) != PGRES_TUPLES_OK) + pg_fatal("could not fetch tables for pattern \"%s\": %s", + tbl->spec, PQerrorMessage(conn)); + + if (PQntuples(tres) == 0) + pg_fatal("no matching tables found for pattern \"%s\"", tbl->spec); + + for (int i = 0; i < PQntuples(tres); i++) + { + char *escaped_schema = PQescapeIdentifier(conn, PQgetvalue(tres, i, 0), + strlen(PQgetvalue(tres, i, 0))); + char *escaped_table = PQescapeIdentifier(conn, PQgetvalue(tres, i, 1), + strlen(PQgetvalue(tres, i, 1))); + + appendPQExpBuffer(str, "%s%s.%s", first ? 
"" : ", ", + escaped_schema, escaped_table); + + PQfreemem(escaped_schema); + PQfreemem(escaped_table); + first = false; + } + PQclear(tres); + } + } pg_log_debug("command is: %s", str->data); + /* Dry run: execute the command in a transaction that is rolled back */ + if (dry_run) + { + res = PQexec(conn, "BEGIN"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + pg_log_error("could not begin transaction: %s", PQerrorMessage(conn)); + disconnect_database(conn, true); + } + PQclear(res); + + res = PQexec(conn, str->data); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + pg_log_error("could not create publication \"%s\" in database \"%s\": %s", + dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res)); + disconnect_database(conn, true); + } + PQclear(res); + + res = PQexec(conn, "ROLLBACK"); + PQclear(res); + } + if (!dry_run) { res = PQexec(conn, str->data); @@ -2047,6 +2159,7 @@ main(int argc, char **argv) {"replication-slot", required_argument, NULL, 3}, {"subscription", required_argument, NULL, 4}, {"clean", required_argument, NULL, 5}, + {"table", required_argument, NULL, 6}, {NULL, 0, NULL, 0} }; @@ -2134,6 +2247,22 @@ main(int argc, char **argv) } else pg_fatal("database \"%s\" specified more than once for -d/--database", optarg); + + /* Open a table list for this database; later --table options append to it */ + { + TableListPerDB *newdb = pg_malloc0(sizeof(TableListPerDB)); + + newdb->dbname = pg_strdup(optarg); + newdb->tables = NULL; + newdb->next = NULL; + if (dblist_tail) + dblist_tail->next = newdb; + else + dblist_head = newdb; + + dblist_tail = newdb; + dblist_cur = newdb; + } break; case 'D': subscriber_dir = pg_strdup(optarg); @@ -2200,6 +2329,56 @@ main(int argc, char **argv) else pg_fatal("object type \"%s\" specified more than once for --clean", optarg); break; + case 6: + { + TableSpec *ts; + PQExpBuffer dbbuf; + PQExpBuffer schemabuf; + PQExpBuffer 
namebuf; + int encoding; + int dotcnt = 0; + + if (!dblist_cur) + pg_fatal("--table specified without a preceding --database"); + + ts = pg_malloc0(sizeof(TableSpec)); + ts->spec = pg_strdup(optarg); + dbbuf = createPQExpBuffer(); + schemabuf = createPQExpBuffer(); + namebuf = createPQExpBuffer(); + encoding = pg_get_encoding_from_locale(NULL, false); + + patternToSQLRegex(encoding, dbbuf, schemabuf, namebuf, optarg, + false, false, &dotcnt); + + /* + * patternToSQLRegex() stores the rightmost name component in + * namebuf and the one before it in schemabuf, so schema.table and + * db.schema.table are read from the same buffers. + */ + if (dotcnt != 1 && dotcnt != 2) + pg_fatal("invalid --table specification: %s", optarg); + + ts->pattern_db_regex = NULL; + ts->pattern_schema_regex = pg_strdup(schemabuf->data); + ts->pattern_table_regex = pg_strdup(namebuf->data); + + destroyPQExpBuffer(dbbuf); + destroyPQExpBuffer(schemabuf); + destroyPQExpBuffer(namebuf); + ts->next = NULL; + if (!dblist_cur->tables) + dblist_cur->tables = ts; + else + { + TableSpec *tail = dblist_cur->tables; + + while (tail->next) + tail = tail->next; + tail->next = ts; + } + } + break; default: /* getopt_long already emitted a complaint */ pg_log_error_hint("Try \"%s --help\" for more information.", progname); diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl index 229fef5b3b5..d80c3f1470f 100644 --- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl +++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl @@ -537,9 +537,89 @@ my $sysid_s = $node_s->safe_psql('postgres', 'SELECT system_identifier FROM pg_control_system()'); ok($sysid_p != $sysid_s, 'system identifier was changed'); +# Declare database names +my $db3 = 'db3'; +my $db4 = 'db4'; + +# Create databases +$node_p->safe_psql('postgres', "CREATE DATABASE $db3"); +$node_p->safe_psql('postgres', "CREATE DATABASE $db4"); + +# Test: Table-level publication creation +$node_p->safe_psql($db3, "CREATE TABLE public.t1 (id int, val text)"); 
+$node_p->safe_psql($db3, "CREATE TABLE public.t2 (id int, val text)"); +$node_p->safe_psql($db4, + "CREATE TABLE public.t3 (id int, val text, extra int)"); + +# Initialize node_s2 as a fresh standby of node_p for table-level +# publication test. +$node_p->backup('backup_tablepub'); +my $node_s2 = PostgreSQL::Test::Cluster->new('node_s2'); +$node_s2->init_from_backup($node_p, 'backup_tablepub', has_streaming => 1); +$node_s2->start; +$node_s2->stop; + +# Run pg_createsubscriber with db.schema.table and schema.table options +command_ok( + [ + 'pg_createsubscriber', + '--verbose', + '--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default, + '--pgdata' => $node_s2->data_dir, + '--publisher-server' => $node_p->connstr($db3), + '--socketdir' => $node_s2->host, + '--subscriber-port' => $node_s2->port, + '--database' => $db3, + '--table' => "$db3.public.t1", + '--table' => "$db3.public.t2", + '--database' => $db4, + '--table' => "public.t3", + ], + 'pg_createsubscriber runs with table-level publication (existing nodes)'); + +# Get the publication name created by pg_createsubscriber for db3 +my $pubname1 = $node_p->safe_psql( + $db3, qq( + SELECT pubname FROM pg_publication + WHERE pubname LIKE 'pg_createsubscriber_%' + ORDER BY pubname LIMIT 1 +)); + +# Check publication tables for db3 +my $actual1 = $node_p->safe_psql( + $db3, qq( + SELECT pubname || '|public|' || tablename + FROM pg_publication_tables + WHERE pubname = '$pubname1' + ORDER BY tablename +)); +is($actual1, "$pubname1|public|t1\n$pubname1|public|t2", + 'single publication for both tables created successfully on database db3' +); + +# Get the publication name created by pg_createsubscriber for db4 +my $pubname2 = $node_p->safe_psql( + $db4, qq( + SELECT pubname FROM pg_publication + WHERE pubname LIKE 'pg_createsubscriber_%' + ORDER BY pubname LIMIT 1 +)); + +# Check publication tables for db4 +my $actual2 = $node_p->safe_psql( + $db4, qq( + SELECT pubname || '|public|' || tablename + FROM pg_publication_tables + 
WHERE pubname = '$pubname2' + ORDER BY tablename +)); +is($actual2, "$pubname2|public|t3", + 'single publication for t3 created successfully on database db4'); + # clean up $node_p->teardown_node; $node_s->teardown_node; +$node_s2->teardown_node; $node_t->teardown_node; $node_f->teardown_node; -- 2.34.1