1 /*-------------------------------------------------------------------------
4 * Infrastructure for launching parallel workers
6 * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * src/backend/access/transam/parallel.c
12 *-------------------------------------------------------------------------
17 #include "access/xact.h"
18 #include "access/xlog.h"
19 #include "access/parallel.h"
20 #include "commands/async.h"
21 #include "libpq/libpq.h"
22 #include "libpq/pqformat.h"
23 #include "libpq/pqmq.h"
24 #include "miscadmin.h"
25 #include "storage/ipc.h"
26 #include "storage/sinval.h"
27 #include "storage/spin.h"
28 #include "tcop/tcopprot.h"
29 #include "utils/combocid.h"
30 #include "utils/guc.h"
31 #include "utils/memutils.h"
32 #include "utils/resowner.h"
33 #include "utils/snapmgr.h"
36 * We don't want to waste a lot of memory on an error queue which, most of
37 * the time, will process only a handful of small messages. However, it is
38 * desirable to make it large enough that a typical ErrorResponse can be sent
39 * without blocking. That way, a worker that errors out can write the whole
40 * message into the queue and terminate without waiting for the user backend.
/* Must remain buffer-aligned; InitializeParallelDSM() statically asserts this. */
42 #define PARALLEL_ERROR_QUEUE_SIZE 16384
44 /* Magic number for parallel context TOC. */
45 #define PARALLEL_MAGIC 0x50477c7c
48 * Magic numbers for parallel state sharing. Higher-level code should use
49 * smaller values, leaving these very large ones for use by this module.
/*
 * Each key labels one chunk inserted into the shared table of contents by
 * InitializeParallelDSM() and looked up again on the worker side by
 * ParallelWorkerMain() (or, for the library/function names, by
 * ParallelExtensionTrampoline()).
 */
51 #define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001)
52 #define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002)
53 #define PARALLEL_KEY_LIBRARY UINT64CONST(0xFFFFFFFFFFFF0003)
54 #define PARALLEL_KEY_GUC UINT64CONST(0xFFFFFFFFFFFF0004)
55 #define PARALLEL_KEY_COMBO_CID UINT64CONST(0xFFFFFFFFFFFF0005)
56 #define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006)
57 #define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
58 #define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
59 #define PARALLEL_KEY_EXTENSION_TRAMPOLINE UINT64CONST(0xFFFFFFFFFFFF0009)
61 /* Fixed-size parallel state. */
/*
 * One copy of this struct lives in the DSM segment at PARALLEL_KEY_FIXED.
 * The master fills it in during InitializeParallelDSM(); each worker reads
 * it in ParallelWorkerMain().  NOTE(review): several fields assigned
 * elsewhere in this file (database_id, current_user_id, sec_context, mutex,
 * workers_expected, workers_attached) are not visible in this excerpt —
 * their declarations appear to have been elided.
 */
62 typedef struct FixedParallelState
64 /* Fixed-size state that workers must restore. */
66 Oid authenticated_user_id;
/* Identity of the backend that launched the parallel operation. */
69 PGPROC *parallel_master_pgproc;
70 pid_t parallel_master_pid;
71 BackendId parallel_master_backend_id;
73 /* Entrypoint for parallel workers. */
74 parallel_worker_main_type entrypoint;
76 /* Mutex protects remaining fields. */
79 /* Track whether workers have attached. */
/* Advanced (under the mutex) by ParallelWorkerReportLastRecEnd(). */
83 /* Maximum XactLastRecEnd of any worker. */
84 XLogRecPtr last_xlog_end;
88 * Our parallel worker number. We initialize this to -1, meaning that we are
89 * not a parallel worker. In parallel workers, it will be set to a value >= 0
90 * and < the number of workers before any user code is invoked; each parallel
91 * worker will get a different parallel worker number.
93 int ParallelWorkerNumber = -1;
95 /* Is there a parallel message pending which we need to receive? */
/* Set by HandleParallelMessageInterrupt(); cleared by HandleParallelMessages(). */
96 bool ParallelMessagePending = false;
98 /* Pointer to our fixed parallel state. */
/* Valid only inside a parallel worker; assigned in ParallelWorkerMain(). */
99 static FixedParallelState *MyFixedParallelState;
101 /* List of active parallel contexts. */
/* New contexts are pushed at the head (CreateParallelContext); removed in
 * DestroyParallelContext.  Walked by HandleParallelMessages and the
 * end-of-(sub)transaction cleanup functions. */
102 static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
104 /* Private functions. */
105 static void HandleParallelMessage(ParallelContext *, int, StringInfo msg);
106 static void ParallelErrorContext(void *arg);
107 static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
108 static void ParallelWorkerMain(Datum main_arg);
111 * Establish a new parallel context. This should be done after entering
112 * parallel mode, and (unless there is an error) the context should be
113 * destroyed before exiting the current subtransaction.
/*
 * Allocates and registers a ParallelContext for up to 'nworkers' workers
 * that will run 'entrypoint'.  Returns the new context (return statement
 * not visible in this excerpt).  The context is allocated in
 * TopTransactionContext so it survives the caller's (possibly short-lived)
 * memory context, and is pushed onto pcxt_list so end-of-transaction
 * cleanup can find it.
 */
116 CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
118 MemoryContext oldcontext;
119 ParallelContext *pcxt;
121 /* It is unsafe to create a parallel context if not in parallel mode. */
122 Assert(IsInParallelMode());
124 /* Number of workers should be non-negative. */
125 Assert(nworkers >= 0);
128 * If dynamic shared memory is not available, we won't be able to use
129 * background workers.
/* NOTE(review): the action taken when DSM is unavailable (presumably
 * forcing nworkers to 0 or erroring) falls on lines elided from this
 * excerpt — confirm against the full source. */
131 if (dynamic_shared_memory_type == DSM_IMPL_NONE)
134 /* We might be running in a short-lived memory context. */
135 oldcontext = MemoryContextSwitchTo(TopTransactionContext);
137 /* Initialize a new ParallelContext. */
138 pcxt = palloc0(sizeof(ParallelContext));
139 pcxt->subid = GetCurrentSubTransactionId();
140 pcxt->nworkers = nworkers;
141 pcxt->entrypoint = entrypoint;
142 pcxt->error_context_stack = error_context_stack;
143 shm_toc_initialize_estimator(&pcxt->estimator);
144 dlist_push_head(&pcxt_list, &pcxt->node);
146 /* Restore previous memory context. */
147 MemoryContextSwitchTo(oldcontext);
153 * Establish a new parallel context that calls a function provided by an
154 * extension. This works around the fact that the library might get mapped
155 * at a different address in each backend.
/*
 * Instead of pointing workers at the extension function directly (whose
 * address may differ per process), use ParallelExtensionTrampoline as the
 * entrypoint and stash the library and function names; the trampoline
 * re-resolves them in each worker.  The names are pstrdup'd into
 * TopTransactionContext so they live as long as the context.
 */
158 CreateParallelContextForExternalFunction(char *library_name,
162 MemoryContext oldcontext;
163 ParallelContext *pcxt;
165 /* We might be running in a very short-lived memory context. */
166 oldcontext = MemoryContextSwitchTo(TopTransactionContext);
168 /* Create the context. */
169 pcxt = CreateParallelContext(ParallelExtensionTrampoline, nworkers);
170 pcxt->library_name = pstrdup(library_name);
171 pcxt->function_name = pstrdup(function_name);
173 /* Restore previous memory context. */
174 MemoryContextSwitchTo(oldcontext);
180 * Establish the dynamic shared memory segment for a parallel context and
181 * copy state and other bookkeeping information that will be needed by
/*
 * Two-phase setup: first estimate the total space required for every chunk
 * of serialized state, then create the segment (or a backend-private
 * substitute) and fill it in.  The estimate phase and the fill phase must
 * stay in sync: each shm_toc_estimate_chunk here is matched by a
 * shm_toc_allocate + shm_toc_insert below.
 */
185 InitializeParallelDSM(ParallelContext *pcxt)
187 MemoryContext oldcontext;
188 Size library_len = 0;
190 Size combocidlen = 0;
196 FixedParallelState *fps;
197 Snapshot transaction_snapshot = GetTransactionSnapshot();
198 Snapshot active_snapshot = GetActiveSnapshot();
200 /* We might be running in a very short-lived memory context. */
201 oldcontext = MemoryContextSwitchTo(TopTransactionContext);
203 /* Allow space to store the fixed-size parallel state. */
204 shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
205 shm_toc_estimate_keys(&pcxt->estimator, 1);
208 * Normally, the user will have requested at least one worker process,
209 * but if by chance they have not, we can skip a bunch of things here.
211 if (pcxt->nworkers > 0)
213 /* Estimate space for various kinds of state sharing. */
214 library_len = EstimateLibraryStateSpace();
215 shm_toc_estimate_chunk(&pcxt->estimator, library_len);
216 guc_len = EstimateGUCStateSpace();
217 shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
218 combocidlen = EstimateComboCIDStateSpace();
219 shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
220 tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
221 shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
222 asnaplen = EstimateSnapshotSpace(active_snapshot);
223 shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
224 tstatelen = EstimateTransactionStateSpace();
225 shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
226 /* If you add more chunks here, you probably need to add keys. */
/* Six chunks above: library, GUC, combo CID, two snapshots, xact state. */
227 shm_toc_estimate_keys(&pcxt->estimator, 6);
229 /* Estimate space needed for error queues. */
230 StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
231 PARALLEL_ERROR_QUEUE_SIZE,
232 "parallel error queue size not buffer-aligned")
233 shm_toc_estimate_chunk(&pcxt->estimator,
234 PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers);
235 shm_toc_estimate_keys(&pcxt->estimator, 1);
237 /* Estimate how much we'll need for extension entrypoint info. */
238 if (pcxt->library_name != NULL)
240 Assert(pcxt->entrypoint == ParallelExtensionTrampoline);
241 Assert(pcxt->function_name != NULL);
/* Two strings plus their NUL terminators, stored back to back. */
242 shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name)
243 + strlen(pcxt->function_name) + 2);
244 shm_toc_estimate_keys(&pcxt->estimator, 1);
249 * Create DSM and initialize with new table of contents. But if the user
250 * didn't request any workers, then don't bother creating a dynamic shared
251 * memory segment; instead, just use backend-private memory.
253 * Also, if we can't create a dynamic shared memory segment because the
254 * maximum number of segments have already been created, then fall back
255 * to backend-private memory, and plan not to use any workers. We hope
256 * this won't happen very often, but it's better to abandon the use of
257 * parallelism than to fail outright.
259 segsize = shm_toc_estimate(&pcxt->estimator);
260 if (pcxt->nworkers != 0)
/* DSM_CREATE_NULL_IF_MAXSEGMENTS: returns NULL instead of erroring out. */
261 pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
262 if (pcxt->seg != NULL)
263 pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
264 dsm_segment_address(pcxt->seg),
/* Fallback: no segment, so build the TOC in backend-private memory.
 * TopMemoryContext because the pcxt may outlive the current transaction's
 * memory contexts; freed explicitly in DestroyParallelContext(). */
269 pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
270 pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
274 /* Initialize fixed-size state in shared memory. */
275 fps = (FixedParallelState *)
276 shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
277 fps->database_id = MyDatabaseId;
278 fps->authenticated_user_id = GetAuthenticatedUserId();
279 GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
280 fps->parallel_master_pgproc = MyProc;
281 fps->parallel_master_pid = MyProcPid;
282 fps->parallel_master_backend_id = MyBackendId;
283 fps->entrypoint = pcxt->entrypoint;
284 SpinLockInit(&fps->mutex);
285 fps->workers_expected = pcxt->nworkers;
286 fps->workers_attached = 0;
287 fps->last_xlog_end = 0;
288 shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
290 /* We can skip the rest of this if we're not budgeting for any workers. */
291 if (pcxt->nworkers > 0)
299 char *error_queue_space;
301 /* Serialize shared libraries we have loaded. */
302 libraryspace = shm_toc_allocate(pcxt->toc, library_len);
303 SerializeLibraryState(library_len, libraryspace);
304 shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);
306 /* Serialize GUC settings. */
307 gucspace = shm_toc_allocate(pcxt->toc, guc_len);
308 SerializeGUCState(guc_len, gucspace);
309 shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);
311 /* Serialize combo CID state. */
312 combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
313 SerializeComboCIDState(combocidlen, combocidspace);
314 shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
316 /* Serialize transaction snapshot and active snapshot. */
317 tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
318 SerializeSnapshot(transaction_snapshot, tsnapspace);
319 shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
321 asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
322 SerializeSnapshot(active_snapshot, asnapspace);
323 shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
325 /* Serialize transaction state. */
326 tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
327 SerializeTransactionState(tstatelen, tstatespace);
328 shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
330 /* Allocate space for worker information. */
331 pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
334 * Establish error queues in dynamic shared memory.
336 * These queues should be used only for transmitting ErrorResponse,
337 * NoticeResponse, and NotifyResponse protocol messages. Tuple data
338 * should be transmitted via separate (possibly larger?) queues.
/* One contiguous allocation, carved into nworkers fixed-size queues;
 * the master attaches as receiver on each. */
341 shm_toc_allocate(pcxt->toc,
342 PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers);
343 for (i = 0; i < pcxt->nworkers; ++i)
348 start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
349 mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
350 shm_mq_set_receiver(mq, MyProc);
351 pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
353 shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);
355 /* Serialize extension entrypoint information. */
356 if (pcxt->library_name != NULL)
358 Size lnamelen = strlen(pcxt->library_name);
359 char *extensionstate;
/* Layout: "<library>\0<function>\0"; decoded in ParallelExtensionTrampoline. */
361 extensionstate = shm_toc_allocate(pcxt->toc, lnamelen
362 + strlen(pcxt->function_name) + 2);
363 strcpy(extensionstate, pcxt->library_name);
364 strcpy(extensionstate + lnamelen + 1, pcxt->function_name);
365 shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE,
370 /* Restore previous memory context. */
371 MemoryContextSwitchTo(oldcontext);
375 * Launch parallel workers.
/*
 * Registers up to pcxt->nworkers dynamic background workers, all sharing a
 * single BackgroundWorker template whose bgw_main_arg carries the DSM
 * segment handle.  Registration failures are tolerated: the caller must
 * cope with fewer workers than requested.
 */
378 LaunchParallelWorkers(ParallelContext *pcxt)
380 MemoryContext oldcontext;
381 BackgroundWorker worker;
383 bool any_registrations_failed = false;
385 /* Skip this if we have no workers. */
386 if (pcxt->nworkers == 0)
389 /* If we do have workers, we'd better have a DSM segment. */
390 Assert(pcxt->seg != NULL);
392 /* We might be running in a short-lived memory context. */
393 oldcontext = MemoryContextSwitchTo(TopTransactionContext);
395 /* Configure a worker. */
396 snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
/* (continuation of bgw_flags assignment; the "worker.bgw_flags =" line is
 * not visible in this excerpt) */
399 BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
400 worker.bgw_start_time = BgWorkerStart_ConsistentState;
401 worker.bgw_restart_time = BGW_NEVER_RESTART;
402 worker.bgw_main = ParallelWorkerMain;
/* Workers locate the shared state via this DSM handle. */
403 worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
/* bgw_notify_pid lets the postmaster signal us on worker start/stop, which
 * WaitForBackgroundWorkerShutdown relies on. */
404 worker.bgw_notify_pid = MyProcPid;
409 * The caller must be able to tolerate ending up with fewer workers than
410 * expected, so there is no need to throw an error here if registration
411 * fails. It wouldn't help much anyway, because registering the worker
412 * in no way guarantees that it will start up and initialize successfully.
414 for (i = 0; i < pcxt->nworkers; ++i)
416 if (!any_registrations_failed &&
417 RegisterDynamicBackgroundWorker(&worker,
418 &pcxt->worker[i].bgwhandle))
/* Tie the error queue to the worker's handle so queue reads can detect
 * a worker that died before attaching. */
419 shm_mq_set_handle(pcxt->worker[i].error_mqh,
420 pcxt->worker[i].bgwhandle);
424 * If we weren't able to register the worker, then we've bumped
425 * up against the max_worker_processes limit, and future
426 * registrations will probably fail too, so arrange to skip them.
427 * But we still have to execute this code for the remaining slots
428 * to make sure that we forget about the error queues we budgeted
429 * for those workers. Otherwise, we'll wait for them to start,
430 * but they never will.
432 any_registrations_failed = true;
433 pcxt->worker[i].bgwhandle = NULL;
434 pcxt->worker[i].error_mqh = NULL;
438 /* Restore previous memory context. */
439 MemoryContextSwitchTo(oldcontext);
443 * Wait for all workers to exit.
445 * Even if the parallel operation seems to have completed successfully, it's
446 * important to call this function afterwards. We must not miss any errors
447 * the workers may have thrown during the parallel operation, or any that they
448 * may yet throw while shutting down.
450 * Also, we want to update our notion of XactLastRecEnd based on worker
/*
 * NOTE(review): the outer wait loop's structure (the for(;;) and the test
 * of anyone_alive) is elided from this excerpt; the visible logic is:
 * process pending worker messages via CHECK_FOR_INTERRUPTS, scan for any
 * worker whose error queue is still open, and sleep on our latch until
 * something happens.  Finally, fold the workers' reported WAL end position
 * into our own XactLastRecEnd.
 */
454 WaitForParallelWorkersToFinish(ParallelContext *pcxt)
458 bool anyone_alive = false;
462 * This will process any parallel messages that are pending, which
463 * may change the outcome of the loop that follows. It may also
464 * throw an error propagated from a worker.
466 CHECK_FOR_INTERRUPTS();
468 for (i = 0; i < pcxt->nworkers; ++i)
/* A non-NULL error queue handle means this worker hasn't sent its
 * Terminate ('X') message yet. */
470 if (pcxt->worker[i].error_mqh != NULL)
480 WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
481 ResetLatch(&MyProc->procLatch);
484 if (pcxt->toc != NULL)
486 FixedParallelState *fps;
488 fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
/* Adopt the furthest WAL write position any worker reported, so commit
 * waits for all worker WAL to be flushed. */
489 if (fps->last_xlog_end > XactLastRecEnd)
490 XactLastRecEnd = fps->last_xlog_end;
495 * Destroy a parallel context.
497 * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
498 * first, before calling this function. When this function is invoked, any
499 * remaining workers are forcibly killed; the dynamic shared memory segment
500 * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
503 DestroyParallelContext(ParallelContext *pcxt)
508 * Be careful about order of operations here! We remove the parallel
509 * context from the list before we do anything else; otherwise, if an
510 * error occurs during a subsequent step, we might try to nuke it again
511 * from AtEOXact_Parallel or AtEOSubXact_Parallel.
513 dlist_delete(&pcxt->node);
515 /* Kill each worker in turn, and forget their error queues. */
516 for (i = 0; i < pcxt->nworkers; ++i)
518 if (pcxt->worker[i].bgwhandle != NULL)
519 TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
520 if (pcxt->worker[i].error_mqh != NULL)
/* Only the handle is freed here; the queue itself lives in the DSM
 * segment and goes away when the segment is detached below. */
522 pfree(pcxt->worker[i].error_mqh);
523 pcxt->worker[i].error_mqh = NULL;
528 * If we have allocated a shared memory segment, detach it. This will
529 * implicitly detach the error queues, and any other shared memory queues,
532 if (pcxt->seg != NULL)
534 dsm_detach(pcxt->seg);
539 * If this parallel context is actually in backend-private memory rather
540 * than shared memory, free that memory instead.
542 if (pcxt->private_memory != NULL)
544 pfree(pcxt->private_memory);
545 pcxt->private_memory = NULL;
548 /* Wait until the workers actually die. */
549 for (i = 0; i < pcxt->nworkers; ++i)
551 BgwHandleStatus status;
553 if (pcxt->worker[i].bgwhandle == NULL)
557 * We can't finish transaction commit or abort until all of the
558 * workers are dead. This means, in particular, that we can't respond
559 * to interrupts at this stage.
562 status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
566 * If the postmaster kicked the bucket, we have no chance of cleaning
567 * up safely -- we won't be able to tell when our workers are actually
568 * dead. This doesn't necessitate a PANIC since they will all abort
569 * eventually, but we can't safely continue this session.
571 if (status == BGWH_POSTMASTER_DIED)
573 (errcode(ERRCODE_ADMIN_SHUTDOWN),
574 errmsg("postmaster exited during a parallel transaction")));
576 /* Release memory. */
577 pfree(pcxt->worker[i].bgwhandle);
578 pcxt->worker[i].bgwhandle = NULL;
581 /* Free the worker array itself. */
582 if (pcxt->worker != NULL)
593 * Are there any parallel contexts currently active?
/* True while at least one ParallelContext is registered on pcxt_list. */
596 ParallelContextActive(void)
598 return !dlist_is_empty(&pcxt_list);
602 * Handle receipt of an interrupt indicating a parallel worker message.
/*
 * Runs in signal-handler context: only sets flags that the main loop polls
 * via CHECK_FOR_INTERRUPTS(), which eventually calls
 * HandleParallelMessages().  NOTE(review): the latch set and the restore
 * of save_errno presumably occur on lines elided from this excerpt —
 * confirm against the full source.
 */
605 HandleParallelMessageInterrupt(void)
607 int save_errno = errno;
609 InterruptPending = true;
610 ParallelMessagePending = true;
617 * Handle any queued protocol messages received from parallel workers.
/*
 * Drains every worker error queue across all active parallel contexts.
 * Clears ParallelMessagePending up front; the interrupt handler will set
 * it again if more messages arrive while we're working.
 */
620 HandleParallelMessages(void)
624 ParallelMessagePending = false;
626 dlist_foreach(iter, &pcxt_list)
628 ParallelContext *pcxt;
633 pcxt = dlist_container(ParallelContext, node, iter.cur);
/* Contexts whose workers were never launched have no queues to drain. */
634 if (pcxt->worker == NULL)
637 for (i = 0; i < pcxt->nworkers; ++i)
640 * Read as many messages as we can from each worker, but stop
641 * when either (1) the error queue goes away, which can happen if
642 * we receive a Terminate message from the worker; or (2) no more
643 * messages can be read from the worker without blocking.
645 while (pcxt->worker[i].error_mqh != NULL)
/* (nowait receive: the trailing 'true' argument is on an elided line) */
649 res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
651 if (res == SHM_MQ_WOULD_BLOCK)
653 else if (res == SHM_MQ_SUCCESS)
/* Copy the message out of shared memory before parsing it. */
657 initStringInfo(&msg);
658 appendBinaryStringInfo(&msg, data, nbytes);
659 HandleParallelMessage(pcxt, i, &msg);
/* Queue detached without a Terminate message: worker died. */
664 (errcode(ERRCODE_INTERNAL_ERROR), /* XXX: wrong errcode? */
665 errmsg("lost connection to parallel worker")));
667 /* This might make the error queue go away. */
668 CHECK_FOR_INTERRUPTS();
675 * Handle a single protocol message received from a single parallel worker.
/*
 * 'i' is the worker index within pcxt; 'msg' holds one complete frontend
 * protocol message whose first byte is the message type.
 */
678 HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
682 msgtype = pq_getmsgbyte(msg);
686 case 'K': /* BackendKeyData */
688 int32 pid = pq_getmsgint(msg, 4);
689 (void) pq_getmsgint(msg, 4); /* discard cancel key */
690 (void) pq_getmsgend(msg);
/* Remember the worker's PID for error-context reporting. */
691 pcxt->worker[i].pid = pid;
695 case 'E': /* ErrorResponse */
696 case 'N': /* NoticeResponse */
699 ErrorContextCallback errctx;
700 ErrorContextCallback *save_error_context_stack;
703 * Rethrow the error using the error context callbacks that
704 * were in effect when the context was created, not the
707 save_error_context_stack = error_context_stack;
708 errctx.callback = ParallelErrorContext;
709 errctx.arg = &pcxt->worker[i].pid;
710 errctx.previous = pcxt->error_context_stack;
711 error_context_stack = &errctx;
713 /* Parse ErrorResponse or NoticeResponse. */
714 pq_parse_errornotice(msg, &edata);
716 /* Death of a worker isn't enough justification for suicide. */
/* Clamp FATAL/PANIC from a worker down to ERROR in the master. */
717 edata.elevel = Min(edata.elevel, ERROR);
719 /* Rethrow error or notice. */
720 ThrowErrorData(&edata);
/* Only reached for notices; ThrowErrorData longjmps on ERROR. */
722 /* Restore previous context. */
723 error_context_stack = save_error_context_stack;
728 case 'A': /* NotifyResponse */
730 /* Propagate NotifyResponse. */
731 pq_putmessage(msg->data[0], &msg->data[1], msg->len - 1);
735 case 'X': /* Terminate, indicating clean exit */
/* Forget this worker entirely; NULLing error_mqh is what lets
 * WaitForParallelWorkersToFinish() consider it finished. */
737 pfree(pcxt->worker[i].bgwhandle);
738 pfree(pcxt->worker[i].error_mqh);
739 pcxt->worker[i].bgwhandle = NULL;
740 pcxt->worker[i].error_mqh = NULL;
746 elog(ERROR, "unknown message type: %c (%d bytes)",
753 * End-of-subtransaction cleanup for parallel contexts.
755 * Currently, it's forbidden to enter or leave a subtransaction while
756 * parallel mode is in effect, so we could just blow away everything. But
757 * we may want to relax that restriction in the future, so this code
758 * contemplates that there may be multiple subtransaction IDs in pcxt_list.
/*
 * Destroys every context belonging to subtransaction 'mySubId'; contexts
 * are stacked newest-first, so we stop at the first entry from an older
 * subtransaction.  NOTE(review): the isCommit test guarding the WARNING
 * appears to be on an elided line.
 */
761 AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
763 while (!dlist_is_empty(&pcxt_list))
765 ParallelContext *pcxt;
767 pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
768 if (pcxt->subid != mySubId)
/* A surviving context at commit means the caller forgot to destroy it. */
771 elog(WARNING, "leaked parallel context");
/* Removes the entry from pcxt_list, so the loop makes progress. */
772 DestroyParallelContext(pcxt);
777 * End-of-transaction cleanup for parallel contexts.
/*
 * Destroys all remaining parallel contexts.  NOTE(review): the isCommit
 * test guarding the WARNING appears to be on an elided line.
 */
780 AtEOXact_Parallel(bool isCommit)
782 while (!dlist_is_empty(&pcxt_list))
784 ParallelContext *pcxt;
786 pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
/* Contexts still alive at commit indicate a cleanup bug in the caller. */
788 elog(WARNING, "leaked parallel context");
/* Removes the entry from pcxt_list, so the loop makes progress. */
789 DestroyParallelContext(pcxt);
794 * Main entrypoint for parallel workers.
/*
 * Runs inside each background worker.  main_arg is the DSM segment handle
 * passed via bgw_main_arg in LaunchParallelWorkers().  Sequence: attach to
 * the segment, claim a worker number, wire up the error queue (so later
 * failures are reported to the master), then restore the master's state —
 * libraries, GUCs, transaction, combo CIDs, snapshots, user ID — invoke
 * the entrypoint, and shut down with a Terminate message.
 */
797 ParallelWorkerMain(Datum main_arg)
801 FixedParallelState *fps;
802 char *error_queue_space;
811 StringInfoData msgbuf;
813 /* Establish signal handlers. */
814 pqsignal(SIGTERM, die);
815 BackgroundWorkerUnblockSignals();
817 /* Set up a memory context and resource owner. */
818 Assert(CurrentResourceOwner == NULL);
819 CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel");
820 CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
822 ALLOCSET_DEFAULT_MINSIZE,
823 ALLOCSET_DEFAULT_INITSIZE,
824 ALLOCSET_DEFAULT_MAXSIZE);
827 * Now that we have a resource owner, we can attach to the dynamic
828 * shared memory segment and read the table of contents.
830 seg = dsm_attach(DatumGetUInt32(main_arg));
833 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
834 errmsg("unable to map dynamic shared memory segment")));
835 toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
838 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
839 errmsg("bad magic number in dynamic shared memory segment")));
841 /* Determine and set our worker number. */
842 fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED);
844 Assert(ParallelWorkerNumber == -1);
/* Claim the next free slot under the spinlock; concurrent workers each
 * get a distinct number. */
845 SpinLockAcquire(&fps->mutex);
846 if (fps->workers_attached < fps->workers_expected)
847 ParallelWorkerNumber = fps->workers_attached++;
848 SpinLockRelease(&fps->mutex);
849 if (ParallelWorkerNumber < 0)
851 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
852 errmsg("too many parallel workers already attached")));
853 MyFixedParallelState = fps;
856 * Now that we have a worker number, we can find and attach to the error
857 * queue provided for us. That's good, because until we do that, any
858 * errors that happen here will not be reported back to the process that
859 * requested that this worker be launched.
861 error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE);
/* Our queue is the ParallelWorkerNumber'th slot of the shared block. */
862 mq = (shm_mq *) (error_queue_space +
863 ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
864 shm_mq_set_sender(mq, MyProc);
865 mqh = shm_mq_attach(mq, seg, NULL);
/* From here on, elog/ereport output flows to the master via this queue. */
866 pq_redirect_to_shm_mq(mq, mqh);
867 pq_set_parallel_master(fps->parallel_master_pid,
868 fps->parallel_master_backend_id);
871 * Send a BackendKeyData message to the process that initiated parallelism
872 * so that it has access to our PID before it receives any other messages
873 * from us. Our cancel key is sent, too, since that's the way the protocol
874 * message is defined, but it won't actually be used for anything in this
877 pq_beginmessage(&msgbuf, 'K');
878 pq_sendint(&msgbuf, (int32) MyProcPid, sizeof(int32));
879 pq_sendint(&msgbuf, (int32) MyCancelKey, sizeof(int32));
880 pq_endmessage(&msgbuf);
883 * Hooray! Primary initialization is complete. Now, we need to set up
884 * our backend-local state to match the original backend.
888 * Load libraries that were loaded by original backend. We want to do this
889 * before restoring GUCs, because the libraries might define custom
892 libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY);
893 Assert(libraryspace != NULL);
894 RestoreLibraryState(libraryspace);
896 /* Restore database connection. */
897 BackgroundWorkerInitializeConnectionByOid(fps->database_id,
898 fps->authenticated_user_id);
900 /* Restore GUC values from launching backend. */
901 gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC);
902 Assert(gucspace != NULL);
/* GUC restore needs a transaction, but not the parallel one; use a
 * throwaway local transaction. */
903 StartTransactionCommand();
904 RestoreGUCState(gucspace);
905 CommitTransactionCommand();
907 /* Crank up a transaction state appropriate to a parallel worker. */
908 tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE);
909 StartParallelWorkerTransaction(tstatespace);
911 /* Restore combo CID state. */
912 combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID);
913 Assert(combocidspace != NULL);
914 RestoreComboCIDState(combocidspace);
916 /* Restore transaction snapshot. */
917 tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT);
918 Assert(tsnapspace != NULL);
919 RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace),
920 fps->parallel_master_pgproc);
922 /* Restore active snapshot. */
923 asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT);
924 Assert(asnapspace != NULL);
925 PushActiveSnapshot(RestoreSnapshot(asnapspace));
927 /* Restore user ID and security context. */
928 SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);
931 * We've initialized all of our state now; nothing should change hereafter.
936 * Time to do the real work: invoke the caller-supplied code.
938 * If you get a crash at this line, see the comments for
939 * ParallelExtensionTrampoline.
941 fps->entrypoint(seg, toc);
943 /* Must exit parallel mode to pop active snapshot. */
946 /* Must pop active snapshot so resowner.c doesn't complain. */
949 /* Shut down the parallel-worker transaction. */
950 EndParallelWorkerTransaction();
952 /* Report success. */
/* 'X' (Terminate) tells the master this worker exited cleanly; see the
 * matching case in HandleParallelMessage(). */
953 pq_putmessage('X', NULL, 0);
957 * It's unsafe for the entrypoint invoked by ParallelWorkerMain to be a
958 * function living in a dynamically loaded module, because the module might
959 * not be loaded in every process, or might be loaded but not at the same
960 * address. To work around that problem, CreateParallelContextForExtension()
961 * arranges to call this function rather than calling the extension-provided
962 * function directly; and this function then looks up the real entrypoint and
/*
 * Decodes the "<library>\0<function>\0" pair written at
 * PARALLEL_KEY_EXTENSION_TRAMPOLINE by InitializeParallelDSM(), resolves
 * the function in this process's address space (loading the library if
 * necessary), and invokes it.
 */
966 ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc)
968 char *extensionstate;
971 parallel_worker_main_type entrypt;
973 extensionstate = shm_toc_lookup(toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE);
974 Assert(extensionstate != NULL);
975 library_name = extensionstate;
/* Function name is stored immediately after the library name's NUL. */
976 function_name = extensionstate + strlen(library_name) + 1;
978 entrypt = (parallel_worker_main_type)
979 load_external_function(library_name, function_name, true, NULL);
984 * Give the user a hint that this is a message propagated from a parallel
985 * worker. Otherwise, it can sometimes be confusing to understand what
/*
 * Error context callback installed by HandleParallelMessage(); 'arg'
 * points at the originating worker's pid (an int32).
 */
989 ParallelErrorContext(void *arg)
991 errcontext("parallel worker, pid %d", * (int32 *) arg);
995 * Update shared memory with the ending location of the last WAL record we
996 * wrote, if it's greater than the value already stored there.
/*
 * Called from a worker; the master later folds fps->last_xlog_end into its
 * own XactLastRecEnd in WaitForParallelWorkersToFinish().  The spinlock
 * serializes the read-compare-write against other workers.
 */
999 ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
1001 FixedParallelState *fps = MyFixedParallelState;
1003 Assert(fps != NULL);
1004 SpinLockAcquire(&fps->mutex);
1005 if (fps->last_xlog_end < last_xlog_end)
1006 fps->last_xlog_end = last_xlog_end;
1007 SpinLockRelease(&fps->mutex);