* WRKR_IDLE: it's waiting for a command
* WRKR_WORKING: it's working on a command
* WRKR_TERMINATED: process ended
+ * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
+ * state, and must be NULL in other states.
*/
#include "postgres_fe.h"
#define NO_SLOT (-1) /* Failure result for GetIdleWorker() */
+/* Worker process statuses */
+typedef enum
+{
+ WRKR_IDLE,
+ WRKR_WORKING,
+ WRKR_TERMINATED
+} T_WorkerStatus;
+
+/*
+ * Private per-parallel-worker state (typedef for this is in parallel.h).
+ *
+ * Much of this is valid only in the master process (or, on Windows, should
+ * be touched only by the master thread). But the AH field should be touched
+ * only by workers. The pipe descriptors are valid everywhere.
+ */
+struct ParallelSlot
+{
+ T_WorkerStatus workerStatus; /* see enum above */
+
+ /* These fields are valid if workerStatus == WRKR_WORKING: */
+ ParallelCompletionPtr callback; /* function to call on completion */
+ void *callback_data; /* passthru data for it */
+
+ ArchiveHandle *AH; /* Archive data worker is using */
+
+ int pipeRead; /* master's end of the pipes */
+ int pipeWrite;
+ int pipeRevRead; /* child's end of the pipes */
+ int pipeRevWrite;
+
+ /* Child process/thread identity info: */
+#ifdef WIN32
+ uintptr_t hThread;
+ unsigned int threadId;
+#else
+ pid_t pid;
+#endif
+};
+
#ifdef WIN32
/*
}
#endif /* WIN32 */
- /* On all platforms, update workerStatus as well */
+ /* On all platforms, update workerStatus and te[] as well */
Assert(j < pstate->numWorkers);
slot->workerStatus = WRKR_TERMINATED;
+ pstate->te[j] = NULL;
}
}
{
ParallelState *pstate;
int i;
- const size_t slotSize = AH->public.numWorkers * sizeof(ParallelSlot);
Assert(AH->public.numWorkers > 0);
pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
pstate->numWorkers = AH->public.numWorkers;
+ pstate->te = NULL;
pstate->parallelSlot = NULL;
if (AH->public.numWorkers == 1)
return pstate;
- pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize);
- memset((void *) pstate->parallelSlot, 0, slotSize);
+ pstate->te = (TocEntry **)
+ pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
+ pstate->parallelSlot = (ParallelSlot *)
+ pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
#ifdef WIN32
/* Make fmtId() and fmtQualifiedId() use thread-local storage */
"could not create communication channels: %s\n",
strerror(errno));
+ pstate->te[i] = NULL; /* just for safety */
+
slot->workerStatus = WRKR_IDLE;
slot->AH = NULL;
- slot->te = NULL;
slot->callback = NULL;
slot->callback_data = NULL;
set_cancel_pstate(NULL);
/* Release state (mere neatnik-ism, since we're about to terminate) */
+ free(pstate->te);
free(pstate->parallelSlot);
free(pstate);
}
/* Remember worker is busy, and which TocEntry it's working on */
pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
- pstate->parallelSlot[worker].te = te;
pstate->parallelSlot[worker].callback = callback;
pstate->parallelSlot[worker].callback_data = callback_data;
+ pstate->te[worker] = te;
}
/*
if (messageStartsWith(msg, "OK "))
{
ParallelSlot *slot = &pstate->parallelSlot[worker];
- TocEntry *te = slot->te;
+ TocEntry *te = pstate->te[worker];
int status;
status = parseWorkerResponse(AH, te, msg);
slot->callback(AH, te, status, slot->callback_data);
slot->workerStatus = WRKR_IDLE;
- slot->te = NULL;
+ pstate->te[worker] = NULL;
}
else
exit_horribly(modulename,
WFW_ALL_IDLE
} WFW_WaitOption;
-/* Worker process statuses */
-typedef enum
-{
- WRKR_IDLE,
- WRKR_WORKING,
- WRKR_TERMINATED
-} T_WorkerStatus;
-
-/*
- * Per-parallel-worker state of parallel.c.
- *
- * Much of this is valid only in the master process (or, on Windows, should
- * be touched only by the master thread). But the AH field should be touched
- * only by workers. The pipe descriptors are valid everywhere.
- */
-typedef struct ParallelSlot
-{
- T_WorkerStatus workerStatus; /* see enum above */
-
- /* These fields are valid if workerStatus == WRKR_WORKING: */
- TocEntry *te; /* item being worked on */
- ParallelCompletionPtr callback; /* function to call on completion */
- void *callback_data; /* passthru data for it */
-
- ArchiveHandle *AH; /* Archive data worker is using */
-
- int pipeRead; /* master's end of the pipes */
- int pipeWrite;
- int pipeRevRead; /* child's end of the pipes */
- int pipeRevWrite;
-
- /* Child process/thread identity info: */
-#ifdef WIN32
- uintptr_t hThread;
- unsigned int threadId;
-#else
- pid_t pid;
-#endif
-} ParallelSlot;
+/* ParallelSlot is an opaque struct known only within parallel.c */
+typedef struct ParallelSlot ParallelSlot;
/* Overall state for parallel.c */
typedef struct ParallelState
{
int numWorkers; /* allowed number of workers */
- ParallelSlot *parallelSlot; /* array of numWorkers slots */
+ /* these arrays have numWorkers entries, one per worker: */
+ TocEntry **te; /* item being worked on, or NULL */
+ ParallelSlot *parallelSlot; /* private info about each worker */
} ParallelState;
#ifdef WIN32
for (k = 0; k < pstate->numWorkers; k++)
{
- if (pstate->parallelSlot[k].workerStatus == WRKR_WORKING &&
- pstate->parallelSlot[k].te->section == SECTION_DATA)
+ TocEntry *running_te = pstate->te[k];
+
+ if (running_te != NULL &&
+ running_te->section == SECTION_DATA)
count++;
}
if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers)
*/
for (i = 0; i < pstate->numWorkers; i++)
{
- TocEntry *running_te;
+ TocEntry *running_te = pstate->te[i];
- if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING)
+ if (running_te == NULL)
continue;
- running_te = pstate->parallelSlot[i].te;
-
if (has_lock_conflicts(te, running_te) ||
has_lock_conflicts(running_te, te))
{