From: Robert Haas Date: Fri, 19 Jan 2018 12:48:44 +0000 (-0500) Subject: Transfer state pertaining to pending REINDEX operations to workers. X-Git-Tag: REL_11_BETA1~923 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=29d58fd3;p=postgresql Transfer state pertaining to pending REINDEX operations to workers. This will allow the pending patch for parallel CREATE INDEX to work on system catalogs, and to provide the same level of protection against use of user indexes while they are being rebuilt that we have for non-parallel CREATE INDEX. Patch by me, reviewed by Peter Geoghegan. Discussion: http://postgr.es/m/CA+TgmoYN-YQU9JsGQcqFLovZ-C+Xgp1_xhJQad=cunGG-_p5gg@mail.gmail.com Discussion: http://postgr.es/m/CAH2-Wzkv4UNkXYhqQRqk-u9rS7h5c-4cCW+EqQ8K_WSeS43aZg@mail.gmail.com --- diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel index 5c33c40ae9..32994719e3 100644 --- a/src/backend/access/transam/README.parallel +++ b/src/backend/access/transam/README.parallel @@ -122,6 +122,9 @@ worker. This includes: values are restored, this incidentally sets SessionUserId and OuterUserId to the correct values. This final step restores CurrentUserId. + - State related to pending REINDEX operations, which prevents access to + an index that is currently being rebuilt. + To prevent undetected or unprincipled deadlocks when running in parallel mode, this could should eventually handle heavyweight locks in some way. This is not implemented yet. diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index f720896e50..0a0157a878 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -18,6 +18,7 @@ #include "access/session.h" #include "access/xact.h" #include "access/xlog.h" +#include "catalog/index.h" #include "catalog/namespace.h" #include "commands/async.h" #include "executor/execParallel.h" @@ -67,6 +68,7 @@ #define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008) #define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009) #define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A) +#define PARALLEL_KEY_REINDEX_STATE UINT64CONST(0xFFFFFFFFFFFF000B) /* Fixed-size parallel state. */ typedef struct FixedParallelState @@ -200,6 +202,7 @@ InitializeParallelDSM(ParallelContext *pcxt) Size tsnaplen = 0; Size asnaplen = 0; Size tstatelen = 0; + Size reindexlen = 0; Size segsize = 0; int i; FixedParallelState *fps; @@ -249,8 +252,10 @@ InitializeParallelDSM(ParallelContext *pcxt) tstatelen = EstimateTransactionStateSpace(); shm_toc_estimate_chunk(&pcxt->estimator, tstatelen); shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle)); + reindexlen = EstimateReindexStateSpace(); + shm_toc_estimate_chunk(&pcxt->estimator, reindexlen); /* If you add more chunks here, you probably need to add keys. */ - shm_toc_estimate_keys(&pcxt->estimator, 7); + shm_toc_estimate_keys(&pcxt->estimator, 8); /* Estimate space need for error queues. */ StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) == @@ -319,6 +324,7 @@ InitializeParallelDSM(ParallelContext *pcxt) char *tsnapspace; char *asnapspace; char *tstatespace; + char *reindexspace; char *error_queue_space; char *session_dsm_handle_space; char *entrypointstate; @@ -360,6 +366,11 @@ InitializeParallelDSM(ParallelContext *pcxt) SerializeTransactionState(tstatelen, tstatespace); shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace); + /* Serialize reindex state. */ + reindexspace = shm_toc_allocate(pcxt->toc, reindexlen); + SerializeReindexState(reindexlen, reindexspace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace); + /* Allocate space for worker information. */ pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers); @@ -972,6 +983,7 @@ ParallelWorkerMain(Datum main_arg) char *tsnapspace; char *asnapspace; char *tstatespace; + char *reindexspace; StringInfoData msgbuf; char *session_dsm_handle_space; @@ -1137,6 +1149,10 @@ ParallelWorkerMain(Datum main_arg) /* Set ParallelMasterBackendId so we know how to address temp relations. */ ParallelMasterBackendId = fps->parallel_master_backend_id; + /* Restore reindex state. */ + reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false); + RestoreReindexState(reindexspace); + /* * We've initialized all of our state now; nothing should change * hereafter. diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 330488b96f..007b929a6f 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -86,6 +86,18 @@ typedef struct tups_inserted; } v_i_state; +/* + * Pointer-free representation of variables used when reindexing system + * catalogs; we use this to propagate those values to parallel workers. + */ +typedef struct +{ + Oid currentlyReindexedHeap; + Oid currentlyReindexedIndex; + int numPendingReindexedIndexes; + Oid pendingReindexedIndexes[FLEXIBLE_ARRAY_MEMBER]; +} SerializedReindexState; + /* non-export function prototypes */ static bool relationHasPrimaryKey(Relation rel); static TupleDesc ConstructTupleDescriptor(Relation heapRelation, @@ -3653,7 +3665,8 @@ reindex_relation(Oid relid, int flags, int options) * When we are busy reindexing a system index, this code provides support * for preventing catalog lookups from using that index. We also make use * of this to catch attempted uses of user indexes during reindexing of - * those indexes. + * those indexes. This information is propagated to parallel workers; + * attempting to change it during a parallel operation is not permitted. * ---------------------------------------------------------------- */ @@ -3719,6 +3732,8 @@ SetReindexProcessing(Oid heapOid, Oid indexOid) static void ResetReindexProcessing(void) { + if (IsInParallelMode()) + elog(ERROR, "cannot modify reindex state during a parallel operation"); currentlyReindexedHeap = InvalidOid; currentlyReindexedIndex = InvalidOid; } @@ -3736,6 +3751,8 @@ SetReindexPending(List *indexes) /* Reindexing is not re-entrant. */ if (pendingReindexedIndexes) elog(ERROR, "cannot reindex while reindexing"); + if (IsInParallelMode()) + elog(ERROR, "cannot modify reindex state during a parallel operation"); pendingReindexedIndexes = list_copy(indexes); } @@ -3746,6 +3763,8 @@ SetReindexPending(List *indexes) static void RemoveReindexPending(Oid indexOid) { + if (IsInParallelMode()) + elog(ERROR, "cannot modify reindex state during a parallel operation"); pendingReindexedIndexes = list_delete_oid(pendingReindexedIndexes, indexOid); } @@ -3757,5 +3776,59 @@ RemoveReindexPending(Oid indexOid) static void ResetReindexPending(void) { + if (IsInParallelMode()) + elog(ERROR, "cannot modify reindex state during a parallel operation"); pendingReindexedIndexes = NIL; } + +/* + * EstimateReindexStateSpace + * Estimate space needed to pass reindex state to parallel workers. + */ +extern Size +EstimateReindexStateSpace(void) +{ + return offsetof(SerializedReindexState, pendingReindexedIndexes) + + mul_size(sizeof(Oid), list_length(pendingReindexedIndexes)); +} + +/* + * SerializeReindexState + * Serialize reindex state for parallel workers. + */ +extern void +SerializeReindexState(Size maxsize, char *start_address) +{ + SerializedReindexState *sistate = (SerializedReindexState *) start_address; + int c = 0; + ListCell *lc; + + sistate->currentlyReindexedHeap = currentlyReindexedHeap; + sistate->currentlyReindexedIndex = currentlyReindexedIndex; + sistate->numPendingReindexedIndexes = list_length(pendingReindexedIndexes); + foreach(lc, pendingReindexedIndexes) + sistate->pendingReindexedIndexes[c++] = lfirst_oid(lc); +} + +/* + * RestoreReindexState + * Restore reindex state in a parallel worker. + */ +extern void +RestoreReindexState(void *reindexstate) +{ + SerializedReindexState *sistate = (SerializedReindexState *) reindexstate; + int c = 0; + MemoryContext oldcontext; + + currentlyReindexedHeap = sistate->currentlyReindexedHeap; + currentlyReindexedIndex = sistate->currentlyReindexedIndex; + + Assert(pendingReindexedIndexes == NIL); + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + for (c = 0; c < sistate->numPendingReindexedIndexes; ++c) + pendingReindexedIndexes = + lappend_oid(pendingReindexedIndexes, + sistate->pendingReindexedIndexes[c]); + MemoryContextSwitchTo(oldcontext); +} diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h index 12bf35567a..4790f0c735 100644 --- a/src/include/catalog/index.h +++ b/src/include/catalog/index.h @@ -134,4 +134,8 @@ extern bool ReindexIsProcessingHeap(Oid heapOid); extern bool ReindexIsProcessingIndex(Oid indexOid); extern Oid IndexGetRelation(Oid indexId, bool missing_ok); +extern Size EstimateReindexStateSpace(void); +extern void SerializeReindexState(Size maxsize, char *start_address); +extern void RestoreReindexState(void *reindexstate); + #endif /* INDEX_H */