* this context can be reset once per output tuple.
*
*
- * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/executor/nodeAgg.c,v 1.96 2002/11/19 23:21:57 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/executor/nodeAgg.c,v 1.118 2004/02/03 17:34:02 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "catalog/pg_operator.h"
#include "executor/executor.h"
#include "executor/nodeAgg.h"
-#include "executor/nodeGroup.h"
-#include "executor/nodeHash.h"
#include "miscadmin.h"
#include "optimizer/clauses.h"
+#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
#include "parser/parse_expr.h"
#include "parser/parse_oper.h"
* thereafter:
*/
- /* Link to Aggref node this working state is for */
+ /* Links to Aggref expr and state nodes this working state is for */
+ AggrefExprState *aggrefstate;
Aggref *aggref;
/* Oids of transfer functions */
/*
* To implement hashed aggregation, we need a hashtable that stores a
* representative tuple and an array of AggStatePerGroup structs for each
- * distinct set of GROUP BY column values. We compute the hash key from
+ * distinct set of GROUP BY column values. We compute the hash key from
* the GROUP BY columns.
*/
+typedef struct AggHashEntryData *AggHashEntry;
+
typedef struct AggHashEntryData
{
- AggHashEntry next; /* next entry in same hash bucket */
- uint32 hashkey; /* exact hash key of this entry */
- HeapTuple firstTuple; /* copy of first tuple in this group */
+ TupleHashEntryData shared; /* common header for hash table entries */
/* per-aggregate transition status array - must be last! */
AggStatePerGroupData pergroup[1]; /* VARIABLE LENGTH ARRAY */
} AggHashEntryData; /* VARIABLE LENGTH STRUCT */
-typedef struct AggHashTableData
-{
- int nbuckets; /* number of buckets in hash table */
- AggHashEntry buckets[1]; /* VARIABLE LENGTH ARRAY */
-} AggHashTableData; /* VARIABLE LENGTH STRUCT */
-
static void initialize_aggregates(AggState *aggstate,
- AggStatePerAgg peragg,
- AggStatePerGroup pergroup);
+ AggStatePerAgg peragg,
+ AggStatePerGroup pergroup);
static void advance_transition_function(AggState *aggstate,
- AggStatePerAgg peraggstate,
- AggStatePerGroup pergroupstate,
- Datum newVal, bool isNull);
+ AggStatePerAgg peraggstate,
+ AggStatePerGroup pergroupstate,
+ Datum newVal, bool isNull);
static void advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup);
static void process_sorted_aggregate(AggState *aggstate,
- AggStatePerAgg peraggstate,
- AggStatePerGroup pergroupstate);
+ AggStatePerAgg peraggstate,
+ AggStatePerGroup pergroupstate);
static void finalize_aggregate(AggState *aggstate,
- AggStatePerAgg peraggstate,
- AggStatePerGroup pergroupstate,
- Datum *resultVal, bool *resultIsNull);
-static void build_hash_table(Agg *node);
-static AggHashEntry lookup_hash_entry(Agg *node, TupleTableSlot *slot);
-static TupleTableSlot *agg_retrieve_direct(Agg *node);
-static void agg_fill_hash_table(Agg *node);
-static TupleTableSlot *agg_retrieve_hash_table(Agg *node);
+ AggStatePerAgg peraggstate,
+ AggStatePerGroup pergroupstate,
+ Datum *resultVal, bool *resultIsNull);
+static void build_hash_table(AggState *aggstate);
+static AggHashEntry lookup_hash_entry(AggState *aggstate,
+ TupleTableSlot *slot);
+static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
+static void agg_fill_hash_table(AggState *aggstate);
+static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
peraggstate->sortstate =
tuplesort_begin_datum(peraggstate->inputType,
peraggstate->sortOperator,
- false);
+ work_mem, false);
}
/*
oldContext = MemoryContextSwitchTo(aggstate->aggcontext);
pergroupstate->transValue = datumCopy(peraggstate->initValue,
- peraggstate->transtypeByVal,
- peraggstate->transtypeLen);
+ peraggstate->transtypeByVal,
+ peraggstate->transtypeLen);
MemoryContextSwitchTo(oldContext);
}
pergroupstate->transValueIsNull = peraggstate->initValueIsNull;
/*
- * If the initial value for the transition state doesn't exist in the
- * pg_aggregate table then we will let the first non-NULL value
- * returned from the outer procNode become the initial value. (This is
- * useful for aggregates like max() and min().) The noTransValue flag
- * signals that we still need to do this.
+ * If the initial value for the transition state doesn't exist in
+ * the pg_aggregate table then we will let the first non-NULL
+ * value returned from the outer procNode become the initial
+ * value. (This is useful for aggregates like max() and min().)
+ * The noTransValue flag signals that we still need to do this.
*/
pergroupstate->noTransValue = peraggstate->initValueIsNull;
}
if (peraggstate->transfn.fn_strict)
{
/*
- * For a strict transfn, nothing happens at a NULL input
- * tuple; we just keep the prior transValue.
+ * For a strict transfn, nothing happens at a NULL input tuple; we
+ * just keep the prior transValue.
*/
if (isNull)
return;
* here is OK.)
*
* We must copy the datum into aggcontext if it is pass-by-ref.
- * We do not need to pfree the old transValue, since it's NULL.
+ * We do not need to pfree the old transValue, since it's
+ * NULL.
*/
oldContext = MemoryContextSwitchTo(aggstate->aggcontext);
pergroupstate->transValue = datumCopy(newVal,
- peraggstate->transtypeByVal,
- peraggstate->transtypeLen);
+ peraggstate->transtypeByVal,
+ peraggstate->transtypeLen);
pergroupstate->transValueIsNull = false;
pergroupstate->noTransValue = false;
MemoryContextSwitchTo(oldContext);
newVal = FunctionCallInvoke(&fcinfo);
/*
- * If pass-by-ref datatype, must copy the new value into aggcontext and
- * pfree the prior transValue. But if transfn returned a pointer to its
- * first input, we don't need to do anything.
+ * If pass-by-ref datatype, must copy the new value into aggcontext
+ * and pfree the prior transValue. But if transfn returned a pointer
+ * to its first input, we don't need to do anything.
*/
if (!peraggstate->transtypeByVal &&
- DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
+ DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
{
if (!fcinfo.isnull)
{
}
/*
- * Advance all the aggregates for one input tuple. The input tuple
+ * Advance all the aggregates for one input tuple. The input tuple
* has been stored in tmpcontext->ecxt_scantuple, so that it is accessible
* to ExecEvalExpr. pergroup is the array of per-group structs to use
* (this might be in a hashtable entry).
{
AggStatePerAgg peraggstate = &aggstate->peragg[aggno];
AggStatePerGroup pergroupstate = &pergroup[aggno];
+ AggrefExprState *aggrefstate = peraggstate->aggrefstate;
Aggref *aggref = peraggstate->aggref;
Datum newVal;
bool isNull;
- newVal = ExecEvalExprSwitchContext(aggref->target, econtext,
+ newVal = ExecEvalExprSwitchContext(aggrefstate->target, econtext,
&isNull, NULL);
if (aggref->aggdistinct)
continue;
/*
- * Clear and select the working context for evaluation of
- * the equality function and transition function.
+ * Clear and select the working context for evaluation of the
+ * equality function and transition function.
*/
MemoryContextReset(workcontext);
oldContext = MemoryContextSwitchTo(workcontext);
{
MemoryContext oldContext;
- oldContext = MemoryContextSwitchTo(aggstate->csstate.cstate.cs_ExprContext->ecxt_per_tuple_memory);
+ oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
/*
* Apply the agg's finalfn if one is provided, else return transValue.
* The hash table always lives in the aggcontext memory context.
*/
static void
-build_hash_table(Agg *node)
+build_hash_table(AggState *aggstate)
{
- AggState *aggstate = node->aggstate;
- AggHashTable hashtable;
- Size tabsize;
+ Agg *node = (Agg *) aggstate->ss.ps.plan;
+ MemoryContext tmpmem = aggstate->tmpcontext->ecxt_per_tuple_memory;
+ Size entrysize;
Assert(node->aggstrategy == AGG_HASHED);
Assert(node->numGroups > 0);
- tabsize = sizeof(AggHashTableData) +
- (node->numGroups - 1) * sizeof(AggHashEntry);
- hashtable = (AggHashTable) MemoryContextAlloc(aggstate->aggcontext,
- tabsize);
- MemSet(hashtable, 0, tabsize);
- hashtable->nbuckets = node->numGroups;
- aggstate->hashtable = hashtable;
+
+ entrysize = sizeof(AggHashEntryData) +
+ (aggstate->numaggs - 1) *sizeof(AggStatePerGroupData);
+
+ aggstate->hashtable = BuildTupleHashTable(node->numCols,
+ node->grpColIdx,
+ aggstate->eqfunctions,
+ aggstate->hashfunctions,
+ node->numGroups,
+ entrysize,
+ aggstate->aggcontext,
+ tmpmem);
}
/*
* When called, CurrentMemoryContext should be the per-query context.
*/
static AggHashEntry
-lookup_hash_entry(Agg *node, TupleTableSlot *slot)
+lookup_hash_entry(AggState *aggstate, TupleTableSlot *slot)
{
- AggState *aggstate = node->aggstate;
- AggHashTable hashtable = aggstate->hashtable;
- MemoryContext tmpmem = aggstate->tmpcontext->ecxt_per_tuple_memory;
- HeapTuple tuple = slot->val;
- TupleDesc tupdesc = slot->ttc_tupleDescriptor;
- uint32 hashkey = 0;
- int i;
- int bucketno;
- AggHashEntry entry;
- MemoryContext oldContext;
- Size entrysize;
+ AggHashEntry entry;
+ bool isnew;
- /* Need to run the hash function in short-lived context */
- oldContext = MemoryContextSwitchTo(tmpmem);
+ entry = (AggHashEntry) LookupTupleHashEntry(aggstate->hashtable,
+ slot,
+ &isnew);
- for (i = 0; i < node->numCols; i++)
+ if (isnew)
{
- AttrNumber att = node->grpColIdx[i];
- Datum attr;
- bool isNull;
-
- /* rotate hashkey left 1 bit at each step */
- hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);
-
- attr = heap_getattr(tuple, att, tupdesc, &isNull);
- if (isNull)
- continue; /* treat nulls as having hash key 0 */
- hashkey ^= ComputeHashFunc(attr,
- (int) tupdesc->attrs[att - 1]->attlen,
- tupdesc->attrs[att - 1]->attbyval);
- }
- bucketno = hashkey % (uint32) hashtable->nbuckets;
-
- for (entry = hashtable->buckets[bucketno];
- entry != NULL;
- entry = entry->next)
- {
- /* Quick check using hashkey */
- if (entry->hashkey != hashkey)
- continue;
- if (execTuplesMatch(entry->firstTuple,
- tuple,
- tupdesc,
- node->numCols, node->grpColIdx,
- aggstate->eqfunctions,
- tmpmem))
- {
- MemoryContextSwitchTo(oldContext);
- return entry;
- }
+ /* initialize aggregates for new tuple group */
+ initialize_aggregates(aggstate, aggstate->peragg, entry->pergroup);
}
- /* Not there, so build a new one */
- MemoryContextSwitchTo(aggstate->aggcontext);
- entrysize = sizeof(AggHashEntryData) +
- (aggstate->numaggs - 1) * sizeof(AggStatePerGroupData);
- entry = (AggHashEntry) palloc0(entrysize);
-
- entry->hashkey = hashkey;
- entry->firstTuple = heap_copytuple(tuple);
-
- entry->next = hashtable->buckets[bucketno];
- hashtable->buckets[bucketno] = entry;
-
- MemoryContextSwitchTo(oldContext);
-
- /* initialize aggregates for new tuple group */
- initialize_aggregates(aggstate, aggstate->peragg, entry->pergroup);
-
return entry;
}
* the appropriate attribute for each aggregate function use (Aggref
* node) appearing in the targetlist or qual of the node. The number
* of tuples to aggregate over depends on whether grouped or plain
- * aggregation is selected. In grouped aggregation, we produce a result
+ * aggregation is selected. In grouped aggregation, we produce a result
* row for each group; in plain aggregation there's a single result row
- * for the whole query. In either case, the value of each aggregate is
+ * for the whole query. In either case, the value of each aggregate is
* stored in the expression context to be used when ExecProject evaluates
* the result tuple.
*/
TupleTableSlot *
-ExecAgg(Agg *node)
+ExecAgg(AggState *node)
{
- AggState *aggstate = node->aggstate;
-
- if (aggstate->agg_done)
+ if (node->agg_done)
return NULL;
- if (node->aggstrategy == AGG_HASHED)
+ if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED)
{
- if (!aggstate->table_filled)
+ if (!node->table_filled)
agg_fill_hash_table(node);
return agg_retrieve_hash_table(node);
}
else
- {
return agg_retrieve_direct(node);
- }
}
/*
* ExecAgg for non-hashed case
*/
static TupleTableSlot *
-agg_retrieve_direct(Agg *node)
+agg_retrieve_direct(AggState *aggstate)
{
- AggState *aggstate;
- Plan *outerPlan;
+ Agg *node = (Agg *) aggstate->ss.ps.plan;
+ PlanState *outerPlan;
ExprContext *econtext;
ExprContext *tmpcontext;
ProjectionInfo *projInfo;
/*
* get state info from node
*/
- aggstate = node->aggstate;
- outerPlan = outerPlan(node);
+ outerPlan = outerPlanState(aggstate);
/* econtext is the per-output-tuple expression context */
- econtext = aggstate->csstate.cstate.cs_ExprContext;
+ econtext = aggstate->ss.ps.ps_ExprContext;
aggvalues = econtext->ecxt_aggvalues;
aggnulls = econtext->ecxt_aggnulls;
/* tmpcontext is the per-input-tuple expression context */
tmpcontext = aggstate->tmpcontext;
- projInfo = aggstate->csstate.cstate.cs_ProjInfo;
+ projInfo = aggstate->ss.ps.ps_ProjInfo;
peragg = aggstate->peragg;
pergroup = aggstate->pergroup;
- firstSlot = aggstate->csstate.css_ScanTupleSlot;
+ firstSlot = aggstate->ss.ss_ScanTupleSlot;
/*
* We loop retrieving groups until we find one matching
- * node->plan.qual
+ * aggstate->ss.ps.qual
*/
do
{
*/
if (aggstate->grp_firstTuple == NULL)
{
- outerslot = ExecProcNode(outerPlan, (Plan *) node);
+ outerslot = ExecProcNode(outerPlan);
if (!TupIsNull(outerslot))
{
/*
firstSlot,
InvalidBuffer,
true);
- aggstate->grp_firstTuple = NULL; /* don't keep two pointers */
+ aggstate->grp_firstTuple = NULL; /* don't keep two pointers */
/* set up for first advance_aggregates call */
tmpcontext->ecxt_scantuple = firstSlot;
/* Reset per-input-tuple context after each tuple */
ResetExprContext(tmpcontext);
- outerslot = ExecProcNode(outerPlan, (Plan *) node);
+ outerslot = ExecProcNode(outerPlan);
if (TupIsNull(outerslot))
{
/* no more outer-plan tuples available */
firstSlot->ttc_tupleDescriptor,
node->numCols, node->grpColIdx,
aggstate->eqfunctions,
- tmpcontext->ecxt_per_tuple_memory))
+ tmpcontext->ecxt_per_tuple_memory))
{
/*
* Save the first input tuple of the next group.
* anything), create a dummy all-nulls input tuple for use by
* ExecProject. 99.44% of the time this is a waste of cycles,
* because ordinarily the projected output tuple's targetlist
- * cannot contain any direct (non-aggregated) references to
- * input columns, so the dummy tuple will not be referenced.
- * However there are special cases where this isn't so --- in
- * particular an UPDATE involving an aggregate will have a
- * targetlist reference to ctid. We need to return a null for
- * ctid in that situation, not coredump.
+ * cannot contain any direct (non-aggregated) references to input
+ * columns, so the dummy tuple will not be referenced. However
+ * there are special cases where this isn't so --- in particular
+ * an UPDATE involving an aggregate will have a targetlist
+ * reference to ctid. We need to return a null for ctid in that
+ * situation, not coredump.
*
- * The values returned for the aggregates will be the initial
- * values of the transition functions.
+ * The values returned for the aggregates will be the initial values
+ * of the transition functions.
*/
if (TupIsNull(firstSlot))
{
* Otherwise, return the tuple.
*/
}
- while (!ExecQual(node->plan.qual, econtext, false));
+ while (!ExecQual(aggstate->ss.ps.qual, econtext, false));
return resultSlot;
}
* ExecAgg for hashed case: phase 1, read input and build hash table
*/
static void
-agg_fill_hash_table(Agg *node)
+agg_fill_hash_table(AggState *aggstate)
{
- AggState *aggstate;
- Plan *outerPlan;
+ PlanState *outerPlan;
ExprContext *tmpcontext;
- AggHashEntry entry;
+ AggHashEntry entry;
TupleTableSlot *outerslot;
/*
* get state info from node
*/
- aggstate = node->aggstate;
- outerPlan = outerPlan(node);
+ outerPlan = outerPlanState(aggstate);
/* tmpcontext is the per-input-tuple expression context */
tmpcontext = aggstate->tmpcontext;
/*
- * Process each outer-plan tuple, and then fetch the next one,
- * until we exhaust the outer plan.
+ * Process each outer-plan tuple, and then fetch the next one, until
+ * we exhaust the outer plan.
*/
for (;;)
{
- outerslot = ExecProcNode(outerPlan, (Plan *) node);
+ outerslot = ExecProcNode(outerPlan);
if (TupIsNull(outerslot))
break;
/* set up for advance_aggregates call */
tmpcontext->ecxt_scantuple = outerslot;
/* Find or build hashtable entry for this tuple's group */
- entry = lookup_hash_entry(node, outerslot);
+ entry = lookup_hash_entry(aggstate, outerslot);
/* Advance the aggregates */
advance_aggregates(aggstate, entry->pergroup);
aggstate->table_filled = true;
/* Initialize to walk the hash table */
- aggstate->next_hash_entry = NULL;
- aggstate->next_hash_bucket = 0;
+ ResetTupleHashIterator(aggstate->hashtable, &aggstate->hashiter);
}
/*
* ExecAgg for hashed case: phase 2, retrieving groups from hash table
*/
static TupleTableSlot *
-agg_retrieve_hash_table(Agg *node)
+agg_retrieve_hash_table(AggState *aggstate)
{
- AggState *aggstate;
ExprContext *econtext;
ProjectionInfo *projInfo;
Datum *aggvalues;
bool *aggnulls;
AggStatePerAgg peragg;
AggStatePerGroup pergroup;
- AggHashTable hashtable;
- AggHashEntry entry;
+ AggHashEntry entry;
TupleTableSlot *firstSlot;
TupleTableSlot *resultSlot;
int aggno;
/*
* get state info from node
*/
- aggstate = node->aggstate;
/* econtext is the per-output-tuple expression context */
- econtext = aggstate->csstate.cstate.cs_ExprContext;
+ econtext = aggstate->ss.ps.ps_ExprContext;
aggvalues = econtext->ecxt_aggvalues;
aggnulls = econtext->ecxt_aggnulls;
- projInfo = aggstate->csstate.cstate.cs_ProjInfo;
+ projInfo = aggstate->ss.ps.ps_ProjInfo;
peragg = aggstate->peragg;
- hashtable = aggstate->hashtable;
- firstSlot = aggstate->csstate.css_ScanTupleSlot;
+ firstSlot = aggstate->ss.ss_ScanTupleSlot;
/*
- * We loop retrieving groups until we find one matching
- * node->plan.qual
+ * We loop retrieving groups until we find one satisfying
+ * aggstate->ss.ps.qual
*/
do
{
/*
* Find the next entry in the hash table
*/
- entry = aggstate->next_hash_entry;
- while (entry == NULL)
+ entry = (AggHashEntry) ScanTupleHashTable(&aggstate->hashiter);
+ if (entry == NULL)
{
- if (aggstate->next_hash_bucket >= hashtable->nbuckets)
- {
- /* No more entries in hashtable, so done */
- aggstate->agg_done = TRUE;
- return NULL;
- }
- entry = hashtable->buckets[aggstate->next_hash_bucket++];
+ /* No more entries in hashtable, so done */
+ aggstate->agg_done = TRUE;
+ return NULL;
}
- aggstate->next_hash_entry = entry->next;
/*
* Clear the per-output-tuple context for each group
* Store the copied first input tuple in the tuple table slot
* reserved for it, so that it can be used in ExecProject.
*/
- ExecStoreTuple(entry->firstTuple,
+ ExecStoreTuple(entry->shared.firstTuple,
firstSlot,
InvalidBuffer,
false);
* Otherwise, return the tuple.
*/
}
- while (!ExecQual(node->plan.qual, econtext, false));
+ while (!ExecQual(aggstate->ss.ps.qual, econtext, false));
return resultSlot;
}
* planner and initializes its outer subtree
* -----------------
*/
-bool
-ExecInitAgg(Agg *node, EState *estate, Plan *parent)
+AggState *
+ExecInitAgg(Agg *node, EState *estate)
{
AggState *aggstate;
AggStatePerAgg peragg;
aggno;
List *alist;
- /*
- * assign the node's execution state
- */
- node->plan.state = estate;
-
/*
* create state structure
*/
aggstate = makeNode(AggState);
- node->aggstate = aggstate;
+ aggstate->ss.ps.plan = (Plan *) node;
+ aggstate->ss.ps.state = estate;
+
+ aggstate->aggs = NIL;
+ aggstate->numaggs = 0;
aggstate->eqfunctions = NULL;
+ aggstate->hashfunctions = NULL;
aggstate->peragg = NULL;
aggstate->agg_done = false;
aggstate->pergroup = NULL;
aggstate->hashtable = NULL;
/*
- * find aggregates in targetlist and quals
- *
- * Note: pull_agg_clauses also checks that no aggs contain other agg
- * calls in their arguments. This would make no sense under SQL
- * semantics anyway (and it's forbidden by the spec). Because that is
- * true, we don't need to worry about evaluating the aggs in any
- * particular order.
- */
- aggstate->aggs = nconc(pull_agg_clause((Node *) node->plan.targetlist),
- pull_agg_clause((Node *) node->plan.qual));
- aggstate->numaggs = numaggs = length(aggstate->aggs);
- if (numaggs <= 0)
- {
- /*
- * This is not an error condition: we might be using the Agg node just
- * to do hash-based grouping. Even in the regular case,
- * constant-expression simplification could optimize away all of the
- * Aggrefs in the targetlist and qual. So keep going, but force local
- * copy of numaggs positive so that palloc()s below don't choke.
- */
- numaggs = 1;
- }
-
- /*
- * Create expression contexts. We need two, one for per-input-tuple
- * processing and one for per-output-tuple processing. We cheat a little
- * by using ExecAssignExprContext() to build both.
+ * Create expression contexts. We need two, one for per-input-tuple
+ * processing and one for per-output-tuple processing. We cheat a
+ * little by using ExecAssignExprContext() to build both.
*/
- ExecAssignExprContext(estate, &aggstate->csstate.cstate);
- aggstate->tmpcontext = aggstate->csstate.cstate.cs_ExprContext;
- ExecAssignExprContext(estate, &aggstate->csstate.cstate);
+ ExecAssignExprContext(estate, &aggstate->ss.ps);
+ aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
+ ExecAssignExprContext(estate, &aggstate->ss.ps);
/*
- * We also need a long-lived memory context for holding hashtable
- * data structures and transition values. NOTE: the details of what
- * is stored in aggcontext and what is stored in the regular per-query
- * memory context are driven by a simple decision: we want to reset the
- * aggcontext in ExecReScanAgg to recover no-longer-wanted space.
+ * We also need a long-lived memory context for holding hashtable data
+ * structures and transition values. NOTE: the details of what is
+ * stored in aggcontext and what is stored in the regular per-query
+ * memory context are driven by a simple decision: we want to reset
+ * the aggcontext in ExecReScanAgg to recover no-longer-wanted space.
*/
aggstate->aggcontext =
AllocSetContextCreate(CurrentMemoryContext,
/*
* tuple table initialization
*/
- ExecInitScanTupleSlot(estate, &aggstate->csstate);
- ExecInitResultTupleSlot(estate, &aggstate->csstate.cstate);
+ ExecInitScanTupleSlot(estate, &aggstate->ss);
+ ExecInitResultTupleSlot(estate, &aggstate->ss.ps);
/*
- * Set up aggregate-result storage in the output expr context, and also
- * allocate my private per-agg working storage
+ * initialize child expressions
+ *
+ * Note: ExecInitExpr finds Aggrefs for us, and also checks that no aggs
+ * contain other agg calls in their arguments. This would make no
+ * sense under SQL semantics anyway (and it's forbidden by the spec).
+ * Because that is true, we don't need to worry about evaluating the
+ * aggs in any particular order.
*/
- econtext = aggstate->csstate.cstate.cs_ExprContext;
- econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
- econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
-
- peragg = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
- aggstate->peragg = peragg;
-
- if (node->aggstrategy == AGG_HASHED)
- {
- build_hash_table(node);
- aggstate->table_filled = false;
- }
- else
- {
- AggStatePerGroup pergroup;
-
- pergroup = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) * numaggs);
- aggstate->pergroup = pergroup;
- }
+ aggstate->ss.ps.targetlist = (List *)
+ ExecInitExpr((Expr *) node->plan.targetlist,
+ (PlanState *) aggstate);
+ aggstate->ss.ps.qual = (List *)
+ ExecInitExpr((Expr *) node->plan.qual,
+ (PlanState *) aggstate);
/*
* initialize child nodes
*/
outerPlan = outerPlan(node);
- ExecInitNode(outerPlan, estate, (Plan *) node);
+ outerPlanState(aggstate) = ExecInitNode(outerPlan, estate);
/*
* initialize source tuple type.
*/
- ExecAssignScanTypeFromOuterPlan((Plan *) node, &aggstate->csstate);
+ ExecAssignScanTypeFromOuterPlan(&aggstate->ss);
/*
* Initialize result tuple type and projection info.
*/
- ExecAssignResultTypeFromTL((Plan *) node, &aggstate->csstate.cstate);
- ExecAssignProjectionInfo((Plan *) node, &aggstate->csstate.cstate);
+ ExecAssignResultTypeFromTL(&aggstate->ss.ps);
+ ExecAssignProjectionInfo(&aggstate->ss.ps);
+
+ /*
+ * get the count of aggregates in targetlist and quals
+ */
+ numaggs = aggstate->numaggs;
+ Assert(numaggs == length(aggstate->aggs));
+ if (numaggs <= 0)
+ {
+ /*
+ * This is not an error condition: we might be using the Agg node
+ * just to do hash-based grouping. Even in the regular case,
+ * constant-expression simplification could optimize away all of
+ * the Aggrefs in the targetlist and qual. So keep going, but
+ * force local copy of numaggs positive so that palloc()s below
+ * don't choke.
+ */
+ numaggs = 1;
+ }
/*
- * If we are grouping, precompute fmgr lookup data for inner loop
+ * If we are grouping, precompute fmgr lookup data for inner loop. We
+ * need both equality and hashing functions to do it by hashing, but
+ * only equality if not hashing.
*/
if (node->numCols > 0)
{
- aggstate->eqfunctions =
- execTuplesMatchPrepare(ExecGetScanType(&aggstate->csstate),
- node->numCols,
- node->grpColIdx);
+ if (node->aggstrategy == AGG_HASHED)
+ execTuplesHashPrepare(ExecGetScanType(&aggstate->ss),
+ node->numCols,
+ node->grpColIdx,
+ &aggstate->eqfunctions,
+ &aggstate->hashfunctions);
+ else
+ aggstate->eqfunctions =
+ execTuplesMatchPrepare(ExecGetScanType(&aggstate->ss),
+ node->numCols,
+ node->grpColIdx);
+ }
+
+ /*
+ * Set up aggregate-result storage in the output expr context, and
+ * also allocate my private per-agg working storage
+ */
+ econtext = aggstate->ss.ps.ps_ExprContext;
+ econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
+ econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
+
+ peragg = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
+ aggstate->peragg = peragg;
+
+ if (node->aggstrategy == AGG_HASHED)
+ {
+ build_hash_table(aggstate);
+ aggstate->table_filled = false;
+ }
+ else
+ {
+ AggStatePerGroup pergroup;
+
+ pergroup = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) * numaggs);
+ aggstate->pergroup = pergroup;
}
/*
* Perform lookups of aggregate function info, and initialize the
- * unchanging fields of the per-agg data
+ * unchanging fields of the per-agg data. We also detect duplicate
+ * aggregates (for example, "SELECT sum(x) ... HAVING sum(x) > 0").
+ * When duplicates are detected, we only make an AggStatePerAgg struct
+ * for the first one. The clones are simply pointed at the same
+ * result entry by giving them duplicate aggno values.
*/
aggno = -1;
foreach(alist, aggstate->aggs)
{
- Aggref *aggref = (Aggref *) lfirst(alist);
- AggStatePerAgg peraggstate = &peragg[++aggno];
+ AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(alist);
+ Aggref *aggref = (Aggref *) aggrefstate->xprstate.expr;
+ AggStatePerAgg peraggstate;
+ Oid inputType;
HeapTuple aggTuple;
Form_pg_aggregate aggform;
+ Oid aggtranstype;
AclResult aclresult;
Oid transfn_oid,
finalfn_oid;
+ Expr *transfnexpr,
+ *finalfnexpr;
Datum textInitVal;
+ int i;
- /* Mark Aggref node with its associated index in the result array */
- aggref->aggno = aggno;
+ /* Planner should have assigned aggregate to correct level */
+ Assert(aggref->agglevelsup == 0);
+
+ /* Look for a previous duplicate aggregate */
+ for (i = 0; i <= aggno; i++)
+ {
+ if (equal(aggref, peragg[i].aggref) &&
+ !contain_volatile_functions((Node *) aggref))
+ break;
+ }
+ if (i <= aggno)
+ {
+ /* Found a match to an existing entry, so just mark it */
+ aggrefstate->aggno = i;
+ continue;
+ }
+
+ /* Nope, so assign a new PerAgg record */
+ peraggstate = &peragg[++aggno];
+
+ /* Mark Aggref state node with assigned index in the result array */
+ aggrefstate->aggno = aggno;
/* Fill in the peraggstate data */
+ peraggstate->aggrefstate = aggrefstate;
peraggstate->aggref = aggref;
+ /*
+ * Get actual datatype of the input. We need this because it may
+ * be different from the agg's declared input type, when the agg
+ * accepts ANY (eg, COUNT(*)) or ANYARRAY or ANYELEMENT.
+ */
+ inputType = exprType((Node *) aggref->target);
+
aggTuple = SearchSysCache(AGGFNOID,
ObjectIdGetDatum(aggref->aggfnoid),
0, 0, 0);
if (!HeapTupleIsValid(aggTuple))
- elog(ERROR, "ExecAgg: cache lookup failed for aggregate %u",
+ elog(ERROR, "cache lookup failed for aggregate %u",
aggref->aggfnoid);
aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
ACL_EXECUTE);
if (aclresult != ACLCHECK_OK)
- aclcheck_error(aclresult, get_func_name(aggref->aggfnoid));
+ aclcheck_error(aclresult, ACL_KIND_PROC,
+ get_func_name(aggref->aggfnoid));
+
+ peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
+ peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
+
+ /* resolve actual type of transition state, if polymorphic */
+ aggtranstype = aggform->aggtranstype;
+ if (aggtranstype == ANYARRAYOID || aggtranstype == ANYELEMENTOID)
+ {
+ /* have to fetch the agg's declared input type... */
+ Oid agg_arg_types[FUNC_MAX_ARGS];
+ int agg_nargs;
+
+ (void) get_func_signature(aggref->aggfnoid,
+ agg_arg_types, &agg_nargs);
+ Assert(agg_nargs == 1);
+ aggtranstype = resolve_generic_type(aggtranstype,
+ inputType,
+ agg_arg_types[0]);
+ }
+
+ /* build expression trees using actual argument & result types */
+ build_aggregate_fnexprs(inputType,
+ aggtranstype,
+ aggref->aggtype,
+ transfn_oid,
+ finalfn_oid,
+ &transfnexpr,
+ &finalfnexpr);
+
+ fmgr_info(transfn_oid, &peraggstate->transfn);
+ peraggstate->transfn.fn_expr = (Node *) transfnexpr;
+
+ if (OidIsValid(finalfn_oid))
+ {
+ fmgr_info(finalfn_oid, &peraggstate->finalfn);
+ peraggstate->finalfn.fn_expr = (Node *) finalfnexpr;
+ }
get_typlenbyval(aggref->aggtype,
&peraggstate->resulttypeLen,
&peraggstate->resulttypeByVal);
- get_typlenbyval(aggform->aggtranstype,
+ get_typlenbyval(aggtranstype,
&peraggstate->transtypeLen,
&peraggstate->transtypeByVal);
peraggstate->initValue = (Datum) 0;
else
peraggstate->initValue = GetAggInitVal(textInitVal,
- aggform->aggtranstype);
-
- peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
- peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
-
- fmgr_info(transfn_oid, &peraggstate->transfn);
- if (OidIsValid(finalfn_oid))
- fmgr_info(finalfn_oid, &peraggstate->finalfn);
+ aggtranstype);
/*
* If the transfn is strict and the initval is NULL, make sure
*/
if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
{
- /*
- * Note: use the type from the input expression here, not from
- * pg_proc.proargtypes, because the latter might be 0.
- * (Consider COUNT(*).)
- */
- Oid inputType = exprType(aggref->target);
-
- if (!IsBinaryCoercible(inputType, aggform->aggtranstype))
- elog(ERROR, "Aggregate %u needs to have compatible input type and transition type",
- aggref->aggfnoid);
+ if (!IsBinaryCoercible(inputType, aggtranstype))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("aggregate %u needs to have compatible input type and transition type",
+ aggref->aggfnoid)));
}
if (aggref->aggdistinct)
{
- /*
- * Note: use the type from the input expression here, not from
- * pg_proc.proargtypes, because the latter might be a pseudotype.
- * (Consider COUNT(*).)
- */
- Oid inputType = exprType(aggref->target);
Oid eq_function;
/* We don't implement DISTINCT aggs in the HASHED case */
&peraggstate->inputtypeLen,
&peraggstate->inputtypeByVal);
- eq_function = compatible_oper_funcid(makeList1(makeString("=")),
- inputType, inputType,
- true);
- if (!OidIsValid(eq_function))
- elog(ERROR, "Unable to identify an equality operator for type %s",
- format_type_be(inputType));
+ eq_function = equality_oper_funcid(inputType);
fmgr_info(eq_function, &(peraggstate->equalfn));
- peraggstate->sortOperator = any_ordering_op(inputType);
+ peraggstate->sortOperator = ordering_oper_opid(inputType);
peraggstate->sortstate = NULL;
}
ReleaseSysCache(aggTuple);
}
- return TRUE;
+ /* Update numaggs to match number of unique aggregates found */
+ aggstate->numaggs = aggno + 1;
+
+ return aggstate;
}
static Datum
ObjectIdGetDatum(transtype),
0, 0, 0);
if (!HeapTupleIsValid(tup))
- elog(ERROR, "GetAggInitVal: cache lookup failed on aggregate transition function return type %u", transtype);
+ elog(ERROR, "cache lookup failed for type %u", transtype);
typinput = ((Form_pg_type) GETSTRUCT(tup))->typinput;
typelem = ((Form_pg_type) GETSTRUCT(tup))->typelem;
}
void
-ExecEndAgg(Agg *node)
+ExecEndAgg(AggState *node)
{
- AggState *aggstate = node->aggstate;
- Plan *outerPlan;
+ PlanState *outerPlan;
int aggno;
/* Make sure we have closed any open tuplesorts */
- for (aggno = 0; aggno < aggstate->numaggs; aggno++)
+ for (aggno = 0; aggno < node->numaggs; aggno++)
{
- AggStatePerAgg peraggstate = &aggstate->peragg[aggno];
+ AggStatePerAgg peraggstate = &node->peragg[aggno];
if (peraggstate->sortstate)
tuplesort_end(peraggstate->sortstate);
}
- ExecFreeProjectionInfo(&aggstate->csstate.cstate);
-
/*
* Free both the expr contexts.
*/
- ExecFreeExprContext(&aggstate->csstate.cstate);
- aggstate->csstate.cstate.cs_ExprContext = aggstate->tmpcontext;
- ExecFreeExprContext(&aggstate->csstate.cstate);
+ ExecFreeExprContext(&node->ss.ps);
+ node->ss.ps.ps_ExprContext = node->tmpcontext;
+ ExecFreeExprContext(&node->ss.ps);
- MemoryContextDelete(aggstate->aggcontext);
+ /* clean up tuple table */
+ ExecClearTuple(node->ss.ss_ScanTupleSlot);
- outerPlan = outerPlan(node);
- ExecEndNode(outerPlan, (Plan *) node);
+ MemoryContextDelete(node->aggcontext);
- /* clean up tuple table */
- ExecClearTuple(aggstate->csstate.css_ScanTupleSlot);
- if (aggstate->grp_firstTuple != NULL)
- {
- heap_freetuple(aggstate->grp_firstTuple);
- aggstate->grp_firstTuple = NULL;
- }
+ outerPlan = outerPlanState(node);
+ ExecEndNode(outerPlan);
}
void
-ExecReScanAgg(Agg *node, ExprContext *exprCtxt, Plan *parent)
+ExecReScanAgg(AggState *node, ExprContext *exprCtxt)
{
- AggState *aggstate = node->aggstate;
- ExprContext *econtext = aggstate->csstate.cstate.cs_ExprContext;
+ ExprContext *econtext = node->ss.ps.ps_ExprContext;
int aggno;
+ node->agg_done = false;
+
+ if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED)
+ {
+ /*
+ * In the hashed case, if we haven't yet built the hash table then
+ * we can just return; nothing done yet, so nothing to undo. If
+ * subnode's chgParam is not NULL then it will be re-scanned by
+ * ExecProcNode, else no reason to re-scan it at all.
+ */
+ if (!node->table_filled)
+ return;
+
+ /*
+ * If we do have the hash table and the subplan does not have any
+ * parameter changes, then we can just rescan the existing hash
+ * table; no need to build it again.
+ */
+ if (((PlanState *) node)->lefttree->chgParam == NULL)
+ {
+ ResetTupleHashIterator(node->hashtable, &node->hashiter);
+ return;
+ }
+ }
+
/* Make sure we have closed any open tuplesorts */
- for (aggno = 0; aggno < aggstate->numaggs; aggno++)
+ for (aggno = 0; aggno < node->numaggs; aggno++)
{
- AggStatePerAgg peraggstate = &aggstate->peragg[aggno];
+ AggStatePerAgg peraggstate = &node->peragg[aggno];
if (peraggstate->sortstate)
tuplesort_end(peraggstate->sortstate);
peraggstate->sortstate = NULL;
}
- aggstate->agg_done = false;
- if (aggstate->grp_firstTuple != NULL)
+ /* Release first tuple of group, if we have made a copy */
+ if (node->grp_firstTuple != NULL)
{
- heap_freetuple(aggstate->grp_firstTuple);
- aggstate->grp_firstTuple = NULL;
+ heap_freetuple(node->grp_firstTuple);
+ node->grp_firstTuple = NULL;
}
- MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * aggstate->numaggs);
- MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * aggstate->numaggs);
- MemoryContextReset(aggstate->aggcontext);
+ /* Forget current agg values */
+ MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
+ MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
- if (node->aggstrategy == AGG_HASHED)
+ /* Release all temp storage */
+ MemoryContextReset(node->aggcontext);
+
+ if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED)
{
+ /* Rebuild an empty hash table */
build_hash_table(node);
- aggstate->table_filled = false;
+ node->table_filled = false;
}
/*
* if chgParam of subnode is not null then plan will be re-scanned by
* first ExecProcNode.
*/
- if (((Plan *) node)->lefttree->chgParam == NULL)
- ExecReScan(((Plan *) node)->lefttree, exprCtxt, (Plan *) node);
+ if (((PlanState *) node)->lefttree->chgParam == NULL)
+ ExecReScan(((PlanState *) node)->lefttree, exprCtxt);
}
/*
Datum
aggregate_dummy(PG_FUNCTION_ARGS)
{
- elog(ERROR, "Aggregate function %u called as normal function",
+ elog(ERROR, "aggregate function %u called as normal function",
fcinfo->flinfo->fn_oid);
return (Datum) 0; /* keep compiler quiet */
}