From 166b61a88ef8e9fb97eba7b7ab8062e214c93af8 Mon Sep 17 00:00:00 2001
From: Robert Haas
Date: Wed, 18 Nov 2015 12:35:25 -0500
Subject: [PATCH] Avoid aggregating worker instrumentation multiple times.

Amit Kapila, per design ideas from me.
---
 src/backend/executor/execParallel.c | 18 +++++++++++++-----
 src/backend/executor/nodeGather.c   |  6 +-----
 src/include/executor/execParallel.h |  3 ++-
 3 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index eae13c5647..6730037710 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -277,13 +277,15 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
 }
 
 /*
- * Re-initialize the response queues for backend workers to return tuples
- * to the main backend and start the workers.
+ * Re-initialize the parallel executor info such that it can be reused by
+ * workers.
  */
-shm_mq_handle **
-ExecParallelReinitializeTupleQueues(ParallelContext *pcxt)
+void
+ExecParallelReinitialize(ParallelExecutorInfo *pei)
 {
-	return ExecParallelSetupTupleQueues(pcxt, true);
+	ReinitializeParallelDSM(pei->pcxt);
+	pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
+	pei->finished = false;
 }
 
 /*
@@ -308,6 +310,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 
 	/* Allocate object for return value. */
 	pei = palloc0(sizeof(ParallelExecutorInfo));
+	pei->finished = false;
 	pei->planstate = planstate;
 
 	/* Fix up and serialize plan to be sent to workers. */
@@ -469,6 +472,9 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
 {
 	int			i;
 
+	if (pei->finished)
+		return;
+
 	/* First, wait for the workers to finish. */
 	WaitForParallelWorkersToFinish(pei->pcxt);
 
@@ -480,6 +486,8 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
 	if (pei->instrumentation)
 		ExecParallelRetrieveInstrumentation(pei->planstate,
 											pei->instrumentation);
+
+	pei->finished = true;
 }
 
 /*
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index b368b48d01..b6e82d1664 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -456,11 +456,7 @@ ExecReScanGather(GatherState *node)
 	node->initialized = false;
 
 	if (node->pei)
-	{
-		ReinitializeParallelDSM(node->pei->pcxt);
-		node->pei->tqueue =
-			ExecParallelReinitializeTupleQueues(node->pei->pcxt);
-	}
+		ExecParallelReinitialize(node->pei);
 
 	ExecReScan(node->ps.lefttree);
 }
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 23c29ebb90..b43af1dd2b 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -27,12 +27,13 @@ typedef struct ParallelExecutorInfo
 	BufferUsage *buffer_usage;
 	SharedExecutorInstrumentation *instrumentation;
 	shm_mq_handle **tqueue;
+	bool		finished;
 } ParallelExecutorInfo;
 
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
 					 EState *estate, int nworkers);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
-extern shm_mq_handle **ExecParallelReinitializeTupleQueues(ParallelContext *pcxt);
+extern void ExecParallelReinitialize(ParallelExecutorInfo *pei);
 
 #endif   /* EXECPARALLEL_H */
-- 
2.40.0
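
The core idea of the patch can be sketched as a standalone C program. This is
illustrative code only, not PostgreSQL's implementation; names such as
executor_state, executor_finish, and executor_reinitialize are made up. The
point it demonstrates is the same guard-flag pattern the patch adds: a
"finished" flag makes the finish step idempotent, so per-worker
instrumentation is folded into the totals at most once even if the finish
path is reached more than once, and the rescan path clears the flag so the
next run can aggregate again.

/*
 * Sketch of the idempotent-finish pattern used by the patch.
 * (Hypothetical names; compile with any C99 compiler.)
 */
#include <stdbool.h>
#include <stdio.h>

#define NUM_WORKERS 4

typedef struct executor_state
{
	int		worker_tuples[NUM_WORKERS];	/* per-worker instrumentation */
	long	total_tuples;				/* aggregated result in the leader */
	bool	finished;					/* has aggregation already run? */
} executor_state;

/* Analogous to ExecParallelReinitialize: reset so the plan can be rerun. */
static void
executor_reinitialize(executor_state *state)
{
	state->finished = false;
}

/* Analogous to ExecParallelFinish: aggregate per-worker counters once. */
static void
executor_finish(executor_state *state)
{
	if (state->finished)
		return;				/* already aggregated; avoid double-counting */

	for (int i = 0; i < NUM_WORKERS; i++)
		state->total_tuples += state->worker_tuples[i];

	state->finished = true;
}

int
main(void)
{
	executor_state state = {{10, 20, 30, 40}, 0, false};

	executor_finish(&state);
	executor_finish(&state);	/* second call is a no-op thanks to the flag */
	printf("total after two finish calls = %ld\n", state.total_tuples);

	/* A rescan would clear the flag before workers are launched again. */
	executor_reinitialize(&state);
	printf("finished after reinitialize = %d\n", (int) state.finished);

	return 0;
}

Without the flag, the second finish call would add the worker counters a
second time (reporting 200 instead of 100), which is the double-aggregation
of worker instrumentation the commit message refers to.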