X-Git-Url: https://granicus.if.org/sourcecode?a=blobdiff_plain;f=src%2Fbin%2Fpg_dump%2Fpg_backup_archiver.c;h=64d8d93dda81805540ac70dbd7afcf11390c19fc;hb=0d692a0dc9f0e532c67c577187fe5d7d323cb95b;hp=64603b6a324f8aa97043f0cbd88211d54849bbe5;hpb=c7040429e7ab19490c2e292e6b6ca01ef551b909;p=postgresql diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 64603b6a32..64d8d93dda 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -15,64 +15,136 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.96 2004/08/30 19:44:14 tgl Exp $ + * src/bin/pg_dump/pg_backup_archiver.c * *------------------------------------------------------------------------- */ -#include "pg_backup.h" -#include "pg_dump.h" -#include "pg_backup_archiver.h" #include "pg_backup_db.h" #include "dumputils.h" #include -#include #include +#include +#include + +#ifdef WIN32 +#include +#endif -#include "pqexpbuffer.h" #include "libpq/libpq-fs.h" +/* + * Special exit values from worker children. We reserve 0 for normal + * success; 1 and other small values should be interpreted as crashes. + */ +#define WORKER_CREATE_DONE 10 +#define WORKER_INHIBIT_DATA 11 +#define WORKER_IGNORED_ERRORS 12 + +/* + * Unix uses exit to return result from worker child, so function is void. + * Windows thread result comes via function return. + */ +#ifndef WIN32 +#define parallel_restore_result void +#else +#define parallel_restore_result DWORD +#endif + +/* IDs for worker children are either PIDs or thread handles */ +#ifndef WIN32 +#define thandle pid_t +#else +#define thandle HANDLE +#endif + +/* Arguments needed for a worker child */ +typedef struct _restore_args +{ + ArchiveHandle *AH; + TocEntry *te; +} RestoreArgs; -typedef enum _teReqs_ +/* State for each parallel activity slot */ +typedef struct _parallel_slot { - REQ_SCHEMA = 1, - REQ_DATA = 2, - REQ_ALL = REQ_SCHEMA + REQ_DATA -} teReqs; + thandle child_id; + RestoreArgs *args; +} ParallelSlot; + +#define NO_SLOT (-1) const char *progname; -static char *modulename = gettext_noop("archiver"); + +static const char *modulename = gettext_noop("archiver"); + +/* index array created by fix_dependencies -- only used in parallel restore */ +static TocEntry **tocsByDumpId; /* index by dumpId - 1 */ +static DumpId maxDumpId; /* length of above array */ static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt, const int compression, ArchiveMode mode); -static char *_getObjectFromDropStmt(const char *dropStmt, const char *type); +static void _getObjectDescription(PQExpBuffer buf, TocEntry *te, + ArchiveHandle *AH); static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isData, bool acl_pass); -static void fixPriorBlobRefs(ArchiveHandle *AH, TocEntry *blobte, - RestoreOptions *ropt); static void _doSetFixedOutputState(ArchiveHandle *AH); static void _doSetSessionAuth(ArchiveHandle *AH, const char *user); static void _doSetWithOids(ArchiveHandle *AH, const bool withOids); -static void _reconnectToDB(ArchiveHandle *AH, const char *dbname, const char *user); +static void _reconnectToDB(ArchiveHandle *AH, const char *dbname); static void _becomeUser(ArchiveHandle *AH, const char *user); static void _becomeOwner(ArchiveHandle *AH, TocEntry *te); static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName); - -static teReqs _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool acl_pass); +static void _selectTablespace(ArchiveHandle *AH, const char *tablespace); +static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te); +static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te); +static teReqs _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls); +static bool _tocEntryIsACL(TocEntry *te); static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt); static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt); static TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id); -static void _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te); +static void _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te); static int _discoverArchiveFormat(ArchiveHandle *AH); +static void dump_lo_buf(ArchiveHandle *AH); static void _write_msg(const char *modulename, const char *fmt, va_list ap); static void _die_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt, va_list ap); -static int _canRestoreBlobs(ArchiveHandle *AH); -static int _restoringToDB(ArchiveHandle *AH); +static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim); +static OutputContext SetOutput(ArchiveHandle *AH, char *filename, int compression); +static void ResetOutput(ArchiveHandle *AH, OutputContext savedContext); + +static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, + RestoreOptions *ropt, bool is_parallel); +static void restore_toc_entries_parallel(ArchiveHandle *AH); +static thandle spawn_restore(RestoreArgs *args); +static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status); +static bool work_in_progress(ParallelSlot *slots, int n_slots); +static int get_next_slot(ParallelSlot *slots, int n_slots); +static void par_list_header_init(TocEntry *l); +static void par_list_append(TocEntry *l, TocEntry *te); +static void par_list_remove(TocEntry *te); +static TocEntry *get_next_work_item(ArchiveHandle *AH, + TocEntry *ready_list, + ParallelSlot *slots, int n_slots); +static parallel_restore_result parallel_restore(RestoreArgs *args); +static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list, + thandle worker, int status, + ParallelSlot *slots, int n_slots); +static void fix_dependencies(ArchiveHandle *AH); +static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2); +static void repoint_table_dependencies(ArchiveHandle *AH, + DumpId tableId, DumpId tableDataId); +static void identify_locking_dependencies(TocEntry *te); +static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te, + TocEntry *ready_list); +static void mark_create_done(ArchiveHandle *AH, TocEntry *te); +static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te); +static ArchiveHandle *CloneArchive(ArchiveHandle *AH); +static void DeCloneArchive(ArchiveHandle *AH); /* @@ -88,10 +160,10 @@ static int _restoringToDB(ArchiveHandle *AH); /* Public */ Archive * CreateArchive(const char *FileSpec, const ArchiveFormat fmt, - const int compression) + const int compression, ArchiveMode mode) { - ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, archModeWrite); + ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, mode); return (Archive *) AH; } @@ -122,7 +194,8 @@ CloseArchive(Archive *AHX) res = fclose(AH->OF); if (res != 0) - die_horribly(AH, modulename, "could not close output archive file\n"); + die_horribly(AH, modulename, "could not close output file: %s\n", + strerror(errno)); } /* Public */ @@ -130,11 +203,9 @@ void RestoreArchive(Archive *AHX, RestoreOptions *ropt) { ArchiveHandle *AH = (ArchiveHandle *) AHX; - TocEntry *te = AH->toc->next; + TocEntry *te; teReqs reqs; OutputContext sav; - int impliedDataOnly; - bool defnDumped; AH->ropt = ropt; AH->stage = STAGE_INITIALIZING; @@ -142,15 +213,36 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt) /* * Check for nonsensical option combinations. * - * NB: create+dropSchema is useless because if you're creating the DB, - * there's no need to drop individual items in it. Moreover, if we - * tried to do that then we'd issue the drops in the database - * initially connected to, not the one we will create, which is very - * bad... + * NB: createDB+dropSchema is useless because if you're creating the DB, + * there's no need to drop individual items in it. Moreover, if we tried + * to do that then we'd issue the drops in the database initially + * connected to, not the one we will create, which is very bad... */ - if (ropt->create && ropt->dropSchema) + if (ropt->createDB && ropt->dropSchema) die_horribly(AH, modulename, "-C and -c are incompatible options\n"); + /* + * -C is not compatible with -1, because we can't create a database inside + * a transaction block. + */ + if (ropt->createDB && ropt->single_txn) + die_horribly(AH, modulename, "-C and -1 are incompatible options\n"); + + /* + * Make sure we won't need (de)compression we haven't got + */ +#ifndef HAVE_LIBZ + if (AH->compression != 0 && AH->PrintTocDataPtr !=NULL) + { + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + reqs = _tocEntryRequired(te, ropt, false); + if (te->hadDumper && (reqs & REQ_DATA) != 0) + die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in this installation)\n"); + } + } +#endif + /* * If we're using a DB connection, then connect it. */ @@ -166,37 +258,36 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt) ConnectDatabase(AHX, ropt->dbname, ropt->pghost, ropt->pgport, ropt->username, - ropt->requirePassword, ropt->ignoreVersion); + ropt->promptPassword); /* - * If we're talking to the DB directly, don't send comments since - * they obscure SQL when displaying errors + * If we're talking to the DB directly, don't send comments since they + * obscure SQL when displaying errors */ AH->noTocComments = 1; } /* - * Work out if we have an implied data-only restore. This can happen - * if the dump was data only or if the user has used a toc list to - * exclude all of the schema data. All we do is look for schema - * entries - if none are found then we set the dataOnly flag. + * Work out if we have an implied data-only restore. This can happen if + * the dump was data only or if the user has used a toc list to exclude + * all of the schema data. All we do is look for schema entries - if none + * are found then we set the dataOnly flag. * * We could scan for wanted TABLE entries, but that is not the same as * dataOnly. At this stage, it seems unnecessary (6-Mar-2001). */ if (!ropt->dataOnly) { - te = AH->toc->next; - impliedDataOnly = 1; - while (te != AH->toc) + int impliedDataOnly = 1; + + for (te = AH->toc->next; te != AH->toc; te = te->next) { - reqs = _tocEntryRequired(te, ropt, false); + reqs = _tocEntryRequired(te, ropt, true); if ((reqs & REQ_SCHEMA) != 0) { /* It's schema, and it's wanted */ impliedDataOnly = 0; break; } - te = te->next; } if (impliedDataOnly) { @@ -213,6 +304,25 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt) ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n"); + if (AH->public.verbose) + { + if (AH->archiveRemoteVersion) + ahprintf(AH, "-- Dumped from database version %s\n", + AH->archiveRemoteVersion); + if (AH->archiveDumpVersion) + ahprintf(AH, "-- Dumped by pg_dump version %s\n", + AH->archiveDumpVersion); + dumpTimestamp(AH, "Started on", AH->createDate); + } + + if (ropt->single_txn) + { + if (AH->connection) + StartTransaction(AH); + else + ahprintf(AH, "BEGIN;\n\n"); + } + /* * Establish important parameter values right away. */ @@ -225,15 +335,14 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt) */ if (ropt->dropSchema) { - te = AH->toc->prev; - AH->currentTE = te; - - while (te != AH->toc) + for (te = AH->toc->prev; te != AH->toc; te = te->prev) { - reqs = _tocEntryRequired(te, ropt, false); - if (((reqs & REQ_SCHEMA) != 0) && te->dropStmt) + AH->currentTE = te; + + reqs = _tocEntryRequired(te, ropt, false /* needn't drop ACLs */ ); + /* We want anything that's selected and has a dropStmt */ + if (((reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt) { - /* We want the schema */ ahlog(AH, 1, "dropping %s %s\n", te->desc, te->tag); /* Select owner and schema as necessary */ _becomeOwner(AH, te); @@ -241,168 +350,70 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt) /* Drop it */ ahprintf(AH, "%s", te->dropStmt); } - te = te->prev; } + + /* + * _selectOutputSchema may have set currSchema to reflect the effect + * of a "SET search_path" command it emitted. However, by now we may + * have dropped that schema; or it might not have existed in the first + * place. In either case the effective value of search_path will not + * be what we think. Forcibly reset currSchema so that we will + * re-establish the search_path setting when needed (after creating + * the schema). + * + * If we treated users as pg_dump'able objects then we'd need to reset + * currUser here too. + */ + if (AH->currSchema) + free(AH->currSchema); + AH->currSchema = NULL; } /* - * Now process each TOC entry + * In serial mode, we now process each non-ACL TOC entry. + * + * In parallel mode, turn control over to the parallel-restore logic. */ - te = AH->toc->next; - while (te != AH->toc) + if (ropt->number_of_jobs > 1 && ropt->useDB) + restore_toc_entries_parallel(AH); + else { - AH->currentTE = te; - - /* Work out what, if anything, we want from this entry */ - reqs = _tocEntryRequired(te, ropt, false); - - /* Dump any relevant dump warnings to stderr */ - if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0) - { - if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0) - write_msg(modulename, "warning from original dump file: %s\n", te->defn); - else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0) - write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt); - } - - defnDumped = false; - - if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */ - { - ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag); - - _printTocEntry(AH, te, ropt, false, false); - defnDumped = true; - - /* If we created a DB, connect to it... */ - if (strcmp(te->desc, "DATABASE") == 0) - { - ahlog(AH, 1, "connecting to new database \"%s\" as user \"%s\"\n", te->tag, te->owner); - _reconnectToDB(AH, te->tag, te->owner); - } - } - - /* - * If we have a data component, then process it - */ - if ((reqs & REQ_DATA) != 0) - { - /* - * hadDumper will be set if there is genuine data component - * for this node. Otherwise, we need to check the defn field - * for statements that need to be executed in data-only - * restores. - */ - if (te->hadDumper) - { - /* - * If we can output the data, then restore it. - */ - if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0) - { -#ifndef HAVE_LIBZ - if (AH->compression != 0) - die_horribly(AH, modulename, "cannot restore from compressed archive (not configured for compression support)\n"); -#endif - - _printTocEntry(AH, te, ropt, true, false); - - /* - * Maybe we can't do BLOBS, so check if this node is - * for BLOBS - */ - if ((strcmp(te->desc, "BLOBS") == 0) && - !_canRestoreBlobs(AH)) - { - ahprintf(AH, "--\n-- SKIPPED \n--\n\n"); - - /* - * This is a bit nasty - we assume, for the - * moment, that if a custom output is used, then - * we don't want warnings. - */ - if (!AH->CustomOutPtr) - write_msg(modulename, "WARNING: skipping large-object restoration\n"); - } - else - { - _disableTriggersIfNecessary(AH, te, ropt); - - /* Select owner and schema as necessary */ - _becomeOwner(AH, te); - _selectOutputSchema(AH, te->namespace); - - ahlog(AH, 1, "restoring data for table \"%s\"\n", te->tag); - - /* - * If we have a copy statement, use it. As of - * V1.3, these are separate to allow easy import - * from withing a database connection. Pre 1.3 - * archives can not use DB connections and are - * sent to output only. - * - * For V1.3+, the table data MUST have a copy - * statement so that we can go into appropriate - * mode with libpq. - */ - if (te->copyStmt && strlen(te->copyStmt) > 0) - ahprintf(AH, te->copyStmt); - - (*AH->PrintTocDataPtr) (AH, te, ropt); - - /* - * If we just restored blobs, fix references in - * previously-loaded tables; otherwise, if we - * previously restored blobs, fix references in - * this table. Note that in standard cases the - * BLOBS entry comes after all TABLE DATA entries, - * but we should cope with other orders in case - * the user demands reordering. - */ - if (strcmp(te->desc, "BLOBS") == 0) - fixPriorBlobRefs(AH, te, ropt); - else if (AH->createdBlobXref && - strcmp(te->desc, "TABLE DATA") == 0) - { - ahlog(AH, 1, "fixing up large-object cross-reference for \"%s\"\n", te->tag); - FixupBlobRefs(AH, te); - } - - _enableTriggersIfNecessary(AH, te, ropt); - } - } - } - else if (!defnDumped) - { - /* If we haven't already dumped the defn part, do so now */ - ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag); - _printTocEntry(AH, te, ropt, false, false); - } - } - te = te->next; - } /* end loop over TOC entries */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + (void) restore_toc_entry(AH, te, ropt, false); + } /* * Scan TOC again to output ownership commands and ACLs */ - te = AH->toc->next; - while (te != AH->toc) + for (te = AH->toc->next; te != AH->toc; te = te->next) { AH->currentTE = te; /* Work out what, if anything, we want from this entry */ reqs = _tocEntryRequired(te, ropt, true); - if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */ + /* Both schema and data objects might now have ownership/ACLs */ + if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0) { - ahlog(AH, 1, "setting owner and acl for %s %s\n", + ahlog(AH, 1, "setting owner and privileges for %s %s\n", te->desc, te->tag); _printTocEntry(AH, te, ropt, false, true); } + } - te = te->next; + if (ropt->single_txn) + { + if (AH->connection) + CommitTransaction(AH); + else + ahprintf(AH, "COMMIT;\n\n"); } + if (AH->public.verbose) + dumpTimestamp(AH, "Completed on", time(NULL)); + + ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n"); + /* * Clean up & we're done. */ @@ -415,50 +426,194 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt) { PQfinish(AH->connection); AH->connection = NULL; - - if (AH->blobConnection) - { - PQfinish(AH->blobConnection); - AH->blobConnection = NULL; - } } - - ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n"); } /* - * After restoring BLOBS, fix all blob references in previously-restored - * tables. (Normally, the BLOBS entry should appear after all TABLE DATA - * entries, so this will in fact handle all blob references.) + * Restore a single TOC item. Used in both parallel and non-parallel restore; + * is_parallel is true if we are in a worker child process. + * + * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if + * the parallel parent has to make the corresponding status update. */ -static void -fixPriorBlobRefs(ArchiveHandle *AH, TocEntry *blobte, RestoreOptions *ropt) +static int +restore_toc_entry(ArchiveHandle *AH, TocEntry *te, + RestoreOptions *ropt, bool is_parallel) { - TocEntry *te; + int retval = 0; teReqs reqs; + bool defnDumped; + + AH->currentTE = te; + + /* Work out what, if anything, we want from this entry */ + reqs = _tocEntryRequired(te, ropt, false); + + /* Dump any relevant dump warnings to stderr */ + if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0) + { + if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0) + write_msg(modulename, "warning from original dump file: %s\n", te->defn); + else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0) + write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt); + } - if (AH->createdBlobXref) + defnDumped = false; + + if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */ { - /* NULL parameter means disable ALL user triggers */ - _disableTriggersIfNecessary(AH, NULL, ropt); + ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag); + + _printTocEntry(AH, te, ropt, false, false); + defnDumped = true; - for (te = AH->toc->next; te != blobte; te = te->next) + if (strcmp(te->desc, "TABLE") == 0) { - if (strcmp(te->desc, "TABLE DATA") == 0) + if (AH->lastErrorTE == te) { - reqs = _tocEntryRequired(te, ropt, false); - - if ((reqs & REQ_DATA) != 0) /* We loaded the data */ + /* + * We failed to create the table. If + * --no-data-for-failed-tables was given, mark the + * corresponding TABLE DATA to be ignored. + * + * In the parallel case this must be done in the parent, so we + * just set the return value. + */ + if (ropt->noDataForFailedTables) { - ahlog(AH, 1, "fixing up large-object cross-reference for \"%s\"\n", te->tag); - FixupBlobRefs(AH, te); + if (is_parallel) + retval = WORKER_INHIBIT_DATA; + else + inhibit_data_for_failed_table(AH, te); } } + else + { + /* + * We created the table successfully. Mark the corresponding + * TABLE DATA for possible truncation. + * + * In the parallel case this must be done in the parent, so we + * just set the return value. + */ + if (is_parallel) + retval = WORKER_CREATE_DONE; + else + mark_create_done(AH, te); + } + } + + /* If we created a DB, connect to it... */ + if (strcmp(te->desc, "DATABASE") == 0) + { + ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag); + _reconnectToDB(AH, te->tag); + ropt->dbname = strdup(te->tag); } + } + + /* + * If we have a data component, then process it + */ + if ((reqs & REQ_DATA) != 0) + { + /* + * hadDumper will be set if there is genuine data component for this + * node. Otherwise, we need to check the defn field for statements + * that need to be executed in data-only restores. + */ + if (te->hadDumper) + { + /* + * If we can output the data, then restore it. + */ + if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0) + { + _printTocEntry(AH, te, ropt, true, false); + + if (strcmp(te->desc, "BLOBS") == 0 || + strcmp(te->desc, "BLOB COMMENTS") == 0) + { + ahlog(AH, 1, "restoring %s\n", te->desc); + + _selectOutputSchema(AH, "pg_catalog"); + + (*AH->PrintTocDataPtr) (AH, te, ropt); + } + else + { + _disableTriggersIfNecessary(AH, te, ropt); + + /* Select owner and schema as necessary */ + _becomeOwner(AH, te); + _selectOutputSchema(AH, te->namespace); + + ahlog(AH, 1, "restoring data for table \"%s\"\n", + te->tag); + + /* + * In parallel restore, if we created the table earlier in + * the run then we wrap the COPY in a transaction and + * precede it with a TRUNCATE. If archiving is not on + * this prevents WAL-logging the COPY. This obtains a + * speedup similar to that from using single_txn mode in + * non-parallel restores. + */ + if (is_parallel && te->created) + { + /* + * Parallel restore is always talking directly to a + * server, so no need to see if we should issue BEGIN. + */ + StartTransaction(AH); + + /* + * If the server version is >= 8.4, make sure we issue + * TRUNCATE with ONLY so that child tables are not + * wiped. + */ + ahprintf(AH, "TRUNCATE TABLE %s%s;\n\n", + (PQserverVersion(AH->connection) >= 80400 ? + "ONLY " : ""), + fmtId(te->tag)); + } + + /* + * If we have a copy statement, use it. As of V1.3, these + * are separate to allow easy import from withing a + * database connection. Pre 1.3 archives can not use DB + * connections and are sent to output only. + * + * For V1.3+, the table data MUST have a copy statement so + * that we can go into appropriate mode with libpq. + */ + if (te->copyStmt && strlen(te->copyStmt) > 0) + { + ahprintf(AH, "%s", te->copyStmt); + AH->writingCopyData = true; + } + + (*AH->PrintTocDataPtr) (AH, te, ropt); + + AH->writingCopyData = false; - /* NULL parameter means enable ALL user triggers */ - _enableTriggersIfNecessary(AH, NULL, ropt); + /* close out the transaction started above */ + if (is_parallel && te->created) + CommitTransaction(AH); + + _enableTriggersIfNecessary(AH, te, ropt); + } + } + } + else if (!defnDumped) + { + /* If we haven't already dumped the defn part, do so now */ + ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag); + _printTocEntry(AH, te, ropt, false, false); + } } + + return retval; } /* @@ -472,29 +627,13 @@ NewRestoreOptions(void) opts = (RestoreOptions *) calloc(1, sizeof(RestoreOptions)); + /* set any fields that shouldn't default to zeroes */ opts->format = archUnknown; - opts->suppressDumpWarnings = false; - opts->exit_on_error = false; + opts->promptPassword = TRI_DEFAULT; return opts; } -/* - * Returns true if we're restoring directly to the database (and - * aren't just making a psql script that can do the restoration). - */ -static int -_restoringToDB(ArchiveHandle *AH) -{ - return (AH->ropt->useDB && AH->connection); -} - -static int -_canRestoreBlobs(ArchiveHandle *AH) -{ - return _restoringToDB(AH); -} - static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt) { @@ -502,36 +641,23 @@ _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *rop if (!ropt->dataOnly || !ropt->disable_triggers) return; - /* Don't do it for the BLOBS TocEntry, either */ - if (te && strcmp(te->desc, "BLOBS") == 0) - return; + ahlog(AH, 1, "disabling triggers for %s\n", te->tag); /* * Become superuser if possible, since they are the only ones who can - * update pg_class. If -S was not given, assume the initial user - * identity is a superuser. + * disable constraint triggers. If -S was not given, assume the initial + * user identity is a superuser. (XXX would it be better to become the + * table owner?) */ _becomeUser(AH, ropt->superuser); - ahlog(AH, 1, "disabling triggers\n"); - /* - * Disable them. This is a hack. Needs to be done via an appropriate - * 'SET' command when one is available. + * Disable them. */ - ahprintf(AH, "-- Disable triggers\n"); + _selectOutputSchema(AH, te->namespace); - /* - * Just update the AFFECTED table, if known. Otherwise update all - * non-system tables. - */ - if (te && te->tag && strlen(te->tag) > 0) - ahprintf(AH, "UPDATE pg_catalog.pg_class SET reltriggers = 0 " - "WHERE oid = '%s'::pg_catalog.regclass;\n\n", - fmtId(te->tag)); - else - ahprintf(AH, "UPDATE pg_catalog.pg_class SET reltriggers = 0 FROM pg_catalog.pg_namespace " - "WHERE relnamespace = pg_namespace.oid AND nspname !~ '^pg_';\n\n"); + ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n", + fmtId(te->tag)); } static void @@ -541,39 +667,23 @@ _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt if (!ropt->dataOnly || !ropt->disable_triggers) return; - /* Don't do it for the BLOBS TocEntry, either */ - if (te && strcmp(te->desc, "BLOBS") == 0) - return; + ahlog(AH, 1, "enabling triggers for %s\n", te->tag); /* * Become superuser if possible, since they are the only ones who can - * update pg_class. If -S was not given, assume the initial user - * identity is a superuser. + * disable constraint triggers. If -S was not given, assume the initial + * user identity is a superuser. (XXX would it be better to become the + * table owner?) */ _becomeUser(AH, ropt->superuser); - ahlog(AH, 1, "enabling triggers\n"); - /* - * Enable them. This is a hack. Needs to be done via an appropriate - * 'SET' command when one is available. + * Enable them. */ - ahprintf(AH, "-- Enable triggers\n"); + _selectOutputSchema(AH, te->namespace); - /* - * Just update the AFFECTED table, if known. Otherwise update all - * non-system tables. - */ - if (te && te->tag && strlen(te->tag) > 0) - ahprintf(AH, "UPDATE pg_catalog.pg_class SET reltriggers = " - "(SELECT pg_catalog.count(*) FROM pg_catalog.pg_trigger where pg_class.oid = tgrelid) " - "WHERE oid = '%s'::pg_catalog.regclass;\n\n", - fmtId(te->tag)); - else - ahprintf(AH, "UPDATE pg_catalog.pg_class SET reltriggers = " - "(SELECT pg_catalog.count(*) FROM pg_catalog.pg_trigger where pg_class.oid = tgrelid) " - "FROM pg_catalog.pg_namespace " - "WHERE relnamespace = pg_namespace.oid AND nspname !~ '^pg_';\n\n"); + ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n", + fmtId(te->tag)); } /* @@ -602,8 +712,11 @@ void ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId, const char *tag, - const char *namespace, const char *owner, bool withOids, - const char *desc, const char *defn, + const char *namespace, + const char *tablespace, + const char *owner, bool withOids, + const char *desc, teSection section, + const char *defn, const char *dropStmt, const char *copyStmt, const DumpId *deps, int nDeps, DataDumperPtr dumpFn, void *dumpArg) @@ -626,9 +739,11 @@ ArchiveEntry(Archive *AHX, newToc->catalogId = catalogId; newToc->dumpId = dumpId; + newToc->section = section; newToc->tag = strdup(tag); newToc->namespace = namespace ? strdup(namespace) : NULL; + newToc->tablespace = tablespace ? strdup(tablespace) : NULL; newToc->owner = strdup(owner); newToc->withOids = withOids; newToc->desc = strdup(desc); @@ -663,7 +778,7 @@ void PrintTOCSummary(Archive *AHX, RestoreOptions *ropt) { ArchiveHandle *AH = (ArchiveHandle *) AHX; - TocEntry *te = AH->toc->next; + TocEntry *te; OutputContext sav; char *fmtName; @@ -693,16 +808,34 @@ PrintTOCSummary(Archive *AHX, RestoreOptions *ropt) ahprintf(AH, "; Format: %s\n", fmtName); ahprintf(AH, "; Integer: %d bytes\n", (int) AH->intSize); ahprintf(AH, "; Offset: %d bytes\n", (int) AH->offSize); + if (AH->archiveRemoteVersion) + ahprintf(AH, "; Dumped from database version: %s\n", + AH->archiveRemoteVersion); + if (AH->archiveDumpVersion) + ahprintf(AH, "; Dumped by pg_dump version: %s\n", + AH->archiveDumpVersion); ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n"); - while (te != AH->toc) + /* We should print DATABASE entries whether or not -C was specified */ + ropt->createDB = 1; + + for (te = AH->toc->next; te != AH->toc; te = te->next) { - if (_tocEntryRequired(te, ropt, false) != 0) - ahprintf(AH, "%d; %u %u %s %s %s\n", te->dumpId, + if (ropt->verbose || _tocEntryRequired(te, ropt, true) != 0) + ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId, te->catalogId.tableoid, te->catalogId.oid, - te->desc, te->tag, te->owner); - te = te->next; + te->desc, te->namespace ? te->namespace : "-", + te->tag, te->owner); + if (ropt->verbose && te->nDeps > 0) + { + int i; + + ahprintf(AH, ";\tdepends on:"); + for (i = 0; i < te->nDeps; i++) + ahprintf(AH, " %d", te->dependencies[i]); + ahprintf(AH, "\n"); + } } if (ropt->filename) @@ -749,6 +882,14 @@ EndBlob(Archive *AHX, Oid oid) void StartRestoreBlobs(ArchiveHandle *AH) { + if (!AH->ropt->single_txn) + { + if (AH->connection) + StartTransaction(AH); + else + ahprintf(AH, "BEGIN;\n\n"); + } + AH->blobCount = 0; } @@ -758,19 +899,18 @@ StartRestoreBlobs(ArchiveHandle *AH) void EndRestoreBlobs(ArchiveHandle *AH) { - if (AH->txActive) + if (!AH->ropt->single_txn) { - ahlog(AH, 2, "committing large-object transactions\n"); - CommitTransaction(AH); + if (AH->connection) + CommitTransaction(AH); + else + ahprintf(AH, "COMMIT;\n\n"); } - if (AH->blobTxActive) - CommitTransactionXref(AH); - - if (AH->createdBlobXref) - CreateBlobXrefIndex(AH); - - ahlog(AH, 1, "restored %d large objects\n", AH->blobCount); + ahlog(AH, 1, ngettext("restored %d large object\n", + "restored %d large objects\n", + AH->blobCount), + AH->blobCount); } @@ -778,46 +918,45 @@ EndRestoreBlobs(ArchiveHandle *AH) * Called by a format handler to initiate restoration of a blob */ void -StartRestoreBlob(ArchiveHandle *AH, Oid oid) +StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop) { + bool old_blob_style = (AH->version < K_VERS_1_12); Oid loOid; AH->blobCount++; - if (!AH->createdBlobXref) - { - if (!AH->connection) - die_horribly(AH, modulename, "cannot restore large objects without a database connection\n"); - - CreateBlobXrefTable(AH); - AH->createdBlobXref = 1; - } - /* Initialize the LO Buffer */ AH->lo_buf_used = 0; - /* - * Start long-running TXs if necessary - */ - if (!AH->txActive) - { - ahlog(AH, 2, "starting large-object transactions\n"); - StartTransaction(AH); - } - if (!AH->blobTxActive) - StartTransactionXref(AH); - - loOid = lo_creat(AH->connection, INV_READ | INV_WRITE); - if (loOid == 0) - die_horribly(AH, modulename, "could not create large object\n"); + ahlog(AH, 2, "restoring large object with OID %u\n", oid); - ahlog(AH, 2, "restoring large object with OID %u as %u\n", oid, loOid); + /* With an old archive we must do drop and create logic here */ + if (old_blob_style && drop) + DropBlobIfExists(AH, oid); - InsertBlobXref(AH, oid, loOid); - - AH->loFd = lo_open(AH->connection, loOid, INV_WRITE); - if (AH->loFd == -1) - die_horribly(AH, modulename, "could not open large object\n"); + if (AH->connection) + { + if (old_blob_style) + { + loOid = lo_create(AH->connection, oid); + if (loOid == 0 || loOid != oid) + die_horribly(AH, modulename, "could not create large object %u: %s", + oid, PQerrorMessage(AH->connection)); + } + AH->loFd = lo_open(AH->connection, oid, INV_WRITE); + if (AH->loFd == -1) + die_horribly(AH, modulename, "could not open large object %u: %s", + oid, PQerrorMessage(AH->connection)); + } + else + { + if (old_blob_style) + ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n", + oid, INV_WRITE); + else + ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n", + oid, INV_WRITE); + } AH->writingBlob = 1; } @@ -828,32 +967,22 @@ EndRestoreBlob(ArchiveHandle *AH, Oid oid) if (AH->lo_buf_used > 0) { /* Write remaining bytes from the LO buffer */ - size_t res; - - res = lo_write(AH->connection, AH->loFd, (void *) AH->lo_buf, AH->lo_buf_used); - - ahlog(AH, 5, "wrote remaining %lu bytes of large-object data (result = %lu)\n", - (unsigned long) AH->lo_buf_used, (unsigned long) res); - if (res != AH->lo_buf_used) - die_horribly(AH, modulename, "could not write to large object (result: %lu, expected: %lu)\n", - (unsigned long) res, (unsigned long) AH->lo_buf_used); - AH->lo_buf_used = 0; + dump_lo_buf(AH); } - lo_close(AH->connection, AH->loFd); AH->writingBlob = 0; - /* - * Commit every BLOB_BATCH_SIZE blobs... - */ - if (((AH->blobCount / BLOB_BATCH_SIZE) * BLOB_BATCH_SIZE) == AH->blobCount) + if (AH->connection) { - ahlog(AH, 2, "committing large-object transactions\n"); - CommitTransaction(AH); - CommitTransactionXref(AH); + lo_close(AH->connection, AH->loFd); + AH->loFd = -1; } -} - + else + { + ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n"); + } +} + /*********** * Sorting and Reordering ***********/ @@ -868,39 +997,32 @@ SortTocFromFile(Archive *AHX, RestoreOptions *ropt) char *endptr; DumpId id; TocEntry *te; - TocEntry *tePrev; /* Allocate space for the 'wanted' array, and init it */ ropt->idWanted = (bool *) malloc(sizeof(bool) * AH->maxDumpId); memset(ropt->idWanted, 0, sizeof(bool) * AH->maxDumpId); - ropt->limitToList = true; - - /* Set prev entry as head of list */ - tePrev = AH->toc; /* Setup the file */ fh = fopen(ropt->tocFile, PG_BINARY_R); if (!fh) - die_horribly(AH, modulename, "could not open TOC file\n"); + die_horribly(AH, modulename, "could not open TOC file \"%s\": %s\n", + ropt->tocFile, strerror(errno)); - while (fgets(buf, 1024, fh) != NULL) + while (fgets(buf, sizeof(buf), fh) != NULL) { - /* Find a comment */ + /* Truncate line at comment, if any */ cmnt = strchr(buf, ';'); - if (cmnt == buf) - continue; - - /* End string at comment */ if (cmnt != NULL) cmnt[0] = '\0'; - /* Skip if all spaces */ - if (strspn(buf, " \t") == strlen(buf)) + /* Ignore if all blank */ + if (strspn(buf, " \t\r\n") == strlen(buf)) continue; - /* Get an ID */ + /* Get an ID, check it's valid and not already seen */ id = strtol(buf, &endptr, 10); - if (endptr == buf || id <= 0 || id > AH->maxDumpId) + if (endptr == buf || id <= 0 || id > AH->maxDumpId || + ropt->idWanted[id - 1]) { write_msg(modulename, "WARNING: line ignored: %s\n", buf); continue; @@ -912,10 +1034,21 @@ SortTocFromFile(Archive *AHX, RestoreOptions *ropt) die_horribly(AH, modulename, "could not find entry for ID %d\n", id); + /* Mark it wanted */ ropt->idWanted[id - 1] = true; - _moveAfter(AH, tePrev, te); - tePrev = te; + /* + * Move each item to the end of the list as it is selected, so that + * they are placed in the desired order. Any unwanted items will end + * up at the front of the list, which may seem unintuitive but it's + * what we need. In an ordinary serial restore that makes no + * difference, but in a parallel restore we need to mark unrestored + * items' dependencies as satisfied before we start examining + * restorable items. Otherwise they could have surprising + * side-effects on the order in which restorable items actually get + * restored. + */ + _moveBefore(AH, AH->toc, te); } if (fclose(fh) != 0) @@ -923,6 +1056,19 @@ SortTocFromFile(Archive *AHX, RestoreOptions *ropt) strerror(errno)); } +/* + * Set up a dummy ID filter that selects all dump IDs + */ +void +InitDummyWantedList(Archive *AHX, RestoreOptions *ropt) +{ + ArchiveHandle *AH = (ArchiveHandle *) AHX; + + /* Allocate space for the 'wanted' array, and init it to 1's */ + ropt->idWanted = (bool *) malloc(sizeof(bool) * AH->maxDumpId); + memset(ropt->idWanted, 1, sizeof(bool) * AH->maxDumpId); +} + /********************** * 'Convenience functions that look like standard IO functions * for writing data when in dump mode. @@ -945,9 +1091,9 @@ archprintf(Archive *AH, const char *fmt,...) int cnt = -1; /* - * This is paranoid: deal with the possibility that vsnprintf is - * willing to ignore trailing null or returns > 0 even if string does - * not fit. It may be the case that it returns cnt = bufsize + * This is paranoid: deal with the possibility that vsnprintf is willing + * to ignore trailing null or returns > 0 even if string does not fit. It + * may be the case that it returns cnt = bufsize */ while (cnt < 0 || cnt >= (bSize - 1)) { @@ -971,7 +1117,7 @@ archprintf(Archive *AH, const char *fmt,...) * Stuff below here should be 'private' to the archiver routines *******************************/ -OutputContext +static OutputContext SetOutput(ArchiveHandle *AH, char *filename, int compression) { OutputContext sav; @@ -1010,20 +1156,37 @@ SetOutput(ArchiveHandle *AH, char *filename, int compression) else #endif { /* Use fopen */ - if (fn >= 0) - AH->OF = fdopen(dup(fn), PG_BINARY_W); + if (AH->mode == archModeAppend) + { + if (fn >= 0) + AH->OF = fdopen(dup(fn), PG_BINARY_A); + else + AH->OF = fopen(filename, PG_BINARY_A); + } else - AH->OF = fopen(filename, PG_BINARY_W); + { + if (fn >= 0) + AH->OF = fdopen(dup(fn), PG_BINARY_W); + else + AH->OF = fopen(filename, PG_BINARY_W); + } AH->gzOut = 0; } if (!AH->OF) - die_horribly(AH, modulename, "could not open output file: %s\n", strerror(errno)); + { + if (filename) + die_horribly(AH, modulename, "could not open output file \"%s\": %s\n", + filename, strerror(errno)); + else + die_horribly(AH, modulename, "could not open output file: %s\n", + strerror(errno)); + } return sav; } -void +static void ResetOutput(ArchiveHandle *AH, OutputContext sav) { int res; @@ -1055,13 +1218,13 @@ ahprintf(ArchiveHandle *AH, const char *fmt,...) int cnt = -1; /* - * This is paranoid: deal with the possibility that vsnprintf is - * willing to ignore trailing null + * This is paranoid: deal with the possibility that vsnprintf is willing + * to ignore trailing null */ /* - * or returns > 0 even if string does not fit. It may be the case that - * it returns cnt = bufsize + * or returns > 0 even if string does not fit. It may be the case that it + * returns cnt = bufsize */ while (cnt < 0 || cnt >= (bSize - 1)) { @@ -1096,12 +1259,52 @@ ahlog(ArchiveHandle *AH, int level, const char *fmt,...) /* * Single place for logic which says 'We are restoring to a direct DB connection'. */ -int +static int RestoringToDB(ArchiveHandle *AH) { return (AH->ropt && AH->ropt->useDB && AH->connection); } +/* + * Dump the current contents of the LO data buffer while writing a BLOB + */ +static void +dump_lo_buf(ArchiveHandle *AH) +{ + if (AH->connection) + { + size_t res; + + res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used); + ahlog(AH, 5, ngettext("wrote %lu byte of large object data (result = %lu)\n", + "wrote %lu bytes of large object data (result = %lu)\n", + AH->lo_buf_used), + (unsigned long) AH->lo_buf_used, (unsigned long) res); + if (res != AH->lo_buf_used) + die_horribly(AH, modulename, + "could not write to large object (result: %lu, expected: %lu)\n", + (unsigned long) res, (unsigned long) AH->lo_buf_used); + } + else + { + PQExpBuffer buf = createPQExpBuffer(); + + appendByteaLiteralAHX(buf, + (const unsigned char *) AH->lo_buf, + AH->lo_buf_used, + AH); + + /* Hack: turn off writingBlob so ahwrite doesn't recurse to here */ + AH->writingBlob = 0; + ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data); + AH->writingBlob = 1; + + destroyPQExpBuffer(buf); + } + AH->lo_buf_used = 0; +} + + /* * Write buffer to the output file (usually stdout). This is user for * outputting 'restore' scripts etc. It is even possible for an archive @@ -1115,37 +1318,29 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH) if (AH->writingBlob) { - if (AH->lo_buf_used + size * nmemb > AH->lo_buf_size) - { - /* Split LO buffer */ - size_t remaining = AH->lo_buf_size - AH->lo_buf_used; - size_t slack = nmemb * size - remaining; - - memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining); - res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_size); - ahlog(AH, 5, "wrote %lu bytes of large object data (result = %lu)\n", - (unsigned long) AH->lo_buf_size, (unsigned long) res); - if (res != AH->lo_buf_size) - die_horribly(AH, modulename, - "could not write to large object (result: %lu, expected: %lu)\n", - (unsigned long) res, (unsigned long) AH->lo_buf_size); - memcpy(AH->lo_buf, (char *) ptr + remaining, slack); - AH->lo_buf_used = slack; - } - else + size_t remaining = size * nmemb; + + while (AH->lo_buf_used + remaining > AH->lo_buf_size) { - /* LO Buffer is still large enough, buffer it */ - memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, size * nmemb); - AH->lo_buf_used += size * nmemb; + size_t avail = AH->lo_buf_size - AH->lo_buf_used; + + memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail); + ptr = (const void *) ((const char *) ptr + avail); + remaining -= avail; + AH->lo_buf_used += avail; + dump_lo_buf(AH); } + memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining); + AH->lo_buf_used += remaining; + return size * nmemb; } else if (AH->gzOut) { res = GZWRITE((void *) ptr, size, nmemb, AH->OF); if (res != (nmemb * size)) - die_horribly(AH, modulename, "could not write to compressed archive\n"); + die_horribly(AH, modulename, "could not write to output file: %s\n", strerror(errno)); return res; } else if (AH->CustomOutPtr) @@ -1168,8 +1363,8 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH) { res = fwrite((void *) ptr, size, nmemb, AH->OF); if (res != nmemb) - die_horribly(AH, modulename, "could not write to output file (%lu != %lu)\n", - (unsigned long) res, (unsigned long) nmemb); + die_horribly(AH, modulename, "could not write to output file: %s\n", + strerror(errno)); return res; } } @@ -1180,10 +1375,10 @@ static void _write_msg(const char *modulename, const char *fmt, va_list ap) { if (modulename) - fprintf(stderr, "%s: [%s] ", progname, gettext(modulename)); + fprintf(stderr, "%s: [%s] ", progname, _(modulename)); else fprintf(stderr, "%s: ", progname); - vfprintf(stderr, gettext(fmt), ap); + vfprintf(stderr, _(fmt), ap); } void @@ -1208,8 +1403,6 @@ _die_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt, va_lis write_msg(NULL, "*** aborted because of error\n"); if (AH->connection) PQfinish(AH->connection); - if (AH->blobConnection) - PQfinish(AH->blobConnection); } exit(1); @@ -1268,9 +1461,10 @@ warn_or_die_horribly(ArchiveHandle *AH, } if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE) { - write_msg(modulename, "Error from TOC Entry %d; %u %u %s %s %s\n", AH->currentTE->dumpId, - AH->currentTE->catalogId.tableoid, AH->currentTE->catalogId.oid, - AH->currentTE->desc, AH->currentTE->tag, AH->currentTE->owner); + write_msg(modulename, "Error from TOC entry %d; %u %u %s %s %s\n", + AH->currentTE->dumpId, + AH->currentTE->catalogId.tableoid, AH->currentTE->catalogId.oid, + AH->currentTE->desc, AH->currentTE->tag, AH->currentTE->owner); } AH->lastErrorStage = AH->stage; AH->lastErrorTE = AH->currentTE; @@ -1286,50 +1480,52 @@ warn_or_die_horribly(ArchiveHandle *AH, va_end(ap); } +#ifdef NOT_USED + static void _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te) { + /* Unlink te from list */ te->prev->next = te->next; te->next->prev = te->prev; + /* and insert it after "pos" */ te->prev = pos; te->next = pos->next; - pos->next->prev = te; pos->next = te; } -#ifdef NOT_USED +#endif static void _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te) { + /* Unlink te from list */ te->prev->next = te->next; te->next->prev = te->prev; + /* and insert it before "pos" */ te->prev = pos->prev; te->next = pos; pos->prev->next = te; pos->prev = te; } -#endif static TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id) { TocEntry *te; - te = AH->toc->next; - while (te != AH->toc) + for (te = AH->toc->next; te != AH->toc; te = te->next) { if (te->dumpId == id) return te; - te = te->next; } return NULL; } -int +teReqs TocIDRequired(ArchiveHandle *AH, DumpId id, RestoreOptions *ropt) { TocEntry *te = getTocEntryByDumpId(AH, id); @@ -1337,28 +1533,28 @@ TocIDRequired(ArchiveHandle *AH, DumpId id, RestoreOptions *ropt) if (!te) return 0; - return _tocEntryRequired(te, ropt, false); + return _tocEntryRequired(te, ropt, true); } size_t -WriteOffset(ArchiveHandle *AH, off_t o, int wasSet) +WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet) { int off; /* Save the flag */ (*AH->WriteBytePtr) (AH, wasSet); - /* Write out off_t smallest byte first, prevents endian mismatch */ - for (off = 0; off < sizeof(off_t); off++) + /* Write out pgoff_t smallest byte first, prevents endian mismatch */ + for (off = 0; off < sizeof(pgoff_t); off++) { (*AH->WriteBytePtr) (AH, o & 0xFF); o >>= 8; } - return sizeof(off_t) + 1; + return sizeof(pgoff_t) + 1; } int -ReadOffset(ArchiveHandle *AH, off_t *o) +ReadOffset(ArchiveHandle *AH, pgoff_t * o) { int i; int off; @@ -1378,14 +1574,14 @@ ReadOffset(ArchiveHandle *AH, off_t *o) else if (i == 0) return K_OFFSET_NO_DATA; - /* Cast to off_t because it was written as an int. */ - *o = (off_t) i; + /* Cast to pgoff_t because it was written as an int. */ + *o = (pgoff_t) i; return K_OFFSET_POS_SET; } /* - * Read the flag indicating the state of the data pointer. Check if - * valid and die if not. + * Read the flag indicating the state of the data pointer. Check if valid + * and die if not. * * This used to be handled by a negative or zero pointer, now we use an * extra byte specifically for the state. @@ -1401,7 +1597,7 @@ ReadOffset(ArchiveHandle *AH, off_t *o) break; default: - die_horribly(AH, modulename, "Unexpected data offset flag %d\n", offsetFlg); + die_horribly(AH, modulename, "unexpected data offset flag %d\n", offsetFlg); } /* @@ -1409,8 +1605,8 @@ ReadOffset(ArchiveHandle *AH, off_t *o) */ for (off = 0; off < AH->offSize; off++) { - if (off < sizeof(off_t)) - *o |= ((off_t) ((*AH->ReadBytePtr) (AH))) << (off * 8); + if (off < sizeof(pgoff_t)) + *o |= ((pgoff_t) ((*AH->ReadBytePtr) (AH))) << (off * 8); else { if ((*AH->ReadBytePtr) (AH) != 0) @@ -1427,11 +1623,11 @@ WriteInt(ArchiveHandle *AH, int i) int b; /* - * This is a bit yucky, but I don't want to make the binary format - * very dependent on representation, and not knowing much about it, I - * write out a sign byte. If you change this, don't forget to change - * the file version #, and modify readInt to read the new format AS - * WELL AS the old formats. + * This is a bit yucky, but I don't want to make the binary format very + * dependent on representation, and not knowing much about it, I write out + * a sign byte. If you change this, don't forget to change the file + * version #, and modify readInt to read the new format AS WELL AS the old + * formats. */ /* SIGN byte */ @@ -1502,7 +1698,7 @@ ReadStr(ArchiveHandle *AH) int l; l = ReadInt(AH); - if (l == -1) + if (l < 0) buf = NULL; else { @@ -1510,7 +1706,9 @@ ReadStr(ArchiveHandle *AH) if (!buf) die_horribly(AH, modulename, "out of memory\n"); - (*AH->ReadBufPtr) (AH, (void *) buf, l); + if ((*AH->ReadBufPtr) (AH, (void *) buf, l) != l) + die_horribly(AH, modulename, "unexpected end of file\n"); + buf[l] = '\0'; } @@ -1541,12 +1739,17 @@ _discoverArchiveFormat(ArchiveHandle *AH) { wantClose = 1; fh = fopen(AH->fSpec, PG_BINARY_R); + if (!fh) + die_horribly(AH, modulename, "could not open input file \"%s\": %s\n", + AH->fSpec, strerror(errno)); } else + { fh = stdin; - - if (!fh) - die_horribly(AH, modulename, "could not open input file: %s\n", strerror(errno)); + if (!fh) + die_horribly(AH, modulename, "could not open input file: %s\n", + strerror(errno)); + } cnt = fread(sig, 1, 5, fh); @@ -1565,6 +1768,11 @@ _discoverArchiveFormat(ArchiveHandle *AH) if (strncmp(sig, "PGDMP", 5) == 0) { + /* + * Finish reading (most of) a custom-format header. + * + * NB: this code must agree with ReadHead(). + */ AH->vmaj = fgetc(fh); AH->vmin = fgetc(fh); @@ -1620,23 +1828,18 @@ _discoverArchiveFormat(ArchiveHandle *AH) if (fseeko(fh, 0, SEEK_SET) != 0) { /* - * NOTE: Formats that use the lookahead buffer can unset this in - * their Init routine. + * NOTE: Formats that use the lookahead buffer can unset this in their + * Init routine. */ AH->readHeader = 1; } else AH->lookaheadLen = 0; /* Don't bother since we've reset the file */ -#if 0 - write_msg(modulename, "read %lu bytes into lookahead buffer\n", - (unsigned long) AH->lookaheadLen); -#endif - /* Close the file */ if (wantClose) if (fclose(fh) != 0) - die_horribly(AH, modulename, "could not close the input file after reading header: %s\n", + die_horribly(AH, modulename, "could not close input file: %s\n", strerror(errno)); return AH->format; @@ -1666,10 +1869,23 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, AH->vmin = K_VERS_MINOR; AH->vrev = K_VERS_REV; + /* Make a convenient integer 00 */ + AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0; + + /* initialize for backwards compatible string processing */ + AH->public.encoding = 0; /* PG_SQL_ASCII */ + AH->public.std_strings = false; + + /* sql error handling */ + AH->public.exit_on_error = true; + AH->public.n_errors = 0; + + AH->archiveDumpVersion = PG_VERSION; + AH->createDate = time(NULL); AH->intSize = sizeof(int); - AH->offSize = sizeof(off_t); + AH->offSize = sizeof(pgoff_t); if (FileSpec) { AH->fSpec = strdup(FileSpec); @@ -1684,9 +1900,9 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, else AH->fSpec = NULL; - AH->currUser = strdup(""); /* So it's valid, but we can free() it - * later if necessary */ - AH->currSchema = strdup(""); /* ditto */ + AH->currUser = NULL; /* unknown */ + AH->currSchema = NULL; /* ditto */ + AH->currTablespace = NULL; /* ditto */ AH->currWithOids = -1; /* force SET */ AH->toc = (TocEntry *) calloc(1, sizeof(TocEntry)); @@ -1706,8 +1922,20 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, AH->gzOut = 0; AH->OF = stdout; -#if 0 - write_msg(modulename, "archive format is %d\n", fmt); + /* + * On Windows, we need to use binary mode to read/write non-text archive + * formats. Force stdin/stdout into binary mode if that is what we are + * using. + */ +#ifdef WIN32 + if (fmt != archNull && + (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)) + { + if (mode == archModeWrite) + setmode(fileno(stdout), O_BINARY); + else + setmode(fileno(stdin), O_BINARY); + } #endif if (fmt == archUnknown) @@ -1715,9 +1943,10 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, else AH->format = fmt; + AH->promptPassword = TRI_DEFAULT; + switch (AH->format) { - case archCustom: InitArchiveFmt_Custom(AH); break; @@ -1738,10 +1967,6 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, die_horribly(AH, modulename, "unrecognized file format \"%d\"\n", fmt); } - /* sql error handling */ - AH->public.exit_on_error = true; - AH->public.n_errors = 0; - return AH; } @@ -1749,11 +1974,11 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, void WriteDataChunks(ArchiveHandle *AH) { - TocEntry *te = AH->toc->next; + TocEntry *te; StartDataPtr startPtr; EndDataPtr endPtr; - while (te != AH->toc) + for (te = AH->toc->next; te != AH->toc; te = te->next) { if (te->dataDumper != NULL) { @@ -1775,8 +2000,7 @@ WriteDataChunks(ArchiveHandle *AH) (*startPtr) (AH, te); /* - * printf("Dumper arg for %d is %x\n", te->id, - * te->dataDumperArg); + * printf("Dumper arg for %d is %x\n", te->id, te->dataDumperArg); */ /* @@ -1789,7 +2013,6 @@ WriteDataChunks(ArchiveHandle *AH) (*endPtr) (AH, te); AH->currToc = NULL; } - te = te->next; } } @@ -1817,10 +2040,12 @@ WriteToc(ArchiveHandle *AH) WriteStr(AH, te->tag); WriteStr(AH, te->desc); + WriteInt(AH, te->section); WriteStr(AH, te->defn); WriteStr(AH, te->dropStmt); WriteStr(AH, te->copyStmt); WriteStr(AH, te->namespace); + WriteStr(AH, te->tablespace); WriteStr(AH, te->owner); WriteStr(AH, te->withOids ? "true" : "false"); @@ -1845,8 +2070,7 @@ ReadToc(ArchiveHandle *AH) DumpId *deps; int depIdx; int depSize; - - TocEntry *te = AH->toc->next; + TocEntry *te; AH->tocCount = ReadInt(AH); AH->maxDumpId = 0; @@ -1862,7 +2086,7 @@ ReadToc(ArchiveHandle *AH) /* Sanity check */ if (te->dumpId <= 0) die_horribly(AH, modulename, - "entry ID %d out of range -- perhaps a corrupt TOC\n", + "entry ID %d out of range -- perhaps a corrupt TOC\n", te->dumpId); te->hadDumper = ReadInt(AH); @@ -1881,6 +2105,37 @@ ReadToc(ArchiveHandle *AH) te->tag = ReadStr(AH); te->desc = ReadStr(AH); + + if (AH->version >= K_VERS_1_11) + { + te->section = ReadInt(AH); + } + else + { + /* + * Rules for pre-8.4 archives wherein pg_dump hasn't classified + * the entries into sections. This list need not cover entry + * types added later than 8.4. + */ + if (strcmp(te->desc, "COMMENT") == 0 || + strcmp(te->desc, "ACL") == 0 || + strcmp(te->desc, "ACL LANGUAGE") == 0) + te->section = SECTION_NONE; + else if (strcmp(te->desc, "TABLE DATA") == 0 || + strcmp(te->desc, "BLOBS") == 0 || + strcmp(te->desc, "BLOB COMMENTS") == 0) + te->section = SECTION_DATA; + else if (strcmp(te->desc, "CONSTRAINT") == 0 || + strcmp(te->desc, "CHECK CONSTRAINT") == 0 || + strcmp(te->desc, "FK CONSTRAINT") == 0 || + strcmp(te->desc, "INDEX") == 0 || + strcmp(te->desc, "RULE") == 0 || + strcmp(te->desc, "TRIGGER") == 0) + te->section = SECTION_POST_DATA; + else + te->section = SECTION_PRE_DATA; + } + te->defn = ReadStr(AH); te->dropStmt = ReadStr(AH); @@ -1890,6 +2145,9 @@ ReadToc(ArchiveHandle *AH) if (AH->version >= K_VERS_1_6) te->namespace = ReadStr(AH); + if (AH->version >= K_VERS_1_10) + te->tablespace = ReadStr(AH); + te->owner = ReadStr(AH); if (AH->version >= K_VERS_1_9) { @@ -1947,33 +2205,100 @@ ReadToc(ArchiveHandle *AH) ahlog(AH, 3, "read TOC entry %d (ID %d) for %s %s\n", i, te->dumpId, te->desc, te->tag); + /* link completed entry into TOC circular list */ te->prev = AH->toc->prev; AH->toc->prev->next = te; AH->toc->prev = te; te->next = AH->toc; + + /* special processing immediately upon read for some items */ + if (strcmp(te->desc, "ENCODING") == 0) + processEncodingEntry(AH, te); + else if (strcmp(te->desc, "STDSTRINGS") == 0) + processStdStringsEntry(AH, te); + } +} + +static void +processEncodingEntry(ArchiveHandle *AH, TocEntry *te) +{ + /* te->defn should have the form SET client_encoding = 'foo'; */ + char *defn = strdup(te->defn); + char *ptr1; + char *ptr2 = NULL; + int encoding; + + ptr1 = strchr(defn, '\''); + if (ptr1) + ptr2 = strchr(++ptr1, '\''); + if (ptr2) + { + *ptr2 = '\0'; + encoding = pg_char_to_encoding(ptr1); + if (encoding < 0) + die_horribly(AH, modulename, "unrecognized encoding \"%s\"\n", + ptr1); + AH->public.encoding = encoding; } + else + die_horribly(AH, modulename, "invalid ENCODING item: %s\n", + te->defn); + + free(defn); +} + +static void +processStdStringsEntry(ArchiveHandle *AH, TocEntry *te) +{ + /* te->defn should have the form SET standard_conforming_strings = 'x'; */ + char *ptr1; + + ptr1 = strchr(te->defn, '\''); + if (ptr1 && strncmp(ptr1, "'on'", 4) == 0) + AH->public.std_strings = true; + else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0) + AH->public.std_strings = false; + else + die_horribly(AH, modulename, "invalid STDSTRINGS item: %s\n", + te->defn); } static teReqs -_tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool acl_pass) +_tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls) { - teReqs res = 3; /* Schema = 1, Data = 2, Both = 3 */ + teReqs res = REQ_ALL; - /* ENCODING objects are dumped specially, so always reject here */ - if (strcmp(te->desc, "ENCODING") == 0) + /* ENCODING and STDSTRINGS items are dumped specially, so always reject */ + if (strcmp(te->desc, "ENCODING") == 0 || + strcmp(te->desc, "STDSTRINGS") == 0) return 0; /* If it's an ACL, maybe ignore it */ - if ((!acl_pass || ropt->aclsSkip) && strcmp(te->desc, "ACL") == 0) + if ((!include_acls || ropt->aclsSkip) && _tocEntryIsACL(te)) + return 0; + + /* If it's security labels, maybe ignore it */ + if (ropt->skip_seclabel && strcmp(te->desc, "SECURITY LABEL") == 0) return 0; - if (!ropt->create && strcmp(te->desc, "DATABASE") == 0) + /* Ignore DATABASE entry unless we should create it */ + if (!ropt->createDB && strcmp(te->desc, "DATABASE") == 0) return 0; - /* Check if tablename only is wanted */ + /* Check options for selective dump/restore */ + if (ropt->schemaNames) + { + /* If no namespace is specified, it means all. */ + if (!te->namespace) + return 0; + if (strcmp(ropt->schemaNames, te->namespace) != 0) + return 0; + } + if (ropt->selTypes) { - if ((strcmp(te->desc, "TABLE") == 0) || (strcmp(te->desc, "TABLE DATA") == 0)) + if (strcmp(te->desc, "TABLE") == 0 || + strcmp(te->desc, "TABLE DATA") == 0) { if (!ropt->selTable) return 0; @@ -2006,27 +2331,36 @@ _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool acl_pass) } /* - * Check if we had a dataDumper. Indicates if the entry is schema or - * data + * Check if we had a dataDumper. Indicates if the entry is schema or data */ if (!te->hadDumper) { /* - * Special Case: If 'SEQUENCE SET' then it is considered a data - * entry + * Special Case: If 'SEQUENCE SET' or anything to do with BLOBs, then + * it is considered a data entry. We don't need to check for the + * BLOBS entry or old-style BLOB COMMENTS, because they will have + * hadDumper = true ... but we do need to check new-style BLOB + * comments. */ - if (strcmp(te->desc, "SEQUENCE SET") == 0) + if (strcmp(te->desc, "SEQUENCE SET") == 0 || + strcmp(te->desc, "BLOB") == 0 || + (strcmp(te->desc, "ACL") == 0 && + strncmp(te->tag, "LARGE OBJECT ", 13) == 0) || + (strcmp(te->desc, "COMMENT") == 0 && + strncmp(te->tag, "LARGE OBJECT ", 13) == 0) || + (strcmp(te->desc, "SECURITY LABEL") == 0 && + strncmp(te->tag, "LARGE OBJECT ", 13) == 0)) res = res & REQ_DATA; else res = res & ~REQ_DATA; } /* - * Special case: type with tag; this is part of a - * DATA restore even though it has SQL. + * Special case: type with tag; this is obsolete and we + * always ignore it. */ if ((strcmp(te->desc, "") == 0) && (strcmp(te->tag, "Max OID") == 0)) - res = REQ_DATA; + return 0; /* Mask it if we only want schema */ if (ropt->schemaOnly) @@ -2040,13 +2374,27 @@ _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool acl_pass) if (!te->defn || strlen(te->defn) == 0) res = res & ~REQ_SCHEMA; - /* Finally, if we used a list, limit based on that as well */ - if (ropt->limitToList && !ropt->idWanted[te->dumpId - 1]) + /* Finally, if there's a per-ID filter, limit based on that as well */ + if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1]) return 0; return res; } +/* + * Identify TOC entries that are ACLs. + */ +static bool +_tocEntryIsACL(TocEntry *te) +{ + /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */ + if (strcmp(te->desc, "ACL") == 0 || + strcmp(te->desc, "ACL LANGUAGE") == 0 || + strcmp(te->desc, "DEFAULT ACL") == 0) + return true; + return false; +} + /* * Issue SET commands for parameters that we want to have set the same way * at all times during execution of a restore script. @@ -2054,25 +2402,28 @@ _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool acl_pass) static void _doSetFixedOutputState(ArchiveHandle *AH) { - TocEntry *te; + /* Disable statement_timeout in archive for pg_restore/psql */ + ahprintf(AH, "SET statement_timeout = 0;\n"); - /* If we have an encoding setting, emit that */ - te = AH->toc->next; - while (te != AH->toc) - { - if (strcmp(te->desc, "ENCODING") == 0) - { - ahprintf(AH, "%s", te->defn); - break; - } - te = te->next; - } + /* Select the correct character set encoding */ + ahprintf(AH, "SET client_encoding = '%s';\n", + pg_encoding_to_char(AH->public.encoding)); + + /* Select the correct string literal syntax */ + ahprintf(AH, "SET standard_conforming_strings = %s;\n", + AH->public.std_strings ? "on" : "off"); + + /* Select the role to be used during restore */ + if (AH->ropt && AH->ropt->use_role) + ahprintf(AH, "SET ROLE %s;\n", fmtId(AH->ropt->use_role)); /* Make sure function checking is disabled */ ahprintf(AH, "SET check_function_bodies = false;\n"); /* Avoid annoying notices etc */ ahprintf(AH, "SET client_min_messages = warning;\n"); + if (!AH->public.std_strings) + ahprintf(AH, "SET escape_string_warning = off;\n"); ahprintf(AH, "\n"); } @@ -2093,7 +2444,7 @@ _doSetSessionAuth(ArchiveHandle *AH, const char *user) * SQL requires a string literal here. Might as well be correct. */ if (user && *user) - appendStringLiteral(cmd, user, false); + appendStringLiteralAHX(cmd, user, AH); else appendPQExpBuffer(cmd, "DEFAULT"); appendPQExpBuffer(cmd, ";"); @@ -2151,45 +2502,44 @@ _doSetWithOids(ArchiveHandle *AH, const bool withOids) /* - * Issue the commands to connect to the specified database - * as the specified user. + * Issue the commands to connect to the specified database. * * If we're currently restoring right into a database, this will * actually establish a connection. Otherwise it puts a \connect into * the script output. + * + * NULL dbname implies reconnecting to the current DB (pretty useless). */ static void -_reconnectToDB(ArchiveHandle *AH, const char *dbname, const char *user) +_reconnectToDB(ArchiveHandle *AH, const char *dbname) { if (RestoringToDB(AH)) - ReconnectToServer(AH, dbname, user); + ReconnectToServer(AH, dbname, NULL); else { PQExpBuffer qry = createPQExpBuffer(); - appendPQExpBuffer(qry, "\\connect %s", + appendPQExpBuffer(qry, "\\connect %s\n\n", dbname ? fmtId(dbname) : "-"); - appendPQExpBuffer(qry, " %s\n\n", - fmtId(user)); - - ahprintf(AH, qry->data); - + ahprintf(AH, "%s", qry->data); destroyPQExpBuffer(qry); } /* - * NOTE: currUser keeps track of what the imaginary session user in - * our script is + * NOTE: currUser keeps track of what the imaginary session user in our + * script is. It's now effectively reset to the original userID. */ if (AH->currUser) free(AH->currUser); + AH->currUser = NULL; - AH->currUser = strdup(user); - - /* don't assume we still know the output schema */ + /* don't assume we still know the output schema, tablespace, etc either */ if (AH->currSchema) free(AH->currSchema); - AH->currSchema = strdup(""); + AH->currSchema = NULL; + if (AH->currTablespace) + free(AH->currTablespace); + AH->currTablespace = NULL; AH->currWithOids = -1; /* re-establish fixed state */ @@ -2213,17 +2563,16 @@ _becomeUser(ArchiveHandle *AH, const char *user) _doSetSessionAuth(AH, user); /* - * NOTE: currUser keeps track of what the imaginary session user in - * our script is + * NOTE: currUser keeps track of what the imaginary session user in our + * script is */ if (AH->currUser) free(AH->currUser); - AH->currUser = strdup(user); } /* - * Become the owner of the the given TOC entry object. If + * Become the owner of the given TOC entry object. If * changes in ownership are not allowed, this doesn't do anything. */ static void @@ -2260,7 +2609,7 @@ _selectOutputSchema(ArchiveHandle *AH, const char *schemaName) PQExpBuffer qry; if (!schemaName || *schemaName == '\0' || - strcmp(AH->currSchema, schemaName) == 0) + (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0)) return; /* no need to do anything */ qry = createPQExpBuffer(); @@ -2278,8 +2627,8 @@ _selectOutputSchema(ArchiveHandle *AH, const char *schemaName) if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) warn_or_die_horribly(AH, modulename, - "could not set search_path to \"%s\": %s", - schemaName, PQerrorMessage(AH->connection)); + "could not set search_path to \"%s\": %s", + schemaName, PQerrorMessage(AH->connection)); PQclear(res); } @@ -2293,65 +2642,162 @@ _selectOutputSchema(ArchiveHandle *AH, const char *schemaName) destroyPQExpBuffer(qry); } - -/** - * Parses the dropStmt part of a TOC entry and returns - * a newly allocated string that is the object identifier - * The caller must free the result. +/* + * Issue the commands to select the specified tablespace as the current one + * in the target database. */ -static char * -_getObjectFromDropStmt(const char *dropStmt, const char *type) +static void +_selectTablespace(ArchiveHandle *AH, const char *tablespace) { - /* Chop "DROP" off the front and make a copy */ - char *first = strdup(dropStmt + 5); - char *last = first + strlen(first) - 1; /* Points to the last - * real char in extract */ - char *buf = NULL; + PQExpBuffer qry; + const char *want, + *have; - /* - * Loop from the end of the string until last char is no longer '\n' - * or ';' - */ - while (last >= first && (*last == '\n' || *last == ';')) - last--; + /* do nothing in --no-tablespaces mode */ + if (AH->ropt->noTablespace) + return; - /* Insert end of string one place after last */ - *(last + 1) = '\0'; + have = AH->currTablespace; + want = tablespace; - /* - * Take off CASCADE if necessary. Only TYPEs seem to have this, but - * may as well check for all - */ - if ((last - first) >= 8) + /* no need to do anything for non-tablespace object */ + if (!want) + return; + + if (have && strcmp(want, have) == 0) + return; /* no need to do anything */ + + qry = createPQExpBuffer(); + + if (strcmp(want, "") == 0) + { + /* We want the tablespace to be the database's default */ + appendPQExpBuffer(qry, "SET default_tablespace = ''"); + } + else + { + /* We want an explicit tablespace */ + appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want)); + } + + if (RestoringToDB(AH)) { - if (strcmp(last - 7, " CASCADE") == 0) - last -= 8; + PGresult *res; + + res = PQexec(AH->connection, qry->data); + + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) + warn_or_die_horribly(AH, modulename, + "could not set default_tablespace to %s: %s", + fmtId(want), PQerrorMessage(AH->connection)); + + PQclear(res); } + else + ahprintf(AH, "%s;\n\n", qry->data); - /* Insert end of string one place after last */ - *(last + 1) = '\0'; + if (AH->currTablespace) + free(AH->currTablespace); + AH->currTablespace = strdup(want); - /* Special case VIEWs and SEQUENCEs. They must use ALTER TABLE. */ - if (strcmp(type, "VIEW") == 0 && (last - first) >= 5) + destroyPQExpBuffer(qry); +} + +/* + * Extract an object description for a TOC entry, and append it to buf. + * + * This is not quite as general as it may seem, since it really only + * handles constructing the right thing to put into ALTER ... OWNER TO. + * + * The whole thing is pretty grotty, but we are kind of stuck since the + * information used is all that's available in older dump files. + */ +static void +_getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH) +{ + const char *type = te->desc; + + /* Use ALTER TABLE for views and sequences */ + if (strcmp(type, "VIEW") == 0 || strcmp(type, "SEQUENCE") == 0) + type = "TABLE"; + + /* objects named by a schema and name */ + if (strcmp(type, "CONVERSION") == 0 || + strcmp(type, "DOMAIN") == 0 || + strcmp(type, "TABLE") == 0 || + strcmp(type, "TYPE") == 0 || + strcmp(type, "FOREIGN TABLE") == 0 || + strcmp(type, "TEXT SEARCH DICTIONARY") == 0 || + strcmp(type, "TEXT SEARCH CONFIGURATION") == 0) { - int len = 6 + strlen(first + 5) + 1; + appendPQExpBuffer(buf, "%s ", type); + if (te->namespace && te->namespace[0]) /* is null pre-7.3 */ + appendPQExpBuffer(buf, "%s.", fmtId(te->namespace)); - buf = malloc(len); - snprintf(buf, len, "TABLE %s", first + 5); - free(first); + /* + * Pre-7.3 pg_dump would sometimes (not always) put a fmtId'd name + * into te->tag for an index. This check is heuristic, so make its + * scope as narrow as possible. + */ + if (AH->version < K_VERS_1_7 && + te->tag[0] == '"' && + te->tag[strlen(te->tag) - 1] == '"' && + strcmp(type, "INDEX") == 0) + appendPQExpBuffer(buf, "%s", te->tag); + else + appendPQExpBuffer(buf, "%s", fmtId(te->tag)); + return; + } + + /* objects named by just a name */ + if (strcmp(type, "DATABASE") == 0 || + strcmp(type, "PROCEDURAL LANGUAGE") == 0 || + strcmp(type, "SCHEMA") == 0 || + strcmp(type, "FOREIGN DATA WRAPPER") == 0 || + strcmp(type, "SERVER") == 0 || + strcmp(type, "USER MAPPING") == 0) + { + appendPQExpBuffer(buf, "%s %s", type, fmtId(te->tag)); + return; + } + + /* BLOBs just have a name, but it's numeric so must not use fmtId */ + if (strcmp(type, "BLOB") == 0) + { + appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag); + return; } - else if (strcmp(type, "SEQUENCE") == 0 && (last - first) >= 9) + + /* + * These object types require additional decoration. Fortunately, the + * information needed is exactly what's in the DROP command. + */ + if (strcmp(type, "AGGREGATE") == 0 || + strcmp(type, "FUNCTION") == 0 || + strcmp(type, "OPERATOR") == 0 || + strcmp(type, "OPERATOR CLASS") == 0 || + strcmp(type, "OPERATOR FAMILY") == 0) { - int len = 6 + strlen(first + 9) + 1; + /* Chop "DROP " off the front and make a modifiable copy */ + char *first = strdup(te->dropStmt + 5); + char *last; + + /* point to last character in string */ + last = first + strlen(first) - 1; + + /* Strip off any ';' or '\n' at the end */ + while (last >= first && (*last == '\n' || *last == ';')) + last--; + *(last + 1) = '\0'; + + appendPQExpBufferStr(buf, first); - buf = malloc(len); - snprintf(buf, len, "TABLE %s", first + 9); free(first); + return; } - else - buf = first; - return buf; + write_msg(modulename, "WARNING: don't know how to set owner for object type %s\n", + type); } static void @@ -2360,27 +2806,35 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat /* ACLs are dumped only during acl pass */ if (acl_pass) { - if (strcmp(te->desc, "ACL") != 0) + if (!_tocEntryIsACL(te)) return; } else { - if (strcmp(te->desc, "ACL") == 0) + if (_tocEntryIsACL(te)) return; } /* * Avoid dumping the public schema, as it will already be created ... - * unless we are using --clean mode, in which case it's been deleted - * and we'd better recreate it. + * unless we are using --clean mode, in which case it's been deleted and + * we'd better recreate it. Likewise for its comment, if any. */ - if (!ropt->dropSchema && - strcmp(te->desc, "SCHEMA") == 0 && strcmp(te->tag, "public") == 0) - return; + if (!ropt->dropSchema) + { + if (strcmp(te->desc, "SCHEMA") == 0 && + strcmp(te->tag, "public") == 0) + return; + /* The comment restore would require super-user privs, so avoid it. */ + if (strcmp(te->desc, "COMMENT") == 0 && + strcmp(te->tag, "SCHEMA public") == 0) + return; + } - /* Select owner and schema as necessary */ + /* Select owner, schema, and tablespace as necessary */ _becomeOwner(AH, te); _selectOutputSchema(AH, te->namespace); + _selectTablespace(AH, te->tablespace); /* Set up OID mode too */ if (strcmp(te->desc, "TABLE") == 0) @@ -2411,11 +2865,15 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat ahprintf(AH, "\n"); } } - ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s\n", + ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s", pfx, te->tag, te->desc, te->namespace ? te->namespace : "-", - te->owner); - if (AH->PrintExtraTocPtr != NULL) + ropt->noOwner ? "-" : te->owner); + if (te->tablespace && !ropt->noTablespace) + ahprintf(AH, "; Tablespace: %s", te->tablespace); + ahprintf(AH, "\n"); + + if (AH->PrintExtraTocPtr !=NULL) (*AH->PrintExtraTocPtr) (AH, te); ahprintf(AH, "--\n\n"); } @@ -2423,13 +2881,13 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat /* * Actually print the definition. * - * Really crude hack for suppressing AUTHORIZATION clause of CREATE - * SCHEMA when --no-owner mode is selected. This is ugly, but I see - * no other good way ... + * Really crude hack for suppressing AUTHORIZATION clause that old pg_dump + * versions put into CREATE SCHEMA. We have to do this when --no-owner + * mode is selected. This is ugly, but I see no other good way ... */ - if (AH->ropt && AH->ropt->noOwner && strcmp(te->desc, "SCHEMA") == 0) + if (ropt->noOwner && strcmp(te->desc, "SCHEMA") == 0) { - ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", te->tag); + ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag)); } else { @@ -2439,36 +2897,66 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat /* * If we aren't using SET SESSION AUTH to determine ownership, we must - * instead issue an ALTER OWNER command. Ugly, since we have to cons - * one up based on the dropStmt. We don't need this for schemas - * (since we use CREATE SCHEMA AUTHORIZATION instead), nor for some - * other object types. + * instead issue an ALTER OWNER command. We assume that anything without + * a DROP command is not a separately ownable object. All the categories + * with DROP commands must appear in one list or the other. */ if (!ropt->noOwner && !ropt->use_setsessauth && - strlen(te->owner) > 0 && strlen(te->dropStmt) > 0 && - (strcmp(te->desc, "AGGREGATE") == 0 || - strcmp(te->desc, "CONVERSION") == 0 || - strcmp(te->desc, "DOMAIN") == 0 || - strcmp(te->desc, "FUNCTION") == 0 || - strcmp(te->desc, "OPERATOR") == 0 || - strcmp(te->desc, "OPERATOR CLASS") == 0 || - strcmp(te->desc, "TABLE") == 0 || - strcmp(te->desc, "TYPE") == 0 || - strcmp(te->desc, "VIEW") == 0 || - strcmp(te->desc, "SEQUENCE") == 0)) + strlen(te->owner) > 0 && strlen(te->dropStmt) > 0) { - char *temp = _getObjectFromDropStmt(te->dropStmt, te->desc); + if (strcmp(te->desc, "AGGREGATE") == 0 || + strcmp(te->desc, "BLOB") == 0 || + strcmp(te->desc, "CONVERSION") == 0 || + strcmp(te->desc, "DATABASE") == 0 || + strcmp(te->desc, "DOMAIN") == 0 || + strcmp(te->desc, "FUNCTION") == 0 || + strcmp(te->desc, "OPERATOR") == 0 || + strcmp(te->desc, "OPERATOR CLASS") == 0 || + strcmp(te->desc, "OPERATOR FAMILY") == 0 || + strcmp(te->desc, "PROCEDURAL LANGUAGE") == 0 || + strcmp(te->desc, "SCHEMA") == 0 || + strcmp(te->desc, "TABLE") == 0 || + strcmp(te->desc, "TYPE") == 0 || + strcmp(te->desc, "VIEW") == 0 || + strcmp(te->desc, "SEQUENCE") == 0 || + strcmp(te->desc, "FOREIGN TABLE") == 0 || + strcmp(te->desc, "TEXT SEARCH DICTIONARY") == 0 || + strcmp(te->desc, "TEXT SEARCH CONFIGURATION") == 0 || + strcmp(te->desc, "FOREIGN DATA WRAPPER") == 0 || + strcmp(te->desc, "SERVER") == 0) + { + PQExpBuffer temp = createPQExpBuffer(); - ahprintf(AH, "ALTER %s OWNER TO %s;\n\n", temp, fmtId(te->owner)); - free(temp); + appendPQExpBuffer(temp, "ALTER "); + _getObjectDescription(temp, te, AH); + appendPQExpBuffer(temp, " OWNER TO %s;", fmtId(te->owner)); + ahprintf(AH, "%s\n\n", temp->data); + destroyPQExpBuffer(temp); + } + else if (strcmp(te->desc, "CAST") == 0 || + strcmp(te->desc, "CHECK CONSTRAINT") == 0 || + strcmp(te->desc, "CONSTRAINT") == 0 || + strcmp(te->desc, "DEFAULT") == 0 || + strcmp(te->desc, "FK CONSTRAINT") == 0 || + strcmp(te->desc, "INDEX") == 0 || + strcmp(te->desc, "RULE") == 0 || + strcmp(te->desc, "TRIGGER") == 0 || + strcmp(te->desc, "USER MAPPING") == 0) + { + /* these object types don't have separate owners */ + } + else + { + write_msg(modulename, "WARNING: don't know how to set owner for object type %s\n", + te->desc); + } } /* * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION - * commands, so we can no longer assume we know the current auth - * setting. + * commands, so we can no longer assume we know the current auth setting. */ - if (strncmp(te->desc, "ACL", 3) == 0) + if (acl_pass) { if (AH->currUser) free(AH->currUser); @@ -2508,6 +2996,8 @@ WriteHead(ArchiveHandle *AH) WriteInt(AH, crtm.tm_year); WriteInt(AH, crtm.tm_isdst); WriteStr(AH, PQdb(AH->connection)); + WriteStr(AH, AH->public.remoteVersionStr); + WriteStr(AH, PG_VERSION); } void @@ -2517,11 +3007,16 @@ ReadHead(ArchiveHandle *AH) int fmt; struct tm crtm; - /* If we haven't already read the header... */ + /* + * If we haven't already read the header, do so. + * + * NB: this code must agree with _discoverArchiveFormat(). Maybe find a + * way to unify the cases? + */ if (!AH->readHeader) { - - (*AH->ReadBufPtr) (AH, tmpMag, 5); + if ((*AH->ReadBufPtr) (AH, tmpMag, 5) != 5) + die_horribly(AH, modulename, "unexpected end of file\n"); if (strncmp(tmpMag, "PGDMP", 5) != 0) die_horribly(AH, modulename, "did not find magic string in file header\n"); @@ -2536,7 +3031,6 @@ ReadHead(ArchiveHandle *AH) AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0; - if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX) die_horribly(AH, modulename, "unsupported version (%d.%d) in file header\n", AH->vmaj, AH->vmin); @@ -2547,7 +3041,7 @@ ReadHead(ArchiveHandle *AH) (unsigned long) AH->intSize); if (AH->intSize > sizeof(int)) - write_msg(modulename, "WARNING: archive was made on a machine with larger integers, some operations may fail\n"); + write_msg(modulename, "WARNING: archive was made on a machine with larger integers, some operations might fail\n"); if (AH->version >= K_VERS_1_7) AH->offSize = (*AH->ReadBytePtr) (AH); @@ -2594,31 +3088,1057 @@ ReadHead(ArchiveHandle *AH) write_msg(modulename, "WARNING: invalid creation date in header\n"); } + if (AH->version >= K_VERS_1_10) + { + AH->archiveRemoteVersion = ReadStr(AH); + AH->archiveDumpVersion = ReadStr(AH); + } } /* * checkSeek - * check to see if fseek can be performed. + * check to see if ftell/fseek can be performed. */ - bool checkSeek(FILE *fp) { + pgoff_t tpos; - if (fseeko(fp, 0, SEEK_CUR) != 0) + /* + * If pgoff_t is wider than long, we must have "real" fseeko and not an + * emulation using fseek. Otherwise report no seek capability. + */ +#ifndef HAVE_FSEEKO + if (sizeof(pgoff_t) > sizeof(long)) return false; - else if (sizeof(off_t) > sizeof(long)) +#endif - /* - * At this point, off_t is too large for long, so we return based - * on whether an off_t version of fseek is available. - */ -#ifdef HAVE_FSEEKO - return true; -#else + /* Check that ftello works on this file */ + errno = 0; + tpos = ftello(fp); + if (errno) return false; -#endif - else - return true; + + /* + * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test + * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a + * successful no-op even on files that are otherwise unseekable. + */ + if (fseeko(fp, tpos, SEEK_SET) != 0) + return false; + + return true; +} + + +/* + * dumpTimestamp + */ +static void +dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim) +{ + char buf[256]; + + /* + * We don't print the timezone on Win32, because the names are long and + * localized, which means they may contain characters in various random + * encodings; this has been seen to cause encoding errors when reading the + * dump script. + */ + if (strftime(buf, sizeof(buf), +#ifndef WIN32 + "%Y-%m-%d %H:%M:%S %Z", +#else + "%Y-%m-%d %H:%M:%S", +#endif + localtime(&tim)) != 0) + ahprintf(AH, "-- %s %s\n\n", msg, buf); +} + + +/* + * Main engine for parallel restore. + * + * Work is done in three phases. + * First we process tocEntries until we come to one that is marked + * SECTION_DATA or SECTION_POST_DATA, in a single connection, just as for a + * standard restore. Second we process the remaining non-ACL steps in + * parallel worker children (threads on Windows, processes on Unix), each of + * which connects separately to the database. Finally we process all the ACL + * entries in a single connection (that happens back in RestoreArchive). + */ +static void +restore_toc_entries_parallel(ArchiveHandle *AH) +{ + RestoreOptions *ropt = AH->ropt; + int n_slots = ropt->number_of_jobs; + ParallelSlot *slots; + int work_status; + int next_slot; + TocEntry pending_list; + TocEntry ready_list; + TocEntry *next_work_item; + thandle ret_child; + TocEntry *te; + + ahlog(AH, 2, "entering restore_toc_entries_parallel\n"); + + /* we haven't got round to making this work for all archive formats */ + if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL) + die_horribly(AH, modulename, "parallel restore is not supported with this archive file format\n"); + + /* doesn't work if the archive represents dependencies as OIDs, either */ + if (AH->version < K_VERS_1_8) + die_horribly(AH, modulename, "parallel restore is not supported with archives made by pre-8.0 pg_dump\n"); + + slots = (ParallelSlot *) calloc(sizeof(ParallelSlot), n_slots); + + /* Adjust dependency information */ + fix_dependencies(AH); + + /* + * Do all the early stuff in a single connection in the parent. There's no + * great point in running it in parallel, in fact it will actually run + * faster in a single connection because we avoid all the connection and + * setup overhead. Also, pg_dump is not currently very good about + * showing all the dependencies of SECTION_PRE_DATA items, so we do not + * risk trying to process them out-of-order. + */ + for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next) + { + /* Non-PRE_DATA items are just ignored for now */ + if (next_work_item->section == SECTION_DATA || + next_work_item->section == SECTION_POST_DATA) + continue; + + ahlog(AH, 1, "processing item %d %s %s\n", + next_work_item->dumpId, + next_work_item->desc, next_work_item->tag); + + (void) restore_toc_entry(AH, next_work_item, ropt, false); + + /* there should be no touch of ready_list here, so pass NULL */ + reduce_dependencies(AH, next_work_item, NULL); + } + + /* + * Now close parent connection in prep for parallel steps. We do this + * mainly to ensure that we don't exceed the specified number of parallel + * connections. + */ + PQfinish(AH->connection); + AH->connection = NULL; + + /* blow away any transient state from the old connection */ + if (AH->currUser) + free(AH->currUser); + AH->currUser = NULL; + if (AH->currSchema) + free(AH->currSchema); + AH->currSchema = NULL; + if (AH->currTablespace) + free(AH->currTablespace); + AH->currTablespace = NULL; + AH->currWithOids = -1; + + /* + * Initialize the lists of pending and ready items. After this setup, the + * pending list is everything that needs to be done but is blocked by one + * or more dependencies, while the ready list contains items that have no + * remaining dependencies. Note: we don't yet filter out entries that + * aren't going to be restored. They might participate in dependency + * chains connecting entries that should be restored, so we treat them as + * live until we actually process them. + */ + par_list_header_init(&pending_list); + par_list_header_init(&ready_list); + for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next) + { + /* All PRE_DATA items were dealt with above */ + if (next_work_item->section == SECTION_DATA || + next_work_item->section == SECTION_POST_DATA) + { + if (next_work_item->depCount > 0) + par_list_append(&pending_list, next_work_item); + else + par_list_append(&ready_list, next_work_item); + } + } + + /* + * main parent loop + * + * Keep going until there is no worker still running AND there is no work + * left to be done. + */ + + ahlog(AH, 1, "entering main parallel loop\n"); + + while ((next_work_item = get_next_work_item(AH, &ready_list, + slots, n_slots)) != NULL || + work_in_progress(slots, n_slots)) + { + if (next_work_item != NULL) + { + teReqs reqs; + + /* If not to be dumped, don't waste time launching a worker */ + reqs = _tocEntryRequired(next_work_item, AH->ropt, false); + if ((reqs & (REQ_SCHEMA | REQ_DATA)) == 0) + { + ahlog(AH, 1, "skipping item %d %s %s\n", + next_work_item->dumpId, + next_work_item->desc, next_work_item->tag); + + par_list_remove(next_work_item); + reduce_dependencies(AH, next_work_item, &ready_list); + + continue; + } + + if ((next_slot = get_next_slot(slots, n_slots)) != NO_SLOT) + { + /* There is work still to do and a worker slot available */ + thandle child; + RestoreArgs *args; + + ahlog(AH, 1, "launching item %d %s %s\n", + next_work_item->dumpId, + next_work_item->desc, next_work_item->tag); + + par_list_remove(next_work_item); + + /* this memory is dealloced in mark_work_done() */ + args = malloc(sizeof(RestoreArgs)); + args->AH = CloneArchive(AH); + args->te = next_work_item; + + /* run the step in a worker child */ + child = spawn_restore(args); + + slots[next_slot].child_id = child; + slots[next_slot].args = args; + + continue; + } + } + + /* + * If we get here there must be work being done. Either there is no + * work available to schedule (and work_in_progress returned true) or + * there are no slots available. So we wait for a worker to finish, + * and process the result. + */ + ret_child = reap_child(slots, n_slots, &work_status); + + if (WIFEXITED(work_status)) + { + mark_work_done(AH, &ready_list, + ret_child, WEXITSTATUS(work_status), + slots, n_slots); + } + else + { + die_horribly(AH, modulename, "worker process crashed: status %d\n", + work_status); + } + } + + ahlog(AH, 1, "finished main parallel loop\n"); + + /* + * Now reconnect the single parent connection. + */ + ConnectDatabase((Archive *) AH, ropt->dbname, + ropt->pghost, ropt->pgport, ropt->username, + ropt->promptPassword); + + _doSetFixedOutputState(AH); + + /* + * Make sure there is no non-ACL work left due to, say, circular + * dependencies, or some other pathological condition. If so, do it in the + * single parent connection. + */ + for (te = pending_list.par_next; te != &pending_list; te = te->par_next) + { + ahlog(AH, 1, "processing missed item %d %s %s\n", + te->dumpId, te->desc, te->tag); + (void) restore_toc_entry(AH, te, ropt, false); + } + + /* The ACLs will be handled back in RestoreArchive. */ +} + +/* + * create a worker child to perform a restore step in parallel + */ +static thandle +spawn_restore(RestoreArgs *args) +{ + thandle child; + + /* Ensure stdio state is quiesced before forking */ + fflush(NULL); + +#ifndef WIN32 + child = fork(); + if (child == 0) + { + /* in child process */ + parallel_restore(args); + die_horribly(args->AH, modulename, + "parallel_restore should not return\n"); + } + else if (child < 0) + { + /* fork failed */ + die_horribly(args->AH, modulename, + "could not create worker process: %s\n", + strerror(errno)); + } +#else + child = (HANDLE) _beginthreadex(NULL, 0, (void *) parallel_restore, + args, 0, NULL); + if (child == 0) + die_horribly(args->AH, modulename, + "could not create worker thread: %s\n", + strerror(errno)); +#endif + + return child; +} + +/* + * collect status from a completed worker child + */ +static thandle +reap_child(ParallelSlot *slots, int n_slots, int *work_status) +{ +#ifndef WIN32 + /* Unix is so much easier ... */ + return wait(work_status); +#else + static HANDLE *handles = NULL; + int hindex, + snum, + tnum; + thandle ret_child; + DWORD res; + + /* first time around only, make space for handles to listen on */ + if (handles == NULL) + handles = (HANDLE *) calloc(sizeof(HANDLE), n_slots); + + /* set up list of handles to listen to */ + for (snum = 0, tnum = 0; snum < n_slots; snum++) + if (slots[snum].child_id != 0) + handles[tnum++] = slots[snum].child_id; + + /* wait for one to finish */ + hindex = WaitForMultipleObjects(tnum, handles, false, INFINITE); + + /* get handle of finished thread */ + ret_child = handles[hindex - WAIT_OBJECT_0]; + + /* get the result */ + GetExitCodeThread(ret_child, &res); + *work_status = res; + + /* dispose of handle to stop leaks */ + CloseHandle(ret_child); + + return ret_child; +#endif +} + +/* + * are we doing anything now? + */ +static bool +work_in_progress(ParallelSlot *slots, int n_slots) +{ + int i; + + for (i = 0; i < n_slots; i++) + { + if (slots[i].child_id != 0) + return true; + } + return false; +} + +/* + * find the first free parallel slot (if any). + */ +static int +get_next_slot(ParallelSlot *slots, int n_slots) +{ + int i; + + for (i = 0; i < n_slots; i++) + { + if (slots[i].child_id == 0) + return i; + } + return NO_SLOT; +} + + +/* + * Check if te1 has an exclusive lock requirement for an item that te2 also + * requires, whether or not te2's requirement is for an exclusive lock. + */ +static bool +has_lock_conflicts(TocEntry *te1, TocEntry *te2) +{ + int j, + k; + + for (j = 0; j < te1->nLockDeps; j++) + { + for (k = 0; k < te2->nDeps; k++) + { + if (te1->lockDeps[j] == te2->dependencies[k]) + return true; + } + } + return false; +} + + +/* + * Initialize the header of a parallel-processing list. + * + * These are circular lists with a dummy TocEntry as header, just like the + * main TOC list; but we use separate list links so that an entry can be in + * the main TOC list as well as in a parallel-processing list. + */ +static void +par_list_header_init(TocEntry *l) +{ + l->par_prev = l->par_next = l; +} + +/* Append te to the end of the parallel-processing list headed by l */ +static void +par_list_append(TocEntry *l, TocEntry *te) +{ + te->par_prev = l->par_prev; + l->par_prev->par_next = te; + l->par_prev = te; + te->par_next = l; +} + +/* Remove te from whatever parallel-processing list it's in */ +static void +par_list_remove(TocEntry *te) +{ + te->par_prev->par_next = te->par_next; + te->par_next->par_prev = te->par_prev; + te->par_prev = NULL; + te->par_next = NULL; +} + + +/* + * Find the next work item (if any) that is capable of being run now. + * + * To qualify, the item must have no remaining dependencies + * and no requirements for locks that are incompatible with + * items currently running. Items in the ready_list are known to have + * no remaining dependencies, but we have to check for lock conflicts. + * + * Note that the returned item has *not* been removed from ready_list. + * The caller must do that after successfully dispatching the item. + * + * pref_non_data is for an alternative selection algorithm that gives + * preference to non-data items if there is already a data load running. + * It is currently disabled. + */ +static TocEntry * +get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, + ParallelSlot *slots, int n_slots) +{ + bool pref_non_data = false; /* or get from AH->ropt */ + TocEntry *data_te = NULL; + TocEntry *te; + int i, + k; + + /* + * Bogus heuristics for pref_non_data + */ + if (pref_non_data) + { + int count = 0; + + for (k = 0; k < n_slots; k++) + if (slots[k].args->te != NULL && + slots[k].args->te->section == SECTION_DATA) + count++; + if (n_slots == 0 || count * 4 < n_slots) + pref_non_data = false; + } + + /* + * Search the ready_list until we find a suitable item. + */ + for (te = ready_list->par_next; te != ready_list; te = te->par_next) + { + bool conflicts = false; + + /* + * Check to see if the item would need exclusive lock on something + * that a currently running item also needs lock on, or vice versa. If + * so, we don't want to schedule them together. + */ + for (i = 0; i < n_slots && !conflicts; i++) + { + TocEntry *running_te; + + if (slots[i].args == NULL) + continue; + running_te = slots[i].args->te; + + if (has_lock_conflicts(te, running_te) || + has_lock_conflicts(running_te, te)) + { + conflicts = true; + break; + } + } + + if (conflicts) + continue; + + if (pref_non_data && te->section == SECTION_DATA) + { + if (data_te == NULL) + data_te = te; + continue; + } + + /* passed all tests, so this item can run */ + return te; + } + + if (data_te != NULL) + return data_te; + + ahlog(AH, 2, "no item ready\n"); + return NULL; +} + + +/* + * Restore a single TOC item in parallel with others + * + * this is the procedure run as a thread (Windows) or a + * separate process (everything else). + */ +static parallel_restore_result +parallel_restore(RestoreArgs *args) +{ + ArchiveHandle *AH = args->AH; + TocEntry *te = args->te; + RestoreOptions *ropt = AH->ropt; + int retval; + + /* + * Close and reopen the input file so we have a private file pointer that + * doesn't stomp on anyone else's file pointer, if we're actually going to + * need to read from the file. Otherwise, just close it except on Windows, + * where it will possibly be needed by other threads. + * + * Note: on Windows, since we are using threads not processes, the reopen + * call *doesn't* close the original file pointer but just open a new one. + */ + if (te->section == SECTION_DATA) + (AH->ReopenPtr) (AH); +#ifndef WIN32 + else + (AH->ClosePtr) (AH); +#endif + + /* + * We need our own database connection, too + */ + ConnectDatabase((Archive *) AH, ropt->dbname, + ropt->pghost, ropt->pgport, ropt->username, + ropt->promptPassword); + + _doSetFixedOutputState(AH); + + /* Restore the TOC item */ + retval = restore_toc_entry(AH, te, ropt, true); + + /* And clean up */ + PQfinish(AH->connection); + AH->connection = NULL; + + /* If we reopened the file, we are done with it, so close it now */ + if (te->section == SECTION_DATA) + (AH->ClosePtr) (AH); + + if (retval == 0 && AH->public.n_errors) + retval = WORKER_IGNORED_ERRORS; + +#ifndef WIN32 + exit(retval); +#else + return retval; +#endif +} + + +/* + * Housekeeping to be done after a step has been parallel restored. + * + * Clear the appropriate slot, free all the extra memory we allocated, + * update status, and reduce the dependency count of any dependent items. + */ +static void +mark_work_done(ArchiveHandle *AH, TocEntry *ready_list, + thandle worker, int status, + ParallelSlot *slots, int n_slots) +{ + TocEntry *te = NULL; + int i; + + for (i = 0; i < n_slots; i++) + { + if (slots[i].child_id == worker) + { + slots[i].child_id = 0; + te = slots[i].args->te; + DeCloneArchive(slots[i].args->AH); + free(slots[i].args); + slots[i].args = NULL; + + break; + } + } + + if (te == NULL) + die_horribly(AH, modulename, "could not find slot of finished worker\n"); + + ahlog(AH, 1, "finished item %d %s %s\n", + te->dumpId, te->desc, te->tag); + + if (status == WORKER_CREATE_DONE) + mark_create_done(AH, te); + else if (status == WORKER_INHIBIT_DATA) + { + inhibit_data_for_failed_table(AH, te); + AH->public.n_errors++; + } + else if (status == WORKER_IGNORED_ERRORS) + AH->public.n_errors++; + else if (status != 0) + die_horribly(AH, modulename, "worker process failed: exit code %d\n", + status); + + reduce_dependencies(AH, te, ready_list); +} + + +/* + * Process the dependency information into a form useful for parallel restore. + * + * This function takes care of fixing up some missing or badly designed + * dependencies, and then prepares subsidiary data structures that will be + * used in the main parallel-restore logic, including: + * 1. We build the tocsByDumpId[] index array. + * 2. We build the revDeps[] arrays of incoming dependency dumpIds. + * 3. We set up depCount fields that are the number of as-yet-unprocessed + * dependencies for each TOC entry. + * + * We also identify locking dependencies so that we can avoid trying to + * schedule conflicting items at the same time. + */ +static void +fix_dependencies(ArchiveHandle *AH) +{ + TocEntry *te; + int i; + + /* + * It is convenient to have an array that indexes the TOC entries by dump + * ID, rather than searching the TOC list repeatedly. Entries for dump + * IDs not present in the TOC will be NULL. + * + * NOTE: because maxDumpId is just the highest dump ID defined in the + * archive, there might be dependencies for IDs > maxDumpId. All uses of + * this array must guard against out-of-range dependency numbers. + * + * Also, initialize the depCount/revDeps/nRevDeps fields, and make sure + * the TOC items are marked as not being in any parallel-processing list. + */ + maxDumpId = AH->maxDumpId; + tocsByDumpId = (TocEntry **) calloc(maxDumpId, sizeof(TocEntry *)); + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + tocsByDumpId[te->dumpId - 1] = te; + te->depCount = te->nDeps; + te->revDeps = NULL; + te->nRevDeps = 0; + te->par_prev = NULL; + te->par_next = NULL; + } + + /* + * POST_DATA items that are shown as depending on a table need to be + * re-pointed to depend on that table's data, instead. This ensures they + * won't get scheduled until the data has been loaded. We handle this by + * first finding TABLE/TABLE DATA pairs and then scanning all the + * dependencies. + * + * Note: currently, a TABLE DATA should always have exactly one + * dependency, on its TABLE item. So we don't bother to search, but look + * just at the first dependency. We do trouble to make sure that it's a + * TABLE, if possible. However, if the dependency isn't in the archive + * then just assume it was a TABLE; this is to cover cases where the table + * was suppressed but we have the data and some dependent post-data items. + * + * XXX this is O(N^2) if there are a lot of tables. We ought to fix + * pg_dump to produce correctly-linked dependencies in the first place. + */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0) + { + DumpId tableId = te->dependencies[0]; + + if (tableId > maxDumpId || + tocsByDumpId[tableId - 1] == NULL || + strcmp(tocsByDumpId[tableId - 1]->desc, "TABLE") == 0) + { + repoint_table_dependencies(AH, tableId, te->dumpId); + } + } + } + + /* + * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB + * COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only + * one BLOB COMMENTS in such files.) + */ + if (AH->version < K_VERS_1_11) + { + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0) + { + TocEntry *te2; + + for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next) + { + if (strcmp(te2->desc, "BLOBS") == 0) + { + te->dependencies = (DumpId *) malloc(sizeof(DumpId)); + te->dependencies[0] = te2->dumpId; + te->nDeps++; + te->depCount++; + break; + } + } + break; + } + } + } + + /* + * At this point we start to build the revDeps reverse-dependency arrays, + * so all changes of dependencies must be complete. + */ + + /* + * Count the incoming dependencies for each item. Also, it is possible + * that the dependencies list items that are not in the archive at + * all. Subtract such items from the depCounts. + */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + for (i = 0; i < te->nDeps; i++) + { + DumpId depid = te->dependencies[i]; + + if (depid <= maxDumpId && tocsByDumpId[depid - 1] != NULL) + tocsByDumpId[depid - 1]->nRevDeps++; + else + te->depCount--; + } + } + + /* + * Allocate space for revDeps[] arrays, and reset nRevDeps so we can + * use it as a counter below. + */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + if (te->nRevDeps > 0) + te->revDeps = (DumpId *) malloc(te->nRevDeps * sizeof(DumpId)); + te->nRevDeps = 0; + } + + /* + * Build the revDeps[] arrays of incoming-dependency dumpIds. This + * had better agree with the loops above. + */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + for (i = 0; i < te->nDeps; i++) + { + DumpId depid = te->dependencies[i]; + + if (depid <= maxDumpId && tocsByDumpId[depid - 1] != NULL) + { + TocEntry *otherte = tocsByDumpId[depid - 1]; + + otherte->revDeps[otherte->nRevDeps++] = te->dumpId; + } + } + } + + /* + * Lastly, work out the locking dependencies. + */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + te->lockDeps = NULL; + te->nLockDeps = 0; + identify_locking_dependencies(te); + } +} + +/* + * Change dependencies on tableId to depend on tableDataId instead, + * but only in POST_DATA items. + */ +static void +repoint_table_dependencies(ArchiveHandle *AH, + DumpId tableId, DumpId tableDataId) +{ + TocEntry *te; + int i; + + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + if (te->section != SECTION_POST_DATA) + continue; + for (i = 0; i < te->nDeps; i++) + { + if (te->dependencies[i] == tableId) + { + te->dependencies[i] = tableDataId; + ahlog(AH, 2, "transferring dependency %d -> %d to %d\n", + te->dumpId, tableId, tableDataId); + } + } + } +} + +/* + * Identify which objects we'll need exclusive lock on in order to restore + * the given TOC entry (*other* than the one identified by the TOC entry + * itself). Record their dump IDs in the entry's lockDeps[] array. + */ +static void +identify_locking_dependencies(TocEntry *te) +{ + DumpId *lockids; + int nlockids; + int i; + + /* Quick exit if no dependencies at all */ + if (te->nDeps == 0) + return; + + /* Exit if this entry doesn't need exclusive lock on other objects */ + if (!(strcmp(te->desc, "CONSTRAINT") == 0 || + strcmp(te->desc, "CHECK CONSTRAINT") == 0 || + strcmp(te->desc, "FK CONSTRAINT") == 0 || + strcmp(te->desc, "RULE") == 0 || + strcmp(te->desc, "TRIGGER") == 0)) + return; + + /* + * We assume the item requires exclusive lock on each TABLE DATA item + * listed among its dependencies. (This was originally a dependency on + * the TABLE, but fix_dependencies repointed it to the data item. Note + * that all the entry types we are interested in here are POST_DATA, so + * they will all have been changed this way.) + */ + lockids = (DumpId *) malloc(te->nDeps * sizeof(DumpId)); + nlockids = 0; + for (i = 0; i < te->nDeps; i++) + { + DumpId depid = te->dependencies[i]; + + if (depid <= maxDumpId && tocsByDumpId[depid - 1] && + strcmp(tocsByDumpId[depid - 1]->desc, "TABLE DATA") == 0) + lockids[nlockids++] = depid; + } + + if (nlockids == 0) + { + free(lockids); + return; + } + + te->lockDeps = realloc(lockids, nlockids * sizeof(DumpId)); + te->nLockDeps = nlockids; +} + +/* + * Remove the specified TOC entry from the depCounts of items that depend on + * it, thereby possibly making them ready-to-run. Any pending item that + * becomes ready should be moved to the ready list. + */ +static void +reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list) +{ + int i; + + ahlog(AH, 2, "reducing dependencies for %d\n", te->dumpId); + + for (i = 0; i < te->nRevDeps; i++) + { + TocEntry *otherte = tocsByDumpId[te->revDeps[i] - 1]; + + otherte->depCount--; + if (otherte->depCount == 0 && otherte->par_prev != NULL) + { + /* It must be in the pending list, so remove it ... */ + par_list_remove(otherte); + /* ... and add to ready_list */ + par_list_append(ready_list, otherte); + } + } +} + +/* + * Set the created flag on the DATA member corresponding to the given + * TABLE member + */ +static void +mark_create_done(ArchiveHandle *AH, TocEntry *te) +{ + TocEntry *tes; + + for (tes = AH->toc->next; tes != AH->toc; tes = tes->next) + { + if (strcmp(tes->desc, "TABLE DATA") == 0 && + strcmp(tes->tag, te->tag) == 0 && + strcmp(tes->namespace ? tes->namespace : "", + te->namespace ? te->namespace : "") == 0) + { + tes->created = true; + break; + } + } +} + +/* + * Mark the DATA member corresponding to the given TABLE member + * as not wanted + */ +static void +inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te) +{ + RestoreOptions *ropt = AH->ropt; + TocEntry *tes; + + ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n", + te->tag); + + for (tes = AH->toc->next; tes != AH->toc; tes = tes->next) + { + if (strcmp(tes->desc, "TABLE DATA") == 0 && + strcmp(tes->tag, te->tag) == 0 && + strcmp(tes->namespace ? tes->namespace : "", + te->namespace ? te->namespace : "") == 0) + { + /* mark it unwanted; we assume idWanted array already exists */ + ropt->idWanted[tes->dumpId - 1] = false; + break; + } + } +} + + +/* + * Clone and de-clone routines used in parallel restoration. + * + * Enough of the structure is cloned to ensure that there is no + * conflict between different threads each with their own clone. + * + * These could be public, but no need at present. + */ +static ArchiveHandle * +CloneArchive(ArchiveHandle *AH) +{ + ArchiveHandle *clone; + + /* Make a "flat" copy */ + clone = (ArchiveHandle *) malloc(sizeof(ArchiveHandle)); + if (clone == NULL) + die_horribly(AH, modulename, "out of memory\n"); + memcpy(clone, AH, sizeof(ArchiveHandle)); + + /* Handle format-independent fields */ + clone->pgCopyBuf = createPQExpBuffer(); + clone->sqlBuf = createPQExpBuffer(); + clone->sqlparse.tagBuf = NULL; + + /* The clone will have its own connection, so disregard connection state */ + clone->connection = NULL; + clone->currUser = NULL; + clone->currSchema = NULL; + clone->currTablespace = NULL; + clone->currWithOids = -1; + + /* savedPassword must be local in case we change it while connecting */ + if (clone->savedPassword) + clone->savedPassword = strdup(clone->savedPassword); + + /* clone has its own error count, too */ + clone->public.n_errors = 0; + + /* Let the format-specific code have a chance too */ + (clone->ClonePtr) (clone); + + return clone; +} + +/* + * Release clone-local storage. + * + * Note: we assume any clone-local connection was already closed. + */ +static void +DeCloneArchive(ArchiveHandle *AH) +{ + /* Clear format-specific state */ + (AH->DeClonePtr) (AH); + + /* Clear state allocated by CloneArchive */ + destroyPQExpBuffer(AH->pgCopyBuf); + destroyPQExpBuffer(AH->sqlBuf); + if (AH->sqlparse.tagBuf) + destroyPQExpBuffer(AH->sqlparse.tagBuf); + + /* Clear any connection-local state */ + if (AH->currUser) + free(AH->currUser); + if (AH->currSchema) + free(AH->currSchema); + if (AH->currTablespace) + free(AH->currTablespace); + if (AH->savedPassword) + free(AH->savedPassword); + + free(AH); }