Update parallel executor support to reuse the same DSM.
author     Robert Haas <rhaas@postgresql.org>
           Fri, 30 Oct 2015 09:43:00 +0000 (10:43 +0100)
committer  Robert Haas <rhaas@postgresql.org>
           Fri, 30 Oct 2015 09:44:54 +0000 (10:44 +0100)
Commit b0b0d84b3d663a148022e900ebfc164284a95f55 purported to make it
possible to relaunch workers using the same parallel context, but it had
an unpleasant race condition: we might reinitialize after the workers
have sent their last control message but before they have detached the
DSM, leading to crashes.  Repair by introducing a new ParallelContext
operation, ReinitializeParallelDSM.

Adjust execParallel.c to use this new support, so that we can rescan a
Gather node by relaunching workers but without needing to recreate the
DSM.

Amit Kapila, with some adjustments by me.  Extracted from the latest
parallel sequential scan patch.
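
In outline, the Gather rescan path now reuses the existing DSM instead of
tearing it down and rebuilding it.  A condensed sketch of the new
ExecReScanGather() flow, using the names as they appear in the nodeGather.c
hunk below:

    /* Condensed from the ExecReScanGather() changes shown below. */
    ExecShutdownGatherWorkers(node);    /* drain tuple queues, wait for workers to finish */
    node->initialized = false;

    if (node->pei)
    {
        /* Reuse the existing DSM: reset machinery-owned state and error queues ... */
        ReinitializeParallelDSM(node->pei->pcxt);
        /* ... and reattach the tuple queues in their already-allocated space. */
        node->pei->tqueue =
            ExecParallelReinitializeTupleQueues(node->pei->pcxt);
    }

    ExecReScan(node->ps.lefttree);      /* workers are relaunched on the next ExecGather() call */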

src/backend/access/transam/README.parallel
src/backend/access/transam/parallel.c
src/backend/executor/execParallel.c
src/backend/executor/nodeGather.c
src/include/access/parallel.h
src/include/executor/execParallel.h

diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel
index dfcbafabf0831563c027ad0e143f1abeb9decd62..db9ac3d504de788bf7a0a0729c18cfa5bff5d0c0 100644
@@ -222,7 +222,9 @@ pattern looks like this:
 
        ExitParallelMode();
 
-If desired, after WaitForParallelWorkersToFinish() has been called, another
-call to LaunchParallelWorkers() can be made using the same parallel context.
-Calls to these two functions can be alternated any number of times before
-destroying the parallel context.
+If desired, after WaitForParallelWorkersToFinish() has been called, the
+context can be reset so that workers can be launched anew using the same
+parallel context.  To do this, first call ReinitializeParallelDSM() to
+reinitialize state managed by the parallel context machinery itself; then,
+perform any other necessary resetting of state; after that, you can again
+call LaunchParallelWorkers.
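
For example, a minimal caller-side sketch of this relaunch pattern; the
function my_parallel_operation, the entrypoint my_worker_main, and the
nworkers/npasses parameters are placeholders for illustration, not names
from this commit:

    #include "access/parallel.h"

    /* Sketch only: all "my_*" names and the loop bounds are placeholders. */
    static void
    my_parallel_operation(int nworkers, int npasses)
    {
        ParallelContext *pcxt;
        int         pass;

        EnterParallelMode();
        pcxt = CreateParallelContext(my_worker_main, nworkers);
        /* shm_toc_estimate_chunk()/shm_toc_estimate_keys() for private state here */
        InitializeParallelDSM(pcxt);
        /* shm_toc_allocate()/shm_toc_insert() the private shared state here */

        for (pass = 0; pass < npasses; pass++)
        {
            if (pass > 0)
            {
                /* Reset state owned by the parallel context machinery ... */
                ReinitializeParallelDSM(pcxt);
                /* ... then reset any caller-managed shared state. */
            }
            LaunchParallelWorkers(pcxt);
            /* ... cooperate with the workers ... */
            WaitForParallelWorkersToFinish(pcxt);
        }

        DestroyParallelContext(pcxt);
        ExitParallelMode();
    }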
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 35a873de6bac4aa5425ce173fed1eaa5e64d1bb1..79cc9880bbbacc175871c8a14827799d3615e002 100644
@@ -110,6 +110,7 @@ static void HandleParallelMessage(ParallelContext *, int, StringInfo msg);
 static void ParallelErrorContext(void *arg);
 static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
 static void ParallelWorkerMain(Datum main_arg);
+static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
 
 /*
  * Establish a new parallel context.  This should be done after entering
@@ -383,6 +384,46 @@ InitializeParallelDSM(ParallelContext *pcxt)
        MemoryContextSwitchTo(oldcontext);
 }
 
+/*
+ * Reinitialize the dynamic shared memory segment for a parallel context so
+ * that we can launch workers for it again.
+ */
+void
+ReinitializeParallelDSM(ParallelContext *pcxt)
+{
+       FixedParallelState *fps;
+       char       *error_queue_space;
+       int                     i;
+
+       if (pcxt->nworkers_launched == 0)
+               return;
+
+       WaitForParallelWorkersToFinish(pcxt);
+       WaitForParallelWorkersToExit(pcxt);
+
+       /* Reset a few bits of fixed parallel state to a clean state. */
+       fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
+       fps->workers_attached = 0;
+       fps->last_xlog_end = 0;
+
+       /* Recreate error queues. */
+       error_queue_space =
+               shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
+       for (i = 0; i < pcxt->nworkers; ++i)
+       {
+               char       *start;
+               shm_mq     *mq;
+
+               start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
+               mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
+               shm_mq_set_receiver(mq, MyProc);
+               pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+       }
+
+       /* Reset number of workers launched. */
+       pcxt->nworkers_launched = 0;
+}
+
 /*
  * Launch parallel workers.
  */
@@ -404,52 +445,6 @@ LaunchParallelWorkers(ParallelContext *pcxt)
        /* We might be running in a short-lived memory context. */
        oldcontext = MemoryContextSwitchTo(TopTransactionContext);
 
-       /*
-        * This function can be called for a parallel context for which it has
-        * already been called previously, but only if all of the old workers
-        * have already exited.  When this case arises, we need to do some extra
-        * reinitialization.
-        */
-       if (pcxt->nworkers_launched > 0)
-       {
-               FixedParallelState *fps;
-               char       *error_queue_space;
-
-               /* Clean out old worker handles. */
-               for (i = 0; i < pcxt->nworkers; ++i)
-               {
-                       if (pcxt->worker[i].error_mqh != NULL)
-                               elog(ERROR, "previously launched worker still alive");
-                       if (pcxt->worker[i].bgwhandle != NULL)
-                       {
-                               pfree(pcxt->worker[i].bgwhandle);
-                               pcxt->worker[i].bgwhandle = NULL;
-                       }
-               }
-
-               /* Reset a few bits of fixed parallel state to a clean state. */
-               fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
-               fps->workers_attached = 0;
-               fps->last_xlog_end = 0;
-
-               /* Recreate error queues. */
-               error_queue_space =
-                       shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
-               for (i = 0; i < pcxt->nworkers; ++i)
-               {
-                       char       *start;
-                       shm_mq     *mq;
-
-                       start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
-                       mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
-                       shm_mq_set_receiver(mq, MyProc);
-                       pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
-               }
-
-               /* Reset number of workers launched. */
-               pcxt->nworkers_launched = 0;
-       }
-
        /* Configure a worker. */
        snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
                         MyProcPid);
@@ -501,7 +496,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
 }
 
 /*
- * Wait for all workers to exit.
+ * Wait for all workers to finish computing.
  *
  * Even if the parallel operation seems to have completed successfully, it's
  * important to call this function afterwards.  We must not miss any errors
@@ -552,6 +547,46 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
        }
 }
 
+/*
+ * Wait for all workers to exit.
+ *
+ * This function ensures that the workers have completely shut down.  The
+ * difference from WaitForParallelWorkersToFinish is that the latter only
+ * ensures that the last message sent by a worker backend has been received
+ * by the master backend, whereas this function waits for the workers to exit.
+ */
+static void
+WaitForParallelWorkersToExit(ParallelContext *pcxt)
+{
+       int                     i;
+
+       /* Wait until the workers actually die. */
+       for (i = 0; i < pcxt->nworkers; ++i)
+       {
+               BgwHandleStatus status;
+
+               if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
+                       continue;
+
+               status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
+
+               /*
+                * If the postmaster kicked the bucket, we have no chance of cleaning
+                * up safely -- we won't be able to tell when our workers are actually
+                * dead.  This doesn't necessitate a PANIC since they will all abort
+                * eventually, but we can't safely continue this session.
+                */
+               if (status == BGWH_POSTMASTER_DIED)
+                       ereport(FATAL,
+                                       (errcode(ERRCODE_ADMIN_SHUTDOWN),
+                                errmsg("postmaster exited during a parallel transaction")));
+
+               /* Release memory. */
+               pfree(pcxt->worker[i].bgwhandle);
+               pcxt->worker[i].bgwhandle = NULL;
+       }
+}
+
 /*
  * Destroy a parallel context.
  *
@@ -578,10 +613,10 @@ DestroyParallelContext(ParallelContext *pcxt)
        {
                for (i = 0; i < pcxt->nworkers; ++i)
                {
-                       if (pcxt->worker[i].bgwhandle != NULL)
-                               TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
                        if (pcxt->worker[i].error_mqh != NULL)
                        {
+                               TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
+
                                pfree(pcxt->worker[i].error_mqh);
                                pcxt->worker[i].error_mqh = NULL;
                        }
@@ -609,38 +644,14 @@ DestroyParallelContext(ParallelContext *pcxt)
                pcxt->private_memory = NULL;
        }
 
-       /* Wait until the workers actually die. */
-       for (i = 0; i < pcxt->nworkers; ++i)
-       {
-               BgwHandleStatus status;
-
-               if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
-                       continue;
-
-               /*
-                * We can't finish transaction commit or abort until all of the
-                * workers are dead.  This means, in particular, that we can't respond
-                * to interrupts at this stage.
-                */
-               HOLD_INTERRUPTS();
-               status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
-               RESUME_INTERRUPTS();
-
-               /*
-                * If the postmaster kicked the bucket, we have no chance of cleaning
-                * up safely -- we won't be able to tell when our workers are actually
-                * dead.  This doesn't necessitate a PANIC since they will all abort
-                * eventually, but we can't safely continue this session.
-                */
-               if (status == BGWH_POSTMASTER_DIED)
-                       ereport(FATAL,
-                                       (errcode(ERRCODE_ADMIN_SHUTDOWN),
-                                errmsg("postmaster exited during a parallel transaction")));
-
-               /* Release memory. */
-               pfree(pcxt->worker[i].bgwhandle);
-               pcxt->worker[i].bgwhandle = NULL;
-       }
+       /*
+        * We can't finish transaction commit or abort until all of the
+        * workers have exited.  This means, in particular, that we can't respond
+        * to interrupts at this stage.
+        */
+       HOLD_INTERRUPTS();
+       WaitForParallelWorkersToExit(pcxt);
+       RESUME_INTERRUPTS();
 
        /* Free the worker array itself. */
        if (pcxt->worker != NULL)
@@ -799,9 +810,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
 
                case 'X':                               /* Terminate, indicating clean exit */
                        {
-                               pfree(pcxt->worker[i].bgwhandle);
                                pfree(pcxt->worker[i].error_mqh);
-                               pcxt->worker[i].bgwhandle = NULL;
                                pcxt->worker[i].error_mqh = NULL;
                                break;
                        }
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index efcbaef416c7b29d00de0cb5367bc326944e6f0e..99a9de3cdc397fde4dab1623cdb968508ff87445 100644
@@ -84,7 +84,8 @@ static bool ExecParallelEstimate(PlanState *node,
                                         ExecParallelEstimateContext *e);
 static bool ExecParallelInitializeDSM(PlanState *node,
                                         ExecParallelInitializeDSMContext *d);
-static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt);
+static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
+                                                        bool reinitialize);
 static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
                                                  SharedExecutorInstrumentation *instrumentation);
 
@@ -217,7 +218,7 @@ ExecParallelInitializeDSM(PlanState *planstate,
  * to the main backend and start the workers.
  */
 static shm_mq_handle **
-ExecParallelSetupTupleQueues(ParallelContext *pcxt)
+ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
 {
        shm_mq_handle **responseq;
        char       *tqueuespace;
@@ -231,9 +232,16 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt)
        responseq = (shm_mq_handle **)
                palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
 
-       /* Allocate space from the DSM for the queues themselves. */
-       tqueuespace = shm_toc_allocate(pcxt->toc,
-                                                                PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+       /*
+        * If not reinitializing, allocate space from the DSM for the queues;
+        * otherwise, find the already allocated space.
+        */
+       if (!reinitialize)
+               tqueuespace =
+                       shm_toc_allocate(pcxt->toc,
+                                                        PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+       else
+               tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE);
 
        /* Create the queues, and become the receiver for each. */
        for (i = 0; i < pcxt->nworkers; ++i)
@@ -248,12 +256,23 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt)
        }
 
        /* Add array of queues to shm_toc, so others can find it. */
-       shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
+       if (!reinitialize)
+               shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
 
        /* Return array of handles. */
        return responseq;
 }
 
+/*
+ * Re-initialize the response queues that backend workers use to return
+ * tuples to the main backend.
+ */
+shm_mq_handle **
+ExecParallelReinitializeTupleQueues(ParallelContext *pcxt)
+{
+       return ExecParallelSetupTupleQueues(pcxt, true);
+}
+
 /*
  * Sets up the required infrastructure for backend workers to perform
  * execution and return results to the main backend.
@@ -363,7 +382,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
        pei->buffer_usage = bufusage_space;
 
        /* Set up tuple queues. */
-       pei->tqueue = ExecParallelSetupTupleQueues(pcxt);
+       pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
 
        /*
         * If instrumentation options were supplied, allocate space for the
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 9c1533e311341f04b295785a6b9691f69c0b881b..5f589614dc2a56c6c2009ff3745369f61e7cac76 100644
@@ -41,6 +41,7 @@
 
 
 static TupleTableSlot *gather_getnext(GatherState *gatherstate);
+static void ExecShutdownGatherWorkers(GatherState *node);
 
 
 /* ----------------------------------------------------------------
@@ -150,9 +151,10 @@ ExecGather(GatherState *node)
                        bool    got_any_worker = false;
 
                        /* Initialize the workers required to execute Gather node. */
-                       node->pei = ExecInitParallelPlan(node->ps.lefttree,
-                                                                                        estate,
-                                                                                        gather->num_workers);
+                       if (!node->pei)
+                               node->pei = ExecInitParallelPlan(node->ps.lefttree,
+                                                                                                estate,
+                                                                                                gather->num_workers);
 
                        /*
                         * Register backend workers. We might not get as many as we
@@ -279,7 +281,7 @@ gather_getnext(GatherState *gatherstate)
                                                                           gatherstate->need_to_scan_locally,
                                                                           &done);
                        if (done)
-                               ExecShutdownGather(gatherstate);
+                               ExecShutdownGatherWorkers(gatherstate);
 
                        if (HeapTupleIsValid(tup))
                        {
@@ -308,15 +310,15 @@ gather_getnext(GatherState *gatherstate)
 }
 
 /* ----------------------------------------------------------------
- *             ExecShutdownGather
+ *             ExecShutdownGatherWorkers
  *
- *             Destroy the setup for parallel workers.  Collect all the
- *             stats after workers are stopped, else some work done by
- *             workers won't be accounted.
+ *             Destroy the parallel workers.  Collect all the stats after
+ *             workers are stopped, else some work done by workers won't be
+ *             accounted.
  * ----------------------------------------------------------------
  */
 void
-ExecShutdownGather(GatherState *node)
+ExecShutdownGatherWorkers(GatherState *node)
 {
        /* Shut down tuple queue funnel before shutting down workers. */
        if (node->funnel != NULL)
@@ -327,8 +329,25 @@ ExecShutdownGather(GatherState *node)
 
        /* Now shut down the workers. */
        if (node->pei != NULL)
-       {
                ExecParallelFinish(node->pei);
+}
+
+/* ----------------------------------------------------------------
+ *             ExecShutdownGather
+ *
+ *             Destroy the setup for parallel workers including parallel context.
+ *             Collect all the stats after workers are stopped, else some work
+ *             done by workers won't be accounted.
+ * ----------------------------------------------------------------
+ */
+void
+ExecShutdownGather(GatherState *node)
+{
+       ExecShutdownGatherWorkers(node);
+
+       /* Now destroy the parallel context. */
+       if (node->pei != NULL)
+       {
                ExecParallelCleanup(node->pei);
                node->pei = NULL;
        }
@@ -349,14 +368,21 @@ void
 ExecReScanGather(GatherState *node)
 {
        /*
-        * Re-initialize the parallel context and workers to perform rescan of
-        * relation.  We want to gracefully shutdown all the workers so that they
+        * Re-initialize the parallel workers to perform a rescan of the relation.
+        * We want to gracefully shut down all the workers so that they
         * should be able to propagate any error or other information to master
-        * backend before dying.
+        * backend before dying.  The parallel context will be reused for the rescan.
         */
-       ExecShutdownGather(node);
+       ExecShutdownGatherWorkers(node);
 
        node->initialized = false;
 
+       if (node->pei)
+       {
+               ReinitializeParallelDSM(node->pei->pcxt);
+               node->pei->tqueue =
+                               ExecParallelReinitializeTupleQueues(node->pei->pcxt);
+       }
+
        ExecReScan(node->ps.lefttree);
 }
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index d4b7c5dd75b4a7361df1597df10f0b198e618330..411db7964dbbefed2b3e58bcec4a8c43b2921a59 100644
@@ -56,6 +56,7 @@ extern bool InitializingParallelWorker;
 extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers);
 extern ParallelContext *CreateParallelContextForExternalFunction(char *library_name, char *function_name, int nworkers);
 extern void InitializeParallelDSM(ParallelContext *);
+extern void ReinitializeParallelDSM(ParallelContext *pcxt);
 extern void LaunchParallelWorkers(ParallelContext *);
 extern void WaitForParallelWorkersToFinish(ParallelContext *);
 extern void DestroyParallelContext(ParallelContext *);
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 505500e76b54898bbe12105a51da6a8ef91a7154..23c29ebb9027de0c40794d82b951e09eda5d0958 100644
@@ -33,5 +33,6 @@ extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
                                         EState *estate, int nworkers);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
+extern shm_mq_handle **ExecParallelReinitializeTupleQueues(ParallelContext *pcxt);
 
 #endif   /* EXECPARALLEL_H */