]> granicus.if.org Git - postgresql/commitdiff
Allow parallel workers to execute subplans.
authorRobert Haas <rhaas@postgresql.org>
Tue, 14 Feb 2017 23:09:47 +0000 (18:09 -0500)
committerRobert Haas <rhaas@postgresql.org>
Tue, 14 Feb 2017 23:16:03 +0000 (18:16 -0500)
This doesn't do anything to make Param nodes anything other than
parallel-restricted, so this only helps with uncorrelated subplans,
and it's not necessarily very cheap because each worker will run the
subplan separately (just as a Hash Join will build a separate copy of
the hash table in each participating process), but it's a first step
toward supporting cases that are more likely to help in practice, and
is occasionally useful on its own.

Amit Kapila, reviewed and tested by Rafia Sabih, Dilip Kumar, and
me.

Discussion: http://postgr.es/m/CAA4eK1+e8Z45D2n+rnDMDYsVEb5iW7jqaCH_tvPMYau=1Rru9w@mail.gmail.com

src/backend/executor/execParallel.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/equalfuncs.c
src/backend/nodes/outfuncs.c
src/backend/nodes/readfuncs.c
src/backend/optimizer/plan/subselect.c
src/backend/optimizer/util/clauses.c
src/include/nodes/primnodes.h
src/test/regress/expected/select_parallel.out
src/test/regress/sql/select_parallel.sql

index fe87c9ae71deef2da21233653b778a712ed4c3ec..784dbaf590ef8496fd7dde6d9511317630e1570e 100644 (file)
@@ -156,7 +156,7 @@ ExecSerializePlan(Plan *plan, EState *estate)
        pstmt->planTree = plan;
        pstmt->rtable = estate->es_range_table;
        pstmt->resultRelations = NIL;
-       pstmt->subplans = NIL;
+       pstmt->subplans = estate->es_plannedstmt->subplans;
        pstmt->rewindPlanIDs = NULL;
        pstmt->rowMarks = NIL;
        pstmt->relationOids = NIL;
index 30d733e57a002d23f45ccf653b0bca3962991f6a..12324ab63f13814ef7d4b5b2a1d41c60c483919a 100644 (file)
@@ -1495,6 +1495,7 @@ _copySubPlan(const SubPlan *from)
        COPY_SCALAR_FIELD(firstColCollation);
        COPY_SCALAR_FIELD(useHashTable);
        COPY_SCALAR_FIELD(unknownEqFalse);
+       COPY_SCALAR_FIELD(parallel_safe);
        COPY_NODE_FIELD(setParam);
        COPY_NODE_FIELD(parParam);
        COPY_NODE_FIELD(args);
index 55c73b729262012cd66bbd2ef8db77aa7f4c45dc..6d1dabe17e9602635b168c8d266a76adfc7d901b 100644 (file)
@@ -423,6 +423,7 @@ _equalSubPlan(const SubPlan *a, const SubPlan *b)
        COMPARE_SCALAR_FIELD(firstColCollation);
        COMPARE_SCALAR_FIELD(useHashTable);
        COMPARE_SCALAR_FIELD(unknownEqFalse);
+       COMPARE_SCALAR_FIELD(parallel_safe);
        COMPARE_NODE_FIELD(setParam);
        COMPARE_NODE_FIELD(parParam);
        COMPARE_NODE_FIELD(args);
index 1560ac39895bfeaaa521e61956c6a9696bc73009..b3802b4428fd3e21ebb8affbcc0f9e6fe6dbe8dc 100644 (file)
@@ -1226,6 +1226,7 @@ _outSubPlan(StringInfo str, const SubPlan *node)
        WRITE_OID_FIELD(firstColCollation);
        WRITE_BOOL_FIELD(useHashTable);
        WRITE_BOOL_FIELD(unknownEqFalse);
+       WRITE_BOOL_FIELD(parallel_safe);
        WRITE_NODE_FIELD(setParam);
        WRITE_NODE_FIELD(parParam);
        WRITE_NODE_FIELD(args);
index dcfa6ee28df51753c0572d2db230831e549b3b44..d2f69fe70ba366073b2fac21da67e8e4eecab8a3 100644 (file)
@@ -2233,6 +2233,7 @@ _readSubPlan(void)
        READ_OID_FIELD(firstColCollation);
        READ_BOOL_FIELD(useHashTable);
        READ_BOOL_FIELD(unknownEqFalse);
+       READ_BOOL_FIELD(parallel_safe);
        READ_NODE_FIELD(setParam);
        READ_NODE_FIELD(parParam);
        READ_NODE_FIELD(args);
index 9fc748973e719b6a8998d379e9f9094f8383abe1..7954c445dd91798ad2f3f59e745f34fe2db066e1 100644 (file)
@@ -58,7 +58,7 @@ static Node *build_subplan(PlannerInfo *root, Plan *plan, PlannerInfo *subroot,
                          List *plan_params,
                          SubLinkType subLinkType, int subLinkId,
                          Node *testexpr, bool adjust_testexpr,
-                         bool unknownEqFalse);
+                         bool unknownEqFalse, bool parallel_safe);
 static List *generate_subquery_params(PlannerInfo *root, List *tlist,
                                                 List **paramIds);
 static List *generate_subquery_vars(PlannerInfo *root, List *tlist,
@@ -551,7 +551,8 @@ make_subplan(PlannerInfo *root, Query *orig_subquery,
        /* And convert to SubPlan or InitPlan format. */
        result = build_subplan(root, plan, subroot, plan_params,
                                                   subLinkType, subLinkId,
-                                                  testexpr, true, isTopQual);
+                                                  testexpr, true, isTopQual,
+                                                  best_path->parallel_safe);
 
        /*
         * If it's a correlated EXISTS with an unimportant targetlist, we might be
@@ -604,7 +605,8 @@ make_subplan(PlannerInfo *root, Query *orig_subquery,
                                                                                                         plan_params,
                                                                                                         ANY_SUBLINK, 0,
                                                                                                         newtestexpr,
-                                                                                                        false, true);
+                                                                                                        false, true,
+                                                                                                  best_path->parallel_safe);
                                /* Check we got what we expected */
                                Assert(IsA(hashplan, SubPlan));
                                Assert(hashplan->parParam == NIL);
@@ -634,7 +636,7 @@ build_subplan(PlannerInfo *root, Plan *plan, PlannerInfo *subroot,
                          List *plan_params,
                          SubLinkType subLinkType, int subLinkId,
                          Node *testexpr, bool adjust_testexpr,
-                         bool unknownEqFalse)
+                         bool unknownEqFalse, bool parallel_safe)
 {
        Node       *result;
        SubPlan    *splan;
@@ -653,6 +655,7 @@ build_subplan(PlannerInfo *root, Plan *plan, PlannerInfo *subroot,
                                           &splan->firstColCollation);
        splan->useHashTable = false;
        splan->unknownEqFalse = unknownEqFalse;
+       splan->parallel_safe = parallel_safe;
        splan->setParam = NIL;
        splan->parParam = NIL;
        splan->args = NIL;
@@ -1213,6 +1216,12 @@ SS_process_ctes(PlannerInfo *root)
                                                   &splan->firstColCollation);
                splan->useHashTable = false;
                splan->unknownEqFalse = false;
+
+               /*
+                * CTE scans are not considered for parallelism (cf
+                * set_rel_consider_parallel).
+                */
+               splan->parallel_safe = false;
                splan->setParam = NIL;
                splan->parParam = NIL;
                splan->args = NIL;
index d589dc2544b0fe1bfc5bd483a6a2c7eb5b3f1d2b..3dedee6d69984b6c906f2ce90de885b43c15f6e2 100644 (file)
@@ -1162,21 +1162,19 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
        }
 
        /*
-        * Since we don't have the ability to push subplans down to workers at
-        * present, we treat subplan references as parallel-restricted.  We need
-        * not worry about examining their contents; if they are unsafe, we would
-        * have found that out while examining the whole tree before reduction of
-        * sublinks to subplans.  (Really we should not see SubLink during a
-        * max_interesting == restricted scan, but if we do, return true.)
+        * Really we should not see SubLink during a max_interesting == restricted
+        * scan, but if we do, return true.
         */
-       else if (IsA(node, SubLink) ||
-                        IsA(node, SubPlan) ||
-                        IsA(node, AlternativeSubPlan))
+       else if (IsA(node, SubLink))
        {
                if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
                        return true;
        }
 
+       /* We can push the subplans only if they are parallel-safe. */
+       else if (IsA(node, SubPlan))
+               return !((SubPlan *) node)->parallel_safe;
+
        /*
         * We can't pass Params to workers at the moment either, so they are also
         * parallel-restricted.
index f72ec247ffe92fff6fafded205ef423f71c9e1b2..235bc7509662ed255a8c15e154252e2925560e45 100644 (file)
@@ -677,6 +677,7 @@ typedef struct SubPlan
        bool            unknownEqFalse; /* TRUE if it's okay to return FALSE when the
                                                                 * spec result is UNKNOWN; this allows much
                                                                 * simpler handling of null values */
+       bool            parallel_safe;  /* OK to use as part of parallel plan? */
        /* Information for passing params into and out of the subselect: */
        /* setParam and parParam are lists of integers (param IDs) */
        List       *setParam;           /* initplan subqueries have to set these
index 18e21b7f1325443ba18a603d5024fe5765dda509..8786678f0cf0efb69ac5a03842f095271f2e9fd7 100644 (file)
@@ -99,6 +99,32 @@ explain (costs off)
    ->  Index Only Scan using tenk1_unique1 on tenk1
 (3 rows)
 
+-- test parallel plans for queries containing un-correlated subplans.
+alter table tenk2 set (parallel_workers = 0);
+explain (costs off)
+       select count(*) from tenk1 where (two, four) not in
+       (select hundred, thousand from tenk2 where thousand > 100);
+                      QUERY PLAN                      
+------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 4
+         ->  Partial Aggregate
+               ->  Parallel Seq Scan on tenk1
+                     Filter: (NOT (hashed SubPlan 1))
+                     SubPlan 1
+                       ->  Seq Scan on tenk2
+                             Filter: (thousand > 100)
+(9 rows)
+
+select count(*) from tenk1 where (two, four) not in
+       (select hundred, thousand from tenk2 where thousand > 100);
+ count 
+-------
+ 10000
+(1 row)
+
+alter table tenk2 reset (parallel_workers);
 set force_parallel_mode=1;
 explain (costs off)
   select stringu1::int2 from tenk1 where unique1 = 1;
index 8b4090f2ec18ea7c94998cb3425da300069820b0..def9939d2e6a1d3ceb54e60cf0735b6be18df688 100644 (file)
@@ -39,6 +39,15 @@ explain (costs off)
        select  sum(parallel_restricted(unique1)) from tenk1
        group by(parallel_restricted(unique1));
 
+-- test parallel plans for queries containing un-correlated subplans.
+alter table tenk2 set (parallel_workers = 0);
+explain (costs off)
+       select count(*) from tenk1 where (two, four) not in
+       (select hundred, thousand from tenk2 where thousand > 100);
+select count(*) from tenk1 where (two, four) not in
+       (select hundred, thousand from tenk2 where thousand > 100);
+alter table tenk2 reset (parallel_workers);
+
 set force_parallel_mode=1;
 
 explain (costs off)