1 /*-------------------------------------------------------------------------
4 * Infrastructure for launching parallel workers
6 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * src/backend/access/transam/parallel.c
12 *-------------------------------------------------------------------------
17 #include "access/nbtree.h"
18 #include "access/parallel.h"
19 #include "access/session.h"
20 #include "access/xact.h"
21 #include "access/xlog.h"
22 #include "catalog/index.h"
23 #include "catalog/namespace.h"
24 #include "commands/async.h"
25 #include "executor/execParallel.h"
26 #include "libpq/libpq.h"
27 #include "libpq/pqformat.h"
28 #include "libpq/pqmq.h"
29 #include "miscadmin.h"
30 #include "optimizer/planmain.h"
32 #include "storage/ipc.h"
33 #include "storage/sinval.h"
34 #include "storage/spin.h"
35 #include "tcop/tcopprot.h"
36 #include "utils/combocid.h"
37 #include "utils/guc.h"
38 #include "utils/inval.h"
39 #include "utils/memutils.h"
40 #include "utils/resowner.h"
41 #include "utils/snapmgr.h"
42 #include "utils/typcache.h"
46 * We don't want to waste a lot of memory on an error queue which, most of
47 * the time, will process only a handful of small messages. However, it is
48 * desirable to make it large enough that a typical ErrorResponse can be sent
49 * without blocking. That way, a worker that errors out can write the whole
50 * message into the queue and terminate without waiting for the user backend.
52 #define PARALLEL_ERROR_QUEUE_SIZE 16384
54 /* Magic number for parallel context TOC. */
55 #define PARALLEL_MAGIC 0x50477c7c
58 * Magic numbers for per-context parallel state sharing. Higher-level code
59 * should use smaller values, leaving these very large ones for use by this
62 #define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001)
63 #define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002)
64 #define PARALLEL_KEY_LIBRARY UINT64CONST(0xFFFFFFFFFFFF0003)
65 #define PARALLEL_KEY_GUC UINT64CONST(0xFFFFFFFFFFFF0004)
66 #define PARALLEL_KEY_COMBO_CID UINT64CONST(0xFFFFFFFFFFFF0005)
67 #define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006)
68 #define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
69 #define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
70 #define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
71 #define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
72 #define PARALLEL_KEY_REINDEX_STATE UINT64CONST(0xFFFFFFFFFFFF000B)
74 /* Fixed-size parallel state. */
75 typedef struct FixedParallelState
77 /* Fixed-size state that workers must restore. */
79 Oid authenticated_user_id;
82 Oid temp_namespace_id;
83 Oid temp_toast_namespace_id;
86 PGPROC *parallel_master_pgproc;
87 pid_t parallel_master_pid;
88 BackendId parallel_master_backend_id;
90 /* Mutex protects remaining fields. */
93 /* Maximum XactLastRecEnd of any worker. */
94 XLogRecPtr last_xlog_end;
98 * Our parallel worker number. We initialize this to -1, meaning that we are
99 * not a parallel worker. In parallel workers, it will be set to a value >= 0
100 * and < the number of workers before any user code is invoked; each parallel
101 * worker will get a different parallel worker number.
103 int ParallelWorkerNumber = -1;
105 /* Is there a parallel message pending which we need to receive? */
106 volatile bool ParallelMessagePending = false;
108 /* Are we initializing a parallel worker? */
109 bool InitializingParallelWorker = false;
111 /* Pointer to our fixed parallel state. */
112 static FixedParallelState *MyFixedParallelState;
114 /* List of active parallel contexts. */
115 static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
117 /* Backend-local copy of data from FixedParallelState. */
118 static pid_t ParallelMasterPid;
121 * List of internal parallel worker entry points. We need this for
122 * reasons explained in LookupParallelWorkerFunction(), below.
127 parallel_worker_main_type fn_addr;
128 } InternalParallelWorkers[] =
132 "ParallelQueryMain", ParallelQueryMain
135 "_bt_parallel_build_main", _bt_parallel_build_main
139 /* Private functions. */
140 static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
141 static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
142 static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
143 static void ParallelWorkerShutdown(int code, Datum arg);
147 * Establish a new parallel context. This should be done after entering
148 * parallel mode, and (unless there is an error) the context should be
149 * destroyed before exiting the current subtransaction.
152 CreateParallelContext(const char *library_name, const char *function_name,
153 int nworkers, bool serializable_okay)
155 MemoryContext oldcontext;
156 ParallelContext *pcxt;
158 /* It is unsafe to create a parallel context if not in parallel mode. */
159 Assert(IsInParallelMode());
161 /* Number of workers should be non-negative. */
162 Assert(nworkers >= 0);
165 * If we are running under serializable isolation, we can't use parallel
166 * workers, at least not until somebody enhances that mechanism to be
167 * parallel-aware. Utility statement callers may ask us to ignore this
168 * restriction because they're always able to safely ignore the fact that
169 * SIREAD locks do not work with parallelism.
171 if (IsolationIsSerializable() && !serializable_okay)
174 /* We might be running in a short-lived memory context. */
175 oldcontext = MemoryContextSwitchTo(TopTransactionContext);
177 /* Initialize a new ParallelContext. */
178 pcxt = palloc0(sizeof(ParallelContext));
179 pcxt->subid = GetCurrentSubTransactionId();
180 pcxt->nworkers = nworkers;
181 pcxt->library_name = pstrdup(library_name);
182 pcxt->function_name = pstrdup(function_name);
183 pcxt->error_context_stack = error_context_stack;
184 shm_toc_initialize_estimator(&pcxt->estimator);
185 dlist_push_head(&pcxt_list, &pcxt->node);
187 /* Restore previous memory context. */
188 MemoryContextSwitchTo(oldcontext);
194 * Establish the dynamic shared memory segment for a parallel context and
195 * copy state and other bookkeeping information that will be needed by
196 * parallel workers into it.
199 InitializeParallelDSM(ParallelContext *pcxt)
201 MemoryContext oldcontext;
202 Size library_len = 0;
204 Size combocidlen = 0;
211 FixedParallelState *fps;
212 dsm_handle session_dsm_handle = DSM_HANDLE_INVALID;
213 Snapshot transaction_snapshot = GetTransactionSnapshot();
214 Snapshot active_snapshot = GetActiveSnapshot();
216 /* We might be running in a very short-lived memory context. */
217 oldcontext = MemoryContextSwitchTo(TopTransactionContext);
219 /* Allow space to store the fixed-size parallel state. */
220 shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
221 shm_toc_estimate_keys(&pcxt->estimator, 1);
224 * Normally, the user will have requested at least one worker process, but
225 * if by chance they have not, we can skip a bunch of things here.
227 if (pcxt->nworkers > 0)
229 /* Get (or create) the per-session DSM segment's handle. */
230 session_dsm_handle = GetSessionDsmHandle();
233 * If we weren't able to create a per-session DSM segment, then we can
234 * continue but we can't safely launch any workers because their
235 * record typmods would be incompatible so they couldn't exchange
238 if (session_dsm_handle == DSM_HANDLE_INVALID)
242 if (pcxt->nworkers > 0)
244 /* Estimate space for various kinds of state sharing. */
245 library_len = EstimateLibraryStateSpace();
246 shm_toc_estimate_chunk(&pcxt->estimator, library_len);
247 guc_len = EstimateGUCStateSpace();
248 shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
249 combocidlen = EstimateComboCIDStateSpace();
250 shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
251 tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
252 shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
253 asnaplen = EstimateSnapshotSpace(active_snapshot);
254 shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
255 tstatelen = EstimateTransactionStateSpace();
256 shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
257 shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
258 reindexlen = EstimateReindexStateSpace();
259 shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
260 /* If you add more chunks here, you probably need to add keys. */
261 shm_toc_estimate_keys(&pcxt->estimator, 8);
263 /* Estimate space needed for error queues. */
264 StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
265 PARALLEL_ERROR_QUEUE_SIZE,
266 "parallel error queue size not buffer-aligned");
267 shm_toc_estimate_chunk(&pcxt->estimator,
268 mul_size(PARALLEL_ERROR_QUEUE_SIZE,
270 shm_toc_estimate_keys(&pcxt->estimator, 1);
272 /* Estimate how much we'll need for the entrypoint info. */
273 shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
274 strlen(pcxt->function_name) + 2);
275 shm_toc_estimate_keys(&pcxt->estimator, 1);
279 * Create DSM and initialize with new table of contents. But if the user
280 * didn't request any workers, then don't bother creating a dynamic shared
281 * memory segment; instead, just use backend-private memory.
283 * Also, if we can't create a dynamic shared memory segment because the
284 * maximum number of segments has already been created, then fall back to
285 * backend-private memory, and plan not to use any workers. We hope this
286 * won't happen very often, but it's better to abandon the use of
287 * parallelism than to fail outright.
289 segsize = shm_toc_estimate(&pcxt->estimator);
290 if (pcxt->nworkers > 0)
291 pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
292 if (pcxt->seg != NULL)
293 pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
294 dsm_segment_address(pcxt->seg),
299 pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
300 pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
304 /* Initialize fixed-size state in shared memory. */
305 fps = (FixedParallelState *)
306 shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
307 fps->database_id = MyDatabaseId;
308 fps->authenticated_user_id = GetAuthenticatedUserId();
309 fps->outer_user_id = GetCurrentRoleId();
310 fps->is_superuser = session_auth_is_superuser;
311 GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
312 GetTempNamespaceState(&fps->temp_namespace_id,
313 &fps->temp_toast_namespace_id);
314 fps->parallel_master_pgproc = MyProc;
315 fps->parallel_master_pid = MyProcPid;
316 fps->parallel_master_backend_id = MyBackendId;
317 SpinLockInit(&fps->mutex);
318 fps->last_xlog_end = 0;
319 shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
321 /* We can skip the rest of this if we're not budgeting for any workers. */
322 if (pcxt->nworkers > 0)
331 char *error_queue_space;
332 char *session_dsm_handle_space;
333 char *entrypointstate;
336 /* Serialize shared libraries we have loaded. */
337 libraryspace = shm_toc_allocate(pcxt->toc, library_len);
338 SerializeLibraryState(library_len, libraryspace);
339 shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);
341 /* Serialize GUC settings. */
342 gucspace = shm_toc_allocate(pcxt->toc, guc_len);
343 SerializeGUCState(guc_len, gucspace);
344 shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);
346 /* Serialize combo CID state. */
347 combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
348 SerializeComboCIDState(combocidlen, combocidspace);
349 shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
351 /* Serialize transaction snapshot and active snapshot. */
352 tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
353 SerializeSnapshot(transaction_snapshot, tsnapspace);
354 shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
356 asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
357 SerializeSnapshot(active_snapshot, asnapspace);
358 shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
360 /* Provide the handle for per-session segment. */
361 session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
363 *(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
364 shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
365 session_dsm_handle_space);
367 /* Serialize transaction state. */
368 tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
369 SerializeTransactionState(tstatelen, tstatespace);
370 shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
372 /* Serialize reindex state. */
373 reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
374 SerializeReindexState(reindexlen, reindexspace);
375 shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);
377 /* Allocate space for worker information. */
378 pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
381 * Establish error queues in dynamic shared memory.
383 * These queues should be used only for transmitting ErrorResponse,
384 * NoticeResponse, and NotifyResponse protocol messages. Tuple data
385 * should be transmitted via separate (possibly larger?) queues.
388 shm_toc_allocate(pcxt->toc,
389 mul_size(PARALLEL_ERROR_QUEUE_SIZE,
391 for (i = 0; i < pcxt->nworkers; ++i)
396 start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
397 mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
398 shm_mq_set_receiver(mq, MyProc);
399 pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
401 shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);
404 * Serialize entrypoint information. It's unsafe to pass function
405 * pointers across processes, as the function pointer may be different
406 * in each process in EXEC_BACKEND builds, so we always pass library
407 * and function name. (We use library name "postgres" for functions
408 * in the core backend.)
410 lnamelen = strlen(pcxt->library_name);
411 entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
412 strlen(pcxt->function_name) + 2);
413 strcpy(entrypointstate, pcxt->library_name);
414 strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
415 shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
418 /* Restore previous memory context. */
419 MemoryContextSwitchTo(oldcontext);
423 * Reinitialize the dynamic shared memory segment for a parallel context such
424 * that we could launch workers for it again.
427 ReinitializeParallelDSM(ParallelContext *pcxt)
429 FixedParallelState *fps;
431 /* Wait for any old workers to exit. */
432 if (pcxt->nworkers_launched > 0)
434 WaitForParallelWorkersToFinish(pcxt);
435 WaitForParallelWorkersToExit(pcxt);
436 pcxt->nworkers_launched = 0;
437 if (pcxt->known_attached_workers)
439 pfree(pcxt->known_attached_workers);
440 pcxt->known_attached_workers = NULL;
441 pcxt->nknown_attached_workers = 0;
445 /* Reset a few bits of fixed parallel state to a clean state. */
446 fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
447 fps->last_xlog_end = 0;
449 /* Recreate error queues (if they exist). */
450 if (pcxt->nworkers > 0)
452 char *error_queue_space;
456 shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
457 for (i = 0; i < pcxt->nworkers; ++i)
462 start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
463 mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
464 shm_mq_set_receiver(mq, MyProc);
465 pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
471 * Launch parallel workers.
474 LaunchParallelWorkers(ParallelContext *pcxt)
476 MemoryContext oldcontext;
477 BackgroundWorker worker;
479 bool any_registrations_failed = false;
481 /* Skip this if we have no workers. */
482 if (pcxt->nworkers == 0)
485 /* We need to be a lock group leader. */
486 BecomeLockGroupLeader();
488 /* If we do have workers, we'd better have a DSM segment. */
489 Assert(pcxt->seg != NULL);
491 /* We might be running in a short-lived memory context. */
492 oldcontext = MemoryContextSwitchTo(TopTransactionContext);
494 /* Configure a worker. */
495 memset(&worker, 0, sizeof(worker));
496 snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
498 snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
500 BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
501 | BGWORKER_CLASS_PARALLEL;
502 worker.bgw_start_time = BgWorkerStart_ConsistentState;
503 worker.bgw_restart_time = BGW_NEVER_RESTART;
504 sprintf(worker.bgw_library_name, "postgres");
505 sprintf(worker.bgw_function_name, "ParallelWorkerMain");
506 worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
507 worker.bgw_notify_pid = MyProcPid;
512 * The caller must be able to tolerate ending up with fewer workers than
513 * expected, so there is no need to throw an error here if registration
514 * fails. It wouldn't help much anyway, because registering the worker in
515 * no way guarantees that it will start up and initialize successfully.
517 for (i = 0; i < pcxt->nworkers; ++i)
519 memcpy(worker.bgw_extra, &i, sizeof(int));
520 if (!any_registrations_failed &&
521 RegisterDynamicBackgroundWorker(&worker,
522 &pcxt->worker[i].bgwhandle))
524 shm_mq_set_handle(pcxt->worker[i].error_mqh,
525 pcxt->worker[i].bgwhandle);
526 pcxt->nworkers_launched++;
531 * If we weren't able to register the worker, then we've bumped up
532 * against the max_worker_processes limit, and future
533 * registrations will probably fail too, so arrange to skip them.
534 * But we still have to execute this code for the remaining slots
535 * to make sure that we forget about the error queues we budgeted
536 * for those workers. Otherwise, we'll wait for them to start,
537 * but they never will.
539 any_registrations_failed = true;
540 pcxt->worker[i].bgwhandle = NULL;
541 shm_mq_detach(pcxt->worker[i].error_mqh);
542 pcxt->worker[i].error_mqh = NULL;
547 * Now that nworkers_launched has taken its final value, we can initialize
548 * known_attached_workers.
550 if (pcxt->nworkers_launched > 0)
552 pcxt->known_attached_workers =
553 palloc0(sizeof(bool) * pcxt->nworkers_launched);
554 pcxt->nknown_attached_workers = 0;
557 /* Restore previous memory context. */
558 MemoryContextSwitchTo(oldcontext);
562 * Wait for all workers to attach to their error queues, and throw an error if
563 * any worker fails to do this.
565 * Callers can assume that if this function returns successfully, then the
566 * number of workers given by pcxt->nworkers_launched have initialized and
567 * attached to their error queues. Whether or not these workers are guaranteed
568 * to still be running depends on what code the caller asked them to run;
569 * this function does not guarantee that they have not exited. However, it
570 * does guarantee that any workers which exited must have done so cleanly and
571 * after successfully performing the work with which they were tasked.
573 * If this function is not called, then some of the workers that were launched
574 * may not have been started due to a fork() failure, or may have exited during
575 * early startup prior to attaching to the error queue, so nworkers_launched
576 * cannot be viewed as completely reliable. It will never be less than the
577 * number of workers which actually started, but it might be more. Any workers
578 * that failed to start will still be discovered by
579 * WaitForParallelWorkersToFinish and an error will be thrown at that time,
580 * provided that function is eventually reached.
582 * In general, the leader process should do as much work as possible before
583 * calling this function. fork() failures and other early-startup failures
584 * are very uncommon, and having the leader sit idle when it could be doing
585 * useful work is undesirable. However, if the leader needs to wait for
586 * all of its workers or for a specific worker, it may want to call this
587 * function before doing so. If not, it must make some other provision for
588 * the failure-to-start case, lest it wait forever. On the other hand, a
589 * leader which never waits for a worker that might not be started yet, or
590 * at least never does so prior to WaitForParallelWorkersToFinish(), need not
591 * call this function at all.
594 WaitForParallelWorkersToAttach(ParallelContext *pcxt)
598 /* Skip this if we have no launched workers. */
599 if (pcxt->nworkers_launched == 0)
605 * This will process any parallel messages that are pending and it may
606 * also throw an error propagated from a worker.
608 CHECK_FOR_INTERRUPTS();
610 for (i = 0; i < pcxt->nworkers_launched; ++i)
612 BgwHandleStatus status;
617 if (pcxt->known_attached_workers[i])
621 * If error_mqh is NULL, then the worker has already exited
624 if (pcxt->worker[i].error_mqh == NULL)
626 pcxt->known_attached_workers[i] = true;
627 ++pcxt->nknown_attached_workers;
631 status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
632 if (status == BGWH_STARTED)
634 /* Has the worker attached to the error queue? */
635 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
636 if (shm_mq_get_sender(mq) != NULL)
638 /* Yes, so it is known to be attached. */
639 pcxt->known_attached_workers[i] = true;
640 ++pcxt->nknown_attached_workers;
643 else if (status == BGWH_STOPPED)
646 * If the worker stopped without attaching to the error queue,
649 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
650 if (shm_mq_get_sender(mq) == NULL)
652 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
653 errmsg("parallel worker failed to initialize"),
654 errhint("More details may be available in the server log.")));
656 pcxt->known_attached_workers[i] = true;
657 ++pcxt->nknown_attached_workers;
662 * Worker not yet started, so we must wait. The postmaster
663 * will notify us if the worker's state changes. Our latch
664 * might also get set for some other reason, but if so we'll
665 * just end up waiting for the same worker again.
667 rc = WaitLatch(MyLatch,
668 WL_LATCH_SET | WL_POSTMASTER_DEATH,
669 -1, WAIT_EVENT_BGWORKER_STARTUP);
671 /* emergency bailout if postmaster has died */
672 if (rc & WL_POSTMASTER_DEATH)
675 if (rc & WL_LATCH_SET)
680 /* If all workers are known to have started, we're done. */
681 if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
683 Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
690 * Wait for all workers to finish computing.
692 * Even if the parallel operation seems to have completed successfully, it's
693 * important to call this function afterwards. We must not miss any errors
694 * the workers may have thrown during the parallel operation, or any that they
695 * may yet throw while shutting down.
697 * Also, we want to update our notion of XactLastRecEnd based on worker
701 WaitForParallelWorkersToFinish(ParallelContext *pcxt)
705 bool anyone_alive = false;
710 * This will process any parallel messages that are pending, which may
711 * change the outcome of the loop that follows. It may also throw an
712 * error propagated from a worker.
714 CHECK_FOR_INTERRUPTS();
716 for (i = 0; i < pcxt->nworkers_launched; ++i)
719 * If error_mqh is NULL, then the worker has already exited
720 * cleanly. If we have received a message through error_mqh from
721 * the worker, we know it started up cleanly, and therefore we're
722 * certain to be notified when it exits.
724 if (pcxt->worker[i].error_mqh == NULL)
726 else if (pcxt->known_attached_workers[i])
735 /* If all workers are known to have finished, we're done. */
736 if (nfinished >= pcxt->nworkers_launched)
738 Assert(nfinished == pcxt->nworkers_launched);
743 * We didn't detect any living workers, but not all workers are
744 * known to have exited cleanly. Either not all workers have
745 * launched yet, or maybe some of them failed to start or
746 * terminated abnormally.
748 for (i = 0; i < pcxt->nworkers_launched; ++i)
754 * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
755 * should just keep waiting. If it is BGWH_STOPPED, then
756 * further investigation is needed.
758 if (pcxt->worker[i].error_mqh == NULL ||
759 pcxt->worker[i].bgwhandle == NULL ||
760 GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
761 &pid) != BGWH_STOPPED)
765 * Check whether the worker ended up stopped without ever
766 * attaching to the error queue. If so, the postmaster was
767 * unable to fork the worker or it exited without initializing
768 * properly. We must throw an error, since the caller may
769 * have been expecting the worker to do some work before
772 mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
773 if (shm_mq_get_sender(mq) == NULL)
775 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
776 errmsg("parallel worker failed to initialize"),
777 errhint("More details may be available in the server log.")));
780 * The worker is stopped, but is attached to the error queue.
781 * Unless there's a bug somewhere, this will only happen when
782 * the worker writes messages and terminates after the
783 * CHECK_FOR_INTERRUPTS() near the top of this function and
784 * before the call to GetBackgroundWorkerPid(). In that case,
785 * our latch should have been set as well and the right things
786 * will happen on the next pass through the loop.
791 WaitLatch(MyLatch, WL_LATCH_SET, -1,
792 WAIT_EVENT_PARALLEL_FINISH);
796 if (pcxt->toc != NULL)
798 FixedParallelState *fps;
800 fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
801 if (fps->last_xlog_end > XactLastRecEnd)
802 XactLastRecEnd = fps->last_xlog_end;
807 * Wait for all workers to exit.
809 * This function ensures that workers have been completely shutdown. The
810 * difference between WaitForParallelWorkersToFinish and this function is
811 * that the former only ensures that the last message sent by each worker
812 * has been received by the master backend, whereas this one ensures complete shutdown.
815 WaitForParallelWorkersToExit(ParallelContext *pcxt)
819 /* Wait until the workers actually die. */
820 for (i = 0; i < pcxt->nworkers_launched; ++i)
822 BgwHandleStatus status;
824 if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
827 status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
830 * If the postmaster kicked the bucket, we have no chance of cleaning
831 * up safely -- we won't be able to tell when our workers are actually
832 * dead. This doesn't necessitate a PANIC since they will all abort
833 * eventually, but we can't safely continue this session.
835 if (status == BGWH_POSTMASTER_DIED)
837 (errcode(ERRCODE_ADMIN_SHUTDOWN),
838 errmsg("postmaster exited during a parallel transaction")));
840 /* Release memory. */
841 pfree(pcxt->worker[i].bgwhandle);
842 pcxt->worker[i].bgwhandle = NULL;
847 * Destroy a parallel context.
849 * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
850 * first, before calling this function. When this function is invoked, any
851 * remaining workers are forcibly killed; the dynamic shared memory segment
852 * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
855 DestroyParallelContext(ParallelContext *pcxt)
860 * Be careful about order of operations here! We remove the parallel
861 * context from the list before we do anything else; otherwise, if an
862 * error occurs during a subsequent step, we might try to nuke it again
863 * from AtEOXact_Parallel or AtEOSubXact_Parallel.
865 dlist_delete(&pcxt->node);
867 /* Kill each worker in turn, and forget their error queues. */
868 if (pcxt->worker != NULL)
870 for (i = 0; i < pcxt->nworkers_launched; ++i)
872 if (pcxt->worker[i].error_mqh != NULL)
874 TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
876 shm_mq_detach(pcxt->worker[i].error_mqh);
877 pcxt->worker[i].error_mqh = NULL;
883 * If we have allocated a shared memory segment, detach it. This will
884 * implicitly detach the error queues, and any other shared memory queues,
887 if (pcxt->seg != NULL)
889 dsm_detach(pcxt->seg);
894 * If this parallel context is actually in backend-private memory rather
895 * than shared memory, free that memory instead.
897 if (pcxt->private_memory != NULL)
899 pfree(pcxt->private_memory);
900 pcxt->private_memory = NULL;
904 * We can't finish transaction commit or abort until all of the workers
905 * have exited. This means, in particular, that we can't respond to
906 * interrupts at this stage.
909 WaitForParallelWorkersToExit(pcxt);
912 /* Free the worker array itself. */
913 if (pcxt->worker != NULL)
920 pfree(pcxt->library_name);
921 pfree(pcxt->function_name);
926 * Are there any parallel contexts currently active?
929 ParallelContextActive(void)
931 return !dlist_is_empty(&pcxt_list);
935 * Handle receipt of an interrupt indicating a parallel worker message.
937 * Note: this is called within a signal handler! All we can do is set
938 * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
939 * HandleParallelMessages().
942 HandleParallelMessageInterrupt(void)
944 InterruptPending = true;
945 ParallelMessagePending = true;
950 * Handle any queued protocol messages received from parallel workers.
953 HandleParallelMessages(void)
956 MemoryContext oldcontext;
958 static MemoryContext hpm_context = NULL;
961 * This is invoked from ProcessInterrupts(), and since some of the
962 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
963 * for recursive calls if more signals are received while this runs. It's
964 * unclear that recursive entry would be safe, and it doesn't seem useful
965 * even if it is safe, so let's block interrupts until done.
970 * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
971 * don't want to risk leaking data into long-lived contexts, so let's do
972 * our work here in a private context that we can reset on each use.
974 if (hpm_context == NULL) /* first time through? */
975 hpm_context = AllocSetContextCreate(TopMemoryContext,
976 "HandleParallelMessages",
977 ALLOCSET_DEFAULT_SIZES);
979 MemoryContextReset(hpm_context);
981 oldcontext = MemoryContextSwitchTo(hpm_context);
983 /* OK to process messages. Reset the flag saying there are more to do. */
984 ParallelMessagePending = false;
986 dlist_foreach(iter, &pcxt_list)
988 ParallelContext *pcxt;
991 pcxt = dlist_container(ParallelContext, node, iter.cur);
992 if (pcxt->worker == NULL)
995 for (i = 0; i < pcxt->nworkers_launched; ++i)
998 * Read as many messages as we can from each worker, but stop when
999 * either (1) the worker's error queue goes away, which can happen
1000 * if we receive a Terminate message from the worker; or (2) no
1001 * more messages can be read from the worker without blocking.
1003 while (pcxt->worker[i].error_mqh != NULL)
1009 res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
1011 if (res == SHM_MQ_WOULD_BLOCK)
1013 else if (res == SHM_MQ_SUCCESS)
1017 initStringInfo(&msg);
1018 appendBinaryStringInfo(&msg, data, nbytes);
1019 HandleParallelMessage(pcxt, i, &msg);
1024 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1025 errmsg("lost connection to parallel worker")));
1030 MemoryContextSwitchTo(oldcontext);
1032 /* Might as well clear the context on our way out */
1033 MemoryContextReset(hpm_context);
1035 RESUME_INTERRUPTS();
1039 * Handle a single protocol message received from a single parallel worker.
1042 HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
1046 if (pcxt->known_attached_workers != NULL &&
1047 !pcxt->known_attached_workers[i])
1049 pcxt->known_attached_workers[i] = true;
1050 pcxt->nknown_attached_workers++;
1053 msgtype = pq_getmsgbyte(msg);
1057 case 'K': /* BackendKeyData */
1059 int32 pid = pq_getmsgint(msg, 4);
1061 (void) pq_getmsgint(msg, 4); /* discard cancel key */
1062 (void) pq_getmsgend(msg);
1063 pcxt->worker[i].pid = pid;
1067 case 'E': /* ErrorResponse */
1068 case 'N': /* NoticeResponse */
1071 ErrorContextCallback *save_error_context_stack;
1073 /* Parse ErrorResponse or NoticeResponse. */
1074 pq_parse_errornotice(msg, &edata);
1076 /* Death of a worker isn't enough justification for suicide. */
1077 edata.elevel = Min(edata.elevel, ERROR);
1080 * If desired, add a context line to show that this is a
1081 * message propagated from a parallel worker. Otherwise, it
1082 * can sometimes be confusing to understand what actually
1083 * happened. (We don't do this in FORCE_PARALLEL_REGRESS mode
1084 * because it causes test-result instability depending on
1085 * whether a parallel worker is actually used or not.)
1087 if (force_parallel_mode != FORCE_PARALLEL_REGRESS)
1090 edata.context = psprintf("%s\n%s", edata.context,
1091 _("parallel worker"));
1093 edata.context = pstrdup(_("parallel worker"));
1097 * Context beyond that should use the error context callbacks
1098 * that were in effect when the ParallelContext was created,
1099 * not the current ones.
1101 save_error_context_stack = error_context_stack;
1102 error_context_stack = pcxt->error_context_stack;
1104 /* Rethrow error or print notice. */
1105 ThrowErrorData(&edata);
1107 /* Not an error, so restore previous context stack. */
1108 error_context_stack = save_error_context_stack;
1113 case 'A': /* NotifyResponse */
1115 /* Propagate NotifyResponse. */
1117 const char *channel;
1118 const char *payload;
1120 pid = pq_getmsgint(msg, 4);
1121 channel = pq_getmsgrawstring(msg);
1122 payload = pq_getmsgrawstring(msg);
1125 NotifyMyFrontEnd(channel, payload, pid);
1130 case 'X': /* Terminate, indicating clean exit */
1132 shm_mq_detach(pcxt->worker[i].error_mqh);
1133 pcxt->worker[i].error_mqh = NULL;
1139 elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
1146 * End-of-subtransaction cleanup for parallel contexts.
1148 * Currently, it's forbidden to enter or leave a subtransaction while
1149 * parallel mode is in effect, so we could just blow away everything. But
1150 * we may want to relax that restriction in the future, so this code
1151 * contemplates that there may be multiple subtransaction IDs in pcxt_list.
1154 AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
1156 while (!dlist_is_empty(&pcxt_list))
1158 ParallelContext *pcxt;
1160 pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
1161 if (pcxt->subid != mySubId)
1164 elog(WARNING, "leaked parallel context");
1165 DestroyParallelContext(pcxt);
1170 * End-of-transaction cleanup for parallel contexts.
1173 AtEOXact_Parallel(bool isCommit)
1175 while (!dlist_is_empty(&pcxt_list))
1177 ParallelContext *pcxt;
1179 pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
1181 elog(WARNING, "leaked parallel context");
1182 DestroyParallelContext(pcxt);
1187 * Main entrypoint for parallel workers.
1190 ParallelWorkerMain(Datum main_arg)
1194 FixedParallelState *fps;
1195 char *error_queue_space;
1199 char *entrypointstate;
1201 char *function_name;
1202 parallel_worker_main_type entrypt;
1204 char *combocidspace;
1209 StringInfoData msgbuf;
1210 char *session_dsm_handle_space;
1212 /* Set flag to indicate that we're initializing a parallel worker. */
1213 InitializingParallelWorker = true;
1215 /* Establish signal handlers. */
1216 pqsignal(SIGTERM, die);
1217 BackgroundWorkerUnblockSignals();
1219 /* Determine and set our parallel worker number. */
1220 Assert(ParallelWorkerNumber == -1);
1221 memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
1223 /* Set up a memory context and resource owner. */
1224 Assert(CurrentResourceOwner == NULL);
1225 CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel");
1226 CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
1228 ALLOCSET_DEFAULT_SIZES);
1231 * Now that we have a resource owner, we can attach to the dynamic shared
1232 * memory segment and read the table of contents.
1234 seg = dsm_attach(DatumGetUInt32(main_arg));
1237 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1238 errmsg("could not map dynamic shared memory segment")));
1239 toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
1242 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1243 errmsg("invalid magic number in dynamic shared memory segment")));
1245 /* Look up fixed parallel state. */
1246 fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
1247 MyFixedParallelState = fps;
1249 /* Arrange to signal the leader if we exit. */
1250 ParallelMasterPid = fps->parallel_master_pid;
1251 ParallelMasterBackendId = fps->parallel_master_backend_id;
1252 on_shmem_exit(ParallelWorkerShutdown, (Datum) 0);
1255 * Now we can find and attach to the error queue provided for us. That's
1256 * good, because until we do that, any errors that happen here will not be
1257 * reported back to the process that requested that this worker be
1260 error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
1261 mq = (shm_mq *) (error_queue_space +
1262 ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
1263 shm_mq_set_sender(mq, MyProc);
1264 mqh = shm_mq_attach(mq, seg, NULL);
1265 pq_redirect_to_shm_mq(seg, mqh);
1266 pq_set_parallel_master(fps->parallel_master_pid,
1267 fps->parallel_master_backend_id);
1270 * Send a BackendKeyData message to the process that initiated parallelism
1271 * so that it has access to our PID before it receives any other messages
1272 * from us. Our cancel key is sent, too, since that's the way the
1273 * protocol message is defined, but it won't actually be used for anything
1276 pq_beginmessage(&msgbuf, 'K');
1277 pq_sendint32(&msgbuf, (int32) MyProcPid);
1278 pq_sendint32(&msgbuf, (int32) MyCancelKey);
1279 pq_endmessage(&msgbuf);
1282 * Hooray! Primary initialization is complete. Now, we need to set up our
1283 * backend-local state to match the original backend.
1287 * Join locking group. We must do this before anything that could try to
1288 * acquire a heavyweight lock, because any heavyweight locks acquired to
1289 * this point could block either directly against the parallel group
1290 * leader or against some process which in turn waits for a lock that
1291 * conflicts with the parallel group leader, causing an undetected
1292 * deadlock. (If we can't join the lock group, the leader has gone away,
1293 * so just exit quietly.)
1295 if (!BecomeLockGroupMember(fps->parallel_master_pgproc,
1296 fps->parallel_master_pid))
1300 * Load libraries that were loaded by original backend. We want to do
1301 * this before restoring GUCs, because the libraries might define custom
1304 libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
1305 RestoreLibraryState(libraryspace);
1308 * Identify the entry point to be called. In theory this could result in
1309 * loading an additional library, though most likely the entry point is in
1310 * the core backend or in a library we just loaded.
1312 entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
1313 library_name = entrypointstate;
1314 function_name = entrypointstate + strlen(library_name) + 1;
1316 entrypt = LookupParallelWorkerFunction(library_name, function_name);
1318 /* Restore database connection. */
1319 BackgroundWorkerInitializeConnectionByOid(fps->database_id,
1320 fps->authenticated_user_id,
1324 * Set the client encoding to the database encoding, since that is what
1325 * the leader will expect.
1327 SetClientEncoding(GetDatabaseEncoding());
1329 /* Restore GUC values from launching backend. */
1330 gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
1331 StartTransactionCommand();
1332 RestoreGUCState(gucspace);
1333 CommitTransactionCommand();
1335 /* Crank up a transaction state appropriate to a parallel worker. */
1336 tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
1337 StartParallelWorkerTransaction(tstatespace);
1339 /* Restore combo CID state. */
1340 combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
1341 RestoreComboCIDState(combocidspace);
1343 /* Attach to the per-session DSM segment and contained objects. */
1344 session_dsm_handle_space =
1345 shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
1346 AttachSession(*(dsm_handle *) session_dsm_handle_space);
1348 /* Restore transaction snapshot. */
1349 tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, false);
1350 RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace),
1351 fps->parallel_master_pgproc);
1353 /* Restore active snapshot. */
1354 asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
1355 PushActiveSnapshot(RestoreSnapshot(asnapspace));
1358 * We've changed which tuples we can see, and must therefore invalidate
1361 InvalidateSystemCaches();
1364 * Restore current role id. Skip verifying whether session user is
1365 * allowed to become this role and blindly restore the leader's state for
1368 SetCurrentRoleId(fps->outer_user_id, fps->is_superuser);
1370 /* Restore user ID and security context. */
1371 SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);
1373 /* Restore temp-namespace state to ensure search path matches leader's. */
1374 SetTempNamespaceState(fps->temp_namespace_id,
1375 fps->temp_toast_namespace_id);
1377 /* Restore reindex state. */
1378 reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
1379 RestoreReindexState(reindexspace);
1382 * We've initialized all of our state now; nothing should change
1385 InitializingParallelWorker = false;
1386 EnterParallelMode();
1389 * Time to do the real work: invoke the caller-supplied code.
1393 /* Must exit parallel mode to pop active snapshot. */
1396 /* Must pop active snapshot so resowner.c doesn't complain. */
1397 PopActiveSnapshot();
1399 /* Shut down the parallel-worker transaction. */
1400 EndParallelWorkerTransaction();
1402 /* Detach from the per-session DSM segment. */
1405 /* Report success. */
1406 pq_putmessage('X', NULL, 0);
1410 * Update shared memory with the ending location of the last WAL record we
1411 * wrote, if it's greater than the value already stored there.
1414 ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
1416 FixedParallelState *fps = MyFixedParallelState;
1418 Assert(fps != NULL);
1419 SpinLockAcquire(&fps->mutex);
1420 if (fps->last_xlog_end < last_xlog_end)
1421 fps->last_xlog_end = last_xlog_end;
1422 SpinLockRelease(&fps->mutex);
1426 * Make sure the leader tries to read from our error queue one more time.
1427 * This guards against the case where we exit uncleanly without sending an
1428 * ErrorResponse to the leader, for example because some code calls proc_exit
1432 ParallelWorkerShutdown(int code, Datum arg)
1434 SendProcSignal(ParallelMasterPid,
1435 PROCSIG_PARALLEL_MESSAGE,
1436 ParallelMasterBackendId);
1440 * Look up (and possibly load) a parallel worker entry point function.
1442 * For functions contained in the core code, we use library name "postgres"
1443 * and consult the InternalParallelWorkers array. External functions are
1444 * looked up, and loaded if necessary, using load_external_function().
1446 * The point of this is to pass function names as strings across process
1447 * boundaries. We can't pass actual function addresses because of the
1448 * possibility that the function has been loaded at a different address
1449 * in a different process. This is obviously a hazard for functions in
1450 * loadable libraries, but it can happen even for functions in the core code
1451 * on platforms using EXEC_BACKEND (e.g., Windows).
1453 * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
1454 * in favor of applying load_external_function() for core functions too;
1455 * but that raises portability issues that are not worth addressing now.
1457 static parallel_worker_main_type
1458 LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
1461 * If the function is to be loaded from postgres itself, search the
1462 * InternalParallelWorkers array.
1464 if (strcmp(libraryname, "postgres") == 0)
1468 for (i = 0; i < lengthof(InternalParallelWorkers); i++)
1470 if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
1471 return InternalParallelWorkers[i].fn_addr;
1474 /* We can only reach this by programming error. */
1475 elog(ERROR, "internal function \"%s\" not found", funcname);
1478 /* Otherwise load from external library. */
1479 return (parallel_worker_main_type)
1480 load_external_function(libraryname, funcname, true, NULL);