]> granicus.if.org Git - postgresql/blob - src/bin/pg_dump/pg_backup_archiver.c
Rename pg_restore -m to -j, and add documentation about what good numbers
[postgresql] / src / bin / pg_dump / pg_backup_archiver.c
1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_archiver.c
4  *
5  *      Private implementation of the archiver routines.
6  *
7  *      See the headers to pg_restore for more details.
8  *
9  * Copyright (c) 2000, Philip Warner
10  *      Rights are granted to use this software in any way so long
11  *      as this notice is not removed.
12  *
13  *      The author is not responsible for loss or damages that may
14  *      result from its use.
15  *
16  *
17  * IDENTIFICATION
18  *              $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.168 2009/03/20 09:21:08 petere Exp $
19  *
20  *-------------------------------------------------------------------------
21  */
22
23 #include "pg_backup_db.h"
24 #include "dumputils.h"
25
26 #include <ctype.h>
27 #include <unistd.h>
28 #include <sys/types.h>
29 #include <sys/wait.h>
30
31 #ifdef WIN32
32 #include <io.h>
33 #endif
34
35 #include "libpq/libpq-fs.h"
36
37 /*
38  * Special exit values from worker children.  We reserve 0 for normal
39  * success; 1 and other small values should be interpreted as crashes.
40  */
41 #define WORKER_CREATE_DONE              10
42 #define WORKER_INHIBIT_DATA             11
43 #define WORKER_IGNORED_ERRORS   12
44
45 /*
46  * Unix uses exit to return result from worker child, so function is void.
47  * Windows thread result comes via function return.
48  */
49 #ifndef WIN32
50 #define parallel_restore_result void
51 #else
52 #define parallel_restore_result DWORD
53 #endif
54
55 /* IDs for worker children are either PIDs or thread handles */
56 #ifndef WIN32
57 #define thandle pid_t
58 #else
59 #define thandle HANDLE
60 #endif
61
62 typedef struct _restore_args
63 {
64         ArchiveHandle *AH;
65         TocEntry      *te;
66 } RestoreArgs;
67
68 typedef struct _parallel_slot
69 {
70         thandle         child_id;
71         RestoreArgs *args;
72 } ParallelSlot;
73
74 #define NO_SLOT (-1)
75
76 const char *progname;
77
78 static const char *modulename = gettext_noop("archiver");
79
80
81 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
82                  const int compression, ArchiveMode mode);
83 static void _getObjectDescription(PQExpBuffer buf, TocEntry *te,
84                                           ArchiveHandle *AH);
85 static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isData, bool acl_pass);
86
87
88 static void _doSetFixedOutputState(ArchiveHandle *AH);
89 static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
90 static void _doSetWithOids(ArchiveHandle *AH, const bool withOids);
91 static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
92 static void _becomeUser(ArchiveHandle *AH, const char *user);
93 static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
94 static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
95 static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
96 static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
97 static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
98 static teReqs _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls);
99 static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
100 static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
101 static TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id);
102 static void _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
103 static int      _discoverArchiveFormat(ArchiveHandle *AH);
104
105 static void dump_lo_buf(ArchiveHandle *AH);
106 static void _write_msg(const char *modulename, const char *fmt, va_list ap);
107 static void _die_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt, va_list ap);
108
109 static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
110 static OutputContext SetOutput(ArchiveHandle *AH, char *filename, int compression);
111 static void ResetOutput(ArchiveHandle *AH, OutputContext savedContext);
112
113 static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
114                                                          RestoreOptions *ropt, bool is_parallel);
115 static void restore_toc_entries_parallel(ArchiveHandle *AH);
116 static thandle spawn_restore(RestoreArgs *args);
117 static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status);
118 static bool work_in_progress(ParallelSlot *slots, int n_slots);
119 static int get_next_slot(ParallelSlot *slots, int n_slots);
120 static TocEntry *get_next_work_item(ArchiveHandle *AH,
121                                                                         TocEntry **first_unprocessed,
122                                                                         ParallelSlot *slots, int n_slots);
123 static parallel_restore_result parallel_restore(RestoreArgs *args);
124 static void mark_work_done(ArchiveHandle *AH, thandle worker, int status,
125                                                    ParallelSlot *slots, int n_slots);
126 static void fix_dependencies(ArchiveHandle *AH);
127 static void repoint_table_dependencies(ArchiveHandle *AH,
128                                                                            DumpId tableId, DumpId tableDataId);
129 static void identify_locking_dependencies(TocEntry *te,
130                                                                                   TocEntry **tocsByDumpId);
131 static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te);
132 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
133 static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
134 static ArchiveHandle *CloneArchive(ArchiveHandle *AH);
135 static void DeCloneArchive(ArchiveHandle *AH);
136
137
138 /*
139  *      Wrapper functions.
140  *
141  *      The objective it to make writing new formats and dumpers as simple
142  *      as possible, if necessary at the expense of extra function calls etc.
143  *
144  */
145
146
147 /* Create a new archive */
148 /* Public */
149 Archive *
150 CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
151                           const int compression, ArchiveMode mode)
152
153 {
154         ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, mode);
155
156         return (Archive *) AH;
157 }
158
159 /* Open an existing archive */
160 /* Public */
161 Archive *
162 OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
163 {
164         ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, archModeRead);
165
166         return (Archive *) AH;
167 }
168
169 /* Public */
170 void
171 CloseArchive(Archive *AHX)
172 {
173         int                     res = 0;
174         ArchiveHandle *AH = (ArchiveHandle *) AHX;
175
176         (*AH->ClosePtr) (AH);
177
178         /* Close the output */
179         if (AH->gzOut)
180                 res = GZCLOSE(AH->OF);
181         else if (AH->OF != stdout)
182                 res = fclose(AH->OF);
183
184         if (res != 0)
185                 die_horribly(AH, modulename, "could not close output file: %s\n",
186                                          strerror(errno));
187 }
188
189 /* Public */
190 void
191 RestoreArchive(Archive *AHX, RestoreOptions *ropt)
192 {
193         ArchiveHandle *AH = (ArchiveHandle *) AHX;
194         TocEntry   *te;
195         teReqs          reqs;
196         OutputContext sav;
197
198         AH->ropt = ropt;
199         AH->stage = STAGE_INITIALIZING;
200
201         /*
202          * Check for nonsensical option combinations.
203          *
204          * NB: create+dropSchema is useless because if you're creating the DB,
205          * there's no need to drop individual items in it.  Moreover, if we tried
206          * to do that then we'd issue the drops in the database initially
207          * connected to, not the one we will create, which is very bad...
208          */
209         if (ropt->create && ropt->dropSchema)
210                 die_horribly(AH, modulename, "-C and -c are incompatible options\n");
211
212         /*
213          * -1 is not compatible with -C, because we can't create a database
214          *  inside a transaction block.
215          */
216         if (ropt->create && ropt->single_txn)
217                 die_horribly(AH, modulename, "-C and -1 are incompatible options\n");
218
219         /*
220          * Make sure we won't need (de)compression we haven't got
221          */
222 #ifndef HAVE_LIBZ
223         if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
224         {
225                 for (te = AH->toc->next; te != AH->toc; te = te->next)
226                 {
227                         reqs = _tocEntryRequired(te, ropt, false);
228                         if (te->hadDumper && (reqs & REQ_DATA) != 0)
229                                 die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
230                 }
231         }
232 #endif
233
234         /*
235          * If we're using a DB connection, then connect it.
236          */
237         if (ropt->useDB)
238         {
239                 ahlog(AH, 1, "connecting to database for restore\n");
240                 if (AH->version < K_VERS_1_3)
241                         die_horribly(AH, modulename, "direct database connections are not supported in pre-1.3 archives\n");
242
243                 /* XXX Should get this from the archive */
244                 AHX->minRemoteVersion = 070100;
245                 AHX->maxRemoteVersion = 999999;
246
247                 ConnectDatabase(AHX, ropt->dbname,
248                                                 ropt->pghost, ropt->pgport, ropt->username,
249                                                 ropt->promptPassword);
250
251                 /*
252                  * If we're talking to the DB directly, don't send comments since they
253                  * obscure SQL when displaying errors
254                  */
255                 AH->noTocComments = 1;
256         }
257
258         /*
259          * Work out if we have an implied data-only restore. This can happen if
260          * the dump was data only or if the user has used a toc list to exclude
261          * all of the schema data. All we do is look for schema entries - if none
262          * are found then we set the dataOnly flag.
263          *
264          * We could scan for wanted TABLE entries, but that is not the same as
265          * dataOnly. At this stage, it seems unnecessary (6-Mar-2001).
266          */
267         if (!ropt->dataOnly)
268         {
269                 int                     impliedDataOnly = 1;
270
271                 for (te = AH->toc->next; te != AH->toc; te = te->next)
272                 {
273                         reqs = _tocEntryRequired(te, ropt, true);
274                         if ((reqs & REQ_SCHEMA) != 0)
275                         {                                       /* It's schema, and it's wanted */
276                                 impliedDataOnly = 0;
277                                 break;
278                         }
279                 }
280                 if (impliedDataOnly)
281                 {
282                         ropt->dataOnly = impliedDataOnly;
283                         ahlog(AH, 1, "implied data-only restore\n");
284                 }
285         }
286
287         /*
288          * Setup the output file if necessary.
289          */
290         if (ropt->filename || ropt->compression)
291                 sav = SetOutput(AH, ropt->filename, ropt->compression);
292
293         ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
294
295         if (AH->public.verbose)
296                 dumpTimestamp(AH, "Started on", AH->createDate);
297
298         if (ropt->single_txn)
299         {
300                 if (AH->connection)
301                         StartTransaction(AH);
302                 else
303                         ahprintf(AH, "BEGIN;\n\n");
304         }
305
306         /*
307          * Establish important parameter values right away.
308          */
309         _doSetFixedOutputState(AH);
310
311         AH->stage = STAGE_PROCESSING;
312
313         /*
314          * Drop the items at the start, in reverse order
315          */
316         if (ropt->dropSchema)
317         {
318                 for (te = AH->toc->prev; te != AH->toc; te = te->prev)
319                 {
320                         AH->currentTE = te;
321
322                         reqs = _tocEntryRequired(te, ropt, false /* needn't drop ACLs */ );
323                         if (((reqs & REQ_SCHEMA) != 0) && te->dropStmt)
324                         {
325                                 /* We want the schema */
326                                 ahlog(AH, 1, "dropping %s %s\n", te->desc, te->tag);
327                                 /* Select owner and schema as necessary */
328                                 _becomeOwner(AH, te);
329                                 _selectOutputSchema(AH, te->namespace);
330                                 /* Drop it */
331                                 ahprintf(AH, "%s", te->dropStmt);
332                         }
333                 }
334
335                 /*
336                  * _selectOutputSchema may have set currSchema to reflect the effect
337                  * of a "SET search_path" command it emitted.  However, by now we may
338                  * have dropped that schema; or it might not have existed in the first
339                  * place.  In either case the effective value of search_path will not
340                  * be what we think.  Forcibly reset currSchema so that we will
341                  * re-establish the search_path setting when needed (after creating
342                  * the schema).
343                  *
344                  * If we treated users as pg_dump'able objects then we'd need to reset
345                  * currUser here too.
346                  */
347                 if (AH->currSchema)
348                         free(AH->currSchema);
349                 AH->currSchema = NULL;
350         }
351
352         /*
353          * In serial mode, we now process each non-ACL TOC entry.
354          *
355          * In parallel mode, turn control over to the parallel-restore logic.
356          */
357         if (ropt->number_of_jobs > 1 && ropt->useDB)
358                 restore_toc_entries_parallel(AH);
359         else
360         {
361                 for (te = AH->toc->next; te != AH->toc; te = te->next)
362                         (void) restore_toc_entry(AH, te, ropt, false);
363         }
364
365         /*
366          * Scan TOC again to output ownership commands and ACLs
367          */
368         for (te = AH->toc->next; te != AH->toc; te = te->next)
369         {
370                 AH->currentTE = te;
371
372                 /* Work out what, if anything, we want from this entry */
373                 reqs = _tocEntryRequired(te, ropt, true);
374
375                 if ((reqs & REQ_SCHEMA) != 0)   /* We want the schema */
376                 {
377                         ahlog(AH, 1, "setting owner and privileges for %s %s\n",
378                                   te->desc, te->tag);
379                         _printTocEntry(AH, te, ropt, false, true);
380                 }
381         }
382
383         if (ropt->single_txn)
384         {
385                 if (AH->connection)
386                         CommitTransaction(AH);
387                 else
388                         ahprintf(AH, "COMMIT;\n\n");
389         }
390
391         if (AH->public.verbose)
392                 dumpTimestamp(AH, "Completed on", time(NULL));
393
394         ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
395
396         /*
397          * Clean up & we're done.
398          */
399         AH->stage = STAGE_FINALIZING;
400
401         if (ropt->filename || ropt->compression)
402                 ResetOutput(AH, sav);
403
404         if (ropt->useDB)
405         {
406                 PQfinish(AH->connection);
407                 AH->connection = NULL;
408         }
409 }
410
411 /*
412  * Restore a single TOC item.  Used in both parallel and non-parallel restore;
413  * is_parallel is true if we are in a worker child process.
414  *
415  * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
416  * the parallel parent has to make the corresponding status update.
417  */
418 static int
419 restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
420                                   RestoreOptions *ropt, bool is_parallel)
421 {
422         int         retval = 0;
423         teReqs          reqs;
424         bool            defnDumped;
425
426         AH->currentTE = te;
427
428         /* Work out what, if anything, we want from this entry */
429         reqs = _tocEntryRequired(te, ropt, false);
430
431         /* Dump any relevant dump warnings to stderr */
432         if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
433         {
434                 if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
435                         write_msg(modulename, "warning from original dump file: %s\n", te->defn);
436                 else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
437                         write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
438         }
439
440         defnDumped = false;
441
442         if ((reqs & REQ_SCHEMA) != 0)   /* We want the schema */
443         {
444                 ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);
445
446                 _printTocEntry(AH, te, ropt, false, false);
447                 defnDumped = true;
448
449                 if (strcmp(te->desc, "TABLE") == 0)
450                 {
451                         if (AH->lastErrorTE == te)
452                         {
453                                 /*
454                                  * We failed to create the table.
455                                  * If --no-data-for-failed-tables was given,
456                                  * mark the corresponding TABLE DATA to be ignored.
457                                  *
458                                  * In the parallel case this must be done in the parent,
459                                  * so we just set the return value.
460                                  */
461                                 if (ropt->noDataForFailedTables)
462                                 {
463                                         if (is_parallel)
464                                                 retval = WORKER_INHIBIT_DATA;
465                                         else
466                                                 inhibit_data_for_failed_table(AH, te);
467                                 }
468                         }
469                         else
470                         {
471                                 /*
472                                  * We created the table successfully.  Mark the
473                                  * corresponding TABLE DATA for possible truncation.
474                                  *
475                                  * In the parallel case this must be done in the parent,
476                                  * so we just set the return value.
477                                  */
478                                 if (is_parallel)
479                                         retval = WORKER_CREATE_DONE;
480                                 else
481                                         mark_create_done(AH, te);
482                         }
483                 }
484
485                 /* If we created a DB, connect to it... */
486                 if (strcmp(te->desc, "DATABASE") == 0)
487                 {
488                         ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
489                         _reconnectToDB(AH, te->tag);
490                         ropt->dbname = strdup(te->tag);
491                 }
492         }
493
494         /*
495          * If we have a data component, then process it
496          */
497         if ((reqs & REQ_DATA) != 0)
498         {
499                 /*
500                  * hadDumper will be set if there is genuine data component for
501                  * this node. Otherwise, we need to check the defn field for
502                  * statements that need to be executed in data-only restores.
503                  */
504                 if (te->hadDumper)
505                 {
506                         /*
507                          * If we can output the data, then restore it.
508                          */
509                         if (AH->PrintTocDataPtr != NULL && (reqs & REQ_DATA) != 0)
510                         {
511                                 _printTocEntry(AH, te, ropt, true, false);
512
513                                 if (strcmp(te->desc, "BLOBS") == 0 ||
514                                         strcmp(te->desc, "BLOB COMMENTS") == 0)
515                                 {
516                                         ahlog(AH, 1, "restoring %s\n", te->desc);
517
518                                         _selectOutputSchema(AH, "pg_catalog");
519
520                                         (*AH->PrintTocDataPtr) (AH, te, ropt);
521                                 }
522                                 else
523                                 {
524                                         _disableTriggersIfNecessary(AH, te, ropt);
525
526                                         /* Select owner and schema as necessary */
527                                         _becomeOwner(AH, te);
528                                         _selectOutputSchema(AH, te->namespace);
529
530                                         ahlog(AH, 1, "restoring data for table \"%s\"\n",
531                                                   te->tag);
532
533                                         /*
534                                          * In parallel restore, if we created the table earlier
535                                          * in the run then we wrap the COPY in a transaction and
536                                          * precede it with a TRUNCATE.  If archiving is not on
537                                          * this prevents WAL-logging the COPY.  This obtains a
538                                          * speedup similar to that from using single_txn mode
539                                          * in non-parallel restores.
540                                          */
541                                         if (is_parallel && te->created)
542                                         {
543                                                 /*
544                                                  * Parallel restore is always talking directly to a
545                                                  * server, so no need to see if we should issue BEGIN.
546                                                  */
547                                                 StartTransaction(AH);
548
549                                                 /*
550                                                  * If the server version is >= 8.4, make sure we issue
551                                                  * TRUNCATE with ONLY so that child tables are not
552                                                  * wiped.
553                                                  */
554                                                 ahprintf(AH, "TRUNCATE TABLE %s%s;\n\n",
555                                                                  (PQserverVersion(AH->connection) >= 80400 ?
556                                                                   "ONLY " : ""),
557                                                                  fmtId(te->tag));
558                                         }
559
560                                         /*
561                                          * If we have a copy statement, use it. As of V1.3,
562                                          * these are separate to allow easy import from
563                                          * withing a database connection. Pre 1.3 archives can
564                                          * not use DB connections and are sent to output only.
565                                          *
566                                          * For V1.3+, the table data MUST have a copy
567                                          * statement so that we can go into appropriate mode
568                                          * with libpq.
569                                          */
570                                         if (te->copyStmt && strlen(te->copyStmt) > 0)
571                                         {
572                                                 ahprintf(AH, "%s", te->copyStmt);
573                                                 AH->writingCopyData = true;
574                                         }
575
576                                         (*AH->PrintTocDataPtr) (AH, te, ropt);
577
578                                         AH->writingCopyData = false;
579
580                                         /* close out the transaction started above */
581                                         if (is_parallel && te->created)
582                                                 CommitTransaction(AH);
583
584                                         _enableTriggersIfNecessary(AH, te, ropt);
585                                 }
586                         }
587                 }
588                 else if (!defnDumped)
589                 {
590                         /* If we haven't already dumped the defn part, do so now */
591                         ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
592                         _printTocEntry(AH, te, ropt, false, false);
593                 }
594         }
595
596         return retval;
597 }
598
599 /*
600  * Allocate a new RestoreOptions block.
601  * This is mainly so we can initialize it, but also for future expansion,
602  */
603 RestoreOptions *
604 NewRestoreOptions(void)
605 {
606         RestoreOptions *opts;
607
608         opts = (RestoreOptions *) calloc(1, sizeof(RestoreOptions));
609
610         /* set any fields that shouldn't default to zeroes */
611         opts->format = archUnknown;
612         opts->promptPassword = TRI_DEFAULT;
613
614         return opts;
615 }
616
617 static void
618 _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
619 {
620         /* This hack is only needed in a data-only restore */
621         if (!ropt->dataOnly || !ropt->disable_triggers)
622                 return;
623
624         ahlog(AH, 1, "disabling triggers for %s\n", te->tag);
625
626         /*
627          * Become superuser if possible, since they are the only ones who can
628          * disable constraint triggers.  If -S was not given, assume the initial
629          * user identity is a superuser.  (XXX would it be better to become the
630          * table owner?)
631          */
632         _becomeUser(AH, ropt->superuser);
633
634         /*
635          * Disable them.
636          */
637         _selectOutputSchema(AH, te->namespace);
638
639         ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
640                          fmtId(te->tag));
641 }
642
643 static void
644 _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
645 {
646         /* This hack is only needed in a data-only restore */
647         if (!ropt->dataOnly || !ropt->disable_triggers)
648                 return;
649
650         ahlog(AH, 1, "enabling triggers for %s\n", te->tag);
651
652         /*
653          * Become superuser if possible, since they are the only ones who can
654          * disable constraint triggers.  If -S was not given, assume the initial
655          * user identity is a superuser.  (XXX would it be better to become the
656          * table owner?)
657          */
658         _becomeUser(AH, ropt->superuser);
659
660         /*
661          * Enable them.
662          */
663         _selectOutputSchema(AH, te->namespace);
664
665         ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
666                          fmtId(te->tag));
667 }
668
669 /*
670  * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
671  */
672
673 /* Public */
674 size_t
675 WriteData(Archive *AHX, const void *data, size_t dLen)
676 {
677         ArchiveHandle *AH = (ArchiveHandle *) AHX;
678
679         if (!AH->currToc)
680                 die_horribly(AH, modulename, "internal error -- WriteData cannot be called outside the context of a DataDumper routine\n");
681
682         return (*AH->WriteDataPtr) (AH, data, dLen);
683 }
684
685 /*
686  * Create a new TOC entry. The TOC was designed as a TOC, but is now the
687  * repository for all metadata. But the name has stuck.
688  */
689
690 /* Public */
691 void
692 ArchiveEntry(Archive *AHX,
693                          CatalogId catalogId, DumpId dumpId,
694                          const char *tag,
695                          const char *namespace,
696                          const char *tablespace,
697                          const char *owner, bool withOids,
698                          const char *desc, teSection section,
699                          const char *defn,
700                          const char *dropStmt, const char *copyStmt,
701                          const DumpId *deps, int nDeps,
702                          DataDumperPtr dumpFn, void *dumpArg)
703 {
704         ArchiveHandle *AH = (ArchiveHandle *) AHX;
705         TocEntry   *newToc;
706
707         newToc = (TocEntry *) calloc(1, sizeof(TocEntry));
708         if (!newToc)
709                 die_horribly(AH, modulename, "out of memory\n");
710
711         AH->tocCount++;
712         if (dumpId > AH->maxDumpId)
713                 AH->maxDumpId = dumpId;
714
715         newToc->prev = AH->toc->prev;
716         newToc->next = AH->toc;
717         AH->toc->prev->next = newToc;
718         AH->toc->prev = newToc;
719
720         newToc->catalogId = catalogId;
721         newToc->dumpId = dumpId;
722         newToc->section = section;
723
724         newToc->tag = strdup(tag);
725         newToc->namespace = namespace ? strdup(namespace) : NULL;
726         newToc->tablespace = tablespace ? strdup(tablespace) : NULL;
727         newToc->owner = strdup(owner);
728         newToc->withOids = withOids;
729         newToc->desc = strdup(desc);
730         newToc->defn = strdup(defn);
731         newToc->dropStmt = strdup(dropStmt);
732         newToc->copyStmt = copyStmt ? strdup(copyStmt) : NULL;
733
734         if (nDeps > 0)
735         {
736                 newToc->dependencies = (DumpId *) malloc(nDeps * sizeof(DumpId));
737                 memcpy(newToc->dependencies, deps, nDeps * sizeof(DumpId));
738                 newToc->nDeps = nDeps;
739         }
740         else
741         {
742                 newToc->dependencies = NULL;
743                 newToc->nDeps = 0;
744         }
745
746         newToc->dataDumper = dumpFn;
747         newToc->dataDumperArg = dumpArg;
748         newToc->hadDumper = dumpFn ? true : false;
749
750         newToc->formatData = NULL;
751
752         if (AH->ArchiveEntryPtr !=NULL)
753                 (*AH->ArchiveEntryPtr) (AH, newToc);
754 }
755
756 /* Public */
757 void
758 PrintTOCSummary(Archive *AHX, RestoreOptions *ropt)
759 {
760         ArchiveHandle *AH = (ArchiveHandle *) AHX;
761         TocEntry   *te;
762         OutputContext sav;
763         char       *fmtName;
764
765         if (ropt->filename)
766                 sav = SetOutput(AH, ropt->filename, 0 /* no compression */ );
767
768         ahprintf(AH, ";\n; Archive created at %s", ctime(&AH->createDate));
769         ahprintf(AH, ";     dbname: %s\n;     TOC Entries: %d\n;     Compression: %d\n",
770                          AH->archdbname, AH->tocCount, AH->compression);
771
772         switch (AH->format)
773         {
774                 case archFiles:
775                         fmtName = "FILES";
776                         break;
777                 case archCustom:
778                         fmtName = "CUSTOM";
779                         break;
780                 case archTar:
781                         fmtName = "TAR";
782                         break;
783                 default:
784                         fmtName = "UNKNOWN";
785         }
786
787         ahprintf(AH, ";     Dump Version: %d.%d-%d\n", AH->vmaj, AH->vmin, AH->vrev);
788         ahprintf(AH, ";     Format: %s\n", fmtName);
789         ahprintf(AH, ";     Integer: %d bytes\n", (int) AH->intSize);
790         ahprintf(AH, ";     Offset: %d bytes\n", (int) AH->offSize);
791         if (AH->archiveRemoteVersion)
792                 ahprintf(AH, ";     Dumped from database version: %s\n",
793                                  AH->archiveRemoteVersion);
794         if (AH->archiveDumpVersion)
795                 ahprintf(AH, ";     Dumped by pg_dump version: %s\n",
796                                  AH->archiveDumpVersion);
797
798         ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
799
800         for (te = AH->toc->next; te != AH->toc; te = te->next)
801         {
802                 if (ropt->verbose || _tocEntryRequired(te, ropt, true) != 0)
803                         ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
804                                          te->catalogId.tableoid, te->catalogId.oid,
805                                          te->desc, te->namespace ? te->namespace : "-",
806                                          te->tag, te->owner);
807                 if (ropt->verbose && te->nDeps > 0)
808                 {
809                         int             i;
810
811                         ahprintf(AH, ";\tdepends on:");
812                         for (i = 0; i < te->nDeps; i++)
813                                 ahprintf(AH, " %d", te->dependencies[i]);
814                         ahprintf(AH, "\n");
815                 }
816         }
817
818         if (ropt->filename)
819                 ResetOutput(AH, sav);
820 }
821
822 /***********
823  * BLOB Archival
824  ***********/
825
826 /* Called by a dumper to signal start of a BLOB */
827 int
828 StartBlob(Archive *AHX, Oid oid)
829 {
830         ArchiveHandle *AH = (ArchiveHandle *) AHX;
831
832         if (!AH->StartBlobPtr)
833                 die_horribly(AH, modulename, "large-object output not supported in chosen format\n");
834
835         (*AH->StartBlobPtr) (AH, AH->currToc, oid);
836
837         return 1;
838 }
839
840 /* Called by a dumper to signal end of a BLOB */
841 int
842 EndBlob(Archive *AHX, Oid oid)
843 {
844         ArchiveHandle *AH = (ArchiveHandle *) AHX;
845
846         if (AH->EndBlobPtr)
847                 (*AH->EndBlobPtr) (AH, AH->currToc, oid);
848
849         return 1;
850 }
851
852 /**********
853  * BLOB Restoration
854  **********/
855
856 /*
857  * Called by a format handler before any blobs are restored
858  */
859 void
860 StartRestoreBlobs(ArchiveHandle *AH)
861 {
862         if (!AH->ropt->single_txn)
863         {
864                 if (AH->connection)
865                         StartTransaction(AH);
866                 else
867                         ahprintf(AH, "BEGIN;\n\n");
868         }
869
870         AH->blobCount = 0;
871 }
872
873 /*
874  * Called by a format handler after all blobs are restored
875  */
876 void
877 EndRestoreBlobs(ArchiveHandle *AH)
878 {
879         if (!AH->ropt->single_txn)
880         {
881                 if (AH->connection)
882                         CommitTransaction(AH);
883                 else
884                         ahprintf(AH, "COMMIT;\n\n");
885         }
886
887         ahlog(AH, 1, "restored %d large objects\n", AH->blobCount);
888 }
889
890
891 /*
892  * Called by a format handler to initiate restoration of a blob
893  */
894 void
895 StartRestoreBlob(ArchiveHandle *AH, Oid oid)
896 {
897         Oid                     loOid;
898
899         AH->blobCount++;
900
901         /* Initialize the LO Buffer */
902         AH->lo_buf_used = 0;
903
904         ahlog(AH, 2, "restoring large object with OID %u\n", oid);
905
906         if (AH->connection)
907         {
908                 loOid = lo_create(AH->connection, oid);
909                 if (loOid == 0 || loOid != oid)
910                         die_horribly(AH, modulename, "could not create large object %u\n",
911                                                  oid);
912
913                 AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
914                 if (AH->loFd == -1)
915                         die_horribly(AH, modulename, "could not open large object\n");
916         }
917         else
918         {
919                 ahprintf(AH, "SELECT lo_open(lo_create(%u), %d);\n", oid, INV_WRITE);
920         }
921
922         AH->writingBlob = 1;
923 }
924
925 void
926 EndRestoreBlob(ArchiveHandle *AH, Oid oid)
927 {
928         if (AH->lo_buf_used > 0)
929         {
930                 /* Write remaining bytes from the LO buffer */
931                 dump_lo_buf(AH);
932         }
933
934         AH->writingBlob = 0;
935
936         if (AH->connection)
937         {
938                 lo_close(AH->connection, AH->loFd);
939                 AH->loFd = -1;
940         }
941         else
942         {
943                 ahprintf(AH, "SELECT lo_close(0);\n\n");
944         }
945 }
946
947 /***********
948  * Sorting and Reordering
949  ***********/
950
951 void
952 SortTocFromFile(Archive *AHX, RestoreOptions *ropt)
953 {
954         ArchiveHandle *AH = (ArchiveHandle *) AHX;
955         FILE       *fh;
956         char            buf[1024];
957         char       *cmnt;
958         char       *endptr;
959         DumpId          id;
960         TocEntry   *te;
961         TocEntry   *tePrev;
962
963         /* Allocate space for the 'wanted' array, and init it */
964         ropt->idWanted = (bool *) malloc(sizeof(bool) * AH->maxDumpId);
965         memset(ropt->idWanted, 0, sizeof(bool) * AH->maxDumpId);
966
967         /* Set prev entry as head of list */
968         tePrev = AH->toc;
969
970         /* Setup the file */
971         fh = fopen(ropt->tocFile, PG_BINARY_R);
972         if (!fh)
973                 die_horribly(AH, modulename, "could not open TOC file \"%s\": %s\n",
974                                          ropt->tocFile, strerror(errno));
975
976         while (fgets(buf, sizeof(buf), fh) != NULL)
977         {
978                 /* Truncate line at comment, if any */
979                 cmnt = strchr(buf, ';');
980                 if (cmnt != NULL)
981                         cmnt[0] = '\0';
982
983                 /* Ignore if all blank */
984                 if (strspn(buf, " \t\r") == strlen(buf))
985                         continue;
986
987                 /* Get an ID, check it's valid and not already seen */
988                 id = strtol(buf, &endptr, 10);
989                 if (endptr == buf || id <= 0 || id > AH->maxDumpId ||
990                         ropt->idWanted[id - 1])
991                 {
992                         write_msg(modulename, "WARNING: line ignored: %s\n", buf);
993                         continue;
994                 }
995
996                 /* Find TOC entry */
997                 te = getTocEntryByDumpId(AH, id);
998                 if (!te)
999                         die_horribly(AH, modulename, "could not find entry for ID %d\n",
1000                                                  id);
1001
1002                 ropt->idWanted[id - 1] = true;
1003
1004                 _moveAfter(AH, tePrev, te);
1005                 tePrev = te;
1006         }
1007
1008         if (fclose(fh) != 0)
1009                 die_horribly(AH, modulename, "could not close TOC file: %s\n",
1010                                          strerror(errno));
1011 }
1012
1013 /*
1014  * Set up a dummy ID filter that selects all dump IDs
1015  */
1016 void
1017 InitDummyWantedList(Archive *AHX, RestoreOptions *ropt)
1018 {
1019         ArchiveHandle *AH = (ArchiveHandle *) AHX;
1020
1021         /* Allocate space for the 'wanted' array, and init it to 1's */
1022         ropt->idWanted = (bool *) malloc(sizeof(bool) * AH->maxDumpId);
1023         memset(ropt->idWanted, 1, sizeof(bool) * AH->maxDumpId);
1024 }
1025
1026 /**********************
1027  * 'Convenience functions that look like standard IO functions
1028  * for writing data when in dump mode.
1029  **********************/
1030
1031 /* Public */
1032 int
1033 archputs(const char *s, Archive *AH)
1034 {
1035         return WriteData(AH, s, strlen(s));
1036 }
1037
1038 /* Public */
1039 int
1040 archprintf(Archive *AH, const char *fmt,...)
1041 {
1042         char       *p = NULL;
1043         va_list         ap;
1044         int                     bSize = strlen(fmt) + 256;
1045         int                     cnt = -1;
1046
1047         /*
1048          * This is paranoid: deal with the possibility that vsnprintf is willing
1049          * to ignore trailing null or returns > 0 even if string does not fit. It
1050          * may be the case that it returns cnt = bufsize
1051          */
1052         while (cnt < 0 || cnt >= (bSize - 1))
1053         {
1054                 if (p != NULL)
1055                         free(p);
1056                 bSize *= 2;
1057                 p = (char *) malloc(bSize);
1058                 if (p == NULL)
1059                         exit_horribly(AH, modulename, "out of memory\n");
1060                 va_start(ap, fmt);
1061                 cnt = vsnprintf(p, bSize, fmt, ap);
1062                 va_end(ap);
1063         }
1064         WriteData(AH, p, cnt);
1065         free(p);
1066         return cnt;
1067 }
1068
1069
1070 /*******************************
1071  * Stuff below here should be 'private' to the archiver routines
1072  *******************************/
1073
1074 static OutputContext
1075 SetOutput(ArchiveHandle *AH, char *filename, int compression)
1076 {
1077         OutputContext sav;
1078         int                     fn;
1079
1080         /* Replace the AH output file handle */
1081         sav.OF = AH->OF;
1082         sav.gzOut = AH->gzOut;
1083
1084         if (filename)
1085                 fn = -1;
1086         else if (AH->FH)
1087                 fn = fileno(AH->FH);
1088         else if (AH->fSpec)
1089         {
1090                 fn = -1;
1091                 filename = AH->fSpec;
1092         }
1093         else
1094                 fn = fileno(stdout);
1095
1096         /* If compression explicitly requested, use gzopen */
1097 #ifdef HAVE_LIBZ
1098         if (compression != 0)
1099         {
1100                 char            fmode[10];
1101
1102                 /* Don't use PG_BINARY_x since this is zlib */
1103                 sprintf(fmode, "wb%d", compression);
1104                 if (fn >= 0)
1105                         AH->OF = gzdopen(dup(fn), fmode);
1106                 else
1107                         AH->OF = gzopen(filename, fmode);
1108                 AH->gzOut = 1;
1109         }
1110         else
1111 #endif
1112         {                                                       /* Use fopen */
1113                 if (AH->mode == archModeAppend)
1114                 {
1115                         if (fn >= 0)
1116                                 AH->OF = fdopen(dup(fn), PG_BINARY_A);
1117                         else
1118                                 AH->OF = fopen(filename, PG_BINARY_A);
1119                 }
1120                 else
1121                 {
1122                         if (fn >= 0)
1123                                 AH->OF = fdopen(dup(fn), PG_BINARY_W);
1124                         else
1125                                 AH->OF = fopen(filename, PG_BINARY_W);
1126                 }
1127                 AH->gzOut = 0;
1128         }
1129
1130         if (!AH->OF)
1131         {
1132                 if (filename)
1133                         die_horribly(AH, modulename, "could not open output file \"%s\": %s\n",
1134                                                  filename, strerror(errno));
1135                 else
1136                         die_horribly(AH, modulename, "could not open output file: %s\n",
1137                                                  strerror(errno));
1138         }
1139
1140         return sav;
1141 }
1142
1143 static void
1144 ResetOutput(ArchiveHandle *AH, OutputContext sav)
1145 {
1146         int                     res;
1147
1148         if (AH->gzOut)
1149                 res = GZCLOSE(AH->OF);
1150         else
1151                 res = fclose(AH->OF);
1152
1153         if (res != 0)
1154                 die_horribly(AH, modulename, "could not close output file: %s\n",
1155                                          strerror(errno));
1156
1157         AH->gzOut = sav.gzOut;
1158         AH->OF = sav.OF;
1159 }
1160
1161
1162
1163 /*
1164  *      Print formatted text to the output file (usually stdout).
1165  */
1166 int
1167 ahprintf(ArchiveHandle *AH, const char *fmt,...)
1168 {
1169         char       *p = NULL;
1170         va_list         ap;
1171         int                     bSize = strlen(fmt) + 256;              /* Should be enough */
1172         int                     cnt = -1;
1173
1174         /*
1175          * This is paranoid: deal with the possibility that vsnprintf is willing
1176          * to ignore trailing null
1177          */
1178
1179         /*
1180          * or returns > 0 even if string does not fit. It may be the case that it
1181          * returns cnt = bufsize
1182          */
1183         while (cnt < 0 || cnt >= (bSize - 1))
1184         {
1185                 if (p != NULL)
1186                         free(p);
1187                 bSize *= 2;
1188                 p = (char *) malloc(bSize);
1189                 if (p == NULL)
1190                         die_horribly(AH, modulename, "out of memory\n");
1191                 va_start(ap, fmt);
1192                 cnt = vsnprintf(p, bSize, fmt, ap);
1193                 va_end(ap);
1194         }
1195         ahwrite(p, 1, cnt, AH);
1196         free(p);
1197         return cnt;
1198 }
1199
1200 void
1201 ahlog(ArchiveHandle *AH, int level, const char *fmt,...)
1202 {
1203         va_list         ap;
1204
1205         if (AH->debugLevel < level && (!AH->public.verbose || level > 1))
1206                 return;
1207
1208         va_start(ap, fmt);
1209         _write_msg(NULL, fmt, ap);
1210         va_end(ap);
1211 }
1212
1213 /*
1214  * Single place for logic which says 'We are restoring to a direct DB connection'.
1215  */
1216 static int
1217 RestoringToDB(ArchiveHandle *AH)
1218 {
1219         return (AH->ropt && AH->ropt->useDB && AH->connection);
1220 }
1221
1222 /*
1223  * Dump the current contents of the LO data buffer while writing a BLOB
1224  */
1225 static void
1226 dump_lo_buf(ArchiveHandle *AH)
1227 {
1228         if (AH->connection)
1229         {
1230                 size_t          res;
1231
1232                 res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1233                 ahlog(AH, 5, "wrote %lu bytes of large object data (result = %lu)\n",
1234                           (unsigned long) AH->lo_buf_used, (unsigned long) res);
1235                 if (res != AH->lo_buf_used)
1236                         die_horribly(AH, modulename,
1237                         "could not write to large object (result: %lu, expected: %lu)\n",
1238                                            (unsigned long) res, (unsigned long) AH->lo_buf_used);
1239         }
1240         else
1241         {
1242                 unsigned char *str;
1243                 size_t          len;
1244
1245                 str = PQescapeBytea((const unsigned char *) AH->lo_buf,
1246                                                         AH->lo_buf_used, &len);
1247                 if (!str)
1248                         die_horribly(AH, modulename, "out of memory\n");
1249
1250                 /* Hack: turn off writingBlob so ahwrite doesn't recurse to here */
1251                 AH->writingBlob = 0;
1252                 ahprintf(AH, "SELECT lowrite(0, '%s');\n", str);
1253                 AH->writingBlob = 1;
1254
1255                 free(str);
1256         }
1257         AH->lo_buf_used = 0;
1258 }
1259
1260
1261 /*
1262  *      Write buffer to the output file (usually stdout). This is user for
1263  *      outputting 'restore' scripts etc. It is even possible for an archive
1264  *      format to create a custom output routine to 'fake' a restore if it
1265  *      wants to generate a script (see TAR output).
1266  */
1267 int
1268 ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1269 {
1270         size_t          res;
1271
1272         if (AH->writingBlob)
1273         {
1274                 size_t          remaining = size * nmemb;
1275
1276                 while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1277                 {
1278                         size_t          avail = AH->lo_buf_size - AH->lo_buf_used;
1279
1280                         memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1281                         ptr = (const void *) ((const char *) ptr + avail);
1282                         remaining -= avail;
1283                         AH->lo_buf_used += avail;
1284                         dump_lo_buf(AH);
1285                 }
1286
1287                 memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1288                 AH->lo_buf_used += remaining;
1289
1290                 return size * nmemb;
1291         }
1292         else if (AH->gzOut)
1293         {
1294                 res = GZWRITE((void *) ptr, size, nmemb, AH->OF);
1295                 if (res != (nmemb * size))
1296                         die_horribly(AH, modulename, "could not write to output file: %s\n", strerror(errno));
1297                 return res;
1298         }
1299         else if (AH->CustomOutPtr)
1300         {
1301                 res = AH->CustomOutPtr (AH, ptr, size * nmemb);
1302
1303                 if (res != (nmemb * size))
1304                         die_horribly(AH, modulename, "could not write to custom output routine\n");
1305                 return res;
1306         }
1307         else
1308         {
1309                 /*
1310                  * If we're doing a restore, and it's direct to DB, and we're
1311                  * connected then send it to the DB.
1312                  */
1313                 if (RestoringToDB(AH))
1314                         return ExecuteSqlCommandBuf(AH, (void *) ptr, size * nmemb);            /* Always 1, currently */
1315                 else
1316                 {
1317                         res = fwrite((void *) ptr, size, nmemb, AH->OF);
1318                         if (res != nmemb)
1319                                 die_horribly(AH, modulename, "could not write to output file: %s\n",
1320                                                          strerror(errno));
1321                         return res;
1322                 }
1323         }
1324 }
1325
1326 /* Common exit code */
1327 static void
1328 _write_msg(const char *modulename, const char *fmt, va_list ap)
1329 {
1330         if (modulename)
1331                 fprintf(stderr, "%s: [%s] ", progname, _(modulename));
1332         else
1333                 fprintf(stderr, "%s: ", progname);
1334         vfprintf(stderr, _(fmt), ap);
1335 }
1336
1337 void
1338 write_msg(const char *modulename, const char *fmt,...)
1339 {
1340         va_list         ap;
1341
1342         va_start(ap, fmt);
1343         _write_msg(modulename, fmt, ap);
1344         va_end(ap);
1345 }
1346
1347
1348 static void
1349 _die_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt, va_list ap)
1350 {
1351         _write_msg(modulename, fmt, ap);
1352
1353         if (AH)
1354         {
1355                 if (AH->public.verbose)
1356                         write_msg(NULL, "*** aborted because of error\n");
1357                 if (AH->connection)
1358                         PQfinish(AH->connection);
1359         }
1360
1361         exit(1);
1362 }
1363
1364 /* External use */
1365 void
1366 exit_horribly(Archive *AH, const char *modulename, const char *fmt,...)
1367 {
1368         va_list         ap;
1369
1370         va_start(ap, fmt);
1371         _die_horribly((ArchiveHandle *) AH, modulename, fmt, ap);
1372         va_end(ap);
1373 }
1374
1375 /* Archiver use (just different arg declaration) */
1376 void
1377 die_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...)
1378 {
1379         va_list         ap;
1380
1381         va_start(ap, fmt);
1382         _die_horribly(AH, modulename, fmt, ap);
1383         va_end(ap);
1384 }
1385
1386 /* on some error, we may decide to go on... */
1387 void
1388 warn_or_die_horribly(ArchiveHandle *AH,
1389                                          const char *modulename, const char *fmt,...)
1390 {
1391         va_list         ap;
1392
1393         switch (AH->stage)
1394         {
1395
1396                 case STAGE_NONE:
1397                         /* Do nothing special */
1398                         break;
1399
1400                 case STAGE_INITIALIZING:
1401                         if (AH->stage != AH->lastErrorStage)
1402                                 write_msg(modulename, "Error while INITIALIZING:\n");
1403                         break;
1404
1405                 case STAGE_PROCESSING:
1406                         if (AH->stage != AH->lastErrorStage)
1407                                 write_msg(modulename, "Error while PROCESSING TOC:\n");
1408                         break;
1409
1410                 case STAGE_FINALIZING:
1411                         if (AH->stage != AH->lastErrorStage)
1412                                 write_msg(modulename, "Error while FINALIZING:\n");
1413                         break;
1414         }
1415         if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1416         {
1417                 write_msg(modulename, "Error from TOC entry %d; %u %u %s %s %s\n",
1418                                   AH->currentTE->dumpId,
1419                          AH->currentTE->catalogId.tableoid, AH->currentTE->catalogId.oid,
1420                           AH->currentTE->desc, AH->currentTE->tag, AH->currentTE->owner);
1421         }
1422         AH->lastErrorStage = AH->stage;
1423         AH->lastErrorTE = AH->currentTE;
1424
1425         va_start(ap, fmt);
1426         if (AH->public.exit_on_error)
1427                 _die_horribly(AH, modulename, fmt, ap);
1428         else
1429         {
1430                 _write_msg(modulename, fmt, ap);
1431                 AH->public.n_errors++;
1432         }
1433         va_end(ap);
1434 }
1435
1436 static void
1437 _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1438 {
1439         te->prev->next = te->next;
1440         te->next->prev = te->prev;
1441
1442         te->prev = pos;
1443         te->next = pos->next;
1444
1445         pos->next->prev = te;
1446         pos->next = te;
1447 }
1448
1449 #ifdef NOT_USED
1450
1451 static void
1452 _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1453 {
1454         te->prev->next = te->next;
1455         te->next->prev = te->prev;
1456
1457         te->prev = pos->prev;
1458         te->next = pos;
1459         pos->prev->next = te;
1460         pos->prev = te;
1461 }
1462 #endif
1463
1464 static TocEntry *
1465 getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
1466 {
1467         TocEntry   *te;
1468
1469         for (te = AH->toc->next; te != AH->toc; te = te->next)
1470         {
1471                 if (te->dumpId == id)
1472                         return te;
1473         }
1474         return NULL;
1475 }
1476
1477 teReqs
1478 TocIDRequired(ArchiveHandle *AH, DumpId id, RestoreOptions *ropt)
1479 {
1480         TocEntry   *te = getTocEntryByDumpId(AH, id);
1481
1482         if (!te)
1483                 return 0;
1484
1485         return _tocEntryRequired(te, ropt, true);
1486 }
1487
1488 size_t
1489 WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
1490 {
1491         int                     off;
1492
1493         /* Save the flag */
1494         (*AH->WriteBytePtr) (AH, wasSet);
1495
1496         /* Write out pgoff_t smallest byte first, prevents endian mismatch */
1497         for (off = 0; off < sizeof(pgoff_t); off++)
1498         {
1499                 (*AH->WriteBytePtr) (AH, o & 0xFF);
1500                 o >>= 8;
1501         }
1502         return sizeof(pgoff_t) + 1;
1503 }
1504
1505 int
1506 ReadOffset(ArchiveHandle *AH, pgoff_t * o)
1507 {
1508         int                     i;
1509         int                     off;
1510         int                     offsetFlg;
1511
1512         /* Initialize to zero */
1513         *o = 0;
1514
1515         /* Check for old version */
1516         if (AH->version < K_VERS_1_7)
1517         {
1518                 /* Prior versions wrote offsets using WriteInt */
1519                 i = ReadInt(AH);
1520                 /* -1 means not set */
1521                 if (i < 0)
1522                         return K_OFFSET_POS_NOT_SET;
1523                 else if (i == 0)
1524                         return K_OFFSET_NO_DATA;
1525
1526                 /* Cast to pgoff_t because it was written as an int. */
1527                 *o = (pgoff_t) i;
1528                 return K_OFFSET_POS_SET;
1529         }
1530
1531         /*
1532          * Read the flag indicating the state of the data pointer. Check if valid
1533          * and die if not.
1534          *
1535          * This used to be handled by a negative or zero pointer, now we use an
1536          * extra byte specifically for the state.
1537          */
1538         offsetFlg = (*AH->ReadBytePtr) (AH) & 0xFF;
1539
1540         switch (offsetFlg)
1541         {
1542                 case K_OFFSET_POS_NOT_SET:
1543                 case K_OFFSET_NO_DATA:
1544                 case K_OFFSET_POS_SET:
1545
1546                         break;
1547
1548                 default:
1549                         die_horribly(AH, modulename, "unexpected data offset flag %d\n", offsetFlg);
1550         }
1551
1552         /*
1553          * Read the bytes
1554          */
1555         for (off = 0; off < AH->offSize; off++)
1556         {
1557                 if (off < sizeof(pgoff_t))
1558                         *o |= ((pgoff_t) ((*AH->ReadBytePtr) (AH))) << (off * 8);
1559                 else
1560                 {
1561                         if ((*AH->ReadBytePtr) (AH) != 0)
1562                                 die_horribly(AH, modulename, "file offset in dump file is too large\n");
1563                 }
1564         }
1565
1566         return offsetFlg;
1567 }
1568
1569 size_t
1570 WriteInt(ArchiveHandle *AH, int i)
1571 {
1572         int                     b;
1573
1574         /*
1575          * This is a bit yucky, but I don't want to make the binary format very
1576          * dependent on representation, and not knowing much about it, I write out
1577          * a sign byte. If you change this, don't forget to change the file
1578          * version #, and modify readInt to read the new format AS WELL AS the old
1579          * formats.
1580          */
1581
1582         /* SIGN byte */
1583         if (i < 0)
1584         {
1585                 (*AH->WriteBytePtr) (AH, 1);
1586                 i = -i;
1587         }
1588         else
1589                 (*AH->WriteBytePtr) (AH, 0);
1590
1591         for (b = 0; b < AH->intSize; b++)
1592         {
1593                 (*AH->WriteBytePtr) (AH, i & 0xFF);
1594                 i >>= 8;
1595         }
1596
1597         return AH->intSize + 1;
1598 }
1599
1600 int
1601 ReadInt(ArchiveHandle *AH)
1602 {
1603         int                     res = 0;
1604         int                     bv,
1605                                 b;
1606         int                     sign = 0;               /* Default positive */
1607         int                     bitShift = 0;
1608
1609         if (AH->version > K_VERS_1_0)
1610                 /* Read a sign byte */
1611                 sign = (*AH->ReadBytePtr) (AH);
1612
1613         for (b = 0; b < AH->intSize; b++)
1614         {
1615                 bv = (*AH->ReadBytePtr) (AH) & 0xFF;
1616                 if (bv != 0)
1617                         res = res + (bv << bitShift);
1618                 bitShift += 8;
1619         }
1620
1621         if (sign)
1622                 res = -res;
1623
1624         return res;
1625 }
1626
1627 size_t
1628 WriteStr(ArchiveHandle *AH, const char *c)
1629 {
1630         size_t          res;
1631
1632         if (c)
1633         {
1634                 res = WriteInt(AH, strlen(c));
1635                 res += (*AH->WriteBufPtr) (AH, c, strlen(c));
1636         }
1637         else
1638                 res = WriteInt(AH, -1);
1639
1640         return res;
1641 }
1642
1643 char *
1644 ReadStr(ArchiveHandle *AH)
1645 {
1646         char       *buf;
1647         int                     l;
1648
1649         l = ReadInt(AH);
1650         if (l < 0)
1651                 buf = NULL;
1652         else
1653         {
1654                 buf = (char *) malloc(l + 1);
1655                 if (!buf)
1656                         die_horribly(AH, modulename, "out of memory\n");
1657
1658                 if ((*AH->ReadBufPtr) (AH, (void *) buf, l) != l)
1659                         die_horribly(AH, modulename, "unexpected end of file\n");
1660
1661                 buf[l] = '\0';
1662         }
1663
1664         return buf;
1665 }
1666
1667 static int
1668 _discoverArchiveFormat(ArchiveHandle *AH)
1669 {
1670         FILE       *fh;
1671         char            sig[6];                 /* More than enough */
1672         size_t          cnt;
1673         int                     wantClose = 0;
1674
1675 #if 0
1676         write_msg(modulename, "attempting to ascertain archive format\n");
1677 #endif
1678
1679         if (AH->lookahead)
1680                 free(AH->lookahead);
1681
1682         AH->lookaheadSize = 512;
1683         AH->lookahead = calloc(1, 512);
1684         AH->lookaheadLen = 0;
1685         AH->lookaheadPos = 0;
1686
1687         if (AH->fSpec)
1688         {
1689                 wantClose = 1;
1690                 fh = fopen(AH->fSpec, PG_BINARY_R);
1691                 if (!fh)
1692                         die_horribly(AH, modulename, "could not open input file \"%s\": %s\n",
1693                                                  AH->fSpec, strerror(errno));
1694         }
1695         else
1696         {
1697                 fh = stdin;
1698                 if (!fh)
1699                         die_horribly(AH, modulename, "could not open input file: %s\n",
1700                                                  strerror(errno));
1701         }
1702
1703         cnt = fread(sig, 1, 5, fh);
1704
1705         if (cnt != 5)
1706         {
1707                 if (ferror(fh))
1708                         die_horribly(AH, modulename, "could not read input file: %s\n", strerror(errno));
1709                 else
1710                         die_horribly(AH, modulename, "input file is too short (read %lu, expected 5)\n",
1711                                                  (unsigned long) cnt);
1712         }
1713
1714         /* Save it, just in case we need it later */
1715         strncpy(&AH->lookahead[0], sig, 5);
1716         AH->lookaheadLen = 5;
1717
1718         if (strncmp(sig, "PGDMP", 5) == 0)
1719         {
1720                 AH->vmaj = fgetc(fh);
1721                 AH->vmin = fgetc(fh);
1722
1723                 /* Save these too... */
1724                 AH->lookahead[AH->lookaheadLen++] = AH->vmaj;
1725                 AH->lookahead[AH->lookaheadLen++] = AH->vmin;
1726
1727                 /* Check header version; varies from V1.0 */
1728                 if (AH->vmaj > 1 || ((AH->vmaj == 1) && (AH->vmin > 0)))                /* Version > 1.0 */
1729                 {
1730                         AH->vrev = fgetc(fh);
1731                         AH->lookahead[AH->lookaheadLen++] = AH->vrev;
1732                 }
1733                 else
1734                         AH->vrev = 0;
1735
1736                 /* Make a convenient integer <maj><min><rev>00 */
1737                 AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
1738
1739                 AH->intSize = fgetc(fh);
1740                 AH->lookahead[AH->lookaheadLen++] = AH->intSize;
1741
1742                 if (AH->version >= K_VERS_1_7)
1743                 {
1744                         AH->offSize = fgetc(fh);
1745                         AH->lookahead[AH->lookaheadLen++] = AH->offSize;
1746                 }
1747                 else
1748                         AH->offSize = AH->intSize;
1749
1750                 AH->format = fgetc(fh);
1751                 AH->lookahead[AH->lookaheadLen++] = AH->format;
1752         }
1753         else
1754         {
1755                 /*
1756                  * *Maybe* we have a tar archive format file... So, read first 512
1757                  * byte header...
1758                  */
1759                 cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
1760                 AH->lookaheadLen += cnt;
1761
1762                 if (AH->lookaheadLen != 512)
1763                         die_horribly(AH, modulename, "input file does not appear to be a valid archive (too short?)\n");
1764
1765                 if (!isValidTarHeader(AH->lookahead))
1766                         die_horribly(AH, modulename, "input file does not appear to be a valid archive\n");
1767
1768                 AH->format = archTar;
1769         }
1770
1771         /* If we can't seek, then mark the header as read */
1772         if (fseeko(fh, 0, SEEK_SET) != 0)
1773         {
1774                 /*
1775                  * NOTE: Formats that use the lookahead buffer can unset this in their
1776                  * Init routine.
1777                  */
1778                 AH->readHeader = 1;
1779         }
1780         else
1781                 AH->lookaheadLen = 0;   /* Don't bother since we've reset the file */
1782
1783 #if 0
1784         write_msg(modulename, "read %lu bytes into lookahead buffer\n",
1785                           (unsigned long) AH->lookaheadLen);
1786 #endif
1787
1788         /* Close the file */
1789         if (wantClose)
1790                 if (fclose(fh) != 0)
1791                         die_horribly(AH, modulename, "could not close input file: %s\n",
1792                                                  strerror(errno));
1793
1794         return AH->format;
1795 }
1796
1797
1798 /*
1799  * Allocate an archive handle
1800  */
1801 static ArchiveHandle *
1802 _allocAH(const char *FileSpec, const ArchiveFormat fmt,
1803                  const int compression, ArchiveMode mode)
1804 {
1805         ArchiveHandle *AH;
1806
1807 #if 0
1808         write_msg(modulename, "allocating AH for %s, format %d\n", FileSpec, fmt);
1809 #endif
1810
1811         AH = (ArchiveHandle *) calloc(1, sizeof(ArchiveHandle));
1812         if (!AH)
1813                 die_horribly(AH, modulename, "out of memory\n");
1814
1815         /* AH->debugLevel = 100; */
1816
1817         AH->vmaj = K_VERS_MAJOR;
1818         AH->vmin = K_VERS_MINOR;
1819         AH->vrev = K_VERS_REV;
1820
1821         /* initialize for backwards compatible string processing */
1822         AH->public.encoding = 0;        /* PG_SQL_ASCII */
1823         AH->public.std_strings = false;
1824
1825         /* sql error handling */
1826         AH->public.exit_on_error = true;
1827         AH->public.n_errors = 0;
1828
1829         AH->createDate = time(NULL);
1830
1831         AH->intSize = sizeof(int);
1832         AH->offSize = sizeof(pgoff_t);
1833         if (FileSpec)
1834         {
1835                 AH->fSpec = strdup(FileSpec);
1836
1837                 /*
1838                  * Not used; maybe later....
1839                  *
1840                  * AH->workDir = strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
1841                  * i--) if (AH->workDir[i-1] == '/')
1842                  */
1843         }
1844         else
1845                 AH->fSpec = NULL;
1846
1847         AH->currUser = NULL;            /* unknown */
1848         AH->currSchema = NULL;          /* ditto */
1849         AH->currTablespace = NULL;      /* ditto */
1850         AH->currWithOids = -1;          /* force SET */
1851
1852         AH->toc = (TocEntry *) calloc(1, sizeof(TocEntry));
1853         if (!AH->toc)
1854                 die_horribly(AH, modulename, "out of memory\n");
1855
1856         AH->toc->next = AH->toc;
1857         AH->toc->prev = AH->toc;
1858
1859         AH->mode = mode;
1860         AH->compression = compression;
1861
1862         AH->pgCopyBuf = createPQExpBuffer();
1863         AH->sqlBuf = createPQExpBuffer();
1864
1865         /* Open stdout with no compression for AH output handle */
1866         AH->gzOut = 0;
1867         AH->OF = stdout;
1868
1869         /*
1870          * On Windows, we need to use binary mode to read/write non-text archive
1871          * formats.  Force stdin/stdout into binary mode if that is what we are
1872          * using.
1873          */
1874 #ifdef WIN32
1875         if (fmt != archNull &&
1876                 (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
1877         {
1878                 if (mode == archModeWrite)
1879                         setmode(fileno(stdout), O_BINARY);
1880                 else
1881                         setmode(fileno(stdin), O_BINARY);
1882         }
1883 #endif
1884
1885         if (fmt == archUnknown)
1886                 AH->format = _discoverArchiveFormat(AH);
1887         else
1888                 AH->format = fmt;
1889
1890         AH->promptPassword = TRI_DEFAULT;
1891
1892         switch (AH->format)
1893         {
1894                 case archCustom:
1895                         InitArchiveFmt_Custom(AH);
1896                         break;
1897
1898                 case archFiles:
1899                         InitArchiveFmt_Files(AH);
1900                         break;
1901
1902                 case archNull:
1903                         InitArchiveFmt_Null(AH);
1904                         break;
1905
1906                 case archTar:
1907                         InitArchiveFmt_Tar(AH);
1908                         break;
1909
1910                 default:
1911                         die_horribly(AH, modulename, "unrecognized file format \"%d\"\n", fmt);
1912         }
1913
1914         return AH;
1915 }
1916
1917
1918 void
1919 WriteDataChunks(ArchiveHandle *AH)
1920 {
1921         TocEntry   *te;
1922         StartDataPtr startPtr;
1923         EndDataPtr      endPtr;
1924
1925         for (te = AH->toc->next; te != AH->toc; te = te->next)
1926         {
1927                 if (te->dataDumper != NULL)
1928                 {
1929                         AH->currToc = te;
1930                         /* printf("Writing data for %d (%x)\n", te->id, te); */
1931
1932                         if (strcmp(te->desc, "BLOBS") == 0)
1933                         {
1934                                 startPtr = AH->StartBlobsPtr;
1935                                 endPtr = AH->EndBlobsPtr;
1936                         }
1937                         else
1938                         {
1939                                 startPtr = AH->StartDataPtr;
1940                                 endPtr = AH->EndDataPtr;
1941                         }
1942
1943                         if (startPtr != NULL)
1944                                 (*startPtr) (AH, te);
1945
1946                         /*
1947                          * printf("Dumper arg for %d is %x\n", te->id, te->dataDumperArg);
1948                          */
1949
1950                         /*
1951                          * The user-provided DataDumper routine needs to call
1952                          * AH->WriteData
1953                          */
1954                         (*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
1955
1956                         if (endPtr != NULL)
1957                                 (*endPtr) (AH, te);
1958                         AH->currToc = NULL;
1959                 }
1960         }
1961 }
1962
1963 void
1964 WriteToc(ArchiveHandle *AH)
1965 {
1966         TocEntry   *te;
1967         char            workbuf[32];
1968         int                     i;
1969
1970         /* printf("%d TOC Entries to save\n", AH->tocCount); */
1971
1972         WriteInt(AH, AH->tocCount);
1973
1974         for (te = AH->toc->next; te != AH->toc; te = te->next)
1975         {
1976                 WriteInt(AH, te->dumpId);
1977                 WriteInt(AH, te->dataDumper ? 1 : 0);
1978
1979                 /* OID is recorded as a string for historical reasons */
1980                 sprintf(workbuf, "%u", te->catalogId.tableoid);
1981                 WriteStr(AH, workbuf);
1982                 sprintf(workbuf, "%u", te->catalogId.oid);
1983                 WriteStr(AH, workbuf);
1984
1985                 WriteStr(AH, te->tag);
1986                 WriteStr(AH, te->desc);
1987                 WriteInt(AH, te->section);
1988                 WriteStr(AH, te->defn);
1989                 WriteStr(AH, te->dropStmt);
1990                 WriteStr(AH, te->copyStmt);
1991                 WriteStr(AH, te->namespace);
1992                 WriteStr(AH, te->tablespace);
1993                 WriteStr(AH, te->owner);
1994                 WriteStr(AH, te->withOids ? "true" : "false");
1995
1996                 /* Dump list of dependencies */
1997                 for (i = 0; i < te->nDeps; i++)
1998                 {
1999                         sprintf(workbuf, "%d", te->dependencies[i]);
2000                         WriteStr(AH, workbuf);
2001                 }
2002                 WriteStr(AH, NULL);             /* Terminate List */
2003
2004                 if (AH->WriteExtraTocPtr)
2005                         (*AH->WriteExtraTocPtr) (AH, te);
2006         }
2007 }
2008
2009 void
2010 ReadToc(ArchiveHandle *AH)
2011 {
2012         int                     i;
2013         char       *tmp;
2014         DumpId     *deps;
2015         int                     depIdx;
2016         int                     depSize;
2017         TocEntry   *te;
2018
2019         AH->tocCount = ReadInt(AH);
2020         AH->maxDumpId = 0;
2021
2022         for (i = 0; i < AH->tocCount; i++)
2023         {
2024                 te = (TocEntry *) calloc(1, sizeof(TocEntry));
2025                 te->dumpId = ReadInt(AH);
2026
2027                 if (te->dumpId > AH->maxDumpId)
2028                         AH->maxDumpId = te->dumpId;
2029
2030                 /* Sanity check */
2031                 if (te->dumpId <= 0)
2032                         die_horribly(AH, modulename,
2033                                            "entry ID %d out of range -- perhaps a corrupt TOC\n",
2034                                                  te->dumpId);
2035
2036                 te->hadDumper = ReadInt(AH);
2037
2038                 if (AH->version >= K_VERS_1_8)
2039                 {
2040                         tmp = ReadStr(AH);
2041                         sscanf(tmp, "%u", &te->catalogId.tableoid);
2042                         free(tmp);
2043                 }
2044                 else
2045                         te->catalogId.tableoid = InvalidOid;
2046                 tmp = ReadStr(AH);
2047                 sscanf(tmp, "%u", &te->catalogId.oid);
2048                 free(tmp);
2049
2050                 te->tag = ReadStr(AH);
2051                 te->desc = ReadStr(AH);
2052
2053                 if (AH->version >= K_VERS_1_11)
2054                 {
2055                         te->section = ReadInt(AH);
2056                 }
2057                 else
2058                 {
2059                         /*
2060                          * rules for pre-8.4 archives wherein pg_dump hasn't classified
2061                          * the entries into sections
2062                          */
2063                         if (strcmp(te->desc, "COMMENT") == 0 ||
2064                                 strcmp(te->desc, "ACL") == 0)
2065                                 te->section = SECTION_NONE;
2066                         else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2067                                          strcmp(te->desc, "BLOBS") == 0 ||
2068                                          strcmp(te->desc, "BLOB COMMENTS") == 0)
2069                                 te->section = SECTION_DATA;
2070                         else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2071                                          strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2072                                          strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2073                                          strcmp(te->desc, "INDEX") == 0 ||
2074                                          strcmp(te->desc, "RULE") == 0 ||
2075                                          strcmp(te->desc, "TRIGGER") == 0)
2076                                 te->section = SECTION_POST_DATA;
2077                         else
2078                                 te->section = SECTION_PRE_DATA;
2079                 }
2080
2081                 te->defn = ReadStr(AH);
2082                 te->dropStmt = ReadStr(AH);
2083
2084                 if (AH->version >= K_VERS_1_3)
2085                         te->copyStmt = ReadStr(AH);
2086
2087                 if (AH->version >= K_VERS_1_6)
2088                         te->namespace = ReadStr(AH);
2089
2090                 if (AH->version >= K_VERS_1_10)
2091                         te->tablespace = ReadStr(AH);
2092
2093                 te->owner = ReadStr(AH);
2094                 if (AH->version >= K_VERS_1_9)
2095                 {
2096                         if (strcmp(ReadStr(AH), "true") == 0)
2097                                 te->withOids = true;
2098                         else
2099                                 te->withOids = false;
2100                 }
2101                 else
2102                         te->withOids = true;
2103
2104                 /* Read TOC entry dependencies */
2105                 if (AH->version >= K_VERS_1_5)
2106                 {
2107                         depSize = 100;
2108                         deps = (DumpId *) malloc(sizeof(DumpId) * depSize);
2109                         depIdx = 0;
2110                         for (;;)
2111                         {
2112                                 tmp = ReadStr(AH);
2113                                 if (!tmp)
2114                                         break;          /* end of list */
2115                                 if (depIdx >= depSize)
2116                                 {
2117                                         depSize *= 2;
2118                                         deps = (DumpId *) realloc(deps, sizeof(DumpId) * depSize);
2119                                 }
2120                                 sscanf(tmp, "%d", &deps[depIdx]);
2121                                 free(tmp);
2122                                 depIdx++;
2123                         }
2124
2125                         if (depIdx > 0)         /* We have a non-null entry */
2126                         {
2127                                 deps = (DumpId *) realloc(deps, sizeof(DumpId) * depIdx);
2128                                 te->dependencies = deps;
2129                                 te->nDeps = depIdx;
2130                         }
2131                         else
2132                         {
2133                                 free(deps);
2134                                 te->dependencies = NULL;
2135                                 te->nDeps = 0;
2136                         }
2137                 }
2138                 else
2139                 {
2140                         te->dependencies = NULL;
2141                         te->nDeps = 0;
2142                 }
2143
2144                 if (AH->ReadExtraTocPtr)
2145                         (*AH->ReadExtraTocPtr) (AH, te);
2146
2147                 ahlog(AH, 3, "read TOC entry %d (ID %d) for %s %s\n",
2148                           i, te->dumpId, te->desc, te->tag);
2149
2150                 /* link completed entry into TOC circular list */
2151                 te->prev = AH->toc->prev;
2152                 AH->toc->prev->next = te;
2153                 AH->toc->prev = te;
2154                 te->next = AH->toc;
2155
2156                 /* special processing immediately upon read for some items */
2157                 if (strcmp(te->desc, "ENCODING") == 0)
2158                         processEncodingEntry(AH, te);
2159                 else if (strcmp(te->desc, "STDSTRINGS") == 0)
2160                         processStdStringsEntry(AH, te);
2161         }
2162 }
2163
2164 static void
2165 processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2166 {
2167         /* te->defn should have the form SET client_encoding = 'foo'; */
2168         char       *defn = strdup(te->defn);
2169         char       *ptr1;
2170         char       *ptr2 = NULL;
2171         int                     encoding;
2172
2173         ptr1 = strchr(defn, '\'');
2174         if (ptr1)
2175                 ptr2 = strchr(++ptr1, '\'');
2176         if (ptr2)
2177         {
2178                 *ptr2 = '\0';
2179                 encoding = pg_char_to_encoding(ptr1);
2180                 if (encoding < 0)
2181                         die_horribly(AH, modulename, "unrecognized encoding \"%s\"\n",
2182                                                  ptr1);
2183                 AH->public.encoding = encoding;
2184         }
2185         else
2186                 die_horribly(AH, modulename, "invalid ENCODING item: %s\n",
2187                                          te->defn);
2188
2189         free(defn);
2190 }
2191
2192 static void
2193 processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2194 {
2195         /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2196         char       *ptr1;
2197
2198         ptr1 = strchr(te->defn, '\'');
2199         if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2200                 AH->public.std_strings = true;
2201         else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2202                 AH->public.std_strings = false;
2203         else
2204                 die_horribly(AH, modulename, "invalid STDSTRINGS item: %s\n",
2205                                          te->defn);
2206 }
2207
2208 static teReqs
2209 _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls)
2210 {
2211         teReqs          res = REQ_ALL;
2212
2213         /* ENCODING and STDSTRINGS items are dumped specially, so always reject */
2214         if (strcmp(te->desc, "ENCODING") == 0 ||
2215                 strcmp(te->desc, "STDSTRINGS") == 0)
2216                 return 0;
2217
2218         /* If it's an ACL, maybe ignore it */
2219         if ((!include_acls || ropt->aclsSkip) && strcmp(te->desc, "ACL") == 0)
2220                 return 0;
2221
2222         if (!ropt->create && strcmp(te->desc, "DATABASE") == 0)
2223                 return 0;
2224
2225         /* Check options for selective dump/restore */
2226         if (ropt->schemaNames)
2227         {
2228                 /* If no namespace is specified, it means all. */
2229                 if (!te->namespace)
2230                         return 0;
2231                 if (strcmp(ropt->schemaNames, te->namespace) != 0)
2232                         return 0;
2233         }
2234
2235         if (ropt->selTypes)
2236         {
2237                 if (strcmp(te->desc, "TABLE") == 0 ||
2238                         strcmp(te->desc, "TABLE DATA") == 0)
2239                 {
2240                         if (!ropt->selTable)
2241                                 return 0;
2242                         if (ropt->tableNames && strcmp(ropt->tableNames, te->tag) != 0)
2243                                 return 0;
2244                 }
2245                 else if (strcmp(te->desc, "INDEX") == 0)
2246                 {
2247                         if (!ropt->selIndex)
2248                                 return 0;
2249                         if (ropt->indexNames && strcmp(ropt->indexNames, te->tag) != 0)
2250                                 return 0;
2251                 }
2252                 else if (strcmp(te->desc, "FUNCTION") == 0)
2253                 {
2254                         if (!ropt->selFunction)
2255                                 return 0;
2256                         if (ropt->functionNames && strcmp(ropt->functionNames, te->tag) != 0)
2257                                 return 0;
2258                 }
2259                 else if (strcmp(te->desc, "TRIGGER") == 0)
2260                 {
2261                         if (!ropt->selTrigger)
2262                                 return 0;
2263                         if (ropt->triggerNames && strcmp(ropt->triggerNames, te->tag) != 0)
2264                                 return 0;
2265                 }
2266                 else
2267                         return 0;
2268         }
2269
2270         /*
2271          * Check if we had a dataDumper. Indicates if the entry is schema or data
2272          */
2273         if (!te->hadDumper)
2274         {
2275                 /*
2276                  * Special Case: If 'SEQUENCE SET' then it is considered a data entry
2277                  */
2278                 if (strcmp(te->desc, "SEQUENCE SET") == 0)
2279                         res = res & REQ_DATA;
2280                 else
2281                         res = res & ~REQ_DATA;
2282         }
2283
2284         /*
2285          * Special case: <Init> type with <Max OID> tag; this is obsolete and we
2286          * always ignore it.
2287          */
2288         if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
2289                 return 0;
2290
2291         /* Mask it if we only want schema */
2292         if (ropt->schemaOnly)
2293                 res = res & REQ_SCHEMA;
2294
2295         /* Mask it we only want data */
2296         if (ropt->dataOnly)
2297                 res = res & REQ_DATA;
2298
2299         /* Mask it if we don't have a schema contribution */
2300         if (!te->defn || strlen(te->defn) == 0)
2301                 res = res & ~REQ_SCHEMA;
2302
2303         /* Finally, if there's a per-ID filter, limit based on that as well */
2304         if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
2305                 return 0;
2306
2307         return res;
2308 }
2309
2310 /*
2311  * Issue SET commands for parameters that we want to have set the same way
2312  * at all times during execution of a restore script.
2313  */
2314 static void
2315 _doSetFixedOutputState(ArchiveHandle *AH)
2316 {
2317         /* Disable statement_timeout in archive for pg_restore/psql      */
2318         ahprintf(AH, "SET statement_timeout = 0;\n");
2319
2320         /* Select the correct character set encoding */
2321         ahprintf(AH, "SET client_encoding = '%s';\n",
2322                          pg_encoding_to_char(AH->public.encoding));
2323
2324         /* Select the correct string literal syntax */
2325         ahprintf(AH, "SET standard_conforming_strings = %s;\n",
2326                          AH->public.std_strings ? "on" : "off");
2327
2328         /* Select the role to be used during restore */
2329         if (AH->ropt && AH->ropt->use_role)
2330                 ahprintf(AH, "SET ROLE %s;\n", fmtId(AH->ropt->use_role));
2331
2332         /* Make sure function checking is disabled */
2333         ahprintf(AH, "SET check_function_bodies = false;\n");
2334
2335         /* Avoid annoying notices etc */
2336         ahprintf(AH, "SET client_min_messages = warning;\n");
2337         if (!AH->public.std_strings)
2338                 ahprintf(AH, "SET escape_string_warning = off;\n");
2339
2340         ahprintf(AH, "\n");
2341 }
2342
2343 /*
2344  * Issue a SET SESSION AUTHORIZATION command.  Caller is responsible
2345  * for updating state if appropriate.  If user is NULL or an empty string,
2346  * the specification DEFAULT will be used.
2347  */
2348 static void
2349 _doSetSessionAuth(ArchiveHandle *AH, const char *user)
2350 {
2351         PQExpBuffer cmd = createPQExpBuffer();
2352
2353         appendPQExpBuffer(cmd, "SET SESSION AUTHORIZATION ");
2354
2355         /*
2356          * SQL requires a string literal here.  Might as well be correct.
2357          */
2358         if (user && *user)
2359                 appendStringLiteralAHX(cmd, user, AH);
2360         else
2361                 appendPQExpBuffer(cmd, "DEFAULT");
2362         appendPQExpBuffer(cmd, ";");
2363
2364         if (RestoringToDB(AH))
2365         {
2366                 PGresult   *res;
2367
2368                 res = PQexec(AH->connection, cmd->data);
2369
2370                 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
2371                         /* NOT warn_or_die_horribly... use -O instead to skip this. */
2372                         die_horribly(AH, modulename, "could not set session user to \"%s\": %s",
2373                                                  user, PQerrorMessage(AH->connection));
2374
2375                 PQclear(res);
2376         }
2377         else
2378                 ahprintf(AH, "%s\n\n", cmd->data);
2379
2380         destroyPQExpBuffer(cmd);
2381 }
2382
2383
2384 /*
2385  * Issue a SET default_with_oids command.  Caller is responsible
2386  * for updating state if appropriate.
2387  */
2388 static void
2389 _doSetWithOids(ArchiveHandle *AH, const bool withOids)
2390 {
2391         PQExpBuffer cmd = createPQExpBuffer();
2392
2393         appendPQExpBuffer(cmd, "SET default_with_oids = %s;", withOids ?
2394                                           "true" : "false");
2395
2396         if (RestoringToDB(AH))
2397         {
2398                 PGresult   *res;
2399
2400                 res = PQexec(AH->connection, cmd->data);
2401
2402                 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
2403                         warn_or_die_horribly(AH, modulename,
2404                                                                  "could not set default_with_oids: %s",
2405                                                                  PQerrorMessage(AH->connection));
2406
2407                 PQclear(res);
2408         }
2409         else
2410                 ahprintf(AH, "%s\n\n", cmd->data);
2411
2412         destroyPQExpBuffer(cmd);
2413 }
2414
2415
2416 /*
2417  * Issue the commands to connect to the specified database.
2418  *
2419  * If we're currently restoring right into a database, this will
2420  * actually establish a connection. Otherwise it puts a \connect into
2421  * the script output.
2422  *
2423  * NULL dbname implies reconnecting to the current DB (pretty useless).
2424  */
2425 static void
2426 _reconnectToDB(ArchiveHandle *AH, const char *dbname)
2427 {
2428         if (RestoringToDB(AH))
2429                 ReconnectToServer(AH, dbname, NULL);
2430         else
2431         {
2432                 PQExpBuffer qry = createPQExpBuffer();
2433
2434                 appendPQExpBuffer(qry, "\\connect %s\n\n",
2435                                                   dbname ? fmtId(dbname) : "-");
2436                 ahprintf(AH, "%s", qry->data);
2437                 destroyPQExpBuffer(qry);
2438         }
2439
2440         /*
2441          * NOTE: currUser keeps track of what the imaginary session user in our
2442          * script is.  It's now effectively reset to the original userID.
2443          */
2444         if (AH->currUser)
2445                 free(AH->currUser);
2446         AH->currUser = NULL;
2447
2448         /* don't assume we still know the output schema, tablespace, etc either */
2449         if (AH->currSchema)
2450                 free(AH->currSchema);
2451         AH->currSchema = NULL;
2452         if (AH->currTablespace)
2453                 free(AH->currTablespace);
2454         AH->currTablespace = NULL;
2455         AH->currWithOids = -1;
2456
2457         /* re-establish fixed state */
2458         _doSetFixedOutputState(AH);
2459 }
2460
2461 /*
2462  * Become the specified user, and update state to avoid redundant commands
2463  *
2464  * NULL or empty argument is taken to mean restoring the session default
2465  */
2466 static void
2467 _becomeUser(ArchiveHandle *AH, const char *user)
2468 {
2469         if (!user)
2470                 user = "";                              /* avoid null pointers */
2471
2472         if (AH->currUser && strcmp(AH->currUser, user) == 0)
2473                 return;                                 /* no need to do anything */
2474
2475         _doSetSessionAuth(AH, user);
2476
2477         /*
2478          * NOTE: currUser keeps track of what the imaginary session user in our
2479          * script is
2480          */
2481         if (AH->currUser)
2482                 free(AH->currUser);
2483         AH->currUser = strdup(user);
2484 }
2485
2486 /*
2487  * Become the owner of the the given TOC entry object.  If
2488  * changes in ownership are not allowed, this doesn't do anything.
2489  */
2490 static void
2491 _becomeOwner(ArchiveHandle *AH, TocEntry *te)
2492 {
2493         if (AH->ropt && (AH->ropt->noOwner || !AH->ropt->use_setsessauth))
2494                 return;
2495
2496         _becomeUser(AH, te->owner);
2497 }
2498
2499
2500 /*
2501  * Set the proper default_with_oids value for the table.
2502  */
2503 static void
2504 _setWithOids(ArchiveHandle *AH, TocEntry *te)
2505 {
2506         if (AH->currWithOids != te->withOids)
2507         {
2508                 _doSetWithOids(AH, te->withOids);
2509                 AH->currWithOids = te->withOids;
2510         }
2511 }
2512
2513
2514 /*
2515  * Issue the commands to select the specified schema as the current schema
2516  * in the target database.
2517  */
2518 static void
2519 _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
2520 {
2521         PQExpBuffer qry;
2522
2523         if (!schemaName || *schemaName == '\0' ||
2524                 (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
2525                 return;                                 /* no need to do anything */
2526
2527         qry = createPQExpBuffer();
2528
2529         appendPQExpBuffer(qry, "SET search_path = %s",
2530                                           fmtId(schemaName));
2531         if (strcmp(schemaName, "pg_catalog") != 0)
2532                 appendPQExpBuffer(qry, ", pg_catalog");
2533
2534         if (RestoringToDB(AH))
2535         {
2536                 PGresult   *res;
2537
2538                 res = PQexec(AH->connection, qry->data);
2539
2540                 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
2541                         warn_or_die_horribly(AH, modulename,
2542                                                                  "could not set search_path to \"%s\": %s",
2543                                                                  schemaName, PQerrorMessage(AH->connection));
2544
2545                 PQclear(res);
2546         }
2547         else
2548                 ahprintf(AH, "%s;\n\n", qry->data);
2549
2550         if (AH->currSchema)
2551                 free(AH->currSchema);
2552         AH->currSchema = strdup(schemaName);
2553
2554         destroyPQExpBuffer(qry);
2555 }
2556
2557 /*
2558  * Issue the commands to select the specified tablespace as the current one
2559  * in the target database.
2560  */
2561 static void
2562 _selectTablespace(ArchiveHandle *AH, const char *tablespace)
2563 {
2564         PQExpBuffer qry;
2565         const char *want,
2566                            *have;
2567
2568         /* do nothing in --no-tablespaces mode */
2569         if (AH->ropt->noTablespace)
2570                 return;
2571
2572         have = AH->currTablespace;
2573         want = tablespace;
2574
2575         /* no need to do anything for non-tablespace object */
2576         if (!want)
2577                 return;
2578
2579         if (have && strcmp(want, have) == 0)
2580                 return;                                 /* no need to do anything */
2581
2582         qry = createPQExpBuffer();
2583
2584         if (strcmp(want, "") == 0)
2585         {
2586                 /* We want the tablespace to be the database's default */
2587                 appendPQExpBuffer(qry, "SET default_tablespace = ''");
2588         }
2589         else
2590         {
2591                 /* We want an explicit tablespace */
2592                 appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
2593         }
2594
2595         if (RestoringToDB(AH))
2596         {
2597                 PGresult   *res;
2598
2599                 res = PQexec(AH->connection, qry->data);
2600
2601                 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
2602                         warn_or_die_horribly(AH, modulename,
2603                                                                  "could not set default_tablespace to %s: %s",
2604                                                                  fmtId(want), PQerrorMessage(AH->connection));
2605
2606                 PQclear(res);
2607         }
2608         else
2609                 ahprintf(AH, "%s;\n\n", qry->data);
2610
2611         if (AH->currTablespace)
2612                 free(AH->currTablespace);
2613         AH->currTablespace = strdup(want);
2614
2615         destroyPQExpBuffer(qry);
2616 }
2617
2618 /*
2619  * Extract an object description for a TOC entry, and append it to buf.
2620  *
2621  * This is not quite as general as it may seem, since it really only
2622  * handles constructing the right thing to put into ALTER ... OWNER TO.
2623  *
2624  * The whole thing is pretty grotty, but we are kind of stuck since the
2625  * information used is all that's available in older dump files.
2626  */
2627 static void
2628 _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
2629 {
2630         const char *type = te->desc;
2631
2632         /* Use ALTER TABLE for views and sequences */
2633         if (strcmp(type, "VIEW") == 0 || strcmp(type, "SEQUENCE") == 0)
2634                 type = "TABLE";
2635
2636         /* objects named by a schema and name */
2637         if (strcmp(type, "CONVERSION") == 0 ||
2638                 strcmp(type, "DOMAIN") == 0 ||
2639                 strcmp(type, "TABLE") == 0 ||
2640                 strcmp(type, "TYPE") == 0 ||
2641                 strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
2642                 strcmp(type, "TEXT SEARCH CONFIGURATION") == 0)
2643         {
2644                 appendPQExpBuffer(buf, "%s ", type);
2645                 if (te->namespace && te->namespace[0])  /* is null pre-7.3 */
2646                         appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
2647
2648                 /*
2649                  * Pre-7.3 pg_dump would sometimes (not always) put a fmtId'd name
2650                  * into te->tag for an index. This check is heuristic, so make its
2651                  * scope as narrow as possible.
2652                  */
2653                 if (AH->version < K_VERS_1_7 &&
2654                         te->tag[0] == '"' &&
2655                         te->tag[strlen(te->tag) - 1] == '"' &&
2656                         strcmp(type, "INDEX") == 0)
2657                         appendPQExpBuffer(buf, "%s", te->tag);
2658                 else
2659                         appendPQExpBuffer(buf, "%s", fmtId(te->tag));
2660                 return;
2661         }
2662
2663         /* objects named by just a name */
2664         if (strcmp(type, "DATABASE") == 0 ||
2665                 strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
2666                 strcmp(type, "SCHEMA") == 0 ||
2667                 strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
2668                 strcmp(type, "SERVER") == 0 ||
2669                 strcmp(type, "USER MAPPING") == 0)
2670         {
2671                 appendPQExpBuffer(buf, "%s %s", type, fmtId(te->tag));
2672                 return;
2673         }
2674
2675         /*
2676          * These object types require additional decoration.  Fortunately, the
2677          * information needed is exactly what's in the DROP command.
2678          */
2679         if (strcmp(type, "AGGREGATE") == 0 ||
2680                 strcmp(type, "FUNCTION") == 0 ||
2681                 strcmp(type, "OPERATOR") == 0 ||
2682                 strcmp(type, "OPERATOR CLASS") == 0 ||
2683                 strcmp(type, "OPERATOR FAMILY") == 0)
2684         {
2685                 /* Chop "DROP " off the front and make a modifiable copy */
2686                 char       *first = strdup(te->dropStmt + 5);
2687                 char       *last;
2688
2689                 /* point to last character in string */
2690                 last = first + strlen(first) - 1;
2691
2692                 /* Strip off any ';' or '\n' at the end */
2693                 while (last >= first && (*last == '\n' || *last == ';'))
2694                         last--;
2695                 *(last + 1) = '\0';
2696
2697                 appendPQExpBufferStr(buf, first);
2698
2699                 free(first);
2700                 return;
2701         }
2702
2703         write_msg(modulename, "WARNING: don't know how to set owner for object type %s\n",
2704                           type);
2705 }
2706
2707 static void
2708 _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isData, bool acl_pass)
2709 {
2710         /* ACLs are dumped only during acl pass */
2711         if (acl_pass)
2712         {
2713                 if (strcmp(te->desc, "ACL") != 0)
2714                         return;
2715         }
2716         else
2717         {
2718                 if (strcmp(te->desc, "ACL") == 0)
2719                         return;
2720         }
2721
2722         /*
2723          * Avoid dumping the public schema, as it will already be created ...
2724          * unless we are using --clean mode, in which case it's been deleted and
2725          * we'd better recreate it.  Likewise for its comment, if any.
2726          */
2727         if (!ropt->dropSchema)
2728         {
2729                 if (strcmp(te->desc, "SCHEMA") == 0 &&
2730                         strcmp(te->tag, "public") == 0)
2731                         return;
2732                 /* The comment restore would require super-user privs, so avoid it. */
2733                 if (strcmp(te->desc, "COMMENT") == 0 &&
2734                         strcmp(te->tag, "SCHEMA public") == 0)
2735                         return;
2736         }
2737
2738         /* Select owner, schema, and tablespace as necessary */
2739         _becomeOwner(AH, te);
2740         _selectOutputSchema(AH, te->namespace);
2741         _selectTablespace(AH, te->tablespace);
2742
2743         /* Set up OID mode too */
2744         if (strcmp(te->desc, "TABLE") == 0)
2745                 _setWithOids(AH, te);
2746
2747         /* Emit header comment for item */
2748         if (!AH->noTocComments)
2749         {
2750                 const char *pfx;
2751
2752                 if (isData)
2753                         pfx = "Data for ";
2754                 else
2755                         pfx = "";
2756
2757                 ahprintf(AH, "--\n");
2758                 if (AH->public.verbose)
2759                 {
2760                         ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
2761                                          te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
2762                         if (te->nDeps > 0)
2763                         {
2764                                 int                     i;
2765
2766                                 ahprintf(AH, "-- Dependencies:");
2767                                 for (i = 0; i < te->nDeps; i++)
2768                                         ahprintf(AH, " %d", te->dependencies[i]);
2769                                 ahprintf(AH, "\n");
2770                         }
2771                 }
2772                 ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
2773                                  pfx, te->tag, te->desc,
2774                                  te->namespace ? te->namespace : "-",
2775                                  ropt->noOwner ? "-" : te->owner);
2776                 if (te->tablespace && !ropt->noTablespace)
2777                         ahprintf(AH, "; Tablespace: %s", te->tablespace);
2778                 ahprintf(AH, "\n");
2779
2780                 if (AH->PrintExtraTocPtr !=NULL)
2781                         (*AH->PrintExtraTocPtr) (AH, te);
2782                 ahprintf(AH, "--\n\n");
2783         }
2784
2785         /*
2786          * Actually print the definition.
2787          *
2788          * Really crude hack for suppressing AUTHORIZATION clause that old pg_dump
2789          * versions put into CREATE SCHEMA.  We have to do this when --no-owner
2790          * mode is selected.  This is ugly, but I see no other good way ...
2791          */
2792         if (ropt->noOwner && strcmp(te->desc, "SCHEMA") == 0)
2793         {
2794                 ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
2795         }
2796         else
2797         {
2798                 if (strlen(te->defn) > 0)
2799                         ahprintf(AH, "%s\n\n", te->defn);
2800         }
2801
2802         /*
2803          * If we aren't using SET SESSION AUTH to determine ownership, we must
2804          * instead issue an ALTER OWNER command.  We assume that anything without
2805          * a DROP command is not a separately ownable object.  All the categories
2806          * with DROP commands must appear in one list or the other.
2807          */
2808         if (!ropt->noOwner && !ropt->use_setsessauth &&
2809                 strlen(te->owner) > 0 && strlen(te->dropStmt) > 0)
2810         {
2811                 if (strcmp(te->desc, "AGGREGATE") == 0 ||
2812                         strcmp(te->desc, "CONVERSION") == 0 ||
2813                         strcmp(te->desc, "DATABASE") == 0 ||
2814                         strcmp(te->desc, "DOMAIN") == 0 ||
2815                         strcmp(te->desc, "FUNCTION") == 0 ||
2816                         strcmp(te->desc, "OPERATOR") == 0 ||
2817                         strcmp(te->desc, "OPERATOR CLASS") == 0 ||
2818                         strcmp(te->desc, "OPERATOR FAMILY") == 0 ||
2819                         strcmp(te->desc, "PROCEDURAL LANGUAGE") == 0 ||
2820                         strcmp(te->desc, "SCHEMA") == 0 ||
2821                         strcmp(te->desc, "TABLE") == 0 ||
2822                         strcmp(te->desc, "TYPE") == 0 ||
2823                         strcmp(te->desc, "VIEW") == 0 ||
2824                         strcmp(te->desc, "SEQUENCE") == 0 ||
2825                         strcmp(te->desc, "TEXT SEARCH DICTIONARY") == 0 ||
2826                         strcmp(te->desc, "TEXT SEARCH CONFIGURATION") == 0 ||
2827                         strcmp(te->desc, "FOREIGN DATA WRAPPER") == 0 ||
2828                         strcmp(te->desc, "SERVER") == 0)
2829                 {
2830                         PQExpBuffer temp = createPQExpBuffer();
2831
2832                         appendPQExpBuffer(temp, "ALTER ");
2833                         _getObjectDescription(temp, te, AH);
2834                         appendPQExpBuffer(temp, " OWNER TO %s;", fmtId(te->owner));
2835                         ahprintf(AH, "%s\n\n", temp->data);
2836                         destroyPQExpBuffer(temp);
2837                 }
2838                 else if (strcmp(te->desc, "CAST") == 0 ||
2839                                  strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2840                                  strcmp(te->desc, "CONSTRAINT") == 0 ||
2841                                  strcmp(te->desc, "DEFAULT") == 0 ||
2842                                  strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2843                                  strcmp(te->desc, "INDEX") == 0 ||
2844                                  strcmp(te->desc, "RULE") == 0 ||
2845                                  strcmp(te->desc, "TRIGGER") == 0 ||
2846                                  strcmp(te->desc, "USER MAPPING") == 0)
2847                 {
2848                         /* these object types don't have separate owners */
2849                 }
2850                 else
2851                 {
2852                         write_msg(modulename, "WARNING: don't know how to set owner for object type %s\n",
2853                                           te->desc);
2854                 }
2855         }
2856
2857         /*
2858          * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
2859          * commands, so we can no longer assume we know the current auth setting.
2860          */
2861         if (strncmp(te->desc, "ACL", 3) == 0)
2862         {
2863                 if (AH->currUser)
2864                         free(AH->currUser);
2865                 AH->currUser = NULL;
2866         }
2867 }
2868
2869 void
2870 WriteHead(ArchiveHandle *AH)
2871 {
2872         struct tm       crtm;
2873
2874         (*AH->WriteBufPtr) (AH, "PGDMP", 5);            /* Magic code */
2875         (*AH->WriteBytePtr) (AH, AH->vmaj);
2876         (*AH->WriteBytePtr) (AH, AH->vmin);
2877         (*AH->WriteBytePtr) (AH, AH->vrev);
2878         (*AH->WriteBytePtr) (AH, AH->intSize);
2879         (*AH->WriteBytePtr) (AH, AH->offSize);
2880         (*AH->WriteBytePtr) (AH, AH->format);
2881
2882 #ifndef HAVE_LIBZ
2883         if (AH->compression != 0)
2884                 write_msg(modulename, "WARNING: requested compression not available in this "
2885                                   "installation -- archive will be uncompressed\n");
2886
2887         AH->compression = 0;
2888 #endif
2889
2890         WriteInt(AH, AH->compression);
2891
2892         crtm = *localtime(&AH->createDate);
2893         WriteInt(AH, crtm.tm_sec);
2894         WriteInt(AH, crtm.tm_min);
2895         WriteInt(AH, crtm.tm_hour);
2896         WriteInt(AH, crtm.tm_mday);
2897         WriteInt(AH, crtm.tm_mon);
2898         WriteInt(AH, crtm.tm_year);
2899         WriteInt(AH, crtm.tm_isdst);
2900         WriteStr(AH, PQdb(AH->connection));
2901         WriteStr(AH, AH->public.remoteVersionStr);
2902         WriteStr(AH, PG_VERSION);
2903 }
2904
2905 void
2906 ReadHead(ArchiveHandle *AH)
2907 {
2908         char            tmpMag[7];
2909         int                     fmt;
2910         struct tm       crtm;
2911
2912         /* If we haven't already read the header... */
2913         if (!AH->readHeader)
2914         {
2915                 if ((*AH->ReadBufPtr) (AH, tmpMag, 5) != 5)
2916                         die_horribly(AH, modulename, "unexpected end of file\n");
2917
2918                 if (strncmp(tmpMag, "PGDMP", 5) != 0)
2919                         die_horribly(AH, modulename, "did not find magic string in file header\n");
2920
2921                 AH->vmaj = (*AH->ReadBytePtr) (AH);
2922                 AH->vmin = (*AH->ReadBytePtr) (AH);
2923
2924                 if (AH->vmaj > 1 || ((AH->vmaj == 1) && (AH->vmin > 0)))                /* Version > 1.0 */
2925                         AH->vrev = (*AH->ReadBytePtr) (AH);
2926                 else
2927                         AH->vrev = 0;
2928
2929                 AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
2930
2931
2932                 if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
2933                         die_horribly(AH, modulename, "unsupported version (%d.%d) in file header\n",
2934                                                  AH->vmaj, AH->vmin);
2935
2936                 AH->intSize = (*AH->ReadBytePtr) (AH);
2937                 if (AH->intSize > 32)
2938                         die_horribly(AH, modulename, "sanity check on integer size (%lu) failed\n",
2939                                                  (unsigned long) AH->intSize);
2940
2941                 if (AH->intSize > sizeof(int))
2942                         write_msg(modulename, "WARNING: archive was made on a machine with larger integers, some operations might fail\n");
2943
2944                 if (AH->version >= K_VERS_1_7)
2945                         AH->offSize = (*AH->ReadBytePtr) (AH);
2946                 else
2947                         AH->offSize = AH->intSize;
2948
2949                 fmt = (*AH->ReadBytePtr) (AH);
2950
2951                 if (AH->format != fmt)
2952                         die_horribly(AH, modulename, "expected format (%d) differs from format found in file (%d)\n",
2953                                                  AH->format, fmt);
2954         }
2955
2956         if (AH->version >= K_VERS_1_2)
2957         {
2958                 if (AH->version < K_VERS_1_4)
2959                         AH->compression = (*AH->ReadBytePtr) (AH);
2960                 else
2961                         AH->compression = ReadInt(AH);
2962         }
2963         else
2964                 AH->compression = Z_DEFAULT_COMPRESSION;
2965
2966 #ifndef HAVE_LIBZ
2967         if (AH->compression != 0)
2968                 write_msg(modulename, "WARNING: archive is compressed, but this installation does not support compression -- no data will be available\n");
2969 #endif
2970
2971         if (AH->version >= K_VERS_1_4)
2972         {
2973                 crtm.tm_sec = ReadInt(AH);
2974                 crtm.tm_min = ReadInt(AH);
2975                 crtm.tm_hour = ReadInt(AH);
2976                 crtm.tm_mday = ReadInt(AH);
2977                 crtm.tm_mon = ReadInt(AH);
2978                 crtm.tm_year = ReadInt(AH);
2979                 crtm.tm_isdst = ReadInt(AH);
2980
2981                 AH->archdbname = ReadStr(AH);
2982
2983                 AH->createDate = mktime(&crtm);
2984
2985                 if (AH->createDate == (time_t) -1)
2986                         write_msg(modulename, "WARNING: invalid creation date in header\n");
2987         }
2988
2989         if (AH->version >= K_VERS_1_10)
2990         {
2991                 AH->archiveRemoteVersion = ReadStr(AH);
2992                 AH->archiveDumpVersion = ReadStr(AH);
2993         }
2994
2995 }
2996
2997
2998 /*
2999  * checkSeek
3000  *        check to see if fseek can be performed.
3001  */
3002 bool
3003 checkSeek(FILE *fp)
3004 {
3005         if (fseeko(fp, 0, SEEK_CUR) != 0)
3006                 return false;
3007         else if (sizeof(pgoff_t) > sizeof(long))
3008         {
3009                 /*
3010                  * At this point, pgoff_t is too large for long, so we return based on
3011                  * whether an pgoff_t version of fseek is available.
3012                  */
3013 #ifdef HAVE_FSEEKO
3014                 return true;
3015 #else
3016                 return false;
3017 #endif
3018         }
3019         else
3020                 return true;
3021 }
3022
3023
3024 /*
3025  * dumpTimestamp
3026  */
3027 static void
3028 dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
3029 {
3030         char            buf[256];
3031
3032         /*
3033          * We don't print the timezone on Win32, because the names are long and
3034          * localized, which means they may contain characters in various random
3035          * encodings; this has been seen to cause encoding errors when reading the
3036          * dump script.
3037          */
3038         if (strftime(buf, sizeof(buf),
3039 #ifndef WIN32
3040                                  "%Y-%m-%d %H:%M:%S %Z",
3041 #else
3042                                  "%Y-%m-%d %H:%M:%S",
3043 #endif
3044                                  localtime(&tim)) != 0)
3045                 ahprintf(AH, "-- %s %s\n\n", msg, buf);
3046 }
3047
3048
3049 /*
3050  * Main engine for parallel restore.
3051  *
3052  * Work is done in three phases.
3053  * First we process tocEntries until we come to one that is marked
3054  * SECTION_DATA or SECTION_POST_DATA, in a single connection, just as for a
3055  * standard restore.  Second we process the remaining non-ACL steps in
3056  * parallel worker children (threads on Windows, processes on Unix), each of
3057  * which connects separately to the database.  Finally we process all the ACL
3058  * entries in a single connection (that happens back in RestoreArchive).
3059  */
3060 static void
3061 restore_toc_entries_parallel(ArchiveHandle *AH)
3062 {
3063         RestoreOptions *ropt = AH->ropt;
3064         int                     n_slots = ropt->number_of_jobs;
3065         ParallelSlot *slots;
3066         int                     work_status;
3067         int                     next_slot;
3068         TocEntry   *first_unprocessed = AH->toc->next;
3069         TocEntry   *next_work_item;
3070         thandle         ret_child;
3071         TocEntry   *te;
3072
3073         ahlog(AH,2,"entering restore_toc_entries_parallel\n");
3074
3075         /* we haven't got round to making this work for all archive formats */
3076         if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
3077                 die_horribly(AH, modulename, "parallel restore is not supported with this archive file format\n");
3078
3079         slots = (ParallelSlot *) calloc(sizeof(ParallelSlot), n_slots);
3080
3081         /* Adjust dependency information */
3082         fix_dependencies(AH);
3083
3084         /*
3085          * Do all the early stuff in a single connection in the parent.
3086          * There's no great point in running it in parallel, in fact it will
3087          * actually run faster in a single connection because we avoid all the
3088          * connection and setup overhead.
3089          */
3090         while ((next_work_item = get_next_work_item(AH, &first_unprocessed,
3091                                                                                                 NULL, 0)) != NULL)
3092         {
3093                 if (next_work_item->section == SECTION_DATA ||
3094                         next_work_item->section == SECTION_POST_DATA)
3095                         break;
3096
3097                 ahlog(AH, 1, "processing item %d %s %s\n",
3098                           next_work_item->dumpId,
3099                           next_work_item->desc, next_work_item->tag);
3100
3101                 (void) restore_toc_entry(AH, next_work_item, ropt, false);
3102
3103                 next_work_item->restored = true;
3104                 reduce_dependencies(AH, next_work_item);
3105         }
3106
3107         /*
3108          * Now close parent connection in prep for parallel steps.  We do this
3109          * mainly to ensure that we don't exceed the specified number of parallel
3110          * connections.
3111          */
3112         PQfinish(AH->connection);
3113         AH->connection = NULL;
3114
3115         /* blow away any transient state from the old connection */
3116         if (AH->currUser)
3117                 free(AH->currUser);
3118         AH->currUser = NULL;
3119         if (AH->currSchema)
3120                 free(AH->currSchema);
3121         AH->currSchema = NULL;
3122         if (AH->currTablespace)
3123                 free(AH->currTablespace);
3124         AH->currTablespace = NULL;
3125         AH->currWithOids = -1;
3126
3127         /*
3128          * main parent loop
3129          *
3130          * Keep going until there is no worker still running AND there is no work
3131          * left to be done.
3132          */
3133
3134         ahlog(AH,1,"entering main parallel loop\n");
3135
3136         while ((next_work_item = get_next_work_item(AH, &first_unprocessed,
3137                                                                                                 slots, n_slots)) != NULL ||
3138                    work_in_progress(slots, n_slots))
3139         {
3140                 if (next_work_item != NULL)
3141                 {
3142                         teReqs    reqs;
3143
3144                         /* If not to be dumped, don't waste time launching a worker */
3145                         reqs = _tocEntryRequired(next_work_item, AH->ropt, false);
3146                         if ((reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
3147                         {
3148                                 ahlog(AH, 1, "skipping item %d %s %s\n",
3149                                           next_work_item->dumpId,
3150                                           next_work_item->desc, next_work_item->tag);
3151
3152                                 next_work_item->restored = true;
3153                                 reduce_dependencies(AH, next_work_item);
3154
3155                                 continue;
3156                         }
3157
3158                         if ((next_slot = get_next_slot(slots, n_slots)) != NO_SLOT)
3159                         {
3160                                 /* There is work still to do and a worker slot available */
3161                                 thandle child;
3162                                 RestoreArgs *args;
3163
3164                                 ahlog(AH, 1, "launching item %d %s %s\n",
3165                                           next_work_item->dumpId,
3166                                           next_work_item->desc, next_work_item->tag);
3167
3168                                 next_work_item->restored = true;
3169
3170                                 /* this memory is dealloced in mark_work_done() */
3171                                 args = malloc(sizeof(RestoreArgs));
3172                                 args->AH = CloneArchive(AH);
3173                                 args->te = next_work_item;
3174
3175                                 /* run the step in a worker child */
3176                                 child = spawn_restore(args);
3177
3178                                 slots[next_slot].child_id = child;
3179                                 slots[next_slot].args = args;
3180
3181                                 continue;
3182                         }
3183                 }
3184
3185                 /*
3186                  * If we get here there must be work being done.  Either there is no
3187                  * work available to schedule (and work_in_progress returned true) or
3188                  * there are no slots available.  So we wait for a worker to finish,
3189                  * and process the result.
3190                  */
3191                 ret_child = reap_child(slots, n_slots, &work_status);
3192
3193                 if (WIFEXITED(work_status))
3194                 {
3195                         mark_work_done(AH, ret_child, WEXITSTATUS(work_status),
3196                                                    slots, n_slots);
3197                 }
3198                 else
3199                 {
3200                         die_horribly(AH, modulename, "worker process crashed: status %d\n",
3201                                                  work_status);
3202                 }
3203         }
3204
3205         ahlog(AH,1,"finished main parallel loop\n");
3206
3207         /*
3208          * Now reconnect the single parent connection.
3209          */
3210         ConnectDatabase((Archive *) AH, ropt->dbname,
3211                                         ropt->pghost, ropt->pgport, ropt->username,
3212                                         ropt->promptPassword);
3213
3214         _doSetFixedOutputState(AH);
3215
3216         /*
3217          * Make sure there is no non-ACL work left due to, say,
3218          * circular dependencies, or some other pathological condition.
3219          * If so, do it in the single parent connection.
3220          */
3221         for (te = AH->toc->next; te != AH->toc; te = te->next)
3222         {
3223                 if (!te->restored)
3224                 {
3225                         ahlog(AH, 1, "processing missed item %d %s %s\n",
3226                                   te->dumpId, te->desc, te->tag);
3227                         (void) restore_toc_entry(AH, te, ropt, false);
3228                 }
3229         }
3230
3231         /* The ACLs will be handled back in RestoreArchive. */
3232 }
3233
3234 /*
3235  * create a worker child to perform a restore step in parallel
3236  */
3237 static thandle
3238 spawn_restore(RestoreArgs *args)
3239 {
3240         thandle child;
3241
3242         /* Ensure stdio state is quiesced before forking */
3243         fflush(NULL);
3244
3245 #ifndef WIN32
3246         child = fork();
3247         if (child == 0)
3248         {
3249                 /* in child process */
3250                 parallel_restore(args);
3251                 die_horribly(args->AH, modulename,
3252                                          "parallel_restore should not return\n");
3253         }
3254         else if (child < 0)
3255         {
3256                 /* fork failed */
3257                 die_horribly(args->AH, modulename,
3258                                          "could not create worker process: %s\n",
3259                                          strerror(errno));
3260         }
3261 #else
3262         child = (HANDLE) _beginthreadex(NULL, 0, (void *) parallel_restore,
3263                                                                         args, 0, NULL);
3264         if (child == 0)
3265                 die_horribly(args->AH, modulename,
3266                                          "could not create worker thread: %s\n",
3267                                          strerror(errno));
3268 #endif
3269
3270         return child;
3271 }
3272
3273 /*
3274  *  collect status from a completed worker child
3275  */
3276 static thandle
3277 reap_child(ParallelSlot *slots, int n_slots, int *work_status)
3278 {
3279 #ifndef WIN32
3280         /* Unix is so much easier ... */
3281         return wait(work_status);
3282 #else
3283         static HANDLE *handles = NULL;
3284         int hindex, snum, tnum;
3285         thandle ret_child;
3286         DWORD res;
3287
3288         /* first time around only, make space for handles to listen on */
3289         if (handles == NULL)
3290                 handles = (HANDLE *) calloc(sizeof(HANDLE),n_slots);
3291
3292         /* set up list of handles to listen to */
3293         for (snum=0, tnum=0; snum < n_slots; snum++)
3294                 if (slots[snum].child_id != 0)
3295                         handles[tnum++] = slots[snum].child_id;
3296
3297         /* wait for one to finish */
3298         hindex = WaitForMultipleObjects(tnum, handles, false, INFINITE);
3299
3300         /* get handle of finished thread */
3301         ret_child = handles[hindex - WAIT_OBJECT_0];
3302
3303         /* get the result */
3304         GetExitCodeThread(ret_child,&res);
3305         *work_status = res;
3306
3307         /* dispose of handle to stop leaks */
3308         CloseHandle(ret_child);
3309
3310         return ret_child;
3311 #endif
3312 }
3313
3314 /*
3315  * are we doing anything now?
3316  */
3317 static bool
3318 work_in_progress(ParallelSlot *slots, int n_slots)
3319 {
3320         int i;
3321
3322         for (i = 0; i < n_slots; i++)
3323         {
3324                 if (slots[i].child_id != 0)
3325                         return true;
3326         }
3327         return false;
3328 }
3329
3330 /*
3331  * find the first free parallel slot (if any).
3332  */
3333 static int
3334 get_next_slot(ParallelSlot *slots, int n_slots)
3335 {
3336         int i;
3337
3338         for (i = 0; i < n_slots; i++)
3339         {
3340                 if (slots[i].child_id == 0)
3341                         return i;
3342         }
3343         return NO_SLOT;
3344 }
3345
3346 /*
3347  * Find the next work item (if any) that is capable of being run now.
3348  *
3349  * To qualify, the item must have no remaining dependencies
3350  * and no requirement for locks that is incompatible with
3351  * items currently running.
3352  *
3353  * first_unprocessed is state data that tracks the location of the first
3354  * TocEntry that's not marked 'restored'.  This avoids O(N^2) search time
3355  * with long TOC lists.  (Even though the constant is pretty small, it'd
3356  * get us eventually.)
3357  *
3358  * pref_non_data is for an alternative selection algorithm that gives
3359  * preference to non-data items if there is already a data load running.
3360  * It is currently disabled.
3361  */
3362 static TocEntry *
3363 get_next_work_item(ArchiveHandle *AH, TocEntry **first_unprocessed,
3364                                    ParallelSlot *slots, int n_slots)
3365 {
3366         bool      pref_non_data = false; /* or get from AH->ropt */
3367         TocEntry *data_te = NULL;
3368         TocEntry *te;
3369         int       i,j,k;
3370
3371         /*
3372          * Bogus heuristics for pref_non_data
3373          */
3374         if (pref_non_data)
3375         {
3376                 int count = 0;
3377
3378                 for (k=0; k < n_slots; k++)
3379                         if (slots[k].args->te != NULL &&
3380                                 slots[k].args->te->section == SECTION_DATA)
3381                                 count++;
3382                 if (n_slots == 0 || count * 4 < n_slots)
3383                         pref_non_data = false;
3384         }
3385
3386         /*
3387          * Advance first_unprocessed if possible.
3388          */
3389         for (te = *first_unprocessed; te != AH->toc; te = te->next)
3390         {
3391                 if (!te->restored)
3392                         break;
3393         }
3394         *first_unprocessed = te;
3395
3396         /*
3397          * Search from first_unprocessed until we find an available item.
3398          */
3399         for (; te != AH->toc; te = te->next)
3400         {
3401                 bool conflicts = false;
3402
3403                 /* Ignore if already done or still waiting on dependencies */
3404                 if (te->restored || te->depCount > 0)
3405                         continue;
3406
3407                 /*
3408                  * Check to see if the item would need exclusive lock on something
3409                  * that a currently running item also needs lock on.  If so, we
3410                  * don't want to schedule them together.
3411                  */
3412                 for (i = 0; i < n_slots && !conflicts; i++)
3413                 {
3414                         TocEntry *running_te;
3415
3416                         if (slots[i].args == NULL)
3417                                 continue;
3418                         running_te = slots[i].args->te;
3419                         for (j = 0; j < te->nLockDeps && !conflicts; j++)
3420                         {
3421                                 for (k = 0; k < running_te->nLockDeps; k++)
3422                                 {
3423                                         if (te->lockDeps[j] == running_te->lockDeps[k])
3424                                         {
3425                                                 conflicts = true;
3426                                                 break;
3427                                         }
3428                                 }
3429                         }
3430                 }
3431
3432                 if (conflicts)
3433                         continue;
3434
3435                 if (pref_non_data && te->section == SECTION_DATA)
3436                 {
3437                         if (data_te == NULL)
3438                                 data_te = te;
3439                         continue;
3440                 }
3441
3442                 /* passed all tests, so this item can run */
3443                 return te;
3444         }
3445
3446         if (data_te != NULL)
3447                 return data_te;
3448
3449         ahlog(AH,2,"no item ready\n");
3450         return NULL;
3451 }
3452
3453
3454 /*
3455  * Restore a single TOC item in parallel with others
3456  *
3457  * this is the procedure run as a thread (Windows) or a
3458  * separate process (everything else).
3459  */
3460 static parallel_restore_result
3461 parallel_restore(RestoreArgs *args)
3462 {
3463         ArchiveHandle *AH = args->AH;
3464         TocEntry *te = args->te;
3465         RestoreOptions *ropt = AH->ropt;
3466         int retval;
3467
3468         /*
3469          * Close and reopen the input file so we have a private file pointer
3470          * that doesn't stomp on anyone else's file pointer, if we're actually 
3471          * going to need to read from the file. Otherwise, just close it
3472          * except on Windows, where it will possibly be needed by other threads.
3473          *
3474          * Note: on Windows, since we are using threads not processes, the
3475          * reopen call *doesn't* close the original file pointer but just open 
3476          * a new one.
3477          */
3478         if (te->section == SECTION_DATA )
3479                 (AH->ReopenPtr) (AH);
3480 #ifndef WIN32
3481         else
3482                 (AH->ClosePtr) (AH);
3483 #endif
3484
3485         /*
3486          * We need our own database connection, too
3487          */
3488         ConnectDatabase((Archive *) AH, ropt->dbname,
3489                                         ropt->pghost, ropt->pgport, ropt->username,
3490                                         ropt->promptPassword);
3491
3492         _doSetFixedOutputState(AH);
3493
3494         /* Restore the TOC item */
3495         retval = restore_toc_entry(AH, te, ropt, true);
3496
3497         /* And clean up */
3498         PQfinish(AH->connection);
3499         AH->connection = NULL;
3500
3501         /* If we reopened the file, we are done with it, so close it now */
3502         if (te->section == SECTION_DATA )
3503                 (AH->ClosePtr) (AH);
3504
3505         if (retval == 0 && AH->public.n_errors)
3506                 retval = WORKER_IGNORED_ERRORS;
3507
3508 #ifndef WIN32
3509         exit(retval);
3510 #else
3511         return retval;
3512 #endif
3513 }
3514
3515
3516 /*
3517  * Housekeeping to be done after a step has been parallel restored.
3518  *
3519  * Clear the appropriate slot, free all the extra memory we allocated,
3520  * update status, and reduce the dependency count of any dependent items.
3521  */
3522 static void
3523 mark_work_done(ArchiveHandle *AH, thandle worker, int status,
3524                            ParallelSlot *slots, int n_slots)
3525 {
3526         TocEntry *te = NULL;
3527         int i;
3528
3529         for (i = 0; i < n_slots; i++)
3530         {
3531                 if (slots[i].child_id == worker)
3532                 {
3533                         slots[i].child_id = 0;
3534                         te = slots[i].args->te;
3535                         DeCloneArchive(slots[i].args->AH);
3536                         free(slots[i].args);
3537                         slots[i].args = NULL;
3538
3539                         break;
3540                 }
3541         }
3542
3543         if (te == NULL)
3544                 die_horribly(AH, modulename, "could not find slot of finished worker\n");
3545
3546         ahlog(AH, 1, "finished item %d %s %s\n",
3547                   te->dumpId, te->desc, te->tag);
3548
3549         if (status == WORKER_CREATE_DONE)
3550                 mark_create_done(AH, te);
3551         else if (status == WORKER_INHIBIT_DATA)
3552         {
3553                 inhibit_data_for_failed_table(AH, te);
3554                 AH->public.n_errors++;
3555         }
3556         else if (status == WORKER_IGNORED_ERRORS)
3557                 AH->public.n_errors++;
3558         else if (status != 0)
3559                 die_horribly(AH, modulename, "worker process failed: exit code %d\n",
3560                                          status);
3561
3562         reduce_dependencies(AH, te);
3563 }
3564
3565
3566 /*
3567  * Process the dependency information into a form useful for parallel restore.
3568  *
3569  * We set up depCount fields that are the number of as-yet-unprocessed
3570  * dependencies for each TOC entry.
3571  *
3572  * We also identify locking dependencies so that we can avoid trying to
3573  * schedule conflicting items at the same time.
3574  */
3575 static void
3576 fix_dependencies(ArchiveHandle *AH)
3577 {
3578         TocEntry  **tocsByDumpId;
3579         TocEntry   *te;
3580         int i;
3581
3582         /*
3583          * For some of the steps here, it is convenient to have an array that
3584          * indexes the TOC entries by dump ID, rather than searching the TOC
3585          * list repeatedly.  Entries for dump IDs not present in the TOC will
3586          * be NULL.
3587          *
3588          * Also, initialize the depCount fields.
3589          */
3590         tocsByDumpId = (TocEntry **) calloc(AH->maxDumpId, sizeof(TocEntry *));
3591         for (te = AH->toc->next; te != AH->toc; te = te->next)
3592         {
3593                 tocsByDumpId[te->dumpId - 1] = te;
3594                 te->depCount = te->nDeps;
3595         }
3596
3597         /*
3598          * POST_DATA items that are shown as depending on a table need to be
3599          * re-pointed to depend on that table's data, instead.  This ensures they
3600          * won't get scheduled until the data has been loaded.  We handle this by
3601          * first finding TABLE/TABLE DATA pairs and then scanning all the
3602          * dependencies.
3603          *
3604          * Note: currently, a TABLE DATA should always have exactly one
3605          * dependency, on its TABLE item.  So we don't bother to search,
3606          * but look just at the first dependency.  We do trouble to make sure
3607          * that it's a TABLE, if possible.  However, if the dependency isn't
3608          * in the archive then just assume it was a TABLE; this is to cover
3609          * cases where the table was suppressed but we have the data and some
3610          * dependent post-data items.
3611          */
3612         for (te = AH->toc->next; te != AH->toc; te = te->next)
3613         {
3614                 if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
3615                 {
3616                         DumpId  tableId = te->dependencies[0];
3617
3618                         if (tocsByDumpId[tableId - 1] == NULL ||
3619                                 strcmp(tocsByDumpId[tableId - 1]->desc, "TABLE") == 0)
3620                         {
3621                                 repoint_table_dependencies(AH, tableId, te->dumpId);
3622                         }
3623                 }
3624         }
3625
3626         /*
3627          * Pre-8.4 versions of pg_dump neglected to set up a dependency from
3628          * BLOB COMMENTS to BLOBS.  Cope.  (We assume there's only one BLOBS
3629          * and only one BLOB COMMENTS in such files.)
3630          */
3631         if (AH->version < K_VERS_1_11)
3632         {
3633                 for (te = AH->toc->next; te != AH->toc; te = te->next)
3634                 {
3635                         if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
3636                         {
3637                                 TocEntry   *te2;
3638
3639                                 for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
3640                                 {
3641                                         if (strcmp(te2->desc, "BLOBS") == 0)
3642                                         {
3643                                                 te->dependencies = (DumpId *) malloc(sizeof(DumpId));
3644                                                 te->dependencies[0] = te2->dumpId;
3645                                                 te->nDeps++;
3646                                                 te->depCount++;
3647                                                 break;
3648                                         }
3649                                 }
3650                                 break;
3651                         }
3652                 }
3653         }
3654
3655         /*
3656          * It is possible that the dependencies list items that are not in the
3657          * archive at all.  Subtract such items from the depCounts.
3658          */
3659         for (te = AH->toc->next; te != AH->toc; te = te->next)
3660         {
3661                 for (i = 0; i < te->nDeps; i++)
3662                 {
3663                         if (tocsByDumpId[te->dependencies[i] - 1] == NULL)
3664                                 te->depCount--;
3665                 }
3666         }
3667
3668         /*
3669          * Lastly, work out the locking dependencies.
3670          */
3671         for (te = AH->toc->next; te != AH->toc; te = te->next)
3672         {
3673                 te->lockDeps = NULL;
3674                 te->nLockDeps = 0;
3675                 identify_locking_dependencies(te, tocsByDumpId);
3676         }
3677
3678         free(tocsByDumpId);
3679 }
3680
3681 /*
3682  * Change dependencies on tableId to depend on tableDataId instead,
3683  * but only in POST_DATA items.
3684  */
3685 static void
3686 repoint_table_dependencies(ArchiveHandle *AH,
3687                                                    DumpId tableId, DumpId tableDataId)
3688 {
3689         TocEntry   *te;
3690         int i;
3691
3692         for (te = AH->toc->next; te != AH->toc; te = te->next)
3693         {
3694                 if (te->section != SECTION_POST_DATA)
3695                         continue;
3696                 for (i = 0; i < te->nDeps; i++)
3697                 {
3698                         if (te->dependencies[i] == tableId)
3699                         {
3700                                 te->dependencies[i] = tableDataId;
3701                                 ahlog(AH, 2, "transferring dependency %d -> %d to %d\n",
3702                                           te->dumpId, tableId, tableDataId);
3703                         }
3704                 }
3705         }
3706 }
3707
3708 /*
3709  * Identify which objects we'll need exclusive lock on in order to restore
3710  * the given TOC entry (*other* than the one identified by the TOC entry
3711  * itself).  Record their dump IDs in the entry's lockDeps[] array.
3712  * tocsByDumpId[] is a convenience array to avoid searching the TOC
3713  * for each dependency.
3714  */
3715 static void
3716 identify_locking_dependencies(TocEntry *te, TocEntry **tocsByDumpId)
3717 {
3718         DumpId     *lockids;
3719         int                     nlockids;
3720         int                     i;
3721
3722         /* Quick exit if no dependencies at all */
3723         if (te->nDeps == 0)
3724                 return;
3725
3726         /* Exit if this entry doesn't need exclusive lock on other objects */
3727         if (!(strcmp(te->desc, "CONSTRAINT") == 0 ||
3728                   strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
3729                   strcmp(te->desc, "FK CONSTRAINT") == 0 ||
3730                   strcmp(te->desc, "RULE") == 0 ||
3731                   strcmp(te->desc, "TRIGGER") == 0))
3732                 return;
3733
3734         /*
3735          * We assume the item requires exclusive lock on each TABLE DATA item
3736          * listed among its dependencies.  (This was originally a dependency
3737          * on the TABLE, but fix_dependencies repointed it to the data item.
3738          * Note that all the entry types we are interested in here are POST_DATA,
3739          * so they will all have been changed this way.)
3740          */
3741         lockids = (DumpId *) malloc(te->nDeps * sizeof(DumpId));
3742         nlockids = 0;
3743         for (i = 0; i < te->nDeps; i++)
3744         {
3745                 DumpId  depid = te->dependencies[i];
3746
3747                 if (tocsByDumpId[depid - 1] &&
3748                         strcmp(tocsByDumpId[depid - 1]->desc, "TABLE DATA") == 0)
3749                         lockids[nlockids++] = depid;
3750         }
3751
3752         if (nlockids == 0)
3753         {
3754                 free(lockids);
3755                 return;
3756         }
3757
3758         te->lockDeps = realloc(lockids, nlockids * sizeof(DumpId));
3759         te->nLockDeps = nlockids;
3760 }
3761
3762 /*
3763  * Remove the specified TOC entry from the depCounts of items that depend on
3764  * it, thereby possibly making them ready-to-run.
3765  */
3766 static void
3767 reduce_dependencies(ArchiveHandle *AH, TocEntry *te)
3768 {
3769         DumpId target = te->dumpId;
3770         int i;
3771
3772         ahlog(AH,2,"reducing dependencies for %d\n",target);
3773
3774         /*
3775          * We must examine all entries, not only the ones after the target item,
3776          * because if the user used a -L switch then the original dependency-
3777          * respecting order has been destroyed by SortTocFromFile.
3778          */
3779         for (te = AH->toc->next; te != AH->toc; te = te->next)
3780         {
3781                 for (i = 0; i < te->nDeps; i++)
3782                 {
3783                         if (te->dependencies[i] == target)
3784                                 te->depCount--;
3785                 }
3786         }
3787 }
3788
3789 /*
3790  * Set the created flag on the DATA member corresponding to the given
3791  * TABLE member
3792  */
3793 static void
3794 mark_create_done(ArchiveHandle *AH, TocEntry *te)
3795 {
3796         TocEntry   *tes;
3797
3798         for (tes = AH->toc->next; tes != AH->toc; tes = tes->next)
3799         {
3800                 if (strcmp(tes->desc, "TABLE DATA") == 0 &&
3801                         strcmp(tes->tag, te->tag) == 0 &&
3802                         strcmp(tes->namespace ? tes->namespace : "",
3803                                    te->namespace ? te->namespace : "") == 0)
3804                 {
3805                         tes->created = true;
3806                         break;
3807                 }
3808         }
3809 }
3810
3811 /*
3812  * Mark the DATA member corresponding to the given TABLE member
3813  * as not wanted
3814  */
3815 static void
3816 inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
3817 {
3818         RestoreOptions *ropt = AH->ropt;
3819         TocEntry   *tes;
3820
3821         ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
3822                   te->tag);
3823
3824         for (tes = AH->toc->next; tes != AH->toc; tes = tes->next)
3825         {
3826                 if (strcmp(tes->desc, "TABLE DATA") == 0 &&
3827                         strcmp(tes->tag, te->tag) == 0 &&
3828                         strcmp(tes->namespace ? tes->namespace : "",
3829                                    te->namespace ? te->namespace : "") == 0)
3830                 {
3831                         /* mark it unwanted; we assume idWanted array already exists */
3832                         ropt->idWanted[tes->dumpId - 1] = false;
3833                         break;
3834                 }
3835         }
3836 }
3837
3838
3839 /*
3840  * Clone and de-clone routines used in parallel restoration.
3841  *
3842  * Enough of the structure is cloned to ensure that there is no
3843  * conflict between different threads each with their own clone.
3844  *
3845  * These could be public, but no need at present.
3846  */
3847 static ArchiveHandle *
3848 CloneArchive(ArchiveHandle *AH)
3849 {
3850         ArchiveHandle *clone;
3851
3852         /* Make a "flat" copy */
3853         clone = (ArchiveHandle *) malloc(sizeof(ArchiveHandle)); 
3854         if (clone == NULL)
3855                 die_horribly(AH, modulename, "out of memory\n");
3856         memcpy(clone, AH, sizeof(ArchiveHandle));
3857
3858         /* Handle format-independent fields */
3859         clone->pgCopyBuf = createPQExpBuffer();
3860         clone->sqlBuf = createPQExpBuffer();
3861         clone->sqlparse.tagBuf = NULL;
3862
3863         /* The clone will have its own connection, so disregard connection state */
3864         clone->connection = NULL;
3865         clone->currUser = NULL;
3866         clone->currSchema = NULL;
3867         clone->currTablespace = NULL;
3868         clone->currWithOids = -1;
3869
3870         /* savedPassword must be local in case we change it while connecting */
3871         if (clone->savedPassword)
3872                 clone->savedPassword = strdup(clone->savedPassword);
3873
3874         /* clone has its own error count, too */
3875         clone->public.n_errors = 0;
3876
3877         /* Let the format-specific code have a chance too */
3878         (clone->ClonePtr) (clone);
3879
3880         return clone;
3881 }
3882
3883 /*
3884  * Release clone-local storage.
3885  *
3886  * Note: we assume any clone-local connection was already closed.
3887  */
3888 static void
3889 DeCloneArchive(ArchiveHandle *AH)
3890 {
3891         /* Clear format-specific state */
3892         (AH->DeClonePtr) (AH);
3893
3894         /* Clear state allocated by CloneArchive */
3895         destroyPQExpBuffer(AH->pgCopyBuf);
3896         destroyPQExpBuffer(AH->sqlBuf);
3897         if (AH->sqlparse.tagBuf)
3898                 destroyPQExpBuffer(AH->sqlparse.tagBuf);
3899
3900         /* Clear any connection-local state */
3901         if (AH->currUser)
3902                 free(AH->currUser);
3903         if (AH->currSchema)
3904                 free(AH->currSchema);
3905         if (AH->currTablespace)
3906                 free(AH->currTablespace);
3907         if (AH->savedPassword)
3908                 free(AH->savedPassword);
3909
3910         free(AH);
3911 }