* greater than any 32-bit integer here so that values < 2^32 can be used
* by individual parallel nodes to store their own state.
*/
-#define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000001)
-#define PARALLEL_KEY_PARAMS UINT64CONST(0xE000000000000002)
-#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000003)
-#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000004)
-#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000005)
-#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000006)
-#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
+#define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
+#define PARALLEL_KEY_PARAMS UINT64CONST(0xE000000000000003)
+#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
+#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
+#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
+#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
#define PARALLEL_TUPLE_QUEUE_SIZE 65536
+/*
+ * Fixed-size state that we need to pass to parallel workers.
+ */
+typedef struct FixedParallelExecutorState
+{
+ int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
+} FixedParallelExecutorState;
+
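Wrapping a single int64 in a struct is a deliberate bit of future-proofing: more fixed-size executor state can later travel to workers without claiming another TOC key, since the sizeof() used in the estimate and allocate calls picks up new fields automatically. A hypothetical extension, where the eflags field is invented purely for illustration and is not part of this patch:

typedef struct FixedParallelExecutorState
{
	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
	int			eflags;			/* hypothetical: flags to pass to workers */
} FixedParallelExecutorState;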
/*
* DSM structure for accumulating per-PlanState instrumentation.
*
* execution and return results to the main backend.
*/
ParallelExecutorInfo *
-ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
+ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
+ int64 tuples_needed)
{
ParallelExecutorInfo *pei;
ParallelContext *pcxt;
ExecParallelEstimateContext e;
ExecParallelInitializeDSMContext d;
+ FixedParallelExecutorState *fpes;
char *pstmt_data;
char *pstmt_space;
char *param_space;
* for the various things we need to store.
*/
+ /* Estimate space for fixed-size state. */
+ shm_toc_estimate_chunk(&pcxt->estimator,
+ sizeof(FixedParallelExecutorState));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
/* Estimate space for query text. */
query_len = strlen(estate->es_sourceText) + 1;	/* include trailing NUL */
shm_toc_estimate_chunk(&pcxt->estimator, query_len);
* asked for has been allocated or initialized yet, though, so do that.
*/
+ /* Store fixed-size state. */
+ fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
+ fpes->tuples_needed = tuples_needed;
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
+
/* Store query string */
query_string = shm_toc_allocate(pcxt->toc, query_len);
memcpy(query_string, estate->es_sourceText, query_len);
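Every entry in the parallel DSM segment, the new fixed-size one included, follows the same three-phase shm_toc protocol visible above: estimate, allocate-and-insert in the leader, then lookup in each worker. A condensed sketch of the pattern, with MyState, MY_KEY, some_field, and some_value as hypothetical stand-ins for any of the entries above:

	MyState    *st;

	/* Leader, phase 1: grow the size estimate before the DSM exists. */
	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(MyState));
	shm_toc_estimate_keys(&pcxt->estimator, 1);		/* one TOC slot */

	/* Leader, phase 2, after InitializeParallelDSM: allocate and publish. */
	st = shm_toc_allocate(pcxt->toc, sizeof(MyState));
	st->some_field = some_value;
	shm_toc_insert(pcxt->toc, MY_KEY, st);

	/* Worker: look up by key; "false" turns a missing entry into an ERROR. */
	st = shm_toc_lookup(toc, MY_KEY, false);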
void
ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
{
+ FixedParallelExecutorState *fpes;
BufferUsage *buffer_usage;
DestReceiver *receiver;
QueryDesc *queryDesc;
void *area_space;
dsa_area *area;
+ /* Get fixed-size state. */
+ fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
+
/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
receiver = ExecParallelGetReceiver(seg, toc);
instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
queryDesc->planstate->state->es_query_dsa = area;
ExecParallelInitializeWorker(queryDesc->planstate, toc);
- /* Run the plan */
- ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
+ /* Pass down any tuple bound */
+ ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
+
+ /*
+ * Run the plan. If we specified a tuple bound, be careful not to demand
+ * more tuples than that. Note that ExecutorRun's count argument uses
+ * zero, not a negative value, to mean "no limit", which is why a negative
+ * tuples_needed is translated to zero below.
+ */
+ ExecutorRun(queryDesc,
+ ForwardScanDirection,
+ fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
+ true);
/* Shut down the executor */
ExecutorFinish(queryDesc);
return false;
}
+
+/*
+ * ExecSetTupleBound
+ *
+ * Set a tuple bound for a planstate node. This lets child plan nodes
+ * optimize based on the knowledge that the maximum number of tuples that
+ * their parent will demand is limited. The tuple bound for a node may
+ * only be changed between scans (i.e., after node initialization or just
+ * before an ExecReScan call).
+ *
+ * Any negative tuples_needed value means "no limit", which should be the
+ * default assumption when this is not called at all for a particular node.
+ *
+ * Note: if this is called repeatedly on a plan tree, the exact same set
+ * of nodes must be updated with the new limit each time; be careful that
+ * only unchanging conditions are tested here.
+ */
+void
+ExecSetTupleBound(int64 tuples_needed, PlanState *child_node)
+{
+ /*
+ * Since this function recurses, in principle we should check stack depth
+ * here. In practice, it's probably pointless since the earlier node
+ * initialization tree traversal would surely have consumed more stack.
+ */
+
+ if (IsA(child_node, SortState))
+ {
+ /*
+ * If it is a Sort node, notify it that it can use bounded sort.
+ *
+ * Note: it is the responsibility of nodeSort.c to react properly to
+ * changes of these parameters. If we ever redesign this, it'd be a
+ * good idea to integrate this signaling with the parameter-change
+ * mechanism.
+ */
+ SortState *sortState = (SortState *) child_node;
+
+ if (tuples_needed < 0)
+ {
+ /* make sure flag gets reset if needed upon rescan */
+ sortState->bounded = false;
+ }
+ else
+ {
+ sortState->bounded = true;
+ sortState->bound = tuples_needed;
+ }
+ }
+ else if (IsA(child_node, MergeAppendState))
+ {
+ /*
+ * If it is a MergeAppend, we can apply the bound to any nodes that
+ * are children of the MergeAppend, since the MergeAppend surely need
+ * read no more than that many tuples from any one input.
+ */
+ MergeAppendState *maState = (MergeAppendState *) child_node;
+ int i;
+
+ for (i = 0; i < maState->ms_nplans; i++)
+ ExecSetTupleBound(tuples_needed, maState->mergeplans[i]);
+ }
+ else if (IsA(child_node, ResultState))
+ {
+ /*
+ * Similarly, for a projecting Result, we can apply the bound to its
+ * child node.
+ *
+ * If Result supported qual checking, we'd have to punt on seeing a
+ * qual. Note that having a resconstantqual is not a showstopper: if
+ * that condition succeeds it affects nothing, while if it fails, no
+ * rows will be demanded from the Result child anyway.
+ */
+ if (outerPlanState(child_node))
+ ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+ }
+ else if (IsA(child_node, SubqueryScanState))
+ {
+ /*
+ * We can also descend through SubqueryScan, but only if it has no
+ * qual (otherwise it might discard rows).
+ */
+ SubqueryScanState *subqueryState = (SubqueryScanState *) child_node;
+
+ if (subqueryState->ss.ps.qual == NULL)
+ ExecSetTupleBound(tuples_needed, subqueryState->subplan);
+ }
+ else if (IsA(child_node, GatherState))
+ {
+ /*
+ * A Gather node can propagate the bound to its workers. As with
+ * MergeAppend, no one worker could possibly need to return more
+ * tuples than the Gather itself needs to.
+ *
+ * Note: As with Sort, the Gather node is responsible for reacting
+ * properly to changes to this parameter.
+ */
+ GatherState *gstate = (GatherState *) child_node;
+
+ gstate->tuples_needed = tuples_needed;
+
+ /* Also pass down the bound to our own copy of the child plan */
+ ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+ }
+ else if (IsA(child_node, GatherMergeState))
+ {
+ /* Same comments as for Gather */
+ GatherMergeState *gstate = (GatherMergeState *) child_node;
+
+ gstate->tuples_needed = tuples_needed;
+
+ ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+ }
+
+ /*
+ * In principle we could descend through any plan node type that is
+ * certain not to discard or combine input rows; but on seeing a node that
+ * can do that, we can't propagate the bound any further. For the moment
+ * it's unclear that any other cases are worth checking here.
+ */
+}
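The payoff on the consuming side is that a Sort running inside each worker can now switch to a bounded top-N heapsort instead of sorting its entire input. Roughly how nodeSort.c honors the bounded/bound fields set above, condensed to a sketch rather than quoted verbatim:

	/* In ExecSort, just after the Tuplesortstate is created for a scan: */
	if (node->bounded)
		tuplesort_set_bound(tuplesortstate, node->bound);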
gatherstate->ps.state = estate;
gatherstate->ps.ExecProcNode = ExecGather;
gatherstate->need_to_scan_locally = !node->single_copy;
+ gatherstate->tuples_needed = -1;
/*
* Miscellaneous initialization
if (!node->pei)
node->pei = ExecInitParallelPlan(node->ps.lefttree,
estate,
- gather->num_workers);
+ gather->num_workers,
+ node->tuples_needed);
/*
* Register backend workers. We might not get as many as we
gm_state->ps.plan = (Plan *) node;
gm_state->ps.state = estate;
gm_state->ps.ExecProcNode = ExecGatherMerge;
+ gm_state->tuples_needed = -1;
/*
* Miscellaneous initialization
if (!node->pei)
node->pei = ExecInitParallelPlan(node->ps.lefttree,
estate,
- gm->num_workers);
+ gm->num_workers,
+ node->tuples_needed);
/* Try to launch workers. */
pcxt = node->pei->pcxt;
#include "nodes/nodeFuncs.h"
static void recompute_limits(LimitState *node);
-static void pass_down_bound(LimitState *node, PlanState *child_node);
+static int64 compute_tuples_needed(LimitState *node);
/* ----------------------------------------------------------------
/* Set state-machine state */
node->lstate = LIMIT_RESCAN;
- /* Notify child node about limit, if useful */
- pass_down_bound(node, outerPlanState(node));
+ /*
+ * Notify child node about limit. Note: think not to "optimize" by
+ * skipping ExecSetTupleBound if compute_tuples_needed returns < 0. We
+ * must update the child node anyway, in case this is a rescan and the
+ * previous time we got a different result.
+ */
+ ExecSetTupleBound(compute_tuples_needed(node), outerPlanState(node));
}
/*
- * If we have a COUNT, and our input is a Sort node, notify it that it can
- * use bounded sort. We can also pass down the bound through plan nodes
- * that cannot remove or combine input rows; for example, if our input is a
- * MergeAppend, we can apply the same bound to any Sorts that are direct
- * children of the MergeAppend, since the MergeAppend surely need not read
- * more than that many tuples from any one input.
- *
- * This is a bit of a kluge, but we don't have any more-abstract way of
- * communicating between the two nodes; and it doesn't seem worth trying
- * to invent one without some more examples of special communication needs.
- *
- * Note: it is the responsibility of nodeSort.c to react properly to
- * changes of these parameters. If we ever do redesign this, it'd be a
- * good idea to integrate this signaling with the parameter-change mechanism.
+ * Compute the maximum number of tuples needed to satisfy this Limit node.
+ * Return a negative value if there is not a determinable limit.
*/
-static void
-pass_down_bound(LimitState *node, PlanState *child_node)
+static int64
+compute_tuples_needed(LimitState *node)
{
- /*
- * Since this function recurses, in principle we should check stack depth
- * here. In practice, it's probably pointless since the earlier node
- * initialization tree traversal would surely have consumed more stack.
- */
-
- if (IsA(child_node, SortState))
- {
- SortState *sortState = (SortState *) child_node;
- int64 tuples_needed = node->count + node->offset;
-
- /* negative test checks for overflow in sum */
- if (node->noCount || tuples_needed < 0)
- {
- /* make sure flag gets reset if needed upon rescan */
- sortState->bounded = false;
- }
- else
- {
- sortState->bounded = true;
- sortState->bound = tuples_needed;
- }
- }
- else if (IsA(child_node, MergeAppendState))
- {
- /* Pass down the bound through MergeAppend */
- MergeAppendState *maState = (MergeAppendState *) child_node;
- int i;
-
- for (i = 0; i < maState->ms_nplans; i++)
- pass_down_bound(node, maState->mergeplans[i]);
- }
- else if (IsA(child_node, ResultState))
- {
- /*
- * We also have to be prepared to look through a Result, since the
- * planner might stick one atop MergeAppend for projection purposes.
- *
- * If Result supported qual checking, we'd have to punt on seeing a
- * qual. Note that having a resconstantqual is not a showstopper: if
- * that fails we're not getting any rows at all.
- */
- if (outerPlanState(child_node))
- pass_down_bound(node, outerPlanState(child_node));
- }
- else if (IsA(child_node, SubqueryScanState))
- {
- /*
- * We can also look through SubqueryScan, but only if it has no qual
- * (otherwise it might discard rows).
- */
- SubqueryScanState *subqueryState = (SubqueryScanState *) child_node;
-
- if (subqueryState->ss.ps.qual == NULL)
- pass_down_bound(node, subqueryState->subplan);
- }
-
- /*
- * In principle we could look through any plan node type that is certain
- * not to discard or combine input rows. In practice, there are not many
- * node types that the planner might put between Sort and Limit, so trying
- * to be very general is not worth the trouble.
- */
+ if (node->noCount)
+ return -1;
+ /* Note: if this overflows, we'll return a negative value, which is OK */
+ return node->count + node->offset;
}
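Since PostgreSQL is built with -fwrapv, the signed addition wraps in two's complement rather than invoking undefined behavior, which is what makes the overflow note above safe. A worked example with illustrative values:

	/*
	 * Illustrative values, not taken from the patch:
	 *
	 *   node->offset = PG_INT64_MAX - 10    (OFFSET 9223372036854775797)
	 *   node->count  = 100                  (LIMIT 100)
	 *
	 * The sum exceeds PG_INT64_MAX and wraps to a negative number; callers
	 * treat any negative result as "no determinable limit", so an overflowed
	 * bound degrades to "push nothing down" rather than to a wrong bound.
	 */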
/* ----------------------------------------------------------------
} ParallelExecutorInfo;
extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
- EState *estate, int nworkers);
+ EState *estate, int nworkers, int64 tuples_needed);
extern void ExecParallelFinish(ParallelExecutorInfo *pei);
extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
extern void ExecParallelReinitialize(ParallelExecutorInfo *pei);
extern Node *MultiExecProcNode(PlanState *node);
extern void ExecEndNode(PlanState *node);
extern bool ExecShutdownNode(PlanState *node);
+extern void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node);
/* ----------------------------------------------------------------
struct TupleQueueReader **reader;
TupleTableSlot *funnel_slot;
bool need_to_scan_locally;
+ int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
} GatherState;
/* ----------------
struct binaryheap *gm_heap; /* binary heap of slot indices */
bool gm_initialized; /* gather merge initialized? */
bool need_to_scan_locally;
+ int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
int gm_nkeys;
SortSupport gm_sortkeys; /* array of length gm_nkeys */
struct GMReaderTupleBuffer *gm_tuple_buffers; /* tuple buffer per reader */
500
(20 rows)
+reset enable_hashagg;
+-- gather merge test with a LIMIT
+explain (costs off)
+ select fivethous from tenk1 order by fivethous limit 4;
+ QUERY PLAN
+----------------------------------------------
+ Limit
+ -> Gather Merge
+ Workers Planned: 4
+ -> Sort
+ Sort Key: fivethous
+ -> Parallel Seq Scan on tenk1
+(6 rows)
+
+select fivethous from tenk1 order by fivethous limit 4;
+ fivethous
+-----------
+ 0
+ 0
+ 1
+ 1
+(4 rows)
+
-- gather merge test with 0 workers
set max_parallel_workers = 0;
explain (costs off)
(5 rows)
reset max_parallel_workers;
-reset enable_hashagg;
SAVEPOINT settings;
SET LOCAL force_parallel_mode = 1;
explain (costs off)
select count(*) from tenk1 group by twenty;
+reset enable_hashagg;
+
+-- gather merge test with a LIMIT
+explain (costs off)
+ select fivethous from tenk1 order by fivethous limit 4;
+
+select fivethous from tenk1 order by fivethous limit 4;
+
-- gather merge test with 0 workers
set max_parallel_workers = 0;
explain (costs off)
select string4 from tenk1 order by string4 limit 5;
select string4 from tenk1 order by string4 limit 5;
reset max_parallel_workers;
-reset enable_hashagg;
SAVEPOINT settings;
SET LOCAL force_parallel_mode = 1;