#include "utils/memutils.h"
#include "utils/rel.h"
-/*
- * Tuple array for each worker
- */
-typedef struct GMReaderTupleBuffer
-{
- HeapTuple *tuple;
- int readCounter;
- int nTuples;
- bool done;
-} GMReaderTupleBuffer;
-
/*
* When we read tuples from workers, it's a good idea to read several at once
* for efficiency when possible: this minimizes context-switching overhead.
* But reading too many at a time wastes memory without improving performance.
+ * We'll read up to MAX_TUPLE_STORE tuples (in addition to the first one).
*/
#define MAX_TUPLE_STORE 10
+/*
+ * Pending-tuple array for each worker. This holds additional tuples that
+ * we were able to fetch from the worker, but can't process yet. In addition,
+ * this struct holds the "done" flag indicating the worker is known to have
+ * no more tuples. (We do not use this struct for the leader; we don't keep
+ * any pending tuples for the leader, and the need_to_scan_locally flag serves
+ * as its "done" indicator.)
+ */
+typedef struct GMReaderTupleBuffer
+{
+ HeapTuple *tuple; /* array of length MAX_TUPLE_STORE */
+ int nTuples; /* number of tuples currently stored */
+ int readCounter; /* index of next tuple to extract */
+ bool done; /* true if reader is known exhausted */
+} GMReaderTupleBuffer;
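+
+/*
+ * A minimal sketch of the invariant maintained for each buffer,
+ * 0 <= readCounter <= nTuples <= MAX_TUPLE_STORE, and of how it is drained
+ * (assuming "buf" points at one worker's buffer; the real logic is in
+ * gather_merge_readnext() and load_tuple_array() below):
+ *
+ *		if (buf->nTuples > buf->readCounter)
+ *			tup = buf->tuple[buf->readCounter++];	use a buffered tuple
+ *		else if (!buf->done)
+ *			read more tuples from the worker's tuple queue
+ *		else
+ *			the worker is known to be exhausted
+ */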
+
static TupleTableSlot *ExecGatherMerge(PlanState *pstate);
static int32 heap_compare_slots(Datum a, Datum b, void *arg);
static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
static bool gather_merge_readnext(GatherMergeState *gm_state, int reader,
bool nowait);
-static void form_tuple_array(GatherMergeState *gm_state, int reader);
+static void load_tuple_array(GatherMergeState *gm_state, int reader);
/* ----------------------------------------------------------------
- * ExecInitGather
+ * ExecInitGatherMerge
gm_state->ps.plan = (Plan *) node;
gm_state->ps.state = estate;
gm_state->ps.ExecProcNode = ExecGatherMerge;
+
+ gm_state->initialized = false;
+ gm_state->gm_initialized = false;
gm_state->tuples_needed = -1;
/*
ExecAssignExprContext(estate, &gm_state->ps);
/*
- * initialize child expressions
+ * GatherMerge doesn't support checking a qual (it's always more efficient
+ * to do it in the child node).
*/
- gm_state->ps.qual =
- ExecInitQual(node->plan.qual, &gm_state->ps);
+ Assert(!node->plan.qual);
/*
* tuple table initialization
ExecAssignResultTypeFromTL(&gm_state->ps);
ExecAssignProjectionInfo(&gm_state->ps, NULL);
- gm_state->gm_initialized = false;
-
/*
* initialize sort-key information
*/
if (!node->initialized)
{
EState *estate = node->ps.state;
- GatherMerge *gm = (GatherMerge *) node->ps.plan;
+ GatherMerge *gm = castNode(GatherMerge, node->ps.plan);
/*
* Sometimes we might have to run without parallelism; but if parallel
/* Try to launch workers. */
pcxt = node->pei->pcxt;
LaunchParallelWorkers(pcxt);
+ /* We save # workers launched for the benefit of EXPLAIN */
node->nworkers_launched = pcxt->nworkers_launched;
+ node->nreaders = 0;
/* Set up tuple queue readers to read the results. */
if (pcxt->nworkers_launched > 0)
{
- node->nreaders = 0;
node->reader = palloc(pcxt->nworkers_launched *
sizeof(TupleQueueReader *));
- Assert(gm->numCols);
-
for (i = 0; i < pcxt->nworkers_launched; ++i)
{
shm_mq_set_handle(node->pei->tqueue[i],
return NULL;
/*
- * form the result tuple using ExecProject(), and return it --- unless the
- * projection produces an empty set, in which case we must loop back
- * around for another tuple
+ * Form the result tuple using ExecProject(), and return it.
*/
econtext->ecxt_outertuple = slot;
return ExecProject(node->ps.ps_ProjInfo);
gather_merge_init(GatherMergeState *gm_state)
{
int nreaders = gm_state->nreaders;
- bool initialize = true;
+ bool nowait = true;
int i;
/*
- * Allocate gm_slots for the number of worker + one more slot for leader.
- * Last slot is always for leader. Leader always calls ExecProcNode() to
- * read the tuple which will return the TupleTableSlot. Later it will
- * directly get assigned to gm_slot. So just initialize leader gm_slot
- * with NULL. For other slots below code will call
- * ExecInitExtraTupleSlot() which will do the initialization of worker
- * slots.
+ * Allocate gm_slots for the number of workers + one more slot for the
+ * leader.  The last slot is always the leader's.  The leader always calls
+ * ExecProcNode() to read a tuple, and the returned TupleTableSlot is then
+ * assigned directly into its gm_slots entry, so just initialize that slot
+ * to NULL here.  For the other slots, the code below calls
+ * ExecInitExtraTupleSlot() to create a slot for each worker's results.
*/
gm_state->gm_slots =
palloc((gm_state->nreaders + 1) * sizeof(TupleTableSlot *));
/* Initialize the tuple slot and tuple array for each worker */
gm_state->gm_tuple_buffers =
(GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) *
- (gm_state->nreaders + 1));
+ gm_state->nreaders);
for (i = 0; i < gm_state->nreaders; i++)
{
- /* Allocate the tuple array with MAX_TUPLE_STORE size */
+ /* Allocate the tuple array with length MAX_TUPLE_STORE */
gm_state->gm_tuple_buffers[i].tuple =
(HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);
/*
* First, try to read a tuple from each worker (including leader) in
- * nowait mode, so that we initialize read from each worker as well as
- * leader. After this, if all active workers are unable to produce a
- * tuple, then re-read and this time use wait mode. For workers that were
- * able to produce a tuple in the earlier loop and are still active, just
- * try to fill the tuple array if more tuples are avaiable.
+ * nowait mode. After this, if not all workers were able to produce a
+ * tuple (or a "done" indication), then re-read from remaining workers,
+ * this time using wait mode. Add all live readers (those producing at
+ * least one tuple) to the heap.
*/
reread:
for (i = 0; i < nreaders + 1; i++)
{
CHECK_FOR_INTERRUPTS();
- if (!gm_state->gm_tuple_buffers[i].done &&
- (TupIsNull(gm_state->gm_slots[i]) ||
- gm_state->gm_slots[i]->tts_isempty))
+ /* ignore this source if already known done */
+ if ((i < nreaders) ?
+ !gm_state->gm_tuple_buffers[i].done :
+ gm_state->need_to_scan_locally)
{
- if (gather_merge_readnext(gm_state, i, initialize))
+ if (TupIsNull(gm_state->gm_slots[i]))
{
- binaryheap_add_unordered(gm_state->gm_heap,
- Int32GetDatum(i));
+ /* Don't have a tuple yet, try to get one */
+ if (gather_merge_readnext(gm_state, i, nowait))
+ binaryheap_add_unordered(gm_state->gm_heap,
+ Int32GetDatum(i));
+ }
+ else
+ {
+ /*
+ * We already got at least one tuple from this worker, but
+ * might as well see if it has any more ready by now.
+ */
+ load_tuple_array(gm_state, i);
}
}
- else
- form_tuple_array(gm_state, i);
}
- initialize = false;
+ /* need not recheck leader, since nowait doesn't matter for it */
for (i = 0; i < nreaders; i++)
+ {
if (!gm_state->gm_tuple_buffers[i].done &&
- (TupIsNull(gm_state->gm_slots[i]) ||
- gm_state->gm_slots[i]->tts_isempty))
+ TupIsNull(gm_state->gm_slots[i]))
+ {
+ nowait = false;
goto reread;
+ }
+ }
+ /* Now heapify the heap. */
binaryheap_build(gm_state->gm_heap);
+
gm_state->gm_initialized = true;
}
for (i = 0; i < gm_state->nreaders; i++)
{
pfree(gm_state->gm_tuple_buffers[i].tuple);
- gm_state->gm_slots[i] = ExecClearTuple(gm_state->gm_slots[i]);
+ ExecClearTuple(gm_state->gm_slots[i]);
}
/* Free tuple array as we don't need it any more */
if (gather_merge_readnext(gm_state, i, false))
binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i));
else
+ {
+ /* reader exhausted, remove it from heap */
(void) binaryheap_remove_first(gm_state->gm_heap);
+ }
}
if (binaryheap_empty(gm_state->gm_heap))
}
/*
- * Read the tuple for given reader in nowait mode, and form the tuple array.
+ * Read tuple(s) for given reader in nowait mode, and load into its tuple
+ * array, until we have MAX_TUPLE_STORE of them or would have to block.
*/
static void
-form_tuple_array(GatherMergeState *gm_state, int reader)
+load_tuple_array(GatherMergeState *gm_state, int reader)
{
- GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+ GMReaderTupleBuffer *tuple_buffer;
int i;
- /* Last slot is for leader and we don't build tuple array for leader */
+ /* Don't do anything if this is the leader. */
if (reader == gm_state->nreaders)
return;
- /*
- * We here because we already read all the tuples from the tuple array, so
- * initialize the counter to zero.
- */
+ tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+
+ /* If there's nothing in the array, reset the counters to zero. */
if (tuple_buffer->nTuples == tuple_buffer->readCounter)
tuple_buffer->nTuples = tuple_buffer->readCounter = 0;
- /* Tuple array is already full? */
- if (tuple_buffer->nTuples == MAX_TUPLE_STORE)
- return;
-
+ /* Try to fill additional slots in the array. */
for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
{
- tuple_buffer->tuple[i] = heap_copytuple(gm_readnext_tuple(gm_state,
- reader,
- false,
- &tuple_buffer->done));
- if (!HeapTupleIsValid(tuple_buffer->tuple[i]))
+ HeapTuple tuple;
+
+ tuple = gm_readnext_tuple(gm_state,
+ reader,
+ true,
+ &tuple_buffer->done);
+ if (!HeapTupleIsValid(tuple))
break;
+ tuple_buffer->tuple[i] = heap_copytuple(tuple);
tuple_buffer->nTuples++;
}
}
/*
* Store the next tuple for a given reader into the appropriate slot.
*
- * Returns false if the reader is exhausted, and true otherwise.
+ * Returns true if successful, false if not (either reader is exhausted,
+ * or we didn't want to wait for a tuple). Sets done flag if reader
+ * is found to be exhausted.
*/
static bool
gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
{
GMReaderTupleBuffer *tuple_buffer;
- HeapTuple tup = NULL;
+ HeapTuple tup;
/*
* If we're being asked to generate a tuple from the leader, then we just
gm_state->gm_slots[reader] = outerTupleSlot;
return true;
}
- gm_state->gm_tuple_buffers[reader].done = true;
+ /* need_to_scan_locally serves as "done" flag for leader */
gm_state->need_to_scan_locally = false;
}
return false;
if (tuple_buffer->nTuples > tuple_buffer->readCounter)
{
/* Return any tuple previously read that is still buffered. */
- tuple_buffer = &gm_state->gm_tuple_buffers[reader];
tup = tuple_buffer->tuple[tuple_buffer->readCounter++];
}
else if (tuple_buffer->done)
else
{
/* Read and buffer next tuple. */
- tup = heap_copytuple(gm_readnext_tuple(gm_state,
- reader,
- nowait,
- &tuple_buffer->done));
+ tup = gm_readnext_tuple(gm_state,
+ reader,
+ nowait,
+ &tuple_buffer->done);
+ if (!HeapTupleIsValid(tup))
+ return false;
+ tup = heap_copytuple(tup);
/*
* Attempt to read more tuples in nowait mode and store them in the
- * tuple array.
+ * pending-tuple array for the reader.
*/
- if (HeapTupleIsValid(tup))
- form_tuple_array(gm_state, reader);
- else
- return false;
+ load_tuple_array(gm_state, reader);
}
Assert(HeapTupleIsValid(tup));
bool *done)
{
TupleQueueReader *reader;
- HeapTuple tup = NULL;
+ HeapTuple tup;
MemoryContext oldContext;
MemoryContext tupleContext;
- tupleContext = gm_state->ps.ps_ExprContext->ecxt_per_tuple_memory;
-
- if (done != NULL)
- *done = false;
-
/* Check for async events, particularly messages from workers. */
CHECK_FOR_INTERRUPTS();
reader = gm_state->reader[nreader];
/* Run TupleQueueReaders in per-tuple context */
+ tupleContext = gm_state->ps.ps_ExprContext->ecxt_per_tuple_memory;
oldContext = MemoryContextSwitchTo(tupleContext);
tup = TupleQueueReaderNext(reader, nowait, done);
MemoryContextSwitchTo(oldContext);
typedef struct GatherState
{
PlanState ps; /* its first field is NodeTag */
- bool initialized;
- struct ParallelExecutorInfo *pei;
- int nreaders;
- int nextreader;
- int nworkers_launched;
- struct TupleQueueReader **reader;
- TupleTableSlot *funnel_slot;
- bool need_to_scan_locally;
+ bool initialized; /* workers launched? */
+ bool need_to_scan_locally; /* need to read from local plan? */
int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
+ /* these fields are set up once: */
+ TupleTableSlot *funnel_slot;
+ struct ParallelExecutorInfo *pei;
+ /* all remaining fields are reinitialized during a rescan: */
+ int nworkers_launched; /* original number of workers */
+ int nreaders; /* number of still-active workers */
+ int nextreader; /* next one to try to read from */
+ struct TupleQueueReader **reader; /* array with nreaders active entries */
} GatherState;
/* ----------------
* merge the results into a single sorted stream.
* ----------------
*/
-struct GMReaderTuple;
+struct GMReaderTupleBuffer; /* private in nodeGatherMerge.c */
typedef struct GatherMergeState
{
PlanState ps; /* its first field is NodeTag */
- bool initialized;
+ bool initialized; /* workers launched? */
+ bool gm_initialized; /* gather_merge_init() done? */
+ bool need_to_scan_locally; /* need to read from local plan? */
+ int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
+ /* these fields are set up once: */
+ TupleDesc tupDesc; /* descriptor for subplan result tuples */
+ int gm_nkeys; /* number of sort columns */
+ SortSupport gm_sortkeys; /* array of length gm_nkeys */
struct ParallelExecutorInfo *pei;
- int nreaders;
- int nworkers_launched;
- struct TupleQueueReader **reader;
- TupleDesc tupDesc;
- TupleTableSlot **gm_slots;
+ /* all remaining fields are reinitialized during a rescan: */
+ int nworkers_launched; /* original number of workers */
+ int nreaders; /* number of active workers */
+ TupleTableSlot **gm_slots; /* array with nreaders+1 entries */
+ struct TupleQueueReader **reader; /* array with nreaders active entries */
+ struct GMReaderTupleBuffer *gm_tuple_buffers; /* nreaders tuple buffers */
struct binaryheap *gm_heap; /* binary heap of slot indices */
- bool gm_initialized; /* gather merge initilized ? */
- bool need_to_scan_locally;
- int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
- int gm_nkeys;
- SortSupport gm_sortkeys; /* array of length ms_nkeys */
- struct GMReaderTupleBuffer *gm_tuple_buffers; /* tuple buffer per reader */
} GatherMergeState;
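+
+/*
+ * A minimal sketch of the array-indexing convention assumed above, for a
+ * GatherMergeState *gm with N = gm->nreaders:
+ *
+ *		gm->gm_slots[0 .. N-1]		current tuple from each worker
+ *		gm->gm_slots[N]				current tuple read by the leader itself
+ *		gm->reader[i]				tuple queue reader for worker i (0 <= i < N)
+ *		gm->gm_tuple_buffers[i]		pending tuples already fetched from worker i
+ */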
/* ----------------