]> granicus.if.org Git - postgresql/blobdiff - src/bin/pg_dump/pg_backup_archiver.c
Basic foreign table support.
[postgresql] / src / bin / pg_dump / pg_backup_archiver.c
index 20bd3eb7eacd1b0b24228024b69a64dc212e5329..64d8d93dda81805540ac70dbd7afcf11390c19fc 100644 (file)
@@ -15,7 +15,7 @@
  *
  *
  * IDENTIFICATION
- *             $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.175 2009/08/07 22:48:34 tgl Exp $
+ *             src/bin/pg_dump/pg_backup_archiver.c
  *
  *-------------------------------------------------------------------------
  */
@@ -79,6 +79,10 @@ const char *progname;
 
 static const char *modulename = gettext_noop("archiver");
 
+/* index array created by fix_dependencies -- only used in parallel restore */
+static TocEntry          **tocsByDumpId;                       /* index by dumpId - 1 */
+static DumpId          maxDumpId;                              /* length of above array */
+
 
 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
                 const int compression, ArchiveMode mode);
@@ -98,10 +102,11 @@ static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
 static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
 static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
 static teReqs _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls);
+static bool _tocEntryIsACL(TocEntry *te);
 static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
 static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
 static TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id);
-static void _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
+static void _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
 static int     _discoverArchiveFormat(ArchiveHandle *AH);
 
 static void dump_lo_buf(ArchiveHandle *AH);
@@ -133,10 +138,9 @@ static void fix_dependencies(ArchiveHandle *AH);
 static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
 static void repoint_table_dependencies(ArchiveHandle *AH,
                                                   DumpId tableId, DumpId tableDataId);
-static void identify_locking_dependencies(TocEntry *te,
-                                                         TocEntry **tocsByDumpId);
+static void identify_locking_dependencies(TocEntry *te);
 static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
-                                                               TocEntry *ready_list);
+                                       TocEntry *ready_list);
 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
 static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
 static ArchiveHandle *CloneArchive(ArchiveHandle *AH);
@@ -209,19 +213,19 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
        /*
         * Check for nonsensical option combinations.
         *
-        * NB: create+dropSchema is useless because if you're creating the DB,
+        * NB: createDB+dropSchema is useless because if you're creating the DB,
         * there's no need to drop individual items in it.  Moreover, if we tried
         * to do that then we'd issue the drops in the database initially
         * connected to, not the one we will create, which is very bad...
         */
-       if (ropt->create && ropt->dropSchema)
+       if (ropt->createDB && ropt->dropSchema)
                die_horribly(AH, modulename, "-C and -c are incompatible options\n");
 
        /*
-        * -1 is not compatible with -C, because we can't create a database inside
+        * -C is not compatible with -1, because we can't create a database inside
         * a transaction block.
         */
-       if (ropt->create && ropt->single_txn)
+       if (ropt->createDB && ropt->single_txn)
                die_horribly(AH, modulename, "-C and -1 are incompatible options\n");
 
        /*
@@ -301,7 +305,15 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
        ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
 
        if (AH->public.verbose)
+       {
+               if (AH->archiveRemoteVersion)
+                       ahprintf(AH, "-- Dumped from database version %s\n",
+                                        AH->archiveRemoteVersion);
+               if (AH->archiveDumpVersion)
+                       ahprintf(AH, "-- Dumped by pg_dump version %s\n",
+                                        AH->archiveDumpVersion);
                dumpTimestamp(AH, "Started on", AH->createDate);
+       }
 
        if (ropt->single_txn)
        {
@@ -328,9 +340,9 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
                        AH->currentTE = te;
 
                        reqs = _tocEntryRequired(te, ropt, false /* needn't drop ACLs */ );
-                       if (((reqs & REQ_SCHEMA) != 0) && te->dropStmt)
+                       /* We want anything that's selected and has a dropStmt */
+                       if (((reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
                        {
-                               /* We want the schema */
                                ahlog(AH, 1, "dropping %s %s\n", te->desc, te->tag);
                                /* Select owner and schema as necessary */
                                _becomeOwner(AH, te);
@@ -380,7 +392,8 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
                /* Work out what, if anything, we want from this entry */
                reqs = _tocEntryRequired(te, ropt, true);
 
-               if ((reqs & REQ_SCHEMA) != 0)   /* We want the schema */
+               /* Both schema and data objects might now have ownership/ACLs */
+               if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0)
                {
                        ahlog(AH, 1, "setting owner and privileges for %s %s\n",
                                  te->desc, te->tag);
@@ -804,6 +817,9 @@ PrintTOCSummary(Archive *AHX, RestoreOptions *ropt)
 
        ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
 
+       /* We should print DATABASE entries whether or not -C was specified */
+       ropt->createDB = 1;
+
        for (te = AH->toc->next; te != AH->toc; te = te->next)
        {
                if (ropt->verbose || _tocEntryRequired(te, ropt, true) != 0)
@@ -904,6 +920,7 @@ EndRestoreBlobs(ArchiveHandle *AH)
 void
 StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
 {
+       bool            old_blob_style = (AH->version < K_VERS_1_12);
        Oid                     loOid;
 
        AH->blobCount++;
@@ -913,25 +930,32 @@ StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
 
        ahlog(AH, 2, "restoring large object with OID %u\n", oid);
 
-       if (drop)
-               ahprintf(AH, "SELECT CASE WHEN EXISTS(SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u') THEN pg_catalog.lo_unlink('%u') END;\n",
-                                oid, oid);
+       /* With an old archive we must do drop and create logic here */
+       if (old_blob_style && drop)
+               DropBlobIfExists(AH, oid);
 
        if (AH->connection)
        {
-               loOid = lo_create(AH->connection, oid);
-               if (loOid == 0 || loOid != oid)
-                       die_horribly(AH, modulename, "could not create large object %u\n",
-                                                oid);
-
+               if (old_blob_style)
+               {
+                       loOid = lo_create(AH->connection, oid);
+                       if (loOid == 0 || loOid != oid)
+                               die_horribly(AH, modulename, "could not create large object %u: %s",
+                                                        oid, PQerrorMessage(AH->connection));
+               }
                AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
                if (AH->loFd == -1)
-                       die_horribly(AH, modulename, "could not open large object\n");
+                       die_horribly(AH, modulename, "could not open large object %u: %s",
+                                                oid, PQerrorMessage(AH->connection));
        }
        else
        {
-               ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
-                                oid, INV_WRITE);
+               if (old_blob_style)
+                       ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
+                                        oid, INV_WRITE);
+               else
+                       ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
+                                        oid, INV_WRITE);
        }
 
        AH->writingBlob = 1;
@@ -973,15 +997,11 @@ SortTocFromFile(Archive *AHX, RestoreOptions *ropt)
        char       *endptr;
        DumpId          id;
        TocEntry   *te;
-       TocEntry   *tePrev;
 
        /* Allocate space for the 'wanted' array, and init it */
        ropt->idWanted = (bool *) malloc(sizeof(bool) * AH->maxDumpId);
        memset(ropt->idWanted, 0, sizeof(bool) * AH->maxDumpId);
 
-       /* Set prev entry as head of list */
-       tePrev = AH->toc;
-
        /* Setup the file */
        fh = fopen(ropt->tocFile, PG_BINARY_R);
        if (!fh)
@@ -996,7 +1016,7 @@ SortTocFromFile(Archive *AHX, RestoreOptions *ropt)
                        cmnt[0] = '\0';
 
                /* Ignore if all blank */
-               if (strspn(buf, " \t\r") == strlen(buf))
+               if (strspn(buf, " \t\r\n") == strlen(buf))
                        continue;
 
                /* Get an ID, check it's valid and not already seen */
@@ -1014,10 +1034,21 @@ SortTocFromFile(Archive *AHX, RestoreOptions *ropt)
                        die_horribly(AH, modulename, "could not find entry for ID %d\n",
                                                 id);
 
+               /* Mark it wanted */
                ropt->idWanted[id - 1] = true;
 
-               _moveAfter(AH, tePrev, te);
-               tePrev = te;
+               /*
+                * Move each item to the end of the list as it is selected, so that
+                * they are placed in the desired order.  Any unwanted items will end
+                * up at the front of the list, which may seem unintuitive but it's
+                * what we need.  In an ordinary serial restore that makes no
+                * difference, but in a parallel restore we need to mark unrestored
+                * items' dependencies as satisfied before we start examining
+                * restorable items.  Otherwise they could have surprising
+                * side-effects on the order in which restorable items actually get
+                * restored.
+                */
+               _moveBefore(AH, AH->toc, te);
        }
 
        if (fclose(fh) != 0)
@@ -1449,33 +1480,37 @@ warn_or_die_horribly(ArchiveHandle *AH,
        va_end(ap);
 }
 
+#ifdef NOT_USED
+
 static void
 _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
 {
+       /* Unlink te from list */
        te->prev->next = te->next;
        te->next->prev = te->prev;
 
+       /* and insert it after "pos" */
        te->prev = pos;
        te->next = pos->next;
-
        pos->next->prev = te;
        pos->next = te;
 }
 
-#ifdef NOT_USED
+#endif
 
 static void
 _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
 {
+       /* Unlink te from list */
        te->prev->next = te->next;
        te->next->prev = te->prev;
 
+       /* and insert it before "pos" */
        te->prev = pos->prev;
        te->next = pos;
        pos->prev->next = te;
        pos->prev = te;
 }
-#endif
 
 static TocEntry *
 getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
@@ -1733,6 +1768,11 @@ _discoverArchiveFormat(ArchiveHandle *AH)
 
        if (strncmp(sig, "PGDMP", 5) == 0)
        {
+               /*
+                * Finish reading (most of) a custom-format header.
+                *
+                * NB: this code must agree with ReadHead().
+                */
                AH->vmaj = fgetc(fh);
                AH->vmin = fgetc(fh);
 
@@ -1829,6 +1869,9 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
        AH->vmin = K_VERS_MINOR;
        AH->vrev = K_VERS_REV;
 
+       /* Make a convenient integer <maj><min><rev>00 */
+       AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
+
        /* initialize for backwards compatible string processing */
        AH->public.encoding = 0;        /* PG_SQL_ASCII */
        AH->public.std_strings = false;
@@ -1837,6 +1880,8 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
        AH->public.exit_on_error = true;
        AH->public.n_errors = 0;
 
+       AH->archiveDumpVersion = PG_VERSION;
+
        AH->createDate = time(NULL);
 
        AH->intSize = sizeof(int);
@@ -2068,11 +2113,13 @@ ReadToc(ArchiveHandle *AH)
                else
                {
                        /*
-                        * rules for pre-8.4 archives wherein pg_dump hasn't classified
-                        * the entries into sections
+                        * Rules for pre-8.4 archives wherein pg_dump hasn't classified
+                        * the entries into sections.  This list need not cover entry
+                        * types added later than 8.4.
                         */
                        if (strcmp(te->desc, "COMMENT") == 0 ||
-                               strcmp(te->desc, "ACL") == 0)
+                               strcmp(te->desc, "ACL") == 0 ||
+                               strcmp(te->desc, "ACL LANGUAGE") == 0)
                                te->section = SECTION_NONE;
                        else if (strcmp(te->desc, "TABLE DATA") == 0 ||
                                         strcmp(te->desc, "BLOBS") == 0 ||
@@ -2227,10 +2274,15 @@ _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls)
                return 0;
 
        /* If it's an ACL, maybe ignore it */
-       if ((!include_acls || ropt->aclsSkip) && strcmp(te->desc, "ACL") == 0)
+       if ((!include_acls || ropt->aclsSkip) && _tocEntryIsACL(te))
+               return 0;
+
+       /* If it's security labels, maybe ignore it */
+       if (ropt->skip_seclabel && strcmp(te->desc, "SECURITY LABEL") == 0)
                return 0;
 
-       if (!ropt->create && strcmp(te->desc, "DATABASE") == 0)
+       /* Ignore DATABASE entry unless we should create it */
+       if (!ropt->createDB && strcmp(te->desc, "DATABASE") == 0)
                return 0;
 
        /* Check options for selective dump/restore */
@@ -2284,9 +2336,20 @@ _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls)
        if (!te->hadDumper)
        {
                /*
-                * Special Case: If 'SEQUENCE SET' then it is considered a data entry
+                * Special Case: If 'SEQUENCE SET' or anything to do with BLOBs, then
+                * it is considered a data entry.  We don't need to check for the
+                * BLOBS entry or old-style BLOB COMMENTS, because they will have
+                * hadDumper = true ... but we do need to check new-style BLOB
+                * comments.
                 */
-               if (strcmp(te->desc, "SEQUENCE SET") == 0)
+               if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
+                       strcmp(te->desc, "BLOB") == 0 ||
+                       (strcmp(te->desc, "ACL") == 0 &&
+                        strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
+                       (strcmp(te->desc, "COMMENT") == 0 &&
+                        strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
+                       (strcmp(te->desc, "SECURITY LABEL") == 0 &&
+                        strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
                        res = res & REQ_DATA;
                else
                        res = res & ~REQ_DATA;
@@ -2318,6 +2381,20 @@ _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls)
        return res;
 }
 
+/*
+ * Identify TOC entries that are ACLs.
+ */
+static bool
+_tocEntryIsACL(TocEntry *te)
+{
+       /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
+       if (strcmp(te->desc, "ACL") == 0 ||
+               strcmp(te->desc, "ACL LANGUAGE") == 0 ||
+               strcmp(te->desc, "DEFAULT ACL") == 0)
+               return true;
+       return false;
+}
+
 /*
  * Issue SET commands for parameters that we want to have set the same way
  * at all times during execution of a restore script.
@@ -2495,7 +2572,7 @@ _becomeUser(ArchiveHandle *AH, const char *user)
 }
 
 /*
- * Become the owner of the the given TOC entry object. If
+ * Become the owner of the given TOC entry object.     If
  * changes in ownership are not allowed, this doesn't do anything.
  */
 static void
@@ -2649,6 +2726,7 @@ _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
                strcmp(type, "DOMAIN") == 0 ||
                strcmp(type, "TABLE") == 0 ||
                strcmp(type, "TYPE") == 0 ||
+               strcmp(type, "FOREIGN TABLE") == 0 ||
                strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
                strcmp(type, "TEXT SEARCH CONFIGURATION") == 0)
        {
@@ -2683,6 +2761,13 @@ _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
                return;
        }
 
+       /* BLOBs just have a name, but it's numeric so must not use fmtId */
+       if (strcmp(type, "BLOB") == 0)
+       {
+               appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
+               return;
+       }
+
        /*
         * These object types require additional decoration.  Fortunately, the
         * information needed is exactly what's in the DROP command.
@@ -2721,12 +2806,12 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat
        /* ACLs are dumped only during acl pass */
        if (acl_pass)
        {
-               if (strcmp(te->desc, "ACL") != 0)
+               if (!_tocEntryIsACL(te))
                        return;
        }
        else
        {
-               if (strcmp(te->desc, "ACL") == 0)
+               if (_tocEntryIsACL(te))
                        return;
        }
 
@@ -2820,6 +2905,7 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat
                strlen(te->owner) > 0 && strlen(te->dropStmt) > 0)
        {
                if (strcmp(te->desc, "AGGREGATE") == 0 ||
+                       strcmp(te->desc, "BLOB") == 0 ||
                        strcmp(te->desc, "CONVERSION") == 0 ||
                        strcmp(te->desc, "DATABASE") == 0 ||
                        strcmp(te->desc, "DOMAIN") == 0 ||
@@ -2833,6 +2919,7 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat
                        strcmp(te->desc, "TYPE") == 0 ||
                        strcmp(te->desc, "VIEW") == 0 ||
                        strcmp(te->desc, "SEQUENCE") == 0 ||
+                       strcmp(te->desc, "FOREIGN TABLE") == 0 ||
                        strcmp(te->desc, "TEXT SEARCH DICTIONARY") == 0 ||
                        strcmp(te->desc, "TEXT SEARCH CONFIGURATION") == 0 ||
                        strcmp(te->desc, "FOREIGN DATA WRAPPER") == 0 ||
@@ -2869,7 +2956,7 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat
         * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
         * commands, so we can no longer assume we know the current auth setting.
         */
-       if (strncmp(te->desc, "ACL", 3) == 0)
+       if (acl_pass)
        {
                if (AH->currUser)
                        free(AH->currUser);
@@ -2920,7 +3007,12 @@ ReadHead(ArchiveHandle *AH)
        int                     fmt;
        struct tm       crtm;
 
-       /* If we haven't already read the header... */
+       /*
+        * If we haven't already read the header, do so.
+        *
+        * NB: this code must agree with _discoverArchiveFormat().      Maybe find a
+        * way to unify the cases?
+        */
        if (!AH->readHeader)
        {
                if ((*AH->ReadBufPtr) (AH, tmpMag, 5) != 5)
@@ -2939,7 +3031,6 @@ ReadHead(ArchiveHandle *AH)
 
                AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
 
-
                if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
                        die_horribly(AH, modulename, "unsupported version (%d.%d) in file header\n",
                                                 AH->vmaj, AH->vmin);
@@ -3002,33 +3093,42 @@ ReadHead(ArchiveHandle *AH)
                AH->archiveRemoteVersion = ReadStr(AH);
                AH->archiveDumpVersion = ReadStr(AH);
        }
-
 }
 
 
 /*
  * checkSeek
- *       check to see if fseek can be performed.
+ *       check to see if ftell/fseek can be performed.
  */
 bool
 checkSeek(FILE *fp)
 {
-       if (fseeko(fp, 0, SEEK_CUR) != 0)
-               return false;
-       else if (sizeof(pgoff_t) > sizeof(long))
-       {
-               /*
-                * At this point, pgoff_t is too large for long, so we return based on
-                * whether an pgoff_t version of fseek is available.
-                */
-#ifdef HAVE_FSEEKO
-               return true;
-#else
+       pgoff_t         tpos;
+
+       /*
+        * If pgoff_t is wider than long, we must have "real" fseeko and not an
+        * emulation using fseek.  Otherwise report no seek capability.
+        */
+#ifndef HAVE_FSEEKO
+       if (sizeof(pgoff_t) > sizeof(long))
                return false;
 #endif
-       }
-       else
-               return true;
+
+       /* Check that ftello works on this file */
+       errno = 0;
+       tpos = ftello(fp);
+       if (errno)
+               return false;
+
+       /*
+        * Check that fseeko(SEEK_SET) works, too.      NB: we used to try to test
+        * this with fseeko(fp, 0, SEEK_CUR).  But some platforms treat that as a
+        * successful no-op even on files that are otherwise unseekable.
+        */
+       if (fseeko(fp, tpos, SEEK_SET) != 0)
+               return false;
+
+       return true;
 }
 
 
@@ -3088,6 +3188,10 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
        if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
                die_horribly(AH, modulename, "parallel restore is not supported with this archive file format\n");
 
+       /* doesn't work if the archive represents dependencies as OIDs, either */
+       if (AH->version < K_VERS_1_8)
+               die_horribly(AH, modulename, "parallel restore is not supported with archives made by pre-8.0 pg_dump\n");
+
        slots = (ParallelSlot *) calloc(sizeof(ParallelSlot), n_slots);
 
        /* Adjust dependency information */
@@ -3097,13 +3201,16 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
         * Do all the early stuff in a single connection in the parent. There's no
         * great point in running it in parallel, in fact it will actually run
         * faster in a single connection because we avoid all the connection and
-        * setup overhead.
+        * setup overhead.  Also, pg_dump is not currently very good about
+        * showing all the dependencies of SECTION_PRE_DATA items, so we do not
+        * risk trying to process them out-of-order.
         */
        for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
        {
+               /* Non-PRE_DATA items are just ignored for now */
                if (next_work_item->section == SECTION_DATA ||
                        next_work_item->section == SECTION_POST_DATA)
-                       break;
+                       continue;
 
                ahlog(AH, 1, "processing item %d %s %s\n",
                          next_work_item->dumpId,
@@ -3136,22 +3243,27 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
        AH->currWithOids = -1;
 
        /*
-        * Initialize the lists of pending and ready items.  After this setup,
-        * the pending list is everything that needs to be done but is blocked
-        * by one or more dependencies, while the ready list contains items that
-        * have no remaining dependencies.  Note: we don't yet filter out entries
-        * that aren't going to be restored.  They might participate in
-        * dependency chains connecting entries that should be restored, so we
-        * treat them as live until we actually process them.
+        * Initialize the lists of pending and ready items.  After this setup, the
+        * pending list is everything that needs to be done but is blocked by one
+        * or more dependencies, while the ready list contains items that have no
+        * remaining dependencies.      Note: we don't yet filter out entries that
+        * aren't going to be restored.  They might participate in dependency
+        * chains connecting entries that should be restored, so we treat them as
+        * live until we actually process them.
         */
        par_list_header_init(&pending_list);
        par_list_header_init(&ready_list);
-       for (; next_work_item != AH->toc; next_work_item = next_work_item->next)
+       for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
        {
-               if (next_work_item->depCount > 0)
-                       par_list_append(&pending_list, next_work_item);
-               else
-                       par_list_append(&ready_list, next_work_item);
+               /* All PRE_DATA items were dealt with above */
+               if (next_work_item->section == SECTION_DATA ||
+                       next_work_item->section == SECTION_POST_DATA)
+               {
+                       if (next_work_item->depCount > 0)
+                               par_list_append(&pending_list, next_work_item);
+                       else
+                               par_list_append(&ready_list, next_work_item);
+               }
        }
 
        /*
@@ -3635,7 +3747,12 @@ mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
 /*
  * Process the dependency information into a form useful for parallel restore.
  *
- * We set up depCount fields that are the number of as-yet-unprocessed
+ * This function takes care of fixing up some missing or badly designed
+ * dependencies, and then prepares subsidiary data structures that will be
+ * used in the main parallel-restore logic, including:
+ * 1. We build the tocsByDumpId[] index array.
+ * 2. We build the revDeps[] arrays of incoming dependency dumpIds.
+ * 3. We set up depCount fields that are the number of as-yet-unprocessed
  * dependencies for each TOC entry.
  *
  * We also identify locking dependencies so that we can avoid trying to
@@ -3644,23 +3761,29 @@ mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
 static void
 fix_dependencies(ArchiveHandle *AH)
 {
-       TocEntry  **tocsByDumpId;
        TocEntry   *te;
        int                     i;
 
        /*
-        * For some of the steps here, it is convenient to have an array that
-        * indexes the TOC entries by dump ID, rather than searching the TOC list
-        * repeatedly.  Entries for dump IDs not present in the TOC will be NULL.
+        * It is convenient to have an array that indexes the TOC entries by dump
+        * ID, rather than searching the TOC list repeatedly.  Entries for dump
+        * IDs not present in the TOC will be NULL.
+        *
+        * NOTE: because maxDumpId is just the highest dump ID defined in the
+        * archive, there might be dependencies for IDs > maxDumpId.  All uses of
+        * this array must guard against out-of-range dependency numbers.
         *
-        * Also, initialize the depCount fields, and make sure all the TOC items
-        * are marked as not being in any parallel-processing list.
+        * Also, initialize the depCount/revDeps/nRevDeps fields, and make sure
+        * the TOC items are marked as not being in any parallel-processing list.
         */
-       tocsByDumpId = (TocEntry **) calloc(AH->maxDumpId, sizeof(TocEntry *));
+       maxDumpId = AH->maxDumpId;
+       tocsByDumpId = (TocEntry **) calloc(maxDumpId, sizeof(TocEntry *));
        for (te = AH->toc->next; te != AH->toc; te = te->next)
        {
                tocsByDumpId[te->dumpId - 1] = te;
                te->depCount = te->nDeps;
+               te->revDeps = NULL;
+               te->nRevDeps = 0;
                te->par_prev = NULL;
                te->par_next = NULL;
        }
@@ -3678,6 +3801,9 @@ fix_dependencies(ArchiveHandle *AH)
         * TABLE, if possible.  However, if the dependency isn't in the archive
         * then just assume it was a TABLE; this is to cover cases where the table
         * was suppressed but we have the data and some dependent post-data items.
+        *
+        * XXX this is O(N^2) if there are a lot of tables.  We ought to fix
+        * pg_dump to produce correctly-linked dependencies in the first place.
         */
        for (te = AH->toc->next; te != AH->toc; te = te->next)
        {
@@ -3685,7 +3811,8 @@ fix_dependencies(ArchiveHandle *AH)
                {
                        DumpId          tableId = te->dependencies[0];
 
-                       if (tocsByDumpId[tableId - 1] == NULL ||
+                       if (tableId > maxDumpId ||
+                               tocsByDumpId[tableId - 1] == NULL ||
                                strcmp(tocsByDumpId[tableId - 1]->desc, "TABLE") == 0)
                        {
                                repoint_table_dependencies(AH, tableId, te->dumpId);
@@ -3723,18 +3850,58 @@ fix_dependencies(ArchiveHandle *AH)
        }
 
        /*
-        * It is possible that the dependencies list items that are not in the
-        * archive at all.      Subtract such items from the depCounts.
+        * At this point we start to build the revDeps reverse-dependency arrays,
+        * so all changes of dependencies must be complete.
+        */
+
+       /*
+        * Count the incoming dependencies for each item.  Also, it is possible
+        * that the dependencies list items that are not in the archive at
+        * all.  Subtract such items from the depCounts.
         */
        for (te = AH->toc->next; te != AH->toc; te = te->next)
        {
                for (i = 0; i < te->nDeps; i++)
                {
-                       if (tocsByDumpId[te->dependencies[i] - 1] == NULL)
+                       DumpId          depid = te->dependencies[i];
+
+                       if (depid <= maxDumpId && tocsByDumpId[depid - 1] != NULL)
+                               tocsByDumpId[depid - 1]->nRevDeps++;
+                       else
                                te->depCount--;
                }
        }
 
+       /*
+        * Allocate space for revDeps[] arrays, and reset nRevDeps so we can
+        * use it as a counter below.
+        */
+       for (te = AH->toc->next; te != AH->toc; te = te->next)
+       {
+               if (te->nRevDeps > 0)
+                       te->revDeps = (DumpId *) malloc(te->nRevDeps * sizeof(DumpId));
+               te->nRevDeps = 0;
+       }
+
+       /*
+        * Build the revDeps[] arrays of incoming-dependency dumpIds.  This
+        * had better agree with the loops above.
+        */
+       for (te = AH->toc->next; te != AH->toc; te = te->next)
+       {
+               for (i = 0; i < te->nDeps; i++)
+               {
+                       DumpId          depid = te->dependencies[i];
+
+                       if (depid <= maxDumpId && tocsByDumpId[depid - 1] != NULL)
+                       {
+                               TocEntry   *otherte = tocsByDumpId[depid - 1];
+
+                               otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
+                       }
+               }
+       }
+
        /*
         * Lastly, work out the locking dependencies.
         */
@@ -3742,10 +3909,8 @@ fix_dependencies(ArchiveHandle *AH)
        {
                te->lockDeps = NULL;
                te->nLockDeps = 0;
-               identify_locking_dependencies(te, tocsByDumpId);
+               identify_locking_dependencies(te);
        }
-
-       free(tocsByDumpId);
 }
 
 /*
@@ -3779,11 +3944,9 @@ repoint_table_dependencies(ArchiveHandle *AH,
  * Identify which objects we'll need exclusive lock on in order to restore
  * the given TOC entry (*other* than the one identified by the TOC entry
  * itself).  Record their dump IDs in the entry's lockDeps[] array.
- * tocsByDumpId[] is a convenience array to avoid searching the TOC
- * for each dependency.
  */
 static void
-identify_locking_dependencies(TocEntry *te, TocEntry **tocsByDumpId)
+identify_locking_dependencies(TocEntry *te)
 {
        DumpId     *lockids;
        int                     nlockids;
@@ -3814,7 +3977,7 @@ identify_locking_dependencies(TocEntry *te, TocEntry **tocsByDumpId)
        {
                DumpId          depid = te->dependencies[i];
 
-               if (tocsByDumpId[depid - 1] &&
+               if (depid <= maxDumpId && tocsByDumpId[depid - 1] &&
                        strcmp(tocsByDumpId[depid - 1]->desc, "TABLE DATA") == 0)
                        lockids[nlockids++] = depid;
        }
@@ -3837,31 +4000,21 @@ identify_locking_dependencies(TocEntry *te, TocEntry **tocsByDumpId)
 static void
 reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
 {
-       DumpId          target = te->dumpId;
        int                     i;
 
-       ahlog(AH, 2, "reducing dependencies for %d\n", target);
+       ahlog(AH, 2, "reducing dependencies for %d\n", te->dumpId);
 
-       /*
-        * We must examine all entries, not only the ones after the target item,
-        * because if the user used a -L switch then the original dependency-
-        * respecting order has been destroyed by SortTocFromFile.
-        */
-       for (te = AH->toc->next; te != AH->toc; te = te->next)
+       for (i = 0; i < te->nRevDeps; i++)
        {
-               for (i = 0; i < te->nDeps; i++)
+               TocEntry   *otherte = tocsByDumpId[te->revDeps[i] - 1];
+
+               otherte->depCount--;
+               if (otherte->depCount == 0 && otherte->par_prev != NULL)
                {
-                       if (te->dependencies[i] == target)
-                       {
-                               te->depCount--;
-                               if (te->depCount == 0 && te->par_prev != NULL)
-                               {
-                                       /* It must be in the pending list, so remove it ... */
-                                       par_list_remove(te);
-                                       /* ... and add to ready_list */
-                                       par_list_append(ready_list, te);
-                               }
-                       }
+                       /* It must be in the pending list, so remove it ... */
+                       par_list_remove(otherte);
+                       /* ... and add to ready_list */
+                       par_list_append(ready_list, otherte);
                }
        }
 }