const int compression, ArchiveMode mode, SetupWorkerPtr setupWorkerPtr);
static void _getObjectDescription(PQExpBuffer buf, TocEntry *te,
ArchiveHandle *AH);
-static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData, bool acl_pass);
+static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData);
static char *replace_line_endings(const char *str);
static void _doSetFixedOutputState(ArchiveHandle *AH);
static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
static teReqs _tocEntryRequired(TocEntry *te, teSection curSection, RestoreOptions *ropt);
+static RestorePass _tocEntryRestorePass(TocEntry *te);
static bool _tocEntryIsACL(TocEntry *te);
static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);
static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
-static void restore_toc_entries_prefork(ArchiveHandle *AH);
-static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
+static void restore_toc_entries_prefork(ArchiveHandle *AH,
+ TocEntry *pending_list);
+static void restore_toc_entries_parallel(ArchiveHandle *AH,
+ ParallelState *pstate,
+ TocEntry *pending_list);
+static void restore_toc_entries_postfork(ArchiveHandle *AH,
TocEntry *pending_list);
-static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list);
static void par_list_header_init(TocEntry *l);
static void par_list_append(TocEntry *l, TocEntry *te);
static void par_list_remove(TocEntry *te);
+static void move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list,
+ RestorePass pass);
static TocEntry *get_next_work_item(ArchiveHandle *AH,
TocEntry *ready_list,
ParallelState *pstate);
AH->currSchema = NULL;
}
- /*
- * In serial mode, we now process each non-ACL TOC entry.
- *
- * In parallel mode, turn control over to the parallel-restore logic.
- */
if (parallel_mode)
{
+ /*
+ * In parallel mode, turn control over to the parallel-restore logic.
+ */
ParallelState *pstate;
TocEntry pending_list;
par_list_header_init(&pending_list);
/* This runs PRE_DATA items and then disconnects from the database */
- restore_toc_entries_prefork(AH);
+ restore_toc_entries_prefork(AH, &pending_list);
Assert(AH->connection == NULL);
/* ParallelBackupStart() will actually fork the processes */
}
else
{
+ /*
+ * In serial mode, process everything in three phases: normal items,
+ * then ACLs, then matview refresh items. We might be able to skip
+ * one or both extra phases in some cases, eg data-only restores.
+ */
+ bool haveACL = false;
+ bool haveRefresh = false;
+
for (te = AH->toc->next; te != AH->toc; te = te->next)
- (void) restore_toc_entry(AH, te, false);
- }
+ {
+ if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
+ continue; /* ignore if not to be dumped at all */
- /*
- * Scan TOC again to output ownership commands and ACLs
- */
- for (te = AH->toc->next; te != AH->toc; te = te->next)
- {
- AH->currentTE = te;
+ switch (_tocEntryRestorePass(te))
+ {
+ case RESTORE_PASS_MAIN:
+ (void) restore_toc_entry(AH, te, false);
+ break;
+ case RESTORE_PASS_ACL:
+ haveACL = true;
+ break;
+ case RESTORE_PASS_REFRESH:
+ haveRefresh = true;
+ break;
+ }
+ }
- /* Both schema and data objects might now have ownership/ACLs */
- if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0)
+ if (haveACL)
{
- /* Show namespace if available */
- if (te->namespace)
- ahlog(AH, 1, "setting owner and privileges for %s \"%s.%s\"\n",
- te->desc, te->namespace, te->tag);
- else
- ahlog(AH, 1, "setting owner and privileges for %s \"%s\"\n",
- te->desc, te->tag);
- _printTocEntry(AH, te, false, true);
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
+ _tocEntryRestorePass(te) == RESTORE_PASS_ACL)
+ (void) restore_toc_entry(AH, te, false);
+ }
+ }
+
+ if (haveRefresh)
+ {
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
+ _tocEntryRestorePass(te) == RESTORE_PASS_REFRESH)
+ (void) restore_toc_entry(AH, te, false);
+ }
}
}
AH->currentTE = te;
/* Work out what, if anything, we want from this entry */
- if (_tocEntryIsACL(te))
- reqs = 0; /* ACLs are never restored here */
- else
- reqs = te->reqs;
+ reqs = te->reqs;
/*
* Ignore DATABASE entry unless we should create it. We must check this
defnDumped = false;
- if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */
+ /*
+ * If it has a schema component that we want, then process that
+ */
+ if ((reqs & REQ_SCHEMA) != 0)
{
- /* Show namespace if available */
+ /* Show namespace in log message if available */
if (te->namespace)
ahlog(AH, 1, "creating %s \"%s.%s\"\n",
te->desc, te->namespace, te->tag);
else
ahlog(AH, 1, "creating %s \"%s\"\n", te->desc, te->tag);
-
- _printTocEntry(AH, te, false, false);
+ _printTocEntry(AH, te, false);
defnDumped = true;
if (strcmp(te->desc, "TABLE") == 0)
}
/*
- * If we have a data component, then process it
+ * If it has a data component that we want, then process that
*/
if ((reqs & REQ_DATA) != 0)
{
*/
if (AH->PrintTocDataPtr !=NULL)
{
- _printTocEntry(AH, te, true, false);
+ _printTocEntry(AH, te, true);
if (strcmp(te->desc, "BLOBS") == 0 ||
strcmp(te->desc, "BLOB COMMENTS") == 0)
{
/* If we haven't already dumped the defn part, do so now */
ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
- _printTocEntry(AH, te, false, false);
+ _printTocEntry(AH, te, false);
}
}
return res;
}
+/*
+ * Identify which pass we should restore this TOC entry in.
+ *
+ * See notes with the RestorePass typedef in pg_backup_archiver.h.
+ */
+static RestorePass
+_tocEntryRestorePass(TocEntry *te)
+{
+ /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
+ if (strcmp(te->desc, "ACL") == 0 ||
+ strcmp(te->desc, "ACL LANGUAGE") == 0 ||
+ strcmp(te->desc, "DEFAULT ACL") == 0)
+ return RESTORE_PASS_ACL;
+ if (strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
+ return RESTORE_PASS_REFRESH;
+ return RESTORE_PASS_MAIN;
+}
+
/*
* Identify TOC entries that are ACLs.
+ *
+ * Note: it seems worth duplicating some code here to avoid a hard-wired
+ * assumption that these are exactly the same entries that we restore during
+ * the RESTORE_PASS_ACL phase.
*/
static bool
_tocEntryIsACL(TocEntry *te)
type);
}
+/*
+ * Emit the SQL commands to create the object represented by a TOC entry
+ *
+ * This now also includes issuing an ALTER OWNER command to restore the
+ * object's ownership, if wanted. But note that the object's permissions
+ * will remain at default, until the matching ACL TOC entry is restored.
+ */
static void
-_printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData, bool acl_pass)
+_printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
{
RestoreOptions *ropt = AH->public.ropt;
- /* ACLs are dumped only during acl pass */
- if (acl_pass)
- {
- if (!_tocEntryIsACL(te))
- return;
- }
- else
- {
- if (_tocEntryIsACL(te))
- return;
- }
-
/*
* Avoid dumping the public schema, as it will already be created ...
* unless we are using --clean mode (and *not* --create mode), in which
* If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
* commands, so we can no longer assume we know the current auth setting.
*/
- if (acl_pass)
+ if (_tocEntryIsACL(te))
{
if (AH->currUser)
free(AH->currUser);
return result;
}
+/*
+ * Write the file header for a custom-format archive
+ */
void
WriteHead(ArchiveHandle *AH)
{
/*
* Main engine for parallel restore.
*
- * Work is done in three phases.
- * First we process all SECTION_PRE_DATA tocEntries, in a single connection,
- * just as for a standard restore. Second we process the remaining non-ACL
- * steps in parallel worker children (threads on Windows, processes on Unix),
- * each of which connects separately to the database. Finally we process all
- * the ACL entries in a single connection (that happens back in
- * RestoreArchive).
+ * Parallel restore is done in three phases. In this first phase,
+ * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
+ * processed in the RESTORE_PASS_MAIN pass. (In practice, that's all
+ * PRE_DATA items other than ACLs.) Entries we can't process now are
+ * added to the pending_list for later phases to deal with.
*/
static void
-restore_toc_entries_prefork(ArchiveHandle *AH)
+restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
{
bool skipped_some;
TocEntry *next_work_item;
* about showing all the dependencies of SECTION_PRE_DATA items, so we do
* not risk trying to process them out-of-order.
*
+ * Stuff that we can't do immediately gets added to the pending_list.
+ * Note: we don't yet filter out entries that aren't going to be restored.
+ * They might participate in dependency chains connecting entries that
+ * should be restored, so we treat them as live until we actually process
+ * them.
+ *
* Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
* before DATA items, and all DATA items before POST_DATA items. That is
* not certain to be true in older archives, though, so this loop is coded
* to not assume it.
*/
+ AH->restorePass = RESTORE_PASS_MAIN;
skipped_some = false;
for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
{
- /* NB: process-or-continue logic must be the inverse of loop below */
+ bool do_now = true;
+
if (next_work_item->section != SECTION_PRE_DATA)
{
/* DATA and POST_DATA items are just ignored for now */
if (next_work_item->section == SECTION_DATA ||
next_work_item->section == SECTION_POST_DATA)
{
+ do_now = false;
skipped_some = true;
- continue;
}
else
{
* comment's dependencies are satisfied, so skip it for now.
*/
if (skipped_some)
- continue;
+ do_now = false;
}
}
- ahlog(AH, 1, "processing item %d %s %s\n",
- next_work_item->dumpId,
- next_work_item->desc, next_work_item->tag);
+ /*
+ * Also skip items that need to be forced into later passes. We need
+ * not set skipped_some in this case, since by assumption no main-pass
+ * items could depend on these.
+ */
+ if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
+ do_now = false;
+
+ if (do_now)
+ {
+ /* OK, restore the item and update its dependencies */
+ ahlog(AH, 1, "processing item %d %s %s\n",
+ next_work_item->dumpId,
+ next_work_item->desc, next_work_item->tag);
- (void) restore_toc_entry(AH, next_work_item, false);
+ (void) restore_toc_entry(AH, next_work_item, false);
- /* there should be no touch of ready_list here, so pass NULL */
- reduce_dependencies(AH, next_work_item, NULL);
+ /* there should be no touch of ready_list here, so pass NULL */
+ reduce_dependencies(AH, next_work_item, NULL);
+ }
+ else
+ {
+ /* Nope, so add it to pending_list */
+ par_list_append(pending_list, next_work_item);
+ }
}
/*
/*
* Main engine for parallel restore.
*
- * Work is done in three phases.
- * First we process all SECTION_PRE_DATA tocEntries, in a single connection,
- * just as for a standard restore. This is done in restore_toc_entries_prefork().
- * Second we process the remaining non-ACL steps in parallel worker children
- * (threads on Windows, processes on Unix), these fork off and set up their
- * connections before we call restore_toc_entries_parallel_forked.
- * Finally we process all the ACL entries in a single connection (that happens
- * back in RestoreArchive).
+ * Parallel restore is done in three phases. In this second phase,
+ * we process entries by dispatching them to parallel worker children
+ * (processes on Unix, threads on Windows), each of which connects
+ * separately to the database. Inter-entry dependencies are respected,
+ * and so is the RestorePass multi-pass structure. When we can no longer
+ * make any entries ready to process, we exit. Normally, there will be
+ * nothing left to do; but if there is, the third phase will mop up.
*/
static void
restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
TocEntry *pending_list)
{
- int work_status;
- bool skipped_some;
TocEntry ready_list;
TocEntry *next_work_item;
- int ret_child;
ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
/*
- * Initialize the lists of ready items, the list for pending items has
- * already been initialized in the caller. After this setup, the pending
- * list is everything that needs to be done but is blocked by one or more
- * dependencies, while the ready list contains items that have no
- * remaining dependencies. Note: we don't yet filter out entries that
- * aren't going to be restored. They might participate in dependency
- * chains connecting entries that should be restored, so we treat them as
- * live until we actually process them.
+ * The pending_list contains all items that we need to restore. Move all
+ * items that are available to process immediately into the ready_list.
+ * After this setup, the pending list is everything that needs to be done
+ * but is blocked by one or more dependencies, while the ready list
+ * contains items that have no remaining dependencies and are OK to
+ * process in the current restore pass.
*/
par_list_header_init(&ready_list);
- skipped_some = false;
- for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
- {
- /* NB: process-or-continue logic must be the inverse of loop above */
- if (next_work_item->section == SECTION_PRE_DATA)
- {
- /* All PRE_DATA items were dealt with above */
- continue;
- }
- if (next_work_item->section == SECTION_DATA ||
- next_work_item->section == SECTION_POST_DATA)
- {
- /* set this flag at same point that previous loop did */
- skipped_some = true;
- }
- else
- {
- /* SECTION_NONE items must be processed if previous loop didn't */
- if (!skipped_some)
- continue;
- }
-
- if (next_work_item->depCount > 0)
- par_list_append(pending_list, next_work_item);
- else
- par_list_append(&ready_list, next_work_item);
- }
+ AH->restorePass = RESTORE_PASS_MAIN;
+ move_to_ready_list(pending_list, &ready_list, AH->restorePass);
/*
* main parent loop
*
* Keep going until there is no worker still running AND there is no work
- * left to be done.
+ * left to be done. Note invariant: at top of loop, there should always
+ * be at least one worker available to dispatch a job to.
*/
-
ahlog(AH, 1, "entering main parallel loop\n");
- while ((next_work_item = get_next_work_item(AH, &ready_list, pstate)) != NULL ||
- !IsEveryWorkerIdle(pstate))
+ for (;;)
{
+ /* Look for an item ready to be dispatched to a worker */
+ next_work_item = get_next_work_item(AH, &ready_list, pstate);
if (next_work_item != NULL)
{
/* If not to be restored, don't waste time launching a worker */
- if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0 ||
- _tocEntryIsACL(next_work_item))
+ if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
{
ahlog(AH, 1, "skipping item %d %s %s\n",
next_work_item->dumpId,
next_work_item->desc, next_work_item->tag);
-
+ /* Drop it from ready_list, and update its dependencies */
par_list_remove(next_work_item);
reduce_dependencies(AH, next_work_item, &ready_list);
-
+ /* Loop around to see if anything else can be dispatched */
continue;
}
next_work_item->dumpId,
next_work_item->desc, next_work_item->tag);
+ /* Remove it from ready_list, and dispatch to some worker */
par_list_remove(next_work_item);
DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE);
}
+ else if (IsEveryWorkerIdle(pstate))
+ {
+ /*
+ * Nothing is ready and no worker is running, so we're done with
+ * the current pass or maybe with the whole process.
+ */
+ if (AH->restorePass == RESTORE_PASS_LAST)
+ break; /* No more parallel processing is possible */
+
+ /* Advance to next restore pass */
+ AH->restorePass++;
+ /* That probably allows some stuff to be made ready */
+ move_to_ready_list(pending_list, &ready_list, AH->restorePass);
+ /* Loop around to see if anything's now ready */
+ continue;
+ }
else
{
- /* at least one child is working and we have nothing ready. */
+ /*
+ * We have nothing ready, but at least one child is working, so
+ * wait for some subjob to finish.
+ */
}
for (;;)
{
int nTerm = 0;
+ int ret_child;
+ int work_status;
/*
* In order to reduce dependencies as soon as possible and
}
}
+ /* There should now be nothing in ready_list. */
+ Assert(ready_list.par_next == &ready_list);
+
ahlog(AH, 1, "finished main parallel loop\n");
}
+/*
+ * Main engine for parallel restore.
+ *
+ * Parallel restore is done in three phases. In this third phase,
+ * we mop up any remaining TOC entries by processing them serially.
+ * This phase normally should have nothing to do, but if we've somehow
+ * gotten stuck due to circular dependencies or some such, this provides
+ * at least some chance of completing the restore successfully.
+ */
static void
restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
{
_doSetFixedOutputState(AH);
/*
- * Make sure there is no non-ACL work left due to, say, circular
- * dependencies, or some other pathological condition. If so, do it in the
- * single parent connection.
+ * Make sure there is no work left due to, say, circular dependencies, or
+ * some other pathological condition. If so, do it in the single parent
+ * connection. We don't sweat about RestorePass ordering; it's likely we
+ * already violated that.
*/
for (te = pending_list->par_next; te != pending_list; te = te->par_next)
{
te->dumpId, te->desc, te->tag);
(void) restore_toc_entry(AH, te, false);
}
-
- /* The ACLs will be handled back in RestoreArchive. */
}
/*
}
+/*
+ * Move all immediately-ready items from pending_list to ready_list.
+ *
+ * Items are considered ready if they have no remaining dependencies and
+ * they belong in the current restore pass. (See also reduce_dependencies,
+ * which applies the same logic one-at-a-time.)
+ */
+static void
+move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list,
+ RestorePass pass)
+{
+ TocEntry *te;
+ TocEntry *next_te;
+
+ for (te = pending_list->par_next; te != pending_list; te = next_te)
+ {
+ /* must save list link before possibly moving te to other list */
+ next_te = te->par_next;
+
+ if (te->depCount == 0 &&
+ _tocEntryRestorePass(te) == pass)
+ {
+ /* Remove it from pending_list ... */
+ par_list_remove(te);
+ /* ... and add to ready_list */
+ par_list_append(ready_list, te);
+ }
+ }
+}
+
/*
* Find the next work item (if any) that is capable of being run now.
*
{
TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]];
+ Assert(otherte->depCount > 0);
otherte->depCount--;
- if (otherte->depCount == 0 && otherte->par_prev != NULL)
+
+ /*
+ * It's ready if it has no remaining dependencies and it belongs in
+ * the current restore pass. However, don't move it if it has not yet
+ * been put into the pending list.
+ */
+ if (otherte->depCount == 0 &&
+ _tocEntryRestorePass(otherte) == AH->restorePass &&
+ otherte->par_prev != NULL)
{
/* It must be in the pending list, so remove it ... */
par_list_remove(otherte);