* the merge is complete. The basic merge algorithm thus needs very little
* memory --- only M tuples for an M-way merge, and M is constrained to a
* small number. However, we can still make good use of our full workMem
- * allocation by pre-reading additional tuples from each source tape. Without
+ * allocation by pre-reading additional blocks from each source tape. Without
* prereading, our access pattern to the temporary file would be very erratic;
* on average we'd read one block from each of M source tapes during the same
* time that we're writing M blocks to the output tape, so there is no
* worse when it comes time to read that tape. A straightforward merge pass
* thus ends up doing a lot of waiting for disk seeks. We can improve matters
* by prereading from each source tape sequentially, loading about workMem/M
- * bytes from each tape in turn. Then we run the merge algorithm, writing but
- * not reading until one of the preloaded tuple series runs out. Then we
- * switch back to preread mode, fill memory again, and repeat. This approach
- * helps to localize both read and write accesses.
+ * bytes from each tape in turn, and making the sequential blocks immediately
+ * available for reuse. This approach helps to localize both read and write
+ * accesses. The pre-reading is handled by logtape.c; we just tell it how
+ * much memory to use for the buffers.
*
* When the caller requests random access to the sort result, we form
* the final sorted run on a logical tape which is then "frozen", so
* The objects we actually sort are SortTuple structs. These contain
* a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
* which is a separate palloc chunk --- we assume it is just one chunk and
- * can be freed by a simple pfree() (except during final on-the-fly merge,
- * when memory is used in batch). SortTuples also contain the tuple's
- * first key column in Datum/nullflag format, and an index integer.
+ * can be freed by a simple pfree() (except during merge, when we use a
+ * simple slab allocator). SortTuples also contain the tuple's first key
+ * column in Datum/nullflag format, and an index integer.
*
* Storing the first key column lets us save heap_getattr or index_getattr
* calls during tuple comparisons. We could extract and save all the key
* it now only distinguishes RUN_FIRST and HEAP_RUN_NEXT, since replacement
* selection is always abandoned after the first run; no other run number
* should be represented here. During merge passes, we re-use it to hold the
- * input tape number that each tuple in the heap was read from, or to hold the
- * index of the next tuple pre-read from the same tape in the case of pre-read
- * entries. tupindex goes unused if the sort occurs entirely in memory.
+ * input tape number that each tuple in the heap was read from. tupindex goes
+ * unused if the sort occurs entirely in memory.
*/
typedef struct
{
int tupindex; /* see notes above */
} SortTuple;
+/*
+ * During merge, we use a pre-allocated set of fixed-size slots to hold
+ * tuples, to avoid palloc/pfree overhead.
+ *
+ * Merge doesn't require a lot of memory, so we can afford to waste some,
+ * by using gratuitously-sized slots. If a tuple is larger than 1 kB, it
+ * falls back to palloc(), where the overhead is no longer significant
+ * relative to the tuple size.
+ *
+ * 'nextfree' is valid when the slot is in the free list. When in use, the
+ * slot holds a tuple.
+ */
+#define SLAB_SLOT_SIZE 1024
+
+typedef union SlabSlot
+{
+ union SlabSlot *nextfree;
+ char buffer[SLAB_SLOT_SIZE];
+} SlabSlot;
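+
+/*
+ * Illustrative sketch, not used by tuplesort itself: how the SlabSlot union
+ * doubles as a free-list link. While a slot sits on the free list, the
+ * first sizeof(SlabSlot *) bytes of its buffer hold the 'nextfree' link;
+ * once the slot is handed out, the tuple simply overwrites the whole
+ * SLAB_SLOT_SIZE buffer. Both operations are O(1) pointer swaps:
+ */
+#ifdef NOT_USED
+static SlabSlot *
+slab_pop(SlabSlot **freeHead)
+{
+ SlabSlot *slot = *freeHead;
+
+ if (slot != NULL)
+ *freeHead = slot->nextfree; /* unlink; buffer is now usable */
+ return slot;
+}
+
+static void
+slab_push(SlabSlot **freeHead, SlabSlot *slot)
+{
+ slot->nextfree = *freeHead; /* relink at the head of the free list */
+ *freeHead = slot;
+}
+#endif
+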
/*
* Possible states of a Tuplesort object. These denote the states that
/*
* Function to write a stored tuple onto tape. The representation of the
* tuple on tape need not be the same as it is in memory; requirements on
- * the tape representation are given below. After writing the tuple,
- * pfree() the out-of-line data (not the SortTuple struct!), and increase
- * state->availMem by the amount of memory space thereby released.
+ * the tape representation are given below. Unless the slab allocator is
+ * used, after writing the tuple, pfree() the out-of-line data (not the
+ * SortTuple struct!), and increase state->availMem by the amount of
+ * memory space thereby released.
*/
void (*writetup) (Tuplesortstate *state, int tapenum,
SortTuple *stup);
/*
* Function to read a stored tuple from tape back into memory. 'len' is
- * the already-read length of the stored tuple. Create a palloc'd copy,
- * initialize tuple/datum1/isnull1 in the target SortTuple struct, and
- * decrease state->availMem by the amount of memory space consumed. (See
- * batchUsed notes for details on how memory is handled when incremental
- * accounting is abandoned.)
+ * the already-read length of the stored tuple. The tuple is allocated
+ * from the slab memory arena, or is palloc'd, see readtup_alloc().
*/
void (*readtup) (Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
- /*
- * Function to move a caller tuple. This is usually implemented as a
- * memmove() shim, but function may also perform additional fix-up of
- * caller tuple where needed. Batch memory support requires the movement
- * of caller tuples from one location in memory to another.
- */
- void (*movetup) (void *dest, void *src, unsigned int len);
-
/*
* This array holds the tuples now in sort memory. If we are in state
* INITIAL, the tuples are in no particular order; if we are in state
* SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
* and FINALMERGE, the tuples are organized in "heap" order per Algorithm
- * H. (Note that memtupcount only counts the tuples that are part of the
- * heap --- during merge passes, memtuples[] entries beyond tapeRange are
- * never in the heap and are used to hold pre-read tuples.) In state
- * SORTEDONTAPE, the array is not used.
+ * H. In state SORTEDONTAPE, the array is not used.
*/
SortTuple *memtuples; /* array of SortTuple structs */
int memtupcount; /* number of tuples currently present */
bool growmemtuples; /* memtuples' growth still underway? */
/*
- * Memory for tuples is sometimes allocated in batch, rather than
- * incrementally. This implies that incremental memory accounting has
- * been abandoned. Currently, this only happens for the final on-the-fly
- * merge step. Large batch allocations can store tuples (e.g.
- * IndexTuples) without palloc() fragmentation and other overhead.
+ * Memory for tuples is sometimes allocated using a simple slab allocator,
+ * rather than with palloc(). Currently, we switch to slab allocation
+ * when we start merging. Merging only needs to keep a small, fixed
+ * number of tuples in memory at any time, so we can avoid the
+ * palloc/pfree overhead by recycling a fixed number of fixed-size slots
+ * to hold the tuples.
+ *
+ * For the slab, we use one large allocation, divided into slots of
+ * SLAB_SLOT_SIZE bytes each. The allocation is sized to have one slot per
+ * tape, plus one additional slot. We need that many slots to hold all the
+ * tuples kept in the heap during merge, plus the one last returned from
+ * the sort with tuplesort_gettuple.
+ *
+ * Initially, all the slots are kept in a linked list of free slots. When
+ * a tuple is read from a tape, it is put into the next free slot, if
+ * it fits. If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
+ * instead.
+ *
+ * When we're done processing a tuple, we return the slot to the free
+ * list, or pfree() it if it was palloc'd. We know that a tuple was
+ * allocated from the slab if its pointer value lies between
+ * slabMemoryBegin and slabMemoryEnd.
+ *
+ * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
+ * tracking memory usage is not used.
+ */
+ bool slabAllocatorUsed;
+
+ char *slabMemoryBegin; /* beginning of slab memory arena */
+ char *slabMemoryEnd; /* end of slab memory arena */
+ SlabSlot *slabFreeHead; /* head of free list */
+
+ /*
+ * When a tuple that came from a tape is returned to the caller in
+ * tuplesort_gettuple_XXX (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
+ * modes), we remember it in 'lastReturnedTuple', so that we can recycle
+ * its memory on the next gettuple call.
*/
- bool batchUsed;
+ void *lastReturnedTuple;
/*
* While building initial runs, this indicates if the replacement
*/
/*
- * These variables are only used during merge passes. mergeactive[i] is
- * true if we are reading an input run from (actual) tape number i and
- * have not yet exhausted that run. mergenext[i] is the memtuples index
- * of the next pre-read tuple (next to be loaded into the heap) for tape
- * i, or 0 if we are out of pre-read tuples. mergelast[i] similarly
- * points to the last pre-read tuple from each tape. mergeavailslots[i]
- * is the number of unused memtuples[] slots reserved for tape i, and
- * mergeavailmem[i] is the amount of unused space allocated for tape i.
- * mergefreelist and mergefirstfree keep track of unused locations in the
- * memtuples[] array. The memtuples[].tupindex fields link together
- * pre-read tuples for each tape as well as recycled locations in
- * mergefreelist. It is OK to use 0 as a null link in these lists, because
- * memtuples[0] is part of the merge heap and is never a pre-read tuple.
+ * This variable is only used during merge passes. mergeactive[i] is true
+ * if we are reading an input run from (actual) tape number i and have not
+ * yet exhausted that run.
*/
bool *mergeactive; /* active input run source? */
- int *mergenext; /* first preread tuple for each source */
- int *mergelast; /* last preread tuple for each source */
- int *mergeavailslots; /* slots left for prereading each tape */
- int64 *mergeavailmem; /* availMem for prereading each tape */
- int mergefreelist; /* head of freelist of recycled slots */
- int mergefirstfree; /* first slot never used in this merge */
-
- /*
- * Per-tape batch state, when final on-the-fly merge consumes memory from
- * just a few large allocations.
- *
- * Aside from the general benefits of performing fewer individual retail
- * palloc() calls, this also helps make merging more cache efficient,
- * since each tape's tuples must naturally be accessed sequentially (in
- * sorted order).
- */
- int64 spacePerTape; /* Space (memory) for tuples (not slots) */
- char **mergetuples; /* Each tape's memory allocation */
- char **mergecurrent; /* Current offset into each tape's memory */
- char **mergetail; /* Last item's start point for each tape */
- char **mergeoverflow; /* Retail palloc() "overflow" for each tape */
/*
* Variables for Algorithm D. Note that destTape is a "logical" tape
#endif
};
+/*
+ * Is the given tuple allocated from the slab memory arena?
+ */
+#define IS_SLAB_SLOT(state, tuple) \
+ ((char *) (tuple) >= (state)->slabMemoryBegin && \
+ (char *) (tuple) < (state)->slabMemoryEnd)
+
+/*
+ * Return the given tuple to the slab memory free list, or free it
+ * if it was palloc'd.
+ */
+#define RELEASE_SLAB_SLOT(state, tuple) \
+	do { \
+		SlabSlot *buf = (SlabSlot *) (tuple); \
+ \
+		if (IS_SLAB_SLOT((state), buf)) \
+		{ \
+			buf->nextfree = (state)->slabFreeHead; \
+			(state)->slabFreeHead = buf; \
+		} \
+		else \
+			pfree(buf); \
+	} while(0)
+
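+/*
+ * An illustrative walk-through of the expected consumer pattern (assumed
+ * usage, mirroring the READTUP paths below): readtup_alloc() hands out
+ * either a slab slot or, for tuples larger than SLAB_SLOT_SIZE, a palloc'd
+ * chunk. The pointer-range test in IS_SLAB_SLOT is what lets
+ * RELEASE_SLAB_SLOT pick the matching release path without any per-tuple
+ * bookkeeping:
+ *
+ *		void *tup = readtup_alloc(state, tuplen);
+ *
+ *		... read the tuple from tape and process it ...
+ *
+ *		if (tup)
+ *			RELEASE_SLAB_SLOT(state, tup);	relink, or pfree() if palloc'd
+ */
+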
#define COMPARETUP(state,a,b) ((*(state)->comparetup) (a, b, state))
#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
#define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup))
#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
-#define MOVETUP(dest,src,len) ((*(state)->movetup) (dest, src, len))
-#define LACKMEM(state) ((state)->availMem < 0 && !(state)->batchUsed)
+#define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
#define USEMEM(state,amt) ((state)->availMem -= (amt))
#define FREEMEM(state,amt) ((state)->availMem += (amt))
static bool useselection(Tuplesortstate *state);
static void inittapes(Tuplesortstate *state);
static void selectnewtape(Tuplesortstate *state);
+static void init_slab_allocator(Tuplesortstate *state, int numSlots);
+static void init_tape_buffers(Tuplesortstate *state, int numInputTapes);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
-static void beginmerge(Tuplesortstate *state, bool finalMergeBatch);
-static void batchmemtuples(Tuplesortstate *state);
-static void mergebatch(Tuplesortstate *state, int64 spacePerTape);
-static void mergebatchone(Tuplesortstate *state, int srcTape,
- SortTuple *stup, bool *should_free);
-static void mergebatchfreetape(Tuplesortstate *state, int srcTape,
- SortTuple *rtup, bool *should_free);
-static void *mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen);
-static void mergepreread(Tuplesortstate *state);
-static void mergeprereadone(Tuplesortstate *state, int srcTape);
+static void beginmerge(Tuplesortstate *state);
+static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void dumpbatch(Tuplesortstate *state, bool alltuples);
static void make_bounded_heap(Tuplesortstate *state);
static void reversedirection(Tuplesortstate *state);
static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
static void markrunend(Tuplesortstate *state, int tapenum);
-static void *readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen);
+static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
static int comparetup_heap(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
SortTuple *stup);
static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
-static void movetup_heap(void *dest, void *src, unsigned int len);
static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
SortTuple *stup);
static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
-static void movetup_cluster(void *dest, void *src, unsigned int len);
static int comparetup_index_btree(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
SortTuple *stup);
static void readtup_index(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
-static void movetup_index(void *dest, void *src, unsigned int len);
static int comparetup_datum(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
SortTuple *stup);
static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
-static void movetup_datum(void *dest, void *src, unsigned int len);
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
/*
ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1);
state->growmemtuples = true;
- state->batchUsed = false;
+ state->slabAllocatorUsed = false;
state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
USEMEM(state, GetMemoryChunkSpace(state->memtuples));
state->copytup = copytup_heap;
state->writetup = writetup_heap;
state->readtup = readtup_heap;
- state->movetup = movetup_heap;
state->tupDesc = tupDesc; /* assume we need not copy tupDesc */
state->abbrevNext = 10;
state->copytup = copytup_cluster;
state->writetup = writetup_cluster;
state->readtup = readtup_cluster;
- state->movetup = movetup_cluster;
state->abbrevNext = 10;
state->indexInfo = BuildIndexInfo(indexRel);
state->copytup = copytup_index;
state->writetup = writetup_index;
state->readtup = readtup_index;
- state->movetup = movetup_index;
state->abbrevNext = 10;
state->heapRel = heapRel;
state->copytup = copytup_index;
state->writetup = writetup_index;
state->readtup = readtup_index;
- state->movetup = movetup_index;
state->heapRel = heapRel;
state->indexRel = indexRel;
state->copytup = copytup_datum;
state->writetup = writetup_datum;
state->readtup = readtup_datum;
- state->movetup = movetup_datum;
state->abbrevNext = 10;
state->datumType = datumType;
{
case TSS_SORTEDINMEM:
Assert(forward || state->randomAccess);
- Assert(!state->batchUsed);
+ Assert(!state->slabAllocatorUsed);
*should_free = false;
if (forward)
{
case TSS_SORTEDONTAPE:
Assert(forward || state->randomAccess);
- Assert(!state->batchUsed);
- *should_free = true;
+ Assert(state->slabAllocatorUsed);
+
+ /*
+ * The slot that held the tuple we returned in the previous
+ * gettuple call can now be reused.
+ */
+ if (state->lastReturnedTuple)
+ {
+ RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
+ state->lastReturnedTuple = NULL;
+ }
+
if (forward)
{
if (state->eof_reached)
return false;
+
if ((tuplen = getlen(state, state->result_tape, true)) != 0)
{
READTUP(state, stup, state->result_tape, tuplen);
+
+ /*
+ * Remember the tuple we return, so that we can recycle
+ * its memory on the next call. (This can be NULL in the
+ * !state->tuples case.)
+ */
+ state->lastReturnedTuple = stup->tuple;
+
+ *should_free = false;
return true;
}
else
tuplen))
elog(ERROR, "bogus tuple length in backward scan");
READTUP(state, stup, state->result_tape, tuplen);
+
+ /*
+ * Remember the tuple we return, so that we can recycle its memory
+ * on the next call. (This can be NULL in the Datum case.)
+ */
+ state->lastReturnedTuple = stup->tuple;
+
+ *should_free = false;
return true;
case TSS_FINALMERGE:
Assert(forward);
- Assert(state->batchUsed || !state->tuples);
- /* For now, assume tuple is stored in tape's batch memory */
+ /* We are managing memory ourselves, with the slab allocator. */
+ Assert(state->slabAllocatorUsed);
*should_free = false;
+ /*
+ * The slab slot holding the tuple we returned in the previous
+ * gettuple call can now be reused.
+ */
+ if (state->lastReturnedTuple)
+ {
+ RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
+ state->lastReturnedTuple = NULL;
+ }
+
/*
* This code should match the inner loop of mergeonerun().
*/
if (state->memtupcount > 0)
{
int srcTape = state->memtuples[0].tupindex;
- int tupIndex;
- SortTuple *newtup;
+ SortTuple newtup;
+
+ *stup = state->memtuples[0];
/*
- * Returned tuple is still counted in our memory space most of
- * the time. See mergebatchone() for discussion of why caller
- * may occasionally be required to free returned tuple, and
- * how preread memory is managed with regard to edge cases
- * more generally.
+ * Remember the tuple we return, so that we can recycle its
+ * memory on the next call. (This can be NULL in the Datum case.)
*/
- *stup = state->memtuples[0];
- if ((tupIndex = state->mergenext[srcTape]) == 0)
+ state->lastReturnedTuple = stup->tuple;
+
+ /*
+ * Pull the next tuple from the tape, and replace the returned
+ * tuple at the top of the heap with it.
+ */
+ if (!mergereadnext(state, srcTape, &newtup))
{
/*
- * out of preloaded data on this tape, try to read more
- *
- * Unlike mergeonerun(), we only preload from the single
- * tape that's run dry, though not before preparing its
- * batch memory for a new round of sequential consumption.
- * See mergepreread() comments.
+ * If there's no more data, we've reached the end of the run
+ * on this tape. Remove the top node from the heap.
*/
- if (state->batchUsed)
- mergebatchone(state, srcTape, stup, should_free);
-
- mergeprereadone(state, srcTape);
+ tuplesort_heap_delete_top(state, false);
/*
- * if still no data, we've reached end of run on this tape
+ * Rewind to free the read buffer. It'd go away at the
+ * end of the sort anyway, but better to release the
+ * memory early.
*/
- if ((tupIndex = state->mergenext[srcTape]) == 0)
- {
- /* Remove the top node from the heap */
- tuplesort_heap_delete_top(state, false);
- /* Free tape's buffer, avoiding dangling pointer */
- if (state->batchUsed)
- mergebatchfreetape(state, srcTape, stup, should_free);
- return true;
- }
+ LogicalTapeRewind(state->tapeset, srcTape, true);
+ return true;
}
-
- /*
- * pull next preread tuple from list, and replace the returned
- * tuple at top of the heap with it.
- */
- newtup = &state->memtuples[tupIndex];
- state->mergenext[srcTape] = newtup->tupindex;
- if (state->mergenext[srcTape] == 0)
- state->mergelast[srcTape] = 0;
- newtup->tupindex = srcTape;
- tuplesort_heap_replace_top(state, newtup, false);
- /* put the now-unused memtuples entry on the freelist */
- newtup->tupindex = state->mergefreelist;
- state->mergefreelist = tupIndex;
- state->mergeavailslots[srcTape]++;
+ newtup.tupindex = srcTape;
+ tuplesort_heap_replace_top(state, &newtup, false);
return true;
}
return false;
/* Compute number of tapes to use: merge order plus 1 */
maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
- /*
- * We must have at least 2*maxTapes slots in the memtuples[] array, else
- * we'd not have room for merge heap plus preread. It seems unlikely that
- * this case would ever occur, but be safe.
- */
- maxTapes = Min(maxTapes, state->memtupsize / 2);
-
state->maxTapes = maxTapes;
state->tapeRange = maxTapes - 1;
#endif
/*
- * Decrease availMem to reflect the space needed for tape buffers; but
- * don't decrease it to the point that we have no room for tuples. (That
- * case is only likely to occur if sorting pass-by-value Datums; in all
- * other scenarios the memtuples[] array is unlikely to occupy more than
- * half of allowedMem. In the pass-by-value case it's not important to
- * account for tuple space, so we don't care if LACKMEM becomes
- * inaccurate.)
+ * Decrease availMem to reflect the space needed for tape buffers, when
+ * writing the initial runs; but don't decrease it to the point that we
+ * have no room for tuples. (That case is only likely to occur if sorting
+ * pass-by-value Datums; in all other scenarios the memtuples[] array is
+ * unlikely to occupy more than half of allowedMem. In the pass-by-value
+ * case it's not important to account for tuple space, so we don't care if
+ * LACKMEM becomes inaccurate.)
*/
	tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
state->tapeset = LogicalTapeSetCreate(maxTapes);
state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
- state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
- state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
- state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
- state->mergeavailmem = (int64 *) palloc0(maxTapes * sizeof(int64));
- state->mergetuples = (char **) palloc0(maxTapes * sizeof(char *));
- state->mergecurrent = (char **) palloc0(maxTapes * sizeof(char *));
- state->mergetail = (char **) palloc0(maxTapes * sizeof(char *));
- state->mergeoverflow = (char **) palloc0(maxTapes * sizeof(char *));
state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
state->destTape = 0;
}
+/*
+ * Initialize the slab allocation arena, for the given number of slots.
+ */
+static void
+init_slab_allocator(Tuplesortstate *state, int numSlots)
+{
+ if (numSlots > 0)
+ {
+ char *p;
+ int i;
+
+ state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
+ state->slabMemoryEnd = state->slabMemoryBegin +
+ numSlots * SLAB_SLOT_SIZE;
+ state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
+ USEMEM(state, numSlots * SLAB_SLOT_SIZE);
+
+ p = state->slabMemoryBegin;
+ for (i = 0; i < numSlots - 1; i++)
+ {
+ ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
+ p += SLAB_SLOT_SIZE;
+ }
+ ((SlabSlot *) p)->nextfree = NULL;
+ }
+ else
+ {
+ state->slabMemoryBegin = state->slabMemoryEnd = NULL;
+ state->slabFreeHead = NULL;
+ }
+ state->slabAllocatorUsed = true;
+}
+
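+/*
+ * For example (illustrative): with numSlots = 4, the arena above is one
+ * 4 * SLAB_SLOT_SIZE = 4096-byte allocation, and the loop threads the
+ * free list through it in address order:
+ *
+ *		slabFreeHead -> slot 0 -> slot 1 -> slot 2 -> slot 3 -> NULL
+ *
+ * where slot i starts at slabMemoryBegin + i * SLAB_SLOT_SIZE.
+ */
+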
+/*
+ * Divide all remaining work memory (availMem) among read buffers for the
+ * tapes that will be used during the merge.
+ *
+ * We use the number of possible *input* tapes here, rather than maxTapes,
+ * for the calculation. At all times, we'll be reading from at most
+ * numInputTapes tapes, and one tape is used for output (unless we do an
+ * on-the-fly final merge, in which case we don't have an output tape).
+ */
+static void
+init_tape_buffers(Tuplesortstate *state, int numInputTapes)
+{
+ int64 availBlocks;
+ int64 blocksPerTape;
+ int remainder;
+ int tapenum;
+
+ /*
+ * Divide availMem evenly among the input tapes.
+ */
+ availBlocks = state->availMem / BLCKSZ;
+ blocksPerTape = availBlocks / numInputTapes;
+ remainder = availBlocks % numInputTapes;
+ USEMEM(state, availBlocks * BLCKSZ);
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG, "using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
+ (availBlocks * BLCKSZ) / 1024, numInputTapes);
+#endif
+
+ /*
+ * Use one page per tape, even if we are out of memory.
+ * tuplesort_merge_order() should've chosen the number of tapes so that
+ * this can't happen, but better safe than sorry. (This also protects
+ * from a negative availMem.)
+ */
+ if (blocksPerTape < 1)
+ {
+ blocksPerTape = 1;
+ remainder = 0;
+ }
+
+ /*
+ * Set the buffers for the tapes.
+ *
+ * In a multi-phase merge, the tape that is initially used as an output
+ * tape will later be rewound and read from, and should also use a large
+ * buffer at that point. So we must loop up to maxTapes, not just
+ * numInputTapes!
+ *
+ * If there are fewer runs than tapes, we will also set the buffer size
+ * for tapes that will go completely unused, but that's harmless.
+ * LogicalTapeAssignReadBufferSize() doesn't allocate the buffer
+ * immediately, it just sets the size that will be used, when the tape is
+ * rewound for read, and the tape isn't empty.
+ */
+ for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
+ {
+ int64 numBlocks = blocksPerTape + (tapenum < remainder ? 1 : 0);
+
+ LogicalTapeAssignReadBufferSize(state->tapeset, tapenum,
+ numBlocks * BLCKSZ);
+ }
+}
+
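+/*
+ * Worked example (illustrative, assuming the default BLCKSZ of 8192): with
+ * availMem = 1 MB and numInputTapes = 6, availBlocks = 1048576 / 8192 = 128,
+ * blocksPerTape = 128 / 6 = 21 and remainder = 2. Tapes 0 and 1 therefore
+ * get 22 * BLCKSZ bytes of read buffer each, and the remaining tapes get
+ * 21 * BLCKSZ each, consuming all 128 blocks.
+ */
+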
/*
* mergeruns -- merge all the completed initial runs.
*
svTape,
svRuns,
svDummy;
+ int numTapes;
+ int numInputTapes;
Assert(state->status == TSS_BUILDRUNS);
Assert(state->memtupcount == 0);
state->sortKeys->abbrev_full_comparator = NULL;
}
+ /*
+ * Release the tuple memory context entirely. We've freed all the tuples
+ * that we previously allocated, and we will use the slab allocator from
+ * now on.
+ */
+ MemoryContextDelete(state->tuplecontext);
+ state->tuplecontext = NULL;
+
+ /*
+ * We no longer need a large memtuples array. (We will allocate a smaller
+ * one for the heap later.)
+ */
+ FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
+ pfree(state->memtuples);
+ state->memtuples = NULL;
+
+ /*
+ * If we had fewer runs than tapes, refund the memory that we imagined we
+ * would need for the tape buffers of the unused tapes.
+ *
+ * numTapes and numInputTapes reflect the actual number of tapes we will
+ * use. Note that the output tape's tape number is maxTapes - 1, so the
+ * tape numbers of the used tapes are not consecutive, and you cannot just
+ * loop from 0 to numTapes to visit all used tapes!
+ */
+ if (state->Level == 1)
+ {
+ numInputTapes = state->currentRun;
+ numTapes = numInputTapes + 1;
+ FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
+ }
+ else
+ {
+ numInputTapes = state->tapeRange;
+ numTapes = state->maxTapes;
+ }
+
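+ /*
+ * For example (illustrative): with maxTapes = 7 and only 3 initial runs,
+ * we are at Level 1, so numInputTapes = 3 and numTapes = 4, and we refund
+ * (7 - 4) * TAPE_BUFFER_OVERHEAD bytes that had been reserved for the
+ * three tapes that will never be used.
+ */
+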
+ /*
+ * Initialize the slab allocator. We need one slab slot per input tape,
+ * for the tuples in the heap, plus one to hold the tuple last returned
+ * from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
+ * however, we don't need to allocate anything.)
+ *
+ * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
+ * to track memory usage of individual tuples.
+ */
+ if (state->tuples)
+ init_slab_allocator(state, numInputTapes + 1);
+ else
+ init_slab_allocator(state, 0);
+
/*
* If we produced only one initial run (quite likely if the total data
* volume is between 1X and 2X workMem when replacement selection is used,
return;
}
+ /*
+ * Use all the remaining spare memory as read buffers for the input
+ * tapes.
+ *
+ * We do this only after checking for the case that we produced only one
+ * initial run, because there is no need to use a large read buffer when
+ * we're reading from a single tape. With one tape, the I/O pattern will
+ * be the same regardless of the buffer size.
+ *
+ * We don't try to "rebalance" the amount of memory among tapes, when we
+ * start a new merge phase, even if some tapes can be inactive in the
+ * phase. That would be hard, because logtape.c doesn't know where one
+ * run ends and another begins. When a new merge phase begins, and a tape
+ * doesn't participate in it, its buffer nevertheless already contains
+ * tuples from the next run on same tape, so we cannot release the buffer.
+ * That's OK in practice, merge performance isn't that sensitive to the
+ * amount of buffers used, and most merge phases use all or almost all
+ * tapes, anyway.
+ */
+ init_tape_buffers(state, numInputTapes);
+
+ /*
+ * Allocate a new 'memtuples' array for the heap. It will hold one tuple
+ * from each input tape.
+ */
+ state->memtupsize = numInputTapes;
+ state->memtuples = (SortTuple *) palloc(numInputTapes * sizeof(SortTuple));
+ USEMEM(state, GetMemoryChunkSpace(state->memtuples));
+
/* End of step D2: rewind all output tapes to prepare for merging */
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
LogicalTapeRewind(state->tapeset, tapenum, false);
/* Tell logtape.c we won't be writing anymore */
LogicalTapeSetForgetFreeSpace(state->tapeset);
/* Initialize for the final merge pass */
- beginmerge(state, state->tuples);
+ beginmerge(state);
state->status = TSS_FINALMERGE;
return;
}
state->result_tape = state->tp_tapenum[state->tapeRange];
LogicalTapeFreeze(state->tapeset, state->result_tape);
state->status = TSS_SORTEDONTAPE;
+
+ /* Release the read buffers of all the other tapes, by rewinding them. */
+ for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
+ {
+ if (tapenum != state->result_tape)
+ LogicalTapeRewind(state->tapeset, tapenum, true);
+ }
}
/*
{
int destTape = state->tp_tapenum[state->tapeRange];
int srcTape;
- int tupIndex;
- SortTuple *tup;
- int64 priorAvail,
- spaceFreed;
/*
* Start the merge by loading one tuple from each active source tape into
* the heap. We can also decrease the input run/dummy run counts.
*/
- beginmerge(state, false);
+ beginmerge(state);
/*
* Execute merge by repeatedly extracting lowest tuple in heap, writing it
*/
while (state->memtupcount > 0)
{
+ SortTuple stup;
+
/* write the tuple to destTape */
- priorAvail = state->availMem;
srcTape = state->memtuples[0].tupindex;
WRITETUP(state, destTape, &state->memtuples[0]);
- /* writetup adjusted total free space, now fix per-tape space */
- spaceFreed = state->availMem - priorAvail;
- state->mergeavailmem[srcTape] += spaceFreed;
- if ((tupIndex = state->mergenext[srcTape]) == 0)
- {
- /* out of preloaded data on this tape, try to read more */
- mergepreread(state);
- /* if still no data, we've reached end of run on this tape */
- if ((tupIndex = state->mergenext[srcTape]) == 0)
- {
- /* remove the written-out tuple from the heap */
- tuplesort_heap_delete_top(state, false);
- continue;
- }
- }
+
+ /*
+ * Recycle the slot of the tuple we just wrote out, for the next read.
+ * (The tuple can be NULL in a pass-by-value Datum sort, in which case
+ * there is no memory to recycle.)
+ */
+ if (state->memtuples[0].tuple)
+ RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
/*
- * pull next preread tuple from list, and replace the written-out
- * tuple in the heap with it.
+ * pull next tuple from the tape, and replace the written-out tuple in
+ * the heap with it.
*/
- tup = &state->memtuples[tupIndex];
- state->mergenext[srcTape] = tup->tupindex;
- if (state->mergenext[srcTape] == 0)
- state->mergelast[srcTape] = 0;
- tup->tupindex = srcTape;
- tuplesort_heap_replace_top(state, tup, false);
- /* put the now-unused memtuples entry on the freelist */
- tup->tupindex = state->mergefreelist;
- state->mergefreelist = tupIndex;
- state->mergeavailslots[srcTape]++;
- }
+ if (mergereadnext(state, srcTape, &stup))
+ {
+ stup.tupindex = srcTape;
+ tuplesort_heap_replace_top(state, &stup, false);
- /*
- * Reset tuple memory. We've freed all of the tuples that we previously
- * allocated, but AllocSetFree will have put those chunks of memory on
- * particular free lists, bucketed by size class. Thus, although all of
- * that memory is free, it is effectively fragmented. Resetting the
- * context gets us out from under that problem.
- */
- MemoryContextReset(state->tuplecontext);
+ }
+ else
+ tuplesort_heap_delete_top(state, false);
+ }
/*
* When the heap empties, we're done. Write an end-of-run marker on the
- * which tapes contain active input runs in mergeactive[]. Then, load
- * as many tuples as we can from each active input tape, and finally
- * fill the merge heap with the first tuple from each active tape.
+ * which tapes contain active input runs in mergeactive[]. Then, fill
+ * the merge heap with the first tuple from each active tape.
- *
- * finalMergeBatch indicates if this is the beginning of a final on-the-fly
- * merge where a batched allocation of tuple memory is required.
*/
static void
-beginmerge(Tuplesortstate *state, bool finalMergeBatch)
+beginmerge(Tuplesortstate *state)
{
int activeTapes;
int tapenum;
int srcTape;
- int slotsPerTape;
- int64 spacePerTape;
/* Heap should be empty here */
Assert(state->memtupcount == 0);
activeTapes++;
}
}
- state->activeTapes = activeTapes;
-
- /* Clear merge-pass state variables */
- memset(state->mergenext, 0,
- state->maxTapes * sizeof(*state->mergenext));
- memset(state->mergelast, 0,
- state->maxTapes * sizeof(*state->mergelast));
- state->mergefreelist = 0; /* nothing in the freelist */
- state->mergefirstfree = activeTapes; /* 1st slot avail for preread */
-
- if (finalMergeBatch)
- {
- /* Free outright buffers for tape never actually allocated */
- FREEMEM(state, (state->maxTapes - activeTapes) * TAPE_BUFFER_OVERHEAD);
-
- /*
- * Grow memtuples one last time, since the palloc() overhead no longer
- * incurred can make a big difference
- */
- batchmemtuples(state);
- }
-
- /*
- * Initialize space allocation to let each active input tape have an equal
- * share of preread space.
- */
Assert(activeTapes > 0);
- slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
- Assert(slotsPerTape > 0);
- spacePerTape = MAXALIGN_DOWN(state->availMem / activeTapes);
- for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
- {
- if (state->mergeactive[srcTape])
- {
- state->mergeavailslots[srcTape] = slotsPerTape;
- state->mergeavailmem[srcTape] = spacePerTape;
- }
- }
-
- /*
- * Preallocate tuple batch memory for each tape. This is the memory used
- * for tuples themselves (not SortTuples), so it's never used by
- * pass-by-value datum sorts. Memory allocation is performed here at most
- * once per sort, just in advance of the final on-the-fly merge step.
- */
- if (finalMergeBatch)
- mergebatch(state, spacePerTape);
-
- /*
- * Preread as many tuples as possible (and at least one) from each active
- * tape
- */
- mergepreread(state);
+ state->activeTapes = activeTapes;
/* Load the merge heap with the first tuple from each input tape */
for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
{
- int tupIndex = state->mergenext[srcTape];
- SortTuple *tup;
+ SortTuple tup;
- if (tupIndex)
+ if (mergereadnext(state, srcTape, &tup))
{
- tup = &state->memtuples[tupIndex];
- state->mergenext[srcTape] = tup->tupindex;
- if (state->mergenext[srcTape] == 0)
- state->mergelast[srcTape] = 0;
- tup->tupindex = srcTape;
- tuplesort_heap_insert(state, tup, false);
- /* put the now-unused memtuples entry on the freelist */
- tup->tupindex = state->mergefreelist;
- state->mergefreelist = tupIndex;
- state->mergeavailslots[srcTape]++;
-
-#ifdef TRACE_SORT
- if (trace_sort && finalMergeBatch)
- {
- int64 perTapeKB = (spacePerTape + 1023) / 1024;
- int64 usedSpaceKB;
- int usedSlots;
-
- /*
- * Report how effective batchmemtuples() was in balancing the
- * number of slots against the need for memory for the
- * underlying tuples (e.g. IndexTuples). The big preread of
- * all tapes when switching to FINALMERGE state should be
- * fairly representative of memory utilization during the
- * final merge step, and in any case is the only point at
- * which all tapes are guaranteed to have depleted either
- * their batch memory allowance or slot allowance. Ideally,
- * both will be completely depleted for every tape by now.
- */
- usedSpaceKB = (state->mergecurrent[srcTape] -
- state->mergetuples[srcTape] + 1023) / 1024;
- usedSlots = slotsPerTape - state->mergeavailslots[srcTape];
-
- elog(LOG, "tape %d initially used " INT64_FORMAT " KB of "
- INT64_FORMAT " KB batch (%2.3f) and %d out of %d slots "
- "(%2.3f)", srcTape,
- usedSpaceKB, perTapeKB,
- (double) usedSpaceKB / (double) perTapeKB,
- usedSlots, slotsPerTape,
- (double) usedSlots / (double) slotsPerTape);
- }
-#endif
+ tup.tupindex = srcTape;
+ tuplesort_heap_insert(state, &tup, false);
}
}
}
/*
- * batchmemtuples - grow memtuples without palloc overhead
- *
- * When called, availMem should be approximately the amount of memory we'd
- * require to allocate memtupsize - memtupcount tuples (not SortTuples/slots)
- * that were allocated with palloc() overhead, and in doing so use up all
- * allocated slots. However, though slots and tuple memory is in balance
- * following the last grow_memtuples() call, that's predicated on the observed
- * average tuple size for the "final" grow_memtuples() call, which includes
- * palloc overhead. During the final merge pass, where we will arrange to
- * squeeze out the palloc overhead, we might need more slots in the memtuples
- * array.
- *
- * To make that happen, arrange for the amount of remaining memory to be
- * exactly equal to the palloc overhead multiplied by the current size of
- * the memtuples array, force the grow_memtuples flag back to true (it's
- * probably but not necessarily false on entry to this routine), and then
- * call grow_memtuples. This simulates loading enough tuples to fill the
- * whole memtuples array and then having some space left over because of the
- * elided palloc overhead. We expect that grow_memtuples() will conclude that
- * it can't double the size of the memtuples array but that it can increase
- * it by some percentage; but if it does decide to double it, that just means
- * that we've never managed to use many slots in the memtuples array, in which
- * case doubling it shouldn't hurt anything anyway.
- */
-static void
-batchmemtuples(Tuplesortstate *state)
-{
- int64 refund;
- int64 availMemLessRefund;
- int memtupsize = state->memtupsize;
-
- /* Caller error if we have no tapes */
- Assert(state->activeTapes > 0);
-
- /* For simplicity, assume no memtuples are actually currently counted */
- Assert(state->memtupcount == 0);
-
- /*
- * Refund STANDARDCHUNKHEADERSIZE per tuple.
- *
- * This sometimes fails to make memory use perfectly balanced, but it
- * should never make the situation worse. Note that Assert-enabled builds
- * get a larger refund, due to a varying STANDARDCHUNKHEADERSIZE.
- */
- refund = memtupsize * STANDARDCHUNKHEADERSIZE;
- availMemLessRefund = state->availMem - refund;
-
- /*
- * We need to be sure that we do not cause LACKMEM to become true, else
- * the batch allocation size could be calculated as negative, causing
- * havoc. Hence, if availMemLessRefund is negative at this point, we must
- * do nothing. Moreover, if it's positive but rather small, there's
- * little point in proceeding because we could only increase memtuples by
- * a small amount, not worth the cost of the repalloc's. We somewhat
- * arbitrarily set the threshold at ALLOCSET_DEFAULT_INITSIZE per tape.
- * (Note that this does not represent any assumption about tuple sizes.)
- */
- if (availMemLessRefund <=
- (int64) state->activeTapes * ALLOCSET_DEFAULT_INITSIZE)
- return;
-
- /*
- * To establish balanced memory use after refunding palloc overhead,
- * temporarily have our accounting indicate that we've allocated all
- * memory we're allowed to less that refund, and call grow_memtuples() to
- * have it increase the number of slots.
- */
- state->growmemtuples = true;
- USEMEM(state, availMemLessRefund);
- (void) grow_memtuples(state);
- state->growmemtuples = false;
- /* availMem must stay accurate for spacePerTape calculation */
- FREEMEM(state, availMemLessRefund);
- if (LACKMEM(state))
- elog(ERROR, "unexpected out-of-memory situation in tuplesort");
-
-#ifdef TRACE_SORT
- if (trace_sort)
- {
- Size OldKb = (memtupsize * sizeof(SortTuple) + 1023) / 1024;
- Size NewKb = (state->memtupsize * sizeof(SortTuple) + 1023) / 1024;
-
- elog(LOG, "grew memtuples %1.2fx from %d (%zu KB) to %d (%zu KB) for final merge",
- (double) NewKb / (double) OldKb,
- memtupsize, OldKb,
- state->memtupsize, NewKb);
- }
-#endif
-}
-
-/*
- * mergebatch - initialize tuple memory in batch
- *
- * This allows sequential access to sorted tuples buffered in memory from
- * tapes/runs on disk during a final on-the-fly merge step. Note that the
- * memory is not used for SortTuples, but for the underlying tuples (e.g.
- * MinimalTuples).
- *
- * Note that when batch memory is used, there is a simple division of space
- * into large buffers (one per active tape). The conventional incremental
- * memory accounting (calling USEMEM() and FREEMEM()) is abandoned. Instead,
- * when each tape's memory budget is exceeded, a retail palloc() "overflow" is
- * performed, which is then immediately detected in a way that is analogous to
- * LACKMEM(). This keeps each tape's use of memory fair, which is always a
- * goal.
- */
-static void
-mergebatch(Tuplesortstate *state, int64 spacePerTape)
-{
- int srcTape;
-
- Assert(state->activeTapes > 0);
- Assert(state->tuples);
-
- /*
- * For the purposes of tuplesort's memory accounting, the batch allocation
- * is special, and regular memory accounting through USEMEM() calls is
- * abandoned (see mergeprereadone()).
- */
- for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
- {
- char *mergetuples;
-
- if (!state->mergeactive[srcTape])
- continue;
-
- /* Allocate buffer for each active tape */
- mergetuples = MemoryContextAllocHuge(state->tuplecontext,
- spacePerTape);
-
- /* Initialize state for tape */
- state->mergetuples[srcTape] = mergetuples;
- state->mergecurrent[srcTape] = mergetuples;
- state->mergetail[srcTape] = mergetuples;
- state->mergeoverflow[srcTape] = NULL;
- }
-
- state->batchUsed = true;
- state->spacePerTape = spacePerTape;
-}
-
-/*
- * mergebatchone - prepare batch memory for one merge input tape
- *
- * This is called following the exhaustion of preread tuples for one input
- * tape. All that actually occurs is that the state for the source tape is
- * reset to indicate that all memory may be reused.
- *
- * This routine must deal with fixing up the tuple that is about to be returned
- * to the client, due to "overflow" allocations.
- */
-static void
-mergebatchone(Tuplesortstate *state, int srcTape, SortTuple *rtup,
- bool *should_free)
-{
- Assert(state->batchUsed);
-
- /*
- * Tuple about to be returned to caller ("stup") is final preread tuple
- * from tape, just removed from the top of the heap. Special steps around
- * memory management must be performed for that tuple, to make sure it
- * isn't overwritten early.
- */
- if (!state->mergeoverflow[srcTape])
- {
- Size tupLen;
-
- /*
- * Mark tuple buffer range for reuse, but be careful to move final,
- * tail tuple to start of space for next run so that it's available to
- * caller when stup is returned, and remains available at least until
- * the next tuple is requested.
- */
- tupLen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
- state->mergecurrent[srcTape] = state->mergetuples[srcTape];
- MOVETUP(state->mergecurrent[srcTape], state->mergetail[srcTape],
- tupLen);
-
- /* Make SortTuple at top of the merge heap point to new tuple */
- rtup->tuple = (void *) state->mergecurrent[srcTape];
-
- state->mergetail[srcTape] = state->mergecurrent[srcTape];
- state->mergecurrent[srcTape] += tupLen;
- }
- else
- {
- /*
- * Handle an "overflow" retail palloc.
- *
- * This is needed when we run out of tuple memory for the tape.
- */
- state->mergecurrent[srcTape] = state->mergetuples[srcTape];
- state->mergetail[srcTape] = state->mergetuples[srcTape];
-
- if (rtup->tuple)
- {
- Assert(rtup->tuple == (void *) state->mergeoverflow[srcTape]);
- /* Caller should free palloc'd tuple */
- *should_free = true;
- }
- state->mergeoverflow[srcTape] = NULL;
- }
-}
-
-/*
- * mergebatchfreetape - handle final clean-up for batch memory once tape is
- * about to become exhausted
- *
- * All tuples are returned from tape, but a single final tuple, *rtup, is to be
- * passed back to caller. Free tape's batch allocation buffer while ensuring
- * that the final tuple is managed appropriately.
- */
-static void
-mergebatchfreetape(Tuplesortstate *state, int srcTape, SortTuple *rtup,
- bool *should_free)
-{
- Assert(state->batchUsed);
- Assert(state->status == TSS_FINALMERGE);
-
- /*
- * Tuple may or may not already be an overflow allocation from
- * mergebatchone()
- */
- if (!*should_free && rtup->tuple)
- {
- /*
- * Final tuple still in tape's batch allocation.
- *
- * Return palloc()'d copy to caller, and have it freed in a similar
- * manner to overflow allocation. Otherwise, we'd free batch memory
- * and pass back a pointer to garbage. Note that we deliberately
- * allocate this in the parent tuplesort context, to be on the safe
- * side.
- */
- Size tuplen;
- void *oldTuple = rtup->tuple;
-
- tuplen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
- rtup->tuple = MemoryContextAlloc(state->sortcontext, tuplen);
- MOVETUP(rtup->tuple, oldTuple, tuplen);
- *should_free = true;
- }
-
- /* Free spacePerTape-sized buffer */
- pfree(state->mergetuples[srcTape]);
-}
-
-/*
- * mergebatchalloc - allocate memory for one tuple using a batch memory
- * "logical allocation".
- *
- * This is used for the final on-the-fly merge phase only. READTUP() routines
- * receive memory from here in place of palloc() and USEMEM() calls.
- *
- * Tuple tapenum is passed, ensuring each tape's tuples are stored in sorted,
- * contiguous order (while allowing safe reuse of memory made available to
- * each tape). This maximizes locality of access as tuples are returned by
- * final merge.
+ * mergereadnext - read next tuple from one merge input tape
*
- * Caller must not subsequently attempt to free memory returned here. In
- * general, only mergebatch* functions know about how memory returned from
- * here should be freed, and this function's caller must ensure that batch
- * memory management code will definitely have the opportunity to do the right
- * thing during the final on-the-fly merge.
+ * Returns false on EOF.
*/
-static void *
-mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen)
-{
- Size reserve_tuplen = MAXALIGN(tuplen);
- char *ret;
-
- /* Should overflow at most once before mergebatchone() call: */
- Assert(state->mergeoverflow[tapenum] == NULL);
- Assert(state->batchUsed);
-
- /* It should be possible to use precisely spacePerTape memory at once */
- if (state->mergecurrent[tapenum] + reserve_tuplen <=
- state->mergetuples[tapenum] + state->spacePerTape)
- {
- /*
- * Usual case -- caller is returned pointer into its tape's buffer,
- * and an offset from that point is recorded as where tape has
- * consumed up to for current round of preloading.
- */
- ret = state->mergetail[tapenum] = state->mergecurrent[tapenum];
- state->mergecurrent[tapenum] += reserve_tuplen;
- }
- else
- {
- /*
- * Allocate memory, and record as tape's overflow allocation. This
- * will be detected quickly, in a similar fashion to a LACKMEM()
- * condition, and should not happen again before a new round of
- * preloading for caller's tape. Note that we deliberately allocate
- * this in the parent tuplesort context, to be on the safe side.
- *
- * Sometimes, this does not happen because merging runs out of slots
- * before running out of memory.
- */
- ret = state->mergeoverflow[tapenum] =
- MemoryContextAlloc(state->sortcontext, tuplen);
- }
-
- return ret;
-}
-
-/*
- * mergepreread - load tuples from merge input tapes
- *
- * This routine exists to improve sequentiality of reads during a merge pass,
- * as explained in the header comments of this file. Load tuples from each
- * active source tape until the tape's run is exhausted or it has used up
- * its fair share of available memory. In any case, we guarantee that there
- * is at least one preread tuple available from each unexhausted input tape.
- *
- * We invoke this routine at the start of a merge pass for initial load,
- * and then whenever any tape's preread data runs out. Note that we load
- * as much data as possible from all tapes, not just the one that ran out.
- * This is because logtape.c works best with a usage pattern that alternates
- * between reading a lot of data and writing a lot of data, so whenever we
- * are forced to read, we should fill working memory completely.
- *
- * In FINALMERGE state, we *don't* use this routine, but instead just preread
- * from the single tape that ran dry. There's no read/write alternation in
- * that state and so no point in scanning through all the tapes to fix one.
- * (Moreover, there may be quite a lot of inactive tapes in that state, since
- * we might have had many fewer runs than tapes. In a regular tape-to-tape
- * merge we can expect most of the tapes to be active. Plus, only
- * FINALMERGE state has to consider memory management for a batch
- * allocation.)
- */
-static void
-mergepreread(Tuplesortstate *state)
-{
- int srcTape;
-
- for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
- mergeprereadone(state, srcTape);
-}
-
-/*
- * mergeprereadone - load tuples from one merge input tape
- *
- * Read tuples from the specified tape until it has used up its free memory
- * or array slots; but ensure that we have at least one tuple, if any are
- * to be had.
- */
-static void
-mergeprereadone(Tuplesortstate *state, int srcTape)
+static bool
+mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
{
unsigned int tuplen;
- SortTuple stup;
- int tupIndex;
- int64 priorAvail,
- spaceUsed;
if (!state->mergeactive[srcTape])
- return; /* tape's run is already exhausted */
-
- /*
- * Manage per-tape availMem. Only actually matters when batch memory not
- * in use.
- */
- priorAvail = state->availMem;
- state->availMem = state->mergeavailmem[srcTape];
+ return false; /* tape's run is already exhausted */
- /*
- * When batch memory is used if final on-the-fly merge, only mergeoverflow
- * test is relevant; otherwise, only LACKMEM() test is relevant.
- */
- while ((state->mergeavailslots[srcTape] > 0 &&
- state->mergeoverflow[srcTape] == NULL && !LACKMEM(state)) ||
- state->mergenext[srcTape] == 0)
+ /* read next tuple, if any */
+ if ((tuplen = getlen(state, srcTape, true)) == 0)
{
- /* read next tuple, if any */
- if ((tuplen = getlen(state, srcTape, true)) == 0)
- {
- state->mergeactive[srcTape] = false;
- break;
- }
- READTUP(state, &stup, srcTape, tuplen);
- /* find a free slot in memtuples[] for it */
- tupIndex = state->mergefreelist;
- if (tupIndex)
- state->mergefreelist = state->memtuples[tupIndex].tupindex;
- else
- {
- tupIndex = state->mergefirstfree++;
- Assert(tupIndex < state->memtupsize);
- }
- state->mergeavailslots[srcTape]--;
- /* store tuple, append to list for its tape */
- stup.tupindex = 0;
- state->memtuples[tupIndex] = stup;
- if (state->mergelast[srcTape])
- state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
- else
- state->mergenext[srcTape] = tupIndex;
- state->mergelast[srcTape] = tupIndex;
+ state->mergeactive[srcTape] = false;
+ return false;
}
- /* update per-tape and global availmem counts */
- spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
- state->mergeavailmem[srcTape] = state->availMem;
- state->availMem = priorAvail - spaceUsed;
+ READTUP(state, stup, srcTape, tuplen);
+
+ return true;
}
/*
/*
* Reset tuple memory. We've freed all of the tuples that we previously
* allocated. It's important to avoid fragmentation when there is a stark
- * change in allocation patterns due to the use of batch memory.
- * Fragmentation due to AllocSetFree's bucketing by size class might be
- * particularly bad if this step wasn't taken.
+ * change in the sizes of incoming tuples. Fragmentation due to
+ * AllocSetFree's bucketing by size class might be particularly bad if
+ * this step wasn't taken.
*/
MemoryContextReset(state->tuplecontext);
}
/*
- * Get memory for tuple from within READTUP() routine. Allocate
- * memory and account for that, or consume from tape's batch
- * allocation.
+ * Get memory for tuple from within READTUP() routine.
*
- * Memory returned here in the final on-the-fly merge case is recycled
- * from tape's batch allocation. Otherwise, callers must pfree() or
- * reset tuple child memory context, and account for that with a
- * FREEMEM(). Currently, this only ever needs to happen in WRITETUP()
- * routines.
+ * We use the next free slot from the slab allocator, or palloc() if the
+ * tuple is too large for that.
*/
static void *
-readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen)
+readtup_alloc(Tuplesortstate *state, Size tuplen)
{
- if (state->batchUsed)
- {
- /*
- * No USEMEM() call, because during final on-the-fly merge accounting
- * is based on tape-private state. ("Overflow" allocations are
- * detected as an indication that a new round or preloading is
- * required. Preloading marks existing contents of tape's batch buffer
- * for reuse.)
- */
- return mergebatchalloc(state, tapenum, tuplen);
- }
+ SlabSlot *buf;
+
+ /*
+ * We pre-allocate enough slots in the slab arena that we should never run
+ * out, but fall back to palloc() in production builds if we somehow do.
+ */
+ Assert(state->slabFreeHead);
+
+ if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
+ return MemoryContextAlloc(state->sortcontext, tuplen);
else
{
- char *ret;
+ buf = state->slabFreeHead;
+ /* Reuse this slot */
+ state->slabFreeHead = buf->nextfree;
- /* Batch allocation yet to be performed */
- ret = MemoryContextAlloc(state->tuplecontext, tuplen);
- USEMEM(state, GetMemoryChunkSpace(ret));
- return ret;
+ return buf;
}
}
LogicalTapeWrite(state->tapeset, tapenum,
(void *) &tuplen, sizeof(tuplen));
- FREEMEM(state, GetMemoryChunkSpace(tuple));
- heap_free_minimal_tuple(tuple);
+ if (!state->slabAllocatorUsed)
+ {
+ FREEMEM(state, GetMemoryChunkSpace(tuple));
+ heap_free_minimal_tuple(tuple);
+ }
}
static void
{
unsigned int tupbodylen = len - sizeof(int);
unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
- MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tapenum, tuplen);
+ MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen);
char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
HeapTupleData htup;
&stup->isnull1);
}
-static void
-movetup_heap(void *dest, void *src, unsigned int len)
-{
- memmove(dest, src, len);
-}
-
/*
* Routines specialized for the CLUSTER case (HeapTuple data, with
* comparisons per a btree index definition)
LogicalTapeWrite(state->tapeset, tapenum,
&tuplen, sizeof(tuplen));
- FREEMEM(state, GetMemoryChunkSpace(tuple));
- heap_freetuple(tuple);
+ if (!state->slabAllocatorUsed)
+ {
+ FREEMEM(state, GetMemoryChunkSpace(tuple));
+ heap_freetuple(tuple);
+ }
}
static void
{
unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
HeapTuple tuple = (HeapTuple) readtup_alloc(state,
- tapenum,
t_len + HEAPTUPLESIZE);
/* Reconstruct the HeapTupleData header */
&stup->isnull1);
}
-static void
-movetup_cluster(void *dest, void *src, unsigned int len)
-{
- HeapTuple tuple;
-
- memmove(dest, src, len);
-
- /* Repoint the HeapTupleData header */
- tuple = (HeapTuple) dest;
- tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
-}
-
-
/*
* Routines specialized for IndexTuple case
*
LogicalTapeWrite(state->tapeset, tapenum,
(void *) &tuplen, sizeof(tuplen));
- FREEMEM(state, GetMemoryChunkSpace(tuple));
- pfree(tuple);
+ if (!state->slabAllocatorUsed)
+ {
+ FREEMEM(state, GetMemoryChunkSpace(tuple));
+ pfree(tuple);
+ }
}
static void
int tapenum, unsigned int len)
{
unsigned int tuplen = len - sizeof(unsigned int);
- IndexTuple tuple = (IndexTuple) readtup_alloc(state, tapenum, tuplen);
+ IndexTuple tuple = (IndexTuple) readtup_alloc(state, tuplen);
LogicalTapeReadExact(state->tapeset, tapenum,
tuple, tuplen);
&stup->isnull1);
}
-static void
-movetup_index(void *dest, void *src, unsigned int len)
-{
- memmove(dest, src, len);
-}
-
/*
* Routines specialized for DatumTuple case
*/
LogicalTapeWrite(state->tapeset, tapenum,
(void *) &writtenlen, sizeof(writtenlen));
- if (stup->tuple)
+ if (!state->slabAllocatorUsed && stup->tuple)
{
FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
pfree(stup->tuple);
}
else
{
- void *raddr = readtup_alloc(state, tapenum, tuplen);
+ void *raddr = readtup_alloc(state, tuplen);
LogicalTapeReadExact(state->tapeset, tapenum,
raddr, tuplen);
&tuplen, sizeof(tuplen));
}
-static void
-movetup_datum(void *dest, void *src, unsigned int len)
-{
- memmove(dest, src, len);
-}
-
/*
* Convenience routine to free a tuple previously loaded into sort memory
*/