* Use shm_mq to send & receive tuples between parallel backends
*
* A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
- * under the hood, writes tuples from the executor to a shm_mq.
+ * under the hood, writes tuples from the executor to a shm_mq. If
+ * necessary, it also writes control messages describing transient
+ * record types used within the tuple.
*
- * A TupleQueueFunnel helps manage the process of reading tuples from
- * one or more shm_mq objects being used as tuple queues.
+ * A TupleQueueReader reads tuples, and if any are sent control messages,
+ * from a shm_mq and returns the tuples. If transient record types are
+ * in use, it registers those types based on the received control messages
+ * and rewrites the typemods sent by the remote side to the corresponding
+ * local record typemods.
*
* Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
#include "postgres.h"
#include "access/htup_details.h"
+#include "catalog/pg_type.h"
#include "executor/tqueue.h"
+#include "funcapi.h"
+#include "lib/stringinfo.h"
#include "miscadmin.h"
+#include "utils/array.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rangetypes.h"
+#include "utils/syscache.h"
+#include "utils/typcache.h"
+
+typedef enum
+{
+ TQUEUE_REMAP_NONE, /* no special processing required */
+ TQUEUE_REMAP_ARRAY, /* array */
+ TQUEUE_REMAP_RANGE, /* range */
+ TQUEUE_REMAP_RECORD /* composite type, named or anonymous */
+} RemapClass;
+
+typedef struct
+{
+ int natts;
+ RemapClass mapping[FLEXIBLE_ARRAY_MEMBER];
+} RemapInfo;
typedef struct
{
DestReceiver pub;
shm_mq_handle *handle;
+ MemoryContext tmpcontext;
+ HTAB *recordhtab;
+ char mode;
+ TupleDesc tupledesc;
+ RemapInfo *remapinfo;
} TQueueDestReceiver;
-struct TupleQueueFunnel
+typedef struct RecordTypemodMap
{
- int nqueues;
- int maxqueues;
- int nextqueue;
- shm_mq_handle **queue;
+ int remotetypmod;
+ int localtypmod;
+} RecordTypemodMap;
+
+struct TupleQueueReader
+{
+ shm_mq_handle *queue;
+ char mode;
+ TupleDesc tupledesc;
+ RemapInfo *remapinfo;
+ HTAB *typmodmap;
};
+#define TUPLE_QUEUE_MODE_CONTROL 'c'
+#define TUPLE_QUEUE_MODE_DATA 'd'
+
+static void tqueueWalk(TQueueDestReceiver * tqueue, RemapClass walktype,
+ Datum value);
+static void tqueueWalkRecord(TQueueDestReceiver * tqueue, Datum value);
+static void tqueueWalkArray(TQueueDestReceiver * tqueue, Datum value);
+static void tqueueWalkRange(TQueueDestReceiver * tqueue, Datum value);
+static void tqueueSendTypmodInfo(TQueueDestReceiver * tqueue, int typmod,
+ TupleDesc tupledesc);
+static void TupleQueueHandleControlMessage(TupleQueueReader *reader,
+ Size nbytes, char *data);
+static HeapTuple TupleQueueHandleDataMessage(TupleQueueReader *reader,
+ Size nbytes, HeapTupleHeader data);
+static HeapTuple TupleQueueRemapTuple(TupleQueueReader *reader,
+ TupleDesc tupledesc, RemapInfo * remapinfo,
+ HeapTuple tuple);
+static Datum TupleQueueRemap(TupleQueueReader *reader, RemapClass remapclass,
+ Datum value);
+static Datum TupleQueueRemapArray(TupleQueueReader *reader, Datum value);
+static Datum TupleQueueRemapRange(TupleQueueReader *reader, Datum value);
+static Datum TupleQueueRemapRecord(TupleQueueReader *reader, Datum value);
+static RemapClass GetRemapClass(Oid typeid);
+static RemapInfo *BuildRemapInfo(TupleDesc tupledesc);
+
/*
* Receive a tuple.
+ *
+ * This is, at core, pretty simple: just send the tuple to the designated
+ * shm_mq. The complicated part is that if the tuple contains transient
+ * record types (see lookup_rowtype_tupdesc), we need to send control
+ * information to the shm_mq receiver so that those typemods can be correctly
+ * interpreted, as they are merely held in a backend-local cache. Worse, the
+ * record type may not at the top level: we could have a range over an array
+ * type over a range type over a range type over an array type over a record,
+ * or something like that.
*/
static void
tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
{
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
+ TupleDesc tupledesc = slot->tts_tupleDescriptor;
HeapTuple tuple;
+ HeapTupleHeader tup;
+
+ /*
+ * Test to see whether the tupledesc has changed; if so, set up for the
+ * new tupledesc. This is a strange test both because the executor really
+ * shouldn't change the tupledesc, and also because it would be unsafe if
+ * the old tupledesc could be freed and a new one allocated at the same
+ * address. But since some very old code in printtup.c uses this test, we
+ * adopt it here as well.
+ */
+ if (tqueue->tupledesc != tupledesc ||
+ tqueue->remapinfo->natts != tupledesc->natts)
+ {
+ if (tqueue->remapinfo != NULL)
+ pfree(tqueue->remapinfo);
+ tqueue->remapinfo = BuildRemapInfo(tupledesc);
+ }
tuple = ExecMaterializeSlot(slot);
+ tup = tuple->t_data;
+
+ /*
+ * When, because of the types being transmitted, no record typemod mapping
+ * can be needed, we can skip a good deal of work.
+ */
+ if (tqueue->remapinfo != NULL)
+ {
+ RemapInfo *remapinfo = tqueue->remapinfo;
+ AttrNumber i;
+ MemoryContext oldcontext = NULL;
+
+ /* Deform the tuple so we can examine it, if not done already. */
+ slot_getallattrs(slot);
+
+ /* Iterate over each attribute and search it for transient typemods. */
+ Assert(slot->tts_tupleDescriptor->natts == remapinfo->natts);
+ for (i = 0; i < remapinfo->natts; ++i)
+ {
+ /* Ignore nulls and types that don't need special handling. */
+ if (slot->tts_isnull[i] ||
+ remapinfo->mapping[i] == TQUEUE_REMAP_NONE)
+ continue;
+
+ /* Switch to temporary memory context to avoid leaking. */
+ if (oldcontext == NULL)
+ {
+ if (tqueue->tmpcontext == NULL)
+ tqueue->tmpcontext =
+ AllocSetContextCreate(TopMemoryContext,
+ "tqueue temporary context",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ oldcontext = MemoryContextSwitchTo(tqueue->tmpcontext);
+ }
+
+ /* Invoke the appropriate walker function. */
+ tqueueWalk(tqueue, remapinfo->mapping[i], slot->tts_values[i]);
+ }
+
+ /* If we used the temp context, reset it and restore prior context. */
+ if (oldcontext != NULL)
+ {
+ MemoryContextSwitchTo(oldcontext);
+ MemoryContextReset(tqueue->tmpcontext);
+ }
+
+ /* If we entered control mode, switch back to data mode. */
+ if (tqueue->mode != TUPLE_QUEUE_MODE_DATA)
+ {
+ tqueue->mode = TUPLE_QUEUE_MODE_DATA;
+ shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false);
+ }
+ }
+
+ /* Send the tuple itself. */
shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
}
+/*
+ * Invoke the appropriate walker function based on the given RemapClass.
+ */
+static void
+tqueueWalk(TQueueDestReceiver * tqueue, RemapClass walktype, Datum value)
+{
+ check_stack_depth();
+
+ switch (walktype)
+ {
+ case TQUEUE_REMAP_NONE:
+ break;
+ case TQUEUE_REMAP_ARRAY:
+ tqueueWalkArray(tqueue, value);
+ break;
+ case TQUEUE_REMAP_RANGE:
+ tqueueWalkRange(tqueue, value);
+ break;
+ case TQUEUE_REMAP_RECORD:
+ tqueueWalkRecord(tqueue, value);
+ break;
+ }
+}
+
+/*
+ * Walk a record and send control messages for transient record types
+ * contained therein.
+ */
+static void
+tqueueWalkRecord(TQueueDestReceiver * tqueue, Datum value)
+{
+ HeapTupleHeader tup;
+ Oid typeid;
+ Oid typmod;
+ TupleDesc tupledesc;
+ RemapInfo *remapinfo;
+
+ /* Extract typmod from tuple. */
+ tup = DatumGetHeapTupleHeader(value);
+ typeid = HeapTupleHeaderGetTypeId(tup);
+ typmod = HeapTupleHeaderGetTypMod(tup);
+
+ /* Look up tuple descriptor in typecache. */
+ tupledesc = lookup_rowtype_tupdesc(typeid, typmod);
+
+ /*
+ * If this is a transient record time, send its TupleDesc as a control
+ * message. (tqueueSendTypemodInfo is smart enough to do this only once
+ * per typmod.)
+ */
+ if (typeid == RECORDOID)
+ tqueueSendTypmodInfo(tqueue, typmod, tupledesc);
+
+ /*
+ * Build the remap information for this tupledesc. We might want to think
+ * about keeping a cache of this information keyed by typeid and typemod,
+ * but let's keep it simple for now.
+ */
+ remapinfo = BuildRemapInfo(tupledesc);
+
+ /*
+ * If remapping is required, deform the tuple and process each field. When
+ * BuildRemapInfo is null, the data types are such that there can be no
+ * transient record types here, so we can skip all this work.
+ */
+ if (remapinfo != NULL)
+ {
+ Datum *values;
+ bool *isnull;
+ HeapTupleData tdata;
+ AttrNumber i;
+
+ /* Deform the tuple so we can check each column within. */
+ values = palloc(tupledesc->natts * sizeof(Datum));
+ isnull = palloc(tupledesc->natts * sizeof(bool));
+ tdata.t_len = HeapTupleHeaderGetDatumLength(tup);
+ ItemPointerSetInvalid(&(tdata.t_self));
+ tdata.t_tableOid = InvalidOid;
+ tdata.t_data = tup;
+ heap_deform_tuple(&tdata, tupledesc, values, isnull);
+
+ /* Recursively check each non-NULL attribute. */
+ for (i = 0; i < tupledesc->natts; ++i)
+ if (!isnull[i])
+ tqueueWalk(tqueue, remapinfo->mapping[i], values[i]);
+ }
+
+ /* Release reference count acquired by lookup_rowtype_tupdesc. */
+ DecrTupleDescRefCount(tupledesc);
+}
+
+/*
+ * Walk a record and send control messages for transient record types
+ * contained therein.
+ */
+static void
+tqueueWalkArray(TQueueDestReceiver * tqueue, Datum value)
+{
+ ArrayType *arr = DatumGetArrayTypeP(value);
+ Oid typeid = ARR_ELEMTYPE(arr);
+ RemapClass remapclass;
+ int16 typlen;
+ bool typbyval;
+ char typalign;
+ Datum *elem_values;
+ bool *elem_nulls;
+ int num_elems;
+ int i;
+
+ remapclass = GetRemapClass(typeid);
+
+ /*
+ * If the elements of the array don't need to be walked, we shouldn't have
+ * been called in the first place: GetRemapClass should have returned NULL
+ * when asked about this array type.
+ */
+ Assert(remapclass != TQUEUE_REMAP_NONE);
+
+ /* Deconstruct the array. */
+ get_typlenbyvalalign(typeid, &typlen, &typbyval, &typalign);
+ deconstruct_array(arr, typeid, typlen, typbyval, typalign,
+ &elem_values, &elem_nulls, &num_elems);
+
+ /* Walk each element. */
+ for (i = 0; i < num_elems; ++i)
+ if (!elem_nulls[i])
+ tqueueWalk(tqueue, remapclass, elem_values[i]);
+}
+
+/*
+ * Walk a range type and send control messages for transient record types
+ * contained therein.
+ */
+static void
+tqueueWalkRange(TQueueDestReceiver * tqueue, Datum value)
+{
+ RangeType *range = DatumGetRangeType(value);
+ Oid typeid = RangeTypeGetOid(range);
+ RemapClass remapclass;
+ TypeCacheEntry *typcache;
+ RangeBound lower;
+ RangeBound upper;
+ bool empty;
+
+ /*
+ * Extract the lower and upper bounds. It might be worth implementing
+ * some caching scheme here so that we don't look up the same typeids in
+ * the type cache repeatedly, but for now let's keep it simple.
+ */
+ typcache = lookup_type_cache(typeid, TYPECACHE_RANGE_INFO);
+ if (typcache->rngelemtype == NULL)
+ elog(ERROR, "type %u is not a range type", typeid);
+ range_deserialize(typcache, range, &lower, &upper, &empty);
+
+ /* Nothing to do for an empty range. */
+ if (empty)
+ return;
+
+ /*
+ * If the range bounds don't need to be walked, we shouldn't have been
+ * called in the first place: GetRemapClass should have returned NULL when
+ * asked about this range type.
+ */
+ remapclass = GetRemapClass(typeid);
+ Assert(remapclass != TQUEUE_REMAP_NONE);
+
+ /* Walk each bound, if present. */
+ if (!upper.infinite)
+ tqueueWalk(tqueue, remapclass, upper.val);
+ if (!lower.infinite)
+ tqueueWalk(tqueue, remapclass, lower.val);
+}
+
+/*
+ * Send tuple descriptor information for a transient typemod, unless we've
+ * already done so previously.
+ */
+static void
+tqueueSendTypmodInfo(TQueueDestReceiver * tqueue, int typmod,
+ TupleDesc tupledesc)
+{
+ StringInfoData buf;
+ bool found;
+ AttrNumber i;
+
+ /* Initialize hash table if not done yet. */
+ if (tqueue->recordhtab == NULL)
+ {
+ HASHCTL ctl;
+
+ ctl.keysize = sizeof(int);
+ ctl.entrysize = sizeof(int);
+ ctl.hcxt = TopMemoryContext;
+ tqueue->recordhtab = hash_create("tqueue record hashtable",
+ 100, &ctl, HASH_ELEM | HASH_CONTEXT);
+ }
+
+ /* Have we already seen this record type? If not, must report it. */
+ hash_search(tqueue->recordhtab, &typmod, HASH_ENTER, &found);
+ if (found)
+ return;
+
+ /* If message queue is in data mode, switch to control mode. */
+ if (tqueue->mode != TUPLE_QUEUE_MODE_CONTROL)
+ {
+ tqueue->mode = TUPLE_QUEUE_MODE_CONTROL;
+ shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false);
+ }
+
+ /* Assemble a control message. */
+ initStringInfo(&buf);
+ appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int));
+ appendBinaryStringInfo(&buf, (char *) &tupledesc->natts, sizeof(int));
+ appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid,
+ sizeof(bool));
+ for (i = 0; i < tupledesc->natts; ++i)
+ appendBinaryStringInfo(&buf, (char *) tupledesc->attrs[i],
+ sizeof(FormData_pg_attribute));
+
+ /* Send control message. */
+ shm_mq_send(tqueue->handle, buf.len, buf.data, false);
+}
+
/*
* Prepare to receive tuples from executor.
*/
static void
tqueueDestroyReceiver(DestReceiver *self)
{
+ TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
+
+ if (tqueue->tmpcontext != NULL)
+ MemoryContextDelete(tqueue->tmpcontext);
+ if (tqueue->recordhtab != NULL)
+ hash_destroy(tqueue->recordhtab);
+ if (tqueue->remapinfo != NULL)
+ pfree(tqueue->remapinfo);
pfree(self);
}
self->pub.rDestroy = tqueueDestroyReceiver;
self->pub.mydest = DestTupleQueue;
self->handle = handle;
+ self->tmpcontext = NULL;
+ self->recordhtab = NULL;
+ self->mode = TUPLE_QUEUE_MODE_DATA;
+ self->remapinfo = NULL;
return (DestReceiver *) self;
}
/*
- * Create a tuple queue funnel.
+ * Create a tuple queue reader.
*/
-TupleQueueFunnel *
-CreateTupleQueueFunnel(void)
+TupleQueueReader *
+CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
{
- TupleQueueFunnel *funnel = palloc0(sizeof(TupleQueueFunnel));
+ TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
- funnel->maxqueues = 8;
- funnel->queue = palloc(funnel->maxqueues * sizeof(shm_mq_handle *));
+ reader->queue = handle;
+ reader->mode = TUPLE_QUEUE_MODE_DATA;
+ reader->tupledesc = tupledesc;
+ reader->remapinfo = BuildRemapInfo(tupledesc);
- return funnel;
+ return reader;
}
/*
- * Destroy a tuple queue funnel.
+ * Destroy a tuple queue reader.
*/
void
-DestroyTupleQueueFunnel(TupleQueueFunnel *funnel)
+DestroyTupleQueueReader(TupleQueueReader *reader)
{
- int i;
+ shm_mq_detach(shm_mq_get_queue(reader->queue));
+ if (reader->remapinfo != NULL)
+ pfree(reader->remapinfo);
+ pfree(reader);
+}
+
+/*
+ * Fetch a tuple from a tuple queue reader.
+ *
+ * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
+ * accumulate bytes from a partially-read message, so it's useful to call
+ * this with nowait = true even if nothing is returned.
+ *
+ * The return value is NULL if there are no remaining queues or if
+ * nowait = true and no tuple is ready to return. *done, if not NULL,
+ * is set to true when queue is detached and otherwise to false.
+ */
+HeapTuple
+TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
+{
+ shm_mq_result result;
+
+ if (done != NULL)
+ *done = false;
+
+ for (;;)
+ {
+ Size nbytes;
+ void *data;
+
+ /* Attempt to read a message. */
+ result = shm_mq_receive(reader->queue, &nbytes, &data, true);
+
+ /* If queue is detached, set *done and return NULL. */
+ if (result == SHM_MQ_DETACHED)
+ {
+ if (done != NULL)
+ *done = true;
+ return NULL;
+ }
+
+ /* In non-blocking mode, bail out if no message ready yet. */
+ if (result == SHM_MQ_WOULD_BLOCK)
+ return NULL;
+ Assert(result == SHM_MQ_SUCCESS);
- for (i = 0; i < funnel->nqueues; i++)
- shm_mq_detach(shm_mq_get_queue(funnel->queue[i]));
- pfree(funnel->queue);
- pfree(funnel);
+ /*
+ * OK, we got a message. Process it.
+ *
+ * One-byte messages are mode switch messages, so that we can switch
+ * between "control" and "data" mode. When in "data" mode, each
+ * message (unless exactly one byte) is a tuple. When in "control"
+ * mode, each message provides a transient-typmod-to-tupledesc mapping
+ * so we can interpret future tuples.
+ */
+ if (nbytes == 1)
+ {
+ /* Mode switch message. */
+ reader->mode = ((char *) data)[0];
+ }
+ else if (reader->mode == TUPLE_QUEUE_MODE_DATA)
+ {
+ /* Tuple data. */
+ return TupleQueueHandleDataMessage(reader, nbytes, data);
+ }
+ else if (reader->mode == TUPLE_QUEUE_MODE_CONTROL)
+ {
+ /* Control message, describing a transient record type. */
+ TupleQueueHandleControlMessage(reader, nbytes, data);
+ }
+ else
+ elog(ERROR, "invalid mode: %d", (int) reader->mode);
+ }
}
/*
- * Remember the shared memory queue handle in funnel.
+ * Handle a data message - that is, a tuple - from the remote side.
*/
-void
-RegisterTupleQueueOnFunnel(TupleQueueFunnel *funnel, shm_mq_handle *handle)
+static HeapTuple
+TupleQueueHandleDataMessage(TupleQueueReader *reader,
+ Size nbytes,
+ HeapTupleHeader data)
{
- if (funnel->nqueues < funnel->maxqueues)
+ HeapTupleData htup;
+
+ ItemPointerSetInvalid(&htup.t_self);
+ htup.t_tableOid = InvalidOid;
+ htup.t_len = nbytes;
+ htup.t_data = data;
+
+ return TupleQueueRemapTuple(reader, reader->tupledesc, reader->remapinfo,
+ &htup);
+}
+
+/*
+ * Remap tuple typmods per control information received from remote side.
+ */
+static HeapTuple
+TupleQueueRemapTuple(TupleQueueReader *reader, TupleDesc tupledesc,
+ RemapInfo * remapinfo, HeapTuple tuple)
+{
+ Datum *values;
+ bool *isnull;
+ bool dirty = false;
+ int i;
+
+ /*
+ * If no remapping is necessary, just copy the tuple into a single
+ * palloc'd chunk, as caller will expect.
+ */
+ if (remapinfo == NULL)
+ return heap_copytuple(tuple);
+
+ /* Deform tuple so we can remap record typmods for individual attrs. */
+ values = palloc(tupledesc->natts * sizeof(Datum));
+ isnull = palloc(tupledesc->natts * sizeof(bool));
+ heap_deform_tuple(tuple, tupledesc, values, isnull);
+ Assert(tupledesc->natts == remapinfo->natts);
+
+ /* Recursively check each non-NULL attribute. */
+ for (i = 0; i < tupledesc->natts; ++i)
{
- funnel->queue[funnel->nqueues++] = handle;
- return;
+ if (isnull[i] || remapinfo->mapping[i] == TQUEUE_REMAP_NONE)
+ continue;
+ values[i] = TupleQueueRemap(reader, remapinfo->mapping[i], values[i]);
+ dirty = true;
}
- if (funnel->nqueues >= funnel->maxqueues)
+ /* Reform the modified tuple. */
+ return heap_form_tuple(tupledesc, values, isnull);
+}
+
+/*
+ * Remap a value based on the specified remap class.
+ */
+static Datum
+TupleQueueRemap(TupleQueueReader *reader, RemapClass remapclass, Datum value)
+{
+ check_stack_depth();
+
+ switch (remapclass)
{
- int newsize = funnel->nqueues * 2;
+ case TQUEUE_REMAP_NONE:
+ /* caller probably shouldn't have called us at all, but... */
+ return value;
+
+ case TQUEUE_REMAP_ARRAY:
+ return TupleQueueRemapArray(reader, value);
- Assert(funnel->nqueues == funnel->maxqueues);
+ case TQUEUE_REMAP_RANGE:
+ return TupleQueueRemapRange(reader, value);
- funnel->queue = repalloc(funnel->queue,
- newsize * sizeof(shm_mq_handle *));
- funnel->maxqueues = newsize;
+ case TQUEUE_REMAP_RECORD:
+ return TupleQueueRemapRecord(reader, value);
}
+}
- funnel->queue[funnel->nqueues++] = handle;
+/*
+ * Remap an array.
+ */
+static Datum
+TupleQueueRemapArray(TupleQueueReader *reader, Datum value)
+{
+ ArrayType *arr = DatumGetArrayTypeP(value);
+ Oid typeid = ARR_ELEMTYPE(arr);
+ RemapClass remapclass;
+ int16 typlen;
+ bool typbyval;
+ char typalign;
+ Datum *elem_values;
+ bool *elem_nulls;
+ int num_elems;
+ int i;
+
+ remapclass = GetRemapClass(typeid);
+
+ /*
+ * If the elements of the array don't need to be walked, we shouldn't have
+ * been called in the first place: GetRemapClass should have returned NULL
+ * when asked about this array type.
+ */
+ Assert(remapclass != TQUEUE_REMAP_NONE);
+
+ /* Deconstruct the array. */
+ get_typlenbyvalalign(typeid, &typlen, &typbyval, &typalign);
+ deconstruct_array(arr, typeid, typlen, typbyval, typalign,
+ &elem_values, &elem_nulls, &num_elems);
+
+ /* Remap each element. */
+ for (i = 0; i < num_elems; ++i)
+ if (!elem_nulls[i])
+ elem_values[i] = TupleQueueRemap(reader, remapclass,
+ elem_values[i]);
+
+ /* Reconstruct and return the array. */
+ arr = construct_md_array(elem_values, elem_nulls,
+ ARR_NDIM(arr), ARR_DIMS(arr), ARR_LBOUND(arr),
+ typeid, typlen, typbyval, typalign);
+ return PointerGetDatum(arr);
}
/*
- * Fetch a tuple from a tuple queue funnel.
- *
- * We try to read from the queues in round-robin fashion so as to avoid
- * the situation where some workers get their tuples read expediently while
- * others are barely ever serviced.
- *
- * Even when nowait = false, we read from the individual queues in
- * non-blocking mode. Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK,
- * it can still accumulate bytes from a partially-read message, so doing it
- * this way should outperform doing a blocking read on each queue in turn.
- *
- * The return value is NULL if there are no remaining queues or if
- * nowait = true and no queue returned a tuple without blocking. *done, if
- * not NULL, is set to true when there are no remaining queues and false in
- * any other case.
+ * Remap a range type.
*/
-HeapTuple
-TupleQueueFunnelNext(TupleQueueFunnel *funnel, bool nowait, bool *done)
+static Datum
+TupleQueueRemapRange(TupleQueueReader *reader, Datum value)
{
- int waitpos = funnel->nextqueue;
+ RangeType *range = DatumGetRangeType(value);
+ Oid typeid = RangeTypeGetOid(range);
+ RemapClass remapclass;
+ TypeCacheEntry *typcache;
+ RangeBound lower;
+ RangeBound upper;
+ bool empty;
+
+ /*
+ * Extract the lower and upper bounds. As in tqueueWalkRange, some
+ * caching might be a good idea here.
+ */
+ typcache = lookup_type_cache(typeid, TYPECACHE_RANGE_INFO);
+ if (typcache->rngelemtype == NULL)
+ elog(ERROR, "type %u is not a range type", typeid);
+ range_deserialize(typcache, range, &lower, &upper, &empty);
+
+ /* Nothing to do for an empty range. */
+ if (empty)
+ return value;
+
+ /*
+ * If the range bounds don't need to be walked, we shouldn't have been
+ * called in the first place: GetRemapClass should have returned NULL when
+ * asked about this range type.
+ */
+ remapclass = GetRemapClass(typeid);
+ Assert(remapclass != TQUEUE_REMAP_NONE);
+
+ /* Remap each bound, if present. */
+ if (!upper.infinite)
+ upper.val = TupleQueueRemap(reader, remapclass, upper.val);
+ if (!lower.infinite)
+ lower.val = TupleQueueRemap(reader, remapclass, lower.val);
+
+ /* And reserialize. */
+ range = range_serialize(typcache, &lower, &upper, empty);
+ return RangeTypeGetDatum(range);
+}
- /* Corner case: called before adding any queues, or after all are gone. */
- if (funnel->nqueues == 0)
+/*
+ * Remap a record.
+ */
+static Datum
+TupleQueueRemapRecord(TupleQueueReader *reader, Datum value)
+{
+ HeapTupleHeader tup;
+ Oid typeid;
+ int typmod;
+ RecordTypemodMap *mapent;
+ TupleDesc tupledesc;
+ RemapInfo *remapinfo;
+ HeapTupleData htup;
+ HeapTuple atup;
+
+ /* Fetch type OID and typemod. */
+ tup = DatumGetHeapTupleHeader(value);
+ typeid = HeapTupleHeaderGetTypeId(tup);
+ typmod = HeapTupleHeaderGetTypMod(tup);
+
+ /* If transient record, replace remote typmod with local typmod. */
+ if (typeid == RECORDOID)
{
- if (done != NULL)
- *done = true;
- return NULL;
+ Assert(reader->typmodmap != NULL);
+ mapent = hash_search(reader->typmodmap, &typmod,
+ HASH_FIND, NULL);
+ if (mapent == NULL)
+ elog(ERROR, "found unrecognized remote typmod %d", typmod);
+ typmod = mapent->localtypmod;
}
- if (done != NULL)
- *done = false;
+ /*
+ * Fetch tupledesc and compute remap info. We should probably cache this
+ * so that we don't have to keep recomputing it.
+ */
+ tupledesc = lookup_rowtype_tupdesc(typeid, typmod);
+ remapinfo = BuildRemapInfo(tupledesc);
+ DecrTupleDescRefCount(tupledesc);
+
+ /* Remap tuple. */
+ ItemPointerSetInvalid(&htup.t_self);
+ htup.t_tableOid = InvalidOid;
+ htup.t_len = HeapTupleHeaderGetDatumLength(tup);
+ htup.t_data = tup;
+ atup = TupleQueueRemapTuple(reader, tupledesc, remapinfo, &htup);
+ HeapTupleHeaderSetTypeId(atup->t_data, typeid);
+ HeapTupleHeaderSetTypMod(atup->t_data, typmod);
+ HeapTupleHeaderSetDatumLength(atup->t_data, htup.t_len);
+
+ /* And return the results. */
+ return HeapTupleHeaderGetDatum(atup->t_data);
+}
- for (;;)
+/*
+ * Handle a control message from the tuple queue reader.
+ *
+ * Control messages are sent when the remote side is sending tuples that
+ * contain transient record types. We need to arrange to bless those
+ * record types locally and translate between remote and local typmods.
+ */
+static void
+TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes,
+ char *data)
+{
+ int natts;
+ int remotetypmod;
+ bool hasoid;
+ char *buf = data;
+ int rc = 0;
+ int i;
+ Form_pg_attribute *attrs;
+ MemoryContext oldcontext;
+ TupleDesc tupledesc;
+ RecordTypemodMap *mapent;
+ bool found;
+
+ /* Extract remote typmod. */
+ memcpy(&remotetypmod, &buf[rc], sizeof(int));
+ rc += sizeof(int);
+
+ /* Extract attribute count. */
+ memcpy(&natts, &buf[rc], sizeof(int));
+ rc += sizeof(int);
+
+ /* Extract hasoid flag. */
+ memcpy(&hasoid, &buf[rc], sizeof(bool));
+ rc += sizeof(bool);
+
+ /* Extract attribute details. */
+ oldcontext = MemoryContextSwitchTo(CurTransactionContext);
+ attrs = palloc(natts * sizeof(Form_pg_attribute));
+ for (i = 0; i < natts; ++i)
{
- shm_mq_handle *mqh = funnel->queue[funnel->nextqueue];
- shm_mq_result result;
- Size nbytes;
- void *data;
+ attrs[i] = palloc(sizeof(FormData_pg_attribute));
+ memcpy(attrs[i], &buf[rc], sizeof(FormData_pg_attribute));
+ rc += sizeof(FormData_pg_attribute);
+ }
+ MemoryContextSwitchTo(oldcontext);
- /* Attempt to read a message. */
- result = shm_mq_receive(mqh, &nbytes, &data, true);
+ /* We should have read the whole message. */
+ Assert(rc == nbytes);
- /*
- * Normally, we advance funnel->nextqueue to the next queue at this
- * point, but if we're pointing to a queue that we've just discovered
- * is detached, then forget that queue and leave the pointer where it
- * is until the number of remaining queues fall below that pointer and
- * at that point make the pointer point to the first queue.
- */
- if (result != SHM_MQ_DETACHED)
- funnel->nextqueue = (funnel->nextqueue + 1) % funnel->nqueues;
- else
+ /* Construct TupleDesc. */
+ tupledesc = CreateTupleDesc(natts, hasoid, attrs);
+ tupledesc = BlessTupleDesc(tupledesc);
+
+ /* Create map if it doesn't exist already. */
+ if (reader->typmodmap == NULL)
+ {
+ HASHCTL ctl;
+
+ ctl.keysize = sizeof(int);
+ ctl.entrysize = sizeof(RecordTypemodMap);
+ ctl.hcxt = CurTransactionContext;
+ reader->typmodmap = hash_create("typmodmap hashtable",
+ 100, &ctl, HASH_ELEM | HASH_CONTEXT);
+ }
+
+ /* Create map entry. */
+ mapent = hash_search(reader->typmodmap, &remotetypmod, HASH_ENTER,
+ &found);
+ if (found)
+ elog(ERROR, "duplicate message for typmod %d",
+ remotetypmod);
+ mapent->localtypmod = tupledesc->tdtypmod;
+ elog(DEBUG3, "mapping remote typmod %d to local typmod %d",
+ remotetypmod, tupledesc->tdtypmod);
+}
+
+/*
+ * Build a mapping indicating what remapping class applies to each attribute
+ * described by a tupledesc.
+ */
+static RemapInfo *
+BuildRemapInfo(TupleDesc tupledesc)
+{
+ RemapInfo *remapinfo;
+ Size size;
+ AttrNumber i;
+ bool noop = true;
+ StringInfoData buf;
+
+ initStringInfo(&buf);
+
+ size = offsetof(RemapInfo, mapping) +
+ sizeof(RemapClass) * tupledesc->natts;
+ remapinfo = MemoryContextAllocZero(TopMemoryContext, size);
+ remapinfo->natts = tupledesc->natts;
+ for (i = 0; i < tupledesc->natts; ++i)
+ {
+ Form_pg_attribute attr = tupledesc->attrs[i];
+
+ if (attr->attisdropped)
{
- --funnel->nqueues;
- if (funnel->nqueues == 0)
- {
- if (done != NULL)
- *done = true;
- return NULL;
- }
+ remapinfo->mapping[i] = TQUEUE_REMAP_NONE;
+ continue;
+ }
- memmove(&funnel->queue[funnel->nextqueue],
- &funnel->queue[funnel->nextqueue + 1],
- sizeof(shm_mq_handle *)
- * (funnel->nqueues - funnel->nextqueue));
+ remapinfo->mapping[i] = GetRemapClass(attr->atttypid);
+ if (remapinfo->mapping[i] != TQUEUE_REMAP_NONE)
+ noop = false;
+ }
+
+ if (noop)
+ {
+ appendStringInfo(&buf, "noop");
+ pfree(remapinfo);
+ remapinfo = NULL;
+ }
+
+ return remapinfo;
+}
+
+/*
+ * Determine the remap class assocociated with a particular data type.
+ *
+ * Transient record types need to have the typmod applied on the sending side
+ * replaced with a value on the receiving side that has the same meaning.
+ *
+ * Arrays, range types, and all record types (including named composite types)
+ * need to searched for transient record values buried within them.
+ * Surprisingly, a walker is required even when the indicated type is a
+ * composite type, because the actual value may be a compatible transient
+ * record type.
+ */
+static RemapClass
+GetRemapClass(Oid typeid)
+{
+ RemapClass forceResult = TQUEUE_REMAP_NONE;
+ RemapClass innerResult = TQUEUE_REMAP_NONE;
+
+ for (;;)
+ {
+ HeapTuple tup;
+ Form_pg_type typ;
- if (funnel->nextqueue >= funnel->nqueues)
- funnel->nextqueue = 0;
+ /* Simple cases. */
+ if (typeid == RECORDOID)
+ {
+ innerResult = TQUEUE_REMAP_RECORD;
+ break;
+ }
+ if (typeid == RECORDARRAYOID)
+ {
+ innerResult = TQUEUE_REMAP_ARRAY;
+ break;
+ }
- if (funnel->nextqueue < waitpos)
- --waitpos;
+ /* Otherwise, we need a syscache lookup to figure it out. */
+ tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typeid));
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for type %u", typeid);
+ typ = (Form_pg_type) GETSTRUCT(tup);
+ /* Look through domains to underlying base type. */
+ if (typ->typtype == TYPTYPE_DOMAIN)
+ {
+ typeid = typ->typbasetype;
+ ReleaseSysCache(tup);
continue;
}
- /* If we got a message, return it. */
- if (result == SHM_MQ_SUCCESS)
+ /*
+ * Look through arrays to underlying base type, but the final return
+ * value must be either TQUEUE_REMAP_ARRAY or TQUEUE_REMAP_NONE. (If
+ * this is an array of integers, for example, we don't need to walk
+ * it.)
+ */
+ if (OidIsValid(typ->typelem) && typ->typlen == -1)
{
- HeapTupleData htup;
-
- /*
- * The tuple data we just read from the queue is only valid until
- * we again attempt to read from it. Copy the tuple into a single
- * palloc'd chunk as callers will expect.
- */
- ItemPointerSetInvalid(&htup.t_self);
- htup.t_tableOid = InvalidOid;
- htup.t_len = nbytes;
- htup.t_data = data;
- return heap_copytuple(&htup);
+ typeid = typ->typelem;
+ ReleaseSysCache(tup);
+ if (forceResult == TQUEUE_REMAP_NONE)
+ forceResult = TQUEUE_REMAP_ARRAY;
+ continue;
}
/*
- * If we've visited all of the queues, then we should either give up
- * and return NULL (if we're in non-blocking mode) or wait for the
- * process latch to be set (otherwise).
+ * Similarly, look through ranges to the underlying base type, but the
+ * final return value must be either TQUEUE_REMAP_RANGE or
+ * TQUEUE_REMAP_NONE.
*/
- if (funnel->nextqueue == waitpos)
+ if (typ->typtype == TYPTYPE_RANGE)
{
- if (nowait)
- return NULL;
- WaitLatch(MyLatch, WL_LATCH_SET, 0);
- CHECK_FOR_INTERRUPTS();
- ResetLatch(MyLatch);
+ ReleaseSysCache(tup);
+ if (forceResult == TQUEUE_REMAP_NONE)
+ forceResult = TQUEUE_REMAP_RANGE;
+ typeid = get_range_subtype(typeid);
+ continue;
}
+
+ /* Walk composite types. Nothing else needs special handling. */
+ if (typ->typtype == TYPTYPE_COMPOSITE)
+ innerResult = TQUEUE_REMAP_RECORD;
+ ReleaseSysCache(tup);
+ break;
}
+
+ if (innerResult != TQUEUE_REMAP_NONE && forceResult != TQUEUE_REMAP_NONE)
+ return forceResult;
+ return innerResult;
}