]> granicus.if.org Git - postgresql/blob - src/backend/utils/sort/tuplesort.c
Make heap TID a tiebreaker nbtree index column.
[postgresql] / src / backend / utils / sort / tuplesort.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuplesort.c
4  *        Generalized tuple sorting routines.
5  *
6  * This module handles sorting of heap tuples, index tuples, or single
7  * Datums (and could easily support other kinds of sortable objects,
8  * if necessary).  It works efficiently for both small and large amounts
9  * of data.  Small amounts are sorted in-memory using qsort().  Large
10  * amounts are sorted using temporary files and a standard external sort
11  * algorithm.
12  *
13  * See Knuth, volume 3, for more than you want to know about the external
14  * sorting algorithm.  Historically, we divided the input into sorted runs
15  * using replacement selection, in the form of a priority tree implemented
16  * as a heap (essentially his Algorithm 5.2.3H), but now we always use
17  * quicksort for run generation.  We merge the runs using polyphase merge,
18  * Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D are
19  * implemented by logtape.c, which avoids space wastage by recycling disk
20  * space as soon as each block is read from its "tape".
21  *
22  * The approximate amount of memory allowed for any one sort operation
23  * is specified in kilobytes by the caller (most pass work_mem).  Initially,
24  * we absorb tuples and simply store them in an unsorted array as long as
25  * we haven't exceeded workMem.  If we reach the end of the input without
26  * exceeding workMem, we sort the array using qsort() and subsequently return
27  * tuples just by scanning the tuple array sequentially.  If we do exceed
28  * workMem, we begin to emit tuples into sorted runs in temporary tapes.
29  * When tuples are dumped in batch after quicksorting, we begin a new run
30  * with a new output tape (selected per Algorithm D).  After the end of the
31  * input is reached, we dump out remaining tuples in memory into a final run,
32  * then merge the runs using Algorithm D.
33  *
34  * When merging runs, we use a heap containing just the frontmost tuple from
35  * each source run; we repeatedly output the smallest tuple and replace it
36  * with the next tuple from its source tape (if any).  When the heap empties,
37  * the merge is complete.  The basic merge algorithm thus needs very little
38  * memory --- only M tuples for an M-way merge, and M is constrained to a
39  * small number.  However, we can still make good use of our full workMem
40  * allocation by pre-reading additional blocks from each source tape.  Without
41  * prereading, our access pattern to the temporary file would be very erratic;
42  * on average we'd read one block from each of M source tapes during the same
43  * time that we're writing M blocks to the output tape, so there is no
44  * sequentiality of access at all, defeating the read-ahead methods used by
45  * most Unix kernels.  Worse, the output tape gets written into a very random
46  * sequence of blocks of the temp file, ensuring that things will be even
47  * worse when it comes time to read that tape.  A straightforward merge pass
48  * thus ends up doing a lot of waiting for disk seeks.  We can improve matters
49  * by prereading from each source tape sequentially, loading about workMem/M
50  * bytes from each tape in turn, and making the sequential blocks immediately
51  * available for reuse.  This approach helps to localize both read and write
52  * accesses.  The pre-reading is handled by logtape.c, we just tell it how
53  * much memory to use for the buffers.
54  *
55  * When the caller requests random access to the sort result, we form
56  * the final sorted run on a logical tape which is then "frozen", so
57  * that we can access it randomly.  When the caller does not need random
58  * access, we return from tuplesort_performsort() as soon as we are down
59  * to one run per logical tape.  The final merge is then performed
60  * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
61  * saves one cycle of writing all the data out to disk and reading it in.
62  *
63  * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
64  * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
65  * to Knuth's figure 70 (section 5.4.2).  However, Knuth is assuming that
66  * tape drives are expensive beasts, and in particular that there will always
67  * be many more runs than tape drives.  In our implementation a "tape drive"
68  * doesn't cost much more than a few Kb of memory buffers, so we can afford
69  * to have lots of them.  In particular, if we can have as many tape drives
70  * as sorted runs, we can eliminate any repeated I/O at all.  In the current
71  * code we determine the number of tapes M on the basis of workMem: we want
72  * workMem/M to be large enough that we read a fair amount of data each time
73  * we preread from a tape, so as to maintain the locality of access described
74  * above.  Nonetheless, with large workMem we can have many tapes (but not
75  * too many -- see the comments in tuplesort_merge_order).
76  *
77  * This module supports parallel sorting.  Parallel sorts involve coordination
78  * among one or more worker processes, and a leader process, each with its own
79  * tuplesort state.  The leader process (or, more accurately, the
80  * Tuplesortstate associated with a leader process) creates a full tapeset
81  * consisting of worker tapes with one run to merge; a run for every
82  * worker process.  This is then merged.  Worker processes are guaranteed to
83  * produce exactly one output run from their partial input.
84  *
85  *
86  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
87  * Portions Copyright (c) 1994, Regents of the University of California
88  *
89  * IDENTIFICATION
90  *        src/backend/utils/sort/tuplesort.c
91  *
92  *-------------------------------------------------------------------------
93  */
94
95 #include "postgres.h"
96
97 #include <limits.h>
98
99 #include "access/hash.h"
100 #include "access/htup_details.h"
101 #include "access/nbtree.h"
102 #include "catalog/index.h"
103 #include "catalog/pg_am.h"
104 #include "commands/tablespace.h"
105 #include "executor/executor.h"
106 #include "miscadmin.h"
107 #include "pg_trace.h"
108 #include "utils/datum.h"
109 #include "utils/logtape.h"
110 #include "utils/lsyscache.h"
111 #include "utils/memutils.h"
112 #include "utils/pg_rusage.h"
113 #include "utils/rel.h"
114 #include "utils/sortsupport.h"
115 #include "utils/tuplesort.h"
116
117
118 /* sort-type codes for sort__start probes */
119 #define HEAP_SORT               0
120 #define INDEX_SORT              1
121 #define DATUM_SORT              2
122 #define CLUSTER_SORT    3
123
124 /* Sort parallel code from state for sort__start probes */
125 #define PARALLEL_SORT(state)    ((state)->shared == NULL ? 0 : \
126                                                                  (state)->worker >= 0 ? 1 : 2)
127
128 /* GUC variables */
129 #ifdef TRACE_SORT
130 bool            trace_sort = false;
131 #endif
132
133 #ifdef DEBUG_BOUNDED_SORT
134 bool            optimize_bounded_sort = true;
135 #endif
136
137
138 /*
139  * The objects we actually sort are SortTuple structs.  These contain
140  * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
141  * which is a separate palloc chunk --- we assume it is just one chunk and
142  * can be freed by a simple pfree() (except during merge, when we use a
143  * simple slab allocator).  SortTuples also contain the tuple's first key
144  * column in Datum/nullflag format, and an index integer.
145  *
146  * Storing the first key column lets us save heap_getattr or index_getattr
147  * calls during tuple comparisons.  We could extract and save all the key
148  * columns not just the first, but this would increase code complexity and
149  * overhead, and wouldn't actually save any comparison cycles in the common
150  * case where the first key determines the comparison result.  Note that
151  * for a pass-by-reference datatype, datum1 points into the "tuple" storage.
152  *
153  * There is one special case: when the sort support infrastructure provides an
154  * "abbreviated key" representation, where the key is (typically) a pass by
155  * value proxy for a pass by reference type.  In this case, the abbreviated key
156  * is stored in datum1 in place of the actual first key column.
157  *
158  * When sorting single Datums, the data value is represented directly by
159  * datum1/isnull1 for pass by value types (or null values).  If the datatype is
160  * pass-by-reference and isnull1 is false, then "tuple" points to a separately
161  * palloc'd data value, otherwise "tuple" is NULL.  The value of datum1 is then
162  * either the same pointer as "tuple", or is an abbreviated key value as
163  * described above.  Accordingly, "tuple" is always used in preference to
164  * datum1 as the authoritative value for pass-by-reference cases.
165  *
166  * tupindex holds the input tape number that each tuple in the heap was read
167  * from during merge passes.
168  */
169 typedef struct
170 {
171         void       *tuple;                      /* the tuple itself */
172         Datum           datum1;                 /* value of first key column */
173         bool            isnull1;                /* is first key column NULL? */
174         int                     tupindex;               /* see notes above */
175 } SortTuple;
176
177 /*
178  * During merge, we use a pre-allocated set of fixed-size slots to hold
179  * tuples.  To avoid palloc/pfree overhead.
180  *
181  * Merge doesn't require a lot of memory, so we can afford to waste some,
182  * by using gratuitously-sized slots.  If a tuple is larger than 1 kB, the
183  * palloc() overhead is not significant anymore.
184  *
185  * 'nextfree' is valid when this chunk is in the free list.  When in use, the
186  * slot holds a tuple.
187  */
188 #define SLAB_SLOT_SIZE 1024
189
190 typedef union SlabSlot
191 {
192         union SlabSlot *nextfree;
193         char            buffer[SLAB_SLOT_SIZE];
194 } SlabSlot;
195
196 /*
197  * Possible states of a Tuplesort object.  These denote the states that
198  * persist between calls of Tuplesort routines.
199  */
200 typedef enum
201 {
202         TSS_INITIAL,                            /* Loading tuples; still within memory limit */
203         TSS_BOUNDED,                            /* Loading tuples into bounded-size heap */
204         TSS_BUILDRUNS,                          /* Loading tuples; writing to tape */
205         TSS_SORTEDINMEM,                        /* Sort completed entirely in memory */
206         TSS_SORTEDONTAPE,                       /* Sort completed, final run is on tape */
207         TSS_FINALMERGE                          /* Performing final merge on-the-fly */
208 } TupSortStatus;
209
210 /*
211  * Parameters for calculation of number of tapes to use --- see inittapes()
212  * and tuplesort_merge_order().
213  *
214  * In this calculation we assume that each tape will cost us about 1 blocks
215  * worth of buffer space.  This ignores the overhead of all the other data
216  * structures needed for each tape, but it's probably close enough.
217  *
218  * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
219  * tape during a preread cycle (see discussion at top of file).
220  */
221 #define MINORDER                6               /* minimum merge order */
222 #define MAXORDER                500             /* maximum merge order */
223 #define TAPE_BUFFER_OVERHEAD            BLCKSZ
224 #define MERGE_BUFFER_SIZE                       (BLCKSZ * 32)
225
226 typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
227                                                                         Tuplesortstate *state);
228
229 /*
230  * Private state of a Tuplesort operation.
231  */
232 struct Tuplesortstate
233 {
234         TupSortStatus status;           /* enumerated value as shown above */
235         int                     nKeys;                  /* number of columns in sort key */
236         bool            randomAccess;   /* did caller request random access? */
237         bool            bounded;                /* did caller specify a maximum number of
238                                                                  * tuples to return? */
239         bool            boundUsed;              /* true if we made use of a bounded heap */
240         int                     bound;                  /* if bounded, the maximum number of tuples */
241         bool            tuples;                 /* Can SortTuple.tuple ever be set? */
242         int64           availMem;               /* remaining memory available, in bytes */
243         int64           allowedMem;             /* total memory allowed, in bytes */
244         int                     maxTapes;               /* number of tapes (Knuth's T) */
245         int                     tapeRange;              /* maxTapes-1 (Knuth's P) */
246         MemoryContext sortcontext;      /* memory context holding most sort data */
247         MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
248         LogicalTapeSet *tapeset;        /* logtape.c object for tapes in a temp file */
249
250         /*
251          * These function pointers decouple the routines that must know what kind
252          * of tuple we are sorting from the routines that don't need to know it.
253          * They are set up by the tuplesort_begin_xxx routines.
254          *
255          * Function to compare two tuples; result is per qsort() convention, ie:
256          * <0, 0, >0 according as a<b, a=b, a>b.  The API must match
257          * qsort_arg_comparator.
258          */
259         SortTupleComparator comparetup;
260
261         /*
262          * Function to copy a supplied input tuple into palloc'd space and set up
263          * its SortTuple representation (ie, set tuple/datum1/isnull1).  Also,
264          * state->availMem must be decreased by the amount of space used for the
265          * tuple copy (note the SortTuple struct itself is not counted).
266          */
267         void            (*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);
268
269         /*
270          * Function to write a stored tuple onto tape.  The representation of the
271          * tuple on tape need not be the same as it is in memory; requirements on
272          * the tape representation are given below.  Unless the slab allocator is
273          * used, after writing the tuple, pfree() the out-of-line data (not the
274          * SortTuple struct!), and increase state->availMem by the amount of
275          * memory space thereby released.
276          */
277         void            (*writetup) (Tuplesortstate *state, int tapenum,
278                                                          SortTuple *stup);
279
280         /*
281          * Function to read a stored tuple from tape back into memory. 'len' is
282          * the already-read length of the stored tuple.  The tuple is allocated
283          * from the slab memory arena, or is palloc'd, see readtup_alloc().
284          */
285         void            (*readtup) (Tuplesortstate *state, SortTuple *stup,
286                                                         int tapenum, unsigned int len);
287
288         /*
289          * This array holds the tuples now in sort memory.  If we are in state
290          * INITIAL, the tuples are in no particular order; if we are in state
291          * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
292          * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
293          * H.  In state SORTEDONTAPE, the array is not used.
294          */
295         SortTuple  *memtuples;          /* array of SortTuple structs */
296         int                     memtupcount;    /* number of tuples currently present */
297         int                     memtupsize;             /* allocated length of memtuples array */
298         bool            growmemtuples;  /* memtuples' growth still underway? */
299
300         /*
301          * Memory for tuples is sometimes allocated using a simple slab allocator,
302          * rather than with palloc().  Currently, we switch to slab allocation
303          * when we start merging.  Merging only needs to keep a small, fixed
304          * number of tuples in memory at any time, so we can avoid the
305          * palloc/pfree overhead by recycling a fixed number of fixed-size slots
306          * to hold the tuples.
307          *
308          * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
309          * slots.  The allocation is sized to have one slot per tape, plus one
310          * additional slot.  We need that many slots to hold all the tuples kept
311          * in the heap during merge, plus the one we have last returned from the
312          * sort, with tuplesort_gettuple.
313          *
314          * Initially, all the slots are kept in a linked list of free slots.  When
315          * a tuple is read from a tape, it is put to the next available slot, if
316          * it fits.  If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
317          * instead.
318          *
319          * When we're done processing a tuple, we return the slot back to the free
320          * list, or pfree() if it was palloc'd.  We know that a tuple was
321          * allocated from the slab, if its pointer value is between
322          * slabMemoryBegin and -End.
323          *
324          * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
325          * tracking memory usage is not used.
326          */
327         bool            slabAllocatorUsed;
328
329         char       *slabMemoryBegin;    /* beginning of slab memory arena */
330         char       *slabMemoryEnd;      /* end of slab memory arena */
331         SlabSlot   *slabFreeHead;       /* head of free list */
332
333         /* Buffer size to use for reading input tapes, during merge. */
334         size_t          read_buffer_size;
335
336         /*
337          * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
338          * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
339          * modes), we remember the tuple in 'lastReturnedTuple', so that we can
340          * recycle the memory on next gettuple call.
341          */
342         void       *lastReturnedTuple;
343
344         /*
345          * While building initial runs, this is the current output run number.
346          * Afterwards, it is the number of initial runs we made.
347          */
348         int                     currentRun;
349
350         /*
351          * Unless otherwise noted, all pointer variables below are pointers to
352          * arrays of length maxTapes, holding per-tape data.
353          */
354
355         /*
356          * This variable is only used during merge passes.  mergeactive[i] is true
357          * if we are reading an input run from (actual) tape number i and have not
358          * yet exhausted that run.
359          */
360         bool       *mergeactive;        /* active input run source? */
361
362         /*
363          * Variables for Algorithm D.  Note that destTape is a "logical" tape
364          * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
365          * "logical" and "actual" tape numbers straight!
366          */
367         int                     Level;                  /* Knuth's l */
368         int                     destTape;               /* current output tape (Knuth's j, less 1) */
369         int                *tp_fib;                     /* Target Fibonacci run counts (A[]) */
370         int                *tp_runs;            /* # of real runs on each tape */
371         int                *tp_dummy;           /* # of dummy runs for each tape (D[]) */
372         int                *tp_tapenum;         /* Actual tape numbers (TAPE[]) */
373         int                     activeTapes;    /* # of active input tapes in merge pass */
374
375         /*
376          * These variables are used after completion of sorting to keep track of
377          * the next tuple to return.  (In the tape case, the tape's current read
378          * position is also critical state.)
379          */
380         int                     result_tape;    /* actual tape number of finished output */
381         int                     current;                /* array index (only used if SORTEDINMEM) */
382         bool            eof_reached;    /* reached EOF (needed for cursors) */
383
384         /* markpos_xxx holds marked position for mark and restore */
385         long            markpos_block;  /* tape block# (only used if SORTEDONTAPE) */
386         int                     markpos_offset; /* saved "current", or offset in tape block */
387         bool            markpos_eof;    /* saved "eof_reached" */
388
389         /*
390          * These variables are used during parallel sorting.
391          *
392          * worker is our worker identifier.  Follows the general convention that
393          * -1 value relates to a leader tuplesort, and values >= 0 worker
394          * tuplesorts. (-1 can also be a serial tuplesort.)
395          *
396          * shared is mutable shared memory state, which is used to coordinate
397          * parallel sorts.
398          *
399          * nParticipants is the number of worker Tuplesortstates known by the
400          * leader to have actually been launched, which implies that they must
401          * finish a run leader can merge.  Typically includes a worker state held
402          * by the leader process itself.  Set in the leader Tuplesortstate only.
403          */
404         int                     worker;
405         Sharedsort *shared;
406         int                     nParticipants;
407
408         /*
409          * The sortKeys variable is used by every case other than the hash index
410          * case; it is set by tuplesort_begin_xxx.  tupDesc is only used by the
411          * MinimalTuple and CLUSTER routines, though.
412          */
413         TupleDesc       tupDesc;
414         SortSupport sortKeys;           /* array of length nKeys */
415
416         /*
417          * This variable is shared by the single-key MinimalTuple case and the
418          * Datum case (which both use qsort_ssup()).  Otherwise it's NULL.
419          */
420         SortSupport onlyKey;
421
422         /*
423          * Additional state for managing "abbreviated key" sortsupport routines
424          * (which currently may be used by all cases except the hash index case).
425          * Tracks the intervals at which the optimization's effectiveness is
426          * tested.
427          */
428         int64           abbrevNext;             /* Tuple # at which to next check
429                                                                  * applicability */
430
431         /*
432          * These variables are specific to the CLUSTER case; they are set by
433          * tuplesort_begin_cluster.
434          */
435         IndexInfo  *indexInfo;          /* info about index being used for reference */
436         EState     *estate;                     /* for evaluating index expressions */
437
438         /*
439          * These variables are specific to the IndexTuple case; they are set by
440          * tuplesort_begin_index_xxx and used only by the IndexTuple routines.
441          */
442         Relation        heapRel;                /* table the index is being built on */
443         Relation        indexRel;               /* index being built */
444
445         /* These are specific to the index_btree subcase: */
446         bool            enforceUnique;  /* complain if we find duplicate tuples */
447
448         /* These are specific to the index_hash subcase: */
449         uint32          high_mask;              /* masks for sortable part of hash code */
450         uint32          low_mask;
451         uint32          max_buckets;
452
453         /*
454          * These variables are specific to the Datum case; they are set by
455          * tuplesort_begin_datum and used only by the DatumTuple routines.
456          */
457         Oid                     datumType;
458         /* we need typelen in order to know how to copy the Datums. */
459         int                     datumTypeLen;
460
461         /*
462          * Resource snapshot for time of sort start.
463          */
464 #ifdef TRACE_SORT
465         PGRUsage        ru_start;
466 #endif
467 };
468
469 /*
470  * Private mutable state of tuplesort-parallel-operation.  This is allocated
471  * in shared memory.
472  */
473 struct Sharedsort
474 {
475         /* mutex protects all fields prior to tapes */
476         slock_t         mutex;
477
478         /*
479          * currentWorker generates ordinal identifier numbers for parallel sort
480          * workers.  These start from 0, and are always gapless.
481          *
482          * Workers increment workersFinished to indicate having finished.  If this
483          * is equal to state.nParticipants within the leader, leader is ready to
484          * merge worker runs.
485          */
486         int                     currentWorker;
487         int                     workersFinished;
488
489         /* Temporary file space */
490         SharedFileSet fileset;
491
492         /* Size of tapes flexible array */
493         int                     nTapes;
494
495         /*
496          * Tapes array used by workers to report back information needed by the
497          * leader to concatenate all worker tapes into one for merging
498          */
499         TapeShare       tapes[FLEXIBLE_ARRAY_MEMBER];
500 };
501
502 /*
503  * Is the given tuple allocated from the slab memory arena?
504  */
505 #define IS_SLAB_SLOT(state, tuple) \
506         ((char *) (tuple) >= (state)->slabMemoryBegin && \
507          (char *) (tuple) < (state)->slabMemoryEnd)
508
509 /*
510  * Return the given tuple to the slab memory free list, or free it
511  * if it was palloc'd.
512  */
513 #define RELEASE_SLAB_SLOT(state, tuple) \
514         do { \
515                 SlabSlot *buf = (SlabSlot *) tuple; \
516                 \
517                 if (IS_SLAB_SLOT((state), buf)) \
518                 { \
519                         buf->nextfree = (state)->slabFreeHead; \
520                         (state)->slabFreeHead = buf; \
521                 } else \
522                         pfree(buf); \
523         } while(0)
524
525 #define COMPARETUP(state,a,b)   ((*(state)->comparetup) (a, b, state))
526 #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
527 #define WRITETUP(state,tape,stup)       ((*(state)->writetup) (state, tape, stup))
528 #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
529 #define LACKMEM(state)          ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
530 #define USEMEM(state,amt)       ((state)->availMem -= (amt))
531 #define FREEMEM(state,amt)      ((state)->availMem += (amt))
532 #define SERIAL(state)           ((state)->shared == NULL)
533 #define WORKER(state)           ((state)->shared && (state)->worker != -1)
534 #define LEADER(state)           ((state)->shared && (state)->worker == -1)
535
536 /*
537  * NOTES about on-tape representation of tuples:
538  *
539  * We require the first "unsigned int" of a stored tuple to be the total size
540  * on-tape of the tuple, including itself (so it is never zero; an all-zero
541  * unsigned int is used to delimit runs).  The remainder of the stored tuple
542  * may or may not match the in-memory representation of the tuple ---
543  * any conversion needed is the job of the writetup and readtup routines.
544  *
545  * If state->randomAccess is true, then the stored representation of the
546  * tuple must be followed by another "unsigned int" that is a copy of the
547  * length --- so the total tape space used is actually sizeof(unsigned int)
548  * more than the stored length value.  This allows read-backwards.  When
549  * randomAccess is not true, the write/read routines may omit the extra
550  * length word.
551  *
552  * writetup is expected to write both length words as well as the tuple
553  * data.  When readtup is called, the tape is positioned just after the
554  * front length word; readtup must read the tuple data and advance past
555  * the back length word (if present).
556  *
557  * The write/read routines can make use of the tuple description data
558  * stored in the Tuplesortstate record, if needed.  They are also expected
559  * to adjust state->availMem by the amount of memory space (not tape space!)
560  * released or consumed.  There is no error return from either writetup
561  * or readtup; they should ereport() on failure.
562  *
563  *
564  * NOTES about memory consumption calculations:
565  *
566  * We count space allocated for tuples against the workMem limit, plus
567  * the space used by the variable-size memtuples array.  Fixed-size space
568  * is not counted; it's small enough to not be interesting.
569  *
570  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
571  * rather than the originally-requested size.  This is important since
572  * palloc can add substantial overhead.  It's not a complete answer since
573  * we won't count any wasted space in palloc allocation blocks, but it's
574  * a lot better than what we were doing before 7.3.  As of 9.6, a
575  * separate memory context is used for caller passed tuples.  Resetting
576  * it at certain key increments significantly ameliorates fragmentation.
577  * Note that this places a responsibility on readtup and copytup routines
578  * to use the right memory context for these tuples (and to not use the
579  * reset context for anything whose lifetime needs to span multiple
580  * external sort runs).
581  */
582
583 /* When using this macro, beware of double evaluation of len */
584 #define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
585         do { \
586                 if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \
587                         elog(ERROR, "unexpected end of data"); \
588         } while(0)
589
590
591 static Tuplesortstate *tuplesort_begin_common(int workMem,
592                                            SortCoordinate coordinate,
593                                            bool randomAccess);
594 static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
595 static bool consider_abort_common(Tuplesortstate *state);
596 static void inittapes(Tuplesortstate *state, bool mergeruns);
597 static void inittapestate(Tuplesortstate *state, int maxTapes);
598 static void selectnewtape(Tuplesortstate *state);
599 static void init_slab_allocator(Tuplesortstate *state, int numSlots);
600 static void mergeruns(Tuplesortstate *state);
601 static void mergeonerun(Tuplesortstate *state);
602 static void beginmerge(Tuplesortstate *state);
603 static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
604 static void dumptuples(Tuplesortstate *state, bool alltuples);
605 static void make_bounded_heap(Tuplesortstate *state);
606 static void sort_bounded_heap(Tuplesortstate *state);
607 static void tuplesort_sort_memtuples(Tuplesortstate *state);
608 static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
609 static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
610 static void tuplesort_heap_delete_top(Tuplesortstate *state);
611 static void reversedirection(Tuplesortstate *state);
612 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
613 static void markrunend(Tuplesortstate *state, int tapenum);
614 static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
615 static int comparetup_heap(const SortTuple *a, const SortTuple *b,
616                                 Tuplesortstate *state);
617 static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
618 static void writetup_heap(Tuplesortstate *state, int tapenum,
619                           SortTuple *stup);
620 static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
621                          int tapenum, unsigned int len);
622 static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
623                                    Tuplesortstate *state);
624 static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
625 static void writetup_cluster(Tuplesortstate *state, int tapenum,
626                                  SortTuple *stup);
627 static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
628                                 int tapenum, unsigned int len);
629 static int comparetup_index_btree(const SortTuple *a, const SortTuple *b,
630                                            Tuplesortstate *state);
631 static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
632                                           Tuplesortstate *state);
633 static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
634 static void writetup_index(Tuplesortstate *state, int tapenum,
635                            SortTuple *stup);
636 static void readtup_index(Tuplesortstate *state, SortTuple *stup,
637                           int tapenum, unsigned int len);
638 static int comparetup_datum(const SortTuple *a, const SortTuple *b,
639                                  Tuplesortstate *state);
640 static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
641 static void writetup_datum(Tuplesortstate *state, int tapenum,
642                            SortTuple *stup);
643 static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
644                           int tapenum, unsigned int len);
645 static int      worker_get_identifier(Tuplesortstate *state);
646 static void worker_freeze_result_tape(Tuplesortstate *state);
647 static void worker_nomergeruns(Tuplesortstate *state);
648 static void leader_takeover_tapes(Tuplesortstate *state);
649 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
650
651 /*
652  * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
653  * any variant of SortTuples, using the appropriate comparetup function.
654  * qsort_ssup() is specialized for the case where the comparetup function
655  * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
656  * and Datum sorts.
657  */
658 #include "qsort_tuple.c"
659
660
661 /*
662  *              tuplesort_begin_xxx
663  *
664  * Initialize for a tuple sort operation.
665  *
666  * After calling tuplesort_begin, the caller should call tuplesort_putXXX
667  * zero or more times, then call tuplesort_performsort when all the tuples
668  * have been supplied.  After performsort, retrieve the tuples in sorted
669  * order by calling tuplesort_getXXX until it returns false/NULL.  (If random
670  * access was requested, rescan, markpos, and restorepos can also be called.)
671  * Call tuplesort_end to terminate the operation and release memory/disk space.
672  *
673  * Each variant of tuplesort_begin has a workMem parameter specifying the
674  * maximum number of kilobytes of RAM to use before spilling data to disk.
675  * (The normal value of this parameter is work_mem, but some callers use
676  * other values.)  Each variant also has a randomAccess parameter specifying
677  * whether the caller needs non-sequential access to the sort result.
678  */
679
680 static Tuplesortstate *
681 tuplesort_begin_common(int workMem, SortCoordinate coordinate,
682                                            bool randomAccess)
683 {
684         Tuplesortstate *state;
685         MemoryContext sortcontext;
686         MemoryContext tuplecontext;
687         MemoryContext oldcontext;
688
689         /* See leader_takeover_tapes() remarks on randomAccess support */
690         if (coordinate && randomAccess)
691                 elog(ERROR, "random access disallowed under parallel sort");
692
693         /*
694          * Create a working memory context for this sort operation. All data
695          * needed by the sort will live inside this context.
696          */
697         sortcontext = AllocSetContextCreate(CurrentMemoryContext,
698                                                                                 "TupleSort main",
699                                                                                 ALLOCSET_DEFAULT_SIZES);
700
701         /*
702          * Caller tuple (e.g. IndexTuple) memory context.
703          *
704          * A dedicated child context used exclusively for caller passed tuples
705          * eases memory management.  Resetting at key points reduces
706          * fragmentation. Note that the memtuples array of SortTuples is allocated
707          * in the parent context, not this context, because there is no need to
708          * free memtuples early.
709          */
710         tuplecontext = AllocSetContextCreate(sortcontext,
711                                                                                  "Caller tuples",
712                                                                                  ALLOCSET_DEFAULT_SIZES);
713
714         /*
715          * Make the Tuplesortstate within the per-sort context.  This way, we
716          * don't need a separate pfree() operation for it at shutdown.
717          */
718         oldcontext = MemoryContextSwitchTo(sortcontext);
719
720         state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
721
722 #ifdef TRACE_SORT
723         if (trace_sort)
724                 pg_rusage_init(&state->ru_start);
725 #endif
726
727         state->status = TSS_INITIAL;
728         state->randomAccess = randomAccess;
729         state->bounded = false;
730         state->tuples = true;
731         state->boundUsed = false;
732
733         /*
734          * workMem is forced to be at least 64KB, the current minimum valid value
735          * for the work_mem GUC.  This is a defense against parallel sort callers
736          * that divide out memory among many workers in a way that leaves each
737          * with very little memory.
738          */
739         state->allowedMem = Max(workMem, 64) * (int64) 1024;
740         state->availMem = state->allowedMem;
741         state->sortcontext = sortcontext;
742         state->tuplecontext = tuplecontext;
743         state->tapeset = NULL;
744
745         state->memtupcount = 0;
746
747         /*
748          * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
749          * see comments in grow_memtuples().
750          */
751         state->memtupsize = Max(1024,
752                                                         ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1);
753
754         state->growmemtuples = true;
755         state->slabAllocatorUsed = false;
756         state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
757
758         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
759
760         /* workMem must be large enough for the minimal memtuples array */
761         if (LACKMEM(state))
762                 elog(ERROR, "insufficient memory allowed for sort");
763
764         state->currentRun = 0;
765
766         /*
767          * maxTapes, tapeRange, and Algorithm D variables will be initialized by
768          * inittapes(), if needed
769          */
770
771         state->result_tape = -1;        /* flag that result tape has not been formed */
772
773         /*
774          * Initialize parallel-related state based on coordination information
775          * from caller
776          */
777         if (!coordinate)
778         {
779                 /* Serial sort */
780                 state->shared = NULL;
781                 state->worker = -1;
782                 state->nParticipants = -1;
783         }
784         else if (coordinate->isWorker)
785         {
786                 /* Parallel worker produces exactly one final run from all input */
787                 state->shared = coordinate->sharedsort;
788                 state->worker = worker_get_identifier(state);
789                 state->nParticipants = -1;
790         }
791         else
792         {
793                 /* Parallel leader state only used for final merge */
794                 state->shared = coordinate->sharedsort;
795                 state->worker = -1;
796                 state->nParticipants = coordinate->nParticipants;
797                 Assert(state->nParticipants >= 1);
798         }
799
800         MemoryContextSwitchTo(oldcontext);
801
802         return state;
803 }
804
805 Tuplesortstate *
806 tuplesort_begin_heap(TupleDesc tupDesc,
807                                          int nkeys, AttrNumber *attNums,
808                                          Oid *sortOperators, Oid *sortCollations,
809                                          bool *nullsFirstFlags,
810                                          int workMem, SortCoordinate coordinate, bool randomAccess)
811 {
812         Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
813                                                                                                    randomAccess);
814         MemoryContext oldcontext;
815         int                     i;
816
817         oldcontext = MemoryContextSwitchTo(state->sortcontext);
818
819         AssertArg(nkeys > 0);
820
821 #ifdef TRACE_SORT
822         if (trace_sort)
823                 elog(LOG,
824                          "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
825                          nkeys, workMem, randomAccess ? 't' : 'f');
826 #endif
827
828         state->nKeys = nkeys;
829
830         TRACE_POSTGRESQL_SORT_START(HEAP_SORT,
831                                                                 false,  /* no unique check */
832                                                                 nkeys,
833                                                                 workMem,
834                                                                 randomAccess,
835                                                                 PARALLEL_SORT(state));
836
837         state->comparetup = comparetup_heap;
838         state->copytup = copytup_heap;
839         state->writetup = writetup_heap;
840         state->readtup = readtup_heap;
841
842         state->tupDesc = tupDesc;       /* assume we need not copy tupDesc */
843         state->abbrevNext = 10;
844
845         /* Prepare SortSupport data for each column */
846         state->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData));
847
848         for (i = 0; i < nkeys; i++)
849         {
850                 SortSupport sortKey = state->sortKeys + i;
851
852                 AssertArg(attNums[i] != 0);
853                 AssertArg(sortOperators[i] != 0);
854
855                 sortKey->ssup_cxt = CurrentMemoryContext;
856                 sortKey->ssup_collation = sortCollations[i];
857                 sortKey->ssup_nulls_first = nullsFirstFlags[i];
858                 sortKey->ssup_attno = attNums[i];
859                 /* Convey if abbreviation optimization is applicable in principle */
860                 sortKey->abbreviate = (i == 0);
861
862                 PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
863         }
864
865         /*
866          * The "onlyKey" optimization cannot be used with abbreviated keys, since
867          * tie-breaker comparisons may be required.  Typically, the optimization
868          * is only of value to pass-by-value types anyway, whereas abbreviated
869          * keys are typically only of value to pass-by-reference types.
870          */
871         if (nkeys == 1 && !state->sortKeys->abbrev_converter)
872                 state->onlyKey = state->sortKeys;
873
874         MemoryContextSwitchTo(oldcontext);
875
876         return state;
877 }
878
879 Tuplesortstate *
880 tuplesort_begin_cluster(TupleDesc tupDesc,
881                                                 Relation indexRel,
882                                                 int workMem,
883                                                 SortCoordinate coordinate, bool randomAccess)
884 {
885         Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
886                                                                                                    randomAccess);
887         BTScanInsert indexScanKey;
888         MemoryContext oldcontext;
889         int                     i;
890
891         Assert(indexRel->rd_rel->relam == BTREE_AM_OID);
892
893         oldcontext = MemoryContextSwitchTo(state->sortcontext);
894
895 #ifdef TRACE_SORT
896         if (trace_sort)
897                 elog(LOG,
898                          "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
899                          RelationGetNumberOfAttributes(indexRel),
900                          workMem, randomAccess ? 't' : 'f');
901 #endif
902
903         state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
904
905         TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT,
906                                                                 false,  /* no unique check */
907                                                                 state->nKeys,
908                                                                 workMem,
909                                                                 randomAccess,
910                                                                 PARALLEL_SORT(state));
911
912         state->comparetup = comparetup_cluster;
913         state->copytup = copytup_cluster;
914         state->writetup = writetup_cluster;
915         state->readtup = readtup_cluster;
916         state->abbrevNext = 10;
917
918         state->indexInfo = BuildIndexInfo(indexRel);
919
920         state->tupDesc = tupDesc;       /* assume we need not copy tupDesc */
921
922         indexScanKey = _bt_mkscankey(indexRel, NULL);
923
924         if (state->indexInfo->ii_Expressions != NULL)
925         {
926                 TupleTableSlot *slot;
927                 ExprContext *econtext;
928
929                 /*
930                  * We will need to use FormIndexDatum to evaluate the index
931                  * expressions.  To do that, we need an EState, as well as a
932                  * TupleTableSlot to put the table tuples into.  The econtext's
933                  * scantuple has to point to that slot, too.
934                  */
935                 state->estate = CreateExecutorState();
936                 slot = MakeSingleTupleTableSlot(tupDesc, &TTSOpsVirtual);
937                 econtext = GetPerTupleExprContext(state->estate);
938                 econtext->ecxt_scantuple = slot;
939         }
940
941         /* Prepare SortSupport data for each column */
942         state->sortKeys = (SortSupport) palloc0(state->nKeys *
943                                                                                         sizeof(SortSupportData));
944
945         for (i = 0; i < state->nKeys; i++)
946         {
947                 SortSupport sortKey = state->sortKeys + i;
948                 ScanKey         scanKey = indexScanKey->scankeys + i;
949                 int16           strategy;
950
951                 sortKey->ssup_cxt = CurrentMemoryContext;
952                 sortKey->ssup_collation = scanKey->sk_collation;
953                 sortKey->ssup_nulls_first =
954                         (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
955                 sortKey->ssup_attno = scanKey->sk_attno;
956                 /* Convey if abbreviation optimization is applicable in principle */
957                 sortKey->abbreviate = (i == 0);
958
959                 AssertState(sortKey->ssup_attno != 0);
960
961                 strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
962                         BTGreaterStrategyNumber : BTLessStrategyNumber;
963
964                 PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
965         }
966
967         pfree(indexScanKey);
968
969         MemoryContextSwitchTo(oldcontext);
970
971         return state;
972 }
973
974 Tuplesortstate *
975 tuplesort_begin_index_btree(Relation heapRel,
976                                                         Relation indexRel,
977                                                         bool enforceUnique,
978                                                         int workMem,
979                                                         SortCoordinate coordinate,
980                                                         bool randomAccess)
981 {
982         Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
983                                                                                                    randomAccess);
984         BTScanInsert indexScanKey;
985         MemoryContext oldcontext;
986         int                     i;
987
988         oldcontext = MemoryContextSwitchTo(state->sortcontext);
989
990 #ifdef TRACE_SORT
991         if (trace_sort)
992                 elog(LOG,
993                          "begin index sort: unique = %c, workMem = %d, randomAccess = %c",
994                          enforceUnique ? 't' : 'f',
995                          workMem, randomAccess ? 't' : 'f');
996 #endif
997
998         state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
999
1000         TRACE_POSTGRESQL_SORT_START(INDEX_SORT,
1001                                                                 enforceUnique,
1002                                                                 state->nKeys,
1003                                                                 workMem,
1004                                                                 randomAccess,
1005                                                                 PARALLEL_SORT(state));
1006
1007         state->comparetup = comparetup_index_btree;
1008         state->copytup = copytup_index;
1009         state->writetup = writetup_index;
1010         state->readtup = readtup_index;
1011         state->abbrevNext = 10;
1012
1013         state->heapRel = heapRel;
1014         state->indexRel = indexRel;
1015         state->enforceUnique = enforceUnique;
1016
1017         indexScanKey = _bt_mkscankey(indexRel, NULL);
1018
1019         /* Prepare SortSupport data for each column */
1020         state->sortKeys = (SortSupport) palloc0(state->nKeys *
1021                                                                                         sizeof(SortSupportData));
1022
1023         for (i = 0; i < state->nKeys; i++)
1024         {
1025                 SortSupport sortKey = state->sortKeys + i;
1026                 ScanKey         scanKey = indexScanKey->scankeys + i;
1027                 int16           strategy;
1028
1029                 sortKey->ssup_cxt = CurrentMemoryContext;
1030                 sortKey->ssup_collation = scanKey->sk_collation;
1031                 sortKey->ssup_nulls_first =
1032                         (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
1033                 sortKey->ssup_attno = scanKey->sk_attno;
1034                 /* Convey if abbreviation optimization is applicable in principle */
1035                 sortKey->abbreviate = (i == 0);
1036
1037                 AssertState(sortKey->ssup_attno != 0);
1038
1039                 strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
1040                         BTGreaterStrategyNumber : BTLessStrategyNumber;
1041
1042                 PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
1043         }
1044
1045         pfree(indexScanKey);
1046
1047         MemoryContextSwitchTo(oldcontext);
1048
1049         return state;
1050 }
1051
1052 Tuplesortstate *
1053 tuplesort_begin_index_hash(Relation heapRel,
1054                                                    Relation indexRel,
1055                                                    uint32 high_mask,
1056                                                    uint32 low_mask,
1057                                                    uint32 max_buckets,
1058                                                    int workMem,
1059                                                    SortCoordinate coordinate,
1060                                                    bool randomAccess)
1061 {
1062         Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
1063                                                                                                    randomAccess);
1064         MemoryContext oldcontext;
1065
1066         oldcontext = MemoryContextSwitchTo(state->sortcontext);
1067
1068 #ifdef TRACE_SORT
1069         if (trace_sort)
1070                 elog(LOG,
1071                          "begin index sort: high_mask = 0x%x, low_mask = 0x%x, "
1072                          "max_buckets = 0x%x, workMem = %d, randomAccess = %c",
1073                          high_mask,
1074                          low_mask,
1075                          max_buckets,
1076                          workMem, randomAccess ? 't' : 'f');
1077 #endif
1078
1079         state->nKeys = 1;                       /* Only one sort column, the hash code */
1080
1081         state->comparetup = comparetup_index_hash;
1082         state->copytup = copytup_index;
1083         state->writetup = writetup_index;
1084         state->readtup = readtup_index;
1085
1086         state->heapRel = heapRel;
1087         state->indexRel = indexRel;
1088
1089         state->high_mask = high_mask;
1090         state->low_mask = low_mask;
1091         state->max_buckets = max_buckets;
1092
1093         MemoryContextSwitchTo(oldcontext);
1094
1095         return state;
1096 }
1097
1098 Tuplesortstate *
1099 tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
1100                                           bool nullsFirstFlag, int workMem,
1101                                           SortCoordinate coordinate, bool randomAccess)
1102 {
1103         Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
1104                                                                                                    randomAccess);
1105         MemoryContext oldcontext;
1106         int16           typlen;
1107         bool            typbyval;
1108
1109         oldcontext = MemoryContextSwitchTo(state->sortcontext);
1110
1111 #ifdef TRACE_SORT
1112         if (trace_sort)
1113                 elog(LOG,
1114                          "begin datum sort: workMem = %d, randomAccess = %c",
1115                          workMem, randomAccess ? 't' : 'f');
1116 #endif
1117
1118         state->nKeys = 1;                       /* always a one-column sort */
1119
1120         TRACE_POSTGRESQL_SORT_START(DATUM_SORT,
1121                                                                 false,  /* no unique check */
1122                                                                 1,
1123                                                                 workMem,
1124                                                                 randomAccess,
1125                                                                 PARALLEL_SORT(state));
1126
1127         state->comparetup = comparetup_datum;
1128         state->copytup = copytup_datum;
1129         state->writetup = writetup_datum;
1130         state->readtup = readtup_datum;
1131         state->abbrevNext = 10;
1132
1133         state->datumType = datumType;
1134
1135         /* lookup necessary attributes of the datum type */
1136         get_typlenbyval(datumType, &typlen, &typbyval);
1137         state->datumTypeLen = typlen;
1138         state->tuples = !typbyval;
1139
1140         /* Prepare SortSupport data */
1141         state->sortKeys = (SortSupport) palloc0(sizeof(SortSupportData));
1142
1143         state->sortKeys->ssup_cxt = CurrentMemoryContext;
1144         state->sortKeys->ssup_collation = sortCollation;
1145         state->sortKeys->ssup_nulls_first = nullsFirstFlag;
1146
1147         /*
1148          * Abbreviation is possible here only for by-reference types.  In theory,
1149          * a pass-by-value datatype could have an abbreviated form that is cheaper
1150          * to compare.  In a tuple sort, we could support that, because we can
1151          * always extract the original datum from the tuple is needed.  Here, we
1152          * can't, because a datum sort only stores a single copy of the datum; the
1153          * "tuple" field of each sortTuple is NULL.
1154          */
1155         state->sortKeys->abbreviate = !typbyval;
1156
1157         PrepareSortSupportFromOrderingOp(sortOperator, state->sortKeys);
1158
1159         /*
1160          * The "onlyKey" optimization cannot be used with abbreviated keys, since
1161          * tie-breaker comparisons may be required.  Typically, the optimization
1162          * is only of value to pass-by-value types anyway, whereas abbreviated
1163          * keys are typically only of value to pass-by-reference types.
1164          */
1165         if (!state->sortKeys->abbrev_converter)
1166                 state->onlyKey = state->sortKeys;
1167
1168         MemoryContextSwitchTo(oldcontext);
1169
1170         return state;
1171 }
1172
1173 /*
1174  * tuplesort_set_bound
1175  *
1176  *      Advise tuplesort that at most the first N result tuples are required.
1177  *
1178  * Must be called before inserting any tuples.  (Actually, we could allow it
1179  * as long as the sort hasn't spilled to disk, but there seems no need for
1180  * delayed calls at the moment.)
1181  *
1182  * This is a hint only. The tuplesort may still return more tuples than
1183  * requested.  Parallel leader tuplesorts will always ignore the hint.
1184  */
1185 void
1186 tuplesort_set_bound(Tuplesortstate *state, int64 bound)
1187 {
1188         /* Assert we're called before loading any tuples */
1189         Assert(state->status == TSS_INITIAL);
1190         Assert(state->memtupcount == 0);
1191         Assert(!state->bounded);
1192         Assert(!WORKER(state));
1193
1194 #ifdef DEBUG_BOUNDED_SORT
1195         /* Honor GUC setting that disables the feature (for easy testing) */
1196         if (!optimize_bounded_sort)
1197                 return;
1198 #endif
1199
1200         /* Parallel leader ignores hint */
1201         if (LEADER(state))
1202                 return;
1203
1204         /* We want to be able to compute bound * 2, so limit the setting */
1205         if (bound > (int64) (INT_MAX / 2))
1206                 return;
1207
1208         state->bounded = true;
1209         state->bound = (int) bound;
1210
1211         /*
1212          * Bounded sorts are not an effective target for abbreviated key
1213          * optimization.  Disable by setting state to be consistent with no
1214          * abbreviation support.
1215          */
1216         state->sortKeys->abbrev_converter = NULL;
1217         if (state->sortKeys->abbrev_full_comparator)
1218                 state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
1219
1220         /* Not strictly necessary, but be tidy */
1221         state->sortKeys->abbrev_abort = NULL;
1222         state->sortKeys->abbrev_full_comparator = NULL;
1223 }
1224
1225 /*
1226  * tuplesort_end
1227  *
1228  *      Release resources and clean up.
1229  *
1230  * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
1231  * pointing to garbage.  Be careful not to attempt to use or free such
1232  * pointers afterwards!
1233  */
1234 void
1235 tuplesort_end(Tuplesortstate *state)
1236 {
1237         /* context swap probably not needed, but let's be safe */
1238         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1239
1240 #ifdef TRACE_SORT
1241         long            spaceUsed;
1242
1243         if (state->tapeset)
1244                 spaceUsed = LogicalTapeSetBlocks(state->tapeset);
1245         else
1246                 spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
1247 #endif
1248
1249         /*
1250          * Delete temporary "tape" files, if any.
1251          *
1252          * Note: want to include this in reported total cost of sort, hence need
1253          * for two #ifdef TRACE_SORT sections.
1254          */
1255         if (state->tapeset)
1256                 LogicalTapeSetClose(state->tapeset);
1257
1258 #ifdef TRACE_SORT
1259         if (trace_sort)
1260         {
1261                 if (state->tapeset)
1262                         elog(LOG, "%s of worker %d ended, %ld disk blocks used: %s",
1263                                  SERIAL(state) ? "external sort" : "parallel external sort",
1264                                  state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
1265                 else
1266                         elog(LOG, "%s of worker %d ended, %ld KB used: %s",
1267                                  SERIAL(state) ? "internal sort" : "unperformed parallel sort",
1268                                  state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
1269         }
1270
1271         TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
1272 #else
1273
1274         /*
1275          * If you disabled TRACE_SORT, you can still probe sort__done, but you
1276          * ain't getting space-used stats.
1277          */
1278         TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L);
1279 #endif
1280
1281         /* Free any execution state created for CLUSTER case */
1282         if (state->estate != NULL)
1283         {
1284                 ExprContext *econtext = GetPerTupleExprContext(state->estate);
1285
1286                 ExecDropSingleTupleTableSlot(econtext->ecxt_scantuple);
1287                 FreeExecutorState(state->estate);
1288         }
1289
1290         MemoryContextSwitchTo(oldcontext);
1291
1292         /*
1293          * Free the per-sort memory context, thereby releasing all working memory,
1294          * including the Tuplesortstate struct itself.
1295          */
1296         MemoryContextDelete(state->sortcontext);
1297 }
1298
1299 /*
1300  * Grow the memtuples[] array, if possible within our memory constraint.  We
1301  * must not exceed INT_MAX tuples in memory or the caller-provided memory
1302  * limit.  Return true if we were able to enlarge the array, false if not.
1303  *
1304  * Normally, at each increment we double the size of the array.  When doing
1305  * that would exceed a limit, we attempt one last, smaller increase (and then
1306  * clear the growmemtuples flag so we don't try any more).  That allows us to
1307  * use memory as fully as permitted; sticking to the pure doubling rule could
1308  * result in almost half going unused.  Because availMem moves around with
1309  * tuple addition/removal, we need some rule to prevent making repeated small
1310  * increases in memtupsize, which would just be useless thrashing.  The
1311  * growmemtuples flag accomplishes that and also prevents useless
1312  * recalculations in this function.
1313  */
1314 static bool
1315 grow_memtuples(Tuplesortstate *state)
1316 {
1317         int                     newmemtupsize;
1318         int                     memtupsize = state->memtupsize;
1319         int64           memNowUsed = state->allowedMem - state->availMem;
1320
1321         /* Forget it if we've already maxed out memtuples, per comment above */
1322         if (!state->growmemtuples)
1323                 return false;
1324
1325         /* Select new value of memtupsize */
1326         if (memNowUsed <= state->availMem)
1327         {
1328                 /*
1329                  * We've used no more than half of allowedMem; double our usage,
1330                  * clamping at INT_MAX tuples.
1331                  */
1332                 if (memtupsize < INT_MAX / 2)
1333                         newmemtupsize = memtupsize * 2;
1334                 else
1335                 {
1336                         newmemtupsize = INT_MAX;
1337                         state->growmemtuples = false;
1338                 }
1339         }
1340         else
1341         {
1342                 /*
1343                  * This will be the last increment of memtupsize.  Abandon doubling
1344                  * strategy and instead increase as much as we safely can.
1345                  *
1346                  * To stay within allowedMem, we can't increase memtupsize by more
1347                  * than availMem / sizeof(SortTuple) elements.  In practice, we want
1348                  * to increase it by considerably less, because we need to leave some
1349                  * space for the tuples to which the new array slots will refer.  We
1350                  * assume the new tuples will be about the same size as the tuples
1351                  * we've already seen, and thus we can extrapolate from the space
1352                  * consumption so far to estimate an appropriate new size for the
1353                  * memtuples array.  The optimal value might be higher or lower than
1354                  * this estimate, but it's hard to know that in advance.  We again
1355                  * clamp at INT_MAX tuples.
1356                  *
1357                  * This calculation is safe against enlarging the array so much that
1358                  * LACKMEM becomes true, because the memory currently used includes
1359                  * the present array; thus, there would be enough allowedMem for the
1360                  * new array elements even if no other memory were currently used.
1361                  *
1362                  * We do the arithmetic in float8, because otherwise the product of
1363                  * memtupsize and allowedMem could overflow.  Any inaccuracy in the
1364                  * result should be insignificant; but even if we computed a
1365                  * completely insane result, the checks below will prevent anything
1366                  * really bad from happening.
1367                  */
1368                 double          grow_ratio;
1369
1370                 grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1371                 if (memtupsize * grow_ratio < INT_MAX)
1372                         newmemtupsize = (int) (memtupsize * grow_ratio);
1373                 else
1374                         newmemtupsize = INT_MAX;
1375
1376                 /* We won't make any further enlargement attempts */
1377                 state->growmemtuples = false;
1378         }
1379
1380         /* Must enlarge array by at least one element, else report failure */
1381         if (newmemtupsize <= memtupsize)
1382                 goto noalloc;
1383
1384         /*
1385          * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize.  Clamp
1386          * to ensure our request won't be rejected.  Note that we can easily
1387          * exhaust address space before facing this outcome.  (This is presently
1388          * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
1389          * don't rely on that at this distance.)
1390          */
1391         if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
1392         {
1393                 newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
1394                 state->growmemtuples = false;   /* can't grow any more */
1395         }
1396
1397         /*
1398          * We need to be sure that we do not cause LACKMEM to become true, else
1399          * the space management algorithm will go nuts.  The code above should
1400          * never generate a dangerous request, but to be safe, check explicitly
1401          * that the array growth fits within availMem.  (We could still cause
1402          * LACKMEM if the memory chunk overhead associated with the memtuples
1403          * array were to increase.  That shouldn't happen because we chose the
1404          * initial array size large enough to ensure that palloc will be treating
1405          * both old and new arrays as separate chunks.  But we'll check LACKMEM
1406          * explicitly below just in case.)
1407          */
1408         if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1409                 goto noalloc;
1410
1411         /* OK, do it */
1412         FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1413         state->memtupsize = newmemtupsize;
1414         state->memtuples = (SortTuple *)
1415                 repalloc_huge(state->memtuples,
1416                                           state->memtupsize * sizeof(SortTuple));
1417         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1418         if (LACKMEM(state))
1419                 elog(ERROR, "unexpected out-of-memory situation in tuplesort");
1420         return true;
1421
1422 noalloc:
1423         /* If for any reason we didn't realloc, shut off future attempts */
1424         state->growmemtuples = false;
1425         return false;
1426 }
1427
1428 /*
1429  * Accept one tuple while collecting input data for sort.
1430  *
1431  * Note that the input data is always copied; the caller need not save it.
1432  */
1433 void
1434 tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot)
1435 {
1436         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1437         SortTuple       stup;
1438
1439         /*
1440          * Copy the given tuple into memory we control, and decrease availMem.
1441          * Then call the common code.
1442          */
1443         COPYTUP(state, &stup, (void *) slot);
1444
1445         puttuple_common(state, &stup);
1446
1447         MemoryContextSwitchTo(oldcontext);
1448 }
1449
1450 /*
1451  * Accept one tuple while collecting input data for sort.
1452  *
1453  * Note that the input data is always copied; the caller need not save it.
1454  */
1455 void
1456 tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup)
1457 {
1458         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1459         SortTuple       stup;
1460
1461         /*
1462          * Copy the given tuple into memory we control, and decrease availMem.
1463          * Then call the common code.
1464          */
1465         COPYTUP(state, &stup, (void *) tup);
1466
1467         puttuple_common(state, &stup);
1468
1469         MemoryContextSwitchTo(oldcontext);
1470 }
1471
1472 /*
1473  * Collect one index tuple while collecting input data for sort, building
1474  * it from caller-supplied values.
1475  */
1476 void
1477 tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
1478                                                           ItemPointer self, Datum *values,
1479                                                           bool *isnull)
1480 {
1481         MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
1482         SortTuple       stup;
1483         Datum           original;
1484         IndexTuple      tuple;
1485
1486         stup.tuple = index_form_tuple(RelationGetDescr(rel), values, isnull);
1487         tuple = ((IndexTuple) stup.tuple);
1488         tuple->t_tid = *self;
1489         USEMEM(state, GetMemoryChunkSpace(stup.tuple));
1490         /* set up first-column key value */
1491         original = index_getattr(tuple,
1492                                                          1,
1493                                                          RelationGetDescr(state->indexRel),
1494                                                          &stup.isnull1);
1495
1496         MemoryContextSwitchTo(state->sortcontext);
1497
1498         if (!state->sortKeys || !state->sortKeys->abbrev_converter || stup.isnull1)
1499         {
1500                 /*
1501                  * Store ordinary Datum representation, or NULL value.  If there is a
1502                  * converter it won't expect NULL values, and cost model is not
1503                  * required to account for NULL, so in that case we avoid calling
1504                  * converter and just set datum1 to zeroed representation (to be
1505                  * consistent, and to support cheap inequality tests for NULL
1506                  * abbreviated keys).
1507                  */
1508                 stup.datum1 = original;
1509         }
1510         else if (!consider_abort_common(state))
1511         {
1512                 /* Store abbreviated key representation */
1513                 stup.datum1 = state->sortKeys->abbrev_converter(original,
1514                                                                                                                 state->sortKeys);
1515         }
1516         else
1517         {
1518                 /* Abort abbreviation */
1519                 int                     i;
1520
1521                 stup.datum1 = original;
1522
1523                 /*
1524                  * Set state to be consistent with never trying abbreviation.
1525                  *
1526                  * Alter datum1 representation in already-copied tuples, so as to
1527                  * ensure a consistent representation (current tuple was just
1528                  * handled).  It does not matter if some dumped tuples are already
1529                  * sorted on tape, since serialized tuples lack abbreviated keys
1530                  * (TSS_BUILDRUNS state prevents control reaching here in any case).
1531                  */
1532                 for (i = 0; i < state->memtupcount; i++)
1533                 {
1534                         SortTuple  *mtup = &state->memtuples[i];
1535
1536                         tuple = mtup->tuple;
1537                         mtup->datum1 = index_getattr(tuple,
1538                                                                                  1,
1539                                                                                  RelationGetDescr(state->indexRel),
1540                                                                                  &mtup->isnull1);
1541                 }
1542         }
1543
1544         puttuple_common(state, &stup);
1545
1546         MemoryContextSwitchTo(oldcontext);
1547 }
1548
1549 /*
1550  * Accept one Datum while collecting input data for sort.
1551  *
1552  * If the Datum is pass-by-ref type, the value will be copied.
1553  */
1554 void
1555 tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
1556 {
1557         MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
1558         SortTuple       stup;
1559
1560         /*
1561          * Pass-by-value types or null values are just stored directly in
1562          * stup.datum1 (and stup.tuple is not used and set to NULL).
1563          *
1564          * Non-null pass-by-reference values need to be copied into memory we
1565          * control, and possibly abbreviated. The copied value is pointed to by
1566          * stup.tuple and is treated as the canonical copy (e.g. to return via
1567          * tuplesort_getdatum or when writing to tape); stup.datum1 gets the
1568          * abbreviated value if abbreviation is happening, otherwise it's
1569          * identical to stup.tuple.
1570          */
1571
1572         if (isNull || !state->tuples)
1573         {
1574                 /*
1575                  * Set datum1 to zeroed representation for NULLs (to be consistent,
1576                  * and to support cheap inequality tests for NULL abbreviated keys).
1577                  */
1578                 stup.datum1 = !isNull ? val : (Datum) 0;
1579                 stup.isnull1 = isNull;
1580                 stup.tuple = NULL;              /* no separate storage */
1581                 MemoryContextSwitchTo(state->sortcontext);
1582         }
1583         else
1584         {
1585                 Datum           original = datumCopy(val, false, state->datumTypeLen);
1586
1587                 stup.isnull1 = false;
1588                 stup.tuple = DatumGetPointer(original);
1589                 USEMEM(state, GetMemoryChunkSpace(stup.tuple));
1590                 MemoryContextSwitchTo(state->sortcontext);
1591
1592                 if (!state->sortKeys->abbrev_converter)
1593                 {
1594                         stup.datum1 = original;
1595                 }
1596                 else if (!consider_abort_common(state))
1597                 {
1598                         /* Store abbreviated key representation */
1599                         stup.datum1 = state->sortKeys->abbrev_converter(original,
1600                                                                                                                         state->sortKeys);
1601                 }
1602                 else
1603                 {
1604                         /* Abort abbreviation */
1605                         int                     i;
1606
1607                         stup.datum1 = original;
1608
1609                         /*
1610                          * Set state to be consistent with never trying abbreviation.
1611                          *
1612                          * Alter datum1 representation in already-copied tuples, so as to
1613                          * ensure a consistent representation (current tuple was just
1614                          * handled).  It does not matter if some dumped tuples are already
1615                          * sorted on tape, since serialized tuples lack abbreviated keys
1616                          * (TSS_BUILDRUNS state prevents control reaching here in any
1617                          * case).
1618                          */
1619                         for (i = 0; i < state->memtupcount; i++)
1620                         {
1621                                 SortTuple  *mtup = &state->memtuples[i];
1622
1623                                 mtup->datum1 = PointerGetDatum(mtup->tuple);
1624                         }
1625                 }
1626         }
1627
1628         puttuple_common(state, &stup);
1629
1630         MemoryContextSwitchTo(oldcontext);
1631 }
1632
1633 /*
1634  * Shared code for tuple and datum cases.
1635  */
1636 static void
1637 puttuple_common(Tuplesortstate *state, SortTuple *tuple)
1638 {
1639         Assert(!LEADER(state));
1640
1641         switch (state->status)
1642         {
1643                 case TSS_INITIAL:
1644
1645                         /*
1646                          * Save the tuple into the unsorted array.  First, grow the array
1647                          * as needed.  Note that we try to grow the array when there is
1648                          * still one free slot remaining --- if we fail, there'll still be
1649                          * room to store the incoming tuple, and then we'll switch to
1650                          * tape-based operation.
1651                          */
1652                         if (state->memtupcount >= state->memtupsize - 1)
1653                         {
1654                                 (void) grow_memtuples(state);
1655                                 Assert(state->memtupcount < state->memtupsize);
1656                         }
1657                         state->memtuples[state->memtupcount++] = *tuple;
1658
1659                         /*
1660                          * Check if it's time to switch over to a bounded heapsort. We do
1661                          * so if the input tuple count exceeds twice the desired tuple
1662                          * count (this is a heuristic for where heapsort becomes cheaper
1663                          * than a quicksort), or if we've just filled workMem and have
1664                          * enough tuples to meet the bound.
1665                          *
1666                          * Note that once we enter TSS_BOUNDED state we will always try to
1667                          * complete the sort that way.  In the worst case, if later input
1668                          * tuples are larger than earlier ones, this might cause us to
1669                          * exceed workMem significantly.
1670                          */
1671                         if (state->bounded &&
1672                                 (state->memtupcount > state->bound * 2 ||
1673                                  (state->memtupcount > state->bound && LACKMEM(state))))
1674                         {
1675 #ifdef TRACE_SORT
1676                                 if (trace_sort)
1677                                         elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1678                                                  state->memtupcount,
1679                                                  pg_rusage_show(&state->ru_start));
1680 #endif
1681                                 make_bounded_heap(state);
1682                                 return;
1683                         }
1684
1685                         /*
1686                          * Done if we still fit in available memory and have array slots.
1687                          */
1688                         if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1689                                 return;
1690
1691                         /*
1692                          * Nope; time to switch to tape-based operation.
1693                          */
1694                         inittapes(state, true);
1695
1696                         /*
1697                          * Dump all tuples.
1698                          */
1699                         dumptuples(state, false);
1700                         break;
1701
1702                 case TSS_BOUNDED:
1703
1704                         /*
1705                          * We don't want to grow the array here, so check whether the new
1706                          * tuple can be discarded before putting it in.  This should be a
1707                          * good speed optimization, too, since when there are many more
1708                          * input tuples than the bound, most input tuples can be discarded
1709                          * with just this one comparison.  Note that because we currently
1710                          * have the sort direction reversed, we must check for <= not >=.
1711                          */
1712                         if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1713                         {
1714                                 /* new tuple <= top of the heap, so we can discard it */
1715                                 free_sort_tuple(state, tuple);
1716                                 CHECK_FOR_INTERRUPTS();
1717                         }
1718                         else
1719                         {
1720                                 /* discard top of heap, replacing it with the new tuple */
1721                                 free_sort_tuple(state, &state->memtuples[0]);
1722                                 tuplesort_heap_replace_top(state, tuple);
1723                         }
1724                         break;
1725
1726                 case TSS_BUILDRUNS:
1727
1728                         /*
1729                          * Save the tuple into the unsorted array (there must be space)
1730                          */
1731                         state->memtuples[state->memtupcount++] = *tuple;
1732
1733                         /*
1734                          * If we are over the memory limit, dump all tuples.
1735                          */
1736                         dumptuples(state, false);
1737                         break;
1738
1739                 default:
1740                         elog(ERROR, "invalid tuplesort state");
1741                         break;
1742         }
1743 }
1744
1745 static bool
1746 consider_abort_common(Tuplesortstate *state)
1747 {
1748         Assert(state->sortKeys[0].abbrev_converter != NULL);
1749         Assert(state->sortKeys[0].abbrev_abort != NULL);
1750         Assert(state->sortKeys[0].abbrev_full_comparator != NULL);
1751
1752         /*
1753          * Check effectiveness of abbreviation optimization.  Consider aborting
1754          * when still within memory limit.
1755          */
1756         if (state->status == TSS_INITIAL &&
1757                 state->memtupcount >= state->abbrevNext)
1758         {
1759                 state->abbrevNext *= 2;
1760
1761                 /*
1762                  * Check opclass-supplied abbreviation abort routine.  It may indicate
1763                  * that abbreviation should not proceed.
1764                  */
1765                 if (!state->sortKeys->abbrev_abort(state->memtupcount,
1766                                                                                    state->sortKeys))
1767                         return false;
1768
1769                 /*
1770                  * Finally, restore authoritative comparator, and indicate that
1771                  * abbreviation is not in play by setting abbrev_converter to NULL
1772                  */
1773                 state->sortKeys[0].comparator = state->sortKeys[0].abbrev_full_comparator;
1774                 state->sortKeys[0].abbrev_converter = NULL;
1775                 /* Not strictly necessary, but be tidy */
1776                 state->sortKeys[0].abbrev_abort = NULL;
1777                 state->sortKeys[0].abbrev_full_comparator = NULL;
1778
1779                 /* Give up - expect original pass-by-value representation */
1780                 return true;
1781         }
1782
1783         return false;
1784 }
1785
1786 /*
1787  * All tuples have been provided; finish the sort.
1788  */
1789 void
1790 tuplesort_performsort(Tuplesortstate *state)
1791 {
1792         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1793
1794 #ifdef TRACE_SORT
1795         if (trace_sort)
1796                 elog(LOG, "performsort of worker %d starting: %s",
1797                          state->worker, pg_rusage_show(&state->ru_start));
1798 #endif
1799
1800         switch (state->status)
1801         {
1802                 case TSS_INITIAL:
1803
1804                         /*
1805                          * We were able to accumulate all the tuples within the allowed
1806                          * amount of memory, or leader to take over worker tapes
1807                          */
1808                         if (SERIAL(state))
1809                         {
1810                                 /* Just qsort 'em and we're done */
1811                                 tuplesort_sort_memtuples(state);
1812                                 state->status = TSS_SORTEDINMEM;
1813                         }
1814                         else if (WORKER(state))
1815                         {
1816                                 /*
1817                                  * Parallel workers must still dump out tuples to tape.  No
1818                                  * merge is required to produce single output run, though.
1819                                  */
1820                                 inittapes(state, false);
1821                                 dumptuples(state, true);
1822                                 worker_nomergeruns(state);
1823                                 state->status = TSS_SORTEDONTAPE;
1824                         }
1825                         else
1826                         {
1827                                 /*
1828                                  * Leader will take over worker tapes and merge worker runs.
1829                                  * Note that mergeruns sets the correct state->status.
1830                                  */
1831                                 leader_takeover_tapes(state);
1832                                 mergeruns(state);
1833                         }
1834                         state->current = 0;
1835                         state->eof_reached = false;
1836                         state->markpos_block = 0L;
1837                         state->markpos_offset = 0;
1838                         state->markpos_eof = false;
1839                         break;
1840
1841                 case TSS_BOUNDED:
1842
1843                         /*
1844                          * We were able to accumulate all the tuples required for output
1845                          * in memory, using a heap to eliminate excess tuples.  Now we
1846                          * have to transform the heap to a properly-sorted array.
1847                          */
1848                         sort_bounded_heap(state);
1849                         state->current = 0;
1850                         state->eof_reached = false;
1851                         state->markpos_offset = 0;
1852                         state->markpos_eof = false;
1853                         state->status = TSS_SORTEDINMEM;
1854                         break;
1855
1856                 case TSS_BUILDRUNS:
1857
1858                         /*
1859                          * Finish tape-based sort.  First, flush all tuples remaining in
1860                          * memory out to tape; then merge until we have a single remaining
1861                          * run (or, if !randomAccess and !WORKER(), one run per tape).
1862                          * Note that mergeruns sets the correct state->status.
1863                          */
1864                         dumptuples(state, true);
1865                         mergeruns(state);
1866                         state->eof_reached = false;
1867                         state->markpos_block = 0L;
1868                         state->markpos_offset = 0;
1869                         state->markpos_eof = false;
1870                         break;
1871
1872                 default:
1873                         elog(ERROR, "invalid tuplesort state");
1874                         break;
1875         }
1876
1877 #ifdef TRACE_SORT
1878         if (trace_sort)
1879         {
1880                 if (state->status == TSS_FINALMERGE)
1881                         elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
1882                                  state->worker, state->activeTapes,
1883                                  pg_rusage_show(&state->ru_start));
1884                 else
1885                         elog(LOG, "performsort of worker %d done: %s",
1886                                  state->worker, pg_rusage_show(&state->ru_start));
1887         }
1888 #endif
1889
1890         MemoryContextSwitchTo(oldcontext);
1891 }
1892
1893 /*
1894  * Internal routine to fetch the next tuple in either forward or back
1895  * direction into *stup.  Returns false if no more tuples.
1896  * Returned tuple belongs to tuplesort memory context, and must not be freed
1897  * by caller.  Note that fetched tuple is stored in memory that may be
1898  * recycled by any future fetch.
1899  */
1900 static bool
1901 tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
1902                                                   SortTuple *stup)
1903 {
1904         unsigned int tuplen;
1905         size_t          nmoved;
1906
1907         Assert(!WORKER(state));
1908
1909         switch (state->status)
1910         {
1911                 case TSS_SORTEDINMEM:
1912                         Assert(forward || state->randomAccess);
1913                         Assert(!state->slabAllocatorUsed);
1914                         if (forward)
1915                         {
1916                                 if (state->current < state->memtupcount)
1917                                 {
1918                                         *stup = state->memtuples[state->current++];
1919                                         return true;
1920                                 }
1921                                 state->eof_reached = true;
1922
1923                                 /*
1924                                  * Complain if caller tries to retrieve more tuples than
1925                                  * originally asked for in a bounded sort.  This is because
1926                                  * returning EOF here might be the wrong thing.
1927                                  */
1928                                 if (state->bounded && state->current >= state->bound)
1929                                         elog(ERROR, "retrieved too many tuples in a bounded sort");
1930
1931                                 return false;
1932                         }
1933                         else
1934                         {
1935                                 if (state->current <= 0)
1936                                         return false;
1937
1938                                 /*
1939                                  * if all tuples are fetched already then we return last
1940                                  * tuple, else - tuple before last returned.
1941                                  */
1942                                 if (state->eof_reached)
1943                                         state->eof_reached = false;
1944                                 else
1945                                 {
1946                                         state->current--;       /* last returned tuple */
1947                                         if (state->current <= 0)
1948                                                 return false;
1949                                 }
1950                                 *stup = state->memtuples[state->current - 1];
1951                                 return true;
1952                         }
1953                         break;
1954
1955                 case TSS_SORTEDONTAPE:
1956                         Assert(forward || state->randomAccess);
1957                         Assert(state->slabAllocatorUsed);
1958
1959                         /*
1960                          * The slot that held the tuple that we returned in previous
1961                          * gettuple call can now be reused.
1962                          */
1963                         if (state->lastReturnedTuple)
1964                         {
1965                                 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
1966                                 state->lastReturnedTuple = NULL;
1967                         }
1968
1969                         if (forward)
1970                         {
1971                                 if (state->eof_reached)
1972                                         return false;
1973
1974                                 if ((tuplen = getlen(state, state->result_tape, true)) != 0)
1975                                 {
1976                                         READTUP(state, stup, state->result_tape, tuplen);
1977
1978                                         /*
1979                                          * Remember the tuple we return, so that we can recycle
1980                                          * its memory on next call.  (This can be NULL, in the
1981                                          * !state->tuples case).
1982                                          */
1983                                         state->lastReturnedTuple = stup->tuple;
1984
1985                                         return true;
1986                                 }
1987                                 else
1988                                 {
1989                                         state->eof_reached = true;
1990                                         return false;
1991                                 }
1992                         }
1993
1994                         /*
1995                          * Backward.
1996                          *
1997                          * if all tuples are fetched already then we return last tuple,
1998                          * else - tuple before last returned.
1999                          */
2000                         if (state->eof_reached)
2001                         {
2002                                 /*
2003                                  * Seek position is pointing just past the zero tuplen at the
2004                                  * end of file; back up to fetch last tuple's ending length
2005                                  * word.  If seek fails we must have a completely empty file.
2006                                  */
2007                                 nmoved = LogicalTapeBackspace(state->tapeset,
2008                                                                                           state->result_tape,
2009                                                                                           2 * sizeof(unsigned int));
2010                                 if (nmoved == 0)
2011                                         return false;
2012                                 else if (nmoved != 2 * sizeof(unsigned int))
2013                                         elog(ERROR, "unexpected tape position");
2014                                 state->eof_reached = false;
2015                         }
2016                         else
2017                         {
2018                                 /*
2019                                  * Back up and fetch previously-returned tuple's ending length
2020                                  * word.  If seek fails, assume we are at start of file.
2021                                  */
2022                                 nmoved = LogicalTapeBackspace(state->tapeset,
2023                                                                                           state->result_tape,
2024                                                                                           sizeof(unsigned int));
2025                                 if (nmoved == 0)
2026                                         return false;
2027                                 else if (nmoved != sizeof(unsigned int))
2028                                         elog(ERROR, "unexpected tape position");
2029                                 tuplen = getlen(state, state->result_tape, false);
2030
2031                                 /*
2032                                  * Back up to get ending length word of tuple before it.
2033                                  */
2034                                 nmoved = LogicalTapeBackspace(state->tapeset,
2035                                                                                           state->result_tape,
2036                                                                                           tuplen + 2 * sizeof(unsigned int));
2037                                 if (nmoved == tuplen + sizeof(unsigned int))
2038                                 {
2039                                         /*
2040                                          * We backed up over the previous tuple, but there was no
2041                                          * ending length word before it.  That means that the prev
2042                                          * tuple is the first tuple in the file.  It is now the
2043                                          * next to read in forward direction (not obviously right,
2044                                          * but that is what in-memory case does).
2045                                          */
2046                                         return false;
2047                                 }
2048                                 else if (nmoved != tuplen + 2 * sizeof(unsigned int))
2049                                         elog(ERROR, "bogus tuple length in backward scan");
2050                         }
2051
2052                         tuplen = getlen(state, state->result_tape, false);
2053
2054                         /*
2055                          * Now we have the length of the prior tuple, back up and read it.
2056                          * Note: READTUP expects we are positioned after the initial
2057                          * length word of the tuple, so back up to that point.
2058                          */
2059                         nmoved = LogicalTapeBackspace(state->tapeset,
2060                                                                                   state->result_tape,
2061                                                                                   tuplen);
2062                         if (nmoved != tuplen)
2063                                 elog(ERROR, "bogus tuple length in backward scan");
2064                         READTUP(state, stup, state->result_tape, tuplen);
2065
2066                         /*
2067                          * Remember the tuple we return, so that we can recycle its memory
2068                          * on next call. (This can be NULL, in the Datum case).
2069                          */
2070                         state->lastReturnedTuple = stup->tuple;
2071
2072                         return true;
2073
2074                 case TSS_FINALMERGE:
2075                         Assert(forward);
2076                         /* We are managing memory ourselves, with the slab allocator. */
2077                         Assert(state->slabAllocatorUsed);
2078
2079                         /*
2080                          * The slab slot holding the tuple that we returned in previous
2081                          * gettuple call can now be reused.
2082                          */
2083                         if (state->lastReturnedTuple)
2084                         {
2085                                 RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
2086                                 state->lastReturnedTuple = NULL;
2087                         }
2088
2089                         /*
2090                          * This code should match the inner loop of mergeonerun().
2091                          */
2092                         if (state->memtupcount > 0)
2093                         {
2094                                 int                     srcTape = state->memtuples[0].tupindex;
2095                                 SortTuple       newtup;
2096
2097                                 *stup = state->memtuples[0];
2098
2099                                 /*
2100                                  * Remember the tuple we return, so that we can recycle its
2101                                  * memory on next call. (This can be NULL, in the Datum case).
2102                                  */
2103                                 state->lastReturnedTuple = stup->tuple;
2104
2105                                 /*
2106                                  * Pull next tuple from tape, and replace the returned tuple
2107                                  * at top of the heap with it.
2108                                  */
2109                                 if (!mergereadnext(state, srcTape, &newtup))
2110                                 {
2111                                         /*
2112                                          * If no more data, we've reached end of run on this tape.
2113                                          * Remove the top node from the heap.
2114                                          */
2115                                         tuplesort_heap_delete_top(state);
2116
2117                                         /*
2118                                          * Rewind to free the read buffer.  It'd go away at the
2119                                          * end of the sort anyway, but better to release the
2120                                          * memory early.
2121                                          */
2122                                         LogicalTapeRewindForWrite(state->tapeset, srcTape);
2123                                         return true;
2124                                 }
2125                                 newtup.tupindex = srcTape;
2126                                 tuplesort_heap_replace_top(state, &newtup);
2127                                 return true;
2128                         }
2129                         return false;
2130
2131                 default:
2132                         elog(ERROR, "invalid tuplesort state");
2133                         return false;           /* keep compiler quiet */
2134         }
2135 }
2136
2137 /*
2138  * Fetch the next tuple in either forward or back direction.
2139  * If successful, put tuple in slot and return true; else, clear the slot
2140  * and return false.
2141  *
2142  * Caller may optionally be passed back abbreviated value (on true return
2143  * value) when abbreviation was used, which can be used to cheaply avoid
2144  * equality checks that might otherwise be required.  Caller can safely make a
2145  * determination of "non-equal tuple" based on simple binary inequality.  A
2146  * NULL value in leading attribute will set abbreviated value to zeroed
2147  * representation, which caller may rely on in abbreviated inequality check.
2148  *
2149  * If copy is true, the slot receives a tuple that's been copied into the
2150  * caller's memory context, so that it will stay valid regardless of future
2151  * manipulations of the tuplesort's state (up to and including deleting the
2152  * tuplesort).  If copy is false, the slot will just receive a pointer to a
2153  * tuple held within the tuplesort, which is more efficient, but only safe for
2154  * callers that are prepared to have any subsequent manipulation of the
2155  * tuplesort's state invalidate slot contents.
2156  */
2157 bool
2158 tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy,
2159                                            TupleTableSlot *slot, Datum *abbrev)
2160 {
2161         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2162         SortTuple       stup;
2163
2164         if (!tuplesort_gettuple_common(state, forward, &stup))
2165                 stup.tuple = NULL;
2166
2167         MemoryContextSwitchTo(oldcontext);
2168
2169         if (stup.tuple)
2170         {
2171                 /* Record abbreviated key for caller */
2172                 if (state->sortKeys->abbrev_converter && abbrev)
2173                         *abbrev = stup.datum1;
2174
2175                 if (copy)
2176                         stup.tuple = heap_copy_minimal_tuple((MinimalTuple) stup.tuple);
2177
2178                 ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, copy);
2179                 return true;
2180         }
2181         else
2182         {
2183                 ExecClearTuple(slot);
2184                 return false;
2185         }
2186 }
2187
2188 /*
2189  * Fetch the next tuple in either forward or back direction.
2190  * Returns NULL if no more tuples.  Returned tuple belongs to tuplesort memory
2191  * context, and must not be freed by caller.  Caller may not rely on tuple
2192  * remaining valid after any further manipulation of tuplesort.
2193  */
2194 HeapTuple
2195 tuplesort_getheaptuple(Tuplesortstate *state, bool forward)
2196 {
2197         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2198         SortTuple       stup;
2199
2200         if (!tuplesort_gettuple_common(state, forward, &stup))
2201                 stup.tuple = NULL;
2202
2203         MemoryContextSwitchTo(oldcontext);
2204
2205         return stup.tuple;
2206 }
2207
2208 /*
2209  * Fetch the next index tuple in either forward or back direction.
2210  * Returns NULL if no more tuples.  Returned tuple belongs to tuplesort memory
2211  * context, and must not be freed by caller.  Caller may not rely on tuple
2212  * remaining valid after any further manipulation of tuplesort.
2213  */
2214 IndexTuple
2215 tuplesort_getindextuple(Tuplesortstate *state, bool forward)
2216 {
2217         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2218         SortTuple       stup;
2219
2220         if (!tuplesort_gettuple_common(state, forward, &stup))
2221                 stup.tuple = NULL;
2222
2223         MemoryContextSwitchTo(oldcontext);
2224
2225         return (IndexTuple) stup.tuple;
2226 }
2227
2228 /*
2229  * Fetch the next Datum in either forward or back direction.
2230  * Returns false if no more datums.
2231  *
2232  * If the Datum is pass-by-ref type, the returned value is freshly palloc'd
2233  * in caller's context, and is now owned by the caller (this differs from
2234  * similar routines for other types of tuplesorts).
2235  *
2236  * Caller may optionally be passed back abbreviated value (on true return
2237  * value) when abbreviation was used, which can be used to cheaply avoid
2238  * equality checks that might otherwise be required.  Caller can safely make a
2239  * determination of "non-equal tuple" based on simple binary inequality.  A
2240  * NULL value will have a zeroed abbreviated value representation, which caller
2241  * may rely on in abbreviated inequality check.
2242  */
2243 bool
2244 tuplesort_getdatum(Tuplesortstate *state, bool forward,
2245                                    Datum *val, bool *isNull, Datum *abbrev)
2246 {
2247         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2248         SortTuple       stup;
2249
2250         if (!tuplesort_gettuple_common(state, forward, &stup))
2251         {
2252                 MemoryContextSwitchTo(oldcontext);
2253                 return false;
2254         }
2255
2256         /* Ensure we copy into caller's memory context */
2257         MemoryContextSwitchTo(oldcontext);
2258
2259         /* Record abbreviated key for caller */
2260         if (state->sortKeys->abbrev_converter && abbrev)
2261                 *abbrev = stup.datum1;
2262
2263         if (stup.isnull1 || !state->tuples)
2264         {
2265                 *val = stup.datum1;
2266                 *isNull = stup.isnull1;
2267         }
2268         else
2269         {
2270                 /* use stup.tuple because stup.datum1 may be an abbreviation */
2271                 *val = datumCopy(PointerGetDatum(stup.tuple), false, state->datumTypeLen);
2272                 *isNull = false;
2273         }
2274
2275         return true;
2276 }
2277
2278 /*
2279  * Advance over N tuples in either forward or back direction,
2280  * without returning any data.  N==0 is a no-op.
2281  * Returns true if successful, false if ran out of tuples.
2282  */
2283 bool
2284 tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
2285 {
2286         MemoryContext oldcontext;
2287
2288         /*
2289          * We don't actually support backwards skip yet, because no callers need
2290          * it.  The API is designed to allow for that later, though.
2291          */
2292         Assert(forward);
2293         Assert(ntuples >= 0);
2294         Assert(!WORKER(state));
2295
2296         switch (state->status)
2297         {
2298                 case TSS_SORTEDINMEM:
2299                         if (state->memtupcount - state->current >= ntuples)
2300                         {
2301                                 state->current += ntuples;
2302                                 return true;
2303                         }
2304                         state->current = state->memtupcount;
2305                         state->eof_reached = true;
2306
2307                         /*
2308                          * Complain if caller tries to retrieve more tuples than
2309                          * originally asked for in a bounded sort.  This is because
2310                          * returning EOF here might be the wrong thing.
2311                          */
2312                         if (state->bounded && state->current >= state->bound)
2313                                 elog(ERROR, "retrieved too many tuples in a bounded sort");
2314
2315                         return false;
2316
2317                 case TSS_SORTEDONTAPE:
2318                 case TSS_FINALMERGE:
2319
2320                         /*
2321                          * We could probably optimize these cases better, but for now it's
2322                          * not worth the trouble.
2323                          */
2324                         oldcontext = MemoryContextSwitchTo(state->sortcontext);
2325                         while (ntuples-- > 0)
2326                         {
2327                                 SortTuple       stup;
2328
2329                                 if (!tuplesort_gettuple_common(state, forward, &stup))
2330                                 {
2331                                         MemoryContextSwitchTo(oldcontext);
2332                                         return false;
2333                                 }
2334                                 CHECK_FOR_INTERRUPTS();
2335                         }
2336                         MemoryContextSwitchTo(oldcontext);
2337                         return true;
2338
2339                 default:
2340                         elog(ERROR, "invalid tuplesort state");
2341                         return false;           /* keep compiler quiet */
2342         }
2343 }
2344
2345 /*
2346  * tuplesort_merge_order - report merge order we'll use for given memory
2347  * (note: "merge order" just means the number of input tapes in the merge).
2348  *
2349  * This is exported for use by the planner.  allowedMem is in bytes.
2350  */
2351 int
2352 tuplesort_merge_order(int64 allowedMem)
2353 {
2354         int                     mOrder;
2355
2356         /*
2357          * We need one tape for each merge input, plus another one for the output,
2358          * and each of these tapes needs buffer space.  In addition we want
2359          * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't
2360          * count).
2361          *
2362          * Note: you might be thinking we need to account for the memtuples[]
2363          * array in this calculation, but we effectively treat that as part of the
2364          * MERGE_BUFFER_SIZE workspace.
2365          */
2366         mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
2367                 (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
2368
2369         /*
2370          * Even in minimum memory, use at least a MINORDER merge.  On the other
2371          * hand, even when we have lots of memory, do not use more than a MAXORDER
2372          * merge.  Tapes are pretty cheap, but they're not entirely free.  Each
2373          * additional tape reduces the amount of memory available to build runs,
2374          * which in turn can cause the same sort to need more runs, which makes
2375          * merging slower even if it can still be done in a single pass.  Also,
2376          * high order merges are quite slow due to CPU cache effects; it can be
2377          * faster to pay the I/O cost of a polyphase merge than to perform a
2378          * single merge pass across many hundreds of tapes.
2379          */
2380         mOrder = Max(mOrder, MINORDER);
2381         mOrder = Min(mOrder, MAXORDER);
2382
2383         return mOrder;
2384 }
2385
2386 /*
2387  * inittapes - initialize for tape sorting.
2388  *
2389  * This is called only if we have found we won't sort in memory.
2390  */
2391 static void
2392 inittapes(Tuplesortstate *state, bool mergeruns)
2393 {
2394         int                     maxTapes,
2395                                 j;
2396
2397         Assert(!LEADER(state));
2398
2399         if (mergeruns)
2400         {
2401                 /* Compute number of tapes to use: merge order plus 1 */
2402                 maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
2403         }
2404         else
2405         {
2406                 /* Workers can sometimes produce single run, output without merge */
2407                 Assert(WORKER(state));
2408                 maxTapes = MINORDER + 1;
2409         }
2410
2411 #ifdef TRACE_SORT
2412         if (trace_sort)
2413                 elog(LOG, "worker %d switching to external sort with %d tapes: %s",
2414                          state->worker, maxTapes, pg_rusage_show(&state->ru_start));
2415 #endif
2416
2417         /* Create the tape set and allocate the per-tape data arrays */
2418         inittapestate(state, maxTapes);
2419         state->tapeset =
2420                 LogicalTapeSetCreate(maxTapes, NULL,
2421                                                          state->shared ? &state->shared->fileset : NULL,
2422                                                          state->worker);
2423
2424         state->currentRun = 0;
2425
2426         /*
2427          * Initialize variables of Algorithm D (step D1).
2428          */
2429         for (j = 0; j < maxTapes; j++)
2430         {
2431                 state->tp_fib[j] = 1;
2432                 state->tp_runs[j] = 0;
2433                 state->tp_dummy[j] = 1;
2434                 state->tp_tapenum[j] = j;
2435         }
2436         state->tp_fib[state->tapeRange] = 0;
2437         state->tp_dummy[state->tapeRange] = 0;
2438
2439         state->Level = 1;
2440         state->destTape = 0;
2441
2442         state->status = TSS_BUILDRUNS;
2443 }
2444
2445 /*
2446  * inittapestate - initialize generic tape management state
2447  */
2448 static void
2449 inittapestate(Tuplesortstate *state, int maxTapes)
2450 {
2451         int64           tapeSpace;
2452
2453         /*
2454          * Decrease availMem to reflect the space needed for tape buffers; but
2455          * don't decrease it to the point that we have no room for tuples. (That
2456          * case is only likely to occur if sorting pass-by-value Datums; in all
2457          * other scenarios the memtuples[] array is unlikely to occupy more than
2458          * half of allowedMem.  In the pass-by-value case it's not important to
2459          * account for tuple space, so we don't care if LACKMEM becomes
2460          * inaccurate.)
2461          */
2462         tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
2463
2464         if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
2465                 USEMEM(state, tapeSpace);
2466
2467         /*
2468          * Make sure that the temp file(s) underlying the tape set are created in
2469          * suitable temp tablespaces.  For parallel sorts, this should have been
2470          * called already, but it doesn't matter if it is called a second time.
2471          */
2472         PrepareTempTablespaces();
2473
2474         state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
2475         state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
2476         state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
2477         state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
2478         state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
2479
2480         /* Record # of tapes allocated (for duration of sort) */
2481         state->maxTapes = maxTapes;
2482         /* Record maximum # of tapes usable as inputs when merging */
2483         state->tapeRange = maxTapes - 1;
2484 }
2485
2486 /*
2487  * selectnewtape -- select new tape for new initial run.
2488  *
2489  * This is called after finishing a run when we know another run
2490  * must be started.  This implements steps D3, D4 of Algorithm D.
2491  */
2492 static void
2493 selectnewtape(Tuplesortstate *state)
2494 {
2495         int                     j;
2496         int                     a;
2497
2498         /* Step D3: advance j (destTape) */
2499         if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
2500         {
2501                 state->destTape++;
2502                 return;
2503         }
2504         if (state->tp_dummy[state->destTape] != 0)
2505         {
2506                 state->destTape = 0;
2507                 return;
2508         }
2509
2510         /* Step D4: increase level */
2511         state->Level++;
2512         a = state->tp_fib[0];
2513         for (j = 0; j < state->tapeRange; j++)
2514         {
2515                 state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
2516                 state->tp_fib[j] = a + state->tp_fib[j + 1];
2517         }
2518         state->destTape = 0;
2519 }
2520
2521 /*
2522  * Initialize the slab allocation arena, for the given number of slots.
2523  */
2524 static void
2525 init_slab_allocator(Tuplesortstate *state, int numSlots)
2526 {
2527         if (numSlots > 0)
2528         {
2529                 char       *p;
2530                 int                     i;
2531
2532                 state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
2533                 state->slabMemoryEnd = state->slabMemoryBegin +
2534                         numSlots * SLAB_SLOT_SIZE;
2535                 state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
2536                 USEMEM(state, numSlots * SLAB_SLOT_SIZE);
2537
2538                 p = state->slabMemoryBegin;
2539                 for (i = 0; i < numSlots - 1; i++)
2540                 {
2541                         ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
2542                         p += SLAB_SLOT_SIZE;
2543                 }
2544                 ((SlabSlot *) p)->nextfree = NULL;
2545         }
2546         else
2547         {
2548                 state->slabMemoryBegin = state->slabMemoryEnd = NULL;
2549                 state->slabFreeHead = NULL;
2550         }
2551         state->slabAllocatorUsed = true;
2552 }
2553
2554 /*
2555  * mergeruns -- merge all the completed initial runs.
2556  *
2557  * This implements steps D5, D6 of Algorithm D.  All input data has
2558  * already been written to initial runs on tape (see dumptuples).
2559  */
2560 static void
2561 mergeruns(Tuplesortstate *state)
2562 {
2563         int                     tapenum,
2564                                 svTape,
2565                                 svRuns,
2566                                 svDummy;
2567         int                     numTapes;
2568         int                     numInputTapes;
2569
2570         Assert(state->status == TSS_BUILDRUNS);
2571         Assert(state->memtupcount == 0);
2572
2573         if (state->sortKeys != NULL && state->sortKeys->abbrev_converter != NULL)
2574         {
2575                 /*
2576                  * If there are multiple runs to be merged, when we go to read back
2577                  * tuples from disk, abbreviated keys will not have been stored, and
2578                  * we don't care to regenerate them.  Disable abbreviation from this
2579                  * point on.
2580                  */
2581                 state->sortKeys->abbrev_converter = NULL;
2582                 state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
2583
2584                 /* Not strictly necessary, but be tidy */
2585                 state->sortKeys->abbrev_abort = NULL;
2586                 state->sortKeys->abbrev_full_comparator = NULL;
2587         }
2588
2589         /*
2590          * Reset tuple memory.  We've freed all the tuples that we previously
2591          * allocated.  We will use the slab allocator from now on.
2592          */
2593         MemoryContextDelete(state->tuplecontext);
2594         state->tuplecontext = NULL;
2595
2596         /*
2597          * We no longer need a large memtuples array.  (We will allocate a smaller
2598          * one for the heap later.)
2599          */
2600         FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
2601         pfree(state->memtuples);
2602         state->memtuples = NULL;
2603
2604         /*
2605          * If we had fewer runs than tapes, refund the memory that we imagined we
2606          * would need for the tape buffers of the unused tapes.
2607          *
2608          * numTapes and numInputTapes reflect the actual number of tapes we will
2609          * use.  Note that the output tape's tape number is maxTapes - 1, so the
2610          * tape numbers of the used tapes are not consecutive, and you cannot just
2611          * loop from 0 to numTapes to visit all used tapes!
2612          */
2613         if (state->Level == 1)
2614         {
2615                 numInputTapes = state->currentRun;
2616                 numTapes = numInputTapes + 1;
2617                 FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
2618         }
2619         else
2620         {
2621                 numInputTapes = state->tapeRange;
2622                 numTapes = state->maxTapes;
2623         }
2624
2625         /*
2626          * Initialize the slab allocator.  We need one slab slot per input tape,
2627          * for the tuples in the heap, plus one to hold the tuple last returned
2628          * from tuplesort_gettuple.  (If we're sorting pass-by-val Datums,
2629          * however, we don't need to do allocate anything.)
2630          *
2631          * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
2632          * to track memory usage of individual tuples.
2633          */
2634         if (state->tuples)
2635                 init_slab_allocator(state, numInputTapes + 1);
2636         else
2637                 init_slab_allocator(state, 0);
2638
2639         /*
2640          * Allocate a new 'memtuples' array, for the heap.  It will hold one tuple
2641          * from each input tape.
2642          */
2643         state->memtupsize = numInputTapes;
2644         state->memtuples = (SortTuple *) palloc(numInputTapes * sizeof(SortTuple));
2645         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
2646
2647         /*
2648          * Use all the remaining memory we have available for read buffers among
2649          * the input tapes.
2650          *
2651          * We don't try to "rebalance" the memory among tapes, when we start a new
2652          * merge phase, even if some tapes are inactive in the new phase.  That
2653          * would be hard, because logtape.c doesn't know where one run ends and
2654          * another begins.  When a new merge phase begins, and a tape doesn't
2655          * participate in it, its buffer nevertheless already contains tuples from
2656          * the next run on same tape, so we cannot release the buffer.  That's OK
2657          * in practice, merge performance isn't that sensitive to the amount of
2658          * buffers used, and most merge phases use all or almost all tapes,
2659          * anyway.
2660          */
2661 #ifdef TRACE_SORT
2662         if (trace_sort)
2663                 elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
2664                          state->worker, state->availMem / 1024, numInputTapes);
2665 #endif
2666
2667         state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
2668         USEMEM(state, state->read_buffer_size * numInputTapes);
2669
2670         /* End of step D2: rewind all output tapes to prepare for merging */
2671         for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2672                 LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
2673
2674         for (;;)
2675         {
2676                 /*
2677                  * At this point we know that tape[T] is empty.  If there's just one
2678                  * (real or dummy) run left on each input tape, then only one merge
2679                  * pass remains.  If we don't have to produce a materialized sorted
2680                  * tape, we can stop at this point and do the final merge on-the-fly.
2681                  */
2682                 if (!state->randomAccess && !WORKER(state))
2683                 {
2684                         bool            allOneRun = true;
2685
2686                         Assert(state->tp_runs[state->tapeRange] == 0);
2687                         for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2688                         {
2689                                 if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
2690                                 {
2691                                         allOneRun = false;
2692                                         break;
2693                                 }
2694                         }
2695                         if (allOneRun)
2696                         {
2697                                 /* Tell logtape.c we won't be writing anymore */
2698                                 LogicalTapeSetForgetFreeSpace(state->tapeset);
2699                                 /* Initialize for the final merge pass */
2700                                 beginmerge(state);
2701                                 state->status = TSS_FINALMERGE;
2702                                 return;
2703                         }
2704                 }
2705
2706                 /* Step D5: merge runs onto tape[T] until tape[P] is empty */
2707                 while (state->tp_runs[state->tapeRange - 1] ||
2708                            state->tp_dummy[state->tapeRange - 1])
2709                 {
2710                         bool            allDummy = true;
2711
2712                         for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2713                         {
2714                                 if (state->tp_dummy[tapenum] == 0)
2715                                 {
2716                                         allDummy = false;
2717                                         break;
2718                                 }
2719                         }
2720
2721                         if (allDummy)
2722                         {
2723                                 state->tp_dummy[state->tapeRange]++;
2724                                 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2725                                         state->tp_dummy[tapenum]--;
2726                         }
2727                         else
2728                                 mergeonerun(state);
2729                 }
2730
2731                 /* Step D6: decrease level */
2732                 if (--state->Level == 0)
2733                         break;
2734                 /* rewind output tape T to use as new input */
2735                 LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
2736                                                                  state->read_buffer_size);
2737                 /* rewind used-up input tape P, and prepare it for write pass */
2738                 LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
2739                 state->tp_runs[state->tapeRange - 1] = 0;
2740
2741                 /*
2742                  * reassign tape units per step D6; note we no longer care about A[]
2743                  */
2744                 svTape = state->tp_tapenum[state->tapeRange];
2745                 svDummy = state->tp_dummy[state->tapeRange];
2746                 svRuns = state->tp_runs[state->tapeRange];
2747                 for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
2748                 {
2749                         state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
2750                         state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
2751                         state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
2752                 }
2753                 state->tp_tapenum[0] = svTape;
2754                 state->tp_dummy[0] = svDummy;
2755                 state->tp_runs[0] = svRuns;
2756         }
2757
2758         /*
2759          * Done.  Knuth says that the result is on TAPE[1], but since we exited
2760          * the loop without performing the last iteration of step D6, we have not
2761          * rearranged the tape unit assignment, and therefore the result is on
2762          * TAPE[T].  We need to do it this way so that we can freeze the final
2763          * output tape while rewinding it.  The last iteration of step D6 would be
2764          * a waste of cycles anyway...
2765          */
2766         state->result_tape = state->tp_tapenum[state->tapeRange];
2767         if (!WORKER(state))
2768                 LogicalTapeFreeze(state->tapeset, state->result_tape, NULL);
2769         else
2770                 worker_freeze_result_tape(state);
2771         state->status = TSS_SORTEDONTAPE;
2772
2773         /* Release the read buffers of all the other tapes, by rewinding them. */
2774         for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
2775         {
2776                 if (tapenum != state->result_tape)
2777                         LogicalTapeRewindForWrite(state->tapeset, tapenum);
2778         }
2779 }
2780
2781 /*
2782  * Merge one run from each input tape, except ones with dummy runs.
2783  *
2784  * This is the inner loop of Algorithm D step D5.  We know that the
2785  * output tape is TAPE[T].
2786  */
2787 static void
2788 mergeonerun(Tuplesortstate *state)
2789 {
2790         int                     destTape = state->tp_tapenum[state->tapeRange];
2791         int                     srcTape;
2792
2793         /*
2794          * Start the merge by loading one tuple from each active source tape into
2795          * the heap.  We can also decrease the input run/dummy run counts.
2796          */
2797         beginmerge(state);
2798
2799         /*
2800          * Execute merge by repeatedly extracting lowest tuple in heap, writing it
2801          * out, and replacing it with next tuple from same tape (if there is
2802          * another one).
2803          */
2804         while (state->memtupcount > 0)
2805         {
2806                 SortTuple       stup;
2807
2808                 /* write the tuple to destTape */
2809                 srcTape = state->memtuples[0].tupindex;
2810                 WRITETUP(state, destTape, &state->memtuples[0]);
2811
2812                 /* recycle the slot of the tuple we just wrote out, for the next read */
2813                 if (state->memtuples[0].tuple)
2814                         RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
2815
2816                 /*
2817                  * pull next tuple from the tape, and replace the written-out tuple in
2818                  * the heap with it.
2819                  */
2820                 if (mergereadnext(state, srcTape, &stup))
2821                 {
2822                         stup.tupindex = srcTape;
2823                         tuplesort_heap_replace_top(state, &stup);
2824
2825                 }
2826                 else
2827                         tuplesort_heap_delete_top(state);
2828         }
2829
2830         /*
2831          * When the heap empties, we're done.  Write an end-of-run marker on the
2832          * output tape, and increment its count of real runs.
2833          */
2834         markrunend(state, destTape);
2835         state->tp_runs[state->tapeRange]++;
2836
2837 #ifdef TRACE_SORT
2838         if (trace_sort)
2839                 elog(LOG, "worker %d finished %d-way merge step: %s", state->worker,
2840                          state->activeTapes, pg_rusage_show(&state->ru_start));
2841 #endif
2842 }
2843
2844 /*
2845  * beginmerge - initialize for a merge pass
2846  *
2847  * We decrease the counts of real and dummy runs for each tape, and mark
2848  * which tapes contain active input runs in mergeactive[].  Then, fill the
2849  * merge heap with the first tuple from each active tape.
2850  */
2851 static void
2852 beginmerge(Tuplesortstate *state)
2853 {
2854         int                     activeTapes;
2855         int                     tapenum;
2856         int                     srcTape;
2857
2858         /* Heap should be empty here */
2859         Assert(state->memtupcount == 0);
2860
2861         /* Adjust run counts and mark the active tapes */
2862         memset(state->mergeactive, 0,
2863                    state->maxTapes * sizeof(*state->mergeactive));
2864         activeTapes = 0;
2865         for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2866         {
2867                 if (state->tp_dummy[tapenum] > 0)
2868                         state->tp_dummy[tapenum]--;
2869                 else
2870                 {
2871                         Assert(state->tp_runs[tapenum] > 0);
2872                         state->tp_runs[tapenum]--;
2873                         srcTape = state->tp_tapenum[tapenum];
2874                         state->mergeactive[srcTape] = true;
2875                         activeTapes++;
2876                 }
2877         }
2878         Assert(activeTapes > 0);
2879         state->activeTapes = activeTapes;
2880
2881         /* Load the merge heap with the first tuple from each input tape */
2882         for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
2883         {
2884                 SortTuple       tup;
2885
2886                 if (mergereadnext(state, srcTape, &tup))
2887                 {
2888                         tup.tupindex = srcTape;
2889                         tuplesort_heap_insert(state, &tup);
2890                 }
2891         }
2892 }
2893
2894 /*
2895  * mergereadnext - read next tuple from one merge input tape
2896  *
2897  * Returns false on EOF.
2898  */
2899 static bool
2900 mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
2901 {
2902         unsigned int tuplen;
2903
2904         if (!state->mergeactive[srcTape])
2905                 return false;                   /* tape's run is already exhausted */
2906
2907         /* read next tuple, if any */
2908         if ((tuplen = getlen(state, srcTape, true)) == 0)
2909         {
2910                 state->mergeactive[srcTape] = false;
2911                 return false;
2912         }
2913         READTUP(state, stup, srcTape, tuplen);
2914
2915         return true;
2916 }
2917
2918 /*
2919  * dumptuples - remove tuples from memtuples and write initial run to tape
2920  *
2921  * When alltuples = true, dump everything currently in memory.  (This case is
2922  * only used at end of input data.)
2923  */
2924 static void
2925 dumptuples(Tuplesortstate *state, bool alltuples)
2926 {
2927         int                     memtupwrite;
2928         int                     i;
2929
2930         /*
2931          * Nothing to do if we still fit in available memory and have array slots,
2932          * unless this is the final call during initial run generation.
2933          */
2934         if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
2935                 !alltuples)
2936                 return;
2937
2938         /*
2939          * Final call might require no sorting, in rare cases where we just so
2940          * happen to have previously LACKMEM()'d at the point where exactly all
2941          * remaining tuples are loaded into memory, just before input was
2942          * exhausted.
2943          *
2944          * In general, short final runs are quite possible.  Rather than allowing
2945          * a special case where there was a superfluous selectnewtape() call (i.e.
2946          * a call with no subsequent run actually written to destTape), we prefer
2947          * to write out a 0 tuple run.
2948          *
2949          * mergereadnext() is prepared for 0 tuple runs, and will reliably mark
2950          * the tape inactive for the merge when called from beginmerge().  This
2951          * case is therefore similar to the case where mergeonerun() finds a dummy
2952          * run for the tape, and so doesn't need to merge a run from the tape (or
2953          * conceptually "merges" the dummy run, if you prefer).  According to
2954          * Knuth, Algorithm D "isn't strictly optimal" in its method of
2955          * distribution and dummy run assignment; this edge case seems very
2956          * unlikely to make that appreciably worse.
2957          */
2958         Assert(state->status == TSS_BUILDRUNS);
2959
2960         /*
2961          * It seems unlikely that this limit will ever be exceeded, but take no
2962          * chances
2963          */
2964         if (state->currentRun == INT_MAX)
2965                 ereport(ERROR,
2966                                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2967                                  errmsg("cannot have more than %d runs for an external sort",
2968                                                 INT_MAX)));
2969
2970         state->currentRun++;
2971
2972 #ifdef TRACE_SORT
2973         if (trace_sort)
2974                 elog(LOG, "worker %d starting quicksort of run %d: %s",
2975                          state->worker, state->currentRun,
2976                          pg_rusage_show(&state->ru_start));
2977 #endif
2978
2979         /*
2980          * Sort all tuples accumulated within the allowed amount of memory for
2981          * this run using quicksort
2982          */
2983         tuplesort_sort_memtuples(state);
2984
2985 #ifdef TRACE_SORT
2986         if (trace_sort)
2987                 elog(LOG, "worker %d finished quicksort of run %d: %s",
2988                          state->worker, state->currentRun,
2989                          pg_rusage_show(&state->ru_start));
2990 #endif
2991
2992         memtupwrite = state->memtupcount;
2993         for (i = 0; i < memtupwrite; i++)
2994         {
2995                 WRITETUP(state, state->tp_tapenum[state->destTape],
2996                                  &state->memtuples[i]);
2997                 state->memtupcount--;
2998         }
2999
3000         /*
3001          * Reset tuple memory.  We've freed all of the tuples that we previously
3002          * allocated.  It's important to avoid fragmentation when there is a stark
3003          * change in the sizes of incoming tuples.  Fragmentation due to
3004          * AllocSetFree's bucketing by size class might be particularly bad if
3005          * this step wasn't taken.
3006          */
3007         MemoryContextReset(state->tuplecontext);
3008
3009         markrunend(state, state->tp_tapenum[state->destTape]);
3010         state->tp_runs[state->destTape]++;
3011         state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
3012
3013 #ifdef TRACE_SORT
3014         if (trace_sort)
3015                 elog(LOG, "worker %d finished writing run %d to tape %d: %s",
3016                          state->worker, state->currentRun, state->destTape,
3017                          pg_rusage_show(&state->ru_start));
3018 #endif
3019
3020         if (!alltuples)
3021                 selectnewtape(state);
3022 }
3023
3024 /*
3025  * tuplesort_rescan             - rewind and replay the scan
3026  */
3027 void
3028 tuplesort_rescan(Tuplesortstate *state)
3029 {
3030         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
3031
3032         Assert(state->randomAccess);
3033
3034         switch (state->status)
3035         {
3036                 case TSS_SORTEDINMEM:
3037                         state->current = 0;
3038                         state->eof_reached = false;
3039                         state->markpos_offset = 0;
3040                         state->markpos_eof = false;
3041                         break;
3042                 case TSS_SORTEDONTAPE:
3043                         LogicalTapeRewindForRead(state->tapeset,
3044                                                                          state->result_tape,
3045                                                                          0);
3046                         state->eof_reached = false;
3047                         state->markpos_block = 0L;
3048                         state->markpos_offset = 0;
3049                         state->markpos_eof = false;
3050                         break;
3051                 default:
3052                         elog(ERROR, "invalid tuplesort state");
3053                         break;
3054         }
3055
3056         MemoryContextSwitchTo(oldcontext);
3057 }
3058
3059 /*
3060  * tuplesort_markpos    - saves current position in the merged sort file
3061  */
3062 void
3063 tuplesort_markpos(Tuplesortstate *state)
3064 {
3065         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
3066
3067         Assert(state->randomAccess);
3068
3069         switch (state->status)
3070         {
3071                 case TSS_SORTEDINMEM:
3072                         state->markpos_offset = state->current;
3073                         state->markpos_eof = state->eof_reached;
3074                         break;
3075                 case TSS_SORTEDONTAPE:
3076                         LogicalTapeTell(state->tapeset,
3077                                                         state->result_tape,
3078                                                         &state->markpos_block,
3079                                                         &state->markpos_offset);
3080                         state->markpos_eof = state->eof_reached;
3081                         break;
3082                 default:
3083                         elog(ERROR, "invalid tuplesort state");
3084                         break;
3085         }
3086
3087         MemoryContextSwitchTo(oldcontext);
3088 }
3089
3090 /*
3091  * tuplesort_restorepos - restores current position in merged sort file to
3092  *                                                last saved position
3093  */
3094 void
3095 tuplesort_restorepos(Tuplesortstate *state)
3096 {
3097         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
3098
3099         Assert(state->randomAccess);
3100
3101         switch (state->status)
3102         {
3103                 case TSS_SORTEDINMEM:
3104                         state->current = state->markpos_offset;
3105                         state->eof_reached = state->markpos_eof;
3106                         break;
3107                 case TSS_SORTEDONTAPE:
3108                         LogicalTapeSeek(state->tapeset,
3109                                                         state->result_tape,
3110                                                         state->markpos_block,
3111                                                         state->markpos_offset);
3112                         state->eof_reached = state->markpos_eof;
3113                         break;
3114                 default:
3115                         elog(ERROR, "invalid tuplesort state");
3116                         break;
3117         }
3118
3119         MemoryContextSwitchTo(oldcontext);
3120 }
3121
3122 /*
3123  * tuplesort_get_stats - extract summary statistics
3124  *
3125  * This can be called after tuplesort_performsort() finishes to obtain
3126  * printable summary information about how the sort was performed.
3127  */
3128 void
3129 tuplesort_get_stats(Tuplesortstate *state,
3130                                         TuplesortInstrumentation *stats)
3131 {
3132         /*
3133          * Note: it might seem we should provide both memory and disk usage for a
3134          * disk-based sort.  However, the current code doesn't track memory space
3135          * accurately once we have begun to return tuples to the caller (since we
3136          * don't account for pfree's the caller is expected to do), so we cannot
3137          * rely on availMem in a disk sort.  This does not seem worth the overhead
3138          * to fix.  Is it worth creating an API for the memory context code to
3139          * tell us how much is actually used in sortcontext?
3140          */
3141         if (state->tapeset)
3142         {
3143                 stats->spaceType = SORT_SPACE_TYPE_DISK;
3144                 stats->spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
3145         }
3146         else
3147         {
3148                 stats->spaceType = SORT_SPACE_TYPE_MEMORY;
3149                 stats->spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
3150         }
3151
3152         switch (state->status)
3153         {
3154                 case TSS_SORTEDINMEM:
3155                         if (state->boundUsed)
3156                                 stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
3157                         else
3158                                 stats->sortMethod = SORT_TYPE_QUICKSORT;
3159                         break;
3160                 case TSS_SORTEDONTAPE:
3161                         stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
3162                         break;
3163                 case TSS_FINALMERGE:
3164                         stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
3165                         break;
3166                 default:
3167                         stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
3168                         break;
3169         }
3170 }
3171
3172 /*
3173  * Convert TuplesortMethod to a string.
3174  */
3175 const char *
3176 tuplesort_method_name(TuplesortMethod m)
3177 {
3178         switch (m)
3179         {
3180                 case SORT_TYPE_STILL_IN_PROGRESS:
3181                         return "still in progress";
3182                 case SORT_TYPE_TOP_N_HEAPSORT:
3183                         return "top-N heapsort";
3184                 case SORT_TYPE_QUICKSORT:
3185                         return "quicksort";
3186                 case SORT_TYPE_EXTERNAL_SORT:
3187                         return "external sort";
3188                 case SORT_TYPE_EXTERNAL_MERGE:
3189                         return "external merge";
3190         }
3191
3192         return "unknown";
3193 }
3194
3195 /*
3196  * Convert TuplesortSpaceType to a string.
3197  */
3198 const char *
3199 tuplesort_space_type_name(TuplesortSpaceType t)
3200 {
3201         Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
3202         return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
3203 }
3204
3205
3206 /*
3207  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
3208  */
3209
3210 /*
3211  * Convert the existing unordered array of SortTuples to a bounded heap,
3212  * discarding all but the smallest "state->bound" tuples.
3213  *
3214  * When working with a bounded heap, we want to keep the largest entry
3215  * at the root (array entry zero), instead of the smallest as in the normal
3216  * sort case.  This allows us to discard the largest entry cheaply.
3217  * Therefore, we temporarily reverse the sort direction.
3218  */
3219 static void
3220 make_bounded_heap(Tuplesortstate *state)
3221 {
3222         int                     tupcount = state->memtupcount;
3223         int                     i;
3224
3225         Assert(state->status == TSS_INITIAL);
3226         Assert(state->bounded);
3227         Assert(tupcount >= state->bound);
3228         Assert(SERIAL(state));
3229
3230         /* Reverse sort direction so largest entry will be at root */
3231         reversedirection(state);
3232
3233         state->memtupcount = 0;         /* make the heap empty */
3234         for (i = 0; i < tupcount; i++)
3235         {
3236                 if (state->memtupcount < state->bound)
3237                 {
3238                         /* Insert next tuple into heap */
3239                         /* Must copy source tuple to avoid possible overwrite */
3240                         SortTuple       stup = state->memtuples[i];
3241
3242                         tuplesort_heap_insert(state, &stup);
3243                 }
3244                 else
3245                 {
3246                         /*
3247                          * The heap is full.  Replace the largest entry with the new
3248                          * tuple, or just discard it, if it's larger than anything already
3249                          * in the heap.
3250                          */
3251                         if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
3252                         {
3253                                 free_sort_tuple(state, &state->memtuples[i]);
3254                                 CHECK_FOR_INTERRUPTS();
3255                         }
3256                         else
3257                                 tuplesort_heap_replace_top(state, &state->memtuples[i]);
3258                 }
3259         }
3260
3261         Assert(state->memtupcount == state->bound);
3262         state->status = TSS_BOUNDED;
3263 }
3264
3265 /*
3266  * Convert the bounded heap to a properly-sorted array
3267  */
3268 static void
3269 sort_bounded_heap(Tuplesortstate *state)
3270 {
3271         int                     tupcount = state->memtupcount;
3272
3273         Assert(state->status == TSS_BOUNDED);
3274         Assert(state->bounded);
3275         Assert(tupcount == state->bound);
3276         Assert(SERIAL(state));
3277
3278         /*
3279          * We can unheapify in place because each delete-top call will remove the
3280          * largest entry, which we can promptly store in the newly freed slot at
3281          * the end.  Once we're down to a single-entry heap, we're done.
3282          */
3283         while (state->memtupcount > 1)
3284         {
3285                 SortTuple       stup = state->memtuples[0];
3286
3287                 /* this sifts-up the next-largest entry and decreases memtupcount */
3288                 tuplesort_heap_delete_top(state);
3289                 state->memtuples[state->memtupcount] = stup;
3290         }
3291         state->memtupcount = tupcount;
3292
3293         /*
3294          * Reverse sort direction back to the original state.  This is not
3295          * actually necessary but seems like a good idea for tidiness.
3296          */
3297         reversedirection(state);
3298
3299         state->status = TSS_SORTEDINMEM;
3300         state->boundUsed = true;
3301 }
3302
3303 /*
3304  * Sort all memtuples using specialized qsort() routines.
3305  *
3306  * Quicksort is used for small in-memory sorts, and external sort runs.
3307  */
3308 static void
3309 tuplesort_sort_memtuples(Tuplesortstate *state)
3310 {
3311         Assert(!LEADER(state));
3312
3313         if (state->memtupcount > 1)
3314         {
3315                 /* Can we use the single-key sort function? */
3316                 if (state->onlyKey != NULL)
3317                         qsort_ssup(state->memtuples, state->memtupcount,
3318                                            state->onlyKey);
3319                 else
3320                         qsort_tuple(state->memtuples,
3321                                                 state->memtupcount,
3322                                                 state->comparetup,
3323                                                 state);
3324         }
3325 }
3326
3327 /*
3328  * Insert a new tuple into an empty or existing heap, maintaining the
3329  * heap invariant.  Caller is responsible for ensuring there's room.
3330  *
3331  * Note: For some callers, tuple points to a memtuples[] entry above the
3332  * end of the heap.  This is safe as long as it's not immediately adjacent
3333  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
3334  * is, it might get overwritten before being moved into the heap!
3335  */
3336 static void
3337 tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
3338 {
3339         SortTuple  *memtuples;
3340         int                     j;
3341
3342         memtuples = state->memtuples;
3343         Assert(state->memtupcount < state->memtupsize);
3344
3345         CHECK_FOR_INTERRUPTS();
3346
3347         /*
3348          * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
3349          * using 1-based array indexes, not 0-based.
3350          */
3351         j = state->memtupcount++;
3352         while (j > 0)
3353         {
3354                 int                     i = (j - 1) >> 1;
3355
3356                 if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
3357                         break;
3358                 memtuples[j] = memtuples[i];
3359                 j = i;
3360         }
3361         memtuples[j] = *tuple;
3362 }
3363
3364 /*
3365  * Remove the tuple at state->memtuples[0] from the heap.  Decrement
3366  * memtupcount, and sift up to maintain the heap invariant.
3367  *
3368  * The caller has already free'd the tuple the top node points to,
3369  * if necessary.
3370  */
3371 static void
3372 tuplesort_heap_delete_top(Tuplesortstate *state)
3373 {
3374         SortTuple  *memtuples = state->memtuples;
3375         SortTuple  *tuple;
3376
3377         if (--state->memtupcount <= 0)
3378                 return;
3379
3380         /*
3381          * Remove the last tuple in the heap, and re-insert it, by replacing the
3382          * current top node with it.
3383          */
3384         tuple = &memtuples[state->memtupcount];
3385         tuplesort_heap_replace_top(state, tuple);
3386 }
3387
3388 /*
3389  * Replace the tuple at state->memtuples[0] with a new tuple.  Sift up to
3390  * maintain the heap invariant.
3391  *
3392  * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
3393  * Heapsort, steps H3-H8).
3394  */
3395 static void
3396 tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
3397 {
3398         SortTuple  *memtuples = state->memtuples;
3399         unsigned int i,
3400                                 n;
3401
3402         Assert(state->memtupcount >= 1);
3403
3404         CHECK_FOR_INTERRUPTS();
3405
3406         /*
3407          * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
3408          * This prevents overflow in the "2 * i + 1" calculation, since at the top
3409          * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
3410          */
3411         n = state->memtupcount;
3412         i = 0;                                          /* i is where the "hole" is */
3413         for (;;)
3414         {
3415                 unsigned int j = 2 * i + 1;
3416
3417                 if (j >= n)
3418                         break;
3419                 if (j + 1 < n &&
3420                         COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
3421                         j++;
3422                 if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
3423                         break;
3424                 memtuples[i] = memtuples[j];
3425                 i = j;
3426         }
3427         memtuples[i] = *tuple;
3428 }
3429
3430 /*
3431  * Function to reverse the sort direction from its current state
3432  *
3433  * It is not safe to call this when performing hash tuplesorts
3434  */
3435 static void
3436 reversedirection(Tuplesortstate *state)
3437 {
3438         SortSupport sortKey = state->sortKeys;
3439         int                     nkey;
3440
3441         for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
3442         {
3443                 sortKey->ssup_reverse = !sortKey->ssup_reverse;
3444                 sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
3445         }
3446 }
3447
3448
3449 /*
3450  * Tape interface routines
3451  */
3452
3453 static unsigned int
3454 getlen(Tuplesortstate *state, int tapenum, bool eofOK)
3455 {
3456         unsigned int len;
3457
3458         if (LogicalTapeRead(state->tapeset, tapenum,
3459                                                 &len, sizeof(len)) != sizeof(len))
3460                 elog(ERROR, "unexpected end of tape");
3461         if (len == 0 && !eofOK)
3462                 elog(ERROR, "unexpected end of data");
3463         return len;
3464 }
3465
3466 static void
3467 markrunend(Tuplesortstate *state, int tapenum)
3468 {
3469         unsigned int len = 0;
3470
3471         LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
3472 }
3473
3474 /*
3475  * Get memory for tuple from within READTUP() routine.
3476  *
3477  * We use next free slot from the slab allocator, or palloc() if the tuple
3478  * is too large for that.
3479  */
3480 static void *
3481 readtup_alloc(Tuplesortstate *state, Size tuplen)
3482 {
3483         SlabSlot   *buf;
3484
3485         /*
3486          * We pre-allocate enough slots in the slab arena that we should never run
3487          * out.
3488          */
3489         Assert(state->slabFreeHead);
3490
3491         if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
3492                 return MemoryContextAlloc(state->sortcontext, tuplen);
3493         else
3494         {
3495                 buf = state->slabFreeHead;
3496                 /* Reuse this slot */
3497                 state->slabFreeHead = buf->nextfree;
3498
3499                 return buf;
3500         }
3501 }
3502
3503
3504 /*
3505  * Routines specialized for HeapTuple (actually MinimalTuple) case
3506  */
3507
3508 static int
3509 comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
3510 {
3511         SortSupport sortKey = state->sortKeys;
3512         HeapTupleData ltup;
3513         HeapTupleData rtup;
3514         TupleDesc       tupDesc;
3515         int                     nkey;
3516         int32           compare;
3517         AttrNumber      attno;
3518         Datum           datum1,
3519                                 datum2;
3520         bool            isnull1,
3521                                 isnull2;
3522
3523
3524         /* Compare the leading sort key */
3525         compare = ApplySortComparator(a->datum1, a->isnull1,
3526                                                                   b->datum1, b->isnull1,
3527                                                                   sortKey);
3528         if (compare != 0)
3529                 return compare;
3530
3531         /* Compare additional sort keys */
3532         ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
3533         ltup.t_data = (HeapTupleHeader) ((char *) a->tuple - MINIMAL_TUPLE_OFFSET);
3534         rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
3535         rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET);
3536         tupDesc = state->tupDesc;
3537
3538         if (sortKey->abbrev_converter)
3539         {
3540                 attno = sortKey->ssup_attno;
3541
3542                 datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
3543                 datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
3544
3545                 compare = ApplySortAbbrevFullComparator(datum1, isnull1,
3546                                                                                                 datum2, isnull2,
3547                                                                                                 sortKey);
3548                 if (compare != 0)
3549                         return compare;
3550         }
3551
3552         sortKey++;
3553         for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++)
3554         {
3555                 attno = sortKey->ssup_attno;
3556
3557                 datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
3558                 datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
3559
3560                 compare = ApplySortComparator(datum1, isnull1,
3561                                                                           datum2, isnull2,
3562                                                                           sortKey);
3563                 if (compare != 0)
3564                         return compare;
3565         }
3566
3567         return 0;
3568 }
3569
3570 static void
3571 copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
3572 {
3573         /*
3574          * We expect the passed "tup" to be a TupleTableSlot, and form a
3575          * MinimalTuple using the exported interface for that.
3576          */
3577         TupleTableSlot *slot = (TupleTableSlot *) tup;
3578         Datum           original;
3579         MinimalTuple tuple;
3580         HeapTupleData htup;
3581         MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
3582
3583         /* copy the tuple into sort storage */
3584         tuple = ExecCopySlotMinimalTuple(slot);
3585         stup->tuple = (void *) tuple;
3586         USEMEM(state, GetMemoryChunkSpace(tuple));
3587         /* set up first-column key value */
3588         htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
3589         htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
3590         original = heap_getattr(&htup,
3591                                                         state->sortKeys[0].ssup_attno,
3592                                                         state->tupDesc,
3593                                                         &stup->isnull1);
3594
3595         MemoryContextSwitchTo(oldcontext);
3596
3597         if (!state->sortKeys->abbrev_converter || stup->isnull1)
3598         {
3599                 /*
3600                  * Store ordinary Datum representation, or NULL value.  If there is a
3601                  * converter it won't expect NULL values, and cost model is not
3602                  * required to account for NULL, so in that case we avoid calling
3603                  * converter and just set datum1 to zeroed representation (to be
3604                  * consistent, and to support cheap inequality tests for NULL
3605                  * abbreviated keys).
3606                  */
3607                 stup->datum1 = original;
3608         }
3609         else if (!consider_abort_common(state))
3610         {
3611                 /* Store abbreviated key representation */
3612                 stup->datum1 = state->sortKeys->abbrev_converter(original,
3613                                                                                                                  state->sortKeys);
3614         }
3615         else
3616         {
3617                 /* Abort abbreviation */
3618                 int                     i;
3619
3620                 stup->datum1 = original;
3621
3622                 /*
3623                  * Set state to be consistent with never trying abbreviation.
3624                  *
3625                  * Alter datum1 representation in already-copied tuples, so as to
3626                  * ensure a consistent representation (current tuple was just
3627                  * handled).  It does not matter if some dumped tuples are already
3628                  * sorted on tape, since serialized tuples lack abbreviated keys
3629                  * (TSS_BUILDRUNS state prevents control reaching here in any case).
3630                  */
3631                 for (i = 0; i < state->memtupcount; i++)
3632                 {
3633                         SortTuple  *mtup = &state->memtuples[i];
3634
3635                         htup.t_len = ((MinimalTuple) mtup->tuple)->t_len +
3636                                 MINIMAL_TUPLE_OFFSET;
3637                         htup.t_data = (HeapTupleHeader) ((char *) mtup->tuple -
3638                                                                                          MINIMAL_TUPLE_OFFSET);
3639
3640                         mtup->datum1 = heap_getattr(&htup,
3641                                                                                 state->sortKeys[0].ssup_attno,
3642                                                                                 state->tupDesc,
3643                                                                                 &mtup->isnull1);
3644                 }
3645         }
3646 }
3647
3648 static void
3649 writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
3650 {
3651         MinimalTuple tuple = (MinimalTuple) stup->tuple;
3652
3653         /* the part of the MinimalTuple we'll write: */
3654         char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
3655         unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
3656
3657         /* total on-disk footprint: */
3658         unsigned int tuplen = tupbodylen + sizeof(int);
3659
3660         LogicalTapeWrite(state->tapeset, tapenum,
3661                                          (void *) &tuplen, sizeof(tuplen));
3662         LogicalTapeWrite(state->tapeset, tapenum,
3663                                          (void *) tupbody, tupbodylen);
3664         if (state->randomAccess)        /* need trailing length word? */
3665                 LogicalTapeWrite(state->tapeset, tapenum,
3666                                                  (void *) &tuplen, sizeof(tuplen));
3667
3668         if (!state->slabAllocatorUsed)
3669         {
3670                 FREEMEM(state, GetMemoryChunkSpace(tuple));
3671                 heap_free_minimal_tuple(tuple);
3672         }
3673 }
3674
3675 static void
3676 readtup_heap(Tuplesortstate *state, SortTuple *stup,
3677                          int tapenum, unsigned int len)
3678 {
3679         unsigned int tupbodylen = len - sizeof(int);
3680         unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
3681         MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen);
3682         char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
3683         HeapTupleData htup;
3684
3685         /* read in the tuple proper */
3686         tuple->t_len = tuplen;
3687         LogicalTapeReadExact(state->tapeset, tapenum,
3688                                                  tupbody, tupbodylen);
3689         if (state->randomAccess)        /* need trailing length word? */
3690                 LogicalTapeReadExact(state->tapeset, tapenum,
3691                                                          &tuplen, sizeof(tuplen));
3692         stup->tuple = (void *) tuple;
3693         /* set up first-column key value */
3694         htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
3695         htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
3696         stup->datum1 = heap_getattr(&htup,
3697                                                                 state->sortKeys[0].ssup_attno,
3698                                                                 state->tupDesc,
3699                                                                 &stup->isnull1);
3700 }
3701
3702 /*
3703  * Routines specialized for the CLUSTER case (HeapTuple data, with
3704  * comparisons per a btree index definition)
3705  */
3706
3707 static int
3708 comparetup_cluster(const SortTuple *a, const SortTuple *b,
3709                                    Tuplesortstate *state)
3710 {
3711         SortSupport sortKey = state->sortKeys;
3712         HeapTuple       ltup;
3713         HeapTuple       rtup;
3714         TupleDesc       tupDesc;
3715         int                     nkey;
3716         int32           compare;
3717         Datum           datum1,
3718                                 datum2;
3719         bool            isnull1,
3720                                 isnull2;
3721         AttrNumber      leading = state->indexInfo->ii_IndexAttrNumbers[0];
3722
3723         /* Be prepared to compare additional sort keys */
3724         ltup = (HeapTuple) a->tuple;
3725         rtup = (HeapTuple) b->tuple;
3726         tupDesc = state->tupDesc;
3727
3728         /* Compare the leading sort key, if it's simple */
3729         if (leading != 0)
3730         {
3731                 compare = ApplySortComparator(a->datum1, a->isnull1,
3732                                                                           b->datum1, b->isnull1,
3733                                                                           sortKey);
3734                 if (compare != 0)
3735                         return compare;
3736
3737                 if (sortKey->abbrev_converter)
3738                 {
3739                         datum1 = heap_getattr(ltup, leading, tupDesc, &isnull1);
3740                         datum2 = heap_getattr(rtup, leading, tupDesc, &isnull2);
3741
3742                         compare = ApplySortAbbrevFullComparator(datum1, isnull1,
3743                                                                                                         datum2, isnull2,
3744                                                                                                         sortKey);
3745                 }
3746                 if (compare != 0 || state->nKeys == 1)
3747                         return compare;
3748                 /* Compare additional columns the hard way */
3749                 sortKey++;
3750                 nkey = 1;
3751         }
3752         else
3753         {
3754                 /* Must compare all keys the hard way */
3755                 nkey = 0;
3756         }
3757
3758         if (state->indexInfo->ii_Expressions == NULL)
3759         {
3760                 /* If not expression index, just compare the proper heap attrs */
3761
3762                 for (; nkey < state->nKeys; nkey++, sortKey++)
3763                 {
3764                         AttrNumber      attno = state->indexInfo->ii_IndexAttrNumbers[nkey];
3765
3766                         datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
3767                         datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
3768
3769                         compare = ApplySortComparator(datum1, isnull1,
3770                                                                                   datum2, isnull2,
3771                                                                                   sortKey);
3772                         if (compare != 0)
3773                                 return compare;
3774                 }
3775         }
3776         else
3777         {
3778                 /*
3779                  * In the expression index case, compute the whole index tuple and
3780                  * then compare values.  It would perhaps be faster to compute only as
3781                  * many columns as we need to compare, but that would require
3782                  * duplicating all the logic in FormIndexDatum.
3783                  */
3784                 Datum           l_index_values[INDEX_MAX_KEYS];
3785                 bool            l_index_isnull[INDEX_MAX_KEYS];
3786                 Datum           r_index_values[INDEX_MAX_KEYS];
3787                 bool            r_index_isnull[INDEX_MAX_KEYS];
3788                 TupleTableSlot *ecxt_scantuple;
3789
3790                 /* Reset context each time to prevent memory leakage */
3791                 ResetPerTupleExprContext(state->estate);
3792
3793                 ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple;
3794
3795                 ExecStoreHeapTuple(ltup, ecxt_scantuple, false);
3796                 FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
3797                                            l_index_values, l_index_isnull);
3798
3799                 ExecStoreHeapTuple(rtup, ecxt_scantuple, false);
3800                 FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
3801                                            r_index_values, r_index_isnull);
3802
3803                 for (; nkey < state->nKeys; nkey++, sortKey++)
3804                 {
3805                         compare = ApplySortComparator(l_index_values[nkey],
3806                                                                                   l_index_isnull[nkey],
3807                                                                                   r_index_values[nkey],
3808                                                                                   r_index_isnull[nkey],
3809                                                                                   sortKey);
3810                         if (compare != 0)
3811                                 return compare;
3812                 }
3813         }
3814
3815         return 0;
3816 }
3817
3818 static void
3819 copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
3820 {
3821         HeapTuple       tuple = (HeapTuple) tup;
3822         Datum           original;
3823         MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
3824
3825         /* copy the tuple into sort storage */
3826         tuple = heap_copytuple(tuple);
3827         stup->tuple = (void *) tuple;
3828         USEMEM(state, GetMemoryChunkSpace(tuple));
3829
3830         MemoryContextSwitchTo(oldcontext);
3831
3832         /*
3833          * set up first-column key value, and potentially abbreviate, if it's a
3834          * simple column
3835          */
3836         if (state->indexInfo->ii_IndexAttrNumbers[0] == 0)
3837                 return;
3838
3839         original = heap_getattr(tuple,
3840                                                         state->indexInfo->ii_IndexAttrNumbers[0],
3841                                                         state->tupDesc,
3842                                                         &stup->isnull1);
3843
3844         if (!state->sortKeys->abbrev_converter || stup->isnull1)
3845         {
3846                 /*
3847                  * Store ordinary Datum representation, or NULL value.  If there is a
3848                  * converter it won't expect NULL values, and cost model is not
3849                  * required to account for NULL, so in that case we avoid calling
3850                  * converter and just set datum1 to zeroed representation (to be
3851                  * consistent, and to support cheap inequality tests for NULL
3852                  * abbreviated keys).
3853                  */
3854                 stup->datum1 = original;
3855         }
3856         else if (!consider_abort_common(state))
3857         {
3858                 /* Store abbreviated key representation */
3859                 stup->datum1 = state->sortKeys->abbrev_converter(original,
3860                                                                                                                  state->sortKeys);
3861         }
3862         else
3863         {
3864                 /* Abort abbreviation */
3865                 int                     i;
3866
3867                 stup->datum1 = original;
3868
3869                 /*
3870                  * Set state to be consistent with never trying abbreviation.
3871                  *
3872                  * Alter datum1 representation in already-copied tuples, so as to
3873                  * ensure a consistent representation (current tuple was just
3874                  * handled).  It does not matter if some dumped tuples are already
3875                  * sorted on tape, since serialized tuples lack abbreviated keys
3876                  * (TSS_BUILDRUNS state prevents control reaching here in any case).
3877                  */
3878                 for (i = 0; i < state->memtupcount; i++)
3879                 {
3880                         SortTuple  *mtup = &state->memtuples[i];
3881
3882                         tuple = (HeapTuple) mtup->tuple;
3883                         mtup->datum1 = heap_getattr(tuple,
3884                                                                                 state->indexInfo->ii_IndexAttrNumbers[0],
3885                                                                                 state->tupDesc,
3886                                                                                 &mtup->isnull1);
3887                 }
3888         }
3889 }
3890
3891 static void
3892 writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
3893 {
3894         HeapTuple       tuple = (HeapTuple) stup->tuple;
3895         unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int);
3896
3897         /* We need to store t_self, but not other fields of HeapTupleData */
3898         LogicalTapeWrite(state->tapeset, tapenum,
3899                                          &tuplen, sizeof(tuplen));
3900         LogicalTapeWrite(state->tapeset, tapenum,
3901                                          &tuple->t_self, sizeof(ItemPointerData));
3902         LogicalTapeWrite(state->tapeset, tapenum,
3903                                          tuple->t_data, tuple->t_len);
3904         if (state->randomAccess)        /* need trailing length word? */
3905                 LogicalTapeWrite(state->tapeset, tapenum,
3906                                                  &tuplen, sizeof(tuplen));
3907
3908         if (!state->slabAllocatorUsed)
3909         {
3910                 FREEMEM(state, GetMemoryChunkSpace(tuple));
3911                 heap_freetuple(tuple);
3912         }
3913 }
3914
3915 static void
3916 readtup_cluster(Tuplesortstate *state, SortTuple *stup,
3917                                 int tapenum, unsigned int tuplen)
3918 {
3919         unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
3920         HeapTuple       tuple = (HeapTuple) readtup_alloc(state,
3921                                                                                                   t_len + HEAPTUPLESIZE);
3922
3923         /* Reconstruct the HeapTupleData header */
3924         tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
3925         tuple->t_len = t_len;
3926         LogicalTapeReadExact(state->tapeset, tapenum,
3927                                                  &tuple->t_self, sizeof(ItemPointerData));
3928         /* We don't currently bother to reconstruct t_tableOid */
3929         tuple->t_tableOid = InvalidOid;
3930         /* Read in the tuple body */
3931         LogicalTapeReadExact(state->tapeset, tapenum,
3932                                                  tuple->t_data, tuple->t_len);
3933         if (state->randomAccess)        /* need trailing length word? */
3934                 LogicalTapeReadExact(state->tapeset, tapenum,
3935                                                          &tuplen, sizeof(tuplen));
3936         stup->tuple = (void *) tuple;
3937         /* set up first-column key value, if it's a simple column */
3938         if (state->indexInfo->ii_IndexAttrNumbers[0] != 0)
3939                 stup->datum1 = heap_getattr(tuple,
3940                                                                         state->indexInfo->ii_IndexAttrNumbers[0],
3941                                                                         state->tupDesc,
3942                                                                         &stup->isnull1);
3943 }
3944
3945 /*
3946  * Routines specialized for IndexTuple case
3947  *
3948  * The btree and hash cases require separate comparison functions, but the
3949  * IndexTuple representation is the same so the copy/write/read support
3950  * functions can be shared.
3951  */
3952
3953 static int
3954 comparetup_index_btree(const SortTuple *a, const SortTuple *b,
3955                                            Tuplesortstate *state)
3956 {
3957         /*
3958          * This is similar to comparetup_heap(), but expects index tuples.  There
3959          * is also special handling for enforcing uniqueness, and special
3960          * treatment for equal keys at the end.
3961          */
3962         SortSupport sortKey = state->sortKeys;
3963         IndexTuple      tuple1;
3964         IndexTuple      tuple2;
3965         int                     keysz;
3966         TupleDesc       tupDes;
3967         bool            equal_hasnull = false;
3968         int                     nkey;
3969         int32           compare;
3970         Datum           datum1,
3971                                 datum2;
3972         bool            isnull1,
3973                                 isnull2;
3974
3975
3976         /* Compare the leading sort key */
3977         compare = ApplySortComparator(a->datum1, a->isnull1,
3978                                                                   b->datum1, b->isnull1,
3979                                                                   sortKey);
3980         if (compare != 0)
3981                 return compare;
3982
3983         /* Compare additional sort keys */
3984         tuple1 = (IndexTuple) a->tuple;
3985         tuple2 = (IndexTuple) b->tuple;
3986         keysz = state->nKeys;
3987         tupDes = RelationGetDescr(state->indexRel);
3988
3989         if (sortKey->abbrev_converter)
3990         {
3991                 datum1 = index_getattr(tuple1, 1, tupDes, &isnull1);
3992                 datum2 = index_getattr(tuple2, 1, tupDes, &isnull2);
3993
3994                 compare = ApplySortAbbrevFullComparator(datum1, isnull1,
3995                                                                                                 datum2, isnull2,
3996                                                                                                 sortKey);
3997                 if (compare != 0)
3998                         return compare;
3999         }
4000
4001         /* they are equal, so we only need to examine one null flag */
4002         if (a->isnull1)
4003                 equal_hasnull = true;
4004
4005         sortKey++;
4006         for (nkey = 2; nkey <= keysz; nkey++, sortKey++)
4007         {
4008                 datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
4009                 datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
4010
4011                 compare = ApplySortComparator(datum1, isnull1,
4012                                                                           datum2, isnull2,
4013                                                                           sortKey);
4014                 if (compare != 0)
4015                         return compare;         /* done when we find unequal attributes */
4016
4017                 /* they are equal, so we only need to examine one null flag */
4018                 if (isnull1)
4019                         equal_hasnull = true;
4020         }
4021
4022         /*
4023          * If btree has asked us to enforce uniqueness, complain if two equal
4024          * tuples are detected (unless there was at least one NULL field).
4025          *
4026          * It is sufficient to make the test here, because if two tuples are equal
4027          * they *must* get compared at some stage of the sort --- otherwise the
4028          * sort algorithm wouldn't have checked whether one must appear before the
4029          * other.
4030          */
4031         if (state->enforceUnique && !equal_hasnull)
4032         {
4033                 Datum           values[INDEX_MAX_KEYS];
4034                 bool            isnull[INDEX_MAX_KEYS];
4035                 char       *key_desc;
4036
4037                 /*
4038                  * Some rather brain-dead implementations of qsort (such as the one in
4039                  * QNX 4) will sometimes call the comparison routine to compare a
4040                  * value to itself, but we always use our own implementation, which
4041                  * does not.
4042                  */
4043                 Assert(tuple1 != tuple2);
4044
4045                 index_deform_tuple(tuple1, tupDes, values, isnull);
4046
4047                 key_desc = BuildIndexValueDescription(state->indexRel, values, isnull);
4048
4049                 ereport(ERROR,
4050                                 (errcode(ERRCODE_UNIQUE_VIOLATION),
4051                                  errmsg("could not create unique index \"%s\"",
4052                                                 RelationGetRelationName(state->indexRel)),
4053                                  key_desc ? errdetail("Key %s is duplicated.", key_desc) :
4054                                  errdetail("Duplicate keys exist."),
4055                                  errtableconstraint(state->heapRel,
4056                                                                         RelationGetRelationName(state->indexRel))));
4057         }
4058
4059         /*
4060          * If key values are equal, we sort on ItemPointer.  This is required for
4061          * btree indexes, since heap TID is treated as an implicit last key
4062          * attribute in order to ensure that all keys in the index are physically
4063          * unique.
4064          */
4065         {
4066                 BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
4067                 BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
4068
4069                 if (blk1 != blk2)
4070                         return (blk1 < blk2) ? -1 : 1;
4071         }
4072         {
4073                 OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
4074                 OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
4075
4076                 if (pos1 != pos2)
4077                         return (pos1 < pos2) ? -1 : 1;
4078         }
4079
4080         /* ItemPointer values should never be equal */
4081         Assert(false);
4082
4083         return 0;
4084 }
4085
4086 static int
4087 comparetup_index_hash(const SortTuple *a, const SortTuple *b,
4088                                           Tuplesortstate *state)
4089 {
4090         Bucket          bucket1;
4091         Bucket          bucket2;
4092         IndexTuple      tuple1;
4093         IndexTuple      tuple2;
4094
4095         /*
4096          * Fetch hash keys and mask off bits we don't want to sort by. We know
4097          * that the first column of the index tuple is the hash key.
4098          */
4099         Assert(!a->isnull1);
4100         bucket1 = _hash_hashkey2bucket(DatumGetUInt32(a->datum1),
4101                                                                    state->max_buckets, state->high_mask,
4102                                                                    state->low_mask);
4103         Assert(!b->isnull1);
4104         bucket2 = _hash_hashkey2bucket(DatumGetUInt32(b->datum1),
4105                                                                    state->max_buckets, state->high_mask,
4106                                                                    state->low_mask);
4107         if (bucket1 > bucket2)
4108                 return 1;
4109         else if (bucket1 < bucket2)
4110                 return -1;
4111
4112         /*
4113          * If hash values are equal, we sort on ItemPointer.  This does not affect
4114          * validity of the finished index, but it may be useful to have index
4115          * scans in physical order.
4116          */
4117         tuple1 = (IndexTuple) a->tuple;
4118         tuple2 = (IndexTuple) b->tuple;
4119
4120         {
4121                 BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
4122                 BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
4123
4124                 if (blk1 != blk2)
4125                         return (blk1 < blk2) ? -1 : 1;
4126         }
4127         {
4128                 OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
4129                 OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
4130
4131                 if (pos1 != pos2)
4132                         return (pos1 < pos2) ? -1 : 1;
4133         }
4134
4135         /* ItemPointer values should never be equal */
4136         Assert(false);
4137
4138         return 0;
4139 }
4140
4141 static void
4142 copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
4143 {
4144         IndexTuple      tuple = (IndexTuple) tup;
4145         unsigned int tuplen = IndexTupleSize(tuple);
4146         IndexTuple      newtuple;
4147         Datum           original;
4148
4149         /* copy the tuple into sort storage */
4150         newtuple = (IndexTuple) MemoryContextAlloc(state->tuplecontext, tuplen);
4151         memcpy(newtuple, tuple, tuplen);
4152         USEMEM(state, GetMemoryChunkSpace(newtuple));
4153         stup->tuple = (void *) newtuple;
4154         /* set up first-column key value */
4155         original = index_getattr(newtuple,
4156                                                          1,
4157                                                          RelationGetDescr(state->indexRel),
4158                                                          &stup->isnull1);
4159
4160         if (!state->sortKeys->abbrev_converter || stup->isnull1)
4161         {
4162                 /*
4163                  * Store ordinary Datum representation, or NULL value.  If there is a
4164                  * converter it won't expect NULL values, and cost model is not
4165                  * required to account for NULL, so in that case we avoid calling
4166                  * converter and just set datum1 to zeroed representation (to be
4167                  * consistent, and to support cheap inequality tests for NULL
4168                  * abbreviated keys).
4169                  */
4170                 stup->datum1 = original;
4171         }
4172         else if (!consider_abort_common(state))
4173         {
4174                 /* Store abbreviated key representation */
4175                 stup->datum1 = state->sortKeys->abbrev_converter(original,
4176                                                                                                                  state->sortKeys);
4177         }
4178         else
4179         {
4180                 /* Abort abbreviation */
4181                 int                     i;
4182
4183                 stup->datum1 = original;
4184
4185                 /*
4186                  * Set state to be consistent with never trying abbreviation.
4187                  *
4188                  * Alter datum1 representation in already-copied tuples, so as to
4189                  * ensure a consistent representation (current tuple was just
4190                  * handled).  It does not matter if some dumped tuples are already
4191                  * sorted on tape, since serialized tuples lack abbreviated keys
4192                  * (TSS_BUILDRUNS state prevents control reaching here in any case).
4193                  */
4194                 for (i = 0; i < state->memtupcount; i++)
4195                 {
4196                         SortTuple  *mtup = &state->memtuples[i];
4197
4198                         tuple = (IndexTuple) mtup->tuple;
4199                         mtup->datum1 = index_getattr(tuple,
4200                                                                                  1,
4201                                                                                  RelationGetDescr(state->indexRel),
4202                                                                                  &mtup->isnull1);
4203                 }
4204         }
4205 }
4206
4207 static void
4208 writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
4209 {
4210         IndexTuple      tuple = (IndexTuple) stup->tuple;
4211         unsigned int tuplen;
4212
4213         tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
4214         LogicalTapeWrite(state->tapeset, tapenum,
4215                                          (void *) &tuplen, sizeof(tuplen));
4216         LogicalTapeWrite(state->tapeset, tapenum,
4217                                          (void *) tuple, IndexTupleSize(tuple));
4218         if (state->randomAccess)        /* need trailing length word? */
4219                 LogicalTapeWrite(state->tapeset, tapenum,
4220                                                  (void *) &tuplen, sizeof(tuplen));
4221
4222         if (!state->slabAllocatorUsed)
4223         {
4224                 FREEMEM(state, GetMemoryChunkSpace(tuple));
4225                 pfree(tuple);
4226         }
4227 }
4228
4229 static void
4230 readtup_index(Tuplesortstate *state, SortTuple *stup,
4231                           int tapenum, unsigned int len)
4232 {
4233         unsigned int tuplen = len - sizeof(unsigned int);
4234         IndexTuple      tuple = (IndexTuple) readtup_alloc(state, tuplen);
4235
4236         LogicalTapeReadExact(state->tapeset, tapenum,
4237                                                  tuple, tuplen);
4238         if (state->randomAccess)        /* need trailing length word? */
4239                 LogicalTapeReadExact(state->tapeset, tapenum,
4240                                                          &tuplen, sizeof(tuplen));
4241         stup->tuple = (void *) tuple;
4242         /* set up first-column key value */
4243         stup->datum1 = index_getattr(tuple,
4244                                                                  1,
4245                                                                  RelationGetDescr(state->indexRel),
4246                                                                  &stup->isnull1);
4247 }
4248
4249 /*
4250  * Routines specialized for DatumTuple case
4251  */
4252
4253 static int
4254 comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
4255 {
4256         int                     compare;
4257
4258         compare = ApplySortComparator(a->datum1, a->isnull1,
4259                                                                   b->datum1, b->isnull1,
4260                                                                   state->sortKeys);
4261         if (compare != 0)
4262                 return compare;
4263
4264         /* if we have abbreviations, then "tuple" has the original value */
4265
4266         if (state->sortKeys->abbrev_converter)
4267                 compare = ApplySortAbbrevFullComparator(PointerGetDatum(a->tuple), a->isnull1,
4268                                                                                                 PointerGetDatum(b->tuple), b->isnull1,
4269                                                                                                 state->sortKeys);
4270
4271         return compare;
4272 }
4273
4274 static void
4275 copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
4276 {
4277         /* Not currently needed */
4278         elog(ERROR, "copytup_datum() should not be called");
4279 }
4280
4281 static void
4282 writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
4283 {
4284         void       *waddr;
4285         unsigned int tuplen;
4286         unsigned int writtenlen;
4287
4288         if (stup->isnull1)
4289         {
4290                 waddr = NULL;
4291                 tuplen = 0;
4292         }
4293         else if (!state->tuples)
4294         {
4295                 waddr = &stup->datum1;
4296                 tuplen = sizeof(Datum);
4297         }
4298         else
4299         {
4300                 waddr = stup->tuple;
4301                 tuplen = datumGetSize(PointerGetDatum(stup->tuple), false, state->datumTypeLen);
4302                 Assert(tuplen != 0);
4303         }
4304
4305         writtenlen = tuplen + sizeof(unsigned int);
4306
4307         LogicalTapeWrite(state->tapeset, tapenum,
4308                                          (void *) &writtenlen, sizeof(writtenlen));
4309         LogicalTapeWrite(state->tapeset, tapenum,
4310                                          waddr, tuplen);
4311         if (state->randomAccess)        /* need trailing length word? */
4312                 LogicalTapeWrite(state->tapeset, tapenum,
4313                                                  (void *) &writtenlen, sizeof(writtenlen));
4314
4315         if (!state->slabAllocatorUsed && stup->tuple)
4316         {
4317                 FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
4318                 pfree(stup->tuple);
4319         }
4320 }
4321
4322 static void
4323 readtup_datum(Tuplesortstate *state, SortTuple *stup,
4324                           int tapenum, unsigned int len)
4325 {
4326         unsigned int tuplen = len - sizeof(unsigned int);
4327
4328         if (tuplen == 0)
4329         {
4330                 /* it's NULL */
4331                 stup->datum1 = (Datum) 0;
4332                 stup->isnull1 = true;
4333                 stup->tuple = NULL;
4334         }
4335         else if (!state->tuples)
4336         {
4337                 Assert(tuplen == sizeof(Datum));
4338                 LogicalTapeReadExact(state->tapeset, tapenum,
4339                                                          &stup->datum1, tuplen);
4340                 stup->isnull1 = false;
4341                 stup->tuple = NULL;
4342         }
4343         else
4344         {
4345                 void       *raddr = readtup_alloc(state, tuplen);
4346
4347                 LogicalTapeReadExact(state->tapeset, tapenum,
4348                                                          raddr, tuplen);
4349                 stup->datum1 = PointerGetDatum(raddr);
4350                 stup->isnull1 = false;
4351                 stup->tuple = raddr;
4352         }
4353
4354         if (state->randomAccess)        /* need trailing length word? */
4355                 LogicalTapeReadExact(state->tapeset, tapenum,
4356                                                          &tuplen, sizeof(tuplen));
4357 }
4358
4359 /*
4360  * Parallel sort routines
4361  */
4362
4363 /*
4364  * tuplesort_estimate_shared - estimate required shared memory allocation
4365  *
4366  * nWorkers is an estimate of the number of workers (it's the number that
4367  * will be requested).
4368  */
4369 Size
4370 tuplesort_estimate_shared(int nWorkers)
4371 {
4372         Size            tapesSize;
4373
4374         Assert(nWorkers > 0);
4375
4376         /* Make sure that BufFile shared state is MAXALIGN'd */
4377         tapesSize = mul_size(sizeof(TapeShare), nWorkers);
4378         tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
4379
4380         return tapesSize;
4381 }
4382
4383 /*
4384  * tuplesort_initialize_shared - initialize shared tuplesort state
4385  *
4386  * Must be called from leader process before workers are launched, to
4387  * establish state needed up-front for worker tuplesortstates.  nWorkers
4388  * should match the argument passed to tuplesort_estimate_shared().
4389  */
4390 void
4391 tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
4392 {
4393         int                     i;
4394
4395         Assert(nWorkers > 0);
4396
4397         SpinLockInit(&shared->mutex);
4398         shared->currentWorker = 0;
4399         shared->workersFinished = 0;
4400         SharedFileSetInit(&shared->fileset, seg);
4401         shared->nTapes = nWorkers;
4402         for (i = 0; i < nWorkers; i++)
4403         {
4404                 shared->tapes[i].firstblocknumber = 0L;
4405         }
4406 }
4407
4408 /*
4409  * tuplesort_attach_shared - attach to shared tuplesort state
4410  *
4411  * Must be called by all worker processes.
4412  */
4413 void
4414 tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
4415 {
4416         /* Attach to SharedFileSet */
4417         SharedFileSetAttach(&shared->fileset, seg);
4418 }
4419
4420 /*
4421  * worker_get_identifier - Assign and return ordinal identifier for worker
4422  *
4423  * The order in which these are assigned is not well defined, and should not
4424  * matter; worker numbers across parallel sort participants need only be
4425  * distinct and gapless.  logtape.c requires this.
4426  *
4427  * Note that the identifiers assigned from here have no relation to
4428  * ParallelWorkerNumber number, to avoid making any assumption about
4429  * caller's requirements.  However, we do follow the ParallelWorkerNumber
4430  * convention of representing a non-worker with worker number -1.  This
4431  * includes the leader, as well as serial Tuplesort processes.
4432  */
4433 static int
4434 worker_get_identifier(Tuplesortstate *state)
4435 {
4436         Sharedsort *shared = state->shared;
4437         int                     worker;
4438
4439         Assert(WORKER(state));
4440
4441         SpinLockAcquire(&shared->mutex);
4442         worker = shared->currentWorker++;
4443         SpinLockRelease(&shared->mutex);
4444
4445         return worker;
4446 }
4447
4448 /*
4449  * worker_freeze_result_tape - freeze worker's result tape for leader
4450  *
4451  * This is called by workers just after the result tape has been determined,
4452  * instead of calling LogicalTapeFreeze() directly.  They do so because
4453  * workers require a few additional steps over similar serial
4454  * TSS_SORTEDONTAPE external sort cases, which also happen here.  The extra
4455  * steps are around freeing now unneeded resources, and representing to
4456  * leader that worker's input run is available for its merge.
4457  *
4458  * There should only be one final output run for each worker, which consists
4459  * of all tuples that were originally input into worker.
4460  */
4461 static void
4462 worker_freeze_result_tape(Tuplesortstate *state)
4463 {
4464         Sharedsort *shared = state->shared;
4465         TapeShare       output;
4466
4467         Assert(WORKER(state));
4468         Assert(state->result_tape != -1);
4469         Assert(state->memtupcount == 0);
4470
4471         /*
4472          * Free most remaining memory, in case caller is sensitive to our holding
4473          * on to it.  memtuples may not be a tiny merge heap at this point.
4474          */
4475         pfree(state->memtuples);
4476         /* Be tidy */
4477         state->memtuples = NULL;
4478         state->memtupsize = 0;
4479
4480         /*
4481          * Parallel worker requires result tape metadata, which is to be stored in
4482          * shared memory for leader
4483          */
4484         LogicalTapeFreeze(state->tapeset, state->result_tape, &output);
4485
4486         /* Store properties of output tape, and update finished worker count */
4487         SpinLockAcquire(&shared->mutex);
4488         shared->tapes[state->worker] = output;
4489         shared->workersFinished++;
4490         SpinLockRelease(&shared->mutex);
4491 }
4492
4493 /*
4494  * worker_nomergeruns - dump memtuples in worker, without merging
4495  *
4496  * This called as an alternative to mergeruns() with a worker when no
4497  * merging is required.
4498  */
4499 static void
4500 worker_nomergeruns(Tuplesortstate *state)
4501 {
4502         Assert(WORKER(state));
4503         Assert(state->result_tape == -1);
4504
4505         state->result_tape = state->tp_tapenum[state->destTape];
4506         worker_freeze_result_tape(state);
4507 }
4508
4509 /*
4510  * leader_takeover_tapes - create tapeset for leader from worker tapes
4511  *
4512  * So far, leader Tuplesortstate has performed no actual sorting.  By now, all
4513  * sorting has occurred in workers, all of which must have already returned
4514  * from tuplesort_performsort().
4515  *
4516  * When this returns, leader process is left in a state that is virtually
4517  * indistinguishable from it having generated runs as a serial external sort
4518  * might have.
4519  */
4520 static void
4521 leader_takeover_tapes(Tuplesortstate *state)
4522 {
4523         Sharedsort *shared = state->shared;
4524         int                     nParticipants = state->nParticipants;
4525         int                     workersFinished;
4526         int                     j;
4527
4528         Assert(LEADER(state));
4529         Assert(nParticipants >= 1);
4530
4531         SpinLockAcquire(&shared->mutex);
4532         workersFinished = shared->workersFinished;
4533         SpinLockRelease(&shared->mutex);
4534
4535         if (nParticipants != workersFinished)
4536                 elog(ERROR, "cannot take over tapes before all workers finish");
4537
4538         /*
4539          * Create the tapeset from worker tapes, including a leader-owned tape at
4540          * the end.  Parallel workers are far more expensive than logical tapes,
4541          * so the number of tapes allocated here should never be excessive.
4542          *
4543          * We still have a leader tape, though it's not possible to write to it
4544          * due to restrictions in the shared fileset infrastructure used by
4545          * logtape.c.  It will never be written to in practice because
4546          * randomAccess is disallowed for parallel sorts.
4547          */
4548         inittapestate(state, nParticipants + 1);
4549         state->tapeset = LogicalTapeSetCreate(nParticipants + 1, shared->tapes,
4550                                                                                   &shared->fileset, state->worker);
4551
4552         /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
4553         state->currentRun = nParticipants;
4554
4555         /*
4556          * Initialize variables of Algorithm D to be consistent with runs from
4557          * workers having been generated in the leader.
4558          *
4559          * There will always be exactly 1 run per worker, and exactly one input
4560          * tape per run, because workers always output exactly 1 run, even when
4561          * there were no input tuples for workers to sort.
4562          */
4563         for (j = 0; j < state->maxTapes; j++)
4564         {
4565                 /* One real run; no dummy runs for worker tapes */
4566                 state->tp_fib[j] = 1;
4567                 state->tp_runs[j] = 1;
4568                 state->tp_dummy[j] = 0;
4569                 state->tp_tapenum[j] = j;
4570         }
4571         /* Leader tape gets one dummy run, and no real runs */
4572         state->tp_fib[state->tapeRange] = 0;
4573         state->tp_runs[state->tapeRange] = 0;
4574         state->tp_dummy[state->tapeRange] = 1;
4575
4576         state->Level = 1;
4577         state->destTape = 0;
4578
4579         state->status = TSS_BUILDRUNS;
4580 }
4581
4582 /*
4583  * Convenience routine to free a tuple previously loaded into sort memory
4584  */
4585 static void
4586 free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
4587 {
4588         FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
4589         pfree(stup->tuple);
4590 }