Minor optimizations based on ParallelContext having nworkers_launched.
author Robert Haas <rhaas@postgresql.org>
Fri, 4 Mar 2016 17:59:10 +0000 (12:59 -0500)
committer Robert Haas <rhaas@postgresql.org>
Fri, 4 Mar 2016 17:59:10 +0000 (12:59 -0500)
Originally, we didn't have nworkers_launched, so code that used parallel
contexts had to be prepared for the possibility that not all of the
workers requested actually got launched.  But now we can count on knowing
the number of workers that were successfully launched, which can shave
off a few cycles and simplify some code slightly.

Amit Kapila, reviewed by Haribabu Kommi, per a suggestion from Peter
Geoghegan.
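
The change has the same shape at every call site: a loop bounded by the
number of workers requested (nworkers), guarded by a per-slot check for
workers that never started, becomes a loop bounded by the number of
workers actually launched (nworkers_launched), relying on launched
workers occupying the leading entries of pcxt->worker.  What follows is
a minimal, self-contained sketch of that pattern; the cut-down types
and use_worker() are illustrative stand-ins, not the real PostgreSQL
definitions.

#include <stdio.h>

/*
 * Cut-down stand-ins for ParallelContext and its per-worker state;
 * the field names follow the real structs, everything else is
 * simplified for illustration.
 */
typedef struct ParallelWorkerInfo
{
    void       *bgwhandle;      /* NULL if this worker never launched */
} ParallelWorkerInfo;

typedef struct ParallelContext
{
    int         nworkers;           /* workers requested */
    int         nworkers_launched;  /* workers successfully launched */
    ParallelWorkerInfo *worker;
} ParallelContext;

/* Hypothetical per-worker work, standing in for the real call sites. */
static void
use_worker(ParallelWorkerInfo *worker)
{
    printf("handling worker with handle %p\n", worker->bgwhandle);
}

/* Before: skip slots for workers that were requested but never launched. */
static void
loop_old_style(ParallelContext *pcxt)
{
    int         i;

    for (i = 0; i < pcxt->nworkers; ++i)
    {
        if (pcxt->worker[i].bgwhandle == NULL)
            continue;
        use_worker(&pcxt->worker[i]);
    }
}

/* After: the guard and any "did we get a worker?" bookkeeping disappear. */
static void
loop_new_style(ParallelContext *pcxt)
{
    int         i;

    for (i = 0; i < pcxt->nworkers_launched; ++i)
        use_worker(&pcxt->worker[i]);
}

int
main(void)
{
    ParallelWorkerInfo workers[] = {{(void *) 1}, {(void *) 2}, {NULL}, {NULL}};
    ParallelContext pcxt = {4, 2, workers};

    loop_old_style(&pcxt);      /* visits workers 0 and 1 */
    loop_new_style(&pcxt);      /* visits the same two workers, no guard */
    return 0;
}

In nodeGather.c this is also what lets the got_any_worker flag go away:
nworkers_launched > 0 answers the same question directly.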

src/backend/access/transam/parallel.c
src/backend/executor/execParallel.c
src/backend/executor/nodeGather.c

diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 4f91cd0265db041c671ab39ace93dd70b7a170ea..0bba9a7dbdaef765292fc528062315017586b859 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -520,7 +520,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
                 */
                CHECK_FOR_INTERRUPTS();
 
-               for (i = 0; i < pcxt->nworkers; ++i)
+               for (i = 0; i < pcxt->nworkers_launched; ++i)
                {
                        if (pcxt->worker[i].error_mqh != NULL)
                        {
@@ -560,7 +560,7 @@ WaitForParallelWorkersToExit(ParallelContext *pcxt)
        int                     i;
 
        /* Wait until the workers actually die. */
-       for (i = 0; i < pcxt->nworkers; ++i)
+       for (i = 0; i < pcxt->nworkers_launched; ++i)
        {
                BgwHandleStatus status;
 
@@ -610,7 +610,7 @@ DestroyParallelContext(ParallelContext *pcxt)
        /* Kill each worker in turn, and forget their error queues. */
        if (pcxt->worker != NULL)
        {
-               for (i = 0; i < pcxt->nworkers; ++i)
+               for (i = 0; i < pcxt->nworkers_launched; ++i)
                {
                        if (pcxt->worker[i].error_mqh != NULL)
                        {
@@ -708,7 +708,7 @@ HandleParallelMessages(void)
                if (pcxt->worker == NULL)
                        continue;
 
-               for (i = 0; i < pcxt->nworkers; ++i)
+               for (i = 0; i < pcxt->nworkers_launched; ++i)
                {
                        /*
                         * Read as many messages as we can from each worker, but stop when
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 95e8e41d2bbc86f8a744f2722fbf0921da322761..93c786abdbe4dc265abe1bc867480f555e086b59 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -522,7 +522,7 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
        WaitForParallelWorkersToFinish(pei->pcxt);
 
        /* Next, accumulate buffer usage. */
-       for (i = 0; i < pei->pcxt->nworkers; ++i)
+       for (i = 0; i < pei->pcxt->nworkers_launched; ++i)
                InstrAccumParallelQuery(&pei->buffer_usage[i]);
 
        /* Finally, accumulate instrumentation, if any. */
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 16c981b48b277a26b84040c59d736d04df54c205..3f0ed6963277bf5e8ef438580e2b4835c95c597d 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -153,7 +153,6 @@ ExecGather(GatherState *node)
                if (gather->num_workers > 0 && IsInParallelMode())
                {
                        ParallelContext *pcxt;
-                       bool    got_any_worker = false;
 
                        /* Initialize the workers required to execute Gather node. */
                        if (!node->pei)
@@ -169,29 +168,26 @@ ExecGather(GatherState *node)
                        LaunchParallelWorkers(pcxt);
 
                        /* Set up tuple queue readers to read the results. */
-                       if (pcxt->nworkers > 0)
+                       if (pcxt->nworkers_launched > 0)
                        {
                                node->nreaders = 0;
                                node->reader =
-                                       palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
+                                       palloc(pcxt->nworkers_launched * sizeof(TupleQueueReader *));
 
-                               for (i = 0; i < pcxt->nworkers; ++i)
+                               for (i = 0; i < pcxt->nworkers_launched; ++i)
                                {
-                                       if (pcxt->worker[i].bgwhandle == NULL)
-                                               continue;
-
                                        shm_mq_set_handle(node->pei->tqueue[i],
                                                                          pcxt->worker[i].bgwhandle);
                                        node->reader[node->nreaders++] =
                                                CreateTupleQueueReader(node->pei->tqueue[i],
                                                                                           fslot->tts_tupleDescriptor);
-                                       got_any_worker = true;
                                }
                        }
-
-                       /* No workers?  Then never mind. */
-                       if (!got_any_worker)
+                       else
+                       {
+                               /* No workers?  Then never mind. */
                                ExecShutdownGatherWorkers(node);
+                       }
                }
 
                /* Run plan locally if no workers or not single-copy. */