Add parallel_leader_participation GUC.
author    Robert Haas <rhaas@postgresql.org>
          Wed, 15 Nov 2017 13:17:29 +0000 (08:17 -0500)
committer Robert Haas <rhaas@postgresql.org>
          Wed, 15 Nov 2017 13:23:18 +0000 (08:23 -0500)
Sometimes, for testing, it's useful to have the leader do nothing but
read tuples from workers; and it's possible that this could work out
better even in production.

Thomas Munro, reviewed by Amit Kapila and by me.  A few final tweaks
by me.

Discussion: http://postgr.es/m/CAEepm=2U++Lp3bNTv2Bv_kkr5NE2pOyHhxU=G0YTa4ZhSYhHiw@mail.gmail.com
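
A minimal session sketch of the testing scenario described above (an
illustration, not part of the commit; tenk1 is the regression-test
table exercised by the patch below):

    SET parallel_leader_participation = off;
    EXPLAIN (COSTS OFF) SELECT count(*) FROM tenk1;  -- leader only reads from workers
    RESET parallel_leader_participation;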

doc/src/sgml/config.sgml
src/backend/executor/nodeGather.c
src/backend/executor/nodeGatherMerge.c
src/backend/optimizer/path/costsize.c
src/backend/optimizer/plan/planner.c
src/backend/utils/misc/guc.c
src/backend/utils/misc/postgresql.conf.sample
src/include/optimizer/planmain.h
src/test/regress/expected/select_parallel.out
src/test/regress/sql/select_parallel.sql

index 996e82534abae6d9c9d4e41ec3a8ead7b5e39991..fc1752fb3f1ffd4bef6971116038dc41f4b9acf5 100644 (file)
@@ -4265,6 +4265,32 @@ SELECT * FROM parent WHERE key = 2400;
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-parallel-leader-participation" xreflabel="parallel_leader_participation">
+      <term>
+       <varname>parallel_leader_participation</varname> (<type>boolean</type>)
+       <indexterm>
+        <primary>
+         <varname>parallel_leader_participation</varname> configuration
+         parameter
+        </primary>
+       </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Allows the leader process to execute the query plan under
+        <literal>Gather</literal> and <literal>Gather Merge</literal> nodes
+        instead of waiting for worker processes.  The default is
+        <literal>on</literal>.  Setting this value to <literal>off</literal>
+        reduces the likelihood that workers will become blocked because the
+        leader is not reading tuples fast enough, but requires the leader
+        process to wait for worker processes to start up before the first
+        tuples can be produced.  The degree to which the leader can help or
+        hinder performance depends on the plan type, number of workers and
+        query duration.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-force-parallel-mode" xreflabel="force_parallel_mode">
       <term><varname>force_parallel_mode</varname> (<type>enum</type>)
       <indexterm>
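
As a rough, hedged way to observe the trade-off documented above, one
can compare timings with the setting on and off (illustrative sketch;
tenk1 is the regression-test table used later in this patch):

    SET max_parallel_workers_per_gather = 2;
    SET parallel_leader_participation = on;
    EXPLAIN (ANALYZE, COSTS OFF) SELECT count(*) FROM tenk1;
    SET parallel_leader_participation = off;
    EXPLAIN (ANALYZE, COSTS OFF) SELECT count(*) FROM tenk1;
    RESET parallel_leader_participation;
    RESET max_parallel_workers_per_gather;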
index 639f4f5af88f24ce6cb586d63cc59dca2ede1660..0298c65d06502e1254c52618e0d758f63758c535 100644 (file)
@@ -38,6 +38,7 @@
 #include "executor/nodeSubplan.h"
 #include "executor/tqueue.h"
 #include "miscadmin.h"
+#include "optimizer/planmain.h"
 #include "pgstat.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
@@ -73,7 +74,8 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
        gatherstate->ps.ExecProcNode = ExecGather;
 
        gatherstate->initialized = false;
-       gatherstate->need_to_scan_locally = !node->single_copy;
+       gatherstate->need_to_scan_locally =
+               !node->single_copy && parallel_leader_participation;
        gatherstate->tuples_needed = -1;
 
        /*
@@ -193,9 +195,9 @@ ExecGather(PlanState *pstate)
                        node->nextreader = 0;
                }
 
-               /* Run plan locally if no workers or not single-copy. */
+               /* Run plan locally if no workers, or if enabled and not single-copy. */
                node->need_to_scan_locally = (node->nreaders == 0)
-                       || !gather->single_copy;
+                       || (!gather->single_copy && parallel_leader_participation);
                node->initialized = true;
        }
 
index 5625b12521004291c75b7814c03cbc5b0925503b..7206ab919758c7fc05133ad60f183a052c998c3f 100644 (file)
@@ -23,6 +23,7 @@
 #include "executor/tqueue.h"
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
+#include "optimizer/planmain.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
@@ -233,8 +234,9 @@ ExecGatherMerge(PlanState *pstate)
                        }
                }
 
-               /* always allow leader to participate */
-               node->need_to_scan_locally = true;
+               /* allow leader to participate if enabled, or if there's no choice */
+               if (parallel_leader_participation || node->nreaders == 0)
+                       node->need_to_scan_locally = true;
                node->initialized = true;
        }
 
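
Note that both executor nodes keep a fallback: if no workers could be
launched (nreaders == 0), the leader runs the plan regardless of the
setting.  A sketch of exercising that path, mirroring the regression
tests added below:

    SET parallel_leader_participation = off;
    SET max_parallel_workers = 0;   -- no workers can be launched
    SELECT count(*) FROM tenk1;     -- the leader executes the plan anyway
    RESET max_parallel_workers;
    RESET parallel_leader_participation;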
index 2d2df60886aca95e7d16bdf3d49c2136f95bef76..d11bf19e30aa685784df65cead38f3c42f98639b 100644 (file)
@@ -5137,7 +5137,6 @@ static double
 get_parallel_divisor(Path *path)
 {
        double          parallel_divisor = path->parallel_workers;
-       double          leader_contribution;
 
        /*
         * Early experience with parallel query suggests that when there is only
@@ -5150,9 +5149,14 @@ get_parallel_divisor(Path *path)
         * its time servicing each worker, and the remainder executing the
         * parallel plan.
         */
-       leader_contribution = 1.0 - (0.3 * path->parallel_workers);
-       if (leader_contribution > 0)
-               parallel_divisor += leader_contribution;
+       if (parallel_leader_participation)
+       {
+               double          leader_contribution;
+
+               leader_contribution = 1.0 - (0.3 * path->parallel_workers);
+               if (leader_contribution > 0)
+                       parallel_divisor += leader_contribution;
+       }
 
        return parallel_divisor;
 }
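
A worked example of the divisor arithmetic above (plain SQL, purely
illustrative): with leader participation, the leader is modeled as
contributing 1.0 - 0.3 * workers of a full worker, clamped at zero, so
its contribution vanishes once four or more workers are planned:

    SELECT 2 + greatest(0, 1.0 - 0.3 * 2) AS divisor_2_workers_leader_on,   -- 2.4
           4 + greatest(0, 1.0 - 0.3 * 4) AS divisor_4_workers_leader_on,   -- 4.0
           2.0                            AS divisor_2_workers_leader_off;  -- just the worker count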
index 90fd9cc9598a8fd36c218174de9f8aa7c59a870c..4c00a1453bd07542685485997914bd1de7f6e335 100644 (file)
@@ -61,6 +61,7 @@
 /* GUC parameters */
 double         cursor_tuple_fraction = DEFAULT_CURSOR_TUPLE_FRACTION;
 int                    force_parallel_mode = FORCE_PARALLEL_OFF;
+bool           parallel_leader_participation = true;
 
 /* Hook for plugins to get control in planner() */
 planner_hook_type planner_hook = NULL;
index c4c1afa084ba2b99c345434c37ad2f8041d665b2..6dcd738be649f44c9ff2260e0e35705d0a0cd8a9 100644 (file)
@@ -1676,6 +1676,16 @@ static struct config_bool ConfigureNamesBool[] =
                NULL, NULL, NULL
        },
 
+       {
+               {"parallel_leader_participation", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
+                       gettext_noop("Controls whether Gather and Gather Merge also run subplans."),
+                       gettext_noop("Should gather nodes also run subplans, or just gather tuples?")
+               },
+               &parallel_leader_participation,
+               true,
+               NULL, NULL, NULL
+       },
+
        /* End-of-list marker */
        {
                {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
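
Since the new entry is registered PGC_USERSET, any session may change
it.  A quick illustrative check that the GUC is in place:

    SELECT name, setting, context
      FROM pg_settings
     WHERE name = 'parallel_leader_participation';
    -- expected: parallel_leader_participation | on | user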
index 368b280c8af7faf1ec8fca71a2891adab485683a..c7cd72ade2fa96e5c5b6b1e3cdeb3f37c0af28dd 100644 (file)
 #effective_io_concurrency = 1          # 1-1000; 0 disables prefetching
 #max_worker_processes = 8              # (change requires restart)
 #max_parallel_workers_per_gather = 2   # taken from max_parallel_workers
+#parallel_leader_participation = on
 #max_parallel_workers = 8              # maximum number of max_worker_processes that
                                        # can be used in parallel queries
 #old_snapshot_threshold = -1           # 1min-60d; -1 disables; 0 is immediate
index f1d16cffab07ee1584ba9246b125d13cefa2a9ad..d6133228bdde56a6242c500eea52deedbc5ca020 100644 (file)
@@ -29,6 +29,7 @@ typedef enum
 #define DEFAULT_CURSOR_TUPLE_FRACTION 0.1
 extern double cursor_tuple_fraction;
 extern int     force_parallel_mode;
+extern bool parallel_leader_participation;
 
 /* query_planner callback to compute query_pathkeys */
 typedef void (*query_pathkeys_callback) (PlannerInfo *root, void *extra);
index 63ed6a33c15f286d6e3211bc61389a6cb099cdd0..06aeddd80552d0ff66a351294766458cbe013b5d 100644 (file)
@@ -34,6 +34,49 @@ select count(*) from a_star;
     50
 (1 row)
 
+-- test with leader participation disabled
+set parallel_leader_participation = off;
+explain (costs off)
+  select count(*) from tenk1 where stringu1 = 'GRAAAA';
+                       QUERY PLAN                        
+---------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 4
+         ->  Partial Aggregate
+               ->  Parallel Seq Scan on tenk1
+                     Filter: (stringu1 = 'GRAAAA'::name)
+(6 rows)
+
+select count(*) from tenk1 where stringu1 = 'GRAAAA';
+ count 
+-------
+    15
+(1 row)
+
+-- test with leader participation disabled, but no workers available (so
+-- the leader will have to run the plan despite the setting)
+set max_parallel_workers = 0;
+explain (costs off)
+  select count(*) from tenk1 where stringu1 = 'GRAAAA';
+                       QUERY PLAN                        
+---------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 4
+         ->  Partial Aggregate
+               ->  Parallel Seq Scan on tenk1
+                     Filter: (stringu1 = 'GRAAAA'::name)
+(6 rows)
+
+select count(*) from tenk1 where stringu1 = 'GRAAAA';
+ count 
+-------
+    15
+(1 row)
+
+reset max_parallel_workers;
+reset parallel_leader_participation;
 -- test that parallel_restricted function doesn't run in worker
 alter table tenk1 set (parallel_workers = 4);
 explain (verbose, costs off)
@@ -400,6 +443,49 @@ explain (costs off, verbose)
 (11 rows)
 
 drop function simple_func(integer);
+-- test gather merge with parallel leader participation disabled
+set parallel_leader_participation = off;
+explain (costs off)
+   select count(*) from tenk1 group by twenty;
+                     QUERY PLAN                     
+----------------------------------------------------
+ Finalize GroupAggregate
+   Group Key: twenty
+   ->  Gather Merge
+         Workers Planned: 4
+         ->  Partial GroupAggregate
+               Group Key: twenty
+               ->  Sort
+                     Sort Key: twenty
+                     ->  Parallel Seq Scan on tenk1
+(9 rows)
+
+select count(*) from tenk1 group by twenty;
+ count 
+-------
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+(20 rows)
+
+reset parallel_leader_participation;
 --test rescan behavior of gather merge
 set enable_material = false;
 explain (costs off)
@@ -508,6 +594,33 @@ select string4 from tenk1 order by string4 limit 5;
  AAAAxx
 (5 rows)
 
+-- gather merge test with 0 workers, with parallel leader
+-- participation disabled (the leader will have to run the plan
+-- despite the setting)
+set parallel_leader_participation = off;
+explain (costs off)
+   select string4 from tenk1 order by string4 limit 5;
+                  QUERY PLAN                  
+----------------------------------------------
+ Limit
+   ->  Gather Merge
+         Workers Planned: 4
+         ->  Sort
+               Sort Key: string4
+               ->  Parallel Seq Scan on tenk1
+(6 rows)
+
+select string4 from tenk1 order by string4 limit 5;
+ string4 
+---------
+ AAAAxx
+ AAAAxx
+ AAAAxx
+ AAAAxx
+ AAAAxx
+(5 rows)
+
+reset parallel_leader_participation;
 reset max_parallel_workers;
 SAVEPOINT settings;
 SET LOCAL force_parallel_mode = 1;
index 1bd2821083d08534a38921fd61bcf26d6c358123..b701b35408e45b6b856c2af781380bcfd4526f63 100644 (file)
@@ -19,6 +19,22 @@ explain (costs off)
   select count(*) from a_star;
 select count(*) from a_star;
 
+-- test with leader participation disabled
+set parallel_leader_participation = off;
+explain (costs off)
+  select count(*) from tenk1 where stringu1 = 'GRAAAA';
+select count(*) from tenk1 where stringu1 = 'GRAAAA';
+
+-- test with leader participation disabled, but no workers available (so
+-- the leader will have to run the plan despite the setting)
+set max_parallel_workers = 0;
+explain (costs off)
+  select count(*) from tenk1 where stringu1 = 'GRAAAA';
+select count(*) from tenk1 where stringu1 = 'GRAAAA';
+
+reset max_parallel_workers;
+reset parallel_leader_participation;
+
 -- test that parallel_restricted function doesn't run in worker
 alter table tenk1 set (parallel_workers = 4);
 explain (verbose, costs off)
@@ -157,6 +173,16 @@ explain (costs off, verbose)
 
 drop function simple_func(integer);
 
+-- test gather merge with parallel leader participation disabled
+set parallel_leader_participation = off;
+
+explain (costs off)
+   select count(*) from tenk1 group by twenty;
+
+select count(*) from tenk1 group by twenty;
+
+reset parallel_leader_participation;
+
 --test rescan behavior of gather merge
 set enable_material = false;
 
@@ -192,6 +218,16 @@ set max_parallel_workers = 0;
 explain (costs off)
    select string4 from tenk1 order by string4 limit 5;
 select string4 from tenk1 order by string4 limit 5;
+
+-- gather merge test with 0 workers, with parallel leader
+-- participation disabled (the leader will have to run the plan
+-- despite the setting)
+set parallel_leader_participation = off;
+explain (costs off)
+   select string4 from tenk1 order by string4 limit 5;
+select string4 from tenk1 order by string4 limit 5;
+
+reset parallel_leader_participation;
 reset max_parallel_workers;
 
 SAVEPOINT settings;