* tqueue.c
* Use shm_mq to send & receive tuples between parallel backends
*
+ * Most of the complexity in this module arises from transient RECORD types,
+ * which all have type RECORDOID and are distinguished by typmod numbers
+ * that are managed per-backend (see src/backend/utils/cache/typcache.c).
+ * The sender's set of RECORD typmod assignments probably doesn't match the
+ * receiver's. To deal with this, we make the sender send a description
+ * of each transient RECORD type appearing in the data it sends. The
+ * receiver finds or creates a matching type in its own typcache, and then
+ * maps the sender's typmod for that type to its own typmod.
+ *
* A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
* under the hood, writes tuples from the executor to a shm_mq. If
* necessary, it also writes control messages describing transient
* record types used within the tuple.
*
- * A TupleQueueReader reads tuples, and if any are sent control messages,
+ * A TupleQueueReader reads tuples, and control messages if any are sent,
* from a shm_mq and returns the tuples. If transient record types are
- * in use, it registers those types based on the received control messages
- * and rewrites the typemods sent by the remote side to the corresponding
- * local record typemods.
+ * in use, it registers those types locally based on the control messages
+ * and rewrites the typmods sent by the remote side to the corresponding
+ * local record typmods.
*
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
#include "utils/syscache.h"
#include "utils/typcache.h"
+
+/*
+ * The data transferred through the shm_mq is divided into messages.
+ * One-byte messages are mode-switch messages, telling the receiver to switch
+ * between "control" and "data" modes. (We always start up in "data" mode.)
+ * Otherwise, when in "data" mode, each message is a tuple. When in "control"
+ * mode, each message defines one transient-typmod-to-tupledesc mapping to
+ * let us interpret future tuples. Both of those cases certainly require
+ * more than one byte, so no confusion is possible.
+ */
+#define TUPLE_QUEUE_MODE_CONTROL 'c' /* mode-switch message contents */
+#define TUPLE_QUEUE_MODE_DATA 'd'
+
+/*
+ * Both the sender and receiver build trees of TupleRemapInfo nodes to help
+ * them identify which (sub) fields of transmitted tuples are composite and
+ * may thus need remap processing. We might need to look within arrays and
+ * ranges, not only composites, to find composite sub-fields. A NULL
+ * TupleRemapInfo pointer indicates that it is known that the described field
+ * is not composite and has no composite substructure.
+ *
+ * Note that we currently have to look at each composite field at runtime,
+ * even if we believe it's of a named composite type (i.e., not RECORD).
+ * This is because we allow the actual value to be a compatible transient
+ * RECORD type. That's grossly inefficient, and it would be good to get
+ * rid of the requirement, but it's not clear what would need to change.
+ *
+ * Also, we allow the top-level tuple structure, as well as the actual
+ * structure of composite subfields, to change from one tuple to the next
+ * at runtime. This may well be entirely historical, but it's mostly free
+ * to support given the previous requirement; and other places in the system
+ * also permit this, so it's not entirely clear if we could drop it.
+ */
+
typedef enum
{
- TQUEUE_REMAP_NONE, /* no special processing required */
TQUEUE_REMAP_ARRAY, /* array */
TQUEUE_REMAP_RANGE, /* range */
- TQUEUE_REMAP_RECORD /* composite type, named or anonymous */
-} RemapClass;
+ TQUEUE_REMAP_RECORD /* composite type, named or transient */
+} TupleRemapClass;
-typedef struct
+typedef struct TupleRemapInfo TupleRemapInfo;
+
+typedef struct ArrayRemapInfo
{
- int natts;
- RemapClass mapping[FLEXIBLE_ARRAY_MEMBER];
-} RemapInfo;
+ int16 typlen; /* array element type's storage properties */
+ bool typbyval;
+ char typalign;
+ TupleRemapInfo *element_remap; /* array element type's remap info */
+} ArrayRemapInfo;
-typedef struct
+typedef struct RangeRemapInfo
{
- DestReceiver pub;
- shm_mq_handle *handle;
- MemoryContext tmpcontext;
- HTAB *recordhtab;
- char mode;
- TupleDesc tupledesc;
- RemapInfo *remapinfo;
+ TypeCacheEntry *typcache; /* range type's typcache entry */
+ TupleRemapInfo *bound_remap; /* range bound type's remap info */
+} RangeRemapInfo;
+
+typedef struct RecordRemapInfo
+{
+ /* Original (remote) type ID info last seen for this composite field */
+ Oid rectypid;
+ int32 rectypmod;
+ /* Local RECORD typmod, or -1 if unset; not used on sender side */
+ int32 localtypmod;
+ /* If no fields of the record require remapping, these are NULL: */
+ TupleDesc tupledesc; /* copy of record's tupdesc */
+ TupleRemapInfo **field_remap; /* each field's remap info */
+} RecordRemapInfo;
+
+struct TupleRemapInfo
+{
+ TupleRemapClass remapclass;
+ union
+ {
+ ArrayRemapInfo arr;
+ RangeRemapInfo rng;
+ RecordRemapInfo rec;
+ } u;
+};
+
+/*
+ * DestReceiver object's private contents
+ *
+ * queue and tupledesc are pointers to data supplied by DestReceiver's caller.
+ * The recordhtab and remap info are owned by the DestReceiver and are kept
+ * in mycontext. tmpcontext is a tuple-lifespan context to hold cruft
+ * created while traversing each tuple to find record subfields.
+ */
+typedef struct TQueueDestReceiver
+{
+ DestReceiver pub; /* public fields */
+ shm_mq_handle *queue; /* shm_mq to send to */
+ MemoryContext mycontext; /* context containing TQueueDestReceiver */
+ MemoryContext tmpcontext; /* per-tuple context, if needed */
+ HTAB *recordhtab; /* table of transmitted typmods, if needed */
+ char mode; /* current message mode */
+ TupleDesc tupledesc; /* current top-level tuple descriptor */
+ TupleRemapInfo **field_remapinfo; /* current top-level remap info */
} TQueueDestReceiver;
-typedef struct RecordTypemodMap
+/*
+ * Hash table entries for mapping remote to local typmods.
+ */
+typedef struct RecordTypmodMap
{
- int remotetypmod;
- int localtypmod;
-} RecordTypemodMap;
+ int32 remotetypmod; /* hash key (must be first!) */
+ int32 localtypmod;
+} RecordTypmodMap;
+/*
+ * TupleQueueReader object's private contents
+ *
+ * queue and tupledesc are pointers to data supplied by reader's caller.
+ * The typmodmap and remap info are owned by the TupleQueueReader and
+ * are kept in mycontext.
+ *
+ * "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h
+ */
struct TupleQueueReader
{
- shm_mq_handle *queue;
- char mode;
- TupleDesc tupledesc;
- RemapInfo *remapinfo;
- HTAB *typmodmap;
+ shm_mq_handle *queue; /* shm_mq to receive from */
+ MemoryContext mycontext; /* context containing TupleQueueReader */
+ HTAB *typmodmap; /* RecordTypmodMap hashtable, if needed */
+ char mode; /* current message mode */
+ TupleDesc tupledesc; /* current top-level tuple descriptor */
+ TupleRemapInfo **field_remapinfo; /* current top-level remap info */
};
-#define TUPLE_QUEUE_MODE_CONTROL 'c'
-#define TUPLE_QUEUE_MODE_DATA 'd'
-
-static void tqueueWalk(TQueueDestReceiver *tqueue, RemapClass walktype,
- Datum value);
-static void tqueueWalkRecord(TQueueDestReceiver *tqueue, Datum value);
-static void tqueueWalkArray(TQueueDestReceiver *tqueue, Datum value);
-static void tqueueWalkRange(TQueueDestReceiver *tqueue, Datum value);
-static void tqueueSendTypmodInfo(TQueueDestReceiver *tqueue, int typmod,
- TupleDesc tupledesc);
+/* Local function prototypes */
+static void TQExamine(TQueueDestReceiver *tqueue,
+ TupleRemapInfo *remapinfo,
+ Datum value);
+static void TQExamineArray(TQueueDestReceiver *tqueue,
+ ArrayRemapInfo *remapinfo,
+ Datum value);
+static void TQExamineRange(TQueueDestReceiver *tqueue,
+ RangeRemapInfo *remapinfo,
+ Datum value);
+static void TQExamineRecord(TQueueDestReceiver *tqueue,
+ RecordRemapInfo *remapinfo,
+ Datum value);
+static void TQSendRecordInfo(TQueueDestReceiver *tqueue, int32 typmod,
+ TupleDesc tupledesc);
static void TupleQueueHandleControlMessage(TupleQueueReader *reader,
Size nbytes, char *data);
static HeapTuple TupleQueueHandleDataMessage(TupleQueueReader *reader,
Size nbytes, HeapTupleHeader data);
-static HeapTuple TupleQueueRemapTuple(TupleQueueReader *reader,
- TupleDesc tupledesc, RemapInfo *remapinfo,
- HeapTuple tuple);
-static Datum TupleQueueRemap(TupleQueueReader *reader, RemapClass remapclass,
- Datum value);
-static Datum TupleQueueRemapArray(TupleQueueReader *reader, Datum value);
-static Datum TupleQueueRemapRange(TupleQueueReader *reader, Datum value);
-static Datum TupleQueueRemapRecord(TupleQueueReader *reader, Datum value);
-static RemapClass GetRemapClass(Oid typeid);
-static RemapInfo *BuildRemapInfo(TupleDesc tupledesc);
+static HeapTuple TQRemapTuple(TupleQueueReader *reader,
+ TupleDesc tupledesc,
+ TupleRemapInfo **field_remapinfo,
+ HeapTuple tuple);
+static Datum TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo,
+ Datum value, bool *changed);
+static Datum TQRemapArray(TupleQueueReader *reader, ArrayRemapInfo *remapinfo,
+ Datum value, bool *changed);
+static Datum TQRemapRange(TupleQueueReader *reader, RangeRemapInfo *remapinfo,
+ Datum value, bool *changed);
+static Datum TQRemapRecord(TupleQueueReader *reader, RecordRemapInfo *remapinfo,
+ Datum value, bool *changed);
+static TupleRemapInfo *BuildTupleRemapInfo(Oid typid, MemoryContext mycontext);
+static TupleRemapInfo *BuildArrayRemapInfo(Oid elemtypid,
+ MemoryContext mycontext);
+static TupleRemapInfo *BuildRangeRemapInfo(Oid rngtypid,
+ MemoryContext mycontext);
+static TupleRemapInfo **BuildFieldRemapInfo(TupleDesc tupledesc,
+ MemoryContext mycontext);
+
/*
- * Receive a tuple.
+ * Receive a tuple from a query, and send it to the designated shm_mq.
*
- * This is, at core, pretty simple: just send the tuple to the designated
- * shm_mq. The complicated part is that if the tuple contains transient
- * record types (see lookup_rowtype_tupdesc), we need to send control
- * information to the shm_mq receiver so that those typemods can be correctly
- * interpreted, as they are merely held in a backend-local cache. Worse, the
- * record type may not at the top level: we could have a range over an array
- * type over a range type over a range type over an array type over a record,
- * or something like that.
+ * Returns TRUE if successful, FALSE if shm_mq has been detached.
*/
static bool
tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
shm_mq_result result;
/*
- * Test to see whether the tupledesc has changed; if so, set up for the
- * new tupledesc. This is a strange test both because the executor really
+ * If first time through, compute remapping info for the top-level fields.
+ * On later calls, if the tupledesc has changed, set up for the new
+ * tupledesc. (This is a strange test both because the executor really
* shouldn't change the tupledesc, and also because it would be unsafe if
* the old tupledesc could be freed and a new one allocated at the same
* address. But since some very old code in printtup.c uses a similar
- * test, we adopt it here as well.
+ * approach, we adopt it here as well.)
+ *
+ * Here and elsewhere in this module, when replacing remapping info we
+ * pfree the top-level object because that's easy, but we don't bother to
+ * recursively free any substructure. This would lead to query-lifespan
+ * memory leaks if the mapping info actually changed frequently, but since
+ * we don't expect that to happen, it doesn't seem worth expending code to
+ * prevent it.
*/
if (tqueue->tupledesc != tupledesc)
{
- if (tqueue->remapinfo != NULL)
- pfree(tqueue->remapinfo);
- tqueue->remapinfo = BuildRemapInfo(tupledesc);
+ /* Is it worth trying to free substructure of the remap tree? */
+ if (tqueue->field_remapinfo != NULL)
+ pfree(tqueue->field_remapinfo);
+ tqueue->field_remapinfo = BuildFieldRemapInfo(tupledesc,
+ tqueue->mycontext);
tqueue->tupledesc = tupledesc;
}
- tuple = ExecMaterializeSlot(slot);
-
/*
- * When, because of the types being transmitted, no record typemod mapping
+ * When, because of the types being transmitted, no record typmod mapping
* can be needed, we can skip a good deal of work.
*/
- if (tqueue->remapinfo != NULL)
+ if (tqueue->field_remapinfo != NULL)
{
- RemapInfo *remapinfo = tqueue->remapinfo;
- AttrNumber i;
+ TupleRemapInfo **remapinfo = tqueue->field_remapinfo;
+ int i;
MemoryContext oldcontext = NULL;
- /* Deform the tuple so we can examine it, if not done already. */
+ /* Deform the tuple so we can examine fields, if not done already. */
slot_getallattrs(slot);
- /* Iterate over each attribute and search it for transient typemods. */
- Assert(slot->tts_tupleDescriptor->natts == remapinfo->natts);
- for (i = 0; i < remapinfo->natts; ++i)
+ /* Iterate over each attribute and search it for transient typmods. */
+ for (i = 0; i < tupledesc->natts; i++)
{
/* Ignore nulls and types that don't need special handling. */
- if (slot->tts_isnull[i] ||
- remapinfo->mapping[i] == TQUEUE_REMAP_NONE)
+ if (slot->tts_isnull[i] || remapinfo[i] == NULL)
continue;
/* Switch to temporary memory context to avoid leaking. */
{
if (tqueue->tmpcontext == NULL)
tqueue->tmpcontext =
- AllocSetContextCreate(TopMemoryContext,
- "tqueue temporary context",
+ AllocSetContextCreate(tqueue->mycontext,
+ "tqueue sender temp context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
oldcontext = MemoryContextSwitchTo(tqueue->tmpcontext);
}
- /* Invoke the appropriate walker function. */
- tqueueWalk(tqueue, remapinfo->mapping[i], slot->tts_values[i]);
+ /* Examine the value. */
+ TQExamine(tqueue, remapinfo[i], slot->tts_values[i]);
}
/* If we used the temp context, reset it and restore prior context. */
if (tqueue->mode != TUPLE_QUEUE_MODE_DATA)
{
tqueue->mode = TUPLE_QUEUE_MODE_DATA;
- shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false);
+ shm_mq_send(tqueue->queue, sizeof(char), &tqueue->mode, false);
}
}
/* Send the tuple itself. */
- result = shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
+ tuple = ExecMaterializeSlot(slot);
+ result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false);
+ /* Check for failure. */
if (result == SHM_MQ_DETACHED)
return false;
else if (result != SHM_MQ_SUCCESS)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("could not send tuples")));
+ errmsg("could not send tuple to shared-memory queue")));
return true;
}
/*
- * Invoke the appropriate walker function based on the given RemapClass.
+ * Examine the given datum and send any necessary control messages for
+ * transient record types contained in it.
+ *
+ * remapinfo is previously-computed remapping info about the datum's type.
+ *
+ * This function just dispatches based on the remap class.
*/
static void
-tqueueWalk(TQueueDestReceiver *tqueue, RemapClass walktype, Datum value)
+TQExamine(TQueueDestReceiver *tqueue, TupleRemapInfo *remapinfo, Datum value)
{
+ /* This is recursive, so it could be driven to stack overflow. */
check_stack_depth();
- switch (walktype)
+ switch (remapinfo->remapclass)
{
- case TQUEUE_REMAP_NONE:
- break;
case TQUEUE_REMAP_ARRAY:
- tqueueWalkArray(tqueue, value);
+ TQExamineArray(tqueue, &remapinfo->u.arr, value);
break;
case TQUEUE_REMAP_RANGE:
- tqueueWalkRange(tqueue, value);
+ TQExamineRange(tqueue, &remapinfo->u.rng, value);
break;
case TQUEUE_REMAP_RECORD:
- tqueueWalkRecord(tqueue, value);
+ TQExamineRecord(tqueue, &remapinfo->u.rec, value);
break;
}
}
/*
- * Walk a record and send control messages for transient record types
- * contained therein.
+ * Examine a record datum and send any necessary control messages for
+ * transient record types contained in it.
*/
static void
-tqueueWalkRecord(TQueueDestReceiver *tqueue, Datum value)
+TQExamineRecord(TQueueDestReceiver *tqueue, RecordRemapInfo *remapinfo,
+ Datum value)
{
HeapTupleHeader tup;
- Oid typeid;
- Oid typmod;
+ Oid typid;
+ int32 typmod;
TupleDesc tupledesc;
- RemapInfo *remapinfo;
- /* Extract typmod from tuple. */
+ /* Extract type OID and typmod from tuple. */
tup = DatumGetHeapTupleHeader(value);
- typeid = HeapTupleHeaderGetTypeId(tup);
+ typid = HeapTupleHeaderGetTypeId(tup);
typmod = HeapTupleHeaderGetTypMod(tup);
- /* Look up tuple descriptor in typecache. */
- tupledesc = lookup_rowtype_tupdesc(typeid, typmod);
-
/*
- * If this is a transient record time, send its TupleDesc as a control
- * message. (tqueueSendTypemodInfo is smart enough to do this only once
- * per typmod.)
+ * If first time through, or if this isn't the same composite type as last
+ * time, consider sending a control message, and then look up the
+ * necessary information for examining the fields.
*/
- if (typeid == RECORDOID)
- tqueueSendTypmodInfo(tqueue, typmod, tupledesc);
+ if (typid != remapinfo->rectypid || typmod != remapinfo->rectypmod)
+ {
+ /* Free any old data. */
+ if (remapinfo->tupledesc != NULL)
+ FreeTupleDesc(remapinfo->tupledesc);
+ /* Is it worth trying to free substructure of the remap tree? */
+ if (remapinfo->field_remap != NULL)
+ pfree(remapinfo->field_remap);
- /*
- * Build the remap information for this tupledesc. We might want to think
- * about keeping a cache of this information keyed by typeid and typemod,
- * but let's keep it simple for now.
- */
- remapinfo = BuildRemapInfo(tupledesc);
+ /* Look up tuple descriptor in typcache. */
+ tupledesc = lookup_rowtype_tupdesc(typid, typmod);
+
+ /*
+ * If this is a transient record type, send the tupledesc in a control
+ * message. (TQSendRecordInfo is smart enough to do this only once
+ * per typmod.)
+ */
+ if (typid == RECORDOID)
+ TQSendRecordInfo(tqueue, typmod, tupledesc);
+
+ /* Figure out whether fields need recursive processing. */
+ remapinfo->field_remap = BuildFieldRemapInfo(tupledesc,
+ tqueue->mycontext);
+ if (remapinfo->field_remap != NULL)
+ {
+ /*
+ * We need to inspect the record contents, so save a copy of the
+ * tupdesc. (We could possibly just reference the typcache's
+ * copy, but then it's problematic when to release the refcount.)
+ */
+ MemoryContext oldcontext = MemoryContextSwitchTo(tqueue->mycontext);
+
+ remapinfo->tupledesc = CreateTupleDescCopy(tupledesc);
+ MemoryContextSwitchTo(oldcontext);
+ }
+ else
+ {
+ /* No fields of the record require remapping. */
+ remapinfo->tupledesc = NULL;
+ }
+ remapinfo->rectypid = typid;
+ remapinfo->rectypmod = typmod;
+
+ /* Release reference count acquired by lookup_rowtype_tupdesc. */
+ DecrTupleDescRefCount(tupledesc);
+ }
/*
- * If remapping is required, deform the tuple and process each field. When
- * BuildRemapInfo is null, the data types are such that there can be no
- * transient record types here, so we can skip all this work.
+ * If field remapping is required, deform the tuple and examine each
+ * field.
*/
- if (remapinfo != NULL)
+ if (remapinfo->field_remap != NULL)
{
Datum *values;
bool *isnull;
HeapTupleData tdata;
- AttrNumber i;
+ int i;
/* Deform the tuple so we can check each column within. */
- values = palloc(tupledesc->natts * sizeof(Datum));
- isnull = palloc(tupledesc->natts * sizeof(bool));
+ tupledesc = remapinfo->tupledesc;
+ values = (Datum *) palloc(tupledesc->natts * sizeof(Datum));
+ isnull = (bool *) palloc(tupledesc->natts * sizeof(bool));
tdata.t_len = HeapTupleHeaderGetDatumLength(tup);
ItemPointerSetInvalid(&(tdata.t_self));
tdata.t_tableOid = InvalidOid;
tdata.t_data = tup;
heap_deform_tuple(&tdata, tupledesc, values, isnull);
- /* Recursively check each non-NULL attribute. */
- for (i = 0; i < tupledesc->natts; ++i)
- if (!isnull[i])
- tqueueWalk(tqueue, remapinfo->mapping[i], values[i]);
- }
+ /* Recursively check each interesting non-NULL attribute. */
+ for (i = 0; i < tupledesc->natts; i++)
+ {
+ if (!isnull[i] && remapinfo->field_remap[i])
+ TQExamine(tqueue, remapinfo->field_remap[i], values[i]);
+ }
- /* Release reference count acquired by lookup_rowtype_tupdesc. */
- DecrTupleDescRefCount(tupledesc);
+ /* Need not clean up, since we're in a short-lived context. */
+ }
}
/*
- * Walk a record and send control messages for transient record types
- * contained therein.
+ * Examine an array datum and send any necessary control messages for
+ * transient record types contained in it.
*/
static void
-tqueueWalkArray(TQueueDestReceiver *tqueue, Datum value)
+TQExamineArray(TQueueDestReceiver *tqueue, ArrayRemapInfo *remapinfo,
+ Datum value)
{
ArrayType *arr = DatumGetArrayTypeP(value);
- Oid typeid = ARR_ELEMTYPE(arr);
- RemapClass remapclass;
- int16 typlen;
- bool typbyval;
- char typalign;
+ Oid typid = ARR_ELEMTYPE(arr);
Datum *elem_values;
bool *elem_nulls;
int num_elems;
int i;
- remapclass = GetRemapClass(typeid);
-
- /*
- * If the elements of the array don't need to be walked, we shouldn't have
- * been called in the first place: GetRemapClass should have returned NULL
- * when asked about this array type.
- */
- Assert(remapclass != TQUEUE_REMAP_NONE);
-
/* Deconstruct the array. */
- get_typlenbyvalalign(typeid, &typlen, &typbyval, &typalign);
- deconstruct_array(arr, typeid, typlen, typbyval, typalign,
+ deconstruct_array(arr, typid, remapinfo->typlen,
+ remapinfo->typbyval, remapinfo->typalign,
&elem_values, &elem_nulls, &num_elems);
- /* Walk each element. */
- for (i = 0; i < num_elems; ++i)
+ /* Examine each element. */
+ for (i = 0; i < num_elems; i++)
+ {
if (!elem_nulls[i])
- tqueueWalk(tqueue, remapclass, elem_values[i]);
+ TQExamine(tqueue, remapinfo->element_remap, elem_values[i]);
+ }
}
/*
- * Walk a range type and send control messages for transient record types
- * contained therein.
+ * Examine a range datum and send any necessary control messages for
+ * transient record types contained in it.
*/
static void
-tqueueWalkRange(TQueueDestReceiver *tqueue, Datum value)
+TQExamineRange(TQueueDestReceiver *tqueue, RangeRemapInfo *remapinfo,
+ Datum value)
{
RangeType *range = DatumGetRangeType(value);
- Oid typeid = RangeTypeGetOid(range);
- RemapClass remapclass;
- TypeCacheEntry *typcache;
RangeBound lower;
RangeBound upper;
bool empty;
- /*
- * Extract the lower and upper bounds. It might be worth implementing
- * some caching scheme here so that we don't look up the same typeids in
- * the type cache repeatedly, but for now let's keep it simple.
- */
- typcache = lookup_type_cache(typeid, TYPECACHE_RANGE_INFO);
- if (typcache->rngelemtype == NULL)
- elog(ERROR, "type %u is not a range type", typeid);
- range_deserialize(typcache, range, &lower, &upper, &empty);
+ /* Extract the lower and upper bounds. */
+ range_deserialize(remapinfo->typcache, range, &lower, &upper, &empty);
/* Nothing to do for an empty range. */
if (empty)
return;
- /*
- * If the range bounds don't need to be walked, we shouldn't have been
- * called in the first place: GetRemapClass should have returned NULL when
- * asked about this range type.
- */
- remapclass = GetRemapClass(typcache->rngelemtype->type_id);
- Assert(remapclass != TQUEUE_REMAP_NONE);
-
- /* Walk each bound, if present. */
+ /* Examine each bound, if present. */
if (!upper.infinite)
- tqueueWalk(tqueue, remapclass, upper.val);
+ TQExamine(tqueue, remapinfo->bound_remap, upper.val);
if (!lower.infinite)
- tqueueWalk(tqueue, remapclass, lower.val);
+ TQExamine(tqueue, remapinfo->bound_remap, lower.val);
}
/*
- * Send tuple descriptor information for a transient typemod, unless we've
+ * Send tuple descriptor information for a transient typmod, unless we've
* already done so previously.
*/
static void
-tqueueSendTypmodInfo(TQueueDestReceiver *tqueue, int typmod,
- TupleDesc tupledesc)
+TQSendRecordInfo(TQueueDestReceiver *tqueue, int32 typmod, TupleDesc tupledesc)
{
StringInfoData buf;
bool found;
- AttrNumber i;
+ int i;
/* Initialize hash table if not done yet. */
if (tqueue->recordhtab == NULL)
{
HASHCTL ctl;
- ctl.keysize = sizeof(int);
- ctl.entrysize = sizeof(int);
- ctl.hcxt = TopMemoryContext;
- tqueue->recordhtab = hash_create("tqueue record hashtable",
+ MemSet(&ctl, 0, sizeof(ctl));
+ /* Hash table entries are just typmods */
+ ctl.keysize = sizeof(int32);
+ ctl.entrysize = sizeof(int32);
+ ctl.hcxt = tqueue->mycontext;
+ tqueue->recordhtab = hash_create("tqueue sender record type hashtable",
100, &ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
}
if (found)
return;
+ elog(DEBUG3, "sending tqueue control message for record typmod %d", typmod);
+
/* If message queue is in data mode, switch to control mode. */
if (tqueue->mode != TUPLE_QUEUE_MODE_CONTROL)
{
tqueue->mode = TUPLE_QUEUE_MODE_CONTROL;
- shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false);
+ shm_mq_send(tqueue->queue, sizeof(char), &tqueue->mode, false);
}
/* Assemble a control message. */
initStringInfo(&buf);
- appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int));
+ appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int32));
appendBinaryStringInfo(&buf, (char *) &tupledesc->natts, sizeof(int));
- appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid,
- sizeof(bool));
- for (i = 0; i < tupledesc->natts; ++i)
+ appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid, sizeof(bool));
+ for (i = 0; i < tupledesc->natts; i++)
+ {
appendBinaryStringInfo(&buf, (char *) tupledesc->attrs[i],
sizeof(FormData_pg_attribute));
+ }
/* Send control message. */
- shm_mq_send(tqueue->handle, buf.len, buf.data, false);
+ shm_mq_send(tqueue->queue, buf.len, buf.data, false);
+
+ /* We assume it's OK to leak buf because we're in a short-lived context. */
}
/*
{
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
- shm_mq_detach(shm_mq_get_queue(tqueue->handle));
+ shm_mq_detach(shm_mq_get_queue(tqueue->queue));
}
/*
MemoryContextDelete(tqueue->tmpcontext);
if (tqueue->recordhtab != NULL)
hash_destroy(tqueue->recordhtab);
- if (tqueue->remapinfo != NULL)
- pfree(tqueue->remapinfo);
+ /* Is it worth trying to free substructure of the remap tree? */
+ if (tqueue->field_remapinfo != NULL)
+ pfree(tqueue->field_remapinfo);
pfree(self);
}
self->pub.rShutdown = tqueueShutdownReceiver;
self->pub.rDestroy = tqueueDestroyReceiver;
self->pub.mydest = DestTupleQueue;
- self->handle = handle;
+ self->queue = handle;
+ self->mycontext = CurrentMemoryContext;
self->tmpcontext = NULL;
self->recordhtab = NULL;
self->mode = TUPLE_QUEUE_MODE_DATA;
- self->remapinfo = NULL;
+ /* Top-level tupledesc is not known yet */
+ self->tupledesc = NULL;
+ self->field_remapinfo = NULL;
return (DestReceiver *) self;
}
TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
reader->queue = handle;
+ reader->mycontext = CurrentMemoryContext;
+ reader->typmodmap = NULL;
reader->mode = TUPLE_QUEUE_MODE_DATA;
reader->tupledesc = tupledesc;
- reader->remapinfo = BuildRemapInfo(tupledesc);
+ reader->field_remapinfo = BuildFieldRemapInfo(tupledesc, reader->mycontext);
return reader;
}
DestroyTupleQueueReader(TupleQueueReader *reader)
{
shm_mq_detach(shm_mq_get_queue(reader->queue));
- if (reader->remapinfo != NULL)
- pfree(reader->remapinfo);
+ if (reader->typmodmap != NULL)
+ hash_destroy(reader->typmodmap);
+ /* Is it worth trying to free substructure of the remap tree? */
+ if (reader->field_remapinfo != NULL)
+ pfree(reader->field_remapinfo);
pfree(reader);
}
Assert(result == SHM_MQ_SUCCESS);
/*
- * OK, we got a message. Process it.
- *
- * One-byte messages are mode switch messages, so that we can switch
- * between "control" and "data" mode. Otherwise, when in "data" mode,
- * each message is a tuple. When in "control" mode, each message
- * provides a transient-typmod-to-tupledesc mapping to let us
- * interpret future tuples. Both of those cases certainly require
- * more than one byte, so no confusion is possible.
+ * We got a message (see message spec at top of file). Process it.
*/
if (nbytes == 1)
{
TupleQueueHandleControlMessage(reader, nbytes, data);
}
else
- elog(ERROR, "invalid mode: %d", (int) reader->mode);
+ elog(ERROR, "unrecognized tqueue mode: %d", (int) reader->mode);
}
}
{
HeapTupleData htup;
+ /*
+ * Set up a dummy HeapTupleData pointing to the data from the shm_mq
+ * (which had better be sufficiently aligned).
+ */
ItemPointerSetInvalid(&htup.t_self);
htup.t_tableOid = InvalidOid;
htup.t_len = nbytes;
htup.t_data = data;
- return TupleQueueRemapTuple(reader, reader->tupledesc, reader->remapinfo,
- &htup);
+ /*
+ * Either just copy the data into a regular palloc'd tuple, or remap it,
+ * as required.
+ */
+ return TQRemapTuple(reader,
+ reader->tupledesc,
+ reader->field_remapinfo,
+ &htup);
}
/*
- * Remap tuple typmods per control information received from remote side.
+ * Copy the given tuple, remapping any transient typmods contained in it.
*/
static HeapTuple
-TupleQueueRemapTuple(TupleQueueReader *reader, TupleDesc tupledesc,
- RemapInfo *remapinfo, HeapTuple tuple)
+TQRemapTuple(TupleQueueReader *reader,
+ TupleDesc tupledesc,
+ TupleRemapInfo **field_remapinfo,
+ HeapTuple tuple)
{
Datum *values;
bool *isnull;
+ bool changed = false;
int i;
/*
* If no remapping is necessary, just copy the tuple into a single
* palloc'd chunk, as caller will expect.
*/
- if (remapinfo == NULL)
+ if (field_remapinfo == NULL)
return heap_copytuple(tuple);
/* Deform tuple so we can remap record typmods for individual attrs. */
- values = palloc(tupledesc->natts * sizeof(Datum));
- isnull = palloc(tupledesc->natts * sizeof(bool));
+ values = (Datum *) palloc(tupledesc->natts * sizeof(Datum));
+ isnull = (bool *) palloc(tupledesc->natts * sizeof(bool));
heap_deform_tuple(tuple, tupledesc, values, isnull);
- Assert(tupledesc->natts == remapinfo->natts);
- /* Recursively check each non-NULL attribute. */
- for (i = 0; i < tupledesc->natts; ++i)
+ /* Recursively process each interesting non-NULL attribute. */
+ for (i = 0; i < tupledesc->natts; i++)
{
- if (isnull[i] || remapinfo->mapping[i] == TQUEUE_REMAP_NONE)
+ if (isnull[i] || field_remapinfo[i] == NULL)
continue;
- values[i] = TupleQueueRemap(reader, remapinfo->mapping[i], values[i]);
+ values[i] = TQRemap(reader, field_remapinfo[i], values[i], &changed);
}
- /* Reform the modified tuple. */
- return heap_form_tuple(tupledesc, values, isnull);
+ /* Reconstruct the modified tuple, if anything was modified. */
+ if (changed)
+ return heap_form_tuple(tupledesc, values, isnull);
+ else
+ return heap_copytuple(tuple);
}
/*
- * Remap a value based on the specified remap class.
+ * Process the given datum and replace any transient record typmods
+ * contained in it. Set *changed to TRUE if we actually changed the datum.
+ *
+ * remapinfo is previously-computed remapping info about the datum's type.
+ *
+ * This function just dispatches based on the remap class.
*/
static Datum
-TupleQueueRemap(TupleQueueReader *reader, RemapClass remapclass, Datum value)
+TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo,
+ Datum value, bool *changed)
{
+ /* This is recursive, so it could be driven to stack overflow. */
check_stack_depth();
- switch (remapclass)
+ switch (remapinfo->remapclass)
{
- case TQUEUE_REMAP_NONE:
- /* caller probably shouldn't have called us at all, but... */
- return value;
-
case TQUEUE_REMAP_ARRAY:
- return TupleQueueRemapArray(reader, value);
+ return TQRemapArray(reader, &remapinfo->u.arr, value, changed);
case TQUEUE_REMAP_RANGE:
- return TupleQueueRemapRange(reader, value);
+ return TQRemapRange(reader, &remapinfo->u.rng, value, changed);
case TQUEUE_REMAP_RECORD:
- return TupleQueueRemapRecord(reader, value);
+ return TQRemapRecord(reader, &remapinfo->u.rec, value, changed);
}
- elog(ERROR, "unknown remap class: %d", (int) remapclass);
+ elog(ERROR, "unrecognized tqueue remap class: %d",
+ (int) remapinfo->remapclass);
return (Datum) 0;
}
/*
- * Remap an array.
+ * Process the given array datum and replace any transient record typmods
+ * contained in it. Set *changed to TRUE if we actually changed the datum.
*/
static Datum
-TupleQueueRemapArray(TupleQueueReader *reader, Datum value)
+TQRemapArray(TupleQueueReader *reader, ArrayRemapInfo *remapinfo,
+ Datum value, bool *changed)
{
ArrayType *arr = DatumGetArrayTypeP(value);
- Oid typeid = ARR_ELEMTYPE(arr);
- RemapClass remapclass;
- int16 typlen;
- bool typbyval;
- char typalign;
+ Oid typid = ARR_ELEMTYPE(arr);
+ bool element_changed = false;
Datum *elem_values;
bool *elem_nulls;
int num_elems;
int i;
- remapclass = GetRemapClass(typeid);
-
- /*
- * If the elements of the array don't need to be walked, we shouldn't have
- * been called in the first place: GetRemapClass should have returned NULL
- * when asked about this array type.
- */
- Assert(remapclass != TQUEUE_REMAP_NONE);
-
/* Deconstruct the array. */
- get_typlenbyvalalign(typeid, &typlen, &typbyval, &typalign);
- deconstruct_array(arr, typeid, typlen, typbyval, typalign,
+ deconstruct_array(arr, typid, remapinfo->typlen,
+ remapinfo->typbyval, remapinfo->typalign,
&elem_values, &elem_nulls, &num_elems);
/* Remap each element. */
- for (i = 0; i < num_elems; ++i)
+ for (i = 0; i < num_elems; i++)
+ {
if (!elem_nulls[i])
- elem_values[i] = TupleQueueRemap(reader, remapclass,
- elem_values[i]);
-
- /* Reconstruct and return the array. */
- arr = construct_md_array(elem_values, elem_nulls,
- ARR_NDIM(arr), ARR_DIMS(arr), ARR_LBOUND(arr),
- typeid, typlen, typbyval, typalign);
- return PointerGetDatum(arr);
+ elem_values[i] = TQRemap(reader,
+ remapinfo->element_remap,
+ elem_values[i],
+ &element_changed);
+ }
+
+ if (element_changed)
+ {
+ /* Reconstruct and return the array. */
+ *changed = true;
+ arr = construct_md_array(elem_values, elem_nulls,
+ ARR_NDIM(arr), ARR_DIMS(arr), ARR_LBOUND(arr),
+ typid, remapinfo->typlen,
+ remapinfo->typbyval, remapinfo->typalign);
+ return PointerGetDatum(arr);
+ }
+
+ /* Else just return the value as-is. */
+ return value;
}
/*
- * Remap a range type.
+ * Process the given range datum and replace any transient record typmods
+ * contained in it. Set *changed to TRUE if we actually changed the datum.
*/
static Datum
-TupleQueueRemapRange(TupleQueueReader *reader, Datum value)
+TQRemapRange(TupleQueueReader *reader, RangeRemapInfo *remapinfo,
+ Datum value, bool *changed)
{
RangeType *range = DatumGetRangeType(value);
- Oid typeid = RangeTypeGetOid(range);
- RemapClass remapclass;
- TypeCacheEntry *typcache;
+ bool bound_changed = false;
RangeBound lower;
RangeBound upper;
bool empty;
- /*
- * Extract the lower and upper bounds. As in tqueueWalkRange, some
- * caching might be a good idea here.
- */
- typcache = lookup_type_cache(typeid, TYPECACHE_RANGE_INFO);
- if (typcache->rngelemtype == NULL)
- elog(ERROR, "type %u is not a range type", typeid);
- range_deserialize(typcache, range, &lower, &upper, &empty);
+ /* Extract the lower and upper bounds. */
+ range_deserialize(remapinfo->typcache, range, &lower, &upper, &empty);
/* Nothing to do for an empty range. */
if (empty)
return value;
- /*
- * If the range bounds don't need to be walked, we shouldn't have been
- * called in the first place: GetRemapClass should have returned NULL when
- * asked about this range type.
- */
- remapclass = GetRemapClass(typcache->rngelemtype->type_id);
- Assert(remapclass != TQUEUE_REMAP_NONE);
-
/* Remap each bound, if present. */
if (!upper.infinite)
- upper.val = TupleQueueRemap(reader, remapclass, upper.val);
+ upper.val = TQRemap(reader, remapinfo->bound_remap,
+ upper.val, &bound_changed);
if (!lower.infinite)
- lower.val = TupleQueueRemap(reader, remapclass, lower.val);
+ lower.val = TQRemap(reader, remapinfo->bound_remap,
+ lower.val, &bound_changed);
- /* And reserialize. */
- range = range_serialize(typcache, &lower, &upper, empty);
- return RangeTypeGetDatum(range);
+ if (bound_changed)
+ {
+ /* Reserialize. */
+ *changed = true;
+ range = range_serialize(remapinfo->typcache, &lower, &upper, empty);
+ return RangeTypeGetDatum(range);
+ }
+
+ /* Else just return the value as-is. */
+ return value;
}
/*
- * Remap a record.
+ * Process the given record datum and replace any transient record typmods
+ * contained in it. Set *changed to TRUE if we actually changed the datum.
*/
static Datum
-TupleQueueRemapRecord(TupleQueueReader *reader, Datum value)
+TQRemapRecord(TupleQueueReader *reader, RecordRemapInfo *remapinfo,
+ Datum value, bool *changed)
{
HeapTupleHeader tup;
- Oid typeid;
- int typmod;
- RecordTypemodMap *mapent;
+ Oid typid;
+ int32 typmod;
+ bool changed_typmod;
TupleDesc tupledesc;
- RemapInfo *remapinfo;
- HeapTupleData htup;
- HeapTuple atup;
- /* Fetch type OID and typemod. */
+ /* Extract type OID and typmod from tuple. */
tup = DatumGetHeapTupleHeader(value);
- typeid = HeapTupleHeaderGetTypeId(tup);
+ typid = HeapTupleHeaderGetTypeId(tup);
typmod = HeapTupleHeaderGetTypMod(tup);
+ /*
+ * If first time through, or if this isn't the same composite type as last
+ * time, identify the required typmod mapping, and then look up the
+ * necessary information for processing the fields.
+ */
+ if (typid != remapinfo->rectypid || typmod != remapinfo->rectypmod)
+ {
+ /* Free any old data. */
+ if (remapinfo->tupledesc != NULL)
+ FreeTupleDesc(remapinfo->tupledesc);
+ /* Is it worth trying to free substructure of the remap tree? */
+ if (remapinfo->field_remap != NULL)
+ pfree(remapinfo->field_remap);
+
+ /* If transient record type, look up matching local typmod. */
+ if (typid == RECORDOID)
+ {
+ RecordTypmodMap *mapent;
+
+ Assert(reader->typmodmap != NULL);
+ mapent = hash_search(reader->typmodmap, &typmod,
+ HASH_FIND, NULL);
+ if (mapent == NULL)
+ elog(ERROR, "tqueue received unrecognized remote typmod %d",
+ typmod);
+ remapinfo->localtypmod = mapent->localtypmod;
+ }
+ else
+ remapinfo->localtypmod = -1;
+
+ /* Look up tuple descriptor in typcache. */
+ tupledesc = lookup_rowtype_tupdesc(typid, remapinfo->localtypmod);
+
+ /* Figure out whether fields need recursive processing. */
+ remapinfo->field_remap = BuildFieldRemapInfo(tupledesc,
+ reader->mycontext);
+ if (remapinfo->field_remap != NULL)
+ {
+ /*
+ * We need to inspect the record contents, so save a copy of the
+ * tupdesc. (We could possibly just reference the typcache's
+ * copy, but then it's problematic when to release the refcount.)
+ */
+ MemoryContext oldcontext = MemoryContextSwitchTo(reader->mycontext);
+
+ remapinfo->tupledesc = CreateTupleDescCopy(tupledesc);
+ MemoryContextSwitchTo(oldcontext);
+ }
+ else
+ {
+ /* No fields of the record require remapping. */
+ remapinfo->tupledesc = NULL;
+ }
+ remapinfo->rectypid = typid;
+ remapinfo->rectypmod = typmod;
+
+ /* Release reference count acquired by lookup_rowtype_tupdesc. */
+ DecrTupleDescRefCount(tupledesc);
+ }
+
/* If transient record, replace remote typmod with local typmod. */
- if (typeid == RECORDOID)
+ if (typid == RECORDOID && typmod != remapinfo->localtypmod)
{
- Assert(reader->typmodmap != NULL);
- mapent = hash_search(reader->typmodmap, &typmod,
- HASH_FIND, NULL);
- if (mapent == NULL)
- elog(ERROR, "found unrecognized remote typmod %d", typmod);
- typmod = mapent->localtypmod;
+ typmod = remapinfo->localtypmod;
+ changed_typmod = true;
}
+ else
+ changed_typmod = false;
/*
- * Fetch tupledesc and compute remap info. We should probably cache this
- * so that we don't have to keep recomputing it.
+ * If we need to change the typmod, or if there are any potentially
+ * remappable fields, replace the tuple.
*/
- tupledesc = lookup_rowtype_tupdesc(typeid, typmod);
- remapinfo = BuildRemapInfo(tupledesc);
- DecrTupleDescRefCount(tupledesc);
+ if (changed_typmod || remapinfo->field_remap != NULL)
+ {
+ HeapTupleData htup;
+ HeapTuple atup;
+
+ /* For now, assume we always need to change the tuple in this case. */
+ *changed = true;
+
+ /* Copy tuple, possibly remapping contained fields. */
+ ItemPointerSetInvalid(&htup.t_self);
+ htup.t_tableOid = InvalidOid;
+ htup.t_len = HeapTupleHeaderGetDatumLength(tup);
+ htup.t_data = tup;
+ atup = TQRemapTuple(reader,
+ remapinfo->tupledesc,
+ remapinfo->field_remap,
+ &htup);
+
+ /* Apply the correct labeling for a local Datum. */
+ HeapTupleHeaderSetTypeId(atup->t_data, typid);
+ HeapTupleHeaderSetTypMod(atup->t_data, typmod);
+ HeapTupleHeaderSetDatumLength(atup->t_data, htup.t_len);
+
+ /* And return the results. */
+ return HeapTupleHeaderGetDatum(atup->t_data);
+ }
- /* Remap tuple. */
- ItemPointerSetInvalid(&htup.t_self);
- htup.t_tableOid = InvalidOid;
- htup.t_len = HeapTupleHeaderGetDatumLength(tup);
- htup.t_data = tup;
- atup = TupleQueueRemapTuple(reader, tupledesc, remapinfo, &htup);
- HeapTupleHeaderSetTypeId(atup->t_data, typeid);
- HeapTupleHeaderSetTypMod(atup->t_data, typmod);
- HeapTupleHeaderSetDatumLength(atup->t_data, htup.t_len);
-
- /* And return the results. */
- return HeapTupleHeaderGetDatum(atup->t_data);
+ /* Else just return the value as-is. */
+ return value;
}
/*
TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes,
char *data)
{
+ int32 remotetypmod;
int natts;
- int remotetypmod;
bool hasoid;
- char *buf = data;
- int rc = 0;
- int i;
+ Size offset = 0;
Form_pg_attribute *attrs;
- MemoryContext oldcontext;
TupleDesc tupledesc;
- RecordTypemodMap *mapent;
+ RecordTypmodMap *mapent;
bool found;
+ int i;
/* Extract remote typmod. */
- memcpy(&remotetypmod, &buf[rc], sizeof(int));
- rc += sizeof(int);
+ memcpy(&remotetypmod, &data[offset], sizeof(int32));
+ offset += sizeof(int32);
/* Extract attribute count. */
- memcpy(&natts, &buf[rc], sizeof(int));
- rc += sizeof(int);
+ memcpy(&natts, &data[offset], sizeof(int));
+ offset += sizeof(int);
/* Extract hasoid flag. */
- memcpy(&hasoid, &buf[rc], sizeof(bool));
- rc += sizeof(bool);
+ memcpy(&hasoid, &data[offset], sizeof(bool));
+ offset += sizeof(bool);
- /* Extract attribute details. */
- oldcontext = MemoryContextSwitchTo(CurTransactionContext);
+ /* Extract attribute details. The tupledesc made here is just transient. */
attrs = palloc(natts * sizeof(Form_pg_attribute));
- for (i = 0; i < natts; ++i)
+ for (i = 0; i < natts; i++)
{
attrs[i] = palloc(sizeof(FormData_pg_attribute));
- memcpy(attrs[i], &buf[rc], sizeof(FormData_pg_attribute));
- rc += sizeof(FormData_pg_attribute);
+ memcpy(attrs[i], &data[offset], sizeof(FormData_pg_attribute));
+ offset += sizeof(FormData_pg_attribute);
}
- MemoryContextSwitchTo(oldcontext);
/* We should have read the whole message. */
- Assert(rc == nbytes);
+ Assert(offset == nbytes);
- /* Construct TupleDesc. */
+ /* Construct TupleDesc, and assign a local typmod. */
tupledesc = CreateTupleDesc(natts, hasoid, attrs);
tupledesc = BlessTupleDesc(tupledesc);
- /* Create map if it doesn't exist already. */
+ /* Create mapping hashtable if it doesn't exist already. */
if (reader->typmodmap == NULL)
{
HASHCTL ctl;
- ctl.keysize = sizeof(int);
- ctl.entrysize = sizeof(RecordTypemodMap);
- ctl.hcxt = CurTransactionContext;
- reader->typmodmap = hash_create("typmodmap hashtable",
+ MemSet(&ctl, 0, sizeof(ctl));
+ ctl.keysize = sizeof(int32);
+ ctl.entrysize = sizeof(RecordTypmodMap);
+ ctl.hcxt = reader->mycontext;
+ reader->typmodmap = hash_create("tqueue receiver record type hashtable",
100, &ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
}
mapent = hash_search(reader->typmodmap, &remotetypmod, HASH_ENTER,
&found);
if (found)
- elog(ERROR, "duplicate message for typmod %d",
+ elog(ERROR, "duplicate tqueue control message for typmod %d",
remotetypmod);
mapent->localtypmod = tupledesc->tdtypmod;
- elog(DEBUG3, "mapping remote typmod %d to local typmod %d",
- remotetypmod, tupledesc->tdtypmod);
+
+ elog(DEBUG3, "tqueue mapping remote typmod %d to local typmod %d",
+ remotetypmod, mapent->localtypmod);
}
/*
- * Build a mapping indicating what remapping class applies to each attribute
- * described by a tupledesc.
+ * Build remap info for the specified data type, storing it in mycontext.
+ * Returns NULL if neither the type nor any subtype could require remapping.
*/
-static RemapInfo *
-BuildRemapInfo(TupleDesc tupledesc)
+static TupleRemapInfo *
+BuildTupleRemapInfo(Oid typid, MemoryContext mycontext)
{
- RemapInfo *remapinfo;
- Size size;
- AttrNumber i;
- bool noop = true;
+ HeapTuple tup;
+ Form_pg_type typ;
+
+ /* This is recursive, so it could be driven to stack overflow. */
+ check_stack_depth();
- size = offsetof(RemapInfo, mapping) +
- sizeof(RemapClass) * tupledesc->natts;
- remapinfo = MemoryContextAllocZero(TopMemoryContext, size);
- remapinfo->natts = tupledesc->natts;
- for (i = 0; i < tupledesc->natts; ++i)
+restart:
+ tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for type %u", typid);
+ typ = (Form_pg_type) GETSTRUCT(tup);
+
+ /* Look through domains to underlying base type. */
+ if (typ->typtype == TYPTYPE_DOMAIN)
{
- Form_pg_attribute attr = tupledesc->attrs[i];
+ typid = typ->typbasetype;
+ ReleaseSysCache(tup);
+ goto restart;
+ }
- if (attr->attisdropped)
- {
- remapinfo->mapping[i] = TQUEUE_REMAP_NONE;
- continue;
- }
+ /* If it's a true array type, deal with it that way. */
+ if (OidIsValid(typ->typelem) && typ->typlen == -1)
+ {
+ typid = typ->typelem;
+ ReleaseSysCache(tup);
+ return BuildArrayRemapInfo(typid, mycontext);
+ }
- remapinfo->mapping[i] = GetRemapClass(attr->atttypid);
- if (remapinfo->mapping[i] != TQUEUE_REMAP_NONE)
- noop = false;
+ /* Similarly, deal with ranges appropriately. */
+ if (typ->typtype == TYPTYPE_RANGE)
+ {
+ ReleaseSysCache(tup);
+ return BuildRangeRemapInfo(typid, mycontext);
}
- if (noop)
+ /*
+ * If it's a composite type (including RECORD), set up for remapping. We
+ * don't attempt to determine the status of subfields here, since we do
+ * not have enough information yet; just mark everything invalid.
+ */
+ if (typ->typtype == TYPTYPE_COMPOSITE || typid == RECORDOID)
{
- pfree(remapinfo);
- remapinfo = NULL;
+ TupleRemapInfo *remapinfo;
+
+ remapinfo = (TupleRemapInfo *)
+ MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
+ remapinfo->remapclass = TQUEUE_REMAP_RECORD;
+ remapinfo->u.rec.rectypid = InvalidOid;
+ remapinfo->u.rec.rectypmod = -1;
+ remapinfo->u.rec.localtypmod = -1;
+ remapinfo->u.rec.tupledesc = NULL;
+ remapinfo->u.rec.field_remap = NULL;
+ ReleaseSysCache(tup);
+ return remapinfo;
}
+ /* Nothing else can possibly need remapping attention. */
+ ReleaseSysCache(tup);
+ return NULL;
+}
+
+static TupleRemapInfo *
+BuildArrayRemapInfo(Oid elemtypid, MemoryContext mycontext)
+{
+ TupleRemapInfo *remapinfo;
+ TupleRemapInfo *element_remapinfo;
+
+ /* See if element type requires remapping. */
+ element_remapinfo = BuildTupleRemapInfo(elemtypid, mycontext);
+ /* If not, the array doesn't either. */
+ if (element_remapinfo == NULL)
+ return NULL;
+ /* OK, set up to remap the array. */
+ remapinfo = (TupleRemapInfo *)
+ MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
+ remapinfo->remapclass = TQUEUE_REMAP_ARRAY;
+ get_typlenbyvalalign(elemtypid,
+ &remapinfo->u.arr.typlen,
+ &remapinfo->u.arr.typbyval,
+ &remapinfo->u.arr.typalign);
+ remapinfo->u.arr.element_remap = element_remapinfo;
+ return remapinfo;
+}
+
+static TupleRemapInfo *
+BuildRangeRemapInfo(Oid rngtypid, MemoryContext mycontext)
+{
+ TupleRemapInfo *remapinfo;
+ TupleRemapInfo *bound_remapinfo;
+ TypeCacheEntry *typcache;
+
+ /*
+ * Get range info from the typcache. We assume this pointer will stay
+ * valid for the duration of the query.
+ */
+ typcache = lookup_type_cache(rngtypid, TYPECACHE_RANGE_INFO);
+ if (typcache->rngelemtype == NULL)
+ elog(ERROR, "type %u is not a range type", rngtypid);
+
+ /* See if range bound type requires remapping. */
+ bound_remapinfo = BuildTupleRemapInfo(typcache->rngelemtype->type_id,
+ mycontext);
+ /* If not, the range doesn't either. */
+ if (bound_remapinfo == NULL)
+ return NULL;
+ /* OK, set up to remap the range. */
+ remapinfo = (TupleRemapInfo *)
+ MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
+ remapinfo->remapclass = TQUEUE_REMAP_RANGE;
+ remapinfo->u.rng.typcache = typcache;
+ remapinfo->u.rng.bound_remap = bound_remapinfo;
return remapinfo;
}
/*
- * Determine the remap class assocociated with a particular data type.
- *
- * Transient record types need to have the typmod applied on the sending side
- * replaced with a value on the receiving side that has the same meaning.
- *
- * Arrays, range types, and all record types (including named composite types)
- * need to searched for transient record values buried within them.
- * Surprisingly, a walker is required even when the indicated type is a
- * composite type, because the actual value may be a compatible transient
- * record type.
+ * Build remap info for fields of the type described by the given tupdesc.
+ * Returns an array of TupleRemapInfo pointers, or NULL if no field
+ * requires remapping. Data is allocated in mycontext.
*/
-static RemapClass
-GetRemapClass(Oid typeid)
+static TupleRemapInfo **
+BuildFieldRemapInfo(TupleDesc tupledesc, MemoryContext mycontext)
{
- RemapClass forceResult = TQUEUE_REMAP_NONE;
- RemapClass innerResult = TQUEUE_REMAP_NONE;
+ TupleRemapInfo **remapinfo;
+ bool noop = true;
+ int i;
- for (;;)
+ /* Recursively determine the remapping status of each field. */
+ remapinfo = (TupleRemapInfo **)
+ MemoryContextAlloc(mycontext,
+ tupledesc->natts * sizeof(TupleRemapInfo *));
+ for (i = 0; i < tupledesc->natts; i++)
{
- HeapTuple tup;
- Form_pg_type typ;
-
- /* Simple cases. */
- if (typeid == RECORDOID)
- {
- innerResult = TQUEUE_REMAP_RECORD;
- break;
- }
- if (typeid == RECORDARRAYOID)
- {
- innerResult = TQUEUE_REMAP_ARRAY;
- break;
- }
-
- /* Otherwise, we need a syscache lookup to figure it out. */
- tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typeid));
- if (!HeapTupleIsValid(tup))
- elog(ERROR, "cache lookup failed for type %u", typeid);
- typ = (Form_pg_type) GETSTRUCT(tup);
-
- /* Look through domains to underlying base type. */
- if (typ->typtype == TYPTYPE_DOMAIN)
- {
- typeid = typ->typbasetype;
- ReleaseSysCache(tup);
- continue;
- }
-
- /*
- * Look through arrays to underlying base type, but the final return
- * value must be either TQUEUE_REMAP_ARRAY or TQUEUE_REMAP_NONE. (If
- * this is an array of integers, for example, we don't need to walk
- * it.)
- */
- if (OidIsValid(typ->typelem) && typ->typlen == -1)
- {
- typeid = typ->typelem;
- ReleaseSysCache(tup);
- if (forceResult == TQUEUE_REMAP_NONE)
- forceResult = TQUEUE_REMAP_ARRAY;
- continue;
- }
+ Form_pg_attribute attr = tupledesc->attrs[i];
- /*
- * Similarly, look through ranges to the underlying base type, but the
- * final return value must be either TQUEUE_REMAP_RANGE or
- * TQUEUE_REMAP_NONE.
- */
- if (typ->typtype == TYPTYPE_RANGE)
+ if (attr->attisdropped)
{
- ReleaseSysCache(tup);
- if (forceResult == TQUEUE_REMAP_NONE)
- forceResult = TQUEUE_REMAP_RANGE;
- typeid = get_range_subtype(typeid);
+ remapinfo[i] = NULL;
continue;
}
+ remapinfo[i] = BuildTupleRemapInfo(attr->atttypid, mycontext);
+ if (remapinfo[i] != NULL)
+ noop = false;
+ }
- /* Walk composite types. Nothing else needs special handling. */
- if (typ->typtype == TYPTYPE_COMPOSITE)
- innerResult = TQUEUE_REMAP_RECORD;
- ReleaseSysCache(tup);
- break;
+ /* If no fields require remapping, report that by returning NULL. */
+ if (noop)
+ {
+ pfree(remapinfo);
+ remapinfo = NULL;
}
- if (innerResult != TQUEUE_REMAP_NONE && forceResult != TQUEUE_REMAP_NONE)
- return forceResult;
- return innerResult;
+ return remapinfo;
}