extern void DisconnectDatabase(Archive *AHX);
extern PGconn *GetConnection(Archive *AHX);
-/* Called to add a TOC entry */
-extern void ArchiveEntry(Archive *AHX,
- CatalogId catalogId, DumpId dumpId,
- const char *tag,
- const char *namespace, const char *tablespace,
- const char *owner, bool withOids,
- const char *desc, teSection section,
- const char *defn,
- const char *dropStmt, const char *copyStmt,
- const DumpId *deps, int nDeps,
- DataDumperPtr dumpFn, void *dumpArg);
-
/* Called to write *data* to the archive */
extern void WriteData(Archive *AH, const void *data, size_t dLen);
int gzOut;
} OutputContext;
+/*
+ * State for tracking TocEntrys that are ready to process during a parallel
+ * restore. (This used to be a list, and we still call it that, though now
+ * it's really an array so that we can apply qsort to it.)
+ *
+ * tes[] is sized large enough that we can't overrun it.
+ * The valid entries are indexed first_te .. last_te inclusive.
+ * We periodically sort the array to bring larger-by-dataLength entries to
+ * the front; "sorted" is true if the valid entries are known sorted.
+ */
+typedef struct _parallelReadyList
+{
+ TocEntry **tes; /* Ready-to-dump TocEntrys */
+ int first_te; /* index of first valid entry in tes[] */
+ int last_te; /* index of last valid entry in tes[] */
+ bool sorted; /* are valid entries currently sorted? */
+} ParallelReadyList;
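To make the array-as-list convention concrete, here is a minimal standalone sketch of the first_te/last_te bookkeeping, using a plain int payload in place of TocEntry pointers. MiniReadyList and desc_cmp are hypothetical names for illustration, not pg_dump code:

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>

/* Stand-in for ParallelReadyList, holding ints instead of TocEntry *. */
typedef struct
{
	int		   *items;		/* stand-in for tes[] */
	int			first;		/* index of first valid entry */
	int			last;		/* index of last valid entry */
	bool		sorted;		/* are valid entries currently sorted? */
} MiniReadyList;

/* Sort into decreasing order, mirroring TocEntrySizeCompare's intent */
static int
desc_cmp(const void *a, const void *b)
{
	int			x = *(const int *) a;
	int			y = *(const int *) b;

	return (x > y) ? -1 : (x < y) ? 1 : 0;
}

int
main(void)
{
	MiniReadyList rl = {malloc(8 * sizeof(int)), 0, -1, false};

	/* insert: append after last, which (probably) breaks sortedness */
	rl.items[++rl.last] = 10;
	rl.items[++rl.last] = 300;
	rl.items[++rl.last] = 25;
	rl.sorted = false;

	/* sort only the valid window first..last */
	if (!rl.sorted)
	{
		qsort(rl.items + rl.first, rl.last - rl.first + 1,
			  sizeof(int), desc_cmp);
		rl.sorted = true;
	}

	/* popping the head entry is just first++ */
	while (rl.first <= rl.last)
		printf("pop %d\n", rl.items[rl.first++]);	/* 300, 25, 10 */

	free(rl.items);
	return 0;
}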
+
/* translator: this is a module name */
static const char *modulename = gettext_noop("archiver");
TocEntry *pending_list);
static void restore_toc_entries_postfork(ArchiveHandle *AH,
TocEntry *pending_list);
-static void par_list_header_init(TocEntry *l);
-static void par_list_append(TocEntry *l, TocEntry *te);
-static void par_list_remove(TocEntry *te);
-static void move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list,
+static void pending_list_header_init(TocEntry *l);
+static void pending_list_append(TocEntry *l, TocEntry *te);
+static void pending_list_remove(TocEntry *te);
+static void ready_list_init(ParallelReadyList *ready_list, int tocCount);
+static void ready_list_free(ParallelReadyList *ready_list);
+static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te);
+static void ready_list_remove(ParallelReadyList *ready_list, int i);
+static void ready_list_sort(ParallelReadyList *ready_list);
+static int TocEntrySizeCompare(const void *p1, const void *p2);
+static void move_to_ready_list(TocEntry *pending_list,
+ ParallelReadyList *ready_list,
RestorePass pass);
-static TocEntry *get_next_work_item(ArchiveHandle *AH,
- TocEntry *ready_list,
+static TocEntry *pop_next_work_item(ArchiveHandle *AH,
+ ParallelReadyList *ready_list,
ParallelState *pstate);
static void mark_dump_job_done(ArchiveHandle *AH,
TocEntry *te,
static void repoint_table_dependencies(ArchiveHandle *AH);
static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
- TocEntry *ready_list);
+ ParallelReadyList *ready_list);
static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
ParallelState *pstate;
TocEntry pending_list;
- par_list_header_init(&pending_list);
+ /* The archive format module may need some setup for this */
+ if (AH->PrepParallelRestorePtr)
+ AH->PrepParallelRestorePtr(AH);
+
+ pending_list_header_init(&pending_list);
/* This runs PRE_DATA items and then disconnects from the database */
restore_toc_entries_prefork(AH, &pending_list);
/*
* Create a new TOC entry. The TOC was designed as a TOC, but is now the
* repository for all metadata. But the name has stuck.
+ *
+ * The new entry is added to the Archive's TOC list. Most callers can ignore
+ * the result value because nothing else need be done, but a few want to
+ * manipulate the TOC entry further.
*/
/* Public */
-void
+TocEntry *
ArchiveEntry(Archive *AHX,
CatalogId catalogId, DumpId dumpId,
const char *tag,
newToc->hadDumper = dumpFn ? true : false;
newToc->formatData = NULL;
+ newToc->dataLength = 0;
if (AH->ArchiveEntryPtr != NULL)
AH->ArchiveEntryPtr(AH, newToc);
+
+ return newToc;
}
/* Public */
{
TocEntry *te;
- for (te = AH->toc->next; te != AH->toc; te = te->next)
+ if (pstate && pstate->numWorkers > 1)
{
- if (!te->dataDumper)
- continue;
-
- if ((te->reqs & REQ_DATA) == 0)
- continue;
+ /*
+ * In parallel mode, this code runs in the master process. We
+ * construct an array of candidate TEs, then sort it into decreasing
+ * size order, then dispatch each TE to a data-transfer worker. By
+ * dumping larger tables first, we avoid getting into a situation
+ * where we're down to one job and it's big, losing parallelism.
+ */
+ TocEntry **tes;
+ int ntes;
- if (pstate && pstate->numWorkers > 1)
+ tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
+ ntes = 0;
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
{
- /*
- * If we are in a parallel backup, then we are always the master
- * process. Dispatch each data-transfer job to a worker.
- */
- DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP,
- mark_dump_job_done, NULL);
+ /* Consider only TEs with dataDumper functions ... */
+ if (!te->dataDumper)
+ continue;
+ /* ... and ignore ones not enabled for dump */
+ if ((te->reqs & REQ_DATA) == 0)
+ continue;
+
+ tes[ntes++] = te;
}
- else
- WriteDataChunksForTocEntry(AH, te);
- }
- /*
- * If parallel, wait for workers to finish.
- */
- if (pstate && pstate->numWorkers > 1)
+ if (ntes > 1)
+ qsort((void *) tes, ntes, sizeof(TocEntry *),
+ TocEntrySizeCompare);
+
+ for (int i = 0; i < ntes; i++)
+ DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
+ mark_dump_job_done, NULL);
+
+ pg_free(tes);
+
+ /* Now wait for workers to finish. */
WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
+ }
+ else
+ {
+ /* Non-parallel mode: just dump all candidate TEs sequentially. */
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ /* Must have same filter conditions as above */
+ if (!te->dataDumper)
+ continue;
+ if ((te->reqs & REQ_DATA) == 0)
+ continue;
+
+ WriteDataChunksForTocEntry(AH, te);
+ }
+ }
}
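The largest-first rationale in the dispatch comment above can be sanity-checked with a toy scheduler. This is a standalone sketch, not pg_dump code: greedily hand each job to the earliest-free worker and compare makespans for descending versus ascending dispatch order.

#include <stdio.h>

static int
makespan(const int *jobs, int njobs, int nworkers)
{
	int			busy[8] = {0};	/* finish time per worker; assumes nworkers <= 8 */

	for (int i = 0; i < njobs; i++)
	{
		int			w = 0;

		/* give the next job to whichever worker frees up first */
		for (int k = 1; k < nworkers; k++)
			if (busy[k] < busy[w])
				w = k;
		busy[w] += jobs[i];
	}

	int			max = 0;

	for (int k = 0; k < nworkers; k++)
		if (busy[k] > max)
			max = busy[k];
	return max;
}

int
main(void)
{
	int			desc[] = {8, 2, 2, 2, 2};
	int			asc[] = {2, 2, 2, 2, 8};

	printf("largest-first: %d\n", makespan(desc, 5, 2));	/* 8 */
	printf("smallest-first: %d\n", makespan(asc, 5, 2));	/* 12 */
	return 0;
}

With two workers and job sizes {8, 2, 2, 2, 2}, largest-first finishes in 8 time units while smallest-first needs 12, because the big job ends up running alone at the tail.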
te->dependencies = NULL;
te->nDeps = 0;
}
+ te->dataLength = 0;
if (AH->ReadExtraTocPtr)
AH->ReadExtraTocPtr(AH, te);
else
{
/* Nope, so add it to pending_list */
- par_list_append(pending_list, next_work_item);
+ pending_list_append(pending_list, next_work_item);
}
}
restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
TocEntry *pending_list)
{
- TocEntry ready_list;
+ ParallelReadyList ready_list;
TocEntry *next_work_item;
ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
+ /* Set up ready_list with enough room for all known TocEntrys */
+ ready_list_init(&ready_list, AH->tocCount);
+
/*
* The pending_list contains all items that we need to restore. Move all
* items that are available to process immediately into the ready_list.
* contains items that have no remaining dependencies and are OK to
* process in the current restore pass.
*/
- par_list_header_init(&ready_list);
AH->restorePass = RESTORE_PASS_MAIN;
move_to_ready_list(pending_list, &ready_list, AH->restorePass);
for (;;)
{
/* Look for an item ready to be dispatched to a worker */
- next_work_item = get_next_work_item(AH, &ready_list, pstate);
+ next_work_item = pop_next_work_item(AH, &ready_list, pstate);
if (next_work_item != NULL)
{
/* If not to be restored, don't waste time launching a worker */
ahlog(AH, 1, "skipping item %d %s %s\n",
next_work_item->dumpId,
next_work_item->desc, next_work_item->tag);
- /* Drop it from ready_list, and update its dependencies */
- par_list_remove(next_work_item);
+ /* Update its dependencies as though we'd completed it */
reduce_dependencies(AH, next_work_item, &ready_list);
/* Loop around to see if anything else can be dispatched */
continue;
next_work_item->dumpId,
next_work_item->desc, next_work_item->tag);
- /* Remove it from ready_list, and dispatch to some worker */
- par_list_remove(next_work_item);
-
+ /* Dispatch to some worker */
DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
mark_restore_job_done, &ready_list);
}
}
/* There should now be nothing in ready_list. */
- Assert(ready_list.par_next == &ready_list);
+ Assert(ready_list.first_te > ready_list.last_te);
+
+ ready_list_free(&ready_list);
ahlog(AH, 1, "finished main parallel loop\n");
}
* connection. We don't sweat about RestorePass ordering; it's likely we
* already violated that.
*/
- for (te = pending_list->par_next; te != pending_list; te = te->par_next)
+ for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
{
ahlog(AH, 1, "processing missed item %d %s %s\n",
te->dumpId, te->desc, te->tag);
/*
- * Initialize the header of a parallel-processing list.
+ * Initialize the header of the pending-items list.
*
- * These are circular lists with a dummy TocEntry as header, just like the
+ * This is a circular list with a dummy TocEntry as header, just like the
* main TOC list; but we use separate list links so that an entry can be in
- * the main TOC list as well as in a parallel-processing list.
+ * the main TOC list as well as in the pending list.
+ */
+static void
+pending_list_header_init(TocEntry *l)
+{
+ l->pending_prev = l->pending_next = l;
+}
+
+/* Append te to the end of the pending-list headed by l */
+static void
+pending_list_append(TocEntry *l, TocEntry *te)
+{
+ te->pending_prev = l->pending_prev;
+ l->pending_prev->pending_next = te;
+ l->pending_prev = te;
+ te->pending_next = l;
+}
+
+/* Remove te from the pending-list */
+static void
+pending_list_remove(TocEntry *te)
+{
+ te->pending_prev->pending_next = te->pending_next;
+ te->pending_next->pending_prev = te->pending_prev;
+ te->pending_prev = NULL;
+ te->pending_next = NULL;
+}
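As an aside, the dummy-header convention can be illustrated standalone (this is not pg_dump code): an empty list is the header linked to itself, and a NULL pending_prev doubles as a not-on-the-list test, which reduce_dependencies relies on below.

#include <stdio.h>

typedef struct Node
{
	struct Node *prev;
	struct Node *next;
} Node;

int
main(void)
{
	Node		head,
				a;

	head.prev = head.next = &head;	/* pending_list_header_init */
	printf("empty? %d\n", head.next == &head);	/* 1 */

	/* pending_list_append */
	a.prev = head.prev;
	head.prev->next = &a;
	head.prev = &a;
	a.next = &head;
	printf("empty? %d  on-list? %d\n", head.next == &head, a.prev != NULL);

	/* pending_list_remove */
	a.prev->next = a.next;
	a.next->prev = a.prev;
	a.prev = a.next = NULL;
	printf("empty? %d  on-list? %d\n", head.next == &head, a.prev != NULL);
	return 0;
}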
+
+
+/*
+ * Initialize the ready_list with enough room for up to tocCount entries.
*/
static void
-par_list_header_init(TocEntry *l)
+ready_list_init(ParallelReadyList *ready_list, int tocCount)
{
- l->par_prev = l->par_next = l;
+ ready_list->tes = (TocEntry **)
+ pg_malloc(tocCount * sizeof(TocEntry *));
+ ready_list->first_te = 0;
+ ready_list->last_te = -1;
+ ready_list->sorted = false;
}
-/* Append te to the end of the parallel-processing list headed by l */
+/*
+ * Free storage for a ready_list.
+ */
+static void
+ready_list_free(ParallelReadyList *ready_list)
+{
+ pg_free(ready_list->tes);
+}
+
+/* Add te to the ready_list */
static void
-par_list_append(TocEntry *l, TocEntry *te)
+ready_list_insert(ParallelReadyList *ready_list, TocEntry *te)
{
- te->par_prev = l->par_prev;
- l->par_prev->par_next = te;
- l->par_prev = te;
- te->par_next = l;
+ ready_list->tes[++ready_list->last_te] = te;
+ /* List is (probably) not sorted anymore. */
+ ready_list->sorted = false;
+}
+
+/* Remove the i'th entry in the ready_list */
+static void
+ready_list_remove(ParallelReadyList *ready_list, int i)
+{
+ int f = ready_list->first_te;
+
+ Assert(i >= f && i <= ready_list->last_te);
+
+ /*
+ * In the typical case where the item to be removed is the first ready
+ * entry, we need only increment first_te to remove it. Otherwise, move
+ * the entries before it to compact the list. (This preserves sortedness,
+ * if any.) We could alternatively move the entries after i, but there
+ * are typically many more of those.
+ */
+ if (i > f)
+ {
+ TocEntry **first_te_ptr = &ready_list->tes[f];
+
+ memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *));
+ }
+ ready_list->first_te++;
}
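A worked example of the compaction above, as a standalone sketch (not pg_dump code): removing index i=3 from a valid window first=1..last=4 shifts entries 1..2 right by one slot and advances first to 2, preserving the descending order.

#include <stdio.h>
#include <string.h>

int
main(void)
{
	int			tes[] = {0, 90, 70, 50, 30};	/* valid window: 1..4 */
	int			first = 1,
				i = 3;

	/* shift entries first..i-1 up one place, then drop the old head slot */
	memmove(&tes[first + 1], &tes[first], (i - first) * sizeof(int));
	first++;

	for (int k = first; k <= 4; k++)
		printf("%d ", tes[k]);	/* prints: 90 70 30 */
	printf("\n");
	return 0;
}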
-/* Remove te from whatever parallel-processing list it's in */
+/* Sort the ready_list into the desired order */
static void
-par_list_remove(TocEntry *te)
+ready_list_sort(ParallelReadyList *ready_list)
{
- te->par_prev->par_next = te->par_next;
- te->par_next->par_prev = te->par_prev;
- te->par_prev = NULL;
- te->par_next = NULL;
+ if (!ready_list->sorted)
+ {
+ int n = ready_list->last_te - ready_list->first_te + 1;
+
+ if (n > 1)
+ qsort(ready_list->tes + ready_list->first_te, n,
+ sizeof(TocEntry *),
+ TocEntrySizeCompare);
+ ready_list->sorted = true;
+ }
+}
+
+/* qsort comparator for sorting TocEntries by dataLength */
+static int
+TocEntrySizeCompare(const void *p1, const void *p2)
+{
+ const TocEntry *te1 = *(const TocEntry *const *) p1;
+ const TocEntry *te2 = *(const TocEntry *const *) p2;
+
+ /* Sort by decreasing dataLength */
+ if (te1->dataLength > te2->dataLength)
+ return -1;
+ if (te1->dataLength < te2->dataLength)
+ return 1;
+
+ /* For equal dataLengths, sort by dumpId, just to be stable */
+ if (te1->dumpId < te2->dumpId)
+ return -1;
+ if (te1->dumpId > te2->dumpId)
+ return 1;
+
+ return 0;
}
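Because qsort is not a stable sort, the dumpId tie-break above is what makes the ordering deterministic across platforms. A standalone check, using a hypothetical MiniTE type rather than pg_dump code:

#include <stdio.h>
#include <stdlib.h>

typedef struct
{
	long		dataLength;
	int			dumpId;
} MiniTE;

/* Same shape as TocEntrySizeCompare: size descending, dumpId ascending */
static int
mini_cmp(const void *p1, const void *p2)
{
	const MiniTE *a = *(const MiniTE *const *) p1;
	const MiniTE *b = *(const MiniTE *const *) p2;

	if (a->dataLength != b->dataLength)
		return (a->dataLength > b->dataLength) ? -1 : 1;
	return (a->dumpId < b->dumpId) ? -1 : (a->dumpId > b->dumpId) ? 1 : 0;
}

int
main(void)
{
	MiniTE		a = {100, 7},
				b = {100, 3},
				c = {500, 9};
	MiniTE	   *tes[] = {&a, &b, &c};

	qsort(tes, 3, sizeof(MiniTE *), mini_cmp);
	for (int i = 0; i < 3; i++)
		printf("dumpId %d (len %ld)\n", tes[i]->dumpId, tes[i]->dataLength);
	/* 9 (500) first, then the tie: 3 before 7, however qsort partitions */
	return 0;
}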
* which applies the same logic one-at-a-time.)
*/
static void
-move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list,
+move_to_ready_list(TocEntry *pending_list,
+ ParallelReadyList *ready_list,
RestorePass pass)
{
TocEntry *te;
TocEntry *next_te;
- for (te = pending_list->par_next; te != pending_list; te = next_te)
+ for (te = pending_list->pending_next; te != pending_list; te = next_te)
{
- /* must save list link before possibly moving te to other list */
- next_te = te->par_next;
+ /* must save list link before possibly removing te from list */
+ next_te = te->pending_next;
if (te->depCount == 0 &&
_tocEntryRestorePass(te) == pass)
{
/* Remove it from pending_list ... */
- par_list_remove(te);
+ pending_list_remove(te);
/* ... and add to ready_list */
- par_list_append(ready_list, te);
+ ready_list_insert(ready_list, te);
}
}
}
/*
- * Find the next work item (if any) that is capable of being run now.
+ * Find the next work item (if any) that is capable of being run now,
+ * and remove it from the ready_list.
+ *
+ * Returns the item, or NULL if nothing is runnable.
*
* To qualify, the item must have no remaining dependencies
* and no requirements for locks that are incompatible with
* items currently running. Items in the ready_list are known to have
* no remaining dependencies, but we have to check for lock conflicts.
*
- * Note that the returned item has *not* been removed from ready_list.
- * The caller must do that after successfully dispatching the item.
- *
* pref_non_data is for an alternative selection algorithm that gives
* preference to non-data items if there is already a data load running.
* It is currently disabled.
*/
static TocEntry *
-get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
+pop_next_work_item(ArchiveHandle *AH, ParallelReadyList *ready_list,
ParallelState *pstate)
{
bool pref_non_data = false; /* or get from AH->ropt */
- TocEntry *data_te = NULL;
- TocEntry *te;
- int i,
- k;
+ int data_te_index = -1;
/*
* Bogus heuristics for pref_non_data
{
int count = 0;
- for (k = 0; k < pstate->numWorkers; k++)
+ for (int k = 0; k < pstate->numWorkers; k++)
{
TocEntry *running_te = pstate->te[k];
pref_non_data = false;
}
+ /*
+ * Sort the ready_list so that we'll tackle larger jobs first.
+ */
+ ready_list_sort(ready_list);
+
/*
* Search the ready_list until we find a suitable item.
*/
- for (te = ready_list->par_next; te != ready_list; te = te->par_next)
+ for (int i = ready_list->first_te; i <= ready_list->last_te; i++)
{
+ TocEntry *te = ready_list->tes[i];
bool conflicts = false;
/*
* that a currently running item also needs lock on, or vice versa. If
* so, we don't want to schedule them together.
*/
- for (i = 0; i < pstate->numWorkers; i++)
+ for (int k = 0; k < pstate->numWorkers; k++)
{
- TocEntry *running_te = pstate->te[i];
+ TocEntry *running_te = pstate->te[k];
if (running_te == NULL)
continue;
if (pref_non_data && te->section == SECTION_DATA)
{
- if (data_te == NULL)
- data_te = te;
+ if (data_te_index < 0)
+ data_te_index = i;
continue;
}
/* passed all tests, so this item can run */
+ ready_list_remove(ready_list, i);
return te;
}
- if (data_te != NULL)
+ if (data_te_index >= 0)
+ {
+ TocEntry *data_te = ready_list->tes[data_te_index];
+
+ ready_list_remove(ready_list, data_te_index);
return data_te;
+ }
ahlog(AH, 2, "no item ready\n");
return NULL;
int status,
void *callback_data)
{
- TocEntry *ready_list = (TocEntry *) callback_data;
+ ParallelReadyList *ready_list = (ParallelReadyList *) callback_data;
ahlog(AH, 1, "finished item %d %s %s\n",
te->dumpId, te->desc, te->tag);
te->depCount = te->nDeps;
te->revDeps = NULL;
te->nRevDeps = 0;
- te->par_prev = NULL;
- te->par_next = NULL;
+ te->pending_prev = NULL;
+ te->pending_next = NULL;
}
/*
/*
* Change dependencies on table items to depend on table data items instead,
* but only in POST_DATA items.
+ *
+ * Also, for any item having such dependencies, set its dataLength to the
+ * largest dataLength of the table data items it depends on. This ensures
+ * that parallel restore will prioritize larger jobs (index builds, FK
+ * constraint checks, etc) over smaller ones, avoiding situations where we
+ * end a restore with only one active job working on a large table.
*/
static void
repoint_table_dependencies(ArchiveHandle *AH)
if (olddep <= AH->maxDumpId &&
AH->tableDataId[olddep] != 0)
{
- te->dependencies[i] = AH->tableDataId[olddep];
+ DumpId tabledataid = AH->tableDataId[olddep];
+ TocEntry *tabledatate = AH->tocsByDumpId[tabledataid];
+
+ te->dependencies[i] = tabledataid;
+ te->dataLength = Max(te->dataLength, tabledatate->dataLength);
ahlog(AH, 2, "transferring dependency %d -> %d to %d\n",
- te->dumpId, olddep, AH->tableDataId[olddep]);
+ te->dumpId, olddep, tabledataid);
}
}
}
* becomes ready should be moved to the ready_list, if that's provided.
*/
static void
-reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
+reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
+ ParallelReadyList *ready_list)
{
int i;
*/
if (otherte->depCount == 0 &&
_tocEntryRestorePass(otherte) == AH->restorePass &&
- otherte->par_prev != NULL &&
+ otherte->pending_prev != NULL &&
ready_list != NULL)
{
/* Remove it from pending list ... */
- par_list_remove(otherte);
+ pending_list_remove(otherte);
/* ... and add to ready_list */
- par_list_append(ready_list, otherte);
+ ready_list_insert(ready_list, otherte);
}
}
}
typedef int (*ReadBytePtrType) (ArchiveHandle *AH);
typedef void (*WriteBufPtrType) (ArchiveHandle *AH, const void *c, size_t len);
typedef void (*ReadBufPtrType) (ArchiveHandle *AH, void *buf, size_t len);
-typedef void (*SaveArchivePtrType) (ArchiveHandle *AH);
typedef void (*WriteExtraTocPtrType) (ArchiveHandle *AH, TocEntry *te);
typedef void (*ReadExtraTocPtrType) (ArchiveHandle *AH, TocEntry *te);
typedef void (*PrintExtraTocPtrType) (ArchiveHandle *AH, TocEntry *te);
typedef void (*PrintTocDataPtrType) (ArchiveHandle *AH, TocEntry *te);
+typedef void (*PrepParallelRestorePtrType) (ArchiveHandle *AH);
typedef void (*ClonePtrType) (ArchiveHandle *AH);
typedef void (*DeClonePtrType) (ArchiveHandle *AH);
WorkerJobDumpPtrType WorkerJobDumpPtr;
WorkerJobRestorePtrType WorkerJobRestorePtr;
+ PrepParallelRestorePtrType PrepParallelRestorePtr;
ClonePtrType ClonePtr; /* Clone format-specific fields */
DeClonePtrType DeClonePtr; /* Clean up cloned fields */
void *formatData; /* TOC Entry data specific to file format */
/* working state while dumping/restoring */
+ pgoff_t dataLength; /* item's data size; 0 if none or unknown */
teReqs reqs; /* do we need schema and/or data of object */
bool created; /* set for DATA member if TABLE was created */
/* working state (needed only for parallel restore) */
- struct _tocEntry *par_prev; /* list links for pending/ready items; */
- struct _tocEntry *par_next; /* these are NULL if not in either list */
+ struct _tocEntry *pending_prev; /* list links for pending-items list; */
+ struct _tocEntry *pending_next; /* NULL if not in that list */
int depCount; /* number of dependencies not yet restored */
DumpId *revDeps; /* dumpIds of objects depending on this one */
int nRevDeps; /* number of such dependencies */
extern void warn_or_exit_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...) pg_attribute_printf(3, 4);
+/* Called to add a TOC entry */
+extern TocEntry *ArchiveEntry(Archive *AHX,
+ CatalogId catalogId, DumpId dumpId,
+ const char *tag,
+ const char *namespace, const char *tablespace,
+ const char *owner, bool withOids,
+ const char *desc, teSection section,
+ const char *defn,
+ const char *dropStmt, const char *copyStmt,
+ const DumpId *deps, int nDeps,
+ DataDumperPtr dumpFn, void *dumpArg);
+
extern void WriteTOC(ArchiveHandle *AH);
extern void ReadTOC(ArchiveHandle *AH);
extern void WriteHead(ArchiveHandle *AH);
static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
static void _LoadBlobs(ArchiveHandle *AH, bool drop);
+
+static void _PrepParallelRestore(ArchiveHandle *AH);
static void _Clone(ArchiveHandle *AH);
static void _DeClone(ArchiveHandle *AH);
AH->StartBlobPtr = _StartBlob;
AH->EndBlobPtr = _EndBlob;
AH->EndBlobsPtr = _EndBlobs;
+
+ AH->PrepParallelRestorePtr = _PrepParallelRestore;
AH->ClonePtr = _Clone;
AH->DeClonePtr = _DeClone;
strerror(errno));
}
+/*
+ * Prepare for parallel restore.
+ *
+ * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
+ * TOC entries' dataLength fields with appropriate values to guide the
+ * ordering of restore jobs. The source of said data is format-dependent,
+ * as is the exact meaning of the values.
+ *
+ * A format module might also choose to do other setup here.
+ */
+static void
+_PrepParallelRestore(ArchiveHandle *AH)
+{
+ lclContext *ctx = (lclContext *) AH->formatData;
+ TocEntry *prev_te = NULL;
+ lclTocEntry *prev_tctx = NULL;
+ TocEntry *te;
+
+ /*
+ * Knowing that the data items were dumped out in TOC order, we can
+ * reconstruct the length of each item as the delta to the start offset of
+ * the next data item.
+ */
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ lclTocEntry *tctx = (lclTocEntry *) te->formatData;
+
+ /*
+ * Ignore entries without a known data offset; if we were unable to
+ * seek to rewrite the TOC when creating the archive, this'll be all
+ * of them, and we'll end up with no size estimates.
+ */
+ if (tctx->dataState != K_OFFSET_POS_SET)
+ continue;
+
+ /* Compute previous data item's length */
+ if (prev_te)
+ {
+ if (tctx->dataPos > prev_tctx->dataPos)
+ prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
+ }
+
+ prev_te = te;
+ prev_tctx = tctx;
+ }
+
+ /* If OK to seek, we can determine the length of the last item */
+ if (prev_te && ctx->hasSeek)
+ {
+ pgoff_t endpos;
+
+ if (fseeko(AH->FH, 0, SEEK_END) != 0)
+ exit_horribly(modulename, "error during file seek: %s\n",
+ strerror(errno));
+ endpos = ftello(AH->FH);
+ if (endpos > prev_tctx->dataPos)
+ prev_te->dataLength = endpos - prev_tctx->dataPos;
+ }
+}
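The offset-delta trick above can be shown in isolation. A standalone sketch with hypothetical offsets, not pg_dump code: given the start offsets of consecutively written data items, each item's length is the next item's offset minus its own, and the file's end offset bounds the last one.

#include <stdio.h>

int
main(void)
{
	long		dataPos[] = {512, 4608, 269312};	/* hypothetical start offsets */
	long		endpos = 1269312;	/* hypothetical end-of-file offset */

	for (int i = 0; i < 3; i++)
	{
		long		next = (i < 2) ? dataPos[i + 1] : endpos;
		long		len = (next > dataPos[i]) ? next - dataPos[i] : 0;

		printf("item %d: length %ld\n", i, len);
	}
	return 0;
}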
+
/*
* Clone format-specific fields during parallel restoration.
*/
static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
static void _LoadBlobs(ArchiveHandle *AH);
+static void _PrepParallelRestore(ArchiveHandle *AH);
static void _Clone(ArchiveHandle *AH);
static void _DeClone(ArchiveHandle *AH);
AH->EndBlobPtr = _EndBlob;
AH->EndBlobsPtr = _EndBlobs;
+ AH->PrepParallelRestorePtr = _PrepParallelRestore;
AH->ClonePtr = _Clone;
AH->DeClonePtr = _DeClone;
char fn[MAXPGPATH];
tctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
- if (te->dataDumper)
+ if (strcmp(te->desc, "BLOBS") == 0)
+ tctx->filename = pg_strdup("blobs.toc");
+ else if (te->dataDumper)
{
snprintf(fn, MAXPGPATH, "%d.dat", te->dumpId);
tctx->filename = pg_strdup(fn);
}
- else if (strcmp(te->desc, "BLOBS") == 0)
- tctx->filename = pg_strdup("blobs.toc");
else
tctx->filename = NULL;
strcat(buf, relativeFilename);
}
+/*
+ * Prepare for parallel restore.
+ *
+ * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
+ * TOC entries' dataLength fields with appropriate values to guide the
+ * ordering of restore jobs. The source of said data is format-dependent,
+ * as is the exact meaning of the values.
+ *
+ * A format module might also choose to do other setup here.
+ */
+static void
+_PrepParallelRestore(ArchiveHandle *AH)
+{
+ TocEntry *te;
+
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ lclTocEntry *tctx = (lclTocEntry *) te->formatData;
+ char fname[MAXPGPATH];
+ struct stat st;
+
+ /*
+		 * A dumpable object has set tctx->filename; any other object has not
+		 * (see _ArchiveEntry).
+ */
+ if (tctx->filename == NULL)
+ continue;
+
+ /* We may ignore items not due to be restored */
+ if ((te->reqs & REQ_DATA) == 0)
+ continue;
+
+ /*
+ * Stat the file and, if successful, put its size in dataLength. When
+ * using compression, the physical file size might not be a very good
+ * guide to the amount of work involved in restoring the file, but we
+ * only need an approximate indicator of that.
+ */
+ setFilePath(AH, fname, tctx->filename);
+
+ if (stat(fname, &st) == 0)
+ te->dataLength = st.st_size;
+ else
+ {
+ /* It might be compressed */
+ strlcat(fname, ".gz", sizeof(fname));
+ if (stat(fname, &st) == 0)
+ te->dataLength = st.st_size;
+ }
+
+ /*
+ * If this is the BLOBS entry, what we stat'd was blobs.toc, which
+ * most likely is a lot smaller than the actual blob data. We don't
+ * have a cheap way to estimate how much smaller, but fortunately it
+ * doesn't matter too much as long as we get the blobs processed
+ * reasonably early. Arbitrarily scale up by a factor of 1K.
+ */
+ if (strcmp(te->desc, "BLOBS") == 0)
+ te->dataLength *= 1024;
+ }
+}
+
/*
* Clone format-specific fields during parallel restoration.
*/
#include "catalog/pg_trigger_d.h"
#include "catalog/pg_type_d.h"
#include "libpq/libpq-fs.h"
+#include "storage/block.h"
#include "dumputils.h"
#include "parallel.h"
*/
sortDumpableObjectsByTypeName(dobjs, numObjs);
- /* If we do a parallel dump, we want the largest tables to go first */
- if (archiveFormat == archDirectory && numWorkers > 1)
- sortDataAndIndexObjectsBySize(dobjs, numObjs);
-
sortDumpableObjects(dobjs, numObjs,
boundaryObjs[0].dumpId, boundaryObjs[1].dumpId);
* See comments for BuildArchiveDependencies.
*/
if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
- ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId,
- tbinfo->dobj.name, tbinfo->dobj.namespace->dobj.name,
- NULL, tbinfo->rolname,
- false, "TABLE DATA", SECTION_DATA,
- "", "", copyStmt,
- &(tbinfo->dobj.dumpId), 1,
- dumpFn, tdinfo);
+ {
+ TocEntry *te;
+
+ te = ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId,
+ tbinfo->dobj.name, tbinfo->dobj.namespace->dobj.name,
+ NULL, tbinfo->rolname,
+ false, "TABLE DATA", SECTION_DATA,
+ "", "", copyStmt,
+ &(tbinfo->dobj.dumpId), 1,
+ dumpFn, tdinfo);
+
+ /*
+ * Set the TocEntry's dataLength in case we are doing a parallel dump
+ * and want to order dump jobs by table size. We choose to measure
+ * dataLength in table pages during dump, so no scaling is needed.
+ * However, relpages is declared as "integer" in pg_class, and hence
+ * also in TableInfo, but it's really BlockNumber a/k/a unsigned int.
+ * Cast so that we get the right interpretation of table sizes
+ * exceeding INT_MAX pages.
+ */
+ te->dataLength = (BlockNumber) tbinfo->relpages;
+ }
destroyPQExpBuffer(copyBuf);
destroyPQExpBuffer(clistBuf);
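The cast discussed in the comment above is worth seeing in isolation. A standalone demonstration, not pg_dump code, with int64_t standing in for pgoff_t: a relpages value above INT_MAX arrives as a negative int (on the usual two's-complement platforms), and routing it through unsigned int (BlockNumber) recovers the intended page count before it widens to the 64-bit dataLength.

#include <stdio.h>
#include <stdint.h>

typedef unsigned int BlockNumber;	/* as in PostgreSQL's storage/block.h */

int
main(void)
{
	int			relpages = (int) 3000000000u;	/* stored as a negative int */
	int64_t		wrong = relpages;	/* sign-extends to a negative value */
	int64_t		right = (BlockNumber) relpages; /* 3000000000 */

	printf("wrong: %lld, right: %lld\n",
		   (long long) wrong, (long long) right);
	return 0;
}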
i_conoid,
i_condef,
i_tablespace,
- i_indreloptions,
- i_relpages;
+ i_indreloptions;
int ntups;
for (i = 0; i < numTables; i++)
"i.indnkeyatts AS indnkeyatts, "
"i.indnatts AS indnatts, "
"i.indkey, i.indisclustered, "
- "i.indisreplident, t.relpages, "
+ "i.indisreplident, "
"c.contype, c.conname, "
"c.condeferrable, c.condeferred, "
"c.tableoid AS contableoid, "
"i.indnatts AS indnkeyatts, "
"i.indnatts AS indnatts, "
"i.indkey, i.indisclustered, "
- "i.indisreplident, t.relpages, "
+ "i.indisreplident, "
"c.contype, c.conname, "
"c.condeferrable, c.condeferred, "
"c.tableoid AS contableoid, "
"i.indnatts AS indnkeyatts, "
"i.indnatts AS indnatts, "
"i.indkey, i.indisclustered, "
- "false AS indisreplident, t.relpages, "
+ "false AS indisreplident, "
"c.contype, c.conname, "
"c.condeferrable, c.condeferred, "
"c.tableoid AS contableoid, "
"i.indnatts AS indnkeyatts, "
"i.indnatts AS indnatts, "
"i.indkey, i.indisclustered, "
- "false AS indisreplident, t.relpages, "
+ "false AS indisreplident, "
"c.contype, c.conname, "
"c.condeferrable, c.condeferred, "
"c.tableoid AS contableoid, "
"t.relnatts AS indnkeyatts, "
"t.relnatts AS indnatts, "
"i.indkey, i.indisclustered, "
- "false AS indisreplident, t.relpages, "
+ "false AS indisreplident, "
"c.contype, c.conname, "
"c.condeferrable, c.condeferred, "
"c.tableoid AS contableoid, "
i_indkey = PQfnumber(res, "indkey");
i_indisclustered = PQfnumber(res, "indisclustered");
i_indisreplident = PQfnumber(res, "indisreplident");
- i_relpages = PQfnumber(res, "relpages");
i_contype = PQfnumber(res, "contype");
i_conname = PQfnumber(res, "conname");
i_condeferrable = PQfnumber(res, "condeferrable");
indxinfo[j].indisclustered = (PQgetvalue(res, j, i_indisclustered)[0] == 't');
indxinfo[j].indisreplident = (PQgetvalue(res, j, i_indisreplident)[0] == 't');
indxinfo[j].parentidx = atooid(PQgetvalue(res, j, i_parentidx));
- indxinfo[j].relpages = atoi(PQgetvalue(res, j, i_relpages));
contype = *(PQgetvalue(res, j, i_contype));
if (contype == 'p' || contype == 'u' || contype == 'x')
"'' AS attfdwoptions,\n");
if (fout->remoteVersion >= 90100)
+ {
/*
* Since we only want to dump COLLATE clauses for attributes whose
* collation is different from their type's default, we use a CASE
appendPQExpBuffer(q,
"CASE WHEN a.attcollation <> t.typcollation "
"THEN a.attcollation ELSE 0 END AS attcollation,\n");
+ }
else
appendPQExpBuffer(q,
"0 AS attcollation,\n");
appendPQExpBuffer(q,
"'' AS attoptions\n");
+ /* need left join here to not fail on dropped columns ... */
appendPQExpBuffer(q,
- /* need left join here to not fail on dropped columns ... */
"FROM pg_catalog.pg_attribute a LEFT JOIN pg_catalog.pg_type t "
"ON a.atttypid = t.oid\n"
"WHERE a.attrelid = '%u'::pg_catalog.oid "
break;
case DO_BLOB_DATA:
if (dobj->dump & DUMP_COMPONENT_DATA)
- ArchiveEntry(fout, dobj->catId, dobj->dumpId,
- dobj->name, NULL, NULL, "",
- false, "BLOBS", SECTION_DATA,
- "", "", NULL,
- NULL, 0,
- dumpBlobs, NULL);
+ {
+ TocEntry *te;
+
+ te = ArchiveEntry(fout, dobj->catId, dobj->dumpId,
+ dobj->name, NULL, NULL, "",
+ false, "BLOBS", SECTION_DATA,
+ "", "", NULL,
+ NULL, 0,
+ dumpBlobs, NULL);
+
+ /*
+ * Set the TocEntry's dataLength in case we are doing a
+ * parallel dump and want to order dump jobs by table size.
+ * (We need some size estimate for every TocEntry with a
+ * DataDumper function.) We don't currently have any cheap
+ * way to estimate the size of blobs, but it doesn't matter;
+ * let's just set the size to a large value so parallel dumps
+			 * will launch this job first. If there are lots of blobs, we
+ * win, and if there aren't, we don't lose much. (If you want
+ * to improve on this, really what you should be thinking
+ * about is allowing blob dumping to be parallelized, not just
+ * getting a smarter estimate for the single TOC entry.)
+ */
+ te->dataLength = MaxBlockNumber;
+ }
break;
case DO_POLICY:
dumpPolicy(fout, (PolicyInfo *) dobj);
Oid parentidx; /* if partitioned, parent index OID */
/* if there is an associated constraint object, its dumpId: */
DumpId indexconstraint;
- int relpages; /* relpages of the underlying table */
} IndxInfo;
typedef struct _indexAttachInfo
extern void sortDumpableObjects(DumpableObject **objs, int numObjs,
DumpId preBoundaryId, DumpId postBoundaryId);
extern void sortDumpableObjectsByTypeName(DumpableObject **objs, int numObjs);
-extern void sortDataAndIndexObjectsBySize(DumpableObject **objs, int numObjs);
/*
* version specific routines
* pg_dump.c; that is, PRE_DATA objects must sort before DO_PRE_DATA_BOUNDARY,
* POST_DATA objects must sort after DO_POST_DATA_BOUNDARY, and DATA objects
* must sort between them.
- *
- * Note: sortDataAndIndexObjectsBySize wants to have all DO_TABLE_DATA and
- * DO_INDEX objects in contiguous chunks, so do not reuse the values for those
- * for other object types.
*/
static const int dbObjectTypePriority[] =
{
static void describeDumpableObject(DumpableObject *obj,
char *buf, int bufsize);
-static int DOSizeCompare(const void *p1, const void *p2);
-
-static int
-findFirstEqualType(DumpableObjectType type, DumpableObject **objs, int numObjs)
-{
- int i;
-
- for (i = 0; i < numObjs; i++)
- if (objs[i]->objType == type)
- return i;
- return -1;
-}
-
-static int
-findFirstDifferentType(DumpableObjectType type, DumpableObject **objs, int numObjs, int start)
-{
- int i;
-
- for (i = start; i < numObjs; i++)
- if (objs[i]->objType != type)
- return i;
- return numObjs - 1;
-}
-
-/*
- * When we do a parallel dump, we want to start with the largest items first.
- *
- * Say we have the objects in this order:
- * ....DDDDD....III....
- *
- * with D = Table data, I = Index, . = other object
- *
- * This sorting function now takes each of the D or I blocks and sorts them
- * according to their size.
- */
-void
-sortDataAndIndexObjectsBySize(DumpableObject **objs, int numObjs)
-{
- int startIdx,
- endIdx;
- void *startPtr;
-
- if (numObjs <= 1)
- return;
-
- startIdx = findFirstEqualType(DO_TABLE_DATA, objs, numObjs);
- if (startIdx >= 0)
- {
- endIdx = findFirstDifferentType(DO_TABLE_DATA, objs, numObjs, startIdx);
- startPtr = objs + startIdx;
- qsort(startPtr, endIdx - startIdx, sizeof(DumpableObject *),
- DOSizeCompare);
- }
-
- startIdx = findFirstEqualType(DO_INDEX, objs, numObjs);
- if (startIdx >= 0)
- {
- endIdx = findFirstDifferentType(DO_INDEX, objs, numObjs, startIdx);
- startPtr = objs + startIdx;
- qsort(startPtr, endIdx - startIdx, sizeof(DumpableObject *),
- DOSizeCompare);
- }
-}
-
-static int
-DOSizeCompare(const void *p1, const void *p2)
-{
- DumpableObject *obj1 = *(DumpableObject **) p1;
- DumpableObject *obj2 = *(DumpableObject **) p2;
- int obj1_size = 0;
- int obj2_size = 0;
-
- if (obj1->objType == DO_TABLE_DATA)
- obj1_size = ((TableDataInfo *) obj1)->tdtable->relpages;
- if (obj1->objType == DO_INDEX)
- obj1_size = ((IndxInfo *) obj1)->relpages;
-
- if (obj2->objType == DO_TABLE_DATA)
- obj2_size = ((TableDataInfo *) obj2)->tdtable->relpages;
- if (obj2->objType == DO_INDEX)
- obj2_size = ((IndxInfo *) obj2)->relpages;
-
- /* we want to see the biggest item go first */
- if (obj1_size > obj2_size)
- return -1;
- if (obj2_size > obj1_size)
- return 1;
-
- return 0;
-}
/*
* Sort the given objects into a type/name-based ordering