WRITE_INT_FIELD(width);
WRITE_BOOL_FIELD(consider_startup);
WRITE_BOOL_FIELD(consider_param_startup);
+ WRITE_BOOL_FIELD(consider_parallel);
WRITE_NODE_FIELD(reltargetlist);
WRITE_NODE_FIELD(pathlist);
WRITE_NODE_FIELD(ppilist);
#include "access/tsmapi.h"
#include "catalog/pg_class.h"
#include "catalog/pg_operator.h"
+#include "catalog/pg_proc.h"
#include "foreign/fdwapi.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
Index rti, RangeTblEntry *rte);
static void set_plain_rel_size(PlannerInfo *root, RelOptInfo *rel,
RangeTblEntry *rte);
+static void set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,
+ RangeTblEntry *rte);
+static bool function_rte_parallel_ok(RangeTblEntry *rte);
static void set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
RangeTblEntry *rte);
static void set_tablesample_rel_size(PlannerInfo *root, RelOptInfo *rel,
set_base_rel_consider_startup(root);
/*
- * Generate access paths for the base rels.
+ * Generate access paths for the base rels. set_base_rel_sizes also
+ * sets the consider_parallel flag for each baserel, if appropriate.
*/
set_base_rel_sizes(root);
set_base_rel_pathlists(root);
/*
* set_base_rel_sizes
* Set the size estimates (rows and widths) for each base-relation entry.
+ * Also determine whether to consider parallel paths for base relations.
*
* We do this in a separate pass over the base rels so that rowcount
- * estimates are available for parameterized path generation.
+ * estimates are available for parameterized path generation, and also so
+ * that the consider_parallel flag is set correctly before we begin to
+ * generate paths.
*/
static void
set_base_rel_sizes(PlannerInfo *root)
for (rti = 1; rti < root->simple_rel_array_size; rti++)
{
RelOptInfo *rel = root->simple_rel_array[rti];
+ RangeTblEntry *rte;
/* there may be empty slots corresponding to non-baserel RTEs */
if (rel == NULL)
if (rel->reloptkind != RELOPT_BASEREL)
continue;
- set_rel_size(root, rel, rti, root->simple_rte_array[rti]);
+ rte = root->simple_rte_array[rti];
+
+ /*
+ * If parallelism is allowable for this query in general, see whether
+ * it's allowable for this rel in particular. We have to do this
+ * before set_rel_size, because if this is an inheritance parent,
+ * set_append_rel_size will pass the consider_parallel flag down to
+ * inheritance children.
+ */
+ if (root->glob->parallelModeOK)
+ set_rel_consider_parallel(root, rel, rte);
+
+ set_rel_size(root, rel, rti, rte);
}
}
set_baserel_size_estimates(root, rel);
}
+/*
+ * set_rel_consider_parallel
+ *	  Decide whether a base relation could possibly be scanned from within
+ *	  a parallel worker, and if so set rel->consider_parallel.
+ *
+ * The flag has previously been initialized to false, so we just bail out
+ * (leaving it false) as soon as it becomes clear that we can't safely set
+ * it; it is set to true only if every check below passes.
+ *
+ * root: planner state; root->glob->parallelModeOK must be true.
+ * rel:  the relation to examine; must be a RELOPT_BASEREL.
+ * rte:  the range table entry that corresponds to rel.
+ */
+static void
+set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,
+						  RangeTblEntry *rte)
+{
+	/* Don't call this if parallelism is disallowed for the entire query. */
+	Assert(root->glob->parallelModeOK);
+
+	/* Don't call this for non-baserels. */
+	Assert(rel->reloptkind == RELOPT_BASEREL);
+
+	/* Assorted checks based on rtekind. */
+	switch (rte->rtekind)
+	{
+		case RTE_RELATION:
+			/*
+			 * Currently, parallel workers can't access the leader's temporary
+			 * tables.  We could possibly relax this if the leader wrote all
+			 * of its local buffers at the start of the query and made no
+			 * changes thereafter (maybe we could allow hint bit changes), and
+			 * if we taught the workers to read them.  Writing a large number
+			 * of temporary buffers could be expensive, though, and we don't
+			 * have the rest of the necessary infrastructure right now anyway.
+			 * So for now, bail out if we see a temporary table.
+			 */
+			if (get_rel_persistence(rte->relid) == RELPERSISTENCE_TEMP)
+				return;
+
+			/*
+			 * Table sampling can be pushed down to workers if the sample
+			 * function and its arguments are safe.  If both checks pass we
+			 * fall out of the switch so that the restriction clauses are
+			 * examined like those of any other plain relation.
+			 */
+			if (rte->tablesample != NULL)
+			{
+				/* func_parallel() yields a pg_proc.proparallel marking */
+				char		proparallel = func_parallel(rte->tablesample->tsmhandler);
+
+				if (proparallel != PROPARALLEL_SAFE)
+					return;
+				if (has_parallel_hazard((Node *) rte->tablesample->args,
+										false))
+					return;
+			}
+			break;
+
+		case RTE_SUBQUERY:
+			/*
+			 * Subplans currently aren't passed to workers.  Even if they
+			 * were, the subplan might be using parallelism internally, and
+			 * we can't support nested Gather nodes at present.  Finally,
+			 * we don't have a good way of knowing whether the subplan
+			 * involves any parallel-restricted operations.  It would be
+			 * nice to relax this restriction some day, but it's going to
+			 * take a fair amount of work.
+			 */
+			return;
+
+		case RTE_JOIN:
+			/* Shouldn't happen; we're only considering baserels here. */
+			Assert(false);
+			return;
+
+		case RTE_FUNCTION:
+			/* Check for parallel-restricted functions. */
+			if (!function_rte_parallel_ok(rte))
+				return;
+			break;
+
+		case RTE_VALUES:
+			/*
+			 * The data for a VALUES clause is stored in the plan tree itself,
+			 * so scanning it in a worker is fine.
+			 */
+			break;
+
+		case RTE_CTE:
+			/*
+			 * CTE tuplestores aren't shared among parallel workers, so we
+			 * force all CTE scans to happen in the leader.  Also, populating
+			 * the CTE would require executing a subplan that's not available
+			 * in the worker, might be parallel-restricted, and must get
+			 * executed only once.
+			 */
+			return;
+	}
+
+	/*
+	 * If there's anything in baserestrictinfo that's parallel-restricted,
+	 * we give up on parallelizing access to this relation.  We could consider
+	 * instead postponing application of the restricted quals until we're
+	 * above all the parallelism in the plan tree, but it's not clear that
+	 * this would be a win in very many cases, and it might be tricky to make
+	 * outer join clauses work correctly.
+	 */
+	if (has_parallel_hazard((Node *) rel->baserestrictinfo, false))
+		return;
+
+	/* We have a winner. */
+	rel->consider_parallel = true;
+}
+
+/*
+ * function_rte_parallel_ok
+ *	  Report whether a function RTE is free of parallel hazards.
+ *
+ * Each entry of rte->functions has its function expression checked with
+ * has_parallel_hazard(..., false).  The result is false as soon as one
+ * expression is found to be hazardous, and true otherwise (including when
+ * the list is empty).
+ */
+static bool
+function_rte_parallel_ok(RangeTblEntry *rte)
+{
+	ListCell   *cell;
+	bool		ok = true;
+
+	foreach(cell, rte->functions)
+	{
+		RangeTblFunction *func = (RangeTblFunction *) lfirst(cell);
+
+		Assert(IsA(func, RangeTblFunction));
+
+		/* One hazardous expression is enough to rule the RTE out. */
+		if (has_parallel_hazard(func->funcexpr, false))
+		{
+			ok = false;
+			break;
+		}
+	}
+
+	return ok;
+}
+
/*
* set_plain_rel_pathlist
* Build access paths for a plain relation (no subquery, no inheritance)
set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
{
Relids required_outer;
+ int parallel_threshold = 1000;
/*
* We don't support pushing join clauses into the quals of a seqscan, but
/* Consider sequential scan */
add_path(rel, create_seqscan_path(root, rel, required_outer, 0));
+ /* Consider parallel sequential scan */
+ if (rel->consider_parallel && rel->pages > parallel_threshold &&
+ required_outer == NULL)
+ {
+ Path *path;
+ int parallel_degree = 1;
+
+ /*
+ * Limit the degree of parallelism logarithmically based on the size
+ * of the relation. This probably needs to be a good deal more
+ * sophisticated, but we need something here for now.
+ */
+ while (rel->pages > parallel_threshold * 3 &&
+ parallel_degree < max_parallel_degree)
+ {
+ parallel_degree++;
+ parallel_threshold *= 3;
+ if (parallel_threshold >= PG_INT32_MAX / 3)
+ break;
+ }
+
+ /*
+ * Ideally we should consider postponing the gather operation until
+ * much later, after we've pushed joins and so on atop the parallel
+ * sequential scan path. But we don't have the infrastructure for
+ * that yet, so just do this for now.
+ */
+ path = create_seqscan_path(root, rel, required_outer, parallel_degree);
+ path = (Path *)
+ create_gather_path(root, rel, path, required_outer,
+ parallel_degree);
+ add_path(rel, path);
+ }
+
/* Consider index scans */
create_index_paths(root, rel);
continue;
}
+ /* Copy consider_parallel flag from parent. */
+ childrel->consider_parallel = rel->consider_parallel;
+
/*
* CE failed, so finish copying/modifying targetlist and join quals.
*
/* Parallel setup and communication cost. */
startup_cost += parallel_setup_cost;
- run_cost += parallel_tuple_cost * rel->tuples;
+ run_cost += parallel_tuple_cost * path->path.rows;
path->path.startup_cost = startup_cost;
path->path.total_cost = (startup_cost + run_cost);
*/
#include "postgres.h"
+#include "optimizer/clauses.h"
#include "optimizer/orclauses.h"
#include "optimizer/pathnode.h"
#include "optimizer/paths.h"
/* We need a dummy joinrel to describe the empty set of baserels */
final_rel = build_empty_join_rel(root);
+ /*
+ * If query allows parallelism in general, check whether the quals
+ * are parallel-restricted. There's currently no real benefit to
+ * setting this flag correctly because we can't yet reference subplans
+ * from parallel workers. But that might change someday, so set this
+ * correctly anyway.
+ */
+ if (root->glob->parallelModeOK)
+ final_rel->consider_parallel =
+ !has_parallel_hazard(parse->jointree->quals, false);
+
/* The only path for it is a trivial Result path */
add_path(final_rel, (Path *)
create_result_path((List *) parse->jointree->quals));
/*
* Assess whether it's feasible to use parallel mode for this query.
* We can't do this in a standalone backend, or if the command will
- * try to modify any data, or if this is a cursor operation, or if any
+ * try to modify any data, or if this is a cursor operation, or if
+ * GUCs are set to values that don't permit parallelism, or if
* parallel-unsafe functions are present in the query tree.
*
* For now, we don't try to use parallel mode if we're running inside
glob->parallelModeOK = (cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 &&
IsUnderPostmaster && dynamic_shared_memory_type != DSM_IMPL_NONE &&
parse->commandType == CMD_SELECT && !parse->hasModifyingCTE &&
- parse->utilityStmt == NULL && !IsParallelWorker() &&
- !IsolationIsSerializable() &&
- !contain_parallel_unsafe((Node *) parse);
+ parse->utilityStmt == NULL && max_parallel_degree > 0 &&
+ !IsParallelWorker() && !IsolationIsSerializable() &&
+ !has_parallel_hazard((Node *) parse, true);
/*
* glob->parallelModeOK should tell us whether it's necessary to impose
#include "access/htup_details.h"
#include "catalog/pg_aggregate.h"
+#include "catalog/pg_class.h"
#include "catalog/pg_language.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_proc.h"
char *prosrc;
} inline_error_callback_arg;
+typedef struct
+{
+ bool allow_restricted; /* walker context for has_parallel_hazard: if true,
+ * only parallel-unsafe functions are hazards;
+ * if false, anything not parallel-safe is */
+} has_parallel_hazard_arg;
+
static bool contain_agg_clause_walker(Node *node, void *context);
static bool count_agg_clauses_walker(Node *node,
count_agg_clauses_context *context);
static bool contain_mutable_functions_walker(Node *node, void *context);
static bool contain_volatile_functions_walker(Node *node, void *context);
static bool contain_volatile_functions_not_nextval_walker(Node *node, void *context);
-static bool contain_parallel_unsafe_walker(Node *node, void *context);
+static bool has_parallel_hazard_walker(Node *node,
+ has_parallel_hazard_arg *context);
+static bool parallel_too_dangerous(char proparallel,
+ has_parallel_hazard_arg *context);
+static bool typeid_is_temp(Oid typeid);
static bool contain_nonstrict_functions_walker(Node *node, void *context);
static bool contain_leaked_vars_walker(Node *node, void *context);
static Relids find_nonnullable_rels_walker(Node *node, bool top_level);
}
/*****************************************************************************
- * Check queries for parallel-unsafe constructs
+ * Check queries for parallel unsafe and/or restricted constructs
*****************************************************************************/
+/*
+ * Check whether a node tree contains parallel hazards. This is used both
+ * on the entire query tree, to see whether the query can be parallelized at
+ * all, and also to evaluate whether a particular expression is safe to
+ * run in a parallel worker. We could separate these concerns into two
+ * different functions, but there's enough overlap that it doesn't seem
+ * worthwhile.
+ *
+ * If allow_restricted is true, only parallel-unsafe functions are treated
+ * as hazards. If it is false, functions not marked parallel-safe are
+ * hazards too, as are subplan references and Params.
+ *
+ * Returns true if a hazard was found, false otherwise.
+ */
bool
-contain_parallel_unsafe(Node *node)
+has_parallel_hazard(Node *node, bool allow_restricted)
{
- return contain_parallel_unsafe_walker(node, NULL);
+ has_parallel_hazard_arg context;
+
+ context.allow_restricted = allow_restricted;
+ return has_parallel_hazard_walker(node, &context);
}
static bool
-contain_parallel_unsafe_walker(Node *node, void *context)
+has_parallel_hazard_walker(Node *node, has_parallel_hazard_arg *context)
{
if (node == NULL)
return false;
+
+ /*
+ * When we're first invoked on a completely unplanned tree, we must
+ * recurse through Query objects so as to locate parallel-unsafe
+ * constructs anywhere in the tree.
+ *
+ * Later, we'll be called again for specific quals, possibly after
+ * some planning has been done, and we may then encounter SubPlan,
+ * SubLink, or AlternativeSubPlan nodes. Currently, there's no need to
+ * recurse through these; they can't be unsafe, since we've already
+ * cleared the entire query of unsafe operations, and they're
+ * definitely parallel-restricted.
+ */
+ if (IsA(node, Query))
+ {
+ Query *query = (Query *) node;
+
+ if (query->rowMarks != NULL)
+ return true;
+
+ /* Recurse into subselects */
+ return query_tree_walker(query,
+ has_parallel_hazard_walker,
+ context, 0);
+ }
+ else if (IsA(node, SubPlan) || IsA(node, SubLink) ||
+ IsA(node, AlternativeSubPlan) || IsA(node, Param))
+ {
+ /*
+ * Since we don't have the ability to push subplans down to workers
+ * at present, we treat subplan references as parallel-restricted.
+ */
+ if (!context->allow_restricted)
+ return true;
+ }
+
+ /* This is just a notational convenience for callers. */
+ if (IsA(node, RestrictInfo))
+ {
+ RestrictInfo *rinfo = (RestrictInfo *) node;
+ return has_parallel_hazard_walker((Node *) rinfo->clause, context);
+ }
+
+ /*
+ * It is an error for a parallel worker to touch a temporary table in any
+ * way, so we can't handle nodes whose type is the rowtype of such a table.
+ */
+ if (!context->allow_restricted)
+ {
+ switch (nodeTag(node))
+ {
+ case T_Var:
+ case T_Const:
+ case T_Param:
+ case T_Aggref:
+ case T_WindowFunc:
+ case T_ArrayRef:
+ case T_FuncExpr:
+ case T_NamedArgExpr:
+ case T_OpExpr:
+ case T_DistinctExpr:
+ case T_NullIfExpr:
+ case T_FieldSelect:
+ case T_FieldStore:
+ case T_RelabelType:
+ case T_CoerceViaIO:
+ case T_ArrayCoerceExpr:
+ case T_ConvertRowtypeExpr:
+ case T_CaseExpr:
+ case T_CaseTestExpr:
+ case T_ArrayExpr:
+ case T_RowExpr:
+ case T_CoalesceExpr:
+ case T_MinMaxExpr:
+ case T_CoerceToDomain:
+ case T_CoerceToDomainValue:
+ case T_SetToDefault:
+ if (typeid_is_temp(exprType(node)))
+ return true;
+ break;
+ default:
+ break;
+ }
+ }
+
+ /*
+ * For each node that might potentially call a function, we need to
+ * examine the pg_proc.proparallel marking for that function to see
+ * whether it's safe enough for the current value of allow_restricted.
+ */
if (IsA(node, FuncExpr))
{
FuncExpr *expr = (FuncExpr *) node;
- if (func_parallel(expr->funcid) == PROPARALLEL_UNSAFE)
+ if (parallel_too_dangerous(func_parallel(expr->funcid), context))
return true;
- /* else fall through to check args */
}
else if (IsA(node, OpExpr))
{
OpExpr *expr = (OpExpr *) node;
set_opfuncid(expr);
- if (func_parallel(expr->opfuncid) == PROPARALLEL_UNSAFE)
+ if (parallel_too_dangerous(func_parallel(expr->opfuncid), context))
return true;
- /* else fall through to check args */
}
else if (IsA(node, DistinctExpr))
{
DistinctExpr *expr = (DistinctExpr *) node;
set_opfuncid((OpExpr *) expr); /* rely on struct equivalence */
- if (func_parallel(expr->opfuncid) == PROPARALLEL_UNSAFE)
+ if (parallel_too_dangerous(func_parallel(expr->opfuncid), context))
return true;
- /* else fall through to check args */
}
else if (IsA(node, NullIfExpr))
{
NullIfExpr *expr = (NullIfExpr *) node;
set_opfuncid((OpExpr *) expr); /* rely on struct equivalence */
- if (func_parallel(expr->opfuncid) == PROPARALLEL_UNSAFE)
+ if (parallel_too_dangerous(func_parallel(expr->opfuncid), context))
return true;
- /* else fall through to check args */
}
else if (IsA(node, ScalarArrayOpExpr))
{
ScalarArrayOpExpr *expr = (ScalarArrayOpExpr *) node;
set_sa_opfuncid(expr);
- if (func_parallel(expr->opfuncid) == PROPARALLEL_UNSAFE)
+ if (parallel_too_dangerous(func_parallel(expr->opfuncid), context))
return true;
- /* else fall through to check args */
}
else if (IsA(node, CoerceViaIO))
{
/* check the result type's input function */
getTypeInputInfo(expr->resulttype,
&iofunc, &typioparam);
- if (func_parallel(iofunc) == PROPARALLEL_UNSAFE)
+ if (parallel_too_dangerous(func_parallel(iofunc), context))
return true;
/* check the input type's output function */
getTypeOutputInfo(exprType((Node *) expr->arg),
&iofunc, &typisvarlena);
- if (func_parallel(iofunc) == PROPARALLEL_UNSAFE)
+ if (parallel_too_dangerous(func_parallel(iofunc), context))
return true;
- /* else fall through to check args */
}
else if (IsA(node, ArrayCoerceExpr))
{
ArrayCoerceExpr *expr = (ArrayCoerceExpr *) node;
if (OidIsValid(expr->elemfuncid) &&
- func_parallel(expr->elemfuncid) == PROPARALLEL_UNSAFE)
+ parallel_too_dangerous(func_parallel(expr->elemfuncid), context))
return true;
- /* else fall through to check args */
}
else if (IsA(node, RowCompareExpr))
{
- /* RowCompare probably can't have volatile ops, but check anyway */
RowCompareExpr *rcexpr = (RowCompareExpr *) node;
ListCell *opid;
foreach(opid, rcexpr->opnos)
{
- if (op_volatile(lfirst_oid(opid)) == PROPARALLEL_UNSAFE)
+ Oid opfuncid = get_opcode(lfirst_oid(opid));
+ if (parallel_too_dangerous(func_parallel(opfuncid), context))
return true;
}
- /* else fall through to check args */
}
- else if (IsA(node, Query))
- {
- Query *query = (Query *) node;
- if (query->rowMarks != NULL)
- return true;
-
- /* Recurse into subselects */
- return query_tree_walker(query,
- contain_parallel_unsafe_walker,
- context, 0);
- }
+ /* ... and recurse to check substructure */
return expression_tree_walker(node,
- contain_parallel_unsafe_walker,
+ has_parallel_hazard_walker,
context);
}
+/*
+ * parallel_too_dangerous
+ *	  Decide whether a pg_proc.proparallel marking is a hazard under the
+ *	  current walker context.
+ *
+ * A parallel-safe marking is never a hazard and a parallel-unsafe one
+ * always is.  Any other marking is a hazard only when the caller did not
+ * allow restricted operations.
+ */
+static bool
+parallel_too_dangerous(char proparallel, has_parallel_hazard_arg *context)
+{
+	bool		restricted_ok = context->allow_restricted;
+
+	if (proparallel == PROPARALLEL_SAFE)
+		return false;
+	if (proparallel == PROPARALLEL_UNSAFE)
+		return true;
+
+	/* Neither safe nor unsafe: acceptable only if restricted is allowed. */
+	return !restricted_ok;
+}
+
+/*
+ * typeid_is_temp
+ *	  Report whether the given type is the rowtype of a temporary relation.
+ *
+ * Types with no associated relation (get_typ_typrelid yields an invalid
+ * OID) are never considered temporary; the relation's persistence is only
+ * looked up when such a relation exists.
+ */
+static bool
+typeid_is_temp(Oid typeid)
+{
+	Oid			typrelid = get_typ_typrelid(typeid);
+
+	return OidIsValid(typrelid) &&
+		get_rel_persistence(typrelid) == RELPERSISTENCE_TEMP;
+}
+
/*****************************************************************************
* Check clauses for nonstrict functions
*****************************************************************************/
*/
#include "postgres.h"
+#include "optimizer/clauses.h"
#include "optimizer/cost.h"
#include "optimizer/pathnode.h"
#include "optimizer/paths.h"
/* cheap startup cost is interesting iff not all tuples to be retrieved */
rel->consider_startup = (root->tuple_fraction > 0);
rel->consider_param_startup = false; /* might get changed later */
+ rel->consider_parallel = false; /* might get changed later */
rel->reltargetlist = NIL;
rel->pathlist = NIL;
rel->ppilist = NIL;
/* cheap startup cost is interesting iff not all tuples to be retrieved */
joinrel->consider_startup = (root->tuple_fraction > 0);
joinrel->consider_param_startup = false;
+ joinrel->consider_parallel = false;
joinrel->reltargetlist = NIL;
joinrel->pathlist = NIL;
joinrel->ppilist = NIL;
set_joinrel_size_estimates(root, joinrel, outer_rel, inner_rel,
sjinfo, restrictlist);
+ /*
+ * Set the consider_parallel flag if this joinrel could potentially be
+ * scanned within a parallel worker. If this flag is false for either
+ * inner_rel or outer_rel, then it must be false for the joinrel also.
+ * Even if both are true, there might be parallel-restricted quals at
+ * our level.
+ *
+ * Note that if there are more than two rels in this relation, they
+ * could be divided between inner_rel and outer_rel in any arbitrary
+ * way. We assume this doesn't matter, because we should hit all the
+ * same baserels and joinclauses while building up to this joinrel no
+ * matter which we take; therefore, we should make the same decision
+ * here however we get here.
+ */
+ if (inner_rel->consider_parallel && outer_rel->consider_parallel &&
+ !has_parallel_hazard((Node *) restrictlist, false))
+ joinrel->consider_parallel = true;
+
/*
* Add the joinrel to the query's joinrel list, and store it into the
* auxiliary hashtable if there is one. NB: GEQO requires us to append
return InvalidOid;
}
+/*
+ * get_rel_persistence
+ *
+ *		Looks up the pg_class entry for the given relation in the syscache
+ *		and returns its relpersistence value.
+ *
+ *		Raises an error if no entry is found for relid.
+ */
+char
+get_rel_persistence(Oid relid)
+{
+	HeapTuple	classtup = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
+	char		persistence;
+
+	if (!HeapTupleIsValid(classtup))
+		elog(ERROR, "cache lookup failed for relation %u", relid);
+
+	/* Copy the value out before releasing our reference to the tuple. */
+	persistence = ((Form_pg_class) GETSTRUCT(classtup))->relpersistence;
+	ReleaseSysCache(classtup);
+
+	return persistence;
+}
+
/* ---------- TRANSFORM CACHE ---------- */
/* per-relation planner control flags */
bool consider_startup; /* keep cheap-startup-cost paths? */
bool consider_param_startup; /* ditto, for parameterized paths? */
+ bool consider_parallel; /* consider parallel paths? */
/* materialization information */
List *reltargetlist; /* Vars to be output by scan of relation */
extern bool contain_mutable_functions(Node *clause);
extern bool contain_volatile_functions(Node *clause);
extern bool contain_volatile_functions_not_nextval(Node *clause);
-extern bool contain_parallel_unsafe(Node *node);
+extern bool has_parallel_hazard(Node *node, bool allow_restricted);
extern bool contain_nonstrict_functions(Node *clause);
extern bool contain_leaked_vars(Node *clause);
extern Oid get_rel_type_id(Oid relid);
extern char get_rel_relkind(Oid relid);
extern Oid get_rel_tablespace(Oid relid);
+extern char get_rel_persistence(Oid relid);
extern Oid get_transform_fromsql(Oid typid, Oid langid, List *trftypes);
extern Oid get_transform_tosql(Oid typid, Oid langid, List *trftypes);
extern bool get_typisdefined(Oid typid);