/*-------------------------------------------------------------------------
 *
 * parallel.c
 *    Infrastructure for launching parallel workers
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/access/transam/parallel.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/nbtree.h"
#include "access/parallel.h"
#include "access/session.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "commands/async.h"
#include "executor/execParallel.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
#include "miscadmin.h"
#include "optimizer/planmain.h"
#include "pgstat.h"
#include "storage/ipc.h"
#include "storage/sinval.h"
#include "storage/spin.h"
#include "tcop/tcopprot.h"
#include "utils/combocid.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/resowner.h"
#include "utils/snapmgr.h"
#include "utils/typcache.h"


/*
 * We don't want to waste a lot of memory on an error queue which, most of
 * the time, will process only a handful of small messages.  However, it is
 * desirable to make it large enough that a typical ErrorResponse can be sent
 * without blocking.  That way, a worker that errors out can write the whole
 * message into the queue and terminate without waiting for the user backend.
 */
#define PARALLEL_ERROR_QUEUE_SIZE           16384

/* Magic number for parallel context TOC. */
#define PARALLEL_MAGIC                      0x50477c7c

/*
 * Magic numbers for per-context parallel state sharing.  Higher-level code
 * should use smaller values, leaving these very large ones for use by this
 * module.
 */
#define PARALLEL_KEY_FIXED                  UINT64CONST(0xFFFFFFFFFFFF0001)
#define PARALLEL_KEY_ERROR_QUEUE            UINT64CONST(0xFFFFFFFFFFFF0002)
#define PARALLEL_KEY_LIBRARY                UINT64CONST(0xFFFFFFFFFFFF0003)
#define PARALLEL_KEY_GUC                    UINT64CONST(0xFFFFFFFFFFFF0004)
#define PARALLEL_KEY_COMBO_CID              UINT64CONST(0xFFFFFFFFFFFF0005)
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT   UINT64CONST(0xFFFFFFFFFFFF0006)
#define PARALLEL_KEY_ACTIVE_SNAPSHOT        UINT64CONST(0xFFFFFFFFFFFF0007)
#define PARALLEL_KEY_TRANSACTION_STATE      UINT64CONST(0xFFFFFFFFFFFF0008)
#define PARALLEL_KEY_ENTRYPOINT             UINT64CONST(0xFFFFFFFFFFFF0009)
#define PARALLEL_KEY_SESSION_DSM            UINT64CONST(0xFFFFFFFFFFFF000A)
#define PARALLEL_KEY_REINDEX_STATE          UINT64CONST(0xFFFFFFFFFFFF000B)
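
/*
 * For illustration, a hypothetical higher-level module that stores its own
 * state in the parallel DSM would define smaller keys, well away from the
 * reserved values above; the exact values are the module's own business as
 * long as they are unique within one table of contents:
 *
 *    #define MYMODULE_KEY_SHARED_STATE  UINT64CONST(0xE000000000000001)
 *    #define MYMODULE_KEY_TUPLE_QUEUE   UINT64CONST(0xE000000000000002)
 */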

/* Fixed-size parallel state. */
typedef struct FixedParallelState
{
    /* Fixed-size state that workers must restore. */
    Oid         database_id;
    Oid         authenticated_user_id;
    Oid         current_user_id;
    Oid         outer_user_id;
    Oid         temp_namespace_id;
    Oid         temp_toast_namespace_id;
    int         sec_context;
    bool        is_superuser;
    PGPROC     *parallel_master_pgproc;
    pid_t       parallel_master_pid;
    BackendId   parallel_master_backend_id;

    /* Mutex protects remaining fields. */
    slock_t     mutex;

    /* Maximum XactLastRecEnd of any worker. */
    XLogRecPtr  last_xlog_end;
} FixedParallelState;
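
/*
 * To illustrate the locking convention: a worker updating last_xlog_end
 * would do so under the mutex, along these lines (a sketch in the style of
 * ParallelWorkerReportLastRecEnd(), which appears later in this file):
 *
 *    SpinLockAcquire(&fps->mutex);
 *    if (fps->last_xlog_end < last_xlog_end)
 *        fps->last_xlog_end = last_xlog_end;
 *    SpinLockRelease(&fps->mutex);
 */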

/*
 * Our parallel worker number.  We initialize this to -1, meaning that we are
 * not a parallel worker.  In parallel workers, it will be set to a value >= 0
 * and < the number of workers before any user code is invoked; each parallel
 * worker will get a different parallel worker number.
 */
int         ParallelWorkerNumber = -1;

/* Is there a parallel message pending which we need to receive? */
volatile bool ParallelMessagePending = false;

/* Are we initializing a parallel worker? */
bool        InitializingParallelWorker = false;

/* Pointer to our fixed parallel state. */
static FixedParallelState *MyFixedParallelState;

/* List of active parallel contexts. */
static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);

/* Backend-local copy of data from FixedParallelState. */
static pid_t ParallelMasterPid;

/*
 * List of internal parallel worker entry points.  We need this for
 * reasons explained in LookupParallelWorkerFunction(), below.
 */
static const struct
{
    const char *fn_name;
    parallel_worker_main_type fn_addr;
}           InternalParallelWorkers[] =
{
    {
        "ParallelQueryMain", ParallelQueryMain
    },
    {
        "_bt_parallel_build_main", _bt_parallel_build_main
    }
};

/* Private functions. */
static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
static void ParallelWorkerShutdown(int code, Datum arg);


/*
 * Establish a new parallel context.  This should be done after entering
 * parallel mode, and (unless there is an error) the context should be
 * destroyed before exiting the current subtransaction.
 */
ParallelContext *
CreateParallelContext(const char *library_name, const char *function_name,
                      int nworkers, bool serializable_okay)
{
    MemoryContext oldcontext;
    ParallelContext *pcxt;

    /* It is unsafe to create a parallel context if not in parallel mode. */
    Assert(IsInParallelMode());

    /* Number of workers should be non-negative. */
    Assert(nworkers >= 0);

    /*
     * If we are running under serializable isolation, we can't use parallel
     * workers, at least not until somebody enhances that mechanism to be
     * parallel-aware.  Utility statement callers may ask us to ignore this
     * restriction because they're always able to safely ignore the fact that
     * SIREAD locks do not work with parallelism.
     */
    if (IsolationIsSerializable() && !serializable_okay)
        nworkers = 0;

    /* We might be running in a short-lived memory context. */
    oldcontext = MemoryContextSwitchTo(TopTransactionContext);

    /* Initialize a new ParallelContext. */
    pcxt = palloc0(sizeof(ParallelContext));
    pcxt->subid = GetCurrentSubTransactionId();
    pcxt->nworkers = nworkers;
    pcxt->library_name = pstrdup(library_name);
    pcxt->function_name = pstrdup(function_name);
    pcxt->error_context_stack = error_context_stack;
    shm_toc_initialize_estimator(&pcxt->estimator);
    dlist_push_head(&pcxt_list, &pcxt->node);

    /* Restore previous memory context. */
    MemoryContextSwitchTo(oldcontext);

    return pcxt;
}
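
/*
 * For orientation, the typical lifecycle of a parallel context looks like
 * this from the caller's side (a sketch only; error handling and the
 * caller's own TOC entries are omitted):
 *
 *    EnterParallelMode();
 *    pcxt = CreateParallelContext("postgres", "ParallelQueryMain",
 *                                 nworkers, false);
 *    ... shm_toc_estimate_chunk/shm_toc_estimate_keys on &pcxt->estimator ...
 *    InitializeParallelDSM(pcxt);
 *    ... shm_toc_allocate/shm_toc_insert caller-private state ...
 *    LaunchParallelWorkers(pcxt);
 *    ... leader does its share of the work ...
 *    WaitForParallelWorkersToFinish(pcxt);
 *    DestroyParallelContext(pcxt);
 *    ExitParallelMode();
 */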

/*
 * Establish the dynamic shared memory segment for a parallel context and
 * copy state and other bookkeeping information that will be needed by
 * parallel workers into it.
 */
void
InitializeParallelDSM(ParallelContext *pcxt)
{
    MemoryContext oldcontext;
    Size        library_len = 0;
    Size        guc_len = 0;
    Size        combocidlen = 0;
    Size        tsnaplen = 0;
    Size        asnaplen = 0;
    Size        tstatelen = 0;
    Size        reindexlen = 0;
    Size        segsize = 0;
    int         i;
    FixedParallelState *fps;
    dsm_handle  session_dsm_handle = DSM_HANDLE_INVALID;
    Snapshot    transaction_snapshot = GetTransactionSnapshot();
    Snapshot    active_snapshot = GetActiveSnapshot();

    /* We might be running in a very short-lived memory context. */
    oldcontext = MemoryContextSwitchTo(TopTransactionContext);

    /* Allow space to store the fixed-size parallel state. */
    shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /*
     * Normally, the user will have requested at least one worker process, but
     * if by chance they have not, we can skip a bunch of things here.
     */
    if (pcxt->nworkers > 0)
    {
        /* Get (or create) the per-session DSM segment's handle. */
        session_dsm_handle = GetSessionDsmHandle();

        /*
         * If we weren't able to create a per-session DSM segment, then we can
         * continue but we can't safely launch any workers because their
         * record typmods would be incompatible so they couldn't exchange
         * tuples.
         */
        if (session_dsm_handle == DSM_HANDLE_INVALID)
            pcxt->nworkers = 0;
    }

    if (pcxt->nworkers > 0)
    {
        /* Estimate space for various kinds of state sharing. */
        library_len = EstimateLibraryStateSpace();
        shm_toc_estimate_chunk(&pcxt->estimator, library_len);
        guc_len = EstimateGUCStateSpace();
        shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
        combocidlen = EstimateComboCIDStateSpace();
        shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
        tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
        shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
        asnaplen = EstimateSnapshotSpace(active_snapshot);
        shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
        tstatelen = EstimateTransactionStateSpace();
        shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
        shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
        reindexlen = EstimateReindexStateSpace();
        shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
        /* If you add more chunks here, you probably need to add keys. */
        shm_toc_estimate_keys(&pcxt->estimator, 8);

        /* Estimate space needed for error queues. */
        StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
                         PARALLEL_ERROR_QUEUE_SIZE,
                         "parallel error queue size not buffer-aligned");
        shm_toc_estimate_chunk(&pcxt->estimator,
                               mul_size(PARALLEL_ERROR_QUEUE_SIZE,
                                        pcxt->nworkers));
        shm_toc_estimate_keys(&pcxt->estimator, 1);

        /* Estimate how much we'll need for the entrypoint info. */
        shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
                               strlen(pcxt->function_name) + 2);
        shm_toc_estimate_keys(&pcxt->estimator, 1);
    }

    /*
     * Create DSM and initialize with new table of contents.  But if the user
     * didn't request any workers, then don't bother creating a dynamic shared
     * memory segment; instead, just use backend-private memory.
     *
     * Also, if we can't create a dynamic shared memory segment because the
     * maximum number of segments has already been created, then fall back to
     * backend-private memory, and plan not to use any workers.  We hope this
     * won't happen very often, but it's better to abandon the use of
     * parallelism than to fail outright.
     */
    segsize = shm_toc_estimate(&pcxt->estimator);
    if (pcxt->nworkers > 0)
        pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
    if (pcxt->seg != NULL)
        pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
                                   dsm_segment_address(pcxt->seg),
                                   segsize);
    else
    {
        pcxt->nworkers = 0;
        pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
        pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
                                   segsize);
    }

    /* Initialize fixed-size state in shared memory. */
    fps = (FixedParallelState *)
        shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
    fps->database_id = MyDatabaseId;
    fps->authenticated_user_id = GetAuthenticatedUserId();
    fps->outer_user_id = GetCurrentRoleId();
    fps->is_superuser = session_auth_is_superuser;
    GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
    GetTempNamespaceState(&fps->temp_namespace_id,
                          &fps->temp_toast_namespace_id);
    fps->parallel_master_pgproc = MyProc;
    fps->parallel_master_pid = MyProcPid;
    fps->parallel_master_backend_id = MyBackendId;
    SpinLockInit(&fps->mutex);
    fps->last_xlog_end = 0;
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);

    /* We can skip the rest of this if we're not budgeting for any workers. */
    if (pcxt->nworkers > 0)
    {
        char       *libraryspace;
        char       *gucspace;
        char       *combocidspace;
        char       *tsnapspace;
        char       *asnapspace;
        char       *tstatespace;
        char       *reindexspace;
        char       *error_queue_space;
        char       *session_dsm_handle_space;
        char       *entrypointstate;
        Size        lnamelen;

        /* Serialize shared libraries we have loaded. */
        libraryspace = shm_toc_allocate(pcxt->toc, library_len);
        SerializeLibraryState(library_len, libraryspace);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);

        /* Serialize GUC settings. */
        gucspace = shm_toc_allocate(pcxt->toc, guc_len);
        SerializeGUCState(guc_len, gucspace);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);

        /* Serialize combo CID state. */
        combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
        SerializeComboCIDState(combocidlen, combocidspace);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);

        /* Serialize transaction snapshot and active snapshot. */
        tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
        SerializeSnapshot(transaction_snapshot, tsnapspace);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
                       tsnapspace);
        asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
        SerializeSnapshot(active_snapshot, asnapspace);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);

        /* Provide the handle for per-session segment. */
        session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
                                                    sizeof(dsm_handle));
        *(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
                       session_dsm_handle_space);

        /* Serialize transaction state. */
        tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
        SerializeTransactionState(tstatelen, tstatespace);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);

        /* Serialize reindex state. */
        reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
        SerializeReindexState(reindexlen, reindexspace);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);

        /* Allocate space for worker information. */
        pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);

        /*
         * Establish error queues in dynamic shared memory.
         *
         * These queues should be used only for transmitting ErrorResponse,
         * NoticeResponse, and NotifyResponse protocol messages.  Tuple data
         * should be transmitted via separate (possibly larger?) queues.
         */
        error_queue_space =
            shm_toc_allocate(pcxt->toc,
                             mul_size(PARALLEL_ERROR_QUEUE_SIZE,
                                      pcxt->nworkers));
        for (i = 0; i < pcxt->nworkers; ++i)
        {
            char       *start;
            shm_mq     *mq;

            start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
            mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
            shm_mq_set_receiver(mq, MyProc);
            pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
        }
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);

        /*
         * Serialize entrypoint information.  It's unsafe to pass function
         * pointers across processes, as the function pointer may be different
         * in each process in EXEC_BACKEND builds, so we always pass library
         * and function name.  (We use library name "postgres" for functions
         * in the core backend.)
         */
        lnamelen = strlen(pcxt->library_name);
        entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
                                           strlen(pcxt->function_name) + 2);
        strcpy(entrypointstate, pcxt->library_name);
        strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
    }

    /* Restore previous memory context. */
    MemoryContextSwitchTo(oldcontext);
}
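
/*
 * A sketch of how a caller adds its own state to the segment, using the
 * hypothetical key from above.  Between InitializeParallelDSM() and
 * LaunchParallelWorkers(), the leader does:
 *
 *    mystate = shm_toc_allocate(pcxt->toc, size);
 *    ... fill in *mystate ...
 *    shm_toc_insert(pcxt->toc, MYMODULE_KEY_SHARED_STATE, mystate);
 *
 * and the worker entrypoint later retrieves it with:
 *
 *    mystate = shm_toc_lookup(toc, MYMODULE_KEY_SHARED_STATE, false);
 *
 * The matching shm_toc_estimate_chunk()/shm_toc_estimate_keys() calls must
 * have been made before calling InitializeParallelDSM(), or the segment
 * will be too small.
 */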

/*
 * Reinitialize the dynamic shared memory segment for a parallel context such
 * that we could launch workers for it again.
 */
void
ReinitializeParallelDSM(ParallelContext *pcxt)
{
    FixedParallelState *fps;

    /* Wait for any old workers to exit. */
    if (pcxt->nworkers_launched > 0)
    {
        WaitForParallelWorkersToFinish(pcxt);
        WaitForParallelWorkersToExit(pcxt);
        pcxt->nworkers_launched = 0;
        if (pcxt->known_attached_workers)
        {
            pfree(pcxt->known_attached_workers);
            pcxt->known_attached_workers = NULL;
            pcxt->nknown_attached_workers = 0;
        }
    }

    /* Reset a few bits of fixed parallel state to a clean state. */
    fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
    fps->last_xlog_end = 0;

    /* Recreate error queues (if they exist). */
    if (pcxt->nworkers > 0)
    {
        char       *error_queue_space;
        int         i;

        error_queue_space =
            shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
        for (i = 0; i < pcxt->nworkers; ++i)
        {
            char       *start;
            shm_mq     *mq;

            start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
            mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
            shm_mq_set_receiver(mq, MyProc);
            pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
        }
    }
}
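
/*
 * The relaunch pattern this enables, e.g. for rescanning a parallel query,
 * is roughly (hypothetical caller code; ReinitializeParallelDSM() itself
 * waits for the previous set of workers):
 *
 *    ReinitializeParallelDSM(pcxt);
 *    ... reset any caller-private state stored in the segment ...
 *    LaunchParallelWorkers(pcxt);
 */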

/*
 * Launch parallel workers.
 */
void
LaunchParallelWorkers(ParallelContext *pcxt)
{
    MemoryContext oldcontext;
    BackgroundWorker worker;
    int         i;
    bool        any_registrations_failed = false;

    /* Skip this if we have no workers. */
    if (pcxt->nworkers == 0)
        return;

    /* We need to be a lock group leader. */
    BecomeLockGroupLeader();

    /* If we do have workers, we'd better have a DSM segment. */
    Assert(pcxt->seg != NULL);

    /* We might be running in a short-lived memory context. */
    oldcontext = MemoryContextSwitchTo(TopTransactionContext);

    /* Configure a worker. */
    memset(&worker, 0, sizeof(worker));
    snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
             MyProcPid);
    snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
    worker.bgw_flags =
        BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
        | BGWORKER_CLASS_PARALLEL;
    worker.bgw_start_time = BgWorkerStart_ConsistentState;
    worker.bgw_restart_time = BGW_NEVER_RESTART;
    sprintf(worker.bgw_library_name, "postgres");
    sprintf(worker.bgw_function_name, "ParallelWorkerMain");
    worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
    worker.bgw_notify_pid = MyProcPid;

    /*
     * Start workers.
     *
     * The caller must be able to tolerate ending up with fewer workers than
     * expected, so there is no need to throw an error here if registration
     * fails.  It wouldn't help much anyway, because registering the worker in
     * no way guarantees that it will start up and initialize successfully.
     */
    for (i = 0; i < pcxt->nworkers; ++i)
    {
        memcpy(worker.bgw_extra, &i, sizeof(int));
        if (!any_registrations_failed &&
            RegisterDynamicBackgroundWorker(&worker,
                                            &pcxt->worker[i].bgwhandle))
        {
            shm_mq_set_handle(pcxt->worker[i].error_mqh,
                              pcxt->worker[i].bgwhandle);
            pcxt->nworkers_launched++;
        }
        else
        {
            /*
             * If we weren't able to register the worker, then we've bumped up
             * against the max_worker_processes limit, and future
             * registrations will probably fail too, so arrange to skip them.
             * But we still have to execute this code for the remaining slots
             * to make sure that we forget about the error queues we budgeted
             * for those workers.  Otherwise, we'll wait for them to start,
             * but they never will.
             */
            any_registrations_failed = true;
            pcxt->worker[i].bgwhandle = NULL;
            shm_mq_detach(pcxt->worker[i].error_mqh);
            pcxt->worker[i].error_mqh = NULL;
        }
    }

    /*
     * Now that nworkers_launched has taken its final value, we can initialize
     * known_attached_workers.
     */
    if (pcxt->nworkers_launched > 0)
    {
        pcxt->known_attached_workers =
            palloc0(sizeof(bool) * pcxt->nworkers_launched);
        pcxt->nknown_attached_workers = 0;
    }

    /* Restore previous memory context. */
    MemoryContextSwitchTo(oldcontext);
}
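
/*
 * Since registration can fail, callers must cope with getting fewer
 * workers than requested, possibly zero.  A sketch of the usual pattern:
 *
 *    LaunchParallelWorkers(pcxt);
 *    if (pcxt->nworkers_launched == 0)
 *        ... do all of the work in the leader instead ...
 */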

/*
 * Wait for all workers to attach to their error queues, and throw an error if
 * any worker fails to do this.
 *
 * Callers can assume that if this function returns successfully, then the
 * number of workers given by pcxt->nworkers_launched have initialized and
 * attached to their error queues.  Whether or not these workers are guaranteed
 * to still be running depends on what code the caller asked them to run;
 * this function does not guarantee that they have not exited.  However, it
 * does guarantee that any workers which exited must have done so cleanly and
 * after successfully performing the work with which they were tasked.
 *
 * If this function is not called, then some of the workers that were launched
 * may not have been started due to a fork() failure, or may have exited during
 * early startup prior to attaching to the error queue, so nworkers_launched
 * cannot be viewed as completely reliable.  It will never be less than the
 * number of workers which actually started, but it might be more.  Any workers
 * that failed to start will still be discovered by
 * WaitForParallelWorkersToFinish and an error will be thrown at that time,
 * provided that function is eventually reached.
 *
 * In general, the leader process should do as much work as possible before
 * calling this function.  fork() failures and other early-startup failures
 * are very uncommon, and having the leader sit idle when it could be doing
 * useful work is undesirable.  However, if the leader needs to wait for
 * all of its workers or for a specific worker, it may want to call this
 * function before doing so.  If not, it must make some other provision for
 * the failure-to-start case, lest it wait forever.  On the other hand, a
 * leader which never waits for a worker that might not be started yet, or
 * at least never does so prior to WaitForParallelWorkersToFinish(), need not
 * call this function at all.
 */
void
WaitForParallelWorkersToAttach(ParallelContext *pcxt)
{
    int         i;

    /* Skip this if we have no launched workers. */
    if (pcxt->nworkers_launched == 0)
        return;

    for (;;)
    {
        /*
         * This will process any parallel messages that are pending and it may
         * also throw an error propagated from a worker.
         */
        CHECK_FOR_INTERRUPTS();

        for (i = 0; i < pcxt->nworkers_launched; ++i)
        {
            BgwHandleStatus status;
            shm_mq     *mq;
            int         rc;
            pid_t       pid;

            if (pcxt->known_attached_workers[i])
                continue;

            /*
             * If error_mqh is NULL, then the worker has already exited
             * cleanly.
             */
            if (pcxt->worker[i].error_mqh == NULL)
            {
                pcxt->known_attached_workers[i] = true;
                ++pcxt->nknown_attached_workers;
                continue;
            }

            status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
            if (status == BGWH_STARTED)
            {
                /* Has the worker attached to the error queue? */
                mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
                if (shm_mq_get_sender(mq) != NULL)
                {
                    /* Yes, so it is known to be attached. */
                    pcxt->known_attached_workers[i] = true;
                    ++pcxt->nknown_attached_workers;
                }
            }
            else if (status == BGWH_STOPPED)
            {
                /*
                 * If the worker stopped without attaching to the error queue,
                 * throw an error.
                 */
                mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
                if (shm_mq_get_sender(mq) == NULL)
                    ereport(ERROR,
                            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                             errmsg("parallel worker failed to initialize"),
                             errhint("More details may be available in the server log.")));

                pcxt->known_attached_workers[i] = true;
                ++pcxt->nknown_attached_workers;
            }
            else
            {
                /*
                 * Worker not yet started, so we must wait.  The postmaster
                 * will notify us if the worker's state changes.  Our latch
                 * might also get set for some other reason, but if so we'll
                 * just end up waiting for the same worker again.
                 */
                rc = WaitLatch(MyLatch,
                               WL_LATCH_SET | WL_POSTMASTER_DEATH,
                               -1, WAIT_EVENT_BGWORKER_STARTUP);

                /* emergency bailout if postmaster has died */
                if (rc & WL_POSTMASTER_DEATH)
                    proc_exit(1);

                if (rc & WL_LATCH_SET)
                    ResetLatch(MyLatch);
            }
        }

        /* If all workers are known to have started, we're done. */
        if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
        {
            Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
            break;
        }
    }
}
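
/*
 * A sketch of when the function above is needed: if the leader blocks on
 * something that only a fully-started worker will ever advance (say, a
 * condition variable or barrier stored in the segment), it should first
 * confirm the workers actually attached, lest it wait forever:
 *
 *    LaunchParallelWorkers(pcxt);
 *    ...
 *    WaitForParallelWorkersToAttach(pcxt);
 *    ... now safe to block on worker-driven state ...
 */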

/*
 * Wait for all workers to finish computing.
 *
 * Even if the parallel operation seems to have completed successfully, it's
 * important to call this function afterwards.  We must not miss any errors
 * the workers may have thrown during the parallel operation, or any that they
 * may yet throw while shutting down.
 *
 * Also, we want to update our notion of XactLastRecEnd based on worker
 * feedback.
 */
void
WaitForParallelWorkersToFinish(ParallelContext *pcxt)
{
    for (;;)
    {
        bool        anyone_alive = false;
        int         nfinished = 0;
        int         i;

        /*
         * This will process any parallel messages that are pending, which may
         * change the outcome of the loop that follows.  It may also throw an
         * error propagated from a worker.
         */
        CHECK_FOR_INTERRUPTS();

        for (i = 0; i < pcxt->nworkers_launched; ++i)
        {
            /*
             * If error_mqh is NULL, then the worker has already exited
             * cleanly.  If we have received a message through error_mqh from
             * the worker, we know it started up cleanly, and therefore we're
             * certain to be notified when it exits.
             */
            if (pcxt->worker[i].error_mqh == NULL)
                ++nfinished;
            else if (pcxt->known_attached_workers[i])
            {
                anyone_alive = true;
                break;
            }
        }

        if (!anyone_alive)
        {
            /* If all workers are known to have finished, we're done. */
            if (nfinished >= pcxt->nworkers_launched)
            {
                Assert(nfinished == pcxt->nworkers_launched);
                break;
            }

            /*
             * We didn't detect any living workers, but not all workers are
             * known to have exited cleanly.  Either not all workers have
             * launched yet, or maybe some of them failed to start or
             * terminated abnormally.
             */
            for (i = 0; i < pcxt->nworkers_launched; ++i)
            {
                pid_t       pid;
                shm_mq     *mq;

                /*
                 * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
                 * should just keep waiting.  If it is BGWH_STOPPED, then
                 * further investigation is needed.
                 */
                if (pcxt->worker[i].error_mqh == NULL ||
                    pcxt->worker[i].bgwhandle == NULL ||
                    GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
                                           &pid) != BGWH_STOPPED)
                    continue;

                /*
                 * Check whether the worker ended up stopped without ever
                 * attaching to the error queue.  If so, the postmaster was
                 * unable to fork the worker or it exited without initializing
                 * properly.  We must throw an error, since the caller may
                 * have been expecting the worker to do some work before
                 * exiting.
                 */
                mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
                if (shm_mq_get_sender(mq) == NULL)
                    ereport(ERROR,
                            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                             errmsg("parallel worker failed to initialize"),
                             errhint("More details may be available in the server log.")));

                /*
                 * The worker is stopped, but is attached to the error queue.
                 * Unless there's a bug somewhere, this will only happen when
                 * the worker writes messages and terminates after the
                 * CHECK_FOR_INTERRUPTS() near the top of this function and
                 * before the call to GetBackgroundWorkerPid().  In that case,
785                                  * or latch should have been set as well and the right things
786                                  * will happen on the next pass through the loop.
787                                  */
788                         }
789                 }
790
791                 WaitLatch(MyLatch, WL_LATCH_SET, -1,
792                                   WAIT_EVENT_PARALLEL_FINISH);
793                 ResetLatch(MyLatch);
794         }
795
796         if (pcxt->toc != NULL)
797         {
798                 FixedParallelState *fps;
799
800                 fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
801                 if (fps->last_xlog_end > XactLastRecEnd)
802                         XactLastRecEnd = fps->last_xlog_end;
803         }
804 }
805
806 /*
807  * Wait for all workers to exit.
808  *
 * This function ensures that workers have been completely shut down.  The
 * difference between WaitForParallelWorkersToFinish and this function is
 * that the former just ensures that the last message sent by a worker
 * backend has been received by the master backend, whereas this one
 * ensures complete shutdown.
 */
static void
WaitForParallelWorkersToExit(ParallelContext *pcxt)
{
    int         i;

    /* Wait until the workers actually die. */
    for (i = 0; i < pcxt->nworkers_launched; ++i)
    {
        BgwHandleStatus status;

        if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
            continue;

        status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);

        /*
         * If the postmaster kicked the bucket, we have no chance of cleaning
         * up safely -- we won't be able to tell when our workers are actually
         * dead.  This doesn't necessitate a PANIC since they will all abort
         * eventually, but we can't safely continue this session.
         */
        if (status == BGWH_POSTMASTER_DIED)
            ereport(FATAL,
                    (errcode(ERRCODE_ADMIN_SHUTDOWN),
                     errmsg("postmaster exited during a parallel transaction")));

        /* Release memory. */
        pfree(pcxt->worker[i].bgwhandle);
        pcxt->worker[i].bgwhandle = NULL;
    }
}

/*
 * Destroy a parallel context.
 *
 * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
 * first, before calling this function.  When this function is invoked, any
 * remaining workers are forcibly killed; the dynamic shared memory segment
 * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
 */
void
DestroyParallelContext(ParallelContext *pcxt)
{
    int         i;

    /*
     * Be careful about order of operations here!  We remove the parallel
     * context from the list before we do anything else; otherwise, if an
     * error occurs during a subsequent step, we might try to nuke it again
     * from AtEOXact_Parallel or AtEOSubXact_Parallel.
     */
    dlist_delete(&pcxt->node);

    /* Kill each worker in turn, and forget their error queues. */
    if (pcxt->worker != NULL)
    {
        for (i = 0; i < pcxt->nworkers_launched; ++i)
        {
            if (pcxt->worker[i].error_mqh != NULL)
            {
                TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);

                shm_mq_detach(pcxt->worker[i].error_mqh);
                pcxt->worker[i].error_mqh = NULL;
            }
        }
    }

    /*
     * If we have allocated a shared memory segment, detach it.  This will
     * implicitly detach the error queues, and any other shared memory queues,
     * stored there.
     */
    if (pcxt->seg != NULL)
    {
        dsm_detach(pcxt->seg);
        pcxt->seg = NULL;
    }

    /*
     * If this parallel context is actually in backend-private memory rather
     * than shared memory, free that memory instead.
     */
    if (pcxt->private_memory != NULL)
    {
        pfree(pcxt->private_memory);
        pcxt->private_memory = NULL;
    }

    /*
     * We can't finish transaction commit or abort until all of the workers
     * have exited.  This means, in particular, that we can't respond to
     * interrupts at this stage.
     */
    HOLD_INTERRUPTS();
    WaitForParallelWorkersToExit(pcxt);
    RESUME_INTERRUPTS();

    /* Free the worker array itself. */
    if (pcxt->worker != NULL)
    {
        pfree(pcxt->worker);
        pcxt->worker = NULL;
    }

    /* Free memory. */
    pfree(pcxt->library_name);
    pfree(pcxt->function_name);
    pfree(pcxt);
}

/*
 * Are there any parallel contexts currently active?
 */
bool
ParallelContextActive(void)
{
    return !dlist_is_empty(&pcxt_list);
}

/*
 * Handle receipt of an interrupt indicating a parallel worker message.
 *
 * Note: this is called within a signal handler!  All we can do is set
 * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
 * HandleParallelMessages().
 */
void
HandleParallelMessageInterrupt(void)
{
    InterruptPending = true;
    ParallelMessagePending = true;
    SetLatch(MyLatch);
}
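
/*
 * To summarize the message flow: a worker sends a protocol message on its
 * error queue and signals the leader; the leader's process-signal handler
 * calls HandleParallelMessageInterrupt() above, and the next
 * CHECK_FOR_INTERRUPTS() reaches ProcessInterrupts(), which in turn calls
 * HandleParallelMessages() below to drain the queues.
 */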

/*
 * Handle any queued protocol messages received from parallel workers.
 */
void
HandleParallelMessages(void)
{
    dlist_iter  iter;
    MemoryContext oldcontext;

    static MemoryContext hpm_context = NULL;

    /*
     * This is invoked from ProcessInterrupts(), and since some of the
     * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
     * for recursive calls if more signals are received while this runs.  It's
     * unclear that recursive entry would be safe, and it doesn't seem useful
     * even if it is safe, so let's block interrupts until done.
     */
    HOLD_INTERRUPTS();

    /*
     * Moreover, CurrentMemoryContext might be pointing almost anywhere.  We
     * don't want to risk leaking data into long-lived contexts, so let's do
     * our work here in a private context that we can reset on each use.
     */
    if (hpm_context == NULL)    /* first time through? */
        hpm_context = AllocSetContextCreate(TopMemoryContext,
                                            "HandleParallelMessages",
                                            ALLOCSET_DEFAULT_SIZES);
    else
        MemoryContextReset(hpm_context);

    oldcontext = MemoryContextSwitchTo(hpm_context);

    /* OK to process messages.  Reset the flag saying there are more to do. */
    ParallelMessagePending = false;

    dlist_foreach(iter, &pcxt_list)
    {
        ParallelContext *pcxt;
        int         i;

        pcxt = dlist_container(ParallelContext, node, iter.cur);
        if (pcxt->worker == NULL)
            continue;

        for (i = 0; i < pcxt->nworkers_launched; ++i)
        {
            /*
             * Read as many messages as we can from each worker, but stop when
             * either (1) the worker's error queue goes away, which can happen
             * if we receive a Terminate message from the worker; or (2) no
             * more messages can be read from the worker without blocking.
             */
            while (pcxt->worker[i].error_mqh != NULL)
            {
                shm_mq_result res;
                Size        nbytes;
                void       *data;

                res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
                                     &data, true);
                if (res == SHM_MQ_WOULD_BLOCK)
                    break;
                else if (res == SHM_MQ_SUCCESS)
                {
                    StringInfoData msg;

                    initStringInfo(&msg);
                    appendBinaryStringInfo(&msg, data, nbytes);
                    HandleParallelMessage(pcxt, i, &msg);
                    pfree(msg.data);
                }
                else
                    ereport(ERROR,
                            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                             errmsg("lost connection to parallel worker")));
            }
        }
    }

    MemoryContextSwitchTo(oldcontext);

    /* Might as well clear the context on our way out */
    MemoryContextReset(hpm_context);

    RESUME_INTERRUPTS();
}

/*
 * Handle a single protocol message received from a single parallel worker.
 */
static void
HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
{
    char        msgtype;

    if (pcxt->known_attached_workers != NULL &&
        !pcxt->known_attached_workers[i])
    {
        pcxt->known_attached_workers[i] = true;
        pcxt->nknown_attached_workers++;
    }

    msgtype = pq_getmsgbyte(msg);

    switch (msgtype)
    {
        case 'K':               /* BackendKeyData */
            {
                int32       pid = pq_getmsgint(msg, 4);

                (void) pq_getmsgint(msg, 4);    /* discard cancel key */
                (void) pq_getmsgend(msg);
                pcxt->worker[i].pid = pid;
                break;
            }

        case 'E':               /* ErrorResponse */
        case 'N':               /* NoticeResponse */
            {
                ErrorData   edata;
                ErrorContextCallback *save_error_context_stack;

                /* Parse ErrorResponse or NoticeResponse. */
                pq_parse_errornotice(msg, &edata);

                /* Death of a worker isn't enough justification for suicide. */
                edata.elevel = Min(edata.elevel, ERROR);

                /*
                 * If desired, add a context line to show that this is a
                 * message propagated from a parallel worker.  Otherwise, it
                 * can sometimes be confusing to understand what actually
                 * happened.  (We don't do this in FORCE_PARALLEL_REGRESS mode
                 * because it causes test-result instability depending on
                 * whether a parallel worker is actually used or not.)
                 */
                if (force_parallel_mode != FORCE_PARALLEL_REGRESS)
                {
                    if (edata.context)
                        edata.context = psprintf("%s\n%s", edata.context,
                                                 _("parallel worker"));
                    else
                        edata.context = pstrdup(_("parallel worker"));
                }

                /*
                 * Context beyond that should use the error context callbacks
                 * that were in effect when the ParallelContext was created,
                 * not the current ones.
                 */
                save_error_context_stack = error_context_stack;
                error_context_stack = pcxt->error_context_stack;

                /* Rethrow error or print notice. */
                ThrowErrorData(&edata);

                /* Not an error, so restore previous context stack. */
                error_context_stack = save_error_context_stack;

                break;
            }

        case 'A':               /* NotifyResponse */
            {
                /* Propagate NotifyResponse. */
                int32       pid;
                const char *channel;
                const char *payload;

                pid = pq_getmsgint(msg, 4);
                channel = pq_getmsgrawstring(msg);
                payload = pq_getmsgrawstring(msg);
                pq_endmessage(msg);

                NotifyMyFrontEnd(channel, payload, pid);

                break;
            }

        case 'X':               /* Terminate, indicating clean exit */
            {
                shm_mq_detach(pcxt->worker[i].error_mqh);
                pcxt->worker[i].error_mqh = NULL;
                break;
            }

        default:
            {
                elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
                     msgtype, msg->len);
            }
    }
}

/*
 * End-of-subtransaction cleanup for parallel contexts.
 *
 * Currently, it's forbidden to enter or leave a subtransaction while
 * parallel mode is in effect, so we could just blow away everything.  But
 * we may want to relax that restriction in the future, so this code
 * contemplates that there may be multiple subtransaction IDs in pcxt_list.
 */
void
AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
{
    while (!dlist_is_empty(&pcxt_list))
    {
        ParallelContext *pcxt;

        pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
        if (pcxt->subid != mySubId)
            break;
        if (isCommit)
            elog(WARNING, "leaked parallel context");
        DestroyParallelContext(pcxt);
    }
}

/*
 * End-of-transaction cleanup for parallel contexts.
 */
void
AtEOXact_Parallel(bool isCommit)
{
    while (!dlist_is_empty(&pcxt_list))
    {
        ParallelContext *pcxt;

        pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
        if (isCommit)
            elog(WARNING, "leaked parallel context");
        DestroyParallelContext(pcxt);
    }
}

/*
 * Main entrypoint for parallel workers.
 */
void
ParallelWorkerMain(Datum main_arg)
{
    dsm_segment *seg;
    shm_toc    *toc;
    FixedParallelState *fps;
    char       *error_queue_space;
    shm_mq     *mq;
    shm_mq_handle *mqh;
    char       *libraryspace;
    char       *entrypointstate;
    char       *library_name;
    char       *function_name;
    parallel_worker_main_type entrypt;
    char       *gucspace;
    char       *combocidspace;
    char       *tsnapspace;
    char       *asnapspace;
    char       *tstatespace;
    char       *reindexspace;
    StringInfoData msgbuf;
    char       *session_dsm_handle_space;

    /* Set flag to indicate that we're initializing a parallel worker. */
    InitializingParallelWorker = true;

    /* Establish signal handlers. */
    pqsignal(SIGTERM, die);
    BackgroundWorkerUnblockSignals();

    /* Determine and set our parallel worker number. */
    Assert(ParallelWorkerNumber == -1);
    memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));

    /* Set up a memory context and resource owner. */
    Assert(CurrentResourceOwner == NULL);
    CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel");
    CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
                                                 "Parallel worker",
                                                 ALLOCSET_DEFAULT_SIZES);

    /*
     * Now that we have a resource owner, we can attach to the dynamic shared
     * memory segment and read the table of contents.
     */
    seg = dsm_attach(DatumGetUInt32(main_arg));
    if (seg == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("could not map dynamic shared memory segment")));
    toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
    if (toc == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("invalid magic number in dynamic shared memory segment")));

    /* Look up fixed parallel state. */
    fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
    MyFixedParallelState = fps;

    /* Arrange to signal the leader if we exit. */
    ParallelMasterPid = fps->parallel_master_pid;
    ParallelMasterBackendId = fps->parallel_master_backend_id;
    on_shmem_exit(ParallelWorkerShutdown, (Datum) 0);

    /*
     * Now we can find and attach to the error queue provided for us.  That's
     * good, because until we do that, any errors that happen here will not be
     * reported back to the process that requested that this worker be
     * launched.
     */
    error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
    mq = (shm_mq *) (error_queue_space +
                     ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
    shm_mq_set_sender(mq, MyProc);
    mqh = shm_mq_attach(mq, seg, NULL);
    pq_redirect_to_shm_mq(seg, mqh);
    pq_set_parallel_master(fps->parallel_master_pid,
                           fps->parallel_master_backend_id);

    /*
     * Send a BackendKeyData message to the process that initiated parallelism
     * so that it has access to our PID before it receives any other messages
     * from us.  Our cancel key is sent, too, since that's the way the
1273          * protocol message is defined, but it won't actually be used for anything
1274          * in this case.
1275          */
1276         pq_beginmessage(&msgbuf, 'K');
1277         pq_sendint32(&msgbuf, (int32) MyProcPid);
1278         pq_sendint32(&msgbuf, (int32) MyCancelKey);
1279         pq_endmessage(&msgbuf);
1280
1281         /*
1282          * Hooray! Primary initialization is complete.  Now, we need to set up our
1283          * backend-local state to match the original backend.
1284          */
1285
1286         /*
1287          * Join locking group.  We must do this before anything that could try to
1288          * acquire a heavyweight lock, because any heavyweight locks acquired to
1289          * this point could block either directly against the parallel group
1290          * leader or against some process which in turn waits for a lock that
1291          * conflicts with the parallel group leader, causing an undetected
1292          * deadlock.  (If we can't join the lock group, the leader has gone away,
1293          * so just exit quietly.)
1294          */
1295         if (!BecomeLockGroupMember(fps->parallel_master_pgproc,
1296                                                            fps->parallel_master_pid))
1297                 return;
1298
1299         /*
1300          * Load libraries that were loaded by the original backend.  We want to
1301          * do this before restoring GUCs, because the libraries might define
1302          * custom variables.
1303          */
1304         libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
1305         RestoreLibraryState(libraryspace);
1306
1307         /*
1308          * Identify the entry point to be called.  In theory this could result in
1309          * loading an additional library, though most likely the entry point is in
1310          * the core backend or in a library we just loaded.
1311          */
1312         entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
1313         library_name = entrypointstate;
1314         function_name = entrypointstate + strlen(library_name) + 1;
1315
1316         entrypt = LookupParallelWorkerFunction(library_name, function_name);
1317
1318         /* Restore database connection. */
1319         BackgroundWorkerInitializeConnectionByOid(fps->database_id,
1320                                                                                           fps->authenticated_user_id,
1321                                                                                           0);
1322
1323         /*
1324          * Set the client encoding to the database encoding, since that is what
1325          * the leader will expect.
1326          */
1327         SetClientEncoding(GetDatabaseEncoding());
1328
1329         /* Restore GUC values from launching backend. */
1330         gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
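        /*
         * Note: the transaction wrapper below is presumably needed because
         * restoring GUCs can run check hooks that require catalog access.
         */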
1331         StartTransactionCommand();
1332         RestoreGUCState(gucspace);
1333         CommitTransactionCommand();
1334
1335         /* Crank up a transaction state appropriate to a parallel worker. */
1336         tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
1337         StartParallelWorkerTransaction(tstatespace);
1338
1339         /* Restore combo CID state. */
1340         combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
1341         RestoreComboCIDState(combocidspace);
1342
1343         /* Attach to the per-session DSM segment and contained objects. */
1344         session_dsm_handle_space =
1345                 shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
1346         AttachSession(*(dsm_handle *) session_dsm_handle_space);
1347
1348         /* Restore transaction snapshot. */
1349         tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, false);
1350         RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace),
1351                                                            fps->parallel_master_pgproc);
1352
1353         /* Restore active snapshot. */
1354         asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
1355         PushActiveSnapshot(RestoreSnapshot(asnapspace));
1356
1357         /*
1358          * We've changed which tuples we can see, and must therefore invalidate
1359          * system caches.
1360          */
1361         InvalidateSystemCaches();
1362
1363         /*
1364          * Restore the current role ID.  Skip verifying whether the session
1365          * user is allowed to become this role, and blindly restore the
1366          * leader's state for the current role.
1367          */
1368         SetCurrentRoleId(fps->outer_user_id, fps->is_superuser);
1369
1370         /* Restore user ID and security context. */
1371         SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);
1372
1373         /* Restore temp-namespace state to ensure search path matches leader's. */
1374         SetTempNamespaceState(fps->temp_namespace_id,
1375                                                   fps->temp_toast_namespace_id);
1376
1377         /* Restore reindex state. */
1378         reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
1379         RestoreReindexState(reindexspace);
1380
1381         /*
1382          * We've initialized all of our state now; nothing should change
1383          * hereafter.
1384          */
1385         InitializingParallelWorker = false;
1386         EnterParallelMode();
1387
1388         /*
1389          * Time to do the real work: invoke the caller-supplied code.
1390          */
1391         entrypt(seg, toc);
1392
1393         /* Must exit parallel mode to pop active snapshot. */
1394         ExitParallelMode();
1395
1396         /* Must pop active snapshot so resowner.c doesn't complain. */
1397         PopActiveSnapshot();
1398
1399         /* Shut down the parallel-worker transaction. */
1400         EndParallelWorkerTransaction();
1401
1402         /* Detach from the per-session DSM segment. */
1403         DetachSession();
1404
1405         /* Report success. */
1406         pq_putmessage('X', NULL, 0);
1407 }
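/*
 * A minimal sketch of a caller-supplied entrypoint of the kind invoked via
 * entrypt(seg, toc) above.  EXAMPLE_KEY_STATE, and the idea that the leader
 * stored a NUL-terminated string under it with shm_toc_insert(), are
 * assumptions made for illustration; real entrypoints follow the same
 * pattern of looking up their shared state by key and then doing the work.
 */
#ifdef NOT_USED
#define EXAMPLE_KEY_STATE		UINT64CONST(0xE000000000000001)

static void
ExampleParallelWorkerMain(dsm_segment *seg, shm_toc *toc)
{
	char	   *state;

	/* Find the state the leader published for us in the TOC. */
	state = shm_toc_lookup(toc, EXAMPLE_KEY_STATE, false);

	elog(LOG, "parallel worker %d: leader state is \"%s\"",
		 ParallelWorkerNumber, state);
}
#endif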
1408
1409 /*
1410  * Update shared memory with the ending location of the last WAL record we
1411  * wrote, if it's greater than the value already stored there.
1412  */
1413 void
1414 ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
1415 {
1416         FixedParallelState *fps = MyFixedParallelState;
1417
1418         Assert(fps != NULL);
1419         SpinLockAcquire(&fps->mutex);
1420         if (fps->last_xlog_end < last_xlog_end)
1421                 fps->last_xlog_end = last_xlog_end;
1422         SpinLockRelease(&fps->mutex);
1423 }
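/*
 * Leader-side counterpart (a sketch; the real consumer is the
 * wait-for-workers code earlier in this file): once all workers have
 * finished, the leader is expected to fold the shared high-water mark into
 * its own XactLastRecEnd, so that committing later also flushes WAL written
 * by the workers.
 */
#ifdef NOT_USED
static void
ExampleAbsorbWorkerXlogEnd(FixedParallelState *fps)
{
	if (fps->last_xlog_end > XactLastRecEnd)
		XactLastRecEnd = fps->last_xlog_end;
}
#endif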
1424
1425 /*
1426  * Make sure the leader tries to read from our error queue one more time.
1427  * This guards against the case where we exit uncleanly without sending an
1428  * ErrorResponse to the leader, for example because some code calls proc_exit
1429  * directly.
1430  */
1431 static void
1432 ParallelWorkerShutdown(int code, Datum arg)
1433 {
1434         SendProcSignal(ParallelMasterPid,
1435                                    PROCSIG_PARALLEL_MESSAGE,
1436                                    ParallelMasterBackendId);
1437 }
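/*
 * Illustration (hedged): even if worker code exits without reporting an
 * error, as in the hypothetical snippet below, the on_shmem_exit hook above
 * still signals the leader, which then notices the detached queue instead
 * of waiting forever for a final message.
 */
#ifdef NOT_USED
static void
ExampleUncleanExit(void)
{
	/* No ErrorResponse and no 'X' message; ParallelWorkerShutdown runs. */
	proc_exit(1);
}
#endif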
1438
1439 /*
1440  * Look up (and possibly load) a parallel worker entry point function.
1441  *
1442  * For functions contained in the core code, we use library name "postgres"
1443  * and consult the InternalParallelWorkers array.  External functions are
1444  * looked up, and loaded if necessary, using load_external_function().
1445  *
1446  * The point of this is to pass function names as strings across process
1447  * boundaries.  We can't pass actual function addresses because of the
1448  * possibility that the function has been loaded at a different address
1449  * in a different process.  This is obviously a hazard for functions in
1450  * loadable libraries, but it can happen even for functions in the core code
1451  * on platforms using EXEC_BACKEND (e.g., Windows).
1452  *
1453  * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
1454  * in favor of applying load_external_function() for core functions too;
1455  * but that raises portability issues that are not worth addressing now.
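 *
 * Usage sketch (the extension names below are hypothetical):
 *
 *		fn = LookupParallelWorkerFunction("postgres", "ParallelQueryMain");
 *		fn = LookupParallelWorkerFunction("my_extension", "my_worker_main");
 *
 * The first form is served from InternalParallelWorkers; the second goes
 * through the dynamic loader.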
1456  */
1457 static parallel_worker_main_type
1458 LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
1459 {
1460         /*
1461          * If the function is to be loaded from postgres itself, search the
1462          * InternalParallelWorkers array.
1463          */
1464         if (strcmp(libraryname, "postgres") == 0)
1465         {
1466                 int                     i;
1467
1468                 for (i = 0; i < lengthof(InternalParallelWorkers); i++)
1469                 {
1470                         if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
1471                                 return InternalParallelWorkers[i].fn_addr;
1472                 }
1473
1474                 /* We can only reach this by programming error. */
1475                 elog(ERROR, "internal function \"%s\" not found", funcname);
1476         }
1477
1478         /* Otherwise load from external library. */
1479         return (parallel_worker_main_type)
1480                 load_external_function(libraryname, funcname, true, NULL);
1481 }