From 894d0752b3d3248c27d37e6f02428f30b08d4eea Mon Sep 17 00:00:00 2001 From: Julien Rouhaud Date: Fri, 28 Jun 2019 13:21:58 +0200 Subject: [PATCH 3/4] Add parallel processing to reindexdb --- src/bin/scripts/Makefile | 2 +- src/bin/scripts/parallel.c | 52 ++- src/bin/scripts/parallel.h | 5 +- src/bin/scripts/reindexdb.c | 546 ++++++++++++++++++++++++++--- src/bin/scripts/t/090_reindexdb.pl | 12 +- src/bin/scripts/vacuumdb.c | 2 +- 6 files changed, 558 insertions(+), 61 deletions(-) diff --git a/src/bin/scripts/Makefile b/src/bin/scripts/Makefile index 6979d8f9ff..bf2e82c594 100644 --- a/src/bin/scripts/Makefile +++ b/src/bin/scripts/Makefile @@ -29,7 +29,7 @@ dropdb: dropdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake- dropuser: dropuser.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils clusterdb: clusterdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils vacuumdb: vacuumdb.o common.o parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils -reindexdb: reindexdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils +reindexdb: reindexdb.o common.o parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils pg_isready: pg_isready.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils install: all installdirs diff --git a/src/bin/scripts/parallel.c b/src/bin/scripts/parallel.c index 5e4505e9fe..bab49becaf 100644 --- a/src/bin/scripts/parallel.c +++ b/src/bin/scripts/parallel.c @@ -29,16 +29,21 @@ * GetIdleSlot * Return a connection slot that is ready to execute a command. * - * We return the first slot we find that is marked isFree, if one is; - * otherwise, we loop on select() until one socket becomes available. When - * this happens, we read the whole set and mark as free all sockets that become - * available. 
+ * If pending_only is false, we return the first slot we find that is marked + * isFree, if one is; otherwise, we loop on select() until one socket becomes + * available. When this happens, we read the whole set and mark as free all + * sockets that become available. + * If pending_only is true, we filter out the slots that don't have any pending + * work to do, so we only return slots whose cell is not NULL. This is useful + * if the caller pushed a list of items to process (provided as the root + * SimpleStringListCell of a SimpleStringList), to make sure that all pushed + * work is completed before exiting the program. * * If an error occurs, NULL is returned. */ ParallelSlot * GetIdleSlot(ParallelSlot slots[], int numslots, - const char *progname) + const char *progname, bool pending_only) { int i; int firstFree = -1; @@ -47,7 +52,23 @@ GetIdleSlot(ParallelSlot slots[], int numslots, for (i = 0; i < numslots; i++) { if (slots[i].isFree) - return slots + i; + { + /* + * If the caller didn't ask to filter the slots with only the one + * having pending items to process, we can return the first free + * slot we find. + */ + if (!pending_only) + return slots + i; + + /* + * If the caller asked to filter slots having pending items to + * process, check the current item and return the slot if it's not + * empty + */ + if (slots[i].cell) + return slots + i; + } } /* @@ -121,7 +142,23 @@ GetIdleSlot(ParallelSlot slots[], int numslots, /* This connection has become idle */ slots[i].isFree = true; if (firstFree < 0) - firstFree = i; + { + /* + * If the caller didn't ask to filter the slots with + * only the one having pending items to process, we can + * mark as usable the first free slot we find. 
+ */ + if (!pending_only) + firstFree = i; + /* + * If the caller asked to filter slots having pending + * items to process, we need to check the current item + * and mark this slot as usable only if there's a + * pending item + */ + else if (slots[i].cell) + firstFree = i; + } break; } } @@ -277,6 +314,7 @@ void init_slot(ParallelSlot *slot, PGconn *conn) { slot->connection = conn; + slot->cell = NULL; /* Initially assume connection is idle */ slot->isFree = true; } diff --git a/src/bin/scripts/parallel.h b/src/bin/scripts/parallel.h index be58e0bb96..f3ce696472 100644 --- a/src/bin/scripts/parallel.h +++ b/src/bin/scripts/parallel.h @@ -9,15 +9,18 @@ #ifndef SCRIPTS_PARALLEL_H #define SCRIPTS_PARALLEL_H +#include "fe_utils/simple_list.h" + /* Parallel processing stuff */ typedef struct ParallelSlot { PGconn *connection; /* One connection */ + SimpleStringListCell *cell; /* Next item to process if any */ bool isFree; /* Is it known to be idle? */ } ParallelSlot; extern ParallelSlot *GetIdleSlot(ParallelSlot slots[], int numslots, - const char *progname); + const char *progname, bool pending_only); extern bool ProcessQueryResult(PGconn *conn, PGresult *result, const char *progname); diff --git a/src/bin/scripts/reindexdb.c b/src/bin/scripts/reindexdb.c index 3528de21e0..f8a78a1b4a 100644 --- a/src/bin/scripts/reindexdb.c +++ b/src/bin/scripts/reindexdb.c @@ -12,8 +12,10 @@ #include "postgres_fe.h" #include "common.h" #include "common/logging.h" +#include "fe_utils/connect.h" #include "fe_utils/simple_list.h" #include "fe_utils/string_utils.h" +#include "parallel.h" typedef enum ReindexType { @@ -25,16 +27,29 @@ typedef enum ReindexType } ReindexType; -static void reindex_one_database(const char *name, const char *dbname, - ReindexType type, const char *host, +static ReindexType get_parallel_object_list(PGconn *conn, ReindexType type, + SimpleStringList *user_list, + SimplePtrList * process_list, + const char *progname, bool echo); +static void 
reindex_one_database(const char *dbname, ReindexType type, + SimpleStringList *user_list, const char *host, const char *port, const char *username, enum trivalue prompt_password, const char *progname, - bool echo, bool verbose, bool concurrently); + bool echo, bool verbose, bool concurrently, + int concurrentCons); static void reindex_all_databases(const char *maintenance_db, const char *host, const char *port, const char *username, enum trivalue prompt_password, const char *progname, bool echo, - bool quiet, bool verbose, bool concurrently); + bool quiet, bool verbose, bool concurrently, + int concurrentCons); +static void run_reindex_command(PGconn *conn, ReindexType type, + const char *name, const char *progname, bool echo, + bool verbose, bool concurrently, bool async); +static void slot_process_item(ParallelSlot *slot, int *pending_conn, + ReindexType process_type, const char *progname, bool echo, + bool verbose, bool concurrently, bool parallel); + static void help(const char *progname); int @@ -54,6 +69,7 @@ main(int argc, char *argv[]) {"system", no_argument, NULL, 's'}, {"table", required_argument, NULL, 't'}, {"index", required_argument, NULL, 'i'}, + {"jobs", required_argument, NULL, 'j'}, {"verbose", no_argument, NULL, 'v'}, {"concurrently", no_argument, NULL, 1}, {"maintenance-db", required_argument, NULL, 2}, @@ -79,6 +95,9 @@ main(int argc, char *argv[]) SimpleStringList indexes = {NULL, NULL}; SimpleStringList tables = {NULL, NULL}; SimpleStringList schemas = {NULL, NULL}; + int concurrentCons = 1; + int tbl_count = 0, + nsp_count = 0; pg_logging_init(argv[0]); progname = get_progname(argv[0]); @@ -87,7 +106,7 @@ main(int argc, char *argv[]) handle_help_version_opts(argc, argv, "reindexdb", help); /* process command-line options */ - while ((c = getopt_long(argc, argv, "h:p:U:wWeqS:d:ast:i:v", long_options, &optindex)) != -1) + while ((c = getopt_long(argc, argv, "h:p:U:wWeqS:d:ast:i:j:v", long_options, &optindex)) != -1) { switch (c) { @@ -114,6 
+133,7 @@ main(int argc, char *argv[]) break; case 'S': simple_string_list_append(&schemas, optarg); + nsp_count++; break; case 'd': dbname = pg_strdup(optarg); @@ -126,10 +146,25 @@ main(int argc, char *argv[]) break; case 't': simple_string_list_append(&tables, optarg); + tbl_count++; break; case 'i': simple_string_list_append(&indexes, optarg); break; + case 'j': + concurrentCons = atoi(optarg); + if (concurrentCons <= 0) + { + pg_log_error("number of parallel jobs must be at least 1"); + exit(1); + } + if (concurrentCons > FD_SETSIZE - 1) + { + pg_log_error("too many parallel jobs requested (maximum: %d)", + FD_SETSIZE - 1); + exit(1); + } + break; case 'v': verbose = true; break; @@ -194,7 +229,8 @@ main(int argc, char *argv[]) } reindex_all_databases(maintenance_db, host, port, username, - prompt_password, progname, echo, quiet, verbose, concurrently); + prompt_password, progname, echo, quiet, verbose, + concurrently, 1); } else if (syscatalog) { @@ -214,6 +250,12 @@ main(int argc, char *argv[]) exit(1); } + if (concurrentCons > 1) + { + pg_log_error("cannot use multiple jobs to reindex system catalogs"); + exit(1); + } + if (dbname == NULL) { if (getenv("PGDATABASE")) @@ -224,9 +266,9 @@ main(int argc, char *argv[]) dbname = get_user_name_or_exit(progname); } - reindex_one_database(NULL, dbname, REINDEX_SYSTEM, host, + reindex_one_database(dbname, REINDEX_SYSTEM, NULL, host, port, username, prompt_password, progname, - echo, verbose, concurrently); + echo, verbose, concurrently, 1); } else { @@ -241,61 +283,57 @@ main(int argc, char *argv[]) } if (schemas.head != NULL) - { - SimpleStringListCell *cell; - - for (cell = schemas.head; cell; cell = cell->next) - { - reindex_one_database(cell->val, dbname, REINDEX_SCHEMA, host, - port, username, prompt_password, progname, - echo, verbose, concurrently); - } - } + reindex_one_database(dbname, REINDEX_SCHEMA, &schemas, host, + port, username, prompt_password, progname, + echo, verbose, concurrently, + 
Min(concurrentCons, nsp_count)); if (indexes.head != NULL) - { - SimpleStringListCell *cell; - for (cell = indexes.head; cell; cell = cell->next) - { - reindex_one_database(cell->val, dbname, REINDEX_INDEX, host, - port, username, prompt_password, progname, - echo, verbose, concurrently); - } - } - if (tables.head != NULL) - { - SimpleStringListCell *cell; + /* + * The number of threads will be checked in the function, as it's + * depending on the number of underlying tables + */ + reindex_one_database(dbname, REINDEX_INDEX, &indexes, host, + port, username, prompt_password, progname, + echo, verbose, concurrently, + concurrentCons); - for (cell = tables.head; cell; cell = cell->next) - { - reindex_one_database(cell->val, dbname, REINDEX_TABLE, host, - port, username, prompt_password, progname, - echo, verbose, concurrently); - } - } + if (tables.head != NULL) + reindex_one_database(dbname, REINDEX_TABLE, &tables, host, + port, username, prompt_password, progname, + echo, verbose, concurrently, + Min(concurrentCons, tbl_count)); /* * reindex database only if neither index nor table nor schema is * specified */ if (indexes.head == NULL && tables.head == NULL && schemas.head == NULL) - reindex_one_database(NULL, dbname, REINDEX_DATABASE, host, + reindex_one_database(dbname, REINDEX_DATABASE, NULL, host, port, username, prompt_password, progname, - echo, verbose, concurrently); + echo, verbose, concurrently, concurrentCons); } exit(0); } static void -reindex_one_database(const char *name, const char *dbname, ReindexType type, - const char *host, const char *port, const char *username, +reindex_one_database(const char *dbname, ReindexType type, + SimpleStringList *user_list, const char *host, + const char *port, const char *username, enum trivalue prompt_password, const char *progname, bool echo, - bool verbose, bool concurrently) + bool verbose, bool concurrently, int concurrentCons) { - PQExpBufferData sql; PGconn *conn; + bool parallel = concurrentCons > 1; + 
ReindexType process_type = type; + SimplePtrList process_list = {0, NULL, NULL}; + SimplePtrListCell *cell; + ParallelSlot *slots; + int i, + pending_conn; + bool failed = false; conn = connectDatabase(dbname, host, port, username, prompt_password, progname, echo, false, false); @@ -308,6 +346,173 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type, exit(1); } + if (!parallel) + { + if (user_list) + { + /* + * In non parallel mode, if the user provided a list, just use it + * as-is + */ + simple_ptr_list_append(&process_list, user_list); + } + else + { + /* + * Otherwise, create a dummy list with an empty string, as user + * requires an element. + */ + SimpleStringList *dummy = palloc0(sizeof(SimpleStringList)); + + simple_string_list_append(dummy, ""); + simple_ptr_list_append(&process_list, dummy); + } + } + else + { + /* + * Database-wide parallel reindex requires special processing. If + * multiple jobs were asked, we have to reindex system catalogs first, + * as they can't be processed in parallel. + */ + if (type == REINDEX_DATABASE) + { + run_reindex_command(conn, REINDEX_SYSTEM, NULL, progname, echo, verbose, + concurrently, false); + } + + /* Get the list of objects to process */ + process_type = get_parallel_object_list(conn, type, user_list, + &process_list, progname, echo); + } + + /* Lower down the number of required connections if needed. */ + concurrentCons = Min(concurrentCons, process_list.size); + + /* If no object was found, we're done. 
*/ + if (concurrentCons <= 0) + return; + if (concurrentCons == 1) + parallel = false; + + slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * concurrentCons); + init_slot(slots, conn); + if (parallel) + { + for (i = 1; i < concurrentCons; i++) + { + conn = connectDatabase(dbname, host, port, username, prompt_password, + progname, echo, false, true); + init_slot(slots + i, conn); + } + } + + pending_conn = 0; + cell = process_list.head; + do + { + ParallelSlot *free_slot = NULL; + SimpleStringList *cur = (SimpleStringList *) cell->val; + + if (CancelRequested) + { + failed = true; + goto finish; + } + + /* + * Get the connection slot to use. If in parallel mode, here we wait + * for one connection to become available if none already is. In + * non-parallel mode we simply use the only slot we have, which we + * know to be free. + */ + if (parallel) + { + /* + * Get a free slot, waiting until one becomes free if none + * currently is. + */ + free_slot = GetIdleSlot(slots, concurrentCons, progname, false); + if (!free_slot) + { + failed = true; + goto finish; + } + + free_slot->isFree = false; + } + else + free_slot = slots; + + /* + * If the idle slot found has finished processing its object list, we can + * pop an item from the global processing list and assign it to this + * slot + */ + if (free_slot->cell == NULL) + { + free_slot->cell = cur->head; + cell = cell->next; + pending_conn++; + } + + /* Consume an item from this slot's list */ + slot_process_item(free_slot, &pending_conn, process_type, progname, + echo, verbose, concurrently, parallel); + } while (cell != NULL); + + /* + * We have now walked through the whole global processing list. We still + * have to make sure that all slots are done processing their local lists. 
+ */ + while (pending_conn > 0) + { + ParallelSlot *free_slot = NULL; + + free_slot = GetIdleSlot(slots, concurrentCons, progname, true); + if (!free_slot) + { + failed = true; + goto finish; + } + + slot_process_item(free_slot, &pending_conn, process_type, progname, + echo, verbose, concurrently, parallel); + + } + + if (parallel) + { + int j; + + /* wait for all connections to finish */ + for (j = 0; j < concurrentCons; j++) + { + if (!GetQueryResult((slots + j)->connection, progname)) + { + failed = true; + goto finish; + } + } + } + +finish: + for (i = 0; i < concurrentCons; i++) + DisconnectDatabase(slots + i); + pfree(slots); + + if (failed) + exit(1); +} + +static void +run_reindex_command(PGconn *conn, ReindexType type, const char *name, + const char *progname, bool echo, bool verbose, + bool concurrently, bool async) +{ + PQExpBufferData sql; + bool status; + /* build the REINDEX query */ initPQExpBuffer(&sql); @@ -358,7 +563,17 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type, /* finish the query */ appendPQExpBufferChar(&sql, ';'); - if (!executeMaintenanceCommand(conn, sql.data, echo)) + if (async) + { + if (echo) + printf("%s\n", sql.data); + + status = PQsendQuery(conn, sql.data) == 1; + } + else + status = executeMaintenanceCommand(conn, sql.data, echo); + + if (!status) { switch (type) { @@ -383,20 +598,249 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type, name, PQdb(conn), PQerrorMessage(conn)); break; } - PQfinish(conn); - exit(1); + if (!async) + { + PQfinish(conn); + exit(1); + } } - PQfinish(conn); termPQExpBuffer(&sql); } +/* + * Prepare the list of objects to process by querying the catalogs. + * + * This function will fill the given process_list list with SimpleStringList + * objects, filtered by the list of objects that the script was provided with + * if any. + * Each SimpleStringList describes objects that can be processed by + * multiple connections. 
This is required as multiple indexes belonging to the + * same table cannot be processed in parallel. + */ +static ReindexType +get_parallel_object_list(PGconn *conn, ReindexType type, + SimpleStringList *user_list, + SimplePtrList * process_list, const char *progname, + bool echo) +{ + ReindexType process_type = type; + PQExpBufferData buf; + PQExpBufferData catalog_query; + int i; + + Assert(process_list->size == 0); + + if (type == REINDEX_DATABASE) + { + PGresult *res; + SimpleStringList *tables; + int ntups; + + Assert(user_list == NULL); + + process_type = REINDEX_TABLE; + + initPQExpBuffer(&catalog_query); + + /* + * This query is run using a safe search_path, so there's no need to + * fully qualify everything. + */ + appendPQExpBuffer(&catalog_query, + "SELECT c.relname, ns.nspname\n" + " FROM pg_catalog.pg_class c\n" + " JOIN pg_catalog.pg_namespace ns" + " ON c.relnamespace = ns.oid\n" + " WHERE ns.nspname != 'pg_catalog'\n" + " AND c.relkind IN (" + CppAsString2(RELKIND_RELATION) ", " + CppAsString2(RELKIND_MATVIEW) ")\n" + " ORDER BY c.relpages DESC;"); + + res = executeQuery(conn, catalog_query.data, progname, echo); + termPQExpBuffer(&catalog_query); + + /* + * If no rows are returned, there are no matching tables, so we are + * done. 
+ */ + ntups = PQntuples(res); + if (ntups == 0) + { + PQclear(res); + PQfinish(conn); + return process_type; + } + + /* Build qualified identifiers for each table */ + initPQExpBuffer(&buf); + for (i = 0; i < ntups; i++) + { + tables = pg_malloc0(sizeof(SimpleStringList)); + + appendPQExpBufferStr(&buf, + fmtQualifiedId(PQgetvalue(res, i, 1), + PQgetvalue(res, i, 0))); + + simple_string_list_append(tables, buf.data); + simple_ptr_list_append(process_list, tables); + resetPQExpBuffer(&buf); + } + termPQExpBuffer(&buf); + PQclear(res); + } + else if (type == REINDEX_INDEX) + { + SimpleStringList *indexes = NULL; + SimpleStringListCell *cell; + PGresult *res; + int ntups; + char *prev_rel = NULL; + + Assert(user_list != NULL); + + initPQExpBuffer(&catalog_query); + + /* + * Since we execute the constructed query with the default search_path + * (which could be unsafe), everything in this query MUST be fully + * qualified. Note that if the user provided catalog indexes, those + * would be processed in parallel with other indexes, with all the + * locking issues that would be implied. + */ + appendPQExpBuffer(&catalog_query, + "SELECT ic.relname, ns.nspname, tc.oid\n" + " FROM pg_catalog.pg_index i\n" + " JOIN pg_catalog.pg_class ic" + " ON i.indexrelid OPERATOR(pg_catalog.=) ic.oid\n" + " JOIN pg_catalog.pg_namespace ns" + " ON ic.relnamespace OPERATOR(pg_catalog.=) ns.oid\n" + " JOIN pg_catalog.pg_class tc" + " ON i.indrelid OPERATOR(pg_catalog.=) tc.oid\n" + " WHERE i.indexrelid OPERATOR(pg_catalog.=) ANY (array[\n"); + + for (cell = user_list->head; cell; cell = cell->next) + { + if (cell != user_list->head) + appendPQExpBuffer(&catalog_query, ", "); + + appendStringLiteralConn(&catalog_query, cell->val, conn); + appendPQExpBuffer(&catalog_query, "::pg_catalog.regclass"); + } + + /* + * We try to get the biggest indexes first so they're processed + * earlier. 
We require that all indexes belonging to the same table + * are contiguous, so we can only order by the underlying table size. + * We also need to order by the underlying table oid to make sure that + * indexes belonging to different tables of the same size are still + * correctly ordered. + */ + appendPQExpBuffer(&catalog_query, + "\n])\n" + " ORDER BY tc.relpages DESC, tc.oid;"); + + executeCommand(conn, "RESET search_path;", progname, echo); + res = executeQuery(conn, catalog_query.data, progname, echo); + termPQExpBuffer(&catalog_query); + PQclear(executeQuery(conn, ALWAYS_SECURE_SEARCH_PATH_SQL, + progname, echo)); + + /* + * If no rows are returned, there are no matching indexes, so we are + * done. + */ + ntups = PQntuples(res); + if (ntups == 0) + { + PQclear(res); + PQfinish(conn); + return process_type; + } + + indexes = pg_malloc0(sizeof(SimpleStringList)); + + /* Build a list of qualified index names, aggregated per table */ + for (i = 0; i < ntups; i++) + { + if (!prev_rel) + prev_rel = pg_strdup(PQgetvalue(res, i, 2)); + + if (strcmp(PQgetvalue(res, i, 2), prev_rel) != 0) + { + simple_ptr_list_append(process_list, indexes); + + indexes = pg_malloc0(sizeof(SimpleStringList)); + pg_free(prev_rel); + prev_rel = pg_strdup(PQgetvalue(res, i, 2)); + } + + simple_string_list_append(indexes, + fmtQualifiedId(PQgetvalue(res, i, 1), + PQgetvalue(res, i, 0))); + } + simple_ptr_list_append(process_list, indexes); + + PQclear(res); + } + + /* + * Any other object list is safe to dispatch as-is. Note that if the user + * provided catalog tables, those would be processed in parallel with + * other tables, with all the locking issues that would be implied. 
+ */ + else + { + SimpleStringList *res; + SimpleStringListCell *cell; + + Assert(user_list != NULL); + + for (cell = user_list->head; cell; cell = cell->next) + { + res = pg_malloc0(sizeof(SimpleStringList)); + + simple_string_list_append(res, cell->val); + simple_ptr_list_append(process_list, res); + } + } + + return process_type; +} + +static void +slot_process_item(ParallelSlot *slot, int *pending_conn, + ReindexType process_type, const char *progname, bool echo, + bool verbose, bool concurrently, bool parallel) +{ + Assert(slot->cell); + Assert(*pending_conn > 0); + + /* + * Process one item of current slot local list and advance it. If not in + * parallel mode, this terminates the program in case of an error. (The + * parallel case handles query errors in ProcessQueryResult through + * GetIdleSlot.) + */ + run_reindex_command(slot->connection, process_type, + slot->cell->val, + progname, echo, verbose, concurrently, + parallel); + + slot->cell = slot->cell->next; + + /* Check if that was the last item of this list */ + if (!slot->cell) + (*pending_conn)--; +} + static void reindex_all_databases(const char *maintenance_db, const char *host, const char *port, const char *username, enum trivalue prompt_password, const char *progname, bool echo, bool quiet, bool verbose, - bool concurrently) + bool concurrently, int concurrentCons) { PGconn *conn; PGresult *result; @@ -423,9 +867,10 @@ reindex_all_databases(const char *maintenance_db, appendPQExpBuffer(&connstr, "dbname="); appendConnStrVal(&connstr, dbname); - reindex_one_database(NULL, connstr.data, REINDEX_DATABASE, host, + reindex_one_database(connstr.data, REINDEX_DATABASE, NULL, host, port, username, prompt_password, - progname, echo, verbose, concurrently); + progname, echo, verbose, concurrently, + concurrentCons); } termPQExpBuffer(&connstr); @@ -444,6 +889,7 @@ help(const char *progname) printf(_(" -d, --dbname=DBNAME database to reindex\n")); printf(_(" -e, --echo show the commands being sent to the 
server\n")); printf(_(" -i, --index=INDEX recreate specific index(es) only\n")); + printf(_(" -j, --jobs=NUM use this many concurrent connections to reindex\n")); printf(_(" -q, --quiet don't write any messages\n")); printf(_(" -s, --system reindex system catalogs\n")); printf(_(" -S, --schema=SCHEMA reindex specific schema(s) only\n")); diff --git a/src/bin/scripts/t/090_reindexdb.pl b/src/bin/scripts/t/090_reindexdb.pl index 1af8ab70ad..a14853b3fc 100644 --- a/src/bin/scripts/t/090_reindexdb.pl +++ b/src/bin/scripts/t/090_reindexdb.pl @@ -3,7 +3,7 @@ use warnings; use PostgresNode; use TestLib; -use Test::More tests => 34; +use Test::More tests => 39; program_help_ok('reindexdb'); program_version_ok('reindexdb'); @@ -77,3 +77,13 @@ $node->command_ok( $node->command_ok( [qw(reindexdb --echo --system dbname=template1)], 'reindexdb system with connection string'); + +# parallel processing +$node->command_fails([qw(reindexdb -j2 -s)], + 'reindexdb cannot process system catalogs in parallel'); +$node->issues_sql_like([qw(reindexdb -j2)], + qr/statement: REINDEX SYSTEM postgres/, + 'Global and parallel reindex will issue a REINDEX SYSTEM'); +$node->issues_sql_like([qw(reindexdb -j2)], + qr/statement: REINDEX TABLE public.test1/, + 'Global and parallel reindex will issue per-table REINDEX'); diff --git a/src/bin/scripts/vacuumdb.c b/src/bin/scripts/vacuumdb.c index 80c9341a5b..ad4ba0915c 100644 --- a/src/bin/scripts/vacuumdb.c +++ b/src/bin/scripts/vacuumdb.c @@ -652,7 +652,7 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts, * Get a free slot, waiting until one becomes free if none * currently is. */ - free_slot = GetIdleSlot(slots, concurrentCons, progname); + free_slot = GetIdleSlot(slots, concurrentCons, progname, false); if (!free_slot) { failed = true; -- 2.20.1