dropdb: dropdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
dropuser: dropuser.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
clusterdb: clusterdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
-vacuumdb: vacuumdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
+vacuumdb: vacuumdb.o common.o scripts_parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
reindexdb: reindexdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
pg_isready: pg_isready.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
clean distclean maintainer-clean:
rm -f $(addsuffix $(X), $(PROGRAMS)) $(addsuffix .o, $(PROGRAMS))
- rm -f common.o $(WIN32RES)
+ rm -f common.o scripts_parallel.o $(WIN32RES)
rm -rf tmp_check
check:
if (table)
{
appendPQExpBufferChar(&sql, ' ');
- appendQualifiedRelation(&sql, table, conn, progname, echo);
+ appendQualifiedRelation(&sql, table, conn, echo);
}
appendPQExpBufferChar(&sql, ';');
conn = connectMaintenanceDatabase(maintenance_db, host, port, username,
prompt_password, progname, echo);
- result = executeQuery(conn, "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", progname, echo);
+ result = executeQuery(conn, "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", echo);
PQfinish(conn);
initPQExpBuffer(&connstr);
#include "fe_utils/connect.h"
#include "fe_utils/string_utils.h"
+#define ERRCODE_UNDEFINED_TABLE "42P01"
+
static PGcancel *volatile cancelConn = NULL;
bool CancelRequested = false;
exit(1);
}
- PQclear(executeQuery(conn, ALWAYS_SECURE_SEARCH_PATH_SQL,
- progname, echo));
+ PQclear(executeQuery(conn, ALWAYS_SECURE_SEARCH_PATH_SQL, echo));
return conn;
}
return conn;
}
+/*
+ * Disconnect the given connection, canceling any statement if one is active.
+ */
+void
+disconnectDatabase(PGconn *conn)
+{
+ char errbuf[256];
+
+ Assert(conn != NULL);
+
+ if (PQtransactionStatus(conn) == PQTRANS_ACTIVE)
+ {
+ PGcancel *cancel;
+
+ if ((cancel = PQgetCancel(conn)))
+ {
+ (void) PQcancel(cancel, errbuf, sizeof(errbuf));
+ PQfreeCancel(cancel);
+ }
+ }
+
+ PQfinish(conn);
+}
+
/*
* Run a query, return the results, exit program on failure.
*/
PGresult *
-executeQuery(PGconn *conn, const char *query, const char *progname, bool echo)
+executeQuery(PGconn *conn, const char *query, bool echo)
{
PGresult *res;
* As above for a SQL command (which returns nothing).
*/
void
-executeCommand(PGconn *conn, const char *query,
- const char *progname, bool echo)
+executeCommand(PGconn *conn, const char *query, bool echo)
{
PGresult *res;
return r;
}
+/*
+ * Consume all the results generated for the given connection until
+ * nothing remains. If at least one error is encountered, return false.
+ * Note that this will block if the connection is busy.
+ */
+bool
+consumeQueryResult(PGconn *conn)
+{
+ bool ok = true;
+ PGresult *result;
+
+ SetCancelConn(conn);
+ while ((result = PQgetResult(conn)) != NULL)
+ {
+ if (!processQueryResult(conn, result))
+ ok = false;
+ }
+ ResetCancelConn();
+ return ok;
+}
+
+/*
+ * Process (and delete) a query result. Returns true if there's no error,
+ * false otherwise -- but errors about trying to work on a missing relation
+ * are reported and subsequently ignored.
+ */
+bool
+processQueryResult(PGconn *conn, PGresult *result)
+{
+ /*
+ * If it's an error, report it. Errors about a missing table are harmless
+ * so we continue processing; but die for other errors.
+ */
+ if (PQresultStatus(result) != PGRES_COMMAND_OK)
+ {
+ char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
+
+ pg_log_error("processing of database \"%s\" failed: %s",
+ PQdb(conn), PQerrorMessage(conn));
+
+ if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
+ {
+ PQclear(result);
+ return false;
+ }
+ }
+
+ PQclear(result);
+ return true;
+}
+
/*
* Split TABLE[(COLUMNS)] into TABLE and [(COLUMNS)] portions. When you
*/
void
appendQualifiedRelation(PQExpBuffer buf, const char *spec,
- PGconn *conn, const char *progname, bool echo)
+ PGconn *conn, bool echo)
{
char *table;
const char *columns;
appendStringLiteralConn(&sql, table, conn);
appendPQExpBufferStr(&sql, "::pg_catalog.regclass;");
- executeCommand(conn, "RESET search_path;", progname, echo);
+ executeCommand(conn, "RESET search_path;", echo);
/*
* One row is a typical result, as is a nonexistent relation ERROR.
* relation has that OID; this query returns no rows. Catalog corruption
* might elicit other row counts.
*/
- res = executeQuery(conn, sql.data, progname, echo);
+ res = executeQuery(conn, sql.data, echo);
ntups = PQntuples(res);
if (ntups != 1)
{
termPQExpBuffer(&sql);
pg_free(table);
- PQclear(executeQuery(conn, ALWAYS_SECURE_SEARCH_PATH_SQL,
- progname, echo));
+ PQclear(executeQuery(conn, ALWAYS_SECURE_SEARCH_PATH_SQL, echo));
}
const char *pguser, enum trivalue prompt_password,
const char *progname, bool echo);
-extern PGresult *executeQuery(PGconn *conn, const char *query,
- const char *progname, bool echo);
+extern void disconnectDatabase(PGconn *conn);
-extern void executeCommand(PGconn *conn, const char *query,
- const char *progname, bool echo);
+extern PGresult *executeQuery(PGconn *conn, const char *query, bool echo);
+
+extern void executeCommand(PGconn *conn, const char *query, bool echo);
extern bool executeMaintenanceCommand(PGconn *conn, const char *query,
bool echo);
+extern bool consumeQueryResult(PGconn *conn);
+
+extern bool processQueryResult(PGconn *conn, PGresult *result);
+
extern void splitTableColumnsSpec(const char *spec, int encoding,
char **table, const char **columns);
extern void appendQualifiedRelation(PQExpBuffer buf, const char *name,
- PGconn *conn, const char *progname, bool echo);
+ PGconn *conn, bool echo);
extern bool yesno_prompt(const char *question);
break;
case REINDEX_INDEX:
case REINDEX_TABLE:
- appendQualifiedRelation(&sql, name, conn, progname, echo);
+ appendQualifiedRelation(&sql, name, conn, echo);
break;
case REINDEX_SCHEMA:
appendPQExpBufferStr(&sql, name);
conn = connectMaintenanceDatabase(maintenance_db, host, port, username,
prompt_password, progname, echo);
- result = executeQuery(conn, "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", progname, echo);
+ result = executeQuery(conn, "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", echo);
PQfinish(conn);
initPQExpBuffer(&connstr);
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * scripts_parallel.c
+ * Parallel support for bin/scripts/
+ *
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/bin/scripts/scripts_parallel.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#ifdef HAVE_SYS_SELECT_H
+#include <sys/select.h>
+#endif
+
+#include "common.h"
+#include "common/logging.h"
+#include "scripts_parallel.h"
+
+static void init_slot(ParallelSlot *slot, PGconn *conn);
+static int select_loop(int maxFd, fd_set *workerset, bool *aborting);
+
+static void
+init_slot(ParallelSlot *slot, PGconn *conn)
+{
+ slot->connection = conn;
+ /* Initially assume connection is idle */
+ slot->isFree = true;
+}
+
+/*
+ * Loop on select() until a descriptor from the given set becomes readable.
+ *
+ * If we get a cancel request while we're waiting, we forego all further
+ * processing and set the *aborting flag to true. The return value must be
+ * ignored in this case. Otherwise, *aborting is set to false.
+ */
+static int
+select_loop(int maxFd, fd_set *workerset, bool *aborting)
+{
+ int i;
+ fd_set saveSet = *workerset;
+
+ if (CancelRequested)
+ {
+ *aborting = true;
+ return -1;
+ }
+ else
+ *aborting = false;
+
+ for (;;)
+ {
+ /*
+ * On Windows, we need to check once in a while for cancel requests;
+ * on other platforms we rely on select() returning when interrupted.
+ */
+ struct timeval *tvp;
+#ifdef WIN32
+ struct timeval tv = {0, 1000000};
+
+ tvp = &tv;
+#else
+ tvp = NULL;
+#endif
+
+ *workerset = saveSet;
+ i = select(maxFd + 1, workerset, NULL, NULL, tvp);
+
+#ifdef WIN32
+ if (i == SOCKET_ERROR)
+ {
+ i = -1;
+
+ if (WSAGetLastError() == WSAEINTR)
+ errno = EINTR;
+ }
+#endif
+
+ if (i < 0 && errno == EINTR)
+ continue; /* ignore this */
+ if (i < 0 || CancelRequested)
+ *aborting = true; /* but not this */
+ if (i == 0)
+ continue; /* timeout (Win32 only) */
+ break;
+ }
+
+ return i;
+}
+
+/*
+ * ParallelSlotsGetIdle
+ * Return a connection slot that is ready to execute a command.
+ *
+ * This returns the first slot we find that is marked isFree, if one is;
+ * otherwise, we loop on select() until one socket becomes available. When
+ * this happens, we read the whole set and mark as free all sockets that
+ * become available. If an error occurs, NULL is returned.
+ */
+ParallelSlot *
+ParallelSlotsGetIdle(ParallelSlot *slots, int numslots)
+{
+ int i;
+ int firstFree = -1;
+
+ /*
+ * Look for any connection currently free. If there is one, mark it as
+ * taken and let the caller know the slot to use.
+ */
+ for (i = 0; i < numslots; i++)
+ {
+ if (slots[i].isFree)
+ {
+ slots[i].isFree = false;
+ return slots + i;
+ }
+ }
+
+ /*
+ * No free slot found, so wait until one of the connections has finished
+ * its task and return the available slot.
+ */
+ while (firstFree < 0)
+ {
+ fd_set slotset;
+ int maxFd = 0;
+ bool aborting;
+
+ /* We must reconstruct the fd_set for each call to select_loop */
+ FD_ZERO(&slotset);
+
+ for (i = 0; i < numslots; i++)
+ {
+ int sock = PQsocket(slots[i].connection);
+
+ /*
+ * We don't really expect any connections to lose their sockets
+ * after startup, but just in case, cope by ignoring them.
+ */
+ if (sock < 0)
+ continue;
+
+ FD_SET(sock, &slotset);
+ if (sock > maxFd)
+ maxFd = sock;
+ }
+
+ SetCancelConn(slots->connection);
+ i = select_loop(maxFd, &slotset, &aborting);
+ ResetCancelConn();
+
+ if (aborting)
+ {
+ /*
+ * We set the cancel-receiving connection to the one in the zeroth
+ * slot above, so fetch the error from there.
+ */
+ consumeQueryResult(slots->connection);
+ return NULL;
+ }
+ Assert(i != 0);
+
+ for (i = 0; i < numslots; i++)
+ {
+ int sock = PQsocket(slots[i].connection);
+
+ if (sock >= 0 && FD_ISSET(sock, &slotset))
+ {
+ /* select() says input is available, so consume it */
+ PQconsumeInput(slots[i].connection);
+ }
+
+ /* Collect result(s) as long as any are available */
+ while (!PQisBusy(slots[i].connection))
+ {
+ PGresult *result = PQgetResult(slots[i].connection);
+
+ if (result != NULL)
+ {
+ /* Check and discard the command result */
+ if (!processQueryResult(slots[i].connection, result))
+ return NULL;
+ }
+ else
+ {
+ /* This connection has become idle */
+ slots[i].isFree = true;
+ if (firstFree < 0)
+ firstFree = i;
+ break;
+ }
+ }
+ }
+ }
+
+ slots[firstFree].isFree = false;
+ return slots + firstFree;
+}
+
+/*
+ * ParallelSlotsSetup
+ * Prepare a set of parallel slots to use on a given database.
+ *
+ * This creates and initializes a set of connections to the database
+ * using the information given by the caller, marking all parallel slots
+ * as free and ready to use. "conn" is an initial connection set up
+ * by the caller and is associated with the first slot in the parallel
+ * set.
+ */
+ParallelSlot *
+ParallelSlotsSetup(const char *dbname, const char *host, const char *port,
+ const char *username, bool prompt_password,
+ const char *progname, bool echo,
+ PGconn *conn, int numslots)
+{
+ ParallelSlot *slots;
+ int i;
+
+ Assert(conn != NULL);
+
+ slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * numslots);
+ init_slot(slots, conn);
+ if (numslots > 1)
+ {
+ for (i = 1; i < numslots; i++)
+ {
+ conn = connectDatabase(dbname, host, port, username, prompt_password,
+ progname, echo, false, true);
+ init_slot(slots + i, conn);
+ }
+ }
+
+ return slots;
+}
+
+/*
+ * ParallelSlotsTerminate
+ * Clean up a set of parallel slots
+ *
+ * Iterate through all connections in a given set of ParallelSlots and
+ * terminate all connections.
+ */
+void
+ParallelSlotsTerminate(ParallelSlot *slots, int numslots)
+{
+ int i;
+
+ for (i = 0; i < numslots; i++)
+ {
+ PGconn *conn = slots[i].connection;
+
+ if (conn == NULL)
+ continue;
+
+ disconnectDatabase(conn);
+ }
+}
+
+/*
+ * ParallelSlotsWaitCompletion
+ *
+ * Wait for all connections to finish, returning false if at least one
+ * error has been found on the way.
+ */
+bool
+ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots)
+{
+ int i;
+
+ for (i = 0; i < numslots; i++)
+ {
+ if (!consumeQueryResult((slots + i)->connection))
+ return false;
+ }
+
+ return true;
+}
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * scripts_parallel.h
+ * Parallel support for bin/scripts/
+ *
+ * Copyright (c) 2003-2019, PostgreSQL Global Development Group
+ *
+ * src/bin/scripts/scripts_parallel.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SCRIPTS_PARALLEL_H
+#define SCRIPTS_PARALLEL_H
+
+
+typedef struct ParallelSlot
+{
+ PGconn *connection; /* One connection */
+ bool isFree; /* Is it known to be idle? */
+} ParallelSlot;
+
+extern ParallelSlot *ParallelSlotsGetIdle(ParallelSlot *slots, int numslots);
+
+extern ParallelSlot *ParallelSlotsSetup(const char *dbname, const char *host,
+ const char *port,
+ const char *username,
+ bool prompt_password,
+ const char *progname, bool echo,
+ PGconn *conn, int numslots);
+
+extern void ParallelSlotsTerminate(ParallelSlot *slots, int numslots);
+
+extern bool ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots);
+
+
+#endif /* SCRIPTS_PARALLEL_H */
#include "postgres_fe.h"
-#ifdef HAVE_SYS_SELECT_H
-#include <sys/select.h>
-#endif
-
#include "catalog/pg_class_d.h"
#include "common.h"
#include "fe_utils/connect.h"
#include "fe_utils/simple_list.h"
#include "fe_utils/string_utils.h"
+#include "scripts_parallel.h"
-#define ERRCODE_UNDEFINED_TABLE "42P01"
-
-/* Parallel vacuuming stuff */
-typedef struct ParallelSlot
-{
- PGconn *connection; /* One connection */
- bool isFree; /* Is it known to be idle? */
-} ParallelSlot;
-
/* vacuum options controlled by user flags */
typedef struct vacuumingOptions
{
vacuumingOptions *vacopts, const char *table);
static void run_vacuum_command(PGconn *conn, const char *sql, bool echo,
- const char *table, const char *progname, bool async);
-
-static ParallelSlot *GetIdleSlot(ParallelSlot slots[], int numslots,
- const char *progname);
-
-static bool ProcessQueryResult(PGconn *conn, PGresult *result,
- const char *progname);
-
-static bool GetQueryResult(PGconn *conn, const char *progname);
-
-static void DisconnectDatabase(ParallelSlot *slot);
-
-static int select_loop(int maxFd, fd_set *workerset, bool *aborting);
-
-static void init_slot(ParallelSlot *slot, PGconn *conn);
+ const char *table, const char *progname);
static void help(const char *progname);
* query for consistency with table lookups done elsewhere by the user.
*/
appendPQExpBufferStr(&catalog_query, " ORDER BY c.relpages DESC;");
- executeCommand(conn, "RESET search_path;", progname, echo);
- res = executeQuery(conn, catalog_query.data, progname, echo);
+ executeCommand(conn, "RESET search_path;", echo);
+ res = executeQuery(conn, catalog_query.data, echo);
termPQExpBuffer(&catalog_query);
- PQclear(executeQuery(conn, ALWAYS_SECURE_SEARCH_PATH_SQL,
- progname, echo));
+ PQclear(executeQuery(conn, ALWAYS_SECURE_SEARCH_PATH_SQL, echo));
/*
* If no rows are returned, there are no matching tables, so we are done.
*/
if (concurrentCons <= 0)
concurrentCons = 1;
- slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * concurrentCons);
- init_slot(slots, conn);
- if (parallel)
- {
- for (i = 1; i < concurrentCons; i++)
- {
- conn = connectDatabase(dbname, host, port, username, prompt_password,
- progname, echo, false, true);
- init_slot(slots + i, conn);
- }
- }
+
+ slots = ParallelSlotsSetup(dbname, host, port, username, prompt_password,
+ progname, echo, conn, concurrentCons);
/*
* Prepare all the connections to run the appropriate analyze stage, if
for (j = 0; j < concurrentCons; j++)
executeCommand((slots + j)->connection,
- stage_commands[stage], progname, echo);
+ stage_commands[stage], echo);
}
initPQExpBuffer(&sql);
goto finish;
}
- /*
- * Get the connection slot to use. If in parallel mode, here we wait
- * for one connection to become available if none already is. In
- * non-parallel mode we simply use the only slot we have, which we
- * know to be free.
- */
- if (parallel)
+ free_slot = ParallelSlotsGetIdle(slots, concurrentCons);
+ if (!free_slot)
{
- /*
- * Get a free slot, waiting until one becomes free if none
- * currently is.
- */
- free_slot = GetIdleSlot(slots, concurrentCons, progname);
- if (!free_slot)
- {
- failed = true;
- goto finish;
- }
-
- free_slot->isFree = false;
+ failed = true;
+ goto finish;
}
- else
- free_slot = slots;
prepare_vacuum_command(&sql, PQserverVersion(free_slot->connection),
vacopts, tabname);
/*
- * Execute the vacuum. If not in parallel mode, this terminates the
- * program in case of an error. (The parallel case handles query
- * errors in ProcessQueryResult through GetIdleSlot.)
+ * Execute the vacuum. All errors are handled in processQueryResult
+ * through ParallelSlotsGetIdle.
*/
run_vacuum_command(free_slot->connection, sql.data,
- echo, tabname, progname, parallel);
+ echo, tabname, progname);
cell = cell->next;
} while (cell != NULL);
- if (parallel)
- {
- int j;
-
- /* wait for all connections to finish */
- for (j = 0; j < concurrentCons; j++)
- {
- if (!GetQueryResult((slots + j)->connection, progname))
- {
- failed = true;
- goto finish;
- }
- }
- }
+ if (!ParallelSlotsWaitCompletion(slots, concurrentCons))
+ failed = true;
finish:
- for (i = 0; i < concurrentCons; i++)
- DisconnectDatabase(slots + i);
- pfree(slots);
+ ParallelSlotsTerminate(slots, concurrentCons);
+ pg_free(slots);
termPQExpBuffer(&sql);
prompt_password, progname, echo);
result = executeQuery(conn,
"SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;",
- progname, echo);
+ echo);
PQfinish(conn);
initPQExpBuffer(&connstr);
}
/*
- * Send a vacuum/analyze command to the server. In async mode, return after
- * sending the command; else, wait for it to finish.
+ * Send a vacuum/analyze command to the server, returning after sending the
+ * command.
*
- * Any errors during command execution are reported to stderr. If async is
- * false, this function exits the program after reporting the error.
+ * Any errors during command execution are reported to stderr.
*/
static void
run_vacuum_command(PGconn *conn, const char *sql, bool echo,
- const char *table, const char *progname, bool async)
+ const char *table, const char *progname)
{
bool status;
- if (async)
- {
- if (echo)
- printf("%s\n", sql);
+ if (echo)
+ printf("%s\n", sql);
- status = PQsendQuery(conn, sql) == 1;
- }
- else
- status = executeMaintenanceCommand(conn, sql, echo);
+ status = PQsendQuery(conn, sql) == 1;
if (!status)
{
else
pg_log_error("vacuuming of database \"%s\" failed: %s",
PQdb(conn), PQerrorMessage(conn));
-
- if (!async)
- {
- PQfinish(conn);
- exit(1);
- }
}
}
-/*
- * GetIdleSlot
- * Return a connection slot that is ready to execute a command.
- *
- * We return the first slot we find that is marked isFree, if one is;
- * otherwise, we loop on select() until one socket becomes available. When
- * this happens, we read the whole set and mark as free all sockets that become
- * available.
- *
- * If an error occurs, NULL is returned.
- */
-static ParallelSlot *
-GetIdleSlot(ParallelSlot slots[], int numslots,
- const char *progname)
-{
- int i;
- int firstFree = -1;
-
- /* Any connection already known free? */
- for (i = 0; i < numslots; i++)
- {
- if (slots[i].isFree)
- return slots + i;
- }
-
- /*
- * No free slot found, so wait until one of the connections has finished
- * its task and return the available slot.
- */
- while (firstFree < 0)
- {
- fd_set slotset;
- int maxFd = 0;
- bool aborting;
-
- /* We must reconstruct the fd_set for each call to select_loop */
- FD_ZERO(&slotset);
-
- for (i = 0; i < numslots; i++)
- {
- int sock = PQsocket(slots[i].connection);
-
- /*
- * We don't really expect any connections to lose their sockets
- * after startup, but just in case, cope by ignoring them.
- */
- if (sock < 0)
- continue;
-
- FD_SET(sock, &slotset);
- if (sock > maxFd)
- maxFd = sock;
- }
-
- SetCancelConn(slots->connection);
- i = select_loop(maxFd, &slotset, &aborting);
- ResetCancelConn();
-
- if (aborting)
- {
- /*
- * We set the cancel-receiving connection to the one in the zeroth
- * slot above, so fetch the error from there.
- */
- GetQueryResult(slots->connection, progname);
- return NULL;
- }
- Assert(i != 0);
-
- for (i = 0; i < numslots; i++)
- {
- int sock = PQsocket(slots[i].connection);
-
- if (sock >= 0 && FD_ISSET(sock, &slotset))
- {
- /* select() says input is available, so consume it */
- PQconsumeInput(slots[i].connection);
- }
-
- /* Collect result(s) as long as any are available */
- while (!PQisBusy(slots[i].connection))
- {
- PGresult *result = PQgetResult(slots[i].connection);
-
- if (result != NULL)
- {
- /* Check and discard the command result */
- if (!ProcessQueryResult(slots[i].connection, result,
- progname))
- return NULL;
- }
- else
- {
- /* This connection has become idle */
- slots[i].isFree = true;
- if (firstFree < 0)
- firstFree = i;
- break;
- }
- }
- }
- }
-
- return slots + firstFree;
-}
-
-/*
- * ProcessQueryResult
- *
- * Process (and delete) a query result. Returns true if there's no error,
- * false otherwise -- but errors about trying to vacuum a missing relation
- * are reported and subsequently ignored.
- */
-static bool
-ProcessQueryResult(PGconn *conn, PGresult *result, const char *progname)
-{
- /*
- * If it's an error, report it. Errors about a missing table are harmless
- * so we continue processing; but die for other errors.
- */
- if (PQresultStatus(result) != PGRES_COMMAND_OK)
- {
- char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
-
- pg_log_error("vacuuming of database \"%s\" failed: %s",
- PQdb(conn), PQerrorMessage(conn));
-
- if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
- {
- PQclear(result);
- return false;
- }
- }
-
- PQclear(result);
- return true;
-}
-
-/*
- * GetQueryResult
- *
- * Pump the conn till it's dry of results; return false if any are errors.
- * Note that this will block if the conn is busy.
- */
-static bool
-GetQueryResult(PGconn *conn, const char *progname)
-{
- bool ok = true;
- PGresult *result;
-
- SetCancelConn(conn);
- while ((result = PQgetResult(conn)) != NULL)
- {
- if (!ProcessQueryResult(conn, result, progname))
- ok = false;
- }
- ResetCancelConn();
- return ok;
-}
-
-/*
- * DisconnectDatabase
- * Disconnect the connection associated with the given slot
- */
-static void
-DisconnectDatabase(ParallelSlot *slot)
-{
- char errbuf[256];
-
- if (!slot->connection)
- return;
-
- if (PQtransactionStatus(slot->connection) == PQTRANS_ACTIVE)
- {
- PGcancel *cancel;
-
- if ((cancel = PQgetCancel(slot->connection)))
- {
- (void) PQcancel(cancel, errbuf, sizeof(errbuf));
- PQfreeCancel(cancel);
- }
- }
-
- PQfinish(slot->connection);
- slot->connection = NULL;
-}
-
-/*
- * Loop on select() until a descriptor from the given set becomes readable.
- *
- * If we get a cancel request while we're waiting, we forego all further
- * processing and set the *aborting flag to true. The return value must be
- * ignored in this case. Otherwise, *aborting is set to false.
- */
-static int
-select_loop(int maxFd, fd_set *workerset, bool *aborting)
-{
- int i;
- fd_set saveSet = *workerset;
-
- if (CancelRequested)
- {
- *aborting = true;
- return -1;
- }
- else
- *aborting = false;
-
- for (;;)
- {
- /*
- * On Windows, we need to check once in a while for cancel requests;
- * on other platforms we rely on select() returning when interrupted.
- */
- struct timeval *tvp;
-#ifdef WIN32
- struct timeval tv = {0, 1000000};
-
- tvp = &tv;
-#else
- tvp = NULL;
-#endif
-
- *workerset = saveSet;
- i = select(maxFd + 1, workerset, NULL, NULL, tvp);
-
-#ifdef WIN32
- if (i == SOCKET_ERROR)
- {
- i = -1;
-
- if (WSAGetLastError() == WSAEINTR)
- errno = EINTR;
- }
-#endif
-
- if (i < 0 && errno == EINTR)
- continue; /* ignore this */
- if (i < 0 || CancelRequested)
- *aborting = true; /* but not this */
- if (i == 0)
- continue; /* timeout (Win32 only) */
- break;
- }
-
- return i;
-}
-
-static void
-init_slot(ParallelSlot *slot, PGconn *conn)
-{
- slot->connection = conn;
- /* Initially assume connection is idle */
- slot->isFree = true;
-}
-
static void
help(const char *progname)
{