* See Knuth, volume 3, for more than you want to know about the external
* sorting algorithm. Historically, we divided the input into sorted runs
* using replacement selection, in the form of a priority tree implemented
- * as a heap (essentially his Algorithm 5.2.3H), but now we only do that
- * for the first run, and only if the run would otherwise end up being very
- * short. We merge the runs using polyphase merge, Knuth's Algorithm
- * 5.4.2D. The logical "tapes" used by Algorithm D are implemented by
- * logtape.c, which avoids space wastage by recycling disk space as soon
- * as each block is read from its "tape".
- *
- * We do not use Knuth's recommended data structure (Algorithm 5.4.1R) for
- * the replacement selection, because it uses a fixed number of records
- * in memory at all times. Since we are dealing with tuples that may vary
- * considerably in size, we want to be able to vary the number of records
- * kept in memory to ensure full utilization of the allowed sort memory
- * space. So, we keep the tuples in a variable-size heap, with the next
- * record to go out at the top of the heap. Like Algorithm 5.4.1R, each
- * record is stored with the run number that it must go into, and we use
- * (run number, key) as the ordering key for the heap. When the run number
- * at the top of the heap changes, we know that no more records of the prior
- * run are left in the heap. Note that there are in practice only ever two
- * distinct run numbers, because since PostgreSQL 9.6, we only use
- * replacement selection to form the first run.
- *
- * In PostgreSQL 9.6, a heap (based on Knuth's Algorithm H, with some small
- * customizations) is only used with the aim of producing just one run,
- * thereby avoiding all merging. Only the first run can use replacement
- * selection, which is why there are now only two possible valid run
- * numbers, and why heapification is customized to not distinguish between
- * tuples in the second run (those will be quicksorted). We generally
- * prefer a simple hybrid sort-merge strategy, where runs are sorted in much
- * the same way as the entire input of an internal sort is sorted (using
- * qsort()). The replacement_sort_tuples GUC controls the limited remaining
- * use of replacement selection for the first run.
- *
- * There are several reasons to favor a hybrid sort-merge strategy.
- * Maintaining a priority tree/heap has poor CPU cache characteristics.
- * Furthermore, the growth in main memory sizes has greatly diminished the
- * value of having runs that are larger than available memory, even in the
- * case where there is partially sorted input and runs can be made far
- * larger by using a heap. In most cases, a single-pass merge step is all
- * that is required even when runs are no larger than available memory.
- * Avoiding multiple merge passes was traditionally considered to be the
- * major advantage of using replacement selection.
+ * as a heap (essentially his Algorithm 5.2.3H), but now we always use
+ * quicksort for run generation. We merge the runs using polyphase merge,
+ * Knuth's Algorithm 5.4.2D. The logical "tapes" used by Algorithm D are
+ * implemented by logtape.c, which avoids space wastage by recycling disk
+ * space as soon as each block is read from its "tape".
*
* The approximate amount of memory allowed for any one sort operation
* is specified in kilobytes by the caller (most pass work_mem). Initially,
* workMem, we begin to emit tuples into sorted runs in temporary tapes.
* When tuples are dumped in batch after quicksorting, we begin a new run
* with a new output tape (selected per Algorithm D). After the end of the
- * input is reached, we dump out remaining tuples in memory into a final run
- * (or two, when replacement selection is still used), then merge the runs
- * using Algorithm D.
+ * input is reached, we dump out remaining tuples in memory into a final run,
+ * then merge the runs using Algorithm D.
*
* When merging runs, we use a heap containing just the frontmost tuple from
* each source run; we repeatedly output the smallest tuple and replace it
* above. Nonetheless, with large workMem we can have many tapes (but not
* too many -- see the comments in tuplesort_merge_order).
*
+ * This module supports parallel sorting. Parallel sorts involve coordination
+ * among one or more worker processes, and a leader process, each with its own
+ * tuplesort state. The leader process (or, more accurately, the
+ * Tuplesortstate associated with a leader process) creates a full tapeset,
+ * consisting of worker tapes with one run to merge per worker process,
+ * and then merges those runs. Worker processes are guaranteed to produce
+ * exactly one output run from their partial input.
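+ *
+ * As a rough sketch of the intended call sequence (tuplesort.h has the
+ * details; "shared", "seg", "nworkers", and "coord" are illustrative
+ * names):
+ *
+ *	leader, before launching workers:
+ *		allocate tuplesort_estimate_shared(nworkers) bytes in shared memory;
+ *		tuplesort_initialize_shared(shared, nworkers, seg);
+ *	in each worker:
+ *		tuplesort_attach_shared(shared, seg);
+ *		state = tuplesort_begin_xxx(..., coord, false);	(coord->isWorker)
+ *		feed partial input, then call tuplesort_performsort(state),
+ *		which writes exactly one output run;
+ *	leader, once all workers are finished:
+ *		state = tuplesort_begin_xxx(..., coord, false);	(!coord->isWorker)
+ *		tuplesort_performsort(state);	(merges the worker runs)
+ *		read output with tuplesort_gettupleslot() etc.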
*
- * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
#include <limits.h>
+#include "access/hash.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
#include "catalog/index.h"
#define DATUM_SORT 2
#define CLUSTER_SORT 3
+/*
+ * Sort parallel code from state for sort__start probes: 0 means a serial
+ * sort, 1 a parallel worker, and 2 the parallel leader.
+ */
+#define PARALLEL_SORT(state) ((state)->shared == NULL ? 0 : \
+ (state)->worker >= 0 ? 1 : 2)
+
/* GUC variables */
#ifdef TRACE_SORT
bool trace_sort = false;
* described above. Accordingly, "tuple" is always used in preference to
* datum1 as the authoritative value for pass-by-reference cases.
*
- * While building initial runs, tupindex holds the tuple's run number.
- * Historically, the run number could meaningfully distinguish many runs, but
- * it now only distinguishes RUN_FIRST and HEAP_RUN_NEXT, since replacement
- * selection is always abandoned after the first run; no other run number
- * should be represented here. During merge passes, we re-use it to hold the
- * input tape number that each tuple in the heap was read from. tupindex goes
- * unused if the sort occurs entirely in memory.
+ * tupindex holds the input tape number that each tuple in the heap was read
+ * from during merge passes.
*/
typedef struct
{
#define TAPE_BUFFER_OVERHEAD BLCKSZ
#define MERGE_BUFFER_SIZE (BLCKSZ * 32)
- /*
- * Run numbers, used during external sort operations.
- *
- * HEAP_RUN_NEXT is only used for SortTuple.tupindex, never state.currentRun.
- */
-#define RUN_FIRST 0
-#define HEAP_RUN_NEXT INT_MAX
-#define RUN_SECOND 1
-
typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
- Tuplesortstate *state);
+ Tuplesortstate *state);
/*
* Private state of a Tuplesort operation.
* memory space thereby released.
*/
void (*writetup) (Tuplesortstate *state, int tapenum,
- SortTuple *stup);
+ SortTuple *stup);
/*
* Function to read a stored tuple from tape back into memory. 'len' is
* from the slab memory arena, or is palloc'd, see readtup_alloc().
*/
void (*readtup) (Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len);
+ int tapenum, unsigned int len);
/*
* This array holds the tuples now in sort memory. If we are in state
void *lastReturnedTuple;
/*
- * While building initial runs, this indicates if the replacement
- * selection strategy is in use. When it isn't, then a simple hybrid
- * sort-merge strategy is in use instead (runs are quicksorted).
- */
- bool replaceActive;
-
- /*
- * While building initial runs, this is the current output run number
- * (starting at RUN_FIRST). Afterwards, it is the number of initial runs
- * we made.
+ * While building initial runs, this is the current output run number.
+ * Afterwards, it is the number of initial runs we made.
*/
int currentRun;
int markpos_offset; /* saved "current", or offset in tape block */
bool markpos_eof; /* saved "eof_reached" */
+ /*
+ * These variables are used during parallel sorting.
+ *
+ * worker is our worker identifier. Follows the general convention that
+ * a value of -1 denotes a leader tuplesort, and values >= 0 denote worker
+ * tuplesorts. (-1 can also denote a serial tuplesort.)
+ *
+ * shared is mutable shared memory state, which is used to coordinate
+ * parallel sorts.
+ *
+ * nParticipants is the number of worker Tuplesortstates known by the
+ * leader to have actually been launched, which implies that they must
+ * each finish a run that the leader can merge. Typically includes a
+ * worker state held by the leader process itself. Set in the leader
+ * Tuplesortstate only.
+ */
+ int worker;
+ Sharedsort *shared;
+ int nParticipants;
+
/*
* The sortKeys variable is used by every case other than the hash index
* case; it is set by tuplesort_begin_xxx. tupDesc is only used by the
bool enforceUnique; /* complain if we find duplicate tuples */
/* These are specific to the index_hash subcase: */
- uint32 hash_mask; /* mask for sortable part of hash code */
+ uint32 high_mask; /* masks for sortable part of hash code */
+ uint32 low_mask;
+ uint32 max_buckets;
/*
* These variables are specific to the Datum case; they are set by
#endif
};
+/*
+ * Private mutable state of tuplesort-parallel-operation. This is allocated
+ * in shared memory.
+ */
+struct Sharedsort
+{
+ /* mutex protects all fields prior to tapes */
+ slock_t mutex;
+
+ /*
+ * currentWorker generates ordinal identifier numbers for parallel sort
+ * workers. These start from 0, and are always gapless.
+ *
+ * Workers increment workersFinished to indicate having finished. Once it
+ * equals state.nParticipants within the leader, the leader is ready to
+ * merge the worker runs.
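+ *
+ * As a sketch, worker_get_identifier() hands these out under the mutex,
+ * roughly:
+ *
+ *		SpinLockAcquire(&shared->mutex);
+ *		worker = shared->currentWorker++;
+ *		SpinLockRelease(&shared->mutex);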
+ */
+ int currentWorker;
+ int workersFinished;
+
+ /* Temporary file space */
+ SharedFileSet fileset;
+
+ /* Size of tapes flexible array */
+ int nTapes;
+
+ /*
+ * Tapes array used by workers to report back information needed by the
+ * leader to concatenate all worker tapes into one for merging
+ */
+ TapeShare tapes[FLEXIBLE_ARRAY_MEMBER];
+};
+
/*
* Is the given tuple allocated from the slab memory arena?
*/
#define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
#define USEMEM(state,amt) ((state)->availMem -= (amt))
#define FREEMEM(state,amt) ((state)->availMem += (amt))
+#define SERIAL(state) ((state)->shared == NULL)
+#define WORKER(state) ((state)->shared && (state)->worker != -1)
+#define LEADER(state) ((state)->shared && (state)->worker == -1)
/*
* NOTES about on-tape representation of tuples:
} while(0)
-static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess);
+static Tuplesortstate *tuplesort_begin_common(int workMem,
+ SortCoordinate coordinate,
+ bool randomAccess);
static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
static bool consider_abort_common(Tuplesortstate *state);
-static bool useselection(Tuplesortstate *state);
-static void inittapes(Tuplesortstate *state);
+static void inittapes(Tuplesortstate *state, bool mergeruns);
+static void inittapestate(Tuplesortstate *state, int maxTapes);
static void selectnewtape(Tuplesortstate *state);
static void init_slab_allocator(Tuplesortstate *state, int numSlots);
static void mergeruns(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
static void dumptuples(Tuplesortstate *state, bool alltuples);
-static void dumpbatch(Tuplesortstate *state, bool alltuples);
static void make_bounded_heap(Tuplesortstate *state);
static void sort_bounded_heap(Tuplesortstate *state);
static void tuplesort_sort_memtuples(Tuplesortstate *state);
-static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
- bool checkIndex);
-static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple,
- bool checkIndex);
-static void tuplesort_heap_delete_top(Tuplesortstate *state, bool checkIndex);
+static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
+static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
+static void tuplesort_heap_delete_top(Tuplesortstate *state);
static void reversedirection(Tuplesortstate *state);
static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
static void markrunend(Tuplesortstate *state, int tapenum);
SortTuple *stup);
static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
+static int worker_get_identifier(Tuplesortstate *state);
+static void worker_freeze_result_tape(Tuplesortstate *state);
+static void worker_nomergeruns(Tuplesortstate *state);
+static void leader_takeover_tapes(Tuplesortstate *state);
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
/*
*/
static Tuplesortstate *
-tuplesort_begin_common(int workMem, bool randomAccess)
+tuplesort_begin_common(int workMem, SortCoordinate coordinate,
+ bool randomAccess)
{
Tuplesortstate *state;
MemoryContext sortcontext;
MemoryContext tuplecontext;
MemoryContext oldcontext;
+ /* See leader_takeover_tapes() remarks on randomAccess support */
+ if (coordinate && randomAccess)
+ elog(ERROR, "random access disallowed under parallel sort");
+
/*
* Create a working memory context for this sort operation. All data
* needed by the sort will live inside this context.
state->bounded = false;
state->tuples = true;
state->boundUsed = false;
- state->allowedMem = workMem * (int64) 1024;
+
+ /*
+ * workMem is forced to be at least 64KB, the current minimum valid value
+ * for the work_mem GUC. This is a defense against parallel sort callers
+ * that divide out memory among many workers in a way that leaves each
+ * with very little memory.
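+ * For example, a caller splitting work_mem = 256kB evenly across eight
+ * workers would otherwise hand each worker only 32kB; the clamp raises
+ * that to the 64kB floor.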
+ */
+ state->allowedMem = Max(workMem, 64) * (int64) 1024;
state->availMem = state->allowedMem;
state->sortcontext = sortcontext;
state->tuplecontext = tuplecontext;
* see comments in grow_memtuples().
*/
state->memtupsize = Max(1024,
- ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1);
+ ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1);
state->growmemtuples = true;
state->slabAllocatorUsed = false;
if (LACKMEM(state))
elog(ERROR, "insufficient memory allowed for sort");
- state->currentRun = RUN_FIRST;
+ state->currentRun = 0;
/*
* maxTapes, tapeRange, and Algorithm D variables will be initialized by
state->result_tape = -1; /* flag that result tape has not been formed */
+ /*
+ * Initialize parallel-related state based on coordination information
+ * from caller
+ */
+ if (!coordinate)
+ {
+ /* Serial sort */
+ state->shared = NULL;
+ state->worker = -1;
+ state->nParticipants = -1;
+ }
+ else if (coordinate->isWorker)
+ {
+ /* Parallel worker produces exactly one final run from all input */
+ state->shared = coordinate->sharedsort;
+ state->worker = worker_get_identifier(state);
+ state->nParticipants = -1;
+ }
+ else
+ {
+ /* Parallel leader state only used for final merge */
+ state->shared = coordinate->sharedsort;
+ state->worker = -1;
+ state->nParticipants = coordinate->nParticipants;
+ Assert(state->nParticipants >= 1);
+ }
+
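+ /*
+ * For reference, a sketch of the caller-side setup (SortCoordinateData
+ * is declared in tuplesort.h; "sharedsort" is an assumed pointer into
+ * shared memory): workers pass isWorker = true, nParticipants = -1, and
+ * sharedsort; the leader passes isWorker = false and the number of
+ * workers actually launched.
+ */
+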
MemoryContextSwitchTo(oldcontext);
return state;
int nkeys, AttrNumber *attNums,
Oid *sortOperators, Oid *sortCollations,
bool *nullsFirstFlags,
- int workMem, bool randomAccess)
+ int workMem, SortCoordinate coordinate, bool randomAccess)
{
- Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ randomAccess);
MemoryContext oldcontext;
int i;
false, /* no unique check */
nkeys,
workMem,
- randomAccess);
+ randomAccess,
+ PARALLEL_SORT(state));
state->comparetup = comparetup_heap;
state->copytup = copytup_heap;
Tuplesortstate *
tuplesort_begin_cluster(TupleDesc tupDesc,
Relation indexRel,
- int workMem, bool randomAccess)
+ int workMem,
+ SortCoordinate coordinate, bool randomAccess)
{
- Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
- ScanKey indexScanKey;
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ randomAccess);
+ BTScanInsert indexScanKey;
MemoryContext oldcontext;
int i;
workMem, randomAccess ? 't' : 'f');
#endif
- state->nKeys = RelationGetNumberOfAttributes(indexRel);
+ state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT,
false, /* no unique check */
state->nKeys,
workMem,
- randomAccess);
+ randomAccess,
+ PARALLEL_SORT(state));
state->comparetup = comparetup_cluster;
state->copytup = copytup_cluster;
state->tupDesc = tupDesc; /* assume we need not copy tupDesc */
- indexScanKey = _bt_mkscankey_nodata(indexRel);
+ indexScanKey = _bt_mkscankey(indexRel, NULL);
if (state->indexInfo->ii_Expressions != NULL)
{
* scantuple has to point to that slot, too.
*/
state->estate = CreateExecutorState();
- slot = MakeSingleTupleTableSlot(tupDesc);
+ slot = MakeSingleTupleTableSlot(tupDesc, &TTSOpsVirtual);
econtext = GetPerTupleExprContext(state->estate);
econtext->ecxt_scantuple = slot;
}
for (i = 0; i < state->nKeys; i++)
{
SortSupport sortKey = state->sortKeys + i;
- ScanKey scanKey = indexScanKey + i;
+ ScanKey scanKey = indexScanKey->scankeys + i;
int16 strategy;
sortKey->ssup_cxt = CurrentMemoryContext;
PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
}
- _bt_freeskey(indexScanKey);
+ pfree(indexScanKey);
MemoryContextSwitchTo(oldcontext);
tuplesort_begin_index_btree(Relation heapRel,
Relation indexRel,
bool enforceUnique,
- int workMem, bool randomAccess)
+ int workMem,
+ SortCoordinate coordinate,
+ bool randomAccess)
{
- Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
- ScanKey indexScanKey;
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ randomAccess);
+ BTScanInsert indexScanKey;
MemoryContext oldcontext;
int i;
workMem, randomAccess ? 't' : 'f');
#endif
- state->nKeys = RelationGetNumberOfAttributes(indexRel);
+ state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
TRACE_POSTGRESQL_SORT_START(INDEX_SORT,
enforceUnique,
state->nKeys,
workMem,
- randomAccess);
+ randomAccess,
+ PARALLEL_SORT(state));
state->comparetup = comparetup_index_btree;
state->copytup = copytup_index;
state->indexRel = indexRel;
state->enforceUnique = enforceUnique;
- indexScanKey = _bt_mkscankey_nodata(indexRel);
- state->nKeys = RelationGetNumberOfAttributes(indexRel);
+ indexScanKey = _bt_mkscankey(indexRel, NULL);
/* Prepare SortSupport data for each column */
state->sortKeys = (SortSupport) palloc0(state->nKeys *
for (i = 0; i < state->nKeys; i++)
{
SortSupport sortKey = state->sortKeys + i;
- ScanKey scanKey = indexScanKey + i;
+ ScanKey scanKey = indexScanKey->scankeys + i;
int16 strategy;
sortKey->ssup_cxt = CurrentMemoryContext;
PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
}
- _bt_freeskey(indexScanKey);
+ pfree(indexScanKey);
MemoryContextSwitchTo(oldcontext);
Tuplesortstate *
tuplesort_begin_index_hash(Relation heapRel,
Relation indexRel,
- uint32 hash_mask,
- int workMem, bool randomAccess)
+ uint32 high_mask,
+ uint32 low_mask,
+ uint32 max_buckets,
+ int workMem,
+ SortCoordinate coordinate,
+ bool randomAccess)
{
- Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ randomAccess);
MemoryContext oldcontext;
oldcontext = MemoryContextSwitchTo(state->sortcontext);
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG,
- "begin index sort: hash_mask = 0x%x, workMem = %d, randomAccess = %c",
- hash_mask,
+ "begin index sort: high_mask = 0x%x, low_mask = 0x%x, "
+ "max_buckets = 0x%x, workMem = %d, randomAccess = %c",
+ high_mask,
+ low_mask,
+ max_buckets,
workMem, randomAccess ? 't' : 'f');
#endif
state->heapRel = heapRel;
state->indexRel = indexRel;
- state->hash_mask = hash_mask;
+ state->high_mask = high_mask;
+ state->low_mask = low_mask;
+ state->max_buckets = max_buckets;
MemoryContextSwitchTo(oldcontext);
Tuplesortstate *
tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
- bool nullsFirstFlag,
- int workMem, bool randomAccess)
+ bool nullsFirstFlag, int workMem,
+ SortCoordinate coordinate, bool randomAccess)
{
- Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ randomAccess);
MemoryContext oldcontext;
int16 typlen;
bool typbyval;
false, /* no unique check */
1,
workMem,
- randomAccess);
+ randomAccess,
+ PARALLEL_SORT(state));
state->comparetup = comparetup_datum;
state->copytup = copytup_datum;
* delayed calls at the moment.)
*
* This is a hint only. The tuplesort may still return more tuples than
- * requested.
+ * requested. Parallel leader tuplesorts will always ignore the hint.
*/
void
tuplesort_set_bound(Tuplesortstate *state, int64 bound)
Assert(state->status == TSS_INITIAL);
Assert(state->memtupcount == 0);
Assert(!state->bounded);
+ Assert(!WORKER(state));
#ifdef DEBUG_BOUNDED_SORT
/* Honor GUC setting that disables the feature (for easy testing) */
return;
#endif
+ /* Parallel leader ignores hint */
+ if (LEADER(state))
+ return;
+
/* We want to be able to compute bound * 2, so limit the setting */
if (bound > (int64) (INT_MAX / 2))
return;
if (trace_sort)
{
if (state->tapeset)
- elog(LOG, "external sort ended, %ld disk blocks used: %s",
- spaceUsed, pg_rusage_show(&state->ru_start));
+ elog(LOG, "%s of worker %d ended, %ld disk blocks used: %s",
+ SERIAL(state) ? "external sort" : "parallel external sort",
+ state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
else
- elog(LOG, "internal sort ended, %ld KB used: %s",
- spaceUsed, pg_rusage_show(&state->ru_start));
+ elog(LOG, "%s of worker %d ended, %ld KB used: %s",
+ SERIAL(state) ? "internal sort" : "unperformed parallel sort",
+ state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
}
TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
/*
* Grow the memtuples[] array, if possible within our memory constraint. We
* must not exceed INT_MAX tuples in memory or the caller-provided memory
- * limit. Return TRUE if we were able to enlarge the array, FALSE if not.
+ * limit. Return true if we were able to enlarge the array, false if not.
*
* Normally, at each increment we double the size of the array. When doing
* that would exceed a limit, we attempt one last, smaller increase (and then
static void
puttuple_common(Tuplesortstate *state, SortTuple *tuple)
{
+ Assert(!LEADER(state));
+
switch (state->status)
{
case TSS_INITIAL:
/*
* Nope; time to switch to tape-based operation.
*/
- inittapes(state);
+ inittapes(state, true);
/*
- * Dump tuples until we are back under the limit.
+ * Dump all tuples.
*/
dumptuples(state, false);
break;
{
/* discard top of heap, replacing it with the new tuple */
free_sort_tuple(state, &state->memtuples[0]);
- tuple->tupindex = 0; /* not used */
- tuplesort_heap_replace_top(state, tuple, false);
+ tuplesort_heap_replace_top(state, tuple);
}
break;
case TSS_BUILDRUNS:
/*
- * Insert the tuple into the heap, with run number currentRun if
- * it can go into the current run, else HEAP_RUN_NEXT. The tuple
- * can go into the current run if it is >= the first
- * not-yet-output tuple. (Actually, it could go into the current
- * run if it is >= the most recently output tuple ... but that
- * would require keeping around the tuple we last output, and it's
- * simplest to let writetup free each tuple as soon as it's
- * written.)
- *
- * Note that this only applies when:
- *
- * - currentRun is RUN_FIRST
- *
- * - Replacement selection is in use (typically it is never used).
- *
- * When these two conditions are not both true, all tuples are
- * appended indifferently, much like the TSS_INITIAL case.
- *
- * There should always be room to store the incoming tuple.
+ * Save the tuple into the unsorted array (there must be space)
*/
- Assert(!state->replaceActive || state->memtupcount > 0);
- if (state->replaceActive &&
- COMPARETUP(state, tuple, &state->memtuples[0]) >= 0)
- {
- Assert(state->currentRun == RUN_FIRST);
-
- /*
- * Insert tuple into first, fully heapified run.
- *
- * Unlike classic replacement selection, which this module was
- * previously based on, only RUN_FIRST tuples are fully
- * heapified. Any second/next run tuples are appended
- * indifferently. While HEAP_RUN_NEXT tuples may be sifted
- * out of the way of first run tuples, COMPARETUP() will never
- * be called for the run's tuples during sifting (only our
- * initial COMPARETUP() call is required for the tuple, to
- * determine that the tuple does not belong in RUN_FIRST).
- */
- tuple->tupindex = state->currentRun;
- tuplesort_heap_insert(state, tuple, true);
- }
- else
- {
- /*
- * Tuple was determined to not belong to heapified RUN_FIRST,
- * or replacement selection not in play. Append the tuple to
- * memtuples indifferently.
- *
- * dumptuples() does not trust that the next run's tuples are
- * heapified. Anything past the first run will always be
- * quicksorted even when replacement selection is initially
- * used. (When it's never used, every tuple still takes this
- * path.)
- */
- tuple->tupindex = HEAP_RUN_NEXT;
- state->memtuples[state->memtupcount++] = *tuple;
- }
+ state->memtuples[state->memtupcount++] = *tuple;
/*
- * If we are over the memory limit, dump tuples till we're under.
+ * If we are over the memory limit, dump all tuples.
*/
dumptuples(state, false);
break;
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "performsort starting: %s",
- pg_rusage_show(&state->ru_start));
+ elog(LOG, "performsort of worker %d starting: %s",
+ state->worker, pg_rusage_show(&state->ru_start));
#endif
switch (state->status)
/*
* We were able to accumulate all the tuples within the allowed
- * amount of memory. Just qsort 'em and we're done.
+ * amount of memory, or the leader is ready to take over worker tapes.
*/
- tuplesort_sort_memtuples(state);
+ if (SERIAL(state))
+ {
+ /* Just qsort 'em and we're done */
+ tuplesort_sort_memtuples(state);
+ state->status = TSS_SORTEDINMEM;
+ }
+ else if (WORKER(state))
+ {
+ /*
+ * Parallel workers must still dump out tuples to tape. No
+ * merge is required to produce single output run, though.
+ */
+ inittapes(state, false);
+ dumptuples(state, true);
+ worker_nomergeruns(state);
+ state->status = TSS_SORTEDONTAPE;
+ }
+ else
+ {
+ /*
+ * Leader will take over worker tapes and merge worker runs.
+ * Note that mergeruns sets the correct state->status.
+ */
+ leader_takeover_tapes(state);
+ mergeruns(state);
+ }
state->current = 0;
state->eof_reached = false;
+ state->markpos_block = 0L;
state->markpos_offset = 0;
state->markpos_eof = false;
- state->status = TSS_SORTEDINMEM;
break;
case TSS_BOUNDED:
/*
* Finish tape-based sort. First, flush all tuples remaining in
* memory out to tape; then merge until we have a single remaining
- * run (or, if !randomAccess, one run per tape). Note that
- * mergeruns sets the correct state->status.
+ * run (or, if !randomAccess and !WORKER(), one run per tape).
+ * Note that mergeruns sets the correct state->status.
*/
dumptuples(state, true);
mergeruns(state);
if (trace_sort)
{
if (state->status == TSS_FINALMERGE)
- elog(LOG, "performsort done (except %d-way final merge): %s",
- state->activeTapes,
+ elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
+ state->worker, state->activeTapes,
pg_rusage_show(&state->ru_start));
else
- elog(LOG, "performsort done: %s",
- pg_rusage_show(&state->ru_start));
+ elog(LOG, "performsort of worker %d done: %s",
+ state->worker, pg_rusage_show(&state->ru_start));
}
#endif
/*
* Internal routine to fetch the next tuple in either forward or back
- * direction into *stup. Returns FALSE if no more tuples.
+ * direction into *stup. Returns false if no more tuples.
* Returned tuple belongs to tuplesort memory context, and must not be freed
- * by caller. Caller should not use tuple following next call here.
+ * by caller. Note that fetched tuple is stored in memory that may be
+ * recycled by any future fetch.
*/
static bool
tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
unsigned int tuplen;
size_t nmoved;
+ Assert(!WORKER(state));
+
switch (state->status)
{
case TSS_SORTEDINMEM:
*/
nmoved = LogicalTapeBackspace(state->tapeset,
state->result_tape,
- tuplen + 2 * sizeof(unsigned int));
+ tuplen + 2 * sizeof(unsigned int));
if (nmoved == tuplen + sizeof(unsigned int))
{
/*
* If no more data, we've reached end of run on this tape.
* Remove the top node from the heap.
*/
- tuplesort_heap_delete_top(state, false);
+ tuplesort_heap_delete_top(state);
/*
* Rewind to free the read buffer. It'd go away at the
return true;
}
newtup.tupindex = srcTape;
- tuplesort_heap_replace_top(state, &newtup, false);
+ tuplesort_heap_replace_top(state, &newtup);
return true;
}
return false;
/*
* Fetch the next tuple in either forward or back direction.
- * If successful, put tuple in slot and return TRUE; else, clear the slot
- * and return FALSE.
+ * If successful, put tuple in slot and return true; else, clear the slot
+ * and return false.
*
- * Caller may optionally be passed back abbreviated value (on TRUE return
+ * Caller may optionally be passed back abbreviated value (on true return
* value) when abbreviation was used, which can be used to cheaply avoid
* equality checks that might otherwise be required. Caller can safely make a
* determination of "non-equal tuple" based on simple binary inequality. A
* NULL value in leading attribute will set abbreviated value to zeroed
* representation, which caller may rely on in abbreviated inequality check.
*
- * The slot receives a copied tuple (sometimes allocated in caller memory
- * context) that will stay valid regardless of future manipulations of the
- * tuplesort's state.
+ * If copy is true, the slot receives a tuple that's been copied into the
+ * caller's memory context, so that it will stay valid regardless of future
+ * manipulations of the tuplesort's state (up to and including deleting the
+ * tuplesort). If copy is false, the slot will just receive a pointer to a
+ * tuple held within the tuplesort, which is more efficient, but only safe for
+ * callers that are prepared to have any subsequent manipulation of the
+ * tuplesort's state invalidate slot contents.
*/
bool
-tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
+tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy,
TupleTableSlot *slot, Datum *abbrev)
{
MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
if (state->sortKeys->abbrev_converter && abbrev)
*abbrev = stup.datum1;
- stup.tuple = heap_copy_minimal_tuple((MinimalTuple) stup.tuple);
- ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, true);
+ if (copy)
+ stup.tuple = heap_copy_minimal_tuple((MinimalTuple) stup.tuple);
+
+ ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, copy);
return true;
}
else
/*
* Fetch the next tuple in either forward or back direction.
* Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory
- * context, and must not be freed by caller. Caller should not use tuple
- * following next call here.
+ * context, and must not be freed by caller. Caller may not rely on tuple
+ * remaining valid after any further manipulation of tuplesort.
*/
HeapTuple
tuplesort_getheaptuple(Tuplesortstate *state, bool forward)
/*
* Fetch the next index tuple in either forward or back direction.
* Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory
- * context, and must not be freed by caller. Caller should not use tuple
- * following next call here.
+ * context, and must not be freed by caller. Caller may not rely on tuple
+ * remaining valid after any further manipulation of tuplesort.
*/
IndexTuple
tuplesort_getindextuple(Tuplesortstate *state, bool forward)
/*
* Fetch the next Datum in either forward or back direction.
- * Returns FALSE if no more datums.
+ * Returns false if no more datums.
*
* If the Datum is pass-by-ref type, the returned value is freshly palloc'd
- * and is now owned by the caller (this differs from similar routines for
- * other types of tuplesorts).
+ * in caller's context, and is now owned by the caller (this differs from
+ * similar routines for other types of tuplesorts).
*
- * Caller may optionally be passed back abbreviated value (on TRUE return
+ * Caller may optionally be passed back abbreviated value (on true return
* value) when abbreviation was used, which can be used to cheaply avoid
* equality checks that might otherwise be required. Caller can safely make a
* determination of "non-equal tuple" based on simple binary inequality. A
return false;
}
+ /* Ensure we copy into caller's memory context */
+ MemoryContextSwitchTo(oldcontext);
+
/* Record abbreviated key for caller */
if (state->sortKeys->abbrev_converter && abbrev)
*abbrev = stup.datum1;
*isNull = false;
}
- MemoryContextSwitchTo(oldcontext);
-
return true;
}
/*
* Advance over N tuples in either forward or back direction,
* without returning any data. N==0 is a no-op.
- * Returns TRUE if successful, FALSE if ran out of tuples.
+ * Returns true if successful, false if ran out of tuples.
*/
bool
tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
*/
Assert(forward);
Assert(ntuples >= 0);
+ Assert(!WORKER(state));
switch (state->status)
{
* which in turn can cause the same sort to need more runs, which makes
* merging slower even if it can still be done in a single pass. Also,
* high order merges are quite slow due to CPU cache effects; it can be
- * faster to pay the I/O cost of a polyphase merge than to perform a single
- * merge pass across many hundreds of tapes.
+ * faster to pay the I/O cost of a polyphase merge than to perform a
+ * single merge pass across many hundreds of tapes.
*/
mOrder = Max(mOrder, MINORDER);
mOrder = Min(mOrder, MAXORDER);
return mOrder;
}
-/*
- * useselection - determine algorithm to use to sort first run.
- *
- * It can sometimes be useful to use the replacement selection algorithm if it
- * results in one large run, and there is little available workMem. See
- * remarks on RUN_SECOND optimization within dumptuples().
- */
-static bool
-useselection(Tuplesortstate *state)
-{
- /*
- * memtupsize might be noticeably higher than memtupcount here in atypical
- * cases. It seems slightly preferable to not allow recent outliers to
- * impact this determination. Note that caller's trace_sort output
- * reports memtupcount instead.
- */
- if (state->memtupsize <= replacement_sort_tuples)
- return true;
-
- return false;
-}
-
/*
* inittapes - initialize for tape sorting.
*
- * This is called only if we have found we don't have room to sort in memory.
+ * This is called only if we have found we won't sort in memory.
*/
static void
-inittapes(Tuplesortstate *state)
+inittapes(Tuplesortstate *state, bool mergeruns)
{
int maxTapes,
j;
- int64 tapeSpace;
-
- /* Compute number of tapes to use: merge order plus 1 */
- maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
-
- state->maxTapes = maxTapes;
- state->tapeRange = maxTapes - 1;
-
-#ifdef TRACE_SORT
- if (trace_sort)
- elog(LOG, "switching to external sort with %d tapes: %s",
- maxTapes, pg_rusage_show(&state->ru_start));
-#endif
-
- /*
- * Decrease availMem to reflect the space needed for tape buffers, when
- * writing the initial runs; but don't decrease it to the point that we
- * have no room for tuples. (That case is only likely to occur if sorting
- * pass-by-value Datums; in all other scenarios the memtuples[] array is
- * unlikely to occupy more than half of allowedMem. In the pass-by-value
- * case it's not important to account for tuple space, so we don't care if
- * LACKMEM becomes inaccurate.)
- */
- tapeSpace = (int64) maxTapes *TAPE_BUFFER_OVERHEAD;
-
- if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
- USEMEM(state, tapeSpace);
- /*
- * Make sure that the temp file(s) underlying the tape set are created in
- * suitable temp tablespaces.
- */
- PrepareTempTablespaces();
-
- /*
- * Create the tape set and allocate the per-tape data arrays.
- */
- state->tapeset = LogicalTapeSetCreate(maxTapes);
-
- state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
- state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
- state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
- state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
- state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
-
- /*
- * Give replacement selection a try based on user setting. There will be
- * a switch to a simple hybrid sort-merge strategy after the first run
- * (iff we could not output one long run).
- */
- state->replaceActive = useselection(state);
+ Assert(!LEADER(state));
- if (state->replaceActive)
+ if (mergeruns)
{
- /*
- * Convert the unsorted contents of memtuples[] into a heap. Each
- * tuple is marked as belonging to run number zero.
- *
- * NOTE: we pass false for checkIndex since there's no point in
- * comparing indexes in this step, even though we do intend the
- * indexes to be part of the sort key...
- */
- int ntuples = state->memtupcount;
+ /* Compute number of tapes to use: merge order plus 1 */
+ maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
+ }
+ else
+ {
+ /* Workers can sometimes produce single run, output without merge */
+ Assert(WORKER(state));
+ maxTapes = MINORDER + 1;
+ }
#ifdef TRACE_SORT
- if (trace_sort)
- elog(LOG, "replacement selection will sort %d first run tuples",
- state->memtupcount);
+ if (trace_sort)
+ elog(LOG, "worker %d switching to external sort with %d tapes: %s",
+ state->worker, maxTapes, pg_rusage_show(&state->ru_start));
#endif
- state->memtupcount = 0; /* make the heap empty */
- for (j = 0; j < ntuples; j++)
- {
- /* Must copy source tuple to avoid possible overwrite */
- SortTuple stup = state->memtuples[j];
+ /* Create the tape set and allocate the per-tape data arrays */
+ inittapestate(state, maxTapes);
+ state->tapeset =
+ LogicalTapeSetCreate(maxTapes, NULL,
+ state->shared ? &state->shared->fileset : NULL,
+ state->worker);
- stup.tupindex = RUN_FIRST;
- tuplesort_heap_insert(state, &stup, false);
- }
- Assert(state->memtupcount == ntuples);
- }
-
- state->currentRun = RUN_FIRST;
+ state->currentRun = 0;
/*
* Initialize variables of Algorithm D (step D1).
state->status = TSS_BUILDRUNS;
}
+/*
+ * inittapestate - initialize generic tape management state
+ */
+static void
+inittapestate(Tuplesortstate *state, int maxTapes)
+{
+ int64 tapeSpace;
+
+ /*
+ * Decrease availMem to reflect the space needed for tape buffers; but
+ * don't decrease it to the point that we have no room for tuples. (That
+ * case is only likely to occur if sorting pass-by-value Datums; in all
+ * other scenarios the memtuples[] array is unlikely to occupy more than
+ * half of allowedMem. In the pass-by-value case it's not important to
+ * account for tuple space, so we don't care if LACKMEM becomes
+ * inaccurate.)
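+ *
+ * As a worked example: with the default 8kB BLCKSZ (and hence 8kB
+ * TAPE_BUFFER_OVERHEAD), an 8-tape sort charges 8 * 8kB = 64kB against
+ * availMem here.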
+ */
+ tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
+
+ if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
+ USEMEM(state, tapeSpace);
+
+ /*
+ * Make sure that the temp file(s) underlying the tape set are created in
+ * suitable temp tablespaces. For parallel sorts, this should have been
+ * called already, but it doesn't matter if it is called a second time.
+ */
+ PrepareTempTablespaces();
+
+ state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
+ state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
+ state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
+ state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
+ state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
+
+ /* Record # of tapes allocated (for duration of sort) */
+ state->maxTapes = maxTapes;
+ /* Record maximum # of tapes usable as inputs when merging */
+ state->tapeRange = maxTapes - 1;
+}
+
/*
* selectnewtape -- select new tape for new initial run.
*
else
init_slab_allocator(state, 0);
- /*
- * If we produced only one initial run (quite likely if the total data
- * volume is between 1X and 2X workMem when replacement selection is used,
- * but something we particular count on when input is presorted), we can
- * just use that tape as the finished output, rather than doing a useless
- * merge. (This obvious optimization is not in Knuth's algorithm.)
- */
- if (state->currentRun == RUN_SECOND)
- {
- state->result_tape = state->tp_tapenum[state->destTape];
- /* must freeze and rewind the finished output tape */
- LogicalTapeFreeze(state->tapeset, state->result_tape);
- state->status = TSS_SORTEDONTAPE;
- return;
- }
-
/*
* Allocate a new 'memtuples' array, for the heap. It will hold one tuple
* from each input tape.
* Use all the remaining memory we have available for read buffers among
* the input tapes.
*
- * We do this only after checking for the case that we produced only one
- * initial run, because there is no need to use a large read buffer when
- * we're reading from a single tape. With one tape, the I/O pattern will
- * be the same regardless of the buffer size.
- *
* We don't try to "rebalance" the memory among tapes, when we start a new
* merge phase, even if some tapes are inactive in the new phase. That
* would be hard, because logtape.c doesn't know where one run ends and
*/
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
- (state->availMem) / 1024, numInputTapes);
+ elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
+ state->worker, state->availMem / 1024, numInputTapes);
#endif
state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
* pass remains. If we don't have to produce a materialized sorted
* tape, we can stop at this point and do the final merge on-the-fly.
*/
- if (!state->randomAccess)
+ if (!state->randomAccess && !WORKER(state))
{
bool allOneRun = true;
* a waste of cycles anyway...
*/
state->result_tape = state->tp_tapenum[state->tapeRange];
- LogicalTapeFreeze(state->tapeset, state->result_tape);
+ if (!WORKER(state))
+ LogicalTapeFreeze(state->tapeset, state->result_tape, NULL);
+ else
+ worker_freeze_result_tape(state);
state->status = TSS_SORTEDONTAPE;
/* Release the read buffers of all the other tapes, by rewinding them. */
WRITETUP(state, destTape, &state->memtuples[0]);
/* recycle the slot of the tuple we just wrote out, for the next read */
- RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
+ if (state->memtuples[0].tuple)
+ RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
/*
* pull next tuple from the tape, and replace the written-out tuple in
if (mergereadnext(state, srcTape, &stup))
{
stup.tupindex = srcTape;
- tuplesort_heap_replace_top(state, &stup, false);
+ tuplesort_heap_replace_top(state, &stup);
}
else
- tuplesort_heap_delete_top(state, false);
+ tuplesort_heap_delete_top(state);
}
/*
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "finished %d-way merge step: %s", state->activeTapes,
- pg_rusage_show(&state->ru_start));
+ elog(LOG, "worker %d finished %d-way merge step: %s", state->worker,
+ state->activeTapes, pg_rusage_show(&state->ru_start));
#endif
}
if (mergereadnext(state, srcTape, &tup))
{
tup.tupindex = srcTape;
- tuplesort_heap_insert(state, &tup, false);
+ tuplesort_heap_insert(state, &tup);
}
}
}
}
/*
- * dumptuples - remove tuples from memtuples and write to tape
+ * dumptuples - remove tuples from memtuples and write initial run to tape
*
- * This is used during initial-run building, but not during merging.
- *
- * When alltuples = false and replacement selection is still active, dump
- * only enough tuples to get under the availMem limit (and leave at least
- * one tuple in memtuples, since puttuple will then assume it is a heap that
- * has a tuple to compare to). We always insist there be at least one free
- * slot in the memtuples[] array.
- *
- * When alltuples = true, dump everything currently in memory. (This
- * case is only used at end of input data, although in practice only the
- * first run could fail to dump all tuples when we LACKMEM(), and only
- * when replacement selection is active.)
- *
- * If, when replacement selection is active, we see that the tuple run
- * number at the top of the heap has changed, start a new run. This must be
- * the first run, because replacement selection is always abandoned for all
- * further runs.
+ * When alltuples = true, dump everything currently in memory. (This case is
+ * only used at end of input data.)
*/
static void
dumptuples(Tuplesortstate *state, bool alltuples)
-{
- while (alltuples ||
- (LACKMEM(state) && state->memtupcount > 1) ||
- state->memtupcount >= state->memtupsize)
- {
- if (state->replaceActive)
- {
- /*
- * Still holding out for a case favorable to replacement
- * selection. Still incrementally spilling using heap.
- *
- * Dump the heap's frontmost entry, and remove it from the heap.
- */
- Assert(state->memtupcount > 0);
- WRITETUP(state, state->tp_tapenum[state->destTape],
- &state->memtuples[0]);
- tuplesort_heap_delete_top(state, true);
- }
- else
- {
- /*
- * Once committed to quicksorting runs, never incrementally spill
- */
- dumpbatch(state, alltuples);
- break;
- }
-
- /*
- * If top run number has changed, we've finished the current run (this
- * can only be the first run), and will no longer spill incrementally.
- */
- if (state->memtupcount == 0 ||
- state->memtuples[0].tupindex == HEAP_RUN_NEXT)
- {
- markrunend(state, state->tp_tapenum[state->destTape]);
- Assert(state->currentRun == RUN_FIRST);
- state->currentRun++;
- state->tp_runs[state->destTape]++;
- state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
-
-#ifdef TRACE_SORT
- if (trace_sort)
- elog(LOG, "finished incrementally writing %s run %d to tape %d: %s",
- (state->memtupcount == 0) ? "only" : "first",
- state->currentRun, state->destTape,
- pg_rusage_show(&state->ru_start));
-#endif
-
- /*
- * Done if heap is empty, which is possible when there is only one
- * long run.
- */
- Assert(state->currentRun == RUN_SECOND);
- if (state->memtupcount == 0)
- {
- /*
- * Replacement selection best case; no final merge required,
- * because there was only one initial run (second run has no
- * tuples). See RUN_SECOND case in mergeruns().
- */
- break;
- }
-
- /*
- * Abandon replacement selection for second run (as well as any
- * subsequent runs).
- */
- state->replaceActive = false;
-
- /*
- * First tuple of next run should not be heapified, and so will
- * bear placeholder run number. In practice this must actually be
- * the second run, which just became the currentRun, so we're
- * clear to quicksort and dump the tuples in batch next time
- * memtuples becomes full.
- */
- Assert(state->memtuples[0].tupindex == HEAP_RUN_NEXT);
- selectnewtape(state);
- }
- }
-}
-
-/*
- * dumpbatch - sort and dump all memtuples, forming one run on tape
- *
- * Second or subsequent runs are never heapified by this module (although
- * heapification still respects run number differences between the first and
- * second runs), and a heap (replacement selection priority queue) is often
- * avoided in the first place.
- */
-static void
-dumpbatch(Tuplesortstate *state, bool alltuples)
{
int memtupwrite;
int i;
+ /*
+ * Nothing to do if we still fit in available memory and have array slots,
+ * unless this is the final call during initial run generation.
+ */
+ if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
+ !alltuples)
+ return;
+
/*
* Final call might require no sorting, in rare cases where we just so
* happen to have previously LACKMEM()'d at the point where exactly all
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "starting quicksort of run %d: %s",
- state->currentRun, pg_rusage_show(&state->ru_start));
+ elog(LOG, "worker %d starting quicksort of run %d: %s",
+ state->worker, state->currentRun,
+ pg_rusage_show(&state->ru_start));
#endif
/*
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "finished quicksort of run %d: %s",
- state->currentRun, pg_rusage_show(&state->ru_start));
+ elog(LOG, "worker %d finished quicksort of run %d: %s",
+ state->worker, state->currentRun,
+ pg_rusage_show(&state->ru_start));
#endif
memtupwrite = state->memtupcount;
#ifdef TRACE_SORT
if (trace_sort)
- elog(LOG, "finished writing run %d to tape %d: %s",
- state->currentRun, state->destTape,
+ elog(LOG, "worker %d finished writing run %d to tape %d: %s",
+ state->worker, state->currentRun, state->destTape,
pg_rusage_show(&state->ru_start));
#endif
*
* This can be called after tuplesort_performsort() finishes to obtain
* printable summary information about how the sort was performed.
- * spaceUsed is measured in kilobytes.
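+ * A typical consumer (roughly what EXPLAIN ANALYZE does) reads the result
+ * back out with the conversion helpers below:
+ *
+ *		TuplesortInstrumentation stats;
+ *
+ *		tuplesort_get_stats(state, &stats);
+ *		sortMethod = tuplesort_method_name(stats.sortMethod);
+ *		spaceType = tuplesort_space_type_name(stats.spaceType);
+ *		spaceUsed = stats.spaceUsed;	(measured in kilobytes)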
*/
void
tuplesort_get_stats(Tuplesortstate *state,
- const char **sortMethod,
- const char **spaceType,
- long *spaceUsed)
+ TuplesortInstrumentation *stats)
{
/*
* Note: it might seem we should provide both memory and disk usage for a
*/
if (state->tapeset)
{
- *spaceType = "Disk";
- *spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
+ stats->spaceType = SORT_SPACE_TYPE_DISK;
+ stats->spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
}
else
{
- *spaceType = "Memory";
- *spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
+ stats->spaceType = SORT_SPACE_TYPE_MEMORY;
+ stats->spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
}
switch (state->status)
{
case TSS_SORTEDINMEM:
if (state->boundUsed)
- *sortMethod = "top-N heapsort";
+ stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
else
- *sortMethod = "quicksort";
+ stats->sortMethod = SORT_TYPE_QUICKSORT;
break;
case TSS_SORTEDONTAPE:
- *sortMethod = "external sort";
+ stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
break;
case TSS_FINALMERGE:
- *sortMethod = "external merge";
+ stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
break;
default:
- *sortMethod = "still in progress";
+ stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
break;
}
}
+/*
+ * Convert TuplesortMethod to a string.
+ */
+const char *
+tuplesort_method_name(TuplesortMethod m)
+{
+ switch (m)
+ {
+ case SORT_TYPE_STILL_IN_PROGRESS:
+ return "still in progress";
+ case SORT_TYPE_TOP_N_HEAPSORT:
+ return "top-N heapsort";
+ case SORT_TYPE_QUICKSORT:
+ return "quicksort";
+ case SORT_TYPE_EXTERNAL_SORT:
+ return "external sort";
+ case SORT_TYPE_EXTERNAL_MERGE:
+ return "external merge";
+ }
+
+ return "unknown";
+}
+
+/*
+ * Convert TuplesortSpaceType to a string.
+ */
+const char *
+tuplesort_space_type_name(TuplesortSpaceType t)
+{
+ Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
+ return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
+}
+
/*
* Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
- *
- * Compare two SortTuples. If checkIndex is true, use the tuple index
- * as the front of the sort key; otherwise, no.
- *
- * Note that for checkIndex callers, the heap invariant is never
- * maintained beyond the first run, and so there are no COMPARETUP()
- * calls needed to distinguish tuples in HEAP_RUN_NEXT.
*/
-#define HEAPCOMPARE(tup1,tup2) \
- (checkIndex && ((tup1)->tupindex != (tup2)->tupindex || \
- (tup1)->tupindex == HEAP_RUN_NEXT) ? \
- ((tup1)->tupindex) - ((tup2)->tupindex) : \
- COMPARETUP(state, tup1, tup2))
-
/*
* Convert the existing unordered array of SortTuples to a bounded heap,
* discarding all but the smallest "state->bound" tuples.
* at the root (array entry zero), instead of the smallest as in the normal
* sort case. This allows us to discard the largest entry cheaply.
* Therefore, we temporarily reverse the sort direction.
- *
- * We assume that all entries in a bounded heap will always have tupindex
- * zero; it therefore doesn't matter that HEAPCOMPARE() doesn't reverse
- * the direction of comparison for tupindexes.
*/
static void
make_bounded_heap(Tuplesortstate *state)
Assert(state->status == TSS_INITIAL);
Assert(state->bounded);
Assert(tupcount >= state->bound);
+ Assert(SERIAL(state));
/* Reverse sort direction so largest entry will be at root */
reversedirection(state);
/* Must copy source tuple to avoid possible overwrite */
SortTuple stup = state->memtuples[i];
- stup.tupindex = 0; /* not used */
- tuplesort_heap_insert(state, &stup, false);
+ tuplesort_heap_insert(state, &stup);
}
else
{
CHECK_FOR_INTERRUPTS();
}
else
- tuplesort_heap_replace_top(state, &state->memtuples[i], false);
+ tuplesort_heap_replace_top(state, &state->memtuples[i]);
}
}
Assert(state->status == TSS_BOUNDED);
Assert(state->bounded);
Assert(tupcount == state->bound);
+ Assert(SERIAL(state));
/*
* We can unheapify in place because each delete-top call will remove the
SortTuple stup = state->memtuples[0];
/* this sifts-up the next-largest entry and decreases memtupcount */
- tuplesort_heap_delete_top(state, false);
+ tuplesort_heap_delete_top(state);
state->memtuples[state->memtupcount] = stup;
}
state->memtupcount = tupcount;
/*
* Sort all memtuples using specialized qsort() routines.
*
- * Quicksort is used for small in-memory sorts. Quicksort is also generally
- * preferred to replacement selection for generating runs during external sort
- * operations, although replacement selection is sometimes used for the first
- * run.
+ * Quicksort is used for small in-memory sorts, and external sort runs.
*/
static void
tuplesort_sort_memtuples(Tuplesortstate *state)
{
+ Assert(!LEADER(state));
+
if (state->memtupcount > 1)
{
/* Can we use the single-key sort function? */
* is, it might get overwritten before being moved into the heap!
*/
static void
-tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
- bool checkIndex)
+tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
{
SortTuple *memtuples;
int j;
memtuples = state->memtuples;
Assert(state->memtupcount < state->memtupsize);
- Assert(!checkIndex || tuple->tupindex == RUN_FIRST);
CHECK_FOR_INTERRUPTS();
{
int i = (j - 1) >> 1;
- if (HEAPCOMPARE(tuple, &memtuples[i]) >= 0)
+ if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
break;
memtuples[j] = memtuples[i];
j = i;
* if necessary.
*/
static void
-tuplesort_heap_delete_top(Tuplesortstate *state, bool checkIndex)
+tuplesort_heap_delete_top(Tuplesortstate *state)
{
SortTuple *memtuples = state->memtuples;
SortTuple *tuple;
- Assert(!checkIndex || state->currentRun == RUN_FIRST);
if (--state->memtupcount <= 0)
return;
* current top node with it.
*/
tuple = &memtuples[state->memtupcount];
- tuplesort_heap_replace_top(state, tuple, checkIndex);
+ tuplesort_heap_replace_top(state, tuple);
}
/*
* Heapsort, steps H3-H8).
*/
static void
-tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple,
- bool checkIndex)
+tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
{
SortTuple *memtuples = state->memtuples;
- int i,
+ unsigned int i,
n;
- Assert(!checkIndex || state->currentRun == RUN_FIRST);
Assert(state->memtupcount >= 1);
CHECK_FOR_INTERRUPTS();
+ /*
+ * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
+ * This prevents overflow in the "2 * i + 1" calculation, since at the top
+ * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
+ */
n = state->memtupcount;
i = 0; /* i is where the "hole" is */
for (;;)
{
- int j = 2 * i + 1;
+ unsigned int j = 2 * i + 1;
if (j >= n)
break;
if (j + 1 < n &&
- HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0)
+ COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
j++;
- if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0)
+ if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
break;
memtuples[i] = memtuples[j];
i = j;
datum2;
bool isnull1,
isnull2;
- AttrNumber leading = state->indexInfo->ii_KeyAttrNumbers[0];
+ AttrNumber leading = state->indexInfo->ii_IndexAttrNumbers[0];
/* Be prepared to compare additional sort keys */
ltup = (HeapTuple) a->tuple;
for (; nkey < state->nKeys; nkey++, sortKey++)
{
- AttrNumber attno = state->indexInfo->ii_KeyAttrNumbers[nkey];
+ AttrNumber attno = state->indexInfo->ii_IndexAttrNumbers[nkey];
datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple;
- ExecStoreTuple(ltup, ecxt_scantuple, InvalidBuffer, false);
+ ExecStoreHeapTuple(ltup, ecxt_scantuple, false);
FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
l_index_values, l_index_isnull);
- ExecStoreTuple(rtup, ecxt_scantuple, InvalidBuffer, false);
+ ExecStoreHeapTuple(rtup, ecxt_scantuple, false);
FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
r_index_values, r_index_isnull);
* set up first-column key value, and potentially abbreviate, if it's a
* simple column
*/
- if (state->indexInfo->ii_KeyAttrNumbers[0] == 0)
+ if (state->indexInfo->ii_IndexAttrNumbers[0] == 0)
return;
original = heap_getattr(tuple,
- state->indexInfo->ii_KeyAttrNumbers[0],
+ state->indexInfo->ii_IndexAttrNumbers[0],
state->tupDesc,
&stup->isnull1);
tuple = (HeapTuple) mtup->tuple;
mtup->datum1 = heap_getattr(tuple,
- state->indexInfo->ii_KeyAttrNumbers[0],
+ state->indexInfo->ii_IndexAttrNumbers[0],
state->tupDesc,
&mtup->isnull1);
}
&tuplen, sizeof(tuplen));
stup->tuple = (void *) tuple;
/* set up first-column key value, if it's a simple column */
- if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
+ if (state->indexInfo->ii_IndexAttrNumbers[0] != 0)
stup->datum1 = heap_getattr(tuple,
- state->indexInfo->ii_KeyAttrNumbers[0],
+ state->indexInfo->ii_IndexAttrNumbers[0],
state->tupDesc,
&stup->isnull1);
}
key_desc ? errdetail("Key %s is duplicated.", key_desc) :
errdetail("Duplicate keys exist."),
errtableconstraint(state->heapRel,
- RelationGetRelationName(state->indexRel))));
+ RelationGetRelationName(state->indexRel))));
}
/*
- * If key values are equal, we sort on ItemPointer. This does not affect
- * validity of the finished index, but it may be useful to have index
- * scans in physical order.
+ * If key values are equal, we sort on ItemPointer. This is required for
+ * btree indexes, since heap TID is treated as an implicit last key
+ * attribute in order to ensure that all keys in the index are physically
+ * unique.
*/
{
BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
return (pos1 < pos2) ? -1 : 1;
}
+ /* ItemPointer values should never be equal */
+ Assert(false);
+
return 0;
}
comparetup_index_hash(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state)
{
- uint32 hash1;
- uint32 hash2;
+ Bucket bucket1;
+ Bucket bucket2;
IndexTuple tuple1;
IndexTuple tuple2;
* that the first column of the index tuple is the hash key.
*/
Assert(!a->isnull1);
- hash1 = DatumGetUInt32(a->datum1) & state->hash_mask;
+ bucket1 = _hash_hashkey2bucket(DatumGetUInt32(a->datum1),
+ state->max_buckets, state->high_mask,
+ state->low_mask);
Assert(!b->isnull1);
- hash2 = DatumGetUInt32(b->datum1) & state->hash_mask;
-
- if (hash1 > hash2)
+ bucket2 = _hash_hashkey2bucket(DatumGetUInt32(b->datum1),
+ state->max_buckets, state->high_mask,
+ state->low_mask);
+ if (bucket1 > bucket2)
return 1;
- else if (hash1 < hash2)
+ else if (bucket1 < bucket2)
return -1;
/*
return (pos1 < pos2) ? -1 : 1;
}
+ /* ItemPointer values should never be equal */
+ Assert(false);
+
return 0;
}
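+
+/*
+ * A worked example of the bucket mapping above (a sketch, assuming the
+ * usual hashutil.c definition of _hash_hashkey2bucket()): with
+ * max_buckets = 5, high_mask = 7, and low_mask = 3, a hash value of 14
+ * masks to 14 & 7 = 6; bucket 6 does not exist yet (6 > 5), so it wraps
+ * to 6 & 3 = 2.  A plain "hash & mask", as with the old hash_mask field,
+ * can only describe a power-of-two bucket count, which is presumably why
+ * it was replaced here.
+ */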
if (state->sortKeys->abbrev_converter)
compare = ApplySortAbbrevFullComparator(PointerGetDatum(a->tuple), a->isnull1,
- PointerGetDatum(b->tuple), b->isnull1,
+ PointerGetDatum(b->tuple), b->isnull1,
state->sortKeys);
return compare;
&tuplen, sizeof(tuplen));
}
+/*
+ * Parallel sort routines
+ */
+
+/*
+ * tuplesort_estimate_shared - estimate required shared memory allocation
+ *
+ * nWorkers is an estimate of the number of workers (it's the number that
+ * will be requested).
+ */
+Size
+tuplesort_estimate_shared(int nWorkers)
+{
+ Size tapesSize;
+
+ Assert(nWorkers > 0);
+
+ /* Make sure that BufFile shared state is MAXALIGN'd */
+ tapesSize = mul_size(sizeof(TapeShare), nWorkers);
+ tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
+
+ return tapesSize;
+}
+
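+/*
+ * As a worked example of the layout (sizes here are platform-dependent
+ * assumptions, not guarantees): with nWorkers = 4 and
+ * sizeof(TapeShare) == 8, the estimate comes to
+ * MAXALIGN(offsetof(Sharedsort, tapes) + 4 * 8) bytes; the flexible
+ * tapes[] array accounts for nearly all of the allocation.
+ */
+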
+/*
+ * tuplesort_initialize_shared - initialize shared tuplesort state
+ *
+ * Must be called from the leader process before workers are launched, to
+ * establish state needed up-front for worker tuplesortstates. nWorkers
+ * should match the argument passed to tuplesort_estimate_shared().
+ */
+void
+tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
+{
+ int i;
+
+ Assert(nWorkers > 0);
+
+ SpinLockInit(&shared->mutex);
+ shared->currentWorker = 0;
+ shared->workersFinished = 0;
+ SharedFileSetInit(&shared->fileset, seg);
+ shared->nTapes = nWorkers;
+ for (i = 0; i < nWorkers; i++)
+ {
+ shared->tapes[i].firstblocknumber = 0L;
+ }
+}
+
+/*
+ * tuplesort_attach_shared - attach to shared tuplesort state
+ *
+ * Must be called by all worker processes.
+ */
+void
+tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
+{
+ /* Attach to SharedFileSet */
+ SharedFileSetAttach(&shared->fileset, seg);
+}
+
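+/*
+ * A minimal usage sketch of how the three routines above fit together
+ * (the SHARED_SORT_KEY toc key and the "pcxt" parallel context variable
+ * are hypothetical, following the usual shm_toc conventions; real callers
+ * may differ):
+ *
+ * In the leader, before launching workers:
+ *		shm_toc_estimate_chunk(&pcxt->estimator,
+ *							   tuplesort_estimate_shared(nworkers));
+ *		...
+ *		sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc,
+ *									tuplesort_estimate_shared(nworkers));
+ *		tuplesort_initialize_shared(sharedsort, nworkers, pcxt->seg);
+ *		shm_toc_insert(pcxt->toc, SHARED_SORT_KEY, sharedsort);
+ *
+ * In each worker, during startup:
+ *		sharedsort = (Sharedsort *) shm_toc_lookup(toc, SHARED_SORT_KEY,
+ *												   false);
+ *		tuplesort_attach_shared(sharedsort, seg);
+ */
+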
+/*
+ * worker_get_identifier - assign and return ordinal identifier for worker
+ *
+ * The order in which these are assigned is not well defined, and should not
+ * matter; worker numbers across parallel sort participants need only be
+ * distinct and gapless. logtape.c requires this.
+ *
+ * Note that the identifiers assigned from here have no relation to
+ * ParallelWorkerNumber, to avoid making any assumption about the
+ * caller's requirements. However, we do follow the ParallelWorkerNumber
+ * convention of representing a non-worker with worker number -1. This
+ * includes the leader, as well as serial Tuplesort processes.
+ */
+static int
+worker_get_identifier(Tuplesortstate *state)
+{
+ Sharedsort *shared = state->shared;
+ int worker;
+
+ Assert(WORKER(state));
+
+ SpinLockAcquire(&shared->mutex);
+ worker = shared->currentWorker++;
+ SpinLockRelease(&shared->mutex);
+
+ return worker;
+}
+
+/*
+ * worker_freeze_result_tape - freeze worker's result tape for leader
+ *
+ * This is called by workers just after the result tape has been determined,
+ * instead of calling LogicalTapeFreeze() directly. Workers require a few
+ * additional steps beyond the similar serial TSS_SORTEDONTAPE case, and
+ * those steps happen here: freeing now-unneeded resources, and indicating
+ * to the leader that the worker's run is available as input to its merge.
+ *
+ * There should only be one final output run for each worker, consisting of
+ * all tuples that were originally input into that worker.
+ */
+static void
+worker_freeze_result_tape(Tuplesortstate *state)
+{
+ Sharedsort *shared = state->shared;
+ TapeShare output;
+
+ Assert(WORKER(state));
+ Assert(state->result_tape != -1);
+ Assert(state->memtupcount == 0);
+
+ /*
+ * Free most remaining memory, in case caller is sensitive to our holding
+ * on to it. memtuples may not be a tiny merge heap at this point.
+ */
+ pfree(state->memtuples);
+ /* Be tidy */
+ state->memtuples = NULL;
+ state->memtupsize = 0;
+
+ /*
+ * The parallel worker's result tape metadata must be stored in shared
+ * memory, where the leader can find it.
+ */
+ LogicalTapeFreeze(state->tapeset, state->result_tape, &output);
+
+ /* Store properties of output tape, and update finished worker count */
+ SpinLockAcquire(&shared->mutex);
+ shared->tapes[state->worker] = output;
+ shared->workersFinished++;
+ SpinLockRelease(&shared->mutex);
+}
+
+/*
+ * worker_nomergeruns - dump memtuples in worker, without merging
+ *
+ * This is called in a worker, as an alternative to mergeruns(), when no
+ * merging is required.
+ */
+static void
+worker_nomergeruns(Tuplesortstate *state)
+{
+ Assert(WORKER(state));
+ Assert(state->result_tape == -1);
+
+ state->result_tape = state->tp_tapenum[state->destTape];
+ worker_freeze_result_tape(state);
+}
+
+/*
+ * leader_takeover_tapes - create tapeset for leader from worker tapes
+ *
+ * So far, the leader Tuplesortstate has performed no actual sorting; all
+ * sorting has occurred in workers, all of which must have already returned
+ * from tuplesort_performsort().
+ *
+ * When this returns, the leader process is left in a state that is virtually
+ * indistinguishable from having generated the runs itself, as a serial
+ * external sort would have.
+ */
+static void
+leader_takeover_tapes(Tuplesortstate *state)
+{
+ Sharedsort *shared = state->shared;
+ int nParticipants = state->nParticipants;
+ int workersFinished;
+ int j;
+
+ Assert(LEADER(state));
+ Assert(nParticipants >= 1);
+
+ SpinLockAcquire(&shared->mutex);
+ workersFinished = shared->workersFinished;
+ SpinLockRelease(&shared->mutex);
+
+ if (nParticipants != workersFinished)
+ elog(ERROR, "cannot take over tapes before all workers finish");
+
+ /*
+ * Create the tapeset from worker tapes, including a leader-owned tape at
+ * the end. Parallel workers are far more expensive than logical tapes,
+ * so the number of tapes allocated here should never be excessive.
+ *
+ * We still have a leader tape, though it's not possible to write to it
+ * due to restrictions in the shared fileset infrastructure used by
+ * logtape.c. It will never be written to in practice because
+ * randomAccess is disallowed for parallel sorts.
+ */
+ inittapestate(state, nParticipants + 1);
+ state->tapeset = LogicalTapeSetCreate(nParticipants + 1, shared->tapes,
+ &shared->fileset, state->worker);
+
+ /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
+ state->currentRun = nParticipants;
+
+ /*
+ * Initialize the variables of Algorithm D as if the runs produced by the
+ * workers had been generated here in the leader.
+ *
+ * There will always be exactly one run per worker, and exactly one input
+ * tape per run, because every worker outputs exactly one run, even when
+ * it had no input tuples to sort.
+ */
+ for (j = 0; j < state->maxTapes; j++)
+ {
+ /* One real run; no dummy runs for worker tapes */
+ state->tp_fib[j] = 1;
+ state->tp_runs[j] = 1;
+ state->tp_dummy[j] = 0;
+ state->tp_tapenum[j] = j;
+ }
+ /* Leader tape gets one dummy run, and no real runs */
+ state->tp_fib[state->tapeRange] = 0;
+ state->tp_runs[state->tapeRange] = 0;
+ state->tp_dummy[state->tapeRange] = 1;
+
+ state->Level = 1;
+ state->destTape = 0;
+
+ state->status = TSS_BUILDRUNS;
+}
+
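+/*
+ * To illustrate the state established above (a sketch, assuming that
+ * inittapestate() sets maxTapes = nParticipants + 1 and
+ * tapeRange = maxTapes - 1): with nParticipants = 3 we get maxTapes = 4,
+ * tapeRange = 3, and
+ *
+ *		tp_runs  = {1, 1, 1, 0}
+ *		tp_dummy = {0, 0, 0, 1}
+ *		tp_fib   = {1, 1, 1, 0}
+ *
+ * so the three worker runs can be consumed by a single final merge pass.
+ */
+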
/*
* Convenience routine to free a tuple previously loaded into sort memory
*/