]> granicus.if.org Git - postgresql/blobdiff - src/backend/utils/sort/tuplesort.c
Make heap TID a tiebreaker nbtree index column.
[postgresql] / src / backend / utils / sort / tuplesort.c
index cbaf009cdfc0fb086c021a986e13b271824bd6c9..3eebd9ef51cf4ead7da7490c2af4a9149f1e76f4 100644 (file)
  * See Knuth, volume 3, for more than you want to know about the external
  * sorting algorithm.  Historically, we divided the input into sorted runs
  * using replacement selection, in the form of a priority tree implemented
- * as a heap (essentially his Algorithm 5.2.3H), but now we only do that
- * for the first run, and only if the run would otherwise end up being very
- * short.  We merge the runs using polyphase merge, Knuth's Algorithm
- * 5.4.2D.  The logical "tapes" used by Algorithm D are implemented by
- * logtape.c, which avoids space wastage by recycling disk space as soon
- * as each block is read from its "tape".
- *
- * We do not use Knuth's recommended data structure (Algorithm 5.4.1R) for
- * the replacement selection, because it uses a fixed number of records
- * in memory at all times.  Since we are dealing with tuples that may vary
- * considerably in size, we want to be able to vary the number of records
- * kept in memory to ensure full utilization of the allowed sort memory
- * space.  So, we keep the tuples in a variable-size heap, with the next
- * record to go out at the top of the heap.  Like Algorithm 5.4.1R, each
- * record is stored with the run number that it must go into, and we use
- * (run number, key) as the ordering key for the heap.  When the run number
- * at the top of the heap changes, we know that no more records of the prior
- * run are left in the heap.  Note that there are in practice only ever two
- * distinct run numbers, because since PostgreSQL 9.6, we only use
- * replacement selection to form the first run.
- *
- * In PostgreSQL 9.6, a heap (based on Knuth's Algorithm H, with some small
- * customizations) is only used with the aim of producing just one run,
- * thereby avoiding all merging.  Only the first run can use replacement
- * selection, which is why there are now only two possible valid run
- * numbers, and why heapification is customized to not distinguish between
- * tuples in the second run (those will be quicksorted).  We generally
- * prefer a simple hybrid sort-merge strategy, where runs are sorted in much
- * the same way as the entire input of an internal sort is sorted (using
- * qsort()).  The replacement_sort_tuples GUC controls the limited remaining
- * use of replacement selection for the first run.
- *
- * There are several reasons to favor a hybrid sort-merge strategy.
- * Maintaining a priority tree/heap has poor CPU cache characteristics.
- * Furthermore, the growth in main memory sizes has greatly diminished the
- * value of having runs that are larger than available memory, even in the
- * case where there is partially sorted input and runs can be made far
- * larger by using a heap.  In most cases, a single-pass merge step is all
- * that is required even when runs are no larger than available memory.
- * Avoiding multiple merge passes was traditionally considered to be the
- * major advantage of using replacement selection.
+ * as a heap (essentially his Algorithm 5.2.3H), but now we always use
+ * quicksort for run generation.  We merge the runs using polyphase merge,
+ * Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D are
+ * implemented by logtape.c, which avoids space wastage by recycling disk
+ * space as soon as each block is read from its "tape".
  *
  * The approximate amount of memory allowed for any one sort operation
  * is specified in kilobytes by the caller (most pass work_mem).  Initially,
@@ -64,9 +28,8 @@
  * workMem, we begin to emit tuples into sorted runs in temporary tapes.
  * When tuples are dumped in batch after quicksorting, we begin a new run
  * with a new output tape (selected per Algorithm D).  After the end of the
- * input is reached, we dump out remaining tuples in memory into a final run
- * (or two, when replacement selection is still used), then merge the runs
- * using Algorithm D.
+ * input is reached, we dump out remaining tuples in memory into a final run,
+ * then merge the runs using Algorithm D.
  *
  * When merging runs, we use a heap containing just the frontmost tuple from
  * each source run; we repeatedly output the smallest tuple and replace it
  * above.  Nonetheless, with large workMem we can have many tapes (but not
  * too many -- see the comments in tuplesort_merge_order).
  *
+ * This module supports parallel sorting.  Parallel sorts involve coordination
+ * among one or more worker processes, and a leader process, each with its own
+ * tuplesort state.  The leader process (or, more accurately, the
+ * Tuplesortstate associated with a leader process) creates a full tapeset
+ * consisting of worker tapes with one run to merge; a run for every
+ * worker process.  This is then merged.  Worker processes are guaranteed to
+ * produce exactly one output run from their partial input.
  *
- * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
 
 #include <limits.h>
 
+#include "access/hash.h"
 #include "access/htup_details.h"
 #include "access/nbtree.h"
 #include "catalog/index.h"
 #define DATUM_SORT             2
 #define CLUSTER_SORT   3
 
+/* Sort parallel code from state for sort__start probes */
+#define PARALLEL_SORT(state)   ((state)->shared == NULL ? 0 : \
+                                                                (state)->worker >= 0 ? 1 : 2)
+
 /* GUC variables */
 #ifdef TRACE_SORT
 bool           trace_sort = false;
@@ -187,13 +163,8 @@ bool               optimize_bounded_sort = true;
  * described above.  Accordingly, "tuple" is always used in preference to
  * datum1 as the authoritative value for pass-by-reference cases.
  *
- * While building initial runs, tupindex holds the tuple's run number.
- * Historically, the run number could meaningfully distinguish many runs, but
- * it now only distinguishes RUN_FIRST and HEAP_RUN_NEXT, since replacement
- * selection is always abandoned after the first run; no other run number
- * should be represented here.  During merge passes, we re-use it to hold the
- * input tape number that each tuple in the heap was read from.  tupindex goes
- * unused if the sort occurs entirely in memory.
+ * tupindex holds the input tape number that each tuple in the heap was read
+ * from during merge passes.
  */
 typedef struct
 {
@@ -252,17 +223,8 @@ typedef enum
 #define TAPE_BUFFER_OVERHEAD           BLCKSZ
 #define MERGE_BUFFER_SIZE                      (BLCKSZ * 32)
 
- /*
-  * Run numbers, used during external sort operations.
-  *
-  * HEAP_RUN_NEXT is only used for SortTuple.tupindex, never state.currentRun.
-  */
-#define RUN_FIRST              0
-#define HEAP_RUN_NEXT  INT_MAX
-#define RUN_SECOND             1
-
 typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
-                                                                                               Tuplesortstate *state);
+                                                                       Tuplesortstate *state);
 
 /*
  * Private state of a Tuplesort operation.
@@ -313,7 +275,7 @@ struct Tuplesortstate
         * memory space thereby released.
         */
        void            (*writetup) (Tuplesortstate *state, int tapenum,
-                                                                                SortTuple *stup);
+                                                        SortTuple *stup);
 
        /*
         * Function to read a stored tuple from tape back into memory. 'len' is
@@ -321,7 +283,7 @@ struct Tuplesortstate
         * from the slab memory arena, or is palloc'd, see readtup_alloc().
         */
        void            (*readtup) (Tuplesortstate *state, SortTuple *stup,
-                                                                               int tapenum, unsigned int len);
+                                                       int tapenum, unsigned int len);
 
        /*
         * This array holds the tuples now in sort memory.  If we are in state
@@ -380,16 +342,8 @@ struct Tuplesortstate
        void       *lastReturnedTuple;
 
        /*
-        * While building initial runs, this indicates if the replacement
-        * selection strategy is in use.  When it isn't, then a simple hybrid
-        * sort-merge strategy is in use instead (runs are quicksorted).
-        */
-       bool            replaceActive;
-
-       /*
-        * While building initial runs, this is the current output run number
-        * (starting at RUN_FIRST).  Afterwards, it is the number of initial runs
-        * we made.
+        * While building initial runs, this is the current output run number.
+        * Afterwards, it is the number of initial runs we made.
         */
        int                     currentRun;
 
@@ -432,6 +386,25 @@ struct Tuplesortstate
        int                     markpos_offset; /* saved "current", or offset in tape block */
        bool            markpos_eof;    /* saved "eof_reached" */
 
+       /*
+        * These variables are used during parallel sorting.
+        *
+        * worker is our worker identifier.  Follows the general convention that
+        * -1 value relates to a leader tuplesort, and values >= 0 worker
+        * tuplesorts. (-1 can also be a serial tuplesort.)
+        *
+        * shared is mutable shared memory state, which is used to coordinate
+        * parallel sorts.
+        *
+        * nParticipants is the number of worker Tuplesortstates known by the
+        * leader to have actually been launched, which implies that they must
+        * finish a run leader can merge.  Typically includes a worker state held
+        * by the leader process itself.  Set in the leader Tuplesortstate only.
+        */
+       int                     worker;
+       Sharedsort *shared;
+       int                     nParticipants;
+
        /*
         * The sortKeys variable is used by every case other than the hash index
         * case; it is set by tuplesort_begin_xxx.  tupDesc is only used by the
@@ -473,7 +446,9 @@ struct Tuplesortstate
        bool            enforceUnique;  /* complain if we find duplicate tuples */
 
        /* These are specific to the index_hash subcase: */
-       uint32          hash_mask;              /* mask for sortable part of hash code */
+       uint32          high_mask;              /* masks for sortable part of hash code */
+       uint32          low_mask;
+       uint32          max_buckets;
 
        /*
         * These variables are specific to the Datum case; they are set by
@@ -491,6 +466,39 @@ struct Tuplesortstate
 #endif
 };
 
+/*
+ * Private mutable state of tuplesort-parallel-operation.  This is allocated
+ * in shared memory.
+ */
+struct Sharedsort
+{
+       /* mutex protects all fields prior to tapes */
+       slock_t         mutex;
+
+       /*
+        * currentWorker generates ordinal identifier numbers for parallel sort
+        * workers.  These start from 0, and are always gapless.
+        *
+        * Workers increment workersFinished to indicate having finished.  If this
+        * is equal to state.nParticipants within the leader, leader is ready to
+        * merge worker runs.
+        */
+       int                     currentWorker;
+       int                     workersFinished;
+
+       /* Temporary file space */
+       SharedFileSet fileset;
+
+       /* Size of tapes flexible array */
+       int                     nTapes;
+
+       /*
+        * Tapes array used by workers to report back information needed by the
+        * leader to concatenate all worker tapes into one for merging
+        */
+       TapeShare       tapes[FLEXIBLE_ARRAY_MEMBER];
+};
+
 /*
  * Is the given tuple allocated from the slab memory arena?
  */
@@ -521,6 +529,9 @@ struct Tuplesortstate
 #define LACKMEM(state)         ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
 #define USEMEM(state,amt)      ((state)->availMem -= (amt))
 #define FREEMEM(state,amt)     ((state)->availMem += (amt))
+#define SERIAL(state)          ((state)->shared == NULL)
+#define WORKER(state)          ((state)->shared && (state)->worker != -1)
+#define LEADER(state)          ((state)->shared && (state)->worker == -1)
 
 /*
  * NOTES about on-tape representation of tuples:
@@ -577,11 +588,13 @@ struct Tuplesortstate
        } while(0)
 
 
-static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess);
+static Tuplesortstate *tuplesort_begin_common(int workMem,
+                                          SortCoordinate coordinate,
+                                          bool randomAccess);
 static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
 static bool consider_abort_common(Tuplesortstate *state);
-static bool useselection(Tuplesortstate *state);
-static void inittapes(Tuplesortstate *state);
+static void inittapes(Tuplesortstate *state, bool mergeruns);
+static void inittapestate(Tuplesortstate *state, int maxTapes);
 static void selectnewtape(Tuplesortstate *state);
 static void init_slab_allocator(Tuplesortstate *state, int numSlots);
 static void mergeruns(Tuplesortstate *state);
@@ -589,15 +602,12 @@ static void mergeonerun(Tuplesortstate *state);
 static void beginmerge(Tuplesortstate *state);
 static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
 static void dumptuples(Tuplesortstate *state, bool alltuples);
-static void dumpbatch(Tuplesortstate *state, bool alltuples);
 static void make_bounded_heap(Tuplesortstate *state);
 static void sort_bounded_heap(Tuplesortstate *state);
 static void tuplesort_sort_memtuples(Tuplesortstate *state);
-static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
-                                         bool checkIndex);
-static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple,
-                                                  bool checkIndex);
-static void tuplesort_heap_delete_top(Tuplesortstate *state, bool checkIndex);
+static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
+static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
+static void tuplesort_heap_delete_top(Tuplesortstate *state);
 static void reversedirection(Tuplesortstate *state);
 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
 static void markrunend(Tuplesortstate *state, int tapenum);
@@ -632,6 +642,10 @@ static void writetup_datum(Tuplesortstate *state, int tapenum,
                           SortTuple *stup);
 static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
                          int tapenum, unsigned int len);
+static int     worker_get_identifier(Tuplesortstate *state);
+static void worker_freeze_result_tape(Tuplesortstate *state);
+static void worker_nomergeruns(Tuplesortstate *state);
+static void leader_takeover_tapes(Tuplesortstate *state);
 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
 
 /*
@@ -664,13 +678,18 @@ static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
  */
 
 static Tuplesortstate *
-tuplesort_begin_common(int workMem, bool randomAccess)
+tuplesort_begin_common(int workMem, SortCoordinate coordinate,
+                                          bool randomAccess)
 {
        Tuplesortstate *state;
        MemoryContext sortcontext;
        MemoryContext tuplecontext;
        MemoryContext oldcontext;
 
+       /* See leader_takeover_tapes() remarks on randomAccess support */
+       if (coordinate && randomAccess)
+               elog(ERROR, "random access disallowed under parallel sort");
+
        /*
         * Create a working memory context for this sort operation. All data
         * needed by the sort will live inside this context.
@@ -710,7 +729,14 @@ tuplesort_begin_common(int workMem, bool randomAccess)
        state->bounded = false;
        state->tuples = true;
        state->boundUsed = false;
-       state->allowedMem = workMem * (int64) 1024;
+
+       /*
+        * workMem is forced to be at least 64KB, the current minimum valid value
+        * for the work_mem GUC.  This is a defense against parallel sort callers
+        * that divide out memory among many workers in a way that leaves each
+        * with very little memory.
+        */
+       state->allowedMem = Max(workMem, 64) * (int64) 1024;
        state->availMem = state->allowedMem;
        state->sortcontext = sortcontext;
        state->tuplecontext = tuplecontext;
@@ -723,7 +749,7 @@ tuplesort_begin_common(int workMem, bool randomAccess)
         * see comments in grow_memtuples().
         */
        state->memtupsize = Max(1024,
-                                               ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1);
+                                                       ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1);
 
        state->growmemtuples = true;
        state->slabAllocatorUsed = false;
@@ -735,7 +761,7 @@ tuplesort_begin_common(int workMem, bool randomAccess)
        if (LACKMEM(state))
                elog(ERROR, "insufficient memory allowed for sort");
 
-       state->currentRun = RUN_FIRST;
+       state->currentRun = 0;
 
        /*
         * maxTapes, tapeRange, and Algorithm D variables will be initialized by
@@ -744,6 +770,33 @@ tuplesort_begin_common(int workMem, bool randomAccess)
 
        state->result_tape = -1;        /* flag that result tape has not been formed */
 
+       /*
+        * Initialize parallel-related state based on coordination information
+        * from caller
+        */
+       if (!coordinate)
+       {
+               /* Serial sort */
+               state->shared = NULL;
+               state->worker = -1;
+               state->nParticipants = -1;
+       }
+       else if (coordinate->isWorker)
+       {
+               /* Parallel worker produces exactly one final run from all input */
+               state->shared = coordinate->sharedsort;
+               state->worker = worker_get_identifier(state);
+               state->nParticipants = -1;
+       }
+       else
+       {
+               /* Parallel leader state only used for final merge */
+               state->shared = coordinate->sharedsort;
+               state->worker = -1;
+               state->nParticipants = coordinate->nParticipants;
+               Assert(state->nParticipants >= 1);
+       }
+
        MemoryContextSwitchTo(oldcontext);
 
        return state;
@@ -754,9 +807,10 @@ tuplesort_begin_heap(TupleDesc tupDesc,
                                         int nkeys, AttrNumber *attNums,
                                         Oid *sortOperators, Oid *sortCollations,
                                         bool *nullsFirstFlags,
-                                        int workMem, bool randomAccess)
+                                        int workMem, SortCoordinate coordinate, bool randomAccess)
 {
-       Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+       Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+                                                                                                  randomAccess);
        MemoryContext oldcontext;
        int                     i;
 
@@ -777,7 +831,8 @@ tuplesort_begin_heap(TupleDesc tupDesc,
                                                                false,  /* no unique check */
                                                                nkeys,
                                                                workMem,
-                                                               randomAccess);
+                                                               randomAccess,
+                                                               PARALLEL_SORT(state));
 
        state->comparetup = comparetup_heap;
        state->copytup = copytup_heap;
@@ -824,10 +879,12 @@ tuplesort_begin_heap(TupleDesc tupDesc,
 Tuplesortstate *
 tuplesort_begin_cluster(TupleDesc tupDesc,
                                                Relation indexRel,
-                                               int workMem, bool randomAccess)
+                                               int workMem,
+                                               SortCoordinate coordinate, bool randomAccess)
 {
-       Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
-       ScanKey         indexScanKey;
+       Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+                                                                                                  randomAccess);
+       BTScanInsert indexScanKey;
        MemoryContext oldcontext;
        int                     i;
 
@@ -843,13 +900,14 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
                         workMem, randomAccess ? 't' : 'f');
 #endif
 
-       state->nKeys = RelationGetNumberOfAttributes(indexRel);
+       state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
 
        TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT,
                                                                false,  /* no unique check */
                                                                state->nKeys,
                                                                workMem,
-                                                               randomAccess);
+                                                               randomAccess,
+                                                               PARALLEL_SORT(state));
 
        state->comparetup = comparetup_cluster;
        state->copytup = copytup_cluster;
@@ -861,7 +919,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
 
        state->tupDesc = tupDesc;       /* assume we need not copy tupDesc */
 
-       indexScanKey = _bt_mkscankey_nodata(indexRel);
+       indexScanKey = _bt_mkscankey(indexRel, NULL);
 
        if (state->indexInfo->ii_Expressions != NULL)
        {
@@ -875,7 +933,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
                 * scantuple has to point to that slot, too.
                 */
                state->estate = CreateExecutorState();
-               slot = MakeSingleTupleTableSlot(tupDesc);
+               slot = MakeSingleTupleTableSlot(tupDesc, &TTSOpsVirtual);
                econtext = GetPerTupleExprContext(state->estate);
                econtext->ecxt_scantuple = slot;
        }
@@ -887,7 +945,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
        for (i = 0; i < state->nKeys; i++)
        {
                SortSupport sortKey = state->sortKeys + i;
-               ScanKey         scanKey = indexScanKey + i;
+               ScanKey         scanKey = indexScanKey->scankeys + i;
                int16           strategy;
 
                sortKey->ssup_cxt = CurrentMemoryContext;
@@ -906,7 +964,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
                PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
        }
 
-       _bt_freeskey(indexScanKey);
+       pfree(indexScanKey);
 
        MemoryContextSwitchTo(oldcontext);
 
@@ -917,10 +975,13 @@ Tuplesortstate *
 tuplesort_begin_index_btree(Relation heapRel,
                                                        Relation indexRel,
                                                        bool enforceUnique,
-                                                       int workMem, bool randomAccess)
+                                                       int workMem,
+                                                       SortCoordinate coordinate,
+                                                       bool randomAccess)
 {
-       Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
-       ScanKey         indexScanKey;
+       Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+                                                                                                  randomAccess);
+       BTScanInsert indexScanKey;
        MemoryContext oldcontext;
        int                     i;
 
@@ -934,13 +995,14 @@ tuplesort_begin_index_btree(Relation heapRel,
                         workMem, randomAccess ? 't' : 'f');
 #endif
 
-       state->nKeys = RelationGetNumberOfAttributes(indexRel);
+       state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
 
        TRACE_POSTGRESQL_SORT_START(INDEX_SORT,
                                                                enforceUnique,
                                                                state->nKeys,
                                                                workMem,
-                                                               randomAccess);
+                                                               randomAccess,
+                                                               PARALLEL_SORT(state));
 
        state->comparetup = comparetup_index_btree;
        state->copytup = copytup_index;
@@ -952,8 +1014,7 @@ tuplesort_begin_index_btree(Relation heapRel,
        state->indexRel = indexRel;
        state->enforceUnique = enforceUnique;
 
-       indexScanKey = _bt_mkscankey_nodata(indexRel);
-       state->nKeys = RelationGetNumberOfAttributes(indexRel);
+       indexScanKey = _bt_mkscankey(indexRel, NULL);
 
        /* Prepare SortSupport data for each column */
        state->sortKeys = (SortSupport) palloc0(state->nKeys *
@@ -962,7 +1023,7 @@ tuplesort_begin_index_btree(Relation heapRel,
        for (i = 0; i < state->nKeys; i++)
        {
                SortSupport sortKey = state->sortKeys + i;
-               ScanKey         scanKey = indexScanKey + i;
+               ScanKey         scanKey = indexScanKey->scankeys + i;
                int16           strategy;
 
                sortKey->ssup_cxt = CurrentMemoryContext;
@@ -981,7 +1042,7 @@ tuplesort_begin_index_btree(Relation heapRel,
                PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
        }
 
-       _bt_freeskey(indexScanKey);
+       pfree(indexScanKey);
 
        MemoryContextSwitchTo(oldcontext);
 
@@ -991,10 +1052,15 @@ tuplesort_begin_index_btree(Relation heapRel,
 Tuplesortstate *
 tuplesort_begin_index_hash(Relation heapRel,
                                                   Relation indexRel,
-                                                  uint32 hash_mask,
-                                                  int workMem, bool randomAccess)
+                                                  uint32 high_mask,
+                                                  uint32 low_mask,
+                                                  uint32 max_buckets,
+                                                  int workMem,
+                                                  SortCoordinate coordinate,
+                                                  bool randomAccess)
 {
-       Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+       Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+                                                                                                  randomAccess);
        MemoryContext oldcontext;
 
        oldcontext = MemoryContextSwitchTo(state->sortcontext);
@@ -1002,8 +1068,11 @@ tuplesort_begin_index_hash(Relation heapRel,
 #ifdef TRACE_SORT
        if (trace_sort)
                elog(LOG,
-               "begin index sort: hash_mask = 0x%x, workMem = %d, randomAccess = %c",
-                        hash_mask,
+                        "begin index sort: high_mask = 0x%x, low_mask = 0x%x, "
+                        "max_buckets = 0x%x, workMem = %d, randomAccess = %c",
+                        high_mask,
+                        low_mask,
+                        max_buckets,
                         workMem, randomAccess ? 't' : 'f');
 #endif
 
@@ -1017,7 +1086,9 @@ tuplesort_begin_index_hash(Relation heapRel,
        state->heapRel = heapRel;
        state->indexRel = indexRel;
 
-       state->hash_mask = hash_mask;
+       state->high_mask = high_mask;
+       state->low_mask = low_mask;
+       state->max_buckets = max_buckets;
 
        MemoryContextSwitchTo(oldcontext);
 
@@ -1026,10 +1097,11 @@ tuplesort_begin_index_hash(Relation heapRel,
 
 Tuplesortstate *
 tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
-                                         bool nullsFirstFlag,
-                                         int workMem, bool randomAccess)
+                                         bool nullsFirstFlag, int workMem,
+                                         SortCoordinate coordinate, bool randomAccess)
 {
-       Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+       Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+                                                                                                  randomAccess);
        MemoryContext oldcontext;
        int16           typlen;
        bool            typbyval;
@@ -1049,7 +1121,8 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
                                                                false,  /* no unique check */
                                                                1,
                                                                workMem,
-                                                               randomAccess);
+                                                               randomAccess,
+                                                               PARALLEL_SORT(state));
 
        state->comparetup = comparetup_datum;
        state->copytup = copytup_datum;
@@ -1107,7 +1180,7 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
  * delayed calls at the moment.)
  *
  * This is a hint only. The tuplesort may still return more tuples than
- * requested.
+ * requested.  Parallel leader tuplesorts will always ignore the hint.
  */
 void
 tuplesort_set_bound(Tuplesortstate *state, int64 bound)
@@ -1116,6 +1189,7 @@ tuplesort_set_bound(Tuplesortstate *state, int64 bound)
        Assert(state->status == TSS_INITIAL);
        Assert(state->memtupcount == 0);
        Assert(!state->bounded);
+       Assert(!WORKER(state));
 
 #ifdef DEBUG_BOUNDED_SORT
        /* Honor GUC setting that disables the feature (for easy testing) */
@@ -1123,6 +1197,10 @@ tuplesort_set_bound(Tuplesortstate *state, int64 bound)
                return;
 #endif
 
+       /* Parallel leader ignores hint */
+       if (LEADER(state))
+               return;
+
        /* We want to be able to compute bound * 2, so limit the setting */
        if (bound > (int64) (INT_MAX / 2))
                return;
@@ -1181,11 +1259,13 @@ tuplesort_end(Tuplesortstate *state)
        if (trace_sort)
        {
                if (state->tapeset)
-                       elog(LOG, "external sort ended, %ld disk blocks used: %s",
-                                spaceUsed, pg_rusage_show(&state->ru_start));
+                       elog(LOG, "%s of worker %d ended, %ld disk blocks used: %s",
+                                SERIAL(state) ? "external sort" : "parallel external sort",
+                                state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
                else
-                       elog(LOG, "internal sort ended, %ld KB used: %s",
-                                spaceUsed, pg_rusage_show(&state->ru_start));
+                       elog(LOG, "%s of worker %d ended, %ld KB used: %s",
+                                SERIAL(state) ? "internal sort" : "unperformed parallel sort",
+                                state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
        }
 
        TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
@@ -1219,7 +1299,7 @@ tuplesort_end(Tuplesortstate *state)
 /*
  * Grow the memtuples[] array, if possible within our memory constraint.  We
  * must not exceed INT_MAX tuples in memory or the caller-provided memory
- * limit.  Return TRUE if we were able to enlarge the array, FALSE if not.
+ * limit.  Return true if we were able to enlarge the array, false if not.
  *
  * Normally, at each increment we double the size of the array.  When doing
  * that would exceed a limit, we attempt one last, smaller increase (and then
@@ -1556,6 +1636,8 @@ tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
 static void
 puttuple_common(Tuplesortstate *state, SortTuple *tuple)
 {
+       Assert(!LEADER(state));
+
        switch (state->status)
        {
                case TSS_INITIAL:
@@ -1609,10 +1691,10 @@ puttuple_common(Tuplesortstate *state, SortTuple *tuple)
                        /*
                         * Nope; time to switch to tape-based operation.
                         */
-                       inittapes(state);
+                       inittapes(state, true);
 
                        /*
-                        * Dump tuples until we are back under the limit.
+                        * Dump all tuples.
                         */
                        dumptuples(state, false);
                        break;
@@ -1637,74 +1719,19 @@ puttuple_common(Tuplesortstate *state, SortTuple *tuple)
                        {
                                /* discard top of heap, replacing it with the new tuple */
                                free_sort_tuple(state, &state->memtuples[0]);
-                               tuple->tupindex = 0;    /* not used */
-                               tuplesort_heap_replace_top(state, tuple, false);
+                               tuplesort_heap_replace_top(state, tuple);
                        }
                        break;
 
                case TSS_BUILDRUNS:
 
                        /*
-                        * Insert the tuple into the heap, with run number currentRun if
-                        * it can go into the current run, else HEAP_RUN_NEXT.  The tuple
-                        * can go into the current run if it is >= the first
-                        * not-yet-output tuple.  (Actually, it could go into the current
-                        * run if it is >= the most recently output tuple ... but that
-                        * would require keeping around the tuple we last output, and it's
-                        * simplest to let writetup free each tuple as soon as it's
-                        * written.)
-                        *
-                        * Note that this only applies when:
-                        *
-                        * - currentRun is RUN_FIRST
-                        *
-                        * - Replacement selection is in use (typically it is never used).
-                        *
-                        * When these two conditions are not both true, all tuples are
-                        * appended indifferently, much like the TSS_INITIAL case.
-                        *
-                        * There should always be room to store the incoming tuple.
+                        * Save the tuple into the unsorted array (there must be space)
                         */
-                       Assert(!state->replaceActive || state->memtupcount > 0);
-                       if (state->replaceActive &&
-                               COMPARETUP(state, tuple, &state->memtuples[0]) >= 0)
-                       {
-                               Assert(state->currentRun == RUN_FIRST);
-
-                               /*
-                                * Insert tuple into first, fully heapified run.
-                                *
-                                * Unlike classic replacement selection, which this module was
-                                * previously based on, only RUN_FIRST tuples are fully
-                                * heapified.  Any second/next run tuples are appended
-                                * indifferently.  While HEAP_RUN_NEXT tuples may be sifted
-                                * out of the way of first run tuples, COMPARETUP() will never
-                                * be called for the run's tuples during sifting (only our
-                                * initial COMPARETUP() call is required for the tuple, to
-                                * determine that the tuple does not belong in RUN_FIRST).
-                                */
-                               tuple->tupindex = state->currentRun;
-                               tuplesort_heap_insert(state, tuple, true);
-                       }
-                       else
-                       {
-                               /*
-                                * Tuple was determined to not belong to heapified RUN_FIRST,
-                                * or replacement selection not in play.  Append the tuple to
-                                * memtuples indifferently.
-                                *
-                                * dumptuples() does not trust that the next run's tuples are
-                                * heapified.  Anything past the first run will always be
-                                * quicksorted even when replacement selection is initially
-                                * used.  (When it's never used, every tuple still takes this
-                                * path.)
-                                */
-                               tuple->tupindex = HEAP_RUN_NEXT;
-                               state->memtuples[state->memtupcount++] = *tuple;
-                       }
+                       state->memtuples[state->memtupcount++] = *tuple;
 
                        /*
-                        * If we are over the memory limit, dump tuples till we're under.
+                        * If we are over the memory limit, dump all tuples.
                         */
                        dumptuples(state, false);
                        break;
@@ -1766,8 +1793,8 @@ tuplesort_performsort(Tuplesortstate *state)
 
 #ifdef TRACE_SORT
        if (trace_sort)
-               elog(LOG, "performsort starting: %s",
-                        pg_rusage_show(&state->ru_start));
+               elog(LOG, "performsort of worker %d starting: %s",
+                        state->worker, pg_rusage_show(&state->ru_start));
 #endif
 
        switch (state->status)
@@ -1776,14 +1803,39 @@ tuplesort_performsort(Tuplesortstate *state)
 
                        /*
                         * We were able to accumulate all the tuples within the allowed
-                        * amount of memory.  Just qsort 'em and we're done.
+                        * amount of memory, or leader to take over worker tapes
                         */
-                       tuplesort_sort_memtuples(state);
+                       if (SERIAL(state))
+                       {
+                               /* Just qsort 'em and we're done */
+                               tuplesort_sort_memtuples(state);
+                               state->status = TSS_SORTEDINMEM;
+                       }
+                       else if (WORKER(state))
+                       {
+                               /*
+                                * Parallel workers must still dump out tuples to tape.  No
+                                * merge is required to produce single output run, though.
+                                */
+                               inittapes(state, false);
+                               dumptuples(state, true);
+                               worker_nomergeruns(state);
+                               state->status = TSS_SORTEDONTAPE;
+                       }
+                       else
+                       {
+                               /*
+                                * Leader will take over worker tapes and merge worker runs.
+                                * Note that mergeruns sets the correct state->status.
+                                */
+                               leader_takeover_tapes(state);
+                               mergeruns(state);
+                       }
                        state->current = 0;
                        state->eof_reached = false;
+                       state->markpos_block = 0L;
                        state->markpos_offset = 0;
                        state->markpos_eof = false;
-                       state->status = TSS_SORTEDINMEM;
                        break;
 
                case TSS_BOUNDED:
@@ -1806,8 +1858,8 @@ tuplesort_performsort(Tuplesortstate *state)
                        /*
                         * Finish tape-based sort.  First, flush all tuples remaining in
                         * memory out to tape; then merge until we have a single remaining
-                        * run (or, if !randomAccess, one run per tape). Note that
-                        * mergeruns sets the correct state->status.
+                        * run (or, if !randomAccess and !WORKER(), one run per tape).
+                        * Note that mergeruns sets the correct state->status.
                         */
                        dumptuples(state, true);
                        mergeruns(state);
@@ -1826,12 +1878,12 @@ tuplesort_performsort(Tuplesortstate *state)
        if (trace_sort)
        {
                if (state->status == TSS_FINALMERGE)
-                       elog(LOG, "performsort done (except %d-way final merge): %s",
-                                state->activeTapes,
+                       elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
+                                state->worker, state->activeTapes,
                                 pg_rusage_show(&state->ru_start));
                else
-                       elog(LOG, "performsort done: %s",
-                                pg_rusage_show(&state->ru_start));
+                       elog(LOG, "performsort of worker %d done: %s",
+                                state->worker, pg_rusage_show(&state->ru_start));
        }
 #endif
 
@@ -1840,9 +1892,10 @@ tuplesort_performsort(Tuplesortstate *state)
 
 /*
  * Internal routine to fetch the next tuple in either forward or back
- * direction into *stup.  Returns FALSE if no more tuples.
+ * direction into *stup.  Returns false if no more tuples.
  * Returned tuple belongs to tuplesort memory context, and must not be freed
- * by caller.  Caller should not use tuple following next call here.
+ * by caller.  Note that fetched tuple is stored in memory that may be
+ * recycled by any future fetch.
  */
 static bool
 tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
@@ -1851,6 +1904,8 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
        unsigned int tuplen;
        size_t          nmoved;
 
+       Assert(!WORKER(state));
+
        switch (state->status)
        {
                case TSS_SORTEDINMEM:
@@ -1978,7 +2033,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
                                 */
                                nmoved = LogicalTapeBackspace(state->tapeset,
                                                                                          state->result_tape,
-                                                                                 tuplen + 2 * sizeof(unsigned int));
+                                                                                         tuplen + 2 * sizeof(unsigned int));
                                if (nmoved == tuplen + sizeof(unsigned int))
                                {
                                        /*
@@ -2057,7 +2112,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
                                         * If no more data, we've reached end of run on this tape.
                                         * Remove the top node from the heap.
                                         */
-                                       tuplesort_heap_delete_top(state, false);
+                                       tuplesort_heap_delete_top(state);
 
                                        /*
                                         * Rewind to free the read buffer.  It'd go away at the
@@ -2068,7 +2123,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
                                        return true;
                                }
                                newtup.tupindex = srcTape;
-                               tuplesort_heap_replace_top(state, &newtup, false);
+                               tuplesort_heap_replace_top(state, &newtup);
                                return true;
                        }
                        return false;
@@ -2081,22 +2136,26 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 
 /*
  * Fetch the next tuple in either forward or back direction.
- * If successful, put tuple in slot and return TRUE; else, clear the slot
- * and return FALSE.
+ * If successful, put tuple in slot and return true; else, clear the slot
+ * and return false.
  *
- * Caller may optionally be passed back abbreviated value (on TRUE return
+ * Caller may optionally be passed back abbreviated value (on true return
  * value) when abbreviation was used, which can be used to cheaply avoid
  * equality checks that might otherwise be required.  Caller can safely make a
  * determination of "non-equal tuple" based on simple binary inequality.  A
  * NULL value in leading attribute will set abbreviated value to zeroed
  * representation, which caller may rely on in abbreviated inequality check.
  *
- * The slot receives a copied tuple (sometimes allocated in caller memory
- * context) that will stay valid regardless of future manipulations of the
- * tuplesort's state.
+ * If copy is true, the slot receives a tuple that's been copied into the
+ * caller's memory context, so that it will stay valid regardless of future
+ * manipulations of the tuplesort's state (up to and including deleting the
+ * tuplesort).  If copy is false, the slot will just receive a pointer to a
+ * tuple held within the tuplesort, which is more efficient, but only safe for
+ * callers that are prepared to have any subsequent manipulation of the
+ * tuplesort's state invalidate slot contents.
  */
 bool
-tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
+tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy,
                                           TupleTableSlot *slot, Datum *abbrev)
 {
        MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
@@ -2113,8 +2172,10 @@ tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
                if (state->sortKeys->abbrev_converter && abbrev)
                        *abbrev = stup.datum1;
 
-               stup.tuple = heap_copy_minimal_tuple((MinimalTuple) stup.tuple);
-               ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, true);
+               if (copy)
+                       stup.tuple = heap_copy_minimal_tuple((MinimalTuple) stup.tuple);
+
+               ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, copy);
                return true;
        }
        else
@@ -2127,8 +2188,8 @@ tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
 /*
  * Fetch the next tuple in either forward or back direction.
  * Returns NULL if no more tuples.  Returned tuple belongs to tuplesort memory
- * context, and must not be freed by caller.  Caller should not use tuple
- * following next call here.
+ * context, and must not be freed by caller.  Caller may not rely on tuple
+ * remaining valid after any further manipulation of tuplesort.
  */
 HeapTuple
 tuplesort_getheaptuple(Tuplesortstate *state, bool forward)
@@ -2147,8 +2208,8 @@ tuplesort_getheaptuple(Tuplesortstate *state, bool forward)
 /*
  * Fetch the next index tuple in either forward or back direction.
  * Returns NULL if no more tuples.  Returned tuple belongs to tuplesort memory
- * context, and must not be freed by caller.  Caller should not use tuple
- * following next call here.
+ * context, and must not be freed by caller.  Caller may not rely on tuple
+ * remaining valid after any further manipulation of tuplesort.
  */
 IndexTuple
 tuplesort_getindextuple(Tuplesortstate *state, bool forward)
@@ -2166,13 +2227,13 @@ tuplesort_getindextuple(Tuplesortstate *state, bool forward)
 
 /*
  * Fetch the next Datum in either forward or back direction.
- * Returns FALSE if no more datums.
+ * Returns false if no more datums.
  *
  * If the Datum is pass-by-ref type, the returned value is freshly palloc'd
- * and is now owned by the caller (this differs from similar routines for
- * other types of tuplesorts).
+ * in caller's context, and is now owned by the caller (this differs from
+ * similar routines for other types of tuplesorts).
  *
- * Caller may optionally be passed back abbreviated value (on TRUE return
+ * Caller may optionally be passed back abbreviated value (on true return
  * value) when abbreviation was used, which can be used to cheaply avoid
  * equality checks that might otherwise be required.  Caller can safely make a
  * determination of "non-equal tuple" based on simple binary inequality.  A
@@ -2192,6 +2253,9 @@ tuplesort_getdatum(Tuplesortstate *state, bool forward,
                return false;
        }
 
+       /* Ensure we copy into caller's memory context */
+       MemoryContextSwitchTo(oldcontext);
+
        /* Record abbreviated key for caller */
        if (state->sortKeys->abbrev_converter && abbrev)
                *abbrev = stup.datum1;
@@ -2208,15 +2272,13 @@ tuplesort_getdatum(Tuplesortstate *state, bool forward,
                *isNull = false;
        }
 
-       MemoryContextSwitchTo(oldcontext);
-
        return true;
 }
 
 /*
  * Advance over N tuples in either forward or back direction,
  * without returning any data.  N==0 is a no-op.
- * Returns TRUE if successful, FALSE if ran out of tuples.
+ * Returns true if successful, false if ran out of tuples.
  */
 bool
 tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
@@ -2229,6 +2291,7 @@ tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
         */
        Assert(forward);
        Assert(ntuples >= 0);
+       Assert(!WORKER(state));
 
        switch (state->status)
        {
@@ -2311,8 +2374,8 @@ tuplesort_merge_order(int64 allowedMem)
         * which in turn can cause the same sort to need more runs, which makes
         * merging slower even if it can still be done in a single pass.  Also,
         * high order merges are quite slow due to CPU cache effects; it can be
-        * faster to pay the I/O cost of a polyphase merge than to perform a single
-        * merge pass across many hundreds of tapes.
+        * faster to pay the I/O cost of a polyphase merge than to perform a
+        * single merge pass across many hundreds of tapes.
         */
        mOrder = Max(mOrder, MINORDER);
        mOrder = Min(mOrder, MAXORDER);
@@ -2320,121 +2383,45 @@ tuplesort_merge_order(int64 allowedMem)
        return mOrder;
 }
 
-/*
- * useselection - determine algorithm to use to sort first run.
- *
- * It can sometimes be useful to use the replacement selection algorithm if it
- * results in one large run, and there is little available workMem.  See
- * remarks on RUN_SECOND optimization within dumptuples().
- */
-static bool
-useselection(Tuplesortstate *state)
-{
-       /*
-        * memtupsize might be noticeably higher than memtupcount here in atypical
-        * cases.  It seems slightly preferable to not allow recent outliers to
-        * impact this determination.  Note that caller's trace_sort output
-        * reports memtupcount instead.
-        */
-       if (state->memtupsize <= replacement_sort_tuples)
-               return true;
-
-       return false;
-}
-
 /*
  * inittapes - initialize for tape sorting.
  *
- * This is called only if we have found we don't have room to sort in memory.
+ * This is called only if we have found we won't sort in memory.
  */
 static void
-inittapes(Tuplesortstate *state)
+inittapes(Tuplesortstate *state, bool mergeruns)
 {
        int                     maxTapes,
                                j;
-       int64           tapeSpace;
-
-       /* Compute number of tapes to use: merge order plus 1 */
-       maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
-
-       state->maxTapes = maxTapes;
-       state->tapeRange = maxTapes - 1;
-
-#ifdef TRACE_SORT
-       if (trace_sort)
-               elog(LOG, "switching to external sort with %d tapes: %s",
-                        maxTapes, pg_rusage_show(&state->ru_start));
-#endif
-
-       /*
-        * Decrease availMem to reflect the space needed for tape buffers, when
-        * writing the initial runs; but don't decrease it to the point that we
-        * have no room for tuples.  (That case is only likely to occur if sorting
-        * pass-by-value Datums; in all other scenarios the memtuples[] array is
-        * unlikely to occupy more than half of allowedMem.  In the pass-by-value
-        * case it's not important to account for tuple space, so we don't care if
-        * LACKMEM becomes inaccurate.)
-        */
-       tapeSpace = (int64) maxTapes *TAPE_BUFFER_OVERHEAD;
-
-       if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
-               USEMEM(state, tapeSpace);
 
-       /*
-        * Make sure that the temp file(s) underlying the tape set are created in
-        * suitable temp tablespaces.
-        */
-       PrepareTempTablespaces();
-
-       /*
-        * Create the tape set and allocate the per-tape data arrays.
-        */
-       state->tapeset = LogicalTapeSetCreate(maxTapes);
-
-       state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
-       state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
-       state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
-       state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
-       state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
-
-       /*
-        * Give replacement selection a try based on user setting.  There will be
-        * a switch to a simple hybrid sort-merge strategy after the first run
-        * (iff we could not output one long run).
-        */
-       state->replaceActive = useselection(state);
+       Assert(!LEADER(state));
 
-       if (state->replaceActive)
+       if (mergeruns)
        {
-               /*
-                * Convert the unsorted contents of memtuples[] into a heap. Each
-                * tuple is marked as belonging to run number zero.
-                *
-                * NOTE: we pass false for checkIndex since there's no point in
-                * comparing indexes in this step, even though we do intend the
-                * indexes to be part of the sort key...
-                */
-               int                     ntuples = state->memtupcount;
+               /* Compute number of tapes to use: merge order plus 1 */
+               maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
+       }
+       else
+       {
+               /* Workers can sometimes produce single run, output without merge */
+               Assert(WORKER(state));
+               maxTapes = MINORDER + 1;
+       }
 
 #ifdef TRACE_SORT
-               if (trace_sort)
-                       elog(LOG, "replacement selection will sort %d first run tuples",
-                                state->memtupcount);
+       if (trace_sort)
+               elog(LOG, "worker %d switching to external sort with %d tapes: %s",
+                        state->worker, maxTapes, pg_rusage_show(&state->ru_start));
 #endif
-               state->memtupcount = 0; /* make the heap empty */
 
-               for (j = 0; j < ntuples; j++)
-               {
-                       /* Must copy source tuple to avoid possible overwrite */
-                       SortTuple       stup = state->memtuples[j];
+       /* Create the tape set and allocate the per-tape data arrays */
+       inittapestate(state, maxTapes);
+       state->tapeset =
+               LogicalTapeSetCreate(maxTapes, NULL,
+                                                        state->shared ? &state->shared->fileset : NULL,
+                                                        state->worker);
 
-                       stup.tupindex = RUN_FIRST;
-                       tuplesort_heap_insert(state, &stup, false);
-               }
-               Assert(state->memtupcount == ntuples);
-       }
-
-       state->currentRun = RUN_FIRST;
+       state->currentRun = 0;
 
        /*
         * Initialize variables of Algorithm D (step D1).
@@ -2455,6 +2442,47 @@ inittapes(Tuplesortstate *state)
        state->status = TSS_BUILDRUNS;
 }
 
+/*
+ * inittapestate - initialize generic tape management state
+ */
+static void
+inittapestate(Tuplesortstate *state, int maxTapes)
+{
+       int64           tapeSpace;
+
+       /*
+        * Decrease availMem to reflect the space needed for tape buffers; but
+        * don't decrease it to the point that we have no room for tuples. (That
+        * case is only likely to occur if sorting pass-by-value Datums; in all
+        * other scenarios the memtuples[] array is unlikely to occupy more than
+        * half of allowedMem.  In the pass-by-value case it's not important to
+        * account for tuple space, so we don't care if LACKMEM becomes
+        * inaccurate.)
+        */
+       tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
+
+       if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
+               USEMEM(state, tapeSpace);
+
+       /*
+        * Make sure that the temp file(s) underlying the tape set are created in
+        * suitable temp tablespaces.  For parallel sorts, this should have been
+        * called already, but it doesn't matter if it is called a second time.
+        */
+       PrepareTempTablespaces();
+
+       state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
+       state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
+       state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
+       state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
+       state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
+
+       /* Record # of tapes allocated (for duration of sort) */
+       state->maxTapes = maxTapes;
+       /* Record maximum # of tapes usable as inputs when merging */
+       state->tapeRange = maxTapes - 1;
+}
+
 /*
  * selectnewtape -- select new tape for new initial run.
  *
@@ -2608,22 +2636,6 @@ mergeruns(Tuplesortstate *state)
        else
                init_slab_allocator(state, 0);
 
-       /*
-        * If we produced only one initial run (quite likely if the total data
-        * volume is between 1X and 2X workMem when replacement selection is used,
-        * but something we particular count on when input is presorted), we can
-        * just use that tape as the finished output, rather than doing a useless
-        * merge.  (This obvious optimization is not in Knuth's algorithm.)
-        */
-       if (state->currentRun == RUN_SECOND)
-       {
-               state->result_tape = state->tp_tapenum[state->destTape];
-               /* must freeze and rewind the finished output tape */
-               LogicalTapeFreeze(state->tapeset, state->result_tape);
-               state->status = TSS_SORTEDONTAPE;
-               return;
-       }
-
        /*
         * Allocate a new 'memtuples' array, for the heap.  It will hold one tuple
         * from each input tape.
@@ -2636,11 +2648,6 @@ mergeruns(Tuplesortstate *state)
         * Use all the remaining memory we have available for read buffers among
         * the input tapes.
         *
-        * We do this only after checking for the case that we produced only one
-        * initial run, because there is no need to use a large read buffer when
-        * we're reading from a single tape.  With one tape, the I/O pattern will
-        * be the same regardless of the buffer size.
-        *
         * We don't try to "rebalance" the memory among tapes, when we start a new
         * merge phase, even if some tapes are inactive in the new phase.  That
         * would be hard, because logtape.c doesn't know where one run ends and
@@ -2653,8 +2660,8 @@ mergeruns(Tuplesortstate *state)
         */
 #ifdef TRACE_SORT
        if (trace_sort)
-               elog(LOG, "using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
-                        (state->availMem) / 1024, numInputTapes);
+               elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
+                        state->worker, state->availMem / 1024, numInputTapes);
 #endif
 
        state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
@@ -2672,7 +2679,7 @@ mergeruns(Tuplesortstate *state)
                 * pass remains.  If we don't have to produce a materialized sorted
                 * tape, we can stop at this point and do the final merge on-the-fly.
                 */
-               if (!state->randomAccess)
+               if (!state->randomAccess && !WORKER(state))
                {
                        bool            allOneRun = true;
 
@@ -2757,7 +2764,10 @@ mergeruns(Tuplesortstate *state)
         * a waste of cycles anyway...
         */
        state->result_tape = state->tp_tapenum[state->tapeRange];
-       LogicalTapeFreeze(state->tapeset, state->result_tape);
+       if (!WORKER(state))
+               LogicalTapeFreeze(state->tapeset, state->result_tape, NULL);
+       else
+               worker_freeze_result_tape(state);
        state->status = TSS_SORTEDONTAPE;
 
        /* Release the read buffers of all the other tapes, by rewinding them. */
@@ -2800,7 +2810,8 @@ mergeonerun(Tuplesortstate *state)
                WRITETUP(state, destTape, &state->memtuples[0]);
 
                /* recycle the slot of the tuple we just wrote out, for the next read */
-               RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
+               if (state->memtuples[0].tuple)
+                       RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
 
                /*
                 * pull next tuple from the tape, and replace the written-out tuple in
@@ -2809,11 +2820,11 @@ mergeonerun(Tuplesortstate *state)
                if (mergereadnext(state, srcTape, &stup))
                {
                        stup.tupindex = srcTape;
-                       tuplesort_heap_replace_top(state, &stup, false);
+                       tuplesort_heap_replace_top(state, &stup);
 
                }
                else
-                       tuplesort_heap_delete_top(state, false);
+                       tuplesort_heap_delete_top(state);
        }
 
        /*
@@ -2825,8 +2836,8 @@ mergeonerun(Tuplesortstate *state)
 
 #ifdef TRACE_SORT
        if (trace_sort)
-               elog(LOG, "finished %d-way merge step: %s", state->activeTapes,
-                        pg_rusage_show(&state->ru_start));
+               elog(LOG, "worker %d finished %d-way merge step: %s", state->worker,
+                        state->activeTapes, pg_rusage_show(&state->ru_start));
 #endif
 }
 
@@ -2875,7 +2886,7 @@ beginmerge(Tuplesortstate *state)
                if (mergereadnext(state, srcTape, &tup))
                {
                        tup.tupindex = srcTape;
-                       tuplesort_heap_insert(state, &tup, false);
+                       tuplesort_heap_insert(state, &tup);
                }
        }
 }
@@ -2905,124 +2916,25 @@ mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
 }
 
 /*
- * dumptuples - remove tuples from memtuples and write to tape
+ * dumptuples - remove tuples from memtuples and write initial run to tape
  *
- * This is used during initial-run building, but not during merging.
- *
- * When alltuples = false and replacement selection is still active, dump
- * only enough tuples to get under the availMem limit (and leave at least
- * one tuple in memtuples, since puttuple will then assume it is a heap that
- * has a tuple to compare to).  We always insist there be at least one free
- * slot in the memtuples[] array.
- *
- * When alltuples = true, dump everything currently in memory.  (This
- * case is only used at end of input data, although in practice only the
- * first run could fail to dump all tuples when we LACKMEM(), and only
- * when replacement selection is active.)
- *
- * If, when replacement selection is active, we see that the tuple run
- * number at the top of the heap has changed, start a new run.  This must be
- * the first run, because replacement selection is always abandoned for all
- * further runs.
+ * When alltuples = true, dump everything currently in memory.  (This case is
+ * only used at end of input data.)
  */
 static void
 dumptuples(Tuplesortstate *state, bool alltuples)
-{
-       while (alltuples ||
-                  (LACKMEM(state) && state->memtupcount > 1) ||
-                  state->memtupcount >= state->memtupsize)
-       {
-               if (state->replaceActive)
-               {
-                       /*
-                        * Still holding out for a case favorable to replacement
-                        * selection. Still incrementally spilling using heap.
-                        *
-                        * Dump the heap's frontmost entry, and remove it from the heap.
-                        */
-                       Assert(state->memtupcount > 0);
-                       WRITETUP(state, state->tp_tapenum[state->destTape],
-                                        &state->memtuples[0]);
-                       tuplesort_heap_delete_top(state, true);
-               }
-               else
-               {
-                       /*
-                        * Once committed to quicksorting runs, never incrementally spill
-                        */
-                       dumpbatch(state, alltuples);
-                       break;
-               }
-
-               /*
-                * If top run number has changed, we've finished the current run (this
-                * can only be the first run), and will no longer spill incrementally.
-                */
-               if (state->memtupcount == 0 ||
-                       state->memtuples[0].tupindex == HEAP_RUN_NEXT)
-               {
-                       markrunend(state, state->tp_tapenum[state->destTape]);
-                       Assert(state->currentRun == RUN_FIRST);
-                       state->currentRun++;
-                       state->tp_runs[state->destTape]++;
-                       state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
-
-#ifdef TRACE_SORT
-                       if (trace_sort)
-                               elog(LOG, "finished incrementally writing %s run %d to tape %d: %s",
-                                        (state->memtupcount == 0) ? "only" : "first",
-                                        state->currentRun, state->destTape,
-                                        pg_rusage_show(&state->ru_start));
-#endif
-
-                       /*
-                        * Done if heap is empty, which is possible when there is only one
-                        * long run.
-                        */
-                       Assert(state->currentRun == RUN_SECOND);
-                       if (state->memtupcount == 0)
-                       {
-                               /*
-                                * Replacement selection best case; no final merge required,
-                                * because there was only one initial run (second run has no
-                                * tuples).  See RUN_SECOND case in mergeruns().
-                                */
-                               break;
-                       }
-
-                       /*
-                        * Abandon replacement selection for second run (as well as any
-                        * subsequent runs).
-                        */
-                       state->replaceActive = false;
-
-                       /*
-                        * First tuple of next run should not be heapified, and so will
-                        * bear placeholder run number.  In practice this must actually be
-                        * the second run, which just became the currentRun, so we're
-                        * clear to quicksort and dump the tuples in batch next time
-                        * memtuples becomes full.
-                        */
-                       Assert(state->memtuples[0].tupindex == HEAP_RUN_NEXT);
-                       selectnewtape(state);
-               }
-       }
-}
-
-/*
- * dumpbatch - sort and dump all memtuples, forming one run on tape
- *
- * Second or subsequent runs are never heapified by this module (although
- * heapification still respects run number differences between the first and
- * second runs), and a heap (replacement selection priority queue) is often
- * avoided in the first place.
- */
-static void
-dumpbatch(Tuplesortstate *state, bool alltuples)
 {
        int                     memtupwrite;
        int                     i;
 
+       /*
+        * Nothing to do if we still fit in available memory and have array slots,
+        * unless this is the final call during initial run generation.
+        */
+       if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
+               !alltuples)
+               return;
+
        /*
         * Final call might require no sorting, in rare cases where we just so
         * happen to have previously LACKMEM()'d at the point where exactly all
@@ -3059,8 +2971,9 @@ dumpbatch(Tuplesortstate *state, bool alltuples)
 
 #ifdef TRACE_SORT
        if (trace_sort)
-               elog(LOG, "starting quicksort of run %d: %s",
-                        state->currentRun, pg_rusage_show(&state->ru_start));
+               elog(LOG, "worker %d starting quicksort of run %d: %s",
+                        state->worker, state->currentRun,
+                        pg_rusage_show(&state->ru_start));
 #endif
 
        /*
@@ -3071,8 +2984,9 @@ dumpbatch(Tuplesortstate *state, bool alltuples)
 
 #ifdef TRACE_SORT
        if (trace_sort)
-               elog(LOG, "finished quicksort of run %d: %s",
-                        state->currentRun, pg_rusage_show(&state->ru_start));
+               elog(LOG, "worker %d finished quicksort of run %d: %s",
+                        state->worker, state->currentRun,
+                        pg_rusage_show(&state->ru_start));
 #endif
 
        memtupwrite = state->memtupcount;
@@ -3098,8 +3012,8 @@ dumpbatch(Tuplesortstate *state, bool alltuples)
 
 #ifdef TRACE_SORT
        if (trace_sort)
-               elog(LOG, "finished writing run %d to tape %d: %s",
-                        state->currentRun, state->destTape,
+               elog(LOG, "worker %d finished writing run %d to tape %d: %s",
+                        state->worker, state->currentRun, state->destTape,
                         pg_rusage_show(&state->ru_start));
 #endif
 
@@ -3210,13 +3124,10 @@ tuplesort_restorepos(Tuplesortstate *state)
  *
  * This can be called after tuplesort_performsort() finishes to obtain
  * printable summary information about how the sort was performed.
- * spaceUsed is measured in kilobytes.
  */
 void
 tuplesort_get_stats(Tuplesortstate *state,
-                                       const char **sortMethod,
-                                       const char **spaceType,
-                                       long *spaceUsed)
+                                       TuplesortInstrumentation *stats)
 {
        /*
         * Note: it might seem we should provide both memory and disk usage for a
@@ -3229,53 +3140,73 @@ tuplesort_get_stats(Tuplesortstate *state,
         */
        if (state->tapeset)
        {
-               *spaceType = "Disk";
-               *spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
+               stats->spaceType = SORT_SPACE_TYPE_DISK;
+               stats->spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
        }
        else
        {
-               *spaceType = "Memory";
-               *spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
+               stats->spaceType = SORT_SPACE_TYPE_MEMORY;
+               stats->spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
        }
 
        switch (state->status)
        {
                case TSS_SORTEDINMEM:
                        if (state->boundUsed)
-                               *sortMethod = "top-N heapsort";
+                               stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
                        else
-                               *sortMethod = "quicksort";
+                               stats->sortMethod = SORT_TYPE_QUICKSORT;
                        break;
                case TSS_SORTEDONTAPE:
-                       *sortMethod = "external sort";
+                       stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
                        break;
                case TSS_FINALMERGE:
-                       *sortMethod = "external merge";
+                       stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
                        break;
                default:
-                       *sortMethod = "still in progress";
+                       stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
                        break;
        }
 }
 
+/*
+ * Convert TuplesortMethod to a string.
+ */
+const char *
+tuplesort_method_name(TuplesortMethod m)
+{
+       switch (m)
+       {
+               case SORT_TYPE_STILL_IN_PROGRESS:
+                       return "still in progress";
+               case SORT_TYPE_TOP_N_HEAPSORT:
+                       return "top-N heapsort";
+               case SORT_TYPE_QUICKSORT:
+                       return "quicksort";
+               case SORT_TYPE_EXTERNAL_SORT:
+                       return "external sort";
+               case SORT_TYPE_EXTERNAL_MERGE:
+                       return "external merge";
+       }
+
+       return "unknown";
+}
+
+/*
+ * Convert TuplesortSpaceType to a string.
+ */
+const char *
+tuplesort_space_type_name(TuplesortSpaceType t)
+{
+       Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
+       return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
+}
+
 
 /*
  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
- *
- * Compare two SortTuples.  If checkIndex is true, use the tuple index
- * as the front of the sort key; otherwise, no.
- *
- * Note that for checkIndex callers, the heap invariant is never
- * maintained beyond the first run, and so there are no COMPARETUP()
- * calls needed to distinguish tuples in HEAP_RUN_NEXT.
  */
 
-#define HEAPCOMPARE(tup1,tup2) \
-       (checkIndex && ((tup1)->tupindex != (tup2)->tupindex || \
-                                       (tup1)->tupindex == HEAP_RUN_NEXT) ? \
-        ((tup1)->tupindex) - ((tup2)->tupindex) : \
-        COMPARETUP(state, tup1, tup2))
-
 /*
  * Convert the existing unordered array of SortTuples to a bounded heap,
  * discarding all but the smallest "state->bound" tuples.
@@ -3284,10 +3215,6 @@ tuplesort_get_stats(Tuplesortstate *state,
  * at the root (array entry zero), instead of the smallest as in the normal
  * sort case.  This allows us to discard the largest entry cheaply.
  * Therefore, we temporarily reverse the sort direction.
- *
- * We assume that all entries in a bounded heap will always have tupindex
- * zero; it therefore doesn't matter that HEAPCOMPARE() doesn't reverse
- * the direction of comparison for tupindexes.
  */
 static void
 make_bounded_heap(Tuplesortstate *state)
@@ -3298,6 +3225,7 @@ make_bounded_heap(Tuplesortstate *state)
        Assert(state->status == TSS_INITIAL);
        Assert(state->bounded);
        Assert(tupcount >= state->bound);
+       Assert(SERIAL(state));
 
        /* Reverse sort direction so largest entry will be at root */
        reversedirection(state);
@@ -3311,8 +3239,7 @@ make_bounded_heap(Tuplesortstate *state)
                        /* Must copy source tuple to avoid possible overwrite */
                        SortTuple       stup = state->memtuples[i];
 
-                       stup.tupindex = 0;      /* not used */
-                       tuplesort_heap_insert(state, &stup, false);
+                       tuplesort_heap_insert(state, &stup);
                }
                else
                {
@@ -3327,7 +3254,7 @@ make_bounded_heap(Tuplesortstate *state)
                                CHECK_FOR_INTERRUPTS();
                        }
                        else
-                               tuplesort_heap_replace_top(state, &state->memtuples[i], false);
+                               tuplesort_heap_replace_top(state, &state->memtuples[i]);
                }
        }
 
@@ -3346,6 +3273,7 @@ sort_bounded_heap(Tuplesortstate *state)
        Assert(state->status == TSS_BOUNDED);
        Assert(state->bounded);
        Assert(tupcount == state->bound);
+       Assert(SERIAL(state));
 
        /*
         * We can unheapify in place because each delete-top call will remove the
@@ -3357,7 +3285,7 @@ sort_bounded_heap(Tuplesortstate *state)
                SortTuple       stup = state->memtuples[0];
 
                /* this sifts-up the next-largest entry and decreases memtupcount */
-               tuplesort_heap_delete_top(state, false);
+               tuplesort_heap_delete_top(state);
                state->memtuples[state->memtupcount] = stup;
        }
        state->memtupcount = tupcount;
@@ -3375,14 +3303,13 @@ sort_bounded_heap(Tuplesortstate *state)
 /*
  * Sort all memtuples using specialized qsort() routines.
  *
- * Quicksort is used for small in-memory sorts.  Quicksort is also generally
- * preferred to replacement selection for generating runs during external sort
- * operations, although replacement selection is sometimes used for the first
- * run.
+ * Quicksort is used for small in-memory sorts, and external sort runs.
  */
 static void
 tuplesort_sort_memtuples(Tuplesortstate *state)
 {
+       Assert(!LEADER(state));
+
        if (state->memtupcount > 1)
        {
                /* Can we use the single-key sort function? */
@@ -3407,15 +3334,13 @@ tuplesort_sort_memtuples(Tuplesortstate *state)
  * is, it might get overwritten before being moved into the heap!
  */
 static void
-tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
-                                         bool checkIndex)
+tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
 {
        SortTuple  *memtuples;
        int                     j;
 
        memtuples = state->memtuples;
        Assert(state->memtupcount < state->memtupsize);
-       Assert(!checkIndex || tuple->tupindex == RUN_FIRST);
 
        CHECK_FOR_INTERRUPTS();
 
@@ -3428,7 +3353,7 @@ tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
        {
                int                     i = (j - 1) >> 1;
 
-               if (HEAPCOMPARE(tuple, &memtuples[i]) >= 0)
+               if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
                        break;
                memtuples[j] = memtuples[i];
                j = i;
@@ -3444,12 +3369,11 @@ tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
  * if necessary.
  */
 static void
-tuplesort_heap_delete_top(Tuplesortstate *state, bool checkIndex)
+tuplesort_heap_delete_top(Tuplesortstate *state)
 {
        SortTuple  *memtuples = state->memtuples;
        SortTuple  *tuple;
 
-       Assert(!checkIndex || state->currentRun == RUN_FIRST);
        if (--state->memtupcount <= 0)
                return;
 
@@ -3458,7 +3382,7 @@ tuplesort_heap_delete_top(Tuplesortstate *state, bool checkIndex)
         * current top node with it.
         */
        tuple = &memtuples[state->memtupcount];
-       tuplesort_heap_replace_top(state, tuple, checkIndex);
+       tuplesort_heap_replace_top(state, tuple);
 }
 
 /*
@@ -3469,30 +3393,33 @@ tuplesort_heap_delete_top(Tuplesortstate *state, bool checkIndex)
  * Heapsort, steps H3-H8).
  */
 static void
-tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple,
-                                                  bool checkIndex)
+tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
 {
        SortTuple  *memtuples = state->memtuples;
-       int                     i,
+       unsigned int i,
                                n;
 
-       Assert(!checkIndex || state->currentRun == RUN_FIRST);
        Assert(state->memtupcount >= 1);
 
        CHECK_FOR_INTERRUPTS();
 
+       /*
+        * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
+        * This prevents overflow in the "2 * i + 1" calculation, since at the top
+        * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
+        */
        n = state->memtupcount;
        i = 0;                                          /* i is where the "hole" is */
        for (;;)
        {
-               int                     j = 2 * i + 1;
+               unsigned int j = 2 * i + 1;
 
                if (j >= n)
                        break;
                if (j + 1 < n &&
-                       HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0)
+                       COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
                        j++;
-               if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0)
+               if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
                        break;
                memtuples[i] = memtuples[j];
                i = j;
@@ -3791,7 +3718,7 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b,
                                datum2;
        bool            isnull1,
                                isnull2;
-       AttrNumber      leading = state->indexInfo->ii_KeyAttrNumbers[0];
+       AttrNumber      leading = state->indexInfo->ii_IndexAttrNumbers[0];
 
        /* Be prepared to compare additional sort keys */
        ltup = (HeapTuple) a->tuple;
@@ -3834,7 +3761,7 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b,
 
                for (; nkey < state->nKeys; nkey++, sortKey++)
                {
-                       AttrNumber      attno = state->indexInfo->ii_KeyAttrNumbers[nkey];
+                       AttrNumber      attno = state->indexInfo->ii_IndexAttrNumbers[nkey];
 
                        datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
                        datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
@@ -3865,11 +3792,11 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b,
 
                ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple;
 
-               ExecStoreTuple(ltup, ecxt_scantuple, InvalidBuffer, false);
+               ExecStoreHeapTuple(ltup, ecxt_scantuple, false);
                FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
                                           l_index_values, l_index_isnull);
 
-               ExecStoreTuple(rtup, ecxt_scantuple, InvalidBuffer, false);
+               ExecStoreHeapTuple(rtup, ecxt_scantuple, false);
                FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
                                           r_index_values, r_index_isnull);
 
@@ -3906,11 +3833,11 @@ copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
         * set up first-column key value, and potentially abbreviate, if it's a
         * simple column
         */
-       if (state->indexInfo->ii_KeyAttrNumbers[0] == 0)
+       if (state->indexInfo->ii_IndexAttrNumbers[0] == 0)
                return;
 
        original = heap_getattr(tuple,
-                                                       state->indexInfo->ii_KeyAttrNumbers[0],
+                                                       state->indexInfo->ii_IndexAttrNumbers[0],
                                                        state->tupDesc,
                                                        &stup->isnull1);
 
@@ -3954,7 +3881,7 @@ copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
 
                        tuple = (HeapTuple) mtup->tuple;
                        mtup->datum1 = heap_getattr(tuple,
-                                                                         state->indexInfo->ii_KeyAttrNumbers[0],
+                                                                               state->indexInfo->ii_IndexAttrNumbers[0],
                                                                                state->tupDesc,
                                                                                &mtup->isnull1);
                }
@@ -4008,9 +3935,9 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
                                                         &tuplen, sizeof(tuplen));
        stup->tuple = (void *) tuple;
        /* set up first-column key value, if it's a simple column */
-       if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
+       if (state->indexInfo->ii_IndexAttrNumbers[0] != 0)
                stup->datum1 = heap_getattr(tuple,
-                                                                       state->indexInfo->ii_KeyAttrNumbers[0],
+                                                                       state->indexInfo->ii_IndexAttrNumbers[0],
                                                                        state->tupDesc,
                                                                        &stup->isnull1);
 }
@@ -4126,13 +4053,14 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
                                 key_desc ? errdetail("Key %s is duplicated.", key_desc) :
                                 errdetail("Duplicate keys exist."),
                                 errtableconstraint(state->heapRel,
-                                                                RelationGetRelationName(state->indexRel))));
+                                                                       RelationGetRelationName(state->indexRel))));
        }
 
        /*
-        * If key values are equal, we sort on ItemPointer.  This does not affect
-        * validity of the finished index, but it may be useful to have index
-        * scans in physical order.
+        * If key values are equal, we sort on ItemPointer.  This is required for
+        * btree indexes, since heap TID is treated as an implicit last key
+        * attribute in order to ensure that all keys in the index are physically
+        * unique.
         */
        {
                BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
@@ -4149,6 +4077,9 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
                        return (pos1 < pos2) ? -1 : 1;
        }
 
+       /* ItemPointer values should never be equal */
+       Assert(false);
+
        return 0;
 }
 
@@ -4156,8 +4087,8 @@ static int
 comparetup_index_hash(const SortTuple *a, const SortTuple *b,
                                          Tuplesortstate *state)
 {
-       uint32          hash1;
-       uint32          hash2;
+       Bucket          bucket1;
+       Bucket          bucket2;
        IndexTuple      tuple1;
        IndexTuple      tuple2;
 
@@ -4166,13 +4097,16 @@ comparetup_index_hash(const SortTuple *a, const SortTuple *b,
         * that the first column of the index tuple is the hash key.
         */
        Assert(!a->isnull1);
-       hash1 = DatumGetUInt32(a->datum1) & state->hash_mask;
+       bucket1 = _hash_hashkey2bucket(DatumGetUInt32(a->datum1),
+                                                                  state->max_buckets, state->high_mask,
+                                                                  state->low_mask);
        Assert(!b->isnull1);
-       hash2 = DatumGetUInt32(b->datum1) & state->hash_mask;
-
-       if (hash1 > hash2)
+       bucket2 = _hash_hashkey2bucket(DatumGetUInt32(b->datum1),
+                                                                  state->max_buckets, state->high_mask,
+                                                                  state->low_mask);
+       if (bucket1 > bucket2)
                return 1;
-       else if (hash1 < hash2)
+       else if (bucket1 < bucket2)
                return -1;
 
        /*
@@ -4198,6 +4132,9 @@ comparetup_index_hash(const SortTuple *a, const SortTuple *b,
                        return (pos1 < pos2) ? -1 : 1;
        }
 
+       /* ItemPointer values should never be equal */
+       Assert(false);
+
        return 0;
 }
 
@@ -4328,7 +4265,7 @@ comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
 
        if (state->sortKeys->abbrev_converter)
                compare = ApplySortAbbrevFullComparator(PointerGetDatum(a->tuple), a->isnull1,
-                                                                          PointerGetDatum(b->tuple), b->isnull1,
+                                                                                               PointerGetDatum(b->tuple), b->isnull1,
                                                                                                state->sortKeys);
 
        return compare;
@@ -4419,6 +4356,229 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
                                                         &tuplen, sizeof(tuplen));
 }
 
+/*
+ * Parallel sort routines
+ */
+
+/*
+ * tuplesort_estimate_shared - estimate required shared memory allocation
+ *
+ * nWorkers is an estimate of the number of workers (it's the number that
+ * will be requested).
+ */
+Size
+tuplesort_estimate_shared(int nWorkers)
+{
+       Size            tapesSize;
+
+       Assert(nWorkers > 0);
+
+       /* Make sure that BufFile shared state is MAXALIGN'd */
+       tapesSize = mul_size(sizeof(TapeShare), nWorkers);
+       tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
+
+       return tapesSize;
+}
+
+/*
+ * tuplesort_initialize_shared - initialize shared tuplesort state
+ *
+ * Must be called from leader process before workers are launched, to
+ * establish state needed up-front for worker tuplesortstates.  nWorkers
+ * should match the argument passed to tuplesort_estimate_shared().
+ */
+void
+tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
+{
+       int                     i;
+
+       Assert(nWorkers > 0);
+
+       SpinLockInit(&shared->mutex);
+       shared->currentWorker = 0;
+       shared->workersFinished = 0;
+       SharedFileSetInit(&shared->fileset, seg);
+       shared->nTapes = nWorkers;
+       for (i = 0; i < nWorkers; i++)
+       {
+               shared->tapes[i].firstblocknumber = 0L;
+       }
+}
+
+/*
+ * tuplesort_attach_shared - attach to shared tuplesort state
+ *
+ * Must be called by all worker processes.
+ */
+void
+tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
+{
+       /* Attach to SharedFileSet */
+       SharedFileSetAttach(&shared->fileset, seg);
+}
+
+/*
+ * worker_get_identifier - Assign and return ordinal identifier for worker
+ *
+ * The order in which these are assigned is not well defined, and should not
+ * matter; worker numbers across parallel sort participants need only be
+ * distinct and gapless.  logtape.c requires this.
+ *
+ * Note that the identifiers assigned from here have no relation to
+ * ParallelWorkerNumber number, to avoid making any assumption about
+ * caller's requirements.  However, we do follow the ParallelWorkerNumber
+ * convention of representing a non-worker with worker number -1.  This
+ * includes the leader, as well as serial Tuplesort processes.
+ */
+static int
+worker_get_identifier(Tuplesortstate *state)
+{
+       Sharedsort *shared = state->shared;
+       int                     worker;
+
+       Assert(WORKER(state));
+
+       SpinLockAcquire(&shared->mutex);
+       worker = shared->currentWorker++;
+       SpinLockRelease(&shared->mutex);
+
+       return worker;
+}
+
+/*
+ * worker_freeze_result_tape - freeze worker's result tape for leader
+ *
+ * This is called by workers just after the result tape has been determined,
+ * instead of calling LogicalTapeFreeze() directly.  They do so because
+ * workers require a few additional steps over similar serial
+ * TSS_SORTEDONTAPE external sort cases, which also happen here.  The extra
+ * steps are around freeing now unneeded resources, and representing to
+ * leader that worker's input run is available for its merge.
+ *
+ * There should only be one final output run for each worker, which consists
+ * of all tuples that were originally input into worker.
+ */
+static void
+worker_freeze_result_tape(Tuplesortstate *state)
+{
+       Sharedsort *shared = state->shared;
+       TapeShare       output;
+
+       Assert(WORKER(state));
+       Assert(state->result_tape != -1);
+       Assert(state->memtupcount == 0);
+
+       /*
+        * Free most remaining memory, in case caller is sensitive to our holding
+        * on to it.  memtuples may not be a tiny merge heap at this point.
+        */
+       pfree(state->memtuples);
+       /* Be tidy */
+       state->memtuples = NULL;
+       state->memtupsize = 0;
+
+       /*
+        * Parallel worker requires result tape metadata, which is to be stored in
+        * shared memory for leader
+        */
+       LogicalTapeFreeze(state->tapeset, state->result_tape, &output);
+
+       /* Store properties of output tape, and update finished worker count */
+       SpinLockAcquire(&shared->mutex);
+       shared->tapes[state->worker] = output;
+       shared->workersFinished++;
+       SpinLockRelease(&shared->mutex);
+}
+
+/*
+ * worker_nomergeruns - dump memtuples in worker, without merging
+ *
+ * This called as an alternative to mergeruns() with a worker when no
+ * merging is required.
+ */
+static void
+worker_nomergeruns(Tuplesortstate *state)
+{
+       Assert(WORKER(state));
+       Assert(state->result_tape == -1);
+
+       state->result_tape = state->tp_tapenum[state->destTape];
+       worker_freeze_result_tape(state);
+}
+
+/*
+ * leader_takeover_tapes - create tapeset for leader from worker tapes
+ *
+ * So far, leader Tuplesortstate has performed no actual sorting.  By now, all
+ * sorting has occurred in workers, all of which must have already returned
+ * from tuplesort_performsort().
+ *
+ * When this returns, leader process is left in a state that is virtually
+ * indistinguishable from it having generated runs as a serial external sort
+ * might have.
+ */
+static void
+leader_takeover_tapes(Tuplesortstate *state)
+{
+       Sharedsort *shared = state->shared;
+       int                     nParticipants = state->nParticipants;
+       int                     workersFinished;
+       int                     j;
+
+       Assert(LEADER(state));
+       Assert(nParticipants >= 1);
+
+       SpinLockAcquire(&shared->mutex);
+       workersFinished = shared->workersFinished;
+       SpinLockRelease(&shared->mutex);
+
+       if (nParticipants != workersFinished)
+               elog(ERROR, "cannot take over tapes before all workers finish");
+
+       /*
+        * Create the tapeset from worker tapes, including a leader-owned tape at
+        * the end.  Parallel workers are far more expensive than logical tapes,
+        * so the number of tapes allocated here should never be excessive.
+        *
+        * We still have a leader tape, though it's not possible to write to it
+        * due to restrictions in the shared fileset infrastructure used by
+        * logtape.c.  It will never be written to in practice because
+        * randomAccess is disallowed for parallel sorts.
+        */
+       inittapestate(state, nParticipants + 1);
+       state->tapeset = LogicalTapeSetCreate(nParticipants + 1, shared->tapes,
+                                                                                 &shared->fileset, state->worker);
+
+       /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
+       state->currentRun = nParticipants;
+
+       /*
+        * Initialize variables of Algorithm D to be consistent with runs from
+        * workers having been generated in the leader.
+        *
+        * There will always be exactly 1 run per worker, and exactly one input
+        * tape per run, because workers always output exactly 1 run, even when
+        * there were no input tuples for workers to sort.
+        */
+       for (j = 0; j < state->maxTapes; j++)
+       {
+               /* One real run; no dummy runs for worker tapes */
+               state->tp_fib[j] = 1;
+               state->tp_runs[j] = 1;
+               state->tp_dummy[j] = 0;
+               state->tp_tapenum[j] = j;
+       }
+       /* Leader tape gets one dummy run, and no real runs */
+       state->tp_fib[state->tapeRange] = 0;
+       state->tp_runs[state->tapeRange] = 0;
+       state->tp_dummy[state->tapeRange] = 1;
+
+       state->Level = 1;
+       state->destTape = 0;
+
+       state->status = TSS_BUILDRUNS;
+}
+
 /*
  * Convenience routine to free a tuple previously loaded into sort memory
  */