static const char *modulename = gettext_noop("parallel archiver");
static ParallelSlot *GetMyPSlot(ParallelState *pstate);
-static void parallel_msg_master(ParallelSlot *slot, const char *modulename,
- const char *fmt, va_list ap) pg_attribute_printf(3, 0);
static void archive_close_connection(int code, void *arg);
static void ShutdownWorkersHard(ParallelState *pstate);
static void WaitForTerminatingWorkers(ParallelState *pstate);
return NULL;
}
-/*
- * Fail and die, with a message to stderr. Parameters as for write_msg.
- *
- * This is defined in parallel.c, because in parallel mode, things are more
- * complicated. If the worker process does exit_horribly(), we forward its
- * last words to the master process. The master process then does
- * exit_horribly() with this error message itself and prints it normally.
- * After printing the message, exit_horribly() on the master will shut down
- * the remaining worker processes.
- */
-void
-exit_horribly(const char *modulename, const char *fmt,...)
-{
- va_list ap;
- ParallelState *pstate = shutdown_info.pstate;
- ParallelSlot *slot;
-
- va_start(ap, fmt);
-
- if (pstate == NULL)
- {
- /* Not in parallel mode, just write to stderr */
- vwrite_msg(modulename, fmt, ap);
- }
- else
- {
- slot = GetMyPSlot(pstate);
-
- if (!slot)
- /* We're the parent, just write the message out */
- vwrite_msg(modulename, fmt, ap);
- else
- /* If we're a worker process, send the msg to the master process */
- parallel_msg_master(slot, modulename, fmt, ap);
- }
-
- va_end(ap);
-
- exit_nicely(1);
-}
-
-/* Sends the error message from the worker to the master process */
-static void
-parallel_msg_master(ParallelSlot *slot, const char *modulename,
- const char *fmt, va_list ap)
-{
- char buf[512];
- int pipefd[2];
-
- pipefd[PIPE_READ] = slot->pipeRevRead;
- pipefd[PIPE_WRITE] = slot->pipeRevWrite;
-
- strcpy(buf, "ERROR ");
- vsnprintf(buf + strlen("ERROR "),
- sizeof(buf) - strlen("ERROR "), fmt, ap);
-
- sendMessageToMaster(pipefd, buf);
-}
-
/*
* A thread-local version of getLocalPQExpBuffer().
*
/*
* pg_dump and pg_restore register the Archive pointer for the exit handler
- * (called from exit_horribly). This function mainly exists so that we can
+ * (called from exit_nicely). This function mainly exists so that we can
* keep shutdown_info in file scope only.
*/
void
}
/*
- * This function can close archives in both the parallel and non-parallel
- * case.
+ * on_exit_nicely handler for shutting down database connections and
+ * worker processes cleanly.
*/
static void
archive_close_connection(int code, void *arg)
if (si->pstate)
{
+ /* In parallel mode, must figure out who we are */
ParallelSlot *slot = GetMyPSlot(si->pstate);
if (!slot)
{
/*
- * We're the master: We have already printed out the message
- * passed to exit_horribly() either from the master itself or from
- * a worker process. Now we need to close our own database
- * connection (only open during parallel dump but not restore) and
- * shut down the remaining workers.
+ * We're the master. Close our own database connection, if any,
+ * and then forcibly shut down workers.
*/
- DisconnectDatabase(si->AHX);
+ if (si->AHX)
+ DisconnectDatabase(si->AHX);
+
#ifndef WIN32
/*
- * Setting aborting to true switches to best-effort-mode
- * (send/receive but ignore errors) in communicating with our
- * workers.
+ * Setting aborting to true shuts off error/warning messages that
+ * are no longer useful once we start killing workers.
*/
aborting = true;
#endif
ShutdownWorkersHard(si->pstate);
}
- else if (slot->args->AH)
- DisconnectDatabase(&(slot->args->AH->public));
+ else
+ {
+ /*
+ * We're a worker. Shut down our own DB connection if any. On
+ * Windows, we also have to close our communication sockets, to
+ * emulate what will happen on Unix when the worker process exits.
+ * (Without this, if this is a premature exit, the master would
+ * fail to detect it because there would be no EOF condition on
+ * the other end of the pipe.)
+ */
+ if (slot->args->AH)
+ DisconnectDatabase(&(slot->args->AH->public));
+
+#ifdef WIN32
+ closesocket(slot->pipeRevRead);
+ closesocket(slot->pipeRevWrite);
+#endif
+ }
+ }
+ else
+ {
+ /* Non-parallel operation: just kill the master DB connection */
+ if (si->AHX)
+ DisconnectDatabase(si->AHX);
}
- else if (si->AHX)
- DisconnectDatabase(si->AHX);
}
/*
* threads to terminate as well (and not finish with their 70 GB table dump
* first...). Now in UNIX we can just kill these processes, and let the signal
* handler set wantAbort to 1. In Windows we set a termEvent and this serves
- * as the signal for everyone to terminate.
+ * as the signal for everyone to terminate. We don't print any error message;
+ * that would just clutter the screen.
*/
void
checkAborting(ArchiveHandle *AH)
#else
if (wantAbort)
#endif
- exit_horribly(modulename, "worker is terminating\n");
+ exit_nicely(1);
}
/*
#ifndef WIN32
int i;
- signal(SIGPIPE, SIG_IGN);
-
/*
* Close our write end of the sockets so that the workers know they can
* exit.
#endif
/*
- * This function is called by both UNIX and Windows variants to set up a
- * worker process.
+ * This function is called by both UNIX and Windows variants to set up
+ * and run a worker process. Caller should exit the process (or thread)
+ * upon return.
*/
static void
SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker)
{
/*
* Call the setup worker function that's defined in the ArchiveHandle.
- *
- * We get the raw connection only for the reason that we can close it
- * properly when we shut down. This happens only that way when it is
- * brought down because of an error.
*/
(AH->SetupWorkerPtr) ((Archive *) AH);
Assert(AH->connection != NULL);
WaitForCommands(AH, pipefd);
-
- closesocket(pipefd[PIPE_READ]);
- closesocket(pipefd[PIPE_WRITE]);
}
#ifdef WIN32
pstate->parallelSlot[i].args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
pstate->parallelSlot[i].args->AH = NULL;
pstate->parallelSlot[i].args->te = NULL;
+
+ /* master's ends of the pipes */
+ pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
+ pstate->parallelSlot[i].pipeWrite = pipeMW[PIPE_WRITE];
+ /* child's ends of the pipes */
+ pstate->parallelSlot[i].pipeRevRead = pipeMW[PIPE_READ];
+ pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];
+
#ifdef WIN32
/* Allocate a new structure for every worker */
wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
wi->worker = i;
wi->AH = AH;
- wi->pipeRead = pstate->parallelSlot[i].pipeRevRead = pipeMW[PIPE_READ];
- wi->pipeWrite = pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];
+ wi->pipeRead = pipeMW[PIPE_READ];
+ wi->pipeWrite = pipeWM[PIPE_WRITE];
handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
wi, 0, &(pstate->parallelSlot[i].threadId));
pipefd[0] = pipeMW[PIPE_READ];
pipefd[1] = pipeWM[PIPE_WRITE];
- /*
- * Store the fds for the reverse communication in pstate. Actually
- * we only use this in case of an error and don't use pstate
- * otherwise in the worker process. On Windows we write to the
- * global pstate, in Unix we write to our process-local copy but
- * that's also where we'd retrieve this information back from.
- */
- pstate->parallelSlot[i].pipeRevRead = pipefd[PIPE_READ];
- pstate->parallelSlot[i].pipeRevWrite = pipefd[PIPE_WRITE];
pstate->parallelSlot[i].pid = getpid();
/*
/*
* Close all inherited fds for communication of the master with
- * the other workers.
+ * previously-forked workers.
*/
for (j = 0; j < i; j++)
{
pstate->parallelSlot[i].pid = pid;
#endif
-
- pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
- pstate->parallelSlot[i].pipeWrite = pipeMW[PIPE_WRITE];
}
+ /*
+ * Having forked off the workers, ignore SIGPIPE so that the master isn't
+ * killed if it tries to send a command to a dead worker.
+ */
+#ifndef WIN32
+ signal(SIGPIPE, SIG_IGN);
+#endif
+
return pstate;
}
}
else
exit_horribly(modulename,
- "invalid message received from worker: %s\n", msg);
- }
- else if (messageStartsWith(msg, "ERROR "))
- {
- Assert(AH->format == archDirectory || AH->format == archCustom);
- pstate->parallelSlot[worker].workerStatus = WRKR_TERMINATED;
- exit_horribly(modulename, "%s", msg + strlen("ERROR "));
+ "invalid message received from worker: \"%s\"\n",
+ msg);
}
else
- exit_horribly(modulename, "invalid message received from worker: %s\n", msg);
+ exit_horribly(modulename,
+ "invalid message received from worker: \"%s\"\n",
+ msg);
/* both Unix and Win32 return pg_malloc()ed space, so we free it */
free(msg);