#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/memutils.h"
+#include "utils/relmapper.h"
#include "utils/snapmgr.h"
#include "utils/typcache.h"
#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
#define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
#define PARALLEL_KEY_REINDEX_STATE UINT64CONST(0xFFFFFFFFFFFF000B)
+#define PARALLEL_KEY_RELMAPPER_STATE UINT64CONST(0xFFFFFFFFFFFF000C)
/* Fixed-size parallel state. */
typedef struct FixedParallelState
Size asnaplen = 0;
Size tstatelen = 0;
Size reindexlen = 0;
+ Size relmapperlen = 0;
Size segsize = 0;
int i;
FixedParallelState *fps;
shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
reindexlen = EstimateReindexStateSpace();
shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
+ relmapperlen = EstimateRelationMapSpace();
+ shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen);
/* If you add more chunks here, you probably need to add keys. */
- shm_toc_estimate_keys(&pcxt->estimator, 8);
+ shm_toc_estimate_keys(&pcxt->estimator, 9);
/* Estimate space need for error queues. */
StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
char *asnapspace;
char *tstatespace;
char *reindexspace;
+ char *relmapperspace;
char *error_queue_space;
char *session_dsm_handle_space;
char *entrypointstate;
SerializeReindexState(reindexlen, reindexspace);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);
+ /* Serialize relmapper state. */
+ relmapperspace = shm_toc_allocate(pcxt->toc, relmapperlen);
+ SerializeRelationMap(relmapperlen, relmapperspace);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE,
+ relmapperspace);
+
/* Allocate space for worker information. */
pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
char *asnapspace;
char *tstatespace;
char *reindexspace;
+ char *relmapperspace;
StringInfoData msgbuf;
char *session_dsm_handle_space;
reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
RestoreReindexState(reindexspace);
+ /* Restore relmapper state. */
+ relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false);
+ RestoreRelationMap(relmapperspace);
+
/*
* We've initialized all of our state now; nothing should change
* hereafter.
int32 pad; /* to make the struct size be 512 exactly */
} RelMapFile;
+/*
+ * State for serializing local and shared relmappings for parallel workers
+ * (active states only). See notes on active_* and pending_* updates state.
+ */
+typedef struct SerializedActiveRelMaps
+{
+ RelMapFile active_shared_updates;
+ RelMapFile active_local_updates;
+} SerializedActiveRelMaps;
+
/*
* The currently known contents of the shared map file and our database's
* local map file are stored here. These can be reloaded from disk
* they will become active at the next CommandCounterIncrement. This setup
* lets map updates act similarly to updates of pg_class rows, ie, they
* become visible only at the next CommandCounterIncrement boundary.
+ *
+ * Active shared and active local updates are serialized by the parallel
+ * infrastructure, and deserialized within parallel workers.
*/
static RelMapFile active_shared_updates;
static RelMapFile active_local_updates;
else
{
/*
- * We don't currently support map changes within subtransactions. This
- * could be done with more bookkeeping infrastructure, but it doesn't
- * presently seem worth it.
+ * We don't currently support map changes within subtransactions, or
+ * when in parallel mode. This could be done with more bookkeeping
+ * infrastructure, but it doesn't presently seem worth it.
*/
if (GetCurrentTransactionNestLevel() > 1)
elog(ERROR, "cannot change relation mapping within subtransaction");
+ if (IsInParallelMode())
+ elog(ERROR, "cannot change relation mapping in parallel mode");
+
if (immediate)
{
/* Make it active, but only locally */
*
* During abort, we just have to throw away any pending map changes.
* Normal post-abort cleanup will take care of fixing relcache entries.
+ * Parallel worker commit/abort is handled by resetting active mappings
+ * that may have been received from the leader process. (There should be
+ * no pending updates in parallel workers.)
*/
void
-AtEOXact_RelationMap(bool isCommit)
+AtEOXact_RelationMap(bool isCommit, bool isParallelWorker)
{
- if (isCommit)
+ if (isCommit && !isParallelWorker)
{
/*
* We should not get here with any "pending" updates. (We could
}
else
{
- /* Abort --- drop all local and pending updates */
+ /* Abort or parallel worker --- drop all local and pending updates */
+ Assert(!isParallelWorker || pending_shared_updates.num_mappings == 0);
+ Assert(!isParallelWorker || pending_local_updates.num_mappings == 0);
+
active_shared_updates.num_mappings = 0;
active_local_updates.num_mappings = 0;
pending_shared_updates.num_mappings = 0;
load_relmap_file(false);
}
+/*
+ * EstimateRelationMapSpace
+ *
+ * Estimate space needed to pass active shared and local relmaps to parallel
+ * workers.
+ */
+Size
+EstimateRelationMapSpace(void)
+{
+ return sizeof(SerializedActiveRelMaps);
+}
+
+/*
+ * SerializeRelationMap
+ *
+ * Serialize active shared and local relmap state for parallel workers.
+ */
+void
+SerializeRelationMap(Size maxSize, char *startAddress)
+{
+ SerializedActiveRelMaps *relmaps;
+
+ Assert(maxSize >= EstimateRelationMapSpace());
+
+ relmaps = (SerializedActiveRelMaps *) startAddress;
+ relmaps->active_shared_updates = active_shared_updates;
+ relmaps->active_local_updates = active_local_updates;
+}
+
+/*
+ * RestoreRelationMap
+ *
+ * Restore active shared and local relmap state within a parallel worker.
+ */
+void
+RestoreRelationMap(char *startAddress)
+{
+ SerializedActiveRelMaps *relmaps;
+
+ if (active_shared_updates.num_mappings != 0 ||
+ active_local_updates.num_mappings != 0 ||
+ pending_shared_updates.num_mappings != 0 ||
+ pending_local_updates.num_mappings != 0)
+ elog(ERROR, "parallel worker has existing mappings");
+
+ relmaps = (SerializedActiveRelMaps *) startAddress;
+ active_shared_updates = relmaps->active_shared_updates;
+ active_local_updates = relmaps->active_local_updates;
+}
+
/*
* load_relmap_file -- load data from the shared or local map file
*
extern void RelationMapInvalidateAll(void);
extern void AtCCI_RelationMap(void);
-extern void AtEOXact_RelationMap(bool isCommit);
+extern void AtEOXact_RelationMap(bool isCommit, bool isParallelWorker);
extern void AtPrepare_RelationMap(void);
extern void CheckPointRelationMap(void);
extern void RelationMapInitializePhase2(void);
extern void RelationMapInitializePhase3(void);
+extern Size EstimateRelationMapSpace(void);
+extern void SerializeRelationMap(Size maxSize, char *startAddress);
+extern void RestoreRelationMap(char *startAddress);
+
extern void relmap_redo(XLogReaderState *record);
extern void relmap_desc(StringInfo buf, XLogReaderState *record);
extern const char *relmap_identify(uint8 info);