]> granicus.if.org Git - postgresql/blob - src/backend/executor/nodeAgg.c
Ye-old pgindent run. Same 4-space tabs.
[postgresql] / src / backend / executor / nodeAgg.c
1 /*-------------------------------------------------------------------------
2  *
3  * nodeAgg.c
4  *        Routines to handle aggregate nodes.
5  *
6  *        ExecAgg evaluates each aggregate in the following steps: (initcond1,
7  *        initcond2 are the initial values and sfunc1, sfunc2, and finalfunc are
8  *        the transition functions.)
9  *
10  *               value1 = initcond1
11  *               value2 = initcond2
12  *               foreach input_value do
13  *                      value1 = sfunc1(value1, input_value)
14  *                      value2 = sfunc2(value2)
15  *               value1 = finalfunc(value1, value2)
16  *
17  *        If initcond1 is NULL then the first non-NULL input_value is
18  *        assigned directly to value1.  sfunc1 isn't applied until value1
19  *        is non-NULL.
20  *
21  *        sfunc1 is never applied when the current tuple's input_value is NULL.
22  *        sfunc2 is applied for each tuple if the aggref is marked 'usenulls',
23  *        otherwise it is only applied when input_value is not NULL.
24  *        (usenulls was formerly used for COUNT(*), but is no longer needed for
25  *        that purpose; as of 10/1999 the support for usenulls is dead code.
26  *        I have not removed it because it seems like a potentially useful
27  *        feature for user-defined aggregates.  We'd just need to add a
28  *        flag column to pg_aggregate and a parameter to CREATE AGGREGATE...)
29  *
30  *
31  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
32  * Portions Copyright (c) 1994, Regents of the University of California
33  *
34  * IDENTIFICATION
35  *        $Header: /cvsroot/pgsql/src/backend/executor/nodeAgg.c,v 1.63 2000/04/12 17:15:09 momjian Exp $
36  *
37  *-------------------------------------------------------------------------
38  */
39
40 #include "postgres.h"
41
42 #include "access/heapam.h"
43 #include "catalog/pg_aggregate.h"
44 #include "catalog/pg_operator.h"
45 #include "executor/executor.h"
46 #include "executor/nodeAgg.h"
47 #include "optimizer/clauses.h"
48 #include "parser/parse_expr.h"
49 #include "parser/parse_oper.h"
50 #include "parser/parse_type.h"
51 #include "utils/syscache.h"
52 #include "utils/tuplesort.h"
53
54 /*
55  * AggStatePerAggData - per-aggregate working state for the Agg scan
56  */
57 typedef struct AggStatePerAggData
58 {
59
60         /*
61          * These values are set up during ExecInitAgg() and do not change
62          * thereafter:
63          */
64
65         /* Link to Aggref node this working state is for */
66         Aggref     *aggref;
67
68         /* Oids of transfer functions */
69         Oid                     xfn1_oid;
70         Oid                     xfn2_oid;
71         Oid                     finalfn_oid;
72
73         /*
74          * fmgr lookup data for transfer functions --- only valid when
75          * corresponding oid is not InvalidOid
76          */
77         FmgrInfo        xfn1;
78         FmgrInfo        xfn2;
79         FmgrInfo        finalfn;
80
81         /*
82          * Type of input data and Oid of sort operator to use for it; only
83          * set/used when aggregate has DISTINCT flag.  (These are not used
84          * directly by nodeAgg, but must be passed to the Tuplesort object.)
85          */
86         Oid                     inputType;
87         Oid                     sortOperator;
88
89         /*
90          * fmgr lookup data for input type's equality operator --- only
91          * set/used when aggregate has DISTINCT flag.
92          */
93         FmgrInfo        equalfn;
94
95         /*
96          * initial values from pg_aggregate entry
97          */
98         Datum           initValue1;             /* for transtype1 */
99         Datum           initValue2;             /* for transtype2 */
100         bool            initValue1IsNull,
101                                 initValue2IsNull;
102
103         /*
104          * We need the len and byval info for the agg's input and transition
105          * data types in order to know how to copy/delete values.
106          */
107         int                     inputtypeLen,
108                                 transtype1Len,
109                                 transtype2Len;
110         bool            inputtypeByVal,
111                                 transtype1ByVal,
112                                 transtype2ByVal;
113
114         /*
115          * These values are working state that is initialized at the start of
116          * an input tuple group and updated for each input tuple.
117          *
118          * For a simple (non DISTINCT) aggregate, we just feed the input values
119          * straight to the transition functions.  If it's DISTINCT, we pass
120          * the input values into a Tuplesort object; then at completion of the
121          * input tuple group, we scan the sorted values, eliminate duplicates,
122          * and run the transition functions on the rest.
123          */
124
125         Tuplesortstate *sortstate;      /* sort object, if a DISTINCT agg */
126
127         Datum           value1,                 /* current transfer values 1 and 2 */
128                                 value2;
129         bool            value1IsNull,
130                                 value2IsNull;
131         bool            noInitValue;    /* true if value1 not set yet */
132
133         /*
134          * Note: right now, noInitValue always has the same value as
135          * value1IsNull. But we should keep them separate because once the
136          * fmgr interface is fixed, we'll need to distinguish a null returned
137          * by transfn1 from a null we haven't yet replaced with an input
138          * value.
139          */
140 } AggStatePerAggData;
141
142
143 static void initialize_aggregate(AggStatePerAgg peraggstate);
144 static void advance_transition_functions(AggStatePerAgg peraggstate,
145                                                          Datum newVal, bool isNull);
146 static void finalize_aggregate(AggStatePerAgg peraggstate,
147                                    Datum *resultVal, bool *resultIsNull);
148 static Datum copyDatum(Datum val, int typLen, bool typByVal);
149
150
151 /*
152  * Initialize one aggregate for a new set of input values.
153  */
154 static void
155 initialize_aggregate(AggStatePerAgg peraggstate)
156 {
157         Aggref     *aggref = peraggstate->aggref;
158
159         /*
160          * Start a fresh sort operation for each DISTINCT aggregate.
161          */
162         if (aggref->aggdistinct)
163         {
164
165                 /*
166                  * In case of rescan, maybe there could be an uncompleted sort
167                  * operation?  Clean it up if so.
168                  */
169                 if (peraggstate->sortstate)
170                         tuplesort_end(peraggstate->sortstate);
171
172                 peraggstate->sortstate =
173                         tuplesort_begin_datum(peraggstate->inputType,
174                                                                   peraggstate->sortOperator,
175                                                                   false);
176         }
177
178         /*
179          * (Re)set value1 and value2 to their initial values.
180          */
181         if (OidIsValid(peraggstate->xfn1_oid) &&
182                 !peraggstate->initValue1IsNull)
183                 peraggstate->value1 = copyDatum(peraggstate->initValue1,
184                                                                                 peraggstate->transtype1Len,
185                                                                                 peraggstate->transtype1ByVal);
186         else
187                 peraggstate->value1 = (Datum) NULL;
188         peraggstate->value1IsNull = peraggstate->initValue1IsNull;
189
190         if (OidIsValid(peraggstate->xfn2_oid) &&
191                 !peraggstate->initValue2IsNull)
192                 peraggstate->value2 = copyDatum(peraggstate->initValue2,
193                                                                                 peraggstate->transtype2Len,
194                                                                                 peraggstate->transtype2ByVal);
195         else
196                 peraggstate->value2 = (Datum) NULL;
197         peraggstate->value2IsNull = peraggstate->initValue2IsNull;
198
199         /* ------------------------------------------
200          * If the initial value for the first transition function
201          * doesn't exist in the pg_aggregate table then we will let
202          * the first value returned from the outer procNode become
203          * the initial value. (This is useful for aggregates like
204          * max{} and min{}.)  The noInitValue flag signals that we
205          * still need to do this.
206          * ------------------------------------------
207          */
208         peraggstate->noInitValue = peraggstate->initValue1IsNull;
209 }
210
211 /*
212  * Given a new input value, advance the transition functions of an aggregate.
213  *
214  * Note: if the agg does not have usenulls set, null inputs will be filtered
215  * out before reaching here.
216  */
217 static void
218 advance_transition_functions(AggStatePerAgg peraggstate,
219                                                          Datum newVal, bool isNull)
220 {
221         Datum           args[2];
222
223         if (OidIsValid(peraggstate->xfn1_oid) && !isNull)
224         {
225                 if (peraggstate->noInitValue)
226                 {
227
228                         /*
229                          * value1 has not been initialized. This is the first non-NULL
230                          * input value. We use it as the initial value for value1.
231                          *
232                          * XXX We assume, without having checked, that the agg's input
233                          * type is binary-compatible with its transtype1!
234                          *
235                          * We have to copy the datum since the tuple from which it came
236                          * will be freed on the next iteration of the scan.
237                          */
238                         peraggstate->value1 = copyDatum(newVal,
239                                                                                         peraggstate->transtype1Len,
240                                                                                         peraggstate->transtype1ByVal);
241                         peraggstate->value1IsNull = false;
242                         peraggstate->noInitValue = false;
243                 }
244                 else
245                 {
246                         /* apply transition function 1 */
247                         args[0] = peraggstate->value1;
248                         args[1] = newVal;
249                         newVal = (Datum) fmgr_c(&peraggstate->xfn1,
250                                                                         (FmgrValues *) args,
251                                                                         &isNull);
252                         if (!peraggstate->transtype1ByVal)
253                                 pfree(peraggstate->value1);
254                         peraggstate->value1 = newVal;
255                 }
256         }
257
258         if (OidIsValid(peraggstate->xfn2_oid))
259         {
260                 /* apply transition function 2 */
261                 args[0] = peraggstate->value2;
262                 isNull = false;                 /* value2 cannot be null, currently */
263                 newVal = (Datum) fmgr_c(&peraggstate->xfn2,
264                                                                 (FmgrValues *) args,
265                                                                 &isNull);
266                 if (!peraggstate->transtype2ByVal)
267                         pfree(peraggstate->value2);
268                 peraggstate->value2 = newVal;
269         }
270 }
271
272 /*
273  * Compute the final value of one aggregate.
274  */
275 static void
276 finalize_aggregate(AggStatePerAgg peraggstate,
277                                    Datum *resultVal, bool *resultIsNull)
278 {
279         Aggref     *aggref = peraggstate->aggref;
280         char       *args[2];
281
282         /*
283          * If it's a DISTINCT aggregate, all we've done so far is to stuff the
284          * input values into the sort object.  Complete the sort, then run the
285          * transition functions on the non-duplicate values.  Note that
286          * DISTINCT always suppresses nulls, per SQL spec, regardless of
287          * usenulls.
288          */
289         if (aggref->aggdistinct)
290         {
291                 Datum           oldVal = (Datum) 0;
292                 bool            haveOldVal = false;
293                 Datum           newVal;
294                 bool            isNull;
295
296                 tuplesort_performsort(peraggstate->sortstate);
297                 while (tuplesort_getdatum(peraggstate->sortstate, true,
298                                                                   &newVal, &isNull))
299                 {
300                         if (isNull)
301                                 continue;
302                         if (haveOldVal)
303                         {
304                                 Datum           equal;
305
306                                 equal = (Datum) (*fmgr_faddr(&peraggstate->equalfn)) (oldVal,
307                                                                                                                                  newVal);
308                                 if (DatumGetInt32(equal) != 0)
309                                 {
310                                         if (!peraggstate->inputtypeByVal)
311                                                 pfree(DatumGetPointer(newVal));
312                                         continue;
313                                 }
314                         }
315                         advance_transition_functions(peraggstate, newVal, false);
316                         if (haveOldVal && !peraggstate->inputtypeByVal)
317                                 pfree(DatumGetPointer(oldVal));
318                         oldVal = newVal;
319                         haveOldVal = true;
320                 }
321                 if (haveOldVal && !peraggstate->inputtypeByVal)
322                         pfree(DatumGetPointer(oldVal));
323                 tuplesort_end(peraggstate->sortstate);
324                 peraggstate->sortstate = NULL;
325         }
326
327         /*
328          * Now apply the agg's finalfn, or substitute the appropriate
329          * transition value if there is no finalfn.
330          *
331          * XXX For now, only apply finalfn if we got at least one non-null input
332          * value.  This prevents zero divide in AVG(). If we had cleaner
333          * handling of null inputs/results in functions, we could probably
334          * take out this hack and define the result for no inputs as whatever
335          * finalfn returns for null input.
336          */
337         if (OidIsValid(peraggstate->finalfn_oid) &&
338                 !peraggstate->noInitValue)
339         {
340                 if (peraggstate->finalfn.fn_nargs > 1)
341                 {
342                         args[0] = (char *) peraggstate->value1;
343                         args[1] = (char *) peraggstate->value2;
344                 }
345                 else if (OidIsValid(peraggstate->xfn1_oid))
346                         args[0] = (char *) peraggstate->value1;
347                 else if (OidIsValid(peraggstate->xfn2_oid))
348                         args[0] = (char *) peraggstate->value2;
349                 else
350                         elog(ERROR, "ExecAgg: no valid transition functions??");
351                 *resultIsNull = false;
352                 *resultVal = (Datum) fmgr_c(&peraggstate->finalfn,
353                                                                         (FmgrValues *) args,
354                                                                         resultIsNull);
355         }
356         else if (OidIsValid(peraggstate->xfn1_oid))
357         {
358                 /* Return value1 */
359                 *resultVal = peraggstate->value1;
360                 *resultIsNull = peraggstate->value1IsNull;
361                 /* prevent pfree below */
362                 peraggstate->value1IsNull = true;
363         }
364         else if (OidIsValid(peraggstate->xfn2_oid))
365         {
366                 /* Return value2 */
367                 *resultVal = peraggstate->value2;
368                 *resultIsNull = peraggstate->value2IsNull;
369                 /* prevent pfree below */
370                 peraggstate->value2IsNull = true;
371         }
372         else
373                 elog(ERROR, "ExecAgg: no valid transition functions??");
374
375         /*
376          * Release any per-group working storage, unless we're passing it back
377          * as the result of the aggregate.
378          */
379         if (OidIsValid(peraggstate->xfn1_oid) &&
380                 !peraggstate->value1IsNull &&
381                 !peraggstate->transtype1ByVal)
382                 pfree(peraggstate->value1);
383
384         if (OidIsValid(peraggstate->xfn2_oid) &&
385                 !peraggstate->value2IsNull &&
386                 !peraggstate->transtype2ByVal)
387                 pfree(peraggstate->value2);
388 }
389
390 /* ---------------------------------------
391  *
392  * ExecAgg -
393  *
394  *        ExecAgg receives tuples from its outer subplan and aggregates over
395  *        the appropriate attribute for each aggregate function use (Aggref
396  *        node) appearing in the targetlist or qual of the node.  The number
397  *        of tuples to aggregate over depends on whether a GROUP BY clause is
398  *        present.      We can produce an aggregate result row per group, or just
399  *        one for the whole query.      The value of each aggregate is stored in
400  *        the expression context to be used when ExecProject evaluates the
401  *        result tuple.
402  *
403  *        If the outer subplan is a Group node, ExecAgg returns as many tuples
404  *        as there are groups.
405  *
406  * ------------------------------------------
407  */
408 TupleTableSlot *
409 ExecAgg(Agg *node)
410 {
411         AggState   *aggstate;
412         EState     *estate;
413         Plan       *outerPlan;
414         ExprContext *econtext;
415         ProjectionInfo *projInfo;
416         Datum      *aggvalues;
417         bool       *aggnulls;
418         AggStatePerAgg peragg;
419         TupleTableSlot *resultSlot;
420         HeapTuple       inputTuple;
421         int                     aggno;
422         bool            isDone;
423         bool            isNull;
424
425         /* ---------------------
426          *      get state info from node
427          * ---------------------
428          */
429         aggstate = node->aggstate;
430         estate = node->plan.state;
431         outerPlan = outerPlan(node);
432         econtext = aggstate->csstate.cstate.cs_ExprContext;
433         aggvalues = econtext->ecxt_aggvalues;
434         aggnulls = econtext->ecxt_aggnulls;
435         projInfo = aggstate->csstate.cstate.cs_ProjInfo;
436         peragg = aggstate->peragg;
437
438         /*
439          * We loop retrieving groups until we find one matching
440          * node->plan.qual
441          */
442         do
443         {
444                 if (aggstate->agg_done)
445                         return NULL;
446
447                 /*
448                  * Initialize working state for a new input tuple group
449                  */
450                 for (aggno = 0; aggno < aggstate->numaggs; aggno++)
451                 {
452                         AggStatePerAgg peraggstate = &peragg[aggno];
453
454                         initialize_aggregate(peraggstate);
455                 }
456
457                 inputTuple = NULL;              /* no saved input tuple yet */
458
459                 /* ----------------
460                  *       for each tuple from the outer plan, update all the aggregates
461                  * ----------------
462                  */
463                 for (;;)
464                 {
465                         TupleTableSlot *outerslot;
466
467                         outerslot = ExecProcNode(outerPlan, (Plan *) node);
468                         if (TupIsNull(outerslot))
469                                 break;
470                         econtext->ecxt_scantuple = outerslot;
471
472                         for (aggno = 0; aggno < aggstate->numaggs; aggno++)
473                         {
474                                 AggStatePerAgg peraggstate = &peragg[aggno];
475                                 Aggref     *aggref = peraggstate->aggref;
476                                 Datum           newVal;
477
478                                 newVal = ExecEvalExpr(aggref->target, econtext,
479                                                                           &isNull, &isDone);
480
481                                 if (isNull && !aggref->usenulls)
482                                         continue;       /* ignore this tuple for this agg */
483
484                                 if (aggref->aggdistinct)
485                                         tuplesort_putdatum(peraggstate->sortstate,
486                                                                            newVal, isNull);
487                                 else
488                                         advance_transition_functions(peraggstate,
489                                                                                                  newVal, isNull);
490                         }
491
492                         /*
493                          * Keep a copy of the first input tuple for the projection.
494                          * (We only need one since only the GROUP BY columns in it can
495                          * be referenced, and these will be the same for all tuples
496                          * aggregated over.)
497                          */
498                         if (!inputTuple)
499                                 inputTuple = heap_copytuple(outerslot->val);
500                 }
501
502                 /*
503                  * Done scanning input tuple group. Finalize each aggregate
504                  * calculation.
505                  */
506                 for (aggno = 0; aggno < aggstate->numaggs; aggno++)
507                 {
508                         AggStatePerAgg peraggstate = &peragg[aggno];
509
510                         finalize_aggregate(peraggstate,
511                                                            &aggvalues[aggno], &aggnulls[aggno]);
512                 }
513
514                 /*
515                  * If the outerPlan is a Group node, we will reach here after each
516                  * group.  We are not done unless the Group node is done (a little
517                  * ugliness here while we reach into the Group's state to find
518                  * out). Furthermore, when grouping we return nothing at all
519                  * unless we had some input tuple(s).  By the nature of Group,
520                  * there are no empty groups, so if we get here with no input the
521                  * whole scan is empty.
522                  *
523                  * If the outerPlan isn't a Group, we are done when we get here, and
524                  * we will emit a (single) tuple even if there were no input
525                  * tuples.
526                  */
527                 if (IsA(outerPlan, Group))
528                 {
529                         /* aggregation over groups */
530                         aggstate->agg_done = ((Group *) outerPlan)->grpstate->grp_done;
531                         /* check for no groups */
532                         if (inputTuple == NULL)
533                                 return NULL;
534                 }
535                 else
536                 {
537                         aggstate->agg_done = true;
538
539                         /*
540                          * If inputtuple==NULL (ie, the outerPlan didn't return
541                          * anything), create a dummy all-nulls input tuple for use by
542                          * execProject. 99.44% of the time this is a waste of cycles,
543                          * because ordinarily the projected output tuple's targetlist
544                          * cannot contain any direct (non-aggregated) references to
545                          * input columns, so the dummy tuple will not be referenced.
546                          * However there are special cases where this isn't so --- in
547                          * particular an UPDATE involving an aggregate will have a
548                          * targetlist reference to ctid.  We need to return a null for
549                          * ctid in that situation, not coredump.
550                          *
551                          * The values returned for the aggregates will be the initial
552                          * values of the transition functions.
553                          */
554                         if (inputTuple == NULL)
555                         {
556                                 TupleDesc       tupType;
557                                 Datum      *tupValue;
558                                 char       *null_array;
559                                 AttrNumber      attnum;
560
561                                 tupType = aggstate->csstate.css_ScanTupleSlot->ttc_tupleDescriptor;
562                                 tupValue = projInfo->pi_tupValue;
563                                 /* watch out for null input tuples, though... */
564                                 if (tupType && tupValue)
565                                 {
566                                         null_array = (char *) palloc(sizeof(char) * tupType->natts);
567                                         for (attnum = 0; attnum < tupType->natts; attnum++)
568                                                 null_array[attnum] = 'n';
569                                         inputTuple = heap_formtuple(tupType, tupValue, null_array);
570                                         pfree(null_array);
571                                 }
572                         }
573                 }
574
575                 /*
576                  * Store the representative input tuple in the tuple table slot
577                  * reserved for it.
578                  */
579                 ExecStoreTuple(inputTuple,
580                                            aggstate->csstate.css_ScanTupleSlot,
581                                            InvalidBuffer,
582                                            true);
583                 econtext->ecxt_scantuple = aggstate->csstate.css_ScanTupleSlot;
584
585                 /*
586                  * Form a projection tuple using the aggregate results and the
587                  * representative input tuple.  Store it in the result tuple slot.
588                  */
589                 resultSlot = ExecProject(projInfo, &isDone);
590
591                 /*
592                  * If the completed tuple does not match the qualifications, it is
593                  * ignored and we loop back to try to process another group.
594                  * Otherwise, return the tuple.
595                  */
596         }
597         while (!ExecQual(node->plan.qual, econtext, false));
598
599         return resultSlot;
600 }
601
602 /* -----------------
603  * ExecInitAgg
604  *
605  *      Creates the run-time information for the agg node produced by the
606  *      planner and initializes its outer subtree
607  * -----------------
608  */
609 bool
610 ExecInitAgg(Agg *node, EState *estate, Plan *parent)
611 {
612         AggState   *aggstate;
613         AggStatePerAgg peragg;
614         Plan       *outerPlan;
615         ExprContext *econtext;
616         int                     numaggs,
617                                 aggno;
618         List       *alist;
619
620         /*
621          * assign the node's execution state
622          */
623         node->plan.state = estate;
624
625         /*
626          * create state structure
627          */
628         aggstate = makeNode(AggState);
629         node->aggstate = aggstate;
630         aggstate->agg_done = false;
631
632         /*
633          * find aggregates in targetlist and quals
634          *
635          * Note: pull_agg_clauses also checks that no aggs contain other agg
636          * calls in their arguments.  This would make no sense under SQL
637          * semantics anyway (and it's forbidden by the spec).  Because that is
638          * true, we don't need to worry about evaluating the aggs in any
639          * particular order.
640          */
641         aggstate->aggs = nconc(pull_agg_clause((Node *) node->plan.targetlist),
642                                                    pull_agg_clause((Node *) node->plan.qual));
643         aggstate->numaggs = numaggs = length(aggstate->aggs);
644         if (numaggs <= 0)
645         {
646
647                 /*
648                  * This used to be treated as an error, but we can't do that
649                  * anymore because constant-expression simplification could
650                  * optimize away all of the Aggrefs in the targetlist and qual.
651                  * So, just make a debug note, and force numaggs positive so that
652                  * palloc()s below don't choke.
653                  */
654                 elog(DEBUG, "ExecInitAgg: could not find any aggregate functions");
655                 numaggs = 1;
656         }
657
658         /*
659          * assign node's base id and create expression context
660          */
661         ExecAssignNodeBaseInfo(estate, &aggstate->csstate.cstate, (Plan *) parent);
662         ExecAssignExprContext(estate, &aggstate->csstate.cstate);
663
664 #define AGG_NSLOTS 2
665
666         /*
667          * tuple table initialization
668          */
669         ExecInitScanTupleSlot(estate, &aggstate->csstate);
670         ExecInitResultTupleSlot(estate, &aggstate->csstate.cstate);
671
672         /*
673          * Set up aggregate-result storage in the expr context, and also
674          * allocate my private per-agg working storage
675          */
676         econtext = aggstate->csstate.cstate.cs_ExprContext;
677         econtext->ecxt_aggvalues = (Datum *) palloc(sizeof(Datum) * numaggs);
678         MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * numaggs);
679         econtext->ecxt_aggnulls = (bool *) palloc(sizeof(bool) * numaggs);
680         MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * numaggs);
681
682         peragg = (AggStatePerAgg) palloc(sizeof(AggStatePerAggData) * numaggs);
683         MemSet(peragg, 0, sizeof(AggStatePerAggData) * numaggs);
684         aggstate->peragg = peragg;
685
686         /*
687          * initialize child nodes
688          */
689         outerPlan = outerPlan(node);
690         ExecInitNode(outerPlan, estate, (Plan *) node);
691
692         /* ----------------
693          *      initialize source tuple type.
694          * ----------------
695          */
696         ExecAssignScanTypeFromOuterPlan((Plan *) node, &aggstate->csstate);
697
698         /*
699          * Initialize result tuple type and projection info.
700          */
701         ExecAssignResultTypeFromTL((Plan *) node, &aggstate->csstate.cstate);
702         ExecAssignProjectionInfo((Plan *) node, &aggstate->csstate.cstate);
703
704         /*
705          * Perform lookups of aggregate function info, and initialize the
706          * unchanging fields of the per-agg data
707          */
708         aggno = -1;
709         foreach(alist, aggstate->aggs)
710         {
711                 Aggref     *aggref = (Aggref *) lfirst(alist);
712                 AggStatePerAgg peraggstate = &peragg[++aggno];
713                 char       *aggname = aggref->aggname;
714                 HeapTuple       aggTuple;
715                 Form_pg_aggregate aggform;
716                 Type            typeInfo;
717                 Oid                     xfn1_oid,
718                                         xfn2_oid,
719                                         finalfn_oid;
720
721                 /* Mark Aggref node with its associated index in the result array */
722                 aggref->aggno = aggno;
723
724                 /* Fill in the peraggstate data */
725                 peraggstate->aggref = aggref;
726
727                 aggTuple = SearchSysCacheTuple(AGGNAME,
728                                                                            PointerGetDatum(aggname),
729                                                                            ObjectIdGetDatum(aggref->basetype),
730                                                                            0, 0);
731                 if (!HeapTupleIsValid(aggTuple))
732                         elog(ERROR, "ExecAgg: cache lookup failed for aggregate %s(%s)",
733                                  aggname,
734                                  typeidTypeName(aggref->basetype));
735                 aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
736
737                 peraggstate->initValue1 = (Datum)
738                         AggNameGetInitVal(aggname,
739                                                           aggform->aggbasetype,
740                                                           1,
741                                                           &peraggstate->initValue1IsNull);
742
743                 peraggstate->initValue2 = (Datum)
744                         AggNameGetInitVal(aggname,
745                                                           aggform->aggbasetype,
746                                                           2,
747                                                           &peraggstate->initValue2IsNull);
748
749                 peraggstate->xfn1_oid = xfn1_oid = aggform->aggtransfn1;
750                 peraggstate->xfn2_oid = xfn2_oid = aggform->aggtransfn2;
751                 peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
752
753                 if (OidIsValid(xfn1_oid))
754                 {
755                         fmgr_info(xfn1_oid, &peraggstate->xfn1);
756                         /* If a transfn1 is specified, transtype1 had better be, too */
757                         typeInfo = typeidType(aggform->aggtranstype1);
758                         peraggstate->transtype1Len = typeLen(typeInfo);
759                         peraggstate->transtype1ByVal = typeByVal(typeInfo);
760                 }
761
762                 if (OidIsValid(xfn2_oid))
763                 {
764                         fmgr_info(xfn2_oid, &peraggstate->xfn2);
765                         /* If a transfn2 is specified, transtype2 had better be, too */
766                         typeInfo = typeidType(aggform->aggtranstype2);
767                         peraggstate->transtype2Len = typeLen(typeInfo);
768                         peraggstate->transtype2ByVal = typeByVal(typeInfo);
769                         /* ------------------------------------------
770                          * If there is a second transition function, its initial
771                          * value must exist -- as it does not depend on data values,
772                          * we have no other way of determining an initial value.
773                          * ------------------------------------------
774                          */
775                         if (peraggstate->initValue2IsNull)
776                                 elog(ERROR, "ExecInitAgg: agginitval2 is null");
777                 }
778
779                 if (OidIsValid(finalfn_oid))
780                         fmgr_info(finalfn_oid, &peraggstate->finalfn);
781
782                 if (aggref->aggdistinct)
783                 {
784                         Oid                     inputType = exprType(aggref->target);
785                         Operator        eq_operator;
786                         Form_pg_operator pgopform;
787
788                         peraggstate->inputType = inputType;
789                         typeInfo = typeidType(inputType);
790                         peraggstate->inputtypeLen = typeLen(typeInfo);
791                         peraggstate->inputtypeByVal = typeByVal(typeInfo);
792
793                         eq_operator = oper("=", inputType, inputType, true);
794                         if (!HeapTupleIsValid(eq_operator))
795                         {
796                                 elog(ERROR, "Unable to identify an equality operator for type '%s'",
797                                          typeidTypeName(inputType));
798                         }
799                         pgopform = (Form_pg_operator) GETSTRUCT(eq_operator);
800                         fmgr_info(pgopform->oprcode, &(peraggstate->equalfn));
801                         peraggstate->sortOperator = any_ordering_op(inputType);
802                         peraggstate->sortstate = NULL;
803                 }
804         }
805
806         return TRUE;
807 }
808
809 int
810 ExecCountSlotsAgg(Agg *node)
811 {
812         return ExecCountSlotsNode(outerPlan(node)) +
813         ExecCountSlotsNode(innerPlan(node)) +
814         AGG_NSLOTS;
815 }
816
817 void
818 ExecEndAgg(Agg *node)
819 {
820         AggState   *aggstate = node->aggstate;
821         Plan       *outerPlan;
822
823         ExecFreeProjectionInfo(&aggstate->csstate.cstate);
824
825         outerPlan = outerPlan(node);
826         ExecEndNode(outerPlan, (Plan *) node);
827
828         /* clean up tuple table */
829         ExecClearTuple(aggstate->csstate.css_ScanTupleSlot);
830 }
831
832 void
833 ExecReScanAgg(Agg *node, ExprContext *exprCtxt, Plan *parent)
834 {
835         AggState   *aggstate = node->aggstate;
836         ExprContext *econtext = aggstate->csstate.cstate.cs_ExprContext;
837
838         aggstate->agg_done = false;
839         MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * aggstate->numaggs);
840         MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * aggstate->numaggs);
841
842         /*
843          * if chgParam of subnode is not null then plan will be re-scanned by
844          * first ExecProcNode.
845          */
846         if (((Plan *) node)->lefttree->chgParam == NULL)
847                 ExecReScan(((Plan *) node)->lefttree, exprCtxt, (Plan *) node);
848
849 }
850
851
852 /*
853  * Helper routine to make a copy of a Datum.
854  *
855  * NB: input had better not be a NULL; might cause null-pointer dereference.
856  */
857 static Datum
858 copyDatum(Datum val, int typLen, bool typByVal)
859 {
860         if (typByVal)
861                 return val;
862         else
863         {
864                 char       *newVal;
865
866                 if (typLen == -1)               /* variable length type? */
867                         typLen = VARSIZE((struct varlena *) DatumGetPointer(val));
868                 newVal = (char *) palloc(typLen);
869                 memcpy(newVal, DatumGetPointer(val), typLen);
870                 return PointerGetDatum(newVal);
871         }
872 }