]> granicus.if.org Git - postgresql/commitdiff
Add new function WaitForParallelWorkersToAttach.
authorRobert Haas <rhaas@postgresql.org>
Fri, 2 Feb 2018 14:00:59 +0000 (09:00 -0500)
committerRobert Haas <rhaas@postgresql.org>
Fri, 2 Feb 2018 14:00:59 +0000 (09:00 -0500)
Once this function has been called, we know that all workers have
started and attached to their error queues -- so if any of them
subsequently exit uncleanly, we'll be sure to throw an ERROR promptly.
Otherwise, users of the ParallelContext machinery must be careful not
to wait forever for a worker that has failed to start.  Parallel query
manages to work without needing this for reasons explained in new
comments added by this patch, but it's a useful primitive for other
parallel operations, such as the pending patch to make creating a
btree index run in parallel.

Amit Kapila, revised by me.  Additional review by Peter Geoghegan.

Discussion: http://postgr.es/m/CAA4eK1+e2MzyouF5bg=OtyhDSX+=Ao=3htN=T-r_6s3gCtKFiw@mail.gmail.com

src/backend/access/transam/parallel.c
src/backend/executor/nodeGather.c
src/backend/executor/nodeGatherMerge.c
src/include/access/parallel.h

index 54d9ea7be056664dd3ce8971575677020c7099eb..5b45b07e7c1aa21c5918627c889296927ee523b8 100644 (file)
@@ -437,10 +437,11 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
                WaitForParallelWorkersToFinish(pcxt);
                WaitForParallelWorkersToExit(pcxt);
                pcxt->nworkers_launched = 0;
-               if (pcxt->any_message_received)
+               if (pcxt->known_attached_workers)
                {
-                       pfree(pcxt->any_message_received);
-                       pcxt->any_message_received = NULL;
+                       pfree(pcxt->known_attached_workers);
+                       pcxt->known_attached_workers = NULL;
+                       pcxt->nknown_attached_workers = 0;
                }
        }
 
@@ -542,16 +543,147 @@ LaunchParallelWorkers(ParallelContext *pcxt)
 
        /*
         * Now that nworkers_launched has taken its final value, we can initialize
-        * any_message_received.
+        * known_attached_workers.
         */
        if (pcxt->nworkers_launched > 0)
-               pcxt->any_message_received =
+       {
+               pcxt->known_attached_workers =
                        palloc0(sizeof(bool) * pcxt->nworkers_launched);
+               pcxt->nknown_attached_workers = 0;
+       }
 
        /* Restore previous memory context. */
        MemoryContextSwitchTo(oldcontext);
 }
 
+/*
+ * Wait for all workers to attach to their error queues, and throw an error if
+ * any worker fails to do this.
+ *
+ * Callers can assume that if this function returns successfully, then the
+ * number of workers given by pcxt->nworkers_launched have initialized and
+ * attached to their error queues.  Whether or not these workers are guaranteed
+ * to still be running depends on what code the caller asked them to run;
+ * this function does not guarantee that they have not exited.  However, it
+ * does guarantee that any workers which exited must have done so cleanly and
+ * after successfully performing the work with which they were tasked.
+ *
+ * If this function is not called, then some of the workers that were launched
+ * may not have been started due to a fork() failure, or may have exited during
+ * early startup prior to attaching to the error queue, so nworkers_launched
+ * cannot be viewed as completely reliable.  It will never be less than the
+ * number of workers which actually started, but it might be more.  Any workers
+ * that failed to start will still be discovered by
+ * WaitForParallelWorkersToFinish and an error will be thrown at that time,
+ * provided that function is eventually reached.
+ *
+ * In general, the leader process should do as much work as possible before
+ * calling this function.  fork() failures and other early-startup failures
+ * are very uncommon, and having the leader sit idle when it could be doing
+ * useful work is undesirable.  However, if the leader needs to wait for
+ * all of its workers or for a specific worker, it may want to call this
+ * function before doing so.  If not, it must make some other provision for
+ * the failure-to-start case, lest it wait forever.  On the other hand, a
+ * leader which never waits for a worker that might not be started yet, or
+ * at least never does so prior to WaitForParallelWorkersToFinish(), need not
+ * call this function at all.
+ */
+void
+WaitForParallelWorkersToAttach(ParallelContext *pcxt)
+{
+       int                     i;
+
+       /* Skip this if we have no launched workers. */
+       if (pcxt->nworkers_launched == 0)
+               return;
+
+       for (;;)
+       {
+               /*
+                * This will process any parallel messages that are pending and it may
+                * also throw an error propagated from a worker.
+                */
+               CHECK_FOR_INTERRUPTS();
+
+               for (i = 0; i < pcxt->nworkers_launched; ++i)
+               {
+                       BgwHandleStatus status;
+                       shm_mq     *mq;
+                       int                     rc;
+                       pid_t           pid;
+
+                       if (pcxt->known_attached_workers[i])
+                               continue;
+
+                       /*
+                        * If error_mqh is NULL, then the worker has already exited
+                        * cleanly.
+                        */
+                       if (pcxt->worker[i].error_mqh == NULL)
+                       {
+                               pcxt->known_attached_workers[i] = true;
+                               ++pcxt->nknown_attached_workers;
+                               continue;
+                       }
+
+                       status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
+                       if (status == BGWH_STARTED)
+                       {
+                               /* Has the worker attached to the error queue? */
+                               mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
+                               if (shm_mq_get_sender(mq) != NULL)
+                               {
+                                       /* Yes, so it is known to be attached. */
+                                       pcxt->known_attached_workers[i] = true;
+                                       ++pcxt->nknown_attached_workers;
+                               }
+                       }
+                       else if (status == BGWH_STOPPED)
+                       {
+                               /*
+                                * If the worker stopped without attaching to the error queue,
+                                * throw an error.
+                                */
+                               mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
+                               if (shm_mq_get_sender(mq) == NULL)
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                                        errmsg("parallel worker failed to initialize"),
+                                                        errhint("More details may be available in the server log.")));
+
+                               pcxt->known_attached_workers[i] = true;
+                               ++pcxt->nknown_attached_workers;
+                       }
+                       else
+                       {
+                               /*
+                                * Worker not yet started, so we must wait.  The postmaster
+                                * will notify us if the worker's state changes.  Our latch
+                                * might also get set for some other reason, but if so we'll
+                                * just end up waiting for the same worker again.
+                                */
+                               rc = WaitLatch(MyLatch,
+                                                          WL_LATCH_SET | WL_POSTMASTER_DEATH,
+                                                          -1, WAIT_EVENT_BGWORKER_STARTUP);
+
+                               /* emergency bailout if postmaster has died */
+                               if (rc & WL_POSTMASTER_DEATH)
+                                       proc_exit(1);
+
+                               if (rc & WL_LATCH_SET)
+                                       ResetLatch(MyLatch);
+                       }
+               }
+
+               /* If all workers are known to have started, we're done. */
+               if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
+               {
+                       Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
+                       break;
+               }
+       }
+}
+
 /*
  * Wait for all workers to finish computing.
  *
@@ -589,7 +721,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
                         */
                        if (pcxt->worker[i].error_mqh == NULL)
                                ++nfinished;
-                       else if (pcxt->any_message_received[i])
+                       else if (pcxt->known_attached_workers[i])
                        {
                                anyone_alive = true;
                                break;
@@ -909,8 +1041,12 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
 {
        char            msgtype;
 
-       if (pcxt->any_message_received != NULL)
-               pcxt->any_message_received[i] = true;
+       if (pcxt->known_attached_workers != NULL &&
+               !pcxt->known_attached_workers[i])
+       {
+               pcxt->known_attached_workers[i] = true;
+               pcxt->nknown_attached_workers++;
+       }
 
        msgtype = pq_getmsgbyte(msg);
 
index 89266b53712fce0f7c0382976bcd94052b75cc62..58eadd45b83a882257d2d1594b407dff2c0fb99b 100644 (file)
@@ -312,7 +312,14 @@ gather_readnext(GatherState *gatherstate)
                /* Check for async events, particularly messages from workers. */
                CHECK_FOR_INTERRUPTS();
 
-               /* Attempt to read a tuple, but don't block if none is available. */
+               /*
+                * Attempt to read a tuple, but don't block if none is available.
+                *
+                * Note that TupleQueueReaderNext will just return NULL for a worker
+                * which fails to initialize.  We'll treat that worker as having
+                * produced no tuples; WaitForParallelWorkersToFinish will error out
+                * when we get there.
+                */
                Assert(gatherstate->nextreader < gatherstate->nreaders);
                reader = gatherstate->reader[gatherstate->nextreader];
                tup = TupleQueueReaderNext(reader, true, &readerdone);
index a3e34c69800d56ef5298d9331b0bad1904fc6519..6858c91e8c268f6e690328ec3143fcff91c68428 100644 (file)
@@ -710,7 +710,14 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
        /* Check for async events, particularly messages from workers. */
        CHECK_FOR_INTERRUPTS();
 
-       /* Attempt to read a tuple. */
+       /*
+        * Attempt to read a tuple.
+        *
+        * Note that TupleQueueReaderNext will just return NULL for a worker which
+        * fails to initialize.  We'll treat that worker as having produced no
+        * tuples; WaitForParallelWorkersToFinish will error out when we get
+        * there.
+        */
        reader = gm_state->reader[nreader - 1];
        tup = TupleQueueReaderNext(reader, nowait, done);
 
index 32c2e32bea0e2453b829f78c339221c34892dd6f..d0c218b1854b8df98bea1142691701e4048850ee 100644 (file)
@@ -43,7 +43,8 @@ typedef struct ParallelContext
        void       *private_memory;
        shm_toc    *toc;
        ParallelWorkerInfo *worker;
-       bool       *any_message_received;
+       int                     nknown_attached_workers;
+       bool       *known_attached_workers;
 } ParallelContext;
 
 typedef struct ParallelWorkerContext
@@ -62,6 +63,7 @@ extern ParallelContext *CreateParallelContext(const char *library_name, const ch
 extern void InitializeParallelDSM(ParallelContext *pcxt);
 extern void ReinitializeParallelDSM(ParallelContext *pcxt);
 extern void LaunchParallelWorkers(ParallelContext *pcxt);
+extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt);
 extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt);
 extern void DestroyParallelContext(ParallelContext *pcxt);
 extern bool ParallelContextActive(void);