#include "access/session.h"
#include "access/xact.h"
#include "access/xlog.h"
+#include "catalog/index.h"
#include "catalog/namespace.h"
#include "commands/async.h"
#include "executor/execParallel.h"
#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
#define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
+#define PARALLEL_KEY_REINDEX_STATE UINT64CONST(0xFFFFFFFFFFFF000B)
/* Fixed-size parallel state. */
typedef struct FixedParallelState
Size tsnaplen = 0;
Size asnaplen = 0;
Size tstatelen = 0;
+ Size reindexlen = 0;
Size segsize = 0;
int i;
FixedParallelState *fps;
tstatelen = EstimateTransactionStateSpace();
shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
+ reindexlen = EstimateReindexStateSpace();
+ shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
/* If you add more chunks here, you probably need to add keys. */
- shm_toc_estimate_keys(&pcxt->estimator, 7);
+ shm_toc_estimate_keys(&pcxt->estimator, 8);
/* Estimate space need for error queues. */
StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
char *tsnapspace;
char *asnapspace;
char *tstatespace;
+ char *reindexspace;
char *error_queue_space;
char *session_dsm_handle_space;
char *entrypointstate;
SerializeTransactionState(tstatelen, tstatespace);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
+ /* Serialize reindex state. */
+ reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
+ SerializeReindexState(reindexlen, reindexspace);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);
+
/* Allocate space for worker information. */
pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
char *tsnapspace;
char *asnapspace;
char *tstatespace;
+ char *reindexspace;
StringInfoData msgbuf;
char *session_dsm_handle_space;
/* Set ParallelMasterBackendId so we know how to address temp relations. */
ParallelMasterBackendId = fps->parallel_master_backend_id;
+ /* Restore reindex state. */
+ reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
+ RestoreReindexState(reindexspace);
+
/*
* We've initialized all of our state now; nothing should change
* hereafter.
tups_inserted;
} v_i_state;
+/*
+ * Pointer-free representation of variables used when reindexing system
+ * catalogs; we use this to propagate those values to parallel workers.
+ */
+typedef struct
+{
+ Oid currentlyReindexedHeap;
+ Oid currentlyReindexedIndex;
+ int numPendingReindexedIndexes;
+ Oid pendingReindexedIndexes[FLEXIBLE_ARRAY_MEMBER];
+} SerializedReindexState;
+
/* non-export function prototypes */
static bool relationHasPrimaryKey(Relation rel);
static TupleDesc ConstructTupleDescriptor(Relation heapRelation,
* When we are busy reindexing a system index, this code provides support
* for preventing catalog lookups from using that index. We also make use
* of this to catch attempted uses of user indexes during reindexing of
- * those indexes.
+ * those indexes. This information is propagated to parallel workers;
+ * attempting to change it during a parallel operation is not permitted.
* ----------------------------------------------------------------
*/
static void
ResetReindexProcessing(void)
{
+ if (IsInParallelMode())
+ elog(ERROR, "cannot modify reindex state during a parallel operation");
currentlyReindexedHeap = InvalidOid;
currentlyReindexedIndex = InvalidOid;
}
/* Reindexing is not re-entrant. */
if (pendingReindexedIndexes)
elog(ERROR, "cannot reindex while reindexing");
+ if (IsInParallelMode())
+ elog(ERROR, "cannot modify reindex state during a parallel operation");
pendingReindexedIndexes = list_copy(indexes);
}
static void
RemoveReindexPending(Oid indexOid)
{
+ if (IsInParallelMode())
+ elog(ERROR, "cannot modify reindex state during a parallel operation");
pendingReindexedIndexes = list_delete_oid(pendingReindexedIndexes,
indexOid);
}
static void
ResetReindexPending(void)
{
+ if (IsInParallelMode())
+ elog(ERROR, "cannot modify reindex state during a parallel operation");
pendingReindexedIndexes = NIL;
}
+
+/*
+ * EstimateReindexStateSpace
+ * Estimate space needed to pass reindex state to parallel workers.
+ */
+extern Size
+EstimateReindexStateSpace(void)
+{
+ return offsetof(SerializedReindexState, pendingReindexedIndexes)
+ + mul_size(sizeof(Oid), list_length(pendingReindexedIndexes));
+}
+
+/*
+ * SerializeReindexState
+ * Serialize reindex state for parallel workers.
+ */
+extern void
+SerializeReindexState(Size maxsize, char *start_address)
+{
+ SerializedReindexState *sistate = (SerializedReindexState *) start_address;
+ int c = 0;
+ ListCell *lc;
+
+ sistate->currentlyReindexedHeap = currentlyReindexedHeap;
+ sistate->currentlyReindexedIndex = currentlyReindexedIndex;
+ sistate->numPendingReindexedIndexes = list_length(pendingReindexedIndexes);
+ foreach(lc, pendingReindexedIndexes)
+ sistate->pendingReindexedIndexes[c++] = lfirst_oid(lc);
+}
+
+/*
+ * RestoreReindexState
+ * Restore reindex state in a parallel worker.
+ */
+extern void
+RestoreReindexState(void *reindexstate)
+{
+ SerializedReindexState *sistate = (SerializedReindexState *) reindexstate;
+ int c = 0;
+ MemoryContext oldcontext;
+
+ currentlyReindexedHeap = sistate->currentlyReindexedHeap;
+ currentlyReindexedIndex = sistate->currentlyReindexedIndex;
+
+ Assert(pendingReindexedIndexes == NIL);
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ for (c = 0; c < sistate->numPendingReindexedIndexes; ++c)
+ pendingReindexedIndexes =
+ lappend_oid(pendingReindexedIndexes,
+ sistate->pendingReindexedIndexes[c]);
+ MemoryContextSwitchTo(oldcontext);
+}