]> granicus.if.org Git - postgresql/blobdiff - src/backend/access/transam/parallel.c
Fix initialization of fake LSN for unlogged relations
[postgresql] / src / backend / access / transam / parallel.c
index 5b45b07e7c1aa21c5918627c889296927ee523b8..55d129a64f7f344b3136e79609377974e073006a 100644 (file)
@@ -3,7 +3,7 @@
  * parallel.c
  *       Infrastructure for launching parallel workers
  *
- * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
 
 #include "postgres.h"
 
+#include "access/nbtree.h"
 #include "access/parallel.h"
 #include "access/session.h"
 #include "access/xact.h"
 #include "access/xlog.h"
+#include "catalog/pg_enum.h"
 #include "catalog/index.h"
 #include "catalog/namespace.h"
 #include "commands/async.h"
 #include "libpq/pqformat.h"
 #include "libpq/pqmq.h"
 #include "miscadmin.h"
-#include "optimizer/planmain.h"
+#include "optimizer/optimizer.h"
 #include "pgstat.h"
 #include "storage/ipc.h"
+#include "storage/predicate.h"
 #include "storage/sinval.h"
 #include "storage/spin.h"
 #include "tcop/tcopprot.h"
@@ -36,7 +39,7 @@
 #include "utils/guc.h"
 #include "utils/inval.h"
 #include "utils/memutils.h"
-#include "utils/resowner.h"
+#include "utils/relmapper.h"
 #include "utils/snapmgr.h"
 #include "utils/typcache.h"
 
@@ -69,6 +72,8 @@
 #define PARALLEL_KEY_ENTRYPOINT                                UINT64CONST(0xFFFFFFFFFFFF0009)
 #define PARALLEL_KEY_SESSION_DSM                       UINT64CONST(0xFFFFFFFFFFFF000A)
 #define PARALLEL_KEY_REINDEX_STATE                     UINT64CONST(0xFFFFFFFFFFFF000B)
+#define PARALLEL_KEY_RELMAPPER_STATE           UINT64CONST(0xFFFFFFFFFFFF000C)
+#define PARALLEL_KEY_ENUMBLACKLIST                     UINT64CONST(0xFFFFFFFFFFFF000D)
 
 /* Fixed-size parallel state. */
 typedef struct FixedParallelState
@@ -85,6 +90,9 @@ typedef struct FixedParallelState
        PGPROC     *parallel_master_pgproc;
        pid_t           parallel_master_pid;
        BackendId       parallel_master_backend_id;
+       TimestampTz xact_ts;
+       TimestampTz stmt_ts;
+       SerializableXactHandle serializable_xact_handle;
 
        /* Mutex protects remaining fields. */
        slock_t         mutex;
@@ -129,6 +137,9 @@ static const struct
 {
        {
                "ParallelQueryMain", ParallelQueryMain
+       },
+       {
+               "_bt_parallel_build_main", _bt_parallel_build_main
        }
 };
 
@@ -157,21 +168,6 @@ CreateParallelContext(const char *library_name, const char *function_name,
        /* Number of workers should be non-negative. */
        Assert(nworkers >= 0);
 
-       /*
-        * If dynamic shared memory is not available, we won't be able to use
-        * background workers.
-        */
-       if (dynamic_shared_memory_type == DSM_IMPL_NONE)
-               nworkers = 0;
-
-       /*
-        * If we are running under serializable isolation, we can't use parallel
-        * workers, at least not until somebody enhances that mechanism to be
-        * parallel-aware.
-        */
-       if (IsolationIsSerializable())
-               nworkers = 0;
-
        /* We might be running in a short-lived memory context. */
        oldcontext = MemoryContextSwitchTo(TopTransactionContext);
 
@@ -207,6 +203,8 @@ InitializeParallelDSM(ParallelContext *pcxt)
        Size            asnaplen = 0;
        Size            tstatelen = 0;
        Size            reindexlen = 0;
+       Size            relmapperlen = 0;
+       Size            enumblacklistlen = 0;
        Size            segsize = 0;
        int                     i;
        FixedParallelState *fps;
@@ -258,8 +256,12 @@ InitializeParallelDSM(ParallelContext *pcxt)
                shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
                reindexlen = EstimateReindexStateSpace();
                shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
+               relmapperlen = EstimateRelationMapSpace();
+               shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen);
+               enumblacklistlen = EstimateEnumBlacklistSpace();
+               shm_toc_estimate_chunk(&pcxt->estimator, enumblacklistlen);
                /* If you add more chunks here, you probably need to add keys. */
-               shm_toc_estimate_keys(&pcxt->estimator, 8);
+               shm_toc_estimate_keys(&pcxt->estimator, 10);
 
                /* Estimate space need for error queues. */
                StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
@@ -315,6 +317,9 @@ InitializeParallelDSM(ParallelContext *pcxt)
        fps->parallel_master_pgproc = MyProc;
        fps->parallel_master_pid = MyProcPid;
        fps->parallel_master_backend_id = MyBackendId;
+       fps->xact_ts = GetCurrentTransactionStartTimestamp();
+       fps->stmt_ts = GetCurrentStatementStartTimestamp();
+       fps->serializable_xact_handle = ShareSerializableXact();
        SpinLockInit(&fps->mutex);
        fps->last_xlog_end = 0;
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
@@ -329,9 +334,11 @@ InitializeParallelDSM(ParallelContext *pcxt)
                char       *asnapspace;
                char       *tstatespace;
                char       *reindexspace;
+               char       *relmapperspace;
                char       *error_queue_space;
                char       *session_dsm_handle_space;
                char       *entrypointstate;
+               char       *enumblacklistspace;
                Size            lnamelen;
 
                /* Serialize shared libraries we have loaded. */
@@ -375,6 +382,18 @@ InitializeParallelDSM(ParallelContext *pcxt)
                SerializeReindexState(reindexlen, reindexspace);
                shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);
 
+               /* Serialize relmapper state. */
+               relmapperspace = shm_toc_allocate(pcxt->toc, relmapperlen);
+               SerializeRelationMap(relmapperlen, relmapperspace);
+               shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE,
+                                          relmapperspace);
+
+               /* Serialize enum blacklist state. */
+               enumblacklistspace = shm_toc_allocate(pcxt->toc, enumblacklistlen);
+               SerializeEnumBlacklist(enumblacklistspace, enumblacklistlen);
+               shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENUMBLACKLIST,
+                                          enumblacklistspace);
+
                /* Allocate space for worker information. */
                pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
 
@@ -428,8 +447,6 @@ void
 ReinitializeParallelDSM(ParallelContext *pcxt)
 {
        FixedParallelState *fps;
-       char       *error_queue_space;
-       int                     i;
 
        /* Wait for any old workers to exit. */
        if (pcxt->nworkers_launched > 0)
@@ -450,18 +467,23 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
        fps->last_xlog_end = 0;
 
        /* Recreate error queues (if they exist). */
-       error_queue_space =
-               shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, true);
-       Assert(pcxt->nworkers == 0 || error_queue_space != NULL);
-       for (i = 0; i < pcxt->nworkers; ++i)
+       if (pcxt->nworkers > 0)
        {
-               char       *start;
-               shm_mq     *mq;
+               char       *error_queue_space;
+               int                     i;
 
-               start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
-               mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
-               shm_mq_set_receiver(mq, MyProc);
-               pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+               error_queue_space =
+                       shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
+               for (i = 0; i < pcxt->nworkers; ++i)
+               {
+                       char       *start;
+                       shm_mq     *mq;
+
+                       start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
+                       mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
+                       shm_mq_set_receiver(mq, MyProc);
+                       pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+               }
        }
 }
 
@@ -663,13 +685,9 @@ WaitForParallelWorkersToAttach(ParallelContext *pcxt)
                                 * just end up waiting for the same worker again.
                                 */
                                rc = WaitLatch(MyLatch,
-                                                          WL_LATCH_SET | WL_POSTMASTER_DEATH,
+                                                          WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
                                                           -1, WAIT_EVENT_BGWORKER_STARTUP);
 
-                               /* emergency bailout if postmaster has died */
-                               if (rc & WL_POSTMASTER_DEATH)
-                                       proc_exit(1);
-
                                if (rc & WL_LATCH_SET)
                                        ResetLatch(MyLatch);
                        }
@@ -786,8 +804,8 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
                        }
                }
 
-               WaitLatch(MyLatch, WL_LATCH_SET, -1,
-                                 WAIT_EVENT_PARALLEL_FINISH);
+               (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
+                                                WAIT_EVENT_PARALLEL_FINISH);
                ResetLatch(MyLatch);
        }
 
@@ -1204,6 +1222,8 @@ ParallelWorkerMain(Datum main_arg)
        char       *asnapspace;
        char       *tstatespace;
        char       *reindexspace;
+       char       *relmapperspace;
+       char       *enumblacklistspace;
        StringInfoData msgbuf;
        char       *session_dsm_handle_space;
 
@@ -1218,16 +1238,19 @@ ParallelWorkerMain(Datum main_arg)
        Assert(ParallelWorkerNumber == -1);
        memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
 
-       /* Set up a memory context and resource owner. */
-       Assert(CurrentResourceOwner == NULL);
-       CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel");
+       /* Set up a memory context to work in, just for cleanliness. */
        CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
                                                                                                 "Parallel worker",
                                                                                                 ALLOCSET_DEFAULT_SIZES);
 
        /*
-        * Now that we have a resource owner, we can attach to the dynamic shared
-        * memory segment and read the table of contents.
+        * Attach to the dynamic shared memory segment for the parallel query, and
+        * find its table of contents.
+        *
+        * Note: at this point, we have not created any ResourceOwner in this
+        * process.  This will result in our DSM mapping surviving until process
+        * exit, which is fine.  If there were a ResourceOwner, it would acquire
+        * ownership of the mapping, but we have no need for that.
         */
        seg = dsm_attach(DatumGetUInt32(main_arg));
        if (seg == NULL)
@@ -1295,12 +1318,11 @@ ParallelWorkerMain(Datum main_arg)
                return;
 
        /*
-        * Load libraries that were loaded by original backend.  We want to do
-        * this before restoring GUCs, because the libraries might define custom
-        * variables.
+        * Restore transaction and statement start-time timestamps.  This must
+        * happen before anything that would start a transaction, else asserts in
+        * xact.c will fire.
         */
-       libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
-       RestoreLibraryState(libraryspace);
+       SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);
 
        /*
         * Identify the entry point to be called.  In theory this could result in
@@ -1315,7 +1337,8 @@ ParallelWorkerMain(Datum main_arg)
 
        /* Restore database connection. */
        BackgroundWorkerInitializeConnectionByOid(fps->database_id,
-                                                                                         fps->authenticated_user_id);
+                                                                                         fps->authenticated_user_id,
+                                                                                         0);
 
        /*
         * Set the client encoding to the database encoding, since that is what
@@ -1323,9 +1346,17 @@ ParallelWorkerMain(Datum main_arg)
         */
        SetClientEncoding(GetDatabaseEncoding());
 
+       /*
+        * Load libraries that were loaded by original backend.  We want to do
+        * this before restoring GUCs, because the libraries might define custom
+        * variables.
+        */
+       libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
+       StartTransactionCommand();
+       RestoreLibraryState(libraryspace);
+
        /* Restore GUC values from launching backend. */
        gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
-       StartTransactionCommand();
        RestoreGUCState(gucspace);
        CommitTransactionCommand();
 
@@ -1375,6 +1406,18 @@ ParallelWorkerMain(Datum main_arg)
        reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
        RestoreReindexState(reindexspace);
 
+       /* Restore relmapper state. */
+       relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false);
+       RestoreRelationMap(relmapperspace);
+
+       /* Restore enum blacklist. */
+       enumblacklistspace = shm_toc_lookup(toc, PARALLEL_KEY_ENUMBLACKLIST,
+                                                                               false);
+       RestoreEnumBlacklist(enumblacklistspace);
+
+       /* Attach to the leader's serializable transaction, if SERIALIZABLE. */
+       AttachSerializableXact(fps->serializable_xact_handle);
+
        /*
         * We've initialized all of our state now; nothing should change
         * hereafter.
@@ -1390,7 +1433,7 @@ ParallelWorkerMain(Datum main_arg)
        /* Must exit parallel mode to pop active snapshot. */
        ExitParallelMode();
 
-       /* Must pop active snapshot so resowner.c doesn't complain. */
+       /* Must pop active snapshot so snapmgr.c doesn't complain. */
        PopActiveSnapshot();
 
        /* Shut down the parallel-worker transaction. */