/*-------------------------------------------------------------------------
 *
 * tuplesort.c
 *        Generalized tuple sorting routines.
 *
 * This module handles sorting of heap tuples, index tuples, or single
 * Datums (and could easily support other kinds of sortable objects,
 * if necessary).  It works efficiently for both small and large amounts
 * of data.  Small amounts are sorted in-memory using qsort().  Large
 * amounts are sorted using temporary files and a standard external sort
 * algorithm.
 *
 * See Knuth, volume 3, for more than you want to know about the external
 * sorting algorithm.  We divide the input into sorted runs using replacement
 * selection, in the form of a priority tree implemented as a heap
 * (essentially his Algorithm 5.2.3H), then merge the runs using polyphase
 * merge, Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D
 * are implemented by logtape.c, which avoids space wastage by recycling
 * disk space as soon as each block is read from its "tape".
 *
 * We do not form the initial runs using Knuth's recommended replacement
 * selection data structure (Algorithm 5.4.1R), because it uses a fixed
 * number of records in memory at all times.  Since we are dealing with
 * tuples that may vary considerably in size, we want to be able to vary
 * the number of records kept in memory to ensure full utilization of the
 * allowed sort memory space.  So, we keep the tuples in a variable-size
 * heap, with the next record to go out at the top of the heap.  Like
 * Algorithm 5.4.1R, each record is stored with the run number that it
 * must go into, and we use (run number, key) as the ordering key for the
 * heap.  When the run number at the top of the heap changes, we know that
 * no more records of the prior run are left in the heap.
 *
 * The approximate amount of memory allowed for any one sort operation
 * is specified in kilobytes by the caller (most pass work_mem).  Initially,
 * we absorb tuples and simply store them in an unsorted array as long as
 * we haven't exceeded workMem.  If we reach the end of the input without
 * exceeding workMem, we sort the array using qsort() and subsequently return
 * tuples just by scanning the tuple array sequentially.  If we do exceed
 * workMem, we construct a heap using Algorithm H and begin to emit tuples
 * into sorted runs in temporary tapes, emitting just enough tuples at each
 * step to get back within the workMem limit.  Whenever the run number at
 * the top of the heap changes, we begin a new run with a new output tape
 * (selected per Algorithm D).  After the end of the input is reached,
 * we dump out remaining tuples in memory into a final run (or two),
 * then merge the runs using Algorithm D.
 *
 * When merging runs, we use a heap containing just the frontmost tuple from
 * each source run; we repeatedly output the smallest tuple and insert the
 * next tuple from its source tape (if any).  When the heap empties, the merge
 * is complete.  The basic merge algorithm thus needs very little memory ---
 * only M tuples for an M-way merge, and M is constrained to a small number.
 * However, we can still make good use of our full workMem allocation by
 * pre-reading additional tuples from each source tape.  Without prereading,
 * our access pattern to the temporary file would be very erratic; on average
 * we'd read one block from each of M source tapes during the same time that
 * we're writing M blocks to the output tape, so there is no sequentiality of
 * access at all, defeating the read-ahead methods used by most Unix kernels.
 * Worse, the output tape gets written into a very random sequence of blocks
 * of the temp file, ensuring that things will be even worse when it comes
 * time to read that tape.  A straightforward merge pass thus ends up doing a
 * lot of waiting for disk seeks.  We can improve matters by prereading from
 * each source tape sequentially, loading about workMem/M bytes from each tape
 * in turn.  Then we run the merge algorithm, writing but not reading until
 * one of the preloaded tuple series runs out.  Then we switch back to preread
 * mode, fill memory again, and repeat.  This approach helps to localize both
 * read and write accesses.
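 *
 * To put rough numbers on this (an illustration, not a guarantee): with a
 * 16MB workMem and M = 16 source tapes, each preread cycle pulls about 1MB
 * sequentially from a single tape before moving to the next, instead of
 * interleaving single-block reads across all sixteen tapes at once.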
 *
 * When the caller requests random access to the sort result, we form
 * the final sorted run on a logical tape which is then "frozen", so
 * that we can access it randomly.  When the caller does not need random
 * access, we return from tuplesort_performsort() as soon as we are down
 * to one run per logical tape.  The final merge is then performed
 * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
 * saves one cycle of writing all the data out to disk and reading it in.
 *
 * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
 * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
 * to Knuth's figure 70 (section 5.4.2).  However, Knuth is assuming that
 * tape drives are expensive beasts, and in particular that there will always
 * be many more runs than tape drives.  In our implementation a "tape drive"
 * doesn't cost much more than a few Kb of memory buffers, so we can afford
 * to have lots of them.  In particular, if we can have as many tape drives
 * as sorted runs, we can eliminate any repeated I/O at all.  In the current
 * code we determine the number of tapes M on the basis of workMem: we want
 * workMem/M to be large enough that we read a fair amount of data each time
 * we preread from a tape, so as to maintain the locality of access described
 * above.  Nonetheless, with large workMem we can have many tapes.
 *
 *
 * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *        src/backend/utils/sort/tuplesort.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include <limits.h>

#include "access/nbtree.h"
#include "catalog/index.h"
#include "commands/tablespace.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "utils/datum.h"
#include "utils/logtape.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_rusage.h"
#include "utils/rel.h"
#include "utils/sortsupport.h"
#include "utils/tuplesort.h"


/* sort-type codes for sort__start probes */
#define HEAP_SORT               0
#define INDEX_SORT              1
#define DATUM_SORT              2
#define CLUSTER_SORT    3

/* GUC variables */
#ifdef TRACE_SORT
bool            trace_sort = false;
#endif

#ifdef DEBUG_BOUNDED_SORT
bool            optimize_bounded_sort = true;
#endif


/*
 * The objects we actually sort are SortTuple structs.  These contain
 * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
 * which is a separate palloc chunk --- we assume it is just one chunk and
 * can be freed by a simple pfree().  SortTuples also contain the tuple's
 * first key column in Datum/nullflag format, and an index integer.
 *
 * Storing the first key column lets us save heap_getattr or index_getattr
 * calls during tuple comparisons.  We could extract and save all the key
 * columns not just the first, but this would increase code complexity and
 * overhead, and wouldn't actually save any comparison cycles in the common
 * case where the first key determines the comparison result.  Note that
 * for a pass-by-reference datatype, datum1 points into the "tuple" storage.
 *
 * When sorting single Datums, the data value is represented directly by
 * datum1/isnull1.  If the datatype is pass-by-reference and isnull1 is false,
 * then datum1 points to a separately palloc'd data value that is also pointed
 * to by the "tuple" pointer; otherwise "tuple" is NULL.
 *
 * While building initial runs, tupindex holds the tuple's run number.  During
 * merge passes, we re-use it to hold the input tape number that each tuple in
 * the heap was read from, or to hold the index of the next tuple pre-read
 * from the same tape in the case of pre-read entries.  tupindex goes unused
 * if the sort occurs entirely in memory.
 */
typedef struct
{
        void       *tuple;                      /* the tuple proper */
        Datum           datum1;                 /* value of first key column */
        bool            isnull1;                /* is first key column NULL? */
        int                     tupindex;               /* see notes above */
} SortTuple;


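/*
 * A concrete illustration (hedged, values hypothetical): when sorting heap
 * tuples on a leading text column, "tuple" points to the palloc'd
 * MinimalTuple, datum1 points into that same storage at the first column's
 * value, isnull1 is false, and tupindex holds the tuple's run number while
 * initial runs are being built.
 */
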
/*
 * Possible states of a Tuplesort object.  These denote the states that
 * persist between calls of Tuplesort routines.
 */
typedef enum
{
        TSS_INITIAL,                            /* Loading tuples; still within memory limit */
        TSS_BOUNDED,                            /* Loading tuples into bounded-size heap */
        TSS_BUILDRUNS,                          /* Loading tuples; writing to tape */
        TSS_SORTEDINMEM,                        /* Sort completed entirely in memory */
        TSS_SORTEDONTAPE,                       /* Sort completed, final run is on tape */
        TSS_FINALMERGE                          /* Performing final merge on-the-fly */
} TupSortStatus;

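/*
 * A sketch of the usual state transitions, as implemented by
 * puttuple_common() and tuplesort_performsort() below:
 *
 *      TSS_INITIAL -> TSS_SORTEDINMEM                   (everything fit in memory)
 *      TSS_INITIAL -> TSS_BOUNDED -> TSS_SORTEDINMEM    (bounded heapsort)
 *      TSS_INITIAL -> TSS_BUILDRUNS -> TSS_SORTEDONTAPE (external, randomAccess)
 *      TSS_INITIAL -> TSS_BUILDRUNS -> TSS_FINALMERGE   (external, on-the-fly merge)
 */
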
/*
 * Parameters for calculation of number of tapes to use --- see inittapes()
 * and tuplesort_merge_order().
 *
 * In this calculation we assume that each tape will cost us about 3 blocks
 * worth of buffer space (which is an underestimate for very large data
 * volumes, but it's probably close enough --- see logtape.c).
 *
 * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
 * tape during a preread cycle (see discussion at top of file).
 */
#define MINORDER                6               /* minimum merge order */
#define TAPE_BUFFER_OVERHEAD            (BLCKSZ * 3)
#define MERGE_BUFFER_SIZE                       (BLCKSZ * 32)

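/*
 * Illustrative arithmetic (assuming the default BLCKSZ of 8192): each tape
 * then costs 24KB of buffer overhead and wants a 256KB preread buffer, so a
 * 16MB workMem yields a merge order of roughly (16384KB - 24KB) / 280KB = 58
 * input tapes, and never less than MINORDER = 6.
 */
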
/*
 * Private state of a Tuplesort operation.
 */
struct Tuplesortstate
{
        TupSortStatus status;           /* enumerated value as shown above */
        int                     nKeys;                  /* number of columns in sort key */
        bool            randomAccess;   /* did caller request random access? */
        bool            bounded;                /* did caller specify a maximum number of
                                                                 * tuples to return? */
        bool            boundUsed;              /* true if we made use of a bounded heap */
        int                     bound;                  /* if bounded, the maximum number of tuples */
        long            availMem;               /* remaining memory available, in bytes */
        long            allowedMem;             /* total memory allowed, in bytes */
        int                     maxTapes;               /* number of tapes (Knuth's T) */
        int                     tapeRange;              /* maxTapes-1 (Knuth's P) */
        MemoryContext sortcontext;      /* memory context holding all sort data */
        LogicalTapeSet *tapeset;        /* logtape.c object for tapes in a temp file */

        /*
         * These function pointers decouple the routines that must know what kind
         * of tuple we are sorting from the routines that don't need to know it.
         * They are set up by the tuplesort_begin_xxx routines.
         *
         * Function to compare two tuples; result is per qsort() convention, ie:
         * <0, 0, >0 according as a<b, a=b, a>b.  The API must match
         * qsort_arg_comparator.
         */
        int                     (*comparetup) (const SortTuple *a, const SortTuple *b,
                                                                                   Tuplesortstate *state);

        /*
         * Function to copy a supplied input tuple into palloc'd space and set up
         * its SortTuple representation (ie, set tuple/datum1/isnull1).  Also,
         * state->availMem must be decreased by the amount of space used for the
         * tuple copy (note the SortTuple struct itself is not counted).
         */
        void            (*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);

        /*
         * Function to write a stored tuple onto tape.  The representation of the
         * tuple on tape need not be the same as it is in memory; requirements on
         * the tape representation are given below.  After writing the tuple,
         * pfree() the out-of-line data (not the SortTuple struct!), and increase
         * state->availMem by the amount of memory space thereby released.
         */
        void            (*writetup) (Tuplesortstate *state, int tapenum,
                                                                                 SortTuple *stup);

        /*
         * Function to read a stored tuple from tape back into memory.  'len' is
         * the already-read length of the stored tuple.  Create a palloc'd copy,
         * initialize tuple/datum1/isnull1 in the target SortTuple struct, and
         * decrease state->availMem by the amount of memory space consumed.
         */
        void            (*readtup) (Tuplesortstate *state, SortTuple *stup,
                                                                                int tapenum, unsigned int len);

        /*
         * Function to reverse the sort direction from its current state.  (We
         * could dispense with this if we wanted to enforce that all variants
         * represent the sort key information alike.)
         */
        void            (*reversedirection) (Tuplesortstate *state);

        /*
         * This array holds the tuples now in sort memory.  If we are in state
         * INITIAL, the tuples are in no particular order; if we are in state
         * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
         * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
         * H.  (Note that memtupcount only counts the tuples that are part of the
         * heap --- during merge passes, memtuples[] entries beyond tapeRange are
         * never in the heap and are used to hold pre-read tuples.)  In state
         * SORTEDONTAPE, the array is not used.
         */
        SortTuple  *memtuples;          /* array of SortTuple structs */
        int                     memtupcount;    /* number of tuples currently present */
        int                     memtupsize;             /* allocated length of memtuples array */

        /*
         * While building initial runs, this is the current output run number
         * (starting at 0).  Afterwards, it is the number of initial runs we made.
         */
        int                     currentRun;

        /*
         * Unless otherwise noted, all pointer variables below are pointers to
         * arrays of length maxTapes, holding per-tape data.
         */

        /*
         * These variables are only used during merge passes.  mergeactive[i] is
         * true if we are reading an input run from (actual) tape number i and
         * have not yet exhausted that run.  mergenext[i] is the memtuples index
         * of the next pre-read tuple (next to be loaded into the heap) for tape
         * i, or 0 if we are out of pre-read tuples.  mergelast[i] similarly
         * points to the last pre-read tuple from each tape.  mergeavailslots[i]
         * is the number of unused memtuples[] slots reserved for tape i, and
         * mergeavailmem[i] is the amount of unused space allocated for tape i.
         * mergefreelist and mergefirstfree keep track of unused locations in the
         * memtuples[] array.  The memtuples[].tupindex fields link together
         * pre-read tuples for each tape as well as recycled locations in
         * mergefreelist.  It is OK to use 0 as a null link in these lists, because
         * memtuples[0] is part of the merge heap and is never a pre-read tuple.
         */
        bool       *mergeactive;        /* active input run source? */
        int                *mergenext;          /* first preread tuple for each source */
        int                *mergelast;          /* last preread tuple for each source */
        int                *mergeavailslots;    /* slots left for prereading each tape */
        long       *mergeavailmem;      /* availMem for prereading each tape */
        int                     mergefreelist;  /* head of freelist of recycled slots */
        int                     mergefirstfree; /* first slot never used in this merge */

        /*
         * Variables for Algorithm D.  Note that destTape is a "logical" tape
         * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
         * "logical" and "actual" tape numbers straight!
         */
        int                     Level;                  /* Knuth's l */
        int                     destTape;               /* current output tape (Knuth's j, less 1) */
        int                *tp_fib;                     /* Target Fibonacci run counts (A[]) */
        int                *tp_runs;            /* # of real runs on each tape */
        int                *tp_dummy;           /* # of dummy runs for each tape (D[]) */
        int                *tp_tapenum;         /* Actual tape numbers (TAPE[]) */
        int                     activeTapes;    /* # of active input tapes in merge pass */

        /*
         * These variables are used after completion of sorting to keep track of
         * the next tuple to return.  (In the tape case, the tape's current read
         * position is also critical state.)
         */
        int                     result_tape;    /* actual tape number of finished output */
        int                     current;                /* array index (only used if SORTEDINMEM) */
        bool            eof_reached;    /* reached EOF (needed for cursors) */

        /* markpos_xxx holds marked position for mark and restore */
        long            markpos_block;  /* tape block# (only used if SORTEDONTAPE) */
        int                     markpos_offset; /* saved "current", or offset in tape block */
        bool            markpos_eof;    /* saved "eof_reached" */

        /*
         * These variables are specific to the MinimalTuple case; they are set by
         * tuplesort_begin_heap and used only by the MinimalTuple routines.
         */
        TupleDesc       tupDesc;
        SortSupport     sortKeys;               /* array of length nKeys */

        /*
         * These variables are specific to the CLUSTER case; they are set by
         * tuplesort_begin_cluster.  Note CLUSTER also uses tupDesc and
         * indexScanKey.
         */
        IndexInfo  *indexInfo;          /* info about index being used for reference */
        EState     *estate;                     /* for evaluating index expressions */

        /*
         * These variables are specific to the IndexTuple case; they are set by
         * tuplesort_begin_index_xxx and used only by the IndexTuple routines.
         */
        Relation        indexRel;               /* index being built */

        /* These are specific to the index_btree subcase: */
        ScanKey         indexScanKey;
        bool            enforceUnique;  /* complain if we find duplicate tuples */

        /* These are specific to the index_hash subcase: */
        uint32          hash_mask;              /* mask for sortable part of hash code */

        /*
         * These variables are specific to the Datum case; they are set by
         * tuplesort_begin_datum and used only by the DatumTuple routines.
         */
        Oid                     datumType;
        SortSupport     datumKey;
        /* we need typelen and byval in order to know how to copy the Datums. */
        int                     datumTypeLen;
        bool            datumTypeByVal;

        /*
         * Resource snapshot for time of sort start.
         */
#ifdef TRACE_SORT
        PGRUsage        ru_start;
#endif
};

#define COMPARETUP(state,a,b)   ((*(state)->comparetup) (a, b, state))
#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
#define WRITETUP(state,tape,stup)       ((*(state)->writetup) (state, tape, stup))
#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
#define REVERSEDIRECTION(state) ((*(state)->reversedirection) (state))
#define LACKMEM(state)          ((state)->availMem < 0)
#define USEMEM(state,amt)       ((state)->availMem -= (amt))
#define FREEMEM(state,amt)      ((state)->availMem += (amt))

/*
 * NOTES about on-tape representation of tuples:
 *
 * We require the first "unsigned int" of a stored tuple to be the total size
 * on-tape of the tuple, including itself (so it is never zero; an all-zero
 * unsigned int is used to delimit runs).  The remainder of the stored tuple
 * may or may not match the in-memory representation of the tuple ---
 * any conversion needed is the job of the writetup and readtup routines.
 *
 * If state->randomAccess is true, then the stored representation of the
 * tuple must be followed by another "unsigned int" that is a copy of the
 * length --- so the total tape space used is actually sizeof(unsigned int)
 * more than the stored length value.  This allows read-backwards.  When
 * randomAccess is not true, the write/read routines may omit the extra
 * length word.
 *
 * writetup is expected to write both length words as well as the tuple
 * data.  When readtup is called, the tape is positioned just after the
 * front length word; readtup must read the tuple data and advance past
 * the back length word (if present).
 *
 * The write/read routines can make use of the tuple description data
 * stored in the Tuplesortstate record, if needed.  They are also expected
 * to adjust state->availMem by the amount of memory space (not tape space!)
 * released or consumed.  There is no error return from either writetup
 * or readtup; they should ereport() on failure.
 *
 *
 * NOTES about memory consumption calculations:
 *
 * We count space allocated for tuples against the workMem limit, plus
 * the space used by the variable-size memtuples array.  Fixed-size space
 * is not counted; it's small enough to not be interesting.
 *
 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
 * rather than the originally-requested size.  This is important since
 * palloc can add substantial overhead.  It's not a complete answer since
 * we won't count any wasted space in palloc allocation blocks, but it's
 * a lot better than what we were doing before 7.3.
 */
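
/*
 * For instance (a sketch, not a required layout): writetup for a tuple whose
 * on-tape body is 20 bytes writes the unsigned int 24 (20 bytes of data plus
 * the 4-byte length word itself), then the 20 data bytes, and, if
 * randomAccess, a trailing copy of 24 so the tuple can be read backwards.
 */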

/* When using this macro, beware of double evaluation of len */
#define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
        do { \
                if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \
                        elog(ERROR, "unexpected end of data"); \
        } while(0)

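/*
 * Example of the hazard: passing "nbytes++" as len would evaluate the
 * expression twice, once in the LogicalTapeRead() call and again in the
 * comparison against its result.
 */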

static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess);
static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
static void inittapes(Tuplesortstate *state);
static void selectnewtape(Tuplesortstate *state);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
static void mergepreread(Tuplesortstate *state);
static void mergeprereadone(Tuplesortstate *state, int srcTape);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void make_bounded_heap(Tuplesortstate *state);
static void sort_bounded_heap(Tuplesortstate *state);
static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
                                          int tupleindex, bool checkIndex);
static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
static void markrunend(Tuplesortstate *state, int tapenum);
static int comparetup_heap(const SortTuple *a, const SortTuple *b,
                                Tuplesortstate *state);
static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_heap(Tuplesortstate *state, int tapenum,
                          SortTuple *stup);
static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
                         int tapenum, unsigned int len);
static void reversedirection_heap(Tuplesortstate *state);
static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
                                   Tuplesortstate *state);
static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_cluster(Tuplesortstate *state, int tapenum,
                                 SortTuple *stup);
static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
                                int tapenum, unsigned int len);
static int comparetup_index_btree(const SortTuple *a, const SortTuple *b,
                                           Tuplesortstate *state);
static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
                                          Tuplesortstate *state);
static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_index(Tuplesortstate *state, int tapenum,
                           SortTuple *stup);
static void readtup_index(Tuplesortstate *state, SortTuple *stup,
                          int tapenum, unsigned int len);
static void reversedirection_index_btree(Tuplesortstate *state);
static void reversedirection_index_hash(Tuplesortstate *state);
static int comparetup_datum(const SortTuple *a, const SortTuple *b,
                                 Tuplesortstate *state);
static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_datum(Tuplesortstate *state, int tapenum,
                           SortTuple *stup);
static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
                          int tapenum, unsigned int len);
static void reversedirection_datum(Tuplesortstate *state);
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);


/*
 *              tuplesort_begin_xxx
 *
 * Initialize for a tuple sort operation.
 *
 * After calling tuplesort_begin, the caller should call tuplesort_putXXX
 * zero or more times, then call tuplesort_performsort when all the tuples
 * have been supplied.  After performsort, retrieve the tuples in sorted
 * order by calling tuplesort_getXXX until it returns false/NULL.  (If random
 * access was requested, rescan, markpos, and restorepos can also be called.)
 * Call tuplesort_end to terminate the operation and release memory/disk space.
 *
 * Each variant of tuplesort_begin has a workMem parameter specifying the
 * maximum number of kilobytes of RAM to use before spilling data to disk.
 * (The normal value of this parameter is work_mem, but some callers use
 * other values.)  Each variant also has a randomAccess parameter specifying
 * whether the caller needs non-sequential access to the sort result.
 */

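#ifdef TUPLESORT_USAGE_EXAMPLE
/*
 * A hedged sketch of the call sequence described above.  The guard symbol
 * TUPLESORT_USAGE_EXAMPLE is hypothetical and never defined, so this is
 * illustration only, not part of the build; the caller is assumed to supply
 * a slot already loaded with each input tuple.
 */
static void
example_sort_one_column(TupleDesc tupDesc, Oid sortOperator, Oid collation,
                                                TupleTableSlot *slot)
{
        AttrNumber      attNums[1] = {1};       /* sort on the first column */
        Oid                     sortOperators[1];
        Oid                     sortCollations[1];
        bool            nullsFirstFlags[1] = {false};
        Tuplesortstate *sortstate;

        sortOperators[0] = sortOperator;
        sortCollations[0] = collation;

        sortstate = tuplesort_begin_heap(tupDesc, 1, attNums,
                                                                         sortOperators, sortCollations,
                                                                         nullsFirstFlags,
                                                                         work_mem, false);

        /* in a real caller this would loop over all input tuples */
        tuplesort_puttupleslot(sortstate, slot);

        tuplesort_performsort(sortstate);

        /* fetch the results in sorted order until exhausted */
        while (tuplesort_gettupleslot(sortstate, true, slot))
        {
                /* ... use the tuple now stored in "slot" ... */
        }

        tuplesort_end(sortstate);
}
#endif   /* TUPLESORT_USAGE_EXAMPLE */
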
static Tuplesortstate *
tuplesort_begin_common(int workMem, bool randomAccess)
{
        Tuplesortstate *state;
        MemoryContext sortcontext;
        MemoryContext oldcontext;

        /*
         * Create a working memory context for this sort operation.  All data
         * needed by the sort will live inside this context.
         */
        sortcontext = AllocSetContextCreate(CurrentMemoryContext,
                                                                                "TupleSort",
                                                                                ALLOCSET_DEFAULT_MINSIZE,
                                                                                ALLOCSET_DEFAULT_INITSIZE,
                                                                                ALLOCSET_DEFAULT_MAXSIZE);

        /*
         * Make the Tuplesortstate within the per-sort context.  This way, we
         * don't need a separate pfree() operation for it at shutdown.
         */
        oldcontext = MemoryContextSwitchTo(sortcontext);

        state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));

#ifdef TRACE_SORT
        if (trace_sort)
                pg_rusage_init(&state->ru_start);
#endif

        state->status = TSS_INITIAL;
        state->randomAccess = randomAccess;
        state->bounded = false;
        state->boundUsed = false;
        state->allowedMem = workMem * 1024L;
        state->availMem = state->allowedMem;
        state->sortcontext = sortcontext;
        state->tapeset = NULL;

        state->memtupcount = 0;
        state->memtupsize = 1024;       /* initial guess */
        state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));

        USEMEM(state, GetMemoryChunkSpace(state->memtuples));

        /* workMem must be large enough for the minimal memtuples array */
        if (LACKMEM(state))
                elog(ERROR, "insufficient memory allowed for sort");

        state->currentRun = 0;

        /*
         * maxTapes, tapeRange, and Algorithm D variables will be initialized by
         * inittapes(), if needed
         */

        state->result_tape = -1;        /* flag that result tape has not been formed */

        MemoryContextSwitchTo(oldcontext);

        return state;
}

Tuplesortstate *
tuplesort_begin_heap(TupleDesc tupDesc,
                                         int nkeys, AttrNumber *attNums,
                                         Oid *sortOperators, Oid *sortCollations,
                                         bool *nullsFirstFlags,
                                         int workMem, bool randomAccess)
{
        Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
        MemoryContext oldcontext;
        int                     i;

        oldcontext = MemoryContextSwitchTo(state->sortcontext);

        AssertArg(nkeys > 0);

#ifdef TRACE_SORT
        if (trace_sort)
                elog(LOG,
                         "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
                         nkeys, workMem, randomAccess ? 't' : 'f');
#endif

        state->nKeys = nkeys;

        TRACE_POSTGRESQL_SORT_START(HEAP_SORT,
                                                                false,  /* no unique check */
                                                                nkeys,
                                                                workMem,
                                                                randomAccess);

        state->comparetup = comparetup_heap;
        state->copytup = copytup_heap;
        state->writetup = writetup_heap;
        state->readtup = readtup_heap;
        state->reversedirection = reversedirection_heap;

        state->tupDesc = tupDesc;       /* assume we need not copy tupDesc */

        /* Prepare SortSupport data for each column */
        state->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData));

        for (i = 0; i < nkeys; i++)
        {
                SortSupport     sortKey = state->sortKeys + i;

                AssertArg(attNums[i] != 0);
                AssertArg(sortOperators[i] != 0);

                sortKey->ssup_cxt = CurrentMemoryContext;
                sortKey->ssup_collation = sortCollations[i];
                sortKey->ssup_nulls_first = nullsFirstFlags[i];
                sortKey->ssup_attno = attNums[i];

                PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
        }

        MemoryContextSwitchTo(oldcontext);

        return state;
}

Tuplesortstate *
tuplesort_begin_cluster(TupleDesc tupDesc,
                                                Relation indexRel,
                                                int workMem, bool randomAccess)
{
        Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
        MemoryContext oldcontext;

        Assert(indexRel->rd_rel->relam == BTREE_AM_OID);

        oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
        if (trace_sort)
                elog(LOG,
                         "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
                         RelationGetNumberOfAttributes(indexRel),
                         workMem, randomAccess ? 't' : 'f');
#endif

        state->nKeys = RelationGetNumberOfAttributes(indexRel);

        TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT,
                                                                false,  /* no unique check */
                                                                state->nKeys,
                                                                workMem,
                                                                randomAccess);

        state->comparetup = comparetup_cluster;
        state->copytup = copytup_cluster;
        state->writetup = writetup_cluster;
        state->readtup = readtup_cluster;
        state->reversedirection = reversedirection_index_btree;

        state->indexInfo = BuildIndexInfo(indexRel);
        state->indexScanKey = _bt_mkscankey_nodata(indexRel);

        state->tupDesc = tupDesc;       /* assume we need not copy tupDesc */

        if (state->indexInfo->ii_Expressions != NULL)
        {
                TupleTableSlot *slot;
                ExprContext *econtext;

                /*
                 * We will need to use FormIndexDatum to evaluate the index
                 * expressions.  To do that, we need an EState, as well as a
                 * TupleTableSlot to put the table tuples into.  The econtext's
                 * scantuple has to point to that slot, too.
                 */
                state->estate = CreateExecutorState();
                slot = MakeSingleTupleTableSlot(tupDesc);
                econtext = GetPerTupleExprContext(state->estate);
                econtext->ecxt_scantuple = slot;
        }

        MemoryContextSwitchTo(oldcontext);

        return state;
}

Tuplesortstate *
tuplesort_begin_index_btree(Relation indexRel,
                                                        bool enforceUnique,
                                                        int workMem, bool randomAccess)
{
        Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
        MemoryContext oldcontext;

        oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
        if (trace_sort)
                elog(LOG,
                         "begin index sort: unique = %c, workMem = %d, randomAccess = %c",
                         enforceUnique ? 't' : 'f',
                         workMem, randomAccess ? 't' : 'f');
#endif

        state->nKeys = RelationGetNumberOfAttributes(indexRel);

        TRACE_POSTGRESQL_SORT_START(INDEX_SORT,
                                                                enforceUnique,
                                                                state->nKeys,
                                                                workMem,
                                                                randomAccess);

        state->comparetup = comparetup_index_btree;
        state->copytup = copytup_index;
        state->writetup = writetup_index;
        state->readtup = readtup_index;
        state->reversedirection = reversedirection_index_btree;

        state->indexRel = indexRel;
        state->indexScanKey = _bt_mkscankey_nodata(indexRel);
        state->enforceUnique = enforceUnique;

        MemoryContextSwitchTo(oldcontext);

        return state;
}

Tuplesortstate *
tuplesort_begin_index_hash(Relation indexRel,
                                                   uint32 hash_mask,
                                                   int workMem, bool randomAccess)
{
        Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
        MemoryContext oldcontext;

        oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
        if (trace_sort)
                elog(LOG,
                "begin index sort: hash_mask = 0x%x, workMem = %d, randomAccess = %c",
                         hash_mask,
                         workMem, randomAccess ? 't' : 'f');
#endif

        state->nKeys = 1;                       /* Only one sort column, the hash code */

        state->comparetup = comparetup_index_hash;
        state->copytup = copytup_index;
        state->writetup = writetup_index;
        state->readtup = readtup_index;
        state->reversedirection = reversedirection_index_hash;

        state->indexRel = indexRel;

        state->hash_mask = hash_mask;

        MemoryContextSwitchTo(oldcontext);

        return state;
}

Tuplesortstate *
tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
                                          bool nullsFirstFlag,
                                          int workMem, bool randomAccess)
{
        Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
        MemoryContext oldcontext;
        int16           typlen;
        bool            typbyval;

        oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
        if (trace_sort)
                elog(LOG,
                         "begin datum sort: workMem = %d, randomAccess = %c",
                         workMem, randomAccess ? 't' : 'f');
#endif

        state->nKeys = 1;                       /* always a one-column sort */

        TRACE_POSTGRESQL_SORT_START(DATUM_SORT,
                                                                false,  /* no unique check */
                                                                1,
                                                                workMem,
                                                                randomAccess);

        state->comparetup = comparetup_datum;
        state->copytup = copytup_datum;
        state->writetup = writetup_datum;
        state->readtup = readtup_datum;
        state->reversedirection = reversedirection_datum;

        state->datumType = datumType;

        /* Prepare SortSupport data */
        state->datumKey = (SortSupport) palloc0(sizeof(SortSupportData));

        state->datumKey->ssup_cxt = CurrentMemoryContext;
        state->datumKey->ssup_collation = sortCollation;
        state->datumKey->ssup_nulls_first = nullsFirstFlag;

        PrepareSortSupportFromOrderingOp(sortOperator, state->datumKey);

        /* lookup necessary attributes of the datum type */
        get_typlenbyval(datumType, &typlen, &typbyval);
        state->datumTypeLen = typlen;
        state->datumTypeByVal = typbyval;

        MemoryContextSwitchTo(oldcontext);

        return state;
}

/*
 * tuplesort_set_bound
 *
 *      Advise tuplesort that at most the first N result tuples are required.
 *
 * Must be called before inserting any tuples.  (Actually, we could allow it
 * as long as the sort hasn't spilled to disk, but there seems no need for
 * delayed calls at the moment.)
 *
 * This is a hint only.  The tuplesort may still return more tuples than
 * requested.
 */
void
tuplesort_set_bound(Tuplesortstate *state, int64 bound)
{
        /* Assert we're called before loading any tuples */
        Assert(state->status == TSS_INITIAL);
        Assert(state->memtupcount == 0);
        Assert(!state->bounded);

#ifdef DEBUG_BOUNDED_SORT
        /* Honor GUC setting that disables the feature (for easy testing) */
        if (!optimize_bounded_sort)
                return;
#endif

        /* We want to be able to compute bound * 2, so limit the setting */
        if (bound > (int64) (INT_MAX / 2))
                return;

        state->bounded = true;
        state->bound = (int) bound;
}

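/*
 * For example, a caller evaluating ORDER BY ... LIMIT 100 can call
 * tuplesort_set_bound(state, 100) immediately after tuplesort_begin_heap();
 * if many more than 100 tuples arrive, only the first 100 in sort order are
 * retained in the bounded heap.
 */
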
/*
 * tuplesort_end
 *
 *      Release resources and clean up.
 *
 * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
 * pointing to garbage.  Be careful not to attempt to use or free such
 * pointers afterwards!
 */
void
tuplesort_end(Tuplesortstate *state)
{
        /* context swap probably not needed, but let's be safe */
        MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
        long            spaceUsed;

        if (state->tapeset)
                spaceUsed = LogicalTapeSetBlocks(state->tapeset);
        else
                spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
#endif

        /*
         * Delete temporary "tape" files, if any.
         *
         * Note: want to include this in reported total cost of sort, hence need
         * for two #ifdef TRACE_SORT sections.
         */
        if (state->tapeset)
                LogicalTapeSetClose(state->tapeset);

#ifdef TRACE_SORT
        if (trace_sort)
        {
                if (state->tapeset)
                        elog(LOG, "external sort ended, %ld disk blocks used: %s",
                                 spaceUsed, pg_rusage_show(&state->ru_start));
                else
                        elog(LOG, "internal sort ended, %ld KB used: %s",
                                 spaceUsed, pg_rusage_show(&state->ru_start));
        }

        TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
#else

        /*
         * If you disabled TRACE_SORT, you can still probe sort__done, but you
         * ain't getting space-used stats.
         */
        TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L);
#endif

        /* Free any execution state created for CLUSTER case */
        if (state->estate != NULL)
        {
                ExprContext *econtext = GetPerTupleExprContext(state->estate);

                ExecDropSingleTupleTableSlot(econtext->ecxt_scantuple);
                FreeExecutorState(state->estate);
        }

        MemoryContextSwitchTo(oldcontext);

        /*
         * Free the per-sort memory context, thereby releasing all working memory,
         * including the Tuplesortstate struct itself.
         */
        MemoryContextDelete(state->sortcontext);
}

/*
 * Grow the memtuples[] array, if possible within our memory constraint.
 * Return TRUE if able to enlarge the array, FALSE if not.
 *
 * At each increment we double the size of the array.  When we are short
 * on memory we could consider smaller increases, but because availMem
 * moves around with tuple addition/removal, this might result in thrashing.
 * Small increases in the array size are likely to be pretty inefficient.
 */
static bool
grow_memtuples(Tuplesortstate *state)
{
        /*
         * We need to be sure that we do not cause LACKMEM to become true, else
         * the space management algorithm will go nuts.  We assume here that the
         * memory chunk overhead associated with the memtuples array is constant
         * and so there will be no unexpected addition to what we ask for.  (The
         * minimum array size established in tuplesort_begin_common is large
         * enough to force palloc to treat it as a separate chunk, so this
         * assumption should be good.  But let's check it.)
         */
        if (state->availMem <= (long) (state->memtupsize * sizeof(SortTuple)))
                return false;

        /*
         * On a 64-bit machine, allowedMem could be high enough to get us into
         * trouble with MaxAllocSize, too.
         */
        if ((Size) (state->memtupsize * 2) >= MaxAllocSize / sizeof(SortTuple))
                return false;

        FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
        state->memtupsize *= 2;
        state->memtuples = (SortTuple *)
                repalloc(state->memtuples,
                                 state->memtupsize * sizeof(SortTuple));
        USEMEM(state, GetMemoryChunkSpace(state->memtuples));
        if (LACKMEM(state))
                elog(ERROR, "unexpected out-of-memory situation during sort");
        return true;
}

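/*
 * Worked example (sizes hypothetical): on a typical 64-bit build where
 * sizeof(SortTuple) is 24 bytes, the initial 1024-slot array costs about
 * 24KB; each successful grow_memtuples() call doubles that, and the first
 * test above refuses a doubling once availMem can no longer absorb another
 * copy of the current array.
 */
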
978 /*
979  * Accept one tuple while collecting input data for sort.
980  *
981  * Note that the input data is always copied; the caller need not save it.
982  */
983 void
984 tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot)
985 {
986         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
987         SortTuple       stup;
988
989         /*
990          * Copy the given tuple into memory we control, and decrease availMem.
991          * Then call the common code.
992          */
993         COPYTUP(state, &stup, (void *) slot);
994
995         puttuple_common(state, &stup);
996
997         MemoryContextSwitchTo(oldcontext);
998 }
999
1000 /*
1001  * Accept one tuple while collecting input data for sort.
1002  *
1003  * Note that the input data is always copied; the caller need not save it.
1004  */
1005 void
1006 tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup)
1007 {
1008         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1009         SortTuple       stup;
1010
1011         /*
1012          * Copy the given tuple into memory we control, and decrease availMem.
1013          * Then call the common code.
1014          */
1015         COPYTUP(state, &stup, (void *) tup);
1016
1017         puttuple_common(state, &stup);
1018
1019         MemoryContextSwitchTo(oldcontext);
1020 }
1021
1022 /*
1023  * Accept one index tuple while collecting input data for sort.
1024  *
1025  * Note that the input tuple is always copied; the caller need not save it.
1026  */
1027 void
1028 tuplesort_putindextuple(Tuplesortstate *state, IndexTuple tuple)
1029 {
1030         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1031         SortTuple       stup;
1032
1033         /*
1034          * Copy the given tuple into memory we control, and decrease availMem.
1035          * Then call the common code.
1036          */
1037         COPYTUP(state, &stup, (void *) tuple);
1038
1039         puttuple_common(state, &stup);
1040
1041         MemoryContextSwitchTo(oldcontext);
1042 }
1043
1044 /*
1045  * Accept one Datum while collecting input data for sort.
1046  *
1047  * If the Datum is pass-by-ref type, the value will be copied.
1048  */
1049 void
1050 tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
1051 {
1052         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1053         SortTuple       stup;
1054
1055         /*
1056          * If it's a pass-by-reference value, copy it into memory we control, and
1057          * decrease availMem.  Then call the common code.
1058          */
1059         if (isNull || state->datumTypeByVal)
1060         {
1061                 stup.datum1 = val;
1062                 stup.isnull1 = isNull;
1063                 stup.tuple = NULL;              /* no separate storage */
1064         }
1065         else
1066         {
1067                 stup.datum1 = datumCopy(val, false, state->datumTypeLen);
1068                 stup.isnull1 = false;
1069                 stup.tuple = DatumGetPointer(stup.datum1);
1070                 USEMEM(state, GetMemoryChunkSpace(stup.tuple));
1071         }
1072
1073         puttuple_common(state, &stup);
1074
1075         MemoryContextSwitchTo(oldcontext);
1076 }
1077
1078 /*
1079  * Shared code for tuple and datum cases.
1080  */
1081 static void
1082 puttuple_common(Tuplesortstate *state, SortTuple *tuple)
1083 {
1084         switch (state->status)
1085         {
1086                 case TSS_INITIAL:
1087
1088                         /*
1089                          * Save the tuple into the unsorted array.      First, grow the array
1090                          * as needed.  Note that we try to grow the array when there is
1091                          * still one free slot remaining --- if we fail, there'll still be
1092                          * room to store the incoming tuple, and then we'll switch to
1093                          * tape-based operation.
1094                          */
1095                         if (state->memtupcount >= state->memtupsize - 1)
1096                         {
1097                                 (void) grow_memtuples(state);
1098                                 Assert(state->memtupcount < state->memtupsize);
1099                         }
1100                         state->memtuples[state->memtupcount++] = *tuple;
1101
1102                         /*
1103                          * Check if it's time to switch over to a bounded heapsort. We do
1104                          * so if the input tuple count exceeds twice the desired tuple
1105                          * count (this is a heuristic for where heapsort becomes cheaper
1106                          * than a quicksort), or if we've just filled workMem and have
1107                          * enough tuples to meet the bound.
1108                          *
1109                          * Note that once we enter TSS_BOUNDED state we will always try to
1110                          * complete the sort that way.  In the worst case, if later input
1111                          * tuples are larger than earlier ones, this might cause us to
1112                          * exceed workMem significantly.
1113                          */
1114                         if (state->bounded &&
1115                                 (state->memtupcount > state->bound * 2 ||
1116                                  (state->memtupcount > state->bound && LACKMEM(state))))
1117                         {
1118 #ifdef TRACE_SORT
1119                                 if (trace_sort)
1120                                         elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1121                                                  state->memtupcount,
1122                                                  pg_rusage_show(&state->ru_start));
1123 #endif
1124                                 make_bounded_heap(state);
1125                                 return;
1126                         }
1127
1128                         /*
1129                          * Done if we still fit in available memory and have array slots.
1130                          */
1131                         if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1132                                 return;
1133
1134                         /*
1135                          * Nope; time to switch to tape-based operation.
1136                          */
1137                         inittapes(state);
1138
1139                         /*
1140                          * Dump tuples until we are back under the limit.
1141                          */
1142                         dumptuples(state, false);
1143                         break;
1144
1145                 case TSS_BOUNDED:
1146
1147                         /*
1148                          * We don't want to grow the array here, so check whether the new
1149                          * tuple can be discarded before putting it in.  This should be a
1150                          * good speed optimization, too, since when there are many more
1151                          * input tuples than the bound, most input tuples can be discarded
1152                          * with just this one comparison.  Note that because we currently
1153                          * have the sort direction reversed, we must check for <= not >=.
1154                          */
1155                         if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1156                         {
1157                                 /* new tuple <= top of the heap, so we can discard it */
1158                                 free_sort_tuple(state, tuple);
1159                         }
1160                         else
1161                         {
1162                                 /* discard top of heap, sift up, insert new tuple */
1163                                 free_sort_tuple(state, &state->memtuples[0]);
1164                                 tuplesort_heap_siftup(state, false);
1165                                 tuplesort_heap_insert(state, tuple, 0, false);
1166                         }
1167                         break;
1168
1169                 case TSS_BUILDRUNS:
1170
1171                         /*
1172                          * Insert the tuple into the heap, with run number currentRun if
1173                          * it can go into the current run, else run number currentRun+1.
1174                          * The tuple can go into the current run if it is >= the first
1175                          * not-yet-output tuple.  (Actually, it could go into the current
1176                          * run if it is >= the most recently output tuple ... but that
1177                          * would require keeping around the tuple we last output, and it's
1178                          * simplest to let writetup free each tuple as soon as it's
1179                          * written.)
1180                          *
1181                          * Note there will always be at least one tuple in the heap at
1182                          * this point; see dumptuples.
1183                          */
1184                         Assert(state->memtupcount > 0);
1185                         if (COMPARETUP(state, tuple, &state->memtuples[0]) >= 0)
1186                                 tuplesort_heap_insert(state, tuple, state->currentRun, true);
1187                         else
1188                                 tuplesort_heap_insert(state, tuple, state->currentRun + 1, true);
1189
1190                         /*
1191                          * If we are over the memory limit, dump tuples till we're under.
1192                          */
1193                         dumptuples(state, false);
1194                         break;
1195
1196                 default:
1197                         elog(ERROR, "invalid tuplesort state");
1198                         break;
1199         }
1200 }
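
/*
 * Illustration (hypothetical, not part of the build): the TSS_BOUNDED fast
 * path above is the classic "keep the top N in a heap" trick.  A minimal
 * sketch with plain ints, using a max-heap of the N smallest values seen so
 * far; all names here are invented for the example.
 */
#ifdef TUPLESORT_ILLUSTRATION
static void
topn_offer(int *heap, int n, int newval)
{
	int			i = 0;

	if (newval >= heap[0])
		return;					/* can't beat the current worst: discard */
	heap[0] = newval;			/* replace the root, then sift it down */
	for (;;)
	{
		int			child = 2 * i + 1;
		int			tmp;

		if (child >= n)
			break;
		if (child + 1 < n && heap[child + 1] > heap[child])
			child++;
		if (heap[i] >= heap[child])
			break;
		tmp = heap[i];
		heap[i] = heap[child];
		heap[child] = tmp;
		i = child;
	}
}
#endif   /* TUPLESORT_ILLUSTRATION */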
1201
1202 /*
1203  * All tuples have been provided; finish the sort.
1204  */
1205 void
1206 tuplesort_performsort(Tuplesortstate *state)
1207 {
1208         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1209
1210 #ifdef TRACE_SORT
1211         if (trace_sort)
1212                 elog(LOG, "performsort starting: %s",
1213                          pg_rusage_show(&state->ru_start));
1214 #endif
1215
1216         switch (state->status)
1217         {
1218                 case TSS_INITIAL:
1219
1220                         /*
1221                          * We were able to accumulate all the tuples within the allowed
1222                          * amount of memory.  Just qsort 'em and we're done.
1223                          */
1224                         if (state->memtupcount > 1)
1225                                 qsort_arg((void *) state->memtuples,
1226                                                   state->memtupcount,
1227                                                   sizeof(SortTuple),
1228                                                   (qsort_arg_comparator) state->comparetup,
1229                                                   (void *) state);
1230                         state->current = 0;
1231                         state->eof_reached = false;
1232                         state->markpos_offset = 0;
1233                         state->markpos_eof = false;
1234                         state->status = TSS_SORTEDINMEM;
1235                         break;
1236
1237                 case TSS_BOUNDED:
1238
1239                         /*
1240                          * We were able to accumulate all the tuples required for output
1241                          * in memory, using a heap to eliminate excess tuples.  Now we
1242                          * have to transform the heap to a properly-sorted array.
1243                          */
1244                         sort_bounded_heap(state);
1245                         state->current = 0;
1246                         state->eof_reached = false;
1247                         state->markpos_offset = 0;
1248                         state->markpos_eof = false;
1249                         state->status = TSS_SORTEDINMEM;
1250                         break;
1251
1252                 case TSS_BUILDRUNS:
1253
1254                         /*
1255                          * Finish tape-based sort.  First, flush all tuples remaining in
1256                          * memory out to tape; then merge until we have a single remaining
1257                          * run (or, if !randomAccess, one run per tape). Note that
1258                          * mergeruns sets the correct state->status.
1259                          */
1260                         dumptuples(state, true);
1261                         mergeruns(state);
1262                         state->eof_reached = false;
1263                         state->markpos_block = 0L;
1264                         state->markpos_offset = 0;
1265                         state->markpos_eof = false;
1266                         break;
1267
1268                 default:
1269                         elog(ERROR, "invalid tuplesort state");
1270                         break;
1271         }
1272
1273 #ifdef TRACE_SORT
1274         if (trace_sort)
1275         {
1276                 if (state->status == TSS_FINALMERGE)
1277                         elog(LOG, "performsort done (except %d-way final merge): %s",
1278                                  state->activeTapes,
1279                                  pg_rusage_show(&state->ru_start));
1280                 else
1281                         elog(LOG, "performsort done: %s",
1282                                  pg_rusage_show(&state->ru_start));
1283         }
1284 #endif
1285
1286         MemoryContextSwitchTo(oldcontext);
1287 }
1288
1289 /*
1290  * Internal routine to fetch the next tuple in either forward or back
1291  * direction into *stup.  Returns FALSE if no more tuples.
1292  * If *should_free is set, the caller must pfree stup.tuple when done with it.
1293  */
1294 static bool
1295 tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
1296                                                   SortTuple *stup, bool *should_free)
1297 {
1298         unsigned int tuplen;
1299
1300         switch (state->status)
1301         {
1302                 case TSS_SORTEDINMEM:
1303                         Assert(forward || state->randomAccess);
1304                         *should_free = false;
1305                         if (forward)
1306                         {
1307                                 if (state->current < state->memtupcount)
1308                                 {
1309                                         *stup = state->memtuples[state->current++];
1310                                         return true;
1311                                 }
1312                                 state->eof_reached = true;
1313
1314                                 /*
1315                                  * Complain if caller tries to retrieve more tuples than
1316                                  * originally asked for in a bounded sort: tuples beyond the
1317                                  * bound were discarded, so returning EOF would be misleading.
1318                                  */
1319                                 if (state->bounded && state->current >= state->bound)
1320                                         elog(ERROR, "retrieved too many tuples in a bounded sort");
1321
1322                                 return false;
1323                         }
1324                         else
1325                         {
1326                                 if (state->current <= 0)
1327                                         return false;
1328
1329                                 /*
1330                                  * If all tuples were already fetched, return the last tuple;
1331                                  * otherwise return the one before the last returned tuple.
1332                                  */
1333                                 if (state->eof_reached)
1334                                         state->eof_reached = false;
1335                                 else
1336                                 {
1337                                         state->current--;       /* last returned tuple */
1338                                         if (state->current <= 0)
1339                                                 return false;
1340                                 }
1341                                 *stup = state->memtuples[state->current - 1];
1342                                 return true;
1343                         }
1344                         break;
1345
1346                 case TSS_SORTEDONTAPE:
1347                         Assert(forward || state->randomAccess);
1348                         *should_free = true;
1349                         if (forward)
1350                         {
1351                                 if (state->eof_reached)
1352                                         return false;
1353                                 if ((tuplen = getlen(state, state->result_tape, true)) != 0)
1354                                 {
1355                                         READTUP(state, stup, state->result_tape, tuplen);
1356                                         return true;
1357                                 }
1358                                 else
1359                                 {
1360                                         state->eof_reached = true;
1361                                         return false;
1362                                 }
1363                         }
1364
1365                         /*
1366                          * Backward.
1367                          *
1368                          * If all tuples were already fetched, return the last tuple;
1369                          * otherwise return the one before the last returned tuple.
1370                          */
1371                         if (state->eof_reached)
1372                         {
1373                                 /*
1374                                  * Seek position is pointing just past the zero tuplen at the
1375                                  * end of file; back up to fetch last tuple's ending length
1376                                  * word.  If seek fails we must have a completely empty file.
1377                                  */
1378                                 if (!LogicalTapeBackspace(state->tapeset,
1379                                                                                   state->result_tape,
1380                                                                                   2 * sizeof(unsigned int)))
1381                                         return false;
1382                                 state->eof_reached = false;
1383                         }
1384                         else
1385                         {
1386                                 /*
1387                                  * Back up and fetch previously-returned tuple's ending length
1388                                  * word.  If seek fails, assume we are at start of file.
1389                                  */
1390                                 if (!LogicalTapeBackspace(state->tapeset,
1391                                                                                   state->result_tape,
1392                                                                                   sizeof(unsigned int)))
1393                                         return false;
1394                                 tuplen = getlen(state, state->result_tape, false);
1395
1396                                 /*
1397                                  * Back up to get ending length word of tuple before it.
1398                                  */
1399                                 if (!LogicalTapeBackspace(state->tapeset,
1400                                                                                   state->result_tape,
1401                                                                                   tuplen + 2 * sizeof(unsigned int)))
1402                                 {
1403                                         /*
1404                                          * If that fails, presumably the prev tuple is the first
1405                                          * in the file.  Back up so that it becomes the next tuple
1406                                          * to read in the forward direction (not obviously right,
1407                                          * but it matches what the in-memory case does).
1408                                          */
1409                                         if (!LogicalTapeBackspace(state->tapeset,
1410                                                                                           state->result_tape,
1411                                                                                           tuplen + sizeof(unsigned int)))
1412                                                 elog(ERROR, "bogus tuple length in backward scan");
1413                                         return false;
1414                                 }
1415                         }
1416
1417                         tuplen = getlen(state, state->result_tape, false);
1418
1419                         /*
1420                          * Now we have the length of the prior tuple, back up and read it.
1421                          * Note: READTUP expects we are positioned after the initial
1422                          * length word of the tuple, so back up to that point.
1423                          */
1424                         if (!LogicalTapeBackspace(state->tapeset,
1425                                                                           state->result_tape,
1426                                                                           tuplen))
1427                                 elog(ERROR, "bogus tuple length in backward scan");
1428                         READTUP(state, stup, state->result_tape, tuplen);
1429                         return true;
1430
1431                 case TSS_FINALMERGE:
1432                         Assert(forward);
1433                         *should_free = true;
1434
1435                         /*
1436                          * This code should match the inner loop of mergeonerun().
1437                          */
1438                         if (state->memtupcount > 0)
1439                         {
1440                                 int                     srcTape = state->memtuples[0].tupindex;
1441                                 Size            tuplen;
1442                                 int                     tupIndex;
1443                                 SortTuple  *newtup;
1444
1445                                 *stup = state->memtuples[0];
1446                                 /* returned tuple is no longer counted in our memory space */
1447                                 if (stup->tuple)
1448                                 {
1449                                         tuplen = GetMemoryChunkSpace(stup->tuple);
1450                                         state->availMem += tuplen;
1451                                         state->mergeavailmem[srcTape] += tuplen;
1452                                 }
1453                                 tuplesort_heap_siftup(state, false);
1454                                 if ((tupIndex = state->mergenext[srcTape]) == 0)
1455                                 {
1456                                         /*
1457                                          * out of preloaded data on this tape, try to read more
1458                                          *
1459                                          * Unlike mergeonerun(), we only preload from the single
1460                                          * tape that's run dry.  See mergepreread() comments.
1461                                          */
1462                                         mergeprereadone(state, srcTape);
1463
1464                                         /*
1465                                          * if still no data, we've reached end of run on this tape
1466                                          */
1467                                         if ((tupIndex = state->mergenext[srcTape]) == 0)
1468                                                 return true;
1469                                 }
1470                                 /* pull next preread tuple from list, insert in heap */
1471                                 newtup = &state->memtuples[tupIndex];
1472                                 state->mergenext[srcTape] = newtup->tupindex;
1473                                 if (state->mergenext[srcTape] == 0)
1474                                         state->mergelast[srcTape] = 0;
1475                                 tuplesort_heap_insert(state, newtup, srcTape, false);
1476                                 /* put the now-unused memtuples entry on the freelist */
1477                                 newtup->tupindex = state->mergefreelist;
1478                                 state->mergefreelist = tupIndex;
1479                                 state->mergeavailslots[srcTape]++;
1480                                 return true;
1481                         }
1482                         return false;
1483
1484                 default:
1485                         elog(ERROR, "invalid tuplesort state");
1486                         return false;           /* keep compiler quiet */
1487         }
1488 }
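
/*
 * For reference when reading the backward-scan logic above: with
 * randomAccess, each tuple on tape is stored with a length word both before
 * and after it, and a zero length word terminates the run, roughly
 *
 *		[len1][tuple1][len1] [len2][tuple2][len2] ... [0]
 *
 * so stepping backward means backspacing over a trailing length word,
 * reading it, then backspacing over the tuple body and the length words
 * around it.  (writetup and getlen are authoritative for the format; this
 * sketch just summarizes them.)
 */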
1489
1490 /*
1491  * Fetch the next tuple in either forward or back direction.
1492  * If successful, put tuple in slot and return TRUE; else, clear the slot
1493  * and return FALSE.
1494  */
1495 bool
1496 tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
1497                                            TupleTableSlot *slot)
1498 {
1499         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1500         SortTuple       stup;
1501         bool            should_free;
1502
1503         if (!tuplesort_gettuple_common(state, forward, &stup, &should_free))
1504                 stup.tuple = NULL;
1505
1506         MemoryContextSwitchTo(oldcontext);
1507
1508         if (stup.tuple)
1509         {
1510                 ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, should_free);
1511                 return true;
1512         }
1513         else
1514         {
1515                 ExecClearTuple(slot);
1516                 return false;
1517         }
1518 }
1519
1520 /*
1521  * Fetch the next tuple in either forward or back direction.
1522  * Returns NULL if no more tuples.  If *should_free is set, the
1523  * caller must pfree the returned tuple when done with it.
1524  */
1525 HeapTuple
1526 tuplesort_getheaptuple(Tuplesortstate *state, bool forward, bool *should_free)
1527 {
1528         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1529         SortTuple       stup;
1530
1531         if (!tuplesort_gettuple_common(state, forward, &stup, should_free))
1532                 stup.tuple = NULL;
1533
1534         MemoryContextSwitchTo(oldcontext);
1535
1536         return stup.tuple;
1537 }
1538
1539 /*
1540  * Fetch the next index tuple in either forward or back direction.
1541  * Returns NULL if no more tuples.  If *should_free is set, the
1542  * caller must pfree the returned tuple when done with it.
1543  */
1544 IndexTuple
1545 tuplesort_getindextuple(Tuplesortstate *state, bool forward,
1546                                                 bool *should_free)
1547 {
1548         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1549         SortTuple       stup;
1550
1551         if (!tuplesort_gettuple_common(state, forward, &stup, should_free))
1552                 stup.tuple = NULL;
1553
1554         MemoryContextSwitchTo(oldcontext);
1555
1556         return (IndexTuple) stup.tuple;
1557 }
1558
1559 /*
1560  * Fetch the next Datum in either forward or back direction.
1561  * Returns FALSE if no more datums.
1562  *
1563  * If the Datum is of a pass-by-ref type, the returned value is freshly
1564  * palloc'd and is now owned by the caller.
1565  */
1566 bool
1567 tuplesort_getdatum(Tuplesortstate *state, bool forward,
1568                                    Datum *val, bool *isNull)
1569 {
1570         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1571         SortTuple       stup;
1572         bool            should_free;
1573
1574         if (!tuplesort_gettuple_common(state, forward, &stup, &should_free))
1575         {
1576                 MemoryContextSwitchTo(oldcontext);
1577                 return false;
1578         }
1579
1580         if (stup.isnull1 || state->datumTypeByVal)
1581         {
1582                 *val = stup.datum1;
1583                 *isNull = stup.isnull1;
1584         }
1585         else
1586         {
1587                 if (should_free)
1588                         *val = stup.datum1;
1589                 else
1590                         *val = datumCopy(stup.datum1, false, state->datumTypeLen);
1591                 *isNull = false;
1592         }
1593
1594         MemoryContextSwitchTo(oldcontext);
1595
1596         return true;
1597 }
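
/*
 * Illustration (hypothetical, not compiled): a typical caller of the Datum
 * sort path runs puttuple/performsort/gettuple in strict sequence.  The
 * sketch below sorts a few int4 values; the tuplesort_begin_datum argument
 * list shown is the one this file's era uses and may differ elsewhere, and
 * the needed #includes (pg_type.h, pg_operator.h, etc.) are elided.
 */
#ifdef TUPLESORT_ILLUSTRATION
static void
sort_some_int4s(void)
{
	static const int vals[] = {42, 7, 19, 3, 28};
	Tuplesortstate *sortstate;
	Datum		val;
	bool		isNull;
	int			i;

	sortstate = tuplesort_begin_datum(INT4OID, Int4LessOperator,
									  InvalidOid /* collation */ ,
									  false /* nulls last */ ,
									  work_mem,
									  false /* no random access */ );
	for (i = 0; i < 5; i++)
		tuplesort_putdatum(sortstate, Int32GetDatum(vals[i]), false);
	tuplesort_performsort(sortstate);
	while (tuplesort_getdatum(sortstate, true, &val, &isNull))
		elog(LOG, "next: %d", DatumGetInt32(val));
	tuplesort_end(sortstate);
}
#endif   /* TUPLESORT_ILLUSTRATION */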
1598
1599 /*
1600  * tuplesort_merge_order - report merge order we'll use for given memory
1601  * (note: "merge order" just means the number of input tapes in the merge).
1602  *
1603  * This is exported for use by the planner.  allowedMem is in bytes.
1604  */
1605 int
1606 tuplesort_merge_order(long allowedMem)
1607 {
1608         int                     mOrder;
1609
1610         /*
1611          * We need one tape for each merge input, plus another one for the output,
1612          * and each of these tapes needs buffer space.  In addition we want
1613          * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't
1614          * count).
1615          *
1616          * Note: you might be thinking we need to account for the memtuples[]
1617          * array in this calculation, but we effectively treat that as part of the
1618          * MERGE_BUFFER_SIZE workspace.
1619          */
1620         mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
1621                 (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
1622
1623         /* Even in minimum memory, use at least a MINORDER merge */
1624         mOrder = Max(mOrder, MINORDER);
1625
1626         return mOrder;
1627 }
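
/*
 * Worked example, assuming the macro values this file has used historically
 * (TAPE_BUFFER_OVERHEAD = BLCKSZ * 3 and MERGE_BUFFER_SIZE = BLCKSZ * 32,
 * with BLCKSZ = 8192; the #defines near the top of the file are
 * authoritative):
 *
 *		allowedMem = 16MB:	(16777216 - 24576) / (262144 + 24576) = 58
 *		allowedMem = 1MB:	(1048576 - 24576) / 286720 = 3, raised to
 *							MINORDER = 6
 *
 * So a 16MB work_mem permits a 58-way merge, while very small settings are
 * clamped to a 6-way merge even though that can slightly overrun the
 * memory budget.
 */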
1628
1629 /*
1630  * inittapes - initialize for tape sorting.
1631  *
1632  * This is called only if we have found we don't have room to sort in memory.
1633  */
1634 static void
1635 inittapes(Tuplesortstate *state)
1636 {
1637         int                     maxTapes,
1638                                 ntuples,
1639                                 j;
1640         long            tapeSpace;
1641
1642         /* Compute number of tapes to use: merge order plus 1 */
1643         maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
1644
1645         /*
1646          * We must have at least 2*maxTapes slots in the memtuples[] array, else
1647          * we'd not have room for merge heap plus preread.  It seems unlikely that
1648          * this case would ever occur, but be safe.
1649          */
1650         maxTapes = Min(maxTapes, state->memtupsize / 2);
1651
1652         state->maxTapes = maxTapes;
1653         state->tapeRange = maxTapes - 1;
1654
1655 #ifdef TRACE_SORT
1656         if (trace_sort)
1657                 elog(LOG, "switching to external sort with %d tapes: %s",
1658                          maxTapes, pg_rusage_show(&state->ru_start));
1659 #endif
1660
1661         /*
1662          * Decrease availMem to reflect the space needed for tape buffers; but
1663          * don't decrease it to the point that we have no room for tuples. (That
1664          * case is only likely to occur if sorting pass-by-value Datums; in all
1665          * other scenarios the memtuples[] array is unlikely to occupy more than
1666          * half of allowedMem.  In the pass-by-value case it's not important to
1667          * account for tuple space, so we don't care if LACKMEM becomes
1668          * inaccurate.)
1669          */
1670         tapeSpace = maxTapes * TAPE_BUFFER_OVERHEAD;
1671         if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
1672                 USEMEM(state, tapeSpace);
1673
1674         /*
1675          * Make sure that the temp file(s) underlying the tape set are created in
1676          * suitable temp tablespaces.
1677          */
1678         PrepareTempTablespaces();
1679
1680         /*
1681          * Create the tape set and allocate the per-tape data arrays.
1682          */
1683         state->tapeset = LogicalTapeSetCreate(maxTapes);
1684
1685         state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
1686         state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
1687         state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
1688         state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
1689         state->mergeavailmem = (long *) palloc0(maxTapes * sizeof(long));
1690         state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
1691         state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
1692         state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
1693         state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
1694
1695         /*
1696          * Convert the unsorted contents of memtuples[] into a heap. Each tuple is
1697          * marked as belonging to run number zero.
1698          *
1699          * NOTE: we pass false for checkIndex since there's no point in comparing
1700          * indexes in this step, even though we do intend the indexes to be part
1701          * of the sort key...
1702          */
1703         ntuples = state->memtupcount;
1704         state->memtupcount = 0;         /* make the heap empty */
1705         for (j = 0; j < ntuples; j++)
1706         {
1707                 /* Must copy source tuple to avoid possible overwrite */
1708                 SortTuple       stup = state->memtuples[j];
1709
1710                 tuplesort_heap_insert(state, &stup, 0, false);
1711         }
1712         Assert(state->memtupcount == ntuples);
1713
1714         state->currentRun = 0;
1715
1716         /*
1717          * Initialize variables of Algorithm D (step D1).
1718          */
1719         for (j = 0; j < maxTapes; j++)
1720         {
1721                 state->tp_fib[j] = 1;
1722                 state->tp_runs[j] = 0;
1723                 state->tp_dummy[j] = 1;
1724                 state->tp_tapenum[j] = j;
1725         }
1726         state->tp_fib[state->tapeRange] = 0;
1727         state->tp_dummy[state->tapeRange] = 0;
1728
1729         state->Level = 1;
1730         state->destTape = 0;
1731
1732         state->status = TSS_BUILDRUNS;
1733 }
1734
1735 /*
1736  * selectnewtape -- select new tape for new initial run.
1737  *
1738  * This is called after finishing a run when we know another run
1739  * must be started.  This implements steps D3, D4 of Algorithm D.
1740  */
1741 static void
1742 selectnewtape(Tuplesortstate *state)
1743 {
1744         int                     j;
1745         int                     a;
1746
1747         /* Step D3: advance j (destTape) */
1748         if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
1749         {
1750                 state->destTape++;
1751                 return;
1752         }
1753         if (state->tp_dummy[state->destTape] != 0)
1754         {
1755                 state->destTape = 0;
1756                 return;
1757         }
1758
1759         /* Step D4: increase level */
1760         state->Level++;
1761         a = state->tp_fib[0];
1762         for (j = 0; j < state->tapeRange; j++)
1763         {
1764                 state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
1765                 state->tp_fib[j] = a + state->tp_fib[j + 1];
1766         }
1767         state->destTape = 0;
1768 }
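
/*
 * Worked example of the targets maintained above: with maxTapes = 3 (two
 * input tapes, so tapeRange = 2), repeated applications of step D4 evolve
 * tp_fib[] as
 *
 *		level 1: {1, 1}		(2 runs in total)
 *		level 2: {2, 1}		(3 runs)
 *		level 3: {3, 2}		(5 runs)
 *		level 4: {5, 3}		(8 runs)
 *
 * i.e. the classic Fibonacci distribution for three-tape polyphase merge;
 * more tapes yield generalized Fibonacci numbers.  tp_dummy[] records how
 * far short of its target each tape currently is.
 */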
1769
1770 /*
1771  * mergeruns -- merge all the completed initial runs.
1772  *
1773  * This implements steps D5, D6 of Algorithm D.  All input data has
1774  * already been written to initial runs on tape (see dumptuples).
1775  */
1776 static void
1777 mergeruns(Tuplesortstate *state)
1778 {
1779         int                     tapenum,
1780                                 svTape,
1781                                 svRuns,
1782                                 svDummy;
1783
1784         Assert(state->status == TSS_BUILDRUNS);
1785         Assert(state->memtupcount == 0);
1786
1787         /*
1788          * If we produced only one initial run (quite likely if the total data
1789          * volume is between 1X and 2X workMem), we can just use that tape as the
1790          * finished output, rather than doing a useless merge.  (This obvious
1791          * optimization is not in Knuth's algorithm.)
1792          */
1793         if (state->currentRun == 1)
1794         {
1795                 state->result_tape = state->tp_tapenum[state->destTape];
1796                 /* must freeze and rewind the finished output tape */
1797                 LogicalTapeFreeze(state->tapeset, state->result_tape);
1798                 state->status = TSS_SORTEDONTAPE;
1799                 return;
1800         }
1801
1802         /* End of step D2: rewind all output tapes to prepare for merging */
1803         for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
1804                 LogicalTapeRewind(state->tapeset, tapenum, false);
1805
1806         for (;;)
1807         {
1808                 /*
1809                  * At this point we know that tape[T] is empty.  If there's just one
1810                  * (real or dummy) run left on each input tape, then only one merge
1811                  * pass remains.  If we don't have to produce a materialized sorted
1812                  * tape, we can stop at this point and do the final merge on-the-fly.
1813                  */
1814                 if (!state->randomAccess)
1815                 {
1816                         bool            allOneRun = true;
1817
1818                         Assert(state->tp_runs[state->tapeRange] == 0);
1819                         for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
1820                         {
1821                                 if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
1822                                 {
1823                                         allOneRun = false;
1824                                         break;
1825                                 }
1826                         }
1827                         if (allOneRun)
1828                         {
1829                                 /* Tell logtape.c we won't be writing anymore */
1830                                 LogicalTapeSetForgetFreeSpace(state->tapeset);
1831                                 /* Initialize for the final merge pass */
1832                                 beginmerge(state);
1833                                 state->status = TSS_FINALMERGE;
1834                                 return;
1835                         }
1836                 }
1837
1838                 /* Step D5: merge runs onto tape[T] until tape[P] is empty */
1839                 while (state->tp_runs[state->tapeRange - 1] ||
1840                            state->tp_dummy[state->tapeRange - 1])
1841                 {
1842                         bool            allDummy = true;
1843
1844                         for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
1845                         {
1846                                 if (state->tp_dummy[tapenum] == 0)
1847                                 {
1848                                         allDummy = false;
1849                                         break;
1850                                 }
1851                         }
1852
1853                         if (allDummy)
1854                         {
1855                                 state->tp_dummy[state->tapeRange]++;
1856                                 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
1857                                         state->tp_dummy[tapenum]--;
1858                         }
1859                         else
1860                                 mergeonerun(state);
1861                 }
1862
1863                 /* Step D6: decrease level */
1864                 if (--state->Level == 0)
1865                         break;
1866                 /* rewind output tape T to use as new input */
1867                 LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange],
1868                                                   false);
1869                 /* rewind used-up input tape P, and prepare it for write pass */
1870                 LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange - 1],
1871                                                   true);
1872                 state->tp_runs[state->tapeRange - 1] = 0;
1873
1874                 /*
1875                  * reassign tape units per step D6; note we no longer care about A[]
1876                  */
1877                 svTape = state->tp_tapenum[state->tapeRange];
1878                 svDummy = state->tp_dummy[state->tapeRange];
1879                 svRuns = state->tp_runs[state->tapeRange];
1880                 for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
1881                 {
1882                         state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
1883                         state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
1884                         state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
1885                 }
1886                 state->tp_tapenum[0] = svTape;
1887                 state->tp_dummy[0] = svDummy;
1888                 state->tp_runs[0] = svRuns;
1889         }
1890
1891         /*
1892          * Done.  Knuth says that the result is on TAPE[1], but since we exited
1893          * the loop without performing the last iteration of step D6, we have not
1894          * rearranged the tape unit assignment, and therefore the result is on
1895          * TAPE[T].  We need to do it this way so that we can freeze the final
1896          * output tape while rewinding it.  The last iteration of step D6 would be
1897          * a waste of cycles anyway...
1898          */
1899         state->result_tape = state->tp_tapenum[state->tapeRange];
1900         LogicalTapeFreeze(state->tapeset, state->result_tape);
1901         state->status = TSS_SORTEDONTAPE;
1902 }
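
/*
 * A small trace of the merging loop above, for three tapes holding {3, 2}
 * initial runs (the level-3 distribution), with "-" marking the current
 * output tape:
 *
 *		A:3 B:2 C:-		merge until B empties	->	A:1 B:0 C:2
 *		A:1 C:2 B:-		merge until A empties	->	A:0 C:1 B:1
 *		C:1 B:1 A:-		merge the final runs	->	result on A
 *
 * Each pass touches only the runs it actually consumes, which is what makes
 * polyphase merging cheaper than a balanced merge on the same tape count.
 */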
1903
1904 /*
1905  * Merge one run from each input tape, except ones with dummy runs.
1906  *
1907  * This is the inner loop of Algorithm D step D5.  We know that the
1908  * output tape is TAPE[T].
1909  */
1910 static void
1911 mergeonerun(Tuplesortstate *state)
1912 {
1913         int                     destTape = state->tp_tapenum[state->tapeRange];
1914         int                     srcTape;
1915         int                     tupIndex;
1916         SortTuple  *tup;
1917         long            priorAvail,
1918                                 spaceFreed;
1919
1920         /*
1921          * Start the merge by loading one tuple from each active source tape into
1922          * the heap.  We can also decrease the input run/dummy run counts.
1923          */
1924         beginmerge(state);
1925
1926         /*
1927          * Execute merge by repeatedly extracting lowest tuple in heap, writing it
1928          * out, and replacing it with next tuple from same tape (if there is
1929          * another one).
1930          */
1931         while (state->memtupcount > 0)
1932         {
1933                 /* write the tuple to destTape */
1934                 priorAvail = state->availMem;
1935                 srcTape = state->memtuples[0].tupindex;
1936                 WRITETUP(state, destTape, &state->memtuples[0]);
1937                 /* writetup adjusted total free space, now fix per-tape space */
1938                 spaceFreed = state->availMem - priorAvail;
1939                 state->mergeavailmem[srcTape] += spaceFreed;
1940                 /* compact the heap */
1941                 tuplesort_heap_siftup(state, false);
1942                 if ((tupIndex = state->mergenext[srcTape]) == 0)
1943                 {
1944                         /* out of preloaded data on this tape, try to read more */
1945                         mergepreread(state);
1946                         /* if still no data, we've reached end of run on this tape */
1947                         if ((tupIndex = state->mergenext[srcTape]) == 0)
1948                                 continue;
1949                 }
1950                 /* pull next preread tuple from list, insert in heap */
1951                 tup = &state->memtuples[tupIndex];
1952                 state->mergenext[srcTape] = tup->tupindex;
1953                 if (state->mergenext[srcTape] == 0)
1954                         state->mergelast[srcTape] = 0;
1955                 tuplesort_heap_insert(state, tup, srcTape, false);
1956                 /* put the now-unused memtuples entry on the freelist */
1957                 tup->tupindex = state->mergefreelist;
1958                 state->mergefreelist = tupIndex;
1959                 state->mergeavailslots[srcTape]++;
1960         }
1961
1962         /*
1963          * When the heap empties, we're done.  Write an end-of-run marker on the
1964          * output tape, and increment its count of real runs.
1965          */
1966         markrunend(state, destTape);
1967         state->tp_runs[state->tapeRange]++;
1968
1969 #ifdef TRACE_SORT
1970         if (trace_sort)
1971                 elog(LOG, "finished %d-way merge step: %s", state->activeTapes,
1972                          pg_rusage_show(&state->ru_start));
1973 #endif
1974 }
1975
1976 /*
1977  * beginmerge - initialize for a merge pass
1978  *
1979  * We decrease the counts of real and dummy runs for each tape, and mark
1980  * which tapes contain active input runs in mergeactive[].  Then, load
1981  * as many tuples as we can from each active input tape, and finally
1982  * fill the merge heap with the first tuple from each active tape.
1983  */
1984 static void
1985 beginmerge(Tuplesortstate *state)
1986 {
1987         int                     activeTapes;
1988         int                     tapenum;
1989         int                     srcTape;
1990         int                     slotsPerTape;
1991         long            spacePerTape;
1992
1993         /* Heap should be empty here */
1994         Assert(state->memtupcount == 0);
1995
1996         /* Adjust run counts and mark the active tapes */
1997         memset(state->mergeactive, 0,
1998                    state->maxTapes * sizeof(*state->mergeactive));
1999         activeTapes = 0;
2000         for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2001         {
2002                 if (state->tp_dummy[tapenum] > 0)
2003                         state->tp_dummy[tapenum]--;
2004                 else
2005                 {
2006                         Assert(state->tp_runs[tapenum] > 0);
2007                         state->tp_runs[tapenum]--;
2008                         srcTape = state->tp_tapenum[tapenum];
2009                         state->mergeactive[srcTape] = true;
2010                         activeTapes++;
2011                 }
2012         }
2013         state->activeTapes = activeTapes;
2014
2015         /* Clear merge-pass state variables */
2016         memset(state->mergenext, 0,
2017                    state->maxTapes * sizeof(*state->mergenext));
2018         memset(state->mergelast, 0,
2019                    state->maxTapes * sizeof(*state->mergelast));
2020         state->mergefreelist = 0;       /* nothing in the freelist */
2021         state->mergefirstfree = activeTapes;            /* 1st slot avail for preread */
2022
2023         /*
2024          * Initialize space allocation to let each active input tape have an equal
2025          * share of preread space.
2026          */
2027         Assert(activeTapes > 0);
2028         slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
2029         Assert(slotsPerTape > 0);
2030         spacePerTape = state->availMem / activeTapes;
2031         for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
2032         {
2033                 if (state->mergeactive[srcTape])
2034                 {
2035                         state->mergeavailslots[srcTape] = slotsPerTape;
2036                         state->mergeavailmem[srcTape] = spacePerTape;
2037                 }
2038         }
2039
2040         /*
2041          * Preread as many tuples as possible (and at least one) from each active
2042          * tape
2043          */
2044         mergepreread(state);
2045
2046         /* Load the merge heap with the first tuple from each input tape */
2047         for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
2048         {
2049                 int                     tupIndex = state->mergenext[srcTape];
2050                 SortTuple  *tup;
2051
2052                 if (tupIndex)
2053                 {
2054                         tup = &state->memtuples[tupIndex];
2055                         state->mergenext[srcTape] = tup->tupindex;
2056                         if (state->mergenext[srcTape] == 0)
2057                                 state->mergelast[srcTape] = 0;
2058                         tuplesort_heap_insert(state, tup, srcTape, false);
2059                         /* put the now-unused memtuples entry on the freelist */
2060                         tup->tupindex = state->mergefreelist;
2061                         state->mergefreelist = tupIndex;
2062                         state->mergeavailslots[srcTape]++;
2063                 }
2064         }
2065 }
2066
2067 /*
2068  * mergepreread - load tuples from merge input tapes
2069  *
2070  * This routine exists to improve sequentiality of reads during a merge pass,
2071  * as explained in the header comments of this file.  Load tuples from each
2072  * active source tape until the tape's run is exhausted or it has used up
2073  * its fair share of available memory.  In any case, we guarantee that there
2074  * is at least one preread tuple available from each unexhausted input tape.
2075  *
2076  * We invoke this routine at the start of a merge pass for initial load,
2077  * and then whenever any tape's preread data runs out.  Note that we load
2078  * as much data as possible from all tapes, not just the one that ran out.
2079  * This is because logtape.c works best with a usage pattern that alternates
2080  * between reading a lot of data and writing a lot of data, so whenever we
2081  * are forced to read, we should fill working memory completely.
2082  *
2083  * In FINALMERGE state, we *don't* use this routine, but instead just preread
2084  * from the single tape that ran dry.  There's no read/write alternation in
2085  * that state and so no point in scanning through all the tapes to fix one.
2086  * (Moreover, there may be quite a lot of inactive tapes in that state, since
2087  * we might have had many fewer runs than tapes.  In a regular tape-to-tape
2088  * merge we can expect most of the tapes to be active.)
2089  */
2090 static void
2091 mergepreread(Tuplesortstate *state)
2092 {
2093         int                     srcTape;
2094
2095         for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
2096                 mergeprereadone(state, srcTape);
2097 }
2098
2099 /*
2100  * mergeprereadone - load tuples from one merge input tape
2101  *
2102  * Read tuples from the specified tape until it has used up its free memory
2103  * or array slots; but ensure that we have at least one tuple, if any are
2104  * to be had.
2105  */
2106 static void
2107 mergeprereadone(Tuplesortstate *state, int srcTape)
2108 {
2109         unsigned int tuplen;
2110         SortTuple       stup;
2111         int                     tupIndex;
2112         long            priorAvail,
2113                                 spaceUsed;
2114
2115         if (!state->mergeactive[srcTape])
2116                 return;                                 /* tape's run is already exhausted */
2117         priorAvail = state->availMem;
2118         state->availMem = state->mergeavailmem[srcTape];
2119         while ((state->mergeavailslots[srcTape] > 0 && !LACKMEM(state)) ||
2120                    state->mergenext[srcTape] == 0)
2121         {
2122                 /* read next tuple, if any */
2123                 if ((tuplen = getlen(state, srcTape, true)) == 0)
2124                 {
2125                         state->mergeactive[srcTape] = false;
2126                         break;
2127                 }
2128                 READTUP(state, &stup, srcTape, tuplen);
2129                 /* find a free slot in memtuples[] for it */
2130                 tupIndex = state->mergefreelist;
2131                 if (tupIndex)
2132                         state->mergefreelist = state->memtuples[tupIndex].tupindex;
2133                 else
2134                 {
2135                         tupIndex = state->mergefirstfree++;
2136                         Assert(tupIndex < state->memtupsize);
2137                 }
2138                 state->mergeavailslots[srcTape]--;
2139                 /* store tuple, append to list for its tape */
2140                 stup.tupindex = 0;
2141                 state->memtuples[tupIndex] = stup;
2142                 if (state->mergelast[srcTape])
2143                         state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
2144                 else
2145                         state->mergenext[srcTape] = tupIndex;
2146                 state->mergelast[srcTape] = tupIndex;
2147         }
2148         /* update per-tape and global availmem counts */
2149         spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
2150         state->mergeavailmem[srcTape] = state->availMem;
2151         state->availMem = priorAvail - spaceUsed;
2152 }
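
/*
 * Illustration (hypothetical, not part of the build): mergefreelist,
 * mergenext and mergelast implement singly-linked lists threaded through
 * memtuples[] itself, using each slot's tupindex field as the "next" link
 * and 0 as the terminator (index 0 is never handed out as a preread slot,
 * so it can safely double as NULL).  The same idea with plain ints:
 */
#ifdef TUPLESORT_ILLUSTRATION
#define NSLOTS 1024

static int	next_link[NSLOTS];	/* next_link[i] chains slot i */
static int	freehead = 0;		/* 0 means "list is empty" */

static void
slot_free(int i)
{
	next_link[i] = freehead;
	freehead = i;
}

static int
slot_alloc(void)
{
	int			i = freehead;

	if (i != 0)
		freehead = next_link[i];
	return i;					/* 0 means "nothing free" */
}
#endif   /* TUPLESORT_ILLUSTRATION */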
2153
2154 /*
2155  * dumptuples - remove tuples from heap and write to tape
2156  *
2157  * This is used during initial-run building, but not during merging.
2158  *
2159  * When alltuples = false, dump only enough tuples to get under the
2160  * availMem limit (and leave at least one tuple in the heap in any case,
2161  * since puttuple assumes it always has a tuple to compare to).  We also
2162  * insist there be at least one free slot in the memtuples[] array.
2163  *
2164  * When alltuples = true, dump everything currently in memory.
2165  * (This case is only used at end of input data.)
2166  *
2167  * If we empty the heap, close out the current run and return (this should
2168  * only happen at end of input data).  If we see that the tuple run number
2169  * at the top of the heap has changed, start a new run.
2170  */
2171 static void
2172 dumptuples(Tuplesortstate *state, bool alltuples)
2173 {
2174         while (alltuples ||
2175                    (LACKMEM(state) && state->memtupcount > 1) ||
2176                    state->memtupcount >= state->memtupsize)
2177         {
2178                 /*
2179                  * Dump the heap's frontmost entry, and sift up to remove it from the
2180                  * heap.
2181                  */
2182                 Assert(state->memtupcount > 0);
2183                 WRITETUP(state, state->tp_tapenum[state->destTape],
2184                                  &state->memtuples[0]);
2185                 tuplesort_heap_siftup(state, true);
2186
2187                 /*
2188                  * If the heap is empty *or* top run number has changed, we've
2189                  * finished the current run.
2190                  */
2191                 if (state->memtupcount == 0 ||
2192                         state->currentRun != state->memtuples[0].tupindex)
2193                 {
2194                         markrunend(state, state->tp_tapenum[state->destTape]);
2195                         state->currentRun++;
2196                         state->tp_runs[state->destTape]++;
2197                         state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
2198
2199 #ifdef TRACE_SORT
2200                         if (trace_sort)
2201                                 elog(LOG, "finished writing%s run %d to tape %d: %s",
2202                                          (state->memtupcount == 0) ? " final" : "",
2203                                          state->currentRun, state->destTape,
2204                                          pg_rusage_show(&state->ru_start));
2205 #endif
2206
2207                         /*
2208                          * Done if heap is empty, else prepare for new run.
2209                          */
2210                         if (state->memtupcount == 0)
2211                                 break;
2212                         Assert(state->currentRun == state->memtuples[0].tupindex);
2213                         selectnewtape(state);
2214                 }
2215         }
2216 }
2217
2218 /*
2219  * tuplesort_rescan             - rewind and replay the scan
2220  */
2221 void
2222 tuplesort_rescan(Tuplesortstate *state)
2223 {
2224         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2225
2226         Assert(state->randomAccess);
2227
2228         switch (state->status)
2229         {
2230                 case TSS_SORTEDINMEM:
2231                         state->current = 0;
2232                         state->eof_reached = false;
2233                         state->markpos_offset = 0;
2234                         state->markpos_eof = false;
2235                         break;
2236                 case TSS_SORTEDONTAPE:
2237                         LogicalTapeRewind(state->tapeset,
2238                                                           state->result_tape,
2239                                                           false);
2240                         state->eof_reached = false;
2241                         state->markpos_block = 0L;
2242                         state->markpos_offset = 0;
2243                         state->markpos_eof = false;
2244                         break;
2245                 default:
2246                         elog(ERROR, "invalid tuplesort state");
2247                         break;
2248         }
2249
2250         MemoryContextSwitchTo(oldcontext);
2251 }
2252
2253 /*
2254  * tuplesort_markpos    - saves current position in the merged sort file
2255  */
2256 void
2257 tuplesort_markpos(Tuplesortstate *state)
2258 {
2259         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2260
2261         Assert(state->randomAccess);
2262
2263         switch (state->status)
2264         {
2265                 case TSS_SORTEDINMEM:
2266                         state->markpos_offset = state->current;
2267                         state->markpos_eof = state->eof_reached;
2268                         break;
2269                 case TSS_SORTEDONTAPE:
2270                         LogicalTapeTell(state->tapeset,
2271                                                         state->result_tape,
2272                                                         &state->markpos_block,
2273                                                         &state->markpos_offset);
2274                         state->markpos_eof = state->eof_reached;
2275                         break;
2276                 default:
2277                         elog(ERROR, "invalid tuplesort state");
2278                         break;
2279         }
2280
2281         MemoryContextSwitchTo(oldcontext);
2282 }
2283
2284 /*
2285  * tuplesort_restorepos - restores current position in the merged sort file
2286  *                                                to the last saved position
2287  */
2288 void
2289 tuplesort_restorepos(Tuplesortstate *state)
2290 {
2291         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2292
2293         Assert(state->randomAccess);
2294
2295         switch (state->status)
2296         {
2297                 case TSS_SORTEDINMEM:
2298                         state->current = state->markpos_offset;
2299                         state->eof_reached = state->markpos_eof;
2300                         break;
2301                 case TSS_SORTEDONTAPE:
2302                         if (!LogicalTapeSeek(state->tapeset,
2303                                                                  state->result_tape,
2304                                                                  state->markpos_block,
2305                                                                  state->markpos_offset))
2306                                 elog(ERROR, "tuplesort_restorepos failed");
2307                         state->eof_reached = state->markpos_eof;
2308                         break;
2309                 default:
2310                         elog(ERROR, "invalid tuplesort state");
2311                         break;
2312         }
2313
2314         MemoryContextSwitchTo(oldcontext);
2315 }
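
/*
 * Illustration (hypothetical, not compiled): mark/restore is the feature
 * merge join relies on, and it requires the sort to have been created with
 * randomAccess = true.  A caller might re-read a group of equal-key tuples
 * like this (slot setup and the group-boundary test are elided):
 */
#ifdef TUPLESORT_ILLUSTRATION
static void
rescan_current_group(Tuplesortstate *sortstate, TupleTableSlot *slot)
{
	tuplesort_markpos(sortstate);	/* remember where the group starts */
	while (tuplesort_gettupleslot(sortstate, true, slot))
	{
		/* ... stop once the join key changes ... */
	}
	tuplesort_restorepos(sortstate);	/* rewind to the mark and re-read */
}
#endif   /* TUPLESORT_ILLUSTRATION */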
2316
2317 /*
2318  * tuplesort_get_stats - extract summary statistics
2319  *
2320  * This can be called after tuplesort_performsort() finishes to obtain
2321  * printable summary information about how the sort was performed.
2322  * spaceUsed is measured in kilobytes.
2323  */
2324 void
2325 tuplesort_get_stats(Tuplesortstate *state,
2326                                         const char **sortMethod,
2327                                         const char **spaceType,
2328                                         long *spaceUsed)
2329 {
2330         /*
2331          * Note: it might seem we should provide both memory and disk usage for a
2332          * disk-based sort.  However, the current code doesn't track memory space
2333          * accurately once we have begun to return tuples to the caller (since we
2334          * don't account for pfree's the caller is expected to do), so we cannot
2335          * rely on availMem in a disk sort.  This does not seem worth the overhead
2336          * to fix.  Is it worth creating an API for the memory context code to
2337          * tell us how much is actually used in sortcontext?
2338          */
2339         if (state->tapeset)
2340         {
2341                 *spaceType = "Disk";
2342                 *spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
2343         }
2344         else
2345         {
2346                 *spaceType = "Memory";
2347                 *spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
2348         }
2349
2350         switch (state->status)
2351         {
2352                 case TSS_SORTEDINMEM:
2353                         if (state->boundUsed)
2354                                 *sortMethod = "top-N heapsort";
2355                         else
2356                                 *sortMethod = "quicksort";
2357                         break;
2358                 case TSS_SORTEDONTAPE:
2359                         *sortMethod = "external sort";
2360                         break;
2361                 case TSS_FINALMERGE:
2362                         *sortMethod = "external merge";
2363                         break;
2364                 default:
2365                         *sortMethod = "still in progress";
2366                         break;
2367         }
2368 }
2369
2370
2371 /*
2372  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
2373  *
2374  * Compare two SortTuples.  If checkIndex is true, use the tuple index
2375  * as the leading part of the sort key; otherwise compare the keys alone.
2376  */
2377
2378 #define HEAPCOMPARE(tup1,tup2) \
2379         (checkIndex && ((tup1)->tupindex != (tup2)->tupindex) ? \
2380          ((tup1)->tupindex) - ((tup2)->tupindex) : \
2381          COMPARETUP(state, tup1, tup2))
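
/*
 * For example, with checkIndex true a tuple tagged (run 0, key 9999)
 * compares before one tagged (run 1, key 1): the heap can never surface a
 * run-1 tuple while any run-0 tuple remains, which is exactly the property
 * the run-building code depends on.
 */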
2382
2383 /*
2384  * Convert the existing unordered array of SortTuples to a bounded heap,
2385  * discarding all but the smallest "state->bound" tuples.
2386  *
2387  * When working with a bounded heap, we want to keep the largest entry
2388  * at the root (array entry zero), instead of the smallest as in the normal
2389  * sort case.  This allows us to discard the largest entry cheaply.
2390  * Therefore, we temporarily reverse the sort direction.
2391  *
2392  * We assume that all entries in a bounded heap will always have tupindex
2393  * zero; it therefore doesn't matter that HEAPCOMPARE() doesn't reverse
2394  * the direction of comparison for tupindexes.
2395  */
2396 static void
2397 make_bounded_heap(Tuplesortstate *state)
2398 {
2399         int                     tupcount = state->memtupcount;
2400         int                     i;
2401
2402         Assert(state->status == TSS_INITIAL);
2403         Assert(state->bounded);
2404         Assert(tupcount >= state->bound);
2405
2406         /* Reverse sort direction so largest entry will be at root */
2407         REVERSEDIRECTION(state);
2408
2409         state->memtupcount = 0;         /* make the heap empty */
2410         for (i = 0; i < tupcount; i++)
2411         {
2412                 if (state->memtupcount >= state->bound &&
2413                   COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
2414                 {
2415                         /* New tuple would just get thrown out, so skip it */
2416                         free_sort_tuple(state, &state->memtuples[i]);
2417                 }
2418                 else
2419                 {
2420                         /* Insert next tuple into heap */
2421                         /* Must copy source tuple to avoid possible overwrite */
2422                         SortTuple       stup = state->memtuples[i];
2423
2424                         tuplesort_heap_insert(state, &stup, 0, false);
2425
2426                         /* If heap too full, discard largest entry */
2427                         if (state->memtupcount > state->bound)
2428                         {
2429                                 free_sort_tuple(state, &state->memtuples[0]);
2430                                 tuplesort_heap_siftup(state, false);
2431                         }
2432                 }
2433         }
2434
2435         Assert(state->memtupcount == state->bound);
2436         state->status = TSS_BOUNDED;
2437 }
2438
2439 /*
2440  * Convert the bounded heap to a properly-sorted array
2441  */
2442 static void
2443 sort_bounded_heap(Tuplesortstate *state)
2444 {
2445         int                     tupcount = state->memtupcount;
2446
2447         Assert(state->status == TSS_BOUNDED);
2448         Assert(state->bounded);
2449         Assert(tupcount == state->bound);
2450
2451         /*
2452          * We can unheapify in place because each sift-up will remove the largest
2453          * entry, which we can promptly store in the newly freed slot at the end.
2454          * Once we're down to a single-entry heap, we're done.
2455          */
2456         while (state->memtupcount > 1)
2457         {
2458                 SortTuple       stup = state->memtuples[0];
2459
2460                 /* this sifts-up the next-largest entry and decreases memtupcount */
2461                 tuplesort_heap_siftup(state, false);
2462                 state->memtuples[state->memtupcount] = stup;
2463         }
2464         state->memtupcount = tupcount;
2465
2466         /*
2467          * Reverse sort direction back to the original state.  This is not
2468          * actually necessary but seems like a good idea for tidiness.
2469          */
2470         REVERSEDIRECTION(state);
2471
2472         state->status = TSS_SORTEDINMEM;
2473         state->boundUsed = true;
2474 }
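
/*
 * Editor's note: a self-contained sketch of the bounded (top-N) technique
 * used by make_bounded_heap() and sort_bounded_heap() above, on plain ints
 * with hypothetical names.  A max-heap plays the role of the reversed-
 * direction heap: the largest kept value sits at the root, so a new value
 * that cannot beat it is discarded in O(1), and the final unheapify moves
 * the largest remaining value into the freed slot at the end each time,
 * leaving the array in ascending order.
 */
#ifdef TUPLESORT_EXAMPLES
static void
example_maxheap_insert(int *heap, int *count, int val)
{
	int			j = (*count)++;

	while (j > 0)				/* sift the new entry up toward the root */
	{
		int			i = (j - 1) >> 1;

		if (val <= heap[i])
			break;
		heap[j] = heap[i];
		j = i;
	}
	heap[j] = val;
}

static void
example_maxheap_remove_top(int *heap, int *count)
{
	int			val = heap[--(*count)]; /* entry to reinsert */
	int			n = *count;
	int			i = 0;			/* the "hole" starts at the root */

	for (;;)
	{
		int			j = 2 * i + 1;

		if (j >= n)
			break;
		if (j + 1 < n && heap[j] < heap[j + 1])
			j++;
		if (val >= heap[j])
			break;
		heap[i] = heap[j];
		i = j;
	}
	heap[i] = val;
}

/* keep only the 'bound' smallest inputs, returned in ascending order */
static void
example_bounded_sort(const int *input, int ninput, int *out, int bound)
{
	int			count = 0;
	int			i;

	for (i = 0; i < ninput; i++)
	{
		if (count >= bound && input[i] >= out[0])
			continue;			/* would just get thrown out, so skip it */
		example_maxheap_insert(out, &count, input[i]);
		if (count > bound)
			example_maxheap_remove_top(out, &count);	/* discard largest */
	}
	while (count > 1)			/* unheapify in place */
	{
		int			top = out[0];

		example_maxheap_remove_top(out, &count);
		out[count] = top;		/* largest goes into the freed slot */
	}
}
#endif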
2475
2476 /*
2477  * Insert a new tuple into an empty or existing heap, maintaining the
2478  * heap invariant.  Caller is responsible for ensuring there's room.
2479  *
2480  * Note: we assume *tuple is a temporary variable that can be scribbled on.
2481  * For some callers, tuple actually points to a memtuples[] entry above the
2482  * end of the heap.  This is safe as long as it's not immediately adjacent
2483  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
2484  * is, it might get overwritten before being moved into the heap!
2485  */
2486 static void
2487 tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
2488                                           int tupleindex, bool checkIndex)
2489 {
2490         SortTuple  *memtuples;
2491         int                     j;
2492
2493         /*
2494          * Save the tupleindex --- see notes above about writing on *tuple. It's a
2495          * historical artifact that tupleindex is passed as a separate argument
2496          * and not in *tuple, but it's notationally convenient so let's leave it
2497          * that way.
2498          */
2499         tuple->tupindex = tupleindex;
2500
2501         memtuples = state->memtuples;
2502         Assert(state->memtupcount < state->memtupsize);
2503
2504         /*
2505          * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
2506          * using 1-based array indexes, not 0-based.
2507          */
2508         j = state->memtupcount++;
2509         while (j > 0)
2510         {
2511                 int                     i = (j - 1) >> 1;
2512
2513                 if (HEAPCOMPARE(tuple, &memtuples[i]) >= 0)
2514                         break;
2515                 memtuples[j] = memtuples[i];
2516                 j = i;
2517         }
2518         memtuples[j] = *tuple;
2519 }
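
/*
 * Editor's note: an illustrative sketch (not part of tuplesort.c) of the
 * aliasing hazard described in the comment above.  The first sift step may
 * write memtuples[memtupcount] before *tuple has been fully consumed, so a
 * caller must never pass a pointer to that exact slot; copying to a local,
 * as make_bounded_heap() does, is the safe pattern.
 */
#ifdef TUPLESORT_EXAMPLES
static void
example_safe_heap_insert(Tuplesortstate *state, int i)
{
	/*
	 * UNSAFE if i == state->memtupcount:
	 *		tuplesort_heap_insert(state, &state->memtuples[i], 0, false);
	 * because sifting can overwrite the source before it is read.
	 */
	SortTuple	stup = state->memtuples[i];		/* copy first */

	tuplesort_heap_insert(state, &stup, 0, false);
}
#endif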
2520
2521 /*
2522  * The tuple at state->memtuples[0] has been removed from the heap.
2523  * Decrement memtupcount, and sift up to maintain the heap invariant.
2523  * (Knuth calls this "sifting up", though the hole moves down from the root.)
2524  */
2525 static void
2526 tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
2527 {
2528         SortTuple  *memtuples = state->memtuples;
2529         SortTuple  *tuple;
2530         int                     i,
2531                                 n;
2532
2533         if (--state->memtupcount <= 0)
2534                 return;
2535         n = state->memtupcount;
2536         tuple = &memtuples[n];          /* tuple that must be reinserted */
2537         i = 0;                                          /* i is where the "hole" is */
2538         for (;;)
2539         {
2540                 int                     j = 2 * i + 1;
2541
2542                 if (j >= n)
2543                         break;
2544                 if (j + 1 < n &&
2545                         HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0)
2546                         j++;
2547                 if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0)
2548                         break;
2549                 memtuples[i] = memtuples[j];
2550                 i = j;
2551         }
2552         memtuples[i] = *tuple;
2553 }
2554
2555
2556 /*
2557  * Tape interface routines
2558  */
2559
2560 static unsigned int
2561 getlen(Tuplesortstate *state, int tapenum, bool eofOK)
2562 {
2563         unsigned int len;
2564
2565         if (LogicalTapeRead(state->tapeset, tapenum,
2566                                                 &len, sizeof(len)) != sizeof(len))
2567                 elog(ERROR, "unexpected end of tape");
2568         if (len == 0 && !eofOK)
2569                 elog(ERROR, "unexpected end of data");
2570         return len;
2571 }
2572
2573 static void
2574 markrunend(Tuplesortstate *state, int tapenum)
2575 {
2576         unsigned int len = 0;
2577
2578         LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
2579 }
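
/*
 * Editor's note: a self-contained sketch (hypothetical names, plain stdio in
 * place of logtape.c) of the on-tape record layout these routines assume: a
 * leading length word that counts itself, the tuple body, an optional
 * trailing length word for backward scans under randomAccess, and a zero
 * length word marking end of run --- the case getlen() accepts only when
 * eofOK is true.
 */
#ifdef TUPLESORT_EXAMPLES
#include <stdio.h>

static void
example_write_record(FILE *tape, const void *body, unsigned int bodylen,
					 int randomAccess)
{
	unsigned int len = bodylen + sizeof(unsigned int);

	fwrite(&len, sizeof(len), 1, tape);		/* leading length word */
	fwrite(body, bodylen, 1, tape);			/* tuple body */
	if (randomAccess)
		fwrite(&len, sizeof(len), 1, tape); /* trailing length word */
}

static void
example_mark_run_end(FILE *tape)
{
	unsigned int len = 0;					/* as in markrunend() */

	fwrite(&len, sizeof(len), 1, tape);
}
#endif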
2580
2581
2582 /*
2583  * Inline-able copy of FunctionCall2Coll() to save some cycles in sorting.
2584  */
2585 static inline Datum
2586 myFunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
2587 {
2588         FunctionCallInfoData fcinfo;
2589         Datum           result;
2590
2591         InitFunctionCallInfoData(fcinfo, flinfo, 2, collation, NULL, NULL);
2592
2593         fcinfo.arg[0] = arg1;
2594         fcinfo.arg[1] = arg2;
2595         fcinfo.argnull[0] = false;
2596         fcinfo.argnull[1] = false;
2597
2598         result = FunctionCallInvoke(&fcinfo);
2599
2600         /* Check for null result, since caller is clearly not expecting one */
2601         if (fcinfo.isnull)
2602                 elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
2603
2604         return result;
2605 }
2606
2607 /*
2608  * Apply a sort function (by now converted to fmgr lookup form)
2609  * and return a 3-way comparison result.  This takes care of handling
2610  * reverse-sort and NULLs-ordering properly.  We assume that DESC and
2611  * NULLS_FIRST options are encoded in sk_flags the same way btree does it.
2612  */
2613 static inline int32
2614 inlineApplySortFunction(FmgrInfo *sortFunction, int sk_flags, Oid collation,
2615                                                 Datum datum1, bool isNull1,
2616                                                 Datum datum2, bool isNull2)
2617 {
2618         int32           compare;
2619
2620         if (isNull1)
2621         {
2622                 if (isNull2)
2623                         compare = 0;            /* NULL "=" NULL */
2624                 else if (sk_flags & SK_BT_NULLS_FIRST)
2625                         compare = -1;           /* NULL "<" NOT_NULL */
2626                 else
2627                         compare = 1;            /* NULL ">" NOT_NULL */
2628         }
2629         else if (isNull2)
2630         {
2631                 if (sk_flags & SK_BT_NULLS_FIRST)
2632                         compare = 1;            /* NOT_NULL ">" NULL */
2633                 else
2634                         compare = -1;           /* NOT_NULL "<" NULL */
2635         }
2636         else
2637         {
2638                 compare = DatumGetInt32(myFunctionCall2Coll(sortFunction, collation,
2639                                                                                                         datum1, datum2));
2640
2641                 if (sk_flags & SK_BT_DESC)
2642                         compare = -compare;
2643         }
2644
2645         return compare;
2646 }
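
/*
 * Editor's note: a self-contained analogue (hypothetical names, ints and
 * bools in place of Datums and sk_flags) of the rules applied above: NULLs
 * order according to the nulls-first setting, two NULLs compare equal, and
 * only a non-null comparison result is negated for DESC --- so reversing the
 * direction never reorders NULLs relative to each other.
 */
#ifdef TUPLESORT_EXAMPLES
#include <stdbool.h>

static int
example_apply_sort_comparator(int datum1, bool isNull1,
							  int datum2, bool isNull2,
							  bool reverse, bool nulls_first)
{
	int			compare;

	if (isNull1)
		compare = isNull2 ? 0 : (nulls_first ? -1 : 1);
	else if (isNull2)
		compare = nulls_first ? 1 : -1;
	else
	{
		compare = (datum1 > datum2) - (datum1 < datum2);
		if (reverse)
			compare = -compare;
	}
	return compare;
}
#endif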
2647
2648
2649 /*
2650  * Routines specialized for HeapTuple (actually MinimalTuple) case
2651  */
2652
2653 static int
2654 comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
2655 {
2656         SortSupport     sortKey = state->sortKeys;
2657         HeapTupleData ltup;
2658         HeapTupleData rtup;
2659         TupleDesc       tupDesc;
2660         int                     nkey;
2661         int32           compare;
2662
2663         /* Allow interrupting long sorts */
2664         CHECK_FOR_INTERRUPTS();
2665
2666         /* Compare the leading sort key */
2667         compare = ApplySortComparator(a->datum1, a->isnull1,
2668                                                                   b->datum1, b->isnull1,
2669                                                                   sortKey);
2670         if (compare != 0)
2671                 return compare;
2672
2673         /* Compare additional sort keys */
2674         ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
2675         ltup.t_data = (HeapTupleHeader) ((char *) a->tuple - MINIMAL_TUPLE_OFFSET);
2676         rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
2677         rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET);
2678         tupDesc = state->tupDesc;
2679         sortKey++;
2680         for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++)
2681         {
2682                 AttrNumber      attno = sortKey->ssup_attno;
2683                 Datum           datum1,
2684                                         datum2;
2685                 bool            isnull1,
2686                                         isnull2;
2687
2688                 datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
2689                 datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
2690
2691                 compare = ApplySortComparator(datum1, isnull1,
2692                                                                           datum2, isnull2,
2693                                                                           sortKey);
2694                 if (compare != 0)
2695                         return compare;
2696         }
2697
2698         return 0;
2699 }
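
/*
 * Editor's note: a self-contained sketch (hypothetical row type) of the
 * two-tier strategy above.  The leading key is cached in the SortTuple
 * (datum1/isnull1) when the tuple is copied in, so the common case of the
 * first column deciding the comparison never touches the tuple body; only
 * ties fall through to decoding the remaining columns.
 */
#ifdef TUPLESORT_EXAMPLES
typedef struct ExampleRow
{
	int			col1;
	int			col2;
} ExampleRow;

typedef struct ExampleEntry
{
	int			datum1;			/* cached copy of row->col1 */
	const ExampleRow *row;		/* full tuple, consulted only on ties */
} ExampleEntry;

static int
example_comparetup(const ExampleEntry *a, const ExampleEntry *b)
{
	if (a->datum1 != b->datum1) /* leading key: cached, cheap */
		return (a->datum1 > b->datum1) ? 1 : -1;
	/* additional keys: fetched from the tuple only on a tie */
	return (a->row->col2 > b->row->col2) - (a->row->col2 < b->row->col2);
}
#endif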
2700
2701 static void
2702 copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
2703 {
2704         /*
2705          * We expect the passed "tup" to be a TupleTableSlot, and form a
2706          * MinimalTuple using the exported interface for that.
2707          */
2708         TupleTableSlot *slot = (TupleTableSlot *) tup;
2709         MinimalTuple tuple;
2710         HeapTupleData htup;
2711
2712         /* copy the tuple into sort storage */
2713         tuple = ExecCopySlotMinimalTuple(slot);
2714         stup->tuple = (void *) tuple;
2715         USEMEM(state, GetMemoryChunkSpace(tuple));
2716         /* set up first-column key value */
2717         htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
2718         htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
2719         stup->datum1 = heap_getattr(&htup,
2720                                                                 state->sortKeys[0].ssup_attno,
2721                                                                 state->tupDesc,
2722                                                                 &stup->isnull1);
2723 }
2724
2725 static void
2726 writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
2727 {
2728         MinimalTuple tuple = (MinimalTuple) stup->tuple;
2729
2730         /* the part of the MinimalTuple we'll write: */
2731         char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
2732         unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
2733
2734         /* total on-disk footprint: */
2735         unsigned int tuplen = tupbodylen + sizeof(int);
2736
2737         LogicalTapeWrite(state->tapeset, tapenum,
2738                                          (void *) &tuplen, sizeof(tuplen));
2739         LogicalTapeWrite(state->tapeset, tapenum,
2740                                          (void *) tupbody, tupbodylen);
2741         if (state->randomAccess)        /* need trailing length word? */
2742                 LogicalTapeWrite(state->tapeset, tapenum,
2743                                                  (void *) &tuplen, sizeof(tuplen));
2744
2745         FREEMEM(state, GetMemoryChunkSpace(tuple));
2746         heap_free_minimal_tuple(tuple);
2747 }
2748
2749 static void
2750 readtup_heap(Tuplesortstate *state, SortTuple *stup,
2751                          int tapenum, unsigned int len)
2752 {
2753         unsigned int tupbodylen = len - sizeof(int);
2754         unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
2755         MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
2756         char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
2757         HeapTupleData htup;
2758
2759         USEMEM(state, GetMemoryChunkSpace(tuple));
2760         /* read in the tuple proper */
2761         tuple->t_len = tuplen;
2762         LogicalTapeReadExact(state->tapeset, tapenum,
2763                                                  tupbody, tupbodylen);
2764         if (state->randomAccess)        /* need trailing length word? */
2765                 LogicalTapeReadExact(state->tapeset, tapenum,
2766                                                          &tuplen, sizeof(tuplen));
2767         stup->tuple = (void *) tuple;
2768         /* set up first-column key value */
2769         htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
2770         htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
2771         stup->datum1 = heap_getattr(&htup,
2772                                                                 state->sortKeys[0].ssup_attno,
2773                                                                 state->tupDesc,
2774                                                                 &stup->isnull1);
2775 }
2776
2777 static void
2778 reversedirection_heap(Tuplesortstate *state)
2779 {
2780         SortSupport     sortKey = state->sortKeys;
2781         int                     nkey;
2782
2783         for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
2784         {
2785                 sortKey->ssup_reverse = !sortKey->ssup_reverse;
2786                 sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
2787         }
2788 }
2789
2790
2791 /*
2792  * Routines specialized for the CLUSTER case (HeapTuple data, with
2793  * comparisons per a btree index definition)
2794  */
2795
2796 static int
2797 comparetup_cluster(const SortTuple *a, const SortTuple *b,
2798                                    Tuplesortstate *state)
2799 {
2800         ScanKey         scanKey = state->indexScanKey;
2801         HeapTuple       ltup;
2802         HeapTuple       rtup;
2803         TupleDesc       tupDesc;
2804         int                     nkey;
2805         int32           compare;
2806
2807         /* Allow interrupting long sorts */
2808         CHECK_FOR_INTERRUPTS();
2809
2810         /* Compare the leading sort key, if it's simple */
2811         if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
2812         {
2813                 compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
2814                                                                                   scanKey->sk_collation,
2815                                                                                   a->datum1, a->isnull1,
2816                                                                                   b->datum1, b->isnull1);
2817                 if (compare != 0 || state->nKeys == 1)
2818                         return compare;
2819                 /* Compare additional columns the hard way */
2820                 scanKey++;
2821                 nkey = 1;
2822         }
2823         else
2824         {
2825                 /* Must compare all keys the hard way */
2826                 nkey = 0;
2827         }
2828
2829         /* Compare additional sort keys */
2830         ltup = (HeapTuple) a->tuple;
2831         rtup = (HeapTuple) b->tuple;
2832
2833         if (state->indexInfo->ii_Expressions == NULL)
2834         {
2835                 /* If not expression index, just compare the proper heap attrs */
2836                 tupDesc = state->tupDesc;
2837
2838                 for (; nkey < state->nKeys; nkey++, scanKey++)
2839                 {
2840                         AttrNumber      attno = state->indexInfo->ii_KeyAttrNumbers[nkey];
2841                         Datum           datum1,
2842                                                 datum2;
2843                         bool            isnull1,
2844                                                 isnull2;
2845
2846                         datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
2847                         datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
2848
2849                         compare = inlineApplySortFunction(&scanKey->sk_func,
2850                                                                                           scanKey->sk_flags,
2851                                                                                           scanKey->sk_collation,
2852                                                                                           datum1, isnull1,
2853                                                                                           datum2, isnull2);
2854                         if (compare != 0)
2855                                 return compare;
2856                 }
2857         }
2858         else
2859         {
2860                 /*
2861                  * In the expression index case, compute the whole index tuple and
2862                  * then compare values.  It would perhaps be faster to compute only as
2863                  * many columns as we need to compare, but that would require
2864                  * duplicating all the logic in FormIndexDatum.
2865                  */
2866                 Datum           l_index_values[INDEX_MAX_KEYS];
2867                 bool            l_index_isnull[INDEX_MAX_KEYS];
2868                 Datum           r_index_values[INDEX_MAX_KEYS];
2869                 bool            r_index_isnull[INDEX_MAX_KEYS];
2870                 TupleTableSlot *ecxt_scantuple;
2871
2872                 /* Reset context each time to prevent memory leakage */
2873                 ResetPerTupleExprContext(state->estate);
2874
2875                 ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple;
2876
2877                 ExecStoreTuple(ltup, ecxt_scantuple, InvalidBuffer, false);
2878                 FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
2879                                            l_index_values, l_index_isnull);
2880
2881                 ExecStoreTuple(rtup, ecxt_scantuple, InvalidBuffer, false);
2882                 FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
2883                                            r_index_values, r_index_isnull);
2884
2885                 for (; nkey < state->nKeys; nkey++, scanKey++)
2886                 {
2887                         compare = inlineApplySortFunction(&scanKey->sk_func,
2888                                                                                           scanKey->sk_flags,
2889                                                                                           scanKey->sk_collation,
2890                                                                                           l_index_values[nkey],
2891                                                                                           l_index_isnull[nkey],
2892                                                                                           r_index_values[nkey],
2893                                                                                           r_index_isnull[nkey]);
2894                         if (compare != 0)
2895                                 return compare;
2896                 }
2897         }
2898
2899         return 0;
2900 }
2901
2902 static void
2903 copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
2904 {
2905         HeapTuple       tuple = (HeapTuple) tup;
2906
2907         /* copy the tuple into sort storage */
2908         tuple = heap_copytuple(tuple);
2909         stup->tuple = (void *) tuple;
2910         USEMEM(state, GetMemoryChunkSpace(tuple));
2911         /* set up first-column key value, if it's a simple column */
2912         if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
2913                 stup->datum1 = heap_getattr(tuple,
2914                                                                         state->indexInfo->ii_KeyAttrNumbers[0],
2915                                                                         state->tupDesc,
2916                                                                         &stup->isnull1);
2917 }
2918
2919 static void
2920 writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
2921 {
2922         HeapTuple       tuple = (HeapTuple) stup->tuple;
2923         unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int);
2924
2925         /* We need to store t_self, but not other fields of HeapTupleData */
2926         LogicalTapeWrite(state->tapeset, tapenum,
2927                                          &tuplen, sizeof(tuplen));
2928         LogicalTapeWrite(state->tapeset, tapenum,
2929                                          &tuple->t_self, sizeof(ItemPointerData));
2930         LogicalTapeWrite(state->tapeset, tapenum,
2931                                          tuple->t_data, tuple->t_len);
2932         if (state->randomAccess)        /* need trailing length word? */
2933                 LogicalTapeWrite(state->tapeset, tapenum,
2934                                                  &tuplen, sizeof(tuplen));
2935
2936         FREEMEM(state, GetMemoryChunkSpace(tuple));
2937         heap_freetuple(tuple);
2938 }
2939
2940 static void
2941 readtup_cluster(Tuplesortstate *state, SortTuple *stup,
2942                                 int tapenum, unsigned int tuplen)
2943 {
2944         unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
2945         HeapTuple       tuple = (HeapTuple) palloc(t_len + HEAPTUPLESIZE);
2946
2947         USEMEM(state, GetMemoryChunkSpace(tuple));
2948         /* Reconstruct the HeapTupleData header */
2949         tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
2950         tuple->t_len = t_len;
2951         LogicalTapeReadExact(state->tapeset, tapenum,
2952                                                  &tuple->t_self, sizeof(ItemPointerData));
2953         /* We don't currently bother to reconstruct t_tableOid */
2954         tuple->t_tableOid = InvalidOid;
2955         /* Read in the tuple body */
2956         LogicalTapeReadExact(state->tapeset, tapenum,
2957                                                  tuple->t_data, tuple->t_len);
2958         if (state->randomAccess)        /* need trailing length word? */
2959                 LogicalTapeReadExact(state->tapeset, tapenum,
2960                                                          &tuplen, sizeof(tuplen));
2961         stup->tuple = (void *) tuple;
2962         /* set up first-column key value, if it's a simple column */
2963         if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
2964                 stup->datum1 = heap_getattr(tuple,
2965                                                                         state->indexInfo->ii_KeyAttrNumbers[0],
2966                                                                         state->tupDesc,
2967                                                                         &stup->isnull1);
2968 }
2969
2970
2971 /*
2972  * Routines specialized for IndexTuple case
2973  *
2974  * The btree and hash cases require separate comparison functions, but the
2975  * IndexTuple representation is the same so the copy/write/read support
2976  * functions can be shared.
2977  */
2978
2979 static int
2980 comparetup_index_btree(const SortTuple *a, const SortTuple *b,
2981                                            Tuplesortstate *state)
2982 {
2983         /*
2984          * This is similar to _bt_tuplecompare(), but we have already done the
2985          * index_getattr calls for the first column, and we need to keep track of
2986          * whether any null fields are present.  Also see the special treatment
2987          * for equal keys at the end.
2988          */
2989         ScanKey         scanKey = state->indexScanKey;
2990         IndexTuple      tuple1;
2991         IndexTuple      tuple2;
2992         int                     keysz;
2993         TupleDesc       tupDes;
2994         bool            equal_hasnull = false;
2995         int                     nkey;
2996         int32           compare;
2997
2998         /* Allow interrupting long sorts */
2999         CHECK_FOR_INTERRUPTS();
3000
3001         /* Compare the leading sort key */
3002         compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
3003                                                                           scanKey->sk_collation,
3004                                                                           a->datum1, a->isnull1,
3005                                                                           b->datum1, b->isnull1);
3006         if (compare != 0)
3007                 return compare;
3008
3009         /* they are equal, so we only need to examine one null flag */
3010         if (a->isnull1)
3011                 equal_hasnull = true;
3012
3013         /* Compare additional sort keys */
3014         tuple1 = (IndexTuple) a->tuple;
3015         tuple2 = (IndexTuple) b->tuple;
3016         keysz = state->nKeys;
3017         tupDes = RelationGetDescr(state->indexRel);
3018         scanKey++;
3019         for (nkey = 2; nkey <= keysz; nkey++, scanKey++)
3020         {
3021                 Datum           datum1,
3022                                         datum2;
3023                 bool            isnull1,
3024                                         isnull2;
3025
3026                 datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
3027                 datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
3028
3029                 compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
3030                                                                                   scanKey->sk_collation,
3031                                                                                   datum1, isnull1,
3032                                                                                   datum2, isnull2);
3033                 if (compare != 0)
3034                         return compare;         /* done when we find unequal attributes */
3035
3036                 /* they are equal, so we only need to examine one null flag */
3037                 if (isnull1)
3038                         equal_hasnull = true;
3039         }
3040
3041         /*
3042          * If btree has asked us to enforce uniqueness, complain if two equal
3043          * tuples are detected (unless there was at least one NULL field).
3044          *
3045          * It is sufficient to make the test here, because if two tuples are equal
3046          * they *must* get compared at some stage of the sort --- otherwise the
3047          * sort algorithm wouldn't have checked whether one must appear before the
3048          * other.
3049          *
3050          * Some rather brain-dead implementations of qsort will sometimes call the
3051          * comparison routine to compare a value to itself.  (At this writing only
3052          * QNX 4 is known to do such silly things; we don't support QNX anymore,
3053          * but perhaps the behavior still exists elsewhere.)  Don't raise a bogus
3054          * error in that case.
3055          */
3056         if (state->enforceUnique && !equal_hasnull && tuple1 != tuple2)
3057         {
3058                 Datum           values[INDEX_MAX_KEYS];
3059                 bool            isnull[INDEX_MAX_KEYS];
3060
3061                 index_deform_tuple(tuple1, tupDes, values, isnull);
3062                 ereport(ERROR,
3063                                 (errcode(ERRCODE_UNIQUE_VIOLATION),
3064                                  errmsg("could not create unique index \"%s\"",
3065                                                 RelationGetRelationName(state->indexRel)),
3066                                  errdetail("Key %s is duplicated.",
3067                                                    BuildIndexValueDescription(state->indexRel,
3068                                                                                                           values, isnull))));
3069         }
3070
3071         /*
3072          * If key values are equal, we sort on ItemPointer.  This does not affect
3073          * validity of the finished index, but it offers cheap insurance against
3074          * performance problems with bad qsort implementations that have trouble
3075          * with large numbers of equal keys.
3076          */
3077         {
3078                 BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
3079                 BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
3080
3081                 if (blk1 != blk2)
3082                         return (blk1 < blk2) ? -1 : 1;
3083         }
3084         {
3085                 OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
3086                 OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
3087
3088                 if (pos1 != pos2)
3089                         return (pos1 < pos2) ? -1 : 1;
3090         }
3091
3092         return 0;
3093 }
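
/*
 * Editor's note: a minimal sketch (plain unsigned ints standing in for
 * BlockNumber/OffsetNumber) of the ItemPointer tiebreak used above and in
 * comparetup_index_hash() below: block number first, then offset, giving a
 * total order even among fully equal keys.
 */
#ifdef TUPLESORT_EXAMPLES
static int
example_tid_tiebreak(unsigned int blk1, unsigned int pos1,
					 unsigned int blk2, unsigned int pos2)
{
	if (blk1 != blk2)
		return (blk1 < blk2) ? -1 : 1;
	if (pos1 != pos2)
		return (pos1 < pos2) ? -1 : 1;
	return 0;					/* same TID: physically the same tuple */
}
#endif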
3094
3095 static int
3096 comparetup_index_hash(const SortTuple *a, const SortTuple *b,
3097                                           Tuplesortstate *state)
3098 {
3099         uint32          hash1;
3100         uint32          hash2;
3101         IndexTuple      tuple1;
3102         IndexTuple      tuple2;
3103
3104         /* Allow interrupting long sorts */
3105         CHECK_FOR_INTERRUPTS();
3106
3107         /*
3108          * Fetch hash keys and mask off bits we don't want to sort by. We know
3109          * that the first column of the index tuple is the hash key.
3110          */
3111         Assert(!a->isnull1);
3112         hash1 = DatumGetUInt32(a->datum1) & state->hash_mask;
3113         Assert(!b->isnull1);
3114         hash2 = DatumGetUInt32(b->datum1) & state->hash_mask;
3115
3116         if (hash1 > hash2)
3117                 return 1;
3118         else if (hash1 < hash2)
3119                 return -1;
3120
3121         /*
3122          * If hash values are equal, we sort on ItemPointer.  This does not affect
3123          * validity of the finished index, but it offers cheap insurance against
3124          * performance problems with bad qsort implementations that have trouble
3125          * with large numbers of equal keys.
3126          */
3127         tuple1 = (IndexTuple) a->tuple;
3128         tuple2 = (IndexTuple) b->tuple;
3129
3130         {
3131                 BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
3132                 BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
3133
3134                 if (blk1 != blk2)
3135                         return (blk1 < blk2) ? -1 : 1;
3136         }
3137         {
3138                 OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
3139                 OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
3140
3141                 if (pos1 != pos2)
3142                         return (pos1 < pos2) ? -1 : 1;
3143         }
3144
3145         return 0;
3146 }
3147
3148 static void
3149 copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
3150 {
3151         IndexTuple      tuple = (IndexTuple) tup;
3152         unsigned int tuplen = IndexTupleSize(tuple);
3153         IndexTuple      newtuple;
3154
3155         /* copy the tuple into sort storage */
3156         newtuple = (IndexTuple) palloc(tuplen);
3157         memcpy(newtuple, tuple, tuplen);
3158         USEMEM(state, GetMemoryChunkSpace(newtuple));
3159         stup->tuple = (void *) newtuple;
3160         /* set up first-column key value */
3161         stup->datum1 = index_getattr(newtuple,
3162                                                                  1,
3163                                                                  RelationGetDescr(state->indexRel),
3164                                                                  &stup->isnull1);
3165 }
3166
3167 static void
3168 writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
3169 {
3170         IndexTuple      tuple = (IndexTuple) stup->tuple;
3171         unsigned int tuplen;
3172
3173         tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
3174         LogicalTapeWrite(state->tapeset, tapenum,
3175                                          (void *) &tuplen, sizeof(tuplen));
3176         LogicalTapeWrite(state->tapeset, tapenum,
3177                                          (void *) tuple, IndexTupleSize(tuple));
3178         if (state->randomAccess)        /* need trailing length word? */
3179                 LogicalTapeWrite(state->tapeset, tapenum,
3180                                                  (void *) &tuplen, sizeof(tuplen));
3181
3182         FREEMEM(state, GetMemoryChunkSpace(tuple));
3183         pfree(tuple);
3184 }
3185
3186 static void
3187 readtup_index(Tuplesortstate *state, SortTuple *stup,
3188                           int tapenum, unsigned int len)
3189 {
3190         unsigned int tuplen = len - sizeof(unsigned int);
3191         IndexTuple      tuple = (IndexTuple) palloc(tuplen);
3192
3193         USEMEM(state, GetMemoryChunkSpace(tuple));
3194         LogicalTapeReadExact(state->tapeset, tapenum,
3195                                                  tuple, tuplen);
3196         if (state->randomAccess)        /* need trailing length word? */
3197                 LogicalTapeReadExact(state->tapeset, tapenum,
3198                                                          &tuplen, sizeof(tuplen));
3199         stup->tuple = (void *) tuple;
3200         /* set up first-column key value */
3201         stup->datum1 = index_getattr(tuple,
3202                                                                  1,
3203                                                                  RelationGetDescr(state->indexRel),
3204                                                                  &stup->isnull1);
3205 }
3206
3207 static void
3208 reversedirection_index_btree(Tuplesortstate *state)
3209 {
3210         ScanKey         scanKey = state->indexScanKey;
3211         int                     nkey;
3212
3213         for (nkey = 0; nkey < state->nKeys; nkey++, scanKey++)
3214         {
3215                 scanKey->sk_flags ^= (SK_BT_DESC | SK_BT_NULLS_FIRST);
3216         }
3217 }
3218
3219 static void
3220 reversedirection_index_hash(Tuplesortstate *state)
3221 {
3222         /* We don't support reversing direction in a hash index sort */
3223         elog(ERROR, "reversedirection_index_hash is not implemented");
3224 }
3225
3226
3227 /*
3228  * Routines specialized for DatumTuple case
3229  */
3230
3231 static int
3232 comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
3233 {
3234         /* Allow interrupting long sorts */
3235         CHECK_FOR_INTERRUPTS();
3236
3237         return ApplySortComparator(a->datum1, a->isnull1,
3238                                                            b->datum1, b->isnull1,
3239                                                            state->datumKey);
3240 }
3241
3242 static void
3243 copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
3244 {
3245         /* Not currently needed */
3246         elog(ERROR, "copytup_datum() should not be called");
3247 }
3248
3249 static void
3250 writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
3251 {
3252         void       *waddr;
3253         unsigned int tuplen;
3254         unsigned int writtenlen;
3255
3256         if (stup->isnull1)
3257         {
3258                 waddr = NULL;
3259                 tuplen = 0;
3260         }
3261         else if (state->datumTypeByVal)
3262         {
3263                 waddr = &stup->datum1;
3264                 tuplen = sizeof(Datum);
3265         }
3266         else
3267         {
3268                 waddr = DatumGetPointer(stup->datum1);
3269                 tuplen = datumGetSize(stup->datum1, false, state->datumTypeLen);
3270                 Assert(tuplen != 0);
3271         }
3272
3273         writtenlen = tuplen + sizeof(unsigned int);
3274
3275         LogicalTapeWrite(state->tapeset, tapenum,
3276                                          (void *) &writtenlen, sizeof(writtenlen));
3277         LogicalTapeWrite(state->tapeset, tapenum,
3278                                          waddr, tuplen);
3279         if (state->randomAccess)        /* need trailing length word? */
3280                 LogicalTapeWrite(state->tapeset, tapenum,
3281                                                  (void *) &writtenlen, sizeof(writtenlen));
3282
3283         if (stup->tuple)
3284         {
3285                 FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
3286                 pfree(stup->tuple);
3287         }
3288 }
3289
3290 static void
3291 readtup_datum(Tuplesortstate *state, SortTuple *stup,
3292                           int tapenum, unsigned int len)
3293 {
3294         unsigned int tuplen = len - sizeof(unsigned int);
3295
3296         if (tuplen == 0)
3297         {
3298                 /* it's NULL */
3299                 stup->datum1 = (Datum) 0;
3300                 stup->isnull1 = true;
3301                 stup->tuple = NULL;
3302         }
3303         else if (state->datumTypeByVal)
3304         {
3305                 Assert(tuplen == sizeof(Datum));
3306                 LogicalTapeReadExact(state->tapeset, tapenum,
3307                                                          &stup->datum1, tuplen);
3308                 stup->isnull1 = false;
3309                 stup->tuple = NULL;
3310         }
3311         else
3312         {
3313                 void       *raddr = palloc(tuplen);
3314
3315                 LogicalTapeReadExact(state->tapeset, tapenum,
3316                                                          raddr, tuplen);
3317                 stup->datum1 = PointerGetDatum(raddr);
3318                 stup->isnull1 = false;
3319                 stup->tuple = raddr;
3320                 USEMEM(state, GetMemoryChunkSpace(raddr));
3321         }
3322
3323         if (state->randomAccess)        /* need trailing length word? */
3324                 LogicalTapeReadExact(state->tapeset, tapenum,
3325                                                          &tuplen, sizeof(tuplen));
3326 }
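
/*
 * Editor's note: a self-contained sketch (hypothetical names, stdio in place
 * of logtape.c) of the three on-tape cases writetup_datum() and
 * readtup_datum() distinguish: a NULL is encoded as a zero-length body, a
 * by-value datum as exactly sizeof(Datum) bytes, and a by-reference datum as
 * its datumGetSize() bytes; the reader recovers the case from the length
 * word alone.
 */
#ifdef TUPLESORT_EXAMPLES
#include <stdio.h>

static void
example_write_datum(FILE *tape, const void *value, unsigned int valuelen,
					int isnull)
{
	unsigned int writtenlen;

	if (isnull)
		valuelen = 0;			/* NULL: zero-length body */
	writtenlen = valuelen + sizeof(unsigned int);
	fwrite(&writtenlen, sizeof(writtenlen), 1, tape);
	if (valuelen > 0)
		fwrite(value, valuelen, 1, tape);
}
#endif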
3327
3328 static void
3329 reversedirection_datum(Tuplesortstate *state)
3330 {
3331         state->datumKey->ssup_reverse = !state->datumKey->ssup_reverse;
3332         state->datumKey->ssup_nulls_first = !state->datumKey->ssup_nulls_first;
3333 }
3334
3335 /*
3336  * Convenience routine to free a tuple previously loaded into sort memory
3337  */
3338 static void
3339 free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
3340 {
3341         FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
3342         pfree(stup->tuple);
3343 }