* the desired number of worker processes, which each enter WaitForCommands().
*
* The master process dispatches an individual work item to one of the worker
- * processes in DispatchJobForTocEntry(). That calls
- * AH->MasterStartParallelItemPtr, a routine of the output format. This
- * function's arguments are the parents archive handle AH (containing the full
- * catalog information), the TocEntry that the worker should work on and a
- * T_Action value indicating whether this is a backup or a restore task. The
- * function simply converts the TocEntry assignment into a command string that
- * is then sent over to the worker process. In the simplest case that would be
- * something like "DUMP 1234", with 1234 being the TocEntry id.
- *
+ * processes in DispatchJobForTocEntry(). We send a command string such as
+ * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
* The worker process receives and decodes the command and passes it to the
* routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
* which are routines of the current archive format. That routine performs
- * the required action (dump or restore) and returns a malloc'd status string.
- * The status string is passed back to the master where it is interpreted by
- * AH->MasterEndParallelItemPtr, another format-specific routine. That
- * function can update format-specific information on the master's side,
- * depending on the reply from the worker process. In the end it returns a
- * status code, which we pass to the ParallelCompletionPtr callback function
- * that was passed to DispatchJobForTocEntry(). The callback function does
- * state updating for the master control logic in pg_backup_archiver.c.
+ * the required action (dump or restore) and returns an integer status code.
+ * This is passed back to the master where we pass it to the
+ * ParallelCompletionPtr callback function that was passed to
+ * DispatchJobForTocEntry(). The callback function does state updating
+ * for the master control logic in pg_backup_archiver.c.
*
- * Remember that we have forked off the workers only after we have read in
- * the catalog. That's why our worker processes can also access the catalog
- * information. (In the Windows case, the workers are threads in the same
- * process. To avoid problems, they work with cloned copies of the Archive
- * data structure; see RunWorker().)
+ * In principle additional archive-format-specific information might be needed
+ * in commands or worker status responses, but so far that hasn't proved
+ * necessary, since workers have full copies of the ArchiveHandle/TocEntry
+ * data structures. Remember that we have forked off the workers only after
+ * we have read in the catalog. That's why our worker processes can also
+ * access the catalog information. (In the Windows case, the workers are
+ * threads in the same process. To avoid problems, they work with cloned
+ * copies of the Archive data structure; see RunWorker().)
*
* In the master process, the workerStatus field for each worker has one of
* the following values:
free(pstate);
}
+/*
+ * These next four functions handle construction and parsing of the command
+ * strings and response strings for parallel workers.
+ *
+ * Currently, these can be the same regardless of which archive format we are
+ * processing. In future, we might want to let format modules override these
+ * functions to add format-specific data to a command or response.
+ */
+
+/*
+ * buildWorkerCommand: format a command string to send to a worker.
+ *
+ * For ACT_DUMP the command is "DUMP <id>", for ACT_RESTORE it is
+ * "RESTORE <id>", where <id> is te->dumpId.  The text is written into the
+ * caller-supplied buffer buf, which holds buflen bytes.  Any other action
+ * trips an assertion and leaves buf untouched.
+ */
+static void
+buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
+				   char *buf, int buflen)
+{
+	switch (act)
+	{
+		case ACT_DUMP:
+			snprintf(buf, buflen, "DUMP %d", te->dumpId);
+			break;
+		case ACT_RESTORE:
+			snprintf(buf, buflen, "RESTORE %d", te->dumpId);
+			break;
+		default:
+			/* no other action is dispatched to workers */
+			Assert(false);
+			break;
+	}
+}
+
+/*
+ * parseWorkerCommand: interpret a command string in a worker.
+ *
+ * msg is the command text received from the master, of the form
+ * "DUMP <id>" or "RESTORE <id>".  On success *act is set to ACT_DUMP or
+ * ACT_RESTORE and *te to the TOC entry looked up by the dump ID.
+ *
+ * A command with an unknown verb, or one whose dump ID cannot be parsed,
+ * is reported through exit_horribly() rather than being acted on with an
+ * uninitialized dump ID.
+ */
+static void
+parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
+				   const char *msg)
+{
+	DumpId		dumpId = 0;
+	int			nBytes = 0;
+	int			nFields = 0;
+
+	if (messageStartsWith(msg, "DUMP "))
+	{
+		*act = ACT_DUMP;
+		nFields = sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
+	}
+	else if (messageStartsWith(msg, "RESTORE "))
+	{
+		*act = ACT_RESTORE;
+		nFields = sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
+	}
+
+	/*
+	 * sscanf() reports one converted field (%n is not counted) when the
+	 * dump ID was read; anything else means the verb was unknown or the ID
+	 * was missing or malformed.
+	 */
+	if (nFields != 1)
+		exit_horribly(modulename,
+					  "unrecognized command received from master: \"%s\"\n",
+					  msg);
+
+	/* the dump ID must be the last thing in the message */
+	Assert(nBytes == strlen(msg));
+	*te = getTocEntryByDumpId(AH, dumpId);
+	Assert(*te != NULL);
+}
+
+/*
+ * buildWorkerResponse: format a response string to send to the master.
+ *
+ * The response is "OK <dumpId> <status> <n_errors>", written into the
+ * caller-supplied buffer buf of size buflen.  The error count is taken from
+ * AH->public.n_errors only when status is WORKER_IGNORED_ERRORS; otherwise
+ * zero is sent.
+ */
+static void
+buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
+					char *buf, int buflen)
+{
+	int			n_errors = 0;
+
+	if (status == WORKER_IGNORED_ERRORS)
+		n_errors = AH->public.n_errors;
+
+	snprintf(buf, buflen, "OK %d %d %d", te->dumpId, status, n_errors);
+}
+
+/*
+ * parseWorkerResponse: parse the status message returned by a worker.
+ *
+ * msg is expected to look like "OK <dumpId> <status> <n_errors>".  The
+ * worker's error count is added to AH->public.n_errors and the integer
+ * status code is returned.
+ *
+ * A message that does not start with "OK ", or that lacks any of the three
+ * integer fields, is reported through exit_horribly() so that no
+ * uninitialized value is ever added to the error count.
+ */
+static int
+parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
+					const char *msg)
+{
+	DumpId		dumpId = 0;
+	int			nBytes = 0;
+	int			n_errors = 0;
+	int			status = 0;
+
+	/* %n is not counted, so a complete response yields exactly 3 fields */
+	if (!messageStartsWith(msg, "OK ") ||
+		sscanf(msg, "OK %d %d %d%n",
+			   &dumpId, &status, &n_errors, &nBytes) != 3)
+		exit_horribly(modulename,
+					  "invalid message received from worker: \"%s\"\n",
+					  msg);
+
+	/* the reply must be for the entry we dispatched, with nothing trailing */
+	Assert(dumpId == te->dumpId);
+	Assert(nBytes == strlen(msg));
+
+	AH->public.n_errors += n_errors;
+
+	return status;
+}
+
/*
* Dispatch a job to some free worker.
*
void *callback_data)
{
int worker;
- char *arg;
+ char buf[256];
/* Get a worker, waiting if none are idle */
while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
/* Construct and send command string */
- arg = (AH->MasterStartParallelItemPtr) (AH, te, act);
-
- sendMessageToWorker(pstate, worker, arg);
+ buildWorkerCommand(AH, te, act, buf, sizeof(buf));
- /* XXX aren't we leaking string here? (no, because it's static. Ick.) */
+ sendMessageToWorker(pstate, worker, buf);
/* Remember worker is busy, and which TocEntry it's working on */
pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
WaitForCommands(ArchiveHandle *AH, int pipefd[2])
{
char *command;
- DumpId dumpId;
- int nBytes;
- char *str;
TocEntry *te;
+ T_Action act;
+ int status = 0;
+ char buf[256];
for (;;)
{
return;
}
- if (messageStartsWith(command, "DUMP "))
- {
- /* Decode the command */
- sscanf(command + strlen("DUMP "), "%d%n", &dumpId, &nBytes);
- Assert(nBytes == strlen(command) - strlen("DUMP "));
- te = getTocEntryByDumpId(AH, dumpId);
- Assert(te != NULL);
+ /* Decode the command */
+ parseWorkerCommand(AH, &te, &act, command);
+ if (act == ACT_DUMP)
+ {
/* Acquire lock on this table within the worker's session */
lockTableForWorker(AH, te);
/* Perform the dump command */
- str = (AH->WorkerJobDumpPtr) (AH, te);
-
- /* Return status to master */
- sendMessageToMaster(pipefd, str);
-
- /* we are responsible for freeing the status string */
- free(str);
+ status = (AH->WorkerJobDumpPtr) (AH, te);
}
- else if (messageStartsWith(command, "RESTORE "))
+ else if (act == ACT_RESTORE)
{
- /* Decode the command */
- sscanf(command + strlen("RESTORE "), "%d%n", &dumpId, &nBytes);
- Assert(nBytes == strlen(command) - strlen("RESTORE "));
- te = getTocEntryByDumpId(AH, dumpId);
- Assert(te != NULL);
-
/* Perform the restore command */
- str = (AH->WorkerJobRestorePtr) (AH, te);
-
- /* Return status to master */
- sendMessageToMaster(pipefd, str);
-
- /* we are responsible for freeing the status string */
- free(str);
+ status = (AH->WorkerJobRestorePtr) (AH, te);
}
else
- exit_horribly(modulename,
- "unrecognized command received from master: \"%s\"\n",
- command);
+ Assert(false);
+
+ /* Return status to master */
+ buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
+
+ sendMessageToMaster(pipefd, buf);
/* command was pg_malloc'd and we are responsible for free()ing it. */
free(command);
* If do_wait is true, wait to get a status message; otherwise, just return
* immediately if there is none available.
*
- * When we get a status message, we let MasterEndParallelItemPtr process it,
- * then pass the resulting status code to the callback function that was
- * specified to DispatchJobForTocEntry, then reset the worker status to IDLE.
+ * When we get a status message, we pass the status code to the callback
+ * function that was specified to DispatchJobForTocEntry, then reset the
+ * worker status to IDLE.
*
* Returns true if we collected a status message, else false.
*
{
ParallelSlot *slot = &pstate->parallelSlot[worker];
TocEntry *te = slot->te;
- char *statusString;
int status;
- if (messageStartsWith(msg, "OK RESTORE "))
- {
- statusString = msg + strlen("OK RESTORE ");
- status =
- (AH->MasterEndParallelItemPtr)
- (AH, te, statusString, ACT_RESTORE);
- slot->callback(AH, te, status, slot->callback_data);
- }
- else if (messageStartsWith(msg, "OK DUMP "))
- {
- statusString = msg + strlen("OK DUMP ");
- status =
- (AH->MasterEndParallelItemPtr)
- (AH, te, statusString, ACT_DUMP);
- slot->callback(AH, te, status, slot->callback_data);
- }
- else
- exit_horribly(modulename,
- "invalid message received from worker: \"%s\"\n",
- msg);
+ status = parseWorkerResponse(AH, te, msg);
+ slot->callback(AH, te, status, slot->callback_data);
slot->workerStatus = WRKR_IDLE;
slot->te = NULL;
}
* WFW_ONE_IDLE: wait for at least one worker to be idle
* WFW_ALL_IDLE: wait for all workers to be idle
*
- * Any received results are passed to MasterEndParallelItemPtr and then
- * to the callback specified to DispatchJobForTocEntry.
+ * Any received results are passed to the callback specified to
+ * DispatchJobForTocEntry.
*
* This function is executed in the master process.
*/
static void _Clone(ArchiveHandle *AH);
static void _DeClone(ArchiveHandle *AH);
-static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act);
-static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act);
-char *_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
+static int _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
typedef struct
{
AH->ClonePtr = _Clone;
AH->DeClonePtr = _DeClone;
- AH->MasterStartParallelItemPtr = _MasterStartParallelItem;
- AH->MasterEndParallelItemPtr = _MasterEndParallelItem;
-
/* no parallel dump in the custom archive, only parallel restore */
AH->WorkerJobDumpPtr = NULL;
AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
}
/*
- * This function is executed in the child of a parallel backup for the
- * custom format archive and dumps the actual data.
- */
-char *
-_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
-{
- /*
- * short fixed-size string + some ID so far, this needs to be malloc'ed
- * instead of static because we work with threads on windows
- */
- const int buflen = 64;
- char *buf = (char *) pg_malloc(buflen);
- int status;
-
- status = parallel_restore(AH, te);
-
- snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
- status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
-
- return buf;
-}
-
-/*
- * This function is executed in the parent process. Depending on the desired
- * action (dump or restore) it creates a string that is understood by the
- * _WorkerJobDump /_WorkerJobRestore functions of the dump format.
- */
-static char *
-_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act)
-{
- /*
- * A static char is okay here, even on Windows because we call this
- * function only from one process (the master).
- */
- static char buf[64]; /* short fixed-size string + number */
-
- /* no parallel dump in the custom archive format */
- Assert(act == ACT_RESTORE);
-
- snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId);
-
- return buf;
-}
-
-/*
- * This function is executed in the parent process. It analyzes the response of
- * the _WorkerJobDump / _WorkerJobRestore functions of the dump format.
+ * This function is executed in the child of a parallel restore from a
+ * custom-format archive and restores the actual data for one TOC entry.
*/
static int
-_MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
+_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
{
- DumpId dumpId;
- int nBytes,
- status,
- n_errors;
-
- /* no parallel dump in the custom archive */
- Assert(act == ACT_RESTORE);
-
- sscanf(str, "%d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
-
- Assert(nBytes == strlen(str));
- Assert(dumpId == te->dumpId);
-
- AH->public.n_errors += n_errors;
-
- return status;
+ return parallel_restore(AH, te);
}
/*--------------------------------------------------
static void _Clone(ArchiveHandle *AH);
static void _DeClone(ArchiveHandle *AH);
-static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act);
-static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te,
- const char *str, T_Action act);
-static char *_WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te);
-static char *_WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te);
+static int _WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te);
+static int _WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te);
static void setFilePath(ArchiveHandle *AH, char *buf,
const char *relativeFilename);
AH->WorkerJobRestorePtr = _WorkerJobRestoreDirectory;
AH->WorkerJobDumpPtr = _WorkerJobDumpDirectory;
- AH->MasterStartParallelItemPtr = _MasterStartParallelItem;
- AH->MasterEndParallelItemPtr = _MasterEndParallelItem;
-
/* Set up our private context */
ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
AH->formatData = (void *) ctx;
}
/*
- * This function is executed in the parent process. Depending on the desired
- * action (dump or restore) it creates a string that is understood by the
- * _WorkerJobDump /_WorkerJobRestore functions of the dump format.
- */
-static char *
-_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act)
-{
- /*
- * A static char is okay here, even on Windows because we call this
- * function only from one process (the master).
- */
- static char buf[64];
-
- if (act == ACT_DUMP)
- snprintf(buf, sizeof(buf), "DUMP %d", te->dumpId);
- else if (act == ACT_RESTORE)
- snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId);
-
- return buf;
-}
-
-/*
- * This function is executed in the child of a parallel backup for the
- * directory archive and dumps the actual data.
- *
- * We are currently returning only the DumpId so theoretically we could
- * make this function returning an int (or a DumpId). However, to
- * facilitate further enhancements and because sooner or later we need to
- * convert this to a string and send it via a message anyway, we stick with
- * char *. It is parsed on the other side by the _EndMasterParallel()
- * function of the respective dump format.
+ * This function is executed in the child of a parallel backup for a
+ * directory-format archive and dumps the actual data for one TOC entry.
*/
-static char *
+static int
_WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te)
{
- /*
- * short fixed-size string + some ID so far, this needs to be malloc'ed
- * instead of static because we work with threads on windows
- */
- const int buflen = 64;
- char *buf = (char *) pg_malloc(buflen);
- lclTocEntry *tctx = (lclTocEntry *) te->formatData;
-
- /* This should never happen */
- if (!tctx)
- exit_horribly(modulename, "error during backup\n");
-
/*
* This function returns void. We either fail and die horribly or
* succeed... A failure will be detected by the parent when the child dies
*/
WriteDataChunksForTocEntry(AH, te);
- snprintf(buf, buflen, "OK DUMP %d", te->dumpId);
-
- return buf;
+ return 0;
}
/*
- * This function is executed in the child of a parallel backup for the
- * directory archive and dumps the actual data.
- */
-static char *
-_WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te)
-{
- /*
- * short fixed-size string + some ID so far, this needs to be malloc'ed
- * instead of static because we work with threads on windows
- */
- const int buflen = 64;
- char *buf = (char *) pg_malloc(buflen);
- int status;
-
- status = parallel_restore(AH, te);
-
- snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
- status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
-
- return buf;
-}
-
-/*
- * This function is executed in the parent process. It analyzes the response of
- * the _WorkerJobDumpDirectory/_WorkerJobRestoreDirectory functions of the
- * respective dump format.
+ * This function is executed in the child of a parallel restore from a
+ * directory-format archive and restores the actual data for one TOC entry.
*/
static int
-_MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
+_WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te)
{
- DumpId dumpId;
- int nBytes,
- n_errors;
- int status = 0;
-
- if (act == ACT_DUMP)
- {
- sscanf(str, "%d%n", &dumpId, &nBytes);
-
- Assert(dumpId == te->dumpId);
- Assert(nBytes == strlen(str));
- }
- else if (act == ACT_RESTORE)
- {
- sscanf(str, "%d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
-
- Assert(dumpId == te->dumpId);
- Assert(nBytes == strlen(str));
-
- AH->public.n_errors += n_errors;
- }
-
- return status;
+ return parallel_restore(AH, te);
}