]> granicus.if.org Git - postgresql/commitdiff
Code review for tqueue.c: fix memory leaks, speed it up, other fixes.
authorTom Lane <tgl@sss.pgh.pa.us>
Sun, 31 Jul 2016 20:05:12 +0000 (16:05 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Sun, 31 Jul 2016 20:05:12 +0000 (16:05 -0400)
When doing record typmod remapping, tqueue.c did fresh catalog lookups
for each tuple it processed, which was pretty horrible performance-wise
(it seemed to about halve the already none-too-quick speed of bulk reads
in parallel mode).  Worse, it insisted on putting bits of that data into
TopMemoryContext, from where it never freed them, causing a
session-lifespan memory leak.  (I suppose this was coded with the idea
that the sender process would quit after finishing the query ---
but the receiver uses the same code.)

Restructure to avoid repetitive catalog lookups and to keep that data
in a query-lifespan context, in or below the context where the
TQueueDestReceiver or TupleQueueReader itself lives.

Fix some other bugs such as continuing to use a tupledesc after
releasing our refcount on it.  Clean up cavalier datatype choices
(typmods are int32, please, not int, and certainly not Oid).  Improve
comments and error message wording.

src/backend/executor/tqueue.c

index 64555599ceeb83c7aec43143d5846c0a3ae4d1cc..58d0eeaf0ba85d09139d11128e27f53dcbacc35d 100644 (file)
@@ -3,16 +3,25 @@
  * tqueue.c
  *       Use shm_mq to send & receive tuples between parallel backends
  *
+ * Most of the complexity in this module arises from transient RECORD types,
+ * which all have type RECORDOID and are distinguished by typmod numbers
+ * that are managed per-backend (see src/backend/utils/cache/typcache.c).
+ * The sender's set of RECORD typmod assignments probably doesn't match the
+ * receiver's.  To deal with this, we make the sender send a description
+ * of each transient RECORD type appearing in the data it sends.  The
+ * receiver finds or creates a matching type in its own typcache, and then
+ * maps the sender's typmod for that type to its own typmod.
+ *
  * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
  * under the hood, writes tuples from the executor to a shm_mq.  If
  * necessary, it also writes control messages describing transient
  * record types used within the tuple.
  *
- * A TupleQueueReader reads tuples, and if any are sent control messages,
+ * A TupleQueueReader reads tuples, and control messages if any are sent,
  * from a shm_mq and returns the tuples.  If transient record types are
- * in use, it registers those types based on the received control messages
- * and rewrites the typemods sent by the remote side to the corresponding
- * local record typemods.
+ * in use, it registers those types locally based on the control messages
+ * and rewrites the typmods sent by the remote side to the corresponding
+ * local record typmods.
  *
  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
 #include "utils/syscache.h"
 #include "utils/typcache.h"
 
+
+/*
+ * The data transferred through the shm_mq is divided into messages.
+ * One-byte messages are mode-switch messages, telling the receiver to switch
+ * between "control" and "data" modes.  (We always start up in "data" mode.)
+ * Otherwise, when in "data" mode, each message is a tuple.  When in "control"
+ * mode, each message defines one transient-typmod-to-tupledesc mapping to
+ * let us interpret future tuples.  Both of those cases certainly require
+ * more than one byte, so no confusion is possible.
+ */
+#define TUPLE_QUEUE_MODE_CONTROL       'c' /* mode-switch message contents */
+#define TUPLE_QUEUE_MODE_DATA          'd'
+
+/*
+ * Both the sender and receiver build trees of TupleRemapInfo nodes to help
+ * them identify which (sub) fields of transmitted tuples are composite and
+ * may thus need remap processing.  We might need to look within arrays and
+ * ranges, not only composites, to find composite sub-fields.  A NULL
+ * TupleRemapInfo pointer indicates that it is known that the described field
+ * is not composite and has no composite substructure.
+ *
+ * Note that we currently have to look at each composite field at runtime,
+ * even if we believe it's of a named composite type (i.e., not RECORD).
+ * This is because we allow the actual value to be a compatible transient
+ * RECORD type.  That's grossly inefficient, and it would be good to get
+ * rid of the requirement, but it's not clear what would need to change.
+ *
+ * Also, we allow the top-level tuple structure, as well as the actual
+ * structure of composite subfields, to change from one tuple to the next
+ * at runtime.  This may well be entirely historical, but it's mostly free
+ * to support given the previous requirement; and other places in the system
+ * also permit this, so it's not entirely clear if we could drop it.
+ */
+
 typedef enum
 {
-       TQUEUE_REMAP_NONE,                      /* no special processing required */
        TQUEUE_REMAP_ARRAY,                     /* array */
        TQUEUE_REMAP_RANGE,                     /* range */
-       TQUEUE_REMAP_RECORD                     /* composite type, named or anonymous */
-} RemapClass;
+       TQUEUE_REMAP_RECORD                     /* composite type, named or transient */
+} TupleRemapClass;
 
-typedef struct
+typedef struct TupleRemapInfo TupleRemapInfo;
+
+typedef struct ArrayRemapInfo
 {
-       int                     natts;
-       RemapClass      mapping[FLEXIBLE_ARRAY_MEMBER];
-} RemapInfo;
+       int16           typlen;                 /* array element type's storage properties */
+       bool            typbyval;
+       char            typalign;
+       TupleRemapInfo *element_remap;          /* array element type's remap info */
+} ArrayRemapInfo;
 
-typedef struct
+typedef struct RangeRemapInfo
 {
-       DestReceiver pub;
-       shm_mq_handle *handle;
-       MemoryContext tmpcontext;
-       HTAB       *recordhtab;
-       char            mode;
-       TupleDesc       tupledesc;
-       RemapInfo  *remapinfo;
+       TypeCacheEntry *typcache;       /* range type's typcache entry */
+       TupleRemapInfo *bound_remap;    /* range bound type's remap info */
+} RangeRemapInfo;
+
+typedef struct RecordRemapInfo
+{
+       /* Original (remote) type ID info last seen for this composite field */
+       Oid                     rectypid;
+       int32           rectypmod;
+       /* Local RECORD typmod, or -1 if unset; not used on sender side */
+       int32           localtypmod;
+       /* If no fields of the record require remapping, these are NULL: */
+       TupleDesc       tupledesc;              /* copy of record's tupdesc */
+       TupleRemapInfo **field_remap;           /* each field's remap info */
+} RecordRemapInfo;
+
+struct TupleRemapInfo
+{
+       TupleRemapClass remapclass;
+       union
+       {
+               ArrayRemapInfo arr;
+               RangeRemapInfo rng;
+               RecordRemapInfo rec;
+       }                       u;
+};
+
+/*
+ * DestReceiver object's private contents
+ *
+ * queue and tupledesc are pointers to data supplied by DestReceiver's caller.
+ * The recordhtab and remap info are owned by the DestReceiver and are kept
+ * in mycontext.  tmpcontext is a tuple-lifespan context to hold cruft
+ * created while traversing each tuple to find record subfields.
+ */
+typedef struct TQueueDestReceiver
+{
+       DestReceiver pub;                       /* public fields */
+       shm_mq_handle *queue;           /* shm_mq to send to */
+       MemoryContext mycontext;        /* context containing TQueueDestReceiver */
+       MemoryContext tmpcontext;       /* per-tuple context, if needed */
+       HTAB       *recordhtab;         /* table of transmitted typmods, if needed */
+       char            mode;                   /* current message mode */
+       TupleDesc       tupledesc;              /* current top-level tuple descriptor */
+       TupleRemapInfo **field_remapinfo;       /* current top-level remap info */
 } TQueueDestReceiver;
 
-typedef struct RecordTypemodMap
+/*
+ * Hash table entries for mapping remote to local typmods.
+ */
+typedef struct RecordTypmodMap
 {
-       int                     remotetypmod;
-       int                     localtypmod;
-} RecordTypemodMap;
+       int32           remotetypmod;   /* hash key (must be first!) */
+       int32           localtypmod;
+} RecordTypmodMap;
 
+/*
+ * TupleQueueReader object's private contents
+ *
+ * queue and tupledesc are pointers to data supplied by reader's caller.
+ * The typmodmap and remap info are owned by the TupleQueueReader and
+ * are kept in mycontext.
+ *
+ * "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h
+ */
 struct TupleQueueReader
 {
-       shm_mq_handle *queue;
-       char            mode;
-       TupleDesc       tupledesc;
-       RemapInfo  *remapinfo;
-       HTAB       *typmodmap;
+       shm_mq_handle *queue;           /* shm_mq to receive from */
+       MemoryContext mycontext;        /* context containing TupleQueueReader */
+       HTAB       *typmodmap;          /* RecordTypmodMap hashtable, if needed */
+       char            mode;                   /* current message mode */
+       TupleDesc       tupledesc;              /* current top-level tuple descriptor */
+       TupleRemapInfo **field_remapinfo;       /* current top-level remap info */
 };
 
-#define                TUPLE_QUEUE_MODE_CONTROL                        'c'
-#define                TUPLE_QUEUE_MODE_DATA                           'd'
-
-static void tqueueWalk(TQueueDestReceiver *tqueue, RemapClass walktype,
-                  Datum value);
-static void tqueueWalkRecord(TQueueDestReceiver *tqueue, Datum value);
-static void tqueueWalkArray(TQueueDestReceiver *tqueue, Datum value);
-static void tqueueWalkRange(TQueueDestReceiver *tqueue, Datum value);
-static void tqueueSendTypmodInfo(TQueueDestReceiver *tqueue, int typmod,
-                                        TupleDesc tupledesc);
+/* Local function prototypes */
+static void TQExamine(TQueueDestReceiver *tqueue,
+                 TupleRemapInfo *remapinfo,
+                 Datum value);
+static void TQExamineArray(TQueueDestReceiver *tqueue,
+                          ArrayRemapInfo *remapinfo,
+                          Datum value);
+static void TQExamineRange(TQueueDestReceiver *tqueue,
+                          RangeRemapInfo *remapinfo,
+                          Datum value);
+static void TQExamineRecord(TQueueDestReceiver *tqueue,
+                               RecordRemapInfo *remapinfo,
+                               Datum value);
+static void TQSendRecordInfo(TQueueDestReceiver *tqueue, int32 typmod,
+                                TupleDesc tupledesc);
 static void TupleQueueHandleControlMessage(TupleQueueReader *reader,
                                                           Size nbytes, char *data);
 static HeapTuple TupleQueueHandleDataMessage(TupleQueueReader *reader,
                                                        Size nbytes, HeapTupleHeader data);
-static HeapTuple TupleQueueRemapTuple(TupleQueueReader *reader,
-                                        TupleDesc tupledesc, RemapInfo *remapinfo,
-                                        HeapTuple tuple);
-static Datum TupleQueueRemap(TupleQueueReader *reader, RemapClass remapclass,
-                               Datum value);
-static Datum TupleQueueRemapArray(TupleQueueReader *reader, Datum value);
-static Datum TupleQueueRemapRange(TupleQueueReader *reader, Datum value);
-static Datum TupleQueueRemapRecord(TupleQueueReader *reader, Datum value);
-static RemapClass GetRemapClass(Oid typeid);
-static RemapInfo *BuildRemapInfo(TupleDesc tupledesc);
+static HeapTuple TQRemapTuple(TupleQueueReader *reader,
+                        TupleDesc tupledesc,
+                        TupleRemapInfo **field_remapinfo,
+                        HeapTuple tuple);
+static Datum TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo,
+               Datum value, bool *changed);
+static Datum TQRemapArray(TupleQueueReader *reader, ArrayRemapInfo *remapinfo,
+                        Datum value, bool *changed);
+static Datum TQRemapRange(TupleQueueReader *reader, RangeRemapInfo *remapinfo,
+                        Datum value, bool *changed);
+static Datum TQRemapRecord(TupleQueueReader *reader, RecordRemapInfo *remapinfo,
+                         Datum value, bool *changed);
+static TupleRemapInfo *BuildTupleRemapInfo(Oid typid, MemoryContext mycontext);
+static TupleRemapInfo *BuildArrayRemapInfo(Oid elemtypid,
+                                       MemoryContext mycontext);
+static TupleRemapInfo *BuildRangeRemapInfo(Oid rngtypid,
+                                       MemoryContext mycontext);
+static TupleRemapInfo **BuildFieldRemapInfo(TupleDesc tupledesc,
+                                       MemoryContext mycontext);
+
 
 /*
- * Receive a tuple.
+ * Receive a tuple from a query, and send it to the designated shm_mq.
  *
- * This is, at core, pretty simple: just send the tuple to the designated
- * shm_mq.  The complicated part is that if the tuple contains transient
- * record types (see lookup_rowtype_tupdesc), we need to send control
- * information to the shm_mq receiver so that those typemods can be correctly
- * interpreted, as they are merely held in a backend-local cache.  Worse, the
- * record type may not at the top level: we could have a range over an array
- * type over a range type over a range type over an array type over a record,
- * or something like that.
+ * Returns TRUE if successful, FALSE if shm_mq has been detached.
  */
 static bool
 tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
@@ -124,43 +229,49 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
        shm_mq_result result;
 
        /*
-        * Test to see whether the tupledesc has changed; if so, set up for the
-        * new tupledesc.  This is a strange test both because the executor really
+        * If first time through, compute remapping info for the top-level fields.
+        * On later calls, if the tupledesc has changed, set up for the new
+        * tupledesc.  (This is a strange test both because the executor really
         * shouldn't change the tupledesc, and also because it would be unsafe if
         * the old tupledesc could be freed and a new one allocated at the same
         * address.  But since some very old code in printtup.c uses a similar
-        * test, we adopt it here as well.
+        * approach, we adopt it here as well.)
+        *
+        * Here and elsewhere in this module, when replacing remapping info we
+        * pfree the top-level object because that's easy, but we don't bother to
+        * recursively free any substructure.  This would lead to query-lifespan
+        * memory leaks if the mapping info actually changed frequently, but since
+        * we don't expect that to happen, it doesn't seem worth expending code to
+        * prevent it.
         */
        if (tqueue->tupledesc != tupledesc)
        {
-               if (tqueue->remapinfo != NULL)
-                       pfree(tqueue->remapinfo);
-               tqueue->remapinfo = BuildRemapInfo(tupledesc);
+               /* Is it worth trying to free substructure of the remap tree? */
+               if (tqueue->field_remapinfo != NULL)
+                       pfree(tqueue->field_remapinfo);
+               tqueue->field_remapinfo = BuildFieldRemapInfo(tupledesc,
+                                                                                                         tqueue->mycontext);
                tqueue->tupledesc = tupledesc;
        }
 
-       tuple = ExecMaterializeSlot(slot);
-
        /*
-        * When, because of the types being transmitted, no record typemod mapping
+        * When, because of the types being transmitted, no record typmod mapping
         * can be needed, we can skip a good deal of work.
         */
-       if (tqueue->remapinfo != NULL)
+       if (tqueue->field_remapinfo != NULL)
        {
-               RemapInfo  *remapinfo = tqueue->remapinfo;
-               AttrNumber      i;
+               TupleRemapInfo **remapinfo = tqueue->field_remapinfo;
+               int                     i;
                MemoryContext oldcontext = NULL;
 
-               /* Deform the tuple so we can examine it, if not done already. */
+               /* Deform the tuple so we can examine fields, if not done already. */
                slot_getallattrs(slot);
 
-               /* Iterate over each attribute and search it for transient typemods. */
-               Assert(slot->tts_tupleDescriptor->natts == remapinfo->natts);
-               for (i = 0; i < remapinfo->natts; ++i)
+               /* Iterate over each attribute and search it for transient typmods. */
+               for (i = 0; i < tupledesc->natts; i++)
                {
                        /* Ignore nulls and types that don't need special handling. */
-                       if (slot->tts_isnull[i] ||
-                               remapinfo->mapping[i] == TQUEUE_REMAP_NONE)
+                       if (slot->tts_isnull[i] || remapinfo[i] == NULL)
                                continue;
 
                        /* Switch to temporary memory context to avoid leaking. */
@@ -168,16 +279,16 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
                        {
                                if (tqueue->tmpcontext == NULL)
                                        tqueue->tmpcontext =
-                                               AllocSetContextCreate(TopMemoryContext,
-                                                                                         "tqueue temporary context",
+                                               AllocSetContextCreate(tqueue->mycontext,
+                                                                                         "tqueue sender temp context",
                                                                                          ALLOCSET_DEFAULT_MINSIZE,
                                                                                          ALLOCSET_DEFAULT_INITSIZE,
                                                                                          ALLOCSET_DEFAULT_MAXSIZE);
                                oldcontext = MemoryContextSwitchTo(tqueue->tmpcontext);
                        }
 
-                       /* Invoke the appropriate walker function. */
-                       tqueueWalk(tqueue, remapinfo->mapping[i], slot->tts_values[i]);
+                       /* Examine the value. */
+                       TQExamine(tqueue, remapinfo[i], slot->tts_values[i]);
                }
 
                /* If we used the temp context, reset it and restore prior context. */
@@ -191,217 +302,232 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
                if (tqueue->mode != TUPLE_QUEUE_MODE_DATA)
                {
                        tqueue->mode = TUPLE_QUEUE_MODE_DATA;
-                       shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false);
+                       shm_mq_send(tqueue->queue, sizeof(char), &tqueue->mode, false);
                }
        }
 
        /* Send the tuple itself. */
-       result = shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
+       tuple = ExecMaterializeSlot(slot);
+       result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false);
 
+       /* Check for failure. */
        if (result == SHM_MQ_DETACHED)
                return false;
        else if (result != SHM_MQ_SUCCESS)
                ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                errmsg("could not send tuples")));
+                                errmsg("could not send tuple to shared-memory queue")));
 
        return true;
 }
 
 /*
- * Invoke the appropriate walker function based on the given RemapClass.
+ * Examine the given datum and send any necessary control messages for
+ * transient record types contained in it.
+ *
+ * remapinfo is previously-computed remapping info about the datum's type.
+ *
+ * This function just dispatches based on the remap class.
  */
 static void
-tqueueWalk(TQueueDestReceiver *tqueue, RemapClass walktype, Datum value)
+TQExamine(TQueueDestReceiver *tqueue, TupleRemapInfo *remapinfo, Datum value)
 {
+       /* This is recursive, so it could be driven to stack overflow. */
        check_stack_depth();
 
-       switch (walktype)
+       switch (remapinfo->remapclass)
        {
-               case TQUEUE_REMAP_NONE:
-                       break;
                case TQUEUE_REMAP_ARRAY:
-                       tqueueWalkArray(tqueue, value);
+                       TQExamineArray(tqueue, &remapinfo->u.arr, value);
                        break;
                case TQUEUE_REMAP_RANGE:
-                       tqueueWalkRange(tqueue, value);
+                       TQExamineRange(tqueue, &remapinfo->u.rng, value);
                        break;
                case TQUEUE_REMAP_RECORD:
-                       tqueueWalkRecord(tqueue, value);
+                       TQExamineRecord(tqueue, &remapinfo->u.rec, value);
                        break;
        }
 }
 
 /*
- * Walk a record and send control messages for transient record types
- * contained therein.
+ * Examine a record datum and send any necessary control messages for
+ * transient record types contained in it.
  */
 static void
-tqueueWalkRecord(TQueueDestReceiver *tqueue, Datum value)
+TQExamineRecord(TQueueDestReceiver *tqueue, RecordRemapInfo *remapinfo,
+                               Datum value)
 {
        HeapTupleHeader tup;
-       Oid                     typeid;
-       Oid                     typmod;
+       Oid                     typid;
+       int32           typmod;
        TupleDesc       tupledesc;
-       RemapInfo  *remapinfo;
 
-       /* Extract typmod from tuple. */
+       /* Extract type OID and typmod from tuple. */
        tup = DatumGetHeapTupleHeader(value);
-       typeid = HeapTupleHeaderGetTypeId(tup);
+       typid = HeapTupleHeaderGetTypeId(tup);
        typmod = HeapTupleHeaderGetTypMod(tup);
 
-       /* Look up tuple descriptor in typecache. */
-       tupledesc = lookup_rowtype_tupdesc(typeid, typmod);
-
        /*
-        * If this is a transient record time, send its TupleDesc as a control
-        * message.  (tqueueSendTypemodInfo is smart enough to do this only once
-        * per typmod.)
+        * If first time through, or if this isn't the same composite type as last
+        * time, consider sending a control message, and then look up the
+        * necessary information for examining the fields.
         */
-       if (typeid == RECORDOID)
-               tqueueSendTypmodInfo(tqueue, typmod, tupledesc);
+       if (typid != remapinfo->rectypid || typmod != remapinfo->rectypmod)
+       {
+               /* Free any old data. */
+               if (remapinfo->tupledesc != NULL)
+                       FreeTupleDesc(remapinfo->tupledesc);
+               /* Is it worth trying to free substructure of the remap tree? */
+               if (remapinfo->field_remap != NULL)
+                       pfree(remapinfo->field_remap);
 
-       /*
-        * Build the remap information for this tupledesc.  We might want to think
-        * about keeping a cache of this information keyed by typeid and typemod,
-        * but let's keep it simple for now.
-        */
-       remapinfo = BuildRemapInfo(tupledesc);
+               /* Look up tuple descriptor in typcache. */
+               tupledesc = lookup_rowtype_tupdesc(typid, typmod);
+
+               /*
+                * If this is a transient record type, send the tupledesc in a control
+                * message.  (TQSendRecordInfo is smart enough to do this only once
+                * per typmod.)
+                */
+               if (typid == RECORDOID)
+                       TQSendRecordInfo(tqueue, typmod, tupledesc);
+
+               /* Figure out whether fields need recursive processing. */
+               remapinfo->field_remap = BuildFieldRemapInfo(tupledesc,
+                                                                                                        tqueue->mycontext);
+               if (remapinfo->field_remap != NULL)
+               {
+                       /*
+                        * We need to inspect the record contents, so save a copy of the
+                        * tupdesc.  (We could possibly just reference the typcache's
+                        * copy, but then it's problematic when to release the refcount.)
+                        */
+                       MemoryContext oldcontext = MemoryContextSwitchTo(tqueue->mycontext);
+
+                       remapinfo->tupledesc = CreateTupleDescCopy(tupledesc);
+                       MemoryContextSwitchTo(oldcontext);
+               }
+               else
+               {
+                       /* No fields of the record require remapping. */
+                       remapinfo->tupledesc = NULL;
+               }
+               remapinfo->rectypid = typid;
+               remapinfo->rectypmod = typmod;
+
+               /* Release reference count acquired by lookup_rowtype_tupdesc. */
+               DecrTupleDescRefCount(tupledesc);
+       }
 
        /*
-        * If remapping is required, deform the tuple and process each field. When
-        * BuildRemapInfo is null, the data types are such that there can be no
-        * transient record types here, so we can skip all this work.
+        * If field remapping is required, deform the tuple and examine each
+        * field.
         */
-       if (remapinfo != NULL)
+       if (remapinfo->field_remap != NULL)
        {
                Datum      *values;
                bool       *isnull;
                HeapTupleData tdata;
-               AttrNumber      i;
+               int                     i;
 
                /* Deform the tuple so we can check each column within. */
-               values = palloc(tupledesc->natts * sizeof(Datum));
-               isnull = palloc(tupledesc->natts * sizeof(bool));
+               tupledesc = remapinfo->tupledesc;
+               values = (Datum *) palloc(tupledesc->natts * sizeof(Datum));
+               isnull = (bool *) palloc(tupledesc->natts * sizeof(bool));
                tdata.t_len = HeapTupleHeaderGetDatumLength(tup);
                ItemPointerSetInvalid(&(tdata.t_self));
                tdata.t_tableOid = InvalidOid;
                tdata.t_data = tup;
                heap_deform_tuple(&tdata, tupledesc, values, isnull);
 
-               /* Recursively check each non-NULL attribute. */
-               for (i = 0; i < tupledesc->natts; ++i)
-                       if (!isnull[i])
-                               tqueueWalk(tqueue, remapinfo->mapping[i], values[i]);
-       }
+               /* Recursively check each interesting non-NULL attribute. */
+               for (i = 0; i < tupledesc->natts; i++)
+               {
+                       if (!isnull[i] && remapinfo->field_remap[i])
+                               TQExamine(tqueue, remapinfo->field_remap[i], values[i]);
+               }
 
-       /* Release reference count acquired by lookup_rowtype_tupdesc. */
-       DecrTupleDescRefCount(tupledesc);
+               /* Need not clean up, since we're in a short-lived context. */
+       }
 }
 
 /*
- * Walk a record and send control messages for transient record types
- * contained therein.
+ * Examine an array datum and send any necessary control messages for
+ * transient record types contained in it.
  */
 static void
-tqueueWalkArray(TQueueDestReceiver *tqueue, Datum value)
+TQExamineArray(TQueueDestReceiver *tqueue, ArrayRemapInfo *remapinfo,
+                          Datum value)
 {
        ArrayType  *arr = DatumGetArrayTypeP(value);
-       Oid                     typeid = ARR_ELEMTYPE(arr);
-       RemapClass      remapclass;
-       int16           typlen;
-       bool            typbyval;
-       char            typalign;
+       Oid                     typid = ARR_ELEMTYPE(arr);
        Datum      *elem_values;
        bool       *elem_nulls;
        int                     num_elems;
        int                     i;
 
-       remapclass = GetRemapClass(typeid);
-
-       /*
-        * If the elements of the array don't need to be walked, we shouldn't have
-        * been called in the first place: GetRemapClass should have returned NULL
-        * when asked about this array type.
-        */
-       Assert(remapclass != TQUEUE_REMAP_NONE);
-
        /* Deconstruct the array. */
-       get_typlenbyvalalign(typeid, &typlen, &typbyval, &typalign);
-       deconstruct_array(arr, typeid, typlen, typbyval, typalign,
+       deconstruct_array(arr, typid, remapinfo->typlen,
+                                         remapinfo->typbyval, remapinfo->typalign,
                                          &elem_values, &elem_nulls, &num_elems);
 
-       /* Walk each element. */
-       for (i = 0; i < num_elems; ++i)
+       /* Examine each element. */
+       for (i = 0; i < num_elems; i++)
+       {
                if (!elem_nulls[i])
-                       tqueueWalk(tqueue, remapclass, elem_values[i]);
+                       TQExamine(tqueue, remapinfo->element_remap, elem_values[i]);
+       }
 }
 
 /*
- * Walk a range type and send control messages for transient record types
- * contained therein.
+ * Examine a range datum and send any necessary control messages for
+ * transient record types contained in it.
  */
 static void
-tqueueWalkRange(TQueueDestReceiver *tqueue, Datum value)
+TQExamineRange(TQueueDestReceiver *tqueue, RangeRemapInfo *remapinfo,
+                          Datum value)
 {
        RangeType  *range = DatumGetRangeType(value);
-       Oid                     typeid = RangeTypeGetOid(range);
-       RemapClass      remapclass;
-       TypeCacheEntry *typcache;
        RangeBound      lower;
        RangeBound      upper;
        bool            empty;
 
-       /*
-        * Extract the lower and upper bounds.  It might be worth implementing
-        * some caching scheme here so that we don't look up the same typeids in
-        * the type cache repeatedly, but for now let's keep it simple.
-        */
-       typcache = lookup_type_cache(typeid, TYPECACHE_RANGE_INFO);
-       if (typcache->rngelemtype == NULL)
-               elog(ERROR, "type %u is not a range type", typeid);
-       range_deserialize(typcache, range, &lower, &upper, &empty);
+       /* Extract the lower and upper bounds. */
+       range_deserialize(remapinfo->typcache, range, &lower, &upper, &empty);
 
        /* Nothing to do for an empty range. */
        if (empty)
                return;
 
-       /*
-        * If the range bounds don't need to be walked, we shouldn't have been
-        * called in the first place: GetRemapClass should have returned NULL when
-        * asked about this range type.
-        */
-       remapclass = GetRemapClass(typcache->rngelemtype->type_id);
-       Assert(remapclass != TQUEUE_REMAP_NONE);
-
-       /* Walk each bound, if present. */
+       /* Examine each bound, if present. */
        if (!upper.infinite)
-               tqueueWalk(tqueue, remapclass, upper.val);
+               TQExamine(tqueue, remapinfo->bound_remap, upper.val);
        if (!lower.infinite)
-               tqueueWalk(tqueue, remapclass, lower.val);
+               TQExamine(tqueue, remapinfo->bound_remap, lower.val);
 }
 
 /*
- * Send tuple descriptor information for a transient typemod, unless we've
+ * Send tuple descriptor information for a transient typmod, unless we've
  * already done so previously.
  */
 static void
-tqueueSendTypmodInfo(TQueueDestReceiver *tqueue, int typmod,
-                                        TupleDesc tupledesc)
+TQSendRecordInfo(TQueueDestReceiver *tqueue, int32 typmod, TupleDesc tupledesc)
 {
        StringInfoData buf;
        bool            found;
-       AttrNumber      i;
+       int                     i;
 
        /* Initialize hash table if not done yet. */
        if (tqueue->recordhtab == NULL)
        {
                HASHCTL         ctl;
 
-               ctl.keysize = sizeof(int);
-               ctl.entrysize = sizeof(int);
-               ctl.hcxt = TopMemoryContext;
-               tqueue->recordhtab = hash_create("tqueue record hashtable",
+               MemSet(&ctl, 0, sizeof(ctl));
+               /* Hash table entries are just typmods */
+               ctl.keysize = sizeof(int32);
+               ctl.entrysize = sizeof(int32);
+               ctl.hcxt = tqueue->mycontext;
+               tqueue->recordhtab = hash_create("tqueue sender record type hashtable",
                                                                                 100, &ctl,
                                                                          HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
        }
@@ -411,25 +537,30 @@ tqueueSendTypmodInfo(TQueueDestReceiver *tqueue, int typmod,
        if (found)
                return;
 
+       elog(DEBUG3, "sending tqueue control message for record typmod %d", typmod);
+
        /* If message queue is in data mode, switch to control mode. */
        if (tqueue->mode != TUPLE_QUEUE_MODE_CONTROL)
        {
                tqueue->mode = TUPLE_QUEUE_MODE_CONTROL;
-               shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false);
+               shm_mq_send(tqueue->queue, sizeof(char), &tqueue->mode, false);
        }
 
        /* Assemble a control message. */
        initStringInfo(&buf);
-       appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int));
+       appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int32));
        appendBinaryStringInfo(&buf, (char *) &tupledesc->natts, sizeof(int));
-       appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid,
-                                                  sizeof(bool));
-       for (i = 0; i < tupledesc->natts; ++i)
+       appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid, sizeof(bool));
+       for (i = 0; i < tupledesc->natts; i++)
+       {
                appendBinaryStringInfo(&buf, (char *) tupledesc->attrs[i],
                                                           sizeof(FormData_pg_attribute));
+       }
 
        /* Send control message. */
-       shm_mq_send(tqueue->handle, buf.len, buf.data, false);
+       shm_mq_send(tqueue->queue, buf.len, buf.data, false);
+
+       /* We assume it's OK to leak buf because we're in a short-lived context. */
 }
 
 /*
@@ -449,7 +580,7 @@ tqueueShutdownReceiver(DestReceiver *self)
 {
        TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
 
-       shm_mq_detach(shm_mq_get_queue(tqueue->handle));
+       shm_mq_detach(shm_mq_get_queue(tqueue->queue));
 }
 
 /*
@@ -464,8 +595,9 @@ tqueueDestroyReceiver(DestReceiver *self)
                MemoryContextDelete(tqueue->tmpcontext);
        if (tqueue->recordhtab != NULL)
                hash_destroy(tqueue->recordhtab);
-       if (tqueue->remapinfo != NULL)
-               pfree(tqueue->remapinfo);
+       /* Is it worth trying to free substructure of the remap tree? */
+       if (tqueue->field_remapinfo != NULL)
+               pfree(tqueue->field_remapinfo);
        pfree(self);
 }
 
@@ -484,11 +616,14 @@ CreateTupleQueueDestReceiver(shm_mq_handle *handle)
        self->pub.rShutdown = tqueueShutdownReceiver;
        self->pub.rDestroy = tqueueDestroyReceiver;
        self->pub.mydest = DestTupleQueue;
-       self->handle = handle;
+       self->queue = handle;
+       self->mycontext = CurrentMemoryContext;
        self->tmpcontext = NULL;
        self->recordhtab = NULL;
        self->mode = TUPLE_QUEUE_MODE_DATA;
-       self->remapinfo = NULL;
+       /* Top-level tupledesc is not known yet */
+       self->tupledesc = NULL;
+       self->field_remapinfo = NULL;
 
        return (DestReceiver *) self;
 }
@@ -502,9 +637,11 @@ CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
        TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
 
        reader->queue = handle;
+       reader->mycontext = CurrentMemoryContext;
+       reader->typmodmap = NULL;
        reader->mode = TUPLE_QUEUE_MODE_DATA;
        reader->tupledesc = tupledesc;
-       reader->remapinfo = BuildRemapInfo(tupledesc);
+       reader->field_remapinfo = BuildFieldRemapInfo(tupledesc, reader->mycontext);
 
        return reader;
 }
@@ -516,8 +653,11 @@ void
 DestroyTupleQueueReader(TupleQueueReader *reader)
 {
        shm_mq_detach(shm_mq_get_queue(reader->queue));
-       if (reader->remapinfo != NULL)
-               pfree(reader->remapinfo);
+       if (reader->typmodmap != NULL)
+               hash_destroy(reader->typmodmap);
+       /* Is it worth trying to free substructure of the remap tree? */
+       if (reader->field_remapinfo != NULL)
+               pfree(reader->field_remapinfo);
        pfree(reader);
 }
 
@@ -567,14 +707,7 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
                Assert(result == SHM_MQ_SUCCESS);
 
                /*
-                * OK, we got a message.  Process it.
-                *
-                * One-byte messages are mode switch messages, so that we can switch
-                * between "control" and "data" mode.  Otherwise, when in "data" mode,
-                * each message is a tuple.  When in "control" mode, each message
-                * provides a transient-typmod-to-tupledesc mapping to let us
-                * interpret future tuples.  Both of those cases certainly require
-                * more than one byte, so no confusion is possible.
+                * We got a message (see message spec at top of file).  Process it.
                 */
                if (nbytes == 1)
                {
@@ -592,7 +725,7 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
                        TupleQueueHandleControlMessage(reader, nbytes, data);
                }
                else
-                       elog(ERROR, "invalid mode: %d", (int) reader->mode);
+                       elog(ERROR, "unrecognized tqueue mode: %d", (int) reader->mode);
        }
 }
 
@@ -606,220 +739,306 @@ TupleQueueHandleDataMessage(TupleQueueReader *reader,
 {
        HeapTupleData htup;
 
+       /*
+        * Set up a dummy HeapTupleData pointing to the data from the shm_mq
+        * (which had better be sufficiently aligned).
+        */
        ItemPointerSetInvalid(&htup.t_self);
        htup.t_tableOid = InvalidOid;
        htup.t_len = nbytes;
        htup.t_data = data;
 
-       return TupleQueueRemapTuple(reader, reader->tupledesc, reader->remapinfo,
-                                                               &htup);
+       /*
+        * Either just copy the data into a regular palloc'd tuple, or remap it,
+        * as required.
+        */
+       return TQRemapTuple(reader,
+                                               reader->tupledesc,
+                                               reader->field_remapinfo,
+                                               &htup);
 }
 
 /*
- * Remap tuple typmods per control information received from remote side.
+ * Copy the given tuple, remapping any transient typmods contained in it.
  */
 static HeapTuple
-TupleQueueRemapTuple(TupleQueueReader *reader, TupleDesc tupledesc,
-                                        RemapInfo *remapinfo, HeapTuple tuple)
+TQRemapTuple(TupleQueueReader *reader,
+                        TupleDesc tupledesc,
+                        TupleRemapInfo **field_remapinfo,
+                        HeapTuple tuple)
 {
        Datum      *values;
        bool       *isnull;
+       bool            changed = false;
        int                     i;
 
        /*
         * If no remapping is necessary, just copy the tuple into a single
         * palloc'd chunk, as caller will expect.
         */
-       if (remapinfo == NULL)
+       if (field_remapinfo == NULL)
                return heap_copytuple(tuple);
 
        /* Deform tuple so we can remap record typmods for individual attrs. */
-       values = palloc(tupledesc->natts * sizeof(Datum));
-       isnull = palloc(tupledesc->natts * sizeof(bool));
+       values = (Datum *) palloc(tupledesc->natts * sizeof(Datum));
+       isnull = (bool *) palloc(tupledesc->natts * sizeof(bool));
        heap_deform_tuple(tuple, tupledesc, values, isnull);
-       Assert(tupledesc->natts == remapinfo->natts);
 
-       /* Recursively check each non-NULL attribute. */
-       for (i = 0; i < tupledesc->natts; ++i)
+       /* Recursively process each interesting non-NULL attribute. */
+       for (i = 0; i < tupledesc->natts; i++)
        {
-               if (isnull[i] || remapinfo->mapping[i] == TQUEUE_REMAP_NONE)
+               if (isnull[i] || field_remapinfo[i] == NULL)
                        continue;
-               values[i] = TupleQueueRemap(reader, remapinfo->mapping[i], values[i]);
+               values[i] = TQRemap(reader, field_remapinfo[i], values[i], &changed);
        }
 
-       /* Reform the modified tuple. */
-       return heap_form_tuple(tupledesc, values, isnull);
+       /* Reconstruct the modified tuple, if anything was modified. */
+       if (changed)
+               return heap_form_tuple(tupledesc, values, isnull);
+       else
+               return heap_copytuple(tuple);
 }
 
 /*
- * Remap a value based on the specified remap class.
+ * Process the given datum and replace any transient record typmods
+ * contained in it.  Set *changed to TRUE if we actually changed the datum.
+ *
+ * remapinfo is previously-computed remapping info about the datum's type.
+ *
+ * This function just dispatches based on the remap class.
  */
 static Datum
-TupleQueueRemap(TupleQueueReader *reader, RemapClass remapclass, Datum value)
+TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo,
+               Datum value, bool *changed)
 {
+       /* This is recursive, so it could be driven to stack overflow. */
        check_stack_depth();
 
-       switch (remapclass)
+       switch (remapinfo->remapclass)
        {
-               case TQUEUE_REMAP_NONE:
-                       /* caller probably shouldn't have called us at all, but... */
-                       return value;
-
                case TQUEUE_REMAP_ARRAY:
-                       return TupleQueueRemapArray(reader, value);
+                       return TQRemapArray(reader, &remapinfo->u.arr, value, changed);
 
                case TQUEUE_REMAP_RANGE:
-                       return TupleQueueRemapRange(reader, value);
+                       return TQRemapRange(reader, &remapinfo->u.rng, value, changed);
 
                case TQUEUE_REMAP_RECORD:
-                       return TupleQueueRemapRecord(reader, value);
+                       return TQRemapRecord(reader, &remapinfo->u.rec, value, changed);
        }
 
-       elog(ERROR, "unknown remap class: %d", (int) remapclass);
+       elog(ERROR, "unrecognized tqueue remap class: %d",
+                (int) remapinfo->remapclass);
        return (Datum) 0;
 }
 
 /*
- * Remap an array.
+ * Process the given array datum and replace any transient record typmods
+ * contained in it.  Set *changed to TRUE if we actually changed the datum.
  */
 static Datum
-TupleQueueRemapArray(TupleQueueReader *reader, Datum value)
+TQRemapArray(TupleQueueReader *reader, ArrayRemapInfo *remapinfo,
+                        Datum value, bool *changed)
 {
        ArrayType  *arr = DatumGetArrayTypeP(value);
-       Oid                     typeid = ARR_ELEMTYPE(arr);
-       RemapClass      remapclass;
-       int16           typlen;
-       bool            typbyval;
-       char            typalign;
+       Oid                     typid = ARR_ELEMTYPE(arr);
+       bool            element_changed = false;
        Datum      *elem_values;
        bool       *elem_nulls;
        int                     num_elems;
        int                     i;
 
-       remapclass = GetRemapClass(typeid);
-
-       /*
-        * If the elements of the array don't need to be walked, we shouldn't have
-        * been called in the first place: GetRemapClass should have returned NULL
-        * when asked about this array type.
-        */
-       Assert(remapclass != TQUEUE_REMAP_NONE);
-
        /* Deconstruct the array. */
-       get_typlenbyvalalign(typeid, &typlen, &typbyval, &typalign);
-       deconstruct_array(arr, typeid, typlen, typbyval, typalign,
+       deconstruct_array(arr, typid, remapinfo->typlen,
+                                         remapinfo->typbyval, remapinfo->typalign,
                                          &elem_values, &elem_nulls, &num_elems);
 
        /* Remap each element. */
-       for (i = 0; i < num_elems; ++i)
+       for (i = 0; i < num_elems; i++)
+       {
                if (!elem_nulls[i])
-                       elem_values[i] = TupleQueueRemap(reader, remapclass,
-                                                                                        elem_values[i]);
-
-       /* Reconstruct and return the array.  */
-       arr = construct_md_array(elem_values, elem_nulls,
-                                                        ARR_NDIM(arr), ARR_DIMS(arr), ARR_LBOUND(arr),
-                                                        typeid, typlen, typbyval, typalign);
-       return PointerGetDatum(arr);
+                       elem_values[i] = TQRemap(reader,
+                                                                        remapinfo->element_remap,
+                                                                        elem_values[i],
+                                                                        &element_changed);
+       }
+
+       if (element_changed)
+       {
+               /* Reconstruct and return the array.  */
+               *changed = true;
+               arr = construct_md_array(elem_values, elem_nulls,
+                                                          ARR_NDIM(arr), ARR_DIMS(arr), ARR_LBOUND(arr),
+                                                                typid, remapinfo->typlen,
+                                                                remapinfo->typbyval, remapinfo->typalign);
+               return PointerGetDatum(arr);
+       }
+
+       /* Else just return the value as-is. */
+       return value;
 }
 
 /*
- * Remap a range type.
+ * Process the given range datum and replace any transient record typmods
+ * contained in it.  Set *changed to TRUE if we actually changed the datum.
  */
 static Datum
-TupleQueueRemapRange(TupleQueueReader *reader, Datum value)
+TQRemapRange(TupleQueueReader *reader, RangeRemapInfo *remapinfo,
+                        Datum value, bool *changed)
 {
        RangeType  *range = DatumGetRangeType(value);
-       Oid                     typeid = RangeTypeGetOid(range);
-       RemapClass      remapclass;
-       TypeCacheEntry *typcache;
+       bool            bound_changed = false;
        RangeBound      lower;
        RangeBound      upper;
        bool            empty;
 
-       /*
-        * Extract the lower and upper bounds.  As in tqueueWalkRange, some
-        * caching might be a good idea here.
-        */
-       typcache = lookup_type_cache(typeid, TYPECACHE_RANGE_INFO);
-       if (typcache->rngelemtype == NULL)
-               elog(ERROR, "type %u is not a range type", typeid);
-       range_deserialize(typcache, range, &lower, &upper, &empty);
+       /* Extract the lower and upper bounds. */
+       range_deserialize(remapinfo->typcache, range, &lower, &upper, &empty);
 
        /* Nothing to do for an empty range. */
        if (empty)
                return value;
 
-       /*
-        * If the range bounds don't need to be walked, we shouldn't have been
-        * called in the first place: GetRemapClass should have returned NULL when
-        * asked about this range type.
-        */
-       remapclass = GetRemapClass(typcache->rngelemtype->type_id);
-       Assert(remapclass != TQUEUE_REMAP_NONE);
-
        /* Remap each bound, if present. */
        if (!upper.infinite)
-               upper.val = TupleQueueRemap(reader, remapclass, upper.val);
+               upper.val = TQRemap(reader, remapinfo->bound_remap,
+                                                       upper.val, &bound_changed);
        if (!lower.infinite)
-               lower.val = TupleQueueRemap(reader, remapclass, lower.val);
+               lower.val = TQRemap(reader, remapinfo->bound_remap,
+                                                       lower.val, &bound_changed);
 
-       /* And reserialize. */
-       range = range_serialize(typcache, &lower, &upper, empty);
-       return RangeTypeGetDatum(range);
+       if (bound_changed)
+       {
+               /* Reserialize.  */
+               *changed = true;
+               range = range_serialize(remapinfo->typcache, &lower, &upper, empty);
+               return RangeTypeGetDatum(range);
+       }
+
+       /* Else just return the value as-is. */
+       return value;
 }
 
 /*
- * Remap a record.
+ * Process the given record datum and replace any transient record typmods
+ * contained in it.  Set *changed to TRUE if we actually changed the datum.
  */
 static Datum
-TupleQueueRemapRecord(TupleQueueReader *reader, Datum value)
+TQRemapRecord(TupleQueueReader *reader, RecordRemapInfo *remapinfo,
+                         Datum value, bool *changed)
 {
        HeapTupleHeader tup;
-       Oid                     typeid;
-       int                     typmod;
-       RecordTypemodMap *mapent;
+       Oid                     typid;
+       int32           typmod;
+       bool            changed_typmod;
        TupleDesc       tupledesc;
-       RemapInfo  *remapinfo;
-       HeapTupleData htup;
-       HeapTuple       atup;
 
-       /* Fetch type OID and typemod. */
+       /* Extract type OID and typmod from tuple. */
        tup = DatumGetHeapTupleHeader(value);
-       typeid = HeapTupleHeaderGetTypeId(tup);
+       typid = HeapTupleHeaderGetTypeId(tup);
        typmod = HeapTupleHeaderGetTypMod(tup);
 
+       /*
+        * If first time through, or if this isn't the same composite type as last
+        * time, identify the required typmod mapping, and then look up the
+        * necessary information for processing the fields.
+        */
+       if (typid != remapinfo->rectypid || typmod != remapinfo->rectypmod)
+       {
+               /* Free any old data. */
+               if (remapinfo->tupledesc != NULL)
+                       FreeTupleDesc(remapinfo->tupledesc);
+               /* Is it worth trying to free substructure of the remap tree? */
+               if (remapinfo->field_remap != NULL)
+                       pfree(remapinfo->field_remap);
+
+               /* If transient record type, look up matching local typmod. */
+               if (typid == RECORDOID)
+               {
+                       RecordTypmodMap *mapent;
+
+                       Assert(reader->typmodmap != NULL);
+                       mapent = hash_search(reader->typmodmap, &typmod,
+                                                                HASH_FIND, NULL);
+                       if (mapent == NULL)
+                               elog(ERROR, "tqueue received unrecognized remote typmod %d",
+                                        typmod);
+                       remapinfo->localtypmod = mapent->localtypmod;
+               }
+               else
+                       remapinfo->localtypmod = -1;
+
+               /* Look up tuple descriptor in typcache. */
+               tupledesc = lookup_rowtype_tupdesc(typid, remapinfo->localtypmod);
+
+               /* Figure out whether fields need recursive processing. */
+               remapinfo->field_remap = BuildFieldRemapInfo(tupledesc,
+                                                                                                        reader->mycontext);
+               if (remapinfo->field_remap != NULL)
+               {
+                       /*
+                        * We need to inspect the record contents, so save a copy of the
+                        * tupdesc.  (We could possibly just reference the typcache's
+                        * copy, but then it's problematic when to release the refcount.)
+                        */
+                       MemoryContext oldcontext = MemoryContextSwitchTo(reader->mycontext);
+
+                       remapinfo->tupledesc = CreateTupleDescCopy(tupledesc);
+                       MemoryContextSwitchTo(oldcontext);
+               }
+               else
+               {
+                       /* No fields of the record require remapping. */
+                       remapinfo->tupledesc = NULL;
+               }
+               remapinfo->rectypid = typid;
+               remapinfo->rectypmod = typmod;
+
+               /* Release reference count acquired by lookup_rowtype_tupdesc. */
+               DecrTupleDescRefCount(tupledesc);
+       }
+
        /* If transient record, replace remote typmod with local typmod. */
-       if (typeid == RECORDOID)
+       if (typid == RECORDOID && typmod != remapinfo->localtypmod)
        {
-               Assert(reader->typmodmap != NULL);
-               mapent = hash_search(reader->typmodmap, &typmod,
-                                                        HASH_FIND, NULL);
-               if (mapent == NULL)
-                       elog(ERROR, "found unrecognized remote typmod %d", typmod);
-               typmod = mapent->localtypmod;
+               typmod = remapinfo->localtypmod;
+               changed_typmod = true;
        }
+       else
+               changed_typmod = false;
 
        /*
-        * Fetch tupledesc and compute remap info.  We should probably cache this
-        * so that we don't have to keep recomputing it.
+        * If we need to change the typmod, or if there are any potentially
+        * remappable fields, replace the tuple.
         */
-       tupledesc = lookup_rowtype_tupdesc(typeid, typmod);
-       remapinfo = BuildRemapInfo(tupledesc);
-       DecrTupleDescRefCount(tupledesc);
+       if (changed_typmod || remapinfo->field_remap != NULL)
+       {
+               HeapTupleData htup;
+               HeapTuple       atup;
+
+               /* For now, assume we always need to change the tuple in this case. */
+               *changed = true;
+
+               /* Copy tuple, possibly remapping contained fields. */
+               ItemPointerSetInvalid(&htup.t_self);
+               htup.t_tableOid = InvalidOid;
+               htup.t_len = HeapTupleHeaderGetDatumLength(tup);
+               htup.t_data = tup;
+               atup = TQRemapTuple(reader,
+                                                       remapinfo->tupledesc,
+                                                       remapinfo->field_remap,
+                                                       &htup);
+
+               /* Apply the correct labeling for a local Datum. */
+               HeapTupleHeaderSetTypeId(atup->t_data, typid);
+               HeapTupleHeaderSetTypMod(atup->t_data, typmod);
+               HeapTupleHeaderSetDatumLength(atup->t_data, htup.t_len);
+
+               /* And return the results. */
+               return HeapTupleHeaderGetDatum(atup->t_data);
+       }
 
-       /* Remap tuple. */
-       ItemPointerSetInvalid(&htup.t_self);
-       htup.t_tableOid = InvalidOid;
-       htup.t_len = HeapTupleHeaderGetDatumLength(tup);
-       htup.t_data = tup;
-       atup = TupleQueueRemapTuple(reader, tupledesc, remapinfo, &htup);
-       HeapTupleHeaderSetTypeId(atup->t_data, typeid);
-       HeapTupleHeaderSetTypMod(atup->t_data, typmod);
-       HeapTupleHeaderSetDatumLength(atup->t_data, htup.t_len);
-
-       /* And return the results. */
-       return HeapTupleHeaderGetDatum(atup->t_data);
+       /* Else just return the value as-is. */
+       return value;
 }
 
 /*
@@ -833,57 +1052,54 @@ static void
 TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes,
                                                           char *data)
 {
+       int32           remotetypmod;
        int                     natts;
-       int                     remotetypmod;
        bool            hasoid;
-       char       *buf = data;
-       int                     rc = 0;
-       int                     i;
+       Size            offset = 0;
        Form_pg_attribute *attrs;
-       MemoryContext oldcontext;
        TupleDesc       tupledesc;
-       RecordTypemodMap *mapent;
+       RecordTypmodMap *mapent;
        bool            found;
+       int                     i;
 
        /* Extract remote typmod. */
-       memcpy(&remotetypmod, &buf[rc], sizeof(int));
-       rc += sizeof(int);
+       memcpy(&remotetypmod, &data[offset], sizeof(int32));
+       offset += sizeof(int32);
 
        /* Extract attribute count. */
-       memcpy(&natts, &buf[rc], sizeof(int));
-       rc += sizeof(int);
+       memcpy(&natts, &data[offset], sizeof(int));
+       offset += sizeof(int);
 
        /* Extract hasoid flag. */
-       memcpy(&hasoid, &buf[rc], sizeof(bool));
-       rc += sizeof(bool);
+       memcpy(&hasoid, &data[offset], sizeof(bool));
+       offset += sizeof(bool);
 
-       /* Extract attribute details. */
-       oldcontext = MemoryContextSwitchTo(CurTransactionContext);
+       /* Extract attribute details. The tupledesc made here is just transient. */
        attrs = palloc(natts * sizeof(Form_pg_attribute));
-       for (i = 0; i < natts; ++i)
+       for (i = 0; i < natts; i++)
        {
                attrs[i] = palloc(sizeof(FormData_pg_attribute));
-               memcpy(attrs[i], &buf[rc], sizeof(FormData_pg_attribute));
-               rc += sizeof(FormData_pg_attribute);
+               memcpy(attrs[i], &data[offset], sizeof(FormData_pg_attribute));
+               offset += sizeof(FormData_pg_attribute);
        }
-       MemoryContextSwitchTo(oldcontext);
 
        /* We should have read the whole message. */
-       Assert(rc == nbytes);
+       Assert(offset == nbytes);
 
-       /* Construct TupleDesc. */
+       /* Construct TupleDesc, and assign a local typmod. */
        tupledesc = CreateTupleDesc(natts, hasoid, attrs);
        tupledesc = BlessTupleDesc(tupledesc);
 
-       /* Create map if it doesn't exist already. */
+       /* Create mapping hashtable if it doesn't exist already. */
        if (reader->typmodmap == NULL)
        {
                HASHCTL         ctl;
 
-               ctl.keysize = sizeof(int);
-               ctl.entrysize = sizeof(RecordTypemodMap);
-               ctl.hcxt = CurTransactionContext;
-               reader->typmodmap = hash_create("typmodmap hashtable",
+               MemSet(&ctl, 0, sizeof(ctl));
+               ctl.keysize = sizeof(int32);
+               ctl.entrysize = sizeof(RecordTypmodMap);
+               ctl.hcxt = reader->mycontext;
+               reader->typmodmap = hash_create("tqueue receiver record type hashtable",
                                                                                100, &ctl,
                                                                          HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
        }
@@ -892,139 +1108,171 @@ TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes,
        mapent = hash_search(reader->typmodmap, &remotetypmod, HASH_ENTER,
                                                 &found);
        if (found)
-               elog(ERROR, "duplicate message for typmod %d",
+               elog(ERROR, "duplicate tqueue control message for typmod %d",
                         remotetypmod);
        mapent->localtypmod = tupledesc->tdtypmod;
-       elog(DEBUG3, "mapping remote typmod %d to local typmod %d",
-                remotetypmod, tupledesc->tdtypmod);
+
+       elog(DEBUG3, "tqueue mapping remote typmod %d to local typmod %d",
+                remotetypmod, mapent->localtypmod);
 }
 
 /*
- * Build a mapping indicating what remapping class applies to each attribute
- * described by a tupledesc.
+ * Build remap info for the specified data type, storing it in mycontext.
+ * Returns NULL if neither the type nor any subtype could require remapping.
  */
-static RemapInfo *
-BuildRemapInfo(TupleDesc tupledesc)
+static TupleRemapInfo *
+BuildTupleRemapInfo(Oid typid, MemoryContext mycontext)
 {
-       RemapInfo  *remapinfo;
-       Size            size;
-       AttrNumber      i;
-       bool            noop = true;
+       HeapTuple       tup;
+       Form_pg_type typ;
+
+       /* This is recursive, so it could be driven to stack overflow. */
+       check_stack_depth();
 
-       size = offsetof(RemapInfo, mapping) +
-               sizeof(RemapClass) * tupledesc->natts;
-       remapinfo = MemoryContextAllocZero(TopMemoryContext, size);
-       remapinfo->natts = tupledesc->natts;
-       for (i = 0; i < tupledesc->natts; ++i)
+restart:
+       tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
+       if (!HeapTupleIsValid(tup))
+               elog(ERROR, "cache lookup failed for type %u", typid);
+       typ = (Form_pg_type) GETSTRUCT(tup);
+
+       /* Look through domains to underlying base type. */
+       if (typ->typtype == TYPTYPE_DOMAIN)
        {
-               Form_pg_attribute attr = tupledesc->attrs[i];
+               typid = typ->typbasetype;
+               ReleaseSysCache(tup);
+               goto restart;
+       }
 
-               if (attr->attisdropped)
-               {
-                       remapinfo->mapping[i] = TQUEUE_REMAP_NONE;
-                       continue;
-               }
+       /* If it's a true array type, deal with it that way. */
+       if (OidIsValid(typ->typelem) && typ->typlen == -1)
+       {
+               typid = typ->typelem;
+               ReleaseSysCache(tup);
+               return BuildArrayRemapInfo(typid, mycontext);
+       }
 
-               remapinfo->mapping[i] = GetRemapClass(attr->atttypid);
-               if (remapinfo->mapping[i] != TQUEUE_REMAP_NONE)
-                       noop = false;
+       /* Similarly, deal with ranges appropriately. */
+       if (typ->typtype == TYPTYPE_RANGE)
+       {
+               ReleaseSysCache(tup);
+               return BuildRangeRemapInfo(typid, mycontext);
        }
 
-       if (noop)
+       /*
+        * If it's a composite type (including RECORD), set up for remapping.  We
+        * don't attempt to determine the status of subfields here, since we do
+        * not have enough information yet; just mark everything invalid.
+        */
+       if (typ->typtype == TYPTYPE_COMPOSITE || typid == RECORDOID)
        {
-               pfree(remapinfo);
-               remapinfo = NULL;
+               TupleRemapInfo *remapinfo;
+
+               remapinfo = (TupleRemapInfo *)
+                       MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
+               remapinfo->remapclass = TQUEUE_REMAP_RECORD;
+               remapinfo->u.rec.rectypid = InvalidOid;
+               remapinfo->u.rec.rectypmod = -1;
+               remapinfo->u.rec.localtypmod = -1;
+               remapinfo->u.rec.tupledesc = NULL;
+               remapinfo->u.rec.field_remap = NULL;
+               ReleaseSysCache(tup);
+               return remapinfo;
        }
 
+       /* Nothing else can possibly need remapping attention. */
+       ReleaseSysCache(tup);
+       return NULL;
+}
+
+static TupleRemapInfo *
+BuildArrayRemapInfo(Oid elemtypid, MemoryContext mycontext)
+{
+       TupleRemapInfo *remapinfo;
+       TupleRemapInfo *element_remapinfo;
+
+       /* See if element type requires remapping. */
+       element_remapinfo = BuildTupleRemapInfo(elemtypid, mycontext);
+       /* If not, the array doesn't either. */
+       if (element_remapinfo == NULL)
+               return NULL;
+       /* OK, set up to remap the array. */
+       remapinfo = (TupleRemapInfo *)
+               MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
+       remapinfo->remapclass = TQUEUE_REMAP_ARRAY;
+       get_typlenbyvalalign(elemtypid,
+                                                &remapinfo->u.arr.typlen,
+                                                &remapinfo->u.arr.typbyval,
+                                                &remapinfo->u.arr.typalign);
+       remapinfo->u.arr.element_remap = element_remapinfo;
+       return remapinfo;
+}
+
+static TupleRemapInfo *
+BuildRangeRemapInfo(Oid rngtypid, MemoryContext mycontext)
+{
+       TupleRemapInfo *remapinfo;
+       TupleRemapInfo *bound_remapinfo;
+       TypeCacheEntry *typcache;
+
+       /*
+        * Get range info from the typcache.  We assume this pointer will stay
+        * valid for the duration of the query.
+        */
+       typcache = lookup_type_cache(rngtypid, TYPECACHE_RANGE_INFO);
+       if (typcache->rngelemtype == NULL)
+               elog(ERROR, "type %u is not a range type", rngtypid);
+
+       /* See if range bound type requires remapping. */
+       bound_remapinfo = BuildTupleRemapInfo(typcache->rngelemtype->type_id,
+                                                                                 mycontext);
+       /* If not, the range doesn't either. */
+       if (bound_remapinfo == NULL)
+               return NULL;
+       /* OK, set up to remap the range. */
+       remapinfo = (TupleRemapInfo *)
+               MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
+       remapinfo->remapclass = TQUEUE_REMAP_RANGE;
+       remapinfo->u.rng.typcache = typcache;
+       remapinfo->u.rng.bound_remap = bound_remapinfo;
        return remapinfo;
 }
 
 /*
- * Determine the remap class assocociated with a particular data type.
- *
- * Transient record types need to have the typmod applied on the sending side
- * replaced with a value on the receiving side that has the same meaning.
- *
- * Arrays, range types, and all record types (including named composite types)
- * need to searched for transient record values buried within them.
- * Surprisingly, a walker is required even when the indicated type is a
- * composite type, because the actual value may be a compatible transient
- * record type.
+ * Build remap info for fields of the type described by the given tupdesc.
+ * Returns an array of TupleRemapInfo pointers, or NULL if no field
+ * requires remapping.  Data is allocated in mycontext.
  */
-static RemapClass
-GetRemapClass(Oid typeid)
+static TupleRemapInfo **
+BuildFieldRemapInfo(TupleDesc tupledesc, MemoryContext mycontext)
 {
-       RemapClass      forceResult = TQUEUE_REMAP_NONE;
-       RemapClass      innerResult = TQUEUE_REMAP_NONE;
+       TupleRemapInfo **remapinfo;
+       bool            noop = true;
+       int                     i;
 
-       for (;;)
+       /* Recursively determine the remapping status of each field. */
+       remapinfo = (TupleRemapInfo **)
+               MemoryContextAlloc(mycontext,
+                                                  tupledesc->natts * sizeof(TupleRemapInfo *));
+       for (i = 0; i < tupledesc->natts; i++)
        {
-               HeapTuple       tup;
-               Form_pg_type typ;
-
-               /* Simple cases. */
-               if (typeid == RECORDOID)
-               {
-                       innerResult = TQUEUE_REMAP_RECORD;
-                       break;
-               }
-               if (typeid == RECORDARRAYOID)
-               {
-                       innerResult = TQUEUE_REMAP_ARRAY;
-                       break;
-               }
-
-               /* Otherwise, we need a syscache lookup to figure it out. */
-               tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typeid));
-               if (!HeapTupleIsValid(tup))
-                       elog(ERROR, "cache lookup failed for type %u", typeid);
-               typ = (Form_pg_type) GETSTRUCT(tup);
-
-               /* Look through domains to underlying base type. */
-               if (typ->typtype == TYPTYPE_DOMAIN)
-               {
-                       typeid = typ->typbasetype;
-                       ReleaseSysCache(tup);
-                       continue;
-               }
-
-               /*
-                * Look through arrays to underlying base type, but the final return
-                * value must be either TQUEUE_REMAP_ARRAY or TQUEUE_REMAP_NONE.  (If
-                * this is an array of integers, for example, we don't need to walk
-                * it.)
-                */
-               if (OidIsValid(typ->typelem) && typ->typlen == -1)
-               {
-                       typeid = typ->typelem;
-                       ReleaseSysCache(tup);
-                       if (forceResult == TQUEUE_REMAP_NONE)
-                               forceResult = TQUEUE_REMAP_ARRAY;
-                       continue;
-               }
+               Form_pg_attribute attr = tupledesc->attrs[i];
 
-               /*
-                * Similarly, look through ranges to the underlying base type, but the
-                * final return value must be either TQUEUE_REMAP_RANGE or
-                * TQUEUE_REMAP_NONE.
-                */
-               if (typ->typtype == TYPTYPE_RANGE)
+               if (attr->attisdropped)
                {
-                       ReleaseSysCache(tup);
-                       if (forceResult == TQUEUE_REMAP_NONE)
-                               forceResult = TQUEUE_REMAP_RANGE;
-                       typeid = get_range_subtype(typeid);
+                       remapinfo[i] = NULL;
                        continue;
                }
+               remapinfo[i] = BuildTupleRemapInfo(attr->atttypid, mycontext);
+               if (remapinfo[i] != NULL)
+                       noop = false;
+       }
 
-               /* Walk composite types.  Nothing else needs special handling. */
-               if (typ->typtype == TYPTYPE_COMPOSITE)
-                       innerResult = TQUEUE_REMAP_RECORD;
-               ReleaseSysCache(tup);
-               break;
+       /* If no fields require remapping, report that by returning NULL. */
+       if (noop)
+       {
+               pfree(remapinfo);
+               remapinfo = NULL;
        }
 
-       if (innerResult != TQUEUE_REMAP_NONE && forceResult != TQUEUE_REMAP_NONE)
-               return forceResult;
-       return innerResult;
+       return remapinfo;
 }