From a17923204736d8842eade3517d6a8ee81290fca4 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Fri, 23 Jan 2015 15:02:45 -0300 Subject: [PATCH] vacuumdb: enable parallel mode MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit This mode allows vacuumdb to open several server connections to vacuum or analyze several tables simultaneously. Author: Dilip Kumar. Some reworking by Álvaro Herrera Reviewed by: Jeff Janes, Amit Kapila, Magnus Hagander, Andres Freund --- doc/src/sgml/ref/vacuumdb.sgml | 24 + src/bin/pg_dump/parallel.c | 2 +- src/bin/scripts/common.c | 23 +- src/bin/scripts/common.h | 6 + src/bin/scripts/vacuumdb.c | 797 ++++++++++++++++++++++++++------- 5 files changed, 691 insertions(+), 161 deletions(-) diff --git a/doc/src/sgml/ref/vacuumdb.sgml b/doc/src/sgml/ref/vacuumdb.sgml index 3ecd999981..e38c34aea3 100644 --- a/doc/src/sgml/ref/vacuumdb.sgml +++ b/doc/src/sgml/ref/vacuumdb.sgml @@ -203,6 +203,30 @@ PostgreSQL documentation + + + + + + Execute the vacuum or analyze commands in parallel by running + njobs + commands simultaneously. This option reduces the time of the + processing but it also increases the load on the database server. + + + vacuumdb will open + njobs connections to the + database, so make sure your + setting is high enough to accommodate all connections. + + + Note that using this mode together with the + (FULL) option might cause deadlock failures if + certain system catalogs are processed in parallel. + + + + diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c index d942a75f7c..1bf76114c0 100644 --- a/src/bin/pg_dump/parallel.c +++ b/src/bin/pg_dump/parallel.c @@ -1160,7 +1160,7 @@ select_loop(int maxFd, fd_set *workerset) i = select(maxFd + 1, workerset, NULL, NULL, NULL); /* - * If we Ctrl-C the master process , it's likely that we interrupt + * If we Ctrl-C the master process, it's likely that we interrupt * select() here. 
The signal handler will set wantAbort == true and * the shutdown journey starts from here. Note that we'll come back * here later when we tell all workers to terminate and read their diff --git a/src/bin/scripts/common.c b/src/bin/scripts/common.c index 6bfe2e628b..da142aaa64 100644 --- a/src/bin/scripts/common.c +++ b/src/bin/scripts/common.c @@ -19,10 +19,9 @@ #include "common.h" -static void SetCancelConn(PGconn *conn); -static void ResetCancelConn(void); static PGcancel *volatile cancelConn = NULL; +bool CancelRequested = false; #ifdef WIN32 static CRITICAL_SECTION cancelConnLock; @@ -291,7 +290,7 @@ yesno_prompt(const char *question) * * Set cancelConn to point to the current database connection. */ -static void +void SetCancelConn(PGconn *conn) { PGcancel *oldCancelConn; @@ -321,7 +320,7 @@ SetCancelConn(PGconn *conn) * * Free the current cancel connection, if any, and set to NULL. */ -static void +void ResetCancelConn(void) { PGcancel *oldCancelConn; @@ -345,9 +344,8 @@ ResetCancelConn(void) #ifndef WIN32 /* - * Handle interrupt signals by canceling the current command, - * if it's being executed through executeMaintenanceCommand(), - * and thus has a cancelConn set. + * Handle interrupt signals by canceling the current command, if a cancelConn + * is set. 
*/ static void handle_sigint(SIGNAL_ARGS) @@ -359,10 +357,15 @@ handle_sigint(SIGNAL_ARGS) if (cancelConn != NULL) { if (PQcancel(cancelConn, errbuf, sizeof(errbuf))) + { + CancelRequested = true; fprintf(stderr, _("Cancel request sent\n")); + } else fprintf(stderr, _("Could not send cancel request: %s"), errbuf); } + else + CancelRequested = true; errno = save_errno; /* just in case the write changed it */ } @@ -392,10 +395,16 @@ consoleHandler(DWORD dwCtrlType) if (cancelConn != NULL) { if (PQcancel(cancelConn, errbuf, sizeof(errbuf))) + { fprintf(stderr, _("Cancel request sent\n")); + CancelRequested = true; + } else fprintf(stderr, _("Could not send cancel request: %s"), errbuf); } + else + CancelRequested = true; + LeaveCriticalSection(&cancelConnLock); return TRUE; diff --git a/src/bin/scripts/common.h b/src/bin/scripts/common.h index c0c1715bc1..b5ce1ed744 100644 --- a/src/bin/scripts/common.h +++ b/src/bin/scripts/common.h @@ -21,6 +21,8 @@ enum trivalue TRI_YES }; +extern bool CancelRequested; + typedef void (*help_handler) (const char *progname); extern void handle_help_version_opts(int argc, char *argv[], @@ -49,4 +51,8 @@ extern bool yesno_prompt(const char *question); extern void setup_cancel_handler(void); +extern void SetCancelConn(PGconn *conn); +extern void ResetCancelConn(void); + + #endif /* COMMON_H */ diff --git a/src/bin/scripts/vacuumdb.c b/src/bin/scripts/vacuumdb.c index 957fdb6e18..506cdc7def 100644 --- a/src/bin/scripts/vacuumdb.c +++ b/src/bin/scripts/vacuumdb.c @@ -11,24 +11,73 @@ */ #include "postgres_fe.h" + #include "common.h" #include "dumputils.h" -static void vacuum_one_database(const char *dbname, bool full, bool verbose, - bool and_analyze, bool analyze_only, bool analyze_in_stages, int stage, bool freeze, - const char *table, const char *host, const char *port, +#define ERRCODE_UNDEFINED_TABLE "42P01" + +/* Parallel vacuuming stuff */ +typedef struct ParallelSlot +{ + PGconn *connection; + pgsocket sock; + bool isFree; +} 
ParallelSlot; + +/* vacuum options controlled by user flags */ +typedef struct vacuumingOptions +{ + bool analyze_only; + bool verbose; + bool and_analyze; + bool full; + bool freeze; +} vacuumingOptions; + + +static void vacuum_one_database(const char *dbname, vacuumingOptions *vacopts, + int stage, + SimpleStringList *tables, + const char *host, const char *port, const char *username, enum trivalue prompt_password, + int concurrentCons, const char *progname, bool echo, bool quiet); -static void vacuum_all_databases(bool full, bool verbose, bool and_analyze, - bool analyze_only, bool analyze_in_stages, bool freeze, + +static void vacuum_all_databases(vacuumingOptions *vacopts, + bool analyze_in_stages, const char *maintenance_db, const char *host, const char *port, const char *username, enum trivalue prompt_password, + int concurrentCons, const char *progname, bool echo, bool quiet); +static void prepare_vacuum_command(PQExpBuffer sql, PGconn *conn, + vacuumingOptions *vacopts, const char *table); + +static void run_vacuum_command(PGconn *conn, const char *sql, bool echo, + const char *dbname, const char *table, + const char *progname, bool async); + +static ParallelSlot *GetIdleSlot(ParallelSlot slots[], int numslots, + const char *dbname, const char *progname); + +static bool GetQueryResult(PGconn *conn, const char *dbname, + const char *progname); + +static void DisconnectDatabase(ParallelSlot *slot); + +static int select_loop(int maxFd, fd_set *workerset, bool *aborting); + +static void init_slot(ParallelSlot *slot, PGconn *conn); + static void help(const char *progname); +/* For analyze-in-stages mode */ +#define ANALYZE_NO_STAGE -1 +#define ANALYZE_NUM_STAGES 3 + int main(int argc, char *argv[]) @@ -49,6 +98,7 @@ main(int argc, char *argv[]) {"table", required_argument, NULL, 't'}, {"full", no_argument, NULL, 'f'}, {"verbose", no_argument, NULL, 'v'}, + {"jobs", required_argument, NULL, 'j'}, {"maintenance-db", required_argument, NULL, 2}, 
{"analyze-in-stages", no_argument, NULL, 3}, {NULL, 0, NULL, 0} @@ -57,7 +107,6 @@ main(int argc, char *argv[]) const char *progname; int optindex; int c; - const char *dbname = NULL; const char *maintenance_db = NULL; char *host = NULL; @@ -66,21 +115,23 @@ main(int argc, char *argv[]) enum trivalue prompt_password = TRI_DEFAULT; bool echo = false; bool quiet = false; - bool and_analyze = false; - bool analyze_only = false; + vacuumingOptions vacopts; bool analyze_in_stages = false; - bool freeze = false; bool alldb = false; - bool full = false; - bool verbose = false; SimpleStringList tables = {NULL, NULL}; + int concurrentCons = 1; + int tbl_count = 0; + + /* initialize options to all false */ + memset(&vacopts, 0, sizeof(vacopts)); progname = get_progname(argv[0]); + set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts")); handle_help_version_opts(argc, argv, "vacuumdb", help); - while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fv", long_options, &optindex)) != -1) + while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fvj:", long_options, &optindex)) != -1) { switch (c) { @@ -109,31 +160,49 @@ main(int argc, char *argv[]) dbname = pg_strdup(optarg); break; case 'z': - and_analyze = true; + vacopts.and_analyze = true; break; case 'Z': - analyze_only = true; + vacopts.analyze_only = true; break; case 'F': - freeze = true; + vacopts.freeze = true; break; case 'a': alldb = true; break; case 't': - simple_string_list_append(&tables, optarg); - break; + { + simple_string_list_append(&tables, optarg); + tbl_count++; + break; + } case 'f': - full = true; + vacopts.full = true; break; case 'v': - verbose = true; + vacopts.verbose = true; + break; + case 'j': + concurrentCons = atoi(optarg); + if (concurrentCons <= 0) + { + fprintf(stderr, _("%s: number of parallel \"jobs\" must be at least 1\n"), + progname); + exit(1); + } + if (concurrentCons > FD_SETSIZE - 1) + { + fprintf(stderr, _("%s: too many parallel jobs requested (maximum: %d)\n"), + progname, 
FD_SETSIZE - 1); + exit(1); + } break; case 2: maintenance_db = pg_strdup(optarg); break; case 3: - analyze_in_stages = analyze_only = true; + analyze_in_stages = vacopts.analyze_only = true; break; default: fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname); @@ -141,7 +210,6 @@ main(int argc, char *argv[]) } } - /* * Non-option argument specifies database name as long as it wasn't * already specified with -d / --dbname @@ -160,18 +228,18 @@ main(int argc, char *argv[]) exit(1); } - if (analyze_only) + if (vacopts.analyze_only) { - if (full) + if (vacopts.full) { - fprintf(stderr, _("%s: cannot use the \"full\" option when performing only analyze\n"), - progname); + fprintf(stderr, _("%s: cannot use the \"%s\" option when performing only analyze\n"), + progname, "full"); exit(1); } - if (freeze) + if (vacopts.freeze) { - fprintf(stderr, _("%s: cannot use the \"freeze\" option when performing only analyze\n"), - progname); + fprintf(stderr, _("%s: cannot use the \"%s\" option when performing only analyze\n"), + progname, "freeze"); exit(1); } /* allow 'and_analyze' with 'analyze_only' */ @@ -179,6 +247,10 @@ main(int argc, char *argv[]) setup_cancel_handler(); + /* Avoid opening extra connections. 
*/ + if (tbl_count && (concurrentCons > tbl_count)) + concurrentCons = tbl_count; + if (alldb) { if (dbname) @@ -194,9 +266,12 @@ main(int argc, char *argv[]) exit(1); } - vacuum_all_databases(full, verbose, and_analyze, analyze_only, analyze_in_stages, freeze, - maintenance_db, host, port, username, - prompt_password, progname, echo, quiet); + vacuum_all_databases(&vacopts, + analyze_in_stages, + maintenance_db, + host, port, username, prompt_password, + concurrentCons, + progname, echo, quiet); } else { @@ -210,213 +285,628 @@ main(int argc, char *argv[]) dbname = get_user_name_or_exit(progname); } - if (tables.head != NULL) + if (analyze_in_stages) { - SimpleStringListCell *cell; + int stage; - for (cell = tables.head; cell; cell = cell->next) + for (stage = 0; stage < ANALYZE_NUM_STAGES; stage++) { - vacuum_one_database(dbname, full, verbose, and_analyze, - analyze_only, analyze_in_stages, -1, - freeze, cell->val, + vacuum_one_database(dbname, &vacopts, + stage, + &tables, host, port, username, prompt_password, + concurrentCons, progname, echo, quiet); } } else - vacuum_one_database(dbname, full, verbose, and_analyze, - analyze_only, analyze_in_stages, -1, - freeze, NULL, + vacuum_one_database(dbname, &vacopts, + ANALYZE_NO_STAGE, + &tables, host, port, username, prompt_password, + concurrentCons, progname, echo, quiet); } exit(0); } - +/* + * vacuum_one_database + * + * Process tables in the given database. If the 'tables' list is empty, + * process all tables in the database. + * + * Note that this function is only concerned with running exactly one stage + * when in analyze-in-stages mode; caller must iterate on us if necessary. + * + * If concurrentCons is > 1, multiple connections are used to vacuum tables + * in parallel. In this case and if the table list is empty, we first obtain + * a list of tables from the database. 
+ */ static void -run_vacuum_command(PGconn *conn, const char *sql, bool echo, const char *dbname, const char *table, const char *progname) +vacuum_one_database(const char *dbname, vacuumingOptions *vacopts, + int stage, + SimpleStringList *tables, + const char *host, const char *port, + const char *username, enum trivalue prompt_password, + int concurrentCons, + const char *progname, bool echo, bool quiet) { - if (!executeMaintenanceCommand(conn, sql, echo)) + PQExpBufferData sql; + PGconn *conn; + SimpleStringListCell *cell; + ParallelSlot *slots = NULL; + SimpleStringList dbtables = {NULL, NULL}; + int i; + int result = 0; /* 0 = ok, -1 = failure; needs a signed type, not bool */ + bool parallel = concurrentCons > 1; + const char *stage_commands[] = { + "SET default_statistics_target=1; SET vacuum_cost_delay=0;", + "SET default_statistics_target=10; RESET vacuum_cost_delay;", + "RESET default_statistics_target;" + }; + const char *stage_messages[] = { + gettext_noop("Generating minimal optimizer statistics (1 target)"), + gettext_noop("Generating medium optimizer statistics (10 targets)"), + gettext_noop("Generating default (full) optimizer statistics") + }; + + Assert(stage == ANALYZE_NO_STAGE || + (stage >= 0 && stage < ANALYZE_NUM_STAGES)); + + if (!quiet) { - if (table) - fprintf(stderr, _("%s: vacuuming of table \"%s\" in database \"%s\" failed: %s"), - progname, table, dbname, PQerrorMessage(conn)); + if (stage != ANALYZE_NO_STAGE) + printf(_("%s: processing database \"%s\": %s\n"), progname, dbname, + stage_messages[stage]); else - fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"), - progname, dbname, PQerrorMessage(conn)); - PQfinish(conn); - exit(1); + printf(_("%s: vacuuming database \"%s\"\n"), progname, dbname); + fflush(stdout); } -} + conn = connectDatabase(dbname, host, port, username, prompt_password, + progname, false); + + initPQExpBuffer(&sql); + + /* + * If a table list is not provided and we're using multiple connections, + * prepare the list of tables by querying the catalogs.
+ */ + if (parallel && (!tables || !tables->head)) + { + PQExpBufferData buf; + PGresult *res; + int ntups; + int i; + + initPQExpBuffer(&buf); + res = executeQuery(conn, + "SELECT c.relname, ns.nspname FROM pg_class c, pg_namespace ns\n" + " WHERE relkind IN (\'r\', \'m\') AND c.relnamespace = ns.oid\n" + " ORDER BY c.relpages DESC;", + progname, echo); + + ntups = PQntuples(res); + for (i = 0; i < ntups; i++) + { + appendPQExpBuffer(&buf, "%s", + fmtQualifiedId(PQserverVersion(conn), + PQgetvalue(res, i, 1), + PQgetvalue(res, i, 0))); + + simple_string_list_append(&dbtables, buf.data); + resetPQExpBuffer(&buf); + } + + termPQExpBuffer(&buf); + tables = &dbtables; + + /* + * If there are more connections than vacuumable relations, we don't + * need to use them all. + */ + if (concurrentCons > ntups) + concurrentCons = ntups; + if (concurrentCons <= 1) + parallel = false; + } + + /* + * Setup the database connections. We reuse the connection we already have + * for the first slot. If not in parallel mode, the first slot in the + * array contains the connection. + */ + slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * concurrentCons); + init_slot(slots, conn); + if (parallel) + { + for (i = 1; i < concurrentCons; i++) + { + conn = connectDatabase(dbname, host, port, username, prompt_password, + progname, false); + init_slot(slots + i, conn); + } + } + + /* + * Prepare all the connections to run the appropriate analyze stage, if + * caller requested that mode. + */ + if (stage != ANALYZE_NO_STAGE) + { + int j; + + /* We already emitted the message above */ + + for (j = 0; j < concurrentCons; j++) + executeCommand((slots + j)->connection, + stage_commands[stage], progname, echo); + } + + cell = tables ? tables->head : NULL; + do + { + ParallelSlot *free_slot; + const char *tabname = cell ? 
cell->val : NULL; + + prepare_vacuum_command(&sql, conn, vacopts, tabname); + + if (CancelRequested) + { + result = -1; + goto finish; + } + + /* + * Get the connection slot to use. If in parallel mode, here we wait + * for one connection to become available if none already is. In + * non-parallel mode we simply use the only slot we have, which we + * know to be free. + */ + if (parallel) + { + /* + * Get a free slot, waiting until one becomes free if none + * currently is. + */ + free_slot = GetIdleSlot(slots, concurrentCons, dbname, progname); + if (!free_slot) + { + result = -1; + goto finish; + } + + free_slot->isFree = false; + } + else + free_slot = slots; + + run_vacuum_command(free_slot->connection, sql.data, + echo, dbname, tabname, progname, parallel); + + if (cell) + cell = cell->next; + } while (cell != NULL); + + if (parallel) + { + int j; + + for (j = 0; j < concurrentCons; j++) + { + /* wait for all connections to return their results */ + if (!GetQueryResult((slots + j)->connection, dbname, progname)) + goto finish; + + (slots + j)->isFree = true; + } + } + +finish: + for (i = 0; i < concurrentCons; i++) + DisconnectDatabase(slots + i); + pfree(slots); + + termPQExpBuffer(&sql); + + if (result == -1) + exit(1); +} + +/* + * Vacuum/analyze all connectable databases. + * + * In analyze-in-stages mode, we process all databases in one stage before + * moving on to the next stage. That ensures minimal stats are available + * quickly everywhere before generating more detailed ones.
+ */ static void -vacuum_one_database(const char *dbname, bool full, bool verbose, bool and_analyze, - bool analyze_only, bool analyze_in_stages, int stage, bool freeze, const char *table, - const char *host, const char *port, - const char *username, enum trivalue prompt_password, - const char *progname, bool echo, bool quiet) +vacuum_all_databases(vacuumingOptions *vacopts, + bool analyze_in_stages, + const char *maintenance_db, const char *host, + const char *port, const char *username, + enum trivalue prompt_password, + int concurrentCons, + const char *progname, bool echo, bool quiet) { - PQExpBufferData sql; - PGconn *conn; + PGresult *result; + int stage; + int i; - initPQExpBuffer(&sql); + conn = connectMaintenanceDatabase(maintenance_db, host, port, + username, prompt_password, progname); + result = executeQuery(conn, + "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", + progname, echo); + PQfinish(conn); - conn = connectDatabase(dbname, host, port, username, prompt_password, - progname, false); + if (analyze_in_stages) + { + /* + * When analyzing all databases in stages, we analyze them all in the + * fastest stage first, so that initial statistics become available + * for all of them as soon as possible. + * + * This means we establish several times as many connections, but + * that's a secondary consideration. 
+ */ + for (stage = 0; stage < ANALYZE_NUM_STAGES; stage++) + { + for (i = 0; i < PQntuples(result); i++) + { + const char *dbname; + + dbname = PQgetvalue(result, i, 0); + vacuum_one_database(dbname, vacopts, + stage, + NULL, + host, port, username, prompt_password, + concurrentCons, + progname, echo, quiet); + } + } + } + else + { + for (i = 0; i < PQntuples(result); i++) + { + const char *dbname; - if (analyze_only) + dbname = PQgetvalue(result, i, 0); + vacuum_one_database(dbname, vacopts, + ANALYZE_NO_STAGE, + NULL, + host, port, username, prompt_password, + concurrentCons, + progname, echo, quiet); + } + } + + PQclear(result); +} + +/* + * Construct a vacuum/analyze command to run based on the given options, in the + * given string buffer, which may contain previous garbage. + * + * An optional table name can be passed; this must already be properly + * quoted. The command is semicolon-terminated. + */ +static void +prepare_vacuum_command(PQExpBuffer sql, PGconn *conn, vacuumingOptions *vacopts, + const char *table) +{ + resetPQExpBuffer(sql); + + if (vacopts->analyze_only) { - appendPQExpBufferStr(&sql, "ANALYZE"); - if (verbose) - appendPQExpBufferStr(&sql, " VERBOSE"); + appendPQExpBufferStr(sql, "ANALYZE"); + if (vacopts->verbose) + appendPQExpBufferStr(sql, " VERBOSE"); } else { - appendPQExpBufferStr(&sql, "VACUUM"); + appendPQExpBufferStr(sql, "VACUUM"); if (PQserverVersion(conn) >= 90000) { const char *paren = " ("; const char *comma = ", "; const char *sep = paren; - if (full) + if (vacopts->full) { - appendPQExpBuffer(&sql, "%sFULL", sep); + appendPQExpBuffer(sql, "%sFULL", sep); sep = comma; } - if (freeze) + if (vacopts->freeze) { - appendPQExpBuffer(&sql, "%sFREEZE", sep); + appendPQExpBuffer(sql, "%sFREEZE", sep); sep = comma; } - if (verbose) + if (vacopts->verbose) { - appendPQExpBuffer(&sql, "%sVERBOSE", sep); + appendPQExpBuffer(sql, "%sVERBOSE", sep); sep = comma; } - if (and_analyze) + if (vacopts->and_analyze) { - 
appendPQExpBuffer(&sql, "%sANALYZE", sep); + appendPQExpBuffer(sql, "%sANALYZE", sep); sep = comma; } if (sep != paren) - appendPQExpBufferStr(&sql, ")"); + appendPQExpBufferStr(sql, ")"); } else { - if (full) - appendPQExpBufferStr(&sql, " FULL"); - if (freeze) - appendPQExpBufferStr(&sql, " FREEZE"); - if (verbose) - appendPQExpBufferStr(&sql, " VERBOSE"); - if (and_analyze) - appendPQExpBufferStr(&sql, " ANALYZE"); + if (vacopts->full) + appendPQExpBufferStr(sql, " FULL"); + if (vacopts->freeze) + appendPQExpBufferStr(sql, " FREEZE"); + if (vacopts->verbose) + appendPQExpBufferStr(sql, " VERBOSE"); + if (vacopts->and_analyze) + appendPQExpBufferStr(sql, " ANALYZE"); } } + if (table) - appendPQExpBuffer(&sql, " %s", table); - appendPQExpBufferStr(&sql, ";"); + appendPQExpBuffer(sql, " %s", table); + appendPQExpBufferChar(sql, ';'); +} - if (analyze_in_stages) +/* + * Execute a vacuum/analyze command to the server. + * + * Result status is checked only if 'async' is false. + */ +static void +run_vacuum_command(PGconn *conn, const char *sql, bool echo, + const char *dbname, const char *table, + const char *progname, bool async) +{ + if (async) + { + if (echo) + printf("%s\n", sql); + + PQsendQuery(conn, sql); + } + else if (!executeMaintenanceCommand(conn, sql, echo)) + { + if (table) + fprintf(stderr, + _("%s: vacuuming of table \"%s\" in database \"%s\" failed: %s"), + progname, table, dbname, PQerrorMessage(conn)); + else + fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"), + progname, dbname, PQerrorMessage(conn)); + PQfinish(conn); + exit(1); + } +} + +/* + * GetIdleSlot + * Return a connection slot that is ready to execute a command. + * + * We return the first slot we find that is marked isFree, if one is; + * otherwise, we loop on select() until one socket becomes available. When + * this happens, we read the whole set and mark as free all sockets that become + * available. 
+ * + * Process the slot list, if any free slot is available then return the slotid + * else perform the select on all the sockets and wait until at least one slot + * becomes available. + * + * If an error occurs, NULL is returned. + */ +static ParallelSlot * +GetIdleSlot(ParallelSlot slots[], int numslots, const char *dbname, + const char *progname) +{ + int i; + int firstFree = -1; + fd_set slotset; + pgsocket maxFd; + + for (i = 0; i < numslots; i++) + if ((slots + i)->isFree) + return slots + i; + + FD_ZERO(&slotset); + + maxFd = slots->sock; + for (i = 0; i < numslots; i++) + { + FD_SET((slots + i)->sock, &slotset); + if ((slots + i)->sock > maxFd) + maxFd = (slots + i)->sock; + } + + /* + * No free slot found, so wait until one of the connections has finished + * its task and return the available slot. + */ + for (firstFree = -1; firstFree < 0;) { - const char *stage_commands[] = { - "SET default_statistics_target=1; SET vacuum_cost_delay=0;", - "SET default_statistics_target=10; RESET vacuum_cost_delay;", - "RESET default_statistics_target;" - }; - const char *stage_messages[] = { - gettext_noop("Generating minimal optimizer statistics (1 target)"), - gettext_noop("Generating medium optimizer statistics (10 targets)"), - gettext_noop("Generating default (full) optimizer statistics") - }; - - if (stage == -1) + bool aborting; + + SetCancelConn(slots->connection); + i = select_loop(maxFd, &slotset, &aborting); + ResetCancelConn(); + + if (aborting) { - int i; + /* + * We set the cancel-receiving connection to the one in the zeroth + * slot above, so fetch the error from there. + */ + GetQueryResult(slots->connection, dbname, progname); + return NULL; + } + Assert(i != 0); - /* Run all stages.
*/ - for (i = 0; i < 3; i++) - { - if (!quiet) - { - puts(gettext(stage_messages[i])); - fflush(stdout); - } - executeCommand(conn, stage_commands[i], progname, echo); - run_vacuum_command(conn, sql.data, echo, dbname, table, progname); - } + for (i = 0; i < numslots; i++) + { + if (!FD_ISSET((slots + i)->sock, &slotset)) + continue; + + PQconsumeInput((slots + i)->connection); + if (PQisBusy((slots + i)->connection)) + continue; + + (slots + i)->isFree = true; + + if (!GetQueryResult((slots + i)->connection, dbname, progname)) + return NULL; + + if (firstFree < 0) + firstFree = i; } - else + } + + return slots + firstFree; +} + +/* + * GetQueryResult + * + * Process the query result. Returns true if there's no error, false + * otherwise -- but errors about trying to vacuum a missing relation are + * reported and subsequently ignored. + */ +static bool +GetQueryResult(PGconn *conn, const char *dbname, const char *progname) +{ + PGresult *result; + + SetCancelConn(conn); + while ((result = PQgetResult(conn)) != NULL) + { + /* + * If errors are found, report them. Errors about a missing table are + * harmless so we continue processing; but die for other errors. + */ + if (PQresultStatus(result) != PGRES_COMMAND_OK) { - /* Otherwise, we got a stage from vacuum_all_databases(), so run - * only that one. 
*/ - if (!quiet) + char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE); + + fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"), + progname, dbname, PQerrorMessage(conn)); + + if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0) { - puts(gettext(stage_messages[stage])); - fflush(stdout); + PQclear(result); + return false; } - executeCommand(conn, stage_commands[stage], progname, echo); - run_vacuum_command(conn, sql.data, echo, dbname, table, progname); } + PQclear(result); } - else - run_vacuum_command(conn, sql.data, echo, dbname, NULL, progname); + ResetCancelConn(); - PQfinish(conn); - termPQExpBuffer(&sql); + return true; } - +/* + * DisconnectDatabase + * Disconnect the connection associated with the given slot + */ static void -vacuum_all_databases(bool full, bool verbose, bool and_analyze, bool analyze_only, - bool analyze_in_stages, bool freeze, const char *maintenance_db, - const char *host, const char *port, - const char *username, enum trivalue prompt_password, - const char *progname, bool echo, bool quiet) +DisconnectDatabase(ParallelSlot *slot) { - PGconn *conn; - PGresult *result; - int stage; + char errbuf[256]; - conn = connectMaintenanceDatabase(maintenance_db, host, port, - username, prompt_password, progname); - result = executeQuery(conn, "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", progname, echo); - PQfinish(conn); + if (!slot->connection) + return; - /* If analyzing in stages, then run through all stages. Otherwise just - * run once, passing -1 as the stage. */ - for (stage = (analyze_in_stages ? 0 : -1); - stage < (analyze_in_stages ? 
3 : 0); - stage++) + if (PQtransactionStatus(slot->connection) == PQTRANS_ACTIVE) { - int i; + PGcancel *cancel; - for (i = 0; i < PQntuples(result); i++) + if ((cancel = PQgetCancel(slot->connection))) { - char *dbname = PQgetvalue(result, i, 0); + PQcancel(cancel, errbuf, sizeof(errbuf)); + PQfreeCancel(cancel); + } + } - if (!quiet) - { - printf(_("%s: vacuuming database \"%s\"\n"), progname, dbname); - fflush(stdout); - } + PQfinish(slot->connection); + slot->connection = NULL; +} - vacuum_one_database(dbname, full, verbose, and_analyze, analyze_only, - analyze_in_stages, stage, - freeze, NULL, host, port, username, prompt_password, - progname, echo, quiet); +/* + * Loop on select() until a descriptor from the given set becomes readable. + * + * If we get a cancel request while we're waiting, we forego all further + * processing and set the *aborting flag to true. The return value must be + * ignored in this case. Otherwise, *aborting is set to false. + */ +static int +select_loop(int maxFd, fd_set *workerset, bool *aborting) +{ + int i; + fd_set saveSet = *workerset; + + if (CancelRequested) + { + *aborting = true; + return -1; + } + else + *aborting = false; + + for (;;) + { + /* + * On Windows, we need to check once in a while for cancel requests; + * on other platforms we rely on select() returning when interrupted. 
+ */ + struct timeval *tvp; +#ifdef WIN32 + struct timeval tv = {0, 1000000}; + + tvp = &tv; +#else + tvp = NULL; +#endif + + *workerset = saveSet; + i = select(maxFd + 1, workerset, NULL, NULL, tvp); + +#ifdef WIN32 + if (i == SOCKET_ERROR) + { + i = -1; + + if (WSAGetLastError() == WSAEINTR) + errno = EINTR; /* map Winsock interrupt onto errno for the check below */ } +#endif + + if (i < 0 && errno == EINTR) + continue; /* ignore this */ + if (i < 0 || CancelRequested) + *aborting = true; /* but not this */ + if (i == 0) + continue; /* timeout (Win32 only) */ + break; } - PQclear(result); + return i; } +static void +init_slot(ParallelSlot *slot, PGconn *conn) +{ + slot->connection = conn; + slot->isFree = true; + slot->sock = PQsocket(conn); +} static void help(const char *progname) @@ -436,6 +926,7 @@ help(const char *progname) printf(_(" -V, --version output version information, then exit\n")); printf(_(" -z, --analyze update optimizer statistics\n")); printf(_(" -Z, --analyze-only only update optimizer statistics\n")); + printf(_(" -j, --jobs=NUM use this many concurrent connections to vacuum\n")); printf(_(" --analyze-in-stages only update optimizer statistics, in multiple\n" " stages for faster results\n")); printf(_(" -?, --help show this help, then exit\n")); -- 2.40.0