ReinitializeParallelDSM(ParallelContext *pcxt)
{
FixedParallelState *fps;
- char *error_queue_space;
- int i;
/* Wait for any old workers to exit. */
if (pcxt->nworkers_launched > 0)
fps->last_xlog_end = 0;
/* Recreate error queues (if they exist). */
- error_queue_space =
- shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, true);
- Assert(pcxt->nworkers == 0 || error_queue_space != NULL);
- for (i = 0; i < pcxt->nworkers; ++i)
+ if (pcxt->nworkers > 0)
{
- char *start;
- shm_mq *mq;
+ char *error_queue_space;
+ int i;
- start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
- mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
- shm_mq_set_receiver(mq, MyProc);
- pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+ error_queue_space =
+ shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ char *start;
+ shm_mq *mq;
+
+ start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
+ mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
+ shm_mq_set_receiver(mq, MyProc);
+ pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+ }
}
}