From 775f1b379e3a282140f60ef65a11d1444dc80ccf Mon Sep 17 00:00:00 2001 From: Andrew Dunstan Date: Mon, 2 Feb 2009 20:07:37 +0000 Subject: [PATCH] Provide for parallel restoration from a custom format archive. Each data and post-data step is run in a separate worker child (a thread on Windows, a child process elsewhere) up to the concurrent number specified by the new pg_restore command-line --multi-thread | -m switch. Andrew Dunstan, with some editing by Tom Lane. --- doc/src/sgml/ref/pg_restore.sgml | 24 +- src/bin/pg_dump/pg_backup.h | 14 +- src/bin/pg_dump/pg_backup_archiver.c | 1358 ++++++++++++++++++++++---- src/bin/pg_dump/pg_backup_archiver.h | 30 +- src/bin/pg_dump/pg_backup_custom.c | 104 +- src/bin/pg_dump/pg_backup_db.c | 47 +- src/bin/pg_dump/pg_backup_files.c | 5 +- src/bin/pg_dump/pg_backup_null.c | 5 +- src/bin/pg_dump/pg_backup_tar.c | 6 +- src/bin/pg_dump/pg_dump.c | 167 ++-- src/bin/pg_dump/pg_restore.c | 25 +- 11 files changed, 1509 insertions(+), 276 deletions(-) diff --git a/doc/src/sgml/ref/pg_restore.sgml b/doc/src/sgml/ref/pg_restore.sgml index f13c644772..92c9f4be93 100644 --- a/doc/src/sgml/ref/pg_restore.sgml +++ b/doc/src/sgml/ref/pg_restore.sgml @@ -1,4 +1,4 @@ - + @@ -241,6 +241,28 @@ + + + + + + Run the most time-consuming parts of pg_restore + — those which load data, create indexes, or create + constraints — using multiple concurrent connections to the + database. This option can dramatically reduce the time to restore a + large database to a server running on a multi-processor machine. + + + + This option is ignored when emitting a script rather than connecting + directly to a database server. Multiple threads cannot be used + together with . Also, the input + must be a plain file (not, for example, a pipe), and at present only + the custom archive format is supported. + + + + diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h index fa7e2d55f1..72bc066853 100644 --- a/src/bin/pg_dump/pg_backup.h +++ b/src/bin/pg_dump/pg_backup.h @@ -15,7 +15,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup.h,v 1.48 2009/01/05 16:54:36 tgl Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup.h,v 1.49 2009/02/02 20:07:36 adunstan Exp $ * *------------------------------------------------------------------------- */ @@ -53,6 +53,14 @@ typedef enum _archiveMode archModeRead } ArchiveMode; +typedef enum _teSection +{ + SECTION_NONE = 1, /* COMMENTs, ACLs, etc; can be anywhere */ + SECTION_PRE_DATA, /* stuff to be processed before data */ + SECTION_DATA, /* TABLE DATA, BLOBS, BLOB COMMENTS */ + SECTION_POST_DATA /* stuff to be processed after data */ +} teSection; + /* * We may want to have some more user-readable data, but in the mean * time this gives us some abstraction and type checking. @@ -124,6 +132,7 @@ typedef struct _restoreOptions int suppressDumpWarnings; /* Suppress output of WARNING entries * to stderr */ bool single_txn; + int number_of_threads; bool *idWanted; /* array showing which dump IDs to emit */ } RestoreOptions; @@ -152,7 +161,8 @@ extern void ArchiveEntry(Archive *AHX, const char *tag, const char *namespace, const char *tablespace, const char *owner, bool withOids, - const char *desc, const char *defn, + const char *desc, teSection section, + const char *defn, const char *dropStmt, const char *copyStmt, const DumpId *deps, int nDeps, DataDumperPtr dumpFn, void *dumpArg); diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 9d877cb2d3..076bb0bf6f 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -15,7 +15,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.161 2009/01/13 11:44:56 mha Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.162 2009/02/02 20:07:36 adunstan Exp $ * *------------------------------------------------------------------------- */ @@ -24,8 +24,9 @@ #include "dumputils.h" #include - #include +#include +#include #ifdef WIN32 #include @@ -33,6 +34,44 @@ #include "libpq/libpq-fs.h" +/* + * Special exit values from worker children. We reserve 0 for normal + * success; 1 and other small values should be interpreted as crashes. + */ +#define WORKER_CREATE_DONE 10 +#define WORKER_INHIBIT_DATA 11 +#define WORKER_IGNORED_ERRORS 12 + +/* + * Unix uses exit to return result from worker child, so function is void. + * Windows thread result comes via function return. + */ +#ifndef WIN32 +#define parallel_restore_result void +#else +#define parallel_restore_result DWORD +#endif + +/* IDs for worker children are either PIDs or thread handles */ +#ifndef WIN32 +#define thandle pid_t +#else +#define thandle HANDLE +#endif + +typedef struct _restore_args +{ + ArchiveHandle *AH; + TocEntry *te; +} RestoreArgs; + +typedef struct _parallel_slot +{ + thandle child_id; + RestoreArgs *args; +} ParallelSlot; + +#define NO_SLOT (-1) const char *progname; @@ -71,6 +110,30 @@ static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim); static OutputContext SetOutput(ArchiveHandle *AH, char *filename, int compression); static void ResetOutput(ArchiveHandle *AH, OutputContext savedContext); +static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, + RestoreOptions *ropt, bool is_parallel); +static void restore_toc_entries_parallel(ArchiveHandle *AH); +static thandle spawn_restore(RestoreArgs *args); +static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status); +static bool work_in_progress(ParallelSlot *slots, int n_slots); +static int get_next_slot(ParallelSlot *slots, int n_slots); +static TocEntry *get_next_work_item(ArchiveHandle *AH, + TocEntry **first_unprocessed, + ParallelSlot *slots, int n_slots); +static parallel_restore_result parallel_restore(RestoreArgs *args); +static void mark_work_done(ArchiveHandle *AH, thandle worker, int status, + ParallelSlot *slots, int n_slots); +static void fix_dependencies(ArchiveHandle *AH); +static void repoint_table_dependencies(ArchiveHandle *AH, + DumpId tableId, DumpId tableDataId); +static void identify_locking_dependencies(TocEntry *te, + TocEntry **tocsByDumpId); +static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te); +static void mark_create_done(ArchiveHandle *AH, TocEntry *te); +static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te); +static ArchiveHandle *CloneArchive(ArchiveHandle *AH); +static void DeCloneArchive(ArchiveHandle *AH); + /* * Wrapper functions. @@ -131,7 +194,6 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt) TocEntry *te; teReqs reqs; OutputContext sav; - bool defnDumped; AH->ropt = ropt; AH->stage = STAGE_INITIALIZING; @@ -146,6 +208,7 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt) */ if (ropt->create && ropt->dropSchema) die_horribly(AH, modulename, "-C and -c are incompatible options\n"); + /* * -1 is not compatible with -C, because we can't create a database * inside a transaction block. @@ -153,6 +216,21 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt) if (ropt->create && ropt->single_txn) die_horribly(AH, modulename, "-C and -1 are incompatible options\n"); + /* + * Make sure we won't need (de)compression we haven't got + */ +#ifndef HAVE_LIBZ + if (AH->compression != 0 && AH->PrintTocDataPtr != NULL) + { + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + reqs = _tocEntryRequired(te, ropt, false); + if (te->hadDumper && (reqs & REQ_DATA) != 0) + die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in this installation)\n"); + } + } +#endif + /* * If we're using a DB connection, then connect it. */ @@ -268,148 +346,21 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt) */ if (AH->currSchema) free(AH->currSchema); - AH->currSchema = strdup(""); + AH->currSchema = NULL; } /* - * Now process each non-ACL TOC entry + * In serial mode, we now process each non-ACL TOC entry. + * + * In parallel mode, turn control over to the parallel-restore logic. */ - for (te = AH->toc->next; te != AH->toc; te = te->next) + if (ropt->number_of_threads > 1 && ropt->useDB) + restore_toc_entries_parallel(AH); + else { - AH->currentTE = te; - - /* Work out what, if anything, we want from this entry */ - reqs = _tocEntryRequired(te, ropt, false); - - /* Dump any relevant dump warnings to stderr */ - if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0) - { - if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0) - write_msg(modulename, "warning from original dump file: %s\n", te->defn); - else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0) - write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt); - } - - defnDumped = false; - - if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */ - { - ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag); - - _printTocEntry(AH, te, ropt, false, false); - defnDumped = true; - - /* - * If we could not create a table and --no-data-for-failed-tables - * was given, ignore the corresponding TABLE DATA - */ - if (ropt->noDataForFailedTables && - AH->lastErrorTE == te && - strcmp(te->desc, "TABLE") == 0) - { - TocEntry *tes; - - ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n", - te->tag); - - for (tes = te->next; tes != AH->toc; tes = tes->next) - { - if (strcmp(tes->desc, "TABLE DATA") == 0 && - strcmp(tes->tag, te->tag) == 0 && - strcmp(tes->namespace ? tes->namespace : "", - te->namespace ? te->namespace : "") == 0) - { - /* mark it unwanted */ - ropt->idWanted[tes->dumpId - 1] = false; - break; - } - } - } - - /* If we created a DB, connect to it... */ - if (strcmp(te->desc, "DATABASE") == 0) - { - ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag); - _reconnectToDB(AH, te->tag); - } - } - - /* - * If we have a data component, then process it - */ - if ((reqs & REQ_DATA) != 0) - { - /* - * hadDumper will be set if there is genuine data component for - * this node. Otherwise, we need to check the defn field for - * statements that need to be executed in data-only restores. - */ - if (te->hadDumper) - { - /* - * If we can output the data, then restore it. - */ - if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0) - { -#ifndef HAVE_LIBZ - if (AH->compression != 0) - die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in this installation)\n"); -#endif - - _printTocEntry(AH, te, ropt, true, false); - - if (strcmp(te->desc, "BLOBS") == 0 || - strcmp(te->desc, "BLOB COMMENTS") == 0) - { - ahlog(AH, 1, "restoring %s\n", te->desc); - - _selectOutputSchema(AH, "pg_catalog"); - - (*AH->PrintTocDataPtr) (AH, te, ropt); - } - else - { - _disableTriggersIfNecessary(AH, te, ropt); - - /* Select owner and schema as necessary */ - _becomeOwner(AH, te); - _selectOutputSchema(AH, te->namespace); - - ahlog(AH, 1, "restoring data for table \"%s\"\n", - te->tag); - - /* - * If we have a copy statement, use it. As of V1.3, - * these are separate to allow easy import from - * withing a database connection. Pre 1.3 archives can - * not use DB connections and are sent to output only. - * - * For V1.3+, the table data MUST have a copy - * statement so that we can go into appropriate mode - * with libpq. - */ - if (te->copyStmt && strlen(te->copyStmt) > 0) - { - ahprintf(AH, "%s", te->copyStmt); - AH->writingCopyData = true; - } - - (*AH->PrintTocDataPtr) (AH, te, ropt); - - AH->writingCopyData = false; - - _enableTriggersIfNecessary(AH, te, ropt); - } - } - } - else if (!defnDumped) - { - /* If we haven't already dumped the defn part, do so now */ - ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag); - _printTocEntry(AH, te, ropt, false, false); - } - } - } /* end loop over TOC entries */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + (void) restore_toc_entry(AH, te, ropt, false); + } /* * Scan TOC again to output ownership commands and ACLs @@ -457,6 +408,193 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt) } } +/* + * Restore a single TOC item. Used in both parallel and non-parallel restore; + * is_parallel is true if we are in a worker child process. + * + * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if + * the parallel parent has to make the corresponding status update. + */ +static int +restore_toc_entry(ArchiveHandle *AH, TocEntry *te, + RestoreOptions *ropt, bool is_parallel) +{ + int retval = 0; + teReqs reqs; + bool defnDumped; + + AH->currentTE = te; + + /* Work out what, if anything, we want from this entry */ + reqs = _tocEntryRequired(te, ropt, false); + + /* Dump any relevant dump warnings to stderr */ + if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0) + { + if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0) + write_msg(modulename, "warning from original dump file: %s\n", te->defn); + else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0) + write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt); + } + + defnDumped = false; + + if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */ + { + ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag); + + _printTocEntry(AH, te, ropt, false, false); + defnDumped = true; + + if (strcmp(te->desc, "TABLE") == 0) + { + if (AH->lastErrorTE == te) + { + /* + * We failed to create the table. + * If --no-data-for-failed-tables was given, + * mark the corresponding TABLE DATA to be ignored. + * + * In the parallel case this must be done in the parent, + * so we just set the return value. + */ + if (ropt->noDataForFailedTables) + { + if (is_parallel) + retval = WORKER_INHIBIT_DATA; + else + inhibit_data_for_failed_table(AH, te); + } + } + else + { + /* + * We created the table successfully. Mark the + * corresponding TABLE DATA for possible truncation. + * + * In the parallel case this must be done in the parent, + * so we just set the return value. + */ + if (is_parallel) + retval = WORKER_CREATE_DONE; + else + mark_create_done(AH, te); + } + } + + /* If we created a DB, connect to it... */ + if (strcmp(te->desc, "DATABASE") == 0) + { + ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag); + _reconnectToDB(AH, te->tag); + } + } + + /* + * If we have a data component, then process it + */ + if ((reqs & REQ_DATA) != 0) + { + /* + * hadDumper will be set if there is genuine data component for + * this node. Otherwise, we need to check the defn field for + * statements that need to be executed in data-only restores. + */ + if (te->hadDumper) + { + /* + * If we can output the data, then restore it. + */ + if (AH->PrintTocDataPtr != NULL && (reqs & REQ_DATA) != 0) + { + _printTocEntry(AH, te, ropt, true, false); + + if (strcmp(te->desc, "BLOBS") == 0 || + strcmp(te->desc, "BLOB COMMENTS") == 0) + { + ahlog(AH, 1, "restoring %s\n", te->desc); + + _selectOutputSchema(AH, "pg_catalog"); + + (*AH->PrintTocDataPtr) (AH, te, ropt); + } + else + { + _disableTriggersIfNecessary(AH, te, ropt); + + /* Select owner and schema as necessary */ + _becomeOwner(AH, te); + _selectOutputSchema(AH, te->namespace); + + ahlog(AH, 1, "restoring data for table \"%s\"\n", + te->tag); + + /* + * In parallel restore, if we created the table earlier + * in the run then we wrap the COPY in a transaction and + * precede it with a TRUNCATE. If archiving is not on + * this prevents WAL-logging the COPY. This obtains a + * speedup similar to that from using single_txn mode + * in non-parallel restores. + */ + if (is_parallel && te->created) + { + /* + * Parallel restore is always talking directly to a + * server, so no need to see if we should issue BEGIN. + */ + StartTransaction(AH); + + /* + * If the server version is >= 8.4, make sure we issue + * TRUNCATE with ONLY so that child tables are not + * wiped. + */ + ahprintf(AH, "TRUNCATE TABLE %s%s;\n\n", + (PQserverVersion(AH->connection) >= 80400 ? + "ONLY " : ""), + fmtId(te->tag)); + } + + /* + * If we have a copy statement, use it. As of V1.3, + * these are separate to allow easy import from + * withing a database connection. Pre 1.3 archives can + * not use DB connections and are sent to output only. + * + * For V1.3+, the table data MUST have a copy + * statement so that we can go into appropriate mode + * with libpq. + */ + if (te->copyStmt && strlen(te->copyStmt) > 0) + { + ahprintf(AH, "%s", te->copyStmt); + AH->writingCopyData = true; + } + + (*AH->PrintTocDataPtr) (AH, te, ropt); + + AH->writingCopyData = false; + + /* close out the transaction started above */ + if (is_parallel && te->created) + CommitTransaction(AH); + + _enableTriggersIfNecessary(AH, te, ropt); + } + } + } + else if (!defnDumped) + { + /* If we haven't already dumped the defn part, do so now */ + ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag); + _printTocEntry(AH, te, ropt, false, false); + } + } + + return retval; +} + /* * Allocate a new RestoreOptions block. * This is mainly so we can initialize it, but also for future expansion, @@ -555,7 +693,8 @@ ArchiveEntry(Archive *AHX, const char *namespace, const char *tablespace, const char *owner, bool withOids, - const char *desc, const char *defn, + const char *desc, teSection section, + const char *defn, const char *dropStmt, const char *copyStmt, const DumpId *deps, int nDeps, DataDumperPtr dumpFn, void *dumpArg) @@ -578,6 +717,7 @@ ArchiveEntry(Archive *AHX, newToc->catalogId = catalogId; newToc->dumpId = dumpId; + newToc->section = section; newToc->tag = strdup(tag); newToc->namespace = namespace ? strdup(namespace) : NULL; @@ -616,7 +756,7 @@ void PrintTOCSummary(Archive *AHX, RestoreOptions *ropt) { ArchiveHandle *AH = (ArchiveHandle *) AHX; - TocEntry *te = AH->toc->next; + TocEntry *te; OutputContext sav; char *fmtName; @@ -655,14 +795,22 @@ PrintTOCSummary(Archive *AHX, RestoreOptions *ropt) ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n"); - while (te != AH->toc) + for (te = AH->toc->next; te != AH->toc; te = te->next) { - if (_tocEntryRequired(te, ropt, true) != 0) + if (ropt->verbose || _tocEntryRequired(te, ropt, true) != 0) ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId, te->catalogId.tableoid, te->catalogId.oid, te->desc, te->namespace ? te->namespace : "-", te->tag, te->owner); - te = te->next; + if (ropt->verbose && te->nDeps > 0) + { + int i; + + ahprintf(AH, ";\tdepends on:"); + for (i = 0; i < te->nDeps; i++) + ahprintf(AH, " %d", te->dependencies[i]); + ahprintf(AH, "\n"); + } } if (ropt->filename) @@ -1316,12 +1464,10 @@ getTocEntryByDumpId(ArchiveHandle *AH, DumpId id) { TocEntry *te; - te = AH->toc->next; - while (te != AH->toc) + for (te = AH->toc->next; te != AH->toc; te = te->next) { if (te->dumpId == id) return te; - te = te->next; } return NULL; } @@ -1696,9 +1842,9 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, else AH->fSpec = NULL; - AH->currUser = strdup(""); /* So it's valid, but we can free() it later - * if necessary */ - AH->currSchema = strdup(""); /* ditto */ + AH->currUser = NULL; /* unknown */ + AH->currSchema = NULL; /* ditto */ + AH->currTablespace = NULL; /* ditto */ AH->currWithOids = -1; /* force SET */ AH->toc = (TocEntry *) calloc(1, sizeof(TocEntry)); @@ -1734,10 +1880,6 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, } #endif -#if 0 - write_msg(modulename, "archive format is %d\n", fmt); -#endif - if (fmt == archUnknown) AH->format = _discoverArchiveFormat(AH); else @@ -1772,11 +1914,11 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, void WriteDataChunks(ArchiveHandle *AH) { - TocEntry *te = AH->toc->next; + TocEntry *te; StartDataPtr startPtr; EndDataPtr endPtr; - while (te != AH->toc) + for (te = AH->toc->next; te != AH->toc; te = te->next) { if (te->dataDumper != NULL) { @@ -1811,7 +1953,6 @@ WriteDataChunks(ArchiveHandle *AH) (*endPtr) (AH, te); AH->currToc = NULL; } - te = te->next; } } @@ -1839,6 +1980,7 @@ WriteToc(ArchiveHandle *AH) WriteStr(AH, te->tag); WriteStr(AH, te->desc); + WriteInt(AH, te->section); WriteStr(AH, te->defn); WriteStr(AH, te->dropStmt); WriteStr(AH, te->copyStmt); @@ -1868,8 +2010,7 @@ ReadToc(ArchiveHandle *AH) DumpId *deps; int depIdx; int depSize; - - TocEntry *te = AH->toc->next; + TocEntry *te; AH->tocCount = ReadInt(AH); AH->maxDumpId = 0; @@ -1904,6 +2045,35 @@ ReadToc(ArchiveHandle *AH) te->tag = ReadStr(AH); te->desc = ReadStr(AH); + + if (AH->version >= K_VERS_1_11) + { + te->section = ReadInt(AH); + } + else + { + /* + * rules for pre-8.4 archives wherein pg_dump hasn't classified + * the entries into sections + */ + if (strcmp(te->desc, "COMMENT") == 0 || + strcmp(te->desc, "ACL") == 0) + te->section = SECTION_NONE; + else if (strcmp(te->desc, "TABLE DATA") == 0 || + strcmp(te->desc, "BLOBS") == 0 || + strcmp(te->desc, "BLOB COMMENTS") == 0) + te->section = SECTION_DATA; + else if (strcmp(te->desc, "CONSTRAINT") == 0 || + strcmp(te->desc, "CHECK CONSTRAINT") == 0 || + strcmp(te->desc, "FK CONSTRAINT") == 0 || + strcmp(te->desc, "INDEX") == 0 || + strcmp(te->desc, "RULE") == 0 || + strcmp(te->desc, "TRIGGER") == 0) + te->section = SECTION_POST_DATA; + else + te->section = SECTION_PRE_DATA; + } + te->defn = ReadStr(AH); te->dropStmt = ReadStr(AH); @@ -2269,13 +2439,15 @@ _reconnectToDB(ArchiveHandle *AH, const char *dbname) */ if (AH->currUser) free(AH->currUser); + AH->currUser = NULL; - AH->currUser = strdup(""); - - /* don't assume we still know the output schema */ + /* don't assume we still know the output schema, tablespace, etc either */ if (AH->currSchema) free(AH->currSchema); - AH->currSchema = strdup(""); + AH->currSchema = NULL; + if (AH->currTablespace) + free(AH->currTablespace); + AH->currTablespace = NULL; AH->currWithOids = -1; /* re-establish fixed state */ @@ -2304,7 +2476,6 @@ _becomeUser(ArchiveHandle *AH, const char *user) */ if (AH->currUser) free(AH->currUser); - AH->currUser = strdup(user); } @@ -2824,15 +2995,13 @@ ReadHead(ArchiveHandle *AH) * checkSeek * check to see if fseek can be performed. */ - bool checkSeek(FILE *fp) { - if (fseeko(fp, 0, SEEK_CUR) != 0) return false; else if (sizeof(pgoff_t) > sizeof(long)) - + { /* * At this point, pgoff_t is too large for long, so we return based on * whether an pgoff_t version of fseek is available. @@ -2842,6 +3011,7 @@ checkSeek(FILE *fp) #else return false; #endif + } else return true; } @@ -2870,3 +3040,855 @@ dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim) localtime(&tim)) != 0) ahprintf(AH, "-- %s %s\n\n", msg, buf); } + + +/* + * Main engine for parallel restore. + * + * Work is done in three phases. + * First we process tocEntries until we come to one that is marked + * SECTION_DATA or SECTION_POST_DATA, in a single connection, just as for a + * standard restore. Second we process the remaining non-ACL steps in + * parallel worker children (threads on Windows, processes on Unix), each of + * which connects separately to the database. Finally we process all the ACL + * entries in a single connection (that happens back in RestoreArchive). + */ +static void +restore_toc_entries_parallel(ArchiveHandle *AH) +{ + RestoreOptions *ropt = AH->ropt; + int n_slots = ropt->number_of_threads; + ParallelSlot *slots; + int work_status; + int next_slot; + TocEntry *first_unprocessed = AH->toc->next; + TocEntry *next_work_item; + thandle ret_child; + TocEntry *te; + + ahlog(AH,2,"entering restore_toc_entries_parallel\n"); + + /* we haven't got round to making this work for all archive formats */ + if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL) + die_horribly(AH, modulename, "parallel restore is not supported with this archive file format\n"); + + slots = (ParallelSlot *) calloc(sizeof(ParallelSlot), n_slots); + + /* Adjust dependency information */ + fix_dependencies(AH); + + /* + * Do all the early stuff in a single connection in the parent. + * There's no great point in running it in parallel, in fact it will + * actually run faster in a single connection because we avoid all the + * connection and setup overhead. + */ + while ((next_work_item = get_next_work_item(AH, &first_unprocessed, + NULL, 0)) != NULL) + { + if (next_work_item->section == SECTION_DATA || + next_work_item->section == SECTION_POST_DATA) + break; + + ahlog(AH, 1, "processing item %d %s %s\n", + next_work_item->dumpId, + next_work_item->desc, next_work_item->tag); + + (void) restore_toc_entry(AH, next_work_item, ropt, false); + + next_work_item->restored = true; + reduce_dependencies(AH, next_work_item); + } + + /* + * Now close parent connection in prep for parallel steps. We do this + * mainly to ensure that we don't exceed the specified number of parallel + * connections. + */ + PQfinish(AH->connection); + AH->connection = NULL; + + /* blow away any transient state from the old connection */ + if (AH->currUser) + free(AH->currUser); + AH->currUser = NULL; + if (AH->currSchema) + free(AH->currSchema); + AH->currSchema = NULL; + if (AH->currTablespace) + free(AH->currTablespace); + AH->currTablespace = NULL; + AH->currWithOids = -1; + + /* + * main parent loop + * + * Keep going until there is no worker still running AND there is no work + * left to be done. + */ + + ahlog(AH,1,"entering main parallel loop\n"); + + while ((next_work_item = get_next_work_item(AH, &first_unprocessed, + slots, n_slots)) != NULL || + work_in_progress(slots, n_slots)) + { + if (next_work_item != NULL) + { + teReqs reqs; + + /* If not to be dumped, don't waste time launching a worker */ + reqs = _tocEntryRequired(next_work_item, AH->ropt, false); + if ((reqs & (REQ_SCHEMA | REQ_DATA)) == 0) + { + ahlog(AH, 1, "skipping item %d %s %s\n", + next_work_item->dumpId, + next_work_item->desc, next_work_item->tag); + + next_work_item->restored = true; + reduce_dependencies(AH, next_work_item); + + continue; + } + + if ((next_slot = get_next_slot(slots, n_slots)) != NO_SLOT) + { + /* There is work still to do and a worker slot available */ + thandle child; + RestoreArgs *args; + + ahlog(AH, 1, "launching item %d %s %s\n", + next_work_item->dumpId, + next_work_item->desc, next_work_item->tag); + + next_work_item->restored = true; + + /* this memory is dealloced in mark_work_done() */ + args = malloc(sizeof(RestoreArgs)); + args->AH = CloneArchive(AH); + args->te = next_work_item; + + /* run the step in a worker child */ + child = spawn_restore(args); + + slots[next_slot].child_id = child; + slots[next_slot].args = args; + + continue; + } + } + + /* + * If we get here there must be work being done. Either there is no + * work available to schedule (and work_in_progress returned true) or + * there are no slots available. So we wait for a worker to finish, + * and process the result. + */ + ret_child = reap_child(slots, n_slots, &work_status); + + if (WIFEXITED(work_status)) + { + mark_work_done(AH, ret_child, WEXITSTATUS(work_status), + slots, n_slots); + } + else + { + die_horribly(AH, modulename, "worker process crashed: status %d\n", + work_status); + } + } + + ahlog(AH,1,"finished main parallel loop\n"); + + /* + * Now reconnect the single parent connection. + */ + ConnectDatabase((Archive *) AH, ropt->dbname, + ropt->pghost, ropt->pgport, ropt->username, + ropt->requirePassword); + + _doSetFixedOutputState(AH); + + /* + * Make sure there is no non-ACL work left due to, say, + * circular dependencies, or some other pathological condition. + * If so, do it in the single parent connection. + */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + if (!te->restored) + { + ahlog(AH, 1, "processing missed item %d %s %s\n", + te->dumpId, te->desc, te->tag); + (void) restore_toc_entry(AH, te, ropt, false); + } + } + + /* The ACLs will be handled back in RestoreArchive. */ +} + +/* + * create a worker child to perform a restore step in parallel + */ +static thandle +spawn_restore(RestoreArgs *args) +{ + thandle child; + + /* Ensure stdio state is quiesced before forking */ + fflush(NULL); + +#ifndef WIN32 + child = fork(); + if (child == 0) + { + /* in child process */ + parallel_restore(args); + die_horribly(args->AH, modulename, + "parallel_restore should not return\n"); + } + else if (child < 0) + { + /* fork failed */ + die_horribly(args->AH, modulename, + "could not create worker process: %s\n", + strerror(errno)); + } +#else + child = (HANDLE) _beginthreadex(NULL, 0, (void *) parallel_restore, + args, 0, NULL); + if (child == 0) + die_horribly(args->AH, modulename, + "could not create worker thread: %s\n", + strerror(errno)); +#endif + + return child; +} + +/* + * collect status from a completed worker child + */ +static thandle +reap_child(ParallelSlot *slots, int n_slots, int *work_status) +{ +#ifndef WIN32 + /* Unix is so much easier ... */ + return wait(work_status); +#else + static HANDLE *handles = NULL; + int hindex, snum, tnum; + thandle ret_child; + DWORD res; + + /* first time around only, make space for handles to listen on */ + if (handles == NULL) + handles = (HANDLE *) calloc(sizeof(HANDLE),n_slots); + + /* set up list of handles to listen to */ + for (snum=0, tnum=0; snum < n_slots; snum++) + if (slots[snum].child_id != 0) + handles[tnum++] = slots[snum].child_id; + + /* wait for one to finish */ + hindex = WaitForMultipleObjects(tnum, handles, false, INFINITE); + + /* get handle of finished thread */ + ret_child = handles[hindex - WAIT_OBJECT_0]; + + /* get the result */ + GetExitCodeThread(ret_child,&res); + *work_status = res; + + /* dispose of handle to stop leaks */ + CloseHandle(ret_child); + + return ret_child; +#endif +} + +/* + * are we doing anything now? + */ +static bool +work_in_progress(ParallelSlot *slots, int n_slots) +{ + int i; + + for (i = 0; i < n_slots; i++) + { + if (slots[i].child_id != 0) + return true; + } + return false; +} + +/* + * find the first free parallel slot (if any). + */ +static int +get_next_slot(ParallelSlot *slots, int n_slots) +{ + int i; + + for (i = 0; i < n_slots; i++) + { + if (slots[i].child_id == 0) + return i; + } + return NO_SLOT; +} + +/* + * Find the next work item (if any) that is capable of being run now. + * + * To qualify, the item must have no remaining dependencies + * and no requirement for locks that is incompatible with + * items currently running. + * + * first_unprocessed is state data that tracks the location of the first + * TocEntry that's not marked 'restored'. This avoids O(N^2) search time + * with long TOC lists. (Even though the constant is pretty small, it'd + * get us eventually.) + * + * pref_non_data is for an alternative selection algorithm that gives + * preference to non-data items if there is already a data load running. + * It is currently disabled. + */ +static TocEntry * +get_next_work_item(ArchiveHandle *AH, TocEntry **first_unprocessed, + ParallelSlot *slots, int n_slots) +{ + bool pref_non_data = false; /* or get from AH->ropt */ + TocEntry *data_te = NULL; + TocEntry *te; + int i,j,k; + + /* + * Bogus heuristics for pref_non_data + */ + if (pref_non_data) + { + int count = 0; + + for (k=0; k < n_slots; k++) + if (slots[k].args->te != NULL && + slots[k].args->te->section == SECTION_DATA) + count++; + if (n_slots == 0 || count * 4 < n_slots) + pref_non_data = false; + } + + /* + * Advance first_unprocessed if possible. + */ + for (te = *first_unprocessed; te != AH->toc; te = te->next) + { + if (!te->restored) + break; + } + *first_unprocessed = te; + + /* + * Search from first_unprocessed until we find an available item. + */ + for (; te != AH->toc; te = te->next) + { + bool conflicts = false; + + /* Ignore if already done or still waiting on dependencies */ + if (te->restored || te->depCount > 0) + continue; + + /* + * Check to see if the item would need exclusive lock on something + * that a currently running item also needs lock on. If so, we + * don't want to schedule them together. + */ + for (i = 0; i < n_slots && !conflicts; i++) + { + TocEntry *running_te; + + if (slots[i].args == NULL) + continue; + running_te = slots[i].args->te; + for (j = 0; j < te->nLockDeps && !conflicts; j++) + { + for (k = 0; k < running_te->nLockDeps; k++) + { + if (te->lockDeps[j] == running_te->lockDeps[k]) + { + conflicts = true; + break; + } + } + } + } + + if (conflicts) + continue; + + if (pref_non_data && te->section == SECTION_DATA) + { + if (data_te == NULL) + data_te = te; + continue; + } + + /* passed all tests, so this item can run */ + return te; + } + + if (data_te != NULL) + return data_te; + + ahlog(AH,2,"no item ready\n"); + return NULL; +} + + +/* + * Restore a single TOC item in parallel with others + * + * this is the procedure run as a thread (Windows) or a + * separate process (everything else). + */ +static parallel_restore_result +parallel_restore(RestoreArgs *args) +{ + ArchiveHandle *AH = args->AH; + TocEntry *te = args->te; + RestoreOptions *ropt = AH->ropt; + int retval; + + /* + * Close and reopen the input file so we have a private file pointer + * that doesn't stomp on anyone else's file pointer. + * + * Note: on Windows, since we are using threads not processes, this + * *doesn't* close the original file pointer but just open a new one. + */ + (AH->ReopenPtr) (AH); + + /* + * We need our own database connection, too + */ + ConnectDatabase((Archive *) AH, ropt->dbname, + ropt->pghost, ropt->pgport, ropt->username, + ropt->requirePassword); + + _doSetFixedOutputState(AH); + + /* Restore the TOC item */ + retval = restore_toc_entry(AH, te, ropt, true); + + /* And clean up */ + PQfinish(AH->connection); + AH->connection = NULL; + + (AH->ClosePtr) (AH); + + if (retval == 0 && AH->public.n_errors) + retval = WORKER_IGNORED_ERRORS; + +#ifndef WIN32 + exit(retval); +#else + return retval; +#endif +} + + +/* + * Housekeeping to be done after a step has been parallel restored. + * + * Clear the appropriate slot, free all the extra memory we allocated, + * update status, and reduce the dependency count of any dependent items. + */ +static void +mark_work_done(ArchiveHandle *AH, thandle worker, int status, + ParallelSlot *slots, int n_slots) +{ + TocEntry *te = NULL; + int i; + + for (i = 0; i < n_slots; i++) + { + if (slots[i].child_id == worker) + { + slots[i].child_id = 0; + te = slots[i].args->te; + DeCloneArchive(slots[i].args->AH); + free(slots[i].args); + slots[i].args = NULL; + + break; + } + } + + if (te == NULL) + die_horribly(AH, modulename, "failed to find slot for finished worker\n"); + + ahlog(AH, 1, "finished item %d %s %s\n", + te->dumpId, te->desc, te->tag); + + if (status == WORKER_CREATE_DONE) + mark_create_done(AH, te); + else if (status == WORKER_INHIBIT_DATA) + { + inhibit_data_for_failed_table(AH, te); + AH->public.n_errors++; + } + else if (status == WORKER_IGNORED_ERRORS) + AH->public.n_errors++; + else if (status != 0) + die_horribly(AH, modulename, "worker process failed: exit code %d\n", + status); + + reduce_dependencies(AH, te); +} + + +/* + * Process the dependency information into a form useful for parallel restore. + * + * We set up depCount fields that are the number of as-yet-unprocessed + * dependencies for each TOC entry. + * + * We also identify locking dependencies so that we can avoid trying to + * schedule conflicting items at the same time. + */ +static void +fix_dependencies(ArchiveHandle *AH) +{ + TocEntry **tocsByDumpId; + TocEntry *te; + int i; + + /* + * For some of the steps here, it is convenient to have an array that + * indexes the TOC entries by dump ID, rather than searching the TOC + * list repeatedly. Entries for dump IDs not present in the TOC will + * be NULL. + * + * Also, initialize the depCount fields. + */ + tocsByDumpId = (TocEntry **) calloc(AH->maxDumpId, sizeof(TocEntry *)); + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + tocsByDumpId[te->dumpId - 1] = te; + te->depCount = te->nDeps; + } + + /* + * POST_DATA items that are shown as depending on a table need to be + * re-pointed to depend on that table's data, instead. This ensures they + * won't get scheduled until the data has been loaded. We handle this by + * first finding TABLE/TABLE DATA pairs and then scanning all the + * dependencies. + * + * Note: currently, a TABLE DATA should always have exactly one + * dependency, on its TABLE item. So we don't bother to search, + * but look just at the first dependency. We do trouble to make sure + * that it's a TABLE, if possible. However, if the dependency isn't + * in the archive then just assume it was a TABLE; this is to cover + * cases where the table was suppressed but we have the data and some + * dependent post-data items. + */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0) + { + DumpId tableId = te->dependencies[0]; + + if (tocsByDumpId[tableId - 1] == NULL || + strcmp(tocsByDumpId[tableId - 1]->desc, "TABLE") == 0) + { + repoint_table_dependencies(AH, tableId, te->dumpId); + } + } + } + + /* + * Pre-8.4 versions of pg_dump neglected to set up a dependency from + * BLOB COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS + * and only one BLOB COMMENTS in such files.) + */ + if (AH->version < K_VERS_1_11) + { + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0) + { + TocEntry *te2; + + for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next) + { + if (strcmp(te2->desc, "BLOBS") == 0) + { + te->dependencies = (DumpId *) malloc(sizeof(DumpId)); + te->dependencies[0] = te2->dumpId; + te->nDeps++; + te->depCount++; + break; + } + } + break; + } + } + } + + /* + * It is possible that the dependencies list items that are not in the + * archive at all. Subtract such items from the depCounts. + */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + for (i = 0; i < te->nDeps; i++) + { + if (tocsByDumpId[te->dependencies[i] - 1] == NULL) + te->depCount--; + } + } + + /* + * Lastly, work out the locking dependencies. + */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + te->lockDeps = NULL; + te->nLockDeps = 0; + identify_locking_dependencies(te, tocsByDumpId); + } + + free(tocsByDumpId); +} + +/* + * Change dependencies on tableId to depend on tableDataId instead, + * but only in POST_DATA items. + */ +static void +repoint_table_dependencies(ArchiveHandle *AH, + DumpId tableId, DumpId tableDataId) +{ + TocEntry *te; + int i; + + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + if (te->section != SECTION_POST_DATA) + continue; + for (i = 0; i < te->nDeps; i++) + { + if (te->dependencies[i] == tableId) + { + te->dependencies[i] = tableDataId; + ahlog(AH, 2, "transferring dependency %d -> %d to %d\n", + te->dumpId, tableId, tableDataId); + } + } + } +} + +/* + * Identify which objects we'll need exclusive lock on in order to restore + * the given TOC entry (*other* than the one identified by the TOC entry + * itself). Record their dump IDs in the entry's lockDeps[] array. + * tocsByDumpId[] is a convenience array to avoid searching the TOC + * for each dependency. + */ +static void +identify_locking_dependencies(TocEntry *te, TocEntry **tocsByDumpId) +{ + DumpId *lockids; + int nlockids; + int i; + + /* Quick exit if no dependencies at all */ + if (te->nDeps == 0) + return; + + /* Exit if this entry doesn't need exclusive lock on other objects */ + if (!(strcmp(te->desc, "CONSTRAINT") == 0 || + strcmp(te->desc, "CHECK CONSTRAINT") == 0 || + strcmp(te->desc, "FK CONSTRAINT") == 0 || + strcmp(te->desc, "RULE") == 0 || + strcmp(te->desc, "TRIGGER") == 0)) + return; + + /* + * We assume the item requires exclusive lock on each TABLE item + * listed among its dependencies. + */ + lockids = (DumpId *) malloc(te->nDeps * sizeof(DumpId)); + nlockids = 0; + for (i = 0; i < te->nDeps; i++) + { + DumpId depid = te->dependencies[i]; + + if (tocsByDumpId[depid - 1] && + strcmp(tocsByDumpId[depid - 1]->desc, "TABLE") == 0) + lockids[nlockids++] = depid; + } + + if (nlockids == 0) + { + free(lockids); + return; + } + + te->lockDeps = realloc(lockids, nlockids * sizeof(DumpId)); + te->nLockDeps = nlockids; +} + +/* + * Remove the specified TOC entry from the depCounts of items that depend on + * it, thereby possibly making them ready-to-run. + */ +static void +reduce_dependencies(ArchiveHandle *AH, TocEntry *te) +{ + DumpId target = te->dumpId; + int i; + + ahlog(AH,2,"reducing dependencies for %d\n",target); + + /* + * We must examine all entries, not only the ones after the target item, + * because if the user used a -L switch then the original dependency- + * respecting order has been destroyed by SortTocFromFile. + */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + for (i = 0; i < te->nDeps; i++) + { + if (te->dependencies[i] == target) + te->depCount--; + } + } +} + +/* + * Set the created flag on the DATA member corresponding to the given + * TABLE member + */ +static void +mark_create_done(ArchiveHandle *AH, TocEntry *te) +{ + TocEntry *tes; + + for (tes = AH->toc->next; tes != AH->toc; tes = tes->next) + { + if (strcmp(tes->desc, "TABLE DATA") == 0 && + strcmp(tes->tag, te->tag) == 0 && + strcmp(tes->namespace ? tes->namespace : "", + te->namespace ? te->namespace : "") == 0) + { + tes->created = true; + break; + } + } +} + +/* + * Mark the DATA member corresponding to the given TABLE member + * as not wanted + */ +static void +inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te) +{ + RestoreOptions *ropt = AH->ropt; + TocEntry *tes; + + ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n", + te->tag); + + for (tes = AH->toc->next; tes != AH->toc; tes = tes->next) + { + if (strcmp(tes->desc, "TABLE DATA") == 0 && + strcmp(tes->tag, te->tag) == 0 && + strcmp(tes->namespace ? tes->namespace : "", + te->namespace ? te->namespace : "") == 0) + { + /* mark it unwanted; we assume idWanted array already exists */ + ropt->idWanted[tes->dumpId - 1] = false; + break; + } + } +} + + +/* + * Clone and de-clone routines used in parallel restoration. + * + * Enough of the structure is cloned to ensure that there is no + * conflict between different threads each with their own clone. + * + * These could be public, but no need at present. + */ +static ArchiveHandle * +CloneArchive(ArchiveHandle *AH) +{ + ArchiveHandle *clone; + + /* Make a "flat" copy */ + clone = (ArchiveHandle *) malloc(sizeof(ArchiveHandle)); + if (clone == NULL) + die_horribly(AH, modulename, "out of memory\n"); + memcpy(clone, AH, sizeof(ArchiveHandle)); + + /* Handle format-independent fields */ + clone->pgCopyBuf = createPQExpBuffer(); + clone->sqlBuf = createPQExpBuffer(); + clone->sqlparse.tagBuf = NULL; + + /* The clone will have its own connection, so disregard connection state */ + clone->connection = NULL; + clone->currUser = NULL; + clone->currSchema = NULL; + clone->currTablespace = NULL; + clone->currWithOids = -1; + + /* savedPassword must be local in case we change it while connecting */ + if (clone->savedPassword) + clone->savedPassword = strdup(clone->savedPassword); + + /* clone has its own error count, too */ + clone->public.n_errors = 0; + + /* Let the format-specific code have a chance too */ + (clone->ClonePtr) (clone); + + return clone; +} + +/* + * Release clone-local storage. + * + * Note: we assume any clone-local connection was already closed. + */ +static void +DeCloneArchive(ArchiveHandle *AH) +{ + /* Clear format-specific state */ + (AH->DeClonePtr) (AH); + + /* Clear state allocated by CloneArchive */ + destroyPQExpBuffer(AH->pgCopyBuf); + destroyPQExpBuffer(AH->sqlBuf); + if (AH->sqlparse.tagBuf) + destroyPQExpBuffer(AH->sqlparse.tagBuf); + + /* Clear any connection-local state */ + if (AH->currUser) + free(AH->currUser); + if (AH->currSchema) + free(AH->currSchema); + if (AH->currTablespace) + free(AH->currTablespace); + if (AH->savedPassword) + free(AH->savedPassword); + + free(AH); +} diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index d8520ec9d6..43756eba4d 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -17,7 +17,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.h,v 1.76 2007/11/07 12:24:24 petere Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.h,v 1.77 2009/02/02 20:07:37 adunstan Exp $ * *------------------------------------------------------------------------- */ @@ -62,7 +62,7 @@ typedef z_stream *z_streamp; #endif #define K_VERS_MAJOR 1 -#define K_VERS_MINOR 10 +#define K_VERS_MINOR 11 #define K_VERS_REV 0 /* Data block types */ @@ -85,8 +85,9 @@ typedef z_stream *z_streamp; #define K_VERS_1_9 (( (1 * 256 + 9) * 256 + 0) * 256 + 0) /* add default_with_oids * tracking */ #define K_VERS_1_10 (( (1 * 256 + 10) * 256 + 0) * 256 + 0) /* add tablespace */ +#define K_VERS_1_11 (( (1 * 256 + 11) * 256 + 0) * 256 + 0) /* add toc section indicator */ -#define K_VERS_MAX (( (1 * 256 + 10) * 256 + 255) * 256 + 0) +#define K_VERS_MAX (( (1 * 256 + 11) * 256 + 255) * 256 + 0) /* Flags to indicate disposition of offsets stored in files */ @@ -99,6 +100,7 @@ struct _tocEntry; struct _restoreList; typedef void (*ClosePtr) (struct _archiveHandle * AH); +typedef void (*ReopenPtr) (struct _archiveHandle * AH); typedef void (*ArchiveEntryPtr) (struct _archiveHandle * AH, struct _tocEntry * te); typedef void (*StartDataPtr) (struct _archiveHandle * AH, struct _tocEntry * te); @@ -120,6 +122,9 @@ typedef void (*ReadExtraTocPtr) (struct _archiveHandle * AH, struct _tocEntry * typedef void (*PrintExtraTocPtr) (struct _archiveHandle * AH, struct _tocEntry * te); typedef void (*PrintTocDataPtr) (struct _archiveHandle * AH, struct _tocEntry * te, RestoreOptions *ropt); +typedef void (*ClonePtr) (struct _archiveHandle * AH); +typedef void (*DeClonePtr) (struct _archiveHandle * AH); + typedef size_t (*CustomOutPtr) (struct _archiveHandle * AH, const void *buf, size_t len); typedef struct _outputContext @@ -212,6 +217,7 @@ typedef struct _archiveHandle WriteBufPtr WriteBufPtr; /* Write a buffer of output to the archive */ ReadBufPtr ReadBufPtr; /* Read a buffer of input from the archive */ ClosePtr ClosePtr; /* Close the archive */ + ReopenPtr ReopenPtr; /* Reopen the archive */ WriteExtraTocPtr WriteExtraTocPtr; /* Write extra TOC entry data * associated with the current archive * format */ @@ -225,11 +231,15 @@ typedef struct _archiveHandle StartBlobPtr StartBlobPtr; EndBlobPtr EndBlobPtr; + ClonePtr ClonePtr; /* Clone format-specific fields */ + DeClonePtr DeClonePtr; /* Clean up cloned fields */ + CustomOutPtr CustomOutPtr; /* Alternative script output routine */ /* Stuff for direct DB connection */ char *archdbname; /* DB name *read* from archive */ bool requirePassword; + char *savedPassword; /* password for ropt->username, if known */ PGconn *connection; int connectToDB; /* Flag to indicate if direct DB connection is * required */ @@ -260,9 +270,9 @@ typedef struct _archiveHandle * etc */ /* these vars track state to avoid sending redundant SET commands */ - char *currUser; /* current username */ - char *currSchema; /* current schema */ - char *currTablespace; /* current tablespace */ + char *currUser; /* current username, or NULL if unknown */ + char *currSchema; /* current schema, or NULL */ + char *currTablespace; /* current tablespace, or NULL */ bool currWithOids; /* current default_with_oids setting */ void *lo_buf; @@ -282,6 +292,7 @@ typedef struct _tocEntry struct _tocEntry *next; CatalogId catalogId; DumpId dumpId; + teSection section; bool hadDumper; /* Archiver was passed a dumper routine (used * in restore) */ char *tag; /* index tag */ @@ -300,6 +311,13 @@ typedef struct _tocEntry DataDumperPtr dataDumper; /* Routine to dump data for object */ void *dataDumperArg; /* Arg for above routine */ void *formatData; /* TOC Entry data specific to file format */ + + /* working state (needed only for parallel restore) */ + bool restored; /* item is in progress or done */ + bool created; /* set for DATA member if TABLE was created */ + int depCount; /* number of dependencies not yet restored */ + DumpId *lockDeps; /* dumpIds of objects this one needs lock on */ + int nLockDeps; /* number of such dependencies */ } TocEntry; /* Used everywhere */ diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c index 5791ec7812..ebe2d6ce18 100644 --- a/src/bin/pg_dump/pg_backup_custom.c +++ b/src/bin/pg_dump/pg_backup_custom.c @@ -19,7 +19,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_custom.c,v 1.40 2007/10/28 21:55:52 tgl Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_custom.c,v 1.41 2009/02/02 20:07:37 adunstan Exp $ * *------------------------------------------------------------------------- */ @@ -40,6 +40,7 @@ static int _ReadByte(ArchiveHandle *); static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len); static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len); static void _CloseArchive(ArchiveHandle *AH); +static void _ReopenArchive(ArchiveHandle *AH); static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt); static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te); static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te); @@ -54,6 +55,8 @@ static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid); static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid); static void _EndBlobs(ArchiveHandle *AH, TocEntry *te); static void _LoadBlobs(ArchiveHandle *AH); +static void _Clone(ArchiveHandle *AH); +static void _DeClone(ArchiveHandle *AH); /*------------ * Buffers used in zlib compression and extra data stored in archive and @@ -120,6 +123,7 @@ InitArchiveFmt_Custom(ArchiveHandle *AH) AH->WriteBufPtr = _WriteBuf; AH->ReadBufPtr = _ReadBuf; AH->ClosePtr = _CloseArchive; + AH->ReopenPtr = _ReopenArchive; AH->PrintTocDataPtr = _PrintTocData; AH->ReadExtraTocPtr = _ReadExtraToc; AH->WriteExtraTocPtr = _WriteExtraToc; @@ -129,6 +133,8 @@ InitArchiveFmt_Custom(ArchiveHandle *AH) AH->StartBlobPtr = _StartBlob; AH->EndBlobPtr = _EndBlob; AH->EndBlobsPtr = _EndBlobs; + AH->ClonePtr = _Clone; + AH->DeClonePtr = _DeClone; /* * Set up some special context used in compressing data. @@ -569,7 +575,6 @@ _PrintData(ArchiveHandle *AH) zp->avail_in = blkLen; #ifdef HAVE_LIBZ - if (AH->compression != 0) { while (zp->avail_in != 0) @@ -585,15 +590,12 @@ _PrintData(ArchiveHandle *AH) } } else - { #endif + { in[zp->avail_in] = '\0'; ahwrite(in, 1, zp->avail_in, AH); zp->avail_in = 0; - -#ifdef HAVE_LIBZ } -#endif blkLen = ReadInt(AH); } @@ -822,11 +824,9 @@ _CloseArchive(ArchiveHandle *AH) * expect to be doing seeks to read the data back - it may be ok to * just use the existing self-consistent block formatting. */ - if (ctx->hasSeek) - { - fseeko(AH->FH, tpos, SEEK_SET); + if (ctx->hasSeek && + fseeko(AH->FH, tpos, SEEK_SET) == 0) WriteToc(AH); - } } if (fclose(AH->FH) != 0) @@ -835,6 +835,48 @@ _CloseArchive(ArchiveHandle *AH) AH->FH = NULL; } +/* + * Reopen the archive's file handle. + * + * We close the original file handle, except on Windows. (The difference + * is because on Windows, this is used within a multithreading context, + * and we don't want a thread closing the parent file handle.) + */ +static void +_ReopenArchive(ArchiveHandle *AH) +{ + lclContext *ctx = (lclContext *) AH->formatData; + pgoff_t tpos; + + if (AH->mode == archModeWrite) + die_horribly(AH,modulename,"can only reopen input archives\n"); + if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0) + die_horribly(AH,modulename,"cannot reopen stdin\n"); + if (!ctx->hasSeek) + die_horribly(AH,modulename,"cannot reopen non-seekable file\n"); + + errno = 0; + tpos = ftello(AH->FH); + if (errno) + die_horribly(AH, modulename, "could not determine seek position in archive file: %s\n", + strerror(errno)); + +#ifndef WIN32 + if (fclose(AH->FH) != 0) + die_horribly(AH, modulename, "could not close archive file: %s\n", + strerror(errno)); +#endif + + AH->FH = fopen(AH->fSpec, PG_BINARY_R); + if (!AH->FH) + die_horribly(AH, modulename, "could not open input file \"%s\": %s\n", + AH->fSpec, strerror(errno)); + + if (fseeko(AH->FH, tpos, SEEK_SET) != 0) + die_horribly(AH, modulename, "could not set seek position in archive file: %s\n", + strerror(errno)); +} + /*-------------------------------------------------- * END OF FORMAT CALLBACKS *-------------------------------------------------- @@ -990,7 +1032,6 @@ _DoDeflate(ArchiveHandle *AH, lclContext *ctx, int flush) /* * Terminate zlib context and flush it's buffers. If no zlib * then just return. - * */ static void _EndDataCompressor(ArchiveHandle *AH, TocEntry *te) @@ -1020,3 +1061,44 @@ _EndDataCompressor(ArchiveHandle *AH, TocEntry *te) /* Send the end marker */ WriteInt(AH, 0); } + + +/* + * Clone format-specific fields during parallel restoration. + */ +static void +_Clone(ArchiveHandle *AH) +{ + lclContext *ctx = (lclContext *) AH->formatData; + + AH->formatData = (lclContext *) malloc(sizeof(lclContext)); + if (AH->formatData == NULL) + die_horribly(AH, modulename, "out of memory\n"); + memcpy(AH->formatData, ctx, sizeof(lclContext)); + ctx = (lclContext *) AH->formatData; + + ctx->zp = (z_streamp) malloc(sizeof(z_stream)); + ctx->zlibOut = (char *) malloc(zlibOutSize + 1); + ctx->zlibIn = (char *) malloc(ctx->inSize); + + if (ctx->zp == NULL || ctx->zlibOut == NULL || ctx->zlibIn == NULL) + die_horribly(AH, modulename, "out of memory\n"); + + /* + * Note: we do not make a local lo_buf because we expect at most one + * BLOBS entry per archive, so no parallelism is possible. Likewise, + * TOC-entry-local state isn't an issue because any one TOC entry is + * touched by just one worker child. + */ +} + +static void +_DeClone(ArchiveHandle *AH) +{ + lclContext *ctx = (lclContext *) AH->formatData; + + free(ctx->zlibOut); + free(ctx->zlibIn); + free(ctx->zp); + free(ctx); +} diff --git a/src/bin/pg_dump/pg_backup_db.c b/src/bin/pg_dump/pg_backup_db.c index 0818b353e4..89fbabdfe6 100644 --- a/src/bin/pg_dump/pg_backup_db.c +++ b/src/bin/pg_dump/pg_backup_db.c @@ -5,7 +5,7 @@ * Implements the basic DB functions used by the archiver. * * IDENTIFICATION - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_db.c,v 1.80 2008/08/16 02:25:06 tgl Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_db.c,v 1.81 2009/02/02 20:07:37 adunstan Exp $ * *------------------------------------------------------------------------- */ @@ -116,29 +116,36 @@ ReconnectToServer(ArchiveHandle *AH, const char *dbname, const char *username) /* * Connect to the db again. + * + * Note: it's not really all that sensible to use a single-entry password + * cache if the username keeps changing. In current usage, however, the + * username never does change, so one savedPassword is sufficient. We do + * update the cache on the off chance that the password has changed since the + * start of the run. */ static PGconn * _connectDB(ArchiveHandle *AH, const char *reqdb, const char *requser) { PGconn *newConn; - char *newdb; - char *newuser; - char *password = NULL; + const char *newdb; + const char *newuser; + char *password = AH->savedPassword; bool new_pass; if (!reqdb) newdb = PQdb(AH->connection); else - newdb = (char *) reqdb; + newdb = reqdb; - if (!requser || (strlen(requser) == 0)) + if (!requser || strlen(requser) == 0) newuser = PQuser(AH->connection); else - newuser = (char *) requser; + newuser = requser; - ahlog(AH, 1, "connecting to database \"%s\" as user \"%s\"\n", newdb, newuser); + ahlog(AH, 1, "connecting to database \"%s\" as user \"%s\"\n", + newdb, newuser); - if (AH->requirePassword) + if (AH->requirePassword && password == NULL) { password = simple_prompt("Password: ", 100, false); if (password == NULL) @@ -170,12 +177,13 @@ _connectDB(ArchiveHandle *AH, const char *reqdb, const char *requser) if (password) free(password); password = simple_prompt("Password: ", 100, false); + if (password == NULL) + die_horribly(AH, modulename, "out of memory\n"); new_pass = true; } } while (new_pass); - if (password) - free(password); + AH->savedPassword = password; /* check for version mismatch */ _check_database_version(AH); @@ -190,6 +198,10 @@ _connectDB(ArchiveHandle *AH, const char *reqdb, const char *requser) * Make a database connection with the given parameters. The * connection handle is returned, the parameters are stored in AHX. * An interactive password prompt is automatically issued if required. + * + * Note: it's not really all that sensible to use a single-entry password + * cache if the username keeps changing. In current usage, however, the + * username never does change, so one savedPassword is sufficient. */ PGconn * ConnectDatabase(Archive *AHX, @@ -200,21 +212,19 @@ ConnectDatabase(Archive *AHX, int reqPwd) { ArchiveHandle *AH = (ArchiveHandle *) AHX; - char *password = NULL; + char *password = AH->savedPassword; bool new_pass; if (AH->connection) die_horribly(AH, modulename, "already connected to a database\n"); - if (reqPwd) + if (reqPwd && password == NULL) { password = simple_prompt("Password: ", 100, false); if (password == NULL) die_horribly(AH, modulename, "out of memory\n"); - AH->requirePassword = true; } - else - AH->requirePassword = false; + AH->requirePassword = reqPwd; /* * Start the connection. Loop until we have a password if requested by @@ -236,12 +246,13 @@ ConnectDatabase(Archive *AHX, { PQfinish(AH->connection); password = simple_prompt("Password: ", 100, false); + if (password == NULL) + die_horribly(AH, modulename, "out of memory\n"); new_pass = true; } } while (new_pass); - if (password) - free(password); + AH->savedPassword = password; /* check to see that the backend connection was successfully made */ if (PQstatus(AH->connection) == CONNECTION_BAD) diff --git a/src/bin/pg_dump/pg_backup_files.c b/src/bin/pg_dump/pg_backup_files.c index 47a4ddd77c..544e48722b 100644 --- a/src/bin/pg_dump/pg_backup_files.c +++ b/src/bin/pg_dump/pg_backup_files.c @@ -20,7 +20,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_files.c,v 1.34 2007/10/28 21:55:52 tgl Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_files.c,v 1.35 2009/02/02 20:07:37 adunstan Exp $ * *------------------------------------------------------------------------- */ @@ -87,6 +87,7 @@ InitArchiveFmt_Files(ArchiveHandle *AH) AH->WriteBufPtr = _WriteBuf; AH->ReadBufPtr = _ReadBuf; AH->ClosePtr = _CloseArchive; + AH->ReopenPtr = NULL; AH->PrintTocDataPtr = _PrintTocData; AH->ReadExtraTocPtr = _ReadExtraToc; AH->WriteExtraTocPtr = _WriteExtraToc; @@ -96,6 +97,8 @@ InitArchiveFmt_Files(ArchiveHandle *AH) AH->StartBlobPtr = _StartBlob; AH->EndBlobPtr = _EndBlob; AH->EndBlobsPtr = _EndBlobs; + AH->ClonePtr = NULL; + AH->DeClonePtr = NULL; /* * Set up some special context used in compressing data. diff --git a/src/bin/pg_dump/pg_backup_null.c b/src/bin/pg_dump/pg_backup_null.c index f31a173ce1..abff1138fc 100644 --- a/src/bin/pg_dump/pg_backup_null.c +++ b/src/bin/pg_dump/pg_backup_null.c @@ -17,7 +17,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_null.c,v 1.19 2006/07/14 14:52:26 momjian Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_null.c,v 1.20 2009/02/02 20:07:37 adunstan Exp $ * *------------------------------------------------------------------------- */ @@ -54,12 +54,15 @@ InitArchiveFmt_Null(ArchiveHandle *AH) AH->WriteBytePtr = _WriteByte; AH->WriteBufPtr = _WriteBuf; AH->ClosePtr = _CloseArchive; + AH->ReopenPtr = NULL; AH->PrintTocDataPtr = _PrintTocData; AH->StartBlobsPtr = _StartBlobs; AH->StartBlobPtr = _StartBlob; AH->EndBlobPtr = _EndBlob; AH->EndBlobsPtr = _EndBlobs; + AH->ClonePtr = NULL; + AH->DeClonePtr = NULL; /* Initialize LO buffering */ AH->lo_buf_size = LOBBUFSIZE; diff --git a/src/bin/pg_dump/pg_backup_tar.c b/src/bin/pg_dump/pg_backup_tar.c index ddab506bf4..0156bbf9f0 100644 --- a/src/bin/pg_dump/pg_backup_tar.c +++ b/src/bin/pg_dump/pg_backup_tar.c @@ -16,7 +16,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_tar.c,v 1.62 2007/11/15 21:14:41 momjian Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_tar.c,v 1.63 2009/02/02 20:07:37 adunstan Exp $ * *------------------------------------------------------------------------- */ @@ -143,6 +143,7 @@ InitArchiveFmt_Tar(ArchiveHandle *AH) AH->WriteBufPtr = _WriteBuf; AH->ReadBufPtr = _ReadBuf; AH->ClosePtr = _CloseArchive; + AH->ReopenPtr = NULL; AH->PrintTocDataPtr = _PrintTocData; AH->ReadExtraTocPtr = _ReadExtraToc; AH->WriteExtraTocPtr = _WriteExtraToc; @@ -152,6 +153,8 @@ InitArchiveFmt_Tar(ArchiveHandle *AH) AH->StartBlobPtr = _StartBlob; AH->EndBlobPtr = _EndBlob; AH->EndBlobsPtr = _EndBlobs; + AH->ClonePtr = NULL; + AH->DeClonePtr = NULL; /* * Set up some special context used in compressing data. @@ -1383,5 +1386,4 @@ _tarWriteHeader(TAR_MEMBER *th) if (fwrite(h, 1, 512, th->tarFH) != 512) die_horribly(th->AH, modulename, "could not write to output file: %s\n", strerror(errno)); - } diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 6f057f2a33..5ef46ae08e 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -12,7 +12,7 @@ * by PostgreSQL * * IDENTIFICATION - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.c,v 1.518 2009/02/02 19:31:39 alvherre Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.c,v 1.519 2009/02/02 20:07:37 adunstan Exp $ * *------------------------------------------------------------------------- */ @@ -695,6 +695,7 @@ main(int argc, char **argv) { /* Add placeholders to allow correct sorting of blobs */ DumpableObject *blobobj; + DumpableObject *blobcobj; blobobj = (DumpableObject *) malloc(sizeof(DumpableObject)); blobobj->objType = DO_BLOBS; @@ -702,11 +703,12 @@ main(int argc, char **argv) AssignDumpId(blobobj); blobobj->name = strdup("BLOBS"); - blobobj = (DumpableObject *) malloc(sizeof(DumpableObject)); - blobobj->objType = DO_BLOB_COMMENTS; - blobobj->catId = nilCatalogId; - AssignDumpId(blobobj); - blobobj->name = strdup("BLOB COMMENTS"); + blobcobj = (DumpableObject *) malloc(sizeof(DumpableObject)); + blobcobj->objType = DO_BLOB_COMMENTS; + blobcobj->catId = nilCatalogId; + AssignDumpId(blobcobj); + blobcobj->name = strdup("BLOB COMMENTS"); + addObjectDependency(blobcobj, blobobj->dumpId); } /* @@ -1385,11 +1387,10 @@ dumpTableData(Archive *fout, TableDataInfo *tdinfo) } ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId, - tbinfo->dobj.name, - tbinfo->dobj.namespace->dobj.name, - NULL, - tbinfo->rolname, false, - "TABLE DATA", "", "", copyStmt, + tbinfo->dobj.name, tbinfo->dobj.namespace->dobj.name, + NULL, tbinfo->rolname, + false, "TABLE DATA", SECTION_DATA, + "", "", copyStmt, tdinfo->dobj.dependencies, tdinfo->dobj.nDeps, dumpFn, tdinfo); @@ -1738,6 +1739,7 @@ dumpDatabase(Archive *AH) dba, /* Owner */ false, /* with oids */ "DATABASE", /* Desc */ + SECTION_PRE_DATA, /* Section */ creaQry->data, /* Create */ delQry->data, /* Del */ NULL, /* Copy */ @@ -1764,7 +1766,8 @@ dumpDatabase(Archive *AH) appendPQExpBuffer(dbQry, ";\n"); ArchiveEntry(AH, dbCatId, createDumpId(), datname, NULL, NULL, - dba, false, "COMMENT", dbQry->data, "", NULL, + dba, false, "COMMENT", SECTION_NONE, + dbQry->data, "", NULL, &dbDumpId, 1, NULL, NULL); } } @@ -1802,7 +1805,8 @@ dumpEncoding(Archive *AH) ArchiveEntry(AH, nilCatalogId, createDumpId(), "ENCODING", NULL, NULL, "", - false, "ENCODING", qry->data, "", NULL, + false, "ENCODING", SECTION_PRE_DATA, + qry->data, "", NULL, NULL, 0, NULL, NULL); @@ -1828,7 +1832,8 @@ dumpStdStrings(Archive *AH) ArchiveEntry(AH, nilCatalogId, createDumpId(), "STDSTRINGS", NULL, NULL, "", - false, "STDSTRINGS", qry->data, "", NULL, + false, "STDSTRINGS", SECTION_PRE_DATA, + qry->data, "", NULL, NULL, 0, NULL, NULL); @@ -5514,9 +5519,15 @@ dumpComment(Archive *fout, const char *target, appendStringLiteralAH(query, comments->descr, fout); appendPQExpBuffer(query, ";\n"); + /* + * We mark comments as SECTION_NONE because they really belong + * in the same section as their parent, whether that is + * pre-data or post-data. + */ ArchiveEntry(fout, nilCatalogId, createDumpId(), - target, namespace, NULL, owner, false, - "COMMENT", query->data, "", NULL, + target, namespace, NULL, owner, + false, "COMMENT", SECTION_NONE, + query->data, "", NULL, &(dumpId), 1, NULL, NULL); @@ -5575,9 +5586,9 @@ dumpTableComment(Archive *fout, TableInfo *tbinfo, ArchiveEntry(fout, nilCatalogId, createDumpId(), target->data, tbinfo->dobj.namespace->dobj.name, - NULL, - tbinfo->rolname, - false, "COMMENT", query->data, "", NULL, + NULL, tbinfo->rolname, + false, "COMMENT", SECTION_NONE, + query->data, "", NULL, &(tbinfo->dobj.dumpId), 1, NULL, NULL); } @@ -5597,9 +5608,9 @@ dumpTableComment(Archive *fout, TableInfo *tbinfo, ArchiveEntry(fout, nilCatalogId, createDumpId(), target->data, tbinfo->dobj.namespace->dobj.name, - NULL, - tbinfo->rolname, - false, "COMMENT", query->data, "", NULL, + NULL, tbinfo->rolname, + false, "COMMENT", SECTION_NONE, + query->data, "", NULL, &(tbinfo->dobj.dumpId), 1, NULL, NULL); } @@ -5872,15 +5883,17 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj) case DO_BLOBS: ArchiveEntry(fout, dobj->catId, dobj->dumpId, dobj->name, NULL, NULL, "", - false, "BLOBS", "", "", NULL, - NULL, 0, + false, "BLOBS", SECTION_DATA, + "", "", NULL, + dobj->dependencies, dobj->nDeps, dumpBlobs, NULL); break; case DO_BLOB_COMMENTS: ArchiveEntry(fout, dobj->catId, dobj->dumpId, dobj->name, NULL, NULL, "", - false, "BLOB COMMENTS", "", "", NULL, - NULL, 0, + false, "BLOB COMMENTS", SECTION_DATA, + "", "", NULL, + dobj->dependencies, dobj->nDeps, dumpBlobComments, NULL); break; } @@ -5918,7 +5931,8 @@ dumpNamespace(Archive *fout, NamespaceInfo *nspinfo) nspinfo->dobj.name, NULL, NULL, nspinfo->rolname, - false, "SCHEMA", q->data, delq->data, NULL, + false, "SCHEMA", SECTION_PRE_DATA, + q->data, delq->data, NULL, nspinfo->dobj.dependencies, nspinfo->dobj.nDeps, NULL, NULL); @@ -6021,7 +6035,8 @@ dumpEnumType(Archive *fout, TypeInfo *tinfo) tinfo->dobj.namespace->dobj.name, NULL, tinfo->rolname, false, - "TYPE", q->data, delq->data, NULL, + "TYPE", SECTION_PRE_DATA, + q->data, delq->data, NULL, tinfo->dobj.dependencies, tinfo->dobj.nDeps, NULL, NULL); @@ -6389,7 +6404,8 @@ dumpBaseType(Archive *fout, TypeInfo *tinfo) tinfo->dobj.namespace->dobj.name, NULL, tinfo->rolname, false, - "TYPE", q->data, delq->data, NULL, + "TYPE", SECTION_PRE_DATA, + q->data, delq->data, NULL, tinfo->dobj.dependencies, tinfo->dobj.nDeps, NULL, NULL); @@ -6507,7 +6523,8 @@ dumpDomain(Archive *fout, TypeInfo *tinfo) tinfo->dobj.namespace->dobj.name, NULL, tinfo->rolname, false, - "DOMAIN", q->data, delq->data, NULL, + "DOMAIN", SECTION_PRE_DATA, + q->data, delq->data, NULL, tinfo->dobj.dependencies, tinfo->dobj.nDeps, NULL, NULL); @@ -6600,7 +6617,8 @@ dumpCompositeType(Archive *fout, TypeInfo *tinfo) tinfo->dobj.namespace->dobj.name, NULL, tinfo->rolname, false, - "TYPE", q->data, delq->data, NULL, + "TYPE", SECTION_PRE_DATA, + q->data, delq->data, NULL, tinfo->dobj.dependencies, tinfo->dobj.nDeps, NULL, NULL); @@ -6653,7 +6671,8 @@ dumpShellType(Archive *fout, ShellTypeInfo *stinfo) stinfo->dobj.namespace->dobj.name, NULL, stinfo->baseType->rolname, false, - "SHELL TYPE", q->data, "", NULL, + "SHELL TYPE", SECTION_PRE_DATA, + q->data, "", NULL, stinfo->dobj.dependencies, stinfo->dobj.nDeps, NULL, NULL); @@ -6773,7 +6792,7 @@ dumpProcLang(Archive *fout, ProcLangInfo *plang) ArchiveEntry(fout, plang->dobj.catId, plang->dobj.dumpId, plang->dobj.name, lanschema, NULL, plang->lanowner, - false, "PROCEDURAL LANGUAGE", + false, "PROCEDURAL LANGUAGE", SECTION_PRE_DATA, defqry->data, delqry->data, NULL, plang->dobj.dependencies, plang->dobj.nDeps, NULL, NULL); @@ -7331,7 +7350,8 @@ dumpFunc(Archive *fout, FuncInfo *finfo) finfo->dobj.namespace->dobj.name, NULL, finfo->rolname, false, - "FUNCTION", q->data, delqry->data, NULL, + "FUNCTION", SECTION_PRE_DATA, + q->data, delqry->data, NULL, finfo->dobj.dependencies, finfo->dobj.nDeps, NULL, NULL); @@ -7482,7 +7502,8 @@ dumpCast(Archive *fout, CastInfo *cast) ArchiveEntry(fout, cast->dobj.catId, cast->dobj.dumpId, castsig->data, "pg_catalog", NULL, "", - false, "CAST", defqry->data, delqry->data, NULL, + false, "CAST", SECTION_PRE_DATA, + defqry->data, delqry->data, NULL, cast->dobj.dependencies, cast->dobj.nDeps, NULL, NULL); @@ -7723,7 +7744,8 @@ dumpOpr(Archive *fout, OprInfo *oprinfo) oprinfo->dobj.namespace->dobj.name, NULL, oprinfo->rolname, - false, "OPERATOR", q->data, delq->data, NULL, + false, "OPERATOR", SECTION_PRE_DATA, + q->data, delq->data, NULL, oprinfo->dobj.dependencies, oprinfo->dobj.nDeps, NULL, NULL); @@ -8175,7 +8197,8 @@ dumpOpclass(Archive *fout, OpclassInfo *opcinfo) opcinfo->dobj.namespace->dobj.name, NULL, opcinfo->rolname, - false, "OPERATOR CLASS", q->data, delq->data, NULL, + false, "OPERATOR CLASS", SECTION_PRE_DATA, + q->data, delq->data, NULL, opcinfo->dobj.dependencies, opcinfo->dobj.nDeps, NULL, NULL); @@ -8451,7 +8474,8 @@ dumpOpfamily(Archive *fout, OpfamilyInfo *opfinfo) opfinfo->dobj.namespace->dobj.name, NULL, opfinfo->rolname, - false, "OPERATOR FAMILY", q->data, delq->data, NULL, + false, "OPERATOR FAMILY", SECTION_PRE_DATA, + q->data, delq->data, NULL, opfinfo->dobj.dependencies, opfinfo->dobj.nDeps, NULL, NULL); @@ -8564,7 +8588,8 @@ dumpConversion(Archive *fout, ConvInfo *convinfo) convinfo->dobj.namespace->dobj.name, NULL, convinfo->rolname, - false, "CONVERSION", q->data, delq->data, NULL, + false, "CONVERSION", SECTION_PRE_DATA, + q->data, delq->data, NULL, convinfo->dobj.dependencies, convinfo->dobj.nDeps, NULL, NULL); @@ -8805,7 +8830,8 @@ dumpAgg(Archive *fout, AggInfo *agginfo) agginfo->aggfn.dobj.namespace->dobj.name, NULL, agginfo->aggfn.rolname, - false, "AGGREGATE", q->data, delq->data, NULL, + false, "AGGREGATE", SECTION_PRE_DATA, + q->data, delq->data, NULL, agginfo->aggfn.dobj.dependencies, agginfo->aggfn.dobj.nDeps, NULL, NULL); @@ -8892,7 +8918,8 @@ dumpTSParser(Archive *fout, TSParserInfo *prsinfo) prsinfo->dobj.namespace->dobj.name, NULL, "", - false, "TEXT SEARCH PARSER", q->data, delq->data, NULL, + false, "TEXT SEARCH PARSER", SECTION_PRE_DATA, + q->data, delq->data, NULL, prsinfo->dobj.dependencies, prsinfo->dobj.nDeps, NULL, NULL); @@ -8981,7 +9008,8 @@ dumpTSDictionary(Archive *fout, TSDictInfo *dictinfo) dictinfo->dobj.namespace->dobj.name, NULL, dictinfo->rolname, - false, "TEXT SEARCH DICTIONARY", q->data, delq->data, NULL, + false, "TEXT SEARCH DICTIONARY", SECTION_PRE_DATA, + q->data, delq->data, NULL, dictinfo->dobj.dependencies, dictinfo->dobj.nDeps, NULL, NULL); @@ -9040,7 +9068,8 @@ dumpTSTemplate(Archive *fout, TSTemplateInfo *tmplinfo) tmplinfo->dobj.namespace->dobj.name, NULL, "", - false, "TEXT SEARCH TEMPLATE", q->data, delq->data, NULL, + false, "TEXT SEARCH TEMPLATE", SECTION_PRE_DATA, + q->data, delq->data, NULL, tmplinfo->dobj.dependencies, tmplinfo->dobj.nDeps, NULL, NULL); @@ -9170,7 +9199,8 @@ dumpTSConfig(Archive *fout, TSConfigInfo *cfginfo) cfginfo->dobj.namespace->dobj.name, NULL, cfginfo->rolname, - false, "TEXT SEARCH CONFIGURATION", q->data, delq->data, NULL, + false, "TEXT SEARCH CONFIGURATION", SECTION_PRE_DATA, + q->data, delq->data, NULL, cfginfo->dobj.dependencies, cfginfo->dobj.nDeps, NULL, NULL); @@ -9220,7 +9250,8 @@ dumpForeignDataWrapper(Archive *fout, FdwInfo *fdwinfo) NULL, NULL, fdwinfo->rolname, - false, "FOREIGN DATA WRAPPER", q->data, delq->data, NULL, + false, "FOREIGN DATA WRAPPER", SECTION_PRE_DATA, + q->data, delq->data, NULL, fdwinfo->dobj.dependencies, fdwinfo->dobj.nDeps, NULL, NULL); @@ -9298,7 +9329,8 @@ dumpForeignServer(Archive *fout, ForeignServerInfo *srvinfo) NULL, NULL, srvinfo->rolname, - false, "SERVER", q->data, delq->data, NULL, + false, "SERVER", SECTION_PRE_DATA, + q->data, delq->data, NULL, srvinfo->dobj.dependencies, srvinfo->dobj.nDeps, NULL, NULL); @@ -9393,8 +9425,8 @@ dumpUserMappings(Archive *fout, const char *target, namespace, NULL, owner, false, - "USER MAPPING", q->data, - delq->data, NULL, + "USER MAPPING", SECTION_PRE_DATA, + q->data, delq->data, NULL, &dumpId, 1, NULL, NULL); } @@ -9447,7 +9479,8 @@ dumpACL(Archive *fout, CatalogId objCatId, DumpId objDumpId, tag, nspname, NULL, owner ? owner : "", - false, "ACL", sql->data, "", NULL, + false, "ACL", SECTION_NONE, + sql->data, "", NULL, &(objDumpId), 1, NULL, NULL); @@ -9797,7 +9830,8 @@ dumpTableSchema(Archive *fout, TableInfo *tbinfo) (tbinfo->relkind == RELKIND_VIEW) ? NULL : tbinfo->reltablespace, tbinfo->rolname, (strcmp(reltypename, "TABLE") == 0) ? tbinfo->hasoids : false, - reltypename, q->data, delq->data, NULL, + reltypename, SECTION_PRE_DATA, + q->data, delq->data, NULL, tbinfo->dobj.dependencies, tbinfo->dobj.nDeps, NULL, NULL); @@ -9863,7 +9897,8 @@ dumpAttrDef(Archive *fout, AttrDefInfo *adinfo) tbinfo->dobj.namespace->dobj.name, NULL, tbinfo->rolname, - false, "DEFAULT", q->data, delq->data, NULL, + false, "DEFAULT", SECTION_PRE_DATA, + q->data, delq->data, NULL, adinfo->dobj.dependencies, adinfo->dobj.nDeps, NULL, NULL); @@ -9956,7 +9991,8 @@ dumpIndex(Archive *fout, IndxInfo *indxinfo) tbinfo->dobj.namespace->dobj.name, indxinfo->tablespace, tbinfo->rolname, false, - "INDEX", q->data, delq->data, NULL, + "INDEX", SECTION_POST_DATA, + q->data, delq->data, NULL, indxinfo->dobj.dependencies, indxinfo->dobj.nDeps, NULL, NULL); } @@ -10059,7 +10095,8 @@ dumpConstraint(Archive *fout, ConstraintInfo *coninfo) tbinfo->dobj.namespace->dobj.name, indxinfo->tablespace, tbinfo->rolname, false, - "CONSTRAINT", q->data, delq->data, NULL, + "CONSTRAINT", SECTION_POST_DATA, + q->data, delq->data, NULL, coninfo->dobj.dependencies, coninfo->dobj.nDeps, NULL, NULL); } @@ -10091,7 +10128,8 @@ dumpConstraint(Archive *fout, ConstraintInfo *coninfo) tbinfo->dobj.namespace->dobj.name, NULL, tbinfo->rolname, false, - "FK CONSTRAINT", q->data, delq->data, NULL, + "FK CONSTRAINT", SECTION_POST_DATA, + q->data, delq->data, NULL, coninfo->dobj.dependencies, coninfo->dobj.nDeps, NULL, NULL); } @@ -10125,7 +10163,8 @@ dumpConstraint(Archive *fout, ConstraintInfo *coninfo) tbinfo->dobj.namespace->dobj.name, NULL, tbinfo->rolname, false, - "CHECK CONSTRAINT", q->data, delq->data, NULL, + "CHECK CONSTRAINT", SECTION_POST_DATA, + q->data, delq->data, NULL, coninfo->dobj.dependencies, coninfo->dobj.nDeps, NULL, NULL); } @@ -10160,7 +10199,8 @@ dumpConstraint(Archive *fout, ConstraintInfo *coninfo) tinfo->dobj.namespace->dobj.name, NULL, tinfo->rolname, false, - "CHECK CONSTRAINT", q->data, delq->data, NULL, + "CHECK CONSTRAINT", SECTION_POST_DATA, + q->data, delq->data, NULL, coninfo->dobj.dependencies, coninfo->dobj.nDeps, NULL, NULL); } @@ -10433,7 +10473,8 @@ dumpSequence(Archive *fout, TableInfo *tbinfo) tbinfo->dobj.namespace->dobj.name, NULL, tbinfo->rolname, - false, "SEQUENCE", query->data, delqry->data, NULL, + false, "SEQUENCE", SECTION_PRE_DATA, + query->data, delqry->data, NULL, tbinfo->dobj.dependencies, tbinfo->dobj.nDeps, NULL, NULL); @@ -10468,7 +10509,8 @@ dumpSequence(Archive *fout, TableInfo *tbinfo) tbinfo->dobj.namespace->dobj.name, NULL, tbinfo->rolname, - false, "SEQUENCE OWNED BY", query->data, "", NULL, + false, "SEQUENCE OWNED BY", SECTION_PRE_DATA, + query->data, "", NULL, &(tbinfo->dobj.dumpId), 1, NULL, NULL); } @@ -10495,7 +10537,8 @@ dumpSequence(Archive *fout, TableInfo *tbinfo) tbinfo->dobj.namespace->dobj.name, NULL, tbinfo->rolname, - false, "SEQUENCE SET", query->data, "", NULL, + false, "SEQUENCE SET", SECTION_PRE_DATA, + query->data, "", NULL, &(tbinfo->dobj.dumpId), 1, NULL, NULL); } @@ -10691,7 +10734,8 @@ dumpTrigger(Archive *fout, TriggerInfo *tginfo) tbinfo->dobj.namespace->dobj.name, NULL, tbinfo->rolname, false, - "TRIGGER", query->data, delqry->data, NULL, + "TRIGGER", SECTION_POST_DATA, + query->data, delqry->data, NULL, tginfo->dobj.dependencies, tginfo->dobj.nDeps, NULL, NULL); @@ -10810,7 +10854,8 @@ dumpRule(Archive *fout, RuleInfo *rinfo) tbinfo->dobj.namespace->dobj.name, NULL, tbinfo->rolname, false, - "RULE", cmd->data, delcmd->data, NULL, + "RULE", SECTION_POST_DATA, + cmd->data, delcmd->data, NULL, rinfo->dobj.dependencies, rinfo->dobj.nDeps, NULL, NULL); diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c index 1d0b3c42c2..cee5f9e620 100644 --- a/src/bin/pg_dump/pg_restore.c +++ b/src/bin/pg_dump/pg_restore.c @@ -34,7 +34,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_restore.c,v 1.91 2009/01/06 17:18:11 momjian Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/pg_restore.c,v 1.92 2009/02/02 20:07:37 adunstan Exp $ * *------------------------------------------------------------------------- */ @@ -93,6 +93,7 @@ main(int argc, char **argv) {"ignore-version", 0, NULL, 'i'}, {"index", 1, NULL, 'I'}, {"list", 0, NULL, 'l'}, + {"multi-thread", 1, NULL, 'm'}, {"no-privileges", 0, NULL, 'x'}, {"no-acl", 0, NULL, 'x'}, {"no-owner", 0, NULL, 'O'}, @@ -141,7 +142,7 @@ main(int argc, char **argv) } } - while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:n:Op:P:RsS:t:T:U:vWxX:1", + while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:m:n:Op:P:RsS:t:T:U:vWxX:1", cmdopts, NULL)) != -1) { switch (c) @@ -184,6 +185,10 @@ main(int argc, char **argv) opts->tocFile = strdup(optarg); break; + case 'm': /* number of restore threads */ + opts->number_of_threads = atoi(optarg); + break; + case 'n': /* Dump data for this schema only */ opts->schemaNames = strdup(optarg); break; @@ -269,7 +274,10 @@ main(int argc, char **argv) break; case 0: - /* This covers the long options equivalent to -X xxx. */ + /* + * This covers the long options without a short equivalent, + * including those equivalent to -X xxx. + */ break; case 2: /* SET ROLE */ @@ -301,6 +309,14 @@ main(int argc, char **argv) opts->useDB = 1; } + /* Can't do single-txn mode with multiple connections */ + if (opts->single_txn && opts->number_of_threads > 1) + { + fprintf(stderr, _("%s: cannot specify both --single-transaction and multiple threads\n"), + progname); + exit(1); + } + opts->disable_triggers = disable_triggers; opts->noDataForFailedTables = no_data_for_failed_tables; opts->noTablespace = outputNoTablespaces; @@ -308,10 +324,8 @@ main(int argc, char **argv) if (opts->formatName) { - switch (opts->formatName[0]) { - case 'c': case 'C': opts->format = archCustom; @@ -396,6 +410,7 @@ usage(const char *progname) printf(_(" -I, --index=NAME restore named index\n")); printf(_(" -L, --use-list=FILENAME use specified table of contents for ordering\n" " output from this file\n")); + printf(_(" -m, --multi-thread=NUM use this many parallel connections to restore\n")); printf(_(" -n, --schema=NAME restore only objects in this schema\n")); printf(_(" -O, --no-owner skip restoration of object ownership\n")); printf(_(" -P, --function=NAME(args)\n" -- 2.40.0