* tqueue.c
* Use shm_mq to send & receive tuples between parallel backends
*
- * Most of the complexity in this module arises from transient RECORD types,
- * which all have type RECORDOID and are distinguished by typmod numbers
- * that are managed per-backend (see src/backend/utils/cache/typcache.c).
- * The sender's set of RECORD typmod assignments probably doesn't match the
- * receiver's. To deal with this, we make the sender send a description
- * of each transient RECORD type appearing in the data it sends. The
- * receiver finds or creates a matching type in its own typcache, and then
- * maps the sender's typmod for that type to its own typmod.
- *
* A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
- * under the hood, writes tuples from the executor to a shm_mq. If
- * necessary, it also writes control messages describing transient
- * record types used within the tuple.
+ * under the hood, writes tuples from the executor to a shm_mq.
*
- * A TupleQueueReader reads tuples, and control messages if any are sent,
- * from a shm_mq and returns the tuples. If transient record types are
- * in use, it registers those types locally based on the control messages
- * and rewrites the typmods sent by the remote side to the corresponding
- * local record typmods.
+ * A TupleQueueReader reads tuples from a shm_mq and returns the tuples.
*
* Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
#include "postgres.h"
#include "access/htup_details.h"
-#include "catalog/pg_type.h"
#include "executor/tqueue.h"
-#include "funcapi.h"
-#include "lib/stringinfo.h"
-#include "miscadmin.h"
-#include "utils/array.h"
-#include "utils/lsyscache.h"
-#include "utils/memutils.h"
-#include "utils/rangetypes.h"
-#include "utils/syscache.h"
-#include "utils/typcache.h"
-
-
-/*
- * The data transferred through the shm_mq is divided into messages.
- * One-byte messages are mode-switch messages, telling the receiver to switch
- * between "control" and "data" modes. (We always start up in "data" mode.)
- * Otherwise, when in "data" mode, each message is a tuple. When in "control"
- * mode, each message defines one transient-typmod-to-tupledesc mapping to
- * let us interpret future tuples. Both of those cases certainly require
- * more than one byte, so no confusion is possible.
- */
-#define TUPLE_QUEUE_MODE_CONTROL 'c' /* mode-switch message contents */
-#define TUPLE_QUEUE_MODE_DATA 'd'
-
-/*
- * Both the sender and receiver build trees of TupleRemapInfo nodes to help
- * them identify which (sub) fields of transmitted tuples are composite and
- * may thus need remap processing. We might need to look within arrays and
- * ranges, not only composites, to find composite sub-fields. A NULL
- * TupleRemapInfo pointer indicates that it is known that the described field
- * is not composite and has no composite substructure.
- *
- * Note that we currently have to look at each composite field at runtime,
- * even if we believe it's of a named composite type (i.e., not RECORD).
- * This is because we allow the actual value to be a compatible transient
- * RECORD type. That's grossly inefficient, and it would be good to get
- * rid of the requirement, but it's not clear what would need to change.
- *
- * Also, we allow the top-level tuple structure, as well as the actual
- * structure of composite subfields, to change from one tuple to the next
- * at runtime. This may well be entirely historical, but it's mostly free
- * to support given the previous requirement; and other places in the system
- * also permit this, so it's not entirely clear if we could drop it.
- */
-
-typedef enum
-{
- TQUEUE_REMAP_ARRAY, /* array */
- TQUEUE_REMAP_RANGE, /* range */
- TQUEUE_REMAP_RECORD /* composite type, named or transient */
-} TupleRemapClass;
-
-typedef struct TupleRemapInfo TupleRemapInfo;
-
-typedef struct ArrayRemapInfo
-{
- int16 typlen; /* array element type's storage properties */
- bool typbyval;
- char typalign;
- TupleRemapInfo *element_remap; /* array element type's remap info */
-} ArrayRemapInfo;
-
-typedef struct RangeRemapInfo
-{
- TypeCacheEntry *typcache; /* range type's typcache entry */
- TupleRemapInfo *bound_remap; /* range bound type's remap info */
-} RangeRemapInfo;
-
-typedef struct RecordRemapInfo
-{
- /* Original (remote) type ID info last seen for this composite field */
- Oid rectypid;
- int32 rectypmod;
- /* Local RECORD typmod, or -1 if unset; not used on sender side */
- int32 localtypmod;
- /* If no fields of the record require remapping, these are NULL: */
- TupleDesc tupledesc; /* copy of record's tupdesc */
- TupleRemapInfo **field_remap; /* each field's remap info */
-} RecordRemapInfo;
-
-struct TupleRemapInfo
-{
- TupleRemapClass remapclass;
- union
- {
- ArrayRemapInfo arr;
- RangeRemapInfo rng;
- RecordRemapInfo rec;
- } u;
-};
/*
* DestReceiver object's private contents
*
- * queue and tupledesc are pointers to data supplied by DestReceiver's caller.
- * The recordhtab and remap info are owned by the DestReceiver and are kept
- * in mycontext. tmpcontext is a tuple-lifespan context to hold cruft
- * created while traversing each tuple to find record subfields.
+ * queue is a pointer to data supplied by DestReceiver's caller.
*/
typedef struct TQueueDestReceiver
{
DestReceiver pub; /* public fields */
shm_mq_handle *queue; /* shm_mq to send to */
- MemoryContext mycontext; /* context containing TQueueDestReceiver */
- MemoryContext tmpcontext; /* per-tuple context, if needed */
- HTAB *recordhtab; /* table of transmitted typmods, if needed */
- char mode; /* current message mode */
- TupleDesc tupledesc; /* current top-level tuple descriptor */
- TupleRemapInfo **field_remapinfo; /* current top-level remap info */
} TQueueDestReceiver;
-/*
- * Hash table entries for mapping remote to local typmods.
- */
-typedef struct RecordTypmodMap
-{
- int32 remotetypmod; /* hash key (must be first!) */
- int32 localtypmod;
-} RecordTypmodMap;
-
/*
* TupleQueueReader object's private contents
*
- * queue and tupledesc are pointers to data supplied by reader's caller.
- * The typmodmap and remap info are owned by the TupleQueueReader and
- * are kept in mycontext.
+ * queue is a pointer to data supplied by reader's caller.
*
* "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h
*/
struct TupleQueueReader
{
shm_mq_handle *queue; /* shm_mq to receive from */
- MemoryContext mycontext; /* context containing TupleQueueReader */
- HTAB *typmodmap; /* RecordTypmodMap hashtable, if needed */
- char mode; /* current message mode */
- TupleDesc tupledesc; /* current top-level tuple descriptor */
- TupleRemapInfo **field_remapinfo; /* current top-level remap info */
};
-/* Local function prototypes */
-static void TQExamine(TQueueDestReceiver *tqueue,
- TupleRemapInfo *remapinfo,
- Datum value);
-static void TQExamineArray(TQueueDestReceiver *tqueue,
- ArrayRemapInfo *remapinfo,
- Datum value);
-static void TQExamineRange(TQueueDestReceiver *tqueue,
- RangeRemapInfo *remapinfo,
- Datum value);
-static void TQExamineRecord(TQueueDestReceiver *tqueue,
- RecordRemapInfo *remapinfo,
- Datum value);
-static void TQSendRecordInfo(TQueueDestReceiver *tqueue, int32 typmod,
- TupleDesc tupledesc);
-static void TupleQueueHandleControlMessage(TupleQueueReader *reader,
- Size nbytes, char *data);
-static HeapTuple TupleQueueHandleDataMessage(TupleQueueReader *reader,
- Size nbytes, HeapTupleHeader data);
-static HeapTuple TQRemapTuple(TupleQueueReader *reader,
- TupleDesc tupledesc,
- TupleRemapInfo **field_remapinfo,
- HeapTuple tuple);
-static Datum TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo,
- Datum value, bool *changed);
-static Datum TQRemapArray(TupleQueueReader *reader, ArrayRemapInfo *remapinfo,
- Datum value, bool *changed);
-static Datum TQRemapRange(TupleQueueReader *reader, RangeRemapInfo *remapinfo,
- Datum value, bool *changed);
-static Datum TQRemapRecord(TupleQueueReader *reader, RecordRemapInfo *remapinfo,
- Datum value, bool *changed);
-static TupleRemapInfo *BuildTupleRemapInfo(Oid typid, MemoryContext mycontext);
-static TupleRemapInfo *BuildArrayRemapInfo(Oid elemtypid,
- MemoryContext mycontext);
-static TupleRemapInfo *BuildRangeRemapInfo(Oid rngtypid,
- MemoryContext mycontext);
-static TupleRemapInfo **BuildFieldRemapInfo(TupleDesc tupledesc,
- MemoryContext mycontext);
-
-
/*
* Receive a tuple from a query, and send it to the designated shm_mq.
*
tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
{
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
- TupleDesc tupledesc = slot->tts_tupleDescriptor;
HeapTuple tuple;
shm_mq_result result;
- /*
- * If first time through, compute remapping info for the top-level fields.
- * On later calls, if the tupledesc has changed, set up for the new
- * tupledesc. (This is a strange test both because the executor really
- * shouldn't change the tupledesc, and also because it would be unsafe if
- * the old tupledesc could be freed and a new one allocated at the same
- * address. But since some very old code in printtup.c uses a similar
- * approach, we adopt it here as well.)
- *
- * Here and elsewhere in this module, when replacing remapping info we
- * pfree the top-level object because that's easy, but we don't bother to
- * recursively free any substructure. This would lead to query-lifespan
- * memory leaks if the mapping info actually changed frequently, but since
- * we don't expect that to happen, it doesn't seem worth expending code to
- * prevent it.
- */
- if (tqueue->tupledesc != tupledesc)
- {
- /* Is it worth trying to free substructure of the remap tree? */
- if (tqueue->field_remapinfo != NULL)
- pfree(tqueue->field_remapinfo);
- tqueue->field_remapinfo = BuildFieldRemapInfo(tupledesc,
- tqueue->mycontext);
- tqueue->tupledesc = tupledesc;
- }
-
- /*
- * When, because of the types being transmitted, no record typmod mapping
- * can be needed, we can skip a good deal of work.
- */
- if (tqueue->field_remapinfo != NULL)
- {
- TupleRemapInfo **remapinfo = tqueue->field_remapinfo;
- int i;
- MemoryContext oldcontext = NULL;
-
- /* Deform the tuple so we can examine fields, if not done already. */
- slot_getallattrs(slot);
-
- /* Iterate over each attribute and search it for transient typmods. */
- for (i = 0; i < tupledesc->natts; i++)
- {
- /* Ignore nulls and types that don't need special handling. */
- if (slot->tts_isnull[i] || remapinfo[i] == NULL)
- continue;
-
- /* Switch to temporary memory context to avoid leaking. */
- if (oldcontext == NULL)
- {
- if (tqueue->tmpcontext == NULL)
- tqueue->tmpcontext =
- AllocSetContextCreate(tqueue->mycontext,
- "tqueue sender temp context",
- ALLOCSET_DEFAULT_SIZES);
- oldcontext = MemoryContextSwitchTo(tqueue->tmpcontext);
- }
-
- /* Examine the value. */
- TQExamine(tqueue, remapinfo[i], slot->tts_values[i]);
- }
-
- /* If we used the temp context, reset it and restore prior context. */
- if (oldcontext != NULL)
- {
- MemoryContextSwitchTo(oldcontext);
- MemoryContextReset(tqueue->tmpcontext);
- }
-
- /* If we entered control mode, switch back to data mode. */
- if (tqueue->mode != TUPLE_QUEUE_MODE_DATA)
- {
- tqueue->mode = TUPLE_QUEUE_MODE_DATA;
- shm_mq_send(tqueue->queue, sizeof(char), &tqueue->mode, false);
- }
- }
-
/* Send the tuple itself. */
tuple = ExecMaterializeSlot(slot);
result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false);
return true;
}
-/*
- * Examine the given datum and send any necessary control messages for
- * transient record types contained in it.
- *
- * remapinfo is previously-computed remapping info about the datum's type.
- *
- * This function just dispatches based on the remap class.
- */
-static void
-TQExamine(TQueueDestReceiver *tqueue, TupleRemapInfo *remapinfo, Datum value)
-{
- /* This is recursive, so it could be driven to stack overflow. */
- check_stack_depth();
-
- switch (remapinfo->remapclass)
- {
- case TQUEUE_REMAP_ARRAY:
- TQExamineArray(tqueue, &remapinfo->u.arr, value);
- break;
- case TQUEUE_REMAP_RANGE:
- TQExamineRange(tqueue, &remapinfo->u.rng, value);
- break;
- case TQUEUE_REMAP_RECORD:
- TQExamineRecord(tqueue, &remapinfo->u.rec, value);
- break;
- }
-}
-
-/*
- * Examine a record datum and send any necessary control messages for
- * transient record types contained in it.
- */
-static void
-TQExamineRecord(TQueueDestReceiver *tqueue, RecordRemapInfo *remapinfo,
- Datum value)
-{
- HeapTupleHeader tup;
- Oid typid;
- int32 typmod;
- TupleDesc tupledesc;
-
- /* Extract type OID and typmod from tuple. */
- tup = DatumGetHeapTupleHeader(value);
- typid = HeapTupleHeaderGetTypeId(tup);
- typmod = HeapTupleHeaderGetTypMod(tup);
-
- /*
- * If first time through, or if this isn't the same composite type as last
- * time, consider sending a control message, and then look up the
- * necessary information for examining the fields.
- */
- if (typid != remapinfo->rectypid || typmod != remapinfo->rectypmod)
- {
- /* Free any old data. */
- if (remapinfo->tupledesc != NULL)
- FreeTupleDesc(remapinfo->tupledesc);
- /* Is it worth trying to free substructure of the remap tree? */
- if (remapinfo->field_remap != NULL)
- pfree(remapinfo->field_remap);
-
- /* Look up tuple descriptor in typcache. */
- tupledesc = lookup_rowtype_tupdesc(typid, typmod);
-
- /*
- * If this is a transient record type, send the tupledesc in a control
- * message. (TQSendRecordInfo is smart enough to do this only once
- * per typmod.)
- */
- if (typid == RECORDOID)
- TQSendRecordInfo(tqueue, typmod, tupledesc);
-
- /* Figure out whether fields need recursive processing. */
- remapinfo->field_remap = BuildFieldRemapInfo(tupledesc,
- tqueue->mycontext);
- if (remapinfo->field_remap != NULL)
- {
- /*
- * We need to inspect the record contents, so save a copy of the
- * tupdesc. (We could possibly just reference the typcache's
- * copy, but then it's problematic when to release the refcount.)
- */
- MemoryContext oldcontext = MemoryContextSwitchTo(tqueue->mycontext);
-
- remapinfo->tupledesc = CreateTupleDescCopy(tupledesc);
- MemoryContextSwitchTo(oldcontext);
- }
- else
- {
- /* No fields of the record require remapping. */
- remapinfo->tupledesc = NULL;
- }
- remapinfo->rectypid = typid;
- remapinfo->rectypmod = typmod;
-
- /* Release reference count acquired by lookup_rowtype_tupdesc. */
- DecrTupleDescRefCount(tupledesc);
- }
-
- /*
- * If field remapping is required, deform the tuple and examine each
- * field.
- */
- if (remapinfo->field_remap != NULL)
- {
- Datum *values;
- bool *isnull;
- HeapTupleData tdata;
- int i;
-
- /* Deform the tuple so we can check each column within. */
- tupledesc = remapinfo->tupledesc;
- values = (Datum *) palloc(tupledesc->natts * sizeof(Datum));
- isnull = (bool *) palloc(tupledesc->natts * sizeof(bool));
- tdata.t_len = HeapTupleHeaderGetDatumLength(tup);
- ItemPointerSetInvalid(&(tdata.t_self));
- tdata.t_tableOid = InvalidOid;
- tdata.t_data = tup;
- heap_deform_tuple(&tdata, tupledesc, values, isnull);
-
- /* Recursively check each interesting non-NULL attribute. */
- for (i = 0; i < tupledesc->natts; i++)
- {
- if (!isnull[i] && remapinfo->field_remap[i])
- TQExamine(tqueue, remapinfo->field_remap[i], values[i]);
- }
-
- /* Need not clean up, since we're in a short-lived context. */
- }
-}
-
-/*
- * Examine an array datum and send any necessary control messages for
- * transient record types contained in it.
- */
-static void
-TQExamineArray(TQueueDestReceiver *tqueue, ArrayRemapInfo *remapinfo,
- Datum value)
-{
- ArrayType *arr = DatumGetArrayTypeP(value);
- Oid typid = ARR_ELEMTYPE(arr);
- Datum *elem_values;
- bool *elem_nulls;
- int num_elems;
- int i;
-
- /* Deconstruct the array. */
- deconstruct_array(arr, typid, remapinfo->typlen,
- remapinfo->typbyval, remapinfo->typalign,
- &elem_values, &elem_nulls, &num_elems);
-
- /* Examine each element. */
- for (i = 0; i < num_elems; i++)
- {
- if (!elem_nulls[i])
- TQExamine(tqueue, remapinfo->element_remap, elem_values[i]);
- }
-}
-
-/*
- * Examine a range datum and send any necessary control messages for
- * transient record types contained in it.
- */
-static void
-TQExamineRange(TQueueDestReceiver *tqueue, RangeRemapInfo *remapinfo,
- Datum value)
-{
- RangeType *range = DatumGetRangeType(value);
- RangeBound lower;
- RangeBound upper;
- bool empty;
-
- /* Extract the lower and upper bounds. */
- range_deserialize(remapinfo->typcache, range, &lower, &upper, &empty);
-
- /* Nothing to do for an empty range. */
- if (empty)
- return;
-
- /* Examine each bound, if present. */
- if (!upper.infinite)
- TQExamine(tqueue, remapinfo->bound_remap, upper.val);
- if (!lower.infinite)
- TQExamine(tqueue, remapinfo->bound_remap, lower.val);
-}
-
-/*
- * Send tuple descriptor information for a transient typmod, unless we've
- * already done so previously.
- */
-static void
-TQSendRecordInfo(TQueueDestReceiver *tqueue, int32 typmod, TupleDesc tupledesc)
-{
- StringInfoData buf;
- bool found;
- int i;
-
- /* Initialize hash table if not done yet. */
- if (tqueue->recordhtab == NULL)
- {
- HASHCTL ctl;
-
- MemSet(&ctl, 0, sizeof(ctl));
- /* Hash table entries are just typmods */
- ctl.keysize = sizeof(int32);
- ctl.entrysize = sizeof(int32);
- ctl.hcxt = tqueue->mycontext;
- tqueue->recordhtab = hash_create("tqueue sender record type hashtable",
- 100, &ctl,
- HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
- }
-
- /* Have we already seen this record type? If not, must report it. */
- hash_search(tqueue->recordhtab, &typmod, HASH_ENTER, &found);
- if (found)
- return;
-
- elog(DEBUG3, "sending tqueue control message for record typmod %d", typmod);
-
- /* If message queue is in data mode, switch to control mode. */
- if (tqueue->mode != TUPLE_QUEUE_MODE_CONTROL)
- {
- tqueue->mode = TUPLE_QUEUE_MODE_CONTROL;
- shm_mq_send(tqueue->queue, sizeof(char), &tqueue->mode, false);
- }
-
- /* Assemble a control message. */
- initStringInfo(&buf);
- appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int32));
- appendBinaryStringInfo(&buf, (char *) &tupledesc->natts, sizeof(int));
- appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid, sizeof(bool));
- for (i = 0; i < tupledesc->natts; i++)
- {
- appendBinaryStringInfo(&buf, (char *) TupleDescAttr(tupledesc, i),
- sizeof(FormData_pg_attribute));
- }
-
- /* Send control message. */
- shm_mq_send(tqueue->queue, buf.len, buf.data, false);
-
- /* We assume it's OK to leak buf because we're in a short-lived context. */
-}
-
/*
* Prepare to receive tuples from executor.
*/
/* We probably already detached from queue, but let's be sure */
if (tqueue->queue != NULL)
shm_mq_detach(tqueue->queue);
- if (tqueue->tmpcontext != NULL)
- MemoryContextDelete(tqueue->tmpcontext);
- if (tqueue->recordhtab != NULL)
- hash_destroy(tqueue->recordhtab);
- /* Is it worth trying to free substructure of the remap tree? */
- if (tqueue->field_remapinfo != NULL)
- pfree(tqueue->field_remapinfo);
pfree(self);
}
self->pub.rDestroy = tqueueDestroyReceiver;
self->pub.mydest = DestTupleQueue;
self->queue = handle;
- self->mycontext = CurrentMemoryContext;
- self->tmpcontext = NULL;
- self->recordhtab = NULL;
- self->mode = TUPLE_QUEUE_MODE_DATA;
- /* Top-level tupledesc is not known yet */
- self->tupledesc = NULL;
- self->field_remapinfo = NULL;
return (DestReceiver *) self;
}
* Create a tuple queue reader.
*/
TupleQueueReader *
-CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
+CreateTupleQueueReader(shm_mq_handle *handle)
{
TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
reader->queue = handle;
- reader->mycontext = CurrentMemoryContext;
- reader->typmodmap = NULL;
- reader->mode = TUPLE_QUEUE_MODE_DATA;
- reader->tupledesc = tupledesc;
- reader->field_remapinfo = BuildFieldRemapInfo(tupledesc, reader->mycontext);
return reader;
}
void
DestroyTupleQueueReader(TupleQueueReader *reader)
{
- if (reader->typmodmap != NULL)
- hash_destroy(reader->typmodmap);
- /* Is it worth trying to free substructure of the remap tree? */
- if (reader->field_remapinfo != NULL)
- pfree(reader->field_remapinfo);
pfree(reader);
}
* is set to true when there are no remaining tuples and otherwise to false.
*
* The returned tuple, if any, is allocated in CurrentMemoryContext.
- * That should be a short-lived (tuple-lifespan) context, because we are
- * pretty cavalier about leaking memory in that context if we have to do
- * tuple remapping.
*
* Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
* accumulate bytes from a partially-read message, so it's useful to call
HeapTuple
TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
{
+ HeapTupleData htup;
shm_mq_result result;
+ Size nbytes;
+ void *data;
if (done != NULL)
*done = false;
- for (;;)
- {
- Size nbytes;
- void *data;
+ /* Attempt to read a message. */
+ result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
- /* Attempt to read a message. */
- result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
-
- /* If queue is detached, set *done and return NULL. */
- if (result == SHM_MQ_DETACHED)
- {
- if (done != NULL)
- *done = true;
- return NULL;
- }
-
- /* In non-blocking mode, bail out if no message ready yet. */
- if (result == SHM_MQ_WOULD_BLOCK)
- return NULL;
- Assert(result == SHM_MQ_SUCCESS);
-
- /*
- * We got a message (see message spec at top of file). Process it.
- */
- if (nbytes == 1)
- {
- /* Mode switch message. */
- reader->mode = ((char *) data)[0];
- }
- else if (reader->mode == TUPLE_QUEUE_MODE_DATA)
- {
- /* Tuple data. */
- return TupleQueueHandleDataMessage(reader, nbytes, data);
- }
- else if (reader->mode == TUPLE_QUEUE_MODE_CONTROL)
- {
- /* Control message, describing a transient record type. */
- TupleQueueHandleControlMessage(reader, nbytes, data);
- }
- else
- elog(ERROR, "unrecognized tqueue mode: %d", (int) reader->mode);
+ /* If queue is detached, set *done and return NULL. */
+ if (result == SHM_MQ_DETACHED)
+ {
+ if (done != NULL)
+ *done = true;
+ return NULL;
}
-}
-/*
- * Handle a data message - that is, a tuple - from the remote side.
- */
-static HeapTuple
-TupleQueueHandleDataMessage(TupleQueueReader *reader,
- Size nbytes,
- HeapTupleHeader data)
-{
- HeapTupleData htup;
+ /* In non-blocking mode, bail out if no message ready yet. */
+ if (result == SHM_MQ_WOULD_BLOCK)
+ return NULL;
+ Assert(result == SHM_MQ_SUCCESS);
/*
* Set up a dummy HeapTupleData pointing to the data from the shm_mq
htup.t_len = nbytes;
htup.t_data = data;
- /*
- * Either just copy the data into a regular palloc'd tuple, or remap it,
- * as required.
- */
- return TQRemapTuple(reader,
- reader->tupledesc,
- reader->field_remapinfo,
- &htup);
-}
-
-/*
- * Copy the given tuple, remapping any transient typmods contained in it.
- */
-static HeapTuple
-TQRemapTuple(TupleQueueReader *reader,
- TupleDesc tupledesc,
- TupleRemapInfo **field_remapinfo,
- HeapTuple tuple)
-{
- Datum *values;
- bool *isnull;
- bool changed = false;
- int i;
-
- /*
- * If no remapping is necessary, just copy the tuple into a single
- * palloc'd chunk, as caller will expect.
- */
- if (field_remapinfo == NULL)
- return heap_copytuple(tuple);
-
- /* Deform tuple so we can remap record typmods for individual attrs. */
- values = (Datum *) palloc(tupledesc->natts * sizeof(Datum));
- isnull = (bool *) palloc(tupledesc->natts * sizeof(bool));
- heap_deform_tuple(tuple, tupledesc, values, isnull);
-
- /* Recursively process each interesting non-NULL attribute. */
- for (i = 0; i < tupledesc->natts; i++)
- {
- if (isnull[i] || field_remapinfo[i] == NULL)
- continue;
- values[i] = TQRemap(reader, field_remapinfo[i], values[i], &changed);
- }
-
- /* Reconstruct the modified tuple, if anything was modified. */
- if (changed)
- return heap_form_tuple(tupledesc, values, isnull);
- else
- return heap_copytuple(tuple);
-}
-
-/*
- * Process the given datum and replace any transient record typmods
- * contained in it. Set *changed to TRUE if we actually changed the datum.
- *
- * remapinfo is previously-computed remapping info about the datum's type.
- *
- * This function just dispatches based on the remap class.
- */
-static Datum
-TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo,
- Datum value, bool *changed)
-{
- /* This is recursive, so it could be driven to stack overflow. */
- check_stack_depth();
-
- switch (remapinfo->remapclass)
- {
- case TQUEUE_REMAP_ARRAY:
- return TQRemapArray(reader, &remapinfo->u.arr, value, changed);
-
- case TQUEUE_REMAP_RANGE:
- return TQRemapRange(reader, &remapinfo->u.rng, value, changed);
-
- case TQUEUE_REMAP_RECORD:
- return TQRemapRecord(reader, &remapinfo->u.rec, value, changed);
- }
-
- elog(ERROR, "unrecognized tqueue remap class: %d",
- (int) remapinfo->remapclass);
- return (Datum) 0;
-}
-
-/*
- * Process the given array datum and replace any transient record typmods
- * contained in it. Set *changed to TRUE if we actually changed the datum.
- */
-static Datum
-TQRemapArray(TupleQueueReader *reader, ArrayRemapInfo *remapinfo,
- Datum value, bool *changed)
-{
- ArrayType *arr = DatumGetArrayTypeP(value);
- Oid typid = ARR_ELEMTYPE(arr);
- bool element_changed = false;
- Datum *elem_values;
- bool *elem_nulls;
- int num_elems;
- int i;
-
- /* Deconstruct the array. */
- deconstruct_array(arr, typid, remapinfo->typlen,
- remapinfo->typbyval, remapinfo->typalign,
- &elem_values, &elem_nulls, &num_elems);
-
- /* Remap each element. */
- for (i = 0; i < num_elems; i++)
- {
- if (!elem_nulls[i])
- elem_values[i] = TQRemap(reader,
- remapinfo->element_remap,
- elem_values[i],
- &element_changed);
- }
-
- if (element_changed)
- {
- /* Reconstruct and return the array. */
- *changed = true;
- arr = construct_md_array(elem_values, elem_nulls,
- ARR_NDIM(arr), ARR_DIMS(arr), ARR_LBOUND(arr),
- typid, remapinfo->typlen,
- remapinfo->typbyval, remapinfo->typalign);
- return PointerGetDatum(arr);
- }
-
- /* Else just return the value as-is. */
- return value;
-}
-
-/*
- * Process the given range datum and replace any transient record typmods
- * contained in it. Set *changed to TRUE if we actually changed the datum.
- */
-static Datum
-TQRemapRange(TupleQueueReader *reader, RangeRemapInfo *remapinfo,
- Datum value, bool *changed)
-{
- RangeType *range = DatumGetRangeType(value);
- bool bound_changed = false;
- RangeBound lower;
- RangeBound upper;
- bool empty;
-
- /* Extract the lower and upper bounds. */
- range_deserialize(remapinfo->typcache, range, &lower, &upper, &empty);
-
- /* Nothing to do for an empty range. */
- if (empty)
- return value;
-
- /* Remap each bound, if present. */
- if (!upper.infinite)
- upper.val = TQRemap(reader, remapinfo->bound_remap,
- upper.val, &bound_changed);
- if (!lower.infinite)
- lower.val = TQRemap(reader, remapinfo->bound_remap,
- lower.val, &bound_changed);
-
- if (bound_changed)
- {
- /* Reserialize. */
- *changed = true;
- range = range_serialize(remapinfo->typcache, &lower, &upper, empty);
- return RangeTypeGetDatum(range);
- }
-
- /* Else just return the value as-is. */
- return value;
-}
-
-/*
- * Process the given record datum and replace any transient record typmods
- * contained in it. Set *changed to TRUE if we actually changed the datum.
- */
-static Datum
-TQRemapRecord(TupleQueueReader *reader, RecordRemapInfo *remapinfo,
- Datum value, bool *changed)
-{
- HeapTupleHeader tup;
- Oid typid;
- int32 typmod;
- bool changed_typmod;
- TupleDesc tupledesc;
-
- /* Extract type OID and typmod from tuple. */
- tup = DatumGetHeapTupleHeader(value);
- typid = HeapTupleHeaderGetTypeId(tup);
- typmod = HeapTupleHeaderGetTypMod(tup);
-
- /*
- * If first time through, or if this isn't the same composite type as last
- * time, identify the required typmod mapping, and then look up the
- * necessary information for processing the fields.
- */
- if (typid != remapinfo->rectypid || typmod != remapinfo->rectypmod)
- {
- /* Free any old data. */
- if (remapinfo->tupledesc != NULL)
- FreeTupleDesc(remapinfo->tupledesc);
- /* Is it worth trying to free substructure of the remap tree? */
- if (remapinfo->field_remap != NULL)
- pfree(remapinfo->field_remap);
-
- /* If transient record type, look up matching local typmod. */
- if (typid == RECORDOID)
- {
- RecordTypmodMap *mapent;
-
- Assert(reader->typmodmap != NULL);
- mapent = hash_search(reader->typmodmap, &typmod,
- HASH_FIND, NULL);
- if (mapent == NULL)
- elog(ERROR, "tqueue received unrecognized remote typmod %d",
- typmod);
- remapinfo->localtypmod = mapent->localtypmod;
- }
- else
- remapinfo->localtypmod = -1;
-
- /* Look up tuple descriptor in typcache. */
- tupledesc = lookup_rowtype_tupdesc(typid, remapinfo->localtypmod);
-
- /* Figure out whether fields need recursive processing. */
- remapinfo->field_remap = BuildFieldRemapInfo(tupledesc,
- reader->mycontext);
- if (remapinfo->field_remap != NULL)
- {
- /*
- * We need to inspect the record contents, so save a copy of the
- * tupdesc. (We could possibly just reference the typcache's
- * copy, but then it's problematic when to release the refcount.)
- */
- MemoryContext oldcontext = MemoryContextSwitchTo(reader->mycontext);
-
- remapinfo->tupledesc = CreateTupleDescCopy(tupledesc);
- MemoryContextSwitchTo(oldcontext);
- }
- else
- {
- /* No fields of the record require remapping. */
- remapinfo->tupledesc = NULL;
- }
- remapinfo->rectypid = typid;
- remapinfo->rectypmod = typmod;
-
- /* Release reference count acquired by lookup_rowtype_tupdesc. */
- DecrTupleDescRefCount(tupledesc);
- }
-
- /* If transient record, replace remote typmod with local typmod. */
- if (typid == RECORDOID && typmod != remapinfo->localtypmod)
- {
- typmod = remapinfo->localtypmod;
- changed_typmod = true;
- }
- else
- changed_typmod = false;
-
- /*
- * If we need to change the typmod, or if there are any potentially
- * remappable fields, replace the tuple.
- */
- if (changed_typmod || remapinfo->field_remap != NULL)
- {
- HeapTupleData htup;
- HeapTuple atup;
-
- /* For now, assume we always need to change the tuple in this case. */
- *changed = true;
-
- /* Copy tuple, possibly remapping contained fields. */
- ItemPointerSetInvalid(&htup.t_self);
- htup.t_tableOid = InvalidOid;
- htup.t_len = HeapTupleHeaderGetDatumLength(tup);
- htup.t_data = tup;
- atup = TQRemapTuple(reader,
- remapinfo->tupledesc,
- remapinfo->field_remap,
- &htup);
-
- /* Apply the correct labeling for a local Datum. */
- HeapTupleHeaderSetTypeId(atup->t_data, typid);
- HeapTupleHeaderSetTypMod(atup->t_data, typmod);
- HeapTupleHeaderSetDatumLength(atup->t_data, htup.t_len);
-
- /* And return the results. */
- return HeapTupleHeaderGetDatum(atup->t_data);
- }
-
- /* Else just return the value as-is. */
- return value;
-}
-
-/*
- * Handle a control message from the tuple queue reader.
- *
- * Control messages are sent when the remote side is sending tuples that
- * contain transient record types. We need to arrange to bless those
- * record types locally and translate between remote and local typmods.
- */
-static void
-TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes,
- char *data)
-{
- int32 remotetypmod;
- int natts;
- bool hasoid;
- Size offset = 0;
- Form_pg_attribute *attrs;
- TupleDesc tupledesc;
- RecordTypmodMap *mapent;
- bool found;
- int i;
-
- /* Extract remote typmod. */
- memcpy(&remotetypmod, &data[offset], sizeof(int32));
- offset += sizeof(int32);
-
- /* Extract attribute count. */
- memcpy(&natts, &data[offset], sizeof(int));
- offset += sizeof(int);
-
- /* Extract hasoid flag. */
- memcpy(&hasoid, &data[offset], sizeof(bool));
- offset += sizeof(bool);
-
- /* Extract attribute details. The tupledesc made here is just transient. */
- attrs = palloc(natts * sizeof(Form_pg_attribute));
- for (i = 0; i < natts; i++)
- {
- attrs[i] = palloc(sizeof(FormData_pg_attribute));
- memcpy(attrs[i], &data[offset], sizeof(FormData_pg_attribute));
- offset += sizeof(FormData_pg_attribute);
- }
-
- /* We should have read the whole message. */
- Assert(offset == nbytes);
-
- /* Construct TupleDesc, and assign a local typmod. */
- tupledesc = CreateTupleDesc(natts, hasoid, attrs);
- tupledesc = BlessTupleDesc(tupledesc);
-
- /* Create mapping hashtable if it doesn't exist already. */
- if (reader->typmodmap == NULL)
- {
- HASHCTL ctl;
-
- MemSet(&ctl, 0, sizeof(ctl));
- ctl.keysize = sizeof(int32);
- ctl.entrysize = sizeof(RecordTypmodMap);
- ctl.hcxt = reader->mycontext;
- reader->typmodmap = hash_create("tqueue receiver record type hashtable",
- 100, &ctl,
- HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
- }
-
- /* Create map entry. */
- mapent = hash_search(reader->typmodmap, &remotetypmod, HASH_ENTER,
- &found);
- if (found)
- elog(ERROR, "duplicate tqueue control message for typmod %d",
- remotetypmod);
- mapent->localtypmod = tupledesc->tdtypmod;
-
- elog(DEBUG3, "tqueue mapping remote typmod %d to local typmod %d",
- remotetypmod, mapent->localtypmod);
-}
-
-/*
- * Build remap info for the specified data type, storing it in mycontext.
- * Returns NULL if neither the type nor any subtype could require remapping.
- */
-static TupleRemapInfo *
-BuildTupleRemapInfo(Oid typid, MemoryContext mycontext)
-{
- HeapTuple tup;
- Form_pg_type typ;
-
- /* This is recursive, so it could be driven to stack overflow. */
- check_stack_depth();
-
-restart:
- tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
- if (!HeapTupleIsValid(tup))
- elog(ERROR, "cache lookup failed for type %u", typid);
- typ = (Form_pg_type) GETSTRUCT(tup);
-
- /* Look through domains to underlying base type. */
- if (typ->typtype == TYPTYPE_DOMAIN)
- {
- typid = typ->typbasetype;
- ReleaseSysCache(tup);
- goto restart;
- }
-
- /* If it's a true array type, deal with it that way. */
- if (OidIsValid(typ->typelem) && typ->typlen == -1)
- {
- typid = typ->typelem;
- ReleaseSysCache(tup);
- return BuildArrayRemapInfo(typid, mycontext);
- }
-
- /* Similarly, deal with ranges appropriately. */
- if (typ->typtype == TYPTYPE_RANGE)
- {
- ReleaseSysCache(tup);
- return BuildRangeRemapInfo(typid, mycontext);
- }
-
- /*
- * If it's a composite type (including RECORD), set up for remapping. We
- * don't attempt to determine the status of subfields here, since we do
- * not have enough information yet; just mark everything invalid.
- */
- if (typ->typtype == TYPTYPE_COMPOSITE || typid == RECORDOID)
- {
- TupleRemapInfo *remapinfo;
-
- remapinfo = (TupleRemapInfo *)
- MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
- remapinfo->remapclass = TQUEUE_REMAP_RECORD;
- remapinfo->u.rec.rectypid = InvalidOid;
- remapinfo->u.rec.rectypmod = -1;
- remapinfo->u.rec.localtypmod = -1;
- remapinfo->u.rec.tupledesc = NULL;
- remapinfo->u.rec.field_remap = NULL;
- ReleaseSysCache(tup);
- return remapinfo;
- }
-
- /* Nothing else can possibly need remapping attention. */
- ReleaseSysCache(tup);
- return NULL;
-}
-
-static TupleRemapInfo *
-BuildArrayRemapInfo(Oid elemtypid, MemoryContext mycontext)
-{
- TupleRemapInfo *remapinfo;
- TupleRemapInfo *element_remapinfo;
-
- /* See if element type requires remapping. */
- element_remapinfo = BuildTupleRemapInfo(elemtypid, mycontext);
- /* If not, the array doesn't either. */
- if (element_remapinfo == NULL)
- return NULL;
- /* OK, set up to remap the array. */
- remapinfo = (TupleRemapInfo *)
- MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
- remapinfo->remapclass = TQUEUE_REMAP_ARRAY;
- get_typlenbyvalalign(elemtypid,
- &remapinfo->u.arr.typlen,
- &remapinfo->u.arr.typbyval,
- &remapinfo->u.arr.typalign);
- remapinfo->u.arr.element_remap = element_remapinfo;
- return remapinfo;
-}
-
-static TupleRemapInfo *
-BuildRangeRemapInfo(Oid rngtypid, MemoryContext mycontext)
-{
- TupleRemapInfo *remapinfo;
- TupleRemapInfo *bound_remapinfo;
- TypeCacheEntry *typcache;
-
- /*
- * Get range info from the typcache. We assume this pointer will stay
- * valid for the duration of the query.
- */
- typcache = lookup_type_cache(rngtypid, TYPECACHE_RANGE_INFO);
- if (typcache->rngelemtype == NULL)
- elog(ERROR, "type %u is not a range type", rngtypid);
-
- /* See if range bound type requires remapping. */
- bound_remapinfo = BuildTupleRemapInfo(typcache->rngelemtype->type_id,
- mycontext);
- /* If not, the range doesn't either. */
- if (bound_remapinfo == NULL)
- return NULL;
- /* OK, set up to remap the range. */
- remapinfo = (TupleRemapInfo *)
- MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
- remapinfo->remapclass = TQUEUE_REMAP_RANGE;
- remapinfo->u.rng.typcache = typcache;
- remapinfo->u.rng.bound_remap = bound_remapinfo;
- return remapinfo;
-}
-
-/*
- * Build remap info for fields of the type described by the given tupdesc.
- * Returns an array of TupleRemapInfo pointers, or NULL if no field
- * requires remapping. Data is allocated in mycontext.
- */
-static TupleRemapInfo **
-BuildFieldRemapInfo(TupleDesc tupledesc, MemoryContext mycontext)
-{
- TupleRemapInfo **remapinfo;
- bool noop = true;
- int i;
-
- /* Recursively determine the remapping status of each field. */
- remapinfo = (TupleRemapInfo **)
- MemoryContextAlloc(mycontext,
- tupledesc->natts * sizeof(TupleRemapInfo *));
- for (i = 0; i < tupledesc->natts; i++)
- {
- Form_pg_attribute attr = TupleDescAttr(tupledesc, i);
-
- if (attr->attisdropped)
- {
- remapinfo[i] = NULL;
- continue;
- }
- remapinfo[i] = BuildTupleRemapInfo(attr->atttypid, mycontext);
- if (remapinfo[i] != NULL)
- noop = false;
- }
-
- /* If no fields require remapping, report that by returning NULL. */
- if (noop)
- {
- pfree(remapinfo);
- remapinfo = NULL;
- }
-
- return remapinfo;
+ return heap_copytuple(&htup);
}