/*-------------------------------------------------------------------------
*
- * nodeAgg.c--
+ * nodeAgg.c
* Routines to handle aggregate nodes.
*
- * Copyright (c) 1994, Regents of the University of California
+ * ExecAgg evaluates each aggregate in the following steps:
*
+ * transvalue = initcond
+ * foreach input_value do
+ * transvalue = transfunc(transvalue, input_value)
+ * result = finalfunc(transvalue)
*
- * NOTE
- * The implementation of Agg node has been reworked to handle legal
- * SQL aggregates. (Do not expect POSTQUEL semantics.) -- ay 2/95
+ * If a finalfunc is not supplied then the result is just the ending
+ * value of transvalue.
+ *
+ * If transfunc is marked "strict" in pg_proc and initcond is NULL,
+ * then the first non-NULL input_value is assigned directly to transvalue,
+ * and transfunc isn't applied until the second non-NULL input_value.
+ * The agg's input type and transtype must be the same in this case!
+ *
+ * If transfunc is marked "strict" then NULL input_values are skipped,
+ * keeping the previous transvalue. If transfunc is not strict then it
+ * is called for every input tuple and must deal with NULL initcond
+ * or NULL input_value for itself.
+ *
+ * If finalfunc is marked "strict" then it is not called when the
+ * ending transvalue is NULL, instead a NULL result is created
+ * automatically (this is just the usual handling of strict functions,
+ * of course). A non-strict finalfunc can make its own choice of
+ * what to return for a NULL ending transvalue.
+ *
+ * We compute aggregate input expressions and run the transition functions
+ * in a temporary econtext (aggstate->tmpcontext). This is reset at
+ * least once per input tuple, so when the transvalue datatype is
+ * pass-by-reference, we have to be careful to copy it into a longer-lived
+ * memory context, and free the prior value to avoid memory leakage.
+ * We store transvalues in the memory context aggstate->aggcontext,
+ * which is also used for the hashtable structures in AGG_HASHED mode.
+ * The node's regular econtext (aggstate->ss.ps.ps_ExprContext)
+ * is used to run finalize functions and compute the output tuple;
+ * this context can be reset once per output tuple.
+ *
+ *
+ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * /usr/local/devel/pglite/cvs/src/backend/executor/nodeAgg.c,v 1.13 1995/08/01 20:19:07 jolly Exp
+ * $PostgreSQL: pgsql/src/backend/executor/nodeAgg.c,v 1.118 2004/02/03 17:34:02 tgl Exp $
*
*-------------------------------------------------------------------------
*/
-#include <string.h>
#include "postgres.h"
-#include "fmgr.h"
#include "access/heapam.h"
#include "catalog/pg_aggregate.h"
-#include "catalog/catalog.h"
-#include "parser/parse_type.h"
+#include "catalog/pg_operator.h"
#include "executor/executor.h"
#include "executor/nodeAgg.h"
-#include "storage/bufmgr.h"
-#include "utils/palloc.h"
-#include "utils/syscache.h"
+#include "miscadmin.h"
#include "optimizer/clauses.h"
+#include "parser/parse_agg.h"
+#include "parser/parse_coerce.h"
+#include "parser/parse_expr.h"
+#include "parser/parse_oper.h"
+#include "utils/acl.h"
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/syscache.h"
+#include "utils/tuplesort.h"
+#include "utils/datum.h"
+
/*
- * AggFuncInfo -
- * keeps the transition functions information around
+ * AggStatePerAggData - per-aggregate working state for the Agg scan
*/
-typedef struct AggFuncInfo
+typedef struct AggStatePerAggData
{
- Oid xfn1_oid;
- Oid xfn2_oid;
- Oid finalfn_oid;
- FmgrInfo xfn1;
- FmgrInfo xfn2;
+ /*
+ * These values are set up during ExecInitAgg() and do not change
+ * thereafter:
+ */
+
+ /* Links to Aggref expr and state nodes this working state is for */
+ AggrefExprState *aggrefstate;
+ Aggref *aggref;
+
+ /* Oids of the transition and final functions */
+ Oid transfn_oid;
+ Oid finalfn_oid; /* may be InvalidOid */
+
+ /*
+ * fmgr lookup data for transition/final functions --- only valid when
+ * corresponding oid is not InvalidOid. Note in particular that
+ * fn_strict flags are kept here.
+ */
+ FmgrInfo transfn;
FmgrInfo finalfn;
-} AggFuncInfo;
-static Datum aggGetAttr(TupleTableSlot *tuple, Aggreg *agg, bool *isNull);
+ /*
+ * Type of input data and Oid of sort operator to use for it; only
+ * set/used when aggregate has DISTINCT flag. (These are not used
+ * directly by nodeAgg, but must be passed to the Tuplesort object.)
+ */
+ Oid inputType;
+ Oid sortOperator;
+ /*
+ * fmgr lookup data for input type's equality operator --- only
+ * set/used when aggregate has DISTINCT flag.
+ */
+ FmgrInfo equalfn;
-/* ---------------------------------------
- *
- * ExecAgg -
- *
- * ExecAgg receives tuples from its outer subplan and aggregates over
- * the appropriate attribute for each (unique) aggregate in the target
- * list. (The number of tuples to aggregate over depends on whether a
- * GROUP BY clause is present. It might be the number of tuples in a
- * group or all the tuples that satisfy the qualifications.) The value of
- * each aggregate is stored in the expression context for ExecProject to
- * evaluate the result tuple.
- *
- * ExecAgg evaluates each aggregate in the following steps: (initcond1,
- * initcond2 are the initial values and sfunc1, sfunc2, and finalfunc are
- * the transition functions.)
- *
- * value1[i] = initcond1
- * value2[i] = initcond2
- * forall tuples do
- * value1[i] = sfunc1(aggregate_attribute, value1[i])
- * value2[i] = sfunc2(value2[i])
- * value1[i] = finalfunc(value1[i], value2[i])
- *
- * If the outer subplan is a Group node, ExecAgg returns as many tuples
- * as there are groups.
+ /*
+ * initial value from pg_aggregate entry
+ */
+ Datum initValue;
+ bool initValueIsNull;
+
+ /*
+ * We need the len and byval info for the agg's input, result, and
+ * transition data types in order to know how to copy/delete values.
+ */
+ int16 inputtypeLen,
+ resulttypeLen,
+ transtypeLen;
+ bool inputtypeByVal,
+ resulttypeByVal,
+ transtypeByVal;
+
+ /*
+ * These values are working state that is initialized at the start of
+ * an input tuple group and updated for each input tuple.
+ *
+ * For a simple (non DISTINCT) aggregate, we just feed the input values
+ * straight to the transition function. If it's DISTINCT, we pass the
+ * input values into a Tuplesort object; then at completion of the
+ * input tuple group, we scan the sorted values, eliminate duplicates,
+ * and run the transition function on the rest.
+ */
+
+ Tuplesortstate *sortstate; /* sort object, if a DISTINCT agg */
+} AggStatePerAggData;
+
+/*
+ * AggStatePerGroupData - per-aggregate-per-group working state
*
- * XXX handling of NULL doesn't work
+ * These values are working state that is initialized at the start of
+ * an input tuple group and updated for each input tuple.
*
- * OLD COMMENTS
+ * In AGG_PLAIN and AGG_SORTED modes, we have a single array of these
+ * structs (pointed to by aggstate->pergroup); we re-use the array for
+ * each input group, if it's AGG_SORTED mode. In AGG_HASHED mode, the
+ * hash table contains an array of these structs for each tuple group.
*
- * XXX Aggregates should probably have another option: what to do
- * with transfn2 if we hit a null value. "count" (transfn1 = null,
- * transfn2 = increment) will want to have transfn2 called; "avg"
- * (transfn1 = add, transfn2 = increment) will not. -pma 1/3/93
+ * Logically, the sortstate field belongs in this struct, but we do not
+ * keep it here for space reasons: we don't support DISTINCT aggregates
+ * in AGG_HASHED mode, so there's no reason to use up a pointer field
+ * in every entry of the hashtable.
+ */
+typedef struct AggStatePerGroupData
+{
+ Datum transValue; /* current transition value */
+ bool transValueIsNull;
+
+ bool noTransValue; /* true if transValue not set yet */
+
+ /*
+ * Note: noTransValue initially has the same value as
+ * transValueIsNull, and if true both are cleared to false at the same
+ * time. They are not the same though: if transfn later returns a
+ * NULL, we want to keep that NULL and not auto-replace it with a
+ * later input value. Only the first non-NULL input will be
+ * auto-substituted.
+ */
+} AggStatePerGroupData;
+
+/*
+ * To implement hashed aggregation, we need a hashtable that stores a
+ * representative tuple and an array of AggStatePerGroup structs for each
+ * distinct set of GROUP BY column values. We compute the hash key from
+ * the GROUP BY columns.
+ */
+typedef struct AggHashEntryData *AggHashEntry;
+
+typedef struct AggHashEntryData
+{
+ TupleHashEntryData shared; /* common header for hash table entries */
+ /* per-aggregate transition status array - must be last! */
+ AggStatePerGroupData pergroup[1]; /* VARIABLE LENGTH ARRAY */
+} AggHashEntryData; /* VARIABLE LENGTH STRUCT */
+
+
+static void initialize_aggregates(AggState *aggstate,
+ AggStatePerAgg peragg,
+ AggStatePerGroup pergroup);
+static void advance_transition_function(AggState *aggstate,
+ AggStatePerAgg peraggstate,
+ AggStatePerGroup pergroupstate,
+ Datum newVal, bool isNull);
+static void advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup);
+static void process_sorted_aggregate(AggState *aggstate,
+ AggStatePerAgg peraggstate,
+ AggStatePerGroup pergroupstate);
+static void finalize_aggregate(AggState *aggstate,
+ AggStatePerAgg peraggstate,
+ AggStatePerGroup pergroupstate,
+ Datum *resultVal, bool *resultIsNull);
+static void build_hash_table(AggState *aggstate);
+static AggHashEntry lookup_hash_entry(AggState *aggstate,
+ TupleTableSlot *slot);
+static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
+static void agg_fill_hash_table(AggState *aggstate);
+static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
+static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
+
+
+/*
+ * Initialize all aggregates for a new group of input values.
*
- * ------------------------------------------
+ * When called, CurrentMemoryContext should be the per-query context.
*/
-TupleTableSlot *
-ExecAgg(Agg *node)
+static void
+initialize_aggregates(AggState *aggstate,
+ AggStatePerAgg peragg,
+ AggStatePerGroup pergroup)
{
- AggState *aggstate;
- EState *estate;
- Aggreg **aggregates;
- Plan *outerPlan;
- int i,
- nagg;
- Datum *value1,
- *value2;
- int *noInitValue;
- AggFuncInfo *aggFuncInfo;
- long nTuplesAgged = 0;
- ExprContext *econtext;
- ProjectionInfo *projInfo;
- TupleTableSlot *resultSlot;
- HeapTuple oneTuple;
- List *alist;
- char *nulls;
- bool isDone;
- bool isNull = FALSE,
- isNull1 = FALSE,
- isNull2 = FALSE;
+ int aggno;
+ for (aggno = 0; aggno < aggstate->numaggs; aggno++)
+ {
+ AggStatePerAgg peraggstate = &peragg[aggno];
+ AggStatePerGroup pergroupstate = &pergroup[aggno];
+ Aggref *aggref = peraggstate->aggref;
- do {
+ /*
+ * Start a fresh sort operation for each DISTINCT aggregate.
+ */
+ if (aggref->aggdistinct)
+ {
+ /*
+ * In case of rescan, maybe there could be an uncompleted sort
+ * operation? Clean it up if so.
+ */
+ if (peraggstate->sortstate)
+ tuplesort_end(peraggstate->sortstate);
+ peraggstate->sortstate =
+ tuplesort_begin_datum(peraggstate->inputType,
+ peraggstate->sortOperator,
+ work_mem, false);
+ }
- /* ---------------------
- * get state info from node
- * ---------------------
- */
+ /*
+ * (Re)set transValue to the initial value.
+ *
+ * Note that when the initial value is pass-by-ref, we must copy it
+ * (into the aggcontext) since we will pfree the transValue later.
+ */
+ if (peraggstate->initValueIsNull)
+ pergroupstate->transValue = peraggstate->initValue;
+ else
+ {
+ MemoryContext oldContext;
- aggstate = node->aggstate;
- if (aggstate->agg_done)
- return NULL;
+ oldContext = MemoryContextSwitchTo(aggstate->aggcontext);
+ pergroupstate->transValue = datumCopy(peraggstate->initValue,
+ peraggstate->transtypeByVal,
+ peraggstate->transtypeLen);
+ MemoryContextSwitchTo(oldContext);
+ }
+ pergroupstate->transValueIsNull = peraggstate->initValueIsNull;
- estate = node->plan.state;
- econtext = aggstate->csstate.cstate.cs_ExprContext;
- nagg = length(node->aggs);
+ /*
+ * If the initial value for the transition state doesn't exist in
+ * the pg_aggregate table then we will let the first non-NULL
+ * value returned from the outer procNode become the initial
+ * value. (This is useful for aggregates like max() and min().)
+ * The noTransValue flag signals that we still need to do this.
+ */
+ pergroupstate->noTransValue = peraggstate->initValueIsNull;
+ }
+}
- aggregates = (Aggreg **) palloc(sizeof(Aggreg *) * nagg);
+/*
+ * Given a new input value, advance the transition function of an aggregate.
+ *
+ * It doesn't matter which memory context this is called in.
+ */
+static void
+advance_transition_function(AggState *aggstate,
+ AggStatePerAgg peraggstate,
+ AggStatePerGroup pergroupstate,
+ Datum newVal, bool isNull)
+{
+ FunctionCallInfoData fcinfo;
+ MemoryContext oldContext;
- /* take List* and make it an array that can be quickly indexed */
- alist = node->aggs;
- for (i = 0; i < nagg; i++)
+ if (peraggstate->transfn.fn_strict)
{
- aggregates[i] = lfirst(alist);
- aggregates[i]->aggno = i;
- alist = lnext(alist);
+ /*
+ * For a strict transfn, nothing happens at a NULL input tuple; we
+ * just keep the prior transValue.
+ */
+ if (isNull)
+ return;
+ if (pergroupstate->noTransValue)
+ {
+ /*
+ * transValue has not been initialized. This is the first
+ * non-NULL input value. We use it as the initial value for
+ * transValue. (We already checked that the agg's input type
+ * is binary-compatible with its transtype, so straight copy
+ * here is OK.)
+ *
+ * We must copy the datum into aggcontext if it is pass-by-ref.
+ * We do not need to pfree the old transValue, since it's
+ * NULL.
+ */
+ oldContext = MemoryContextSwitchTo(aggstate->aggcontext);
+ pergroupstate->transValue = datumCopy(newVal,
+ peraggstate->transtypeByVal,
+ peraggstate->transtypeLen);
+ pergroupstate->transValueIsNull = false;
+ pergroupstate->noTransValue = false;
+ MemoryContextSwitchTo(oldContext);
+ return;
+ }
+ if (pergroupstate->transValueIsNull)
+ {
+ /*
+ * Don't call a strict function with NULL inputs. Note it is
+ * possible to get here despite the above tests, if the
+ * transfn is strict *and* returned a NULL on a prior cycle.
+ * If that happens we will propagate the NULL all the way to
+ * the end.
+ */
+ return;
+ }
}
- value1 = node->aggstate->csstate.cstate.cs_ExprContext->ecxt_values;
- nulls = node->aggstate->csstate.cstate.cs_ExprContext->ecxt_nulls;
+ /* We run the transition functions in per-input-tuple memory context */
+ oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
- value2 = (Datum *) palloc(sizeof(Datum) * nagg);
- MemSet(value2, 0, sizeof(Datum) * nagg);
+ /*
+ * OK to call the transition function
+ *
+ * This is heavily-used code, so manually zero just the necessary fields
+ * instead of using MemSet(). Compare FunctionCall2().
+ */
- aggFuncInfo = (AggFuncInfo *) palloc(sizeof(AggFuncInfo) * nagg);
- MemSet(aggFuncInfo, 0, sizeof(AggFuncInfo) * nagg);
+ /* MemSet(&fcinfo, 0, sizeof(fcinfo)); */
+ fcinfo.context = NULL;
+ fcinfo.resultinfo = NULL;
+ fcinfo.isnull = false;
- noInitValue = (int *) palloc(sizeof(int) * nagg);
- MemSet(noInitValue, 0, sizeof(noInitValue) * nagg);
+ fcinfo.flinfo = &peraggstate->transfn;
+ fcinfo.nargs = 2;
+ fcinfo.arg[0] = pergroupstate->transValue;
+ fcinfo.argnull[0] = pergroupstate->transValueIsNull;
+ fcinfo.arg[1] = newVal;
+ fcinfo.argnull[1] = isNull;
- outerPlan = outerPlan(node);
- oneTuple = NULL;
+ newVal = FunctionCallInvoke(&fcinfo);
- projInfo = aggstate->csstate.cstate.cs_ProjInfo;
+ /*
+ * If pass-by-ref datatype, must copy the new value into aggcontext
+ * and pfree the prior transValue. But if transfn returned a pointer
+ * to its first input, we don't need to do anything.
+ */
+ if (!peraggstate->transtypeByVal &&
+ DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
+ {
+ if (!fcinfo.isnull)
+ {
+ MemoryContextSwitchTo(aggstate->aggcontext);
+ newVal = datumCopy(newVal,
+ peraggstate->transtypeByVal,
+ peraggstate->transtypeLen);
+ }
+ if (!pergroupstate->transValueIsNull)
+ pfree(DatumGetPointer(pergroupstate->transValue));
+ }
- for (i = 0; i < nagg; i++)
+ pergroupstate->transValue = newVal;
+ pergroupstate->transValueIsNull = fcinfo.isnull;
+
+ MemoryContextSwitchTo(oldContext);
+}
+
+/*
+ * Advance all the aggregates for one input tuple. The input tuple
+ * has been stored in tmpcontext->ecxt_scantuple, so that it is accessible
+ * to ExecEvalExpr. pergroup is the array of per-group structs to use
+ * (this might be in a hashtable entry).
+ *
+ * When called, CurrentMemoryContext should be the per-query context.
+ */
+static void
+advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
+{
+ ExprContext *econtext = aggstate->tmpcontext;
+ int aggno;
+
+ for (aggno = 0; aggno < aggstate->numaggs; aggno++)
{
- Aggreg *agg;
- char *aggname;
- HeapTuple aggTuple;
- Form_pg_aggregate aggp;
- Oid xfn1_oid,
- xfn2_oid,
- finalfn_oid;
+ AggStatePerAgg peraggstate = &aggstate->peragg[aggno];
+ AggStatePerGroup pergroupstate = &pergroup[aggno];
+ AggrefExprState *aggrefstate = peraggstate->aggrefstate;
+ Aggref *aggref = peraggstate->aggref;
+ Datum newVal;
+ bool isNull;
- agg = aggregates[i];
+ newVal = ExecEvalExprSwitchContext(aggrefstate->target, econtext,
+ &isNull, NULL);
- /* ---------------------
- * find transfer functions of all the aggregates and initialize
- * their initial values
- * ---------------------
+ if (aggref->aggdistinct)
+ {
+ /* in DISTINCT mode, we may ignore nulls */
+ if (isNull)
+ continue;
+ tuplesort_putdatum(peraggstate->sortstate, newVal, isNull);
+ }
+ else
+ {
+ advance_transition_function(aggstate, peraggstate, pergroupstate,
+ newVal, isNull);
+ }
+ }
+}
+
+/*
+ * Run the transition function for a DISTINCT aggregate. This is called
+ * after we have completed entering all the input values into the sort
+ * object. We complete the sort, read out the values in sorted order,
+ * and run the transition function on each non-duplicate value.
+ *
+ * When called, CurrentMemoryContext should be the per-query context.
+ */
+static void
+process_sorted_aggregate(AggState *aggstate,
+ AggStatePerAgg peraggstate,
+ AggStatePerGroup pergroupstate)
+{
+ Datum oldVal = (Datum) 0;
+ bool haveOldVal = false;
+ MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
+ MemoryContext oldContext;
+ Datum newVal;
+ bool isNull;
+
+ tuplesort_performsort(peraggstate->sortstate);
+
+ /*
+ * Note: if input type is pass-by-ref, the datums returned by the sort
+ * are freshly palloc'd in the per-query context, so we must be
+ * careful to pfree them when they are no longer needed.
+ */
+
+ while (tuplesort_getdatum(peraggstate->sortstate, true,
+ &newVal, &isNull))
+ {
+ /*
+ * DISTINCT always suppresses nulls, per SQL spec, regardless of
+ * the transition function's strictness.
*/
- aggname = agg->aggname;
- aggTuple = SearchSysCacheTuple(AGGNAME,
- PointerGetDatum(aggname),
- ObjectIdGetDatum(agg->basetype),
- 0, 0);
- if (!HeapTupleIsValid(aggTuple))
- elog(ERROR, "ExecAgg: cache lookup failed for aggregate \"%s\"(%s)",
- aggname,
- typeidTypeName(agg->basetype));
- aggp = (Form_pg_aggregate) GETSTRUCT(aggTuple);
+ if (isNull)
+ continue;
- xfn1_oid = aggp->aggtransfn1;
- xfn2_oid = aggp->aggtransfn2;
- finalfn_oid = aggp->aggfinalfn;
+ /*
+ * Clear and select the working context for evaluation of the
+ * equality function and transition function.
+ */
+ MemoryContextReset(workcontext);
+ oldContext = MemoryContextSwitchTo(workcontext);
- if (OidIsValid(finalfn_oid))
+ if (haveOldVal &&
+ DatumGetBool(FunctionCall2(&peraggstate->equalfn,
+ oldVal, newVal)))
{
- fmgr_info(finalfn_oid, &aggFuncInfo[i].finalfn);
- aggFuncInfo[i].finalfn_oid = finalfn_oid;
+ /* equal to prior, so forget this one */
+ if (!peraggstate->inputtypeByVal)
+ pfree(DatumGetPointer(newVal));
}
-
- if (OidIsValid(xfn2_oid))
+ else
{
- fmgr_info(xfn2_oid, &aggFuncInfo[i].xfn2);
- aggFuncInfo[i].xfn2_oid = xfn2_oid;
- value2[i] = (Datum) AggNameGetInitVal((char *) aggname,
- aggp->aggbasetype,
- 2,
- &isNull2);
- /* ------------------------------------------
- * If there is a second transition function, its initial
- * value must exist -- as it does not depend on data values,
- * we have no other way of determining an initial value.
- * ------------------------------------------
- */
- if (isNull2)
- elog(ERROR, "ExecAgg: agginitval2 is null");
+ advance_transition_function(aggstate, peraggstate, pergroupstate,
+ newVal, false);
+ /* forget the old value, if any */
+ if (haveOldVal && !peraggstate->inputtypeByVal)
+ pfree(DatumGetPointer(oldVal));
+ /* and remember the new one for subsequent equality checks */
+ oldVal = newVal;
+ haveOldVal = true;
}
- if (OidIsValid(xfn1_oid))
+ MemoryContextSwitchTo(oldContext);
+ }
+
+ if (haveOldVal && !peraggstate->inputtypeByVal)
+ pfree(DatumGetPointer(oldVal));
+
+ tuplesort_end(peraggstate->sortstate);
+ peraggstate->sortstate = NULL;
+}
+
+/*
+ * Compute the final value of one aggregate.
+ *
+ * The final function will be run, and the result delivered, in the
+ * output-tuple context; caller's CurrentMemoryContext does not matter.
+ */
+static void
+finalize_aggregate(AggState *aggstate,
+ AggStatePerAgg peraggstate,
+ AggStatePerGroup pergroupstate,
+ Datum *resultVal, bool *resultIsNull)
+{
+ MemoryContext oldContext;
+
+ oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
+
+ /*
+ * Apply the agg's finalfn if one is provided, else return transValue.
+ */
+ if (OidIsValid(peraggstate->finalfn_oid))
+ {
+ FunctionCallInfoData fcinfo;
+
+ MemSet(&fcinfo, 0, sizeof(fcinfo));
+ fcinfo.flinfo = &peraggstate->finalfn;
+ fcinfo.nargs = 1;
+ fcinfo.arg[0] = pergroupstate->transValue;
+ fcinfo.argnull[0] = pergroupstate->transValueIsNull;
+ if (fcinfo.flinfo->fn_strict && pergroupstate->transValueIsNull)
{
- fmgr_info(xfn1_oid, &aggFuncInfo[i].xfn1);
- aggFuncInfo[i].xfn1_oid = xfn1_oid;
- value1[i] = (Datum) AggNameGetInitVal((char *) aggname,
- aggp->aggbasetype,
- 1,
- &isNull1);
-
- /* ------------------------------------------
- * If the initial value for the first transition function
- * doesn't exist in the pg_aggregate table then we let
- * the first value returned from the outer procNode become
- * the initial value. (This is useful for aggregates like
- * max{} and min{}.)
- * ------------------------------------------
- */
- if (isNull1)
- {
- noInitValue[i] = 1;
- nulls[i] = 1;
- }
+ /* don't call a strict function with NULL inputs */
+ *resultVal = (Datum) 0;
+ *resultIsNull = true;
+ }
+ else
+ {
+ *resultVal = FunctionCallInvoke(&fcinfo);
+ *resultIsNull = fcinfo.isnull;
}
}
-
+ else
+ {
+ *resultVal = pergroupstate->transValue;
+ *resultIsNull = pergroupstate->transValueIsNull;
+ }
- /* ----------------
- * for each tuple from the the outer plan, apply all the aggregates
- * ----------------
+ /*
+ * If result is pass-by-ref, make sure it is in the right context.
*/
- for (;;)
+ if (!peraggstate->resulttypeByVal && !*resultIsNull &&
+ !MemoryContextContains(CurrentMemoryContext,
+ DatumGetPointer(*resultVal)))
+ *resultVal = datumCopy(*resultVal,
+ peraggstate->resulttypeByVal,
+ peraggstate->resulttypeLen);
+
+ MemoryContextSwitchTo(oldContext);
+}
+
+/*
+ * Initialize the hash table to empty.
+ *
+ * The hash table always lives in the aggcontext memory context.
+ */
+static void
+build_hash_table(AggState *aggstate)
+{
+ Agg *node = (Agg *) aggstate->ss.ps.plan;
+ MemoryContext tmpmem = aggstate->tmpcontext->ecxt_per_tuple_memory;
+ Size entrysize;
+
+ Assert(node->aggstrategy == AGG_HASHED);
+ Assert(node->numGroups > 0);
+
+ entrysize = sizeof(AggHashEntryData) +
+ (aggstate->numaggs - 1) *sizeof(AggStatePerGroupData);
+
+ aggstate->hashtable = BuildTupleHashTable(node->numCols,
+ node->grpColIdx,
+ aggstate->eqfunctions,
+ aggstate->hashfunctions,
+ node->numGroups,
+ entrysize,
+ aggstate->aggcontext,
+ tmpmem);
+}
+
+/*
+ * Find or create a hashtable entry for the tuple group containing the
+ * given tuple.
+ *
+ * When called, CurrentMemoryContext should be the per-query context.
+ */
+static AggHashEntry
+lookup_hash_entry(AggState *aggstate, TupleTableSlot *slot)
+{
+ AggHashEntry entry;
+ bool isnew;
+
+ entry = (AggHashEntry) LookupTupleHashEntry(aggstate->hashtable,
+ slot,
+ &isnew);
+
+ if (isnew)
{
- HeapTuple outerTuple = NULL;
- TupleTableSlot *outerslot;
-
- isNull = isNull1 = isNull2 = 0;
- outerslot = ExecProcNode(outerPlan, (Plan *) node);
- if (outerslot)
- outerTuple = outerslot->val;
- if (!HeapTupleIsValid(outerTuple))
- {
+ /* initialize aggregates for new tuple group */
+ initialize_aggregates(aggstate, aggstate->peragg, entry->pergroup);
+ }
- /*
- * when the outerplan doesn't return a single tuple, create a
- * dummy heaptuple anyway because we still need to return a
- * valid aggregate value. The value returned will be the
- * initial values of the transition functions
- */
- if (nTuplesAgged == 0)
+ return entry;
+}
+
+/*
+ * ExecAgg -
+ *
+ * ExecAgg receives tuples from its outer subplan and aggregates over
+ * the appropriate attribute for each aggregate function use (Aggref
+ * node) appearing in the targetlist or qual of the node. The number
+ * of tuples to aggregate over depends on whether grouped or plain
+ * aggregation is selected. In grouped aggregation, we produce a result
+ * row for each group; in plain aggregation there's a single result row
+ * for the whole query. In either case, the value of each aggregate is
+ * stored in the expression context to be used when ExecProject evaluates
+ * the result tuple.
+ */
+TupleTableSlot *
+ExecAgg(AggState *node)
+{
+ if (node->agg_done)
+ return NULL;
+
+ if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED)
+ {
+ if (!node->table_filled)
+ agg_fill_hash_table(node);
+ return agg_retrieve_hash_table(node);
+ }
+ else
+ return agg_retrieve_direct(node);
+}
+
+/*
+ * ExecAgg for non-hashed case
+ */
+static TupleTableSlot *
+agg_retrieve_direct(AggState *aggstate)
+{
+ Agg *node = (Agg *) aggstate->ss.ps.plan;
+ PlanState *outerPlan;
+ ExprContext *econtext;
+ ExprContext *tmpcontext;
+ ProjectionInfo *projInfo;
+ Datum *aggvalues;
+ bool *aggnulls;
+ AggStatePerAgg peragg;
+ AggStatePerGroup pergroup;
+ TupleTableSlot *outerslot;
+ TupleTableSlot *firstSlot;
+ TupleTableSlot *resultSlot;
+ int aggno;
+
+ /*
+ * get state info from node
+ */
+ outerPlan = outerPlanState(aggstate);
+ /* econtext is the per-output-tuple expression context */
+ econtext = aggstate->ss.ps.ps_ExprContext;
+ aggvalues = econtext->ecxt_aggvalues;
+ aggnulls = econtext->ecxt_aggnulls;
+ /* tmpcontext is the per-input-tuple expression context */
+ tmpcontext = aggstate->tmpcontext;
+ projInfo = aggstate->ss.ps.ps_ProjInfo;
+ peragg = aggstate->peragg;
+ pergroup = aggstate->pergroup;
+ firstSlot = aggstate->ss.ss_ScanTupleSlot;
+
+ /*
+ * We loop retrieving groups until we find one matching
+ * aggstate->ss.ps.qual
+ */
+ do
+ {
+ if (aggstate->agg_done)
+ return NULL;
+
+ /*
+ * If we don't already have the first tuple of the new group,
+ * fetch it from the outer plan.
+ */
+ if (aggstate->grp_firstTuple == NULL)
+ {
+ outerslot = ExecProcNode(outerPlan);
+ if (!TupIsNull(outerslot))
{
- TupleDesc tupType;
- Datum *tupValue;
- char *null_array;
-
- tupType = aggstate->csstate.css_ScanTupleSlot->ttc_tupleDescriptor;
- tupValue = projInfo->pi_tupValue;
-
- /* initially, set all the values to NULL */
- null_array = palloc(tupType->natts);
- for (i = 0; i < tupType->natts; i++)
- null_array[i] = 'n';
- oneTuple = heap_formtuple(tupType, tupValue, null_array);
- pfree(null_array);
+ /*
+ * Make a copy of the first input tuple; we will use this
+ * for comparisons (in group mode) and for projection.
+ */
+ aggstate->grp_firstTuple = heap_copytuple(outerslot->val);
+ }
+ else
+ {
+ /* outer plan produced no tuples at all */
+ aggstate->agg_done = true;
+ /* If we are grouping, we should produce no tuples too */
+ if (node->aggstrategy != AGG_PLAIN)
+ return NULL;
}
- break;
}
- for (i = 0; i < nagg; i++)
+ /*
+ * Clear the per-output-tuple context for each group
+ */
+ ResetExprContext(econtext);
+
+ /*
+ * Initialize working state for a new input tuple group
+ */
+ initialize_aggregates(aggstate, peragg, pergroup);
+
+ if (aggstate->grp_firstTuple != NULL)
{
- AttrNumber attnum;
- int2 attlen = 0;
- Datum newVal = (Datum) NULL;
- AggFuncInfo *aggfns = &aggFuncInfo[i];
- Datum args[2];
- Node *tagnode = NULL;
-
- switch (nodeTag(aggregates[i]->target))
- {
- case T_Var:
- tagnode = NULL;
- newVal = aggGetAttr(outerslot,
- aggregates[i],
- &isNull);
- break;
- case T_Expr:
- tagnode = ((Expr *) aggregates[i]->target)->oper;
- econtext->ecxt_scantuple = outerslot;
- newVal = ExecEvalExpr(aggregates[i]->target, econtext,
- &isNull, &isDone);
- break;
- case T_Const:
- tagnode = NULL;
- econtext->ecxt_scantuple = outerslot;
- newVal = ExecEvalExpr(aggregates[i]->target, econtext,
- &isNull, &isDone);
- break;
- default:
- elog(ERROR, "ExecAgg: Bad Agg->Target for Agg %d", i);
- }
+ /*
+ * Store the copied first input tuple in the tuple table slot
+ * reserved for it. The tuple will be deleted when it is
+ * cleared from the slot.
+ */
+ ExecStoreTuple(aggstate->grp_firstTuple,
+ firstSlot,
+ InvalidBuffer,
+ true);
+ aggstate->grp_firstTuple = NULL; /* don't keep two pointers */
- if (isNull && !aggregates[i]->usenulls)
- continue; /* ignore this tuple for this agg */
+ /* set up for first advance_aggregates call */
+ tmpcontext->ecxt_scantuple = firstSlot;
- if (aggfns->xfn1.fn_addr != NULL)
+ /*
+ * Process each outer-plan tuple, and then fetch the next one,
+ * until we exhaust the outer plan or cross a group boundary.
+ */
+ for (;;)
{
- if (noInitValue[i])
+ advance_aggregates(aggstate, pergroup);
+
+ /* Reset per-input-tuple context after each tuple */
+ ResetExprContext(tmpcontext);
+
+ outerslot = ExecProcNode(outerPlan);
+ if (TupIsNull(outerslot))
{
- int byVal = 0;
-
- /*
- * value1 and value2 has not been initialized. This is
- * the first non-NULL value. We use it as the initial
- * value.
- */
-
- /*
- * but we can't just use it straight, we have to make
- * a copy of it since the tuple from which it came
- * will be freed on the next iteration of the scan
- */
- switch (nodeTag(aggregates[i]->target))
- {
- case T_Var:
- attnum = ((Var *) aggregates[i]->target)->varattno;
- attlen = outerslot->ttc_tupleDescriptor->attrs[attnum - 1]->attlen;
- byVal = outerslot->ttc_tupleDescriptor->attrs[attnum - 1]->attbyval;
-
- break;
- case T_Expr:
- {
- FunctionCachePtr fcache_ptr;
-
- if (nodeTag(tagnode) == T_Func)
- fcache_ptr = ((Func *) tagnode)->func_fcache;
- else
- fcache_ptr = ((Oper *) tagnode)->op_fcache;
- attlen = fcache_ptr->typlen;
- byVal = fcache_ptr->typbyval;
-
- break;
- }
- case T_Const:
- attlen = ((Const *) aggregates[i]->target)->constlen;
- byVal = ((Const *) aggregates[i]->target)->constbyval;
-
- break;
- default:
- elog(ERROR, "ExecAgg: Bad Agg->Target for Agg %d", i);
- }
- if (attlen == -1)
- {
- /* variable length */
- attlen = VARSIZE((struct varlena *) newVal);
- }
- value1[i] = (Datum) palloc(attlen);
- if (byVal)
- value1[i] = newVal;
- else
- memmove((char *) (value1[i]), (char *) newVal, attlen);
- noInitValue[i] = 0;
- nulls[i] = 0;
+ /* no more outer-plan tuples available */
+ aggstate->agg_done = true;
+ break;
}
- else
+ /* set up for next advance_aggregates call */
+ tmpcontext->ecxt_scantuple = outerslot;
+
+ /*
+ * If we are grouping, check whether we've crossed a group
+ * boundary.
+ */
+ if (node->aggstrategy == AGG_SORTED)
{
-
- /*
- * apply the transition functions.
- */
- args[0] = value1[i];
- args[1] = newVal;
- value1[i] =
- (Datum) fmgr_c(&aggfns->xfn1,
- (FmgrValues *) args,
- &isNull1);
- Assert(!isNull1);
+ if (!execTuplesMatch(firstSlot->val,
+ outerslot->val,
+ firstSlot->ttc_tupleDescriptor,
+ node->numCols, node->grpColIdx,
+ aggstate->eqfunctions,
+ tmpcontext->ecxt_per_tuple_memory))
+ {
+ /*
+ * Save the first input tuple of the next group.
+ */
+ aggstate->grp_firstTuple = heap_copytuple(outerslot->val);
+ break;
+ }
}
}
-
- if (aggfns->xfn2.fn_addr != NULL)
- {
- Datum xfn2_val = value2[i];
-
- value2[i] =
- (Datum) fmgr_c(&aggfns->xfn2,
- (FmgrValues *) &xfn2_val, &isNull2);
- Assert(!isNull2);
- }
}
/*
- * keep this for the projection (we only need one of these - all
- * the tuples we aggregate over share the same group column)
+ * Done scanning input tuple group. Finalize each aggregate
+ * calculation, and stash results in the per-output-tuple context.
*/
- if (!oneTuple)
- oneTuple = heap_copytuple(outerslot->val);
+ for (aggno = 0; aggno < aggstate->numaggs; aggno++)
+ {
+ AggStatePerAgg peraggstate = &peragg[aggno];
+ AggStatePerGroup pergroupstate = &pergroup[aggno];
- nTuplesAgged++;
- }
+ if (peraggstate->aggref->aggdistinct)
+ process_sorted_aggregate(aggstate, peraggstate, pergroupstate);
- /* --------------
- * finalize the aggregate (if necessary), and get the resultant value
- * --------------
- */
- for (i = 0; i < nagg; i++)
- {
- char *args[2];
- AggFuncInfo *aggfns = &aggFuncInfo[i];
+ finalize_aggregate(aggstate, peraggstate, pergroupstate,
+ &aggvalues[aggno], &aggnulls[aggno]);
+ }
- if (noInitValue[i])
+ /*
+ * If we have no first tuple (ie, the outerPlan didn't return
+ * anything), create a dummy all-nulls input tuple for use by
+ * ExecProject. 99.44% of the time this is a waste of cycles,
+ * because ordinarily the projected output tuple's targetlist
+ * cannot contain any direct (non-aggregated) references to input
+ * columns, so the dummy tuple will not be referenced. However
+ * there are special cases where this isn't so --- in particular
+ * an UPDATE involving an aggregate will have a targetlist
+ * reference to ctid. We need to return a null for ctid in that
+ * situation, not coredump.
+ *
+ * The values returned for the aggregates will be the initial values
+ * of the transition functions.
+ */
+ if (TupIsNull(firstSlot))
{
+ TupleDesc tupType;
- /*
- * No values found for this agg; return current state. This
- * seems to fix behavior for avg() aggregate. -tgl 12/96
- */
- }
- else if (aggfns->finalfn.fn_addr != NULL && nTuplesAgged > 0)
- {
- if (aggfns->finalfn.fn_nargs > 1)
+ /* Should only happen in non-grouped mode */
+ Assert(node->aggstrategy == AGG_PLAIN);
+ Assert(aggstate->agg_done);
+
+ tupType = firstSlot->ttc_tupleDescriptor;
+ /* watch out for zero-column input tuples, though... */
+ if (tupType && tupType->natts > 0)
{
- args[0] = (char *) value1[i];
- args[1] = (char *) value2[i];
+ HeapTuple nullsTuple;
+ Datum *dvalues;
+ char *dnulls;
+
+ dvalues = (Datum *) palloc0(sizeof(Datum) * tupType->natts);
+ dnulls = (char *) palloc(sizeof(char) * tupType->natts);
+ MemSet(dnulls, 'n', sizeof(char) * tupType->natts);
+ nullsTuple = heap_formtuple(tupType, dvalues, dnulls);
+ ExecStoreTuple(nullsTuple,
+ firstSlot,
+ InvalidBuffer,
+ true);
+ pfree(dvalues);
+ pfree(dnulls);
}
- else if (aggfns->xfn1.fn_addr != NULL)
- args[0] = (char *) value1[i];
- else if (aggfns->xfn2.fn_addr != NULL)
- args[0] = (char *) value2[i];
- else
- elog(NOTICE, "ExecAgg: no valid transition functions??");
- value1[i] = (Datum) fmgr_c(&aggfns->finalfn,
- (FmgrValues *) args, &(nulls[i]));
}
- else if (aggfns->xfn1.fn_addr != NULL)
- {
- /*
- * value in the right place, ignore. (If you remove this case,
- * fix the else part. -ay 2/95)
- */
- }
- else if (aggfns->xfn2.fn_addr != NULL)
- value1[i] = value2[i];
- else
- elog(ERROR, "ExecAgg: no valid transition functions??");
+ /*
+ * Form a projection tuple using the aggregate results and the
+ * representative input tuple. Store it in the result tuple slot.
+ * Note we do not support aggregates returning sets ...
+ */
+ econtext->ecxt_scantuple = firstSlot;
+ resultSlot = ExecProject(projInfo, NULL);
+
+ /*
+ * If the completed tuple does not match the qualifications, it is
+ * ignored and we loop back to try to process another group.
+ * Otherwise, return the tuple.
+ */
}
+ while (!ExecQual(aggstate->ss.ps.qual, econtext, false));
+
+ return resultSlot;
+}
+
+/*
+ * ExecAgg for hashed case: phase 1, read input and build hash table
+ *
+ * Pulls tuples from the outer plan until it returns a null slot. For
+ * each tuple, the hash entry for the tuple's group is found or created
+ * by lookup_hash_entry(), and that entry's per-group state is passed to
+ * advance_aggregates(). The per-input-tuple context is reset after
+ * every tuple. On exit, aggstate->table_filled is set and the hash
+ * iterator is reset so that retrieval starts from the beginning.
+ */
+static void
+agg_fill_hash_table(AggState *aggstate)
+{
+ PlanState *outerPlan;
+ ExprContext *tmpcontext;
+ AggHashEntry entry;
+ TupleTableSlot *outerslot;
+
+ /*
+ * get state info from node
+ */
+ outerPlan = outerPlanState(aggstate);
+ /* tmpcontext is the per-input-tuple expression context */
+ tmpcontext = aggstate->tmpcontext;
/*
- * whether the aggregation is done depends on whether we are doing
- * aggregation over groups or the entire table
+ * Process each outer-plan tuple, and then fetch the next one, until
+ * we exhaust the outer plan.
*/
- if (nodeTag(outerPlan) == T_Group)
+ for (;;)
{
- /* aggregation over groups */
- aggstate->agg_done = ((Group *) outerPlan)->grpstate->grp_done;
+ outerslot = ExecProcNode(outerPlan);
+ if (TupIsNull(outerslot))
+ break;
+ /* set up for advance_aggregates call */
+ tmpcontext->ecxt_scantuple = outerslot;
+
+ /* Find or build hashtable entry for this tuple's group */
+ entry = lookup_hash_entry(aggstate, outerslot);
+
+ /* Advance the aggregates */
+ advance_aggregates(aggstate, entry->pergroup);
+
+ /* Reset per-input-tuple context after each tuple */
+ ResetExprContext(tmpcontext);
}
- else
- aggstate->agg_done = TRUE;
- /* ----------------
- * form a projection tuple, store it in the result tuple
- * slot and return it.
- * ----------------
+ aggstate->table_filled = true;
+ /* Initialize to walk the hash table */
+ ResetTupleHashIterator(aggstate->hashtable, &aggstate->hashiter);
+}
+
+/*
+ * ExecAgg for hashed case: phase 2, retrieving groups from hash table
+ *
+ * Each pass of the loop advances the hash iterator to the next group,
+ * stores that group's saved first tuple in the scan tuple slot,
+ * finalizes the group's aggregates into the output expression context,
+ * and projects a result tuple. A group whose projected tuple fails the
+ * node's qual is skipped and the next group is tried.
+ *
+ * Returns the projected result slot, or NULL when agg_done is already
+ * set or the iterator has no more entries (agg_done is set in the
+ * latter case).
+ */
+static TupleTableSlot *
+agg_retrieve_hash_table(AggState *aggstate)
+{
+ ExprContext *econtext;
+ ProjectionInfo *projInfo;
+ Datum *aggvalues;
+ bool *aggnulls;
+ AggStatePerAgg peragg;
+ AggStatePerGroup pergroup;
+ AggHashEntry entry;
+ TupleTableSlot *firstSlot;
+ TupleTableSlot *resultSlot;
+ int aggno;
+
+ /*
+ * get state info from node
*/
+ /* econtext is the per-output-tuple expression context */
+ econtext = aggstate->ss.ps.ps_ExprContext;
+ aggvalues = econtext->ecxt_aggvalues;
+ aggnulls = econtext->ecxt_aggnulls;
+ projInfo = aggstate->ss.ps.ps_ProjInfo;
+ peragg = aggstate->peragg;
+ firstSlot = aggstate->ss.ss_ScanTupleSlot;
- }
- while((ExecQual(fix_opids(node->plan.qual),econtext)!=true) &&
- (node->plan.qual!=NULL));
+ /*
+ * We loop retrieving groups until we find one satisfying
+ * aggstate->ss.ps.qual
+ */
+ do
+ {
+ if (aggstate->agg_done)
+ return NULL;
-
- ExecStoreTuple(oneTuple,
- aggstate->csstate.css_ScanTupleSlot,
- InvalidBuffer,
- false);
- econtext->ecxt_scantuple = aggstate->csstate.css_ScanTupleSlot;
+ /*
+ * Find the next entry in the hash table
+ */
+ entry = (AggHashEntry) ScanTupleHashTable(&aggstate->hashiter);
+ if (entry == NULL)
+ {
+ /* No more entries in hashtable, so done */
+ aggstate->agg_done = TRUE;
+ return NULL;
+ }
- resultSlot = ExecProject(projInfo, &isDone);
+ /*
+ * Clear the per-output-tuple context for each group
+ */
+ ResetExprContext(econtext);
- if (oneTuple)
- pfree(oneTuple);
+ /*
+ * Store the copied first input tuple in the tuple table slot
+ * reserved for it, so that it can be used in ExecProject.
+ */
+ ExecStoreTuple(entry->shared.firstTuple,
+ firstSlot,
+ InvalidBuffer,
+ false);
+
+ pergroup = entry->pergroup;
+
+ /*
+ * Finalize each aggregate calculation, and stash results in the
+ * per-output-tuple context.
+ *
+ * DISTINCT aggregates are not expected on this path; the Assert
+ * below checks that, so no sorted-input processing is done here.
+ */
+ for (aggno = 0; aggno < aggstate->numaggs; aggno++)
+ {
+ AggStatePerAgg peraggstate = &peragg[aggno];
+ AggStatePerGroup pergroupstate = &pergroup[aggno];
+
+ Assert(!peraggstate->aggref->aggdistinct);
+ finalize_aggregate(aggstate, peraggstate, pergroupstate,
+ &aggvalues[aggno], &aggnulls[aggno]);
+ }
+
+ /*
+ * Form a projection tuple using the aggregate results and the
+ * representative input tuple. Store it in the result tuple slot.
+ * Note we do not support aggregates returning sets ...
+ */
+ econtext->ecxt_scantuple = firstSlot;
+ resultSlot = ExecProject(projInfo, NULL);
+
+ /*
+ * If the completed tuple does not match the qualifications, it is
+ * ignored and we loop back to try to process another group.
+ * Otherwise, return the tuple.
+ */
+ }
+ while (!ExecQual(aggstate->ss.ps.qual, econtext, false));
return resultSlot;
}
* planner and initializes its outer subtree
* -----------------
*/
-bool
-ExecInitAgg(Agg *node, EState *estate, Plan *parent)
+AggState *
+ExecInitAgg(Agg *node, EState *estate)
{
AggState *aggstate;
+ AggStatePerAgg peragg;
Plan *outerPlan;
ExprContext *econtext;
+ int numaggs,
+ aggno;
+ List *alist;
/*
- * assign the node's execution state
+ * create state structure
*/
- node->plan.state = estate;
+ aggstate = makeNode(AggState);
+ aggstate->ss.ps.plan = (Plan *) node;
+ aggstate->ss.ps.state = estate;
+
+ aggstate->aggs = NIL;
+ aggstate->numaggs = 0;
+ aggstate->eqfunctions = NULL;
+ aggstate->hashfunctions = NULL;
+ aggstate->peragg = NULL;
+ aggstate->agg_done = false;
+ aggstate->pergroup = NULL;
+ aggstate->grp_firstTuple = NULL;
+ aggstate->hashtable = NULL;
/*
- * create state structure
+ * Create expression contexts. We need two, one for per-input-tuple
+ * processing and one for per-output-tuple processing. We cheat a
+ * little by using ExecAssignExprContext() to build both.
+ *
+ * The context built by the first call is saved in tmpcontext; the
+ * second call then leaves the per-output-tuple context in
+ * ss.ps.ps_ExprContext.
*/
- aggstate = makeNode(AggState);
- node->aggstate = aggstate;
- aggstate->agg_done = FALSE;
+ ExecAssignExprContext(estate, &aggstate->ss.ps);
+ aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
+ ExecAssignExprContext(estate, &aggstate->ss.ps);
/*
- * assign node's base id and create expression context
+ * We also need a long-lived memory context for holding hashtable data
+ * structures and transition values. NOTE: the details of what is
+ * stored in aggcontext and what is stored in the regular per-query
+ * memory context are driven by a simple decision: we want to reset
+ * the aggcontext in ExecReScanAgg to recover no-longer-wanted space.
*/
- ExecAssignNodeBaseInfo(estate, &aggstate->csstate.cstate,
- (Plan *) parent);
- ExecAssignExprContext(estate, &aggstate->csstate.cstate);
+ aggstate->aggcontext =
+ AllocSetContextCreate(CurrentMemoryContext,
+ "AggContext",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
#define AGG_NSLOTS 2
/*
* tuple table initialization
*/
- ExecInitScanTupleSlot(estate, &aggstate->csstate);
- ExecInitResultTupleSlot(estate, &aggstate->csstate.cstate);
+ ExecInitScanTupleSlot(estate, &aggstate->ss);
+ ExecInitResultTupleSlot(estate, &aggstate->ss.ps);
- econtext = aggstate->csstate.cstate.cs_ExprContext;
- econtext->ecxt_values =
- (Datum *) palloc(sizeof(Datum) * length(node->aggs));
- MemSet(econtext->ecxt_values, 0, sizeof(Datum) * length(node->aggs));
- econtext->ecxt_nulls = (char *) palloc(length(node->aggs));
- MemSet(econtext->ecxt_nulls, 0, length(node->aggs));
+ /*
+ * initialize child expressions
+ *
+ * Note: ExecInitExpr finds Aggrefs for us, and also checks that no aggs
+ * contain other agg calls in their arguments. This would make no
+ * sense under SQL semantics anyway (and it's forbidden by the spec).
+ * Because that is true, we don't need to worry about evaluating the
+ * aggs in any particular order.
+ */
+ aggstate->ss.ps.targetlist = (List *)
+ ExecInitExpr((Expr *) node->plan.targetlist,
+ (PlanState *) aggstate);
+ aggstate->ss.ps.qual = (List *)
+ ExecInitExpr((Expr *) node->plan.qual,
+ (PlanState *) aggstate);
/*
- * initializes child nodes
+ * initialize child nodes
*/
outerPlan = outerPlan(node);
- ExecInitNode(outerPlan, estate, (Plan *) node);
+ outerPlanState(aggstate) = ExecInitNode(outerPlan, estate);
+
+ /*
+ * initialize source tuple type.
+ */
+ ExecAssignScanTypeFromOuterPlan(&aggstate->ss);
+
+ /*
+ * Initialize result tuple type and projection info.
+ */
+ ExecAssignResultTypeFromTL(&aggstate->ss.ps);
+ ExecAssignProjectionInfo(&aggstate->ss.ps);
/*
- * Result runs in its own context, but make it use our aggregates fix
- * for 'select sum(2+2)'
+ * get the count of aggregates in targetlist and quals
*/
- if (nodeTag(outerPlan) == T_Result)
+ numaggs = aggstate->numaggs;
+ Assert(numaggs == length(aggstate->aggs));
+ if (numaggs <= 0)
{
- ((Result *) outerPlan)->resstate->cstate.cs_ProjInfo->pi_exprContext->ecxt_values =
- econtext->ecxt_values;
- ((Result *) outerPlan)->resstate->cstate.cs_ProjInfo->pi_exprContext->ecxt_nulls =
- econtext->ecxt_nulls;
+ /*
+ * This is not an error condition: we might be using the Agg node
+ * just to do hash-based grouping. Even in the regular case,
+ * constant-expression simplification could optimize away all of
+ * the Aggrefs in the targetlist and qual. So keep going, but
+ * force local copy of numaggs positive so that palloc()s below
+ * don't choke.
+ */
+ numaggs = 1;
}
+ /*
+ * If we are grouping, precompute fmgr lookup data for inner loop. We
+ * need both equality and hashing functions to do it by hashing, but
+ * only equality if not hashing.
+ */
+ if (node->numCols > 0)
+ {
+ if (node->aggstrategy == AGG_HASHED)
+ execTuplesHashPrepare(ExecGetScanType(&aggstate->ss),
+ node->numCols,
+ node->grpColIdx,
+ &aggstate->eqfunctions,
+ &aggstate->hashfunctions);
+ else
+ aggstate->eqfunctions =
+ execTuplesMatchPrepare(ExecGetScanType(&aggstate->ss),
+ node->numCols,
+ node->grpColIdx);
+ }
- /* ----------------
- * initialize tuple type.
- * ----------------
+ /*
+ * Set up aggregate-result storage in the output expr context, and
+ * also allocate my private per-agg working storage
*/
- ExecAssignScanTypeFromOuterPlan((Plan *) node, &aggstate->csstate);
+ econtext = aggstate->ss.ps.ps_ExprContext;
+ econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
+ econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
+
+ peragg = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
+ aggstate->peragg = peragg;
+
+ if (node->aggstrategy == AGG_HASHED)
+ {
+ build_hash_table(aggstate);
+ aggstate->table_filled = false;
+ }
+ else
+ {
+ AggStatePerGroup pergroup;
+
+ pergroup = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) * numaggs);
+ aggstate->pergroup = pergroup;
+ }
/*
- * Initialize tuple type for both result and scan. This node does no
- * projection
+ * Perform lookups of aggregate function info, and initialize the
+ * unchanging fields of the per-agg data. We also detect duplicate
+ * aggregates (for example, "SELECT sum(x) ... HAVING sum(x) > 0").
+ * When duplicates are detected, we only make an AggStatePerAgg struct
+ * for the first one. The clones are simply pointed at the same
+ * result entry by giving them duplicate aggno values.
+ *
+ * aggno is the index of the last peragg entry assigned so far; it
+ * starts at -1 so that the first pre-increment below yields index 0,
+ * and aggno + 1 is the number of unique aggregates when the loop ends.
*/
- ExecAssignResultTypeFromTL((Plan *) node, &aggstate->csstate.cstate);
- ExecAssignProjectionInfo((Plan *) node, &aggstate->csstate.cstate);
+ aggno = -1;
+ foreach(alist, aggstate->aggs)
+ {
+ AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(alist);
+ Aggref *aggref = (Aggref *) aggrefstate->xprstate.expr;
+ AggStatePerAgg peraggstate;
+ Oid inputType;
+ HeapTuple aggTuple;
+ Form_pg_aggregate aggform;
+ Oid aggtranstype;
+ AclResult aclresult;
+ Oid transfn_oid,
+ finalfn_oid;
+ Expr *transfnexpr,
+ *finalfnexpr;
+ Datum textInitVal;
+ int i;
+
+ /* Planner should have assigned aggregate to correct level */
+ Assert(aggref->agglevelsup == 0);
+
+ /*
+ * Look for a previous duplicate aggregate. An aggregate that
+ * contains volatile functions is never treated as a duplicate.
+ */
+ for (i = 0; i <= aggno; i++)
+ {
+ if (equal(aggref, peragg[i].aggref) &&
+ !contain_volatile_functions((Node *) aggref))
+ break;
+ }
+ if (i <= aggno)
+ {
+ /* Found a match to an existing entry, so just mark it */
+ aggrefstate->aggno = i;
+ continue;
+ }
+
+ /* Nope, so assign a new PerAgg record */
+ peraggstate = &peragg[++aggno];
+
+ /* Mark Aggref state node with assigned index in the result array */
+ aggrefstate->aggno = aggno;
+
+ /* Fill in the peraggstate data */
+ peraggstate->aggrefstate = aggrefstate;
+ peraggstate->aggref = aggref;
+
+ /*
+ * Get actual datatype of the input. We need this because it may
+ * be different from the agg's declared input type, when the agg
+ * accepts ANY (eg, COUNT(*)) or ANYARRAY or ANYELEMENT.
+ */
+ inputType = exprType((Node *) aggref->target);
+
+ aggTuple = SearchSysCache(AGGFNOID,
+ ObjectIdGetDatum(aggref->aggfnoid),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(aggTuple))
+ elog(ERROR, "cache lookup failed for aggregate %u",
+ aggref->aggfnoid);
+ aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
+
+ /* Check permission to call aggregate function */
+ aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
+ ACL_EXECUTE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, ACL_KIND_PROC,
+ get_func_name(aggref->aggfnoid));
+
+ peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
+ peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
+
+ /* resolve actual type of transition state, if polymorphic */
+ aggtranstype = aggform->aggtranstype;
+ if (aggtranstype == ANYARRAYOID || aggtranstype == ANYELEMENTOID)
+ {
+ /* have to fetch the agg's declared input type... */
+ Oid agg_arg_types[FUNC_MAX_ARGS];
+ int agg_nargs;
+
+ (void) get_func_signature(aggref->aggfnoid,
+ agg_arg_types, &agg_nargs);
+ Assert(agg_nargs == 1);
+ aggtranstype = resolve_generic_type(aggtranstype,
+ inputType,
+ agg_arg_types[0]);
+ }
+
+ /* build expression trees using actual argument & result types */
+ build_aggregate_fnexprs(inputType,
+ aggtranstype,
+ aggref->aggtype,
+ transfn_oid,
+ finalfn_oid,
+ &transfnexpr,
+ &finalfnexpr);
+
+ fmgr_info(transfn_oid, &peraggstate->transfn);
+ peraggstate->transfn.fn_expr = (Node *) transfnexpr;
+
+ if (OidIsValid(finalfn_oid))
+ {
+ fmgr_info(finalfn_oid, &peraggstate->finalfn);
+ peraggstate->finalfn.fn_expr = (Node *) finalfnexpr;
+ }
+
+ get_typlenbyval(aggref->aggtype,
+ &peraggstate->resulttypeLen,
+ &peraggstate->resulttypeByVal);
+ get_typlenbyval(aggtranstype,
+ &peraggstate->transtypeLen,
+ &peraggstate->transtypeByVal);
+
+ /*
+ * initval is potentially null, so don't try to access it as a
+ * struct field. Must do it the hard way with SysCacheGetAttr.
+ */
+ textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
+ Anum_pg_aggregate_agginitval,
+ &peraggstate->initValueIsNull);
+
+ if (peraggstate->initValueIsNull)
+ peraggstate->initValue = (Datum) 0;
+ else
+ peraggstate->initValue = GetAggInitVal(textInitVal,
+ aggtranstype);
+
+ /*
+ * If the transfn is strict and the initval is NULL, make sure
+ * input type and transtype are the same (or at least binary-
+ * compatible), so that it's OK to use the first input value as
+ * the initial transValue. This should have been checked at agg
+ * definition time, but just in case...
+ */
+ if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
+ {
+ if (!IsBinaryCoercible(inputType, aggtranstype))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("aggregate %u needs to have compatible input type and transition type",
+ aggref->aggfnoid)));
+ }
+
+ if (aggref->aggdistinct)
+ {
+ Oid eq_function;
+
+ /* We don't implement DISTINCT aggs in the HASHED case */
+ Assert(node->aggstrategy != AGG_HASHED);
+
+ peraggstate->inputType = inputType;
+ get_typlenbyval(inputType,
+ &peraggstate->inputtypeLen,
+ &peraggstate->inputtypeByVal);
+
+ eq_function = equality_oper_funcid(inputType);
+ fmgr_info(eq_function, &(peraggstate->equalfn));
+ peraggstate->sortOperator = ordering_oper_opid(inputType);
+ peraggstate->sortstate = NULL;
+ }
+
+ ReleaseSysCache(aggTuple);
+ }
- return TRUE;
+ /* Update numaggs to match number of unique aggregates found */
+ aggstate->numaggs = aggno + 1;
+
+ return aggstate;
+}
+/*
+ * GetAggInitVal - convert an aggregate's textual initial value to a Datum
+ *
+ * textInitVal is the text-typed initial value; it is converted to a C
+ * string with textout and then fed to the input function (typinput) of
+ * the type identified by transtype, along with that type's typelem and
+ * a third argument of -1 (presumably "no typmod" -- TODO confirm).
+ *
+ * Raises an error if the pg_type lookup for transtype fails. The
+ * intermediate C string is freed before returning; the returned Datum
+ * is whatever the input function produced.
+ */
+static Datum
+GetAggInitVal(Datum textInitVal, Oid transtype)
+{
+ char *strInitVal;
+ HeapTuple tup;
+ Oid typinput,
+ typelem;
+ Datum initVal;
+
+ strInitVal = DatumGetCString(DirectFunctionCall1(textout, textInitVal));
+
+ tup = SearchSysCache(TYPEOID,
+ ObjectIdGetDatum(transtype),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for type %u", transtype);
+
+ typinput = ((Form_pg_type) GETSTRUCT(tup))->typinput;
+ typelem = ((Form_pg_type) GETSTRUCT(tup))->typelem;
+ ReleaseSysCache(tup);
+
+ initVal = OidFunctionCall3(typinput,
+ CStringGetDatum(strInitVal),
+ ObjectIdGetDatum(typelem),
+ Int32GetDatum(-1));
+
+ pfree(strInitVal);
+ return initVal;
}
/*
 * ExecCountSlotsAgg - count the tuple table slots needed by this subtree
 *
 * Returns the slot counts reported by ExecCountSlotsNode for the outer
 * and inner subplans, plus AGG_NSLOTS for the Agg node itself
 * (presumably covering the scan and result slots set up in ExecInitAgg).
 */
int
ExecCountSlotsAgg(Agg *node)
{
return ExecCountSlotsNode(outerPlan(node)) +
- ExecCountSlotsNode(innerPlan(node)) +
- AGG_NSLOTS;
+ ExecCountSlotsNode(innerPlan(node)) +
+ AGG_NSLOTS;
}
-/* ------------------------
- * ExecEndAgg(node)
- *
- * -----------------------
- */
/*
 * ExecEndAgg - shut down an Agg node
 *
 * Ends any tuplesort still open for a per-agg state, frees both
 * expression contexts, clears the scan tuple slot, deletes aggcontext,
 * and finally shuts down the outer subplan via ExecEndNode.
 */
void
-ExecEndAgg(Agg *node)
+ExecEndAgg(AggState *node)
{
- AggState *aggstate;
- Plan *outerPlan;
+ PlanState *outerPlan;
+ int aggno;
- aggstate = node->aggstate;
+ /* Make sure we have closed any open tuplesorts */
+ for (aggno = 0; aggno < node->numaggs; aggno++)
+ {
+ AggStatePerAgg peraggstate = &node->peragg[aggno];
- ExecFreeProjectionInfo(&aggstate->csstate.cstate);
+ if (peraggstate->sortstate)
+ tuplesort_end(peraggstate->sortstate);
+ }
- outerPlan = outerPlan(node);
- ExecEndNode(outerPlan, (Plan *) node);
+ /*
+ * Free both the expr contexts.
+ *
+ * ExecFreeExprContext is called on ss.ps, so after the first call
+ * we point ps_ExprContext at tmpcontext and call it a second time
+ * to release that one as well.
+ */
+ ExecFreeExprContext(&node->ss.ps);
+ node->ss.ps.ps_ExprContext = node->tmpcontext;
+ ExecFreeExprContext(&node->ss.ps);
/* clean up tuple table */
- ExecClearTuple(aggstate->csstate.css_ScanTupleSlot);
-}
+ ExecClearTuple(node->ss.ss_ScanTupleSlot);
+ MemoryContextDelete(node->aggcontext);
-/*****************************************************************************
- * Support Routines
- *****************************************************************************/
+ outerPlan = outerPlanState(node);
+ ExecEndNode(outerPlan);
+}
-/*
- * aggGetAttr -
- * get the attribute (specified in the Var node in agg) to aggregate
- * over from the tuple
- */
-static Datum
-aggGetAttr(TupleTableSlot *slot,
- Aggreg *agg,
- bool *isNull)
/*
 * ExecReScanAgg - prepare an Agg node to be scanned again
 *
 * Always clears agg_done. In hashed mode there are two shortcuts: if
 * the hash table was never filled there is nothing to undo, and if it
 * was filled and the child's chgParam is NULL the existing table is
 * simply re-walked from the start. Otherwise all per-scan state is
 * discarded: open tuplesorts are ended, any saved first-of-group tuple
 * is freed, the aggregate result arrays are zeroed, aggcontext is
 * reset (and, in hashed mode, an empty hash table is rebuilt), and the
 * child is rescanned here if its chgParam is NULL.
 */
+void
+ExecReScanAgg(AggState *node, ExprContext *exprCtxt)
{
- Datum result;
- AttrNumber attnum;
- HeapTuple heapTuple;
- TupleDesc tuple_type;
- Buffer buffer;
+ ExprContext *econtext = node->ss.ps.ps_ExprContext;
+ int aggno;
- /* ----------------
- * extract tuple information from the slot
- * ----------------
- */
- heapTuple = slot->val;
- tuple_type = slot->ttc_tupleDescriptor;
- buffer = slot->ttc_buffer;
+ node->agg_done = false;
- attnum = ((Var *) agg->target)->varattno;
-
- /*
- * If the attribute number is invalid, then we are supposed to return
- * the entire tuple, we give back a whole slot so that callers know
- * what the tuple looks like.
- */
- if (attnum == InvalidAttrNumber)
+ if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED)
{
- TupleTableSlot *tempSlot;
- TupleDesc td;
- HeapTuple tup;
-
- tempSlot = makeNode(TupleTableSlot);
- tempSlot->ttc_shouldFree = false;
- tempSlot->ttc_descIsNew = true;
- tempSlot->ttc_tupleDescriptor = (TupleDesc) NULL,
- tempSlot->ttc_buffer = InvalidBuffer;
- tempSlot->ttc_whichplan = -1;
+ /*
+ * In the hashed case, if we haven't yet built the hash table then
+ * we can just return; nothing done yet, so nothing to undo. If
+ * subnode's chgParam is not NULL then it will be re-scanned by
+ * ExecProcNode, else no reason to re-scan it at all.
+ */
+ if (!node->table_filled)
+ return;
- tup = heap_copytuple(slot->val);
- td = CreateTupleDescCopy(slot->ttc_tupleDescriptor);
+ /*
+ * If we do have the hash table and the subplan does not have any
+ * parameter changes, then we can just rescan the existing hash
+ * table; no need to build it again.
+ */
+ if (((PlanState *) node)->lefttree->chgParam == NULL)
+ {
+ ResetTupleHashIterator(node->hashtable, &node->hashiter);
+ return;
+ }
+ }
- ExecSetSlotDescriptor(tempSlot, td);
+ /* Make sure we have closed any open tuplesorts */
+ for (aggno = 0; aggno < node->numaggs; aggno++)
+ {
+ AggStatePerAgg peraggstate = &node->peragg[aggno];
- ExecStoreTuple(tup, tempSlot, InvalidBuffer, true);
- return (Datum) tempSlot;
+ if (peraggstate->sortstate)
+ tuplesort_end(peraggstate->sortstate);
+ peraggstate->sortstate = NULL;
}
- result =
- heap_getattr(heapTuple, /* tuple containing attribute */
- attnum, /* attribute number of desired attribute */
- tuple_type,/* tuple descriptor of tuple */
- isNull); /* return: is attribute null? */
-
- /* ----------------
- * return null if att is null
- * ----------------
- */
- if (*isNull)
- return (Datum) NULL;
+ /* Release first tuple of group, if we have made a copy */
+ if (node->grp_firstTuple != NULL)
+ {
+ heap_freetuple(node->grp_firstTuple);
+ node->grp_firstTuple = NULL;
+ }
- return result;
-}
+ /* Forget current agg values */
+ MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
+ MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
-void
-ExecReScanAgg(Agg *node, ExprContext *exprCtxt, Plan *parent)
-{
- AggState *aggstate = node->aggstate;
- ExprContext *econtext = aggstate->csstate.cstate.cs_ExprContext;
+ /* Release all temp storage */
+ MemoryContextReset(node->aggcontext);
- aggstate->agg_done = FALSE;
- MemSet(econtext->ecxt_values, 0, sizeof(Datum) * length(node->aggs));
- MemSet(econtext->ecxt_nulls, 0, length(node->aggs));
+ if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED)
+ {
+ /* Rebuild an empty hash table */
+ build_hash_table(node);
+ node->table_filled = false;
+ }
/*
* if chgParam of subnode is not null then plan will be re-scanned by
* first ExecProcNode.
*/
- if (((Plan *) node)->lefttree->chgParam == NULL)
- ExecReScan(((Plan *) node)->lefttree, exprCtxt, (Plan *) node);
+ if (((PlanState *) node)->lefttree->chgParam == NULL)
+ ExecReScan(((PlanState *) node)->lefttree, exprCtxt);
+}
+/*
+ * aggregate_dummy - dummy execution routine for aggregate functions
+ *
+ * This function is listed as the implementation (prosrc field) of pg_proc
+ * entries for aggregate functions. Its only purpose is to throw an error
+ * if someone mistakenly executes such a function in the normal way.
+ *
+ * The error message reports the called function's OID, taken from
+ * fcinfo->flinfo->fn_oid. The trailing return statement exists only to
+ * satisfy the compiler.
+ *
+ * Perhaps someday we could assign real meaning to the prosrc field of
+ * an aggregate?
+ */
+Datum
+aggregate_dummy(PG_FUNCTION_ARGS)
+{
+ elog(ERROR, "aggregate function %u called as normal function",
+ fcinfo->flinfo->fn_oid);
+ return (Datum) 0; /* keep compiler quiet */
}