X-Git-Url: https://granicus.if.org/sourcecode?a=blobdiff_plain;f=src%2Fbackend%2Fexecutor%2FnodeAgg.c;h=cb0a64c42771beb660c1f30a4c2aec1f39dd1711;hb=391c3811a2b7f4cd666e1b4f35534046a862abbb;hp=0216f8ebde7fe83f122af02ff1682614204a5630;hpb=b60be3f2f8d094da79e04c6eda888f401b09dc39;p=postgresql diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 0216f8ebde..cb0a64c427 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -41,11 +41,11 @@ * this context can be reset once per output tuple. * * - * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group + * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/executor/nodeAgg.c,v 1.96 2002/11/19 23:21:57 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/executor/nodeAgg.c,v 1.118 2004/02/03 17:34:02 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -57,10 +57,9 @@ #include "catalog/pg_operator.h" #include "executor/executor.h" #include "executor/nodeAgg.h" -#include "executor/nodeGroup.h" -#include "executor/nodeHash.h" #include "miscadmin.h" #include "optimizer/clauses.h" +#include "parser/parse_agg.h" #include "parser/parse_coerce.h" #include "parser/parse_expr.h" #include "parser/parse_oper.h" @@ -82,7 +81,8 @@ typedef struct AggStatePerAggData * thereafter: */ - /* Link to Aggref node this working state is for */ + /* Links to Aggref expr and state nodes this working state is for */ + AggrefExprState *aggrefstate; Aggref *aggref; /* Oids of transfer functions */ @@ -178,45 +178,40 @@ typedef struct AggStatePerGroupData /* * To implement hashed aggregation, we need a hashtable that stores a * representative tuple and an array of AggStatePerGroup structs for each - * distinct set of GROUP BY column values. We compute the hash key from + * distinct set of GROUP BY column values. We compute the hash key from * the GROUP BY columns. */ +typedef struct AggHashEntryData *AggHashEntry; + typedef struct AggHashEntryData { - AggHashEntry next; /* next entry in same hash bucket */ - uint32 hashkey; /* exact hash key of this entry */ - HeapTuple firstTuple; /* copy of first tuple in this group */ + TupleHashEntryData shared; /* common header for hash table entries */ /* per-aggregate transition status array - must be last! 
*/ AggStatePerGroupData pergroup[1]; /* VARIABLE LENGTH ARRAY */ } AggHashEntryData; /* VARIABLE LENGTH STRUCT */ -typedef struct AggHashTableData -{ - int nbuckets; /* number of buckets in hash table */ - AggHashEntry buckets[1]; /* VARIABLE LENGTH ARRAY */ -} AggHashTableData; /* VARIABLE LENGTH STRUCT */ - static void initialize_aggregates(AggState *aggstate, - AggStatePerAgg peragg, - AggStatePerGroup pergroup); + AggStatePerAgg peragg, + AggStatePerGroup pergroup); static void advance_transition_function(AggState *aggstate, - AggStatePerAgg peraggstate, - AggStatePerGroup pergroupstate, - Datum newVal, bool isNull); + AggStatePerAgg peraggstate, + AggStatePerGroup pergroupstate, + Datum newVal, bool isNull); static void advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup); static void process_sorted_aggregate(AggState *aggstate, - AggStatePerAgg peraggstate, - AggStatePerGroup pergroupstate); + AggStatePerAgg peraggstate, + AggStatePerGroup pergroupstate); static void finalize_aggregate(AggState *aggstate, - AggStatePerAgg peraggstate, - AggStatePerGroup pergroupstate, - Datum *resultVal, bool *resultIsNull); -static void build_hash_table(Agg *node); -static AggHashEntry lookup_hash_entry(Agg *node, TupleTableSlot *slot); -static TupleTableSlot *agg_retrieve_direct(Agg *node); -static void agg_fill_hash_table(Agg *node); -static TupleTableSlot *agg_retrieve_hash_table(Agg *node); + AggStatePerAgg peraggstate, + AggStatePerGroup pergroupstate, + Datum *resultVal, bool *resultIsNull); +static void build_hash_table(AggState *aggstate); +static AggHashEntry lookup_hash_entry(AggState *aggstate, + TupleTableSlot *slot); +static TupleTableSlot *agg_retrieve_direct(AggState *aggstate); +static void agg_fill_hash_table(AggState *aggstate); +static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate); static Datum GetAggInitVal(Datum textInitVal, Oid transtype); @@ -253,7 +248,7 @@ initialize_aggregates(AggState *aggstate, peraggstate->sortstate = tuplesort_begin_datum(peraggstate->inputType, peraggstate->sortOperator, - false); + work_mem, false); } /* @@ -270,18 +265,18 @@ initialize_aggregates(AggState *aggstate, oldContext = MemoryContextSwitchTo(aggstate->aggcontext); pergroupstate->transValue = datumCopy(peraggstate->initValue, - peraggstate->transtypeByVal, - peraggstate->transtypeLen); + peraggstate->transtypeByVal, + peraggstate->transtypeLen); MemoryContextSwitchTo(oldContext); } pergroupstate->transValueIsNull = peraggstate->initValueIsNull; /* - * If the initial value for the transition state doesn't exist in the - * pg_aggregate table then we will let the first non-NULL value - * returned from the outer procNode become the initial value. (This is - * useful for aggregates like max() and min().) The noTransValue flag - * signals that we still need to do this. + * If the initial value for the transition state doesn't exist in + * the pg_aggregate table then we will let the first non-NULL + * value returned from the outer procNode become the initial + * value. (This is useful for aggregates like max() and min().) + * The noTransValue flag signals that we still need to do this. */ pergroupstate->noTransValue = peraggstate->initValueIsNull; } @@ -304,8 +299,8 @@ advance_transition_function(AggState *aggstate, if (peraggstate->transfn.fn_strict) { /* - * For a strict transfn, nothing happens at a NULL input - * tuple; we just keep the prior transValue. + * For a strict transfn, nothing happens at a NULL input tuple; we + * just keep the prior transValue. 
*/ if (isNull) return; @@ -319,12 +314,13 @@ advance_transition_function(AggState *aggstate, * here is OK.) * * We must copy the datum into aggcontext if it is pass-by-ref. - * We do not need to pfree the old transValue, since it's NULL. + * We do not need to pfree the old transValue, since it's + * NULL. */ oldContext = MemoryContextSwitchTo(aggstate->aggcontext); pergroupstate->transValue = datumCopy(newVal, - peraggstate->transtypeByVal, - peraggstate->transtypeLen); + peraggstate->transtypeByVal, + peraggstate->transtypeLen); pergroupstate->transValueIsNull = false; pergroupstate->noTransValue = false; MemoryContextSwitchTo(oldContext); @@ -368,12 +364,12 @@ advance_transition_function(AggState *aggstate, newVal = FunctionCallInvoke(&fcinfo); /* - * If pass-by-ref datatype, must copy the new value into aggcontext and - * pfree the prior transValue. But if transfn returned a pointer to its - * first input, we don't need to do anything. + * If pass-by-ref datatype, must copy the new value into aggcontext + * and pfree the prior transValue. But if transfn returned a pointer + * to its first input, we don't need to do anything. */ if (!peraggstate->transtypeByVal && - DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue)) + DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue)) { if (!fcinfo.isnull) { @@ -393,7 +389,7 @@ advance_transition_function(AggState *aggstate, } /* - * Advance all the aggregates for one input tuple. The input tuple + * Advance all the aggregates for one input tuple. The input tuple * has been stored in tmpcontext->ecxt_scantuple, so that it is accessible * to ExecEvalExpr. pergroup is the array of per-group structs to use * (this might be in a hashtable entry). @@ -410,11 +406,12 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup) { AggStatePerAgg peraggstate = &aggstate->peragg[aggno]; AggStatePerGroup pergroupstate = &pergroup[aggno]; + AggrefExprState *aggrefstate = peraggstate->aggrefstate; Aggref *aggref = peraggstate->aggref; Datum newVal; bool isNull; - newVal = ExecEvalExprSwitchContext(aggref->target, econtext, + newVal = ExecEvalExprSwitchContext(aggrefstate->target, econtext, &isNull, NULL); if (aggref->aggdistinct) @@ -471,8 +468,8 @@ process_sorted_aggregate(AggState *aggstate, continue; /* - * Clear and select the working context for evaluation of - * the equality function and transition function. + * Clear and select the working context for evaluation of the + * equality function and transition function. */ MemoryContextReset(workcontext); oldContext = MemoryContextSwitchTo(workcontext); @@ -521,7 +518,7 @@ finalize_aggregate(AggState *aggstate, { MemoryContext oldContext; - oldContext = MemoryContextSwitchTo(aggstate->csstate.cstate.cs_ExprContext->ecxt_per_tuple_memory); + oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory); /* * Apply the agg's finalfn if one is provided, else return transValue. @@ -572,21 +569,26 @@ finalize_aggregate(AggState *aggstate, * The hash table always lives in the aggcontext memory context. 
*/ static void -build_hash_table(Agg *node) +build_hash_table(AggState *aggstate) { - AggState *aggstate = node->aggstate; - AggHashTable hashtable; - Size tabsize; + Agg *node = (Agg *) aggstate->ss.ps.plan; + MemoryContext tmpmem = aggstate->tmpcontext->ecxt_per_tuple_memory; + Size entrysize; Assert(node->aggstrategy == AGG_HASHED); Assert(node->numGroups > 0); - tabsize = sizeof(AggHashTableData) + - (node->numGroups - 1) * sizeof(AggHashEntry); - hashtable = (AggHashTable) MemoryContextAlloc(aggstate->aggcontext, - tabsize); - MemSet(hashtable, 0, tabsize); - hashtable->nbuckets = node->numGroups; - aggstate->hashtable = hashtable; + + entrysize = sizeof(AggHashEntryData) + + (aggstate->numaggs - 1) *sizeof(AggStatePerGroupData); + + aggstate->hashtable = BuildTupleHashTable(node->numCols, + node->grpColIdx, + aggstate->eqfunctions, + aggstate->hashfunctions, + node->numGroups, + entrysize, + aggstate->aggcontext, + tmpmem); } /* @@ -596,77 +598,21 @@ build_hash_table(Agg *node) * When called, CurrentMemoryContext should be the per-query context. */ static AggHashEntry -lookup_hash_entry(Agg *node, TupleTableSlot *slot) +lookup_hash_entry(AggState *aggstate, TupleTableSlot *slot) { - AggState *aggstate = node->aggstate; - AggHashTable hashtable = aggstate->hashtable; - MemoryContext tmpmem = aggstate->tmpcontext->ecxt_per_tuple_memory; - HeapTuple tuple = slot->val; - TupleDesc tupdesc = slot->ttc_tupleDescriptor; - uint32 hashkey = 0; - int i; - int bucketno; - AggHashEntry entry; - MemoryContext oldContext; - Size entrysize; + AggHashEntry entry; + bool isnew; - /* Need to run the hash function in short-lived context */ - oldContext = MemoryContextSwitchTo(tmpmem); + entry = (AggHashEntry) LookupTupleHashEntry(aggstate->hashtable, + slot, + &isnew); - for (i = 0; i < node->numCols; i++) + if (isnew) { - AttrNumber att = node->grpColIdx[i]; - Datum attr; - bool isNull; - - /* rotate hashkey left 1 bit at each step */ - hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 
1 : 0); - - attr = heap_getattr(tuple, att, tupdesc, &isNull); - if (isNull) - continue; /* treat nulls as having hash key 0 */ - hashkey ^= ComputeHashFunc(attr, - (int) tupdesc->attrs[att - 1]->attlen, - tupdesc->attrs[att - 1]->attbyval); - } - bucketno = hashkey % (uint32) hashtable->nbuckets; - - for (entry = hashtable->buckets[bucketno]; - entry != NULL; - entry = entry->next) - { - /* Quick check using hashkey */ - if (entry->hashkey != hashkey) - continue; - if (execTuplesMatch(entry->firstTuple, - tuple, - tupdesc, - node->numCols, node->grpColIdx, - aggstate->eqfunctions, - tmpmem)) - { - MemoryContextSwitchTo(oldContext); - return entry; - } + /* initialize aggregates for new tuple group */ + initialize_aggregates(aggstate, aggstate->peragg, entry->pergroup); } - /* Not there, so build a new one */ - MemoryContextSwitchTo(aggstate->aggcontext); - entrysize = sizeof(AggHashEntryData) + - (aggstate->numaggs - 1) * sizeof(AggStatePerGroupData); - entry = (AggHashEntry) palloc0(entrysize); - - entry->hashkey = hashkey; - entry->firstTuple = heap_copytuple(tuple); - - entry->next = hashtable->buckets[bucketno]; - hashtable->buckets[bucketno] = entry; - - MemoryContextSwitchTo(oldContext); - - /* initialize aggregates for new tuple group */ - initialize_aggregates(aggstate, aggstate->peragg, entry->pergroup); - return entry; } @@ -677,40 +623,36 @@ lookup_hash_entry(Agg *node, TupleTableSlot *slot) * the appropriate attribute for each aggregate function use (Aggref * node) appearing in the targetlist or qual of the node. The number * of tuples to aggregate over depends on whether grouped or plain - * aggregation is selected. In grouped aggregation, we produce a result + * aggregation is selected. In grouped aggregation, we produce a result * row for each group; in plain aggregation there's a single result row - * for the whole query. In either case, the value of each aggregate is + * for the whole query. In either case, the value of each aggregate is * stored in the expression context to be used when ExecProject evaluates * the result tuple. 
*/ TupleTableSlot * -ExecAgg(Agg *node) +ExecAgg(AggState *node) { - AggState *aggstate = node->aggstate; - - if (aggstate->agg_done) + if (node->agg_done) return NULL; - if (node->aggstrategy == AGG_HASHED) + if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED) { - if (!aggstate->table_filled) + if (!node->table_filled) agg_fill_hash_table(node); return agg_retrieve_hash_table(node); } else - { return agg_retrieve_direct(node); - } } /* * ExecAgg for non-hashed case */ static TupleTableSlot * -agg_retrieve_direct(Agg *node) +agg_retrieve_direct(AggState *aggstate) { - AggState *aggstate; - Plan *outerPlan; + Agg *node = (Agg *) aggstate->ss.ps.plan; + PlanState *outerPlan; ExprContext *econtext; ExprContext *tmpcontext; ProjectionInfo *projInfo; @@ -726,22 +668,21 @@ agg_retrieve_direct(Agg *node) /* * get state info from node */ - aggstate = node->aggstate; - outerPlan = outerPlan(node); + outerPlan = outerPlanState(aggstate); /* econtext is the per-output-tuple expression context */ - econtext = aggstate->csstate.cstate.cs_ExprContext; + econtext = aggstate->ss.ps.ps_ExprContext; aggvalues = econtext->ecxt_aggvalues; aggnulls = econtext->ecxt_aggnulls; /* tmpcontext is the per-input-tuple expression context */ tmpcontext = aggstate->tmpcontext; - projInfo = aggstate->csstate.cstate.cs_ProjInfo; + projInfo = aggstate->ss.ps.ps_ProjInfo; peragg = aggstate->peragg; pergroup = aggstate->pergroup; - firstSlot = aggstate->csstate.css_ScanTupleSlot; + firstSlot = aggstate->ss.ss_ScanTupleSlot; /* * We loop retrieving groups until we find one matching - * node->plan.qual + * aggstate->ss.ps.qual */ do { @@ -754,7 +695,7 @@ agg_retrieve_direct(Agg *node) */ if (aggstate->grp_firstTuple == NULL) { - outerslot = ExecProcNode(outerPlan, (Plan *) node); + outerslot = ExecProcNode(outerPlan); if (!TupIsNull(outerslot)) { /* @@ -794,7 +735,7 @@ agg_retrieve_direct(Agg *node) firstSlot, InvalidBuffer, true); - aggstate->grp_firstTuple = NULL; /* don't keep two pointers */ + aggstate->grp_firstTuple = NULL; /* don't keep two pointers */ /* set up for first advance_aggregates call */ tmpcontext->ecxt_scantuple = firstSlot; @@ -810,7 +751,7 @@ agg_retrieve_direct(Agg *node) /* Reset per-input-tuple context after each tuple */ ResetExprContext(tmpcontext); - outerslot = ExecProcNode(outerPlan, (Plan *) node); + outerslot = ExecProcNode(outerPlan); if (TupIsNull(outerslot)) { /* no more outer-plan tuples available */ @@ -831,7 +772,7 @@ agg_retrieve_direct(Agg *node) firstSlot->ttc_tupleDescriptor, node->numCols, node->grpColIdx, aggstate->eqfunctions, - tmpcontext->ecxt_per_tuple_memory)) + tmpcontext->ecxt_per_tuple_memory)) { /* * Save the first input tuple of the next group. @@ -864,15 +805,15 @@ agg_retrieve_direct(Agg *node) * anything), create a dummy all-nulls input tuple for use by * ExecProject. 99.44% of the time this is a waste of cycles, * because ordinarily the projected output tuple's targetlist - * cannot contain any direct (non-aggregated) references to - * input columns, so the dummy tuple will not be referenced. - * However there are special cases where this isn't so --- in - * particular an UPDATE involving an aggregate will have a - * targetlist reference to ctid. We need to return a null for - * ctid in that situation, not coredump. + * cannot contain any direct (non-aggregated) references to input + * columns, so the dummy tuple will not be referenced. 
However + * there are special cases where this isn't so --- in particular + * an UPDATE involving an aggregate will have a targetlist + * reference to ctid. We need to return a null for ctid in that + * situation, not coredump. * - * The values returned for the aggregates will be the initial - * values of the transition functions. + * The values returned for the aggregates will be the initial values + * of the transition functions. */ if (TupIsNull(firstSlot)) { @@ -917,7 +858,7 @@ agg_retrieve_direct(Agg *node) * Otherwise, return the tuple. */ } - while (!ExecQual(node->plan.qual, econtext, false)); + while (!ExecQual(aggstate->ss.ps.qual, econtext, false)); return resultSlot; } @@ -926,36 +867,34 @@ agg_retrieve_direct(Agg *node) * ExecAgg for hashed case: phase 1, read input and build hash table */ static void -agg_fill_hash_table(Agg *node) +agg_fill_hash_table(AggState *aggstate) { - AggState *aggstate; - Plan *outerPlan; + PlanState *outerPlan; ExprContext *tmpcontext; - AggHashEntry entry; + AggHashEntry entry; TupleTableSlot *outerslot; /* * get state info from node */ - aggstate = node->aggstate; - outerPlan = outerPlan(node); + outerPlan = outerPlanState(aggstate); /* tmpcontext is the per-input-tuple expression context */ tmpcontext = aggstate->tmpcontext; /* - * Process each outer-plan tuple, and then fetch the next one, - * until we exhaust the outer plan. + * Process each outer-plan tuple, and then fetch the next one, until + * we exhaust the outer plan. */ for (;;) { - outerslot = ExecProcNode(outerPlan, (Plan *) node); + outerslot = ExecProcNode(outerPlan); if (TupIsNull(outerslot)) break; /* set up for advance_aggregates call */ tmpcontext->ecxt_scantuple = outerslot; /* Find or build hashtable entry for this tuple's group */ - entry = lookup_hash_entry(node, outerslot); + entry = lookup_hash_entry(aggstate, outerslot); /* Advance the aggregates */ advance_aggregates(aggstate, entry->pergroup); @@ -966,25 +905,22 @@ agg_fill_hash_table(Agg *node) aggstate->table_filled = true; /* Initialize to walk the hash table */ - aggstate->next_hash_entry = NULL; - aggstate->next_hash_bucket = 0; + ResetTupleHashIterator(aggstate->hashtable, &aggstate->hashiter); } /* * ExecAgg for hashed case: phase 2, retrieving groups from hash table */ static TupleTableSlot * -agg_retrieve_hash_table(Agg *node) +agg_retrieve_hash_table(AggState *aggstate) { - AggState *aggstate; ExprContext *econtext; ProjectionInfo *projInfo; Datum *aggvalues; bool *aggnulls; AggStatePerAgg peragg; AggStatePerGroup pergroup; - AggHashTable hashtable; - AggHashEntry entry; + AggHashEntry entry; TupleTableSlot *firstSlot; TupleTableSlot *resultSlot; int aggno; @@ -992,19 +928,17 @@ agg_retrieve_hash_table(Agg *node) /* * get state info from node */ - aggstate = node->aggstate; /* econtext is the per-output-tuple expression context */ - econtext = aggstate->csstate.cstate.cs_ExprContext; + econtext = aggstate->ss.ps.ps_ExprContext; aggvalues = econtext->ecxt_aggvalues; aggnulls = econtext->ecxt_aggnulls; - projInfo = aggstate->csstate.cstate.cs_ProjInfo; + projInfo = aggstate->ss.ps.ps_ProjInfo; peragg = aggstate->peragg; - hashtable = aggstate->hashtable; - firstSlot = aggstate->csstate.css_ScanTupleSlot; + firstSlot = aggstate->ss.ss_ScanTupleSlot; /* - * We loop retrieving groups until we find one matching - * node->plan.qual + * We loop retrieving groups until we find one satisfying + * aggstate->ss.ps.qual */ do { @@ -1014,18 +948,13 @@ agg_retrieve_hash_table(Agg *node) /* * Find the next entry in the hash 
table */ - entry = aggstate->next_hash_entry; - while (entry == NULL) + entry = (AggHashEntry) ScanTupleHashTable(&aggstate->hashiter); + if (entry == NULL) { - if (aggstate->next_hash_bucket >= hashtable->nbuckets) - { - /* No more entries in hashtable, so done */ - aggstate->agg_done = TRUE; - return NULL; - } - entry = hashtable->buckets[aggstate->next_hash_bucket++]; + /* No more entries in hashtable, so done */ + aggstate->agg_done = TRUE; + return NULL; } - aggstate->next_hash_entry = entry->next; /* * Clear the per-output-tuple context for each group @@ -1036,7 +965,7 @@ agg_retrieve_hash_table(Agg *node) * Store the copied first input tuple in the tuple table slot * reserved for it, so that it can be used in ExecProject. */ - ExecStoreTuple(entry->firstTuple, + ExecStoreTuple(entry->shared.firstTuple, firstSlot, InvalidBuffer, false); @@ -1071,7 +1000,7 @@ agg_retrieve_hash_table(Agg *node) * Otherwise, return the tuple. */ } - while (!ExecQual(node->plan.qual, econtext, false)); + while (!ExecQual(aggstate->ss.ps.qual, econtext, false)); return resultSlot; } @@ -1083,8 +1012,8 @@ agg_retrieve_hash_table(Agg *node) * planner and initializes its outer subtree * ----------------- */ -bool -ExecInitAgg(Agg *node, EState *estate, Plan *parent) +AggState * +ExecInitAgg(Agg *node, EState *estate) { AggState *aggstate; AggStatePerAgg peragg; @@ -1094,17 +1023,17 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent) aggno; List *alist; - /* - * assign the node's execution state - */ - node->plan.state = estate; - /* * create state structure */ aggstate = makeNode(AggState); - node->aggstate = aggstate; + aggstate->ss.ps.plan = (Plan *) node; + aggstate->ss.ps.state = estate; + + aggstate->aggs = NIL; + aggstate->numaggs = 0; aggstate->eqfunctions = NULL; + aggstate->hashfunctions = NULL; aggstate->peragg = NULL; aggstate->agg_done = false; aggstate->pergroup = NULL; @@ -1112,44 +1041,20 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent) aggstate->hashtable = NULL; /* - * find aggregates in targetlist and quals - * - * Note: pull_agg_clauses also checks that no aggs contain other agg - * calls in their arguments. This would make no sense under SQL - * semantics anyway (and it's forbidden by the spec). Because that is - * true, we don't need to worry about evaluating the aggs in any - * particular order. - */ - aggstate->aggs = nconc(pull_agg_clause((Node *) node->plan.targetlist), - pull_agg_clause((Node *) node->plan.qual)); - aggstate->numaggs = numaggs = length(aggstate->aggs); - if (numaggs <= 0) - { - /* - * This is not an error condition: we might be using the Agg node just - * to do hash-based grouping. Even in the regular case, - * constant-expression simplification could optimize away all of the - * Aggrefs in the targetlist and qual. So keep going, but force local - * copy of numaggs positive so that palloc()s below don't choke. - */ - numaggs = 1; - } - - /* - * Create expression contexts. We need two, one for per-input-tuple - * processing and one for per-output-tuple processing. We cheat a little - * by using ExecAssignExprContext() to build both. + * Create expression contexts. We need two, one for per-input-tuple + * processing and one for per-output-tuple processing. We cheat a + * little by using ExecAssignExprContext() to build both. 
*/ - ExecAssignExprContext(estate, &aggstate->csstate.cstate); - aggstate->tmpcontext = aggstate->csstate.cstate.cs_ExprContext; - ExecAssignExprContext(estate, &aggstate->csstate.cstate); + ExecAssignExprContext(estate, &aggstate->ss.ps); + aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext; + ExecAssignExprContext(estate, &aggstate->ss.ps); /* - * We also need a long-lived memory context for holding hashtable - * data structures and transition values. NOTE: the details of what - * is stored in aggcontext and what is stored in the regular per-query - * memory context are driven by a simple decision: we want to reset the - * aggcontext in ExecReScanAgg to recover no-longer-wanted space. + * We also need a long-lived memory context for holding hashtable data + * structures and transition values. NOTE: the details of what is + * stored in aggcontext and what is stored in the regular per-query + * memory context are driven by a simple decision: we want to reset + * the aggcontext in ExecReScanAgg to recover no-longer-wanted space. */ aggstate->aggcontext = AllocSetContextCreate(CurrentMemoryContext, @@ -1163,88 +1068,169 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent) /* * tuple table initialization */ - ExecInitScanTupleSlot(estate, &aggstate->csstate); - ExecInitResultTupleSlot(estate, &aggstate->csstate.cstate); + ExecInitScanTupleSlot(estate, &aggstate->ss); + ExecInitResultTupleSlot(estate, &aggstate->ss.ps); /* - * Set up aggregate-result storage in the output expr context, and also - * allocate my private per-agg working storage + * initialize child expressions + * + * Note: ExecInitExpr finds Aggrefs for us, and also checks that no aggs + * contain other agg calls in their arguments. This would make no + * sense under SQL semantics anyway (and it's forbidden by the spec). + * Because that is true, we don't need to worry about evaluating the + * aggs in any particular order. */ - econtext = aggstate->csstate.cstate.cs_ExprContext; - econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs); - econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs); - - peragg = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs); - aggstate->peragg = peragg; - - if (node->aggstrategy == AGG_HASHED) - { - build_hash_table(node); - aggstate->table_filled = false; - } - else - { - AggStatePerGroup pergroup; - - pergroup = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) * numaggs); - aggstate->pergroup = pergroup; - } + aggstate->ss.ps.targetlist = (List *) + ExecInitExpr((Expr *) node->plan.targetlist, + (PlanState *) aggstate); + aggstate->ss.ps.qual = (List *) + ExecInitExpr((Expr *) node->plan.qual, + (PlanState *) aggstate); /* * initialize child nodes */ outerPlan = outerPlan(node); - ExecInitNode(outerPlan, estate, (Plan *) node); + outerPlanState(aggstate) = ExecInitNode(outerPlan, estate); /* * initialize source tuple type. */ - ExecAssignScanTypeFromOuterPlan((Plan *) node, &aggstate->csstate); + ExecAssignScanTypeFromOuterPlan(&aggstate->ss); /* * Initialize result tuple type and projection info. 
*/ - ExecAssignResultTypeFromTL((Plan *) node, &aggstate->csstate.cstate); - ExecAssignProjectionInfo((Plan *) node, &aggstate->csstate.cstate); + ExecAssignResultTypeFromTL(&aggstate->ss.ps); + ExecAssignProjectionInfo(&aggstate->ss.ps); + + /* + * get the count of aggregates in targetlist and quals + */ + numaggs = aggstate->numaggs; + Assert(numaggs == length(aggstate->aggs)); + if (numaggs <= 0) + { + /* + * This is not an error condition: we might be using the Agg node + * just to do hash-based grouping. Even in the regular case, + * constant-expression simplification could optimize away all of + * the Aggrefs in the targetlist and qual. So keep going, but + * force local copy of numaggs positive so that palloc()s below + * don't choke. + */ + numaggs = 1; + } /* - * If we are grouping, precompute fmgr lookup data for inner loop + * If we are grouping, precompute fmgr lookup data for inner loop. We + * need both equality and hashing functions to do it by hashing, but + * only equality if not hashing. */ if (node->numCols > 0) { - aggstate->eqfunctions = - execTuplesMatchPrepare(ExecGetScanType(&aggstate->csstate), - node->numCols, - node->grpColIdx); + if (node->aggstrategy == AGG_HASHED) + execTuplesHashPrepare(ExecGetScanType(&aggstate->ss), + node->numCols, + node->grpColIdx, + &aggstate->eqfunctions, + &aggstate->hashfunctions); + else + aggstate->eqfunctions = + execTuplesMatchPrepare(ExecGetScanType(&aggstate->ss), + node->numCols, + node->grpColIdx); + } + + /* + * Set up aggregate-result storage in the output expr context, and + * also allocate my private per-agg working storage + */ + econtext = aggstate->ss.ps.ps_ExprContext; + econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs); + econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs); + + peragg = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs); + aggstate->peragg = peragg; + + if (node->aggstrategy == AGG_HASHED) + { + build_hash_table(aggstate); + aggstate->table_filled = false; + } + else + { + AggStatePerGroup pergroup; + + pergroup = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) * numaggs); + aggstate->pergroup = pergroup; } /* * Perform lookups of aggregate function info, and initialize the - * unchanging fields of the per-agg data + * unchanging fields of the per-agg data. We also detect duplicate + * aggregates (for example, "SELECT sum(x) ... HAVING sum(x) > 0"). + * When duplicates are detected, we only make an AggStatePerAgg struct + * for the first one. The clones are simply pointed at the same + * result entry by giving them duplicate aggno values. 
*/ aggno = -1; foreach(alist, aggstate->aggs) { - Aggref *aggref = (Aggref *) lfirst(alist); - AggStatePerAgg peraggstate = &peragg[++aggno]; + AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(alist); + Aggref *aggref = (Aggref *) aggrefstate->xprstate.expr; + AggStatePerAgg peraggstate; + Oid inputType; HeapTuple aggTuple; Form_pg_aggregate aggform; + Oid aggtranstype; AclResult aclresult; Oid transfn_oid, finalfn_oid; + Expr *transfnexpr, + *finalfnexpr; Datum textInitVal; + int i; - /* Mark Aggref node with its associated index in the result array */ - aggref->aggno = aggno; + /* Planner should have assigned aggregate to correct level */ + Assert(aggref->agglevelsup == 0); + + /* Look for a previous duplicate aggregate */ + for (i = 0; i <= aggno; i++) + { + if (equal(aggref, peragg[i].aggref) && + !contain_volatile_functions((Node *) aggref)) + break; + } + if (i <= aggno) + { + /* Found a match to an existing entry, so just mark it */ + aggrefstate->aggno = i; + continue; + } + + /* Nope, so assign a new PerAgg record */ + peraggstate = &peragg[++aggno]; + + /* Mark Aggref state node with assigned index in the result array */ + aggrefstate->aggno = aggno; /* Fill in the peraggstate data */ + peraggstate->aggrefstate = aggrefstate; peraggstate->aggref = aggref; + /* + * Get actual datatype of the input. We need this because it may + * be different from the agg's declared input type, when the agg + * accepts ANY (eg, COUNT(*)) or ANYARRAY or ANYELEMENT. + */ + inputType = exprType((Node *) aggref->target); + aggTuple = SearchSysCache(AGGFNOID, ObjectIdGetDatum(aggref->aggfnoid), 0, 0, 0); if (!HeapTupleIsValid(aggTuple)) - elog(ERROR, "ExecAgg: cache lookup failed for aggregate %u", + elog(ERROR, "cache lookup failed for aggregate %u", aggref->aggfnoid); aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); @@ -1252,12 +1238,50 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent) aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(), ACL_EXECUTE); if (aclresult != ACLCHECK_OK) - aclcheck_error(aclresult, get_func_name(aggref->aggfnoid)); + aclcheck_error(aclresult, ACL_KIND_PROC, + get_func_name(aggref->aggfnoid)); + + peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; + peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; + + /* resolve actual type of transition state, if polymorphic */ + aggtranstype = aggform->aggtranstype; + if (aggtranstype == ANYARRAYOID || aggtranstype == ANYELEMENTOID) + { + /* have to fetch the agg's declared input type... 
*/ + Oid agg_arg_types[FUNC_MAX_ARGS]; + int agg_nargs; + + (void) get_func_signature(aggref->aggfnoid, + agg_arg_types, &agg_nargs); + Assert(agg_nargs == 1); + aggtranstype = resolve_generic_type(aggtranstype, + inputType, + agg_arg_types[0]); + } + + /* build expression trees using actual argument & result types */ + build_aggregate_fnexprs(inputType, + aggtranstype, + aggref->aggtype, + transfn_oid, + finalfn_oid, + &transfnexpr, + &finalfnexpr); + + fmgr_info(transfn_oid, &peraggstate->transfn); + peraggstate->transfn.fn_expr = (Node *) transfnexpr; + + if (OidIsValid(finalfn_oid)) + { + fmgr_info(finalfn_oid, &peraggstate->finalfn); + peraggstate->finalfn.fn_expr = (Node *) finalfnexpr; + } get_typlenbyval(aggref->aggtype, &peraggstate->resulttypeLen, &peraggstate->resulttypeByVal); - get_typlenbyval(aggform->aggtranstype, + get_typlenbyval(aggtranstype, &peraggstate->transtypeLen, &peraggstate->transtypeByVal); @@ -1273,14 +1297,7 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent) peraggstate->initValue = (Datum) 0; else peraggstate->initValue = GetAggInitVal(textInitVal, - aggform->aggtranstype); - - peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; - peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; - - fmgr_info(transfn_oid, &peraggstate->transfn); - if (OidIsValid(finalfn_oid)) - fmgr_info(finalfn_oid, &peraggstate->finalfn); + aggtranstype); /* * If the transfn is strict and the initval is NULL, make sure @@ -1291,26 +1308,15 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent) */ if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull) { - /* - * Note: use the type from the input expression here, not from - * pg_proc.proargtypes, because the latter might be 0. - * (Consider COUNT(*).) - */ - Oid inputType = exprType(aggref->target); - - if (!IsBinaryCoercible(inputType, aggform->aggtranstype)) - elog(ERROR, "Aggregate %u needs to have compatible input type and transition type", - aggref->aggfnoid); + if (!IsBinaryCoercible(inputType, aggtranstype)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate %u needs to have compatible input type and transition type", + aggref->aggfnoid))); } if (aggref->aggdistinct) { - /* - * Note: use the type from the input expression here, not from - * pg_proc.proargtypes, because the latter might be a pseudotype. - * (Consider COUNT(*).) 
- */ - Oid inputType = exprType(aggref->target); Oid eq_function; /* We don't implement DISTINCT aggs in the HASHED case */ @@ -1321,21 +1327,19 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent) &peraggstate->inputtypeLen, &peraggstate->inputtypeByVal); - eq_function = compatible_oper_funcid(makeList1(makeString("=")), - inputType, inputType, - true); - if (!OidIsValid(eq_function)) - elog(ERROR, "Unable to identify an equality operator for type %s", - format_type_be(inputType)); + eq_function = equality_oper_funcid(inputType); fmgr_info(eq_function, &(peraggstate->equalfn)); - peraggstate->sortOperator = any_ordering_op(inputType); + peraggstate->sortOperator = ordering_oper_opid(inputType); peraggstate->sortstate = NULL; } ReleaseSysCache(aggTuple); } - return TRUE; + /* Update numaggs to match number of unique aggregates found */ + aggstate->numaggs = aggno + 1; + + return aggstate; } static Datum @@ -1353,7 +1357,7 @@ GetAggInitVal(Datum textInitVal, Oid transtype) ObjectIdGetDatum(transtype), 0, 0, 0); if (!HeapTupleIsValid(tup)) - elog(ERROR, "GetAggInitVal: cache lookup failed on aggregate transition function return type %u", transtype); + elog(ERROR, "cache lookup failed for type %u", transtype); typinput = ((Form_pg_type) GETSTRUCT(tup))->typinput; typelem = ((Form_pg_type) GETSTRUCT(tup))->typelem; @@ -1377,84 +1381,104 @@ ExecCountSlotsAgg(Agg *node) } void -ExecEndAgg(Agg *node) +ExecEndAgg(AggState *node) { - AggState *aggstate = node->aggstate; - Plan *outerPlan; + PlanState *outerPlan; int aggno; /* Make sure we have closed any open tuplesorts */ - for (aggno = 0; aggno < aggstate->numaggs; aggno++) + for (aggno = 0; aggno < node->numaggs; aggno++) { - AggStatePerAgg peraggstate = &aggstate->peragg[aggno]; + AggStatePerAgg peraggstate = &node->peragg[aggno]; if (peraggstate->sortstate) tuplesort_end(peraggstate->sortstate); } - ExecFreeProjectionInfo(&aggstate->csstate.cstate); - /* * Free both the expr contexts. */ - ExecFreeExprContext(&aggstate->csstate.cstate); - aggstate->csstate.cstate.cs_ExprContext = aggstate->tmpcontext; - ExecFreeExprContext(&aggstate->csstate.cstate); + ExecFreeExprContext(&node->ss.ps); + node->ss.ps.ps_ExprContext = node->tmpcontext; + ExecFreeExprContext(&node->ss.ps); - MemoryContextDelete(aggstate->aggcontext); + /* clean up tuple table */ + ExecClearTuple(node->ss.ss_ScanTupleSlot); - outerPlan = outerPlan(node); - ExecEndNode(outerPlan, (Plan *) node); + MemoryContextDelete(node->aggcontext); - /* clean up tuple table */ - ExecClearTuple(aggstate->csstate.css_ScanTupleSlot); - if (aggstate->grp_firstTuple != NULL) - { - heap_freetuple(aggstate->grp_firstTuple); - aggstate->grp_firstTuple = NULL; - } + outerPlan = outerPlanState(node); + ExecEndNode(outerPlan); } void -ExecReScanAgg(Agg *node, ExprContext *exprCtxt, Plan *parent) +ExecReScanAgg(AggState *node, ExprContext *exprCtxt) { - AggState *aggstate = node->aggstate; - ExprContext *econtext = aggstate->csstate.cstate.cs_ExprContext; + ExprContext *econtext = node->ss.ps.ps_ExprContext; int aggno; + node->agg_done = false; + + if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED) + { + /* + * In the hashed case, if we haven't yet built the hash table then + * we can just return; nothing done yet, so nothing to undo. If + * subnode's chgParam is not NULL then it will be re-scanned by + * ExecProcNode, else no reason to re-scan it at all. 
+ */ + if (!node->table_filled) + return; + + /* + * If we do have the hash table and the subplan does not have any + * parameter changes, then we can just rescan the existing hash + * table; no need to build it again. + */ + if (((PlanState *) node)->lefttree->chgParam == NULL) + { + ResetTupleHashIterator(node->hashtable, &node->hashiter); + return; + } + } + /* Make sure we have closed any open tuplesorts */ - for (aggno = 0; aggno < aggstate->numaggs; aggno++) + for (aggno = 0; aggno < node->numaggs; aggno++) { - AggStatePerAgg peraggstate = &aggstate->peragg[aggno]; + AggStatePerAgg peraggstate = &node->peragg[aggno]; if (peraggstate->sortstate) tuplesort_end(peraggstate->sortstate); peraggstate->sortstate = NULL; } - aggstate->agg_done = false; - if (aggstate->grp_firstTuple != NULL) + /* Release first tuple of group, if we have made a copy */ + if (node->grp_firstTuple != NULL) { - heap_freetuple(aggstate->grp_firstTuple); - aggstate->grp_firstTuple = NULL; + heap_freetuple(node->grp_firstTuple); + node->grp_firstTuple = NULL; } - MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * aggstate->numaggs); - MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * aggstate->numaggs); - MemoryContextReset(aggstate->aggcontext); + /* Forget current agg values */ + MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs); + MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs); - if (node->aggstrategy == AGG_HASHED) + /* Release all temp storage */ + MemoryContextReset(node->aggcontext); + + if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED) { + /* Rebuild an empty hash table */ build_hash_table(node); - aggstate->table_filled = false; + node->table_filled = false; } /* * if chgParam of subnode is not null then plan will be re-scanned by * first ExecProcNode. */ - if (((Plan *) node)->lefttree->chgParam == NULL) - ExecReScan(((Plan *) node)->lefttree, exprCtxt, (Plan *) node); + if (((PlanState *) node)->lefttree->chgParam == NULL) + ExecReScan(((PlanState *) node)->lefttree, exprCtxt); } /* @@ -1470,7 +1494,7 @@ ExecReScanAgg(Agg *node, ExprContext *exprCtxt, Plan *parent) Datum aggregate_dummy(PG_FUNCTION_ARGS) { - elog(ERROR, "Aggregate function %u called as normal function", + elog(ERROR, "aggregate function %u called as normal function", fcinfo->flinfo->fn_oid); return (Datum) 0; /* keep compiler quiet */ }