X-Git-Url: https://granicus.if.org/sourcecode?a=blobdiff_plain;f=src%2Fbackend%2Fexecutor%2FnodeAgg.c;h=cb0a64c42771beb660c1f30a4c2aec1f39dd1711;hb=391c3811a2b7f4cd666e1b4f35534046a862abbb;hp=b45376013f95ede584d303a4a581ff7b9bb82dce;hpb=22d641a7d42fba17e1f9310a8444763b4c3aa47f;p=postgresql diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index b45376013f..cb0a64c427 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -29,24 +29,23 @@ * of course). A non-strict finalfunc can make its own choice of * what to return for a NULL ending transvalue. * - * When the transvalue datatype is pass-by-reference, we have to be - * careful to ensure that the values survive across tuple cycles yet - * are not allowed to accumulate until end of query. We do this by - * "ping-ponging" between two memory contexts; successive calls to the - * transfunc are executed in alternate contexts, passing the previous - * transvalue that is in the other context. At the beginning of each - * tuple cycle we can reset the current output context to avoid memory - * usage growth. Note: we must use MemoryContextContains() to check - * whether the transfunc has perhaps handed us back one of its input - * values rather than a freshly palloc'd value; if so, we copy the value - * to the context we want it in. + * We compute aggregate input expressions and run the transition functions + * in a temporary econtext (aggstate->tmpcontext). This is reset at + * least once per input tuple, so when the transvalue datatype is + * pass-by-reference, we have to be careful to copy it into a longer-lived + * memory context, and free the prior value to avoid memory leakage. + * We store transvalues in the memory context aggstate->aggcontext, + * which is also used for the hashtable structures in AGG_HASHED mode. + * The node's regular econtext (aggstate->csstate.cstate.cs_ExprContext) + * is used to run finalize functions and compute the output tuple; + * this context can be reset once per output tuple. * * - * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group + * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/executor/nodeAgg.c,v 1.84 2002/05/17 22:35:12 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/executor/nodeAgg.c,v 1.118 2004/02/03 17:34:02 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -60,6 +59,7 @@ #include "executor/nodeAgg.h" #include "miscadmin.h" #include "optimizer/clauses.h" +#include "parser/parse_agg.h" #include "parser/parse_coerce.h" #include "parser/parse_expr.h" #include "parser/parse_oper.h" @@ -81,7 +81,8 @@ typedef struct AggStatePerAggData * thereafter: */ - /* Link to Aggref node this working state is for */ + /* Links to Aggref expr and state nodes this working state is for */ + AggrefExprState *aggrefstate; Aggref *aggref; /* Oids of transfer functions */ @@ -139,8 +140,27 @@ typedef struct AggStatePerAggData */ Tuplesortstate *sortstate; /* sort object, if a DISTINCT agg */ +} AggStatePerAggData; - Datum transValue; +/* + * AggStatePerGroupData - per-aggregate-per-group working state + * + * These values are working state that is initialized at the start of + * an input tuple group and updated for each input tuple. + * + * In AGG_PLAIN and AGG_SORTED modes, we have a single array of these + * structs (pointed to by aggstate->pergroup); we re-use the array for + * each input group, if it's AGG_SORTED mode. In AGG_HASHED mode, the + * hash table contains an array of these structs for each tuple group. + * + * Logically, the sortstate field belongs in this struct, but we do not + * keep it here for space reasons: we don't support DISTINCT aggregates + * in AGG_HASHED mode, so there's no reason to use up a pointer field + * in every entry of the hashtable. + */ +typedef struct AggStatePerGroupData +{ + Datum transValue; /* current transition value */ bool transValueIsNull; bool noTransValue; /* true if transValue not set yet */ @@ -153,96 +173,138 @@ typedef struct AggStatePerAggData * later input value. Only the first non-NULL input will be * auto-substituted. */ -} AggStatePerAggData; +} AggStatePerGroupData; +/* + * To implement hashed aggregation, we need a hashtable that stores a + * representative tuple and an array of AggStatePerGroup structs for each + * distinct set of GROUP BY column values. We compute the hash key from + * the GROUP BY columns. + */ +typedef struct AggHashEntryData *AggHashEntry; -static void initialize_aggregate(AggStatePerAgg peraggstate); -static void advance_transition_function(AggStatePerAgg peraggstate, +typedef struct AggHashEntryData +{ + TupleHashEntryData shared; /* common header for hash table entries */ + /* per-aggregate transition status array - must be last! */ + AggStatePerGroupData pergroup[1]; /* VARIABLE LENGTH ARRAY */ +} AggHashEntryData; /* VARIABLE LENGTH STRUCT */ + + +static void initialize_aggregates(AggState *aggstate, + AggStatePerAgg peragg, + AggStatePerGroup pergroup); +static void advance_transition_function(AggState *aggstate, + AggStatePerAgg peraggstate, + AggStatePerGroup pergroupstate, Datum newVal, bool isNull); +static void advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup); static void process_sorted_aggregate(AggState *aggstate, - AggStatePerAgg peraggstate); -static void finalize_aggregate(AggStatePerAgg peraggstate, + AggStatePerAgg peraggstate, + AggStatePerGroup pergroupstate); +static void finalize_aggregate(AggState *aggstate, + AggStatePerAgg peraggstate, + AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull); +static void build_hash_table(AggState *aggstate); +static AggHashEntry lookup_hash_entry(AggState *aggstate, + TupleTableSlot *slot); +static TupleTableSlot *agg_retrieve_direct(AggState *aggstate); +static void agg_fill_hash_table(AggState *aggstate); +static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate); static Datum GetAggInitVal(Datum textInitVal, Oid transtype); /* - * Initialize one aggregate for a new set of input values. + * Initialize all aggregates for a new group of input values. * * When called, CurrentMemoryContext should be the per-query context. */ static void -initialize_aggregate(AggStatePerAgg peraggstate) +initialize_aggregates(AggState *aggstate, + AggStatePerAgg peragg, + AggStatePerGroup pergroup) { - Aggref *aggref = peraggstate->aggref; + int aggno; - /* - * Start a fresh sort operation for each DISTINCT aggregate. - */ - if (aggref->aggdistinct) + for (aggno = 0; aggno < aggstate->numaggs; aggno++) { + AggStatePerAgg peraggstate = &peragg[aggno]; + AggStatePerGroup pergroupstate = &pergroup[aggno]; + Aggref *aggref = peraggstate->aggref; + /* - * In case of rescan, maybe there could be an uncompleted sort - * operation? Clean it up if so. + * Start a fresh sort operation for each DISTINCT aggregate. */ - if (peraggstate->sortstate) - tuplesort_end(peraggstate->sortstate); + if (aggref->aggdistinct) + { + /* + * In case of rescan, maybe there could be an uncompleted sort + * operation? Clean it up if so. + */ + if (peraggstate->sortstate) + tuplesort_end(peraggstate->sortstate); - peraggstate->sortstate = - tuplesort_begin_datum(peraggstate->inputType, - peraggstate->sortOperator, - false); - } + peraggstate->sortstate = + tuplesort_begin_datum(peraggstate->inputType, + peraggstate->sortOperator, + work_mem, false); + } - /* - * (Re)set transValue to the initial value. - * - * Note that when the initial value is pass-by-ref, we just reuse it - * without copying for each group. Hence, transition function had - * better not scribble on its input, or it will fail for GROUP BY! - */ - peraggstate->transValue = peraggstate->initValue; - peraggstate->transValueIsNull = peraggstate->initValueIsNull; + /* + * (Re)set transValue to the initial value. + * + * Note that when the initial value is pass-by-ref, we must copy it + * (into the aggcontext) since we will pfree the transValue later. + */ + if (peraggstate->initValueIsNull) + pergroupstate->transValue = peraggstate->initValue; + else + { + MemoryContext oldContext; - /* - * If the initial value for the transition state doesn't exist in the - * pg_aggregate table then we will let the first non-NULL value - * returned from the outer procNode become the initial value. (This is - * useful for aggregates like max() and min().) The noTransValue flag - * signals that we still need to do this. - */ - peraggstate->noTransValue = peraggstate->initValueIsNull; + oldContext = MemoryContextSwitchTo(aggstate->aggcontext); + pergroupstate->transValue = datumCopy(peraggstate->initValue, + peraggstate->transtypeByVal, + peraggstate->transtypeLen); + MemoryContextSwitchTo(oldContext); + } + pergroupstate->transValueIsNull = peraggstate->initValueIsNull; + + /* + * If the initial value for the transition state doesn't exist in + * the pg_aggregate table then we will let the first non-NULL + * value returned from the outer procNode become the initial + * value. (This is useful for aggregates like max() and min().) + * The noTransValue flag signals that we still need to do this. + */ + pergroupstate->noTransValue = peraggstate->initValueIsNull; + } } /* * Given a new input value, advance the transition function of an aggregate. * - * When called, CurrentMemoryContext should be the context we want the - * transition function result to be delivered into on this cycle. + * It doesn't matter which memory context this is called in. */ static void -advance_transition_function(AggStatePerAgg peraggstate, +advance_transition_function(AggState *aggstate, + AggStatePerAgg peraggstate, + AggStatePerGroup pergroupstate, Datum newVal, bool isNull) { FunctionCallInfoData fcinfo; + MemoryContext oldContext; if (peraggstate->transfn.fn_strict) { + /* + * For a strict transfn, nothing happens at a NULL input tuple; we + * just keep the prior transValue. + */ if (isNull) - { - /* - * For a strict transfn, nothing happens at a NULL input - * tuple; we just keep the prior transValue. However, if the - * transtype is pass-by-ref, we have to copy it into the new - * context because the old one is going to get reset. - */ - if (!peraggstate->transValueIsNull) - peraggstate->transValue = datumCopy(peraggstate->transValue, - peraggstate->transtypeByVal, - peraggstate->transtypeLen); return; - } - if (peraggstate->noTransValue) + if (pergroupstate->noTransValue) { /* * transValue has not been initialized. This is the first @@ -251,18 +313,20 @@ advance_transition_function(AggStatePerAgg peraggstate, * is binary-compatible with its transtype, so straight copy * here is OK.) * - * We had better copy the datum if it is pass-by-ref, since the - * given pointer may be pointing into a scan tuple that will - * be freed on the next iteration of the scan. + * We must copy the datum into aggcontext if it is pass-by-ref. + * We do not need to pfree the old transValue, since it's + * NULL. */ - peraggstate->transValue = datumCopy(newVal, + oldContext = MemoryContextSwitchTo(aggstate->aggcontext); + pergroupstate->transValue = datumCopy(newVal, peraggstate->transtypeByVal, peraggstate->transtypeLen); - peraggstate->transValueIsNull = false; - peraggstate->noTransValue = false; + pergroupstate->transValueIsNull = false; + pergroupstate->noTransValue = false; + MemoryContextSwitchTo(oldContext); return; } - if (peraggstate->transValueIsNull) + if (pergroupstate->transValueIsNull) { /* * Don't call a strict function with NULL inputs. Note it is @@ -275,32 +339,94 @@ advance_transition_function(AggStatePerAgg peraggstate, } } - /* OK to call the transition function */ - MemSet(&fcinfo, 0, sizeof(fcinfo)); + /* We run the transition functions in per-input-tuple memory context */ + oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory); + + /* + * OK to call the transition function + * + * This is heavily-used code, so manually zero just the necessary fields + * instead of using MemSet(). Compare FunctionCall2(). + */ + + /* MemSet(&fcinfo, 0, sizeof(fcinfo)); */ + fcinfo.context = NULL; + fcinfo.resultinfo = NULL; + fcinfo.isnull = false; + fcinfo.flinfo = &peraggstate->transfn; fcinfo.nargs = 2; - fcinfo.arg[0] = peraggstate->transValue; - fcinfo.argnull[0] = peraggstate->transValueIsNull; + fcinfo.arg[0] = pergroupstate->transValue; + fcinfo.argnull[0] = pergroupstate->transValueIsNull; fcinfo.arg[1] = newVal; fcinfo.argnull[1] = isNull; newVal = FunctionCallInvoke(&fcinfo); /* - * If the transition function was uncooperative, it may have given us - * a pass-by-ref result that points at the scan tuple or the - * prior-cycle working memory. Copy it into the active context if it - * doesn't look right. + * If pass-by-ref datatype, must copy the new value into aggcontext + * and pfree the prior transValue. But if transfn returned a pointer + * to its first input, we don't need to do anything. */ - if (!peraggstate->transtypeByVal && !fcinfo.isnull && - !MemoryContextContains(CurrentMemoryContext, - DatumGetPointer(newVal))) - newVal = datumCopy(newVal, - peraggstate->transtypeByVal, - peraggstate->transtypeLen); + if (!peraggstate->transtypeByVal && + DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue)) + { + if (!fcinfo.isnull) + { + MemoryContextSwitchTo(aggstate->aggcontext); + newVal = datumCopy(newVal, + peraggstate->transtypeByVal, + peraggstate->transtypeLen); + } + if (!pergroupstate->transValueIsNull) + pfree(DatumGetPointer(pergroupstate->transValue)); + } - peraggstate->transValue = newVal; - peraggstate->transValueIsNull = fcinfo.isnull; + pergroupstate->transValue = newVal; + pergroupstate->transValueIsNull = fcinfo.isnull; + + MemoryContextSwitchTo(oldContext); +} + +/* + * Advance all the aggregates for one input tuple. The input tuple + * has been stored in tmpcontext->ecxt_scantuple, so that it is accessible + * to ExecEvalExpr. pergroup is the array of per-group structs to use + * (this might be in a hashtable entry). + * + * When called, CurrentMemoryContext should be the per-query context. + */ +static void +advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup) +{ + ExprContext *econtext = aggstate->tmpcontext; + int aggno; + + for (aggno = 0; aggno < aggstate->numaggs; aggno++) + { + AggStatePerAgg peraggstate = &aggstate->peragg[aggno]; + AggStatePerGroup pergroupstate = &pergroup[aggno]; + AggrefExprState *aggrefstate = peraggstate->aggrefstate; + Aggref *aggref = peraggstate->aggref; + Datum newVal; + bool isNull; + + newVal = ExecEvalExprSwitchContext(aggrefstate->target, econtext, + &isNull, NULL); + + if (aggref->aggdistinct) + { + /* in DISTINCT mode, we may ignore nulls */ + if (isNull) + continue; + tuplesort_putdatum(peraggstate->sortstate, newVal, isNull); + } + else + { + advance_transition_function(aggstate, peraggstate, pergroupstate, + newVal, isNull); + } + } } /* @@ -313,10 +439,12 @@ advance_transition_function(AggStatePerAgg peraggstate, */ static void process_sorted_aggregate(AggState *aggstate, - AggStatePerAgg peraggstate) + AggStatePerAgg peraggstate, + AggStatePerGroup pergroupstate) { Datum oldVal = (Datum) 0; bool haveOldVal = false; + MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory; MemoryContext oldContext; Datum newVal; bool isNull; @@ -340,12 +468,11 @@ process_sorted_aggregate(AggState *aggstate, continue; /* - * Clear and select the current working context for evaluation of - * the equality function and transition function. + * Clear and select the working context for evaluation of the + * equality function and transition function. */ - MemoryContextReset(aggstate->agg_cxt[aggstate->which_cxt]); - oldContext = - MemoryContextSwitchTo(aggstate->agg_cxt[aggstate->which_cxt]); + MemoryContextReset(workcontext); + oldContext = MemoryContextSwitchTo(workcontext); if (haveOldVal && DatumGetBool(FunctionCall2(&peraggstate->equalfn, @@ -354,24 +481,15 @@ process_sorted_aggregate(AggState *aggstate, /* equal to prior, so forget this one */ if (!peraggstate->inputtypeByVal) pfree(DatumGetPointer(newVal)); - - /* - * note we do NOT flip contexts in this case, so no need to - * copy prior transValue to other context. - */ } else { - advance_transition_function(peraggstate, newVal, false); - - /* - * Make the other context current so that this transition - * result is preserved. - */ - aggstate->which_cxt = 1 - aggstate->which_cxt; + advance_transition_function(aggstate, peraggstate, pergroupstate, + newVal, false); /* forget the old value, if any */ if (haveOldVal && !peraggstate->inputtypeByVal) pfree(DatumGetPointer(oldVal)); + /* and remember the new one for subsequent equality checks */ oldVal = newVal; haveOldVal = true; } @@ -389,13 +507,19 @@ process_sorted_aggregate(AggState *aggstate, /* * Compute the final value of one aggregate. * - * When called, CurrentMemoryContext should be the context where we want - * final values delivered (ie, the per-output-tuple expression context). + * The finalfunction will be run, and the result delivered, in the + * output-tuple context; caller's CurrentMemoryContext does not matter. */ static void -finalize_aggregate(AggStatePerAgg peraggstate, +finalize_aggregate(AggState *aggstate, + AggStatePerAgg peraggstate, + AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull) { + MemoryContext oldContext; + + oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory); + /* * Apply the agg's finalfn if one is provided, else return transValue. */ @@ -406,9 +530,9 @@ finalize_aggregate(AggStatePerAgg peraggstate, MemSet(&fcinfo, 0, sizeof(fcinfo)); fcinfo.flinfo = &peraggstate->finalfn; fcinfo.nargs = 1; - fcinfo.arg[0] = peraggstate->transValue; - fcinfo.argnull[0] = peraggstate->transValueIsNull; - if (fcinfo.flinfo->fn_strict && peraggstate->transValueIsNull) + fcinfo.arg[0] = pergroupstate->transValue; + fcinfo.argnull[0] = pergroupstate->transValueIsNull; + if (fcinfo.flinfo->fn_strict && pergroupstate->transValueIsNull) { /* don't call a strict function with NULL inputs */ *resultVal = (Datum) 0; @@ -422,8 +546,8 @@ finalize_aggregate(AggStatePerAgg peraggstate, } else { - *resultVal = peraggstate->transValue; - *resultIsNull = peraggstate->transValueIsNull; + *resultVal = pergroupstate->transValue; + *resultIsNull = pergroupstate->transValueIsNull; } /* @@ -435,59 +559,130 @@ finalize_aggregate(AggStatePerAgg peraggstate, *resultVal = datumCopy(*resultVal, peraggstate->resulttypeByVal, peraggstate->resulttypeLen); + + MemoryContextSwitchTo(oldContext); } +/* + * Initialize the hash table to empty. + * + * The hash table always lives in the aggcontext memory context. + */ +static void +build_hash_table(AggState *aggstate) +{ + Agg *node = (Agg *) aggstate->ss.ps.plan; + MemoryContext tmpmem = aggstate->tmpcontext->ecxt_per_tuple_memory; + Size entrysize; + + Assert(node->aggstrategy == AGG_HASHED); + Assert(node->numGroups > 0); + + entrysize = sizeof(AggHashEntryData) + + (aggstate->numaggs - 1) *sizeof(AggStatePerGroupData); + + aggstate->hashtable = BuildTupleHashTable(node->numCols, + node->grpColIdx, + aggstate->eqfunctions, + aggstate->hashfunctions, + node->numGroups, + entrysize, + aggstate->aggcontext, + tmpmem); +} -/* --------------------------------------- +/* + * Find or create a hashtable entry for the tuple group containing the + * given tuple. * + * When called, CurrentMemoryContext should be the per-query context. + */ +static AggHashEntry +lookup_hash_entry(AggState *aggstate, TupleTableSlot *slot) +{ + AggHashEntry entry; + bool isnew; + + entry = (AggHashEntry) LookupTupleHashEntry(aggstate->hashtable, + slot, + &isnew); + + if (isnew) + { + /* initialize aggregates for new tuple group */ + initialize_aggregates(aggstate, aggstate->peragg, entry->pergroup); + } + + return entry; +} + +/* * ExecAgg - * * ExecAgg receives tuples from its outer subplan and aggregates over * the appropriate attribute for each aggregate function use (Aggref * node) appearing in the targetlist or qual of the node. The number - * of tuples to aggregate over depends on whether a GROUP BY clause is - * present. We can produce an aggregate result row per group, or just - * one for the whole query. The value of each aggregate is stored in - * the expression context to be used when ExecProject evaluates the - * result tuple. - * - * If the outer subplan is a Group node, ExecAgg returns as many tuples - * as there are groups. - * - * ------------------------------------------ + * of tuples to aggregate over depends on whether grouped or plain + * aggregation is selected. In grouped aggregation, we produce a result + * row for each group; in plain aggregation there's a single result row + * for the whole query. In either case, the value of each aggregate is + * stored in the expression context to be used when ExecProject evaluates + * the result tuple. */ TupleTableSlot * -ExecAgg(Agg *node) +ExecAgg(AggState *node) { - AggState *aggstate; - EState *estate; - Plan *outerPlan; + if (node->agg_done) + return NULL; + + if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED) + { + if (!node->table_filled) + agg_fill_hash_table(node); + return agg_retrieve_hash_table(node); + } + else + return agg_retrieve_direct(node); +} + +/* + * ExecAgg for non-hashed case + */ +static TupleTableSlot * +agg_retrieve_direct(AggState *aggstate) +{ + Agg *node = (Agg *) aggstate->ss.ps.plan; + PlanState *outerPlan; ExprContext *econtext; + ExprContext *tmpcontext; ProjectionInfo *projInfo; Datum *aggvalues; bool *aggnulls; AggStatePerAgg peragg; - MemoryContext oldContext; + AggStatePerGroup pergroup; + TupleTableSlot *outerslot; + TupleTableSlot *firstSlot; TupleTableSlot *resultSlot; - HeapTuple inputTuple; int aggno; - bool isNull; /* * get state info from node */ - aggstate = node->aggstate; - estate = node->plan.state; - outerPlan = outerPlan(node); - econtext = aggstate->csstate.cstate.cs_ExprContext; + outerPlan = outerPlanState(aggstate); + /* econtext is the per-output-tuple expression context */ + econtext = aggstate->ss.ps.ps_ExprContext; aggvalues = econtext->ecxt_aggvalues; aggnulls = econtext->ecxt_aggnulls; - projInfo = aggstate->csstate.cstate.cs_ProjInfo; + /* tmpcontext is the per-input-tuple expression context */ + tmpcontext = aggstate->tmpcontext; + projInfo = aggstate->ss.ps.ps_ProjInfo; peragg = aggstate->peragg; + pergroup = aggstate->pergroup; + firstSlot = aggstate->ss.ss_ScanTupleSlot; /* * We loop retrieving groups until we find one matching - * node->plan.qual + * aggstate->ss.ps.qual */ do { @@ -495,208 +690,308 @@ ExecAgg(Agg *node) return NULL; /* - * Clear the per-output-tuple context for each group + * If we don't already have the first tuple of the new group, + * fetch it from the outer plan. */ - MemoryContextReset(aggstate->tup_cxt); - - /* - * Initialize working state for a new input tuple group - */ - for (aggno = 0; aggno < aggstate->numaggs; aggno++) + if (aggstate->grp_firstTuple == NULL) { - AggStatePerAgg peraggstate = &peragg[aggno]; - - initialize_aggregate(peraggstate); + outerslot = ExecProcNode(outerPlan); + if (!TupIsNull(outerslot)) + { + /* + * Make a copy of the first input tuple; we will use this + * for comparisons (in group mode) and for projection. + */ + aggstate->grp_firstTuple = heap_copytuple(outerslot->val); + } + else + { + /* outer plan produced no tuples at all */ + aggstate->agg_done = true; + /* If we are grouping, we should produce no tuples too */ + if (node->aggstrategy != AGG_PLAIN) + return NULL; + } } - inputTuple = NULL; /* no saved input tuple yet */ + /* + * Clear the per-output-tuple context for each group + */ + ResetExprContext(econtext); /* - * for each tuple from the outer plan, update all the aggregates + * Initialize working state for a new input tuple group */ - for (;;) + initialize_aggregates(aggstate, peragg, pergroup); + + if (aggstate->grp_firstTuple != NULL) { - TupleTableSlot *outerslot; + /* + * Store the copied first input tuple in the tuple table slot + * reserved for it. The tuple will be deleted when it is + * cleared from the slot. + */ + ExecStoreTuple(aggstate->grp_firstTuple, + firstSlot, + InvalidBuffer, + true); + aggstate->grp_firstTuple = NULL; /* don't keep two pointers */ - outerslot = ExecProcNode(outerPlan, (Plan *) node); - if (TupIsNull(outerslot)) - break; - econtext->ecxt_scantuple = outerslot; + /* set up for first advance_aggregates call */ + tmpcontext->ecxt_scantuple = firstSlot; /* - * Clear and select the current working context for evaluation - * of the input expressions and transition functions at this - * input tuple. + * Process each outer-plan tuple, and then fetch the next one, + * until we exhaust the outer plan or cross a group boundary. */ - econtext->ecxt_per_tuple_memory = - aggstate->agg_cxt[aggstate->which_cxt]; - ResetExprContext(econtext); - oldContext = - MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); - - for (aggno = 0; aggno < aggstate->numaggs; aggno++) + for (;;) { - AggStatePerAgg peraggstate = &peragg[aggno]; - Aggref *aggref = peraggstate->aggref; - Datum newVal; + advance_aggregates(aggstate, pergroup); - newVal = ExecEvalExpr(aggref->target, econtext, - &isNull, NULL); + /* Reset per-input-tuple context after each tuple */ + ResetExprContext(tmpcontext); - if (aggref->aggdistinct) + outerslot = ExecProcNode(outerPlan); + if (TupIsNull(outerslot)) { - /* in DISTINCT mode, we may ignore nulls */ - if (isNull) - continue; - /* putdatum has to be called in per-query context */ - MemoryContextSwitchTo(oldContext); - tuplesort_putdatum(peraggstate->sortstate, - newVal, isNull); - MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); + /* no more outer-plan tuples available */ + aggstate->agg_done = true; + break; } - else + /* set up for next advance_aggregates call */ + tmpcontext->ecxt_scantuple = outerslot; + + /* + * If we are grouping, check whether we've crossed a group + * boundary. + */ + if (node->aggstrategy == AGG_SORTED) { - advance_transition_function(peraggstate, - newVal, isNull); + if (!execTuplesMatch(firstSlot->val, + outerslot->val, + firstSlot->ttc_tupleDescriptor, + node->numCols, node->grpColIdx, + aggstate->eqfunctions, + tmpcontext->ecxt_per_tuple_memory)) + { + /* + * Save the first input tuple of the next group. + */ + aggstate->grp_firstTuple = heap_copytuple(outerslot->val); + break; + } } } - - /* - * Make the other context current so that these transition - * results are preserved. - */ - aggstate->which_cxt = 1 - aggstate->which_cxt; - - MemoryContextSwitchTo(oldContext); - - /* - * Keep a copy of the first input tuple for the projection. - * (We only need one since only the GROUP BY columns in it can - * be referenced, and these will be the same for all tuples - * aggregated over.) - */ - if (!inputTuple) - inputTuple = heap_copytuple(outerslot->val); } /* * Done scanning input tuple group. Finalize each aggregate * calculation, and stash results in the per-output-tuple context. - * - * This is a bit tricky when there are both DISTINCT and plain - * aggregates: we must first finalize all the plain aggs and then - * all the DISTINCT ones. This is needed because the last - * transition values for the plain aggs are stored in the - * not-current working context, and we have to evaluate those aggs - * (and stash the results in the output tup_cxt!) before we start - * flipping contexts again in process_sorted_aggregate. */ - oldContext = MemoryContextSwitchTo(aggstate->tup_cxt); for (aggno = 0; aggno < aggstate->numaggs; aggno++) { AggStatePerAgg peraggstate = &peragg[aggno]; + AggStatePerGroup pergroupstate = &pergroup[aggno]; + + if (peraggstate->aggref->aggdistinct) + process_sorted_aggregate(aggstate, peraggstate, pergroupstate); - if (!peraggstate->aggref->aggdistinct) - finalize_aggregate(peraggstate, - &aggvalues[aggno], &aggnulls[aggno]); + finalize_aggregate(aggstate, peraggstate, pergroupstate, + &aggvalues[aggno], &aggnulls[aggno]); } - MemoryContextSwitchTo(oldContext); - for (aggno = 0; aggno < aggstate->numaggs; aggno++) + + /* + * If we have no first tuple (ie, the outerPlan didn't return + * anything), create a dummy all-nulls input tuple for use by + * ExecProject. 99.44% of the time this is a waste of cycles, + * because ordinarily the projected output tuple's targetlist + * cannot contain any direct (non-aggregated) references to input + * columns, so the dummy tuple will not be referenced. However + * there are special cases where this isn't so --- in particular + * an UPDATE involving an aggregate will have a targetlist + * reference to ctid. We need to return a null for ctid in that + * situation, not coredump. + * + * The values returned for the aggregates will be the initial values + * of the transition functions. + */ + if (TupIsNull(firstSlot)) { - AggStatePerAgg peraggstate = &peragg[aggno]; + TupleDesc tupType; - if (peraggstate->aggref->aggdistinct) + /* Should only happen in non-grouped mode */ + Assert(node->aggstrategy == AGG_PLAIN); + Assert(aggstate->agg_done); + + tupType = firstSlot->ttc_tupleDescriptor; + /* watch out for zero-column input tuples, though... */ + if (tupType && tupType->natts > 0) { - process_sorted_aggregate(aggstate, peraggstate); - oldContext = MemoryContextSwitchTo(aggstate->tup_cxt); - finalize_aggregate(peraggstate, - &aggvalues[aggno], &aggnulls[aggno]); - MemoryContextSwitchTo(oldContext); + HeapTuple nullsTuple; + Datum *dvalues; + char *dnulls; + + dvalues = (Datum *) palloc0(sizeof(Datum) * tupType->natts); + dnulls = (char *) palloc(sizeof(char) * tupType->natts); + MemSet(dnulls, 'n', sizeof(char) * tupType->natts); + nullsTuple = heap_formtuple(tupType, dvalues, dnulls); + ExecStoreTuple(nullsTuple, + firstSlot, + InvalidBuffer, + true); + pfree(dvalues); + pfree(dnulls); } } /* - * If the outerPlan is a Group node, we will reach here after each - * group. We are not done unless the Group node is done (a little - * ugliness here while we reach into the Group's state to find - * out). Furthermore, when grouping we return nothing at all - * unless we had some input tuple(s). By the nature of Group, - * there are no empty groups, so if we get here with no input the - * whole scan is empty. - * - * If the outerPlan isn't a Group, we are done when we get here, and - * we will emit a (single) tuple even if there were no input - * tuples. + * Form a projection tuple using the aggregate results and the + * representative input tuple. Store it in the result tuple slot. + * Note we do not support aggregates returning sets ... */ - if (IsA(outerPlan, Group)) + econtext->ecxt_scantuple = firstSlot; + resultSlot = ExecProject(projInfo, NULL); + + /* + * If the completed tuple does not match the qualifications, it is + * ignored and we loop back to try to process another group. + * Otherwise, return the tuple. + */ + } + while (!ExecQual(aggstate->ss.ps.qual, econtext, false)); + + return resultSlot; +} + +/* + * ExecAgg for hashed case: phase 1, read input and build hash table + */ +static void +agg_fill_hash_table(AggState *aggstate) +{ + PlanState *outerPlan; + ExprContext *tmpcontext; + AggHashEntry entry; + TupleTableSlot *outerslot; + + /* + * get state info from node + */ + outerPlan = outerPlanState(aggstate); + /* tmpcontext is the per-input-tuple expression context */ + tmpcontext = aggstate->tmpcontext; + + /* + * Process each outer-plan tuple, and then fetch the next one, until + * we exhaust the outer plan. + */ + for (;;) + { + outerslot = ExecProcNode(outerPlan); + if (TupIsNull(outerslot)) + break; + /* set up for advance_aggregates call */ + tmpcontext->ecxt_scantuple = outerslot; + + /* Find or build hashtable entry for this tuple's group */ + entry = lookup_hash_entry(aggstate, outerslot); + + /* Advance the aggregates */ + advance_aggregates(aggstate, entry->pergroup); + + /* Reset per-input-tuple context after each tuple */ + ResetExprContext(tmpcontext); + } + + aggstate->table_filled = true; + /* Initialize to walk the hash table */ + ResetTupleHashIterator(aggstate->hashtable, &aggstate->hashiter); +} + +/* + * ExecAgg for hashed case: phase 2, retrieving groups from hash table + */ +static TupleTableSlot * +agg_retrieve_hash_table(AggState *aggstate) +{ + ExprContext *econtext; + ProjectionInfo *projInfo; + Datum *aggvalues; + bool *aggnulls; + AggStatePerAgg peragg; + AggStatePerGroup pergroup; + AggHashEntry entry; + TupleTableSlot *firstSlot; + TupleTableSlot *resultSlot; + int aggno; + + /* + * get state info from node + */ + /* econtext is the per-output-tuple expression context */ + econtext = aggstate->ss.ps.ps_ExprContext; + aggvalues = econtext->ecxt_aggvalues; + aggnulls = econtext->ecxt_aggnulls; + projInfo = aggstate->ss.ps.ps_ProjInfo; + peragg = aggstate->peragg; + firstSlot = aggstate->ss.ss_ScanTupleSlot; + + /* + * We loop retrieving groups until we find one satisfying + * aggstate->ss.ps.qual + */ + do + { + if (aggstate->agg_done) + return NULL; + + /* + * Find the next entry in the hash table + */ + entry = (AggHashEntry) ScanTupleHashTable(&aggstate->hashiter); + if (entry == NULL) { - /* aggregation over groups */ - aggstate->agg_done = ((Group *) outerPlan)->grpstate->grp_done; - /* check for no groups */ - if (inputTuple == NULL) - return NULL; + /* No more entries in hashtable, so done */ + aggstate->agg_done = TRUE; + return NULL; } - else - { - aggstate->agg_done = true; - /* - * If inputtuple==NULL (ie, the outerPlan didn't return - * anything), create a dummy all-nulls input tuple for use by - * ExecProject. 99.44% of the time this is a waste of cycles, - * because ordinarily the projected output tuple's targetlist - * cannot contain any direct (non-aggregated) references to - * input columns, so the dummy tuple will not be referenced. - * However there are special cases where this isn't so --- in - * particular an UPDATE involving an aggregate will have a - * targetlist reference to ctid. We need to return a null for - * ctid in that situation, not coredump. - * - * The values returned for the aggregates will be the initial - * values of the transition functions. - */ - if (inputTuple == NULL) - { - TupleDesc tupType; - Datum *tupValue; - char *null_array; - AttrNumber attnum; - - tupType = aggstate->csstate.css_ScanTupleSlot->ttc_tupleDescriptor; - tupValue = projInfo->pi_tupValue; - /* watch out for null input tuples, though... */ - if (tupType && tupValue) - { - null_array = (char *) palloc(sizeof(char) * tupType->natts); - for (attnum = 0; attnum < tupType->natts; attnum++) - null_array[attnum] = 'n'; - inputTuple = heap_formtuple(tupType, tupValue, null_array); - pfree(null_array); - } - } - } + /* + * Clear the per-output-tuple context for each group + */ + ResetExprContext(econtext); /* - * Store the representative input tuple in the tuple table slot - * reserved for it. The tuple will be deleted when it is cleared - * from the slot. + * Store the copied first input tuple in the tuple table slot + * reserved for it, so that it can be used in ExecProject. */ - ExecStoreTuple(inputTuple, - aggstate->csstate.css_ScanTupleSlot, + ExecStoreTuple(entry->shared.firstTuple, + firstSlot, InvalidBuffer, - true); - econtext->ecxt_scantuple = aggstate->csstate.css_ScanTupleSlot; + false); + + pergroup = entry->pergroup; /* - * Do projection and qual check in the per-output-tuple context. + * Finalize each aggregate calculation, and stash results in the + * per-output-tuple context. */ - econtext->ecxt_per_tuple_memory = aggstate->tup_cxt; + for (aggno = 0; aggno < aggstate->numaggs; aggno++) + { + AggStatePerAgg peraggstate = &peragg[aggno]; + AggStatePerGroup pergroupstate = &pergroup[aggno]; + + Assert(!peraggstate->aggref->aggdistinct); + finalize_aggregate(aggstate, peraggstate, pergroupstate, + &aggvalues[aggno], &aggnulls[aggno]); + } /* * Form a projection tuple using the aggregate results and the * representative input tuple. Store it in the result tuple slot. * Note we do not support aggregates returning sets ... */ + econtext->ecxt_scantuple = firstSlot; resultSlot = ExecProject(projInfo, NULL); /* @@ -705,7 +1000,7 @@ ExecAgg(Agg *node) * Otherwise, return the tuple. */ } - while (!ExecQual(node->plan.qual, econtext, false)); + while (!ExecQual(aggstate->ss.ps.qual, econtext, false)); return resultSlot; } @@ -717,8 +1012,8 @@ ExecAgg(Agg *node) * planner and initializes its outer subtree * ----------------- */ -bool -ExecInitAgg(Agg *node, EState *estate, Plan *parent) +AggState * +ExecInitAgg(Agg *node, EState *estate) { AggState *aggstate; AggStatePerAgg peragg; @@ -728,138 +1023,214 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent) aggno; List *alist; - /* - * assign the node's execution state - */ - node->plan.state = estate; - /* * create state structure */ aggstate = makeNode(AggState); - node->aggstate = aggstate; + aggstate->ss.ps.plan = (Plan *) node; + aggstate->ss.ps.state = estate; + + aggstate->aggs = NIL; + aggstate->numaggs = 0; + aggstate->eqfunctions = NULL; + aggstate->hashfunctions = NULL; + aggstate->peragg = NULL; aggstate->agg_done = false; + aggstate->pergroup = NULL; + aggstate->grp_firstTuple = NULL; + aggstate->hashtable = NULL; /* - * find aggregates in targetlist and quals - * - * Note: pull_agg_clauses also checks that no aggs contain other agg - * calls in their arguments. This would make no sense under SQL - * semantics anyway (and it's forbidden by the spec). Because that is - * true, we don't need to worry about evaluating the aggs in any - * particular order. + * Create expression contexts. We need two, one for per-input-tuple + * processing and one for per-output-tuple processing. We cheat a + * little by using ExecAssignExprContext() to build both. */ - aggstate->aggs = nconc(pull_agg_clause((Node *) node->plan.targetlist), - pull_agg_clause((Node *) node->plan.qual)); - aggstate->numaggs = numaggs = length(aggstate->aggs); - if (numaggs <= 0) - { - /* - * This used to be treated as an error, but we can't do that - * anymore because constant-expression simplification could - * optimize away all of the Aggrefs in the targetlist and qual. - * So, just make a debug note, and force numaggs positive so that - * palloc()s below don't choke. - */ - elog(LOG, "ExecInitAgg: could not find any aggregate functions"); - numaggs = 1; - } - - /* - * Create expression context - */ - ExecAssignExprContext(estate, &aggstate->csstate.cstate); + ExecAssignExprContext(estate, &aggstate->ss.ps); + aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext; + ExecAssignExprContext(estate, &aggstate->ss.ps); /* - * We actually need three separate expression memory contexts: one for - * calculating per-output-tuple values (ie, the finished aggregate - * results), and two that we ping-pong between for per-input-tuple - * evaluation of input expressions and transition functions. The - * context made by ExecAssignExprContext() is used as the output - * context. + * We also need a long-lived memory context for holding hashtable data + * structures and transition values. NOTE: the details of what is + * stored in aggcontext and what is stored in the regular per-query + * memory context are driven by a simple decision: we want to reset + * the aggcontext in ExecReScanAgg to recover no-longer-wanted space. */ - aggstate->tup_cxt = - aggstate->csstate.cstate.cs_ExprContext->ecxt_per_tuple_memory; - aggstate->agg_cxt[0] = - AllocSetContextCreate(CurrentMemoryContext, - "AggExprContext1", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - aggstate->agg_cxt[1] = + aggstate->aggcontext = AllocSetContextCreate(CurrentMemoryContext, - "AggExprContext2", + "AggContext", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); - aggstate->which_cxt = 0; #define AGG_NSLOTS 2 /* * tuple table initialization */ - ExecInitScanTupleSlot(estate, &aggstate->csstate); - ExecInitResultTupleSlot(estate, &aggstate->csstate.cstate); + ExecInitScanTupleSlot(estate, &aggstate->ss); + ExecInitResultTupleSlot(estate, &aggstate->ss.ps); /* - * Set up aggregate-result storage in the expr context, and also - * allocate my private per-agg working storage + * initialize child expressions + * + * Note: ExecInitExpr finds Aggrefs for us, and also checks that no aggs + * contain other agg calls in their arguments. This would make no + * sense under SQL semantics anyway (and it's forbidden by the spec). + * Because that is true, we don't need to worry about evaluating the + * aggs in any particular order. */ - econtext = aggstate->csstate.cstate.cs_ExprContext; - econtext->ecxt_aggvalues = (Datum *) palloc(sizeof(Datum) * numaggs); - MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * numaggs); - econtext->ecxt_aggnulls = (bool *) palloc(sizeof(bool) * numaggs); - MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * numaggs); - - peragg = (AggStatePerAgg) palloc(sizeof(AggStatePerAggData) * numaggs); - MemSet(peragg, 0, sizeof(AggStatePerAggData) * numaggs); - aggstate->peragg = peragg; + aggstate->ss.ps.targetlist = (List *) + ExecInitExpr((Expr *) node->plan.targetlist, + (PlanState *) aggstate); + aggstate->ss.ps.qual = (List *) + ExecInitExpr((Expr *) node->plan.qual, + (PlanState *) aggstate); /* * initialize child nodes */ outerPlan = outerPlan(node); - ExecInitNode(outerPlan, estate, (Plan *) node); + outerPlanState(aggstate) = ExecInitNode(outerPlan, estate); /* * initialize source tuple type. */ - ExecAssignScanTypeFromOuterPlan((Plan *) node, &aggstate->csstate); + ExecAssignScanTypeFromOuterPlan(&aggstate->ss); /* * Initialize result tuple type and projection info. */ - ExecAssignResultTypeFromTL((Plan *) node, &aggstate->csstate.cstate); - ExecAssignProjectionInfo((Plan *) node, &aggstate->csstate.cstate); + ExecAssignResultTypeFromTL(&aggstate->ss.ps); + ExecAssignProjectionInfo(&aggstate->ss.ps); + + /* + * get the count of aggregates in targetlist and quals + */ + numaggs = aggstate->numaggs; + Assert(numaggs == length(aggstate->aggs)); + if (numaggs <= 0) + { + /* + * This is not an error condition: we might be using the Agg node + * just to do hash-based grouping. Even in the regular case, + * constant-expression simplification could optimize away all of + * the Aggrefs in the targetlist and qual. So keep going, but + * force local copy of numaggs positive so that palloc()s below + * don't choke. + */ + numaggs = 1; + } + + /* + * If we are grouping, precompute fmgr lookup data for inner loop. We + * need both equality and hashing functions to do it by hashing, but + * only equality if not hashing. + */ + if (node->numCols > 0) + { + if (node->aggstrategy == AGG_HASHED) + execTuplesHashPrepare(ExecGetScanType(&aggstate->ss), + node->numCols, + node->grpColIdx, + &aggstate->eqfunctions, + &aggstate->hashfunctions); + else + aggstate->eqfunctions = + execTuplesMatchPrepare(ExecGetScanType(&aggstate->ss), + node->numCols, + node->grpColIdx); + } + + /* + * Set up aggregate-result storage in the output expr context, and + * also allocate my private per-agg working storage + */ + econtext = aggstate->ss.ps.ps_ExprContext; + econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs); + econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs); + + peragg = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs); + aggstate->peragg = peragg; + + if (node->aggstrategy == AGG_HASHED) + { + build_hash_table(aggstate); + aggstate->table_filled = false; + } + else + { + AggStatePerGroup pergroup; + + pergroup = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) * numaggs); + aggstate->pergroup = pergroup; + } /* * Perform lookups of aggregate function info, and initialize the - * unchanging fields of the per-agg data + * unchanging fields of the per-agg data. We also detect duplicate + * aggregates (for example, "SELECT sum(x) ... HAVING sum(x) > 0"). + * When duplicates are detected, we only make an AggStatePerAgg struct + * for the first one. The clones are simply pointed at the same + * result entry by giving them duplicate aggno values. */ aggno = -1; foreach(alist, aggstate->aggs) { - Aggref *aggref = (Aggref *) lfirst(alist); - AggStatePerAgg peraggstate = &peragg[++aggno]; + AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(alist); + Aggref *aggref = (Aggref *) aggrefstate->xprstate.expr; + AggStatePerAgg peraggstate; + Oid inputType; HeapTuple aggTuple; Form_pg_aggregate aggform; + Oid aggtranstype; AclResult aclresult; Oid transfn_oid, finalfn_oid; + Expr *transfnexpr, + *finalfnexpr; Datum textInitVal; + int i; + + /* Planner should have assigned aggregate to correct level */ + Assert(aggref->agglevelsup == 0); + + /* Look for a previous duplicate aggregate */ + for (i = 0; i <= aggno; i++) + { + if (equal(aggref, peragg[i].aggref) && + !contain_volatile_functions((Node *) aggref)) + break; + } + if (i <= aggno) + { + /* Found a match to an existing entry, so just mark it */ + aggrefstate->aggno = i; + continue; + } + + /* Nope, so assign a new PerAgg record */ + peraggstate = &peragg[++aggno]; - /* Mark Aggref node with its associated index in the result array */ - aggref->aggno = aggno; + /* Mark Aggref state node with assigned index in the result array */ + aggrefstate->aggno = aggno; /* Fill in the peraggstate data */ + peraggstate->aggrefstate = aggrefstate; peraggstate->aggref = aggref; + /* + * Get actual datatype of the input. We need this because it may + * be different from the agg's declared input type, when the agg + * accepts ANY (eg, COUNT(*)) or ANYARRAY or ANYELEMENT. + */ + inputType = exprType((Node *) aggref->target); + aggTuple = SearchSysCache(AGGFNOID, ObjectIdGetDatum(aggref->aggfnoid), 0, 0, 0); if (!HeapTupleIsValid(aggTuple)) - elog(ERROR, "ExecAgg: cache lookup failed for aggregate %u", + elog(ERROR, "cache lookup failed for aggregate %u", aggref->aggfnoid); aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); @@ -867,18 +1238,56 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent) aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(), ACL_EXECUTE); if (aclresult != ACLCHECK_OK) - aclcheck_error(aclresult, get_func_name(aggref->aggfnoid)); + aclcheck_error(aclresult, ACL_KIND_PROC, + get_func_name(aggref->aggfnoid)); + + peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; + peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; + + /* resolve actual type of transition state, if polymorphic */ + aggtranstype = aggform->aggtranstype; + if (aggtranstype == ANYARRAYOID || aggtranstype == ANYELEMENTOID) + { + /* have to fetch the agg's declared input type... */ + Oid agg_arg_types[FUNC_MAX_ARGS]; + int agg_nargs; + + (void) get_func_signature(aggref->aggfnoid, + agg_arg_types, &agg_nargs); + Assert(agg_nargs == 1); + aggtranstype = resolve_generic_type(aggtranstype, + inputType, + agg_arg_types[0]); + } + + /* build expression trees using actual argument & result types */ + build_aggregate_fnexprs(inputType, + aggtranstype, + aggref->aggtype, + transfn_oid, + finalfn_oid, + &transfnexpr, + &finalfnexpr); + + fmgr_info(transfn_oid, &peraggstate->transfn); + peraggstate->transfn.fn_expr = (Node *) transfnexpr; + + if (OidIsValid(finalfn_oid)) + { + fmgr_info(finalfn_oid, &peraggstate->finalfn); + peraggstate->finalfn.fn_expr = (Node *) finalfnexpr; + } get_typlenbyval(aggref->aggtype, &peraggstate->resulttypeLen, &peraggstate->resulttypeByVal); - get_typlenbyval(aggform->aggtranstype, + get_typlenbyval(aggtranstype, &peraggstate->transtypeLen, &peraggstate->transtypeByVal); /* - * initval is potentially null, so don't try to access it as a struct - * field. Must do it the hard way with SysCacheGetAttr. + * initval is potentially null, so don't try to access it as a + * struct field. Must do it the hard way with SysCacheGetAttr. */ textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, Anum_pg_aggregate_agginitval, @@ -888,14 +1297,7 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent) peraggstate->initValue = (Datum) 0; else peraggstate->initValue = GetAggInitVal(textInitVal, - aggform->aggtranstype); - - peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; - peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; - - fmgr_info(transfn_oid, &peraggstate->transfn); - if (OidIsValid(finalfn_oid)) - fmgr_info(finalfn_oid, &peraggstate->finalfn); + aggtranstype); /* * If the transfn is strict and the initval is NULL, make sure @@ -906,48 +1308,38 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent) */ if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull) { - /* - * Note: use the type from the input expression here, not - * from pg_proc.proargtypes, because the latter might be 0. - * (Consider COUNT(*).) - */ - Oid inputType = exprType(aggref->target); - - if (!IsBinaryCompatible(inputType, aggform->aggtranstype)) - elog(ERROR, "Aggregate %u needs to have compatible input type and transition type", - aggref->aggfnoid); + if (!IsBinaryCoercible(inputType, aggtranstype)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate %u needs to have compatible input type and transition type", + aggref->aggfnoid))); } if (aggref->aggdistinct) { - /* - * Note: use the type from the input expression here, not - * from pg_proc.proargtypes, because the latter might be 0. - * (Consider COUNT(*).) - */ - Oid inputType = exprType(aggref->target); Oid eq_function; + /* We don't implement DISTINCT aggs in the HASHED case */ + Assert(node->aggstrategy != AGG_HASHED); + peraggstate->inputType = inputType; get_typlenbyval(inputType, &peraggstate->inputtypeLen, &peraggstate->inputtypeByVal); - eq_function = compatible_oper_funcid(makeList1(makeString("=")), - inputType, inputType, - true); - if (!OidIsValid(eq_function)) - elog(ERROR, "Unable to identify an equality operator for type %s", - format_type_be(inputType)); + eq_function = equality_oper_funcid(inputType); fmgr_info(eq_function, &(peraggstate->equalfn)); - peraggstate->sortOperator = any_ordering_op(inputType); + peraggstate->sortOperator = ordering_oper_opid(inputType); peraggstate->sortstate = NULL; } ReleaseSysCache(aggTuple); } - return TRUE; + /* Update numaggs to match number of unique aggregates found */ + aggstate->numaggs = aggno + 1; + + return aggstate; } static Datum @@ -965,7 +1357,7 @@ GetAggInitVal(Datum textInitVal, Oid transtype) ObjectIdGetDatum(transtype), 0, 0, 0); if (!HeapTupleIsValid(tup)) - elog(ERROR, "GetAggInitVal: cache lookup failed on aggregate transition function return type %u", transtype); + elog(ERROR, "cache lookup failed for type %u", transtype); typinput = ((Form_pg_type) GETSTRUCT(tup))->typinput; typelem = ((Form_pg_type) GETSTRUCT(tup))->typelem; @@ -989,49 +1381,104 @@ ExecCountSlotsAgg(Agg *node) } void -ExecEndAgg(Agg *node) +ExecEndAgg(AggState *node) { - AggState *aggstate = node->aggstate; - Plan *outerPlan; + PlanState *outerPlan; + int aggno; - ExecFreeProjectionInfo(&aggstate->csstate.cstate); + /* Make sure we have closed any open tuplesorts */ + for (aggno = 0; aggno < node->numaggs; aggno++) + { + AggStatePerAgg peraggstate = &node->peragg[aggno]; - /* - * Make sure ExecFreeExprContext() frees the right expr context... - */ - aggstate->csstate.cstate.cs_ExprContext->ecxt_per_tuple_memory = - aggstate->tup_cxt; - ExecFreeExprContext(&aggstate->csstate.cstate); + if (peraggstate->sortstate) + tuplesort_end(peraggstate->sortstate); + } /* - * ... and I free the others. + * Free both the expr contexts. */ - MemoryContextDelete(aggstate->agg_cxt[0]); - MemoryContextDelete(aggstate->agg_cxt[1]); - - outerPlan = outerPlan(node); - ExecEndNode(outerPlan, (Plan *) node); + ExecFreeExprContext(&node->ss.ps); + node->ss.ps.ps_ExprContext = node->tmpcontext; + ExecFreeExprContext(&node->ss.ps); /* clean up tuple table */ - ExecClearTuple(aggstate->csstate.css_ScanTupleSlot); + ExecClearTuple(node->ss.ss_ScanTupleSlot); + + MemoryContextDelete(node->aggcontext); + + outerPlan = outerPlanState(node); + ExecEndNode(outerPlan); } void -ExecReScanAgg(Agg *node, ExprContext *exprCtxt, Plan *parent) +ExecReScanAgg(AggState *node, ExprContext *exprCtxt) { - AggState *aggstate = node->aggstate; - ExprContext *econtext = aggstate->csstate.cstate.cs_ExprContext; + ExprContext *econtext = node->ss.ps.ps_ExprContext; + int aggno; - aggstate->agg_done = false; - MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * aggstate->numaggs); - MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * aggstate->numaggs); + node->agg_done = false; + + if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED) + { + /* + * In the hashed case, if we haven't yet built the hash table then + * we can just return; nothing done yet, so nothing to undo. If + * subnode's chgParam is not NULL then it will be re-scanned by + * ExecProcNode, else no reason to re-scan it at all. + */ + if (!node->table_filled) + return; + + /* + * If we do have the hash table and the subplan does not have any + * parameter changes, then we can just rescan the existing hash + * table; no need to build it again. + */ + if (((PlanState *) node)->lefttree->chgParam == NULL) + { + ResetTupleHashIterator(node->hashtable, &node->hashiter); + return; + } + } + + /* Make sure we have closed any open tuplesorts */ + for (aggno = 0; aggno < node->numaggs; aggno++) + { + AggStatePerAgg peraggstate = &node->peragg[aggno]; + + if (peraggstate->sortstate) + tuplesort_end(peraggstate->sortstate); + peraggstate->sortstate = NULL; + } + + /* Release first tuple of group, if we have made a copy */ + if (node->grp_firstTuple != NULL) + { + heap_freetuple(node->grp_firstTuple); + node->grp_firstTuple = NULL; + } + + /* Forget current agg values */ + MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs); + MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs); + + /* Release all temp storage */ + MemoryContextReset(node->aggcontext); + + if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED) + { + /* Rebuild an empty hash table */ + build_hash_table(node); + node->table_filled = false; + } /* * if chgParam of subnode is not null then plan will be re-scanned by * first ExecProcNode. */ - if (((Plan *) node)->lefttree->chgParam == NULL) - ExecReScan(((Plan *) node)->lefttree, exprCtxt, (Plan *) node); + if (((PlanState *) node)->lefttree->chgParam == NULL) + ExecReScan(((PlanState *) node)->lefttree, exprCtxt); } /* @@ -1047,7 +1494,7 @@ ExecReScanAgg(Agg *node, ExprContext *exprCtxt, Plan *parent) Datum aggregate_dummy(PG_FUNCTION_ARGS) { - elog(ERROR, "Aggregate function %u called as normal function", + elog(ERROR, "aggregate function %u called as normal function", fcinfo->flinfo->fn_oid); return (Datum) 0; /* keep compiler quiet */ }