From: Robert Haas Date: Tue, 19 Dec 2017 17:21:56 +0000 (-0500) Subject: Try again to fix accumulation of parallel worker instrumentation. X-Git-Tag: REL_11_BETA1~1050 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=8526bcb2df76d5171b4f4d6dc7a97560a73a5eff;p=postgresql Try again to fix accumulation of parallel worker instrumentation. When a Gather or Gather Merge node is started and stopped multiple times, accumulate instrumentation data only once, at the end, instead of after each execution, to avoid recording inflated totals. Commit 778e78ae9fa51e58f41cbdc72b293291d02d8984, the previous attempt at a fix, instead reset the state after every execution, which worked for the general instrumentation data but had problems for the additional instrumentation specific to Sort and Hash nodes. Report by hubert depesz lubaczewski. Analysis and fix by Amit Kapila, following a design proposal from Thomas Munro, with a comment tweak by me. Discussion: http://postgr.es/m/20171127175631.GA405@depesz.com --- diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 6b6064637b..02b5aa517b 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -899,12 +899,8 @@ ExecParallelReInitializeDSM(PlanState *planstate, pcxt); break; case T_HashState: - /* even when not parallel-aware, for EXPLAIN ANALYZE */ - ExecHashReInitializeDSM((HashState *) planstate, pcxt); - break; case T_SortState: - /* even when not parallel-aware, for EXPLAIN ANALYZE */ - ExecSortReInitializeDSM((SortState *) planstate, pcxt); + /* these nodes have DSM state, but no reinitialization is required */ break; default: @@ -977,7 +973,7 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate, /* * Finish parallel execution. We wait for parallel workers to finish, and - * accumulate their buffer usage and instrumentation. + * accumulate their buffer usage. */ void ExecParallelFinish(ParallelExecutorInfo *pei) @@ -1023,23 +1019,23 @@ ExecParallelFinish(ParallelExecutorInfo *pei) for (i = 0; i < nworkers; i++) InstrAccumParallelQuery(&pei->buffer_usage[i]); - /* Finally, accumulate instrumentation, if any. */ - if (pei->instrumentation) - ExecParallelRetrieveInstrumentation(pei->planstate, - pei->instrumentation); - pei->finished = true; } /* - * Clean up whatever ParallelExecutorInfo resources still exist after - * ExecParallelFinish. We separate these routines because someone might - * want to examine the contents of the DSM after ExecParallelFinish and - * before calling this routine. + * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo + * resources still exist after ExecParallelFinish. We separate these + * routines because someone might want to examine the contents of the DSM + * after ExecParallelFinish and before calling this routine. */ void ExecParallelCleanup(ParallelExecutorInfo *pei) { + /* Accumulate instrumentation, if any. */ + if (pei->instrumentation) + ExecParallelRetrieveInstrumentation(pei->planstate, + pei->instrumentation); + /* Free any serialized parameters. */ if (DsaPointerIsValid(pei->param_exec)) { diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 6fe5d69d55..afd7384e94 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -1669,19 +1669,6 @@ ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt) node->shared_info); } -/* - * Reset shared state before beginning a fresh scan. - */ -void -ExecHashReInitializeDSM(HashState *node, ParallelContext *pcxt) -{ - if (node->shared_info != NULL) - { - memset(node->shared_info->hinstrument, 0, - node->shared_info->num_workers * sizeof(HashInstrumentation)); - } -} - /* * Locate the DSM space for hash table instrumentation data that we'll write * to at shutdown time. diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c index 73aa3715e6..d593378f74 100644 --- a/src/backend/executor/nodeSort.c +++ b/src/backend/executor/nodeSort.c @@ -396,23 +396,6 @@ ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt) node->shared_info); } -/* ---------------------------------------------------------------- - * ExecSortReInitializeDSM - * - * Reset shared state before beginning a fresh scan. - * ---------------------------------------------------------------- - */ -void -ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt) -{ - /* If there's any instrumentation space, clear it for next time */ - if (node->shared_info != NULL) - { - memset(node->shared_info->sinstrument, 0, - node->shared_info->num_workers * sizeof(TuplesortInstrumentation)); - } -} - /* ---------------------------------------------------------------- * ExecSortInitializeWorker * diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index 75d4c70f6f..0974f1edc2 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -52,7 +52,6 @@ extern int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue); extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt); extern void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt); extern void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt); -extern void ExecHashReInitializeDSM(HashState *node, ParallelContext *pcxt); extern void ExecHashRetrieveInstrumentation(HashState *node); extern void ExecShutdownHash(HashState *node); extern void ExecHashGetInstrumentation(HashInstrumentation *instrument, diff --git a/src/include/executor/nodeSort.h b/src/include/executor/nodeSort.h index cc61a9db69..627a04c3fd 100644 --- a/src/include/executor/nodeSort.h +++ b/src/include/executor/nodeSort.h @@ -26,7 +26,6 @@ extern void ExecReScanSort(SortState *node); /* parallel instrumentation support */ extern void ExecSortEstimate(SortState *node, ParallelContext *pcxt); extern void ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt); -extern void ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt); extern void ExecSortInitializeWorker(SortState *node, ParallelWorkerContext *pwcxt); extern void ExecSortRetrieveInstrumentation(SortState *node); diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out index 86a55922c8..7824ca52ca 100644 --- a/src/test/regress/expected/select_parallel.out +++ b/src/test/regress/expected/select_parallel.out @@ -465,14 +465,71 @@ select count(*) from bmscantest where a>1; 99999 (1 row) +-- test accumulation of stats for parallel nodes reset enable_seqscan; +alter table tenk2 set (parallel_workers = 0); +explain (analyze, timing off, summary off, costs off) + select count(*) from tenk1, tenk2 where tenk1.hundred > 1 + and tenk2.thousand=0; + QUERY PLAN +-------------------------------------------------------------------------- + Aggregate (actual rows=1 loops=1) + -> Nested Loop (actual rows=98000 loops=1) + -> Seq Scan on tenk2 (actual rows=10 loops=1) + Filter: (thousand = 0) + Rows Removed by Filter: 9990 + -> Gather (actual rows=9800 loops=10) + Workers Planned: 4 + Workers Launched: 4 + -> Parallel Seq Scan on tenk1 (actual rows=1960 loops=50) + Filter: (hundred > 1) + Rows Removed by Filter: 40 +(11 rows) + +alter table tenk2 reset (parallel_workers); +reset work_mem; +create function explain_parallel_sort_stats() returns setof text +language plpgsql as +$$ +declare ln text; +begin + for ln in + explain (analyze, timing off, summary off, costs off) + select * from + (select ten from tenk1 where ten < 100 order by ten) ss + right join (values (1),(2),(3)) v(x) on true + loop + ln := regexp_replace(ln, 'Memory: \S*', 'Memory: xxx'); + return next ln; + end loop; +end; +$$; +select * from explain_parallel_sort_stats(); + explain_parallel_sort_stats +-------------------------------------------------------------------------- + Nested Loop Left Join (actual rows=30000 loops=1) + -> Values Scan on "*VALUES*" (actual rows=3 loops=1) + -> Gather Merge (actual rows=10000 loops=3) + Workers Planned: 4 + Workers Launched: 4 + -> Sort (actual rows=2000 loops=15) + Sort Key: tenk1.ten + Sort Method: quicksort Memory: xxx + Worker 0: Sort Method: quicksort Memory: xxx + Worker 1: Sort Method: quicksort Memory: xxx + Worker 2: Sort Method: quicksort Memory: xxx + Worker 3: Sort Method: quicksort Memory: xxx + -> Parallel Seq Scan on tenk1 (actual rows=2000 loops=15) + Filter: (ten < 100) +(14 rows) + reset enable_indexscan; reset enable_hashjoin; reset enable_mergejoin; reset enable_material; reset effective_io_concurrency; -reset work_mem; drop table bmscantest; +drop function explain_parallel_sort_stats(); -- test parallel merge join path. set enable_hashjoin to off; set enable_nestloop to off; diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql index fb35ca3376..b12ba0b74a 100644 --- a/src/test/regress/sql/select_parallel.sql +++ b/src/test/regress/sql/select_parallel.sql @@ -179,14 +179,40 @@ insert into bmscantest select r, 'fooooooooooooooooooooooooooooooooooooooooooooo create index i_bmtest ON bmscantest(a); select count(*) from bmscantest where a>1; +-- test accumulation of stats for parallel nodes reset enable_seqscan; +alter table tenk2 set (parallel_workers = 0); +explain (analyze, timing off, summary off, costs off) + select count(*) from tenk1, tenk2 where tenk1.hundred > 1 + and tenk2.thousand=0; +alter table tenk2 reset (parallel_workers); + +reset work_mem; +create function explain_parallel_sort_stats() returns setof text +language plpgsql as +$$ +declare ln text; +begin + for ln in + explain (analyze, timing off, summary off, costs off) + select * from + (select ten from tenk1 where ten < 100 order by ten) ss + right join (values (1),(2),(3)) v(x) on true + loop + ln := regexp_replace(ln, 'Memory: \S*', 'Memory: xxx'); + return next ln; + end loop; +end; +$$; +select * from explain_parallel_sort_stats(); + reset enable_indexscan; reset enable_hashjoin; reset enable_mergejoin; reset enable_material; reset effective_io_concurrency; -reset work_mem; drop table bmscantest; +drop function explain_parallel_sort_stats(); -- test parallel merge join path. set enable_hashjoin to off;