]> granicus.if.org Git - postgresql/commitdiff
Try again to fix accumulation of parallel worker instrumentation.
authorRobert Haas <rhaas@postgresql.org>
Tue, 19 Dec 2017 17:21:56 +0000 (12:21 -0500)
committerRobert Haas <rhaas@postgresql.org>
Tue, 19 Dec 2017 17:21:56 +0000 (12:21 -0500)
When a Gather or Gather Merge node is started and stopped multiple
times, accumulate instrumentation data only once, at the end, instead
of after each execution, to avoid recording inflated totals.

Commit 778e78ae9fa51e58f41cbdc72b293291d02d8984, the previous attempt
at a fix, instead reset the state after every execution, which worked
for the general instrumentation data but had problems for the additional
instrumentation specific to Sort and Hash nodes.

Report by hubert depesz lubaczewski.  Analysis and fix by Amit Kapila,
following a design proposal from Thomas Munro, with a comment tweak
by me.

Discussion: http://postgr.es/m/20171127175631.GA405@depesz.com

src/backend/executor/execParallel.c
src/backend/executor/nodeHash.c
src/backend/executor/nodeSort.c
src/include/executor/nodeHash.h
src/include/executor/nodeSort.h
src/test/regress/expected/select_parallel.out
src/test/regress/sql/select_parallel.sql

index 6b6064637b88a767741cc8efd67c87a8d2608296..02b5aa517b5f0c8a6bdbb1e5ab19bfb055396ec9 100644 (file)
@@ -899,12 +899,8 @@ ExecParallelReInitializeDSM(PlanState *planstate,
                                                                                          pcxt);
                        break;
                case T_HashState:
-                       /* even when not parallel-aware, for EXPLAIN ANALYZE */
-                       ExecHashReInitializeDSM((HashState *) planstate, pcxt);
-                       break;
                case T_SortState:
-                       /* even when not parallel-aware, for EXPLAIN ANALYZE */
-                       ExecSortReInitializeDSM((SortState *) planstate, pcxt);
+                       /* these nodes have DSM state, but no reinitialization is required */
                        break;
 
                default:
@@ -977,7 +973,7 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
 
 /*
  * Finish parallel execution.  We wait for parallel workers to finish, and
- * accumulate their buffer usage and instrumentation.
+ * accumulate their buffer usage.
  */
 void
 ExecParallelFinish(ParallelExecutorInfo *pei)
@@ -1023,23 +1019,23 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
        for (i = 0; i < nworkers; i++)
                InstrAccumParallelQuery(&pei->buffer_usage[i]);
 
-       /* Finally, accumulate instrumentation, if any. */
-       if (pei->instrumentation)
-               ExecParallelRetrieveInstrumentation(pei->planstate,
-                                                                                       pei->instrumentation);
-
        pei->finished = true;
 }
 
 /*
- * Clean up whatever ParallelExecutorInfo resources still exist after
- * ExecParallelFinish.  We separate these routines because someone might
- * want to examine the contents of the DSM after ExecParallelFinish and
- * before calling this routine.
+ * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
+ * resources still exist after ExecParallelFinish.  We separate these
+ * routines because someone might want to examine the contents of the DSM
+ * after ExecParallelFinish and before calling this routine.
  */
 void
 ExecParallelCleanup(ParallelExecutorInfo *pei)
 {
+       /* Accumulate instrumentation, if any. */
+       if (pei->instrumentation)
+               ExecParallelRetrieveInstrumentation(pei->planstate,
+                                                                                       pei->instrumentation);
+
        /* Free any serialized parameters. */
        if (DsaPointerIsValid(pei->param_exec))
        {
index 6fe5d69d5589086da7524d09dbba98242916aed1..afd7384e945f26f694db1fcbd18e270f6398f375 100644 (file)
@@ -1669,19 +1669,6 @@ ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
                                   node->shared_info);
 }
 
-/*
- * Reset shared state before beginning a fresh scan.
- */
-void
-ExecHashReInitializeDSM(HashState *node, ParallelContext *pcxt)
-{
-       if (node->shared_info != NULL)
-       {
-               memset(node->shared_info->hinstrument, 0,
-                          node->shared_info->num_workers * sizeof(HashInstrumentation));
-       }
-}
-
 /*
  * Locate the DSM space for hash table instrumentation data that we'll write
  * to at shutdown time.
index 73aa3715e6d56778a94ed1e5475dacd58c1e9f46..d593378f74fc2ee474b14a16ed1287ab737ea576 100644 (file)
@@ -396,23 +396,6 @@ ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt)
                                   node->shared_info);
 }
 
-/* ----------------------------------------------------------------
- *             ExecSortReInitializeDSM
- *
- *             Reset shared state before beginning a fresh scan.
- * ----------------------------------------------------------------
- */
-void
-ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt)
-{
-       /* If there's any instrumentation space, clear it for next time */
-       if (node->shared_info != NULL)
-       {
-               memset(node->shared_info->sinstrument, 0,
-                          node->shared_info->num_workers * sizeof(TuplesortInstrumentation));
-       }
-}
-
 /* ----------------------------------------------------------------
  *             ExecSortInitializeWorker
  *
index 75d4c70f6f6f15aeb451fe0ef65823088fe749fd..0974f1edc2163cf5accd07070b49df3237da332d 100644 (file)
@@ -52,7 +52,6 @@ extern int    ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
 extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
 extern void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt);
 extern void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt);
-extern void ExecHashReInitializeDSM(HashState *node, ParallelContext *pcxt);
 extern void ExecHashRetrieveInstrumentation(HashState *node);
 extern void ExecShutdownHash(HashState *node);
 extern void ExecHashGetInstrumentation(HashInstrumentation *instrument,
index cc61a9db6977fa67f2c817f0415e310eccdd673d..627a04c3fdc8e5d582e87628ea89b873ef773fba 100644 (file)
@@ -26,7 +26,6 @@ extern void ExecReScanSort(SortState *node);
 /* parallel instrumentation support */
 extern void ExecSortEstimate(SortState *node, ParallelContext *pcxt);
 extern void ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt);
-extern void ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt);
 extern void ExecSortInitializeWorker(SortState *node, ParallelWorkerContext *pwcxt);
 extern void ExecSortRetrieveInstrumentation(SortState *node);
 
index 86a55922c878f92ebd90615a225cd5475969ef74..7824ca52ca435553b92eb64969e79955503b171e 100644 (file)
@@ -465,14 +465,71 @@ select count(*) from bmscantest where a>1;
  99999
 (1 row)
 
+-- test accumulation of stats for parallel nodes
 reset enable_seqscan;
+alter table tenk2 set (parallel_workers = 0);
+explain (analyze, timing off, summary off, costs off)
+   select count(*) from tenk1, tenk2 where tenk1.hundred > 1
+        and tenk2.thousand=0;
+                                QUERY PLAN                                
+--------------------------------------------------------------------------
+ Aggregate (actual rows=1 loops=1)
+   ->  Nested Loop (actual rows=98000 loops=1)
+         ->  Seq Scan on tenk2 (actual rows=10 loops=1)
+               Filter: (thousand = 0)
+               Rows Removed by Filter: 9990
+         ->  Gather (actual rows=9800 loops=10)
+               Workers Planned: 4
+               Workers Launched: 4
+               ->  Parallel Seq Scan on tenk1 (actual rows=1960 loops=50)
+                     Filter: (hundred > 1)
+                     Rows Removed by Filter: 40
+(11 rows)
+
+alter table tenk2 reset (parallel_workers);
+reset work_mem;
+create function explain_parallel_sort_stats() returns setof text
+language plpgsql as
+$$
+declare ln text;
+begin
+    for ln in
+        explain (analyze, timing off, summary off, costs off)
+          select * from
+          (select ten from tenk1 where ten < 100 order by ten) ss
+          right join (values (1),(2),(3)) v(x) on true
+    loop
+        ln := regexp_replace(ln, 'Memory: \S*',  'Memory: xxx');
+        return next ln;
+    end loop;
+end;
+$$;
+select * from explain_parallel_sort_stats();
+                       explain_parallel_sort_stats                        
+--------------------------------------------------------------------------
+ Nested Loop Left Join (actual rows=30000 loops=1)
+   ->  Values Scan on "*VALUES*" (actual rows=3 loops=1)
+   ->  Gather Merge (actual rows=10000 loops=3)
+         Workers Planned: 4
+         Workers Launched: 4
+         ->  Sort (actual rows=2000 loops=15)
+               Sort Key: tenk1.ten
+               Sort Method: quicksort  Memory: xxx
+               Worker 0:  Sort Method: quicksort  Memory: xxx
+               Worker 1:  Sort Method: quicksort  Memory: xxx
+               Worker 2:  Sort Method: quicksort  Memory: xxx
+               Worker 3:  Sort Method: quicksort  Memory: xxx
+               ->  Parallel Seq Scan on tenk1 (actual rows=2000 loops=15)
+                     Filter: (ten < 100)
+(14 rows)
+
 reset enable_indexscan;
 reset enable_hashjoin;
 reset enable_mergejoin;
 reset enable_material;
 reset effective_io_concurrency;
-reset work_mem;
 drop table bmscantest;
+drop function explain_parallel_sort_stats();
 -- test parallel merge join path.
 set enable_hashjoin to off;
 set enable_nestloop to off;
index fb35ca33769e79262eb1da68831ec7c3d0e17067..b12ba0b74a075f4380a8ded67a4c8865c52b88da 100644 (file)
@@ -179,14 +179,40 @@ insert into bmscantest select r, 'fooooooooooooooooooooooooooooooooooooooooooooo
 create index i_bmtest ON bmscantest(a);
 select count(*) from bmscantest where a>1;
 
+-- test accumulation of stats for parallel nodes
 reset enable_seqscan;
+alter table tenk2 set (parallel_workers = 0);
+explain (analyze, timing off, summary off, costs off)
+   select count(*) from tenk1, tenk2 where tenk1.hundred > 1
+        and tenk2.thousand=0;
+alter table tenk2 reset (parallel_workers);
+
+reset work_mem;
+create function explain_parallel_sort_stats() returns setof text
+language plpgsql as
+$$
+declare ln text;
+begin
+    for ln in
+        explain (analyze, timing off, summary off, costs off)
+          select * from
+          (select ten from tenk1 where ten < 100 order by ten) ss
+          right join (values (1),(2),(3)) v(x) on true
+    loop
+        ln := regexp_replace(ln, 'Memory: \S*',  'Memory: xxx');
+        return next ln;
+    end loop;
+end;
+$$;
+select * from explain_parallel_sort_stats();
+
 reset enable_indexscan;
 reset enable_hashjoin;
 reset enable_mergejoin;
 reset enable_material;
 reset effective_io_concurrency;
-reset work_mem;
 drop table bmscantest;
+drop function explain_parallel_sort_stats();
 
 -- test parallel merge join path.
 set enable_hashjoin to off;