*/
#include "postgres_fe.h"
+
+#include "catalog/pg_class_d.h"
#include "common.h"
#include "common/logging.h"
+#include "fe_utils/connect.h"
#include "fe_utils/simple_list.h"
#include "fe_utils/string_utils.h"
+#include "scripts_parallel.h"
typedef enum ReindexType
{
} ReindexType;
-static void reindex_one_database(const char *name, const char *dbname,
- ReindexType type, const char *host,
+static SimpleStringList *get_parallel_object_list(PGconn *conn,
+ ReindexType type,
+ SimpleStringList *user_list,
+ bool echo);
+static void reindex_one_database(const char *dbname, ReindexType type,
+ SimpleStringList *user_list, const char *host,
const char *port, const char *username,
enum trivalue prompt_password, const char *progname,
- bool echo, bool verbose, bool concurrently);
+ bool echo, bool verbose, bool concurrently,
+ int concurrentCons);
static void reindex_all_databases(const char *maintenance_db,
const char *host, const char *port,
const char *username, enum trivalue prompt_password,
const char *progname, bool echo,
- bool quiet, bool verbose, bool concurrently);
+ bool quiet, bool verbose, bool concurrently,
+ int concurrentCons);
+static void run_reindex_command(PGconn *conn, ReindexType type,
+ const char *name, bool echo, bool verbose,
+ bool concurrently, bool async);
+
static void help(const char *progname);
int
{"system", no_argument, NULL, 's'},
{"table", required_argument, NULL, 't'},
{"index", required_argument, NULL, 'i'},
+ {"jobs", required_argument, NULL, 'j'},
{"verbose", no_argument, NULL, 'v'},
{"concurrently", no_argument, NULL, 1},
{"maintenance-db", required_argument, NULL, 2},
SimpleStringList indexes = {NULL, NULL};
SimpleStringList tables = {NULL, NULL};
SimpleStringList schemas = {NULL, NULL};
+ int concurrentCons = 1;
pg_logging_init(argv[0]);
progname = get_progname(argv[0]);
handle_help_version_opts(argc, argv, "reindexdb", help);
/* process command-line options */
- while ((c = getopt_long(argc, argv, "h:p:U:wWeqS:d:ast:i:v", long_options, &optindex)) != -1)
+ while ((c = getopt_long(argc, argv, "h:p:U:wWeqS:d:ast:i:j:v", long_options, &optindex)) != -1)
{
switch (c)
{
case 'i':
simple_string_list_append(&indexes, optarg);
break;
+ case 'j':
+ concurrentCons = atoi(optarg);
+ if (concurrentCons <= 0)
+ {
+ pg_log_error("number of parallel jobs must be at least 1");
+ exit(1);
+ }
+ if (concurrentCons > FD_SETSIZE - 1)
+ {
+ pg_log_error("too many parallel jobs requested (maximum: %d)",
+ FD_SETSIZE - 1);
+ exit(1);
+ }
+ break;
case 'v':
verbose = true;
break;
}
reindex_all_databases(maintenance_db, host, port, username,
- prompt_password, progname, echo, quiet, verbose, concurrently);
+ prompt_password, progname, echo, quiet, verbose,
+ concurrently, concurrentCons);
}
else if (syscatalog)
{
exit(1);
}
+ if (concurrentCons > 1)
+ {
+ pg_log_error("cannot use multiple jobs to reindex system catalogs");
+ exit(1);
+ }
+
if (dbname == NULL)
{
if (getenv("PGDATABASE"))
dbname = get_user_name_or_exit(progname);
}
- reindex_one_database(NULL, dbname, REINDEX_SYSTEM, host,
+ reindex_one_database(dbname, REINDEX_SYSTEM, NULL, host,
port, username, prompt_password, progname,
- echo, verbose, concurrently);
+ echo, verbose, concurrently, 1);
}
else
{
+ /*
+ * Index-level REINDEX is not supported with multiple jobs as we
+ * cannot control the concurrent processing of multiple indexes
+ * depending on the same relation.
+ */
+ if (concurrentCons > 1 && indexes.head != NULL)
+ {
+ pg_log_error("cannot use multiple jobs to reindex indexes");
+ exit(1);
+ }
+
if (dbname == NULL)
{
if (getenv("PGDATABASE"))
}
if (schemas.head != NULL)
- {
- SimpleStringListCell *cell;
-
- for (cell = schemas.head; cell; cell = cell->next)
- {
- reindex_one_database(cell->val, dbname, REINDEX_SCHEMA, host,
- port, username, prompt_password, progname,
- echo, verbose, concurrently);
- }
- }
+ reindex_one_database(dbname, REINDEX_SCHEMA, &schemas, host,
+ port, username, prompt_password, progname,
+ echo, verbose, concurrently, concurrentCons);
if (indexes.head != NULL)
- {
- SimpleStringListCell *cell;
+ reindex_one_database(dbname, REINDEX_INDEX, &indexes, host,
+ port, username, prompt_password, progname,
+ echo, verbose, concurrently, 1);
- for (cell = indexes.head; cell; cell = cell->next)
- {
- reindex_one_database(cell->val, dbname, REINDEX_INDEX, host,
- port, username, prompt_password, progname,
- echo, verbose, concurrently);
- }
- }
if (tables.head != NULL)
- {
- SimpleStringListCell *cell;
-
- for (cell = tables.head; cell; cell = cell->next)
- {
- reindex_one_database(cell->val, dbname, REINDEX_TABLE, host,
- port, username, prompt_password, progname,
- echo, verbose, concurrently);
- }
- }
+ reindex_one_database(dbname, REINDEX_TABLE, &tables, host,
+ port, username, prompt_password, progname,
+ echo, verbose, concurrently,
+ concurrentCons);
/*
* reindex database only if neither index nor table nor schema is
* specified
*/
if (indexes.head == NULL && tables.head == NULL && schemas.head == NULL)
- reindex_one_database(NULL, dbname, REINDEX_DATABASE, host,
+ reindex_one_database(dbname, REINDEX_DATABASE, NULL, host,
port, username, prompt_password, progname,
- echo, verbose, concurrently);
+ echo, verbose, concurrently, concurrentCons);
}
exit(0);
}
static void
-reindex_one_database(const char *name, const char *dbname, ReindexType type,
- const char *host, const char *port, const char *username,
+reindex_one_database(const char *dbname, ReindexType type,
+ SimpleStringList *user_list, const char *host,
+ const char *port, const char *username,
enum trivalue prompt_password, const char *progname, bool echo,
- bool verbose, bool concurrently)
+ bool verbose, bool concurrently, int concurrentCons)
{
- PQExpBufferData sql;
PGconn *conn;
+ SimpleStringListCell *cell;
+ bool parallel = concurrentCons > 1;
+ SimpleStringList *process_list = user_list;
+ ReindexType process_type = type;
+ ParallelSlot *slots;
+ bool failed = false;
+ int items_count = 0;
conn = connectDatabase(dbname, host, port, username, prompt_password,
progname, echo, false, false);
exit(1);
}
+ if (!parallel)
+ {
+ switch (process_type)
+ {
+ case REINDEX_DATABASE:
+ case REINDEX_SYSTEM:
+
+ /*
+ * Database and system reindexes only need to work on the
+ * database itself, so build a list with a single entry.
+ */
+ Assert(user_list == NULL);
+ process_list = pg_malloc0(sizeof(SimpleStringList));
+ simple_string_list_append(process_list, PQdb(conn));
+ break;
+
+ case REINDEX_INDEX:
+ case REINDEX_SCHEMA:
+ case REINDEX_TABLE:
+ Assert(user_list != NULL);
+ break;
+ }
+ }
+ else
+ {
+ switch (process_type)
+ {
+ case REINDEX_DATABASE:
+
+ /*
+ * Database-wide parallel reindex requires special processing.
+ * If multiple jobs were asked, we have to reindex system
+ * catalogs first as they cannot be processed in parallel.
+ */
+ if (concurrently)
+ pg_log_warning("cannot reindex system catalogs concurrently, skipping all");
+ else
+ run_reindex_command(conn, REINDEX_SYSTEM, PQdb(conn), echo,
+ verbose, concurrently, false);
+
+ /* Build a list of relations from the database */
+ process_list = get_parallel_object_list(conn, process_type,
+ user_list, echo);
+ process_type = REINDEX_TABLE;
+
+ /* Bail out if nothing to process */
+ if (process_list == NULL)
+ return;
+ break;
+
+ case REINDEX_SCHEMA:
+ Assert(user_list != NULL);
+
+ /* Build a list of relations from all the schemas */
+ process_list = get_parallel_object_list(conn, process_type,
+ user_list, echo);
+ process_type = REINDEX_TABLE;
+
+ /* Bail out if nothing to process */
+ if (process_list == NULL)
+ return;
+ break;
+
+ case REINDEX_SYSTEM:
+ case REINDEX_INDEX:
+ /* not supported */
+ Assert(false);
+ break;
+
+ case REINDEX_TABLE:
+
+ /*
+ * Fall through. The list of items for tables is already
+ * created.
+ */
+ break;
+ }
+ }
+
+ /*
+ * Adjust the number of concurrent connections depending on the items in
+ * the list. We choose the minimum between the number of concurrent
+ * connections and the number of items in the list.
+ */
+ for (cell = process_list->head; cell; cell = cell->next)
+ {
+ items_count++;
+
+ /* no need to continue if there are more elements than jobs */
+ if (items_count >= concurrentCons)
+ break;
+ }
+ concurrentCons = Min(concurrentCons, items_count);
+ Assert(concurrentCons > 0);
+
+ Assert(process_list != NULL);
+
+ slots = ParallelSlotsSetup(dbname, host, port, username, prompt_password,
+ progname, echo, conn, concurrentCons);
+
+ cell = process_list->head;
+ do
+ {
+ const char *objname = cell->val;
+ ParallelSlot *free_slot = NULL;
+
+ if (CancelRequested)
+ {
+ failed = true;
+ goto finish;
+ }
+
+ free_slot = ParallelSlotsGetIdle(slots, concurrentCons);
+ if (!free_slot)
+ {
+ failed = true;
+ goto finish;
+ }
+
+ run_reindex_command(free_slot->connection, process_type, objname,
+ echo, verbose, concurrently, true);
+
+ cell = cell->next;
+ } while (cell != NULL);
+
+ if (!ParallelSlotsWaitCompletion(slots, concurrentCons))
+ failed = true;
+
+finish:
+ ParallelSlotsTerminate(slots, concurrentCons);
+ pfree(slots);
+
+ if (failed)
+ exit(1);
+}
+
+static void
+run_reindex_command(PGconn *conn, ReindexType type, const char *name,
+ bool echo, bool verbose, bool concurrently, bool async)
+{
+ PQExpBufferData sql;
+ bool status;
+
+ Assert(name);
+
/* build the REINDEX query */
initPQExpBuffer(&sql);
{
case REINDEX_DATABASE:
case REINDEX_SYSTEM:
- appendPQExpBufferStr(&sql, fmtId(PQdb(conn)));
+ appendPQExpBufferStr(&sql, fmtId(name));
break;
case REINDEX_INDEX:
case REINDEX_TABLE:
/* finish the query */
appendPQExpBufferChar(&sql, ';');
- if (!executeMaintenanceCommand(conn, sql.data, echo))
+ if (async)
+ {
+ if (echo)
+ printf("%s\n", sql.data);
+
+ status = PQsendQuery(conn, sql.data) == 1;
+ }
+ else
+ status = executeMaintenanceCommand(conn, sql.data, echo);
+
+ if (!status)
{
switch (type)
{
name, PQdb(conn), PQerrorMessage(conn));
break;
}
- PQfinish(conn);
- exit(1);
+ if (!async)
+ {
+ PQfinish(conn);
+ exit(1);
+ }
}
- PQfinish(conn);
termPQExpBuffer(&sql);
}
+/*
+ * Prepare the list of objects to process by querying the catalogs.
+ *
+ * This function will return a SimpleStringList object containing the entire
+ * list of tables in the given database that should be processed by a parallel
+ * database-wide reindex (excluding system tables), or NULL if there's no such
+ * table.
+ */
+static SimpleStringList *
+get_parallel_object_list(PGconn *conn, ReindexType type,
+ SimpleStringList *user_list, bool echo)
+{
+ PQExpBufferData catalog_query;
+ PQExpBufferData buf;
+ PGresult *res;
+ SimpleStringList *tables;
+ int ntups,
+ i;
+
+ initPQExpBuffer(&catalog_query);
+
+ /*
+ * The queries here are using a safe search_path, so there's no need to
+ * fully qualify everything.
+ */
+ switch (type)
+ {
+ case REINDEX_DATABASE:
+ Assert(user_list == NULL);
+ appendPQExpBuffer(&catalog_query,
+ "SELECT c.relname, ns.nspname\n"
+ " FROM pg_catalog.pg_class c\n"
+ " JOIN pg_catalog.pg_namespace ns"
+ " ON c.relnamespace = ns.oid\n"
+ " WHERE ns.nspname != 'pg_catalog'\n"
+ " AND c.relkind IN ("
+ CppAsString2(RELKIND_RELATION) ", "
+ CppAsString2(RELKIND_MATVIEW) ")\n"
+ " ORDER BY c.relpages DESC;");
+ break;
+
+ case REINDEX_SCHEMA:
+ {
+ SimpleStringListCell *cell;
+ bool nsp_listed = false;
+
+ Assert(user_list != NULL);
+
+ /*
+ * All the tables from all the listed schemas are grabbed at
+ * once.
+ */
+ appendPQExpBuffer(&catalog_query,
+ "SELECT c.relname, ns.nspname\n"
+ " FROM pg_catalog.pg_class c\n"
+ " JOIN pg_catalog.pg_namespace ns"
+ " ON c.relnamespace = ns.oid\n"
+ " WHERE c.relkind IN ("
+ CppAsString2(RELKIND_RELATION) ", "
+ CppAsString2(RELKIND_MATVIEW) ")\n"
+ " AND ns.nspname IN (");
+
+ for (cell = user_list->head; cell; cell = cell->next)
+ {
+ const char *nspname = cell->val;
+
+ if (nsp_listed)
+ appendPQExpBuffer(&catalog_query, ", ");
+ else
+ nsp_listed = true;
+
+ appendStringLiteralConn(&catalog_query, nspname, conn);
+ }
+
+ appendPQExpBuffer(&catalog_query, ")\n"
+ " ORDER BY c.relpages DESC;");
+ }
+ break;
+
+ case REINDEX_SYSTEM:
+ case REINDEX_INDEX:
+ case REINDEX_TABLE:
+ Assert(false);
+ break;
+ }
+
+ res = executeQuery(conn, catalog_query.data, echo);
+ termPQExpBuffer(&catalog_query);
+
+ /*
+ * If no rows are returned, there are no matching tables, so we are done.
+ */
+ ntups = PQntuples(res);
+ if (ntups == 0)
+ {
+ PQclear(res);
+ PQfinish(conn);
+ return NULL;
+ }
+
+ tables = pg_malloc0(sizeof(SimpleStringList));
+
+ /* Build qualified identifiers for each table */
+ initPQExpBuffer(&buf);
+ for (i = 0; i < ntups; i++)
+ {
+ appendPQExpBufferStr(&buf,
+ fmtQualifiedId(PQgetvalue(res, i, 1),
+ PQgetvalue(res, i, 0)));
+
+ simple_string_list_append(tables, buf.data);
+ resetPQExpBuffer(&buf);
+ }
+ termPQExpBuffer(&buf);
+ PQclear(res);
+
+ return tables;
+}
+
static void
reindex_all_databases(const char *maintenance_db,
const char *host, const char *port,
const char *username, enum trivalue prompt_password,
const char *progname, bool echo, bool quiet, bool verbose,
- bool concurrently)
+ bool concurrently, int concurrentCons)
{
PGconn *conn;
PGresult *result;
appendPQExpBufferStr(&connstr, "dbname=");
appendConnStrVal(&connstr, dbname);
- reindex_one_database(NULL, connstr.data, REINDEX_DATABASE, host,
+ reindex_one_database(connstr.data, REINDEX_DATABASE, NULL, host,
port, username, prompt_password,
- progname, echo, verbose, concurrently);
+ progname, echo, verbose, concurrently,
+ concurrentCons);
}
termPQExpBuffer(&connstr);
printf(_(" -d, --dbname=DBNAME database to reindex\n"));
printf(_(" -e, --echo show the commands being sent to the server\n"));
printf(_(" -i, --index=INDEX recreate specific index(es) only\n"));
+ printf(_(" -j, --jobs=NUM use this many concurrent connections to reindex\n"));
printf(_(" -q, --quiet don't write any messages\n"));
printf(_(" -s, --system reindex system catalogs\n"));
printf(_(" -S, --schema=SCHEMA reindex specific schema(s) only\n"));