* the required action (dump or restore) and returns a malloc'd status string.
* The status string is passed back to the master where it is interpreted by
* AH->MasterEndParallelItemPtr, another format-specific routine. That
- * function can update state or catalog information on the master's side,
+ * function can update format-specific information on the master's side,
* depending on the reply from the worker process. In the end it returns a
- * status code, which is 0 for successful execution.
+ * status code, which we pass to the ParallelCompletionPtr callback function
+ * that was passed to DispatchJobForTocEntry(). The callback function does
+ * state updating for the master control logic in pg_backup_archiver.c.
*
* Remember that we have forked off the workers only after we have read in
* the catalog. That's why our worker processes can also access the catalog
* In the master process, the workerStatus field for each worker has one of
* the following values:
* WRKR_IDLE: it's waiting for a command
- * WRKR_WORKING: it's been sent a command
- * WRKR_FINISHED: it's returned a result
+ * WRKR_WORKING: it's working on a command
* WRKR_TERMINATED: process ended
- * The FINISHED state indicates that the worker is idle, but we've not yet
- * dealt with the status code it returned from the prior command.
- * ReapWorkerStatus() extracts the unhandled command status value and sets
- * the workerStatus back to WRKR_IDLE.
*/
#include "postgres_fe.h"
#define PIPE_READ 0
#define PIPE_WRITE 1
+#define NO_SLOT (-1) /* Failure result for GetIdleWorker() */
+
#ifdef WIN32
/*
static void set_cancel_pstate(ParallelState *pstate);
static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
+static int GetIdleWorker(ParallelState *pstate);
static bool HasEveryWorkerTerminated(ParallelState *pstate);
static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
+static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
+ bool do_wait);
static char *getMessageFromMaster(int pipefd[2]);
static void sendMessageToMaster(int pipefd[2], const char *str);
static int select_loop(int maxFd, fd_set *workerset);
* fail to detect it because there would be no EOF condition on
* the other end of the pipe.)
*/
- if (slot->args->AH)
- DisconnectDatabase(&(slot->args->AH->public));
+ if (slot->AH)
+ DisconnectDatabase(&(slot->AH->public));
#ifdef WIN32
closesocket(slot->pipeRevRead);
EnterCriticalSection(&signal_info_lock);
for (i = 0; i < pstate->numWorkers; i++)
{
- ArchiveHandle *AH = pstate->parallelSlot[i].args->AH;
+ ArchiveHandle *AH = pstate->parallelSlot[i].AH;
char errbuf[1];
if (AH != NULL && AH->connCancel != NULL)
for (i = 0; i < signal_info.pstate->numWorkers; i++)
{
ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
- ArchiveHandle *AH = slot->args->AH;
+ ArchiveHandle *AH = slot->AH;
HANDLE hThread = (HANDLE) slot->hThread;
/*
EnterCriticalSection(&signal_info_lock);
#endif
- slot->args->AH = AH;
+ slot->AH = AH;
#ifdef WIN32
LeaveCriticalSection(&signal_info_lock);
strerror(errno));
slot->workerStatus = WRKR_IDLE;
- slot->args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
- slot->args->AH = NULL;
- slot->args->te = NULL;
+ slot->AH = NULL;
+ slot->te = NULL;
+ slot->callback = NULL;
+ slot->callback_data = NULL;
/* master's ends of the pipes */
slot->pipeRead = pipeWM[PIPE_READ];
}
/*
- * Dispatch a job to some free worker (caller must ensure there is one!)
+ * Dispatch a job to some free worker.
*
* te is the TocEntry to be processed, act is the action to be taken on it.
+ * callback is the function to call on completion of the job; callback_data
+ * is passed through to it unchanged.
+ *
+ * If no worker is currently available, this will block, and previously
+ * registered callback functions may be called.
*/
void
-DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te,
- T_Action act)
+DispatchJobForTocEntry(ArchiveHandle *AH,
+ ParallelState *pstate,
+ TocEntry *te,
+ T_Action act,
+ ParallelCompletionPtr callback,
+ void *callback_data)
{
int worker;
char *arg;
- /* our caller makes sure that at least one worker is idle */
- worker = GetIdleWorker(pstate);
- Assert(worker != NO_SLOT);
+ /* Get a worker, waiting if none are idle */
+ while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
+ WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
/* Construct and send command string */
arg = (AH->MasterStartParallelItemPtr) (AH, te, act);
/* Remember worker is busy, and which TocEntry it's working on */
pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
- pstate->parallelSlot[worker].args->te = te;
+ pstate->parallelSlot[worker].te = te;
+ pstate->parallelSlot[worker].callback = callback;
+ pstate->parallelSlot[worker].callback_data = callback_data;
}
/*
* Find an idle worker and return its slot number.
* Return NO_SLOT if none are idle.
*/
-int
+static int
GetIdleWorker(ParallelState *pstate)
{
int i;
* immediately if there is none available.
*
* When we get a status message, we let MasterEndParallelItemPtr process it,
- * then save the resulting status code and switch the worker's state to
- * WRKR_FINISHED. Later, caller must call ReapWorkerStatus() to verify
- * that the status was "OK" and push the worker back to IDLE state.
+ * then pass the resulting status code to the callback function that was
+ * specified to DispatchJobForTocEntry, then reset the worker status to IDLE.
*
- * XXX Rube Goldberg would be proud of this API, but no one else should be.
+ * Returns true if we collected a status message, else false.
*
* XXX is it worth checking for more than one status message per call?
* It seems somewhat unlikely that multiple workers would finish at exactly
* the same time.
*/
-void
+static bool
ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
{
int worker;
/* If do_wait is true, we must have detected EOF on some socket */
if (do_wait)
exit_horribly(modulename, "a worker process died unexpectedly\n");
- return;
+ return false;
}
/* Process it and update our idea of the worker's status */
if (messageStartsWith(msg, "OK "))
{
- TocEntry *te = pstate->parallelSlot[worker].args->te;
+ ParallelSlot *slot = &pstate->parallelSlot[worker];
+ TocEntry *te = slot->te;
char *statusString;
+ int status;
if (messageStartsWith(msg, "OK RESTORE "))
{
statusString = msg + strlen("OK RESTORE ");
- pstate->parallelSlot[worker].status =
+ status =
(AH->MasterEndParallelItemPtr)
(AH, te, statusString, ACT_RESTORE);
+ slot->callback(AH, te, status, slot->callback_data);
}
else if (messageStartsWith(msg, "OK DUMP "))
{
statusString = msg + strlen("OK DUMP ");
- pstate->parallelSlot[worker].status =
+ status =
(AH->MasterEndParallelItemPtr)
(AH, te, statusString, ACT_DUMP);
+ slot->callback(AH, te, status, slot->callback_data);
}
else
exit_horribly(modulename,
"invalid message received from worker: \"%s\"\n",
msg);
- pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED;
+ slot->workerStatus = WRKR_IDLE;
+ slot->te = NULL;
}
else
exit_horribly(modulename,
/* Free the string returned from getMessageFromWorker */
free(msg);
-}
-
-/*
- * Check to see if any worker is in WRKR_FINISHED state. If so,
- * return its command status code into *status, reset it to IDLE state,
- * and return its slot number. Otherwise return NO_SLOT.
- *
- * This function is executed in the master process.
- */
-int
-ReapWorkerStatus(ParallelState *pstate, int *status)
-{
- int i;
- for (i = 0; i < pstate->numWorkers; i++)
- {
- if (pstate->parallelSlot[i].workerStatus == WRKR_FINISHED)
- {
- *status = pstate->parallelSlot[i].status;
- pstate->parallelSlot[i].status = 0;
- pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
- return i;
- }
- }
- return NO_SLOT;
+ return true;
}
/*
- * Wait, if necessary, until we have at least one idle worker.
- * Reap worker status as necessary to move FINISHED workers to IDLE state.
+ * Check for status results from workers, waiting if necessary.
*
- * We assume that no extra processing is required when reaping a finished
- * command, except for checking that the status was OK (zero).
- * Caution: that assumption means that this function can only be used in
- * parallel dump, not parallel restore, because the latter has a more
- * complex set of rules about handling status.
+ * Available wait modes are:
+ * WFW_NO_WAIT: reap any available status, but don't block
+ * WFW_GOT_STATUS: wait for at least one more worker to finish (caller must
+ * ensure that not all workers are idle)
+ * WFW_ONE_IDLE: wait for at least one worker to be idle
+ * WFW_ALL_IDLE: wait for all workers to be idle
+ *
+ * Any received results are passed to MasterEndParallelItemPtr and then
+ * to the callback specified to DispatchJobForTocEntry.
*
* This function is executed in the master process.
*/
void
-EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate)
+WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
{
- int ret_worker;
- int work_status;
+ bool do_wait = false;
- for (;;)
+ /*
+ * In GOT_STATUS mode, always block waiting for a message, since we can't
+ * return till we get something. In other modes, we don't block the first
+ * time through the loop.
+ */
+ if (mode == WFW_GOT_STATUS)
{
- int nTerm = 0;
-
- while ((ret_worker = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
- {
- if (work_status != 0)
- exit_horribly(modulename, "error processing a parallel work item\n");
-
- nTerm++;
- }
-
- /*
- * We need to make sure that we have an idle worker before dispatching
- * the next item. If nTerm > 0 we already have that (quick check).
- */
- if (nTerm > 0)
- return;
-
- /* explicit check for an idle worker */
- if (GetIdleWorker(pstate) != NO_SLOT)
- return;
+ /* Assert that caller knows what it's doing */
+ Assert(!IsEveryWorkerIdle(pstate));
+ do_wait = true;
+ }
+ for (;;)
+ {
/*
- * If we have no idle worker, read the result of one or more workers
- * and loop the loop to call ReapWorkerStatus() on them
+ * Check for status messages, even if we don't need to block. We do
+ * not try very hard to reap all available messages, though, since
+ * there's unlikely to be more than one.
*/
- ListenToWorkers(AH, pstate, true);
- }
-}
-
-/*
- * Wait for all workers to be idle.
- * Reap worker status as necessary to move FINISHED workers to IDLE state.
- *
- * We assume that no extra processing is required when reaping a finished
- * command, except for checking that the status was OK (zero).
- * Caution: that assumption means that this function can only be used in
- * parallel dump, not parallel restore, because the latter has a more
- * complex set of rules about handling status.
- *
- * This function is executed in the master process.
- */
-void
-EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate)
-{
- int work_status;
+ if (ListenToWorkers(AH, pstate, do_wait))
+ {
+ /*
+ * If we got a message, we are done by definition for GOT_STATUS
+ * mode, and we can also be certain that there's at least one idle
+ * worker. So we're done in all but ALL_IDLE mode.
+ */
+ if (mode != WFW_ALL_IDLE)
+ return;
+ }
- if (!pstate || pstate->numWorkers == 1)
- return;
+ /* Check whether we must wait for new status messages */
+ switch (mode)
+ {
+ case WFW_NO_WAIT:
+ return; /* never wait */
+ case WFW_GOT_STATUS:
+ Assert(false); /* can't get here, because we waited */
+ break;
+ case WFW_ONE_IDLE:
+ if (GetIdleWorker(pstate) != NO_SLOT)
+ return;
+ break;
+ case WFW_ALL_IDLE:
+ if (IsEveryWorkerIdle(pstate))
+ return;
+ break;
+ }
- /* Waiting for the remaining worker processes to finish */
- while (!IsEveryWorkerIdle(pstate))
- {
- if (ReapWorkerStatus(pstate, &work_status) == NO_SLOT)
- ListenToWorkers(AH, pstate, true);
- else if (work_status != 0)
- exit_horribly(modulename,
- "error processing a parallel work item\n");
+ /* Loop back, and this time wait for something to happen */
+ do_wait = true;
}
}
*
* parallel.h
*
- * Parallel support header file for the pg_dump archiver
+ * Parallel support for pg_dump and pg_restore
*
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * The author is not responsible for loss or damages that may
- * result from its use.
- *
* IDENTIFICATION
* src/bin/pg_dump/parallel.h
*
#include "pg_backup_archiver.h"
+/* Function to call in master process on completion of a worker task */
+typedef void (*ParallelCompletionPtr) (ArchiveHandle *AH,
+ TocEntry *te,
+ int status,
+ void *callback_data);
+
+/* Wait options for WaitForWorkers */
+typedef enum
+{
+ WFW_NO_WAIT,
+ WFW_GOT_STATUS,
+ WFW_ONE_IDLE,
+ WFW_ALL_IDLE
+} WFW_WaitOption;
+
+/* Worker process statuses */
typedef enum
{
- WRKR_TERMINATED = 0,
WRKR_IDLE,
WRKR_WORKING,
- WRKR_FINISHED
+ WRKR_TERMINATED
} T_WorkerStatus;
-/* Arguments needed for a worker process */
-typedef struct ParallelArgs
-{
- ArchiveHandle *AH;
- TocEntry *te;
-} ParallelArgs;
-
-/* State for each parallel activity slot */
+/*
+ * Per-parallel-worker state of parallel.c.
+ *
+ * Much of this is valid only in the master process (or, on Windows, should
+ * be touched only by the master thread). But the AH field should be touched
+ * only by workers. The pipe descriptors are valid everywhere.
+ */
typedef struct ParallelSlot
{
- ParallelArgs *args;
- T_WorkerStatus workerStatus;
- int status;
+ T_WorkerStatus workerStatus; /* see enum above */
+
+ /* These fields are valid if workerStatus == WRKR_WORKING: */
+ TocEntry *te; /* item being worked on */
+ ParallelCompletionPtr callback; /* function to call on completion */
+ void *callback_data; /* passthru data for it */
+
+ ArchiveHandle *AH; /* Archive data worker is using */
+
int pipeRead; /* master's end of the pipes */
int pipeWrite;
int pipeRevRead; /* child's end of the pipes */
int pipeRevWrite;
+
+ /* Child process/thread identity info: */
#ifdef WIN32
uintptr_t hThread;
unsigned int threadId;
#endif
} ParallelSlot;
-#define NO_SLOT (-1)
-
+/* Overall state for parallel.c */
typedef struct ParallelState
{
- int numWorkers;
- ParallelSlot *parallelSlot;
+ int numWorkers; /* allowed number of workers */
+ ParallelSlot *parallelSlot; /* array of numWorkers slots */
} ParallelState;
#ifdef WIN32
extern void init_parallel_dump_utils(void);
-extern int GetIdleWorker(ParallelState *pstate);
extern bool IsEveryWorkerIdle(ParallelState *pstate);
-extern void ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait);
-extern int ReapWorkerStatus(ParallelState *pstate, int *status);
-extern void EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate);
-extern void EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate);
+extern void WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate,
+ WFW_WaitOption mode);
extern ParallelState *ParallelBackupStart(ArchiveHandle *AH);
extern void DispatchJobForTocEntry(ArchiveHandle *AH,
ParallelState *pstate,
- TocEntry *te, T_Action act);
+ TocEntry *te,
+ T_Action act,
+ ParallelCompletionPtr callback,
+ void *callback_data);
extern void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate);
extern void set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn);
static TocEntry *get_next_work_item(ArchiveHandle *AH,
TocEntry *ready_list,
ParallelState *pstate);
-static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
- int worker, int status,
- ParallelState *pstate);
+static void mark_dump_job_done(ArchiveHandle *AH,
+ TocEntry *te,
+ int status,
+ void *callback_data);
+static void mark_restore_job_done(ArchiveHandle *AH,
+ TocEntry *te,
+ int status,
+ void *callback_data);
static void fix_dependencies(ArchiveHandle *AH);
static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
static void repoint_table_dependencies(ArchiveHandle *AH);
* If we are in a parallel backup, then we are always the master
* process. Dispatch each data-transfer job to a worker.
*/
- EnsureIdleWorker(AH, pstate);
- DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP);
+ DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP,
+ mark_dump_job_done, NULL);
}
else
WriteDataChunksForTocEntry(AH, te);
/*
* If parallel, wait for workers to finish.
*/
- EnsureWorkersFinished(AH, pstate);
+ if (pstate && pstate->numWorkers > 1)
+ WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
}
+
+/*
+ * Callback function that's invoked in the master process after a step has
+ * been parallel dumped.
+ *
+ * We don't need to do anything except log the finished item and check for
+ * worker failure.
+ */
+static void
+mark_dump_job_done(ArchiveHandle *AH,
+ TocEntry *te,
+ int status,
+ void *callback_data)
+{
+ ahlog(AH, 1, "finished item %d %s %s\n",
+ te->dumpId, te->desc, te->tag);
+
+ if (status != 0)
+ exit_horribly(modulename, "worker process failed: exit code %d\n",
+ status);
+}
+
+
void
WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
{
return 0;
}
- if (ropt->schemaExcludeNames.head != NULL
- && te->namespace
- && simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
+ if (ropt->schemaExcludeNames.head != NULL &&
+ te->namespace &&
+ simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
return 0;
if (ropt->selTypes)
restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
TocEntry *pending_list)
{
- int work_status;
bool skipped_some;
TocEntry ready_list;
TocEntry *next_work_item;
- int ret_child;
ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
par_list_remove(next_work_item);
- DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE);
+ DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
+ mark_restore_job_done, &ready_list);
}
else
{
/* at least one child is working and we have nothing ready. */
}
- for (;;)
- {
- int nTerm = 0;
-
- /*
- * In order to reduce dependencies as soon as possible and
- * especially to reap the status of workers who are working on
- * items that pending items depend on, we do a non-blocking check
- * for ended workers first.
- *
- * However, if we do not have any other work items currently that
- * workers can work on, we do not busy-loop here but instead
- * really wait for at least one worker to terminate. Hence we call
- * ListenToWorkers(..., ..., do_wait = true) in this case.
- */
- ListenToWorkers(AH, pstate, !next_work_item);
-
- while ((ret_child = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
- {
- nTerm++;
- mark_work_done(AH, &ready_list, ret_child, work_status, pstate);
- }
-
- /*
- * We need to make sure that we have an idle worker before
- * re-running the loop. If nTerm > 0 we already have that (quick
- * check).
- */
- if (nTerm > 0)
- break;
-
- /* if nobody terminated, explicitly check for an idle worker */
- if (GetIdleWorker(pstate) != NO_SLOT)
- break;
-
- /*
- * If we have no idle worker, read the result of one or more
- * workers and loop the loop to call ReapWorkerStatus() on them.
- */
- ListenToWorkers(AH, pstate, true);
- }
+ /*
+ * Before dispatching another job, check to see if anything has
+ * finished. We should check every time through the loop so as to
+ * reduce dependencies as soon as possible. If we were unable to
+ * dispatch any job this time through, wait until some worker finishes
+ * (and, hopefully, unblocks some pending item). If we did dispatch
+ * something, continue as soon as there's at least one idle worker.
+ * Note that in either case, there's guaranteed to be at least one
+ * idle worker when we return to the top of the loop. This ensures we
+ * won't block inside DispatchJobForTocEntry, which would be
+ * undesirable: we'd rather postpone dispatching until we see what's
+ * been unblocked by finished jobs.
+ */
+ WaitForWorkers(AH, pstate,
+ next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
}
ahlog(AH, 1, "finished main parallel loop\n");
int count = 0;
for (k = 0; k < pstate->numWorkers; k++)
- if (pstate->parallelSlot[k].args->te != NULL &&
- pstate->parallelSlot[k].args->te->section == SECTION_DATA)
+ {
+ if (pstate->parallelSlot[k].workerStatus == WRKR_WORKING &&
+ pstate->parallelSlot[k].te->section == SECTION_DATA)
count++;
+ }
if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers)
pref_non_data = false;
}
* that a currently running item also needs lock on, or vice versa. If
* so, we don't want to schedule them together.
*/
- for (i = 0; i < pstate->numWorkers && !conflicts; i++)
+ for (i = 0; i < pstate->numWorkers; i++)
{
TocEntry *running_te;
if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING)
continue;
- running_te = pstate->parallelSlot[i].args->te;
+ running_te = pstate->parallelSlot[i].te;
if (has_lock_conflicts(te, running_te) ||
has_lock_conflicts(running_te, te))
* our work is finished, the master process will assign us a new work item.
*/
int
-parallel_restore(ParallelArgs *args)
+parallel_restore(ArchiveHandle *AH, TocEntry *te)
{
- ArchiveHandle *AH = args->AH;
- TocEntry *te = args->te;
int status;
Assert(AH->connection != NULL);
/*
- * Housekeeping to be done after a step has been parallel restored.
+ * Callback function that's invoked in the master process after a step has
+ * been parallel restored.
*
- * Clear the appropriate slot, free all the extra memory we allocated,
- * update status, and reduce the dependency count of any dependent items.
+ * Update status and reduce the dependency count of any dependent items.
*/
static void
-mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
- int worker, int status,
- ParallelState *pstate)
+mark_restore_job_done(ArchiveHandle *AH,
+ TocEntry *te,
+ int status,
+ void *callback_data)
{
- TocEntry *te = NULL;
-
- te = pstate->parallelSlot[worker].args->te;
-
- if (te == NULL)
- exit_horribly(modulename, "could not find slot of finished worker\n");
+ TocEntry *ready_list = (TocEntry *) callback_data;
ahlog(AH, 1, "finished item %d %s %s\n",
te->dumpId, te->desc, te->tag);