/* We might be running in a short-lived memory context. */
oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+ /*
+ * This function can be called for a parallel context for which it has
+ * already been called previously, but only if all of the old workers
+ * have already exited. When this case arises, we need to do some extra
+ * reinitialization.
+ */
+ if (pcxt->nworkers_launched > 0)
+ {
+ FixedParallelState *fps;
+ char *error_queue_space;
+
+ /* Clean out old worker handles. */
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ if (pcxt->worker[i].error_mqh != NULL)
+ elog(ERROR, "previously launched worker still alive");
+ if (pcxt->worker[i].bgwhandle != NULL)
+ {
+ pfree(pcxt->worker[i].bgwhandle);
+ pcxt->worker[i].bgwhandle = NULL;
+ }
+ }
+
+ /* Reset a few bits of fixed parallel state to a clean state. */
+ fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
+ fps->workers_attached = 0;
+ fps->last_xlog_end = 0;
+
+ /* Recreate error queues. */
+ error_queue_space =
+ shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ char *start;
+ shm_mq *mq;
+
+ start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
+ mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
+ shm_mq_set_receiver(mq, MyProc);
+ pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+ }
+
+ /* Reset number of workers launched. */
+ pcxt->nworkers_launched = 0;
+ }
+
/* Configure a worker. */
snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
MyProcPid);
if (!any_registrations_failed &&
RegisterDynamicBackgroundWorker(&worker,
&pcxt->worker[i].bgwhandle))
+ {
shm_mq_set_handle(pcxt->worker[i].error_mqh,
pcxt->worker[i].bgwhandle);
+ pcxt->nworkers_launched++;
+ }
else
{
/*