/* List of active parallel contexts. */
static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
+/*
+ * Backend-local copy of data from FixedParallelState.  Saved here so that
+ * the ParallelWorkerShutdown callback can signal the leader at exit time.
+ */
+static pid_t ParallelMasterPid;
+
/*
* List of internal parallel worker entry points. We need this for
* reasons explained in LookupParallelWorkerFunction(), below.
static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
+static void ParallelWorkerShutdown(int code, Datum arg);
/*
WaitForParallelWorkersToFinish(pcxt);
WaitForParallelWorkersToExit(pcxt);
pcxt->nworkers_launched = 0;
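+ /* The flag array was sized for the old set of workers; discard it. */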
+ if (pcxt->any_message_received)
+ {
+ pfree(pcxt->any_message_received);
+ pcxt->any_message_received = NULL;
+ }
}
/* Reset a few bits of fixed parallel state to a clean state. */
}
}
+ /*
+ * Now that nworkers_launched has taken its final value, we can initialize
+ * any_message_received.
+ */
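+ /* palloc0 zero-fills the array, so every flag starts out false. */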
+ if (pcxt->nworkers_launched > 0)
+ pcxt->any_message_received =
+ palloc0(sizeof(bool) * pcxt->nworkers_launched);
+
/* Restore previous memory context. */
MemoryContextSwitchTo(oldcontext);
}
for (;;)
{
bool anyone_alive = false;
+ int nfinished = 0;
int i;
/*
for (i = 0; i < pcxt->nworkers_launched; ++i)
{
- if (pcxt->worker[i].error_mqh != NULL)
+ /*
+ * If error_mqh is NULL, then the worker has already exited
+ * cleanly. If we have received a message through error_mqh from
+ * the worker, we know it started up cleanly, and therefore we're
+ * certain to be notified when it exits.
+ */
+ if (pcxt->worker[i].error_mqh == NULL)
+ ++nfinished;
+ else if (pcxt->any_message_received[i])
{
anyone_alive = true;
break;
}
if (!anyone_alive)
- break;
+ {
+ /* If all workers are known to have finished, we're done. */
+ if (nfinished >= pcxt->nworkers_launched)
+ {
+ Assert(nfinished == pcxt->nworkers_launched);
+ break;
+ }
+
+ /*
+ * We didn't detect any living workers, but not all workers are
+ * known to have exited cleanly. Either not all workers have
+ * launched yet, or maybe some of them failed to start or
+ * terminated abnormally.
+ */
+ for (i = 0; i < pcxt->nworkers_launched; ++i)
+ {
+ pid_t pid;
+ shm_mq *mq;
+
+ /*
+ * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
+ * should just keep waiting. If it is BGWH_STOPPED, then
+ * further investigation is needed.
+ */
+ if (pcxt->worker[i].error_mqh == NULL ||
+ pcxt->worker[i].bgwhandle == NULL ||
+ GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
+ &pid) != BGWH_STOPPED)
+ continue;
+
+ /*
+ * Check whether the worker ended up stopped without ever
+ * attaching to the error queue. If so, the postmaster was
+ * unable to fork the worker or it exited without initializing
+ * properly. We must throw an error, since the caller may
+ * have been expecting the worker to do some work before
+ * exiting.
+ */
+ mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
+ if (shm_mq_get_sender(mq) == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("parallel worker failed to initialize"),
+ errhint("More details may be available in the server log.")));
+
+ /*
+ * The worker is stopped, but is attached to the error queue.
+ * Unless there's a bug somewhere, this will only happen when
+ * the worker writes messages and terminates after the
+ * CHECK_FOR_INTERRUPTS() near the top of this function and
+ * before the call to GetBackgroundWorkerPid(). In that case,
+ * our latch should have been set as well and the right things
+ * will happen on the next pass through the loop.
+ */
+ }
+ }
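+
+ /*
+ * Sleep until our latch is set, either by an incoming worker message or
+ * by notification that a worker has stopped.
+ */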
WaitLatch(MyLatch, WL_LATCH_SET, -1,
WAIT_EVENT_PARALLEL_FINISH);
{
char msgtype;
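+
+ /*
+ * Receiving any message shows the worker attached to its error queue and
+ * started up cleanly; record that for WaitForParallelWorkersToFinish().
+ */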
+ if (pcxt->any_message_received != NULL)
+ pcxt->any_message_received[i] = true;
+
msgtype = pq_getmsgbyte(msg);
switch (msgtype)
fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
MyFixedParallelState = fps;
+ /* Arrange to signal the leader if we exit. */
+ ParallelMasterPid = fps->parallel_master_pid;
+ ParallelMasterBackendId = fps->parallel_master_backend_id;
+ on_shmem_exit(ParallelWorkerShutdown, (Datum) 0);
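+ /* (This runs on any proc_exit, clean or not; see ParallelWorkerShutdown.) */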
+
/*
- * Now that we have a worker number, we can find and attach to the error
- * queue provided for us. That's good, because until we do that, any
- * errors that happen here will not be reported back to the process that
- * requested that this worker be launched.
+ * Now we can find and attach to the error queue provided for us. That's
+ * good, because until we do that, any errors that happen here will not be
+ * reported back to the process that requested that this worker be
+ * launched.
*/
error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
mq = (shm_mq *) (error_queue_space +
SetTempNamespaceState(fps->temp_namespace_id,
fps->temp_toast_namespace_id);
- /* Set ParallelMasterBackendId so we know how to address temp relations. */
- ParallelMasterBackendId = fps->parallel_master_backend_id;
-
/* Restore reindex state. */
reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
RestoreReindexState(reindexspace);
SpinLockRelease(&fps->mutex);
}
+/*
+ * Make sure the leader tries to read from our error queue one more time.
+ * This guards against the case where we exit uncleanly without sending an
+ * ErrorResponse to the leader, for example because some code calls proc_exit
+ * directly.
+ */
+static void
+ParallelWorkerShutdown(int code, Datum arg)
+{
+ SendProcSignal(ParallelMasterPid,
+ PROCSIG_PARALLEL_MESSAGE,
+ ParallelMasterBackendId);
+}
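+
+/*
+ * (On the leader side, PROCSIG_PARALLEL_MESSAGE is handled by
+ * HandleParallelMessageInterrupt(), which sets the leader's latch; the next
+ * CHECK_FOR_INTERRUPTS() calls HandleParallelMessages() to read messages
+ * from each worker's error queue.)
+ */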
+
/*
* Look up (and possibly load) a parallel worker entry point function.
*