]> granicus.if.org Git - postgresql/blob - src/bin/pg_dump/pg_backup_archiver.c
Create an ALTER DEFAULT PRIVILEGES command, which allows users to adjust
[postgresql] / src / bin / pg_dump / pg_backup_archiver.c
1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_archiver.c
4  *
5  *      Private implementation of the archiver routines.
6  *
7  *      See the headers to pg_restore for more details.
8  *
9  * Copyright (c) 2000, Philip Warner
10  *      Rights are granted to use this software in any way so long
11  *      as this notice is not removed.
12  *
13  *      The author is not responsible for loss or damages that may
14  *      result from its use.
15  *
16  *
17  * IDENTIFICATION
18  *              $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.176 2009/10/05 19:24:45 tgl Exp $
19  *
20  *-------------------------------------------------------------------------
21  */
22
23 #include "pg_backup_db.h"
24 #include "dumputils.h"
25
26 #include <ctype.h>
27 #include <unistd.h>
28 #include <sys/types.h>
29 #include <sys/wait.h>
30
31 #ifdef WIN32
32 #include <io.h>
33 #endif
34
35 #include "libpq/libpq-fs.h"
36
37 /*
38  * Special exit values from worker children.  We reserve 0 for normal
39  * success; 1 and other small values should be interpreted as crashes.
40  */
41 #define WORKER_CREATE_DONE              10
42 #define WORKER_INHIBIT_DATA             11
43 #define WORKER_IGNORED_ERRORS   12
44
45 /*
46  * Unix uses exit to return result from worker child, so function is void.
47  * Windows thread result comes via function return.
48  */
49 #ifndef WIN32
50 #define parallel_restore_result void
51 #else
52 #define parallel_restore_result DWORD
53 #endif
54
55 /* IDs for worker children are either PIDs or thread handles */
56 #ifndef WIN32
57 #define thandle pid_t
58 #else
59 #define thandle HANDLE
60 #endif
61
62 /* Arguments needed for a worker child */
63 typedef struct _restore_args
64 {
65         ArchiveHandle *AH;
66         TocEntry   *te;
67 } RestoreArgs;
68
69 /* State for each parallel activity slot */
70 typedef struct _parallel_slot
71 {
72         thandle         child_id;
73         RestoreArgs *args;
74 } ParallelSlot;
75
76 #define NO_SLOT (-1)
77
78 const char *progname;
79
80 static const char *modulename = gettext_noop("archiver");
81
82
83 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
84                  const int compression, ArchiveMode mode);
85 static void _getObjectDescription(PQExpBuffer buf, TocEntry *te,
86                                           ArchiveHandle *AH);
87 static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isData, bool acl_pass);
88
89
90 static void _doSetFixedOutputState(ArchiveHandle *AH);
91 static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
92 static void _doSetWithOids(ArchiveHandle *AH, const bool withOids);
93 static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
94 static void _becomeUser(ArchiveHandle *AH, const char *user);
95 static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
96 static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
97 static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
98 static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
99 static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
100 static teReqs _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls);
101 static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
102 static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
103 static TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id);
104 static void _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
105 static int      _discoverArchiveFormat(ArchiveHandle *AH);
106
107 static void dump_lo_buf(ArchiveHandle *AH);
108 static void _write_msg(const char *modulename, const char *fmt, va_list ap);
109 static void _die_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt, va_list ap);
110
111 static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
112 static OutputContext SetOutput(ArchiveHandle *AH, char *filename, int compression);
113 static void ResetOutput(ArchiveHandle *AH, OutputContext savedContext);
114
115 static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
116                                   RestoreOptions *ropt, bool is_parallel);
117 static void restore_toc_entries_parallel(ArchiveHandle *AH);
118 static thandle spawn_restore(RestoreArgs *args);
119 static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status);
120 static bool work_in_progress(ParallelSlot *slots, int n_slots);
121 static int      get_next_slot(ParallelSlot *slots, int n_slots);
122 static void par_list_header_init(TocEntry *l);
123 static void par_list_append(TocEntry *l, TocEntry *te);
124 static void par_list_remove(TocEntry *te);
125 static TocEntry *get_next_work_item(ArchiveHandle *AH,
126                                    TocEntry *ready_list,
127                                    ParallelSlot *slots, int n_slots);
128 static parallel_restore_result parallel_restore(RestoreArgs *args);
129 static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
130                            thandle worker, int status,
131                            ParallelSlot *slots, int n_slots);
132 static void fix_dependencies(ArchiveHandle *AH);
133 static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
134 static void repoint_table_dependencies(ArchiveHandle *AH,
135                                                    DumpId tableId, DumpId tableDataId);
136 static void identify_locking_dependencies(TocEntry *te,
137                                                           TocEntry **tocsByDumpId);
138 static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
139                                                                 TocEntry *ready_list);
140 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
141 static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
142 static ArchiveHandle *CloneArchive(ArchiveHandle *AH);
143 static void DeCloneArchive(ArchiveHandle *AH);
144
145
146 /*
147  *      Wrapper functions.
148  *
149  *      The objective it to make writing new formats and dumpers as simple
150  *      as possible, if necessary at the expense of extra function calls etc.
151  *
152  */
153
154
155 /* Create a new archive */
156 /* Public */
157 Archive *
158 CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
159                           const int compression, ArchiveMode mode)
160
161 {
162         ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, mode);
163
164         return (Archive *) AH;
165 }
166
167 /* Open an existing archive */
168 /* Public */
169 Archive *
170 OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
171 {
172         ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, archModeRead);
173
174         return (Archive *) AH;
175 }
176
177 /* Public */
178 void
179 CloseArchive(Archive *AHX)
180 {
181         int                     res = 0;
182         ArchiveHandle *AH = (ArchiveHandle *) AHX;
183
184         (*AH->ClosePtr) (AH);
185
186         /* Close the output */
187         if (AH->gzOut)
188                 res = GZCLOSE(AH->OF);
189         else if (AH->OF != stdout)
190                 res = fclose(AH->OF);
191
192         if (res != 0)
193                 die_horribly(AH, modulename, "could not close output file: %s\n",
194                                          strerror(errno));
195 }
196
197 /* Public */
198 void
199 RestoreArchive(Archive *AHX, RestoreOptions *ropt)
200 {
201         ArchiveHandle *AH = (ArchiveHandle *) AHX;
202         TocEntry   *te;
203         teReqs          reqs;
204         OutputContext sav;
205
206         AH->ropt = ropt;
207         AH->stage = STAGE_INITIALIZING;
208
209         /*
210          * Check for nonsensical option combinations.
211          *
212          * NB: create+dropSchema is useless because if you're creating the DB,
213          * there's no need to drop individual items in it.  Moreover, if we tried
214          * to do that then we'd issue the drops in the database initially
215          * connected to, not the one we will create, which is very bad...
216          */
217         if (ropt->create && ropt->dropSchema)
218                 die_horribly(AH, modulename, "-C and -c are incompatible options\n");
219
220         /*
221          * -1 is not compatible with -C, because we can't create a database inside
222          * a transaction block.
223          */
224         if (ropt->create && ropt->single_txn)
225                 die_horribly(AH, modulename, "-C and -1 are incompatible options\n");
226
227         /*
228          * Make sure we won't need (de)compression we haven't got
229          */
230 #ifndef HAVE_LIBZ
231         if (AH->compression != 0 && AH->PrintTocDataPtr !=NULL)
232         {
233                 for (te = AH->toc->next; te != AH->toc; te = te->next)
234                 {
235                         reqs = _tocEntryRequired(te, ropt, false);
236                         if (te->hadDumper && (reqs & REQ_DATA) != 0)
237                                 die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
238                 }
239         }
240 #endif
241
242         /*
243          * If we're using a DB connection, then connect it.
244          */
245         if (ropt->useDB)
246         {
247                 ahlog(AH, 1, "connecting to database for restore\n");
248                 if (AH->version < K_VERS_1_3)
249                         die_horribly(AH, modulename, "direct database connections are not supported in pre-1.3 archives\n");
250
251                 /* XXX Should get this from the archive */
252                 AHX->minRemoteVersion = 070100;
253                 AHX->maxRemoteVersion = 999999;
254
255                 ConnectDatabase(AHX, ropt->dbname,
256                                                 ropt->pghost, ropt->pgport, ropt->username,
257                                                 ropt->promptPassword);
258
259                 /*
260                  * If we're talking to the DB directly, don't send comments since they
261                  * obscure SQL when displaying errors
262                  */
263                 AH->noTocComments = 1;
264         }
265
266         /*
267          * Work out if we have an implied data-only restore. This can happen if
268          * the dump was data only or if the user has used a toc list to exclude
269          * all of the schema data. All we do is look for schema entries - if none
270          * are found then we set the dataOnly flag.
271          *
272          * We could scan for wanted TABLE entries, but that is not the same as
273          * dataOnly. At this stage, it seems unnecessary (6-Mar-2001).
274          */
275         if (!ropt->dataOnly)
276         {
277                 int                     impliedDataOnly = 1;
278
279                 for (te = AH->toc->next; te != AH->toc; te = te->next)
280                 {
281                         reqs = _tocEntryRequired(te, ropt, true);
282                         if ((reqs & REQ_SCHEMA) != 0)
283                         {                                       /* It's schema, and it's wanted */
284                                 impliedDataOnly = 0;
285                                 break;
286                         }
287                 }
288                 if (impliedDataOnly)
289                 {
290                         ropt->dataOnly = impliedDataOnly;
291                         ahlog(AH, 1, "implied data-only restore\n");
292                 }
293         }
294
295         /*
296          * Setup the output file if necessary.
297          */
298         if (ropt->filename || ropt->compression)
299                 sav = SetOutput(AH, ropt->filename, ropt->compression);
300
301         ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
302
303         if (AH->public.verbose)
304                 dumpTimestamp(AH, "Started on", AH->createDate);
305
306         if (ropt->single_txn)
307         {
308                 if (AH->connection)
309                         StartTransaction(AH);
310                 else
311                         ahprintf(AH, "BEGIN;\n\n");
312         }
313
314         /*
315          * Establish important parameter values right away.
316          */
317         _doSetFixedOutputState(AH);
318
319         AH->stage = STAGE_PROCESSING;
320
321         /*
322          * Drop the items at the start, in reverse order
323          */
324         if (ropt->dropSchema)
325         {
326                 for (te = AH->toc->prev; te != AH->toc; te = te->prev)
327                 {
328                         AH->currentTE = te;
329
330                         reqs = _tocEntryRequired(te, ropt, false /* needn't drop ACLs */ );
331                         if (((reqs & REQ_SCHEMA) != 0) && te->dropStmt)
332                         {
333                                 /* We want the schema */
334                                 ahlog(AH, 1, "dropping %s %s\n", te->desc, te->tag);
335                                 /* Select owner and schema as necessary */
336                                 _becomeOwner(AH, te);
337                                 _selectOutputSchema(AH, te->namespace);
338                                 /* Drop it */
339                                 ahprintf(AH, "%s", te->dropStmt);
340                         }
341                 }
342
343                 /*
344                  * _selectOutputSchema may have set currSchema to reflect the effect
345                  * of a "SET search_path" command it emitted.  However, by now we may
346                  * have dropped that schema; or it might not have existed in the first
347                  * place.  In either case the effective value of search_path will not
348                  * be what we think.  Forcibly reset currSchema so that we will
349                  * re-establish the search_path setting when needed (after creating
350                  * the schema).
351                  *
352                  * If we treated users as pg_dump'able objects then we'd need to reset
353                  * currUser here too.
354                  */
355                 if (AH->currSchema)
356                         free(AH->currSchema);
357                 AH->currSchema = NULL;
358         }
359
360         /*
361          * In serial mode, we now process each non-ACL TOC entry.
362          *
363          * In parallel mode, turn control over to the parallel-restore logic.
364          */
365         if (ropt->number_of_jobs > 1 && ropt->useDB)
366                 restore_toc_entries_parallel(AH);
367         else
368         {
369                 for (te = AH->toc->next; te != AH->toc; te = te->next)
370                         (void) restore_toc_entry(AH, te, ropt, false);
371         }
372
373         /*
374          * Scan TOC again to output ownership commands and ACLs
375          */
376         for (te = AH->toc->next; te != AH->toc; te = te->next)
377         {
378                 AH->currentTE = te;
379
380                 /* Work out what, if anything, we want from this entry */
381                 reqs = _tocEntryRequired(te, ropt, true);
382
383                 if ((reqs & REQ_SCHEMA) != 0)   /* We want the schema */
384                 {
385                         ahlog(AH, 1, "setting owner and privileges for %s %s\n",
386                                   te->desc, te->tag);
387                         _printTocEntry(AH, te, ropt, false, true);
388                 }
389         }
390
391         if (ropt->single_txn)
392         {
393                 if (AH->connection)
394                         CommitTransaction(AH);
395                 else
396                         ahprintf(AH, "COMMIT;\n\n");
397         }
398
399         if (AH->public.verbose)
400                 dumpTimestamp(AH, "Completed on", time(NULL));
401
402         ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
403
404         /*
405          * Clean up & we're done.
406          */
407         AH->stage = STAGE_FINALIZING;
408
409         if (ropt->filename || ropt->compression)
410                 ResetOutput(AH, sav);
411
412         if (ropt->useDB)
413         {
414                 PQfinish(AH->connection);
415                 AH->connection = NULL;
416         }
417 }
418
419 /*
420  * Restore a single TOC item.  Used in both parallel and non-parallel restore;
421  * is_parallel is true if we are in a worker child process.
422  *
423  * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
424  * the parallel parent has to make the corresponding status update.
425  */
426 static int
427 restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
428                                   RestoreOptions *ropt, bool is_parallel)
429 {
430         int                     retval = 0;
431         teReqs          reqs;
432         bool            defnDumped;
433
434         AH->currentTE = te;
435
436         /* Work out what, if anything, we want from this entry */
437         reqs = _tocEntryRequired(te, ropt, false);
438
439         /* Dump any relevant dump warnings to stderr */
440         if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
441         {
442                 if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
443                         write_msg(modulename, "warning from original dump file: %s\n", te->defn);
444                 else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
445                         write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
446         }
447
448         defnDumped = false;
449
450         if ((reqs & REQ_SCHEMA) != 0)           /* We want the schema */
451         {
452                 ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);
453
454                 _printTocEntry(AH, te, ropt, false, false);
455                 defnDumped = true;
456
457                 if (strcmp(te->desc, "TABLE") == 0)
458                 {
459                         if (AH->lastErrorTE == te)
460                         {
461                                 /*
462                                  * We failed to create the table. If
463                                  * --no-data-for-failed-tables was given, mark the
464                                  * corresponding TABLE DATA to be ignored.
465                                  *
466                                  * In the parallel case this must be done in the parent, so we
467                                  * just set the return value.
468                                  */
469                                 if (ropt->noDataForFailedTables)
470                                 {
471                                         if (is_parallel)
472                                                 retval = WORKER_INHIBIT_DATA;
473                                         else
474                                                 inhibit_data_for_failed_table(AH, te);
475                                 }
476                         }
477                         else
478                         {
479                                 /*
480                                  * We created the table successfully.  Mark the corresponding
481                                  * TABLE DATA for possible truncation.
482                                  *
483                                  * In the parallel case this must be done in the parent, so we
484                                  * just set the return value.
485                                  */
486                                 if (is_parallel)
487                                         retval = WORKER_CREATE_DONE;
488                                 else
489                                         mark_create_done(AH, te);
490                         }
491                 }
492
493                 /* If we created a DB, connect to it... */
494                 if (strcmp(te->desc, "DATABASE") == 0)
495                 {
496                         ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
497                         _reconnectToDB(AH, te->tag);
498                         ropt->dbname = strdup(te->tag);
499                 }
500         }
501
502         /*
503          * If we have a data component, then process it
504          */
505         if ((reqs & REQ_DATA) != 0)
506         {
507                 /*
508                  * hadDumper will be set if there is genuine data component for this
509                  * node. Otherwise, we need to check the defn field for statements
510                  * that need to be executed in data-only restores.
511                  */
512                 if (te->hadDumper)
513                 {
514                         /*
515                          * If we can output the data, then restore it.
516                          */
517                         if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0)
518                         {
519                                 _printTocEntry(AH, te, ropt, true, false);
520
521                                 if (strcmp(te->desc, "BLOBS") == 0 ||
522                                         strcmp(te->desc, "BLOB COMMENTS") == 0)
523                                 {
524                                         ahlog(AH, 1, "restoring %s\n", te->desc);
525
526                                         _selectOutputSchema(AH, "pg_catalog");
527
528                                         (*AH->PrintTocDataPtr) (AH, te, ropt);
529                                 }
530                                 else
531                                 {
532                                         _disableTriggersIfNecessary(AH, te, ropt);
533
534                                         /* Select owner and schema as necessary */
535                                         _becomeOwner(AH, te);
536                                         _selectOutputSchema(AH, te->namespace);
537
538                                         ahlog(AH, 1, "restoring data for table \"%s\"\n",
539                                                   te->tag);
540
541                                         /*
542                                          * In parallel restore, if we created the table earlier in
543                                          * the run then we wrap the COPY in a transaction and
544                                          * precede it with a TRUNCATE.  If archiving is not on
545                                          * this prevents WAL-logging the COPY.  This obtains a
546                                          * speedup similar to that from using single_txn mode in
547                                          * non-parallel restores.
548                                          */
549                                         if (is_parallel && te->created)
550                                         {
551                                                 /*
552                                                  * Parallel restore is always talking directly to a
553                                                  * server, so no need to see if we should issue BEGIN.
554                                                  */
555                                                 StartTransaction(AH);
556
557                                                 /*
558                                                  * If the server version is >= 8.4, make sure we issue
559                                                  * TRUNCATE with ONLY so that child tables are not
560                                                  * wiped.
561                                                  */
562                                                 ahprintf(AH, "TRUNCATE TABLE %s%s;\n\n",
563                                                                  (PQserverVersion(AH->connection) >= 80400 ?
564                                                                   "ONLY " : ""),
565                                                                  fmtId(te->tag));
566                                         }
567
568                                         /*
569                                          * If we have a copy statement, use it. As of V1.3, these
570                                          * are separate to allow easy import from withing a
571                                          * database connection. Pre 1.3 archives can not use DB
572                                          * connections and are sent to output only.
573                                          *
574                                          * For V1.3+, the table data MUST have a copy statement so
575                                          * that we can go into appropriate mode with libpq.
576                                          */
577                                         if (te->copyStmt && strlen(te->copyStmt) > 0)
578                                         {
579                                                 ahprintf(AH, "%s", te->copyStmt);
580                                                 AH->writingCopyData = true;
581                                         }
582
583                                         (*AH->PrintTocDataPtr) (AH, te, ropt);
584
585                                         AH->writingCopyData = false;
586
587                                         /* close out the transaction started above */
588                                         if (is_parallel && te->created)
589                                                 CommitTransaction(AH);
590
591                                         _enableTriggersIfNecessary(AH, te, ropt);
592                                 }
593                         }
594                 }
595                 else if (!defnDumped)
596                 {
597                         /* If we haven't already dumped the defn part, do so now */
598                         ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
599                         _printTocEntry(AH, te, ropt, false, false);
600                 }
601         }
602
603         return retval;
604 }
605
606 /*
607  * Allocate a new RestoreOptions block.
608  * This is mainly so we can initialize it, but also for future expansion,
609  */
610 RestoreOptions *
611 NewRestoreOptions(void)
612 {
613         RestoreOptions *opts;
614
615         opts = (RestoreOptions *) calloc(1, sizeof(RestoreOptions));
616
617         /* set any fields that shouldn't default to zeroes */
618         opts->format = archUnknown;
619         opts->promptPassword = TRI_DEFAULT;
620
621         return opts;
622 }
623
624 static void
625 _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
626 {
627         /* This hack is only needed in a data-only restore */
628         if (!ropt->dataOnly || !ropt->disable_triggers)
629                 return;
630
631         ahlog(AH, 1, "disabling triggers for %s\n", te->tag);
632
633         /*
634          * Become superuser if possible, since they are the only ones who can
635          * disable constraint triggers.  If -S was not given, assume the initial
636          * user identity is a superuser.  (XXX would it be better to become the
637          * table owner?)
638          */
639         _becomeUser(AH, ropt->superuser);
640
641         /*
642          * Disable them.
643          */
644         _selectOutputSchema(AH, te->namespace);
645
646         ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
647                          fmtId(te->tag));
648 }
649
650 static void
651 _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
652 {
653         /* This hack is only needed in a data-only restore */
654         if (!ropt->dataOnly || !ropt->disable_triggers)
655                 return;
656
657         ahlog(AH, 1, "enabling triggers for %s\n", te->tag);
658
659         /*
660          * Become superuser if possible, since they are the only ones who can
661          * disable constraint triggers.  If -S was not given, assume the initial
662          * user identity is a superuser.  (XXX would it be better to become the
663          * table owner?)
664          */
665         _becomeUser(AH, ropt->superuser);
666
667         /*
668          * Enable them.
669          */
670         _selectOutputSchema(AH, te->namespace);
671
672         ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
673                          fmtId(te->tag));
674 }
675
676 /*
677  * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
678  */
679
680 /* Public */
681 size_t
682 WriteData(Archive *AHX, const void *data, size_t dLen)
683 {
684         ArchiveHandle *AH = (ArchiveHandle *) AHX;
685
686         if (!AH->currToc)
687                 die_horribly(AH, modulename, "internal error -- WriteData cannot be called outside the context of a DataDumper routine\n");
688
689         return (*AH->WriteDataPtr) (AH, data, dLen);
690 }
691
692 /*
693  * Create a new TOC entry. The TOC was designed as a TOC, but is now the
694  * repository for all metadata. But the name has stuck.
695  */
696
697 /* Public */
698 void
699 ArchiveEntry(Archive *AHX,
700                          CatalogId catalogId, DumpId dumpId,
701                          const char *tag,
702                          const char *namespace,
703                          const char *tablespace,
704                          const char *owner, bool withOids,
705                          const char *desc, teSection section,
706                          const char *defn,
707                          const char *dropStmt, const char *copyStmt,
708                          const DumpId *deps, int nDeps,
709                          DataDumperPtr dumpFn, void *dumpArg)
710 {
711         ArchiveHandle *AH = (ArchiveHandle *) AHX;
712         TocEntry   *newToc;
713
714         newToc = (TocEntry *) calloc(1, sizeof(TocEntry));
715         if (!newToc)
716                 die_horribly(AH, modulename, "out of memory\n");
717
718         AH->tocCount++;
719         if (dumpId > AH->maxDumpId)
720                 AH->maxDumpId = dumpId;
721
722         newToc->prev = AH->toc->prev;
723         newToc->next = AH->toc;
724         AH->toc->prev->next = newToc;
725         AH->toc->prev = newToc;
726
727         newToc->catalogId = catalogId;
728         newToc->dumpId = dumpId;
729         newToc->section = section;
730
731         newToc->tag = strdup(tag);
732         newToc->namespace = namespace ? strdup(namespace) : NULL;
733         newToc->tablespace = tablespace ? strdup(tablespace) : NULL;
734         newToc->owner = strdup(owner);
735         newToc->withOids = withOids;
736         newToc->desc = strdup(desc);
737         newToc->defn = strdup(defn);
738         newToc->dropStmt = strdup(dropStmt);
739         newToc->copyStmt = copyStmt ? strdup(copyStmt) : NULL;
740
741         if (nDeps > 0)
742         {
743                 newToc->dependencies = (DumpId *) malloc(nDeps * sizeof(DumpId));
744                 memcpy(newToc->dependencies, deps, nDeps * sizeof(DumpId));
745                 newToc->nDeps = nDeps;
746         }
747         else
748         {
749                 newToc->dependencies = NULL;
750                 newToc->nDeps = 0;
751         }
752
753         newToc->dataDumper = dumpFn;
754         newToc->dataDumperArg = dumpArg;
755         newToc->hadDumper = dumpFn ? true : false;
756
757         newToc->formatData = NULL;
758
759         if (AH->ArchiveEntryPtr !=NULL)
760                 (*AH->ArchiveEntryPtr) (AH, newToc);
761 }
762
763 /* Public */
764 void
765 PrintTOCSummary(Archive *AHX, RestoreOptions *ropt)
766 {
767         ArchiveHandle *AH = (ArchiveHandle *) AHX;
768         TocEntry   *te;
769         OutputContext sav;
770         char       *fmtName;
771
772         if (ropt->filename)
773                 sav = SetOutput(AH, ropt->filename, 0 /* no compression */ );
774
775         ahprintf(AH, ";\n; Archive created at %s", ctime(&AH->createDate));
776         ahprintf(AH, ";     dbname: %s\n;     TOC Entries: %d\n;     Compression: %d\n",
777                          AH->archdbname, AH->tocCount, AH->compression);
778
779         switch (AH->format)
780         {
781                 case archFiles:
782                         fmtName = "FILES";
783                         break;
784                 case archCustom:
785                         fmtName = "CUSTOM";
786                         break;
787                 case archTar:
788                         fmtName = "TAR";
789                         break;
790                 default:
791                         fmtName = "UNKNOWN";
792         }
793
794         ahprintf(AH, ";     Dump Version: %d.%d-%d\n", AH->vmaj, AH->vmin, AH->vrev);
795         ahprintf(AH, ";     Format: %s\n", fmtName);
796         ahprintf(AH, ";     Integer: %d bytes\n", (int) AH->intSize);
797         ahprintf(AH, ";     Offset: %d bytes\n", (int) AH->offSize);
798         if (AH->archiveRemoteVersion)
799                 ahprintf(AH, ";     Dumped from database version: %s\n",
800                                  AH->archiveRemoteVersion);
801         if (AH->archiveDumpVersion)
802                 ahprintf(AH, ";     Dumped by pg_dump version: %s\n",
803                                  AH->archiveDumpVersion);
804
805         ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
806
807         for (te = AH->toc->next; te != AH->toc; te = te->next)
808         {
809                 if (ropt->verbose || _tocEntryRequired(te, ropt, true) != 0)
810                         ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
811                                          te->catalogId.tableoid, te->catalogId.oid,
812                                          te->desc, te->namespace ? te->namespace : "-",
813                                          te->tag, te->owner);
814                 if (ropt->verbose && te->nDeps > 0)
815                 {
816                         int                     i;
817
818                         ahprintf(AH, ";\tdepends on:");
819                         for (i = 0; i < te->nDeps; i++)
820                                 ahprintf(AH, " %d", te->dependencies[i]);
821                         ahprintf(AH, "\n");
822                 }
823         }
824
825         if (ropt->filename)
826                 ResetOutput(AH, sav);
827 }
828
829 /***********
830  * BLOB Archival
831  ***********/
832
833 /* Called by a dumper to signal start of a BLOB */
834 int
835 StartBlob(Archive *AHX, Oid oid)
836 {
837         ArchiveHandle *AH = (ArchiveHandle *) AHX;
838
839         if (!AH->StartBlobPtr)
840                 die_horribly(AH, modulename, "large-object output not supported in chosen format\n");
841
842         (*AH->StartBlobPtr) (AH, AH->currToc, oid);
843
844         return 1;
845 }
846
847 /* Called by a dumper to signal end of a BLOB */
848 int
849 EndBlob(Archive *AHX, Oid oid)
850 {
851         ArchiveHandle *AH = (ArchiveHandle *) AHX;
852
853         if (AH->EndBlobPtr)
854                 (*AH->EndBlobPtr) (AH, AH->currToc, oid);
855
856         return 1;
857 }
858
859 /**********
860  * BLOB Restoration
861  **********/
862
863 /*
864  * Called by a format handler before any blobs are restored
865  */
866 void
867 StartRestoreBlobs(ArchiveHandle *AH)
868 {
869         if (!AH->ropt->single_txn)
870         {
871                 if (AH->connection)
872                         StartTransaction(AH);
873                 else
874                         ahprintf(AH, "BEGIN;\n\n");
875         }
876
877         AH->blobCount = 0;
878 }
879
880 /*
881  * Called by a format handler after all blobs are restored
882  */
883 void
884 EndRestoreBlobs(ArchiveHandle *AH)
885 {
886         if (!AH->ropt->single_txn)
887         {
888                 if (AH->connection)
889                         CommitTransaction(AH);
890                 else
891                         ahprintf(AH, "COMMIT;\n\n");
892         }
893
894         ahlog(AH, 1, ngettext("restored %d large object\n",
895                                                   "restored %d large objects\n",
896                                                   AH->blobCount),
897                   AH->blobCount);
898 }
899
900
901 /*
902  * Called by a format handler to initiate restoration of a blob
903  */
904 void
905 StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
906 {
907         Oid                     loOid;
908
909         AH->blobCount++;
910
911         /* Initialize the LO Buffer */
912         AH->lo_buf_used = 0;
913
914         ahlog(AH, 2, "restoring large object with OID %u\n", oid);
915
916         if (drop)
917                 ahprintf(AH, "SELECT CASE WHEN EXISTS(SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u') THEN pg_catalog.lo_unlink('%u') END;\n",
918                                  oid, oid);
919
920         if (AH->connection)
921         {
922                 loOid = lo_create(AH->connection, oid);
923                 if (loOid == 0 || loOid != oid)
924                         die_horribly(AH, modulename, "could not create large object %u\n",
925                                                  oid);
926
927                 AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
928                 if (AH->loFd == -1)
929                         die_horribly(AH, modulename, "could not open large object\n");
930         }
931         else
932         {
933                 ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
934                                  oid, INV_WRITE);
935         }
936
937         AH->writingBlob = 1;
938 }
939
940 void
941 EndRestoreBlob(ArchiveHandle *AH, Oid oid)
942 {
943         if (AH->lo_buf_used > 0)
944         {
945                 /* Write remaining bytes from the LO buffer */
946                 dump_lo_buf(AH);
947         }
948
949         AH->writingBlob = 0;
950
951         if (AH->connection)
952         {
953                 lo_close(AH->connection, AH->loFd);
954                 AH->loFd = -1;
955         }
956         else
957         {
958                 ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
959         }
960 }
961
962 /***********
963  * Sorting and Reordering
964  ***********/
965
966 void
967 SortTocFromFile(Archive *AHX, RestoreOptions *ropt)
968 {
969         ArchiveHandle *AH = (ArchiveHandle *) AHX;
970         FILE       *fh;
971         char            buf[1024];
972         char       *cmnt;
973         char       *endptr;
974         DumpId          id;
975         TocEntry   *te;
976         TocEntry   *tePrev;
977
978         /* Allocate space for the 'wanted' array, and init it */
979         ropt->idWanted = (bool *) malloc(sizeof(bool) * AH->maxDumpId);
980         memset(ropt->idWanted, 0, sizeof(bool) * AH->maxDumpId);
981
982         /* Set prev entry as head of list */
983         tePrev = AH->toc;
984
985         /* Setup the file */
986         fh = fopen(ropt->tocFile, PG_BINARY_R);
987         if (!fh)
988                 die_horribly(AH, modulename, "could not open TOC file \"%s\": %s\n",
989                                          ropt->tocFile, strerror(errno));
990
991         while (fgets(buf, sizeof(buf), fh) != NULL)
992         {
993                 /* Truncate line at comment, if any */
994                 cmnt = strchr(buf, ';');
995                 if (cmnt != NULL)
996                         cmnt[0] = '\0';
997
998                 /* Ignore if all blank */
999                 if (strspn(buf, " \t\r") == strlen(buf))
1000                         continue;
1001
1002                 /* Get an ID, check it's valid and not already seen */
1003                 id = strtol(buf, &endptr, 10);
1004                 if (endptr == buf || id <= 0 || id > AH->maxDumpId ||
1005                         ropt->idWanted[id - 1])
1006                 {
1007                         write_msg(modulename, "WARNING: line ignored: %s\n", buf);
1008                         continue;
1009                 }
1010
1011                 /* Find TOC entry */
1012                 te = getTocEntryByDumpId(AH, id);
1013                 if (!te)
1014                         die_horribly(AH, modulename, "could not find entry for ID %d\n",
1015                                                  id);
1016
1017                 ropt->idWanted[id - 1] = true;
1018
1019                 _moveAfter(AH, tePrev, te);
1020                 tePrev = te;
1021         }
1022
1023         if (fclose(fh) != 0)
1024                 die_horribly(AH, modulename, "could not close TOC file: %s\n",
1025                                          strerror(errno));
1026 }
1027
1028 /*
1029  * Set up a dummy ID filter that selects all dump IDs
1030  */
1031 void
1032 InitDummyWantedList(Archive *AHX, RestoreOptions *ropt)
1033 {
1034         ArchiveHandle *AH = (ArchiveHandle *) AHX;
1035
1036         /* Allocate space for the 'wanted' array, and init it to 1's */
1037         ropt->idWanted = (bool *) malloc(sizeof(bool) * AH->maxDumpId);
1038         memset(ropt->idWanted, 1, sizeof(bool) * AH->maxDumpId);
1039 }
1040
1041 /**********************
1042  * 'Convenience functions that look like standard IO functions
1043  * for writing data when in dump mode.
1044  **********************/
1045
1046 /* Public */
1047 int
1048 archputs(const char *s, Archive *AH)
1049 {
1050         return WriteData(AH, s, strlen(s));
1051 }
1052
1053 /* Public */
1054 int
1055 archprintf(Archive *AH, const char *fmt,...)
1056 {
1057         char       *p = NULL;
1058         va_list         ap;
1059         int                     bSize = strlen(fmt) + 256;
1060         int                     cnt = -1;
1061
1062         /*
1063          * This is paranoid: deal with the possibility that vsnprintf is willing
1064          * to ignore trailing null or returns > 0 even if string does not fit. It
1065          * may be the case that it returns cnt = bufsize
1066          */
1067         while (cnt < 0 || cnt >= (bSize - 1))
1068         {
1069                 if (p != NULL)
1070                         free(p);
1071                 bSize *= 2;
1072                 p = (char *) malloc(bSize);
1073                 if (p == NULL)
1074                         exit_horribly(AH, modulename, "out of memory\n");
1075                 va_start(ap, fmt);
1076                 cnt = vsnprintf(p, bSize, fmt, ap);
1077                 va_end(ap);
1078         }
1079         WriteData(AH, p, cnt);
1080         free(p);
1081         return cnt;
1082 }
1083
1084
1085 /*******************************
1086  * Stuff below here should be 'private' to the archiver routines
1087  *******************************/
1088
1089 static OutputContext
1090 SetOutput(ArchiveHandle *AH, char *filename, int compression)
1091 {
1092         OutputContext sav;
1093         int                     fn;
1094
1095         /* Replace the AH output file handle */
1096         sav.OF = AH->OF;
1097         sav.gzOut = AH->gzOut;
1098
1099         if (filename)
1100                 fn = -1;
1101         else if (AH->FH)
1102                 fn = fileno(AH->FH);
1103         else if (AH->fSpec)
1104         {
1105                 fn = -1;
1106                 filename = AH->fSpec;
1107         }
1108         else
1109                 fn = fileno(stdout);
1110
1111         /* If compression explicitly requested, use gzopen */
1112 #ifdef HAVE_LIBZ
1113         if (compression != 0)
1114         {
1115                 char            fmode[10];
1116
1117                 /* Don't use PG_BINARY_x since this is zlib */
1118                 sprintf(fmode, "wb%d", compression);
1119                 if (fn >= 0)
1120                         AH->OF = gzdopen(dup(fn), fmode);
1121                 else
1122                         AH->OF = gzopen(filename, fmode);
1123                 AH->gzOut = 1;
1124         }
1125         else
1126 #endif
1127         {                                                       /* Use fopen */
1128                 if (AH->mode == archModeAppend)
1129                 {
1130                         if (fn >= 0)
1131                                 AH->OF = fdopen(dup(fn), PG_BINARY_A);
1132                         else
1133                                 AH->OF = fopen(filename, PG_BINARY_A);
1134                 }
1135                 else
1136                 {
1137                         if (fn >= 0)
1138                                 AH->OF = fdopen(dup(fn), PG_BINARY_W);
1139                         else
1140                                 AH->OF = fopen(filename, PG_BINARY_W);
1141                 }
1142                 AH->gzOut = 0;
1143         }
1144
1145         if (!AH->OF)
1146         {
1147                 if (filename)
1148                         die_horribly(AH, modulename, "could not open output file \"%s\": %s\n",
1149                                                  filename, strerror(errno));
1150                 else
1151                         die_horribly(AH, modulename, "could not open output file: %s\n",
1152                                                  strerror(errno));
1153         }
1154
1155         return sav;
1156 }
1157
1158 static void
1159 ResetOutput(ArchiveHandle *AH, OutputContext sav)
1160 {
1161         int                     res;
1162
1163         if (AH->gzOut)
1164                 res = GZCLOSE(AH->OF);
1165         else
1166                 res = fclose(AH->OF);
1167
1168         if (res != 0)
1169                 die_horribly(AH, modulename, "could not close output file: %s\n",
1170                                          strerror(errno));
1171
1172         AH->gzOut = sav.gzOut;
1173         AH->OF = sav.OF;
1174 }
1175
1176
1177
1178 /*
1179  *      Print formatted text to the output file (usually stdout).
1180  */
1181 int
1182 ahprintf(ArchiveHandle *AH, const char *fmt,...)
1183 {
1184         char       *p = NULL;
1185         va_list         ap;
1186         int                     bSize = strlen(fmt) + 256;              /* Should be enough */
1187         int                     cnt = -1;
1188
1189         /*
1190          * This is paranoid: deal with the possibility that vsnprintf is willing
1191          * to ignore trailing null
1192          */
1193
1194         /*
1195          * or returns > 0 even if string does not fit. It may be the case that it
1196          * returns cnt = bufsize
1197          */
1198         while (cnt < 0 || cnt >= (bSize - 1))
1199         {
1200                 if (p != NULL)
1201                         free(p);
1202                 bSize *= 2;
1203                 p = (char *) malloc(bSize);
1204                 if (p == NULL)
1205                         die_horribly(AH, modulename, "out of memory\n");
1206                 va_start(ap, fmt);
1207                 cnt = vsnprintf(p, bSize, fmt, ap);
1208                 va_end(ap);
1209         }
1210         ahwrite(p, 1, cnt, AH);
1211         free(p);
1212         return cnt;
1213 }
1214
1215 void
1216 ahlog(ArchiveHandle *AH, int level, const char *fmt,...)
1217 {
1218         va_list         ap;
1219
1220         if (AH->debugLevel < level && (!AH->public.verbose || level > 1))
1221                 return;
1222
1223         va_start(ap, fmt);
1224         _write_msg(NULL, fmt, ap);
1225         va_end(ap);
1226 }
1227
1228 /*
1229  * Single place for logic which says 'We are restoring to a direct DB connection'.
1230  */
1231 static int
1232 RestoringToDB(ArchiveHandle *AH)
1233 {
1234         return (AH->ropt && AH->ropt->useDB && AH->connection);
1235 }
1236
1237 /*
1238  * Dump the current contents of the LO data buffer while writing a BLOB
1239  */
1240 static void
1241 dump_lo_buf(ArchiveHandle *AH)
1242 {
1243         if (AH->connection)
1244         {
1245                 size_t          res;
1246
1247                 res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1248                 ahlog(AH, 5, ngettext("wrote %lu byte of large object data (result = %lu)\n",
1249                                          "wrote %lu bytes of large object data (result = %lu)\n",
1250                                                           AH->lo_buf_used),
1251                           (unsigned long) AH->lo_buf_used, (unsigned long) res);
1252                 if (res != AH->lo_buf_used)
1253                         die_horribly(AH, modulename,
1254                         "could not write to large object (result: %lu, expected: %lu)\n",
1255                                            (unsigned long) res, (unsigned long) AH->lo_buf_used);
1256         }
1257         else
1258         {
1259                 PQExpBuffer buf = createPQExpBuffer();
1260
1261                 appendByteaLiteralAHX(buf,
1262                                                           (const unsigned char *) AH->lo_buf,
1263                                                           AH->lo_buf_used,
1264                                                           AH);
1265
1266                 /* Hack: turn off writingBlob so ahwrite doesn't recurse to here */
1267                 AH->writingBlob = 0;
1268                 ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
1269                 AH->writingBlob = 1;
1270
1271                 destroyPQExpBuffer(buf);
1272         }
1273         AH->lo_buf_used = 0;
1274 }
1275
1276
1277 /*
1278  *      Write buffer to the output file (usually stdout). This is user for
1279  *      outputting 'restore' scripts etc. It is even possible for an archive
1280  *      format to create a custom output routine to 'fake' a restore if it
1281  *      wants to generate a script (see TAR output).
1282  */
1283 int
1284 ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1285 {
1286         size_t          res;
1287
1288         if (AH->writingBlob)
1289         {
1290                 size_t          remaining = size * nmemb;
1291
1292                 while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1293                 {
1294                         size_t          avail = AH->lo_buf_size - AH->lo_buf_used;
1295
1296                         memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1297                         ptr = (const void *) ((const char *) ptr + avail);
1298                         remaining -= avail;
1299                         AH->lo_buf_used += avail;
1300                         dump_lo_buf(AH);
1301                 }
1302
1303                 memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1304                 AH->lo_buf_used += remaining;
1305
1306                 return size * nmemb;
1307         }
1308         else if (AH->gzOut)
1309         {
1310                 res = GZWRITE((void *) ptr, size, nmemb, AH->OF);
1311                 if (res != (nmemb * size))
1312                         die_horribly(AH, modulename, "could not write to output file: %s\n", strerror(errno));
1313                 return res;
1314         }
1315         else if (AH->CustomOutPtr)
1316         {
1317                 res = AH->CustomOutPtr (AH, ptr, size * nmemb);
1318
1319                 if (res != (nmemb * size))
1320                         die_horribly(AH, modulename, "could not write to custom output routine\n");
1321                 return res;
1322         }
1323         else
1324         {
1325                 /*
1326                  * If we're doing a restore, and it's direct to DB, and we're
1327                  * connected then send it to the DB.
1328                  */
1329                 if (RestoringToDB(AH))
1330                         return ExecuteSqlCommandBuf(AH, (void *) ptr, size * nmemb);            /* Always 1, currently */
1331                 else
1332                 {
1333                         res = fwrite((void *) ptr, size, nmemb, AH->OF);
1334                         if (res != nmemb)
1335                                 die_horribly(AH, modulename, "could not write to output file: %s\n",
1336                                                          strerror(errno));
1337                         return res;
1338                 }
1339         }
1340 }
1341
1342 /* Common exit code */
1343 static void
1344 _write_msg(const char *modulename, const char *fmt, va_list ap)
1345 {
1346         if (modulename)
1347                 fprintf(stderr, "%s: [%s] ", progname, _(modulename));
1348         else
1349                 fprintf(stderr, "%s: ", progname);
1350         vfprintf(stderr, _(fmt), ap);
1351 }
1352
1353 void
1354 write_msg(const char *modulename, const char *fmt,...)
1355 {
1356         va_list         ap;
1357
1358         va_start(ap, fmt);
1359         _write_msg(modulename, fmt, ap);
1360         va_end(ap);
1361 }
1362
1363
1364 static void
1365 _die_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt, va_list ap)
1366 {
1367         _write_msg(modulename, fmt, ap);
1368
1369         if (AH)
1370         {
1371                 if (AH->public.verbose)
1372                         write_msg(NULL, "*** aborted because of error\n");
1373                 if (AH->connection)
1374                         PQfinish(AH->connection);
1375         }
1376
1377         exit(1);
1378 }
1379
1380 /* External use */
1381 void
1382 exit_horribly(Archive *AH, const char *modulename, const char *fmt,...)
1383 {
1384         va_list         ap;
1385
1386         va_start(ap, fmt);
1387         _die_horribly((ArchiveHandle *) AH, modulename, fmt, ap);
1388         va_end(ap);
1389 }
1390
1391 /* Archiver use (just different arg declaration) */
1392 void
1393 die_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...)
1394 {
1395         va_list         ap;
1396
1397         va_start(ap, fmt);
1398         _die_horribly(AH, modulename, fmt, ap);
1399         va_end(ap);
1400 }
1401
1402 /* on some error, we may decide to go on... */
1403 void
1404 warn_or_die_horribly(ArchiveHandle *AH,
1405                                          const char *modulename, const char *fmt,...)
1406 {
1407         va_list         ap;
1408
1409         switch (AH->stage)
1410         {
1411
1412                 case STAGE_NONE:
1413                         /* Do nothing special */
1414                         break;
1415
1416                 case STAGE_INITIALIZING:
1417                         if (AH->stage != AH->lastErrorStage)
1418                                 write_msg(modulename, "Error while INITIALIZING:\n");
1419                         break;
1420
1421                 case STAGE_PROCESSING:
1422                         if (AH->stage != AH->lastErrorStage)
1423                                 write_msg(modulename, "Error while PROCESSING TOC:\n");
1424                         break;
1425
1426                 case STAGE_FINALIZING:
1427                         if (AH->stage != AH->lastErrorStage)
1428                                 write_msg(modulename, "Error while FINALIZING:\n");
1429                         break;
1430         }
1431         if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1432         {
1433                 write_msg(modulename, "Error from TOC entry %d; %u %u %s %s %s\n",
1434                                   AH->currentTE->dumpId,
1435                          AH->currentTE->catalogId.tableoid, AH->currentTE->catalogId.oid,
1436                           AH->currentTE->desc, AH->currentTE->tag, AH->currentTE->owner);
1437         }
1438         AH->lastErrorStage = AH->stage;
1439         AH->lastErrorTE = AH->currentTE;
1440
1441         va_start(ap, fmt);
1442         if (AH->public.exit_on_error)
1443                 _die_horribly(AH, modulename, fmt, ap);
1444         else
1445         {
1446                 _write_msg(modulename, fmt, ap);
1447                 AH->public.n_errors++;
1448         }
1449         va_end(ap);
1450 }
1451
1452 static void
1453 _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1454 {
1455         te->prev->next = te->next;
1456         te->next->prev = te->prev;
1457
1458         te->prev = pos;
1459         te->next = pos->next;
1460
1461         pos->next->prev = te;
1462         pos->next = te;
1463 }
1464
1465 #ifdef NOT_USED
1466
1467 static void
1468 _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1469 {
1470         te->prev->next = te->next;
1471         te->next->prev = te->prev;
1472
1473         te->prev = pos->prev;
1474         te->next = pos;
1475         pos->prev->next = te;
1476         pos->prev = te;
1477 }
1478 #endif
1479
1480 static TocEntry *
1481 getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
1482 {
1483         TocEntry   *te;
1484
1485         for (te = AH->toc->next; te != AH->toc; te = te->next)
1486         {
1487                 if (te->dumpId == id)
1488                         return te;
1489         }
1490         return NULL;
1491 }
1492
1493 teReqs
1494 TocIDRequired(ArchiveHandle *AH, DumpId id, RestoreOptions *ropt)
1495 {
1496         TocEntry   *te = getTocEntryByDumpId(AH, id);
1497
1498         if (!te)
1499                 return 0;
1500
1501         return _tocEntryRequired(te, ropt, true);
1502 }
1503
1504 size_t
1505 WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
1506 {
1507         int                     off;
1508
1509         /* Save the flag */
1510         (*AH->WriteBytePtr) (AH, wasSet);
1511
1512         /* Write out pgoff_t smallest byte first, prevents endian mismatch */
1513         for (off = 0; off < sizeof(pgoff_t); off++)
1514         {
1515                 (*AH->WriteBytePtr) (AH, o & 0xFF);
1516                 o >>= 8;
1517         }
1518         return sizeof(pgoff_t) + 1;
1519 }
1520
1521 int
1522 ReadOffset(ArchiveHandle *AH, pgoff_t * o)
1523 {
1524         int                     i;
1525         int                     off;
1526         int                     offsetFlg;
1527
1528         /* Initialize to zero */
1529         *o = 0;
1530
1531         /* Check for old version */
1532         if (AH->version < K_VERS_1_7)
1533         {
1534                 /* Prior versions wrote offsets using WriteInt */
1535                 i = ReadInt(AH);
1536                 /* -1 means not set */
1537                 if (i < 0)
1538                         return K_OFFSET_POS_NOT_SET;
1539                 else if (i == 0)
1540                         return K_OFFSET_NO_DATA;
1541
1542                 /* Cast to pgoff_t because it was written as an int. */
1543                 *o = (pgoff_t) i;
1544                 return K_OFFSET_POS_SET;
1545         }
1546
1547         /*
1548          * Read the flag indicating the state of the data pointer. Check if valid
1549          * and die if not.
1550          *
1551          * This used to be handled by a negative or zero pointer, now we use an
1552          * extra byte specifically for the state.
1553          */
1554         offsetFlg = (*AH->ReadBytePtr) (AH) & 0xFF;
1555
1556         switch (offsetFlg)
1557         {
1558                 case K_OFFSET_POS_NOT_SET:
1559                 case K_OFFSET_NO_DATA:
1560                 case K_OFFSET_POS_SET:
1561
1562                         break;
1563
1564                 default:
1565                         die_horribly(AH, modulename, "unexpected data offset flag %d\n", offsetFlg);
1566         }
1567
1568         /*
1569          * Read the bytes
1570          */
1571         for (off = 0; off < AH->offSize; off++)
1572         {
1573                 if (off < sizeof(pgoff_t))
1574                         *o |= ((pgoff_t) ((*AH->ReadBytePtr) (AH))) << (off * 8);
1575                 else
1576                 {
1577                         if ((*AH->ReadBytePtr) (AH) != 0)
1578                                 die_horribly(AH, modulename, "file offset in dump file is too large\n");
1579                 }
1580         }
1581
1582         return offsetFlg;
1583 }
1584
1585 size_t
1586 WriteInt(ArchiveHandle *AH, int i)
1587 {
1588         int                     b;
1589
1590         /*
1591          * This is a bit yucky, but I don't want to make the binary format very
1592          * dependent on representation, and not knowing much about it, I write out
1593          * a sign byte. If you change this, don't forget to change the file
1594          * version #, and modify readInt to read the new format AS WELL AS the old
1595          * formats.
1596          */
1597
1598         /* SIGN byte */
1599         if (i < 0)
1600         {
1601                 (*AH->WriteBytePtr) (AH, 1);
1602                 i = -i;
1603         }
1604         else
1605                 (*AH->WriteBytePtr) (AH, 0);
1606
1607         for (b = 0; b < AH->intSize; b++)
1608         {
1609                 (*AH->WriteBytePtr) (AH, i & 0xFF);
1610                 i >>= 8;
1611         }
1612
1613         return AH->intSize + 1;
1614 }
1615
1616 int
1617 ReadInt(ArchiveHandle *AH)
1618 {
1619         int                     res = 0;
1620         int                     bv,
1621                                 b;
1622         int                     sign = 0;               /* Default positive */
1623         int                     bitShift = 0;
1624
1625         if (AH->version > K_VERS_1_0)
1626                 /* Read a sign byte */
1627                 sign = (*AH->ReadBytePtr) (AH);
1628
1629         for (b = 0; b < AH->intSize; b++)
1630         {
1631                 bv = (*AH->ReadBytePtr) (AH) & 0xFF;
1632                 if (bv != 0)
1633                         res = res + (bv << bitShift);
1634                 bitShift += 8;
1635         }
1636
1637         if (sign)
1638                 res = -res;
1639
1640         return res;
1641 }
1642
1643 size_t
1644 WriteStr(ArchiveHandle *AH, const char *c)
1645 {
1646         size_t          res;
1647
1648         if (c)
1649         {
1650                 res = WriteInt(AH, strlen(c));
1651                 res += (*AH->WriteBufPtr) (AH, c, strlen(c));
1652         }
1653         else
1654                 res = WriteInt(AH, -1);
1655
1656         return res;
1657 }
1658
1659 char *
1660 ReadStr(ArchiveHandle *AH)
1661 {
1662         char       *buf;
1663         int                     l;
1664
1665         l = ReadInt(AH);
1666         if (l < 0)
1667                 buf = NULL;
1668         else
1669         {
1670                 buf = (char *) malloc(l + 1);
1671                 if (!buf)
1672                         die_horribly(AH, modulename, "out of memory\n");
1673
1674                 if ((*AH->ReadBufPtr) (AH, (void *) buf, l) != l)
1675                         die_horribly(AH, modulename, "unexpected end of file\n");
1676
1677                 buf[l] = '\0';
1678         }
1679
1680         return buf;
1681 }
1682
1683 static int
1684 _discoverArchiveFormat(ArchiveHandle *AH)
1685 {
1686         FILE       *fh;
1687         char            sig[6];                 /* More than enough */
1688         size_t          cnt;
1689         int                     wantClose = 0;
1690
1691 #if 0
1692         write_msg(modulename, "attempting to ascertain archive format\n");
1693 #endif
1694
1695         if (AH->lookahead)
1696                 free(AH->lookahead);
1697
1698         AH->lookaheadSize = 512;
1699         AH->lookahead = calloc(1, 512);
1700         AH->lookaheadLen = 0;
1701         AH->lookaheadPos = 0;
1702
1703         if (AH->fSpec)
1704         {
1705                 wantClose = 1;
1706                 fh = fopen(AH->fSpec, PG_BINARY_R);
1707                 if (!fh)
1708                         die_horribly(AH, modulename, "could not open input file \"%s\": %s\n",
1709                                                  AH->fSpec, strerror(errno));
1710         }
1711         else
1712         {
1713                 fh = stdin;
1714                 if (!fh)
1715                         die_horribly(AH, modulename, "could not open input file: %s\n",
1716                                                  strerror(errno));
1717         }
1718
1719         cnt = fread(sig, 1, 5, fh);
1720
1721         if (cnt != 5)
1722         {
1723                 if (ferror(fh))
1724                         die_horribly(AH, modulename, "could not read input file: %s\n", strerror(errno));
1725                 else
1726                         die_horribly(AH, modulename, "input file is too short (read %lu, expected 5)\n",
1727                                                  (unsigned long) cnt);
1728         }
1729
1730         /* Save it, just in case we need it later */
1731         strncpy(&AH->lookahead[0], sig, 5);
1732         AH->lookaheadLen = 5;
1733
1734         if (strncmp(sig, "PGDMP", 5) == 0)
1735         {
1736                 AH->vmaj = fgetc(fh);
1737                 AH->vmin = fgetc(fh);
1738
1739                 /* Save these too... */
1740                 AH->lookahead[AH->lookaheadLen++] = AH->vmaj;
1741                 AH->lookahead[AH->lookaheadLen++] = AH->vmin;
1742
1743                 /* Check header version; varies from V1.0 */
1744                 if (AH->vmaj > 1 || ((AH->vmaj == 1) && (AH->vmin > 0)))                /* Version > 1.0 */
1745                 {
1746                         AH->vrev = fgetc(fh);
1747                         AH->lookahead[AH->lookaheadLen++] = AH->vrev;
1748                 }
1749                 else
1750                         AH->vrev = 0;
1751
1752                 /* Make a convenient integer <maj><min><rev>00 */
1753                 AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
1754
1755                 AH->intSize = fgetc(fh);
1756                 AH->lookahead[AH->lookaheadLen++] = AH->intSize;
1757
1758                 if (AH->version >= K_VERS_1_7)
1759                 {
1760                         AH->offSize = fgetc(fh);
1761                         AH->lookahead[AH->lookaheadLen++] = AH->offSize;
1762                 }
1763                 else
1764                         AH->offSize = AH->intSize;
1765
1766                 AH->format = fgetc(fh);
1767                 AH->lookahead[AH->lookaheadLen++] = AH->format;
1768         }
1769         else
1770         {
1771                 /*
1772                  * *Maybe* we have a tar archive format file... So, read first 512
1773                  * byte header...
1774                  */
1775                 cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
1776                 AH->lookaheadLen += cnt;
1777
1778                 if (AH->lookaheadLen != 512)
1779                         die_horribly(AH, modulename, "input file does not appear to be a valid archive (too short?)\n");
1780
1781                 if (!isValidTarHeader(AH->lookahead))
1782                         die_horribly(AH, modulename, "input file does not appear to be a valid archive\n");
1783
1784                 AH->format = archTar;
1785         }
1786
1787         /* If we can't seek, then mark the header as read */
1788         if (fseeko(fh, 0, SEEK_SET) != 0)
1789         {
1790                 /*
1791                  * NOTE: Formats that use the lookahead buffer can unset this in their
1792                  * Init routine.
1793                  */
1794                 AH->readHeader = 1;
1795         }
1796         else
1797                 AH->lookaheadLen = 0;   /* Don't bother since we've reset the file */
1798
1799         /* Close the file */
1800         if (wantClose)
1801                 if (fclose(fh) != 0)
1802                         die_horribly(AH, modulename, "could not close input file: %s\n",
1803                                                  strerror(errno));
1804
1805         return AH->format;
1806 }
1807
1808
1809 /*
1810  * Allocate an archive handle
1811  */
1812 static ArchiveHandle *
1813 _allocAH(const char *FileSpec, const ArchiveFormat fmt,
1814                  const int compression, ArchiveMode mode)
1815 {
1816         ArchiveHandle *AH;
1817
1818 #if 0
1819         write_msg(modulename, "allocating AH for %s, format %d\n", FileSpec, fmt);
1820 #endif
1821
1822         AH = (ArchiveHandle *) calloc(1, sizeof(ArchiveHandle));
1823         if (!AH)
1824                 die_horribly(AH, modulename, "out of memory\n");
1825
1826         /* AH->debugLevel = 100; */
1827
1828         AH->vmaj = K_VERS_MAJOR;
1829         AH->vmin = K_VERS_MINOR;
1830         AH->vrev = K_VERS_REV;
1831
1832         /* initialize for backwards compatible string processing */
1833         AH->public.encoding = 0;        /* PG_SQL_ASCII */
1834         AH->public.std_strings = false;
1835
1836         /* sql error handling */
1837         AH->public.exit_on_error = true;
1838         AH->public.n_errors = 0;
1839
1840         AH->createDate = time(NULL);
1841
1842         AH->intSize = sizeof(int);
1843         AH->offSize = sizeof(pgoff_t);
1844         if (FileSpec)
1845         {
1846                 AH->fSpec = strdup(FileSpec);
1847
1848                 /*
1849                  * Not used; maybe later....
1850                  *
1851                  * AH->workDir = strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
1852                  * i--) if (AH->workDir[i-1] == '/')
1853                  */
1854         }
1855         else
1856                 AH->fSpec = NULL;
1857
1858         AH->currUser = NULL;            /* unknown */
1859         AH->currSchema = NULL;          /* ditto */
1860         AH->currTablespace = NULL;      /* ditto */
1861         AH->currWithOids = -1;          /* force SET */
1862
1863         AH->toc = (TocEntry *) calloc(1, sizeof(TocEntry));
1864         if (!AH->toc)
1865                 die_horribly(AH, modulename, "out of memory\n");
1866
1867         AH->toc->next = AH->toc;
1868         AH->toc->prev = AH->toc;
1869
1870         AH->mode = mode;
1871         AH->compression = compression;
1872
1873         AH->pgCopyBuf = createPQExpBuffer();
1874         AH->sqlBuf = createPQExpBuffer();
1875
1876         /* Open stdout with no compression for AH output handle */
1877         AH->gzOut = 0;
1878         AH->OF = stdout;
1879
1880         /*
1881          * On Windows, we need to use binary mode to read/write non-text archive
1882          * formats.  Force stdin/stdout into binary mode if that is what we are
1883          * using.
1884          */
1885 #ifdef WIN32
1886         if (fmt != archNull &&
1887                 (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
1888         {
1889                 if (mode == archModeWrite)
1890                         setmode(fileno(stdout), O_BINARY);
1891                 else
1892                         setmode(fileno(stdin), O_BINARY);
1893         }
1894 #endif
1895
1896         if (fmt == archUnknown)
1897                 AH->format = _discoverArchiveFormat(AH);
1898         else
1899                 AH->format = fmt;
1900
1901         AH->promptPassword = TRI_DEFAULT;
1902
1903         switch (AH->format)
1904         {
1905                 case archCustom:
1906                         InitArchiveFmt_Custom(AH);
1907                         break;
1908
1909                 case archFiles:
1910                         InitArchiveFmt_Files(AH);
1911                         break;
1912
1913                 case archNull:
1914                         InitArchiveFmt_Null(AH);
1915                         break;
1916
1917                 case archTar:
1918                         InitArchiveFmt_Tar(AH);
1919                         break;
1920
1921                 default:
1922                         die_horribly(AH, modulename, "unrecognized file format \"%d\"\n", fmt);
1923         }
1924
1925         return AH;
1926 }
1927
1928
1929 void
1930 WriteDataChunks(ArchiveHandle *AH)
1931 {
1932         TocEntry   *te;
1933         StartDataPtr startPtr;
1934         EndDataPtr      endPtr;
1935
1936         for (te = AH->toc->next; te != AH->toc; te = te->next)
1937         {
1938                 if (te->dataDumper != NULL)
1939                 {
1940                         AH->currToc = te;
1941                         /* printf("Writing data for %d (%x)\n", te->id, te); */
1942
1943                         if (strcmp(te->desc, "BLOBS") == 0)
1944                         {
1945                                 startPtr = AH->StartBlobsPtr;
1946                                 endPtr = AH->EndBlobsPtr;
1947                         }
1948                         else
1949                         {
1950                                 startPtr = AH->StartDataPtr;
1951                                 endPtr = AH->EndDataPtr;
1952                         }
1953
1954                         if (startPtr != NULL)
1955                                 (*startPtr) (AH, te);
1956
1957                         /*
1958                          * printf("Dumper arg for %d is %x\n", te->id, te->dataDumperArg);
1959                          */
1960
1961                         /*
1962                          * The user-provided DataDumper routine needs to call
1963                          * AH->WriteData
1964                          */
1965                         (*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
1966
1967                         if (endPtr != NULL)
1968                                 (*endPtr) (AH, te);
1969                         AH->currToc = NULL;
1970                 }
1971         }
1972 }
1973
1974 void
1975 WriteToc(ArchiveHandle *AH)
1976 {
1977         TocEntry   *te;
1978         char            workbuf[32];
1979         int                     i;
1980
1981         /* printf("%d TOC Entries to save\n", AH->tocCount); */
1982
1983         WriteInt(AH, AH->tocCount);
1984
1985         for (te = AH->toc->next; te != AH->toc; te = te->next)
1986         {
1987                 WriteInt(AH, te->dumpId);
1988                 WriteInt(AH, te->dataDumper ? 1 : 0);
1989
1990                 /* OID is recorded as a string for historical reasons */
1991                 sprintf(workbuf, "%u", te->catalogId.tableoid);
1992                 WriteStr(AH, workbuf);
1993                 sprintf(workbuf, "%u", te->catalogId.oid);
1994                 WriteStr(AH, workbuf);
1995
1996                 WriteStr(AH, te->tag);
1997                 WriteStr(AH, te->desc);
1998                 WriteInt(AH, te->section);
1999                 WriteStr(AH, te->defn);
2000                 WriteStr(AH, te->dropStmt);
2001                 WriteStr(AH, te->copyStmt);
2002                 WriteStr(AH, te->namespace);
2003                 WriteStr(AH, te->tablespace);
2004                 WriteStr(AH, te->owner);
2005                 WriteStr(AH, te->withOids ? "true" : "false");
2006
2007                 /* Dump list of dependencies */
2008                 for (i = 0; i < te->nDeps; i++)
2009                 {
2010                         sprintf(workbuf, "%d", te->dependencies[i]);
2011                         WriteStr(AH, workbuf);
2012                 }
2013                 WriteStr(AH, NULL);             /* Terminate List */
2014
2015                 if (AH->WriteExtraTocPtr)
2016                         (*AH->WriteExtraTocPtr) (AH, te);
2017         }
2018 }
2019
2020 void
2021 ReadToc(ArchiveHandle *AH)
2022 {
2023         int                     i;
2024         char       *tmp;
2025         DumpId     *deps;
2026         int                     depIdx;
2027         int                     depSize;
2028         TocEntry   *te;
2029
2030         AH->tocCount = ReadInt(AH);
2031         AH->maxDumpId = 0;
2032
2033         for (i = 0; i < AH->tocCount; i++)
2034         {
2035                 te = (TocEntry *) calloc(1, sizeof(TocEntry));
2036                 te->dumpId = ReadInt(AH);
2037
2038                 if (te->dumpId > AH->maxDumpId)
2039                         AH->maxDumpId = te->dumpId;
2040
2041                 /* Sanity check */
2042                 if (te->dumpId <= 0)
2043                         die_horribly(AH, modulename,
2044                                            "entry ID %d out of range -- perhaps a corrupt TOC\n",
2045                                                  te->dumpId);
2046
2047                 te->hadDumper = ReadInt(AH);
2048
2049                 if (AH->version >= K_VERS_1_8)
2050                 {
2051                         tmp = ReadStr(AH);
2052                         sscanf(tmp, "%u", &te->catalogId.tableoid);
2053                         free(tmp);
2054                 }
2055                 else
2056                         te->catalogId.tableoid = InvalidOid;
2057                 tmp = ReadStr(AH);
2058                 sscanf(tmp, "%u", &te->catalogId.oid);
2059                 free(tmp);
2060
2061                 te->tag = ReadStr(AH);
2062                 te->desc = ReadStr(AH);
2063
2064                 if (AH->version >= K_VERS_1_11)
2065                 {
2066                         te->section = ReadInt(AH);
2067                 }
2068                 else
2069                 {
2070                         /*
2071                          * rules for pre-8.4 archives wherein pg_dump hasn't classified
2072                          * the entries into sections
2073                          */
2074                         if (strcmp(te->desc, "COMMENT") == 0 ||
2075                                 strcmp(te->desc, "ACL") == 0 ||
2076                                 strcmp(te->desc, "DEFAULT ACL") == 0)
2077                                 te->section = SECTION_NONE;
2078                         else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2079                                          strcmp(te->desc, "BLOBS") == 0 ||
2080                                          strcmp(te->desc, "BLOB COMMENTS") == 0)
2081                                 te->section = SECTION_DATA;
2082                         else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2083                                          strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2084                                          strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2085                                          strcmp(te->desc, "INDEX") == 0 ||
2086                                          strcmp(te->desc, "RULE") == 0 ||
2087                                          strcmp(te->desc, "TRIGGER") == 0)
2088                                 te->section = SECTION_POST_DATA;
2089                         else
2090                                 te->section = SECTION_PRE_DATA;
2091                 }
2092
2093                 te->defn = ReadStr(AH);
2094                 te->dropStmt = ReadStr(AH);
2095
2096                 if (AH->version >= K_VERS_1_3)
2097                         te->copyStmt = ReadStr(AH);
2098
2099                 if (AH->version >= K_VERS_1_6)
2100                         te->namespace = ReadStr(AH);
2101
2102                 if (AH->version >= K_VERS_1_10)
2103                         te->tablespace = ReadStr(AH);
2104
2105                 te->owner = ReadStr(AH);
2106                 if (AH->version >= K_VERS_1_9)
2107                 {
2108                         if (strcmp(ReadStr(AH), "true") == 0)
2109                                 te->withOids = true;
2110                         else
2111                                 te->withOids = false;
2112                 }
2113                 else
2114                         te->withOids = true;
2115
2116                 /* Read TOC entry dependencies */
2117                 if (AH->version >= K_VERS_1_5)
2118                 {
2119                         depSize = 100;
2120                         deps = (DumpId *) malloc(sizeof(DumpId) * depSize);
2121                         depIdx = 0;
2122                         for (;;)
2123                         {
2124                                 tmp = ReadStr(AH);
2125                                 if (!tmp)
2126                                         break;          /* end of list */
2127                                 if (depIdx >= depSize)
2128                                 {
2129                                         depSize *= 2;
2130                                         deps = (DumpId *) realloc(deps, sizeof(DumpId) * depSize);
2131                                 }
2132                                 sscanf(tmp, "%d", &deps[depIdx]);
2133                                 free(tmp);
2134                                 depIdx++;
2135                         }
2136
2137                         if (depIdx > 0)         /* We have a non-null entry */
2138                         {
2139                                 deps = (DumpId *) realloc(deps, sizeof(DumpId) * depIdx);
2140                                 te->dependencies = deps;
2141                                 te->nDeps = depIdx;
2142                         }
2143                         else
2144                         {
2145                                 free(deps);
2146                                 te->dependencies = NULL;
2147                                 te->nDeps = 0;
2148                         }
2149                 }
2150                 else
2151                 {
2152                         te->dependencies = NULL;
2153                         te->nDeps = 0;
2154                 }
2155
2156                 if (AH->ReadExtraTocPtr)
2157                         (*AH->ReadExtraTocPtr) (AH, te);
2158
2159                 ahlog(AH, 3, "read TOC entry %d (ID %d) for %s %s\n",
2160                           i, te->dumpId, te->desc, te->tag);
2161
2162                 /* link completed entry into TOC circular list */
2163                 te->prev = AH->toc->prev;
2164                 AH->toc->prev->next = te;
2165                 AH->toc->prev = te;
2166                 te->next = AH->toc;
2167
2168                 /* special processing immediately upon read for some items */
2169                 if (strcmp(te->desc, "ENCODING") == 0)
2170                         processEncodingEntry(AH, te);
2171                 else if (strcmp(te->desc, "STDSTRINGS") == 0)
2172                         processStdStringsEntry(AH, te);
2173         }
2174 }
2175
2176 static void
2177 processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2178 {
2179         /* te->defn should have the form SET client_encoding = 'foo'; */
2180         char       *defn = strdup(te->defn);
2181         char       *ptr1;
2182         char       *ptr2 = NULL;
2183         int                     encoding;
2184
2185         ptr1 = strchr(defn, '\'');
2186         if (ptr1)
2187                 ptr2 = strchr(++ptr1, '\'');
2188         if (ptr2)
2189         {
2190                 *ptr2 = '\0';
2191                 encoding = pg_char_to_encoding(ptr1);
2192                 if (encoding < 0)
2193                         die_horribly(AH, modulename, "unrecognized encoding \"%s\"\n",
2194                                                  ptr1);
2195                 AH->public.encoding = encoding;
2196         }
2197         else
2198                 die_horribly(AH, modulename, "invalid ENCODING item: %s\n",
2199                                          te->defn);
2200
2201         free(defn);
2202 }
2203
2204 static void
2205 processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2206 {
2207         /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2208         char       *ptr1;
2209
2210         ptr1 = strchr(te->defn, '\'');
2211         if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2212                 AH->public.std_strings = true;
2213         else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2214                 AH->public.std_strings = false;
2215         else
2216                 die_horribly(AH, modulename, "invalid STDSTRINGS item: %s\n",
2217                                          te->defn);
2218 }
2219
2220 static teReqs
2221 _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls)
2222 {
2223         teReqs          res = REQ_ALL;
2224
2225         /* ENCODING and STDSTRINGS items are dumped specially, so always reject */
2226         if (strcmp(te->desc, "ENCODING") == 0 ||
2227                 strcmp(te->desc, "STDSTRINGS") == 0)
2228                 return 0;
2229
2230         /* If it's an ACL, maybe ignore it */
2231         if ((!include_acls || ropt->aclsSkip) &&
2232                 (strcmp(te->desc, "ACL") == 0 || strcmp(te->desc, "DEFAULT ACL") == 0))
2233                 return 0;
2234
2235         if (!ropt->create && strcmp(te->desc, "DATABASE") == 0)
2236                 return 0;
2237
2238         /* Check options for selective dump/restore */
2239         if (ropt->schemaNames)
2240         {
2241                 /* If no namespace is specified, it means all. */
2242                 if (!te->namespace)
2243                         return 0;
2244                 if (strcmp(ropt->schemaNames, te->namespace) != 0)
2245                         return 0;
2246         }
2247
2248         if (ropt->selTypes)
2249         {
2250                 if (strcmp(te->desc, "TABLE") == 0 ||
2251                         strcmp(te->desc, "TABLE DATA") == 0)
2252                 {
2253                         if (!ropt->selTable)
2254                                 return 0;
2255                         if (ropt->tableNames && strcmp(ropt->tableNames, te->tag) != 0)
2256                                 return 0;
2257                 }
2258                 else if (strcmp(te->desc, "INDEX") == 0)
2259                 {
2260                         if (!ropt->selIndex)
2261                                 return 0;
2262                         if (ropt->indexNames && strcmp(ropt->indexNames, te->tag) != 0)
2263                                 return 0;
2264                 }
2265                 else if (strcmp(te->desc, "FUNCTION") == 0)
2266                 {
2267                         if (!ropt->selFunction)
2268                                 return 0;
2269                         if (ropt->functionNames && strcmp(ropt->functionNames, te->tag) != 0)
2270                                 return 0;
2271                 }
2272                 else if (strcmp(te->desc, "TRIGGER") == 0)
2273                 {
2274                         if (!ropt->selTrigger)
2275                                 return 0;
2276                         if (ropt->triggerNames && strcmp(ropt->triggerNames, te->tag) != 0)
2277                                 return 0;
2278                 }
2279                 else
2280                         return 0;
2281         }
2282
2283         /*
2284          * Check if we had a dataDumper. Indicates if the entry is schema or data
2285          */
2286         if (!te->hadDumper)
2287         {
2288                 /*
2289                  * Special Case: If 'SEQUENCE SET' then it is considered a data entry
2290                  */
2291                 if (strcmp(te->desc, "SEQUENCE SET") == 0)
2292                         res = res & REQ_DATA;
2293                 else
2294                         res = res & ~REQ_DATA;
2295         }
2296
2297         /*
2298          * Special case: <Init> type with <Max OID> tag; this is obsolete and we
2299          * always ignore it.
2300          */
2301         if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
2302                 return 0;
2303
2304         /* Mask it if we only want schema */
2305         if (ropt->schemaOnly)
2306                 res = res & REQ_SCHEMA;
2307
2308         /* Mask it we only want data */
2309         if (ropt->dataOnly)
2310                 res = res & REQ_DATA;
2311
2312         /* Mask it if we don't have a schema contribution */
2313         if (!te->defn || strlen(te->defn) == 0)
2314                 res = res & ~REQ_SCHEMA;
2315
2316         /* Finally, if there's a per-ID filter, limit based on that as well */
2317         if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
2318                 return 0;
2319
2320         return res;
2321 }
2322
2323 /*
2324  * Issue SET commands for parameters that we want to have set the same way
2325  * at all times during execution of a restore script.
2326  */
2327 static void
2328 _doSetFixedOutputState(ArchiveHandle *AH)
2329 {
2330         /* Disable statement_timeout in archive for pg_restore/psql  */
2331         ahprintf(AH, "SET statement_timeout = 0;\n");
2332
2333         /* Select the correct character set encoding */
2334         ahprintf(AH, "SET client_encoding = '%s';\n",
2335                          pg_encoding_to_char(AH->public.encoding));
2336
2337         /* Select the correct string literal syntax */
2338         ahprintf(AH, "SET standard_conforming_strings = %s;\n",
2339                          AH->public.std_strings ? "on" : "off");
2340
2341         /* Select the role to be used during restore */
2342         if (AH->ropt && AH->ropt->use_role)
2343                 ahprintf(AH, "SET ROLE %s;\n", fmtId(AH->ropt->use_role));
2344
2345         /* Make sure function checking is disabled */
2346         ahprintf(AH, "SET check_function_bodies = false;\n");
2347
2348         /* Avoid annoying notices etc */
2349         ahprintf(AH, "SET client_min_messages = warning;\n");
2350         if (!AH->public.std_strings)
2351                 ahprintf(AH, "SET escape_string_warning = off;\n");
2352
2353         ahprintf(AH, "\n");
2354 }
2355
2356 /*
2357  * Issue a SET SESSION AUTHORIZATION command.  Caller is responsible
2358  * for updating state if appropriate.  If user is NULL or an empty string,
2359  * the specification DEFAULT will be used.
2360  */
2361 static void
2362 _doSetSessionAuth(ArchiveHandle *AH, const char *user)
2363 {
2364         PQExpBuffer cmd = createPQExpBuffer();
2365
2366         appendPQExpBuffer(cmd, "SET SESSION AUTHORIZATION ");
2367
2368         /*
2369          * SQL requires a string literal here.  Might as well be correct.
2370          */
2371         if (user && *user)
2372                 appendStringLiteralAHX(cmd, user, AH);
2373         else
2374                 appendPQExpBuffer(cmd, "DEFAULT");
2375         appendPQExpBuffer(cmd, ";");
2376
2377         if (RestoringToDB(AH))
2378         {
2379                 PGresult   *res;
2380
2381                 res = PQexec(AH->connection, cmd->data);
2382
2383                 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
2384                         /* NOT warn_or_die_horribly... use -O instead to skip this. */
2385                         die_horribly(AH, modulename, "could not set session user to \"%s\": %s",
2386                                                  user, PQerrorMessage(AH->connection));
2387
2388                 PQclear(res);
2389         }
2390         else
2391                 ahprintf(AH, "%s\n\n", cmd->data);
2392
2393         destroyPQExpBuffer(cmd);
2394 }
2395
2396
2397 /*
2398  * Issue a SET default_with_oids command.  Caller is responsible
2399  * for updating state if appropriate.
2400  */
2401 static void
2402 _doSetWithOids(ArchiveHandle *AH, const bool withOids)
2403 {
2404         PQExpBuffer cmd = createPQExpBuffer();
2405
2406         appendPQExpBuffer(cmd, "SET default_with_oids = %s;", withOids ?
2407                                           "true" : "false");
2408
2409         if (RestoringToDB(AH))
2410         {
2411                 PGresult   *res;
2412
2413                 res = PQexec(AH->connection, cmd->data);
2414
2415                 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
2416                         warn_or_die_horribly(AH, modulename,
2417                                                                  "could not set default_with_oids: %s",
2418                                                                  PQerrorMessage(AH->connection));
2419
2420                 PQclear(res);
2421         }
2422         else
2423                 ahprintf(AH, "%s\n\n", cmd->data);
2424
2425         destroyPQExpBuffer(cmd);
2426 }
2427
2428
2429 /*
2430  * Issue the commands to connect to the specified database.
2431  *
2432  * If we're currently restoring right into a database, this will
2433  * actually establish a connection. Otherwise it puts a \connect into
2434  * the script output.
2435  *
2436  * NULL dbname implies reconnecting to the current DB (pretty useless).
2437  */
2438 static void
2439 _reconnectToDB(ArchiveHandle *AH, const char *dbname)
2440 {
2441         if (RestoringToDB(AH))
2442                 ReconnectToServer(AH, dbname, NULL);
2443         else
2444         {
2445                 PQExpBuffer qry = createPQExpBuffer();
2446
2447                 appendPQExpBuffer(qry, "\\connect %s\n\n",
2448                                                   dbname ? fmtId(dbname) : "-");
2449                 ahprintf(AH, "%s", qry->data);
2450                 destroyPQExpBuffer(qry);
2451         }
2452
2453         /*
2454          * NOTE: currUser keeps track of what the imaginary session user in our
2455          * script is.  It's now effectively reset to the original userID.
2456          */
2457         if (AH->currUser)
2458                 free(AH->currUser);
2459         AH->currUser = NULL;
2460
2461         /* don't assume we still know the output schema, tablespace, etc either */
2462         if (AH->currSchema)
2463                 free(AH->currSchema);
2464         AH->currSchema = NULL;
2465         if (AH->currTablespace)
2466                 free(AH->currTablespace);
2467         AH->currTablespace = NULL;
2468         AH->currWithOids = -1;
2469
2470         /* re-establish fixed state */
2471         _doSetFixedOutputState(AH);
2472 }
2473
2474 /*
2475  * Become the specified user, and update state to avoid redundant commands
2476  *
2477  * NULL or empty argument is taken to mean restoring the session default
2478  */
2479 static void
2480 _becomeUser(ArchiveHandle *AH, const char *user)
2481 {
2482         if (!user)
2483                 user = "";                              /* avoid null pointers */
2484
2485         if (AH->currUser && strcmp(AH->currUser, user) == 0)
2486                 return;                                 /* no need to do anything */
2487
2488         _doSetSessionAuth(AH, user);
2489
2490         /*
2491          * NOTE: currUser keeps track of what the imaginary session user in our
2492          * script is
2493          */
2494         if (AH->currUser)
2495                 free(AH->currUser);
2496         AH->currUser = strdup(user);
2497 }
2498
2499 /*
2500  * Become the owner of the the given TOC entry object.  If
2501  * changes in ownership are not allowed, this doesn't do anything.
2502  */
2503 static void
2504 _becomeOwner(ArchiveHandle *AH, TocEntry *te)
2505 {
2506         if (AH->ropt && (AH->ropt->noOwner || !AH->ropt->use_setsessauth))
2507                 return;
2508
2509         _becomeUser(AH, te->owner);
2510 }
2511
2512
2513 /*
2514  * Set the proper default_with_oids value for the table.
2515  */
2516 static void
2517 _setWithOids(ArchiveHandle *AH, TocEntry *te)
2518 {
2519         if (AH->currWithOids != te->withOids)
2520         {
2521                 _doSetWithOids(AH, te->withOids);
2522                 AH->currWithOids = te->withOids;
2523         }
2524 }
2525
2526
2527 /*
2528  * Issue the commands to select the specified schema as the current schema
2529  * in the target database.
2530  */
2531 static void
2532 _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
2533 {
2534         PQExpBuffer qry;
2535
2536         if (!schemaName || *schemaName == '\0' ||
2537                 (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
2538                 return;                                 /* no need to do anything */
2539
2540         qry = createPQExpBuffer();
2541
2542         appendPQExpBuffer(qry, "SET search_path = %s",
2543                                           fmtId(schemaName));
2544         if (strcmp(schemaName, "pg_catalog") != 0)
2545                 appendPQExpBuffer(qry, ", pg_catalog");
2546
2547         if (RestoringToDB(AH))
2548         {
2549                 PGresult   *res;
2550
2551                 res = PQexec(AH->connection, qry->data);
2552
2553                 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
2554                         warn_or_die_horribly(AH, modulename,
2555                                                                  "could not set search_path to \"%s\": %s",
2556                                                                  schemaName, PQerrorMessage(AH->connection));
2557
2558                 PQclear(res);
2559         }
2560         else
2561                 ahprintf(AH, "%s;\n\n", qry->data);
2562
2563         if (AH->currSchema)
2564                 free(AH->currSchema);
2565         AH->currSchema = strdup(schemaName);
2566
2567         destroyPQExpBuffer(qry);
2568 }
2569
2570 /*
2571  * Issue the commands to select the specified tablespace as the current one
2572  * in the target database.
2573  */
2574 static void
2575 _selectTablespace(ArchiveHandle *AH, const char *tablespace)
2576 {
2577         PQExpBuffer qry;
2578         const char *want,
2579                            *have;
2580
2581         /* do nothing in --no-tablespaces mode */
2582         if (AH->ropt->noTablespace)
2583                 return;
2584
2585         have = AH->currTablespace;
2586         want = tablespace;
2587
2588         /* no need to do anything for non-tablespace object */
2589         if (!want)
2590                 return;
2591
2592         if (have && strcmp(want, have) == 0)
2593                 return;                                 /* no need to do anything */
2594
2595         qry = createPQExpBuffer();
2596
2597         if (strcmp(want, "") == 0)
2598         {
2599                 /* We want the tablespace to be the database's default */
2600                 appendPQExpBuffer(qry, "SET default_tablespace = ''");
2601         }
2602         else
2603         {
2604                 /* We want an explicit tablespace */
2605                 appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
2606         }
2607
2608         if (RestoringToDB(AH))
2609         {
2610                 PGresult   *res;
2611
2612                 res = PQexec(AH->connection, qry->data);
2613
2614                 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
2615                         warn_or_die_horribly(AH, modulename,
2616                                                                  "could not set default_tablespace to %s: %s",
2617                                                                  fmtId(want), PQerrorMessage(AH->connection));
2618
2619                 PQclear(res);
2620         }
2621         else
2622                 ahprintf(AH, "%s;\n\n", qry->data);
2623
2624         if (AH->currTablespace)
2625                 free(AH->currTablespace);
2626         AH->currTablespace = strdup(want);
2627
2628         destroyPQExpBuffer(qry);
2629 }
2630
2631 /*
2632  * Extract an object description for a TOC entry, and append it to buf.
2633  *
2634  * This is not quite as general as it may seem, since it really only
2635  * handles constructing the right thing to put into ALTER ... OWNER TO.
2636  *
2637  * The whole thing is pretty grotty, but we are kind of stuck since the
2638  * information used is all that's available in older dump files.
2639  */
2640 static void
2641 _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
2642 {
2643         const char *type = te->desc;
2644
2645         /* Use ALTER TABLE for views and sequences */
2646         if (strcmp(type, "VIEW") == 0 || strcmp(type, "SEQUENCE") == 0)
2647                 type = "TABLE";
2648
2649         /* objects named by a schema and name */
2650         if (strcmp(type, "CONVERSION") == 0 ||
2651                 strcmp(type, "DOMAIN") == 0 ||
2652                 strcmp(type, "TABLE") == 0 ||
2653                 strcmp(type, "TYPE") == 0 ||
2654                 strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
2655                 strcmp(type, "TEXT SEARCH CONFIGURATION") == 0)
2656         {
2657                 appendPQExpBuffer(buf, "%s ", type);
2658                 if (te->namespace && te->namespace[0])  /* is null pre-7.3 */
2659                         appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
2660
2661                 /*
2662                  * Pre-7.3 pg_dump would sometimes (not always) put a fmtId'd name
2663                  * into te->tag for an index. This check is heuristic, so make its
2664                  * scope as narrow as possible.
2665                  */
2666                 if (AH->version < K_VERS_1_7 &&
2667                         te->tag[0] == '"' &&
2668                         te->tag[strlen(te->tag) - 1] == '"' &&
2669                         strcmp(type, "INDEX") == 0)
2670                         appendPQExpBuffer(buf, "%s", te->tag);
2671                 else
2672                         appendPQExpBuffer(buf, "%s", fmtId(te->tag));
2673                 return;
2674         }
2675
2676         /* objects named by just a name */
2677         if (strcmp(type, "DATABASE") == 0 ||
2678                 strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
2679                 strcmp(type, "SCHEMA") == 0 ||
2680                 strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
2681                 strcmp(type, "SERVER") == 0 ||
2682                 strcmp(type, "USER MAPPING") == 0)
2683         {
2684                 appendPQExpBuffer(buf, "%s %s", type, fmtId(te->tag));
2685                 return;
2686         }
2687
2688         /*
2689          * These object types require additional decoration.  Fortunately, the
2690          * information needed is exactly what's in the DROP command.
2691          */
2692         if (strcmp(type, "AGGREGATE") == 0 ||
2693                 strcmp(type, "FUNCTION") == 0 ||
2694                 strcmp(type, "OPERATOR") == 0 ||
2695                 strcmp(type, "OPERATOR CLASS") == 0 ||
2696                 strcmp(type, "OPERATOR FAMILY") == 0)
2697         {
2698                 /* Chop "DROP " off the front and make a modifiable copy */
2699                 char       *first = strdup(te->dropStmt + 5);
2700                 char       *last;
2701
2702                 /* point to last character in string */
2703                 last = first + strlen(first) - 1;
2704
2705                 /* Strip off any ';' or '\n' at the end */
2706                 while (last >= first && (*last == '\n' || *last == ';'))
2707                         last--;
2708                 *(last + 1) = '\0';
2709
2710                 appendPQExpBufferStr(buf, first);
2711
2712                 free(first);
2713                 return;
2714         }
2715
2716         write_msg(modulename, "WARNING: don't know how to set owner for object type %s\n",
2717                           type);
2718 }
2719
2720 static void
2721 _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isData, bool acl_pass)
2722 {
2723         /* ACLs are dumped only during acl pass */
2724         if (acl_pass)
2725         {
2726                 if (!(strcmp(te->desc, "ACL") == 0 ||
2727                           strcmp(te->desc, "DEFAULT ACL") == 0))
2728                         return;
2729         }
2730         else
2731         {
2732                 if (strcmp(te->desc, "ACL") == 0 ||
2733                         strcmp(te->desc, "DEFAULT ACL") == 0)
2734                         return;
2735         }
2736
2737         /*
2738          * Avoid dumping the public schema, as it will already be created ...
2739          * unless we are using --clean mode, in which case it's been deleted and
2740          * we'd better recreate it.  Likewise for its comment, if any.
2741          */
2742         if (!ropt->dropSchema)
2743         {
2744                 if (strcmp(te->desc, "SCHEMA") == 0 &&
2745                         strcmp(te->tag, "public") == 0)
2746                         return;
2747                 /* The comment restore would require super-user privs, so avoid it. */
2748                 if (strcmp(te->desc, "COMMENT") == 0 &&
2749                         strcmp(te->tag, "SCHEMA public") == 0)
2750                         return;
2751         }
2752
2753         /* Select owner, schema, and tablespace as necessary */
2754         _becomeOwner(AH, te);
2755         _selectOutputSchema(AH, te->namespace);
2756         _selectTablespace(AH, te->tablespace);
2757
2758         /* Set up OID mode too */
2759         if (strcmp(te->desc, "TABLE") == 0)
2760                 _setWithOids(AH, te);
2761
2762         /* Emit header comment for item */
2763         if (!AH->noTocComments)
2764         {
2765                 const char *pfx;
2766
2767                 if (isData)
2768                         pfx = "Data for ";
2769                 else
2770                         pfx = "";
2771
2772                 ahprintf(AH, "--\n");
2773                 if (AH->public.verbose)
2774                 {
2775                         ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
2776                                          te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
2777                         if (te->nDeps > 0)
2778                         {
2779                                 int                     i;
2780
2781                                 ahprintf(AH, "-- Dependencies:");
2782                                 for (i = 0; i < te->nDeps; i++)
2783                                         ahprintf(AH, " %d", te->dependencies[i]);
2784                                 ahprintf(AH, "\n");
2785                         }
2786                 }
2787                 ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
2788                                  pfx, te->tag, te->desc,
2789                                  te->namespace ? te->namespace : "-",
2790                                  ropt->noOwner ? "-" : te->owner);
2791                 if (te->tablespace && !ropt->noTablespace)
2792                         ahprintf(AH, "; Tablespace: %s", te->tablespace);
2793                 ahprintf(AH, "\n");
2794
2795                 if (AH->PrintExtraTocPtr !=NULL)
2796                         (*AH->PrintExtraTocPtr) (AH, te);
2797                 ahprintf(AH, "--\n\n");
2798         }
2799
2800         /*
2801          * Actually print the definition.
2802          *
2803          * Really crude hack for suppressing AUTHORIZATION clause that old pg_dump
2804          * versions put into CREATE SCHEMA.  We have to do this when --no-owner
2805          * mode is selected.  This is ugly, but I see no other good way ...
2806          */
2807         if (ropt->noOwner && strcmp(te->desc, "SCHEMA") == 0)
2808         {
2809                 ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
2810         }
2811         else
2812         {
2813                 if (strlen(te->defn) > 0)
2814                         ahprintf(AH, "%s\n\n", te->defn);
2815         }
2816
2817         /*
2818          * If we aren't using SET SESSION AUTH to determine ownership, we must
2819          * instead issue an ALTER OWNER command.  We assume that anything without
2820          * a DROP command is not a separately ownable object.  All the categories
2821          * with DROP commands must appear in one list or the other.
2822          */
2823         if (!ropt->noOwner && !ropt->use_setsessauth &&
2824                 strlen(te->owner) > 0 && strlen(te->dropStmt) > 0)
2825         {
2826                 if (strcmp(te->desc, "AGGREGATE") == 0 ||
2827                         strcmp(te->desc, "CONVERSION") == 0 ||
2828                         strcmp(te->desc, "DATABASE") == 0 ||
2829                         strcmp(te->desc, "DOMAIN") == 0 ||
2830                         strcmp(te->desc, "FUNCTION") == 0 ||
2831                         strcmp(te->desc, "OPERATOR") == 0 ||
2832                         strcmp(te->desc, "OPERATOR CLASS") == 0 ||
2833                         strcmp(te->desc, "OPERATOR FAMILY") == 0 ||
2834                         strcmp(te->desc, "PROCEDURAL LANGUAGE") == 0 ||
2835                         strcmp(te->desc, "SCHEMA") == 0 ||
2836                         strcmp(te->desc, "TABLE") == 0 ||
2837                         strcmp(te->desc, "TYPE") == 0 ||
2838                         strcmp(te->desc, "VIEW") == 0 ||
2839                         strcmp(te->desc, "SEQUENCE") == 0 ||
2840                         strcmp(te->desc, "TEXT SEARCH DICTIONARY") == 0 ||
2841                         strcmp(te->desc, "TEXT SEARCH CONFIGURATION") == 0 ||
2842                         strcmp(te->desc, "FOREIGN DATA WRAPPER") == 0 ||
2843                         strcmp(te->desc, "SERVER") == 0)
2844                 {
2845                         PQExpBuffer temp = createPQExpBuffer();
2846
2847                         appendPQExpBuffer(temp, "ALTER ");
2848                         _getObjectDescription(temp, te, AH);
2849                         appendPQExpBuffer(temp, " OWNER TO %s;", fmtId(te->owner));
2850                         ahprintf(AH, "%s\n\n", temp->data);
2851                         destroyPQExpBuffer(temp);
2852                 }
2853                 else if (strcmp(te->desc, "CAST") == 0 ||
2854                                  strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2855                                  strcmp(te->desc, "CONSTRAINT") == 0 ||
2856                                  strcmp(te->desc, "DEFAULT") == 0 ||
2857                                  strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2858                                  strcmp(te->desc, "INDEX") == 0 ||
2859                                  strcmp(te->desc, "RULE") == 0 ||
2860                                  strcmp(te->desc, "TRIGGER") == 0 ||
2861                                  strcmp(te->desc, "USER MAPPING") == 0)
2862                 {
2863                         /* these object types don't have separate owners */
2864                 }
2865                 else
2866                 {
2867                         write_msg(modulename, "WARNING: don't know how to set owner for object type %s\n",
2868                                           te->desc);
2869                 }
2870         }
2871
2872         /*
2873          * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
2874          * commands, so we can no longer assume we know the current auth setting.
2875          */
2876         if (strncmp(te->desc, "ACL", 3) == 0)
2877         {
2878                 if (AH->currUser)
2879                         free(AH->currUser);
2880                 AH->currUser = NULL;
2881         }
2882 }
2883
2884 void
2885 WriteHead(ArchiveHandle *AH)
2886 {
2887         struct tm       crtm;
2888
2889         (*AH->WriteBufPtr) (AH, "PGDMP", 5);            /* Magic code */
2890         (*AH->WriteBytePtr) (AH, AH->vmaj);
2891         (*AH->WriteBytePtr) (AH, AH->vmin);
2892         (*AH->WriteBytePtr) (AH, AH->vrev);
2893         (*AH->WriteBytePtr) (AH, AH->intSize);
2894         (*AH->WriteBytePtr) (AH, AH->offSize);
2895         (*AH->WriteBytePtr) (AH, AH->format);
2896
2897 #ifndef HAVE_LIBZ
2898         if (AH->compression != 0)
2899                 write_msg(modulename, "WARNING: requested compression not available in this "
2900                                   "installation -- archive will be uncompressed\n");
2901
2902         AH->compression = 0;
2903 #endif
2904
2905         WriteInt(AH, AH->compression);
2906
2907         crtm = *localtime(&AH->createDate);
2908         WriteInt(AH, crtm.tm_sec);
2909         WriteInt(AH, crtm.tm_min);
2910         WriteInt(AH, crtm.tm_hour);
2911         WriteInt(AH, crtm.tm_mday);
2912         WriteInt(AH, crtm.tm_mon);
2913         WriteInt(AH, crtm.tm_year);
2914         WriteInt(AH, crtm.tm_isdst);
2915         WriteStr(AH, PQdb(AH->connection));
2916         WriteStr(AH, AH->public.remoteVersionStr);
2917         WriteStr(AH, PG_VERSION);
2918 }
2919
2920 void
2921 ReadHead(ArchiveHandle *AH)
2922 {
2923         char            tmpMag[7];
2924         int                     fmt;
2925         struct tm       crtm;
2926
2927         /* If we haven't already read the header... */
2928         if (!AH->readHeader)
2929         {
2930                 if ((*AH->ReadBufPtr) (AH, tmpMag, 5) != 5)
2931                         die_horribly(AH, modulename, "unexpected end of file\n");
2932
2933                 if (strncmp(tmpMag, "PGDMP", 5) != 0)
2934                         die_horribly(AH, modulename, "did not find magic string in file header\n");
2935
2936                 AH->vmaj = (*AH->ReadBytePtr) (AH);
2937                 AH->vmin = (*AH->ReadBytePtr) (AH);
2938
2939                 if (AH->vmaj > 1 || ((AH->vmaj == 1) && (AH->vmin > 0)))                /* Version > 1.0 */
2940                         AH->vrev = (*AH->ReadBytePtr) (AH);
2941                 else
2942                         AH->vrev = 0;
2943
2944                 AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
2945
2946
2947                 if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
2948                         die_horribly(AH, modulename, "unsupported version (%d.%d) in file header\n",
2949                                                  AH->vmaj, AH->vmin);
2950
2951                 AH->intSize = (*AH->ReadBytePtr) (AH);
2952                 if (AH->intSize > 32)
2953                         die_horribly(AH, modulename, "sanity check on integer size (%lu) failed\n",
2954                                                  (unsigned long) AH->intSize);
2955
2956                 if (AH->intSize > sizeof(int))
2957                         write_msg(modulename, "WARNING: archive was made on a machine with larger integers, some operations might fail\n");
2958
2959                 if (AH->version >= K_VERS_1_7)
2960                         AH->offSize = (*AH->ReadBytePtr) (AH);
2961                 else
2962                         AH->offSize = AH->intSize;
2963
2964                 fmt = (*AH->ReadBytePtr) (AH);
2965
2966                 if (AH->format != fmt)
2967                         die_horribly(AH, modulename, "expected format (%d) differs from format found in file (%d)\n",
2968                                                  AH->format, fmt);
2969         }
2970
2971         if (AH->version >= K_VERS_1_2)
2972         {
2973                 if (AH->version < K_VERS_1_4)
2974                         AH->compression = (*AH->ReadBytePtr) (AH);
2975                 else
2976                         AH->compression = ReadInt(AH);
2977         }
2978         else
2979                 AH->compression = Z_DEFAULT_COMPRESSION;
2980
2981 #ifndef HAVE_LIBZ
2982         if (AH->compression != 0)
2983                 write_msg(modulename, "WARNING: archive is compressed, but this installation does not support compression -- no data will be available\n");
2984 #endif
2985
2986         if (AH->version >= K_VERS_1_4)
2987         {
2988                 crtm.tm_sec = ReadInt(AH);
2989                 crtm.tm_min = ReadInt(AH);
2990                 crtm.tm_hour = ReadInt(AH);
2991                 crtm.tm_mday = ReadInt(AH);
2992                 crtm.tm_mon = ReadInt(AH);
2993                 crtm.tm_year = ReadInt(AH);
2994                 crtm.tm_isdst = ReadInt(AH);
2995
2996                 AH->archdbname = ReadStr(AH);
2997
2998                 AH->createDate = mktime(&crtm);
2999
3000                 if (AH->createDate == (time_t) -1)
3001                         write_msg(modulename, "WARNING: invalid creation date in header\n");
3002         }
3003
3004         if (AH->version >= K_VERS_1_10)
3005         {
3006                 AH->archiveRemoteVersion = ReadStr(AH);
3007                 AH->archiveDumpVersion = ReadStr(AH);
3008         }
3009
3010 }
3011
3012
3013 /*
3014  * checkSeek
3015  *        check to see if fseek can be performed.
3016  */
3017 bool
3018 checkSeek(FILE *fp)
3019 {
3020         if (fseeko(fp, 0, SEEK_CUR) != 0)
3021                 return false;
3022         else if (sizeof(pgoff_t) > sizeof(long))
3023         {
3024                 /*
3025                  * At this point, pgoff_t is too large for long, so we return based on
3026                  * whether an pgoff_t version of fseek is available.
3027                  */
3028 #ifdef HAVE_FSEEKO
3029                 return true;
3030 #else
3031                 return false;
3032 #endif
3033         }
3034         else
3035                 return true;
3036 }
3037
3038
3039 /*
3040  * dumpTimestamp
3041  */
3042 static void
3043 dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
3044 {
3045         char            buf[256];
3046
3047         /*
3048          * We don't print the timezone on Win32, because the names are long and
3049          * localized, which means they may contain characters in various random
3050          * encodings; this has been seen to cause encoding errors when reading the
3051          * dump script.
3052          */
3053         if (strftime(buf, sizeof(buf),
3054 #ifndef WIN32
3055                                  "%Y-%m-%d %H:%M:%S %Z",
3056 #else
3057                                  "%Y-%m-%d %H:%M:%S",
3058 #endif
3059                                  localtime(&tim)) != 0)
3060                 ahprintf(AH, "-- %s %s\n\n", msg, buf);
3061 }
3062
3063
3064 /*
3065  * Main engine for parallel restore.
3066  *
3067  * Work is done in three phases.
3068  * First we process tocEntries until we come to one that is marked
3069  * SECTION_DATA or SECTION_POST_DATA, in a single connection, just as for a
3070  * standard restore.  Second we process the remaining non-ACL steps in
3071  * parallel worker children (threads on Windows, processes on Unix), each of
3072  * which connects separately to the database.  Finally we process all the ACL
3073  * entries in a single connection (that happens back in RestoreArchive).
3074  */
3075 static void
3076 restore_toc_entries_parallel(ArchiveHandle *AH)
3077 {
3078         RestoreOptions *ropt = AH->ropt;
3079         int                     n_slots = ropt->number_of_jobs;
3080         ParallelSlot *slots;
3081         int                     work_status;
3082         int                     next_slot;
3083         TocEntry        pending_list;
3084         TocEntry        ready_list;
3085         TocEntry   *next_work_item;
3086         thandle         ret_child;
3087         TocEntry   *te;
3088
3089         ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
3090
3091         /* we haven't got round to making this work for all archive formats */
3092         if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
3093                 die_horribly(AH, modulename, "parallel restore is not supported with this archive file format\n");
3094
3095         slots = (ParallelSlot *) calloc(sizeof(ParallelSlot), n_slots);
3096
3097         /* Adjust dependency information */
3098         fix_dependencies(AH);
3099
3100         /*
3101          * Do all the early stuff in a single connection in the parent. There's no
3102          * great point in running it in parallel, in fact it will actually run
3103          * faster in a single connection because we avoid all the connection and
3104          * setup overhead.
3105          */
3106         for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
3107         {
3108                 if (next_work_item->section == SECTION_DATA ||
3109                         next_work_item->section == SECTION_POST_DATA)
3110                         break;
3111
3112                 ahlog(AH, 1, "processing item %d %s %s\n",
3113                           next_work_item->dumpId,
3114                           next_work_item->desc, next_work_item->tag);
3115
3116                 (void) restore_toc_entry(AH, next_work_item, ropt, false);
3117
3118                 /* there should be no touch of ready_list here, so pass NULL */
3119                 reduce_dependencies(AH, next_work_item, NULL);
3120         }
3121
3122         /*
3123          * Now close parent connection in prep for parallel steps.      We do this
3124          * mainly to ensure that we don't exceed the specified number of parallel
3125          * connections.
3126          */
3127         PQfinish(AH->connection);
3128         AH->connection = NULL;
3129
3130         /* blow away any transient state from the old connection */
3131         if (AH->currUser)
3132                 free(AH->currUser);
3133         AH->currUser = NULL;
3134         if (AH->currSchema)
3135                 free(AH->currSchema);
3136         AH->currSchema = NULL;
3137         if (AH->currTablespace)
3138                 free(AH->currTablespace);
3139         AH->currTablespace = NULL;
3140         AH->currWithOids = -1;
3141
3142         /*
3143          * Initialize the lists of pending and ready items.  After this setup,
3144          * the pending list is everything that needs to be done but is blocked
3145          * by one or more dependencies, while the ready list contains items that
3146          * have no remaining dependencies.  Note: we don't yet filter out entries
3147          * that aren't going to be restored.  They might participate in
3148          * dependency chains connecting entries that should be restored, so we
3149          * treat them as live until we actually process them.
3150          */
3151         par_list_header_init(&pending_list);
3152         par_list_header_init(&ready_list);
3153         for (; next_work_item != AH->toc; next_work_item = next_work_item->next)
3154         {
3155                 if (next_work_item->depCount > 0)
3156                         par_list_append(&pending_list, next_work_item);
3157                 else
3158                         par_list_append(&ready_list, next_work_item);
3159         }
3160
3161         /*
3162          * main parent loop
3163          *
3164          * Keep going until there is no worker still running AND there is no work
3165          * left to be done.
3166          */
3167
3168         ahlog(AH, 1, "entering main parallel loop\n");
3169
3170         while ((next_work_item = get_next_work_item(AH, &ready_list,
3171                                                                                                 slots, n_slots)) != NULL ||
3172                    work_in_progress(slots, n_slots))
3173         {
3174                 if (next_work_item != NULL)
3175                 {
3176                         teReqs          reqs;
3177
3178                         /* If not to be dumped, don't waste time launching a worker */
3179                         reqs = _tocEntryRequired(next_work_item, AH->ropt, false);
3180                         if ((reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
3181                         {
3182                                 ahlog(AH, 1, "skipping item %d %s %s\n",
3183                                           next_work_item->dumpId,
3184                                           next_work_item->desc, next_work_item->tag);
3185
3186                                 par_list_remove(next_work_item);
3187                                 reduce_dependencies(AH, next_work_item, &ready_list);
3188
3189                                 continue;
3190                         }
3191
3192                         if ((next_slot = get_next_slot(slots, n_slots)) != NO_SLOT)
3193                         {
3194                                 /* There is work still to do and a worker slot available */
3195                                 thandle         child;
3196                                 RestoreArgs *args;
3197
3198                                 ahlog(AH, 1, "launching item %d %s %s\n",
3199                                           next_work_item->dumpId,
3200                                           next_work_item->desc, next_work_item->tag);
3201
3202                                 par_list_remove(next_work_item);
3203
3204                                 /* this memory is dealloced in mark_work_done() */
3205                                 args = malloc(sizeof(RestoreArgs));
3206                                 args->AH = CloneArchive(AH);
3207                                 args->te = next_work_item;
3208
3209                                 /* run the step in a worker child */
3210                                 child = spawn_restore(args);
3211
3212                                 slots[next_slot].child_id = child;
3213                                 slots[next_slot].args = args;
3214
3215                                 continue;
3216                         }
3217                 }
3218
3219                 /*
3220                  * If we get here there must be work being done.  Either there is no
3221                  * work available to schedule (and work_in_progress returned true) or
3222                  * there are no slots available.  So we wait for a worker to finish,
3223                  * and process the result.
3224                  */
3225                 ret_child = reap_child(slots, n_slots, &work_status);
3226
3227                 if (WIFEXITED(work_status))
3228                 {
3229                         mark_work_done(AH, &ready_list,
3230                                                    ret_child, WEXITSTATUS(work_status),
3231                                                    slots, n_slots);
3232                 }
3233                 else
3234                 {
3235                         die_horribly(AH, modulename, "worker process crashed: status %d\n",
3236                                                  work_status);
3237                 }
3238         }
3239
3240         ahlog(AH, 1, "finished main parallel loop\n");
3241
3242         /*
3243          * Now reconnect the single parent connection.
3244          */
3245         ConnectDatabase((Archive *) AH, ropt->dbname,
3246                                         ropt->pghost, ropt->pgport, ropt->username,
3247                                         ropt->promptPassword);
3248
3249         _doSetFixedOutputState(AH);
3250
3251         /*
3252          * Make sure there is no non-ACL work left due to, say, circular
3253          * dependencies, or some other pathological condition. If so, do it in the
3254          * single parent connection.
3255          */
3256         for (te = pending_list.par_next; te != &pending_list; te = te->par_next)
3257         {
3258                 ahlog(AH, 1, "processing missed item %d %s %s\n",
3259                           te->dumpId, te->desc, te->tag);
3260                 (void) restore_toc_entry(AH, te, ropt, false);
3261         }
3262
3263         /* The ACLs will be handled back in RestoreArchive. */
3264 }
3265
3266 /*
3267  * create a worker child to perform a restore step in parallel
3268  */
3269 static thandle
3270 spawn_restore(RestoreArgs *args)
3271 {
3272         thandle         child;
3273
3274         /* Ensure stdio state is quiesced before forking */
3275         fflush(NULL);
3276
3277 #ifndef WIN32
3278         child = fork();
3279         if (child == 0)
3280         {
3281                 /* in child process */
3282                 parallel_restore(args);
3283                 die_horribly(args->AH, modulename,
3284                                          "parallel_restore should not return\n");
3285         }
3286         else if (child < 0)
3287         {
3288                 /* fork failed */
3289                 die_horribly(args->AH, modulename,
3290                                          "could not create worker process: %s\n",
3291                                          strerror(errno));
3292         }
3293 #else
3294         child = (HANDLE) _beginthreadex(NULL, 0, (void *) parallel_restore,
3295                                                                         args, 0, NULL);
3296         if (child == 0)
3297                 die_horribly(args->AH, modulename,
3298                                          "could not create worker thread: %s\n",
3299                                          strerror(errno));
3300 #endif
3301
3302         return child;
3303 }
3304
3305 /*
3306  *      collect status from a completed worker child
3307  */
3308 static thandle
3309 reap_child(ParallelSlot *slots, int n_slots, int *work_status)
3310 {
3311 #ifndef WIN32
3312         /* Unix is so much easier ... */
3313         return wait(work_status);
3314 #else
3315         static HANDLE *handles = NULL;
3316         int                     hindex,
3317                                 snum,
3318                                 tnum;
3319         thandle         ret_child;
3320         DWORD           res;
3321
3322         /* first time around only, make space for handles to listen on */
3323         if (handles == NULL)
3324                 handles = (HANDLE *) calloc(sizeof(HANDLE), n_slots);
3325
3326         /* set up list of handles to listen to */
3327         for (snum = 0, tnum = 0; snum < n_slots; snum++)
3328                 if (slots[snum].child_id != 0)
3329                         handles[tnum++] = slots[snum].child_id;
3330
3331         /* wait for one to finish */
3332         hindex = WaitForMultipleObjects(tnum, handles, false, INFINITE);
3333
3334         /* get handle of finished thread */
3335         ret_child = handles[hindex - WAIT_OBJECT_0];
3336
3337         /* get the result */
3338         GetExitCodeThread(ret_child, &res);
3339         *work_status = res;
3340
3341         /* dispose of handle to stop leaks */
3342         CloseHandle(ret_child);
3343
3344         return ret_child;
3345 #endif
3346 }
3347
3348 /*
3349  * are we doing anything now?
3350  */
3351 static bool
3352 work_in_progress(ParallelSlot *slots, int n_slots)
3353 {
3354         int                     i;
3355
3356         for (i = 0; i < n_slots; i++)
3357         {
3358                 if (slots[i].child_id != 0)
3359                         return true;
3360         }
3361         return false;
3362 }
3363
3364 /*
3365  * find the first free parallel slot (if any).
3366  */
3367 static int
3368 get_next_slot(ParallelSlot *slots, int n_slots)
3369 {
3370         int                     i;
3371
3372         for (i = 0; i < n_slots; i++)
3373         {
3374                 if (slots[i].child_id == 0)
3375                         return i;
3376         }
3377         return NO_SLOT;
3378 }
3379
3380
3381 /*
3382  * Check if te1 has an exclusive lock requirement for an item that te2 also
3383  * requires, whether or not te2's requirement is for an exclusive lock.
3384  */
3385 static bool
3386 has_lock_conflicts(TocEntry *te1, TocEntry *te2)
3387 {
3388         int                     j,
3389                                 k;
3390
3391         for (j = 0; j < te1->nLockDeps; j++)
3392         {
3393                 for (k = 0; k < te2->nDeps; k++)
3394                 {
3395                         if (te1->lockDeps[j] == te2->dependencies[k])
3396                                 return true;
3397                 }
3398         }
3399         return false;
3400 }
3401
3402
3403 /*
3404  * Initialize the header of a parallel-processing list.
3405  *
3406  * These are circular lists with a dummy TocEntry as header, just like the
3407  * main TOC list; but we use separate list links so that an entry can be in
3408  * the main TOC list as well as in a parallel-processing list.
3409  */
3410 static void
3411 par_list_header_init(TocEntry *l)
3412 {
3413         l->par_prev = l->par_next = l;
3414 }
3415
3416 /* Append te to the end of the parallel-processing list headed by l */
3417 static void
3418 par_list_append(TocEntry *l, TocEntry *te)
3419 {
3420         te->par_prev = l->par_prev;
3421         l->par_prev->par_next = te;
3422         l->par_prev = te;
3423         te->par_next = l;
3424 }
3425
3426 /* Remove te from whatever parallel-processing list it's in */
3427 static void
3428 par_list_remove(TocEntry *te)
3429 {
3430         te->par_prev->par_next = te->par_next;
3431         te->par_next->par_prev = te->par_prev;
3432         te->par_prev = NULL;
3433         te->par_next = NULL;
3434 }
3435
3436
3437 /*
3438  * Find the next work item (if any) that is capable of being run now.
3439  *
3440  * To qualify, the item must have no remaining dependencies
3441  * and no requirements for locks that are incompatible with
3442  * items currently running.  Items in the ready_list are known to have
3443  * no remaining dependencies, but we have to check for lock conflicts.
3444  *
3445  * Note that the returned item has *not* been removed from ready_list.
3446  * The caller must do that after successfully dispatching the item.
3447  *
3448  * pref_non_data is for an alternative selection algorithm that gives
3449  * preference to non-data items if there is already a data load running.
3450  * It is currently disabled.
3451  */
3452 static TocEntry *
3453 get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
3454                                    ParallelSlot *slots, int n_slots)
3455 {
3456         bool            pref_non_data = false;  /* or get from AH->ropt */
3457         TocEntry   *data_te = NULL;
3458         TocEntry   *te;
3459         int                     i,
3460                                 k;
3461
3462         /*
3463          * Bogus heuristics for pref_non_data
3464          */
3465         if (pref_non_data)
3466         {
3467                 int                     count = 0;
3468
3469                 for (k = 0; k < n_slots; k++)
3470                         if (slots[k].args->te != NULL &&
3471                                 slots[k].args->te->section == SECTION_DATA)
3472                                 count++;
3473                 if (n_slots == 0 || count * 4 < n_slots)
3474                         pref_non_data = false;
3475         }
3476
3477         /*
3478          * Search the ready_list until we find a suitable item.
3479          */
3480         for (te = ready_list->par_next; te != ready_list; te = te->par_next)
3481         {
3482                 bool            conflicts = false;
3483
3484                 /*
3485                  * Check to see if the item would need exclusive lock on something
3486                  * that a currently running item also needs lock on, or vice versa. If
3487                  * so, we don't want to schedule them together.
3488                  */
3489                 for (i = 0; i < n_slots && !conflicts; i++)
3490                 {
3491                         TocEntry   *running_te;
3492
3493                         if (slots[i].args == NULL)
3494                                 continue;
3495                         running_te = slots[i].args->te;
3496
3497                         if (has_lock_conflicts(te, running_te) ||
3498                                 has_lock_conflicts(running_te, te))
3499                         {
3500                                 conflicts = true;
3501                                 break;
3502                         }
3503                 }
3504
3505                 if (conflicts)
3506                         continue;
3507
3508                 if (pref_non_data && te->section == SECTION_DATA)
3509                 {
3510                         if (data_te == NULL)
3511                                 data_te = te;
3512                         continue;
3513                 }
3514
3515                 /* passed all tests, so this item can run */
3516                 return te;
3517         }
3518
3519         if (data_te != NULL)
3520                 return data_te;
3521
3522         ahlog(AH, 2, "no item ready\n");
3523         return NULL;
3524 }
3525
3526
3527 /*
3528  * Restore a single TOC item in parallel with others
3529  *
3530  * this is the procedure run as a thread (Windows) or a
3531  * separate process (everything else).
3532  */
3533 static parallel_restore_result
3534 parallel_restore(RestoreArgs *args)
3535 {
3536         ArchiveHandle *AH = args->AH;
3537         TocEntry   *te = args->te;
3538         RestoreOptions *ropt = AH->ropt;
3539         int                     retval;
3540
3541         /*
3542          * Close and reopen the input file so we have a private file pointer that
3543          * doesn't stomp on anyone else's file pointer, if we're actually going to
3544          * need to read from the file. Otherwise, just close it except on Windows,
3545          * where it will possibly be needed by other threads.
3546          *
3547          * Note: on Windows, since we are using threads not processes, the reopen
3548          * call *doesn't* close the original file pointer but just open a new one.
3549          */
3550         if (te->section == SECTION_DATA)
3551                 (AH->ReopenPtr) (AH);
3552 #ifndef WIN32
3553         else
3554                 (AH->ClosePtr) (AH);
3555 #endif
3556
3557         /*
3558          * We need our own database connection, too
3559          */
3560         ConnectDatabase((Archive *) AH, ropt->dbname,
3561                                         ropt->pghost, ropt->pgport, ropt->username,
3562                                         ropt->promptPassword);
3563
3564         _doSetFixedOutputState(AH);
3565
3566         /* Restore the TOC item */
3567         retval = restore_toc_entry(AH, te, ropt, true);
3568
3569         /* And clean up */
3570         PQfinish(AH->connection);
3571         AH->connection = NULL;
3572
3573         /* If we reopened the file, we are done with it, so close it now */
3574         if (te->section == SECTION_DATA)
3575                 (AH->ClosePtr) (AH);
3576
3577         if (retval == 0 && AH->public.n_errors)
3578                 retval = WORKER_IGNORED_ERRORS;
3579
3580 #ifndef WIN32
3581         exit(retval);
3582 #else
3583         return retval;
3584 #endif
3585 }
3586
3587
3588 /*
3589  * Housekeeping to be done after a step has been parallel restored.
3590  *
3591  * Clear the appropriate slot, free all the extra memory we allocated,
3592  * update status, and reduce the dependency count of any dependent items.
3593  */
3594 static void
3595 mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
3596                            thandle worker, int status,
3597                            ParallelSlot *slots, int n_slots)
3598 {
3599         TocEntry   *te = NULL;
3600         int                     i;
3601
3602         for (i = 0; i < n_slots; i++)
3603         {
3604                 if (slots[i].child_id == worker)
3605                 {
3606                         slots[i].child_id = 0;
3607                         te = slots[i].args->te;
3608                         DeCloneArchive(slots[i].args->AH);
3609                         free(slots[i].args);
3610                         slots[i].args = NULL;
3611
3612                         break;
3613                 }
3614         }
3615
3616         if (te == NULL)
3617                 die_horribly(AH, modulename, "could not find slot of finished worker\n");
3618
3619         ahlog(AH, 1, "finished item %d %s %s\n",
3620                   te->dumpId, te->desc, te->tag);
3621
3622         if (status == WORKER_CREATE_DONE)
3623                 mark_create_done(AH, te);
3624         else if (status == WORKER_INHIBIT_DATA)
3625         {
3626                 inhibit_data_for_failed_table(AH, te);
3627                 AH->public.n_errors++;
3628         }
3629         else if (status == WORKER_IGNORED_ERRORS)
3630                 AH->public.n_errors++;
3631         else if (status != 0)
3632                 die_horribly(AH, modulename, "worker process failed: exit code %d\n",
3633                                          status);
3634
3635         reduce_dependencies(AH, te, ready_list);
3636 }
3637
3638
3639 /*
3640  * Process the dependency information into a form useful for parallel restore.
3641  *
3642  * We set up depCount fields that are the number of as-yet-unprocessed
3643  * dependencies for each TOC entry.
3644  *
3645  * We also identify locking dependencies so that we can avoid trying to
3646  * schedule conflicting items at the same time.
3647  */
3648 static void
3649 fix_dependencies(ArchiveHandle *AH)
3650 {
3651         TocEntry  **tocsByDumpId;
3652         TocEntry   *te;
3653         int                     i;
3654
3655         /*
3656          * For some of the steps here, it is convenient to have an array that
3657          * indexes the TOC entries by dump ID, rather than searching the TOC list
3658          * repeatedly.  Entries for dump IDs not present in the TOC will be NULL.
3659          *
3660          * Also, initialize the depCount fields, and make sure all the TOC items
3661          * are marked as not being in any parallel-processing list.
3662          */
3663         tocsByDumpId = (TocEntry **) calloc(AH->maxDumpId, sizeof(TocEntry *));
3664         for (te = AH->toc->next; te != AH->toc; te = te->next)
3665         {
3666                 tocsByDumpId[te->dumpId - 1] = te;
3667                 te->depCount = te->nDeps;
3668                 te->par_prev = NULL;
3669                 te->par_next = NULL;
3670         }
3671
3672         /*
3673          * POST_DATA items that are shown as depending on a table need to be
3674          * re-pointed to depend on that table's data, instead.  This ensures they
3675          * won't get scheduled until the data has been loaded.  We handle this by
3676          * first finding TABLE/TABLE DATA pairs and then scanning all the
3677          * dependencies.
3678          *
3679          * Note: currently, a TABLE DATA should always have exactly one
3680          * dependency, on its TABLE item.  So we don't bother to search, but look
3681          * just at the first dependency.  We do trouble to make sure that it's a
3682          * TABLE, if possible.  However, if the dependency isn't in the archive
3683          * then just assume it was a TABLE; this is to cover cases where the table
3684          * was suppressed but we have the data and some dependent post-data items.
3685          */
3686         for (te = AH->toc->next; te != AH->toc; te = te->next)
3687         {
3688                 if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
3689                 {
3690                         DumpId          tableId = te->dependencies[0];
3691
3692                         if (tocsByDumpId[tableId - 1] == NULL ||
3693                                 strcmp(tocsByDumpId[tableId - 1]->desc, "TABLE") == 0)
3694                         {
3695                                 repoint_table_dependencies(AH, tableId, te->dumpId);
3696                         }
3697                 }
3698         }
3699
3700         /*
3701          * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
3702          * COMMENTS to BLOBS.  Cope.  (We assume there's only one BLOBS and only
3703          * one BLOB COMMENTS in such files.)
3704          */
3705         if (AH->version < K_VERS_1_11)
3706         {
3707                 for (te = AH->toc->next; te != AH->toc; te = te->next)
3708                 {
3709                         if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
3710                         {
3711                                 TocEntry   *te2;
3712
3713                                 for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
3714                                 {
3715                                         if (strcmp(te2->desc, "BLOBS") == 0)
3716                                         {
3717                                                 te->dependencies = (DumpId *) malloc(sizeof(DumpId));
3718                                                 te->dependencies[0] = te2->dumpId;
3719                                                 te->nDeps++;
3720                                                 te->depCount++;
3721                                                 break;
3722                                         }
3723                                 }
3724                                 break;
3725                         }
3726                 }
3727         }
3728
3729         /*
3730          * It is possible that the dependencies list items that are not in the
3731          * archive at all.      Subtract such items from the depCounts.
3732          */
3733         for (te = AH->toc->next; te != AH->toc; te = te->next)
3734         {
3735                 for (i = 0; i < te->nDeps; i++)
3736                 {
3737                         if (tocsByDumpId[te->dependencies[i] - 1] == NULL)
3738                                 te->depCount--;
3739                 }
3740         }
3741
3742         /*
3743          * Lastly, work out the locking dependencies.
3744          */
3745         for (te = AH->toc->next; te != AH->toc; te = te->next)
3746         {
3747                 te->lockDeps = NULL;
3748                 te->nLockDeps = 0;
3749                 identify_locking_dependencies(te, tocsByDumpId);
3750         }
3751
3752         free(tocsByDumpId);
3753 }
3754
3755 /*
3756  * Change dependencies on tableId to depend on tableDataId instead,
3757  * but only in POST_DATA items.
3758  */
3759 static void
3760 repoint_table_dependencies(ArchiveHandle *AH,
3761                                                    DumpId tableId, DumpId tableDataId)
3762 {
3763         TocEntry   *te;
3764         int                     i;
3765
3766         for (te = AH->toc->next; te != AH->toc; te = te->next)
3767         {
3768                 if (te->section != SECTION_POST_DATA)
3769                         continue;
3770                 for (i = 0; i < te->nDeps; i++)
3771                 {
3772                         if (te->dependencies[i] == tableId)
3773                         {
3774                                 te->dependencies[i] = tableDataId;
3775                                 ahlog(AH, 2, "transferring dependency %d -> %d to %d\n",
3776                                           te->dumpId, tableId, tableDataId);
3777                         }
3778                 }
3779         }
3780 }
3781
3782 /*
3783  * Identify which objects we'll need exclusive lock on in order to restore
3784  * the given TOC entry (*other* than the one identified by the TOC entry
3785  * itself).  Record their dump IDs in the entry's lockDeps[] array.
3786  * tocsByDumpId[] is a convenience array to avoid searching the TOC
3787  * for each dependency.
3788  */
3789 static void
3790 identify_locking_dependencies(TocEntry *te, TocEntry **tocsByDumpId)
3791 {
3792         DumpId     *lockids;
3793         int                     nlockids;
3794         int                     i;
3795
3796         /* Quick exit if no dependencies at all */
3797         if (te->nDeps == 0)
3798                 return;
3799
3800         /* Exit if this entry doesn't need exclusive lock on other objects */
3801         if (!(strcmp(te->desc, "CONSTRAINT") == 0 ||
3802                   strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
3803                   strcmp(te->desc, "FK CONSTRAINT") == 0 ||
3804                   strcmp(te->desc, "RULE") == 0 ||
3805                   strcmp(te->desc, "TRIGGER") == 0))
3806                 return;
3807
3808         /*
3809          * We assume the item requires exclusive lock on each TABLE DATA item
3810          * listed among its dependencies.  (This was originally a dependency on
3811          * the TABLE, but fix_dependencies repointed it to the data item. Note
3812          * that all the entry types we are interested in here are POST_DATA, so
3813          * they will all have been changed this way.)
3814          */
3815         lockids = (DumpId *) malloc(te->nDeps * sizeof(DumpId));
3816         nlockids = 0;
3817         for (i = 0; i < te->nDeps; i++)
3818         {
3819                 DumpId          depid = te->dependencies[i];
3820
3821                 if (tocsByDumpId[depid - 1] &&
3822                         strcmp(tocsByDumpId[depid - 1]->desc, "TABLE DATA") == 0)
3823                         lockids[nlockids++] = depid;
3824         }
3825
3826         if (nlockids == 0)
3827         {
3828                 free(lockids);
3829                 return;
3830         }
3831
3832         te->lockDeps = realloc(lockids, nlockids * sizeof(DumpId));
3833         te->nLockDeps = nlockids;
3834 }
3835
3836 /*
3837  * Remove the specified TOC entry from the depCounts of items that depend on
3838  * it, thereby possibly making them ready-to-run.  Any pending item that
3839  * becomes ready should be moved to the ready list.
3840  */
3841 static void
3842 reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
3843 {
3844         DumpId          target = te->dumpId;
3845         int                     i;
3846
3847         ahlog(AH, 2, "reducing dependencies for %d\n", target);
3848
3849         /*
3850          * We must examine all entries, not only the ones after the target item,
3851          * because if the user used a -L switch then the original dependency-
3852          * respecting order has been destroyed by SortTocFromFile.
3853          */
3854         for (te = AH->toc->next; te != AH->toc; te = te->next)
3855         {
3856                 for (i = 0; i < te->nDeps; i++)
3857                 {
3858                         if (te->dependencies[i] == target)
3859                         {
3860                                 te->depCount--;
3861                                 if (te->depCount == 0 && te->par_prev != NULL)
3862                                 {
3863                                         /* It must be in the pending list, so remove it ... */
3864                                         par_list_remove(te);
3865                                         /* ... and add to ready_list */
3866                                         par_list_append(ready_list, te);
3867                                 }
3868                         }
3869                 }
3870         }
3871 }
3872
3873 /*
3874  * Set the created flag on the DATA member corresponding to the given
3875  * TABLE member
3876  */
3877 static void
3878 mark_create_done(ArchiveHandle *AH, TocEntry *te)
3879 {
3880         TocEntry   *tes;
3881
3882         for (tes = AH->toc->next; tes != AH->toc; tes = tes->next)
3883         {
3884                 if (strcmp(tes->desc, "TABLE DATA") == 0 &&
3885                         strcmp(tes->tag, te->tag) == 0 &&
3886                         strcmp(tes->namespace ? tes->namespace : "",
3887                                    te->namespace ? te->namespace : "") == 0)
3888                 {
3889                         tes->created = true;
3890                         break;
3891                 }
3892         }
3893 }
3894
3895 /*
3896  * Mark the DATA member corresponding to the given TABLE member
3897  * as not wanted
3898  */
3899 static void
3900 inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
3901 {
3902         RestoreOptions *ropt = AH->ropt;
3903         TocEntry   *tes;
3904
3905         ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
3906                   te->tag);
3907
3908         for (tes = AH->toc->next; tes != AH->toc; tes = tes->next)
3909         {
3910                 if (strcmp(tes->desc, "TABLE DATA") == 0 &&
3911                         strcmp(tes->tag, te->tag) == 0 &&
3912                         strcmp(tes->namespace ? tes->namespace : "",
3913                                    te->namespace ? te->namespace : "") == 0)
3914                 {
3915                         /* mark it unwanted; we assume idWanted array already exists */
3916                         ropt->idWanted[tes->dumpId - 1] = false;
3917                         break;
3918                 }
3919         }
3920 }
3921
3922
3923 /*
3924  * Clone and de-clone routines used in parallel restoration.
3925  *
3926  * Enough of the structure is cloned to ensure that there is no
3927  * conflict between different threads each with their own clone.
3928  *
3929  * These could be public, but no need at present.
3930  */
3931 static ArchiveHandle *
3932 CloneArchive(ArchiveHandle *AH)
3933 {
3934         ArchiveHandle *clone;
3935
3936         /* Make a "flat" copy */
3937         clone = (ArchiveHandle *) malloc(sizeof(ArchiveHandle));
3938         if (clone == NULL)
3939                 die_horribly(AH, modulename, "out of memory\n");
3940         memcpy(clone, AH, sizeof(ArchiveHandle));
3941
3942         /* Handle format-independent fields */
3943         clone->pgCopyBuf = createPQExpBuffer();
3944         clone->sqlBuf = createPQExpBuffer();
3945         clone->sqlparse.tagBuf = NULL;
3946
3947         /* The clone will have its own connection, so disregard connection state */
3948         clone->connection = NULL;
3949         clone->currUser = NULL;
3950         clone->currSchema = NULL;
3951         clone->currTablespace = NULL;
3952         clone->currWithOids = -1;
3953
3954         /* savedPassword must be local in case we change it while connecting */
3955         if (clone->savedPassword)
3956                 clone->savedPassword = strdup(clone->savedPassword);
3957
3958         /* clone has its own error count, too */
3959         clone->public.n_errors = 0;
3960
3961         /* Let the format-specific code have a chance too */
3962         (clone->ClonePtr) (clone);
3963
3964         return clone;
3965 }
3966
3967 /*
3968  * Release clone-local storage.
3969  *
3970  * Note: we assume any clone-local connection was already closed.
3971  */
3972 static void
3973 DeCloneArchive(ArchiveHandle *AH)
3974 {
3975         /* Clear format-specific state */
3976         (AH->DeClonePtr) (AH);
3977
3978         /* Clear state allocated by CloneArchive */
3979         destroyPQExpBuffer(AH->pgCopyBuf);
3980         destroyPQExpBuffer(AH->sqlBuf);
3981         if (AH->sqlparse.tagBuf)
3982                 destroyPQExpBuffer(AH->sqlparse.tagBuf);
3983
3984         /* Clear any connection-local state */
3985         if (AH->currUser)
3986                 free(AH->currUser);
3987         if (AH->currSchema)
3988                 free(AH->currSchema);
3989         if (AH->currTablespace)
3990                 free(AH->currTablespace);
3991         if (AH->savedPassword)
3992                 free(AH->savedPassword);
3993
3994         free(AH);
3995 }