From: Tom Lane Date: Fri, 7 Aug 2009 22:48:34 +0000 (+0000) Subject: Modify parallel pg_restore to track pending and ready items by means of X-Git-Tag: REL8_5_ALPHA1~53 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=f033f6d28b014ff21c8ed250f55062c2fb1b12e8;p=postgresql Modify parallel pg_restore to track pending and ready items by means of two new lists, rather than repeatedly rescanning the main TOC list. This avoids a potential O(N^2) slowdown, although you'd need a *lot* of tables to make that really significant; and it might simplify future improvements in the scheduling algorithm by making the set of ready items more easily inspectable. The original thought that it would in itself result in a more efficient job dispatch order doesn't seem to have been borne out in testing, but it seems worth doing anyway. --- diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 2ff282faa8..20bd3eb7ea 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -15,7 +15,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.174 2009/08/04 21:56:08 tgl Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.175 2009/08/07 22:48:34 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -59,12 +59,14 @@ #define thandle HANDLE #endif +/* Arguments needed for a worker child */ typedef struct _restore_args { ArchiveHandle *AH; TocEntry *te; } RestoreArgs; +/* State for each parallel activity slot */ typedef struct _parallel_slot { thandle child_id; @@ -117,11 +119,15 @@ static thandle spawn_restore(RestoreArgs *args); static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status); static bool work_in_progress(ParallelSlot *slots, int n_slots); static int get_next_slot(ParallelSlot *slots, int n_slots); +static void par_list_header_init(TocEntry *l); +static void par_list_append(TocEntry *l, TocEntry *te); +static void par_list_remove(TocEntry *te); static TocEntry *get_next_work_item(ArchiveHandle *AH, - TocEntry **first_unprocessed, + TocEntry *ready_list, ParallelSlot *slots, int n_slots); static parallel_restore_result parallel_restore(RestoreArgs *args); -static void mark_work_done(ArchiveHandle *AH, thandle worker, int status, +static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list, + thandle worker, int status, ParallelSlot *slots, int n_slots); static void fix_dependencies(ArchiveHandle *AH); static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2); @@ -129,7 +135,8 @@ static void repoint_table_dependencies(ArchiveHandle *AH, DumpId tableId, DumpId tableDataId); static void identify_locking_dependencies(TocEntry *te, TocEntry **tocsByDumpId); -static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te); +static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te, + TocEntry *ready_list); static void mark_create_done(ArchiveHandle *AH, TocEntry *te); static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te); static ArchiveHandle *CloneArchive(ArchiveHandle *AH); @@ -3069,7 +3076,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH) ParallelSlot *slots; int work_status; int next_slot; - TocEntry *first_unprocessed = AH->toc->next; + TocEntry pending_list; + TocEntry ready_list; TocEntry *next_work_item; thandle ret_child; TocEntry *te; @@ -3091,8 +3099,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH) * faster in a single connection because we avoid all the connection and * setup overhead. */ - while ((next_work_item = get_next_work_item(AH, &first_unprocessed, - NULL, 0)) != NULL) + for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next) { if (next_work_item->section == SECTION_DATA || next_work_item->section == SECTION_POST_DATA) @@ -3104,8 +3111,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH) (void) restore_toc_entry(AH, next_work_item, ropt, false); - next_work_item->restored = true; - reduce_dependencies(AH, next_work_item); + /* there should be no touch of ready_list here, so pass NULL */ + reduce_dependencies(AH, next_work_item, NULL); } /* @@ -3128,6 +3135,25 @@ restore_toc_entries_parallel(ArchiveHandle *AH) AH->currTablespace = NULL; AH->currWithOids = -1; + /* + * Initialize the lists of pending and ready items. After this setup, + * the pending list is everything that needs to be done but is blocked + * by one or more dependencies, while the ready list contains items that + * have no remaining dependencies. Note: we don't yet filter out entries + * that aren't going to be restored. They might participate in + * dependency chains connecting entries that should be restored, so we + * treat them as live until we actually process them. + */ + par_list_header_init(&pending_list); + par_list_header_init(&ready_list); + for (; next_work_item != AH->toc; next_work_item = next_work_item->next) + { + if (next_work_item->depCount > 0) + par_list_append(&pending_list, next_work_item); + else + par_list_append(&ready_list, next_work_item); + } + /* * main parent loop * @@ -3137,7 +3163,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH) ahlog(AH, 1, "entering main parallel loop\n"); - while ((next_work_item = get_next_work_item(AH, &first_unprocessed, + while ((next_work_item = get_next_work_item(AH, &ready_list, slots, n_slots)) != NULL || work_in_progress(slots, n_slots)) { @@ -3153,8 +3179,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH) next_work_item->dumpId, next_work_item->desc, next_work_item->tag); - next_work_item->restored = true; - reduce_dependencies(AH, next_work_item); + par_list_remove(next_work_item); + reduce_dependencies(AH, next_work_item, &ready_list); continue; } @@ -3169,7 +3195,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH) next_work_item->dumpId, next_work_item->desc, next_work_item->tag); - next_work_item->restored = true; + par_list_remove(next_work_item); /* this memory is dealloced in mark_work_done() */ args = malloc(sizeof(RestoreArgs)); @@ -3196,7 +3222,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH) if (WIFEXITED(work_status)) { - mark_work_done(AH, ret_child, WEXITSTATUS(work_status), + mark_work_done(AH, &ready_list, + ret_child, WEXITSTATUS(work_status), slots, n_slots); } else @@ -3222,14 +3249,11 @@ restore_toc_entries_parallel(ArchiveHandle *AH) * dependencies, or some other pathological condition. If so, do it in the * single parent connection. */ - for (te = AH->toc->next; te != AH->toc; te = te->next) + for (te = pending_list.par_next; te != &pending_list; te = te->par_next) { - if (!te->restored) - { - ahlog(AH, 1, "processing missed item %d %s %s\n", - te->dumpId, te->desc, te->tag); - (void) restore_toc_entry(AH, te, ropt, false); - } + ahlog(AH, 1, "processing missed item %d %s %s\n", + te->dumpId, te->desc, te->tag); + (void) restore_toc_entry(AH, te, ropt, false); } /* The ACLs will be handled back in RestoreArchive. */ @@ -3372,25 +3396,57 @@ has_lock_conflicts(TocEntry *te1, TocEntry *te2) } +/* + * Initialize the header of a parallel-processing list. + * + * These are circular lists with a dummy TocEntry as header, just like the + * main TOC list; but we use separate list links so that an entry can be in + * the main TOC list as well as in a parallel-processing list. + */ +static void +par_list_header_init(TocEntry *l) +{ + l->par_prev = l->par_next = l; +} + +/* Append te to the end of the parallel-processing list headed by l */ +static void +par_list_append(TocEntry *l, TocEntry *te) +{ + te->par_prev = l->par_prev; + l->par_prev->par_next = te; + l->par_prev = te; + te->par_next = l; +} + +/* Remove te from whatever parallel-processing list it's in */ +static void +par_list_remove(TocEntry *te) +{ + te->par_prev->par_next = te->par_next; + te->par_next->par_prev = te->par_prev; + te->par_prev = NULL; + te->par_next = NULL; +} + /* * Find the next work item (if any) that is capable of being run now. * * To qualify, the item must have no remaining dependencies - * and no requirement for locks that is incompatible with - * items currently running. + * and no requirements for locks that are incompatible with + * items currently running. Items in the ready_list are known to have + * no remaining dependencies, but we have to check for lock conflicts. * - * first_unprocessed is state data that tracks the location of the first - * TocEntry that's not marked 'restored'. This avoids O(N^2) search time - * with long TOC lists. (Even though the constant is pretty small, it'd - * get us eventually.) + * Note that the returned item has *not* been removed from ready_list. + * The caller must do that after successfully dispatching the item. * * pref_non_data is for an alternative selection algorithm that gives * preference to non-data items if there is already a data load running. * It is currently disabled. */ static TocEntry * -get_next_work_item(ArchiveHandle *AH, TocEntry **first_unprocessed, +get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, ParallelSlot *slots, int n_slots) { bool pref_non_data = false; /* or get from AH->ropt */ @@ -3415,26 +3471,12 @@ get_next_work_item(ArchiveHandle *AH, TocEntry **first_unprocessed, } /* - * Advance first_unprocessed if possible. - */ - for (te = *first_unprocessed; te != AH->toc; te = te->next) - { - if (!te->restored) - break; - } - *first_unprocessed = te; - - /* - * Search from first_unprocessed until we find an available item. + * Search the ready_list until we find a suitable item. */ - for (; te != AH->toc; te = te->next) + for (te = ready_list->par_next; te != ready_list; te = te->par_next) { bool conflicts = false; - /* Ignore if already done or still waiting on dependencies */ - if (te->restored || te->depCount > 0) - continue; - /* * Check to see if the item would need exclusive lock on something * that a currently running item also needs lock on, or vice versa. If @@ -3546,7 +3588,8 @@ parallel_restore(RestoreArgs *args) * update status, and reduce the dependency count of any dependent items. */ static void -mark_work_done(ArchiveHandle *AH, thandle worker, int status, +mark_work_done(ArchiveHandle *AH, TocEntry *ready_list, + thandle worker, int status, ParallelSlot *slots, int n_slots) { TocEntry *te = NULL; @@ -3585,7 +3628,7 @@ mark_work_done(ArchiveHandle *AH, thandle worker, int status, die_horribly(AH, modulename, "worker process failed: exit code %d\n", status); - reduce_dependencies(AH, te); + reduce_dependencies(AH, te, ready_list); } @@ -3610,13 +3653,16 @@ fix_dependencies(ArchiveHandle *AH) * indexes the TOC entries by dump ID, rather than searching the TOC list * repeatedly. Entries for dump IDs not present in the TOC will be NULL. * - * Also, initialize the depCount fields. + * Also, initialize the depCount fields, and make sure all the TOC items + * are marked as not being in any parallel-processing list. */ tocsByDumpId = (TocEntry **) calloc(AH->maxDumpId, sizeof(TocEntry *)); for (te = AH->toc->next; te != AH->toc; te = te->next) { tocsByDumpId[te->dumpId - 1] = te; te->depCount = te->nDeps; + te->par_prev = NULL; + te->par_next = NULL; } /* @@ -3785,10 +3831,11 @@ identify_locking_dependencies(TocEntry *te, TocEntry **tocsByDumpId) /* * Remove the specified TOC entry from the depCounts of items that depend on - * it, thereby possibly making them ready-to-run. + * it, thereby possibly making them ready-to-run. Any pending item that + * becomes ready should be moved to the ready list. */ static void -reduce_dependencies(ArchiveHandle *AH, TocEntry *te) +reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list) { DumpId target = te->dumpId; int i; @@ -3805,7 +3852,16 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te) for (i = 0; i < te->nDeps; i++) { if (te->dependencies[i] == target) + { te->depCount--; + if (te->depCount == 0 && te->par_prev != NULL) + { + /* It must be in the pending list, so remove it ... */ + par_list_remove(te); + /* ... and add to ready_list */ + par_list_append(ready_list, te); + } + } } } } diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index 710dec019a..fa6db407f2 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -17,7 +17,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.h,v 1.81 2009/08/04 21:56:09 tgl Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.h,v 1.82 2009/08/07 22:48:34 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -314,7 +314,8 @@ typedef struct _tocEntry void *formatData; /* TOC Entry data specific to file format */ /* working state (needed only for parallel restore) */ - bool restored; /* item is in progress or done */ + struct _tocEntry *par_prev; /* list links for pending/ready items; */ + struct _tocEntry *par_next; /* these are NULL if not in either list */ bool created; /* set for DATA member if TABLE was created */ int depCount; /* number of dependencies not yet restored */ DumpId *lockDeps; /* dumpIds of objects this one needs lock on */