]> granicus.if.org Git - postgresql/blobdiff - src/backend/access/transam/parallel.c
Fix initialization of fake LSN for unlogged relations
[postgresql] / src / backend / access / transam / parallel.c
index cdaa32e29a487281e91d6068dda4c750fdadb89a..55d129a64f7f344b3136e79609377974e073006a 100644 (file)
@@ -3,7 +3,7 @@
  * parallel.c
  *       Infrastructure for launching parallel workers
  *
- * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
@@ -19,6 +19,7 @@
 #include "access/session.h"
 #include "access/xact.h"
 #include "access/xlog.h"
+#include "catalog/pg_enum.h"
 #include "catalog/index.h"
 #include "catalog/namespace.h"
 #include "commands/async.h"
 #include "libpq/pqformat.h"
 #include "libpq/pqmq.h"
 #include "miscadmin.h"
-#include "optimizer/planmain.h"
+#include "optimizer/optimizer.h"
 #include "pgstat.h"
 #include "storage/ipc.h"
+#include "storage/predicate.h"
 #include "storage/sinval.h"
 #include "storage/spin.h"
 #include "tcop/tcopprot.h"
@@ -71,6 +73,7 @@
 #define PARALLEL_KEY_SESSION_DSM                       UINT64CONST(0xFFFFFFFFFFFF000A)
 #define PARALLEL_KEY_REINDEX_STATE                     UINT64CONST(0xFFFFFFFFFFFF000B)
 #define PARALLEL_KEY_RELMAPPER_STATE           UINT64CONST(0xFFFFFFFFFFFF000C)
+#define PARALLEL_KEY_ENUMBLACKLIST                     UINT64CONST(0xFFFFFFFFFFFF000D)
 
 /* Fixed-size parallel state. */
 typedef struct FixedParallelState
@@ -87,6 +90,9 @@ typedef struct FixedParallelState
        PGPROC     *parallel_master_pgproc;
        pid_t           parallel_master_pid;
        BackendId       parallel_master_backend_id;
+       TimestampTz xact_ts;
+       TimestampTz stmt_ts;
+       SerializableXactHandle serializable_xact_handle;
 
        /* Mutex protects remaining fields. */
        slock_t         mutex;
@@ -151,7 +157,7 @@ static void ParallelWorkerShutdown(int code, Datum arg);
  */
 ParallelContext *
 CreateParallelContext(const char *library_name, const char *function_name,
-                                         int nworkers, bool serializable_okay)
+                                         int nworkers)
 {
        MemoryContext oldcontext;
        ParallelContext *pcxt;
@@ -162,16 +168,6 @@ CreateParallelContext(const char *library_name, const char *function_name,
        /* Number of workers should be non-negative. */
        Assert(nworkers >= 0);
 
-       /*
-        * If we are running under serializable isolation, we can't use parallel
-        * workers, at least not until somebody enhances that mechanism to be
-        * parallel-aware.  Utility statement callers may ask us to ignore this
-        * restriction because they're always able to safely ignore the fact that
-        * SIREAD locks do not work with parallelism.
-        */
-       if (IsolationIsSerializable() && !serializable_okay)
-               nworkers = 0;
-
        /* We might be running in a short-lived memory context. */
        oldcontext = MemoryContextSwitchTo(TopTransactionContext);
 
@@ -208,6 +204,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
        Size            tstatelen = 0;
        Size            reindexlen = 0;
        Size            relmapperlen = 0;
+       Size            enumblacklistlen = 0;
        Size            segsize = 0;
        int                     i;
        FixedParallelState *fps;
@@ -261,8 +258,10 @@ InitializeParallelDSM(ParallelContext *pcxt)
                shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
                relmapperlen = EstimateRelationMapSpace();
                shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen);
+               enumblacklistlen = EstimateEnumBlacklistSpace();
+               shm_toc_estimate_chunk(&pcxt->estimator, enumblacklistlen);
                /* If you add more chunks here, you probably need to add keys. */
-               shm_toc_estimate_keys(&pcxt->estimator, 9);
+               shm_toc_estimate_keys(&pcxt->estimator, 10);
 
                /* Estimate space need for error queues. */
                StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
@@ -318,6 +317,9 @@ InitializeParallelDSM(ParallelContext *pcxt)
        fps->parallel_master_pgproc = MyProc;
        fps->parallel_master_pid = MyProcPid;
        fps->parallel_master_backend_id = MyBackendId;
+       fps->xact_ts = GetCurrentTransactionStartTimestamp();
+       fps->stmt_ts = GetCurrentStatementStartTimestamp();
+       fps->serializable_xact_handle = ShareSerializableXact();
        SpinLockInit(&fps->mutex);
        fps->last_xlog_end = 0;
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
@@ -336,6 +338,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
                char       *error_queue_space;
                char       *session_dsm_handle_space;
                char       *entrypointstate;
+               char       *enumblacklistspace;
                Size            lnamelen;
 
                /* Serialize shared libraries we have loaded. */
@@ -385,6 +388,12 @@ InitializeParallelDSM(ParallelContext *pcxt)
                shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE,
                                           relmapperspace);
 
+               /* Serialize enum blacklist state. */
+               enumblacklistspace = shm_toc_allocate(pcxt->toc, enumblacklistlen);
+               SerializeEnumBlacklist(enumblacklistspace, enumblacklistlen);
+               shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENUMBLACKLIST,
+                                          enumblacklistspace);
+
                /* Allocate space for worker information. */
                pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
 
@@ -676,13 +685,9 @@ WaitForParallelWorkersToAttach(ParallelContext *pcxt)
                                 * just end up waiting for the same worker again.
                                 */
                                rc = WaitLatch(MyLatch,
-                                                          WL_LATCH_SET | WL_POSTMASTER_DEATH,
+                                                          WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
                                                           -1, WAIT_EVENT_BGWORKER_STARTUP);
 
-                               /* emergency bailout if postmaster has died */
-                               if (rc & WL_POSTMASTER_DEATH)
-                                       proc_exit(1);
-
                                if (rc & WL_LATCH_SET)
                                        ResetLatch(MyLatch);
                        }
@@ -799,8 +804,8 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
                        }
                }
 
-               WaitLatch(MyLatch, WL_LATCH_SET, -1,
-                                 WAIT_EVENT_PARALLEL_FINISH);
+               (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
+                                                WAIT_EVENT_PARALLEL_FINISH);
                ResetLatch(MyLatch);
        }
 
@@ -1218,6 +1223,7 @@ ParallelWorkerMain(Datum main_arg)
        char       *tstatespace;
        char       *reindexspace;
        char       *relmapperspace;
+       char       *enumblacklistspace;
        StringInfoData msgbuf;
        char       *session_dsm_handle_space;
 
@@ -1311,6 +1317,13 @@ ParallelWorkerMain(Datum main_arg)
                                                           fps->parallel_master_pid))
                return;
 
+       /*
+        * Restore transaction and statement start-time timestamps.  This must
+        * happen before anything that would start a transaction, else asserts in
+        * xact.c will fire.
+        */
+       SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);
+
        /*
         * Identify the entry point to be called.  In theory this could result in
         * loading an additional library, though most likely the entry point is in
@@ -1397,6 +1410,14 @@ ParallelWorkerMain(Datum main_arg)
        relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false);
        RestoreRelationMap(relmapperspace);
 
+       /* Restore enum blacklist. */
+       enumblacklistspace = shm_toc_lookup(toc, PARALLEL_KEY_ENUMBLACKLIST,
+                                                                               false);
+       RestoreEnumBlacklist(enumblacklistspace);
+
+       /* Attach to the leader's serializable transaction, if SERIALIZABLE. */
+       AttachSerializableXact(fps->serializable_xact_handle);
+
        /*
         * We've initialized all of our state now; nothing should change
         * hereafter.