static void ExecPostprocessPlan(EState *estate);
static void ExecEndPlan(PlanState *planstate, EState *estate);
static void ExecutePlan(EState *estate, PlanState *planstate,
+ bool use_parallel_mode,
CmdType operation,
bool sendTuples,
long numberTuples,
if (!(eflags & (EXEC_FLAG_SKIP_TRIGGERS | EXEC_FLAG_EXPLAIN_ONLY)))
AfterTriggerBeginQuery();
- /* Enter parallel mode, if required by the query. */
- if (queryDesc->plannedstmt->parallelModeNeeded &&
- !(eflags & EXEC_FLAG_EXPLAIN_ONLY))
- EnterParallelMode();
-
MemoryContextSwitchTo(oldcontext);
}
if (!ScanDirectionIsNoMovement(direction))
ExecutePlan(estate,
queryDesc->planstate,
+ queryDesc->plannedstmt->parallelModeNeeded,
operation,
sendTuples,
count,
direction,
dest);
- /* Allow nodes to release or shut down resources. */
- (void) ExecShutdownNode(queryDesc->planstate);
-
/*
* shutdown tuple receiver, if we started it
*/
*/
MemoryContextSwitchTo(oldcontext);
- /* Exit parallel mode, if it was required by the query. */
- if (queryDesc->plannedstmt->parallelModeNeeded &&
- !(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY))
- ExitParallelMode();
-
/*
* Release EState and per-query memory context. This should release
* everything the executor has allocated.
static void
ExecutePlan(EState *estate,
PlanState *planstate,
+ bool use_parallel_mode,
CmdType operation,
bool sendTuples,
long numberTuples,
*/
estate->es_direction = direction;
+ /*
+ * If a tuple count was supplied, we must force the plan to run without
+ * parallelism, because we might exit early.
+ */
+ if (numberTuples != 0)
+ use_parallel_mode = false;
+
+ /*
+ * If a tuple count was supplied, we must force the plan to run without
+ * parallelism, because we might exit early.
+ */
+ if (use_parallel_mode)
+ EnterParallelMode();
+
/*
* Loop until we've processed the proper number of tuples from the plan.
*/
* process so we just end the loop...
*/
if (TupIsNull(slot))
+ {
+ /* Allow nodes to release or shut down resources. */
+ (void) ExecShutdownNode(planstate);
break;
+ }
/*
* If we have a junk filter, then project a new tuple with the junk
if (numberTuples && numberTuples == current_tuple_count)
break;
}
+
+ if (use_parallel_mode)
+ ExitParallelMode();
}
#include "postgres.h"
#include "access/relscan.h"
+#include "access/xact.h"
#include "executor/execdebug.h"
#include "executor/execParallel.h"
#include "executor/nodeGather.h"
gatherstate = makeNode(GatherState);
gatherstate->ps.plan = (Plan *) node;
gatherstate->ps.state = estate;
- gatherstate->need_to_scan_workers = false;
gatherstate->need_to_scan_locally = !node->single_copy;
/*
* needs to allocate large dynamic segement, so it is better to do if it
* is really needed.
*/
- if (!node->pei)
+ if (!node->initialized)
{
EState *estate = node->ps.state;
-
- /* Initialize the workers required to execute Gather node. */
- node->pei = ExecInitParallelPlan(node->ps.lefttree,
- estate,
- ((Gather *) (node->ps.plan))->num_workers);
+ Gather *gather = (Gather *) node->ps.plan;
/*
- * Register backend workers. If the required number of workers are not
- * available then we perform the scan with available workers and if
- * there are no more workers available, then the Gather node will just
- * scan locally.
+ * Sometimes we might have to run without parallelism; but if
+ * parallel mode is active then we can try to fire up some workers.
*/
- LaunchParallelWorkers(node->pei->pcxt);
-
- node->funnel = CreateTupleQueueFunnel();
-
- for (i = 0; i < node->pei->pcxt->nworkers; ++i)
+ if (gather->num_workers > 0 && IsInParallelMode())
{
- if (node->pei->pcxt->worker[i].bgwhandle)
+ bool got_any_worker = false;
+
+ /* Initialize the workers required to execute Gather node. */
+ node->pei = ExecInitParallelPlan(node->ps.lefttree,
+ estate,
+ gather->num_workers);
+
+ /*
+ * Register backend workers. We might not get as many as we
+ * requested, or indeed any at all.
+ */
+ LaunchParallelWorkers(node->pei->pcxt);
+
+ /* Set up a tuple queue to collect the results. */
+ node->funnel = CreateTupleQueueFunnel();
+ for (i = 0; i < node->pei->pcxt->nworkers; ++i)
{
- shm_mq_set_handle(node->pei->tqueue[i],
- node->pei->pcxt->worker[i].bgwhandle);
- RegisterTupleQueueOnFunnel(node->funnel, node->pei->tqueue[i]);
- node->need_to_scan_workers = true;
+ if (node->pei->pcxt->worker[i].bgwhandle)
+ {
+ shm_mq_set_handle(node->pei->tqueue[i],
+ node->pei->pcxt->worker[i].bgwhandle);
+ RegisterTupleQueueOnFunnel(node->funnel,
+ node->pei->tqueue[i]);
+ got_any_worker = true;
+ }
}
+
+ /* No workers? Then never mind. */
+ if (!got_any_worker)
+ ExecShutdownGather(node);
}
- /* If no workers are available, we must always scan locally. */
- if (!node->need_to_scan_workers)
- node->need_to_scan_locally = true;
+ /* Run plan locally if no workers or not single-copy. */
+ node->need_to_scan_locally = (node->funnel == NULL)
+ || !gather->single_copy;
+ node->initialized = true;
}
slot = gather_getnext(node);
- if (TupIsNull(slot))
- {
- /*
- * Destroy the parallel context once we complete fetching all the
- * tuples. Otherwise, the DSM and workers will stick around for the
- * lifetime of the entire statement.
- */
- ExecShutdownGather(node);
- }
return slot;
}
*/
slot = gatherstate->ps.ps_ProjInfo->pi_slot;
- while (gatherstate->need_to_scan_workers ||
- gatherstate->need_to_scan_locally)
+ while (gatherstate->funnel != NULL || gatherstate->need_to_scan_locally)
{
- if (gatherstate->need_to_scan_workers)
+ if (gatherstate->funnel != NULL)
{
bool done = false;
gatherstate->need_to_scan_locally,
&done);
if (done)
- gatherstate->need_to_scan_workers = false;
+ ExecShutdownGather(gatherstate);
if (HeapTupleIsValid(tup))
{
void
ExecShutdownGather(GatherState *node)
{
- Gather *gather;
-
- if (node->pei == NULL || node->pei->pcxt == NULL)
- return;
-
- /*
- * Ensure all workers have finished before destroying the parallel context
- * to ensure a clean exit.
- */
- if (node->funnel)
+ /* Shut down tuple queue funnel before shutting down workers. */
+ if (node->funnel != NULL)
{
DestroyTupleQueueFunnel(node->funnel);
node->funnel = NULL;
}
- ExecParallelFinish(node->pei);
-
- /* destroy parallel context. */
- DestroyParallelContext(node->pei->pcxt);
- node->pei->pcxt = NULL;
-
- gather = (Gather *) node->ps.plan;
- node->need_to_scan_locally = !gather->single_copy;
- node->need_to_scan_workers = false;
+ /* Now shut down the workers. */
+ if (node->pei != NULL)
+ {
+ ExecParallelFinish(node->pei);
+ ExecParallelCleanup(node->pei);
+ node->pei = NULL;
+ }
}
/* ----------------------------------------------------------------
*/
ExecShutdownGather(node);
+ node->initialized = false;
+
ExecReScan(node->ps.lefttree);
}