*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.170 2009/04/12 21:02:44 adunstan Exp $
+ * src/bin/pg_dump/pg_backup_archiver.c
*
*-------------------------------------------------------------------------
*/
#define thandle HANDLE
#endif
+/* Arguments needed for a worker child */
typedef struct _restore_args
{
ArchiveHandle *AH;
- TocEntry *te;
+ TocEntry *te;
} RestoreArgs;
+/* State for each parallel activity slot */
typedef struct _parallel_slot
{
thandle child_id;
static const char *modulename = gettext_noop("archiver");
+/* index array created by fix_dependencies -- only used in parallel restore */
+static TocEntry **tocsByDumpId; /* index by dumpId - 1 */
+static DumpId maxDumpId; /* length of above array */
+
static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
const int compression, ArchiveMode mode);
static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
static teReqs _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls);
+static bool _tocEntryIsACL(TocEntry *te);
static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
static TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id);
-static void _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
+static void _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
static int _discoverArchiveFormat(ArchiveHandle *AH);
static void dump_lo_buf(ArchiveHandle *AH);
static void ResetOutput(ArchiveHandle *AH, OutputContext savedContext);
static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
- RestoreOptions *ropt, bool is_parallel);
+ RestoreOptions *ropt, bool is_parallel);
static void restore_toc_entries_parallel(ArchiveHandle *AH);
static thandle spawn_restore(RestoreArgs *args);
static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status);
static bool work_in_progress(ParallelSlot *slots, int n_slots);
-static int get_next_slot(ParallelSlot *slots, int n_slots);
+static int get_next_slot(ParallelSlot *slots, int n_slots);
+static void par_list_header_init(TocEntry *l);
+static void par_list_append(TocEntry *l, TocEntry *te);
+static void par_list_remove(TocEntry *te);
static TocEntry *get_next_work_item(ArchiveHandle *AH,
- TocEntry **first_unprocessed,
- ParallelSlot *slots, int n_slots);
+ TocEntry *ready_list,
+ ParallelSlot *slots, int n_slots);
static parallel_restore_result parallel_restore(RestoreArgs *args);
-static void mark_work_done(ArchiveHandle *AH, thandle worker, int status,
- ParallelSlot *slots, int n_slots);
+static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
+ thandle worker, int status,
+ ParallelSlot *slots, int n_slots);
static void fix_dependencies(ArchiveHandle *AH);
static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
static void repoint_table_dependencies(ArchiveHandle *AH,
- DumpId tableId, DumpId tableDataId);
-static void identify_locking_dependencies(TocEntry *te,
- TocEntry **tocsByDumpId);
-static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te);
+ DumpId tableId, DumpId tableDataId);
+static void identify_locking_dependencies(TocEntry *te);
+static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
+ TocEntry *ready_list);
static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
static ArchiveHandle *CloneArchive(ArchiveHandle *AH);
/*
* Check for nonsensical option combinations.
*
- * NB: create+dropSchema is useless because if you're creating the DB,
+ * NB: createDB+dropSchema is useless because if you're creating the DB,
* there's no need to drop individual items in it. Moreover, if we tried
* to do that then we'd issue the drops in the database initially
* connected to, not the one we will create, which is very bad...
*/
- if (ropt->create && ropt->dropSchema)
+ if (ropt->createDB && ropt->dropSchema)
die_horribly(AH, modulename, "-C and -c are incompatible options\n");
/*
- * -1 is not compatible with -C, because we can't create a database
- * inside a transaction block.
+ * -C is not compatible with -1, because we can't create a database inside
+ * a transaction block.
*/
- if (ropt->create && ropt->single_txn)
+ if (ropt->createDB && ropt->single_txn)
die_horribly(AH, modulename, "-C and -1 are incompatible options\n");
/*
* Make sure we won't need (de)compression we haven't got
*/
#ifndef HAVE_LIBZ
- if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
+ if (AH->compression != 0 && AH->PrintTocDataPtr !=NULL)
{
for (te = AH->toc->next; te != AH->toc; te = te->next)
{
ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
if (AH->public.verbose)
+ {
+ if (AH->archiveRemoteVersion)
+ ahprintf(AH, "-- Dumped from database version %s\n",
+ AH->archiveRemoteVersion);
+ if (AH->archiveDumpVersion)
+ ahprintf(AH, "-- Dumped by pg_dump version %s\n",
+ AH->archiveDumpVersion);
dumpTimestamp(AH, "Started on", AH->createDate);
+ }
if (ropt->single_txn)
{
AH->currentTE = te;
reqs = _tocEntryRequired(te, ropt, false /* needn't drop ACLs */ );
- if (((reqs & REQ_SCHEMA) != 0) && te->dropStmt)
+ /* We want anything that's selected and has a dropStmt */
+ if (((reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
{
- /* We want the schema */
ahlog(AH, 1, "dropping %s %s\n", te->desc, te->tag);
/* Select owner and schema as necessary */
_becomeOwner(AH, te);
/* Work out what, if anything, we want from this entry */
reqs = _tocEntryRequired(te, ropt, true);
- if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */
+ /* Both schema and data objects might now have ownership/ACLs */
+ if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0)
{
ahlog(AH, 1, "setting owner and privileges for %s %s\n",
te->desc, te->tag);
restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
RestoreOptions *ropt, bool is_parallel)
{
- int retval = 0;
+ int retval = 0;
teReqs reqs;
bool defnDumped;
defnDumped = false;
- if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */
+ if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */
{
ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);
if (AH->lastErrorTE == te)
{
/*
- * We failed to create the table.
- * If --no-data-for-failed-tables was given,
- * mark the corresponding TABLE DATA to be ignored.
+ * We failed to create the table. If
+ * --no-data-for-failed-tables was given, mark the
+ * corresponding TABLE DATA to be ignored.
*
- * In the parallel case this must be done in the parent,
- * so we just set the return value.
+ * In the parallel case this must be done in the parent, so we
+ * just set the return value.
*/
if (ropt->noDataForFailedTables)
{
else
{
/*
- * We created the table successfully. Mark the
- * corresponding TABLE DATA for possible truncation.
+ * We created the table successfully. Mark the corresponding
+ * TABLE DATA for possible truncation.
*
- * In the parallel case this must be done in the parent,
- * so we just set the return value.
+ * In the parallel case this must be done in the parent, so we
+ * just set the return value.
*/
if (is_parallel)
retval = WORKER_CREATE_DONE;
if ((reqs & REQ_DATA) != 0)
{
/*
- * hadDumper will be set if there is genuine data component for
- * this node. Otherwise, we need to check the defn field for
- * statements that need to be executed in data-only restores.
+ * hadDumper will be set if there is genuine data component for this
+ * node. Otherwise, we need to check the defn field for statements
+ * that need to be executed in data-only restores.
*/
if (te->hadDumper)
{
/*
* If we can output the data, then restore it.
*/
- if (AH->PrintTocDataPtr != NULL && (reqs & REQ_DATA) != 0)
+ if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0)
{
_printTocEntry(AH, te, ropt, true, false);
te->tag);
/*
- * In parallel restore, if we created the table earlier
- * in the run then we wrap the COPY in a transaction and
- * precede it with a TRUNCATE. If archiving is not on
- * this prevents WAL-logging the COPY. This obtains a
- * speedup similar to that from using single_txn mode
- * in non-parallel restores.
+ * In parallel restore, if we created the table earlier in
+ * the run then we wrap the COPY in a transaction and
+ * precede it with a TRUNCATE. If archiving is not on
+ * this prevents WAL-logging the COPY. This obtains a
+ * speedup similar to that from using single_txn mode in
+ * non-parallel restores.
*/
if (is_parallel && te->created)
{
}
/*
- * If we have a copy statement, use it. As of V1.3,
- * these are separate to allow easy import from
- * withing a database connection. Pre 1.3 archives can
- * not use DB connections and are sent to output only.
+ * If we have a copy statement, use it. As of V1.3, these
+ * are separate to allow easy import from within a
+ * database connection. Pre 1.3 archives can not use DB
+ * connections and are sent to output only.
*
- * For V1.3+, the table data MUST have a copy
- * statement so that we can go into appropriate mode
- * with libpq.
+ * For V1.3+, the table data MUST have a copy statement so
+ * that we can go into appropriate mode with libpq.
*/
if (te->copyStmt && strlen(te->copyStmt) > 0)
{
ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
+ /* We should print DATABASE entries whether or not -C was specified */
+ ropt->createDB = 1;
+
for (te = AH->toc->next; te != AH->toc; te = te->next)
{
if (ropt->verbose || _tocEntryRequired(te, ropt, true) != 0)
te->tag, te->owner);
if (ropt->verbose && te->nDeps > 0)
{
- int i;
+ int i;
ahprintf(AH, ";\tdepends on:");
for (i = 0; i < te->nDeps; i++)
* Called by a format handler to initiate restoration of a blob
*/
void
-StartRestoreBlob(ArchiveHandle *AH, Oid oid)
+StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
{
+ bool old_blob_style = (AH->version < K_VERS_1_12);
Oid loOid;
AH->blobCount++;
ahlog(AH, 2, "restoring large object with OID %u\n", oid);
+ /* With an old archive we must do drop and create logic here */
+ if (old_blob_style && drop)
+ DropBlobIfExists(AH, oid);
+
if (AH->connection)
{
- loOid = lo_create(AH->connection, oid);
- if (loOid == 0 || loOid != oid)
- die_horribly(AH, modulename, "could not create large object %u\n",
- oid);
-
+ if (old_blob_style)
+ {
+ loOid = lo_create(AH->connection, oid);
+ if (loOid == 0 || loOid != oid)
+ die_horribly(AH, modulename, "could not create large object %u: %s",
+ oid, PQerrorMessage(AH->connection));
+ }
AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
if (AH->loFd == -1)
- die_horribly(AH, modulename, "could not open large object\n");
+ die_horribly(AH, modulename, "could not open large object %u: %s",
+ oid, PQerrorMessage(AH->connection));
}
else
{
- ahprintf(AH, "SELECT lo_open(lo_create(%u), %d);\n", oid, INV_WRITE);
+ if (old_blob_style)
+ ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
+ oid, INV_WRITE);
+ else
+ ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
+ oid, INV_WRITE);
}
AH->writingBlob = 1;
}
else
{
- ahprintf(AH, "SELECT lo_close(0);\n\n");
+ ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
}
}
char *endptr;
DumpId id;
TocEntry *te;
- TocEntry *tePrev;
/* Allocate space for the 'wanted' array, and init it */
ropt->idWanted = (bool *) malloc(sizeof(bool) * AH->maxDumpId);
memset(ropt->idWanted, 0, sizeof(bool) * AH->maxDumpId);
- /* Set prev entry as head of list */
- tePrev = AH->toc;
-
/* Setup the file */
fh = fopen(ropt->tocFile, PG_BINARY_R);
if (!fh)
cmnt[0] = '\0';
/* Ignore if all blank */
- if (strspn(buf, " \t\r") == strlen(buf))
+ if (strspn(buf, " \t\r\n") == strlen(buf))
continue;
/* Get an ID, check it's valid and not already seen */
die_horribly(AH, modulename, "could not find entry for ID %d\n",
id);
+ /* Mark it wanted */
ropt->idWanted[id - 1] = true;
- _moveAfter(AH, tePrev, te);
- tePrev = te;
+ /*
+ * Move each item to the end of the list as it is selected, so that
+ * they are placed in the desired order. Any unwanted items will end
+ * up at the front of the list, which may seem unintuitive but it's
+ * what we need. In an ordinary serial restore that makes no
+ * difference, but in a parallel restore we need to mark unrestored
+ * items' dependencies as satisfied before we start examining
+ * restorable items. Otherwise they could have surprising
+ * side-effects on the order in which restorable items actually get
+ * restored.
+ */
+ _moveBefore(AH, AH->toc, te);
}
if (fclose(fh) != 0)
res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
ahlog(AH, 5, ngettext("wrote %lu byte of large object data (result = %lu)\n",
- "wrote %lu bytes of large object data (result = %lu)\n",
+ "wrote %lu bytes of large object data (result = %lu)\n",
AH->lo_buf_used),
(unsigned long) AH->lo_buf_used, (unsigned long) res);
if (res != AH->lo_buf_used)
}
else
{
- unsigned char *str;
- size_t len;
+ PQExpBuffer buf = createPQExpBuffer();
- str = PQescapeBytea((const unsigned char *) AH->lo_buf,
- AH->lo_buf_used, &len);
- if (!str)
- die_horribly(AH, modulename, "out of memory\n");
+ appendByteaLiteralAHX(buf,
+ (const unsigned char *) AH->lo_buf,
+ AH->lo_buf_used,
+ AH);
/* Hack: turn off writingBlob so ahwrite doesn't recurse to here */
AH->writingBlob = 0;
- ahprintf(AH, "SELECT lowrite(0, '%s');\n", str);
+ ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
AH->writingBlob = 1;
- free(str);
+ destroyPQExpBuffer(buf);
}
AH->lo_buf_used = 0;
}
va_end(ap);
}
+#ifdef NOT_USED
+
static void
_moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
{
+ /* Unlink te from list: its two neighbours are made to point at each other (AH is not used here) */
te->prev->next = te->next;
te->next->prev = te->prev;
+ /* and insert it after "pos", i.e. between pos and whatever used to follow pos */
te->prev = pos;
te->next = pos->next;
-
pos->next->prev = te;
pos->next = te;
}
-#ifdef NOT_USED
+#endif
static void
_moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
{
+ /* Unlink te from list: its two neighbours are made to point at each other (AH is not used here) */
te->prev->next = te->next;
te->next->prev = te->prev;
+ /* and insert it before "pos", i.e. between whatever used to precede pos and pos itself */
te->prev = pos->prev;
te->next = pos;
pos->prev->next = te;
pos->prev = te;
}
-#endif
static TocEntry *
getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
if (strncmp(sig, "PGDMP", 5) == 0)
{
+ /*
+ * Finish reading (most of) a custom-format header.
+ *
+ * NB: this code must agree with ReadHead().
+ */
AH->vmaj = fgetc(fh);
AH->vmin = fgetc(fh);
else
AH->lookaheadLen = 0; /* Don't bother since we've reset the file */
-#if 0
- write_msg(modulename, ngettext("read %lu byte into lookahead buffer\n",
- "read %lu bytes into lookahead buffer\n",
- AH->lookaheadLen),
- (unsigned long) AH->lookaheadLen);
-#endif
-
/* Close the file */
if (wantClose)
if (fclose(fh) != 0)
AH->vmin = K_VERS_MINOR;
AH->vrev = K_VERS_REV;
+ /* Make a convenient integer <maj><min><rev>00 */
+ AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
+
/* initialize for backwards compatible string processing */
AH->public.encoding = 0; /* PG_SQL_ASCII */
AH->public.std_strings = false;
AH->public.exit_on_error = true;
AH->public.n_errors = 0;
+ AH->archiveDumpVersion = PG_VERSION;
+
AH->createDate = time(NULL);
AH->intSize = sizeof(int);
else
{
/*
- * rules for pre-8.4 archives wherein pg_dump hasn't classified
- * the entries into sections
+ * Rules for pre-8.4 archives wherein pg_dump hasn't classified
+ * the entries into sections. This list need not cover entry
+ * types added later than 8.4.
*/
if (strcmp(te->desc, "COMMENT") == 0 ||
- strcmp(te->desc, "ACL") == 0)
+ strcmp(te->desc, "ACL") == 0 ||
+ strcmp(te->desc, "ACL LANGUAGE") == 0)
te->section = SECTION_NONE;
else if (strcmp(te->desc, "TABLE DATA") == 0 ||
strcmp(te->desc, "BLOBS") == 0 ||
return 0;
/* If it's an ACL, maybe ignore it */
- if ((!include_acls || ropt->aclsSkip) && strcmp(te->desc, "ACL") == 0)
+ if ((!include_acls || ropt->aclsSkip) && _tocEntryIsACL(te))
return 0;
- if (!ropt->create && strcmp(te->desc, "DATABASE") == 0)
+ /* If it's security labels, maybe ignore it */
+ if (ropt->skip_seclabel && strcmp(te->desc, "SECURITY LABEL") == 0)
+ return 0;
+
+ /* Ignore DATABASE entry unless we should create it */
+ if (!ropt->createDB && strcmp(te->desc, "DATABASE") == 0)
return 0;
/* Check options for selective dump/restore */
if (!te->hadDumper)
{
/*
- * Special Case: If 'SEQUENCE SET' then it is considered a data entry
+ * Special Case: If 'SEQUENCE SET' or anything to do with BLOBs, then
+ * it is considered a data entry. We don't need to check for the
+ * BLOBS entry or old-style BLOB COMMENTS, because they will have
+ * hadDumper = true ... but we do need to check new-style BLOB
+ * comments.
*/
- if (strcmp(te->desc, "SEQUENCE SET") == 0)
+ if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
+ strcmp(te->desc, "BLOB") == 0 ||
+ (strcmp(te->desc, "ACL") == 0 &&
+ strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
+ (strcmp(te->desc, "COMMENT") == 0 &&
+ strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
+ (strcmp(te->desc, "SECURITY LABEL") == 0 &&
+ strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
res = res & REQ_DATA;
else
res = res & ~REQ_DATA;
return res;
}
+/*
+ * Report whether a TOC entry describes access privileges.
+ *
+ * The decision is made purely on te->desc: "ACL", "DEFAULT ACL", and the
+ * legacy "ACL LANGUAGE" tag (a crock emitted only in PG 7.4) all count.
+ */
+static bool
+_tocEntryIsACL(TocEntry *te)
+{
+	const char *kind = te->desc;
+
+	return strcmp(kind, "ACL") == 0 ||
+		strcmp(kind, "ACL LANGUAGE") == 0 ||
+		strcmp(kind, "DEFAULT ACL") == 0;
+}
+
/*
* Issue SET commands for parameters that we want to have set the same way
* at all times during execution of a restore script.
static void
_doSetFixedOutputState(ArchiveHandle *AH)
{
- /* Disable statement_timeout in archive for pg_restore/psql */
+ /* Disable statement_timeout in archive for pg_restore/psql */
ahprintf(AH, "SET statement_timeout = 0;\n");
/* Select the correct character set encoding */
}
/*
- * Become the owner of the the given TOC entry object. If
+ * Become the owner of the given TOC entry object. If
* changes in ownership are not allowed, this doesn't do anything.
*/
static void
strcmp(type, "DOMAIN") == 0 ||
strcmp(type, "TABLE") == 0 ||
strcmp(type, "TYPE") == 0 ||
+ strcmp(type, "FOREIGN TABLE") == 0 ||
strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
strcmp(type, "TEXT SEARCH CONFIGURATION") == 0)
{
return;
}
+ /* BLOBs just have a name, but it's numeric so must not use fmtId */
+ if (strcmp(type, "BLOB") == 0)
+ {
+ appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
+ return;
+ }
+
/*
* These object types require additional decoration. Fortunately, the
* information needed is exactly what's in the DROP command.
/* ACLs are dumped only during acl pass */
if (acl_pass)
{
- if (strcmp(te->desc, "ACL") != 0)
+ if (!_tocEntryIsACL(te))
return;
}
else
{
- if (strcmp(te->desc, "ACL") == 0)
+ if (_tocEntryIsACL(te))
return;
}
strlen(te->owner) > 0 && strlen(te->dropStmt) > 0)
{
if (strcmp(te->desc, "AGGREGATE") == 0 ||
+ strcmp(te->desc, "BLOB") == 0 ||
strcmp(te->desc, "CONVERSION") == 0 ||
strcmp(te->desc, "DATABASE") == 0 ||
strcmp(te->desc, "DOMAIN") == 0 ||
strcmp(te->desc, "TYPE") == 0 ||
strcmp(te->desc, "VIEW") == 0 ||
strcmp(te->desc, "SEQUENCE") == 0 ||
+ strcmp(te->desc, "FOREIGN TABLE") == 0 ||
strcmp(te->desc, "TEXT SEARCH DICTIONARY") == 0 ||
strcmp(te->desc, "TEXT SEARCH CONFIGURATION") == 0 ||
strcmp(te->desc, "FOREIGN DATA WRAPPER") == 0 ||
* If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
* commands, so we can no longer assume we know the current auth setting.
*/
- if (strncmp(te->desc, "ACL", 3) == 0)
+ if (acl_pass)
{
if (AH->currUser)
free(AH->currUser);
int fmt;
struct tm crtm;
- /* If we haven't already read the header... */
+ /*
+ * If we haven't already read the header, do so.
+ *
+ * NB: this code must agree with _discoverArchiveFormat(). Maybe find a
+ * way to unify the cases?
+ */
if (!AH->readHeader)
{
if ((*AH->ReadBufPtr) (AH, tmpMag, 5) != 5)
AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
-
if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
die_horribly(AH, modulename, "unsupported version (%d.%d) in file header\n",
AH->vmaj, AH->vmin);
AH->archiveRemoteVersion = ReadStr(AH);
AH->archiveDumpVersion = ReadStr(AH);
}
-
}
/*
* checkSeek
- * check to see if fseek can be performed.
+ * check to see if ftell/fseek can be performed.
+ *
+ * Returns true only when ftello() reports a valid position for fp and
+ * fseeko(SEEK_SET) back to that same position succeeds; on success the
+ * file position is therefore where it was on entry.  Returns false if
+ * either call fails, or if pgoff_t is wider than long and HAVE_FSEEKO is
+ * not defined.
*/
bool
checkSeek(FILE *fp)
{
- if (fseeko(fp, 0, SEEK_CUR) != 0)
- return false;
- else if (sizeof(pgoff_t) > sizeof(long))
- {
- /*
- * At this point, pgoff_t is too large for long, so we return based on
- * whether an pgoff_t version of fseek is available.
- */
-#ifdef HAVE_FSEEKO
- return true;
-#else
+ pgoff_t tpos;
+
+ /*
+ * If pgoff_t is wider than long, we must have "real" fseeko and not an
+ * emulation using fseek. Otherwise report no seek capability.
+ */
+#ifndef HAVE_FSEEKO
+ if (sizeof(pgoff_t) > sizeof(long))
return false;
#endif
- }
- else
- return true;
+
+ /*
+ * Check that ftello works on this file. Test the return value rather
+ * than errno: ftello reports failure by returning -1, whereas a library
+ * call is allowed to leave errno nonzero even when it succeeds.
+ */
+ tpos = ftello(fp);
+ if (tpos < 0)
+ return false;
+
+ /*
+ * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test
+ * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a
+ * successful no-op even on files that are otherwise unseekable.
+ */
+ if (fseeko(fp, tpos, SEEK_SET) != 0)
+ return false;
+
+ return true;
}
ParallelSlot *slots;
int work_status;
int next_slot;
- TocEntry *first_unprocessed = AH->toc->next;
+ TocEntry pending_list;
+ TocEntry ready_list;
TocEntry *next_work_item;
thandle ret_child;
TocEntry *te;
- ahlog(AH,2,"entering restore_toc_entries_parallel\n");
+ ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
/* we haven't got round to making this work for all archive formats */
if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
die_horribly(AH, modulename, "parallel restore is not supported with this archive file format\n");
+ /* doesn't work if the archive represents dependencies as OIDs, either */
+ if (AH->version < K_VERS_1_8)
+ die_horribly(AH, modulename, "parallel restore is not supported with archives made by pre-8.0 pg_dump\n");
+
slots = (ParallelSlot *) calloc(sizeof(ParallelSlot), n_slots);
/* Adjust dependency information */
fix_dependencies(AH);
/*
- * Do all the early stuff in a single connection in the parent.
- * There's no great point in running it in parallel, in fact it will
- * actually run faster in a single connection because we avoid all the
- * connection and setup overhead.
+ * Do all the early stuff in a single connection in the parent. There's no
+ * great point in running it in parallel, in fact it will actually run
+ * faster in a single connection because we avoid all the connection and
+ * setup overhead. Also, pg_dump is not currently very good about
+ * showing all the dependencies of SECTION_PRE_DATA items, so we do not
+ * risk trying to process them out-of-order.
*/
- while ((next_work_item = get_next_work_item(AH, &first_unprocessed,
- NULL, 0)) != NULL)
+ for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
{
+ /* Non-PRE_DATA items are just ignored for now */
if (next_work_item->section == SECTION_DATA ||
next_work_item->section == SECTION_POST_DATA)
- break;
+ continue;
ahlog(AH, 1, "processing item %d %s %s\n",
next_work_item->dumpId,
(void) restore_toc_entry(AH, next_work_item, ropt, false);
- next_work_item->restored = true;
- reduce_dependencies(AH, next_work_item);
+ /* ready_list must not be touched at this stage, so pass NULL */
+ reduce_dependencies(AH, next_work_item, NULL);
}
/*
- * Now close parent connection in prep for parallel steps. We do this
+ * Now close parent connection in prep for parallel steps. We do this
* mainly to ensure that we don't exceed the specified number of parallel
* connections.
*/
AH->currTablespace = NULL;
AH->currWithOids = -1;
+ /*
+ * Initialize the lists of pending and ready items. After this setup, the
+ * pending list is everything that needs to be done but is blocked by one
+ * or more dependencies, while the ready list contains items that have no
+ * remaining dependencies. Note: we don't yet filter out entries that
+ * aren't going to be restored. They might participate in dependency
+ * chains connecting entries that should be restored, so we treat them as
+ * live until we actually process them.
+ */
+ par_list_header_init(&pending_list);
+ par_list_header_init(&ready_list);
+ for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
+ {
+ /* All PRE_DATA items were dealt with above */
+ if (next_work_item->section == SECTION_DATA ||
+ next_work_item->section == SECTION_POST_DATA)
+ {
+ if (next_work_item->depCount > 0)
+ par_list_append(&pending_list, next_work_item);
+ else
+ par_list_append(&ready_list, next_work_item);
+ }
+ }
+
/*
* main parent loop
*
* left to be done.
*/
- ahlog(AH,1,"entering main parallel loop\n");
+ ahlog(AH, 1, "entering main parallel loop\n");
- while ((next_work_item = get_next_work_item(AH, &first_unprocessed,
+ while ((next_work_item = get_next_work_item(AH, &ready_list,
slots, n_slots)) != NULL ||
work_in_progress(slots, n_slots))
{
if (next_work_item != NULL)
{
- teReqs reqs;
+ teReqs reqs;
/* If not to be dumped, don't waste time launching a worker */
reqs = _tocEntryRequired(next_work_item, AH->ropt, false);
next_work_item->dumpId,
next_work_item->desc, next_work_item->tag);
- next_work_item->restored = true;
- reduce_dependencies(AH, next_work_item);
+ par_list_remove(next_work_item);
+ reduce_dependencies(AH, next_work_item, &ready_list);
continue;
}
if ((next_slot = get_next_slot(slots, n_slots)) != NO_SLOT)
{
/* There is work still to do and a worker slot available */
- thandle child;
+ thandle child;
RestoreArgs *args;
ahlog(AH, 1, "launching item %d %s %s\n",
next_work_item->dumpId,
next_work_item->desc, next_work_item->tag);
- next_work_item->restored = true;
+ par_list_remove(next_work_item);
/* this memory is dealloced in mark_work_done() */
args = malloc(sizeof(RestoreArgs));
if (WIFEXITED(work_status))
{
- mark_work_done(AH, ret_child, WEXITSTATUS(work_status),
+ mark_work_done(AH, &ready_list,
+ ret_child, WEXITSTATUS(work_status),
slots, n_slots);
}
else
}
}
- ahlog(AH,1,"finished main parallel loop\n");
+ ahlog(AH, 1, "finished main parallel loop\n");
/*
* Now reconnect the single parent connection.
_doSetFixedOutputState(AH);
/*
- * Make sure there is no non-ACL work left due to, say,
- * circular dependencies, or some other pathological condition.
- * If so, do it in the single parent connection.
+ * Make sure there is no non-ACL work left due to, say, circular
+ * dependencies, or some other pathological condition. If so, do it in the
+ * single parent connection.
*/
- for (te = AH->toc->next; te != AH->toc; te = te->next)
+ for (te = pending_list.par_next; te != &pending_list; te = te->par_next)
{
- if (!te->restored)
- {
- ahlog(AH, 1, "processing missed item %d %s %s\n",
- te->dumpId, te->desc, te->tag);
- (void) restore_toc_entry(AH, te, ropt, false);
- }
+ ahlog(AH, 1, "processing missed item %d %s %s\n",
+ te->dumpId, te->desc, te->tag);
+ (void) restore_toc_entry(AH, te, ropt, false);
}
/* The ACLs will be handled back in RestoreArchive. */
static thandle
spawn_restore(RestoreArgs *args)
{
- thandle child;
+ thandle child;
/* Ensure stdio state is quiesced before forking */
fflush(NULL);
}
/*
- * collect status from a completed worker child
+ * collect status from a completed worker child
*/
static thandle
reap_child(ParallelSlot *slots, int n_slots, int *work_status)
return wait(work_status);
#else
static HANDLE *handles = NULL;
- int hindex, snum, tnum;
- thandle ret_child;
- DWORD res;
+ int hindex,
+ snum,
+ tnum;
+ thandle ret_child;
+ DWORD res;
/* first time around only, make space for handles to listen on */
if (handles == NULL)
- handles = (HANDLE *) calloc(sizeof(HANDLE),n_slots);
+ handles = (HANDLE *) calloc(sizeof(HANDLE), n_slots);
/* set up list of handles to listen to */
- for (snum=0, tnum=0; snum < n_slots; snum++)
+ for (snum = 0, tnum = 0; snum < n_slots; snum++)
if (slots[snum].child_id != 0)
handles[tnum++] = slots[snum].child_id;
ret_child = handles[hindex - WAIT_OBJECT_0];
/* get the result */
- GetExitCodeThread(ret_child,&res);
+ GetExitCodeThread(ret_child, &res);
*work_status = res;
/* dispose of handle to stop leaks */
static bool
work_in_progress(ParallelSlot *slots, int n_slots)
{
- int i;
+ int i;
for (i = 0; i < n_slots; i++)
{
static int
get_next_slot(ParallelSlot *slots, int n_slots)
{
- int i;
+ int i;
for (i = 0; i < n_slots; i++)
{
static bool
has_lock_conflicts(TocEntry *te1, TocEntry *te2)
{
- int j,k;
+ int j,
+ k;
for (j = 0; j < te1->nLockDeps; j++)
{
}
+/*
+ * Set up an empty parallel-processing list.
+ *
+ * Such a list is circular and is headed by a dummy TocEntry, in the same
+ * manner as the main TOC list.  It is threaded through the par_prev and
+ * par_next links rather than the main list links, so one entry can sit in
+ * the main TOC list and in a parallel-processing list at the same time.
+ * An empty list is a header whose two links point back at itself.
+ */
+static void
+par_list_header_init(TocEntry *l)
+{
+	l->par_next = l;
+	l->par_prev = l;
+}
+
+/*
+ * Add te as the last element of the parallel-processing list headed by l.
+ *
+ * te is spliced in between the current tail (l->par_prev) and the header.
+ * Its previous par_prev/par_next values are simply overwritten; nothing is
+ * unlinked here.
+ */
+static void
+par_list_append(TocEntry *l, TocEntry *te)
+{
+	TocEntry   *tail = l->par_prev;
+
+	tail->par_next = te;
+	te->par_prev = tail;
+	te->par_next = l;
+	l->par_prev = te;
+}
+
+/*
+ * Detach te from the parallel-processing list that currently holds it.
+ *
+ * The neighbours on either side are joined to each other, and te's own
+ * links are then cleared to NULL.  Both of te's links are dereferenced,
+ * so te must actually be on a list when this is called.
+ */
+static void
+par_list_remove(TocEntry *te)
+{
+	TocEntry   *before = te->par_prev;
+	TocEntry   *after = te->par_next;
+
+	before->par_next = after;
+	after->par_prev = before;
+
+	te->par_next = NULL;
+	te->par_prev = NULL;
+}
+
/*
* Find the next work item (if any) that is capable of being run now.
*
* To qualify, the item must have no remaining dependencies
- * and no requirement for locks that is incompatible with
- * items currently running.
+ * and no requirements for locks that are incompatible with
+ * items currently running. Items in the ready_list are known to have
+ * no remaining dependencies, but we have to check for lock conflicts.
*
- * first_unprocessed is state data that tracks the location of the first
- * TocEntry that's not marked 'restored'. This avoids O(N^2) search time
- * with long TOC lists. (Even though the constant is pretty small, it'd
- * get us eventually.)
+ * Note that the returned item has *not* been removed from ready_list.
+ * The caller must do that after successfully dispatching the item.
*
* pref_non_data is for an alternative selection algorithm that gives
* preference to non-data items if there is already a data load running.
* It is currently disabled.
*/
static TocEntry *
-get_next_work_item(ArchiveHandle *AH, TocEntry **first_unprocessed,
+get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
ParallelSlot *slots, int n_slots)
{
- bool pref_non_data = false; /* or get from AH->ropt */
- TocEntry *data_te = NULL;
- TocEntry *te;
- int i,k;
+ bool pref_non_data = false; /* or get from AH->ropt */
+ TocEntry *data_te = NULL;
+ TocEntry *te;
+ int i,
+ k;
/*
* Bogus heuristics for pref_non_data
*/
if (pref_non_data)
{
- int count = 0;
+ int count = 0;
- for (k=0; k < n_slots; k++)
+ for (k = 0; k < n_slots; k++)
if (slots[k].args->te != NULL &&
slots[k].args->te->section == SECTION_DATA)
count++;
}
/*
- * Advance first_unprocessed if possible.
+ * Search the ready_list until we find a suitable item.
*/
- for (te = *first_unprocessed; te != AH->toc; te = te->next)
+ for (te = ready_list->par_next; te != ready_list; te = te->par_next)
{
- if (!te->restored)
- break;
- }
- *first_unprocessed = te;
-
- /*
- * Search from first_unprocessed until we find an available item.
- */
- for (; te != AH->toc; te = te->next)
- {
- bool conflicts = false;
-
- /* Ignore if already done or still waiting on dependencies */
- if (te->restored || te->depCount > 0)
- continue;
+ bool conflicts = false;
/*
* Check to see if the item would need exclusive lock on something
- * that a currently running item also needs lock on, or vice versa.
- * If so, we don't want to schedule them together.
+ * that a currently running item also needs lock on, or vice versa. If
+ * so, we don't want to schedule them together.
*/
for (i = 0; i < n_slots && !conflicts; i++)
{
- TocEntry *running_te;
+ TocEntry *running_te;
if (slots[i].args == NULL)
continue;
if (data_te != NULL)
return data_te;
- ahlog(AH,2,"no item ready\n");
+ ahlog(AH, 2, "no item ready\n");
return NULL;
}
parallel_restore(RestoreArgs *args)
{
ArchiveHandle *AH = args->AH;
- TocEntry *te = args->te;
+ TocEntry *te = args->te;
RestoreOptions *ropt = AH->ropt;
- int retval;
+ int retval;
/*
- * Close and reopen the input file so we have a private file pointer
- * that doesn't stomp on anyone else's file pointer, if we're actually
- * going to need to read from the file. Otherwise, just close it
- * except on Windows, where it will possibly be needed by other threads.
+ * Close and reopen the input file so we have a private file pointer that
+ * doesn't stomp on anyone else's file pointer, if we're actually going to
+ * need to read from the file. Otherwise, just close it except on Windows,
+ * where it will possibly be needed by other threads.
*
- * Note: on Windows, since we are using threads not processes, the
- * reopen call *doesn't* close the original file pointer but just open
- * a new one.
+ * Note: on Windows, since we are using threads not processes, the reopen
+ * call *doesn't* close the original file pointer but just opens a new one.
*/
- if (te->section == SECTION_DATA )
+ if (te->section == SECTION_DATA)
(AH->ReopenPtr) (AH);
#ifndef WIN32
else
AH->connection = NULL;
/* If we reopened the file, we are done with it, so close it now */
- if (te->section == SECTION_DATA )
+ if (te->section == SECTION_DATA)
(AH->ClosePtr) (AH);
if (retval == 0 && AH->public.n_errors)
* update status, and reduce the dependency count of any dependent items.
*/
static void
-mark_work_done(ArchiveHandle *AH, thandle worker, int status,
+mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
+ thandle worker, int status,
ParallelSlot *slots, int n_slots)
{
- TocEntry *te = NULL;
- int i;
+ TocEntry *te = NULL;
+ int i;
for (i = 0; i < n_slots; i++)
{
die_horribly(AH, modulename, "worker process failed: exit code %d\n",
status);
- reduce_dependencies(AH, te);
+ reduce_dependencies(AH, te, ready_list);
}
/*
* Process the dependency information into a form useful for parallel restore.
*
- * We set up depCount fields that are the number of as-yet-unprocessed
+ * This function takes care of fixing up some missing or badly designed
+ * dependencies, and then prepares subsidiary data structures that will be
+ * used in the main parallel-restore logic, including:
+ * 1. We build the tocsByDumpId[] index array.
+ * 2. We build the revDeps[] arrays of incoming dependency dumpIds.
+ * 3. We set up depCount fields that are the number of as-yet-unprocessed
* dependencies for each TOC entry.
*
* We also identify locking dependencies so that we can avoid trying to
static void
fix_dependencies(ArchiveHandle *AH)
{
- TocEntry **tocsByDumpId;
TocEntry *te;
- int i;
+ int i;
/*
- * For some of the steps here, it is convenient to have an array that
- * indexes the TOC entries by dump ID, rather than searching the TOC
- * list repeatedly. Entries for dump IDs not present in the TOC will
- * be NULL.
+ * It is convenient to have an array that indexes the TOC entries by dump
+ * ID, rather than searching the TOC list repeatedly. Entries for dump
+ * IDs not present in the TOC will be NULL.
+ *
+ * NOTE: because maxDumpId is just the highest dump ID defined in the
+ * archive, there might be dependencies for IDs > maxDumpId. All uses of
+ * this array must guard against out-of-range dependency numbers.
*
- * Also, initialize the depCount fields.
+ * Also, initialize the depCount/revDeps/nRevDeps fields, and make sure
+ * the TOC items are marked as not being in any parallel-processing list.
*/
- tocsByDumpId = (TocEntry **) calloc(AH->maxDumpId, sizeof(TocEntry *));
+ maxDumpId = AH->maxDumpId;
+ tocsByDumpId = (TocEntry **) calloc(maxDumpId, sizeof(TocEntry *));
for (te = AH->toc->next; te != AH->toc; te = te->next)
{
tocsByDumpId[te->dumpId - 1] = te;
te->depCount = te->nDeps;
+ te->revDeps = NULL;
+ te->nRevDeps = 0;
+ te->par_prev = NULL;
+ te->par_next = NULL;
}
/*
* dependencies.
*
* Note: currently, a TABLE DATA should always have exactly one
- * dependency, on its TABLE item. So we don't bother to search,
- * but look just at the first dependency. We do trouble to make sure
- * that it's a TABLE, if possible. However, if the dependency isn't
- * in the archive then just assume it was a TABLE; this is to cover
- * cases where the table was suppressed but we have the data and some
- * dependent post-data items.
+ * dependency, on its TABLE item. So we don't bother to search, but look
+ * just at the first dependency. We do trouble to make sure that it's a
+ * TABLE, if possible. However, if the dependency isn't in the archive
+ * then just assume it was a TABLE; this is to cover cases where the table
+ * was suppressed but we have the data and some dependent post-data items.
+ *
+ * XXX this is O(N^2) if there are a lot of tables. We ought to fix
+ * pg_dump to produce correctly-linked dependencies in the first place.
*/
for (te = AH->toc->next; te != AH->toc; te = te->next)
{
if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
{
- DumpId tableId = te->dependencies[0];
+ DumpId tableId = te->dependencies[0];
- if (tocsByDumpId[tableId - 1] == NULL ||
+ if (tableId > maxDumpId ||
+ tocsByDumpId[tableId - 1] == NULL ||
strcmp(tocsByDumpId[tableId - 1]->desc, "TABLE") == 0)
{
repoint_table_dependencies(AH, tableId, te->dumpId);
}
/*
- * Pre-8.4 versions of pg_dump neglected to set up a dependency from
- * BLOB COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS
- * and only one BLOB COMMENTS in such files.)
+ * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
+ * COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only
+ * one BLOB COMMENTS in such files.)
*/
if (AH->version < K_VERS_1_11)
{
}
/*
- * It is possible that the dependencies list items that are not in the
- * archive at all. Subtract such items from the depCounts.
+ * At this point we start to build the revDeps reverse-dependency arrays,
+ * so all changes of dependencies must be complete.
+ */
+
+ /*
+ * Count the incoming dependencies for each item. Also, it is possible
+ * that the dependency lists mention items that are not in the archive
+ * at all. Subtract such items from the depCounts.
*/
for (te = AH->toc->next; te != AH->toc; te = te->next)
{
for (i = 0; i < te->nDeps; i++)
{
- if (tocsByDumpId[te->dependencies[i] - 1] == NULL)
+ DumpId depid = te->dependencies[i];
+
+ if (depid <= maxDumpId && tocsByDumpId[depid - 1] != NULL)
+ tocsByDumpId[depid - 1]->nRevDeps++;
+ else
te->depCount--;
}
}
+ /*
+ * Allocate space for revDeps[] arrays, and reset nRevDeps so we can
+ * use it as a counter below.
+ */
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ if (te->nRevDeps > 0)
+ te->revDeps = (DumpId *) malloc(te->nRevDeps * sizeof(DumpId));
+ te->nRevDeps = 0;
+ }
+
+ /*
+ * Build the revDeps[] arrays of incoming-dependency dumpIds. This
+ * had better agree with the loops above.
+ */
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ for (i = 0; i < te->nDeps; i++)
+ {
+ DumpId depid = te->dependencies[i];
+
+ if (depid <= maxDumpId && tocsByDumpId[depid - 1] != NULL)
+ {
+ TocEntry *otherte = tocsByDumpId[depid - 1];
+
+ otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
+ }
+ }
+ }
+
/*
* Lastly, work out the locking dependencies.
*/
{
te->lockDeps = NULL;
te->nLockDeps = 0;
- identify_locking_dependencies(te, tocsByDumpId);
+ identify_locking_dependencies(te);
}
-
- free(tocsByDumpId);
}
/*
DumpId tableId, DumpId tableDataId)
{
TocEntry *te;
- int i;
+ int i;
for (te = AH->toc->next; te != AH->toc; te = te->next)
{
* Identify which objects we'll need exclusive lock on in order to restore
* the given TOC entry (*other* than the one identified by the TOC entry
* itself). Record their dump IDs in the entry's lockDeps[] array.
- * tocsByDumpId[] is a convenience array to avoid searching the TOC
- * for each dependency.
*/
static void
-identify_locking_dependencies(TocEntry *te, TocEntry **tocsByDumpId)
+identify_locking_dependencies(TocEntry *te)
{
DumpId *lockids;
int nlockids;
/*
* We assume the item requires exclusive lock on each TABLE DATA item
- * listed among its dependencies. (This was originally a dependency
- * on the TABLE, but fix_dependencies repointed it to the data item.
- * Note that all the entry types we are interested in here are POST_DATA,
- * so they will all have been changed this way.)
+ * listed among its dependencies. (This was originally a dependency on
+ * the TABLE, but fix_dependencies repointed it to the data item. Note
+ * that all the entry types we are interested in here are POST_DATA, so
+ * they will all have been changed this way.)
*/
lockids = (DumpId *) malloc(te->nDeps * sizeof(DumpId));
nlockids = 0;
for (i = 0; i < te->nDeps; i++)
{
- DumpId depid = te->dependencies[i];
+ DumpId depid = te->dependencies[i];
- if (tocsByDumpId[depid - 1] &&
+ if (depid <= maxDumpId && tocsByDumpId[depid - 1] &&
strcmp(tocsByDumpId[depid - 1]->desc, "TABLE DATA") == 0)
lockids[nlockids++] = depid;
}
/*
* Remove the specified TOC entry from the depCounts of items that depend on
- * it, thereby possibly making them ready-to-run.
+ * it, thereby possibly making them ready-to-run. Any pending item that
+ * becomes ready should be moved to the ready list.
*/
static void
-reduce_dependencies(ArchiveHandle *AH, TocEntry *te)
+reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
{
- DumpId target = te->dumpId;
- int i;
+ int i;
- ahlog(AH,2,"reducing dependencies for %d\n",target);
+ ahlog(AH, 2, "reducing dependencies for %d\n", te->dumpId);
- /*
- * We must examine all entries, not only the ones after the target item,
- * because if the user used a -L switch then the original dependency-
- * respecting order has been destroyed by SortTocFromFile.
- */
- for (te = AH->toc->next; te != AH->toc; te = te->next)
+ for (i = 0; i < te->nRevDeps; i++)
{
- for (i = 0; i < te->nDeps; i++)
+ TocEntry *otherte = tocsByDumpId[te->revDeps[i] - 1];
+
+ otherte->depCount--;
+ if (otherte->depCount == 0 && otherte->par_prev != NULL)
{
- if (te->dependencies[i] == target)
- te->depCount--;
+ /* It must be in the pending list, so remove it ... */
+ par_list_remove(otherte);
+ /* ... and add to ready_list */
+ par_list_append(ready_list, otherte);
}
}
}
ArchiveHandle *clone;
/* Make a "flat" copy */
- clone = (ArchiveHandle *) malloc(sizeof(ArchiveHandle));
+ clone = (ArchiveHandle *) malloc(sizeof(ArchiveHandle));
if (clone == NULL)
die_horribly(AH, modulename, "out of memory\n");
memcpy(clone, AH, sizeof(ArchiveHandle));