]> granicus.if.org Git - postgresql/commitdiff
Transfer state pertaining to pending REINDEX operations to workers.
authorRobert Haas <rhaas@postgresql.org>
Fri, 19 Jan 2018 12:48:44 +0000 (07:48 -0500)
committerRobert Haas <rhaas@postgresql.org>
Fri, 19 Jan 2018 12:48:54 +0000 (07:48 -0500)
This will allow the pending patch for parallel CREATE INDEX to work
on system catalogs, and to provide the same level of protection
against use of user indexes while they are being rebuilt that we
have for non-parallel CREATE INDEX.

Patch by me, reviewed by Peter Geoghegan.

Discussion: http://postgr.es/m/CA+TgmoYN-YQU9JsGQcqFLovZ-C+Xgp1_xhJQad=cunGG-_p5gg@mail.gmail.com
Discussion: http://postgr.es/m/CAH2-Wzkv4UNkXYhqQRqk-u9rS7h5c-4cCW+EqQ8K_WSeS43aZg@mail.gmail.com

src/backend/access/transam/README.parallel
src/backend/access/transam/parallel.c
src/backend/catalog/index.c
src/include/catalog/index.h

index 5c33c40ae957080cef3c94ed4145556016436f11..32994719e3b039fec02b9bb54689ec410f1ca5f0 100644 (file)
@@ -122,6 +122,9 @@ worker.  This includes:
     values are restored, this incidentally sets SessionUserId and OuterUserId
     to the correct values.  This final step restores CurrentUserId.
 
+  - State related to pending REINDEX operations, which prevents access to
+    an index that is currently being rebuilt.
+
 To prevent undetected or unprincipled deadlocks when running in parallel mode,
 this could should eventually handle heavyweight locks in some way.  This is
 not implemented yet.
index f720896e5071f2a4db70655daaf05a2660b2930b..0a0157a8781b88c161a2e86652de1f499018ba28 100644 (file)
@@ -18,6 +18,7 @@
 #include "access/session.h"
 #include "access/xact.h"
 #include "access/xlog.h"
+#include "catalog/index.h"
 #include "catalog/namespace.h"
 #include "commands/async.h"
 #include "executor/execParallel.h"
@@ -67,6 +68,7 @@
 #define PARALLEL_KEY_TRANSACTION_STATE         UINT64CONST(0xFFFFFFFFFFFF0008)
 #define PARALLEL_KEY_ENTRYPOINT                                UINT64CONST(0xFFFFFFFFFFFF0009)
 #define PARALLEL_KEY_SESSION_DSM                       UINT64CONST(0xFFFFFFFFFFFF000A)
+#define PARALLEL_KEY_REINDEX_STATE                     UINT64CONST(0xFFFFFFFFFFFF000B)
 
 /* Fixed-size parallel state. */
 typedef struct FixedParallelState
@@ -200,6 +202,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
        Size            tsnaplen = 0;
        Size            asnaplen = 0;
        Size            tstatelen = 0;
+       Size            reindexlen = 0;
        Size            segsize = 0;
        int                     i;
        FixedParallelState *fps;
@@ -249,8 +252,10 @@ InitializeParallelDSM(ParallelContext *pcxt)
                tstatelen = EstimateTransactionStateSpace();
                shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
                shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
+               reindexlen = EstimateReindexStateSpace();
+               shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
                /* If you add more chunks here, you probably need to add keys. */
-               shm_toc_estimate_keys(&pcxt->estimator, 7);
+               shm_toc_estimate_keys(&pcxt->estimator, 8);
 
                /* Estimate space need for error queues. */
                StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
@@ -319,6 +324,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
                char       *tsnapspace;
                char       *asnapspace;
                char       *tstatespace;
+               char       *reindexspace;
                char       *error_queue_space;
                char       *session_dsm_handle_space;
                char       *entrypointstate;
@@ -360,6 +366,11 @@ InitializeParallelDSM(ParallelContext *pcxt)
                SerializeTransactionState(tstatelen, tstatespace);
                shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
 
+               /* Serialize reindex state. */
+               reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
+               SerializeReindexState(reindexlen, reindexspace);
+               shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);
+
                /* Allocate space for worker information. */
                pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
 
@@ -972,6 +983,7 @@ ParallelWorkerMain(Datum main_arg)
        char       *tsnapspace;
        char       *asnapspace;
        char       *tstatespace;
+       char       *reindexspace;
        StringInfoData msgbuf;
        char       *session_dsm_handle_space;
 
@@ -1137,6 +1149,10 @@ ParallelWorkerMain(Datum main_arg)
        /* Set ParallelMasterBackendId so we know how to address temp relations. */
        ParallelMasterBackendId = fps->parallel_master_backend_id;
 
+       /* Restore reindex state. */
+       reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
+       RestoreReindexState(reindexspace);
+
        /*
         * We've initialized all of our state now; nothing should change
         * hereafter.
index 330488b96f559f4429a09a02eaaaf33275defddc..007b929a6faaef4ab03235d11ff58954c0ce0cfd 100644 (file)
@@ -86,6 +86,18 @@ typedef struct
                                tups_inserted;
 } v_i_state;
 
+/*
+ * Pointer-free representation of variables used when reindexing system
+ * catalogs; we use this to propagate those values to parallel workers.
+ */
+typedef struct
+{
+       Oid                     currentlyReindexedHeap;
+       Oid                     currentlyReindexedIndex;
+       int                     numPendingReindexedIndexes;
+       Oid                     pendingReindexedIndexes[FLEXIBLE_ARRAY_MEMBER];
+} SerializedReindexState;
+
 /* non-export function prototypes */
 static bool relationHasPrimaryKey(Relation rel);
 static TupleDesc ConstructTupleDescriptor(Relation heapRelation,
@@ -3653,7 +3665,8 @@ reindex_relation(Oid relid, int flags, int options)
  * When we are busy reindexing a system index, this code provides support
  * for preventing catalog lookups from using that index.  We also make use
  * of this to catch attempted uses of user indexes during reindexing of
- * those indexes.
+ * those indexes.  This information is propagated to parallel workers;
+ * attempting to change it during a parallel operation is not permitted.
  * ----------------------------------------------------------------
  */
 
@@ -3719,6 +3732,8 @@ SetReindexProcessing(Oid heapOid, Oid indexOid)
 static void
 ResetReindexProcessing(void)
 {
+       if (IsInParallelMode())
+               elog(ERROR, "cannot modify reindex state during a parallel operation");
        currentlyReindexedHeap = InvalidOid;
        currentlyReindexedIndex = InvalidOid;
 }
@@ -3736,6 +3751,8 @@ SetReindexPending(List *indexes)
        /* Reindexing is not re-entrant. */
        if (pendingReindexedIndexes)
                elog(ERROR, "cannot reindex while reindexing");
+       if (IsInParallelMode())
+               elog(ERROR, "cannot modify reindex state during a parallel operation");
        pendingReindexedIndexes = list_copy(indexes);
 }
 
@@ -3746,6 +3763,8 @@ SetReindexPending(List *indexes)
 static void
 RemoveReindexPending(Oid indexOid)
 {
+       if (IsInParallelMode())
+               elog(ERROR, "cannot modify reindex state during a parallel operation");
        pendingReindexedIndexes = list_delete_oid(pendingReindexedIndexes,
                                                                                          indexOid);
 }
@@ -3757,5 +3776,59 @@ RemoveReindexPending(Oid indexOid)
 static void
 ResetReindexPending(void)
 {
+       if (IsInParallelMode())
+               elog(ERROR, "cannot modify reindex state during a parallel operation");
        pendingReindexedIndexes = NIL;
 }
+
+/*
+ * EstimateReindexStateSpace
+ *             Estimate space needed to pass reindex state to parallel workers.
+ */
+extern Size
+EstimateReindexStateSpace(void)
+{
+       return offsetof(SerializedReindexState, pendingReindexedIndexes)
+               + mul_size(sizeof(Oid), list_length(pendingReindexedIndexes));
+}
+
+/*
+ * SerializeReindexState
+ *             Serialize reindex state for parallel workers.
+ */
+extern void
+SerializeReindexState(Size maxsize, char *start_address)
+{
+       SerializedReindexState *sistate = (SerializedReindexState *) start_address;
+       int                     c = 0;
+       ListCell   *lc;
+
+       sistate->currentlyReindexedHeap = currentlyReindexedHeap;
+       sistate->currentlyReindexedIndex = currentlyReindexedIndex;
+       sistate->numPendingReindexedIndexes = list_length(pendingReindexedIndexes);
+       foreach(lc, pendingReindexedIndexes)
+               sistate->pendingReindexedIndexes[c++] = lfirst_oid(lc);
+}
+
+/*
+ * RestoreReindexState
+ *             Restore reindex state in a parallel worker.
+ */
+extern void
+RestoreReindexState(void *reindexstate)
+{
+       SerializedReindexState *sistate = (SerializedReindexState *) reindexstate;
+       int                     c = 0;
+       MemoryContext   oldcontext;
+
+       currentlyReindexedHeap = sistate->currentlyReindexedHeap;
+       currentlyReindexedIndex = sistate->currentlyReindexedIndex;
+
+       Assert(pendingReindexedIndexes == NIL);
+       oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+       for (c = 0; c < sistate->numPendingReindexedIndexes; ++c)
+               pendingReindexedIndexes =
+                       lappend_oid(pendingReindexedIndexes,
+                                               sistate->pendingReindexedIndexes[c]);
+       MemoryContextSwitchTo(oldcontext);
+}
index 12bf35567a7e334a9cf36624c2c6bfac4bd3f2f4..4790f0c735f4c258b68981437b130bd43f841937 100644 (file)
@@ -134,4 +134,8 @@ extern bool ReindexIsProcessingHeap(Oid heapOid);
 extern bool ReindexIsProcessingIndex(Oid indexOid);
 extern Oid     IndexGetRelation(Oid indexId, bool missing_ok);
 
+extern Size EstimateReindexStateSpace(void);
+extern void SerializeReindexState(Size maxsize, char *start_address);
+extern void RestoreReindexState(void *reindexstate);
+
 #endif                                                 /* INDEX_H */