* algorithm.
*
* See Knuth, volume 3, for more than you want to know about the external
- * sorting algorithm. We divide the input into sorted runs using replacement
- * selection, in the form of a priority tree implemented as a heap
- * (essentially his Algorithm 5.2.3H), then merge the runs using polyphase
- * merge, Knuth's Algorithm 5.4.2D. The logical "tapes" used by Algorithm D
- * are implemented by logtape.c, which avoids space wastage by recycling
- * disk space as soon as each block is read from its "tape".
- *
- * We do not form the initial runs using Knuth's recommended replacement
- * selection data structure (Algorithm 5.4.1R), because it uses a fixed
- * number of records in memory at all times. Since we are dealing with
- * tuples that may vary considerably in size, we want to be able to vary
- * the number of records kept in memory to ensure full utilization of the
- * allowed sort memory space. So, we keep the tuples in a variable-size
- * heap, with the next record to go out at the top of the heap. Like
- * Algorithm 5.4.1R, each record is stored with the run number that it
- * must go into, and we use (run number, key) as the ordering key for the
- * heap. When the run number at the top of the heap changes, we know that
- * no more records of the prior run are left in the heap.
+ * sorting algorithm. Historically, we divided the input into sorted runs
+ * using replacement selection, in the form of a priority tree implemented
+ * as a heap (essentially his Algorithm 5.2.3H), but now we always use
+ * quicksort for run generation. We merge the runs using polyphase merge,
+ * Knuth's Algorithm 5.4.2D. The logical "tapes" used by Algorithm D are
+ * implemented by logtape.c, which avoids space wastage by recycling disk
+ * space as soon as each block is read from its "tape".
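+ *
+ * As a rough sketch (simplified; the real control flow lives in
+ * puttuple_common() and dumptuples()), run generation now proceeds as:
+ *
+ *     while (more input)
+ *     {
+ *         add tuple to memtuples[];
+ *         if (LACKMEM(state))
+ *         {
+ *             tuplesort_sort_memtuples(state);    (quicksort)
+ *             write memtuples[] to the current tape as one run;
+ *             selectnewtape(state);    (per Algorithm D)
+ *         }
+ *     }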
*
* The approximate amount of memory allowed for any one sort operation
* is specified in kilobytes by the caller (most pass work_mem). Initially,
* we absorb tuples and simply store them in an unsorted array as long as
* we haven't exceeded workMem. If we reach the end of the input without
* exceeding workMem, we sort the array using qsort() and subsequently return
* tuples just by scanning the tuple array sequentially. If we do exceed
- * workMem, we construct a heap using Algorithm H and begin to emit tuples
- * into sorted runs in temporary tapes, emitting just enough tuples at each
- * step to get back within the workMem limit. Whenever the run number at
- * the top of the heap changes, we begin a new run with a new output tape
- * (selected per Algorithm D). After the end of the input is reached,
- * we dump out remaining tuples in memory into a final run (or two),
+ * workMem, we begin to emit tuples into sorted runs in temporary tapes.
+ * When tuples are dumped in batch after quicksorting, we begin a new run
+ * with a new output tape (selected per Algorithm D). After the end of the
+ * input is reached, we dump out remaining tuples in memory into a final run,
* then merge the runs using Algorithm D.
*
* When merging runs, we use a heap containing just the frontmost tuple from
- * each source run; we repeatedly output the smallest tuple and insert the
- * next tuple from its source tape (if any). When the heap empties, the merge
- * is complete. The basic merge algorithm thus needs very little memory ---
- * only M tuples for an M-way merge, and M is constrained to a small number.
- * However, we can still make good use of our full workMem allocation by
- * pre-reading additional tuples from each source tape. Without prereading,
- * our access pattern to the temporary file would be very erratic; on average
- * we'd read one block from each of M source tapes during the same time that
- * we're writing M blocks to the output tape, so there is no sequentiality of
- * access at all, defeating the read-ahead methods used by most Unix kernels.
- * Worse, the output tape gets written into a very random sequence of blocks
- * of the temp file, ensuring that things will be even worse when it comes
- * time to read that tape. A straightforward merge pass thus ends up doing a
- * lot of waiting for disk seeks. We can improve matters by prereading from
- * each source tape sequentially, loading about workMem/M bytes from each tape
- * in turn. Then we run the merge algorithm, writing but not reading until
- * one of the preloaded tuple series runs out. Then we switch back to preread
- * mode, fill memory again, and repeat. This approach helps to localize both
- * read and write accesses.
+ * each source run; we repeatedly output the smallest tuple and replace it
+ * with the next tuple from its source tape (if any). When the heap empties,
+ * the merge is complete. The basic merge algorithm thus needs very little
+ * memory --- only M tuples for an M-way merge, and M is constrained to a
+ * small number. However, we can still make good use of our full workMem
+ * allocation by pre-reading additional blocks from each source tape. Without
+ * prereading, our access pattern to the temporary file would be very erratic;
+ * on average we'd read one block from each of M source tapes during the same
+ * time that we're writing M blocks to the output tape, so there is no
+ * sequentiality of access at all, defeating the read-ahead methods used by
+ * most Unix kernels. Worse, the output tape gets written into a very random
+ * sequence of blocks of the temp file, ensuring that things will be even
+ * worse when it comes time to read that tape. A straightforward merge pass
+ * thus ends up doing a lot of waiting for disk seeks. We can improve matters
+ * by prereading from each source tape sequentially, loading about workMem/M
+ * bytes from each tape in turn, and making the sequential blocks immediately
+ * available for reuse. This approach helps to localize both read and write
+ * accesses. The pre-reading is handled by logtape.c; we just tell it how
+ * much memory to use for the buffers.
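+ *
+ * In outline (a sketch only; the real loops are mergeonerun() and
+ * tuplesort_gettuple_common()), each merge step works like:
+ *
+ *     while (state->memtupcount > 0)
+ *     {
+ *         write out memtuples[0], noting its source tape srcTape;
+ *         if (mergereadnext(state, srcTape, &newtup))
+ *             tuplesort_heap_replace_top(state, &newtup);
+ *         else
+ *             tuplesort_heap_delete_top(state);
+ *     }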
*
* When the caller requests random access to the sort result, we form
* the final sorted run on a logical tape which is then "frozen", so
* code we determine the number of tapes M on the basis of workMem: we want
* workMem/M to be large enough that we read a fair amount of data each time
* we preread from a tape, so as to maintain the locality of access described
- * above. Nonetheless, with large workMem we can have many tapes.
+ * above. Nonetheless, with large workMem we can have many tapes (but not
+ * too many -- see the comments in tuplesort_merge_order).
*
+ * This module supports parallel sorting. Parallel sorts involve coordination
+ * among one or more worker processes, and a leader process, each with its own
+ * tuplesort state. The leader process (or, more accurately, the
+ * Tuplesortstate associated with a leader process) creates a full tapeset
+ * consisting of worker tapes, each holding one run to merge: one run for
+ * every worker process. These runs are then merged. Worker processes are
+ * guaranteed to produce exactly one output run from their partial input.
*
- * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
#include <limits.h>
+#include "access/hash.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
#include "catalog/index.h"
+#include "catalog/pg_am.h"
#include "commands/tablespace.h"
#include "executor/executor.h"
#include "miscadmin.h"
#define DATUM_SORT 2
#define CLUSTER_SORT 3
+/* Sort parallel code from state for sort__start probes */
+#define PARALLEL_SORT(state) ((state)->shared == NULL ? 0 : \
+ (state)->worker >= 0 ? 1 : 2)
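+/* PARALLEL_SORT is 0 for serial sorts, 1 within workers, 2 in the leader */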
+
/* GUC variables */
#ifdef TRACE_SORT
bool trace_sort = false;
* The objects we actually sort are SortTuple structs. These contain
* a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
* which is a separate palloc chunk --- we assume it is just one chunk and
- * can be freed by a simple pfree(). SortTuples also contain the tuple's
- * first key column in Datum/nullflag format, and an index integer.
+ * can be freed by a simple pfree() (except during merge, when we use a
+ * simple slab allocator). SortTuples also contain the tuple's first key
+ * column in Datum/nullflag format, and an index integer.
*
* Storing the first key column lets us save heap_getattr or index_getattr
* calls during tuple comparisons. We could extract and save all the key
* case where the first key determines the comparison result. Note that
* for a pass-by-reference datatype, datum1 points into the "tuple" storage.
*
+ * There is one special case: when the sort support infrastructure provides an
+ * "abbreviated key" representation, where the key is (typically) a pass by
+ * value proxy for a pass by reference type. In this case, the abbreviated key
+ * is stored in datum1 in place of the actual first key column.
+ *
* When sorting single Datums, the data value is represented directly by
- * datum1/isnull1. If the datatype is pass-by-reference and isnull1 is false,
- * then datum1 points to a separately palloc'd data value that is also pointed
- * to by the "tuple" pointer; otherwise "tuple" is NULL.
+ * datum1/isnull1 for pass-by-value types (or null values). If the datatype is
+ * pass-by-reference and isnull1 is false, then "tuple" points to a separately
+ * palloc'd data value, otherwise "tuple" is NULL. The value of datum1 is then
+ * either the same pointer as "tuple", or is an abbreviated key value as
+ * described above. Accordingly, "tuple" is always used in preference to
+ * datum1 as the authoritative value for pass-by-reference cases.
*
- * While building initial runs, tupindex holds the tuple's run number. During
- * merge passes, we re-use it to hold the input tape number that each tuple in
- * the heap was read from, or to hold the index of the next tuple pre-read
- * from the same tape in the case of pre-read entries. tupindex goes unused
- * if the sort occurs entirely in memory.
+ * tupindex holds the input tape number that each tuple in the heap was read
+ * from during merge passes.
*/
typedef struct
{
- void *tuple; /* the tuple proper */
+ void *tuple; /* the tuple itself */
Datum datum1; /* value of first key column */
bool isnull1; /* is first key column NULL? */
int tupindex; /* see notes above */
} SortTuple;
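+
+/*
+ * An illustrative sketch (condensed from the comparetup_* routines): the
+ * cached datum1/isnull1 pair is compared first, and the tuple proper is
+ * only deformed when that comparison ties.
+ *
+ *     compare = ApplySortComparator(a->datum1, a->isnull1,
+ *                                   b->datum1, b->isnull1,
+ *                                   &state->sortKeys[0]);
+ *     if (compare != 0)
+ *         return compare;
+ *     (else deform the tuples and compare the remaining key columns)
+ */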
+/*
+ * During merge, we use a pre-allocated set of fixed-size slots to hold
+ * tuples, to avoid palloc/pfree overhead.
+ *
+ * Merge doesn't require a lot of memory, so we can afford to waste some
+ * by using gratuitously-sized slots. If a tuple is larger than 1 kB, it
+ * is palloc'd instead; at that size the palloc() overhead is no longer
+ * significant.
+ *
+ * 'nextfree' is valid when this chunk is in the free list. When in use, the
+ * slot holds a tuple.
+ */
+#define SLAB_SLOT_SIZE 1024
+
+typedef union SlabSlot
+{
+ union SlabSlot *nextfree;
+ char buffer[SLAB_SLOT_SIZE];
+} SlabSlot;
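+
+/*
+ * Slot allocation then amounts to popping the free list, falling back to
+ * palloc for oversized tuples; a condensed sketch of readtup_alloc():
+ *
+ *     if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
+ *         return MemoryContextAlloc(state->sortcontext, tuplen);
+ *     buf = state->slabFreeHead;
+ *     state->slabFreeHead = buf->nextfree;
+ *     return buf;
+ */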
/*
* Possible states of a Tuplesort object. These denote the states that
* Parameters for calculation of number of tapes to use --- see inittapes()
* and tuplesort_merge_order().
*
- * In this calculation we assume that each tape will cost us about 3 blocks
- * worth of buffer space (which is an underestimate for very large data
- * volumes, but it's probably close enough --- see logtape.c).
+ * In this calculation we assume that each tape will cost us about one
+ * block's worth of buffer space. This ignores the overhead of all the
+ * other data structures needed for each tape, but it's probably close
+ * enough.
*
* MERGE_BUFFER_SIZE is how much data we'd like to read from each input
* tape during a preread cycle (see discussion at top of file).
*/
#define MINORDER 6 /* minimum merge order */
-#define TAPE_BUFFER_OVERHEAD (BLCKSZ * 3)
+#define MAXORDER 500 /* maximum merge order */
+#define TAPE_BUFFER_OVERHEAD BLCKSZ
#define MERGE_BUFFER_SIZE (BLCKSZ * 32)
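+/* e.g. with the default BLCKSZ of 8192 bytes, MERGE_BUFFER_SIZE is 256 kB */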
typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
- Tuplesortstate *state);
+ Tuplesortstate *state);
/*
* Private state of a Tuplesort operation.
* tuples to return? */
bool boundUsed; /* true if we made use of a bounded heap */
int bound; /* if bounded, the maximum number of tuples */
+ bool tuples; /* Can SortTuple.tuple ever be set? */
int64 availMem; /* remaining memory available, in bytes */
int64 allowedMem; /* total memory allowed, in bytes */
int maxTapes; /* number of tapes (Knuth's T) */
int tapeRange; /* maxTapes-1 (Knuth's P) */
- MemoryContext sortcontext; /* memory context holding all sort data */
+ MemoryContext sortcontext; /* memory context holding most sort data */
+ MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
/*
/*
* Function to write a stored tuple onto tape. The representation of the
* tuple on tape need not be the same as it is in memory; requirements on
- * the tape representation are given below. After writing the tuple,
- * pfree() the out-of-line data (not the SortTuple struct!), and increase
- * state->availMem by the amount of memory space thereby released.
+ * the tape representation are given below. Unless the slab allocator is
+ * used, after writing the tuple, pfree() the out-of-line data (not the
+ * SortTuple struct!), and increase state->availMem by the amount of
+ * memory space thereby released.
*/
void (*writetup) (Tuplesortstate *state, int tapenum,
- SortTuple *stup);
+ SortTuple *stup);
/*
* Function to read a stored tuple from tape back into memory. 'len' is
- * the already-read length of the stored tuple. Create a palloc'd copy,
- * initialize tuple/datum1/isnull1 in the target SortTuple struct, and
- * decrease state->availMem by the amount of memory space consumed.
+ * the already-read length of the stored tuple. The tuple is allocated
+ * from the slab memory arena, or is palloc'd, see readtup_alloc().
*/
void (*readtup) (Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len);
-
- /*
- * Function to reverse the sort direction from its current state. (We
- * could dispense with this if we wanted to enforce that all variants
- * represent the sort key information alike.)
- */
- void (*reversedirection) (Tuplesortstate *state);
+ int tapenum, unsigned int len);
/*
* This array holds the tuples now in sort memory. If we are in state
* INITIAL, the tuples are in no particular order; if we are in state
* SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
* and FINALMERGE, the tuples are organized in "heap" order per Algorithm
- * H. (Note that memtupcount only counts the tuples that are part of the
- * heap --- during merge passes, memtuples[] entries beyond tapeRange are
- * never in the heap and are used to hold pre-read tuples.) In state
- * SORTEDONTAPE, the array is not used.
+ * H. In state SORTEDONTAPE, the array is not used.
*/
SortTuple *memtuples; /* array of SortTuple structs */
int memtupcount; /* number of tuples currently present */
bool growmemtuples; /* memtuples' growth still underway? */
/*
- * While building initial runs, this is the current output run number
- * (starting at 0). Afterwards, it is the number of initial runs we made.
+ * Memory for tuples is sometimes allocated using a simple slab allocator,
+ * rather than with palloc(). Currently, we switch to slab allocation
+ * when we start merging. Merging only needs to keep a small, fixed
+ * number of tuples in memory at any time, so we can avoid the
+ * palloc/pfree overhead by recycling a fixed number of fixed-size slots
+ * to hold the tuples.
+ *
+ * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
+ * slots. The allocation is sized to have one slot per tape, plus one
+ * additional slot. We need that many slots to hold all the tuples kept
+ * in the heap during merge, plus the one we have last returned from the
+ * sort, with tuplesort_gettuple.
+ *
+ * Initially, all the slots are kept in a linked list of free slots. When
+ * a tuple is read from a tape, it is put in the next available slot, if
+ * it fits. If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
+ * instead.
+ *
+ * When we're done processing a tuple, we return the slot back to the free
+ * list, or pfree() it if it was palloc'd. We know that a tuple was
+ * allocated from the slab if its pointer value is between
+ * slabMemoryBegin and slabMemoryEnd.
+ *
+ * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
+ * tracking memory usage is not used.
+ */
+ bool slabAllocatorUsed;
+
+ char *slabMemoryBegin; /* beginning of slab memory arena */
+ char *slabMemoryEnd; /* end of slab memory arena */
+ SlabSlot *slabFreeHead; /* head of free list */
+
+ /* Buffer size to use for reading input tapes, during merge. */
+ size_t read_buffer_size;
+
+ /*
+ * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
+ * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
+ * modes), we remember the tuple in 'lastReturnedTuple', so that we can
+ * recycle the memory on next gettuple call.
+ */
+ void *lastReturnedTuple;
+
+ /*
+ * While building initial runs, this is the current output run number.
+ * Afterwards, it is the number of initial runs we made.
*/
int currentRun;
*/
/*
- * These variables are only used during merge passes. mergeactive[i] is
- * true if we are reading an input run from (actual) tape number i and
- * have not yet exhausted that run. mergenext[i] is the memtuples index
- * of the next pre-read tuple (next to be loaded into the heap) for tape
- * i, or 0 if we are out of pre-read tuples. mergelast[i] similarly
- * points to the last pre-read tuple from each tape. mergeavailslots[i]
- * is the number of unused memtuples[] slots reserved for tape i, and
- * mergeavailmem[i] is the amount of unused space allocated for tape i.
- * mergefreelist and mergefirstfree keep track of unused locations in the
- * memtuples[] array. The memtuples[].tupindex fields link together
- * pre-read tuples for each tape as well as recycled locations in
- * mergefreelist. It is OK to use 0 as a null link in these lists, because
- * memtuples[0] is part of the merge heap and is never a pre-read tuple.
+ * This variable is only used during merge passes. mergeactive[i] is true
+ * if we are reading an input run from (actual) tape number i and have not
+ * yet exhausted that run.
*/
bool *mergeactive; /* active input run source? */
- int *mergenext; /* first preread tuple for each source */
- int *mergelast; /* last preread tuple for each source */
- int *mergeavailslots; /* slots left for prereading each tape */
- int64 *mergeavailmem; /* availMem for prereading each tape */
- int mergefreelist; /* head of freelist of recycled slots */
- int mergefirstfree; /* first slot never used in this merge */
/*
* Variables for Algorithm D. Note that destTape is a "logical" tape
bool markpos_eof; /* saved "eof_reached" */
/*
- * These variables are specific to the MinimalTuple case; they are set by
- * tuplesort_begin_heap and used only by the MinimalTuple routines.
+ * These variables are used during parallel sorting.
+ *
+ * worker is our worker identifier. Follows the general convention that
+ * a -1 value relates to a leader tuplesort, and values >= 0 to worker
+ * tuplesorts. (-1 can also be a serial tuplesort.)
+ *
+ * shared is mutable shared memory state, which is used to coordinate
+ * parallel sorts.
+ *
+ * nParticipants is the number of worker Tuplesortstates known by the
+ * leader to have actually been launched, which implies that they must
+ * finish a run that the leader can merge. Typically this includes a worker
+ * state held by the leader process itself. Set in the leader Tuplesortstate
+ * only.
+ */
+ int worker;
+ Sharedsort *shared;
+ int nParticipants;
+
+ /*
+ * The sortKeys variable is used by every case other than the hash index
+ * case; it is set by tuplesort_begin_xxx. tupDesc is only used by the
+ * MinimalTuple and CLUSTER routines, though.
*/
TupleDesc tupDesc;
SortSupport sortKeys; /* array of length nKeys */
*/
SortSupport onlyKey;
+ /*
+ * Additional state for managing "abbreviated key" sortsupport routines
+ * (which currently may be used by all cases except the hash index case).
+ * Tracks the intervals at which the optimization's effectiveness is
+ * tested.
+ */
+ int64 abbrevNext; /* Tuple # at which to next check
+ * applicability */
+
/*
* These variables are specific to the CLUSTER case; they are set by
- * tuplesort_begin_cluster. Note CLUSTER also uses tupDesc and
- * indexScanKey.
+ * tuplesort_begin_cluster.
*/
IndexInfo *indexInfo; /* info about index being used for reference */
EState *estate; /* for evaluating index expressions */
Relation indexRel; /* index being built */
/* These are specific to the index_btree subcase: */
- ScanKey indexScanKey;
bool enforceUnique; /* complain if we find duplicate tuples */
/* These are specific to the index_hash subcase: */
- uint32 hash_mask; /* mask for sortable part of hash code */
+ uint32 high_mask; /* masks for sortable part of hash code */
+ uint32 low_mask;
+ uint32 max_buckets;
/*
* These variables are specific to the Datum case; they are set by
* tuplesort_begin_datum and used only by the DatumTuple routines.
*/
Oid datumType;
- /* we need typelen and byval in order to know how to copy the Datums. */
+ /* we need typelen in order to know how to copy the Datums. */
int datumTypeLen;
- bool datumTypeByVal;
/*
* Resource snapshot for time of sort start.
#endif
};
+/*
+ * Private mutable state of tuplesort-parallel-operation. This is allocated
+ * in shared memory.
+ */
+struct Sharedsort
+{
+ /* mutex protects all fields prior to tapes */
+ slock_t mutex;
+
+ /*
+ * currentWorker generates ordinal identifier numbers for parallel sort
+ * workers. These start from 0, and are always gapless.
+ *
+ * Workers increment workersFinished to indicate having finished. If this
+ * is equal to state.nParticipants within the leader, the leader is ready to
+ * merge worker runs.
+ */
+ int currentWorker;
+ int workersFinished;
+
+ /* Temporary file space */
+ SharedFileSet fileset;
+
+ /* Size of tapes flexible array */
+ int nTapes;
+
+ /*
+ * Tapes array used by workers to report back information needed by the
+ * leader to concatenate all worker tapes into one for merging
+ */
+ TapeShare tapes[FLEXIBLE_ARRAY_MEMBER];
+};
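+
+/*
+ * Workers claim ordinal identifiers by bumping currentWorker under the
+ * mutex; a sketch of worker_get_identifier():
+ *
+ *     SpinLockAcquire(&shared->mutex);
+ *     worker = shared->currentWorker++;
+ *     SpinLockRelease(&shared->mutex);
+ */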
+
+/*
+ * Is the given tuple allocated from the slab memory arena?
+ */
+#define IS_SLAB_SLOT(state, tuple) \
+ ((char *) (tuple) >= (state)->slabMemoryBegin && \
+ (char *) (tuple) < (state)->slabMemoryEnd)
+
+/*
+ * Return the given tuple to the slab memory free list, or free it
+ * if it was palloc'd.
+ */
+#define RELEASE_SLAB_SLOT(state, tuple) \
+ do { \
+ SlabSlot *buf = (SlabSlot *) tuple; \
+ \
+ if (IS_SLAB_SLOT((state), buf)) \
+ { \
+ buf->nextfree = (state)->slabFreeHead; \
+ (state)->slabFreeHead = buf; \
+ } else \
+ pfree(buf); \
+ } while(0)
+
#define COMPARETUP(state,a,b) ((*(state)->comparetup) (a, b, state))
#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
#define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup))
#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
-#define REVERSEDIRECTION(state) ((*(state)->reversedirection) (state))
-#define LACKMEM(state) ((state)->availMem < 0)
+#define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
#define USEMEM(state,amt) ((state)->availMem -= (amt))
#define FREEMEM(state,amt) ((state)->availMem += (amt))
+#define SERIAL(state) ((state)->shared == NULL)
+#define WORKER(state) ((state)->shared && (state)->worker != -1)
+#define LEADER(state) ((state)->shared && (state)->worker == -1)
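+/* Exactly one of SERIAL, WORKER and LEADER is true of any Tuplesortstate */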
/*
* NOTES about on-tape representation of tuples:
* rather than the originally-requested size. This is important since
* palloc can add substantial overhead. It's not a complete answer since
* we won't count any wasted space in palloc allocation blocks, but it's
- * a lot better than what we were doing before 7.3.
+ * a lot better than what we were doing before 7.3. As of 9.6, a
+ * separate memory context is used for caller-passed tuples. Resetting
+ * it at certain key increments significantly ameliorates fragmentation.
+ * Note that this places a responsibility on readtup and copytup routines
+ * to use the right memory context for these tuples (and to not use the
+ * reset context for anything whose lifetime needs to span multiple
+ * external sort runs).
*/
/* When using this macro, beware of double evaluation of len */
} while(0)
-static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess);
+static Tuplesortstate *tuplesort_begin_common(int workMem,
+ SortCoordinate coordinate,
+ bool randomAccess);
static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
-static void inittapes(Tuplesortstate *state);
+static bool consider_abort_common(Tuplesortstate *state);
+static void inittapes(Tuplesortstate *state, bool mergeruns);
+static void inittapestate(Tuplesortstate *state, int maxTapes);
static void selectnewtape(Tuplesortstate *state);
+static void init_slab_allocator(Tuplesortstate *state, int numSlots);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
-static void mergepreread(Tuplesortstate *state);
-static void mergeprereadone(Tuplesortstate *state, int srcTape);
+static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void make_bounded_heap(Tuplesortstate *state);
static void sort_bounded_heap(Tuplesortstate *state);
-static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
- int tupleindex, bool checkIndex);
-static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
+static void tuplesort_sort_memtuples(Tuplesortstate *state);
+static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
+static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
+static void tuplesort_heap_delete_top(Tuplesortstate *state);
+static void reversedirection(Tuplesortstate *state);
static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
static void markrunend(Tuplesortstate *state, int tapenum);
+static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
static int comparetup_heap(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
SortTuple *stup);
static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
-static void reversedirection_heap(Tuplesortstate *state);
static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
SortTuple *stup);
static void readtup_index(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
-static void reversedirection_index_btree(Tuplesortstate *state);
-static void reversedirection_index_hash(Tuplesortstate *state);
static int comparetup_datum(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
SortTuple *stup);
static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
-static void reversedirection_datum(Tuplesortstate *state);
+static int worker_get_identifier(Tuplesortstate *state);
+static void worker_freeze_result_tape(Tuplesortstate *state);
+static void worker_nomergeruns(Tuplesortstate *state);
+static void leader_takeover_tapes(Tuplesortstate *state);
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
/*
*/
static Tuplesortstate *
-tuplesort_begin_common(int workMem, bool randomAccess)
+tuplesort_begin_common(int workMem, SortCoordinate coordinate,
+ bool randomAccess)
{
Tuplesortstate *state;
MemoryContext sortcontext;
+ MemoryContext tuplecontext;
MemoryContext oldcontext;
+ /* See leader_takeover_tapes() remarks on randomAccess support */
+ if (coordinate && randomAccess)
+ elog(ERROR, "random access disallowed under parallel sort");
+
/*
* Create a working memory context for this sort operation. All data
* needed by the sort will live inside this context.
*/
sortcontext = AllocSetContextCreate(CurrentMemoryContext,
- "TupleSort",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
+ "TupleSort main",
+ ALLOCSET_DEFAULT_SIZES);
+
+ /*
+ * Caller tuple (e.g. IndexTuple) memory context.
+ *
+ * A dedicated child context used exclusively for caller-passed tuples
+ * eases memory management. Resetting at key points reduces
+ * fragmentation. Note that the memtuples array of SortTuples is allocated
+ * in the parent context, not this context, because there is no need to
+ * free memtuples early.
+ */
+ tuplecontext = AllocSetContextCreate(sortcontext,
+ "Caller tuples",
+ ALLOCSET_DEFAULT_SIZES);
/*
* Make the Tuplesortstate within the per-sort context. This way, we
state->status = TSS_INITIAL;
state->randomAccess = randomAccess;
state->bounded = false;
+ state->tuples = true;
state->boundUsed = false;
- state->allowedMem = workMem * (int64) 1024;
+
+ /*
+ * workMem is forced to be at least 64KB, the current minimum valid value
+ * for the work_mem GUC. This is a defense against parallel sort callers
+ * that divide out memory among many workers in a way that leaves each
+ * with very little memory.
+ */
+ state->allowedMem = Max(workMem, 64) * (int64) 1024;
state->availMem = state->allowedMem;
state->sortcontext = sortcontext;
+ state->tuplecontext = tuplecontext;
state->tapeset = NULL;
state->memtupcount = 0;
- state->memtupsize = 1024; /* initial guess */
+
+ /*
+ * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
+ * see comments in grow_memtuples().
+ */
+ state->memtupsize = Max(1024,
+ ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1);
+
state->growmemtuples = true;
+ state->slabAllocatorUsed = false;
state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
USEMEM(state, GetMemoryChunkSpace(state->memtuples));
state->result_tape = -1; /* flag that result tape has not been formed */
+ /*
+ * Initialize parallel-related state based on coordination information
+ * from caller
+ */
+ if (!coordinate)
+ {
+ /* Serial sort */
+ state->shared = NULL;
+ state->worker = -1;
+ state->nParticipants = -1;
+ }
+ else if (coordinate->isWorker)
+ {
+ /* Parallel worker produces exactly one final run from all input */
+ state->shared = coordinate->sharedsort;
+ state->worker = worker_get_identifier(state);
+ state->nParticipants = -1;
+ }
+ else
+ {
+ /* Parallel leader state only used for final merge */
+ state->shared = coordinate->sharedsort;
+ state->worker = -1;
+ state->nParticipants = coordinate->nParticipants;
+ Assert(state->nParticipants >= 1);
+ }
+
MemoryContextSwitchTo(oldcontext);
return state;
int nkeys, AttrNumber *attNums,
Oid *sortOperators, Oid *sortCollations,
bool *nullsFirstFlags,
- int workMem, bool randomAccess)
+ int workMem, SortCoordinate coordinate, bool randomAccess)
{
- Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ randomAccess);
MemoryContext oldcontext;
int i;
false, /* no unique check */
nkeys,
workMem,
- randomAccess);
+ randomAccess,
+ PARALLEL_SORT(state));
state->comparetup = comparetup_heap;
state->copytup = copytup_heap;
state->writetup = writetup_heap;
state->readtup = readtup_heap;
- state->reversedirection = reversedirection_heap;
state->tupDesc = tupDesc; /* assume we need not copy tupDesc */
+ state->abbrevNext = 10;
/* Prepare SortSupport data for each column */
state->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData));
sortKey->ssup_collation = sortCollations[i];
sortKey->ssup_nulls_first = nullsFirstFlags[i];
sortKey->ssup_attno = attNums[i];
+ /* Convey if abbreviation optimization is applicable in principle */
+ sortKey->abbreviate = (i == 0);
PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
}
- if (nkeys == 1)
+ /*
+ * The "onlyKey" optimization cannot be used with abbreviated keys, since
+ * tie-breaker comparisons may be required. Typically, the optimization
+ * is only of value to pass-by-value types anyway, whereas abbreviated
+ * keys are typically only of value to pass-by-reference types.
+ */
+ if (nkeys == 1 && !state->sortKeys->abbrev_converter)
state->onlyKey = state->sortKeys;
MemoryContextSwitchTo(oldcontext);
Tuplesortstate *
tuplesort_begin_cluster(TupleDesc tupDesc,
Relation indexRel,
- int workMem, bool randomAccess)
+ int workMem,
+ SortCoordinate coordinate, bool randomAccess)
{
- Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ randomAccess);
+ BTScanInsert indexScanKey;
MemoryContext oldcontext;
+ int i;
Assert(indexRel->rd_rel->relam == BTREE_AM_OID);
workMem, randomAccess ? 't' : 'f');
#endif
- state->nKeys = RelationGetNumberOfAttributes(indexRel);
+ state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT,
false, /* no unique check */
state->nKeys,
workMem,
- randomAccess);
+ randomAccess,
+ PARALLEL_SORT(state));
state->comparetup = comparetup_cluster;
state->copytup = copytup_cluster;
state->writetup = writetup_cluster;
state->readtup = readtup_cluster;
- state->reversedirection = reversedirection_index_btree;
+ state->abbrevNext = 10;
state->indexInfo = BuildIndexInfo(indexRel);
- state->indexScanKey = _bt_mkscankey_nodata(indexRel);
state->tupDesc = tupDesc; /* assume we need not copy tupDesc */
+ indexScanKey = _bt_mkscankey(indexRel, NULL);
+
if (state->indexInfo->ii_Expressions != NULL)
{
TupleTableSlot *slot;
* scantuple has to point to that slot, too.
*/
state->estate = CreateExecutorState();
- slot = MakeSingleTupleTableSlot(tupDesc);
+ slot = MakeSingleTupleTableSlot(tupDesc, &TTSOpsVirtual);
econtext = GetPerTupleExprContext(state->estate);
econtext->ecxt_scantuple = slot;
}
+ /* Prepare SortSupport data for each column */
+ state->sortKeys = (SortSupport) palloc0(state->nKeys *
+ sizeof(SortSupportData));
+
+ for (i = 0; i < state->nKeys; i++)
+ {
+ SortSupport sortKey = state->sortKeys + i;
+ ScanKey scanKey = indexScanKey->scankeys + i;
+ int16 strategy;
+
+ sortKey->ssup_cxt = CurrentMemoryContext;
+ sortKey->ssup_collation = scanKey->sk_collation;
+ sortKey->ssup_nulls_first =
+ (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
+ sortKey->ssup_attno = scanKey->sk_attno;
+ /* Convey if abbreviation optimization is applicable in principle */
+ sortKey->abbreviate = (i == 0);
+
+ AssertState(sortKey->ssup_attno != 0);
+
+ strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
+ BTGreaterStrategyNumber : BTLessStrategyNumber;
+
+ PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
+ }
+
+ pfree(indexScanKey);
+
MemoryContextSwitchTo(oldcontext);
return state;
tuplesort_begin_index_btree(Relation heapRel,
Relation indexRel,
bool enforceUnique,
- int workMem, bool randomAccess)
+ int workMem,
+ SortCoordinate coordinate,
+ bool randomAccess)
{
- Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ randomAccess);
+ BTScanInsert indexScanKey;
MemoryContext oldcontext;
+ int i;
oldcontext = MemoryContextSwitchTo(state->sortcontext);
workMem, randomAccess ? 't' : 'f');
#endif
- state->nKeys = RelationGetNumberOfAttributes(indexRel);
+ state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
TRACE_POSTGRESQL_SORT_START(INDEX_SORT,
enforceUnique,
state->nKeys,
workMem,
- randomAccess);
+ randomAccess,
+ PARALLEL_SORT(state));
state->comparetup = comparetup_index_btree;
state->copytup = copytup_index;
state->writetup = writetup_index;
state->readtup = readtup_index;
- state->reversedirection = reversedirection_index_btree;
+ state->abbrevNext = 10;
state->heapRel = heapRel;
state->indexRel = indexRel;
- state->indexScanKey = _bt_mkscankey_nodata(indexRel);
state->enforceUnique = enforceUnique;
+ indexScanKey = _bt_mkscankey(indexRel, NULL);
+
+ /* Prepare SortSupport data for each column */
+ state->sortKeys = (SortSupport) palloc0(state->nKeys *
+ sizeof(SortSupportData));
+
+ for (i = 0; i < state->nKeys; i++)
+ {
+ SortSupport sortKey = state->sortKeys + i;
+ ScanKey scanKey = indexScanKey->scankeys + i;
+ int16 strategy;
+
+ sortKey->ssup_cxt = CurrentMemoryContext;
+ sortKey->ssup_collation = scanKey->sk_collation;
+ sortKey->ssup_nulls_first =
+ (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
+ sortKey->ssup_attno = scanKey->sk_attno;
+ /* Convey if abbreviation optimization is applicable in principle */
+ sortKey->abbreviate = (i == 0);
+
+ AssertState(sortKey->ssup_attno != 0);
+
+ strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
+ BTGreaterStrategyNumber : BTLessStrategyNumber;
+
+ PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
+ }
+
+ pfree(indexScanKey);
+
MemoryContextSwitchTo(oldcontext);
return state;
Tuplesortstate *
tuplesort_begin_index_hash(Relation heapRel,
Relation indexRel,
- uint32 hash_mask,
- int workMem, bool randomAccess)
+ uint32 high_mask,
+ uint32 low_mask,
+ uint32 max_buckets,
+ int workMem,
+ SortCoordinate coordinate,
+ bool randomAccess)
{
- Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ randomAccess);
MemoryContext oldcontext;
oldcontext = MemoryContextSwitchTo(state->sortcontext);
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG,
- "begin index sort: hash_mask = 0x%x, workMem = %d, randomAccess = %c",
- hash_mask,
+ "begin index sort: high_mask = 0x%x, low_mask = 0x%x, "
+ "max_buckets = 0x%x, workMem = %d, randomAccess = %c",
+ high_mask,
+ low_mask,
+ max_buckets,
workMem, randomAccess ? 't' : 'f');
#endif
state->copytup = copytup_index;
state->writetup = writetup_index;
state->readtup = readtup_index;
- state->reversedirection = reversedirection_index_hash;
state->heapRel = heapRel;
state->indexRel = indexRel;
- state->hash_mask = hash_mask;
+ state->high_mask = high_mask;
+ state->low_mask = low_mask;
+ state->max_buckets = max_buckets;
MemoryContextSwitchTo(oldcontext);
Tuplesortstate *
tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
- bool nullsFirstFlag,
- int workMem, bool randomAccess)
+ bool nullsFirstFlag, int workMem,
+ SortCoordinate coordinate, bool randomAccess)
{
- Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ randomAccess);
MemoryContext oldcontext;
int16 typlen;
bool typbyval;
false, /* no unique check */
1,
workMem,
- randomAccess);
+ randomAccess,
+ PARALLEL_SORT(state));
state->comparetup = comparetup_datum;
state->copytup = copytup_datum;
state->writetup = writetup_datum;
state->readtup = readtup_datum;
- state->reversedirection = reversedirection_datum;
+ state->abbrevNext = 10;
state->datumType = datumType;
+ /* lookup necessary attributes of the datum type */
+ get_typlenbyval(datumType, &typlen, &typbyval);
+ state->datumTypeLen = typlen;
+ state->tuples = !typbyval;
+
/* Prepare SortSupport data */
- state->onlyKey = (SortSupport) palloc0(sizeof(SortSupportData));
+ state->sortKeys = (SortSupport) palloc0(sizeof(SortSupportData));
+
+ state->sortKeys->ssup_cxt = CurrentMemoryContext;
+ state->sortKeys->ssup_collation = sortCollation;
+ state->sortKeys->ssup_nulls_first = nullsFirstFlag;
- state->onlyKey->ssup_cxt = CurrentMemoryContext;
- state->onlyKey->ssup_collation = sortCollation;
- state->onlyKey->ssup_nulls_first = nullsFirstFlag;
+ /*
+ * Abbreviation is possible here only for by-reference types. In theory,
+ * a pass-by-value datatype could have an abbreviated form that is cheaper
+ * to compare. In a tuple sort, we could support that, because we can
+ * always extract the original datum from the tuple if needed. Here, we
+ * can't, because a datum sort only stores a single copy of the datum; the
+ * "tuple" field of each SortTuple is NULL.
+ */
+ state->sortKeys->abbreviate = !typbyval;
- PrepareSortSupportFromOrderingOp(sortOperator, state->onlyKey);
+ PrepareSortSupportFromOrderingOp(sortOperator, state->sortKeys);
- /* lookup necessary attributes of the datum type */
- get_typlenbyval(datumType, &typlen, &typbyval);
- state->datumTypeLen = typlen;
- state->datumTypeByVal = typbyval;
+ /*
+ * The "onlyKey" optimization cannot be used with abbreviated keys, since
+ * tie-breaker comparisons may be required. Typically, the optimization
+ * is only of value to pass-by-value types anyway, whereas abbreviated
+ * keys are typically only of value to pass-by-reference types.
+ */
+ if (!state->sortKeys->abbrev_converter)
+ state->onlyKey = state->sortKeys;
MemoryContextSwitchTo(oldcontext);
* delayed calls at the moment.)
*
* This is a hint only. The tuplesort may still return more tuples than
- * requested.
+ * requested. Parallel leader tuplesorts will always ignore the hint.
*/
void
tuplesort_set_bound(Tuplesortstate *state, int64 bound)
Assert(state->status == TSS_INITIAL);
Assert(state->memtupcount == 0);
Assert(!state->bounded);
+ Assert(!WORKER(state));
#ifdef DEBUG_BOUNDED_SORT
/* Honor GUC setting that disables the feature (for easy testing) */
return;
#endif
+ /* Parallel leader ignores hint */
+ if (LEADER(state))
+ return;
+
/* We want to be able to compute bound * 2, so limit the setting */
if (bound > (int64) (INT_MAX / 2))
return;
state->bounded = true;
state->bound = (int) bound;
+
+ /*
+ * Bounded sorts are not an effective target for abbreviated key
+ * optimization. Disable by setting state to be consistent with no
+ * abbreviation support.
+ */
+ state->sortKeys->abbrev_converter = NULL;
+ if (state->sortKeys->abbrev_full_comparator)
+ state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
+
+ /* Not strictly necessary, but be tidy */
+ state->sortKeys->abbrev_abort = NULL;
+ state->sortKeys->abbrev_full_comparator = NULL;
}
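+
+/*
+ * A typical bounded-sort caller (a sketch; cf. the executor's Sort node,
+ * which applies a LIMIT this way):
+ *
+ *     state = tuplesort_begin_heap(...);
+ *     if (bounded)
+ *         tuplesort_set_bound(state, tuples_needed);
+ *     (feed tuples, then call tuplesort_performsort)
+ */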
/*
if (trace_sort)
{
if (state->tapeset)
- elog(LOG, "external sort ended, %ld disk blocks used: %s",
- spaceUsed, pg_rusage_show(&state->ru_start));
+ elog(LOG, "%s of worker %d ended, %ld disk blocks used: %s",
+ SERIAL(state) ? "external sort" : "parallel external sort",
+ state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
else
- elog(LOG, "internal sort ended, %ld KB used: %s",
- spaceUsed, pg_rusage_show(&state->ru_start));
+ elog(LOG, "%s of worker %d ended, %ld KB used: %s",
+ SERIAL(state) ? "internal sort" : "unperformed parallel sort",
+ state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
}
TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
/*
* Grow the memtuples[] array, if possible within our memory constraint. We
* must not exceed INT_MAX tuples in memory or the caller-provided memory
- * limit. Return TRUE if we were able to enlarge the array, FALSE if not.
+ * limit. Return true if we were able to enlarge the array, false if not.
*
* Normally, at each increment we double the size of the array. When doing
* that would exceed a limit, we attempt one last, smaller increase (and then
* never generate a dangerous request, but to be safe, check explicitly
* that the array growth fits within availMem. (We could still cause
* LACKMEM if the memory chunk overhead associated with the memtuples
- * array were to increase. That shouldn't happen with any sane value of
- * allowedMem, because at any array size large enough to risk LACKMEM,
- * palloc would be treating both old and new arrays as separate chunks.
- * But we'll check LACKMEM explicitly below just in case.)
+ * array were to increase. That shouldn't happen because we chose the
+ * initial array size large enough to ensure that palloc will be treating
+ * both old and new arrays as separate chunks. But we'll check LACKMEM
+ * explicitly below just in case.)
*/
if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
goto noalloc;
state->memtupsize * sizeof(SortTuple));
USEMEM(state, GetMemoryChunkSpace(state->memtuples));
if (LACKMEM(state))
- elog(ERROR, "unexpected out-of-memory situation during sort");
+ elog(ERROR, "unexpected out-of-memory situation in tuplesort");
return true;
noalloc:
ItemPointer self, Datum *values,
bool *isnull)
{
- MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+ MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
SortTuple stup;
+ Datum original;
+ IndexTuple tuple;
stup.tuple = index_form_tuple(RelationGetDescr(rel), values, isnull);
- ((IndexTuple) stup.tuple)->t_tid = *self;
+ tuple = ((IndexTuple) stup.tuple);
+ tuple->t_tid = *self;
USEMEM(state, GetMemoryChunkSpace(stup.tuple));
/* set up first-column key value */
- stup.datum1 = index_getattr((IndexTuple) stup.tuple,
- 1,
- RelationGetDescr(state->indexRel),
- &stup.isnull1);
+ original = index_getattr(tuple,
+ 1,
+ RelationGetDescr(state->indexRel),
+ &stup.isnull1);
+
+ MemoryContextSwitchTo(state->sortcontext);
+
+ if (!state->sortKeys || !state->sortKeys->abbrev_converter || stup.isnull1)
+ {
+ /*
+ * Store ordinary Datum representation, or NULL value. If there is a
+ * converter it won't expect NULL values, and cost model is not
+ * required to account for NULL, so in that case we avoid calling
+ * converter and just set datum1 to zeroed representation (to be
+ * consistent, and to support cheap inequality tests for NULL
+ * abbreviated keys).
+ */
+ stup.datum1 = original;
+ }
+ else if (!consider_abort_common(state))
+ {
+ /* Store abbreviated key representation */
+ stup.datum1 = state->sortKeys->abbrev_converter(original,
+ state->sortKeys);
+ }
+ else
+ {
+ /* Abort abbreviation */
+ int i;
+
+ stup.datum1 = original;
+
+ /*
+ * Set state to be consistent with never trying abbreviation.
+ *
+ * Alter datum1 representation in already-copied tuples, so as to
+ * ensure a consistent representation (current tuple was just
+ * handled). It does not matter if some dumped tuples are already
+ * sorted on tape, since serialized tuples lack abbreviated keys
+ * (TSS_BUILDRUNS state prevents control reaching here in any case).
+ */
+ for (i = 0; i < state->memtupcount; i++)
+ {
+ SortTuple *mtup = &state->memtuples[i];
+
+ tuple = mtup->tuple;
+ mtup->datum1 = index_getattr(tuple,
+ 1,
+ RelationGetDescr(state->indexRel),
+ &mtup->isnull1);
+ }
+ }
+
puttuple_common(state, &stup);
MemoryContextSwitchTo(oldcontext);
void
tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
{
- MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+ MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
SortTuple stup;
/*
- * If it's a pass-by-reference value, copy it into memory we control, and
- * decrease availMem. Then call the common code.
+ * Pass-by-value types or null values are just stored directly in
+ * stup.datum1 (stup.tuple is not used, and is set to NULL).
+ *
+ * Non-null pass-by-reference values need to be copied into memory we
+ * control, and possibly abbreviated. The copied value is pointed to by
+ * stup.tuple and is treated as the canonical copy (e.g. to return via
+ * tuplesort_getdatum or when writing to tape); stup.datum1 gets the
+ * abbreviated value if abbreviation is happening, otherwise it's
+ * identical to stup.tuple.
*/
- if (isNull || state->datumTypeByVal)
+
+ if (isNull || !state->tuples)
{
- stup.datum1 = val;
+ /*
+ * Set datum1 to zeroed representation for NULLs (to be consistent,
+ * and to support cheap inequality tests for NULL abbreviated keys).
+ */
+ stup.datum1 = !isNull ? val : (Datum) 0;
stup.isnull1 = isNull;
stup.tuple = NULL; /* no separate storage */
+ MemoryContextSwitchTo(state->sortcontext);
}
else
{
- stup.datum1 = datumCopy(val, false, state->datumTypeLen);
+ Datum original = datumCopy(val, false, state->datumTypeLen);
+
stup.isnull1 = false;
- stup.tuple = DatumGetPointer(stup.datum1);
+ stup.tuple = DatumGetPointer(original);
USEMEM(state, GetMemoryChunkSpace(stup.tuple));
+ MemoryContextSwitchTo(state->sortcontext);
+
+ if (!state->sortKeys->abbrev_converter)
+ {
+ stup.datum1 = original;
+ }
+ else if (!consider_abort_common(state))
+ {
+ /* Store abbreviated key representation */
+ stup.datum1 = state->sortKeys->abbrev_converter(original,
+ state->sortKeys);
+ }
+ else
+ {
+ /* Abort abbreviation */
+ int i;
+
+ stup.datum1 = original;
+
+ /*
+ * Set state to be consistent with never trying abbreviation.
+ *
+ * Alter datum1 representation in already-copied tuples, so as to
+ * ensure a consistent representation (current tuple was just
+ * handled). It does not matter if some dumped tuples are already
+ * sorted on tape, since serialized tuples lack abbreviated keys
+ * (TSS_BUILDRUNS state prevents control reaching here in any
+ * case).
+ */
+ for (i = 0; i < state->memtupcount; i++)
+ {
+ SortTuple *mtup = &state->memtuples[i];
+
+ mtup->datum1 = PointerGetDatum(mtup->tuple);
+ }
+ }
}
puttuple_common(state, &stup);
static void
puttuple_common(Tuplesortstate *state, SortTuple *tuple)
{
+ Assert(!LEADER(state));
+
switch (state->status)
{
case TSS_INITIAL:
/*
* Nope; time to switch to tape-based operation.
*/
- inittapes(state);
+ inittapes(state, true);
/*
- * Dump tuples until we are back under the limit.
+ * Dump all tuples.
*/
dumptuples(state, false);
break;
}
else
{
- /* discard top of heap, sift up, insert new tuple */
+ /* discard top of heap, replacing it with the new tuple */
free_sort_tuple(state, &state->memtuples[0]);
- tuplesort_heap_siftup(state, false);
- tuplesort_heap_insert(state, tuple, 0, false);
+ tuplesort_heap_replace_top(state, tuple);
}
break;
case TSS_BUILDRUNS:
/*
- * Insert the tuple into the heap, with run number currentRun if
- * it can go into the current run, else run number currentRun+1.
- * The tuple can go into the current run if it is >= the first
- * not-yet-output tuple. (Actually, it could go into the current
- * run if it is >= the most recently output tuple ... but that
- * would require keeping around the tuple we last output, and it's
- * simplest to let writetup free each tuple as soon as it's
- * written.)
- *
- * Note there will always be at least one tuple in the heap at
- * this point; see dumptuples.
+ * Save the tuple into the unsorted array (there must be space)
*/
- Assert(state->memtupcount > 0);
- if (COMPARETUP(state, tuple, &state->memtuples[0]) >= 0)
- tuplesort_heap_insert(state, tuple, state->currentRun, true);
- else
- tuplesort_heap_insert(state, tuple, state->currentRun + 1, true);
+ state->memtuples[state->memtupcount++] = *tuple;
/*
- * If we are over the memory limit, dump tuples till we're under.
+ * If we are over the memory limit, dump all tuples.
*/
dumptuples(state, false);
break;
}
}
+static bool
+consider_abort_common(Tuplesortstate *state)
+{
+ Assert(state->sortKeys[0].abbrev_converter != NULL);
+ Assert(state->sortKeys[0].abbrev_abort != NULL);
+ Assert(state->sortKeys[0].abbrev_full_comparator != NULL);
+
+ /*
+ * Check effectiveness of abbreviation optimization. Consider aborting
+ * when still within memory limit.
+ */
+ if (state->status == TSS_INITIAL &&
+ state->memtupcount >= state->abbrevNext)
+ {
+ state->abbrevNext *= 2;
+
+ /*
+ * Check opclass-supplied abbreviation abort routine. It may indicate
+ * that abbreviation should not proceed.
+ */
+ if (!state->sortKeys->abbrev_abort(state->memtupcount,
+ state->sortKeys))
+ return false;
+
+ /*
+ * Finally, restore authoritative comparator, and indicate that
+ * abbreviation is not in play by setting abbrev_converter to NULL
+ */
+ state->sortKeys[0].comparator = state->sortKeys[0].abbrev_full_comparator;
+ state->sortKeys[0].abbrev_converter = NULL;
+ /* Not strictly necessary, but be tidy */
+ state->sortKeys[0].abbrev_abort = NULL;
+ state->sortKeys[0].abbrev_full_comparator = NULL;
+
+ /* Give up - expect original pass-by-value representation */
+ return true;
+ }
+
+ return false;
+}
+
/*
* All tuples have been provided; finish the sort.
*/
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "performsort starting: %s",
- pg_rusage_show(&state->ru_start));
+ elog(LOG, "performsort of worker %d starting: %s",
+ state->worker, pg_rusage_show(&state->ru_start));
#endif
switch (state->status)
/*
* We were able to accumulate all the tuples within the allowed
- * amount of memory. Just qsort 'em and we're done.
+ * amount of memory, or the leader must now take over the worker tapes.
*/
- if (state->memtupcount > 1)
+ if (SERIAL(state))
{
- /* Can we use the single-key sort function? */
- if (state->onlyKey != NULL)
- qsort_ssup(state->memtuples, state->memtupcount,
- state->onlyKey);
- else
- qsort_tuple(state->memtuples,
- state->memtupcount,
- state->comparetup,
- state);
+ /* Just qsort 'em and we're done */
+ tuplesort_sort_memtuples(state);
+ state->status = TSS_SORTEDINMEM;
+ }
+ else if (WORKER(state))
+ {
+ /*
+ * Parallel workers must still dump out tuples to tape. No
+ * merge is required to produce a single output run, though.
+ */
+ inittapes(state, false);
+ dumptuples(state, true);
+ worker_nomergeruns(state);
+ state->status = TSS_SORTEDONTAPE;
+ }
+ else
+ {
+ /*
+ * Leader will take over worker tapes and merge worker runs.
+ * Note that mergeruns sets the correct state->status.
+ */
+ leader_takeover_tapes(state);
+ mergeruns(state);
}
state->current = 0;
state->eof_reached = false;
+ state->markpos_block = 0L;
state->markpos_offset = 0;
state->markpos_eof = false;
- state->status = TSS_SORTEDINMEM;
break;
case TSS_BOUNDED:
/*
* Finish tape-based sort. First, flush all tuples remaining in
* memory out to tape; then merge until we have a single remaining
- * run (or, if !randomAccess, one run per tape). Note that
- * mergeruns sets the correct state->status.
+ * run (or, if !randomAccess and !WORKER(), one run per tape).
+ * Note that mergeruns sets the correct state->status.
*/
dumptuples(state, true);
mergeruns(state);
if (trace_sort)
{
if (state->status == TSS_FINALMERGE)
- elog(LOG, "performsort done (except %d-way final merge): %s",
- state->activeTapes,
+ elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
+ state->worker, state->activeTapes,
pg_rusage_show(&state->ru_start));
else
- elog(LOG, "performsort done: %s",
- pg_rusage_show(&state->ru_start));
+ elog(LOG, "performsort of worker %d done: %s",
+ state->worker, pg_rusage_show(&state->ru_start));
}
#endif
/*
* Internal routine to fetch the next tuple in either forward or back
- * direction into *stup. Returns FALSE if no more tuples.
- * If *should_free is set, the caller must pfree stup.tuple when done with it.
+ * direction into *stup. Returns false if no more tuples.
+ * The returned tuple belongs to the tuplesort memory context, and must not
+ * be freed by the caller. Note that the fetched tuple is stored in memory
+ * that may be recycled by any future fetch.
*/
static bool
tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
- SortTuple *stup, bool *should_free)
+ SortTuple *stup)
{
unsigned int tuplen;
+ size_t nmoved;
+
+ Assert(!WORKER(state));
switch (state->status)
{
case TSS_SORTEDINMEM:
Assert(forward || state->randomAccess);
- *should_free = false;
+ Assert(!state->slabAllocatorUsed);
if (forward)
{
if (state->current < state->memtupcount)
case TSS_SORTEDONTAPE:
Assert(forward || state->randomAccess);
- *should_free = true;
+ Assert(state->slabAllocatorUsed);
+
+ /*
+ * The slot that held the tuple that we returned in previous
+ * gettuple call can now be reused.
+ */
+ if (state->lastReturnedTuple)
+ {
+ RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
+ state->lastReturnedTuple = NULL;
+ }
+
if (forward)
{
if (state->eof_reached)
return false;
+
if ((tuplen = getlen(state, state->result_tape, true)) != 0)
{
READTUP(state, stup, state->result_tape, tuplen);
+
+ /*
+ * Remember the tuple we return, so that we can recycle
+ * its memory on next call. (This can be NULL, in the
+ * !state->tuples case).
+ */
+ state->lastReturnedTuple = stup->tuple;
+
return true;
}
else
* end of file; back up to fetch last tuple's ending length
* word. If seek fails we must have a completely empty file.
*/
- if (!LogicalTapeBackspace(state->tapeset,
- state->result_tape,
- 2 * sizeof(unsigned int)))
+ nmoved = LogicalTapeBackspace(state->tapeset,
+ state->result_tape,
+ 2 * sizeof(unsigned int));
+ if (nmoved == 0)
return false;
+ else if (nmoved != 2 * sizeof(unsigned int))
+ elog(ERROR, "unexpected tape position");
state->eof_reached = false;
}
else
* Back up and fetch previously-returned tuple's ending length
* word. If seek fails, assume we are at start of file.
*/
- if (!LogicalTapeBackspace(state->tapeset,
- state->result_tape,
- sizeof(unsigned int)))
+ nmoved = LogicalTapeBackspace(state->tapeset,
+ state->result_tape,
+ sizeof(unsigned int));
+ if (nmoved == 0)
return false;
+ else if (nmoved != sizeof(unsigned int))
+ elog(ERROR, "unexpected tape position");
tuplen = getlen(state, state->result_tape, false);
/*
* Back up to get ending length word of tuple before it.
*/
- if (!LogicalTapeBackspace(state->tapeset,
- state->result_tape,
- tuplen + 2 * sizeof(unsigned int)))
+ nmoved = LogicalTapeBackspace(state->tapeset,
+ state->result_tape,
+ tuplen + 2 * sizeof(unsigned int));
+ if (nmoved == tuplen + sizeof(unsigned int))
{
/*
- * If that fails, presumably the prev tuple is the first
- * in the file. Back up so that it becomes next to read
- * in forward direction (not obviously right, but that is
- * what in-memory case does).
+ * We backed up over the previous tuple, but there was no
+ * ending length word before it. That means that the previous
+ * tuple is the first tuple in the file. It is now the next
+ * to read in the forward direction (not obviously right, but
+ * that is what the in-memory case does).
*/
- if (!LogicalTapeBackspace(state->tapeset,
- state->result_tape,
- tuplen + sizeof(unsigned int)))
- elog(ERROR, "bogus tuple length in backward scan");
return false;
}
+ else if (nmoved != tuplen + 2 * sizeof(unsigned int))
+ elog(ERROR, "bogus tuple length in backward scan");
}
tuplen = getlen(state, state->result_tape, false);
* Note: READTUP expects we are positioned after the initial
* length word of the tuple, so back up to that point.
*/
- if (!LogicalTapeBackspace(state->tapeset,
- state->result_tape,
- tuplen))
+ nmoved = LogicalTapeBackspace(state->tapeset,
+ state->result_tape,
+ tuplen);
+ if (nmoved != tuplen)
elog(ERROR, "bogus tuple length in backward scan");
READTUP(state, stup, state->result_tape, tuplen);
+
+ /*
+ * Remember the tuple we return, so that we can recycle its memory
+ * on the next call. (This can be NULL, in the Datum case.)
+ */
+ state->lastReturnedTuple = stup->tuple;
+
return true;
case TSS_FINALMERGE:
Assert(forward);
- *should_free = true;
+ /* We are managing memory ourselves, with the slab allocator. */
+ Assert(state->slabAllocatorUsed);
+
+ /*
+ * The slab slot holding the tuple that we returned in the previous
+ * gettuple call can now be reused.
+ */
+ if (state->lastReturnedTuple)
+ {
+ RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
+ state->lastReturnedTuple = NULL;
+ }
/*
* This code should match the inner loop of mergeonerun().
if (state->memtupcount > 0)
{
int srcTape = state->memtuples[0].tupindex;
- Size tuplen;
- int tupIndex;
- SortTuple *newtup;
+ SortTuple newtup;
*stup = state->memtuples[0];
- /* returned tuple is no longer counted in our memory space */
- if (stup->tuple)
- {
- tuplen = GetMemoryChunkSpace(stup->tuple);
- state->availMem += tuplen;
- state->mergeavailmem[srcTape] += tuplen;
- }
- tuplesort_heap_siftup(state, false);
- if ((tupIndex = state->mergenext[srcTape]) == 0)
+
+ /*
+ * Remember the tuple we return, so that we can recycle its
+ * memory on the next call. (This can be NULL, in the Datum case.)
+ */
+ state->lastReturnedTuple = stup->tuple;
+
+ /*
+ * Pull next tuple from tape, and replace the returned tuple
+ * at top of the heap with it.
+ */
+ if (!mergereadnext(state, srcTape, &newtup))
{
/*
- * out of preloaded data on this tape, try to read more
- *
- * Unlike mergeonerun(), we only preload from the single
- * tape that's run dry. See mergepreread() comments.
+ * If no more data, we've reached end of run on this tape.
+ * Remove the top node from the heap.
*/
- mergeprereadone(state, srcTape);
+ tuplesort_heap_delete_top(state);
/*
- * if still no data, we've reached end of run on this tape
+ * Rewind to free the read buffer. It'd go away at the
+ * end of the sort anyway, but better to release the
+ * memory early.
*/
- if ((tupIndex = state->mergenext[srcTape]) == 0)
- return true;
+ LogicalTapeRewindForWrite(state->tapeset, srcTape);
+ return true;
}
- /* pull next preread tuple from list, insert in heap */
- newtup = &state->memtuples[tupIndex];
- state->mergenext[srcTape] = newtup->tupindex;
- if (state->mergenext[srcTape] == 0)
- state->mergelast[srcTape] = 0;
- tuplesort_heap_insert(state, newtup, srcTape, false);
- /* put the now-unused memtuples entry on the freelist */
- newtup->tupindex = state->mergefreelist;
- state->mergefreelist = tupIndex;
- state->mergeavailslots[srcTape]++;
+ newtup.tupindex = srcTape;
+ tuplesort_heap_replace_top(state, &newtup);
return true;
}
return false;
/*
* Fetch the next tuple in either forward or back direction.
- * If successful, put tuple in slot and return TRUE; else, clear the slot
- * and return FALSE.
+ * If successful, put tuple in slot and return true; else, clear the slot
+ * and return false.
+ *
+ * If abbreviation was used, the caller may optionally be passed back the
+ * abbreviated value (on a true return), which can be used to cheaply avoid
+ * equality checks that might otherwise be required. The caller can safely
+ * conclude "non-equal tuple" from simple binary inequality of abbreviated
+ * values. A NULL value in the leading attribute sets the abbreviated value
+ * to a zeroed representation, which the caller may rely on in such checks.
+ *
+ * If copy is true, the slot receives a tuple that's been copied into the
+ * caller's memory context, so that it will stay valid regardless of future
+ * manipulations of the tuplesort's state (up to and including deleting the
+ * tuplesort). If copy is false, the slot will just receive a pointer to a
+ * tuple held within the tuplesort, which is more efficient, but only safe for
+ * callers that are prepared to have any subsequent manipulation of the
+ * tuplesort's state invalidate slot contents.
*/
bool
-tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
- TupleTableSlot *slot)
+tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy,
+ TupleTableSlot *slot, Datum *abbrev)
{
MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
SortTuple stup;
- bool should_free;
- if (!tuplesort_gettuple_common(state, forward, &stup, &should_free))
+ if (!tuplesort_gettuple_common(state, forward, &stup))
stup.tuple = NULL;
MemoryContextSwitchTo(oldcontext);
if (stup.tuple)
{
- ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, should_free);
+ /* Record abbreviated key for caller */
+ if (state->sortKeys->abbrev_converter && abbrev)
+ *abbrev = stup.datum1;
+
+ if (copy)
+ stup.tuple = heap_copy_minimal_tuple((MinimalTuple) stup.tuple);
+
+ ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, copy);
return true;
}
else
/*
* Fetch the next tuple in either forward or back direction.
- * Returns NULL if no more tuples. If *should_free is set, the
- * caller must pfree the returned tuple when done with it.
+ * Returns NULL if no more tuples. The returned tuple belongs to the
+ * tuplesort memory context, and must not be freed by the caller. The caller
+ * may not rely on the tuple remaining valid after any further manipulation
+ * of the tuplesort.
*/
HeapTuple
-tuplesort_getheaptuple(Tuplesortstate *state, bool forward, bool *should_free)
+tuplesort_getheaptuple(Tuplesortstate *state, bool forward)
{
MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
SortTuple stup;
- if (!tuplesort_gettuple_common(state, forward, &stup, should_free))
+ if (!tuplesort_gettuple_common(state, forward, &stup))
stup.tuple = NULL;
MemoryContextSwitchTo(oldcontext);
/*
* Fetch the next index tuple in either forward or back direction.
- * Returns NULL if no more tuples. If *should_free is set, the
- * caller must pfree the returned tuple when done with it.
+ * Returns NULL if no more tuples. The returned tuple belongs to the
+ * tuplesort memory context, and must not be freed by the caller. The caller
+ * may not rely on the tuple remaining valid after any further manipulation
+ * of the tuplesort.
*/
IndexTuple
-tuplesort_getindextuple(Tuplesortstate *state, bool forward,
- bool *should_free)
+tuplesort_getindextuple(Tuplesortstate *state, bool forward)
{
MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
SortTuple stup;
- if (!tuplesort_gettuple_common(state, forward, &stup, should_free))
+ if (!tuplesort_gettuple_common(state, forward, &stup))
stup.tuple = NULL;
MemoryContextSwitchTo(oldcontext);
/*
* Fetch the next Datum in either forward or back direction.
- * Returns FALSE if no more datums.
+ * Returns false if no more datums.
*
* If the Datum is pass-by-ref type, the returned value is freshly palloc'd
- * and is now owned by the caller.
+ * in caller's context, and is now owned by the caller (this differs from
+ * similar routines for other types of tuplesorts).
+ *
+ * If abbreviation was used, the caller may optionally be passed back the
+ * abbreviated value (on a true return), which can be used to cheaply avoid
+ * equality checks that might otherwise be required. The caller can safely
+ * conclude "non-equal tuple" from simple binary inequality of abbreviated
+ * values. A NULL value has a zeroed abbreviated-value representation, which
+ * the caller may rely on in such checks.
*/
bool
tuplesort_getdatum(Tuplesortstate *state, bool forward,
- Datum *val, bool *isNull)
+ Datum *val, bool *isNull, Datum *abbrev)
{
MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
SortTuple stup;
- bool should_free;
- if (!tuplesort_gettuple_common(state, forward, &stup, &should_free))
+ if (!tuplesort_gettuple_common(state, forward, &stup))
{
MemoryContextSwitchTo(oldcontext);
return false;
}
- if (stup.isnull1 || state->datumTypeByVal)
+ /* Ensure we copy into caller's memory context */
+ MemoryContextSwitchTo(oldcontext);
+
+ /* Record abbreviated key for caller */
+ if (state->sortKeys->abbrev_converter && abbrev)
+ *abbrev = stup.datum1;
+
+ if (stup.isnull1 || !state->tuples)
{
*val = stup.datum1;
*isNull = stup.isnull1;
}
else
{
- if (should_free)
- *val = stup.datum1;
- else
- *val = datumCopy(stup.datum1, false, state->datumTypeLen);
+ /* use stup.tuple because stup.datum1 may be an abbreviation */
+ *val = datumCopy(PointerGetDatum(stup.tuple), false, state->datumTypeLen);
*isNull = false;
}
- MemoryContextSwitchTo(oldcontext);
-
return true;
}
/*
* Advance over N tuples in either forward or back direction,
* without returning any data. N==0 is a no-op.
- * Returns TRUE if successful, FALSE if ran out of tuples.
+ * Returns true if successful, false if ran out of tuples.
*/
bool
tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
*/
Assert(forward);
Assert(ntuples >= 0);
+ Assert(!WORKER(state));
switch (state->status)
{
while (ntuples-- > 0)
{
SortTuple stup;
- bool should_free;
- if (!tuplesort_gettuple_common(state, forward,
- &stup, &should_free))
+ if (!tuplesort_gettuple_common(state, forward, &stup))
{
MemoryContextSwitchTo(oldcontext);
return false;
}
- if (should_free && stup.tuple)
- pfree(stup.tuple);
CHECK_FOR_INTERRUPTS();
}
MemoryContextSwitchTo(oldcontext);
mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
(MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
- /* Even in minimum memory, use at least a MINORDER merge */
+ /*
+ * Even in minimum memory, use at least a MINORDER merge. On the other
+ * hand, even when we have lots of memory, do not use more than a MAXORDER
+ * merge. Tapes are pretty cheap, but they're not entirely free. Each
+ * additional tape reduces the amount of memory available to build runs,
+ * which in turn can cause the same sort to need more runs, which makes
+ * merging slower even if it can still be done in a single pass. Also,
+ * high-order merges are quite slow due to CPU cache effects; it can be
+ * faster to pay the I/O cost of a polyphase merge than to perform a
+ * single merge pass across many hundreds of tapes.
+ */
mOrder = Max(mOrder, MINORDER);
+ mOrder = Min(mOrder, MAXORDER);
return mOrder;
}
/*
* inittapes - initialize for tape sorting.
*
- * This is called only if we have found we don't have room to sort in memory.
+ * This is called only if we have found we won't sort in memory.
*/
static void
-inittapes(Tuplesortstate *state)
+inittapes(Tuplesortstate *state, bool mergeruns)
{
int maxTapes,
- ntuples,
j;
- int64 tapeSpace;
- /* Compute number of tapes to use: merge order plus 1 */
- maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
+ Assert(!LEADER(state));
- /*
- * We must have at least 2*maxTapes slots in the memtuples[] array, else
- * we'd not have room for merge heap plus preread. It seems unlikely that
- * this case would ever occur, but be safe.
- */
- maxTapes = Min(maxTapes, state->memtupsize / 2);
-
- state->maxTapes = maxTapes;
- state->tapeRange = maxTapes - 1;
+ if (mergeruns)
+ {
+ /* Compute number of tapes to use: merge order plus 1 */
+ maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
+ }
+ else
+ {
+ /* Workers can sometimes produce a single run, output without a merge */
+ Assert(WORKER(state));
+ maxTapes = MINORDER + 1;
+ }
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "switching to external sort with %d tapes: %s",
- maxTapes, pg_rusage_show(&state->ru_start));
+ elog(LOG, "worker %d switching to external sort with %d tapes: %s",
+ state->worker, maxTapes, pg_rusage_show(&state->ru_start));
#endif
+ /* Create the tape set and allocate the per-tape data arrays */
+ inittapestate(state, maxTapes);
+ state->tapeset =
+ LogicalTapeSetCreate(maxTapes, NULL,
+ state->shared ? &state->shared->fileset : NULL,
+ state->worker);
+
+ state->currentRun = 0;
+
+ /*
+ * Initialize variables of Algorithm D (step D1).
+ */
+ for (j = 0; j < maxTapes; j++)
+ {
+ state->tp_fib[j] = 1;
+ state->tp_runs[j] = 0;
+ state->tp_dummy[j] = 1;
+ state->tp_tapenum[j] = j;
+ }
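+ /* The output tape (tapeRange) starts with no runs assigned, real or dummy */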
+ state->tp_fib[state->tapeRange] = 0;
+ state->tp_dummy[state->tapeRange] = 0;
+
+ state->Level = 1;
+ state->destTape = 0;
+
+ state->status = TSS_BUILDRUNS;
+}
+
+/*
+ * inittapestate - initialize generic tape management state
+ */
+static void
+inittapestate(Tuplesortstate *state, int maxTapes)
+{
+ int64 tapeSpace;
+
/*
* Decrease availMem to reflect the space needed for tape buffers; but
* don't decrease it to the point that we have no room for tuples. (That
* account for tuple space, so we don't care if LACKMEM becomes
* inaccurate.)
*/
- tapeSpace = (int64) maxTapes *TAPE_BUFFER_OVERHEAD;
+ tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
USEMEM(state, tapeSpace);
/*
* Make sure that the temp file(s) underlying the tape set are created in
- * suitable temp tablespaces.
+ * suitable temp tablespaces. For parallel sorts, this should have been
+ * called already, but it doesn't matter if it is called a second time.
*/
PrepareTempTablespaces();
- /*
- * Create the tape set and allocate the per-tape data arrays.
- */
- state->tapeset = LogicalTapeSetCreate(maxTapes);
-
state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
- state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
- state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
- state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
- state->mergeavailmem = (int64 *) palloc0(maxTapes * sizeof(int64));
state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
- /*
- * Convert the unsorted contents of memtuples[] into a heap. Each tuple is
- * marked as belonging to run number zero.
- *
- * NOTE: we pass false for checkIndex since there's no point in comparing
- * indexes in this step, even though we do intend the indexes to be part
- * of the sort key...
- */
- ntuples = state->memtupcount;
- state->memtupcount = 0; /* make the heap empty */
- for (j = 0; j < ntuples; j++)
- {
- /* Must copy source tuple to avoid possible overwrite */
- SortTuple stup = state->memtuples[j];
-
- tuplesort_heap_insert(state, &stup, 0, false);
- }
- Assert(state->memtupcount == ntuples);
-
- state->currentRun = 0;
-
- /*
- * Initialize variables of Algorithm D (step D1).
- */
- for (j = 0; j < maxTapes; j++)
- {
- state->tp_fib[j] = 1;
- state->tp_runs[j] = 0;
- state->tp_dummy[j] = 1;
- state->tp_tapenum[j] = j;
- }
- state->tp_fib[state->tapeRange] = 0;
- state->tp_dummy[state->tapeRange] = 0;
-
- state->Level = 1;
- state->destTape = 0;
-
- state->status = TSS_BUILDRUNS;
+ /* Record # of tapes allocated (for duration of sort) */
+ state->maxTapes = maxTapes;
+ /* Record maximum # of tapes usable as inputs when merging */
+ state->tapeRange = maxTapes - 1;
}
/*
state->destTape = 0;
}
+/*
+ * Initialize the slab allocation arena, for the given number of slots.
+ */
+static void
+init_slab_allocator(Tuplesortstate *state, int numSlots)
+{
+ if (numSlots > 0)
+ {
+ char *p;
+ int i;
+
+ state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
+ state->slabMemoryEnd = state->slabMemoryBegin +
+ numSlots * SLAB_SLOT_SIZE;
+ state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
+ USEMEM(state, numSlots * SLAB_SLOT_SIZE);
+
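+ /* Chain the slots into a freelist; the last slot points to NULL */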
+ p = state->slabMemoryBegin;
+ for (i = 0; i < numSlots - 1; i++)
+ {
+ ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
+ p += SLAB_SLOT_SIZE;
+ }
+ ((SlabSlot *) p)->nextfree = NULL;
+ }
+ else
+ {
+ state->slabMemoryBegin = state->slabMemoryEnd = NULL;
+ state->slabFreeHead = NULL;
+ }
+ state->slabAllocatorUsed = true;
+}
+
/*
* mergeruns -- merge all the completed initial runs.
*
svTape,
svRuns,
svDummy;
+ int numTapes;
+ int numInputTapes;
Assert(state->status == TSS_BUILDRUNS);
Assert(state->memtupcount == 0);
+ if (state->sortKeys != NULL && state->sortKeys->abbrev_converter != NULL)
+ {
+ /*
+ * If there are multiple runs to be merged, when we go to read back
+ * tuples from disk, abbreviated keys will not have been stored, and
+ * we don't care to regenerate them. Disable abbreviation from this
+ * point on.
+ */
+ state->sortKeys->abbrev_converter = NULL;
+ state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
+
+ /* Not strictly necessary, but be tidy */
+ state->sortKeys->abbrev_abort = NULL;
+ state->sortKeys->abbrev_full_comparator = NULL;
+ }
+
+ /*
+ * Reset tuple memory. We've freed all the tuples that we previously
+ * allocated. We will use the slab allocator from now on.
+ */
+ MemoryContextDelete(state->tuplecontext);
+ state->tuplecontext = NULL;
+
+ /*
+ * We no longer need a large memtuples array. (We will allocate a smaller
+ * one for the heap later.)
+ */
+ FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
+ pfree(state->memtuples);
+ state->memtuples = NULL;
+
/*
- * If we produced only one initial run (quite likely if the total data
- * volume is between 1X and 2X workMem), we can just use that tape as the
- * finished output, rather than doing a useless merge. (This obvious
- * optimization is not in Knuth's algorithm.)
+ * If we had fewer runs than tapes, refund the memory that we imagined we
+ * would need for the tape buffers of the unused tapes.
+ *
+ * numTapes and numInputTapes reflect the actual number of tapes we will
+ * use. Note that the output tape's tape number is maxTapes - 1, so the
+ * tape numbers of the used tapes are not consecutive, and you cannot just
+ * loop from 0 to numTapes to visit all used tapes!
*/
- if (state->currentRun == 1)
+ if (state->Level == 1)
{
- state->result_tape = state->tp_tapenum[state->destTape];
- /* must freeze and rewind the finished output tape */
- LogicalTapeFreeze(state->tapeset, state->result_tape);
- state->status = TSS_SORTEDONTAPE;
- return;
+ numInputTapes = state->currentRun;
+ numTapes = numInputTapes + 1;
+ FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
}
+ else
+ {
+ numInputTapes = state->tapeRange;
+ numTapes = state->maxTapes;
+ }
+
+ /*
+ * Initialize the slab allocator. We need one slab slot per input tape,
+ * for the tuples in the heap, plus one to hold the tuple last returned
+ * from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
+ * however, we don't need to allocate anything.)
+ *
+ * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
+ * to track memory usage of individual tuples.
+ */
+ if (state->tuples)
+ init_slab_allocator(state, numInputTapes + 1);
+ else
+ init_slab_allocator(state, 0);
+
+ /*
+ * Allocate a new 'memtuples' array, for the heap. It will hold one tuple
+ * from each input tape.
+ */
+ state->memtupsize = numInputTapes;
+ state->memtuples = (SortTuple *) palloc(numInputTapes * sizeof(SortTuple));
+ USEMEM(state, GetMemoryChunkSpace(state->memtuples));
+
+ /*
+ * Use all the remaining memory we have available for read buffers among
+ * the input tapes.
+ *
+ * We don't try to "rebalance" the memory among tapes when we start a new
+ * merge phase, even if some tapes are inactive in the new phase. That
+ * would be hard, because logtape.c doesn't know where one run ends and
+ * another begins. When a new merge phase begins and a tape doesn't
+ * participate in it, its buffer nevertheless already contains tuples from
+ * the next run on the same tape, so we cannot release the buffer. That's
+ * OK in practice; merge performance isn't that sensitive to the amount of
+ * buffer space used, and most merge phases use all or almost all tapes
+ * anyway.
+ */
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
+ state->worker, state->availMem / 1024, numInputTapes);
+#endif
+
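+ /* availMem can be negative here if we are over budget, so clamp at zero */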
+ state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
+ USEMEM(state, state->read_buffer_size * numInputTapes);
/* End of step D2: rewind all output tapes to prepare for merging */
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
- LogicalTapeRewind(state->tapeset, tapenum, false);
+ LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
for (;;)
{
* pass remains. If we don't have to produce a materialized sorted
* tape, we can stop at this point and do the final merge on-the-fly.
*/
- if (!state->randomAccess)
+ if (!state->randomAccess && !WORKER(state))
{
bool allOneRun = true;
if (--state->Level == 0)
break;
/* rewind output tape T to use as new input */
- LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange],
- false);
+ LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
+ state->read_buffer_size);
/* rewind used-up input tape P, and prepare it for write pass */
- LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange - 1],
- true);
+ LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
state->tp_runs[state->tapeRange - 1] = 0;
/*
* a waste of cycles anyway...
*/
state->result_tape = state->tp_tapenum[state->tapeRange];
- LogicalTapeFreeze(state->tapeset, state->result_tape);
+ if (!WORKER(state))
+ LogicalTapeFreeze(state->tapeset, state->result_tape, NULL);
+ else
+ worker_freeze_result_tape(state);
state->status = TSS_SORTEDONTAPE;
+
+ /* Release the read buffers of all the other tapes, by rewinding them. */
+ for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
+ {
+ if (tapenum != state->result_tape)
+ LogicalTapeRewindForWrite(state->tapeset, tapenum);
+ }
}
/*
{
int destTape = state->tp_tapenum[state->tapeRange];
int srcTape;
- int tupIndex;
- SortTuple *tup;
- int64 priorAvail,
- spaceFreed;
/*
* Start the merge by loading one tuple from each active source tape into
*/
while (state->memtupcount > 0)
{
+ SortTuple stup;
+
/* write the tuple to destTape */
- priorAvail = state->availMem;
srcTape = state->memtuples[0].tupindex;
WRITETUP(state, destTape, &state->memtuples[0]);
- /* writetup adjusted total free space, now fix per-tape space */
- spaceFreed = state->availMem - priorAvail;
- state->mergeavailmem[srcTape] += spaceFreed;
- /* compact the heap */
- tuplesort_heap_siftup(state, false);
- if ((tupIndex = state->mergenext[srcTape]) == 0)
+
+ /* recycle the slot of the tuple we just wrote out, for the next read */
+ if (state->memtuples[0].tuple)
+ RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
+
+ /*
+ * pull next tuple from the tape, and replace the written-out tuple in
+ * the heap with it.
+ */
+ if (mergereadnext(state, srcTape, &stup))
{
- /* out of preloaded data on this tape, try to read more */
- mergepreread(state);
- /* if still no data, we've reached end of run on this tape */
- if ((tupIndex = state->mergenext[srcTape]) == 0)
- continue;
+ stup.tupindex = srcTape;
+ tuplesort_heap_replace_top(state, &stup);
}
- /* pull next preread tuple from list, insert in heap */
- tup = &state->memtuples[tupIndex];
- state->mergenext[srcTape] = tup->tupindex;
- if (state->mergenext[srcTape] == 0)
- state->mergelast[srcTape] = 0;
- tuplesort_heap_insert(state, tup, srcTape, false);
- /* put the now-unused memtuples entry on the freelist */
- tup->tupindex = state->mergefreelist;
- state->mergefreelist = tupIndex;
- state->mergeavailslots[srcTape]++;
+ else
+ tuplesort_heap_delete_top(state);
}
/*
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "finished %d-way merge step: %s", state->activeTapes,
- pg_rusage_show(&state->ru_start));
+ elog(LOG, "worker %d finished %d-way merge step: %s", state->worker,
+ state->activeTapes, pg_rusage_show(&state->ru_start));
#endif
}
* beginmerge - initialize for a merge pass
*
* We decrease the counts of real and dummy runs for each tape, and mark
- * which tapes contain active input runs in mergeactive[]. Then, load
- * as many tuples as we can from each active input tape, and finally
- * fill the merge heap with the first tuple from each active tape.
+ * which tapes contain active input runs in mergeactive[]. Then, fill the
+ * merge heap with the first tuple from each active tape.
*/
static void
beginmerge(Tuplesortstate *state)
int activeTapes;
int tapenum;
int srcTape;
- int slotsPerTape;
- int64 spacePerTape;
/* Heap should be empty here */
Assert(state->memtupcount == 0);
activeTapes++;
}
}
- state->activeTapes = activeTapes;
-
- /* Clear merge-pass state variables */
- memset(state->mergenext, 0,
- state->maxTapes * sizeof(*state->mergenext));
- memset(state->mergelast, 0,
- state->maxTapes * sizeof(*state->mergelast));
- state->mergefreelist = 0; /* nothing in the freelist */
- state->mergefirstfree = activeTapes; /* 1st slot avail for preread */
-
- /*
- * Initialize space allocation to let each active input tape have an equal
- * share of preread space.
- */
Assert(activeTapes > 0);
- slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
- Assert(slotsPerTape > 0);
- spacePerTape = state->availMem / activeTapes;
- for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
- {
- if (state->mergeactive[srcTape])
- {
- state->mergeavailslots[srcTape] = slotsPerTape;
- state->mergeavailmem[srcTape] = spacePerTape;
- }
- }
-
- /*
- * Preread as many tuples as possible (and at least one) from each active
- * tape
- */
- mergepreread(state);
+ state->activeTapes = activeTapes;
/* Load the merge heap with the first tuple from each input tape */
for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
{
- int tupIndex = state->mergenext[srcTape];
- SortTuple *tup;
+ SortTuple tup;
- if (tupIndex)
+ if (mergereadnext(state, srcTape, &tup))
{
- tup = &state->memtuples[tupIndex];
- state->mergenext[srcTape] = tup->tupindex;
- if (state->mergenext[srcTape] == 0)
- state->mergelast[srcTape] = 0;
- tuplesort_heap_insert(state, tup, srcTape, false);
- /* put the now-unused memtuples entry on the freelist */
- tup->tupindex = state->mergefreelist;
- state->mergefreelist = tupIndex;
- state->mergeavailslots[srcTape]++;
+ tup.tupindex = srcTape;
+ tuplesort_heap_insert(state, &tup);
}
}
}
/*
- * mergepreread - load tuples from merge input tapes
- *
- * This routine exists to improve sequentiality of reads during a merge pass,
- * as explained in the header comments of this file. Load tuples from each
- * active source tape until the tape's run is exhausted or it has used up
- * its fair share of available memory. In any case, we guarantee that there
- * is at least one preread tuple available from each unexhausted input tape.
- *
- * We invoke this routine at the start of a merge pass for initial load,
- * and then whenever any tape's preread data runs out. Note that we load
- * as much data as possible from all tapes, not just the one that ran out.
- * This is because logtape.c works best with a usage pattern that alternates
- * between reading a lot of data and writing a lot of data, so whenever we
- * are forced to read, we should fill working memory completely.
- *
- * In FINALMERGE state, we *don't* use this routine, but instead just preread
- * from the single tape that ran dry. There's no read/write alternation in
- * that state and so no point in scanning through all the tapes to fix one.
- * (Moreover, there may be quite a lot of inactive tapes in that state, since
- * we might have had many fewer runs than tapes. In a regular tape-to-tape
- * merge we can expect most of the tapes to be active.)
- */
-static void
-mergepreread(Tuplesortstate *state)
-{
- int srcTape;
-
- for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
- mergeprereadone(state, srcTape);
-}
-
-/*
- * mergeprereadone - load tuples from one merge input tape
+ * mergereadnext - read next tuple from one merge input tape
*
- * Read tuples from the specified tape until it has used up its free memory
- * or array slots; but ensure that we have at least one tuple, if any are
- * to be had.
+ * Returns false on EOF.
*/
-static void
-mergeprereadone(Tuplesortstate *state, int srcTape)
+static bool
+mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
{
unsigned int tuplen;
- SortTuple stup;
- int tupIndex;
- int64 priorAvail,
- spaceUsed;
if (!state->mergeactive[srcTape])
- return; /* tape's run is already exhausted */
- priorAvail = state->availMem;
- state->availMem = state->mergeavailmem[srcTape];
- while ((state->mergeavailslots[srcTape] > 0 && !LACKMEM(state)) ||
- state->mergenext[srcTape] == 0)
- {
- /* read next tuple, if any */
- if ((tuplen = getlen(state, srcTape, true)) == 0)
- {
- state->mergeactive[srcTape] = false;
- break;
- }
- READTUP(state, &stup, srcTape, tuplen);
- /* find a free slot in memtuples[] for it */
- tupIndex = state->mergefreelist;
- if (tupIndex)
- state->mergefreelist = state->memtuples[tupIndex].tupindex;
- else
- {
- tupIndex = state->mergefirstfree++;
- Assert(tupIndex < state->memtupsize);
- }
- state->mergeavailslots[srcTape]--;
- /* store tuple, append to list for its tape */
- stup.tupindex = 0;
- state->memtuples[tupIndex] = stup;
- if (state->mergelast[srcTape])
- state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
- else
- state->mergenext[srcTape] = tupIndex;
- state->mergelast[srcTape] = tupIndex;
+ return false; /* tape's run is already exhausted */
+
+ /* read next tuple, if any */
+ if ((tuplen = getlen(state, srcTape, true)) == 0)
+ {
+ state->mergeactive[srcTape] = false;
+ return false;
}
- /* update per-tape and global availmem counts */
- spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
- state->mergeavailmem[srcTape] = state->availMem;
- state->availMem = priorAvail - spaceUsed;
+ READTUP(state, stup, srcTape, tuplen);
+
+ return true;
}
/*
- * dumptuples - remove tuples from heap and write to tape
- *
- * This is used during initial-run building, but not during merging.
- *
- * When alltuples = false, dump only enough tuples to get under the
- * availMem limit (and leave at least one tuple in the heap in any case,
- * since puttuple assumes it always has a tuple to compare to). We also
- * insist there be at least one free slot in the memtuples[] array.
- *
- * When alltuples = true, dump everything currently in memory.
- * (This case is only used at end of input data.)
+ * dumptuples - remove tuples from memtuples and write initial run to tape
*
- * If we empty the heap, close out the current run and return (this should
- * only happen at end of input data). If we see that the tuple run number
- * at the top of the heap has changed, start a new run.
+ * When alltuples = true, dump everything currently in memory. (This case is
+ * only used at end of input data.)
*/
static void
dumptuples(Tuplesortstate *state, bool alltuples)
{
- while (alltuples ||
- (LACKMEM(state) && state->memtupcount > 1) ||
- state->memtupcount >= state->memtupsize)
+ int memtupwrite;
+ int i;
+
+ /*
+ * Nothing to do if we still fit in available memory and have array slots,
+ * unless this is the final call during initial run generation.
+ */
+ if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
+ !alltuples)
+ return;
+
+ /*
+ * Final call might require no sorting, in rare cases where we just so
+ * happen to have previously LACKMEM()'d at the point where exactly all
+ * remaining tuples are loaded into memory, just before input was
+ * exhausted.
+ *
+ * In general, short final runs are quite possible. Rather than allowing
+ * a special case where there was a superfluous selectnewtape() call (i.e.
+ * a call with no subsequent run actually written to destTape), we prefer
+ * to write out a 0 tuple run.
+ *
+ * mergereadnext() is prepared for 0 tuple runs, and will reliably mark
+ * the tape inactive for the merge when called from beginmerge(). This
+ * case is therefore similar to the case where mergeonerun() finds a dummy
+ * run for the tape, and so doesn't need to merge a run from the tape (or
+ * conceptually "merges" the dummy run, if you prefer). According to
+ * Knuth, Algorithm D "isn't strictly optimal" in its method of
+ * distribution and dummy run assignment; this edge case seems very
+ * unlikely to make that appreciably worse.
+ */
+ Assert(state->status == TSS_BUILDRUNS);
+
+ /*
+ * It seems unlikely that this limit will ever be exceeded, but take no
+ * chances.
+ */
+ if (state->currentRun == INT_MAX)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("cannot have more than %d runs for an external sort",
+ INT_MAX)));
+
+ state->currentRun++;
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG, "worker %d starting quicksort of run %d: %s",
+ state->worker, state->currentRun,
+ pg_rusage_show(&state->ru_start));
+#endif
+
+ /*
+ * Sort all tuples accumulated within the allowed amount of memory for
+ * this run using quicksort
+ */
+ tuplesort_sort_memtuples(state);
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG, "worker %d finished quicksort of run %d: %s",
+ state->worker, state->currentRun,
+ pg_rusage_show(&state->ru_start));
+#endif
+
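+ /* Write all tuples, now in sorted order, to the current output tape */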
+ memtupwrite = state->memtupcount;
+ for (i = 0; i < memtupwrite; i++)
{
- /*
- * Dump the heap's frontmost entry, and sift up to remove it from the
- * heap.
- */
- Assert(state->memtupcount > 0);
WRITETUP(state, state->tp_tapenum[state->destTape],
- &state->memtuples[0]);
- tuplesort_heap_siftup(state, true);
+ &state->memtuples[i]);
+ state->memtupcount--;
+ }
- /*
- * If the heap is empty *or* top run number has changed, we've
- * finished the current run.
- */
- if (state->memtupcount == 0 ||
- state->currentRun != state->memtuples[0].tupindex)
- {
- markrunend(state, state->tp_tapenum[state->destTape]);
- state->currentRun++;
- state->tp_runs[state->destTape]++;
- state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
+ /*
+ * Reset tuple memory. We've freed all of the tuples that we previously
+ * allocated. It's important to avoid fragmentation when there is a stark
+ * change in the sizes of incoming tuples. Fragmentation due to
+ * AllocSetFree's bucketing by size class might be particularly bad if
+ * this step wasn't taken.
+ */
+ MemoryContextReset(state->tuplecontext);
+
+ markrunend(state, state->tp_tapenum[state->destTape]);
+ state->tp_runs[state->destTape]++;
+ state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
#ifdef TRACE_SORT
- if (trace_sort)
- elog(LOG, "finished writing%s run %d to tape %d: %s",
- (state->memtupcount == 0) ? " final" : "",
- state->currentRun, state->destTape,
- pg_rusage_show(&state->ru_start));
+ if (trace_sort)
+ elog(LOG, "worker %d finished writing run %d to tape %d: %s",
+ state->worker, state->currentRun, state->destTape,
+ pg_rusage_show(&state->ru_start));
#endif
- /*
- * Done if heap is empty, else prepare for new run.
- */
- if (state->memtupcount == 0)
- break;
- Assert(state->currentRun == state->memtuples[0].tupindex);
- selectnewtape(state);
- }
- }
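+ /* Select a new output tape for the next run, unless input is exhausted */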
+ if (!alltuples)
+ selectnewtape(state);
}
/*
state->markpos_eof = false;
break;
case TSS_SORTEDONTAPE:
- LogicalTapeRewind(state->tapeset,
- state->result_tape,
- false);
+ LogicalTapeRewindForRead(state->tapeset,
+ state->result_tape,
+ 0);
state->eof_reached = false;
state->markpos_block = 0L;
state->markpos_offset = 0;
state->eof_reached = state->markpos_eof;
break;
case TSS_SORTEDONTAPE:
- if (!LogicalTapeSeek(state->tapeset,
- state->result_tape,
- state->markpos_block,
- state->markpos_offset))
- elog(ERROR, "tuplesort_restorepos failed");
+ LogicalTapeSeek(state->tapeset,
+ state->result_tape,
+ state->markpos_block,
+ state->markpos_offset);
state->eof_reached = state->markpos_eof;
break;
default:
*
* This can be called after tuplesort_performsort() finishes to obtain
* printable summary information about how the sort was performed.
- * spaceUsed is measured in kilobytes.
*/
void
tuplesort_get_stats(Tuplesortstate *state,
- const char **sortMethod,
- const char **spaceType,
- long *spaceUsed)
+ TuplesortInstrumentation *stats)
{
/*
* Note: it might seem we should provide both memory and disk usage for a
*/
if (state->tapeset)
{
- *spaceType = "Disk";
- *spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
+ stats->spaceType = SORT_SPACE_TYPE_DISK;
+ stats->spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
}
else
{
- *spaceType = "Memory";
- *spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
+ stats->spaceType = SORT_SPACE_TYPE_MEMORY;
+ stats->spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
}
switch (state->status)
{
case TSS_SORTEDINMEM:
if (state->boundUsed)
- *sortMethod = "top-N heapsort";
+ stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
else
- *sortMethod = "quicksort";
+ stats->sortMethod = SORT_TYPE_QUICKSORT;
break;
case TSS_SORTEDONTAPE:
- *sortMethod = "external sort";
+ stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
break;
case TSS_FINALMERGE:
- *sortMethod = "external merge";
+ stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
break;
default:
- *sortMethod = "still in progress";
+ stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
break;
}
}
+/*
+ * Convert TuplesortMethod to a string.
+ */
+const char *
+tuplesort_method_name(TuplesortMethod m)
+{
+ switch (m)
+ {
+ case SORT_TYPE_STILL_IN_PROGRESS:
+ return "still in progress";
+ case SORT_TYPE_TOP_N_HEAPSORT:
+ return "top-N heapsort";
+ case SORT_TYPE_QUICKSORT:
+ return "quicksort";
+ case SORT_TYPE_EXTERNAL_SORT:
+ return "external sort";
+ case SORT_TYPE_EXTERNAL_MERGE:
+ return "external merge";
+ }
+
+ return "unknown";
+}
/*
- * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
- *
- * Compare two SortTuples. If checkIndex is true, use the tuple index
- * as the front of the sort key; otherwise, no.
+ * Convert TuplesortSpaceType to a string.
*/
+const char *
+tuplesort_space_type_name(TuplesortSpaceType t)
+{
+ Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
+ return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
+}
+
-#define HEAPCOMPARE(tup1,tup2) \
- (checkIndex && ((tup1)->tupindex != (tup2)->tupindex) ? \
- ((tup1)->tupindex) - ((tup2)->tupindex) : \
- COMPARETUP(state, tup1, tup2))
+/*
+ * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
+ */
/*
* Convert the existing unordered array of SortTuples to a bounded heap,
* at the root (array entry zero), instead of the smallest as in the normal
* sort case. This allows us to discard the largest entry cheaply.
* Therefore, we temporarily reverse the sort direction.
- *
- * We assume that all entries in a bounded heap will always have tupindex
- * zero; it therefore doesn't matter that HEAPCOMPARE() doesn't reverse
- * the direction of comparison for tupindexes.
*/
static void
make_bounded_heap(Tuplesortstate *state)
Assert(state->status == TSS_INITIAL);
Assert(state->bounded);
Assert(tupcount >= state->bound);
+ Assert(SERIAL(state));
/* Reverse sort direction so largest entry will be at root */
- REVERSEDIRECTION(state);
+ reversedirection(state);
state->memtupcount = 0; /* make the heap empty */
for (i = 0; i < tupcount; i++)
{
- if (state->memtupcount >= state->bound &&
- COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
- {
- /* New tuple would just get thrown out, so skip it */
- free_sort_tuple(state, &state->memtuples[i]);
- CHECK_FOR_INTERRUPTS();
- }
- else
+ if (state->memtupcount < state->bound)
{
/* Insert next tuple into heap */
/* Must copy source tuple to avoid possible overwrite */
SortTuple stup = state->memtuples[i];
- tuplesort_heap_insert(state, &stup, 0, false);
-
- /* If heap too full, discard largest entry */
- if (state->memtupcount > state->bound)
+ tuplesort_heap_insert(state, &stup);
+ }
+ else
+ {
+ /*
+ * The heap is full. Replace the largest entry with the new
+ * tuple, or just discard it, if it's larger than anything already
+ * in the heap.
+ */
+ if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
{
- free_sort_tuple(state, &state->memtuples[0]);
- tuplesort_heap_siftup(state, false);
+ free_sort_tuple(state, &state->memtuples[i]);
+ CHECK_FOR_INTERRUPTS();
}
+ else
+ tuplesort_heap_replace_top(state, &state->memtuples[i]);
}
}
Assert(state->status == TSS_BOUNDED);
Assert(state->bounded);
Assert(tupcount == state->bound);
+ Assert(SERIAL(state));
/*
- * We can unheapify in place because each sift-up will remove the largest
- * entry, which we can promptly store in the newly freed slot at the end.
- * Once we're down to a single-entry heap, we're done.
+ * We can unheapify in place because each delete-top call will remove the
+ * largest entry, which we can promptly store in the newly freed slot at
+ * the end. Once we're down to a single-entry heap, we're done.
*/
while (state->memtupcount > 1)
{
SortTuple stup = state->memtuples[0];
/* this sifts-up the next-largest entry and decreases memtupcount */
- tuplesort_heap_siftup(state, false);
+ tuplesort_heap_delete_top(state);
state->memtuples[state->memtupcount] = stup;
}
state->memtupcount = tupcount;
* Reverse sort direction back to the original state. This is not
* actually necessary but seems like a good idea for tidiness.
*/
- REVERSEDIRECTION(state);
+ reversedirection(state);
state->status = TSS_SORTEDINMEM;
state->boundUsed = true;
}
/*
- * Insert a new tuple into an empty or existing heap, maintaining the
- * heap invariant. Caller is responsible for ensuring there's room.
+ * Sort all memtuples using specialized qsort() routines.
*
- * Note: we assume *tuple is a temporary variable that can be scribbled on.
- * For some callers, tuple actually points to a memtuples[] entry above the
+ * Quicksort is used both for small in-memory sorts and for building
+ * external sort runs.
+ */
+static void
+tuplesort_sort_memtuples(Tuplesortstate *state)
+{
+ Assert(!LEADER(state));
+
+ if (state->memtupcount > 1)
+ {
+ /* Can we use the single-key sort function? */
+ if (state->onlyKey != NULL)
+ qsort_ssup(state->memtuples, state->memtupcount,
+ state->onlyKey);
+ else
+ qsort_tuple(state->memtuples,
+ state->memtupcount,
+ state->comparetup,
+ state);
+ }
+}
+
+/*
+ * Insert a new tuple into an empty or existing heap, maintaining the
+ * heap invariant. Caller is responsible for ensuring there's room.
+ *
+ * Note: For some callers, tuple points to a memtuples[] entry above the
* end of the heap. This is safe as long as it's not immediately adjacent
* to the end of the heap (ie, in the [memtupcount] array entry) --- if it
* is, it might get overwritten before being moved into the heap!
*/
static void
-tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
- int tupleindex, bool checkIndex)
+tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
{
SortTuple *memtuples;
int j;
- /*
- * Save the tupleindex --- see notes above about writing on *tuple. It's a
- * historical artifact that tupleindex is passed as a separate argument
- * and not in *tuple, but it's notationally convenient so let's leave it
- * that way.
- */
- tuple->tupindex = tupleindex;
-
memtuples = state->memtuples;
Assert(state->memtupcount < state->memtupsize);
{
int i = (j - 1) >> 1;
- if (HEAPCOMPARE(tuple, &memtuples[i]) >= 0)
+ if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
break;
memtuples[j] = memtuples[i];
j = i;
}
/*
- * The tuple at state->memtuples[0] has been removed from the heap.
- * Decrement memtupcount, and sift up to maintain the heap invariant.
+ * Remove the tuple at state->memtuples[0] from the heap. Decrement
+ * memtupcount, and sift up to maintain the heap invariant.
+ *
+ * The caller has already free'd the tuple the top node points to,
+ * if necessary.
*/
static void
-tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
+tuplesort_heap_delete_top(Tuplesortstate *state)
{
SortTuple *memtuples = state->memtuples;
SortTuple *tuple;
- int i,
- n;
if (--state->memtupcount <= 0)
return;
+ /*
+ * Remove the last tuple in the heap, and re-insert it by replacing the
+ * current top node with it.
+ */
+ tuple = &memtuples[state->memtupcount];
+ tuplesort_heap_replace_top(state, tuple);
+}
+
+/*
+ * Replace the tuple at state->memtuples[0] with a new tuple. Sift up to
+ * maintain the heap invariant.
+ *
+ * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
+ * Heapsort, steps H3-H8).
+ */
+static void
+tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
+{
+ SortTuple *memtuples = state->memtuples;
+ unsigned int i,
+ n;
+
+ Assert(state->memtupcount >= 1);
+
CHECK_FOR_INTERRUPTS();
+ /*
+ * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
+ * This prevents overflow in the "2 * i + 1" calculation, since at the top
+ * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
+ */
n = state->memtupcount;
- tuple = &memtuples[n]; /* tuple that must be reinserted */
i = 0; /* i is where the "hole" is */
for (;;)
{
- int j = 2 * i + 1;
+ unsigned int j = 2 * i + 1;
if (j >= n)
break;
if (j + 1 < n &&
- HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0)
+ COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
j++;
- if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0)
+ if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
break;
memtuples[i] = memtuples[j];
i = j;
memtuples[i] = *tuple;
}
+/*
+ * Function to reverse the sort direction from its current state
+ *
+ * It is not safe to call this when performing hash tuplesorts.
+ */
+static void
+reversedirection(Tuplesortstate *state)
+{
+ SortSupport sortKey = state->sortKeys;
+ int nkey;
+
+ for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
+ {
+ sortKey->ssup_reverse = !sortKey->ssup_reverse;
+ sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
+ }
+}
+
/*
* Tape interface routines
LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
}
-
/*
- * Inline-able copy of FunctionCall2Coll() to save some cycles in sorting.
+ * Get memory for a tuple from within the READTUP() routine.
+ *
+ * We use the next free slot from the slab allocator, or palloc() if the tuple
+ * is too large for that.
*/
-static inline Datum
-myFunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
+static void *
+readtup_alloc(Tuplesortstate *state, Size tuplen)
{
- FunctionCallInfoData fcinfo;
- Datum result;
-
- InitFunctionCallInfoData(fcinfo, flinfo, 2, collation, NULL, NULL);
-
- fcinfo.arg[0] = arg1;
- fcinfo.arg[1] = arg2;
- fcinfo.argnull[0] = false;
- fcinfo.argnull[1] = false;
-
- result = FunctionCallInvoke(&fcinfo);
-
- /* Check for null result, since caller is clearly not expecting one */
- if (fcinfo.isnull)
- elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
+ SlabSlot *buf;
- return result;
-}
-
-/*
- * Apply a sort function (by now converted to fmgr lookup form)
- * and return a 3-way comparison result. This takes care of handling
- * reverse-sort and NULLs-ordering properly. We assume that DESC and
- * NULLS_FIRST options are encoded in sk_flags the same way btree does it.
- */
-static inline int32
-inlineApplySortFunction(FmgrInfo *sortFunction, int sk_flags, Oid collation,
- Datum datum1, bool isNull1,
- Datum datum2, bool isNull2)
-{
- int32 compare;
+ /*
+ * We pre-allocate enough slots in the slab arena that we should never run
+ * out.
+ */
+ Assert(state->slabFreeHead);
- if (isNull1)
- {
- if (isNull2)
- compare = 0; /* NULL "=" NULL */
- else if (sk_flags & SK_BT_NULLS_FIRST)
- compare = -1; /* NULL "<" NOT_NULL */
- else
- compare = 1; /* NULL ">" NOT_NULL */
- }
- else if (isNull2)
- {
- if (sk_flags & SK_BT_NULLS_FIRST)
- compare = 1; /* NOT_NULL ">" NULL */
- else
- compare = -1; /* NOT_NULL "<" NULL */
- }
+ if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
+ return MemoryContextAlloc(state->sortcontext, tuplen);
else
{
- compare = DatumGetInt32(myFunctionCall2Coll(sortFunction, collation,
- datum1, datum2));
+ buf = state->slabFreeHead;
+ /* Reuse this slot */
+ state->slabFreeHead = buf->nextfree;
- if (sk_flags & SK_BT_DESC)
- compare = -compare;
+ return buf;
}
-
- return compare;
}
TupleDesc tupDesc;
int nkey;
int32 compare;
+ AttrNumber attno;
+ Datum datum1,
+ datum2;
+ bool isnull1,
+ isnull2;
+
/* Compare the leading sort key */
compare = ApplySortComparator(a->datum1, a->isnull1,
rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET);
tupDesc = state->tupDesc;
+
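+ /*
+ * If the leading key was abbreviated, the comparison above used only the
+ * abbreviations; since they compared equal, compare the authoritative
+ * values of the leading key before moving on to the other keys.
+ */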
+ if (sortKey->abbrev_converter)
+ {
+ attno = sortKey->ssup_attno;
+
+ datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
+ datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
+
+ compare = ApplySortAbbrevFullComparator(datum1, isnull1,
+ datum2, isnull2,
+ sortKey);
+ if (compare != 0)
+ return compare;
+ }
+
sortKey++;
for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++)
{
- AttrNumber attno = sortKey->ssup_attno;
- Datum datum1,
- datum2;
- bool isnull1,
- isnull2;
+ attno = sortKey->ssup_attno;
datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
* MinimalTuple using the exported interface for that.
*/
TupleTableSlot *slot = (TupleTableSlot *) tup;
+ Datum original;
MinimalTuple tuple;
HeapTupleData htup;
+ MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
/* copy the tuple into sort storage */
tuple = ExecCopySlotMinimalTuple(slot);
/* set up first-column key value */
htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
- stup->datum1 = heap_getattr(&htup,
- state->sortKeys[0].ssup_attno,
- state->tupDesc,
- &stup->isnull1);
+ original = heap_getattr(&htup,
+ state->sortKeys[0].ssup_attno,
+ state->tupDesc,
+ &stup->isnull1);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ if (!state->sortKeys->abbrev_converter || stup->isnull1)
+ {
+ /*
+ * Store ordinary Datum representation, or NULL value. If there is a
+ * converter it won't expect NULL values, and cost model is not
+ * required to account for NULL, so in that case we avoid calling
+ * converter and just set datum1 to zeroed representation (to be
+ * consistent, and to support cheap inequality tests for NULL
+ * abbreviated keys).
+ */
+ stup->datum1 = original;
+ }
+ else if (!consider_abort_common(state))
+ {
+ /* Store abbreviated key representation */
+ stup->datum1 = state->sortKeys->abbrev_converter(original,
+ state->sortKeys);
+ }
+ else
+ {
+ /* Abort abbreviation */
+ int i;
+
+ stup->datum1 = original;
+
+ /*
+ * Set state to be consistent with never trying abbreviation.
+ *
+ * Alter datum1 representation in already-copied tuples, so as to
+ * ensure a consistent representation (current tuple was just
+ * handled). It does not matter if some dumped tuples are already
+ * sorted on tape, since serialized tuples lack abbreviated keys
+ * (TSS_BUILDRUNS state prevents control reaching here in any case).
+ */
+ for (i = 0; i < state->memtupcount; i++)
+ {
+ SortTuple *mtup = &state->memtuples[i];
+
+ htup.t_len = ((MinimalTuple) mtup->tuple)->t_len +
+ MINIMAL_TUPLE_OFFSET;
+ htup.t_data = (HeapTupleHeader) ((char *) mtup->tuple -
+ MINIMAL_TUPLE_OFFSET);
+
+ mtup->datum1 = heap_getattr(&htup,
+ state->sortKeys[0].ssup_attno,
+ state->tupDesc,
+ &mtup->isnull1);
+ }
+ }
}
static void
LogicalTapeWrite(state->tapeset, tapenum,
(void *) &tuplen, sizeof(tuplen));
- FREEMEM(state, GetMemoryChunkSpace(tuple));
- heap_free_minimal_tuple(tuple);
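+ /* When the slab allocator is in use (merging), the caller recycles the slot */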
+ if (!state->slabAllocatorUsed)
+ {
+ FREEMEM(state, GetMemoryChunkSpace(tuple));
+ heap_free_minimal_tuple(tuple);
+ }
}
static void
{
unsigned int tupbodylen = len - sizeof(int);
unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
- MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
+ MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen);
char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
HeapTupleData htup;
- USEMEM(state, GetMemoryChunkSpace(tuple));
/* read in the tuple proper */
tuple->t_len = tuplen;
LogicalTapeReadExact(state->tapeset, tapenum,
&stup->isnull1);
}
-static void
-reversedirection_heap(Tuplesortstate *state)
-{
- SortSupport sortKey = state->sortKeys;
- int nkey;
-
- for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
- {
- sortKey->ssup_reverse = !sortKey->ssup_reverse;
- sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
- }
-}
-
-
/*
* Routines specialized for the CLUSTER case (HeapTuple data, with
* comparisons per a btree index definition)
comparetup_cluster(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state)
{
- ScanKey scanKey = state->indexScanKey;
+ SortSupport sortKey = state->sortKeys;
HeapTuple ltup;
HeapTuple rtup;
TupleDesc tupDesc;
int nkey;
int32 compare;
+ Datum datum1,
+ datum2;
+ bool isnull1,
+ isnull2;
+ AttrNumber leading = state->indexInfo->ii_IndexAttrNumbers[0];
+
+ /* Be prepared to compare additional sort keys */
+ ltup = (HeapTuple) a->tuple;
+ rtup = (HeapTuple) b->tuple;
+ tupDesc = state->tupDesc;
/* Compare the leading sort key, if it's simple */
- if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
+ if (leading != 0)
{
- compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
- scanKey->sk_collation,
- a->datum1, a->isnull1,
- b->datum1, b->isnull1);
+ compare = ApplySortComparator(a->datum1, a->isnull1,
+ b->datum1, b->isnull1,
+ sortKey);
+ if (compare != 0)
+ return compare;
+
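+ /* Abbreviated comparison tied; compare the authoritative leading values */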
+ if (sortKey->abbrev_converter)
+ {
+ datum1 = heap_getattr(ltup, leading, tupDesc, &isnull1);
+ datum2 = heap_getattr(rtup, leading, tupDesc, &isnull2);
+
+ compare = ApplySortAbbrevFullComparator(datum1, isnull1,
+ datum2, isnull2,
+ sortKey);
+ }
if (compare != 0 || state->nKeys == 1)
return compare;
/* Compare additional columns the hard way */
- scanKey++;
+ sortKey++;
nkey = 1;
}
else
nkey = 0;
}
- /* Compare additional sort keys */
- ltup = (HeapTuple) a->tuple;
- rtup = (HeapTuple) b->tuple;
-
if (state->indexInfo->ii_Expressions == NULL)
{
/* If not expression index, just compare the proper heap attrs */
- tupDesc = state->tupDesc;
- for (; nkey < state->nKeys; nkey++, scanKey++)
+ for (; nkey < state->nKeys; nkey++, sortKey++)
{
- AttrNumber attno = state->indexInfo->ii_KeyAttrNumbers[nkey];
- Datum datum1,
- datum2;
- bool isnull1,
- isnull2;
+ AttrNumber attno = state->indexInfo->ii_IndexAttrNumbers[nkey];
datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
- compare = inlineApplySortFunction(&scanKey->sk_func,
- scanKey->sk_flags,
- scanKey->sk_collation,
- datum1, isnull1,
- datum2, isnull2);
+ compare = ApplySortComparator(datum1, isnull1,
+ datum2, isnull2,
+ sortKey);
if (compare != 0)
return compare;
}
ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple;
- ExecStoreTuple(ltup, ecxt_scantuple, InvalidBuffer, false);
+ ExecStoreHeapTuple(ltup, ecxt_scantuple, false);
FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
l_index_values, l_index_isnull);
- ExecStoreTuple(rtup, ecxt_scantuple, InvalidBuffer, false);
+ ExecStoreHeapTuple(rtup, ecxt_scantuple, false);
FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
r_index_values, r_index_isnull);
- for (; nkey < state->nKeys; nkey++, scanKey++)
+ for (; nkey < state->nKeys; nkey++, sortKey++)
{
- compare = inlineApplySortFunction(&scanKey->sk_func,
- scanKey->sk_flags,
- scanKey->sk_collation,
- l_index_values[nkey],
- l_index_isnull[nkey],
- r_index_values[nkey],
- r_index_isnull[nkey]);
+ compare = ApplySortComparator(l_index_values[nkey],
+ l_index_isnull[nkey],
+ r_index_values[nkey],
+ r_index_isnull[nkey],
+ sortKey);
if (compare != 0)
return compare;
}
copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
{
HeapTuple tuple = (HeapTuple) tup;
+ Datum original;
+ MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
/* copy the tuple into sort storage */
tuple = heap_copytuple(tuple);
stup->tuple = (void *) tuple;
USEMEM(state, GetMemoryChunkSpace(tuple));
- /* set up first-column key value, if it's a simple column */
- if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
- stup->datum1 = heap_getattr(tuple,
- state->indexInfo->ii_KeyAttrNumbers[0],
- state->tupDesc,
- &stup->isnull1);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ /*
+ * set up first-column key value, and potentially abbreviate, if it's a
+ * simple column
+ */
+ if (state->indexInfo->ii_IndexAttrNumbers[0] == 0)
+ return;
+
+ original = heap_getattr(tuple,
+ state->indexInfo->ii_IndexAttrNumbers[0],
+ state->tupDesc,
+ &stup->isnull1);
+
+ if (!state->sortKeys->abbrev_converter || stup->isnull1)
+ {
+ /*
+ * Store ordinary Datum representation, or NULL value. If there is a
+ * converter it won't expect NULL values, and cost model is not
+ * required to account for NULL, so in that case we avoid calling
+ * converter and just set datum1 to zeroed representation (to be
+ * consistent, and to support cheap inequality tests for NULL
+ * abbreviated keys).
+ */
+ stup->datum1 = original;
+ }
+ else if (!consider_abort_common(state))
+ {
+ /* Store abbreviated key representation */
+ stup->datum1 = state->sortKeys->abbrev_converter(original,
+ state->sortKeys);
+ }
+ else
+ {
+ /* Abort abbreviation */
+ int i;
+
+ stup->datum1 = original;
+
+ /*
+ * Set state to be consistent with never trying abbreviation.
+ *
+ * Alter datum1 representation in already-copied tuples, so as to
+ * ensure a consistent representation (current tuple was just
+ * handled). It does not matter if some dumped tuples are already
+ * sorted on tape, since serialized tuples lack abbreviated keys
+ * (TSS_BUILDRUNS state prevents control reaching here in any case).
+ */
+ for (i = 0; i < state->memtupcount; i++)
+ {
+ SortTuple *mtup = &state->memtuples[i];
+
+ tuple = (HeapTuple) mtup->tuple;
+ mtup->datum1 = heap_getattr(tuple,
+ state->indexInfo->ii_IndexAttrNumbers[0],
+ state->tupDesc,
+ &mtup->isnull1);
+ }
+ }
}
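
/*
 * Aside on the abbreviated-key contract relied on here and in
 * comparetup_cluster() (a sketch with illustrative names, not code from
 * this file): an unequal result from comparing abbreviated keys is
 * authoritative, but a tie proves nothing, so ties must be re-checked
 * against the authoritative values:
 *
 *		compare = ApplySortComparator(a->datum1, a->isnull1,
 *									  b->datum1, b->isnull1, sortKey);
 *		if (compare == 0 && sortKey->abbrev_converter)
 *			compare = ApplySortAbbrevFullComparator(datum1, isnull1,
 *													datum2, isnull2,
 *													sortKey);
 */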
static void
LogicalTapeWrite(state->tapeset, tapenum,
&tuplen, sizeof(tuplen));
- FREEMEM(state, GetMemoryChunkSpace(tuple));
- heap_freetuple(tuple);
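+	/*
+	 * During merging, tuples are read into slab-allocated slots that are
+	 * recycled wholesale as tapes are read, so the tuple must only be freed
+	 * here when the slab allocator is not in use.
+	 */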
+ if (!state->slabAllocatorUsed)
+ {
+ FREEMEM(state, GetMemoryChunkSpace(tuple));
+ heap_freetuple(tuple);
+ }
}
static void
int tapenum, unsigned int tuplen)
{
unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
- HeapTuple tuple = (HeapTuple) palloc(t_len + HEAPTUPLESIZE);
+ HeapTuple tuple = (HeapTuple) readtup_alloc(state,
+ t_len + HEAPTUPLESIZE);
- USEMEM(state, GetMemoryChunkSpace(tuple));
/* Reconstruct the HeapTupleData header */
tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
tuple->t_len = t_len;
&tuplen, sizeof(tuplen));
stup->tuple = (void *) tuple;
/* set up first-column key value, if it's a simple column */
- if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
+ if (state->indexInfo->ii_IndexAttrNumbers[0] != 0)
stup->datum1 = heap_getattr(tuple,
- state->indexInfo->ii_KeyAttrNumbers[0],
+ state->indexInfo->ii_IndexAttrNumbers[0],
state->tupDesc,
&stup->isnull1);
}
-
/*
* Routines specialized for IndexTuple case
*
Tuplesortstate *state)
{
/*
- * This is similar to _bt_tuplecompare(), but we have already done the
- * index_getattr calls for the first column, and we need to keep track of
- * whether any null fields are present. Also see the special treatment
- * for equal keys at the end.
+ * This is similar to comparetup_heap(), but expects index tuples. There
+ * is also special handling for enforcing uniqueness, and special
+ * treatment for equal keys at the end.
*/
- ScanKey scanKey = state->indexScanKey;
+ SortSupport sortKey = state->sortKeys;
IndexTuple tuple1;
IndexTuple tuple2;
int keysz;
bool equal_hasnull = false;
int nkey;
int32 compare;
+ Datum datum1,
+ datum2;
+ bool isnull1,
+ isnull2;
+
/* Compare the leading sort key */
- compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
- scanKey->sk_collation,
- a->datum1, a->isnull1,
- b->datum1, b->isnull1);
+ compare = ApplySortComparator(a->datum1, a->isnull1,
+ b->datum1, b->isnull1,
+ sortKey);
if (compare != 0)
return compare;
- /* they are equal, so we only need to examine one null flag */
- if (a->isnull1)
- equal_hasnull = true;
-
/* Compare additional sort keys */
tuple1 = (IndexTuple) a->tuple;
tuple2 = (IndexTuple) b->tuple;
keysz = state->nKeys;
tupDes = RelationGetDescr(state->indexRel);
- scanKey++;
- for (nkey = 2; nkey <= keysz; nkey++, scanKey++)
+
+ if (sortKey->abbrev_converter)
{
- Datum datum1,
- datum2;
- bool isnull1,
- isnull2;
+ datum1 = index_getattr(tuple1, 1, tupDes, &isnull1);
+ datum2 = index_getattr(tuple2, 1, tupDes, &isnull2);
+
+ compare = ApplySortAbbrevFullComparator(datum1, isnull1,
+ datum2, isnull2,
+ sortKey);
+ if (compare != 0)
+ return compare;
+ }
+ /* they are equal, so we only need to examine one null flag */
+ if (a->isnull1)
+ equal_hasnull = true;
+
+ sortKey++;
+ for (nkey = 2; nkey <= keysz; nkey++, sortKey++)
+ {
datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
- compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
- scanKey->sk_collation,
- datum1, isnull1,
- datum2, isnull2);
+ compare = ApplySortComparator(datum1, isnull1,
+ datum2, isnull2,
+ sortKey);
if (compare != 0)
return compare; /* done when we find unequal attributes */
{
Datum values[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
+ char *key_desc;
/*
* Some rather brain-dead implementations of qsort (such as the one in
Assert(tuple1 != tuple2);
index_deform_tuple(tuple1, tupDes, values, isnull);
+
+ key_desc = BuildIndexValueDescription(state->indexRel, values, isnull);
+
ereport(ERROR,
(errcode(ERRCODE_UNIQUE_VIOLATION),
errmsg("could not create unique index \"%s\"",
RelationGetRelationName(state->indexRel)),
- errdetail("Key %s is duplicated.",
- BuildIndexValueDescription(state->indexRel,
- values, isnull)),
+ key_desc ? errdetail("Key %s is duplicated.", key_desc) :
+ errdetail("Duplicate keys exist."),
errtableconstraint(state->heapRel,
- RelationGetRelationName(state->indexRel))));
+ RelationGetRelationName(state->indexRel))));
}
/*
- * If key values are equal, we sort on ItemPointer. This does not affect
- * validity of the finished index, but it may be useful to have index
- * scans in physical order.
+ * If key values are equal, we sort on ItemPointer. This is required for
+ * btree indexes, since heap TID is treated as an implicit last key
+ * attribute in order to ensure that all keys in the index are physically
+ * unique.
*/
{
BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
return (pos1 < pos2) ? -1 : 1;
}
+ /* ItemPointer values should never be equal */
+ Assert(false);
+
return 0;
}
comparetup_index_hash(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state)
{
- uint32 hash1;
- uint32 hash2;
+ Bucket bucket1;
+ Bucket bucket2;
IndexTuple tuple1;
IndexTuple tuple2;
* that the first column of the index tuple is the hash key.
*/
Assert(!a->isnull1);
- hash1 = DatumGetUInt32(a->datum1) & state->hash_mask;
+ bucket1 = _hash_hashkey2bucket(DatumGetUInt32(a->datum1),
+ state->max_buckets, state->high_mask,
+ state->low_mask);
Assert(!b->isnull1);
- hash2 = DatumGetUInt32(b->datum1) & state->hash_mask;
-
- if (hash1 > hash2)
+ bucket2 = _hash_hashkey2bucket(DatumGetUInt32(b->datum1),
+ state->max_buckets, state->high_mask,
+ state->low_mask);
+ if (bucket1 > bucket2)
return 1;
- else if (hash1 < hash2)
+ else if (bucket1 < bucket2)
return -1;
/*
return (pos1 < pos2) ? -1 : 1;
}
+ /* ItemPointer values should never be equal */
+ Assert(false);
+
return 0;
}
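
/*
 * For reference, _hash_hashkey2bucket() (in hashutil.c) maps a hash key to
 * its bucket roughly as follows; this simplified sketch is included only to
 * make the ordering above concrete:
 *
 *		bucket = hashkey & high_mask;
 *		if (bucket > max_buckets)
 *			bucket &= low_mask;
 *
 * Sorting by the result groups index tuples by the bucket they will finally
 * occupy, even when the bucket count is not a power of two.
 */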
IndexTuple tuple = (IndexTuple) tup;
unsigned int tuplen = IndexTupleSize(tuple);
IndexTuple newtuple;
+ Datum original;
/* copy the tuple into sort storage */
- newtuple = (IndexTuple) palloc(tuplen);
+ newtuple = (IndexTuple) MemoryContextAlloc(state->tuplecontext, tuplen);
memcpy(newtuple, tuple, tuplen);
USEMEM(state, GetMemoryChunkSpace(newtuple));
stup->tuple = (void *) newtuple;
/* set up first-column key value */
- stup->datum1 = index_getattr(newtuple,
- 1,
- RelationGetDescr(state->indexRel),
- &stup->isnull1);
+ original = index_getattr(newtuple,
+ 1,
+ RelationGetDescr(state->indexRel),
+ &stup->isnull1);
+
+ if (!state->sortKeys->abbrev_converter || stup->isnull1)
+ {
+ /*
+ * Store ordinary Datum representation, or NULL value. If there is a
+ * converter it won't expect NULL values, and cost model is not
+ * required to account for NULL, so in that case we avoid calling
+ * converter and just set datum1 to zeroed representation (to be
+ * consistent, and to support cheap inequality tests for NULL
+ * abbreviated keys).
+ */
+ stup->datum1 = original;
+ }
+ else if (!consider_abort_common(state))
+ {
+ /* Store abbreviated key representation */
+ stup->datum1 = state->sortKeys->abbrev_converter(original,
+ state->sortKeys);
+ }
+ else
+ {
+ /* Abort abbreviation */
+ int i;
+
+ stup->datum1 = original;
+
+ /*
+ * Set state to be consistent with never trying abbreviation.
+ *
+ * Alter datum1 representation in already-copied tuples, so as to
+ * ensure a consistent representation (current tuple was just
+ * handled). It does not matter if some dumped tuples are already
+ * sorted on tape, since serialized tuples lack abbreviated keys
+ * (TSS_BUILDRUNS state prevents control reaching here in any case).
+ */
+ for (i = 0; i < state->memtupcount; i++)
+ {
+ SortTuple *mtup = &state->memtuples[i];
+
+ tuple = (IndexTuple) mtup->tuple;
+ mtup->datum1 = index_getattr(tuple,
+ 1,
+ RelationGetDescr(state->indexRel),
+ &mtup->isnull1);
+ }
+ }
}
static void
LogicalTapeWrite(state->tapeset, tapenum,
(void *) &tuplen, sizeof(tuplen));
- FREEMEM(state, GetMemoryChunkSpace(tuple));
- pfree(tuple);
+ if (!state->slabAllocatorUsed)
+ {
+ FREEMEM(state, GetMemoryChunkSpace(tuple));
+ pfree(tuple);
+ }
}
static void
int tapenum, unsigned int len)
{
unsigned int tuplen = len - sizeof(unsigned int);
- IndexTuple tuple = (IndexTuple) palloc(tuplen);
+ IndexTuple tuple = (IndexTuple) readtup_alloc(state, tuplen);
- USEMEM(state, GetMemoryChunkSpace(tuple));
LogicalTapeReadExact(state->tapeset, tapenum,
tuple, tuplen);
if (state->randomAccess) /* need trailing length word? */
&stup->isnull1);
}
-static void
-reversedirection_index_btree(Tuplesortstate *state)
-{
- ScanKey scanKey = state->indexScanKey;
- int nkey;
-
- for (nkey = 0; nkey < state->nKeys; nkey++, scanKey++)
- {
- scanKey->sk_flags ^= (SK_BT_DESC | SK_BT_NULLS_FIRST);
- }
-}
-
-static void
-reversedirection_index_hash(Tuplesortstate *state)
-{
- /* We don't support reversing direction in a hash index sort */
- elog(ERROR, "reversedirection_index_hash is not implemented");
-}
-
-
/*
* Routines specialized for DatumTuple case
*/
static int
comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
{
- return ApplySortComparator(a->datum1, a->isnull1,
- b->datum1, b->isnull1,
- state->onlyKey);
+ int compare;
+
+ compare = ApplySortComparator(a->datum1, a->isnull1,
+ b->datum1, b->isnull1,
+ state->sortKeys);
+ if (compare != 0)
+ return compare;
+
+ /* if we have abbreviations, then "tuple" has the original value */
+
+ if (state->sortKeys->abbrev_converter)
+ compare = ApplySortAbbrevFullComparator(PointerGetDatum(a->tuple), a->isnull1,
+ PointerGetDatum(b->tuple), b->isnull1,
+ state->sortKeys);
+
+ return compare;
}
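
/*
 * Note on the layout assumed above: when abbreviation is in effect (only
 * possible for pass-by-reference types), datum1 holds the abbreviated key
 * while "tuple" points to the full value, which is why the tie-break can
 * hand PointerGetDatum(a->tuple) to the full comparator.
 */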
static void
waddr = NULL;
tuplen = 0;
}
- else if (state->datumTypeByVal)
+ else if (!state->tuples)
{
waddr = &stup->datum1;
tuplen = sizeof(Datum);
}
else
{
- waddr = DatumGetPointer(stup->datum1);
- tuplen = datumGetSize(stup->datum1, false, state->datumTypeLen);
+ waddr = stup->tuple;
+ tuplen = datumGetSize(PointerGetDatum(stup->tuple), false, state->datumTypeLen);
Assert(tuplen != 0);
}
LogicalTapeWrite(state->tapeset, tapenum,
(void *) &writtenlen, sizeof(writtenlen));
- if (stup->tuple)
+ if (!state->slabAllocatorUsed && stup->tuple)
{
FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
pfree(stup->tuple);
stup->isnull1 = true;
stup->tuple = NULL;
}
- else if (state->datumTypeByVal)
+ else if (!state->tuples)
{
Assert(tuplen == sizeof(Datum));
LogicalTapeReadExact(state->tapeset, tapenum,
}
else
{
- void *raddr = palloc(tuplen);
+ void *raddr = readtup_alloc(state, tuplen);
LogicalTapeReadExact(state->tapeset, tapenum,
raddr, tuplen);
stup->datum1 = PointerGetDatum(raddr);
stup->isnull1 = false;
stup->tuple = raddr;
- USEMEM(state, GetMemoryChunkSpace(raddr));
}
if (state->randomAccess) /* need trailing length word? */
&tuplen, sizeof(tuplen));
}
+/*
+ * Parallel sort routines
+ */
+
+/*
+ * tuplesort_estimate_shared - estimate required shared memory allocation
+ *
+ * nWorkers is an estimate of the number of workers (it's the number that
+ * will be requested).
+ */
+Size
+tuplesort_estimate_shared(int nWorkers)
+{
+ Size tapesSize;
+
+ Assert(nWorkers > 0);
+
+ /* Make sure that BufFile shared state is MAXALIGN'd */
+ tapesSize = mul_size(sizeof(TapeShare), nWorkers);
+ tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
+
+ return tapesSize;
+}
+
+/*
+ * tuplesort_initialize_shared - initialize shared tuplesort state
+ *
+ * Must be called from leader process before workers are launched, to
+ * establish state needed up-front for worker tuplesortstates. nWorkers
+ * should match the argument passed to tuplesort_estimate_shared().
+ */
+void
+tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
+{
+ int i;
+
+ Assert(nWorkers > 0);
+
+ SpinLockInit(&shared->mutex);
+ shared->currentWorker = 0;
+ shared->workersFinished = 0;
+ SharedFileSetInit(&shared->fileset, seg);
+ shared->nTapes = nWorkers;
+ for (i = 0; i < nWorkers; i++)
+ {
+ shared->tapes[i].firstblocknumber = 0L;
+ }
+}
+
+/*
+ * tuplesort_attach_shared - attach to shared tuplesort state
+ *
+ * Must be called by all worker processes.
+ */
+void
+tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
+{
+ /* Attach to SharedFileSet */
+ SharedFileSetAttach(&shared->fileset, seg);
+}
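+
+/*
+ * Typical call sequence (a minimal sketch; the shm_toc calls merely stand
+ * in for however the caller carves out shared memory):
+ *
+ * In the leader, before launching workers:
+ *		size = tuplesort_estimate_shared(nworkers);
+ *		shared = (Sharedsort *) shm_toc_allocate(pcxt->toc, size);
+ *		tuplesort_initialize_shared(shared, nworkers, pcxt->seg);
+ *
+ * In each worker, after attaching to the DSM segment:
+ *		tuplesort_attach_shared(shared, seg);
+ */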
+
+/*
+ * worker_get_identifier - Assign and return ordinal identifier for worker
+ *
+ * The order in which these are assigned is not well defined, and should not
+ * matter; worker numbers across parallel sort participants need only be
+ * distinct and gapless. logtape.c requires this.
+ *
+ * Note that the identifiers assigned from here have no relation to
+ * ParallelWorkerNumber, to avoid making any assumption about the caller's
+ * requirements. However, we do follow the ParallelWorkerNumber
+ * convention of representing a non-worker with worker number -1. This
+ * includes the leader, as well as serial Tuplesort processes.
+ */
+static int
+worker_get_identifier(Tuplesortstate *state)
+{
+ Sharedsort *shared = state->shared;
+ int worker;
+
+ Assert(WORKER(state));
+
+ SpinLockAcquire(&shared->mutex);
+ worker = shared->currentWorker++;
+ SpinLockRelease(&shared->mutex);
+
+ return worker;
+}
+
+/*
+ * worker_freeze_result_tape - freeze worker's result tape for leader
+ *
+ * This is called by workers just after the result tape has been determined,
+ * instead of calling LogicalTapeFreeze() directly. They do so because
+ * workers require a few additional steps beyond the similar serial
+ * TSS_SORTEDONTAPE external sort case, which also happen here: freeing
+ * now-unneeded resources, and indicating to the leader that the worker's
+ * input run is available for its merge.
+ *
+ * There should only be one final output run for each worker, consisting of
+ * all tuples that were originally input into that worker.
+ */
static void
-reversedirection_datum(Tuplesortstate *state)
+worker_freeze_result_tape(Tuplesortstate *state)
{
- state->onlyKey->ssup_reverse = !state->onlyKey->ssup_reverse;
- state->onlyKey->ssup_nulls_first = !state->onlyKey->ssup_nulls_first;
+ Sharedsort *shared = state->shared;
+ TapeShare output;
+
+ Assert(WORKER(state));
+ Assert(state->result_tape != -1);
+ Assert(state->memtupcount == 0);
+
+ /*
+ * Free most remaining memory, in case caller is sensitive to our holding
+ * on to it. memtuples may not be a tiny merge heap at this point.
+ */
+ pfree(state->memtuples);
+ /* Be tidy */
+ state->memtuples = NULL;
+ state->memtupsize = 0;
+
+ /*
+	 * A parallel worker must store its result tape's metadata in shared
+	 * memory, where the leader can find it.
+ */
+ LogicalTapeFreeze(state->tapeset, state->result_tape, &output);
+
+ /* Store properties of output tape, and update finished worker count */
+ SpinLockAcquire(&shared->mutex);
+ shared->tapes[state->worker] = output;
+ shared->workersFinished++;
+ SpinLockRelease(&shared->mutex);
+}
+
+/*
+ * worker_nomergeruns - dump memtuples in worker, without merging
+ *
+ * This is called in a worker, as an alternative to mergeruns(), when no
+ * merging is required.
+ */
+static void
+worker_nomergeruns(Tuplesortstate *state)
+{
+ Assert(WORKER(state));
+ Assert(state->result_tape == -1);
+
+ state->result_tape = state->tp_tapenum[state->destTape];
+ worker_freeze_result_tape(state);
+}
+
+/*
+ * leader_takeover_tapes - create tapeset for leader from worker tapes
+ *
+ * So far, the leader's Tuplesortstate has performed no actual sorting. By
+ * now, all sorting has occurred in workers, all of which must have already
+ * returned from tuplesort_performsort().
+ *
+ * When this returns, the leader process is left in a state virtually
+ * indistinguishable from having generated the runs itself, as a serial
+ * external sort would have.
+ */
+static void
+leader_takeover_tapes(Tuplesortstate *state)
+{
+ Sharedsort *shared = state->shared;
+ int nParticipants = state->nParticipants;
+ int workersFinished;
+ int j;
+
+ Assert(LEADER(state));
+ Assert(nParticipants >= 1);
+
+ SpinLockAcquire(&shared->mutex);
+ workersFinished = shared->workersFinished;
+ SpinLockRelease(&shared->mutex);
+
+ if (nParticipants != workersFinished)
+ elog(ERROR, "cannot take over tapes before all workers finish");
+
+ /*
+ * Create the tapeset from worker tapes, including a leader-owned tape at
+ * the end. Parallel workers are far more expensive than logical tapes,
+ * so the number of tapes allocated here should never be excessive.
+ *
+ * We still have a leader tape, though it's not possible to write to it
+ * due to restrictions in the shared fileset infrastructure used by
+ * logtape.c. It will never be written to in practice because
+ * randomAccess is disallowed for parallel sorts.
+ */
+ inittapestate(state, nParticipants + 1);
+ state->tapeset = LogicalTapeSetCreate(nParticipants + 1, shared->tapes,
+ &shared->fileset, state->worker);
+
+ /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
+ state->currentRun = nParticipants;
+
+ /*
+ * Initialize variables of Algorithm D to be consistent with runs from
+ * workers having been generated in the leader.
+ *
+	 * There will always be exactly one run and one input tape per worker,
+	 * because each worker always outputs exactly one run, even when it had
+	 * no input tuples to sort.
+ */
+ for (j = 0; j < state->maxTapes; j++)
+ {
+ /* One real run; no dummy runs for worker tapes */
+ state->tp_fib[j] = 1;
+ state->tp_runs[j] = 1;
+ state->tp_dummy[j] = 0;
+ state->tp_tapenum[j] = j;
+ }
+ /* Leader tape gets one dummy run, and no real runs */
+ state->tp_fib[state->tapeRange] = 0;
+ state->tp_runs[state->tapeRange] = 0;
+ state->tp_dummy[state->tapeRange] = 1;
+
+ state->Level = 1;
+ state->destTape = 0;
+
+ state->status = TSS_BUILDRUNS;
}
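
/*
 * For example (illustrative numbers only): with nParticipants = 3, maxTapes
 * becomes 4 and tapeRange 3, so the code above leaves tp_runs = {1, 1, 1, 0}
 * and tp_dummy = {0, 0, 0, 1}: one real run on each worker tape plus a
 * single dummy run on the leader-owned tape, matching the state a serial
 * external sort would reach after run generation.
 */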
/*