*
* parallel.c
*
- * Parallel support for the pg_dump archiver
+ * Parallel support for pg_dump and pg_restore
*
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * The author is not responsible for loss or damages that may
- * result from its use.
- *
* IDENTIFICATION
* src/bin/pg_dump/parallel.c
*
*-------------------------------------------------------------------------
*/
+/*
+ * Parallel operation works like this:
+ *
+ * The original, master process calls ParallelBackupStart(), which forks off
+ * the desired number of worker processes, which each enter WaitForCommands().
+ *
+ * The master process dispatches an individual work item to one of the worker
+ * processes in DispatchJobForTocEntry(). That calls
+ * AH->MasterStartParallelItemPtr, a routine of the output format. This
+ * function's arguments are the parent's archive handle AH (containing the full
+ * catalog information), the TocEntry that the worker should work on and a
+ * T_Action value indicating whether this is a backup or a restore task. The
+ * function simply converts the TocEntry assignment into a command string that
+ * is then sent over to the worker process. In the simplest case that would be
+ * something like "DUMP 1234", with 1234 being the TocEntry id.
+ *
+ * The worker process receives and decodes the command and passes it to the
+ * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
+ * which are routines of the current archive format. That routine performs
+ * the required action (dump or restore) and returns a malloc'd status string.
+ * The status string is passed back to the master where it is interpreted by
+ * AH->MasterEndParallelItemPtr, another format-specific routine. That
+ * function can update state or catalog information on the master's side,
+ * depending on the reply from the worker process. In the end it returns a
+ * status code, which is 0 for successful execution.
+ *
+ * Remember that we have forked off the workers only after we have read in
+ * the catalog. That's why our worker processes can also access the catalog
+ * information. (In the Windows case, the workers are threads in the same
+ * process. To avoid problems, they work with cloned copies of the Archive
+ * data structure; see init_spawned_worker_win32().)
+ *
+ * In the master process, the workerStatus field for each worker has one of
+ * the following values:
+ * WRKR_IDLE: it's waiting for a command
+ * WRKR_WORKING: it's been sent a command
+ * WRKR_FINISHED: it's returned a result
+ * WRKR_TERMINATED: process ended
+ * The FINISHED state indicates that the worker is idle, but we've not yet
+ * dealt with the status code it returned from the prior command.
+ * ReapWorkerStatus() extracts the unhandled command status value and sets
+ * the workerStatus back to WRKR_IDLE.
+ */
+
#include "postgres_fe.h"
#include "parallel.h"
#include <fcntl.h>
#endif
+/* Mnemonic macros for indexing the fd array returned by pipe(2) */
#define PIPE_READ 0
#define PIPE_WRITE 1
-/* file-scope variables */
#ifdef WIN32
-static unsigned int tMasterThreadId = 0;
-static HANDLE termEvent = INVALID_HANDLE_VALUE;
-static int pgpipe(int handles[2]);
-static int piperead(int s, char *buf, int len);
/*
* Structure to hold info passed by _beginthreadex() to the function it calls
typedef struct
{
ArchiveHandle *AH;
- int worker;
int pipeRead;
int pipeWrite;
} WorkerInfo;
+/* Windows implementation of pipe access */
+static int pgpipe(int handles[2]);
+static int piperead(int s, char *buf, int len);
#define pipewrite(a,b,c) send(a,b,c,0)
-#else
+
+#else /* !WIN32 */
+
/*
- * aborting is only ever used in the master, the workers are fine with just
- * wantAbort.
+ * Variables for handling signals. aborting is only ever used in the master,
+ * the workers just need wantAbort.
*/
static bool aborting = false;
static volatile sig_atomic_t wantAbort = 0;
+/* Non-Windows implementation of pipe access */
#define pgpipe(a) pipe(a)
#define piperead(a,b,c) read(a,b,c)
#define pipewrite(a,b,c) write(a,b,c)
-#endif
+#endif /* WIN32 */
+
+/*
+ * State info for archive_close_connection() shutdown callback.
+ */
typedef struct ShutdownInformation
{
ParallelState *pstate;
static ShutdownInformation shutdown_info;
+#ifdef WIN32
+/* file-scope variables */
+static unsigned int tMasterThreadId = 0;
+static HANDLE termEvent = INVALID_HANDLE_VALUE;
+static DWORD tls_index;
+
+/* globally visible variables (needed by exit_nicely) */
+bool parallel_init_done = false;
+DWORD mainThreadId;
+#endif /* WIN32 */
+
static const char *modulename = gettext_noop("parallel archiver");
+/* Local function prototypes */
static ParallelSlot *GetMyPSlot(ParallelState *pstate);
static void archive_close_connection(int code, void *arg);
static void ShutdownWorkersHard(ParallelState *pstate);
static void WaitForTerminatingWorkers(ParallelState *pstate);
-
-#ifndef WIN32
-static void sigTermHandler(int signum);
-#endif
-static void SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker);
+static void RunWorker(ArchiveHandle *AH, int pipefd[2]);
static bool HasEveryWorkerTerminated(ParallelState *pstate);
-
-static void lockTableNoWait(ArchiveHandle *AH, TocEntry *te);
+static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
static char *getMessageFromMaster(int pipefd[2]);
static void sendMessageToMaster(int pipefd[2], const char *str);
#define messageEquals(msg, pattern) \
(strcmp(msg, pattern) == 0)
-#ifdef WIN32
-static void shutdown_parallel_dump_utils(int code, void *unused);
-bool parallel_init_done = false;
-static DWORD tls_index;
-DWORD mainThreadId;
-#endif
-
+/*
+ * Shutdown callback to clean up socket access
+ */
#ifdef WIN32
static void
shutdown_parallel_dump_utils(int code, void *unused)
}
#endif
+/*
+ * Initialize parallel dump support --- should be called early in process
+ * startup. (Currently, this is called whether or not we intend parallel
+ * activity.)
+ */
void
init_parallel_dump_utils(void)
{
WSADATA wsaData;
int err;
+ /* Prepare for threaded operation */
tls_index = TlsAlloc();
mainThreadId = GetCurrentThreadId();
+
+ /* Initialize socket access */
err = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (err != 0)
{
fprintf(stderr, _("%s: WSAStartup failed: %d\n"), progname, err);
exit_nicely(1);
}
+ /* ... and arrange to shut it down at exit */
on_exit_nicely(shutdown_parallel_dump_utils, NULL);
parallel_init_done = true;
}
#endif
}
+/*
+ * Find the ParallelSlot for the current worker process or thread.
+ *
+ * Returns NULL if no matching slot is found (this implies we're the master).
+ */
static ParallelSlot *
GetMyPSlot(ParallelState *pstate)
{
int i;
for (i = 0; i < pstate->numWorkers; i++)
+ {
#ifdef WIN32
if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
#else
if (pstate->parallelSlot[i].pid == getpid())
#endif
return &(pstate->parallelSlot[i]);
+ }
return NULL;
}
/*
* A thread-local version of getLocalPQExpBuffer().
*
- * Non-reentrant but reduces memory leakage. (On Windows the memory leakage
- * will be one buffer per thread, which is at least better than one per call).
+ * Non-reentrant but reduces memory leakage: we'll consume one buffer per
+ * thread, which is much better than one per fmtId/fmtQualifiedId call.
*/
+#ifdef WIN32
static PQExpBuffer
getThreadLocalPQExpBuffer(void)
{
/*
* The Tls code goes awry if we use a static var, so we provide for both
- * static and auto, and omit any use of the static var when using Tls.
+ * static and auto, and omit any use of the static var when using Tls. We
+ * rely on TlsGetValue() to return 0 if the value is not yet set.
*/
static PQExpBuffer s_id_return = NULL;
PQExpBuffer id_return;
-#ifdef WIN32
if (parallel_init_done)
- id_return = (PQExpBuffer) TlsGetValue(tls_index); /* 0 when not set */
+ id_return = (PQExpBuffer) TlsGetValue(tls_index);
else
id_return = s_id_return;
-#else
- id_return = s_id_return;
-#endif
if (id_return) /* first time through? */
{
{
/* new buffer */
id_return = createPQExpBuffer();
-#ifdef WIN32
if (parallel_init_done)
TlsSetValue(tls_index, id_return);
else
s_id_return = id_return;
-#else
- s_id_return = id_return;
-#endif
-
}
return id_return;
}
+#endif /* WIN32 */
/*
- * pg_dump and pg_restore register the Archive pointer for the exit handler
- * (called from exit_nicely). This function mainly exists so that we can
- * keep shutdown_info in file scope only.
+ * pg_dump and pg_restore call this to register the cleanup handler
+ * as soon as they've created the ArchiveHandle.
*/
void
on_exit_close_archive(Archive *AHX)
}
/*
+ * Check to see if we've been told to abort, and exit the process/thread if
+ * so. We don't print any error message; that would just clutter the screen.
+ *
* If we have one worker that terminates for some reason, we'd like the other
* threads to terminate as well (and not finish with their 70 GB table dump
- * first...). Now in UNIX we can just kill these processes, and let the signal
- * handler set wantAbort to 1. In Windows we set a termEvent and this serves
- * as the signal for everyone to terminate. We don't print any error message,
- * that would just clutter the screen.
+ * first...). In Unix, the master sends SIGTERM and the worker's signal
+ * handler sets wantAbort to 1. In Windows we set a termEvent and this serves
+ * as the signal for worker threads to exit. Note that while we check this
+ * fairly frequently during data transfers, an idle worker doesn't come here
+ * at all, so additional measures are needed to force shutdown.
+ *
+ * XXX in parallel restore, slow server-side operations like CREATE INDEX
+ * are not interrupted by anything we do here. This needs more work.
*/
void
checkAborting(ArchiveHandle *AH)
}
/*
- * Shut down any remaining workers, waiting for them to finish.
+ * Forcibly shut down any remaining workers, waiting for them to finish.
*/
static void
ShutdownWorkersHard(ParallelState *pstate)
}
}
+/*
+ * Signal handler (Unix only)
+ */
#ifndef WIN32
-/* Signal handling (UNIX only) */
static void
-sigTermHandler(int signum)
+sigTermHandler(SIGNAL_ARGS)
{
wantAbort = 1;
}
#endif
/*
- * This function is called by both UNIX and Windows variants to set up
+ * This function is called by both Unix and Windows variants to set up
* and run a worker process. Caller should exit the process (or thread)
* upon return.
*/
static void
-SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker)
+RunWorker(ArchiveHandle *AH, int pipefd[2])
{
/*
* Call the setup worker function that's defined in the ArchiveHandle.
Assert(AH->connection != NULL);
+ /*
+ * Execute commands until done.
+ */
WaitForCommands(AH, pipefd);
}
+/*
+ * Thread base function for Windows
+ */
#ifdef WIN32
static unsigned __stdcall
init_spawned_worker_win32(WorkerInfo *wi)
{
ArchiveHandle *AH;
int pipefd[2] = {wi->pipeRead, wi->pipeWrite};
- int worker = wi->worker;
+ /*
+ * Clone the archive so that we have our own state to work with, and in
+ * particular our own database connection.
+ */
AH = CloneArchive(wi->AH);
free(wi);
- SetupWorker(AH, pipefd, worker);
+ /* Run the worker ... */
+ RunWorker(AH, pipefd);
+
+ /* Clean up and exit the thread */
DeCloneArchive(AH);
_endthreadex(0);
return 0;
}
-#endif
+#endif /* WIN32 */
/*
- * This function starts the parallel dump or restore by spawning off the
- * worker processes in both Unix and Windows. For Windows, it creates a number
- * of threads while it does a fork() on Unix.
+ * This function starts a parallel dump or restore by spawning off the worker
+ * processes. For Windows, it creates a number of threads; on Unix the
+ * workers are created with fork().
*/
ParallelState *
ParallelBackupStart(ArchiveHandle *AH)
* set and falls back to AHX otherwise.
*/
shutdown_info.pstate = pstate;
- getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
#ifdef WIN32
+ /* Set up thread management state */
tMasterThreadId = GetCurrentThreadId();
termEvent = CreateEvent(NULL, true, false, "Terminate");
+ /* Make fmtId() and fmtQualifiedId() use thread-local storage */
+ getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
#else
+ /* Set up signal handling state */
signal(SIGTERM, sigTermHandler);
signal(SIGINT, sigTermHandler);
signal(SIGQUIT, sigTermHandler);
#endif
+ /* Create desired number of workers */
for (i = 0; i < pstate->numWorkers; i++)
{
#ifdef WIN32
int pipeMW[2],
pipeWM[2];
+ /* Create communication pipes for this worker */
if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
exit_horribly(modulename,
"could not create communication channels: %s\n",
pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];
#ifdef WIN32
- /* Allocate a new structure for every worker */
+ /* Create transient structure to pass args to worker function */
wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
- wi->worker = i;
wi->AH = AH;
wi->pipeRead = pipeMW[PIPE_READ];
wi->pipeWrite = pipeWM[PIPE_WRITE];
handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
wi, 0, &(pstate->parallelSlot[i].threadId));
pstate->parallelSlot[i].hThread = handle;
-#else
+#else /* !WIN32 */
pid = fork();
if (pid == 0)
{
pstate->parallelSlot[i].pid = getpid();
- /*
- * Call CloneArchive on Unix as well even though technically we
- * don't need to because fork() gives us a copy in our own address
- * space already. But CloneArchive resets the state information
- * and also clones the database connection (for parallel dump)
- * which both seem kinda helpful.
- */
- pstate->parallelSlot[i].args->AH = CloneArchive(AH);
-
/* close read end of Worker -> Master */
closesocket(pipeWM[PIPE_READ]);
/* close write end of Master -> Worker */
closesocket(pstate->parallelSlot[j].pipeWrite);
}
- SetupWorker(pstate->parallelSlot[i].args->AH, pipefd, i);
+ /*
+ * Call CloneArchive on Unix as well as Windows, even though
+ * technically we don't need to because fork() gives us a copy in
+ * our own address space already. But CloneArchive resets the
+ * state information and also clones the database connection, both
+ * of which seem helpful.
+ */
+ pstate->parallelSlot[i].args->AH = CloneArchive(AH);
+ /* Run the worker ... */
+ RunWorker(pstate->parallelSlot[i].args->AH, pipefd);
+
+ /* We can just exit(0) when done */
exit(0);
}
else if (pid < 0)
+ {
/* fork failed */
exit_horribly(modulename,
"could not create worker process: %s\n",
strerror(errno));
+ }
- /* we are the Master, pid > 0 here */
- Assert(pid > 0);
+ /* In Master after successful fork */
+ pstate->parallelSlot[i].pid = pid;
/* close read end of Master -> Worker */
closesocket(pipeMW[PIPE_READ]);
/* close write end of Worker -> Master */
closesocket(pipeWM[PIPE_WRITE]);
-
- pstate->parallelSlot[i].pid = pid;
-#endif
+#endif /* WIN32 */
}
/*
* Having forked off the workers, disable SIGPIPE so that master isn't
- * killed if it tries to send a command to a dead worker.
+ * killed if it tries to send a command to a dead worker. We don't want
+ * the workers to inherit this setting, though.
*/
#ifndef WIN32
signal(SIGPIPE, SIG_IGN);
}
/*
- * Tell all of our workers to terminate.
- *
- * Pretty straightforward routine, first we tell everyone to terminate, then
- * we listen to the workers' replies and finally close the sockets that we
- * have used for communication.
+ * Close down a parallel dump or restore.
*/
void
ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
{
int i;
+ /* No work if non-parallel */
if (pstate->numWorkers == 1)
return;
+ /* There should not be any unfinished jobs */
Assert(IsEveryWorkerIdle(pstate));
- /* close the sockets so that the workers know they can exit */
+ /* Close the sockets so that the workers know they can exit */
for (i = 0; i < pstate->numWorkers; i++)
{
closesocket(pstate->parallelSlot[i].pipeRead);
closesocket(pstate->parallelSlot[i].pipeWrite);
}
+
+ /* Wait for them to exit */
WaitForTerminatingWorkers(pstate);
/*
- * Remove the pstate again, so the exit handler in the parent will now
- * again fall back to closing AH->connection (if connected).
+ * Unlink pstate from shutdown_info, so the exit handler will again fall
+ * back to closing AH->connection (if connected).
*/
shutdown_info.pstate = NULL;
+ /* Release state (mere neatnik-ism, since we're about to terminate) */
free(pstate->parallelSlot);
free(pstate);
}
-
/*
- * The sequence is the following (for dump, similar for restore):
- *
- * The master process starts the parallel backup in ParllelBackupStart, this
- * forks the worker processes which enter WaitForCommand().
- *
- * The master process dispatches an individual work item to one of the worker
- * processes in DispatchJobForTocEntry(). It calls
- * AH->MasterStartParallelItemPtr, a routine of the output format. This
- * function's arguments are the parents archive handle AH (containing the full
- * catalog information), the TocEntry that the worker should work on and a
- * T_Action act indicating whether this is a backup or a restore item. The
- * function then converts the TocEntry assignment into a string that is then
- * sent over to the worker process. In the simplest case that would be
- * something like "DUMP 1234", with 1234 being the TocEntry id.
- *
- * The worker receives the message in the routine pointed to by
- * WorkerJobDumpPtr or WorkerJobRestorePtr. These are also pointers to
- * corresponding routines of the respective output format, e.g.
- * _WorkerJobDumpDirectory().
- *
- * Remember that we have forked off the workers only after we have read in the
- * catalog. That's why our worker processes can also access the catalog
- * information. Now they re-translate the textual representation to a TocEntry
- * on their side and do the required action (restore or dump).
- *
- * The result is again a textual string that is sent back to the master and is
- * interpreted by AH->MasterEndParallelItemPtr. This function can update state
- * or catalog information on the master's side, depending on the reply from
- * the worker process. In the end it returns status which is 0 for successful
- * execution.
- *
- * ---------------------------------------------------------------------
- * Master Worker
- *
- * enters WaitForCommands()
- * DispatchJobForTocEntry(...te...)
- *
- * [ Worker is IDLE ]
- *
- * arg = (MasterStartParallelItemPtr)()
- * send: DUMP arg
- * receive: DUMP arg
- * str = (WorkerJobDumpPtr)(arg)
- * [ Worker is WORKING ] ... gets te from arg ...
- * ... dump te ...
- * send: OK DUMP info
- *
- * In ListenToWorkers():
- *
- * [ Worker is FINISHED ]
- * receive: OK DUMP info
- * status = (MasterEndParallelItemPtr)(info)
+ * Dispatch a job to some free worker (caller must ensure there is one!)
*
- * In ReapWorkerStatus(&ptr):
- * *ptr = status;
- * [ Worker is IDLE ]
- * ---------------------------------------------------------------------
+ * te is the TocEntry to be processed, act is the action to be taken on it.
*/
void
DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te,
char *arg;
/* our caller makes sure that at least one worker is idle */
- Assert(GetIdleWorker(pstate) != NO_SLOT);
worker = GetIdleWorker(pstate);
Assert(worker != NO_SLOT);
+ /* Construct and send command string */
arg = (AH->MasterStartParallelItemPtr) (AH, te, act);
sendMessageToWorker(pstate, worker, arg);
+ /* XXX aren't we leaking string here? (no, because it's static. Ick.) */
+
+ /* Remember worker is busy, and which TocEntry it's working on */
pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
pstate->parallelSlot[worker].args->te = te;
}
/*
- * Find the first free parallel slot (if any).
+ * Find an idle worker and return its slot number.
+ * Return NO_SLOT if none are idle.
*/
int
GetIdleWorker(ParallelState *pstate)
int i;
for (i = 0; i < pstate->numWorkers; i++)
+ {
if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
return i;
+ }
return NO_SLOT;
}
/*
- * Return true iff every worker process is in the WRKR_TERMINATED state.
+ * Return true iff every worker is in the WRKR_TERMINATED state.
*/
static bool
HasEveryWorkerTerminated(ParallelState *pstate)
int i;
for (i = 0; i < pstate->numWorkers; i++)
+ {
if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED)
return false;
+ }
return true;
}
int i;
for (i = 0; i < pstate->numWorkers; i++)
+ {
if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
return false;
+ }
return true;
}
/*
- * ---------------------------------------------------------------------
- * One danger of the parallel backup is a possible deadlock:
+ * Acquire lock on a table to be dumped by a worker process.
+ *
+ * The master process is already holding an ACCESS SHARE lock. Ordinarily
+ * it's no problem for a worker to get one too, but if anything else besides
+ * pg_dump is running, there's a possible deadlock:
*
* 1) Master dumps the schema and locks all tables in ACCESS SHARE mode.
* 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
* because the master holds a conflicting ACCESS SHARE lock).
- * 3) The worker process also requests an ACCESS SHARE lock to read the table.
- * The worker's not granted that lock but is enqueued behind the ACCESS
- * EXCLUSIVE lock request.
- * ---------------------------------------------------------------------
+ * 3) A worker process also requests an ACCESS SHARE lock to read the table.
+ * The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
+ * 4) Now we have a deadlock, since the master is effectively waiting for
+ * the worker. The server cannot detect that, however.
*
- * Now what we do here is to just request a lock in ACCESS SHARE but with
- * NOWAIT in the worker prior to touching the table. If we don't get the lock,
+ * To prevent an infinite wait, prior to touching a table in a worker, request
+ * a lock in ACCESS SHARE mode but with NOWAIT. If we don't get the lock,
* then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
- * are good to just fail the whole backup because we have detected a deadlock.
+ * so we have a deadlock. We must fail the backup in that case.
*/
static void
-lockTableNoWait(ArchiveHandle *AH, TocEntry *te)
+lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
{
Archive *AHX = (Archive *) AH;
const char *qualId;
- PQExpBuffer query = createPQExpBuffer();
+ PQExpBuffer query;
PGresult *res;
- Assert(AH->format == archDirectory);
- Assert(strcmp(te->desc, "BLOBS") != 0);
+ /* Nothing to do for BLOBS */
+ if (strcmp(te->desc, "BLOBS") == 0)
+ return;
+
+ query = createPQExpBuffer();
+ /*
+ * XXX this is an unbelievably expensive substitute for knowing how to dig
+ * a table name out of a TocEntry.
+ */
appendPQExpBuffer(query,
"SELECT pg_namespace.nspname,"
" pg_class.relname "
}
/*
- * That's the main routine for the worker.
- * When it starts up it enters this routine and waits for commands from the
- * master process. After having processed a command it comes back to here to
- * wait for the next command. Finally it will receive a TERMINATE command and
- * exit.
+ * WaitForCommands: main routine for a worker process.
+ *
+ * Read and execute commands from the master until we see EOF on the pipe.
*/
static void
WaitForCommands(ArchiveHandle *AH, int pipefd[2])
char *command;
DumpId dumpId;
int nBytes;
- char *str = NULL;
+ char *str;
TocEntry *te;
for (;;)
{
if (!(command = getMessageFromMaster(pipefd)))
{
+ /* EOF ... clean up */
PQfinish(AH->connection);
AH->connection = NULL;
return;
if (messageStartsWith(command, "DUMP "))
{
- Assert(AH->format == archDirectory);
+ /* Decode the command */
sscanf(command + strlen("DUMP "), "%d%n", &dumpId, &nBytes);
Assert(nBytes == strlen(command) - strlen("DUMP "));
-
te = getTocEntryByDumpId(AH, dumpId);
Assert(te != NULL);
- /*
- * Lock the table but with NOWAIT. Note that the parent is already
- * holding a lock. If we cannot acquire another ACCESS SHARE MODE
- * lock, then somebody else has requested an exclusive lock in the
- * meantime. lockTableNoWait dies in this case to prevent a
- * deadlock.
- */
- if (strcmp(te->desc, "BLOBS") != 0)
- lockTableNoWait(AH, te);
+ /* Acquire lock on this table within the worker's session */
+ lockTableForWorker(AH, te);
- /*
- * The message we return here has been pg_malloc()ed and we are
- * responsible for free()ing it.
- */
+ /* Perform the dump command */
str = (AH->WorkerJobDumpPtr) (AH, te);
- Assert(AH->connection != NULL);
+
+ /* Return status to master */
sendMessageToMaster(pipefd, str);
+
+ /* we are responsible for freeing the status string */
free(str);
}
else if (messageStartsWith(command, "RESTORE "))
{
- Assert(AH->format == archDirectory || AH->format == archCustom);
- Assert(AH->connection != NULL);
-
+ /* Decode the command */
sscanf(command + strlen("RESTORE "), "%d%n", &dumpId, &nBytes);
Assert(nBytes == strlen(command) - strlen("RESTORE "));
-
te = getTocEntryByDumpId(AH, dumpId);
Assert(te != NULL);
- /*
- * The message we return here has been pg_malloc()ed and we are
- * responsible for free()ing it.
- */
+ /* Perform the restore command */
str = (AH->WorkerJobRestorePtr) (AH, te);
- Assert(AH->connection != NULL);
+
+ /* Return status to master */
sendMessageToMaster(pipefd, str);
+
+ /* we are responsible for freeing the status string */
free(str);
}
else
exit_horribly(modulename,
- "unrecognized command on communication channel: %s\n",
+ "unrecognized command received from master: \"%s\"\n",
command);
/* command was pg_malloc'd and we are responsible for free()ing it. */
}
/*
- * ---------------------------------------------------------------------
- * Note the status change:
+ * Check for status messages from workers.
*
- * DispatchJobForTocEntry WRKR_IDLE -> WRKR_WORKING
- * ListenToWorkers WRKR_WORKING -> WRKR_FINISHED / WRKR_TERMINATED
- * ReapWorkerStatus WRKR_FINISHED -> WRKR_IDLE
- * ---------------------------------------------------------------------
+ * If do_wait is true, wait to get a status message; otherwise, just return
+ * immediately if there is none available.
*
- * Just calling ReapWorkerStatus() when all workers are working might or might
- * not give you an idle worker because you need to call ListenToWorkers() in
- * between and only thereafter ReapWorkerStatus(). This is necessary in order
- * to get and deal with the status (=result) of the worker's execution.
+ * When we get a status message, we let MasterEndParallelItemPtr process it,
+ * then save the resulting status code and switch the worker's state to
+ * WRKR_FINISHED. Later, caller must call ReapWorkerStatus() to verify
+ * that the status was "OK" and push the worker back to IDLE state.
+ *
+ * XXX Rube Goldberg would be proud of this API, but no one else should be.
+ *
+ * XXX is it worth checking for more than one status message per call?
+ * It seems somewhat unlikely that multiple workers would finish at exactly
+ * the same time.
*/
void
ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
int worker;
char *msg;
+ /* Try to collect a status message */
msg = getMessageFromWorker(pstate, do_wait, &worker);
if (!msg)
{
+ /* If do_wait is true, we must have detected EOF on some socket */
if (do_wait)
exit_horribly(modulename, "a worker process died unexpectedly\n");
return;
}
+ /* Process it and update our idea of the worker's status */
if (messageStartsWith(msg, "OK "))
{
+ TocEntry *te = pstate->parallelSlot[worker].args->te;
char *statusString;
- TocEntry *te;
- pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED;
- te = pstate->parallelSlot[worker].args->te;
if (messageStartsWith(msg, "OK RESTORE "))
{
statusString = msg + strlen("OK RESTORE ");
exit_horribly(modulename,
"invalid message received from worker: \"%s\"\n",
msg);
+ pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED;
}
else
exit_horribly(modulename,
"invalid message received from worker: \"%s\"\n",
msg);
- /* both Unix and Win32 return pg_malloc()ed space, so we free it */
+ /* Free the string returned from getMessageFromWorker */
free(msg);
}
/*
- * This function is executed in the master process.
+ * Check to see if any worker is in WRKR_FINISHED state. If so,
+ * return its command status code into *status, reset it to IDLE state,
+ * and return its slot number. Otherwise return NO_SLOT.
*
- * This function is used to get the return value of a terminated worker
- * process. If a process has terminated, its status is stored in *status and
- * the id of the worker is returned.
+ * This function is executed in the master process.
*/
int
ReapWorkerStatus(ParallelState *pstate, int *status)
}
/*
- * This function is executed in the master process.
+ * Wait, if necessary, until we have at least one idle worker.
+ * Reap worker status as necessary to move FINISHED workers to IDLE state.
+ *
+ * We assume that no extra processing is required when reaping a finished
+ * command, except for checking that the status was OK (zero).
+ * Caution: that assumption means that this function can only be used in
+ * parallel dump, not parallel restore, because the latter has a more
+ * complex set of rules about handling status.
*
- * It looks for an idle worker process and only returns if there is one.
+ * This function is executed in the master process.
*/
void
EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate)
}
/*
- * This function is executed in the master process.
+ * Wait for all workers to be idle.
+ * Reap worker status as necessary to move FINISHED workers to IDLE state.
+ *
+ * We assume that no extra processing is required when reaping a finished
+ * command, except for checking that the status was OK (zero).
+ * Caution: that assumption means that this function can only be used in
+ * parallel dump, not parallel restore, because the latter has a more
+ * complex set of rules about handling status.
*
- * It waits for all workers to terminate.
+ * This function is executed in the master process.
*/
void
EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate)
}
/*
- * This function is executed in the worker process.
+ * Read one command message from the master, blocking if necessary
+ * until one is available, and return it as a malloc'd string.
+ * On EOF, return NULL.
*
- * It returns the next message on the communication channel, blocking until it
- * becomes available.
+ * This function is executed in worker processes.
*/
static char *
getMessageFromMaster(int pipefd[2])
}
/*
- * This function is executed in the worker process.
+ * Send a status message to the master.
*
- * It sends a message to the master on the communication channel.
+ * This function is executed in worker processes.
*/
static void
sendMessageToMaster(int pipefd[2], const char *str)
}
/*
- * A select loop that repeats calling select until a descriptor in the read
- * set becomes readable. On Windows we have to check for the termination event
- * from time to time, on Unix we can just block forever.
+ * Wait until some descriptor in "workerset" becomes readable.
+ * Returns -1 on error, else the number of readable descriptors.
*/
static int
select_loop(int maxFd, fd_set *workerset)
fd_set saveSet = *workerset;
#ifdef WIN32
- /* should always be the master */
- Assert(tMasterThreadId == GetCurrentThreadId());
-
for (;;)
{
/*
- * sleep a quarter of a second before checking if we should terminate.
+ * Sleep a quarter of a second before checking if we should terminate.
+ *
+ * XXX we're not actually checking for a cancel interrupt ... but we
+ * should be.
*/
struct timeval tv = {0, 250000};
if (i)
break;
}
-#else /* UNIX */
-
+#else /* !WIN32 */
for (;;)
{
*workerset = saveSet;
continue;
break;
}
-#endif
+#endif /* WIN32 */
return i;
}
/*
- * This function is executed in the master process.
+ * Check for messages from worker processes.
+ *
+ * If a message is available, return it as a malloc'd string, and put the
+ * index of the sending worker in *worker.
*
- * It returns the next message from the worker on the communication channel,
- * optionally blocking (do_wait) until it becomes available.
+ * If nothing is available, wait if "do_wait" is true, else return NULL.
*
- * The id of the worker is returned in *worker.
+ * If we detect EOF on any socket, we'll return NULL. It's not great that
+ * that's hard to distinguish from the no-data-available case, but for now
+ * our one caller is okay with that.
+ *
+ * This function is executed in the master process.
*/
static char *
getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
int maxFd = -1;
struct timeval nowait = {0, 0};
+ /* construct bitmap of socket descriptors for select() */
FD_ZERO(&workerset);
-
for (i = 0; i < pstate->numWorkers; i++)
{
if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED)
continue;
FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
- /* actually WIN32 ignores the first parameter to select()... */
if (pstate->parallelSlot[i].pipeRead > maxFd)
maxFd = pstate->parallelSlot[i].pipeRead;
}
}
if (i < 0)
- exit_horribly(modulename, "error in ListenToWorkers(): %s\n", strerror(errno));
+ exit_horribly(modulename, "select() failed: %s\n", strerror(errno));
for (i = 0; i < pstate->numWorkers; i++)
{
if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
continue;
+ /*
+ * Read the message if any. If the socket is ready because of EOF,
+ * we'll return NULL instead (and the socket will stay ready, so the
+ * condition will persist).
+ *
+ * Note: because this is a blocking read, we'll wait if only part of
+ * the message is available. Waiting a long time would be bad, but
+ * since worker status messages are short and are always sent in one
+ * operation, it shouldn't be a problem in practice.
+ */
msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
*worker = i;
return msg;
}
/*
- * This function is executed in the master process.
+ * Send a command message to the specified worker process.
*
- * It sends a message to a certain worker on the communication channel.
+ * This function is executed in the master process.
*/
static void
sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
{
/*
* If we're already aborting anyway, don't care if we succeed or not.
- * The child might have gone already.
+ * The child might have gone already. (XXX but if we're aborting
+ * already, why are we here at all?)
*/
#ifndef WIN32
if (!aborting)
}
/*
- * The underlying function to read a message from the communication channel
- * (fd) with optional blocking (do_wait).
+ * Read one message from the specified pipe (fd), blocking if necessary
+ * until one is available, and return it as a malloc'd string.
+ * On EOF or a read error, return NULL.
+ *
+ * A "message" on the channel is just a null-terminated string.
*/
static char *
readMessageFromPipe(int fd)
int ret;
/*
- * The problem here is that we need to deal with several possibilities: we
- * could receive only a partial message or several messages at once. The
- * caller expects us to return exactly one message however.
- *
- * We could either read in as much as we can and keep track of what we
- * delivered back to the caller or we just read byte by byte. Once we see
- * (char) 0, we know that it's the message's end. This would be quite
- * inefficient for more data but since we are reading only on the command
- * channel, the performance loss does not seem worth the trouble of
- * keeping internal states for different file descriptors.
+ * In theory, if we let piperead() read multiple bytes, it might give us
+ * back fragments of multiple messages. (That can't actually occur, since
+ * neither master nor workers send more than one message without waiting
+ * for a reply, but we don't wish to assume that here.) For simplicity,
+ * read a byte at a time until we get the terminating '\0'. This method
+ * is a bit inefficient, but since this is only used for relatively short
+ * command and status strings, it shouldn't matter.
*/
bufsize = 64; /* could be any number */
msg = (char *) pg_malloc(bufsize);
-
msgsize = 0;
for (;;)
{
- Assert(msgsize <= bufsize);
+ Assert(msgsize < bufsize);
ret = piperead(fd, msg + msgsize, 1);
-
- /* worker has closed the connection or another error happened */
if (ret <= 0)
- break;
+ break; /* error or connection closure */
Assert(ret == 1);
if (msg[msgsize] == '\0')
- return msg;
+ return msg; /* collected whole message */
msgsize++;
- if (msgsize == bufsize)
+ if (msgsize == bufsize) /* enlarge buffer if needed */
{
- /* could be any number */
- bufsize += 16;
+ bufsize += 16; /* could be any number */
msg = (char *) pg_realloc(msg, bufsize);
}
}
- /*
- * Worker has closed the connection, make sure to clean up before return
- * since we are not returning msg (but did allocate it).
- */
+ /* Read failed, or the other end has closed the connection */
pg_free(msg);
-
return NULL;
}
#ifdef WIN32
+
/*
- * This is a replacement version of pipe for Win32 which allows returned
- * handles to be used in select(). Note that read/write calls must be replaced
- * with recv/send. "handles" have to be integers so we check for errors then
- * cast to integers.
+ * This is a replacement version of pipe(2) for Windows which allows the pipe
+ * handles to be used in select().
+ *
+ * Reads and writes on the pipe must go through piperead()/pipewrite().
+ *
+ * For consistency with Unix we declare the returned handles as "int".
+ * This is okay even on WIN64 because system handles are not more than
+ * 32 bits wide, but we do have to do some casting.
*/
static int
pgpipe(int handles[2])
{
write_msg(modulename, "pgpipe: could not connect socket: error code %d\n",
WSAGetLastError());
+ closesocket(handles[1]);
+ handles[1] = -1;
closesocket(s);
return -1;
}
return 0;
}
+/*
+ * Windows implementation of reading from a pipe.
+ */
static int
piperead(int s, char *buf, int len)
{
int ret = recv(s, buf, len, 0);
if (ret < 0 && WSAGetLastError() == WSAECONNRESET)
- /* EOF on the pipe! (win32 socket based implementation) */
+ {
+ /* EOF on the pipe! */
ret = 0;
+ }
return ret;
}
-#endif
+#endif /* WIN32 */