]> granicus.if.org Git - postgresql/commitdiff
Fix accumulation of parallel worker instrumentation.
authorRobert Haas <rhaas@postgresql.org>
Tue, 5 Dec 2017 19:35:33 +0000 (14:35 -0500)
committerRobert Haas <rhaas@postgresql.org>
Tue, 5 Dec 2017 19:35:33 +0000 (14:35 -0500)
When a Gather or Gather Merge node is started and stopped multiple
times, the old code wouldn't reset the shared state between executions,
potentially resulting in dramatically inflated instrumentation data
for nodes beneath it.  (The per-worker instrumentation ended up OK,
I think, but the overall totals were inflated.)

Report by hubert depesz lubaczewski.  Analysis and fix by Amit Kapila,
reviewed and tweaked a bit by me.

Discussion: http://postgr.es/m/20171127175631.GA405@depesz.com

src/backend/executor/execParallel.c
src/test/regress/expected/select_parallel.out
src/test/regress/sql/select_parallel.sql

index 0aca00b0e68ca2608ed9bff4ed79e0355fde0165..ff5cff59b0f02b4e375c8b8c564e9411f6961ac9 100644 (file)
@@ -808,6 +808,19 @@ ExecParallelReinitialize(PlanState *planstate,
        /* Old workers must already be shut down */
        Assert(pei->finished);
 
+       /* Clear the instrumentation space from the last round. */
+       if (pei->instrumentation)
+       {
+               Instrumentation *instrument;
+               SharedExecutorInstrumentation *sh_instr;
+               int                     i;
+
+               sh_instr = pei->instrumentation;
+               instrument = GetInstrumentationArray(sh_instr);
+               for (i = 0; i < sh_instr->num_workers * sh_instr->num_plan_nodes; ++i)
+                       InstrInit(&instrument[i], pei->planstate->state->es_instrument);
+       }
+
        /* Force parameters we're going to pass to workers to be evaluated. */
        ExecEvalParamExecParams(sendParams, estate);
 
@@ -925,21 +938,33 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
        for (n = 0; n < instrumentation->num_workers; ++n)
                InstrAggNode(planstate->instrument, &instrument[n]);
 
-       /*
-        * Also store the per-worker detail.
-        *
-        * Worker instrumentation should be allocated in the same context as the
-        * regular instrumentation information, which is the per-query context.
-        * Switch into per-query memory context.
-        */
-       oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
-       ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
-       planstate->worker_instrument =
-               palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
-       MemoryContextSwitchTo(oldcontext);
+       if (!planstate->worker_instrument)
+       {
+               /*
+                * Allocate space for the per-worker detail.
+                *
+                * Worker instrumentation should be allocated in the same context as
+                * the regular instrumentation information, which is the per-query
+                * context. Switch into per-query memory context.
+                */
+               oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
+               ibytes =
+                       mul_size(instrumentation->num_workers, sizeof(Instrumentation));
+               planstate->worker_instrument =
+                       palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
+               MemoryContextSwitchTo(oldcontext);
+
+               for (n = 0; n < instrumentation->num_workers; ++n)
+                       InstrInit(&planstate->worker_instrument->instrument[n],
+                                         planstate->state->es_instrument);
+       }
 
        planstate->worker_instrument->num_workers = instrumentation->num_workers;
-       memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
+
+       /* Accumulate the per-worker detail. */
+       for (n = 0; n < instrumentation->num_workers; ++n)
+               InstrAggNode(&planstate->worker_instrument->instrument[n],
+                                        &instrument[n]);
 
        /* Perform any node-type-specific work that needs to be done. */
        switch (nodeTag(planstate))
index d1d5b228ce0996b93b9716738c44358ad3e1cd59..b748c98c9156a749c5b78b5db874f0f9ad01a484 100644 (file)
@@ -378,7 +378,28 @@ select count(*) from bmscantest where a>1;
  99999
 (1 row)
 
+-- test accumulation of stats for parallel node
 reset enable_seqscan;
+alter table tenk2 set (parallel_workers = 0);
+explain (analyze, timing off, summary off, costs off)
+       select count(*) from tenk1, tenk2 where tenk1.hundred > 1
+        and tenk2.thousand=0;
+                                QUERY PLAN                                
+--------------------------------------------------------------------------
+ Aggregate (actual rows=1 loops=1)
+   ->  Nested Loop (actual rows=98000 loops=1)
+         ->  Seq Scan on tenk2 (actual rows=10 loops=1)
+               Filter: (thousand = 0)
+               Rows Removed by Filter: 9990
+         ->  Gather (actual rows=9800 loops=10)
+               Workers Planned: 4
+               Workers Launched: 4
+               ->  Parallel Seq Scan on tenk1 (actual rows=1960 loops=50)
+                     Filter: (hundred > 1)
+                     Rows Removed by Filter: 40
+(11 rows)
+
+alter table tenk2 reset (parallel_workers);
 reset enable_indexscan;
 reset enable_hashjoin;
 reset enable_mergejoin;
index bb4e34adbe0f1bfbafbfca0834bdae1e3e7dd222..00df92779c6b52907e0784e449c74074b191af86 100644 (file)
@@ -149,7 +149,14 @@ insert into bmscantest select r, 'fooooooooooooooooooooooooooooooooooooooooooooo
 create index i_bmtest ON bmscantest(a);
 select count(*) from bmscantest where a>1;
 
+-- test accumulation of stats for parallel node
 reset enable_seqscan;
+alter table tenk2 set (parallel_workers = 0);
+explain (analyze, timing off, summary off, costs off)
+       select count(*) from tenk1, tenk2 where tenk1.hundred > 1
+        and tenk2.thousand=0;
+alter table tenk2 reset (parallel_workers);
+
 reset enable_indexscan;
 reset enable_hashjoin;
 reset enable_mergejoin;