]> granicus.if.org Git - postgresql/commitdiff
Pass extra data to bgworkers, and use this to fix parallel contexts.
authorRobert Haas <rhaas@postgresql.org>
Thu, 5 Nov 2015 17:05:38 +0000 (12:05 -0500)
committerRobert Haas <rhaas@postgresql.org>
Thu, 5 Nov 2015 17:13:56 +0000 (12:13 -0500)
Up until now, the total amount of data that could be passed to a
background worker at startup was one datum, which can be a small as
4 bytes on some systems.  That's enough to pass a dsm_handle or an
array index, but not much else.  Add a bgw_extra flag to the
BackgroundWorker struct, allowing up to 128 bytes to be passed to
a new worker on any platform.

Use this to fix a problem I recently discovered with the parallel
context machinery added in 9.5: the master assigns each worker an
array index, and each worker subsequently assigns itself an array
index, and there's nothing to guarantee that the two sets of indexes
match, leading to chaos.

Normally, I would not back-patch the change to add bgw_extra, since it
is basically a feature addition.  However, since 9.5 is still in beta
and there seems to be no other sensible way to repair the broken
parallel context machinery, back-patch to 9.5.  Existing background
worker code can ignore the bgw_extra field without a problem, but
might need to be recompiled since the structure size has changed.

Report and patch by me.  Review by Amit Kapila.

doc/src/sgml/bgworker.sgml
src/backend/access/transam/parallel.c
src/backend/postmaster/bgworker.c
src/include/postmaster/bgworker.h

index c17d39857c2e870264abc00a64753ce63a2f1f08..505e362879ad10edd8c42633a39d00de61bbc83c 100644 (file)
@@ -58,6 +58,7 @@ typedef struct BackgroundWorker
     char        bgw_library_name[BGW_MAXLEN];   /* only if bgw_main is NULL */
     char        bgw_function_name[BGW_MAXLEN];  /* only if bgw_main is NULL */
     Datum       bgw_main_arg;
+    char        bgw_extra[BGW_EXTRALEN];
     int         bgw_notify_pid;
 } BackgroundWorker;
 </programlisting>
@@ -182,6 +183,13 @@ typedef struct BackgroundWorker
    new background worker process.
   </para>
 
+  <para>
+   <structfield>bgw_extra</structfield> can contain extra data to be passed
+   to the background worker.  Unlike <structfield>bgw_main_arg</>, this data
+   is not passed as an argument to the worker's main function, but it can be
+   accessed via <literal>MyBgworkerEntry</literal>, as discussed above.
+  </para>
+
   <para>
    <structfield>bgw_notify_pid</structfield> is the PID of a PostgreSQL
    backend process to which the postmaster should send <literal>SIGUSR1</>
index 79cc9880bbbacc175871c8a14827799d3615e002..5edaaf4bd2addca4c5f5e61fea9dd9e8047f7db4 100644 (file)
@@ -77,10 +77,6 @@ typedef struct FixedParallelState
        /* Mutex protects remaining fields. */
        slock_t         mutex;
 
-       /* Track whether workers have attached. */
-       int                     workers_expected;
-       int                     workers_attached;
-
        /* Maximum XactLastRecEnd of any worker. */
        XLogRecPtr      last_xlog_end;
 } FixedParallelState;
@@ -295,8 +291,6 @@ InitializeParallelDSM(ParallelContext *pcxt)
        fps->parallel_master_backend_id = MyBackendId;
        fps->entrypoint = pcxt->entrypoint;
        SpinLockInit(&fps->mutex);
-       fps->workers_expected = pcxt->nworkers;
-       fps->workers_attached = 0;
        fps->last_xlog_end = 0;
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
 
@@ -403,7 +397,6 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
 
        /* Reset a few bits of fixed parallel state to a clean state. */
        fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
-       fps->workers_attached = 0;
        fps->last_xlog_end = 0;
 
        /* Recreate error queues. */
@@ -455,6 +448,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
        worker.bgw_main = ParallelWorkerMain;
        worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
        worker.bgw_notify_pid = MyProcPid;
+       memset(&worker.bgw_extra, 0, BGW_EXTRALEN);
 
        /*
         * Start workers.
@@ -466,6 +460,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
         */
        for (i = 0; i < pcxt->nworkers; ++i)
        {
+               memcpy(worker.bgw_extra, &i, sizeof(int));
                if (!any_registrations_failed &&
                        RegisterDynamicBackgroundWorker(&worker,
                                                                                        &pcxt->worker[i].bgwhandle))
@@ -891,6 +886,10 @@ ParallelWorkerMain(Datum main_arg)
        pqsignal(SIGTERM, die);
        BackgroundWorkerUnblockSignals();
 
+       /* Determine and set our parallel worker number. */
+       Assert(ParallelWorkerNumber == -1);
+       memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
+
        /* Set up a memory context and resource owner. */
        Assert(CurrentResourceOwner == NULL);
        CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel");
@@ -915,18 +914,9 @@ ParallelWorkerMain(Datum main_arg)
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                           errmsg("bad magic number in dynamic shared memory segment")));
 
-       /* Determine and set our worker number. */
+       /* Look up fixed parallel state. */
        fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED);
        Assert(fps != NULL);
-       Assert(ParallelWorkerNumber == -1);
-       SpinLockAcquire(&fps->mutex);
-       if (fps->workers_attached < fps->workers_expected)
-               ParallelWorkerNumber = fps->workers_attached++;
-       SpinLockRelease(&fps->mutex);
-       if (ParallelWorkerNumber < 0)
-               ereport(ERROR,
-                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                errmsg("too many parallel workers already attached")));
        MyFixedParallelState = fps;
 
        /*
index cf505d6230c448cf8e7f7af84599fa7fbb2c159b..2a878eb6a6f5e3457d56563009d9b9a618da0cb7 100644 (file)
@@ -314,6 +314,7 @@ BackgroundWorkerStateChange(void)
                rw->rw_worker.bgw_restart_time = slot->worker.bgw_restart_time;
                rw->rw_worker.bgw_main = slot->worker.bgw_main;
                rw->rw_worker.bgw_main_arg = slot->worker.bgw_main_arg;
+               memcpy(rw->rw_worker.bgw_extra, slot->worker.bgw_extra, BGW_EXTRALEN);
 
                /*
                 * Copy the PID to be notified about state changes, but only if the
index f0a9530654532038f9ef2d3ddfd8069a337be34f..6e0b5cd9fc84887e4ddbc2fcda30456f781051dc 100644 (file)
@@ -74,6 +74,7 @@ typedef enum
 #define BGW_DEFAULT_RESTART_INTERVAL   60
 #define BGW_NEVER_RESTART                              -1
 #define BGW_MAXLEN                                             64
+#define BGW_EXTRALEN                                   128
 
 typedef struct BackgroundWorker
 {
@@ -85,6 +86,7 @@ typedef struct BackgroundWorker
        char            bgw_library_name[BGW_MAXLEN];   /* only if bgw_main is NULL */
        char            bgw_function_name[BGW_MAXLEN];  /* only if bgw_main is NULL */
        Datum           bgw_main_arg;
+       char            bgw_extra[BGW_EXTRALEN];
        pid_t           bgw_notify_pid; /* SIGUSR1 this backend on start/stop */
 } BackgroundWorker;