* disk space as soon as each block is read from its "tape".
*
* We do not form the initial runs using Knuth's recommended replacement
- * selection method (Algorithm 5.4.1R), because it uses a fixed number of
- * records in memory at all times. Since we are dealing with tuples that
- * may vary considerably in size, we want to be able to vary the number of
- * records kept in memory to ensure full utilization of the allowed sort
- * memory space. This is easily done by keeping a variable-size heap in
- * which the records of the current run are stored, plus a variable-size
- * unsorted array holding records that must go into the next run.
+ * selection data structure (Algorithm 5.4.1R), because it uses a fixed
+ * number of records in memory at all times. Since we are dealing with
+ * tuples that may vary considerably in size, we want to be able to vary
+ * the number of records kept in memory to ensure full utilization of the
+ * allowed sort memory space. So, we keep the tuples in a variable-size
+ * heap, with the next record to go out at the top of the heap. Like
+ * Algorithm 5.4.1R, each record is stored with the run number that it
+ * must go into, and we use (run number, key) as the ordering key for the
+ * heap. When the run number at the top of the heap changes, we know that
+ * no more records of the prior run are left in the heap.
*
* The (approximate) amount of memory allowed for any one sort operation
* is given in kilobytes by the external variable SortMem. Initially,
* tuples just by scanning the tuple array sequentially. If we do exceed
* SortMem, we construct a heap using Algorithm H and begin to emit tuples
* into sorted runs in temporary tapes, emitting just enough tuples at each
- * step to get back within the SortMem limit. New tuples are added to the
- * heap if they can go into the current run, else they are temporarily added
- * to the unsorted array. Whenever the heap empties, we construct a new heap
- * from the current contents of the unsorted array, and begin a new run with a
- * new output tape (selected per Algorithm D). After the end of the input
- * is reached, we dump out remaining tuples in memory into a final run
- * (or two), then merge the runs using Algorithm D.
+ * step to get back within the SortMem limit. Whenever the run number at
+ * the top of the heap changes, we begin a new run with a new output tape
+ * (selected per Algorithm D). After the end of the input is reached,
+ * we dump out remaining tuples in memory into a final run (or two),
+ * then merge the runs using Algorithm D.
+ *
+ * When merging runs, we use a heap containing just the frontmost tuple from
+ * each source run; we repeatedly output the smallest tuple and insert the
+ * next tuple from its source tape (if any). When the heap empties, the merge
+ * is complete. The basic merge algorithm thus needs very little memory ---
+ * only M tuples for an M-way merge, and M is at most six in the present code.
+ * However, we can still make good use of our full SortMem allocation by
+ * pre-reading additional tuples from each source tape. Without prereading,
+ * our access pattern to the temporary file would be very erratic; on average
+ * we'd read one block from each of M source tapes during the same time that
+ * we're writing M blocks to the output tape, so there is no sequentiality of
+ * access at all, defeating the read-ahead methods used by most Unix kernels.
+ * Worse, the output tape gets written into a very random sequence of blocks
+ * of the temp file, ensuring that things will be even worse when it comes
+ * time to read that tape. A straightforward merge pass thus ends up doing a
+ * lot of waiting for disk seeks. We can improve matters by prereading from
+ * each source tape sequentially, loading about SortMem/M bytes from each tape
+ * in turn. Then we run the merge algorithm, writing but not reading until
+ * one of the preloaded tuple series runs out. Then we switch back to preread
+ * mode, fill memory again, and repeat. This approach helps to localize both
+ * read and write accesses.
*
* When the caller requests random access to the sort result, we form
* the final sorted run on a logical tape which is then "frozen", so
* Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/utils/sort/tuplesort.c,v 1.1 1999/10/17 22:15:05 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/utils/sort/tuplesort.c,v 1.2 1999/10/30 17:27:15 tgl Exp $
*
*-------------------------------------------------------------------------
*/
* of memory space consumed.
*/
void * (*readtup) (Tuplesortstate *state, int tapenum, unsigned int len);
+ /*
+ * Obtain memory space occupied by a stored tuple. (This routine is
+ * only needed in the FINALMERGE case, since copytup, writetup, and
+ * readtup are expected to adjust availMem appropriately.)
+ */
+ unsigned int (*tuplesize) (Tuplesortstate *state, void *tup);
/*
- * This array holds "unsorted" tuples during the input phases.
- * If we are able to complete the sort in memory, it holds the
- * final sorted result as well.
+ * This array holds pointers to tuples in sort memory. If we are in
+ * state INITIAL, the tuples are in no particular order; if we are in
+ * state SORTEDINMEM, the tuples are in final sorted order; in states
+ * BUILDRUNS and FINALMERGE, the tuples are organized in "heap" order
+ * per Algorithm H. (Note that memtupcount only counts the tuples that
+ * are part of the heap --- during merge passes, memtuples[] entries
+ * beyond TAPERANGE are never in the heap and are used to hold
+ * pre-read tuples.) In state SORTEDONTAPE, the array is not used.
*/
void **memtuples; /* array of pointers to palloc'd tuples */
int memtupcount; /* number of tuples currently present */
int memtupsize; /* allocated length of memtuples array */
/*
- * This array holds the partially-sorted "heap" of tuples that will go
- * out in the current run during BUILDRUNS state. While completing
- * the sort, we use it to merge runs of tuples from input tapes.
- * It is never allocated unless we need to use tapes.
+ * While building initial runs, this array holds the run number for each
+ * tuple in memtuples[]. During merge passes, we re-use it to hold the
+ * input tape number that each tuple in the heap was read from, or to hold
+ * the index of the next tuple pre-read from the same tape in the case of
+ * pre-read entries. This array is never allocated unless we need to use
+ * tapes. Whenever it is allocated, it has the same length as
+ * memtuples[].
*/
- void **heaptuples; /* array of pointers to palloc'd tuples */
- int heaptupcount; /* number of tuples currently present */
- int heaptupsize; /* allocated length of heaptuples array */
+ int *memtupindex; /* index value associated with memtuples[i] */
+
+ /*
+ * While building initial runs, this is the current output run number
+ * (starting at 0). Afterwards, it is the number of initial runs we made.
+ */
+ int currentRun;
+
/*
- * While merging, this array holds the actual number of the input tape
- * that each tuple in heaptuples[] came from.
+ * These variables are only used during merge passes. mergeactive[i]
+ * is true if we are reading an input run from (actual) tape number i
+ * and have not yet exhausted that run. mergenext[i] is the memtuples
+ * index of the next pre-read tuple (next to be loaded into the heap)
+ * for tape i, or 0 if we are out of pre-read tuples. mergelast[i]
+ * similarly points to the last pre-read tuple from each tape.
+ * mergeavailmem[i] is the amount of unused space allocated for tape i.
+ * mergefreelist and mergefirstfree keep track of unused locations
+ * in the memtuples[] array. memtupindex[] links together pre-read
+ * tuples for each tape as well as recycled locations in mergefreelist.
+ * It is OK to use 0 as a null link in these lists, because memtuples[0]
+ * is part of the merge heap and is never a pre-read tuple.
*/
- int *heapsrctapes;
+ bool mergeactive[MAXTAPES]; /* Active input run source? */
+ int mergenext[MAXTAPES]; /* first preread tuple for each source */
+ int mergelast[MAXTAPES]; /* last preread tuple for each source */
+ long mergeavailmem[MAXTAPES]; /* availMem for prereading tapes */
+ long spacePerTape; /* actual per-tape target usage */
+ int mergefreelist; /* head of freelist of recycled slots */
+ int mergefirstfree; /* first slot never used in this merge */
/*
* Variables for Algorithm D. Note that destTape is a "logical" tape
int tp_dummy[MAXTAPES]; /* # of dummy runs for each tape (D[]) */
int tp_tapenum[MAXTAPES]; /* Actual tape numbers (TAPE[]) */
- bool multipleRuns; /* T if we have created more than 1 run */
-
/*
* These variables are used after completion of sorting to keep track
* of the next tuple to return. (In the tape case, the tape's current
#define COPYTUP(state,tup) ((*(state)->copytup) (state, tup))
#define WRITETUP(state,tape,tup) ((*(state)->writetup) (state, tape, tup))
#define READTUP(state,tape,len) ((*(state)->readtup) (state, tape, len))
+#define TUPLESIZE(state,tup) ((*(state)->tuplesize) (state, tup))
#define LACKMEM(state) ((state)->availMem < 0)
#define USEMEM(state,amt) ((state)->availMem -= (amt))
#define FREEMEM(state,amt) ((state)->availMem += (amt))
*
* We count space requested for tuples against the SortMem limit.
* Fixed-size space (primarily the LogicalTapeSet I/O buffers) is not
- * counted, nor do we count the variable-size memtuples and heaptuples
+ * counted, nor do we count the variable-size memtuples and memtupindex
* arrays. (Even though those could grow pretty large, they should be
* small compared to the tuples proper, so this is not unreasonable.)
*
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
-static void beginrun(Tuplesortstate *state);
+static void mergepreread(Tuplesortstate *state);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
- int tapenum);
-static void tuplesort_heap_siftup(Tuplesortstate *state);
+ int tupleindex, bool checkIndex);
+static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
static void markrunend(Tuplesortstate *state, int tapenum);
static int qsort_comparetup(const void *a, const void *b);
static void writetup_heap(Tuplesortstate *state, int tapenum, void *tup);
static void *readtup_heap(Tuplesortstate *state, int tapenum,
unsigned int len);
+static unsigned int tuplesize_heap(Tuplesortstate *state, void *tup);
static int comparetup_index(Tuplesortstate *state,
const void *a, const void *b);
static void *copytup_index(Tuplesortstate *state, void *tup);
static void writetup_index(Tuplesortstate *state, int tapenum, void *tup);
static void *readtup_index(Tuplesortstate *state, int tapenum,
unsigned int len);
+static unsigned int tuplesize_index(Tuplesortstate *state, void *tup);
/*
* Since qsort(3) will not pass any context info to qsort_comparetup(),
state->memtupsize = 1024; /* initial guess */
state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
- state->heaptuples = NULL; /* until and unless needed */
- state->heaptupcount = 0;
- state->heaptupsize = 0;
- state->heapsrctapes = NULL;
+ state->memtupindex = NULL; /* until and unless needed */
+
+ state->currentRun = 0;
/* Algorithm D variables will be initialized by inittapes, if needed */
state->copytup = copytup_heap;
state->writetup = writetup_heap;
state->readtup = readtup_heap;
+ state->tuplesize = tuplesize_heap;
state->tupDesc = tupDesc;
state->nKeys = nkeys;
state->copytup = copytup_index;
state->writetup = writetup_index;
state->readtup = readtup_index;
+ state->tuplesize = tuplesize_index;
state->indexRel = indexRel;
state->enforceUnique = enforceUnique;
pfree(state->memtuples[i]);
pfree(state->memtuples);
}
- if (state->heaptuples)
- {
- for (i = 0; i < state->heaptupcount; i++)
- pfree(state->heaptuples[i]);
- pfree(state->heaptuples);
- }
- if (state->heapsrctapes)
- pfree(state->heapsrctapes);
+ if (state->memtupindex)
+ pfree(state->memtupindex);
}
/*
* Nope; time to switch to tape-based operation.
*/
inittapes(state);
- beginrun(state);
/*
* Dump tuples until we are back under the limit.
*/
break;
case TSS_BUILDRUNS:
/*
- * Insert the copied tuple into the heap if it can go into the
- * current run; otherwise add it to the unsorted array, whence
- * it will go into the next run.
- *
- * The tuple can go into the current run if it is >= the first
- * not-yet-output tuple. (Actually, it could go into the current
- * run if it is >= the most recently output tuple ... but that
- * would require keeping around the tuple we last output, and
- * it's simplest to let writetup free the tuple when written.)
+ * Insert the copied tuple into the heap, with run number
+ * currentRun if it can go into the current run, else run
+ * number currentRun+1. The tuple can go into the current run
+ * if it is >= the first not-yet-output tuple. (Actually,
+ * it could go into the current run if it is >= the most recently
+ * output tuple ... but that would require keeping around the
+ * tuple we last output, and it's simplest to let writetup free
+ * each tuple as soon as it's written.)
*
* Note there will always be at least one tuple in the heap
* at this point; see dumptuples.
*/
- Assert(state->heaptupcount > 0);
- if (COMPARETUP(state, tuple, state->heaptuples[0]) >= 0)
- {
- tuplesort_heap_insert(state, tuple, 0);
- }
+ Assert(state->memtupcount > 0);
+ if (COMPARETUP(state, tuple, state->memtuples[0]) >= 0)
+ tuplesort_heap_insert(state, tuple, state->currentRun, true);
else
- {
- if (state->memtupcount >= state->memtupsize)
- {
- /* Grow the unsorted array as needed. */
- state->memtupsize *= 2;
- state->memtuples = (void **)
- repalloc(state->memtuples,
- state->memtupsize * sizeof(void *));
- }
- state->memtuples[state->memtupcount++] = tuple;
- }
+ tuplesort_heap_insert(state, tuple, state->currentRun+1, true);
/*
* If we are over the memory limit, dump tuples till we're under.
*/
* Finish tape-based sort. First, flush all tuples remaining
* in memory out to tape; then merge until we have a single
* remaining run (or, if !randomAccess, one run per tape).
- * Note that mergeruns sets the correct status.
+ * Note that mergeruns sets the correct state->status.
*/
dumptuples(state, true);
mergeruns(state);
/*
* This code should match the inner loop of mergeonerun().
*/
- if (state->heaptupcount > 0)
+ if (state->memtupcount > 0)
{
- int srcTape = state->heapsrctapes[0];
-
- tup = state->heaptuples[0];
- tuplesort_heap_siftup(state);
- if ((tuplen = getlen(state, srcTape, true)) != 0)
+ int srcTape = state->memtupindex[0];
+ unsigned int tuplen;
+ int tupIndex;
+ void *newtup;
+
+ tup = state->memtuples[0];
+ /* returned tuple is no longer counted in our memory space */
+ tuplen = TUPLESIZE(state, tup);
+ state->availMem += tuplen;
+ state->mergeavailmem[srcTape] += tuplen;
+ tuplesort_heap_siftup(state, false);
+ if ((tupIndex = state->mergenext[srcTape]) == 0)
{
- void *newtup = READTUP(state, srcTape, tuplen);
- tuplesort_heap_insert(state, newtup, srcTape);
+ /* out of preloaded data on this tape, try to read more */
+ mergepreread(state);
+ /* if still no data, we've reached end of run on this tape */
+ if ((tupIndex = state->mergenext[srcTape]) == 0)
+ return tup;
}
+ /* pull next preread tuple from list, insert in heap */
+ newtup = state->memtuples[tupIndex];
+ state->mergenext[srcTape] = state->memtupindex[tupIndex];
+ if (state->mergenext[srcTape] == 0)
+ state->mergelast[srcTape] = 0;
+ state->memtupindex[tupIndex] = state->mergefreelist;
+ state->mergefreelist = tupIndex;
+ tuplesort_heap_insert(state, newtup, srcTape, false);
return tup;
}
return NULL;
static void
inittapes(Tuplesortstate *state)
{
- int j;
+ int ntuples,
+ j;
state->tapeset = LogicalTapeSetCreate(MAXTAPES);
/*
- * Initialize heaptuples array slightly larger than current memtuples
- * usage; memtupcount is probably a good guess at how many tuples we
- * will be able to have in the heap at once.
+ * Allocate the memtupindex array, same size as memtuples.
+ */
+ state->memtupindex = (int *) palloc(state->memtupsize * sizeof(int));
+
+ /*
+ * Convert the unsorted contents of memtuples[] into a heap.
+ * Each tuple is marked as belonging to run number zero.
+ *
+ * NOTE: we pass false for checkIndex since there's no point in
+ * comparing indexes in this step, even though we do intend the
+ * indexes to be part of the sort key...
*/
- state->heaptupcount = 0;
- state->heaptupsize = state->memtupcount + state->memtupcount / 4;
- state->heaptuples = (void **) palloc(state->heaptupsize * sizeof(void *));
+ ntuples = state->memtupcount;
+ state->memtupcount = 0; /* make the heap empty */
+ for (j = 0; j < ntuples; j++)
+ tuplesort_heap_insert(state, state->memtuples[j], 0, false);
+ Assert(state->memtupcount == ntuples);
+
+ state->currentRun = 0;
/*
* Initialize variables of Algorithm D (step D1).
state->Level = 1;
state->destTape = 0;
- state->multipleRuns = false;
-
state->status = TSS_BUILDRUNS;
}
int j;
int a;
- /* We now have at least two initial runs */
- state->multipleRuns = true;
-
/* Step D3: advance j (destTape) */
if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape+1])
{
svDummy;
Assert(state->status == TSS_BUILDRUNS);
- Assert(state->memtupcount == 0 && state->heaptupcount == 0);
+ Assert(state->memtupcount == 0);
/*
* If we produced only one initial run (quite likely if the total
* data volume is between 1X and 2X SortMem), we can just use that
* tape as the finished output, rather than doing a useless merge.
*/
- if (! state->multipleRuns)
+ if (state->currentRun == 1)
{
state->result_tape = state->tp_tapenum[state->destTape];
/* must freeze and rewind the finished output tape */
{
int destTape = state->tp_tapenum[TAPERANGE];
int srcTape;
- unsigned int tuplen;
+ int tupIndex;
void *tup;
+ long priorAvail,
+ spaceFreed;
/*
* Start the merge by loading one tuple from each active source tape
* writing it out, and replacing it with next tuple from same tape
* (if there is another one).
*/
- while (state->heaptupcount > 0)
+ while (state->memtupcount > 0)
{
- WRITETUP(state, destTape, state->heaptuples[0]);
- srcTape = state->heapsrctapes[0];
- tuplesort_heap_siftup(state);
- if ((tuplen = getlen(state, srcTape, true)) != 0)
+ /* write the tuple to destTape */
+ priorAvail = state->availMem;
+ srcTape = state->memtupindex[0];
+ WRITETUP(state, destTape, state->memtuples[0]);
+ /* writetup adjusted total free space, now fix per-tape space */
+ spaceFreed = state->availMem - priorAvail;
+ state->mergeavailmem[srcTape] += spaceFreed;
+ /* compact the heap */
+ tuplesort_heap_siftup(state, false);
+ if ((tupIndex = state->mergenext[srcTape]) == 0)
{
- tup = READTUP(state, srcTape, tuplen);
- tuplesort_heap_insert(state, tup, srcTape);
+ /* out of preloaded data on this tape, try to read more */
+ mergepreread(state);
+ /* if still no data, we've reached end of run on this tape */
+ if ((tupIndex = state->mergenext[srcTape]) == 0)
+ continue;
}
+ /* pull next preread tuple from list, insert in heap */
+ tup = state->memtuples[tupIndex];
+ state->mergenext[srcTape] = state->memtupindex[tupIndex];
+ if (state->mergenext[srcTape] == 0)
+ state->mergelast[srcTape] = 0;
+ state->memtupindex[tupIndex] = state->mergefreelist;
+ state->mergefreelist = tupIndex;
+ tuplesort_heap_insert(state, tup, srcTape, false);
}
-
/*
* When the heap empties, we're done. Write an end-of-run marker
* on the output tape, and increment its count of real runs.
/*
* beginmerge - initialize for a merge pass
*
- * We load the first tuple from each nondummy input run into the heap.
- * We also decrease the counts of real and dummy runs for each tape.
+ * We decrease the counts of real and dummy runs for each tape, and mark
+ * which tapes contain active input runs in mergeactive[]. Then, load
+ * as many tuples as we can from each active input tape, and finally
+ * fill the merge heap with the first tuple from each active tape.
*/
static void
beginmerge(Tuplesortstate *state)
{
+ int activeTapes;
int tapenum;
int srcTape;
- unsigned int tuplen;
- void *tup;
- Assert(state->heaptuples != NULL && state->heaptupcount == 0);
- if (state->heapsrctapes == NULL)
- state->heapsrctapes = (int *) palloc(MAXTAPES * sizeof(int));
+ /* Heap should be empty here */
+ Assert(state->memtupcount == 0);
+
+ /* Clear merge-pass state variables */
+ memset(state->mergeactive, 0, sizeof(state->mergeactive));
+ memset(state->mergenext, 0, sizeof(state->mergenext));
+ memset(state->mergelast, 0, sizeof(state->mergelast));
+ memset(state->mergeavailmem, 0, sizeof(state->mergeavailmem));
+ state->mergefreelist = 0; /* nothing in the freelist */
+ state->mergefirstfree = MAXTAPES; /* first slot available for preread */
+ /* Adjust run counts and mark the active tapes */
+ activeTapes = 0;
for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
{
if (state->tp_dummy[tapenum] > 0)
Assert(state->tp_runs[tapenum] > 0);
state->tp_runs[tapenum]--;
srcTape = state->tp_tapenum[tapenum];
- tuplen = getlen(state, srcTape, false);
- tup = READTUP(state, srcTape, tuplen);
- tuplesort_heap_insert(state, tup, srcTape);
+ state->mergeactive[srcTape] = true;
+ activeTapes++;
}
}
+ /*
+ * Initialize space allocation to let each active input tape have
+ * an equal share of preread space.
+ */
+ Assert(activeTapes > 0);
+ state->spacePerTape = state->availMem / activeTapes;
+ for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
+ {
+ if (state->mergeactive[srcTape])
+ state->mergeavailmem[srcTape] = state->spacePerTape;
+ }
+
+ /*
+ * Preread as many tuples as possible (and at least one) from each
+ * active tape
+ */
+ mergepreread(state);
+
+ /* Load the merge heap with the first tuple from each input tape */
+ for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
+ {
+ int tupIndex = state->mergenext[srcTape];
+ void *tup;
+
+ if (tupIndex)
+ {
+ tup = state->memtuples[tupIndex];
+ state->mergenext[srcTape] = state->memtupindex[tupIndex];
+ if (state->mergenext[srcTape] == 0)
+ state->mergelast[srcTape] = 0;
+ state->memtupindex[tupIndex] = state->mergefreelist;
+ state->mergefreelist = tupIndex;
+ tuplesort_heap_insert(state, tup, srcTape, false);
+ }
+ }
}
/*
- * beginrun - start a new initial run
+ * mergepreread - load tuples from merge input tapes
*
- * The tuples presently in the unsorted memory array are moved into
- * the heap.
+ * This routine exists to improve sequentiality of reads during a merge pass,
+ * as explained in the header comments of this file. Load tuples from each
+ * active source tape until the tape's run is exhausted or it has used up
+ * its fair share of available memory. In any case, we guarantee that there
+ * is at least one preread tuple available from each unexhausted input tape.
*/
static void
-beginrun(Tuplesortstate *state)
+mergepreread(Tuplesortstate *state)
{
- int i;
+ int srcTape;
+ unsigned int tuplen;
+ void *tup;
+ int tupIndex;
+ long priorAvail,
+ spaceUsed;
- Assert(state->heaptupcount == 0 && state->memtupcount > 0);
- for (i = 0; i < state->memtupcount; i++)
- tuplesort_heap_insert(state, state->memtuples[i], 0);
- state->memtupcount = 0;
+ for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
+ {
+ if (! state->mergeactive[srcTape])
+ continue;
+ /*
+ * Skip reading from any tape that still has at least half
+ * of its target memory filled with tuples (threshold fraction
+ * may need adjustment?). This avoids reading just a few tuples
+ * when the incoming runs are not being consumed evenly.
+ */
+ if (state->mergenext[srcTape] != 0 &&
+ state->mergeavailmem[srcTape] <= state->spacePerTape / 2)
+ continue;
+ /*
+ * Read tuples from this tape until it has used up its free memory,
+ * but ensure that we have at least one.
+ */
+ priorAvail = state->availMem;
+ state->availMem = state->mergeavailmem[srcTape];
+ while (! LACKMEM(state) || state->mergenext[srcTape] == 0)
+ {
+ /* read next tuple, if any */
+ if ((tuplen = getlen(state, srcTape, true)) == 0)
+ {
+ state->mergeactive[srcTape] = false;
+ break;
+ }
+ tup = READTUP(state, srcTape, tuplen);
+ /* find or make a free slot in memtuples[] for it */
+ tupIndex = state->mergefreelist;
+ if (tupIndex)
+ state->mergefreelist = state->memtupindex[tupIndex];
+ else
+ {
+ tupIndex = state->mergefirstfree++;
+ /* Might need to enlarge arrays! */
+ if (tupIndex >= state->memtupsize)
+ {
+ state->memtupsize *= 2;
+ state->memtuples = (void **)
+ repalloc(state->memtuples,
+ state->memtupsize * sizeof(void *));
+ state->memtupindex = (int *)
+ repalloc(state->memtupindex,
+ state->memtupsize * sizeof(int));
+ }
+ }
+ /* store tuple, append to list for its tape */
+ state->memtuples[tupIndex] = tup;
+ state->memtupindex[tupIndex] = 0;
+ if (state->mergelast[srcTape])
+ state->memtupindex[state->mergelast[srcTape]] = tupIndex;
+ else
+ state->mergenext[srcTape] = tupIndex;
+ state->mergelast[srcTape] = tupIndex;
+ }
+ /* update per-tape and global availmem counts */
+ spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
+ state->mergeavailmem[srcTape] = state->availMem;
+ state->availMem = priorAvail - spaceUsed;
+ }
}
/*
* dumptuples - remove tuples from heap and write to tape
*
+ * This is used during initial-run building, but not during merging.
+ *
* When alltuples = false, dump only enough tuples to get under the
* availMem limit (and leave at least one tuple in the heap in any case,
* since puttuple assumes it always has a tuple to compare to).
* When alltuples = true, dump everything currently in memory.
* (This case is only used at end of input data.)
*
- * If we empty the heap, then start a new run using the tuples that
- * have accumulated in memtuples[] (if any).
+ * If we empty the heap, close out the current run and return (this should
+ * only happen at end of input data). If we see that the tuple run number
+ * at the top of the heap has changed, start a new run.
*/
static void
dumptuples(Tuplesortstate *state, bool alltuples)
{
while (alltuples ||
- (LACKMEM(state) &&
- (state->heaptupcount > 0 || state->memtupcount > 0)))
+ (LACKMEM(state) && state->memtupcount > 1))
{
/*
* Dump the heap's frontmost entry, and sift up to remove it
* from the heap.
*/
- Assert(state->heaptupcount > 0);
+ Assert(state->memtupcount > 0);
WRITETUP(state, state->tp_tapenum[state->destTape],
- state->heaptuples[0]);
- tuplesort_heap_siftup(state);
+ state->memtuples[0]);
+ tuplesort_heap_siftup(state, true);
/*
- * If the heap is now empty, we've finished a run.
+ * If the heap is empty *or* top run number has changed,
+ * we've finished the current run.
*/
- if (state->heaptupcount == 0)
+ if (state->memtupcount == 0 ||
+ state->currentRun != state->memtupindex[0])
{
markrunend(state, state->tp_tapenum[state->destTape]);
+ state->currentRun++;
state->tp_runs[state->destTape]++;
state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
+ /*
+ * Done if heap is empty, else prepare for new run.
+ */
if (state->memtupcount == 0)
- break; /* all input data has been written to tape */
- /* Select new output tape and start a new run */
+ break;
+ Assert(state->currentRun == state->memtupindex[0]);
selectnewtape(state);
- beginrun(state);
}
}
}
/*
* Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
+ *
+ * The heap lives in state->memtuples[], with parallel data storage
+ * for indexes in state->memtupindex[]. If checkIndex is true, use
+ * the tuple index as the front of the sort key; otherwise, no.
*/
+#define HEAPCOMPARE(tup1,index1,tup2,index2) \
+ (checkIndex && (index1 != index2) ? index1 - index2 : \
+ COMPARETUP(state, tup1, tup2))
+
/*
* Insert a new tuple into an empty or existing heap, maintaining the
- * heap invariant. The heap lives in state->heaptuples[]. Also, if
- * state->heapsrctapes is not NULL, we store each tuple's source tapenum
- * in the corresponding element of state->heapsrctapes[].
+ * heap invariant.
*/
static void
tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
- int tapenum)
+ int tupleindex, bool checkIndex)
{
+ void **memtuples;
+ int *memtupindex;
int j;
/*
- * Make sure heaptuples[] can handle another entry.
- * NOTE: we do not enlarge heapsrctapes[]; it's supposed
- * to be big enough when created.
+ * Make sure memtuples[] can handle another entry.
*/
- if (state->heaptupcount >= state->heaptupsize)
+ if (state->memtupcount >= state->memtupsize)
{
- /* Grow the unsorted array as needed. */
- state->heaptupsize *= 2;
- state->heaptuples = (void **)
- repalloc(state->heaptuples,
- state->heaptupsize * sizeof(void *));
+ state->memtupsize *= 2;
+ state->memtuples = (void **)
+ repalloc(state->memtuples,
+ state->memtupsize * sizeof(void *));
+ state->memtupindex = (int *)
+ repalloc(state->memtupindex,
+ state->memtupsize * sizeof(int));
}
+ memtuples = state->memtuples;
+ memtupindex = state->memtupindex;
/*
* Sift-up the new entry, per Knuth 5.2.3 exercise 16.
* Note that Knuth is using 1-based array indexes, not 0-based.
*/
- j = state->heaptupcount++;
- while (j > 0) {
+ j = state->memtupcount++;
+ while (j > 0)
+ {
int i = (j-1) >> 1;
- if (COMPARETUP(state, tuple, state->heaptuples[i]) >= 0)
+ if (HEAPCOMPARE(tuple, tupleindex,
+ memtuples[i], memtupindex[i]) >= 0)
break;
- state->heaptuples[j] = state->heaptuples[i];
- if (state->heapsrctapes)
- state->heapsrctapes[j] = state->heapsrctapes[i];
+ memtuples[j] = memtuples[i];
+ memtupindex[j] = memtupindex[i];
j = i;
}
- state->heaptuples[j] = tuple;
- if (state->heapsrctapes)
- state->heapsrctapes[j] = tapenum;
+ memtuples[j] = tuple;
+ memtupindex[j] = tupleindex;
}
/*
- * The tuple at state->heaptuples[0] has been removed from the heap.
- * Decrement heaptupcount, and sift up to maintain the heap invariant.
+ * The tuple at state->memtuples[0] has been removed from the heap.
+ * Decrement memtupcount, and sift up to maintain the heap invariant.
*/
static void
-tuplesort_heap_siftup(Tuplesortstate *state)
+tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
{
- void **heaptuples = state->heaptuples;
+ void **memtuples = state->memtuples;
+ int *memtupindex = state->memtupindex;
void *tuple;
- int i,
+ int tupindex,
+ i,
n;
- if (--state->heaptupcount <= 0)
+ if (--state->memtupcount <= 0)
return;
- n = state->heaptupcount;
- tuple = heaptuples[n]; /* tuple that must be reinserted */
+ n = state->memtupcount;
+ tuple = memtuples[n]; /* tuple that must be reinserted */
+ tupindex = memtupindex[n];
i = 0; /* i is where the "hole" is */
- for (;;) {
+ for (;;)
+ {
int j = 2*i + 1;
if (j >= n)
break;
if (j+1 < n &&
- COMPARETUP(state, heaptuples[j], heaptuples[j+1]) > 0)
+ HEAPCOMPARE(memtuples[j], memtupindex[j],
+ memtuples[j+1], memtupindex[j+1]) > 0)
j++;
- if (COMPARETUP(state, tuple, heaptuples[j]) <= 0)
+ if (HEAPCOMPARE(tuple, tupindex,
+ memtuples[j], memtupindex[j]) <= 0)
break;
- heaptuples[i] = heaptuples[j];
- if (state->heapsrctapes)
- state->heapsrctapes[i] = state->heapsrctapes[j];
+ memtuples[i] = memtuples[j];
+ memtupindex[i] = memtupindex[j];
i = j;
}
- heaptuples[i] = tuple;
- if (state->heapsrctapes)
- state->heapsrctapes[i] = state->heapsrctapes[n];
+ memtuples[i] = tuple;
+ memtupindex[i] = tupindex;
}
{
HeapTuple ltup = (HeapTuple) a;
HeapTuple rtup = (HeapTuple) b;
+ TupleDesc tupDesc = state->tupDesc;
int nkey;
for (nkey = 0; nkey < state->nKeys; nkey++)
lattr = heap_getattr(ltup,
scanKey->sk_attno,
- state->tupDesc,
+ tupDesc,
&isnull1);
rattr = heap_getattr(rtup,
scanKey->sk_attno,
- state->tupDesc,
+ tupDesc,
&isnull2);
if (isnull1)
{
return (void *) tuple;
}
+static unsigned int
+tuplesize_heap(Tuplesortstate *state, void *tup)
+{
+ HeapTuple tuple = (HeapTuple) tup;
+
+ return HEAPTUPLESIZE + tuple->t_len;
+}
+
/*
* Routines specialized for IndexTuple case
IndexTuple rtup = (IndexTuple) b;
TupleDesc itdesc = state->indexRel->rd_att;
bool equal_isnull = false;
- Datum lattr,
- rattr;
- bool isnull1,
- isnull2;
int i;
- for (i = 0; i < itdesc->natts; i++)
+ for (i = 1; i <= itdesc->natts; i++)
{
- lattr = index_getattr(ltup, i + 1, itdesc, &isnull1);
- rattr = index_getattr(rtup, i + 1, itdesc, &isnull2);
+ Datum lattr,
+ rattr;
+ bool isnull1,
+ isnull2;
+
+ lattr = index_getattr(ltup, i, itdesc, &isnull1);
+ rattr = index_getattr(rtup, i, itdesc, &isnull2);
if (isnull1)
{
else if (isnull2)
return -1;
- if (_bt_invokestrat(state->indexRel, i + 1,
+ if (_bt_invokestrat(state->indexRel, i,
BTGreaterStrategyNumber,
lattr, rattr))
return 1;
- if (_bt_invokestrat(state->indexRel, i + 1,
+ if (_bt_invokestrat(state->indexRel, i,
BTGreaterStrategyNumber,
rattr, lattr))
return -1;
elog(ERROR, "tuplesort: unexpected end of data");
return (void *) tuple;
}
+
+static unsigned int
+tuplesize_index(Tuplesortstate *state, void *tup)
+{
+ IndexTuple tuple = (IndexTuple) tup;
+ unsigned int tuplen = IndexTupleSize(tuple);
+
+ return tuplen;
+}