--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * session.c
+ * Encapsulation of user session.
+ *
+ * This is intended to contain data that needs to be shared between backends
+ * performing work for a client session. In particular such a session is
+ * shared between the leader and worker processes for parallel queries. At
+ * some later point it might also become useful infrastructure for separating
+ * backends from client connections, e.g. for the purpose of pooling.
+ *
+ * Currently this infrastructure is used to share:
+ * - typemod registry for ephemeral row-types, i.e. BlessTupleDesc etc.
+ *
+ * Portions Copyright (c) 2017, PostgreSQL Global Development Group
+ *
+ * src/backend/access/common/session.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/session.h"
+#include "storage/lwlock.h"
+#include "storage/shm_toc.h"
+#include "utils/memutils.h"
+#include "utils/typcache.h"
+
+/*
+ * Magic number for per-session DSM TOC. The segment creator passes it to
+ * shm_toc_create() and attaching backends pass it to shm_toc_attach().
+ */
+#define SESSION_MAGIC 0xabb0fbc9
+
+/*
+ * We want to create a DSA area to store shared state that has the same
+ * lifetime as a session. So far, it's only used to hold the shared record
+ * type registry. We don't want it to have to create any DSM segments just
+ * yet in common cases, so we'll give it enough space to hold a very small
+ * SharedRecordTypmodRegistry.
+ */
+#define SESSION_DSA_SIZE 0x30000
+
+/*
+ * TOC keys for the chunks stored in the per-session DSM segment: the space
+ * backing the in-place DSA area, and the SharedRecordTypmodRegistry.
+ */
+#define SESSION_KEY_DSA UINT64CONST(0xFFFFFFFFFFFF0001)
+#define SESSION_KEY_RECORD_TYPMOD_REGISTRY UINT64CONST(0xFFFFFFFFFFFF0002)
+
+/*
+ * This backend's current session. NULL until InitializeSession() has
+ * allocated it.
+ */
+Session *CurrentSession = NULL;
+
+/*
+ * Allocate a zero-filled Session object in TopMemoryContext and install it
+ * as CurrentSession.
+ */
+void
+InitializeSession(void)
+{
+	Session    *session;
+
+	session = MemoryContextAllocZero(TopMemoryContext, sizeof(Session));
+	CurrentSession = session;
+}
+
+/*
+ * Return the handle of this backend's per-session DSM segment, creating and
+ * populating the segment first if that has not happened yet. Worker
+ * processes use the handle to attach to the segment.
+ *
+ * Unlike the per-context DSM segment, this segment and its contents are
+ * reused for future parallel queries.
+ *
+ * Returns DSM_HANDLE_INVALID if a segment can't be allocated due to lack of
+ * resources.
+ */
+dsm_handle
+GetSessionDsmHandle(void)
+{
+	shm_toc_estimator toc_estimator;
+	shm_toc    *toc;
+	dsm_segment *segment;
+	size_t		registry_size;
+	size_t		segment_size;
+	void	   *area_space;
+	void	   *registry_space;
+	dsa_area   *area;
+	MemoryContext saved_context;
+
+	/*
+	 * A session-scope segment created earlier by this backend is simply
+	 * reused; it stays in place for the rest of the backend's lifetime.
+	 */
+	if (CurrentSession->segment != NULL)
+		return dsm_segment_handle(CurrentSession->segment);
+
+	/* Nothing set up yet: build the segment from scratch. */
+	saved_context = MemoryContextSwitchTo(TopMemoryContext);
+	shm_toc_initialize_estimator(&toc_estimator);
+
+	/* One key and one chunk for the per-session DSA area. */
+	shm_toc_estimate_keys(&toc_estimator, 1);
+	shm_toc_estimate_chunk(&toc_estimator, SESSION_DSA_SIZE);
+
+	/* One key and one chunk for the per-session record typmod registry. */
+	registry_size = SharedRecordTypmodRegistryEstimate();
+	shm_toc_estimate_keys(&toc_estimator, 1);
+	shm_toc_estimate_chunk(&toc_estimator, registry_size);
+
+	/* Create the segment; give up quietly if none can be had. */
+	segment_size = shm_toc_estimate(&toc_estimator);
+	segment = dsm_create(segment_size, DSM_CREATE_NULL_IF_MAXSEGMENTS);
+	if (segment == NULL)
+	{
+		MemoryContextSwitchTo(saved_context);
+
+		return DSM_HANDLE_INVALID;
+	}
+
+	/* Lay a table of contents over the new segment. */
+	toc = shm_toc_create(SESSION_MAGIC,
+						 dsm_segment_address(segment),
+						 segment_size);
+
+	/* Carve out and publish the per-session DSA area. */
+	area_space = shm_toc_allocate(toc, SESSION_DSA_SIZE);
+	area = dsa_create_in_place(area_space,
+							   SESSION_DSA_SIZE,
+							   LWTRANCHE_SESSION_DSA,
+							   segment);
+	shm_toc_insert(toc, SESSION_KEY_DSA, area_space);
+
+	/* Carve out, initialize and publish the shared record typmod registry. */
+	registry_space = shm_toc_allocate(toc, registry_size);
+	SharedRecordTypmodRegistryInit((SharedRecordTypmodRegistry *)
+								   registry_space, segment, area);
+	shm_toc_insert(toc, SESSION_KEY_RECORD_TYPMOD_REGISTRY,
+				   registry_space);
+
+	/*
+	 * Everything is in place, so pin the shared memory to keep it mapped for
+	 * the rest of this backend's life. Had we failed before this point,
+	 * cleanup callbacks for anything installed above (currently only
+	 * SharedRecordTypmodRegistry) would run when the DSM segment is detached
+	 * by CurrentResourceOwner, so CurrentSession is never left broken.
+	 */
+	dsm_pin_mapping(segment);
+	dsa_pin_mapping(area);
+
+	/* Publish the segment and area through CurrentSession. */
+	CurrentSession->segment = segment;
+	CurrentSession->area = area;
+
+	MemoryContextSwitchTo(saved_context);
+
+	return dsm_segment_handle(segment);
+}
+
+/*
+ * Attach this backend to the per-session DSM segment identified by 'handle',
+ * as provided by a parallel leader. Raises an error if the segment cannot
+ * be attached.
+ */
+void
+AttachSession(dsm_handle handle)
+{
+	dsm_segment *segment;
+	shm_toc    *toc;
+	void	   *area_space;
+	void	   *registry_space;
+	dsa_area   *area;
+	MemoryContext saved_context;
+
+	saved_context = MemoryContextSwitchTo(TopMemoryContext);
+
+	/* Map the segment and locate its table of contents. */
+	segment = dsm_attach(handle);
+	if (segment == NULL)
+		elog(ERROR, "could not attach to per-session DSM segment");
+	toc = shm_toc_attach(SESSION_MAGIC, dsm_segment_address(segment));
+
+	/* Attach to the DSA area stored inside the segment. */
+	area_space = shm_toc_lookup(toc, SESSION_KEY_DSA, false);
+	area = dsa_attach_in_place(area_space, segment);
+
+	/*
+	 * Publish both through CurrentSession before touching the registry:
+	 * SharedRecordTypmodRegistryAttach() reads them from there.
+	 */
+	CurrentSession->segment = segment;
+	CurrentSession->area = area;
+
+	/* Attach to the shared record typmod registry. */
+	registry_space =
+		shm_toc_lookup(toc, SESSION_KEY_RECORD_TYPMOD_REGISTRY, false);
+	SharedRecordTypmodRegistryAttach((SharedRecordTypmodRegistry *)
+									 registry_space);
+
+	/* Stay attached until end of backend or DetachSession(). */
+	dsm_pin_mapping(segment);
+	dsa_pin_mapping(area);
+
+	MemoryContextSwitchTo(saved_context);
+}
+
+/*
+ * Detach from the current session's DSM segment and DSA area. Doing this
+ * explicitly is not strictly necessary, since detaching happens
+ * automatically at backend exit, but if parallel workers are ever reused it
+ * becomes important for a worker to detach from one session before
+ * attaching to another. Note that this runs detach hooks.
+ */
+void
+DetachSession(void)
+{
+	Session    *session = CurrentSession;
+
+	/* Detaching the segment is what runs the detach hooks. */
+	dsm_detach(session->segment);
+	session->segment = NULL;
+
+	dsa_detach(session->area);
+	session->area = NULL;
+}
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
+#include "access/parallel.h"
+#include "access/session.h"
#include "catalog/indexing.h"
#include "catalog/pg_am.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "executor/executor.h"
+#include "lib/dshash.h"
#include "optimizer/planner.h"
+#include "storage/lwlock.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/fmgroids.h"
TupleDesc tupdesc;
} RecordCacheEntry;
+/*
+ * To deal with non-anonymous record types that are exchanged by backends
+ * involved in a parallel query, we also need a shared version of the above.
+ *
+ * The two handles are filled in by SharedRecordTypmodRegistryInit() and used
+ * by SharedRecordTypmodRegistryAttach() to attach to the same hash tables.
+ */
+struct SharedRecordTypmodRegistry
+{
+ /* A hash table for finding a matching TupleDesc. */
+ dshash_table_handle record_table_handle;
+ /* A hash table for finding a TupleDesc by typmod. */
+ dshash_table_handle typmod_table_handle;
+ /*
+ * A source of new record typmod numbers. Seeded from the initializing
+ * backend's NextRecordTypmod and advanced with an atomic fetch-and-add.
+ */
+ pg_atomic_uint32 next_typmod;
+};
+
+/*
+ * When using shared tuple descriptors as hash table keys we need a way to be
+ * able to search for an equal shared TupleDesc using a backend-local
+ * TupleDesc. So we use this type which can hold either, and hash and compare
+ * functions that know how to handle both.
+ *
+ * 'shared' tells which union member is valid: shared_tupdesc when true,
+ * local_tupdesc when false.
+ */
+typedef struct SharedRecordTableKey
+{
+ union
+ {
+ TupleDesc local_tupdesc; /* backend-local descriptor, for lookups */
+ dsa_pointer shared_tupdesc; /* descriptor stored in the session DSA area */
+ };
+ bool shared; /* selects the valid union member */
+} SharedRecordTableKey;
+
+/*
+ * The shared version of RecordCacheEntry. This lets us look up a typmod
+ * using a TupleDesc which may be in local or shared memory.
+ *
+ * The entry consists of nothing but its key; the code in this file always
+ * stores a key with 'shared' set, so the typmod can be read from the shared
+ * TupleDesc the key points to.
+ */
+typedef struct SharedRecordTableEntry
+{
+ SharedRecordTableKey key;
+} SharedRecordTableEntry;
+
+/*
+ * An entry in SharedRecordTypmodRegistry's typmod table. This lets us look
+ * up a TupleDesc in shared memory using a typmod.
+ */
+typedef struct SharedTypmodTableEntry
+{
+ uint32 typmod; /* hash key: the record typmod number */
+ dsa_pointer shared_tupdesc; /* the TupleDesc registered under that typmod */
+} SharedTypmodTableEntry;
+
+/*
+ * A comparator function for SharedRecordTableKey.
+ *
+ * Each key is resolved to a TupleDesc first: keys marked 'shared' are
+ * translated through the dsa_area passed in 'arg', others already carry a
+ * backend-local pointer. Returns 0 when equalTupleDescs() considers the two
+ * descriptors equal and 1 otherwise. 'size' is ignored.
+ */
+static int
+shared_record_table_compare(const void *a, const void *b, size_t size,
+							void *arg)
+{
+	dsa_area   *dsa = (dsa_area *) arg;
+	SharedRecordTableKey *lhs = (SharedRecordTableKey *) a;
+	SharedRecordTableKey *rhs = (SharedRecordTableKey *) b;
+	TupleDesc	lhs_desc;
+	TupleDesc	rhs_desc;
+
+	lhs_desc = lhs->shared ?
+		(TupleDesc) dsa_get_address(dsa, lhs->shared_tupdesc) :
+		lhs->local_tupdesc;
+
+	rhs_desc = rhs->shared ?
+		(TupleDesc) dsa_get_address(dsa, rhs->shared_tupdesc) :
+		rhs->local_tupdesc;
+
+	if (equalTupleDescs(lhs_desc, rhs_desc))
+		return 0;
+
+	return 1;
+}
+
+/*
+ * A hash function for SharedRecordTableKey.
+ *
+ * The key is resolved to a TupleDesc (through the dsa_area in 'arg' when it
+ * is marked 'shared') and hashed with hashTupleDesc(). 'size' is ignored.
+ */
+static uint32
+shared_record_table_hash(const void *a, size_t size, void *arg)
+{
+	dsa_area   *dsa = (dsa_area *) arg;
+	SharedRecordTableKey *key = (SharedRecordTableKey *) a;
+	TupleDesc	desc;
+
+	desc = key->shared ?
+		(TupleDesc) dsa_get_address(dsa, key->shared_tupdesc) :
+		key->local_tupdesc;
+
+	return hashTupleDesc(desc);
+}
+
+/*
+ * Parameters for SharedRecordTypmodRegistry's TupleDesc table.
+ *
+ * NOTE(review): the initializer is positional; the field order of
+ * dshash_parameters is not visible here. Presumably key size, entry size,
+ * compare function, hash function, LWLock tranche -- confirm against
+ * lib/dshash.h. The compare and hash functions in this file ignore their
+ * 'size' argument, which is presumably why the key size is marked unused.
+ */
+static const dshash_parameters srtr_record_table_params = {
+ sizeof(SharedRecordTableKey), /* unused */
+ sizeof(SharedRecordTableEntry),
+ shared_record_table_compare,
+ shared_record_table_hash,
+ LWTRANCHE_SESSION_RECORD_TABLE
+};
+
+/*
+ * Parameters for SharedRecordTypmodRegistry's typmod hash table. Keys are
+ * uint32-sized typmod numbers, compared and hashed bytewise with the generic
+ * dshash_memcmp/dshash_memhash helpers.
+ */
+static const dshash_parameters srtr_typmod_table_params = {
+ sizeof(uint32),
+ sizeof(SharedTypmodTableEntry),
+ dshash_memcmp,
+ dshash_memhash,
+ LWTRANCHE_SESSION_TYPMOD_TABLE
+};
+
static HTAB *RecordCacheHash = NULL;
static TupleDesc *RecordCacheArray = NULL;
static void load_enum_cache_data(TypeCacheEntry *tcache);
static EnumItem *find_enumitem(TypeCacheEnumData *enumdata, Oid arg);
static int enum_oid_cmp(const void *left, const void *right);
+static void shared_record_typmod_registry_detach(dsm_segment *segment,
+ Datum datum);
+static void shared_record_typmod_registry_worker_detach(dsm_segment *segment,
+ Datum datum);
+static TupleDesc find_or_make_matching_shared_tupledesc(TupleDesc tupdesc);
+static dsa_pointer share_tupledesc(dsa_area *area, TupleDesc tupdesc,
+ uint32 typmod);
/*
/*
* Reset info about hash functions whenever we pick up new info about
- * equality operator. This is so we can ensure that the hash functions
- * match the operator.
+ * equality operator. This is so we can ensure that the hash
+ * functions match the operator.
*/
typentry->flags &= ~(TCFLAGS_CHECKED_HASH_PROC);
typentry->flags &= ~(TCFLAGS_CHECKED_HASH_EXTENDED_PROC);
typentry->flags |= TCFLAGS_CHECKED_FIELD_PROPERTIES;
}
+/*
+ * Make sure that RecordCacheArray is large enough to store 'typmod'.
+ *
+ * The array is created with 64 zeroed slots on first use. If 'typmod' does
+ * not fit, the length is doubled until it does, and the newly added slots
+ * are zeroed so that unused slots read as NULL.
+ */
+static void
+ensure_record_cache_typmod_slot_exists(int32 typmod)
+{
+	if (RecordCacheArray == NULL)
+	{
+		RecordCacheArray = (TupleDesc *)
+			MemoryContextAllocZero(CacheMemoryContext, 64 * sizeof(TupleDesc));
+		RecordCacheArrayLen = 64;
+	}
+
+	if (typmod >= RecordCacheArrayLen)
+	{
+		int32		newlen = RecordCacheArrayLen * 2;
+
+		while (typmod >= newlen)
+			newlen *= 2;
+
+		RecordCacheArray = (TupleDesc *) repalloc(RecordCacheArray,
+												  newlen * sizeof(TupleDesc));
+
+		/*
+		 * Zero only the tail that repalloc added. The element type is
+		 * TupleDesc, so size the region with the same type used for the
+		 * allocation above.
+		 */
+		memset(RecordCacheArray + RecordCacheArrayLen, 0,
+			   (newlen - RecordCacheArrayLen) * sizeof(TupleDesc));
+		RecordCacheArrayLen = newlen;
+	}
+}
/*
* lookup_rowtype_tupdesc_internal --- internal routine to lookup a rowtype
/*
* It's a transient record type, so look in our record-type table.
*/
- if (typmod < 0 || typmod >= NextRecordTypmod)
+ if (typmod >= 0)
{
- if (!noError)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("record type has not been registered")));
- return NULL;
+ /* It is already in our local cache? */
+ if (typmod < RecordCacheArrayLen &&
+ RecordCacheArray[typmod] != NULL)
+ return RecordCacheArray[typmod];
+
+ /* Are we attached to a shared record typmod registry? */
+ if (CurrentSession->shared_typmod_registry != NULL)
+ {
+ SharedTypmodTableEntry *entry;
+
+ /* Try to find it in the shared typmod index. */
+ entry = dshash_find(CurrentSession->shared_typmod_table,
+ &typmod, false);
+ if (entry != NULL)
+ {
+ TupleDesc tupdesc;
+
+ tupdesc = (TupleDesc)
+ dsa_get_address(CurrentSession->area,
+ entry->shared_tupdesc);
+ Assert(typmod == tupdesc->tdtypmod);
+
+ /* We may need to extend the local RecordCacheArray. */
+ ensure_record_cache_typmod_slot_exists(typmod);
+
+ /*
+ * Our local array can now point directly to the TupleDesc
+ * in shared memory.
+ */
+ RecordCacheArray[typmod] = tupdesc;
+ Assert(tupdesc->tdrefcount == -1);
+
+ dshash_release_lock(CurrentSession->shared_typmod_table,
+ entry);
+
+ return RecordCacheArray[typmod];
+ }
+ }
}
- return RecordCacheArray[typmod];
+
+ if (!noError)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("record type has not been registered")));
+ return NULL;
}
}
TupleDesc tupDesc;
tupDesc = lookup_rowtype_tupdesc_internal(type_id, typmod, false);
- IncrTupleDescRefCount(tupDesc);
+ PinTupleDesc(tupDesc);
return tupDesc;
}
tupDesc = lookup_rowtype_tupdesc_internal(type_id, typmod, noError);
if (tupDesc != NULL)
- IncrTupleDescRefCount(tupDesc);
+ PinTupleDesc(tupDesc);
return tupDesc;
}
RecordCacheEntry *recentry;
TupleDesc entDesc;
bool found;
- int32 newtypmod;
MemoryContext oldcxt;
Assert(tupDesc->tdtypeid == RECORDOID);
recentry->tupdesc = NULL;
oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
- if (RecordCacheArray == NULL)
+ /* Look in the SharedRecordTypmodRegistry, if attached */
+ entDesc = find_or_make_matching_shared_tupledesc(tupDesc);
+ if (entDesc == NULL)
{
- RecordCacheArray = (TupleDesc *) palloc(64 * sizeof(TupleDesc));
- RecordCacheArrayLen = 64;
+ /* Reference-counted local cache only. */
+ entDesc = CreateTupleDescCopy(tupDesc);
+ entDesc->tdrefcount = 1;
+ entDesc->tdtypmod = NextRecordTypmod++;
}
- else if (NextRecordTypmod >= RecordCacheArrayLen)
+ ensure_record_cache_typmod_slot_exists(entDesc->tdtypmod);
+ RecordCacheArray[entDesc->tdtypmod] = entDesc;
+ recentry->tupdesc = entDesc;
+
+ /* Update the caller's tuple descriptor. */
+ tupDesc->tdtypmod = entDesc->tdtypmod;
+
+ MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * Return the amount of shmem required to hold a SharedRecordTypmodRegistry.
+ * This exists only to avoid exposing private innards of
+ * SharedRecordTypmodRegistry in a header.
+ */
+size_t
+SharedRecordTypmodRegistryEstimate(void)
+{
+	size_t		registry_size = sizeof(SharedRecordTypmodRegistry);
+
+	return registry_size;
+}
+
+/*
+ * Initialize 'registry' in a pre-existing shared memory region, which must be
+ * maximally aligned and have space for SharedRecordTypmodRegistryEstimate()
+ * bytes.
+ *
+ * 'area' will be used to allocate shared memory space as required for the
+ * typmod registration. The current process, expected to be a leader process
+ * in a parallel query, will be attached automatically and its current record
+ * types will be loaded into *registry. While attached, all calls to
+ * assign_record_type_typmod will use the shared registry. Worker backends
+ * will need to attach explicitly.
+ *
+ * Note that this function takes 'area' and 'segment' as arguments rather than
+ * accessing them via CurrentSession, because they aren't installed there
+ * until after this function runs.
+ */
+void
+SharedRecordTypmodRegistryInit(SharedRecordTypmodRegistry *registry,
+							   dsm_segment *segment,
+							   dsa_area *area)
+{
+	MemoryContext old_context;
+	dshash_table *record_table;
+	dshash_table *typmod_table;
+	int32		typmod;
+
+	Assert(!IsParallelWorker());
+
+	/* We can't already be attached to a shared registry. */
+	Assert(CurrentSession->shared_typmod_registry == NULL);
+	Assert(CurrentSession->shared_record_table == NULL);
+	Assert(CurrentSession->shared_typmod_table == NULL);
+
+	old_context = MemoryContextSwitchTo(TopMemoryContext);
+
+	/*
+	 * Create the hash table of tuple descriptors indexed by themselves. The
+	 * area is also passed as the callback argument, because the compare and
+	 * hash functions need it to resolve shared keys.
+	 */
+	record_table = dshash_create(area, &srtr_record_table_params, area);
+
+	/* Create the hash table of tuple descriptors indexed by typmod. */
+	typmod_table = dshash_create(area, &srtr_typmod_table_params, NULL);
+
+	MemoryContextSwitchTo(old_context);
+
+	/* Initialize the SharedRecordTypmodRegistry. */
+	registry->record_table_handle = dshash_get_hash_table_handle(record_table);
+	registry->typmod_table_handle = dshash_get_hash_table_handle(typmod_table);
+	pg_atomic_init_u32(&registry->next_typmod, NextRecordTypmod);
+
+	/*
+	 * Copy all entries from this backend's private registry into the shared
+	 * registry.
+	 */
+	for (typmod = 0; typmod < NextRecordTypmod; ++typmod)
{
- int32 newlen = RecordCacheArrayLen * 2;
+		SharedTypmodTableEntry *typmod_table_entry;
+		SharedRecordTableEntry *record_table_entry;
+		SharedRecordTableKey record_table_key;
+		dsa_pointer shared_dp;
+		TupleDesc	tupdesc;
+		bool		found;
- RecordCacheArray = (TupleDesc *) repalloc(RecordCacheArray,
- newlen * sizeof(TupleDesc));
- RecordCacheArrayLen = newlen;
+		tupdesc = RecordCacheArray[typmod];
+		if (tupdesc == NULL)
+			continue;
+
+		/* Copy the TupleDesc into shared memory. */
+		shared_dp = share_tupledesc(area, tupdesc, typmod);
+
+		/* Insert into the typmod table. */
+		typmod_table_entry = dshash_find_or_insert(typmod_table,
+												   &tupdesc->tdtypmod,
+												   &found);
+		if (found)
+			elog(ERROR, "cannot create duplicate shared record typmod");
+		typmod_table_entry->typmod = tupdesc->tdtypmod;
+		typmod_table_entry->shared_tupdesc = shared_dp;
+		dshash_release_lock(typmod_table, typmod_table_entry);
+
+		/*
+		 * Insert into the record table. The lookup key wraps the local
+		 * descriptor; a newly created entry is then rewritten to point at
+		 * the shared copy.
+		 */
+		record_table_key.shared = false;
+		record_table_key.local_tupdesc = tupdesc;
+		record_table_entry = dshash_find_or_insert(record_table,
+												   &record_table_key,
+												   &found);
+		if (!found)
+		{
+			record_table_entry->key.shared = true;
+			record_table_entry->key.shared_tupdesc = shared_dp;
+		}
+		dshash_release_lock(record_table, record_table_entry);
}
- /* if fail in subrs, no damage except possibly some wasted memory... */
- entDesc = CreateTupleDescCopy(tupDesc);
- recentry->tupdesc = entDesc;
- /* mark it as a reference-counted tupdesc */
- entDesc->tdrefcount = 1;
- /* now it's safe to advance NextRecordTypmod */
- newtypmod = NextRecordTypmod++;
- entDesc->tdtypmod = newtypmod;
- RecordCacheArray[newtypmod] = entDesc;
+	/*
+	 * Set up the global state that will tell assign_record_type_typmod and
+	 * lookup_rowtype_tupdesc_internal about the shared registry.
+	 */
+	CurrentSession->shared_record_table = record_table;
+	CurrentSession->shared_typmod_table = typmod_table;
+	CurrentSession->shared_typmod_registry = registry;
- /* report to caller as well */
- tupDesc->tdtypmod = newtypmod;
+	/*
+	 * We install a detach hook in the leader, but only to handle cleanup on
+	 * failure during GetSessionDsmHandle(). Once GetSessionDsmHandle() pins
+	 * the memory, the leader process will use a shared registry until it
+	 * exits.
+	 */
+	on_dsm_detach(segment, shared_record_typmod_registry_detach, (Datum) 0);
+}
- MemoryContextSwitchTo(oldcxt);
+/*
+ * Attach to 'registry', which another backend must already have initialized.
+ * From here on, assign_record_type_typmod and
+ * lookup_rowtype_tupdesc_internal use the shared registry, until the current
+ * session is detached.
+ */
+void
+SharedRecordTypmodRegistryAttach(SharedRecordTypmodRegistry *registry)
+{
+	MemoryContext saved_context;
+	dshash_table *records;
+	dshash_table *typmods;
+
+	Assert(IsParallelWorker());
+
+	/*
+	 * The session must already have its segment and area installed, and must
+	 * not be attached to any shared registry yet.
+	 */
+	Assert(CurrentSession != NULL);
+	Assert(CurrentSession->segment != NULL);
+	Assert(CurrentSession->area != NULL);
+	Assert(CurrentSession->shared_typmod_registry == NULL);
+	Assert(CurrentSession->shared_record_table == NULL);
+	Assert(CurrentSession->shared_typmod_table == NULL);
+
+	/*
+	 * Our local cache must not hold any typmods, since they would clash with
+	 * the ones imported by SharedRecordTypmodRegistryInit. That holds for a
+	 * freshly started parallel worker. Should worker recycling ever be
+	 * supported, a worker would have to zap its local cache between queries
+	 * before calling this to synchronize typmods with a new leader; see
+	 * shared_record_typmod_registry_detach().
+	 */
+	Assert(NextRecordTypmod == 0);
+
+	saved_context = MemoryContextSwitchTo(TopMemoryContext);
+
+	/* Attach to both hash tables, using the handles the leader stored. */
+	records = dshash_attach(CurrentSession->area,
+							&srtr_record_table_params,
+							registry->record_table_handle,
+							CurrentSession->area);
+	typmods = dshash_attach(CurrentSession->area,
+							&srtr_typmod_table_params,
+							registry->typmod_table_handle,
+							NULL);
+
+	MemoryContextSwitchTo(saved_context);
+
+	/*
+	 * Workers get their own detach callback, which performs a more complete
+	 * reset of backend-local state than the leader's.
+	 */
+	on_dsm_detach(CurrentSession->segment,
+				  shared_record_typmod_registry_worker_detach,
+				  PointerGetDatum(registry));
+
+	/*
+	 * Publish the session state that tells assign_record_type_typmod and
+	 * lookup_rowtype_tupdesc_internal about the shared registry.
+	 */
+	CurrentSession->shared_typmod_registry = registry;
+	CurrentSession->shared_record_table = records;
+	CurrentSession->shared_typmod_table = typmods;
}
/*
else
return 0;
}
+
+/*
+ * Allocate room for a copy of 'tupdesc' in 'area', copy it there, stamp the
+ * copy with 'typmod', and return the dsa_pointer of the copy.
+ */
+static dsa_pointer
+share_tupledesc(dsa_area *area, TupleDesc tupdesc, uint32 typmod)
+{
+	dsa_pointer copy_dp = dsa_allocate(area, TupleDescSize(tupdesc));
+	TupleDesc	copy = (TupleDesc) dsa_get_address(area, copy_dp);
+
+	TupleDescCopy(copy, tupdesc);
+	copy->tdtypmod = typmod;
+
+	return copy_dp;
+}
+
+/*
+ * If we are attached to a SharedRecordTypmodRegistry, use it to find or
+ * create a shared TupleDesc that matches 'tupdesc'. Otherwise return NULL.
+ * Tuple descriptors returned by this function are not reference counted, and
+ * will exist at least as long as the current backend remains attached to the
+ * current session.
+ *
+ * When no match exists, a new typmod number is taken from the registry, a
+ * copy of 'tupdesc' is placed in the session's DSA area, and the copy is
+ * entered first in the typmod table and then in the record table. If another
+ * backend enters a matching descriptor in the meantime, our copy and its
+ * typmod table entry are discarded and the other backend's descriptor is
+ * returned instead.
+ */
+static TupleDesc
+find_or_make_matching_shared_tupledesc(TupleDesc tupdesc)
+{
+	TupleDesc	result;
+	SharedRecordTableKey key;
+	SharedRecordTableEntry *record_table_entry;
+	SharedTypmodTableEntry *typmod_table_entry;
+	dsa_pointer shared_dp;
+	bool		found;
+	uint32		typmod;
+
+	/* If not even attached, nothing to do. */
+	if (CurrentSession->shared_typmod_registry == NULL)
+		return NULL;
+
+	/* Try to find a matching tuple descriptor in the record table. */
+	key.shared = false;
+	key.local_tupdesc = tupdesc;
+	record_table_entry = (SharedRecordTableEntry *)
+		dshash_find(CurrentSession->shared_record_table, &key, false);
+	if (record_table_entry)
+	{
+		Assert(record_table_entry->key.shared);
+
+		/* Read what we need from the entry while we still hold its lock. */
+		shared_dp = record_table_entry->key.shared_tupdesc;
+		dshash_release_lock(CurrentSession->shared_record_table,
+							record_table_entry);
+
+		result = (TupleDesc)
+			dsa_get_address(CurrentSession->area, shared_dp);
+		Assert(result->tdrefcount == -1);
+
+		return result;
+	}
+
+	/* Allocate a new typmod number. This will be wasted if we error out. */
+	typmod =
+		pg_atomic_fetch_add_u32(&CurrentSession->shared_typmod_registry->next_typmod,
+								1);
+
+	/* Copy the TupleDesc into shared memory. */
+	shared_dp = share_tupledesc(CurrentSession->area, tupdesc, typmod);
+
+	/*
+	 * Create an entry in the typmod table so that others will understand this
+	 * typmod number. If that fails, give back the shared copy before
+	 * propagating the error.
+	 */
+	PG_TRY();
+	{
+		typmod_table_entry = (SharedTypmodTableEntry *)
+			dshash_find_or_insert(CurrentSession->shared_typmod_table,
+								  &typmod, &found);
+		if (found)
+			elog(ERROR, "cannot create duplicate shared record typmod");
+	}
+	PG_CATCH();
+	{
+		dsa_free(CurrentSession->area, shared_dp);
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+	typmod_table_entry->typmod = typmod;
+	typmod_table_entry->shared_tupdesc = shared_dp;
+	dshash_release_lock(CurrentSession->shared_typmod_table,
+						typmod_table_entry);
+
+	/*
+	 * Finally create an entry in the record table so others with matching
+	 * tuple descriptors can reuse the typmod.
+	 */
+	record_table_entry = (SharedRecordTableEntry *)
+		dshash_find_or_insert(CurrentSession->shared_record_table, &key,
+							  &found);
+	if (found)
+	{
+		dsa_pointer existing_dp;
+
+		/*
+		 * Someone concurrently inserted a matching tuple descriptor since the
+		 * first time we checked. Use that one instead. Fetch its address
+		 * (the dsa_pointer, not the 'shared' flag) before dropping the lock.
+		 */
+		Assert(record_table_entry->key.shared);
+		existing_dp = record_table_entry->key.shared_tupdesc;
+		dshash_release_lock(CurrentSession->shared_record_table,
+							record_table_entry);
+
+		/* Might as well free up the space used by the one we created. */
+		found = dshash_delete_key(CurrentSession->shared_typmod_table,
+								  &typmod);
+		Assert(found);
+		dsa_free(CurrentSession->area, shared_dp);
+
+		/* Return the one we found. */
+		result = (TupleDesc)
+			dsa_get_address(CurrentSession->area, existing_dp);
+		Assert(result->tdrefcount == -1);
+
+		return result;
+	}
+
+	/* Store it and return it. */
+	record_table_entry->key.shared = true;
+	record_table_entry->key.shared_tupdesc = shared_dp;
+	dshash_release_lock(CurrentSession->shared_record_table,
+						record_table_entry);
+	result = (TupleDesc)
+		dsa_get_address(CurrentSession->area, shared_dp);
+	Assert(result->tdrefcount == -1);
+
+	return result;
+}
+
+/*
+ * Detach hook that makes this backend forget the shared record typmod
+ * infrastructure. Leader backends register it directly, where it is reached
+ * only in case of error or shutdown. Workers reach it indirectly through
+ * shared_record_typmod_registry_worker_detach().
+ */
+static void
+shared_record_typmod_registry_detach(dsm_segment *segment, Datum datum)
+{
+	Session    *session = CurrentSession;
+
+	/* Initialization may not have finished, so check each table first. */
+	if (session->shared_record_table != NULL)
+	{
+		dshash_detach(session->shared_record_table);
+		session->shared_record_table = NULL;
+	}
+
+	if (session->shared_typmod_table != NULL)
+	{
+		dshash_detach(session->shared_typmod_table);
+		session->shared_typmod_table = NULL;
+	}
+
+	session->shared_typmod_registry = NULL;
+}
+
+/*
+ * Detach hook allowing workers to disconnect from the shared record typmod
+ * registry. The resulting state should allow a worker to attach to a
+ * different leader, if worker reuse pools are invented.
+ */
+static void
+shared_record_typmod_registry_worker_detach(dsm_segment *segment, Datum datum)
+{
+	/*
+	 * Drop everything learned about record typmods during the session we
+	 * are leaving, returning the local cache to its initial state.
+	 */
+	if (RecordCacheArray != NULL)
+	{
+		int32		slot;
+
+		for (slot = 0; slot < RecordCacheArrayLen; ++slot)
+		{
+			TupleDesc	cached = RecordCacheArray[slot];
+
+			if (cached == NULL)
+				continue;
+
+			/*
+			 * Tuple descriptors living in shared memory are not reference
+			 * counted (tdrefcount is -1), so freeing them is not our job.
+			 * They survive as long as the shared session exists, which
+			 * should be as long as the owning leader backend exists.
+			 */
+			if (cached->tdrefcount == -1)
+				continue;
+
+			/*
+			 * In theory local reference-counted descriptors do need to be
+			 * released here, and DecrTupleDescRefCount() can't be used
+			 * because we aren't using a resource owner. In practice no
+			 * non-shared TupleDesc is expected in a worker.
+			 */
+			Assert(cached->tdrefcount > 0);
+			cached->tdrefcount--;
+			if (cached->tdrefcount == 0)
+				FreeTupleDesc(cached);
+		}
+
+		pfree(RecordCacheArray);
+		RecordCacheArray = NULL;
+	}
+
+	if (RecordCacheHash != NULL)
+	{
+		hash_destroy(RecordCacheHash);
+		RecordCacheHash = NULL;
+	}
+
+	NextRecordTypmod = 0;
+
+	/* The rest is common to leader and worker detach. */
+	shared_record_typmod_registry_detach(segment, datum);
+}