*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.174 2009/08/04 21:56:08 tgl Exp $
+ * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.175 2009/08/07 22:48:34 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#define thandle HANDLE
#endif
+/* Arguments needed for a worker child */
typedef struct _restore_args
{
ArchiveHandle *AH;
TocEntry *te;
} RestoreArgs;
+/* State for each parallel activity slot */
typedef struct _parallel_slot
{
thandle child_id;
static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status);
static bool work_in_progress(ParallelSlot *slots, int n_slots);
static int get_next_slot(ParallelSlot *slots, int n_slots);
+static void par_list_header_init(TocEntry *l);
+static void par_list_append(TocEntry *l, TocEntry *te);
+static void par_list_remove(TocEntry *te);
static TocEntry *get_next_work_item(ArchiveHandle *AH,
- TocEntry **first_unprocessed,
+ TocEntry *ready_list,
ParallelSlot *slots, int n_slots);
static parallel_restore_result parallel_restore(RestoreArgs *args);
-static void mark_work_done(ArchiveHandle *AH, thandle worker, int status,
+static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
+ thandle worker, int status,
ParallelSlot *slots, int n_slots);
static void fix_dependencies(ArchiveHandle *AH);
static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
DumpId tableId, DumpId tableDataId);
static void identify_locking_dependencies(TocEntry *te,
TocEntry **tocsByDumpId);
-static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te);
+static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
+ TocEntry *ready_list);
static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
static ArchiveHandle *CloneArchive(ArchiveHandle *AH);
ParallelSlot *slots;
int work_status;
int next_slot;
- TocEntry *first_unprocessed = AH->toc->next;
+ TocEntry pending_list;
+ TocEntry ready_list;
TocEntry *next_work_item;
thandle ret_child;
TocEntry *te;
* faster in a single connection because we avoid all the connection and
* setup overhead.
*/
- while ((next_work_item = get_next_work_item(AH, &first_unprocessed,
- NULL, 0)) != NULL)
+ for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
{
if (next_work_item->section == SECTION_DATA ||
next_work_item->section == SECTION_POST_DATA)
(void) restore_toc_entry(AH, next_work_item, ropt, false);
- next_work_item->restored = true;
- reduce_dependencies(AH, next_work_item);
+ /* there should be no touch of ready_list here, so pass NULL */
+ reduce_dependencies(AH, next_work_item, NULL);
}
/*
AH->currTablespace = NULL;
AH->currWithOids = -1;
+ /*
+ * Initialize the lists of pending and ready items. After this setup,
+ * the pending list is everything that needs to be done but is blocked
+ * by one or more dependencies, while the ready list contains items that
+ * have no remaining dependencies. Note: we don't yet filter out entries
+ * that aren't going to be restored. They might participate in
+ * dependency chains connecting entries that should be restored, so we
+ * treat them as live until we actually process them.
+ */
+ par_list_header_init(&pending_list);
+ par_list_header_init(&ready_list);
+ for (; next_work_item != AH->toc; next_work_item = next_work_item->next)
+ {
+ if (next_work_item->depCount > 0)
+ par_list_append(&pending_list, next_work_item);
+ else
+ par_list_append(&ready_list, next_work_item);
+ }
+
/*
* main parent loop
*
ahlog(AH, 1, "entering main parallel loop\n");
- while ((next_work_item = get_next_work_item(AH, &first_unprocessed,
+ while ((next_work_item = get_next_work_item(AH, &ready_list,
slots, n_slots)) != NULL ||
work_in_progress(slots, n_slots))
{
next_work_item->dumpId,
next_work_item->desc, next_work_item->tag);
- next_work_item->restored = true;
- reduce_dependencies(AH, next_work_item);
+ par_list_remove(next_work_item);
+ reduce_dependencies(AH, next_work_item, &ready_list);
continue;
}
next_work_item->dumpId,
next_work_item->desc, next_work_item->tag);
- next_work_item->restored = true;
+ par_list_remove(next_work_item);
/* this memory is dealloced in mark_work_done() */
args = malloc(sizeof(RestoreArgs));
if (WIFEXITED(work_status))
{
- mark_work_done(AH, ret_child, WEXITSTATUS(work_status),
+ mark_work_done(AH, &ready_list,
+ ret_child, WEXITSTATUS(work_status),
slots, n_slots);
}
else
* dependencies, or some other pathological condition. If so, do it in the
* single parent connection.
*/
- for (te = AH->toc->next; te != AH->toc; te = te->next)
+ for (te = pending_list.par_next; te != &pending_list; te = te->par_next)
{
- if (!te->restored)
- {
- ahlog(AH, 1, "processing missed item %d %s %s\n",
- te->dumpId, te->desc, te->tag);
- (void) restore_toc_entry(AH, te, ropt, false);
- }
+ ahlog(AH, 1, "processing missed item %d %s %s\n",
+ te->dumpId, te->desc, te->tag);
+ (void) restore_toc_entry(AH, te, ropt, false);
}
/* The ACLs will be handled back in RestoreArchive. */
}
+/*
+ * Initialize the header of a parallel-processing list.
+ *
+ * These are circular lists with a dummy TocEntry as header, just like the
+ * main TOC list; but we use separate list links so that an entry can be in
+ * the main TOC list as well as in a parallel-processing list.
+ */
+static void
+par_list_header_init(TocEntry *l)
+{
+ l->par_prev = l->par_next = l;
+}
+
+/* Append te to the end of the parallel-processing list headed by l */
+static void
+par_list_append(TocEntry *l, TocEntry *te)
+{
+ te->par_prev = l->par_prev;
+ l->par_prev->par_next = te;
+ l->par_prev = te;
+ te->par_next = l;
+}
+
+/* Remove te from whatever parallel-processing list it's in */
+static void
+par_list_remove(TocEntry *te)
+{
+ te->par_prev->par_next = te->par_next;
+ te->par_next->par_prev = te->par_prev;
+ te->par_prev = NULL;
+ te->par_next = NULL;
+}
+
/*
* Find the next work item (if any) that is capable of being run now.
*
* To qualify, the item must have no remaining dependencies
- * and no requirement for locks that is incompatible with
- * items currently running.
+ * and no requirements for locks that are incompatible with
+ * items currently running. Items in the ready_list are known to have
+ * no remaining dependencies, but we have to check for lock conflicts.
*
- * first_unprocessed is state data that tracks the location of the first
- * TocEntry that's not marked 'restored'. This avoids O(N^2) search time
- * with long TOC lists. (Even though the constant is pretty small, it'd
- * get us eventually.)
+ * Note that the returned item has *not* been removed from ready_list.
+ * The caller must do that after successfully dispatching the item.
*
* pref_non_data is for an alternative selection algorithm that gives
* preference to non-data items if there is already a data load running.
* It is currently disabled.
*/
static TocEntry *
-get_next_work_item(ArchiveHandle *AH, TocEntry **first_unprocessed,
+get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
ParallelSlot *slots, int n_slots)
{
bool pref_non_data = false; /* or get from AH->ropt */
}
/*
- * Advance first_unprocessed if possible.
- */
- for (te = *first_unprocessed; te != AH->toc; te = te->next)
- {
- if (!te->restored)
- break;
- }
- *first_unprocessed = te;
-
- /*
- * Search from first_unprocessed until we find an available item.
+ * Search the ready_list until we find a suitable item.
*/
- for (; te != AH->toc; te = te->next)
+ for (te = ready_list->par_next; te != ready_list; te = te->par_next)
{
bool conflicts = false;
- /* Ignore if already done or still waiting on dependencies */
- if (te->restored || te->depCount > 0)
- continue;
-
/*
* Check to see if the item would need exclusive lock on something
* that a currently running item also needs lock on, or vice versa. If
* update status, and reduce the dependency count of any dependent items.
*/
static void
-mark_work_done(ArchiveHandle *AH, thandle worker, int status,
+mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
+ thandle worker, int status,
ParallelSlot *slots, int n_slots)
{
TocEntry *te = NULL;
die_horribly(AH, modulename, "worker process failed: exit code %d\n",
status);
- reduce_dependencies(AH, te);
+ reduce_dependencies(AH, te, ready_list);
}
* indexes the TOC entries by dump ID, rather than searching the TOC list
* repeatedly. Entries for dump IDs not present in the TOC will be NULL.
*
- * Also, initialize the depCount fields.
+ * Also, initialize the depCount fields, and make sure all the TOC items
+ * are marked as not being in any parallel-processing list.
*/
tocsByDumpId = (TocEntry **) calloc(AH->maxDumpId, sizeof(TocEntry *));
for (te = AH->toc->next; te != AH->toc; te = te->next)
{
tocsByDumpId[te->dumpId - 1] = te;
te->depCount = te->nDeps;
+ te->par_prev = NULL;
+ te->par_next = NULL;
}
/*
/*
* Remove the specified TOC entry from the depCounts of items that depend on
- * it, thereby possibly making them ready-to-run.
+ * it, thereby possibly making them ready-to-run. Any pending item that
+ * becomes ready should be moved to the ready list.
*/
static void
-reduce_dependencies(ArchiveHandle *AH, TocEntry *te)
+reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
{
DumpId target = te->dumpId;
int i;
for (i = 0; i < te->nDeps; i++)
{
if (te->dependencies[i] == target)
+ {
te->depCount--;
+ if (te->depCount == 0 && te->par_prev != NULL)
+ {
+ /* It must be in the pending list, so remove it ... */
+ par_list_remove(te);
+ /* ... and add to ready_list */
+ par_list_append(ready_list, te);
+ }
+ }
}
}
}