]> granicus.if.org Git - postgresql/blob - src/bin/pg_dump/pg_backup_archiver.c
pgindent run for 9.0
[postgresql] / src / bin / pg_dump / pg_backup_archiver.c
1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_archiver.c
4  *
5  *      Private implementation of the archiver routines.
6  *
7  *      See the headers to pg_restore for more details.
8  *
9  * Copyright (c) 2000, Philip Warner
10  *      Rights are granted to use this software in any way so long
11  *      as this notice is not removed.
12  *
13  *      The author is not responsible for loss or damages that may
14  *      result from its use.
15  *
16  *
17  * IDENTIFICATION
18  *              $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.182 2010/02/26 02:01:16 momjian Exp $
19  *
20  *-------------------------------------------------------------------------
21  */
22
23 #include "pg_backup_db.h"
24 #include "dumputils.h"
25
26 #include <ctype.h>
27 #include <unistd.h>
28 #include <sys/types.h>
29 #include <sys/wait.h>
30
31 #ifdef WIN32
32 #include <io.h>
33 #endif
34
35 #include "libpq/libpq-fs.h"
36
37 /*
38  * Special exit values from worker children.  We reserve 0 for normal
39  * success; 1 and other small values should be interpreted as crashes.
40  */
41 #define WORKER_CREATE_DONE              10
42 #define WORKER_INHIBIT_DATA             11
43 #define WORKER_IGNORED_ERRORS   12
44
45 /*
46  * Unix uses exit to return result from worker child, so function is void.
47  * Windows thread result comes via function return.
48  */
49 #ifndef WIN32
50 #define parallel_restore_result void
51 #else
52 #define parallel_restore_result DWORD
53 #endif
54
55 /* IDs for worker children are either PIDs or thread handles */
56 #ifndef WIN32
57 #define thandle pid_t
58 #else
59 #define thandle HANDLE
60 #endif
61
62 /* Arguments needed for a worker child */
63 typedef struct _restore_args
64 {
65         ArchiveHandle *AH;
66         TocEntry   *te;
67 } RestoreArgs;
68
69 /* State for each parallel activity slot */
70 typedef struct _parallel_slot
71 {
72         thandle         child_id;
73         RestoreArgs *args;
74 } ParallelSlot;
75
76 #define NO_SLOT (-1)
77
78 const char *progname;
79
80 static const char *modulename = gettext_noop("archiver");
81
82
83 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
84                  const int compression, ArchiveMode mode);
85 static void _getObjectDescription(PQExpBuffer buf, TocEntry *te,
86                                           ArchiveHandle *AH);
87 static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isData, bool acl_pass);
88
89
90 static void _doSetFixedOutputState(ArchiveHandle *AH);
91 static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
92 static void _doSetWithOids(ArchiveHandle *AH, const bool withOids);
93 static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
94 static void _becomeUser(ArchiveHandle *AH, const char *user);
95 static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
96 static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
97 static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
98 static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
99 static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
100 static teReqs _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls);
101 static bool _tocEntryIsACL(TocEntry *te);
102 static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
103 static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
104 static TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id);
105 static void _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
106 static int      _discoverArchiveFormat(ArchiveHandle *AH);
107
108 static void dump_lo_buf(ArchiveHandle *AH);
109 static void _write_msg(const char *modulename, const char *fmt, va_list ap);
110 static void _die_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt, va_list ap);
111
112 static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
113 static OutputContext SetOutput(ArchiveHandle *AH, char *filename, int compression);
114 static void ResetOutput(ArchiveHandle *AH, OutputContext savedContext);
115
116 static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
117                                   RestoreOptions *ropt, bool is_parallel);
118 static void restore_toc_entries_parallel(ArchiveHandle *AH);
119 static thandle spawn_restore(RestoreArgs *args);
120 static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status);
121 static bool work_in_progress(ParallelSlot *slots, int n_slots);
122 static int      get_next_slot(ParallelSlot *slots, int n_slots);
123 static void par_list_header_init(TocEntry *l);
124 static void par_list_append(TocEntry *l, TocEntry *te);
125 static void par_list_remove(TocEntry *te);
126 static TocEntry *get_next_work_item(ArchiveHandle *AH,
127                                    TocEntry *ready_list,
128                                    ParallelSlot *slots, int n_slots);
129 static parallel_restore_result parallel_restore(RestoreArgs *args);
130 static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
131                            thandle worker, int status,
132                            ParallelSlot *slots, int n_slots);
133 static void fix_dependencies(ArchiveHandle *AH);
134 static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
135 static void repoint_table_dependencies(ArchiveHandle *AH,
136                                                    DumpId tableId, DumpId tableDataId);
137 static void identify_locking_dependencies(TocEntry *te,
138                                                           TocEntry **tocsByDumpId,
139                                                           DumpId maxDumpId);
140 static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
141                                         TocEntry *ready_list);
142 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
143 static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
144 static ArchiveHandle *CloneArchive(ArchiveHandle *AH);
145 static void DeCloneArchive(ArchiveHandle *AH);
146
147
148 /*
149  *      Wrapper functions.
150  *
151  *      The objective it to make writing new formats and dumpers as simple
152  *      as possible, if necessary at the expense of extra function calls etc.
153  *
154  */
155
156
157 /* Create a new archive */
158 /* Public */
159 Archive *
160 CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
161                           const int compression, ArchiveMode mode)
162
163 {
164         ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, mode);
165
166         return (Archive *) AH;
167 }
168
169 /* Open an existing archive */
170 /* Public */
171 Archive *
172 OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
173 {
174         ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, archModeRead);
175
176         return (Archive *) AH;
177 }
178
179 /* Public */
180 void
181 CloseArchive(Archive *AHX)
182 {
183         int                     res = 0;
184         ArchiveHandle *AH = (ArchiveHandle *) AHX;
185
186         (*AH->ClosePtr) (AH);
187
188         /* Close the output */
189         if (AH->gzOut)
190                 res = GZCLOSE(AH->OF);
191         else if (AH->OF != stdout)
192                 res = fclose(AH->OF);
193
194         if (res != 0)
195                 die_horribly(AH, modulename, "could not close output file: %s\n",
196                                          strerror(errno));
197 }
198
199 /* Public */
200 void
201 RestoreArchive(Archive *AHX, RestoreOptions *ropt)
202 {
203         ArchiveHandle *AH = (ArchiveHandle *) AHX;
204         TocEntry   *te;
205         teReqs          reqs;
206         OutputContext sav;
207
208         AH->ropt = ropt;
209         AH->stage = STAGE_INITIALIZING;
210
211         /*
212          * Check for nonsensical option combinations.
213          *
214          * NB: create+dropSchema is useless because if you're creating the DB,
215          * there's no need to drop individual items in it.  Moreover, if we tried
216          * to do that then we'd issue the drops in the database initially
217          * connected to, not the one we will create, which is very bad...
218          */
219         if (ropt->create && ropt->dropSchema)
220                 die_horribly(AH, modulename, "-C and -c are incompatible options\n");
221
222         /*
223          * -1 is not compatible with -C, because we can't create a database inside
224          * a transaction block.
225          */
226         if (ropt->create && ropt->single_txn)
227                 die_horribly(AH, modulename, "-C and -1 are incompatible options\n");
228
229         /*
230          * Make sure we won't need (de)compression we haven't got
231          */
232 #ifndef HAVE_LIBZ
233         if (AH->compression != 0 && AH->PrintTocDataPtr !=NULL)
234         {
235                 for (te = AH->toc->next; te != AH->toc; te = te->next)
236                 {
237                         reqs = _tocEntryRequired(te, ropt, false);
238                         if (te->hadDumper && (reqs & REQ_DATA) != 0)
239                                 die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
240                 }
241         }
242 #endif
243
244         /*
245          * If we're using a DB connection, then connect it.
246          */
247         if (ropt->useDB)
248         {
249                 ahlog(AH, 1, "connecting to database for restore\n");
250                 if (AH->version < K_VERS_1_3)
251                         die_horribly(AH, modulename, "direct database connections are not supported in pre-1.3 archives\n");
252
253                 /* XXX Should get this from the archive */
254                 AHX->minRemoteVersion = 070100;
255                 AHX->maxRemoteVersion = 999999;
256
257                 ConnectDatabase(AHX, ropt->dbname,
258                                                 ropt->pghost, ropt->pgport, ropt->username,
259                                                 ropt->promptPassword);
260
261                 /*
262                  * If we're talking to the DB directly, don't send comments since they
263                  * obscure SQL when displaying errors
264                  */
265                 AH->noTocComments = 1;
266         }
267
268         /*
269          * Work out if we have an implied data-only restore. This can happen if
270          * the dump was data only or if the user has used a toc list to exclude
271          * all of the schema data. All we do is look for schema entries - if none
272          * are found then we set the dataOnly flag.
273          *
274          * We could scan for wanted TABLE entries, but that is not the same as
275          * dataOnly. At this stage, it seems unnecessary (6-Mar-2001).
276          */
277         if (!ropt->dataOnly)
278         {
279                 int                     impliedDataOnly = 1;
280
281                 for (te = AH->toc->next; te != AH->toc; te = te->next)
282                 {
283                         reqs = _tocEntryRequired(te, ropt, true);
284                         if ((reqs & REQ_SCHEMA) != 0)
285                         {                                       /* It's schema, and it's wanted */
286                                 impliedDataOnly = 0;
287                                 break;
288                         }
289                 }
290                 if (impliedDataOnly)
291                 {
292                         ropt->dataOnly = impliedDataOnly;
293                         ahlog(AH, 1, "implied data-only restore\n");
294                 }
295         }
296
297         /*
298          * Setup the output file if necessary.
299          */
300         if (ropt->filename || ropt->compression)
301                 sav = SetOutput(AH, ropt->filename, ropt->compression);
302
303         ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
304
305         if (AH->public.verbose)
306         {
307                 if (AH->archiveRemoteVersion)
308                         ahprintf(AH, "-- Dumped from database version %s\n",
309                                          AH->archiveRemoteVersion);
310                 if (AH->archiveDumpVersion)
311                         ahprintf(AH, "-- Dumped by pg_dump version %s\n",
312                                          AH->archiveDumpVersion);
313                 dumpTimestamp(AH, "Started on", AH->createDate);
314         }
315
316         if (ropt->single_txn)
317         {
318                 if (AH->connection)
319                         StartTransaction(AH);
320                 else
321                         ahprintf(AH, "BEGIN;\n\n");
322         }
323
324         /*
325          * Establish important parameter values right away.
326          */
327         _doSetFixedOutputState(AH);
328
329         AH->stage = STAGE_PROCESSING;
330
331         /*
332          * Drop the items at the start, in reverse order
333          */
334         if (ropt->dropSchema)
335         {
336                 for (te = AH->toc->prev; te != AH->toc; te = te->prev)
337                 {
338                         AH->currentTE = te;
339
340                         reqs = _tocEntryRequired(te, ropt, false /* needn't drop ACLs */ );
341                         /* We want anything that's selected and has a dropStmt */
342                         if (((reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
343                         {
344                                 ahlog(AH, 1, "dropping %s %s\n", te->desc, te->tag);
345                                 /* Select owner and schema as necessary */
346                                 _becomeOwner(AH, te);
347                                 _selectOutputSchema(AH, te->namespace);
348                                 /* Drop it */
349                                 ahprintf(AH, "%s", te->dropStmt);
350                         }
351                 }
352
353                 /*
354                  * _selectOutputSchema may have set currSchema to reflect the effect
355                  * of a "SET search_path" command it emitted.  However, by now we may
356                  * have dropped that schema; or it might not have existed in the first
357                  * place.  In either case the effective value of search_path will not
358                  * be what we think.  Forcibly reset currSchema so that we will
359                  * re-establish the search_path setting when needed (after creating
360                  * the schema).
361                  *
362                  * If we treated users as pg_dump'able objects then we'd need to reset
363                  * currUser here too.
364                  */
365                 if (AH->currSchema)
366                         free(AH->currSchema);
367                 AH->currSchema = NULL;
368         }
369
370         /*
371          * In serial mode, we now process each non-ACL TOC entry.
372          *
373          * In parallel mode, turn control over to the parallel-restore logic.
374          */
375         if (ropt->number_of_jobs > 1 && ropt->useDB)
376                 restore_toc_entries_parallel(AH);
377         else
378         {
379                 for (te = AH->toc->next; te != AH->toc; te = te->next)
380                         (void) restore_toc_entry(AH, te, ropt, false);
381         }
382
383         /*
384          * Scan TOC again to output ownership commands and ACLs
385          */
386         for (te = AH->toc->next; te != AH->toc; te = te->next)
387         {
388                 AH->currentTE = te;
389
390                 /* Work out what, if anything, we want from this entry */
391                 reqs = _tocEntryRequired(te, ropt, true);
392
393                 /* Both schema and data objects might now have ownership/ACLs */
394                 if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0)
395                 {
396                         ahlog(AH, 1, "setting owner and privileges for %s %s\n",
397                                   te->desc, te->tag);
398                         _printTocEntry(AH, te, ropt, false, true);
399                 }
400         }
401
402         if (ropt->single_txn)
403         {
404                 if (AH->connection)
405                         CommitTransaction(AH);
406                 else
407                         ahprintf(AH, "COMMIT;\n\n");
408         }
409
410         if (AH->public.verbose)
411                 dumpTimestamp(AH, "Completed on", time(NULL));
412
413         ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
414
415         /*
416          * Clean up & we're done.
417          */
418         AH->stage = STAGE_FINALIZING;
419
420         if (ropt->filename || ropt->compression)
421                 ResetOutput(AH, sav);
422
423         if (ropt->useDB)
424         {
425                 PQfinish(AH->connection);
426                 AH->connection = NULL;
427         }
428 }
429
430 /*
431  * Restore a single TOC item.  Used in both parallel and non-parallel restore;
432  * is_parallel is true if we are in a worker child process.
433  *
434  * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
435  * the parallel parent has to make the corresponding status update.
436  */
437 static int
438 restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
439                                   RestoreOptions *ropt, bool is_parallel)
440 {
441         int                     retval = 0;
442         teReqs          reqs;
443         bool            defnDumped;
444
445         AH->currentTE = te;
446
447         /* Work out what, if anything, we want from this entry */
448         reqs = _tocEntryRequired(te, ropt, false);
449
450         /* Dump any relevant dump warnings to stderr */
451         if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
452         {
453                 if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
454                         write_msg(modulename, "warning from original dump file: %s\n", te->defn);
455                 else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
456                         write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
457         }
458
459         defnDumped = false;
460
461         if ((reqs & REQ_SCHEMA) != 0)           /* We want the schema */
462         {
463                 ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);
464
465                 _printTocEntry(AH, te, ropt, false, false);
466                 defnDumped = true;
467
468                 if (strcmp(te->desc, "TABLE") == 0)
469                 {
470                         if (AH->lastErrorTE == te)
471                         {
472                                 /*
473                                  * We failed to create the table. If
474                                  * --no-data-for-failed-tables was given, mark the
475                                  * corresponding TABLE DATA to be ignored.
476                                  *
477                                  * In the parallel case this must be done in the parent, so we
478                                  * just set the return value.
479                                  */
480                                 if (ropt->noDataForFailedTables)
481                                 {
482                                         if (is_parallel)
483                                                 retval = WORKER_INHIBIT_DATA;
484                                         else
485                                                 inhibit_data_for_failed_table(AH, te);
486                                 }
487                         }
488                         else
489                         {
490                                 /*
491                                  * We created the table successfully.  Mark the corresponding
492                                  * TABLE DATA for possible truncation.
493                                  *
494                                  * In the parallel case this must be done in the parent, so we
495                                  * just set the return value.
496                                  */
497                                 if (is_parallel)
498                                         retval = WORKER_CREATE_DONE;
499                                 else
500                                         mark_create_done(AH, te);
501                         }
502                 }
503
504                 /* If we created a DB, connect to it... */
505                 if (strcmp(te->desc, "DATABASE") == 0)
506                 {
507                         ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
508                         _reconnectToDB(AH, te->tag);
509                         ropt->dbname = strdup(te->tag);
510                 }
511         }
512
513         /*
514          * If we have a data component, then process it
515          */
516         if ((reqs & REQ_DATA) != 0)
517         {
518                 /*
519                  * hadDumper will be set if there is genuine data component for this
520                  * node. Otherwise, we need to check the defn field for statements
521                  * that need to be executed in data-only restores.
522                  */
523                 if (te->hadDumper)
524                 {
525                         /*
526                          * If we can output the data, then restore it.
527                          */
528                         if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0)
529                         {
530                                 _printTocEntry(AH, te, ropt, true, false);
531
532                                 if (strcmp(te->desc, "BLOBS") == 0 ||
533                                         strcmp(te->desc, "BLOB COMMENTS") == 0)
534                                 {
535                                         ahlog(AH, 1, "restoring %s\n", te->desc);
536
537                                         _selectOutputSchema(AH, "pg_catalog");
538
539                                         (*AH->PrintTocDataPtr) (AH, te, ropt);
540                                 }
541                                 else
542                                 {
543                                         _disableTriggersIfNecessary(AH, te, ropt);
544
545                                         /* Select owner and schema as necessary */
546                                         _becomeOwner(AH, te);
547                                         _selectOutputSchema(AH, te->namespace);
548
549                                         ahlog(AH, 1, "restoring data for table \"%s\"\n",
550                                                   te->tag);
551
552                                         /*
553                                          * In parallel restore, if we created the table earlier in
554                                          * the run then we wrap the COPY in a transaction and
555                                          * precede it with a TRUNCATE.  If archiving is not on
556                                          * this prevents WAL-logging the COPY.  This obtains a
557                                          * speedup similar to that from using single_txn mode in
558                                          * non-parallel restores.
559                                          */
560                                         if (is_parallel && te->created)
561                                         {
562                                                 /*
563                                                  * Parallel restore is always talking directly to a
564                                                  * server, so no need to see if we should issue BEGIN.
565                                                  */
566                                                 StartTransaction(AH);
567
568                                                 /*
569                                                  * If the server version is >= 8.4, make sure we issue
570                                                  * TRUNCATE with ONLY so that child tables are not
571                                                  * wiped.
572                                                  */
573                                                 ahprintf(AH, "TRUNCATE TABLE %s%s;\n\n",
574                                                                  (PQserverVersion(AH->connection) >= 80400 ?
575                                                                   "ONLY " : ""),
576                                                                  fmtId(te->tag));
577                                         }
578
579                                         /*
580                                          * If we have a copy statement, use it. As of V1.3, these
581                                          * are separate to allow easy import from withing a
582                                          * database connection. Pre 1.3 archives can not use DB
583                                          * connections and are sent to output only.
584                                          *
585                                          * For V1.3+, the table data MUST have a copy statement so
586                                          * that we can go into appropriate mode with libpq.
587                                          */
588                                         if (te->copyStmt && strlen(te->copyStmt) > 0)
589                                         {
590                                                 ahprintf(AH, "%s", te->copyStmt);
591                                                 AH->writingCopyData = true;
592                                         }
593
594                                         (*AH->PrintTocDataPtr) (AH, te, ropt);
595
596                                         AH->writingCopyData = false;
597
598                                         /* close out the transaction started above */
599                                         if (is_parallel && te->created)
600                                                 CommitTransaction(AH);
601
602                                         _enableTriggersIfNecessary(AH, te, ropt);
603                                 }
604                         }
605                 }
606                 else if (!defnDumped)
607                 {
608                         /* If we haven't already dumped the defn part, do so now */
609                         ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
610                         _printTocEntry(AH, te, ropt, false, false);
611                 }
612         }
613
614         return retval;
615 }
616
617 /*
618  * Allocate a new RestoreOptions block.
619  * This is mainly so we can initialize it, but also for future expansion,
620  */
621 RestoreOptions *
622 NewRestoreOptions(void)
623 {
624         RestoreOptions *opts;
625
626         opts = (RestoreOptions *) calloc(1, sizeof(RestoreOptions));
627
628         /* set any fields that shouldn't default to zeroes */
629         opts->format = archUnknown;
630         opts->promptPassword = TRI_DEFAULT;
631
632         return opts;
633 }
634
635 static void
636 _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
637 {
638         /* This hack is only needed in a data-only restore */
639         if (!ropt->dataOnly || !ropt->disable_triggers)
640                 return;
641
642         ahlog(AH, 1, "disabling triggers for %s\n", te->tag);
643
644         /*
645          * Become superuser if possible, since they are the only ones who can
646          * disable constraint triggers.  If -S was not given, assume the initial
647          * user identity is a superuser.  (XXX would it be better to become the
648          * table owner?)
649          */
650         _becomeUser(AH, ropt->superuser);
651
652         /*
653          * Disable them.
654          */
655         _selectOutputSchema(AH, te->namespace);
656
657         ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
658                          fmtId(te->tag));
659 }
660
661 static void
662 _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
663 {
664         /* This hack is only needed in a data-only restore */
665         if (!ropt->dataOnly || !ropt->disable_triggers)
666                 return;
667
668         ahlog(AH, 1, "enabling triggers for %s\n", te->tag);
669
670         /*
671          * Become superuser if possible, since they are the only ones who can
672          * disable constraint triggers.  If -S was not given, assume the initial
673          * user identity is a superuser.  (XXX would it be better to become the
674          * table owner?)
675          */
676         _becomeUser(AH, ropt->superuser);
677
678         /*
679          * Enable them.
680          */
681         _selectOutputSchema(AH, te->namespace);
682
683         ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
684                          fmtId(te->tag));
685 }
686
687 /*
688  * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
689  */
690
691 /* Public */
692 size_t
693 WriteData(Archive *AHX, const void *data, size_t dLen)
694 {
695         ArchiveHandle *AH = (ArchiveHandle *) AHX;
696
697         if (!AH->currToc)
698                 die_horribly(AH, modulename, "internal error -- WriteData cannot be called outside the context of a DataDumper routine\n");
699
700         return (*AH->WriteDataPtr) (AH, data, dLen);
701 }
702
703 /*
704  * Create a new TOC entry. The TOC was designed as a TOC, but is now the
705  * repository for all metadata. But the name has stuck.
706  */
707
708 /* Public */
709 void
710 ArchiveEntry(Archive *AHX,
711                          CatalogId catalogId, DumpId dumpId,
712                          const char *tag,
713                          const char *namespace,
714                          const char *tablespace,
715                          const char *owner, bool withOids,
716                          const char *desc, teSection section,
717                          const char *defn,
718                          const char *dropStmt, const char *copyStmt,
719                          const DumpId *deps, int nDeps,
720                          DataDumperPtr dumpFn, void *dumpArg)
721 {
722         ArchiveHandle *AH = (ArchiveHandle *) AHX;
723         TocEntry   *newToc;
724
725         newToc = (TocEntry *) calloc(1, sizeof(TocEntry));
726         if (!newToc)
727                 die_horribly(AH, modulename, "out of memory\n");
728
729         AH->tocCount++;
730         if (dumpId > AH->maxDumpId)
731                 AH->maxDumpId = dumpId;
732
733         newToc->prev = AH->toc->prev;
734         newToc->next = AH->toc;
735         AH->toc->prev->next = newToc;
736         AH->toc->prev = newToc;
737
738         newToc->catalogId = catalogId;
739         newToc->dumpId = dumpId;
740         newToc->section = section;
741
742         newToc->tag = strdup(tag);
743         newToc->namespace = namespace ? strdup(namespace) : NULL;
744         newToc->tablespace = tablespace ? strdup(tablespace) : NULL;
745         newToc->owner = strdup(owner);
746         newToc->withOids = withOids;
747         newToc->desc = strdup(desc);
748         newToc->defn = strdup(defn);
749         newToc->dropStmt = strdup(dropStmt);
750         newToc->copyStmt = copyStmt ? strdup(copyStmt) : NULL;
751
752         if (nDeps > 0)
753         {
754                 newToc->dependencies = (DumpId *) malloc(nDeps * sizeof(DumpId));
755                 memcpy(newToc->dependencies, deps, nDeps * sizeof(DumpId));
756                 newToc->nDeps = nDeps;
757         }
758         else
759         {
760                 newToc->dependencies = NULL;
761                 newToc->nDeps = 0;
762         }
763
764         newToc->dataDumper = dumpFn;
765         newToc->dataDumperArg = dumpArg;
766         newToc->hadDumper = dumpFn ? true : false;
767
768         newToc->formatData = NULL;
769
770         if (AH->ArchiveEntryPtr !=NULL)
771                 (*AH->ArchiveEntryPtr) (AH, newToc);
772 }
773
774 /* Public */
775 void
776 PrintTOCSummary(Archive *AHX, RestoreOptions *ropt)
777 {
778         ArchiveHandle *AH = (ArchiveHandle *) AHX;
779         TocEntry   *te;
780         OutputContext sav;
781         char       *fmtName;
782
783         if (ropt->filename)
784                 sav = SetOutput(AH, ropt->filename, 0 /* no compression */ );
785
786         ahprintf(AH, ";\n; Archive created at %s", ctime(&AH->createDate));
787         ahprintf(AH, ";     dbname: %s\n;     TOC Entries: %d\n;     Compression: %d\n",
788                          AH->archdbname, AH->tocCount, AH->compression);
789
790         switch (AH->format)
791         {
792                 case archFiles:
793                         fmtName = "FILES";
794                         break;
795                 case archCustom:
796                         fmtName = "CUSTOM";
797                         break;
798                 case archTar:
799                         fmtName = "TAR";
800                         break;
801                 default:
802                         fmtName = "UNKNOWN";
803         }
804
805         ahprintf(AH, ";     Dump Version: %d.%d-%d\n", AH->vmaj, AH->vmin, AH->vrev);
806         ahprintf(AH, ";     Format: %s\n", fmtName);
807         ahprintf(AH, ";     Integer: %d bytes\n", (int) AH->intSize);
808         ahprintf(AH, ";     Offset: %d bytes\n", (int) AH->offSize);
809         if (AH->archiveRemoteVersion)
810                 ahprintf(AH, ";     Dumped from database version: %s\n",
811                                  AH->archiveRemoteVersion);
812         if (AH->archiveDumpVersion)
813                 ahprintf(AH, ";     Dumped by pg_dump version: %s\n",
814                                  AH->archiveDumpVersion);
815
816         ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
817
818         for (te = AH->toc->next; te != AH->toc; te = te->next)
819         {
820                 if (ropt->verbose || _tocEntryRequired(te, ropt, true) != 0)
821                         ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
822                                          te->catalogId.tableoid, te->catalogId.oid,
823                                          te->desc, te->namespace ? te->namespace : "-",
824                                          te->tag, te->owner);
825                 if (ropt->verbose && te->nDeps > 0)
826                 {
827                         int                     i;
828
829                         ahprintf(AH, ";\tdepends on:");
830                         for (i = 0; i < te->nDeps; i++)
831                                 ahprintf(AH, " %d", te->dependencies[i]);
832                         ahprintf(AH, "\n");
833                 }
834         }
835
836         if (ropt->filename)
837                 ResetOutput(AH, sav);
838 }
839
840 /***********
841  * BLOB Archival
842  ***********/
843
844 /* Called by a dumper to signal start of a BLOB */
845 int
846 StartBlob(Archive *AHX, Oid oid)
847 {
848         ArchiveHandle *AH = (ArchiveHandle *) AHX;
849
850         if (!AH->StartBlobPtr)
851                 die_horribly(AH, modulename, "large-object output not supported in chosen format\n");
852
853         (*AH->StartBlobPtr) (AH, AH->currToc, oid);
854
855         return 1;
856 }
857
858 /* Called by a dumper to signal end of a BLOB */
859 int
860 EndBlob(Archive *AHX, Oid oid)
861 {
862         ArchiveHandle *AH = (ArchiveHandle *) AHX;
863
864         if (AH->EndBlobPtr)
865                 (*AH->EndBlobPtr) (AH, AH->currToc, oid);
866
867         return 1;
868 }
869
870 /**********
871  * BLOB Restoration
872  **********/
873
874 /*
875  * Called by a format handler before any blobs are restored
876  */
877 void
878 StartRestoreBlobs(ArchiveHandle *AH)
879 {
880         if (!AH->ropt->single_txn)
881         {
882                 if (AH->connection)
883                         StartTransaction(AH);
884                 else
885                         ahprintf(AH, "BEGIN;\n\n");
886         }
887
888         AH->blobCount = 0;
889 }
890
891 /*
892  * Called by a format handler after all blobs are restored
893  */
894 void
895 EndRestoreBlobs(ArchiveHandle *AH)
896 {
897         if (!AH->ropt->single_txn)
898         {
899                 if (AH->connection)
900                         CommitTransaction(AH);
901                 else
902                         ahprintf(AH, "COMMIT;\n\n");
903         }
904
905         ahlog(AH, 1, ngettext("restored %d large object\n",
906                                                   "restored %d large objects\n",
907                                                   AH->blobCount),
908                   AH->blobCount);
909 }
910
911
912 /*
913  * Called by a format handler to initiate restoration of a blob
914  */
915 void
916 StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
917 {
918         bool            old_blob_style = (AH->version < K_VERS_1_12);
919         Oid                     loOid;
920
921         AH->blobCount++;
922
923         /* Initialize the LO Buffer */
924         AH->lo_buf_used = 0;
925
926         ahlog(AH, 2, "restoring large object with OID %u\n", oid);
927
928         /* With an old archive we must do drop and create logic here */
929         if (old_blob_style && drop)
930                 DropBlobIfExists(AH, oid);
931
932         if (AH->connection)
933         {
934                 if (old_blob_style)
935                 {
936                         loOid = lo_create(AH->connection, oid);
937                         if (loOid == 0 || loOid != oid)
938                                 die_horribly(AH, modulename, "could not create large object %u\n",
939                                                          oid);
940                 }
941                 AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
942                 if (AH->loFd == -1)
943                         die_horribly(AH, modulename, "could not open large object %u\n",
944                                                  oid);
945         }
946         else
947         {
948                 if (old_blob_style)
949                         ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
950                                          oid, INV_WRITE);
951                 else
952                         ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
953                                          oid, INV_WRITE);
954         }
955
956         AH->writingBlob = 1;
957 }
958
959 void
960 EndRestoreBlob(ArchiveHandle *AH, Oid oid)
961 {
962         if (AH->lo_buf_used > 0)
963         {
964                 /* Write remaining bytes from the LO buffer */
965                 dump_lo_buf(AH);
966         }
967
968         AH->writingBlob = 0;
969
970         if (AH->connection)
971         {
972                 lo_close(AH->connection, AH->loFd);
973                 AH->loFd = -1;
974         }
975         else
976         {
977                 ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
978         }
979 }
980
981 /***********
982  * Sorting and Reordering
983  ***********/
984
985 void
986 SortTocFromFile(Archive *AHX, RestoreOptions *ropt)
987 {
988         ArchiveHandle *AH = (ArchiveHandle *) AHX;
989         FILE       *fh;
990         char            buf[1024];
991         char       *cmnt;
992         char       *endptr;
993         DumpId          id;
994         TocEntry   *te;
995         TocEntry   *tePrev;
996
997         /* Allocate space for the 'wanted' array, and init it */
998         ropt->idWanted = (bool *) malloc(sizeof(bool) * AH->maxDumpId);
999         memset(ropt->idWanted, 0, sizeof(bool) * AH->maxDumpId);
1000
1001         /* Set prev entry as head of list */
1002         tePrev = AH->toc;
1003
1004         /* Setup the file */
1005         fh = fopen(ropt->tocFile, PG_BINARY_R);
1006         if (!fh)
1007                 die_horribly(AH, modulename, "could not open TOC file \"%s\": %s\n",
1008                                          ropt->tocFile, strerror(errno));
1009
1010         while (fgets(buf, sizeof(buf), fh) != NULL)
1011         {
1012                 /* Truncate line at comment, if any */
1013                 cmnt = strchr(buf, ';');
1014                 if (cmnt != NULL)
1015                         cmnt[0] = '\0';
1016
1017                 /* Ignore if all blank */
1018                 if (strspn(buf, " \t\r") == strlen(buf))
1019                         continue;
1020
1021                 /* Get an ID, check it's valid and not already seen */
1022                 id = strtol(buf, &endptr, 10);
1023                 if (endptr == buf || id <= 0 || id > AH->maxDumpId ||
1024                         ropt->idWanted[id - 1])
1025                 {
1026                         write_msg(modulename, "WARNING: line ignored: %s\n", buf);
1027                         continue;
1028                 }
1029
1030                 /* Find TOC entry */
1031                 te = getTocEntryByDumpId(AH, id);
1032                 if (!te)
1033                         die_horribly(AH, modulename, "could not find entry for ID %d\n",
1034                                                  id);
1035
1036                 ropt->idWanted[id - 1] = true;
1037
1038                 _moveAfter(AH, tePrev, te);
1039                 tePrev = te;
1040         }
1041
1042         if (fclose(fh) != 0)
1043                 die_horribly(AH, modulename, "could not close TOC file: %s\n",
1044                                          strerror(errno));
1045 }
1046
1047 /*
1048  * Set up a dummy ID filter that selects all dump IDs
1049  */
1050 void
1051 InitDummyWantedList(Archive *AHX, RestoreOptions *ropt)
1052 {
1053         ArchiveHandle *AH = (ArchiveHandle *) AHX;
1054
1055         /* Allocate space for the 'wanted' array, and init it to 1's */
1056         ropt->idWanted = (bool *) malloc(sizeof(bool) * AH->maxDumpId);
1057         memset(ropt->idWanted, 1, sizeof(bool) * AH->maxDumpId);
1058 }
1059
1060 /**********************
1061  * 'Convenience functions that look like standard IO functions
1062  * for writing data when in dump mode.
1063  **********************/
1064
1065 /* Public */
1066 int
1067 archputs(const char *s, Archive *AH)
1068 {
1069         return WriteData(AH, s, strlen(s));
1070 }
1071
1072 /* Public */
1073 int
1074 archprintf(Archive *AH, const char *fmt,...)
1075 {
1076         char       *p = NULL;
1077         va_list         ap;
1078         int                     bSize = strlen(fmt) + 256;
1079         int                     cnt = -1;
1080
1081         /*
1082          * This is paranoid: deal with the possibility that vsnprintf is willing
1083          * to ignore trailing null or returns > 0 even if string does not fit. It
1084          * may be the case that it returns cnt = bufsize
1085          */
1086         while (cnt < 0 || cnt >= (bSize - 1))
1087         {
1088                 if (p != NULL)
1089                         free(p);
1090                 bSize *= 2;
1091                 p = (char *) malloc(bSize);
1092                 if (p == NULL)
1093                         exit_horribly(AH, modulename, "out of memory\n");
1094                 va_start(ap, fmt);
1095                 cnt = vsnprintf(p, bSize, fmt, ap);
1096                 va_end(ap);
1097         }
1098         WriteData(AH, p, cnt);
1099         free(p);
1100         return cnt;
1101 }
1102
1103
1104 /*******************************
1105  * Stuff below here should be 'private' to the archiver routines
1106  *******************************/
1107
1108 static OutputContext
1109 SetOutput(ArchiveHandle *AH, char *filename, int compression)
1110 {
1111         OutputContext sav;
1112         int                     fn;
1113
1114         /* Replace the AH output file handle */
1115         sav.OF = AH->OF;
1116         sav.gzOut = AH->gzOut;
1117
1118         if (filename)
1119                 fn = -1;
1120         else if (AH->FH)
1121                 fn = fileno(AH->FH);
1122         else if (AH->fSpec)
1123         {
1124                 fn = -1;
1125                 filename = AH->fSpec;
1126         }
1127         else
1128                 fn = fileno(stdout);
1129
1130         /* If compression explicitly requested, use gzopen */
1131 #ifdef HAVE_LIBZ
1132         if (compression != 0)
1133         {
1134                 char            fmode[10];
1135
1136                 /* Don't use PG_BINARY_x since this is zlib */
1137                 sprintf(fmode, "wb%d", compression);
1138                 if (fn >= 0)
1139                         AH->OF = gzdopen(dup(fn), fmode);
1140                 else
1141                         AH->OF = gzopen(filename, fmode);
1142                 AH->gzOut = 1;
1143         }
1144         else
1145 #endif
1146         {                                                       /* Use fopen */
1147                 if (AH->mode == archModeAppend)
1148                 {
1149                         if (fn >= 0)
1150                                 AH->OF = fdopen(dup(fn), PG_BINARY_A);
1151                         else
1152                                 AH->OF = fopen(filename, PG_BINARY_A);
1153                 }
1154                 else
1155                 {
1156                         if (fn >= 0)
1157                                 AH->OF = fdopen(dup(fn), PG_BINARY_W);
1158                         else
1159                                 AH->OF = fopen(filename, PG_BINARY_W);
1160                 }
1161                 AH->gzOut = 0;
1162         }
1163
1164         if (!AH->OF)
1165         {
1166                 if (filename)
1167                         die_horribly(AH, modulename, "could not open output file \"%s\": %s\n",
1168                                                  filename, strerror(errno));
1169                 else
1170                         die_horribly(AH, modulename, "could not open output file: %s\n",
1171                                                  strerror(errno));
1172         }
1173
1174         return sav;
1175 }
1176
1177 static void
1178 ResetOutput(ArchiveHandle *AH, OutputContext sav)
1179 {
1180         int                     res;
1181
1182         if (AH->gzOut)
1183                 res = GZCLOSE(AH->OF);
1184         else
1185                 res = fclose(AH->OF);
1186
1187         if (res != 0)
1188                 die_horribly(AH, modulename, "could not close output file: %s\n",
1189                                          strerror(errno));
1190
1191         AH->gzOut = sav.gzOut;
1192         AH->OF = sav.OF;
1193 }
1194
1195
1196
1197 /*
1198  *      Print formatted text to the output file (usually stdout).
1199  */
1200 int
1201 ahprintf(ArchiveHandle *AH, const char *fmt,...)
1202 {
1203         char       *p = NULL;
1204         va_list         ap;
1205         int                     bSize = strlen(fmt) + 256;              /* Should be enough */
1206         int                     cnt = -1;
1207
1208         /*
1209          * This is paranoid: deal with the possibility that vsnprintf is willing
1210          * to ignore trailing null
1211          */
1212
1213         /*
1214          * or returns > 0 even if string does not fit. It may be the case that it
1215          * returns cnt = bufsize
1216          */
1217         while (cnt < 0 || cnt >= (bSize - 1))
1218         {
1219                 if (p != NULL)
1220                         free(p);
1221                 bSize *= 2;
1222                 p = (char *) malloc(bSize);
1223                 if (p == NULL)
1224                         die_horribly(AH, modulename, "out of memory\n");
1225                 va_start(ap, fmt);
1226                 cnt = vsnprintf(p, bSize, fmt, ap);
1227                 va_end(ap);
1228         }
1229         ahwrite(p, 1, cnt, AH);
1230         free(p);
1231         return cnt;
1232 }
1233
1234 void
1235 ahlog(ArchiveHandle *AH, int level, const char *fmt,...)
1236 {
1237         va_list         ap;
1238
1239         if (AH->debugLevel < level && (!AH->public.verbose || level > 1))
1240                 return;
1241
1242         va_start(ap, fmt);
1243         _write_msg(NULL, fmt, ap);
1244         va_end(ap);
1245 }
1246
1247 /*
1248  * Single place for logic which says 'We are restoring to a direct DB connection'.
1249  */
1250 static int
1251 RestoringToDB(ArchiveHandle *AH)
1252 {
1253         return (AH->ropt && AH->ropt->useDB && AH->connection);
1254 }
1255
1256 /*
1257  * Dump the current contents of the LO data buffer while writing a BLOB
1258  */
1259 static void
1260 dump_lo_buf(ArchiveHandle *AH)
1261 {
1262         if (AH->connection)
1263         {
1264                 size_t          res;
1265
1266                 res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1267                 ahlog(AH, 5, ngettext("wrote %lu byte of large object data (result = %lu)\n",
1268                                          "wrote %lu bytes of large object data (result = %lu)\n",
1269                                                           AH->lo_buf_used),
1270                           (unsigned long) AH->lo_buf_used, (unsigned long) res);
1271                 if (res != AH->lo_buf_used)
1272                         die_horribly(AH, modulename,
1273                         "could not write to large object (result: %lu, expected: %lu)\n",
1274                                            (unsigned long) res, (unsigned long) AH->lo_buf_used);
1275         }
1276         else
1277         {
1278                 PQExpBuffer buf = createPQExpBuffer();
1279
1280                 appendByteaLiteralAHX(buf,
1281                                                           (const unsigned char *) AH->lo_buf,
1282                                                           AH->lo_buf_used,
1283                                                           AH);
1284
1285                 /* Hack: turn off writingBlob so ahwrite doesn't recurse to here */
1286                 AH->writingBlob = 0;
1287                 ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
1288                 AH->writingBlob = 1;
1289
1290                 destroyPQExpBuffer(buf);
1291         }
1292         AH->lo_buf_used = 0;
1293 }
1294
1295
1296 /*
1297  *      Write buffer to the output file (usually stdout). This is user for
1298  *      outputting 'restore' scripts etc. It is even possible for an archive
1299  *      format to create a custom output routine to 'fake' a restore if it
1300  *      wants to generate a script (see TAR output).
1301  */
1302 int
1303 ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1304 {
1305         size_t          res;
1306
1307         if (AH->writingBlob)
1308         {
1309                 size_t          remaining = size * nmemb;
1310
1311                 while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1312                 {
1313                         size_t          avail = AH->lo_buf_size - AH->lo_buf_used;
1314
1315                         memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1316                         ptr = (const void *) ((const char *) ptr + avail);
1317                         remaining -= avail;
1318                         AH->lo_buf_used += avail;
1319                         dump_lo_buf(AH);
1320                 }
1321
1322                 memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1323                 AH->lo_buf_used += remaining;
1324
1325                 return size * nmemb;
1326         }
1327         else if (AH->gzOut)
1328         {
1329                 res = GZWRITE((void *) ptr, size, nmemb, AH->OF);
1330                 if (res != (nmemb * size))
1331                         die_horribly(AH, modulename, "could not write to output file: %s\n", strerror(errno));
1332                 return res;
1333         }
1334         else if (AH->CustomOutPtr)
1335         {
1336                 res = AH->CustomOutPtr (AH, ptr, size * nmemb);
1337
1338                 if (res != (nmemb * size))
1339                         die_horribly(AH, modulename, "could not write to custom output routine\n");
1340                 return res;
1341         }
1342         else
1343         {
1344                 /*
1345                  * If we're doing a restore, and it's direct to DB, and we're
1346                  * connected then send it to the DB.
1347                  */
1348                 if (RestoringToDB(AH))
1349                         return ExecuteSqlCommandBuf(AH, (void *) ptr, size * nmemb);            /* Always 1, currently */
1350                 else
1351                 {
1352                         res = fwrite((void *) ptr, size, nmemb, AH->OF);
1353                         if (res != nmemb)
1354                                 die_horribly(AH, modulename, "could not write to output file: %s\n",
1355                                                          strerror(errno));
1356                         return res;
1357                 }
1358         }
1359 }
1360
1361 /* Common exit code */
1362 static void
1363 _write_msg(const char *modulename, const char *fmt, va_list ap)
1364 {
1365         if (modulename)
1366                 fprintf(stderr, "%s: [%s] ", progname, _(modulename));
1367         else
1368                 fprintf(stderr, "%s: ", progname);
1369         vfprintf(stderr, _(fmt), ap);
1370 }
1371
1372 void
1373 write_msg(const char *modulename, const char *fmt,...)
1374 {
1375         va_list         ap;
1376
1377         va_start(ap, fmt);
1378         _write_msg(modulename, fmt, ap);
1379         va_end(ap);
1380 }
1381
1382
1383 static void
1384 _die_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt, va_list ap)
1385 {
1386         _write_msg(modulename, fmt, ap);
1387
1388         if (AH)
1389         {
1390                 if (AH->public.verbose)
1391                         write_msg(NULL, "*** aborted because of error\n");
1392                 if (AH->connection)
1393                         PQfinish(AH->connection);
1394         }
1395
1396         exit(1);
1397 }
1398
1399 /* External use */
1400 void
1401 exit_horribly(Archive *AH, const char *modulename, const char *fmt,...)
1402 {
1403         va_list         ap;
1404
1405         va_start(ap, fmt);
1406         _die_horribly((ArchiveHandle *) AH, modulename, fmt, ap);
1407         va_end(ap);
1408 }
1409
1410 /* Archiver use (just different arg declaration) */
1411 void
1412 die_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...)
1413 {
1414         va_list         ap;
1415
1416         va_start(ap, fmt);
1417         _die_horribly(AH, modulename, fmt, ap);
1418         va_end(ap);
1419 }
1420
1421 /* on some error, we may decide to go on... */
1422 void
1423 warn_or_die_horribly(ArchiveHandle *AH,
1424                                          const char *modulename, const char *fmt,...)
1425 {
1426         va_list         ap;
1427
1428         switch (AH->stage)
1429         {
1430
1431                 case STAGE_NONE:
1432                         /* Do nothing special */
1433                         break;
1434
1435                 case STAGE_INITIALIZING:
1436                         if (AH->stage != AH->lastErrorStage)
1437                                 write_msg(modulename, "Error while INITIALIZING:\n");
1438                         break;
1439
1440                 case STAGE_PROCESSING:
1441                         if (AH->stage != AH->lastErrorStage)
1442                                 write_msg(modulename, "Error while PROCESSING TOC:\n");
1443                         break;
1444
1445                 case STAGE_FINALIZING:
1446                         if (AH->stage != AH->lastErrorStage)
1447                                 write_msg(modulename, "Error while FINALIZING:\n");
1448                         break;
1449         }
1450         if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1451         {
1452                 write_msg(modulename, "Error from TOC entry %d; %u %u %s %s %s\n",
1453                                   AH->currentTE->dumpId,
1454                          AH->currentTE->catalogId.tableoid, AH->currentTE->catalogId.oid,
1455                           AH->currentTE->desc, AH->currentTE->tag, AH->currentTE->owner);
1456         }
1457         AH->lastErrorStage = AH->stage;
1458         AH->lastErrorTE = AH->currentTE;
1459
1460         va_start(ap, fmt);
1461         if (AH->public.exit_on_error)
1462                 _die_horribly(AH, modulename, fmt, ap);
1463         else
1464         {
1465                 _write_msg(modulename, fmt, ap);
1466                 AH->public.n_errors++;
1467         }
1468         va_end(ap);
1469 }
1470
1471 static void
1472 _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1473 {
1474         te->prev->next = te->next;
1475         te->next->prev = te->prev;
1476
1477         te->prev = pos;
1478         te->next = pos->next;
1479
1480         pos->next->prev = te;
1481         pos->next = te;
1482 }
1483
1484 #ifdef NOT_USED
1485
1486 static void
1487 _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1488 {
1489         te->prev->next = te->next;
1490         te->next->prev = te->prev;
1491
1492         te->prev = pos->prev;
1493         te->next = pos;
1494         pos->prev->next = te;
1495         pos->prev = te;
1496 }
1497 #endif
1498
1499 static TocEntry *
1500 getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
1501 {
1502         TocEntry   *te;
1503
1504         for (te = AH->toc->next; te != AH->toc; te = te->next)
1505         {
1506                 if (te->dumpId == id)
1507                         return te;
1508         }
1509         return NULL;
1510 }
1511
1512 teReqs
1513 TocIDRequired(ArchiveHandle *AH, DumpId id, RestoreOptions *ropt)
1514 {
1515         TocEntry   *te = getTocEntryByDumpId(AH, id);
1516
1517         if (!te)
1518                 return 0;
1519
1520         return _tocEntryRequired(te, ropt, true);
1521 }
1522
1523 size_t
1524 WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
1525 {
1526         int                     off;
1527
1528         /* Save the flag */
1529         (*AH->WriteBytePtr) (AH, wasSet);
1530
1531         /* Write out pgoff_t smallest byte first, prevents endian mismatch */
1532         for (off = 0; off < sizeof(pgoff_t); off++)
1533         {
1534                 (*AH->WriteBytePtr) (AH, o & 0xFF);
1535                 o >>= 8;
1536         }
1537         return sizeof(pgoff_t) + 1;
1538 }
1539
1540 int
1541 ReadOffset(ArchiveHandle *AH, pgoff_t * o)
1542 {
1543         int                     i;
1544         int                     off;
1545         int                     offsetFlg;
1546
1547         /* Initialize to zero */
1548         *o = 0;
1549
1550         /* Check for old version */
1551         if (AH->version < K_VERS_1_7)
1552         {
1553                 /* Prior versions wrote offsets using WriteInt */
1554                 i = ReadInt(AH);
1555                 /* -1 means not set */
1556                 if (i < 0)
1557                         return K_OFFSET_POS_NOT_SET;
1558                 else if (i == 0)
1559                         return K_OFFSET_NO_DATA;
1560
1561                 /* Cast to pgoff_t because it was written as an int. */
1562                 *o = (pgoff_t) i;
1563                 return K_OFFSET_POS_SET;
1564         }
1565
1566         /*
1567          * Read the flag indicating the state of the data pointer. Check if valid
1568          * and die if not.
1569          *
1570          * This used to be handled by a negative or zero pointer, now we use an
1571          * extra byte specifically for the state.
1572          */
1573         offsetFlg = (*AH->ReadBytePtr) (AH) & 0xFF;
1574
1575         switch (offsetFlg)
1576         {
1577                 case K_OFFSET_POS_NOT_SET:
1578                 case K_OFFSET_NO_DATA:
1579                 case K_OFFSET_POS_SET:
1580
1581                         break;
1582
1583                 default:
1584                         die_horribly(AH, modulename, "unexpected data offset flag %d\n", offsetFlg);
1585         }
1586
1587         /*
1588          * Read the bytes
1589          */
1590         for (off = 0; off < AH->offSize; off++)
1591         {
1592                 if (off < sizeof(pgoff_t))
1593                         *o |= ((pgoff_t) ((*AH->ReadBytePtr) (AH))) << (off * 8);
1594                 else
1595                 {
1596                         if ((*AH->ReadBytePtr) (AH) != 0)
1597                                 die_horribly(AH, modulename, "file offset in dump file is too large\n");
1598                 }
1599         }
1600
1601         return offsetFlg;
1602 }
1603
1604 size_t
1605 WriteInt(ArchiveHandle *AH, int i)
1606 {
1607         int                     b;
1608
1609         /*
1610          * This is a bit yucky, but I don't want to make the binary format very
1611          * dependent on representation, and not knowing much about it, I write out
1612          * a sign byte. If you change this, don't forget to change the file
1613          * version #, and modify readInt to read the new format AS WELL AS the old
1614          * formats.
1615          */
1616
1617         /* SIGN byte */
1618         if (i < 0)
1619         {
1620                 (*AH->WriteBytePtr) (AH, 1);
1621                 i = -i;
1622         }
1623         else
1624                 (*AH->WriteBytePtr) (AH, 0);
1625
1626         for (b = 0; b < AH->intSize; b++)
1627         {
1628                 (*AH->WriteBytePtr) (AH, i & 0xFF);
1629                 i >>= 8;
1630         }
1631
1632         return AH->intSize + 1;
1633 }
1634
1635 int
1636 ReadInt(ArchiveHandle *AH)
1637 {
1638         int                     res = 0;
1639         int                     bv,
1640                                 b;
1641         int                     sign = 0;               /* Default positive */
1642         int                     bitShift = 0;
1643
1644         if (AH->version > K_VERS_1_0)
1645                 /* Read a sign byte */
1646                 sign = (*AH->ReadBytePtr) (AH);
1647
1648         for (b = 0; b < AH->intSize; b++)
1649         {
1650                 bv = (*AH->ReadBytePtr) (AH) & 0xFF;
1651                 if (bv != 0)
1652                         res = res + (bv << bitShift);
1653                 bitShift += 8;
1654         }
1655
1656         if (sign)
1657                 res = -res;
1658
1659         return res;
1660 }
1661
1662 size_t
1663 WriteStr(ArchiveHandle *AH, const char *c)
1664 {
1665         size_t          res;
1666
1667         if (c)
1668         {
1669                 res = WriteInt(AH, strlen(c));
1670                 res += (*AH->WriteBufPtr) (AH, c, strlen(c));
1671         }
1672         else
1673                 res = WriteInt(AH, -1);
1674
1675         return res;
1676 }
1677
1678 char *
1679 ReadStr(ArchiveHandle *AH)
1680 {
1681         char       *buf;
1682         int                     l;
1683
1684         l = ReadInt(AH);
1685         if (l < 0)
1686                 buf = NULL;
1687         else
1688         {
1689                 buf = (char *) malloc(l + 1);
1690                 if (!buf)
1691                         die_horribly(AH, modulename, "out of memory\n");
1692
1693                 if ((*AH->ReadBufPtr) (AH, (void *) buf, l) != l)
1694                         die_horribly(AH, modulename, "unexpected end of file\n");
1695
1696                 buf[l] = '\0';
1697         }
1698
1699         return buf;
1700 }
1701
1702 static int
1703 _discoverArchiveFormat(ArchiveHandle *AH)
1704 {
1705         FILE       *fh;
1706         char            sig[6];                 /* More than enough */
1707         size_t          cnt;
1708         int                     wantClose = 0;
1709
1710 #if 0
1711         write_msg(modulename, "attempting to ascertain archive format\n");
1712 #endif
1713
1714         if (AH->lookahead)
1715                 free(AH->lookahead);
1716
1717         AH->lookaheadSize = 512;
1718         AH->lookahead = calloc(1, 512);
1719         AH->lookaheadLen = 0;
1720         AH->lookaheadPos = 0;
1721
1722         if (AH->fSpec)
1723         {
1724                 wantClose = 1;
1725                 fh = fopen(AH->fSpec, PG_BINARY_R);
1726                 if (!fh)
1727                         die_horribly(AH, modulename, "could not open input file \"%s\": %s\n",
1728                                                  AH->fSpec, strerror(errno));
1729         }
1730         else
1731         {
1732                 fh = stdin;
1733                 if (!fh)
1734                         die_horribly(AH, modulename, "could not open input file: %s\n",
1735                                                  strerror(errno));
1736         }
1737
1738         cnt = fread(sig, 1, 5, fh);
1739
1740         if (cnt != 5)
1741         {
1742                 if (ferror(fh))
1743                         die_horribly(AH, modulename, "could not read input file: %s\n", strerror(errno));
1744                 else
1745                         die_horribly(AH, modulename, "input file is too short (read %lu, expected 5)\n",
1746                                                  (unsigned long) cnt);
1747         }
1748
1749         /* Save it, just in case we need it later */
1750         strncpy(&AH->lookahead[0], sig, 5);
1751         AH->lookaheadLen = 5;
1752
1753         if (strncmp(sig, "PGDMP", 5) == 0)
1754         {
1755                 AH->vmaj = fgetc(fh);
1756                 AH->vmin = fgetc(fh);
1757
1758                 /* Save these too... */
1759                 AH->lookahead[AH->lookaheadLen++] = AH->vmaj;
1760                 AH->lookahead[AH->lookaheadLen++] = AH->vmin;
1761
1762                 /* Check header version; varies from V1.0 */
1763                 if (AH->vmaj > 1 || ((AH->vmaj == 1) && (AH->vmin > 0)))                /* Version > 1.0 */
1764                 {
1765                         AH->vrev = fgetc(fh);
1766                         AH->lookahead[AH->lookaheadLen++] = AH->vrev;
1767                 }
1768                 else
1769                         AH->vrev = 0;
1770
1771                 /* Make a convenient integer <maj><min><rev>00 */
1772                 AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
1773
1774                 AH->intSize = fgetc(fh);
1775                 AH->lookahead[AH->lookaheadLen++] = AH->intSize;
1776
1777                 if (AH->version >= K_VERS_1_7)
1778                 {
1779                         AH->offSize = fgetc(fh);
1780                         AH->lookahead[AH->lookaheadLen++] = AH->offSize;
1781                 }
1782                 else
1783                         AH->offSize = AH->intSize;
1784
1785                 AH->format = fgetc(fh);
1786                 AH->lookahead[AH->lookaheadLen++] = AH->format;
1787         }
1788         else
1789         {
1790                 /*
1791                  * *Maybe* we have a tar archive format file... So, read first 512
1792                  * byte header...
1793                  */
1794                 cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
1795                 AH->lookaheadLen += cnt;
1796
1797                 if (AH->lookaheadLen != 512)
1798                         die_horribly(AH, modulename, "input file does not appear to be a valid archive (too short?)\n");
1799
1800                 if (!isValidTarHeader(AH->lookahead))
1801                         die_horribly(AH, modulename, "input file does not appear to be a valid archive\n");
1802
1803                 AH->format = archTar;
1804         }
1805
1806         /* If we can't seek, then mark the header as read */
1807         if (fseeko(fh, 0, SEEK_SET) != 0)
1808         {
1809                 /*
1810                  * NOTE: Formats that use the lookahead buffer can unset this in their
1811                  * Init routine.
1812                  */
1813                 AH->readHeader = 1;
1814         }
1815         else
1816                 AH->lookaheadLen = 0;   /* Don't bother since we've reset the file */
1817
1818         /* Close the file */
1819         if (wantClose)
1820                 if (fclose(fh) != 0)
1821                         die_horribly(AH, modulename, "could not close input file: %s\n",
1822                                                  strerror(errno));
1823
1824         return AH->format;
1825 }
1826
1827
1828 /*
1829  * Allocate an archive handle
1830  */
1831 static ArchiveHandle *
1832 _allocAH(const char *FileSpec, const ArchiveFormat fmt,
1833                  const int compression, ArchiveMode mode)
1834 {
1835         ArchiveHandle *AH;
1836
1837 #if 0
1838         write_msg(modulename, "allocating AH for %s, format %d\n", FileSpec, fmt);
1839 #endif
1840
1841         AH = (ArchiveHandle *) calloc(1, sizeof(ArchiveHandle));
1842         if (!AH)
1843                 die_horribly(AH, modulename, "out of memory\n");
1844
1845         /* AH->debugLevel = 100; */
1846
1847         AH->vmaj = K_VERS_MAJOR;
1848         AH->vmin = K_VERS_MINOR;
1849         AH->vrev = K_VERS_REV;
1850
1851         /* Make a convenient integer <maj><min><rev>00 */
1852         AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
1853
1854         /* initialize for backwards compatible string processing */
1855         AH->public.encoding = 0;        /* PG_SQL_ASCII */
1856         AH->public.std_strings = false;
1857
1858         /* sql error handling */
1859         AH->public.exit_on_error = true;
1860         AH->public.n_errors = 0;
1861
1862         AH->archiveDumpVersion = PG_VERSION;
1863
1864         AH->createDate = time(NULL);
1865
1866         AH->intSize = sizeof(int);
1867         AH->offSize = sizeof(pgoff_t);
1868         if (FileSpec)
1869         {
1870                 AH->fSpec = strdup(FileSpec);
1871
1872                 /*
1873                  * Not used; maybe later....
1874                  *
1875                  * AH->workDir = strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
1876                  * i--) if (AH->workDir[i-1] == '/')
1877                  */
1878         }
1879         else
1880                 AH->fSpec = NULL;
1881
1882         AH->currUser = NULL;            /* unknown */
1883         AH->currSchema = NULL;          /* ditto */
1884         AH->currTablespace = NULL;      /* ditto */
1885         AH->currWithOids = -1;          /* force SET */
1886
1887         AH->toc = (TocEntry *) calloc(1, sizeof(TocEntry));
1888         if (!AH->toc)
1889                 die_horribly(AH, modulename, "out of memory\n");
1890
1891         AH->toc->next = AH->toc;
1892         AH->toc->prev = AH->toc;
1893
1894         AH->mode = mode;
1895         AH->compression = compression;
1896
1897         AH->pgCopyBuf = createPQExpBuffer();
1898         AH->sqlBuf = createPQExpBuffer();
1899
1900         /* Open stdout with no compression for AH output handle */
1901         AH->gzOut = 0;
1902         AH->OF = stdout;
1903
1904         /*
1905          * On Windows, we need to use binary mode to read/write non-text archive
1906          * formats.  Force stdin/stdout into binary mode if that is what we are
1907          * using.
1908          */
1909 #ifdef WIN32
1910         if (fmt != archNull &&
1911                 (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
1912         {
1913                 if (mode == archModeWrite)
1914                         setmode(fileno(stdout), O_BINARY);
1915                 else
1916                         setmode(fileno(stdin), O_BINARY);
1917         }
1918 #endif
1919
1920         if (fmt == archUnknown)
1921                 AH->format = _discoverArchiveFormat(AH);
1922         else
1923                 AH->format = fmt;
1924
1925         AH->promptPassword = TRI_DEFAULT;
1926
1927         switch (AH->format)
1928         {
1929                 case archCustom:
1930                         InitArchiveFmt_Custom(AH);
1931                         break;
1932
1933                 case archFiles:
1934                         InitArchiveFmt_Files(AH);
1935                         break;
1936
1937                 case archNull:
1938                         InitArchiveFmt_Null(AH);
1939                         break;
1940
1941                 case archTar:
1942                         InitArchiveFmt_Tar(AH);
1943                         break;
1944
1945                 default:
1946                         die_horribly(AH, modulename, "unrecognized file format \"%d\"\n", fmt);
1947         }
1948
1949         return AH;
1950 }
1951
1952
1953 void
1954 WriteDataChunks(ArchiveHandle *AH)
1955 {
1956         TocEntry   *te;
1957         StartDataPtr startPtr;
1958         EndDataPtr      endPtr;
1959
1960         for (te = AH->toc->next; te != AH->toc; te = te->next)
1961         {
1962                 if (te->dataDumper != NULL)
1963                 {
1964                         AH->currToc = te;
1965                         /* printf("Writing data for %d (%x)\n", te->id, te); */
1966
1967                         if (strcmp(te->desc, "BLOBS") == 0)
1968                         {
1969                                 startPtr = AH->StartBlobsPtr;
1970                                 endPtr = AH->EndBlobsPtr;
1971                         }
1972                         else
1973                         {
1974                                 startPtr = AH->StartDataPtr;
1975                                 endPtr = AH->EndDataPtr;
1976                         }
1977
1978                         if (startPtr != NULL)
1979                                 (*startPtr) (AH, te);
1980
1981                         /*
1982                          * printf("Dumper arg for %d is %x\n", te->id, te->dataDumperArg);
1983                          */
1984
1985                         /*
1986                          * The user-provided DataDumper routine needs to call
1987                          * AH->WriteData
1988                          */
1989                         (*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
1990
1991                         if (endPtr != NULL)
1992                                 (*endPtr) (AH, te);
1993                         AH->currToc = NULL;
1994                 }
1995         }
1996 }
1997
1998 void
1999 WriteToc(ArchiveHandle *AH)
2000 {
2001         TocEntry   *te;
2002         char            workbuf[32];
2003         int                     i;
2004
2005         /* printf("%d TOC Entries to save\n", AH->tocCount); */
2006
2007         WriteInt(AH, AH->tocCount);
2008
2009         for (te = AH->toc->next; te != AH->toc; te = te->next)
2010         {
2011                 WriteInt(AH, te->dumpId);
2012                 WriteInt(AH, te->dataDumper ? 1 : 0);
2013
2014                 /* OID is recorded as a string for historical reasons */
2015                 sprintf(workbuf, "%u", te->catalogId.tableoid);
2016                 WriteStr(AH, workbuf);
2017                 sprintf(workbuf, "%u", te->catalogId.oid);
2018                 WriteStr(AH, workbuf);
2019
2020                 WriteStr(AH, te->tag);
2021                 WriteStr(AH, te->desc);
2022                 WriteInt(AH, te->section);
2023                 WriteStr(AH, te->defn);
2024                 WriteStr(AH, te->dropStmt);
2025                 WriteStr(AH, te->copyStmt);
2026                 WriteStr(AH, te->namespace);
2027                 WriteStr(AH, te->tablespace);
2028                 WriteStr(AH, te->owner);
2029                 WriteStr(AH, te->withOids ? "true" : "false");
2030
2031                 /* Dump list of dependencies */
2032                 for (i = 0; i < te->nDeps; i++)
2033                 {
2034                         sprintf(workbuf, "%d", te->dependencies[i]);
2035                         WriteStr(AH, workbuf);
2036                 }
2037                 WriteStr(AH, NULL);             /* Terminate List */
2038
2039                 if (AH->WriteExtraTocPtr)
2040                         (*AH->WriteExtraTocPtr) (AH, te);
2041         }
2042 }
2043
2044 void
2045 ReadToc(ArchiveHandle *AH)
2046 {
2047         int                     i;
2048         char       *tmp;
2049         DumpId     *deps;
2050         int                     depIdx;
2051         int                     depSize;
2052         TocEntry   *te;
2053
2054         AH->tocCount = ReadInt(AH);
2055         AH->maxDumpId = 0;
2056
2057         for (i = 0; i < AH->tocCount; i++)
2058         {
2059                 te = (TocEntry *) calloc(1, sizeof(TocEntry));
2060                 te->dumpId = ReadInt(AH);
2061
2062                 if (te->dumpId > AH->maxDumpId)
2063                         AH->maxDumpId = te->dumpId;
2064
2065                 /* Sanity check */
2066                 if (te->dumpId <= 0)
2067                         die_horribly(AH, modulename,
2068                                            "entry ID %d out of range -- perhaps a corrupt TOC\n",
2069                                                  te->dumpId);
2070
2071                 te->hadDumper = ReadInt(AH);
2072
2073                 if (AH->version >= K_VERS_1_8)
2074                 {
2075                         tmp = ReadStr(AH);
2076                         sscanf(tmp, "%u", &te->catalogId.tableoid);
2077                         free(tmp);
2078                 }
2079                 else
2080                         te->catalogId.tableoid = InvalidOid;
2081                 tmp = ReadStr(AH);
2082                 sscanf(tmp, "%u", &te->catalogId.oid);
2083                 free(tmp);
2084
2085                 te->tag = ReadStr(AH);
2086                 te->desc = ReadStr(AH);
2087
2088                 if (AH->version >= K_VERS_1_11)
2089                 {
2090                         te->section = ReadInt(AH);
2091                 }
2092                 else
2093                 {
2094                         /*
2095                          * Rules for pre-8.4 archives wherein pg_dump hasn't classified
2096                          * the entries into sections.  This list need not cover entry
2097                          * types added later than 8.4.
2098                          */
2099                         if (strcmp(te->desc, "COMMENT") == 0 ||
2100                                 strcmp(te->desc, "ACL") == 0 ||
2101                                 strcmp(te->desc, "ACL LANGUAGE") == 0)
2102                                 te->section = SECTION_NONE;
2103                         else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2104                                          strcmp(te->desc, "BLOBS") == 0 ||
2105                                          strcmp(te->desc, "BLOB COMMENTS") == 0)
2106                                 te->section = SECTION_DATA;
2107                         else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2108                                          strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2109                                          strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2110                                          strcmp(te->desc, "INDEX") == 0 ||
2111                                          strcmp(te->desc, "RULE") == 0 ||
2112                                          strcmp(te->desc, "TRIGGER") == 0)
2113                                 te->section = SECTION_POST_DATA;
2114                         else
2115                                 te->section = SECTION_PRE_DATA;
2116                 }
2117
2118                 te->defn = ReadStr(AH);
2119                 te->dropStmt = ReadStr(AH);
2120
2121                 if (AH->version >= K_VERS_1_3)
2122                         te->copyStmt = ReadStr(AH);
2123
2124                 if (AH->version >= K_VERS_1_6)
2125                         te->namespace = ReadStr(AH);
2126
2127                 if (AH->version >= K_VERS_1_10)
2128                         te->tablespace = ReadStr(AH);
2129
2130                 te->owner = ReadStr(AH);
2131                 if (AH->version >= K_VERS_1_9)
2132                 {
2133                         if (strcmp(ReadStr(AH), "true") == 0)
2134                                 te->withOids = true;
2135                         else
2136                                 te->withOids = false;
2137                 }
2138                 else
2139                         te->withOids = true;
2140
2141                 /* Read TOC entry dependencies */
2142                 if (AH->version >= K_VERS_1_5)
2143                 {
2144                         depSize = 100;
2145                         deps = (DumpId *) malloc(sizeof(DumpId) * depSize);
2146                         depIdx = 0;
2147                         for (;;)
2148                         {
2149                                 tmp = ReadStr(AH);
2150                                 if (!tmp)
2151                                         break;          /* end of list */
2152                                 if (depIdx >= depSize)
2153                                 {
2154                                         depSize *= 2;
2155                                         deps = (DumpId *) realloc(deps, sizeof(DumpId) * depSize);
2156                                 }
2157                                 sscanf(tmp, "%d", &deps[depIdx]);
2158                                 free(tmp);
2159                                 depIdx++;
2160                         }
2161
2162                         if (depIdx > 0)         /* We have a non-null entry */
2163                         {
2164                                 deps = (DumpId *) realloc(deps, sizeof(DumpId) * depIdx);
2165                                 te->dependencies = deps;
2166                                 te->nDeps = depIdx;
2167                         }
2168                         else
2169                         {
2170                                 free(deps);
2171                                 te->dependencies = NULL;
2172                                 te->nDeps = 0;
2173                         }
2174                 }
2175                 else
2176                 {
2177                         te->dependencies = NULL;
2178                         te->nDeps = 0;
2179                 }
2180
2181                 if (AH->ReadExtraTocPtr)
2182                         (*AH->ReadExtraTocPtr) (AH, te);
2183
2184                 ahlog(AH, 3, "read TOC entry %d (ID %d) for %s %s\n",
2185                           i, te->dumpId, te->desc, te->tag);
2186
2187                 /* link completed entry into TOC circular list */
2188                 te->prev = AH->toc->prev;
2189                 AH->toc->prev->next = te;
2190                 AH->toc->prev = te;
2191                 te->next = AH->toc;
2192
2193                 /* special processing immediately upon read for some items */
2194                 if (strcmp(te->desc, "ENCODING") == 0)
2195                         processEncodingEntry(AH, te);
2196                 else if (strcmp(te->desc, "STDSTRINGS") == 0)
2197                         processStdStringsEntry(AH, te);
2198         }
2199 }
2200
2201 static void
2202 processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2203 {
2204         /* te->defn should have the form SET client_encoding = 'foo'; */
2205         char       *defn = strdup(te->defn);
2206         char       *ptr1;
2207         char       *ptr2 = NULL;
2208         int                     encoding;
2209
2210         ptr1 = strchr(defn, '\'');
2211         if (ptr1)
2212                 ptr2 = strchr(++ptr1, '\'');
2213         if (ptr2)
2214         {
2215                 *ptr2 = '\0';
2216                 encoding = pg_char_to_encoding(ptr1);
2217                 if (encoding < 0)
2218                         die_horribly(AH, modulename, "unrecognized encoding \"%s\"\n",
2219                                                  ptr1);
2220                 AH->public.encoding = encoding;
2221         }
2222         else
2223                 die_horribly(AH, modulename, "invalid ENCODING item: %s\n",
2224                                          te->defn);
2225
2226         free(defn);
2227 }
2228
2229 static void
2230 processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2231 {
2232         /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2233         char       *ptr1;
2234
2235         ptr1 = strchr(te->defn, '\'');
2236         if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2237                 AH->public.std_strings = true;
2238         else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2239                 AH->public.std_strings = false;
2240         else
2241                 die_horribly(AH, modulename, "invalid STDSTRINGS item: %s\n",
2242                                          te->defn);
2243 }
2244
2245 static teReqs
2246 _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls)
2247 {
2248         teReqs          res = REQ_ALL;
2249
2250         /* ENCODING and STDSTRINGS items are dumped specially, so always reject */
2251         if (strcmp(te->desc, "ENCODING") == 0 ||
2252                 strcmp(te->desc, "STDSTRINGS") == 0)
2253                 return 0;
2254
2255         /* If it's an ACL, maybe ignore it */
2256         if ((!include_acls || ropt->aclsSkip) && _tocEntryIsACL(te))
2257                 return 0;
2258
2259         /* Ignore DATABASE entry unless we should create it */
2260         if (!ropt->create && strcmp(te->desc, "DATABASE") == 0)
2261                 return 0;
2262
2263         /* Check options for selective dump/restore */
2264         if (ropt->schemaNames)
2265         {
2266                 /* If no namespace is specified, it means all. */
2267                 if (!te->namespace)
2268                         return 0;
2269                 if (strcmp(ropt->schemaNames, te->namespace) != 0)
2270                         return 0;
2271         }
2272
2273         if (ropt->selTypes)
2274         {
2275                 if (strcmp(te->desc, "TABLE") == 0 ||
2276                         strcmp(te->desc, "TABLE DATA") == 0)
2277                 {
2278                         if (!ropt->selTable)
2279                                 return 0;
2280                         if (ropt->tableNames && strcmp(ropt->tableNames, te->tag) != 0)
2281                                 return 0;
2282                 }
2283                 else if (strcmp(te->desc, "INDEX") == 0)
2284                 {
2285                         if (!ropt->selIndex)
2286                                 return 0;
2287                         if (ropt->indexNames && strcmp(ropt->indexNames, te->tag) != 0)
2288                                 return 0;
2289                 }
2290                 else if (strcmp(te->desc, "FUNCTION") == 0)
2291                 {
2292                         if (!ropt->selFunction)
2293                                 return 0;
2294                         if (ropt->functionNames && strcmp(ropt->functionNames, te->tag) != 0)
2295                                 return 0;
2296                 }
2297                 else if (strcmp(te->desc, "TRIGGER") == 0)
2298                 {
2299                         if (!ropt->selTrigger)
2300                                 return 0;
2301                         if (ropt->triggerNames && strcmp(ropt->triggerNames, te->tag) != 0)
2302                                 return 0;
2303                 }
2304                 else
2305                         return 0;
2306         }
2307
2308         /*
2309          * Check if we had a dataDumper. Indicates if the entry is schema or data
2310          */
2311         if (!te->hadDumper)
2312         {
2313                 /*
2314                  * Special Case: If 'SEQUENCE SET' or anything to do with BLOBs, then
2315                  * it is considered a data entry.  We don't need to check for the
2316                  * BLOBS entry or old-style BLOB COMMENTS, because they will have
2317                  * hadDumper = true ... but we do need to check new-style BLOB
2318                  * comments.
2319                  */
2320                 if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
2321                         strcmp(te->desc, "BLOB") == 0 ||
2322                         (strcmp(te->desc, "ACL") == 0 &&
2323                          strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
2324                         (strcmp(te->desc, "COMMENT") == 0 &&
2325                          strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
2326                         res = res & REQ_DATA;
2327                 else
2328                         res = res & ~REQ_DATA;
2329         }
2330
2331         /*
2332          * Special case: <Init> type with <Max OID> tag; this is obsolete and we
2333          * always ignore it.
2334          */
2335         if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
2336                 return 0;
2337
2338         /* Mask it if we only want schema */
2339         if (ropt->schemaOnly)
2340                 res = res & REQ_SCHEMA;
2341
2342         /* Mask it we only want data */
2343         if (ropt->dataOnly)
2344                 res = res & REQ_DATA;
2345
2346         /* Mask it if we don't have a schema contribution */
2347         if (!te->defn || strlen(te->defn) == 0)
2348                 res = res & ~REQ_SCHEMA;
2349
2350         /* Finally, if there's a per-ID filter, limit based on that as well */
2351         if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
2352                 return 0;
2353
2354         return res;
2355 }
2356
2357 /*
2358  * Identify TOC entries that are ACLs.
2359  */
2360 static bool
2361 _tocEntryIsACL(TocEntry *te)
2362 {
2363         /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
2364         if (strcmp(te->desc, "ACL") == 0 ||
2365                 strcmp(te->desc, "ACL LANGUAGE") == 0 ||
2366                 strcmp(te->desc, "DEFAULT ACL") == 0)
2367                 return true;
2368         return false;
2369 }
2370
2371 /*
2372  * Issue SET commands for parameters that we want to have set the same way
2373  * at all times during execution of a restore script.
2374  */
2375 static void
2376 _doSetFixedOutputState(ArchiveHandle *AH)
2377 {
2378         /* Disable statement_timeout in archive for pg_restore/psql  */
2379         ahprintf(AH, "SET statement_timeout = 0;\n");
2380
2381         /* Select the correct character set encoding */
2382         ahprintf(AH, "SET client_encoding = '%s';\n",
2383                          pg_encoding_to_char(AH->public.encoding));
2384
2385         /* Select the correct string literal syntax */
2386         ahprintf(AH, "SET standard_conforming_strings = %s;\n",
2387                          AH->public.std_strings ? "on" : "off");
2388
2389         /* Select the role to be used during restore */
2390         if (AH->ropt && AH->ropt->use_role)
2391                 ahprintf(AH, "SET ROLE %s;\n", fmtId(AH->ropt->use_role));
2392
2393         /* Make sure function checking is disabled */
2394         ahprintf(AH, "SET check_function_bodies = false;\n");
2395
2396         /* Avoid annoying notices etc */
2397         ahprintf(AH, "SET client_min_messages = warning;\n");
2398         if (!AH->public.std_strings)
2399                 ahprintf(AH, "SET escape_string_warning = off;\n");
2400
2401         ahprintf(AH, "\n");
2402 }
2403
2404 /*
2405  * Issue a SET SESSION AUTHORIZATION command.  Caller is responsible
2406  * for updating state if appropriate.  If user is NULL or an empty string,
2407  * the specification DEFAULT will be used.
2408  */
2409 static void
2410 _doSetSessionAuth(ArchiveHandle *AH, const char *user)
2411 {
2412         PQExpBuffer cmd = createPQExpBuffer();
2413
2414         appendPQExpBuffer(cmd, "SET SESSION AUTHORIZATION ");
2415
2416         /*
2417          * SQL requires a string literal here.  Might as well be correct.
2418          */
2419         if (user && *user)
2420                 appendStringLiteralAHX(cmd, user, AH);
2421         else
2422                 appendPQExpBuffer(cmd, "DEFAULT");
2423         appendPQExpBuffer(cmd, ";");
2424
2425         if (RestoringToDB(AH))
2426         {
2427                 PGresult   *res;
2428
2429                 res = PQexec(AH->connection, cmd->data);
2430
2431                 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
2432                         /* NOT warn_or_die_horribly... use -O instead to skip this. */
2433                         die_horribly(AH, modulename, "could not set session user to \"%s\": %s",
2434                                                  user, PQerrorMessage(AH->connection));
2435
2436                 PQclear(res);
2437         }
2438         else
2439                 ahprintf(AH, "%s\n\n", cmd->data);
2440
2441         destroyPQExpBuffer(cmd);
2442 }
2443
2444
2445 /*
2446  * Issue a SET default_with_oids command.  Caller is responsible
2447  * for updating state if appropriate.
2448  */
2449 static void
2450 _doSetWithOids(ArchiveHandle *AH, const bool withOids)
2451 {
2452         PQExpBuffer cmd = createPQExpBuffer();
2453
2454         appendPQExpBuffer(cmd, "SET default_with_oids = %s;", withOids ?
2455                                           "true" : "false");
2456
2457         if (RestoringToDB(AH))
2458         {
2459                 PGresult   *res;
2460
2461                 res = PQexec(AH->connection, cmd->data);
2462
2463                 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
2464                         warn_or_die_horribly(AH, modulename,
2465                                                                  "could not set default_with_oids: %s",
2466                                                                  PQerrorMessage(AH->connection));
2467
2468                 PQclear(res);
2469         }
2470         else
2471                 ahprintf(AH, "%s\n\n", cmd->data);
2472
2473         destroyPQExpBuffer(cmd);
2474 }
2475
2476
2477 /*
2478  * Issue the commands to connect to the specified database.
2479  *
2480  * If we're currently restoring right into a database, this will
2481  * actually establish a connection. Otherwise it puts a \connect into
2482  * the script output.
2483  *
2484  * NULL dbname implies reconnecting to the current DB (pretty useless).
2485  */
2486 static void
2487 _reconnectToDB(ArchiveHandle *AH, const char *dbname)
2488 {
2489         if (RestoringToDB(AH))
2490                 ReconnectToServer(AH, dbname, NULL);
2491         else
2492         {
2493                 PQExpBuffer qry = createPQExpBuffer();
2494
2495                 appendPQExpBuffer(qry, "\\connect %s\n\n",
2496                                                   dbname ? fmtId(dbname) : "-");
2497                 ahprintf(AH, "%s", qry->data);
2498                 destroyPQExpBuffer(qry);
2499         }
2500
2501         /*
2502          * NOTE: currUser keeps track of what the imaginary session user in our
2503          * script is.  It's now effectively reset to the original userID.
2504          */
2505         if (AH->currUser)
2506                 free(AH->currUser);
2507         AH->currUser = NULL;
2508
2509         /* don't assume we still know the output schema, tablespace, etc either */
2510         if (AH->currSchema)
2511                 free(AH->currSchema);
2512         AH->currSchema = NULL;
2513         if (AH->currTablespace)
2514                 free(AH->currTablespace);
2515         AH->currTablespace = NULL;
2516         AH->currWithOids = -1;
2517
2518         /* re-establish fixed state */
2519         _doSetFixedOutputState(AH);
2520 }
2521
2522 /*
2523  * Become the specified user, and update state to avoid redundant commands
2524  *
2525  * NULL or empty argument is taken to mean restoring the session default
2526  */
2527 static void
2528 _becomeUser(ArchiveHandle *AH, const char *user)
2529 {
2530         if (!user)
2531                 user = "";                              /* avoid null pointers */
2532
2533         if (AH->currUser && strcmp(AH->currUser, user) == 0)
2534                 return;                                 /* no need to do anything */
2535
2536         _doSetSessionAuth(AH, user);
2537
2538         /*
2539          * NOTE: currUser keeps track of what the imaginary session user in our
2540          * script is
2541          */
2542         if (AH->currUser)
2543                 free(AH->currUser);
2544         AH->currUser = strdup(user);
2545 }
2546
2547 /*
2548  * Become the owner of the the given TOC entry object.  If
2549  * changes in ownership are not allowed, this doesn't do anything.
2550  */
2551 static void
2552 _becomeOwner(ArchiveHandle *AH, TocEntry *te)
2553 {
2554         if (AH->ropt && (AH->ropt->noOwner || !AH->ropt->use_setsessauth))
2555                 return;
2556
2557         _becomeUser(AH, te->owner);
2558 }
2559
2560
2561 /*
2562  * Set the proper default_with_oids value for the table.
2563  */
2564 static void
2565 _setWithOids(ArchiveHandle *AH, TocEntry *te)
2566 {
2567         if (AH->currWithOids != te->withOids)
2568         {
2569                 _doSetWithOids(AH, te->withOids);
2570                 AH->currWithOids = te->withOids;
2571         }
2572 }
2573
2574
2575 /*
2576  * Issue the commands to select the specified schema as the current schema
2577  * in the target database.
2578  */
2579 static void
2580 _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
2581 {
2582         PQExpBuffer qry;
2583
2584         if (!schemaName || *schemaName == '\0' ||
2585                 (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
2586                 return;                                 /* no need to do anything */
2587
2588         qry = createPQExpBuffer();
2589
2590         appendPQExpBuffer(qry, "SET search_path = %s",
2591                                           fmtId(schemaName));
2592         if (strcmp(schemaName, "pg_catalog") != 0)
2593                 appendPQExpBuffer(qry, ", pg_catalog");
2594
2595         if (RestoringToDB(AH))
2596         {
2597                 PGresult   *res;
2598
2599                 res = PQexec(AH->connection, qry->data);
2600
2601                 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
2602                         warn_or_die_horribly(AH, modulename,
2603                                                                  "could not set search_path to \"%s\": %s",
2604                                                                  schemaName, PQerrorMessage(AH->connection));
2605
2606                 PQclear(res);
2607         }
2608         else
2609                 ahprintf(AH, "%s;\n\n", qry->data);
2610
2611         if (AH->currSchema)
2612                 free(AH->currSchema);
2613         AH->currSchema = strdup(schemaName);
2614
2615         destroyPQExpBuffer(qry);
2616 }
2617
2618 /*
2619  * Issue the commands to select the specified tablespace as the current one
2620  * in the target database.
2621  */
2622 static void
2623 _selectTablespace(ArchiveHandle *AH, const char *tablespace)
2624 {
2625         PQExpBuffer qry;
2626         const char *want,
2627                            *have;
2628
2629         /* do nothing in --no-tablespaces mode */
2630         if (AH->ropt->noTablespace)
2631                 return;
2632
2633         have = AH->currTablespace;
2634         want = tablespace;
2635
2636         /* no need to do anything for non-tablespace object */
2637         if (!want)
2638                 return;
2639
2640         if (have && strcmp(want, have) == 0)
2641                 return;                                 /* no need to do anything */
2642
2643         qry = createPQExpBuffer();
2644
2645         if (strcmp(want, "") == 0)
2646         {
2647                 /* We want the tablespace to be the database's default */
2648                 appendPQExpBuffer(qry, "SET default_tablespace = ''");
2649         }
2650         else
2651         {
2652                 /* We want an explicit tablespace */
2653                 appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
2654         }
2655
2656         if (RestoringToDB(AH))
2657         {
2658                 PGresult   *res;
2659
2660                 res = PQexec(AH->connection, qry->data);
2661
2662                 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
2663                         warn_or_die_horribly(AH, modulename,
2664                                                                  "could not set default_tablespace to %s: %s",
2665                                                                  fmtId(want), PQerrorMessage(AH->connection));
2666
2667                 PQclear(res);
2668         }
2669         else
2670                 ahprintf(AH, "%s;\n\n", qry->data);
2671
2672         if (AH->currTablespace)
2673                 free(AH->currTablespace);
2674         AH->currTablespace = strdup(want);
2675
2676         destroyPQExpBuffer(qry);
2677 }
2678
2679 /*
2680  * Extract an object description for a TOC entry, and append it to buf.
2681  *
2682  * This is not quite as general as it may seem, since it really only
2683  * handles constructing the right thing to put into ALTER ... OWNER TO.
2684  *
2685  * The whole thing is pretty grotty, but we are kind of stuck since the
2686  * information used is all that's available in older dump files.
2687  */
2688 static void
2689 _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
2690 {
2691         const char *type = te->desc;
2692
2693         /* Use ALTER TABLE for views and sequences */
2694         if (strcmp(type, "VIEW") == 0 || strcmp(type, "SEQUENCE") == 0)
2695                 type = "TABLE";
2696
2697         /* objects named by a schema and name */
2698         if (strcmp(type, "CONVERSION") == 0 ||
2699                 strcmp(type, "DOMAIN") == 0 ||
2700                 strcmp(type, "TABLE") == 0 ||
2701                 strcmp(type, "TYPE") == 0 ||
2702                 strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
2703                 strcmp(type, "TEXT SEARCH CONFIGURATION") == 0)
2704         {
2705                 appendPQExpBuffer(buf, "%s ", type);
2706                 if (te->namespace && te->namespace[0])  /* is null pre-7.3 */
2707                         appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
2708
2709                 /*
2710                  * Pre-7.3 pg_dump would sometimes (not always) put a fmtId'd name
2711                  * into te->tag for an index. This check is heuristic, so make its
2712                  * scope as narrow as possible.
2713                  */
2714                 if (AH->version < K_VERS_1_7 &&
2715                         te->tag[0] == '"' &&
2716                         te->tag[strlen(te->tag) - 1] == '"' &&
2717                         strcmp(type, "INDEX") == 0)
2718                         appendPQExpBuffer(buf, "%s", te->tag);
2719                 else
2720                         appendPQExpBuffer(buf, "%s", fmtId(te->tag));
2721                 return;
2722         }
2723
2724         /* objects named by just a name */
2725         if (strcmp(type, "DATABASE") == 0 ||
2726                 strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
2727                 strcmp(type, "SCHEMA") == 0 ||
2728                 strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
2729                 strcmp(type, "SERVER") == 0 ||
2730                 strcmp(type, "USER MAPPING") == 0)
2731         {
2732                 appendPQExpBuffer(buf, "%s %s", type, fmtId(te->tag));
2733                 return;
2734         }
2735
2736         /* BLOBs just have a name, but it's numeric so must not use fmtId */
2737         if (strcmp(type, "BLOB") == 0)
2738         {
2739                 appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
2740                 return;
2741         }
2742
2743         /*
2744          * These object types require additional decoration.  Fortunately, the
2745          * information needed is exactly what's in the DROP command.
2746          */
2747         if (strcmp(type, "AGGREGATE") == 0 ||
2748                 strcmp(type, "FUNCTION") == 0 ||
2749                 strcmp(type, "OPERATOR") == 0 ||
2750                 strcmp(type, "OPERATOR CLASS") == 0 ||
2751                 strcmp(type, "OPERATOR FAMILY") == 0)
2752         {
2753                 /* Chop "DROP " off the front and make a modifiable copy */
2754                 char       *first = strdup(te->dropStmt + 5);
2755                 char       *last;
2756
2757                 /* point to last character in string */
2758                 last = first + strlen(first) - 1;
2759
2760                 /* Strip off any ';' or '\n' at the end */
2761                 while (last >= first && (*last == '\n' || *last == ';'))
2762                         last--;
2763                 *(last + 1) = '\0';
2764
2765                 appendPQExpBufferStr(buf, first);
2766
2767                 free(first);
2768                 return;
2769         }
2770
2771         write_msg(modulename, "WARNING: don't know how to set owner for object type %s\n",
2772                           type);
2773 }
2774
2775 static void
2776 _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isData, bool acl_pass)
2777 {
2778         /* ACLs are dumped only during acl pass */
2779         if (acl_pass)
2780         {
2781                 if (!_tocEntryIsACL(te))
2782                         return;
2783         }
2784         else
2785         {
2786                 if (_tocEntryIsACL(te))
2787                         return;
2788         }
2789
2790         /*
2791          * Avoid dumping the public schema, as it will already be created ...
2792          * unless we are using --clean mode, in which case it's been deleted and
2793          * we'd better recreate it.  Likewise for its comment, if any.
2794          */
2795         if (!ropt->dropSchema)
2796         {
2797                 if (strcmp(te->desc, "SCHEMA") == 0 &&
2798                         strcmp(te->tag, "public") == 0)
2799                         return;
2800                 /* The comment restore would require super-user privs, so avoid it. */
2801                 if (strcmp(te->desc, "COMMENT") == 0 &&
2802                         strcmp(te->tag, "SCHEMA public") == 0)
2803                         return;
2804         }
2805
2806         /* Select owner, schema, and tablespace as necessary */
2807         _becomeOwner(AH, te);
2808         _selectOutputSchema(AH, te->namespace);
2809         _selectTablespace(AH, te->tablespace);
2810
2811         /* Set up OID mode too */
2812         if (strcmp(te->desc, "TABLE") == 0)
2813                 _setWithOids(AH, te);
2814
2815         /* Emit header comment for item */
2816         if (!AH->noTocComments)
2817         {
2818                 const char *pfx;
2819
2820                 if (isData)
2821                         pfx = "Data for ";
2822                 else
2823                         pfx = "";
2824
2825                 ahprintf(AH, "--\n");
2826                 if (AH->public.verbose)
2827                 {
2828                         ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
2829                                          te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
2830                         if (te->nDeps > 0)
2831                         {
2832                                 int                     i;
2833
2834                                 ahprintf(AH, "-- Dependencies:");
2835                                 for (i = 0; i < te->nDeps; i++)
2836                                         ahprintf(AH, " %d", te->dependencies[i]);
2837                                 ahprintf(AH, "\n");
2838                         }
2839                 }
2840                 ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
2841                                  pfx, te->tag, te->desc,
2842                                  te->namespace ? te->namespace : "-",
2843                                  ropt->noOwner ? "-" : te->owner);
2844                 if (te->tablespace && !ropt->noTablespace)
2845                         ahprintf(AH, "; Tablespace: %s", te->tablespace);
2846                 ahprintf(AH, "\n");
2847
2848                 if (AH->PrintExtraTocPtr !=NULL)
2849                         (*AH->PrintExtraTocPtr) (AH, te);
2850                 ahprintf(AH, "--\n\n");
2851         }
2852
2853         /*
2854          * Actually print the definition.
2855          *
2856          * Really crude hack for suppressing AUTHORIZATION clause that old pg_dump
2857          * versions put into CREATE SCHEMA.  We have to do this when --no-owner
2858          * mode is selected.  This is ugly, but I see no other good way ...
2859          */
2860         if (ropt->noOwner && strcmp(te->desc, "SCHEMA") == 0)
2861         {
2862                 ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
2863         }
2864         else
2865         {
2866                 if (strlen(te->defn) > 0)
2867                         ahprintf(AH, "%s\n\n", te->defn);
2868         }
2869
2870         /*
2871          * If we aren't using SET SESSION AUTH to determine ownership, we must
2872          * instead issue an ALTER OWNER command.  We assume that anything without
2873          * a DROP command is not a separately ownable object.  All the categories
2874          * with DROP commands must appear in one list or the other.
2875          */
2876         if (!ropt->noOwner && !ropt->use_setsessauth &&
2877                 strlen(te->owner) > 0 && strlen(te->dropStmt) > 0)
2878         {
2879                 if (strcmp(te->desc, "AGGREGATE") == 0 ||
2880                         strcmp(te->desc, "BLOB") == 0 ||
2881                         strcmp(te->desc, "CONVERSION") == 0 ||
2882                         strcmp(te->desc, "DATABASE") == 0 ||
2883                         strcmp(te->desc, "DOMAIN") == 0 ||
2884                         strcmp(te->desc, "FUNCTION") == 0 ||
2885                         strcmp(te->desc, "OPERATOR") == 0 ||
2886                         strcmp(te->desc, "OPERATOR CLASS") == 0 ||
2887                         strcmp(te->desc, "OPERATOR FAMILY") == 0 ||
2888                         strcmp(te->desc, "PROCEDURAL LANGUAGE") == 0 ||
2889                         strcmp(te->desc, "SCHEMA") == 0 ||
2890                         strcmp(te->desc, "TABLE") == 0 ||
2891                         strcmp(te->desc, "TYPE") == 0 ||
2892                         strcmp(te->desc, "VIEW") == 0 ||
2893                         strcmp(te->desc, "SEQUENCE") == 0 ||
2894                         strcmp(te->desc, "TEXT SEARCH DICTIONARY") == 0 ||
2895                         strcmp(te->desc, "TEXT SEARCH CONFIGURATION") == 0 ||
2896                         strcmp(te->desc, "FOREIGN DATA WRAPPER") == 0 ||
2897                         strcmp(te->desc, "SERVER") == 0)
2898                 {
2899                         PQExpBuffer temp = createPQExpBuffer();
2900
2901                         appendPQExpBuffer(temp, "ALTER ");
2902                         _getObjectDescription(temp, te, AH);
2903                         appendPQExpBuffer(temp, " OWNER TO %s;", fmtId(te->owner));
2904                         ahprintf(AH, "%s\n\n", temp->data);
2905                         destroyPQExpBuffer(temp);
2906                 }
2907                 else if (strcmp(te->desc, "CAST") == 0 ||
2908                                  strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2909                                  strcmp(te->desc, "CONSTRAINT") == 0 ||
2910                                  strcmp(te->desc, "DEFAULT") == 0 ||
2911                                  strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2912                                  strcmp(te->desc, "INDEX") == 0 ||
2913                                  strcmp(te->desc, "RULE") == 0 ||
2914                                  strcmp(te->desc, "TRIGGER") == 0 ||
2915                                  strcmp(te->desc, "USER MAPPING") == 0)
2916                 {
2917                         /* these object types don't have separate owners */
2918                 }
2919                 else
2920                 {
2921                         write_msg(modulename, "WARNING: don't know how to set owner for object type %s\n",
2922                                           te->desc);
2923                 }
2924         }
2925
2926         /*
2927          * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
2928          * commands, so we can no longer assume we know the current auth setting.
2929          */
2930         if (acl_pass)
2931         {
2932                 if (AH->currUser)
2933                         free(AH->currUser);
2934                 AH->currUser = NULL;
2935         }
2936 }
2937
2938 void
2939 WriteHead(ArchiveHandle *AH)
2940 {
2941         struct tm       crtm;
2942
2943         (*AH->WriteBufPtr) (AH, "PGDMP", 5);            /* Magic code */
2944         (*AH->WriteBytePtr) (AH, AH->vmaj);
2945         (*AH->WriteBytePtr) (AH, AH->vmin);
2946         (*AH->WriteBytePtr) (AH, AH->vrev);
2947         (*AH->WriteBytePtr) (AH, AH->intSize);
2948         (*AH->WriteBytePtr) (AH, AH->offSize);
2949         (*AH->WriteBytePtr) (AH, AH->format);
2950
2951 #ifndef HAVE_LIBZ
2952         if (AH->compression != 0)
2953                 write_msg(modulename, "WARNING: requested compression not available in this "
2954                                   "installation -- archive will be uncompressed\n");
2955
2956         AH->compression = 0;
2957 #endif
2958
2959         WriteInt(AH, AH->compression);
2960
2961         crtm = *localtime(&AH->createDate);
2962         WriteInt(AH, crtm.tm_sec);
2963         WriteInt(AH, crtm.tm_min);
2964         WriteInt(AH, crtm.tm_hour);
2965         WriteInt(AH, crtm.tm_mday);
2966         WriteInt(AH, crtm.tm_mon);
2967         WriteInt(AH, crtm.tm_year);
2968         WriteInt(AH, crtm.tm_isdst);
2969         WriteStr(AH, PQdb(AH->connection));
2970         WriteStr(AH, AH->public.remoteVersionStr);
2971         WriteStr(AH, PG_VERSION);
2972 }
2973
2974 void
2975 ReadHead(ArchiveHandle *AH)
2976 {
2977         char            tmpMag[7];
2978         int                     fmt;
2979         struct tm       crtm;
2980
2981         /* If we haven't already read the header... */
2982         if (!AH->readHeader)
2983         {
2984                 if ((*AH->ReadBufPtr) (AH, tmpMag, 5) != 5)
2985                         die_horribly(AH, modulename, "unexpected end of file\n");
2986
2987                 if (strncmp(tmpMag, "PGDMP", 5) != 0)
2988                         die_horribly(AH, modulename, "did not find magic string in file header\n");
2989
2990                 AH->vmaj = (*AH->ReadBytePtr) (AH);
2991                 AH->vmin = (*AH->ReadBytePtr) (AH);
2992
2993                 if (AH->vmaj > 1 || ((AH->vmaj == 1) && (AH->vmin > 0)))                /* Version > 1.0 */
2994                         AH->vrev = (*AH->ReadBytePtr) (AH);
2995                 else
2996                         AH->vrev = 0;
2997
2998                 AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
2999
3000
3001                 if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
3002                         die_horribly(AH, modulename, "unsupported version (%d.%d) in file header\n",
3003                                                  AH->vmaj, AH->vmin);
3004
3005                 AH->intSize = (*AH->ReadBytePtr) (AH);
3006                 if (AH->intSize > 32)
3007                         die_horribly(AH, modulename, "sanity check on integer size (%lu) failed\n",
3008                                                  (unsigned long) AH->intSize);
3009
3010                 if (AH->intSize > sizeof(int))
3011                         write_msg(modulename, "WARNING: archive was made on a machine with larger integers, some operations might fail\n");
3012
3013                 if (AH->version >= K_VERS_1_7)
3014                         AH->offSize = (*AH->ReadBytePtr) (AH);
3015                 else
3016                         AH->offSize = AH->intSize;
3017
3018                 fmt = (*AH->ReadBytePtr) (AH);
3019
3020                 if (AH->format != fmt)
3021                         die_horribly(AH, modulename, "expected format (%d) differs from format found in file (%d)\n",
3022                                                  AH->format, fmt);
3023         }
3024
3025         if (AH->version >= K_VERS_1_2)
3026         {
3027                 if (AH->version < K_VERS_1_4)
3028                         AH->compression = (*AH->ReadBytePtr) (AH);
3029                 else
3030                         AH->compression = ReadInt(AH);
3031         }
3032         else
3033                 AH->compression = Z_DEFAULT_COMPRESSION;
3034
3035 #ifndef HAVE_LIBZ
3036         if (AH->compression != 0)
3037                 write_msg(modulename, "WARNING: archive is compressed, but this installation does not support compression -- no data will be available\n");
3038 #endif
3039
3040         if (AH->version >= K_VERS_1_4)
3041         {
3042                 crtm.tm_sec = ReadInt(AH);
3043                 crtm.tm_min = ReadInt(AH);
3044                 crtm.tm_hour = ReadInt(AH);
3045                 crtm.tm_mday = ReadInt(AH);
3046                 crtm.tm_mon = ReadInt(AH);
3047                 crtm.tm_year = ReadInt(AH);
3048                 crtm.tm_isdst = ReadInt(AH);
3049
3050                 AH->archdbname = ReadStr(AH);
3051
3052                 AH->createDate = mktime(&crtm);
3053
3054                 if (AH->createDate == (time_t) -1)
3055                         write_msg(modulename, "WARNING: invalid creation date in header\n");
3056         }
3057
3058         if (AH->version >= K_VERS_1_10)
3059         {
3060                 AH->archiveRemoteVersion = ReadStr(AH);
3061                 AH->archiveDumpVersion = ReadStr(AH);
3062         }
3063 }
3064
3065
3066 /*
3067  * checkSeek
3068  *        check to see if fseek can be performed.
3069  */
3070 bool
3071 checkSeek(FILE *fp)
3072 {
3073         if (fseeko(fp, 0, SEEK_CUR) != 0)
3074                 return false;
3075         else if (sizeof(pgoff_t) > sizeof(long))
3076         {
3077                 /*
3078                  * At this point, pgoff_t is too large for long, so we return based on
3079                  * whether an pgoff_t version of fseek is available.
3080                  */
3081 #ifdef HAVE_FSEEKO
3082                 return true;
3083 #else
3084                 return false;
3085 #endif
3086         }
3087         else
3088                 return true;
3089 }
3090
3091
3092 /*
3093  * dumpTimestamp
3094  */
3095 static void
3096 dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
3097 {
3098         char            buf[256];
3099
3100         /*
3101          * We don't print the timezone on Win32, because the names are long and
3102          * localized, which means they may contain characters in various random
3103          * encodings; this has been seen to cause encoding errors when reading the
3104          * dump script.
3105          */
3106         if (strftime(buf, sizeof(buf),
3107 #ifndef WIN32
3108                                  "%Y-%m-%d %H:%M:%S %Z",
3109 #else
3110                                  "%Y-%m-%d %H:%M:%S",
3111 #endif
3112                                  localtime(&tim)) != 0)
3113                 ahprintf(AH, "-- %s %s\n\n", msg, buf);
3114 }
3115
3116
3117 /*
3118  * Main engine for parallel restore.
3119  *
3120  * Work is done in three phases.
3121  * First we process tocEntries until we come to one that is marked
3122  * SECTION_DATA or SECTION_POST_DATA, in a single connection, just as for a
3123  * standard restore.  Second we process the remaining non-ACL steps in
3124  * parallel worker children (threads on Windows, processes on Unix), each of
3125  * which connects separately to the database.  Finally we process all the ACL
3126  * entries in a single connection (that happens back in RestoreArchive).
3127  */
3128 static void
3129 restore_toc_entries_parallel(ArchiveHandle *AH)
3130 {
3131         RestoreOptions *ropt = AH->ropt;
3132         int                     n_slots = ropt->number_of_jobs;
3133         ParallelSlot *slots;
3134         int                     work_status;
3135         int                     next_slot;
3136         TocEntry        pending_list;
3137         TocEntry        ready_list;
3138         TocEntry   *next_work_item;
3139         thandle         ret_child;
3140         TocEntry   *te;
3141
3142         ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
3143
3144         /* we haven't got round to making this work for all archive formats */
3145         if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
3146                 die_horribly(AH, modulename, "parallel restore is not supported with this archive file format\n");
3147
3148         /* doesn't work if the archive represents dependencies as OIDs, either */
3149         if (AH->version < K_VERS_1_8)
3150                 die_horribly(AH, modulename, "parallel restore is not supported with archives made by pre-8.0 pg_dump\n");
3151
3152         slots = (ParallelSlot *) calloc(sizeof(ParallelSlot), n_slots);
3153
3154         /* Adjust dependency information */
3155         fix_dependencies(AH);
3156
3157         /*
3158          * Do all the early stuff in a single connection in the parent. There's no
3159          * great point in running it in parallel, in fact it will actually run
3160          * faster in a single connection because we avoid all the connection and
3161          * setup overhead.
3162          */
3163         for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
3164         {
3165                 if (next_work_item->section == SECTION_DATA ||
3166                         next_work_item->section == SECTION_POST_DATA)
3167                         break;
3168
3169                 ahlog(AH, 1, "processing item %d %s %s\n",
3170                           next_work_item->dumpId,
3171                           next_work_item->desc, next_work_item->tag);
3172
3173                 (void) restore_toc_entry(AH, next_work_item, ropt, false);
3174
3175                 /* there should be no touch of ready_list here, so pass NULL */
3176                 reduce_dependencies(AH, next_work_item, NULL);
3177         }
3178
3179         /*
3180          * Now close parent connection in prep for parallel steps.      We do this
3181          * mainly to ensure that we don't exceed the specified number of parallel
3182          * connections.
3183          */
3184         PQfinish(AH->connection);
3185         AH->connection = NULL;
3186
3187         /* blow away any transient state from the old connection */
3188         if (AH->currUser)
3189                 free(AH->currUser);
3190         AH->currUser = NULL;
3191         if (AH->currSchema)
3192                 free(AH->currSchema);
3193         AH->currSchema = NULL;
3194         if (AH->currTablespace)
3195                 free(AH->currTablespace);
3196         AH->currTablespace = NULL;
3197         AH->currWithOids = -1;
3198
3199         /*
3200          * Initialize the lists of pending and ready items.  After this setup, the
3201          * pending list is everything that needs to be done but is blocked by one
3202          * or more dependencies, while the ready list contains items that have no
3203          * remaining dependencies.      Note: we don't yet filter out entries that
3204          * aren't going to be restored.  They might participate in dependency
3205          * chains connecting entries that should be restored, so we treat them as
3206          * live until we actually process them.
3207          */
3208         par_list_header_init(&pending_list);
3209         par_list_header_init(&ready_list);
3210         for (; next_work_item != AH->toc; next_work_item = next_work_item->next)
3211         {
3212                 if (next_work_item->depCount > 0)
3213                         par_list_append(&pending_list, next_work_item);
3214                 else
3215                         par_list_append(&ready_list, next_work_item);
3216         }
3217
3218         /*
3219          * main parent loop
3220          *
3221          * Keep going until there is no worker still running AND there is no work
3222          * left to be done.
3223          */
3224
3225         ahlog(AH, 1, "entering main parallel loop\n");
3226
3227         while ((next_work_item = get_next_work_item(AH, &ready_list,
3228                                                                                                 slots, n_slots)) != NULL ||
3229                    work_in_progress(slots, n_slots))
3230         {
3231                 if (next_work_item != NULL)
3232                 {
3233                         teReqs          reqs;
3234
3235                         /* If not to be dumped, don't waste time launching a worker */
3236                         reqs = _tocEntryRequired(next_work_item, AH->ropt, false);
3237                         if ((reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
3238                         {
3239                                 ahlog(AH, 1, "skipping item %d %s %s\n",
3240                                           next_work_item->dumpId,
3241                                           next_work_item->desc, next_work_item->tag);
3242
3243                                 par_list_remove(next_work_item);
3244                                 reduce_dependencies(AH, next_work_item, &ready_list);
3245
3246                                 continue;
3247                         }
3248
3249                         if ((next_slot = get_next_slot(slots, n_slots)) != NO_SLOT)
3250                         {
3251                                 /* There is work still to do and a worker slot available */
3252                                 thandle         child;
3253                                 RestoreArgs *args;
3254
3255                                 ahlog(AH, 1, "launching item %d %s %s\n",
3256                                           next_work_item->dumpId,
3257                                           next_work_item->desc, next_work_item->tag);
3258
3259                                 par_list_remove(next_work_item);
3260
3261                                 /* this memory is dealloced in mark_work_done() */
3262                                 args = malloc(sizeof(RestoreArgs));
3263                                 args->AH = CloneArchive(AH);
3264                                 args->te = next_work_item;
3265
3266                                 /* run the step in a worker child */
3267                                 child = spawn_restore(args);
3268
3269                                 slots[next_slot].child_id = child;
3270                                 slots[next_slot].args = args;
3271
3272                                 continue;
3273                         }
3274                 }
3275
3276                 /*
3277                  * If we get here there must be work being done.  Either there is no
3278                  * work available to schedule (and work_in_progress returned true) or
3279                  * there are no slots available.  So we wait for a worker to finish,
3280                  * and process the result.
3281                  */
3282                 ret_child = reap_child(slots, n_slots, &work_status);
3283
3284                 if (WIFEXITED(work_status))
3285                 {
3286                         mark_work_done(AH, &ready_list,
3287                                                    ret_child, WEXITSTATUS(work_status),
3288                                                    slots, n_slots);
3289                 }
3290                 else
3291                 {
3292                         die_horribly(AH, modulename, "worker process crashed: status %d\n",
3293                                                  work_status);
3294                 }
3295         }
3296
3297         ahlog(AH, 1, "finished main parallel loop\n");
3298
3299         /*
3300          * Now reconnect the single parent connection.
3301          */
3302         ConnectDatabase((Archive *) AH, ropt->dbname,
3303                                         ropt->pghost, ropt->pgport, ropt->username,
3304                                         ropt->promptPassword);
3305
3306         _doSetFixedOutputState(AH);
3307
3308         /*
3309          * Make sure there is no non-ACL work left due to, say, circular
3310          * dependencies, or some other pathological condition. If so, do it in the
3311          * single parent connection.
3312          */
3313         for (te = pending_list.par_next; te != &pending_list; te = te->par_next)
3314         {
3315                 ahlog(AH, 1, "processing missed item %d %s %s\n",
3316                           te->dumpId, te->desc, te->tag);
3317                 (void) restore_toc_entry(AH, te, ropt, false);
3318         }
3319
3320         /* The ACLs will be handled back in RestoreArchive. */
3321 }
3322
3323 /*
3324  * create a worker child to perform a restore step in parallel
3325  */
3326 static thandle
3327 spawn_restore(RestoreArgs *args)
3328 {
3329         thandle         child;
3330
3331         /* Ensure stdio state is quiesced before forking */
3332         fflush(NULL);
3333
3334 #ifndef WIN32
3335         child = fork();
3336         if (child == 0)
3337         {
3338                 /* in child process */
3339                 parallel_restore(args);
3340                 die_horribly(args->AH, modulename,
3341                                          "parallel_restore should not return\n");
3342         }
3343         else if (child < 0)
3344         {
3345                 /* fork failed */
3346                 die_horribly(args->AH, modulename,
3347                                          "could not create worker process: %s\n",
3348                                          strerror(errno));
3349         }
3350 #else
3351         child = (HANDLE) _beginthreadex(NULL, 0, (void *) parallel_restore,
3352                                                                         args, 0, NULL);
3353         if (child == 0)
3354                 die_horribly(args->AH, modulename,
3355                                          "could not create worker thread: %s\n",
3356                                          strerror(errno));
3357 #endif
3358
3359         return child;
3360 }
3361
3362 /*
3363  *      collect status from a completed worker child
3364  */
3365 static thandle
3366 reap_child(ParallelSlot *slots, int n_slots, int *work_status)
3367 {
3368 #ifndef WIN32
3369         /* Unix is so much easier ... */
3370         return wait(work_status);
3371 #else
3372         static HANDLE *handles = NULL;
3373         int                     hindex,
3374                                 snum,
3375                                 tnum;
3376         thandle         ret_child;
3377         DWORD           res;
3378
3379         /* first time around only, make space for handles to listen on */
3380         if (handles == NULL)
3381                 handles = (HANDLE *) calloc(sizeof(HANDLE), n_slots);
3382
3383         /* set up list of handles to listen to */
3384         for (snum = 0, tnum = 0; snum < n_slots; snum++)
3385                 if (slots[snum].child_id != 0)
3386                         handles[tnum++] = slots[snum].child_id;
3387
3388         /* wait for one to finish */
3389         hindex = WaitForMultipleObjects(tnum, handles, false, INFINITE);
3390
3391         /* get handle of finished thread */
3392         ret_child = handles[hindex - WAIT_OBJECT_0];
3393
3394         /* get the result */
3395         GetExitCodeThread(ret_child, &res);
3396         *work_status = res;
3397
3398         /* dispose of handle to stop leaks */
3399         CloseHandle(ret_child);
3400
3401         return ret_child;
3402 #endif
3403 }
3404
3405 /*
3406  * are we doing anything now?
3407  */
3408 static bool
3409 work_in_progress(ParallelSlot *slots, int n_slots)
3410 {
3411         int                     i;
3412
3413         for (i = 0; i < n_slots; i++)
3414         {
3415                 if (slots[i].child_id != 0)
3416                         return true;
3417         }
3418         return false;
3419 }
3420
3421 /*
3422  * find the first free parallel slot (if any).
3423  */
3424 static int
3425 get_next_slot(ParallelSlot *slots, int n_slots)
3426 {
3427         int                     i;
3428
3429         for (i = 0; i < n_slots; i++)
3430         {
3431                 if (slots[i].child_id == 0)
3432                         return i;
3433         }
3434         return NO_SLOT;
3435 }
3436
3437
3438 /*
3439  * Check if te1 has an exclusive lock requirement for an item that te2 also
3440  * requires, whether or not te2's requirement is for an exclusive lock.
3441  */
3442 static bool
3443 has_lock_conflicts(TocEntry *te1, TocEntry *te2)
3444 {
3445         int                     j,
3446                                 k;
3447
3448         for (j = 0; j < te1->nLockDeps; j++)
3449         {
3450                 for (k = 0; k < te2->nDeps; k++)
3451                 {
3452                         if (te1->lockDeps[j] == te2->dependencies[k])
3453                                 return true;
3454                 }
3455         }
3456         return false;
3457 }
3458
3459
3460 /*
3461  * Initialize the header of a parallel-processing list.
3462  *
3463  * These are circular lists with a dummy TocEntry as header, just like the
3464  * main TOC list; but we use separate list links so that an entry can be in
3465  * the main TOC list as well as in a parallel-processing list.
3466  */
3467 static void
3468 par_list_header_init(TocEntry *l)
3469 {
3470         l->par_prev = l->par_next = l;
3471 }
3472
3473 /* Append te to the end of the parallel-processing list headed by l */
3474 static void
3475 par_list_append(TocEntry *l, TocEntry *te)
3476 {
3477         te->par_prev = l->par_prev;
3478         l->par_prev->par_next = te;
3479         l->par_prev = te;
3480         te->par_next = l;
3481 }
3482
3483 /* Remove te from whatever parallel-processing list it's in */
3484 static void
3485 par_list_remove(TocEntry *te)
3486 {
3487         te->par_prev->par_next = te->par_next;
3488         te->par_next->par_prev = te->par_prev;
3489         te->par_prev = NULL;
3490         te->par_next = NULL;
3491 }
3492
3493
3494 /*
3495  * Find the next work item (if any) that is capable of being run now.
3496  *
3497  * To qualify, the item must have no remaining dependencies
3498  * and no requirements for locks that are incompatible with
3499  * items currently running.  Items in the ready_list are known to have
3500  * no remaining dependencies, but we have to check for lock conflicts.
3501  *
3502  * Note that the returned item has *not* been removed from ready_list.
3503  * The caller must do that after successfully dispatching the item.
3504  *
3505  * pref_non_data is for an alternative selection algorithm that gives
3506  * preference to non-data items if there is already a data load running.
3507  * It is currently disabled.
3508  */
3509 static TocEntry *
3510 get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
3511                                    ParallelSlot *slots, int n_slots)
3512 {
3513         bool            pref_non_data = false;  /* or get from AH->ropt */
3514         TocEntry   *data_te = NULL;
3515         TocEntry   *te;
3516         int                     i,
3517                                 k;
3518
3519         /*
3520          * Bogus heuristics for pref_non_data
3521          */
3522         if (pref_non_data)
3523         {
3524                 int                     count = 0;
3525
3526                 for (k = 0; k < n_slots; k++)
3527                         if (slots[k].args->te != NULL &&
3528                                 slots[k].args->te->section == SECTION_DATA)
3529                                 count++;
3530                 if (n_slots == 0 || count * 4 < n_slots)
3531                         pref_non_data = false;
3532         }
3533
3534         /*
3535          * Search the ready_list until we find a suitable item.
3536          */
3537         for (te = ready_list->par_next; te != ready_list; te = te->par_next)
3538         {
3539                 bool            conflicts = false;
3540
3541                 /*
3542                  * Check to see if the item would need exclusive lock on something
3543                  * that a currently running item also needs lock on, or vice versa. If
3544                  * so, we don't want to schedule them together.
3545                  */
3546                 for (i = 0; i < n_slots && !conflicts; i++)
3547                 {
3548                         TocEntry   *running_te;
3549
3550                         if (slots[i].args == NULL)
3551                                 continue;
3552                         running_te = slots[i].args->te;
3553
3554                         if (has_lock_conflicts(te, running_te) ||
3555                                 has_lock_conflicts(running_te, te))
3556                         {
3557                                 conflicts = true;
3558                                 break;
3559                         }
3560                 }
3561
3562                 if (conflicts)
3563                         continue;
3564
3565                 if (pref_non_data && te->section == SECTION_DATA)
3566                 {
3567                         if (data_te == NULL)
3568                                 data_te = te;
3569                         continue;
3570                 }
3571
3572                 /* passed all tests, so this item can run */
3573                 return te;
3574         }
3575
3576         if (data_te != NULL)
3577                 return data_te;
3578
3579         ahlog(AH, 2, "no item ready\n");
3580         return NULL;
3581 }
3582
3583
3584 /*
3585  * Restore a single TOC item in parallel with others
3586  *
3587  * this is the procedure run as a thread (Windows) or a
3588  * separate process (everything else).
3589  */
3590 static parallel_restore_result
3591 parallel_restore(RestoreArgs *args)
3592 {
3593         ArchiveHandle *AH = args->AH;
3594         TocEntry   *te = args->te;
3595         RestoreOptions *ropt = AH->ropt;
3596         int                     retval;
3597
3598         /*
3599          * Close and reopen the input file so we have a private file pointer that
3600          * doesn't stomp on anyone else's file pointer, if we're actually going to
3601          * need to read from the file. Otherwise, just close it except on Windows,
3602          * where it will possibly be needed by other threads.
3603          *
3604          * Note: on Windows, since we are using threads not processes, the reopen
3605          * call *doesn't* close the original file pointer but just open a new one.
3606          */
3607         if (te->section == SECTION_DATA)
3608                 (AH->ReopenPtr) (AH);
3609 #ifndef WIN32
3610         else
3611                 (AH->ClosePtr) (AH);
3612 #endif
3613
3614         /*
3615          * We need our own database connection, too
3616          */
3617         ConnectDatabase((Archive *) AH, ropt->dbname,
3618                                         ropt->pghost, ropt->pgport, ropt->username,
3619                                         ropt->promptPassword);
3620
3621         _doSetFixedOutputState(AH);
3622
3623         /* Restore the TOC item */
3624         retval = restore_toc_entry(AH, te, ropt, true);
3625
3626         /* And clean up */
3627         PQfinish(AH->connection);
3628         AH->connection = NULL;
3629
3630         /* If we reopened the file, we are done with it, so close it now */
3631         if (te->section == SECTION_DATA)
3632                 (AH->ClosePtr) (AH);
3633
3634         if (retval == 0 && AH->public.n_errors)
3635                 retval = WORKER_IGNORED_ERRORS;
3636
3637 #ifndef WIN32
3638         exit(retval);
3639 #else
3640         return retval;
3641 #endif
3642 }
3643
3644
3645 /*
3646  * Housekeeping to be done after a step has been parallel restored.
3647  *
3648  * Clear the appropriate slot, free all the extra memory we allocated,
3649  * update status, and reduce the dependency count of any dependent items.
3650  */
3651 static void
3652 mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
3653                            thandle worker, int status,
3654                            ParallelSlot *slots, int n_slots)
3655 {
3656         TocEntry   *te = NULL;
3657         int                     i;
3658
3659         for (i = 0; i < n_slots; i++)
3660         {
3661                 if (slots[i].child_id == worker)
3662                 {
3663                         slots[i].child_id = 0;
3664                         te = slots[i].args->te;
3665                         DeCloneArchive(slots[i].args->AH);
3666                         free(slots[i].args);
3667                         slots[i].args = NULL;
3668
3669                         break;
3670                 }
3671         }
3672
3673         if (te == NULL)
3674                 die_horribly(AH, modulename, "could not find slot of finished worker\n");
3675
3676         ahlog(AH, 1, "finished item %d %s %s\n",
3677                   te->dumpId, te->desc, te->tag);
3678
3679         if (status == WORKER_CREATE_DONE)
3680                 mark_create_done(AH, te);
3681         else if (status == WORKER_INHIBIT_DATA)
3682         {
3683                 inhibit_data_for_failed_table(AH, te);
3684                 AH->public.n_errors++;
3685         }
3686         else if (status == WORKER_IGNORED_ERRORS)
3687                 AH->public.n_errors++;
3688         else if (status != 0)
3689                 die_horribly(AH, modulename, "worker process failed: exit code %d\n",
3690                                          status);
3691
3692         reduce_dependencies(AH, te, ready_list);
3693 }
3694
3695
3696 /*
3697  * Process the dependency information into a form useful for parallel restore.
3698  *
3699  * We set up depCount fields that are the number of as-yet-unprocessed
3700  * dependencies for each TOC entry.
3701  *
3702  * We also identify locking dependencies so that we can avoid trying to
3703  * schedule conflicting items at the same time.
3704  */
3705 static void
3706 fix_dependencies(ArchiveHandle *AH)
3707 {
3708         TocEntry  **tocsByDumpId;
3709         TocEntry   *te;
3710         DumpId          maxDumpId;
3711         int                     i;
3712
3713         /*
3714          * For some of the steps here, it is convenient to have an array that
3715          * indexes the TOC entries by dump ID, rather than searching the TOC list
3716          * repeatedly.  Entries for dump IDs not present in the TOC will be NULL.
3717          *
3718          * NOTE: because maxDumpId is just the highest dump ID defined in the
3719          * archive, there might be dependencies for IDs > maxDumpId.  All uses of
3720          * this array must guard against out-of-range dependency numbers.
3721          *
3722          * Also, initialize the depCount fields, and make sure all the TOC items
3723          * are marked as not being in any parallel-processing list.
3724          */
3725         maxDumpId = AH->maxDumpId;
3726         tocsByDumpId = (TocEntry **) calloc(maxDumpId, sizeof(TocEntry *));
3727         for (te = AH->toc->next; te != AH->toc; te = te->next)
3728         {
3729                 tocsByDumpId[te->dumpId - 1] = te;
3730                 te->depCount = te->nDeps;
3731                 te->par_prev = NULL;
3732                 te->par_next = NULL;
3733         }
3734
3735         /*
3736          * POST_DATA items that are shown as depending on a table need to be
3737          * re-pointed to depend on that table's data, instead.  This ensures they
3738          * won't get scheduled until the data has been loaded.  We handle this by
3739          * first finding TABLE/TABLE DATA pairs and then scanning all the
3740          * dependencies.
3741          *
3742          * Note: currently, a TABLE DATA should always have exactly one
3743          * dependency, on its TABLE item.  So we don't bother to search, but look
3744          * just at the first dependency.  We do trouble to make sure that it's a
3745          * TABLE, if possible.  However, if the dependency isn't in the archive
3746          * then just assume it was a TABLE; this is to cover cases where the table
3747          * was suppressed but we have the data and some dependent post-data items.
3748          */
3749         for (te = AH->toc->next; te != AH->toc; te = te->next)
3750         {
3751                 if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
3752                 {
3753                         DumpId          tableId = te->dependencies[0];
3754
3755                         if (tableId > maxDumpId ||
3756                                 tocsByDumpId[tableId - 1] == NULL ||
3757                                 strcmp(tocsByDumpId[tableId - 1]->desc, "TABLE") == 0)
3758                         {
3759                                 repoint_table_dependencies(AH, tableId, te->dumpId);
3760                         }
3761                 }
3762         }
3763
3764         /*
3765          * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
3766          * COMMENTS to BLOBS.  Cope.  (We assume there's only one BLOBS and only
3767          * one BLOB COMMENTS in such files.)
3768          */
3769         if (AH->version < K_VERS_1_11)
3770         {
3771                 for (te = AH->toc->next; te != AH->toc; te = te->next)
3772                 {
3773                         if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
3774                         {
3775                                 TocEntry   *te2;
3776
3777                                 for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
3778                                 {
3779                                         if (strcmp(te2->desc, "BLOBS") == 0)
3780                                         {
3781                                                 te->dependencies = (DumpId *) malloc(sizeof(DumpId));
3782                                                 te->dependencies[0] = te2->dumpId;
3783                                                 te->nDeps++;
3784                                                 te->depCount++;
3785                                                 break;
3786                                         }
3787                                 }
3788                                 break;
3789                         }
3790                 }
3791         }
3792
3793         /*
3794          * It is possible that the dependencies list items that are not in the
3795          * archive at all.      Subtract such items from the depCounts.
3796          */
3797         for (te = AH->toc->next; te != AH->toc; te = te->next)
3798         {
3799                 for (i = 0; i < te->nDeps; i++)
3800                 {
3801                         DumpId          depid = te->dependencies[i];
3802
3803                         if (depid > maxDumpId || tocsByDumpId[depid - 1] == NULL)
3804                                 te->depCount--;
3805                 }
3806         }
3807
3808         /*
3809          * Lastly, work out the locking dependencies.
3810          */
3811         for (te = AH->toc->next; te != AH->toc; te = te->next)
3812         {
3813                 te->lockDeps = NULL;
3814                 te->nLockDeps = 0;
3815                 identify_locking_dependencies(te, tocsByDumpId, maxDumpId);
3816         }
3817
3818         free(tocsByDumpId);
3819 }
3820
3821 /*
3822  * Change dependencies on tableId to depend on tableDataId instead,
3823  * but only in POST_DATA items.
3824  */
3825 static void
3826 repoint_table_dependencies(ArchiveHandle *AH,
3827                                                    DumpId tableId, DumpId tableDataId)
3828 {
3829         TocEntry   *te;
3830         int                     i;
3831
3832         for (te = AH->toc->next; te != AH->toc; te = te->next)
3833         {
3834                 if (te->section != SECTION_POST_DATA)
3835                         continue;
3836                 for (i = 0; i < te->nDeps; i++)
3837                 {
3838                         if (te->dependencies[i] == tableId)
3839                         {
3840                                 te->dependencies[i] = tableDataId;
3841                                 ahlog(AH, 2, "transferring dependency %d -> %d to %d\n",
3842                                           te->dumpId, tableId, tableDataId);
3843                         }
3844                 }
3845         }
3846 }
3847
3848 /*
3849  * Identify which objects we'll need exclusive lock on in order to restore
3850  * the given TOC entry (*other* than the one identified by the TOC entry
3851  * itself).  Record their dump IDs in the entry's lockDeps[] array.
3852  * tocsByDumpId[] is a convenience array (of size maxDumpId) to avoid
3853  * searching the TOC for each dependency.
3854  */
3855 static void
3856 identify_locking_dependencies(TocEntry *te,
3857                                                           TocEntry **tocsByDumpId,
3858                                                           DumpId maxDumpId)
3859 {
3860         DumpId     *lockids;
3861         int                     nlockids;
3862         int                     i;
3863
3864         /* Quick exit if no dependencies at all */
3865         if (te->nDeps == 0)
3866                 return;
3867
3868         /* Exit if this entry doesn't need exclusive lock on other objects */
3869         if (!(strcmp(te->desc, "CONSTRAINT") == 0 ||
3870                   strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
3871                   strcmp(te->desc, "FK CONSTRAINT") == 0 ||
3872                   strcmp(te->desc, "RULE") == 0 ||
3873                   strcmp(te->desc, "TRIGGER") == 0))
3874                 return;
3875
3876         /*
3877          * We assume the item requires exclusive lock on each TABLE DATA item
3878          * listed among its dependencies.  (This was originally a dependency on
3879          * the TABLE, but fix_dependencies repointed it to the data item. Note
3880          * that all the entry types we are interested in here are POST_DATA, so
3881          * they will all have been changed this way.)
3882          */
3883         lockids = (DumpId *) malloc(te->nDeps * sizeof(DumpId));
3884         nlockids = 0;
3885         for (i = 0; i < te->nDeps; i++)
3886         {
3887                 DumpId          depid = te->dependencies[i];
3888
3889                 if (depid <= maxDumpId && tocsByDumpId[depid - 1] &&
3890                         strcmp(tocsByDumpId[depid - 1]->desc, "TABLE DATA") == 0)
3891                         lockids[nlockids++] = depid;
3892         }
3893
3894         if (nlockids == 0)
3895         {
3896                 free(lockids);
3897                 return;
3898         }
3899
3900         te->lockDeps = realloc(lockids, nlockids * sizeof(DumpId));
3901         te->nLockDeps = nlockids;
3902 }
3903
3904 /*
3905  * Remove the specified TOC entry from the depCounts of items that depend on
3906  * it, thereby possibly making them ready-to-run.  Any pending item that
3907  * becomes ready should be moved to the ready list.
3908  */
3909 static void
3910 reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
3911 {
3912         DumpId          target = te->dumpId;
3913         int                     i;
3914
3915         ahlog(AH, 2, "reducing dependencies for %d\n", target);
3916
3917         /*
3918          * We must examine all entries, not only the ones after the target item,
3919          * because if the user used a -L switch then the original dependency-
3920          * respecting order has been destroyed by SortTocFromFile.
3921          */
3922         for (te = AH->toc->next; te != AH->toc; te = te->next)
3923         {
3924                 for (i = 0; i < te->nDeps; i++)
3925                 {
3926                         if (te->dependencies[i] == target)
3927                         {
3928                                 te->depCount--;
3929                                 if (te->depCount == 0 && te->par_prev != NULL)
3930                                 {
3931                                         /* It must be in the pending list, so remove it ... */
3932                                         par_list_remove(te);
3933                                         /* ... and add to ready_list */
3934                                         par_list_append(ready_list, te);
3935                                 }
3936                         }
3937                 }
3938         }
3939 }
3940
3941 /*
3942  * Set the created flag on the DATA member corresponding to the given
3943  * TABLE member
3944  */
3945 static void
3946 mark_create_done(ArchiveHandle *AH, TocEntry *te)
3947 {
3948         TocEntry   *tes;
3949
3950         for (tes = AH->toc->next; tes != AH->toc; tes = tes->next)
3951         {
3952                 if (strcmp(tes->desc, "TABLE DATA") == 0 &&
3953                         strcmp(tes->tag, te->tag) == 0 &&
3954                         strcmp(tes->namespace ? tes->namespace : "",
3955                                    te->namespace ? te->namespace : "") == 0)
3956                 {
3957                         tes->created = true;
3958                         break;
3959                 }
3960         }
3961 }
3962
3963 /*
3964  * Mark the DATA member corresponding to the given TABLE member
3965  * as not wanted
3966  */
3967 static void
3968 inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
3969 {
3970         RestoreOptions *ropt = AH->ropt;
3971         TocEntry   *tes;
3972
3973         ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
3974                   te->tag);
3975
3976         for (tes = AH->toc->next; tes != AH->toc; tes = tes->next)
3977         {
3978                 if (strcmp(tes->desc, "TABLE DATA") == 0 &&
3979                         strcmp(tes->tag, te->tag) == 0 &&
3980                         strcmp(tes->namespace ? tes->namespace : "",
3981                                    te->namespace ? te->namespace : "") == 0)
3982                 {
3983                         /* mark it unwanted; we assume idWanted array already exists */
3984                         ropt->idWanted[tes->dumpId - 1] = false;
3985                         break;
3986                 }
3987         }
3988 }
3989
3990
3991 /*
3992  * Clone and de-clone routines used in parallel restoration.
3993  *
3994  * Enough of the structure is cloned to ensure that there is no
3995  * conflict between different threads each with their own clone.
3996  *
3997  * These could be public, but no need at present.
3998  */
3999 static ArchiveHandle *
4000 CloneArchive(ArchiveHandle *AH)
4001 {
4002         ArchiveHandle *clone;
4003
4004         /* Make a "flat" copy */
4005         clone = (ArchiveHandle *) malloc(sizeof(ArchiveHandle));
4006         if (clone == NULL)
4007                 die_horribly(AH, modulename, "out of memory\n");
4008         memcpy(clone, AH, sizeof(ArchiveHandle));
4009
4010         /* Handle format-independent fields */
4011         clone->pgCopyBuf = createPQExpBuffer();
4012         clone->sqlBuf = createPQExpBuffer();
4013         clone->sqlparse.tagBuf = NULL;
4014
4015         /* The clone will have its own connection, so disregard connection state */
4016         clone->connection = NULL;
4017         clone->currUser = NULL;
4018         clone->currSchema = NULL;
4019         clone->currTablespace = NULL;
4020         clone->currWithOids = -1;
4021
4022         /* savedPassword must be local in case we change it while connecting */
4023         if (clone->savedPassword)
4024                 clone->savedPassword = strdup(clone->savedPassword);
4025
4026         /* clone has its own error count, too */
4027         clone->public.n_errors = 0;
4028
4029         /* Let the format-specific code have a chance too */
4030         (clone->ClonePtr) (clone);
4031
4032         return clone;
4033 }
4034
4035 /*
4036  * Release clone-local storage.
4037  *
4038  * Note: we assume any clone-local connection was already closed.
4039  */
4040 static void
4041 DeCloneArchive(ArchiveHandle *AH)
4042 {
4043         /* Clear format-specific state */
4044         (AH->DeClonePtr) (AH);
4045
4046         /* Clear state allocated by CloneArchive */
4047         destroyPQExpBuffer(AH->pgCopyBuf);
4048         destroyPQExpBuffer(AH->sqlBuf);
4049         if (AH->sqlparse.tagBuf)
4050                 destroyPQExpBuffer(AH->sqlparse.tagBuf);
4051
4052         /* Clear any connection-local state */
4053         if (AH->currUser)
4054                 free(AH->currUser);
4055         if (AH->currSchema)
4056                 free(AH->currSchema);
4057         if (AH->currTablespace)
4058                 free(AH->currTablespace);
4059         if (AH->savedPassword)
4060                 free(AH->savedPassword);
4061
4062         free(AH);
4063 }