author     Tom Lane <tgl@sss.pgh.pa.us>   Sat, 30 Oct 1999 17:27:15 +0000 (17:27 +0000)
committer  Tom Lane <tgl@sss.pgh.pa.us>   Sat, 30 Oct 1999 17:27:15 +0000 (17:27 +0000)

Further performance improvements in sorting: reduce number of comparisons
during initial run formation by keeping both current run and next-run
tuples in the same heap (yup, Knuth is smarter than I am).  And, during
merge passes, make use of available sort memory to load multiple tuples
from any one input 'tape' at a time, thereby improving locality of
access to the temp file.

src/backend/utils/sort/tuplesort.c

index 2240564fa25b4986f5f7f445a5d84667bf79a992..5297fde36dcff0b5da385d9fe4a974cebe1535a6 100644
  * disk space as soon as each block is read from its "tape".
  *
  * We do not form the initial runs using Knuth's recommended replacement
- * selection method (Algorithm 5.4.1R), because it uses a fixed number of
- * records in memory at all times.  Since we are dealing with tuples that
- * may vary considerably in size, we want to be able to vary the number of
- * records kept in memory to ensure full utilization of the allowed sort
- * memory space.  This is easily done by keeping a variable-size heap in
- * which the records of the current run are stored, plus a variable-size
- * unsorted array holding records that must go into the next run.
+ * selection data structure (Algorithm 5.4.1R), because it uses a fixed
+ * number of records in memory at all times.  Since we are dealing with
+ * tuples that may vary considerably in size, we want to be able to vary
+ * the number of records kept in memory to ensure full utilization of the
+ * allowed sort memory space.  So, we keep the tuples in a variable-size
+ * heap, with the next record to go out at the top of the heap.  Like
+ * Algorithm 5.4.1R, each record is stored with the run number that it
+ * must go into, and we use (run number, key) as the ordering key for the
+ * heap.  When the run number at the top of the heap changes, we know that
+ * no more records of the prior run are left in the heap.
  *
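[Editor's illustration, not part of the patch: a minimal standalone sketch of the (run number, key) ordering described above. HeapEntry, comparetuples(), and heap_entry_compare() are hypothetical stand-ins for the patch's memtuples[]/memtupindex[] arrays and its COMPARETUP/HEAPCOMPARE machinery.]

    /* Compare run numbers first, then the sort key, so every tuple of
     * run N leaves the heap before any tuple of run N+1 reaches the top. */
    typedef struct
    {
        int         runNumber;      /* run this tuple is destined for */
        const void *tuple;          /* the tuple itself */
    } HeapEntry;

    extern int comparetuples(const void *a, const void *b); /* hypothetical key comparison */

    static int
    heap_entry_compare(const HeapEntry *a, const HeapEntry *b)
    {
        if (a->runNumber != b->runNumber)
            return a->runNumber - b->runNumber;
        return comparetuples(a->tuple, b->tuple);
    }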
  * The (approximate) amount of memory allowed for any one sort operation
  * is given in kilobytes by the external variable SortMem.  Initially,
  * tuples just by scanning the tuple array sequentially.  If we do exceed
  * SortMem, we construct a heap using Algorithm H and begin to emit tuples
  * into sorted runs in temporary tapes, emitting just enough tuples at each
- * step to get back within the SortMem limit.  New tuples are added to the
- * heap if they can go into the current run, else they are temporarily added
- * to the unsorted array.  Whenever the heap empties, we construct a new heap
- * from the current contents of the unsorted array, and begin a new run with a
- * new output tape (selected per Algorithm D).  After the end of the input
- * is reached, we dump out remaining tuples in memory into a final run
- * (or two), then merge the runs using Algorithm D.
+ * step to get back within the SortMem limit.  Whenever the run number at
+ * the top of the heap changes, we begin a new run with a new output tape
+ * (selected per Algorithm D).  After the end of the input is reached,
+ * we dump out remaining tuples in memory into a final run (or two),
+ * then merge the runs using Algorithm D.
+ *
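[Editor's worked illustration of the mechanism just described; the numbers are invented, not from the patch.] Suppose memory holds three tuples and the input is 5, 2, 9, 1, 7. The heap starts as {2, 5, 9}, all tagged run 0. When 1 arrives, it is less than the current heap top 2, so it enters the heap tagged run 1; we then emit 2 to run 0 to get back under the memory limit. When 7 arrives, it is >= the top tuple 5, so it joins run 0, and 5 is emitted. At end of input we drain the heap: 7 and 9 complete run 0, then the run number at the top changes to 1, so we close run 0, select a new output tape, and emit 1. The result is run 0 = (2, 5, 7, 9) and run 1 = (1).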
+ * When merging runs, we use a heap containing just the frontmost tuple from
+ * each source run; we repeatedly output the smallest tuple and insert the
+ * next tuple from its source tape (if any).  When the heap empties, the merge
+ * is complete.  The basic merge algorithm thus needs very little memory ---
+ * only M tuples for an M-way merge, and M is at most six in the present code.
+ * However, we can still make good use of our full SortMem allocation by
+ * pre-reading additional tuples from each source tape.  Without prereading,
+ * our access pattern to the temporary file would be very erratic; on average
+ * we'd read one block from each of M source tapes during the same time that
+ * we're writing M blocks to the output tape, so there is no sequentiality of
+ * access at all, defeating the read-ahead methods used by most Unix kernels.
+ * Worse, the output tape gets written into a very random sequence of blocks
+ * of the temp file, ensuring that things will be even worse when it comes
+ * time to read that tape.  A straightforward merge pass thus ends up doing a
+ * lot of waiting for disk seeks.  We can improve matters by prereading from
+ * each source tape sequentially, loading about SortMem/M bytes from each tape
+ * in turn.  Then we run the merge algorithm, writing but not reading until
+ * one of the preloaded tuple series runs out.  Then we switch back to preread
+ * mode, fill memory again, and repeat.  This approach helps to localize both
+ * read and write accesses.
  *
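[Editor's sketch of the memory arithmetic behind prereading; the 512 kB SortMem figure is invented for illustration, while the per-tape division and the half-full refill threshold mirror what the patch does in beginmerge() and mergepreread().]

    #include <stdio.h>

    int
    main(void)
    {
        long sortMem      = 512L * 1024;    /* hypothetical 512 kB sort memory */
        int  activeTapes  = 6;              /* merges here are at most 6-way */
        long spacePerTape = sortMem / activeTapes;

        printf("preread budget per source tape: %ld bytes\n", spacePerTape);
        /* a tape is topped up again once more than half its budget is free */
        printf("refill threshold (free space):  %ld bytes\n", spacePerTape / 2);
        return 0;
    }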
  * When the caller requests random access to the sort result, we form
  * the final sorted run on a logical tape which is then "frozen", so
@@ -55,7 +77,7 @@
  * Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/utils/sort/tuplesort.c,v 1.1 1999/10/17 22:15:05 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/utils/sort/tuplesort.c,v 1.2 1999/10/30 17:27:15 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -129,30 +151,65 @@ struct Tuplesortstate
         * of memory space consumed.
         */
        void * (*readtup) (Tuplesortstate *state, int tapenum, unsigned int len);
+       /*
+        * Obtain memory space occupied by a stored tuple.  (This routine is
+        * only needed in the FINALMERGE case, since copytup, writetup, and
+        * readtup are expected to adjust availMem appropriately.)
+        */
+       unsigned int (*tuplesize) (Tuplesortstate *state, void *tup);
 
        /*
-        * This array holds "unsorted" tuples during the input phases.
-        * If we are able to complete the sort in memory, it holds the
-        * final sorted result as well.
+        * This array holds pointers to tuples in sort memory.  If we are in
+        * state INITIAL, the tuples are in no particular order; if we are in
+        * state SORTEDINMEM, the tuples are in final sorted order; in states
+        * BUILDRUNS and FINALMERGE, the tuples are organized in "heap" order
+        * per Algorithm H.  (Note that memtupcount only counts the tuples that
+        * are part of the heap --- during merge passes, memtuples[] entries
+        * beyond TAPERANGE are never in the heap and are used to hold
+        * pre-read tuples.)  In state SORTEDONTAPE, the array is not used.
         */
        void      **memtuples;          /* array of pointers to palloc'd tuples */
        int                     memtupcount;    /* number of tuples currently present */
        int                     memtupsize;             /* allocated length of memtuples array */
 
        /*
-        * This array holds the partially-sorted "heap" of tuples that will go
-        * out in the current run during BUILDRUNS state.  While completing
-        * the sort, we use it to merge runs of tuples from input tapes.
-        * It is never allocated unless we need to use tapes.
+        * While building initial runs, this array holds the run number for each
+        * tuple in memtuples[].  During merge passes, we re-use it to hold the
+        * input tape number that each tuple in the heap was read from, or to hold
+        * the index of the next tuple pre-read from the same tape in the case of
+        * pre-read entries.  This array is never allocated unless we need to use
+        * tapes.  Whenever it is allocated, it has the same length as
+        * memtuples[].
         */
-       void      **heaptuples;         /* array of pointers to palloc'd tuples */
-       int                     heaptupcount;   /* number of tuples currently present */
-       int                     heaptupsize;    /* allocated length of heaptuples array */
+       int                *memtupindex;        /* index value associated with memtuples[i] */
+
+       /*
+        * While building initial runs, this is the current output run number
+        * (starting at 0).  Afterwards, it is the number of initial runs we made.
+        */
+       int                     currentRun;
+
        /*
-        * While merging, this array holds the actual number of the input tape
-        * that each tuple in heaptuples[] came from.
+        * These variables are only used during merge passes.  mergeactive[i]
+        * is true if we are reading an input run from (actual) tape number i
+        * and have not yet exhausted that run.  mergenext[i] is the memtuples
+        * index of the next pre-read tuple (next to be loaded into the heap)
+        * for tape i, or 0 if we are out of pre-read tuples.  mergelast[i]
+        * similarly points to the last pre-read tuple from each tape.
+        * mergeavailmem[i] is the amount of unused space allocated for tape i.
+        * mergefreelist and mergefirstfree keep track of unused locations
+        * in the memtuples[] array.  memtupindex[] links together pre-read
+        * tuples for each tape as well as recycled locations in mergefreelist.
+        * It is OK to use 0 as a null link in these lists, because memtuples[0]
+        * is part of the merge heap and is never a pre-read tuple.
         */
-       int                *heapsrctapes;
+       bool            mergeactive[MAXTAPES]; /* Active input run source? */
+       int                     mergenext[MAXTAPES]; /* first preread tuple for each source */
+       int                     mergelast[MAXTAPES]; /* last preread tuple for each source */
+       long            mergeavailmem[MAXTAPES]; /* availMem for prereading tapes */
+       long            spacePerTape;   /* actual per-tape target usage */
+       int                     mergefreelist;  /* head of freelist of recycled slots */
+       int                     mergefirstfree; /* first slot never used in this merge */
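[Editor's illustration of the slot-linking scheme described in the comment above; slot counts and names are invented and the bookkeeping is reduced to one list head, so this is not the patch's code. memtupindex[] doubles as a next-slot link for preread tuples, and 0 can serve as the null link because memtuples[0] always belongs to the merge heap.]

    #include <stddef.h>

    #define NSLOTS 16

    static void *memtuples[NSLOTS];     /* tuple pointers */
    static int   memtupindex[NSLOTS];   /* next-slot links; 0 terminates a list */

    /* Pop the head of one tape's preread list and recycle its slot onto the
     * freelist; returns NULL when the tape has no preread tuples left. */
    static void *
    pop_preread(int *listhead, int *freelist)
    {
        int   slot = *listhead;
        void *tup;

        if (slot == 0)
            return NULL;
        tup = memtuples[slot];
        *listhead = memtupindex[slot];      /* advance to next preread tuple */
        memtupindex[slot] = *freelist;      /* push slot onto the freelist */
        *freelist = slot;
        return tup;
    }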
 
        /*
         * Variables for Algorithm D.  Note that destTape is a "logical" tape
@@ -166,8 +223,6 @@ struct Tuplesortstate
        int                     tp_dummy[MAXTAPES];     /* # of dummy runs for each tape (D[]) */
        int                     tp_tapenum[MAXTAPES]; /* Actual tape numbers (TAPE[]) */
 
-       bool            multipleRuns;   /* T if we have created more than 1 run */
-
        /*
         * These variables are used after completion of sorting to keep track
         * of the next tuple to return.  (In the tape case, the tape's current
@@ -202,6 +257,7 @@ struct Tuplesortstate
 #define COPYTUP(state,tup)     ((*(state)->copytup) (state, tup))
 #define WRITETUP(state,tape,tup)       ((*(state)->writetup) (state, tape, tup))
 #define READTUP(state,tape,len)        ((*(state)->readtup) (state, tape, len))
+#define TUPLESIZE(state,tup)   ((*(state)->tuplesize) (state, tup))
 #define LACKMEM(state)         ((state)->availMem < 0)
 #define USEMEM(state,amt)      ((state)->availMem -= (amt))
 #define FREEMEM(state,amt)     ((state)->availMem += (amt))
@@ -239,7 +295,7 @@ struct Tuplesortstate
  *
  * We count space requested for tuples against the SortMem limit.
  * Fixed-size space (primarily the LogicalTapeSet I/O buffers) is not
- * counted, nor do we count the variable-size memtuples and heaptuples
+ * counted, nor do we count the variable-size memtuples and memtupindex
  * arrays.  (Even though those could grow pretty large, they should be
  * small compared to the tuples proper, so this is not unreasonable.)
  *
@@ -271,11 +327,11 @@ static void selectnewtape(Tuplesortstate *state);
 static void mergeruns(Tuplesortstate *state);
 static void mergeonerun(Tuplesortstate *state);
 static void beginmerge(Tuplesortstate *state);
-static void beginrun(Tuplesortstate *state);
+static void mergepreread(Tuplesortstate *state);
 static void dumptuples(Tuplesortstate *state, bool alltuples);
 static void tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
-                                                                 int tapenum);
-static void tuplesort_heap_siftup(Tuplesortstate *state);
+                                                                 int tupleindex, bool checkIndex);
+static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
 static void markrunend(Tuplesortstate *state, int tapenum);
 static int qsort_comparetup(const void *a, const void *b);
@@ -285,12 +341,14 @@ static void *copytup_heap(Tuplesortstate *state, void *tup);
 static void writetup_heap(Tuplesortstate *state, int tapenum, void *tup);
 static void *readtup_heap(Tuplesortstate *state, int tapenum,
                                                  unsigned int len);
+static unsigned int tuplesize_heap(Tuplesortstate *state, void *tup);
 static int comparetup_index(Tuplesortstate *state,
                                                        const void *a, const void *b);
 static void *copytup_index(Tuplesortstate *state, void *tup);
 static void writetup_index(Tuplesortstate *state, int tapenum, void *tup);
 static void *readtup_index(Tuplesortstate *state, int tapenum,
                                                   unsigned int len);
+static unsigned int tuplesize_index(Tuplesortstate *state, void *tup);
 
 /*
  * Since qsort(3) will not pass any context info to qsort_comparetup(),
@@ -332,10 +390,9 @@ tuplesort_begin_common(bool randomAccess)
        state->memtupsize = 1024;       /* initial guess */
        state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
 
-       state->heaptuples = NULL;       /* until and unless needed */
-       state->heaptupcount = 0;
-       state->heaptupsize = 0;
-       state->heapsrctapes = NULL;
+       state->memtupindex = NULL;      /* until and unless needed */
+
+       state->currentRun = 0;
 
        /* Algorithm D variables will be initialized by inittapes, if needed */
 
@@ -359,6 +416,7 @@ tuplesort_begin_heap(TupleDesc tupDesc,
        state->copytup = copytup_heap;
        state->writetup = writetup_heap;
        state->readtup = readtup_heap;
+       state->tuplesize = tuplesize_heap;
 
        state->tupDesc = tupDesc;
        state->nKeys = nkeys;
@@ -378,6 +436,7 @@ tuplesort_begin_index(Relation indexRel,
        state->copytup = copytup_index;
        state->writetup = writetup_index;
        state->readtup = readtup_index;
+       state->tuplesize = tuplesize_index;
 
        state->indexRel = indexRel;
        state->enforceUnique = enforceUnique;
@@ -403,14 +462,8 @@ tuplesort_end(Tuplesortstate *state)
                        pfree(state->memtuples[i]);
                pfree(state->memtuples);
        }
-       if (state->heaptuples)
-       {
-               for (i = 0; i < state->heaptupcount; i++)
-                       pfree(state->heaptuples[i]);
-               pfree(state->heaptuples);
-       }
-       if (state->heapsrctapes)
-               pfree(state->heapsrctapes);
+       if (state->memtupindex)
+               pfree(state->memtupindex);
 }
 
 /*
@@ -450,7 +503,6 @@ tuplesort_puttuple(Tuplesortstate *state, void *tuple)
                         * Nope; time to switch to tape-based operation.
                         */
                        inittapes(state);
-                       beginrun(state);
                        /*
                         * Dump tuples until we are back under the limit.
                         */
@@ -458,36 +510,23 @@ tuplesort_puttuple(Tuplesortstate *state, void *tuple)
                        break;
                case TSS_BUILDRUNS:
                        /*
-                        * Insert the copied tuple into the heap if it can go into the
-                        * current run; otherwise add it to the unsorted array, whence
-                        * it will go into the next run.
-                        *
-                        * The tuple can go into the current run if it is >= the first
-                        * not-yet-output tuple.  (Actually, it could go into the current
-                        * run if it is >= the most recently output tuple ... but that
-                        * would require keeping around the tuple we last output, and
-                        * it's simplest to let writetup free the tuple when written.)
+                        * Insert the copied tuple into the heap, with run number
+                        * currentRun if it can go into the current run, else run
+                        * number currentRun+1.  The tuple can go into the current run
+                        * if it is >= the first not-yet-output tuple.  (Actually,
+                        * it could go into the current run if it is >= the most recently
+                        * output tuple ... but that would require keeping around the
+                        * tuple we last output, and it's simplest to let writetup free
+                        * each tuple as soon as it's written.)
                         *
                         * Note there will always be at least one tuple in the heap
                         * at this point; see dumptuples.
                         */
-                       Assert(state->heaptupcount > 0);
-                       if (COMPARETUP(state, tuple, state->heaptuples[0]) >= 0)
-                       {
-                               tuplesort_heap_insert(state, tuple, 0);
-                       }
+                       Assert(state->memtupcount > 0);
+                       if (COMPARETUP(state, tuple, state->memtuples[0]) >= 0)
+                               tuplesort_heap_insert(state, tuple, state->currentRun, true);
                        else
-                       {
-                               if (state->memtupcount >= state->memtupsize)
-                               {
-                                       /* Grow the unsorted array as needed. */
-                                       state->memtupsize *= 2;
-                                       state->memtuples = (void **)
-                                               repalloc(state->memtuples,
-                                                                state->memtupsize * sizeof(void *));
-                               }
-                               state->memtuples[state->memtupcount++] = tuple;
-                       }
+                               tuplesort_heap_insert(state, tuple, state->currentRun+1, true);
                        /*
                         * If we are over the memory limit, dump tuples till we're under.
                         */
@@ -529,7 +568,7 @@ tuplesort_performsort(Tuplesortstate *state)
                         * Finish tape-based sort.  First, flush all tuples remaining
                         * in memory out to tape; then merge until we have a single
                         * remaining run (or, if !randomAccess, one run per tape).
-                        * Note that mergeruns sets the correct status.
+                        * Note that mergeruns sets the correct state->status.
                         */
                        dumptuples(state, true);
                        mergeruns(state);
@@ -675,17 +714,35 @@ tuplesort_gettuple(Tuplesortstate *state, bool forward,
                        /*
                         * This code should match the inner loop of mergeonerun().
                         */
-                       if (state->heaptupcount > 0)
+                       if (state->memtupcount > 0)
                        {
-                               int             srcTape = state->heapsrctapes[0];
-
-                               tup = state->heaptuples[0];
-                               tuplesort_heap_siftup(state);
-                               if ((tuplen = getlen(state, srcTape, true)) != 0)
+                               int                             srcTape = state->memtupindex[0];
+                               unsigned int    tuplen;
+                               int                             tupIndex;
+                               void               *newtup;
+
+                               tup = state->memtuples[0];
+                               /* returned tuple is no longer counted in our memory space */
+                               tuplen = TUPLESIZE(state, tup);
+                               state->availMem += tuplen;
+                               state->mergeavailmem[srcTape] += tuplen;
+                               tuplesort_heap_siftup(state, false);
+                               if ((tupIndex = state->mergenext[srcTape]) == 0)
                                {
-                                       void   *newtup = READTUP(state, srcTape, tuplen);
-                                       tuplesort_heap_insert(state, newtup, srcTape);
+                                       /* out of preloaded data on this tape, try to read more */
+                                       mergepreread(state);
+                                       /* if still no data, we've reached end of run on this tape */
+                                       if ((tupIndex = state->mergenext[srcTape]) == 0)
+                                               return tup;
                                }
+                               /* pull next preread tuple from list, insert in heap */
+                               newtup = state->memtuples[tupIndex];
+                               state->mergenext[srcTape] = state->memtupindex[tupIndex];
+                               if (state->mergenext[srcTape] == 0)
+                                       state->mergelast[srcTape] = 0;
+                               state->memtupindex[tupIndex] = state->mergefreelist;
+                               state->mergefreelist = tupIndex;
+                               tuplesort_heap_insert(state, newtup, srcTape, false);
                                return tup;
                        }
                        return NULL;
@@ -704,18 +761,31 @@ tuplesort_gettuple(Tuplesortstate *state, bool forward,
 static void
 inittapes(Tuplesortstate *state)
 {
-       int                     j;
+       int                     ntuples,
+                               j;
 
        state->tapeset = LogicalTapeSetCreate(MAXTAPES);
 
        /*
-        * Initialize heaptuples array slightly larger than current memtuples
-        * usage; memtupcount is probably a good guess at how many tuples we
-        * will be able to have in the heap at once.
+        * Allocate the memtupindex array, same size as memtuples.
+        */
+       state->memtupindex = (int *) palloc(state->memtupsize * sizeof(int));
+
+       /*
+        * Convert the unsorted contents of memtuples[] into a heap.
+        * Each tuple is marked as belonging to run number zero.
+        *
+        * NOTE: we pass false for checkIndex since there's no point in
+        * comparing indexes in this step, even though we do intend the
+        * indexes to be part of the sort key...
         */
-       state->heaptupcount = 0;
-       state->heaptupsize = state->memtupcount + state->memtupcount / 4;
-       state->heaptuples = (void **) palloc(state->heaptupsize * sizeof(void *));
+       ntuples = state->memtupcount;
+       state->memtupcount = 0;                 /* make the heap empty */
+       for (j = 0; j < ntuples; j++)
+               tuplesort_heap_insert(state, state->memtuples[j], 0, false);
+       Assert(state->memtupcount == ntuples);
+
+       state->currentRun = 0;
 
        /*
         * Initialize variables of Algorithm D (step D1).
@@ -733,8 +803,6 @@ inittapes(Tuplesortstate *state)
        state->Level = 1;
        state->destTape = 0;
 
-       state->multipleRuns = false;
-
        state->status = TSS_BUILDRUNS;
 }
 
@@ -750,9 +818,6 @@ selectnewtape(Tuplesortstate *state)
        int             j;
        int             a;
 
-       /* We now have at least two initial runs */
-       state->multipleRuns = true;
-
        /* Step D3: advance j (destTape) */
        if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape+1])
        {
@@ -791,13 +856,13 @@ mergeruns(Tuplesortstate *state)
                        svDummy;
 
        Assert(state->status == TSS_BUILDRUNS);
-       Assert(state->memtupcount == 0 && state->heaptupcount == 0);
+       Assert(state->memtupcount == 0);
        /*
         * If we produced only one initial run (quite likely if the total
         * data volume is between 1X and 2X SortMem), we can just use that
         * tape as the finished output, rather than doing a useless merge.
         */
-       if (! state->multipleRuns)
+       if (state->currentRun == 1)
        {
                state->result_tape = state->tp_tapenum[state->destTape];
                /* must freeze and rewind the finished output tape */
@@ -896,8 +961,10 @@ mergeonerun(Tuplesortstate *state)
 {
        int                             destTape = state->tp_tapenum[TAPERANGE];
        int                             srcTape;
-       unsigned int    tuplen;
+       int                             tupIndex;
        void               *tup;
+       long                    priorAvail,
+                                       spaceFreed;
 
        /*
         * Start the merge by loading one tuple from each active source tape
@@ -910,18 +977,34 @@ mergeonerun(Tuplesortstate *state)
         * writing it out, and replacing it with next tuple from same tape
         * (if there is another one).
         */
-       while (state->heaptupcount > 0)
+       while (state->memtupcount > 0)
        {
-               WRITETUP(state, destTape, state->heaptuples[0]);
-               srcTape = state->heapsrctapes[0];
-               tuplesort_heap_siftup(state);
-               if ((tuplen = getlen(state, srcTape, true)) != 0)
+               /* write the tuple to destTape */
+               priorAvail = state->availMem;
+               srcTape = state->memtupindex[0];
+               WRITETUP(state, destTape, state->memtuples[0]);
+               /* writetup adjusted total free space, now fix per-tape space */
+               spaceFreed = state->availMem - priorAvail;
+               state->mergeavailmem[srcTape] += spaceFreed;
+               /* compact the heap */
+               tuplesort_heap_siftup(state, false);
+               if ((tupIndex = state->mergenext[srcTape]) == 0)
                {
-                       tup = READTUP(state, srcTape, tuplen);
-                       tuplesort_heap_insert(state, tup, srcTape);
+                       /* out of preloaded data on this tape, try to read more */
+                       mergepreread(state);
+                       /* if still no data, we've reached end of run on this tape */
+                       if ((tupIndex = state->mergenext[srcTape]) == 0)
+                               continue;
                }
+               /* pull next preread tuple from list, insert in heap */
+               tup = state->memtuples[tupIndex];
+               state->mergenext[srcTape] = state->memtupindex[tupIndex];
+               if (state->mergenext[srcTape] == 0)
+                       state->mergelast[srcTape] = 0;
+               state->memtupindex[tupIndex] = state->mergefreelist;
+               state->mergefreelist = tupIndex;
+               tuplesort_heap_insert(state, tup, srcTape, false);
        }
-
        /*
         * When the heap empties, we're done.  Write an end-of-run marker
         * on the output tape, and increment its count of real runs.
@@ -933,21 +1016,31 @@ mergeonerun(Tuplesortstate *state)
 /*
  * beginmerge - initialize for a merge pass
  *
- * We load the first tuple from each nondummy input run into the heap.
- * We also decrease the counts of real and dummy runs for each tape.
+ * We decrease the counts of real and dummy runs for each tape, and mark
+ * which tapes contain active input runs in mergeactive[].  Then, load
+ * as many tuples as we can from each active input tape, and finally
+ * fill the merge heap with the first tuple from each active tape.
  */
 static void
 beginmerge(Tuplesortstate *state)
 {
+       int                             activeTapes;
        int                             tapenum;
        int                             srcTape;
-       unsigned int    tuplen;
-       void               *tup;
 
-       Assert(state->heaptuples != NULL && state->heaptupcount == 0);
-       if (state->heapsrctapes == NULL)
-               state->heapsrctapes = (int *) palloc(MAXTAPES * sizeof(int));
+       /* Heap should be empty here */
+       Assert(state->memtupcount == 0);
+
+       /* Clear merge-pass state variables */
+       memset(state->mergeactive, 0, sizeof(state->mergeactive));
+       memset(state->mergenext, 0, sizeof(state->mergenext));
+       memset(state->mergelast, 0, sizeof(state->mergelast));
+       memset(state->mergeavailmem, 0, sizeof(state->mergeavailmem));
+       state->mergefreelist = 0;       /* nothing in the freelist */
+       state->mergefirstfree = MAXTAPES; /* first slot available for preread */
 
+       /* Adjust run counts and mark the active tapes */
+       activeTapes = 0;
        for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
        {
                if (state->tp_dummy[tapenum] > 0)
@@ -959,34 +1052,135 @@ beginmerge(Tuplesortstate *state)
                        Assert(state->tp_runs[tapenum] > 0);
                        state->tp_runs[tapenum]--;
                        srcTape = state->tp_tapenum[tapenum];
-                       tuplen = getlen(state, srcTape, false);
-                       tup = READTUP(state, srcTape, tuplen);
-                       tuplesort_heap_insert(state, tup, srcTape);
+                       state->mergeactive[srcTape] = true;
+                       activeTapes++;
                }
        }
 
+       /*
+        * Initialize space allocation to let each active input tape have
+        * an equal share of preread space.
+        */
+       Assert(activeTapes > 0);
+       state->spacePerTape = state->availMem / activeTapes;
+       for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
+       {
+               if (state->mergeactive[srcTape])
+                       state->mergeavailmem[srcTape] = state->spacePerTape;
+       }
+
+       /*
+        * Preread as many tuples as possible (and at least one) from each
+        * active tape
+        */
+       mergepreread(state);
+
+       /* Load the merge heap with the first tuple from each input tape */
+       for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
+       {
+               int                     tupIndex = state->mergenext[srcTape];
+               void       *tup;
+
+               if (tupIndex)
+               {
+                       tup = state->memtuples[tupIndex];
+                       state->mergenext[srcTape] = state->memtupindex[tupIndex];
+                       if (state->mergenext[srcTape] == 0)
+                               state->mergelast[srcTape] = 0;
+                       state->memtupindex[tupIndex] = state->mergefreelist;
+                       state->mergefreelist = tupIndex;
+                       tuplesort_heap_insert(state, tup, srcTape, false);
+               }
+       }
 }
 
 /*
- * beginrun - start a new initial run
+ * mergepreread - load tuples from merge input tapes
  *
- * The tuples presently in the unsorted memory array are moved into
- * the heap.
+ * This routine exists to improve sequentiality of reads during a merge pass,
+ * as explained in the header comments of this file.  Load tuples from each
+ * active source tape until the tape's run is exhausted or it has used up
+ * its fair share of available memory.  In any case, we guarantee that there
+ * is at least one preread tuple available from each unexhausted input tape.
  */
 static void
-beginrun(Tuplesortstate *state)
+mergepreread(Tuplesortstate *state)
 {
-       int             i;
+       int                             srcTape;
+       unsigned int    tuplen;
+       void               *tup;
+       int                             tupIndex;
+       long                    priorAvail,
+                                       spaceUsed;
 
-       Assert(state->heaptupcount == 0 && state->memtupcount > 0);
-       for (i = 0; i < state->memtupcount; i++)
-               tuplesort_heap_insert(state, state->memtuples[i], 0);
-       state->memtupcount = 0;
+       for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
+       {
+               if (! state->mergeactive[srcTape])
+                       continue;
+               /*
+                * Skip reading from any tape that still has at least half
+                * of its target memory filled with tuples (threshold fraction
+                * may need adjustment?).  This avoids reading just a few tuples
+                * when the incoming runs are not being consumed evenly.
+                */
+               if (state->mergenext[srcTape] != 0 &&
+                       state->mergeavailmem[srcTape] <= state->spacePerTape / 2)
+                       continue;
+               /*
+                * Read tuples from this tape until it has used up its free memory,
+                * but ensure that we have at least one.
+                */
+               priorAvail = state->availMem;
+               state->availMem = state->mergeavailmem[srcTape];
+               while (! LACKMEM(state) || state->mergenext[srcTape] == 0)
+               {
+                       /* read next tuple, if any */
+                       if ((tuplen = getlen(state, srcTape, true)) == 0)
+                       {
+                               state->mergeactive[srcTape] = false;
+                               break;
+                       }
+                       tup = READTUP(state, srcTape, tuplen);
+                       /* find or make a free slot in memtuples[] for it */
+                       tupIndex = state->mergefreelist;
+                       if (tupIndex)
+                               state->mergefreelist = state->memtupindex[tupIndex];
+                       else
+                       {
+                               tupIndex = state->mergefirstfree++;
+                               /* Might need to enlarge arrays! */
+                               if (tupIndex >= state->memtupsize)
+                               {
+                                       state->memtupsize *= 2;
+                                       state->memtuples = (void **)
+                                               repalloc(state->memtuples,
+                                                                state->memtupsize * sizeof(void *));
+                                       state->memtupindex = (int *)
+                                               repalloc(state->memtupindex,
+                                                                state->memtupsize * sizeof(int));
+                               }
+                       }
+                       /* store tuple, append to list for its tape */
+                       state->memtuples[tupIndex] = tup;
+                       state->memtupindex[tupIndex] = 0;
+                       if (state->mergelast[srcTape])
+                               state->memtupindex[state->mergelast[srcTape]] = tupIndex;
+                       else
+                               state->mergenext[srcTape] = tupIndex;
+                       state->mergelast[srcTape] = tupIndex;
+               }
+               /* update per-tape and global availmem counts */
+               spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
+               state->mergeavailmem[srcTape] = state->availMem;
+               state->availMem = priorAvail - spaceUsed;
+       }
 }
 
 /*
  * dumptuples - remove tuples from heap and write to tape
  *
+ * This is used during initial-run building, but not during merging.
+ *
  * When alltuples = false, dump only enough tuples to get under the
  * availMem limit (and leave at least one tuple in the heap in any case,
  * since puttuple assumes it always has a tuple to compare to).
@@ -994,37 +1188,42 @@ beginrun(Tuplesortstate *state)
  * When alltuples = true, dump everything currently in memory.
  * (This case is only used at end of input data.)
  *
- * If we empty the heap, then start a new run using the tuples that
- * have accumulated in memtuples[] (if any).
+ * If we empty the heap, close out the current run and return (this should
+ * only happen at end of input data).  If we see that the tuple run number
+ * at the top of the heap has changed, start a new run.
  */
 static void
 dumptuples(Tuplesortstate *state, bool alltuples)
 {
        while (alltuples ||
-                  (LACKMEM(state) &&
-                       (state->heaptupcount > 0 || state->memtupcount > 0)))
+                  (LACKMEM(state) && state->memtupcount > 1))
        {
                /*
                 * Dump the heap's frontmost entry, and sift up to remove it
                 * from the heap.
                 */
-               Assert(state->heaptupcount > 0);
+               Assert(state->memtupcount > 0);
                WRITETUP(state, state->tp_tapenum[state->destTape],
-                                state->heaptuples[0]);
-               tuplesort_heap_siftup(state);
+                                state->memtuples[0]);
+               tuplesort_heap_siftup(state, true);
                /*
-                * If the heap is now empty, we've finished a run.
+                * If the heap is empty *or* top run number has changed,
+                * we've finished the current run.
                 */
-               if (state->heaptupcount == 0)
+               if (state->memtupcount == 0 ||
+                       state->currentRun != state->memtupindex[0])
                {
                        markrunend(state, state->tp_tapenum[state->destTape]);
+                       state->currentRun++;
                        state->tp_runs[state->destTape]++;
                        state->tp_dummy[state->destTape]--;     /* per Alg D step D2 */
+                       /*
+                        * Done if heap is empty, else prepare for new run.
+                        */
                        if (state->memtupcount == 0)
-                               break;                  /* all input data has been written to tape */
-                       /* Select new output tape and start a new run */
+                               break;
+                       Assert(state->currentRun == state->memtupindex[0]);
                        selectnewtape(state);
-                       beginrun(state);
                }
        }
 }
@@ -1119,88 +1318,102 @@ tuplesort_restorepos(Tuplesortstate *state)
 
 /*
  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
+ *
+ * The heap lives in state->memtuples[], with parallel data storage
+ * for indexes in state->memtupindex[].  If checkIndex is true, use
+ * the tuple index as the front of the sort key; otherwise, no.
  */
 
+#define HEAPCOMPARE(tup1,index1,tup2,index2) \
+       (checkIndex && (index1 != index2) ? index1 - index2 : \
+        COMPARETUP(state, tup1, tup2))
+
 /*
  * Insert a new tuple into an empty or existing heap, maintaining the
- * heap invariant.  The heap lives in state->heaptuples[].  Also, if
- * state->heapsrctapes is not NULL, we store each tuple's source tapenum
- * in the corresponding element of state->heapsrctapes[].
+ * heap invariant.
  */
 static void
 tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
-                                         int tapenum)
+                                         int tupleindex, bool checkIndex)
 {
+       void  **memtuples;
+       int        *memtupindex;
        int             j;
 
        /*
-        * Make sure heaptuples[] can handle another entry.
-        * NOTE: we do not enlarge heapsrctapes[]; it's supposed
-        * to be big enough when created.
+        * Make sure memtuples[] can handle another entry.
         */
-       if (state->heaptupcount >= state->heaptupsize)
+       if (state->memtupcount >= state->memtupsize)
        {
-               /* Grow the unsorted array as needed. */
-               state->heaptupsize *= 2;
-               state->heaptuples = (void **)
-                       repalloc(state->heaptuples,
-                                        state->heaptupsize * sizeof(void *));
+               state->memtupsize *= 2;
+               state->memtuples = (void **)
+                       repalloc(state->memtuples,
+                                        state->memtupsize * sizeof(void *));
+               state->memtupindex = (int *)
+                       repalloc(state->memtupindex,
+                                        state->memtupsize * sizeof(int));
        }
+       memtuples = state->memtuples;
+       memtupindex = state->memtupindex;
        /*
         * Sift-up the new entry, per Knuth 5.2.3 exercise 16.
         * Note that Knuth is using 1-based array indexes, not 0-based.
         */
-       j = state->heaptupcount++;
-       while (j > 0) {
+       j = state->memtupcount++;
+       while (j > 0)
+       {
                int             i = (j-1) >> 1;
 
-               if (COMPARETUP(state, tuple, state->heaptuples[i]) >= 0)
+               if (HEAPCOMPARE(tuple, tupleindex,
+                                               memtuples[i], memtupindex[i]) >= 0)
                        break;
-               state->heaptuples[j] = state->heaptuples[i];
-               if (state->heapsrctapes)
-                       state->heapsrctapes[j] = state->heapsrctapes[i];
+               memtuples[j] = memtuples[i];
+               memtupindex[j] = memtupindex[i];
                j = i;
        }
-       state->heaptuples[j] = tuple;
-       if (state->heapsrctapes)
-               state->heapsrctapes[j] = tapenum;
+       memtuples[j] = tuple;
+       memtupindex[j] = tupleindex;
 }
 
 /*
- * The tuple at state->heaptuples[0] has been removed from the heap.
- * Decrement heaptupcount, and sift up to maintain the heap invariant.
+ * The tuple at state->memtuples[0] has been removed from the heap.
+ * Decrement memtupcount, and sift up to maintain the heap invariant.
  */
 static void
-tuplesort_heap_siftup(Tuplesortstate *state)
+tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
 {
-       void  **heaptuples = state->heaptuples;
+       void  **memtuples = state->memtuples;
+       int        *memtupindex = state->memtupindex;
        void   *tuple;
-       int             i,
+       int             tupindex,
+                       i,
                        n;
 
-       if (--state->heaptupcount <= 0)
+       if (--state->memtupcount <= 0)
                return;
-       n = state->heaptupcount;
-       tuple = heaptuples[n];          /* tuple that must be reinserted */
+       n = state->memtupcount;
+       tuple = memtuples[n];           /* tuple that must be reinserted */
+       tupindex = memtupindex[n];
        i = 0;                                          /* i is where the "hole" is */
-    for (;;) {
+    for (;;)
+       {
                int             j = 2*i + 1;
 
                if (j >= n)
                        break;
                if (j+1 < n &&
-                       COMPARETUP(state, heaptuples[j], heaptuples[j+1]) > 0)
+                       HEAPCOMPARE(memtuples[j], memtupindex[j],
+                                               memtuples[j+1], memtupindex[j+1]) > 0)
                        j++;
-               if (COMPARETUP(state, tuple, heaptuples[j]) <= 0)
+               if (HEAPCOMPARE(tuple, tupindex,
+                                               memtuples[j], memtupindex[j]) <= 0)
                        break;
-               heaptuples[i] = heaptuples[j];
-               if (state->heapsrctapes)
-                       state->heapsrctapes[i] = state->heapsrctapes[j];
+               memtuples[i] = memtuples[j];
+               memtupindex[i] = memtupindex[j];
                i = j;
     }
-    heaptuples[i] = tuple;
-       if (state->heapsrctapes)
-               state->heapsrctapes[i] = state->heapsrctapes[n];
+    memtuples[i] = tuple;
+       memtupindex[i] = tupindex;
 }
 
 
@@ -1252,6 +1465,7 @@ comparetup_heap(Tuplesortstate *state, const void *a, const void *b)
 {
        HeapTuple       ltup = (HeapTuple) a;
        HeapTuple       rtup = (HeapTuple) b;
+       TupleDesc       tupDesc = state->tupDesc;
        int                     nkey;
 
        for (nkey = 0; nkey < state->nKeys; nkey++)
@@ -1265,11 +1479,11 @@ comparetup_heap(Tuplesortstate *state, const void *a, const void *b)
 
                lattr = heap_getattr(ltup,
                                                         scanKey->sk_attno,
-                                                        state->tupDesc,
+                                                        tupDesc,
                                                         &isnull1);
                rattr = heap_getattr(rtup,
                                                         scanKey->sk_attno,
-                                                        state->tupDesc,
+                                                        tupDesc,
                                                         &isnull2);
                if (isnull1)
                {
@@ -1351,6 +1565,14 @@ readtup_heap(Tuplesortstate *state, int tapenum, unsigned int len)
        return (void *) tuple;
 }
 
+static unsigned int
+tuplesize_heap(Tuplesortstate *state, void *tup)
+{
+       HeapTuple       tuple = (HeapTuple) tup;
+
+       return HEAPTUPLESIZE + tuple->t_len;
+}
+
 
 /*
  * Routines specialized for IndexTuple case
@@ -1368,16 +1590,17 @@ comparetup_index(Tuplesortstate *state, const void *a, const void *b)
        IndexTuple      rtup = (IndexTuple) b;
        TupleDesc       itdesc = state->indexRel->rd_att;
        bool            equal_isnull = false;
-       Datum           lattr,
-                               rattr;
-       bool            isnull1,
-                               isnull2;
        int                     i;
 
-       for (i = 0; i < itdesc->natts; i++)
+       for (i = 1; i <= itdesc->natts; i++)
        {
-               lattr = index_getattr(ltup, i + 1, itdesc, &isnull1);
-               rattr = index_getattr(rtup, i + 1, itdesc, &isnull2);
+               Datum           lattr,
+                                       rattr;
+               bool            isnull1,
+                                       isnull2;
+
+               lattr = index_getattr(ltup, i, itdesc, &isnull1);
+               rattr = index_getattr(rtup, i, itdesc, &isnull2);
 
                if (isnull1)
                {
@@ -1389,11 +1612,11 @@ comparetup_index(Tuplesortstate *state, const void *a, const void *b)
                else if (isnull2)
                        return -1;
 
-               if (_bt_invokestrat(state->indexRel, i + 1,
+               if (_bt_invokestrat(state->indexRel, i,
                                                        BTGreaterStrategyNumber,
                                                        lattr, rattr))
                        return 1;
-               if (_bt_invokestrat(state->indexRel, i + 1,
+               if (_bt_invokestrat(state->indexRel, i,
                                                        BTGreaterStrategyNumber,
                                                        rattr, lattr))
                        return -1;
@@ -1463,3 +1686,12 @@ readtup_index(Tuplesortstate *state, int tapenum, unsigned int len)
                        elog(ERROR, "tuplesort: unexpected end of data");
        return (void *) tuple;
 }
+
+static unsigned int
+tuplesize_index(Tuplesortstate *state, void *tup)
+{
+       IndexTuple              tuple = (IndexTuple) tup;
+       unsigned int    tuplen = IndexTupleSize(tuple);
+
+       return tuplen;
+}