/*-------------------------------------------------------------------------
 *
 * parallel.c
 *        Infrastructure for launching parallel workers
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *        src/backend/access/transam/parallel.c
 *
 *-------------------------------------------------------------------------
 */
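
/*
 * A minimal sketch of the intended calling sequence, assuming a caller with
 * one chunk of private state to share (MY_SHARED_KEY and the surrounding
 * error handling are hypothetical; see the individual function header
 * comments below for the real contracts):
 *
 *		EnterParallelMode();
 *		pcxt = CreateParallelContext("postgres", "ParallelQueryMain",
 *									 nworkers, false);
 *		shm_toc_estimate_chunk(&pcxt->estimator, size);
 *		shm_toc_estimate_keys(&pcxt->estimator, 1);
 *		InitializeParallelDSM(pcxt);
 *		space = shm_toc_allocate(pcxt->toc, size);
 *		shm_toc_insert(pcxt->toc, MY_SHARED_KEY, space);
 *		LaunchParallelWorkers(pcxt);
 *		... leader participates in the work itself ...
 *		WaitForParallelWorkersToFinish(pcxt);
 *		DestroyParallelContext(pcxt);
 *		ExitParallelMode();
 */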

#include "postgres.h"

#include "access/nbtree.h"
#include "access/parallel.h"
#include "access/session.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "commands/async.h"
#include "executor/execParallel.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
#include "miscadmin.h"
#include "optimizer/planmain.h"
#include "pgstat.h"
#include "storage/ipc.h"
#include "storage/sinval.h"
#include "storage/spin.h"
#include "tcop/tcopprot.h"
#include "utils/combocid.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/relmapper.h"
#include "utils/snapmgr.h"
#include "utils/typcache.h"


/*
 * We don't want to waste a lot of memory on an error queue which, most of
 * the time, will process only a handful of small messages.  However, it is
 * desirable to make it large enough that a typical ErrorResponse can be sent
 * without blocking.  That way, a worker that errors out can write the whole
 * message into the queue and terminate without waiting for the user backend.
 */
#define PARALLEL_ERROR_QUEUE_SIZE                       16384

/* Magic number for parallel context TOC. */
#define PARALLEL_MAGIC                                          0x50477c7c

/*
 * Magic numbers for per-context parallel state sharing.  Higher-level code
 * should use smaller values, leaving these very large ones for use by this
 * module.
 */
#define PARALLEL_KEY_FIXED                                      UINT64CONST(0xFFFFFFFFFFFF0001)
#define PARALLEL_KEY_ERROR_QUEUE                        UINT64CONST(0xFFFFFFFFFFFF0002)
#define PARALLEL_KEY_LIBRARY                            UINT64CONST(0xFFFFFFFFFFFF0003)
#define PARALLEL_KEY_GUC                                        UINT64CONST(0xFFFFFFFFFFFF0004)
#define PARALLEL_KEY_COMBO_CID                          UINT64CONST(0xFFFFFFFFFFFF0005)
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT       UINT64CONST(0xFFFFFFFFFFFF0006)
#define PARALLEL_KEY_ACTIVE_SNAPSHOT            UINT64CONST(0xFFFFFFFFFFFF0007)
#define PARALLEL_KEY_TRANSACTION_STATE          UINT64CONST(0xFFFFFFFFFFFF0008)
#define PARALLEL_KEY_ENTRYPOINT                         UINT64CONST(0xFFFFFFFFFFFF0009)
#define PARALLEL_KEY_SESSION_DSM                        UINT64CONST(0xFFFFFFFFFFFF000A)
#define PARALLEL_KEY_REINDEX_STATE                      UINT64CONST(0xFFFFFFFFFFFF000B)
#define PARALLEL_KEY_RELMAPPER_STATE            UINT64CONST(0xFFFFFFFFFFFF000C)

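/*
 * Higher-level code that stores its own state in the same TOC simply picks
 * smaller constants for its entries, e.g. (hypothetical extension code,
 * sketched only to illustrate the keyspace convention):
 *
 *		#define MYEXT_KEY_SHARED_STATE	UINT64CONST(0xE000000000000001)
 *		shm_toc_insert(pcxt->toc, MYEXT_KEY_SHARED_STATE, state);
 */
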
/* Fixed-size parallel state. */
typedef struct FixedParallelState
{
        /* Fixed-size state that workers must restore. */
        Oid                     database_id;
        Oid                     authenticated_user_id;
        Oid                     current_user_id;
        Oid                     outer_user_id;
        Oid                     temp_namespace_id;
        Oid                     temp_toast_namespace_id;
        int                     sec_context;
        bool            is_superuser;
        PGPROC     *parallel_master_pgproc;
        pid_t           parallel_master_pid;
        BackendId       parallel_master_backend_id;
        TimestampTz xact_ts;
        TimestampTz stmt_ts;

        /* Mutex protects remaining fields. */
        slock_t         mutex;

        /* Maximum XactLastRecEnd of any worker. */
        XLogRecPtr      last_xlog_end;
} FixedParallelState;
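
/*
 * Workers push their XactLastRecEnd back to the leader through this struct;
 * ParallelWorkerReportLastRecEnd(), later in this file, does in essence:
 *
 *		SpinLockAcquire(&fps->mutex);
 *		if (fps->last_xlog_end < last_xlog_end)
 *			fps->last_xlog_end = last_xlog_end;
 *		SpinLockRelease(&fps->mutex);
 */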

/*
 * Our parallel worker number.  We initialize this to -1, meaning that we are
 * not a parallel worker.  In parallel workers, it will be set to a value >= 0
 * and < the number of workers before any user code is invoked; each parallel
 * worker will get a different parallel worker number.
 */
int                     ParallelWorkerNumber = -1;

/* Is there a parallel message pending which we need to receive? */
volatile bool ParallelMessagePending = false;

/* Are we initializing a parallel worker? */
bool            InitializingParallelWorker = false;

/* Pointer to our fixed parallel state. */
static FixedParallelState *MyFixedParallelState;

/* List of active parallel contexts. */
static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);

/* Backend-local copy of data from FixedParallelState. */
static pid_t ParallelMasterPid;

/*
 * List of internal parallel worker entry points.  We need this for
 * reasons explained in LookupParallelWorkerFunction(), below.
 */
static const struct
{
        const char *fn_name;
        parallel_worker_main_type fn_addr;
}                       InternalParallelWorkers[] =
{
        {
                "ParallelQueryMain", ParallelQueryMain
        },
        {
                "_bt_parallel_build_main", _bt_parallel_build_main
        }
};
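
/*
 * LookupParallelWorkerFunction() resolves a "postgres" entrypoint by name
 * against this table, roughly (a sketch of the lookup, not verbatim):
 *
 *		for (i = 0; i < lengthof(InternalParallelWorkers); ++i)
 *			if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
 *				return InternalParallelWorkers[i].fn_addr;
 */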

/* Private functions. */
static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
static void ParallelWorkerShutdown(int code, Datum arg);


/*
 * Establish a new parallel context.  This should be done after entering
 * parallel mode, and (unless there is an error) the context should be
 * destroyed before exiting the current subtransaction.
 */
ParallelContext *
CreateParallelContext(const char *library_name, const char *function_name,
                                          int nworkers, bool serializable_okay)
{
        MemoryContext oldcontext;
        ParallelContext *pcxt;

        /* It is unsafe to create a parallel context if not in parallel mode. */
        Assert(IsInParallelMode());

        /* Number of workers should be non-negative. */
        Assert(nworkers >= 0);

        /*
         * If we are running under serializable isolation, we can't use parallel
         * workers, at least not until somebody enhances that mechanism to be
         * parallel-aware.  Utility statement callers may ask us to ignore this
         * restriction because they're always able to safely ignore the fact that
         * SIREAD locks do not work with parallelism.
         */
        if (IsolationIsSerializable() && !serializable_okay)
                nworkers = 0;

        /* We might be running in a short-lived memory context. */
        oldcontext = MemoryContextSwitchTo(TopTransactionContext);

        /* Initialize a new ParallelContext. */
        pcxt = palloc0(sizeof(ParallelContext));
        pcxt->subid = GetCurrentSubTransactionId();
        pcxt->nworkers = nworkers;
        pcxt->library_name = pstrdup(library_name);
        pcxt->function_name = pstrdup(function_name);
        pcxt->error_context_stack = error_context_stack;
        shm_toc_initialize_estimator(&pcxt->estimator);
        dlist_push_head(&pcxt_list, &pcxt->node);

        /* Restore previous memory context. */
        MemoryContextSwitchTo(oldcontext);

        return pcxt;
}

/*
 * Establish the dynamic shared memory segment for a parallel context and
 * copy state and other bookkeeping information that will be needed by
 * parallel workers into it.
 */
void
InitializeParallelDSM(ParallelContext *pcxt)
{
        MemoryContext oldcontext;
        Size            library_len = 0;
        Size            guc_len = 0;
        Size            combocidlen = 0;
        Size            tsnaplen = 0;
        Size            asnaplen = 0;
        Size            tstatelen = 0;
        Size            reindexlen = 0;
        Size            relmapperlen = 0;
        Size            segsize = 0;
        int                     i;
        FixedParallelState *fps;
        dsm_handle      session_dsm_handle = DSM_HANDLE_INVALID;
        Snapshot        transaction_snapshot = GetTransactionSnapshot();
        Snapshot        active_snapshot = GetActiveSnapshot();

        /* We might be running in a very short-lived memory context. */
        oldcontext = MemoryContextSwitchTo(TopTransactionContext);

        /* Allow space to store the fixed-size parallel state. */
        shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
        shm_toc_estimate_keys(&pcxt->estimator, 1);

        /*
         * Normally, the user will have requested at least one worker process, but
         * if by chance they have not, we can skip a bunch of things here.
         */
        if (pcxt->nworkers > 0)
        {
                /* Get (or create) the per-session DSM segment's handle. */
                session_dsm_handle = GetSessionDsmHandle();

                /*
                 * If we weren't able to create a per-session DSM segment, then we can
                 * continue but we can't safely launch any workers because their
                 * record typmods would be incompatible so they couldn't exchange
                 * tuples.
                 */
                if (session_dsm_handle == DSM_HANDLE_INVALID)
                        pcxt->nworkers = 0;
        }

        if (pcxt->nworkers > 0)
        {
                /* Estimate space for various kinds of state sharing. */
                library_len = EstimateLibraryStateSpace();
                shm_toc_estimate_chunk(&pcxt->estimator, library_len);
                guc_len = EstimateGUCStateSpace();
                shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
                combocidlen = EstimateComboCIDStateSpace();
                shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
                tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
                shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
                asnaplen = EstimateSnapshotSpace(active_snapshot);
                shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
                tstatelen = EstimateTransactionStateSpace();
                shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
                shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
                reindexlen = EstimateReindexStateSpace();
                shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
                relmapperlen = EstimateRelationMapSpace();
                shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen);
                /* If you add more chunks here, you probably need to add keys. */
                shm_toc_estimate_keys(&pcxt->estimator, 9);

                /* Estimate space needed for error queues. */
                StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
                                                 PARALLEL_ERROR_QUEUE_SIZE,
                                                 "parallel error queue size not buffer-aligned");
                shm_toc_estimate_chunk(&pcxt->estimator,
                                                           mul_size(PARALLEL_ERROR_QUEUE_SIZE,
                                                                                pcxt->nworkers));
                shm_toc_estimate_keys(&pcxt->estimator, 1);

                /* Estimate how much we'll need for the entrypoint info. */
                shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
                                                           strlen(pcxt->function_name) + 2);
                shm_toc_estimate_keys(&pcxt->estimator, 1);
        }

        /*
         * Create DSM and initialize with new table of contents.  But if the user
         * didn't request any workers, then don't bother creating a dynamic shared
         * memory segment; instead, just use backend-private memory.
         *
         * Also, if we can't create a dynamic shared memory segment because the
         * maximum number of segments has already been created, then fall back to
         * backend-private memory, and plan not to use any workers.  We hope this
         * won't happen very often, but it's better to abandon the use of
         * parallelism than to fail outright.
         */
        segsize = shm_toc_estimate(&pcxt->estimator);
        if (pcxt->nworkers > 0)
                pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
        if (pcxt->seg != NULL)
                pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
                                                                   dsm_segment_address(pcxt->seg),
                                                                   segsize);
        else
        {
                pcxt->nworkers = 0;
                pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
                pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
                                                                   segsize);
        }

        /* Initialize fixed-size state in shared memory. */
        fps = (FixedParallelState *)
                shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
        fps->database_id = MyDatabaseId;
        fps->authenticated_user_id = GetAuthenticatedUserId();
        fps->outer_user_id = GetCurrentRoleId();
        fps->is_superuser = session_auth_is_superuser;
        GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
        GetTempNamespaceState(&fps->temp_namespace_id,
                                                  &fps->temp_toast_namespace_id);
        fps->parallel_master_pgproc = MyProc;
        fps->parallel_master_pid = MyProcPid;
        fps->parallel_master_backend_id = MyBackendId;
        fps->xact_ts = GetCurrentTransactionStartTimestamp();
        fps->stmt_ts = GetCurrentStatementStartTimestamp();
        SpinLockInit(&fps->mutex);
        fps->last_xlog_end = 0;
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);

        /* We can skip the rest of this if we're not budgeting for any workers. */
        if (pcxt->nworkers > 0)
        {
                char       *libraryspace;
                char       *gucspace;
                char       *combocidspace;
                char       *tsnapspace;
                char       *asnapspace;
                char       *tstatespace;
                char       *reindexspace;
                char       *relmapperspace;
                char       *error_queue_space;
                char       *session_dsm_handle_space;
                char       *entrypointstate;
                Size            lnamelen;

                /* Serialize shared libraries we have loaded. */
                libraryspace = shm_toc_allocate(pcxt->toc, library_len);
                SerializeLibraryState(library_len, libraryspace);
                shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);

                /* Serialize GUC settings. */
                gucspace = shm_toc_allocate(pcxt->toc, guc_len);
                SerializeGUCState(guc_len, gucspace);
                shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);

                /* Serialize combo CID state. */
                combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
                SerializeComboCIDState(combocidlen, combocidspace);
                shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);

                /* Serialize transaction snapshot and active snapshot. */
                tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
                SerializeSnapshot(transaction_snapshot, tsnapspace);
                shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
                                           tsnapspace);
                asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
                SerializeSnapshot(active_snapshot, asnapspace);
                shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);

                /* Provide the handle for per-session segment. */
                session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
                                                                                                        sizeof(dsm_handle));
                *(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
                shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
                                           session_dsm_handle_space);

                /* Serialize transaction state. */
                tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
                SerializeTransactionState(tstatelen, tstatespace);
                shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);

                /* Serialize reindex state. */
                reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
                SerializeReindexState(reindexlen, reindexspace);
                shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);

                /* Serialize relmapper state. */
                relmapperspace = shm_toc_allocate(pcxt->toc, relmapperlen);
                SerializeRelationMap(relmapperlen, relmapperspace);
                shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE,
                                           relmapperspace);

                /* Allocate space for worker information. */
                pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);

                /*
                 * Establish error queues in dynamic shared memory.
                 *
                 * These queues should be used only for transmitting ErrorResponse,
                 * NoticeResponse, and NotifyResponse protocol messages.  Tuple data
                 * should be transmitted via separate (possibly larger?) queues.
                 */
                error_queue_space =
                        shm_toc_allocate(pcxt->toc,
                                                         mul_size(PARALLEL_ERROR_QUEUE_SIZE,
                                                                          pcxt->nworkers));
                for (i = 0; i < pcxt->nworkers; ++i)
                {
                        char       *start;
                        shm_mq     *mq;

                        start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
                        mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
                        shm_mq_set_receiver(mq, MyProc);
                        pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
                }
                shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);

                /*
                 * Serialize entrypoint information.  It's unsafe to pass function
                 * pointers across processes, as the function pointer may be different
                 * in each process in EXEC_BACKEND builds, so we always pass library
                 * and function name.  (We use library name "postgres" for functions
                 * in the core backend.)
                 */
                lnamelen = strlen(pcxt->library_name);
                entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
                                                                                   strlen(pcxt->function_name) + 2);
                strcpy(entrypointstate, pcxt->library_name);
                strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
                shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
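
                /*
                 * The chunk thus holds two consecutive NUL-terminated strings; a
                 * sketch of the unpacking done on the worker side in
                 * ParallelWorkerMain():
                 *
                 *		library_name = entrypointstate;
                 *		function_name = entrypointstate + strlen(library_name) + 1;
                 */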
        }

        /* Restore previous memory context. */
        MemoryContextSwitchTo(oldcontext);
}

/*
 * Reinitialize the dynamic shared memory segment for a parallel context such
 * that we could launch workers for it again.
 */
void
ReinitializeParallelDSM(ParallelContext *pcxt)
{
        FixedParallelState *fps;

        /* Wait for any old workers to exit. */
        if (pcxt->nworkers_launched > 0)
        {
                WaitForParallelWorkersToFinish(pcxt);
                WaitForParallelWorkersToExit(pcxt);
                pcxt->nworkers_launched = 0;
                if (pcxt->known_attached_workers)
                {
                        pfree(pcxt->known_attached_workers);
                        pcxt->known_attached_workers = NULL;
                        pcxt->nknown_attached_workers = 0;
                }
        }

        /* Reset a few bits of fixed parallel state to a clean state. */
        fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
        fps->last_xlog_end = 0;

        /* Recreate error queues (if they exist). */
        if (pcxt->nworkers > 0)
        {
                char       *error_queue_space;
                int                     i;

                error_queue_space =
                        shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
                for (i = 0; i < pcxt->nworkers; ++i)
                {
                        char       *start;
                        shm_mq     *mq;

                        start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
                        mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
                        shm_mq_set_receiver(mq, MyProc);
                        pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
                }
        }
}
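
/*
 * This allows one parallel context to drive several launch cycles, e.g. a
 * hypothetical rescan loop:
 *
 *		ReinitializeParallelDSM(pcxt);
 *		LaunchParallelWorkers(pcxt);
 *		... redo the parallel work ...
 *		WaitForParallelWorkersToFinish(pcxt);
 */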

/*
 * Launch parallel workers.
 */
void
LaunchParallelWorkers(ParallelContext *pcxt)
{
        MemoryContext oldcontext;
        BackgroundWorker worker;
        int                     i;
        bool            any_registrations_failed = false;

        /* Skip this if we have no workers. */
        if (pcxt->nworkers == 0)
                return;

        /* We need to be a lock group leader. */
        BecomeLockGroupLeader();

        /* If we do have workers, we'd better have a DSM segment. */
        Assert(pcxt->seg != NULL);

        /* We might be running in a short-lived memory context. */
        oldcontext = MemoryContextSwitchTo(TopTransactionContext);

        /* Configure a worker. */
        memset(&worker, 0, sizeof(worker));
        snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
                         MyProcPid);
        snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
        worker.bgw_flags =
                BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
                | BGWORKER_CLASS_PARALLEL;
        worker.bgw_start_time = BgWorkerStart_ConsistentState;
        worker.bgw_restart_time = BGW_NEVER_RESTART;
        sprintf(worker.bgw_library_name, "postgres");
        sprintf(worker.bgw_function_name, "ParallelWorkerMain");
        worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
        worker.bgw_notify_pid = MyProcPid;

        /*
         * Start workers.
         *
         * The caller must be able to tolerate ending up with fewer workers than
         * expected, so there is no need to throw an error here if registration
         * fails.  It wouldn't help much anyway, because registering the worker in
         * no way guarantees that it will start up and initialize successfully.
         */
        for (i = 0; i < pcxt->nworkers; ++i)
        {
                memcpy(worker.bgw_extra, &i, sizeof(int));
                if (!any_registrations_failed &&
                        RegisterDynamicBackgroundWorker(&worker,
                                                                                        &pcxt->worker[i].bgwhandle))
                {
                        shm_mq_set_handle(pcxt->worker[i].error_mqh,
                                                          pcxt->worker[i].bgwhandle);
                        pcxt->nworkers_launched++;
                }
                else
                {
                        /*
                         * If we weren't able to register the worker, then we've bumped up
                         * against the max_worker_processes limit, and future
                         * registrations will probably fail too, so arrange to skip them.
                         * But we still have to execute this code for the remaining slots
                         * to make sure that we forget about the error queues we budgeted
                         * for those workers.  Otherwise, we'll wait for them to start,
                         * but they never will.
                         */
                        any_registrations_failed = true;
                        pcxt->worker[i].bgwhandle = NULL;
                        shm_mq_detach(pcxt->worker[i].error_mqh);
                        pcxt->worker[i].error_mqh = NULL;
                }
        }

        /*
         * Now that nworkers_launched has taken its final value, we can initialize
         * known_attached_workers.
         */
        if (pcxt->nworkers_launched > 0)
        {
                pcxt->known_attached_workers =
                        palloc0(sizeof(bool) * pcxt->nworkers_launched);
                pcxt->nknown_attached_workers = 0;
        }

        /* Restore previous memory context. */
        MemoryContextSwitchTo(oldcontext);
}
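
/*
 * Since registration can fail, a caller typically rechecks what it actually
 * got, along these (hypothetical) lines:
 *
 *		LaunchParallelWorkers(pcxt);
 *		if (pcxt->nworkers_launched == 0)
 *			... perform the whole task in the leader instead ...
 */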

/*
 * Wait for all workers to attach to their error queues, and throw an error if
 * any worker fails to do this.
 *
 * Callers can assume that if this function returns successfully, then the
 * number of workers given by pcxt->nworkers_launched have initialized and
 * attached to their error queues.  Whether or not these workers are guaranteed
 * to still be running depends on what code the caller asked them to run;
 * this function does not guarantee that they have not exited.  However, it
 * does guarantee that any workers which exited must have done so cleanly and
 * after successfully performing the work with which they were tasked.
 *
 * If this function is not called, then some of the workers that were launched
 * may not have been started due to a fork() failure, or may have exited during
 * early startup prior to attaching to the error queue, so nworkers_launched
 * cannot be viewed as completely reliable.  It will never be less than the
 * number of workers which actually started, but it might be more.  Any workers
 * that failed to start will still be discovered by
 * WaitForParallelWorkersToFinish and an error will be thrown at that time,
 * provided that function is eventually reached.
 *
 * In general, the leader process should do as much work as possible before
 * calling this function.  fork() failures and other early-startup failures
 * are very uncommon, and having the leader sit idle when it could be doing
 * useful work is undesirable.  However, if the leader needs to wait for
 * all of its workers or for a specific worker, it may want to call this
 * function before doing so.  If not, it must make some other provision for
 * the failure-to-start case, lest it wait forever.  On the other hand, a
 * leader which never waits for a worker that might not be started yet, or
 * at least never does so prior to WaitForParallelWorkersToFinish(), need not
 * call this function at all.
 */
void
WaitForParallelWorkersToAttach(ParallelContext *pcxt)
{
        int                     i;

        /* Skip this if we have no launched workers. */
        if (pcxt->nworkers_launched == 0)
                return;

        for (;;)
        {
                /*
                 * This will process any parallel messages that are pending and it may
                 * also throw an error propagated from a worker.
                 */
                CHECK_FOR_INTERRUPTS();

                for (i = 0; i < pcxt->nworkers_launched; ++i)
                {
                        BgwHandleStatus status;
                        shm_mq     *mq;
                        int                     rc;
                        pid_t           pid;

                        if (pcxt->known_attached_workers[i])
                                continue;

                        /*
                         * If error_mqh is NULL, then the worker has already exited
                         * cleanly.
                         */
                        if (pcxt->worker[i].error_mqh == NULL)
                        {
                                pcxt->known_attached_workers[i] = true;
                                ++pcxt->nknown_attached_workers;
                                continue;
                        }

                        status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
                        if (status == BGWH_STARTED)
                        {
                                /* Has the worker attached to the error queue? */
                                mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
                                if (shm_mq_get_sender(mq) != NULL)
                                {
                                        /* Yes, so it is known to be attached. */
                                        pcxt->known_attached_workers[i] = true;
                                        ++pcxt->nknown_attached_workers;
                                }
                        }
                        else if (status == BGWH_STOPPED)
                        {
                                /*
                                 * If the worker stopped without attaching to the error queue,
                                 * throw an error.
                                 */
                                mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
                                if (shm_mq_get_sender(mq) == NULL)
                                        ereport(ERROR,
                                                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                                         errmsg("parallel worker failed to initialize"),
                                                         errhint("More details may be available in the server log.")));

                                pcxt->known_attached_workers[i] = true;
                                ++pcxt->nknown_attached_workers;
                        }
                        else
                        {
                                /*
                                 * Worker not yet started, so we must wait.  The postmaster
                                 * will notify us if the worker's state changes.  Our latch
                                 * might also get set for some other reason, but if so we'll
                                 * just end up waiting for the same worker again.
                                 */
                                rc = WaitLatch(MyLatch,
                                                           WL_LATCH_SET | WL_POSTMASTER_DEATH,
                                                           -1, WAIT_EVENT_BGWORKER_STARTUP);

                                /* emergency bailout if postmaster has died */
                                if (rc & WL_POSTMASTER_DEATH)
                                        proc_exit(1);

                                if (rc & WL_LATCH_SET)
                                        ResetLatch(MyLatch);
                        }
                }

                /* If all workers are known to have started, we're done. */
                if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
                {
                        Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
                        break;
                }
        }
}

/*
 * Wait for all workers to finish computing.
 *
 * Even if the parallel operation seems to have completed successfully, it's
 * important to call this function afterwards.  We must not miss any errors
 * the workers may have thrown during the parallel operation, or any that they
 * may yet throw while shutting down.
 *
 * Also, we want to update our notion of XactLastRecEnd based on worker
 * feedback.
 */
void
WaitForParallelWorkersToFinish(ParallelContext *pcxt)
{
        for (;;)
        {
                bool            anyone_alive = false;
                int                     nfinished = 0;
                int                     i;

                /*
                 * This will process any parallel messages that are pending, which may
                 * change the outcome of the loop that follows.  It may also throw an
                 * error propagated from a worker.
                 */
                CHECK_FOR_INTERRUPTS();

                for (i = 0; i < pcxt->nworkers_launched; ++i)
                {
                        /*
                         * If error_mqh is NULL, then the worker has already exited
                         * cleanly.  If we have received a message through error_mqh from
                         * the worker, we know it started up cleanly, and therefore we're
                         * certain to be notified when it exits.
                         */
                        if (pcxt->worker[i].error_mqh == NULL)
                                ++nfinished;
                        else if (pcxt->known_attached_workers[i])
                        {
                                anyone_alive = true;
                                break;
                        }
                }

                if (!anyone_alive)
                {
                        /* If all workers are known to have finished, we're done. */
                        if (nfinished >= pcxt->nworkers_launched)
                        {
                                Assert(nfinished == pcxt->nworkers_launched);
                                break;
                        }

                        /*
                         * We didn't detect any living workers, but not all workers are
                         * known to have exited cleanly.  Either not all workers have
                         * launched yet, or maybe some of them failed to start or
                         * terminated abnormally.
                         */
                        for (i = 0; i < pcxt->nworkers_launched; ++i)
                        {
                                pid_t           pid;
                                shm_mq     *mq;

                                /*
                                 * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
                                 * should just keep waiting.  If it is BGWH_STOPPED, then
                                 * further investigation is needed.
                                 */
                                if (pcxt->worker[i].error_mqh == NULL ||
                                        pcxt->worker[i].bgwhandle == NULL ||
                                        GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
                                                                                   &pid) != BGWH_STOPPED)
                                        continue;

                                /*
                                 * Check whether the worker ended up stopped without ever
                                 * attaching to the error queue.  If so, the postmaster was
                                 * unable to fork the worker or it exited without initializing
                                 * properly.  We must throw an error, since the caller may
                                 * have been expecting the worker to do some work before
                                 * exiting.
                                 */
                                mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
                                if (shm_mq_get_sender(mq) == NULL)
                                        ereport(ERROR,
                                                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                                         errmsg("parallel worker failed to initialize"),
                                                         errhint("More details may be available in the server log.")));

                                /*
                                 * The worker is stopped, but is attached to the error queue.
                                 * Unless there's a bug somewhere, this will only happen when
                                 * the worker writes messages and terminates after the
                                 * CHECK_FOR_INTERRUPTS() near the top of this function and
                                 * before the call to GetBackgroundWorkerPid().  In that case,
                                 * our latch should have been set as well and the right things
                                 * will happen on the next pass through the loop.
                                 */
                        }
                }

                WaitLatch(MyLatch, WL_LATCH_SET, -1,
                                  WAIT_EVENT_PARALLEL_FINISH);
                ResetLatch(MyLatch);
        }

        if (pcxt->toc != NULL)
        {
                FixedParallelState *fps;

                fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
                if (fps->last_xlog_end > XactLastRecEnd)
                        XactLastRecEnd = fps->last_xlog_end;
        }
}

/*
 * Wait for all workers to exit.
 *
 * This function ensures that the workers have been completely shut down.  The
 * difference between WaitForParallelWorkersToFinish and this function is that
 * the former just ensures that the last message sent by a worker backend has
 * been received by the master backend, whereas this one ensures a complete
 * shutdown.
 */
static void
WaitForParallelWorkersToExit(ParallelContext *pcxt)
{
        int                     i;

        /* Wait until the workers actually die. */
        for (i = 0; i < pcxt->nworkers_launched; ++i)
        {
                BgwHandleStatus status;

                if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
                        continue;

                status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);

                /*
                 * If the postmaster kicked the bucket, we have no chance of cleaning
                 * up safely -- we won't be able to tell when our workers are actually
                 * dead.  This doesn't necessitate a PANIC since they will all abort
                 * eventually, but we can't safely continue this session.
                 */
                if (status == BGWH_POSTMASTER_DIED)
                        ereport(FATAL,
                                        (errcode(ERRCODE_ADMIN_SHUTDOWN),
                                         errmsg("postmaster exited during a parallel transaction")));

                /* Release memory. */
                pfree(pcxt->worker[i].bgwhandle);
                pcxt->worker[i].bgwhandle = NULL;
        }
}

/*
 * Destroy a parallel context.
 *
 * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
 * first, before calling this function.  When this function is invoked, any
 * remaining workers are forcibly killed; the dynamic shared memory segment
 * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
 */
void
DestroyParallelContext(ParallelContext *pcxt)
{
        int                     i;

        /*
         * Be careful about order of operations here!  We remove the parallel
         * context from the list before we do anything else; otherwise, if an
         * error occurs during a subsequent step, we might try to nuke it again
         * from AtEOXact_Parallel or AtEOSubXact_Parallel.
         */
        dlist_delete(&pcxt->node);

        /* Kill each worker in turn, and forget their error queues. */
        if (pcxt->worker != NULL)
        {
                for (i = 0; i < pcxt->nworkers_launched; ++i)
                {
                        if (pcxt->worker[i].error_mqh != NULL)
                        {
                                TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);

                                shm_mq_detach(pcxt->worker[i].error_mqh);
                                pcxt->worker[i].error_mqh = NULL;
                        }
                }
        }

        /*
         * If we have allocated a shared memory segment, detach it.  This will
         * implicitly detach the error queues, and any other shared memory queues,
         * stored there.
         */
        if (pcxt->seg != NULL)
        {
                dsm_detach(pcxt->seg);
                pcxt->seg = NULL;
        }

        /*
         * If this parallel context is actually in backend-private memory rather
         * than shared memory, free that memory instead.
         */
        if (pcxt->private_memory != NULL)
        {
                pfree(pcxt->private_memory);
                pcxt->private_memory = NULL;
        }

        /*
         * We can't finish transaction commit or abort until all of the workers
         * have exited.  This means, in particular, that we can't respond to
         * interrupts at this stage.
         */
        HOLD_INTERRUPTS();
        WaitForParallelWorkersToExit(pcxt);
        RESUME_INTERRUPTS();

        /* Free the worker array itself. */
        if (pcxt->worker != NULL)
        {
                pfree(pcxt->worker);
                pcxt->worker = NULL;
        }

        /* Free memory. */
        pfree(pcxt->library_name);
        pfree(pcxt->function_name);
        pfree(pcxt);
}

/*
 * Are there any parallel contexts currently active?
 */
bool
ParallelContextActive(void)
{
        return !dlist_is_empty(&pcxt_list);
}

/*
 * Handle receipt of an interrupt indicating a parallel worker message.
 *
 * Note: this is called within a signal handler!  All we can do is set
 * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
 * HandleParallelMessages().
 */
void
HandleParallelMessageInterrupt(void)
{
        InterruptPending = true;
        ParallelMessagePending = true;
        SetLatch(MyLatch);
}
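
/*
 * The flag is consumed from ProcessInterrupts() in tcop/postgres.c, which
 * does essentially this (a sketch, not the verbatim code):
 *
 *		if (ParallelMessagePending)
 *			HandleParallelMessages();
 */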

/*
 * Handle any queued protocol messages received from parallel workers.
 */
void
HandleParallelMessages(void)
{
        dlist_iter      iter;
        MemoryContext oldcontext;

        static MemoryContext hpm_context = NULL;

        /*
         * This is invoked from ProcessInterrupts(), and since some of the
         * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
         * for recursive calls if more signals are received while this runs.  It's
         * unclear that recursive entry would be safe, and it doesn't seem useful
         * even if it is safe, so let's block interrupts until done.
         */
        HOLD_INTERRUPTS();

        /*
         * Moreover, CurrentMemoryContext might be pointing almost anywhere.  We
         * don't want to risk leaking data into long-lived contexts, so let's do
         * our work here in a private context that we can reset on each use.
         */
        if (hpm_context == NULL)        /* first time through? */
                hpm_context = AllocSetContextCreate(TopMemoryContext,
                                                                                        "HandleParallelMessages",
                                                                                        ALLOCSET_DEFAULT_SIZES);
        else
                MemoryContextReset(hpm_context);

        oldcontext = MemoryContextSwitchTo(hpm_context);

        /* OK to process messages.  Reset the flag saying there are more to do. */
        ParallelMessagePending = false;

        dlist_foreach(iter, &pcxt_list)
        {
                ParallelContext *pcxt;
                int                     i;

                pcxt = dlist_container(ParallelContext, node, iter.cur);
                if (pcxt->worker == NULL)
                        continue;

                for (i = 0; i < pcxt->nworkers_launched; ++i)
                {
                        /*
                         * Read as many messages as we can from each worker, but stop when
                         * either (1) the worker's error queue goes away, which can happen
                         * if we receive a Terminate message from the worker; or (2) no
                         * more messages can be read from the worker without blocking.
                         */
                        while (pcxt->worker[i].error_mqh != NULL)
                        {
                                shm_mq_result res;
                                Size            nbytes;
                                void       *data;

                                res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
                                                                         &data, true);
                                if (res == SHM_MQ_WOULD_BLOCK)
                                        break;
                                else if (res == SHM_MQ_SUCCESS)
                                {
                                        StringInfoData msg;

                                        initStringInfo(&msg);
                                        appendBinaryStringInfo(&msg, data, nbytes);
                                        HandleParallelMessage(pcxt, i, &msg);
                                        pfree(msg.data);
                                }
                                else
                                        ereport(ERROR,
                                                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                                         errmsg("lost connection to parallel worker")));
                        }
                }
        }

        MemoryContextSwitchTo(oldcontext);

        /* Might as well clear the context on our way out */
        MemoryContextReset(hpm_context);

        RESUME_INTERRUPTS();
}

/*
 * Handle a single protocol message received from a single parallel worker.
 */
static void
HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
{
        char            msgtype;

        if (pcxt->known_attached_workers != NULL &&
                !pcxt->known_attached_workers[i])
        {
                pcxt->known_attached_workers[i] = true;
                pcxt->nknown_attached_workers++;
        }

        msgtype = pq_getmsgbyte(msg);

        switch (msgtype)
        {
                case 'K':                               /* BackendKeyData */
                        {
                                int32           pid = pq_getmsgint(msg, 4);

                                (void) pq_getmsgint(msg, 4);    /* discard cancel key */
                                (void) pq_getmsgend(msg);
                                pcxt->worker[i].pid = pid;
                                break;
                        }

                case 'E':                               /* ErrorResponse */
                case 'N':                               /* NoticeResponse */
                        {
                                ErrorData       edata;
                                ErrorContextCallback *save_error_context_stack;

                                /* Parse ErrorResponse or NoticeResponse. */
                                pq_parse_errornotice(msg, &edata);

                                /* Death of a worker isn't enough justification for suicide. */
                                edata.elevel = Min(edata.elevel, ERROR);

                                /*
                                 * If desired, add a context line to show that this is a
                                 * message propagated from a parallel worker.  Otherwise, it
                                 * can sometimes be confusing to understand what actually
                                 * happened.  (We don't do this in FORCE_PARALLEL_REGRESS mode
                                 * because it causes test-result instability depending on
                                 * whether a parallel worker is actually used or not.)
                                 */
                                if (force_parallel_mode != FORCE_PARALLEL_REGRESS)
                                {
                                        if (edata.context)
                                                edata.context = psprintf("%s\n%s", edata.context,
                                                                                                 _("parallel worker"));
                                        else
                                                edata.context = pstrdup(_("parallel worker"));
                                }

                                /*
                                 * Context beyond that should use the error context callbacks
                                 * that were in effect when the ParallelContext was created,
                                 * not the current ones.
                                 */
                                save_error_context_stack = error_context_stack;
                                error_context_stack = pcxt->error_context_stack;

                                /* Rethrow error or print notice. */
                                ThrowErrorData(&edata);

                                /* Not an error, so restore previous context stack. */
                                error_context_stack = save_error_context_stack;

                                break;
                        }

                case 'A':                               /* NotifyResponse */
                        {
                                /* Propagate NotifyResponse. */
                                int32           pid;
                                const char *channel;
                                const char *payload;

                                pid = pq_getmsgint(msg, 4);
                                channel = pq_getmsgrawstring(msg);
                                payload = pq_getmsgrawstring(msg);
                                pq_endmessage(msg);

                                NotifyMyFrontEnd(channel, payload, pid);

                                break;
                        }

                case 'X':                               /* Terminate, indicating clean exit */
                        {
                                shm_mq_detach(pcxt->worker[i].error_mqh);
                                pcxt->worker[i].error_mqh = NULL;
                                break;
                        }

                default:
                        {
                                elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
                                         msgtype, msg->len);
                        }
        }
}
1159
1160 /*
1161  * End-of-subtransaction cleanup for parallel contexts.
1162  *
1163  * Currently, it's forbidden to enter or leave a subtransaction while
1164  * parallel mode is in effect, so we could just blow away everything.  But
1165  * we may want to relax that restriction in the future, so this code
1166  * contemplates that there may be multiple subtransaction IDs in pcxt_list.
1167  */
1168 void
1169 AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
1170 {
	while (!dlist_is_empty(&pcxt_list))
	{
		ParallelContext *pcxt;

		pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
		if (pcxt->subid != mySubId)
			break;
		if (isCommit)
			elog(WARNING, "leaked parallel context");
		DestroyParallelContext(pcxt);
	}
}

/*
 * End-of-transaction cleanup for parallel contexts.
 */
void
AtEOXact_Parallel(bool isCommit)
{
	while (!dlist_is_empty(&pcxt_list))
	{
		ParallelContext *pcxt;

		pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
		if (isCommit)
			elog(WARNING, "leaked parallel context");
		DestroyParallelContext(pcxt);
	}
}

/*
 * Main entrypoint for parallel workers.
 */
void
ParallelWorkerMain(Datum main_arg)
{
	dsm_segment *seg;
	shm_toc    *toc;
	FixedParallelState *fps;
	char	   *error_queue_space;
	shm_mq	   *mq;
	shm_mq_handle *mqh;
	char	   *libraryspace;
	char	   *entrypointstate;
	char	   *library_name;
	char	   *function_name;
	parallel_worker_main_type entrypt;
	char	   *gucspace;
	char	   *combocidspace;
	char	   *tsnapspace;
	char	   *asnapspace;
	char	   *tstatespace;
	char	   *reindexspace;
	char	   *relmapperspace;
	StringInfoData msgbuf;
	char	   *session_dsm_handle_space;

	/* Set flag to indicate that we're initializing a parallel worker. */
	InitializingParallelWorker = true;

	/* Establish signal handlers. */
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/* Determine and set our parallel worker number. */
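	/*
	 * The leader copied our zero-based worker index into bgw_extra when it
	 * registered this background worker (see LaunchParallelWorkers earlier
	 * in this file).
	 */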
	Assert(ParallelWorkerNumber == -1);
	memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));

	/* Set up a memory context to work in, just for cleanliness. */
	CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
												 "Parallel worker",
												 ALLOCSET_DEFAULT_SIZES);

	/*
	 * Attach to the dynamic shared memory segment for the parallel query, and
	 * find its table of contents.
	 *
	 * Note: at this point, we have not created any ResourceOwner in this
	 * process.  This will result in our DSM mapping surviving until process
	 * exit, which is fine.  If there were a ResourceOwner, it would acquire
	 * ownership of the mapping, but we have no need for that.
	 */
	seg = dsm_attach(DatumGetUInt32(main_arg));
	if (seg == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("could not map dynamic shared memory segment")));
	toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
	if (toc == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("invalid magic number in dynamic shared memory segment")));

	/* Look up fixed parallel state. */
	fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
	MyFixedParallelState = fps;

	/* Arrange to signal the leader if we exit. */
	ParallelMasterPid = fps->parallel_master_pid;
	ParallelMasterBackendId = fps->parallel_master_backend_id;
	on_shmem_exit(ParallelWorkerShutdown, (Datum) 0);

	/*
	 * Now we can find and attach to the error queue provided for us.  That's
	 * good, because until we do that, any errors that happen here will not be
	 * reported back to the process that requested that this worker be
	 * launched.
	 */
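	/*
	 * InitializeParallelDSM laid the queues out as one contiguous array of
	 * PARALLEL_ERROR_QUEUE_SIZE-byte slots, one per worker, so our worker
	 * number indexes straight into it.
	 */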
	error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
	mq = (shm_mq *) (error_queue_space +
					 ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
	shm_mq_set_sender(mq, MyProc);
	mqh = shm_mq_attach(mq, seg, NULL);
	pq_redirect_to_shm_mq(seg, mqh);
	pq_set_parallel_master(fps->parallel_master_pid,
						   fps->parallel_master_backend_id);

	/*
	 * Send a BackendKeyData message to the process that initiated parallelism
	 * so that it has access to our PID before it receives any other messages
	 * from us.  Our cancel key is sent, too, since that's the way the
	 * protocol message is defined, but it won't actually be used for anything
	 * in this case.
	 */
	pq_beginmessage(&msgbuf, 'K');
	pq_sendint32(&msgbuf, (int32) MyProcPid);
	pq_sendint32(&msgbuf, (int32) MyCancelKey);
	pq_endmessage(&msgbuf);
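
	/*
	 * (On the leader side, HandleParallelMessage's 'K' case records this
	 * PID in pcxt->worker[i].pid.)
	 */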

	/*
	 * Hooray! Primary initialization is complete.  Now, we need to set up our
	 * backend-local state to match the original backend.
	 */

	/*
	 * Join locking group.  We must do this before anything that could try to
	 * acquire a heavyweight lock, because any heavyweight locks acquired to
	 * this point could block either directly against the parallel group
	 * leader or against some process which in turn waits for a lock that
	 * conflicts with the parallel group leader, causing an undetected
	 * deadlock.  (If we can't join the lock group, the leader has gone away,
	 * so just exit quietly.)
	 */
	if (!BecomeLockGroupMember(fps->parallel_master_pgproc,
							   fps->parallel_master_pid))
		return;

	/*
	 * Restore transaction and statement start-time timestamps.  This must
	 * happen before anything that would start a transaction, else asserts in
	 * xact.c will fire.
	 */
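	/*
	 * Among other things, this keeps now(), transaction_timestamp(), and
	 * statement_timestamp() in the worker consistent with the values the
	 * leader would report.
	 */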
	SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);

	/*
	 * Identify the entry point to be called.  In theory this could result in
	 * loading an additional library, though most likely the entry point is in
	 * the core backend or in a library that this lookup itself loads.
	 */
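	/*
	 * The entrypoint is serialized as two consecutive NUL-terminated
	 * strings, library name then function name; for a core entry point the
	 * stored bytes might look like "postgres\0ParallelQueryMain\0".
	 */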
	entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
	library_name = entrypointstate;
	function_name = entrypointstate + strlen(library_name) + 1;

	entrypt = LookupParallelWorkerFunction(library_name, function_name);

	/* Restore database connection. */
	BackgroundWorkerInitializeConnectionByOid(fps->database_id,
											  fps->authenticated_user_id,
											  0);

	/*
	 * Set the client encoding to the database encoding, since that is what
	 * the leader will expect.
	 */
	SetClientEncoding(GetDatabaseEncoding());

	/*
	 * Load libraries that were loaded by the original backend.  We want to
	 * do this before restoring GUCs, because the libraries might define
	 * custom variables.
	 */
	libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
	StartTransactionCommand();
	RestoreLibraryState(libraryspace);

	/* Restore GUC values from the launching backend. */
	gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
	RestoreGUCState(gucspace);
	CommitTransactionCommand();
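
	/*
	 * (The transaction wrapper above is there because library load and GUC
	 * restore may need to perform catalog access, e.g. from a GUC check
	 * hook.)
	 */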

	/* Crank up a transaction state appropriate to a parallel worker. */
	tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
	StartParallelWorkerTransaction(tstatespace);

	/* Restore combo CID state. */
	combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
	RestoreComboCIDState(combocidspace);

	/* Attach to the per-session DSM segment and contained objects. */
	session_dsm_handle_space =
		shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
	AttachSession(*(dsm_handle *) session_dsm_handle_space);

	/* Restore transaction snapshot. */
	tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, false);
	RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace),
							   fps->parallel_master_pgproc);

	/* Restore active snapshot. */
	asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
	PushActiveSnapshot(RestoreSnapshot(asnapspace));

	/*
	 * We've changed which tuples we can see, and must therefore invalidate
	 * system caches.
	 */
	InvalidateSystemCaches();

	/*
	 * Restore current role id.  We deliberately skip verifying whether the
	 * session user is allowed to become this role, and instead blindly
	 * restore the leader's state for the current role.
	 */
	SetCurrentRoleId(fps->outer_user_id, fps->is_superuser);

	/* Restore user ID and security context. */
	SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);

	/* Restore temp-namespace state to ensure search path matches leader's. */
	SetTempNamespaceState(fps->temp_namespace_id,
						  fps->temp_toast_namespace_id);

	/* Restore reindex state. */
	reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
	RestoreReindexState(reindexspace);

	/* Restore relmapper state. */
	relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false);
	RestoreRelationMap(relmapperspace);

	/*
	 * We've initialized all of our state now; nothing should change
	 * hereafter.
	 */
	InitializingParallelWorker = false;
	EnterParallelMode();

	/*
	 * Time to do the real work: invoke the caller-supplied code.
	 */
	entrypt(seg, toc);

	/* Must exit parallel mode to pop active snapshot. */
	ExitParallelMode();

	/* Must pop active snapshot so snapmgr.c doesn't complain. */
	PopActiveSnapshot();

	/* Shut down the parallel-worker transaction. */
	EndParallelWorkerTransaction();

	/* Detach from the per-session DSM segment. */
	DetachSession();

	/* Report success. */
	pq_putmessage('X', NULL, 0);
}

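/*
 * A minimal sketch of the sort of entry point ParallelWorkerMain invokes,
 * for illustration only; "example_worker_main" and TOC key 1 are
 * hypothetical.  A real caller hands its library and function names to
 * CreateParallelContext and ships private state through shm_toc keys of
 * its own choosing.
 */
#ifdef NOT_USED
static void
example_worker_main(dsm_segment *seg, shm_toc *toc)
{
	char	   *mydata;

	/* Fetch state the leader stored under a caller-chosen TOC key. */
	mydata = shm_toc_lookup(toc, 1, false);

	/* Do the real parallel work here; returning ends the worker cleanly. */
	elog(DEBUG1, "parallel worker %d saw \"%s\"", ParallelWorkerNumber, mydata);
}
#endif
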
/*
 * Update shared memory with the ending location of the last WAL record we
 * wrote, if it's greater than the value already stored there.  The leader
 * later folds this value back into its own XactLastRecEnd (see
 * WaitForParallelWorkersToFinish), so that WAL written by workers is
 * properly flushed at commit.
 */
void
ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
{
	FixedParallelState *fps = MyFixedParallelState;

	Assert(fps != NULL);
	SpinLockAcquire(&fps->mutex);
	if (fps->last_xlog_end < last_xlog_end)
		fps->last_xlog_end = last_xlog_end;
	SpinLockRelease(&fps->mutex);
}

/*
 * Make sure the leader tries to read from our error queue one more time.
 * This guards against the case where we exit uncleanly without sending an
 * ErrorResponse to the leader, for example because some code calls proc_exit
 * directly.
 */
static void
ParallelWorkerShutdown(int code, Datum arg)
{
	SendProcSignal(ParallelMasterPid,
				   PROCSIG_PARALLEL_MESSAGE,
				   ParallelMasterBackendId);
}

/*
 * Look up (and possibly load) a parallel worker entry point function.
 *
 * For functions contained in the core code, we use library name "postgres"
 * and consult the InternalParallelWorkers array.  External functions are
 * looked up, and loaded if necessary, using load_external_function().
 *
 * The point of this is to pass function names as strings across process
 * boundaries.  We can't pass actual function addresses because of the
 * possibility that the function has been loaded at a different address
 * in a different process.  This is obviously a hazard for functions in
 * loadable libraries, but it can happen even for functions in the core code
 * on platforms using EXEC_BACKEND (e.g., Windows).
 *
 * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
 * in favor of applying load_external_function() for core functions too;
 * but that raises portability issues that are not worth addressing now.
 */
static parallel_worker_main_type
LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
{
	/*
	 * If the function is to be loaded from postgres itself, search the
	 * InternalParallelWorkers array.
	 */
	if (strcmp(libraryname, "postgres") == 0)
	{
		int			i;

		for (i = 0; i < lengthof(InternalParallelWorkers); i++)
		{
			if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
				return InternalParallelWorkers[i].fn_addr;
		}

		/* We can only reach this by programming error. */
		elog(ERROR, "internal function \"%s\" not found", funcname);
	}

	/* Otherwise load from external library. */
	return (parallel_worker_main_type)
		load_external_function(libraryname, funcname, true, NULL);
}