1 /*-------------------------------------------------------------------------
2  *
3  * tuplesort.c
4  *        Generalized tuple sorting routines.
5  *
6  * This module handles sorting of heap tuples, index tuples, or single
7  * Datums (and could easily support other kinds of sortable objects,
8  * if necessary).  It works efficiently for both small and large amounts
9  * of data.  Small amounts are sorted in-memory using qsort().  Large
10  * amounts are sorted using temporary files and a standard external sort
11  * algorithm.
12  *
13  * See Knuth, volume 3, for more than you want to know about the external
14  * sorting algorithm.  We divide the input into sorted runs using replacement
15  * selection, in the form of a priority tree implemented as a heap
16  * (essentially his Algorithm 5.2.3H), then merge the runs using polyphase
17  * merge, Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D
18  * are implemented by logtape.c, which avoids space wastage by recycling
19  * disk space as soon as each block is read from its "tape".
20  *
21  * We do not form the initial runs using Knuth's recommended replacement
22  * selection data structure (Algorithm 5.4.1R), because it uses a fixed
23  * number of records in memory at all times.  Since we are dealing with
24  * tuples that may vary considerably in size, we want to be able to vary
25  * the number of records kept in memory to ensure full utilization of the
26  * allowed sort memory space.  So, we keep the tuples in a variable-size
27  * heap, with the next record to go out at the top of the heap.  Like
28  * Algorithm 5.4.1R, each record is stored with the run number that it
29  * must go into, and we use (run number, key) as the ordering key for the
30  * heap.  When the run number at the top of the heap changes, we know that
31  * no more records of the prior run are left in the heap.
32  *
33  * The approximate amount of memory allowed for any one sort operation
34  * is specified in kilobytes by the caller (most pass work_mem).  Initially,
35  * we absorb tuples and simply store them in an unsorted array as long as
36  * we haven't exceeded workMem.  If we reach the end of the input without
37  * exceeding workMem, we sort the array using qsort() and subsequently return
38  * tuples just by scanning the tuple array sequentially.  If we do exceed
39  * workMem, we construct a heap using Algorithm H and begin to emit tuples
40  * into sorted runs in temporary tapes, emitting just enough tuples at each
41  * step to get back within the workMem limit.  Whenever the run number at
42  * the top of the heap changes, we begin a new run with a new output tape
43  * (selected per Algorithm D).  After the end of the input is reached,
44  * we dump out remaining tuples in memory into a final run (or two),
45  * then merge the runs using Algorithm D.
46  *
47  * When merging runs, we use a heap containing just the frontmost tuple from
48  * each source run; we repeatedly output the smallest tuple and insert the
49  * next tuple from its source tape (if any).  When the heap empties, the merge
50  * is complete.  The basic merge algorithm thus needs very little memory ---
51  * only M tuples for an M-way merge, and M is constrained to a small number.
52  * However, we can still make good use of our full workMem allocation by
53  * pre-reading additional tuples from each source tape.  Without prereading,
54  * our access pattern to the temporary file would be very erratic; on average
55  * we'd read one block from each of M source tapes during the same time that
56  * we're writing M blocks to the output tape, so there is no sequentiality of
57  * access at all, defeating the read-ahead methods used by most Unix kernels.
58  * Worse, the output tape gets written into a very random sequence of blocks
59  * of the temp file, ensuring that things will be even worse when it comes
60  * time to read that tape.  A straightforward merge pass thus ends up doing a
61  * lot of waiting for disk seeks.  We can improve matters by prereading from
62  * each source tape sequentially, loading about workMem/M bytes from each tape
63  * in turn.  Then we run the merge algorithm, writing but not reading until
64  * one of the preloaded tuple series runs out.  Then we switch back to preread
65  * mode, fill memory again, and repeat.  This approach helps to localize both
66  * read and write accesses.
67  *
68  * When the caller requests random access to the sort result, we form
69  * the final sorted run on a logical tape which is then "frozen", so
70  * that we can access it randomly.  When the caller does not need random
71  * access, we return from tuplesort_performsort() as soon as we are down
72  * to one run per logical tape.  The final merge is then performed
73  * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
74  * saves one cycle of writing all the data out to disk and reading it in.
75  *
76  * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
77  * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
78  * to Knuth's figure 70 (section 5.4.2).  However, Knuth is assuming that
79  * tape drives are expensive beasts, and in particular that there will always
80  * be many more runs than tape drives.  In our implementation a "tape drive"
81  * doesn't cost much more than a few Kb of memory buffers, so we can afford
82  * to have lots of them.  In particular, if we can have as many tape drives
83  * as sorted runs, we can eliminate any repeated I/O at all.  In the current
84  * code we determine the number of tapes M on the basis of workMem: we want
85  * workMem/M to be large enough that we read a fair amount of data each time
86  * we preread from a tape, so as to maintain the locality of access described
87  * above.  Nonetheless, with large workMem we can have many tapes.
88  *
89  *
90  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
91  * Portions Copyright (c) 1994, Regents of the University of California
92  *
93  * IDENTIFICATION
94  *        src/backend/utils/sort/tuplesort.c
95  *
96  *-------------------------------------------------------------------------
97  */
98
99 #include "postgres.h"
100
101 #include <limits.h>
102
103 #include "access/htup_details.h"
104 #include "access/nbtree.h"
105 #include "catalog/index.h"
106 #include "commands/tablespace.h"
107 #include "executor/executor.h"
108 #include "miscadmin.h"
109 #include "pg_trace.h"
110 #include "utils/datum.h"
111 #include "utils/logtape.h"
112 #include "utils/lsyscache.h"
113 #include "utils/memutils.h"
114 #include "utils/pg_rusage.h"
115 #include "utils/rel.h"
116 #include "utils/sortsupport.h"
117 #include "utils/tuplesort.h"
118
119
120 /* sort-type codes for sort__start probes */
121 #define HEAP_SORT               0
122 #define INDEX_SORT              1
123 #define DATUM_SORT              2
124 #define CLUSTER_SORT    3
125
126 /* GUC variables */
127 #ifdef TRACE_SORT
128 bool            trace_sort = false;
129 #endif
130
131 #ifdef DEBUG_BOUNDED_SORT
132 bool            optimize_bounded_sort = true;
133 #endif
134
135
136 /*
137  * The objects we actually sort are SortTuple structs.  These contain
138  * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
139  * which is a separate palloc chunk --- we assume it is just one chunk and
140  * can be freed by a simple pfree().  SortTuples also contain the tuple's
141  * first key column in Datum/nullflag format, and an index integer.
142  *
143  * Storing the first key column lets us save heap_getattr or index_getattr
144  * calls during tuple comparisons.  We could extract and save all the key
145  * columns not just the first, but this would increase code complexity and
146  * overhead, and wouldn't actually save any comparison cycles in the common
147  * case where the first key determines the comparison result.  Note that
148  * for a pass-by-reference datatype, datum1 points into the "tuple" storage.
149  *
150  * When sorting single Datums, the data value is represented directly by
151  * datum1/isnull1.  If the datatype is pass-by-reference and isnull1 is false,
152  * then datum1 points to a separately palloc'd data value that is also pointed
153  * to by the "tuple" pointer; otherwise "tuple" is NULL.
154  *
155  * While building initial runs, tupindex holds the tuple's run number.  During
156  * merge passes, we re-use it to hold the input tape number that each tuple in
157  * the heap was read from, or to hold the index of the next tuple pre-read
158  * from the same tape in the case of pre-read entries.  tupindex goes unused
159  * if the sort occurs entirely in memory.
160  */
161 typedef struct
162 {
163         void       *tuple;                      /* the tuple proper */
164         Datum           datum1;                 /* value of first key column */
165         bool            isnull1;                /* is first key column NULL? */
166         int                     tupindex;               /* see notes above */
167 } SortTuple;
168
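/*
 * Hedged sketch (not one of the real comparetup_* routines): how the cached
 * datum1/isnull1 pair lets a comparison be resolved without touching the
 * tuple body.  Assumes ApplySortComparator() from sortsupport.h; the real
 * routines fall back to heap_getattr()/index_getattr() on the remaining key
 * columns whenever this leading-key comparison returns 0.
 */
static inline int
compare_leading_key(const SortTuple *a, const SortTuple *b, SortSupport ssup)
{
	return ApplySortComparator(a->datum1, a->isnull1,
							   b->datum1, b->isnull1,
							   ssup);
}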
169
170 /*
171  * Possible states of a Tuplesort object.  These denote the states that
172  * persist between calls of Tuplesort routines.
173  */
174 typedef enum
175 {
176         TSS_INITIAL,                            /* Loading tuples; still within memory limit */
177         TSS_BOUNDED,                            /* Loading tuples into bounded-size heap */
178         TSS_BUILDRUNS,                          /* Loading tuples; writing to tape */
179         TSS_SORTEDINMEM,                        /* Sort completed entirely in memory */
180         TSS_SORTEDONTAPE,                       /* Sort completed, final run is on tape */
181         TSS_FINALMERGE                          /* Performing final merge on-the-fly */
182 } TupSortStatus;
183
184 /*
185  * Parameters for calculation of number of tapes to use --- see inittapes()
186  * and tuplesort_merge_order().
187  *
188  * In this calculation we assume that each tape will cost us about 3 blocks
189  * worth of buffer space (which is an underestimate for very large data
190  * volumes, but it's probably close enough --- see logtape.c).
191  *
192  * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
193  * tape during a preread cycle (see discussion at top of file).
194  */
195 #define MINORDER                6               /* minimum merge order */
196 #define TAPE_BUFFER_OVERHEAD            (BLCKSZ * 3)
197 #define MERGE_BUFFER_SIZE                       (BLCKSZ * 32)
198
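/*
 * Illustrative sketch (not a copy of tuplesort_merge_order): roughly how the
 * constants above turn allowedMem into a merge order.  Each input tape costs
 * about TAPE_BUFFER_OVERHEAD of buffer space plus MERGE_BUFFER_SIZE of
 * preread workspace, and we never drop below MINORDER.  With the default
 * BLCKSZ of 8192 and workMem = 1MB this gives (1048576 - 24576) /
 * (262144 + 24576) = 3, which is then clamped up to MINORDER = 6.
 */
static int
sketch_merge_order(long allowedMem)
{
	long		mOrder;

	mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
		(MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
	return (mOrder < MINORDER) ? MINORDER : (int) mOrder;
}
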
199 typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
200                                                                                                 Tuplesortstate *state);
201
202 /*
203  * Private state of a Tuplesort operation.
204  */
205 struct Tuplesortstate
206 {
207         TupSortStatus status;           /* enumerated value as shown above */
208         int                     nKeys;                  /* number of columns in sort key */
209         bool            randomAccess;   /* did caller request random access? */
210         bool            bounded;                /* did caller specify a maximum number of
211                                                                  * tuples to return? */
212         bool            boundUsed;              /* true if we made use of a bounded heap */
213         int                     bound;                  /* if bounded, the maximum number of tuples */
214         long            availMem;               /* remaining memory available, in bytes */
215         long            allowedMem;             /* total memory allowed, in bytes */
216         int                     maxTapes;               /* number of tapes (Knuth's T) */
217         int                     tapeRange;              /* maxTapes-1 (Knuth's P) */
218         MemoryContext sortcontext;      /* memory context holding all sort data */
219         LogicalTapeSet *tapeset;        /* logtape.c object for tapes in a temp file */
220
221         /*
222          * These function pointers decouple the routines that must know what kind
223          * of tuple we are sorting from the routines that don't need to know it.
224          * They are set up by the tuplesort_begin_xxx routines.
225          *
226          * Function to compare two tuples; result is per qsort() convention, ie:
227          * <0, 0, >0 according as a<b, a=b, a>b.  The API must match
228          * qsort_arg_comparator.
229          */
230         SortTupleComparator comparetup;
231
232         /*
233          * Function to copy a supplied input tuple into palloc'd space and set up
234          * its SortTuple representation (ie, set tuple/datum1/isnull1).  Also,
235          * state->availMem must be decreased by the amount of space used for the
236          * tuple copy (note the SortTuple struct itself is not counted).
237          */
238         void            (*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);
239
240         /*
241          * Function to write a stored tuple onto tape.  The representation of the
242          * tuple on tape need not be the same as it is in memory; requirements on
243          * the tape representation are given below.  After writing the tuple,
244          * pfree() the out-of-line data (not the SortTuple struct!), and increase
245          * state->availMem by the amount of memory space thereby released.
246          */
247         void            (*writetup) (Tuplesortstate *state, int tapenum,
248                                                                                  SortTuple *stup);
249
250         /*
251          * Function to read a stored tuple from tape back into memory. 'len' is
252          * the already-read length of the stored tuple.  Create a palloc'd copy,
253          * initialize tuple/datum1/isnull1 in the target SortTuple struct, and
254          * decrease state->availMem by the amount of memory space consumed.
255          */
256         void            (*readtup) (Tuplesortstate *state, SortTuple *stup,
257                                                                                 int tapenum, unsigned int len);
258
259         /*
260          * Function to reverse the sort direction from its current state. (We
261          * could dispense with this if we wanted to enforce that all variants
262          * represent the sort key information alike.)
263          */
264         void            (*reversedirection) (Tuplesortstate *state);
265
266         /*
267          * This array holds the tuples now in sort memory.  If we are in state
268          * INITIAL, the tuples are in no particular order; if we are in state
269          * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
270          * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
271          * H.  (Note that memtupcount only counts the tuples that are part of the
272          * heap --- during merge passes, memtuples[] entries beyond tapeRange are
273          * never in the heap and are used to hold pre-read tuples.)  In state
274          * SORTEDONTAPE, the array is not used.
275          */
276         SortTuple  *memtuples;          /* array of SortTuple structs */
277         int                     memtupcount;    /* number of tuples currently present */
278         int                     memtupsize;             /* allocated length of memtuples array */
279         bool            growmemtuples;  /* memtuples' growth still underway? */
280
281         /*
282          * While building initial runs, this is the current output run number
283          * (starting at 0).  Afterwards, it is the number of initial runs we made.
284          */
285         int                     currentRun;
286
287         /*
288          * Unless otherwise noted, all pointer variables below are pointers to
289          * arrays of length maxTapes, holding per-tape data.
290          */
291
292         /*
293          * These variables are only used during merge passes.  mergeactive[i] is
294          * true if we are reading an input run from (actual) tape number i and
295          * have not yet exhausted that run.  mergenext[i] is the memtuples index
296          * of the next pre-read tuple (next to be loaded into the heap) for tape
297          * i, or 0 if we are out of pre-read tuples.  mergelast[i] similarly
298          * points to the last pre-read tuple from each tape.  mergeavailslots[i]
299          * is the number of unused memtuples[] slots reserved for tape i, and
300          * mergeavailmem[i] is the amount of unused space allocated for tape i.
301          * mergefreelist and mergefirstfree keep track of unused locations in the
302          * memtuples[] array.  The memtuples[].tupindex fields link together
303          * pre-read tuples for each tape as well as recycled locations in
304          * mergefreelist. It is OK to use 0 as a null link in these lists, because
305          * memtuples[0] is part of the merge heap and is never a pre-read tuple.
306          */
307         bool       *mergeactive;        /* active input run source? */
308         int                *mergenext;          /* first preread tuple for each source */
309         int                *mergelast;          /* last preread tuple for each source */
310         int                *mergeavailslots;    /* slots left for prereading each tape */
311         long       *mergeavailmem;      /* availMem for prereading each tape */
312         int                     mergefreelist;  /* head of freelist of recycled slots */
313         int                     mergefirstfree; /* first slot never used in this merge */
314
315         /*
316          * Variables for Algorithm D.  Note that destTape is a "logical" tape
317          * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
318          * "logical" and "actual" tape numbers straight!
319          */
320         int                     Level;                  /* Knuth's l */
321         int                     destTape;               /* current output tape (Knuth's j, less 1) */
322         int                *tp_fib;                     /* Target Fibonacci run counts (A[]) */
323         int                *tp_runs;            /* # of real runs on each tape */
324         int                *tp_dummy;           /* # of dummy runs for each tape (D[]) */
325         int                *tp_tapenum;         /* Actual tape numbers (TAPE[]) */
326         int                     activeTapes;    /* # of active input tapes in merge pass */
327
328         /*
329          * These variables are used after completion of sorting to keep track of
330          * the next tuple to return.  (In the tape case, the tape's current read
331          * position is also critical state.)
332          */
333         int                     result_tape;    /* actual tape number of finished output */
334         int                     current;                /* array index (only used if SORTEDINMEM) */
335         bool            eof_reached;    /* reached EOF (needed for cursors) */
336
337         /* markpos_xxx holds marked position for mark and restore */
338         long            markpos_block;  /* tape block# (only used if SORTEDONTAPE) */
339         int                     markpos_offset; /* saved "current", or offset in tape block */
340         bool            markpos_eof;    /* saved "eof_reached" */
341
342         /*
343          * These variables are specific to the MinimalTuple case; they are set by
344          * tuplesort_begin_heap and used only by the MinimalTuple routines.
345          */
346         TupleDesc       tupDesc;
347         SortSupport sortKeys;           /* array of length nKeys */
348
349         /*
350          * This variable is shared by the single-key MinimalTuple case and the
351          * Datum case (which both use qsort_ssup()).  Otherwise it's NULL.
352          */
353         SortSupport onlyKey;
354
355         /*
356          * These variables are specific to the CLUSTER case; they are set by
357          * tuplesort_begin_cluster.  Note CLUSTER also uses tupDesc and
358          * indexScanKey.
359          */
360         IndexInfo  *indexInfo;          /* info about index being used for reference */
361         EState     *estate;                     /* for evaluating index expressions */
362
363         /*
364          * These variables are specific to the IndexTuple case; they are set by
365          * tuplesort_begin_index_xxx and used only by the IndexTuple routines.
366          */
367         Relation        indexRel;               /* index being built */
368
369         /* These are specific to the index_btree subcase: */
370         ScanKey         indexScanKey;
371         bool            enforceUnique;  /* complain if we find duplicate tuples */
372
373         /* These are specific to the index_hash subcase: */
374         uint32          hash_mask;              /* mask for sortable part of hash code */
375
376         /*
377          * These variables are specific to the Datum case; they are set by
378          * tuplesort_begin_datum and used only by the DatumTuple routines.
379          */
380         Oid                     datumType;
381         /* we need typelen and byval in order to know how to copy the Datums. */
382         int                     datumTypeLen;
383         bool            datumTypeByVal;
384
385         /*
386          * Resource snapshot for time of sort start.
387          */
388 #ifdef TRACE_SORT
389         PGRUsage        ru_start;
390 #endif
391 };
392
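/*
 * Hedged sketch (standalone helpers, not part of tuplesort.c): the style of
 * free list used for recycled memtuples[] slots during a merge, as described
 * in the struct comment above.  Links are stored in the tupindex field of
 * the slots themselves, and 0 works as the null link because memtuples[0]
 * always belongs to the merge heap and is never a preread slot.
 */
static void
sketch_push_free_slot(SortTuple *memtuples, int *freelist_head, int slot)
{
	memtuples[slot].tupindex = *freelist_head;	/* link to old head; 0 = end */
	*freelist_head = slot;
}

static int
sketch_pop_free_slot(SortTuple *memtuples, int *freelist_head)
{
	int			slot = *freelist_head;

	if (slot != 0)
		*freelist_head = memtuples[slot].tupindex;
	return slot;				/* 0 means no recycled slot is available */
}
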
393 #define COMPARETUP(state,a,b)   ((*(state)->comparetup) (a, b, state))
394 #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
395 #define WRITETUP(state,tape,stup)       ((*(state)->writetup) (state, tape, stup))
396 #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
397 #define REVERSEDIRECTION(state) ((*(state)->reversedirection) (state))
398 #define LACKMEM(state)          ((state)->availMem < 0)
399 #define USEMEM(state,amt)       ((state)->availMem -= (amt))
400 #define FREEMEM(state,amt)      ((state)->availMem += (amt))
401
402 /*
403  * NOTES about on-tape representation of tuples:
404  *
405  * We require the first "unsigned int" of a stored tuple to be the total size
406  * on-tape of the tuple, including itself (so it is never zero; an all-zero
407  * unsigned int is used to delimit runs).  The remainder of the stored tuple
408  * may or may not match the in-memory representation of the tuple ---
409  * any conversion needed is the job of the writetup and readtup routines.
410  *
411  * If state->randomAccess is true, then the stored representation of the
412  * tuple must be followed by another "unsigned int" that is a copy of the
413  * length --- so the total tape space used is actually sizeof(unsigned int)
414  * more than the stored length value.  This allows read-backwards.  When
415  * randomAccess is not true, the write/read routines may omit the extra
416  * length word.
417  *
418  * writetup is expected to write both length words as well as the tuple
419  * data.  When readtup is called, the tape is positioned just after the
420  * front length word; readtup must read the tuple data and advance past
421  * the back length word (if present).
422  *
423  * The write/read routines can make use of the tuple description data
424  * stored in the Tuplesortstate record, if needed.  They are also expected
425  * to adjust state->availMem by the amount of memory space (not tape space!)
426  * released or consumed.  There is no error return from either writetup
427  * or readtup; they should ereport() on failure.
428  *
429  *
430  * NOTES about memory consumption calculations:
431  *
432  * We count space allocated for tuples against the workMem limit, plus
433  * the space used by the variable-size memtuples array.  Fixed-size space
434  * is not counted; it's small enough to not be interesting.
435  *
436  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
437  * rather than the originally-requested size.  This is important since
438  * palloc can add substantial overhead.  It's not a complete answer since
439  * we won't count any wasted space in palloc allocation blocks, but it's
440  * a lot better than what we were doing before 7.3.
441  */
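
/*
 * Hedged sketch (not one of the real writetup_* routines): the length-word
 * convention just described.  Assumes LogicalTapeWrite() from logtape.h;
 * "data" and "datalen" stand in for whatever on-tape form a particular tuple
 * kind uses.  The stored length counts itself, so it is never zero (an
 * all-zero length word marks the end of a run), and randomAccess adds a
 * trailing copy of the length so the tuple can be read backwards.
 */
static void
sketch_writetup(Tuplesortstate *state, int tapenum,
				char *data, unsigned int datalen)
{
	unsigned int tuplen = datalen + sizeof(tuplen);

	LogicalTapeWrite(state->tapeset, tapenum, &tuplen, sizeof(tuplen));
	LogicalTapeWrite(state->tapeset, tapenum, data, datalen);
	if (state->randomAccess)
		LogicalTapeWrite(state->tapeset, tapenum, &tuplen, sizeof(tuplen));
}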
442
443 /* When using this macro, beware of double evaluation of len */
444 #define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
445         do { \
446                 if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \
447                         elog(ERROR, "unexpected end of data"); \
448         } while(0)
449
450
451 static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess);
452 static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
453 static void inittapes(Tuplesortstate *state);
454 static void selectnewtape(Tuplesortstate *state);
455 static void mergeruns(Tuplesortstate *state);
456 static void mergeonerun(Tuplesortstate *state);
457 static void beginmerge(Tuplesortstate *state);
458 static void mergepreread(Tuplesortstate *state);
459 static void mergeprereadone(Tuplesortstate *state, int srcTape);
460 static void dumptuples(Tuplesortstate *state, bool alltuples);
461 static void make_bounded_heap(Tuplesortstate *state);
462 static void sort_bounded_heap(Tuplesortstate *state);
463 static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
464                                           int tupleindex, bool checkIndex);
465 static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
466 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
467 static void markrunend(Tuplesortstate *state, int tapenum);
468 static int comparetup_heap(const SortTuple *a, const SortTuple *b,
469                                 Tuplesortstate *state);
470 static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
471 static void writetup_heap(Tuplesortstate *state, int tapenum,
472                           SortTuple *stup);
473 static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
474                          int tapenum, unsigned int len);
475 static void reversedirection_heap(Tuplesortstate *state);
476 static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
477                                    Tuplesortstate *state);
478 static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
479 static void writetup_cluster(Tuplesortstate *state, int tapenum,
480                                  SortTuple *stup);
481 static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
482                                 int tapenum, unsigned int len);
483 static int comparetup_index_btree(const SortTuple *a, const SortTuple *b,
484                                            Tuplesortstate *state);
485 static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
486                                           Tuplesortstate *state);
487 static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
488 static void writetup_index(Tuplesortstate *state, int tapenum,
489                            SortTuple *stup);
490 static void readtup_index(Tuplesortstate *state, SortTuple *stup,
491                           int tapenum, unsigned int len);
492 static void reversedirection_index_btree(Tuplesortstate *state);
493 static void reversedirection_index_hash(Tuplesortstate *state);
494 static int comparetup_datum(const SortTuple *a, const SortTuple *b,
495                                  Tuplesortstate *state);
496 static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
497 static void writetup_datum(Tuplesortstate *state, int tapenum,
498                            SortTuple *stup);
499 static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
500                           int tapenum, unsigned int len);
501 static void reversedirection_datum(Tuplesortstate *state);
502 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
503
504 /*
505  * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
506  * any variant of SortTuples, using the appropriate comparetup function.
507  * qsort_ssup() is specialized for the case where the comparetup function
508  * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
509  * and Datum sorts.
510  */
511 #include "qsort_tuple.c"
512
513
514 /*
515  *              tuplesort_begin_xxx
516  *
517  * Initialize for a tuple sort operation.
518  *
519  * After calling tuplesort_begin, the caller should call tuplesort_putXXX
520  * zero or more times, then call tuplesort_performsort when all the tuples
521  * have been supplied.  After performsort, retrieve the tuples in sorted
522  * order by calling tuplesort_getXXX until it returns false/NULL.  (If random
523  * access was requested, rescan, markpos, and restorepos can also be called.)
524  * Call tuplesort_end to terminate the operation and release memory/disk space.
525  *
526  * Each variant of tuplesort_begin has a workMem parameter specifying the
527  * maximum number of kilobytes of RAM to use before spilling data to disk.
528  * (The normal value of this parameter is work_mem, but some callers use
529  * other values.)  Each variant also has a randomAccess parameter specifying
530  * whether the caller needs non-sequential access to the sort result.
531  */
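
/*
 * Hedged usage sketch (not part of this file): the begin/put/performsort/
 * get/end protocol described above, shown for the Datum case with int4
 * values.  Assumes INT4OID from pg_type.h and Int4LessOperator (the int4
 * "<" operator OID) from pg_operator.h; work_mem is the usual GUC.
 */
static void
sketch_sort_some_ints(void)
{
	Tuplesortstate *sortstate;
	Datum		value;
	bool		isnull;
	int			i;

	sortstate = tuplesort_begin_datum(INT4OID, Int4LessOperator,
									  InvalidOid,	/* int4 ignores collation */
									  false,		/* nulls sort last */
									  work_mem, false);

	for (i = 10; i > 0; i--)
		tuplesort_putdatum(sortstate, Int32GetDatum(i), false);

	tuplesort_performsort(sortstate);

	while (tuplesort_getdatum(sortstate, true, &value, &isnull))
		elog(LOG, "next value: %d", DatumGetInt32(value));

	tuplesort_end(sortstate);
}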
532
533 static Tuplesortstate *
534 tuplesort_begin_common(int workMem, bool randomAccess)
535 {
536         Tuplesortstate *state;
537         MemoryContext sortcontext;
538         MemoryContext oldcontext;
539
540         /*
541          * Create a working memory context for this sort operation. All data
542          * needed by the sort will live inside this context.
543          */
544         sortcontext = AllocSetContextCreate(CurrentMemoryContext,
545                                                                                 "TupleSort",
546                                                                                 ALLOCSET_DEFAULT_MINSIZE,
547                                                                                 ALLOCSET_DEFAULT_INITSIZE,
548                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
549
550         /*
551          * Make the Tuplesortstate within the per-sort context.  This way, we
552          * don't need a separate pfree() operation for it at shutdown.
553          */
554         oldcontext = MemoryContextSwitchTo(sortcontext);
555
556         state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
557
558 #ifdef TRACE_SORT
559         if (trace_sort)
560                 pg_rusage_init(&state->ru_start);
561 #endif
562
563         state->status = TSS_INITIAL;
564         state->randomAccess = randomAccess;
565         state->bounded = false;
566         state->boundUsed = false;
567         state->allowedMem = workMem * 1024L;
568         state->availMem = state->allowedMem;
569         state->sortcontext = sortcontext;
570         state->tapeset = NULL;
571
572         state->memtupcount = 0;
573         state->memtupsize = 1024;       /* initial guess */
574         state->growmemtuples = true;
575         state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
576
577         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
578
579         /* workMem must be large enough for the minimal memtuples array */
580         if (LACKMEM(state))
581                 elog(ERROR, "insufficient memory allowed for sort");
582
583         state->currentRun = 0;
584
585         /*
586          * maxTapes, tapeRange, and Algorithm D variables will be initialized by
587          * inittapes(), if needed
588          */
589
590         state->result_tape = -1;        /* flag that result tape has not been formed */
591
592         MemoryContextSwitchTo(oldcontext);
593
594         return state;
595 }
596
597 Tuplesortstate *
598 tuplesort_begin_heap(TupleDesc tupDesc,
599                                          int nkeys, AttrNumber *attNums,
600                                          Oid *sortOperators, Oid *sortCollations,
601                                          bool *nullsFirstFlags,
602                                          int workMem, bool randomAccess)
603 {
604         Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
605         MemoryContext oldcontext;
606         int                     i;
607
608         oldcontext = MemoryContextSwitchTo(state->sortcontext);
609
610         AssertArg(nkeys > 0);
611
612 #ifdef TRACE_SORT
613         if (trace_sort)
614                 elog(LOG,
615                          "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
616                          nkeys, workMem, randomAccess ? 't' : 'f');
617 #endif
618
619         state->nKeys = nkeys;
620
621         TRACE_POSTGRESQL_SORT_START(HEAP_SORT,
622                                                                 false,  /* no unique check */
623                                                                 nkeys,
624                                                                 workMem,
625                                                                 randomAccess);
626
627         state->comparetup = comparetup_heap;
628         state->copytup = copytup_heap;
629         state->writetup = writetup_heap;
630         state->readtup = readtup_heap;
631         state->reversedirection = reversedirection_heap;
632
633         state->tupDesc = tupDesc;       /* assume we need not copy tupDesc */
634
635         /* Prepare SortSupport data for each column */
636         state->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData));
637
638         for (i = 0; i < nkeys; i++)
639         {
640                 SortSupport sortKey = state->sortKeys + i;
641
642                 AssertArg(attNums[i] != 0);
643                 AssertArg(sortOperators[i] != 0);
644
645                 sortKey->ssup_cxt = CurrentMemoryContext;
646                 sortKey->ssup_collation = sortCollations[i];
647                 sortKey->ssup_nulls_first = nullsFirstFlags[i];
648                 sortKey->ssup_attno = attNums[i];
649
650                 PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
651         }
652
653         if (nkeys == 1)
654                 state->onlyKey = state->sortKeys;
655
656         MemoryContextSwitchTo(oldcontext);
657
658         return state;
659 }
660
661 Tuplesortstate *
662 tuplesort_begin_cluster(TupleDesc tupDesc,
663                                                 Relation indexRel,
664                                                 int workMem, bool randomAccess)
665 {
666         Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
667         MemoryContext oldcontext;
668
669         Assert(indexRel->rd_rel->relam == BTREE_AM_OID);
670
671         oldcontext = MemoryContextSwitchTo(state->sortcontext);
672
673 #ifdef TRACE_SORT
674         if (trace_sort)
675                 elog(LOG,
676                          "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
677                          RelationGetNumberOfAttributes(indexRel),
678                          workMem, randomAccess ? 't' : 'f');
679 #endif
680
681         state->nKeys = RelationGetNumberOfAttributes(indexRel);
682
683         TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT,
684                                                                 false,  /* no unique check */
685                                                                 state->nKeys,
686                                                                 workMem,
687                                                                 randomAccess);
688
689         state->comparetup = comparetup_cluster;
690         state->copytup = copytup_cluster;
691         state->writetup = writetup_cluster;
692         state->readtup = readtup_cluster;
693         state->reversedirection = reversedirection_index_btree;
694
695         state->indexInfo = BuildIndexInfo(indexRel);
696         state->indexScanKey = _bt_mkscankey_nodata(indexRel);
697
698         state->tupDesc = tupDesc;       /* assume we need not copy tupDesc */
699
700         if (state->indexInfo->ii_Expressions != NULL)
701         {
702                 TupleTableSlot *slot;
703                 ExprContext *econtext;
704
705                 /*
706                  * We will need to use FormIndexDatum to evaluate the index
707                  * expressions.  To do that, we need an EState, as well as a
708                  * TupleTableSlot to put the table tuples into.  The econtext's
709                  * scantuple has to point to that slot, too.
710                  */
711                 state->estate = CreateExecutorState();
712                 slot = MakeSingleTupleTableSlot(tupDesc);
713                 econtext = GetPerTupleExprContext(state->estate);
714                 econtext->ecxt_scantuple = slot;
715         }
716
717         MemoryContextSwitchTo(oldcontext);
718
719         return state;
720 }
721
722 Tuplesortstate *
723 tuplesort_begin_index_btree(Relation indexRel,
724                                                         bool enforceUnique,
725                                                         int workMem, bool randomAccess)
726 {
727         Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
728         MemoryContext oldcontext;
729
730         oldcontext = MemoryContextSwitchTo(state->sortcontext);
731
732 #ifdef TRACE_SORT
733         if (trace_sort)
734                 elog(LOG,
735                          "begin index sort: unique = %c, workMem = %d, randomAccess = %c",
736                          enforceUnique ? 't' : 'f',
737                          workMem, randomAccess ? 't' : 'f');
738 #endif
739
740         state->nKeys = RelationGetNumberOfAttributes(indexRel);
741
742         TRACE_POSTGRESQL_SORT_START(INDEX_SORT,
743                                                                 enforceUnique,
744                                                                 state->nKeys,
745                                                                 workMem,
746                                                                 randomAccess);
747
748         state->comparetup = comparetup_index_btree;
749         state->copytup = copytup_index;
750         state->writetup = writetup_index;
751         state->readtup = readtup_index;
752         state->reversedirection = reversedirection_index_btree;
753
754         state->indexRel = indexRel;
755         state->indexScanKey = _bt_mkscankey_nodata(indexRel);
756         state->enforceUnique = enforceUnique;
757
758         MemoryContextSwitchTo(oldcontext);
759
760         return state;
761 }
762
763 Tuplesortstate *
764 tuplesort_begin_index_hash(Relation indexRel,
765                                                    uint32 hash_mask,
766                                                    int workMem, bool randomAccess)
767 {
768         Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
769         MemoryContext oldcontext;
770
771         oldcontext = MemoryContextSwitchTo(state->sortcontext);
772
773 #ifdef TRACE_SORT
774         if (trace_sort)
775                 elog(LOG,
776                 "begin index sort: hash_mask = 0x%x, workMem = %d, randomAccess = %c",
777                          hash_mask,
778                          workMem, randomAccess ? 't' : 'f');
779 #endif
780
781         state->nKeys = 1;                       /* Only one sort column, the hash code */
782
783         state->comparetup = comparetup_index_hash;
784         state->copytup = copytup_index;
785         state->writetup = writetup_index;
786         state->readtup = readtup_index;
787         state->reversedirection = reversedirection_index_hash;
788
789         state->indexRel = indexRel;
790
791         state->hash_mask = hash_mask;
792
793         MemoryContextSwitchTo(oldcontext);
794
795         return state;
796 }
797
798 Tuplesortstate *
799 tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
800                                           bool nullsFirstFlag,
801                                           int workMem, bool randomAccess)
802 {
803         Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
804         MemoryContext oldcontext;
805         int16           typlen;
806         bool            typbyval;
807
808         oldcontext = MemoryContextSwitchTo(state->sortcontext);
809
810 #ifdef TRACE_SORT
811         if (trace_sort)
812                 elog(LOG,
813                          "begin datum sort: workMem = %d, randomAccess = %c",
814                          workMem, randomAccess ? 't' : 'f');
815 #endif
816
817         state->nKeys = 1;                       /* always a one-column sort */
818
819         TRACE_POSTGRESQL_SORT_START(DATUM_SORT,
820                                                                 false,  /* no unique check */
821                                                                 1,
822                                                                 workMem,
823                                                                 randomAccess);
824
825         state->comparetup = comparetup_datum;
826         state->copytup = copytup_datum;
827         state->writetup = writetup_datum;
828         state->readtup = readtup_datum;
829         state->reversedirection = reversedirection_datum;
830
831         state->datumType = datumType;
832
833         /* Prepare SortSupport data */
834         state->onlyKey = (SortSupport) palloc0(sizeof(SortSupportData));
835
836         state->onlyKey->ssup_cxt = CurrentMemoryContext;
837         state->onlyKey->ssup_collation = sortCollation;
838         state->onlyKey->ssup_nulls_first = nullsFirstFlag;
839
840         PrepareSortSupportFromOrderingOp(sortOperator, state->onlyKey);
841
842         /* lookup necessary attributes of the datum type */
843         get_typlenbyval(datumType, &typlen, &typbyval);
844         state->datumTypeLen = typlen;
845         state->datumTypeByVal = typbyval;
846
847         MemoryContextSwitchTo(oldcontext);
848
849         return state;
850 }
851
852 /*
853  * tuplesort_set_bound
854  *
855  *      Advise tuplesort that at most the first N result tuples are required.
856  *
857  * Must be called before inserting any tuples.  (Actually, we could allow it
858  * as long as the sort hasn't spilled to disk, but there seems no need for
859  * delayed calls at the moment.)
860  *
861  * This is a hint only. The tuplesort may still return more tuples than
862  * requested.
863  */
864 void
865 tuplesort_set_bound(Tuplesortstate *state, int64 bound)
866 {
867         /* Assert we're called before loading any tuples */
868         Assert(state->status == TSS_INITIAL);
869         Assert(state->memtupcount == 0);
870         Assert(!state->bounded);
871
872 #ifdef DEBUG_BOUNDED_SORT
873         /* Honor GUC setting that disables the feature (for easy testing) */
874         if (!optimize_bounded_sort)
875                 return;
876 #endif
877
878         /* We want to be able to compute bound * 2, so limit the setting */
879         if (bound > (int64) (INT_MAX / 2))
880                 return;
881
882         state->bounded = true;
883         state->bound = (int) bound;
884 }
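
/*
 * Hedged sketch (standalone, not part of tuplesort.c): the bounded-heap idea
 * behind TSS_BOUNDED, shown with plain ints.  To return only the "bound"
 * smallest values, keep a max-heap of that size and discard any incoming
 * value that is not smaller than the current maximum.  The real code does
 * the same with SortTuples, reversing the sort direction so the largest
 * retained tuple sits at the top of the heap.
 */
static void
topn_sift_down(int *heap, int nitems, int i)
{
	for (;;)
	{
		int			largest = i;
		int			left = 2 * i + 1;
		int			right = 2 * i + 2;
		int			tmp;

		if (left < nitems && heap[left] > heap[largest])
			largest = left;
		if (right < nitems && heap[right] > heap[largest])
			largest = right;
		if (largest == i)
			break;
		tmp = heap[i];
		heap[i] = heap[largest];
		heap[largest] = tmp;
		i = largest;
	}
}

static void
topn_add(int *heap, int bound, int *nitems, int value)
{
	if (*nitems < bound)
	{
		/* still filling the heap: append and bubble up */
		int			i = (*nitems)++;

		heap[i] = value;
		while (i > 0 && heap[(i - 1) / 2] < heap[i])
		{
			int			tmp = heap[i];

			heap[i] = heap[(i - 1) / 2];
			heap[(i - 1) / 2] = tmp;
			i = (i - 1) / 2;
		}
	}
	else if (value < heap[0])
	{
		/* evict the largest retained value */
		heap[0] = value;
		topn_sift_down(heap, *nitems, 0);
	}
}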
885
886 /*
887  * tuplesort_end
888  *
889  *      Release resources and clean up.
890  *
891  * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
892  * pointing to garbage.  Be careful not to attempt to use or free such
893  * pointers afterwards!
894  */
895 void
896 tuplesort_end(Tuplesortstate *state)
897 {
898         /* context swap probably not needed, but let's be safe */
899         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
900
901 #ifdef TRACE_SORT
902         long            spaceUsed;
903
904         if (state->tapeset)
905                 spaceUsed = LogicalTapeSetBlocks(state->tapeset);
906         else
907                 spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
908 #endif
909
910         /*
911          * Delete temporary "tape" files, if any.
912          *
913          * Note: want to include this in reported total cost of sort, hence need
914          * for two #ifdef TRACE_SORT sections.
915          */
916         if (state->tapeset)
917                 LogicalTapeSetClose(state->tapeset);
918
919 #ifdef TRACE_SORT
920         if (trace_sort)
921         {
922                 if (state->tapeset)
923                         elog(LOG, "external sort ended, %ld disk blocks used: %s",
924                                  spaceUsed, pg_rusage_show(&state->ru_start));
925                 else
926                         elog(LOG, "internal sort ended, %ld KB used: %s",
927                                  spaceUsed, pg_rusage_show(&state->ru_start));
928         }
929
930         TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
931 #else
932
933         /*
934          * If you disabled TRACE_SORT, you can still probe sort__done, but you
935          * ain't getting space-used stats.
936          */
937         TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L);
938 #endif
939
940         /* Free any execution state created for CLUSTER case */
941         if (state->estate != NULL)
942         {
943                 ExprContext *econtext = GetPerTupleExprContext(state->estate);
944
945                 ExecDropSingleTupleTableSlot(econtext->ecxt_scantuple);
946                 FreeExecutorState(state->estate);
947         }
948
949         MemoryContextSwitchTo(oldcontext);
950
951         /*
952          * Free the per-sort memory context, thereby releasing all working memory,
953          * including the Tuplesortstate struct itself.
954          */
955         MemoryContextDelete(state->sortcontext);
956 }
957
958 /*
959  * Grow the memtuples[] array, if possible within our memory constraint.
960  * Return TRUE if we were able to enlarge the array, FALSE if not.
961  *
962  * Normally, at each increment we double the size of the array.  When we no
963  * longer have enough memory to do that, we attempt one last, smaller increase
964  * (and then clear the growmemtuples flag so we don't try any more).  That
965  * allows us to use allowedMem as fully as possible; sticking to the pure
966  * doubling rule could result in almost half of allowedMem going unused.
967  * Because availMem moves around with tuple addition/removal, we need some
968  * rule to prevent making repeated small increases in memtupsize, which would
969  * just be useless thrashing.  The growmemtuples flag accomplishes that and
970  * also prevents useless recalculations in this function.
971  */
972 static bool
973 grow_memtuples(Tuplesortstate *state)
974 {
975         int                     newmemtupsize;
976         int                     memtupsize = state->memtupsize;
977         long            memNowUsed = state->allowedMem - state->availMem;
978
979         /* Forget it if we've already maxed out memtuples, per comment above */
980         if (!state->growmemtuples)
981                 return false;
982
983         /* Select new value of memtupsize */
984         if (memNowUsed <= state->availMem)
985         {
986                 /*
987                  * It is surely safe to double memtupsize if we've used no more than
988                  * half of allowedMem.
989                  *
990                  * Note: it might seem that we need to worry about memtupsize * 2
991                  * overflowing an int, but the MaxAllocSize clamp applied below
992                  * ensures the existing memtupsize can't be large enough for that.
993                  */
994                 newmemtupsize = memtupsize * 2;
995         }
996         else
997         {
998                 /*
999                  * This will be the last increment of memtupsize.  Abandon doubling
1000                  * strategy and instead increase as much as we safely can.
1001                  *
1002                  * To stay within allowedMem, we can't increase memtupsize by more
1003                  * than availMem / sizeof(SortTuple) elements.  In practice, we want
1004                  * to increase it by considerably less, because we need to leave some
1005                  * space for the tuples to which the new array slots will refer.  We
1006                  * assume the new tuples will be about the same size as the tuples
1007                  * we've already seen, and thus we can extrapolate from the space
1008                  * consumption so far to estimate an appropriate new size for the
1009                  * memtuples array.  The optimal value might be higher or lower than
1010                  * this estimate, but it's hard to know that in advance.
1011                  *
1012                  * This calculation is safe against enlarging the array so much that
1013                  * LACKMEM becomes true, because the memory currently used includes
1014                  * the present array; thus, there would be enough allowedMem for the
1015                  * new array elements even if no other memory were currently used.
1016                  *
1017                  * We do the arithmetic in float8, because otherwise the product of
1018                  * memtupsize and allowedMem could overflow.  (A little algebra shows
1019                  * that grow_ratio must be less than 2 here, so we are not risking
1020                  * integer overflow this way.)  Any inaccuracy in the result should be
1021                  * insignificant; but even if we computed a completely insane result,
1022                  * the checks below will prevent anything really bad from happening.
1023                  */
1024                 double          grow_ratio;
1025
1026                 grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1027                 newmemtupsize = (int) (memtupsize * grow_ratio);
1028
1029                 /* We won't make any further enlargement attempts */
1030                 state->growmemtuples = false;
1031         }
1032
1033         /* Must enlarge array by at least one element, else report failure */
1034         if (newmemtupsize <= memtupsize)
1035                 goto noalloc;
1036
1037         /*
1038          * On a 64-bit machine, allowedMem could be more than MaxAllocSize.  Clamp
1039          * to ensure our request won't be rejected by palloc.
1040          */
1041         if ((Size) newmemtupsize >= MaxAllocSize / sizeof(SortTuple))
1042         {
1043                 newmemtupsize = (int) (MaxAllocSize / sizeof(SortTuple));
1044                 state->growmemtuples = false;   /* can't grow any more */
1045         }
1046
1047         /*
1048          * We need to be sure that we do not cause LACKMEM to become true, else
1049          * the space management algorithm will go nuts.  The code above should
1050          * never generate a dangerous request, but to be safe, check explicitly
1051          * that the array growth fits within availMem.  (We could still cause
1052          * LACKMEM if the memory chunk overhead associated with the memtuples
1053          * array were to increase.  That shouldn't happen with any sane value of
1054          * allowedMem, because at any array size large enough to risk LACKMEM,
1055          * palloc would be treating both old and new arrays as separate chunks.
1056          * But we'll check LACKMEM explicitly below just in case.)
1057          */
1058         if (state->availMem < (long) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1059                 goto noalloc;
1060
1061         /* OK, do it */
1062         FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1063         state->memtupsize = newmemtupsize;
1064         state->memtuples = (SortTuple *)
1065                 repalloc(state->memtuples,
1066                                  state->memtupsize * sizeof(SortTuple));
1067         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1068         if (LACKMEM(state))
1069                 elog(ERROR, "unexpected out-of-memory situation during sort");
1070         return true;
1071
1072 noalloc:
1073         /* If for any reason we didn't realloc, shut off future attempts */
1074         state->growmemtuples = false;
1075         return false;
1076 }
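
/*
 * Illustrative sketch (not part of tuplesort.c): the arithmetic behind the
 * final, non-doubling growth step above.  If the tuples seen so far have
 * consumed memNowUsed bytes out of allowedMem, scaling the slot count by
 * allowedMem / memNowUsed extrapolates the observed bytes-per-slot ratio to
 * the whole memory budget: e.g. 100000 slots with 6MB used out of 8MB
 * allowed grows to about 133333 slots.
 */
static int
sketch_final_growth(int memtupsize, long allowedMem, long memNowUsed)
{
	double		grow_ratio = (double) allowedMem / (double) memNowUsed;

	return (int) (memtupsize * grow_ratio);
}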
1077
1078 /*
1079  * Accept one tuple while collecting input data for sort.
1080  *
1081  * Note that the input data is always copied; the caller need not save it.
1082  */
1083 void
1084 tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot)
1085 {
1086         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1087         SortTuple       stup;
1088
1089         /*
1090          * Copy the given tuple into memory we control, and decrease availMem.
1091          * Then call the common code.
1092          */
1093         COPYTUP(state, &stup, (void *) slot);
1094
1095         puttuple_common(state, &stup);
1096
1097         MemoryContextSwitchTo(oldcontext);
1098 }
1099
1100 /*
1101  * Accept one tuple while collecting input data for sort.
1102  *
1103  * Note that the input data is always copied; the caller need not save it.
1104  */
1105 void
1106 tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup)
1107 {
1108         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1109         SortTuple       stup;
1110
1111         /*
1112          * Copy the given tuple into memory we control, and decrease availMem.
1113          * Then call the common code.
1114          */
1115         COPYTUP(state, &stup, (void *) tup);
1116
1117         puttuple_common(state, &stup);
1118
1119         MemoryContextSwitchTo(oldcontext);
1120 }
1121
1122 /*
1123  * Accept one index tuple while collecting input data for sort.
1124  *
1125  * Note that the input tuple is always copied; the caller need not save it.
1126  */
1127 void
1128 tuplesort_putindextuple(Tuplesortstate *state, IndexTuple tuple)
1129 {
1130         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1131         SortTuple       stup;
1132
1133         /*
1134          * Copy the given tuple into memory we control, and decrease availMem.
1135          * Then call the common code.
1136          */
1137         COPYTUP(state, &stup, (void *) tuple);
1138
1139         puttuple_common(state, &stup);
1140
1141         MemoryContextSwitchTo(oldcontext);
1142 }
1143
1144 /*
1145  * Accept one Datum while collecting input data for sort.
1146  *
1147  * If the Datum is pass-by-ref type, the value will be copied.
1148  */
1149 void
1150 tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
1151 {
1152         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1153         SortTuple       stup;
1154
1155         /*
1156          * If it's a pass-by-reference value, copy it into memory we control, and
1157          * decrease availMem.  Then call the common code.
1158          */
1159         if (isNull || state->datumTypeByVal)
1160         {
1161                 stup.datum1 = val;
1162                 stup.isnull1 = isNull;
1163                 stup.tuple = NULL;              /* no separate storage */
1164         }
1165         else
1166         {
1167                 stup.datum1 = datumCopy(val, false, state->datumTypeLen);
1168                 stup.isnull1 = false;
1169                 stup.tuple = DatumGetPointer(stup.datum1);
1170                 USEMEM(state, GetMemoryChunkSpace(stup.tuple));
1171         }
1172
1173         puttuple_common(state, &stup);
1174
1175         MemoryContextSwitchTo(oldcontext);
1176 }
1177
1178 /*
1179  * Shared code for tuple and datum cases.
1180  */
1181 static void
1182 puttuple_common(Tuplesortstate *state, SortTuple *tuple)
1183 {
1184         switch (state->status)
1185         {
1186                 case TSS_INITIAL:
1187
1188                         /*
1189                          * Save the tuple into the unsorted array.  First, grow the array
1190                          * as needed.  Note that we try to grow the array when there is
1191                          * still one free slot remaining --- if we fail, there'll still be
1192                          * room to store the incoming tuple, and then we'll switch to
1193                          * tape-based operation.
1194                          */
1195                         if (state->memtupcount >= state->memtupsize - 1)
1196                         {
1197                                 (void) grow_memtuples(state);
1198                                 Assert(state->memtupcount < state->memtupsize);
1199                         }
1200                         state->memtuples[state->memtupcount++] = *tuple;
1201
1202                         /*
1203                          * Check if it's time to switch over to a bounded heapsort. We do
1204                          * so if the input tuple count exceeds twice the desired tuple
1205                          * count (this is a heuristic for where heapsort becomes cheaper
1206                          * than a quicksort), or if we've just filled workMem and have
1207                          * enough tuples to meet the bound.
1208                          *
1209                          * Note that once we enter TSS_BOUNDED state we will always try to
1210                          * complete the sort that way.  In the worst case, if later input
1211                          * tuples are larger than earlier ones, this might cause us to
1212                          * exceed workMem significantly.
1213                          */
1214                         if (state->bounded &&
1215                                 (state->memtupcount > state->bound * 2 ||
1216                                  (state->memtupcount > state->bound && LACKMEM(state))))
1217                         {
1218 #ifdef TRACE_SORT
1219                                 if (trace_sort)
1220                                         elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1221                                                  state->memtupcount,
1222                                                  pg_rusage_show(&state->ru_start));
1223 #endif
1224                                 make_bounded_heap(state);
1225                                 return;
1226                         }
1227
1228                         /*
1229                          * Done if we still fit in available memory and have array slots.
1230                          */
1231                         if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1232                                 return;
1233
1234                         /*
1235                          * Nope; time to switch to tape-based operation.
1236                          */
1237                         inittapes(state);
1238
1239                         /*
1240                          * Dump tuples until we are back under the limit.
1241                          */
1242                         dumptuples(state, false);
1243                         break;
1244
1245                 case TSS_BOUNDED:
1246
1247                         /*
1248                          * We don't want to grow the array here, so check whether the new
1249                          * tuple can be discarded before putting it in.  This should be a
1250                          * good speed optimization, too, since when there are many more
1251                          * input tuples than the bound, most input tuples can be discarded
1252                          * with just this one comparison.  Note that because we currently
1253                          * have the sort direction reversed, we must check for <= not >=.
1254                          */
1255                         if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1256                         {
1257                                 /* new tuple <= top of the heap, so we can discard it */
1258                                 free_sort_tuple(state, tuple);
1259                                 CHECK_FOR_INTERRUPTS();
1260                         }
1261                         else
1262                         {
1263                                 /* discard top of heap, sift up, insert new tuple */
1264                                 free_sort_tuple(state, &state->memtuples[0]);
1265                                 tuplesort_heap_siftup(state, false);
1266                                 tuplesort_heap_insert(state, tuple, 0, false);
1267                         }
1268                         break;
1269
1270                 case TSS_BUILDRUNS:
1271
1272                         /*
1273                          * Insert the tuple into the heap, with run number currentRun if
1274                          * it can go into the current run, else run number currentRun+1.
1275                          * The tuple can go into the current run if it is >= the first
1276                          * not-yet-output tuple.  (Actually, it could go into the current
1277                          * run if it is >= the most recently output tuple ... but that
1278                          * would require keeping around the tuple we last output, and it's
1279                          * simplest to let writetup free each tuple as soon as it's
1280                          * written.)
1281                          *
1282                          * Note there will always be at least one tuple in the heap at
1283                          * this point; see dumptuples.
1284                          */
1285                         Assert(state->memtupcount > 0);
1286                         if (COMPARETUP(state, tuple, &state->memtuples[0]) >= 0)
1287                                 tuplesort_heap_insert(state, tuple, state->currentRun, true);
1288                         else
1289                                 tuplesort_heap_insert(state, tuple, state->currentRun + 1, true);
1290
1291                         /*
1292                          * If we are over the memory limit, dump tuples till we're under.
1293                          */
1294                         dumptuples(state, false);
1295                         break;
1296
1297                 default:
1298                         elog(ERROR, "invalid tuplesort state");
1299                         break;
1300         }
1301 }
1302
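/*
 * Worked example of the transitions above (illustration only): for a query
 * with ORDER BY ... LIMIT 100 the caller sets bound = 100, so puttuple_common()
 * stays in TSS_INITIAL until either the 201st tuple arrives (memtupcount >
 * bound * 2) or memory runs short while at least 101 tuples are on hand, at
 * which point it switches to the bounded heap.  Without a bound, running out
 * of memory or array slots instead calls inittapes() and moves to
 * TSS_BUILDRUNS.
 */
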
1303 /*
1304  * All tuples have been provided; finish the sort.
1305  */
1306 void
1307 tuplesort_performsort(Tuplesortstate *state)
1308 {
1309         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1310
1311 #ifdef TRACE_SORT
1312         if (trace_sort)
1313                 elog(LOG, "performsort starting: %s",
1314                          pg_rusage_show(&state->ru_start));
1315 #endif
1316
1317         switch (state->status)
1318         {
1319                 case TSS_INITIAL:
1320
1321                         /*
1322                          * We were able to accumulate all the tuples within the allowed
1323                          * amount of memory.  Just qsort 'em and we're done.
1324                          */
1325                         if (state->memtupcount > 1)
1326                         {
1327                                 /* Can we use the single-key sort function? */
1328                                 if (state->onlyKey != NULL)
1329                                         qsort_ssup(state->memtuples, state->memtupcount,
1330                                                            state->onlyKey);
1331                                 else
1332                                         qsort_tuple(state->memtuples,
1333                                                                 state->memtupcount,
1334                                                                 state->comparetup,
1335                                                                 state);
1336                         }
1337                         state->current = 0;
1338                         state->eof_reached = false;
1339                         state->markpos_offset = 0;
1340                         state->markpos_eof = false;
1341                         state->status = TSS_SORTEDINMEM;
1342                         break;
1343
1344                 case TSS_BOUNDED:
1345
1346                         /*
1347                          * We were able to accumulate all the tuples required for output
1348                          * in memory, using a heap to eliminate excess tuples.  Now we
1349                          * have to transform the heap to a properly-sorted array.
1350                          */
1351                         sort_bounded_heap(state);
1352                         state->current = 0;
1353                         state->eof_reached = false;
1354                         state->markpos_offset = 0;
1355                         state->markpos_eof = false;
1356                         state->status = TSS_SORTEDINMEM;
1357                         break;
1358
1359                 case TSS_BUILDRUNS:
1360
1361                         /*
1362                          * Finish tape-based sort.  First, flush all tuples remaining in
1363                          * memory out to tape; then merge until we have a single remaining
1364                          * run (or, if !randomAccess, one run per tape). Note that
1365                          * mergeruns sets the correct state->status.
1366                          */
1367                         dumptuples(state, true);
1368                         mergeruns(state);
1369                         state->eof_reached = false;
1370                         state->markpos_block = 0L;
1371                         state->markpos_offset = 0;
1372                         state->markpos_eof = false;
1373                         break;
1374
1375                 default:
1376                         elog(ERROR, "invalid tuplesort state");
1377                         break;
1378         }
1379
1380 #ifdef TRACE_SORT
1381         if (trace_sort)
1382         {
1383                 if (state->status == TSS_FINALMERGE)
1384                         elog(LOG, "performsort done (except %d-way final merge): %s",
1385                                  state->activeTapes,
1386                                  pg_rusage_show(&state->ru_start));
1387                 else
1388                         elog(LOG, "performsort done: %s",
1389                                  pg_rusage_show(&state->ru_start));
1390         }
1391 #endif
1392
1393         MemoryContextSwitchTo(oldcontext);
1394 }
1395
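/*
 * Summary of the terminal states reached above: TSS_INITIAL and TSS_BOUNDED
 * both end in TSS_SORTEDINMEM (array fully sorted in memory), while
 * TSS_BUILDRUNS ends in either TSS_SORTEDONTAPE (a single materialized run
 * on one tape, always the case when randomAccess was requested) or
 * TSS_FINALMERGE (the last merge is performed lazily as tuples are fetched);
 * mergeruns() chooses between the latter two.
 */
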
1396 /*
1397  * Internal routine to fetch the next tuple in either forward or back
1398  * direction into *stup.  Returns FALSE if no more tuples.
1399  * If *should_free is set, the caller must pfree stup.tuple when done with it.
1400  */
1401 static bool
1402 tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
1403                                                   SortTuple *stup, bool *should_free)
1404 {
1405         unsigned int tuplen;
1406
1407         switch (state->status)
1408         {
1409                 case TSS_SORTEDINMEM:
1410                         Assert(forward || state->randomAccess);
1411                         *should_free = false;
1412                         if (forward)
1413                         {
1414                                 if (state->current < state->memtupcount)
1415                                 {
1416                                         *stup = state->memtuples[state->current++];
1417                                         return true;
1418                                 }
1419                                 state->eof_reached = true;
1420
1421                                 /*
1422                                  * Complain if caller tries to retrieve more tuples than
1423                                  * originally asked for in a bounded sort.  Returning EOF here would be
1424                                  * misleading, since the tuples beyond the bound were simply discarded.
1425                                  */
1426                                 if (state->bounded && state->current >= state->bound)
1427                                         elog(ERROR, "retrieved too many tuples in a bounded sort");
1428
1429                                 return false;
1430                         }
1431                         else
1432                         {
1433                                 if (state->current <= 0)
1434                                         return false;
1435
1436                                 /*
1437                                  * If all tuples have already been fetched, return the last
1438                                  * tuple; otherwise return the one before the last returned.
1439                                  */
1440                                 if (state->eof_reached)
1441                                         state->eof_reached = false;
1442                                 else
1443                                 {
1444                                         state->current--;       /* last returned tuple */
1445                                         if (state->current <= 0)
1446                                                 return false;
1447                                 }
1448                                 *stup = state->memtuples[state->current - 1];
1449                                 return true;
1450                         }
1451                         break;
1452
1453                 case TSS_SORTEDONTAPE:
1454                         Assert(forward || state->randomAccess);
1455                         *should_free = true;
1456                         if (forward)
1457                         {
1458                                 if (state->eof_reached)
1459                                         return false;
1460                                 if ((tuplen = getlen(state, state->result_tape, true)) != 0)
1461                                 {
1462                                         READTUP(state, stup, state->result_tape, tuplen);
1463                                         return true;
1464                                 }
1465                                 else
1466                                 {
1467                                         state->eof_reached = true;
1468                                         return false;
1469                                 }
1470                         }
1471
1472                         /*
1473                          * Backward.
1474                          *
1475                          * If all tuples have already been fetched, return the last tuple;
1476                          * otherwise return the one before the last returned.
1477                          */
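                        /*
                         * Layout note: with randomAccess, WRITETUP stored each tuple as
                         * <length word> <tuple body> <length word>, and markrunend() ended
                         * the run with a zero length word; the trailing copies of the
                         * length are what make these backward seeks possible.
                         */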
1478                         if (state->eof_reached)
1479                         {
1480                                 /*
1481                                  * Seek position is pointing just past the zero tuplen at the
1482                                  * end of file; back up to fetch last tuple's ending length
1483                                  * word.  If seek fails we must have a completely empty file.
1484                                  */
1485                                 if (!LogicalTapeBackspace(state->tapeset,
1486                                                                                   state->result_tape,
1487                                                                                   2 * sizeof(unsigned int)))
1488                                         return false;
1489                                 state->eof_reached = false;
1490                         }
1491                         else
1492                         {
1493                                 /*
1494                                  * Back up and fetch previously-returned tuple's ending length
1495                                  * word.  If seek fails, assume we are at start of file.
1496                                  */
1497                                 if (!LogicalTapeBackspace(state->tapeset,
1498                                                                                   state->result_tape,
1499                                                                                   sizeof(unsigned int)))
1500                                         return false;
1501                                 tuplen = getlen(state, state->result_tape, false);
1502
1503                                 /*
1504                                  * Back up to get ending length word of tuple before it.
1505                                  */
1506                                 if (!LogicalTapeBackspace(state->tapeset,
1507                                                                                   state->result_tape,
1508                                                                                   tuplen + 2 * sizeof(unsigned int)))
1509                                 {
1510                                         /*
1511                                          * If that fails, presumably the prev tuple is the first
1512                                          * in the file.  Back up so that it becomes next to read
1513                                          * in forward direction (not obviously right, but that is
1514                                          * what the in-memory case does).
1515                                          */
1516                                         if (!LogicalTapeBackspace(state->tapeset,
1517                                                                                           state->result_tape,
1518                                                                                           tuplen + sizeof(unsigned int)))
1519                                                 elog(ERROR, "bogus tuple length in backward scan");
1520                                         return false;
1521                                 }
1522                         }
1523
1524                         tuplen = getlen(state, state->result_tape, false);
1525
1526                         /*
1527                          * Now we have the length of the prior tuple, back up and read it.
1528                          * Note: READTUP expects we are positioned after the initial
1529                          * length word of the tuple, so back up to that point.
1530                          */
1531                         if (!LogicalTapeBackspace(state->tapeset,
1532                                                                           state->result_tape,
1533                                                                           tuplen))
1534                                 elog(ERROR, "bogus tuple length in backward scan");
1535                         READTUP(state, stup, state->result_tape, tuplen);
1536                         return true;
1537
1538                 case TSS_FINALMERGE:
1539                         Assert(forward);
1540                         *should_free = true;
1541
1542                         /*
1543                          * This code should match the inner loop of mergeonerun().
1544                          */
1545                         if (state->memtupcount > 0)
1546                         {
1547                                 int                     srcTape = state->memtuples[0].tupindex;
1548                                 Size            tuplen;
1549                                 int                     tupIndex;
1550                                 SortTuple  *newtup;
1551
1552                                 *stup = state->memtuples[0];
1553                                 /* returned tuple is no longer counted in our memory space */
1554                                 if (stup->tuple)
1555                                 {
1556                                         tuplen = GetMemoryChunkSpace(stup->tuple);
1557                                         state->availMem += tuplen;
1558                                         state->mergeavailmem[srcTape] += tuplen;
1559                                 }
1560                                 tuplesort_heap_siftup(state, false);
1561                                 if ((tupIndex = state->mergenext[srcTape]) == 0)
1562                                 {
1563                                         /*
1564                                          * out of preloaded data on this tape, try to read more
1565                                          *
1566                                          * Unlike mergeonerun(), we only preload from the single
1567                                          * tape that's run dry.  See mergepreread() comments.
1568                                          */
1569                                         mergeprereadone(state, srcTape);
1570
1571                                         /*
1572                                          * if still no data, we've reached end of run on this tape
1573                                          */
1574                                         if ((tupIndex = state->mergenext[srcTape]) == 0)
1575                                                 return true;
1576                                 }
1577                                 /* pull next preread tuple from list, insert in heap */
1578                                 newtup = &state->memtuples[tupIndex];
1579                                 state->mergenext[srcTape] = newtup->tupindex;
1580                                 if (state->mergenext[srcTape] == 0)
1581                                         state->mergelast[srcTape] = 0;
1582                                 tuplesort_heap_insert(state, newtup, srcTape, false);
1583                                 /* put the now-unused memtuples entry on the freelist */
1584                                 newtup->tupindex = state->mergefreelist;
1585                                 state->mergefreelist = tupIndex;
1586                                 state->mergeavailslots[srcTape]++;
1587                                 return true;
1588                         }
1589                         return false;
1590
1591                 default:
1592                         elog(ERROR, "invalid tuplesort state");
1593                         return false;           /* keep compiler quiet */
1594         }
1595 }
1596
1597 /*
1598  * Fetch the next tuple in either forward or back direction.
1599  * If successful, put tuple in slot and return TRUE; else, clear the slot
1600  * and return FALSE.
1601  */
1602 bool
1603 tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
1604                                            TupleTableSlot *slot)
1605 {
1606         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1607         SortTuple       stup;
1608         bool            should_free;
1609
1610         if (!tuplesort_gettuple_common(state, forward, &stup, &should_free))
1611                 stup.tuple = NULL;
1612
1613         MemoryContextSwitchTo(oldcontext);
1614
1615         if (stup.tuple)
1616         {
1617                 ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, should_free);
1618                 return true;
1619         }
1620         else
1621         {
1622                 ExecClearTuple(slot);
1623                 return false;
1624         }
1625 }
1626
1627 /*
1628  * Fetch the next tuple in either forward or back direction.
1629  * Returns NULL if no more tuples.  If *should_free is set, the
1630  * caller must pfree the returned tuple when done with it.
1631  */
1632 HeapTuple
1633 tuplesort_getheaptuple(Tuplesortstate *state, bool forward, bool *should_free)
1634 {
1635         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1636         SortTuple       stup;
1637
1638         if (!tuplesort_gettuple_common(state, forward, &stup, should_free))
1639                 stup.tuple = NULL;
1640
1641         MemoryContextSwitchTo(oldcontext);
1642
1643         return stup.tuple;
1644 }
1645
1646 /*
1647  * Fetch the next index tuple in either forward or back direction.
1648  * Returns NULL if no more tuples.  If *should_free is set, the
1649  * caller must pfree the returned tuple when done with it.
1650  */
1651 IndexTuple
1652 tuplesort_getindextuple(Tuplesortstate *state, bool forward,
1653                                                 bool *should_free)
1654 {
1655         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1656         SortTuple       stup;
1657
1658         if (!tuplesort_gettuple_common(state, forward, &stup, should_free))
1659                 stup.tuple = NULL;
1660
1661         MemoryContextSwitchTo(oldcontext);
1662
1663         return (IndexTuple) stup.tuple;
1664 }
1665
1666 /*
1667  * Fetch the next Datum in either forward or back direction.
1668  * Returns FALSE if no more datums.
1669  *
1670  * If the Datum is pass-by-ref type, the returned value is freshly palloc'd
1671  * and is now owned by the caller.
1672  */
1673 bool
1674 tuplesort_getdatum(Tuplesortstate *state, bool forward,
1675                                    Datum *val, bool *isNull)
1676 {
1677         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1678         SortTuple       stup;
1679         bool            should_free;
1680
1681         if (!tuplesort_gettuple_common(state, forward, &stup, &should_free))
1682         {
1683                 MemoryContextSwitchTo(oldcontext);
1684                 return false;
1685         }
1686
1687         if (stup.isnull1 || state->datumTypeByVal)
1688         {
1689                 *val = stup.datum1;
1690                 *isNull = stup.isnull1;
1691         }
1692         else
1693         {
1694                 if (should_free)
1695                         *val = stup.datum1;
1696                 else
1697                         *val = datumCopy(stup.datum1, false, state->datumTypeLen);
1698                 *isNull = false;
1699         }
1700
1701         MemoryContextSwitchTo(oldcontext);
1702
1703         return true;
1704 }
1705
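/*
 * Note on the should_free handling above: a tuple read back from tape is
 * already a palloc'd copy the caller may keep, so it is handed over directly;
 * a datum still living in memtuples[] must be datumCopy'd, since that array
 * is owned by the sort and may be freed or reused.
 */
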
1706 /*
1707  * tuplesort_merge_order - report merge order we'll use for given memory
1708  * (note: "merge order" just means the number of input tapes in the merge).
1709  *
1710  * This is exported for use by the planner.  allowedMem is in bytes.
1711  */
1712 int
1713 tuplesort_merge_order(long allowedMem)
1714 {
1715         int                     mOrder;
1716
1717         /*
1718          * We need one tape for each merge input, plus another one for the output,
1719          * and each of these tapes needs buffer space.  In addition we want
1720          * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't
1721          * count).
1722          *
1723          * Note: you might be thinking we need to account for the memtuples[]
1724          * array in this calculation, but we effectively treat that as part of the
1725          * MERGE_BUFFER_SIZE workspace.
1726          */
1727         mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
1728                 (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
1729
1730         /* Even in minimum memory, use at least a MINORDER merge */
1731         mOrder = Max(mOrder, MINORDER);
1732
1733         return mOrder;
1734 }
1735
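/*
 * Worked example (illustrative; assumes the usual BLCKSZ of 8192, giving
 * TAPE_BUFFER_OVERHEAD = 24kB and MERGE_BUFFER_SIZE = 256kB per the
 * definitions earlier in this file): with allowedMem = 16MB,
 *
 *              mOrder = (16384kB - 24kB) / (256kB + 24kB) = 58
 *
 * so a merge pass can read from up to 58 input tapes at once.  Below roughly
 * 1.7MB of allowedMem the MINORDER floor (6) takes over.
 */
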
1736 /*
1737  * inittapes - initialize for tape sorting.
1738  *
1739  * This is called only if we have found we don't have room to sort in memory.
1740  */
1741 static void
1742 inittapes(Tuplesortstate *state)
1743 {
1744         int                     maxTapes,
1745                                 ntuples,
1746                                 j;
1747         long            tapeSpace;
1748
1749         /* Compute number of tapes to use: merge order plus 1 */
1750         maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
1751
1752         /*
1753          * We must have at least 2*maxTapes slots in the memtuples[] array, else
1754          * we'd not have room for merge heap plus preread.  It seems unlikely that
1755          * this case would ever occur, but be safe.
1756          */
1757         maxTapes = Min(maxTapes, state->memtupsize / 2);
1758
1759         state->maxTapes = maxTapes;
1760         state->tapeRange = maxTapes - 1;
1761
1762 #ifdef TRACE_SORT
1763         if (trace_sort)
1764                 elog(LOG, "switching to external sort with %d tapes: %s",
1765                          maxTapes, pg_rusage_show(&state->ru_start));
1766 #endif
1767
1768         /*
1769          * Decrease availMem to reflect the space needed for tape buffers; but
1770          * don't decrease it to the point that we have no room for tuples. (That
1771          * case is only likely to occur if sorting pass-by-value Datums; in all
1772          * other scenarios the memtuples[] array is unlikely to occupy more than
1773          * half of allowedMem.  In the pass-by-value case it's not important to
1774          * account for tuple space, so we don't care if LACKMEM becomes
1775          * inaccurate.)
1776          */
1777         tapeSpace = maxTapes * TAPE_BUFFER_OVERHEAD;
1778         if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
1779                 USEMEM(state, tapeSpace);
1780
1781         /*
1782          * Make sure that the temp file(s) underlying the tape set are created in
1783          * suitable temp tablespaces.
1784          */
1785         PrepareTempTablespaces();
1786
1787         /*
1788          * Create the tape set and allocate the per-tape data arrays.
1789          */
1790         state->tapeset = LogicalTapeSetCreate(maxTapes);
1791
1792         state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
1793         state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
1794         state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
1795         state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
1796         state->mergeavailmem = (long *) palloc0(maxTapes * sizeof(long));
1797         state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
1798         state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
1799         state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
1800         state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
1801
1802         /*
1803          * Convert the unsorted contents of memtuples[] into a heap. Each tuple is
1804          * marked as belonging to run number zero.
1805          *
1806          * NOTE: we pass false for checkIndex since there's no point in comparing
1807          * indexes in this step, even though we do intend the indexes to be part
1808          * of the sort key...
1809          */
1810         ntuples = state->memtupcount;
1811         state->memtupcount = 0;         /* make the heap empty */
1812         for (j = 0; j < ntuples; j++)
1813         {
1814                 /* Must copy source tuple to avoid possible overwrite */
1815                 SortTuple       stup = state->memtuples[j];
1816
1817                 tuplesort_heap_insert(state, &stup, 0, false);
1818         }
1819         Assert(state->memtupcount == ntuples);
1820
1821         state->currentRun = 0;
1822
1823         /*
1824          * Initialize variables of Algorithm D (step D1).
1825          */
1826         for (j = 0; j < maxTapes; j++)
1827         {
1828                 state->tp_fib[j] = 1;
1829                 state->tp_runs[j] = 0;
1830                 state->tp_dummy[j] = 1;
1831                 state->tp_tapenum[j] = j;
1832         }
1833         state->tp_fib[state->tapeRange] = 0;
1834         state->tp_dummy[state->tapeRange] = 0;
1835
1836         state->Level = 1;
1837         state->destTape = 0;
1838
1839         state->status = TSS_BUILDRUNS;
1840 }
1841
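/*
 * Continuing the example above, allowedMem = 16MB would give maxTapes = 59
 * (58 merge inputs plus the output tape), subject to the memtupsize/2 clamp;
 * tapeSpace = 59 * 24kB is then charged against availMem unless the tape
 * buffers plus the memtuples[] array would already exceed allowedMem.
 */
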
1842 /*
1843  * selectnewtape -- select new tape for new initial run.
1844  *
1845  * This is called after finishing a run when we know another run
1846  * must be started.  This implements steps D3, D4 of Algorithm D.
1847  */
1848 static void
1849 selectnewtape(Tuplesortstate *state)
1850 {
1851         int                     j;
1852         int                     a;
1853
1854         /* Step D3: advance j (destTape) */
1855         if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
1856         {
1857                 state->destTape++;
1858                 return;
1859         }
1860         if (state->tp_dummy[state->destTape] != 0)
1861         {
1862                 state->destTape = 0;
1863                 return;
1864         }
1865
1866         /* Step D4: increase level */
1867         state->Level++;
1868         a = state->tp_fib[0];
1869         for (j = 0; j < state->tapeRange; j++)
1870         {
1871                 state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
1872                 state->tp_fib[j] = a + state->tp_fib[j + 1];
1873         }
1874         state->destTape = 0;
1875 }
1876
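/*
 * Example of the resulting run distribution (illustration only): with
 * tapeRange = 3, successive executions of step D4 set the target (fib) counts
 * to (1,1,1), (2,2,1), (4,3,2), (7,6,4), ... -- i.e. 3, 5, 9, 17, ... total
 * runs per level -- and dummy runs cover the shortfall when fewer real runs
 * exist, exactly as in Knuth's Algorithm 5.4.2D.
 */
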
1877 /*
1878  * mergeruns -- merge all the completed initial runs.
1879  *
1880  * This implements steps D5, D6 of Algorithm D.  All input data has
1881  * already been written to initial runs on tape (see dumptuples).
1882  */
1883 static void
1884 mergeruns(Tuplesortstate *state)
1885 {
1886         int                     tapenum,
1887                                 svTape,
1888                                 svRuns,
1889                                 svDummy;
1890
1891         Assert(state->status == TSS_BUILDRUNS);
1892         Assert(state->memtupcount == 0);
1893
1894         /*
1895          * If we produced only one initial run (quite likely if the total data
1896          * volume is between 1X and 2X workMem), we can just use that tape as the
1897          * finished output, rather than doing a useless merge.  (This obvious
1898          * optimization is not in Knuth's algorithm.)
1899          */
1900         if (state->currentRun == 1)
1901         {
1902                 state->result_tape = state->tp_tapenum[state->destTape];
1903                 /* must freeze and rewind the finished output tape */
1904                 LogicalTapeFreeze(state->tapeset, state->result_tape);
1905                 state->status = TSS_SORTEDONTAPE;
1906                 return;
1907         }
1908
1909         /* End of step D2: rewind all output tapes to prepare for merging */
1910         for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
1911                 LogicalTapeRewind(state->tapeset, tapenum, false);
1912
1913         for (;;)
1914         {
1915                 /*
1916                  * At this point we know that tape[T] is empty.  If there's just one
1917                  * (real or dummy) run left on each input tape, then only one merge
1918                  * pass remains.  If we don't have to produce a materialized sorted
1919                  * tape, we can stop at this point and do the final merge on-the-fly.
1920                  */
1921                 if (!state->randomAccess)
1922                 {
1923                         bool            allOneRun = true;
1924
1925                         Assert(state->tp_runs[state->tapeRange] == 0);
1926                         for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
1927                         {
1928                                 if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
1929                                 {
1930                                         allOneRun = false;
1931                                         break;
1932                                 }
1933                         }
1934                         if (allOneRun)
1935                         {
1936                                 /* Tell logtape.c we won't be writing anymore */
1937                                 LogicalTapeSetForgetFreeSpace(state->tapeset);
1938                                 /* Initialize for the final merge pass */
1939                                 beginmerge(state);
1940                                 state->status = TSS_FINALMERGE;
1941                                 return;
1942                         }
1943                 }
1944
1945                 /* Step D5: merge runs onto tape[T] until tape[P] is empty */
1946                 while (state->tp_runs[state->tapeRange - 1] ||
1947                            state->tp_dummy[state->tapeRange - 1])
1948                 {
1949                         bool            allDummy = true;
1950
1951                         for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
1952                         {
1953                                 if (state->tp_dummy[tapenum] == 0)
1954                                 {
1955                                         allDummy = false;
1956                                         break;
1957                                 }
1958                         }
1959
1960                         if (allDummy)
1961                         {
1962                                 state->tp_dummy[state->tapeRange]++;
1963                                 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
1964                                         state->tp_dummy[tapenum]--;
1965                         }
1966                         else
1967                                 mergeonerun(state);
1968                 }
1969
1970                 /* Step D6: decrease level */
1971                 if (--state->Level == 0)
1972                         break;
1973                 /* rewind output tape T to use as new input */
1974                 LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange],
1975                                                   false);
1976                 /* rewind used-up input tape P, and prepare it for write pass */
1977                 LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange - 1],
1978                                                   true);
1979                 state->tp_runs[state->tapeRange - 1] = 0;
1980
1981                 /*
1982                  * reassign tape units per step D6; note we no longer care about A[]
1983                  */
1984                 svTape = state->tp_tapenum[state->tapeRange];
1985                 svDummy = state->tp_dummy[state->tapeRange];
1986                 svRuns = state->tp_runs[state->tapeRange];
1987                 for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
1988                 {
1989                         state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
1990                         state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
1991                         state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
1992                 }
1993                 state->tp_tapenum[0] = svTape;
1994                 state->tp_dummy[0] = svDummy;
1995                 state->tp_runs[0] = svRuns;
1996         }
1997
1998         /*
1999          * Done.  Knuth says that the result is on TAPE[1], but since we exited
2000          * the loop without performing the last iteration of step D6, we have not
2001          * rearranged the tape unit assignment, and therefore the result is on
2002          * TAPE[T].  We need to do it this way so that we can freeze the final
2003  * output tape while rewinding it.  The last iteration of step D6 would be
2004          * a waste of cycles anyway...
2005          */
2006         state->result_tape = state->tp_tapenum[state->tapeRange];
2007         LogicalTapeFreeze(state->tapeset, state->result_tape);
2008         state->status = TSS_SORTEDONTAPE;
2009 }
2010
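/*
 * In the common case where the data volume only modestly exceeds workMem,
 * the one-run shortcut above fires and no merge pass is performed at all;
 * otherwise, for !randomAccess callers the loop typically stops one pass
 * early in TSS_FINALMERGE and the last merge happens on the fly during
 * retrieval.
 */
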
2011 /*
2012  * Merge one run from each input tape, except ones with dummy runs.
2013  *
2014  * This is the inner loop of Algorithm D step D5.  We know that the
2015  * output tape is TAPE[T].
2016  */
2017 static void
2018 mergeonerun(Tuplesortstate *state)
2019 {
2020         int                     destTape = state->tp_tapenum[state->tapeRange];
2021         int                     srcTape;
2022         int                     tupIndex;
2023         SortTuple  *tup;
2024         long            priorAvail,
2025                                 spaceFreed;
2026
2027         /*
2028          * Start the merge by loading one tuple from each active source tape into
2029          * the heap.  We can also decrease the input run/dummy run counts.
2030          */
2031         beginmerge(state);
2032
2033         /*
2034          * Execute merge by repeatedly extracting lowest tuple in heap, writing it
2035          * out, and replacing it with next tuple from same tape (if there is
2036          * another one).
2037          */
2038         while (state->memtupcount > 0)
2039         {
2040                 /* write the tuple to destTape */
2041                 priorAvail = state->availMem;
2042                 srcTape = state->memtuples[0].tupindex;
2043                 WRITETUP(state, destTape, &state->memtuples[0]);
2044                 /* writetup adjusted total free space, now fix per-tape space */
2045                 spaceFreed = state->availMem - priorAvail;
2046                 state->mergeavailmem[srcTape] += spaceFreed;
2047                 /* compact the heap */
2048                 tuplesort_heap_siftup(state, false);
2049                 if ((tupIndex = state->mergenext[srcTape]) == 0)
2050                 {
2051                         /* out of preloaded data on this tape, try to read more */
2052                         mergepreread(state);
2053                         /* if still no data, we've reached end of run on this tape */
2054                         if ((tupIndex = state->mergenext[srcTape]) == 0)
2055                                 continue;
2056                 }
2057                 /* pull next preread tuple from list, insert in heap */
2058                 tup = &state->memtuples[tupIndex];
2059                 state->mergenext[srcTape] = tup->tupindex;
2060                 if (state->mergenext[srcTape] == 0)
2061                         state->mergelast[srcTape] = 0;
2062                 tuplesort_heap_insert(state, tup, srcTape, false);
2063                 /* put the now-unused memtuples entry on the freelist */
2064                 tup->tupindex = state->mergefreelist;
2065                 state->mergefreelist = tupIndex;
2066                 state->mergeavailslots[srcTape]++;
2067         }
2068
2069         /*
2070          * When the heap empties, we're done.  Write an end-of-run marker on the
2071          * output tape, and increment its count of real runs.
2072          */
2073         markrunend(state, destTape);
2074         state->tp_runs[state->tapeRange]++;
2075
2076 #ifdef TRACE_SORT
2077         if (trace_sort)
2078                 elog(LOG, "finished %d-way merge step: %s", state->activeTapes,
2079                          pg_rusage_show(&state->ru_start));
2080 #endif
2081 }
2082
2083 /*
2084  * beginmerge - initialize for a merge pass
2085  *
2086  * We decrease the counts of real and dummy runs for each tape, and mark
2087  * which tapes contain active input runs in mergeactive[].  Then, load
2088  * as many tuples as we can from each active input tape, and finally
2089  * fill the merge heap with the first tuple from each active tape.
2090  */
2091 static void
2092 beginmerge(Tuplesortstate *state)
2093 {
2094         int                     activeTapes;
2095         int                     tapenum;
2096         int                     srcTape;
2097         int                     slotsPerTape;
2098         long            spacePerTape;
2099
2100         /* Heap should be empty here */
2101         Assert(state->memtupcount == 0);
2102
2103         /* Adjust run counts and mark the active tapes */
2104         memset(state->mergeactive, 0,
2105                    state->maxTapes * sizeof(*state->mergeactive));
2106         activeTapes = 0;
2107         for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2108         {
2109                 if (state->tp_dummy[tapenum] > 0)
2110                         state->tp_dummy[tapenum]--;
2111                 else
2112                 {
2113                         Assert(state->tp_runs[tapenum] > 0);
2114                         state->tp_runs[tapenum]--;
2115                         srcTape = state->tp_tapenum[tapenum];
2116                         state->mergeactive[srcTape] = true;
2117                         activeTapes++;
2118                 }
2119         }
2120         state->activeTapes = activeTapes;
2121
2122         /* Clear merge-pass state variables */
2123         memset(state->mergenext, 0,
2124                    state->maxTapes * sizeof(*state->mergenext));
2125         memset(state->mergelast, 0,
2126                    state->maxTapes * sizeof(*state->mergelast));
2127         state->mergefreelist = 0;       /* nothing in the freelist */
2128         state->mergefirstfree = activeTapes;            /* 1st slot avail for preread */
2129
2130         /*
2131          * Initialize space allocation to let each active input tape have an equal
2132          * share of preread space.
2133          */
2134         Assert(activeTapes > 0);
2135         slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
2136         Assert(slotsPerTape > 0);
2137         spacePerTape = state->availMem / activeTapes;
2138         for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
2139         {
2140                 if (state->mergeactive[srcTape])
2141                 {
2142                         state->mergeavailslots[srcTape] = slotsPerTape;
2143                         state->mergeavailmem[srcTape] = spacePerTape;
2144                 }
2145         }
2146
2147         /*
2148          * Preread as many tuples as possible (and at least one) from each active
2149          * tape
2150          */
2151         mergepreread(state);
2152
2153         /* Load the merge heap with the first tuple from each input tape */
2154         for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
2155         {
2156                 int                     tupIndex = state->mergenext[srcTape];
2157                 SortTuple  *tup;
2158
2159                 if (tupIndex)
2160                 {
2161                         tup = &state->memtuples[tupIndex];
2162                         state->mergenext[srcTape] = tup->tupindex;
2163                         if (state->mergenext[srcTape] == 0)
2164                                 state->mergelast[srcTape] = 0;
2165                         tuplesort_heap_insert(state, tup, srcTape, false);
2166                         /* put the now-unused memtuples entry on the freelist */
2167                         tup->tupindex = state->mergefreelist;
2168                         state->mergefreelist = tupIndex;
2169                         state->mergeavailslots[srcTape]++;
2170                 }
2171         }
2172 }
2173
2174 /*
2175  * mergepreread - load tuples from merge input tapes
2176  *
2177  * This routine exists to improve sequentiality of reads during a merge pass,
2178  * as explained in the header comments of this file.  Load tuples from each
2179  * active source tape until the tape's run is exhausted or it has used up
2180  * its fair share of available memory.  In any case, we guarantee that there
2181  * is at least one preread tuple available from each unexhausted input tape.
2182  *
2183  * We invoke this routine at the start of a merge pass for initial load,
2184  * and then whenever any tape's preread data runs out.  Note that we load
2185  * as much data as possible from all tapes, not just the one that ran out.
2186  * This is because logtape.c works best with a usage pattern that alternates
2187  * between reading a lot of data and writing a lot of data, so whenever we
2188  * are forced to read, we should fill working memory completely.
2189  *
2190  * In FINALMERGE state, we *don't* use this routine, but instead just preread
2191  * from the single tape that ran dry.  There's no read/write alternation in
2192  * that state and so no point in scanning through all the tapes to fix one.
2193  * (Moreover, there may be quite a lot of inactive tapes in that state, since
2194  * we might have had many fewer runs than tapes.  In a regular tape-to-tape
2195  * merge we can expect most of the tapes to be active.)
2196  */
2197 static void
2198 mergepreread(Tuplesortstate *state)
2199 {
2200         int                     srcTape;
2201
2202         for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
2203                 mergeprereadone(state, srcTape);
2204 }
2205
2206 /*
2207  * mergeprereadone - load tuples from one merge input tape
2208  *
2209  * Read tuples from the specified tape until it has used up its free memory
2210  * or array slots; but ensure that we have at least one tuple, if any are
2211  * to be had.
2212  */
2213 static void
2214 mergeprereadone(Tuplesortstate *state, int srcTape)
2215 {
2216         unsigned int tuplen;
2217         SortTuple       stup;
2218         int                     tupIndex;
2219         long            priorAvail,
2220                                 spaceUsed;
2221
2222         if (!state->mergeactive[srcTape])
2223                 return;                                 /* tape's run is already exhausted */
2224         priorAvail = state->availMem;
2225         state->availMem = state->mergeavailmem[srcTape];
2226         while ((state->mergeavailslots[srcTape] > 0 && !LACKMEM(state)) ||
2227                    state->mergenext[srcTape] == 0)
2228         {
2229                 /* read next tuple, if any */
2230                 if ((tuplen = getlen(state, srcTape, true)) == 0)
2231                 {
2232                         state->mergeactive[srcTape] = false;
2233                         break;
2234                 }
2235                 READTUP(state, &stup, srcTape, tuplen);
2236                 /* find a free slot in memtuples[] for it */
2237                 tupIndex = state->mergefreelist;
2238                 if (tupIndex)
2239                         state->mergefreelist = state->memtuples[tupIndex].tupindex;
2240                 else
2241                 {
2242                         tupIndex = state->mergefirstfree++;
2243                         Assert(tupIndex < state->memtupsize);
2244                 }
2245                 state->mergeavailslots[srcTape]--;
2246                 /* store tuple, append to list for its tape */
2247                 stup.tupindex = 0;
2248                 state->memtuples[tupIndex] = stup;
2249                 if (state->mergelast[srcTape])
2250                         state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
2251                 else
2252                         state->mergenext[srcTape] = tupIndex;
2253                 state->mergelast[srcTape] = tupIndex;
2254         }
2255         /* update per-tape and global availmem counts */
2256         spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
2257         state->mergeavailmem[srcTape] = state->availMem;
2258         state->availMem = priorAvail - spaceUsed;
2259 }
2260
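/*
 * Bookkeeping note for mergeprereadone(): availMem is temporarily swapped to
 * the tape's private budget (mergeavailmem[srcTape]) so that LACKMEM()
 * enforces the per-tape share while reading, and the space actually consumed
 * is then charged back to the global availMem at the end.
 */
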
2261 /*
2262  * dumptuples - remove tuples from heap and write to tape
2263  *
2264  * This is used during initial-run building, but not during merging.
2265  *
2266  * When alltuples = false, dump only enough tuples to get under the
2267  * availMem limit (and leave at least one tuple in the heap in any case,
2268  * since puttuple assumes it always has a tuple to compare to).  We also
2269  * insist there be at least one free slot in the memtuples[] array.
2270  *
2271  * When alltuples = true, dump everything currently in memory.
2272  * (This case is only used at end of input data.)
2273  *
2274  * If we empty the heap, close out the current run and return (this should
2275  * only happen at end of input data).  If we see that the tuple run number
2276  * at the top of the heap has changed, start a new run.
2277  */
2278 static void
2279 dumptuples(Tuplesortstate *state, bool alltuples)
2280 {
2281         while (alltuples ||
2282                    (LACKMEM(state) && state->memtupcount > 1) ||
2283                    state->memtupcount >= state->memtupsize)
2284         {
2285                 /*
2286                  * Dump the heap's frontmost entry, and sift up to remove it from the
2287                  * heap.
2288                  */
2289                 Assert(state->memtupcount > 0);
2290                 WRITETUP(state, state->tp_tapenum[state->destTape],
2291                                  &state->memtuples[0]);
2292                 tuplesort_heap_siftup(state, true);
2293
2294                 /*
2295                  * If the heap is empty *or* top run number has changed, we've
2296                  * finished the current run.
2297                  */
2298                 if (state->memtupcount == 0 ||
2299                         state->currentRun != state->memtuples[0].tupindex)
2300                 {
2301                         markrunend(state, state->tp_tapenum[state->destTape]);
2302                         state->currentRun++;
2303                         state->tp_runs[state->destTape]++;
2304                         state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
2305
2306 #ifdef TRACE_SORT
2307                         if (trace_sort)
2308                                 elog(LOG, "finished writing%s run %d to tape %d: %s",
2309                                          (state->memtupcount == 0) ? " final" : "",
2310                                          state->currentRun, state->destTape,
2311                                          pg_rusage_show(&state->ru_start));
2312 #endif
2313
2314                         /*
2315                          * Done if heap is empty, else prepare for new run.
2316                          */
2317                         if (state->memtupcount == 0)
2318                                 break;
2319                         Assert(state->currentRun == state->memtuples[0].tupindex);
2320                         selectnewtape(state);
2321                 }
2322         }
2323 }
2324
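/*
 * As a rule of thumb, replacement selection (the heap discipline used while
 * building runs) produces initial runs averaging about twice the size of the
 * in-memory working set for randomly ordered input (Knuth, section 5.4.1),
 * which is why relatively few initial runs are normally written here.
 */
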
2325 /*
2326  * tuplesort_rescan             - rewind and replay the scan
2327  */
2328 void
2329 tuplesort_rescan(Tuplesortstate *state)
2330 {
2331         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2332
2333         Assert(state->randomAccess);
2334
2335         switch (state->status)
2336         {
2337                 case TSS_SORTEDINMEM:
2338                         state->current = 0;
2339                         state->eof_reached = false;
2340                         state->markpos_offset = 0;
2341                         state->markpos_eof = false;
2342                         break;
2343                 case TSS_SORTEDONTAPE:
2344                         LogicalTapeRewind(state->tapeset,
2345                                                           state->result_tape,
2346                                                           false);
2347                         state->eof_reached = false;
2348                         state->markpos_block = 0L;
2349                         state->markpos_offset = 0;
2350                         state->markpos_eof = false;
2351                         break;
2352                 default:
2353                         elog(ERROR, "invalid tuplesort state");
2354                         break;
2355         }
2356
2357         MemoryContextSwitchTo(oldcontext);
2358 }
2359
2360 /*
2361  * tuplesort_markpos    - saves current position in the merged sort file
2362  */
2363 void
2364 tuplesort_markpos(Tuplesortstate *state)
2365 {
2366         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2367
2368         Assert(state->randomAccess);
2369
2370         switch (state->status)
2371         {
2372                 case TSS_SORTEDINMEM:
2373                         state->markpos_offset = state->current;
2374                         state->markpos_eof = state->eof_reached;
2375                         break;
2376                 case TSS_SORTEDONTAPE:
2377                         LogicalTapeTell(state->tapeset,
2378                                                         state->result_tape,
2379                                                         &state->markpos_block,
2380                                                         &state->markpos_offset);
2381                         state->markpos_eof = state->eof_reached;
2382                         break;
2383                 default:
2384                         elog(ERROR, "invalid tuplesort state");
2385                         break;
2386         }
2387
2388         MemoryContextSwitchTo(oldcontext);
2389 }
2390
2391 /*
2392  * tuplesort_restorepos - restores current position in merged sort file to
2393  *                                                last saved position
2394  */
2395 void
2396 tuplesort_restorepos(Tuplesortstate *state)
2397 {
2398         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2399
2400         Assert(state->randomAccess);
2401
2402         switch (state->status)
2403         {
2404                 case TSS_SORTEDINMEM:
2405                         state->current = state->markpos_offset;
2406                         state->eof_reached = state->markpos_eof;
2407                         break;
2408                 case TSS_SORTEDONTAPE:
2409                         if (!LogicalTapeSeek(state->tapeset,
2410                                                                  state->result_tape,
2411                                                                  state->markpos_block,
2412                                                                  state->markpos_offset))
2413                                 elog(ERROR, "tuplesort_restorepos failed");
2414                         state->eof_reached = state->markpos_eof;
2415                         break;
2416                 default:
2417                         elog(ERROR, "invalid tuplesort state");
2418                         break;
2419         }
2420
2421         MemoryContextSwitchTo(oldcontext);
2422 }
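
/*
 * A minimal usage sketch of the mark/restore API (hypothetical caller code;
 * it assumes the sort was built with randomAccess = true and is read back
 * with the usual tuplesort_gettupleslot() call):
 *
 *              tuplesort_performsort(state);
 *              tuplesort_markpos(state);                               remember this spot
 *              while (tuplesort_gettupleslot(state, true, slot))
 *                      process_one_tuple(slot);                        hypothetical consumer
 *              tuplesort_restorepos(state);                    replay from the mark
 *
 * Merge join is the typical consumer of this pattern: it marks the start of
 * a group of equal-keyed inner tuples and restores to that point for each
 * matching outer tuple.  tuplesort_rescan() is the same idea taken back to
 * the very beginning of the sorted output.
 */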
2423
2424 /*
2425  * tuplesort_get_stats - extract summary statistics
2426  *
2427  * This can be called after tuplesort_performsort() finishes to obtain
2428  * printable summary information about how the sort was performed.
2429  * spaceUsed is measured in kilobytes.
2430  */
2431 void
2432 tuplesort_get_stats(Tuplesortstate *state,
2433                                         const char **sortMethod,
2434                                         const char **spaceType,
2435                                         long *spaceUsed)
2436 {
2437         /*
2438          * Note: it might seem we should provide both memory and disk usage for a
2439          * disk-based sort.  However, the current code doesn't track memory space
2440          * accurately once we have begun to return tuples to the caller (since we
2441          * don't account for pfree's the caller is expected to do), so we cannot
2442          * rely on availMem in a disk sort.  This does not seem worth the overhead
2443          * to fix.      Is it worth creating an API for the memory context code to
2444          * tell us how much is actually used in sortcontext?
2445          */
2446         if (state->tapeset)
2447         {
2448                 *spaceType = "Disk";
2449                 *spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
2450         }
2451         else
2452         {
2453                 *spaceType = "Memory";
2454                 *spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
2455         }
2456
2457         switch (state->status)
2458         {
2459                 case TSS_SORTEDINMEM:
2460                         if (state->boundUsed)
2461                                 *sortMethod = "top-N heapsort";
2462                         else
2463                                 *sortMethod = "quicksort";
2464                         break;
2465                 case TSS_SORTEDONTAPE:
2466                         *sortMethod = "external sort";
2467                         break;
2468                 case TSS_FINALMERGE:
2469                         *sortMethod = "external merge";
2470                         break;
2471                 default:
2472                         *sortMethod = "still in progress";
2473                         break;
2474         }
2475 }
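
/*
 * A worked example of the spaceUsed arithmetic above (illustrative numbers
 * only): with the usual BLCKSZ of 8192, each tape block counts for 8 kB, so
 * a tape set of 1250 blocks is reported as 10000 kB of disk space.  For an
 * in-memory sort with allowedMem = 4194304 and availMem = 1048576 left
 * over, (4194304 - 1048576 + 1023) / 1024 gives 3072 kB; the + 1023 rounds
 * any partial kilobyte upward.
 */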
2476
2477
2478 /*
2479  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
2480  *
2481  * Compare two SortTuples.      If checkIndex is true, use the tuple index
2482  * as the front of the sort key; otherwise, compare on the data keys only.
2483  */
2484
2485 #define HEAPCOMPARE(tup1,tup2) \
2486         (checkIndex && ((tup1)->tupindex != (tup2)->tupindex) ? \
2487          ((tup1)->tupindex) - ((tup2)->tupindex) : \
2488          COMPARETUP(state, tup1, tup2))
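
/*
 * For example, with checkIndex true a tuple assigned to run 2 compares
 * greater than any tuple assigned to run 1, no matter how their data keys
 * compare; only between tuples of the same run does COMPARETUP() decide.
 * This is what makes the heap emit runs in strictly increasing run-number
 * order during run formation.
 */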
2489
2490 /*
2491  * Convert the existing unordered array of SortTuples to a bounded heap,
2492  * discarding all but the smallest "state->bound" tuples.
2493  *
2494  * When working with a bounded heap, we want to keep the largest entry
2495  * at the root (array entry zero), instead of the smallest as in the normal
2496  * sort case.  This allows us to discard the largest entry cheaply.
2497  * Therefore, we temporarily reverse the sort direction.
2498  *
2499  * We assume that all entries in a bounded heap will always have tupindex
2500  * zero; it therefore doesn't matter that HEAPCOMPARE() doesn't reverse
2501  * the direction of comparison for tupindexes.
2502  */
2503 static void
2504 make_bounded_heap(Tuplesortstate *state)
2505 {
2506         int                     tupcount = state->memtupcount;
2507         int                     i;
2508
2509         Assert(state->status == TSS_INITIAL);
2510         Assert(state->bounded);
2511         Assert(tupcount >= state->bound);
2512
2513         /* Reverse sort direction so largest entry will be at root */
2514         REVERSEDIRECTION(state);
2515
2516         state->memtupcount = 0;         /* make the heap empty */
2517         for (i = 0; i < tupcount; i++)
2518         {
2519                 if (state->memtupcount >= state->bound &&
2520                   COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
2521                 {
2522                         /* New tuple would just get thrown out, so skip it */
2523                         free_sort_tuple(state, &state->memtuples[i]);
2524                         CHECK_FOR_INTERRUPTS();
2525                 }
2526                 else
2527                 {
2528                         /* Insert next tuple into heap */
2529                         /* Must copy source tuple to avoid possible overwrite */
2530                         SortTuple       stup = state->memtuples[i];
2531
2532                         tuplesort_heap_insert(state, &stup, 0, false);
2533
2534                         /* If heap too full, discard largest entry */
2535                         if (state->memtupcount > state->bound)
2536                         {
2537                                 free_sort_tuple(state, &state->memtuples[0]);
2538                                 tuplesort_heap_siftup(state, false);
2539                         }
2540                 }
2541         }
2542
2543         Assert(state->memtupcount == state->bound);
2544         state->status = TSS_BOUNDED;
2545 }
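
/*
 * A small worked example of the bounded-heap behaviour above (illustrative
 * values only): with bound = 3 and input values 5, 1, 4, 9, 2, the heap
 * absorbs 5, 1 and 4; 9 is discarded outright because it cannot be among
 * the 3 smallest; 2 is inserted, pushing the heap over the bound, so the
 * current largest entry (5) is freed and sifted out.  The bounded heap ends
 * up holding {1, 2, 4}, with the largest survivor, 4, at the root.
 */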
2546
2547 /*
2548  * Convert the bounded heap to a properly-sorted array
2549  */
2550 static void
2551 sort_bounded_heap(Tuplesortstate *state)
2552 {
2553         int                     tupcount = state->memtupcount;
2554
2555         Assert(state->status == TSS_BOUNDED);
2556         Assert(state->bounded);
2557         Assert(tupcount == state->bound);
2558
2559         /*
2560          * We can unheapify in place because each sift-up will remove the largest
2561          * entry, which we can promptly store in the newly freed slot at the end.
2562          * Once we're down to a single-entry heap, we're done.
2563          */
2564         while (state->memtupcount > 1)
2565         {
2566                 SortTuple       stup = state->memtuples[0];
2567
2568                 /* this sifts-up the next-largest entry and decreases memtupcount */
2569                 tuplesort_heap_siftup(state, false);
2570                 state->memtuples[state->memtupcount] = stup;
2571         }
2572         state->memtupcount = tupcount;
2573
2574         /*
2575          * Reverse sort direction back to the original state.  This is not
2576          * actually necessary but seems like a good idea for tidiness.
2577          */
2578         REVERSEDIRECTION(state);
2579
2580         state->status = TSS_SORTEDINMEM;
2581         state->boundUsed = true;
2582 }
2583
2584 /*
2585  * Insert a new tuple into an empty or existing heap, maintaining the
2586  * heap invariant.      Caller is responsible for ensuring there's room.
2587  *
2588  * Note: we assume *tuple is a temporary variable that can be scribbled on.
2589  * For some callers, tuple actually points to a memtuples[] entry above the
2590  * end of the heap.  This is safe as long as it's not immediately adjacent
2591  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
2592  * is, it might get overwritten before being moved into the heap!
2593  */
2594 static void
2595 tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
2596                                           int tupleindex, bool checkIndex)
2597 {
2598         SortTuple  *memtuples;
2599         int                     j;
2600
2601         /*
2602          * Save the tupleindex --- see notes above about writing on *tuple. It's a
2603          * historical artifact that tupleindex is passed as a separate argument
2604          * and not in *tuple, but it's notationally convenient so let's leave it
2605          * that way.
2606          */
2607         tuple->tupindex = tupleindex;
2608
2609         memtuples = state->memtuples;
2610         Assert(state->memtupcount < state->memtupsize);
2611
2612         CHECK_FOR_INTERRUPTS();
2613
2614         /*
2615          * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
2616          * using 1-based array indexes, not 0-based.
2617          */
2618         j = state->memtupcount++;
2619         while (j > 0)
2620         {
2621                 int                     i = (j - 1) >> 1;
2622
2623                 if (HEAPCOMPARE(tuple, &memtuples[i]) >= 0)
2624                         break;
2625                 memtuples[j] = memtuples[i];
2626                 j = i;
2627         }
2628         memtuples[j] = *tuple;
2629 }
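
/*
 * Concretely (illustrative values, checkIndex false): inserting 3 into the
 * min-heap [1, 4, 7, 9] starts with j = 4; the parent slot (4 - 1) >> 1 = 1
 * holds 4, which is larger than 3, so 4 moves down into slot 4; the next
 * parent slot 0 holds 1, which is not larger, so the loop stops and 3 lands
 * in slot 1, giving [1, 3, 7, 9, 4].
 */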
2630
2631 /*
2632  * The tuple at state->memtuples[0] has been removed from the heap.
2633  * Decrement memtupcount, and sift up to maintain the heap invariant.
2634  */
2635 static void
2636 tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
2637 {
2638         SortTuple  *memtuples = state->memtuples;
2639         SortTuple  *tuple;
2640         int                     i,
2641                                 n;
2642
2643         if (--state->memtupcount <= 0)
2644                 return;
2645
2646         CHECK_FOR_INTERRUPTS();
2647
2648         n = state->memtupcount;
2649         tuple = &memtuples[n];          /* tuple that must be reinserted */
2650         i = 0;                                          /* i is where the "hole" is */
2651         for (;;)
2652         {
2653                 int                     j = 2 * i + 1;
2654
2655                 if (j >= n)
2656                         break;
2657                 if (j + 1 < n &&
2658                         HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0)
2659                         j++;
2660                 if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0)
2661                         break;
2662                 memtuples[i] = memtuples[j];
2663                 i = j;
2664         }
2665         memtuples[i] = *tuple;
2666 }
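
/*
 * Continuing the example from tuplesort_heap_insert() above: removing the
 * root of [1, 3, 7, 9, 4] leaves memtupcount = 4 and the displaced last
 * entry 4 to be re-placed.  At the hole in slot 0 the smaller child is 3
 * (slot 1), and 4 > 3, so 3 moves up; at slot 1 the only in-range child is
 * 9 (slot 3), and 4 <= 9, so the loop stops and 4 lands in slot 1, giving
 * the valid min-heap [3, 4, 7, 9].
 */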
2667
2668
2669 /*
2670  * Tape interface routines
2671  */
2672
2673 static unsigned int
2674 getlen(Tuplesortstate *state, int tapenum, bool eofOK)
2675 {
2676         unsigned int len;
2677
2678         if (LogicalTapeRead(state->tapeset, tapenum,
2679                                                 &len, sizeof(len)) != sizeof(len))
2680                 elog(ERROR, "unexpected end of tape");
2681         if (len == 0 && !eofOK)
2682                 elog(ERROR, "unexpected end of data");
2683         return len;
2684 }
2685
2686 static void
2687 markrunend(Tuplesortstate *state, int tapenum)
2688 {
2689         unsigned int len = 0;
2690
2691         LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
2692 }
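
/*
 * Taken together with the writetup_* routines below, the on-tape layout of
 * a run is a sequence of length-prefixed records terminated by a zero
 * length word:
 *
 *              [len1][tuple 1 body] [len2][tuple 2 body] ... [0]
 *
 * and when randomAccess is true each record also carries a trailing copy of
 * its length word so the tape can be read backwards:
 *
 *              [len1][tuple 1 body][len1] ... [0]
 *
 * Each length word counts itself plus the body (but not the trailing copy).
 */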
2693
2694
2695 /*
2696  * Inline-able copy of FunctionCall2Coll() to save some cycles in sorting.
2697  */
2698 static inline Datum
2699 myFunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
2700 {
2701         FunctionCallInfoData fcinfo;
2702         Datum           result;
2703
2704         InitFunctionCallInfoData(fcinfo, flinfo, 2, collation, NULL, NULL);
2705
2706         fcinfo.arg[0] = arg1;
2707         fcinfo.arg[1] = arg2;
2708         fcinfo.argnull[0] = false;
2709         fcinfo.argnull[1] = false;
2710
2711         result = FunctionCallInvoke(&fcinfo);
2712
2713         /* Check for null result, since caller is clearly not expecting one */
2714         if (fcinfo.isnull)
2715                 elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
2716
2717         return result;
2718 }
2719
2720 /*
2721  * Apply a sort function (by now converted to fmgr lookup form)
2722  * and return a 3-way comparison result.  This takes care of handling
2723  * reverse-sort and NULLs-ordering properly.  We assume that DESC and
2724  * NULLS_FIRST options are encoded in sk_flags the same way btree does it.
2725  */
2726 static inline int32
2727 inlineApplySortFunction(FmgrInfo *sortFunction, int sk_flags, Oid collation,
2728                                                 Datum datum1, bool isNull1,
2729                                                 Datum datum2, bool isNull2)
2730 {
2731         int32           compare;
2732
2733         if (isNull1)
2734         {
2735                 if (isNull2)
2736                         compare = 0;            /* NULL "=" NULL */
2737                 else if (sk_flags & SK_BT_NULLS_FIRST)
2738                         compare = -1;           /* NULL "<" NOT_NULL */
2739                 else
2740                         compare = 1;            /* NULL ">" NOT_NULL */
2741         }
2742         else if (isNull2)
2743         {
2744                 if (sk_flags & SK_BT_NULLS_FIRST)
2745                         compare = 1;            /* NOT_NULL ">" NULL */
2746                 else
2747                         compare = -1;           /* NOT_NULL "<" NULL */
2748         }
2749         else
2750         {
2751                 compare = DatumGetInt32(myFunctionCall2Coll(sortFunction, collation,
2752                                                                                                         datum1, datum2));
2753
2754                 if (sk_flags & SK_BT_DESC)
2755                         compare = -compare;
2756         }
2757
2758         return compare;
2759 }
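
/*
 * In tabular form, the result of inlineApplySortFunction() for the NULL
 * cases (the non-NULL case is just the datatype comparator, negated under
 * SK_BT_DESC):
 *
 *              datum1          datum2          NULLS FIRST     NULLS LAST
 *              NULL            NULL            0                       0
 *              NULL            not null        -1                      1
 *              not null        NULL            1                       -1
 */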
2760
2761
2762 /*
2763  * Routines specialized for HeapTuple (actually MinimalTuple) case
2764  */
2765
2766 static int
2767 comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
2768 {
2769         SortSupport sortKey = state->sortKeys;
2770         HeapTupleData ltup;
2771         HeapTupleData rtup;
2772         TupleDesc       tupDesc;
2773         int                     nkey;
2774         int32           compare;
2775
2776         /* Compare the leading sort key */
2777         compare = ApplySortComparator(a->datum1, a->isnull1,
2778                                                                   b->datum1, b->isnull1,
2779                                                                   sortKey);
2780         if (compare != 0)
2781                 return compare;
2782
2783         /* Compare additional sort keys */
2784         ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
2785         ltup.t_data = (HeapTupleHeader) ((char *) a->tuple - MINIMAL_TUPLE_OFFSET);
2786         rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
2787         rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET);
2788         tupDesc = state->tupDesc;
2789         sortKey++;
2790         for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++)
2791         {
2792                 AttrNumber      attno = sortKey->ssup_attno;
2793                 Datum           datum1,
2794                                         datum2;
2795                 bool            isnull1,
2796                                         isnull2;
2797
2798                 datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
2799                 datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
2800
2801                 compare = ApplySortComparator(datum1, isnull1,
2802                                                                           datum2, isnull2,
2803                                                                           sortKey);
2804                 if (compare != 0)
2805                         return compare;
2806         }
2807
2808         return 0;
2809 }
2810
2811 static void
2812 copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
2813 {
2814         /*
2815          * We expect the passed "tup" to be a TupleTableSlot, and form a
2816          * MinimalTuple using the exported interface for that.
2817          */
2818         TupleTableSlot *slot = (TupleTableSlot *) tup;
2819         MinimalTuple tuple;
2820         HeapTupleData htup;
2821
2822         /* copy the tuple into sort storage */
2823         tuple = ExecCopySlotMinimalTuple(slot);
2824         stup->tuple = (void *) tuple;
2825         USEMEM(state, GetMemoryChunkSpace(tuple));
2826         /* set up first-column key value */
2827         htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
2828         htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
2829         stup->datum1 = heap_getattr(&htup,
2830                                                                 state->sortKeys[0].ssup_attno,
2831                                                                 state->tupDesc,
2832                                                                 &stup->isnull1);
2833 }
2834
2835 static void
2836 writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
2837 {
2838         MinimalTuple tuple = (MinimalTuple) stup->tuple;
2839
2840         /* the part of the MinimalTuple we'll write: */
2841         char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
2842         unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
2843
2844         /* total on-disk footprint: */
2845         unsigned int tuplen = tupbodylen + sizeof(int);
2846
2847         LogicalTapeWrite(state->tapeset, tapenum,
2848                                          (void *) &tuplen, sizeof(tuplen));
2849         LogicalTapeWrite(state->tapeset, tapenum,
2850                                          (void *) tupbody, tupbodylen);
2851         if (state->randomAccess)        /* need trailing length word? */
2852                 LogicalTapeWrite(state->tapeset, tapenum,
2853                                                  (void *) &tuplen, sizeof(tuplen));
2854
2855         FREEMEM(state, GetMemoryChunkSpace(tuple));
2856         heap_free_minimal_tuple(tuple);
2857 }
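
/*
 * For instance (illustrative numbers, assuming a 4-byte unsigned int): a
 * MinimalTuple whose body past MINIMAL_TUPLE_DATA_OFFSET is 32 bytes is
 * written as a length word of 36 followed by those 32 bytes, plus another
 * 4-byte copy of 36 at the end if randomAccess is true; readtup_heap()
 * below recovers tupbodylen as 36 - 4 = 32.
 */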
2858
2859 static void
2860 readtup_heap(Tuplesortstate *state, SortTuple *stup,
2861                          int tapenum, unsigned int len)
2862 {
2863         unsigned int tupbodylen = len - sizeof(int);
2864         unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
2865         MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
2866         char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
2867         HeapTupleData htup;
2868
2869         USEMEM(state, GetMemoryChunkSpace(tuple));
2870         /* read in the tuple proper */
2871         tuple->t_len = tuplen;
2872         LogicalTapeReadExact(state->tapeset, tapenum,
2873                                                  tupbody, tupbodylen);
2874         if (state->randomAccess)        /* need trailing length word? */
2875                 LogicalTapeReadExact(state->tapeset, tapenum,
2876                                                          &tuplen, sizeof(tuplen));
2877         stup->tuple = (void *) tuple;
2878         /* set up first-column key value */
2879         htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
2880         htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
2881         stup->datum1 = heap_getattr(&htup,
2882                                                                 state->sortKeys[0].ssup_attno,
2883                                                                 state->tupDesc,
2884                                                                 &stup->isnull1);
2885 }
2886
2887 static void
2888 reversedirection_heap(Tuplesortstate *state)
2889 {
2890         SortSupport sortKey = state->sortKeys;
2891         int                     nkey;
2892
2893         for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
2894         {
2895                 sortKey->ssup_reverse = !sortKey->ssup_reverse;
2896                 sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
2897         }
2898 }
2899
2900
2901 /*
2902  * Routines specialized for the CLUSTER case (HeapTuple data, with
2903  * comparisons per a btree index definition)
2904  */
2905
2906 static int
2907 comparetup_cluster(const SortTuple *a, const SortTuple *b,
2908                                    Tuplesortstate *state)
2909 {
2910         ScanKey         scanKey = state->indexScanKey;
2911         HeapTuple       ltup;
2912         HeapTuple       rtup;
2913         TupleDesc       tupDesc;
2914         int                     nkey;
2915         int32           compare;
2916
2917         /* Compare the leading sort key, if it's simple */
2918         if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
2919         {
2920                 compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
2921                                                                                   scanKey->sk_collation,
2922                                                                                   a->datum1, a->isnull1,
2923                                                                                   b->datum1, b->isnull1);
2924                 if (compare != 0 || state->nKeys == 1)
2925                         return compare;
2926                 /* Compare additional columns the hard way */
2927                 scanKey++;
2928                 nkey = 1;
2929         }
2930         else
2931         {
2932                 /* Must compare all keys the hard way */
2933                 nkey = 0;
2934         }
2935
2936         /* Compare additional sort keys */
2937         ltup = (HeapTuple) a->tuple;
2938         rtup = (HeapTuple) b->tuple;
2939
2940         if (state->indexInfo->ii_Expressions == NULL)
2941         {
2942                 /* If not expression index, just compare the proper heap attrs */
2943                 tupDesc = state->tupDesc;
2944
2945                 for (; nkey < state->nKeys; nkey++, scanKey++)
2946                 {
2947                         AttrNumber      attno = state->indexInfo->ii_KeyAttrNumbers[nkey];
2948                         Datum           datum1,
2949                                                 datum2;
2950                         bool            isnull1,
2951                                                 isnull2;
2952
2953                         datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
2954                         datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
2955
2956                         compare = inlineApplySortFunction(&scanKey->sk_func,
2957                                                                                           scanKey->sk_flags,
2958                                                                                           scanKey->sk_collation,
2959                                                                                           datum1, isnull1,
2960                                                                                           datum2, isnull2);
2961                         if (compare != 0)
2962                                 return compare;
2963                 }
2964         }
2965         else
2966         {
2967                 /*
2968                  * In the expression index case, compute the whole index tuple and
2969                  * then compare values.  It would perhaps be faster to compute only as
2970                  * many columns as we need to compare, but that would require
2971                  * duplicating all the logic in FormIndexDatum.
2972                  */
2973                 Datum           l_index_values[INDEX_MAX_KEYS];
2974                 bool            l_index_isnull[INDEX_MAX_KEYS];
2975                 Datum           r_index_values[INDEX_MAX_KEYS];
2976                 bool            r_index_isnull[INDEX_MAX_KEYS];
2977                 TupleTableSlot *ecxt_scantuple;
2978
2979                 /* Reset context each time to prevent memory leakage */
2980                 ResetPerTupleExprContext(state->estate);
2981
2982                 ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple;
2983
2984                 ExecStoreTuple(ltup, ecxt_scantuple, InvalidBuffer, false);
2985                 FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
2986                                            l_index_values, l_index_isnull);
2987
2988                 ExecStoreTuple(rtup, ecxt_scantuple, InvalidBuffer, false);
2989                 FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
2990                                            r_index_values, r_index_isnull);
2991
2992                 for (; nkey < state->nKeys; nkey++, scanKey++)
2993                 {
2994                         compare = inlineApplySortFunction(&scanKey->sk_func,
2995                                                                                           scanKey->sk_flags,
2996                                                                                           scanKey->sk_collation,
2997                                                                                           l_index_values[nkey],
2998                                                                                           l_index_isnull[nkey],
2999                                                                                           r_index_values[nkey],
3000                                                                                           r_index_isnull[nkey]);
3001                         if (compare != 0)
3002                                 return compare;
3003                 }
3004         }
3005
3006         return 0;
3007 }
3008
3009 static void
3010 copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
3011 {
3012         HeapTuple       tuple = (HeapTuple) tup;
3013
3014         /* copy the tuple into sort storage */
3015         tuple = heap_copytuple(tuple);
3016         stup->tuple = (void *) tuple;
3017         USEMEM(state, GetMemoryChunkSpace(tuple));
3018         /* set up first-column key value, if it's a simple column */
3019         if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
3020                 stup->datum1 = heap_getattr(tuple,
3021                                                                         state->indexInfo->ii_KeyAttrNumbers[0],
3022                                                                         state->tupDesc,
3023                                                                         &stup->isnull1);
3024 }
3025
3026 static void
3027 writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
3028 {
3029         HeapTuple       tuple = (HeapTuple) stup->tuple;
3030         unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int);
3031
3032         /* We need to store t_self, but not other fields of HeapTupleData */
3033         LogicalTapeWrite(state->tapeset, tapenum,
3034                                          &tuplen, sizeof(tuplen));
3035         LogicalTapeWrite(state->tapeset, tapenum,
3036                                          &tuple->t_self, sizeof(ItemPointerData));
3037         LogicalTapeWrite(state->tapeset, tapenum,
3038                                          tuple->t_data, tuple->t_len);
3039         if (state->randomAccess)        /* need trailing length word? */
3040                 LogicalTapeWrite(state->tapeset, tapenum,
3041                                                  &tuplen, sizeof(tuplen));
3042
3043         FREEMEM(state, GetMemoryChunkSpace(tuple));
3044         heap_freetuple(tuple);
3045 }
3046
3047 static void
3048 readtup_cluster(Tuplesortstate *state, SortTuple *stup,
3049                                 int tapenum, unsigned int tuplen)
3050 {
3051         unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
3052         HeapTuple       tuple = (HeapTuple) palloc(t_len + HEAPTUPLESIZE);
3053
3054         USEMEM(state, GetMemoryChunkSpace(tuple));
3055         /* Reconstruct the HeapTupleData header */
3056         tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
3057         tuple->t_len = t_len;
3058         LogicalTapeReadExact(state->tapeset, tapenum,
3059                                                  &tuple->t_self, sizeof(ItemPointerData));
3060         /* We don't currently bother to reconstruct t_tableOid */
3061         tuple->t_tableOid = InvalidOid;
3062         /* Read in the tuple body */
3063         LogicalTapeReadExact(state->tapeset, tapenum,
3064                                                  tuple->t_data, tuple->t_len);
3065         if (state->randomAccess)        /* need trailing length word? */
3066                 LogicalTapeReadExact(state->tapeset, tapenum,
3067                                                          &tuplen, sizeof(tuplen));
3068         stup->tuple = (void *) tuple;
3069         /* set up first-column key value, if it's a simple column */
3070         if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
3071                 stup->datum1 = heap_getattr(tuple,
3072                                                                         state->indexInfo->ii_KeyAttrNumbers[0],
3073                                                                         state->tupDesc,
3074                                                                         &stup->isnull1);
3075 }
3076
3077
3078 /*
3079  * Routines specialized for IndexTuple case
3080  *
3081  * The btree and hash cases require separate comparison functions, but the
3082  * IndexTuple representation is the same so the copy/write/read support
3083  * functions can be shared.
3084  */
3085
3086 static int
3087 comparetup_index_btree(const SortTuple *a, const SortTuple *b,
3088                                            Tuplesortstate *state)
3089 {
3090         /*
3091          * This is similar to _bt_tuplecompare(), but we have already done the
3092          * index_getattr calls for the first column, and we need to keep track of
3093          * whether any null fields are present.  Also see the special treatment
3094          * for equal keys at the end.
3095          */
3096         ScanKey         scanKey = state->indexScanKey;
3097         IndexTuple      tuple1;
3098         IndexTuple      tuple2;
3099         int                     keysz;
3100         TupleDesc       tupDes;
3101         bool            equal_hasnull = false;
3102         int                     nkey;
3103         int32           compare;
3104
3105         /* Compare the leading sort key */
3106         compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
3107                                                                           scanKey->sk_collation,
3108                                                                           a->datum1, a->isnull1,
3109                                                                           b->datum1, b->isnull1);
3110         if (compare != 0)
3111                 return compare;
3112
3113         /* they are equal, so we only need to examine one null flag */
3114         if (a->isnull1)
3115                 equal_hasnull = true;
3116
3117         /* Compare additional sort keys */
3118         tuple1 = (IndexTuple) a->tuple;
3119         tuple2 = (IndexTuple) b->tuple;
3120         keysz = state->nKeys;
3121         tupDes = RelationGetDescr(state->indexRel);
3122         scanKey++;
3123         for (nkey = 2; nkey <= keysz; nkey++, scanKey++)
3124         {
3125                 Datum           datum1,
3126                                         datum2;
3127                 bool            isnull1,
3128                                         isnull2;
3129
3130                 datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
3131                 datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
3132
3133                 compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
3134                                                                                   scanKey->sk_collation,
3135                                                                                   datum1, isnull1,
3136                                                                                   datum2, isnull2);
3137                 if (compare != 0)
3138                         return compare;         /* done when we find unequal attributes */
3139
3140                 /* they are equal, so we only need to examine one null flag */
3141                 if (isnull1)
3142                         equal_hasnull = true;
3143         }
3144
3145         /*
3146          * If btree has asked us to enforce uniqueness, complain if two equal
3147          * tuples are detected (unless there was at least one NULL field).
3148          *
3149          * It is sufficient to make the test here, because if two tuples are equal
3150          * they *must* get compared at some stage of the sort --- otherwise the
3151          * sort algorithm wouldn't have checked whether one must appear before the
3152          * other.
3153          */
3154         if (state->enforceUnique && !equal_hasnull)
3155         {
3156                 Datum           values[INDEX_MAX_KEYS];
3157                 bool            isnull[INDEX_MAX_KEYS];
3158
3159                 /*
3160                  * Some rather brain-dead implementations of qsort (such as the one in
3161                  * QNX 4) will sometimes call the comparison routine to compare a
3162                  * value to itself, but we always use our own implementation, which
3163                  * does not.
3164                  */
3165                 Assert(tuple1 != tuple2);
3166
3167                 index_deform_tuple(tuple1, tupDes, values, isnull);
3168                 ereport(ERROR,
3169                                 (errcode(ERRCODE_UNIQUE_VIOLATION),
3170                                  errmsg("could not create unique index \"%s\"",
3171                                                 RelationGetRelationName(state->indexRel)),
3172                                  errdetail("Key %s is duplicated.",
3173                                                    BuildIndexValueDescription(state->indexRel,
3174                                                                                                           values, isnull))));
3175         }
3176
3177         /*
3178          * If key values are equal, we sort on ItemPointer.  This does not affect
3179          * validity of the finished index, but it may be useful to have index
3180          * scans in physical order.
3181          */
3182         {
3183                 BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
3184                 BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
3185
3186                 if (blk1 != blk2)
3187                         return (blk1 < blk2) ? -1 : 1;
3188         }
3189         {
3190                 OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
3191                 OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
3192
3193                 if (pos1 != pos2)
3194                         return (pos1 < pos2) ? -1 : 1;
3195         }
3196
3197         return 0;
3198 }
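
/*
 * For example, two index entries with fully equal key values but TIDs
 * (block 3, offset 7) and (block 3, offset 9) compare as -1 here: the block
 * numbers match, so the smaller offset number wins, and the finished index
 * keeps equal keys in physical heap order.
 */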
3199
3200 static int
3201 comparetup_index_hash(const SortTuple *a, const SortTuple *b,
3202                                           Tuplesortstate *state)
3203 {
3204         uint32          hash1;
3205         uint32          hash2;
3206         IndexTuple      tuple1;
3207         IndexTuple      tuple2;
3208
3209         /*
3210          * Fetch hash keys and mask off bits we don't want to sort by. We know
3211          * that the first column of the index tuple is the hash key.
3212          */
3213         Assert(!a->isnull1);
3214         hash1 = DatumGetUInt32(a->datum1) & state->hash_mask;
3215         Assert(!b->isnull1);
3216         hash2 = DatumGetUInt32(b->datum1) & state->hash_mask;
3217
3218         if (hash1 > hash2)
3219                 return 1;
3220         else if (hash1 < hash2)
3221                 return -1;
3222
3223         /*
3224          * If hash values are equal, we sort on ItemPointer.  This does not affect
3225          * validity of the finished index, but it may be useful to have index
3226          * scans in physical order.
3227          */
3228         tuple1 = (IndexTuple) a->tuple;
3229         tuple2 = (IndexTuple) b->tuple;
3230
3231         {
3232                 BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
3233                 BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
3234
3235                 if (blk1 != blk2)
3236                         return (blk1 < blk2) ? -1 : 1;
3237         }
3238         {
3239                 OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
3240                 OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
3241
3242                 if (pos1 != pos2)
3243                         return (pos1 < pos2) ? -1 : 1;
3244         }
3245
3246         return 0;
3247 }
3248
3249 static void
3250 copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
3251 {
3252         IndexTuple      tuple = (IndexTuple) tup;
3253         unsigned int tuplen = IndexTupleSize(tuple);
3254         IndexTuple      newtuple;
3255
3256         /* copy the tuple into sort storage */
3257         newtuple = (IndexTuple) palloc(tuplen);
3258         memcpy(newtuple, tuple, tuplen);
3259         USEMEM(state, GetMemoryChunkSpace(newtuple));
3260         stup->tuple = (void *) newtuple;
3261         /* set up first-column key value */
3262         stup->datum1 = index_getattr(newtuple,
3263                                                                  1,
3264                                                                  RelationGetDescr(state->indexRel),
3265                                                                  &stup->isnull1);
3266 }
3267
3268 static void
3269 writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
3270 {
3271         IndexTuple      tuple = (IndexTuple) stup->tuple;
3272         unsigned int tuplen;
3273
3274         tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
3275         LogicalTapeWrite(state->tapeset, tapenum,
3276                                          (void *) &tuplen, sizeof(tuplen));
3277         LogicalTapeWrite(state->tapeset, tapenum,
3278                                          (void *) tuple, IndexTupleSize(tuple));
3279         if (state->randomAccess)        /* need trailing length word? */
3280                 LogicalTapeWrite(state->tapeset, tapenum,
3281                                                  (void *) &tuplen, sizeof(tuplen));
3282
3283         FREEMEM(state, GetMemoryChunkSpace(tuple));
3284         pfree(tuple);
3285 }
3286
3287 static void
3288 readtup_index(Tuplesortstate *state, SortTuple *stup,
3289                           int tapenum, unsigned int len)
3290 {
3291         unsigned int tuplen = len - sizeof(unsigned int);
3292         IndexTuple      tuple = (IndexTuple) palloc(tuplen);
3293
3294         USEMEM(state, GetMemoryChunkSpace(tuple));
3295         LogicalTapeReadExact(state->tapeset, tapenum,
3296                                                  tuple, tuplen);
3297         if (state->randomAccess)        /* need trailing length word? */
3298                 LogicalTapeReadExact(state->tapeset, tapenum,
3299                                                          &tuplen, sizeof(tuplen));
3300         stup->tuple = (void *) tuple;
3301         /* set up first-column key value */
3302         stup->datum1 = index_getattr(tuple,
3303                                                                  1,
3304                                                                  RelationGetDescr(state->indexRel),
3305                                                                  &stup->isnull1);
3306 }
3307
3308 static void
3309 reversedirection_index_btree(Tuplesortstate *state)
3310 {
3311         ScanKey         scanKey = state->indexScanKey;
3312         int                     nkey;
3313
3314         for (nkey = 0; nkey < state->nKeys; nkey++, scanKey++)
3315         {
3316                 scanKey->sk_flags ^= (SK_BT_DESC | SK_BT_NULLS_FIRST);
3317         }
3318 }
3319
3320 static void
3321 reversedirection_index_hash(Tuplesortstate *state)
3322 {
3323         /* We don't support reversing direction in a hash index sort */
3324         elog(ERROR, "reversedirection_index_hash is not implemented");
3325 }
3326
3327
3328 /*
3329  * Routines specialized for DatumTuple case
3330  */
3331
3332 static int
3333 comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
3334 {
3335         return ApplySortComparator(a->datum1, a->isnull1,
3336                                                            b->datum1, b->isnull1,
3337                                                            state->onlyKey);
3338 }
3339
3340 static void
3341 copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
3342 {
3343         /* Not currently needed */
3344         elog(ERROR, "copytup_datum() should not be called");
3345 }
3346
3347 static void
3348 writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
3349 {
3350         void       *waddr;
3351         unsigned int tuplen;
3352         unsigned int writtenlen;
3353
3354         if (stup->isnull1)
3355         {
3356                 waddr = NULL;
3357                 tuplen = 0;
3358         }
3359         else if (state->datumTypeByVal)
3360         {
3361                 waddr = &stup->datum1;
3362                 tuplen = sizeof(Datum);
3363         }
3364         else
3365         {
3366                 waddr = DatumGetPointer(stup->datum1);
3367                 tuplen = datumGetSize(stup->datum1, false, state->datumTypeLen);
3368                 Assert(tuplen != 0);
3369         }
3370
3371         writtenlen = tuplen + sizeof(unsigned int);
3372
3373         LogicalTapeWrite(state->tapeset, tapenum,
3374                                          (void *) &writtenlen, sizeof(writtenlen));
3375         LogicalTapeWrite(state->tapeset, tapenum,
3376                                          waddr, tuplen);
3377         if (state->randomAccess)        /* need trailing length word? */
3378                 LogicalTapeWrite(state->tapeset, tapenum,
3379                                                  (void *) &writtenlen, sizeof(writtenlen));
3380
3381         if (stup->tuple)
3382         {
3383                 FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
3384                 pfree(stup->tuple);
3385         }
3386 }
3387
3388 static void
3389 readtup_datum(Tuplesortstate *state, SortTuple *stup,
3390                           int tapenum, unsigned int len)
3391 {
3392         unsigned int tuplen = len - sizeof(unsigned int);
3393
3394         if (tuplen == 0)
3395         {
3396                 /* it's NULL */
3397                 stup->datum1 = (Datum) 0;
3398                 stup->isnull1 = true;
3399                 stup->tuple = NULL;
3400         }
3401         else if (state->datumTypeByVal)
3402         {
3403                 Assert(tuplen == sizeof(Datum));
3404                 LogicalTapeReadExact(state->tapeset, tapenum,
3405                                                          &stup->datum1, tuplen);
3406                 stup->isnull1 = false;
3407                 stup->tuple = NULL;
3408         }
3409         else
3410         {
3411                 void       *raddr = palloc(tuplen);
3412
3413                 LogicalTapeReadExact(state->tapeset, tapenum,
3414                                                          raddr, tuplen);
3415                 stup->datum1 = PointerGetDatum(raddr);
3416                 stup->isnull1 = false;
3417                 stup->tuple = raddr;
3418                 USEMEM(state, GetMemoryChunkSpace(raddr));
3419         }
3420
3421         if (state->randomAccess)        /* need trailing length word? */
3422                 LogicalTapeReadExact(state->tapeset, tapenum,
3423                                                          &tuplen, sizeof(tuplen));
3424 }
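
/*
 * To summarize the datum on-tape encoding used by writetup_datum() and
 * readtup_datum() above: a NULL is just a length word of
 * sizeof(unsigned int) with no body; a pass-by-value datum is a length word
 * followed by sizeof(Datum) bytes; a pass-by-reference datum is a length
 * word followed by datumGetSize() bytes of the value itself.
 */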
3425
3426 static void
3427 reversedirection_datum(Tuplesortstate *state)
3428 {
3429         state->onlyKey->ssup_reverse = !state->onlyKey->ssup_reverse;
3430         state->onlyKey->ssup_nulls_first = !state->onlyKey->ssup_nulls_first;
3431 }
3432
3433 /*
3434  * Convenience routine to free a tuple previously loaded into sort memory
3435  */
3436 static void
3437 free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
3438 {
3439         FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
3440         pfree(stup->tuple);
3441 }