static void ParallelErrorContext(void *arg);
static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
static void ParallelWorkerMain(Datum main_arg);
+static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
/*
* Establish a new parallel context. This should be done after entering
MemoryContextSwitchTo(oldcontext);
}
+/*
+ * Reinitialize the dynamic shared memory segment for a parallel context so
+ * that we can launch workers for it again.
+ */
+void
+ReinitializeParallelDSM(ParallelContext *pcxt)
+{
+ FixedParallelState *fps;
+ char *error_queue_space;
+ int i;
+
+ if (pcxt->nworkers_launched == 0)
+ return;
+
+ WaitForParallelWorkersToFinish(pcxt);
+ WaitForParallelWorkersToExit(pcxt);
+
+ /* Reset a few bits of fixed parallel state to a clean state. */
+ fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
+ fps->workers_attached = 0;
+ fps->last_xlog_end = 0;
+
+ /* Recreate error queues. */
+ error_queue_space =
+ shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ char *start;
+ shm_mq *mq;
+
+ start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
+ mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
+ shm_mq_set_receiver(mq, MyProc);
+ pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+ }
+
+ /* Reset number of workers launched. */
+ pcxt->nworkers_launched = 0;
+}
+
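A minimal sketch of the intended call pattern for the new function, using only routines touched by this patch (the driver function run_passes and its loop are hypothetical, not part of the patch): the same parallel context is reused across passes, with ReinitializeParallelDSM called between them once the previous workers have finished.

/* Hypothetical caller, for illustration only. */
static void
run_passes(ParallelContext *pcxt, int npasses)
{
	int			pass;

	for (pass = 0; pass < npasses; ++pass)
	{
		if (pass > 0)
			ReinitializeParallelDSM(pcxt);	/* waits for old workers, resets queues */

		LaunchParallelWorkers(pcxt);
		/* ... hand out work to the workers and consume their results ... */
		WaitForParallelWorkersToFinish(pcxt);
	}

	DestroyParallelContext(pcxt);
}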
/*
* Launch parallel workers.
*/
/* We might be running in a short-lived memory context. */
oldcontext = MemoryContextSwitchTo(TopTransactionContext);
- /*
- * This function can be called for a parallel context for which it has
- * already been called previously, but only if all of the old workers
- * have already exited. When this case arises, we need to do some extra
- * reinitialization.
- */
- if (pcxt->nworkers_launched > 0)
- {
- FixedParallelState *fps;
- char *error_queue_space;
-
- /* Clean out old worker handles. */
- for (i = 0; i < pcxt->nworkers; ++i)
- {
- if (pcxt->worker[i].error_mqh != NULL)
- elog(ERROR, "previously launched worker still alive");
- if (pcxt->worker[i].bgwhandle != NULL)
- {
- pfree(pcxt->worker[i].bgwhandle);
- pcxt->worker[i].bgwhandle = NULL;
- }
- }
-
- /* Reset a few bits of fixed parallel state to a clean state. */
- fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
- fps->workers_attached = 0;
- fps->last_xlog_end = 0;
-
- /* Recreate error queues. */
- error_queue_space =
- shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
- for (i = 0; i < pcxt->nworkers; ++i)
- {
- char *start;
- shm_mq *mq;
-
- start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
- mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
- shm_mq_set_receiver(mq, MyProc);
- pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
- }
-
- /* Reset number of workers launched. */
- pcxt->nworkers_launched = 0;
- }
-
/* Configure a worker. */
snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
MyProcPid);
}
/*
- * Wait for all workers to exit.
+ * Wait for all workers to finish computing.
*
* Even if the parallel operation seems to have completed successfully, it's
* important to call this function afterwards. We must not miss any errors
}
}
+/*
+ * Wait for all workers to exit.
+ *
+ * This function ensures that the workers have completely shut down.  The
+ * difference between WaitForParallelWorkersToFinish and this function is
+ * that the former only ensures that the last message sent by a worker
+ * backend has been received by the master backend, whereas this function
+ * ensures that the workers have actually exited.
+ */
+static void
+WaitForParallelWorkersToExit(ParallelContext *pcxt)
+{
+ int i;
+
+ /* Wait until the workers actually die. */
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ BgwHandleStatus status;
+
+ if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
+ continue;
+
+ status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
+
+ /*
+ * If the postmaster kicked the bucket, we have no chance of cleaning
+ * up safely -- we won't be able to tell when our workers are actually
+ * dead. This doesn't necessitate a PANIC since they will all abort
+ * eventually, but we can't safely continue this session.
+ */
+ if (status == BGWH_POSTMASTER_DIED)
+ ereport(FATAL,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("postmaster exited during a parallel transaction")));
+
+ /* Release memory. */
+ pfree(pcxt->worker[i].bgwhandle);
+ pcxt->worker[i].bgwhandle = NULL;
+ }
+}
+
/*
* Destroy a parallel context.
*
{
for (i = 0; i < pcxt->nworkers; ++i)
{
- if (pcxt->worker[i].bgwhandle != NULL)
- TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
if (pcxt->worker[i].error_mqh != NULL)
{
+ TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
+
pfree(pcxt->worker[i].error_mqh);
pcxt->worker[i].error_mqh = NULL;
}
pcxt->private_memory = NULL;
}
- /* Wait until the workers actually die. */
- for (i = 0; i < pcxt->nworkers; ++i)
- {
- BgwHandleStatus status;
-
- if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
- continue;
-
- /*
- * We can't finish transaction commit or abort until all of the
- * workers are dead. This means, in particular, that we can't respond
- * to interrupts at this stage.
- */
- HOLD_INTERRUPTS();
- status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
- RESUME_INTERRUPTS();
-
- /*
- * If the postmaster kicked the bucket, we have no chance of cleaning
- * up safely -- we won't be able to tell when our workers are actually
- * dead. This doesn't necessitate a PANIC since they will all abort
- * eventually, but we can't safely continue this session.
- */
- if (status == BGWH_POSTMASTER_DIED)
- ereport(FATAL,
- (errcode(ERRCODE_ADMIN_SHUTDOWN),
- errmsg("postmaster exited during a parallel transaction")));
-
- /* Release memory. */
- pfree(pcxt->worker[i].bgwhandle);
- pcxt->worker[i].bgwhandle = NULL;
- }
+ /*
+ * We can't finish transaction commit or abort until all of the
+ * workers have exited. This means, in particular, that we can't respond
+ * to interrupts at this stage.
+ */
+ HOLD_INTERRUPTS();
+ WaitForParallelWorkersToExit(pcxt);
+ RESUME_INTERRUPTS();
/* Free the worker array itself. */
if (pcxt->worker != NULL)
case 'X': /* Terminate, indicating clean exit */
{
- pfree(pcxt->worker[i].bgwhandle);
pfree(pcxt->worker[i].error_mqh);
- pcxt->worker[i].bgwhandle = NULL;
pcxt->worker[i].error_mqh = NULL;
break;
}
ExecParallelEstimateContext *e);
static bool ExecParallelInitializeDSM(PlanState *node,
ExecParallelInitializeDSMContext *d);
-static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt);
+static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
+ bool reinitialize);
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
SharedExecutorInstrumentation *instrumentation);
* to the main backend and start the workers.
*/
static shm_mq_handle **
-ExecParallelSetupTupleQueues(ParallelContext *pcxt)
+ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
{
shm_mq_handle **responseq;
char *tqueuespace;
responseq = (shm_mq_handle **)
palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
- /* Allocate space from the DSM for the queues themselves. */
- tqueuespace = shm_toc_allocate(pcxt->toc,
- PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+ /*
+ * If not reinitializing, allocate space from the DSM for the queues;
+ * otherwise, find the already allocated space.
+ */
+ if (!reinitialize)
+ tqueuespace =
+ shm_toc_allocate(pcxt->toc,
+ PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+ else
+ tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE);
/* Create the queues, and become the receiver for each. */
for (i = 0; i < pcxt->nworkers; ++i)
}
/* Add array of queues to shm_toc, so others can find it. */
- shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
+ if (!reinitialize)
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
/* Return array of handles. */
return responseq;
}
+/*
+ * Re-initialize the response queues that backend workers use to return
+ * tuples to the main backend.
+ */
+shm_mq_handle **
+ExecParallelReinitializeTupleQueues(ParallelContext *pcxt)
+{
+ return ExecParallelSetupTupleQueues(pcxt, true);
+}
+
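A hedged sketch of how the executor side pairs this with ReinitializeParallelDSM during a rescan, mirroring what ExecReScanGather does later in this patch; the wrapping function reset_parallel_scan is illustrative only, while pei is a ParallelExecutorInfo as returned by ExecInitParallelPlan.

/* Illustrative helper, not part of the patch. */
static void
reset_parallel_scan(ParallelExecutorInfo *pei)
{
	/* Make sure the previous set of workers has finished and been reaped. */
	ExecParallelFinish(pei);

	/* Reset the fixed parallel state and error queues in the existing DSM. */
	ReinitializeParallelDSM(pei->pcxt);

	/* Recreate the tuple queues in their already-allocated DSM space. */
	pei->tqueue = ExecParallelReinitializeTupleQueues(pei->pcxt);
}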
/*
* Sets up the required infrastructure for backend workers to perform
* execution and return results to the main backend.
pei->buffer_usage = bufusage_space;
/* Set up tuple queues. */
- pei->tqueue = ExecParallelSetupTupleQueues(pcxt);
+ pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
/*
* If instrumentation options were supplied, allocate space for the
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
+static void ExecShutdownGatherWorkers(GatherState *node);
/* ----------------------------------------------------------------
bool got_any_worker = false;
/* Initialize the workers required to execute Gather node. */
- node->pei = ExecInitParallelPlan(node->ps.lefttree,
- estate,
- gather->num_workers);
+ if (!node->pei)
+ node->pei = ExecInitParallelPlan(node->ps.lefttree,
+ estate,
+ gather->num_workers);
/*
* Register backend workers. We might not get as many as we
gatherstate->need_to_scan_locally,
&done);
if (done)
- ExecShutdownGather(gatherstate);
+ ExecShutdownGatherWorkers(gatherstate);
if (HeapTupleIsValid(tup))
{
}
/* ----------------------------------------------------------------
- * ExecShutdownGather
+ * ExecShutdownGatherWorkers
*
- * Destroy the setup for parallel workers. Collect all the
- * stats after workers are stopped, else some work done by
- * workers won't be accounted.
+ * Destroy the parallel workers.  Collect all the stats after the
+ * workers are stopped, else some work done by the workers won't be
+ * accounted for.
* ----------------------------------------------------------------
*/
void
-ExecShutdownGather(GatherState *node)
+ExecShutdownGatherWorkers(GatherState *node)
{
/* Shut down tuple queue funnel before shutting down workers. */
if (node->funnel != NULL)
/* Now shut down the workers. */
if (node->pei != NULL)
- {
ExecParallelFinish(node->pei);
+}
+
+/* ----------------------------------------------------------------
+ * ExecShutdownGather
+ *
+ * Destroy the setup for parallel workers, including the parallel
+ * context.  Collect all the stats after the workers are stopped, else
+ * some work done by the workers won't be accounted for.
+ * ----------------------------------------------------------------
+ */
+void
+ExecShutdownGather(GatherState *node)
+{
+ ExecShutdownGatherWorkers(node);
+
+ /* Now destroy the parallel context. */
+ if (node->pei != NULL)
+ {
ExecParallelCleanup(node->pei);
node->pei = NULL;
}
ExecReScanGather(GatherState *node)
{
/*
- * Re-initialize the parallel context and workers to perform rescan of
- * relation. We want to gracefully shutdown all the workers so that they
+ * Re-initialize the parallel workers to perform a rescan of the relation.
+ * We want to gracefully shut down all the workers so that they
* should be able to propagate any error or other information to master
- * backend before dying.
+ * backend before dying.  The parallel context will be reused for the rescan.
*/
- ExecShutdownGather(node);
+ ExecShutdownGatherWorkers(node);
node->initialized = false;
+ if (node->pei)
+ {
+ ReinitializeParallelDSM(node->pei->pcxt);
+ node->pei->tqueue =
+ ExecParallelReinitializeTupleQueues(node->pei->pcxt);
+ }
+
ExecReScan(node->ps.lefttree);
}