1 /*-------------------------------------------------------------------------
2  *
3  * parallel.c
4  *        Infrastructure for launching parallel workers
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *        src/backend/access/transam/parallel.c
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16
17 #include "access/parallel.h"
18 #include "access/session.h"
19 #include "access/xact.h"
20 #include "access/xlog.h"
21 #include "catalog/namespace.h"
22 #include "commands/async.h"
23 #include "executor/execParallel.h"
24 #include "libpq/libpq.h"
25 #include "libpq/pqformat.h"
26 #include "libpq/pqmq.h"
27 #include "miscadmin.h"
28 #include "optimizer/planmain.h"
29 #include "pgstat.h"
30 #include "storage/ipc.h"
31 #include "storage/sinval.h"
32 #include "storage/spin.h"
33 #include "tcop/tcopprot.h"
34 #include "utils/combocid.h"
35 #include "utils/guc.h"
36 #include "utils/inval.h"
37 #include "utils/memutils.h"
38 #include "utils/resowner.h"
39 #include "utils/snapmgr.h"
40 #include "utils/typcache.h"
41
42
43 /*
44  * We don't want to waste a lot of memory on an error queue which, most of
45  * the time, will process only a handful of small messages.  However, it is
46  * desirable to make it large enough that a typical ErrorResponse can be sent
47  * without blocking.  That way, a worker that errors out can write the whole
48  * message into the queue and terminate without waiting for the user backend.
49  */
50 #define PARALLEL_ERROR_QUEUE_SIZE                       16384
51
52 /* Magic number for parallel context TOC. */
53 #define PARALLEL_MAGIC                                          0x50477c7c
54
55 /*
56  * Magic numbers for per-context parallel state sharing.  Higher-level code
57  * should use smaller values, leaving these very large ones for use by this
58  * module.
59  */
60 #define PARALLEL_KEY_FIXED                                      UINT64CONST(0xFFFFFFFFFFFF0001)
61 #define PARALLEL_KEY_ERROR_QUEUE                        UINT64CONST(0xFFFFFFFFFFFF0002)
62 #define PARALLEL_KEY_LIBRARY                            UINT64CONST(0xFFFFFFFFFFFF0003)
63 #define PARALLEL_KEY_GUC                                        UINT64CONST(0xFFFFFFFFFFFF0004)
64 #define PARALLEL_KEY_COMBO_CID                          UINT64CONST(0xFFFFFFFFFFFF0005)
65 #define PARALLEL_KEY_TRANSACTION_SNAPSHOT       UINT64CONST(0xFFFFFFFFFFFF0006)
66 #define PARALLEL_KEY_ACTIVE_SNAPSHOT            UINT64CONST(0xFFFFFFFFFFFF0007)
67 #define PARALLEL_KEY_TRANSACTION_STATE          UINT64CONST(0xFFFFFFFFFFFF0008)
68 #define PARALLEL_KEY_ENTRYPOINT                         UINT64CONST(0xFFFFFFFFFFFF0009)
69 #define PARALLEL_KEY_SESSION_DSM                        UINT64CONST(0xFFFFFFFFFFFF000A)
70
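/*
 * A hypothetical caller that stores its own chunks in the segment would pick
 * key values well below the reserved range above, for example:
 *
 *		#define MYEXT_KEY_SHARED_STATE		1
 *		#define MYEXT_KEY_RESULT_AREA		2
 */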
71 /* Fixed-size parallel state. */
72 typedef struct FixedParallelState
73 {
74         /* Fixed-size state that workers must restore. */
75         Oid                     database_id;
76         Oid                     authenticated_user_id;
77         Oid                     current_user_id;
78         Oid                     temp_namespace_id;
79         Oid                     temp_toast_namespace_id;
80         int                     sec_context;
81         PGPROC     *parallel_master_pgproc;
82         pid_t           parallel_master_pid;
83         BackendId       parallel_master_backend_id;
84
85         /* Mutex protects remaining fields. */
86         slock_t         mutex;
87
88         /* Maximum XactLastRecEnd of any worker. */
89         XLogRecPtr      last_xlog_end;
90 } FixedParallelState;
91
92 /*
93  * Our parallel worker number.  We initialize this to -1, meaning that we are
94  * not a parallel worker.  In parallel workers, it will be set to a value >= 0
95  * and < the number of workers before any user code is invoked; each parallel
96  * worker will get a different parallel worker number.
97  */
98 int                     ParallelWorkerNumber = -1;
99
100 /* Is there a parallel message pending which we need to receive? */
101 volatile bool ParallelMessagePending = false;
102
103 /* Are we initializing a parallel worker? */
104 bool            InitializingParallelWorker = false;
105
106 /* Pointer to our fixed parallel state. */
107 static FixedParallelState *MyFixedParallelState;
108
109 /* List of active parallel contexts. */
110 static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
111
112 /*
113  * List of internal parallel worker entry points.  We need this for
114  * reasons explained in LookupParallelWorkerFunction(), below.
115  */
116 static const struct
117 {
118         const char *fn_name;
119         parallel_worker_main_type fn_addr;
120 }                       InternalParallelWorkers[] =
121
122 {
123         {
124                 "ParallelQueryMain", ParallelQueryMain
125         }
126 };
127
128 /* Private functions. */
129 static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
130 static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
131 static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
132
133
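/*
 * Illustrative sketch (added for exposition; "myext", "myext_worker_main",
 * the 64-byte chunk, and key 1 are hypothetical): a typical caller drives
 * the functions below roughly as shown.  See execParallel.c for the
 * executor's real usage of this API.
 */
#ifdef NOT_USED
static void
example_parallel_operation(void)
{
	ParallelContext *pcxt;
	char	   *state;

	EnterParallelMode();

	/* Workers are named by library and function, never by function pointer. */
	pcxt = CreateParallelContext("myext", "myext_worker_main", 2);

	/* Reserve room for caller-specific shared state before creating the DSM. */
	shm_toc_estimate_chunk(&pcxt->estimator, 64);
	shm_toc_estimate_keys(&pcxt->estimator, 1);
	InitializeParallelDSM(pcxt);

	/* Stash that state under a caller-chosen key, then start the workers. */
	state = shm_toc_allocate(pcxt->toc, 64);
	memset(state, 0, 64);
	shm_toc_insert(pcxt->toc, 1, state);
	LaunchParallelWorkers(pcxt);

	/* Always collect the workers, even on success, then clean up. */
	WaitForParallelWorkersToFinish(pcxt);
	DestroyParallelContext(pcxt);
	ExitParallelMode();
}
#endif							/* NOT_USED */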
134 /*
135  * Establish a new parallel context.  This should be done after entering
136  * parallel mode, and (unless there is an error) the context should be
137  * destroyed before exiting the current subtransaction.
138  */
139 ParallelContext *
140 CreateParallelContext(const char *library_name, const char *function_name,
141                                           int nworkers)
142 {
143         MemoryContext oldcontext;
144         ParallelContext *pcxt;
145
146         /* It is unsafe to create a parallel context if not in parallel mode. */
147         Assert(IsInParallelMode());
148
149         /* Number of workers should be non-negative. */
150         Assert(nworkers >= 0);
151
152         /*
153          * If dynamic shared memory is not available, we won't be able to use
154          * background workers.
155          */
156         if (dynamic_shared_memory_type == DSM_IMPL_NONE)
157                 nworkers = 0;
158
159         /*
160          * If we are running under serializable isolation, we can't use parallel
161          * workers, at least not until somebody enhances that mechanism to be
162          * parallel-aware.
163          */
164         if (IsolationIsSerializable())
165                 nworkers = 0;
166
167         /* We might be running in a short-lived memory context. */
168         oldcontext = MemoryContextSwitchTo(TopTransactionContext);
169
170         /* Initialize a new ParallelContext. */
171         pcxt = palloc0(sizeof(ParallelContext));
172         pcxt->subid = GetCurrentSubTransactionId();
173         pcxt->nworkers = nworkers;
174         pcxt->library_name = pstrdup(library_name);
175         pcxt->function_name = pstrdup(function_name);
176         pcxt->error_context_stack = error_context_stack;
177         shm_toc_initialize_estimator(&pcxt->estimator);
178         dlist_push_head(&pcxt_list, &pcxt->node);
179
180         /* Restore previous memory context. */
181         MemoryContextSwitchTo(oldcontext);
182
183         return pcxt;
184 }
185
186 /*
187  * Establish the dynamic shared memory segment for a parallel context and
188  * copy state and other bookkeeping information that will be needed by
189  * parallel workers into it.
190  */
191 void
192 InitializeParallelDSM(ParallelContext *pcxt)
193 {
194         MemoryContext oldcontext;
195         Size            library_len = 0;
196         Size            guc_len = 0;
197         Size            combocidlen = 0;
198         Size            tsnaplen = 0;
199         Size            asnaplen = 0;
200         Size            tstatelen = 0;
201         Size            segsize = 0;
202         int                     i;
203         FixedParallelState *fps;
204         dsm_handle      session_dsm_handle = DSM_HANDLE_INVALID;
205         Snapshot        transaction_snapshot = GetTransactionSnapshot();
206         Snapshot        active_snapshot = GetActiveSnapshot();
207
208         /* We might be running in a very short-lived memory context. */
209         oldcontext = MemoryContextSwitchTo(TopTransactionContext);
210
211         /* Allow space to store the fixed-size parallel state. */
212         shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
213         shm_toc_estimate_keys(&pcxt->estimator, 1);
214
215         /*
216          * Normally, the user will have requested at least one worker process, but
217          * if by chance they have not, we can skip a bunch of things here.
218          */
219         if (pcxt->nworkers > 0)
220         {
221                 /* Get (or create) the per-session DSM segment's handle. */
222                 session_dsm_handle = GetSessionDsmHandle();
223
224                 /*
225                  * If we weren't able to create a per-session DSM segment, then we can
226                  * continue, but we can't safely launch any workers because their
227                  * record typmods would be incompatible, so they couldn't exchange
228                  * tuples.
229                  */
230                 if (session_dsm_handle == DSM_HANDLE_INVALID)
231                         pcxt->nworkers = 0;
232         }
233
234         if (pcxt->nworkers > 0)
235         {
236                 /* Estimate space for various kinds of state sharing. */
237                 library_len = EstimateLibraryStateSpace();
238                 shm_toc_estimate_chunk(&pcxt->estimator, library_len);
239                 guc_len = EstimateGUCStateSpace();
240                 shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
241                 combocidlen = EstimateComboCIDStateSpace();
242                 shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
243                 tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
244                 shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
245                 asnaplen = EstimateSnapshotSpace(active_snapshot);
246                 shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
247                 tstatelen = EstimateTransactionStateSpace();
248                 shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
249                 shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
250                 /* If you add more chunks here, you probably need to add keys. */
251                 shm_toc_estimate_keys(&pcxt->estimator, 7);
252
253                 /* Estimate space needed for error queues. */
254                 StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
255                                                  PARALLEL_ERROR_QUEUE_SIZE,
256                                                  "parallel error queue size not buffer-aligned");
257                 shm_toc_estimate_chunk(&pcxt->estimator,
258                                                            mul_size(PARALLEL_ERROR_QUEUE_SIZE,
259                                                                                 pcxt->nworkers));
260                 shm_toc_estimate_keys(&pcxt->estimator, 1);
261
262                 /* Estimate how much we'll need for the entrypoint info. */
263                 shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
264                                                            strlen(pcxt->function_name) + 2);
265                 shm_toc_estimate_keys(&pcxt->estimator, 1);
266         }
267
268         /*
269          * Create DSM and initialize with new table of contents.  But if the user
270          * didn't request any workers, then don't bother creating a dynamic shared
271          * memory segment; instead, just use backend-private memory.
272          *
273          * Also, if we can't create a dynamic shared memory segment because the
274          * maximum number of segments have already been created, then fall back to
275          * backend-private memory, and plan not to use any workers.  We hope this
276          * won't happen very often, but it's better to abandon the use of
277          * parallelism than to fail outright.
278          */
279         segsize = shm_toc_estimate(&pcxt->estimator);
280         if (pcxt->nworkers > 0)
281                 pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
282         if (pcxt->seg != NULL)
283                 pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
284                                                                    dsm_segment_address(pcxt->seg),
285                                                                    segsize);
286         else
287         {
288                 pcxt->nworkers = 0;
289                 pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
290                 pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
291                                                                    segsize);
292         }
293
294         /* Initialize fixed-size state in shared memory. */
295         fps = (FixedParallelState *)
296                 shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
297         fps->database_id = MyDatabaseId;
298         fps->authenticated_user_id = GetAuthenticatedUserId();
299         GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
300         GetTempNamespaceState(&fps->temp_namespace_id,
301                                                   &fps->temp_toast_namespace_id);
302         fps->parallel_master_pgproc = MyProc;
303         fps->parallel_master_pid = MyProcPid;
304         fps->parallel_master_backend_id = MyBackendId;
305         SpinLockInit(&fps->mutex);
306         fps->last_xlog_end = 0;
307         shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
308
309         /* We can skip the rest of this if we're not budgeting for any workers. */
310         if (pcxt->nworkers > 0)
311         {
312                 char       *libraryspace;
313                 char       *gucspace;
314                 char       *combocidspace;
315                 char       *tsnapspace;
316                 char       *asnapspace;
317                 char       *tstatespace;
318                 char       *error_queue_space;
319                 char       *session_dsm_handle_space;
320                 char       *entrypointstate;
321                 Size            lnamelen;
322
323                 /* Serialize shared libraries we have loaded. */
324                 libraryspace = shm_toc_allocate(pcxt->toc, library_len);
325                 SerializeLibraryState(library_len, libraryspace);
326                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);
327
328                 /* Serialize GUC settings. */
329                 gucspace = shm_toc_allocate(pcxt->toc, guc_len);
330                 SerializeGUCState(guc_len, gucspace);
331                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);
332
333                 /* Serialize combo CID state. */
334                 combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
335                 SerializeComboCIDState(combocidlen, combocidspace);
336                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
337
338                 /* Serialize transaction snapshot and active snapshot. */
339                 tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
340                 SerializeSnapshot(transaction_snapshot, tsnapspace);
341                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
342                                            tsnapspace);
343                 asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
344                 SerializeSnapshot(active_snapshot, asnapspace);
345                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
346
347                 /* Provide the handle for per-session segment. */
348                 session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
349                                                                                                         sizeof(dsm_handle));
350                 *(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
351                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
352                                            session_dsm_handle_space);
353
354                 /* Serialize transaction state. */
355                 tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
356                 SerializeTransactionState(tstatelen, tstatespace);
357                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
358
359                 /* Allocate space for worker information. */
360                 pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
361
362                 /*
363                  * Establish error queues in dynamic shared memory.
364                  *
365                  * These queues should be used only for transmitting ErrorResponse,
366                  * NoticeResponse, and NotifyResponse protocol messages.  Tuple data
367                  * should be transmitted via separate (possibly larger?) queues.
368                  */
369                 error_queue_space =
370                         shm_toc_allocate(pcxt->toc,
371                                                          mul_size(PARALLEL_ERROR_QUEUE_SIZE,
372                                                                           pcxt->nworkers));
373                 for (i = 0; i < pcxt->nworkers; ++i)
374                 {
375                         char       *start;
376                         shm_mq     *mq;
377
378                         start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
379                         mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
380                         shm_mq_set_receiver(mq, MyProc);
381                         pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
382                 }
383                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);
384
385                 /*
386                  * Serialize entrypoint information.  It's unsafe to pass function
387                  * pointers across processes, as the function pointer may be different
388                  * in each process in EXEC_BACKEND builds, so we always pass library
389                  * and function name.  (We use library name "postgres" for functions
390                  * in the core backend.)
391                  */
392                 lnamelen = strlen(pcxt->library_name);
393                 entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
394                                                                                    strlen(pcxt->function_name) + 2);
395                 strcpy(entrypointstate, pcxt->library_name);
396                 strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
397                 shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
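                /*
                 * For the core entry point, for instance, the chunk written above
                 * holds the bytes "postgres\0ParallelQueryMain\0"; the worker side
                 * splits it back into the two strings in ParallelWorkerMain().
                 */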
398         }
399
400         /* Restore previous memory context. */
401         MemoryContextSwitchTo(oldcontext);
402 }
403
404 /*
405  * Reinitialize the dynamic shared memory segment for a parallel context so
406  * that we can launch workers for it again.
407  */
408 void
409 ReinitializeParallelDSM(ParallelContext *pcxt)
410 {
411         FixedParallelState *fps;
412         char       *error_queue_space;
413         int                     i;
414
415         /* Wait for any old workers to exit. */
416         if (pcxt->nworkers_launched > 0)
417         {
418                 WaitForParallelWorkersToFinish(pcxt);
419                 WaitForParallelWorkersToExit(pcxt);
420                 pcxt->nworkers_launched = 0;
421         }
422
423         /* Reset a few bits of fixed parallel state to a clean state. */
424         fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
425         fps->last_xlog_end = 0;
426
427         /* Recreate error queues. */
428         error_queue_space =
429                 shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
430         for (i = 0; i < pcxt->nworkers; ++i)
431         {
432                 char       *start;
433                 shm_mq     *mq;
434
435                 start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
436                 mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
437                 shm_mq_set_receiver(mq, MyProc);
438                 pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
439         }
440 }
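/*
 * A caller that reuses the context (say, for a rescan) is expected to call
 * ReinitializeParallelDSM() above, refresh any mutable caller-specific state
 * in the segment, and then call LaunchParallelWorkers() again.  (Illustrative
 * note; see ExecParallelReinitialize() in execParallel.c for a real example.)
 */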
441
442 /*
443  * Launch parallel workers.
444  */
445 void
446 LaunchParallelWorkers(ParallelContext *pcxt)
447 {
448         MemoryContext oldcontext;
449         BackgroundWorker worker;
450         int                     i;
451         bool            any_registrations_failed = false;
452
453         /* Skip this if we have no workers. */
454         if (pcxt->nworkers == 0)
455                 return;
456
457         /* We need to be a lock group leader. */
458         BecomeLockGroupLeader();
459
460         /* If we do have workers, we'd better have a DSM segment. */
461         Assert(pcxt->seg != NULL);
462
463         /* We might be running in a short-lived memory context. */
464         oldcontext = MemoryContextSwitchTo(TopTransactionContext);
465
466         /* Configure a worker. */
467         memset(&worker, 0, sizeof(worker));
468         snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
469                          MyProcPid);
470         worker.bgw_flags =
471                 BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
472                 | BGWORKER_CLASS_PARALLEL;
473         worker.bgw_start_time = BgWorkerStart_ConsistentState;
474         worker.bgw_restart_time = BGW_NEVER_RESTART;
475         sprintf(worker.bgw_library_name, "postgres");
476         sprintf(worker.bgw_function_name, "ParallelWorkerMain");
477         worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
478         worker.bgw_notify_pid = MyProcPid;
479
480         /*
481          * Start workers.
482          *
483          * The caller must be able to tolerate ending up with fewer workers than
484          * expected, so there is no need to throw an error here if registration
485          * fails.  It wouldn't help much anyway, because registering the worker in
486          * no way guarantees that it will start up and initialize successfully.
487          */
488         for (i = 0; i < pcxt->nworkers; ++i)
489         {
490                 memcpy(worker.bgw_extra, &i, sizeof(int));
491                 if (!any_registrations_failed &&
492                         RegisterDynamicBackgroundWorker(&worker,
493                                                                                         &pcxt->worker[i].bgwhandle))
494                 {
495                         shm_mq_set_handle(pcxt->worker[i].error_mqh,
496                                                           pcxt->worker[i].bgwhandle);
497                         pcxt->nworkers_launched++;
498                 }
499                 else
500                 {
501                         /*
502                          * If we weren't able to register the worker, then we've bumped up
503                          * against the max_worker_processes limit, and future
504                          * registrations will probably fail too, so arrange to skip them.
505                          * But we still have to execute this code for the remaining slots
506                          * to make sure that we forget about the error queues we budgeted
507                          * for those workers.  Otherwise, we'll wait for them to start,
508                          * but they never will.
509                          */
510                         any_registrations_failed = true;
511                         pcxt->worker[i].bgwhandle = NULL;
512                         shm_mq_detach(pcxt->worker[i].error_mqh);
513                         pcxt->worker[i].error_mqh = NULL;
514                 }
515         }
516
517         /* Restore previous memory context. */
518         MemoryContextSwitchTo(oldcontext);
519 }
520
521 /*
522  * Wait for all workers to finish computing.
523  *
524  * Even if the parallel operation seems to have completed successfully, it's
525  * important to call this function afterwards.  We must not miss any errors
526  * the workers may have thrown during the parallel operation, or any that they
527  * may yet throw while shutting down.
528  *
529  * Also, we want to update our notion of XactLastRecEnd based on worker
530  * feedback.
531  */
532 void
533 WaitForParallelWorkersToFinish(ParallelContext *pcxt)
534 {
535         for (;;)
536         {
537                 bool            anyone_alive = false;
538                 int                     i;
539
540                 /*
541                  * This will process any parallel messages that are pending, which may
542                  * change the outcome of the loop that follows.  It may also throw an
543                  * error propagated from a worker.
544                  */
545                 CHECK_FOR_INTERRUPTS();
546
547                 for (i = 0; i < pcxt->nworkers_launched; ++i)
548                 {
549                         if (pcxt->worker[i].error_mqh != NULL)
550                         {
551                                 anyone_alive = true;
552                                 break;
553                         }
554                 }
555
556                 if (!anyone_alive)
557                         break;
558
559                 WaitLatch(MyLatch, WL_LATCH_SET, -1,
560                                   WAIT_EVENT_PARALLEL_FINISH);
561                 ResetLatch(MyLatch);
562         }
563
564         if (pcxt->toc != NULL)
565         {
566                 FixedParallelState *fps;
567
568                 fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
569                 if (fps->last_xlog_end > XactLastRecEnd)
570                         XactLastRecEnd = fps->last_xlog_end;
571         }
572 }
573
574 /*
575  * Wait for all workers to exit.
576  *
577  * This function ensures that the workers have completely shut down.  The
578  * difference between WaitForParallelWorkersToFinish and this function is
579  * that the former only ensures that the last message sent by a worker has
580  * been received by the master, whereas this waits for each worker to exit.
581  */
582 static void
583 WaitForParallelWorkersToExit(ParallelContext *pcxt)
584 {
585         int                     i;
586
587         /* Wait until the workers actually die. */
588         for (i = 0; i < pcxt->nworkers_launched; ++i)
589         {
590                 BgwHandleStatus status;
591
592                 if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
593                         continue;
594
595                 status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
596
597                 /*
598                  * If the postmaster kicked the bucket, we have no chance of cleaning
599                  * up safely -- we won't be able to tell when our workers are actually
600                  * dead.  This doesn't necessitate a PANIC since they will all abort
601                  * eventually, but we can't safely continue this session.
602                  */
603                 if (status == BGWH_POSTMASTER_DIED)
604                         ereport(FATAL,
605                                         (errcode(ERRCODE_ADMIN_SHUTDOWN),
606                                          errmsg("postmaster exited during a parallel transaction")));
607
608                 /* Release memory. */
609                 pfree(pcxt->worker[i].bgwhandle);
610                 pcxt->worker[i].bgwhandle = NULL;
611         }
612 }
613
614 /*
615  * Destroy a parallel context.
616  *
617  * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
618  * first, before calling this function.  When this function is invoked, any
619  * remaining workers are forcibly killed; the dynamic shared memory segment
620  * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
621  */
622 void
623 DestroyParallelContext(ParallelContext *pcxt)
624 {
625         int                     i;
626
627         /*
628          * Be careful about order of operations here!  We remove the parallel
629          * context from the list before we do anything else; otherwise, if an
630          * error occurs during a subsequent step, we might try to nuke it again
631          * from AtEOXact_Parallel or AtEOSubXact_Parallel.
632          */
633         dlist_delete(&pcxt->node);
634
635         /* Kill each worker in turn, and forget their error queues. */
636         if (pcxt->worker != NULL)
637         {
638                 for (i = 0; i < pcxt->nworkers_launched; ++i)
639                 {
640                         if (pcxt->worker[i].error_mqh != NULL)
641                         {
642                                 TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
643
644                                 shm_mq_detach(pcxt->worker[i].error_mqh);
645                                 pcxt->worker[i].error_mqh = NULL;
646                         }
647                 }
648         }
649
650         /*
651          * If we have allocated a shared memory segment, detach it.  This will
652          * implicitly detach the error queues, and any other shared memory queues,
653          * stored there.
654          */
655         if (pcxt->seg != NULL)
656         {
657                 dsm_detach(pcxt->seg);
658                 pcxt->seg = NULL;
659         }
660
661         /*
662          * If this parallel context is actually in backend-private memory rather
663          * than shared memory, free that memory instead.
664          */
665         if (pcxt->private_memory != NULL)
666         {
667                 pfree(pcxt->private_memory);
668                 pcxt->private_memory = NULL;
669         }
670
671         /*
672          * We can't finish transaction commit or abort until all of the workers
673          * have exited.  This means, in particular, that we can't respond to
674          * interrupts at this stage.
675          */
676         HOLD_INTERRUPTS();
677         WaitForParallelWorkersToExit(pcxt);
678         RESUME_INTERRUPTS();
679
680         /* Free the worker array itself. */
681         if (pcxt->worker != NULL)
682         {
683                 pfree(pcxt->worker);
684                 pcxt->worker = NULL;
685         }
686
687         /* Free memory. */
688         pfree(pcxt->library_name);
689         pfree(pcxt->function_name);
690         pfree(pcxt);
691 }
692
693 /*
694  * Are there any parallel contexts currently active?
695  */
696 bool
697 ParallelContextActive(void)
698 {
699         return !dlist_is_empty(&pcxt_list);
700 }
701
702 /*
703  * Handle receipt of an interrupt indicating a parallel worker message.
704  *
705  * Note: this is called within a signal handler!  All we can do is set
706  * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
707  * HandleParallelMessages().
708  */
709 void
710 HandleParallelMessageInterrupt(void)
711 {
712         InterruptPending = true;
713         ParallelMessagePending = true;
714         SetLatch(MyLatch);
715 }
716
717 /*
718  * Handle any queued protocol messages received from parallel workers.
719  */
720 void
721 HandleParallelMessages(void)
722 {
723         dlist_iter      iter;
724         MemoryContext oldcontext;
725
726         static MemoryContext hpm_context = NULL;
727
728         /*
729          * This is invoked from ProcessInterrupts(), and since some of the
730          * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
731          * for recursive calls if more signals are received while this runs.  It's
732          * unclear that recursive entry would be safe, and it doesn't seem useful
733          * even if it is safe, so let's block interrupts until done.
734          */
735         HOLD_INTERRUPTS();
736
737         /*
738          * Moreover, CurrentMemoryContext might be pointing almost anywhere.  We
739          * don't want to risk leaking data into long-lived contexts, so let's do
740          * our work here in a private context that we can reset on each use.
741          */
742         if (hpm_context == NULL)        /* first time through? */
743                 hpm_context = AllocSetContextCreate(TopMemoryContext,
744                                                                                         "HandleParallelMessages",
745                                                                                         ALLOCSET_DEFAULT_SIZES);
746         else
747                 MemoryContextReset(hpm_context);
748
749         oldcontext = MemoryContextSwitchTo(hpm_context);
750
751         /* OK to process messages.  Reset the flag saying there are more to do. */
752         ParallelMessagePending = false;
753
754         dlist_foreach(iter, &pcxt_list)
755         {
756                 ParallelContext *pcxt;
757                 int                     i;
758
759                 pcxt = dlist_container(ParallelContext, node, iter.cur);
760                 if (pcxt->worker == NULL)
761                         continue;
762
763                 for (i = 0; i < pcxt->nworkers_launched; ++i)
764                 {
765                         /*
766                          * Read as many messages as we can from each worker, but stop when
767                          * either (1) the worker's error queue goes away, which can happen
768                          * if we receive a Terminate message from the worker; or (2) no
769                          * more messages can be read from the worker without blocking.
770                          */
771                         while (pcxt->worker[i].error_mqh != NULL)
772                         {
773                                 shm_mq_result res;
774                                 Size            nbytes;
775                                 void       *data;
776
777                                 res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
778                                                                          &data, true);
779                                 if (res == SHM_MQ_WOULD_BLOCK)
780                                         break;
781                                 else if (res == SHM_MQ_SUCCESS)
782                                 {
783                                         StringInfoData msg;
784
785                                         initStringInfo(&msg);
786                                         appendBinaryStringInfo(&msg, data, nbytes);
787                                         HandleParallelMessage(pcxt, i, &msg);
788                                         pfree(msg.data);
789                                 }
790                                 else
791                                         ereport(ERROR,
792                                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
793                                                          errmsg("lost connection to parallel worker")));
794                         }
795                 }
796         }
797
798         MemoryContextSwitchTo(oldcontext);
799
800         /* Might as well clear the context on our way out */
801         MemoryContextReset(hpm_context);
802
803         RESUME_INTERRUPTS();
804 }
805
806 /*
807  * Handle a single protocol message received from a single parallel worker.
808  */
809 static void
810 HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
811 {
812         char            msgtype;
813
814         msgtype = pq_getmsgbyte(msg);
815
816         switch (msgtype)
817         {
818                 case 'K':                               /* BackendKeyData */
819                         {
820                                 int32           pid = pq_getmsgint(msg, 4);
821
822                                 (void) pq_getmsgint(msg, 4);    /* discard cancel key */
823                                 (void) pq_getmsgend(msg);
824                                 pcxt->worker[i].pid = pid;
825                                 break;
826                         }
827
828                 case 'E':                               /* ErrorResponse */
829                 case 'N':                               /* NoticeResponse */
830                         {
831                                 ErrorData       edata;
832                                 ErrorContextCallback *save_error_context_stack;
833
834                                 /* Parse ErrorResponse or NoticeResponse. */
835                                 pq_parse_errornotice(msg, &edata);
836
837                                 /* Death of a worker isn't enough justification for suicide. */
838                                 edata.elevel = Min(edata.elevel, ERROR);
839
840                                 /*
841                                  * If desired, add a context line to show that this is a
842                                  * message propagated from a parallel worker.  Otherwise, it
843                                  * can sometimes be confusing to understand what actually
844                                  * happened.  (We don't do this in FORCE_PARALLEL_REGRESS mode
845                                  * because it causes test-result instability depending on
846                                  * whether a parallel worker is actually used or not.)
847                                  */
848                                 if (force_parallel_mode != FORCE_PARALLEL_REGRESS)
849                                 {
850                                         if (edata.context)
851                                                 edata.context = psprintf("%s\n%s", edata.context,
852                                                                                                  _("parallel worker"));
853                                         else
854                                                 edata.context = pstrdup(_("parallel worker"));
855                                 }
856
857                                 /*
858                                  * Context beyond that should use the error context callbacks
859                                  * that were in effect when the ParallelContext was created,
860                                  * not the current ones.
861                                  */
862                                 save_error_context_stack = error_context_stack;
863                                 error_context_stack = pcxt->error_context_stack;
864
865                                 /* Rethrow error or print notice. */
866                                 ThrowErrorData(&edata);
867
868                                 /* Not an error, so restore previous context stack. */
869                                 error_context_stack = save_error_context_stack;
870
871                                 break;
872                         }
873
874                 case 'A':                               /* NotifyResponse */
875                         {
876                                 /* Propagate NotifyResponse. */
877                                 int32           pid;
878                                 const char *channel;
879                                 const char *payload;
880
881                                 pid = pq_getmsgint(msg, 4);
882                                 channel = pq_getmsgrawstring(msg);
883                                 payload = pq_getmsgrawstring(msg);
884                                 pq_getmsgend(msg);
885
886                                 NotifyMyFrontEnd(channel, payload, pid);
887
888                                 break;
889                         }
890
891                 case 'X':                               /* Terminate, indicating clean exit */
892                         {
893                                 shm_mq_detach(pcxt->worker[i].error_mqh);
894                                 pcxt->worker[i].error_mqh = NULL;
895                                 break;
896                         }
897
898                 default:
899                         {
900                                 elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
901                                          msgtype, msg->len);
902                         }
903         }
904 }
905
906 /*
907  * End-of-subtransaction cleanup for parallel contexts.
908  *
909  * Currently, it's forbidden to enter or leave a subtransaction while
910  * parallel mode is in effect, so we could just blow away everything.  But
911  * we may want to relax that restriction in the future, so this code
912  * contemplates that there may be multiple subtransaction IDs in pcxt_list.
913  */
914 void
915 AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
916 {
917         while (!dlist_is_empty(&pcxt_list))
918         {
919                 ParallelContext *pcxt;
920
921                 pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
922                 if (pcxt->subid != mySubId)
923                         break;
924                 if (isCommit)
925                         elog(WARNING, "leaked parallel context");
926                 DestroyParallelContext(pcxt);
927         }
928 }
929
930 /*
931  * End-of-transaction cleanup for parallel contexts.
932  */
933 void
934 AtEOXact_Parallel(bool isCommit)
935 {
936         while (!dlist_is_empty(&pcxt_list))
937         {
938                 ParallelContext *pcxt;
939
940                 pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
941                 if (isCommit)
942                         elog(WARNING, "leaked parallel context");
943                 DestroyParallelContext(pcxt);
944         }
945 }
946
947 /*
948  * Main entrypoint for parallel workers.
949  */
950 void
951 ParallelWorkerMain(Datum main_arg)
952 {
953         dsm_segment *seg;
954         shm_toc    *toc;
955         FixedParallelState *fps;
956         char       *error_queue_space;
957         shm_mq     *mq;
958         shm_mq_handle *mqh;
959         char       *libraryspace;
960         char       *entrypointstate;
961         char       *library_name;
962         char       *function_name;
963         parallel_worker_main_type entrypt;
964         char       *gucspace;
965         char       *combocidspace;
966         char       *tsnapspace;
967         char       *asnapspace;
968         char       *tstatespace;
969         StringInfoData msgbuf;
970         char       *session_dsm_handle_space;
971
972         /* Set flag to indicate that we're initializing a parallel worker. */
973         InitializingParallelWorker = true;
974
975         /* Establish signal handlers. */
976         pqsignal(SIGTERM, die);
977         BackgroundWorkerUnblockSignals();
978
979         /* Determine and set our parallel worker number. */
980         Assert(ParallelWorkerNumber == -1);
981         memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
982
983         /* Set up a memory context and resource owner. */
984         Assert(CurrentResourceOwner == NULL);
985         CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel");
986         CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
987                                                                                                  "Parallel worker",
988                                                                                                  ALLOCSET_DEFAULT_SIZES);
989
990         /*
991          * Now that we have a resource owner, we can attach to the dynamic shared
992          * memory segment and read the table of contents.
993          */
994         seg = dsm_attach(DatumGetUInt32(main_arg));
995         if (seg == NULL)
996                 ereport(ERROR,
997                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
998                                  errmsg("could not map dynamic shared memory segment")));
999         toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
1000         if (toc == NULL)
1001                 ereport(ERROR,
1002                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1003                                  errmsg("invalid magic number in dynamic shared memory segment")));
1004
1005         /* Look up fixed parallel state. */
1006         fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
1007         MyFixedParallelState = fps;
1008
1009         /*
1010          * Now that we have a worker number, we can find and attach to the error
1011          * queue provided for us.  That's good, because until we do that, any
1012          * errors that happen here will not be reported back to the process that
1013          * requested that this worker be launched.
1014          */
1015         error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
1016         mq = (shm_mq *) (error_queue_space +
1017                                          ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
1018         shm_mq_set_sender(mq, MyProc);
1019         mqh = shm_mq_attach(mq, seg, NULL);
1020         pq_redirect_to_shm_mq(seg, mqh);
1021         pq_set_parallel_master(fps->parallel_master_pid,
1022                                                    fps->parallel_master_backend_id);
1023
1024         /*
1025          * Send a BackendKeyData message to the process that initiated parallelism
1026          * so that it has access to our PID before it receives any other messages
1027          * from us.  Our cancel key is sent, too, since that's the way the
1028          * protocol message is defined, but it won't actually be used for anything
1029          * in this case.
1030          */
1031         pq_beginmessage(&msgbuf, 'K');
1032         pq_sendint(&msgbuf, (int32) MyProcPid, sizeof(int32));
1033         pq_sendint(&msgbuf, (int32) MyCancelKey, sizeof(int32));
1034         pq_endmessage(&msgbuf);
1035
1036         /*
1037          * Hooray! Primary initialization is complete.  Now, we need to set up our
1038          * backend-local state to match the original backend.
1039          */
1040
1041         /*
1042          * Join locking group.  We must do this before anything that could try to
1043          * acquire a heavyweight lock, because any heavyweight locks acquired to
1044          * this point could block either directly against the parallel group
1045          * leader or against some process which in turn waits for a lock that
1046          * conflicts with the parallel group leader, causing an undetected
1047          * deadlock.  (If we can't join the lock group, the leader has gone away,
1048          * so just exit quietly.)
1049          */
1050         if (!BecomeLockGroupMember(fps->parallel_master_pgproc,
1051                                                            fps->parallel_master_pid))
1052                 return;
1053
1054         /*
1055          * Load libraries that were loaded by original backend.  We want to do
1056          * this before restoring GUCs, because the libraries might define custom
1057          * variables.
1058          */
1059         libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
1060         RestoreLibraryState(libraryspace);
1061
1062         /*
1063          * Identify the entry point to be called.  In theory this could result in
1064          * loading an additional library, though most likely the entry point is in
1065          * the core backend or in a library we just loaded.
1066          */
1067         entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
1068         library_name = entrypointstate;
1069         function_name = entrypointstate + strlen(library_name) + 1;
1070
1071         entrypt = LookupParallelWorkerFunction(library_name, function_name);
1072
1073         /* Restore database connection. */
1074         BackgroundWorkerInitializeConnectionByOid(fps->database_id,
1075                                                                                           fps->authenticated_user_id);
1076
1077         /*
1078          * Set the client encoding to the database encoding, since that is what
1079          * the leader will expect.
1080          */
1081         SetClientEncoding(GetDatabaseEncoding());
1082
1083         /* Restore GUC values from launching backend. */
1084         gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
1085         StartTransactionCommand();
1086         RestoreGUCState(gucspace);
1087         CommitTransactionCommand();
1088
1089         /* Crank up a transaction state appropriate to a parallel worker. */
1090         tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
1091         StartParallelWorkerTransaction(tstatespace);
1092
1093         /* Restore combo CID state. */
1094         combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
1095         RestoreComboCIDState(combocidspace);
1096
1097         /* Attach to the per-session DSM segment and contained objects. */
1098         session_dsm_handle_space =
1099                 shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
1100         AttachSession(*(dsm_handle *) session_dsm_handle_space);
1101
1102         /* Restore transaction snapshot. */
1103         tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, false);
1104         RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace),
1105                                                            fps->parallel_master_pgproc);
1106
1107         /* Restore active snapshot. */
1108         asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
1109         PushActiveSnapshot(RestoreSnapshot(asnapspace));
1110
1111         /*
1112          * We've changed which tuples we can see, and must therefore invalidate
1113          * system caches.
1114          */
1115         InvalidateSystemCaches();
1116
1117         /* Restore user ID and security context. */
1118         SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);
1119
1120         /* Restore temp-namespace state to ensure search path matches leader's. */
1121         SetTempNamespaceState(fps->temp_namespace_id,
1122                                                   fps->temp_toast_namespace_id);
1123
1124         /* Set ParallelMasterBackendId so we know how to address temp relations. */
1125         ParallelMasterBackendId = fps->parallel_master_backend_id;
1126
1127         /*
1128          * We've initialized all of our state now; nothing should change
1129          * hereafter.
1130          */
1131         InitializingParallelWorker = false;
1132         EnterParallelMode();
1133
1134         /*
1135          * Time to do the real work: invoke the caller-supplied code.
1136          */
1137         entrypt(seg, toc);
1138
1139         /* Must exit parallel mode to pop active snapshot. */
1140         ExitParallelMode();
1141
1142         /* Must pop active snapshot so resowner.c doesn't complain. */
1143         PopActiveSnapshot();
1144
1145         /* Shut down the parallel-worker transaction. */
1146         EndParallelWorkerTransaction();
1147
1148         /* Detach from the per-session DSM segment. */
1149         DetachSession();
1150
1151         /* Report success. */
1152         pq_putmessage('X', NULL, 0);
1153 }
1154
1155 /*
1156  * Update shared memory with the ending location of the last WAL record we
1157  * wrote, if it's greater than the value already stored there.
1158  */
1159 void
1160 ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
1161 {
1162         FixedParallelState *fps = MyFixedParallelState;
1163
1164         Assert(fps != NULL);
1165         SpinLockAcquire(&fps->mutex);
1166         if (fps->last_xlog_end < last_xlog_end)
1167                 fps->last_xlog_end = last_xlog_end;
1168         SpinLockRelease(&fps->mutex);
1169 }
1170
1171 /*
1172  * Look up (and possibly load) a parallel worker entry point function.
1173  *
1174  * For functions contained in the core code, we use library name "postgres"
1175  * and consult the InternalParallelWorkers array.  External functions are
1176  * looked up, and loaded if necessary, using load_external_function().
1177  *
1178  * The point of this is to pass function names as strings across process
1179  * boundaries.  We can't pass actual function addresses because of the
1180  * possibility that the function has been loaded at a different address
1181  * in a different process.  This is obviously a hazard for functions in
1182  * loadable libraries, but it can happen even for functions in the core code
1183  * on platforms using EXEC_BACKEND (e.g., Windows).
1184  *
1185  * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
1186  * in favor of applying load_external_function() for core functions too;
1187  * but that raises portability issues that are not worth addressing now.
1188  */
1189 static parallel_worker_main_type
1190 LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
1191 {
1192         /*
1193          * If the function is to be loaded from postgres itself, search the
1194          * InternalParallelWorkers array.
1195          */
1196         if (strcmp(libraryname, "postgres") == 0)
1197         {
1198                 int                     i;
1199
1200                 for (i = 0; i < lengthof(InternalParallelWorkers); i++)
1201                 {
1202                         if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
1203                                 return InternalParallelWorkers[i].fn_addr;
1204                 }
1205
1206                 /* We can only reach this by programming error. */
1207                 elog(ERROR, "internal function \"%s\" not found", funcname);
1208         }
1209
1210         /* Otherwise load from external library. */
1211         return (parallel_worker_main_type)
1212                 load_external_function(libraryname, funcname, true, NULL);
1213 }
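
/*
 * Illustrative sketch (hypothetical; not part of this file): an extension's
 * entry point must match parallel_worker_main_type, and is identified to
 * CreateParallelContext() by library and function name so that each worker
 * can resolve it via LookupParallelWorkerFunction(), above:
 *
 *		void
 *		myext_worker_main(dsm_segment *seg, shm_toc *toc)
 *		{
 *			char	   *state = shm_toc_lookup(toc, 1, false);
 *
 *			... perform the parallel work using "state" ...
 *		}
 *
 *		pcxt = CreateParallelContext("myext", "myext_worker_main", nworkers);
 */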