]> granicus.if.org Git - postgresql/blobdiff - src/backend/utils/sort/tuplesort.c
Make heap TID a tiebreaker nbtree index column.
[postgresql] / src / backend / utils / sort / tuplesort.c
index 52f05e6d9ad8b4de676525bb5e67b0e4f9a41a81..3eebd9ef51cf4ead7da7490c2af4a9149f1e76f4 100644 (file)
@@ -6,29 +6,18 @@
  * This module handles sorting of heap tuples, index tuples, or single
  * Datums (and could easily support other kinds of sortable objects,
  * if necessary).  It works efficiently for both small and large amounts
- * of data.  Small amounts are sorted in-memory using qsort(). Large
+ * of data.  Small amounts are sorted in-memory using qsort().  Large
  * amounts are sorted using temporary files and a standard external sort
  * algorithm.
  *
  * See Knuth, volume 3, for more than you want to know about the external
- * sorting algorithm.  We divide the input into sorted runs using replacement
- * selection, in the form of a priority tree implemented as a heap
- * (essentially his Algorithm 5.2.3H), then merge the runs using polyphase
- * merge, Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D
- * are implemented by logtape.c, which avoids space wastage by recycling
- * disk space as soon as each block is read from its "tape".
- *
- * We do not form the initial runs using Knuth's recommended replacement
- * selection data structure (Algorithm 5.4.1R), because it uses a fixed
- * number of records in memory at all times.  Since we are dealing with
- * tuples that may vary considerably in size, we want to be able to vary
- * the number of records kept in memory to ensure full utilization of the
- * allowed sort memory space.  So, we keep the tuples in a variable-size
- * heap, with the next record to go out at the top of the heap.  Like
- * Algorithm 5.4.1R, each record is stored with the run number that it
- * must go into, and we use (run number, key) as the ordering key for the
- * heap.  When the run number at the top of the heap changes, we know that
- * no more records of the prior run are left in the heap.
+ * sorting algorithm.  Historically, we divided the input into sorted runs
+ * using replacement selection, in the form of a priority tree implemented
+ * as a heap (essentially his Algorithm 5.2.3H), but now we always use
+ * quicksort for run generation.  We merge the runs using polyphase merge,
+ * Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D are
+ * implemented by logtape.c, which avoids space wastage by recycling disk
+ * space as soon as each block is read from its "tape".
  *
  * The approximate amount of memory allowed for any one sort operation
  * is specified in kilobytes by the caller (most pass work_mem).  Initially,
  * we haven't exceeded workMem.  If we reach the end of the input without
  * exceeding workMem, we sort the array using qsort() and subsequently return
  * tuples just by scanning the tuple array sequentially.  If we do exceed
- * workMem, we construct a heap using Algorithm H and begin to emit tuples
- * into sorted runs in temporary tapes, emitting just enough tuples at each
- * step to get back within the workMem limit.  Whenever the run number at
- * the top of the heap changes, we begin a new run with a new output tape
- * (selected per Algorithm D). After the end of the input is reached,
- * we dump out remaining tuples in memory into a final run (or two),
+ * workMem, we begin to emit tuples into sorted runs in temporary tapes.
+ * When tuples are dumped in batch after quicksorting, we begin a new run
+ * with a new output tape (selected per Algorithm D).  After the end of the
+ * input is reached, we dump out remaining tuples in memory into a final run,
  * then merge the runs using Algorithm D.
  *
  * When merging runs, we use a heap containing just the frontmost tuple from
- * each source run; we repeatedly output the smallest tuple and insert the
- * next tuple from its source tape (if any).  When the heap empties, the merge
- * is complete.  The basic merge algorithm thus needs very little memory ---
- * only M tuples for an M-way merge, and M is constrained to a small number.
- * However, we can still make good use of our full workMem allocation by
- * pre-reading additional tuples from each source tape.  Without prereading,
- * our access pattern to the temporary file would be very erratic; on average
- * we'd read one block from each of M source tapes during the same time that
- * we're writing M blocks to the output tape, so there is no sequentiality of
- * access at all, defeating the read-ahead methods used by most Unix kernels.
- * Worse, the output tape gets written into a very random sequence of blocks
- * of the temp file, ensuring that things will be even worse when it comes
- * time to read that tape.     A straightforward merge pass thus ends up doing a
- * lot of waiting for disk seeks.  We can improve matters by prereading from
- * each source tape sequentially, loading about workMem/M bytes from each tape
- * in turn.  Then we run the merge algorithm, writing but not reading until
- * one of the preloaded tuple series runs out. Then we switch back to preread
- * mode, fill memory again, and repeat.  This approach helps to localize both
- * read and write accesses.
+ * each source run; we repeatedly output the smallest tuple and replace it
+ * with the next tuple from its source tape (if any).  When the heap empties,
+ * the merge is complete.  The basic merge algorithm thus needs very little
+ * memory --- only M tuples for an M-way merge, and M is constrained to a
+ * small number.  However, we can still make good use of our full workMem
+ * allocation by pre-reading additional blocks from each source tape.  Without
+ * prereading, our access pattern to the temporary file would be very erratic;
+ * on average we'd read one block from each of M source tapes during the same
+ * time that we're writing M blocks to the output tape, so there is no
+ * sequentiality of access at all, defeating the read-ahead methods used by
+ * most Unix kernels.  Worse, the output tape gets written into a very random
+ * sequence of blocks of the temp file, ensuring that things will be even
+ * worse when it comes time to read that tape.  A straightforward merge pass
+ * thus ends up doing a lot of waiting for disk seeks.  We can improve matters
+ * by prereading from each source tape sequentially, loading about workMem/M
+ * bytes from each tape in turn, and making the sequential blocks immediately
+ * available for reuse.  This approach helps to localize both read and write
+ * accesses.  The pre-reading is handled by logtape.c, we just tell it how
+ * much memory to use for the buffers.
  *
  * When the caller requests random access to the sort result, we form
  * the final sorted run on a logical tape which is then "frozen", so
- * that we can access it randomly.     When the caller does not need random
+ * that we can access it randomly.  When the caller does not need random
  * access, we return from tuplesort_performsort() as soon as we are down
  * to one run per logical tape.  The final merge is then performed
  * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
  * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
  * to Knuth's figure 70 (section 5.4.2).  However, Knuth is assuming that
  * tape drives are expensive beasts, and in particular that there will always
- * be many more runs than tape drives. In our implementation a "tape drive"
+ * be many more runs than tape drives.  In our implementation a "tape drive"
  * doesn't cost much more than a few Kb of memory buffers, so we can afford
  * to have lots of them.  In particular, if we can have as many tape drives
  * as sorted runs, we can eliminate any repeated I/O at all.  In the current
  * code we determine the number of tapes M on the basis of workMem: we want
  * workMem/M to be large enough that we read a fair amount of data each time
  * we preread from a tape, so as to maintain the locality of access described
- * above.  Nonetheless, with large workMem we can have many tapes.
+ * above.  Nonetheless, with large workMem we can have many tapes (but not
+ * too many -- see the comments in tuplesort_merge_order).
  *
+ * This module supports parallel sorting.  Parallel sorts involve coordination
+ * among one or more worker processes, and a leader process, each with its own
+ * tuplesort state.  The leader process (or, more accurately, the
+ * Tuplesortstate associated with a leader process) creates a full tapeset
+ * consisting of worker tapes with one run to merge; a run for every
+ * worker process.  This is then merged.  Worker processes are guaranteed to
+ * produce exactly one output run from their partial input.
  *
- * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
 
 #include <limits.h>
 
+#include "access/hash.h"
 #include "access/htup_details.h"
 #include "access/nbtree.h"
 #include "catalog/index.h"
+#include "catalog/pg_am.h"
 #include "commands/tablespace.h"
 #include "executor/executor.h"
 #include "miscadmin.h"
 #define DATUM_SORT             2
 #define CLUSTER_SORT   3
 
+/* Sort parallel code from state for sort__start probes */
+#define PARALLEL_SORT(state)   ((state)->shared == NULL ? 0 : \
+                                                                (state)->worker >= 0 ? 1 : 2)
+
 /* GUC variables */
 #ifdef TRACE_SORT
 bool           trace_sort = false;
@@ -134,38 +136,62 @@ bool              optimize_bounded_sort = true;
 
 
 /*
- * The objects we actually sort are SortTuple structs. These contain
+ * The objects we actually sort are SortTuple structs.  These contain
  * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
  * which is a separate palloc chunk --- we assume it is just one chunk and
- * can be freed by a simple pfree().  SortTuples also contain the tuple's
- * first key column in Datum/nullflag format, and an index integer.
+ * can be freed by a simple pfree() (except during merge, when we use a
+ * simple slab allocator).  SortTuples also contain the tuple's first key
+ * column in Datum/nullflag format, and an index integer.
  *
  * Storing the first key column lets us save heap_getattr or index_getattr
- * calls during tuple comparisons.     We could extract and save all the key
+ * calls during tuple comparisons.  We could extract and save all the key
  * columns not just the first, but this would increase code complexity and
  * overhead, and wouldn't actually save any comparison cycles in the common
  * case where the first key determines the comparison result.  Note that
  * for a pass-by-reference datatype, datum1 points into the "tuple" storage.
  *
+ * There is one special case: when the sort support infrastructure provides an
+ * "abbreviated key" representation, where the key is (typically) a pass by
+ * value proxy for a pass by reference type.  In this case, the abbreviated key
+ * is stored in datum1 in place of the actual first key column.
+ *
  * When sorting single Datums, the data value is represented directly by
- * datum1/isnull1.     If the datatype is pass-by-reference and isnull1 is false,
- * then datum1 points to a separately palloc'd data value that is also pointed
- * to by the "tuple" pointer; otherwise "tuple" is NULL.
+ * datum1/isnull1 for pass by value types (or null values).  If the datatype is
+ * pass-by-reference and isnull1 is false, then "tuple" points to a separately
+ * palloc'd data value, otherwise "tuple" is NULL.  The value of datum1 is then
+ * either the same pointer as "tuple", or is an abbreviated key value as
+ * described above.  Accordingly, "tuple" is always used in preference to
+ * datum1 as the authoritative value for pass-by-reference cases.
  *
- * While building initial runs, tupindex holds the tuple's run number.  During
- * merge passes, we re-use it to hold the input tape number that each tuple in
- * the heap was read from, or to hold the index of the next tuple pre-read
- * from the same tape in the case of pre-read entries. tupindex goes unused
- * if the sort occurs entirely in memory.
+ * tupindex holds the input tape number that each tuple in the heap was read
+ * from during merge passes.
  */
 typedef struct
 {
-       void       *tuple;                      /* the tuple proper */
+       void       *tuple;                      /* the tuple itself */
        Datum           datum1;                 /* value of first key column */
        bool            isnull1;                /* is first key column NULL? */
        int                     tupindex;               /* see notes above */
 } SortTuple;
 
+/*
+ * During merge, we use a pre-allocated set of fixed-size slots to hold
+ * tuples.  To avoid palloc/pfree overhead.
+ *
+ * Merge doesn't require a lot of memory, so we can afford to waste some,
+ * by using gratuitously-sized slots.  If a tuple is larger than 1 kB, the
+ * palloc() overhead is not significant anymore.
+ *
+ * 'nextfree' is valid when this chunk is in the free list.  When in use, the
+ * slot holds a tuple.
+ */
+#define SLAB_SLOT_SIZE 1024
+
+typedef union SlabSlot
+{
+       union SlabSlot *nextfree;
+       char            buffer[SLAB_SLOT_SIZE];
+} SlabSlot;
 
 /*
  * Possible states of a Tuplesort object.  These denote the states that
@@ -185,19 +211,20 @@ typedef enum
  * Parameters for calculation of number of tapes to use --- see inittapes()
  * and tuplesort_merge_order().
  *
- * In this calculation we assume that each tape will cost us about 3 blocks
- * worth of buffer space (which is an underestimate for very large data
- * volumes, but it's probably close enough --- see logtape.c).
+ * In this calculation we assume that each tape will cost us about 1 blocks
+ * worth of buffer space.  This ignores the overhead of all the other data
+ * structures needed for each tape, but it's probably close enough.
  *
  * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
  * tape during a preread cycle (see discussion at top of file).
  */
 #define MINORDER               6               /* minimum merge order */
-#define TAPE_BUFFER_OVERHEAD           (BLCKSZ * 3)
+#define MAXORDER               500             /* maximum merge order */
+#define TAPE_BUFFER_OVERHEAD           BLCKSZ
 #define MERGE_BUFFER_SIZE                      (BLCKSZ * 32)
 
 typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
-                                                                                               Tuplesortstate *state);
+                                                                       Tuplesortstate *state);
 
 /*
  * Private state of a Tuplesort operation.
@@ -211,11 +238,13 @@ struct Tuplesortstate
                                                                 * tuples to return? */
        bool            boundUsed;              /* true if we made use of a bounded heap */
        int                     bound;                  /* if bounded, the maximum number of tuples */
+       bool            tuples;                 /* Can SortTuple.tuple ever be set? */
        int64           availMem;               /* remaining memory available, in bytes */
        int64           allowedMem;             /* total memory allowed, in bytes */
        int                     maxTapes;               /* number of tapes (Knuth's T) */
        int                     tapeRange;              /* maxTapes-1 (Knuth's P) */
-       MemoryContext sortcontext;      /* memory context holding all sort data */
+       MemoryContext sortcontext;      /* memory context holding most sort data */
+       MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
        LogicalTapeSet *tapeset;        /* logtape.c object for tapes in a temp file */
 
        /*
@@ -238,40 +267,30 @@ struct Tuplesortstate
        void            (*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);
 
        /*
-        * Function to write a stored tuple onto tape.  The representation of the
+        * Function to write a stored tuple onto tape.  The representation of the
         * tuple on tape need not be the same as it is in memory; requirements on
-        * the tape representation are given below.  After writing the tuple,
-        * pfree() the out-of-line data (not the SortTuple struct!), and increase
-        * state->availMem by the amount of memory space thereby released.
+        * the tape representation are given below.  Unless the slab allocator is
+        * used, after writing the tuple, pfree() the out-of-line data (not the
+        * SortTuple struct!), and increase state->availMem by the amount of
+        * memory space thereby released.
         */
        void            (*writetup) (Tuplesortstate *state, int tapenum,
-                                                                                SortTuple *stup);
+                                                        SortTuple *stup);
 
        /*
         * Function to read a stored tuple from tape back into memory. 'len' is
-        * the already-read length of the stored tuple.  Create a palloc'd copy,
-        * initialize tuple/datum1/isnull1 in the target SortTuple struct, and
-        * decrease state->availMem by the amount of memory space consumed.
+        * the already-read length of the stored tuple.  The tuple is allocated
+        * from the slab memory arena, or is palloc'd, see readtup_alloc().
         */
        void            (*readtup) (Tuplesortstate *state, SortTuple *stup,
-                                                                               int tapenum, unsigned int len);
-
-       /*
-        * Function to reverse the sort direction from its current state. (We
-        * could dispense with this if we wanted to enforce that all variants
-        * represent the sort key information alike.)
-        */
-       void            (*reversedirection) (Tuplesortstate *state);
+                                                       int tapenum, unsigned int len);
 
        /*
-        * This array holds the tuples now in sort memory.      If we are in state
+        * This array holds the tuples now in sort memory.  If we are in state
         * INITIAL, the tuples are in no particular order; if we are in state
         * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
         * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
-        * H.  (Note that memtupcount only counts the tuples that are part of the
-        * heap --- during merge passes, memtuples[] entries beyond tapeRange are
-        * never in the heap and are used to hold pre-read tuples.)  In state
-        * SORTEDONTAPE, the array is not used.
+        * H.  In state SORTEDONTAPE, the array is not used.
         */
        SortTuple  *memtuples;          /* array of SortTuple structs */
        int                     memtupcount;    /* number of tuples currently present */
@@ -279,8 +298,52 @@ struct Tuplesortstate
        bool            growmemtuples;  /* memtuples' growth still underway? */
 
        /*
-        * While building initial runs, this is the current output run number
-        * (starting at 0).  Afterwards, it is the number of initial runs we made.
+        * Memory for tuples is sometimes allocated using a simple slab allocator,
+        * rather than with palloc().  Currently, we switch to slab allocation
+        * when we start merging.  Merging only needs to keep a small, fixed
+        * number of tuples in memory at any time, so we can avoid the
+        * palloc/pfree overhead by recycling a fixed number of fixed-size slots
+        * to hold the tuples.
+        *
+        * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
+        * slots.  The allocation is sized to have one slot per tape, plus one
+        * additional slot.  We need that many slots to hold all the tuples kept
+        * in the heap during merge, plus the one we have last returned from the
+        * sort, with tuplesort_gettuple.
+        *
+        * Initially, all the slots are kept in a linked list of free slots.  When
+        * a tuple is read from a tape, it is put to the next available slot, if
+        * it fits.  If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
+        * instead.
+        *
+        * When we're done processing a tuple, we return the slot back to the free
+        * list, or pfree() if it was palloc'd.  We know that a tuple was
+        * allocated from the slab, if its pointer value is between
+        * slabMemoryBegin and -End.
+        *
+        * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
+        * tracking memory usage is not used.
+        */
+       bool            slabAllocatorUsed;
+
+       char       *slabMemoryBegin;    /* beginning of slab memory arena */
+       char       *slabMemoryEnd;      /* end of slab memory arena */
+       SlabSlot   *slabFreeHead;       /* head of free list */
+
+       /* Buffer size to use for reading input tapes, during merge. */
+       size_t          read_buffer_size;
+
+       /*
+        * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
+        * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
+        * modes), we remember the tuple in 'lastReturnedTuple', so that we can
+        * recycle the memory on next gettuple call.
+        */
+       void       *lastReturnedTuple;
+
+       /*
+        * While building initial runs, this is the current output run number.
+        * Afterwards, it is the number of initial runs we made.
         */
        int                     currentRun;
 
@@ -290,27 +353,11 @@ struct Tuplesortstate
         */
 
        /*
-        * These variables are only used during merge passes.  mergeactive[i] is
-        * true if we are reading an input run from (actual) tape number i and
-        * have not yet exhausted that run.  mergenext[i] is the memtuples index
-        * of the next pre-read tuple (next to be loaded into the heap) for tape
-        * i, or 0 if we are out of pre-read tuples.  mergelast[i] similarly
-        * points to the last pre-read tuple from each tape.  mergeavailslots[i]
-        * is the number of unused memtuples[] slots reserved for tape i, and
-        * mergeavailmem[i] is the amount of unused space allocated for tape i.
-        * mergefreelist and mergefirstfree keep track of unused locations in the
-        * memtuples[] array.  The memtuples[].tupindex fields link together
-        * pre-read tuples for each tape as well as recycled locations in
-        * mergefreelist. It is OK to use 0 as a null link in these lists, because
-        * memtuples[0] is part of the merge heap and is never a pre-read tuple.
+        * This variable is only used during merge passes.  mergeactive[i] is true
+        * if we are reading an input run from (actual) tape number i and have not
+        * yet exhausted that run.
         */
        bool       *mergeactive;        /* active input run source? */
-       int                *mergenext;          /* first preread tuple for each source */
-       int                *mergelast;          /* last preread tuple for each source */
-       int                *mergeavailslots;    /* slots left for prereading each tape */
-       int64      *mergeavailmem;      /* availMem for prereading each tape */
-       int                     mergefreelist;  /* head of freelist of recycled slots */
-       int                     mergefirstfree; /* first slot never used in this merge */
 
        /*
         * Variables for Algorithm D.  Note that destTape is a "logical" tape
@@ -340,8 +387,28 @@ struct Tuplesortstate
        bool            markpos_eof;    /* saved "eof_reached" */
 
        /*
-        * These variables are specific to the MinimalTuple case; they are set by
-        * tuplesort_begin_heap and used only by the MinimalTuple routines.
+        * These variables are used during parallel sorting.
+        *
+        * worker is our worker identifier.  Follows the general convention that
+        * -1 value relates to a leader tuplesort, and values >= 0 worker
+        * tuplesorts. (-1 can also be a serial tuplesort.)
+        *
+        * shared is mutable shared memory state, which is used to coordinate
+        * parallel sorts.
+        *
+        * nParticipants is the number of worker Tuplesortstates known by the
+        * leader to have actually been launched, which implies that they must
+        * finish a run leader can merge.  Typically includes a worker state held
+        * by the leader process itself.  Set in the leader Tuplesortstate only.
+        */
+       int                     worker;
+       Sharedsort *shared;
+       int                     nParticipants;
+
+       /*
+        * The sortKeys variable is used by every case other than the hash index
+        * case; it is set by tuplesort_begin_xxx.  tupDesc is only used by the
+        * MinimalTuple and CLUSTER routines, though.
         */
        TupleDesc       tupDesc;
        SortSupport sortKeys;           /* array of length nKeys */
@@ -352,10 +419,18 @@ struct Tuplesortstate
         */
        SortSupport onlyKey;
 
+       /*
+        * Additional state for managing "abbreviated key" sortsupport routines
+        * (which currently may be used by all cases except the hash index case).
+        * Tracks the intervals at which the optimization's effectiveness is
+        * tested.
+        */
+       int64           abbrevNext;             /* Tuple # at which to next check
+                                                                * applicability */
+
        /*
         * These variables are specific to the CLUSTER case; they are set by
-        * tuplesort_begin_cluster.  Note CLUSTER also uses tupDesc and
-        * indexScanKey.
+        * tuplesort_begin_cluster.
         */
        IndexInfo  *indexInfo;          /* info about index being used for reference */
        EState     *estate;                     /* for evaluating index expressions */
@@ -368,20 +443,20 @@ struct Tuplesortstate
        Relation        indexRel;               /* index being built */
 
        /* These are specific to the index_btree subcase: */
-       ScanKey         indexScanKey;
        bool            enforceUnique;  /* complain if we find duplicate tuples */
 
        /* These are specific to the index_hash subcase: */
-       uint32          hash_mask;              /* mask for sortable part of hash code */
+       uint32          high_mask;              /* masks for sortable part of hash code */
+       uint32          low_mask;
+       uint32          max_buckets;
 
        /*
         * These variables are specific to the Datum case; they are set by
         * tuplesort_begin_datum and used only by the DatumTuple routines.
         */
        Oid                     datumType;
-       /* we need typelen and byval in order to know how to copy the Datums. */
+       /* we need typelen in order to know how to copy the Datums. */
        int                     datumTypeLen;
-       bool            datumTypeByVal;
 
        /*
         * Resource snapshot for time of sort start.
@@ -391,14 +466,72 @@ struct Tuplesortstate
 #endif
 };
 
+/*
+ * Private mutable state of tuplesort-parallel-operation.  This is allocated
+ * in shared memory.
+ */
+struct Sharedsort
+{
+       /* mutex protects all fields prior to tapes */
+       slock_t         mutex;
+
+       /*
+        * currentWorker generates ordinal identifier numbers for parallel sort
+        * workers.  These start from 0, and are always gapless.
+        *
+        * Workers increment workersFinished to indicate having finished.  If this
+        * is equal to state.nParticipants within the leader, leader is ready to
+        * merge worker runs.
+        */
+       int                     currentWorker;
+       int                     workersFinished;
+
+       /* Temporary file space */
+       SharedFileSet fileset;
+
+       /* Size of tapes flexible array */
+       int                     nTapes;
+
+       /*
+        * Tapes array used by workers to report back information needed by the
+        * leader to concatenate all worker tapes into one for merging
+        */
+       TapeShare       tapes[FLEXIBLE_ARRAY_MEMBER];
+};
+
+/*
+ * Is the given tuple allocated from the slab memory arena?
+ */
+#define IS_SLAB_SLOT(state, tuple) \
+       ((char *) (tuple) >= (state)->slabMemoryBegin && \
+        (char *) (tuple) < (state)->slabMemoryEnd)
+
+/*
+ * Return the given tuple to the slab memory free list, or free it
+ * if it was palloc'd.
+ */
+#define RELEASE_SLAB_SLOT(state, tuple) \
+       do { \
+               SlabSlot *buf = (SlabSlot *) tuple; \
+               \
+               if (IS_SLAB_SLOT((state), buf)) \
+               { \
+                       buf->nextfree = (state)->slabFreeHead; \
+                       (state)->slabFreeHead = buf; \
+               } else \
+                       pfree(buf); \
+       } while(0)
+
 #define COMPARETUP(state,a,b)  ((*(state)->comparetup) (a, b, state))
 #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
 #define WRITETUP(state,tape,stup)      ((*(state)->writetup) (state, tape, stup))
 #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
-#define REVERSEDIRECTION(state) ((*(state)->reversedirection) (state))
-#define LACKMEM(state)         ((state)->availMem < 0)
+#define LACKMEM(state)         ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
 #define USEMEM(state,amt)      ((state)->availMem -= (amt))
 #define FREEMEM(state,amt)     ((state)->availMem += (amt))
+#define SERIAL(state)          ((state)->shared == NULL)
+#define WORKER(state)          ((state)->shared && (state)->worker != -1)
+#define LEADER(state)          ((state)->shared && (state)->worker == -1)
 
 /*
  * NOTES about on-tape representation of tuples:
@@ -412,7 +545,7 @@ struct Tuplesortstate
  * If state->randomAccess is true, then the stored representation of the
  * tuple must be followed by another "unsigned int" that is a copy of the
  * length --- so the total tape space used is actually sizeof(unsigned int)
- * more than the stored length value.  This allows read-backwards.     When
+ * more than the stored length value.  This allows read-backwards.  When
  * randomAccess is not true, the write/read routines may omit the extra
  * length word.
  *
@@ -422,7 +555,7 @@ struct Tuplesortstate
  * the back length word (if present).
  *
  * The write/read routines can make use of the tuple description data
- * stored in the Tuplesortstate record, if needed.     They are also expected
+ * stored in the Tuplesortstate record, if needed.  They are also expected
  * to adjust state->availMem by the amount of memory space (not tape space!)
  * released or consumed.  There is no error return from either writetup
  * or readtup; they should ereport() on failure.
@@ -438,7 +571,13 @@ struct Tuplesortstate
  * rather than the originally-requested size.  This is important since
  * palloc can add substantial overhead.  It's not a complete answer since
  * we won't count any wasted space in palloc allocation blocks, but it's
- * a lot better than what we were doing before 7.3.
+ * a lot better than what we were doing before 7.3.  As of 9.6, a
+ * separate memory context is used for caller passed tuples.  Resetting
+ * it at certain key increments significantly ameliorates fragmentation.
+ * Note that this places a responsibility on readtup and copytup routines
+ * to use the right memory context for these tuples (and to not use the
+ * reset context for anything whose lifetime needs to span multiple
+ * external sort runs).
  */
 
 /* When using this macro, beware of double evaluation of len */
@@ -449,23 +588,30 @@ struct Tuplesortstate
        } while(0)
 
 
-static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess);
+static Tuplesortstate *tuplesort_begin_common(int workMem,
+                                          SortCoordinate coordinate,
+                                          bool randomAccess);
 static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
-static void inittapes(Tuplesortstate *state);
+static bool consider_abort_common(Tuplesortstate *state);
+static void inittapes(Tuplesortstate *state, bool mergeruns);
+static void inittapestate(Tuplesortstate *state, int maxTapes);
 static void selectnewtape(Tuplesortstate *state);
+static void init_slab_allocator(Tuplesortstate *state, int numSlots);
 static void mergeruns(Tuplesortstate *state);
 static void mergeonerun(Tuplesortstate *state);
 static void beginmerge(Tuplesortstate *state);
-static void mergepreread(Tuplesortstate *state);
-static void mergeprereadone(Tuplesortstate *state, int srcTape);
+static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
 static void dumptuples(Tuplesortstate *state, bool alltuples);
 static void make_bounded_heap(Tuplesortstate *state);
 static void sort_bounded_heap(Tuplesortstate *state);
-static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
-                                         int tupleindex, bool checkIndex);
-static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
+static void tuplesort_sort_memtuples(Tuplesortstate *state);
+static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
+static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
+static void tuplesort_heap_delete_top(Tuplesortstate *state);
+static void reversedirection(Tuplesortstate *state);
 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
 static void markrunend(Tuplesortstate *state, int tapenum);
+static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
 static int comparetup_heap(const SortTuple *a, const SortTuple *b,
                                Tuplesortstate *state);
 static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
@@ -473,7 +619,6 @@ static void writetup_heap(Tuplesortstate *state, int tapenum,
                          SortTuple *stup);
 static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
                         int tapenum, unsigned int len);
-static void reversedirection_heap(Tuplesortstate *state);
 static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
                                   Tuplesortstate *state);
 static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
@@ -490,8 +635,6 @@ static void writetup_index(Tuplesortstate *state, int tapenum,
                           SortTuple *stup);
 static void readtup_index(Tuplesortstate *state, SortTuple *stup,
                          int tapenum, unsigned int len);
-static void reversedirection_index_btree(Tuplesortstate *state);
-static void reversedirection_index_hash(Tuplesortstate *state);
 static int comparetup_datum(const SortTuple *a, const SortTuple *b,
                                 Tuplesortstate *state);
 static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
@@ -499,7 +642,10 @@ static void writetup_datum(Tuplesortstate *state, int tapenum,
                           SortTuple *stup);
 static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
                          int tapenum, unsigned int len);
-static void reversedirection_datum(Tuplesortstate *state);
+static int     worker_get_identifier(Tuplesortstate *state);
+static void worker_freeze_result_tape(Tuplesortstate *state);
+static void worker_nomergeruns(Tuplesortstate *state);
+static void leader_takeover_tapes(Tuplesortstate *state);
 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
 
 /*
@@ -519,7 +665,7 @@ static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
  *
  * After calling tuplesort_begin, the caller should call tuplesort_putXXX
  * zero or more times, then call tuplesort_performsort when all the tuples
- * have been supplied. After performsort, retrieve the tuples in sorted
+ * have been supplied.  After performsort, retrieve the tuples in sorted
  * order by calling tuplesort_getXXX until it returns false/NULL.  (If random
  * access was requested, rescan, markpos, and restorepos can also be called.)
  * Call tuplesort_end to terminate the operation and release memory/disk space.
@@ -532,21 +678,38 @@ static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
  */
 
 static Tuplesortstate *
-tuplesort_begin_common(int workMem, bool randomAccess)
+tuplesort_begin_common(int workMem, SortCoordinate coordinate,
+                                          bool randomAccess)
 {
        Tuplesortstate *state;
        MemoryContext sortcontext;
+       MemoryContext tuplecontext;
        MemoryContext oldcontext;
 
+       /* See leader_takeover_tapes() remarks on randomAccess support */
+       if (coordinate && randomAccess)
+               elog(ERROR, "random access disallowed under parallel sort");
+
        /*
         * Create a working memory context for this sort operation. All data
         * needed by the sort will live inside this context.
         */
        sortcontext = AllocSetContextCreate(CurrentMemoryContext,
-                                                                               "TupleSort",
-                                                                               ALLOCSET_DEFAULT_MINSIZE,
-                                                                               ALLOCSET_DEFAULT_INITSIZE,
-                                                                               ALLOCSET_DEFAULT_MAXSIZE);
+                                                                               "TupleSort main",
+                                                                               ALLOCSET_DEFAULT_SIZES);
+
+       /*
+        * Caller tuple (e.g. IndexTuple) memory context.
+        *
+        * A dedicated child context used exclusively for caller passed tuples
+        * eases memory management.  Resetting at key points reduces
+        * fragmentation. Note that the memtuples array of SortTuples is allocated
+        * in the parent context, not this context, because there is no need to
+        * free memtuples early.
+        */
+       tuplecontext = AllocSetContextCreate(sortcontext,
+                                                                                "Caller tuples",
+                                                                                ALLOCSET_DEFAULT_SIZES);
 
        /*
         * Make the Tuplesortstate within the per-sort context.  This way, we
@@ -564,15 +727,32 @@ tuplesort_begin_common(int workMem, bool randomAccess)
        state->status = TSS_INITIAL;
        state->randomAccess = randomAccess;
        state->bounded = false;
+       state->tuples = true;
        state->boundUsed = false;
-       state->allowedMem = workMem * (int64) 1024;
+
+       /*
+        * workMem is forced to be at least 64KB, the current minimum valid value
+        * for the work_mem GUC.  This is a defense against parallel sort callers
+        * that divide out memory among many workers in a way that leaves each
+        * with very little memory.
+        */
+       state->allowedMem = Max(workMem, 64) * (int64) 1024;
        state->availMem = state->allowedMem;
        state->sortcontext = sortcontext;
+       state->tuplecontext = tuplecontext;
        state->tapeset = NULL;
 
        state->memtupcount = 0;
-       state->memtupsize = 1024;       /* initial guess */
+
+       /*
+        * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
+        * see comments in grow_memtuples().
+        */
+       state->memtupsize = Max(1024,
+                                                       ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1);
+
        state->growmemtuples = true;
+       state->slabAllocatorUsed = false;
        state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
 
        USEMEM(state, GetMemoryChunkSpace(state->memtuples));
@@ -590,6 +770,33 @@ tuplesort_begin_common(int workMem, bool randomAccess)
 
        state->result_tape = -1;        /* flag that result tape has not been formed */
 
+       /*
+        * Initialize parallel-related state based on coordination information
+        * from caller
+        */
+       if (!coordinate)
+       {
+               /* Serial sort */
+               state->shared = NULL;
+               state->worker = -1;
+               state->nParticipants = -1;
+       }
+       else if (coordinate->isWorker)
+       {
+               /* Parallel worker produces exactly one final run from all input */
+               state->shared = coordinate->sharedsort;
+               state->worker = worker_get_identifier(state);
+               state->nParticipants = -1;
+       }
+       else
+       {
+               /* Parallel leader state only used for final merge */
+               state->shared = coordinate->sharedsort;
+               state->worker = -1;
+               state->nParticipants = coordinate->nParticipants;
+               Assert(state->nParticipants >= 1);
+       }
+
        MemoryContextSwitchTo(oldcontext);
 
        return state;
@@ -600,9 +807,10 @@ tuplesort_begin_heap(TupleDesc tupDesc,
                                         int nkeys, AttrNumber *attNums,
                                         Oid *sortOperators, Oid *sortCollations,
                                         bool *nullsFirstFlags,
-                                        int workMem, bool randomAccess)
+                                        int workMem, SortCoordinate coordinate, bool randomAccess)
 {
-       Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+       Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+                                                                                                  randomAccess);
        MemoryContext oldcontext;
        int                     i;
 
@@ -623,15 +831,16 @@ tuplesort_begin_heap(TupleDesc tupDesc,
                                                                false,  /* no unique check */
                                                                nkeys,
                                                                workMem,
-                                                               randomAccess);
+                                                               randomAccess,
+                                                               PARALLEL_SORT(state));
 
        state->comparetup = comparetup_heap;
        state->copytup = copytup_heap;
        state->writetup = writetup_heap;
        state->readtup = readtup_heap;
-       state->reversedirection = reversedirection_heap;
 
        state->tupDesc = tupDesc;       /* assume we need not copy tupDesc */
+       state->abbrevNext = 10;
 
        /* Prepare SortSupport data for each column */
        state->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData));
@@ -647,11 +856,19 @@ tuplesort_begin_heap(TupleDesc tupDesc,
                sortKey->ssup_collation = sortCollations[i];
                sortKey->ssup_nulls_first = nullsFirstFlags[i];
                sortKey->ssup_attno = attNums[i];
+               /* Convey if abbreviation optimization is applicable in principle */
+               sortKey->abbreviate = (i == 0);
 
                PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
        }
 
-       if (nkeys == 1)
+       /*
+        * The "onlyKey" optimization cannot be used with abbreviated keys, since
+        * tie-breaker comparisons may be required.  Typically, the optimization
+        * is only of value to pass-by-value types anyway, whereas abbreviated
+        * keys are typically only of value to pass-by-reference types.
+        */
+       if (nkeys == 1 && !state->sortKeys->abbrev_converter)
                state->onlyKey = state->sortKeys;
 
        MemoryContextSwitchTo(oldcontext);
@@ -662,10 +879,14 @@ tuplesort_begin_heap(TupleDesc tupDesc,
 Tuplesortstate *
 tuplesort_begin_cluster(TupleDesc tupDesc,
                                                Relation indexRel,
-                                               int workMem, bool randomAccess)
+                                               int workMem,
+                                               SortCoordinate coordinate, bool randomAccess)
 {
-       Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+       Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+                                                                                                  randomAccess);
+       BTScanInsert indexScanKey;
        MemoryContext oldcontext;
+       int                     i;
 
        Assert(indexRel->rd_rel->relam == BTREE_AM_OID);
 
@@ -679,25 +900,27 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
                         workMem, randomAccess ? 't' : 'f');
 #endif
 
-       state->nKeys = RelationGetNumberOfAttributes(indexRel);
+       state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
 
        TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT,
                                                                false,  /* no unique check */
                                                                state->nKeys,
                                                                workMem,
-                                                               randomAccess);
+                                                               randomAccess,
+                                                               PARALLEL_SORT(state));
 
        state->comparetup = comparetup_cluster;
        state->copytup = copytup_cluster;
        state->writetup = writetup_cluster;
        state->readtup = readtup_cluster;
-       state->reversedirection = reversedirection_index_btree;
+       state->abbrevNext = 10;
 
        state->indexInfo = BuildIndexInfo(indexRel);
-       state->indexScanKey = _bt_mkscankey_nodata(indexRel);
 
        state->tupDesc = tupDesc;       /* assume we need not copy tupDesc */
 
+       indexScanKey = _bt_mkscankey(indexRel, NULL);
+
        if (state->indexInfo->ii_Expressions != NULL)
        {
                TupleTableSlot *slot;
@@ -710,11 +933,39 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
                 * scantuple has to point to that slot, too.
                 */
                state->estate = CreateExecutorState();
-               slot = MakeSingleTupleTableSlot(tupDesc);
+               slot = MakeSingleTupleTableSlot(tupDesc, &TTSOpsVirtual);
                econtext = GetPerTupleExprContext(state->estate);
                econtext->ecxt_scantuple = slot;
        }
 
+       /* Prepare SortSupport data for each column */
+       state->sortKeys = (SortSupport) palloc0(state->nKeys *
+                                                                                       sizeof(SortSupportData));
+
+       for (i = 0; i < state->nKeys; i++)
+       {
+               SortSupport sortKey = state->sortKeys + i;
+               ScanKey         scanKey = indexScanKey->scankeys + i;
+               int16           strategy;
+
+               sortKey->ssup_cxt = CurrentMemoryContext;
+               sortKey->ssup_collation = scanKey->sk_collation;
+               sortKey->ssup_nulls_first =
+                       (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
+               sortKey->ssup_attno = scanKey->sk_attno;
+               /* Convey if abbreviation optimization is applicable in principle */
+               sortKey->abbreviate = (i == 0);
+
+               AssertState(sortKey->ssup_attno != 0);
+
+               strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
+                       BTGreaterStrategyNumber : BTLessStrategyNumber;
+
+               PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
+       }
+
+       pfree(indexScanKey);
+
        MemoryContextSwitchTo(oldcontext);
 
        return state;
@@ -724,10 +975,15 @@ Tuplesortstate *
 tuplesort_begin_index_btree(Relation heapRel,
                                                        Relation indexRel,
                                                        bool enforceUnique,
-                                                       int workMem, bool randomAccess)
+                                                       int workMem,
+                                                       SortCoordinate coordinate,
+                                                       bool randomAccess)
 {
-       Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+       Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+                                                                                                  randomAccess);
+       BTScanInsert indexScanKey;
        MemoryContext oldcontext;
+       int                     i;
 
        oldcontext = MemoryContextSwitchTo(state->sortcontext);
 
@@ -739,25 +995,55 @@ tuplesort_begin_index_btree(Relation heapRel,
                         workMem, randomAccess ? 't' : 'f');
 #endif
 
-       state->nKeys = RelationGetNumberOfAttributes(indexRel);
+       state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
 
        TRACE_POSTGRESQL_SORT_START(INDEX_SORT,
                                                                enforceUnique,
                                                                state->nKeys,
                                                                workMem,
-                                                               randomAccess);
+                                                               randomAccess,
+                                                               PARALLEL_SORT(state));
 
        state->comparetup = comparetup_index_btree;
        state->copytup = copytup_index;
        state->writetup = writetup_index;
        state->readtup = readtup_index;
-       state->reversedirection = reversedirection_index_btree;
+       state->abbrevNext = 10;
 
        state->heapRel = heapRel;
        state->indexRel = indexRel;
-       state->indexScanKey = _bt_mkscankey_nodata(indexRel);
        state->enforceUnique = enforceUnique;
 
+       indexScanKey = _bt_mkscankey(indexRel, NULL);
+
+       /* Prepare SortSupport data for each column */
+       state->sortKeys = (SortSupport) palloc0(state->nKeys *
+                                                                                       sizeof(SortSupportData));
+
+       for (i = 0; i < state->nKeys; i++)
+       {
+               SortSupport sortKey = state->sortKeys + i;
+               ScanKey         scanKey = indexScanKey->scankeys + i;
+               int16           strategy;
+
+               sortKey->ssup_cxt = CurrentMemoryContext;
+               sortKey->ssup_collation = scanKey->sk_collation;
+               sortKey->ssup_nulls_first =
+                       (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
+               sortKey->ssup_attno = scanKey->sk_attno;
+               /* Convey if abbreviation optimization is applicable in principle */
+               sortKey->abbreviate = (i == 0);
+
+               AssertState(sortKey->ssup_attno != 0);
+
+               strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
+                       BTGreaterStrategyNumber : BTLessStrategyNumber;
+
+               PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
+       }
+
+       pfree(indexScanKey);
+
        MemoryContextSwitchTo(oldcontext);
 
        return state;
@@ -766,10 +1052,15 @@ tuplesort_begin_index_btree(Relation heapRel,
 Tuplesortstate *
 tuplesort_begin_index_hash(Relation heapRel,
                                                   Relation indexRel,
-                                                  uint32 hash_mask,
-                                                  int workMem, bool randomAccess)
+                                                  uint32 high_mask,
+                                                  uint32 low_mask,
+                                                  uint32 max_buckets,
+                                                  int workMem,
+                                                  SortCoordinate coordinate,
+                                                  bool randomAccess)
 {
-       Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+       Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+                                                                                                  randomAccess);
        MemoryContext oldcontext;
 
        oldcontext = MemoryContextSwitchTo(state->sortcontext);
@@ -777,8 +1068,11 @@ tuplesort_begin_index_hash(Relation heapRel,
 #ifdef TRACE_SORT
        if (trace_sort)
                elog(LOG,
-               "begin index sort: hash_mask = 0x%x, workMem = %d, randomAccess = %c",
-                        hash_mask,
+                        "begin index sort: high_mask = 0x%x, low_mask = 0x%x, "
+                        "max_buckets = 0x%x, workMem = %d, randomAccess = %c",
+                        high_mask,
+                        low_mask,
+                        max_buckets,
                         workMem, randomAccess ? 't' : 'f');
 #endif
 
@@ -788,12 +1082,13 @@ tuplesort_begin_index_hash(Relation heapRel,
        state->copytup = copytup_index;
        state->writetup = writetup_index;
        state->readtup = readtup_index;
-       state->reversedirection = reversedirection_index_hash;
 
        state->heapRel = heapRel;
        state->indexRel = indexRel;
 
-       state->hash_mask = hash_mask;
+       state->high_mask = high_mask;
+       state->low_mask = low_mask;
+       state->max_buckets = max_buckets;
 
        MemoryContextSwitchTo(oldcontext);
 
@@ -802,10 +1097,11 @@ tuplesort_begin_index_hash(Relation heapRel,
 
 Tuplesortstate *
 tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
-                                         bool nullsFirstFlag,
-                                         int workMem, bool randomAccess)
+                                         bool nullsFirstFlag, int workMem,
+                                         SortCoordinate coordinate, bool randomAccess)
 {
-       Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+       Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+                                                                                                  randomAccess);
        MemoryContext oldcontext;
        int16           typlen;
        bool            typbyval;
@@ -825,29 +1121,49 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
                                                                false,  /* no unique check */
                                                                1,
                                                                workMem,
-                                                               randomAccess);
+                                                               randomAccess,
+                                                               PARALLEL_SORT(state));
 
        state->comparetup = comparetup_datum;
        state->copytup = copytup_datum;
        state->writetup = writetup_datum;
        state->readtup = readtup_datum;
-       state->reversedirection = reversedirection_datum;
+       state->abbrevNext = 10;
 
        state->datumType = datumType;
 
+       /* lookup necessary attributes of the datum type */
+       get_typlenbyval(datumType, &typlen, &typbyval);
+       state->datumTypeLen = typlen;
+       state->tuples = !typbyval;
+
        /* Prepare SortSupport data */
-       state->onlyKey = (SortSupport) palloc0(sizeof(SortSupportData));
+       state->sortKeys = (SortSupport) palloc0(sizeof(SortSupportData));
+
+       state->sortKeys->ssup_cxt = CurrentMemoryContext;
+       state->sortKeys->ssup_collation = sortCollation;
+       state->sortKeys->ssup_nulls_first = nullsFirstFlag;
 
-       state->onlyKey->ssup_cxt = CurrentMemoryContext;
-       state->onlyKey->ssup_collation = sortCollation;
-       state->onlyKey->ssup_nulls_first = nullsFirstFlag;
+       /*
+        * Abbreviation is possible here only for by-reference types.  In theory,
+        * a pass-by-value datatype could have an abbreviated form that is cheaper
+        * to compare.  In a tuple sort, we could support that, because we can
+        * always extract the original datum from the tuple is needed.  Here, we
+        * can't, because a datum sort only stores a single copy of the datum; the
+        * "tuple" field of each sortTuple is NULL.
+        */
+       state->sortKeys->abbreviate = !typbyval;
 
-       PrepareSortSupportFromOrderingOp(sortOperator, state->onlyKey);
+       PrepareSortSupportFromOrderingOp(sortOperator, state->sortKeys);
 
-       /* lookup necessary attributes of the datum type */
-       get_typlenbyval(datumType, &typlen, &typbyval);
-       state->datumTypeLen = typlen;
-       state->datumTypeByVal = typbyval;
+       /*
+        * The "onlyKey" optimization cannot be used with abbreviated keys, since
+        * tie-breaker comparisons may be required.  Typically, the optimization
+        * is only of value to pass-by-value types anyway, whereas abbreviated
+        * keys are typically only of value to pass-by-reference types.
+        */
+       if (!state->sortKeys->abbrev_converter)
+               state->onlyKey = state->sortKeys;
 
        MemoryContextSwitchTo(oldcontext);
 
@@ -859,12 +1175,12 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
  *
  *     Advise tuplesort that at most the first N result tuples are required.
  *
- * Must be called before inserting any tuples. (Actually, we could allow it
+ * Must be called before inserting any tuples.  (Actually, we could allow it
  * as long as the sort hasn't spilled to disk, but there seems no need for
  * delayed calls at the moment.)
  *
  * This is a hint only. The tuplesort may still return more tuples than
- * requested.
+ * requested.  Parallel leader tuplesorts will always ignore the hint.
  */
 void
 tuplesort_set_bound(Tuplesortstate *state, int64 bound)
@@ -873,6 +1189,7 @@ tuplesort_set_bound(Tuplesortstate *state, int64 bound)
        Assert(state->status == TSS_INITIAL);
        Assert(state->memtupcount == 0);
        Assert(!state->bounded);
+       Assert(!WORKER(state));
 
 #ifdef DEBUG_BOUNDED_SORT
        /* Honor GUC setting that disables the feature (for easy testing) */
@@ -880,12 +1197,29 @@ tuplesort_set_bound(Tuplesortstate *state, int64 bound)
                return;
 #endif
 
+       /* Parallel leader ignores hint */
+       if (LEADER(state))
+               return;
+
        /* We want to be able to compute bound * 2, so limit the setting */
        if (bound > (int64) (INT_MAX / 2))
                return;
 
        state->bounded = true;
        state->bound = (int) bound;
+
+       /*
+        * Bounded sorts are not an effective target for abbreviated key
+        * optimization.  Disable by setting state to be consistent with no
+        * abbreviation support.
+        */
+       state->sortKeys->abbrev_converter = NULL;
+       if (state->sortKeys->abbrev_full_comparator)
+               state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
+
+       /* Not strictly necessary, but be tidy */
+       state->sortKeys->abbrev_abort = NULL;
+       state->sortKeys->abbrev_full_comparator = NULL;
 }
 
 /*
@@ -925,11 +1259,13 @@ tuplesort_end(Tuplesortstate *state)
        if (trace_sort)
        {
                if (state->tapeset)
-                       elog(LOG, "external sort ended, %ld disk blocks used: %s",
-                                spaceUsed, pg_rusage_show(&state->ru_start));
+                       elog(LOG, "%s of worker %d ended, %ld disk blocks used: %s",
+                                SERIAL(state) ? "external sort" : "parallel external sort",
+                                state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
                else
-                       elog(LOG, "internal sort ended, %ld KB used: %s",
-                                spaceUsed, pg_rusage_show(&state->ru_start));
+                       elog(LOG, "%s of worker %d ended, %ld KB used: %s",
+                                SERIAL(state) ? "internal sort" : "unperformed parallel sort",
+                                state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
        }
 
        TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
@@ -963,7 +1299,7 @@ tuplesort_end(Tuplesortstate *state)
 /*
  * Grow the memtuples[] array, if possible within our memory constraint.  We
  * must not exceed INT_MAX tuples in memory or the caller-provided memory
- * limit.  Return TRUE if we were able to enlarge the array, FALSE if not.
+ * limit.  Return true if we were able to enlarge the array, false if not.
  *
  * Normally, at each increment we double the size of the array.  When doing
  * that would exceed a limit, we attempt one last, smaller increase (and then
@@ -1008,7 +1344,7 @@ grow_memtuples(Tuplesortstate *state)
                 * strategy and instead increase as much as we safely can.
                 *
                 * To stay within allowedMem, we can't increase memtupsize by more
-                * than availMem / sizeof(SortTuple) elements.  In practice, we want
+                * than availMem / sizeof(SortTuple) elements.  In practice, we want
                 * to increase it by considerably less, because we need to leave some
                 * space for the tuples to which the new array slots will refer.  We
                 * assume the new tuples will be about the same size as the tuples
@@ -1062,12 +1398,12 @@ grow_memtuples(Tuplesortstate *state)
         * We need to be sure that we do not cause LACKMEM to become true, else
         * the space management algorithm will go nuts.  The code above should
         * never generate a dangerous request, but to be safe, check explicitly
-        * that the array growth fits within availMem.  (We could still cause
+        * that the array growth fits within availMem.  (We could still cause
         * LACKMEM if the memory chunk overhead associated with the memtuples
-        * array were to increase.      That shouldn't happen with any sane value of
-        * allowedMem, because at any array size large enough to risk LACKMEM,
-        * palloc would be treating both old and new arrays as separate chunks.
-        * But we'll check LACKMEM explicitly below just in case.)
+        * array were to increase.  That shouldn't happen because we chose the
+        * initial array size large enough to ensure that palloc will be treating
+        * both old and new arrays as separate chunks.  But we'll check LACKMEM
+        * explicitly below just in case.)
         */
        if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
                goto noalloc;
@@ -1080,7 +1416,7 @@ grow_memtuples(Tuplesortstate *state)
                                          state->memtupsize * sizeof(SortTuple));
        USEMEM(state, GetMemoryChunkSpace(state->memtuples));
        if (LACKMEM(state))
-               elog(ERROR, "unexpected out-of-memory situation during sort");
+               elog(ERROR, "unexpected out-of-memory situation in tuplesort");
        return true;
 
 noalloc:
@@ -1134,21 +1470,76 @@ tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup)
 }
 
 /*
- * Accept one index tuple while collecting input data for sort.
- *
- * Note that the input tuple is always copied; the caller need not save it.
+ * Collect one index tuple while collecting input data for sort, building
+ * it from caller-supplied values.
  */
 void
-tuplesort_putindextuple(Tuplesortstate *state, IndexTuple tuple)
+tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
+                                                         ItemPointer self, Datum *values,
+                                                         bool *isnull)
 {
-       MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+       MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
        SortTuple       stup;
+       Datum           original;
+       IndexTuple      tuple;
 
-       /*
-        * Copy the given tuple into memory we control, and decrease availMem.
-        * Then call the common code.
-        */
-       COPYTUP(state, &stup, (void *) tuple);
+       stup.tuple = index_form_tuple(RelationGetDescr(rel), values, isnull);
+       tuple = ((IndexTuple) stup.tuple);
+       tuple->t_tid = *self;
+       USEMEM(state, GetMemoryChunkSpace(stup.tuple));
+       /* set up first-column key value */
+       original = index_getattr(tuple,
+                                                        1,
+                                                        RelationGetDescr(state->indexRel),
+                                                        &stup.isnull1);
+
+       MemoryContextSwitchTo(state->sortcontext);
+
+       if (!state->sortKeys || !state->sortKeys->abbrev_converter || stup.isnull1)
+       {
+               /*
+                * Store ordinary Datum representation, or NULL value.  If there is a
+                * converter it won't expect NULL values, and cost model is not
+                * required to account for NULL, so in that case we avoid calling
+                * converter and just set datum1 to zeroed representation (to be
+                * consistent, and to support cheap inequality tests for NULL
+                * abbreviated keys).
+                */
+               stup.datum1 = original;
+       }
+       else if (!consider_abort_common(state))
+       {
+               /* Store abbreviated key representation */
+               stup.datum1 = state->sortKeys->abbrev_converter(original,
+                                                                                                               state->sortKeys);
+       }
+       else
+       {
+               /* Abort abbreviation */
+               int                     i;
+
+               stup.datum1 = original;
+
+               /*
+                * Set state to be consistent with never trying abbreviation.
+                *
+                * Alter datum1 representation in already-copied tuples, so as to
+                * ensure a consistent representation (current tuple was just
+                * handled).  It does not matter if some dumped tuples are already
+                * sorted on tape, since serialized tuples lack abbreviated keys
+                * (TSS_BUILDRUNS state prevents control reaching here in any case).
+                */
+               for (i = 0; i < state->memtupcount; i++)
+               {
+                       SortTuple  *mtup = &state->memtuples[i];
+
+                       tuple = mtup->tuple;
+                       mtup->datum1 = index_getattr(tuple,
+                                                                                1,
+                                                                                RelationGetDescr(state->indexRel),
+                                                                                &mtup->isnull1);
+               }
+       }
 
        puttuple_common(state, &stup);
 
@@ -1163,25 +1554,75 @@ tuplesort_putindextuple(Tuplesortstate *state, IndexTuple tuple)
 void
 tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
 {
-       MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+       MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
        SortTuple       stup;
 
        /*
-        * If it's a pass-by-reference value, copy it into memory we control, and
-        * decrease availMem.  Then call the common code.
+        * Pass-by-value types or null values are just stored directly in
+        * stup.datum1 (and stup.tuple is not used and set to NULL).
+        *
+        * Non-null pass-by-reference values need to be copied into memory we
+        * control, and possibly abbreviated. The copied value is pointed to by
+        * stup.tuple and is treated as the canonical copy (e.g. to return via
+        * tuplesort_getdatum or when writing to tape); stup.datum1 gets the
+        * abbreviated value if abbreviation is happening, otherwise it's
+        * identical to stup.tuple.
         */
-       if (isNull || state->datumTypeByVal)
+
+       if (isNull || !state->tuples)
        {
-               stup.datum1 = val;
+               /*
+                * Set datum1 to zeroed representation for NULLs (to be consistent,
+                * and to support cheap inequality tests for NULL abbreviated keys).
+                */
+               stup.datum1 = !isNull ? val : (Datum) 0;
                stup.isnull1 = isNull;
                stup.tuple = NULL;              /* no separate storage */
+               MemoryContextSwitchTo(state->sortcontext);
        }
        else
        {
-               stup.datum1 = datumCopy(val, false, state->datumTypeLen);
+               Datum           original = datumCopy(val, false, state->datumTypeLen);
+
                stup.isnull1 = false;
-               stup.tuple = DatumGetPointer(stup.datum1);
+               stup.tuple = DatumGetPointer(original);
                USEMEM(state, GetMemoryChunkSpace(stup.tuple));
+               MemoryContextSwitchTo(state->sortcontext);
+
+               if (!state->sortKeys->abbrev_converter)
+               {
+                       stup.datum1 = original;
+               }
+               else if (!consider_abort_common(state))
+               {
+                       /* Store abbreviated key representation */
+                       stup.datum1 = state->sortKeys->abbrev_converter(original,
+                                                                                                                       state->sortKeys);
+               }
+               else
+               {
+                       /* Abort abbreviation */
+                       int                     i;
+
+                       stup.datum1 = original;
+
+                       /*
+                        * Set state to be consistent with never trying abbreviation.
+                        *
+                        * Alter datum1 representation in already-copied tuples, so as to
+                        * ensure a consistent representation (current tuple was just
+                        * handled).  It does not matter if some dumped tuples are already
+                        * sorted on tape, since serialized tuples lack abbreviated keys
+                        * (TSS_BUILDRUNS state prevents control reaching here in any
+                        * case).
+                        */
+                       for (i = 0; i < state->memtupcount; i++)
+                       {
+                               SortTuple  *mtup = &state->memtuples[i];
+
+                               mtup->datum1 = PointerGetDatum(mtup->tuple);
+                       }
+               }
        }
 
        puttuple_common(state, &stup);
@@ -1195,12 +1636,14 @@ tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
 static void
 puttuple_common(Tuplesortstate *state, SortTuple *tuple)
 {
+       Assert(!LEADER(state));
+
        switch (state->status)
        {
                case TSS_INITIAL:
 
                        /*
-                        * Save the tuple into the unsorted array.      First, grow the array
+                        * Save the tuple into the unsorted array.  First, grow the array
                         * as needed.  Note that we try to grow the array when there is
                         * still one free slot remaining --- if we fail, there'll still be
                         * room to store the incoming tuple, and then we'll switch to
@@ -1221,7 +1664,7 @@ puttuple_common(Tuplesortstate *state, SortTuple *tuple)
                         * enough tuples to meet the bound.
                         *
                         * Note that once we enter TSS_BOUNDED state we will always try to
-                        * complete the sort that way.  In the worst case, if later input
+                        * complete the sort that way.  In the worst case, if later input
                         * tuples are larger than earlier ones, this might cause us to
                         * exceed workMem significantly.
                         */
@@ -1248,10 +1691,10 @@ puttuple_common(Tuplesortstate *state, SortTuple *tuple)
                        /*
                         * Nope; time to switch to tape-based operation.
                         */
-                       inittapes(state);
+                       inittapes(state, true);
 
                        /*
-                        * Dump tuples until we are back under the limit.
+                        * Dump all tuples.
                         */
                        dumptuples(state, false);
                        break;
@@ -1274,36 +1717,21 @@ puttuple_common(Tuplesortstate *state, SortTuple *tuple)
                        }
                        else
                        {
-                               /* discard top of heap, sift up, insert new tuple */
+                               /* discard top of heap, replacing it with the new tuple */
                                free_sort_tuple(state, &state->memtuples[0]);
-                               tuplesort_heap_siftup(state, false);
-                               tuplesort_heap_insert(state, tuple, 0, false);
+                               tuplesort_heap_replace_top(state, tuple);
                        }
                        break;
 
                case TSS_BUILDRUNS:
 
                        /*
-                        * Insert the tuple into the heap, with run number currentRun if
-                        * it can go into the current run, else run number currentRun+1.
-                        * The tuple can go into the current run if it is >= the first
-                        * not-yet-output tuple.  (Actually, it could go into the current
-                        * run if it is >= the most recently output tuple ... but that
-                        * would require keeping around the tuple we last output, and it's
-                        * simplest to let writetup free each tuple as soon as it's
-                        * written.)
-                        *
-                        * Note there will always be at least one tuple in the heap at
-                        * this point; see dumptuples.
+                        * Save the tuple into the unsorted array (there must be space)
                         */
-                       Assert(state->memtupcount > 0);
-                       if (COMPARETUP(state, tuple, &state->memtuples[0]) >= 0)
-                               tuplesort_heap_insert(state, tuple, state->currentRun, true);
-                       else
-                               tuplesort_heap_insert(state, tuple, state->currentRun + 1, true);
+                       state->memtuples[state->memtupcount++] = *tuple;
 
                        /*
-                        * If we are over the memory limit, dump tuples till we're under.
+                        * If we are over the memory limit, dump all tuples.
                         */
                        dumptuples(state, false);
                        break;
@@ -1314,6 +1742,47 @@ puttuple_common(Tuplesortstate *state, SortTuple *tuple)
        }
 }
 
+static bool
+consider_abort_common(Tuplesortstate *state)
+{
+       Assert(state->sortKeys[0].abbrev_converter != NULL);
+       Assert(state->sortKeys[0].abbrev_abort != NULL);
+       Assert(state->sortKeys[0].abbrev_full_comparator != NULL);
+
+       /*
+        * Check effectiveness of abbreviation optimization.  Consider aborting
+        * when still within memory limit.
+        */
+       if (state->status == TSS_INITIAL &&
+               state->memtupcount >= state->abbrevNext)
+       {
+               state->abbrevNext *= 2;
+
+               /*
+                * Check opclass-supplied abbreviation abort routine.  It may indicate
+                * that abbreviation should not proceed.
+                */
+               if (!state->sortKeys->abbrev_abort(state->memtupcount,
+                                                                                  state->sortKeys))
+                       return false;
+
+               /*
+                * Finally, restore authoritative comparator, and indicate that
+                * abbreviation is not in play by setting abbrev_converter to NULL
+                */
+               state->sortKeys[0].comparator = state->sortKeys[0].abbrev_full_comparator;
+               state->sortKeys[0].abbrev_converter = NULL;
+               /* Not strictly necessary, but be tidy */
+               state->sortKeys[0].abbrev_abort = NULL;
+               state->sortKeys[0].abbrev_full_comparator = NULL;
+
+               /* Give up - expect original pass-by-value representation */
+               return true;
+       }
+
+       return false;
+}
+
 /*
  * All tuples have been provided; finish the sort.
  */
@@ -1324,8 +1793,8 @@ tuplesort_performsort(Tuplesortstate *state)
 
 #ifdef TRACE_SORT
        if (trace_sort)
-               elog(LOG, "performsort starting: %s",
-                        pg_rusage_show(&state->ru_start));
+               elog(LOG, "performsort of worker %d starting: %s",
+                        state->worker, pg_rusage_show(&state->ru_start));
 #endif
 
        switch (state->status)
@@ -1334,32 +1803,46 @@ tuplesort_performsort(Tuplesortstate *state)
 
                        /*
                         * We were able to accumulate all the tuples within the allowed
-                        * amount of memory.  Just qsort 'em and we're done.
+                        * amount of memory, or leader to take over worker tapes
                         */
-                       if (state->memtupcount > 1)
+                       if (SERIAL(state))
                        {
-                               /* Can we use the single-key sort function? */
-                               if (state->onlyKey != NULL)
-                                       qsort_ssup(state->memtuples, state->memtupcount,
-                                                          state->onlyKey);
-                               else
-                                       qsort_tuple(state->memtuples,
-                                                               state->memtupcount,
-                                                               state->comparetup,
-                                                               state);
+                               /* Just qsort 'em and we're done */
+                               tuplesort_sort_memtuples(state);
+                               state->status = TSS_SORTEDINMEM;
+                       }
+                       else if (WORKER(state))
+                       {
+                               /*
+                                * Parallel workers must still dump out tuples to tape.  No
+                                * merge is required to produce single output run, though.
+                                */
+                               inittapes(state, false);
+                               dumptuples(state, true);
+                               worker_nomergeruns(state);
+                               state->status = TSS_SORTEDONTAPE;
+                       }
+                       else
+                       {
+                               /*
+                                * Leader will take over worker tapes and merge worker runs.
+                                * Note that mergeruns sets the correct state->status.
+                                */
+                               leader_takeover_tapes(state);
+                               mergeruns(state);
                        }
                        state->current = 0;
                        state->eof_reached = false;
+                       state->markpos_block = 0L;
                        state->markpos_offset = 0;
                        state->markpos_eof = false;
-                       state->status = TSS_SORTEDINMEM;
                        break;
 
                case TSS_BOUNDED:
 
                        /*
                         * We were able to accumulate all the tuples required for output
-                        * in memory, using a heap to eliminate excess tuples.  Now we
+                        * in memory, using a heap to eliminate excess tuples.  Now we
                         * have to transform the heap to a properly-sorted array.
                         */
                        sort_bounded_heap(state);
@@ -1373,10 +1856,10 @@ tuplesort_performsort(Tuplesortstate *state)
                case TSS_BUILDRUNS:
 
                        /*
-                        * Finish tape-based sort.      First, flush all tuples remaining in
+                        * Finish tape-based sort.  First, flush all tuples remaining in
                         * memory out to tape; then merge until we have a single remaining
-                        * run (or, if !randomAccess, one run per tape). Note that
-                        * mergeruns sets the correct state->status.
+                        * run (or, if !randomAccess and !WORKER(), one run per tape).
+                        * Note that mergeruns sets the correct state->status.
                         */
                        dumptuples(state, true);
                        mergeruns(state);
@@ -1395,12 +1878,12 @@ tuplesort_performsort(Tuplesortstate *state)
        if (trace_sort)
        {
                if (state->status == TSS_FINALMERGE)
-                       elog(LOG, "performsort done (except %d-way final merge): %s",
-                                state->activeTapes,
+                       elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
+                                state->worker, state->activeTapes,
                                 pg_rusage_show(&state->ru_start));
                else
-                       elog(LOG, "performsort done: %s",
-                                pg_rusage_show(&state->ru_start));
+                       elog(LOG, "performsort of worker %d done: %s",
+                                state->worker, pg_rusage_show(&state->ru_start));
        }
 #endif
 
@@ -1409,20 +1892,25 @@ tuplesort_performsort(Tuplesortstate *state)
 
 /*
  * Internal routine to fetch the next tuple in either forward or back
- * direction into *stup.  Returns FALSE if no more tuples.
- * If *should_free is set, the caller must pfree stup.tuple when done with it.
+ * direction into *stup.  Returns false if no more tuples.
+ * Returned tuple belongs to tuplesort memory context, and must not be freed
+ * by caller.  Note that fetched tuple is stored in memory that may be
+ * recycled by any future fetch.
  */
 static bool
 tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
-                                                 SortTuple *stup, bool *should_free)
+                                                 SortTuple *stup)
 {
        unsigned int tuplen;
+       size_t          nmoved;
+
+       Assert(!WORKER(state));
 
        switch (state->status)
        {
                case TSS_SORTEDINMEM:
                        Assert(forward || state->randomAccess);
-                       *should_free = false;
+                       Assert(!state->slabAllocatorUsed);
                        if (forward)
                        {
                                if (state->current < state->memtupcount)
@@ -1434,7 +1922,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 
                                /*
                                 * Complain if caller tries to retrieve more tuples than
-                                * originally asked for in a bounded sort.      This is because
+                                * originally asked for in a bounded sort.  This is because
                                 * returning EOF here might be the wrong thing.
                                 */
                                if (state->bounded && state->current >= state->bound)
@@ -1466,14 +1954,34 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 
                case TSS_SORTEDONTAPE:
                        Assert(forward || state->randomAccess);
-                       *should_free = true;
+                       Assert(state->slabAllocatorUsed);
+
+                       /*
+                        * The slot that held the tuple that we returned in previous
+                        * gettuple call can now be reused.
+                        */
+                       if (state->lastReturnedTuple)
+                       {
+                               RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
+                               state->lastReturnedTuple = NULL;
+                       }
+
                        if (forward)
                        {
                                if (state->eof_reached)
                                        return false;
+
                                if ((tuplen = getlen(state, state->result_tape, true)) != 0)
                                {
                                        READTUP(state, stup, state->result_tape, tuplen);
+
+                                       /*
+                                        * Remember the tuple we return, so that we can recycle
+                                        * its memory on next call.  (This can be NULL, in the
+                                        * !state->tuples case).
+                                        */
+                                       state->lastReturnedTuple = stup->tuple;
+
                                        return true;
                                }
                                else
@@ -1496,10 +2004,13 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
                                 * end of file; back up to fetch last tuple's ending length
                                 * word.  If seek fails we must have a completely empty file.
                                 */
-                               if (!LogicalTapeBackspace(state->tapeset,
-                                                                                 state->result_tape,
-                                                                                 2 * sizeof(unsigned int)))
+                               nmoved = LogicalTapeBackspace(state->tapeset,
+                                                                                         state->result_tape,
+                                                                                         2 * sizeof(unsigned int));
+                               if (nmoved == 0)
                                        return false;
+                               else if (nmoved != 2 * sizeof(unsigned int))
+                                       elog(ERROR, "unexpected tape position");
                                state->eof_reached = false;
                        }
                        else
@@ -1508,31 +2019,34 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
                                 * Back up and fetch previously-returned tuple's ending length
                                 * word.  If seek fails, assume we are at start of file.
                                 */
-                               if (!LogicalTapeBackspace(state->tapeset,
-                                                                                 state->result_tape,
-                                                                                 sizeof(unsigned int)))
+                               nmoved = LogicalTapeBackspace(state->tapeset,
+                                                                                         state->result_tape,
+                                                                                         sizeof(unsigned int));
+                               if (nmoved == 0)
                                        return false;
+                               else if (nmoved != sizeof(unsigned int))
+                                       elog(ERROR, "unexpected tape position");
                                tuplen = getlen(state, state->result_tape, false);
 
                                /*
                                 * Back up to get ending length word of tuple before it.
                                 */
-                               if (!LogicalTapeBackspace(state->tapeset,
-                                                                                 state->result_tape,
-                                                                                 tuplen + 2 * sizeof(unsigned int)))
+                               nmoved = LogicalTapeBackspace(state->tapeset,
+                                                                                         state->result_tape,
+                                                                                         tuplen + 2 * sizeof(unsigned int));
+                               if (nmoved == tuplen + sizeof(unsigned int))
                                {
                                        /*
-                                        * If that fails, presumably the prev tuple is the first
-                                        * in the file.  Back up so that it becomes next to read
-                                        * in forward direction (not obviously right, but that is
-                                        * what in-memory case does).
+                                        * We backed up over the previous tuple, but there was no
+                                        * ending length word before it.  That means that the prev
+                                        * tuple is the first tuple in the file.  It is now the
+                                        * next to read in forward direction (not obviously right,
+                                        * but that is what in-memory case does).
                                         */
-                                       if (!LogicalTapeBackspace(state->tapeset,
-                                                                                         state->result_tape,
-                                                                                         tuplen + sizeof(unsigned int)))
-                                               elog(ERROR, "bogus tuple length in backward scan");
                                        return false;
                                }
+                               else if (nmoved != tuplen + 2 * sizeof(unsigned int))
+                                       elog(ERROR, "bogus tuple length in backward scan");
                        }
 
                        tuplen = getlen(state, state->result_tape, false);
@@ -1542,16 +2056,35 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
                         * Note: READTUP expects we are positioned after the initial
                         * length word of the tuple, so back up to that point.
                         */
-                       if (!LogicalTapeBackspace(state->tapeset,
-                                                                         state->result_tape,
-                                                                         tuplen))
+                       nmoved = LogicalTapeBackspace(state->tapeset,
+                                                                                 state->result_tape,
+                                                                                 tuplen);
+                       if (nmoved != tuplen)
                                elog(ERROR, "bogus tuple length in backward scan");
                        READTUP(state, stup, state->result_tape, tuplen);
+
+                       /*
+                        * Remember the tuple we return, so that we can recycle its memory
+                        * on next call. (This can be NULL, in the Datum case).
+                        */
+                       state->lastReturnedTuple = stup->tuple;
+
                        return true;
 
                case TSS_FINALMERGE:
                        Assert(forward);
-                       *should_free = true;
+                       /* We are managing memory ourselves, with the slab allocator. */
+                       Assert(state->slabAllocatorUsed);
+
+                       /*
+                        * The slab slot holding the tuple that we returned in previous
+                        * gettuple call can now be reused.
+                        */
+                       if (state->lastReturnedTuple)
+                       {
+                               RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
+                               state->lastReturnedTuple = NULL;
+                       }
 
                        /*
                         * This code should match the inner loop of mergeonerun().
@@ -1559,45 +2092,38 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
                        if (state->memtupcount > 0)
                        {
                                int                     srcTape = state->memtuples[0].tupindex;
-                               Size            tuplen;
-                               int                     tupIndex;
-                               SortTuple  *newtup;
+                               SortTuple       newtup;
 
                                *stup = state->memtuples[0];
-                               /* returned tuple is no longer counted in our memory space */
-                               if (stup->tuple)
-                               {
-                                       tuplen = GetMemoryChunkSpace(stup->tuple);
-                                       state->availMem += tuplen;
-                                       state->mergeavailmem[srcTape] += tuplen;
-                               }
-                               tuplesort_heap_siftup(state, false);
-                               if ((tupIndex = state->mergenext[srcTape]) == 0)
+
+                               /*
+                                * Remember the tuple we return, so that we can recycle its
+                                * memory on next call. (This can be NULL, in the Datum case).
+                                */
+                               state->lastReturnedTuple = stup->tuple;
+
+                               /*
+                                * Pull next tuple from tape, and replace the returned tuple
+                                * at top of the heap with it.
+                                */
+                               if (!mergereadnext(state, srcTape, &newtup))
                                {
                                        /*
-                                        * out of preloaded data on this tape, try to read more
-                                        *
-                                        * Unlike mergeonerun(), we only preload from the single
-                                        * tape that's run dry.  See mergepreread() comments.
+                                        * If no more data, we've reached end of run on this tape.
+                                        * Remove the top node from the heap.
                                         */
-                                       mergeprereadone(state, srcTape);
+                                       tuplesort_heap_delete_top(state);
 
                                        /*
-                                        * if still no data, we've reached end of run on this tape
+                                        * Rewind to free the read buffer.  It'd go away at the
+                                        * end of the sort anyway, but better to release the
+                                        * memory early.
                                         */
-                                       if ((tupIndex = state->mergenext[srcTape]) == 0)
-                                               return true;
+                                       LogicalTapeRewindForWrite(state->tapeset, srcTape);
+                                       return true;
                                }
-                               /* pull next preread tuple from list, insert in heap */
-                               newtup = &state->memtuples[tupIndex];
-                               state->mergenext[srcTape] = newtup->tupindex;
-                               if (state->mergenext[srcTape] == 0)
-                                       state->mergelast[srcTape] = 0;
-                               tuplesort_heap_insert(state, newtup, srcTape, false);
-                               /* put the now-unused memtuples entry on the freelist */
-                               newtup->tupindex = state->mergefreelist;
-                               state->mergefreelist = tupIndex;
-                               state->mergeavailslots[srcTape]++;
+                               newtup.tupindex = srcTape;
+                               tuplesort_heap_replace_top(state, &newtup);
                                return true;
                        }
                        return false;
@@ -1610,25 +2136,46 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 
 /*
  * Fetch the next tuple in either forward or back direction.
- * If successful, put tuple in slot and return TRUE; else, clear the slot
- * and return FALSE.
+ * If successful, put tuple in slot and return true; else, clear the slot
+ * and return false.
+ *
+ * Caller may optionally be passed back abbreviated value (on true return
+ * value) when abbreviation was used, which can be used to cheaply avoid
+ * equality checks that might otherwise be required.  Caller can safely make a
+ * determination of "non-equal tuple" based on simple binary inequality.  A
+ * NULL value in leading attribute will set abbreviated value to zeroed
+ * representation, which caller may rely on in abbreviated inequality check.
+ *
+ * If copy is true, the slot receives a tuple that's been copied into the
+ * caller's memory context, so that it will stay valid regardless of future
+ * manipulations of the tuplesort's state (up to and including deleting the
+ * tuplesort).  If copy is false, the slot will just receive a pointer to a
+ * tuple held within the tuplesort, which is more efficient, but only safe for
+ * callers that are prepared to have any subsequent manipulation of the
+ * tuplesort's state invalidate slot contents.
  */
 bool
-tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
-                                          TupleTableSlot *slot)
+tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy,
+                                          TupleTableSlot *slot, Datum *abbrev)
 {
        MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
        SortTuple       stup;
-       bool            should_free;
 
-       if (!tuplesort_gettuple_common(state, forward, &stup, &should_free))
+       if (!tuplesort_gettuple_common(state, forward, &stup))
                stup.tuple = NULL;
 
        MemoryContextSwitchTo(oldcontext);
 
        if (stup.tuple)
        {
-               ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, should_free);
+               /* Record abbreviated key for caller */
+               if (state->sortKeys->abbrev_converter && abbrev)
+                       *abbrev = stup.datum1;
+
+               if (copy)
+                       stup.tuple = heap_copy_minimal_tuple((MinimalTuple) stup.tuple);
+
+               ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, copy);
                return true;
        }
        else
@@ -1640,16 +2187,17 @@ tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
 
 /*
  * Fetch the next tuple in either forward or back direction.
- * Returns NULL if no more tuples.     If *should_free is set, the
- * caller must pfree the returned tuple when done with it.
+ * Returns NULL if no more tuples.  Returned tuple belongs to tuplesort memory
+ * context, and must not be freed by caller.  Caller may not rely on tuple
+ * remaining valid after any further manipulation of tuplesort.
  */
 HeapTuple
-tuplesort_getheaptuple(Tuplesortstate *state, bool forward, bool *should_free)
+tuplesort_getheaptuple(Tuplesortstate *state, bool forward)
 {
        MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
        SortTuple       stup;
 
-       if (!tuplesort_gettuple_common(state, forward, &stup, should_free))
+       if (!tuplesort_gettuple_common(state, forward, &stup))
                stup.tuple = NULL;
 
        MemoryContextSwitchTo(oldcontext);
@@ -1659,17 +2207,17 @@ tuplesort_getheaptuple(Tuplesortstate *state, bool forward, bool *should_free)
 
 /*
  * Fetch the next index tuple in either forward or back direction.
- * Returns NULL if no more tuples.     If *should_free is set, the
- * caller must pfree the returned tuple when done with it.
+ * Returns NULL if no more tuples.  Returned tuple belongs to tuplesort memory
+ * context, and must not be freed by caller.  Caller may not rely on tuple
+ * remaining valid after any further manipulation of tuplesort.
  */
 IndexTuple
-tuplesort_getindextuple(Tuplesortstate *state, bool forward,
-                                               bool *should_free)
+tuplesort_getindextuple(Tuplesortstate *state, bool forward)
 {
        MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
        SortTuple       stup;
 
-       if (!tuplesort_gettuple_common(state, forward, &stup, should_free))
+       if (!tuplesort_gettuple_common(state, forward, &stup))
                stup.tuple = NULL;
 
        MemoryContextSwitchTo(oldcontext);
@@ -1679,48 +2227,58 @@ tuplesort_getindextuple(Tuplesortstate *state, bool forward,
 
 /*
  * Fetch the next Datum in either forward or back direction.
- * Returns FALSE if no more datums.
+ * Returns false if no more datums.
  *
  * If the Datum is pass-by-ref type, the returned value is freshly palloc'd
- * and is now owned by the caller.
+ * in caller's context, and is now owned by the caller (this differs from
+ * similar routines for other types of tuplesorts).
+ *
+ * Caller may optionally be passed back abbreviated value (on true return
+ * value) when abbreviation was used, which can be used to cheaply avoid
+ * equality checks that might otherwise be required.  Caller can safely make a
+ * determination of "non-equal tuple" based on simple binary inequality.  A
+ * NULL value will have a zeroed abbreviated value representation, which caller
+ * may rely on in abbreviated inequality check.
  */
 bool
 tuplesort_getdatum(Tuplesortstate *state, bool forward,
-                                  Datum *val, bool *isNull)
+                                  Datum *val, bool *isNull, Datum *abbrev)
 {
        MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
        SortTuple       stup;
-       bool            should_free;
 
-       if (!tuplesort_gettuple_common(state, forward, &stup, &should_free))
+       if (!tuplesort_gettuple_common(state, forward, &stup))
        {
                MemoryContextSwitchTo(oldcontext);
                return false;
        }
 
-       if (stup.isnull1 || state->datumTypeByVal)
-       {
+       /* Ensure we copy into caller's memory context */
+       MemoryContextSwitchTo(oldcontext);
+
+       /* Record abbreviated key for caller */
+       if (state->sortKeys->abbrev_converter && abbrev)
+               *abbrev = stup.datum1;
+
+       if (stup.isnull1 || !state->tuples)
+       {
                *val = stup.datum1;
                *isNull = stup.isnull1;
        }
        else
        {
-               if (should_free)
-                       *val = stup.datum1;
-               else
-                       *val = datumCopy(stup.datum1, false, state->datumTypeLen);
+               /* use stup.tuple because stup.datum1 may be an abbreviation */
+               *val = datumCopy(PointerGetDatum(stup.tuple), false, state->datumTypeLen);
                *isNull = false;
        }
 
-       MemoryContextSwitchTo(oldcontext);
-
        return true;
 }
 
 /*
  * Advance over N tuples in either forward or back direction,
  * without returning any data.  N==0 is a no-op.
- * Returns TRUE if successful, FALSE if ran out of tuples.
+ * Returns true if successful, false if ran out of tuples.
  */
 bool
 tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
@@ -1729,10 +2287,11 @@ tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
 
        /*
         * We don't actually support backwards skip yet, because no callers need
-        * it.  The API is designed to allow for that later, though.
+        * it.  The API is designed to allow for that later, though.
         */
        Assert(forward);
        Assert(ntuples >= 0);
+       Assert(!WORKER(state));
 
        switch (state->status)
        {
@@ -1747,7 +2306,7 @@ tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
 
                        /*
                         * Complain if caller tries to retrieve more tuples than
-                        * originally asked for in a bounded sort.      This is because
+                        * originally asked for in a bounded sort.  This is because
                         * returning EOF here might be the wrong thing.
                         */
                        if (state->bounded && state->current >= state->bound)
@@ -1766,16 +2325,12 @@ tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
                        while (ntuples-- > 0)
                        {
                                SortTuple       stup;
-                               bool            should_free;
 
-                               if (!tuplesort_gettuple_common(state, forward,
-                                                                                          &stup, &should_free))
+                               if (!tuplesort_gettuple_common(state, forward, &stup))
                                {
                                        MemoryContextSwitchTo(oldcontext);
                                        return false;
                                }
-                               if (should_free && stup.tuple)
-                                       pfree(stup.tuple);
                                CHECK_FOR_INTERRUPTS();
                        }
                        MemoryContextSwitchTo(oldcontext);
@@ -1800,7 +2355,7 @@ tuplesort_merge_order(int64 allowedMem)
 
        /*
         * We need one tape for each merge input, plus another one for the output,
-        * and each of these tapes needs buffer space.  In addition we want
+        * and each of these tapes needs buffer space.  In addition we want
         * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't
         * count).
         *
@@ -1811,8 +2366,19 @@ tuplesort_merge_order(int64 allowedMem)
        mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
                (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
 
-       /* Even in minimum memory, use at least a MINORDER merge */
+       /*
+        * Even in minimum memory, use at least a MINORDER merge.  On the other
+        * hand, even when we have lots of memory, do not use more than a MAXORDER
+        * merge.  Tapes are pretty cheap, but they're not entirely free.  Each
+        * additional tape reduces the amount of memory available to build runs,
+        * which in turn can cause the same sort to need more runs, which makes
+        * merging slower even if it can still be done in a single pass.  Also,
+        * high order merges are quite slow due to CPU cache effects; it can be
+        * faster to pay the I/O cost of a polyphase merge than to perform a
+        * single merge pass across many hundreds of tapes.
+        */
        mOrder = Max(mOrder, MINORDER);
+       mOrder = Min(mOrder, MAXORDER);
 
        return mOrder;
 }
@@ -1820,107 +2386,101 @@ tuplesort_merge_order(int64 allowedMem)
 /*
  * inittapes - initialize for tape sorting.
  *
- * This is called only if we have found we don't have room to sort in memory.
+ * This is called only if we have found we won't sort in memory.
  */
 static void
-inittapes(Tuplesortstate *state)
+inittapes(Tuplesortstate *state, bool mergeruns)
 {
        int                     maxTapes,
-                               ntuples,
                                j;
-       int64           tapeSpace;
 
-       /* Compute number of tapes to use: merge order plus 1 */
-       maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
-
-       /*
-        * We must have at least 2*maxTapes slots in the memtuples[] array, else
-        * we'd not have room for merge heap plus preread.  It seems unlikely that
-        * this case would ever occur, but be safe.
-        */
-       maxTapes = Min(maxTapes, state->memtupsize / 2);
+       Assert(!LEADER(state));
 
-       state->maxTapes = maxTapes;
-       state->tapeRange = maxTapes - 1;
+       if (mergeruns)
+       {
+               /* Compute number of tapes to use: merge order plus 1 */
+               maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
+       }
+       else
+       {
+               /* Workers can sometimes produce single run, output without merge */
+               Assert(WORKER(state));
+               maxTapes = MINORDER + 1;
+       }
 
 #ifdef TRACE_SORT
        if (trace_sort)
-               elog(LOG, "switching to external sort with %d tapes: %s",
-                        maxTapes, pg_rusage_show(&state->ru_start));
+               elog(LOG, "worker %d switching to external sort with %d tapes: %s",
+                        state->worker, maxTapes, pg_rusage_show(&state->ru_start));
 #endif
 
+       /* Create the tape set and allocate the per-tape data arrays */
+       inittapestate(state, maxTapes);
+       state->tapeset =
+               LogicalTapeSetCreate(maxTapes, NULL,
+                                                        state->shared ? &state->shared->fileset : NULL,
+                                                        state->worker);
+
+       state->currentRun = 0;
+
+       /*
+        * Initialize variables of Algorithm D (step D1).
+        */
+       for (j = 0; j < maxTapes; j++)
+       {
+               state->tp_fib[j] = 1;
+               state->tp_runs[j] = 0;
+               state->tp_dummy[j] = 1;
+               state->tp_tapenum[j] = j;
+       }
+       state->tp_fib[state->tapeRange] = 0;
+       state->tp_dummy[state->tapeRange] = 0;
+
+       state->Level = 1;
+       state->destTape = 0;
+
+       state->status = TSS_BUILDRUNS;
+}
+
+/*
+ * inittapestate - initialize generic tape management state
+ */
+static void
+inittapestate(Tuplesortstate *state, int maxTapes)
+{
+       int64           tapeSpace;
+
        /*
         * Decrease availMem to reflect the space needed for tape buffers; but
         * don't decrease it to the point that we have no room for tuples. (That
         * case is only likely to occur if sorting pass-by-value Datums; in all
         * other scenarios the memtuples[] array is unlikely to occupy more than
-        * half of allowedMem.  In the pass-by-value case it's not important to
+        * half of allowedMem.  In the pass-by-value case it's not important to
         * account for tuple space, so we don't care if LACKMEM becomes
         * inaccurate.)
         */
        tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
+
        if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
                USEMEM(state, tapeSpace);
 
        /*
         * Make sure that the temp file(s) underlying the tape set are created in
-        * suitable temp tablespaces.
+        * suitable temp tablespaces.  For parallel sorts, this should have been
+        * called already, but it doesn't matter if it is called a second time.
         */
        PrepareTempTablespaces();
 
-       /*
-        * Create the tape set and allocate the per-tape data arrays.
-        */
-       state->tapeset = LogicalTapeSetCreate(maxTapes);
-
        state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
-       state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
-       state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
-       state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
-       state->mergeavailmem = (int64 *) palloc0(maxTapes * sizeof(int64));
        state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
        state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
        state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
        state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
 
-       /*
-        * Convert the unsorted contents of memtuples[] into a heap. Each tuple is
-        * marked as belonging to run number zero.
-        *
-        * NOTE: we pass false for checkIndex since there's no point in comparing
-        * indexes in this step, even though we do intend the indexes to be part
-        * of the sort key...
-        */
-       ntuples = state->memtupcount;
-       state->memtupcount = 0;         /* make the heap empty */
-       for (j = 0; j < ntuples; j++)
-       {
-               /* Must copy source tuple to avoid possible overwrite */
-               SortTuple       stup = state->memtuples[j];
-
-               tuplesort_heap_insert(state, &stup, 0, false);
-       }
-       Assert(state->memtupcount == ntuples);
-
-       state->currentRun = 0;
-
-       /*
-        * Initialize variables of Algorithm D (step D1).
-        */
-       for (j = 0; j < maxTapes; j++)
-       {
-               state->tp_fib[j] = 1;
-               state->tp_runs[j] = 0;
-               state->tp_dummy[j] = 1;
-               state->tp_tapenum[j] = j;
-       }
-       state->tp_fib[state->tapeRange] = 0;
-       state->tp_dummy[state->tapeRange] = 0;
-
-       state->Level = 1;
-       state->destTape = 0;
-
-       state->status = TSS_BUILDRUNS;
+       /* Record # of tapes allocated (for duration of sort) */
+       state->maxTapes = maxTapes;
+       /* Record maximum # of tapes usable as inputs when merging */
+       state->tapeRange = maxTapes - 1;
 }
 
 /*
@@ -1958,6 +2518,39 @@ selectnewtape(Tuplesortstate *state)
        state->destTape = 0;
 }
 
+/*
+ * Initialize the slab allocation arena, for the given number of slots.
+ */
+static void
+init_slab_allocator(Tuplesortstate *state, int numSlots)
+{
+       if (numSlots > 0)
+       {
+               char       *p;
+               int                     i;
+
+               state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
+               state->slabMemoryEnd = state->slabMemoryBegin +
+                       numSlots * SLAB_SLOT_SIZE;
+               state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
+               USEMEM(state, numSlots * SLAB_SLOT_SIZE);
+
+               p = state->slabMemoryBegin;
+               for (i = 0; i < numSlots - 1; i++)
+               {
+                       ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
+                       p += SLAB_SLOT_SIZE;
+               }
+               ((SlabSlot *) p)->nextfree = NULL;
+       }
+       else
+       {
+               state->slabMemoryBegin = state->slabMemoryEnd = NULL;
+               state->slabFreeHead = NULL;
+       }
+       state->slabAllocatorUsed = true;
+}
+
 /*
  * mergeruns -- merge all the completed initial runs.
  *
@@ -1971,28 +2564,112 @@ mergeruns(Tuplesortstate *state)
                                svTape,
                                svRuns,
                                svDummy;
+       int                     numTapes;
+       int                     numInputTapes;
 
        Assert(state->status == TSS_BUILDRUNS);
        Assert(state->memtupcount == 0);
 
+       if (state->sortKeys != NULL && state->sortKeys->abbrev_converter != NULL)
+       {
+               /*
+                * If there are multiple runs to be merged, when we go to read back
+                * tuples from disk, abbreviated keys will not have been stored, and
+                * we don't care to regenerate them.  Disable abbreviation from this
+                * point on.
+                */
+               state->sortKeys->abbrev_converter = NULL;
+               state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
+
+               /* Not strictly necessary, but be tidy */
+               state->sortKeys->abbrev_abort = NULL;
+               state->sortKeys->abbrev_full_comparator = NULL;
+       }
+
+       /*
+        * Reset tuple memory.  We've freed all the tuples that we previously
+        * allocated.  We will use the slab allocator from now on.
+        */
+       MemoryContextDelete(state->tuplecontext);
+       state->tuplecontext = NULL;
+
+       /*
+        * We no longer need a large memtuples array.  (We will allocate a smaller
+        * one for the heap later.)
+        */
+       FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
+       pfree(state->memtuples);
+       state->memtuples = NULL;
+
        /*
-        * If we produced only one initial run (quite likely if the total data
-        * volume is between 1X and 2X workMem), we can just use that tape as the
-        * finished output, rather than doing a useless merge.  (This obvious
-        * optimization is not in Knuth's algorithm.)
+        * If we had fewer runs than tapes, refund the memory that we imagined we
+        * would need for the tape buffers of the unused tapes.
+        *
+        * numTapes and numInputTapes reflect the actual number of tapes we will
+        * use.  Note that the output tape's tape number is maxTapes - 1, so the
+        * tape numbers of the used tapes are not consecutive, and you cannot just
+        * loop from 0 to numTapes to visit all used tapes!
         */
-       if (state->currentRun == 1)
+       if (state->Level == 1)
        {
-               state->result_tape = state->tp_tapenum[state->destTape];
-               /* must freeze and rewind the finished output tape */
-               LogicalTapeFreeze(state->tapeset, state->result_tape);
-               state->status = TSS_SORTEDONTAPE;
-               return;
+               numInputTapes = state->currentRun;
+               numTapes = numInputTapes + 1;
+               FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
+       }
+       else
+       {
+               numInputTapes = state->tapeRange;
+               numTapes = state->maxTapes;
        }
 
+       /*
+        * Initialize the slab allocator.  We need one slab slot per input tape,
+        * for the tuples in the heap, plus one to hold the tuple last returned
+        * from tuplesort_gettuple.  (If we're sorting pass-by-val Datums,
+        * however, we don't need to do allocate anything.)
+        *
+        * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
+        * to track memory usage of individual tuples.
+        */
+       if (state->tuples)
+               init_slab_allocator(state, numInputTapes + 1);
+       else
+               init_slab_allocator(state, 0);
+
+       /*
+        * Allocate a new 'memtuples' array, for the heap.  It will hold one tuple
+        * from each input tape.
+        */
+       state->memtupsize = numInputTapes;
+       state->memtuples = (SortTuple *) palloc(numInputTapes * sizeof(SortTuple));
+       USEMEM(state, GetMemoryChunkSpace(state->memtuples));
+
+       /*
+        * Use all the remaining memory we have available for read buffers among
+        * the input tapes.
+        *
+        * We don't try to "rebalance" the memory among tapes, when we start a new
+        * merge phase, even if some tapes are inactive in the new phase.  That
+        * would be hard, because logtape.c doesn't know where one run ends and
+        * another begins.  When a new merge phase begins, and a tape doesn't
+        * participate in it, its buffer nevertheless already contains tuples from
+        * the next run on same tape, so we cannot release the buffer.  That's OK
+        * in practice, merge performance isn't that sensitive to the amount of
+        * buffers used, and most merge phases use all or almost all tapes,
+        * anyway.
+        */
+#ifdef TRACE_SORT
+       if (trace_sort)
+               elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
+                        state->worker, state->availMem / 1024, numInputTapes);
+#endif
+
+       state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
+       USEMEM(state, state->read_buffer_size * numInputTapes);
+
        /* End of step D2: rewind all output tapes to prepare for merging */
        for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
-               LogicalTapeRewind(state->tapeset, tapenum, false);
+               LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
 
        for (;;)
        {
@@ -2002,7 +2679,7 @@ mergeruns(Tuplesortstate *state)
                 * pass remains.  If we don't have to produce a materialized sorted
                 * tape, we can stop at this point and do the final merge on-the-fly.
                 */
-               if (!state->randomAccess)
+               if (!state->randomAccess && !WORKER(state))
                {
                        bool            allOneRun = true;
 
@@ -2055,11 +2732,10 @@ mergeruns(Tuplesortstate *state)
                if (--state->Level == 0)
                        break;
                /* rewind output tape T to use as new input */
-               LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange],
-                                                 false);
+               LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
+                                                                state->read_buffer_size);
                /* rewind used-up input tape P, and prepare it for write pass */
-               LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange - 1],
-                                                 true);
+               LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
                state->tp_runs[state->tapeRange - 1] = 0;
 
                /*
@@ -2084,12 +2760,22 @@ mergeruns(Tuplesortstate *state)
         * the loop without performing the last iteration of step D6, we have not
         * rearranged the tape unit assignment, and therefore the result is on
         * TAPE[T].  We need to do it this way so that we can freeze the final
-        * output tape while rewinding it.      The last iteration of step D6 would be
+        * output tape while rewinding it.  The last iteration of step D6 would be
         * a waste of cycles anyway...
         */
        state->result_tape = state->tp_tapenum[state->tapeRange];
-       LogicalTapeFreeze(state->tapeset, state->result_tape);
+       if (!WORKER(state))
+               LogicalTapeFreeze(state->tapeset, state->result_tape, NULL);
+       else
+               worker_freeze_result_tape(state);
        state->status = TSS_SORTEDONTAPE;
+
+       /* Release the read buffers of all the other tapes, by rewinding them. */
+       for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
+       {
+               if (tapenum != state->result_tape)
+                       LogicalTapeRewindForWrite(state->tapeset, tapenum);
+       }
 }
 
 /*
@@ -2103,10 +2789,6 @@ mergeonerun(Tuplesortstate *state)
 {
        int                     destTape = state->tp_tapenum[state->tapeRange];
        int                     srcTape;
-       int                     tupIndex;
-       SortTuple  *tup;
-       int64           priorAvail,
-                               spaceFreed;
 
        /*
         * Start the merge by loading one tuple from each active source tape into
@@ -2121,33 +2803,28 @@ mergeonerun(Tuplesortstate *state)
         */
        while (state->memtupcount > 0)
        {
+               SortTuple       stup;
+
                /* write the tuple to destTape */
-               priorAvail = state->availMem;
                srcTape = state->memtuples[0].tupindex;
                WRITETUP(state, destTape, &state->memtuples[0]);
-               /* writetup adjusted total free space, now fix per-tape space */
-               spaceFreed = state->availMem - priorAvail;
-               state->mergeavailmem[srcTape] += spaceFreed;
-               /* compact the heap */
-               tuplesort_heap_siftup(state, false);
-               if ((tupIndex = state->mergenext[srcTape]) == 0)
+
+               /* recycle the slot of the tuple we just wrote out, for the next read */
+               if (state->memtuples[0].tuple)
+                       RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
+
+               /*
+                * pull next tuple from the tape, and replace the written-out tuple in
+                * the heap with it.
+                */
+               if (mergereadnext(state, srcTape, &stup))
                {
-                       /* out of preloaded data on this tape, try to read more */
-                       mergepreread(state);
-                       /* if still no data, we've reached end of run on this tape */
-                       if ((tupIndex = state->mergenext[srcTape]) == 0)
-                               continue;
+                       stup.tupindex = srcTape;
+                       tuplesort_heap_replace_top(state, &stup);
+
                }
-               /* pull next preread tuple from list, insert in heap */
-               tup = &state->memtuples[tupIndex];
-               state->mergenext[srcTape] = tup->tupindex;
-               if (state->mergenext[srcTape] == 0)
-                       state->mergelast[srcTape] = 0;
-               tuplesort_heap_insert(state, tup, srcTape, false);
-               /* put the now-unused memtuples entry on the freelist */
-               tup->tupindex = state->mergefreelist;
-               state->mergefreelist = tupIndex;
-               state->mergeavailslots[srcTape]++;
+               else
+                       tuplesort_heap_delete_top(state);
        }
 
        /*
@@ -2159,8 +2836,8 @@ mergeonerun(Tuplesortstate *state)
 
 #ifdef TRACE_SORT
        if (trace_sort)
-               elog(LOG, "finished %d-way merge step: %s", state->activeTapes,
-                        pg_rusage_show(&state->ru_start));
+               elog(LOG, "worker %d finished %d-way merge step: %s", state->worker,
+                        state->activeTapes, pg_rusage_show(&state->ru_start));
 #endif
 }
 
@@ -2168,9 +2845,8 @@ mergeonerun(Tuplesortstate *state)
  * beginmerge - initialize for a merge pass
  *
  * We decrease the counts of real and dummy runs for each tape, and mark
- * which tapes contain active input runs in mergeactive[].     Then, load
- * as many tuples as we can from each active input tape, and finally
- * fill the merge heap with the first tuple from each active tape.
+ * which tapes contain active input runs in mergeactive[].  Then, fill the
+ * merge heap with the first tuple from each active tape.
  */
 static void
 beginmerge(Tuplesortstate *state)
@@ -2178,8 +2854,6 @@ beginmerge(Tuplesortstate *state)
        int                     activeTapes;
        int                     tapenum;
        int                     srcTape;
-       int                     slotsPerTape;
-       int64           spacePerTape;
 
        /* Heap should be empty here */
        Assert(state->memtupcount == 0);
@@ -2201,209 +2875,150 @@ beginmerge(Tuplesortstate *state)
                        activeTapes++;
                }
        }
-       state->activeTapes = activeTapes;
-
-       /* Clear merge-pass state variables */
-       memset(state->mergenext, 0,
-                  state->maxTapes * sizeof(*state->mergenext));
-       memset(state->mergelast, 0,
-                  state->maxTapes * sizeof(*state->mergelast));
-       state->mergefreelist = 0;       /* nothing in the freelist */
-       state->mergefirstfree = activeTapes;            /* 1st slot avail for preread */
-
-       /*
-        * Initialize space allocation to let each active input tape have an equal
-        * share of preread space.
-        */
        Assert(activeTapes > 0);
-       slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
-       Assert(slotsPerTape > 0);
-       spacePerTape = state->availMem / activeTapes;
-       for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
-       {
-               if (state->mergeactive[srcTape])
-               {
-                       state->mergeavailslots[srcTape] = slotsPerTape;
-                       state->mergeavailmem[srcTape] = spacePerTape;
-               }
-       }
-
-       /*
-        * Preread as many tuples as possible (and at least one) from each active
-        * tape
-        */
-       mergepreread(state);
+       state->activeTapes = activeTapes;
 
        /* Load the merge heap with the first tuple from each input tape */
        for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
        {
-               int                     tupIndex = state->mergenext[srcTape];
-               SortTuple  *tup;
+               SortTuple       tup;
 
-               if (tupIndex)
+               if (mergereadnext(state, srcTape, &tup))
                {
-                       tup = &state->memtuples[tupIndex];
-                       state->mergenext[srcTape] = tup->tupindex;
-                       if (state->mergenext[srcTape] == 0)
-                               state->mergelast[srcTape] = 0;
-                       tuplesort_heap_insert(state, tup, srcTape, false);
-                       /* put the now-unused memtuples entry on the freelist */
-                       tup->tupindex = state->mergefreelist;
-                       state->mergefreelist = tupIndex;
-                       state->mergeavailslots[srcTape]++;
+                       tup.tupindex = srcTape;
+                       tuplesort_heap_insert(state, &tup);
                }
        }
 }
 
 /*
- * mergepreread - load tuples from merge input tapes
- *
- * This routine exists to improve sequentiality of reads during a merge pass,
- * as explained in the header comments of this file.  Load tuples from each
- * active source tape until the tape's run is exhausted or it has used up
- * its fair share of available memory. In any case, we guarantee that there
- * is at least one preread tuple available from each unexhausted input tape.
- *
- * We invoke this routine at the start of a merge pass for initial load,
- * and then whenever any tape's preread data runs out.  Note that we load
- * as much data as possible from all tapes, not just the one that ran out.
- * This is because logtape.c works best with a usage pattern that alternates
- * between reading a lot of data and writing a lot of data, so whenever we
- * are forced to read, we should fill working memory completely.
- *
- * In FINALMERGE state, we *don't* use this routine, but instead just preread
- * from the single tape that ran dry.  There's no read/write alternation in
- * that state and so no point in scanning through all the tapes to fix one.
- * (Moreover, there may be quite a lot of inactive tapes in that state, since
- * we might have had many fewer runs than tapes.  In a regular tape-to-tape
- * merge we can expect most of the tapes to be active.)
- */
-static void
-mergepreread(Tuplesortstate *state)
-{
-       int                     srcTape;
-
-       for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
-               mergeprereadone(state, srcTape);
-}
-
-/*
- * mergeprereadone - load tuples from one merge input tape
+ * mergereadnext - read next tuple from one merge input tape
  *
- * Read tuples from the specified tape until it has used up its free memory
- * or array slots; but ensure that we have at least one tuple, if any are
- * to be had.
+ * Returns false on EOF.
  */
-static void
-mergeprereadone(Tuplesortstate *state, int srcTape)
+static bool
+mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
 {
        unsigned int tuplen;
-       SortTuple       stup;
-       int                     tupIndex;
-       int64           priorAvail,
-                               spaceUsed;
 
        if (!state->mergeactive[srcTape])
-               return;                                 /* tape's run is already exhausted */
-       priorAvail = state->availMem;
-       state->availMem = state->mergeavailmem[srcTape];
-       while ((state->mergeavailslots[srcTape] > 0 && !LACKMEM(state)) ||
-                  state->mergenext[srcTape] == 0)
-       {
-               /* read next tuple, if any */
-               if ((tuplen = getlen(state, srcTape, true)) == 0)
-               {
-                       state->mergeactive[srcTape] = false;
-                       break;
-               }
-               READTUP(state, &stup, srcTape, tuplen);
-               /* find a free slot in memtuples[] for it */
-               tupIndex = state->mergefreelist;
-               if (tupIndex)
-                       state->mergefreelist = state->memtuples[tupIndex].tupindex;
-               else
-               {
-                       tupIndex = state->mergefirstfree++;
-                       Assert(tupIndex < state->memtupsize);
-               }
-               state->mergeavailslots[srcTape]--;
-               /* store tuple, append to list for its tape */
-               stup.tupindex = 0;
-               state->memtuples[tupIndex] = stup;
-               if (state->mergelast[srcTape])
-                       state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
-               else
-                       state->mergenext[srcTape] = tupIndex;
-               state->mergelast[srcTape] = tupIndex;
+               return false;                   /* tape's run is already exhausted */
+
+       /* read next tuple, if any */
+       if ((tuplen = getlen(state, srcTape, true)) == 0)
+       {
+               state->mergeactive[srcTape] = false;
+               return false;
        }
-       /* update per-tape and global availmem counts */
-       spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
-       state->mergeavailmem[srcTape] = state->availMem;
-       state->availMem = priorAvail - spaceUsed;
+       READTUP(state, stup, srcTape, tuplen);
+
+       return true;
 }
 
 /*
- * dumptuples - remove tuples from heap and write to tape
+ * dumptuples - remove tuples from memtuples and write initial run to tape
  *
- * This is used during initial-run building, but not during merging.
- *
- * When alltuples = false, dump only enough tuples to get under the
- * availMem limit (and leave at least one tuple in the heap in any case,
- * since puttuple assumes it always has a tuple to compare to).  We also
- * insist there be at least one free slot in the memtuples[] array.
- *
- * When alltuples = true, dump everything currently in memory.
- * (This case is only used at end of input data.)
- *
- * If we empty the heap, close out the current run and return (this should
- * only happen at end of input data).  If we see that the tuple run number
- * at the top of the heap has changed, start a new run.
+ * When alltuples = true, dump everything currently in memory.  (This case is
+ * only used at end of input data.)
  */
 static void
 dumptuples(Tuplesortstate *state, bool alltuples)
 {
-       while (alltuples ||
-                  (LACKMEM(state) && state->memtupcount > 1) ||
-                  state->memtupcount >= state->memtupsize)
+       int                     memtupwrite;
+       int                     i;
+
+       /*
+        * Nothing to do if we still fit in available memory and have array slots,
+        * unless this is the final call during initial run generation.
+        */
+       if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
+               !alltuples)
+               return;
+
+       /*
+        * Final call might require no sorting, in rare cases where we just so
+        * happen to have previously LACKMEM()'d at the point where exactly all
+        * remaining tuples are loaded into memory, just before input was
+        * exhausted.
+        *
+        * In general, short final runs are quite possible.  Rather than allowing
+        * a special case where there was a superfluous selectnewtape() call (i.e.
+        * a call with no subsequent run actually written to destTape), we prefer
+        * to write out a 0 tuple run.
+        *
+        * mergereadnext() is prepared for 0 tuple runs, and will reliably mark
+        * the tape inactive for the merge when called from beginmerge().  This
+        * case is therefore similar to the case where mergeonerun() finds a dummy
+        * run for the tape, and so doesn't need to merge a run from the tape (or
+        * conceptually "merges" the dummy run, if you prefer).  According to
+        * Knuth, Algorithm D "isn't strictly optimal" in its method of
+        * distribution and dummy run assignment; this edge case seems very
+        * unlikely to make that appreciably worse.
+        */
+       Assert(state->status == TSS_BUILDRUNS);
+
+       /*
+        * It seems unlikely that this limit will ever be exceeded, but take no
+        * chances
+        */
+       if (state->currentRun == INT_MAX)
+               ereport(ERROR,
+                               (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+                                errmsg("cannot have more than %d runs for an external sort",
+                                               INT_MAX)));
+
+       state->currentRun++;
+
+#ifdef TRACE_SORT
+       if (trace_sort)
+               elog(LOG, "worker %d starting quicksort of run %d: %s",
+                        state->worker, state->currentRun,
+                        pg_rusage_show(&state->ru_start));
+#endif
+
+       /*
+        * Sort all tuples accumulated within the allowed amount of memory for
+        * this run using quicksort
+        */
+       tuplesort_sort_memtuples(state);
+
+#ifdef TRACE_SORT
+       if (trace_sort)
+               elog(LOG, "worker %d finished quicksort of run %d: %s",
+                        state->worker, state->currentRun,
+                        pg_rusage_show(&state->ru_start));
+#endif
+
+       memtupwrite = state->memtupcount;
+       for (i = 0; i < memtupwrite; i++)
        {
-               /*
-                * Dump the heap's frontmost entry, and sift up to remove it from the
-                * heap.
-                */
-               Assert(state->memtupcount > 0);
                WRITETUP(state, state->tp_tapenum[state->destTape],
-                                &state->memtuples[0]);
-               tuplesort_heap_siftup(state, true);
+                                &state->memtuples[i]);
+               state->memtupcount--;
+       }
 
-               /*
-                * If the heap is empty *or* top run number has changed, we've
-                * finished the current run.
-                */
-               if (state->memtupcount == 0 ||
-                       state->currentRun != state->memtuples[0].tupindex)
-               {
-                       markrunend(state, state->tp_tapenum[state->destTape]);
-                       state->currentRun++;
-                       state->tp_runs[state->destTape]++;
-                       state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
+       /*
+        * Reset tuple memory.  We've freed all of the tuples that we previously
+        * allocated.  It's important to avoid fragmentation when there is a stark
+        * change in the sizes of incoming tuples.  Fragmentation due to
+        * AllocSetFree's bucketing by size class might be particularly bad if
+        * this step wasn't taken.
+        */
+       MemoryContextReset(state->tuplecontext);
+
+       markrunend(state, state->tp_tapenum[state->destTape]);
+       state->tp_runs[state->destTape]++;
+       state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
 
 #ifdef TRACE_SORT
-                       if (trace_sort)
-                               elog(LOG, "finished writing%s run %d to tape %d: %s",
-                                        (state->memtupcount == 0) ? " final" : "",
-                                        state->currentRun, state->destTape,
-                                        pg_rusage_show(&state->ru_start));
+       if (trace_sort)
+               elog(LOG, "worker %d finished writing run %d to tape %d: %s",
+                        state->worker, state->currentRun, state->destTape,
+                        pg_rusage_show(&state->ru_start));
 #endif
 
-                       /*
-                        * Done if heap is empty, else prepare for new run.
-                        */
-                       if (state->memtupcount == 0)
-                               break;
-                       Assert(state->currentRun == state->memtuples[0].tupindex);
-                       selectnewtape(state);
-               }
-       }
+       if (!alltuples)
+               selectnewtape(state);
 }
 
 /*
@@ -2425,9 +3040,9 @@ tuplesort_rescan(Tuplesortstate *state)
                        state->markpos_eof = false;
                        break;
                case TSS_SORTEDONTAPE:
-                       LogicalTapeRewind(state->tapeset,
-                                                         state->result_tape,
-                                                         false);
+                       LogicalTapeRewindForRead(state->tapeset,
+                                                                        state->result_tape,
+                                                                        0);
                        state->eof_reached = false;
                        state->markpos_block = 0L;
                        state->markpos_offset = 0;
@@ -2490,11 +3105,10 @@ tuplesort_restorepos(Tuplesortstate *state)
                        state->eof_reached = state->markpos_eof;
                        break;
                case TSS_SORTEDONTAPE:
-                       if (!LogicalTapeSeek(state->tapeset,
-                                                                state->result_tape,
-                                                                state->markpos_block,
-                                                                state->markpos_offset))
-                               elog(ERROR, "tuplesort_restorepos failed");
+                       LogicalTapeSeek(state->tapeset,
+                                                       state->result_tape,
+                                                       state->markpos_block,
+                                                       state->markpos_offset);
                        state->eof_reached = state->markpos_eof;
                        break;
                default:
@@ -2510,13 +3124,10 @@ tuplesort_restorepos(Tuplesortstate *state)
  *
  * This can be called after tuplesort_performsort() finishes to obtain
  * printable summary information about how the sort was performed.
- * spaceUsed is measured in kilobytes.
  */
 void
 tuplesort_get_stats(Tuplesortstate *state,
-                                       const char **sortMethod,
-                                       const char **spaceType,
-                                       long *spaceUsed)
+                                       TuplesortInstrumentation *stats)
 {
        /*
         * Note: it might seem we should provide both memory and disk usage for a
@@ -2524,53 +3135,78 @@ tuplesort_get_stats(Tuplesortstate *state,
         * accurately once we have begun to return tuples to the caller (since we
         * don't account for pfree's the caller is expected to do), so we cannot
         * rely on availMem in a disk sort.  This does not seem worth the overhead
-        * to fix.      Is it worth creating an API for the memory context code to
+        * to fix.  Is it worth creating an API for the memory context code to
         * tell us how much is actually used in sortcontext?
         */
        if (state->tapeset)
        {
-               *spaceType = "Disk";
-               *spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
+               stats->spaceType = SORT_SPACE_TYPE_DISK;
+               stats->spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
        }
        else
        {
-               *spaceType = "Memory";
-               *spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
+               stats->spaceType = SORT_SPACE_TYPE_MEMORY;
+               stats->spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
        }
 
        switch (state->status)
        {
                case TSS_SORTEDINMEM:
                        if (state->boundUsed)
-                               *sortMethod = "top-N heapsort";
+                               stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
                        else
-                               *sortMethod = "quicksort";
+                               stats->sortMethod = SORT_TYPE_QUICKSORT;
                        break;
                case TSS_SORTEDONTAPE:
-                       *sortMethod = "external sort";
+                       stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
                        break;
                case TSS_FINALMERGE:
-                       *sortMethod = "external merge";
+                       stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
                        break;
                default:
-                       *sortMethod = "still in progress";
+                       stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
                        break;
        }
 }
 
+/*
+ * Convert TuplesortMethod to a string.
+ */
+const char *
+tuplesort_method_name(TuplesortMethod m)
+{
+       switch (m)
+       {
+               case SORT_TYPE_STILL_IN_PROGRESS:
+                       return "still in progress";
+               case SORT_TYPE_TOP_N_HEAPSORT:
+                       return "top-N heapsort";
+               case SORT_TYPE_QUICKSORT:
+                       return "quicksort";
+               case SORT_TYPE_EXTERNAL_SORT:
+                       return "external sort";
+               case SORT_TYPE_EXTERNAL_MERGE:
+                       return "external merge";
+       }
+
+       return "unknown";
+}
+
+/*
+ * Convert TuplesortSpaceType to a string.
+ */
+const char *
+tuplesort_space_type_name(TuplesortSpaceType t)
+{
+       Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
+       return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
+}
+
 
 /*
  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
- *
- * Compare two SortTuples.     If checkIndex is true, use the tuple index
- * as the front of the sort key; otherwise, no.
  */
 
-#define HEAPCOMPARE(tup1,tup2) \
-       (checkIndex && ((tup1)->tupindex != (tup2)->tupindex) ? \
-        ((tup1)->tupindex) - ((tup2)->tupindex) : \
-        COMPARETUP(state, tup1, tup2))
-
 /*
  * Convert the existing unordered array of SortTuples to a bounded heap,
  * discarding all but the smallest "state->bound" tuples.
@@ -2579,10 +3215,6 @@ tuplesort_get_stats(Tuplesortstate *state,
  * at the root (array entry zero), instead of the smallest as in the normal
  * sort case.  This allows us to discard the largest entry cheaply.
  * Therefore, we temporarily reverse the sort direction.
- *
- * We assume that all entries in a bounded heap will always have tupindex
- * zero; it therefore doesn't matter that HEAPCOMPARE() doesn't reverse
- * the direction of comparison for tupindexes.
  */
 static void
 make_bounded_heap(Tuplesortstate *state)
@@ -2593,34 +3225,36 @@ make_bounded_heap(Tuplesortstate *state)
        Assert(state->status == TSS_INITIAL);
        Assert(state->bounded);
        Assert(tupcount >= state->bound);
+       Assert(SERIAL(state));
 
        /* Reverse sort direction so largest entry will be at root */
-       REVERSEDIRECTION(state);
+       reversedirection(state);
 
        state->memtupcount = 0;         /* make the heap empty */
        for (i = 0; i < tupcount; i++)
        {
-               if (state->memtupcount >= state->bound &&
-                 COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
-               {
-                       /* New tuple would just get thrown out, so skip it */
-                       free_sort_tuple(state, &state->memtuples[i]);
-                       CHECK_FOR_INTERRUPTS();
-               }
-               else
+               if (state->memtupcount < state->bound)
                {
                        /* Insert next tuple into heap */
                        /* Must copy source tuple to avoid possible overwrite */
                        SortTuple       stup = state->memtuples[i];
 
-                       tuplesort_heap_insert(state, &stup, 0, false);
-
-                       /* If heap too full, discard largest entry */
-                       if (state->memtupcount > state->bound)
-                       {
-                               free_sort_tuple(state, &state->memtuples[0]);
-                               tuplesort_heap_siftup(state, false);
-                       }
+                       tuplesort_heap_insert(state, &stup);
+               }
+               else
+               {
+                       /*
+                        * The heap is full.  Replace the largest entry with the new
+                        * tuple, or just discard it, if it's larger than anything already
+                        * in the heap.
+                        */
+                       if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
+                       {
+                               free_sort_tuple(state, &state->memtuples[i]);
+                               CHECK_FOR_INTERRUPTS();
+                       }
+                       else
+                               tuplesort_heap_replace_top(state, &state->memtuples[i]);
                }
        }
 
@@ -2639,18 +3273,19 @@ sort_bounded_heap(Tuplesortstate *state)
        Assert(state->status == TSS_BOUNDED);
        Assert(state->bounded);
        Assert(tupcount == state->bound);
+       Assert(SERIAL(state));
 
        /*
-        * We can unheapify in place because each sift-up will remove the largest
-        * entry, which we can promptly store in the newly freed slot at the end.
-        * Once we're down to a single-entry heap, we're done.
+        * We can unheapify in place because each delete-top call will remove the
+        * largest entry, which we can promptly store in the newly freed slot at
+        * the end.  Once we're down to a single-entry heap, we're done.
         */
        while (state->memtupcount > 1)
        {
                SortTuple       stup = state->memtuples[0];
 
                /* this sifts-up the next-largest entry and decreases memtupcount */
-               tuplesort_heap_siftup(state, false);
+               tuplesort_heap_delete_top(state);
                state->memtuples[state->memtupcount] = stup;
        }
        state->memtupcount = tupcount;
@@ -2659,37 +3294,51 @@ sort_bounded_heap(Tuplesortstate *state)
         * Reverse sort direction back to the original state.  This is not
         * actually necessary but seems like a good idea for tidiness.
         */
-       REVERSEDIRECTION(state);
+       reversedirection(state);
 
        state->status = TSS_SORTEDINMEM;
        state->boundUsed = true;
 }
 
+/*
+ * Sort all memtuples using specialized qsort() routines.
+ *
+ * Quicksort is used for small in-memory sorts, and external sort runs.
+ */
+static void
+tuplesort_sort_memtuples(Tuplesortstate *state)
+{
+       Assert(!LEADER(state));
+
+       if (state->memtupcount > 1)
+       {
+               /* Can we use the single-key sort function? */
+               if (state->onlyKey != NULL)
+                       qsort_ssup(state->memtuples, state->memtupcount,
+                                          state->onlyKey);
+               else
+                       qsort_tuple(state->memtuples,
+                                               state->memtupcount,
+                                               state->comparetup,
+                                               state);
+       }
+}
+
 /*
  * Insert a new tuple into an empty or existing heap, maintaining the
- * heap invariant.     Caller is responsible for ensuring there's room.
+ * heap invariant.  Caller is responsible for ensuring there's room.
  *
- * Note: we assume *tuple is a temporary variable that can be scribbled on.
- * For some callers, tuple actually points to a memtuples[] entry above the
+ * Note: For some callers, tuple points to a memtuples[] entry above the
  * end of the heap.  This is safe as long as it's not immediately adjacent
  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
  * is, it might get overwritten before being moved into the heap!
  */
 static void
-tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
-                                         int tupleindex, bool checkIndex)
+tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
 {
        SortTuple  *memtuples;
        int                     j;
 
-       /*
-        * Save the tupleindex --- see notes above about writing on *tuple. It's a
-        * historical artifact that tupleindex is passed as a separate argument
-        * and not in *tuple, but it's notationally convenient so let's leave it
-        * that way.
-        */
-       tuple->tupindex = tupleindex;
-
        memtuples = state->memtuples;
        Assert(state->memtupcount < state->memtupsize);
 
@@ -2704,7 +3353,7 @@ tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
        {
                int                     i = (j - 1) >> 1;
 
-               if (HEAPCOMPARE(tuple, &memtuples[i]) >= 0)
+               if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
                        break;
                memtuples[j] = memtuples[i];
                j = i;
@@ -2713,35 +3362,64 @@ tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
 }
 
 /*
- * The tuple at state->memtuples[0] has been removed from the heap.
- * Decrement memtupcount, and sift up to maintain the heap invariant.
+ * Remove the tuple at state->memtuples[0] from the heap.  Decrement
+ * memtupcount, and sift up to maintain the heap invariant.
+ *
+ * The caller has already free'd the tuple the top node points to,
+ * if necessary.
  */
 static void
-tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
+tuplesort_heap_delete_top(Tuplesortstate *state)
 {
        SortTuple  *memtuples = state->memtuples;
        SortTuple  *tuple;
-       int                     i,
-                               n;
 
        if (--state->memtupcount <= 0)
                return;
 
+       /*
+        * Remove the last tuple in the heap, and re-insert it, by replacing the
+        * current top node with it.
+        */
+       tuple = &memtuples[state->memtupcount];
+       tuplesort_heap_replace_top(state, tuple);
+}
+
+/*
+ * Replace the tuple at state->memtuples[0] with a new tuple.  Sift up to
+ * maintain the heap invariant.
+ *
+ * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
+ * Heapsort, steps H3-H8).
+ */
+static void
+tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
+{
+       SortTuple  *memtuples = state->memtuples;
+       unsigned int i,
+                               n;
+
+       Assert(state->memtupcount >= 1);
+
        CHECK_FOR_INTERRUPTS();
 
+       /*
+        * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
+        * This prevents overflow in the "2 * i + 1" calculation, since at the top
+        * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
+        */
        n = state->memtupcount;
-       tuple = &memtuples[n];          /* tuple that must be reinserted */
        i = 0;                                          /* i is where the "hole" is */
        for (;;)
        {
-               int                     j = 2 * i + 1;
+               unsigned int j = 2 * i + 1;
 
                if (j >= n)
                        break;
                if (j + 1 < n &&
-                       HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0)
+                       COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
                        j++;
-               if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0)
+               if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
                        break;
                memtuples[i] = memtuples[j];
                i = j;
@@ -2749,6 +3427,24 @@ tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
        memtuples[i] = *tuple;
 }
 
+/*
+ * Function to reverse the sort direction from its current state
+ *
+ * It is not safe to call this when performing hash tuplesorts
+ */
+static void
+reversedirection(Tuplesortstate *state)
+{
+       SortSupport sortKey = state->sortKeys;
+       int                     nkey;
+
+       for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
+       {
+               sortKey->ssup_reverse = !sortKey->ssup_reverse;
+               sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
+       }
+}
+
 
 /*
  * Tape interface routines
@@ -2775,71 +3471,33 @@ markrunend(Tuplesortstate *state, int tapenum)
        LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
 }
 
-
 /*
- * Inline-able copy of FunctionCall2Coll() to save some cycles in sorting.
+ * Get memory for tuple from within READTUP() routine.
+ *
+ * We use next free slot from the slab allocator, or palloc() if the tuple
+ * is too large for that.
  */
-static inline Datum
-myFunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
+static void *
+readtup_alloc(Tuplesortstate *state, Size tuplen)
 {
-       FunctionCallInfoData fcinfo;
-       Datum           result;
-
-       InitFunctionCallInfoData(fcinfo, flinfo, 2, collation, NULL, NULL);
-
-       fcinfo.arg[0] = arg1;
-       fcinfo.arg[1] = arg2;
-       fcinfo.argnull[0] = false;
-       fcinfo.argnull[1] = false;
-
-       result = FunctionCallInvoke(&fcinfo);
-
-       /* Check for null result, since caller is clearly not expecting one */
-       if (fcinfo.isnull)
-               elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
+       SlabSlot   *buf;
 
-       return result;
-}
-
-/*
- * Apply a sort function (by now converted to fmgr lookup form)
- * and return a 3-way comparison result.  This takes care of handling
- * reverse-sort and NULLs-ordering properly.  We assume that DESC and
- * NULLS_FIRST options are encoded in sk_flags the same way btree does it.
- */
-static inline int32
-inlineApplySortFunction(FmgrInfo *sortFunction, int sk_flags, Oid collation,
-                                               Datum datum1, bool isNull1,
-                                               Datum datum2, bool isNull2)
-{
-       int32           compare;
+       /*
+        * We pre-allocate enough slots in the slab arena that we should never run
+        * out.
+        */
+       Assert(state->slabFreeHead);
 
-       if (isNull1)
-       {
-               if (isNull2)
-                       compare = 0;            /* NULL "=" NULL */
-               else if (sk_flags & SK_BT_NULLS_FIRST)
-                       compare = -1;           /* NULL "<" NOT_NULL */
-               else
-                       compare = 1;            /* NULL ">" NOT_NULL */
-       }
-       else if (isNull2)
-       {
-               if (sk_flags & SK_BT_NULLS_FIRST)
-                       compare = 1;            /* NOT_NULL ">" NULL */
-               else
-                       compare = -1;           /* NOT_NULL "<" NULL */
-       }
+       if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
+               return MemoryContextAlloc(state->sortcontext, tuplen);
        else
        {
-               compare = DatumGetInt32(myFunctionCall2Coll(sortFunction, collation,
-                                                                                                       datum1, datum2));
+               buf = state->slabFreeHead;
+               /* Reuse this slot */
+               state->slabFreeHead = buf->nextfree;
 
-               if (sk_flags & SK_BT_DESC)
-                       compare = -compare;
+               return buf;
        }
-
-       return compare;
 }
 
 
@@ -2856,6 +3514,12 @@ comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
        TupleDesc       tupDesc;
        int                     nkey;
        int32           compare;
+       AttrNumber      attno;
+       Datum           datum1,
+                               datum2;
+       bool            isnull1,
+                               isnull2;
+
 
        /* Compare the leading sort key */
        compare = ApplySortComparator(a->datum1, a->isnull1,
@@ -2870,14 +3534,25 @@ comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
        rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
        rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET);
        tupDesc = state->tupDesc;
+
+       if (sortKey->abbrev_converter)
+       {
+               attno = sortKey->ssup_attno;
+
+               datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
+               datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
+
+               compare = ApplySortAbbrevFullComparator(datum1, isnull1,
+                                                                                               datum2, isnull2,
+                                                                                               sortKey);
+               if (compare != 0)
+                       return compare;
+       }
+
        sortKey++;
        for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++)
        {
-               AttrNumber      attno = sortKey->ssup_attno;
-               Datum           datum1,
-                                       datum2;
-               bool            isnull1,
-                                       isnull2;
+               attno = sortKey->ssup_attno;
 
                datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
                datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
@@ -2900,8 +3575,10 @@ copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
         * MinimalTuple using the exported interface for that.
         */
        TupleTableSlot *slot = (TupleTableSlot *) tup;
+       Datum           original;
        MinimalTuple tuple;
        HeapTupleData htup;
+       MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
 
        /* copy the tuple into sort storage */
        tuple = ExecCopySlotMinimalTuple(slot);
@@ -2910,10 +3587,62 @@ copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
        /* set up first-column key value */
        htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
        htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
-       stup->datum1 = heap_getattr(&htup,
-                                                               state->sortKeys[0].ssup_attno,
-                                                               state->tupDesc,
-                                                               &stup->isnull1);
+       original = heap_getattr(&htup,
+                                                       state->sortKeys[0].ssup_attno,
+                                                       state->tupDesc,
+                                                       &stup->isnull1);
+
+       MemoryContextSwitchTo(oldcontext);
+
+       if (!state->sortKeys->abbrev_converter || stup->isnull1)
+       {
+               /*
+                * Store ordinary Datum representation, or NULL value.  If there is a
+                * converter it won't expect NULL values, and cost model is not
+                * required to account for NULL, so in that case we avoid calling
+                * converter and just set datum1 to zeroed representation (to be
+                * consistent, and to support cheap inequality tests for NULL
+                * abbreviated keys).
+                */
+               stup->datum1 = original;
+       }
+       else if (!consider_abort_common(state))
+       {
+               /* Store abbreviated key representation */
+               stup->datum1 = state->sortKeys->abbrev_converter(original,
+                                                                                                                state->sortKeys);
+       }
+       else
+       {
+               /* Abort abbreviation */
+               int                     i;
+
+               stup->datum1 = original;
+
+               /*
+                * Set state to be consistent with never trying abbreviation.
+                *
+                * Alter datum1 representation in already-copied tuples, so as to
+                * ensure a consistent representation (current tuple was just
+                * handled).  It does not matter if some dumped tuples are already
+                * sorted on tape, since serialized tuples lack abbreviated keys
+                * (TSS_BUILDRUNS state prevents control reaching here in any case).
+                */
+               for (i = 0; i < state->memtupcount; i++)
+               {
+                       SortTuple  *mtup = &state->memtuples[i];
+
+                       htup.t_len = ((MinimalTuple) mtup->tuple)->t_len +
+                               MINIMAL_TUPLE_OFFSET;
+                       htup.t_data = (HeapTupleHeader) ((char *) mtup->tuple -
+                                                                                        MINIMAL_TUPLE_OFFSET);
+
+                       mtup->datum1 = heap_getattr(&htup,
+                                                                               state->sortKeys[0].ssup_attno,
+                                                                               state->tupDesc,
+                                                                               &mtup->isnull1);
+               }
+       }
 }
 
 static void
@@ -2936,8 +3665,11 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
                LogicalTapeWrite(state->tapeset, tapenum,
                                                 (void *) &tuplen, sizeof(tuplen));
 
-       FREEMEM(state, GetMemoryChunkSpace(tuple));
-       heap_free_minimal_tuple(tuple);
+       if (!state->slabAllocatorUsed)
+       {
+               FREEMEM(state, GetMemoryChunkSpace(tuple));
+               heap_free_minimal_tuple(tuple);
+       }
 }
 
 static void
@@ -2946,11 +3678,10 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
 {
        unsigned int tupbodylen = len - sizeof(int);
        unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
-       MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
+       MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen);
        char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
        HeapTupleData htup;
 
-       USEMEM(state, GetMemoryChunkSpace(tuple));
        /* read in the tuple proper */
        tuple->t_len = tuplen;
        LogicalTapeReadExact(state->tapeset, tapenum,
@@ -2968,20 +3699,6 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
                                                                &stup->isnull1);
 }
 
-static void
-reversedirection_heap(Tuplesortstate *state)
-{
-       SortSupport sortKey = state->sortKeys;
-       int                     nkey;
-
-       for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
-       {
-               sortKey->ssup_reverse = !sortKey->ssup_reverse;
-               sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
-       }
-}
-
-
 /*
  * Routines specialized for the CLUSTER case (HeapTuple data, with
  * comparisons per a btree index definition)
@@ -2991,24 +3708,45 @@ static int
 comparetup_cluster(const SortTuple *a, const SortTuple *b,
                                   Tuplesortstate *state)
 {
-       ScanKey         scanKey = state->indexScanKey;
+       SortSupport sortKey = state->sortKeys;
        HeapTuple       ltup;
        HeapTuple       rtup;
        TupleDesc       tupDesc;
        int                     nkey;
        int32           compare;
+       Datum           datum1,
+                               datum2;
+       bool            isnull1,
+                               isnull2;
+       AttrNumber      leading = state->indexInfo->ii_IndexAttrNumbers[0];
+
+       /* Be prepared to compare additional sort keys */
+       ltup = (HeapTuple) a->tuple;
+       rtup = (HeapTuple) b->tuple;
+       tupDesc = state->tupDesc;
 
        /* Compare the leading sort key, if it's simple */
-       if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
+       if (leading != 0)
        {
-               compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
-                                                                                 scanKey->sk_collation,
-                                                                                 a->datum1, a->isnull1,
-                                                                                 b->datum1, b->isnull1);
+               compare = ApplySortComparator(a->datum1, a->isnull1,
+                                                                         b->datum1, b->isnull1,
+                                                                         sortKey);
+               if (compare != 0)
+                       return compare;
+
+               if (sortKey->abbrev_converter)
+               {
+                       datum1 = heap_getattr(ltup, leading, tupDesc, &isnull1);
+                       datum2 = heap_getattr(rtup, leading, tupDesc, &isnull2);
+
+                       compare = ApplySortAbbrevFullComparator(datum1, isnull1,
+                                                                                                       datum2, isnull2,
+                                                                                                       sortKey);
+               }
                if (compare != 0 || state->nKeys == 1)
                        return compare;
                /* Compare additional columns the hard way */
-               scanKey++;
+               sortKey++;
                nkey = 1;
        }
        else
@@ -3017,31 +3755,20 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b,
                nkey = 0;
        }
 
-       /* Compare additional sort keys */
-       ltup = (HeapTuple) a->tuple;
-       rtup = (HeapTuple) b->tuple;
-
        if (state->indexInfo->ii_Expressions == NULL)
        {
                /* If not expression index, just compare the proper heap attrs */
-               tupDesc = state->tupDesc;
 
-               for (; nkey < state->nKeys; nkey++, scanKey++)
+               for (; nkey < state->nKeys; nkey++, sortKey++)
                {
-                       AttrNumber      attno = state->indexInfo->ii_KeyAttrNumbers[nkey];
-                       Datum           datum1,
-                                               datum2;
-                       bool            isnull1,
-                                               isnull2;
+                       AttrNumber      attno = state->indexInfo->ii_IndexAttrNumbers[nkey];
 
                        datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
                        datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
 
-                       compare = inlineApplySortFunction(&scanKey->sk_func,
-                                                                                         scanKey->sk_flags,
-                                                                                         scanKey->sk_collation,
-                                                                                         datum1, isnull1,
-                                                                                         datum2, isnull2);
+                       compare = ApplySortComparator(datum1, isnull1,
+                                                                                 datum2, isnull2,
+                                                                                 sortKey);
                        if (compare != 0)
                                return compare;
                }
@@ -3065,23 +3792,21 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b,
 
                ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple;
 
-               ExecStoreTuple(ltup, ecxt_scantuple, InvalidBuffer, false);
+               ExecStoreHeapTuple(ltup, ecxt_scantuple, false);
                FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
                                           l_index_values, l_index_isnull);
 
-               ExecStoreTuple(rtup, ecxt_scantuple, InvalidBuffer, false);
+               ExecStoreHeapTuple(rtup, ecxt_scantuple, false);
                FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
                                           r_index_values, r_index_isnull);
 
-               for (; nkey < state->nKeys; nkey++, scanKey++)
+               for (; nkey < state->nKeys; nkey++, sortKey++)
                {
-                       compare = inlineApplySortFunction(&scanKey->sk_func,
-                                                                                         scanKey->sk_flags,
-                                                                                         scanKey->sk_collation,
-                                                                                         l_index_values[nkey],
-                                                                                         l_index_isnull[nkey],
-                                                                                         r_index_values[nkey],
-                                                                                         r_index_isnull[nkey]);
+                       compare = ApplySortComparator(l_index_values[nkey],
+                                                                                 l_index_isnull[nkey],
+                                                                                 r_index_values[nkey],
+                                                                                 r_index_isnull[nkey],
+                                                                                 sortKey);
                        if (compare != 0)
                                return compare;
                }
@@ -3094,17 +3819,73 @@ static void
 copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
 {
        HeapTuple       tuple = (HeapTuple) tup;
+       Datum           original;
+       MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
 
        /* copy the tuple into sort storage */
        tuple = heap_copytuple(tuple);
        stup->tuple = (void *) tuple;
        USEMEM(state, GetMemoryChunkSpace(tuple));
-       /* set up first-column key value, if it's a simple column */
-       if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
-               stup->datum1 = heap_getattr(tuple,
-                                                                       state->indexInfo->ii_KeyAttrNumbers[0],
-                                                                       state->tupDesc,
-                                                                       &stup->isnull1);
+
+       MemoryContextSwitchTo(oldcontext);
+
+       /*
+        * set up first-column key value, and potentially abbreviate, if it's a
+        * simple column
+        */
+       if (state->indexInfo->ii_IndexAttrNumbers[0] == 0)
+               return;
+
+       original = heap_getattr(tuple,
+                                                       state->indexInfo->ii_IndexAttrNumbers[0],
+                                                       state->tupDesc,
+                                                       &stup->isnull1);
+
+       if (!state->sortKeys->abbrev_converter || stup->isnull1)
+       {
+               /*
+                * Store ordinary Datum representation, or NULL value.  If there is a
+                * converter it won't expect NULL values, and cost model is not
+                * required to account for NULL, so in that case we avoid calling
+                * converter and just set datum1 to zeroed representation (to be
+                * consistent, and to support cheap inequality tests for NULL
+                * abbreviated keys).
+                */
+               stup->datum1 = original;
+       }
+       else if (!consider_abort_common(state))
+       {
+               /* Store abbreviated key representation */
+               stup->datum1 = state->sortKeys->abbrev_converter(original,
+                                                                                                                state->sortKeys);
+       }
+       else
+       {
+               /* Abort abbreviation */
+               int                     i;
+
+               stup->datum1 = original;
+
+               /*
+                * Set state to be consistent with never trying abbreviation.
+                *
+                * Alter datum1 representation in already-copied tuples, so as to
+                * ensure a consistent representation (current tuple was just
+                * handled).  It does not matter if some dumped tuples are already
+                * sorted on tape, since serialized tuples lack abbreviated keys
+                * (TSS_BUILDRUNS state prevents control reaching here in any case).
+                */
+               for (i = 0; i < state->memtupcount; i++)
+               {
+                       SortTuple  *mtup = &state->memtuples[i];
+
+                       tuple = (HeapTuple) mtup->tuple;
+                       mtup->datum1 = heap_getattr(tuple,
+                                                                               state->indexInfo->ii_IndexAttrNumbers[0],
+                                                                               state->tupDesc,
+                                                                               &mtup->isnull1);
+               }
+       }
 }
 
 static void
@@ -3124,8 +3905,11 @@ writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
                LogicalTapeWrite(state->tapeset, tapenum,
                                                 &tuplen, sizeof(tuplen));
 
-       FREEMEM(state, GetMemoryChunkSpace(tuple));
-       heap_freetuple(tuple);
+       if (!state->slabAllocatorUsed)
+       {
+               FREEMEM(state, GetMemoryChunkSpace(tuple));
+               heap_freetuple(tuple);
+       }
 }
 
 static void
@@ -3133,9 +3917,9 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
                                int tapenum, unsigned int tuplen)
 {
        unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
-       HeapTuple       tuple = (HeapTuple) palloc(t_len + HEAPTUPLESIZE);
+       HeapTuple       tuple = (HeapTuple) readtup_alloc(state,
+                                                                                                 t_len + HEAPTUPLESIZE);
 
-       USEMEM(state, GetMemoryChunkSpace(tuple));
        /* Reconstruct the HeapTupleData header */
        tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
        tuple->t_len = t_len;
@@ -3151,14 +3935,13 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
                                                         &tuplen, sizeof(tuplen));
        stup->tuple = (void *) tuple;
        /* set up first-column key value, if it's a simple column */
-       if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
+       if (state->indexInfo->ii_IndexAttrNumbers[0] != 0)
                stup->datum1 = heap_getattr(tuple,
-                                                                       state->indexInfo->ii_KeyAttrNumbers[0],
+                                                                       state->indexInfo->ii_IndexAttrNumbers[0],
                                                                        state->tupDesc,
                                                                        &stup->isnull1);
 }
 
-
 /*
  * Routines specialized for IndexTuple case
  *
@@ -3172,12 +3955,11 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
                                           Tuplesortstate *state)
 {
        /*
-        * This is similar to _bt_tuplecompare(), but we have already done the
-        * index_getattr calls for the first column, and we need to keep track of
-        * whether any null fields are present.  Also see the special treatment
-        * for equal keys at the end.
+        * This is similar to comparetup_heap(), but expects index tuples.  There
+        * is also special handling for enforcing uniqueness, and special
+        * treatment for equal keys at the end.
         */
-       ScanKey         scanKey = state->indexScanKey;
+       SortSupport sortKey = state->sortKeys;
        IndexTuple      tuple1;
        IndexTuple      tuple2;
        int                     keysz;
@@ -3185,39 +3967,50 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
        bool            equal_hasnull = false;
        int                     nkey;
        int32           compare;
+       Datum           datum1,
+                               datum2;
+       bool            isnull1,
+                               isnull2;
+
 
        /* Compare the leading sort key */
-       compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
-                                                                         scanKey->sk_collation,
-                                                                         a->datum1, a->isnull1,
-                                                                         b->datum1, b->isnull1);
+       compare = ApplySortComparator(a->datum1, a->isnull1,
+                                                                 b->datum1, b->isnull1,
+                                                                 sortKey);
        if (compare != 0)
                return compare;
 
-       /* they are equal, so we only need to examine one null flag */
-       if (a->isnull1)
-               equal_hasnull = true;
-
        /* Compare additional sort keys */
        tuple1 = (IndexTuple) a->tuple;
        tuple2 = (IndexTuple) b->tuple;
        keysz = state->nKeys;
        tupDes = RelationGetDescr(state->indexRel);
-       scanKey++;
-       for (nkey = 2; nkey <= keysz; nkey++, scanKey++)
+
+       if (sortKey->abbrev_converter)
        {
-               Datum           datum1,
-                                       datum2;
-               bool            isnull1,
-                                       isnull2;
+               datum1 = index_getattr(tuple1, 1, tupDes, &isnull1);
+               datum2 = index_getattr(tuple2, 1, tupDes, &isnull2);
+
+               compare = ApplySortAbbrevFullComparator(datum1, isnull1,
+                                                                                               datum2, isnull2,
+                                                                                               sortKey);
+               if (compare != 0)
+                       return compare;
+       }
+
+       /* they are equal, so we only need to examine one null flag */
+       if (a->isnull1)
+               equal_hasnull = true;
 
+       sortKey++;
+       for (nkey = 2; nkey <= keysz; nkey++, sortKey++)
+       {
                datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
                datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
 
-               compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
-                                                                                 scanKey->sk_collation,
-                                                                                 datum1, isnull1,
-                                                                                 datum2, isnull2);
+               compare = ApplySortComparator(datum1, isnull1,
+                                                                         datum2, isnull2,
+                                                                         sortKey);
                if (compare != 0)
                        return compare;         /* done when we find unequal attributes */
 
@@ -3239,6 +4032,7 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
        {
                Datum           values[INDEX_MAX_KEYS];
                bool            isnull[INDEX_MAX_KEYS];
+               char       *key_desc;
 
                /*
                 * Some rather brain-dead implementations of qsort (such as the one in
@@ -3249,21 +4043,24 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
                Assert(tuple1 != tuple2);
 
                index_deform_tuple(tuple1, tupDes, values, isnull);
+
+               key_desc = BuildIndexValueDescription(state->indexRel, values, isnull);
+
                ereport(ERROR,
                                (errcode(ERRCODE_UNIQUE_VIOLATION),
                                 errmsg("could not create unique index \"%s\"",
                                                RelationGetRelationName(state->indexRel)),
-                                errdetail("Key %s is duplicated.",
-                                                  BuildIndexValueDescription(state->indexRel,
-                                                                                                         values, isnull)),
+                                key_desc ? errdetail("Key %s is duplicated.", key_desc) :
+                                errdetail("Duplicate keys exist."),
                                 errtableconstraint(state->heapRel,
-                                                                RelationGetRelationName(state->indexRel))));
+                                                                       RelationGetRelationName(state->indexRel))));
        }
 
        /*
-        * If key values are equal, we sort on ItemPointer.  This does not affect
-        * validity of the finished index, but it may be useful to have index
-        * scans in physical order.
+        * If key values are equal, we sort on ItemPointer.  This is required for
+        * btree indexes, since heap TID is treated as an implicit last key
+        * attribute in order to ensure that all keys in the index are physically
+        * unique.
         */
        {
                BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
@@ -3280,6 +4077,9 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
                        return (pos1 < pos2) ? -1 : 1;
        }
 
+       /* ItemPointer values should never be equal */
+       Assert(false);
+
        return 0;
 }
 
@@ -3287,8 +4087,8 @@ static int
 comparetup_index_hash(const SortTuple *a, const SortTuple *b,
                                          Tuplesortstate *state)
 {
-       uint32          hash1;
-       uint32          hash2;
+       Bucket          bucket1;
+       Bucket          bucket2;
        IndexTuple      tuple1;
        IndexTuple      tuple2;
 
@@ -3297,13 +4097,16 @@ comparetup_index_hash(const SortTuple *a, const SortTuple *b,
         * that the first column of the index tuple is the hash key.
         */
        Assert(!a->isnull1);
-       hash1 = DatumGetUInt32(a->datum1) & state->hash_mask;
+       bucket1 = _hash_hashkey2bucket(DatumGetUInt32(a->datum1),
+                                                                  state->max_buckets, state->high_mask,
+                                                                  state->low_mask);
        Assert(!b->isnull1);
-       hash2 = DatumGetUInt32(b->datum1) & state->hash_mask;
-
-       if (hash1 > hash2)
+       bucket2 = _hash_hashkey2bucket(DatumGetUInt32(b->datum1),
+                                                                  state->max_buckets, state->high_mask,
+                                                                  state->low_mask);
+       if (bucket1 > bucket2)
                return 1;
-       else if (hash1 < hash2)
+       else if (bucket1 < bucket2)
                return -1;
 
        /*
@@ -3329,6 +4132,9 @@ comparetup_index_hash(const SortTuple *a, const SortTuple *b,
                        return (pos1 < pos2) ? -1 : 1;
        }
 
+       /* ItemPointer values should never be equal */
+       Assert(false);
+
        return 0;
 }
 
@@ -3338,17 +4144,64 @@ copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
        IndexTuple      tuple = (IndexTuple) tup;
        unsigned int tuplen = IndexTupleSize(tuple);
        IndexTuple      newtuple;
+       Datum           original;
 
        /* copy the tuple into sort storage */
-       newtuple = (IndexTuple) palloc(tuplen);
+       newtuple = (IndexTuple) MemoryContextAlloc(state->tuplecontext, tuplen);
        memcpy(newtuple, tuple, tuplen);
        USEMEM(state, GetMemoryChunkSpace(newtuple));
        stup->tuple = (void *) newtuple;
        /* set up first-column key value */
-       stup->datum1 = index_getattr(newtuple,
-                                                                1,
-                                                                RelationGetDescr(state->indexRel),
-                                                                &stup->isnull1);
+       original = index_getattr(newtuple,
+                                                        1,
+                                                        RelationGetDescr(state->indexRel),
+                                                        &stup->isnull1);
+
+       if (!state->sortKeys->abbrev_converter || stup->isnull1)
+       {
+               /*
+                * Store ordinary Datum representation, or NULL value.  If there is a
+                * converter it won't expect NULL values, and cost model is not
+                * required to account for NULL, so in that case we avoid calling
+                * converter and just set datum1 to zeroed representation (to be
+                * consistent, and to support cheap inequality tests for NULL
+                * abbreviated keys).
+                */
+               stup->datum1 = original;
+       }
+       else if (!consider_abort_common(state))
+       {
+               /* Store abbreviated key representation */
+               stup->datum1 = state->sortKeys->abbrev_converter(original,
+                                                                                                                state->sortKeys);
+       }
+       else
+       {
+               /* Abort abbreviation */
+               int                     i;
+
+               stup->datum1 = original;
+
+               /*
+                * Set state to be consistent with never trying abbreviation.
+                *
+                * Alter datum1 representation in already-copied tuples, so as to
+                * ensure a consistent representation (current tuple was just
+                * handled).  It does not matter if some dumped tuples are already
+                * sorted on tape, since serialized tuples lack abbreviated keys
+                * (TSS_BUILDRUNS state prevents control reaching here in any case).
+                */
+               for (i = 0; i < state->memtupcount; i++)
+               {
+                       SortTuple  *mtup = &state->memtuples[i];
+
+                       tuple = (IndexTuple) mtup->tuple;
+                       mtup->datum1 = index_getattr(tuple,
+                                                                                1,
+                                                                                RelationGetDescr(state->indexRel),
+                                                                                &mtup->isnull1);
+               }
+       }
 }
 
 static void
@@ -3366,8 +4219,11 @@ writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
                LogicalTapeWrite(state->tapeset, tapenum,
                                                 (void *) &tuplen, sizeof(tuplen));
 
-       FREEMEM(state, GetMemoryChunkSpace(tuple));
-       pfree(tuple);
+       if (!state->slabAllocatorUsed)
+       {
+               FREEMEM(state, GetMemoryChunkSpace(tuple));
+               pfree(tuple);
+       }
 }
 
 static void
@@ -3375,9 +4231,8 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
                          int tapenum, unsigned int len)
 {
        unsigned int tuplen = len - sizeof(unsigned int);
-       IndexTuple      tuple = (IndexTuple) palloc(tuplen);
+       IndexTuple      tuple = (IndexTuple) readtup_alloc(state, tuplen);
 
-       USEMEM(state, GetMemoryChunkSpace(tuple));
        LogicalTapeReadExact(state->tapeset, tapenum,
                                                 tuple, tuplen);
        if (state->randomAccess)        /* need trailing length word? */
@@ -3391,26 +4246,6 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
                                                                 &stup->isnull1);
 }
 
-static void
-reversedirection_index_btree(Tuplesortstate *state)
-{
-       ScanKey         scanKey = state->indexScanKey;
-       int                     nkey;
-
-       for (nkey = 0; nkey < state->nKeys; nkey++, scanKey++)
-       {
-               scanKey->sk_flags ^= (SK_BT_DESC | SK_BT_NULLS_FIRST);
-       }
-}
-
-static void
-reversedirection_index_hash(Tuplesortstate *state)
-{
-       /* We don't support reversing direction in a hash index sort */
-       elog(ERROR, "reversedirection_index_hash is not implemented");
-}
-
-
 /*
  * Routines specialized for DatumTuple case
  */
@@ -3418,9 +4253,22 @@ reversedirection_index_hash(Tuplesortstate *state)
 static int
 comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
 {
-       return ApplySortComparator(a->datum1, a->isnull1,
-                                                          b->datum1, b->isnull1,
-                                                          state->onlyKey);
+       int                     compare;
+
+       compare = ApplySortComparator(a->datum1, a->isnull1,
+                                                                 b->datum1, b->isnull1,
+                                                                 state->sortKeys);
+       if (compare != 0)
+               return compare;
+
+       /* if we have abbreviations, then "tuple" has the original value */
+
+       if (state->sortKeys->abbrev_converter)
+               compare = ApplySortAbbrevFullComparator(PointerGetDatum(a->tuple), a->isnull1,
+                                                                                               PointerGetDatum(b->tuple), b->isnull1,
+                                                                                               state->sortKeys);
+
+       return compare;
 }
 
 static void
@@ -3442,15 +4290,15 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
                waddr = NULL;
                tuplen = 0;
        }
-       else if (state->datumTypeByVal)
+       else if (!state->tuples)
        {
                waddr = &stup->datum1;
                tuplen = sizeof(Datum);
        }
        else
        {
-               waddr = DatumGetPointer(stup->datum1);
-               tuplen = datumGetSize(stup->datum1, false, state->datumTypeLen);
+               waddr = stup->tuple;
+               tuplen = datumGetSize(PointerGetDatum(stup->tuple), false, state->datumTypeLen);
                Assert(tuplen != 0);
        }
 
@@ -3464,7 +4312,7 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
                LogicalTapeWrite(state->tapeset, tapenum,
                                                 (void *) &writtenlen, sizeof(writtenlen));
 
-       if (stup->tuple)
+       if (!state->slabAllocatorUsed && stup->tuple)
        {
                FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
                pfree(stup->tuple);
@@ -3484,7 +4332,7 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
                stup->isnull1 = true;
                stup->tuple = NULL;
        }
-       else if (state->datumTypeByVal)
+       else if (!state->tuples)
        {
                Assert(tuplen == sizeof(Datum));
                LogicalTapeReadExact(state->tapeset, tapenum,
@@ -3494,14 +4342,13 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
        }
        else
        {
-               void       *raddr = palloc(tuplen);
+               void       *raddr = readtup_alloc(state, tuplen);
 
                LogicalTapeReadExact(state->tapeset, tapenum,
                                                         raddr, tuplen);
                stup->datum1 = PointerGetDatum(raddr);
                stup->isnull1 = false;
                stup->tuple = raddr;
-               USEMEM(state, GetMemoryChunkSpace(raddr));
        }
 
        if (state->randomAccess)        /* need trailing length word? */
@@ -3509,11 +4356,227 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
                                                         &tuplen, sizeof(tuplen));
 }
 
+/*
+ * Parallel sort routines
+ */
+
+/*
+ * tuplesort_estimate_shared - estimate required shared memory allocation
+ *
+ * nWorkers is an estimate of the number of workers (it's the number that
+ * will be requested).
+ */
+Size
+tuplesort_estimate_shared(int nWorkers)
+{
+       Size            tapesSize;
+
+       Assert(nWorkers > 0);
+
+       /* Make sure that BufFile shared state is MAXALIGN'd */
+       tapesSize = mul_size(sizeof(TapeShare), nWorkers);
+       tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
+
+       return tapesSize;
+}
+
+/*
+ * tuplesort_initialize_shared - initialize shared tuplesort state
+ *
+ * Must be called from leader process before workers are launched, to
+ * establish state needed up-front for worker tuplesortstates.  nWorkers
+ * should match the argument passed to tuplesort_estimate_shared().
+ */
+void
+tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
+{
+       int                     i;
+
+       Assert(nWorkers > 0);
+
+       SpinLockInit(&shared->mutex);
+       shared->currentWorker = 0;
+       shared->workersFinished = 0;
+       SharedFileSetInit(&shared->fileset, seg);
+       shared->nTapes = nWorkers;
+       for (i = 0; i < nWorkers; i++)
+       {
+               shared->tapes[i].firstblocknumber = 0L;
+       }
+}
+
+/*
+ * tuplesort_attach_shared - attach to shared tuplesort state
+ *
+ * Must be called by all worker processes.
+ */
+void
+tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
+{
+       /* Attach to SharedFileSet */
+       SharedFileSetAttach(&shared->fileset, seg);
+}
+
+/*
+ * worker_get_identifier - Assign and return ordinal identifier for worker
+ *
+ * The order in which these are assigned is not well defined, and should not
+ * matter; worker numbers across parallel sort participants need only be
+ * distinct and gapless.  logtape.c requires this.
+ *
+ * Note that the identifiers assigned from here have no relation to
+ * ParallelWorkerNumber number, to avoid making any assumption about
+ * caller's requirements.  However, we do follow the ParallelWorkerNumber
+ * convention of representing a non-worker with worker number -1.  This
+ * includes the leader, as well as serial Tuplesort processes.
+ */
+static int
+worker_get_identifier(Tuplesortstate *state)
+{
+       Sharedsort *shared = state->shared;
+       int                     worker;
+
+       Assert(WORKER(state));
+
+       SpinLockAcquire(&shared->mutex);
+       worker = shared->currentWorker++;
+       SpinLockRelease(&shared->mutex);
+
+       return worker;
+}
+
+/*
+ * worker_freeze_result_tape - freeze worker's result tape for leader
+ *
+ * This is called by workers just after the result tape has been determined,
+ * instead of calling LogicalTapeFreeze() directly.  They do so because
+ * workers require a few additional steps over similar serial
+ * TSS_SORTEDONTAPE external sort cases, which also happen here.  The extra
+ * steps are around freeing now unneeded resources, and representing to
+ * leader that worker's input run is available for its merge.
+ *
+ * There should only be one final output run for each worker, which consists
+ * of all tuples that were originally input into worker.
+ */
 static void
-reversedirection_datum(Tuplesortstate *state)
+worker_freeze_result_tape(Tuplesortstate *state)
 {
-       state->onlyKey->ssup_reverse = !state->onlyKey->ssup_reverse;
-       state->onlyKey->ssup_nulls_first = !state->onlyKey->ssup_nulls_first;
+       Sharedsort *shared = state->shared;
+       TapeShare       output;
+
+       Assert(WORKER(state));
+       Assert(state->result_tape != -1);
+       Assert(state->memtupcount == 0);
+
+       /*
+        * Free most remaining memory, in case caller is sensitive to our holding
+        * on to it.  memtuples may not be a tiny merge heap at this point.
+        */
+       pfree(state->memtuples);
+       /* Be tidy */
+       state->memtuples = NULL;
+       state->memtupsize = 0;
+
+       /*
+        * Parallel worker requires result tape metadata, which is to be stored in
+        * shared memory for leader
+        */
+       LogicalTapeFreeze(state->tapeset, state->result_tape, &output);
+
+       /* Store properties of output tape, and update finished worker count */
+       SpinLockAcquire(&shared->mutex);
+       shared->tapes[state->worker] = output;
+       shared->workersFinished++;
+       SpinLockRelease(&shared->mutex);
+}
+
+/*
+ * worker_nomergeruns - dump memtuples in worker, without merging
+ *
+ * This called as an alternative to mergeruns() with a worker when no
+ * merging is required.
+ */
+static void
+worker_nomergeruns(Tuplesortstate *state)
+{
+       Assert(WORKER(state));
+       Assert(state->result_tape == -1);
+
+       state->result_tape = state->tp_tapenum[state->destTape];
+       worker_freeze_result_tape(state);
+}
+
+/*
+ * leader_takeover_tapes - create tapeset for leader from worker tapes
+ *
+ * So far, leader Tuplesortstate has performed no actual sorting.  By now, all
+ * sorting has occurred in workers, all of which must have already returned
+ * from tuplesort_performsort().
+ *
+ * When this returns, leader process is left in a state that is virtually
+ * indistinguishable from it having generated runs as a serial external sort
+ * might have.
+ */
+static void
+leader_takeover_tapes(Tuplesortstate *state)
+{
+       Sharedsort *shared = state->shared;
+       int                     nParticipants = state->nParticipants;
+       int                     workersFinished;
+       int                     j;
+
+       Assert(LEADER(state));
+       Assert(nParticipants >= 1);
+
+       SpinLockAcquire(&shared->mutex);
+       workersFinished = shared->workersFinished;
+       SpinLockRelease(&shared->mutex);
+
+       if (nParticipants != workersFinished)
+               elog(ERROR, "cannot take over tapes before all workers finish");
+
+       /*
+        * Create the tapeset from worker tapes, including a leader-owned tape at
+        * the end.  Parallel workers are far more expensive than logical tapes,
+        * so the number of tapes allocated here should never be excessive.
+        *
+        * We still have a leader tape, though it's not possible to write to it
+        * due to restrictions in the shared fileset infrastructure used by
+        * logtape.c.  It will never be written to in practice because
+        * randomAccess is disallowed for parallel sorts.
+        */
+       inittapestate(state, nParticipants + 1);
+       state->tapeset = LogicalTapeSetCreate(nParticipants + 1, shared->tapes,
+                                                                                 &shared->fileset, state->worker);
+
+       /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
+       state->currentRun = nParticipants;
+
+       /*
+        * Initialize variables of Algorithm D to be consistent with runs from
+        * workers having been generated in the leader.
+        *
+        * There will always be exactly 1 run per worker, and exactly one input
+        * tape per run, because workers always output exactly 1 run, even when
+        * there were no input tuples for workers to sort.
+        */
+       for (j = 0; j < state->maxTapes; j++)
+       {
+               /* One real run; no dummy runs for worker tapes */
+               state->tp_fib[j] = 1;
+               state->tp_runs[j] = 1;
+               state->tp_dummy[j] = 0;
+               state->tp_tapenum[j] = j;
+       }
+       /* Leader tape gets one dummy run, and no real runs */
+       state->tp_fib[state->tapeRange] = 0;
+       state->tp_runs[state->tapeRange] = 0;
+       state->tp_dummy[state->tapeRange] = 1;
+
+       state->Level = 1;
+       state->destTape = 0;
+
+       state->status = TSS_BUILDRUNS;
 }
 
 /*