]> granicus.if.org Git - postgresql/blob - src/bin/pg_dump/pg_backup_archiver.c
Provide for parallel restoration from a custom format archive. Each data and
[postgresql] / src / bin / pg_dump / pg_backup_archiver.c
1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_archiver.c
4  *
5  *      Private implementation of the archiver routines.
6  *
7  *      See the headers to pg_restore for more details.
8  *
9  * Copyright (c) 2000, Philip Warner
10  *      Rights are granted to use this software in any way so long
11  *      as this notice is not removed.
12  *
13  *      The author is not responsible for loss or damages that may
14  *      result from its use.
15  *
16  *
17  * IDENTIFICATION
18  *              $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.162 2009/02/02 20:07:36 adunstan Exp $
19  *
20  *-------------------------------------------------------------------------
21  */
22
23 #include "pg_backup_db.h"
24 #include "dumputils.h"
25
26 #include <ctype.h>
27 #include <unistd.h>
28 #include <sys/types.h>
29 #include <sys/wait.h>
30
31 #ifdef WIN32
32 #include <io.h>
33 #endif
34
35 #include "libpq/libpq-fs.h"
36
37 /*
38  * Special exit values from worker children.  We reserve 0 for normal
39  * success; 1 and other small values should be interpreted as crashes.
40  */
41 #define WORKER_CREATE_DONE              10
42 #define WORKER_INHIBIT_DATA             11
43 #define WORKER_IGNORED_ERRORS   12
44
45 /*
46  * Unix uses exit to return result from worker child, so function is void.
47  * Windows thread result comes via function return.
48  */
49 #ifndef WIN32
50 #define parallel_restore_result void
51 #else
52 #define parallel_restore_result DWORD
53 #endif
54
55 /* IDs for worker children are either PIDs or thread handles */
56 #ifndef WIN32
57 #define thandle pid_t
58 #else
59 #define thandle HANDLE
60 #endif
61
62 typedef struct _restore_args
63 {
64         ArchiveHandle *AH;
65         TocEntry      *te;
66 } RestoreArgs;
67
68 typedef struct _parallel_slot
69 {
70         thandle         child_id;
71         RestoreArgs *args;
72 } ParallelSlot;
73
74 #define NO_SLOT (-1)
75
76 const char *progname;
77
78 static const char *modulename = gettext_noop("archiver");
79
80
81 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
82                  const int compression, ArchiveMode mode);
83 static void _getObjectDescription(PQExpBuffer buf, TocEntry *te,
84                                           ArchiveHandle *AH);
85 static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isData, bool acl_pass);
86
87
88 static void _doSetFixedOutputState(ArchiveHandle *AH);
89 static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
90 static void _doSetWithOids(ArchiveHandle *AH, const bool withOids);
91 static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
92 static void _becomeUser(ArchiveHandle *AH, const char *user);
93 static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
94 static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
95 static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
96 static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
97 static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
98 static teReqs _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls);
99 static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
100 static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
101 static TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id);
102 static void _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
103 static int      _discoverArchiveFormat(ArchiveHandle *AH);
104
105 static void dump_lo_buf(ArchiveHandle *AH);
106 static void _write_msg(const char *modulename, const char *fmt, va_list ap);
107 static void _die_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt, va_list ap);
108
109 static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
110 static OutputContext SetOutput(ArchiveHandle *AH, char *filename, int compression);
111 static void ResetOutput(ArchiveHandle *AH, OutputContext savedContext);
112
113 static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
114                                                          RestoreOptions *ropt, bool is_parallel);
115 static void restore_toc_entries_parallel(ArchiveHandle *AH);
116 static thandle spawn_restore(RestoreArgs *args);
117 static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status);
118 static bool work_in_progress(ParallelSlot *slots, int n_slots);
119 static int get_next_slot(ParallelSlot *slots, int n_slots);
120 static TocEntry *get_next_work_item(ArchiveHandle *AH,
121                                                                         TocEntry **first_unprocessed,
122                                                                         ParallelSlot *slots, int n_slots);
123 static parallel_restore_result parallel_restore(RestoreArgs *args);
124 static void mark_work_done(ArchiveHandle *AH, thandle worker, int status,
125                                                    ParallelSlot *slots, int n_slots);
126 static void fix_dependencies(ArchiveHandle *AH);
127 static void repoint_table_dependencies(ArchiveHandle *AH,
128                                                                            DumpId tableId, DumpId tableDataId);
129 static void identify_locking_dependencies(TocEntry *te,
130                                                                                   TocEntry **tocsByDumpId);
131 static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te);
132 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
133 static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
134 static ArchiveHandle *CloneArchive(ArchiveHandle *AH);
135 static void DeCloneArchive(ArchiveHandle *AH);
136
137
138 /*
139  *      Wrapper functions.
140  *
141  *      The objective it to make writing new formats and dumpers as simple
142  *      as possible, if necessary at the expense of extra function calls etc.
143  *
144  */
145
146
147 /* Create a new archive */
148 /* Public */
149 Archive *
150 CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
151                           const int compression, ArchiveMode mode)
152
153 {
154         ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, mode);
155
156         return (Archive *) AH;
157 }
158
159 /* Open an existing archive */
160 /* Public */
161 Archive *
162 OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
163 {
164         ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, archModeRead);
165
166         return (Archive *) AH;
167 }
168
169 /* Public */
170 void
171 CloseArchive(Archive *AHX)
172 {
173         int                     res = 0;
174         ArchiveHandle *AH = (ArchiveHandle *) AHX;
175
176         (*AH->ClosePtr) (AH);
177
178         /* Close the output */
179         if (AH->gzOut)
180                 res = GZCLOSE(AH->OF);
181         else if (AH->OF != stdout)
182                 res = fclose(AH->OF);
183
184         if (res != 0)
185                 die_horribly(AH, modulename, "could not close output file: %s\n",
186                                          strerror(errno));
187 }
188
189 /* Public */
190 void
191 RestoreArchive(Archive *AHX, RestoreOptions *ropt)
192 {
193         ArchiveHandle *AH = (ArchiveHandle *) AHX;
194         TocEntry   *te;
195         teReqs          reqs;
196         OutputContext sav;
197
198         AH->ropt = ropt;
199         AH->stage = STAGE_INITIALIZING;
200
201         /*
202          * Check for nonsensical option combinations.
203          *
204          * NB: create+dropSchema is useless because if you're creating the DB,
205          * there's no need to drop individual items in it.  Moreover, if we tried
206          * to do that then we'd issue the drops in the database initially
207          * connected to, not the one we will create, which is very bad...
208          */
209         if (ropt->create && ropt->dropSchema)
210                 die_horribly(AH, modulename, "-C and -c are incompatible options\n");
211
212         /*
213          * -1 is not compatible with -C, because we can't create a database
214          *  inside a transaction block.
215          */
216         if (ropt->create && ropt->single_txn)
217                 die_horribly(AH, modulename, "-C and -1 are incompatible options\n");
218
219         /*
220          * Make sure we won't need (de)compression we haven't got
221          */
222 #ifndef HAVE_LIBZ
223         if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
224         {
225                 for (te = AH->toc->next; te != AH->toc; te = te->next)
226                 {
227                         reqs = _tocEntryRequired(te, ropt, false);
228                         if (te->hadDumper && (reqs & REQ_DATA) != 0)
229                                 die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
230                 }
231         }
232 #endif
233
234         /*
235          * If we're using a DB connection, then connect it.
236          */
237         if (ropt->useDB)
238         {
239                 ahlog(AH, 1, "connecting to database for restore\n");
240                 if (AH->version < K_VERS_1_3)
241                         die_horribly(AH, modulename, "direct database connections are not supported in pre-1.3 archives\n");
242
243                 /* XXX Should get this from the archive */
244                 AHX->minRemoteVersion = 070100;
245                 AHX->maxRemoteVersion = 999999;
246
247                 ConnectDatabase(AHX, ropt->dbname,
248                                                 ropt->pghost, ropt->pgport, ropt->username,
249                                                 ropt->requirePassword);
250
251                 /*
252                  * If we're talking to the DB directly, don't send comments since they
253                  * obscure SQL when displaying errors
254                  */
255                 AH->noTocComments = 1;
256         }
257
258         /*
259          * Work out if we have an implied data-only restore. This can happen if
260          * the dump was data only or if the user has used a toc list to exclude
261          * all of the schema data. All we do is look for schema entries - if none
262          * are found then we set the dataOnly flag.
263          *
264          * We could scan for wanted TABLE entries, but that is not the same as
265          * dataOnly. At this stage, it seems unnecessary (6-Mar-2001).
266          */
267         if (!ropt->dataOnly)
268         {
269                 int                     impliedDataOnly = 1;
270
271                 for (te = AH->toc->next; te != AH->toc; te = te->next)
272                 {
273                         reqs = _tocEntryRequired(te, ropt, true);
274                         if ((reqs & REQ_SCHEMA) != 0)
275                         {                                       /* It's schema, and it's wanted */
276                                 impliedDataOnly = 0;
277                                 break;
278                         }
279                 }
280                 if (impliedDataOnly)
281                 {
282                         ropt->dataOnly = impliedDataOnly;
283                         ahlog(AH, 1, "implied data-only restore\n");
284                 }
285         }
286
287         /*
288          * Setup the output file if necessary.
289          */
290         if (ropt->filename || ropt->compression)
291                 sav = SetOutput(AH, ropt->filename, ropt->compression);
292
293         ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
294
295         if (AH->public.verbose)
296                 dumpTimestamp(AH, "Started on", AH->createDate);
297
298         if (ropt->single_txn)
299         {
300                 if (AH->connection)
301                         StartTransaction(AH);
302                 else
303                         ahprintf(AH, "BEGIN;\n\n");
304         }
305
306         /*
307          * Establish important parameter values right away.
308          */
309         _doSetFixedOutputState(AH);
310
311         AH->stage = STAGE_PROCESSING;
312
313         /*
314          * Drop the items at the start, in reverse order
315          */
316         if (ropt->dropSchema)
317         {
318                 for (te = AH->toc->prev; te != AH->toc; te = te->prev)
319                 {
320                         AH->currentTE = te;
321
322                         reqs = _tocEntryRequired(te, ropt, false /* needn't drop ACLs */ );
323                         if (((reqs & REQ_SCHEMA) != 0) && te->dropStmt)
324                         {
325                                 /* We want the schema */
326                                 ahlog(AH, 1, "dropping %s %s\n", te->desc, te->tag);
327                                 /* Select owner and schema as necessary */
328                                 _becomeOwner(AH, te);
329                                 _selectOutputSchema(AH, te->namespace);
330                                 /* Drop it */
331                                 ahprintf(AH, "%s", te->dropStmt);
332                         }
333                 }
334
335                 /*
336                  * _selectOutputSchema may have set currSchema to reflect the effect
337                  * of a "SET search_path" command it emitted.  However, by now we may
338                  * have dropped that schema; or it might not have existed in the first
339                  * place.  In either case the effective value of search_path will not
340                  * be what we think.  Forcibly reset currSchema so that we will
341                  * re-establish the search_path setting when needed (after creating
342                  * the schema).
343                  *
344                  * If we treated users as pg_dump'able objects then we'd need to reset
345                  * currUser here too.
346                  */
347                 if (AH->currSchema)
348                         free(AH->currSchema);
349                 AH->currSchema = NULL;
350         }
351
352         /*
353          * In serial mode, we now process each non-ACL TOC entry.
354          *
355          * In parallel mode, turn control over to the parallel-restore logic.
356          */
357         if (ropt->number_of_threads > 1 && ropt->useDB)
358                 restore_toc_entries_parallel(AH);
359         else
360         {
361                 for (te = AH->toc->next; te != AH->toc; te = te->next)
362                         (void) restore_toc_entry(AH, te, ropt, false);
363         }
364
365         /*
366          * Scan TOC again to output ownership commands and ACLs
367          */
368         for (te = AH->toc->next; te != AH->toc; te = te->next)
369         {
370                 AH->currentTE = te;
371
372                 /* Work out what, if anything, we want from this entry */
373                 reqs = _tocEntryRequired(te, ropt, true);
374
375                 if ((reqs & REQ_SCHEMA) != 0)   /* We want the schema */
376                 {
377                         ahlog(AH, 1, "setting owner and privileges for %s %s\n",
378                                   te->desc, te->tag);
379                         _printTocEntry(AH, te, ropt, false, true);
380                 }
381         }
382
383         if (ropt->single_txn)
384         {
385                 if (AH->connection)
386                         CommitTransaction(AH);
387                 else
388                         ahprintf(AH, "COMMIT;\n\n");
389         }
390
391         if (AH->public.verbose)
392                 dumpTimestamp(AH, "Completed on", time(NULL));
393
394         ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
395
396         /*
397          * Clean up & we're done.
398          */
399         AH->stage = STAGE_FINALIZING;
400
401         if (ropt->filename || ropt->compression)
402                 ResetOutput(AH, sav);
403
404         if (ropt->useDB)
405         {
406                 PQfinish(AH->connection);
407                 AH->connection = NULL;
408         }
409 }
410
411 /*
412  * Restore a single TOC item.  Used in both parallel and non-parallel restore;
413  * is_parallel is true if we are in a worker child process.
414  *
415  * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
416  * the parallel parent has to make the corresponding status update.
417  */
418 static int
419 restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
420                                   RestoreOptions *ropt, bool is_parallel)
421 {
422         int         retval = 0;
423         teReqs          reqs;
424         bool            defnDumped;
425
426         AH->currentTE = te;
427
428         /* Work out what, if anything, we want from this entry */
429         reqs = _tocEntryRequired(te, ropt, false);
430
431         /* Dump any relevant dump warnings to stderr */
432         if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
433         {
434                 if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
435                         write_msg(modulename, "warning from original dump file: %s\n", te->defn);
436                 else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
437                         write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
438         }
439
440         defnDumped = false;
441
442         if ((reqs & REQ_SCHEMA) != 0)   /* We want the schema */
443         {
444                 ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);
445
446                 _printTocEntry(AH, te, ropt, false, false);
447                 defnDumped = true;
448
449                 if (strcmp(te->desc, "TABLE") == 0)
450                 {
451                         if (AH->lastErrorTE == te)
452                         {
453                                 /*
454                                  * We failed to create the table.
455                                  * If --no-data-for-failed-tables was given,
456                                  * mark the corresponding TABLE DATA to be ignored.
457                                  *
458                                  * In the parallel case this must be done in the parent,
459                                  * so we just set the return value.
460                                  */
461                                 if (ropt->noDataForFailedTables)
462                                 {
463                                         if (is_parallel)
464                                                 retval = WORKER_INHIBIT_DATA;
465                                         else
466                                                 inhibit_data_for_failed_table(AH, te);
467                                 }
468                         }
469                         else
470                         {
471                                 /*
472                                  * We created the table successfully.  Mark the
473                                  * corresponding TABLE DATA for possible truncation.
474                                  *
475                                  * In the parallel case this must be done in the parent,
476                                  * so we just set the return value.
477                                  */
478                                 if (is_parallel)
479                                         retval = WORKER_CREATE_DONE;
480                                 else
481                                         mark_create_done(AH, te);
482                         }
483                 }
484
485                 /* If we created a DB, connect to it... */
486                 if (strcmp(te->desc, "DATABASE") == 0)
487                 {
488                         ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
489                         _reconnectToDB(AH, te->tag);
490                 }
491         }
492
493         /*
494          * If we have a data component, then process it
495          */
496         if ((reqs & REQ_DATA) != 0)
497         {
498                 /*
499                  * hadDumper will be set if there is genuine data component for
500                  * this node. Otherwise, we need to check the defn field for
501                  * statements that need to be executed in data-only restores.
502                  */
503                 if (te->hadDumper)
504                 {
505                         /*
506                          * If we can output the data, then restore it.
507                          */
508                         if (AH->PrintTocDataPtr != NULL && (reqs & REQ_DATA) != 0)
509                         {
510                                 _printTocEntry(AH, te, ropt, true, false);
511
512                                 if (strcmp(te->desc, "BLOBS") == 0 ||
513                                         strcmp(te->desc, "BLOB COMMENTS") == 0)
514                                 {
515                                         ahlog(AH, 1, "restoring %s\n", te->desc);
516
517                                         _selectOutputSchema(AH, "pg_catalog");
518
519                                         (*AH->PrintTocDataPtr) (AH, te, ropt);
520                                 }
521                                 else
522                                 {
523                                         _disableTriggersIfNecessary(AH, te, ropt);
524
525                                         /* Select owner and schema as necessary */
526                                         _becomeOwner(AH, te);
527                                         _selectOutputSchema(AH, te->namespace);
528
529                                         ahlog(AH, 1, "restoring data for table \"%s\"\n",
530                                                   te->tag);
531
532                                         /*
533                                          * In parallel restore, if we created the table earlier
534                                          * in the run then we wrap the COPY in a transaction and
535                                          * precede it with a TRUNCATE.  If archiving is not on
536                                          * this prevents WAL-logging the COPY.  This obtains a
537                                          * speedup similar to that from using single_txn mode
538                                          * in non-parallel restores.
539                                          */
540                                         if (is_parallel && te->created)
541                                         {
542                                                 /*
543                                                  * Parallel restore is always talking directly to a
544                                                  * server, so no need to see if we should issue BEGIN.
545                                                  */
546                                                 StartTransaction(AH);
547
548                                                 /*
549                                                  * If the server version is >= 8.4, make sure we issue
550                                                  * TRUNCATE with ONLY so that child tables are not
551                                                  * wiped.
552                                                  */
553                                                 ahprintf(AH, "TRUNCATE TABLE %s%s;\n\n",
554                                                                  (PQserverVersion(AH->connection) >= 80400 ?
555                                                                   "ONLY " : ""),
556                                                                  fmtId(te->tag));
557                                         }
558
559                                         /*
560                                          * If we have a copy statement, use it. As of V1.3,
561                                          * these are separate to allow easy import from
562                                          * withing a database connection. Pre 1.3 archives can
563                                          * not use DB connections and are sent to output only.
564                                          *
565                                          * For V1.3+, the table data MUST have a copy
566                                          * statement so that we can go into appropriate mode
567                                          * with libpq.
568                                          */
569                                         if (te->copyStmt && strlen(te->copyStmt) > 0)
570                                         {
571                                                 ahprintf(AH, "%s", te->copyStmt);
572                                                 AH->writingCopyData = true;
573                                         }
574
575                                         (*AH->PrintTocDataPtr) (AH, te, ropt);
576
577                                         AH->writingCopyData = false;
578
579                                         /* close out the transaction started above */
580                                         if (is_parallel && te->created)
581                                                 CommitTransaction(AH);
582
583                                         _enableTriggersIfNecessary(AH, te, ropt);
584                                 }
585                         }
586                 }
587                 else if (!defnDumped)
588                 {
589                         /* If we haven't already dumped the defn part, do so now */
590                         ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
591                         _printTocEntry(AH, te, ropt, false, false);
592                 }
593         }
594
595         return retval;
596 }
597
598 /*
599  * Allocate a new RestoreOptions block.
600  * This is mainly so we can initialize it, but also for future expansion,
601  */
602 RestoreOptions *
603 NewRestoreOptions(void)
604 {
605         RestoreOptions *opts;
606
607         opts = (RestoreOptions *) calloc(1, sizeof(RestoreOptions));
608
609         /* set any fields that shouldn't default to zeroes */
610         opts->format = archUnknown;
611
612         return opts;
613 }
614
615 static void
616 _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
617 {
618         /* This hack is only needed in a data-only restore */
619         if (!ropt->dataOnly || !ropt->disable_triggers)
620                 return;
621
622         ahlog(AH, 1, "disabling triggers for %s\n", te->tag);
623
624         /*
625          * Become superuser if possible, since they are the only ones who can
626          * disable constraint triggers.  If -S was not given, assume the initial
627          * user identity is a superuser.  (XXX would it be better to become the
628          * table owner?)
629          */
630         _becomeUser(AH, ropt->superuser);
631
632         /*
633          * Disable them.
634          */
635         _selectOutputSchema(AH, te->namespace);
636
637         ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
638                          fmtId(te->tag));
639 }
640
641 static void
642 _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
643 {
644         /* This hack is only needed in a data-only restore */
645         if (!ropt->dataOnly || !ropt->disable_triggers)
646                 return;
647
648         ahlog(AH, 1, "enabling triggers for %s\n", te->tag);
649
650         /*
651          * Become superuser if possible, since they are the only ones who can
652          * disable constraint triggers.  If -S was not given, assume the initial
653          * user identity is a superuser.  (XXX would it be better to become the
654          * table owner?)
655          */
656         _becomeUser(AH, ropt->superuser);
657
658         /*
659          * Enable them.
660          */
661         _selectOutputSchema(AH, te->namespace);
662
663         ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
664                          fmtId(te->tag));
665 }
666
667 /*
668  * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
669  */
670
671 /* Public */
672 size_t
673 WriteData(Archive *AHX, const void *data, size_t dLen)
674 {
675         ArchiveHandle *AH = (ArchiveHandle *) AHX;
676
677         if (!AH->currToc)
678                 die_horribly(AH, modulename, "internal error -- WriteData cannot be called outside the context of a DataDumper routine\n");
679
680         return (*AH->WriteDataPtr) (AH, data, dLen);
681 }
682
683 /*
684  * Create a new TOC entry. The TOC was designed as a TOC, but is now the
685  * repository for all metadata. But the name has stuck.
686  */
687
688 /* Public */
689 void
690 ArchiveEntry(Archive *AHX,
691                          CatalogId catalogId, DumpId dumpId,
692                          const char *tag,
693                          const char *namespace,
694                          const char *tablespace,
695                          const char *owner, bool withOids,
696                          const char *desc, teSection section,
697                          const char *defn,
698                          const char *dropStmt, const char *copyStmt,
699                          const DumpId *deps, int nDeps,
700                          DataDumperPtr dumpFn, void *dumpArg)
701 {
702         ArchiveHandle *AH = (ArchiveHandle *) AHX;
703         TocEntry   *newToc;
704
705         newToc = (TocEntry *) calloc(1, sizeof(TocEntry));
706         if (!newToc)
707                 die_horribly(AH, modulename, "out of memory\n");
708
709         AH->tocCount++;
710         if (dumpId > AH->maxDumpId)
711                 AH->maxDumpId = dumpId;
712
713         newToc->prev = AH->toc->prev;
714         newToc->next = AH->toc;
715         AH->toc->prev->next = newToc;
716         AH->toc->prev = newToc;
717
718         newToc->catalogId = catalogId;
719         newToc->dumpId = dumpId;
720         newToc->section = section;
721
722         newToc->tag = strdup(tag);
723         newToc->namespace = namespace ? strdup(namespace) : NULL;
724         newToc->tablespace = tablespace ? strdup(tablespace) : NULL;
725         newToc->owner = strdup(owner);
726         newToc->withOids = withOids;
727         newToc->desc = strdup(desc);
728         newToc->defn = strdup(defn);
729         newToc->dropStmt = strdup(dropStmt);
730         newToc->copyStmt = copyStmt ? strdup(copyStmt) : NULL;
731
732         if (nDeps > 0)
733         {
734                 newToc->dependencies = (DumpId *) malloc(nDeps * sizeof(DumpId));
735                 memcpy(newToc->dependencies, deps, nDeps * sizeof(DumpId));
736                 newToc->nDeps = nDeps;
737         }
738         else
739         {
740                 newToc->dependencies = NULL;
741                 newToc->nDeps = 0;
742         }
743
744         newToc->dataDumper = dumpFn;
745         newToc->dataDumperArg = dumpArg;
746         newToc->hadDumper = dumpFn ? true : false;
747
748         newToc->formatData = NULL;
749
750         if (AH->ArchiveEntryPtr !=NULL)
751                 (*AH->ArchiveEntryPtr) (AH, newToc);
752 }
753
754 /* Public */
755 void
756 PrintTOCSummary(Archive *AHX, RestoreOptions *ropt)
757 {
758         ArchiveHandle *AH = (ArchiveHandle *) AHX;
759         TocEntry   *te;
760         OutputContext sav;
761         char       *fmtName;
762
763         if (ropt->filename)
764                 sav = SetOutput(AH, ropt->filename, 0 /* no compression */ );
765
766         ahprintf(AH, ";\n; Archive created at %s", ctime(&AH->createDate));
767         ahprintf(AH, ";     dbname: %s\n;     TOC Entries: %d\n;     Compression: %d\n",
768                          AH->archdbname, AH->tocCount, AH->compression);
769
770         switch (AH->format)
771         {
772                 case archFiles:
773                         fmtName = "FILES";
774                         break;
775                 case archCustom:
776                         fmtName = "CUSTOM";
777                         break;
778                 case archTar:
779                         fmtName = "TAR";
780                         break;
781                 default:
782                         fmtName = "UNKNOWN";
783         }
784
785         ahprintf(AH, ";     Dump Version: %d.%d-%d\n", AH->vmaj, AH->vmin, AH->vrev);
786         ahprintf(AH, ";     Format: %s\n", fmtName);
787         ahprintf(AH, ";     Integer: %d bytes\n", (int) AH->intSize);
788         ahprintf(AH, ";     Offset: %d bytes\n", (int) AH->offSize);
789         if (AH->archiveRemoteVersion)
790                 ahprintf(AH, ";     Dumped from database version: %s\n",
791                                  AH->archiveRemoteVersion);
792         if (AH->archiveDumpVersion)
793                 ahprintf(AH, ";     Dumped by pg_dump version: %s\n",
794                                  AH->archiveDumpVersion);
795
796         ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
797
798         for (te = AH->toc->next; te != AH->toc; te = te->next)
799         {
800                 if (ropt->verbose || _tocEntryRequired(te, ropt, true) != 0)
801                         ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
802                                          te->catalogId.tableoid, te->catalogId.oid,
803                                          te->desc, te->namespace ? te->namespace : "-",
804                                          te->tag, te->owner);
805                 if (ropt->verbose && te->nDeps > 0)
806                 {
807                         int             i;
808
809                         ahprintf(AH, ";\tdepends on:");
810                         for (i = 0; i < te->nDeps; i++)
811                                 ahprintf(AH, " %d", te->dependencies[i]);
812                         ahprintf(AH, "\n");
813                 }
814         }
815
816         if (ropt->filename)
817                 ResetOutput(AH, sav);
818 }
819
820 /***********
821  * BLOB Archival
822  ***********/
823
824 /* Called by a dumper to signal start of a BLOB */
825 int
826 StartBlob(Archive *AHX, Oid oid)
827 {
828         ArchiveHandle *AH = (ArchiveHandle *) AHX;
829
830         if (!AH->StartBlobPtr)
831                 die_horribly(AH, modulename, "large-object output not supported in chosen format\n");
832
833         (*AH->StartBlobPtr) (AH, AH->currToc, oid);
834
835         return 1;
836 }
837
838 /* Called by a dumper to signal end of a BLOB */
839 int
840 EndBlob(Archive *AHX, Oid oid)
841 {
842         ArchiveHandle *AH = (ArchiveHandle *) AHX;
843
844         if (AH->EndBlobPtr)
845                 (*AH->EndBlobPtr) (AH, AH->currToc, oid);
846
847         return 1;
848 }
849
850 /**********
851  * BLOB Restoration
852  **********/
853
854 /*
855  * Called by a format handler before any blobs are restored
856  */
857 void
858 StartRestoreBlobs(ArchiveHandle *AH)
859 {
860         if (!AH->ropt->single_txn)
861         {
862                 if (AH->connection)
863                         StartTransaction(AH);
864                 else
865                         ahprintf(AH, "BEGIN;\n\n");
866         }
867
868         AH->blobCount = 0;
869 }
870
871 /*
872  * Called by a format handler after all blobs are restored
873  */
874 void
875 EndRestoreBlobs(ArchiveHandle *AH)
876 {
877         if (!AH->ropt->single_txn)
878         {
879                 if (AH->connection)
880                         CommitTransaction(AH);
881                 else
882                         ahprintf(AH, "COMMIT;\n\n");
883         }
884
885         ahlog(AH, 1, "restored %d large objects\n", AH->blobCount);
886 }
887
888
889 /*
890  * Called by a format handler to initiate restoration of a blob
891  */
892 void
893 StartRestoreBlob(ArchiveHandle *AH, Oid oid)
894 {
895         Oid                     loOid;
896
897         AH->blobCount++;
898
899         /* Initialize the LO Buffer */
900         AH->lo_buf_used = 0;
901
902         ahlog(AH, 2, "restoring large object with OID %u\n", oid);
903
904         if (AH->connection)
905         {
906                 loOid = lo_create(AH->connection, oid);
907                 if (loOid == 0 || loOid != oid)
908                         die_horribly(AH, modulename, "could not create large object %u\n",
909                                                  oid);
910
911                 AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
912                 if (AH->loFd == -1)
913                         die_horribly(AH, modulename, "could not open large object\n");
914         }
915         else
916         {
917                 ahprintf(AH, "SELECT lo_open(lo_create(%u), %d);\n", oid, INV_WRITE);
918         }
919
920         AH->writingBlob = 1;
921 }
922
923 void
924 EndRestoreBlob(ArchiveHandle *AH, Oid oid)
925 {
926         if (AH->lo_buf_used > 0)
927         {
928                 /* Write remaining bytes from the LO buffer */
929                 dump_lo_buf(AH);
930         }
931
932         AH->writingBlob = 0;
933
934         if (AH->connection)
935         {
936                 lo_close(AH->connection, AH->loFd);
937                 AH->loFd = -1;
938         }
939         else
940         {
941                 ahprintf(AH, "SELECT lo_close(0);\n\n");
942         }
943 }
944
945 /***********
946  * Sorting and Reordering
947  ***********/
948
949 void
950 SortTocFromFile(Archive *AHX, RestoreOptions *ropt)
951 {
952         ArchiveHandle *AH = (ArchiveHandle *) AHX;
953         FILE       *fh;
954         char            buf[1024];
955         char       *cmnt;
956         char       *endptr;
957         DumpId          id;
958         TocEntry   *te;
959         TocEntry   *tePrev;
960
961         /* Allocate space for the 'wanted' array, and init it */
962         ropt->idWanted = (bool *) malloc(sizeof(bool) * AH->maxDumpId);
963         memset(ropt->idWanted, 0, sizeof(bool) * AH->maxDumpId);
964
965         /* Set prev entry as head of list */
966         tePrev = AH->toc;
967
968         /* Setup the file */
969         fh = fopen(ropt->tocFile, PG_BINARY_R);
970         if (!fh)
971                 die_horribly(AH, modulename, "could not open TOC file \"%s\": %s\n",
972                                          ropt->tocFile, strerror(errno));
973
974         while (fgets(buf, sizeof(buf), fh) != NULL)
975         {
976                 /* Truncate line at comment, if any */
977                 cmnt = strchr(buf, ';');
978                 if (cmnt != NULL)
979                         cmnt[0] = '\0';
980
981                 /* Ignore if all blank */
982                 if (strspn(buf, " \t\r") == strlen(buf))
983                         continue;
984
985                 /* Get an ID, check it's valid and not already seen */
986                 id = strtol(buf, &endptr, 10);
987                 if (endptr == buf || id <= 0 || id > AH->maxDumpId ||
988                         ropt->idWanted[id - 1])
989                 {
990                         write_msg(modulename, "WARNING: line ignored: %s\n", buf);
991                         continue;
992                 }
993
994                 /* Find TOC entry */
995                 te = getTocEntryByDumpId(AH, id);
996                 if (!te)
997                         die_horribly(AH, modulename, "could not find entry for ID %d\n",
998                                                  id);
999
1000                 ropt->idWanted[id - 1] = true;
1001
1002                 _moveAfter(AH, tePrev, te);
1003                 tePrev = te;
1004         }
1005
1006         if (fclose(fh) != 0)
1007                 die_horribly(AH, modulename, "could not close TOC file: %s\n",
1008                                          strerror(errno));
1009 }
1010
1011 /*
1012  * Set up a dummy ID filter that selects all dump IDs
1013  */
1014 void
1015 InitDummyWantedList(Archive *AHX, RestoreOptions *ropt)
1016 {
1017         ArchiveHandle *AH = (ArchiveHandle *) AHX;
1018
1019         /* Allocate space for the 'wanted' array, and init it to 1's */
1020         ropt->idWanted = (bool *) malloc(sizeof(bool) * AH->maxDumpId);
1021         memset(ropt->idWanted, 1, sizeof(bool) * AH->maxDumpId);
1022 }
1023
1024 /**********************
1025  * 'Convenience functions that look like standard IO functions
1026  * for writing data when in dump mode.
1027  **********************/
1028
1029 /* Public */
1030 int
1031 archputs(const char *s, Archive *AH)
1032 {
1033         return WriteData(AH, s, strlen(s));
1034 }
1035
1036 /* Public */
1037 int
1038 archprintf(Archive *AH, const char *fmt,...)
1039 {
1040         char       *p = NULL;
1041         va_list         ap;
1042         int                     bSize = strlen(fmt) + 256;
1043         int                     cnt = -1;
1044
1045         /*
1046          * This is paranoid: deal with the possibility that vsnprintf is willing
1047          * to ignore trailing null or returns > 0 even if string does not fit. It
1048          * may be the case that it returns cnt = bufsize
1049          */
1050         while (cnt < 0 || cnt >= (bSize - 1))
1051         {
1052                 if (p != NULL)
1053                         free(p);
1054                 bSize *= 2;
1055                 p = (char *) malloc(bSize);
1056                 if (p == NULL)
1057                         exit_horribly(AH, modulename, "out of memory\n");
1058                 va_start(ap, fmt);
1059                 cnt = vsnprintf(p, bSize, fmt, ap);
1060                 va_end(ap);
1061         }
1062         WriteData(AH, p, cnt);
1063         free(p);
1064         return cnt;
1065 }
1066
1067
1068 /*******************************
1069  * Stuff below here should be 'private' to the archiver routines
1070  *******************************/
1071
1072 static OutputContext
1073 SetOutput(ArchiveHandle *AH, char *filename, int compression)
1074 {
1075         OutputContext sav;
1076         int                     fn;
1077
1078         /* Replace the AH output file handle */
1079         sav.OF = AH->OF;
1080         sav.gzOut = AH->gzOut;
1081
1082         if (filename)
1083                 fn = -1;
1084         else if (AH->FH)
1085                 fn = fileno(AH->FH);
1086         else if (AH->fSpec)
1087         {
1088                 fn = -1;
1089                 filename = AH->fSpec;
1090         }
1091         else
1092                 fn = fileno(stdout);
1093
1094         /* If compression explicitly requested, use gzopen */
1095 #ifdef HAVE_LIBZ
1096         if (compression != 0)
1097         {
1098                 char            fmode[10];
1099
1100                 /* Don't use PG_BINARY_x since this is zlib */
1101                 sprintf(fmode, "wb%d", compression);
1102                 if (fn >= 0)
1103                         AH->OF = gzdopen(dup(fn), fmode);
1104                 else
1105                         AH->OF = gzopen(filename, fmode);
1106                 AH->gzOut = 1;
1107         }
1108         else
1109 #endif
1110         {                                                       /* Use fopen */
1111                 if (AH->mode == archModeAppend)
1112                 {
1113                         if (fn >= 0)
1114                                 AH->OF = fdopen(dup(fn), PG_BINARY_A);
1115                         else
1116                                 AH->OF = fopen(filename, PG_BINARY_A);
1117                 }
1118                 else
1119                 {
1120                         if (fn >= 0)
1121                                 AH->OF = fdopen(dup(fn), PG_BINARY_W);
1122                         else
1123                                 AH->OF = fopen(filename, PG_BINARY_W);
1124                 }
1125                 AH->gzOut = 0;
1126         }
1127
1128         if (!AH->OF)
1129         {
1130                 if (filename)
1131                         die_horribly(AH, modulename, "could not open output file \"%s\": %s\n",
1132                                                  filename, strerror(errno));
1133                 else
1134                         die_horribly(AH, modulename, "could not open output file: %s\n",
1135                                                  strerror(errno));
1136         }
1137
1138         return sav;
1139 }
1140
1141 static void
1142 ResetOutput(ArchiveHandle *AH, OutputContext sav)
1143 {
1144         int                     res;
1145
1146         if (AH->gzOut)
1147                 res = GZCLOSE(AH->OF);
1148         else
1149                 res = fclose(AH->OF);
1150
1151         if (res != 0)
1152                 die_horribly(AH, modulename, "could not close output file: %s\n",
1153                                          strerror(errno));
1154
1155         AH->gzOut = sav.gzOut;
1156         AH->OF = sav.OF;
1157 }
1158
1159
1160
1161 /*
1162  *      Print formatted text to the output file (usually stdout).
1163  */
1164 int
1165 ahprintf(ArchiveHandle *AH, const char *fmt,...)
1166 {
1167         char       *p = NULL;
1168         va_list         ap;
1169         int                     bSize = strlen(fmt) + 256;              /* Should be enough */
1170         int                     cnt = -1;
1171
1172         /*
1173          * This is paranoid: deal with the possibility that vsnprintf is willing
1174          * to ignore trailing null
1175          */
1176
1177         /*
1178          * or returns > 0 even if string does not fit. It may be the case that it
1179          * returns cnt = bufsize
1180          */
1181         while (cnt < 0 || cnt >= (bSize - 1))
1182         {
1183                 if (p != NULL)
1184                         free(p);
1185                 bSize *= 2;
1186                 p = (char *) malloc(bSize);
1187                 if (p == NULL)
1188                         die_horribly(AH, modulename, "out of memory\n");
1189                 va_start(ap, fmt);
1190                 cnt = vsnprintf(p, bSize, fmt, ap);
1191                 va_end(ap);
1192         }
1193         ahwrite(p, 1, cnt, AH);
1194         free(p);
1195         return cnt;
1196 }
1197
1198 void
1199 ahlog(ArchiveHandle *AH, int level, const char *fmt,...)
1200 {
1201         va_list         ap;
1202
1203         if (AH->debugLevel < level && (!AH->public.verbose || level > 1))
1204                 return;
1205
1206         va_start(ap, fmt);
1207         _write_msg(NULL, fmt, ap);
1208         va_end(ap);
1209 }
1210
1211 /*
1212  * Single place for logic which says 'We are restoring to a direct DB connection'.
1213  */
1214 static int
1215 RestoringToDB(ArchiveHandle *AH)
1216 {
1217         return (AH->ropt && AH->ropt->useDB && AH->connection);
1218 }
1219
1220 /*
1221  * Dump the current contents of the LO data buffer while writing a BLOB
1222  */
1223 static void
1224 dump_lo_buf(ArchiveHandle *AH)
1225 {
1226         if (AH->connection)
1227         {
1228                 size_t          res;
1229
1230                 res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1231                 ahlog(AH, 5, "wrote %lu bytes of large object data (result = %lu)\n",
1232                           (unsigned long) AH->lo_buf_used, (unsigned long) res);
1233                 if (res != AH->lo_buf_used)
1234                         die_horribly(AH, modulename,
1235                         "could not write to large object (result: %lu, expected: %lu)\n",
1236                                            (unsigned long) res, (unsigned long) AH->lo_buf_used);
1237         }
1238         else
1239         {
1240                 unsigned char *str;
1241                 size_t          len;
1242
1243                 str = PQescapeBytea((const unsigned char *) AH->lo_buf,
1244                                                         AH->lo_buf_used, &len);
1245                 if (!str)
1246                         die_horribly(AH, modulename, "out of memory\n");
1247
1248                 /* Hack: turn off writingBlob so ahwrite doesn't recurse to here */
1249                 AH->writingBlob = 0;
1250                 ahprintf(AH, "SELECT lowrite(0, '%s');\n", str);
1251                 AH->writingBlob = 1;
1252
1253                 free(str);
1254         }
1255         AH->lo_buf_used = 0;
1256 }
1257
1258
1259 /*
1260  *      Write buffer to the output file (usually stdout). This is user for
1261  *      outputting 'restore' scripts etc. It is even possible for an archive
1262  *      format to create a custom output routine to 'fake' a restore if it
1263  *      wants to generate a script (see TAR output).
1264  */
1265 int
1266 ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1267 {
1268         size_t          res;
1269
1270         if (AH->writingBlob)
1271         {
1272                 size_t          remaining = size * nmemb;
1273
1274                 while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1275                 {
1276                         size_t          avail = AH->lo_buf_size - AH->lo_buf_used;
1277
1278                         memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1279                         ptr = (const void *) ((const char *) ptr + avail);
1280                         remaining -= avail;
1281                         AH->lo_buf_used += avail;
1282                         dump_lo_buf(AH);
1283                 }
1284
1285                 memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1286                 AH->lo_buf_used += remaining;
1287
1288                 return size * nmemb;
1289         }
1290         else if (AH->gzOut)
1291         {
1292                 res = GZWRITE((void *) ptr, size, nmemb, AH->OF);
1293                 if (res != (nmemb * size))
1294                         die_horribly(AH, modulename, "could not write to output file: %s\n", strerror(errno));
1295                 return res;
1296         }
1297         else if (AH->CustomOutPtr)
1298         {
1299                 res = AH->CustomOutPtr (AH, ptr, size * nmemb);
1300
1301                 if (res != (nmemb * size))
1302                         die_horribly(AH, modulename, "could not write to custom output routine\n");
1303                 return res;
1304         }
1305         else
1306         {
1307                 /*
1308                  * If we're doing a restore, and it's direct to DB, and we're
1309                  * connected then send it to the DB.
1310                  */
1311                 if (RestoringToDB(AH))
1312                         return ExecuteSqlCommandBuf(AH, (void *) ptr, size * nmemb);            /* Always 1, currently */
1313                 else
1314                 {
1315                         res = fwrite((void *) ptr, size, nmemb, AH->OF);
1316                         if (res != nmemb)
1317                                 die_horribly(AH, modulename, "could not write to output file: %s\n",
1318                                                          strerror(errno));
1319                         return res;
1320                 }
1321         }
1322 }
1323
1324 /* Common exit code */
1325 static void
1326 _write_msg(const char *modulename, const char *fmt, va_list ap)
1327 {
1328         if (modulename)
1329                 fprintf(stderr, "%s: [%s] ", progname, _(modulename));
1330         else
1331                 fprintf(stderr, "%s: ", progname);
1332         vfprintf(stderr, _(fmt), ap);
1333 }
1334
1335 void
1336 write_msg(const char *modulename, const char *fmt,...)
1337 {
1338         va_list         ap;
1339
1340         va_start(ap, fmt);
1341         _write_msg(modulename, fmt, ap);
1342         va_end(ap);
1343 }
1344
1345
1346 static void
1347 _die_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt, va_list ap)
1348 {
1349         _write_msg(modulename, fmt, ap);
1350
1351         if (AH)
1352         {
1353                 if (AH->public.verbose)
1354                         write_msg(NULL, "*** aborted because of error\n");
1355                 if (AH->connection)
1356                         PQfinish(AH->connection);
1357         }
1358
1359         exit(1);
1360 }
1361
1362 /* External use */
1363 void
1364 exit_horribly(Archive *AH, const char *modulename, const char *fmt,...)
1365 {
1366         va_list         ap;
1367
1368         va_start(ap, fmt);
1369         _die_horribly((ArchiveHandle *) AH, modulename, fmt, ap);
1370         va_end(ap);
1371 }
1372
1373 /* Archiver use (just different arg declaration) */
1374 void
1375 die_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...)
1376 {
1377         va_list         ap;
1378
1379         va_start(ap, fmt);
1380         _die_horribly(AH, modulename, fmt, ap);
1381         va_end(ap);
1382 }
1383
1384 /* on some error, we may decide to go on... */
1385 void
1386 warn_or_die_horribly(ArchiveHandle *AH,
1387                                          const char *modulename, const char *fmt,...)
1388 {
1389         va_list         ap;
1390
1391         switch (AH->stage)
1392         {
1393
1394                 case STAGE_NONE:
1395                         /* Do nothing special */
1396                         break;
1397
1398                 case STAGE_INITIALIZING:
1399                         if (AH->stage != AH->lastErrorStage)
1400                                 write_msg(modulename, "Error while INITIALIZING:\n");
1401                         break;
1402
1403                 case STAGE_PROCESSING:
1404                         if (AH->stage != AH->lastErrorStage)
1405                                 write_msg(modulename, "Error while PROCESSING TOC:\n");
1406                         break;
1407
1408                 case STAGE_FINALIZING:
1409                         if (AH->stage != AH->lastErrorStage)
1410                                 write_msg(modulename, "Error while FINALIZING:\n");
1411                         break;
1412         }
1413         if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1414         {
1415                 write_msg(modulename, "Error from TOC entry %d; %u %u %s %s %s\n",
1416                                   AH->currentTE->dumpId,
1417                          AH->currentTE->catalogId.tableoid, AH->currentTE->catalogId.oid,
1418                           AH->currentTE->desc, AH->currentTE->tag, AH->currentTE->owner);
1419         }
1420         AH->lastErrorStage = AH->stage;
1421         AH->lastErrorTE = AH->currentTE;
1422
1423         va_start(ap, fmt);
1424         if (AH->public.exit_on_error)
1425                 _die_horribly(AH, modulename, fmt, ap);
1426         else
1427         {
1428                 _write_msg(modulename, fmt, ap);
1429                 AH->public.n_errors++;
1430         }
1431         va_end(ap);
1432 }
1433
1434 static void
1435 _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1436 {
1437         te->prev->next = te->next;
1438         te->next->prev = te->prev;
1439
1440         te->prev = pos;
1441         te->next = pos->next;
1442
1443         pos->next->prev = te;
1444         pos->next = te;
1445 }
1446
1447 #ifdef NOT_USED
1448
1449 static void
1450 _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1451 {
1452         te->prev->next = te->next;
1453         te->next->prev = te->prev;
1454
1455         te->prev = pos->prev;
1456         te->next = pos;
1457         pos->prev->next = te;
1458         pos->prev = te;
1459 }
1460 #endif
1461
1462 static TocEntry *
1463 getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
1464 {
1465         TocEntry   *te;
1466
1467         for (te = AH->toc->next; te != AH->toc; te = te->next)
1468         {
1469                 if (te->dumpId == id)
1470                         return te;
1471         }
1472         return NULL;
1473 }
1474
1475 teReqs
1476 TocIDRequired(ArchiveHandle *AH, DumpId id, RestoreOptions *ropt)
1477 {
1478         TocEntry   *te = getTocEntryByDumpId(AH, id);
1479
1480         if (!te)
1481                 return 0;
1482
1483         return _tocEntryRequired(te, ropt, true);
1484 }
1485
1486 size_t
1487 WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
1488 {
1489         int                     off;
1490
1491         /* Save the flag */
1492         (*AH->WriteBytePtr) (AH, wasSet);
1493
1494         /* Write out pgoff_t smallest byte first, prevents endian mismatch */
1495         for (off = 0; off < sizeof(pgoff_t); off++)
1496         {
1497                 (*AH->WriteBytePtr) (AH, o & 0xFF);
1498                 o >>= 8;
1499         }
1500         return sizeof(pgoff_t) + 1;
1501 }
1502
1503 int
1504 ReadOffset(ArchiveHandle *AH, pgoff_t * o)
1505 {
1506         int                     i;
1507         int                     off;
1508         int                     offsetFlg;
1509
1510         /* Initialize to zero */
1511         *o = 0;
1512
1513         /* Check for old version */
1514         if (AH->version < K_VERS_1_7)
1515         {
1516                 /* Prior versions wrote offsets using WriteInt */
1517                 i = ReadInt(AH);
1518                 /* -1 means not set */
1519                 if (i < 0)
1520                         return K_OFFSET_POS_NOT_SET;
1521                 else if (i == 0)
1522                         return K_OFFSET_NO_DATA;
1523
1524                 /* Cast to pgoff_t because it was written as an int. */
1525                 *o = (pgoff_t) i;
1526                 return K_OFFSET_POS_SET;
1527         }
1528
1529         /*
1530          * Read the flag indicating the state of the data pointer. Check if valid
1531          * and die if not.
1532          *
1533          * This used to be handled by a negative or zero pointer, now we use an
1534          * extra byte specifically for the state.
1535          */
1536         offsetFlg = (*AH->ReadBytePtr) (AH) & 0xFF;
1537
1538         switch (offsetFlg)
1539         {
1540                 case K_OFFSET_POS_NOT_SET:
1541                 case K_OFFSET_NO_DATA:
1542                 case K_OFFSET_POS_SET:
1543
1544                         break;
1545
1546                 default:
1547                         die_horribly(AH, modulename, "unexpected data offset flag %d\n", offsetFlg);
1548         }
1549
1550         /*
1551          * Read the bytes
1552          */
1553         for (off = 0; off < AH->offSize; off++)
1554         {
1555                 if (off < sizeof(pgoff_t))
1556                         *o |= ((pgoff_t) ((*AH->ReadBytePtr) (AH))) << (off * 8);
1557                 else
1558                 {
1559                         if ((*AH->ReadBytePtr) (AH) != 0)
1560                                 die_horribly(AH, modulename, "file offset in dump file is too large\n");
1561                 }
1562         }
1563
1564         return offsetFlg;
1565 }
1566
1567 size_t
1568 WriteInt(ArchiveHandle *AH, int i)
1569 {
1570         int                     b;
1571
1572         /*
1573          * This is a bit yucky, but I don't want to make the binary format very
1574          * dependent on representation, and not knowing much about it, I write out
1575          * a sign byte. If you change this, don't forget to change the file
1576          * version #, and modify readInt to read the new format AS WELL AS the old
1577          * formats.
1578          */
1579
1580         /* SIGN byte */
1581         if (i < 0)
1582         {
1583                 (*AH->WriteBytePtr) (AH, 1);
1584                 i = -i;
1585         }
1586         else
1587                 (*AH->WriteBytePtr) (AH, 0);
1588
1589         for (b = 0; b < AH->intSize; b++)
1590         {
1591                 (*AH->WriteBytePtr) (AH, i & 0xFF);
1592                 i >>= 8;
1593         }
1594
1595         return AH->intSize + 1;
1596 }
1597
1598 int
1599 ReadInt(ArchiveHandle *AH)
1600 {
1601         int                     res = 0;
1602         int                     bv,
1603                                 b;
1604         int                     sign = 0;               /* Default positive */
1605         int                     bitShift = 0;
1606
1607         if (AH->version > K_VERS_1_0)
1608                 /* Read a sign byte */
1609                 sign = (*AH->ReadBytePtr) (AH);
1610
1611         for (b = 0; b < AH->intSize; b++)
1612         {
1613                 bv = (*AH->ReadBytePtr) (AH) & 0xFF;
1614                 if (bv != 0)
1615                         res = res + (bv << bitShift);
1616                 bitShift += 8;
1617         }
1618
1619         if (sign)
1620                 res = -res;
1621
1622         return res;
1623 }
1624
1625 size_t
1626 WriteStr(ArchiveHandle *AH, const char *c)
1627 {
1628         size_t          res;
1629
1630         if (c)
1631         {
1632                 res = WriteInt(AH, strlen(c));
1633                 res += (*AH->WriteBufPtr) (AH, c, strlen(c));
1634         }
1635         else
1636                 res = WriteInt(AH, -1);
1637
1638         return res;
1639 }
1640
1641 char *
1642 ReadStr(ArchiveHandle *AH)
1643 {
1644         char       *buf;
1645         int                     l;
1646
1647         l = ReadInt(AH);
1648         if (l < 0)
1649                 buf = NULL;
1650         else
1651         {
1652                 buf = (char *) malloc(l + 1);
1653                 if (!buf)
1654                         die_horribly(AH, modulename, "out of memory\n");
1655
1656                 if ((*AH->ReadBufPtr) (AH, (void *) buf, l) != l)
1657                         die_horribly(AH, modulename, "unexpected end of file\n");
1658
1659                 buf[l] = '\0';
1660         }
1661
1662         return buf;
1663 }
1664
1665 static int
1666 _discoverArchiveFormat(ArchiveHandle *AH)
1667 {
1668         FILE       *fh;
1669         char            sig[6];                 /* More than enough */
1670         size_t          cnt;
1671         int                     wantClose = 0;
1672
1673 #if 0
1674         write_msg(modulename, "attempting to ascertain archive format\n");
1675 #endif
1676
1677         if (AH->lookahead)
1678                 free(AH->lookahead);
1679
1680         AH->lookaheadSize = 512;
1681         AH->lookahead = calloc(1, 512);
1682         AH->lookaheadLen = 0;
1683         AH->lookaheadPos = 0;
1684
1685         if (AH->fSpec)
1686         {
1687                 wantClose = 1;
1688                 fh = fopen(AH->fSpec, PG_BINARY_R);
1689                 if (!fh)
1690                         die_horribly(AH, modulename, "could not open input file \"%s\": %s\n",
1691                                                  AH->fSpec, strerror(errno));
1692         }
1693         else
1694         {
1695                 fh = stdin;
1696                 if (!fh)
1697                         die_horribly(AH, modulename, "could not open input file: %s\n",
1698                                                  strerror(errno));
1699         }
1700
1701         cnt = fread(sig, 1, 5, fh);
1702
1703         if (cnt != 5)
1704         {
1705                 if (ferror(fh))
1706                         die_horribly(AH, modulename, "could not read input file: %s\n", strerror(errno));
1707                 else
1708                         die_horribly(AH, modulename, "input file is too short (read %lu, expected 5)\n",
1709                                                  (unsigned long) cnt);
1710         }
1711
1712         /* Save it, just in case we need it later */
1713         strncpy(&AH->lookahead[0], sig, 5);
1714         AH->lookaheadLen = 5;
1715
1716         if (strncmp(sig, "PGDMP", 5) == 0)
1717         {
1718                 AH->vmaj = fgetc(fh);
1719                 AH->vmin = fgetc(fh);
1720
1721                 /* Save these too... */
1722                 AH->lookahead[AH->lookaheadLen++] = AH->vmaj;
1723                 AH->lookahead[AH->lookaheadLen++] = AH->vmin;
1724
1725                 /* Check header version; varies from V1.0 */
1726                 if (AH->vmaj > 1 || ((AH->vmaj == 1) && (AH->vmin > 0)))                /* Version > 1.0 */
1727                 {
1728                         AH->vrev = fgetc(fh);
1729                         AH->lookahead[AH->lookaheadLen++] = AH->vrev;
1730                 }
1731                 else
1732                         AH->vrev = 0;
1733
1734                 /* Make a convenient integer <maj><min><rev>00 */
1735                 AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
1736
1737                 AH->intSize = fgetc(fh);
1738                 AH->lookahead[AH->lookaheadLen++] = AH->intSize;
1739
1740                 if (AH->version >= K_VERS_1_7)
1741                 {
1742                         AH->offSize = fgetc(fh);
1743                         AH->lookahead[AH->lookaheadLen++] = AH->offSize;
1744                 }
1745                 else
1746                         AH->offSize = AH->intSize;
1747
1748                 AH->format = fgetc(fh);
1749                 AH->lookahead[AH->lookaheadLen++] = AH->format;
1750         }
1751         else
1752         {
1753                 /*
1754                  * *Maybe* we have a tar archive format file... So, read first 512
1755                  * byte header...
1756                  */
1757                 cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
1758                 AH->lookaheadLen += cnt;
1759
1760                 if (AH->lookaheadLen != 512)
1761                         die_horribly(AH, modulename, "input file does not appear to be a valid archive (too short?)\n");
1762
1763                 if (!isValidTarHeader(AH->lookahead))
1764                         die_horribly(AH, modulename, "input file does not appear to be a valid archive\n");
1765
1766                 AH->format = archTar;
1767         }
1768
1769         /* If we can't seek, then mark the header as read */
1770         if (fseeko(fh, 0, SEEK_SET) != 0)
1771         {
1772                 /*
1773                  * NOTE: Formats that use the lookahead buffer can unset this in their
1774                  * Init routine.
1775                  */
1776                 AH->readHeader = 1;
1777         }
1778         else
1779                 AH->lookaheadLen = 0;   /* Don't bother since we've reset the file */
1780
1781 #if 0
1782         write_msg(modulename, "read %lu bytes into lookahead buffer\n",
1783                           (unsigned long) AH->lookaheadLen);
1784 #endif
1785
1786         /* Close the file */
1787         if (wantClose)
1788                 if (fclose(fh) != 0)
1789                         die_horribly(AH, modulename, "could not close input file: %s\n",
1790                                                  strerror(errno));
1791
1792         return AH->format;
1793 }
1794
1795
1796 /*
1797  * Allocate an archive handle
1798  */
1799 static ArchiveHandle *
1800 _allocAH(const char *FileSpec, const ArchiveFormat fmt,
1801                  const int compression, ArchiveMode mode)
1802 {
1803         ArchiveHandle *AH;
1804
1805 #if 0
1806         write_msg(modulename, "allocating AH for %s, format %d\n", FileSpec, fmt);
1807 #endif
1808
1809         AH = (ArchiveHandle *) calloc(1, sizeof(ArchiveHandle));
1810         if (!AH)
1811                 die_horribly(AH, modulename, "out of memory\n");
1812
1813         /* AH->debugLevel = 100; */
1814
1815         AH->vmaj = K_VERS_MAJOR;
1816         AH->vmin = K_VERS_MINOR;
1817         AH->vrev = K_VERS_REV;
1818
1819         /* initialize for backwards compatible string processing */
1820         AH->public.encoding = 0;        /* PG_SQL_ASCII */
1821         AH->public.std_strings = false;
1822
1823         /* sql error handling */
1824         AH->public.exit_on_error = true;
1825         AH->public.n_errors = 0;
1826
1827         AH->createDate = time(NULL);
1828
1829         AH->intSize = sizeof(int);
1830         AH->offSize = sizeof(pgoff_t);
1831         if (FileSpec)
1832         {
1833                 AH->fSpec = strdup(FileSpec);
1834
1835                 /*
1836                  * Not used; maybe later....
1837                  *
1838                  * AH->workDir = strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
1839                  * i--) if (AH->workDir[i-1] == '/')
1840                  */
1841         }
1842         else
1843                 AH->fSpec = NULL;
1844
1845         AH->currUser = NULL;            /* unknown */
1846         AH->currSchema = NULL;          /* ditto */
1847         AH->currTablespace = NULL;      /* ditto */
1848         AH->currWithOids = -1;          /* force SET */
1849
1850         AH->toc = (TocEntry *) calloc(1, sizeof(TocEntry));
1851         if (!AH->toc)
1852                 die_horribly(AH, modulename, "out of memory\n");
1853
1854         AH->toc->next = AH->toc;
1855         AH->toc->prev = AH->toc;
1856
1857         AH->mode = mode;
1858         AH->compression = compression;
1859
1860         AH->pgCopyBuf = createPQExpBuffer();
1861         AH->sqlBuf = createPQExpBuffer();
1862
1863         /* Open stdout with no compression for AH output handle */
1864         AH->gzOut = 0;
1865         AH->OF = stdout;
1866
1867         /*
1868          * On Windows, we need to use binary mode to read/write non-text archive
1869          * formats.  Force stdin/stdout into binary mode if that is what we are
1870          * using.
1871          */
1872 #ifdef WIN32
1873         if (fmt != archNull &&
1874                 (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
1875         {
1876                 if (mode == archModeWrite)
1877                         setmode(fileno(stdout), O_BINARY);
1878                 else
1879                         setmode(fileno(stdin), O_BINARY);
1880         }
1881 #endif
1882
1883         if (fmt == archUnknown)
1884                 AH->format = _discoverArchiveFormat(AH);
1885         else
1886                 AH->format = fmt;
1887
1888         switch (AH->format)
1889         {
1890                 case archCustom:
1891                         InitArchiveFmt_Custom(AH);
1892                         break;
1893
1894                 case archFiles:
1895                         InitArchiveFmt_Files(AH);
1896                         break;
1897
1898                 case archNull:
1899                         InitArchiveFmt_Null(AH);
1900                         break;
1901
1902                 case archTar:
1903                         InitArchiveFmt_Tar(AH);
1904                         break;
1905
1906                 default:
1907                         die_horribly(AH, modulename, "unrecognized file format \"%d\"\n", fmt);
1908         }
1909
1910         return AH;
1911 }
1912
1913
1914 void
1915 WriteDataChunks(ArchiveHandle *AH)
1916 {
1917         TocEntry   *te;
1918         StartDataPtr startPtr;
1919         EndDataPtr      endPtr;
1920
1921         for (te = AH->toc->next; te != AH->toc; te = te->next)
1922         {
1923                 if (te->dataDumper != NULL)
1924                 {
1925                         AH->currToc = te;
1926                         /* printf("Writing data for %d (%x)\n", te->id, te); */
1927
1928                         if (strcmp(te->desc, "BLOBS") == 0)
1929                         {
1930                                 startPtr = AH->StartBlobsPtr;
1931                                 endPtr = AH->EndBlobsPtr;
1932                         }
1933                         else
1934                         {
1935                                 startPtr = AH->StartDataPtr;
1936                                 endPtr = AH->EndDataPtr;
1937                         }
1938
1939                         if (startPtr != NULL)
1940                                 (*startPtr) (AH, te);
1941
1942                         /*
1943                          * printf("Dumper arg for %d is %x\n", te->id, te->dataDumperArg);
1944                          */
1945
1946                         /*
1947                          * The user-provided DataDumper routine needs to call
1948                          * AH->WriteData
1949                          */
1950                         (*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
1951
1952                         if (endPtr != NULL)
1953                                 (*endPtr) (AH, te);
1954                         AH->currToc = NULL;
1955                 }
1956         }
1957 }
1958
1959 void
1960 WriteToc(ArchiveHandle *AH)
1961 {
1962         TocEntry   *te;
1963         char            workbuf[32];
1964         int                     i;
1965
1966         /* printf("%d TOC Entries to save\n", AH->tocCount); */
1967
1968         WriteInt(AH, AH->tocCount);
1969
1970         for (te = AH->toc->next; te != AH->toc; te = te->next)
1971         {
1972                 WriteInt(AH, te->dumpId);
1973                 WriteInt(AH, te->dataDumper ? 1 : 0);
1974
1975                 /* OID is recorded as a string for historical reasons */
1976                 sprintf(workbuf, "%u", te->catalogId.tableoid);
1977                 WriteStr(AH, workbuf);
1978                 sprintf(workbuf, "%u", te->catalogId.oid);
1979                 WriteStr(AH, workbuf);
1980
1981                 WriteStr(AH, te->tag);
1982                 WriteStr(AH, te->desc);
1983                 WriteInt(AH, te->section);
1984                 WriteStr(AH, te->defn);
1985                 WriteStr(AH, te->dropStmt);
1986                 WriteStr(AH, te->copyStmt);
1987                 WriteStr(AH, te->namespace);
1988                 WriteStr(AH, te->tablespace);
1989                 WriteStr(AH, te->owner);
1990                 WriteStr(AH, te->withOids ? "true" : "false");
1991
1992                 /* Dump list of dependencies */
1993                 for (i = 0; i < te->nDeps; i++)
1994                 {
1995                         sprintf(workbuf, "%d", te->dependencies[i]);
1996                         WriteStr(AH, workbuf);
1997                 }
1998                 WriteStr(AH, NULL);             /* Terminate List */
1999
2000                 if (AH->WriteExtraTocPtr)
2001                         (*AH->WriteExtraTocPtr) (AH, te);
2002         }
2003 }
2004
2005 void
2006 ReadToc(ArchiveHandle *AH)
2007 {
2008         int                     i;
2009         char       *tmp;
2010         DumpId     *deps;
2011         int                     depIdx;
2012         int                     depSize;
2013         TocEntry   *te;
2014
2015         AH->tocCount = ReadInt(AH);
2016         AH->maxDumpId = 0;
2017
2018         for (i = 0; i < AH->tocCount; i++)
2019         {
2020                 te = (TocEntry *) calloc(1, sizeof(TocEntry));
2021                 te->dumpId = ReadInt(AH);
2022
2023                 if (te->dumpId > AH->maxDumpId)
2024                         AH->maxDumpId = te->dumpId;
2025
2026                 /* Sanity check */
2027                 if (te->dumpId <= 0)
2028                         die_horribly(AH, modulename,
2029                                            "entry ID %d out of range -- perhaps a corrupt TOC\n",
2030                                                  te->dumpId);
2031
2032                 te->hadDumper = ReadInt(AH);
2033
2034                 if (AH->version >= K_VERS_1_8)
2035                 {
2036                         tmp = ReadStr(AH);
2037                         sscanf(tmp, "%u", &te->catalogId.tableoid);
2038                         free(tmp);
2039                 }
2040                 else
2041                         te->catalogId.tableoid = InvalidOid;
2042                 tmp = ReadStr(AH);
2043                 sscanf(tmp, "%u", &te->catalogId.oid);
2044                 free(tmp);
2045
2046                 te->tag = ReadStr(AH);
2047                 te->desc = ReadStr(AH);
2048
2049                 if (AH->version >= K_VERS_1_11)
2050                 {
2051                         te->section = ReadInt(AH);
2052                 }
2053                 else
2054                 {
2055                         /*
2056                          * rules for pre-8.4 archives wherein pg_dump hasn't classified
2057                          * the entries into sections
2058                          */
2059                         if (strcmp(te->desc, "COMMENT") == 0 ||
2060                                 strcmp(te->desc, "ACL") == 0)
2061                                 te->section = SECTION_NONE;
2062                         else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2063                                          strcmp(te->desc, "BLOBS") == 0 ||
2064                                          strcmp(te->desc, "BLOB COMMENTS") == 0)
2065                                 te->section = SECTION_DATA;
2066                         else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2067                                          strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2068                                          strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2069                                          strcmp(te->desc, "INDEX") == 0 ||
2070                                          strcmp(te->desc, "RULE") == 0 ||
2071                                          strcmp(te->desc, "TRIGGER") == 0)
2072                                 te->section = SECTION_POST_DATA;
2073                         else
2074                                 te->section = SECTION_PRE_DATA;
2075                 }
2076
2077                 te->defn = ReadStr(AH);
2078                 te->dropStmt = ReadStr(AH);
2079
2080                 if (AH->version >= K_VERS_1_3)
2081                         te->copyStmt = ReadStr(AH);
2082
2083                 if (AH->version >= K_VERS_1_6)
2084                         te->namespace = ReadStr(AH);
2085
2086                 if (AH->version >= K_VERS_1_10)
2087                         te->tablespace = ReadStr(AH);
2088
2089                 te->owner = ReadStr(AH);
2090                 if (AH->version >= K_VERS_1_9)
2091                 {
2092                         if (strcmp(ReadStr(AH), "true") == 0)
2093                                 te->withOids = true;
2094                         else
2095                                 te->withOids = false;
2096                 }
2097                 else
2098                         te->withOids = true;
2099
2100                 /* Read TOC entry dependencies */
2101                 if (AH->version >= K_VERS_1_5)
2102                 {
2103                         depSize = 100;
2104                         deps = (DumpId *) malloc(sizeof(DumpId) * depSize);
2105                         depIdx = 0;
2106                         for (;;)
2107                         {
2108                                 tmp = ReadStr(AH);
2109                                 if (!tmp)
2110                                         break;          /* end of list */
2111                                 if (depIdx >= depSize)
2112                                 {
2113                                         depSize *= 2;
2114                                         deps = (DumpId *) realloc(deps, sizeof(DumpId) * depSize);
2115                                 }
2116                                 sscanf(tmp, "%d", &deps[depIdx]);
2117                                 free(tmp);
2118                                 depIdx++;
2119                         }
2120
2121                         if (depIdx > 0)         /* We have a non-null entry */
2122                         {
2123                                 deps = (DumpId *) realloc(deps, sizeof(DumpId) * depIdx);
2124                                 te->dependencies = deps;
2125                                 te->nDeps = depIdx;
2126                         }
2127                         else
2128                         {
2129                                 free(deps);
2130                                 te->dependencies = NULL;
2131                                 te->nDeps = 0;
2132                         }
2133                 }
2134                 else
2135                 {
2136                         te->dependencies = NULL;
2137                         te->nDeps = 0;
2138                 }
2139
2140                 if (AH->ReadExtraTocPtr)
2141                         (*AH->ReadExtraTocPtr) (AH, te);
2142
2143                 ahlog(AH, 3, "read TOC entry %d (ID %d) for %s %s\n",
2144                           i, te->dumpId, te->desc, te->tag);
2145
2146                 /* link completed entry into TOC circular list */
2147                 te->prev = AH->toc->prev;
2148                 AH->toc->prev->next = te;
2149                 AH->toc->prev = te;
2150                 te->next = AH->toc;
2151
2152                 /* special processing immediately upon read for some items */
2153                 if (strcmp(te->desc, "ENCODING") == 0)
2154                         processEncodingEntry(AH, te);
2155                 else if (strcmp(te->desc, "STDSTRINGS") == 0)
2156                         processStdStringsEntry(AH, te);
2157         }
2158 }
2159
2160 static void
2161 processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2162 {
2163         /* te->defn should have the form SET client_encoding = 'foo'; */
2164         char       *defn = strdup(te->defn);
2165         char       *ptr1;
2166         char       *ptr2 = NULL;
2167         int                     encoding;
2168
2169         ptr1 = strchr(defn, '\'');
2170         if (ptr1)
2171                 ptr2 = strchr(++ptr1, '\'');
2172         if (ptr2)
2173         {
2174                 *ptr2 = '\0';
2175                 encoding = pg_char_to_encoding(ptr1);
2176                 if (encoding < 0)
2177                         die_horribly(AH, modulename, "unrecognized encoding \"%s\"\n",
2178                                                  ptr1);
2179                 AH->public.encoding = encoding;
2180         }
2181         else
2182                 die_horribly(AH, modulename, "invalid ENCODING item: %s\n",
2183                                          te->defn);
2184
2185         free(defn);
2186 }
2187
2188 static void
2189 processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2190 {
2191         /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2192         char       *ptr1;
2193
2194         ptr1 = strchr(te->defn, '\'');
2195         if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2196                 AH->public.std_strings = true;
2197         else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2198                 AH->public.std_strings = false;
2199         else
2200                 die_horribly(AH, modulename, "invalid STDSTRINGS item: %s\n",
2201                                          te->defn);
2202 }
2203
2204 static teReqs
2205 _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls)
2206 {
2207         teReqs          res = REQ_ALL;
2208
2209         /* ENCODING and STDSTRINGS items are dumped specially, so always reject */
2210         if (strcmp(te->desc, "ENCODING") == 0 ||
2211                 strcmp(te->desc, "STDSTRINGS") == 0)
2212                 return 0;
2213
2214         /* If it's an ACL, maybe ignore it */
2215         if ((!include_acls || ropt->aclsSkip) && strcmp(te->desc, "ACL") == 0)
2216                 return 0;
2217
2218         if (!ropt->create && strcmp(te->desc, "DATABASE") == 0)
2219                 return 0;
2220
2221         /* Check options for selective dump/restore */
2222         if (ropt->schemaNames)
2223         {
2224                 /* If no namespace is specified, it means all. */
2225                 if (!te->namespace)
2226                         return 0;
2227                 if (strcmp(ropt->schemaNames, te->namespace) != 0)
2228                         return 0;
2229         }
2230
2231         if (ropt->selTypes)
2232         {
2233                 if (strcmp(te->desc, "TABLE") == 0 ||
2234                         strcmp(te->desc, "TABLE DATA") == 0)
2235                 {
2236                         if (!ropt->selTable)
2237                                 return 0;
2238                         if (ropt->tableNames && strcmp(ropt->tableNames, te->tag) != 0)
2239                                 return 0;
2240                 }
2241                 else if (strcmp(te->desc, "INDEX") == 0)
2242                 {
2243                         if (!ropt->selIndex)
2244                                 return 0;
2245                         if (ropt->indexNames && strcmp(ropt->indexNames, te->tag) != 0)
2246                                 return 0;
2247                 }
2248                 else if (strcmp(te->desc, "FUNCTION") == 0)
2249                 {
2250                         if (!ropt->selFunction)
2251                                 return 0;
2252                         if (ropt->functionNames && strcmp(ropt->functionNames, te->tag) != 0)
2253                                 return 0;
2254                 }
2255                 else if (strcmp(te->desc, "TRIGGER") == 0)
2256                 {
2257                         if (!ropt->selTrigger)
2258                                 return 0;
2259                         if (ropt->triggerNames && strcmp(ropt->triggerNames, te->tag) != 0)
2260                                 return 0;
2261                 }
2262                 else
2263                         return 0;
2264         }
2265
2266         /*
2267          * Check if we had a dataDumper. Indicates if the entry is schema or data
2268          */
2269         if (!te->hadDumper)
2270         {
2271                 /*
2272                  * Special Case: If 'SEQUENCE SET' then it is considered a data entry
2273                  */
2274                 if (strcmp(te->desc, "SEQUENCE SET") == 0)
2275                         res = res & REQ_DATA;
2276                 else
2277                         res = res & ~REQ_DATA;
2278         }
2279
2280         /*
2281          * Special case: <Init> type with <Max OID> tag; this is obsolete and we
2282          * always ignore it.
2283          */
2284         if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
2285                 return 0;
2286
2287         /* Mask it if we only want schema */
2288         if (ropt->schemaOnly)
2289                 res = res & REQ_SCHEMA;
2290
2291         /* Mask it we only want data */
2292         if (ropt->dataOnly)
2293                 res = res & REQ_DATA;
2294
2295         /* Mask it if we don't have a schema contribution */
2296         if (!te->defn || strlen(te->defn) == 0)
2297                 res = res & ~REQ_SCHEMA;
2298
2299         /* Finally, if there's a per-ID filter, limit based on that as well */
2300         if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
2301                 return 0;
2302
2303         return res;
2304 }
2305
2306 /*
2307  * Issue SET commands for parameters that we want to have set the same way
2308  * at all times during execution of a restore script.
2309  */
2310 static void
2311 _doSetFixedOutputState(ArchiveHandle *AH)
2312 {
2313         /* Disable statement_timeout in archive for pg_restore/psql      */
2314         ahprintf(AH, "SET statement_timeout = 0;\n");
2315
2316         /* Select the correct character set encoding */
2317         ahprintf(AH, "SET client_encoding = '%s';\n",
2318                          pg_encoding_to_char(AH->public.encoding));
2319
2320         /* Select the correct string literal syntax */
2321         ahprintf(AH, "SET standard_conforming_strings = %s;\n",
2322                          AH->public.std_strings ? "on" : "off");
2323
2324         /* Select the role to be used during restore */
2325         if (AH->ropt && AH->ropt->use_role)
2326                 ahprintf(AH, "SET ROLE %s;\n", fmtId(AH->ropt->use_role));
2327
2328         /* Make sure function checking is disabled */
2329         ahprintf(AH, "SET check_function_bodies = false;\n");
2330
2331         /* Avoid annoying notices etc */
2332         ahprintf(AH, "SET client_min_messages = warning;\n");
2333         if (!AH->public.std_strings)
2334                 ahprintf(AH, "SET escape_string_warning = off;\n");
2335
2336         ahprintf(AH, "\n");
2337 }
2338
2339 /*
2340  * Issue a SET SESSION AUTHORIZATION command.  Caller is responsible
2341  * for updating state if appropriate.  If user is NULL or an empty string,
2342  * the specification DEFAULT will be used.
2343  */
2344 static void
2345 _doSetSessionAuth(ArchiveHandle *AH, const char *user)
2346 {
2347         PQExpBuffer cmd = createPQExpBuffer();
2348
2349         appendPQExpBuffer(cmd, "SET SESSION AUTHORIZATION ");
2350
2351         /*
2352          * SQL requires a string literal here.  Might as well be correct.
2353          */
2354         if (user && *user)
2355                 appendStringLiteralAHX(cmd, user, AH);
2356         else
2357                 appendPQExpBuffer(cmd, "DEFAULT");
2358         appendPQExpBuffer(cmd, ";");
2359
2360         if (RestoringToDB(AH))
2361         {
2362                 PGresult   *res;
2363
2364                 res = PQexec(AH->connection, cmd->data);
2365
2366                 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
2367                         /* NOT warn_or_die_horribly... use -O instead to skip this. */
2368                         die_horribly(AH, modulename, "could not set session user to \"%s\": %s",
2369                                                  user, PQerrorMessage(AH->connection));
2370
2371                 PQclear(res);
2372         }
2373         else
2374                 ahprintf(AH, "%s\n\n", cmd->data);
2375
2376         destroyPQExpBuffer(cmd);
2377 }
2378
2379
2380 /*
2381  * Issue a SET default_with_oids command.  Caller is responsible
2382  * for updating state if appropriate.
2383  */
2384 static void
2385 _doSetWithOids(ArchiveHandle *AH, const bool withOids)
2386 {
2387         PQExpBuffer cmd = createPQExpBuffer();
2388
2389         appendPQExpBuffer(cmd, "SET default_with_oids = %s;", withOids ?
2390                                           "true" : "false");
2391
2392         if (RestoringToDB(AH))
2393         {
2394                 PGresult   *res;
2395
2396                 res = PQexec(AH->connection, cmd->data);
2397
2398                 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
2399                         warn_or_die_horribly(AH, modulename,
2400                                                                  "could not set default_with_oids: %s",
2401                                                                  PQerrorMessage(AH->connection));
2402
2403                 PQclear(res);
2404         }
2405         else
2406                 ahprintf(AH, "%s\n\n", cmd->data);
2407
2408         destroyPQExpBuffer(cmd);
2409 }
2410
2411
2412 /*
2413  * Issue the commands to connect to the specified database.
2414  *
2415  * If we're currently restoring right into a database, this will
2416  * actually establish a connection. Otherwise it puts a \connect into
2417  * the script output.
2418  *
2419  * NULL dbname implies reconnecting to the current DB (pretty useless).
2420  */
2421 static void
2422 _reconnectToDB(ArchiveHandle *AH, const char *dbname)
2423 {
2424         if (RestoringToDB(AH))
2425                 ReconnectToServer(AH, dbname, NULL);
2426         else
2427         {
2428                 PQExpBuffer qry = createPQExpBuffer();
2429
2430                 appendPQExpBuffer(qry, "\\connect %s\n\n",
2431                                                   dbname ? fmtId(dbname) : "-");
2432                 ahprintf(AH, "%s", qry->data);
2433                 destroyPQExpBuffer(qry);
2434         }
2435
2436         /*
2437          * NOTE: currUser keeps track of what the imaginary session user in our
2438          * script is.  It's now effectively reset to the original userID.
2439          */
2440         if (AH->currUser)
2441                 free(AH->currUser);
2442         AH->currUser = NULL;
2443
2444         /* don't assume we still know the output schema, tablespace, etc either */
2445         if (AH->currSchema)
2446                 free(AH->currSchema);
2447         AH->currSchema = NULL;
2448         if (AH->currTablespace)
2449                 free(AH->currTablespace);
2450         AH->currTablespace = NULL;
2451         AH->currWithOids = -1;
2452
2453         /* re-establish fixed state */
2454         _doSetFixedOutputState(AH);
2455 }
2456
2457 /*
2458  * Become the specified user, and update state to avoid redundant commands
2459  *
2460  * NULL or empty argument is taken to mean restoring the session default
2461  */
2462 static void
2463 _becomeUser(ArchiveHandle *AH, const char *user)
2464 {
2465         if (!user)
2466                 user = "";                              /* avoid null pointers */
2467
2468         if (AH->currUser && strcmp(AH->currUser, user) == 0)
2469                 return;                                 /* no need to do anything */
2470
2471         _doSetSessionAuth(AH, user);
2472
2473         /*
2474          * NOTE: currUser keeps track of what the imaginary session user in our
2475          * script is
2476          */
2477         if (AH->currUser)
2478                 free(AH->currUser);
2479         AH->currUser = strdup(user);
2480 }
2481
2482 /*
2483  * Become the owner of the the given TOC entry object.  If
2484  * changes in ownership are not allowed, this doesn't do anything.
2485  */
2486 static void
2487 _becomeOwner(ArchiveHandle *AH, TocEntry *te)
2488 {
2489         if (AH->ropt && (AH->ropt->noOwner || !AH->ropt->use_setsessauth))
2490                 return;
2491
2492         _becomeUser(AH, te->owner);
2493 }
2494
2495
2496 /*
2497  * Set the proper default_with_oids value for the table.
2498  */
2499 static void
2500 _setWithOids(ArchiveHandle *AH, TocEntry *te)
2501 {
2502         if (AH->currWithOids != te->withOids)
2503         {
2504                 _doSetWithOids(AH, te->withOids);
2505                 AH->currWithOids = te->withOids;
2506         }
2507 }
2508
2509
2510 /*
2511  * Issue the commands to select the specified schema as the current schema
2512  * in the target database.
2513  */
2514 static void
2515 _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
2516 {
2517         PQExpBuffer qry;
2518
2519         if (!schemaName || *schemaName == '\0' ||
2520                 (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
2521                 return;                                 /* no need to do anything */
2522
2523         qry = createPQExpBuffer();
2524
2525         appendPQExpBuffer(qry, "SET search_path = %s",
2526                                           fmtId(schemaName));
2527         if (strcmp(schemaName, "pg_catalog") != 0)
2528                 appendPQExpBuffer(qry, ", pg_catalog");
2529
2530         if (RestoringToDB(AH))
2531         {
2532                 PGresult   *res;
2533
2534                 res = PQexec(AH->connection, qry->data);
2535
2536                 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
2537                         warn_or_die_horribly(AH, modulename,
2538                                                                  "could not set search_path to \"%s\": %s",
2539                                                                  schemaName, PQerrorMessage(AH->connection));
2540
2541                 PQclear(res);
2542         }
2543         else
2544                 ahprintf(AH, "%s;\n\n", qry->data);
2545
2546         if (AH->currSchema)
2547                 free(AH->currSchema);
2548         AH->currSchema = strdup(schemaName);
2549
2550         destroyPQExpBuffer(qry);
2551 }
2552
2553 /*
2554  * Issue the commands to select the specified tablespace as the current one
2555  * in the target database.
2556  */
2557 static void
2558 _selectTablespace(ArchiveHandle *AH, const char *tablespace)
2559 {
2560         PQExpBuffer qry;
2561         const char *want,
2562                            *have;
2563
2564         /* do nothing in --no-tablespaces mode */
2565         if (AH->ropt->noTablespace)
2566                 return;
2567
2568         have = AH->currTablespace;
2569         want = tablespace;
2570
2571         /* no need to do anything for non-tablespace object */
2572         if (!want)
2573                 return;
2574
2575         if (have && strcmp(want, have) == 0)
2576                 return;                                 /* no need to do anything */
2577
2578         qry = createPQExpBuffer();
2579
2580         if (strcmp(want, "") == 0)
2581         {
2582                 /* We want the tablespace to be the database's default */
2583                 appendPQExpBuffer(qry, "SET default_tablespace = ''");
2584         }
2585         else
2586         {
2587                 /* We want an explicit tablespace */
2588                 appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
2589         }
2590
2591         if (RestoringToDB(AH))
2592         {
2593                 PGresult   *res;
2594
2595                 res = PQexec(AH->connection, qry->data);
2596
2597                 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
2598                         warn_or_die_horribly(AH, modulename,
2599                                                                  "could not set default_tablespace to %s: %s",
2600                                                                  fmtId(want), PQerrorMessage(AH->connection));
2601
2602                 PQclear(res);
2603         }
2604         else
2605                 ahprintf(AH, "%s;\n\n", qry->data);
2606
2607         if (AH->currTablespace)
2608                 free(AH->currTablespace);
2609         AH->currTablespace = strdup(want);
2610
2611         destroyPQExpBuffer(qry);
2612 }
2613
2614 /*
2615  * Extract an object description for a TOC entry, and append it to buf.
2616  *
2617  * This is not quite as general as it may seem, since it really only
2618  * handles constructing the right thing to put into ALTER ... OWNER TO.
2619  *
2620  * The whole thing is pretty grotty, but we are kind of stuck since the
2621  * information used is all that's available in older dump files.
2622  */
2623 static void
2624 _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
2625 {
2626         const char *type = te->desc;
2627
2628         /* Use ALTER TABLE for views and sequences */
2629         if (strcmp(type, "VIEW") == 0 || strcmp(type, "SEQUENCE") == 0)
2630                 type = "TABLE";
2631
2632         /* objects named by a schema and name */
2633         if (strcmp(type, "CONVERSION") == 0 ||
2634                 strcmp(type, "DOMAIN") == 0 ||
2635                 strcmp(type, "TABLE") == 0 ||
2636                 strcmp(type, "TYPE") == 0 ||
2637                 strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
2638                 strcmp(type, "TEXT SEARCH CONFIGURATION") == 0)
2639         {
2640                 appendPQExpBuffer(buf, "%s ", type);
2641                 if (te->namespace && te->namespace[0])  /* is null pre-7.3 */
2642                         appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
2643
2644                 /*
2645                  * Pre-7.3 pg_dump would sometimes (not always) put a fmtId'd name
2646                  * into te->tag for an index. This check is heuristic, so make its
2647                  * scope as narrow as possible.
2648                  */
2649                 if (AH->version < K_VERS_1_7 &&
2650                         te->tag[0] == '"' &&
2651                         te->tag[strlen(te->tag) - 1] == '"' &&
2652                         strcmp(type, "INDEX") == 0)
2653                         appendPQExpBuffer(buf, "%s", te->tag);
2654                 else
2655                         appendPQExpBuffer(buf, "%s", fmtId(te->tag));
2656                 return;
2657         }
2658
2659         /* objects named by just a name */
2660         if (strcmp(type, "DATABASE") == 0 ||
2661                 strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
2662                 strcmp(type, "SCHEMA") == 0 ||
2663                 strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
2664                 strcmp(type, "SERVER") == 0 ||
2665                 strcmp(type, "USER MAPPING") == 0)
2666         {
2667                 appendPQExpBuffer(buf, "%s %s", type, fmtId(te->tag));
2668                 return;
2669         }
2670
2671         /*
2672          * These object types require additional decoration.  Fortunately, the
2673          * information needed is exactly what's in the DROP command.
2674          */
2675         if (strcmp(type, "AGGREGATE") == 0 ||
2676                 strcmp(type, "FUNCTION") == 0 ||
2677                 strcmp(type, "OPERATOR") == 0 ||
2678                 strcmp(type, "OPERATOR CLASS") == 0 ||
2679                 strcmp(type, "OPERATOR FAMILY") == 0)
2680         {
2681                 /* Chop "DROP " off the front and make a modifiable copy */
2682                 char       *first = strdup(te->dropStmt + 5);
2683                 char       *last;
2684
2685                 /* point to last character in string */
2686                 last = first + strlen(first) - 1;
2687
2688                 /* Strip off any ';' or '\n' at the end */
2689                 while (last >= first && (*last == '\n' || *last == ';'))
2690                         last--;
2691                 *(last + 1) = '\0';
2692
2693                 appendPQExpBufferStr(buf, first);
2694
2695                 free(first);
2696                 return;
2697         }
2698
2699         write_msg(modulename, "WARNING: don't know how to set owner for object type %s\n",
2700                           type);
2701 }
2702
2703 static void
2704 _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isData, bool acl_pass)
2705 {
2706         /* ACLs are dumped only during acl pass */
2707         if (acl_pass)
2708         {
2709                 if (strcmp(te->desc, "ACL") != 0)
2710                         return;
2711         }
2712         else
2713         {
2714                 if (strcmp(te->desc, "ACL") == 0)
2715                         return;
2716         }
2717
2718         /*
2719          * Avoid dumping the public schema, as it will already be created ...
2720          * unless we are using --clean mode, in which case it's been deleted and
2721          * we'd better recreate it.  Likewise for its comment, if any.
2722          */
2723         if (!ropt->dropSchema)
2724         {
2725                 if (strcmp(te->desc, "SCHEMA") == 0 &&
2726                         strcmp(te->tag, "public") == 0)
2727                         return;
2728                 /* The comment restore would require super-user privs, so avoid it. */
2729                 if (strcmp(te->desc, "COMMENT") == 0 &&
2730                         strcmp(te->tag, "SCHEMA public") == 0)
2731                         return;
2732         }
2733
2734         /* Select owner, schema, and tablespace as necessary */
2735         _becomeOwner(AH, te);
2736         _selectOutputSchema(AH, te->namespace);
2737         _selectTablespace(AH, te->tablespace);
2738
2739         /* Set up OID mode too */
2740         if (strcmp(te->desc, "TABLE") == 0)
2741                 _setWithOids(AH, te);
2742
2743         /* Emit header comment for item */
2744         if (!AH->noTocComments)
2745         {
2746                 const char *pfx;
2747
2748                 if (isData)
2749                         pfx = "Data for ";
2750                 else
2751                         pfx = "";
2752
2753                 ahprintf(AH, "--\n");
2754                 if (AH->public.verbose)
2755                 {
2756                         ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
2757                                          te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
2758                         if (te->nDeps > 0)
2759                         {
2760                                 int                     i;
2761
2762                                 ahprintf(AH, "-- Dependencies:");
2763                                 for (i = 0; i < te->nDeps; i++)
2764                                         ahprintf(AH, " %d", te->dependencies[i]);
2765                                 ahprintf(AH, "\n");
2766                         }
2767                 }
2768                 ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
2769                                  pfx, te->tag, te->desc,
2770                                  te->namespace ? te->namespace : "-",
2771                                  ropt->noOwner ? "-" : te->owner);
2772                 if (te->tablespace && !ropt->noTablespace)
2773                         ahprintf(AH, "; Tablespace: %s", te->tablespace);
2774                 ahprintf(AH, "\n");
2775
2776                 if (AH->PrintExtraTocPtr !=NULL)
2777                         (*AH->PrintExtraTocPtr) (AH, te);
2778                 ahprintf(AH, "--\n\n");
2779         }
2780
2781         /*
2782          * Actually print the definition.
2783          *
2784          * Really crude hack for suppressing AUTHORIZATION clause that old pg_dump
2785          * versions put into CREATE SCHEMA.  We have to do this when --no-owner
2786          * mode is selected.  This is ugly, but I see no other good way ...
2787          */
2788         if (ropt->noOwner && strcmp(te->desc, "SCHEMA") == 0)
2789         {
2790                 ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
2791         }
2792         else
2793         {
2794                 if (strlen(te->defn) > 0)
2795                         ahprintf(AH, "%s\n\n", te->defn);
2796         }
2797
2798         /*
2799          * If we aren't using SET SESSION AUTH to determine ownership, we must
2800          * instead issue an ALTER OWNER command.  We assume that anything without
2801          * a DROP command is not a separately ownable object.  All the categories
2802          * with DROP commands must appear in one list or the other.
2803          */
2804         if (!ropt->noOwner && !ropt->use_setsessauth &&
2805                 strlen(te->owner) > 0 && strlen(te->dropStmt) > 0)
2806         {
2807                 if (strcmp(te->desc, "AGGREGATE") == 0 ||
2808                         strcmp(te->desc, "CONVERSION") == 0 ||
2809                         strcmp(te->desc, "DATABASE") == 0 ||
2810                         strcmp(te->desc, "DOMAIN") == 0 ||
2811                         strcmp(te->desc, "FUNCTION") == 0 ||
2812                         strcmp(te->desc, "OPERATOR") == 0 ||
2813                         strcmp(te->desc, "OPERATOR CLASS") == 0 ||
2814                         strcmp(te->desc, "OPERATOR FAMILY") == 0 ||
2815                         strcmp(te->desc, "PROCEDURAL LANGUAGE") == 0 ||
2816                         strcmp(te->desc, "SCHEMA") == 0 ||
2817                         strcmp(te->desc, "TABLE") == 0 ||
2818                         strcmp(te->desc, "TYPE") == 0 ||
2819                         strcmp(te->desc, "VIEW") == 0 ||
2820                         strcmp(te->desc, "SEQUENCE") == 0 ||
2821                         strcmp(te->desc, "TEXT SEARCH DICTIONARY") == 0 ||
2822                         strcmp(te->desc, "TEXT SEARCH CONFIGURATION") == 0 ||
2823                         strcmp(te->desc, "FOREIGN DATA WRAPPER") == 0 ||
2824                         strcmp(te->desc, "SERVER") == 0)
2825                 {
2826                         PQExpBuffer temp = createPQExpBuffer();
2827
2828                         appendPQExpBuffer(temp, "ALTER ");
2829                         _getObjectDescription(temp, te, AH);
2830                         appendPQExpBuffer(temp, " OWNER TO %s;", fmtId(te->owner));
2831                         ahprintf(AH, "%s\n\n", temp->data);
2832                         destroyPQExpBuffer(temp);
2833                 }
2834                 else if (strcmp(te->desc, "CAST") == 0 ||
2835                                  strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2836                                  strcmp(te->desc, "CONSTRAINT") == 0 ||
2837                                  strcmp(te->desc, "DEFAULT") == 0 ||
2838                                  strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2839                                  strcmp(te->desc, "INDEX") == 0 ||
2840                                  strcmp(te->desc, "RULE") == 0 ||
2841                                  strcmp(te->desc, "TRIGGER") == 0 ||
2842                                  strcmp(te->desc, "USER MAPPING") == 0)
2843                 {
2844                         /* these object types don't have separate owners */
2845                 }
2846                 else
2847                 {
2848                         write_msg(modulename, "WARNING: don't know how to set owner for object type %s\n",
2849                                           te->desc);
2850                 }
2851         }
2852
2853         /*
2854          * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
2855          * commands, so we can no longer assume we know the current auth setting.
2856          */
2857         if (strncmp(te->desc, "ACL", 3) == 0)
2858         {
2859                 if (AH->currUser)
2860                         free(AH->currUser);
2861                 AH->currUser = NULL;
2862         }
2863 }
2864
2865 void
2866 WriteHead(ArchiveHandle *AH)
2867 {
2868         struct tm       crtm;
2869
2870         (*AH->WriteBufPtr) (AH, "PGDMP", 5);            /* Magic code */
2871         (*AH->WriteBytePtr) (AH, AH->vmaj);
2872         (*AH->WriteBytePtr) (AH, AH->vmin);
2873         (*AH->WriteBytePtr) (AH, AH->vrev);
2874         (*AH->WriteBytePtr) (AH, AH->intSize);
2875         (*AH->WriteBytePtr) (AH, AH->offSize);
2876         (*AH->WriteBytePtr) (AH, AH->format);
2877
2878 #ifndef HAVE_LIBZ
2879         if (AH->compression != 0)
2880                 write_msg(modulename, "WARNING: requested compression not available in this "
2881                                   "installation -- archive will be uncompressed\n");
2882
2883         AH->compression = 0;
2884 #endif
2885
2886         WriteInt(AH, AH->compression);
2887
2888         crtm = *localtime(&AH->createDate);
2889         WriteInt(AH, crtm.tm_sec);
2890         WriteInt(AH, crtm.tm_min);
2891         WriteInt(AH, crtm.tm_hour);
2892         WriteInt(AH, crtm.tm_mday);
2893         WriteInt(AH, crtm.tm_mon);
2894         WriteInt(AH, crtm.tm_year);
2895         WriteInt(AH, crtm.tm_isdst);
2896         WriteStr(AH, PQdb(AH->connection));
2897         WriteStr(AH, AH->public.remoteVersionStr);
2898         WriteStr(AH, PG_VERSION);
2899 }
2900
2901 void
2902 ReadHead(ArchiveHandle *AH)
2903 {
2904         char            tmpMag[7];
2905         int                     fmt;
2906         struct tm       crtm;
2907
2908         /* If we haven't already read the header... */
2909         if (!AH->readHeader)
2910         {
2911                 if ((*AH->ReadBufPtr) (AH, tmpMag, 5) != 5)
2912                         die_horribly(AH, modulename, "unexpected end of file\n");
2913
2914                 if (strncmp(tmpMag, "PGDMP", 5) != 0)
2915                         die_horribly(AH, modulename, "did not find magic string in file header\n");
2916
2917                 AH->vmaj = (*AH->ReadBytePtr) (AH);
2918                 AH->vmin = (*AH->ReadBytePtr) (AH);
2919
2920                 if (AH->vmaj > 1 || ((AH->vmaj == 1) && (AH->vmin > 0)))                /* Version > 1.0 */
2921                         AH->vrev = (*AH->ReadBytePtr) (AH);
2922                 else
2923                         AH->vrev = 0;
2924
2925                 AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
2926
2927
2928                 if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
2929                         die_horribly(AH, modulename, "unsupported version (%d.%d) in file header\n",
2930                                                  AH->vmaj, AH->vmin);
2931
2932                 AH->intSize = (*AH->ReadBytePtr) (AH);
2933                 if (AH->intSize > 32)
2934                         die_horribly(AH, modulename, "sanity check on integer size (%lu) failed\n",
2935                                                  (unsigned long) AH->intSize);
2936
2937                 if (AH->intSize > sizeof(int))
2938                         write_msg(modulename, "WARNING: archive was made on a machine with larger integers, some operations might fail\n");
2939
2940                 if (AH->version >= K_VERS_1_7)
2941                         AH->offSize = (*AH->ReadBytePtr) (AH);
2942                 else
2943                         AH->offSize = AH->intSize;
2944
2945                 fmt = (*AH->ReadBytePtr) (AH);
2946
2947                 if (AH->format != fmt)
2948                         die_horribly(AH, modulename, "expected format (%d) differs from format found in file (%d)\n",
2949                                                  AH->format, fmt);
2950         }
2951
2952         if (AH->version >= K_VERS_1_2)
2953         {
2954                 if (AH->version < K_VERS_1_4)
2955                         AH->compression = (*AH->ReadBytePtr) (AH);
2956                 else
2957                         AH->compression = ReadInt(AH);
2958         }
2959         else
2960                 AH->compression = Z_DEFAULT_COMPRESSION;
2961
2962 #ifndef HAVE_LIBZ
2963         if (AH->compression != 0)
2964                 write_msg(modulename, "WARNING: archive is compressed, but this installation does not support compression -- no data will be available\n");
2965 #endif
2966
2967         if (AH->version >= K_VERS_1_4)
2968         {
2969                 crtm.tm_sec = ReadInt(AH);
2970                 crtm.tm_min = ReadInt(AH);
2971                 crtm.tm_hour = ReadInt(AH);
2972                 crtm.tm_mday = ReadInt(AH);
2973                 crtm.tm_mon = ReadInt(AH);
2974                 crtm.tm_year = ReadInt(AH);
2975                 crtm.tm_isdst = ReadInt(AH);
2976
2977                 AH->archdbname = ReadStr(AH);
2978
2979                 AH->createDate = mktime(&crtm);
2980
2981                 if (AH->createDate == (time_t) -1)
2982                         write_msg(modulename, "WARNING: invalid creation date in header\n");
2983         }
2984
2985         if (AH->version >= K_VERS_1_10)
2986         {
2987                 AH->archiveRemoteVersion = ReadStr(AH);
2988                 AH->archiveDumpVersion = ReadStr(AH);
2989         }
2990
2991 }
2992
2993
2994 /*
2995  * checkSeek
2996  *        check to see if fseek can be performed.
2997  */
2998 bool
2999 checkSeek(FILE *fp)
3000 {
3001         if (fseeko(fp, 0, SEEK_CUR) != 0)
3002                 return false;
3003         else if (sizeof(pgoff_t) > sizeof(long))
3004         {
3005                 /*
3006                  * At this point, pgoff_t is too large for long, so we return based on
3007                  * whether an pgoff_t version of fseek is available.
3008                  */
3009 #ifdef HAVE_FSEEKO
3010                 return true;
3011 #else
3012                 return false;
3013 #endif
3014         }
3015         else
3016                 return true;
3017 }
3018
3019
3020 /*
3021  * dumpTimestamp
3022  */
3023 static void
3024 dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
3025 {
3026         char            buf[256];
3027
3028         /*
3029          * We don't print the timezone on Win32, because the names are long and
3030          * localized, which means they may contain characters in various random
3031          * encodings; this has been seen to cause encoding errors when reading the
3032          * dump script.
3033          */
3034         if (strftime(buf, sizeof(buf),
3035 #ifndef WIN32
3036                                  "%Y-%m-%d %H:%M:%S %Z",
3037 #else
3038                                  "%Y-%m-%d %H:%M:%S",
3039 #endif
3040                                  localtime(&tim)) != 0)
3041                 ahprintf(AH, "-- %s %s\n\n", msg, buf);
3042 }
3043
3044
3045 /*
3046  * Main engine for parallel restore.
3047  *
3048  * Work is done in three phases.
3049  * First we process tocEntries until we come to one that is marked
3050  * SECTION_DATA or SECTION_POST_DATA, in a single connection, just as for a
3051  * standard restore.  Second we process the remaining non-ACL steps in
3052  * parallel worker children (threads on Windows, processes on Unix), each of
3053  * which connects separately to the database.  Finally we process all the ACL
3054  * entries in a single connection (that happens back in RestoreArchive).
3055  */
3056 static void
3057 restore_toc_entries_parallel(ArchiveHandle *AH)
3058 {
3059         RestoreOptions *ropt = AH->ropt;
3060         int                     n_slots = ropt->number_of_threads;
3061         ParallelSlot *slots;
3062         int                     work_status;
3063         int                     next_slot;
3064         TocEntry   *first_unprocessed = AH->toc->next;
3065         TocEntry   *next_work_item;
3066         thandle         ret_child;
3067         TocEntry   *te;
3068
3069         ahlog(AH,2,"entering restore_toc_entries_parallel\n");
3070
3071         /* we haven't got round to making this work for all archive formats */
3072         if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
3073                 die_horribly(AH, modulename, "parallel restore is not supported with this archive file format\n");
3074
3075         slots = (ParallelSlot *) calloc(sizeof(ParallelSlot), n_slots);
3076
3077         /* Adjust dependency information */
3078         fix_dependencies(AH);
3079
3080         /*
3081          * Do all the early stuff in a single connection in the parent.
3082          * There's no great point in running it in parallel, in fact it will
3083          * actually run faster in a single connection because we avoid all the
3084          * connection and setup overhead.
3085          */
3086         while ((next_work_item = get_next_work_item(AH, &first_unprocessed,
3087                                                                                                 NULL, 0)) != NULL)
3088         {
3089                 if (next_work_item->section == SECTION_DATA ||
3090                         next_work_item->section == SECTION_POST_DATA)
3091                         break;
3092
3093                 ahlog(AH, 1, "processing item %d %s %s\n",
3094                           next_work_item->dumpId,
3095                           next_work_item->desc, next_work_item->tag);
3096
3097                 (void) restore_toc_entry(AH, next_work_item, ropt, false);
3098
3099                 next_work_item->restored = true;
3100                 reduce_dependencies(AH, next_work_item);
3101         }
3102
3103         /*
3104          * Now close parent connection in prep for parallel steps.  We do this
3105          * mainly to ensure that we don't exceed the specified number of parallel
3106          * connections.
3107          */
3108         PQfinish(AH->connection);
3109         AH->connection = NULL;
3110
3111         /* blow away any transient state from the old connection */
3112         if (AH->currUser)
3113                 free(AH->currUser);
3114         AH->currUser = NULL;
3115         if (AH->currSchema)
3116                 free(AH->currSchema);
3117         AH->currSchema = NULL;
3118         if (AH->currTablespace)
3119                 free(AH->currTablespace);
3120         AH->currTablespace = NULL;
3121         AH->currWithOids = -1;
3122
3123         /*
3124          * main parent loop
3125          *
3126          * Keep going until there is no worker still running AND there is no work
3127          * left to be done.
3128          */
3129
3130         ahlog(AH,1,"entering main parallel loop\n");
3131
3132         while ((next_work_item = get_next_work_item(AH, &first_unprocessed,
3133                                                                                                 slots, n_slots)) != NULL ||
3134                    work_in_progress(slots, n_slots))
3135         {
3136                 if (next_work_item != NULL)
3137                 {
3138                         teReqs    reqs;
3139
3140                         /* If not to be dumped, don't waste time launching a worker */
3141                         reqs = _tocEntryRequired(next_work_item, AH->ropt, false);
3142                         if ((reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
3143                         {
3144                                 ahlog(AH, 1, "skipping item %d %s %s\n",
3145                                           next_work_item->dumpId,
3146                                           next_work_item->desc, next_work_item->tag);
3147
3148                                 next_work_item->restored = true;
3149                                 reduce_dependencies(AH, next_work_item);
3150
3151                                 continue;
3152                         }
3153
3154                         if ((next_slot = get_next_slot(slots, n_slots)) != NO_SLOT)
3155                         {
3156                                 /* There is work still to do and a worker slot available */
3157                                 thandle child;
3158                                 RestoreArgs *args;
3159
3160                                 ahlog(AH, 1, "launching item %d %s %s\n",
3161                                           next_work_item->dumpId,
3162                                           next_work_item->desc, next_work_item->tag);
3163
3164                                 next_work_item->restored = true;
3165
3166                                 /* this memory is dealloced in mark_work_done() */
3167                                 args = malloc(sizeof(RestoreArgs));
3168                                 args->AH = CloneArchive(AH);
3169                                 args->te = next_work_item;
3170
3171                                 /* run the step in a worker child */
3172                                 child = spawn_restore(args);
3173
3174                                 slots[next_slot].child_id = child;
3175                                 slots[next_slot].args = args;
3176
3177                                 continue;
3178                         }
3179                 }
3180
3181                 /*
3182                  * If we get here there must be work being done.  Either there is no
3183                  * work available to schedule (and work_in_progress returned true) or
3184                  * there are no slots available.  So we wait for a worker to finish,
3185                  * and process the result.
3186                  */
3187                 ret_child = reap_child(slots, n_slots, &work_status);
3188
3189                 if (WIFEXITED(work_status))
3190                 {
3191                         mark_work_done(AH, ret_child, WEXITSTATUS(work_status),
3192                                                    slots, n_slots);
3193                 }
3194                 else
3195                 {
3196                         die_horribly(AH, modulename, "worker process crashed: status %d\n",
3197                                                  work_status);
3198                 }
3199         }
3200
3201         ahlog(AH,1,"finished main parallel loop\n");
3202
3203         /*
3204          * Now reconnect the single parent connection.
3205          */
3206         ConnectDatabase((Archive *) AH, ropt->dbname,
3207                                         ropt->pghost, ropt->pgport, ropt->username,
3208                                         ropt->requirePassword);
3209
3210         _doSetFixedOutputState(AH);
3211
3212         /*
3213          * Make sure there is no non-ACL work left due to, say,
3214          * circular dependencies, or some other pathological condition.
3215          * If so, do it in the single parent connection.
3216          */
3217         for (te = AH->toc->next; te != AH->toc; te = te->next)
3218         {
3219                 if (!te->restored)
3220                 {
3221                         ahlog(AH, 1, "processing missed item %d %s %s\n",
3222                                   te->dumpId, te->desc, te->tag);
3223                         (void) restore_toc_entry(AH, te, ropt, false);
3224                 }
3225         }
3226
3227         /* The ACLs will be handled back in RestoreArchive. */
3228 }
3229
3230 /*
3231  * create a worker child to perform a restore step in parallel
3232  */
3233 static thandle
3234 spawn_restore(RestoreArgs *args)
3235 {
3236         thandle child;
3237
3238         /* Ensure stdio state is quiesced before forking */
3239         fflush(NULL);
3240
3241 #ifndef WIN32
3242         child = fork();
3243         if (child == 0)
3244         {
3245                 /* in child process */
3246                 parallel_restore(args);
3247                 die_horribly(args->AH, modulename,
3248                                          "parallel_restore should not return\n");
3249         }
3250         else if (child < 0)
3251         {
3252                 /* fork failed */
3253                 die_horribly(args->AH, modulename,
3254                                          "could not create worker process: %s\n",
3255                                          strerror(errno));
3256         }
3257 #else
3258         child = (HANDLE) _beginthreadex(NULL, 0, (void *) parallel_restore,
3259                                                                         args, 0, NULL);
3260         if (child == 0)
3261                 die_horribly(args->AH, modulename,
3262                                          "could not create worker thread: %s\n",
3263                                          strerror(errno));
3264 #endif
3265
3266         return child;
3267 }
3268
3269 /*
3270  *  collect status from a completed worker child
3271  */
3272 static thandle
3273 reap_child(ParallelSlot *slots, int n_slots, int *work_status)
3274 {
3275 #ifndef WIN32
3276         /* Unix is so much easier ... */
3277         return wait(work_status);
3278 #else
3279         static HANDLE *handles = NULL;
3280         int hindex, snum, tnum;
3281         thandle ret_child;
3282         DWORD res;
3283
3284         /* first time around only, make space for handles to listen on */
3285         if (handles == NULL)
3286                 handles = (HANDLE *) calloc(sizeof(HANDLE),n_slots);
3287
3288         /* set up list of handles to listen to */
3289         for (snum=0, tnum=0; snum < n_slots; snum++)
3290                 if (slots[snum].child_id != 0)
3291                         handles[tnum++] = slots[snum].child_id;
3292
3293         /* wait for one to finish */
3294         hindex = WaitForMultipleObjects(tnum, handles, false, INFINITE);
3295
3296         /* get handle of finished thread */
3297         ret_child = handles[hindex - WAIT_OBJECT_0];
3298
3299         /* get the result */
3300         GetExitCodeThread(ret_child,&res);
3301         *work_status = res;
3302
3303         /* dispose of handle to stop leaks */
3304         CloseHandle(ret_child);
3305
3306         return ret_child;
3307 #endif
3308 }
3309
3310 /*
3311  * are we doing anything now?
3312  */
3313 static bool
3314 work_in_progress(ParallelSlot *slots, int n_slots)
3315 {
3316         int i;
3317
3318         for (i = 0; i < n_slots; i++)
3319         {
3320                 if (slots[i].child_id != 0)
3321                         return true;
3322         }
3323         return false;
3324 }
3325
3326 /*
3327  * find the first free parallel slot (if any).
3328  */
3329 static int
3330 get_next_slot(ParallelSlot *slots, int n_slots)
3331 {
3332         int i;
3333
3334         for (i = 0; i < n_slots; i++)
3335         {
3336                 if (slots[i].child_id == 0)
3337                         return i;
3338         }
3339         return NO_SLOT;
3340 }
3341
3342 /*
3343  * Find the next work item (if any) that is capable of being run now.
3344  *
3345  * To qualify, the item must have no remaining dependencies
3346  * and no requirement for locks that is incompatible with
3347  * items currently running.
3348  *
3349  * first_unprocessed is state data that tracks the location of the first
3350  * TocEntry that's not marked 'restored'.  This avoids O(N^2) search time
3351  * with long TOC lists.  (Even though the constant is pretty small, it'd
3352  * get us eventually.)
3353  *
3354  * pref_non_data is for an alternative selection algorithm that gives
3355  * preference to non-data items if there is already a data load running.
3356  * It is currently disabled.
3357  */
3358 static TocEntry *
3359 get_next_work_item(ArchiveHandle *AH, TocEntry **first_unprocessed,
3360                                    ParallelSlot *slots, int n_slots)
3361 {
3362         bool      pref_non_data = false; /* or get from AH->ropt */
3363         TocEntry *data_te = NULL;
3364         TocEntry *te;
3365         int       i,j,k;
3366
3367         /*
3368          * Bogus heuristics for pref_non_data
3369          */
3370         if (pref_non_data)
3371         {
3372                 int count = 0;
3373
3374                 for (k=0; k < n_slots; k++)
3375                         if (slots[k].args->te != NULL &&
3376                                 slots[k].args->te->section == SECTION_DATA)
3377                                 count++;
3378                 if (n_slots == 0 || count * 4 < n_slots)
3379                         pref_non_data = false;
3380         }
3381
3382         /*
3383          * Advance first_unprocessed if possible.
3384          */
3385         for (te = *first_unprocessed; te != AH->toc; te = te->next)
3386         {
3387                 if (!te->restored)
3388                         break;
3389         }
3390         *first_unprocessed = te;
3391
3392         /*
3393          * Search from first_unprocessed until we find an available item.
3394          */
3395         for (; te != AH->toc; te = te->next)
3396         {
3397                 bool conflicts = false;
3398
3399                 /* Ignore if already done or still waiting on dependencies */
3400                 if (te->restored || te->depCount > 0)
3401                         continue;
3402
3403                 /*
3404                  * Check to see if the item would need exclusive lock on something
3405                  * that a currently running item also needs lock on.  If so, we
3406                  * don't want to schedule them together.
3407                  */
3408                 for (i = 0; i < n_slots && !conflicts; i++)
3409                 {
3410                         TocEntry *running_te;
3411
3412                         if (slots[i].args == NULL)
3413                                 continue;
3414                         running_te = slots[i].args->te;
3415                         for (j = 0; j < te->nLockDeps && !conflicts; j++)
3416                         {
3417                                 for (k = 0; k < running_te->nLockDeps; k++)
3418                                 {
3419                                         if (te->lockDeps[j] == running_te->lockDeps[k])
3420                                         {
3421                                                 conflicts = true;
3422                                                 break;
3423                                         }
3424                                 }
3425                         }
3426                 }
3427
3428                 if (conflicts)
3429                         continue;
3430
3431                 if (pref_non_data && te->section == SECTION_DATA)
3432                 {
3433                         if (data_te == NULL)
3434                                 data_te = te;
3435                         continue;
3436                 }
3437
3438                 /* passed all tests, so this item can run */
3439                 return te;
3440         }
3441
3442         if (data_te != NULL)
3443                 return data_te;
3444
3445         ahlog(AH,2,"no item ready\n");
3446         return NULL;
3447 }
3448
3449
3450 /*
3451  * Restore a single TOC item in parallel with others
3452  *
3453  * this is the procedure run as a thread (Windows) or a
3454  * separate process (everything else).
3455  */
3456 static parallel_restore_result
3457 parallel_restore(RestoreArgs *args)
3458 {
3459         ArchiveHandle *AH = args->AH;
3460         TocEntry *te = args->te;
3461         RestoreOptions *ropt = AH->ropt;
3462         int retval;
3463
3464         /*
3465          * Close and reopen the input file so we have a private file pointer
3466          * that doesn't stomp on anyone else's file pointer.
3467          *
3468          * Note: on Windows, since we are using threads not processes, this
3469          * *doesn't* close the original file pointer but just open a new one.
3470          */
3471         (AH->ReopenPtr) (AH);
3472
3473         /*
3474          * We need our own database connection, too
3475          */
3476         ConnectDatabase((Archive *) AH, ropt->dbname,
3477                                         ropt->pghost, ropt->pgport, ropt->username,
3478                                         ropt->requirePassword);
3479
3480         _doSetFixedOutputState(AH);
3481
3482         /* Restore the TOC item */
3483         retval = restore_toc_entry(AH, te, ropt, true);
3484
3485         /* And clean up */
3486         PQfinish(AH->connection);
3487         AH->connection = NULL;
3488
3489         (AH->ClosePtr) (AH);
3490
3491         if (retval == 0 && AH->public.n_errors)
3492                 retval = WORKER_IGNORED_ERRORS;
3493
3494 #ifndef WIN32
3495         exit(retval);
3496 #else
3497         return retval;
3498 #endif
3499 }
3500
3501
3502 /*
3503  * Housekeeping to be done after a step has been parallel restored.
3504  *
3505  * Clear the appropriate slot, free all the extra memory we allocated,
3506  * update status, and reduce the dependency count of any dependent items.
3507  */
3508 static void
3509 mark_work_done(ArchiveHandle *AH, thandle worker, int status,
3510                            ParallelSlot *slots, int n_slots)
3511 {
3512         TocEntry *te = NULL;
3513         int i;
3514
3515         for (i = 0; i < n_slots; i++)
3516         {
3517                 if (slots[i].child_id == worker)
3518                 {
3519                         slots[i].child_id = 0;
3520                         te = slots[i].args->te;
3521                         DeCloneArchive(slots[i].args->AH);
3522                         free(slots[i].args);
3523                         slots[i].args = NULL;
3524
3525                         break;
3526                 }
3527         }
3528
3529         if (te == NULL)
3530                 die_horribly(AH, modulename, "failed to find slot for finished worker\n");
3531
3532         ahlog(AH, 1, "finished item %d %s %s\n",
3533                   te->dumpId, te->desc, te->tag);
3534
3535         if (status == WORKER_CREATE_DONE)
3536                 mark_create_done(AH, te);
3537         else if (status == WORKER_INHIBIT_DATA)
3538         {
3539                 inhibit_data_for_failed_table(AH, te);
3540                 AH->public.n_errors++;
3541         }
3542         else if (status == WORKER_IGNORED_ERRORS)
3543                 AH->public.n_errors++;
3544         else if (status != 0)
3545                 die_horribly(AH, modulename, "worker process failed: exit code %d\n",
3546                                          status);
3547
3548         reduce_dependencies(AH, te);
3549 }
3550
3551
3552 /*
3553  * Process the dependency information into a form useful for parallel restore.
3554  *
3555  * We set up depCount fields that are the number of as-yet-unprocessed
3556  * dependencies for each TOC entry.
3557  *
3558  * We also identify locking dependencies so that we can avoid trying to
3559  * schedule conflicting items at the same time.
3560  */
3561 static void
3562 fix_dependencies(ArchiveHandle *AH)
3563 {
3564         TocEntry  **tocsByDumpId;
3565         TocEntry   *te;
3566         int i;
3567
3568         /*
3569          * For some of the steps here, it is convenient to have an array that
3570          * indexes the TOC entries by dump ID, rather than searching the TOC
3571          * list repeatedly.  Entries for dump IDs not present in the TOC will
3572          * be NULL.
3573          *
3574          * Also, initialize the depCount fields.
3575          */
3576         tocsByDumpId = (TocEntry **) calloc(AH->maxDumpId, sizeof(TocEntry *));
3577         for (te = AH->toc->next; te != AH->toc; te = te->next)
3578         {
3579                 tocsByDumpId[te->dumpId - 1] = te;
3580                 te->depCount = te->nDeps;
3581         }
3582
3583         /*
3584          * POST_DATA items that are shown as depending on a table need to be
3585          * re-pointed to depend on that table's data, instead.  This ensures they
3586          * won't get scheduled until the data has been loaded.  We handle this by
3587          * first finding TABLE/TABLE DATA pairs and then scanning all the
3588          * dependencies.
3589          *
3590          * Note: currently, a TABLE DATA should always have exactly one
3591          * dependency, on its TABLE item.  So we don't bother to search,
3592          * but look just at the first dependency.  We do trouble to make sure
3593          * that it's a TABLE, if possible.  However, if the dependency isn't
3594          * in the archive then just assume it was a TABLE; this is to cover
3595          * cases where the table was suppressed but we have the data and some
3596          * dependent post-data items.
3597          */
3598         for (te = AH->toc->next; te != AH->toc; te = te->next)
3599         {
3600                 if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
3601                 {
3602                         DumpId  tableId = te->dependencies[0];
3603
3604                         if (tocsByDumpId[tableId - 1] == NULL ||
3605                                 strcmp(tocsByDumpId[tableId - 1]->desc, "TABLE") == 0)
3606                         {
3607                                 repoint_table_dependencies(AH, tableId, te->dumpId);
3608                         }
3609                 }
3610         }
3611
3612         /*
3613          * Pre-8.4 versions of pg_dump neglected to set up a dependency from
3614          * BLOB COMMENTS to BLOBS.  Cope.  (We assume there's only one BLOBS
3615          * and only one BLOB COMMENTS in such files.)
3616          */
3617         if (AH->version < K_VERS_1_11)
3618         {
3619                 for (te = AH->toc->next; te != AH->toc; te = te->next)
3620                 {
3621                         if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
3622                         {
3623                                 TocEntry   *te2;
3624
3625                                 for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
3626                                 {
3627                                         if (strcmp(te2->desc, "BLOBS") == 0)
3628                                         {
3629                                                 te->dependencies = (DumpId *) malloc(sizeof(DumpId));
3630                                                 te->dependencies[0] = te2->dumpId;
3631                                                 te->nDeps++;
3632                                                 te->depCount++;
3633                                                 break;
3634                                         }
3635                                 }
3636                                 break;
3637                         }
3638                 }
3639         }
3640
3641         /*
3642          * It is possible that the dependencies list items that are not in the
3643          * archive at all.  Subtract such items from the depCounts.
3644          */
3645         for (te = AH->toc->next; te != AH->toc; te = te->next)
3646         {
3647                 for (i = 0; i < te->nDeps; i++)
3648                 {
3649                         if (tocsByDumpId[te->dependencies[i] - 1] == NULL)
3650                                 te->depCount--;
3651                 }
3652         }
3653
3654         /*
3655          * Lastly, work out the locking dependencies.
3656          */
3657         for (te = AH->toc->next; te != AH->toc; te = te->next)
3658         {
3659                 te->lockDeps = NULL;
3660                 te->nLockDeps = 0;
3661                 identify_locking_dependencies(te, tocsByDumpId);
3662         }
3663
3664         free(tocsByDumpId);
3665 }
3666
3667 /*
3668  * Change dependencies on tableId to depend on tableDataId instead,
3669  * but only in POST_DATA items.
3670  */
3671 static void
3672 repoint_table_dependencies(ArchiveHandle *AH,
3673                                                    DumpId tableId, DumpId tableDataId)
3674 {
3675         TocEntry   *te;
3676         int i;
3677
3678         for (te = AH->toc->next; te != AH->toc; te = te->next)
3679         {
3680                 if (te->section != SECTION_POST_DATA)
3681                         continue;
3682                 for (i = 0; i < te->nDeps; i++)
3683                 {
3684                         if (te->dependencies[i] == tableId)
3685                         {
3686                                 te->dependencies[i] = tableDataId;
3687                                 ahlog(AH, 2, "transferring dependency %d -> %d to %d\n",
3688                                           te->dumpId, tableId, tableDataId);
3689                         }
3690                 }
3691         }
3692 }
3693
3694 /*
3695  * Identify which objects we'll need exclusive lock on in order to restore
3696  * the given TOC entry (*other* than the one identified by the TOC entry
3697  * itself).  Record their dump IDs in the entry's lockDeps[] array.
3698  * tocsByDumpId[] is a convenience array to avoid searching the TOC
3699  * for each dependency.
3700  */
3701 static void
3702 identify_locking_dependencies(TocEntry *te, TocEntry **tocsByDumpId)
3703 {
3704         DumpId     *lockids;
3705         int                     nlockids;
3706         int                     i;
3707
3708         /* Quick exit if no dependencies at all */
3709         if (te->nDeps == 0)
3710                 return;
3711
3712         /* Exit if this entry doesn't need exclusive lock on other objects */
3713         if (!(strcmp(te->desc, "CONSTRAINT") == 0 ||
3714                   strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
3715                   strcmp(te->desc, "FK CONSTRAINT") == 0 ||
3716                   strcmp(te->desc, "RULE") == 0 ||
3717                   strcmp(te->desc, "TRIGGER") == 0))
3718                 return;
3719
3720         /*
3721          * We assume the item requires exclusive lock on each TABLE item
3722          * listed among its dependencies.
3723          */
3724         lockids = (DumpId *) malloc(te->nDeps * sizeof(DumpId));
3725         nlockids = 0;
3726         for (i = 0; i < te->nDeps; i++)
3727         {
3728                 DumpId  depid = te->dependencies[i];
3729
3730                 if (tocsByDumpId[depid - 1] &&
3731                         strcmp(tocsByDumpId[depid - 1]->desc, "TABLE") == 0)
3732                         lockids[nlockids++] = depid;
3733         }
3734
3735         if (nlockids == 0)
3736         {
3737                 free(lockids);
3738                 return;
3739         }
3740
3741         te->lockDeps = realloc(lockids, nlockids * sizeof(DumpId));
3742         te->nLockDeps = nlockids;
3743 }
3744
3745 /*
3746  * Remove the specified TOC entry from the depCounts of items that depend on
3747  * it, thereby possibly making them ready-to-run.
3748  */
3749 static void
3750 reduce_dependencies(ArchiveHandle *AH, TocEntry *te)
3751 {
3752         DumpId target = te->dumpId;
3753         int i;
3754
3755         ahlog(AH,2,"reducing dependencies for %d\n",target);
3756
3757         /*
3758          * We must examine all entries, not only the ones after the target item,
3759          * because if the user used a -L switch then the original dependency-
3760          * respecting order has been destroyed by SortTocFromFile.
3761          */
3762         for (te = AH->toc->next; te != AH->toc; te = te->next)
3763         {
3764                 for (i = 0; i < te->nDeps; i++)
3765                 {
3766                         if (te->dependencies[i] == target)
3767                                 te->depCount--;
3768                 }
3769         }
3770 }
3771
3772 /*
3773  * Set the created flag on the DATA member corresponding to the given
3774  * TABLE member
3775  */
3776 static void
3777 mark_create_done(ArchiveHandle *AH, TocEntry *te)
3778 {
3779         TocEntry   *tes;
3780
3781         for (tes = AH->toc->next; tes != AH->toc; tes = tes->next)
3782         {
3783                 if (strcmp(tes->desc, "TABLE DATA") == 0 &&
3784                         strcmp(tes->tag, te->tag) == 0 &&
3785                         strcmp(tes->namespace ? tes->namespace : "",
3786                                    te->namespace ? te->namespace : "") == 0)
3787                 {
3788                         tes->created = true;
3789                         break;
3790                 }
3791         }
3792 }
3793
3794 /*
3795  * Mark the DATA member corresponding to the given TABLE member
3796  * as not wanted
3797  */
3798 static void
3799 inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
3800 {
3801         RestoreOptions *ropt = AH->ropt;
3802         TocEntry   *tes;
3803
3804         ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
3805                   te->tag);
3806
3807         for (tes = AH->toc->next; tes != AH->toc; tes = tes->next)
3808         {
3809                 if (strcmp(tes->desc, "TABLE DATA") == 0 &&
3810                         strcmp(tes->tag, te->tag) == 0 &&
3811                         strcmp(tes->namespace ? tes->namespace : "",
3812                                    te->namespace ? te->namespace : "") == 0)
3813                 {
3814                         /* mark it unwanted; we assume idWanted array already exists */
3815                         ropt->idWanted[tes->dumpId - 1] = false;
3816                         break;
3817                 }
3818         }
3819 }
3820
3821
3822 /*
3823  * Clone and de-clone routines used in parallel restoration.
3824  *
3825  * Enough of the structure is cloned to ensure that there is no
3826  * conflict between different threads each with their own clone.
3827  *
3828  * These could be public, but no need at present.
3829  */
3830 static ArchiveHandle *
3831 CloneArchive(ArchiveHandle *AH)
3832 {
3833         ArchiveHandle *clone;
3834
3835         /* Make a "flat" copy */
3836         clone = (ArchiveHandle *) malloc(sizeof(ArchiveHandle)); 
3837         if (clone == NULL)
3838                 die_horribly(AH, modulename, "out of memory\n");
3839         memcpy(clone, AH, sizeof(ArchiveHandle));
3840
3841         /* Handle format-independent fields */
3842         clone->pgCopyBuf = createPQExpBuffer();
3843         clone->sqlBuf = createPQExpBuffer();
3844         clone->sqlparse.tagBuf = NULL;
3845
3846         /* The clone will have its own connection, so disregard connection state */
3847         clone->connection = NULL;
3848         clone->currUser = NULL;
3849         clone->currSchema = NULL;
3850         clone->currTablespace = NULL;
3851         clone->currWithOids = -1;
3852
3853         /* savedPassword must be local in case we change it while connecting */
3854         if (clone->savedPassword)
3855                 clone->savedPassword = strdup(clone->savedPassword);
3856
3857         /* clone has its own error count, too */
3858         clone->public.n_errors = 0;
3859
3860         /* Let the format-specific code have a chance too */
3861         (clone->ClonePtr) (clone);
3862
3863         return clone;
3864 }
3865
3866 /*
3867  * Release clone-local storage.
3868  *
3869  * Note: we assume any clone-local connection was already closed.
3870  */
3871 static void
3872 DeCloneArchive(ArchiveHandle *AH)
3873 {
3874         /* Clear format-specific state */
3875         (AH->DeClonePtr) (AH);
3876
3877         /* Clear state allocated by CloneArchive */
3878         destroyPQExpBuffer(AH->pgCopyBuf);
3879         destroyPQExpBuffer(AH->sqlBuf);
3880         if (AH->sqlparse.tagBuf)
3881                 destroyPQExpBuffer(AH->sqlparse.tagBuf);
3882
3883         /* Clear any connection-local state */
3884         if (AH->currUser)
3885                 free(AH->currUser);
3886         if (AH->currSchema)
3887                 free(AH->currSchema);
3888         if (AH->currTablespace)
3889                 free(AH->currTablespace);
3890         if (AH->savedPassword)
3891                 free(AH->savedPassword);
3892
3893         free(AH);
3894 }