/*-------------------------------------------------------------------------
 *
 * parallel.c
 *	  Infrastructure for launching parallel workers
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/transam/parallel.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/parallel.h"
#include "access/session.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/namespace.h"
#include "commands/async.h"
#include "executor/execParallel.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
#include "miscadmin.h"
#include "optimizer/planmain.h"
#include "pgstat.h"
#include "storage/ipc.h"
#include "storage/sinval.h"
#include "storage/spin.h"
#include "tcop/tcopprot.h"
#include "utils/combocid.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/resowner.h"
#include "utils/snapmgr.h"
#include "utils/typcache.h"


/*
 * We don't want to waste a lot of memory on an error queue which, most of
 * the time, will process only a handful of small messages.  However, it is
 * desirable to make it large enough that a typical ErrorResponse can be sent
 * without blocking.  That way, a worker that errors out can write the whole
 * message into the queue and terminate without waiting for the user backend.
 */
#define PARALLEL_ERROR_QUEUE_SIZE			16384

/* Magic number for parallel context TOC. */
#define PARALLEL_MAGIC						0x50477c7c

/*
 * Magic numbers for per-context parallel state sharing.  Higher-level code
 * should use smaller values, leaving these very large ones for use by this
 * module.
 */
#define PARALLEL_KEY_FIXED					UINT64CONST(0xFFFFFFFFFFFF0001)
#define PARALLEL_KEY_ERROR_QUEUE			UINT64CONST(0xFFFFFFFFFFFF0002)
#define PARALLEL_KEY_LIBRARY				UINT64CONST(0xFFFFFFFFFFFF0003)
#define PARALLEL_KEY_GUC					UINT64CONST(0xFFFFFFFFFFFF0004)
#define PARALLEL_KEY_COMBO_CID				UINT64CONST(0xFFFFFFFFFFFF0005)
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT	UINT64CONST(0xFFFFFFFFFFFF0006)
#define PARALLEL_KEY_ACTIVE_SNAPSHOT		UINT64CONST(0xFFFFFFFFFFFF0007)
#define PARALLEL_KEY_TRANSACTION_STATE		UINT64CONST(0xFFFFFFFFFFFF0008)
#define PARALLEL_KEY_ENTRYPOINT				UINT64CONST(0xFFFFFFFFFFFF0009)
#define PARALLEL_KEY_SESSION_DSM			UINT64CONST(0xFFFFFFFFFFFF000A)

/* Fixed-size parallel state. */
typedef struct FixedParallelState
{
	/* Fixed-size state that workers must restore. */
	Oid			database_id;
	Oid			authenticated_user_id;
	Oid			current_user_id;
	Oid			outer_user_id;
	Oid			temp_namespace_id;
	Oid			temp_toast_namespace_id;
	int			sec_context;
	bool		is_superuser;
	PGPROC	   *parallel_master_pgproc;
	pid_t		parallel_master_pid;
	BackendId	parallel_master_backend_id;

	/* Mutex protects remaining fields. */
	slock_t		mutex;

	/* Maximum XactLastRecEnd of any worker. */
	XLogRecPtr	last_xlog_end;
} FixedParallelState;

/*
 * Our parallel worker number.  We initialize this to -1, meaning that we are
 * not a parallel worker.  In parallel workers, it will be set to a value >= 0
 * and < the number of workers before any user code is invoked; each parallel
 * worker will get a different parallel worker number.
 */
int			ParallelWorkerNumber = -1;

/* Is there a parallel message pending which we need to receive? */
volatile bool ParallelMessagePending = false;

/* Are we initializing a parallel worker? */
bool		InitializingParallelWorker = false;

/* Pointer to our fixed parallel state. */
static FixedParallelState *MyFixedParallelState;

/* List of active parallel contexts. */
static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);

/*
 * List of internal parallel worker entry points.  We need this for
 * reasons explained in LookupParallelWorkerFunction(), below.
 */
static const struct
{
	const char *fn_name;
	parallel_worker_main_type fn_addr;
}			InternalParallelWorkers[] =

{
	{
		"ParallelQueryMain", ParallelQueryMain
	}
};

/* Private functions. */
static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);


/*
 * Establish a new parallel context.  This should be done after entering
 * parallel mode, and (unless there is an error) the context should be
 * destroyed before exiting the current subtransaction.
 */
ParallelContext *
CreateParallelContext(const char *library_name, const char *function_name,
					  int nworkers)
{
	MemoryContext oldcontext;
	ParallelContext *pcxt;

	/* It is unsafe to create a parallel context if not in parallel mode. */
	Assert(IsInParallelMode());

	/* Number of workers should be non-negative. */
	Assert(nworkers >= 0);

	/*
	 * If dynamic shared memory is not available, we won't be able to use
	 * background workers.
	 */
	if (dynamic_shared_memory_type == DSM_IMPL_NONE)
		nworkers = 0;

	/*
	 * If we are running under serializable isolation, we can't use parallel
	 * workers, at least not until somebody enhances that mechanism to be
	 * parallel-aware.
	 */
	if (IsolationIsSerializable())
		nworkers = 0;

	/* We might be running in a short-lived memory context. */
	oldcontext = MemoryContextSwitchTo(TopTransactionContext);

	/* Initialize a new ParallelContext. */
	pcxt = palloc0(sizeof(ParallelContext));
	pcxt->subid = GetCurrentSubTransactionId();
	pcxt->nworkers = nworkers;
	pcxt->library_name = pstrdup(library_name);
	pcxt->function_name = pstrdup(function_name);
	pcxt->error_context_stack = error_context_stack;
	shm_toc_initialize_estimator(&pcxt->estimator);
	dlist_push_head(&pcxt_list, &pcxt->node);

	/* Restore previous memory context. */
	MemoryContextSwitchTo(oldcontext);

	return pcxt;
}
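
/*
 * Illustrative sketch of the typical caller-side sequence for this module;
 * error handling and the caller's own shared-memory contents are omitted,
 * and exactly what additional state a caller shares with its workers is up
 * to that caller:
 *
 *		EnterParallelMode();
 *		pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
 *		... use shm_toc_estimate_chunk/shm_toc_estimate_keys on pcxt->estimator ...
 *		InitializeParallelDSM(pcxt);
 *		... use shm_toc_allocate/shm_toc_insert on pcxt->toc ...
 *		LaunchParallelWorkers(pcxt);
 *		... the leader can do its own share of the work here ...
 *		WaitForParallelWorkersToFinish(pcxt);
 *		... read results back from the DSM segment ...
 *		DestroyParallelContext(pcxt);
 *		ExitParallelMode();
 */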

/*
 * Establish the dynamic shared memory segment for a parallel context and
 * copy state and other bookkeeping information that will be needed by
 * parallel workers into it.
 */
void
InitializeParallelDSM(ParallelContext *pcxt)
{
	MemoryContext oldcontext;
	Size		library_len = 0;
	Size		guc_len = 0;
	Size		combocidlen = 0;
	Size		tsnaplen = 0;
	Size		asnaplen = 0;
	Size		tstatelen = 0;
	Size		segsize = 0;
	int			i;
	FixedParallelState *fps;
	dsm_handle	session_dsm_handle = DSM_HANDLE_INVALID;
	Snapshot	transaction_snapshot = GetTransactionSnapshot();
	Snapshot	active_snapshot = GetActiveSnapshot();

	/* We might be running in a very short-lived memory context. */
	oldcontext = MemoryContextSwitchTo(TopTransactionContext);

	/* Allow space to store the fixed-size parallel state. */
	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/*
	 * Normally, the user will have requested at least one worker process, but
	 * if by chance they have not, we can skip a bunch of things here.
	 */
	if (pcxt->nworkers > 0)
	{
		/* Get (or create) the per-session DSM segment's handle. */
		session_dsm_handle = GetSessionDsmHandle();

		/*
		 * If we weren't able to create a per-session DSM segment, then we can
		 * continue, but we can't safely launch any workers: their record
		 * typmods would be incompatible, so they couldn't exchange tuples.
		 */
		if (session_dsm_handle == DSM_HANDLE_INVALID)
			pcxt->nworkers = 0;
	}

	if (pcxt->nworkers > 0)
	{
		/* Estimate space for various kinds of state sharing. */
		library_len = EstimateLibraryStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, library_len);
		guc_len = EstimateGUCStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
		combocidlen = EstimateComboCIDStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
		tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
		shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
		asnaplen = EstimateSnapshotSpace(active_snapshot);
		shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
		tstatelen = EstimateTransactionStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
		shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
		/* If you add more chunks here, you probably need to add keys. */
		shm_toc_estimate_keys(&pcxt->estimator, 7);

		/* Estimate the space needed for error queues. */
		StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
						 PARALLEL_ERROR_QUEUE_SIZE,
						 "parallel error queue size not buffer-aligned");
		shm_toc_estimate_chunk(&pcxt->estimator,
							   mul_size(PARALLEL_ERROR_QUEUE_SIZE,
										pcxt->nworkers));
		shm_toc_estimate_keys(&pcxt->estimator, 1);

		/* Estimate how much we'll need for the entrypoint info. */
		shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
							   strlen(pcxt->function_name) + 2);
		shm_toc_estimate_keys(&pcxt->estimator, 1);
	}

	/*
	 * Create DSM and initialize with new table of contents.  But if the user
	 * didn't request any workers, then don't bother creating a dynamic shared
	 * memory segment; instead, just use backend-private memory.
	 *
	 * Also, if we can't create a dynamic shared memory segment because the
	 * maximum number of segments has already been created, then fall back to
	 * backend-private memory, and plan not to use any workers.  We hope this
	 * won't happen very often, but it's better to abandon the use of
	 * parallelism than to fail outright.
	 */
	segsize = shm_toc_estimate(&pcxt->estimator);
	if (pcxt->nworkers > 0)
		pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
	if (pcxt->seg != NULL)
		pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
								   dsm_segment_address(pcxt->seg),
								   segsize);
	else
	{
		pcxt->nworkers = 0;
		pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
		pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
								   segsize);
	}

	/* Initialize fixed-size state in shared memory. */
	fps = (FixedParallelState *)
		shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
	fps->database_id = MyDatabaseId;
	fps->authenticated_user_id = GetAuthenticatedUserId();
	fps->outer_user_id = GetCurrentRoleId();
	fps->is_superuser = session_auth_is_superuser;
	GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
	GetTempNamespaceState(&fps->temp_namespace_id,
						  &fps->temp_toast_namespace_id);
	fps->parallel_master_pgproc = MyProc;
	fps->parallel_master_pid = MyProcPid;
	fps->parallel_master_backend_id = MyBackendId;
	SpinLockInit(&fps->mutex);
	fps->last_xlog_end = 0;
	shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);

	/* We can skip the rest of this if we're not budgeting for any workers. */
	if (pcxt->nworkers > 0)
	{
		char	   *libraryspace;
		char	   *gucspace;
		char	   *combocidspace;
		char	   *tsnapspace;
		char	   *asnapspace;
		char	   *tstatespace;
		char	   *error_queue_space;
		char	   *session_dsm_handle_space;
		char	   *entrypointstate;
		Size		lnamelen;

		/* Serialize shared libraries we have loaded. */
		libraryspace = shm_toc_allocate(pcxt->toc, library_len);
		SerializeLibraryState(library_len, libraryspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);

		/* Serialize GUC settings. */
		gucspace = shm_toc_allocate(pcxt->toc, guc_len);
		SerializeGUCState(guc_len, gucspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);

		/* Serialize combo CID state. */
		combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
		SerializeComboCIDState(combocidlen, combocidspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);

		/* Serialize transaction snapshot and active snapshot. */
		tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
		SerializeSnapshot(transaction_snapshot, tsnapspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
					   tsnapspace);
		asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
		SerializeSnapshot(active_snapshot, asnapspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);

		/* Provide the handle for the per-session segment. */
		session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
													sizeof(dsm_handle));
		*(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
					   session_dsm_handle_space);

		/* Serialize transaction state. */
		tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
		SerializeTransactionState(tstatelen, tstatespace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);

		/* Allocate space for worker information. */
		pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);

		/*
		 * Establish error queues in dynamic shared memory.
		 *
		 * These queues should be used only for transmitting ErrorResponse,
		 * NoticeResponse, and NotifyResponse protocol messages.  Tuple data
		 * should be transmitted via separate (possibly larger?) queues.
		 */
		error_queue_space =
			shm_toc_allocate(pcxt->toc,
							 mul_size(PARALLEL_ERROR_QUEUE_SIZE,
									  pcxt->nworkers));
		for (i = 0; i < pcxt->nworkers; ++i)
		{
			char	   *start;
			shm_mq	   *mq;

			start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
			shm_mq_set_receiver(mq, MyProc);
			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
		}
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);

		/*
		 * Serialize entrypoint information.  It's unsafe to pass function
		 * pointers across processes, as the function pointer may be different
		 * in each process in EXEC_BACKEND builds, so we always pass library
		 * and function name.  (We use library name "postgres" for functions
		 * in the core backend.)
		 */
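		/*
		 * The serialized form is just the two NUL-terminated strings laid out
		 * back to back, e.g. "postgres\0ParallelQueryMain\0" for a core entry
		 * point; ParallelWorkerMain() splits them apart again by skipping
		 * past the first string's terminating NUL.
		 */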
		lnamelen = strlen(pcxt->library_name);
		entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
										   strlen(pcxt->function_name) + 2);
		strcpy(entrypointstate, pcxt->library_name);
		strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
	}

	/* Restore previous memory context. */
	MemoryContextSwitchTo(oldcontext);
}

/*
 * Reinitialize the dynamic shared memory segment for a parallel context so
 * that we can launch workers for it again.
 */
void
ReinitializeParallelDSM(ParallelContext *pcxt)
{
	FixedParallelState *fps;
	char	   *error_queue_space;
	int			i;

	/* Wait for any old workers to exit. */
	if (pcxt->nworkers_launched > 0)
	{
		WaitForParallelWorkersToFinish(pcxt);
		WaitForParallelWorkersToExit(pcxt);
		pcxt->nworkers_launched = 0;
	}

	/* Reset a few bits of fixed parallel state to a clean state. */
	fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
	fps->last_xlog_end = 0;

	/* Recreate error queues (if they exist). */
	error_queue_space =
		shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, true);
	Assert(pcxt->nworkers == 0 || error_queue_space != NULL);
	for (i = 0; i < pcxt->nworkers; ++i)
	{
		char	   *start;
		shm_mq	   *mq;

		start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
		mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
		shm_mq_set_receiver(mq, MyProc);
		pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
	}
}
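
/*
 * Note that a caller wanting to run the same parallel operation more than
 * once typically calls ReinitializeParallelDSM() above and then
 * LaunchParallelWorkers() below again, reusing the existing ParallelContext
 * rather than destroying and recreating it.
 */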

/*
 * Launch parallel workers.
 */
void
LaunchParallelWorkers(ParallelContext *pcxt)
{
	MemoryContext oldcontext;
	BackgroundWorker worker;
	int			i;
	bool		any_registrations_failed = false;

	/* Skip this if we have no workers. */
	if (pcxt->nworkers == 0)
		return;

	/* We need to be a lock group leader. */
	BecomeLockGroupLeader();

	/* If we do have workers, we'd better have a DSM segment. */
	Assert(pcxt->seg != NULL);

	/* We might be running in a short-lived memory context. */
	oldcontext = MemoryContextSwitchTo(TopTransactionContext);

	/* Configure a worker. */
	memset(&worker, 0, sizeof(worker));
	snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
			 MyProcPid);
	snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
	worker.bgw_flags =
		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
		| BGWORKER_CLASS_PARALLEL;
	worker.bgw_start_time = BgWorkerStart_ConsistentState;
	worker.bgw_restart_time = BGW_NEVER_RESTART;
	sprintf(worker.bgw_library_name, "postgres");
	sprintf(worker.bgw_function_name, "ParallelWorkerMain");
	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
	worker.bgw_notify_pid = MyProcPid;

	/*
	 * Start workers.
	 *
	 * The caller must be able to tolerate ending up with fewer workers than
	 * expected, so there is no need to throw an error here if registration
	 * fails.  It wouldn't help much anyway, because registering the worker in
	 * no way guarantees that it will start up and initialize successfully.
	 */
	for (i = 0; i < pcxt->nworkers; ++i)
	{
		memcpy(worker.bgw_extra, &i, sizeof(int));
		if (!any_registrations_failed &&
			RegisterDynamicBackgroundWorker(&worker,
											&pcxt->worker[i].bgwhandle))
		{
			shm_mq_set_handle(pcxt->worker[i].error_mqh,
							  pcxt->worker[i].bgwhandle);
			pcxt->nworkers_launched++;
		}
		else
		{
			/*
			 * If we weren't able to register the worker, then we've bumped up
			 * against the max_worker_processes limit, and future
			 * registrations will probably fail too, so arrange to skip them.
			 * But we still have to execute this code for the remaining slots
			 * to make sure that we forget about the error queues we budgeted
			 * for those workers.  Otherwise, we'll wait for them to start,
			 * but they never will.
			 */
			any_registrations_failed = true;
			pcxt->worker[i].bgwhandle = NULL;
			shm_mq_detach(pcxt->worker[i].error_mqh);
			pcxt->worker[i].error_mqh = NULL;
		}
	}

	/* Restore previous memory context. */
	MemoryContextSwitchTo(oldcontext);
}

/*
 * Wait for all workers to finish computing.
 *
 * Even if the parallel operation seems to have completed successfully, it's
 * important to call this function afterwards.  We must not miss any errors
 * the workers may have thrown during the parallel operation, or any that they
 * may yet throw while shutting down.
 *
 * Also, we want to update our notion of XactLastRecEnd based on worker
 * feedback.
 */
void
WaitForParallelWorkersToFinish(ParallelContext *pcxt)
{
	for (;;)
	{
		bool		anyone_alive = false;
		int			i;

		/*
		 * This will process any parallel messages that are pending, which may
		 * change the outcome of the loop that follows.  It may also throw an
		 * error propagated from a worker.
		 */
		CHECK_FOR_INTERRUPTS();

		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			if (pcxt->worker[i].error_mqh != NULL)
			{
				anyone_alive = true;
				break;
			}
		}

		if (!anyone_alive)
			break;

		WaitLatch(MyLatch, WL_LATCH_SET, -1,
				  WAIT_EVENT_PARALLEL_FINISH);
		ResetLatch(MyLatch);
	}

	if (pcxt->toc != NULL)
	{
		FixedParallelState *fps;

		fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
		if (fps->last_xlog_end > XactLastRecEnd)
			XactLastRecEnd = fps->last_xlog_end;
	}
}

/*
 * Wait for all workers to exit.
 *
 * This function ensures that the workers have been completely shut down.  The
 * difference between WaitForParallelWorkersToFinish and this function is that
 * the former just ensures that the last message sent by a worker backend has
 * been received by the master backend, whereas this one ensures complete
 * shutdown of the workers.
 */
static void
WaitForParallelWorkersToExit(ParallelContext *pcxt)
{
	int			i;

	/* Wait until the workers actually die. */
	for (i = 0; i < pcxt->nworkers_launched; ++i)
	{
		BgwHandleStatus status;

		if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
			continue;

		status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);

		/*
		 * If the postmaster kicked the bucket, we have no chance of cleaning
		 * up safely -- we won't be able to tell when our workers are actually
		 * dead.  This doesn't necessitate a PANIC since they will all abort
		 * eventually, but we can't safely continue this session.
		 */
		if (status == BGWH_POSTMASTER_DIED)
			ereport(FATAL,
					(errcode(ERRCODE_ADMIN_SHUTDOWN),
					 errmsg("postmaster exited during a parallel transaction")));

		/* Release memory. */
		pfree(pcxt->worker[i].bgwhandle);
		pcxt->worker[i].bgwhandle = NULL;
	}
}

/*
 * Destroy a parallel context.
 *
 * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
 * first, before calling this function.  When this function is invoked, any
 * remaining workers are forcibly killed; the dynamic shared memory segment
 * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
 */
void
DestroyParallelContext(ParallelContext *pcxt)
{
	int			i;

	/*
	 * Be careful about order of operations here!  We remove the parallel
	 * context from the list before we do anything else; otherwise, if an
	 * error occurs during a subsequent step, we might try to nuke it again
	 * from AtEOXact_Parallel or AtEOSubXact_Parallel.
	 */
	dlist_delete(&pcxt->node);

	/* Kill each worker in turn, and forget their error queues. */
	if (pcxt->worker != NULL)
	{
		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			if (pcxt->worker[i].error_mqh != NULL)
			{
				TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);

				shm_mq_detach(pcxt->worker[i].error_mqh);
				pcxt->worker[i].error_mqh = NULL;
			}
		}
	}

	/*
	 * If we have allocated a shared memory segment, detach it.  This will
	 * implicitly detach the error queues, and any other shared memory queues
	 * stored there.
	 */
	if (pcxt->seg != NULL)
	{
		dsm_detach(pcxt->seg);
		pcxt->seg = NULL;
	}

	/*
	 * If this parallel context is actually in backend-private memory rather
	 * than shared memory, free that memory instead.
	 */
	if (pcxt->private_memory != NULL)
	{
		pfree(pcxt->private_memory);
		pcxt->private_memory = NULL;
	}

	/*
	 * We can't finish transaction commit or abort until all of the workers
	 * have exited.  This means, in particular, that we can't respond to
	 * interrupts at this stage.
	 */
	HOLD_INTERRUPTS();
	WaitForParallelWorkersToExit(pcxt);
	RESUME_INTERRUPTS();

	/* Free the worker array itself. */
	if (pcxt->worker != NULL)
	{
		pfree(pcxt->worker);
		pcxt->worker = NULL;
	}

	/* Free memory. */
	pfree(pcxt->library_name);
	pfree(pcxt->function_name);
	pfree(pcxt);
}

/*
 * Are there any parallel contexts currently active?
 */
bool
ParallelContextActive(void)
{
	return !dlist_is_empty(&pcxt_list);
}

/*
 * Handle receipt of an interrupt indicating a parallel worker message.
 *
 * Note: this is called within a signal handler!  All we can do is set
 * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
 * HandleParallelMessages().
 */
void
HandleParallelMessageInterrupt(void)
{
	InterruptPending = true;
	ParallelMessagePending = true;
	SetLatch(MyLatch);
}

/*
 * Handle any queued protocol messages received from parallel workers.
 */
void
HandleParallelMessages(void)
{
	dlist_iter	iter;
	MemoryContext oldcontext;

	static MemoryContext hpm_context = NULL;

	/*
	 * This is invoked from ProcessInterrupts(), and since some of the
	 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
	 * for recursive calls if more signals are received while this runs.  It's
	 * unclear that recursive entry would be safe, and it doesn't seem useful
	 * even if it is safe, so let's block interrupts until done.
	 */
	HOLD_INTERRUPTS();

	/*
	 * Moreover, CurrentMemoryContext might be pointing almost anywhere.  We
	 * don't want to risk leaking data into long-lived contexts, so let's do
	 * our work here in a private context that we can reset on each use.
	 */
	if (hpm_context == NULL)	/* first time through? */
		hpm_context = AllocSetContextCreate(TopMemoryContext,
											"HandleParallelMessages",
											ALLOCSET_DEFAULT_SIZES);
	else
		MemoryContextReset(hpm_context);

	oldcontext = MemoryContextSwitchTo(hpm_context);

	/* OK to process messages.  Reset the flag saying there are more to do. */
	ParallelMessagePending = false;

	dlist_foreach(iter, &pcxt_list)
	{
		ParallelContext *pcxt;
		int			i;

		pcxt = dlist_container(ParallelContext, node, iter.cur);
		if (pcxt->worker == NULL)
			continue;

		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			/*
			 * Read as many messages as we can from each worker, but stop when
			 * either (1) the worker's error queue goes away, which can happen
			 * if we receive a Terminate message from the worker; or (2) no
			 * more messages can be read from the worker without blocking.
			 */
			while (pcxt->worker[i].error_mqh != NULL)
			{
				shm_mq_result res;
				Size		nbytes;
				void	   *data;

				res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
									 &data, true);
				if (res == SHM_MQ_WOULD_BLOCK)
					break;
				else if (res == SHM_MQ_SUCCESS)
				{
					StringInfoData msg;

					initStringInfo(&msg);
					appendBinaryStringInfo(&msg, data, nbytes);
					HandleParallelMessage(pcxt, i, &msg);
					pfree(msg.data);
				}
				else
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							 errmsg("lost connection to parallel worker")));
			}
		}
	}

	MemoryContextSwitchTo(oldcontext);

	/* Might as well clear the context on our way out */
	MemoryContextReset(hpm_context);

	RESUME_INTERRUPTS();
}

/*
 * Handle a single protocol message received from a single parallel worker.
 */
static void
HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
{
	char		msgtype;

	msgtype = pq_getmsgbyte(msg);

	switch (msgtype)
	{
		case 'K':				/* BackendKeyData */
			{
				int32		pid = pq_getmsgint(msg, 4);

				(void) pq_getmsgint(msg, 4);	/* discard cancel key */
				(void) pq_getmsgend(msg);
				pcxt->worker[i].pid = pid;
				break;
			}

		case 'E':				/* ErrorResponse */
		case 'N':				/* NoticeResponse */
			{
				ErrorData	edata;
				ErrorContextCallback *save_error_context_stack;

				/* Parse ErrorResponse or NoticeResponse. */
				pq_parse_errornotice(msg, &edata);

				/* Death of a worker isn't enough justification for suicide. */
				edata.elevel = Min(edata.elevel, ERROR);

				/*
				 * If desired, add a context line to show that this is a
				 * message propagated from a parallel worker.  Otherwise, it
				 * can sometimes be confusing to understand what actually
				 * happened.  (We don't do this in FORCE_PARALLEL_REGRESS mode
				 * because it causes test-result instability depending on
				 * whether a parallel worker is actually used or not.)
				 */
				if (force_parallel_mode != FORCE_PARALLEL_REGRESS)
				{
					if (edata.context)
						edata.context = psprintf("%s\n%s", edata.context,
												 _("parallel worker"));
					else
						edata.context = pstrdup(_("parallel worker"));
				}

				/*
				 * Context beyond that should use the error context callbacks
				 * that were in effect when the ParallelContext was created,
				 * not the current ones.
				 */
				save_error_context_stack = error_context_stack;
				error_context_stack = pcxt->error_context_stack;

				/* Rethrow error or print notice. */
				ThrowErrorData(&edata);

				/* Not an error, so restore previous context stack. */
				error_context_stack = save_error_context_stack;

				break;
			}

		case 'A':				/* NotifyResponse */
			{
				/* Propagate NotifyResponse. */
				int32		pid;
				const char *channel;
				const char *payload;

				pid = pq_getmsgint(msg, 4);
				channel = pq_getmsgrawstring(msg);
				payload = pq_getmsgrawstring(msg);
				pq_endmessage(msg);

				NotifyMyFrontEnd(channel, payload, pid);

				break;
			}

		case 'X':				/* Terminate, indicating clean exit */
			{
				shm_mq_detach(pcxt->worker[i].error_mqh);
				pcxt->worker[i].error_mqh = NULL;
				break;
			}

		default:
			{
				elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
					 msgtype, msg->len);
			}
	}
}

/*
 * End-of-subtransaction cleanup for parallel contexts.
 *
 * Currently, it's forbidden to enter or leave a subtransaction while
 * parallel mode is in effect, so we could just blow away everything.  But
 * we may want to relax that restriction in the future, so this code
 * contemplates that there may be multiple subtransaction IDs in pcxt_list.
 */
void
AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
{
	while (!dlist_is_empty(&pcxt_list))
	{
		ParallelContext *pcxt;

		pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
		if (pcxt->subid != mySubId)
			break;
		if (isCommit)
			elog(WARNING, "leaked parallel context");
		DestroyParallelContext(pcxt);
	}
}

/*
 * End-of-transaction cleanup for parallel contexts.
 */
void
AtEOXact_Parallel(bool isCommit)
{
	while (!dlist_is_empty(&pcxt_list))
	{
		ParallelContext *pcxt;

		pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
		if (isCommit)
			elog(WARNING, "leaked parallel context");
		DestroyParallelContext(pcxt);
	}
}

/*
 * Main entrypoint for parallel workers.
 */
void
ParallelWorkerMain(Datum main_arg)
{
	dsm_segment *seg;
	shm_toc    *toc;
	FixedParallelState *fps;
	char	   *error_queue_space;
	shm_mq	   *mq;
	shm_mq_handle *mqh;
	char	   *libraryspace;
	char	   *entrypointstate;
	char	   *library_name;
	char	   *function_name;
	parallel_worker_main_type entrypt;
	char	   *gucspace;
	char	   *combocidspace;
	char	   *tsnapspace;
	char	   *asnapspace;
	char	   *tstatespace;
	StringInfoData msgbuf;
	char	   *session_dsm_handle_space;

	/* Set flag to indicate that we're initializing a parallel worker. */
	InitializingParallelWorker = true;

	/* Establish signal handlers. */
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/* Determine and set our parallel worker number. */
	Assert(ParallelWorkerNumber == -1);
	memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));

	/* Set up a memory context and resource owner. */
	Assert(CurrentResourceOwner == NULL);
	CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel");
	CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
												 "Parallel worker",
												 ALLOCSET_DEFAULT_SIZES);

	/*
	 * Now that we have a resource owner, we can attach to the dynamic shared
	 * memory segment and read the table of contents.
	 */
	seg = dsm_attach(DatumGetUInt32(main_arg));
	if (seg == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("could not map dynamic shared memory segment")));
	toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
	if (toc == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("invalid magic number in dynamic shared memory segment")));

	/* Look up fixed parallel state. */
	fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
	MyFixedParallelState = fps;

	/*
	 * Now that we have a worker number, we can find and attach to the error
	 * queue provided for us.  That's good, because until we do that, any
	 * errors that happen here will not be reported back to the process that
	 * requested that this worker be launched.
	 */
	error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
	mq = (shm_mq *) (error_queue_space +
					 ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
	shm_mq_set_sender(mq, MyProc);
	mqh = shm_mq_attach(mq, seg, NULL);
	pq_redirect_to_shm_mq(seg, mqh);
	pq_set_parallel_master(fps->parallel_master_pid,
						   fps->parallel_master_backend_id);

	/*
	 * Send a BackendKeyData message to the process that initiated parallelism
	 * so that it has access to our PID before it receives any other messages
	 * from us.  Our cancel key is sent, too, since that's the way the
	 * protocol message is defined, but it won't actually be used for anything
	 * in this case.
	 */
	pq_beginmessage(&msgbuf, 'K');
	pq_sendint32(&msgbuf, (int32) MyProcPid);
	pq_sendint32(&msgbuf, (int32) MyCancelKey);
	pq_endmessage(&msgbuf);

	/*
	 * Hooray! Primary initialization is complete.  Now, we need to set up our
	 * backend-local state to match the original backend.
	 */

	/*
	 * Join locking group.  We must do this before anything that could try to
	 * acquire a heavyweight lock, because any heavyweight locks acquired to
	 * this point could block either directly against the parallel group
	 * leader or against some process which in turn waits for a lock that
	 * conflicts with the parallel group leader, causing an undetected
	 * deadlock.  (If we can't join the lock group, the leader has gone away,
	 * so just exit quietly.)
	 */
	if (!BecomeLockGroupMember(fps->parallel_master_pgproc,
							   fps->parallel_master_pid))
		return;

	/*
	 * Load libraries that were loaded by original backend.  We want to do
	 * this before restoring GUCs, because the libraries might define custom
	 * variables.
	 */
	libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
	RestoreLibraryState(libraryspace);

	/*
	 * Identify the entry point to be called.  In theory this could result in
	 * loading an additional library, though most likely the entry point is in
	 * the core backend or in a library we just loaded.
	 */
	entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
	library_name = entrypointstate;
	function_name = entrypointstate + strlen(library_name) + 1;

	entrypt = LookupParallelWorkerFunction(library_name, function_name);

	/* Restore database connection. */
	BackgroundWorkerInitializeConnectionByOid(fps->database_id,
											  fps->authenticated_user_id);

	/*
	 * Set the client encoding to the database encoding, since that is what
	 * the leader will expect.
	 */
	SetClientEncoding(GetDatabaseEncoding());

	/* Restore GUC values from launching backend. */
	gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
	StartTransactionCommand();
	RestoreGUCState(gucspace);
	CommitTransactionCommand();

	/* Crank up a transaction state appropriate to a parallel worker. */
	tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
	StartParallelWorkerTransaction(tstatespace);

	/* Restore combo CID state. */
	combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
	RestoreComboCIDState(combocidspace);

	/* Attach to the per-session DSM segment and contained objects. */
	session_dsm_handle_space =
		shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
	AttachSession(*(dsm_handle *) session_dsm_handle_space);

	/* Restore transaction snapshot. */
	tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, false);
	RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace),
							   fps->parallel_master_pgproc);

	/* Restore active snapshot. */
	asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
	PushActiveSnapshot(RestoreSnapshot(asnapspace));

	/*
	 * We've changed which tuples we can see, and must therefore invalidate
	 * system caches.
	 */
	InvalidateSystemCaches();

	/*
	 * Restore current role id.  Skip verifying whether session user is
	 * allowed to become this role and blindly restore the leader's state for
	 * current role.
	 */
	SetCurrentRoleId(fps->outer_user_id, fps->is_superuser);

	/* Restore user ID and security context. */
	SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);

	/* Restore temp-namespace state to ensure search path matches leader's. */
	SetTempNamespaceState(fps->temp_namespace_id,
						  fps->temp_toast_namespace_id);

	/* Set ParallelMasterBackendId so we know how to address temp relations. */
	ParallelMasterBackendId = fps->parallel_master_backend_id;

	/*
	 * We've initialized all of our state now; nothing should change
	 * hereafter.
	 */
	InitializingParallelWorker = false;
	EnterParallelMode();

	/*
	 * Time to do the real work: invoke the caller-supplied code.
	 */
	entrypt(seg, toc);

	/* Must exit parallel mode to pop active snapshot. */
	ExitParallelMode();

	/* Must pop active snapshot so resowner.c doesn't complain. */
	PopActiveSnapshot();

	/* Shut down the parallel-worker transaction. */
	EndParallelWorkerTransaction();

	/* Detach from the per-session DSM segment. */
	DetachSession();

	/* Report success. */
	pq_putmessage('X', NULL, 0);
}

/*
 * Update shared memory with the ending location of the last WAL record we
 * wrote, if it's greater than the value already stored there.
 */
void
ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
{
	FixedParallelState *fps = MyFixedParallelState;

	Assert(fps != NULL);
	SpinLockAcquire(&fps->mutex);
	if (fps->last_xlog_end < last_xlog_end)
		fps->last_xlog_end = last_xlog_end;
	SpinLockRelease(&fps->mutex);
}

/*
 * Look up (and possibly load) a parallel worker entry point function.
 *
 * For functions contained in the core code, we use library name "postgres"
 * and consult the InternalParallelWorkers array.  External functions are
 * looked up, and loaded if necessary, using load_external_function().
 *
 * The point of this is to pass function names as strings across process
 * boundaries.  We can't pass actual function addresses because of the
 * possibility that the function has been loaded at a different address
 * in a different process.  This is obviously a hazard for functions in
 * loadable libraries, but it can happen even for functions in the core code
 * on platforms using EXEC_BACKEND (e.g., Windows).
 *
 * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
 * in favor of applying load_external_function() for core functions too;
 * but that raises portability issues that are not worth addressing now.
 */
static parallel_worker_main_type
LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
{
	/*
	 * If the function is to be loaded from postgres itself, search the
	 * InternalParallelWorkers array.
	 */
	if (strcmp(libraryname, "postgres") == 0)
	{
		int			i;

		for (i = 0; i < lengthof(InternalParallelWorkers); i++)
		{
			if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
				return InternalParallelWorkers[i].fn_addr;
		}

		/* We can only reach this by programming error. */
		elog(ERROR, "internal function \"%s\" not found", funcname);
	}

	/* Otherwise load from external library. */
	return (parallel_worker_main_type)
		load_external_function(libraryname, funcname, true, NULL);