From 41ea0c23761ca108e2f08f6e3151e3cb1f9652a1 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Tue, 5 Apr 2016 16:06:15 -0400 Subject: [PATCH] Fix parallel-safety code for parallel aggregation. has_parallel_hazard() was ignoring the proparallel markings for aggregates, which is no good. Fix that. There was no way to mark an aggregate as actually being parallel-safe, either, so add a PARALLEL option to CREATE AGGREGATE. Patch by me, reviewed by David Rowley. --- doc/src/sgml/ref/create_aggregate.sgml | 9 ++++++++ src/backend/catalog/pg_aggregate.c | 5 +++-- src/backend/commands/aggregatecmds.c | 21 ++++++++++++++++++- src/backend/commands/functioncmds.c | 5 ++--- src/backend/optimizer/util/clauses.c | 7 +++++++ src/include/catalog/pg_aggregate.h | 3 ++- .../regress/expected/create_aggregate.out | 12 +++++++++-- src/test/regress/sql/create_aggregate.sql | 12 +++++++++-- 8 files changed, 63 insertions(+), 11 deletions(-) diff --git a/doc/src/sgml/ref/create_aggregate.sgml b/doc/src/sgml/ref/create_aggregate.sgml index 7a6f8a97fd..3df330393d 100644 --- a/doc/src/sgml/ref/create_aggregate.sgml +++ b/doc/src/sgml/ref/create_aggregate.sgml @@ -40,6 +40,7 @@ CREATE AGGREGATE name ( [ minitial_condition ] [ , SORTOP = sort_operator ] + [ , PARALLEL = { SAFE | RESTRICTED | UNSAFE } ] ) CREATE AGGREGATE name ( [ [ argmode ] [ argname ] arg_data_type [ , ... ] ] @@ -55,6 +56,8 @@ CREATE AGGREGATE name ( [ [ serialtype ] [ , INITCOND = initial_condition ] [ , HYPOTHETICAL ] + [ , PARALLEL = { SAFE | RESTRICTED | UNSAFE } ] + ) or the old syntax @@ -684,6 +687,12 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1; Currently, ordered-set aggregates do not need to support moving-aggregate mode, since they cannot be used as window functions. + + + The meaning of PARALLEL SAFE, PARALLEL RESTRICTED, + and PARALLEL UNSAFE is the same as for + . + diff --git a/src/backend/catalog/pg_aggregate.c b/src/backend/catalog/pg_aggregate.c index b420349835..bcc941104f 100644 --- a/src/backend/catalog/pg_aggregate.c +++ b/src/backend/catalog/pg_aggregate.c @@ -72,7 +72,8 @@ AggregateCreate(const char *aggName, Oid aggmTransType, int32 aggmTransSpace, const char *agginitval, - const char *aggminitval) + const char *aggminitval, + char proparallel) { Relation aggdesc; HeapTuple tup; @@ -622,7 +623,7 @@ AggregateCreate(const char *aggName, false, /* isStrict (not needed for agg) */ PROVOLATILE_IMMUTABLE, /* volatility (not * needed for agg) */ - PROPARALLEL_UNSAFE, + proparallel, parameterTypes, /* paramTypes */ allParameterTypes, /* allParamTypes */ parameterModes, /* parameterModes */ diff --git a/src/backend/commands/aggregatecmds.c b/src/backend/commands/aggregatecmds.c index 3424f842b9..5c4d576b86 100644 --- a/src/backend/commands/aggregatecmds.c +++ b/src/backend/commands/aggregatecmds.c @@ -78,6 +78,7 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, int32 mtransSpace = 0; char *initval = NULL; char *minitval = NULL; + char *parallel = NULL; int numArgs; int numDirectArgs = 0; oidvector *parameterTypes; @@ -91,6 +92,7 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, Oid mtransTypeId = InvalidOid; char transTypeType; char mtransTypeType = 0; + char proparallel = PROPARALLEL_UNSAFE; ListCell *pl; /* Convert list of names to a name and namespace */ @@ -178,6 +180,8 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, initval = defGetString(defel); else if (pg_strcasecmp(defel->defname, "minitcond") == 0) minitval = defGetString(defel); + else if (pg_strcasecmp(defel->defname, "parallel") == 0) + parallel = defGetString(defel); else ereport(WARNING, (errcode(ERRCODE_SYNTAX_ERROR), @@ -449,6 +453,20 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, (void) OidInputFunctionCall(typinput, minitval, typioparam, -1); } + if (parallel) + { + if (pg_strcasecmp(parallel, "safe") == 0) + proparallel = PROPARALLEL_SAFE; + else if (pg_strcasecmp(parallel, "restricted") == 0) + proparallel = PROPARALLEL_RESTRICTED; + else if (pg_strcasecmp(parallel, "unsafe") == 0) + proparallel = PROPARALLEL_UNSAFE; + else + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("parameter \"parallel\" must be SAFE, RESTRICTED, or UNSAFE"))); + } + /* * Most of the argument-checking is done inside of AggregateCreate */ @@ -480,5 +498,6 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, mtransTypeId, /* transition data type */ mtransSpace, /* transition space */ initval, /* initial condition */ - minitval); /* initial condition */ + minitval, /* initial condition */ + proparallel); /* parallel safe? */ } diff --git a/src/backend/commands/functioncmds.c b/src/backend/commands/functioncmds.c index a745d73c7a..748c8f75d4 100644 --- a/src/backend/commands/functioncmds.c +++ b/src/backend/commands/functioncmds.c @@ -566,9 +566,8 @@ interpret_func_parallel(DefElem *defel) else { ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("parallel option \"%s\" not recognized", - str))); + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("parameter \"parallel\" must be SAFE, RESTRICTED, or UNSAFE"))); return PROPARALLEL_UNSAFE; /* keep compiler quiet */ } } diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index c615717dea..5674a73dfe 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -1419,6 +1419,13 @@ has_parallel_hazard_walker(Node *node, has_parallel_hazard_arg *context) if (parallel_too_dangerous(func_parallel(expr->funcid), context)) return true; } + else if (IsA(node, Aggref)) + { + Aggref *aggref = (Aggref *) node; + + if (parallel_too_dangerous(func_parallel(aggref->aggfnoid), context)) + return true; + } else if (IsA(node, OpExpr)) { OpExpr *expr = (OpExpr *) node; diff --git a/src/include/catalog/pg_aggregate.h b/src/include/catalog/pg_aggregate.h index 101d073a04..7d5015a1cf 100644 --- a/src/include/catalog/pg_aggregate.h +++ b/src/include/catalog/pg_aggregate.h @@ -349,6 +349,7 @@ extern ObjectAddress AggregateCreate(const char *aggName, Oid aggmTransType, int32 aggmTransSpace, const char *agginitval, - const char *aggminitval); + const char *aggminitval, + char proparallel); #endif /* PG_AGGREGATE_H */ diff --git a/src/test/regress/expected/create_aggregate.out b/src/test/regress/expected/create_aggregate.out index dac26982bc..1aba0c6266 100644 --- a/src/test/regress/expected/create_aggregate.out +++ b/src/test/regress/expected/create_aggregate.out @@ -20,9 +20,9 @@ CREATE AGGREGATE newsum ( -- zero-argument aggregate CREATE AGGREGATE newcnt (*) ( sfunc = int8inc, stype = int8, - initcond = '0' + initcond = '0', parallel = safe ); --- old-style spelling of same +-- old-style spelling of same (except without parallel-safe; that's too new) CREATE AGGREGATE oldcnt ( sfunc = int8inc, basetype = 'ANY', stype = int8, initcond = '0' @@ -188,6 +188,14 @@ WHERE aggfnoid = 'myavg'::REGPROC; (1 row) DROP AGGREGATE myavg (numeric); +-- invalid: bad parallel-safety marking +CREATE AGGREGATE mysum (int) +( + stype = int, + sfunc = int4pl, + parallel = pear +); +ERROR: parameter "parallel" must be SAFE, RESTRICTED, or UNSAFE -- invalid: nonstrict inverse with strict forward function CREATE FUNCTION float8mi_n(float8, float8) RETURNS float8 AS $$ SELECT $1 - $2; $$ diff --git a/src/test/regress/sql/create_aggregate.sql b/src/test/regress/sql/create_aggregate.sql index a7da31e594..c98c154a82 100644 --- a/src/test/regress/sql/create_aggregate.sql +++ b/src/test/regress/sql/create_aggregate.sql @@ -23,10 +23,10 @@ CREATE AGGREGATE newsum ( -- zero-argument aggregate CREATE AGGREGATE newcnt (*) ( sfunc = int8inc, stype = int8, - initcond = '0' + initcond = '0', parallel = safe ); --- old-style spelling of same +-- old-style spelling of same (except without parallel-safe; that's too new) CREATE AGGREGATE oldcnt ( sfunc = int8inc, basetype = 'ANY', stype = int8, initcond = '0' @@ -201,6 +201,14 @@ WHERE aggfnoid = 'myavg'::REGPROC; DROP AGGREGATE myavg (numeric); +-- invalid: bad parallel-safety marking +CREATE AGGREGATE mysum (int) +( + stype = int, + sfunc = int4pl, + parallel = pear +); + -- invalid: nonstrict inverse with strict forward function CREATE FUNCTION float8mi_n(float8, float8) RETURNS float8 AS -- 2.40.0