Support parallel aggregation.
author Robert Haas <rhaas@postgresql.org>
Mon, 21 Mar 2016 13:20:53 +0000 (09:20 -0400)
committer Robert Haas <rhaas@postgresql.org>
Mon, 21 Mar 2016 13:30:18 +0000 (09:30 -0400)
Parallel workers can now partially aggregate the data and pass the
transition values back to the leader, which can combine the partial
results to produce the final answer.

David Rowley, based on earlier work by Haribabu Kommi.  Reviewed by
Álvaro Herrera, Tomas Vondra, Amit Kapila, James Sewell, and me.

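To illustrate the division of labour described above, here is a minimal standalone C sketch (not PostgreSQL code; the AvgState type, the function names, and the input values are all invented): each worker folds its share of the rows into a transition state, the leader merges the per-worker states with a combine function, and a final function produces the answer.

#include <stdio.h>

/* Toy transition state for an "average" aggregate: running sum and count. */
typedef struct AvgState
{
	double		sum;
	long		count;
} AvgState;

/* Transition function: fold one input value into a worker's state. */
static void
avg_trans(AvgState *state, double value)
{
	state->sum += value;
	state->count++;
}

/* Combine function: merge one worker's partial state into the leader's. */
static void
avg_combine(AvgState *leader, const AvgState *worker)
{
	leader->sum += worker->sum;
	leader->count += worker->count;
}

/* Final function: turn the combined state into the result value. */
static double
avg_final(const AvgState *state)
{
	return state->count ? state->sum / state->count : 0.0;
}

int
main(void)
{
	double		rows[2][3] = {{1.0, 2.0, 3.0}, {4.0, 5.0, 6.0}};
	AvgState	workers[2] = {{0.0, 0}, {0.0, 0}};
	AvgState	leader = {0.0, 0};
	int			w, i;

	/* Each "worker" partially aggregates its share of the input. */
	for (w = 0; w < 2; w++)
		for (i = 0; i < 3; i++)
			avg_trans(&workers[w], rows[w][i]);

	/* The leader combines the transition values and finalizes. */
	for (w = 0; w < 2; w++)
		avg_combine(&leader, &workers[w]);

	printf("avg = %g\n", avg_final(&leader));	/* prints: avg = 3.5 */
	return 0;
}

In the real executor the three steps correspond to an aggregate's transition, combine, and final functions, and the partial states travel from the workers to the leader as ordinary tuples through a Gather node.
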
23 files changed:
src/backend/executor/execQual.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/equalfuncs.c
src/backend/nodes/nodeFuncs.c
src/backend/nodes/outfuncs.c
src/backend/nodes/readfuncs.c
src/backend/optimizer/path/allpaths.c
src/backend/optimizer/path/costsize.c
src/backend/optimizer/plan/createplan.c
src/backend/optimizer/plan/planner.c
src/backend/optimizer/plan/setrefs.c
src/backend/optimizer/prep/prepunion.c
src/backend/optimizer/util/clauses.c
src/backend/optimizer/util/pathnode.c
src/backend/optimizer/util/tlist.c
src/backend/parser/parse_func.c
src/include/catalog/catversion.h
src/include/nodes/primnodes.h
src/include/nodes/relation.h
src/include/optimizer/clauses.h
src/include/optimizer/cost.h
src/include/optimizer/pathnode.h
src/include/optimizer/tlist.h

src/backend/executor/execQual.c
index 778b6c1552f044238662180e6337f39392b0d3b9..4df4a9b0f7d4587c0679ad1a6c06210332530e23 100644 (file)
@@ -4515,6 +4515,14 @@ ExecInitExpr(Expr *node, PlanState *parent)
                                if (parent && IsA(parent, AggState))
                                {
                                        AggState   *aggstate = (AggState *) parent;
+                                       Aggref     *aggref = (Aggref *) node;
+
+                                       if (aggstate->finalizeAggs &&
+                                               aggref->aggoutputtype != aggref->aggtype)
+                                       {
+                                               /* planner messed up */
+                                               elog(ERROR, "Aggref aggoutputtype must match aggtype");
+                                       }
 
                                        aggstate->aggs = lcons(astate, aggstate->aggs);
                                        aggstate->numaggs++;
src/backend/nodes/copyfuncs.c
index 458983430576d0a9f37b30c2ba2c512065cde08c..6b5d1d6efce66e7949d18e176c0f148b932fc28b 100644 (file)
@@ -1233,6 +1233,7 @@ _copyAggref(const Aggref *from)
 
        COPY_SCALAR_FIELD(aggfnoid);
        COPY_SCALAR_FIELD(aggtype);
+       COPY_SCALAR_FIELD(aggoutputtype);
        COPY_SCALAR_FIELD(aggcollid);
        COPY_SCALAR_FIELD(inputcollid);
        COPY_NODE_FIELD(aggdirectargs);
src/backend/nodes/equalfuncs.c
index b9c395942831c3fee973e0cd984f1ac215717b5f..87eb859e05e81004bb4eafb917d738d4a6a96d7b 100644 (file)
@@ -192,6 +192,7 @@ _equalAggref(const Aggref *a, const Aggref *b)
 {
        COMPARE_SCALAR_FIELD(aggfnoid);
        COMPARE_SCALAR_FIELD(aggtype);
+       COMPARE_SCALAR_FIELD(aggoutputtype);
        COMPARE_SCALAR_FIELD(aggcollid);
        COMPARE_SCALAR_FIELD(inputcollid);
        COMPARE_NODE_FIELD(aggdirectargs);
src/backend/nodes/nodeFuncs.c
index b4ea440f0523f361203dfdfde5820a6b29f9fb85..46af872116148acc83ae72cb65826f31748a7e04 100644 (file)
@@ -57,7 +57,7 @@ exprType(const Node *expr)
                        type = ((const Param *) expr)->paramtype;
                        break;
                case T_Aggref:
-                       type = ((const Aggref *) expr)->aggtype;
+                       type = ((const Aggref *) expr)->aggoutputtype;
                        break;
                case T_GroupingFunc:
                        type = INT4OID;
src/backend/nodes/outfuncs.c
index 1144a4c1c71150cf728bcb5a0f3a20f7161929d7..32d03f7f257ca483b6d340c0992bab0fe50fe6d4 100644 (file)
@@ -1033,6 +1033,7 @@ _outAggref(StringInfo str, const Aggref *node)
 
        WRITE_OID_FIELD(aggfnoid);
        WRITE_OID_FIELD(aggtype);
+       WRITE_OID_FIELD(aggoutputtype);
        WRITE_OID_FIELD(aggcollid);
        WRITE_OID_FIELD(inputcollid);
        WRITE_NODE_FIELD(aggdirectargs);
src/backend/nodes/readfuncs.c
index d63de7f9073a39f6c280b8abfbcd15ea7ee88d77..6db0492e152ada149ccf6ee9a30bba67d50dffb1 100644 (file)
@@ -552,6 +552,7 @@ _readAggref(void)
 
        READ_OID_FIELD(aggfnoid);
        READ_OID_FIELD(aggtype);
+       READ_OID_FIELD(aggoutputtype);
        READ_OID_FIELD(aggcollid);
        READ_OID_FIELD(inputcollid);
        READ_NODE_FIELD(aggdirectargs);
src/backend/optimizer/path/allpaths.c
index 4f60b85861da54204524439b83cf1ed861f689b1..e1a5d339f2ac9f2801633e9a99d44b661057c613 100644 (file)
@@ -1968,7 +1968,8 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel)
         */
        cheapest_partial_path = linitial(rel->partial_pathlist);
        simple_gather_path = (Path *)
-               create_gather_path(root, rel, cheapest_partial_path, NULL);
+               create_gather_path(root, rel, cheapest_partial_path, rel->reltarget,
+                                                  NULL, NULL);
        add_path(rel, simple_gather_path);
 }
 
src/backend/optimizer/path/costsize.c
index 943fcde3b8e5b26b3d10fee093741be7fae3d1c0..9f572d759b7c7635c40becb36bbddd12849dcbd5 100644 (file)
@@ -350,16 +350,22 @@ cost_samplescan(Path *path, PlannerInfo *root,
  *
  * 'rel' is the relation to be operated upon
  * 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL
+ * 'rows' may be used to point to a row estimate; if non-NULL, it overrides
+ * both 'rel' and 'param_info'.  This is useful when the path doesn't exactly
+ * correspond to any particular RelOptInfo.
  */
 void
 cost_gather(GatherPath *path, PlannerInfo *root,
-                       RelOptInfo *rel, ParamPathInfo *param_info)
+                       RelOptInfo *rel, ParamPathInfo *param_info,
+                       double *rows)
 {
        Cost            startup_cost = 0;
        Cost            run_cost = 0;
 
        /* Mark the path with the correct row estimate */
-       if (param_info)
+       if (rows)
+               path->path.rows = *rows;
+       else if (param_info)
                path->path.rows = param_info->ppi_rows;
        else
                path->path.rows = rel->rows;
@@ -1751,6 +1757,8 @@ cost_agg(Path *path, PlannerInfo *root,
        {
                /* must be AGG_HASHED */
                startup_cost = input_total_cost;
+               if (!enable_hashagg)
+                       startup_cost += disable_cost;
                startup_cost += aggcosts->transCost.startup;
                startup_cost += aggcosts->transCost.per_tuple * input_tuples;
                startup_cost += (cpu_operator_cost * numGroupCols) * input_tuples;
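
The new 'rows' argument to cost_gather() exists because a Gather placed on top of a partially aggregated path does not correspond to any RelOptInfo's row count, so the caller has to supply the estimate itself. A toy standalone sketch of that override pattern (the types, names, and cost numbers here are made up, not PostgreSQL's):

#include <stdio.h>

typedef struct ToyPath
{
	double		rows;
	double		total_cost;
} ToyPath;

/* Mirror of the convention: a non-NULL 'rows' wins over the rel estimate. */
static void
toy_cost_gather(ToyPath *path, double rel_rows, const double *rows)
{
	double		per_tuple_cost = 0.001;		/* stand-in cost parameter */

	path->rows = (rows != NULL) ? *rows : rel_rows;
	path->total_cost = path->rows * per_tuple_cost;
}

int
main(void)
{
	ToyPath		gather;
	double		total_groups = 100.0;

	toy_cost_gather(&gather, 1000000.0, NULL);			/* Gather over a plain scan */
	printf("rows=%.0f cost=%.2f\n", gather.rows, gather.total_cost);

	toy_cost_gather(&gather, 1000000.0, &total_groups); /* Gather over a partial agg */
	printf("rows=%.0f cost=%.2f\n", gather.rows, gather.total_cost);
	return 0;
}
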
src/backend/optimizer/plan/createplan.c
index 087cb9c4419a28af566112c4c5553bf7d11665e7..d159a17fd264ace9478945197343e827bc04151c 100644 (file)
@@ -1575,8 +1575,8 @@ create_agg_plan(PlannerInfo *root, AggPath *best_path)
 
        plan = make_agg(tlist, quals,
                                        best_path->aggstrategy,
-                                       false,
-                                       true,
+                                       best_path->combineStates,
+                                       best_path->finalizeAggs,
                                        list_length(best_path->groupClause),
                                        extract_grouping_cols(best_path->groupClause,
                                                                                  subplan->targetlist),
src/backend/optimizer/plan/planner.c
index fc0a2d8de35c34da3d25a589c2eaa25eeebaae14..5229c845d2028ffdf3326058264662d374a51de3 100644 (file)
@@ -106,6 +106,11 @@ static double get_number_of_groups(PlannerInfo *root,
                                         double path_rows,
                                         List *rollup_lists,
                                         List *rollup_groupclauses);
+static void set_grouped_rel_consider_parallel(PlannerInfo *root,
+                                        RelOptInfo *grouped_rel,
+                                        PathTarget *target);
+static Size estimate_hashagg_tablesize(Path *path, AggClauseCosts *agg_costs,
+                                        double dNumGroups);
 static RelOptInfo *create_grouping_paths(PlannerInfo *root,
                                          RelOptInfo *input_rel,
                                          PathTarget *target,
@@ -134,6 +139,8 @@ static RelOptInfo *create_ordered_paths(PlannerInfo *root,
                                         double limit_tuples);
 static PathTarget *make_group_input_target(PlannerInfo *root,
                                                PathTarget *final_target);
+static PathTarget *make_partialgroup_input_target(PlannerInfo *root,
+                                                                                                 PathTarget *final_target);
 static List *postprocess_setop_tlist(List *new_tlist, List *orig_tlist);
 static List *select_active_windows(PlannerInfo *root, WindowFuncLists *wflists);
 static PathTarget *make_window_input_target(PlannerInfo *root,
@@ -1740,6 +1747,19 @@ grouping_planner(PlannerInfo *root, bool inheritance_update,
                        }
                }
 
+               /*
+                * Likewise for any partial paths, although this case is simpler, since
+                * we don't track the cheapest path.
+                */
+               foreach(lc, current_rel->partial_pathlist)
+               {
+                       Path       *subpath = (Path *) lfirst(lc);
+
+                       Assert(subpath->param_info == NULL);
+                       lfirst(lc) = apply_projection_to_path(root, current_rel,
+                                                                                       subpath, scanjoin_target);
+               }
+
                /*
                 * Save the various upper-rel PathTargets we just computed into
                 * root->upper_targets[].  The core code doesn't use this, but it
@@ -3133,6 +3153,79 @@ get_number_of_groups(PlannerInfo *root,
        return dNumGroups;
 }
 
+/*
+ * set_grouped_rel_consider_parallel
+ *       Determine if it's safe to generate partial paths for grouping.
+ */
+static void
+set_grouped_rel_consider_parallel(PlannerInfo *root, RelOptInfo *grouped_rel,
+                                                                 PathTarget *target)
+{
+       Query      *parse = root->parse;
+
+       Assert(grouped_rel->reloptkind == RELOPT_UPPER_REL);
+
+       /*
+        * If there are no aggregates or GROUP BY clause, then no parallel
+        * aggregation is possible.  At present, it doesn't matter whether
+        * consider_parallel gets set in this case, because none of the upper rels
+        * on top of this one try to set the flag or examine it, so we just bail
+        * out as quickly as possible.  We might need to be more clever here in
+        * the future.
+        */
+       if (!parse->hasAggs && parse->groupClause == NIL)
+               return;
+
+       /*
+        * Similarly, bail out quickly if GROUPING SETS are present; we can't
+        * support those at present.
+        */
+       if (parse->groupingSets)
+               return;
+
+       /*
+        * If parallel-restricted functions are present in the target list or the
+        * HAVING clause, we cannot safely go parallel.
+        */
+       if (has_parallel_hazard((Node *) target->exprs, false) ||
+               has_parallel_hazard((Node *) parse->havingQual, false))
+               return;
+
+       /*
+        * All that's left to check now is to make sure all aggregate functions
+        * support partial mode. If there are no aggregates then we can skip checking
+        * that.
+        */
+       if (!parse->hasAggs)
+               grouped_rel->consider_parallel = true;
+       else if (aggregates_allow_partial((Node *) target->exprs) == PAT_ANY &&
+                        aggregates_allow_partial(root->parse->havingQual) == PAT_ANY)
+               grouped_rel->consider_parallel = true;
+}
+
+/*
+ * estimate_hashagg_tablesize
+ *       estimate the number of bytes that a hash aggregate hashtable will
+ *       require based on the agg_costs, path width and dNumGroups.
+ */
+static Size
+estimate_hashagg_tablesize(Path *path, AggClauseCosts *agg_costs,
+                                                  double dNumGroups)
+{
+       Size            hashentrysize;
+
+       /* Estimate per-hash-entry space at tuple width... */
+       hashentrysize = MAXALIGN(path->pathtarget->width) +
+               MAXALIGN(SizeofMinimalTupleHeader);
+
+       /* plus space for pass-by-ref transition values... */
+       hashentrysize += agg_costs->transitionSpace;
+       /* plus the per-hash-entry overhead */
+       hashentrysize += hash_agg_entry_size(agg_costs->numAggs);
+
+       return hashentrysize * dNumGroups;
+}
+
 /*
  * create_grouping_paths
  *
@@ -3149,9 +3242,8 @@ get_number_of_groups(PlannerInfo *root,
  *
  * We need to consider sorted and hashed aggregation in the same function,
  * because otherwise (1) it would be harder to throw an appropriate error
- * message if neither way works, and (2) we should not allow enable_hashagg or
- * hashtable size considerations to dissuade us from using hashing if sorting
- * is not possible.
+ * message if neither way works, and (2) we should not allow hashtable size
+ * considerations to dissuade us from using hashing if sorting is not possible.
  */
 static RelOptInfo *
 create_grouping_paths(PlannerInfo *root,
@@ -3163,9 +3255,14 @@ create_grouping_paths(PlannerInfo *root,
        Query      *parse = root->parse;
        Path       *cheapest_path = input_rel->cheapest_total_path;
        RelOptInfo *grouped_rel;
+       PathTarget *partial_grouping_target = NULL;
        AggClauseCosts agg_costs;
+       Size            hashaggtablesize;
        double          dNumGroups;
-       bool            allow_hash;
+       double          dNumPartialGroups = 0;
+       bool            can_hash;
+       bool            can_sort;
+
        ListCell   *lc;
 
        /* For now, do all work in the (GROUP_AGG, NULL) upperrel */
@@ -3259,12 +3356,155 @@ create_grouping_paths(PlannerInfo *root,
                                                                          rollup_groupclauses);
 
        /*
-        * Consider sort-based implementations of grouping, if possible.  (Note
-        * that if groupClause is empty, grouping_is_sortable() is trivially true,
-        * and all the pathkeys_contained_in() tests will succeed too, so that
-        * we'll consider every surviving input path.)
+        * Partial paths in the input rel could allow us to perform aggregation in
+        * parallel. set_grouped_rel_consider_parallel() will determine if it's
+        * going to be safe to do so.
+        */
+       if (input_rel->partial_pathlist != NIL)
+               set_grouped_rel_consider_parallel(root, grouped_rel, target);
+
+       /*
+        * Determine whether it's possible to perform sort-based implementations
+        * of grouping.  (Note that if groupClause is empty, grouping_is_sortable()
+        * is trivially true, and all the pathkeys_contained_in() tests will
+        * succeed too, so that we'll consider every surviving input path.)
         */
-       if (grouping_is_sortable(parse->groupClause))
+       can_sort = grouping_is_sortable(parse->groupClause);
+
+       /*
+        * Determine whether we should consider hash-based implementations of
+        * grouping.
+        *
+        * Hashed aggregation only applies if we're grouping.  We currently can't
+        * hash if there are grouping sets, though.
+        *
+        * Executor doesn't support hashed aggregation with DISTINCT or ORDER BY
+        * aggregates.  (Doing so would imply storing *all* the input values in
+        * the hash table, and/or running many sorts in parallel, either of which
+        * seems like a certain loser.)  We similarly don't support ordered-set
+        * aggregates in hashed aggregation, but that case is also included in the
+        * numOrderedAggs count.
+        *
+        * Note: grouping_is_hashable() is much more expensive to check than the
+        * other gating conditions, so we want to do it last.
+        */
+       can_hash = (parse->groupClause != NIL &&
+                               parse->groupingSets == NIL &&
+                               agg_costs.numOrderedAggs == 0 &&
+                               grouping_is_hashable(parse->groupClause));
+
+       /*
+        * Before generating paths for grouped_rel, we first generate any possible
+        * partial paths; that way, later code can easily consider both parallel
+        * and non-parallel approaches to grouping.  Note that the partial paths
+        * we generate here are also partially aggregated, so simply pushing a
+        * Gather node on top is insufficient to create a final path, as would be
+        * the case for a scan/join rel.
+        */
+       if (grouped_rel->consider_parallel)
+       {
+               Path   *cheapest_partial_path = linitial(input_rel->partial_pathlist);
+
+               /*
+                * Build target list for partial aggregate paths. We cannot reuse the
+                * final target as Aggrefs must be set in partial mode, and we must
+                * also include Aggrefs from the HAVING clause in the target as these
+                * may not be present in the final target.
+                */
+               partial_grouping_target = make_partialgroup_input_target(root, target);
+
+               /* Estimate number of partial groups. */
+               dNumPartialGroups = get_number_of_groups(root,
+                                                               clamp_row_est(cheapest_partial_path->rows),
+                                                                                                NIL,
+                                                                                                NIL);
+
+               if (can_sort)
+               {
+                       /* Checked in set_grouped_rel_consider_parallel() */
+                       Assert(parse->hasAggs || parse->groupClause);
+
+                       /*
+                        * Use any available suitably-sorted path as input, and also
+                        * consider sorting the cheapest partial path.
+                        */
+                       foreach(lc, input_rel->partial_pathlist)
+                       {
+                               Path       *path = (Path *) lfirst(lc);
+                               bool            is_sorted;
+
+                               is_sorted = pathkeys_contained_in(root->group_pathkeys,
+                                                                                                 path->pathkeys);
+                               if (path == cheapest_partial_path || is_sorted)
+                               {
+                                       /* Sort the cheapest partial path, if it isn't already */
+                                       if (!is_sorted)
+                                               path = (Path *) create_sort_path(root,
+                                                                                                                grouped_rel,
+                                                                                                                path,
+                                                                                                                root->group_pathkeys,
+                                                                                                                -1.0);
+
+                                       if (parse->hasAggs)
+                                               add_partial_path(grouped_rel, (Path *)
+                                                                       create_agg_path(root,
+                                                                                                       grouped_rel,
+                                                                                                       path,
+                                                                                                       partial_grouping_target,
+                                                               parse->groupClause ? AGG_SORTED : AGG_PLAIN,
+                                                                                                       parse->groupClause,
+                                                                                                       NIL,
+                                                                                                       &agg_costs,
+                                                                                                       dNumPartialGroups,
+                                                                                                       false,
+                                                                                                       false));
+                                       else
+                                               add_partial_path(grouped_rel, (Path *)
+                                                                       create_group_path(root,
+                                                                                                         grouped_rel,
+                                                                                                         path,
+                                                                                                         partial_grouping_target,
+                                                                                                         parse->groupClause,
+                                                                                                         NIL,
+                                                                                                         dNumPartialGroups));
+                               }
+                       }
+               }
+
+               if (can_hash)
+               {
+                       /* Checked above */
+                       Assert(parse->hasAggs || parse->groupClause);
+
+                       hashaggtablesize =
+                               estimate_hashagg_tablesize(cheapest_partial_path,
+                                                                                  &agg_costs,
+                                                                                  dNumPartialGroups);
+
+                       /*
+                        * Tentatively produce a partial HashAgg Path, if it looks as though
+                        * the hash table will fit in work_mem.
+                        */
+                       if (hashaggtablesize < work_mem * 1024L)
+                       {
+                               add_partial_path(grouped_rel, (Path *)
+                                                       create_agg_path(root,
+                                                                                       grouped_rel,
+                                                                                       cheapest_partial_path,
+                                                                                       partial_grouping_target,
+                                                                                       AGG_HASHED,
+                                                                                       parse->groupClause,
+                                                                                       NIL,
+                                                                                       &agg_costs,
+                                                                                       dNumPartialGroups,
+                                                                                       false,
+                                                                                       false));
+                       }
+               }
+       }
+
+       /* Build final grouping paths */
+       if (can_sort)
        {
                /*
                 * Use any available suitably-sorted path as input, and also consider
@@ -3320,7 +3560,9 @@ create_grouping_paths(PlannerInfo *root,
                                                                                         parse->groupClause,
                                                                                         (List *) parse->havingQual,
                                                                                         &agg_costs,
-                                                                                        dNumGroups));
+                                                                                        dNumGroups,
+                                                                                        false,
+                                                                                        true));
                                }
                                else if (parse->groupClause)
                                {
@@ -3344,69 +3586,131 @@ create_grouping_paths(PlannerInfo *root,
                                }
                        }
                }
-       }
 
-       /*
-        * Consider hash-based implementations of grouping, if possible.
-        *
-        * Hashed aggregation only applies if we're grouping.  We currently can't
-        * hash if there are grouping sets, though.
-        *
-        * Executor doesn't support hashed aggregation with DISTINCT or ORDER BY
-        * aggregates.  (Doing so would imply storing *all* the input values in
-        * the hash table, and/or running many sorts in parallel, either of which
-        * seems like a certain loser.)  We similarly don't support ordered-set
-        * aggregates in hashed aggregation, but that case is also included in the
-        * numOrderedAggs count.
-        *
-        * Note: grouping_is_hashable() is much more expensive to check than the
-        * other gating conditions, so we want to do it last.
-        */
-       allow_hash = (parse->groupClause != NIL &&
-                                 parse->groupingSets == NIL &&
-                                 agg_costs.numOrderedAggs == 0);
-
-       /* Consider reasons to disable hashing, but only if we can sort instead */
-       if (allow_hash && grouped_rel->pathlist != NIL)
-       {
-               if (!enable_hashagg)
-                       allow_hash = false;
-               else
+               /*
+                * Now generate a complete GroupAgg Path atop the cheapest partial
+                * path. We need only bother with the cheapest path here, as the output
+                * of Gather is never sorted.
+                */
+               if (grouped_rel->partial_pathlist)
                {
+                       Path   *path = (Path *) linitial(grouped_rel->partial_pathlist);
+                       double total_groups = path->rows * path->parallel_degree;
+
+                       path = (Path *) create_gather_path(root,
+                                                                                          grouped_rel,
+                                                                                          path,
+                                                                                          partial_grouping_target,
+                                                                                          NULL,
+                                                                                          &total_groups);
+
                        /*
-                        * Don't hash if it doesn't look like the hashtable will fit into
-                        * work_mem.
+                        * Gather is always unsorted, so we'll need to sort, unless there's
+                        * no GROUP BY clause, in which case there will only be a single
+                        * group.
                         */
-                       Size            hashentrysize;
-
-                       /* Estimate per-hash-entry space at tuple width... */
-                       hashentrysize = MAXALIGN(cheapest_path->pathtarget->width) +
-                               MAXALIGN(SizeofMinimalTupleHeader);
-                       /* plus space for pass-by-ref transition values... */
-                       hashentrysize += agg_costs.transitionSpace;
-                       /* plus the per-hash-entry overhead */
-                       hashentrysize += hash_agg_entry_size(agg_costs.numAggs);
-
-                       if (hashentrysize * dNumGroups > work_mem * 1024L)
-                               allow_hash = false;
+                       if (parse->groupClause)
+                               path = (Path *) create_sort_path(root,
+                                                                                                grouped_rel,
+                                                                                                path,
+                                                                                                root->group_pathkeys,
+                                                                                                -1.0);
+
+                       if (parse->hasAggs)
+                               add_path(grouped_rel, (Path *)
+                                                       create_agg_path(root,
+                                                                                       grouped_rel,
+                                                                                       path,
+                                                                                       target,
+                                                               parse->groupClause ? AGG_SORTED : AGG_PLAIN,
+                                                                                       parse->groupClause,
+                                                                                       (List *) parse->havingQual,
+                                                                                       &agg_costs,
+                                                                                       dNumGroups,
+                                                                                       true,
+                                                                                       true));
+                       else
+                               add_path(grouped_rel, (Path *)
+                                                       create_group_path(root,
+                                                                                         grouped_rel,
+                                                                                         path,
+                                                                                         target,
+                                                                                         parse->groupClause,
+                                                                                         (List *) parse->havingQual,
+                                                                                         dNumGroups));
                }
        }
 
-       if (allow_hash && grouping_is_hashable(parse->groupClause))
+       if (can_hash)
        {
+               hashaggtablesize = estimate_hashagg_tablesize(cheapest_path,
+                                                                                                         &agg_costs,
+                                                                                                         dNumGroups);
+
                /*
-                * We just need an Agg over the cheapest-total input path, since input
-                * order won't matter.
+                * Provided that the estimated size of the hashtable does not exceed
+                * work_mem, we'll generate a HashAgg Path, although if we were unable
+                * to sort above, then we'd better generate a Path, so that we at least
+                * have one.
                 */
-               add_path(grouped_rel, (Path *)
-                                create_agg_path(root, grouped_rel,
-                                                                cheapest_path,
-                                                                target,
-                                                                AGG_HASHED,
-                                                                parse->groupClause,
-                                                                (List *) parse->havingQual,
-                                                                &agg_costs,
-                                                                dNumGroups));
+               if (hashaggtablesize < work_mem * 1024L ||
+                       grouped_rel->pathlist == NIL)
+               {
+                       /*
+                        * We just need an Agg over the cheapest-total input path, since input
+                        * order won't matter.
+                        */
+                       add_path(grouped_rel, (Path *)
+                                        create_agg_path(root, grouped_rel,
+                                                                        cheapest_path,
+                                                                        target,
+                                                                        AGG_HASHED,
+                                                                        parse->groupClause,
+                                                                        (List *) parse->havingQual,
+                                                                        &agg_costs,
+                                                                        dNumGroups,
+                                                                        false,
+                                                                        true));
+               }
+
+               /*
+                * Generate a HashAgg Path atop the cheapest partial path. Once
+                * again, we'll only do this if it looks as though the hash table won't
+                * exceed work_mem.
+                */
+               if (grouped_rel->partial_pathlist)
+               {
+                       Path   *path = (Path *) linitial(grouped_rel->partial_pathlist);
+
+                       hashaggtablesize = estimate_hashagg_tablesize(path,
+                                                                                                                 &agg_costs,
+                                                                                                                 dNumGroups);
+
+                       if (hashaggtablesize < work_mem * 1024L)
+                       {
+                               double total_groups = path->rows * path->parallel_degree;
+
+                               path = (Path *) create_gather_path(root,
+                                                                                                  grouped_rel,
+                                                                                                  path,
+                                                                                                  partial_grouping_target,
+                                                                                                  NULL,
+                                                                                                  &total_groups);
+
+                               add_path(grouped_rel, (Path *)
+                                                       create_agg_path(root,
+                                                                                       grouped_rel,
+                                                                                       path,
+                                                                                       target,
+                                                                                       AGG_HASHED,
+                                                                                       parse->groupClause,
+                                                                                       (List *) parse->havingQual,
+                                                                                       &agg_costs,
+                                                                                       dNumGroups,
+                                                                                       true,
+                                                                                       true));
+                       }
+               }
        }
 
        /* Give a helpful error if we failed to find any implementation */
@@ -3735,7 +4039,9 @@ create_distinct_paths(PlannerInfo *root,
                                                                 parse->distinctClause,
                                                                 NIL,
                                                                 NULL,
-                                                                numDistinctRows));
+                                                                numDistinctRows,
+                                                                false,
+                                                                true));
        }
 
        /* Give a helpful error if we failed to find any implementation */
@@ -3914,6 +4220,92 @@ make_group_input_target(PlannerInfo *root, PathTarget *final_target)
        return set_pathtarget_cost_width(root, input_target);
 }
 
+/*
+ * make_partialgroup_input_target
+ *       Generate appropriate PathTarget for input for Partial Aggregate nodes.
+ *
+ * Similar to make_group_input_target(), only we don't recurse into Aggrefs, as
+ * we need these to remain intact so that they can be found later in Combine
+ * Aggregate nodes during set_combineagg_references(). Vars will be still
+ * pulled out of non-Aggref nodes as these will still be required by the
+ * combine aggregate phase.
+ *
+ * We also convert any Aggrefs which we do find and put them into partial mode;
+ * this adjusts the Aggref's return type so that the partially calculated
+ * aggregate value can make its way up the execution tree up to the Finalize
+ * Aggregate node.
+ */
+static PathTarget *
+make_partialgroup_input_target(PlannerInfo *root, PathTarget *final_target)
+{
+       Query      *parse = root->parse;
+       PathTarget *input_target;
+       List       *non_group_cols;
+       List       *non_group_exprs;
+       int                     i;
+       ListCell   *lc;
+
+       input_target = create_empty_pathtarget();
+       non_group_cols = NIL;
+
+       i = 0;
+       foreach(lc, final_target->exprs)
+       {
+               Expr       *expr = (Expr *) lfirst(lc);
+               Index           sgref = final_target->sortgrouprefs[i];
+
+               if (sgref && parse->groupClause &&
+                       get_sortgroupref_clause_noerr(sgref, parse->groupClause) != NULL)
+               {
+                       /*
+                        * It's a grouping column, so add it to the input target as-is.
+                        */
+                       add_column_to_pathtarget(input_target, expr, sgref);
+               }
+               else
+               {
+                       /*
+                        * Non-grouping column, so just remember the expression for later
+                        * call to pull_var_clause.
+                        */
+                       non_group_cols = lappend(non_group_cols, expr);
+               }
+
+               i++;
+       }
+
+       /*
+        * If there's a HAVING clause, we'll need the Aggrefs it uses, too.
+        */
+       if (parse->havingQual)
+               non_group_cols = lappend(non_group_cols, parse->havingQual);
+
+       /*
+        * Pull out all the Vars mentioned in non-group cols (plus HAVING), and
+        * add them to the input target if not already present.  (A Var used
+        * directly as a GROUP BY item will be present already.)  Note this
+        * includes Vars used in resjunk items, so we are covering the needs of
+        * ORDER BY and window specifications.  Vars used within Aggrefs will be
+        * ignored and the Aggrefs themselves will be added to the PathTarget.
+        */
+       non_group_exprs = pull_var_clause((Node *) non_group_cols,
+                                                                         PVC_INCLUDE_AGGREGATES |
+                                                                         PVC_RECURSE_WINDOWFUNCS |
+                                                                         PVC_INCLUDE_PLACEHOLDERS);
+
+       add_new_columns_to_pathtarget(input_target, non_group_exprs);
+
+       /* clean up cruft */
+       list_free(non_group_exprs);
+       list_free(non_group_cols);
+
+       /* Adjust Aggrefs to put them in partial mode. */
+       apply_partialaggref_adjustment(input_target);
+
+       /* XXX this causes some redundant cost calculation ... */
+       return set_pathtarget_cost_width(root, input_target);
+}
+
 /*
  * postprocess_setop_tlist
  *       Fix up targetlist returned by plan_set_operations().
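
To make the arithmetic in estimate_hashagg_tablesize() above concrete, here is a standalone calculation with invented inputs (the width, header size, and per-entry overhead values are assumptions, not PostgreSQL's actual constants). It shows how the per-entry estimate scales to a table size that is then compared against work_mem before a HashAgg path is considered:

#include <stdio.h>

#define MAXALIGN(x)  (((x) + 7) & ~((size_t) 7))	/* assume 8-byte alignment */

int
main(void)
{
	/* All of these inputs are invented for the example. */
	size_t		tuple_width = 36;			/* path->pathtarget->width */
	size_t		minimal_header = 24;		/* stand-in for SizeofMinimalTupleHeader */
	size_t		transition_space = 16;		/* pass-by-ref transition values */
	size_t		per_entry_overhead = 56;	/* stand-in for hash_agg_entry_size() */
	double		dNumGroups = 200000.0;
	long		work_mem_kb = 4096;			/* work_mem = 4MB */

	size_t		hashentrysize;
	double		tablesize;

	hashentrysize = MAXALIGN(tuple_width) + MAXALIGN(minimal_header) +
		transition_space + per_entry_overhead;
	tablesize = hashentrysize * dNumGroups;

	printf("entry = %zu bytes, table ~ %.1f MB, work_mem = %ld MB\n",
		   hashentrysize, tablesize / (1024.0 * 1024.0), work_mem_kb / 1024);

	if (tablesize < work_mem_kb * 1024L)
		printf("HashAgg path considered\n");
	else
		printf("HashAgg skipped (unless it is the only way to aggregate)\n");
	return 0;
}

With these numbers the table comes out near 26MB against a 4MB work_mem, so hashed aggregation would only be tried if no sorted grouping path were available.
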
src/backend/optimizer/plan/setrefs.c
index aa2c3084fc8d268637ec2769b81b5ad8b17c2c7d..16f572faf425b3ea2f1c95e1b4d14b4262c884c1 100644 (file)
@@ -104,6 +104,8 @@ static Node *fix_scan_expr_mutator(Node *node, fix_scan_expr_context *context);
 static bool fix_scan_expr_walker(Node *node, fix_scan_expr_context *context);
 static void set_join_references(PlannerInfo *root, Join *join, int rtoffset);
 static void set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset);
+static void set_combineagg_references(PlannerInfo *root, Plan *plan,
+                                                                         int rtoffset);
 static void set_dummy_tlist_references(Plan *plan, int rtoffset);
 static indexed_tlist *build_tlist_index(List *tlist);
 static Var *search_indexed_tlist_for_var(Var *var,
@@ -117,6 +119,8 @@ static Var *search_indexed_tlist_for_sortgroupref(Node *node,
                                                                          Index sortgroupref,
                                                                          indexed_tlist *itlist,
                                                                          Index newvarno);
+static Var *search_indexed_tlist_for_partial_aggref(Aggref *aggref,
+                                          indexed_tlist *itlist, Index newvarno);
 static List *fix_join_expr(PlannerInfo *root,
                          List *clauses,
                          indexed_tlist *outer_itlist,
@@ -131,6 +135,13 @@ static Node *fix_upper_expr(PlannerInfo *root,
                           int rtoffset);
 static Node *fix_upper_expr_mutator(Node *node,
                                           fix_upper_expr_context *context);
+static Node *fix_combine_agg_expr(PlannerInfo *root,
+                                                                 Node *node,
+                                                                 indexed_tlist *subplan_itlist,
+                                                                 Index newvarno,
+                                                                 int rtoffset);
+static Node *fix_combine_agg_expr_mutator(Node *node,
+                                                                                 fix_upper_expr_context *context);
 static List *set_returning_clause_references(PlannerInfo *root,
                                                                List *rlist,
                                                                Plan *topplan,
@@ -667,8 +678,16 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
                        }
                        break;
                case T_Agg:
-                       set_upper_references(root, plan, rtoffset);
-                       break;
+                       {
+                               Agg *aggplan = (Agg *) plan;
+
+                               if (aggplan->combineStates)
+                                       set_combineagg_references(root, plan, rtoffset);
+                               else
+                                       set_upper_references(root, plan, rtoffset);
+
+                               break;
+                       }
                case T_Group:
                        set_upper_references(root, plan, rtoffset);
                        break;
@@ -1701,6 +1720,73 @@ set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset)
        pfree(subplan_itlist);
 }
 
+/*
+ * set_combineagg_references
+ *       This serves the same function as set_upper_references(), but treats
+ *       Aggrefs differently. Here we transform Aggref nodes' args to suit the
+ *       combine aggregate phase. This means that the Aggref->args are converted
+ *       to reference the corresponding aggregate function in the subplan rather
+ *       than simple Var(s), as would be the case for a non-combine aggregate
+ *       node.
+ */
+static void
+set_combineagg_references(PlannerInfo *root, Plan *plan, int rtoffset)
+{
+       Plan       *subplan = plan->lefttree;
+       indexed_tlist *subplan_itlist;
+       List       *output_targetlist;
+       ListCell   *l;
+
+       Assert(IsA(plan, Agg));
+       Assert(((Agg *) plan)->combineStates);
+
+       subplan_itlist = build_tlist_index(subplan->targetlist);
+
+       output_targetlist = NIL;
+
+       foreach(l, plan->targetlist)
+       {
+               TargetEntry *tle = (TargetEntry *) lfirst(l);
+               Node       *newexpr;
+
+               /* If it's a non-Var sort/group item, first try to match by sortref */
+               if (tle->ressortgroupref != 0 && !IsA(tle->expr, Var))
+               {
+                       newexpr = (Node *)
+                               search_indexed_tlist_for_sortgroupref((Node *) tle->expr,
+                                                                                                         tle->ressortgroupref,
+                                                                                                         subplan_itlist,
+                                                                                                         OUTER_VAR);
+                       if (!newexpr)
+                               newexpr = fix_combine_agg_expr(root,
+                                                                                          (Node *) tle->expr,
+                                                                                          subplan_itlist,
+                                                                                          OUTER_VAR,
+                                                                                          rtoffset);
+               }
+               else
+                       newexpr = fix_combine_agg_expr(root,
+                                                                                  (Node *) tle->expr,
+                                                                                  subplan_itlist,
+                                                                                  OUTER_VAR,
+                                                                                  rtoffset);
+               tle = flatCopyTargetEntry(tle);
+               tle->expr = (Expr *) newexpr;
+               output_targetlist = lappend(output_targetlist, tle);
+       }
+
+       plan->targetlist = output_targetlist;
+
+       plan->qual = (List *)
+               fix_combine_agg_expr(root,
+                                                        (Node *) plan->qual,
+                                                        subplan_itlist,
+                                                        OUTER_VAR,
+                                                        rtoffset);
+
+       pfree(subplan_itlist);
+}
+
 /*
  * set_dummy_tlist_references
  *       Replace the targetlist of an upper-level plan node with a simple
@@ -1967,6 +2053,68 @@ search_indexed_tlist_for_sortgroupref(Node *node,
        return NULL;                            /* no match */
 }
 
+/*
+ * search_indexed_tlist_for_partial_aggref - find an Aggref in an indexed tlist
+ *
+ * Aggrefs for partial aggregates have their aggoutputtype adjusted to set it
+ * to the aggregate state's type. This means that a standard equal() comparison
+ * won't match when comparing an Aggref which is in partial mode with an Aggref
+ * which is not. Here we manually compare all of the fields apart from
+ * aggoutputtype.
+ */
+static Var *
+search_indexed_tlist_for_partial_aggref(Aggref *aggref, indexed_tlist *itlist,
+                                                                               Index newvarno)
+{
+       ListCell *lc;
+
+       foreach(lc, itlist->tlist)
+       {
+               TargetEntry *tle = (TargetEntry *) lfirst(lc);
+
+               if (IsA(tle->expr, Aggref))
+               {
+                       Aggref     *tlistaggref = (Aggref *) tle->expr;
+                       Var                *newvar;
+
+                       if (aggref->aggfnoid != tlistaggref->aggfnoid)
+                               continue;
+                       if (aggref->aggtype != tlistaggref->aggtype)
+                               continue;
+                       /* ignore aggoutputtype */
+                       if (aggref->aggcollid != tlistaggref->aggcollid)
+                               continue;
+                       if (aggref->inputcollid != tlistaggref->inputcollid)
+                               continue;
+                       if (!equal(aggref->aggdirectargs, tlistaggref->aggdirectargs))
+                               continue;
+                       if (!equal(aggref->args, tlistaggref->args))
+                               continue;
+                       if (!equal(aggref->aggorder, tlistaggref->aggorder))
+                               continue;
+                       if (!equal(aggref->aggdistinct, tlistaggref->aggdistinct))
+                               continue;
+                       if (!equal(aggref->aggfilter, tlistaggref->aggfilter))
+                               continue;
+                       if (aggref->aggstar != tlistaggref->aggstar)
+                               continue;
+                       if (aggref->aggvariadic != tlistaggref->aggvariadic)
+                               continue;
+                       if (aggref->aggkind != tlistaggref->aggkind)
+                               continue;
+                       if (aggref->agglevelsup != tlistaggref->agglevelsup)
+                               continue;
+
+                       newvar = makeVarFromTargetEntry(newvarno, tle);
+                       newvar->varnoold = 0;   /* wasn't ever a plain Var */
+                       newvar->varoattno = 0;
+
+                       return newvar;
+               }
+       }
+       return NULL;
+}
+
 /*
  * fix_join_expr
  *        Create a new set of targetlist entries or join qual clauses by
@@ -2237,6 +2385,105 @@ fix_upper_expr_mutator(Node *node, fix_upper_expr_context *context)
                                                                   (void *) context);
 }
 
+/*
+ * fix_combine_agg_expr
+ *       Like fix_upper_expr() but additionally adjusts the Aggref->args of
+ *       Aggrefs so that they reference the corresponding Aggref in the subplan.
+ */
+static Node *
+fix_combine_agg_expr(PlannerInfo *root,
+                          Node *node,
+                          indexed_tlist *subplan_itlist,
+                          Index newvarno,
+                          int rtoffset)
+{
+       fix_upper_expr_context context;
+
+       context.root = root;
+       context.subplan_itlist = subplan_itlist;
+       context.newvarno = newvarno;
+       context.rtoffset = rtoffset;
+       return fix_combine_agg_expr_mutator(node, &context);
+}
+
+static Node *
+fix_combine_agg_expr_mutator(Node *node, fix_upper_expr_context *context)
+{
+       Var                *newvar;
+
+       if (node == NULL)
+               return NULL;
+       if (IsA(node, Var))
+       {
+               Var                *var = (Var *) node;
+
+               newvar = search_indexed_tlist_for_var(var,
+                                                                                         context->subplan_itlist,
+                                                                                         context->newvarno,
+                                                                                         context->rtoffset);
+               if (!newvar)
+                       elog(ERROR, "variable not found in subplan target list");
+               return (Node *) newvar;
+       }
+       if (IsA(node, PlaceHolderVar))
+       {
+               PlaceHolderVar *phv = (PlaceHolderVar *) node;
+
+               /* See if the PlaceHolderVar has bubbled up from a lower plan node */
+               if (context->subplan_itlist->has_ph_vars)
+               {
+                       newvar = search_indexed_tlist_for_non_var((Node *) phv,
+                                                                                                         context->subplan_itlist,
+                                                                                                         context->newvarno);
+                       if (newvar)
+                               return (Node *) newvar;
+               }
+               /* If not supplied by input plan, evaluate the contained expr */
+               return fix_upper_expr_mutator((Node *) phv->phexpr, context);
+       }
+       if (IsA(node, Param))
+               return fix_param_node(context->root, (Param *) node);
+       if (IsA(node, Aggref))
+       {
+               Aggref             *aggref = (Aggref *) node;
+
+               newvar = search_indexed_tlist_for_partial_aggref(aggref,
+                                                                                                        context->subplan_itlist,
+                                                                                                                context->newvarno);
+               if (newvar)
+               {
+                       Aggref             *newaggref;
+                       TargetEntry        *newtle;
+
+                       /*
+                        * Now build a new TargetEntry for the Aggref's arguments: a single
+                        * Var which references the corresponding Aggref in the
+                        * node below.
+                        */
+                       newtle = makeTargetEntry((Expr *) newvar, 1, NULL, false);
+                       newaggref = (Aggref *) copyObject(aggref);
+                       newaggref->args = list_make1(newtle);
+
+                       return (Node *) newaggref;
+               }
+               else
+                       elog(ERROR, "Aggref not found in subplan target list");
+       }
+       /* Try matching more complex expressions too, if tlist has any */
+       if (context->subplan_itlist->has_non_vars)
+       {
+               newvar = search_indexed_tlist_for_non_var(node,
+                                                                                                 context->subplan_itlist,
+                                                                                                 context->newvarno);
+               if (newvar)
+                       return (Node *) newvar;
+       }
+       fix_expr_common(context->root, node);
+       return expression_tree_mutator(node,
+                                                                  fix_combine_agg_expr_mutator,
+                                                                  (void *) context);
+}
+
 /*
  * set_returning_clause_references
  *             Perform setrefs.c's work on a RETURNING targetlist
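
The matching rule in search_indexed_tlist_for_partial_aggref() boils down to: two Aggrefs denote the same aggregate if every identifying field matches, with aggoutputtype deliberately ignored, because the partial (worker-side) copy reports the transition-state type while the finalizing copy reports the declared result type. A toy standalone sketch of that comparison (the struct and values are invented, and only a few of the fields are shown):

#include <stdbool.h>
#include <stdio.h>

typedef struct ToyAggref
{
	unsigned	aggfnoid;		/* which aggregate function */
	unsigned	aggtype;		/* declared result type */
	unsigned	aggoutputtype;	/* what this node actually emits */
	unsigned	inputcollid;	/* input collation */
} ToyAggref;

/* Compare everything except aggoutputtype, which legitimately differs
 * between the partial and the finalizing copy of the same aggregate. */
static bool
partial_aggref_match(const ToyAggref *a, const ToyAggref *b)
{
	return a->aggfnoid == b->aggfnoid &&
		   a->aggtype == b->aggtype &&
		   a->inputcollid == b->inputcollid;
}

int
main(void)
{
	/* Arbitrary illustrative values, not real catalog OIDs. */
	ToyAggref	finalizing = {1001, 42, 42, 0};		/* emits the result type */
	ToyAggref	partial = {1001, 42, 77, 0};		/* emits the state type */

	printf("same aggregate: %s\n",
		   partial_aggref_match(&finalizing, &partial) ? "yes" : "no");
	return 0;
}
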
src/backend/optimizer/prep/prepunion.c
index 6ea3319e5fc130d39cecdf7698ebffbf0e0a05a7..fb139af2c1c9488b29f64a7535ed6b429e62e26d 100644 (file)
@@ -859,7 +859,9 @@ make_union_unique(SetOperationStmt *op, Path *path, List *tlist,
                                                                                groupList,
                                                                                NIL,
                                                                                NULL,
-                                                                               dNumGroups);
+                                                                               dNumGroups,
+                                                                               false,
+                                                                               true);
        }
        else
        {
index b692e18e3d4af7b2f2524eaa8273f4e88d777394..d80dfbe5c9fa6b8811c493179a327433ee82f282 100644 (file)
 #include "utils/syscache.h"
 #include "utils/typcache.h"
 
+typedef struct
+{
+       PartialAggType allowedtype;
+} partial_agg_context;
 
 typedef struct
 {
@@ -93,6 +97,8 @@ typedef struct
        bool            allow_restricted;
 } has_parallel_hazard_arg;
 
+static bool aggregates_allow_partial_walker(Node *node,
+                                                                                       partial_agg_context *context);
 static bool contain_agg_clause_walker(Node *node, void *context);
 static bool count_agg_clauses_walker(Node *node,
                                                 count_agg_clauses_context *context);
@@ -399,6 +405,79 @@ make_ands_implicit(Expr *clause)
  *             Aggregate-function clause manipulation
  *****************************************************************************/
 
+/*
+ * aggregates_allow_partial
+ *             Recursively search for Aggref clauses and determine the maximum
+ *             level of partial aggregation which can be supported.
+ */
+PartialAggType
+aggregates_allow_partial(Node *clause)
+{
+       partial_agg_context context;
+
+       /* initially any type is okay, until we find Aggrefs which say otherwise */
+       context.allowedtype = PAT_ANY;
+
+       if (!aggregates_allow_partial_walker(clause, &context))
+               return context.allowedtype;
+       return PAT_DISABLED;
+}
+
+static bool
+aggregates_allow_partial_walker(Node *node, partial_agg_context *context)
+{
+       if (node == NULL)
+               return false;
+       if (IsA(node, Aggref))
+       {
+               Aggref     *aggref = (Aggref *) node;
+               HeapTuple       aggTuple;
+               Form_pg_aggregate aggform;
+
+               Assert(aggref->agglevelsup == 0);
+
+               /*
+                * We can't perform partial aggregation with Aggrefs containing a
+                * DISTINCT or ORDER BY clause.
+                */
+               if (aggref->aggdistinct || aggref->aggorder)
+               {
+                       context->allowedtype = PAT_DISABLED;
+                       return true;    /* abort search */
+               }
+               aggTuple = SearchSysCache1(AGGFNOID,
+                                                                  ObjectIdGetDatum(aggref->aggfnoid));
+               if (!HeapTupleIsValid(aggTuple))
+                       elog(ERROR, "cache lookup failed for aggregate %u",
+                                aggref->aggfnoid);
+               aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
+
+               /*
+                * If there is no combine function, then partial aggregation is not
+                * possible.
+                */
+               if (!OidIsValid(aggform->aggcombinefn))
+               {
+                       ReleaseSysCache(aggTuple);
+                       context->allowedtype = PAT_DISABLED;
+                       return true;    /* abort search */
+               }
+
+               /*
+                * If we find any aggs with an internal transtype then we must ensure
+                * that pointers to aggregate states are not passed to other processes;
+                * therefore, we set the maximum allowed type to PAT_INTERNAL_ONLY.
+                */
+               if (aggform->aggtranstype == INTERNALOID)
+                       context->allowedtype = PAT_INTERNAL_ONLY;
+
+               ReleaseSysCache(aggTuple);
+               return false; /* continue searching */
+       }
+       return expression_tree_walker(node, aggregates_allow_partial_walker,
+                                                                 (void *) context);
+}
+
 /*
  * contain_agg_clause
  *       Recursively search for Aggref/GroupingFunc nodes within a clause.
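
Illustrative sketch (not part of the patch) of how a planner-side caller can act on the result; the variable names here are hypothetical. Parallel aggregation moves transition values between processes, so it needs PAT_ANY; PAT_INTERNAL_ONLY would only permit partial aggregation within a single process, and PAT_DISABLED rules it out entirely:

    PartialAggType allowed;

    allowed = aggregates_allow_partial((Node *) target->exprs);

    /*
     * Cross-process (parallel) aggregation is only safe when every aggregate
     * has a combine function and a non-INTERNAL transition type, so that the
     * transition value itself, not just a pointer, reaches the leader; that
     * is, only when the result is PAT_ANY.
     */
    can_parallel_agg = (allowed == PAT_ANY);
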
index 541f7790ab0bb14ea56e994f931ff033b1cb4aba..16b34fcf46a51898e3c3ed037c741cd03fc5f75b 100644 (file)
@@ -1645,10 +1645,12 @@ translate_sub_tlist(List *tlist, int relid)
  * create_gather_path
  *       Creates a path corresponding to a gather scan, returning the
  *       pathnode.
+ *
+ * 'rows' may optionally be set to override row estimates from other sources.
  */
 GatherPath *
 create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
-                                  Relids required_outer)
+                                  PathTarget *target, Relids required_outer, double *rows)
 {
        GatherPath *pathnode = makeNode(GatherPath);
 
@@ -1656,7 +1658,7 @@ create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 
        pathnode->path.pathtype = T_Gather;
        pathnode->path.parent = rel;
-       pathnode->path.pathtarget = rel->reltarget;
+       pathnode->path.pathtarget = target;
        pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
                                                                                                                  required_outer);
        pathnode->path.parallel_aware = false;
@@ -1674,7 +1676,7 @@ create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
                pathnode->single_copy = true;
        }
 
-       cost_gather(pathnode, root, rel, pathnode->path.param_info);
+       cost_gather(pathnode, root, rel, pathnode->path.param_info, rows);
 
        return pathnode;
 }
@@ -2417,6 +2419,8 @@ create_upper_unique_path(PlannerInfo *root,
  * 'qual' is the HAVING quals if any
  * 'aggcosts' contains cost info about the aggregate functions to be computed
  * 'numGroups' is the estimated number of groups (1 if not grouping)
+ * 'combineStates' is set to true if the Agg node should combine agg states
+ * 'finalizeAggs' is set to false if the Agg node should not call the finalfn
  */
 AggPath *
 create_agg_path(PlannerInfo *root,
@@ -2427,7 +2431,9 @@ create_agg_path(PlannerInfo *root,
                                List *groupClause,
                                List *qual,
                                const AggClauseCosts *aggcosts,
-                               double numGroups)
+                               double numGroups,
+                               bool combineStates,
+                               bool finalizeAggs)
 {
        AggPath    *pathnode = makeNode(AggPath);
 
@@ -2450,6 +2456,8 @@ create_agg_path(PlannerInfo *root,
        pathnode->numGroups = numGroups;
        pathnode->groupClause = groupClause;
        pathnode->qual = qual;
+       pathnode->finalizeAggs = finalizeAggs;
+       pathnode->combineStates = combineStates;
 
        cost_agg(&pathnode->path, root,
                         aggstrategy, aggcosts,
index b297d87e7ec4abf93f95470f57498f726c85c3b1..cd421b14632b2a73711409287b31625ff576f614 100644 (file)
  */
 #include "postgres.h"
 
+#include "access/htup_details.h"
+#include "catalog/pg_aggregate.h"
 #include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
 #include "optimizer/tlist.h"
+#include "utils/syscache.h"
 
 
 /*****************************************************************************
@@ -748,3 +751,45 @@ apply_pathtarget_labeling_to_tlist(List *tlist, PathTarget *target)
                i++;
        }
 }
+
+/*
+ * apply_partialaggref_adjustment
+ *       Convert PathTarget to be suitable for a partial aggregate node. We simply
+ *       adjust any Aggref nodes found in the target and set the aggoutputtype to
+ *       the aggtranstype. This allows exprType() to return the actual type that
+ *       will be produced.
+ *
+ * Note: We expect 'target' to be a flat target list and not have Aggrefs buried
+ * within other expressions.
+ */
+void
+apply_partialaggref_adjustment(PathTarget *target)
+{
+       ListCell *lc;
+
+       foreach(lc, target->exprs)
+       {
+               Aggref *aggref = (Aggref *) lfirst(lc);
+
+               if (IsA(aggref, Aggref))
+               {
+                       HeapTuple       aggTuple;
+                       Form_pg_aggregate aggform;
+                       Aggref     *newaggref;
+
+                       aggTuple = SearchSysCache1(AGGFNOID,
+                                                                          ObjectIdGetDatum(aggref->aggfnoid));
+                       if (!HeapTupleIsValid(aggTuple))
+                               elog(ERROR, "cache lookup failed for aggregate %u",
+                                        aggref->aggfnoid);
+                       aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
+
+                       newaggref = (Aggref *) copyObject(aggref);
+                       newaggref->aggoutputtype = aggform->aggtranstype;
+
+                       lfirst(lc) = newaggref;
+
+                       ReleaseSysCache(aggTuple);
+               }
+       }
+}
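
A brief usage sketch (illustrative, not taken from the patch): the partial aggregation step gets its own PathTarget, adjusted so that each Aggref advertises its transition type; copy_pathtarget() is assumed here for the copy, and grouping_target is a hypothetical name for the ordinary grouping target:

    PathTarget *partial_target;

    /* start from the normal grouping target and adjust its Aggrefs */
    partial_target = copy_pathtarget(grouping_target);
    apply_partialaggref_adjustment(partial_target);

    /*
     * From here on, exprType() on the adjusted Aggrefs returns aggtranstype,
     * so the tuple descriptors of the partial Agg and Gather nodes describe
     * the transition values that actually flow between the processes.
     */
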
index 9744d0dc68994de7ec2cd7736a5bd65455d9410b..485960f753cfb1693d3b2beffa75af22976db226 100644 (file)
@@ -647,7 +647,8 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
                Aggref     *aggref = makeNode(Aggref);
 
                aggref->aggfnoid = funcid;
-               aggref->aggtype = rettype;
+               /* default the outputtype to be the same as aggtype */
+               aggref->aggtype = aggref->aggoutputtype = rettype;
                /* aggcollid and inputcollid will be set by parse_collate.c */
                /* aggdirectargs and args will be set by transformAggregateCall */
                /* aggorder and aggdistinct will be set by transformAggregateCall */
index 7c9e9eb19f675b048346f24cdc512f4bd43e224c..3568cb27e814705045424fa17c8866f465a69967 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     201603181
+#define CATALOG_VERSION_NO     201603211
 
 #endif
index f942378363489a807096170aacaae4076847822a..1ffc0a1e5e2d991426c21018f1e179c59ffa7ec2 100644 (file)
@@ -255,12 +255,21 @@ typedef struct Param
  * DISTINCT is not supported in this case, so aggdistinct will be NIL.
  * The direct arguments appear in aggdirectargs (as a list of plain
  * expressions, not TargetEntry nodes).
+ *
+ * 'aggtype' and 'aggoutputtype' are the same except when we're performing
+ * partial aggregation; in that case, we output transition states.  Nothing
+ * interesting happens in the Aggref itself, but we must set the output data
+ * type to whatever type is used for transition values.
+ *
+ * Note: If you are adding fields here you may also need to add a comparison
+ * in search_indexed_tlist_for_partial_aggref()
  */
 typedef struct Aggref
 {
        Expr            xpr;
        Oid                     aggfnoid;               /* pg_proc Oid of the aggregate */
-       Oid                     aggtype;                /* type Oid of result of the aggregate */
+       Oid                     aggtype;                /* type Oid of final result of the aggregate */
+       Oid                     aggoutputtype;  /* type Oid of result of this aggregate */
        Oid                     aggcollid;              /* OID of collation of result */
        Oid                     inputcollid;    /* OID of collation that function should use */
        List       *aggdirectargs;      /* direct arguments, if an ordered-set agg */
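
A concrete example of the distinction (not from the patch text): for avg over float8, aggtype is float8, the type the query ultimately sees, while a partially aggregating node sets aggoutputtype to the float8[] transition type, since that array of accumulated values is what the node actually emits to the combining node above it.
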
index 503269693bac3c1ba23b8668dddb1490ea68a3fb..ee7007aacec6ff1d92bb450ca84f5995481fdec2 100644 (file)
@@ -1309,6 +1309,8 @@ typedef struct AggPath
        double          numGroups;              /* estimated number of groups in input */
        List       *groupClause;        /* a list of SortGroupClause's */
        List       *qual;                       /* quals (HAVING quals), if any */
+       bool            combineStates;  /* input is partially aggregated agg states */
+       bool            finalizeAggs;   /* should the executor call the finalfn? */
 } AggPath;
 
 /*
index 3b3fd0fc9a05a74cb5739f8a41f2d195b9231636..3ab57f155d23736683497551c0b1a5997c3e5acc 100644 (file)
@@ -27,6 +27,23 @@ typedef struct
        List      **windowFuncs;        /* lists of WindowFuncs for each winref */
 } WindowFuncLists;
 
+/*
+ * PartialAggType
+ *     PartialAggType stores whether partial aggregation is allowed and in
+ *     which context it is allowed.  We need three states because there are
+ *     two different contexts in which partial aggregation is safe.  For
+ *     aggregates with an 'stype' of INTERNAL, it is okay to pass a pointer
+ *     to the aggregate state around within a single process, since the datum
+ *     is just a pointer.  But when the aggregate state must be passed between
+ *     different processes, for example during parallel aggregation, passing a
+ *     pointer to process-local memory is useless, so such aggregates must be
+ *     excluded.
+ */
+typedef enum
+{
+       PAT_ANY = 0,            /* Any type of partial aggregation is okay. */
+       PAT_INTERNAL_ONLY,      /* Some aggregates support only internal mode. */
+       PAT_DISABLED            /* Some aggregates don't support partial mode at all. */
+} PartialAggType;
 
 extern Expr *make_opclause(Oid opno, Oid opresulttype, bool opretset,
                          Expr *leftop, Expr *rightop,
@@ -47,6 +64,7 @@ extern Node *make_and_qual(Node *qual1, Node *qual2);
 extern Expr *make_ands_explicit(List *andclauses);
 extern List *make_ands_implicit(Expr *clause);
 
+extern PartialAggType aggregates_allow_partial(Node *clause);
 extern bool contain_agg_clause(Node *clause);
 extern void count_agg_clauses(PlannerInfo *root, Node *clause,
                                  AggClauseCosts *costs);
index fea2bb77f4d9b61ce8237abac64088ebaebe8f90..d4adca6836a30ead68c5299bd77da66d3ba34129 100644 (file)
@@ -150,7 +150,7 @@ extern void final_cost_hashjoin(PlannerInfo *root, HashPath *path,
                                        SpecialJoinInfo *sjinfo,
                                        SemiAntiJoinFactors *semifactors);
 extern void cost_gather(GatherPath *path, PlannerInfo *root,
-                       RelOptInfo *baserel, ParamPathInfo *param_info);
+                       RelOptInfo *baserel, ParamPathInfo *param_info, double *rows);
 extern void cost_subplan(PlannerInfo *root, SubPlan *subplan, Plan *plan);
 extern void cost_qual_eval(QualCost *cost, List *quals, PlannerInfo *root);
 extern void cost_qual_eval_node(QualCost *cost, Node *qual, PlannerInfo *root);
index d1eb22f27a459a5b0f29734469a27ba5b4cd5b14..1744ff058e8999ab0f6c1a02e33db500c7fc82f5 100644 (file)
@@ -74,7 +74,8 @@ extern MaterialPath *create_material_path(RelOptInfo *rel, Path *subpath);
 extern UniquePath *create_unique_path(PlannerInfo *root, RelOptInfo *rel,
                                   Path *subpath, SpecialJoinInfo *sjinfo);
 extern GatherPath *create_gather_path(PlannerInfo *root,
-                                  RelOptInfo *rel, Path *subpath, Relids required_outer);
+                                  RelOptInfo *rel, Path *subpath, PathTarget *target,
+                                  Relids required_outer, double *rows);
 extern SubqueryScanPath *create_subqueryscan_path(PlannerInfo *root,
                                                 RelOptInfo *rel, Path *subpath,
                                                 List *pathkeys, Relids required_outer);
@@ -168,7 +169,9 @@ extern AggPath *create_agg_path(PlannerInfo *root,
                                List *groupClause,
                                List *qual,
                                const AggClauseCosts *aggcosts,
-                               double numGroups);
+                               double numGroups,
+                               bool combineStates,
+                               bool finalizeAggs);
 extern GroupingSetsPath *create_groupingsets_path(PlannerInfo *root,
                                                 RelOptInfo *rel,
                                                 Path *subpath,
index 0d745a089148c09efee499f09346f73eb285c61a..de58db1db2a680a913c1f5227c849d1918143db3 100644 (file)
@@ -61,6 +61,7 @@ extern void add_column_to_pathtarget(PathTarget *target,
 extern void add_new_column_to_pathtarget(PathTarget *target, Expr *expr);
 extern void add_new_columns_to_pathtarget(PathTarget *target, List *exprs);
 extern void apply_pathtarget_labeling_to_tlist(List *tlist, PathTarget *target);
+extern void apply_partialaggref_adjustment(PathTarget *target);
 
 /* Convenience macro to get a PathTarget with valid cost/width fields */
 #define create_pathtarget(root, tlist) \