]> granicus.if.org Git - postgresql/commitdiff
Make struct ParallelSlot private within pg_dump/parallel.c.
authorTom Lane <tgl@sss.pgh.pa.us>
Tue, 27 Sep 2016 18:29:12 +0000 (14:29 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Tue, 27 Sep 2016 18:29:12 +0000 (14:29 -0400)
The only field of this struct that other files have any need to touch
is the pointer to the TocEntry a worker is working on.  (Well,
pg_backup_archiver.c is actually looking at workerStatus too, but that
can be finessed by specifying that the TocEntry pointer is NULL for a
non-busy worker.)

Hence, move out the TocEntry pointers to a separate array within
struct ParallelState, and then we can make struct ParallelSlot private.

I noted the possibility of this previously, but hadn't got round to
actually doing it.

Discussion: <1188.1464544443@sss.pgh.pa.us>

src/bin/pg_dump/parallel.c
src/bin/pg_dump/parallel.h
src/bin/pg_dump/pg_backup_archiver.c

index 0e2bfa106a741d99ff83d84459152c62d0c3ec87..5630dc626d75123465596e9cc96b7c054c0cb081 100644 (file)
@@ -45,6 +45,8 @@
  *             WRKR_IDLE: it's waiting for a command
  *             WRKR_WORKING: it's working on a command
  *             WRKR_TERMINATED: process ended
+ * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
+ * state, and must be NULL in other states.
  */
 
 #include "postgres_fe.h"
 
 #define NO_SLOT (-1)                   /* Failure result for GetIdleWorker() */
 
+/* Worker process statuses */
+typedef enum
+{
+       WRKR_IDLE,
+       WRKR_WORKING,
+       WRKR_TERMINATED
+} T_WorkerStatus;
+
+/*
+ * Private per-parallel-worker state (typedef for this is in parallel.h).
+ *
+ * Much of this is valid only in the master process (or, on Windows, should
+ * be touched only by the master thread).  But the AH field should be touched
+ * only by workers.  The pipe descriptors are valid everywhere.
+ */
+struct ParallelSlot
+{
+       T_WorkerStatus workerStatus;    /* see enum above */
+
+       /* These fields are valid if workerStatus == WRKR_WORKING: */
+       ParallelCompletionPtr callback;         /* function to call on completion */
+       void       *callback_data;      /* passthru data for it */
+
+       ArchiveHandle *AH;                      /* Archive data worker is using */
+
+       int                     pipeRead;               /* master's end of the pipes */
+       int                     pipeWrite;
+       int                     pipeRevRead;    /* child's end of the pipes */
+       int                     pipeRevWrite;
+
+       /* Child process/thread identity info: */
+#ifdef WIN32
+       uintptr_t       hThread;
+       unsigned int threadId;
+#else
+       pid_t           pid;
+#endif
+};
+
 #ifdef WIN32
 
 /*
@@ -475,9 +516,10 @@ WaitForTerminatingWorkers(ParallelState *pstate)
                }
 #endif   /* WIN32 */
 
-               /* On all platforms, update workerStatus as well */
+               /* On all platforms, update workerStatus and te[] as well */
                Assert(j < pstate->numWorkers);
                slot->workerStatus = WRKR_TERMINATED;
+               pstate->te[j] = NULL;
        }
 }
 
@@ -870,20 +912,22 @@ ParallelBackupStart(ArchiveHandle *AH)
 {
        ParallelState *pstate;
        int                     i;
-       const size_t slotSize = AH->public.numWorkers * sizeof(ParallelSlot);
 
        Assert(AH->public.numWorkers > 0);
 
        pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
 
        pstate->numWorkers = AH->public.numWorkers;
+       pstate->te = NULL;
        pstate->parallelSlot = NULL;
 
        if (AH->public.numWorkers == 1)
                return pstate;
 
-       pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize);
-       memset((void *) pstate->parallelSlot, 0, slotSize);
+       pstate->te = (TocEntry **)
+               pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
+       pstate->parallelSlot = (ParallelSlot *)
+               pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
 
 #ifdef WIN32
        /* Make fmtId() and fmtQualifiedId() use thread-local storage */
@@ -929,9 +973,10 @@ ParallelBackupStart(ArchiveHandle *AH)
                                                  "could not create communication channels: %s\n",
                                                  strerror(errno));
 
+               pstate->te[i] = NULL;   /* just for safety */
+
                slot->workerStatus = WRKR_IDLE;
                slot->AH = NULL;
-               slot->te = NULL;
                slot->callback = NULL;
                slot->callback_data = NULL;
 
@@ -1062,6 +1107,7 @@ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
        set_cancel_pstate(NULL);
 
        /* Release state (mere neatnik-ism, since we're about to terminate) */
+       free(pstate->te);
        free(pstate->parallelSlot);
        free(pstate);
 }
@@ -1201,9 +1247,9 @@ DispatchJobForTocEntry(ArchiveHandle *AH,
 
        /* Remember worker is busy, and which TocEntry it's working on */
        pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
-       pstate->parallelSlot[worker].te = te;
        pstate->parallelSlot[worker].callback = callback;
        pstate->parallelSlot[worker].callback_data = callback_data;
+       pstate->te[worker] = te;
 }
 
 /*
@@ -1394,13 +1440,13 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
        if (messageStartsWith(msg, "OK "))
        {
                ParallelSlot *slot = &pstate->parallelSlot[worker];
-               TocEntry   *te = slot->te;
+               TocEntry   *te = pstate->te[worker];
                int                     status;
 
                status = parseWorkerResponse(AH, te, msg);
                slot->callback(AH, te, status, slot->callback_data);
                slot->workerStatus = WRKR_IDLE;
-               slot->te = NULL;
+               pstate->te[worker] = NULL;
        }
        else
                exit_horribly(modulename,
index 8ee629b10689dfd7c0298960d98ef9c0cf3f051f..e0c442cf37794363e5699afba02b9486c1ee7d5c 100644 (file)
@@ -33,51 +33,16 @@ typedef enum
        WFW_ALL_IDLE
 } WFW_WaitOption;
 
-/* Worker process statuses */
-typedef enum
-{
-       WRKR_IDLE,
-       WRKR_WORKING,
-       WRKR_TERMINATED
-} T_WorkerStatus;
-
-/*
- * Per-parallel-worker state of parallel.c.
- *
- * Much of this is valid only in the master process (or, on Windows, should
- * be touched only by the master thread).  But the AH field should be touched
- * only by workers.  The pipe descriptors are valid everywhere.
- */
-typedef struct ParallelSlot
-{
-       T_WorkerStatus workerStatus;    /* see enum above */
-
-       /* These fields are valid if workerStatus == WRKR_WORKING: */
-       TocEntry   *te;                         /* item being worked on */
-       ParallelCompletionPtr callback;         /* function to call on completion */
-       void       *callback_data;      /* passthru data for it */
-
-       ArchiveHandle *AH;                      /* Archive data worker is using */
-
-       int                     pipeRead;               /* master's end of the pipes */
-       int                     pipeWrite;
-       int                     pipeRevRead;    /* child's end of the pipes */
-       int                     pipeRevWrite;
-
-       /* Child process/thread identity info: */
-#ifdef WIN32
-       uintptr_t       hThread;
-       unsigned int threadId;
-#else
-       pid_t           pid;
-#endif
-} ParallelSlot;
+/* ParallelSlot is an opaque struct known only within parallel.c */
+typedef struct ParallelSlot ParallelSlot;
 
 /* Overall state for parallel.c */
 typedef struct ParallelState
 {
        int                     numWorkers;             /* allowed number of workers */
-       ParallelSlot *parallelSlot; /* array of numWorkers slots */
+       /* these arrays have numWorkers entries, one per worker: */
+       TocEntry  **te;                         /* item being worked on, or NULL */
+       ParallelSlot *parallelSlot; /* private info about each worker */
 } ParallelState;
 
 #ifdef WIN32
index e19c24aec94f61def094bb73f8ff4521c11ca227..bba8b6ca9f90813e5193b8e9ba785eef37679eae 100644 (file)
@@ -4027,8 +4027,10 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
 
                for (k = 0; k < pstate->numWorkers; k++)
                {
-                       if (pstate->parallelSlot[k].workerStatus == WRKR_WORKING &&
-                               pstate->parallelSlot[k].te->section == SECTION_DATA)
+                       TocEntry   *running_te = pstate->te[k];
+
+                       if (running_te != NULL &&
+                               running_te->section == SECTION_DATA)
                                count++;
                }
                if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers)
@@ -4049,12 +4051,10 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
                 */
                for (i = 0; i < pstate->numWorkers; i++)
                {
-                       TocEntry   *running_te;
+                       TocEntry   *running_te = pstate->te[i];
 
-                       if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING)
+                       if (running_te == NULL)
                                continue;
-                       running_te = pstate->parallelSlot[i].te;
-
                        if (has_lock_conflicts(te, running_te) ||
                                has_lock_conflicts(running_te, te))
                        {