]> granicus.if.org Git - postgresql/blobdiff - src/backend/utils/sort/tuplesort.c
Improve memory management for external sorts.
[postgresql] / src / backend / utils / sort / tuplesort.c
index 67d86ed83b48cd125d407c59d01a0e0dacb8095a..c3f666e2f114a0433a75d6ba97e7b235bf9699ab 100644 (file)
@@ -138,7 +138,8 @@ bool                optimize_bounded_sort = true;
  * The objects we actually sort are SortTuple structs.  These contain
  * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
  * which is a separate palloc chunk --- we assume it is just one chunk and
- * can be freed by a simple pfree().  SortTuples also contain the tuple's
+ * can be freed by a simple pfree() (except during final on-the-fly merge,
+ * when memory is used in batch).  SortTuples also contain the tuple's
  * first key column in Datum/nullflag format, and an index integer.
  *
  * Storing the first key column lets us save heap_getattr or index_getattr
@@ -220,11 +221,13 @@ struct Tuplesortstate
                                                                 * tuples to return? */
        bool            boundUsed;              /* true if we made use of a bounded heap */
        int                     bound;                  /* if bounded, the maximum number of tuples */
+       bool            tuples;                 /* Can SortTuple.tuple ever be set? */
        int64           availMem;               /* remaining memory available, in bytes */
        int64           allowedMem;             /* total memory allowed, in bytes */
        int                     maxTapes;               /* number of tapes (Knuth's T) */
        int                     tapeRange;              /* maxTapes-1 (Knuth's P) */
-       MemoryContext sortcontext;      /* memory context holding all sort data */
+       MemoryContext sortcontext;      /* memory context holding most sort data */
+       MemoryContext tuplecontext;     /* memory context holding tuple data */
        LogicalTapeSet *tapeset;        /* logtape.c object for tapes in a temp file */
 
        /*
@@ -280,6 +283,15 @@ struct Tuplesortstate
        int                     memtupsize;             /* allocated length of memtuples array */
        bool            growmemtuples;  /* memtuples' growth still underway? */
 
+       /*
+        * Memory for tuples is sometimes allocated in batch, rather than
+        * incrementally.  This implies that incremental memory accounting has been
+        * abandoned.  Currently, this only happens for the final on-the-fly merge
+        * step.  Large batch allocations can store tuples (e.g. IndexTuples)
+        * without palloc() fragmentation and other overhead.
+        */
+       bool            batchUsed;
+
        /*
         * While building initial runs, this is the current output run number
         * (starting at 0).  Afterwards, it is the number of initial runs we made.
@@ -314,6 +326,21 @@ struct Tuplesortstate
        int                     mergefreelist;  /* head of freelist of recycled slots */
        int                     mergefirstfree; /* first slot never used in this merge */
 
+       /*
+        * Per-tape batch state, when final on-the-fly merge consumes memory from
+        * just a few large allocations.
+        *
+        * Aside from the general benefits of performing fewer individual retail
+        * palloc() calls, this also helps make merging more cache efficient, since
+        * each tape's tuples must naturally be accessed sequentially (in sorted
+        * order).
+        */
+       int64           spacePerTape;   /* Space (memory) for tuples (not slots) */
+       char      **mergetuples;        /* Each tape's memory allocation */
+       char      **mergecurrent;       /* Current offset into each tape's memory */
+       char      **mergetail;          /* Last item's start point for each tape */
+       char      **mergeoverflow;      /* Retail palloc() "overflow" for each tape */
+
        /*
         * Variables for Algorithm D.  Note that destTape is a "logical" tape
         * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
@@ -389,9 +416,8 @@ struct Tuplesortstate
         * tuplesort_begin_datum and used only by the DatumTuple routines.
         */
        Oid                     datumType;
-       /* we need typelen and byval in order to know how to copy the Datums. */
+       /* we need typelen in order to know how to copy the Datums. */
        int                     datumTypeLen;
-       bool            datumTypeByVal;
 
        /*
         * Resource snapshot for time of sort start.
@@ -405,7 +431,7 @@ struct Tuplesortstate
 #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
 #define WRITETUP(state,tape,stup)      ((*(state)->writetup) (state, tape, stup))
 #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
-#define LACKMEM(state)         ((state)->availMem < 0)
+#define LACKMEM(state)         ((state)->availMem < 0 && !(state)->batchUsed)
 #define USEMEM(state,amt)      ((state)->availMem -= (amt))
 #define FREEMEM(state,amt)     ((state)->availMem += (amt))
 
@@ -447,7 +473,13 @@ struct Tuplesortstate
  * rather than the originally-requested size.  This is important since
  * palloc can add substantial overhead.  It's not a complete answer since
  * we won't count any wasted space in palloc allocation blocks, but it's
- * a lot better than what we were doing before 7.3.
+ * a lot better than what we were doing before 7.3.  As of 9.6, a
+ * separate memory context is used for caller passed tuples.  Resetting
+ * it at certain key increments significantly ameliorates fragmentation.
+ * Note that this places a responsibility on readtup and copytup routines
+ * to use the right memory context for these tuples (and to not use the
+ * reset context for anything whose lifetime needs to span multiple
+ * external sort runs).
  */
 
 /* When using this macro, beware of double evaluation of len */
@@ -465,7 +497,14 @@ static void inittapes(Tuplesortstate *state);
 static void selectnewtape(Tuplesortstate *state);
 static void mergeruns(Tuplesortstate *state);
 static void mergeonerun(Tuplesortstate *state);
-static void beginmerge(Tuplesortstate *state);
+static void beginmerge(Tuplesortstate *state, bool finalMerge);
+static void batchmemtuples(Tuplesortstate *state);
+static void mergebatch(Tuplesortstate *state, int64 spacePerTape);
+static void mergebatchone(Tuplesortstate *state, int srcTape,
+                                         SortTuple *stup, bool *should_free);
+static void mergebatchfreetape(Tuplesortstate *state, int srcTape,
+                                                          SortTuple *rtup, bool *should_free);
+static void *mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen);
 static void mergepreread(Tuplesortstate *state);
 static void mergeprereadone(Tuplesortstate *state, int srcTape);
 static void dumptuples(Tuplesortstate *state, bool alltuples);
@@ -477,6 +516,7 @@ static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
 static void reversedirection(Tuplesortstate *state);
 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
 static void markrunend(Tuplesortstate *state, int tapenum);
+static void *readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen);
 static int comparetup_heap(const SortTuple *a, const SortTuple *b,
                                Tuplesortstate *state);
 static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
@@ -543,6 +583,7 @@ tuplesort_begin_common(int workMem, bool randomAccess)
 {
        Tuplesortstate *state;
        MemoryContext sortcontext;
+       MemoryContext tuplecontext;
        MemoryContext oldcontext;
 
        /*
@@ -550,11 +591,26 @@ tuplesort_begin_common(int workMem, bool randomAccess)
         * needed by the sort will live inside this context.
         */
        sortcontext = AllocSetContextCreate(CurrentMemoryContext,
-                                                                               "TupleSort",
+                                                                               "TupleSort main",
                                                                                ALLOCSET_DEFAULT_MINSIZE,
                                                                                ALLOCSET_DEFAULT_INITSIZE,
                                                                                ALLOCSET_DEFAULT_MAXSIZE);
 
+       /*
+        * Caller tuple (e.g. IndexTuple) memory context.
+        *
+        * A dedicated child content used exclusively for caller passed tuples
+        * eases memory management.  Resetting at key points reduces fragmentation.
+        * Note that the memtuples array of SortTuples is allocated in the parent
+        * context, not this context, because there is no need to free memtuples
+        * early.
+        */
+       tuplecontext = AllocSetContextCreate(sortcontext,
+                                                                                "Caller tuples",
+                                                                                ALLOCSET_DEFAULT_MINSIZE,
+                                                                                ALLOCSET_DEFAULT_INITSIZE,
+                                                                                ALLOCSET_DEFAULT_MAXSIZE);
+
        /*
         * Make the Tuplesortstate within the per-sort context.  This way, we
         * don't need a separate pfree() operation for it at shutdown.
@@ -571,10 +627,12 @@ tuplesort_begin_common(int workMem, bool randomAccess)
        state->status = TSS_INITIAL;
        state->randomAccess = randomAccess;
        state->bounded = false;
+       state->tuples = true;
        state->boundUsed = false;
        state->allowedMem = workMem * (int64) 1024;
        state->availMem = state->allowedMem;
        state->sortcontext = sortcontext;
+       state->tuplecontext = tuplecontext;
        state->tapeset = NULL;
 
        state->memtupcount = 0;
@@ -587,6 +645,7 @@ tuplesort_begin_common(int workMem, bool randomAccess)
                                                ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1);
 
        state->growmemtuples = true;
+       state->batchUsed = false;
        state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
 
        USEMEM(state, GetMemoryChunkSpace(state->memtuples));
@@ -922,7 +981,7 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
        /* lookup necessary attributes of the datum type */
        get_typlenbyval(datumType, &typlen, &typbyval);
        state->datumTypeLen = typlen;
-       state->datumTypeByVal = typbyval;
+       state->tuples = !typbyval;
 
        /* Prepare SortSupport data */
        state->sortKeys = (SortSupport) palloc0(sizeof(SortSupportData));
@@ -1258,7 +1317,7 @@ tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
                                                          ItemPointer self, Datum *values,
                                                          bool *isnull)
 {
-       MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+       MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
        SortTuple       stup;
        Datum           original;
        IndexTuple      tuple;
@@ -1273,6 +1332,8 @@ tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
                                                         RelationGetDescr(state->indexRel),
                                                         &stup.isnull1);
 
+       MemoryContextSwitchTo(state->sortcontext);
+
        if (!state->sortKeys || !state->sortKeys->abbrev_converter || stup.isnull1)
        {
                /*
@@ -1333,7 +1394,7 @@ tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
 void
 tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
 {
-       MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+       MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
        SortTuple       stup;
 
        /*
@@ -1348,7 +1409,7 @@ tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
         * identical to stup.tuple.
         */
 
-       if (isNull || state->datumTypeByVal)
+       if (isNull || !state->tuples)
        {
                /*
                 * Set datum1 to zeroed representation for NULLs (to be consistent, and
@@ -1357,6 +1418,7 @@ tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
                stup.datum1 = !isNull ? val : (Datum) 0;
                stup.isnull1 = isNull;
                stup.tuple = NULL;              /* no separate storage */
+               MemoryContextSwitchTo(state->sortcontext);
        }
        else
        {
@@ -1365,6 +1427,7 @@ tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
                stup.isnull1 = false;
                stup.tuple = DatumGetPointer(original);
                USEMEM(state, GetMemoryChunkSpace(stup.tuple));
+               MemoryContextSwitchTo(state->sortcontext);
 
                if (!state->sortKeys->abbrev_converter)
                {
@@ -1670,6 +1733,7 @@ tuplesort_performsort(Tuplesortstate *state)
  * Internal routine to fetch the next tuple in either forward or back
  * direction into *stup.  Returns FALSE if no more tuples.
  * If *should_free is set, the caller must pfree stup.tuple when done with it.
+ * Otherwise, caller should not use tuple following next call here.
  */
 static bool
 tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
@@ -1681,6 +1745,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
        {
                case TSS_SORTEDINMEM:
                        Assert(forward || state->randomAccess);
+                       Assert(!state->batchUsed);
                        *should_free = false;
                        if (forward)
                        {
@@ -1725,6 +1790,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 
                case TSS_SORTEDONTAPE:
                        Assert(forward || state->randomAccess);
+                       Assert(!state->batchUsed);
                        *should_free = true;
                        if (forward)
                        {
@@ -1810,7 +1876,9 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 
                case TSS_FINALMERGE:
                        Assert(forward);
-                       *should_free = true;
+                       Assert(state->batchUsed || !state->tuples);
+                       /* For now, assume tuple is stored in tape's batch memory */
+                       *should_free = false;
 
                        /*
                         * This code should match the inner loop of mergeonerun().
@@ -1818,18 +1886,17 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
                        if (state->memtupcount > 0)
                        {
                                int                     srcTape = state->memtuples[0].tupindex;
-                               Size            tuplen;
                                int                     tupIndex;
                                SortTuple  *newtup;
 
+                               /*
+                                * Returned tuple is still counted in our memory space most
+                                * of the time.  See mergebatchone() for discussion of why
+                                * caller may occasionally be required to free returned
+                                * tuple, and how preread memory is managed with regard to
+                                * edge cases more generally.
+                                */
                                *stup = state->memtuples[0];
-                               /* returned tuple is no longer counted in our memory space */
-                               if (stup->tuple)
-                               {
-                                       tuplen = GetMemoryChunkSpace(stup->tuple);
-                                       state->availMem += tuplen;
-                                       state->mergeavailmem[srcTape] += tuplen;
-                               }
                                tuplesort_heap_siftup(state, false);
                                if ((tupIndex = state->mergenext[srcTape]) == 0)
                                {
@@ -1837,15 +1904,25 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
                                         * out of preloaded data on this tape, try to read more
                                         *
                                         * Unlike mergeonerun(), we only preload from the single
-                                        * tape that's run dry.  See mergepreread() comments.
+                                        * tape that's run dry, though not before preparing its
+                                        * batch memory for a new round of sequential consumption.
+                                        * See mergepreread() comments.
                                         */
+                                       if (state->batchUsed)
+                                               mergebatchone(state, srcTape, stup, should_free);
+
                                        mergeprereadone(state, srcTape);
 
                                        /*
                                         * if still no data, we've reached end of run on this tape
                                         */
                                        if ((tupIndex = state->mergenext[srcTape]) == 0)
+                                       {
+                                               /* Free tape's buffer, avoiding dangling pointer */
+                                               if (state->batchUsed)
+                                                       mergebatchfreetape(state, srcTape, stup, should_free);
                                                return true;
+                                       }
                                }
                                /* pull next preread tuple from list, insert in heap */
                                newtup = &state->memtuples[tupIndex];
@@ -1912,6 +1989,8 @@ tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
  * Fetch the next tuple in either forward or back direction.
  * Returns NULL if no more tuples.  If *should_free is set, the
  * caller must pfree the returned tuple when done with it.
+ * If it is not set, caller should not use tuple following next
+ * call here.
  */
 HeapTuple
 tuplesort_getheaptuple(Tuplesortstate *state, bool forward, bool *should_free)
@@ -1931,6 +2010,8 @@ tuplesort_getheaptuple(Tuplesortstate *state, bool forward, bool *should_free)
  * Fetch the next index tuple in either forward or back direction.
  * Returns NULL if no more tuples.  If *should_free is set, the
  * caller must pfree the returned tuple when done with it.
+ * If it is not set, caller should not use tuple following next
+ * call here.
  */
 IndexTuple
 tuplesort_getindextuple(Tuplesortstate *state, bool forward,
@@ -1979,7 +2060,7 @@ tuplesort_getdatum(Tuplesortstate *state, bool forward,
        if (state->sortKeys->abbrev_converter && abbrev)
                *abbrev = stup.datum1;
 
-       if (stup.isnull1 || state->datumTypeByVal)
+       if (stup.isnull1 || !state->tuples)
        {
                *val = stup.datum1;
                *isNull = stup.isnull1;
@@ -2162,6 +2243,10 @@ inittapes(Tuplesortstate *state)
        state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
        state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
        state->mergeavailmem = (int64 *) palloc0(maxTapes * sizeof(int64));
+       state->mergetuples = (char **) palloc0(maxTapes * sizeof(char *));
+       state->mergecurrent = (char **) palloc0(maxTapes * sizeof(char *));
+       state->mergetail = (char **) palloc0(maxTapes * sizeof(char *));
+       state->mergeoverflow = (char **) palloc0(maxTapes * sizeof(char *));
        state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
        state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
        state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
@@ -2320,7 +2405,7 @@ mergeruns(Tuplesortstate *state)
                                /* Tell logtape.c we won't be writing anymore */
                                LogicalTapeSetForgetFreeSpace(state->tapeset);
                                /* Initialize for the final merge pass */
-                               beginmerge(state);
+                               beginmerge(state, state->tuples);
                                state->status = TSS_FINALMERGE;
                                return;
                        }
@@ -2412,7 +2497,7 @@ mergeonerun(Tuplesortstate *state)
         * Start the merge by loading one tuple from each active source tape into
         * the heap.  We can also decrease the input run/dummy run counts.
         */
-       beginmerge(state);
+       beginmerge(state, false);
 
        /*
         * Execute merge by repeatedly extracting lowest tuple in heap, writing it
@@ -2450,6 +2535,12 @@ mergeonerun(Tuplesortstate *state)
                state->mergeavailslots[srcTape]++;
        }
 
+       /*
+        * Reset tuple memory, now that no caller tuples are needed in memory.
+        * This prevents fragmentation.
+        */
+       MemoryContextReset(state->tuplecontext);
+
        /*
         * When the heap empties, we're done.  Write an end-of-run marker on the
         * output tape, and increment its count of real runs.
@@ -2471,9 +2562,12 @@ mergeonerun(Tuplesortstate *state)
  * which tapes contain active input runs in mergeactive[].  Then, load
  * as many tuples as we can from each active input tape, and finally
  * fill the merge heap with the first tuple from each active tape.
+ *
+ * finalMergeBatch indicates if this is the beginning of a final on-the-fly
+ * merge where a batched allocation of tuple memory is required.
  */
 static void
-beginmerge(Tuplesortstate *state)
+beginmerge(Tuplesortstate *state, bool finalMergeBatch)
 {
        int                     activeTapes;
        int                     tapenum;
@@ -2511,6 +2605,18 @@ beginmerge(Tuplesortstate *state)
        state->mergefreelist = 0;       /* nothing in the freelist */
        state->mergefirstfree = activeTapes;            /* 1st slot avail for preread */
 
+       if (finalMergeBatch)
+       {
+               /* Free outright buffers for tape never actually allocated */
+               FREEMEM(state, (state->maxTapes - activeTapes) * TAPE_BUFFER_OVERHEAD);
+
+               /*
+                * Grow memtuples one last time, since the palloc() overhead no longer
+                * incurred can make a big difference
+                */
+               batchmemtuples(state);
+       }
+
        /*
         * Initialize space allocation to let each active input tape have an equal
         * share of preread space.
@@ -2518,7 +2624,7 @@ beginmerge(Tuplesortstate *state)
        Assert(activeTapes > 0);
        slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
        Assert(slotsPerTape > 0);
-       spacePerTape = state->availMem / activeTapes;
+       spacePerTape = MAXALIGN_DOWN(state->availMem / activeTapes);
        for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
        {
                if (state->mergeactive[srcTape])
@@ -2528,6 +2634,15 @@ beginmerge(Tuplesortstate *state)
                }
        }
 
+       /*
+        * Preallocate tuple batch memory for each tape.  This is the memory used
+        * for tuples themselves (not SortTuples), so it's never used by
+        * pass-by-value datum sorts.  Memory allocation is performed here at most
+        * once per sort, just in advance of the final on-the-fly merge step.
+        */
+       if (finalMergeBatch)
+               mergebatch(state, spacePerTape);
+
        /*
         * Preread as many tuples as possible (and at least one) from each active
         * tape
@@ -2551,8 +2666,316 @@ beginmerge(Tuplesortstate *state)
                        tup->tupindex = state->mergefreelist;
                        state->mergefreelist = tupIndex;
                        state->mergeavailslots[srcTape]++;
+
+#ifdef TRACE_SORT
+                       if (trace_sort && finalMergeBatch)
+                       {
+                               int64           perTapeKB = (spacePerTape + 1023) / 1024;
+                               int64           usedSpaceKB;
+                               int                     usedSlots;
+
+                               /*
+                                * Report how effective batchmemtuples() was in balancing
+                                * the number of slots against the need for memory for the
+                                * underlying tuples (e.g. IndexTuples).  The big preread of
+                                * all tapes when switching to FINALMERGE state should be
+                                * fairly representative of memory utilization during the
+                                * final merge step, and in any case is the only point at
+                                * which all tapes are guaranteed to have depleted either
+                                * their batch memory allowance or slot allowance.  Ideally,
+                                * both will be completely depleted for every tape by now.
+                                */
+                               usedSpaceKB = (state->mergecurrent[srcTape] -
+                                                          state->mergetuples[srcTape] + 1023) / 1024;
+                               usedSlots = slotsPerTape - state->mergeavailslots[srcTape];
+
+                               elog(LOG, "tape %d initially used %ld KB of %ld KB batch "
+                                        "(%2.3f) and %d out of %d slots (%2.3f)", srcTape,
+                                        usedSpaceKB, perTapeKB,
+                                        (double) usedSpaceKB / (double) perTapeKB,
+                                        usedSlots, slotsPerTape,
+                                        (double) usedSlots / (double) slotsPerTape);
+                       }
+#endif
+               }
+       }
+}
+
+/*
+ * batchmemtuples - grow memtuples without palloc overhead
+ *
+ * When called, availMem should be approximately the amount of memory we'd
+ * require to allocate memtupsize - memtupcount tuples (not SortTuples/slots)
+ * that were allocated with palloc() overhead, and in doing so use up all
+ * allocated slots.  However, though slots and tuple memory is in balance
+ * following the last grow_memtuples() call, that's predicated on the observed
+ * average tuple size for the "final" grow_memtuples() call, which includes
+ * palloc overhead.
+ *
+ * This will perform an actual final grow_memtuples() call without any palloc()
+ * overhead, rebalancing the use of memory between slots and tuples.
+ */
+static void
+batchmemtuples(Tuplesortstate *state)
+{
+       int64                   refund;
+       int64                   availMemLessRefund;
+       int                             memtupsize = state->memtupsize;
+
+       /* For simplicity, assume no memtuples are actually currently counted */
+       Assert(state->memtupcount == 0);
+
+       /*
+        * Refund STANDARDCHUNKHEADERSIZE per tuple.
+        *
+        * This sometimes fails to make memory use prefectly balanced, but it
+        * should never make the situation worse.  Note that Assert-enabled builds
+        * get a larger refund, due to a varying STANDARDCHUNKHEADERSIZE.
+        */
+       refund = memtupsize * STANDARDCHUNKHEADERSIZE;
+       availMemLessRefund = state->availMem - refund;
+
+       /*
+        * To establish balanced memory use after refunding palloc overhead,
+        * temporarily have our accounting indicate that we've allocated all
+        * memory we're allowed to less that refund, and call grow_memtuples()
+        * to have it increase the number of slots.
+        */
+       state->growmemtuples = true;
+       USEMEM(state, availMemLessRefund);
+       (void) grow_memtuples(state);
+       /* Should not matter, but be tidy */
+       FREEMEM(state, availMemLessRefund);
+       state->growmemtuples = false;
+
+#ifdef TRACE_SORT
+       if (trace_sort)
+       {
+               Size    OldKb = (memtupsize * sizeof(SortTuple) + 1023) / 1024;
+               Size    NewKb = (state->memtupsize * sizeof(SortTuple) + 1023) / 1024;
+
+               elog(LOG, "grew memtuples %1.2fx from %d (%zu KB) to %d (%zu KB) for final merge",
+                        (double) NewKb / (double) OldKb,
+                        memtupsize, OldKb,
+                        state->memtupsize, NewKb);
+       }
+#endif
+}
+
+/*
+ * mergebatch - initialize tuple memory in batch
+ *
+ * This allows sequential access to sorted tuples buffered in memory from
+ * tapes/runs on disk during a final on-the-fly merge step.  Note that the
+ * memory is not used for SortTuples, but for the underlying tuples (e.g.
+ * MinimalTuples).
+ *
+ * Note that when batch memory is used, there is a simple division of space
+ * into large buffers (one per active tape).  The conventional incremental
+ * memory accounting (calling USEMEM() and FREEMEM()) is abandoned.  Instead,
+ * when each tape's memory budget is exceeded, a retail palloc() "overflow" is
+ * performed, which is then immediately detected in a way that is analogous to
+ * LACKMEM().  This keeps each tape's use of memory fair, which is always a
+ * goal.
+ */
+static void
+mergebatch(Tuplesortstate *state, int64 spacePerTape)
+{
+       int             srcTape;
+
+       Assert(state->activeTapes > 0);
+       Assert(state->tuples);
+
+       /*
+        * For the purposes of tuplesort's memory accounting, the batch allocation
+        * is special, and regular memory accounting through USEMEM() calls is
+        * abandoned (see mergeprereadone()).
+        */
+       for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
+       {
+               char       *mergetuples;
+
+               if (!state->mergeactive[srcTape])
+                       continue;
+
+               /* Allocate buffer for each active tape */
+               mergetuples = MemoryContextAllocHuge(state->tuplecontext,
+                                                                                        spacePerTape);
+
+               /* Initialize state for tape */
+               state->mergetuples[srcTape] = mergetuples;
+               state->mergecurrent[srcTape] = mergetuples;
+               state->mergetail[srcTape] = mergetuples;
+               state->mergeoverflow[srcTape] = NULL;
+       }
+
+       state->batchUsed = true;
+       state->spacePerTape = spacePerTape;
+}
+
+/*
+ * mergebatchone - prepare batch memory for one merge input tape
+ *
+ * This is called following the exhaustion of preread tuples for one input
+ * tape.  All that actually occurs is that the state for the source tape is
+ * reset to indicate that all memory may be reused.
+ *
+ * This routine must deal with fixing up the tuple that is about to be returned
+ * to the client, due to "overflow" allocations.
+ */
+static void
+mergebatchone(Tuplesortstate *state, int srcTape, SortTuple *rtup,
+                         bool *should_free)
+{
+       Assert(state->batchUsed);
+
+       /*
+        * Tuple about to be returned to caller ("stup") is final preread tuple
+        * from tape, just removed from the top of the heap.  Special steps around
+        * memory management must be performed for that tuple, to make sure it
+        * isn't overwritten early.
+        */
+       if (!state->mergeoverflow[srcTape])
+       {
+               Size    tupLen;
+
+               /*
+                * Mark tuple buffer range for reuse, but be careful to move final,
+                * tail tuple to start of space for next run so that it's available
+                * to caller when stup is returned, and remains available at least
+                * until the next tuple is requested.
+                */
+               tupLen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
+               state->mergecurrent[srcTape] = state->mergetuples[srcTape];
+               memmove(state->mergecurrent[srcTape], state->mergetail[srcTape],
+                               tupLen);
+
+               /* Make SortTuple at top of the merge heap point to new tuple */
+               rtup->tuple = (void *) state->mergecurrent[srcTape];
+
+               state->mergetail[srcTape] = state->mergecurrent[srcTape];
+               state->mergecurrent[srcTape] += tupLen;
+       }
+       else
+       {
+               /*
+                * Handle an "overflow" retail palloc.
+                *
+                * This is needed when we run out of tuple memory for the tape.
+                */
+               state->mergecurrent[srcTape] = state->mergetuples[srcTape];
+               state->mergetail[srcTape] = state->mergetuples[srcTape];
+
+               if (rtup->tuple)
+               {
+                       Assert(rtup->tuple == (void *) state->mergeoverflow[srcTape]);
+                       /* Caller should free palloc'd tuple */
+                       *should_free = true;
                }
+               state->mergeoverflow[srcTape] = NULL;
+       }
+}
+
+/*
+ * mergebatchfreetape - handle final clean-up for batch memory once tape is
+ * about to become exhausted
+ *
+ * All tuples are returned from tape, but a single final tuple, *rtup, is to be
+ * passed back to caller.  Free tape's batch allocation buffer while ensuring
+ * that the final tuple is managed appropriately.
+ */
+static void
+mergebatchfreetape(Tuplesortstate *state, int srcTape, SortTuple *rtup,
+                                  bool *should_free)
+{
+       Assert(state->batchUsed);
+       Assert(state->status == TSS_FINALMERGE);
+
+       /*
+        * Tuple may or may not already be an overflow allocation from
+        * mergebatchone()
+        */
+       if (!*should_free && rtup->tuple)
+       {
+               /*
+                * Final tuple still in tape's batch allocation.
+                *
+                * Return palloc()'d copy to caller, and have it freed in a similar
+                * manner to overflow allocation.  Otherwise, we'd free batch memory
+                * and pass back a pointer to garbage.  Note that we deliberately
+                * allocate this in the parent tuplesort context, to be on the safe
+                * side.
+                */
+               Size            tuplen;
+               void       *oldTuple = rtup->tuple;
+
+               tuplen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
+               rtup->tuple = MemoryContextAlloc(state->sortcontext, tuplen);
+               memcpy(rtup->tuple, oldTuple, tuplen);
+               *should_free = true;
+       }
+
+       /* Free spacePerTape-sized buffer */
+       pfree(state->mergetuples[srcTape]);
+}
+
+/*
+ * mergebatchalloc - allocate memory for one tuple using a batch memory
+ * "logical allocation".
+ *
+ * This is used for the final on-the-fly merge phase only.  READTUP() routines
+ * receive memory from here in place of palloc() and USEMEM() calls.
+ *
+ * Tuple tapenum is passed, ensuring each tape's tuples are stored in sorted,
+ * contiguous order (while allowing safe reuse of memory made available to
+ * each tape).  This maximizes locality of access as tuples are returned by
+ * final merge.
+ *
+ * Caller must not subsequently attempt to free memory returned here.  In
+ * general, only mergebatch* functions know about how memory returned from
+ * here should be freed, and this function's caller must ensure that batch
+ * memory management code will definitely have the opportunity to do the right
+ * thing during the final on-the-fly merge.
+ */
+static void *
+mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen)
+{
+       Size            reserve_tuplen = MAXALIGN(tuplen);
+       char       *ret;
+
+       /* Should overflow at most once before mergebatchone() call: */
+       Assert(state->mergeoverflow[tapenum] == NULL);
+       Assert(state->batchUsed);
+
+       /* It should be possible to use precisely spacePerTape memory at once */
+       if (state->mergecurrent[tapenum] + reserve_tuplen <=
+               state->mergetuples[tapenum] + state->spacePerTape)
+       {
+               /*
+                * Usual case -- caller is returned pointer into its tape's buffer, and
+                * an offset from that point is recorded as where tape has consumed up
+                * to for current round of preloading.
+                */
+               ret = state->mergetail[tapenum] = state->mergecurrent[tapenum];
+               state->mergecurrent[tapenum] += reserve_tuplen;
        }
+       else
+       {
+               /*
+                * Allocate memory, and record as tape's overflow allocation.  This
+                * will be detected quickly, in a similar fashion to a LACKMEM()
+                * condition, and should not happen again before a new round of
+                * preloading for caller's tape.  Note that we deliberately allocate
+                * this in the parent tuplesort context, to be on the safe side.
+                *
+                * Sometimes, this does not happen because merging runs out of slots
+                * before running out of memory.
+                */
+               ret = state->mergeoverflow[tapenum] =
+                       MemoryContextAlloc(state->sortcontext, tuplen);
+       }
+
+       return ret;
 }
 
 /*
@@ -2576,7 +2999,9 @@ beginmerge(Tuplesortstate *state)
  * that state and so no point in scanning through all the tapes to fix one.
  * (Moreover, there may be quite a lot of inactive tapes in that state, since
  * we might have had many fewer runs than tapes.  In a regular tape-to-tape
- * merge we can expect most of the tapes to be active.)
+ * merge we can expect most of the tapes to be active.  Plus, only
+ * FINALMERGE state has to consider memory management for a batch
+ * allocation.)
  */
 static void
 mergepreread(Tuplesortstate *state)
@@ -2605,9 +3030,20 @@ mergeprereadone(Tuplesortstate *state, int srcTape)
 
        if (!state->mergeactive[srcTape])
                return;                                 /* tape's run is already exhausted */
+
+       /*
+        * Manage per-tape availMem.  Only actually matters when batch memory not
+        * in use.
+        */
        priorAvail = state->availMem;
        state->availMem = state->mergeavailmem[srcTape];
-       while ((state->mergeavailslots[srcTape] > 0 && !LACKMEM(state)) ||
+
+       /*
+        * When batch memory is used if final on-the-fly merge, only mergeoverflow
+        * test is relevant; otherwise, only LACKMEM() test is relevant.
+        */
+       while ((state->mergeavailslots[srcTape] > 0 &&
+                       state->mergeoverflow[srcTape] == NULL && !LACKMEM(state)) ||
                   state->mergenext[srcTape] == 0)
        {
                /* read next tuple, if any */
@@ -3093,6 +3529,42 @@ markrunend(Tuplesortstate *state, int tapenum)
        LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
 }
 
+/*
+ * Get memory for tuple from within READTUP() routine.  Allocate
+ * memory and account for that, or consume from tape's batch
+ * allocation.
+ *
+ * Memory returned here in the final on-the-fly merge case is recycled
+ * from tape's batch allocation.  Otherwise, callers must pfree() or
+ * reset tuple child memory context, and account for that with a
+ * FREEMEM().  Currently, this only ever needs to happen in WRITETUP()
+ * routines.
+ */
+static void *
+readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen)
+{
+       if (state->batchUsed)
+       {
+               /*
+                * No USEMEM() call, because during final on-the-fly merge
+                * accounting is based on tape-private state. ("Overflow"
+                * allocations are detected as an indication that a new round
+                * or preloading is required. Preloading marks existing
+                * contents of tape's batch buffer for reuse.)
+                */
+               return mergebatchalloc(state, tapenum, tuplen);
+       }
+       else
+       {
+               char       *ret;
+
+               /* Batch allocation yet to be performed */
+               ret = MemoryContextAlloc(state->tuplecontext, tuplen);
+               USEMEM(state, GetMemoryChunkSpace(ret));
+               return ret;
+       }
+}
+
 
 /*
  * Routines specialized for HeapTuple (actually MinimalTuple) case
@@ -3171,6 +3643,7 @@ copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
        Datum           original;
        MinimalTuple tuple;
        HeapTupleData htup;
+       MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
 
        /* copy the tuple into sort storage */
        tuple = ExecCopySlotMinimalTuple(slot);
@@ -3184,6 +3657,8 @@ copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
                                                        state->tupDesc,
                                                        &stup->isnull1);
 
+       MemoryContextSwitchTo(oldcontext);
+
        if (!state->sortKeys->abbrev_converter || stup->isnull1)
        {
                /*
@@ -3266,11 +3741,10 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
 {
        unsigned int tupbodylen = len - sizeof(int);
        unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
-       MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
+       MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tapenum, tuplen);
        char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
        HeapTupleData htup;
 
-       USEMEM(state, GetMemoryChunkSpace(tuple));
        /* read in the tuple proper */
        tuple->t_len = tuplen;
        LogicalTapeReadExact(state->tapeset, tapenum,
@@ -3409,12 +3883,15 @@ copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
 {
        HeapTuple       tuple = (HeapTuple) tup;
        Datum           original;
+       MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
 
        /* copy the tuple into sort storage */
        tuple = heap_copytuple(tuple);
        stup->tuple = (void *) tuple;
        USEMEM(state, GetMemoryChunkSpace(tuple));
 
+       MemoryContextSwitchTo(oldcontext);
+
        /*
         * set up first-column key value, and potentially abbreviate, if it's a
         * simple column
@@ -3501,9 +3978,10 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
                                int tapenum, unsigned int tuplen)
 {
        unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
-       HeapTuple       tuple = (HeapTuple) palloc(t_len + HEAPTUPLESIZE);
+       HeapTuple       tuple = (HeapTuple) readtup_alloc(state,
+                                                                                                 tapenum,
+                                                                                                 t_len + HEAPTUPLESIZE);
 
-       USEMEM(state, GetMemoryChunkSpace(tuple));
        /* Reconstruct the HeapTupleData header */
        tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
        tuple->t_len = t_len;
@@ -3722,7 +4200,7 @@ copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
        Datum           original;
 
        /* copy the tuple into sort storage */
-       newtuple = (IndexTuple) palloc(tuplen);
+       newtuple = (IndexTuple) MemoryContextAlloc(state->tuplecontext, tuplen);
        memcpy(newtuple, tuple, tuplen);
        USEMEM(state, GetMemoryChunkSpace(newtuple));
        stup->tuple = (void *) newtuple;
@@ -3804,9 +4282,8 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
                          int tapenum, unsigned int len)
 {
        unsigned int tuplen = len - sizeof(unsigned int);
-       IndexTuple      tuple = (IndexTuple) palloc(tuplen);
+       IndexTuple      tuple = (IndexTuple) readtup_alloc(state, tapenum, tuplen);
 
-       USEMEM(state, GetMemoryChunkSpace(tuple));
        LogicalTapeReadExact(state->tapeset, tapenum,
                                                 tuple, tuplen);
        if (state->randomAccess)        /* need trailing length word? */
@@ -3864,7 +4341,7 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
                waddr = NULL;
                tuplen = 0;
        }
-       else if (state->datumTypeByVal)
+       else if (!state->tuples)
        {
                waddr = &stup->datum1;
                tuplen = sizeof(Datum);
@@ -3906,7 +4383,7 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
                stup->isnull1 = true;
                stup->tuple = NULL;
        }
-       else if (state->datumTypeByVal)
+       else if (!state->tuples)
        {
                Assert(tuplen == sizeof(Datum));
                LogicalTapeReadExact(state->tapeset, tapenum,
@@ -3916,14 +4393,13 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
        }
        else
        {
-               void       *raddr = palloc(tuplen);
+               void       *raddr = readtup_alloc(state, tapenum, tuplen);
 
                LogicalTapeReadExact(state->tapeset, tapenum,
                                                         raddr, tuplen);
                stup->datum1 = PointerGetDatum(raddr);
                stup->isnull1 = false;
                stup->tuple = raddr;
-               USEMEM(state, GetMemoryChunkSpace(raddr));
        }
 
        if (state->randomAccess)        /* need trailing length word? */