From a9d9acbf219b9e96585779cd5f99d674d4ccba74 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Sat, 12 Apr 2014 11:58:53 -0400 Subject: [PATCH] Create infrastructure for moving-aggregate optimization. Until now, when executing an aggregate function as a window function within a window with moving frame start (that is, any frame start mode except UNBOUNDED PRECEDING), we had to recalculate the aggregate from scratch each time the frame head moved. This patch allows an aggregate definition to include an alternate "moving aggregate" implementation that includes an inverse transition function for removing rows from the aggregate's running state. As long as this can be done successfully, runtime is proportional to the total number of input rows, rather than to the number of input rows times the average frame length. This commit includes the core infrastructure, documentation, and regression tests using user-defined aggregates. Follow-on commits will update some of the built-in aggregates to use this feature. David Rowley and Florian Pflug, reviewed by Dean Rasheed; additional hacking by me --- doc/src/sgml/catalogs.sgml | 43 ++ doc/src/sgml/ref/create_aggregate.sgml | 153 ++++- doc/src/sgml/xaggr.sgml | 197 +++++- src/backend/catalog/pg_aggregate.c | 241 ++++++- src/backend/commands/aggregatecmds.c | 101 ++- src/backend/executor/nodeAgg.c | 13 +- src/backend/executor/nodeWindowAgg.c | 639 +++++++++++++++--- src/backend/optimizer/util/clauses.c | 6 +- src/backend/parser/parse_agg.c | 32 +- src/bin/pg_dump/pg_dump.c | 68 ++ src/include/catalog/catversion.h | 2 +- src/include/catalog/pg_aggregate.h | 302 +++++---- src/include/nodes/execnodes.h | 3 +- src/include/parser/parse_agg.h | 2 + .../regress/expected/create_aggregate.out | 35 + src/test/regress/expected/opr_sanity.out | 122 +++- src/test/regress/expected/window.out | 223 ++++++ src/test/regress/sql/create_aggregate.sql | 41 ++ src/test/regress/sql/opr_sanity.sql | 103 ++- src/test/regress/sql/window.sql | 192 ++++++ 20 files changed, 2228 insertions(+), 290 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 0069573c45..c174e672ad 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -386,6 +386,24 @@ pg_proc.oid Final function (zero if none) + + aggmtransfn + regproc + pg_proc.oid + Forward transition function for moving-aggregate mode (zero if none) + + + aggminvtransfn + regproc + pg_proc.oid + Inverse transition function for moving-aggregate mode (zero if none) + + + aggmfinalfn + regproc + pg_proc.oid + Final function for moving-aggregate mode (zero if none) + aggsortop oid @@ -405,6 +423,20 @@ Approximate average size (in bytes) of the transition state data, or zero to use a default estimate + + aggmtranstype + oid + pg_type.oid + Data type of the aggregate function's internal transition (state) + data for moving-aggregate mode (zero if none) + + + aggmtransspace + int4 + + Approximate average size (in bytes) of the transition state data + for moving-aggregate mode, or zero to use a default estimate + agginitval text @@ -416,6 +448,17 @@ value starts out null. + + aggminitval + text + + + The initial value of the transition state for moving-aggregate mode. + This is a text field containing the initial value in its external + string representation. If this field is null, the transition state + value starts out null. + + diff --git a/doc/src/sgml/ref/create_aggregate.sgml b/doc/src/sgml/ref/create_aggregate.sgml index e5fc718654..268acf3e84 100644 --- a/doc/src/sgml/ref/create_aggregate.sgml +++ b/doc/src/sgml/ref/create_aggregate.sgml @@ -27,6 +27,12 @@ CREATE AGGREGATE name ( [ state_data_size ] [ , FINALFUNC = ffunc ] [ , INITCOND = initial_condition ] + [ , MSFUNC = msfunc ] + [ , MINVFUNC = minvfunc ] + [ , MSTYPE = mstate_data_type ] + [ , MSSPACE = mstate_data_size ] + [ , MFINALFUNC = mffunc ] + [ , MINITCOND = minitial_condition ] [ , SORTOP = sort_operator ] ) @@ -49,6 +55,12 @@ CREATE AGGREGATE name ( [ , SSPACE = state_data_size ] [ , FINALFUNC = ffunc ] [ , INITCOND = initial_condition ] + [ , MSFUNC = sfunc ] + [ , MINVFUNC = invfunc ] + [ , MSTYPE = state_data_type ] + [ , MSSPACE = state_data_size ] + [ , MFINALFUNC = ffunc ] + [ , MINITCOND = initial_condition ] [ , SORTOP = sort_operator ] ) @@ -84,7 +96,7 @@ CREATE AGGREGATE name ( - An aggregate function is made from one or two ordinary + A simple aggregate function is made from one or two ordinary functions: a state transition function sfunc, @@ -126,7 +138,7 @@ CREATE AGGREGATE name ( values are ignored (the function is not called and the previous state value is retained). If the initial state value is null, then at the first row with all-nonnull input values, the first argument value replaces the state - value, and the transition function is invoked at subsequent rows with + value, and the transition function is invoked at each subsequent row with all-nonnull input values. This is handy for implementing aggregates like max. Note that this behavior is only available when @@ -154,6 +166,18 @@ CREATE AGGREGATE name ( input rows. + + An aggregate can optionally support moving-aggregate mode, + as described in . This requires + specifying the MSFUNC, MINVFUNC, + and MSTYPE parameters, and optionally + the MSPACE, MFINALFUNC, + and MINITCOND parameters. Except for MINVFUNC, + these parameters work like the corresponding simple-aggregate parameters + without M; they define a separate implementation of the + aggregate that includes an inverse transition function. + + The syntax with ORDER BY in the parameter list creates a special type of aggregate called an ordered-set @@ -197,8 +221,8 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1; To be able to create an aggregate function, you must have USAGE privilege on the argument types, the state - type, and the return type, as well as EXECUTE privilege - on the transition and final functions. + type(s), and the return type, as well as EXECUTE + privilege on the transition and final functions. @@ -359,6 +383,79 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1; + + msfunc + + + The name of the forward state transition function to be called for each + input row in moving-aggregate mode. This is exactly like the regular + transition function, except that its first argument and result are of + type mstate_data_type, which might be different + from state_data_type. + + + + + + minvfunc + + + The name of the inverse state transition function to be used in + moving-aggregate mode. This function has the same argument and + result types as msfunc, but it is used to remove + a value from the current aggregate state, rather than add a value to + it. The inverse transition function must have the same strictness + attribute as the forward state transition function. + + + + + + mstate_data_type + + + The data type for the aggregate's state value, when using + moving-aggregate mode. + + + + + + mstate_data_size + + + The approximate average size (in bytes) of the aggregate's state + value, when using moving-aggregate mode. This works the same as + state_data_size. + + + + + + mffunc + + + The name of the final function called to compute the aggregate's + result after all input rows have been traversed, when using + moving-aggregate mode. This works the same as ffunc, + except that its input type is mstate_data_type. + The aggregate result type determined by mffunc + and mstate_data_type must match that determined by the + aggregate's regular implementation. + + + + + + minitial_condition + + + The initial setting for the state value, when using moving-aggregate + mode. This works the same as initial_condition. + + + + sort_operator @@ -397,6 +494,49 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1; Notes + + If an aggregate supports moving-aggregate mode, it will improve + calculation efficiency when the aggregate is used as a window function + for a window with moving frame start (that is, a frame start mode other + than UNBOUNDED PRECEDING). Conceptually, the forward + transition function adds input values to the aggregate's state when + they enter the window frame from the bottom, and the inverse transition + function removes them again when they leave the frame at the top. So, + when values are removed, they are always removed in the same order they + were added. Whenever the inverse transition function is invoked, it will + thus receive the earliest added but not yet removed argument value(s). + The inverse transition function can assume that at least one row will + remain in the current state after it removes the oldest row. (When this + would not be the case, the window function mechanism simply starts a + fresh aggregation, rather than using the inverse transition function.) + + + + The forward transition function for moving-aggregate mode is not + allowed to return NULL as the new state value. If the inverse + transition function returns NULL, this is taken as an indication that + the inverse function cannot reverse the state calculation for this + particular input, and so the aggregate calculation will be redone from + scratch for the current frame starting position. This convention + allows moving-aggregate mode to be used in situations where there are + some infrequent cases that are impractical to reverse out of the + running state value. + + + + If no moving-aggregate implementation is supplied, + the aggregate can still be used with moving frames, + but PostgreSQL will recompute the whole + aggregation whenever the start of the frame moves. + Note that whether or not the aggregate supports moving-aggregate + mode, PostgreSQL can handle a moving frame + end without recalculation; this is done by continuing to add new values + to the aggregate's state. It is assumed that the final function does + not damage the aggregate's state value, so that the aggregation can be + continued even after an aggregate result value has been obtained for + one set of frame boundaries. + + The syntax for ordered-set aggregates allows VARIADIC to be specified for both the last direct parameter and the last @@ -415,6 +555,11 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1; ones; any preceding parameters represent additional direct arguments that are not constrained to match the aggregated arguments. + + + Currently, ordered-set aggregates do not need to support + moving-aggregate mode, since they cannot be used as window functions. + diff --git a/doc/src/sgml/xaggr.sgml b/doc/src/sgml/xaggr.sgml index e77ef12e5c..cbbb051911 100644 --- a/doc/src/sgml/xaggr.sgml +++ b/doc/src/sgml/xaggr.sgml @@ -131,6 +131,161 @@ CREATE AGGREGATE avg (float8) + + Aggregate function calls in SQL allow DISTINCT + and ORDER BY options that control which rows are fed + to the aggregate's transition function and in what order. These + options are implemented behind the scenes and are not the concern + of the aggregate's support functions. + + + + For further details see the + + command. + + + + Moving-Aggregate Mode + + + moving-aggregate mode + + + + aggregate function + moving aggregate + + + + Aggregate functions can optionally support moving-aggregate + mode, which allows substantially faster execution of aggregate + functions within windows with moving frame starting points. + (See + and for information about use of + aggregate functions as window functions.) + The basic idea is that in addition to a normal forward + transition function, the aggregate provides an inverse + transition function, which allows rows to be removed from the + aggregate's running state value when they exit the window frame. + For example a sum aggregate, which uses addition as the + forward transition function, would use subtraction as the inverse + transition function. Without an inverse transition function, the window + function mechanism must recalculate the aggregate from scratch each time + the frame starting point moves, resulting in run time proportional to the + number of input rows times the average frame length. With an inverse + transition function, the run time is only proportional to the number of + input rows. + + + + The inverse transition function is passed the current state value and the + aggregate input value(s) for the earliest row included in the current + state. It must reconstruct what the state value would have been if the + given input value had never been aggregated, but only the rows following + it. This sometimes requires that the forward transition function keep + more state than is needed for plain aggregation mode. Therefore, the + moving-aggregate mode uses a completely separate implementation from the + plain mode: it has its own state data type, its own forward transition + function, and its own final function if needed. These can be the same as + the plain mode's data type and functions, if there is no need for extra + state. + + + + As an example, we could extend the sum aggregate given above + to support moving-aggregate mode like this: + + +CREATE AGGREGATE sum (complex) +( + sfunc = complex_add, + stype = complex, + initcond = '(0,0)', + msfunc = complex_add, + minvfunc = complex_sub, + mstype = complex, + minitcond = '(0,0)' +); + + + The parameters whose names begin with m define the + moving-aggregate implementation. Except for the inverse transition + function minvfunc, they correspond to the plain-aggregate + parameters without m. + + + + The forward transition function for moving-aggregate mode is not allowed + to return NULL as the new state value. If the inverse transition + function returns NULL, this is taken as an indication that the inverse + function cannot reverse the state calculation for this particular input, + and so the aggregate calculation will be redone from scratch for the + current frame starting position. This convention allows moving-aggregate + mode to be used in situations where there are some infrequent cases that + are impractical to reverse out of the running state value. The inverse + transition function can punt on these cases, and yet still come + out ahead so long as it can work for most cases. As an example, an + aggregate working with floating-point numbers might choose to punt when + a NaN (not a number) input has to be removed from the running + state value. + + + + When writing moving-aggregate support functions, it is important to be + sure that the inverse transition function can reconstruct the correct + state value exactly. Otherwise there might be user-visible differences + in results depending on whether the moving-aggregate mode is used. + An example of an aggregate for which adding an inverse transition + function seems easy at first, yet where this requirement cannot be met + is sum over float4 or float8 inputs. A + naive declaration of sum(float8) could be + + +CREATE AGGREGATE unsafe_sum (float8) +( + stype = float8, + sfunc = float8pl, + mstype = float8, + msfunc = float8pl, + minvfunc = float8mi +); + + + This aggregate, however, can give wildly different results than it would + have without the inverse transition function. For example, consider + + +SELECT + unsafe_sum(x) OVER (ORDER BY n ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) +FROM (VALUES (1, 1.0e20::float8), + (2, 1.0::float8)) AS v (n,x); + + + This query returns 0 as its second result, rather than the + expected answer of 1. The cause is the limited precision of + floating-point values: adding 1 to 1e20 results + in 1e20 again, and so subtracting 1e20 from that + yields 0, not 1. Note that this is a limitation + of floating-point arithmetic in general, not a limitation + of PostgreSQL. + + + + + + Polymorphic and Variadic Aggregates + + + aggregate function + polymorphic + + + + aggregate function + variadic + + Aggregate functions can use polymorphic state transition functions or final functions, so that the same functions @@ -189,8 +344,8 @@ SELECT attrelid::regclass, array_accum(atttypid::regtype) by declaring its last argument as a VARIADIC array, in much the same fashion as for regular functions; see . The aggregate's transition - function must have the same array type as its last argument. The - transition function typically would also be marked VARIADIC, + function(s) must have the same array type as their last argument. The + transition function(s) typically would also be marked VARIADIC, but this is not strictly required. @@ -220,13 +375,15 @@ SELECT myaggregate(a, b, c ORDER BY a) FROM ... - - Aggregate function calls in SQL allow DISTINCT - and ORDER BY options that control which rows are fed - to the aggregate's transition function and in what order. These - options are implemented behind the scenes and are not the concern - of the aggregate's support functions. - + + + + Ordered-Set Aggregates + + + aggregate function + ordered set + The aggregates we have been describing so far are normal @@ -311,6 +468,21 @@ SELECT percentile_disc(0.5) WITHIN GROUP (ORDER BY income) FROM households; returns anyelement. + + Currently, ordered-set aggregates cannot be used as window functions, + and therefore there is no need for them to support moving-aggregate mode. + + + + + + Support Functions for Aggregates + + + aggregate function + support functions for + + A function written in C can detect that it is being called as an aggregate transition or final function by calling @@ -341,9 +513,6 @@ if (AggCheckCallContext(fcinfo, NULL)) source code. - - For further details see the - - command. - + + diff --git a/src/backend/catalog/pg_aggregate.c b/src/backend/catalog/pg_aggregate.c index fe6dc8a9a2..633b8f1d6a 100644 --- a/src/backend/catalog/pg_aggregate.c +++ b/src/backend/catalog/pg_aggregate.c @@ -57,10 +57,16 @@ AggregateCreate(const char *aggName, Oid variadicArgType, List *aggtransfnName, List *aggfinalfnName, + List *aggmtransfnName, + List *aggminvtransfnName, + List *aggmfinalfnName, List *aggsortopName, Oid aggTransType, int32 aggTransSpace, - const char *agginitval) + Oid aggmTransType, + int32 aggmTransSpace, + const char *agginitval, + const char *aggminitval) { Relation aggdesc; HeapTuple tup; @@ -69,14 +75,19 @@ AggregateCreate(const char *aggName, Form_pg_proc proc; Oid transfn; Oid finalfn = InvalidOid; /* can be omitted */ + Oid mtransfn = InvalidOid; /* can be omitted */ + Oid minvtransfn = InvalidOid; /* can be omitted */ + Oid mfinalfn = InvalidOid; /* can be omitted */ Oid sortop = InvalidOid; /* can be omitted */ Oid *aggArgTypes = parameterTypes->values; bool hasPolyArg; bool hasInternalArg; + bool mtransIsStrict = false; Oid rettype; Oid finaltype; Oid fnArgs[FUNC_MAX_ARGS]; int nargs_transfn; + int nargs_finalfn; Oid procOid; TupleDesc tupDesc; int i; @@ -128,6 +139,16 @@ AggregateCreate(const char *aggName, errmsg("cannot determine transition data type"), errdetail("An aggregate using a polymorphic transition type must have at least one polymorphic argument."))); + /* + * Likewise for moving-aggregate transtype, if any + */ + if (OidIsValid(aggmTransType) && + IsPolymorphicType(aggmTransType) && !hasPolyArg) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("cannot determine transition data type"), + errdetail("An aggregate using a polymorphic transition type must have at least one polymorphic argument."))); + /* * An ordered-set aggregate that is VARIADIC must be VARIADIC ANY. In * principle we could support regular variadic types, but it would make @@ -234,32 +255,120 @@ AggregateCreate(const char *aggName, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), errmsg("must not omit initial value when transition function is strict and transition type is not compatible with input type"))); } + ReleaseSysCache(tup); - /* handle finalfn, if supplied */ - if (aggfinalfnName) + /* handle moving-aggregate transfn, if supplied */ + if (aggmtransfnName) { - int nargs_finalfn; + /* + * The arguments are the same as for the regular transfn, except that + * the transition data type might be different. So re-use the fnArgs + * values set up above, except for that one. + */ + Assert(OidIsValid(aggmTransType)); + fnArgs[0] = aggmTransType; + + mtransfn = lookup_agg_function(aggmtransfnName, nargs_transfn, + fnArgs, variadicArgType, + &rettype); + + /* As above, return type must exactly match declared mtranstype. */ + if (rettype != aggmTransType) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("return type of transition function %s is not %s", + NameListToString(aggmtransfnName), + format_type_be(aggmTransType)))); + + tup = SearchSysCache1(PROCOID, ObjectIdGetDatum(mtransfn)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for function %u", mtransfn); + proc = (Form_pg_proc) GETSTRUCT(tup); /* - * For ordinary aggs, the finalfn just takes the transtype. For - * ordered-set aggs, it takes the transtype plus all args. (The - * aggregated args are useless at runtime, and are actually passed as - * NULLs, but we may need them in the function signature to allow - * resolution of a polymorphic agg's result type.) + * If the mtransfn is strict and the minitval is NULL, check first + * input type and mtranstype are binary-compatible. */ - fnArgs[0] = aggTransType; - if (AGGKIND_IS_ORDERED_SET(aggKind)) + if (proc->proisstrict && aggminitval == NULL) { - nargs_finalfn = numArgs + 1; - memcpy(fnArgs + 1, aggArgTypes, numArgs * sizeof(Oid)); - } - else - { - nargs_finalfn = 1; - /* variadic-ness of the aggregate doesn't affect finalfn */ - variadicArgType = InvalidOid; + if (numArgs < 1 || + !IsBinaryCoercible(aggArgTypes[0], aggmTransType)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("must not omit initial value when transition function is strict and transition type is not compatible with input type"))); } + + /* Remember if mtransfn is strict; we may need this below */ + mtransIsStrict = proc->proisstrict; + + ReleaseSysCache(tup); + } + + /* handle minvtransfn, if supplied */ + if (aggminvtransfnName) + { + /* + * This must have the same number of arguments with the same types as + * the forward transition function, so just re-use the fnArgs data. + */ + Assert(aggmtransfnName); + + minvtransfn = lookup_agg_function(aggminvtransfnName, nargs_transfn, + fnArgs, variadicArgType, + &rettype); + + /* As above, return type must exactly match declared mtranstype. */ + if (rettype != aggmTransType) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("return type of inverse transition function %s is not %s", + NameListToString(aggminvtransfnName), + format_type_be(aggmTransType)))); + + tup = SearchSysCache1(PROCOID, ObjectIdGetDatum(minvtransfn)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for function %u", minvtransfn); + proc = (Form_pg_proc) GETSTRUCT(tup); + + /* + * We require the strictness settings of the forward and inverse + * transition functions to agree. This saves having to handle + * assorted special cases at execution time. + */ + if (proc->proisstrict != mtransIsStrict) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("strictness of aggregate's forward and inverse transition functions must match"))); + + ReleaseSysCache(tup); + } + + /* + * Set up fnArgs for looking up finalfn(s) + * + * For ordinary aggs, the finalfn just takes the transtype. For + * ordered-set aggs, it takes the transtype plus all args. (The + * aggregated args are useless at runtime, and are actually passed as + * NULLs, but we may need them in the function signature to allow + * resolution of a polymorphic agg's result type.) + */ + fnArgs[0] = aggTransType; + if (AGGKIND_IS_ORDERED_SET(aggKind)) + { + nargs_finalfn = numArgs + 1; + memcpy(fnArgs + 1, aggArgTypes, numArgs * sizeof(Oid)); + } + else + { + nargs_finalfn = 1; + /* variadic-ness of the aggregate doesn't affect finalfn */ + variadicArgType = InvalidOid; + } + + /* handle finalfn, if supplied */ + if (aggfinalfnName) + { finalfn = lookup_agg_function(aggfinalfnName, nargs_finalfn, fnArgs, variadicArgType, &finaltype); @@ -314,6 +423,49 @@ AggregateCreate(const char *aggName, errmsg("unsafe use of pseudo-type \"internal\""), errdetail("A function returning \"internal\" must have at least one \"internal\" argument."))); + /* + * If a moving-aggregate implementation is supplied, look up its finalfn + * if any, and check that the implied aggregate result type matches the + * plain implementation. + */ + if (OidIsValid(aggmTransType)) + { + /* handle finalfn, if supplied */ + if (aggmfinalfnName) + { + /* + * The arguments are the same as for the regular finalfn, except + * that the transition data type might be different. So re-use + * the fnArgs values set up above, except for that one. + */ + fnArgs[0] = aggmTransType; + + mfinalfn = lookup_agg_function(aggmfinalfnName, nargs_finalfn, + fnArgs, variadicArgType, + &rettype); + + /* As above, check strictness if it's an ordered-set agg */ + if (AGGKIND_IS_ORDERED_SET(aggKind) && func_strict(mfinalfn)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("final function of an ordered-set aggregate must not be declared STRICT"))); + } + else + { + /* + * If no finalfn, aggregate result type is type of the state value + */ + rettype = aggmTransType; + } + Assert(OidIsValid(rettype)); + if (rettype != finaltype) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("moving-aggregate implementation returns type %s, but plain implementation returns type %s", + format_type_be(aggmTransType), + format_type_be(aggTransType)))); + } + /* handle sortop, if supplied */ if (aggsortopName) { @@ -340,6 +492,13 @@ AggregateCreate(const char *aggName, if (aclresult != ACLCHECK_OK) aclcheck_error_type(aclresult, aggTransType); + if (OidIsValid(aggmTransType)) + { + aclresult = pg_type_aclcheck(aggmTransType, GetUserId(), ACL_USAGE); + if (aclresult != ACLCHECK_OK) + aclcheck_error_type(aclresult, aggmTransType); + } + aclresult = pg_type_aclcheck(finaltype, GetUserId(), ACL_USAGE); if (aclresult != ACLCHECK_OK) aclcheck_error_type(aclresult, finaltype); @@ -392,13 +551,22 @@ AggregateCreate(const char *aggName, values[Anum_pg_aggregate_aggnumdirectargs - 1] = Int16GetDatum(numDirectArgs); values[Anum_pg_aggregate_aggtransfn - 1] = ObjectIdGetDatum(transfn); values[Anum_pg_aggregate_aggfinalfn - 1] = ObjectIdGetDatum(finalfn); + values[Anum_pg_aggregate_aggmtransfn - 1] = ObjectIdGetDatum(mtransfn); + values[Anum_pg_aggregate_aggminvtransfn - 1] = ObjectIdGetDatum(minvtransfn); + values[Anum_pg_aggregate_aggmfinalfn - 1] = ObjectIdGetDatum(mfinalfn); values[Anum_pg_aggregate_aggsortop - 1] = ObjectIdGetDatum(sortop); values[Anum_pg_aggregate_aggtranstype - 1] = ObjectIdGetDatum(aggTransType); values[Anum_pg_aggregate_aggtransspace - 1] = Int32GetDatum(aggTransSpace); + values[Anum_pg_aggregate_aggmtranstype - 1] = ObjectIdGetDatum(aggmTransType); + values[Anum_pg_aggregate_aggmtransspace - 1] = Int32GetDatum(aggmTransSpace); if (agginitval) values[Anum_pg_aggregate_agginitval - 1] = CStringGetTextDatum(agginitval); else nulls[Anum_pg_aggregate_agginitval - 1] = true; + if (aggminitval) + values[Anum_pg_aggregate_aggminitval - 1] = CStringGetTextDatum(aggminitval); + else + nulls[Anum_pg_aggregate_aggminitval - 1] = true; aggdesc = heap_open(AggregateRelationId, RowExclusiveLock); tupDesc = aggdesc->rd_att; @@ -414,6 +582,7 @@ AggregateCreate(const char *aggName, * Create dependencies for the aggregate (above and beyond those already * made by ProcedureCreate). Note: we don't need an explicit dependency * on aggTransType since we depend on it indirectly through transfn. + * Likewise for aggmTransType if any. */ myself.classId = ProcedureRelationId; myself.objectId = procOid; @@ -434,6 +603,33 @@ AggregateCreate(const char *aggName, recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); } + /* Depends on forward transition function, if any */ + if (OidIsValid(mtransfn)) + { + referenced.classId = ProcedureRelationId; + referenced.objectId = mtransfn; + referenced.objectSubId = 0; + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + } + + /* Depends on inverse transition function, if any */ + if (OidIsValid(minvtransfn)) + { + referenced.classId = ProcedureRelationId; + referenced.objectId = minvtransfn; + referenced.objectSubId = 0; + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + } + + /* Depends on final function, if any */ + if (OidIsValid(mfinalfn)) + { + referenced.classId = ProcedureRelationId; + referenced.objectId = mfinalfn; + referenced.objectSubId = 0; + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + } + /* Depends on sort operator, if any */ if (OidIsValid(sortop)) { @@ -447,7 +643,12 @@ AggregateCreate(const char *aggName, } /* - * lookup_agg_function -- common code for finding both transfn and finalfn + * lookup_agg_function + * common code for finding transfn, invtransfn and finalfn + * + * Returns OID of function, and stores its return type into *rettype + * + * NB: must not scribble on input_types[], as we may re-use those */ static Oid lookup_agg_function(List *fnName, diff --git a/src/backend/commands/aggregatecmds.c b/src/backend/commands/aggregatecmds.c index 640e19cf12..9714112f6d 100644 --- a/src/backend/commands/aggregatecmds.c +++ b/src/backend/commands/aggregatecmds.c @@ -61,11 +61,17 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, char aggKind = AGGKIND_NORMAL; List *transfuncName = NIL; List *finalfuncName = NIL; + List *mtransfuncName = NIL; + List *minvtransfuncName = NIL; + List *mfinalfuncName = NIL; List *sortoperatorName = NIL; TypeName *baseType = NULL; TypeName *transType = NULL; + TypeName *mtransType = NULL; int32 transSpace = 0; + int32 mtransSpace = 0; char *initval = NULL; + char *minitval = NULL; int numArgs; int numDirectArgs = 0; oidvector *parameterTypes; @@ -75,7 +81,9 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, List *parameterDefaults; Oid variadicArgType; Oid transTypeId; + Oid mtransTypeId = InvalidOid; char transTypeType; + char mtransTypeType = 0; ListCell *pl; /* Convert list of names to a name and namespace */ @@ -114,6 +122,12 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, transfuncName = defGetQualifiedName(defel); else if (pg_strcasecmp(defel->defname, "finalfunc") == 0) finalfuncName = defGetQualifiedName(defel); + else if (pg_strcasecmp(defel->defname, "msfunc") == 0) + mtransfuncName = defGetQualifiedName(defel); + else if (pg_strcasecmp(defel->defname, "minvfunc") == 0) + minvtransfuncName = defGetQualifiedName(defel); + else if (pg_strcasecmp(defel->defname, "mfinalfunc") == 0) + mfinalfuncName = defGetQualifiedName(defel); else if (pg_strcasecmp(defel->defname, "sortop") == 0) sortoperatorName = defGetQualifiedName(defel); else if (pg_strcasecmp(defel->defname, "basetype") == 0) @@ -135,10 +149,16 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, transType = defGetTypeName(defel); else if (pg_strcasecmp(defel->defname, "sspace") == 0) transSpace = defGetInt32(defel); + else if (pg_strcasecmp(defel->defname, "mstype") == 0) + mtransType = defGetTypeName(defel); + else if (pg_strcasecmp(defel->defname, "msspace") == 0) + mtransSpace = defGetInt32(defel); else if (pg_strcasecmp(defel->defname, "initcond") == 0) initval = defGetString(defel); else if (pg_strcasecmp(defel->defname, "initcond1") == 0) initval = defGetString(defel); + else if (pg_strcasecmp(defel->defname, "minitcond") == 0) + minitval = defGetString(defel); else ereport(WARNING, (errcode(ERRCODE_SYNTAX_ERROR), @@ -158,6 +178,46 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), errmsg("aggregate sfunc must be specified"))); + /* + * if mtransType is given, mtransfuncName and minvtransfuncName must be as + * well; if not, then none of the moving-aggregate options should have + * been given. + */ + if (mtransType != NULL) + { + if (mtransfuncName == NIL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate msfunc must be specified when mstype is specified"))); + if (minvtransfuncName == NIL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate minvfunc must be specified when mstype is specified"))); + } + else + { + if (mtransfuncName != NIL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate msfunc must not be specified without mstype"))); + if (minvtransfuncName != NIL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate minvfunc must not be specified without mstype"))); + if (mfinalfuncName != NIL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate mfinalfunc must not be specified without mstype"))); + if (mtransSpace != 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate msspace must not be specified without mstype"))); + if (minitval != NULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate minitcond must not be specified without mstype"))); + } + /* * look up the aggregate's input datatype(s). */ @@ -250,6 +310,27 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, format_type_be(transTypeId)))); } + /* + * If a moving-aggregate transtype is specified, look that up. Same + * restrictions as for transtype. + */ + if (mtransType) + { + mtransTypeId = typenameTypeId(NULL, mtransType); + mtransTypeType = get_typtype(mtransTypeId); + if (mtransTypeType == TYPTYPE_PSEUDO && + !IsPolymorphicType(mtransTypeId)) + { + if (mtransTypeId == INTERNALOID && superuser()) + /* okay */ ; + else + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate transition data type cannot be %s", + format_type_be(mtransTypeId)))); + } + } + /* * If we have an initval, and it's not for a pseudotype (particularly a * polymorphic type), make sure it's acceptable to the type's input @@ -268,6 +349,18 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, (void) OidInputFunctionCall(typinput, initval, typioparam, -1); } + /* + * Likewise for moving-aggregate initval. + */ + if (minitval && mtransTypeType != TYPTYPE_PSEUDO) + { + Oid typinput, + typioparam; + + getTypeInputInfo(mtransTypeId, &typinput, &typioparam); + (void) OidInputFunctionCall(typinput, minitval, typioparam, -1); + } + /* * Most of the argument-checking is done inside of AggregateCreate */ @@ -284,8 +377,14 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, variadicArgType, transfuncName, /* step function name */ finalfuncName, /* final function name */ + mtransfuncName, /* fwd trans function name */ + minvtransfuncName, /* inv trans function name */ + mfinalfuncName, /* final function name */ sortoperatorName, /* sort operator name */ transTypeId, /* transition data type */ transSpace, /* transition space */ - initval); /* initial condition */ + mtransTypeId, /* transition data type */ + mtransSpace, /* transition space */ + initval, /* initial condition */ + minitval); /* initial condition */ } diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 7e4bca5b4d..d60845bcd3 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -1798,8 +1798,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggref->aggtype, aggref->inputcollid, transfn_oid, + InvalidOid, /* invtrans is not needed here */ finalfn_oid, &transfnexpr, + NULL, &finalfnexpr); /* set up infrastructure for calling the transfn and finalfn */ @@ -1847,7 +1849,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * type and transtype are the same (or at least binary-compatible), so * that it's OK to use the first aggregated input value as the initial * transValue. This should have been checked at agg definition time, - * but just in case... + * but we must check again in case the transfn's strictness property + * has been changed. */ if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull) { @@ -2126,6 +2129,12 @@ ExecReScanAgg(AggState *node) ExecReScan(node->ss.ps.lefttree); } + +/*********************************************************************** + * API exposed to aggregate functions + ***********************************************************************/ + + /* * AggCheckCallContext - test if a SQL function is being called as an aggregate * @@ -2152,7 +2161,7 @@ AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext) if (fcinfo->context && IsA(fcinfo->context, WindowAggState)) { if (aggcontext) - *aggcontext = ((WindowAggState *) fcinfo->context)->aggcontext; + *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext; return AGG_CONTEXT_WINDOW; } diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c index 0b558e5923..046637fb09 100644 --- a/src/backend/executor/nodeWindowAgg.c +++ b/src/backend/executor/nodeWindowAgg.c @@ -102,16 +102,18 @@ typedef struct WindowStatePerFuncData */ typedef struct WindowStatePerAggData { - /* Oids of transfer functions */ + /* Oids of transition functions */ Oid transfn_oid; + Oid invtransfn_oid; /* may be InvalidOid */ Oid finalfn_oid; /* may be InvalidOid */ /* - * fmgr lookup data for transfer functions --- only valid when + * fmgr lookup data for transition functions --- only valid when * corresponding oid is not InvalidOid. Note in particular that fn_strict * flags are kept here. */ FmgrInfo transfn; + FmgrInfo invtransfn; FmgrInfo finalfn; /* @@ -139,11 +141,17 @@ typedef struct WindowStatePerAggData int wfuncno; /* index of associated PerFuncData */ + /* Context holding transition value and possibly other subsidiary data */ + MemoryContext aggcontext; /* may be private, or winstate->aggcontext */ + /* Current transition value */ Datum transValue; /* current transition value */ bool transValueIsNull; - bool noTransValue; /* true if transValue not set yet */ + int64 transValueCount; /* number of currently-aggregated rows */ + + /* Data local to eval_windowaggregates() */ + bool restart; /* need to restart this agg in this cycle? */ } WindowStatePerAggData; static void initialize_windowaggregate(WindowAggState *winstate, @@ -152,6 +160,9 @@ static void initialize_windowaggregate(WindowAggState *winstate, static void advance_windowaggregate(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate); +static bool advance_windowaggregate_base(WindowAggState *winstate, + WindowStatePerFunc perfuncstate, + WindowStatePerAgg peraggstate); static void finalize_windowaggregate(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate, @@ -193,18 +204,27 @@ initialize_windowaggregate(WindowAggState *winstate, { MemoryContext oldContext; + /* + * If we're using a private aggcontext, we may reset it here. But if the + * context is shared, we don't know which other aggregates may still need + * it, so we must leave it to the caller to reset at an appropriate time. + */ + if (peraggstate->aggcontext != winstate->aggcontext) + MemoryContextResetAndDeleteChildren(peraggstate->aggcontext); + if (peraggstate->initValueIsNull) peraggstate->transValue = peraggstate->initValue; else { - oldContext = MemoryContextSwitchTo(winstate->aggcontext); + oldContext = MemoryContextSwitchTo(peraggstate->aggcontext); peraggstate->transValue = datumCopy(peraggstate->initValue, peraggstate->transtypeByVal, peraggstate->transtypeLen); MemoryContextSwitchTo(oldContext); } peraggstate->transValueIsNull = peraggstate->initValueIsNull; - peraggstate->noTransValue = peraggstate->initValueIsNull; + peraggstate->transValueCount = 0; + peraggstate->resultValue = (Datum) 0; peraggstate->resultValueIsNull = true; } @@ -258,7 +278,8 @@ advance_windowaggregate(WindowAggState *winstate, { /* * For a strict transfn, nothing happens when there's a NULL input; we - * just keep the prior transValue. + * just keep the prior transValue. Note transValueCount doesn't + * change either. */ for (i = 1; i <= numArguments; i++) { @@ -268,41 +289,47 @@ advance_windowaggregate(WindowAggState *winstate, return; } } - if (peraggstate->noTransValue) + + /* + * For strict transition functions with initial value NULL we use the + * first non-NULL input as the initial state. (We already checked + * that the agg's input type is binary-compatible with its transtype, + * so straight copy here is OK.) + * + * We must copy the datum into aggcontext if it is pass-by-ref. We do + * not need to pfree the old transValue, since it's NULL. + */ + if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull) { - /* - * transValue has not been initialized. This is the first non-NULL - * input value. We use it as the initial value for transValue. (We - * already checked that the agg's input type is binary-compatible - * with its transtype, so straight copy here is OK.) - * - * We must copy the datum into aggcontext if it is pass-by-ref. We - * do not need to pfree the old transValue, since it's NULL. - */ - MemoryContextSwitchTo(winstate->aggcontext); + MemoryContextSwitchTo(peraggstate->aggcontext); peraggstate->transValue = datumCopy(fcinfo->arg[1], peraggstate->transtypeByVal, peraggstate->transtypeLen); peraggstate->transValueIsNull = false; - peraggstate->noTransValue = false; + peraggstate->transValueCount = 1; MemoryContextSwitchTo(oldContext); return; } + if (peraggstate->transValueIsNull) { /* * Don't call a strict function with NULL inputs. Note it is * possible to get here despite the above tests, if the transfn is - * strict *and* returned a NULL on a prior cycle. If that happens - * we will propagate the NULL all the way to the end. + * strict *and* returned a NULL on a prior cycle. If that happens + * we will propagate the NULL all the way to the end. That can + * only happen if there's no inverse transition function, though, + * since we disallow transitions back to NULL when there is one. */ MemoryContextSwitchTo(oldContext); + Assert(!OidIsValid(peraggstate->invtransfn_oid)); return; } } /* - * OK to call the transition function + * OK to call the transition function. Set winstate->curaggcontext while + * calling it, for possible use by AggCheckCallContext. */ InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn), numArguments + 1, @@ -310,7 +337,26 @@ advance_windowaggregate(WindowAggState *winstate, (void *) winstate, NULL); fcinfo->arg[0] = peraggstate->transValue; fcinfo->argnull[0] = peraggstate->transValueIsNull; + winstate->curaggcontext = peraggstate->aggcontext; newVal = FunctionCallInvoke(fcinfo); + winstate->curaggcontext = NULL; + + /* + * Moving-aggregate transition functions must not return NULL, see + * advance_windowaggregate_base(). + */ + if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid)) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("moving-aggregate transition function must not return NULL"))); + + /* + * We must track the number of rows included in transValue, since to + * remove the last input, advance_windowaggregate_base() musn't call the + * inverse transition function, but simply reset transValue back to its + * initial value. + */ + peraggstate->transValueCount++; /* * If pass-by-ref datatype, must copy the new value into aggcontext and @@ -322,7 +368,161 @@ advance_windowaggregate(WindowAggState *winstate, { if (!fcinfo->isnull) { - MemoryContextSwitchTo(winstate->aggcontext); + MemoryContextSwitchTo(peraggstate->aggcontext); + newVal = datumCopy(newVal, + peraggstate->transtypeByVal, + peraggstate->transtypeLen); + } + if (!peraggstate->transValueIsNull) + pfree(DatumGetPointer(peraggstate->transValue)); + } + + MemoryContextSwitchTo(oldContext); + peraggstate->transValue = newVal; + peraggstate->transValueIsNull = fcinfo->isnull; +} + +/* + * advance_windowaggregate_base + * Remove the oldest tuple from an aggregation. + * + * This is very much like advance_windowaggregate, except that we will call + * the inverse transition function (which caller must have checked is + * available). + * + * Returns true if we successfully removed the current row from this + * aggregate, false if not (in the latter case, caller is responsible + * for cleaning up by restarting the aggregation). + */ +static bool +advance_windowaggregate_base(WindowAggState *winstate, + WindowStatePerFunc perfuncstate, + WindowStatePerAgg peraggstate) +{ + WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate; + int numArguments = perfuncstate->numArguments; + FunctionCallInfoData fcinfodata; + FunctionCallInfo fcinfo = &fcinfodata; + Datum newVal; + ListCell *arg; + int i; + MemoryContext oldContext; + ExprContext *econtext = winstate->tmpcontext; + ExprState *filter = wfuncstate->aggfilter; + + oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); + + /* Skip anything FILTERed out */ + if (filter) + { + bool isnull; + Datum res = ExecEvalExpr(filter, econtext, &isnull, NULL); + + if (isnull || !DatumGetBool(res)) + { + MemoryContextSwitchTo(oldContext); + return true; + } + } + + /* We start from 1, since the 0th arg will be the transition value */ + i = 1; + foreach(arg, wfuncstate->args) + { + ExprState *argstate = (ExprState *) lfirst(arg); + + fcinfo->arg[i] = ExecEvalExpr(argstate, econtext, + &fcinfo->argnull[i], NULL); + i++; + } + + if (peraggstate->invtransfn.fn_strict) + { + /* + * For a strict (inv)transfn, nothing happens when there's a NULL + * input; we just keep the prior transValue. Note transValueCount + * doesn't change either. + */ + for (i = 1; i <= numArguments; i++) + { + if (fcinfo->argnull[i]) + { + MemoryContextSwitchTo(oldContext); + return true; + } + } + } + + /* There should still be an added but not yet removed value */ + Assert(peraggstate->transValueCount > 0); + + /* + * In moving-aggregate mode, the state must never be NULL, except possibly + * before any rows have been aggregated (which is surely not the case at + * this point). This restriction allows us to interpret a NULL result + * from the inverse function as meaning "sorry, can't do an inverse + * transition in this case". We already checked this in + * advance_windowaggregate, but just for safety, check again. + */ + if (peraggstate->transValueIsNull) + elog(ERROR, "aggregate transition value is NULL before inverse transition"); + + /* + * We mustn't use the inverse transition function to remove the last + * input. Doing so would yield a non-NULL state, whereas we should be in + * the initial state afterwards which may very well be NULL. So instead, + * we simply re-initialize the aggregate in this case. + */ + if (peraggstate->transValueCount == 1) + { + MemoryContextSwitchTo(oldContext); + initialize_windowaggregate(winstate, + &winstate->perfunc[peraggstate->wfuncno], + peraggstate); + return true; + } + + /* + * OK to call the inverse transition function. Set + * winstate->curaggcontext while calling it, for possible use by + * AggCheckCallContext. + */ + InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn), + numArguments + 1, + perfuncstate->winCollation, + (void *) winstate, NULL); + fcinfo->arg[0] = peraggstate->transValue; + fcinfo->argnull[0] = peraggstate->transValueIsNull; + winstate->curaggcontext = peraggstate->aggcontext; + newVal = FunctionCallInvoke(fcinfo); + winstate->curaggcontext = NULL; + + /* + * If the function returns NULL, report failure, forcing a restart. + */ + if (fcinfo->isnull) + { + MemoryContextSwitchTo(oldContext); + return false; + } + + /* Update number of rows included in transValue */ + peraggstate->transValueCount--; + + /* + * If pass-by-ref datatype, must copy the new value into aggcontext and + * pfree the prior transValue. But if invtransfn returned a pointer to + * its first input, we don't need to do anything. + * + * Note: the checks for null values here will never fire, but it seems + * best to have this stanza look just like advance_windowaggregate. + */ + if (!peraggstate->transtypeByVal && + DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue)) + { + if (!fcinfo->isnull) + { + MemoryContextSwitchTo(peraggstate->aggcontext); newVal = datumCopy(newVal, peraggstate->transtypeByVal, peraggstate->transtypeLen); @@ -334,6 +534,8 @@ advance_windowaggregate(WindowAggState *winstate, MemoryContextSwitchTo(oldContext); peraggstate->transValue = newVal; peraggstate->transValueIsNull = fcinfo->isnull; + + return true; } /* @@ -370,7 +572,9 @@ finalize_windowaggregate(WindowAggState *winstate, } else { + winstate->curaggcontext = peraggstate->aggcontext; *result = FunctionCallInvoke(&fcinfo); + winstate->curaggcontext = NULL; *isnull = fcinfo.isnull; } } @@ -396,7 +600,9 @@ finalize_windowaggregate(WindowAggState *winstate, * eval_windowaggregates * evaluate plain aggregates being used as window functions * - * Much of this is duplicated from nodeAgg.c. But NOTE that we expect to be + * This differs from nodeAgg.c in two ways. First, if the window's frame + * start position moves, we use the inverse transition function (if it exists) + * to remove rows from the transition value. And second, we expect to be * able to call aggregate final functions repeatedly after aggregating more * data onto the same transition value. This is not a behavior required by * nodeAgg.c. @@ -406,12 +612,15 @@ eval_windowaggregates(WindowAggState *winstate) { WindowStatePerAgg peraggstate; int wfuncno, - numaggs; - int i; + numaggs, + numaggs_restart, + i; + int64 aggregatedupto_nonrestarted; MemoryContext oldContext; ExprContext *econtext; WindowObject agg_winobj; TupleTableSlot *agg_row_slot; + TupleTableSlot *temp_slot; numaggs = winstate->numaggs; if (numaggs == 0) @@ -421,6 +630,7 @@ eval_windowaggregates(WindowAggState *winstate) econtext = winstate->ss.ps.ps_ExprContext; agg_winobj = winstate->agg_winobj; agg_row_slot = winstate->agg_row_slot; + temp_slot = winstate->temp_slot_1; /* * Currently, we support only a subset of the SQL-standard window framing @@ -438,9 +648,17 @@ eval_windowaggregates(WindowAggState *winstate) * damage the running transition value, but we have the same assumption in * nodeAgg.c too (when it rescans an existing hash table). * - * For other frame start rules, we discard the aggregate state and re-run - * the aggregates whenever the frame head row moves. We can still - * optimize as above whenever successive rows share the same frame head. + * If the frame start does sometimes move, we can still optimize as above + * whenever successive rows share the same frame head, but if the frame + * head moves beyond the previous head we try to remove those rows using + * the aggregate's inverse transition function. This function restores + * the aggregate's current state to what it would be if the removed row + * had never been aggregated in the first place. Inverse transition + * functions may optionally return NULL, indicating that the function was + * unable to remove the tuple from aggregation. If this happens, or if + * the aggregate doesn't have an inverse transition function at all, we + * must perform the aggregation all over again for all tuples within the + * new frame boundaries. * * In many common cases, multiple rows share the same frame and hence the * same aggregate value. (In particular, if there's no ORDER BY in a RANGE @@ -452,75 +670,195 @@ eval_windowaggregates(WindowAggState *winstate) * 'aggregatedupto' keeps track of the first row that has not yet been * accumulated into the aggregate transition values. Whenever we start a * new peer group, we accumulate forward to the end of the peer group. - * - * TODO: Rerunning aggregates from the frame start can be pretty slow. For - * some aggregates like SUM and COUNT we could avoid that by implementing - * a "negative transition function" that would be called for each row as - * it exits the frame. We'd have to think about avoiding recalculation of - * volatile arguments of aggregate functions, too. */ /* * First, update the frame head position. + * + * The frame head should never move backwards, and the code below wouldn't + * cope if it did, so for safety we complain if it does. */ - update_frameheadpos(agg_winobj, winstate->temp_slot_1); + update_frameheadpos(agg_winobj, temp_slot); + if (winstate->frameheadpos < winstate->aggregatedbase) + elog(ERROR, "window frame head moved backward"); /* - * Initialize aggregates on first call for partition, or if the frame head - * position moved since last time. + * If the frame didn't change compared to the previous row, we can re-use + * the result values that were previously saved at the bottom of this + * function. Since we don't know the current frame's end yet, this is not + * possible to check for fully. But if the frame end mode is UNBOUNDED + * FOLLOWING or CURRENT ROW, and the current row lies within the previous + * row's frame, then the two frames' ends must coincide. Note that on the + * first row aggregatedbase == aggregatedupto, meaning this test must + * fail, so we don't need to check the "there was no previous row" case + * explicitly here. */ - if (winstate->currentpos == 0 || - winstate->frameheadpos != winstate->aggregatedbase) + if (winstate->aggregatedbase == winstate->frameheadpos && + (winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING | + FRAMEOPTION_END_CURRENT_ROW)) && + winstate->aggregatedbase <= winstate->currentpos && + winstate->aggregatedupto > winstate->currentpos) { - /* - * Discard transient aggregate values - */ - MemoryContextResetAndDeleteChildren(winstate->aggcontext); - for (i = 0; i < numaggs; i++) { peraggstate = &winstate->peragg[i]; wfuncno = peraggstate->wfuncno; - initialize_windowaggregate(winstate, - &winstate->perfunc[wfuncno], - peraggstate); + econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue; + econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull; } + return; + } + /*---------- + * Initialize restart flags. + * + * We restart the aggregation: + * - if we're processing the first row in the partition, or + * - if the frame's head moved and we cannot use an inverse + * transition function, or + * - if the new frame doesn't overlap the old one + * + * Note that we don't strictly need to restart in the last case, but if + * we're going to remove all rows from the aggregation anyway, a restart + * surely is faster. + *---------- + */ + numaggs_restart = 0; + for (i = 0; i < numaggs; i++) + { + peraggstate = &winstate->peragg[i]; + if (winstate->currentpos == 0 || + (winstate->aggregatedbase != winstate->frameheadpos && + !OidIsValid(peraggstate->invtransfn_oid)) || + winstate->aggregatedupto <= winstate->frameheadpos) + { + peraggstate->restart = true; + numaggs_restart++; + } + else + peraggstate->restart = false; + } + + /* + * If we have any possibly-moving aggregates, attempt to advance + * aggregatedbase to match the frame's head by removing input rows that + * fell off the top of the frame from the aggregations. This can fail, + * i.e. advance_windowaggregate_base() can return false, in which case + * we'll restart that aggregate below. + */ + while (numaggs_restart < numaggs && + winstate->aggregatedbase < winstate->frameheadpos) + { /* - * If we created a mark pointer for aggregates, keep it pushed up to - * frame head, so that tuplestore can discard unnecessary rows. + * Fetch the next tuple of those being removed. This should never fail + * as we should have been here before. */ - if (agg_winobj->markptr >= 0) - WinSetMarkPosition(agg_winobj, winstate->frameheadpos); + if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase, + temp_slot)) + elog(ERROR, "could not re-fetch previously fetched frame row"); + + /* Set tuple context for evaluation of aggregate arguments */ + winstate->tmpcontext->ecxt_outertuple = temp_slot; /* - * Initialize for loop below + * Perform the inverse transition for each aggregate function in the + * window, unless it has already been marked as needing a restart. */ - ExecClearTuple(agg_row_slot); - winstate->aggregatedbase = winstate->frameheadpos; - winstate->aggregatedupto = winstate->frameheadpos; + for (i = 0; i < numaggs; i++) + { + bool ok; + + peraggstate = &winstate->peragg[i]; + if (peraggstate->restart) + continue; + + wfuncno = peraggstate->wfuncno; + ok = advance_windowaggregate_base(winstate, + &winstate->perfunc[wfuncno], + peraggstate); + if (!ok) + { + /* Inverse transition function has failed, must restart */ + peraggstate->restart = true; + numaggs_restart++; + } + } + + /* Reset per-input-tuple context after each tuple */ + ResetExprContext(winstate->tmpcontext); + + /* And advance the aggregated-row state */ + winstate->aggregatedbase++; + ExecClearTuple(temp_slot); } /* - * In UNBOUNDED_FOLLOWING mode, we don't have to recalculate aggregates - * except when the frame head moves. In END_CURRENT_ROW mode, we only - * have to recalculate when the frame head moves or currentpos has - * advanced past the place we'd aggregated up to. Check for these cases - * and if so, reuse the saved result values. + * If we successfully advanced the base rows of all the aggregates, + * aggregatedbase now equals frameheadpos; but if we failed for any, we + * must forcibly update aggregatedbase. */ - if ((winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING | - FRAMEOPTION_END_CURRENT_ROW)) && - winstate->aggregatedbase <= winstate->currentpos && - winstate->aggregatedupto > winstate->currentpos) + winstate->aggregatedbase = winstate->frameheadpos; + + /* + * If we created a mark pointer for aggregates, keep it pushed up to frame + * head, so that tuplestore can discard unnecessary rows. + */ + if (agg_winobj->markptr >= 0) + WinSetMarkPosition(agg_winobj, winstate->frameheadpos); + + /* + * Now restart the aggregates that require it. + * + * We assume that aggregates using the shared context always restart if + * *any* aggregate restarts, and we may thus clean up the shared + * aggcontext if that is the case. Private aggcontexts are reset by + * initialize_windowaggregate() if their owning aggregate restarts. If we + * aren't restarting an aggregate, we need to free any previously saved + * result for it, else we'll leak memory. + */ + if (numaggs_restart > 0) + MemoryContextResetAndDeleteChildren(winstate->aggcontext); + for (i = 0; i < numaggs; i++) { - for (i = 0; i < numaggs; i++) + peraggstate = &winstate->peragg[i]; + + /* Aggregates using the shared ctx must restart if *any* agg does */ + Assert(peraggstate->aggcontext != winstate->aggcontext || + numaggs_restart == 0 || + peraggstate->restart); + + if (peraggstate->restart) { - peraggstate = &winstate->peragg[i]; wfuncno = peraggstate->wfuncno; - econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue; - econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull; + initialize_windowaggregate(winstate, + &winstate->perfunc[wfuncno], + peraggstate); + } + else if (!peraggstate->resultValueIsNull) + { + if (!peraggstate->resulttypeByVal) + pfree(DatumGetPointer(peraggstate->resultValue)); + peraggstate->resultValue = (Datum) 0; + peraggstate->resultValueIsNull = true; } - return; + } + + /* + * Non-restarted aggregates now contain the rows between aggregatedbase + * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates + * contain no rows. If there are any restarted aggregates, we must thus + * begin aggregating anew at frameheadpos, otherwise we may simply + * continue at aggregatedupto. We must remember the old value of + * aggregatedupto to know how long to skip advancing non-restarted + * aggregates. If we modify aggregatedupto, we must also clear + * agg_row_slot, per the loop invariant below. + */ + aggregatedupto_nonrestarted = winstate->aggregatedupto; + if (numaggs_restart > 0 && + winstate->aggregatedupto != winstate->frameheadpos) + { + winstate->aggregatedupto = winstate->frameheadpos; + ExecClearTuple(agg_row_slot); } /* @@ -551,6 +889,12 @@ eval_windowaggregates(WindowAggState *winstate) for (i = 0; i < numaggs; i++) { peraggstate = &winstate->peragg[i]; + + /* Non-restarted aggs skip until aggregatedupto_nonrestarted */ + if (!peraggstate->restart && + winstate->aggregatedupto < aggregatedupto_nonrestarted) + continue; + wfuncno = peraggstate->wfuncno; advance_windowaggregate(winstate, &winstate->perfunc[wfuncno], @@ -565,6 +909,9 @@ eval_windowaggregates(WindowAggState *winstate) ExecClearTuple(agg_row_slot); } + /* The frame's end is not supposed to move backwards, ever */ + Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto); + /* * finalize aggregates and fill result/isnull fields. */ @@ -589,28 +936,14 @@ eval_windowaggregates(WindowAggState *winstate) * advance that the next row can't possibly share the same frame. Is * it worth detecting that and skipping this code? */ - if (!peraggstate->resulttypeByVal) + if (!peraggstate->resulttypeByVal && !*isnull) { - /* - * clear old resultValue in order not to leak memory. (Note: the - * new result can't possibly be the same datum as old resultValue, - * because we never passed it to the trans function.) - */ - if (!peraggstate->resultValueIsNull) - pfree(DatumGetPointer(peraggstate->resultValue)); - - /* - * If pass-by-ref, copy it into our aggregate context. - */ - if (!*isnull) - { - oldContext = MemoryContextSwitchTo(winstate->aggcontext); - peraggstate->resultValue = - datumCopy(*result, - peraggstate->resulttypeByVal, - peraggstate->resulttypeLen); - MemoryContextSwitchTo(oldContext); - } + oldContext = MemoryContextSwitchTo(peraggstate->aggcontext); + peraggstate->resultValue = + datumCopy(*result, + peraggstate->resulttypeByVal, + peraggstate->resulttypeLen); + MemoryContextSwitchTo(oldContext); } else { @@ -650,6 +983,8 @@ eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate, (void *) perfuncstate->winobj, NULL); /* Just in case, make all the regular argument slots be null */ memset(fcinfo.argnull, true, perfuncstate->numArguments); + /* Window functions don't have a current aggregate context, either */ + winstate->curaggcontext = NULL; *result = FunctionCallInvoke(&fcinfo); *isnull = fcinfo.isnull; @@ -870,6 +1205,11 @@ release_partition(WindowAggState *winstate) */ MemoryContextResetAndDeleteChildren(winstate->partcontext); MemoryContextResetAndDeleteChildren(winstate->aggcontext); + for (i = 0; i < winstate->numaggs; i++) + { + if (winstate->peragg[i].aggcontext != winstate->aggcontext) + MemoryContextResetAndDeleteChildren(winstate->peragg[i].aggcontext); + } if (winstate->buffer) tuplestore_end(winstate->buffer); @@ -1450,7 +1790,12 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags) ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); - /* Create mid-lived context for aggregate trans values etc */ + /* + * Create mid-lived context for aggregate trans values etc. + * + * Note that moving aggregates each use their own private context, not + * this one. + */ winstate->aggcontext = AllocSetContextCreate(CurrentMemoryContext, "WindowAgg_Aggregates", @@ -1657,12 +2002,10 @@ void ExecEndWindowAgg(WindowAggState *node) { PlanState *outerPlan; + int i; release_partition(node); - pfree(node->perfunc); - pfree(node->peragg); - ExecClearTuple(node->ss.ss_ScanTupleSlot); ExecClearTuple(node->first_part_slot); ExecClearTuple(node->agg_row_slot); @@ -1676,9 +2019,17 @@ ExecEndWindowAgg(WindowAggState *node) node->ss.ps.ps_ExprContext = node->tmpcontext; ExecFreeExprContext(&node->ss.ps); + for (i = 0; i < node->numaggs; i++) + { + if (node->peragg[i].aggcontext != node->aggcontext) + MemoryContextDelete(node->peragg[i].aggcontext); + } MemoryContextDelete(node->partcontext); MemoryContextDelete(node->aggcontext); + pfree(node->perfunc); + pfree(node->peragg); + outerPlan = outerPlanState(node); ExecEndNode(outerPlan); } @@ -1733,10 +2084,13 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, HeapTuple aggTuple; Form_pg_aggregate aggform; Oid aggtranstype; + AttrNumber initvalAttNo; AclResult aclresult; Oid transfn_oid, + invtransfn_oid, finalfn_oid; Expr *transfnexpr, + *invtransfnexpr, *finalfnexpr; Datum textInitVal; int i; @@ -1756,14 +2110,40 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, wfunc->winfnoid); aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); + /* + * Figure out whether we want to use the moving-aggregate implementation, + * and collect the right set of fields from the pg_attribute entry. + * + * If the frame head can't move, we don't need moving-aggregate code. Even + * if we'd like to use it, don't do so if the aggregate's arguments (and + * FILTER clause if any) contain any calls to volatile functions. + * Otherwise, the difference between restarting and not restarting the + * aggregation would be user-visible. + */ + if (OidIsValid(aggform->aggminvtransfn) && + !(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) && + !contain_volatile_functions((Node *) wfunc)) + { + peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn; + peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn; + peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn; + aggtranstype = aggform->aggmtranstype; + initvalAttNo = Anum_pg_aggregate_aggminitval; + } + else + { + peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; + peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid; + peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; + aggtranstype = aggform->aggtranstype; + initvalAttNo = Anum_pg_aggregate_agginitval; + } + /* * ExecInitWindowAgg already checked permission to call aggregate function * ... but we still need to check the component functions */ - peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; - peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; - /* Check that aggregate owner has permission to call component fns */ { HeapTuple procTuple; @@ -1783,6 +2163,17 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(transfn_oid)); InvokeFunctionExecuteHook(transfn_oid); + + if (OidIsValid(invtransfn_oid)) + { + aclresult = pg_proc_aclcheck(invtransfn_oid, aggOwner, + ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_PROC, + get_func_name(invtransfn_oid)); + InvokeFunctionExecuteHook(invtransfn_oid); + } + if (OidIsValid(finalfn_oid)) { aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner, @@ -1796,7 +2187,7 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, /* resolve actual type of transition state, if polymorphic */ aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid, - aggform->aggtranstype, + aggtranstype, inputTypes, numArguments); @@ -1810,13 +2201,21 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, wfunc->wintype, wfunc->inputcollid, transfn_oid, + invtransfn_oid, finalfn_oid, &transfnexpr, + &invtransfnexpr, &finalfnexpr); fmgr_info(transfn_oid, &peraggstate->transfn); fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn); + if (OidIsValid(invtransfn_oid)) + { + fmgr_info(invtransfn_oid, &peraggstate->invtransfn); + fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn); + } + if (OidIsValid(finalfn_oid)) { fmgr_info(finalfn_oid, &peraggstate->finalfn); @@ -1834,8 +2233,7 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, * initval is potentially null, so don't try to access it as a struct * field. Must do it the hard way with SysCacheGetAttr. */ - textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, - Anum_pg_aggregate_agginitval, + textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo, &peraggstate->initValueIsNull); if (peraggstate->initValueIsNull) @@ -1848,7 +2246,8 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, * If the transfn is strict and the initval is NULL, make sure input type * and transtype are the same (or at least binary-compatible), so that * it's OK to use the first input value as the initial transValue. This - * should have been checked at agg definition time, but just in case... + * should have been checked at agg definition time, but we must check + * again in case the transfn's strictness property has been changed. */ if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull) { @@ -1860,6 +2259,44 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, wfunc->winfnoid))); } + /* + * Insist that forward and inverse transition functions have the same + * strictness setting. Allowing them to differ would require handling + * more special cases in advance_windowaggregate and + * advance_windowaggregate_base, for no discernible benefit. This should + * have been checked at agg definition time, but we must check again in + * case either function's strictness property has been changed. + */ + if (OidIsValid(invtransfn_oid) && + peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("strictness of aggregate's forward and inverse transition functions must match"))); + + /* + * Moving aggregates use their own aggcontext. + * + * This is necessary because they might restart at different times, so we + * might never be able to reset the shared context otherwise. We can't + * make it the aggregates' responsibility to clean up after themselves, + * because strict aggregates must be restarted whenever we remove their + * last non-NULL input, which the aggregate won't be aware is happening. + * Also, just pfree()ing the transValue upon restarting wouldn't help, + * since we'd miss any indirectly referenced data. We could, in theory, + * make the memory allocation rules for moving aggregates different than + * they have historically been for plain aggregates, but that seems grotty + * and likely to lead to memory leaks. + */ + if (OidIsValid(invtransfn_oid)) + peraggstate->aggcontext = + AllocSetContextCreate(CurrentMemoryContext, + "WindowAgg_AggregatePrivate", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + else + peraggstate->aggcontext = winstate->aggcontext; + ReleaseSysCache(aggTuple); return peraggstate; diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index 201529b885..3f307e6464 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -471,7 +471,11 @@ count_agg_clauses_walker(Node *node, count_agg_clauses_context *context) Assert(aggref->agglevelsup == 0); - /* fetch info about aggregate from pg_aggregate */ + /* + * Fetch info about aggregate from pg_aggregate. Note it's correct to + * ignore the moving-aggregate variant, since what we're concerned + * with here is aggregates not window functions. + */ aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(aggref->aggfnoid)); if (!HeapTupleIsValid(aggTuple)) diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c index 9613e2aec8..272d27f919 100644 --- a/src/backend/parser/parse_agg.c +++ b/src/backend/parser/parse_agg.c @@ -1187,11 +1187,13 @@ resolve_aggregate_transtype(Oid aggfuncid, * For an ordered-set aggregate, remember that agg_input_types describes * the direct arguments followed by the aggregated arguments. * - * transfn_oid and finalfn_oid identify the funcs to be called; the latter - * may be InvalidOid. + * transfn_oid, invtransfn_oid and finalfn_oid identify the funcs to be + * called; the latter two may be InvalidOid. * - * Pointers to the constructed trees are returned into *transfnexpr and - * *finalfnexpr. The latter is set to NULL if there's no finalfn. + * Pointers to the constructed trees are returned into *transfnexpr, + * *invtransfnexpr and *finalfnexpr. If there is no invtransfn or finalfn, + * the respective pointers are set to NULL. Since use of the invtransfn is + * optional, NULL may be passed for invtransfnexpr. */ void build_aggregate_fnexprs(Oid *agg_input_types, @@ -1203,8 +1205,10 @@ build_aggregate_fnexprs(Oid *agg_input_types, Oid agg_result_type, Oid agg_input_collation, Oid transfn_oid, + Oid invtransfn_oid, Oid finalfn_oid, Expr **transfnexpr, + Expr **invtransfnexpr, Expr **finalfnexpr) { Param *argp; @@ -1249,6 +1253,26 @@ build_aggregate_fnexprs(Oid *agg_input_types, fexpr->funcvariadic = agg_variadic; *transfnexpr = (Expr *) fexpr; + /* + * Build invtransfn expression if requested, with same args as transfn + */ + if (invtransfnexpr != NULL) + { + if (OidIsValid(invtransfn_oid)) + { + fexpr = makeFuncExpr(invtransfn_oid, + agg_state_type, + args, + InvalidOid, + agg_input_collation, + COERCE_EXPLICIT_CALL); + fexpr->funcvariadic = agg_variadic; + *invtransfnexpr = (Expr *) fexpr; + } + else + *invtransfnexpr = NULL; + } + /* see if we have a final function */ if (!OidIsValid(finalfn_oid)) { diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 2653ef052b..a6c0428501 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -11548,20 +11548,32 @@ dumpAgg(Archive *fout, AggInfo *agginfo) PGresult *res; int i_aggtransfn; int i_aggfinalfn; + int i_aggmtransfn; + int i_aggminvtransfn; + int i_aggmfinalfn; int i_aggsortop; int i_hypothetical; int i_aggtranstype; int i_aggtransspace; + int i_aggmtranstype; + int i_aggmtransspace; int i_agginitval; + int i_aggminitval; int i_convertok; const char *aggtransfn; const char *aggfinalfn; + const char *aggmtransfn; + const char *aggminvtransfn; + const char *aggmfinalfn; const char *aggsortop; char *aggsortconvop; bool hypothetical; const char *aggtranstype; const char *aggtransspace; + const char *aggmtranstype; + const char *aggmtransspace; const char *agginitval; + const char *aggminitval; bool convertok; /* Skip if not to be dumped */ @@ -11582,9 +11594,12 @@ dumpAgg(Archive *fout, AggInfo *agginfo) { appendPQExpBuffer(query, "SELECT aggtransfn, " "aggfinalfn, aggtranstype::pg_catalog.regtype, " + "aggmtransfn, aggminvtransfn, aggmfinalfn, " + "aggmtranstype::pg_catalog.regtype, " "aggsortop::pg_catalog.regoperator, " "(aggkind = 'h') as hypothetical, " "aggtransspace, agginitval, " + "aggmtransspace, aggminitval, " "'t'::boolean AS convertok, " "pg_catalog.pg_get_function_arguments(p.oid) AS funcargs, " "pg_catalog.pg_get_function_identity_arguments(p.oid) AS funciargs " @@ -11597,9 +11612,12 @@ dumpAgg(Archive *fout, AggInfo *agginfo) { appendPQExpBuffer(query, "SELECT aggtransfn, " "aggfinalfn, aggtranstype::pg_catalog.regtype, " + "'-' AS aggmtransfn, '-' AS aggminvtransfn, " + "'-' AS aggmfinalfn, 0 AS aggmtranstype, " "aggsortop::pg_catalog.regoperator, " "false as hypothetical, " "0 AS aggtransspace, agginitval, " + "0 AS aggmtransspace, NULL AS aggminitval, " "'t'::boolean AS convertok, " "pg_catalog.pg_get_function_arguments(p.oid) AS funcargs, " "pg_catalog.pg_get_function_identity_arguments(p.oid) AS funciargs " @@ -11612,9 +11630,12 @@ dumpAgg(Archive *fout, AggInfo *agginfo) { appendPQExpBuffer(query, "SELECT aggtransfn, " "aggfinalfn, aggtranstype::pg_catalog.regtype, " + "'-' AS aggmtransfn, '-' AS aggminvtransfn, " + "'-' AS aggmfinalfn, 0 AS aggmtranstype, " "aggsortop::pg_catalog.regoperator, " "false as hypothetical, " "0 AS aggtransspace, agginitval, " + "0 AS aggmtransspace, NULL AS aggminitval, " "'t'::boolean AS convertok " "FROM pg_catalog.pg_aggregate a, pg_catalog.pg_proc p " "WHERE a.aggfnoid = p.oid " @@ -11625,9 +11646,12 @@ dumpAgg(Archive *fout, AggInfo *agginfo) { appendPQExpBuffer(query, "SELECT aggtransfn, " "aggfinalfn, aggtranstype::pg_catalog.regtype, " + "'-' AS aggmtransfn, '-' AS aggminvtransfn, " + "'-' AS aggmfinalfn, 0 AS aggmtranstype, " "0 AS aggsortop, " "'f'::boolean as hypothetical, " "0 AS aggtransspace, agginitval, " + "0 AS aggmtransspace, NULL AS aggminitval, " "'t'::boolean AS convertok " "FROM pg_catalog.pg_aggregate a, pg_catalog.pg_proc p " "WHERE a.aggfnoid = p.oid " @@ -11638,9 +11662,12 @@ dumpAgg(Archive *fout, AggInfo *agginfo) { appendPQExpBuffer(query, "SELECT aggtransfn, aggfinalfn, " "format_type(aggtranstype, NULL) AS aggtranstype, " + "'-' AS aggmtransfn, '-' AS aggminvtransfn, " + "'-' AS aggmfinalfn, 0 AS aggmtranstype, " "0 AS aggsortop, " "'f'::boolean as hypothetical, " "0 AS aggtransspace, agginitval, " + "0 AS aggmtransspace, NULL AS aggminitval, " "'t'::boolean AS convertok " "FROM pg_aggregate " "WHERE oid = '%u'::oid", @@ -11651,9 +11678,12 @@ dumpAgg(Archive *fout, AggInfo *agginfo) appendPQExpBuffer(query, "SELECT aggtransfn1 AS aggtransfn, " "aggfinalfn, " "(SELECT typname FROM pg_type WHERE oid = aggtranstype1) AS aggtranstype, " + "'-' AS aggmtransfn, '-' AS aggminvtransfn, " + "'-' AS aggmfinalfn, 0 AS aggmtranstype, " "0 AS aggsortop, " "'f'::boolean as hypothetical, " "0 AS aggtransspace, agginitval1 AS agginitval, " + "0 AS aggmtransspace, NULL AS aggminitval, " "(aggtransfn2 = 0 and aggtranstype2 = 0 and agginitval2 is null) AS convertok " "FROM pg_aggregate " "WHERE oid = '%u'::oid", @@ -11664,20 +11694,32 @@ dumpAgg(Archive *fout, AggInfo *agginfo) i_aggtransfn = PQfnumber(res, "aggtransfn"); i_aggfinalfn = PQfnumber(res, "aggfinalfn"); + i_aggmtransfn = PQfnumber(res, "aggmtransfn"); + i_aggminvtransfn = PQfnumber(res, "aggminvtransfn"); + i_aggmfinalfn = PQfnumber(res, "aggmfinalfn"); i_aggsortop = PQfnumber(res, "aggsortop"); i_hypothetical = PQfnumber(res, "hypothetical"); i_aggtranstype = PQfnumber(res, "aggtranstype"); i_aggtransspace = PQfnumber(res, "aggtransspace"); + i_aggmtranstype = PQfnumber(res, "aggmtranstype"); + i_aggmtransspace = PQfnumber(res, "aggmtransspace"); i_agginitval = PQfnumber(res, "agginitval"); + i_aggminitval = PQfnumber(res, "aggminitval"); i_convertok = PQfnumber(res, "convertok"); aggtransfn = PQgetvalue(res, 0, i_aggtransfn); aggfinalfn = PQgetvalue(res, 0, i_aggfinalfn); + aggmtransfn = PQgetvalue(res, 0, i_aggmtransfn); + aggminvtransfn = PQgetvalue(res, 0, i_aggminvtransfn); + aggmfinalfn = PQgetvalue(res, 0, i_aggmfinalfn); aggsortop = PQgetvalue(res, 0, i_aggsortop); hypothetical = (PQgetvalue(res, 0, i_hypothetical)[0] == 't'); aggtranstype = PQgetvalue(res, 0, i_aggtranstype); aggtransspace = PQgetvalue(res, 0, i_aggtransspace); + aggmtranstype = PQgetvalue(res, 0, i_aggmtranstype); + aggmtransspace = PQgetvalue(res, 0, i_aggmtransspace); agginitval = PQgetvalue(res, 0, i_agginitval); + aggminitval = PQgetvalue(res, 0, i_aggminitval); convertok = (PQgetvalue(res, 0, i_convertok)[0] == 't'); if (fout->remoteVersion >= 80400) @@ -11751,6 +11793,32 @@ dumpAgg(Archive *fout, AggInfo *agginfo) aggfinalfn); } + if (strcmp(aggmtransfn, "-") != 0) + { + appendPQExpBuffer(details, ",\n MSFUNC = %s,\n MINVFUNC = %s,\n MSTYPE = %s", + aggmtransfn, + aggminvtransfn, + aggmtranstype); + } + + if (strcmp(aggmtransspace, "0") != 0) + { + appendPQExpBuffer(details, ",\n MSSPACE = %s", + aggmtransspace); + } + + if (!PQgetisnull(res, 0, i_aggminitval)) + { + appendPQExpBufferStr(details, ",\n MINITCOND = "); + appendStringLiteralAH(details, aggminitval, fout); + } + + if (strcmp(aggmfinalfn, "-") != 0) + { + appendPQExpBuffer(details, ",\n MFINALFUNC = %s", + aggmfinalfn); + } + aggsortconvop = convertOperatorReference(fout, aggsortop); if (aggsortconvop) { diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index fe6144e2d3..2fb0ce8656 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 201404082 +#define CATALOG_VERSION_NO 201404121 #endif diff --git a/src/include/catalog/pg_aggregate.h b/src/include/catalog/pg_aggregate.h index f189998597..3cb0d754e7 100644 --- a/src/include/catalog/pg_aggregate.h +++ b/src/include/catalog/pg_aggregate.h @@ -32,10 +32,16 @@ * aggnumdirectargs number of arguments that are "direct" arguments * aggtransfn transition function * aggfinalfn final function (0 if none) + * aggmtransfn forward function for moving-aggregate mode (0 if none) + * aggminvtransfn inverse function for moving-aggregate mode (0 if none) + * aggmfinalfn final function for moving-aggregate mode (0 if none) * aggsortop associated sort operator (0 if none) * aggtranstype type of aggregate's transition (state) data * aggtransspace estimated size of state data (0 for default estimate) + * aggmtranstype type of moving-aggregate state data (0 if none) + * aggmtransspace estimated size of moving-agg state (0 for default est) * agginitval initial value for transition state (can be NULL) + * aggminitval initial value for moving-agg state (can be NULL) * ---------------------------------------------------------------- */ #define AggregateRelationId 2600 @@ -47,12 +53,18 @@ CATALOG(pg_aggregate,2600) BKI_WITHOUT_OIDS int16 aggnumdirectargs; regproc aggtransfn; regproc aggfinalfn; + regproc aggmtransfn; + regproc aggminvtransfn; + regproc aggmfinalfn; Oid aggsortop; Oid aggtranstype; int32 aggtransspace; + Oid aggmtranstype; + int32 aggmtransspace; #ifdef CATALOG_VARLEN /* variable-length fields start here */ text agginitval; + text aggminitval; #endif } FormData_pg_aggregate; @@ -68,16 +80,22 @@ typedef FormData_pg_aggregate *Form_pg_aggregate; * ---------------- */ -#define Natts_pg_aggregate 9 +#define Natts_pg_aggregate 15 #define Anum_pg_aggregate_aggfnoid 1 #define Anum_pg_aggregate_aggkind 2 #define Anum_pg_aggregate_aggnumdirectargs 3 #define Anum_pg_aggregate_aggtransfn 4 #define Anum_pg_aggregate_aggfinalfn 5 -#define Anum_pg_aggregate_aggsortop 6 -#define Anum_pg_aggregate_aggtranstype 7 -#define Anum_pg_aggregate_aggtransspace 8 -#define Anum_pg_aggregate_agginitval 9 +#define Anum_pg_aggregate_aggmtransfn 6 +#define Anum_pg_aggregate_aggminvtransfn 7 +#define Anum_pg_aggregate_aggmfinalfn 8 +#define Anum_pg_aggregate_aggsortop 9 +#define Anum_pg_aggregate_aggtranstype 10 +#define Anum_pg_aggregate_aggtransspace 11 +#define Anum_pg_aggregate_aggmtranstype 12 +#define Anum_pg_aggregate_aggmtransspace 13 +#define Anum_pg_aggregate_agginitval 14 +#define Anum_pg_aggregate_aggminitval 15 /* * Symbolic values for aggkind column. We distinguish normal aggregates @@ -101,177 +119,177 @@ typedef FormData_pg_aggregate *Form_pg_aggregate; */ /* avg */ -DATA(insert ( 2100 n 0 int8_avg_accum numeric_avg 0 2281 128 _null_ )); -DATA(insert ( 2101 n 0 int4_avg_accum int8_avg 0 1016 0 "{0,0}" )); -DATA(insert ( 2102 n 0 int2_avg_accum int8_avg 0 1016 0 "{0,0}" )); -DATA(insert ( 2103 n 0 numeric_avg_accum numeric_avg 0 2281 128 _null_ )); -DATA(insert ( 2104 n 0 float4_accum float8_avg 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2105 n 0 float8_accum float8_avg 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2106 n 0 interval_accum interval_avg 0 1187 0 "{0 second,0 second}" )); +DATA(insert ( 2100 n 0 int8_avg_accum numeric_avg - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2101 n 0 int4_avg_accum int8_avg - - - 0 1016 0 0 0 "{0,0}" _null_ )); +DATA(insert ( 2102 n 0 int2_avg_accum int8_avg - - - 0 1016 0 0 0 "{0,0}" _null_ )); +DATA(insert ( 2103 n 0 numeric_avg_accum numeric_avg - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2104 n 0 float4_accum float8_avg - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2105 n 0 float8_accum float8_avg - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2106 n 0 interval_accum interval_avg - - - 0 1187 0 0 0 "{0 second,0 second}" _null_ )); /* sum */ -DATA(insert ( 2107 n 0 int8_avg_accum numeric_sum 0 2281 128 _null_ )); -DATA(insert ( 2108 n 0 int4_sum - 0 20 0 _null_ )); -DATA(insert ( 2109 n 0 int2_sum - 0 20 0 _null_ )); -DATA(insert ( 2110 n 0 float4pl - 0 700 0 _null_ )); -DATA(insert ( 2111 n 0 float8pl - 0 701 0 _null_ )); -DATA(insert ( 2112 n 0 cash_pl - 0 790 0 _null_ )); -DATA(insert ( 2113 n 0 interval_pl - 0 1186 0 _null_ )); -DATA(insert ( 2114 n 0 numeric_avg_accum numeric_sum 0 2281 128 _null_ )); +DATA(insert ( 2107 n 0 int8_avg_accum numeric_sum - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2108 n 0 int4_sum - - - - 0 20 0 0 0 _null_ _null_ )); +DATA(insert ( 2109 n 0 int2_sum - - - - 0 20 0 0 0 _null_ _null_ )); +DATA(insert ( 2110 n 0 float4pl - - - - 0 700 0 0 0 _null_ _null_ )); +DATA(insert ( 2111 n 0 float8pl - - - - 0 701 0 0 0 _null_ _null_ )); +DATA(insert ( 2112 n 0 cash_pl - - - - 0 790 0 0 0 _null_ _null_ )); +DATA(insert ( 2113 n 0 interval_pl - - - - 0 1186 0 0 0 _null_ _null_ )); +DATA(insert ( 2114 n 0 numeric_avg_accum numeric_sum - - - 0 2281 128 0 0 _null_ _null_ )); /* max */ -DATA(insert ( 2115 n 0 int8larger - 413 20 0 _null_ )); -DATA(insert ( 2116 n 0 int4larger - 521 23 0 _null_ )); -DATA(insert ( 2117 n 0 int2larger - 520 21 0 _null_ )); -DATA(insert ( 2118 n 0 oidlarger - 610 26 0 _null_ )); -DATA(insert ( 2119 n 0 float4larger - 623 700 0 _null_ )); -DATA(insert ( 2120 n 0 float8larger - 674 701 0 _null_ )); -DATA(insert ( 2121 n 0 int4larger - 563 702 0 _null_ )); -DATA(insert ( 2122 n 0 date_larger - 1097 1082 0 _null_ )); -DATA(insert ( 2123 n 0 time_larger - 1112 1083 0 _null_ )); -DATA(insert ( 2124 n 0 timetz_larger - 1554 1266 0 _null_ )); -DATA(insert ( 2125 n 0 cashlarger - 903 790 0 _null_ )); -DATA(insert ( 2126 n 0 timestamp_larger - 2064 1114 0 _null_ )); -DATA(insert ( 2127 n 0 timestamptz_larger - 1324 1184 0 _null_ )); -DATA(insert ( 2128 n 0 interval_larger - 1334 1186 0 _null_ )); -DATA(insert ( 2129 n 0 text_larger - 666 25 0 _null_ )); -DATA(insert ( 2130 n 0 numeric_larger - 1756 1700 0 _null_ )); -DATA(insert ( 2050 n 0 array_larger - 1073 2277 0 _null_ )); -DATA(insert ( 2244 n 0 bpchar_larger - 1060 1042 0 _null_ )); -DATA(insert ( 2797 n 0 tidlarger - 2800 27 0 _null_ )); -DATA(insert ( 3526 n 0 enum_larger - 3519 3500 0 _null_ )); +DATA(insert ( 2115 n 0 int8larger - - - - 413 20 0 0 0 _null_ _null_ )); +DATA(insert ( 2116 n 0 int4larger - - - - 521 23 0 0 0 _null_ _null_ )); +DATA(insert ( 2117 n 0 int2larger - - - - 520 21 0 0 0 _null_ _null_ )); +DATA(insert ( 2118 n 0 oidlarger - - - - 610 26 0 0 0 _null_ _null_ )); +DATA(insert ( 2119 n 0 float4larger - - - - 623 700 0 0 0 _null_ _null_ )); +DATA(insert ( 2120 n 0 float8larger - - - - 674 701 0 0 0 _null_ _null_ )); +DATA(insert ( 2121 n 0 int4larger - - - - 563 702 0 0 0 _null_ _null_ )); +DATA(insert ( 2122 n 0 date_larger - - - - 1097 1082 0 0 0 _null_ _null_ )); +DATA(insert ( 2123 n 0 time_larger - - - - 1112 1083 0 0 0 _null_ _null_ )); +DATA(insert ( 2124 n 0 timetz_larger - - - - 1554 1266 0 0 0 _null_ _null_ )); +DATA(insert ( 2125 n 0 cashlarger - - - - 903 790 0 0 0 _null_ _null_ )); +DATA(insert ( 2126 n 0 timestamp_larger - - - - 2064 1114 0 0 0 _null_ _null_ )); +DATA(insert ( 2127 n 0 timestamptz_larger - - - - 1324 1184 0 0 0 _null_ _null_ )); +DATA(insert ( 2128 n 0 interval_larger - - - - 1334 1186 0 0 0 _null_ _null_ )); +DATA(insert ( 2129 n 0 text_larger - - - - 666 25 0 0 0 _null_ _null_ )); +DATA(insert ( 2130 n 0 numeric_larger - - - - 1756 1700 0 0 0 _null_ _null_ )); +DATA(insert ( 2050 n 0 array_larger - - - - 1073 2277 0 0 0 _null_ _null_ )); +DATA(insert ( 2244 n 0 bpchar_larger - - - - 1060 1042 0 0 0 _null_ _null_ )); +DATA(insert ( 2797 n 0 tidlarger - - - - 2800 27 0 0 0 _null_ _null_ )); +DATA(insert ( 3526 n 0 enum_larger - - - - 3519 3500 0 0 0 _null_ _null_ )); /* min */ -DATA(insert ( 2131 n 0 int8smaller - 412 20 0 _null_ )); -DATA(insert ( 2132 n 0 int4smaller - 97 23 0 _null_ )); -DATA(insert ( 2133 n 0 int2smaller - 95 21 0 _null_ )); -DATA(insert ( 2134 n 0 oidsmaller - 609 26 0 _null_ )); -DATA(insert ( 2135 n 0 float4smaller - 622 700 0 _null_ )); -DATA(insert ( 2136 n 0 float8smaller - 672 701 0 _null_ )); -DATA(insert ( 2137 n 0 int4smaller - 562 702 0 _null_ )); -DATA(insert ( 2138 n 0 date_smaller - 1095 1082 0 _null_ )); -DATA(insert ( 2139 n 0 time_smaller - 1110 1083 0 _null_ )); -DATA(insert ( 2140 n 0 timetz_smaller - 1552 1266 0 _null_ )); -DATA(insert ( 2141 n 0 cashsmaller - 902 790 0 _null_ )); -DATA(insert ( 2142 n 0 timestamp_smaller - 2062 1114 0 _null_ )); -DATA(insert ( 2143 n 0 timestamptz_smaller - 1322 1184 0 _null_ )); -DATA(insert ( 2144 n 0 interval_smaller - 1332 1186 0 _null_ )); -DATA(insert ( 2145 n 0 text_smaller - 664 25 0 _null_ )); -DATA(insert ( 2146 n 0 numeric_smaller - 1754 1700 0 _null_ )); -DATA(insert ( 2051 n 0 array_smaller - 1072 2277 0 _null_ )); -DATA(insert ( 2245 n 0 bpchar_smaller - 1058 1042 0 _null_ )); -DATA(insert ( 2798 n 0 tidsmaller - 2799 27 0 _null_ )); -DATA(insert ( 3527 n 0 enum_smaller - 3518 3500 0 _null_ )); +DATA(insert ( 2131 n 0 int8smaller - - - - 412 20 0 0 0 _null_ _null_ )); +DATA(insert ( 2132 n 0 int4smaller - - - - 97 23 0 0 0 _null_ _null_ )); +DATA(insert ( 2133 n 0 int2smaller - - - - 95 21 0 0 0 _null_ _null_ )); +DATA(insert ( 2134 n 0 oidsmaller - - - - 609 26 0 0 0 _null_ _null_ )); +DATA(insert ( 2135 n 0 float4smaller - - - - 622 700 0 0 0 _null_ _null_ )); +DATA(insert ( 2136 n 0 float8smaller - - - - 672 701 0 0 0 _null_ _null_ )); +DATA(insert ( 2137 n 0 int4smaller - - - - 562 702 0 0 0 _null_ _null_ )); +DATA(insert ( 2138 n 0 date_smaller - - - - 1095 1082 0 0 0 _null_ _null_ )); +DATA(insert ( 2139 n 0 time_smaller - - - - 1110 1083 0 0 0 _null_ _null_ )); +DATA(insert ( 2140 n 0 timetz_smaller - - - - 1552 1266 0 0 0 _null_ _null_ )); +DATA(insert ( 2141 n 0 cashsmaller - - - - 902 790 0 0 0 _null_ _null_ )); +DATA(insert ( 2142 n 0 timestamp_smaller - - - - 2062 1114 0 0 0 _null_ _null_ )); +DATA(insert ( 2143 n 0 timestamptz_smaller - - - - 1322 1184 0 0 0 _null_ _null_ )); +DATA(insert ( 2144 n 0 interval_smaller - - - - 1332 1186 0 0 0 _null_ _null_ )); +DATA(insert ( 2145 n 0 text_smaller - - - - 664 25 0 0 0 _null_ _null_ )); +DATA(insert ( 2146 n 0 numeric_smaller - - - - 1754 1700 0 0 0 _null_ _null_ )); +DATA(insert ( 2051 n 0 array_smaller - - - - 1072 2277 0 0 0 _null_ _null_ )); +DATA(insert ( 2245 n 0 bpchar_smaller - - - - 1058 1042 0 0 0 _null_ _null_ )); +DATA(insert ( 2798 n 0 tidsmaller - - - - 2799 27 0 0 0 _null_ _null_ )); +DATA(insert ( 3527 n 0 enum_smaller - - - - 3518 3500 0 0 0 _null_ _null_ )); /* count */ -DATA(insert ( 2147 n 0 int8inc_any - 0 20 0 "0" )); -DATA(insert ( 2803 n 0 int8inc - 0 20 0 "0" )); +DATA(insert ( 2147 n 0 int8inc_any - - - - 0 20 0 0 0 "0" _null_ )); +DATA(insert ( 2803 n 0 int8inc - - - - 0 20 0 0 0 "0" _null_ )); /* var_pop */ -DATA(insert ( 2718 n 0 int8_accum numeric_var_pop 0 2281 128 _null_ )); -DATA(insert ( 2719 n 0 int4_accum numeric_var_pop 0 2281 128 _null_ )); -DATA(insert ( 2720 n 0 int2_accum numeric_var_pop 0 2281 128 _null_ )); -DATA(insert ( 2721 n 0 float4_accum float8_var_pop 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2722 n 0 float8_accum float8_var_pop 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2723 n 0 numeric_accum numeric_var_pop 0 2281 128 _null_ )); +DATA(insert ( 2718 n 0 int8_accum numeric_var_pop - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2719 n 0 int4_accum numeric_var_pop - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2720 n 0 int2_accum numeric_var_pop - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2721 n 0 float4_accum float8_var_pop - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2722 n 0 float8_accum float8_var_pop - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2723 n 0 numeric_accum numeric_var_pop - - - 0 2281 128 0 0 _null_ _null_ )); /* var_samp */ -DATA(insert ( 2641 n 0 int8_accum numeric_var_samp 0 2281 128 _null_ )); -DATA(insert ( 2642 n 0 int4_accum numeric_var_samp 0 2281 128 _null_ )); -DATA(insert ( 2643 n 0 int2_accum numeric_var_samp 0 2281 128 _null_ )); -DATA(insert ( 2644 n 0 float4_accum float8_var_samp 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2645 n 0 float8_accum float8_var_samp 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2646 n 0 numeric_accum numeric_var_samp 0 2281 128 _null_ )); +DATA(insert ( 2641 n 0 int8_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2642 n 0 int4_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2643 n 0 int2_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2644 n 0 float4_accum float8_var_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2645 n 0 float8_accum float8_var_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2646 n 0 numeric_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ )); /* variance: historical Postgres syntax for var_samp */ -DATA(insert ( 2148 n 0 int8_accum numeric_var_samp 0 2281 128 _null_ )); -DATA(insert ( 2149 n 0 int4_accum numeric_var_samp 0 2281 128 _null_ )); -DATA(insert ( 2150 n 0 int2_accum numeric_var_samp 0 2281 128 _null_ )); -DATA(insert ( 2151 n 0 float4_accum float8_var_samp 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2152 n 0 float8_accum float8_var_samp 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2153 n 0 numeric_accum numeric_var_samp 0 2281 128 _null_ )); +DATA(insert ( 2148 n 0 int8_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2149 n 0 int4_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2150 n 0 int2_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2151 n 0 float4_accum float8_var_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2152 n 0 float8_accum float8_var_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2153 n 0 numeric_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ )); /* stddev_pop */ -DATA(insert ( 2724 n 0 int8_accum numeric_stddev_pop 0 2281 128 _null_ )); -DATA(insert ( 2725 n 0 int4_accum numeric_stddev_pop 0 2281 128 _null_ )); -DATA(insert ( 2726 n 0 int2_accum numeric_stddev_pop 0 2281 128 _null_ )); -DATA(insert ( 2727 n 0 float4_accum float8_stddev_pop 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2728 n 0 float8_accum float8_stddev_pop 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2729 n 0 numeric_accum numeric_stddev_pop 0 2281 128 _null_ )); +DATA(insert ( 2724 n 0 int8_accum numeric_stddev_pop - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2725 n 0 int4_accum numeric_stddev_pop - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2726 n 0 int2_accum numeric_stddev_pop - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2727 n 0 float4_accum float8_stddev_pop - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2728 n 0 float8_accum float8_stddev_pop - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2729 n 0 numeric_accum numeric_stddev_pop - - - 0 2281 128 0 0 _null_ _null_ )); /* stddev_samp */ -DATA(insert ( 2712 n 0 int8_accum numeric_stddev_samp 0 2281 128 _null_ )); -DATA(insert ( 2713 n 0 int4_accum numeric_stddev_samp 0 2281 128 _null_ )); -DATA(insert ( 2714 n 0 int2_accum numeric_stddev_samp 0 2281 128 _null_ )); -DATA(insert ( 2715 n 0 float4_accum float8_stddev_samp 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2716 n 0 float8_accum float8_stddev_samp 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2717 n 0 numeric_accum numeric_stddev_samp 0 2281 128 _null_ )); +DATA(insert ( 2712 n 0 int8_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2713 n 0 int4_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2714 n 0 int2_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2715 n 0 float4_accum float8_stddev_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2716 n 0 float8_accum float8_stddev_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2717 n 0 numeric_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ )); /* stddev: historical Postgres syntax for stddev_samp */ -DATA(insert ( 2154 n 0 int8_accum numeric_stddev_samp 0 2281 128 _null_ )); -DATA(insert ( 2155 n 0 int4_accum numeric_stddev_samp 0 2281 128 _null_ )); -DATA(insert ( 2156 n 0 int2_accum numeric_stddev_samp 0 2281 128 _null_ )); -DATA(insert ( 2157 n 0 float4_accum float8_stddev_samp 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2158 n 0 float8_accum float8_stddev_samp 0 1022 0 "{0,0,0}" )); -DATA(insert ( 2159 n 0 numeric_accum numeric_stddev_samp 0 2281 128 _null_ )); +DATA(insert ( 2154 n 0 int8_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2155 n 0 int4_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2156 n 0 int2_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ )); +DATA(insert ( 2157 n 0 float4_accum float8_stddev_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2158 n 0 float8_accum float8_stddev_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ )); +DATA(insert ( 2159 n 0 numeric_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ )); /* SQL2003 binary regression aggregates */ -DATA(insert ( 2818 n 0 int8inc_float8_float8 - 0 20 0 "0" )); -DATA(insert ( 2819 n 0 float8_regr_accum float8_regr_sxx 0 1022 0 "{0,0,0,0,0,0}" )); -DATA(insert ( 2820 n 0 float8_regr_accum float8_regr_syy 0 1022 0 "{0,0,0,0,0,0}" )); -DATA(insert ( 2821 n 0 float8_regr_accum float8_regr_sxy 0 1022 0 "{0,0,0,0,0,0}" )); -DATA(insert ( 2822 n 0 float8_regr_accum float8_regr_avgx 0 1022 0 "{0,0,0,0,0,0}" )); -DATA(insert ( 2823 n 0 float8_regr_accum float8_regr_avgy 0 1022 0 "{0,0,0,0,0,0}" )); -DATA(insert ( 2824 n 0 float8_regr_accum float8_regr_r2 0 1022 0 "{0,0,0,0,0,0}" )); -DATA(insert ( 2825 n 0 float8_regr_accum float8_regr_slope 0 1022 0 "{0,0,0,0,0,0}" )); -DATA(insert ( 2826 n 0 float8_regr_accum float8_regr_intercept 0 1022 0 "{0,0,0,0,0,0}" )); -DATA(insert ( 2827 n 0 float8_regr_accum float8_covar_pop 0 1022 0 "{0,0,0,0,0,0}" )); -DATA(insert ( 2828 n 0 float8_regr_accum float8_covar_samp 0 1022 0 "{0,0,0,0,0,0}" )); -DATA(insert ( 2829 n 0 float8_regr_accum float8_corr 0 1022 0 "{0,0,0,0,0,0}" )); +DATA(insert ( 2818 n 0 int8inc_float8_float8 - - - - 0 20 0 0 0 "0" _null_ )); +DATA(insert ( 2819 n 0 float8_regr_accum float8_regr_sxx - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ )); +DATA(insert ( 2820 n 0 float8_regr_accum float8_regr_syy - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ )); +DATA(insert ( 2821 n 0 float8_regr_accum float8_regr_sxy - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ )); +DATA(insert ( 2822 n 0 float8_regr_accum float8_regr_avgx - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ )); +DATA(insert ( 2823 n 0 float8_regr_accum float8_regr_avgy - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ )); +DATA(insert ( 2824 n 0 float8_regr_accum float8_regr_r2 - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ )); +DATA(insert ( 2825 n 0 float8_regr_accum float8_regr_slope - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ )); +DATA(insert ( 2826 n 0 float8_regr_accum float8_regr_intercept - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ )); +DATA(insert ( 2827 n 0 float8_regr_accum float8_covar_pop - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ )); +DATA(insert ( 2828 n 0 float8_regr_accum float8_covar_samp - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ )); +DATA(insert ( 2829 n 0 float8_regr_accum float8_corr - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ )); /* boolean-and and boolean-or */ -DATA(insert ( 2517 n 0 booland_statefunc - 58 16 0 _null_ )); -DATA(insert ( 2518 n 0 boolor_statefunc - 59 16 0 _null_ )); -DATA(insert ( 2519 n 0 booland_statefunc - 58 16 0 _null_ )); +DATA(insert ( 2517 n 0 booland_statefunc - - - - 58 16 0 0 0 _null_ _null_ )); +DATA(insert ( 2518 n 0 boolor_statefunc - - - - 59 16 0 0 0 _null_ _null_ )); +DATA(insert ( 2519 n 0 booland_statefunc - - - - 58 16 0 0 0 _null_ _null_ )); /* bitwise integer */ -DATA(insert ( 2236 n 0 int2and - 0 21 0 _null_ )); -DATA(insert ( 2237 n 0 int2or - 0 21 0 _null_ )); -DATA(insert ( 2238 n 0 int4and - 0 23 0 _null_ )); -DATA(insert ( 2239 n 0 int4or - 0 23 0 _null_ )); -DATA(insert ( 2240 n 0 int8and - 0 20 0 _null_ )); -DATA(insert ( 2241 n 0 int8or - 0 20 0 _null_ )); -DATA(insert ( 2242 n 0 bitand - 0 1560 0 _null_ )); -DATA(insert ( 2243 n 0 bitor - 0 1560 0 _null_ )); +DATA(insert ( 2236 n 0 int2and - - - - 0 21 0 0 0 _null_ _null_ )); +DATA(insert ( 2237 n 0 int2or - - - - 0 21 0 0 0 _null_ _null_ )); +DATA(insert ( 2238 n 0 int4and - - - - 0 23 0 0 0 _null_ _null_ )); +DATA(insert ( 2239 n 0 int4or - - - - 0 23 0 0 0 _null_ _null_ )); +DATA(insert ( 2240 n 0 int8and - - - - 0 20 0 0 0 _null_ _null_ )); +DATA(insert ( 2241 n 0 int8or - - - - 0 20 0 0 0 _null_ _null_ )); +DATA(insert ( 2242 n 0 bitand - - - - 0 1560 0 0 0 _null_ _null_ )); +DATA(insert ( 2243 n 0 bitor - - - - 0 1560 0 0 0 _null_ _null_ )); /* xml */ -DATA(insert ( 2901 n 0 xmlconcat2 - 0 142 0 _null_ )); +DATA(insert ( 2901 n 0 xmlconcat2 - - - - 0 142 0 0 0 _null_ _null_ )); /* array */ -DATA(insert ( 2335 n 0 array_agg_transfn array_agg_finalfn 0 2281 0 _null_ )); +DATA(insert ( 2335 n 0 array_agg_transfn array_agg_finalfn - - - 0 2281 0 0 0 _null_ _null_ )); /* text */ -DATA(insert ( 3538 n 0 string_agg_transfn string_agg_finalfn 0 2281 0 _null_ )); +DATA(insert ( 3538 n 0 string_agg_transfn string_agg_finalfn - - - 0 2281 0 0 0 _null_ _null_ )); /* bytea */ -DATA(insert ( 3545 n 0 bytea_string_agg_transfn bytea_string_agg_finalfn 0 2281 0 _null_ )); +DATA(insert ( 3545 n 0 bytea_string_agg_transfn bytea_string_agg_finalfn - - - 0 2281 0 0 0 _null_ _null_ )); /* json */ -DATA(insert ( 3175 n 0 json_agg_transfn json_agg_finalfn 0 2281 0 _null_ )); -DATA(insert ( 3197 n 0 json_object_agg_transfn json_object_agg_finalfn 0 2281 0 _null_ )); +DATA(insert ( 3175 n 0 json_agg_transfn json_agg_finalfn - - - 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3197 n 0 json_object_agg_transfn json_object_agg_finalfn - - - 0 2281 0 0 0 _null_ _null_ )); /* ordered-set and hypothetical-set aggregates */ -DATA(insert ( 3972 o 1 ordered_set_transition percentile_disc_final 0 2281 0 _null_ )); -DATA(insert ( 3974 o 1 ordered_set_transition percentile_cont_float8_final 0 2281 0 _null_ )); -DATA(insert ( 3976 o 1 ordered_set_transition percentile_cont_interval_final 0 2281 0 _null_ )); -DATA(insert ( 3978 o 1 ordered_set_transition percentile_disc_multi_final 0 2281 0 _null_ )); -DATA(insert ( 3980 o 1 ordered_set_transition percentile_cont_float8_multi_final 0 2281 0 _null_ )); -DATA(insert ( 3982 o 1 ordered_set_transition percentile_cont_interval_multi_final 0 2281 0 _null_ )); -DATA(insert ( 3984 o 0 ordered_set_transition mode_final 0 2281 0 _null_ )); -DATA(insert ( 3986 h 1 ordered_set_transition_multi rank_final 0 2281 0 _null_ )); -DATA(insert ( 3988 h 1 ordered_set_transition_multi percent_rank_final 0 2281 0 _null_ )); -DATA(insert ( 3990 h 1 ordered_set_transition_multi cume_dist_final 0 2281 0 _null_ )); -DATA(insert ( 3992 h 1 ordered_set_transition_multi dense_rank_final 0 2281 0 _null_ )); +DATA(insert ( 3972 o 1 ordered_set_transition percentile_disc_final - - - 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3974 o 1 ordered_set_transition percentile_cont_float8_final - - - 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3976 o 1 ordered_set_transition percentile_cont_interval_final - - - 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3978 o 1 ordered_set_transition percentile_disc_multi_final - - - 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3980 o 1 ordered_set_transition percentile_cont_float8_multi_final - - - 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3982 o 1 ordered_set_transition percentile_cont_interval_multi_final - - - 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3984 o 0 ordered_set_transition mode_final - - - 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3986 h 1 ordered_set_transition_multi rank_final - - - 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3988 h 1 ordered_set_transition_multi percent_rank_final - - - 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3990 h 1 ordered_set_transition_multi cume_dist_final - - - 0 2281 0 0 0 _null_ _null_ )); +DATA(insert ( 3992 h 1 ordered_set_transition_multi dense_rank_final - - - 0 2281 0 0 0 _null_ _null_ )); /* @@ -290,9 +308,15 @@ extern Oid AggregateCreate(const char *aggName, Oid variadicArgType, List *aggtransfnName, List *aggfinalfnName, + List *aggmtransfnName, + List *aggminvtransfnName, + List *aggmfinalfnName, List *aggsortopName, Oid aggTransType, int32 aggTransSpace, - const char *agginitval); + Oid aggmTransType, + int32 aggmTransSpace, + const char *agginitval, + const char *aggminitval); #endif /* PG_AGGREGATE_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index a301a08fba..6c94e8a7ae 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1762,7 +1762,8 @@ typedef struct WindowAggState Datum endOffsetValue; /* result of endOffset evaluation */ MemoryContext partcontext; /* context for partition-lifespan data */ - MemoryContext aggcontext; /* context for each aggregate data */ + MemoryContext aggcontext; /* shared context for aggregate working data */ + MemoryContext curaggcontext; /* current aggregate's working data */ ExprContext *tmpcontext; /* short-term evaluation context */ bool all_first; /* true if the scan is starting */ diff --git a/src/include/parser/parse_agg.h b/src/include/parser/parse_agg.h index 8faf991a09..938d408bb7 100644 --- a/src/include/parser/parse_agg.h +++ b/src/include/parser/parse_agg.h @@ -39,8 +39,10 @@ extern void build_aggregate_fnexprs(Oid *agg_input_types, Oid agg_result_type, Oid agg_input_collation, Oid transfn_oid, + Oid invtransfn_oid, Oid finalfn_oid, Expr **transfnexpr, + Expr **invtransfnexpr, Expr **finalfnexpr); #endif /* PARSE_AGG_H */ diff --git a/src/test/regress/expected/create_aggregate.out b/src/test/regress/expected/create_aggregate.out index ca908d91f4..a547ca535f 100644 --- a/src/test/regress/expected/create_aggregate.out +++ b/src/test/regress/expected/create_aggregate.out @@ -90,3 +90,38 @@ alter aggregate my_rank(VARIADIC "any" ORDER BY VARIADIC "any") public | test_rank | bigint | VARIADIC "any" ORDER BY VARIADIC "any" | (2 rows) +-- moving-aggregate options +CREATE AGGREGATE sumdouble (float8) +( + stype = float8, + sfunc = float8pl, + mstype = float8, + msfunc = float8pl, + minvfunc = float8mi +); +-- invalid: nonstrict inverse with strict forward function +CREATE FUNCTION float8mi_n(float8, float8) RETURNS float8 AS +$$ SELECT $1 - $2; $$ +LANGUAGE SQL; +CREATE AGGREGATE invalidsumdouble (float8) +( + stype = float8, + sfunc = float8pl, + mstype = float8, + msfunc = float8pl, + minvfunc = float8mi_n +); +ERROR: strictness of aggregate's forward and inverse transition functions must match +-- invalid: non-matching result types +CREATE FUNCTION float8mi_int(float8, float8) RETURNS int AS +$$ SELECT CAST($1 - $2 AS INT); $$ +LANGUAGE SQL; +CREATE AGGREGATE wrongreturntype (float8) +( + stype = float8, + sfunc = float8pl, + mstype = float8, + msfunc = float8pl, + minvfunc = float8mi_int +); +ERROR: return type of inverse transition function float8mi_int is not double precision diff --git a/src/test/regress/expected/opr_sanity.out b/src/test/regress/expected/opr_sanity.out index 118f7e43dc..93ff18d589 100644 --- a/src/test/regress/expected/opr_sanity.out +++ b/src/test/regress/expected/opr_sanity.out @@ -735,7 +735,7 @@ WHERE aggfnoid = 0 OR aggtransfn = 0 OR aggkind NOT IN ('n', 'o', 'h') OR aggnumdirectargs < 0 OR (aggkind = 'n' AND aggnumdirectargs > 0) OR - aggtranstype = 0 OR aggtransspace < 0; + aggtranstype = 0 OR aggtransspace < 0 OR aggmtransspace < 0; ctid | aggfnoid ------+---------- (0 rows) @@ -827,6 +827,126 @@ WHERE a.aggfnoid = p.oid AND ----------+---------+-----+--------- (0 rows) +-- Check for inconsistent specifications of moving-aggregate columns. +SELECT ctid, aggfnoid::oid +FROM pg_aggregate as p1 +WHERE aggmtranstype != 0 AND + (aggmtransfn = 0 OR aggminvtransfn = 0); + ctid | aggfnoid +------+---------- +(0 rows) + +SELECT ctid, aggfnoid::oid +FROM pg_aggregate as p1 +WHERE aggmtranstype = 0 AND + (aggmtransfn != 0 OR aggminvtransfn != 0 OR aggmfinalfn != 0 OR + aggmtransspace != 0 OR aggminitval IS NOT NULL); + ctid | aggfnoid +------+---------- +(0 rows) + +-- If there is no mfinalfn then the output type must be the mtranstype. +SELECT a.aggfnoid::oid, p.proname +FROM pg_aggregate as a, pg_proc as p +WHERE a.aggfnoid = p.oid AND + a.aggmtransfn != 0 AND + a.aggmfinalfn = 0 AND p.prorettype != a.aggmtranstype; + aggfnoid | proname +----------+--------- +(0 rows) + +-- Cross-check mtransfn (if present) against its entry in pg_proc. +SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname +FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr +WHERE a.aggfnoid = p.oid AND + a.aggmtransfn = ptr.oid AND + (ptr.proretset + OR NOT (ptr.pronargs = + CASE WHEN a.aggkind = 'n' THEN p.pronargs + 1 + ELSE greatest(p.pronargs - a.aggnumdirectargs, 1) + 1 END) + OR NOT physically_coercible(ptr.prorettype, a.aggmtranstype) + OR NOT physically_coercible(a.aggmtranstype, ptr.proargtypes[0]) + OR (p.pronargs > 0 AND + NOT physically_coercible(p.proargtypes[0], ptr.proargtypes[1])) + OR (p.pronargs > 1 AND + NOT physically_coercible(p.proargtypes[1], ptr.proargtypes[2])) + OR (p.pronargs > 2 AND + NOT physically_coercible(p.proargtypes[2], ptr.proargtypes[3])) + -- we could carry the check further, but 3 args is enough for now + ); + aggfnoid | proname | oid | proname +----------+---------+-----+--------- +(0 rows) + +-- Cross-check minvtransfn (if present) against its entry in pg_proc. +SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname +FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr +WHERE a.aggfnoid = p.oid AND + a.aggminvtransfn = ptr.oid AND + (ptr.proretset + OR NOT (ptr.pronargs = + CASE WHEN a.aggkind = 'n' THEN p.pronargs + 1 + ELSE greatest(p.pronargs - a.aggnumdirectargs, 1) + 1 END) + OR NOT physically_coercible(ptr.prorettype, a.aggmtranstype) + OR NOT physically_coercible(a.aggmtranstype, ptr.proargtypes[0]) + OR (p.pronargs > 0 AND + NOT physically_coercible(p.proargtypes[0], ptr.proargtypes[1])) + OR (p.pronargs > 1 AND + NOT physically_coercible(p.proargtypes[1], ptr.proargtypes[2])) + OR (p.pronargs > 2 AND + NOT physically_coercible(p.proargtypes[2], ptr.proargtypes[3])) + -- we could carry the check further, but 3 args is enough for now + ); + aggfnoid | proname | oid | proname +----------+---------+-----+--------- +(0 rows) + +-- Cross-check mfinalfn (if present) against its entry in pg_proc. +SELECT a.aggfnoid::oid, p.proname, pfn.oid, pfn.proname +FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS pfn +WHERE a.aggfnoid = p.oid AND + a.aggmfinalfn = pfn.oid AND + (pfn.proretset OR + NOT binary_coercible(pfn.prorettype, p.prorettype) OR + NOT binary_coercible(a.aggmtranstype, pfn.proargtypes[0]) OR + CASE WHEN a.aggkind = 'n' THEN pfn.pronargs != 1 + ELSE pfn.pronargs != p.pronargs + 1 + OR (p.pronargs > 0 AND + NOT binary_coercible(p.proargtypes[0], pfn.proargtypes[1])) + OR (p.pronargs > 1 AND + NOT binary_coercible(p.proargtypes[1], pfn.proargtypes[2])) + OR (p.pronargs > 2 AND + NOT binary_coercible(p.proargtypes[2], pfn.proargtypes[3])) + -- we could carry the check further, but 3 args is enough for now + END); + aggfnoid | proname | oid | proname +----------+---------+-----+--------- +(0 rows) + +-- If mtransfn is strict then either minitval should be non-NULL, or +-- input type should match mtranstype so that the first non-null input +-- can be assigned as the state value. +SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname +FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr +WHERE a.aggfnoid = p.oid AND + a.aggmtransfn = ptr.oid AND ptr.proisstrict AND + a.aggminitval IS NULL AND + NOT binary_coercible(p.proargtypes[0], a.aggmtranstype); + aggfnoid | proname | oid | proname +----------+---------+-----+--------- +(0 rows) + +-- transfn and mtransfn should have same strictness setting. +SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname, mptr.oid, mptr.proname +FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr, pg_proc AS mptr +WHERE a.aggfnoid = p.oid AND + a.aggtransfn = ptr.oid AND + a.aggmtransfn = mptr.oid AND + ptr.proisstrict != mptr.proisstrict; + aggfnoid | proname | oid | proname | oid | proname +----------+---------+-----+---------+-----+--------- +(0 rows) + -- Cross-check aggsortop (if present) against pg_operator. -- We expect to find entries for bool_and, bool_or, every, max, and min. SELECT DISTINCT proname, oprname diff --git a/src/test/regress/expected/window.out b/src/test/regress/expected/window.out index 0f21fcb01d..d9cb0addb3 100644 --- a/src/test/regress/expected/window.out +++ b/src/test/regress/expected/window.out @@ -1071,3 +1071,226 @@ SELECT nth_value_def(ten) OVER (PARTITION BY four), ten, four 1 | 3 | 3 (10 rows) +-- +-- Test the basic moving-aggregate machinery +-- +-- create aggregates that record the series of transform calls (these are +-- intentionally not true inverses) +CREATE FUNCTION logging_sfunc_nonstrict(text, anyelement) RETURNS text AS +$$ SELECT COALESCE($1, '') || '*' || quote_nullable($2) $$ +LANGUAGE SQL IMMUTABLE; +CREATE FUNCTION logging_msfunc_nonstrict(text, anyelement) RETURNS text AS +$$ SELECT COALESCE($1, '') || '+' || quote_nullable($2) $$ +LANGUAGE SQL IMMUTABLE; +CREATE FUNCTION logging_minvfunc_nonstrict(text, anyelement) RETURNS text AS +$$ SELECT $1 || '-' || quote_nullable($2) $$ +LANGUAGE SQL IMMUTABLE; +CREATE AGGREGATE logging_agg_nonstrict (anyelement) +( + stype = text, + sfunc = logging_sfunc_nonstrict, + mstype = text, + msfunc = logging_msfunc_nonstrict, + minvfunc = logging_minvfunc_nonstrict +); +CREATE AGGREGATE logging_agg_nonstrict_initcond (anyelement) +( + stype = text, + sfunc = logging_sfunc_nonstrict, + mstype = text, + msfunc = logging_msfunc_nonstrict, + minvfunc = logging_minvfunc_nonstrict, + initcond = 'I', + minitcond = 'MI' +); +CREATE FUNCTION logging_sfunc_strict(text, anyelement) RETURNS text AS +$$ SELECT $1 || '*' || quote_nullable($2) $$ +LANGUAGE SQL STRICT IMMUTABLE; +CREATE FUNCTION logging_msfunc_strict(text, anyelement) RETURNS text AS +$$ SELECT $1 || '+' || quote_nullable($2) $$ +LANGUAGE SQL STRICT IMMUTABLE; +CREATE FUNCTION logging_minvfunc_strict(text, anyelement) RETURNS text AS +$$ SELECT $1 || '-' || quote_nullable($2) $$ +LANGUAGE SQL STRICT IMMUTABLE; +CREATE AGGREGATE logging_agg_strict (text) +( + stype = text, + sfunc = logging_sfunc_strict, + mstype = text, + msfunc = logging_msfunc_strict, + minvfunc = logging_minvfunc_strict +); +CREATE AGGREGATE logging_agg_strict_initcond (anyelement) +( + stype = text, + sfunc = logging_sfunc_strict, + mstype = text, + msfunc = logging_msfunc_strict, + minvfunc = logging_minvfunc_strict, + initcond = 'I', + minitcond = 'MI' +); +-- test strict and non-strict cases +SELECT + p::text || ',' || i::text || ':' || COALESCE(v::text, 'NULL') AS row, + logging_agg_nonstrict(v) over wnd as nstrict, + logging_agg_nonstrict_initcond(v) over wnd as nstrict_init, + logging_agg_strict(v::text) over wnd as strict, + logging_agg_strict_initcond(v) over wnd as strict_init +FROM (VALUES + (1, 1, NULL), + (1, 2, 'a'), + (1, 3, 'b'), + (1, 4, NULL), + (1, 5, NULL), + (1, 6, 'c'), + (2, 1, NULL), + (2, 2, 'x'), + (3, 1, 'z') +) AS t(p, i, v) +WINDOW wnd AS (PARTITION BY P ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) +ORDER BY p, i; + row | nstrict | nstrict_init | strict | strict_init +----------+-----------------------------------------------+-------------------------------------------------+-----------+---------------- + 1,1:NULL | +NULL | MI+NULL | | MI + 1,2:a | +NULL+'a' | MI+NULL+'a' | a | MI+'a' + 1,3:b | +NULL+'a'-NULL+'b' | MI+NULL+'a'-NULL+'b' | a+'b' | MI+'a'+'b' + 1,4:NULL | +NULL+'a'-NULL+'b'-'a'+NULL | MI+NULL+'a'-NULL+'b'-'a'+NULL | a+'b'-'a' | MI+'a'+'b'-'a' + 1,5:NULL | +NULL+'a'-NULL+'b'-'a'+NULL-'b'+NULL | MI+NULL+'a'-NULL+'b'-'a'+NULL-'b'+NULL | | MI + 1,6:c | +NULL+'a'-NULL+'b'-'a'+NULL-'b'+NULL-NULL+'c' | MI+NULL+'a'-NULL+'b'-'a'+NULL-'b'+NULL-NULL+'c' | c | MI+'c' + 2,1:NULL | +NULL | MI+NULL | | MI + 2,2:x | +NULL+'x' | MI+NULL+'x' | x | MI+'x' + 3,1:z | +'z' | MI+'z' | z | MI+'z' +(9 rows) + +-- and again, but with filter +SELECT + p::text || ',' || i::text || ':' || + CASE WHEN f THEN COALESCE(v::text, 'NULL') ELSE '-' END as row, + logging_agg_nonstrict(v) filter(where f) over wnd as nstrict_filt, + logging_agg_nonstrict_initcond(v) filter(where f) over wnd as nstrict_init_filt, + logging_agg_strict(v::text) filter(where f) over wnd as strict_filt, + logging_agg_strict_initcond(v) filter(where f) over wnd as strict_init_filt +FROM (VALUES + (1, 1, true, NULL), + (1, 2, false, 'a'), + (1, 3, true, 'b'), + (1, 4, false, NULL), + (1, 5, false, NULL), + (1, 6, false, 'c'), + (2, 1, false, NULL), + (2, 2, true, 'x'), + (3, 1, true, 'z') +) AS t(p, i, f, v) +WINDOW wnd AS (PARTITION BY p ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) +ORDER BY p, i; + row | nstrict_filt | nstrict_init_filt | strict_filt | strict_init_filt +----------+--------------+-------------------+-------------+------------------ + 1,1:NULL | +NULL | MI+NULL | | MI + 1,2:- | +NULL | MI+NULL | | MI + 1,3:b | +'b' | MI+'b' | b | MI+'b' + 1,4:- | +'b' | MI+'b' | b | MI+'b' + 1,5:- | | MI | | MI + 1,6:- | | MI | | MI + 2,1:- | | MI | | MI + 2,2:x | +'x' | MI+'x' | x | MI+'x' + 3,1:z | +'z' | MI+'z' | z | MI+'z' +(9 rows) + +-- test that volatile arguments disable moving-aggregate mode +SELECT + i::text || ':' || COALESCE(v::text, 'NULL') as row, + logging_agg_strict(v::text) + over wnd as inverse, + logging_agg_strict(v::text || CASE WHEN random() < 0 then '?' ELSE '' END) + over wnd as noinverse +FROM (VALUES + (1, 'a'), + (2, 'b'), + (3, 'c') +) AS t(i, v) +WINDOW wnd AS (ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) +ORDER BY i; + row | inverse | noinverse +-----+---------------+----------- + 1:a | a | a + 2:b | a+'b' | a*'b' + 3:c | a+'b'-'a'+'c' | b*'c' +(3 rows) + +SELECT + i::text || ':' || COALESCE(v::text, 'NULL') as row, + logging_agg_strict(v::text) filter(where true) + over wnd as inverse, + logging_agg_strict(v::text) filter(where random() >= 0) + over wnd as noinverse +FROM (VALUES + (1, 'a'), + (2, 'b'), + (3, 'c') +) AS t(i, v) +WINDOW wnd AS (ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) +ORDER BY i; + row | inverse | noinverse +-----+---------------+----------- + 1:a | a | a + 2:b | a+'b' | a*'b' + 3:c | a+'b'-'a'+'c' | b*'c' +(3 rows) + +-- test that non-overlapping windows don't use inverse transitions +SELECT + logging_agg_strict(v::text) OVER wnd +FROM (VALUES + (1, 'a'), + (2, 'b'), + (3, 'c') +) AS t(i, v) +WINDOW wnd AS (ORDER BY i ROWS BETWEEN CURRENT ROW AND CURRENT ROW) +ORDER BY i; + logging_agg_strict +-------------------- + a + b + c +(3 rows) + +-- test that returning NULL from the inverse transition functions +-- restarts the aggregation from scratch. The second aggregate is supposed +-- to test cases where only some aggregates restart, the third one checks +-- that one aggregate restarting doesn't cause others to restart. +CREATE FUNCTION sum_int_randrestart_minvfunc(int4, int4) RETURNS int4 AS +$$ SELECT CASE WHEN random() < 0.2 THEN NULL ELSE $1 - $2 END $$ +LANGUAGE SQL STRICT; +CREATE AGGREGATE sum_int_randomrestart (int4) +( + stype = int4, + sfunc = int4pl, + mstype = int4, + msfunc = int4pl, + minvfunc = sum_int_randrestart_minvfunc +); +WITH +vs AS ( + SELECT i, (random() * 100)::int4 AS v + FROM generate_series(1, 100) AS i +), +sum_following AS ( + SELECT i, SUM(v) OVER + (ORDER BY i DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS s + FROM vs +) +SELECT DISTINCT + sum_following.s = sum_int_randomrestart(v) OVER fwd AS eq1, + -sum_following.s = sum_int_randomrestart(-v) OVER fwd AS eq2, + 100*3+(vs.i-1)*3 = length(logging_agg_nonstrict(''::text) OVER fwd) AS eq3 +FROM vs +JOIN sum_following ON sum_following.i = vs.i +WINDOW fwd AS ( + ORDER BY vs.i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING +); + eq1 | eq2 | eq3 +-----+-----+----- + t | t | t +(1 row) + diff --git a/src/test/regress/sql/create_aggregate.sql b/src/test/regress/sql/create_aggregate.sql index c76882a398..2b502aca3e 100644 --- a/src/test/regress/sql/create_aggregate.sql +++ b/src/test/regress/sql/create_aggregate.sql @@ -101,3 +101,44 @@ alter aggregate my_rank(VARIADIC "any" ORDER BY VARIADIC "any") rename to test_rank; \da test_* + +-- moving-aggregate options + +CREATE AGGREGATE sumdouble (float8) +( + stype = float8, + sfunc = float8pl, + mstype = float8, + msfunc = float8pl, + minvfunc = float8mi +); + +-- invalid: nonstrict inverse with strict forward function + +CREATE FUNCTION float8mi_n(float8, float8) RETURNS float8 AS +$$ SELECT $1 - $2; $$ +LANGUAGE SQL; + +CREATE AGGREGATE invalidsumdouble (float8) +( + stype = float8, + sfunc = float8pl, + mstype = float8, + msfunc = float8pl, + minvfunc = float8mi_n +); + +-- invalid: non-matching result types + +CREATE FUNCTION float8mi_int(float8, float8) RETURNS int AS +$$ SELECT CAST($1 - $2 AS INT); $$ +LANGUAGE SQL; + +CREATE AGGREGATE wrongreturntype (float8) +( + stype = float8, + sfunc = float8pl, + mstype = float8, + msfunc = float8pl, + minvfunc = float8mi_int +); diff --git a/src/test/regress/sql/opr_sanity.sql b/src/test/regress/sql/opr_sanity.sql index ad37178924..22998a553c 100644 --- a/src/test/regress/sql/opr_sanity.sql +++ b/src/test/regress/sql/opr_sanity.sql @@ -592,7 +592,7 @@ WHERE aggfnoid = 0 OR aggtransfn = 0 OR aggkind NOT IN ('n', 'o', 'h') OR aggnumdirectargs < 0 OR (aggkind = 'n' AND aggnumdirectargs > 0) OR - aggtranstype = 0 OR aggtransspace < 0; + aggtranstype = 0 OR aggtransspace < 0 OR aggmtransspace < 0; -- Make sure the matching pg_proc entry is sensible, too. @@ -668,6 +668,107 @@ WHERE a.aggfnoid = p.oid AND a.agginitval IS NULL AND NOT binary_coercible(p.proargtypes[0], a.aggtranstype); +-- Check for inconsistent specifications of moving-aggregate columns. + +SELECT ctid, aggfnoid::oid +FROM pg_aggregate as p1 +WHERE aggmtranstype != 0 AND + (aggmtransfn = 0 OR aggminvtransfn = 0); + +SELECT ctid, aggfnoid::oid +FROM pg_aggregate as p1 +WHERE aggmtranstype = 0 AND + (aggmtransfn != 0 OR aggminvtransfn != 0 OR aggmfinalfn != 0 OR + aggmtransspace != 0 OR aggminitval IS NOT NULL); + +-- If there is no mfinalfn then the output type must be the mtranstype. + +SELECT a.aggfnoid::oid, p.proname +FROM pg_aggregate as a, pg_proc as p +WHERE a.aggfnoid = p.oid AND + a.aggmtransfn != 0 AND + a.aggmfinalfn = 0 AND p.prorettype != a.aggmtranstype; + +-- Cross-check mtransfn (if present) against its entry in pg_proc. +SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname +FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr +WHERE a.aggfnoid = p.oid AND + a.aggmtransfn = ptr.oid AND + (ptr.proretset + OR NOT (ptr.pronargs = + CASE WHEN a.aggkind = 'n' THEN p.pronargs + 1 + ELSE greatest(p.pronargs - a.aggnumdirectargs, 1) + 1 END) + OR NOT physically_coercible(ptr.prorettype, a.aggmtranstype) + OR NOT physically_coercible(a.aggmtranstype, ptr.proargtypes[0]) + OR (p.pronargs > 0 AND + NOT physically_coercible(p.proargtypes[0], ptr.proargtypes[1])) + OR (p.pronargs > 1 AND + NOT physically_coercible(p.proargtypes[1], ptr.proargtypes[2])) + OR (p.pronargs > 2 AND + NOT physically_coercible(p.proargtypes[2], ptr.proargtypes[3])) + -- we could carry the check further, but 3 args is enough for now + ); + +-- Cross-check minvtransfn (if present) against its entry in pg_proc. +SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname +FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr +WHERE a.aggfnoid = p.oid AND + a.aggminvtransfn = ptr.oid AND + (ptr.proretset + OR NOT (ptr.pronargs = + CASE WHEN a.aggkind = 'n' THEN p.pronargs + 1 + ELSE greatest(p.pronargs - a.aggnumdirectargs, 1) + 1 END) + OR NOT physically_coercible(ptr.prorettype, a.aggmtranstype) + OR NOT physically_coercible(a.aggmtranstype, ptr.proargtypes[0]) + OR (p.pronargs > 0 AND + NOT physically_coercible(p.proargtypes[0], ptr.proargtypes[1])) + OR (p.pronargs > 1 AND + NOT physically_coercible(p.proargtypes[1], ptr.proargtypes[2])) + OR (p.pronargs > 2 AND + NOT physically_coercible(p.proargtypes[2], ptr.proargtypes[3])) + -- we could carry the check further, but 3 args is enough for now + ); + +-- Cross-check mfinalfn (if present) against its entry in pg_proc. + +SELECT a.aggfnoid::oid, p.proname, pfn.oid, pfn.proname +FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS pfn +WHERE a.aggfnoid = p.oid AND + a.aggmfinalfn = pfn.oid AND + (pfn.proretset OR + NOT binary_coercible(pfn.prorettype, p.prorettype) OR + NOT binary_coercible(a.aggmtranstype, pfn.proargtypes[0]) OR + CASE WHEN a.aggkind = 'n' THEN pfn.pronargs != 1 + ELSE pfn.pronargs != p.pronargs + 1 + OR (p.pronargs > 0 AND + NOT binary_coercible(p.proargtypes[0], pfn.proargtypes[1])) + OR (p.pronargs > 1 AND + NOT binary_coercible(p.proargtypes[1], pfn.proargtypes[2])) + OR (p.pronargs > 2 AND + NOT binary_coercible(p.proargtypes[2], pfn.proargtypes[3])) + -- we could carry the check further, but 3 args is enough for now + END); + +-- If mtransfn is strict then either minitval should be non-NULL, or +-- input type should match mtranstype so that the first non-null input +-- can be assigned as the state value. + +SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname +FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr +WHERE a.aggfnoid = p.oid AND + a.aggmtransfn = ptr.oid AND ptr.proisstrict AND + a.aggminitval IS NULL AND + NOT binary_coercible(p.proargtypes[0], a.aggmtranstype); + +-- transfn and mtransfn should have same strictness setting. + +SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname, mptr.oid, mptr.proname +FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr, pg_proc AS mptr +WHERE a.aggfnoid = p.oid AND + a.aggtransfn = ptr.oid AND + a.aggmtransfn = mptr.oid AND + ptr.proisstrict != mptr.proisstrict; + -- Cross-check aggsortop (if present) against pg_operator. -- We expect to find entries for bool_and, bool_or, every, max, and min. diff --git a/src/test/regress/sql/window.sql b/src/test/regress/sql/window.sql index 7297e62618..5bae12bd33 100644 --- a/src/test/regress/sql/window.sql +++ b/src/test/regress/sql/window.sql @@ -284,3 +284,195 @@ SELECT nth_value_def(n := 2, val := ten) OVER (PARTITION BY four), ten, four SELECT nth_value_def(ten) OVER (PARTITION BY four), ten, four FROM (SELECT * FROM tenk1 WHERE unique2 < 10 ORDER BY four, ten) s; + +-- +-- Test the basic moving-aggregate machinery +-- + +-- create aggregates that record the series of transform calls (these are +-- intentionally not true inverses) + +CREATE FUNCTION logging_sfunc_nonstrict(text, anyelement) RETURNS text AS +$$ SELECT COALESCE($1, '') || '*' || quote_nullable($2) $$ +LANGUAGE SQL IMMUTABLE; + +CREATE FUNCTION logging_msfunc_nonstrict(text, anyelement) RETURNS text AS +$$ SELECT COALESCE($1, '') || '+' || quote_nullable($2) $$ +LANGUAGE SQL IMMUTABLE; + +CREATE FUNCTION logging_minvfunc_nonstrict(text, anyelement) RETURNS text AS +$$ SELECT $1 || '-' || quote_nullable($2) $$ +LANGUAGE SQL IMMUTABLE; + +CREATE AGGREGATE logging_agg_nonstrict (anyelement) +( + stype = text, + sfunc = logging_sfunc_nonstrict, + mstype = text, + msfunc = logging_msfunc_nonstrict, + minvfunc = logging_minvfunc_nonstrict +); + +CREATE AGGREGATE logging_agg_nonstrict_initcond (anyelement) +( + stype = text, + sfunc = logging_sfunc_nonstrict, + mstype = text, + msfunc = logging_msfunc_nonstrict, + minvfunc = logging_minvfunc_nonstrict, + initcond = 'I', + minitcond = 'MI' +); + +CREATE FUNCTION logging_sfunc_strict(text, anyelement) RETURNS text AS +$$ SELECT $1 || '*' || quote_nullable($2) $$ +LANGUAGE SQL STRICT IMMUTABLE; + +CREATE FUNCTION logging_msfunc_strict(text, anyelement) RETURNS text AS +$$ SELECT $1 || '+' || quote_nullable($2) $$ +LANGUAGE SQL STRICT IMMUTABLE; + +CREATE FUNCTION logging_minvfunc_strict(text, anyelement) RETURNS text AS +$$ SELECT $1 || '-' || quote_nullable($2) $$ +LANGUAGE SQL STRICT IMMUTABLE; + +CREATE AGGREGATE logging_agg_strict (text) +( + stype = text, + sfunc = logging_sfunc_strict, + mstype = text, + msfunc = logging_msfunc_strict, + minvfunc = logging_minvfunc_strict +); + +CREATE AGGREGATE logging_agg_strict_initcond (anyelement) +( + stype = text, + sfunc = logging_sfunc_strict, + mstype = text, + msfunc = logging_msfunc_strict, + minvfunc = logging_minvfunc_strict, + initcond = 'I', + minitcond = 'MI' +); + +-- test strict and non-strict cases +SELECT + p::text || ',' || i::text || ':' || COALESCE(v::text, 'NULL') AS row, + logging_agg_nonstrict(v) over wnd as nstrict, + logging_agg_nonstrict_initcond(v) over wnd as nstrict_init, + logging_agg_strict(v::text) over wnd as strict, + logging_agg_strict_initcond(v) over wnd as strict_init +FROM (VALUES + (1, 1, NULL), + (1, 2, 'a'), + (1, 3, 'b'), + (1, 4, NULL), + (1, 5, NULL), + (1, 6, 'c'), + (2, 1, NULL), + (2, 2, 'x'), + (3, 1, 'z') +) AS t(p, i, v) +WINDOW wnd AS (PARTITION BY P ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) +ORDER BY p, i; + +-- and again, but with filter +SELECT + p::text || ',' || i::text || ':' || + CASE WHEN f THEN COALESCE(v::text, 'NULL') ELSE '-' END as row, + logging_agg_nonstrict(v) filter(where f) over wnd as nstrict_filt, + logging_agg_nonstrict_initcond(v) filter(where f) over wnd as nstrict_init_filt, + logging_agg_strict(v::text) filter(where f) over wnd as strict_filt, + logging_agg_strict_initcond(v) filter(where f) over wnd as strict_init_filt +FROM (VALUES + (1, 1, true, NULL), + (1, 2, false, 'a'), + (1, 3, true, 'b'), + (1, 4, false, NULL), + (1, 5, false, NULL), + (1, 6, false, 'c'), + (2, 1, false, NULL), + (2, 2, true, 'x'), + (3, 1, true, 'z') +) AS t(p, i, f, v) +WINDOW wnd AS (PARTITION BY p ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) +ORDER BY p, i; + +-- test that volatile arguments disable moving-aggregate mode +SELECT + i::text || ':' || COALESCE(v::text, 'NULL') as row, + logging_agg_strict(v::text) + over wnd as inverse, + logging_agg_strict(v::text || CASE WHEN random() < 0 then '?' ELSE '' END) + over wnd as noinverse +FROM (VALUES + (1, 'a'), + (2, 'b'), + (3, 'c') +) AS t(i, v) +WINDOW wnd AS (ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) +ORDER BY i; + +SELECT + i::text || ':' || COALESCE(v::text, 'NULL') as row, + logging_agg_strict(v::text) filter(where true) + over wnd as inverse, + logging_agg_strict(v::text) filter(where random() >= 0) + over wnd as noinverse +FROM (VALUES + (1, 'a'), + (2, 'b'), + (3, 'c') +) AS t(i, v) +WINDOW wnd AS (ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) +ORDER BY i; + +-- test that non-overlapping windows don't use inverse transitions +SELECT + logging_agg_strict(v::text) OVER wnd +FROM (VALUES + (1, 'a'), + (2, 'b'), + (3, 'c') +) AS t(i, v) +WINDOW wnd AS (ORDER BY i ROWS BETWEEN CURRENT ROW AND CURRENT ROW) +ORDER BY i; + +-- test that returning NULL from the inverse transition functions +-- restarts the aggregation from scratch. The second aggregate is supposed +-- to test cases where only some aggregates restart, the third one checks +-- that one aggregate restarting doesn't cause others to restart. + +CREATE FUNCTION sum_int_randrestart_minvfunc(int4, int4) RETURNS int4 AS +$$ SELECT CASE WHEN random() < 0.2 THEN NULL ELSE $1 - $2 END $$ +LANGUAGE SQL STRICT; + +CREATE AGGREGATE sum_int_randomrestart (int4) +( + stype = int4, + sfunc = int4pl, + mstype = int4, + msfunc = int4pl, + minvfunc = sum_int_randrestart_minvfunc +); + +WITH +vs AS ( + SELECT i, (random() * 100)::int4 AS v + FROM generate_series(1, 100) AS i +), +sum_following AS ( + SELECT i, SUM(v) OVER + (ORDER BY i DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS s + FROM vs +) +SELECT DISTINCT + sum_following.s = sum_int_randomrestart(v) OVER fwd AS eq1, + -sum_following.s = sum_int_randomrestart(-v) OVER fwd AS eq2, + 100*3+(vs.i-1)*3 = length(logging_agg_nonstrict(''::text) OVER fwd) AS eq3 +FROM vs +JOIN sum_following ON sum_following.i = vs.i +WINDOW fwd AS ( + ORDER BY vs.i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING +); -- 2.40.0