From a9ed875fdc2c44b5793a07727274786b417fc924 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Sun, 31 Jul 2016 16:05:12 -0400 Subject: [PATCH] Code review for tqueue.c: fix memory leaks, speed it up, other fixes. When doing record typmod remapping, tqueue.c did fresh catalog lookups for each tuple it processed, which was pretty horrible performance-wise (it seemed to about halve the already none-too-quick speed of bulk reads in parallel mode). Worse, it insisted on putting bits of that data into TopMemoryContext, from where it never freed them, causing a session-lifespan memory leak. (I suppose this was coded with the idea that the sender process would quit after finishing the query --- but the receiver uses the same code.) Restructure to avoid repetitive catalog lookups and to keep that data in a query-lifespan context, in or below the context where the TQueueDestReceiver or TupleQueueReader itself lives. Fix some other bugs such as continuing to use a tupledesc after releasing our refcount on it. Clean up cavalier datatype choices (typmods are int32, please, not int, and certainly not Oid). Improve comments and error message wording. --- src/backend/executor/tqueue.c | 1142 ++++++++++++++++++++------------- 1 file changed, 695 insertions(+), 447 deletions(-) diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c index 64555599ce..58d0eeaf0b 100644 --- a/src/backend/executor/tqueue.c +++ b/src/backend/executor/tqueue.c @@ -3,16 +3,25 @@ * tqueue.c * Use shm_mq to send & receive tuples between parallel backends * + * Most of the complexity in this module arises from transient RECORD types, + * which all have type RECORDOID and are distinguished by typmod numbers + * that are managed per-backend (see src/backend/utils/cache/typcache.c). + * The sender's set of RECORD typmod assignments probably doesn't match the + * receiver's. To deal with this, we make the sender send a description + * of each transient RECORD type appearing in the data it sends. The + * receiver finds or creates a matching type in its own typcache, and then + * maps the sender's typmod for that type to its own typmod. + * * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver * under the hood, writes tuples from the executor to a shm_mq. If * necessary, it also writes control messages describing transient * record types used within the tuple. * - * A TupleQueueReader reads tuples, and if any are sent control messages, + * A TupleQueueReader reads tuples, and control messages if any are sent, * from a shm_mq and returns the tuples. If transient record types are - * in use, it registers those types based on the received control messages - * and rewrites the typemods sent by the remote side to the corresponding - * local record typemods. + * in use, it registers those types locally based on the control messages + * and rewrites the typmods sent by the remote side to the corresponding + * local record typmods. * * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -38,82 +47,178 @@ #include "utils/syscache.h" #include "utils/typcache.h" + +/* + * The data transferred through the shm_mq is divided into messages. + * One-byte messages are mode-switch messages, telling the receiver to switch + * between "control" and "data" modes. (We always start up in "data" mode.) + * Otherwise, when in "data" mode, each message is a tuple. When in "control" + * mode, each message defines one transient-typmod-to-tupledesc mapping to + * let us interpret future tuples. Both of those cases certainly require + * more than one byte, so no confusion is possible. + */ +#define TUPLE_QUEUE_MODE_CONTROL 'c' /* mode-switch message contents */ +#define TUPLE_QUEUE_MODE_DATA 'd' + +/* + * Both the sender and receiver build trees of TupleRemapInfo nodes to help + * them identify which (sub) fields of transmitted tuples are composite and + * may thus need remap processing. We might need to look within arrays and + * ranges, not only composites, to find composite sub-fields. A NULL + * TupleRemapInfo pointer indicates that it is known that the described field + * is not composite and has no composite substructure. + * + * Note that we currently have to look at each composite field at runtime, + * even if we believe it's of a named composite type (i.e., not RECORD). + * This is because we allow the actual value to be a compatible transient + * RECORD type. That's grossly inefficient, and it would be good to get + * rid of the requirement, but it's not clear what would need to change. + * + * Also, we allow the top-level tuple structure, as well as the actual + * structure of composite subfields, to change from one tuple to the next + * at runtime. This may well be entirely historical, but it's mostly free + * to support given the previous requirement; and other places in the system + * also permit this, so it's not entirely clear if we could drop it. + */ + typedef enum { - TQUEUE_REMAP_NONE, /* no special processing required */ TQUEUE_REMAP_ARRAY, /* array */ TQUEUE_REMAP_RANGE, /* range */ - TQUEUE_REMAP_RECORD /* composite type, named or anonymous */ -} RemapClass; + TQUEUE_REMAP_RECORD /* composite type, named or transient */ +} TupleRemapClass; -typedef struct +typedef struct TupleRemapInfo TupleRemapInfo; + +typedef struct ArrayRemapInfo { - int natts; - RemapClass mapping[FLEXIBLE_ARRAY_MEMBER]; -} RemapInfo; + int16 typlen; /* array element type's storage properties */ + bool typbyval; + char typalign; + TupleRemapInfo *element_remap; /* array element type's remap info */ +} ArrayRemapInfo; -typedef struct +typedef struct RangeRemapInfo { - DestReceiver pub; - shm_mq_handle *handle; - MemoryContext tmpcontext; - HTAB *recordhtab; - char mode; - TupleDesc tupledesc; - RemapInfo *remapinfo; + TypeCacheEntry *typcache; /* range type's typcache entry */ + TupleRemapInfo *bound_remap; /* range bound type's remap info */ +} RangeRemapInfo; + +typedef struct RecordRemapInfo +{ + /* Original (remote) type ID info last seen for this composite field */ + Oid rectypid; + int32 rectypmod; + /* Local RECORD typmod, or -1 if unset; not used on sender side */ + int32 localtypmod; + /* If no fields of the record require remapping, these are NULL: */ + TupleDesc tupledesc; /* copy of record's tupdesc */ + TupleRemapInfo **field_remap; /* each field's remap info */ +} RecordRemapInfo; + +struct TupleRemapInfo +{ + TupleRemapClass remapclass; + union + { + ArrayRemapInfo arr; + RangeRemapInfo rng; + RecordRemapInfo rec; + } u; +}; + +/* + * DestReceiver object's private contents + * + * queue and tupledesc are pointers to data supplied by DestReceiver's caller. + * The recordhtab and remap info are owned by the DestReceiver and are kept + * in mycontext. tmpcontext is a tuple-lifespan context to hold cruft + * created while traversing each tuple to find record subfields. + */ +typedef struct TQueueDestReceiver +{ + DestReceiver pub; /* public fields */ + shm_mq_handle *queue; /* shm_mq to send to */ + MemoryContext mycontext; /* context containing TQueueDestReceiver */ + MemoryContext tmpcontext; /* per-tuple context, if needed */ + HTAB *recordhtab; /* table of transmitted typmods, if needed */ + char mode; /* current message mode */ + TupleDesc tupledesc; /* current top-level tuple descriptor */ + TupleRemapInfo **field_remapinfo; /* current top-level remap info */ } TQueueDestReceiver; -typedef struct RecordTypemodMap +/* + * Hash table entries for mapping remote to local typmods. + */ +typedef struct RecordTypmodMap { - int remotetypmod; - int localtypmod; -} RecordTypemodMap; + int32 remotetypmod; /* hash key (must be first!) */ + int32 localtypmod; +} RecordTypmodMap; +/* + * TupleQueueReader object's private contents + * + * queue and tupledesc are pointers to data supplied by reader's caller. + * The typmodmap and remap info are owned by the TupleQueueReader and + * are kept in mycontext. + * + * "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h + */ struct TupleQueueReader { - shm_mq_handle *queue; - char mode; - TupleDesc tupledesc; - RemapInfo *remapinfo; - HTAB *typmodmap; + shm_mq_handle *queue; /* shm_mq to receive from */ + MemoryContext mycontext; /* context containing TupleQueueReader */ + HTAB *typmodmap; /* RecordTypmodMap hashtable, if needed */ + char mode; /* current message mode */ + TupleDesc tupledesc; /* current top-level tuple descriptor */ + TupleRemapInfo **field_remapinfo; /* current top-level remap info */ }; -#define TUPLE_QUEUE_MODE_CONTROL 'c' -#define TUPLE_QUEUE_MODE_DATA 'd' - -static void tqueueWalk(TQueueDestReceiver *tqueue, RemapClass walktype, - Datum value); -static void tqueueWalkRecord(TQueueDestReceiver *tqueue, Datum value); -static void tqueueWalkArray(TQueueDestReceiver *tqueue, Datum value); -static void tqueueWalkRange(TQueueDestReceiver *tqueue, Datum value); -static void tqueueSendTypmodInfo(TQueueDestReceiver *tqueue, int typmod, - TupleDesc tupledesc); +/* Local function prototypes */ +static void TQExamine(TQueueDestReceiver *tqueue, + TupleRemapInfo *remapinfo, + Datum value); +static void TQExamineArray(TQueueDestReceiver *tqueue, + ArrayRemapInfo *remapinfo, + Datum value); +static void TQExamineRange(TQueueDestReceiver *tqueue, + RangeRemapInfo *remapinfo, + Datum value); +static void TQExamineRecord(TQueueDestReceiver *tqueue, + RecordRemapInfo *remapinfo, + Datum value); +static void TQSendRecordInfo(TQueueDestReceiver *tqueue, int32 typmod, + TupleDesc tupledesc); static void TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes, char *data); static HeapTuple TupleQueueHandleDataMessage(TupleQueueReader *reader, Size nbytes, HeapTupleHeader data); -static HeapTuple TupleQueueRemapTuple(TupleQueueReader *reader, - TupleDesc tupledesc, RemapInfo *remapinfo, - HeapTuple tuple); -static Datum TupleQueueRemap(TupleQueueReader *reader, RemapClass remapclass, - Datum value); -static Datum TupleQueueRemapArray(TupleQueueReader *reader, Datum value); -static Datum TupleQueueRemapRange(TupleQueueReader *reader, Datum value); -static Datum TupleQueueRemapRecord(TupleQueueReader *reader, Datum value); -static RemapClass GetRemapClass(Oid typeid); -static RemapInfo *BuildRemapInfo(TupleDesc tupledesc); +static HeapTuple TQRemapTuple(TupleQueueReader *reader, + TupleDesc tupledesc, + TupleRemapInfo **field_remapinfo, + HeapTuple tuple); +static Datum TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo, + Datum value, bool *changed); +static Datum TQRemapArray(TupleQueueReader *reader, ArrayRemapInfo *remapinfo, + Datum value, bool *changed); +static Datum TQRemapRange(TupleQueueReader *reader, RangeRemapInfo *remapinfo, + Datum value, bool *changed); +static Datum TQRemapRecord(TupleQueueReader *reader, RecordRemapInfo *remapinfo, + Datum value, bool *changed); +static TupleRemapInfo *BuildTupleRemapInfo(Oid typid, MemoryContext mycontext); +static TupleRemapInfo *BuildArrayRemapInfo(Oid elemtypid, + MemoryContext mycontext); +static TupleRemapInfo *BuildRangeRemapInfo(Oid rngtypid, + MemoryContext mycontext); +static TupleRemapInfo **BuildFieldRemapInfo(TupleDesc tupledesc, + MemoryContext mycontext); + /* - * Receive a tuple. + * Receive a tuple from a query, and send it to the designated shm_mq. * - * This is, at core, pretty simple: just send the tuple to the designated - * shm_mq. The complicated part is that if the tuple contains transient - * record types (see lookup_rowtype_tupdesc), we need to send control - * information to the shm_mq receiver so that those typemods can be correctly - * interpreted, as they are merely held in a backend-local cache. Worse, the - * record type may not at the top level: we could have a range over an array - * type over a range type over a range type over an array type over a record, - * or something like that. + * Returns TRUE if successful, FALSE if shm_mq has been detached. */ static bool tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) @@ -124,43 +229,49 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) shm_mq_result result; /* - * Test to see whether the tupledesc has changed; if so, set up for the - * new tupledesc. This is a strange test both because the executor really + * If first time through, compute remapping info for the top-level fields. + * On later calls, if the tupledesc has changed, set up for the new + * tupledesc. (This is a strange test both because the executor really * shouldn't change the tupledesc, and also because it would be unsafe if * the old tupledesc could be freed and a new one allocated at the same * address. But since some very old code in printtup.c uses a similar - * test, we adopt it here as well. + * approach, we adopt it here as well.) + * + * Here and elsewhere in this module, when replacing remapping info we + * pfree the top-level object because that's easy, but we don't bother to + * recursively free any substructure. This would lead to query-lifespan + * memory leaks if the mapping info actually changed frequently, but since + * we don't expect that to happen, it doesn't seem worth expending code to + * prevent it. */ if (tqueue->tupledesc != tupledesc) { - if (tqueue->remapinfo != NULL) - pfree(tqueue->remapinfo); - tqueue->remapinfo = BuildRemapInfo(tupledesc); + /* Is it worth trying to free substructure of the remap tree? */ + if (tqueue->field_remapinfo != NULL) + pfree(tqueue->field_remapinfo); + tqueue->field_remapinfo = BuildFieldRemapInfo(tupledesc, + tqueue->mycontext); tqueue->tupledesc = tupledesc; } - tuple = ExecMaterializeSlot(slot); - /* - * When, because of the types being transmitted, no record typemod mapping + * When, because of the types being transmitted, no record typmod mapping * can be needed, we can skip a good deal of work. */ - if (tqueue->remapinfo != NULL) + if (tqueue->field_remapinfo != NULL) { - RemapInfo *remapinfo = tqueue->remapinfo; - AttrNumber i; + TupleRemapInfo **remapinfo = tqueue->field_remapinfo; + int i; MemoryContext oldcontext = NULL; - /* Deform the tuple so we can examine it, if not done already. */ + /* Deform the tuple so we can examine fields, if not done already. */ slot_getallattrs(slot); - /* Iterate over each attribute and search it for transient typemods. */ - Assert(slot->tts_tupleDescriptor->natts == remapinfo->natts); - for (i = 0; i < remapinfo->natts; ++i) + /* Iterate over each attribute and search it for transient typmods. */ + for (i = 0; i < tupledesc->natts; i++) { /* Ignore nulls and types that don't need special handling. */ - if (slot->tts_isnull[i] || - remapinfo->mapping[i] == TQUEUE_REMAP_NONE) + if (slot->tts_isnull[i] || remapinfo[i] == NULL) continue; /* Switch to temporary memory context to avoid leaking. */ @@ -168,16 +279,16 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) { if (tqueue->tmpcontext == NULL) tqueue->tmpcontext = - AllocSetContextCreate(TopMemoryContext, - "tqueue temporary context", + AllocSetContextCreate(tqueue->mycontext, + "tqueue sender temp context", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); oldcontext = MemoryContextSwitchTo(tqueue->tmpcontext); } - /* Invoke the appropriate walker function. */ - tqueueWalk(tqueue, remapinfo->mapping[i], slot->tts_values[i]); + /* Examine the value. */ + TQExamine(tqueue, remapinfo[i], slot->tts_values[i]); } /* If we used the temp context, reset it and restore prior context. */ @@ -191,217 +302,232 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) if (tqueue->mode != TUPLE_QUEUE_MODE_DATA) { tqueue->mode = TUPLE_QUEUE_MODE_DATA; - shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false); + shm_mq_send(tqueue->queue, sizeof(char), &tqueue->mode, false); } } /* Send the tuple itself. */ - result = shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false); + tuple = ExecMaterializeSlot(slot); + result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false); + /* Check for failure. */ if (result == SHM_MQ_DETACHED) return false; else if (result != SHM_MQ_SUCCESS) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not send tuples"))); + errmsg("could not send tuple to shared-memory queue"))); return true; } /* - * Invoke the appropriate walker function based on the given RemapClass. + * Examine the given datum and send any necessary control messages for + * transient record types contained in it. + * + * remapinfo is previously-computed remapping info about the datum's type. + * + * This function just dispatches based on the remap class. */ static void -tqueueWalk(TQueueDestReceiver *tqueue, RemapClass walktype, Datum value) +TQExamine(TQueueDestReceiver *tqueue, TupleRemapInfo *remapinfo, Datum value) { + /* This is recursive, so it could be driven to stack overflow. */ check_stack_depth(); - switch (walktype) + switch (remapinfo->remapclass) { - case TQUEUE_REMAP_NONE: - break; case TQUEUE_REMAP_ARRAY: - tqueueWalkArray(tqueue, value); + TQExamineArray(tqueue, &remapinfo->u.arr, value); break; case TQUEUE_REMAP_RANGE: - tqueueWalkRange(tqueue, value); + TQExamineRange(tqueue, &remapinfo->u.rng, value); break; case TQUEUE_REMAP_RECORD: - tqueueWalkRecord(tqueue, value); + TQExamineRecord(tqueue, &remapinfo->u.rec, value); break; } } /* - * Walk a record and send control messages for transient record types - * contained therein. + * Examine a record datum and send any necessary control messages for + * transient record types contained in it. */ static void -tqueueWalkRecord(TQueueDestReceiver *tqueue, Datum value) +TQExamineRecord(TQueueDestReceiver *tqueue, RecordRemapInfo *remapinfo, + Datum value) { HeapTupleHeader tup; - Oid typeid; - Oid typmod; + Oid typid; + int32 typmod; TupleDesc tupledesc; - RemapInfo *remapinfo; - /* Extract typmod from tuple. */ + /* Extract type OID and typmod from tuple. */ tup = DatumGetHeapTupleHeader(value); - typeid = HeapTupleHeaderGetTypeId(tup); + typid = HeapTupleHeaderGetTypeId(tup); typmod = HeapTupleHeaderGetTypMod(tup); - /* Look up tuple descriptor in typecache. */ - tupledesc = lookup_rowtype_tupdesc(typeid, typmod); - /* - * If this is a transient record time, send its TupleDesc as a control - * message. (tqueueSendTypemodInfo is smart enough to do this only once - * per typmod.) + * If first time through, or if this isn't the same composite type as last + * time, consider sending a control message, and then look up the + * necessary information for examining the fields. */ - if (typeid == RECORDOID) - tqueueSendTypmodInfo(tqueue, typmod, tupledesc); + if (typid != remapinfo->rectypid || typmod != remapinfo->rectypmod) + { + /* Free any old data. */ + if (remapinfo->tupledesc != NULL) + FreeTupleDesc(remapinfo->tupledesc); + /* Is it worth trying to free substructure of the remap tree? */ + if (remapinfo->field_remap != NULL) + pfree(remapinfo->field_remap); - /* - * Build the remap information for this tupledesc. We might want to think - * about keeping a cache of this information keyed by typeid and typemod, - * but let's keep it simple for now. - */ - remapinfo = BuildRemapInfo(tupledesc); + /* Look up tuple descriptor in typcache. */ + tupledesc = lookup_rowtype_tupdesc(typid, typmod); + + /* + * If this is a transient record type, send the tupledesc in a control + * message. (TQSendRecordInfo is smart enough to do this only once + * per typmod.) + */ + if (typid == RECORDOID) + TQSendRecordInfo(tqueue, typmod, tupledesc); + + /* Figure out whether fields need recursive processing. */ + remapinfo->field_remap = BuildFieldRemapInfo(tupledesc, + tqueue->mycontext); + if (remapinfo->field_remap != NULL) + { + /* + * We need to inspect the record contents, so save a copy of the + * tupdesc. (We could possibly just reference the typcache's + * copy, but then it's problematic when to release the refcount.) + */ + MemoryContext oldcontext = MemoryContextSwitchTo(tqueue->mycontext); + + remapinfo->tupledesc = CreateTupleDescCopy(tupledesc); + MemoryContextSwitchTo(oldcontext); + } + else + { + /* No fields of the record require remapping. */ + remapinfo->tupledesc = NULL; + } + remapinfo->rectypid = typid; + remapinfo->rectypmod = typmod; + + /* Release reference count acquired by lookup_rowtype_tupdesc. */ + DecrTupleDescRefCount(tupledesc); + } /* - * If remapping is required, deform the tuple and process each field. When - * BuildRemapInfo is null, the data types are such that there can be no - * transient record types here, so we can skip all this work. + * If field remapping is required, deform the tuple and examine each + * field. */ - if (remapinfo != NULL) + if (remapinfo->field_remap != NULL) { Datum *values; bool *isnull; HeapTupleData tdata; - AttrNumber i; + int i; /* Deform the tuple so we can check each column within. */ - values = palloc(tupledesc->natts * sizeof(Datum)); - isnull = palloc(tupledesc->natts * sizeof(bool)); + tupledesc = remapinfo->tupledesc; + values = (Datum *) palloc(tupledesc->natts * sizeof(Datum)); + isnull = (bool *) palloc(tupledesc->natts * sizeof(bool)); tdata.t_len = HeapTupleHeaderGetDatumLength(tup); ItemPointerSetInvalid(&(tdata.t_self)); tdata.t_tableOid = InvalidOid; tdata.t_data = tup; heap_deform_tuple(&tdata, tupledesc, values, isnull); - /* Recursively check each non-NULL attribute. */ - for (i = 0; i < tupledesc->natts; ++i) - if (!isnull[i]) - tqueueWalk(tqueue, remapinfo->mapping[i], values[i]); - } + /* Recursively check each interesting non-NULL attribute. */ + for (i = 0; i < tupledesc->natts; i++) + { + if (!isnull[i] && remapinfo->field_remap[i]) + TQExamine(tqueue, remapinfo->field_remap[i], values[i]); + } - /* Release reference count acquired by lookup_rowtype_tupdesc. */ - DecrTupleDescRefCount(tupledesc); + /* Need not clean up, since we're in a short-lived context. */ + } } /* - * Walk a record and send control messages for transient record types - * contained therein. + * Examine an array datum and send any necessary control messages for + * transient record types contained in it. */ static void -tqueueWalkArray(TQueueDestReceiver *tqueue, Datum value) +TQExamineArray(TQueueDestReceiver *tqueue, ArrayRemapInfo *remapinfo, + Datum value) { ArrayType *arr = DatumGetArrayTypeP(value); - Oid typeid = ARR_ELEMTYPE(arr); - RemapClass remapclass; - int16 typlen; - bool typbyval; - char typalign; + Oid typid = ARR_ELEMTYPE(arr); Datum *elem_values; bool *elem_nulls; int num_elems; int i; - remapclass = GetRemapClass(typeid); - - /* - * If the elements of the array don't need to be walked, we shouldn't have - * been called in the first place: GetRemapClass should have returned NULL - * when asked about this array type. - */ - Assert(remapclass != TQUEUE_REMAP_NONE); - /* Deconstruct the array. */ - get_typlenbyvalalign(typeid, &typlen, &typbyval, &typalign); - deconstruct_array(arr, typeid, typlen, typbyval, typalign, + deconstruct_array(arr, typid, remapinfo->typlen, + remapinfo->typbyval, remapinfo->typalign, &elem_values, &elem_nulls, &num_elems); - /* Walk each element. */ - for (i = 0; i < num_elems; ++i) + /* Examine each element. */ + for (i = 0; i < num_elems; i++) + { if (!elem_nulls[i]) - tqueueWalk(tqueue, remapclass, elem_values[i]); + TQExamine(tqueue, remapinfo->element_remap, elem_values[i]); + } } /* - * Walk a range type and send control messages for transient record types - * contained therein. + * Examine a range datum and send any necessary control messages for + * transient record types contained in it. */ static void -tqueueWalkRange(TQueueDestReceiver *tqueue, Datum value) +TQExamineRange(TQueueDestReceiver *tqueue, RangeRemapInfo *remapinfo, + Datum value) { RangeType *range = DatumGetRangeType(value); - Oid typeid = RangeTypeGetOid(range); - RemapClass remapclass; - TypeCacheEntry *typcache; RangeBound lower; RangeBound upper; bool empty; - /* - * Extract the lower and upper bounds. It might be worth implementing - * some caching scheme here so that we don't look up the same typeids in - * the type cache repeatedly, but for now let's keep it simple. - */ - typcache = lookup_type_cache(typeid, TYPECACHE_RANGE_INFO); - if (typcache->rngelemtype == NULL) - elog(ERROR, "type %u is not a range type", typeid); - range_deserialize(typcache, range, &lower, &upper, &empty); + /* Extract the lower and upper bounds. */ + range_deserialize(remapinfo->typcache, range, &lower, &upper, &empty); /* Nothing to do for an empty range. */ if (empty) return; - /* - * If the range bounds don't need to be walked, we shouldn't have been - * called in the first place: GetRemapClass should have returned NULL when - * asked about this range type. - */ - remapclass = GetRemapClass(typcache->rngelemtype->type_id); - Assert(remapclass != TQUEUE_REMAP_NONE); - - /* Walk each bound, if present. */ + /* Examine each bound, if present. */ if (!upper.infinite) - tqueueWalk(tqueue, remapclass, upper.val); + TQExamine(tqueue, remapinfo->bound_remap, upper.val); if (!lower.infinite) - tqueueWalk(tqueue, remapclass, lower.val); + TQExamine(tqueue, remapinfo->bound_remap, lower.val); } /* - * Send tuple descriptor information for a transient typemod, unless we've + * Send tuple descriptor information for a transient typmod, unless we've * already done so previously. */ static void -tqueueSendTypmodInfo(TQueueDestReceiver *tqueue, int typmod, - TupleDesc tupledesc) +TQSendRecordInfo(TQueueDestReceiver *tqueue, int32 typmod, TupleDesc tupledesc) { StringInfoData buf; bool found; - AttrNumber i; + int i; /* Initialize hash table if not done yet. */ if (tqueue->recordhtab == NULL) { HASHCTL ctl; - ctl.keysize = sizeof(int); - ctl.entrysize = sizeof(int); - ctl.hcxt = TopMemoryContext; - tqueue->recordhtab = hash_create("tqueue record hashtable", + MemSet(&ctl, 0, sizeof(ctl)); + /* Hash table entries are just typmods */ + ctl.keysize = sizeof(int32); + ctl.entrysize = sizeof(int32); + ctl.hcxt = tqueue->mycontext; + tqueue->recordhtab = hash_create("tqueue sender record type hashtable", 100, &ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); } @@ -411,25 +537,30 @@ tqueueSendTypmodInfo(TQueueDestReceiver *tqueue, int typmod, if (found) return; + elog(DEBUG3, "sending tqueue control message for record typmod %d", typmod); + /* If message queue is in data mode, switch to control mode. */ if (tqueue->mode != TUPLE_QUEUE_MODE_CONTROL) { tqueue->mode = TUPLE_QUEUE_MODE_CONTROL; - shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false); + shm_mq_send(tqueue->queue, sizeof(char), &tqueue->mode, false); } /* Assemble a control message. */ initStringInfo(&buf); - appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int)); + appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int32)); appendBinaryStringInfo(&buf, (char *) &tupledesc->natts, sizeof(int)); - appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid, - sizeof(bool)); - for (i = 0; i < tupledesc->natts; ++i) + appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid, sizeof(bool)); + for (i = 0; i < tupledesc->natts; i++) + { appendBinaryStringInfo(&buf, (char *) tupledesc->attrs[i], sizeof(FormData_pg_attribute)); + } /* Send control message. */ - shm_mq_send(tqueue->handle, buf.len, buf.data, false); + shm_mq_send(tqueue->queue, buf.len, buf.data, false); + + /* We assume it's OK to leak buf because we're in a short-lived context. */ } /* @@ -449,7 +580,7 @@ tqueueShutdownReceiver(DestReceiver *self) { TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; - shm_mq_detach(shm_mq_get_queue(tqueue->handle)); + shm_mq_detach(shm_mq_get_queue(tqueue->queue)); } /* @@ -464,8 +595,9 @@ tqueueDestroyReceiver(DestReceiver *self) MemoryContextDelete(tqueue->tmpcontext); if (tqueue->recordhtab != NULL) hash_destroy(tqueue->recordhtab); - if (tqueue->remapinfo != NULL) - pfree(tqueue->remapinfo); + /* Is it worth trying to free substructure of the remap tree? */ + if (tqueue->field_remapinfo != NULL) + pfree(tqueue->field_remapinfo); pfree(self); } @@ -484,11 +616,14 @@ CreateTupleQueueDestReceiver(shm_mq_handle *handle) self->pub.rShutdown = tqueueShutdownReceiver; self->pub.rDestroy = tqueueDestroyReceiver; self->pub.mydest = DestTupleQueue; - self->handle = handle; + self->queue = handle; + self->mycontext = CurrentMemoryContext; self->tmpcontext = NULL; self->recordhtab = NULL; self->mode = TUPLE_QUEUE_MODE_DATA; - self->remapinfo = NULL; + /* Top-level tupledesc is not known yet */ + self->tupledesc = NULL; + self->field_remapinfo = NULL; return (DestReceiver *) self; } @@ -502,9 +637,11 @@ CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc) TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader)); reader->queue = handle; + reader->mycontext = CurrentMemoryContext; + reader->typmodmap = NULL; reader->mode = TUPLE_QUEUE_MODE_DATA; reader->tupledesc = tupledesc; - reader->remapinfo = BuildRemapInfo(tupledesc); + reader->field_remapinfo = BuildFieldRemapInfo(tupledesc, reader->mycontext); return reader; } @@ -516,8 +653,11 @@ void DestroyTupleQueueReader(TupleQueueReader *reader) { shm_mq_detach(shm_mq_get_queue(reader->queue)); - if (reader->remapinfo != NULL) - pfree(reader->remapinfo); + if (reader->typmodmap != NULL) + hash_destroy(reader->typmodmap); + /* Is it worth trying to free substructure of the remap tree? */ + if (reader->field_remapinfo != NULL) + pfree(reader->field_remapinfo); pfree(reader); } @@ -567,14 +707,7 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done) Assert(result == SHM_MQ_SUCCESS); /* - * OK, we got a message. Process it. - * - * One-byte messages are mode switch messages, so that we can switch - * between "control" and "data" mode. Otherwise, when in "data" mode, - * each message is a tuple. When in "control" mode, each message - * provides a transient-typmod-to-tupledesc mapping to let us - * interpret future tuples. Both of those cases certainly require - * more than one byte, so no confusion is possible. + * We got a message (see message spec at top of file). Process it. */ if (nbytes == 1) { @@ -592,7 +725,7 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done) TupleQueueHandleControlMessage(reader, nbytes, data); } else - elog(ERROR, "invalid mode: %d", (int) reader->mode); + elog(ERROR, "unrecognized tqueue mode: %d", (int) reader->mode); } } @@ -606,220 +739,306 @@ TupleQueueHandleDataMessage(TupleQueueReader *reader, { HeapTupleData htup; + /* + * Set up a dummy HeapTupleData pointing to the data from the shm_mq + * (which had better be sufficiently aligned). + */ ItemPointerSetInvalid(&htup.t_self); htup.t_tableOid = InvalidOid; htup.t_len = nbytes; htup.t_data = data; - return TupleQueueRemapTuple(reader, reader->tupledesc, reader->remapinfo, - &htup); + /* + * Either just copy the data into a regular palloc'd tuple, or remap it, + * as required. + */ + return TQRemapTuple(reader, + reader->tupledesc, + reader->field_remapinfo, + &htup); } /* - * Remap tuple typmods per control information received from remote side. + * Copy the given tuple, remapping any transient typmods contained in it. */ static HeapTuple -TupleQueueRemapTuple(TupleQueueReader *reader, TupleDesc tupledesc, - RemapInfo *remapinfo, HeapTuple tuple) +TQRemapTuple(TupleQueueReader *reader, + TupleDesc tupledesc, + TupleRemapInfo **field_remapinfo, + HeapTuple tuple) { Datum *values; bool *isnull; + bool changed = false; int i; /* * If no remapping is necessary, just copy the tuple into a single * palloc'd chunk, as caller will expect. */ - if (remapinfo == NULL) + if (field_remapinfo == NULL) return heap_copytuple(tuple); /* Deform tuple so we can remap record typmods for individual attrs. */ - values = palloc(tupledesc->natts * sizeof(Datum)); - isnull = palloc(tupledesc->natts * sizeof(bool)); + values = (Datum *) palloc(tupledesc->natts * sizeof(Datum)); + isnull = (bool *) palloc(tupledesc->natts * sizeof(bool)); heap_deform_tuple(tuple, tupledesc, values, isnull); - Assert(tupledesc->natts == remapinfo->natts); - /* Recursively check each non-NULL attribute. */ - for (i = 0; i < tupledesc->natts; ++i) + /* Recursively process each interesting non-NULL attribute. */ + for (i = 0; i < tupledesc->natts; i++) { - if (isnull[i] || remapinfo->mapping[i] == TQUEUE_REMAP_NONE) + if (isnull[i] || field_remapinfo[i] == NULL) continue; - values[i] = TupleQueueRemap(reader, remapinfo->mapping[i], values[i]); + values[i] = TQRemap(reader, field_remapinfo[i], values[i], &changed); } - /* Reform the modified tuple. */ - return heap_form_tuple(tupledesc, values, isnull); + /* Reconstruct the modified tuple, if anything was modified. */ + if (changed) + return heap_form_tuple(tupledesc, values, isnull); + else + return heap_copytuple(tuple); } /* - * Remap a value based on the specified remap class. + * Process the given datum and replace any transient record typmods + * contained in it. Set *changed to TRUE if we actually changed the datum. + * + * remapinfo is previously-computed remapping info about the datum's type. + * + * This function just dispatches based on the remap class. */ static Datum -TupleQueueRemap(TupleQueueReader *reader, RemapClass remapclass, Datum value) +TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo, + Datum value, bool *changed) { + /* This is recursive, so it could be driven to stack overflow. */ check_stack_depth(); - switch (remapclass) + switch (remapinfo->remapclass) { - case TQUEUE_REMAP_NONE: - /* caller probably shouldn't have called us at all, but... */ - return value; - case TQUEUE_REMAP_ARRAY: - return TupleQueueRemapArray(reader, value); + return TQRemapArray(reader, &remapinfo->u.arr, value, changed); case TQUEUE_REMAP_RANGE: - return TupleQueueRemapRange(reader, value); + return TQRemapRange(reader, &remapinfo->u.rng, value, changed); case TQUEUE_REMAP_RECORD: - return TupleQueueRemapRecord(reader, value); + return TQRemapRecord(reader, &remapinfo->u.rec, value, changed); } - elog(ERROR, "unknown remap class: %d", (int) remapclass); + elog(ERROR, "unrecognized tqueue remap class: %d", + (int) remapinfo->remapclass); return (Datum) 0; } /* - * Remap an array. + * Process the given array datum and replace any transient record typmods + * contained in it. Set *changed to TRUE if we actually changed the datum. */ static Datum -TupleQueueRemapArray(TupleQueueReader *reader, Datum value) +TQRemapArray(TupleQueueReader *reader, ArrayRemapInfo *remapinfo, + Datum value, bool *changed) { ArrayType *arr = DatumGetArrayTypeP(value); - Oid typeid = ARR_ELEMTYPE(arr); - RemapClass remapclass; - int16 typlen; - bool typbyval; - char typalign; + Oid typid = ARR_ELEMTYPE(arr); + bool element_changed = false; Datum *elem_values; bool *elem_nulls; int num_elems; int i; - remapclass = GetRemapClass(typeid); - - /* - * If the elements of the array don't need to be walked, we shouldn't have - * been called in the first place: GetRemapClass should have returned NULL - * when asked about this array type. - */ - Assert(remapclass != TQUEUE_REMAP_NONE); - /* Deconstruct the array. */ - get_typlenbyvalalign(typeid, &typlen, &typbyval, &typalign); - deconstruct_array(arr, typeid, typlen, typbyval, typalign, + deconstruct_array(arr, typid, remapinfo->typlen, + remapinfo->typbyval, remapinfo->typalign, &elem_values, &elem_nulls, &num_elems); /* Remap each element. */ - for (i = 0; i < num_elems; ++i) + for (i = 0; i < num_elems; i++) + { if (!elem_nulls[i]) - elem_values[i] = TupleQueueRemap(reader, remapclass, - elem_values[i]); - - /* Reconstruct and return the array. */ - arr = construct_md_array(elem_values, elem_nulls, - ARR_NDIM(arr), ARR_DIMS(arr), ARR_LBOUND(arr), - typeid, typlen, typbyval, typalign); - return PointerGetDatum(arr); + elem_values[i] = TQRemap(reader, + remapinfo->element_remap, + elem_values[i], + &element_changed); + } + + if (element_changed) + { + /* Reconstruct and return the array. */ + *changed = true; + arr = construct_md_array(elem_values, elem_nulls, + ARR_NDIM(arr), ARR_DIMS(arr), ARR_LBOUND(arr), + typid, remapinfo->typlen, + remapinfo->typbyval, remapinfo->typalign); + return PointerGetDatum(arr); + } + + /* Else just return the value as-is. */ + return value; } /* - * Remap a range type. + * Process the given range datum and replace any transient record typmods + * contained in it. Set *changed to TRUE if we actually changed the datum. */ static Datum -TupleQueueRemapRange(TupleQueueReader *reader, Datum value) +TQRemapRange(TupleQueueReader *reader, RangeRemapInfo *remapinfo, + Datum value, bool *changed) { RangeType *range = DatumGetRangeType(value); - Oid typeid = RangeTypeGetOid(range); - RemapClass remapclass; - TypeCacheEntry *typcache; + bool bound_changed = false; RangeBound lower; RangeBound upper; bool empty; - /* - * Extract the lower and upper bounds. As in tqueueWalkRange, some - * caching might be a good idea here. - */ - typcache = lookup_type_cache(typeid, TYPECACHE_RANGE_INFO); - if (typcache->rngelemtype == NULL) - elog(ERROR, "type %u is not a range type", typeid); - range_deserialize(typcache, range, &lower, &upper, &empty); + /* Extract the lower and upper bounds. */ + range_deserialize(remapinfo->typcache, range, &lower, &upper, &empty); /* Nothing to do for an empty range. */ if (empty) return value; - /* - * If the range bounds don't need to be walked, we shouldn't have been - * called in the first place: GetRemapClass should have returned NULL when - * asked about this range type. - */ - remapclass = GetRemapClass(typcache->rngelemtype->type_id); - Assert(remapclass != TQUEUE_REMAP_NONE); - /* Remap each bound, if present. */ if (!upper.infinite) - upper.val = TupleQueueRemap(reader, remapclass, upper.val); + upper.val = TQRemap(reader, remapinfo->bound_remap, + upper.val, &bound_changed); if (!lower.infinite) - lower.val = TupleQueueRemap(reader, remapclass, lower.val); + lower.val = TQRemap(reader, remapinfo->bound_remap, + lower.val, &bound_changed); - /* And reserialize. */ - range = range_serialize(typcache, &lower, &upper, empty); - return RangeTypeGetDatum(range); + if (bound_changed) + { + /* Reserialize. */ + *changed = true; + range = range_serialize(remapinfo->typcache, &lower, &upper, empty); + return RangeTypeGetDatum(range); + } + + /* Else just return the value as-is. */ + return value; } /* - * Remap a record. + * Process the given record datum and replace any transient record typmods + * contained in it. Set *changed to TRUE if we actually changed the datum. */ static Datum -TupleQueueRemapRecord(TupleQueueReader *reader, Datum value) +TQRemapRecord(TupleQueueReader *reader, RecordRemapInfo *remapinfo, + Datum value, bool *changed) { HeapTupleHeader tup; - Oid typeid; - int typmod; - RecordTypemodMap *mapent; + Oid typid; + int32 typmod; + bool changed_typmod; TupleDesc tupledesc; - RemapInfo *remapinfo; - HeapTupleData htup; - HeapTuple atup; - /* Fetch type OID and typemod. */ + /* Extract type OID and typmod from tuple. */ tup = DatumGetHeapTupleHeader(value); - typeid = HeapTupleHeaderGetTypeId(tup); + typid = HeapTupleHeaderGetTypeId(tup); typmod = HeapTupleHeaderGetTypMod(tup); + /* + * If first time through, or if this isn't the same composite type as last + * time, identify the required typmod mapping, and then look up the + * necessary information for processing the fields. + */ + if (typid != remapinfo->rectypid || typmod != remapinfo->rectypmod) + { + /* Free any old data. */ + if (remapinfo->tupledesc != NULL) + FreeTupleDesc(remapinfo->tupledesc); + /* Is it worth trying to free substructure of the remap tree? */ + if (remapinfo->field_remap != NULL) + pfree(remapinfo->field_remap); + + /* If transient record type, look up matching local typmod. */ + if (typid == RECORDOID) + { + RecordTypmodMap *mapent; + + Assert(reader->typmodmap != NULL); + mapent = hash_search(reader->typmodmap, &typmod, + HASH_FIND, NULL); + if (mapent == NULL) + elog(ERROR, "tqueue received unrecognized remote typmod %d", + typmod); + remapinfo->localtypmod = mapent->localtypmod; + } + else + remapinfo->localtypmod = -1; + + /* Look up tuple descriptor in typcache. */ + tupledesc = lookup_rowtype_tupdesc(typid, remapinfo->localtypmod); + + /* Figure out whether fields need recursive processing. */ + remapinfo->field_remap = BuildFieldRemapInfo(tupledesc, + reader->mycontext); + if (remapinfo->field_remap != NULL) + { + /* + * We need to inspect the record contents, so save a copy of the + * tupdesc. (We could possibly just reference the typcache's + * copy, but then it's problematic when to release the refcount.) + */ + MemoryContext oldcontext = MemoryContextSwitchTo(reader->mycontext); + + remapinfo->tupledesc = CreateTupleDescCopy(tupledesc); + MemoryContextSwitchTo(oldcontext); + } + else + { + /* No fields of the record require remapping. */ + remapinfo->tupledesc = NULL; + } + remapinfo->rectypid = typid; + remapinfo->rectypmod = typmod; + + /* Release reference count acquired by lookup_rowtype_tupdesc. */ + DecrTupleDescRefCount(tupledesc); + } + /* If transient record, replace remote typmod with local typmod. */ - if (typeid == RECORDOID) + if (typid == RECORDOID && typmod != remapinfo->localtypmod) { - Assert(reader->typmodmap != NULL); - mapent = hash_search(reader->typmodmap, &typmod, - HASH_FIND, NULL); - if (mapent == NULL) - elog(ERROR, "found unrecognized remote typmod %d", typmod); - typmod = mapent->localtypmod; + typmod = remapinfo->localtypmod; + changed_typmod = true; } + else + changed_typmod = false; /* - * Fetch tupledesc and compute remap info. We should probably cache this - * so that we don't have to keep recomputing it. + * If we need to change the typmod, or if there are any potentially + * remappable fields, replace the tuple. */ - tupledesc = lookup_rowtype_tupdesc(typeid, typmod); - remapinfo = BuildRemapInfo(tupledesc); - DecrTupleDescRefCount(tupledesc); + if (changed_typmod || remapinfo->field_remap != NULL) + { + HeapTupleData htup; + HeapTuple atup; + + /* For now, assume we always need to change the tuple in this case. */ + *changed = true; + + /* Copy tuple, possibly remapping contained fields. */ + ItemPointerSetInvalid(&htup.t_self); + htup.t_tableOid = InvalidOid; + htup.t_len = HeapTupleHeaderGetDatumLength(tup); + htup.t_data = tup; + atup = TQRemapTuple(reader, + remapinfo->tupledesc, + remapinfo->field_remap, + &htup); + + /* Apply the correct labeling for a local Datum. */ + HeapTupleHeaderSetTypeId(atup->t_data, typid); + HeapTupleHeaderSetTypMod(atup->t_data, typmod); + HeapTupleHeaderSetDatumLength(atup->t_data, htup.t_len); + + /* And return the results. */ + return HeapTupleHeaderGetDatum(atup->t_data); + } - /* Remap tuple. */ - ItemPointerSetInvalid(&htup.t_self); - htup.t_tableOid = InvalidOid; - htup.t_len = HeapTupleHeaderGetDatumLength(tup); - htup.t_data = tup; - atup = TupleQueueRemapTuple(reader, tupledesc, remapinfo, &htup); - HeapTupleHeaderSetTypeId(atup->t_data, typeid); - HeapTupleHeaderSetTypMod(atup->t_data, typmod); - HeapTupleHeaderSetDatumLength(atup->t_data, htup.t_len); - - /* And return the results. */ - return HeapTupleHeaderGetDatum(atup->t_data); + /* Else just return the value as-is. */ + return value; } /* @@ -833,57 +1052,54 @@ static void TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes, char *data) { + int32 remotetypmod; int natts; - int remotetypmod; bool hasoid; - char *buf = data; - int rc = 0; - int i; + Size offset = 0; Form_pg_attribute *attrs; - MemoryContext oldcontext; TupleDesc tupledesc; - RecordTypemodMap *mapent; + RecordTypmodMap *mapent; bool found; + int i; /* Extract remote typmod. */ - memcpy(&remotetypmod, &buf[rc], sizeof(int)); - rc += sizeof(int); + memcpy(&remotetypmod, &data[offset], sizeof(int32)); + offset += sizeof(int32); /* Extract attribute count. */ - memcpy(&natts, &buf[rc], sizeof(int)); - rc += sizeof(int); + memcpy(&natts, &data[offset], sizeof(int)); + offset += sizeof(int); /* Extract hasoid flag. */ - memcpy(&hasoid, &buf[rc], sizeof(bool)); - rc += sizeof(bool); + memcpy(&hasoid, &data[offset], sizeof(bool)); + offset += sizeof(bool); - /* Extract attribute details. */ - oldcontext = MemoryContextSwitchTo(CurTransactionContext); + /* Extract attribute details. The tupledesc made here is just transient. */ attrs = palloc(natts * sizeof(Form_pg_attribute)); - for (i = 0; i < natts; ++i) + for (i = 0; i < natts; i++) { attrs[i] = palloc(sizeof(FormData_pg_attribute)); - memcpy(attrs[i], &buf[rc], sizeof(FormData_pg_attribute)); - rc += sizeof(FormData_pg_attribute); + memcpy(attrs[i], &data[offset], sizeof(FormData_pg_attribute)); + offset += sizeof(FormData_pg_attribute); } - MemoryContextSwitchTo(oldcontext); /* We should have read the whole message. */ - Assert(rc == nbytes); + Assert(offset == nbytes); - /* Construct TupleDesc. */ + /* Construct TupleDesc, and assign a local typmod. */ tupledesc = CreateTupleDesc(natts, hasoid, attrs); tupledesc = BlessTupleDesc(tupledesc); - /* Create map if it doesn't exist already. */ + /* Create mapping hashtable if it doesn't exist already. */ if (reader->typmodmap == NULL) { HASHCTL ctl; - ctl.keysize = sizeof(int); - ctl.entrysize = sizeof(RecordTypemodMap); - ctl.hcxt = CurTransactionContext; - reader->typmodmap = hash_create("typmodmap hashtable", + MemSet(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(int32); + ctl.entrysize = sizeof(RecordTypmodMap); + ctl.hcxt = reader->mycontext; + reader->typmodmap = hash_create("tqueue receiver record type hashtable", 100, &ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); } @@ -892,139 +1108,171 @@ TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes, mapent = hash_search(reader->typmodmap, &remotetypmod, HASH_ENTER, &found); if (found) - elog(ERROR, "duplicate message for typmod %d", + elog(ERROR, "duplicate tqueue control message for typmod %d", remotetypmod); mapent->localtypmod = tupledesc->tdtypmod; - elog(DEBUG3, "mapping remote typmod %d to local typmod %d", - remotetypmod, tupledesc->tdtypmod); + + elog(DEBUG3, "tqueue mapping remote typmod %d to local typmod %d", + remotetypmod, mapent->localtypmod); } /* - * Build a mapping indicating what remapping class applies to each attribute - * described by a tupledesc. + * Build remap info for the specified data type, storing it in mycontext. + * Returns NULL if neither the type nor any subtype could require remapping. */ -static RemapInfo * -BuildRemapInfo(TupleDesc tupledesc) +static TupleRemapInfo * +BuildTupleRemapInfo(Oid typid, MemoryContext mycontext) { - RemapInfo *remapinfo; - Size size; - AttrNumber i; - bool noop = true; + HeapTuple tup; + Form_pg_type typ; + + /* This is recursive, so it could be driven to stack overflow. */ + check_stack_depth(); - size = offsetof(RemapInfo, mapping) + - sizeof(RemapClass) * tupledesc->natts; - remapinfo = MemoryContextAllocZero(TopMemoryContext, size); - remapinfo->natts = tupledesc->natts; - for (i = 0; i < tupledesc->natts; ++i) +restart: + tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for type %u", typid); + typ = (Form_pg_type) GETSTRUCT(tup); + + /* Look through domains to underlying base type. */ + if (typ->typtype == TYPTYPE_DOMAIN) { - Form_pg_attribute attr = tupledesc->attrs[i]; + typid = typ->typbasetype; + ReleaseSysCache(tup); + goto restart; + } - if (attr->attisdropped) - { - remapinfo->mapping[i] = TQUEUE_REMAP_NONE; - continue; - } + /* If it's a true array type, deal with it that way. */ + if (OidIsValid(typ->typelem) && typ->typlen == -1) + { + typid = typ->typelem; + ReleaseSysCache(tup); + return BuildArrayRemapInfo(typid, mycontext); + } - remapinfo->mapping[i] = GetRemapClass(attr->atttypid); - if (remapinfo->mapping[i] != TQUEUE_REMAP_NONE) - noop = false; + /* Similarly, deal with ranges appropriately. */ + if (typ->typtype == TYPTYPE_RANGE) + { + ReleaseSysCache(tup); + return BuildRangeRemapInfo(typid, mycontext); } - if (noop) + /* + * If it's a composite type (including RECORD), set up for remapping. We + * don't attempt to determine the status of subfields here, since we do + * not have enough information yet; just mark everything invalid. + */ + if (typ->typtype == TYPTYPE_COMPOSITE || typid == RECORDOID) { - pfree(remapinfo); - remapinfo = NULL; + TupleRemapInfo *remapinfo; + + remapinfo = (TupleRemapInfo *) + MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo)); + remapinfo->remapclass = TQUEUE_REMAP_RECORD; + remapinfo->u.rec.rectypid = InvalidOid; + remapinfo->u.rec.rectypmod = -1; + remapinfo->u.rec.localtypmod = -1; + remapinfo->u.rec.tupledesc = NULL; + remapinfo->u.rec.field_remap = NULL; + ReleaseSysCache(tup); + return remapinfo; } + /* Nothing else can possibly need remapping attention. */ + ReleaseSysCache(tup); + return NULL; +} + +static TupleRemapInfo * +BuildArrayRemapInfo(Oid elemtypid, MemoryContext mycontext) +{ + TupleRemapInfo *remapinfo; + TupleRemapInfo *element_remapinfo; + + /* See if element type requires remapping. */ + element_remapinfo = BuildTupleRemapInfo(elemtypid, mycontext); + /* If not, the array doesn't either. */ + if (element_remapinfo == NULL) + return NULL; + /* OK, set up to remap the array. */ + remapinfo = (TupleRemapInfo *) + MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo)); + remapinfo->remapclass = TQUEUE_REMAP_ARRAY; + get_typlenbyvalalign(elemtypid, + &remapinfo->u.arr.typlen, + &remapinfo->u.arr.typbyval, + &remapinfo->u.arr.typalign); + remapinfo->u.arr.element_remap = element_remapinfo; + return remapinfo; +} + +static TupleRemapInfo * +BuildRangeRemapInfo(Oid rngtypid, MemoryContext mycontext) +{ + TupleRemapInfo *remapinfo; + TupleRemapInfo *bound_remapinfo; + TypeCacheEntry *typcache; + + /* + * Get range info from the typcache. We assume this pointer will stay + * valid for the duration of the query. + */ + typcache = lookup_type_cache(rngtypid, TYPECACHE_RANGE_INFO); + if (typcache->rngelemtype == NULL) + elog(ERROR, "type %u is not a range type", rngtypid); + + /* See if range bound type requires remapping. */ + bound_remapinfo = BuildTupleRemapInfo(typcache->rngelemtype->type_id, + mycontext); + /* If not, the range doesn't either. */ + if (bound_remapinfo == NULL) + return NULL; + /* OK, set up to remap the range. */ + remapinfo = (TupleRemapInfo *) + MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo)); + remapinfo->remapclass = TQUEUE_REMAP_RANGE; + remapinfo->u.rng.typcache = typcache; + remapinfo->u.rng.bound_remap = bound_remapinfo; return remapinfo; } /* - * Determine the remap class assocociated with a particular data type. - * - * Transient record types need to have the typmod applied on the sending side - * replaced with a value on the receiving side that has the same meaning. - * - * Arrays, range types, and all record types (including named composite types) - * need to searched for transient record values buried within them. - * Surprisingly, a walker is required even when the indicated type is a - * composite type, because the actual value may be a compatible transient - * record type. + * Build remap info for fields of the type described by the given tupdesc. + * Returns an array of TupleRemapInfo pointers, or NULL if no field + * requires remapping. Data is allocated in mycontext. */ -static RemapClass -GetRemapClass(Oid typeid) +static TupleRemapInfo ** +BuildFieldRemapInfo(TupleDesc tupledesc, MemoryContext mycontext) { - RemapClass forceResult = TQUEUE_REMAP_NONE; - RemapClass innerResult = TQUEUE_REMAP_NONE; + TupleRemapInfo **remapinfo; + bool noop = true; + int i; - for (;;) + /* Recursively determine the remapping status of each field. */ + remapinfo = (TupleRemapInfo **) + MemoryContextAlloc(mycontext, + tupledesc->natts * sizeof(TupleRemapInfo *)); + for (i = 0; i < tupledesc->natts; i++) { - HeapTuple tup; - Form_pg_type typ; - - /* Simple cases. */ - if (typeid == RECORDOID) - { - innerResult = TQUEUE_REMAP_RECORD; - break; - } - if (typeid == RECORDARRAYOID) - { - innerResult = TQUEUE_REMAP_ARRAY; - break; - } - - /* Otherwise, we need a syscache lookup to figure it out. */ - tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typeid)); - if (!HeapTupleIsValid(tup)) - elog(ERROR, "cache lookup failed for type %u", typeid); - typ = (Form_pg_type) GETSTRUCT(tup); - - /* Look through domains to underlying base type. */ - if (typ->typtype == TYPTYPE_DOMAIN) - { - typeid = typ->typbasetype; - ReleaseSysCache(tup); - continue; - } - - /* - * Look through arrays to underlying base type, but the final return - * value must be either TQUEUE_REMAP_ARRAY or TQUEUE_REMAP_NONE. (If - * this is an array of integers, for example, we don't need to walk - * it.) - */ - if (OidIsValid(typ->typelem) && typ->typlen == -1) - { - typeid = typ->typelem; - ReleaseSysCache(tup); - if (forceResult == TQUEUE_REMAP_NONE) - forceResult = TQUEUE_REMAP_ARRAY; - continue; - } + Form_pg_attribute attr = tupledesc->attrs[i]; - /* - * Similarly, look through ranges to the underlying base type, but the - * final return value must be either TQUEUE_REMAP_RANGE or - * TQUEUE_REMAP_NONE. - */ - if (typ->typtype == TYPTYPE_RANGE) + if (attr->attisdropped) { - ReleaseSysCache(tup); - if (forceResult == TQUEUE_REMAP_NONE) - forceResult = TQUEUE_REMAP_RANGE; - typeid = get_range_subtype(typeid); + remapinfo[i] = NULL; continue; } + remapinfo[i] = BuildTupleRemapInfo(attr->atttypid, mycontext); + if (remapinfo[i] != NULL) + noop = false; + } - /* Walk composite types. Nothing else needs special handling. */ - if (typ->typtype == TYPTYPE_COMPOSITE) - innerResult = TQUEUE_REMAP_RECORD; - ReleaseSysCache(tup); - break; + /* If no fields require remapping, report that by returning NULL. */ + if (noop) + { + pfree(remapinfo); + remapinfo = NULL; } - if (innerResult != TQUEUE_REMAP_NONE && forceResult != TQUEUE_REMAP_NONE) - return forceResult; - return innerResult; + return remapinfo; } -- 2.40.0