Make heap TID a tiebreaker nbtree index column.
index dbedc270537f536de1b12ef0c41e239217e62e33..3eebd9ef51cf4ead7da7490c2af4a9149f1e76f4 100644
  * algorithm.
  *
  * See Knuth, volume 3, for more than you want to know about the external
- * sorting algorithm.  We divide the input into sorted runs using replacement
- * selection, in the form of a priority tree implemented as a heap
- * (essentially his Algorithm 5.2.3H), then merge the runs using polyphase
- * merge, Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D
- * are implemented by logtape.c, which avoids space wastage by recycling
- * disk space as soon as each block is read from its "tape".
- *
- * We do not form the initial runs using Knuth's recommended replacement
- * selection data structure (Algorithm 5.4.1R), because it uses a fixed
- * number of records in memory at all times.  Since we are dealing with
- * tuples that may vary considerably in size, we want to be able to vary
- * the number of records kept in memory to ensure full utilization of the
- * allowed sort memory space.  So, we keep the tuples in a variable-size
- * heap, with the next record to go out at the top of the heap.  Like
- * Algorithm 5.4.1R, each record is stored with the run number that it
- * must go into, and we use (run number, key) as the ordering key for the
- * heap.  When the run number at the top of the heap changes, we know that
- * no more records of the prior run are left in the heap.
+ * sorting algorithm.  Historically, we divided the input into sorted runs
+ * using replacement selection, in the form of a priority tree implemented
+ * as a heap (essentially his Algorithm 5.2.3H), but now we always use
+ * quicksort for run generation.  We merge the runs using polyphase merge,
+ * Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D are
+ * implemented by logtape.c, which avoids space wastage by recycling disk
+ * space as soon as each block is read from its "tape".
  *
  * The approximate amount of memory allowed for any one sort operation
  * is specified in kilobytes by the caller (most pass work_mem).  Initially,
  * we haven't exceeded workMem.  If we reach the end of the input without
  * exceeding workMem, we sort the array using qsort() and subsequently return
  * tuples just by scanning the tuple array sequentially.  If we do exceed
- * workMem, we construct a heap using Algorithm H and begin to emit tuples
- * into sorted runs in temporary tapes, emitting just enough tuples at each
- * step to get back within the workMem limit.  Whenever the run number at
- * the top of the heap changes, we begin a new run with a new output tape
- * (selected per Algorithm D).  After the end of the input is reached,
- * we dump out remaining tuples in memory into a final run (or two),
+ * workMem, we begin to emit tuples into sorted runs in temporary tapes.
+ * When tuples are dumped in batch after quicksorting, we begin a new run
+ * with a new output tape (selected per Algorithm D).  After the end of the
+ * input is reached, we dump out remaining tuples in memory into a final run,
  * then merge the runs using Algorithm D.
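 *
 * The run-generation strategy described above can be illustrated with a small
 * standalone sketch: accumulate input until a memory budget is reached,
 * quicksort the buffer, and write it out as one sorted run.  The names below
 * are hypothetical and the sketch works on plain ints in temporary files
 * rather than SortTuples on logical tapes.

#include <stdio.h>
#include <stdlib.h>

/* Hypothetical helper, not part of tuplesort.c: compare two ints for qsort */
static int
cmp_int(const void *a, const void *b)
{
	int		x = *(const int *) a;
	int		y = *(const int *) b;

	return (x > y) - (x < y);
}

/*
 * Read ints from "in" and write one temporary file per sorted run,
 * quicksorting whole buffers at a time.  Assumes runs[] has room for every
 * run produced.  Illustrative sketch only.
 */
static int
generate_runs(FILE *in, FILE *runs[], size_t budget)
{
	int	   *buf = malloc(budget * sizeof(int));
	size_t	n = 0;
	int		nruns = 0;
	int		v;

	while (fscanf(in, "%d", &v) == 1)
	{
		buf[n++] = v;
		if (n == budget)
		{
			/* memory budget reached: sort and dump the buffer as one run */
			qsort(buf, n, sizeof(int), cmp_int);
			runs[nruns] = tmpfile();
			fwrite(buf, sizeof(int), n, runs[nruns]);
			nruns++;
			n = 0;
		}
	}
	if (n > 0)
	{
		/* remaining tuples in memory become the final run */
		qsort(buf, n, sizeof(int), cmp_int);
		runs[nruns] = tmpfile();
		fwrite(buf, sizeof(int), n, runs[nruns]);
		nruns++;
	}
	free(buf);
	return nruns;
}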
  *
  * When merging runs, we use a heap containing just the frontmost tuple from
- * each source run; we repeatedly output the smallest tuple and insert the
- * next tuple from its source tape (if any).  When the heap empties, the merge
- * is complete.  The basic merge algorithm thus needs very little memory ---
- * only M tuples for an M-way merge, and M is constrained to a small number.
- * However, we can still make good use of our full workMem allocation by
- * pre-reading additional tuples from each source tape.  Without prereading,
- * our access pattern to the temporary file would be very erratic; on average
- * we'd read one block from each of M source tapes during the same time that
- * we're writing M blocks to the output tape, so there is no sequentiality of
- * access at all, defeating the read-ahead methods used by most Unix kernels.
- * Worse, the output tape gets written into a very random sequence of blocks
- * of the temp file, ensuring that things will be even worse when it comes
- * time to read that tape.  A straightforward merge pass thus ends up doing a
- * lot of waiting for disk seeks.  We can improve matters by prereading from
- * each source tape sequentially, loading about workMem/M bytes from each tape
- * in turn.  Then we run the merge algorithm, writing but not reading until
- * one of the preloaded tuple series runs out.  Then we switch back to preread
- * mode, fill memory again, and repeat.  This approach helps to localize both
- * read and write accesses.
+ * each source run; we repeatedly output the smallest tuple and replace it
+ * with the next tuple from its source tape (if any).  When the heap empties,
+ * the merge is complete.  The basic merge algorithm thus needs very little
+ * memory --- only M tuples for an M-way merge, and M is constrained to a
+ * small number.  However, we can still make good use of our full workMem
+ * allocation by pre-reading additional blocks from each source tape.  Without
+ * prereading, our access pattern to the temporary file would be very erratic;
+ * on average we'd read one block from each of M source tapes during the same
+ * time that we're writing M blocks to the output tape, so there is no
+ * sequentiality of access at all, defeating the read-ahead methods used by
+ * most Unix kernels.  Worse, the output tape gets written into a very random
+ * sequence of blocks of the temp file, ensuring that things will be even
+ * worse when it comes time to read that tape.  A straightforward merge pass
+ * thus ends up doing a lot of waiting for disk seeks.  We can improve matters
+ * by prereading from each source tape sequentially, loading about workMem/M
+ * bytes from each tape in turn, and making the sequential blocks immediately
+ * available for reuse.  This approach helps to localize both read and write
+ * accesses.  The pre-reading is handled by logtape.c; we just tell it how
+ * much memory to use for the buffers.
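 *
 * The merge step can likewise be sketched in isolation: keep only the
 * frontmost element of each run in a binary min-heap, repeatedly emit the
 * smallest, and replace the heap top with the next element from the same run
 * (or shrink the heap when that run is exhausted).  This is a standalone
 * companion to the run-generation sketch above, not the actual tuplesort
 * merge code.

#include <stdio.h>

typedef struct
{
	int			value;			/* frontmost not-yet-output value of the run */
	int			run;			/* which run it came from */
} HeapEntry;

/* Sift the entry at position i down to restore min-heap order */
static void
sift_down(HeapEntry *heap, int n, int i)
{
	for (;;)
	{
		int			left = 2 * i + 1;
		int			right = left + 1;
		int			smallest = i;
		HeapEntry	tmp;

		if (left < n && heap[left].value < heap[smallest].value)
			smallest = left;
		if (right < n && heap[right].value < heap[smallest].value)
			smallest = right;
		if (smallest == i)
			break;
		tmp = heap[i];
		heap[i] = heap[smallest];
		heap[smallest] = tmp;
		i = smallest;
	}
}

/* Merge nruns sorted runs of ints into "out"; heap[] must have nruns slots */
static void
merge_runs(FILE *runs[], int nruns, HeapEntry *heap, FILE *out)
{
	int			n = 0;
	int			i;

	/* Load the frontmost element of each run, then heapify bottom-up */
	for (i = 0; i < nruns; i++)
	{
		rewind(runs[i]);
		if (fread(&heap[n].value, sizeof(int), 1, runs[i]) == 1)
			heap[n++].run = i;
	}
	for (i = n / 2 - 1; i >= 0; i--)
		sift_down(heap, n, i);

	while (n > 0)
	{
		int			src = heap[0].run;

		fprintf(out, "%d\n", heap[0].value);

		/* Replace the top with the next element from the same run ... */
		if (fread(&heap[0].value, sizeof(int), 1, runs[src]) == 1)
			sift_down(heap, n, 0);
		else
		{
			/* ... or shrink the heap when that run is exhausted */
			heap[0] = heap[n - 1];
			n--;
			sift_down(heap, n, 0);
		}
	}
}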
  *
  * When the caller requests random access to the sort result, we form
  * the final sorted run on a logical tape which is then "frozen", so
  * code we determine the number of tapes M on the basis of workMem: we want
  * workMem/M to be large enough that we read a fair amount of data each time
  * we preread from a tape, so as to maintain the locality of access described
- * above.  Nonetheless, with large workMem we can have many tapes.
+ * above.  Nonetheless, with large workMem we can have many tapes (but not
+ * too many -- see the comments in tuplesort_merge_order).
+ *
+ * This module supports parallel sorting.  Parallel sorts involve coordination
+ * among one or more worker processes, and a leader process, each with its own
+ * tuplesort state.  The leader process (or, more accurately, the
+ * Tuplesortstate associated with a leader process) creates a full tapeset
+ * consisting of worker tapes, with one run to merge per worker process.
+ * These runs are then merged.  Worker processes are guaranteed to
+ * produce exactly one output run from their partial input.
  *
  *
- * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
 
 #include <limits.h>
 
+#include "access/hash.h"
 #include "access/htup_details.h"
 #include "access/nbtree.h"
 #include "catalog/index.h"
 #define DATUM_SORT             2
 #define CLUSTER_SORT   3
 
+/* Parallelism code (0 = serial, 1 = worker, 2 = leader) for sort__start probes */
+#define PARALLEL_SORT(state)   ((state)->shared == NULL ? 0 : \
+                                                                (state)->worker >= 0 ? 1 : 2)
+
 /* GUC variables */
 #ifdef TRACE_SORT
 bool           trace_sort = false;
@@ -138,9 +139,9 @@ bool                optimize_bounded_sort = true;
  * The objects we actually sort are SortTuple structs.  These contain
  * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
  * which is a separate palloc chunk --- we assume it is just one chunk and
- * can be freed by a simple pfree() (except during final on-the-fly merge,
- * when memory is used in batch).  SortTuples also contain the tuple's
- * first key column in Datum/nullflag format, and an index integer.
+ * can be freed by a simple pfree() (except during merge, when we use a
+ * simple slab allocator).  SortTuples also contain the tuple's first key
+ * column in Datum/nullflag format, and an index integer.
  *
  * Storing the first key column lets us save heap_getattr or index_getattr
  * calls during tuple comparisons.  We could extract and save all the key
@@ -162,20 +163,35 @@ bool              optimize_bounded_sort = true;
  * described above.  Accordingly, "tuple" is always used in preference to
  * datum1 as the authoritative value for pass-by-reference cases.
  *
- * While building initial runs, tupindex holds the tuple's run number.  During
- * merge passes, we re-use it to hold the input tape number that each tuple in
- * the heap was read from, or to hold the index of the next tuple pre-read
- * from the same tape in the case of pre-read entries.  tupindex goes unused
- * if the sort occurs entirely in memory.
+ * tupindex holds the input tape number that each tuple in the heap was read
+ * from during merge passes.
  */
 typedef struct
 {
-       void       *tuple;                      /* the tuple proper */
+       void       *tuple;                      /* the tuple itself */
        Datum           datum1;                 /* value of first key column */
        bool            isnull1;                /* is first key column NULL? */
        int                     tupindex;               /* see notes above */
 } SortTuple;
 
+/*
+ * During merge, we use a pre-allocated set of fixed-size slots to hold
+ * tuples, to avoid palloc/pfree overhead.
+ *
+ * Merge doesn't require a lot of memory, so we can afford to waste some,
+ * by using gratuitously-sized slots.  If a tuple is larger than 1 kB, the
+ * palloc() overhead is not significant anymore.
+ *
+ * 'nextfree' is valid when this chunk is in the free list.  When in use, the
+ * slot holds a tuple.
+ */
+#define SLAB_SLOT_SIZE 1024
+
+typedef union SlabSlot
+{
+       union SlabSlot *nextfree;
+       char            buffer[SLAB_SLOT_SIZE];
+} SlabSlot;
 
 /*
  * Possible states of a Tuplesort object.  These denote the states that
@@ -195,19 +211,20 @@ typedef enum
  * Parameters for calculation of number of tapes to use --- see inittapes()
  * and tuplesort_merge_order().
  *
- * In this calculation we assume that each tape will cost us about 3 blocks
- * worth of buffer space (which is an underestimate for very large data
- * volumes, but it's probably close enough --- see logtape.c).
+ * In this calculation we assume that each tape will cost us about 1 block's
+ * worth of buffer space.  This ignores the overhead of all the other data
+ * structures needed for each tape, but it's probably close enough.
  *
  * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
  * tape during a preread cycle (see discussion at top of file).
  */
 #define MINORDER               6               /* minimum merge order */
-#define TAPE_BUFFER_OVERHEAD           (BLCKSZ * 3)
+#define MAXORDER               500             /* maximum merge order */
+#define TAPE_BUFFER_OVERHEAD           BLCKSZ
 #define MERGE_BUFFER_SIZE                      (BLCKSZ * 32)
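
A sketch of how these constants might combine into a merge order, in the
spirit of tuplesort_merge_order(): each input tape is charged
TAPE_BUFFER_OVERHEAD plus MERGE_BUFFER_SIZE of workMem, the output tape
another TAPE_BUFFER_OVERHEAD, and the result is clamped to
[MINORDER, MAXORDER].  This approximates the real function rather than
reproducing it.

/* Approximation of tuplesort_merge_order(), for illustration only */
static int
merge_order_sketch(int64 allowedMem)
{
	int64		mOrder;

	/* one output tape, plus per-input-tape buffer and workspace costs */
	mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
		(MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);

	if (mOrder < MINORDER)
		mOrder = MINORDER;		/* always allow a usable minimum merge order */
	if (mOrder > MAXORDER)
		mOrder = MAXORDER;		/* very wide merges become cache-unfriendly */

	return (int) mOrder;
}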
 
 typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
-                                                                                               Tuplesortstate *state);
+                                                                       Tuplesortstate *state);
 
 /*
  * Private state of a Tuplesort operation.
@@ -227,7 +244,7 @@ struct Tuplesortstate
        int                     maxTapes;               /* number of tapes (Knuth's T) */
        int                     tapeRange;              /* maxTapes-1 (Knuth's P) */
        MemoryContext sortcontext;      /* memory context holding most sort data */
-       MemoryContext tuplecontext;     /* sub-context of sortcontext for tuple data */
+       MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
        LogicalTapeSet *tapeset;        /* logtape.c object for tapes in a temp file */
 
        /*
@@ -252,31 +269,28 @@ struct Tuplesortstate
        /*
         * Function to write a stored tuple onto tape.  The representation of the
         * tuple on tape need not be the same as it is in memory; requirements on
-        * the tape representation are given below.  After writing the tuple,
-        * pfree() the out-of-line data (not the SortTuple struct!), and increase
-        * state->availMem by the amount of memory space thereby released.
+        * the tape representation are given below.  Unless the slab allocator is
+        * used, after writing the tuple, pfree() the out-of-line data (not the
+        * SortTuple struct!), and increase state->availMem by the amount of
+        * memory space thereby released.
         */
        void            (*writetup) (Tuplesortstate *state, int tapenum,
-                                                                                SortTuple *stup);
+                                                        SortTuple *stup);
 
        /*
         * Function to read a stored tuple from tape back into memory. 'len' is
-        * the already-read length of the stored tuple.  Create a palloc'd copy,
-        * initialize tuple/datum1/isnull1 in the target SortTuple struct, and
-        * decrease state->availMem by the amount of memory space consumed.
+        * the already-read length of the stored tuple.  The tuple is allocated
+        * from the slab memory arena, or is palloc'd, see readtup_alloc().
         */
        void            (*readtup) (Tuplesortstate *state, SortTuple *stup,
-                                                                               int tapenum, unsigned int len);
+                                                       int tapenum, unsigned int len);
 
        /*
         * This array holds the tuples now in sort memory.  If we are in state
         * INITIAL, the tuples are in no particular order; if we are in state
         * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
         * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
-        * H.  (Note that memtupcount only counts the tuples that are part of the
-        * heap --- during merge passes, memtuples[] entries beyond tapeRange are
-        * never in the heap and are used to hold pre-read tuples.)  In state
-        * SORTEDONTAPE, the array is not used.
+        * H.  In state SORTEDONTAPE, the array is not used.
         */
        SortTuple  *memtuples;          /* array of SortTuple structs */
        int                     memtupcount;    /* number of tuples currently present */
@@ -284,17 +298,52 @@ struct Tuplesortstate
        bool            growmemtuples;  /* memtuples' growth still underway? */
 
        /*
-        * Memory for tuples is sometimes allocated in batch, rather than
-        * incrementally.  This implies that incremental memory accounting has been
-        * abandoned.  Currently, this only happens for the final on-the-fly merge
-        * step.  Large batch allocations can store tuples (e.g. IndexTuples)
-        * without palloc() fragmentation and other overhead.
+        * Memory for tuples is sometimes allocated using a simple slab allocator,
+        * rather than with palloc().  Currently, we switch to slab allocation
+        * when we start merging.  Merging only needs to keep a small, fixed
+        * number of tuples in memory at any time, so we can avoid the
+        * palloc/pfree overhead by recycling a fixed number of fixed-size slots
+        * to hold the tuples.
+        *
+        * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
+        * slots.  The allocation is sized to have one slot per tape, plus one
+        * additional slot.  We need that many slots to hold all the tuples kept
+        * in the heap during merge, plus the one most recently returned from the
+        * sort by tuplesort_gettuple.
+        *
+        * Initially, all the slots are kept in a linked list of free slots.  When
+        * a tuple is read from a tape, it is put to the next available slot, if
+        * a tuple is read from a tape, it is put into the next available slot, if
+        * instead.
+        *
+        * When we're done processing a tuple, we return the slot back to the free
+        * list, or pfree() it if it was palloc'd.  We know that a tuple was
+        * allocated from the slab if its pointer value is between
+        * slabMemoryBegin and -End.
+        *
+        * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
+        * tracking memory usage is not used.
+        */
+       bool            slabAllocatorUsed;
+
+       char       *slabMemoryBegin;    /* beginning of slab memory arena */
+       char       *slabMemoryEnd;      /* end of slab memory arena */
+       SlabSlot   *slabFreeHead;       /* head of free list */
+
+       /* Buffer size to use for reading input tapes, during merge. */
+       size_t          read_buffer_size;
+
+       /*
+        * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
+        * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
+        * modes), we remember the tuple in 'lastReturnedTuple', so that we can
+        * recycle the memory on next gettuple call.
         */
-       bool            batchUsed;
+       void       *lastReturnedTuple;
 
        /*
-        * While building initial runs, this is the current output run number
-        * (starting at 0).  Afterwards, it is the number of initial runs we made.
+        * While building initial runs, this is the current output run number.
+        * Afterwards, it is the number of initial runs we made.
         */
        int                     currentRun;
 
@@ -304,42 +353,11 @@ struct Tuplesortstate
         */
 
        /*
-        * These variables are only used during merge passes.  mergeactive[i] is
-        * true if we are reading an input run from (actual) tape number i and
-        * have not yet exhausted that run.  mergenext[i] is the memtuples index
-        * of the next pre-read tuple (next to be loaded into the heap) for tape
-        * i, or 0 if we are out of pre-read tuples.  mergelast[i] similarly
-        * points to the last pre-read tuple from each tape.  mergeavailslots[i]
-        * is the number of unused memtuples[] slots reserved for tape i, and
-        * mergeavailmem[i] is the amount of unused space allocated for tape i.
-        * mergefreelist and mergefirstfree keep track of unused locations in the
-        * memtuples[] array.  The memtuples[].tupindex fields link together
-        * pre-read tuples for each tape as well as recycled locations in
-        * mergefreelist. It is OK to use 0 as a null link in these lists, because
-        * memtuples[0] is part of the merge heap and is never a pre-read tuple.
+        * This variable is only used during merge passes.  mergeactive[i] is true
+        * if we are reading an input run from (actual) tape number i and have not
+        * yet exhausted that run.
         */
        bool       *mergeactive;        /* active input run source? */
-       int                *mergenext;          /* first preread tuple for each source */
-       int                *mergelast;          /* last preread tuple for each source */
-       int                *mergeavailslots;    /* slots left for prereading each tape */
-       int64      *mergeavailmem;      /* availMem for prereading each tape */
-       int                     mergefreelist;  /* head of freelist of recycled slots */
-       int                     mergefirstfree; /* first slot never used in this merge */
-
-       /*
-        * Per-tape batch state, when final on-the-fly merge consumes memory from
-        * just a few large allocations.
-        *
-        * Aside from the general benefits of performing fewer individual retail
-        * palloc() calls, this also helps make merging more cache efficient, since
-        * each tape's tuples must naturally be accessed sequentially (in sorted
-        * order).
-        */
-       int64           spacePerTape;   /* Space (memory) for tuples (not slots) */
-       char      **mergetuples;        /* Each tape's memory allocation */
-       char      **mergecurrent;       /* Current offset into each tape's memory */
-       char      **mergetail;          /* Last item's start point for each tape */
-       char      **mergeoverflow;      /* Retail palloc() "overflow" for each tape */
 
        /*
         * Variables for Algorithm D.  Note that destTape is a "logical" tape
@@ -368,6 +386,25 @@ struct Tuplesortstate
        int                     markpos_offset; /* saved "current", or offset in tape block */
        bool            markpos_eof;    /* saved "eof_reached" */
 
+       /*
+        * These variables are used during parallel sorting.
+        *
+        * worker is our worker identifier.  Follows the general convention that
+        * a value of -1 denotes a leader tuplesort, and values >= 0 denote worker
+        * tuplesorts.  (-1 can also be a serial tuplesort.)
+        *
+        * shared is mutable shared memory state, which is used to coordinate
+        * parallel sorts.
+        *
+        * nParticipants is the number of worker Tuplesortstates known by the
+        * leader to have actually been launched, which implies that they must
+        * finish a run the leader can merge.  Typically includes a worker state held
+        * by the leader process itself.  Set in the leader Tuplesortstate only.
+        */
+       int                     worker;
+       Sharedsort *shared;
+       int                     nParticipants;
+
        /*
         * The sortKeys variable is used by every case other than the hash index
         * case; it is set by tuplesort_begin_xxx.  tupDesc is only used by the
@@ -409,7 +446,9 @@ struct Tuplesortstate
        bool            enforceUnique;  /* complain if we find duplicate tuples */
 
        /* These are specific to the index_hash subcase: */
-       uint32          hash_mask;              /* mask for sortable part of hash code */
+       uint32          high_mask;              /* masks for sortable part of hash code */
+       uint32          low_mask;
+       uint32          max_buckets;
 
        /*
         * These variables are specific to the Datum case; they are set by
@@ -427,13 +466,72 @@ struct Tuplesortstate
 #endif
 };
 
+/*
+ * Private mutable state of a parallel tuplesort operation.  This is allocated
+ * in shared memory.
+ */
+struct Sharedsort
+{
+       /* mutex protects all fields prior to tapes */
+       slock_t         mutex;
+
+       /*
+        * currentWorker generates ordinal identifier numbers for parallel sort
+        * workers.  These start from 0, and are always gapless.
+        *
+        * Workers increment workersFinished to indicate having finished.  If this
+        * is equal to state.nParticipants within the leader, the leader is ready to
+        * merge worker runs.
+        */
+       int                     currentWorker;
+       int                     workersFinished;
+
+       /* Temporary file space */
+       SharedFileSet fileset;
+
+       /* Size of tapes flexible array */
+       int                     nTapes;
+
+       /*
+        * Tapes array used by workers to report back information needed by the
+        * leader to concatenate all worker tapes into one for merging
+        */
+       TapeShare       tapes[FLEXIBLE_ARRAY_MEMBER];
+};
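
A sketch of how the mutex-protected fields above are used, in the spirit of
worker_get_identifier() and of the bookkeeping a worker performs when it
finishes its run.  The exact routines in this file differ in detail; treat
this as an approximation.

/* Hand out the next gapless worker ordinal (approximation, not exact body) */
static int
assign_worker_identifier(Sharedsort *shared)
{
	int			worker;

	SpinLockAcquire(&shared->mutex);
	worker = shared->currentWorker++;
	SpinLockRelease(&shared->mutex);

	return worker;
}

/* Publish a worker's frozen tape and bump the finished-worker count */
static void
report_worker_finished(Sharedsort *shared, int worker, TapeShare output)
{
	SpinLockAcquire(&shared->mutex);
	shared->tapes[worker] = output;
	shared->workersFinished++;
	SpinLockRelease(&shared->mutex);
}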
+
+/*
+ * Is the given tuple allocated from the slab memory arena?
+ */
+#define IS_SLAB_SLOT(state, tuple) \
+       ((char *) (tuple) >= (state)->slabMemoryBegin && \
+        (char *) (tuple) < (state)->slabMemoryEnd)
+
+/*
+ * Return the given tuple to the slab memory free list, or free it
+ * if it was palloc'd.
+ */
+#define RELEASE_SLAB_SLOT(state, tuple) \
+       do { \
+               SlabSlot *buf = (SlabSlot *) tuple; \
+               \
+               if (IS_SLAB_SLOT((state), buf)) \
+               { \
+                       buf->nextfree = (state)->slabFreeHead; \
+                       (state)->slabFreeHead = buf; \
+               } else \
+                       pfree(buf); \
+       } while(0)
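
The allocation-side counterpart of RELEASE_SLAB_SLOT looks roughly like the
following, along the lines of readtup_alloc(): pop a slot from the free list
if the tuple fits in SLAB_SLOT_SIZE, and otherwise fall back to an ordinary
context allocation.  This is an approximation, not the exact function body.

/* Approximation of readtup_alloc(): get memory for one tuple during merge */
static void *
slab_slot_alloc(Tuplesortstate *state, Size tuplen)
{
	SlabSlot   *buf;

	if (tuplen > SLAB_SLOT_SIZE || state->slabFreeHead == NULL)
		return MemoryContextAlloc(state->sortcontext, tuplen);

	/* Reuse the first free slot */
	buf = state->slabFreeHead;
	state->slabFreeHead = buf->nextfree;

	return buf;
}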
+
 #define COMPARETUP(state,a,b)  ((*(state)->comparetup) (a, b, state))
 #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
 #define WRITETUP(state,tape,stup)      ((*(state)->writetup) (state, tape, stup))
 #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
-#define LACKMEM(state)         ((state)->availMem < 0 && !(state)->batchUsed)
+#define LACKMEM(state)         ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
 #define USEMEM(state,amt)      ((state)->availMem -= (amt))
 #define FREEMEM(state,amt)     ((state)->availMem += (amt))
+#define SERIAL(state)          ((state)->shared == NULL)
+#define WORKER(state)          ((state)->shared && (state)->worker != -1)
+#define LEADER(state)          ((state)->shared && (state)->worker == -1)
 
 /*
  * NOTES about on-tape representation of tuples:
@@ -490,33 +588,30 @@ struct Tuplesortstate
        } while(0)
 
 
-static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess);
+static Tuplesortstate *tuplesort_begin_common(int workMem,
+                                          SortCoordinate coordinate,
+                                          bool randomAccess);
 static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
 static bool consider_abort_common(Tuplesortstate *state);
-static void inittapes(Tuplesortstate *state);
+static void inittapes(Tuplesortstate *state, bool mergeruns);
+static void inittapestate(Tuplesortstate *state, int maxTapes);
 static void selectnewtape(Tuplesortstate *state);
+static void init_slab_allocator(Tuplesortstate *state, int numSlots);
 static void mergeruns(Tuplesortstate *state);
 static void mergeonerun(Tuplesortstate *state);
-static void beginmerge(Tuplesortstate *state, bool finalMerge);
-static void batchmemtuples(Tuplesortstate *state);
-static void mergebatch(Tuplesortstate *state, int64 spacePerTape);
-static void mergebatchone(Tuplesortstate *state, int srcTape,
-                                         SortTuple *stup, bool *should_free);
-static void mergebatchfreetape(Tuplesortstate *state, int srcTape,
-                                                          SortTuple *rtup, bool *should_free);
-static void *mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen);
-static void mergepreread(Tuplesortstate *state);
-static void mergeprereadone(Tuplesortstate *state, int srcTape);
+static void beginmerge(Tuplesortstate *state);
+static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
 static void dumptuples(Tuplesortstate *state, bool alltuples);
 static void make_bounded_heap(Tuplesortstate *state);
 static void sort_bounded_heap(Tuplesortstate *state);
-static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
-                                         int tupleindex, bool checkIndex);
-static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
+static void tuplesort_sort_memtuples(Tuplesortstate *state);
+static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
+static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
+static void tuplesort_heap_delete_top(Tuplesortstate *state);
 static void reversedirection(Tuplesortstate *state);
 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
 static void markrunend(Tuplesortstate *state, int tapenum);
-static void *readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen);
+static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
 static int comparetup_heap(const SortTuple *a, const SortTuple *b,
                                Tuplesortstate *state);
 static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
@@ -547,6 +642,10 @@ static void writetup_datum(Tuplesortstate *state, int tapenum,
                           SortTuple *stup);
 static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
                          int tapenum, unsigned int len);
+static int     worker_get_identifier(Tuplesortstate *state);
+static void worker_freeze_result_tape(Tuplesortstate *state);
+static void worker_nomergeruns(Tuplesortstate *state);
+static void leader_takeover_tapes(Tuplesortstate *state);
 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
 
 /*
@@ -579,37 +678,38 @@ static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
  */
 
 static Tuplesortstate *
-tuplesort_begin_common(int workMem, bool randomAccess)
+tuplesort_begin_common(int workMem, SortCoordinate coordinate,
+                                          bool randomAccess)
 {
        Tuplesortstate *state;
        MemoryContext sortcontext;
        MemoryContext tuplecontext;
        MemoryContext oldcontext;
 
+       /* See leader_takeover_tapes() remarks on randomAccess support */
+       if (coordinate && randomAccess)
+               elog(ERROR, "random access disallowed under parallel sort");
+
        /*
         * Create a working memory context for this sort operation. All data
         * needed by the sort will live inside this context.
         */
        sortcontext = AllocSetContextCreate(CurrentMemoryContext,
                                                                                "TupleSort main",
-                                                                               ALLOCSET_DEFAULT_MINSIZE,
-                                                                               ALLOCSET_DEFAULT_INITSIZE,
-                                                                               ALLOCSET_DEFAULT_MAXSIZE);
+                                                                               ALLOCSET_DEFAULT_SIZES);
 
        /*
         * Caller tuple (e.g. IndexTuple) memory context.
         *
-        * A dedicated child content used exclusively for caller passed tuples
-        * eases memory management.  Resetting at key points reduces fragmentation.
-        * Note that the memtuples array of SortTuples is allocated in the parent
-        * context, not this context, because there is no need to free memtuples
-        * early.
+        * A dedicated child context used exclusively for caller passed tuples
+        * eases memory management.  Resetting at key points reduces
+        * fragmentation. Note that the memtuples array of SortTuples is allocated
+        * in the parent context, not this context, because there is no need to
+        * free memtuples early.
         */
        tuplecontext = AllocSetContextCreate(sortcontext,
                                                                                 "Caller tuples",
-                                                                                ALLOCSET_DEFAULT_MINSIZE,
-                                                                                ALLOCSET_DEFAULT_INITSIZE,
-                                                                                ALLOCSET_DEFAULT_MAXSIZE);
+                                                                                ALLOCSET_DEFAULT_SIZES);
 
        /*
         * Make the Tuplesortstate within the per-sort context.  This way, we
@@ -629,7 +729,14 @@ tuplesort_begin_common(int workMem, bool randomAccess)
        state->bounded = false;
        state->tuples = true;
        state->boundUsed = false;
-       state->allowedMem = workMem * (int64) 1024;
+
+       /*
+        * workMem is forced to be at least 64KB, the current minimum valid value
+        * for the work_mem GUC.  This is a defense against parallel sort callers
+        * that divide out memory among many workers in a way that leaves each
+        * with very little memory.
+        */
+       state->allowedMem = Max(workMem, 64) * (int64) 1024;
        state->availMem = state->allowedMem;
        state->sortcontext = sortcontext;
        state->tuplecontext = tuplecontext;
@@ -642,10 +749,10 @@ tuplesort_begin_common(int workMem, bool randomAccess)
         * see comments in grow_memtuples().
         */
        state->memtupsize = Max(1024,
-                                               ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1);
+                                                       ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1);
 
        state->growmemtuples = true;
-       state->batchUsed = false;
+       state->slabAllocatorUsed = false;
        state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
 
        USEMEM(state, GetMemoryChunkSpace(state->memtuples));
@@ -663,6 +770,33 @@ tuplesort_begin_common(int workMem, bool randomAccess)
 
        state->result_tape = -1;        /* flag that result tape has not been formed */
 
+       /*
+        * Initialize parallel-related state based on coordination information
+        * from caller
+        */
+       if (!coordinate)
+       {
+               /* Serial sort */
+               state->shared = NULL;
+               state->worker = -1;
+               state->nParticipants = -1;
+       }
+       else if (coordinate->isWorker)
+       {
+               /* Parallel worker produces exactly one final run from all input */
+               state->shared = coordinate->sharedsort;
+               state->worker = worker_get_identifier(state);
+               state->nParticipants = -1;
+       }
+       else
+       {
+               /* Parallel leader state only used for final merge */
+               state->shared = coordinate->sharedsort;
+               state->worker = -1;
+               state->nParticipants = coordinate->nParticipants;
+               Assert(state->nParticipants >= 1);
+       }
+
        MemoryContextSwitchTo(oldcontext);
 
        return state;
@@ -673,9 +807,10 @@ tuplesort_begin_heap(TupleDesc tupDesc,
                                         int nkeys, AttrNumber *attNums,
                                         Oid *sortOperators, Oid *sortCollations,
                                         bool *nullsFirstFlags,
-                                        int workMem, bool randomAccess)
+                                        int workMem, SortCoordinate coordinate, bool randomAccess)
 {
-       Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+       Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+                                                                                                  randomAccess);
        MemoryContext oldcontext;
        int                     i;
 
@@ -696,7 +831,8 @@ tuplesort_begin_heap(TupleDesc tupDesc,
                                                                false,  /* no unique check */
                                                                nkeys,
                                                                workMem,
-                                                               randomAccess);
+                                                               randomAccess,
+                                                               PARALLEL_SORT(state));
 
        state->comparetup = comparetup_heap;
        state->copytup = copytup_heap;
@@ -743,10 +879,12 @@ tuplesort_begin_heap(TupleDesc tupDesc,
 Tuplesortstate *
 tuplesort_begin_cluster(TupleDesc tupDesc,
                                                Relation indexRel,
-                                               int workMem, bool randomAccess)
+                                               int workMem,
+                                               SortCoordinate coordinate, bool randomAccess)
 {
-       Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
-       ScanKey         indexScanKey;
+       Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+                                                                                                  randomAccess);
+       BTScanInsert indexScanKey;
        MemoryContext oldcontext;
        int                     i;
 
@@ -762,13 +900,14 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
                         workMem, randomAccess ? 't' : 'f');
 #endif
 
-       state->nKeys = RelationGetNumberOfAttributes(indexRel);
+       state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
 
        TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT,
                                                                false,  /* no unique check */
                                                                state->nKeys,
                                                                workMem,
-                                                               randomAccess);
+                                                               randomAccess,
+                                                               PARALLEL_SORT(state));
 
        state->comparetup = comparetup_cluster;
        state->copytup = copytup_cluster;
@@ -780,7 +919,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
 
        state->tupDesc = tupDesc;       /* assume we need not copy tupDesc */
 
-       indexScanKey = _bt_mkscankey_nodata(indexRel);
+       indexScanKey = _bt_mkscankey(indexRel, NULL);
 
        if (state->indexInfo->ii_Expressions != NULL)
        {
@@ -794,7 +933,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
                 * scantuple has to point to that slot, too.
                 */
                state->estate = CreateExecutorState();
-               slot = MakeSingleTupleTableSlot(tupDesc);
+               slot = MakeSingleTupleTableSlot(tupDesc, &TTSOpsVirtual);
                econtext = GetPerTupleExprContext(state->estate);
                econtext->ecxt_scantuple = slot;
        }
@@ -806,7 +945,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
        for (i = 0; i < state->nKeys; i++)
        {
                SortSupport sortKey = state->sortKeys + i;
-               ScanKey         scanKey = indexScanKey + i;
+               ScanKey         scanKey = indexScanKey->scankeys + i;
                int16           strategy;
 
                sortKey->ssup_cxt = CurrentMemoryContext;
@@ -825,7 +964,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
                PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
        }
 
-       _bt_freeskey(indexScanKey);
+       pfree(indexScanKey);
 
        MemoryContextSwitchTo(oldcontext);
 
@@ -836,10 +975,13 @@ Tuplesortstate *
 tuplesort_begin_index_btree(Relation heapRel,
                                                        Relation indexRel,
                                                        bool enforceUnique,
-                                                       int workMem, bool randomAccess)
+                                                       int workMem,
+                                                       SortCoordinate coordinate,
+                                                       bool randomAccess)
 {
-       Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
-       ScanKey         indexScanKey;
+       Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+                                                                                                  randomAccess);
+       BTScanInsert indexScanKey;
        MemoryContext oldcontext;
        int                     i;
 
@@ -853,13 +995,14 @@ tuplesort_begin_index_btree(Relation heapRel,
                         workMem, randomAccess ? 't' : 'f');
 #endif
 
-       state->nKeys = RelationGetNumberOfAttributes(indexRel);
+       state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
 
        TRACE_POSTGRESQL_SORT_START(INDEX_SORT,
                                                                enforceUnique,
                                                                state->nKeys,
                                                                workMem,
-                                                               randomAccess);
+                                                               randomAccess,
+                                                               PARALLEL_SORT(state));
 
        state->comparetup = comparetup_index_btree;
        state->copytup = copytup_index;
@@ -871,8 +1014,7 @@ tuplesort_begin_index_btree(Relation heapRel,
        state->indexRel = indexRel;
        state->enforceUnique = enforceUnique;
 
-       indexScanKey = _bt_mkscankey_nodata(indexRel);
-       state->nKeys = RelationGetNumberOfAttributes(indexRel);
+       indexScanKey = _bt_mkscankey(indexRel, NULL);
 
        /* Prepare SortSupport data for each column */
        state->sortKeys = (SortSupport) palloc0(state->nKeys *
@@ -881,7 +1023,7 @@ tuplesort_begin_index_btree(Relation heapRel,
        for (i = 0; i < state->nKeys; i++)
        {
                SortSupport sortKey = state->sortKeys + i;
-               ScanKey         scanKey = indexScanKey + i;
+               ScanKey         scanKey = indexScanKey->scankeys + i;
                int16           strategy;
 
                sortKey->ssup_cxt = CurrentMemoryContext;
@@ -900,7 +1042,7 @@ tuplesort_begin_index_btree(Relation heapRel,
                PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
        }
 
-       _bt_freeskey(indexScanKey);
+       pfree(indexScanKey);
 
        MemoryContextSwitchTo(oldcontext);
 
@@ -910,10 +1052,15 @@ tuplesort_begin_index_btree(Relation heapRel,
 Tuplesortstate *
 tuplesort_begin_index_hash(Relation heapRel,
                                                   Relation indexRel,
-                                                  uint32 hash_mask,
-                                                  int workMem, bool randomAccess)
+                                                  uint32 high_mask,
+                                                  uint32 low_mask,
+                                                  uint32 max_buckets,
+                                                  int workMem,
+                                                  SortCoordinate coordinate,
+                                                  bool randomAccess)
 {
-       Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+       Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+                                                                                                  randomAccess);
        MemoryContext oldcontext;
 
        oldcontext = MemoryContextSwitchTo(state->sortcontext);
@@ -921,8 +1068,11 @@ tuplesort_begin_index_hash(Relation heapRel,
 #ifdef TRACE_SORT
        if (trace_sort)
                elog(LOG,
-               "begin index sort: hash_mask = 0x%x, workMem = %d, randomAccess = %c",
-                        hash_mask,
+                        "begin index sort: high_mask = 0x%x, low_mask = 0x%x, "
+                        "max_buckets = 0x%x, workMem = %d, randomAccess = %c",
+                        high_mask,
+                        low_mask,
+                        max_buckets,
                         workMem, randomAccess ? 't' : 'f');
 #endif
 
@@ -936,7 +1086,9 @@ tuplesort_begin_index_hash(Relation heapRel,
        state->heapRel = heapRel;
        state->indexRel = indexRel;
 
-       state->hash_mask = hash_mask;
+       state->high_mask = high_mask;
+       state->low_mask = low_mask;
+       state->max_buckets = max_buckets;
 
        MemoryContextSwitchTo(oldcontext);
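
high_mask, low_mask and max_buckets exist so that hash index entries can be
sorted by the bucket they will land in.  The mapping from hash value to
bucket looks roughly like _hash_hashkey2bucket() in the hash AM; the sketch
below is an approximation of that logic.

/* Approximate mapping from hash value to destination bucket */
static uint32
hash_value_to_bucket(uint32 hashkey, uint32 high_mask, uint32 low_mask,
					 uint32 max_buckets)
{
	uint32		bucket;

	bucket = hashkey & high_mask;
	if (bucket > max_buckets)
		bucket = bucket & low_mask;		/* wrap into the smaller table */

	return bucket;
}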
 
@@ -945,10 +1097,11 @@ tuplesort_begin_index_hash(Relation heapRel,
 
 Tuplesortstate *
 tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
-                                         bool nullsFirstFlag,
-                                         int workMem, bool randomAccess)
+                                         bool nullsFirstFlag, int workMem,
+                                         SortCoordinate coordinate, bool randomAccess)
 {
-       Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+       Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+                                                                                                  randomAccess);
        MemoryContext oldcontext;
        int16           typlen;
        bool            typbyval;
@@ -968,7 +1121,8 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
                                                                false,  /* no unique check */
                                                                1,
                                                                workMem,
-                                                               randomAccess);
+                                                               randomAccess,
+                                                               PARALLEL_SORT(state));
 
        state->comparetup = comparetup_datum;
        state->copytup = copytup_datum;
@@ -995,8 +1149,8 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
         * a pass-by-value datatype could have an abbreviated form that is cheaper
         * to compare.  In a tuple sort, we could support that, because we can
         * always extract the original datum from the tuple if needed.  Here, we
-        * can't, because a datum sort only stores a single copy of the datum;
-        * the "tuple" field of each sortTuple is NULL.
+        * can't, because a datum sort only stores a single copy of the datum; the
+        * "tuple" field of each sortTuple is NULL.
         */
        state->sortKeys->abbreviate = !typbyval;
 
@@ -1026,7 +1180,7 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
  * delayed calls at the moment.)
  *
  * This is a hint only. The tuplesort may still return more tuples than
- * requested.
+ * requested.  Parallel leader tuplesorts will always ignore the hint.
  */
 void
 tuplesort_set_bound(Tuplesortstate *state, int64 bound)
@@ -1035,6 +1189,7 @@ tuplesort_set_bound(Tuplesortstate *state, int64 bound)
        Assert(state->status == TSS_INITIAL);
        Assert(state->memtupcount == 0);
        Assert(!state->bounded);
+       Assert(!WORKER(state));
 
 #ifdef DEBUG_BOUNDED_SORT
        /* Honor GUC setting that disables the feature (for easy testing) */
@@ -1042,6 +1197,10 @@ tuplesort_set_bound(Tuplesortstate *state, int64 bound)
                return;
 #endif
 
+       /* Parallel leader ignores hint */
+       if (LEADER(state))
+               return;
+
        /* We want to be able to compute bound * 2, so limit the setting */
        if (bound > (int64) (INT_MAX / 2))
                return;
@@ -1100,11 +1259,13 @@ tuplesort_end(Tuplesortstate *state)
        if (trace_sort)
        {
                if (state->tapeset)
-                       elog(LOG, "external sort ended, %ld disk blocks used: %s",
-                                spaceUsed, pg_rusage_show(&state->ru_start));
+                       elog(LOG, "%s of worker %d ended, %ld disk blocks used: %s",
+                                SERIAL(state) ? "external sort" : "parallel external sort",
+                                state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
                else
-                       elog(LOG, "internal sort ended, %ld KB used: %s",
-                                spaceUsed, pg_rusage_show(&state->ru_start));
+                       elog(LOG, "%s of worker %d ended, %ld KB used: %s",
+                                SERIAL(state) ? "internal sort" : "unperformed parallel sort",
+                                state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
        }
 
        TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
@@ -1138,7 +1299,7 @@ tuplesort_end(Tuplesortstate *state)
 /*
  * Grow the memtuples[] array, if possible within our memory constraint.  We
  * must not exceed INT_MAX tuples in memory or the caller-provided memory
- * limit.  Return TRUE if we were able to enlarge the array, FALSE if not.
+ * limit.  Return true if we were able to enlarge the array, false if not.
  *
  * Normally, at each increment we double the size of the array.  When doing
  * that would exceed a limit, we attempt one last, smaller increase (and then
@@ -1366,8 +1527,7 @@ tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
                 * ensure a consistent representation (current tuple was just
                 * handled).  It does not matter if some dumped tuples are already
                 * sorted on tape, since serialized tuples lack abbreviated keys
-                * (TSS_BUILDRUNS state prevents control reaching here in any
-                * case).
+                * (TSS_BUILDRUNS state prevents control reaching here in any case).
                 */
                for (i = 0; i < state->memtupcount; i++)
                {
@@ -1377,7 +1537,7 @@ tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
                        mtup->datum1 = index_getattr(tuple,
                                                                                 1,
                                                                                 RelationGetDescr(state->indexRel),
-                                                                                &stup.isnull1);
+                                                                                &mtup->isnull1);
                }
        }
 
@@ -1412,8 +1572,8 @@ tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
        if (isNull || !state->tuples)
        {
                /*
-                * Set datum1 to zeroed representation for NULLs (to be consistent, and
-                * to support cheap inequality tests for NULL abbreviated keys).
+                * Set datum1 to zeroed representation for NULLs (to be consistent,
+                * and to support cheap inequality tests for NULL abbreviated keys).
                 */
                stup.datum1 = !isNull ? val : (Datum) 0;
                stup.isnull1 = isNull;
@@ -1451,10 +1611,10 @@ tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
                         *
                         * Alter datum1 representation in already-copied tuples, so as to
                         * ensure a consistent representation (current tuple was just
-                        * handled).  It does not matter if some dumped tuples are
-                        * already sorted on tape, since serialized tuples lack
-                        * abbreviated keys (TSS_BUILDRUNS state prevents control
-                        * reaching here in any case).
+                        * handled).  It does not matter if some dumped tuples are already
+                        * sorted on tape, since serialized tuples lack abbreviated keys
+                        * (TSS_BUILDRUNS state prevents control reaching here in any
+                        * case).
                         */
                        for (i = 0; i < state->memtupcount; i++)
                        {
@@ -1476,6 +1636,8 @@ tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
 static void
 puttuple_common(Tuplesortstate *state, SortTuple *tuple)
 {
+       Assert(!LEADER(state));
+
        switch (state->status)
        {
                case TSS_INITIAL:
@@ -1529,10 +1691,10 @@ puttuple_common(Tuplesortstate *state, SortTuple *tuple)
                        /*
                         * Nope; time to switch to tape-based operation.
                         */
-                       inittapes(state);
+                       inittapes(state, true);
 
                        /*
-                        * Dump tuples until we are back under the limit.
+                        * Dump all tuples.
                         */
                        dumptuples(state, false);
                        break;
@@ -1555,36 +1717,21 @@ puttuple_common(Tuplesortstate *state, SortTuple *tuple)
                        }
                        else
                        {
-                               /* discard top of heap, sift up, insert new tuple */
+                               /* discard top of heap, replacing it with the new tuple */
                                free_sort_tuple(state, &state->memtuples[0]);
-                               tuplesort_heap_siftup(state, false);
-                               tuplesort_heap_insert(state, tuple, 0, false);
+                               tuplesort_heap_replace_top(state, tuple);
                        }
                        break;
 
                case TSS_BUILDRUNS:
 
                        /*
-                        * Insert the tuple into the heap, with run number currentRun if
-                        * it can go into the current run, else run number currentRun+1.
-                        * The tuple can go into the current run if it is >= the first
-                        * not-yet-output tuple.  (Actually, it could go into the current
-                        * run if it is >= the most recently output tuple ... but that
-                        * would require keeping around the tuple we last output, and it's
-                        * simplest to let writetup free each tuple as soon as it's
-                        * written.)
-                        *
-                        * Note there will always be at least one tuple in the heap at
-                        * this point; see dumptuples.
+                        * Save the tuple into the unsorted array (there must be space)
                         */
-                       Assert(state->memtupcount > 0);
-                       if (COMPARETUP(state, tuple, &state->memtuples[0]) >= 0)
-                               tuplesort_heap_insert(state, tuple, state->currentRun, true);
-                       else
-                               tuplesort_heap_insert(state, tuple, state->currentRun + 1, true);
+                       state->memtuples[state->memtupcount++] = *tuple;
 
                        /*
-                        * If we are over the memory limit, dump tuples till we're under.
+                        * If we are over the memory limit, dump all tuples.
                         */
                        dumptuples(state, false);
                        break;
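
The TSS_BOUNDED case above implements a top-N sort: once the heap holds the
bound's worth of tuples, a new tuple that beats the current worst simply
replaces the heap top and is sifted down (tuplesort_heap_replace_top).  A
standalone sketch of that pattern on plain ints, assuming the max-heap is
already full and heapified; names are illustrative only.

/* Sift the entry at position i down to restore max-heap order */
static void
max_heap_sift_down(int *heap, int n, int i)
{
	for (;;)
	{
		int			left = 2 * i + 1;
		int			right = left + 1;
		int			largest = i;
		int			tmp;

		if (left < n && heap[left] > heap[largest])
			largest = left;
		if (right < n && heap[right] > heap[largest])
			largest = right;
		if (largest == i)
			break;
		tmp = heap[i];
		heap[i] = heap[largest];
		heap[largest] = tmp;
		i = largest;
	}
}

/* Offer one new value to a full n-element max-heap of the smallest-so-far */
static void
bounded_offer(int *heap, int n, int value)
{
	if (value >= heap[0])
		return;					/* not better than the current worst: discard */

	heap[0] = value;			/* replace the top ... */
	max_heap_sift_down(heap, n, 0); /* ... and restore heap order */
}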
@@ -1646,8 +1793,8 @@ tuplesort_performsort(Tuplesortstate *state)
 
 #ifdef TRACE_SORT
        if (trace_sort)
-               elog(LOG, "performsort starting: %s",
-                        pg_rusage_show(&state->ru_start));
+               elog(LOG, "performsort of worker %d starting: %s",
+                        state->worker, pg_rusage_show(&state->ru_start));
 #endif
 
        switch (state->status)
@@ -1656,25 +1803,39 @@ tuplesort_performsort(Tuplesortstate *state)
 
                        /*
                         * We were able to accumulate all the tuples within the allowed
-                        * amount of memory.  Just qsort 'em and we're done.
+                        * amount of memory, or we are the leader about to take over worker tapes
                         */
-                       if (state->memtupcount > 1)
+                       if (SERIAL(state))
                        {
-                               /* Can we use the single-key sort function? */
-                               if (state->onlyKey != NULL)
-                                       qsort_ssup(state->memtuples, state->memtupcount,
-                                                          state->onlyKey);
-                               else
-                                       qsort_tuple(state->memtuples,
-                                                               state->memtupcount,
-                                                               state->comparetup,
-                                                               state);
+                               /* Just qsort 'em and we're done */
+                               tuplesort_sort_memtuples(state);
+                               state->status = TSS_SORTEDINMEM;
+                       }
+                       else if (WORKER(state))
+                       {
+                               /*
+                                * Parallel workers must still dump out tuples to tape.  No
+                                * merge is required to produce a single output run, though.
+                                */
+                               inittapes(state, false);
+                               dumptuples(state, true);
+                               worker_nomergeruns(state);
+                               state->status = TSS_SORTEDONTAPE;
+                       }
+                       else
+                       {
+                               /*
+                                * Leader will take over worker tapes and merge worker runs.
+                                * Note that mergeruns sets the correct state->status.
+                                */
+                               leader_takeover_tapes(state);
+                               mergeruns(state);
                        }
                        state->current = 0;
                        state->eof_reached = false;
+                       state->markpos_block = 0L;
                        state->markpos_offset = 0;
                        state->markpos_eof = false;
-                       state->status = TSS_SORTEDINMEM;
                        break;
 
                case TSS_BOUNDED:
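
The SERIAL()/WORKER()/LEADER() tests used in this hunk are not themselves part of the diff shown here. Judging from how state->shared and state->worker are used elsewhere in the patch (for example in inittapes() below), they presumably boil down to something like the following sketch; treat the exact definitions as an assumption:

    /* Presumed shape of the parallel-sort role tests (sketch, not verbatim). */
    #define SERIAL(state)   ((state)->shared == NULL)
    #define WORKER(state)   ((state)->shared != NULL && (state)->worker != -1)
    #define LEADER(state)   ((state)->shared != NULL && (state)->worker == -1)

Here a serial sort finishes in memory, a worker always spills its run to tape, and the leader generates no runs of its own but takes over and merges what the workers wrote.
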
@@ -1697,8 +1858,8 @@ tuplesort_performsort(Tuplesortstate *state)
                        /*
                         * Finish tape-based sort.  First, flush all tuples remaining in
                         * memory out to tape; then merge until we have a single remaining
-                        * run (or, if !randomAccess, one run per tape). Note that
-                        * mergeruns sets the correct state->status.
+                        * run (or, if !randomAccess and !WORKER(), one run per tape).
+                        * Note that mergeruns sets the correct state->status.
                         */
                        dumptuples(state, true);
                        mergeruns(state);
@@ -1717,12 +1878,12 @@ tuplesort_performsort(Tuplesortstate *state)
        if (trace_sort)
        {
                if (state->status == TSS_FINALMERGE)
-                       elog(LOG, "performsort done (except %d-way final merge): %s",
-                                state->activeTapes,
+                       elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
+                                state->worker, state->activeTapes,
                                 pg_rusage_show(&state->ru_start));
                else
-                       elog(LOG, "performsort done: %s",
-                                pg_rusage_show(&state->ru_start));
+                       elog(LOG, "performsort of worker %d done: %s",
+                                state->worker, pg_rusage_show(&state->ru_start));
        }
 #endif
 
@@ -1731,22 +1892,25 @@ tuplesort_performsort(Tuplesortstate *state)
 
 /*
  * Internal routine to fetch the next tuple in either forward or back
- * direction into *stup.  Returns FALSE if no more tuples.
- * If *should_free is set, the caller must pfree stup.tuple when done with it.
- * Otherwise, caller should not use tuple following next call here.
+ * direction into *stup.  Returns false if no more tuples.
+ * Returned tuple belongs to tuplesort memory context, and must not be freed
+ * by caller.  Note that fetched tuple is stored in memory that may be
+ * recycled by any future fetch.
  */
 static bool
 tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
-                                                 SortTuple *stup, bool *should_free)
+                                                 SortTuple *stup)
 {
        unsigned int tuplen;
+       size_t          nmoved;
+
+       Assert(!WORKER(state));
 
        switch (state->status)
        {
                case TSS_SORTEDINMEM:
                        Assert(forward || state->randomAccess);
-                       Assert(!state->batchUsed);
-                       *should_free = false;
+                       Assert(!state->slabAllocatorUsed);
                        if (forward)
                        {
                                if (state->current < state->memtupcount)
@@ -1790,15 +1954,34 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 
                case TSS_SORTEDONTAPE:
                        Assert(forward || state->randomAccess);
-                       Assert(!state->batchUsed);
-                       *should_free = true;
+                       Assert(state->slabAllocatorUsed);
+
+                       /*
+                        * The slot that held the tuple that we returned in previous
+                        * gettuple call can now be reused.
+                        */
+                       if (state->lastReturnedTuple)
+                       {
+                               RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
+                               state->lastReturnedTuple = NULL;
+                       }
+
                        if (forward)
                        {
                                if (state->eof_reached)
                                        return false;
+
                                if ((tuplen = getlen(state, state->result_tape, true)) != 0)
                                {
                                        READTUP(state, stup, state->result_tape, tuplen);
+
+                                       /*
+                                        * Remember the tuple we return, so that we can recycle
+                                        * its memory on next call.  (This can be NULL, in the
+                                        * !state->tuples case).
+                                        */
+                                       state->lastReturnedTuple = stup->tuple;
+
                                        return true;
                                }
                                else
@@ -1821,10 +2004,13 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
                                 * end of file; back up to fetch last tuple's ending length
                                 * word.  If seek fails we must have a completely empty file.
                                 */
-                               if (!LogicalTapeBackspace(state->tapeset,
-                                                                                 state->result_tape,
-                                                                                 2 * sizeof(unsigned int)))
+                               nmoved = LogicalTapeBackspace(state->tapeset,
+                                                                                         state->result_tape,
+                                                                                         2 * sizeof(unsigned int));
+                               if (nmoved == 0)
                                        return false;
+                               else if (nmoved != 2 * sizeof(unsigned int))
+                                       elog(ERROR, "unexpected tape position");
                                state->eof_reached = false;
                        }
                        else
@@ -1833,31 +2019,34 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
                                 * Back up and fetch previously-returned tuple's ending length
                                 * word.  If seek fails, assume we are at start of file.
                                 */
-                               if (!LogicalTapeBackspace(state->tapeset,
-                                                                                 state->result_tape,
-                                                                                 sizeof(unsigned int)))
+                               nmoved = LogicalTapeBackspace(state->tapeset,
+                                                                                         state->result_tape,
+                                                                                         sizeof(unsigned int));
+                               if (nmoved == 0)
                                        return false;
+                               else if (nmoved != sizeof(unsigned int))
+                                       elog(ERROR, "unexpected tape position");
                                tuplen = getlen(state, state->result_tape, false);
 
                                /*
                                 * Back up to get ending length word of tuple before it.
                                 */
-                               if (!LogicalTapeBackspace(state->tapeset,
-                                                                                 state->result_tape,
-                                                                                 tuplen + 2 * sizeof(unsigned int)))
+                               nmoved = LogicalTapeBackspace(state->tapeset,
+                                                                                         state->result_tape,
+                                                                                         tuplen + 2 * sizeof(unsigned int));
+                               if (nmoved == tuplen + sizeof(unsigned int))
                                {
                                        /*
-                                        * If that fails, presumably the prev tuple is the first
-                                        * in the file.  Back up so that it becomes next to read
-                                        * in forward direction (not obviously right, but that is
-                                        * what in-memory case does).
+                                        * We backed up over the previous tuple, but there was no
+                                        * ending length word before it.  That means that the prev
+                                        * tuple is the first tuple in the file.  It is now the
+                                        * next to read in forward direction (not obviously right,
+                                        * but that is what in-memory case does).
                                         */
-                                       if (!LogicalTapeBackspace(state->tapeset,
-                                                                                         state->result_tape,
-                                                                                         tuplen + sizeof(unsigned int)))
-                                               elog(ERROR, "bogus tuple length in backward scan");
                                        return false;
                                }
+                               else if (nmoved != tuplen + 2 * sizeof(unsigned int))
+                                       elog(ERROR, "bogus tuple length in backward scan");
                        }
 
                        tuplen = getlen(state, state->result_tape, false);
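
The fixed backspace distances above rely on the random-access on-tape format, in which every tuple is bracketed by a length word on both sides:

    ... | len | tuple N-1 body | len | len | tuple N body | len |
                                                                 ^ read position after returning tuple N

Backing up sizeof(unsigned int) therefore lands on tuple N's trailing length word; after re-reading it, backing up tuplen + 2 * sizeof(unsigned int) skips the whole of tuple N plus the trailing length word of tuple N-1, ready for the getlen() call that follows. For those distances to come out exactly, the stored length word evidently counts itself as well as the tuple body, which is also why a move of only tuplen + sizeof(unsigned int) means tuple N was the first tuple in the file.
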
@@ -1867,18 +2056,35 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
                         * Note: READTUP expects we are positioned after the initial
                         * length word of the tuple, so back up to that point.
                         */
-                       if (!LogicalTapeBackspace(state->tapeset,
-                                                                         state->result_tape,
-                                                                         tuplen))
+                       nmoved = LogicalTapeBackspace(state->tapeset,
+                                                                                 state->result_tape,
+                                                                                 tuplen);
+                       if (nmoved != tuplen)
                                elog(ERROR, "bogus tuple length in backward scan");
                        READTUP(state, stup, state->result_tape, tuplen);
+
+                       /*
+                        * Remember the tuple we return, so that we can recycle its memory
+                        * on next call. (This can be NULL, in the Datum case).
+                        */
+                       state->lastReturnedTuple = stup->tuple;
+
                        return true;
 
                case TSS_FINALMERGE:
                        Assert(forward);
-                       Assert(state->batchUsed || !state->tuples);
-                       /* For now, assume tuple is stored in tape's batch memory */
-                       *should_free = false;
+                       /* We are managing memory ourselves, with the slab allocator. */
+                       Assert(state->slabAllocatorUsed);
+
+                       /*
+                        * The slab slot holding the tuple that we returned in previous
+                        * gettuple call can now be reused.
+                        */
+                       if (state->lastReturnedTuple)
+                       {
+                               RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
+                               state->lastReturnedTuple = NULL;
+                       }
 
                        /*
                         * This code should match the inner loop of mergeonerun().
@@ -1886,54 +2092,38 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
                        if (state->memtupcount > 0)
                        {
                                int                     srcTape = state->memtuples[0].tupindex;
-                               int                     tupIndex;
-                               SortTuple  *newtup;
+                               SortTuple       newtup;
+
+                               *stup = state->memtuples[0];
 
                                /*
-                                * Returned tuple is still counted in our memory space most
-                                * of the time.  See mergebatchone() for discussion of why
-                                * caller may occasionally be required to free returned
-                                * tuple, and how preread memory is managed with regard to
-                                * edge cases more generally.
+                                * Remember the tuple we return, so that we can recycle its
+                                * memory on next call. (This can be NULL, in the Datum case).
                                 */
-                               *stup = state->memtuples[0];
-                               tuplesort_heap_siftup(state, false);
-                               if ((tupIndex = state->mergenext[srcTape]) == 0)
+                               state->lastReturnedTuple = stup->tuple;
+
+                               /*
+                                * Pull next tuple from tape, and replace the returned tuple
+                                * at top of the heap with it.
+                                */
+                               if (!mergereadnext(state, srcTape, &newtup))
                                {
                                        /*
-                                        * out of preloaded data on this tape, try to read more
-                                        *
-                                        * Unlike mergeonerun(), we only preload from the single
-                                        * tape that's run dry, though not before preparing its
-                                        * batch memory for a new round of sequential consumption.
-                                        * See mergepreread() comments.
+                                        * If no more data, we've reached end of run on this tape.
+                                        * Remove the top node from the heap.
                                         */
-                                       if (state->batchUsed)
-                                               mergebatchone(state, srcTape, stup, should_free);
-
-                                       mergeprereadone(state, srcTape);
+                                       tuplesort_heap_delete_top(state);
 
                                        /*
-                                        * if still no data, we've reached end of run on this tape
+                                        * Rewind to free the read buffer.  It'd go away at the
+                                        * end of the sort anyway, but better to release the
+                                        * memory early.
                                         */
-                                       if ((tupIndex = state->mergenext[srcTape]) == 0)
-                                       {
-                                               /* Free tape's buffer, avoiding dangling pointer */
-                                               if (state->batchUsed)
-                                                       mergebatchfreetape(state, srcTape, stup, should_free);
-                                               return true;
-                                       }
+                                       LogicalTapeRewindForWrite(state->tapeset, srcTape);
+                                       return true;
                                }
-                               /* pull next preread tuple from list, insert in heap */
-                               newtup = &state->memtuples[tupIndex];
-                               state->mergenext[srcTape] = newtup->tupindex;
-                               if (state->mergenext[srcTape] == 0)
-                                       state->mergelast[srcTape] = 0;
-                               tuplesort_heap_insert(state, newtup, srcTape, false);
-                               /* put the now-unused memtuples entry on the freelist */
-                               newtup->tupindex = state->mergefreelist;
-                               state->mergefreelist = tupIndex;
-                               state->mergeavailslots[srcTape]++;
+                               newtup.tupindex = srcTape;
+                               tuplesort_heap_replace_top(state, &newtup);
                                return true;
                        }
                        return false;
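
Throughout the rewritten merge code, the old pattern of tuplesort_heap_siftup() followed by a fresh tuplesort_heap_insert() is replaced by a single tuplesort_heap_replace_top() call (or tuplesort_heap_delete_top() when a run is exhausted). The helpers themselves are outside this hunk; the essential idea, shown here on a plain min-heap of ints rather than SortTuples, is one sift-down from the root:

    /*
     * Sketch: overwrite the smallest element of a binary min-heap and restore
     * the heap property with a single top-down pass.
     */
    static void
    heap_replace_top(int *heap, int n, int newval)
    {
        int     i = 0;

        while (2 * i + 1 < n)
        {
            int     child = 2 * i + 1;

            if (child + 1 < n && heap[child + 1] < heap[child])
                child++;                /* smaller of the two children */
            if (newval <= heap[child])
                break;                  /* newval can stay at slot i */
            heap[i] = heap[child];      /* pull the child up */
            i = child;
        }
        heap[i] = newval;
    }

Deleting the top is the same operation with the heap's last element playing the role of newval, after shrinking n by one.
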
@@ -1946,25 +2136,32 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 
 /*
  * Fetch the next tuple in either forward or back direction.
- * If successful, put tuple in slot and return TRUE; else, clear the slot
- * and return FALSE.
+ * If successful, put tuple in slot and return true; else, clear the slot
+ * and return false.
  *
- * Caller may optionally be passed back abbreviated value (on TRUE return
+ * Caller may optionally be passed back abbreviated value (on true return
  * value) when abbreviation was used, which can be used to cheaply avoid
  * equality checks that might otherwise be required.  Caller can safely make a
  * determination of "non-equal tuple" based on simple binary inequality.  A
  * NULL value in leading attribute will set abbreviated value to zeroed
  * representation, which caller may rely on in abbreviated inequality check.
+ *
+ * If copy is true, the slot receives a tuple that's been copied into the
+ * caller's memory context, so that it will stay valid regardless of future
+ * manipulations of the tuplesort's state (up to and including deleting the
+ * tuplesort).  If copy is false, the slot will just receive a pointer to a
+ * tuple held within the tuplesort, which is more efficient, but only safe for
+ * callers that are prepared to have any subsequent manipulation of the
+ * tuplesort's state invalidate slot contents.
  */
 bool
-tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
+tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy,
                                           TupleTableSlot *slot, Datum *abbrev)
 {
        MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
        SortTuple       stup;
-       bool            should_free;
 
-       if (!tuplesort_gettuple_common(state, forward, &stup, &should_free))
+       if (!tuplesort_gettuple_common(state, forward, &stup))
                stup.tuple = NULL;
 
        MemoryContextSwitchTo(oldcontext);
@@ -1975,7 +2172,10 @@ tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
                if (state->sortKeys->abbrev_converter && abbrev)
                        *abbrev = stup.datum1;
 
-               ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, should_free);
+               if (copy)
+                       stup.tuple = heap_copy_minimal_tuple((MinimalTuple) stup.tuple);
+
+               ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, copy);
                return true;
        }
        else
@@ -1987,18 +2187,17 @@ tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
 
 /*
  * Fetch the next tuple in either forward or back direction.
- * Returns NULL if no more tuples.  If *should_free is set, the
- * caller must pfree the returned tuple when done with it.
- * If it is not set, caller should not use tuple following next
- * call here.
+ * Returns NULL if no more tuples.  Returned tuple belongs to tuplesort memory
+ * context, and must not be freed by caller.  Caller may not rely on tuple
+ * remaining valid after any further manipulation of tuplesort.
  */
 HeapTuple
-tuplesort_getheaptuple(Tuplesortstate *state, bool forward, bool *should_free)
+tuplesort_getheaptuple(Tuplesortstate *state, bool forward)
 {
        MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
        SortTuple       stup;
 
-       if (!tuplesort_gettuple_common(state, forward, &stup, should_free))
+       if (!tuplesort_gettuple_common(state, forward, &stup))
                stup.tuple = NULL;
 
        MemoryContextSwitchTo(oldcontext);
@@ -2008,19 +2207,17 @@ tuplesort_getheaptuple(Tuplesortstate *state, bool forward, bool *should_free)
 
 /*
  * Fetch the next index tuple in either forward or back direction.
- * Returns NULL if no more tuples.  If *should_free is set, the
- * caller must pfree the returned tuple when done with it.
- * If it is not set, caller should not use tuple following next
- * call here.
+ * Returns NULL if no more tuples.  Returned tuple belongs to tuplesort memory
+ * context, and must not be freed by caller.  Caller may not rely on tuple
+ * remaining valid after any further manipulation of tuplesort.
  */
 IndexTuple
-tuplesort_getindextuple(Tuplesortstate *state, bool forward,
-                                               bool *should_free)
+tuplesort_getindextuple(Tuplesortstate *state, bool forward)
 {
        MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
        SortTuple       stup;
 
-       if (!tuplesort_gettuple_common(state, forward, &stup, should_free))
+       if (!tuplesort_gettuple_common(state, forward, &stup))
                stup.tuple = NULL;
 
        MemoryContextSwitchTo(oldcontext);
@@ -2030,12 +2227,13 @@ tuplesort_getindextuple(Tuplesortstate *state, bool forward,
 
 /*
  * Fetch the next Datum in either forward or back direction.
- * Returns FALSE if no more datums.
+ * Returns false if no more datums.
  *
  * If the Datum is pass-by-ref type, the returned value is freshly palloc'd
- * and is now owned by the caller.
+ * in caller's context, and is now owned by the caller (this differs from
+ * similar routines for other types of tuplesorts).
  *
- * Caller may optionally be passed back abbreviated value (on TRUE return
+ * Caller may optionally be passed back abbreviated value (on true return
  * value) when abbreviation was used, which can be used to cheaply avoid
  * equality checks that might otherwise be required.  Caller can safely make a
  * determination of "non-equal tuple" based on simple binary inequality.  A
@@ -2048,14 +2246,16 @@ tuplesort_getdatum(Tuplesortstate *state, bool forward,
 {
        MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
        SortTuple       stup;
-       bool            should_free;
 
-       if (!tuplesort_gettuple_common(state, forward, &stup, &should_free))
+       if (!tuplesort_gettuple_common(state, forward, &stup))
        {
                MemoryContextSwitchTo(oldcontext);
                return false;
        }
 
+       /* Ensure we copy into caller's memory context */
+       MemoryContextSwitchTo(oldcontext);
+
        /* Record abbreviated key for caller */
        if (state->sortKeys->abbrev_converter && abbrev)
                *abbrev = stup.datum1;
@@ -2068,23 +2268,17 @@ tuplesort_getdatum(Tuplesortstate *state, bool forward,
        else
        {
                /* use stup.tuple because stup.datum1 may be an abbreviation */
-
-               if (should_free)
-                       *val = PointerGetDatum(stup.tuple);
-               else
-                       *val = datumCopy(PointerGetDatum(stup.tuple), false, state->datumTypeLen);
+               *val = datumCopy(PointerGetDatum(stup.tuple), false, state->datumTypeLen);
                *isNull = false;
        }
 
-       MemoryContextSwitchTo(oldcontext);
-
        return true;
 }
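
Taken together, the fetch routines above now share one ownership rule: the returned tuple lives in tuplesort-owned memory unless the caller explicitly asks for a copy (tuplesort_gettupleslot's copy flag) or receives a pass-by-reference Datum, which is palloc'd in the caller's context. A hedged usage sketch, in which only the tuplesort_* call comes from this file and the surrounding function is hypothetical:

    static void
    drain_sorted_output(Tuplesortstate *sortstate, TupleTableSlot *slot)
    {
        Datum       abbrev;

        /*
         * copy = false: the slot points into memory owned by the tuplesort,
         * so it must be consumed before the next fetch (or tuplesort_end).
         * Pass copy = true if the tuple has to survive such calls.
         */
        while (tuplesort_gettupleslot(sortstate, true, false, slot, &abbrev))
        {
            /* ... use the slot here, before touching the tuplesort again ... */
        }
    }
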
 
 /*
  * Advance over N tuples in either forward or back direction,
  * without returning any data.  N==0 is a no-op.
- * Returns TRUE if successful, FALSE if ran out of tuples.
+ * Returns true if successful, false if ran out of tuples.
  */
 bool
 tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
@@ -2097,6 +2291,7 @@ tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
         */
        Assert(forward);
        Assert(ntuples >= 0);
+       Assert(!WORKER(state));
 
        switch (state->status)
        {
@@ -2130,16 +2325,12 @@ tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
                        while (ntuples-- > 0)
                        {
                                SortTuple       stup;
-                               bool            should_free;
 
-                               if (!tuplesort_gettuple_common(state, forward,
-                                                                                          &stup, &should_free))
+                               if (!tuplesort_gettuple_common(state, forward, &stup))
                                {
                                        MemoryContextSwitchTo(oldcontext);
                                        return false;
                                }
-                               if (should_free && stup.tuple)
-                                       pfree(stup.tuple);
                                CHECK_FOR_INTERRUPTS();
                        }
                        MemoryContextSwitchTo(oldcontext);
@@ -2175,8 +2366,19 @@ tuplesort_merge_order(int64 allowedMem)
        mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
                (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
 
-       /* Even in minimum memory, use at least a MINORDER merge */
+       /*
+        * Even in minimum memory, use at least a MINORDER merge.  On the other
+        * hand, even when we have lots of memory, do not use more than a MAXORDER
+        * merge.  Tapes are pretty cheap, but they're not entirely free.  Each
+        * additional tape reduces the amount of memory available to build runs,
+        * which in turn can cause the same sort to need more runs, which makes
+        * merging slower even if it can still be done in a single pass.  Also,
+        * high order merges are quite slow due to CPU cache effects; it can be
+        * faster to pay the I/O cost of a polyphase merge than to perform a
+        * single merge pass across many hundreds of tapes.
+        */
        mOrder = Max(mOrder, MINORDER);
+       mOrder = Min(mOrder, MAXORDER);
 
        return mOrder;
 }
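
To make the clamp concrete, a worked example with illustrative constants (the actual MINORDER/MAXORDER and buffer sizes are not quoted in this diff): assuming TAPE_BUFFER_OVERHEAD is one 8 kB block and MERGE_BUFFER_SIZE is 256 kB, an allowedMem of 4 MB yields

    mOrder = (4096 kB - 8 kB) / (256 kB + 8 kB) = 15

that is, 15 input tapes (16 tapes in total once inittapes() adds the output tape). A very small allowedMem that computes to zero is clamped up to MINORDER, while a multi-gigabyte allowedMem that would otherwise open thousands of tapes is clamped down to MAXORDER, for exactly the run-size and CPU-cache reasons spelled out in the comment.
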
@@ -2184,35 +2386,70 @@ tuplesort_merge_order(int64 allowedMem)
 /*
  * inittapes - initialize for tape sorting.
  *
- * This is called only if we have found we don't have room to sort in memory.
+ * This is called only if we have found we won't sort in memory.
  */
 static void
-inittapes(Tuplesortstate *state)
+inittapes(Tuplesortstate *state, bool mergeruns)
 {
        int                     maxTapes,
-                               ntuples,
                                j;
-       int64           tapeSpace;
 
-       /* Compute number of tapes to use: merge order plus 1 */
-       maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
+       Assert(!LEADER(state));
 
-       /*
-        * We must have at least 2*maxTapes slots in the memtuples[] array, else
-        * we'd not have room for merge heap plus preread.  It seems unlikely that
-        * this case would ever occur, but be safe.
-        */
-       maxTapes = Min(maxTapes, state->memtupsize / 2);
-
-       state->maxTapes = maxTapes;
-       state->tapeRange = maxTapes - 1;
+       if (mergeruns)
+       {
+               /* Compute number of tapes to use: merge order plus 1 */
+               maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
+       }
+       else
+       {
+               /* Workers can sometimes produce a single run, output without a merge */
+               Assert(WORKER(state));
+               maxTapes = MINORDER + 1;
+       }
 
 #ifdef TRACE_SORT
        if (trace_sort)
-               elog(LOG, "switching to external sort with %d tapes: %s",
-                        maxTapes, pg_rusage_show(&state->ru_start));
+               elog(LOG, "worker %d switching to external sort with %d tapes: %s",
+                        state->worker, maxTapes, pg_rusage_show(&state->ru_start));
 #endif
 
+       /* Create the tape set and allocate the per-tape data arrays */
+       inittapestate(state, maxTapes);
+       state->tapeset =
+               LogicalTapeSetCreate(maxTapes, NULL,
+                                                        state->shared ? &state->shared->fileset : NULL,
+                                                        state->worker);
+
+       state->currentRun = 0;
+
+       /*
+        * Initialize variables of Algorithm D (step D1).
+        */
+       for (j = 0; j < maxTapes; j++)
+       {
+               state->tp_fib[j] = 1;
+               state->tp_runs[j] = 0;
+               state->tp_dummy[j] = 1;
+               state->tp_tapenum[j] = j;
+       }
+       state->tp_fib[state->tapeRange] = 0;
+       state->tp_dummy[state->tapeRange] = 0;
+
+       state->Level = 1;
+       state->destTape = 0;
+
+       state->status = TSS_BUILDRUNS;
+}
+
+/*
+ * inittapestate - initialize generic tape management state
+ */
+static void
+inittapestate(Tuplesortstate *state, int maxTapes)
+{
+       int64           tapeSpace;
+
        /*
         * Decrease availMem to reflect the space needed for tape buffers; but
         * don't decrease it to the point that we have no room for tuples. (That
@@ -2222,74 +2459,28 @@ inittapes(Tuplesortstate *state)
         * account for tuple space, so we don't care if LACKMEM becomes
         * inaccurate.)
         */
-       tapeSpace = (int64) maxTapes *TAPE_BUFFER_OVERHEAD;
+       tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
 
        if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
                USEMEM(state, tapeSpace);
 
        /*
         * Make sure that the temp file(s) underlying the tape set are created in
-        * suitable temp tablespaces.
+        * suitable temp tablespaces.  For parallel sorts, this should have been
+        * called already, but it doesn't matter if it is called a second time.
         */
        PrepareTempTablespaces();
 
-       /*
-        * Create the tape set and allocate the per-tape data arrays.
-        */
-       state->tapeset = LogicalTapeSetCreate(maxTapes);
-
        state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
-       state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
-       state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
-       state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
-       state->mergeavailmem = (int64 *) palloc0(maxTapes * sizeof(int64));
-       state->mergetuples = (char **) palloc0(maxTapes * sizeof(char *));
-       state->mergecurrent = (char **) palloc0(maxTapes * sizeof(char *));
-       state->mergetail = (char **) palloc0(maxTapes * sizeof(char *));
-       state->mergeoverflow = (char **) palloc0(maxTapes * sizeof(char *));
        state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
        state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
        state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
        state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
 
-       /*
-        * Convert the unsorted contents of memtuples[] into a heap. Each tuple is
-        * marked as belonging to run number zero.
-        *
-        * NOTE: we pass false for checkIndex since there's no point in comparing
-        * indexes in this step, even though we do intend the indexes to be part
-        * of the sort key...
-        */
-       ntuples = state->memtupcount;
-       state->memtupcount = 0;         /* make the heap empty */
-       for (j = 0; j < ntuples; j++)
-       {
-               /* Must copy source tuple to avoid possible overwrite */
-               SortTuple       stup = state->memtuples[j];
-
-               tuplesort_heap_insert(state, &stup, 0, false);
-       }
-       Assert(state->memtupcount == ntuples);
-
-       state->currentRun = 0;
-
-       /*
-        * Initialize variables of Algorithm D (step D1).
-        */
-       for (j = 0; j < maxTapes; j++)
-       {
-               state->tp_fib[j] = 1;
-               state->tp_runs[j] = 0;
-               state->tp_dummy[j] = 1;
-               state->tp_tapenum[j] = j;
-       }
-       state->tp_fib[state->tapeRange] = 0;
-       state->tp_dummy[state->tapeRange] = 0;
-
-       state->Level = 1;
-       state->destTape = 0;
-
-       state->status = TSS_BUILDRUNS;
+       /* Record # of tapes allocated (for duration of sort) */
+       state->maxTapes = maxTapes;
+       /* Record maximum # of tapes usable as inputs when merging */
+       state->tapeRange = maxTapes - 1;
 }
 
 /*
@@ -2327,6 +2518,39 @@ selectnewtape(Tuplesortstate *state)
        state->destTape = 0;
 }
 
+/*
+ * Initialize the slab allocation arena, for the given number of slots.
+ */
+static void
+init_slab_allocator(Tuplesortstate *state, int numSlots)
+{
+       if (numSlots > 0)
+       {
+               char       *p;
+               int                     i;
+
+               state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
+               state->slabMemoryEnd = state->slabMemoryBegin +
+                       numSlots * SLAB_SLOT_SIZE;
+               state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
+               USEMEM(state, numSlots * SLAB_SLOT_SIZE);
+
+               p = state->slabMemoryBegin;
+               for (i = 0; i < numSlots - 1; i++)
+               {
+                       ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
+                       p += SLAB_SLOT_SIZE;
+               }
+               ((SlabSlot *) p)->nextfree = NULL;
+       }
+       else
+       {
+               state->slabMemoryBegin = state->slabMemoryEnd = NULL;
+               state->slabFreeHead = NULL;
+       }
+       state->slabAllocatorUsed = true;
+}
+
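
init_slab_allocator() threads a singly linked free list through the arena via the nextfree pointers. The matching pop/push operations are not part of this hunk; RELEASE_SLAB_SLOT and the slab-aware read paths presumably amount to something like the following sketch (hypothetical function names; the real macro presumably also copes with tuples too large to fit in a slot):

    /* Sketch: take a slot off the free list built by init_slab_allocator(). */
    static SlabSlot *
    slab_alloc(Tuplesortstate *state)
    {
        SlabSlot   *slot = state->slabFreeHead;

        if (slot != NULL)
            state->slabFreeHead = slot->nextfree;
        return slot;                /* NULL means all slots are in use */
    }

    /* Sketch: put a slot back, as RELEASE_SLAB_SLOT does for pointers that
     * fall inside [slabMemoryBegin, slabMemoryEnd). */
    static void
    slab_release(Tuplesortstate *state, void *ptr)
    {
        SlabSlot   *slot = (SlabSlot *) ptr;

        slot->nextfree = state->slabFreeHead;
        state->slabFreeHead = slot;
    }

Because a merge holds at most one tuple per input tape plus the last tuple handed back to the caller, a fixed pool of numInputTapes + 1 slots can never run dry.
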
 /*
  * mergeruns -- merge all the completed initial runs.
  *
@@ -2340,6 +2564,8 @@ mergeruns(Tuplesortstate *state)
                                svTape,
                                svRuns,
                                svDummy;
+       int                     numTapes;
+       int                     numInputTapes;
 
        Assert(state->status == TSS_BUILDRUNS);
        Assert(state->memtupcount == 0);
@@ -2361,23 +2587,89 @@ mergeruns(Tuplesortstate *state)
        }
 
        /*
-        * If we produced only one initial run (quite likely if the total data
-        * volume is between 1X and 2X workMem), we can just use that tape as the
-        * finished output, rather than doing a useless merge.  (This obvious
-        * optimization is not in Knuth's algorithm.)
+        * Reset tuple memory.  We've freed all the tuples that we previously
+        * allocated.  We will use the slab allocator from now on.
+        */
+       MemoryContextDelete(state->tuplecontext);
+       state->tuplecontext = NULL;
+
+       /*
+        * We no longer need a large memtuples array.  (We will allocate a smaller
+        * one for the heap later.)
+        */
+       FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
+       pfree(state->memtuples);
+       state->memtuples = NULL;
+
+       /*
+        * If we had fewer runs than tapes, refund the memory that we imagined we
+        * would need for the tape buffers of the unused tapes.
+        *
+        * numTapes and numInputTapes reflect the actual number of tapes we will
+        * use.  Note that the output tape's tape number is maxTapes - 1, so the
+        * tape numbers of the used tapes are not consecutive, and you cannot just
+        * loop from 0 to numTapes to visit all used tapes!
         */
-       if (state->currentRun == 1)
+       if (state->Level == 1)
        {
-               state->result_tape = state->tp_tapenum[state->destTape];
-               /* must freeze and rewind the finished output tape */
-               LogicalTapeFreeze(state->tapeset, state->result_tape);
-               state->status = TSS_SORTEDONTAPE;
-               return;
+               numInputTapes = state->currentRun;
+               numTapes = numInputTapes + 1;
+               FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
+       }
+       else
+       {
+               numInputTapes = state->tapeRange;
+               numTapes = state->maxTapes;
        }
 
+       /*
+        * Initialize the slab allocator.  We need one slab slot per input tape,
+        * for the tuples in the heap, plus one to hold the tuple last returned
+        * from tuplesort_gettuple.  (If we're sorting pass-by-val Datums,
+        * however, we don't need to allocate anything.)
+        *
+        * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
+        * to track memory usage of individual tuples.
+        */
+       if (state->tuples)
+               init_slab_allocator(state, numInputTapes + 1);
+       else
+               init_slab_allocator(state, 0);
+
+       /*
+        * Allocate a new 'memtuples' array, for the heap.  It will hold one tuple
+        * from each input tape.
+        */
+       state->memtupsize = numInputTapes;
+       state->memtuples = (SortTuple *) palloc(numInputTapes * sizeof(SortTuple));
+       USEMEM(state, GetMemoryChunkSpace(state->memtuples));
+
+       /*
+        * Use all the remaining memory we have available for read buffers among
+        * the input tapes.
+        *
+        * We don't try to "rebalance" the memory among tapes, when we start a new
+        * merge phase, even if some tapes are inactive in the new phase.  That
+        * would be hard, because logtape.c doesn't know where one run ends and
+        * another begins.  When a new merge phase begins, and a tape doesn't
+        * participate in it, its buffer nevertheless already contains tuples from
+        * the next run on the same tape, so we cannot release the buffer.
+        * That's OK in practice: merge performance isn't very sensitive to the
+        * amount of buffer memory used, and most merge phases use all or almost
+        * all tapes anyway.
+        */
+#ifdef TRACE_SORT
+       if (trace_sort)
+               elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
+                        state->worker, state->availMem / 1024, numInputTapes);
+#endif
+
+       state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
+       USEMEM(state, state->read_buffer_size * numInputTapes);
+
        /* End of step D2: rewind all output tapes to prepare for merging */
        for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
-               LogicalTapeRewind(state->tapeset, tapenum, false);
+               LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
 
        for (;;)
        {
@@ -2387,7 +2679,7 @@ mergeruns(Tuplesortstate *state)
                 * pass remains.  If we don't have to produce a materialized sorted
                 * tape, we can stop at this point and do the final merge on-the-fly.
                 */
-               if (!state->randomAccess)
+               if (!state->randomAccess && !WORKER(state))
                {
                        bool            allOneRun = true;
 
@@ -2405,7 +2697,7 @@ mergeruns(Tuplesortstate *state)
                                /* Tell logtape.c we won't be writing anymore */
                                LogicalTapeSetForgetFreeSpace(state->tapeset);
                                /* Initialize for the final merge pass */
-                               beginmerge(state, state->tuples);
+                               beginmerge(state);
                                state->status = TSS_FINALMERGE;
                                return;
                        }
@@ -2440,11 +2732,10 @@ mergeruns(Tuplesortstate *state)
                if (--state->Level == 0)
                        break;
                /* rewind output tape T to use as new input */
-               LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange],
-                                                 false);
+               LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
+                                                                state->read_buffer_size);
                /* rewind used-up input tape P, and prepare it for write pass */
-               LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange - 1],
-                                                 true);
+               LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
                state->tp_runs[state->tapeRange - 1] = 0;
 
                /*
@@ -2473,8 +2764,18 @@ mergeruns(Tuplesortstate *state)
         * a waste of cycles anyway...
         */
        state->result_tape = state->tp_tapenum[state->tapeRange];
-       LogicalTapeFreeze(state->tapeset, state->result_tape);
+       if (!WORKER(state))
+               LogicalTapeFreeze(state->tapeset, state->result_tape, NULL);
+       else
+               worker_freeze_result_tape(state);
        state->status = TSS_SORTEDONTAPE;
+
+       /* Release the read buffers of all the other tapes, by rewinding them. */
+       for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
+       {
+               if (tapenum != state->result_tape)
+                       LogicalTapeRewindForWrite(state->tapeset, tapenum);
+       }
 }
 
 /*
@@ -2488,16 +2789,12 @@ mergeonerun(Tuplesortstate *state)
 {
        int                     destTape = state->tp_tapenum[state->tapeRange];
        int                     srcTape;
-       int                     tupIndex;
-       SortTuple  *tup;
-       int64           priorAvail,
-                               spaceFreed;
 
        /*
         * Start the merge by loading one tuple from each active source tape into
         * the heap.  We can also decrease the input run/dummy run counts.
         */
-       beginmerge(state, false);
+       beginmerge(state);
 
        /*
         * Execute merge by repeatedly extracting lowest tuple in heap, writing it
@@ -2506,43 +2803,29 @@ mergeonerun(Tuplesortstate *state)
         */
        while (state->memtupcount > 0)
        {
+               SortTuple       stup;
+
                /* write the tuple to destTape */
-               priorAvail = state->availMem;
                srcTape = state->memtuples[0].tupindex;
                WRITETUP(state, destTape, &state->memtuples[0]);
-               /* writetup adjusted total free space, now fix per-tape space */
-               spaceFreed = state->availMem - priorAvail;
-               state->mergeavailmem[srcTape] += spaceFreed;
-               /* compact the heap */
-               tuplesort_heap_siftup(state, false);
-               if ((tupIndex = state->mergenext[srcTape]) == 0)
-               {
-                       /* out of preloaded data on this tape, try to read more */
-                       mergepreread(state);
-                       /* if still no data, we've reached end of run on this tape */
-                       if ((tupIndex = state->mergenext[srcTape]) == 0)
-                               continue;
-               }
-               /* pull next preread tuple from list, insert in heap */
-               tup = &state->memtuples[tupIndex];
-               state->mergenext[srcTape] = tup->tupindex;
-               if (state->mergenext[srcTape] == 0)
-                       state->mergelast[srcTape] = 0;
-               tuplesort_heap_insert(state, tup, srcTape, false);
-               /* put the now-unused memtuples entry on the freelist */
-               tup->tupindex = state->mergefreelist;
-               state->mergefreelist = tupIndex;
-               state->mergeavailslots[srcTape]++;
-       }
 
-       /*
-        * Reset tuple memory.  We've freed all of the tuples that we previously
-        * allocated, but AllocSetFree will have put those chunks of memory on
-        * particular free lists, bucketed by size class.  Thus, although all of
-        * that memory is free, it is effectively fragmented.  Resetting the
-        * context gets us out from under that problem.
-        */
-       MemoryContextReset(state->tuplecontext);
+               /* recycle the slot of the tuple we just wrote out, for the next read */
+               if (state->memtuples[0].tuple)
+                       RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
+
+               /*
+                * pull next tuple from the tape, and replace the written-out tuple in
+                * the heap with it.
+                */
+               if (mergereadnext(state, srcTape, &stup))
+               {
+                       stup.tupindex = srcTape;
+                       tuplesort_heap_replace_top(state, &stup);
+               }
+               else
+                       tuplesort_heap_delete_top(state);
+       }
 
        /*
         * When the heap empties, we're done.  Write an end-of-run marker on the
@@ -2553,8 +2836,8 @@ mergeonerun(Tuplesortstate *state)
 
 #ifdef TRACE_SORT
        if (trace_sort)
-               elog(LOG, "finished %d-way merge step: %s", state->activeTapes,
-                        pg_rusage_show(&state->ru_start));
+               elog(LOG, "worker %d finished %d-way merge step: %s", state->worker,
+                        state->activeTapes, pg_rusage_show(&state->ru_start));
 #endif
 }
 
@@ -2562,21 +2845,15 @@ mergeonerun(Tuplesortstate *state)
  * beginmerge - initialize for a merge pass
  *
  * We decrease the counts of real and dummy runs for each tape, and mark
- * which tapes contain active input runs in mergeactive[].  Then, load
- * as many tuples as we can from each active input tape, and finally
- * fill the merge heap with the first tuple from each active tape.
- *
- * finalMergeBatch indicates if this is the beginning of a final on-the-fly
- * merge where a batched allocation of tuple memory is required.
+ * which tapes contain active input runs in mergeactive[].  Then, fill the
+ * merge heap with the first tuple from each active tape.
  */
 static void
-beginmerge(Tuplesortstate *state, bool finalMergeBatch)
+beginmerge(Tuplesortstate *state)
 {
        int                     activeTapes;
        int                     tapenum;
        int                     srcTape;
-       int                     slotsPerTape;
-       int64           spacePerTape;
 
        /* Heap should be empty here */
        Assert(state->memtupcount == 0);
@@ -2598,562 +2875,150 @@ beginmerge(Tuplesortstate *state, bool finalMergeBatch)
                        activeTapes++;
                }
        }
-       state->activeTapes = activeTapes;
-
-       /* Clear merge-pass state variables */
-       memset(state->mergenext, 0,
-                  state->maxTapes * sizeof(*state->mergenext));
-       memset(state->mergelast, 0,
-                  state->maxTapes * sizeof(*state->mergelast));
-       state->mergefreelist = 0;       /* nothing in the freelist */
-       state->mergefirstfree = activeTapes;            /* 1st slot avail for preread */
-
-       if (finalMergeBatch)
-       {
-               /* Free outright buffers for tape never actually allocated */
-               FREEMEM(state, (state->maxTapes - activeTapes) * TAPE_BUFFER_OVERHEAD);
-
-               /*
-                * Grow memtuples one last time, since the palloc() overhead no longer
-                * incurred can make a big difference
-                */
-               batchmemtuples(state);
-       }
-
-       /*
-        * Initialize space allocation to let each active input tape have an equal
-        * share of preread space.
-        */
        Assert(activeTapes > 0);
-       slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
-       Assert(slotsPerTape > 0);
-       spacePerTape = MAXALIGN_DOWN(state->availMem / activeTapes);
-       for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
-       {
-               if (state->mergeactive[srcTape])
-               {
-                       state->mergeavailslots[srcTape] = slotsPerTape;
-                       state->mergeavailmem[srcTape] = spacePerTape;
-               }
-       }
-
-       /*
-        * Preallocate tuple batch memory for each tape.  This is the memory used
-        * for tuples themselves (not SortTuples), so it's never used by
-        * pass-by-value datum sorts.  Memory allocation is performed here at most
-        * once per sort, just in advance of the final on-the-fly merge step.
-        */
-       if (finalMergeBatch)
-               mergebatch(state, spacePerTape);
-
-       /*
-        * Preread as many tuples as possible (and at least one) from each active
-        * tape
-        */
-       mergepreread(state);
+       state->activeTapes = activeTapes;
 
        /* Load the merge heap with the first tuple from each input tape */
        for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
        {
-               int                     tupIndex = state->mergenext[srcTape];
-               SortTuple  *tup;
+               SortTuple       tup;
 
-               if (tupIndex)
+               if (mergereadnext(state, srcTape, &tup))
                {
-                       tup = &state->memtuples[tupIndex];
-                       state->mergenext[srcTape] = tup->tupindex;
-                       if (state->mergenext[srcTape] == 0)
-                               state->mergelast[srcTape] = 0;
-                       tuplesort_heap_insert(state, tup, srcTape, false);
-                       /* put the now-unused memtuples entry on the freelist */
-                       tup->tupindex = state->mergefreelist;
-                       state->mergefreelist = tupIndex;
-                       state->mergeavailslots[srcTape]++;
-
-#ifdef TRACE_SORT
-                       if (trace_sort && finalMergeBatch)
-                       {
-                               int64           perTapeKB = (spacePerTape + 1023) / 1024;
-                               int64           usedSpaceKB;
-                               int                     usedSlots;
-
-                               /*
-                                * Report how effective batchmemtuples() was in balancing
-                                * the number of slots against the need for memory for the
-                                * underlying tuples (e.g. IndexTuples).  The big preread of
-                                * all tapes when switching to FINALMERGE state should be
-                                * fairly representative of memory utilization during the
-                                * final merge step, and in any case is the only point at
-                                * which all tapes are guaranteed to have depleted either
-                                * their batch memory allowance or slot allowance.  Ideally,
-                                * both will be completely depleted for every tape by now.
-                                */
-                               usedSpaceKB = (state->mergecurrent[srcTape] -
-                                                          state->mergetuples[srcTape] + 1023) / 1024;
-                               usedSlots = slotsPerTape - state->mergeavailslots[srcTape];
-
-                               elog(LOG, "tape %d initially used %ld KB of %ld KB batch "
-                                        "(%2.3f) and %d out of %d slots (%2.3f)", srcTape,
-                                        usedSpaceKB, perTapeKB,
-                                        (double) usedSpaceKB / (double) perTapeKB,
-                                        usedSlots, slotsPerTape,
-                                        (double) usedSlots / (double) slotsPerTape);
-                       }
-#endif
+                       tup.tupindex = srcTape;
+                       tuplesort_heap_insert(state, &tup);
                }
        }
 }
 
 /*
- * batchmemtuples - grow memtuples without palloc overhead
- *
- * When called, availMem should be approximately the amount of memory we'd
- * require to allocate memtupsize - memtupcount tuples (not SortTuples/slots)
- * that were allocated with palloc() overhead, and in doing so use up all
- * allocated slots.  However, though slots and tuple memory is in balance
- * following the last grow_memtuples() call, that's predicated on the observed
- * average tuple size for the "final" grow_memtuples() call, which includes
- * palloc overhead.  During the final merge pass, where we will arrange to
- * squeeze out the palloc overhead, we might need more slots in the memtuples
- * array.
+ * mergereadnext - read next tuple from one merge input tape
  *
- * To make that happen, arrange for the amount of remaining memory to be
- * exactly equal to the palloc overhead multiplied by the current size of
- * the memtuples array, force the grow_memtuples flag back to true (it's
- * probably but not necessarily false on entry to this routine), and then
- * call grow_memtuples.  This simulates loading enough tuples to fill the
- * whole memtuples array and then having some space left over because of the
- * elided palloc overhead.  We expect that grow_memtuples() will conclude that
- * it can't double the size of the memtuples array but that it can increase
- * it by some percentage; but if it does decide to double it, that just means
- * that we've never managed to use many slots in the memtuples array, in which
- * case doubling it shouldn't hurt anything anyway.
+ * Returns false on EOF.
  */
-static void
-batchmemtuples(Tuplesortstate *state)
+static bool
+mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
 {
-       int64                   refund;
-       int64                   availMemLessRefund;
-       int                             memtupsize = state->memtupsize;
-
-       /* For simplicity, assume no memtuples are actually currently counted */
-       Assert(state->memtupcount == 0);
-
-       /*
-        * Refund STANDARDCHUNKHEADERSIZE per tuple.
-        *
-        * This sometimes fails to make memory use perfectly balanced, but it
-        * should never make the situation worse.  Note that Assert-enabled builds
-        * get a larger refund, due to a varying STANDARDCHUNKHEADERSIZE.
-        */
-       refund = memtupsize * STANDARDCHUNKHEADERSIZE;
-       availMemLessRefund = state->availMem - refund;
+       unsigned int tuplen;
 
-       /*
-        * To establish balanced memory use after refunding palloc overhead,
-        * temporarily have our accounting indicate that we've allocated all
-        * memory we're allowed to less that refund, and call grow_memtuples()
-        * to have it increase the number of slots.
-        */
-       state->growmemtuples = true;
-       USEMEM(state, availMemLessRefund);
-       (void) grow_memtuples(state);
-       /* Should not matter, but be tidy */
-       FREEMEM(state, availMemLessRefund);
-       state->growmemtuples = false;
+       if (!state->mergeactive[srcTape])
+               return false;                   /* tape's run is already exhausted */
 
-#ifdef TRACE_SORT
-       if (trace_sort)
+       /* read next tuple, if any */
+       if ((tuplen = getlen(state, srcTape, true)) == 0)
        {
-               Size    OldKb = (memtupsize * sizeof(SortTuple) + 1023) / 1024;
-               Size    NewKb = (state->memtupsize * sizeof(SortTuple) + 1023) / 1024;
-
-               elog(LOG, "grew memtuples %1.2fx from %d (%zu KB) to %d (%zu KB) for final merge",
-                        (double) NewKb / (double) OldKb,
-                        memtupsize, OldKb,
-                        state->memtupsize, NewKb);
+               state->mergeactive[srcTape] = false;
+               return false;
        }
-#endif
+       READTUP(state, stup, srcTape, tuplen);
+
+       return true;
 }
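mergereadnext() hands back one tuple per call and reports run exhaustion by returning false.  As a toy, self-contained illustration of that consume-one-head-at-a-time merge pattern, here is a sketch in plain C where arrays stand in for tapes, a linear scan stands in for the binary heap tuplesort actually uses, and every name is hypothetical:

#include <stdio.h>

#define NRUNS 3

typedef struct Run
{
	const int  *vals;			/* pre-sorted "tape" contents */
	int			len;
	int			pos;			/* next element to read */
} Run;

/* Analogous to mergereadnext(): fetch the next value, or report exhaustion */
static int
run_read_next(Run *run, int *out)
{
	if (run->pos >= run->len)
		return 0;				/* "EOF" on this run */
	*out = run->vals[run->pos++];
	return 1;
}

int
main(void)
{
	static const int r0[] = {1, 4, 9};
	static const int r1[] = {2, 3, 8};
	static const int r2[] = {5, 6, 7};
	Run			runs[NRUNS] = {{r0, 3, 0}, {r1, 3, 0}, {r2, 3, 0}};
	int			heads[NRUNS];
	int			active[NRUNS];

	/* Prime one head value per run, as beginmerge() does via heap inserts */
	for (int i = 0; i < NRUNS; i++)
		active[i] = run_read_next(&runs[i], &heads[i]);

	for (;;)
	{
		int			best = -1;

		/* Linear scan for the smallest head (tuplesort uses a binary heap) */
		for (int i = 0; i < NRUNS; i++)
			if (active[i] && (best < 0 || heads[i] < heads[best]))
				best = i;
		if (best < 0)
			break;				/* every run exhausted: merge complete */
		printf("%d\n", heads[best]);
		active[best] = run_read_next(&runs[best], &heads[best]);
	}
	return 0;
}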
 
 /*
- * mergebatch - initialize tuple memory in batch
- *
- * This allows sequential access to sorted tuples buffered in memory from
- * tapes/runs on disk during a final on-the-fly merge step.  Note that the
- * memory is not used for SortTuples, but for the underlying tuples (e.g.
- * MinimalTuples).
+ * dumptuples - remove tuples from memtuples and write initial run to tape
  *
- * Note that when batch memory is used, there is a simple division of space
- * into large buffers (one per active tape).  The conventional incremental
- * memory accounting (calling USEMEM() and FREEMEM()) is abandoned.  Instead,
- * when each tape's memory budget is exceeded, a retail palloc() "overflow" is
- * performed, which is then immediately detected in a way that is analogous to
- * LACKMEM().  This keeps each tape's use of memory fair, which is always a
- * goal.
+ * When alltuples = true, dump everything currently in memory.  (This case is
+ * only used at end of input data.)
  */
 static void
-mergebatch(Tuplesortstate *state, int64 spacePerTape)
+dumptuples(Tuplesortstate *state, bool alltuples)
 {
-       int             srcTape;
-
-       Assert(state->activeTapes > 0);
-       Assert(state->tuples);
+       int                     memtupwrite;
+       int                     i;
 
        /*
-        * For the purposes of tuplesort's memory accounting, the batch allocation
-        * is special, and regular memory accounting through USEMEM() calls is
-        * abandoned (see mergeprereadone()).
+        * Nothing to do if we still fit in available memory and have array slots,
+        * unless this is the final call during initial run generation.
         */
-       for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
-       {
-               char       *mergetuples;
-
-               if (!state->mergeactive[srcTape])
-                       continue;
-
-               /* Allocate buffer for each active tape */
-               mergetuples = MemoryContextAllocHuge(state->tuplecontext,
-                                                                                        spacePerTape);
-
-               /* Initialize state for tape */
-               state->mergetuples[srcTape] = mergetuples;
-               state->mergecurrent[srcTape] = mergetuples;
-               state->mergetail[srcTape] = mergetuples;
-               state->mergeoverflow[srcTape] = NULL;
-       }
-
-       state->batchUsed = true;
-       state->spacePerTape = spacePerTape;
-}
-
-/*
- * mergebatchone - prepare batch memory for one merge input tape
- *
- * This is called following the exhaustion of preread tuples for one input
- * tape.  All that actually occurs is that the state for the source tape is
- * reset to indicate that all memory may be reused.
- *
- * This routine must deal with fixing up the tuple that is about to be returned
- * to the client, due to "overflow" allocations.
- */
-static void
-mergebatchone(Tuplesortstate *state, int srcTape, SortTuple *rtup,
-                         bool *should_free)
-{
-       Assert(state->batchUsed);
+       if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
+               !alltuples)
+               return;
 
        /*
-        * Tuple about to be returned to caller ("stup") is final preread tuple
-        * from tape, just removed from the top of the heap.  Special steps around
-        * memory management must be performed for that tuple, to make sure it
-        * isn't overwritten early.
+        * Final call might require no sorting, in rare cases where we just so
+        * happen to have previously LACKMEM()'d at the point where exactly all
+        * remaining tuples are loaded into memory, just before input was
+        * exhausted.
+        *
+        * In general, short final runs are quite possible.  Rather than allowing
+        * a special case where there was a superfluous selectnewtape() call (i.e.
+        * a call with no subsequent run actually written to destTape), we prefer
+        * to write out a 0 tuple run.
+        *
+        * mergereadnext() is prepared for 0 tuple runs, and will reliably mark
+        * the tape inactive for the merge when called from beginmerge().  This
+        * case is therefore similar to the case where mergeonerun() finds a dummy
+        * run for the tape, and so doesn't need to merge a run from the tape (or
+        * conceptually "merges" the dummy run, if you prefer).  According to
+        * Knuth, Algorithm D "isn't strictly optimal" in its method of
+        * distribution and dummy run assignment; this edge case seems very
+        * unlikely to make that appreciably worse.
         */
-       if (!state->mergeoverflow[srcTape])
-       {
-               Size    tupLen;
-
-               /*
-                * Mark tuple buffer range for reuse, but be careful to move final,
-                * tail tuple to start of space for next run so that it's available
-                * to caller when stup is returned, and remains available at least
-                * until the next tuple is requested.
-                */
-               tupLen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
-               state->mergecurrent[srcTape] = state->mergetuples[srcTape];
-               memmove(state->mergecurrent[srcTape], state->mergetail[srcTape],
-                               tupLen);
-
-               /* Make SortTuple at top of the merge heap point to new tuple */
-               rtup->tuple = (void *) state->mergecurrent[srcTape];
-
-               state->mergetail[srcTape] = state->mergecurrent[srcTape];
-               state->mergecurrent[srcTape] += tupLen;
-       }
-       else
-       {
-               /*
-                * Handle an "overflow" retail palloc.
-                *
-                * This is needed when we run out of tuple memory for the tape.
-                */
-               state->mergecurrent[srcTape] = state->mergetuples[srcTape];
-               state->mergetail[srcTape] = state->mergetuples[srcTape];
-
-               if (rtup->tuple)
-               {
-                       Assert(rtup->tuple == (void *) state->mergeoverflow[srcTape]);
-                       /* Caller should free palloc'd tuple */
-                       *should_free = true;
-               }
-               state->mergeoverflow[srcTape] = NULL;
-       }
-}
-
-/*
- * mergebatchfreetape - handle final clean-up for batch memory once tape is
- * about to become exhausted
- *
- * All tuples are returned from tape, but a single final tuple, *rtup, is to be
- * passed back to caller.  Free tape's batch allocation buffer while ensuring
- * that the final tuple is managed appropriately.
- */
-static void
-mergebatchfreetape(Tuplesortstate *state, int srcTape, SortTuple *rtup,
-                                  bool *should_free)
-{
-       Assert(state->batchUsed);
-       Assert(state->status == TSS_FINALMERGE);
+       Assert(state->status == TSS_BUILDRUNS);
 
        /*
-        * Tuple may or may not already be an overflow allocation from
-        * mergebatchone()
+        * It seems unlikely that this limit will ever be exceeded, but take no
+        * chances
         */
-       if (!*should_free && rtup->tuple)
-       {
-               /*
-                * Final tuple still in tape's batch allocation.
-                *
-                * Return palloc()'d copy to caller, and have it freed in a similar
-                * manner to overflow allocation.  Otherwise, we'd free batch memory
-                * and pass back a pointer to garbage.  Note that we deliberately
-                * allocate this in the parent tuplesort context, to be on the safe
-                * side.
-                */
-               Size            tuplen;
-               void       *oldTuple = rtup->tuple;
+       if (state->currentRun == INT_MAX)
+               ereport(ERROR,
+                               (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+                                errmsg("cannot have more than %d runs for an external sort",
+                                               INT_MAX)));
 
-               tuplen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
-               rtup->tuple = MemoryContextAlloc(state->sortcontext, tuplen);
-               memcpy(rtup->tuple, oldTuple, tuplen);
-               *should_free = true;
-       }
+       state->currentRun++;
 
-       /* Free spacePerTape-sized buffer */
-       pfree(state->mergetuples[srcTape]);
-}
+#ifdef TRACE_SORT
+       if (trace_sort)
+               elog(LOG, "worker %d starting quicksort of run %d: %s",
+                        state->worker, state->currentRun,
+                        pg_rusage_show(&state->ru_start));
+#endif
 
-/*
- * mergebatchalloc - allocate memory for one tuple using a batch memory
- * "logical allocation".
- *
- * This is used for the final on-the-fly merge phase only.  READTUP() routines
- * receive memory from here in place of palloc() and USEMEM() calls.
- *
- * Tuple tapenum is passed, ensuring each tape's tuples are stored in sorted,
- * contiguous order (while allowing safe reuse of memory made available to
- * each tape).  This maximizes locality of access as tuples are returned by
- * final merge.
- *
- * Caller must not subsequently attempt to free memory returned here.  In
- * general, only mergebatch* functions know about how memory returned from
- * here should be freed, and this function's caller must ensure that batch
- * memory management code will definitely have the opportunity to do the right
- * thing during the final on-the-fly merge.
- */
-static void *
-mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen)
-{
-       Size            reserve_tuplen = MAXALIGN(tuplen);
-       char       *ret;
+       /*
+        * Sort all tuples accumulated within the allowed amount of memory for
+        * this run using quicksort
+        */
+       tuplesort_sort_memtuples(state);
 
-       /* Should overflow at most once before mergebatchone() call: */
-       Assert(state->mergeoverflow[tapenum] == NULL);
-       Assert(state->batchUsed);
+#ifdef TRACE_SORT
+       if (trace_sort)
+               elog(LOG, "worker %d finished quicksort of run %d: %s",
+                        state->worker, state->currentRun,
+                        pg_rusage_show(&state->ru_start));
+#endif
 
-       /* It should be possible to use precisely spacePerTape memory at once */
-       if (state->mergecurrent[tapenum] + reserve_tuplen <=
-               state->mergetuples[tapenum] + state->spacePerTape)
-       {
-               /*
-                * Usual case -- caller is returned pointer into its tape's buffer, and
-                * an offset from that point is recorded as where tape has consumed up
-                * to for current round of preloading.
-                */
-               ret = state->mergetail[tapenum] = state->mergecurrent[tapenum];
-               state->mergecurrent[tapenum] += reserve_tuplen;
-       }
-       else
+       memtupwrite = state->memtupcount;
+       for (i = 0; i < memtupwrite; i++)
        {
-               /*
-                * Allocate memory, and record as tape's overflow allocation.  This
-                * will be detected quickly, in a similar fashion to a LACKMEM()
-                * condition, and should not happen again before a new round of
-                * preloading for caller's tape.  Note that we deliberately allocate
-                * this in the parent tuplesort context, to be on the safe side.
-                *
-                * Sometimes, this does not happen because merging runs out of slots
-                * before running out of memory.
-                */
-               ret = state->mergeoverflow[tapenum] =
-                       MemoryContextAlloc(state->sortcontext, tuplen);
+               WRITETUP(state, state->tp_tapenum[state->destTape],
+                                &state->memtuples[i]);
+               state->memtupcount--;
        }
 
-       return ret;
-}
-
-/*
- * mergepreread - load tuples from merge input tapes
- *
- * This routine exists to improve sequentiality of reads during a merge pass,
- * as explained in the header comments of this file.  Load tuples from each
- * active source tape until the tape's run is exhausted or it has used up
- * its fair share of available memory.  In any case, we guarantee that there
- * is at least one preread tuple available from each unexhausted input tape.
- *
- * We invoke this routine at the start of a merge pass for initial load,
- * and then whenever any tape's preread data runs out.  Note that we load
- * as much data as possible from all tapes, not just the one that ran out.
- * This is because logtape.c works best with a usage pattern that alternates
- * between reading a lot of data and writing a lot of data, so whenever we
- * are forced to read, we should fill working memory completely.
- *
- * In FINALMERGE state, we *don't* use this routine, but instead just preread
- * from the single tape that ran dry.  There's no read/write alternation in
- * that state and so no point in scanning through all the tapes to fix one.
- * (Moreover, there may be quite a lot of inactive tapes in that state, since
- * we might have had many fewer runs than tapes.  In a regular tape-to-tape
- * merge we can expect most of the tapes to be active.  Plus, only
- * FINALMERGE state has to consider memory management for a batch
- * allocation.)
- */
-static void
-mergepreread(Tuplesortstate *state)
-{
-       int                     srcTape;
-
-       for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
-               mergeprereadone(state, srcTape);
-}
-
-/*
- * mergeprereadone - load tuples from one merge input tape
- *
- * Read tuples from the specified tape until it has used up its free memory
- * or array slots; but ensure that we have at least one tuple, if any are
- * to be had.
- */
-static void
-mergeprereadone(Tuplesortstate *state, int srcTape)
-{
-       unsigned int tuplen;
-       SortTuple       stup;
-       int                     tupIndex;
-       int64           priorAvail,
-                               spaceUsed;
-
-       if (!state->mergeactive[srcTape])
-               return;                                 /* tape's run is already exhausted */
-
        /*
-        * Manage per-tape availMem.  Only actually matters when batch memory not
-        * in use.
-        */
-       priorAvail = state->availMem;
-       state->availMem = state->mergeavailmem[srcTape];
-
-       /*
-        * When batch memory is used if final on-the-fly merge, only mergeoverflow
-        * test is relevant; otherwise, only LACKMEM() test is relevant.
+        * Reset tuple memory.  We've freed all of the tuples that we previously
+        * allocated.  It's important to avoid fragmentation when there is a stark
+        * change in the sizes of incoming tuples.  Fragmentation due to
+        * AllocSetFree's bucketing by size class might be particularly bad if
+        * this step wasn't taken.
         */
-       while ((state->mergeavailslots[srcTape] > 0 &&
-                       state->mergeoverflow[srcTape] == NULL && !LACKMEM(state)) ||
-                  state->mergenext[srcTape] == 0)
-       {
-               /* read next tuple, if any */
-               if ((tuplen = getlen(state, srcTape, true)) == 0)
-               {
-                       state->mergeactive[srcTape] = false;
-                       break;
-               }
-               READTUP(state, &stup, srcTape, tuplen);
-               /* find a free slot in memtuples[] for it */
-               tupIndex = state->mergefreelist;
-               if (tupIndex)
-                       state->mergefreelist = state->memtuples[tupIndex].tupindex;
-               else
-               {
-                       tupIndex = state->mergefirstfree++;
-                       Assert(tupIndex < state->memtupsize);
-               }
-               state->mergeavailslots[srcTape]--;
-               /* store tuple, append to list for its tape */
-               stup.tupindex = 0;
-               state->memtuples[tupIndex] = stup;
-               if (state->mergelast[srcTape])
-                       state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
-               else
-                       state->mergenext[srcTape] = tupIndex;
-               state->mergelast[srcTape] = tupIndex;
-       }
-       /* update per-tape and global availmem counts */
-       spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
-       state->mergeavailmem[srcTape] = state->availMem;
-       state->availMem = priorAvail - spaceUsed;
-}
-
-/*
- * dumptuples - remove tuples from heap and write to tape
- *
- * This is used during initial-run building, but not during merging.
- *
- * When alltuples = false, dump only enough tuples to get under the
- * availMem limit (and leave at least one tuple in the heap in any case,
- * since puttuple assumes it always has a tuple to compare to).  We also
- * insist there be at least one free slot in the memtuples[] array.
- *
- * When alltuples = true, dump everything currently in memory.
- * (This case is only used at end of input data.)
- *
- * If we empty the heap, close out the current run and return (this should
- * only happen at end of input data).  If we see that the tuple run number
- * at the top of the heap has changed, start a new run.
- */
-static void
-dumptuples(Tuplesortstate *state, bool alltuples)
-{
-       while (alltuples ||
-                  (LACKMEM(state) && state->memtupcount > 1) ||
-                  state->memtupcount >= state->memtupsize)
-       {
-               /*
-                * Dump the heap's frontmost entry, and sift up to remove it from the
-                * heap.
-                */
-               Assert(state->memtupcount > 0);
-               WRITETUP(state, state->tp_tapenum[state->destTape],
-                                &state->memtuples[0]);
-               tuplesort_heap_siftup(state, true);
+       MemoryContextReset(state->tuplecontext);
 
-               /*
-                * If the heap is empty *or* top run number has changed, we've
-                * finished the current run.
-                */
-               if (state->memtupcount == 0 ||
-                       state->currentRun != state->memtuples[0].tupindex)
-               {
-                       markrunend(state, state->tp_tapenum[state->destTape]);
-                       state->currentRun++;
-                       state->tp_runs[state->destTape]++;
-                       state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
+       markrunend(state, state->tp_tapenum[state->destTape]);
+       state->tp_runs[state->destTape]++;
+       state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
 
 #ifdef TRACE_SORT
-                       if (trace_sort)
-                               elog(LOG, "finished writing%s run %d to tape %d: %s",
-                                        (state->memtupcount == 0) ? " final" : "",
-                                        state->currentRun, state->destTape,
-                                        pg_rusage_show(&state->ru_start));
+       if (trace_sort)
+               elog(LOG, "worker %d finished writing run %d to tape %d: %s",
+                        state->worker, state->currentRun, state->destTape,
+                        pg_rusage_show(&state->ru_start));
 #endif
 
-                       /*
-                        * Done if heap is empty, else prepare for new run.
-                        */
-                       if (state->memtupcount == 0)
-                               break;
-                       Assert(state->currentRun == state->memtuples[0].tupindex);
-                       selectnewtape(state);
-               }
-       }
+       if (!alltuples)
+               selectnewtape(state);
 }
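The run-generation cycle implemented by dumptuples() (fill memory, quicksort the whole array, write it out as one run, reset, repeat) can be shown with a minimal stand-alone sketch in plain C.  RUN_BUDGET, write_run() and the integer items are hypothetical stand-ins for workMem, WRITETUP()/markrunend() and SortTuples; only the overall cycle is meant to carry over:

#include <stdio.h>
#include <stdlib.h>

#define RUN_BUDGET 4			/* hypothetical stand-in for workMem, in items */

static int
cmp_int(const void *a, const void *b)
{
	int			x = *(const int *) a;
	int			y = *(const int *) b;

	return (x > y) - (x < y);
}

/* Hypothetical stand-in for WRITETUP() plus markrunend() on the dest tape */
static void
write_run(const int *buf, int n, int runno)
{
	printf("run %d:", runno);
	for (int i = 0; i < n; i++)
		printf(" %d", buf[i]);
	printf("\n");
}

int
main(void)
{
	int			input[] = {9, 3, 7, 1, 8, 2, 6, 5, 4};
	int			buf[RUN_BUDGET];
	int			n = 0,
				runno = 0;

	for (size_t i = 0; i < sizeof(input) / sizeof(input[0]); i++)
	{
		buf[n++] = input[i];
		if (n == RUN_BUDGET)	/* "LACKMEM": sort and dump a full run */
		{
			qsort(buf, n, sizeof(int), cmp_int);
			write_run(buf, n, runno++);
			n = 0;
		}
	}
	if (n > 0)					/* final, possibly short run (alltuples case) */
	{
		qsort(buf, n, sizeof(int), cmp_int);
		write_run(buf, n, runno++);
	}
	return 0;
}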
 
 /*
@@ -3175,9 +3040,9 @@ tuplesort_rescan(Tuplesortstate *state)
                        state->markpos_eof = false;
                        break;
                case TSS_SORTEDONTAPE:
-                       LogicalTapeRewind(state->tapeset,
-                                                         state->result_tape,
-                                                         false);
+                       LogicalTapeRewindForRead(state->tapeset,
+                                                                        state->result_tape,
+                                                                        0);
                        state->eof_reached = false;
                        state->markpos_block = 0L;
                        state->markpos_offset = 0;
@@ -3240,11 +3105,10 @@ tuplesort_restorepos(Tuplesortstate *state)
                        state->eof_reached = state->markpos_eof;
                        break;
                case TSS_SORTEDONTAPE:
-                       if (!LogicalTapeSeek(state->tapeset,
-                                                                state->result_tape,
-                                                                state->markpos_block,
-                                                                state->markpos_offset))
-                               elog(ERROR, "tuplesort_restorepos failed");
+                       LogicalTapeSeek(state->tapeset,
+                                                       state->result_tape,
+                                                       state->markpos_block,
+                                                       state->markpos_offset);
                        state->eof_reached = state->markpos_eof;
                        break;
                default:
@@ -3260,13 +3124,10 @@ tuplesort_restorepos(Tuplesortstate *state)
  *
  * This can be called after tuplesort_performsort() finishes to obtain
  * printable summary information about how the sort was performed.
- * spaceUsed is measured in kilobytes.
  */
 void
 tuplesort_get_stats(Tuplesortstate *state,
-                                       const char **sortMethod,
-                                       const char **spaceType,
-                                       long *spaceUsed)
+                                       TuplesortInstrumentation *stats)
 {
        /*
         * Note: it might seem we should provide both memory and disk usage for a
@@ -3279,47 +3140,72 @@ tuplesort_get_stats(Tuplesortstate *state,
         */
        if (state->tapeset)
        {
-               *spaceType = "Disk";
-               *spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
+               stats->spaceType = SORT_SPACE_TYPE_DISK;
+               stats->spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
        }
        else
        {
-               *spaceType = "Memory";
-               *spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
+               stats->spaceType = SORT_SPACE_TYPE_MEMORY;
+               stats->spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
        }
 
        switch (state->status)
        {
                case TSS_SORTEDINMEM:
                        if (state->boundUsed)
-                               *sortMethod = "top-N heapsort";
+                               stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
                        else
-                               *sortMethod = "quicksort";
+                               stats->sortMethod = SORT_TYPE_QUICKSORT;
                        break;
                case TSS_SORTEDONTAPE:
-                       *sortMethod = "external sort";
+                       stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
                        break;
                case TSS_FINALMERGE:
-                       *sortMethod = "external merge";
+                       stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
                        break;
                default:
-                       *sortMethod = "still in progress";
+                       stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
                        break;
        }
 }
 
+/*
+ * Convert TuplesortMethod to a string.
+ */
+const char *
+tuplesort_method_name(TuplesortMethod m)
+{
+       switch (m)
+       {
+               case SORT_TYPE_STILL_IN_PROGRESS:
+                       return "still in progress";
+               case SORT_TYPE_TOP_N_HEAPSORT:
+                       return "top-N heapsort";
+               case SORT_TYPE_QUICKSORT:
+                       return "quicksort";
+               case SORT_TYPE_EXTERNAL_SORT:
+                       return "external sort";
+               case SORT_TYPE_EXTERNAL_MERGE:
+                       return "external merge";
+       }
+
+       return "unknown";
+}
 
 /*
- * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
- *
- * Compare two SortTuples.  If checkIndex is true, use the tuple index
- * as the front of the sort key; otherwise, no.
+ * Convert TuplesortSpaceType to a string.
  */
+const char *
+tuplesort_space_type_name(TuplesortSpaceType t)
+{
+       Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
+       return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
+}
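With the revised signature, tuplesort_get_stats() fills in a TuplesortInstrumentation struct and callers convert the enum fields to strings themselves via the two helpers above.  A minimal sketch of such a caller, assuming the declarations from utils/tuplesort.h and a sort that has already been performed (show_sort_info is an illustrative name, not an existing function):

#include "postgres.h"
#include "utils/tuplesort.h"

/* Illustrative only: report how a finished sort was performed */
static void
show_sort_info(Tuplesortstate *state)
{
	TuplesortInstrumentation stats;

	tuplesort_get_stats(state, &stats);
	elog(LOG, "Sort Method: %s  %s: %ldkB",
		 tuplesort_method_name(stats.sortMethod),
		 tuplesort_space_type_name(stats.spaceType),
		 (long) stats.spaceUsed);
}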
+
 
-#define HEAPCOMPARE(tup1,tup2) \
-       (checkIndex && ((tup1)->tupindex != (tup2)->tupindex) ? \
-        ((tup1)->tupindex) - ((tup2)->tupindex) : \
-        COMPARETUP(state, tup1, tup2))
+/*
+ * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
+ */
 
 /*
  * Convert the existing unordered array of SortTuples to a bounded heap,
@@ -3329,10 +3215,6 @@ tuplesort_get_stats(Tuplesortstate *state,
  * at the root (array entry zero), instead of the smallest as in the normal
  * sort case.  This allows us to discard the largest entry cheaply.
  * Therefore, we temporarily reverse the sort direction.
- *
- * We assume that all entries in a bounded heap will always have tupindex
- * zero; it therefore doesn't matter that HEAPCOMPARE() doesn't reverse
- * the direction of comparison for tupindexes.
  */
 static void
 make_bounded_heap(Tuplesortstate *state)
@@ -3343,6 +3225,7 @@ make_bounded_heap(Tuplesortstate *state)
        Assert(state->status == TSS_INITIAL);
        Assert(state->bounded);
        Assert(tupcount >= state->bound);
+       Assert(SERIAL(state));
 
        /* Reverse sort direction so largest entry will be at root */
        reversedirection(state);
@@ -3350,27 +3233,28 @@ make_bounded_heap(Tuplesortstate *state)
        state->memtupcount = 0;         /* make the heap empty */
        for (i = 0; i < tupcount; i++)
        {
-               if (state->memtupcount >= state->bound &&
-                 COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
-               {
-                       /* New tuple would just get thrown out, so skip it */
-                       free_sort_tuple(state, &state->memtuples[i]);
-                       CHECK_FOR_INTERRUPTS();
-               }
-               else
+               if (state->memtupcount < state->bound)
                {
                        /* Insert next tuple into heap */
                        /* Must copy source tuple to avoid possible overwrite */
                        SortTuple       stup = state->memtuples[i];
 
-                       tuplesort_heap_insert(state, &stup, 0, false);
-
-                       /* If heap too full, discard largest entry */
-                       if (state->memtupcount > state->bound)
+                       tuplesort_heap_insert(state, &stup);
+               }
+               else
+               {
+                       /*
+                        * The heap is full.  Replace the largest entry with the new
+                        * tuple, or just discard it, if it's larger than anything already
+                        * in the heap.
+                        */
+                       if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
                        {
-                               free_sort_tuple(state, &state->memtuples[0]);
-                               tuplesort_heap_siftup(state, false);
+                               free_sort_tuple(state, &state->memtuples[i]);
+                               CHECK_FOR_INTERRUPTS();
                        }
+                       else
+                               tuplesort_heap_replace_top(state, &state->memtuples[i]);
                }
        }
 
@@ -3389,18 +3273,19 @@ sort_bounded_heap(Tuplesortstate *state)
        Assert(state->status == TSS_BOUNDED);
        Assert(state->bounded);
        Assert(tupcount == state->bound);
+       Assert(SERIAL(state));
 
        /*
-        * We can unheapify in place because each sift-up will remove the largest
-        * entry, which we can promptly store in the newly freed slot at the end.
-        * Once we're down to a single-entry heap, we're done.
+        * We can unheapify in place because each delete-top call will remove the
+        * largest entry, which we can promptly store in the newly freed slot at
+        * the end.  Once we're down to a single-entry heap, we're done.
         */
        while (state->memtupcount > 1)
        {
                SortTuple       stup = state->memtuples[0];
 
                /* this sifts-up the next-largest entry and decreases memtupcount */
-               tuplesort_heap_siftup(state, false);
+               tuplesort_heap_delete_top(state);
                state->memtuples[state->memtupcount] = stup;
        }
        state->memtupcount = tupcount;
@@ -3415,31 +3300,45 @@ sort_bounded_heap(Tuplesortstate *state)
        state->boundUsed = true;
 }
 
+/*
+ * Sort all memtuples using specialized qsort() routines.
+ *
+ * Quicksort is used for small in-memory sorts, and external sort runs.
+ */
+static void
+tuplesort_sort_memtuples(Tuplesortstate *state)
+{
+       Assert(!LEADER(state));
+
+       if (state->memtupcount > 1)
+       {
+               /* Can we use the single-key sort function? */
+               if (state->onlyKey != NULL)
+                       qsort_ssup(state->memtuples, state->memtupcount,
+                                          state->onlyKey);
+               else
+                       qsort_tuple(state->memtuples,
+                                               state->memtupcount,
+                                               state->comparetup,
+                                               state);
+       }
+}
+
 /*
  * Insert a new tuple into an empty or existing heap, maintaining the
  * heap invariant.  Caller is responsible for ensuring there's room.
  *
- * Note: we assume *tuple is a temporary variable that can be scribbled on.
- * For some callers, tuple actually points to a memtuples[] entry above the
+ * Note: For some callers, tuple points to a memtuples[] entry above the
  * end of the heap.  This is safe as long as it's not immediately adjacent
  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
  * is, it might get overwritten before being moved into the heap!
  */
 static void
-tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
-                                         int tupleindex, bool checkIndex)
+tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
 {
        SortTuple  *memtuples;
        int                     j;
 
-       /*
-        * Save the tupleindex --- see notes above about writing on *tuple. It's a
-        * historical artifact that tupleindex is passed as a separate argument
-        * and not in *tuple, but it's notationally convenient so let's leave it
-        * that way.
-        */
-       tuple->tupindex = tupleindex;
-
        memtuples = state->memtuples;
        Assert(state->memtupcount < state->memtupsize);
 
@@ -3454,7 +3353,7 @@ tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
        {
                int                     i = (j - 1) >> 1;
 
-               if (HEAPCOMPARE(tuple, &memtuples[i]) >= 0)
+               if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
                        break;
                memtuples[j] = memtuples[i];
                j = i;
@@ -3463,35 +3362,64 @@ tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
 }
 
 /*
- * The tuple at state->memtuples[0] has been removed from the heap.
- * Decrement memtupcount, and sift up to maintain the heap invariant.
+ * Remove the tuple at state->memtuples[0] from the heap.  Decrement
+ * memtupcount, and sift up to maintain the heap invariant.
+ *
+ * The caller has already free'd the tuple the top node points to,
+ * if necessary.
  */
 static void
-tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
+tuplesort_heap_delete_top(Tuplesortstate *state)
 {
        SortTuple  *memtuples = state->memtuples;
        SortTuple  *tuple;
-       int                     i,
-                               n;
 
        if (--state->memtupcount <= 0)
                return;
 
+       /*
+        * Remove the last tuple in the heap, and re-insert it, by replacing the
+        * current top node with it.
+        */
+       tuple = &memtuples[state->memtupcount];
+       tuplesort_heap_replace_top(state, tuple);
+}
+
+/*
+ * Replace the tuple at state->memtuples[0] with a new tuple.  Sift up to
+ * maintain the heap invariant.
+ *
+ * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
+ * Heapsort, steps H3-H8).
+ */
+static void
+tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
+{
+       SortTuple  *memtuples = state->memtuples;
+       unsigned int i,
+                               n;
+
+       Assert(state->memtupcount >= 1);
+
        CHECK_FOR_INTERRUPTS();
 
+       /*
+        * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
+        * This prevents overflow in the "2 * i + 1" calculation, since at the top
+        * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
+        */
        n = state->memtupcount;
-       tuple = &memtuples[n];          /* tuple that must be reinserted */
        i = 0;                                          /* i is where the "hole" is */
        for (;;)
        {
-               int                     j = 2 * i + 1;
+               unsigned int j = 2 * i + 1;
 
                if (j >= n)
                        break;
                if (j + 1 < n &&
-                       HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0)
+                       COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
                        j++;
-               if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0)
+               if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
                        break;
                memtuples[i] = memtuples[j];
                i = j;
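Stripped of tuplesort's SortTuple and COMPARETUP machinery, the replace-top operation is an ordinary sift-down with the new element dropped into the hole left at the root.  The sketch below shows the same loop for a plain int min-heap, including the unsigned index choice that keeps 2 * i + 1 from overflowing; the function name and types are illustrative only:

/* Replace the root of a binary min-heap of n ints with newval, then sift down */
static void
heap_replace_top(int *heap, unsigned int n, int newval)
{
	unsigned int i = 0;			/* i is where the "hole" is */

	for (;;)
	{
		unsigned int j = 2 * i + 1;	/* left child; no overflow for n <= INT_MAX */

		if (j >= n)
			break;
		if (j + 1 < n && heap[j + 1] < heap[j])
			j++;				/* pick the smaller child */
		if (newval <= heap[j])
			break;
		heap[i] = heap[j];		/* move child up into the hole */
		i = j;
	}
	heap[i] = newval;
}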
@@ -3544,38 +3472,31 @@ markrunend(Tuplesortstate *state, int tapenum)
 }
 
 /*
- * Get memory for tuple from within READTUP() routine.  Allocate
- * memory and account for that, or consume from tape's batch
- * allocation.
+ * Get memory for tuple from within READTUP() routine.
  *
- * Memory returned here in the final on-the-fly merge case is recycled
- * from tape's batch allocation.  Otherwise, callers must pfree() or
- * reset tuple child memory context, and account for that with a
- * FREEMEM().  Currently, this only ever needs to happen in WRITETUP()
- * routines.
+ * We use the next free slot from the slab allocator, or palloc() if the
+ * tuple is too large for that.
  */
 static void *
-readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen)
+readtup_alloc(Tuplesortstate *state, Size tuplen)
 {
-       if (state->batchUsed)
-       {
-               /*
-                * No USEMEM() call, because during final on-the-fly merge
-                * accounting is based on tape-private state. ("Overflow"
-                * allocations are detected as an indication that a new round
-                * or preloading is required. Preloading marks existing
-                * contents of tape's batch buffer for reuse.)
-                */
-               return mergebatchalloc(state, tapenum, tuplen);
-       }
+       SlabSlot   *buf;
+
+       /*
+        * We pre-allocate enough slots in the slab arena that we should never run
+        * out.
+        */
+       Assert(state->slabFreeHead);
+
+       if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
+               return MemoryContextAlloc(state->sortcontext, tuplen);
        else
        {
-               char       *ret;
+               buf = state->slabFreeHead;
+               /* Reuse this slot */
+               state->slabFreeHead = buf->nextfree;
 
-               /* Batch allocation yet to be performed */
-               ret = MemoryContextAlloc(state->tuplecontext, tuplen);
-               USEMEM(state, GetMemoryChunkSpace(ret));
-               return ret;
+               return buf;
        }
 }
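SLAB_SLOT_SIZE, SlabSlot and slabFreeHead are defined earlier in this file, outside this hunk; the underlying pattern is an intrusive free list of fixed-size slots threaded through one large arena, with a plain allocation as the fallback for oversized tuples.  A rough, self-contained sketch of that pattern, with made-up names and sizes:

#include <stdlib.h>

#define SLOT_SIZE 1024			/* hypothetical fixed slot size */

typedef union SlabSlot
{
	union SlabSlot *nextfree;	/* link while the slot is on the free list */
	char		buffer[SLOT_SIZE];	/* payload while the slot is in use */
} SlabSlot;

static SlabSlot *freehead;

/* Carve nslots slots out of one arena and thread them onto the free list */
static void
slab_init(int nslots)
{
	SlabSlot   *arena = malloc((size_t) nslots * sizeof(SlabSlot));

	freehead = NULL;
	for (int i = nslots - 1; i >= 0; i--)
	{
		arena[i].nextfree = freehead;
		freehead = &arena[i];
	}
}

/* Pop a slot, falling back to a plain allocation for oversized requests */
static void *
slab_alloc(size_t len)
{
	SlabSlot   *slot;

	if (len > SLOT_SIZE || freehead == NULL)
		return malloc(len);
	slot = freehead;
	freehead = slot->nextfree;
	return slot;
}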
 
@@ -3705,8 +3626,7 @@ copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
                 * ensure a consistent representation (current tuple was just
                 * handled).  It does not matter if some dumped tuples are already
                 * sorted on tape, since serialized tuples lack abbreviated keys
-                * (TSS_BUILDRUNS state prevents control reaching here in any
-                * case).
+                * (TSS_BUILDRUNS state prevents control reaching here in any case).
                 */
                for (i = 0; i < state->memtupcount; i++)
                {
@@ -3745,8 +3665,11 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
                LogicalTapeWrite(state->tapeset, tapenum,
                                                 (void *) &tuplen, sizeof(tuplen));
 
-       FREEMEM(state, GetMemoryChunkSpace(tuple));
-       heap_free_minimal_tuple(tuple);
+       if (!state->slabAllocatorUsed)
+       {
+               FREEMEM(state, GetMemoryChunkSpace(tuple));
+               heap_free_minimal_tuple(tuple);
+       }
 }
 
 static void
@@ -3755,7 +3678,7 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
 {
        unsigned int tupbodylen = len - sizeof(int);
        unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
-       MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tapenum, tuplen);
+       MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen);
        char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
        HeapTupleData htup;
 
@@ -3795,7 +3718,7 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b,
                                datum2;
        bool            isnull1,
                                isnull2;
-       AttrNumber      leading = state->indexInfo->ii_KeyAttrNumbers[0];
+       AttrNumber      leading = state->indexInfo->ii_IndexAttrNumbers[0];
 
        /* Be prepared to compare additional sort keys */
        ltup = (HeapTuple) a->tuple;
@@ -3838,7 +3761,7 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b,
 
                for (; nkey < state->nKeys; nkey++, sortKey++)
                {
-                       AttrNumber      attno = state->indexInfo->ii_KeyAttrNumbers[nkey];
+                       AttrNumber      attno = state->indexInfo->ii_IndexAttrNumbers[nkey];
 
                        datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
                        datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
@@ -3869,11 +3792,11 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b,
 
                ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple;
 
-               ExecStoreTuple(ltup, ecxt_scantuple, InvalidBuffer, false);
+               ExecStoreHeapTuple(ltup, ecxt_scantuple, false);
                FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
                                           l_index_values, l_index_isnull);
 
-               ExecStoreTuple(rtup, ecxt_scantuple, InvalidBuffer, false);
+               ExecStoreHeapTuple(rtup, ecxt_scantuple, false);
                FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
                                           r_index_values, r_index_isnull);
 
@@ -3910,11 +3833,11 @@ copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
         * set up first-column key value, and potentially abbreviate, if it's a
         * simple column
         */
-       if (state->indexInfo->ii_KeyAttrNumbers[0] == 0)
+       if (state->indexInfo->ii_IndexAttrNumbers[0] == 0)
                return;
 
        original = heap_getattr(tuple,
-                                                       state->indexInfo->ii_KeyAttrNumbers[0],
+                                                       state->indexInfo->ii_IndexAttrNumbers[0],
                                                        state->tupDesc,
                                                        &stup->isnull1);
 
@@ -3950,8 +3873,7 @@ copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
                 * ensure a consistent representation (current tuple was just
                 * handled).  It does not matter if some dumped tuples are already
                 * sorted on tape, since serialized tuples lack abbreviated keys
-                * (TSS_BUILDRUNS state prevents control reaching here in any
-                * case).
+                * (TSS_BUILDRUNS state prevents control reaching here in any case).
                 */
                for (i = 0; i < state->memtupcount; i++)
                {
@@ -3959,9 +3881,9 @@ copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
 
                        tuple = (HeapTuple) mtup->tuple;
                        mtup->datum1 = heap_getattr(tuple,
-                                                                         state->indexInfo->ii_KeyAttrNumbers[0],
+                                                                               state->indexInfo->ii_IndexAttrNumbers[0],
                                                                                state->tupDesc,
-                                                                               &stup->isnull1);
+                                                                               &mtup->isnull1);
                }
        }
 }
@@ -3983,8 +3905,11 @@ writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
                LogicalTapeWrite(state->tapeset, tapenum,
                                                 &tuplen, sizeof(tuplen));
 
-       FREEMEM(state, GetMemoryChunkSpace(tuple));
-       heap_freetuple(tuple);
+       if (!state->slabAllocatorUsed)
+       {
+               FREEMEM(state, GetMemoryChunkSpace(tuple));
+               heap_freetuple(tuple);
+       }
 }
 
 static void
@@ -3993,7 +3918,6 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
 {
        unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
        HeapTuple       tuple = (HeapTuple) readtup_alloc(state,
-                                                                                                 tapenum,
                                                                                                  t_len + HEAPTUPLESIZE);
 
        /* Reconstruct the HeapTupleData header */
@@ -4011,14 +3935,13 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
                                                         &tuplen, sizeof(tuplen));
        stup->tuple = (void *) tuple;
        /* set up first-column key value, if it's a simple column */
-       if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
+       if (state->indexInfo->ii_IndexAttrNumbers[0] != 0)
                stup->datum1 = heap_getattr(tuple,
-                                                                       state->indexInfo->ii_KeyAttrNumbers[0],
+                                                                       state->indexInfo->ii_IndexAttrNumbers[0],
                                                                        state->tupDesc,
                                                                        &stup->isnull1);
 }
 
-
 /*
  * Routines specialized for IndexTuple case
  *
@@ -4130,13 +4053,14 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
                                 key_desc ? errdetail("Key %s is duplicated.", key_desc) :
                                 errdetail("Duplicate keys exist."),
                                 errtableconstraint(state->heapRel,
-                                                                RelationGetRelationName(state->indexRel))));
+                                                                       RelationGetRelationName(state->indexRel))));
        }
 
        /*
-        * If key values are equal, we sort on ItemPointer.  This does not affect
-        * validity of the finished index, but it may be useful to have index
-        * scans in physical order.
+        * If key values are equal, we sort on ItemPointer.  This is required for
+        * btree indexes, since heap TID is treated as an implicit last key
+        * attribute in order to ensure that all keys in the index are physically
+        * unique.
         */
        {
                BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
@@ -4153,6 +4077,9 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
                        return (pos1 < pos2) ? -1 : 1;
        }
 
+       /* ItemPointer values should never be equal */
+       Assert(false);
+
        return 0;
 }
 
@@ -4160,8 +4087,8 @@ static int
 comparetup_index_hash(const SortTuple *a, const SortTuple *b,
                                          Tuplesortstate *state)
 {
-       uint32          hash1;
-       uint32          hash2;
+       Bucket          bucket1;
+       Bucket          bucket2;
        IndexTuple      tuple1;
        IndexTuple      tuple2;
 
@@ -4170,13 +4097,16 @@ comparetup_index_hash(const SortTuple *a, const SortTuple *b,
         * that the first column of the index tuple is the hash key.
         */
        Assert(!a->isnull1);
-       hash1 = DatumGetUInt32(a->datum1) & state->hash_mask;
+       bucket1 = _hash_hashkey2bucket(DatumGetUInt32(a->datum1),
+                                                                  state->max_buckets, state->high_mask,
+                                                                  state->low_mask);
        Assert(!b->isnull1);
-       hash2 = DatumGetUInt32(b->datum1) & state->hash_mask;
-
-       if (hash1 > hash2)
+       bucket2 = _hash_hashkey2bucket(DatumGetUInt32(b->datum1),
+                                                                  state->max_buckets, state->high_mask,
+                                                                  state->low_mask);
+       if (bucket1 > bucket2)
                return 1;
-       else if (hash1 < hash2)
+       else if (bucket1 < bucket2)
                return -1;
 
        /*
@@ -4202,6 +4132,9 @@ comparetup_index_hash(const SortTuple *a, const SortTuple *b,
                        return (pos1 < pos2) ? -1 : 1;
        }
 
+       /* ItemPointer values should never be equal */
+       Assert(false);
+
        return 0;
 }
 
@@ -4256,8 +4189,7 @@ copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
                 * ensure a consistent representation (current tuple was just
                 * handled).  It does not matter if some dumped tuples are already
                 * sorted on tape, since serialized tuples lack abbreviated keys
-                * (TSS_BUILDRUNS state prevents control reaching here in any
-                * case).
+                * (TSS_BUILDRUNS state prevents control reaching here in any case).
                 */
                for (i = 0; i < state->memtupcount; i++)
                {
@@ -4267,7 +4199,7 @@ copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
                        mtup->datum1 = index_getattr(tuple,
                                                                                 1,
                                                                                 RelationGetDescr(state->indexRel),
-                                                                                &stup->isnull1);
+                                                                                &mtup->isnull1);
                }
        }
 }
@@ -4287,8 +4219,11 @@ writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
                LogicalTapeWrite(state->tapeset, tapenum,
                                                 (void *) &tuplen, sizeof(tuplen));
 
-       FREEMEM(state, GetMemoryChunkSpace(tuple));
-       pfree(tuple);
+       if (!state->slabAllocatorUsed)
+       {
+               FREEMEM(state, GetMemoryChunkSpace(tuple));
+               pfree(tuple);
+       }
 }
 
 static void
@@ -4296,7 +4231,7 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
                          int tapenum, unsigned int len)
 {
        unsigned int tuplen = len - sizeof(unsigned int);
-       IndexTuple      tuple = (IndexTuple) readtup_alloc(state, tapenum, tuplen);
+       IndexTuple      tuple = (IndexTuple) readtup_alloc(state, tuplen);
 
        LogicalTapeReadExact(state->tapeset, tapenum,
                                                 tuple, tuplen);
@@ -4330,7 +4265,7 @@ comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
 
        if (state->sortKeys->abbrev_converter)
                compare = ApplySortAbbrevFullComparator(PointerGetDatum(a->tuple), a->isnull1,
-                                                                          PointerGetDatum(b->tuple), b->isnull1,
+                                                                                               PointerGetDatum(b->tuple), b->isnull1,
                                                                                                state->sortKeys);
 
        return compare;
@@ -4377,7 +4312,7 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
                LogicalTapeWrite(state->tapeset, tapenum,
                                                 (void *) &writtenlen, sizeof(writtenlen));
 
-       if (stup->tuple)
+       if (!state->slabAllocatorUsed && stup->tuple)
        {
                FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
                pfree(stup->tuple);
@@ -4407,7 +4342,7 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
        }
        else
        {
-               void       *raddr = readtup_alloc(state, tapenum, tuplen);
+               void       *raddr = readtup_alloc(state, tuplen);
 
                LogicalTapeReadExact(state->tapeset, tapenum,
                                                         raddr, tuplen);
@@ -4421,6 +4356,229 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
                                                         &tuplen, sizeof(tuplen));
 }
 
+/*
+ * Parallel sort routines
+ */
+
+/*
+ * tuplesort_estimate_shared - estimate required shared memory allocation
+ *
+ * nWorkers is an estimate of the number of workers (it's the number that
+ * will be requested).
+ */
+Size
+tuplesort_estimate_shared(int nWorkers)
+{
+       Size            tapesSize;
+
+       Assert(nWorkers > 0);
+
+       /* Make sure that BufFile shared state is MAXALIGN'd */
+       tapesSize = mul_size(sizeof(TapeShare), nWorkers);
+       tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
+
+       return tapesSize;
+}
+
+/*
+ * tuplesort_initialize_shared - initialize shared tuplesort state
+ *
+ * Must be called from leader process before workers are launched, to
+ * establish state needed up-front for worker tuplesortstates.  nWorkers
+ * should match the argument passed to tuplesort_estimate_shared().
+ */
+void
+tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
+{
+       int                     i;
+
+       Assert(nWorkers > 0);
+
+       SpinLockInit(&shared->mutex);
+       shared->currentWorker = 0;
+       shared->workersFinished = 0;
+       SharedFileSetInit(&shared->fileset, seg);
+       shared->nTapes = nWorkers;
+       for (i = 0; i < nWorkers; i++)
+       {
+               shared->tapes[i].firstblocknumber = 0L;
+       }
+}
+
+/*
+ * tuplesort_attach_shared - attach to shared tuplesort state
+ *
+ * Must be called by all worker processes.
+ */
+void
+tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
+{
+       /* Attach to SharedFileSet */
+       SharedFileSetAttach(&shared->fileset, seg);
+}
+
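These three entry points follow the usual estimate / initialize-in-leader / attach-in-worker shape for parallel operations.  The fragments below sketch how a caller such as a parallel index build might wire them into a ParallelContext; PARALLEL_KEY_TUPLESORT is a caller-chosen shm_toc key, error handling is omitted, and the shm_toc calls only indicate the general pattern rather than any particular caller's code:

/* Leader, while estimating the size of the parallel DSM segment */
Size		estsort = tuplesort_estimate_shared(nworkers);

shm_toc_estimate_chunk(&pcxt->estimator, estsort);
shm_toc_estimate_keys(&pcxt->estimator, 1);

/* Leader, once the segment exists (after InitializeParallelDSM) */
Sharedsort *sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);

tuplesort_initialize_shared(sharedsort, nworkers, pcxt->seg);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);

/* Worker, during startup, given the toc and seg it was handed */
Sharedsort *workersort = (Sharedsort *) shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);

tuplesort_attach_shared(workersort, seg);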
+/*
+ * worker_get_identifier - Assign and return ordinal identifier for worker
+ *
+ * The order in which these are assigned is not well defined, and should not
+ * matter; worker numbers across parallel sort participants need only be
+ * distinct and gapless.  logtape.c requires this.
+ *
+ * Note that the identifiers assigned from here have no relation to
+ * ParallelWorkerNumber number, to avoid making any assumption about
+ * caller's requirements.  However, we do follow the ParallelWorkerNumber
+ * convention of representing a non-worker with worker number -1.  This
+ * includes the leader, as well as serial Tuplesort processes.
+ */
+static int
+worker_get_identifier(Tuplesortstate *state)
+{
+       Sharedsort *shared = state->shared;
+       int                     worker;
+
+       Assert(WORKER(state));
+
+       SpinLockAcquire(&shared->mutex);
+       worker = shared->currentWorker++;
+       SpinLockRelease(&shared->mutex);
+
+       return worker;
+}
+
+/*
+ * worker_freeze_result_tape - freeze worker's result tape for leader
+ *
+ * This is called by workers just after the result tape has been determined,
+ * instead of calling LogicalTapeFreeze() directly.  They do so because
+ * workers require a few additional steps over similar serial
+ * TSS_SORTEDONTAPE external sort cases, which also happen here.  The extra
+ * steps are around freeing now unneeded resources, and representing to
+ * leader that worker's input run is available for its merge.
+ *
+ * There should only be one final output run for each worker, which consists
+ * of all tuples that were originally input into worker.
+ */
+static void
+worker_freeze_result_tape(Tuplesortstate *state)
+{
+       Sharedsort *shared = state->shared;
+       TapeShare       output;
+
+       Assert(WORKER(state));
+       Assert(state->result_tape != -1);
+       Assert(state->memtupcount == 0);
+
+       /*
+        * Free most remaining memory, in case caller is sensitive to our holding
+        * on to it.  memtuples may not be a tiny merge heap at this point.
+        */
+       pfree(state->memtuples);
+       /* Be tidy */
+       state->memtuples = NULL;
+       state->memtupsize = 0;
+
+       /*
+        * Parallel worker requires result tape metadata, which is to be stored in
+        * shared memory for leader
+        */
+       LogicalTapeFreeze(state->tapeset, state->result_tape, &output);
+
+       /* Store properties of output tape, and update finished worker count */
+       SpinLockAcquire(&shared->mutex);
+       shared->tapes[state->worker] = output;
+       shared->workersFinished++;
+       SpinLockRelease(&shared->mutex);
+}
+
+/*
+ * worker_nomergeruns - dump memtuples in worker, without merging
+ *
+ * This is called as an alternative to mergeruns() with a worker when no
+ * merging is required.
+ */
+static void
+worker_nomergeruns(Tuplesortstate *state)
+{
+       Assert(WORKER(state));
+       Assert(state->result_tape == -1);
+
+       state->result_tape = state->tp_tapenum[state->destTape];
+       worker_freeze_result_tape(state);
+}
+
+/*
+ * leader_takeover_tapes - create tapeset for leader from worker tapes
+ *
+ * So far, leader Tuplesortstate has performed no actual sorting.  By now, all
+ * sorting has occurred in workers, all of which must have already returned
+ * from tuplesort_performsort().
+ *
+ * When this returns, leader process is left in a state that is virtually
+ * indistinguishable from it having generated runs as a serial external sort
+ * might have.
+ */
+static void
+leader_takeover_tapes(Tuplesortstate *state)
+{
+       Sharedsort *shared = state->shared;
+       int                     nParticipants = state->nParticipants;
+       int                     workersFinished;
+       int                     j;
+
+       Assert(LEADER(state));
+       Assert(nParticipants >= 1);
+
+       SpinLockAcquire(&shared->mutex);
+       workersFinished = shared->workersFinished;
+       SpinLockRelease(&shared->mutex);
+
+       if (nParticipants != workersFinished)
+               elog(ERROR, "cannot take over tapes before all workers finish");
+
+       /*
+        * Create the tapeset from worker tapes, including a leader-owned tape at
+        * the end.  Parallel workers are far more expensive than logical tapes,
+        * so the number of tapes allocated here should never be excessive.
+        *
+        * We still have a leader tape, though it's not possible to write to it
+        * due to restrictions in the shared fileset infrastructure used by
+        * logtape.c.  It will never be written to in practice because
+        * randomAccess is disallowed for parallel sorts.
+        */
+       inittapestate(state, nParticipants + 1);
+       state->tapeset = LogicalTapeSetCreate(nParticipants + 1, shared->tapes,
+                                                                                 &shared->fileset, state->worker);
+
+       /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
+       state->currentRun = nParticipants;
+
+       /*
+        * Initialize variables of Algorithm D to be consistent with runs from
+        * workers having been generated in the leader.
+        *
+        * There will always be exactly 1 run per worker, and exactly one input
+        * tape per run, because workers always output exactly 1 run, even when
+        * there were no input tuples for workers to sort.
+        */
+       for (j = 0; j < state->maxTapes; j++)
+       {
+               /* One real run; no dummy runs for worker tapes */
+               state->tp_fib[j] = 1;
+               state->tp_runs[j] = 1;
+               state->tp_dummy[j] = 0;
+               state->tp_tapenum[j] = j;
+       }
+       /* Leader tape gets one dummy run, and no real runs */
+       state->tp_fib[state->tapeRange] = 0;
+       state->tp_runs[state->tapeRange] = 0;
+       state->tp_dummy[state->tapeRange] = 1;
+
+       state->Level = 1;
+       state->destTape = 0;
+
+       state->status = TSS_BUILDRUNS;
+}
+
 /*
  * Convenience routine to free a tuple previously loaded into sort memory
  */