]> granicus.if.org Git - postgresql/commitdiff
Allow a parallel context to relaunch workers.
authorRobert Haas <rhaas@postgresql.org>
Fri, 16 Oct 2015 21:18:05 +0000 (17:18 -0400)
committerRobert Haas <rhaas@postgresql.org>
Fri, 16 Oct 2015 21:18:05 +0000 (17:18 -0400)
This may allow some callers to avoid the overhead involved in tearing
down a parallel context and then setting up a new one, which means
releasing the DSM and then allocating and populating a new one.  I
suspect we'll want to revise the Gather node to make use of this new
capability, but even if not it may be useful elsewhere and requires
very little additional code.

src/backend/access/transam/README.parallel
src/backend/access/transam/parallel.c
src/include/access/parallel.h

index 10051863fed6c3ed7d7c7002f0886ad7258d894b..dfcbafabf0831563c027ad0e143f1abeb9decd62 100644 (file)
@@ -221,3 +221,8 @@ pattern looks like this:
        DestroyParallelContext(pcxt);
 
        ExitParallelMode();
+
+If desired, after WaitForParallelWorkersToFinish() has been called, another
+call to LaunchParallelWorkers() can be made using the same parallel context.
+Calls to these two functions can be alternated any number of times before
+destroying the parallel context.
index 17f9a5ae6e433e0bb4349f403805ae29d19f71ed..0085987f32439f9cc35b4c1609f4e308aba9bad0 100644 (file)
@@ -404,6 +404,52 @@ LaunchParallelWorkers(ParallelContext *pcxt)
        /* We might be running in a short-lived memory context. */
        oldcontext = MemoryContextSwitchTo(TopTransactionContext);
 
+       /*
+        * This function can be called for a parallel context for which it has
+        * already been called previously, but only if all of the old workers
+        * have already exited.  When this case arises, we need to do some extra
+        * reinitialization.
+        */
+       if (pcxt->nworkers_launched > 0)
+       {
+               FixedParallelState *fps;
+               char       *error_queue_space;
+
+               /* Clean out old worker handles. */
+               for (i = 0; i < pcxt->nworkers; ++i)
+               {
+                       if (pcxt->worker[i].error_mqh != NULL)
+                               elog(ERROR, "previously launched worker still alive");
+                       if (pcxt->worker[i].bgwhandle != NULL)
+                       {
+                               pfree(pcxt->worker[i].bgwhandle);
+                               pcxt->worker[i].bgwhandle = NULL;
+                       }
+               }
+
+               /* Reset a few bits of fixed parallel state to a clean state. */
+               fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
+               fps->workers_attached = 0;
+               fps->last_xlog_end = 0;
+
+               /* Recreate error queues. */
+               error_queue_space =
+                       shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
+               for (i = 0; i < pcxt->nworkers; ++i)
+               {
+                       char       *start;
+                       shm_mq     *mq;
+
+                       start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
+                       mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
+                       shm_mq_set_receiver(mq, MyProc);
+                       pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+               }
+
+               /* Reset number of workers launched. */
+               pcxt->nworkers_launched = 0;
+       }
+
        /* Configure a worker. */
        snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
                         MyProcPid);
@@ -428,8 +474,11 @@ LaunchParallelWorkers(ParallelContext *pcxt)
                if (!any_registrations_failed &&
                        RegisterDynamicBackgroundWorker(&worker,
                                                                                        &pcxt->worker[i].bgwhandle))
+               {
                        shm_mq_set_handle(pcxt->worker[i].error_mqh,
                                                          pcxt->worker[i].bgwhandle);
+                       pcxt->nworkers_launched++;
+               }
                else
                {
                        /*
index 44f0616cb86547fbe96ca5e164a9fe51ac00f1b0..d4b7c5dd75b4a7361df1597df10f0b198e618330 100644 (file)
@@ -35,6 +35,7 @@ typedef struct ParallelContext
        dlist_node      node;
        SubTransactionId subid;
        int                     nworkers;
+       int                     nworkers_launched;
        parallel_worker_main_type entrypoint;
        char       *library_name;
        char       *function_name;