1 /*-------------------------------------------------------------------------
2  *
3  * tuplesort.c
4  *        Generalized tuple sorting routines.
5  *
6  * This module handles sorting of heap tuples, index tuples, or single
7  * Datums (and could easily support other kinds of sortable objects,
8  * if necessary).  It works efficiently for both small and large amounts
9  * of data.  Small amounts are sorted in-memory using qsort().  Large
10  * amounts are sorted using temporary files and a standard external sort
11  * algorithm.
12  *
13  * See Knuth, volume 3, for more than you want to know about the external
14  * sorting algorithm.  We divide the input into sorted runs using replacement
15  * selection, in the form of a priority tree implemented as a heap
16  * (essentially his Algorithm 5.2.3H), then merge the runs using polyphase
17  * merge, Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D
18  * are implemented by logtape.c, which avoids space wastage by recycling
19  * disk space as soon as each block is read from its "tape".
20  *
21  * We do not form the initial runs using Knuth's recommended replacement
22  * selection data structure (Algorithm 5.4.1R), because it uses a fixed
23  * number of records in memory at all times.  Since we are dealing with
24  * tuples that may vary considerably in size, we want to be able to vary
25  * the number of records kept in memory to ensure full utilization of the
26  * allowed sort memory space.  So, we keep the tuples in a variable-size
27  * heap, with the next record to go out at the top of the heap.  Like
28  * Algorithm 5.4.1R, each record is stored with the run number that it
29  * must go into, and we use (run number, key) as the ordering key for the
30  * heap.  When the run number at the top of the heap changes, we know that
31  * no more records of the prior run are left in the heap.
32  *
33  * The approximate amount of memory allowed for any one sort operation
34  * is specified in kilobytes by the caller (most pass work_mem).  Initially,
35  * we absorb tuples and simply store them in an unsorted array as long as
36  * we haven't exceeded workMem.  If we reach the end of the input without
37  * exceeding workMem, we sort the array using qsort() and subsequently return
38  * tuples just by scanning the tuple array sequentially.  If we do exceed
39  * workMem, we construct a heap using Algorithm H and begin to emit tuples
40  * into sorted runs in temporary tapes, emitting just enough tuples at each
41  * step to get back within the workMem limit.  Whenever the run number at
42  * the top of the heap changes, we begin a new run with a new output tape
43  * (selected per Algorithm D).  After the end of the input is reached,
44  * we dump out remaining tuples in memory into a final run (or two),
45  * then merge the runs using Algorithm D.
46  *
47  * When merging runs, we use a heap containing just the frontmost tuple from
48  * each source run; we repeatedly output the smallest tuple and insert the
49  * next tuple from its source tape (if any).  When the heap empties, the merge
50  * is complete.  The basic merge algorithm thus needs very little memory ---
51  * only M tuples for an M-way merge, and M is constrained to a small number.
52  * However, we can still make good use of our full workMem allocation by
53  * pre-reading additional tuples from each source tape.  Without prereading,
54  * our access pattern to the temporary file would be very erratic; on average
55  * we'd read one block from each of M source tapes during the same time that
56  * we're writing M blocks to the output tape, so there is no sequentiality of
57  * access at all, defeating the read-ahead methods used by most Unix kernels.
58  * Worse, the output tape gets written into a very random sequence of blocks
59  * of the temp file, ensuring that things will be even worse when it comes
60  * time to read that tape.  A straightforward merge pass thus ends up doing a
61  * lot of waiting for disk seeks.  We can improve matters by prereading from
62  * each source tape sequentially, loading about workMem/M bytes from each tape
63  * in turn.  Then we run the merge algorithm, writing but not reading until
64  * one of the preloaded tuple series runs out.  Then we switch back to preread
65  * mode, fill memory again, and repeat.  This approach helps to localize both
66  * read and write accesses.
67  *
68  * When the caller requests random access to the sort result, we form
69  * the final sorted run on a logical tape which is then "frozen", so
70  * that we can access it randomly.  When the caller does not need random
71  * access, we return from tuplesort_performsort() as soon as we are down
72  * to one run per logical tape.  The final merge is then performed
73  * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
74  * saves one cycle of writing all the data out to disk and reading it in.
75  *
76  * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
77  * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
78  * to Knuth's figure 70 (section 5.4.2).  However, Knuth is assuming that
79  * tape drives are expensive beasts, and in particular that there will always
80  * be many more runs than tape drives.  In our implementation a "tape drive"
81  * doesn't cost much more than a few Kb of memory buffers, so we can afford
82  * to have lots of them.  In particular, if we can have as many tape drives
83  * as sorted runs, we can eliminate any repeated I/O at all.  In the current
84  * code we determine the number of tapes M on the basis of workMem: we want
85  * workMem/M to be large enough that we read a fair amount of data each time
86  * we preread from a tape, so as to maintain the locality of access described
87  * above.  Nonetheless, with large workMem we can have many tapes.
88  *
89  *
90  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
91  * Portions Copyright (c) 1994, Regents of the University of California
92  *
93  * IDENTIFICATION
94  *        src/backend/utils/sort/tuplesort.c
95  *
96  *-------------------------------------------------------------------------
97  */
98
99 #include "postgres.h"
100
101 #include <limits.h>
102
103 #include "access/htup_details.h"
104 #include "access/nbtree.h"
105 #include "catalog/index.h"
106 #include "commands/tablespace.h"
107 #include "executor/executor.h"
108 #include "miscadmin.h"
109 #include "pg_trace.h"
110 #include "utils/datum.h"
111 #include "utils/logtape.h"
112 #include "utils/lsyscache.h"
113 #include "utils/memutils.h"
114 #include "utils/pg_rusage.h"
115 #include "utils/rel.h"
116 #include "utils/sortsupport.h"
117 #include "utils/tuplesort.h"
118
119
120 /* sort-type codes for sort__start probes */
121 #define HEAP_SORT               0
122 #define INDEX_SORT              1
123 #define DATUM_SORT              2
124 #define CLUSTER_SORT    3
125
126 /* GUC variables */
127 #ifdef TRACE_SORT
128 bool            trace_sort = false;
129 #endif
130
131 #ifdef DEBUG_BOUNDED_SORT
132 bool            optimize_bounded_sort = true;
133 #endif
134
135
136 /*
137  * The objects we actually sort are SortTuple structs.  These contain
138  * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
139  * which is a separate palloc chunk --- we assume it is just one chunk and
140  * can be freed by a simple pfree().  SortTuples also contain the tuple's
141  * first key column in Datum/nullflag format, and an index integer.
142  *
143  * Storing the first key column lets us save heap_getattr or index_getattr
144  * calls during tuple comparisons.  We could extract and save all the key
145  * columns not just the first, but this would increase code complexity and
146  * overhead, and wouldn't actually save any comparison cycles in the common
147  * case where the first key determines the comparison result.  Note that
148  * for a pass-by-reference datatype, datum1 points into the "tuple" storage.
149  *
150  * When sorting single Datums, the data value is represented directly by
151  * datum1/isnull1.  If the datatype is pass-by-reference and isnull1 is false,
152  * then datum1 points to a separately palloc'd data value that is also pointed
153  * to by the "tuple" pointer; otherwise "tuple" is NULL.
154  *
155  * While building initial runs, tupindex holds the tuple's run number.  During
156  * merge passes, we re-use it to hold the input tape number that each tuple in
157  * the heap was read from, or to hold the index of the next tuple pre-read
158  * from the same tape in the case of pre-read entries.  tupindex goes unused
159  * if the sort occurs entirely in memory.
160  */
161 typedef struct
162 {
163         void       *tuple;                      /* the tuple proper */
164         Datum           datum1;                 /* value of first key column */
165         bool            isnull1;                /* is first key column NULL? */
166         int                     tupindex;               /* see notes above */
167 } SortTuple;
168
169
170 /*
171  * Possible states of a Tuplesort object.  These denote the states that
172  * persist between calls of Tuplesort routines.
173  */
174 typedef enum
175 {
176         TSS_INITIAL,                            /* Loading tuples; still within memory limit */
177         TSS_BOUNDED,                            /* Loading tuples into bounded-size heap */
178         TSS_BUILDRUNS,                          /* Loading tuples; writing to tape */
179         TSS_SORTEDINMEM,                        /* Sort completed entirely in memory */
180         TSS_SORTEDONTAPE,                       /* Sort completed, final run is on tape */
181         TSS_FINALMERGE                          /* Performing final merge on-the-fly */
182 } TupSortStatus;
183
184 /*
185  * Parameters for calculation of number of tapes to use --- see inittapes()
186  * and tuplesort_merge_order().
187  *
188  * In this calculation we assume that each tape will cost us about 3 blocks
189  * worth of buffer space (which is an underestimate for very large data
190  * volumes, but it's probably close enough --- see logtape.c).
191  *
192  * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
193  * tape during a preread cycle (see discussion at top of file).
194  */
195 #define MINORDER                6               /* minimum merge order */
196 #define TAPE_BUFFER_OVERHEAD            (BLCKSZ * 3)
197 #define MERGE_BUFFER_SIZE                       (BLCKSZ * 32)
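
/*
 * Hedged illustration (an added sketch, never compiled; not part of this
 * module): given the assumptions above --- roughly 3 blocks of buffer space
 * per tape plus a MERGE_BUFFER_SIZE preread target per input tape --- one
 * plausible way to turn a memory budget into a merge order is shown below.
 * The authoritative computation is tuplesort_merge_order(), which is not
 * part of this excerpt; "merge_order_sketch" is a hypothetical name used
 * only for exposition.
 */
#ifdef NOT_USED
static int
merge_order_sketch(int64 allowedMem)
{
	int64		perTape = MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD;
	int			mOrder = (int) (allowedMem / perTape);

	/* never drop below the minimum merge order, however small workMem is */
	return Max(mOrder, MINORDER);
}
#endif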
198
199 typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
200                                                                                                 Tuplesortstate *state);
201
202 /*
203  * Private state of a Tuplesort operation.
204  */
205 struct Tuplesortstate
206 {
207         TupSortStatus status;           /* enumerated value as shown above */
208         int                     nKeys;                  /* number of columns in sort key */
209         bool            randomAccess;   /* did caller request random access? */
210         bool            bounded;                /* did caller specify a maximum number of
211                                                                  * tuples to return? */
212         bool            boundUsed;              /* true if we made use of a bounded heap */
213         int                     bound;                  /* if bounded, the maximum number of tuples */
214         int64           availMem;               /* remaining memory available, in bytes */
215         int64           allowedMem;             /* total memory allowed, in bytes */
216         int                     maxTapes;               /* number of tapes (Knuth's T) */
217         int                     tapeRange;              /* maxTapes-1 (Knuth's P) */
218         MemoryContext sortcontext;      /* memory context holding all sort data */
219         LogicalTapeSet *tapeset;        /* logtape.c object for tapes in a temp file */
220
221         /*
222          * These function pointers decouple the routines that must know what kind
223          * of tuple we are sorting from the routines that don't need to know it.
224          * They are set up by the tuplesort_begin_xxx routines.
225          *
226          * Function to compare two tuples; result is per qsort() convention, ie:
227          * <0, 0, >0 according as a<b, a=b, a>b.  The API must match
228          * qsort_arg_comparator.
229          */
230         SortTupleComparator comparetup;
231
232         /*
233          * Function to copy a supplied input tuple into palloc'd space and set up
234          * its SortTuple representation (ie, set tuple/datum1/isnull1).  Also,
235          * state->availMem must be decreased by the amount of space used for the
236          * tuple copy (note the SortTuple struct itself is not counted).
237          */
238         void            (*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);
239
240         /*
241          * Function to write a stored tuple onto tape.  The representation of the
242          * tuple on tape need not be the same as it is in memory; requirements on
243          * the tape representation are given below.  After writing the tuple,
244          * pfree() the out-of-line data (not the SortTuple struct!), and increase
245          * state->availMem by the amount of memory space thereby released.
246          */
247         void            (*writetup) (Tuplesortstate *state, int tapenum,
248                                                                                  SortTuple *stup);
249
250         /*
251          * Function to read a stored tuple from tape back into memory. 'len' is
252          * the already-read length of the stored tuple.  Create a palloc'd copy,
253          * initialize tuple/datum1/isnull1 in the target SortTuple struct, and
254          * decrease state->availMem by the amount of memory space consumed.
255          */
256         void            (*readtup) (Tuplesortstate *state, SortTuple *stup,
257                                                                                 int tapenum, unsigned int len);
258
259         /*
260          * Function to reverse the sort direction from its current state. (We
261          * could dispense with this if we wanted to enforce that all variants
262          * represent the sort key information alike.)
263          */
264         void            (*reversedirection) (Tuplesortstate *state);
265
266         /*
267          * This array holds the tuples now in sort memory.  If we are in state
268          * INITIAL, the tuples are in no particular order; if we are in state
269          * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
270          * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
271          * H.  (Note that memtupcount only counts the tuples that are part of the
272          * heap --- during merge passes, memtuples[] entries beyond tapeRange are
273          * never in the heap and are used to hold pre-read tuples.)  In state
274          * SORTEDONTAPE, the array is not used.
275          */
276         SortTuple  *memtuples;          /* array of SortTuple structs */
277         int                     memtupcount;    /* number of tuples currently present */
278         int                     memtupsize;             /* allocated length of memtuples array */
279         bool            growmemtuples;  /* memtuples' growth still underway? */
280
281         /*
282          * While building initial runs, this is the current output run number
283          * (starting at 0).  Afterwards, it is the number of initial runs we made.
284          */
285         int                     currentRun;
286
287         /*
288          * Unless otherwise noted, all pointer variables below are pointers to
289          * arrays of length maxTapes, holding per-tape data.
290          */
291
292         /*
293          * These variables are only used during merge passes.  mergeactive[i] is
294          * true if we are reading an input run from (actual) tape number i and
295          * have not yet exhausted that run.  mergenext[i] is the memtuples index
296          * of the next pre-read tuple (next to be loaded into the heap) for tape
297          * i, or 0 if we are out of pre-read tuples.  mergelast[i] similarly
298          * points to the last pre-read tuple from each tape.  mergeavailslots[i]
299          * is the number of unused memtuples[] slots reserved for tape i, and
300          * mergeavailmem[i] is the amount of unused space allocated for tape i.
301          * mergefreelist and mergefirstfree keep track of unused locations in the
302          * memtuples[] array.  The memtuples[].tupindex fields link together
303          * pre-read tuples for each tape as well as recycled locations in
304          * mergefreelist. It is OK to use 0 as a null link in these lists, because
305          * memtuples[0] is part of the merge heap and is never a pre-read tuple.
306          */
307         bool       *mergeactive;        /* active input run source? */
308         int                *mergenext;          /* first preread tuple for each source */
309         int                *mergelast;          /* last preread tuple for each source */
310         int                *mergeavailslots;    /* slots left for prereading each tape */
311         int64      *mergeavailmem;      /* availMem for prereading each tape */
312         int                     mergefreelist;  /* head of freelist of recycled slots */
313         int                     mergefirstfree; /* first slot never used in this merge */
314
315         /*
316          * Variables for Algorithm D.  Note that destTape is a "logical" tape
317          * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
318          * "logical" and "actual" tape numbers straight!
319          */
320         int                     Level;                  /* Knuth's l */
321         int                     destTape;               /* current output tape (Knuth's j, less 1) */
322         int                *tp_fib;                     /* Target Fibonacci run counts (A[]) */
323         int                *tp_runs;            /* # of real runs on each tape */
324         int                *tp_dummy;           /* # of dummy runs for each tape (D[]) */
325         int                *tp_tapenum;         /* Actual tape numbers (TAPE[]) */
326         int                     activeTapes;    /* # of active input tapes in merge pass */
327
328         /*
329          * These variables are used after completion of sorting to keep track of
330          * the next tuple to return.  (In the tape case, the tape's current read
331          * position is also critical state.)
332          */
333         int                     result_tape;    /* actual tape number of finished output */
334         int                     current;                /* array index (only used if SORTEDINMEM) */
335         bool            eof_reached;    /* reached EOF (needed for cursors) */
336
337         /* markpos_xxx holds marked position for mark and restore */
338         long            markpos_block;  /* tape block# (only used if SORTEDONTAPE) */
339         int                     markpos_offset; /* saved "current", or offset in tape block */
340         bool            markpos_eof;    /* saved "eof_reached" */
341
342         /*
343          * These variables are specific to the MinimalTuple case; they are set by
344          * tuplesort_begin_heap and used only by the MinimalTuple routines.
345          */
346         TupleDesc       tupDesc;
347         SortSupport sortKeys;           /* array of length nKeys */
348
349         /*
350          * This variable is shared by the single-key MinimalTuple case and the
351          * Datum case (which both use qsort_ssup()).  Otherwise it's NULL.
352          */
353         SortSupport onlyKey;
354
355         /*
356          * These variables are specific to the CLUSTER case; they are set by
357          * tuplesort_begin_cluster.  Note CLUSTER also uses tupDesc and
358          * indexScanKey.
359          */
360         IndexInfo  *indexInfo;          /* info about index being used for reference */
361         EState     *estate;                     /* for evaluating index expressions */
362
363         /*
364          * These variables are specific to the IndexTuple case; they are set by
365          * tuplesort_begin_index_xxx and used only by the IndexTuple routines.
366          */
367         Relation        heapRel;                /* table the index is being built on */
368         Relation        indexRel;               /* index being built */
369
370         /* These are specific to the index_btree subcase: */
371         ScanKey         indexScanKey;
372         bool            enforceUnique;  /* complain if we find duplicate tuples */
373
374         /* These are specific to the index_hash subcase: */
375         uint32          hash_mask;              /* mask for sortable part of hash code */
376
377         /*
378          * These variables are specific to the Datum case; they are set by
379          * tuplesort_begin_datum and used only by the DatumTuple routines.
380          */
381         Oid                     datumType;
382         /* we need typelen and byval in order to know how to copy the Datums. */
383         int                     datumTypeLen;
384         bool            datumTypeByVal;
385
386         /*
387          * Resource snapshot for time of sort start.
388          */
389 #ifdef TRACE_SORT
390         PGRUsage        ru_start;
391 #endif
392 };
393
394 #define COMPARETUP(state,a,b)   ((*(state)->comparetup) (a, b, state))
395 #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
396 #define WRITETUP(state,tape,stup)       ((*(state)->writetup) (state, tape, stup))
397 #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
398 #define REVERSEDIRECTION(state) ((*(state)->reversedirection) (state))
399 #define LACKMEM(state)          ((state)->availMem < 0)
400 #define USEMEM(state,amt)       ((state)->availMem -= (amt))
401 #define FREEMEM(state,amt)      ((state)->availMem += (amt))
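
/*
 * Hedged illustration (an added sketch, never compiled): while initial runs
 * are being built, SortTuple.tupindex holds the destination run number, and
 * the heap must order tuples by (run number, key) so that an earlier run's
 * tuples always surface before a later run's.  A comparison with that effect
 * could be written as below; the real code folds this test into a macro used
 * by the heap routines rather than a standalone function, and
 * "run_then_key_compare_sketch" is a hypothetical name.
 */
#ifdef NOT_USED
static int
run_then_key_compare_sketch(Tuplesortstate *state,
							const SortTuple *a, const SortTuple *b)
{
	/* the earlier run wins outright, regardless of key values */
	if (a->tupindex != b->tupindex)
		return (a->tupindex < b->tupindex) ? -1 : 1;
	/* within the same run, defer to the datatype-specific comparator */
	return COMPARETUP(state, a, b);
}
#endif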
402
403 /*
404  * NOTES about on-tape representation of tuples:
405  *
406  * We require the first "unsigned int" of a stored tuple to be the total size
407  * on-tape of the tuple, including itself (so it is never zero; an all-zero
408  * unsigned int is used to delimit runs).  The remainder of the stored tuple
409  * may or may not match the in-memory representation of the tuple ---
410  * any conversion needed is the job of the writetup and readtup routines.
411  *
412  * If state->randomAccess is true, then the stored representation of the
413  * tuple must be followed by another "unsigned int" that is a copy of the
414  * length --- so the total tape space used is actually sizeof(unsigned int)
415  * more than the stored length value.  This allows read-backwards.  When
416  * randomAccess is not true, the write/read routines may omit the extra
417  * length word.
418  *
419  * writetup is expected to write both length words as well as the tuple
420  * data.  When readtup is called, the tape is positioned just after the
421  * front length word; readtup must read the tuple data and advance past
422  * the back length word (if present).
423  *
424  * The write/read routines can make use of the tuple description data
425  * stored in the Tuplesortstate record, if needed.  They are also expected
426  * to adjust state->availMem by the amount of memory space (not tape space!)
427  * released or consumed.  There is no error return from either writetup
428  * or readtup; they should ereport() on failure.
429  *
430  *
431  * NOTES about memory consumption calculations:
432  *
433  * We count space allocated for tuples against the workMem limit, plus
434  * the space used by the variable-size memtuples array.  Fixed-size space
435  * is not counted; it's small enough to not be interesting.
436  *
437  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
438  * rather than the originally-requested size.  This is important since
439  * palloc can add substantial overhead.  It's not a complete answer since
440  * we won't count any wasted space in palloc allocation blocks, but it's
441  * a lot better than what we were doing before 7.3.
442  */
443
444 /* When using this macro, beware of double evaluation of len */
445 #define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
446         do { \
447                 if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \
448                         elog(ERROR, "unexpected end of data"); \
449         } while(0)
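
/*
 * Hedged sketch (added for exposition, never compiled) of a writetup/readtup
 * pair that obeys the on-tape rules above, for a hypothetical fixed-size
 * payload rather than any of the tuple kinds this file really handles.  It
 * shows the leading length word, the optional trailing length word written
 * when randomAccess is true, and the availMem bookkeeping.  "SketchPayload",
 * "writetup_sketch", and "readtup_sketch" are invented names; the layout
 * assumes no padding between the two struct fields.
 */
#ifdef NOT_USED
typedef struct SketchPayload
{
	unsigned int len;			/* total on-tape length, including itself */
	int			key;			/* the "tuple" data proper */
} SketchPayload;

static void
writetup_sketch(Tuplesortstate *state, int tapenum, SortTuple *stup)
{
	SketchPayload *pl = (SketchPayload *) stup->tuple;
	unsigned int tuplen = sizeof(SketchPayload);

	pl->len = tuplen;			/* leading length word, counts itself */
	LogicalTapeWrite(state->tapeset, tapenum, (void *) pl, tuplen);
	if (state->randomAccess)	/* need trailing length word? */
		LogicalTapeWrite(state->tapeset, tapenum,
						 (void *) &tuplen, sizeof(tuplen));

	/* give the memory back to the sort's budget */
	FREEMEM(state, GetMemoryChunkSpace(pl));
	pfree(pl);
}

static void
readtup_sketch(Tuplesortstate *state, SortTuple *stup,
			   int tapenum, unsigned int len)
{
	SketchPayload *pl = (SketchPayload *) palloc(len);

	USEMEM(state, GetMemoryChunkSpace(pl));
	/* 'len' is the leading length word, which the caller already consumed */
	pl->len = len;
	LogicalTapeReadExact(state->tapeset, tapenum,
						 ((char *) pl) + sizeof(unsigned int),
						 len - sizeof(unsigned int));
	if (state->randomAccess)	/* discard the trailing length word */
		LogicalTapeReadExact(state->tapeset, tapenum, &len, sizeof(len));
	stup->tuple = (void *) pl;
	stup->datum1 = Int32GetDatum(pl->key);
	stup->isnull1 = false;
}
#endif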
450
451
452 static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess);
453 static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
454 static void inittapes(Tuplesortstate *state);
455 static void selectnewtape(Tuplesortstate *state);
456 static void mergeruns(Tuplesortstate *state);
457 static void mergeonerun(Tuplesortstate *state);
458 static void beginmerge(Tuplesortstate *state);
459 static void mergepreread(Tuplesortstate *state);
460 static void mergeprereadone(Tuplesortstate *state, int srcTape);
461 static void dumptuples(Tuplesortstate *state, bool alltuples);
462 static void make_bounded_heap(Tuplesortstate *state);
463 static void sort_bounded_heap(Tuplesortstate *state);
464 static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
465                                           int tupleindex, bool checkIndex);
466 static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
467 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
468 static void markrunend(Tuplesortstate *state, int tapenum);
469 static int comparetup_heap(const SortTuple *a, const SortTuple *b,
470                                 Tuplesortstate *state);
471 static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
472 static void writetup_heap(Tuplesortstate *state, int tapenum,
473                           SortTuple *stup);
474 static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
475                          int tapenum, unsigned int len);
476 static void reversedirection_heap(Tuplesortstate *state);
477 static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
478                                    Tuplesortstate *state);
479 static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
480 static void writetup_cluster(Tuplesortstate *state, int tapenum,
481                                  SortTuple *stup);
482 static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
483                                 int tapenum, unsigned int len);
484 static int comparetup_index_btree(const SortTuple *a, const SortTuple *b,
485                                            Tuplesortstate *state);
486 static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
487                                           Tuplesortstate *state);
488 static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
489 static void writetup_index(Tuplesortstate *state, int tapenum,
490                            SortTuple *stup);
491 static void readtup_index(Tuplesortstate *state, SortTuple *stup,
492                           int tapenum, unsigned int len);
493 static void reversedirection_index_btree(Tuplesortstate *state);
494 static void reversedirection_index_hash(Tuplesortstate *state);
495 static int comparetup_datum(const SortTuple *a, const SortTuple *b,
496                                  Tuplesortstate *state);
497 static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
498 static void writetup_datum(Tuplesortstate *state, int tapenum,
499                            SortTuple *stup);
500 static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
501                           int tapenum, unsigned int len);
502 static void reversedirection_datum(Tuplesortstate *state);
503 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
504
505 /*
506  * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
507  * any variant of SortTuples, using the appropriate comparetup function.
508  * qsort_ssup() is specialized for the case where the comparetup function
509  * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
510  * and Datum sorts.
511  */
512 #include "qsort_tuple.c"
513
514
515 /*
516  *              tuplesort_begin_xxx
517  *
518  * Initialize for a tuple sort operation.
519  *
520  * After calling tuplesort_begin, the caller should call tuplesort_putXXX
521  * zero or more times, then call tuplesort_performsort when all the tuples
522  * have been supplied.  After performsort, retrieve the tuples in sorted
523  * order by calling tuplesort_getXXX until it returns false/NULL.  (If random
524  * access was requested, rescan, markpos, and restorepos can also be called.)
525  * Call tuplesort_end to terminate the operation and release memory/disk space.
526  *
527  * Each variant of tuplesort_begin has a workMem parameter specifying the
528  * maximum number of kilobytes of RAM to use before spilling data to disk.
529  * (The normal value of this parameter is work_mem, but some callers use
530  * other values.)  Each variant also has a randomAccess parameter specifying
531  * whether the caller needs non-sequential access to the sort result.
532  */
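
/*
 * Hedged usage sketch (added for exposition, never compiled): the typical
 * call sequence described above, sorting heap tuples on a single column.
 * "fetch_next_slot" stands in for whatever produces the input slots and is
 * not a real function; the operator, collation, and attribute number are
 * supplied by the hypothetical caller.
 */
#ifdef NOT_USED
static void
sort_one_column_sketch(TupleDesc tupDesc, AttrNumber attno,
					   Oid ltOperator, Oid collation)
{
	Tuplesortstate *sortstate;
	TupleTableSlot *slot;
	bool		nullsFirst = false;

	sortstate = tuplesort_begin_heap(tupDesc, 1, &attno,
									 &ltOperator, &collation, &nullsFirst,
									 work_mem, false);

	/* feed every input tuple; tuplesort copies each one */
	while ((slot = fetch_next_slot()) != NULL)
		tuplesort_puttupleslot(sortstate, slot);

	tuplesort_performsort(sortstate);

	/* drain the results in sorted order */
	slot = MakeSingleTupleTableSlot(tupDesc);
	while (tuplesort_gettupleslot(sortstate, true, slot))
	{
		/* process the slot's contents here */
	}

	ExecDropSingleTupleTableSlot(slot);
	tuplesort_end(sortstate);
}
#endif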
533
534 static Tuplesortstate *
535 tuplesort_begin_common(int workMem, bool randomAccess)
536 {
537         Tuplesortstate *state;
538         MemoryContext sortcontext;
539         MemoryContext oldcontext;
540
541         /*
542          * Create a working memory context for this sort operation. All data
543          * needed by the sort will live inside this context.
544          */
545         sortcontext = AllocSetContextCreate(CurrentMemoryContext,
546                                                                                 "TupleSort",
547                                                                                 ALLOCSET_DEFAULT_MINSIZE,
548                                                                                 ALLOCSET_DEFAULT_INITSIZE,
549                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
550
551         /*
552          * Make the Tuplesortstate within the per-sort context.  This way, we
553          * don't need a separate pfree() operation for it at shutdown.
554          */
555         oldcontext = MemoryContextSwitchTo(sortcontext);
556
557         state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
558
559 #ifdef TRACE_SORT
560         if (trace_sort)
561                 pg_rusage_init(&state->ru_start);
562 #endif
563
564         state->status = TSS_INITIAL;
565         state->randomAccess = randomAccess;
566         state->bounded = false;
567         state->boundUsed = false;
568         state->allowedMem = workMem * (int64) 1024;
569         state->availMem = state->allowedMem;
570         state->sortcontext = sortcontext;
571         state->tapeset = NULL;
572
573         state->memtupcount = 0;
574         state->memtupsize = 1024;       /* initial guess */
575         state->growmemtuples = true;
576         state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
577
578         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
579
580         /* workMem must be large enough for the minimal memtuples array */
581         if (LACKMEM(state))
582                 elog(ERROR, "insufficient memory allowed for sort");
583
584         state->currentRun = 0;
585
586         /*
587          * maxTapes, tapeRange, and Algorithm D variables will be initialized by
588          * inittapes(), if needed
589          */
590
591         state->result_tape = -1;        /* flag that result tape has not been formed */
592
593         MemoryContextSwitchTo(oldcontext);
594
595         return state;
596 }
597
598 Tuplesortstate *
599 tuplesort_begin_heap(TupleDesc tupDesc,
600                                          int nkeys, AttrNumber *attNums,
601                                          Oid *sortOperators, Oid *sortCollations,
602                                          bool *nullsFirstFlags,
603                                          int workMem, bool randomAccess)
604 {
605         Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
606         MemoryContext oldcontext;
607         int                     i;
608
609         oldcontext = MemoryContextSwitchTo(state->sortcontext);
610
611         AssertArg(nkeys > 0);
612
613 #ifdef TRACE_SORT
614         if (trace_sort)
615                 elog(LOG,
616                          "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
617                          nkeys, workMem, randomAccess ? 't' : 'f');
618 #endif
619
620         state->nKeys = nkeys;
621
622         TRACE_POSTGRESQL_SORT_START(HEAP_SORT,
623                                                                 false,  /* no unique check */
624                                                                 nkeys,
625                                                                 workMem,
626                                                                 randomAccess);
627
628         state->comparetup = comparetup_heap;
629         state->copytup = copytup_heap;
630         state->writetup = writetup_heap;
631         state->readtup = readtup_heap;
632         state->reversedirection = reversedirection_heap;
633
634         state->tupDesc = tupDesc;       /* assume we need not copy tupDesc */
635
636         /* Prepare SortSupport data for each column */
637         state->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData));
638
639         for (i = 0; i < nkeys; i++)
640         {
641                 SortSupport sortKey = state->sortKeys + i;
642
643                 AssertArg(attNums[i] != 0);
644                 AssertArg(sortOperators[i] != 0);
645
646                 sortKey->ssup_cxt = CurrentMemoryContext;
647                 sortKey->ssup_collation = sortCollations[i];
648                 sortKey->ssup_nulls_first = nullsFirstFlags[i];
649                 sortKey->ssup_attno = attNums[i];
650
651                 PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
652         }
653
654         if (nkeys == 1)
655                 state->onlyKey = state->sortKeys;
656
657         MemoryContextSwitchTo(oldcontext);
658
659         return state;
660 }
661
662 Tuplesortstate *
663 tuplesort_begin_cluster(TupleDesc tupDesc,
664                                                 Relation indexRel,
665                                                 int workMem, bool randomAccess)
666 {
667         Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
668         MemoryContext oldcontext;
669
670         Assert(indexRel->rd_rel->relam == BTREE_AM_OID);
671
672         oldcontext = MemoryContextSwitchTo(state->sortcontext);
673
674 #ifdef TRACE_SORT
675         if (trace_sort)
676                 elog(LOG,
677                          "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
678                          RelationGetNumberOfAttributes(indexRel),
679                          workMem, randomAccess ? 't' : 'f');
680 #endif
681
682         state->nKeys = RelationGetNumberOfAttributes(indexRel);
683
684         TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT,
685                                                                 false,  /* no unique check */
686                                                                 state->nKeys,
687                                                                 workMem,
688                                                                 randomAccess);
689
690         state->comparetup = comparetup_cluster;
691         state->copytup = copytup_cluster;
692         state->writetup = writetup_cluster;
693         state->readtup = readtup_cluster;
694         state->reversedirection = reversedirection_index_btree;
695
696         state->indexInfo = BuildIndexInfo(indexRel);
697         state->indexScanKey = _bt_mkscankey_nodata(indexRel);
698
699         state->tupDesc = tupDesc;       /* assume we need not copy tupDesc */
700
701         if (state->indexInfo->ii_Expressions != NULL)
702         {
703                 TupleTableSlot *slot;
704                 ExprContext *econtext;
705
706                 /*
707                  * We will need to use FormIndexDatum to evaluate the index
708                  * expressions.  To do that, we need an EState, as well as a
709                  * TupleTableSlot to put the table tuples into.  The econtext's
710                  * scantuple has to point to that slot, too.
711                  */
712                 state->estate = CreateExecutorState();
713                 slot = MakeSingleTupleTableSlot(tupDesc);
714                 econtext = GetPerTupleExprContext(state->estate);
715                 econtext->ecxt_scantuple = slot;
716         }
717
718         MemoryContextSwitchTo(oldcontext);
719
720         return state;
721 }
722
723 Tuplesortstate *
724 tuplesort_begin_index_btree(Relation heapRel,
725                                                         Relation indexRel,
726                                                         bool enforceUnique,
727                                                         int workMem, bool randomAccess)
728 {
729         Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
730         MemoryContext oldcontext;
731
732         oldcontext = MemoryContextSwitchTo(state->sortcontext);
733
734 #ifdef TRACE_SORT
735         if (trace_sort)
736                 elog(LOG,
737                          "begin index sort: unique = %c, workMem = %d, randomAccess = %c",
738                          enforceUnique ? 't' : 'f',
739                          workMem, randomAccess ? 't' : 'f');
740 #endif
741
742         state->nKeys = RelationGetNumberOfAttributes(indexRel);
743
744         TRACE_POSTGRESQL_SORT_START(INDEX_SORT,
745                                                                 enforceUnique,
746                                                                 state->nKeys,
747                                                                 workMem,
748                                                                 randomAccess);
749
750         state->comparetup = comparetup_index_btree;
751         state->copytup = copytup_index;
752         state->writetup = writetup_index;
753         state->readtup = readtup_index;
754         state->reversedirection = reversedirection_index_btree;
755
756         state->heapRel = heapRel;
757         state->indexRel = indexRel;
758         state->indexScanKey = _bt_mkscankey_nodata(indexRel);
759         state->enforceUnique = enforceUnique;
760
761         MemoryContextSwitchTo(oldcontext);
762
763         return state;
764 }
765
766 Tuplesortstate *
767 tuplesort_begin_index_hash(Relation heapRel,
768                                                    Relation indexRel,
769                                                    uint32 hash_mask,
770                                                    int workMem, bool randomAccess)
771 {
772         Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
773         MemoryContext oldcontext;
774
775         oldcontext = MemoryContextSwitchTo(state->sortcontext);
776
777 #ifdef TRACE_SORT
778         if (trace_sort)
779                 elog(LOG,
780                 "begin index sort: hash_mask = 0x%x, workMem = %d, randomAccess = %c",
781                          hash_mask,
782                          workMem, randomAccess ? 't' : 'f');
783 #endif
784
785         state->nKeys = 1;                       /* Only one sort column, the hash code */
786
787         state->comparetup = comparetup_index_hash;
788         state->copytup = copytup_index;
789         state->writetup = writetup_index;
790         state->readtup = readtup_index;
791         state->reversedirection = reversedirection_index_hash;
792
793         state->heapRel = heapRel;
794         state->indexRel = indexRel;
795
796         state->hash_mask = hash_mask;
797
798         MemoryContextSwitchTo(oldcontext);
799
800         return state;
801 }
802
803 Tuplesortstate *
804 tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
805                                           bool nullsFirstFlag,
806                                           int workMem, bool randomAccess)
807 {
808         Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
809         MemoryContext oldcontext;
810         int16           typlen;
811         bool            typbyval;
812
813         oldcontext = MemoryContextSwitchTo(state->sortcontext);
814
815 #ifdef TRACE_SORT
816         if (trace_sort)
817                 elog(LOG,
818                          "begin datum sort: workMem = %d, randomAccess = %c",
819                          workMem, randomAccess ? 't' : 'f');
820 #endif
821
822         state->nKeys = 1;                       /* always a one-column sort */
823
824         TRACE_POSTGRESQL_SORT_START(DATUM_SORT,
825                                                                 false,  /* no unique check */
826                                                                 1,
827                                                                 workMem,
828                                                                 randomAccess);
829
830         state->comparetup = comparetup_datum;
831         state->copytup = copytup_datum;
832         state->writetup = writetup_datum;
833         state->readtup = readtup_datum;
834         state->reversedirection = reversedirection_datum;
835
836         state->datumType = datumType;
837
838         /* Prepare SortSupport data */
839         state->onlyKey = (SortSupport) palloc0(sizeof(SortSupportData));
840
841         state->onlyKey->ssup_cxt = CurrentMemoryContext;
842         state->onlyKey->ssup_collation = sortCollation;
843         state->onlyKey->ssup_nulls_first = nullsFirstFlag;
844
845         PrepareSortSupportFromOrderingOp(sortOperator, state->onlyKey);
846
847         /* lookup necessary attributes of the datum type */
848         get_typlenbyval(datumType, &typlen, &typbyval);
849         state->datumTypeLen = typlen;
850         state->datumTypeByVal = typbyval;
851
852         MemoryContextSwitchTo(oldcontext);
853
854         return state;
855 }
856
857 /*
858  * tuplesort_set_bound
859  *
860  *      Advise tuplesort that at most the first N result tuples are required.
861  *
862  * Must be called before inserting any tuples.  (Actually, we could allow it
863  * as long as the sort hasn't spilled to disk, but there seems no need for
864  * delayed calls at the moment.)
865  *
866  * This is a hint only. The tuplesort may still return more tuples than
867  * requested.
868  */
869 void
870 tuplesort_set_bound(Tuplesortstate *state, int64 bound)
871 {
872         /* Assert we're called before loading any tuples */
873         Assert(state->status == TSS_INITIAL);
874         Assert(state->memtupcount == 0);
875         Assert(!state->bounded);
876
877 #ifdef DEBUG_BOUNDED_SORT
878         /* Honor GUC setting that disables the feature (for easy testing) */
879         if (!optimize_bounded_sort)
880                 return;
881 #endif
882
883         /* We want to be able to compute bound * 2, so limit the setting */
884         if (bound > (int64) (INT_MAX / 2))
885                 return;
886
887         state->bounded = true;
888         state->bound = (int) bound;
889 }
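
/*
 * Hedged usage sketch (added for exposition, never compiled): a LIMIT-style
 * caller announces its bound immediately after tuplesort_begin_xxx and
 * before the first put call, as the assertions above require.  The begin
 * and put calls are elided here; see the fuller lifecycle sketch near
 * tuplesort_begin_common.  "bounded_sort_hint_sketch" is a hypothetical name.
 */
#ifdef NOT_USED
static void
bounded_sort_hint_sketch(Tuplesortstate *sortstate, int64 limit)
{
	/* must run while status is still TSS_INITIAL and no tuples are loaded */
	tuplesort_set_bound(sortstate, limit);
	/* ... now feed tuples and sort as usual; only "limit" results matter ... */
}
#endif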
890
891 /*
892  * tuplesort_end
893  *
894  *      Release resources and clean up.
895  *
896  * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
897  * pointing to garbage.  Be careful not to attempt to use or free such
898  * pointers afterwards!
899  */
900 void
901 tuplesort_end(Tuplesortstate *state)
902 {
903         /* context swap probably not needed, but let's be safe */
904         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
905
906 #ifdef TRACE_SORT
907         long            spaceUsed;
908
909         if (state->tapeset)
910                 spaceUsed = LogicalTapeSetBlocks(state->tapeset);
911         else
912                 spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
913 #endif
914
915         /*
916          * Delete temporary "tape" files, if any.
917          *
918          * Note: want to include this in reported total cost of sort, hence need
919          * for two #ifdef TRACE_SORT sections.
920          */
921         if (state->tapeset)
922                 LogicalTapeSetClose(state->tapeset);
923
924 #ifdef TRACE_SORT
925         if (trace_sort)
926         {
927                 if (state->tapeset)
928                         elog(LOG, "external sort ended, %ld disk blocks used: %s",
929                                  spaceUsed, pg_rusage_show(&state->ru_start));
930                 else
931                         elog(LOG, "internal sort ended, %ld KB used: %s",
932                                  spaceUsed, pg_rusage_show(&state->ru_start));
933         }
934
935         TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
936 #else
937
938         /*
939          * If you disabled TRACE_SORT, you can still probe sort__done, but you
940          * ain't getting space-used stats.
941          */
942         TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L);
943 #endif
944
945         /* Free any execution state created for CLUSTER case */
946         if (state->estate != NULL)
947         {
948                 ExprContext *econtext = GetPerTupleExprContext(state->estate);
949
950                 ExecDropSingleTupleTableSlot(econtext->ecxt_scantuple);
951                 FreeExecutorState(state->estate);
952         }
953
954         MemoryContextSwitchTo(oldcontext);
955
956         /*
957          * Free the per-sort memory context, thereby releasing all working memory,
958          * including the Tuplesortstate struct itself.
959          */
960         MemoryContextDelete(state->sortcontext);
961 }
962
963 /*
964  * Grow the memtuples[] array, if possible within our memory constraint.  We
965  * must not exceed INT_MAX tuples in memory or the caller-provided memory
966  * limit.  Return TRUE if we were able to enlarge the array, FALSE if not.
967  *
968  * Normally, at each increment we double the size of the array.  When doing
969  * that would exceed a limit, we attempt one last, smaller increase (and then
970  * clear the growmemtuples flag so we don't try any more).  That allows us to
971  * use memory as fully as permitted; sticking to the pure doubling rule could
972  * result in almost half going unused.  Because availMem moves around with
973  * tuple addition/removal, we need some rule to prevent making repeated small
974  * increases in memtupsize, which would just be useless thrashing.  The
975  * growmemtuples flag accomplishes that and also prevents useless
976  * recalculations in this function.
977  */
978 static bool
979 grow_memtuples(Tuplesortstate *state)
980 {
981         int                     newmemtupsize;
982         int                     memtupsize = state->memtupsize;
983         int64           memNowUsed = state->allowedMem - state->availMem;
984
985         /* Forget it if we've already maxed out memtuples, per comment above */
986         if (!state->growmemtuples)
987                 return false;
988
989         /* Select new value of memtupsize */
990         if (memNowUsed <= state->availMem)
991         {
992                 /*
993                  * We've used no more than half of allowedMem; double our usage,
994                  * clamping at INT_MAX tuples.
995                  */
996                 if (memtupsize < INT_MAX / 2)
997                         newmemtupsize = memtupsize * 2;
998                 else
999                 {
1000                         newmemtupsize = INT_MAX;
1001                         state->growmemtuples = false;
1002                 }
1003         }
1004         else
1005         {
1006                 /*
1007                  * This will be the last increment of memtupsize.  Abandon doubling
1008                  * strategy and instead increase as much as we safely can.
1009                  *
1010                  * To stay within allowedMem, we can't increase memtupsize by more
1011                  * than availMem / sizeof(SortTuple) elements.  In practice, we want
1012                  * to increase it by considerably less, because we need to leave some
1013                  * space for the tuples to which the new array slots will refer.  We
1014                  * assume the new tuples will be about the same size as the tuples
1015                  * we've already seen, and thus we can extrapolate from the space
1016                  * consumption so far to estimate an appropriate new size for the
1017                  * memtuples array.  The optimal value might be higher or lower than
1018                  * this estimate, but it's hard to know that in advance.  We again
1019                  * clamp at INT_MAX tuples.
1020                  *
1021                  * This calculation is safe against enlarging the array so much that
1022                  * LACKMEM becomes true, because the memory currently used includes
1023                  * the present array; thus, there would be enough allowedMem for the
1024                  * new array elements even if no other memory were currently used.
1025                  *
1026                  * We do the arithmetic in float8, because otherwise the product of
1027                  * memtupsize and allowedMem could overflow.  Any inaccuracy in the
1028                  * result should be insignificant; but even if we computed a
1029                  * completely insane result, the checks below will prevent anything
1030                  * really bad from happening.
1031                  */
1032                 double          grow_ratio;
1033
1034                 grow_ratio = (double) state->allowedMem / (double) memNowUsed;
1035                 if (memtupsize * grow_ratio < INT_MAX)
1036                         newmemtupsize = (int) (memtupsize * grow_ratio);
1037                 else
1038                         newmemtupsize = INT_MAX;
1039
1040                 /* We won't make any further enlargement attempts */
1041                 state->growmemtuples = false;
1042         }
1043
1044         /* Must enlarge array by at least one element, else report failure */
1045         if (newmemtupsize <= memtupsize)
1046                 goto noalloc;
1047
1048         /*
1049          * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize.  Clamp
1050          * to ensure our request won't be rejected.  Note that we can easily
1051          * exhaust address space before facing this outcome.  (This is presently
1052          * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
1053          * don't rely on that at this distance.)
1054          */
1055         if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
1056         {
1057                 newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
1058                 state->growmemtuples = false;   /* can't grow any more */
1059         }
1060
1061         /*
1062          * We need to be sure that we do not cause LACKMEM to become true, else
1063          * the space management algorithm will go nuts.  The code above should
1064          * never generate a dangerous request, but to be safe, check explicitly
1065          * that the array growth fits within availMem.  (We could still cause
1066          * LACKMEM if the memory chunk overhead associated with the memtuples
1067          * array were to increase.  That shouldn't happen with any sane value of
1068          * allowedMem, because at any array size large enough to risk LACKMEM,
1069          * palloc would be treating both old and new arrays as separate chunks.
1070          * But we'll check LACKMEM explicitly below just in case.)
1071          */
1072         if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
1073                 goto noalloc;
1074
1075         /* OK, do it */
1076         FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1077         state->memtupsize = newmemtupsize;
1078         state->memtuples = (SortTuple *)
1079                 repalloc_huge(state->memtuples,
1080                                           state->memtupsize * sizeof(SortTuple));
1081         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1082         if (LACKMEM(state))
1083                 elog(ERROR, "unexpected out-of-memory situation during sort");
1084         return true;
1085
1086 noalloc:
1087         /* If for any reason we didn't realloc, shut off future attempts */
1088         state->growmemtuples = false;
1089         return false;
1090 }
1091
1092 /*
1093  * Accept one tuple while collecting input data for sort.
1094  *
1095  * Note that the input data is always copied; the caller need not save it.
1096  */
1097 void
1098 tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot)
1099 {
1100         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1101         SortTuple       stup;
1102
1103         /*
1104          * Copy the given tuple into memory we control, and decrease availMem.
1105          * Then call the common code.
1106          */
1107         COPYTUP(state, &stup, (void *) slot);
1108
1109         puttuple_common(state, &stup);
1110
1111         MemoryContextSwitchTo(oldcontext);
1112 }
1113
1114 /*
1115  * Accept one tuple while collecting input data for sort.
1116  *
1117  * Note that the input data is always copied; the caller need not save it.
1118  */
1119 void
1120 tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup)
1121 {
1122         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1123         SortTuple       stup;
1124
1125         /*
1126          * Copy the given tuple into memory we control, and decrease availMem.
1127          * Then call the common code.
1128          */
1129         COPYTUP(state, &stup, (void *) tup);
1130
1131         puttuple_common(state, &stup);
1132
1133         MemoryContextSwitchTo(oldcontext);
1134 }
1135
1136 /*
1137  * Collect one index tuple while collecting input data for sort, building
1138  * it from caller-supplied values.
1139  */
1140 void
1141 tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
1142                                                           ItemPointer self, Datum *values,
1143                                                           bool *isnull)
1144 {
1145         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1146         SortTuple       stup;
1147
1148         stup.tuple = index_form_tuple(RelationGetDescr(rel), values, isnull);
1149         ((IndexTuple) stup.tuple)->t_tid = *self;
1150         USEMEM(state, GetMemoryChunkSpace(stup.tuple));
1151         /* set up first-column key value */
1152         stup.datum1 = index_getattr((IndexTuple) stup.tuple,
1153                                                                 1,
1154                                                                 RelationGetDescr(state->indexRel),
1155                                                                 &stup.isnull1);
1156         puttuple_common(state, &stup);
1157
1158         MemoryContextSwitchTo(oldcontext);
1159 }
1160
1161 /*
1162  * Accept one Datum while collecting input data for sort.
1163  *
1164  * If the Datum is pass-by-ref type, the value will be copied.
1165  */
1166 void
1167 tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
1168 {
1169         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1170         SortTuple       stup;
1171
1172         /*
1173          * If it's a pass-by-reference value, copy it into memory we control, and
1174          * decrease availMem.  Then call the common code.
1175          */
1176         if (isNull || state->datumTypeByVal)
1177         {
1178                 stup.datum1 = val;
1179                 stup.isnull1 = isNull;
1180                 stup.tuple = NULL;              /* no separate storage */
1181         }
1182         else
1183         {
1184                 stup.datum1 = datumCopy(val, false, state->datumTypeLen);
1185                 stup.isnull1 = false;
1186                 stup.tuple = DatumGetPointer(stup.datum1);
1187                 USEMEM(state, GetMemoryChunkSpace(stup.tuple));
1188         }
1189
1190         puttuple_common(state, &stup);
1191
1192         MemoryContextSwitchTo(oldcontext);
1193 }
1194
1195 /*
1196  * Shared code for tuple and datum cases.
1197  */
1198 static void
1199 puttuple_common(Tuplesortstate *state, SortTuple *tuple)
1200 {
1201         switch (state->status)
1202         {
1203                 case TSS_INITIAL:
1204
1205                         /*
1206                          * Save the tuple into the unsorted array.  First, grow the array
1207                          * as needed.  Note that we try to grow the array when there is
1208                          * still one free slot remaining --- if we fail, there'll still be
1209                          * room to store the incoming tuple, and then we'll switch to
1210                          * tape-based operation.
1211                          */
1212                         if (state->memtupcount >= state->memtupsize - 1)
1213                         {
1214                                 (void) grow_memtuples(state);
1215                                 Assert(state->memtupcount < state->memtupsize);
1216                         }
1217                         state->memtuples[state->memtupcount++] = *tuple;
1218
1219                         /*
1220                          * Check if it's time to switch over to a bounded heapsort. We do
1221                          * so if the input tuple count exceeds twice the desired tuple
1222                          * count (this is a heuristic for where heapsort becomes cheaper
1223                          * than a quicksort), or if we've just filled workMem and have
1224                          * enough tuples to meet the bound.
1225                          *
1226                          * Note that once we enter TSS_BOUNDED state we will always try to
1227                          * complete the sort that way.  In the worst case, if later input
1228                          * tuples are larger than earlier ones, this might cause us to
1229                          * exceed workMem significantly.
1230                          */
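                        /*
                         * For example, with a bound of 100 (say, from LIMIT 100) we switch
                         * once more than 200 input tuples have been collected, or as soon
                         * as workMem fills up while at least 100 tuples are already in
                         * hand.
                         */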
1231                         if (state->bounded &&
1232                                 (state->memtupcount > state->bound * 2 ||
1233                                  (state->memtupcount > state->bound && LACKMEM(state))))
1234                         {
1235 #ifdef TRACE_SORT
1236                                 if (trace_sort)
1237                                         elog(LOG, "switching to bounded heapsort at %d tuples: %s",
1238                                                  state->memtupcount,
1239                                                  pg_rusage_show(&state->ru_start));
1240 #endif
1241                                 make_bounded_heap(state);
1242                                 return;
1243                         }
1244
1245                         /*
1246                          * Done if we still fit in available memory and have array slots.
1247                          */
1248                         if (state->memtupcount < state->memtupsize && !LACKMEM(state))
1249                                 return;
1250
1251                         /*
1252                          * Nope; time to switch to tape-based operation.
1253                          */
1254                         inittapes(state);
1255
1256                         /*
1257                          * Dump tuples until we are back under the limit.
1258                          */
1259                         dumptuples(state, false);
1260                         break;
1261
1262                 case TSS_BOUNDED:
1263
1264                         /*
1265                          * We don't want to grow the array here, so check whether the new
1266                          * tuple can be discarded before putting it in.  This should be a
1267                          * good speed optimization, too, since when there are many more
1268                          * input tuples than the bound, most input tuples can be discarded
1269                          * with just this one comparison.  Note that because we currently
1270                          * have the sort direction reversed, we must check for <= not >=.
1271                          */
1272                         if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
1273                         {
1274                                 /* new tuple <= top of the heap, so we can discard it */
1275                                 free_sort_tuple(state, tuple);
1276                                 CHECK_FOR_INTERRUPTS();
1277                         }
1278                         else
1279                         {
1280                                 /* discard top of heap, sift up, insert new tuple */
1281                                 free_sort_tuple(state, &state->memtuples[0]);
1282                                 tuplesort_heap_siftup(state, false);
1283                                 tuplesort_heap_insert(state, tuple, 0, false);
1284                         }
1285                         break;
1286
1287                 case TSS_BUILDRUNS:
1288
1289                         /*
1290                          * Insert the tuple into the heap, with run number currentRun if
1291                          * it can go into the current run, else run number currentRun+1.
1292                          * The tuple can go into the current run if it is >= the first
1293                          * not-yet-output tuple.  (Actually, it could go into the current
1294                          * run if it is >= the most recently output tuple ... but that
1295                          * would require keeping around the tuple we last output, and it's
1296                          * simplest to let writetup free each tuple as soon as it's
1297                          * written.)
1298                          *
1299                          * Note there will always be at least one tuple in the heap at
1300                          * this point; see dumptuples.
1301                          */
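                        /*
                         * For instance, if the smallest tuple still in the heap sorts as
                         * "42" and the incoming tuple sorts as "17", the new tuple can no
                         * longer be emitted in order within the current run, so it is
                         * tagged with run number currentRun + 1 instead.
                         */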
1302                         Assert(state->memtupcount > 0);
1303                         if (COMPARETUP(state, tuple, &state->memtuples[0]) >= 0)
1304                                 tuplesort_heap_insert(state, tuple, state->currentRun, true);
1305                         else
1306                                 tuplesort_heap_insert(state, tuple, state->currentRun + 1, true);
1307
1308                         /*
1309                          * If we are over the memory limit, dump tuples till we're under.
1310                          */
1311                         dumptuples(state, false);
1312                         break;
1313
1314                 default:
1315                         elog(ERROR, "invalid tuplesort state");
1316                         break;
1317         }
1318 }
1319
1320 /*
1321  * All tuples have been provided; finish the sort.
1322  */
1323 void
1324 tuplesort_performsort(Tuplesortstate *state)
1325 {
1326         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1327
1328 #ifdef TRACE_SORT
1329         if (trace_sort)
1330                 elog(LOG, "performsort starting: %s",
1331                          pg_rusage_show(&state->ru_start));
1332 #endif
1333
1334         switch (state->status)
1335         {
1336                 case TSS_INITIAL:
1337
1338                         /*
1339                          * We were able to accumulate all the tuples within the allowed
1340                          * amount of memory.  Just qsort 'em and we're done.
1341                          */
1342                         if (state->memtupcount > 1)
1343                         {
1344                                 /* Can we use the single-key sort function? */
1345                                 if (state->onlyKey != NULL)
1346                                         qsort_ssup(state->memtuples, state->memtupcount,
1347                                                            state->onlyKey);
1348                                 else
1349                                         qsort_tuple(state->memtuples,
1350                                                                 state->memtupcount,
1351                                                                 state->comparetup,
1352                                                                 state);
1353                         }
1354                         state->current = 0;
1355                         state->eof_reached = false;
1356                         state->markpos_offset = 0;
1357                         state->markpos_eof = false;
1358                         state->status = TSS_SORTEDINMEM;
1359                         break;
1360
1361                 case TSS_BOUNDED:
1362
1363                         /*
1364                          * We were able to accumulate all the tuples required for output
1365                          * in memory, using a heap to eliminate excess tuples.  Now we
1366                          * have to transform the heap to a properly-sorted array.
1367                          */
1368                         sort_bounded_heap(state);
1369                         state->current = 0;
1370                         state->eof_reached = false;
1371                         state->markpos_offset = 0;
1372                         state->markpos_eof = false;
1373                         state->status = TSS_SORTEDINMEM;
1374                         break;
1375
1376                 case TSS_BUILDRUNS:
1377
1378                         /*
1379                          * Finish tape-based sort.  First, flush all tuples remaining in
1380                          * memory out to tape; then merge until we have a single remaining
1381                          * run (or, if !randomAccess, one run per tape). Note that
1382                          * mergeruns sets the correct state->status.
1383                          */
1384                         dumptuples(state, true);
1385                         mergeruns(state);
1386                         state->eof_reached = false;
1387                         state->markpos_block = 0L;
1388                         state->markpos_offset = 0;
1389                         state->markpos_eof = false;
1390                         break;
1391
1392                 default:
1393                         elog(ERROR, "invalid tuplesort state");
1394                         break;
1395         }
1396
1397 #ifdef TRACE_SORT
1398         if (trace_sort)
1399         {
1400                 if (state->status == TSS_FINALMERGE)
1401                         elog(LOG, "performsort done (except %d-way final merge): %s",
1402                                  state->activeTapes,
1403                                  pg_rusage_show(&state->ru_start));
1404                 else
1405                         elog(LOG, "performsort done: %s",
1406                                  pg_rusage_show(&state->ru_start));
1407         }
1408 #endif
1409
1410         MemoryContextSwitchTo(oldcontext);
1411 }
1412
1413 /*
1414  * Internal routine to fetch the next tuple in either forward or back
1415  * direction into *stup.  Returns FALSE if no more tuples.
1416  * If *should_free is set, the caller must pfree stup.tuple when done with it.
1417  */
1418 static bool
1419 tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
1420                                                   SortTuple *stup, bool *should_free)
1421 {
1422         unsigned int tuplen;
1423
1424         switch (state->status)
1425         {
1426                 case TSS_SORTEDINMEM:
1427                         Assert(forward || state->randomAccess);
1428                         *should_free = false;
1429                         if (forward)
1430                         {
1431                                 if (state->current < state->memtupcount)
1432                                 {
1433                                         *stup = state->memtuples[state->current++];
1434                                         return true;
1435                                 }
1436                                 state->eof_reached = true;
1437
1438                                 /*
1439                                  * Complain if caller tries to retrieve more tuples than
1440                                  * originally asked for in a bounded sort.  Tuples beyond the bound
1441                                  * were discarded, so returning EOF here could yield an incomplete result.
1442                                  */
1443                                 if (state->bounded && state->current >= state->bound)
1444                                         elog(ERROR, "retrieved too many tuples in a bounded sort");
1445
1446                                 return false;
1447                         }
1448                         else
1449                         {
1450                                 if (state->current <= 0)
1451                                         return false;
1452
1453                                 /*
1454                                  * If all tuples have already been fetched, return the last
1455                                  * tuple; otherwise, return the tuple before the last one returned.
1456                                  */
1457                                 if (state->eof_reached)
1458                                         state->eof_reached = false;
1459                                 else
1460                                 {
1461                                         state->current--;       /* last returned tuple */
1462                                         if (state->current <= 0)
1463                                                 return false;
1464                                 }
1465                                 *stup = state->memtuples[state->current - 1];
1466                                 return true;
1467                         }
1468                         break;
1469
1470                 case TSS_SORTEDONTAPE:
1471                         Assert(forward || state->randomAccess);
1472                         *should_free = true;
1473                         if (forward)
1474                         {
1475                                 if (state->eof_reached)
1476                                         return false;
1477                                 if ((tuplen = getlen(state, state->result_tape, true)) != 0)
1478                                 {
1479                                         READTUP(state, stup, state->result_tape, tuplen);
1480                                         return true;
1481                                 }
1482                                 else
1483                                 {
1484                                         state->eof_reached = true;
1485                                         return false;
1486                                 }
1487                         }
1488
1489                         /*
1490                          * Backward.
1491                          *
1492                          * If all tuples have already been fetched, return the last tuple;
1493                          * otherwise, return the tuple before the last one returned.
1494                          */
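                        /*
                         * (These fixed backspace distances rely on the on-tape format used
                         * when randomAccess is true: each tuple's length word is written
                         * both before and after the tuple body by the writetup routines
                         * later in this file.)
                         */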
1495                         if (state->eof_reached)
1496                         {
1497                                 /*
1498                                  * Seek position is pointing just past the zero tuplen at the
1499                                  * end of file; back up to fetch last tuple's ending length
1500                                  * word.  If seek fails we must have a completely empty file.
1501                                  */
1502                                 if (!LogicalTapeBackspace(state->tapeset,
1503                                                                                   state->result_tape,
1504                                                                                   2 * sizeof(unsigned int)))
1505                                         return false;
1506                                 state->eof_reached = false;
1507                         }
1508                         else
1509                         {
1510                                 /*
1511                                  * Back up and fetch previously-returned tuple's ending length
1512                                  * word.  If seek fails, assume we are at start of file.
1513                                  */
1514                                 if (!LogicalTapeBackspace(state->tapeset,
1515                                                                                   state->result_tape,
1516                                                                                   sizeof(unsigned int)))
1517                                         return false;
1518                                 tuplen = getlen(state, state->result_tape, false);
1519
1520                                 /*
1521                                  * Back up to get ending length word of tuple before it.
1522                                  */
1523                                 if (!LogicalTapeBackspace(state->tapeset,
1524                                                                                   state->result_tape,
1525                                                                                   tuplen + 2 * sizeof(unsigned int)))
1526                                 {
1527                                         /*
1528                                          * If that fails, presumably the prev tuple is the first
1529                                          * in the file.  Back up so that it becomes next to read
1530                                          * in forward direction (not obviously right, but that is
1531                                          * what the in-memory case does).
1532                                          */
1533                                         if (!LogicalTapeBackspace(state->tapeset,
1534                                                                                           state->result_tape,
1535                                                                                           tuplen + sizeof(unsigned int)))
1536                                                 elog(ERROR, "bogus tuple length in backward scan");
1537                                         return false;
1538                                 }
1539                         }
1540
1541                         tuplen = getlen(state, state->result_tape, false);
1542
1543                         /*
1544                          * Now we have the length of the prior tuple, back up and read it.
1545                          * Note: READTUP expects we are positioned after the initial
1546                          * length word of the tuple, so back up to that point.
1547                          */
1548                         if (!LogicalTapeBackspace(state->tapeset,
1549                                                                           state->result_tape,
1550                                                                           tuplen))
1551                                 elog(ERROR, "bogus tuple length in backward scan");
1552                         READTUP(state, stup, state->result_tape, tuplen);
1553                         return true;
1554
1555                 case TSS_FINALMERGE:
1556                         Assert(forward);
1557                         *should_free = true;
1558
1559                         /*
1560                          * This code should match the inner loop of mergeonerun().
1561                          */
1562                         if (state->memtupcount > 0)
1563                         {
1564                                 int                     srcTape = state->memtuples[0].tupindex;
1565                                 Size            tuplen;
1566                                 int                     tupIndex;
1567                                 SortTuple  *newtup;
1568
1569                                 *stup = state->memtuples[0];
1570                                 /* returned tuple is no longer counted in our memory space */
1571                                 if (stup->tuple)
1572                                 {
1573                                         tuplen = GetMemoryChunkSpace(stup->tuple);
1574                                         state->availMem += tuplen;
1575                                         state->mergeavailmem[srcTape] += tuplen;
1576                                 }
1577                                 tuplesort_heap_siftup(state, false);
1578                                 if ((tupIndex = state->mergenext[srcTape]) == 0)
1579                                 {
1580                                         /*
1581                                          * out of preloaded data on this tape, try to read more
1582                                          *
1583                                          * Unlike mergeonerun(), we only preload from the single
1584                                          * tape that's run dry.  See mergepreread() comments.
1585                                          */
1586                                         mergeprereadone(state, srcTape);
1587
1588                                         /*
1589                                          * if still no data, we've reached end of run on this tape
1590                                          */
1591                                         if ((tupIndex = state->mergenext[srcTape]) == 0)
1592                                                 return true;
1593                                 }
1594                                 /* pull next preread tuple from list, insert in heap */
1595                                 newtup = &state->memtuples[tupIndex];
1596                                 state->mergenext[srcTape] = newtup->tupindex;
1597                                 if (state->mergenext[srcTape] == 0)
1598                                         state->mergelast[srcTape] = 0;
1599                                 tuplesort_heap_insert(state, newtup, srcTape, false);
1600                                 /* put the now-unused memtuples entry on the freelist */
1601                                 newtup->tupindex = state->mergefreelist;
1602                                 state->mergefreelist = tupIndex;
1603                                 state->mergeavailslots[srcTape]++;
1604                                 return true;
1605                         }
1606                         return false;
1607
1608                 default:
1609                         elog(ERROR, "invalid tuplesort state");
1610                         return false;           /* keep compiler quiet */
1611         }
1612 }
1613
1614 /*
1615  * Fetch the next tuple in either forward or back direction.
1616  * If successful, put tuple in slot and return TRUE; else, clear the slot
1617  * and return FALSE.
1618  */
1619 bool
1620 tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
1621                                            TupleTableSlot *slot)
1622 {
1623         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1624         SortTuple       stup;
1625         bool            should_free;
1626
1627         if (!tuplesort_gettuple_common(state, forward, &stup, &should_free))
1628                 stup.tuple = NULL;
1629
1630         MemoryContextSwitchTo(oldcontext);
1631
1632         if (stup.tuple)
1633         {
1634                 ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, should_free);
1635                 return true;
1636         }
1637         else
1638         {
1639                 ExecClearTuple(slot);
1640                 return false;
1641         }
1642 }
1643
1644 /*
1645  * Fetch the next tuple in either forward or back direction.
1646  * Returns NULL if no more tuples.  If *should_free is set, the
1647  * caller must pfree the returned tuple when done with it.
1648  */
1649 HeapTuple
1650 tuplesort_getheaptuple(Tuplesortstate *state, bool forward, bool *should_free)
1651 {
1652         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1653         SortTuple       stup;
1654
1655         if (!tuplesort_gettuple_common(state, forward, &stup, should_free))
1656                 stup.tuple = NULL;
1657
1658         MemoryContextSwitchTo(oldcontext);
1659
1660         return stup.tuple;
1661 }
1662
1663 /*
1664  * Fetch the next index tuple in either forward or back direction.
1665  * Returns NULL if no more tuples.  If *should_free is set, the
1666  * caller must pfree the returned tuple when done with it.
1667  */
1668 IndexTuple
1669 tuplesort_getindextuple(Tuplesortstate *state, bool forward,
1670                                                 bool *should_free)
1671 {
1672         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1673         SortTuple       stup;
1674
1675         if (!tuplesort_gettuple_common(state, forward, &stup, should_free))
1676                 stup.tuple = NULL;
1677
1678         MemoryContextSwitchTo(oldcontext);
1679
1680         return (IndexTuple) stup.tuple;
1681 }
1682
1683 /*
1684  * Fetch the next Datum in either forward or back direction.
1685  * Returns FALSE if no more datums.
1686  *
1687  * If the Datum is pass-by-ref type, the returned value is freshly palloc'd
1688  * and is now owned by the caller.
1689  */
1690 bool
1691 tuplesort_getdatum(Tuplesortstate *state, bool forward,
1692                                    Datum *val, bool *isNull)
1693 {
1694         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1695         SortTuple       stup;
1696         bool            should_free;
1697
1698         if (!tuplesort_gettuple_common(state, forward, &stup, &should_free))
1699         {
1700                 MemoryContextSwitchTo(oldcontext);
1701                 return false;
1702         }
1703
1704         if (stup.isnull1 || state->datumTypeByVal)
1705         {
1706                 *val = stup.datum1;
1707                 *isNull = stup.isnull1;
1708         }
1709         else
1710         {
1711                 if (should_free)
1712                         *val = stup.datum1;
1713                 else
1714                         *val = datumCopy(stup.datum1, false, state->datumTypeLen);
1715                 *isNull = false;
1716         }
1717
1718         MemoryContextSwitchTo(oldcontext);
1719
1720         return true;
1721 }
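/*
 * Illustrative sketch: a caller sorting bare Datums typically drives the
 * routines above roughly as below.  This assumes the usual
 * tuplesort_begin_datum() and tuplesort_end() entry points declared in
 * tuplesort.h, with datumType, sortOperator, sortCollation, and
 * nullsFirstFlag supplied by the caller; fetch_next_value() and
 * process_value() are hypothetical stand-ins for the caller's own input
 * and output code.
 *
 *              Tuplesortstate *sortstate;
 *              Datum           val;
 *              bool            isnull;
 *
 *              sortstate = tuplesort_begin_datum(datumType, sortOperator,
 *                                                sortCollation, nullsFirstFlag,
 *                                                work_mem, false);
 *              while (fetch_next_value(&val, &isnull))
 *                      tuplesort_putdatum(sortstate, val, isnull);
 *              tuplesort_performsort(sortstate);
 *              while (tuplesort_getdatum(sortstate, true, &val, &isnull))
 *                      process_value(val, isnull);
 *              tuplesort_end(sortstate);
 */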
1722
1723 /*
1724  * Advance over N tuples in either forward or back direction,
1725  * without returning any data.  N==0 is a no-op.
1726  * Returns TRUE if successful, FALSE if ran out of tuples.
1727  */
1728 bool
1729 tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
1730 {
1731         MemoryContext oldcontext;
1732
1733         /*
1734          * We don't actually support backwards skip yet, because no callers need
1735          * it.  The API is designed to allow for that later, though.
1736          */
1737         Assert(forward);
1738         Assert(ntuples >= 0);
1739
1740         switch (state->status)
1741         {
1742                 case TSS_SORTEDINMEM:
1743                         if (state->memtupcount - state->current >= ntuples)
1744                         {
1745                                 state->current += ntuples;
1746                                 return true;
1747                         }
1748                         state->current = state->memtupcount;
1749                         state->eof_reached = true;
1750
1751                         /*
1752                          * Complain if caller tries to retrieve more tuples than
1753                          * originally asked for in a bounded sort.  Tuples beyond the bound
1754                          * were discarded, so returning EOF here could yield an incomplete result.
1755                          */
1756                         if (state->bounded && state->current >= state->bound)
1757                                 elog(ERROR, "retrieved too many tuples in a bounded sort");
1758
1759                         return false;
1760
1761                 case TSS_SORTEDONTAPE:
1762                 case TSS_FINALMERGE:
1763
1764                         /*
1765                          * We could probably optimize these cases better, but for now it's
1766                          * not worth the trouble.
1767                          */
1768                         oldcontext = MemoryContextSwitchTo(state->sortcontext);
1769                         while (ntuples-- > 0)
1770                         {
1771                                 SortTuple       stup;
1772                                 bool            should_free;
1773
1774                                 if (!tuplesort_gettuple_common(state, forward,
1775                                                                                            &stup, &should_free))
1776                                 {
1777                                         MemoryContextSwitchTo(oldcontext);
1778                                         return false;
1779                                 }
1780                                 if (should_free && stup.tuple)
1781                                         pfree(stup.tuple);
1782                                 CHECK_FOR_INTERRUPTS();
1783                         }
1784                         MemoryContextSwitchTo(oldcontext);
1785                         return true;
1786
1787                 default:
1788                         elog(ERROR, "invalid tuplesort state");
1789                         return false;           /* keep compiler quiet */
1790         }
1791 }
1792
1793 /*
1794  * tuplesort_merge_order - report merge order we'll use for given memory
1795  * (note: "merge order" just means the number of input tapes in the merge).
1796  *
1797  * This is exported for use by the planner.  allowedMem is in bytes.
1798  */
1799 int
1800 tuplesort_merge_order(int64 allowedMem)
1801 {
1802         int                     mOrder;
1803
1804         /*
1805          * We need one tape for each merge input, plus another one for the output,
1806          * and each of these tapes needs buffer space.  In addition we want
1807          * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't
1808          * count).
1809          *
1810          * Note: you might be thinking we need to account for the memtuples[]
1811          * array in this calculation, but we effectively treat that as part of the
1812          * MERGE_BUFFER_SIZE workspace.
1813          */
1814         mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
1815                 (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
1816
1817         /* Even in minimum memory, use at least a MINORDER merge */
1818         mOrder = Max(mOrder, MINORDER);
1819
1820         return mOrder;
1821 }
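/*
 * Worked example (illustrative only, assuming the customary definitions
 * earlier in this file of TAPE_BUFFER_OVERHEAD = BLCKSZ * 3 and
 * MERGE_BUFFER_SIZE = BLCKSZ * 32, i.e. 24 kB and 256 kB with 8 kB blocks):
 * an allowedMem of 4 MB gives
 *
 *              mOrder = (4096 kB - 24 kB) / (256 kB + 24 kB) = 14
 *
 * so the planner would expect a 14-way merge, while anything below roughly
 * 1.7 MB is clamped to the MINORDER (six-way) floor.
 */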
1822
1823 /*
1824  * inittapes - initialize for tape sorting.
1825  *
1826  * This is called only if we have found we don't have room to sort in memory.
1827  */
1828 static void
1829 inittapes(Tuplesortstate *state)
1830 {
1831         int                     maxTapes,
1832                                 ntuples,
1833                                 j;
1834         int64           tapeSpace;
1835
1836         /* Compute number of tapes to use: merge order plus 1 */
1837         maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
1838
1839         /*
1840          * We must have at least 2*maxTapes slots in the memtuples[] array, else
1841          * we'd not have room for merge heap plus preread.  It seems unlikely that
1842          * this case would ever occur, but be safe.
1843          */
1844         maxTapes = Min(maxTapes, state->memtupsize / 2);
1845
1846         state->maxTapes = maxTapes;
1847         state->tapeRange = maxTapes - 1;
1848
1849 #ifdef TRACE_SORT
1850         if (trace_sort)
1851                 elog(LOG, "switching to external sort with %d tapes: %s",
1852                          maxTapes, pg_rusage_show(&state->ru_start));
1853 #endif
1854
1855         /*
1856          * Decrease availMem to reflect the space needed for tape buffers; but
1857          * don't decrease it to the point that we have no room for tuples. (That
1858          * case is only likely to occur if sorting pass-by-value Datums; in all
1859          * other scenarios the memtuples[] array is unlikely to occupy more than
1860          * half of allowedMem.  In the pass-by-value case it's not important to
1861          * account for tuple space, so we don't care if LACKMEM becomes
1862          * inaccurate.)
1863          */
1864         tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
1865
1866         if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
1867                 USEMEM(state, tapeSpace);
1868
1869         /*
1870          * Make sure that the temp file(s) underlying the tape set are created in
1871          * suitable temp tablespaces.
1872          */
1873         PrepareTempTablespaces();
1874
1875         /*
1876          * Create the tape set and allocate the per-tape data arrays.
1877          */
1878         state->tapeset = LogicalTapeSetCreate(maxTapes);
1879
1880         state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
1881         state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
1882         state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
1883         state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
1884         state->mergeavailmem = (int64 *) palloc0(maxTapes * sizeof(int64));
1885         state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
1886         state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
1887         state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
1888         state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
1889
1890         /*
1891          * Convert the unsorted contents of memtuples[] into a heap. Each tuple is
1892          * marked as belonging to run number zero.
1893          *
1894          * NOTE: we pass false for checkIndex since there's no point in comparing
1895          * indexes in this step, even though we do intend the indexes to be part
1896          * of the sort key...
1897          */
1898         ntuples = state->memtupcount;
1899         state->memtupcount = 0;         /* make the heap empty */
1900         for (j = 0; j < ntuples; j++)
1901         {
1902                 /* Must copy source tuple to avoid possible overwrite */
1903                 SortTuple       stup = state->memtuples[j];
1904
1905                 tuplesort_heap_insert(state, &stup, 0, false);
1906         }
1907         Assert(state->memtupcount == ntuples);
1908
1909         state->currentRun = 0;
1910
1911         /*
1912          * Initialize variables of Algorithm D (step D1).
1913          */
1914         for (j = 0; j < maxTapes; j++)
1915         {
1916                 state->tp_fib[j] = 1;
1917                 state->tp_runs[j] = 0;
1918                 state->tp_dummy[j] = 1;
1919                 state->tp_tapenum[j] = j;
1920         }
1921         state->tp_fib[state->tapeRange] = 0;
1922         state->tp_dummy[state->tapeRange] = 0;
1923
1924         state->Level = 1;
1925         state->destTape = 0;
1926
1927         state->status = TSS_BUILDRUNS;
1928 }
1929
1930 /*
1931  * selectnewtape -- select new tape for new initial run.
1932  *
1933  * This is called after finishing a run when we know another run
1934  * must be started.  This implements steps D3, D4 of Algorithm D.
1935  */
1936 static void
1937 selectnewtape(Tuplesortstate *state)
1938 {
1939         int                     j;
1940         int                     a;
1941
1942         /* Step D3: advance j (destTape) */
1943         if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
1944         {
1945                 state->destTape++;
1946                 return;
1947         }
1948         if (state->tp_dummy[state->destTape] != 0)
1949         {
1950                 state->destTape = 0;
1951                 return;
1952         }
1953
1954         /* Step D4: increase level */
1955         state->Level++;
1956         a = state->tp_fib[0];
1957         for (j = 0; j < state->tapeRange; j++)
1958         {
1959                 state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
1960                 state->tp_fib[j] = a + state->tp_fib[j + 1];
1961         }
1962         state->destTape = 0;
1963 }
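/*
 * Illustrative example: with 7 tapes (tapeRange = 6 input tapes), the target
 * run counts per tape produced by the level bumps above follow the
 * generalized Fibonacci distribution
 *
 *              level 1:  1 1 1 1 1 1   ( 6 runs)
 *              level 2:  2 2 2 2 2 1   (11 runs)
 *              level 3:  4 4 4 4 3 2   (21 runs)
 *
 * tp_dummy[] tracks how far each tape still falls short of its target, so
 * dummy runs make up the difference whenever the real input ends early.
 */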
1964
1965 /*
1966  * mergeruns -- merge all the completed initial runs.
1967  *
1968  * This implements steps D5, D6 of Algorithm D.  All input data has
1969  * already been written to initial runs on tape (see dumptuples).
1970  */
1971 static void
1972 mergeruns(Tuplesortstate *state)
1973 {
1974         int                     tapenum,
1975                                 svTape,
1976                                 svRuns,
1977                                 svDummy;
1978
1979         Assert(state->status == TSS_BUILDRUNS);
1980         Assert(state->memtupcount == 0);
1981
1982         /*
1983          * If we produced only one initial run (quite likely if the total data
1984          * volume is between 1X and 2X workMem), we can just use that tape as the
1985          * finished output, rather than doing a useless merge.  (This obvious
1986          * optimization is not in Knuth's algorithm.)
1987          */
1988         if (state->currentRun == 1)
1989         {
1990                 state->result_tape = state->tp_tapenum[state->destTape];
1991                 /* must freeze and rewind the finished output tape */
1992                 LogicalTapeFreeze(state->tapeset, state->result_tape);
1993                 state->status = TSS_SORTEDONTAPE;
1994                 return;
1995         }
1996
1997         /* End of step D2: rewind all output tapes to prepare for merging */
1998         for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
1999                 LogicalTapeRewind(state->tapeset, tapenum, false);
2000
2001         for (;;)
2002         {
2003                 /*
2004                  * At this point we know that tape[T] is empty.  If there's just one
2005                  * (real or dummy) run left on each input tape, then only one merge
2006                  * pass remains.  If we don't have to produce a materialized sorted
2007                  * tape, we can stop at this point and do the final merge on-the-fly.
2008                  */
2009                 if (!state->randomAccess)
2010                 {
2011                         bool            allOneRun = true;
2012
2013                         Assert(state->tp_runs[state->tapeRange] == 0);
2014                         for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2015                         {
2016                                 if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
2017                                 {
2018                                         allOneRun = false;
2019                                         break;
2020                                 }
2021                         }
2022                         if (allOneRun)
2023                         {
2024                                 /* Tell logtape.c we won't be writing anymore */
2025                                 LogicalTapeSetForgetFreeSpace(state->tapeset);
2026                                 /* Initialize for the final merge pass */
2027                                 beginmerge(state);
2028                                 state->status = TSS_FINALMERGE;
2029                                 return;
2030                         }
2031                 }
2032
2033                 /* Step D5: merge runs onto tape[T] until tape[P] is empty */
2034                 while (state->tp_runs[state->tapeRange - 1] ||
2035                            state->tp_dummy[state->tapeRange - 1])
2036                 {
2037                         bool            allDummy = true;
2038
2039                         for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2040                         {
2041                                 if (state->tp_dummy[tapenum] == 0)
2042                                 {
2043                                         allDummy = false;
2044                                         break;
2045                                 }
2046                         }
2047
2048                         if (allDummy)
2049                         {
2050                                 state->tp_dummy[state->tapeRange]++;
2051                                 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2052                                         state->tp_dummy[tapenum]--;
2053                         }
2054                         else
2055                                 mergeonerun(state);
2056                 }
2057
2058                 /* Step D6: decrease level */
2059                 if (--state->Level == 0)
2060                         break;
2061                 /* rewind output tape T to use as new input */
2062                 LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange],
2063                                                   false);
2064                 /* rewind used-up input tape P, and prepare it for write pass */
2065                 LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange - 1],
2066                                                   true);
2067                 state->tp_runs[state->tapeRange - 1] = 0;
2068
2069                 /*
2070                  * reassign tape units per step D6; note we no longer care about A[]
2071                  */
2072                 svTape = state->tp_tapenum[state->tapeRange];
2073                 svDummy = state->tp_dummy[state->tapeRange];
2074                 svRuns = state->tp_runs[state->tapeRange];
2075                 for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
2076                 {
2077                         state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
2078                         state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
2079                         state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
2080                 }
2081                 state->tp_tapenum[0] = svTape;
2082                 state->tp_dummy[0] = svDummy;
2083                 state->tp_runs[0] = svRuns;
2084         }
2085
2086         /*
2087          * Done.  Knuth says that the result is on TAPE[1], but since we exited
2088          * the loop without performing the last iteration of step D6, we have not
2089          * rearranged the tape unit assignment, and therefore the result is on
2090          * TAPE[T].  We need to do it this way so that we can freeze the final
2091          * output tape while rewinding it.  The last iteration of step D6 would be
2092          * a waste of cycles anyway...
2093          */
2094         state->result_tape = state->tp_tapenum[state->tapeRange];
2095         LogicalTapeFreeze(state->tapeset, state->result_tape);
2096         state->status = TSS_SORTEDONTAPE;
2097 }
2098
2099 /*
2100  * Merge one run from each input tape, except ones with dummy runs.
2101  *
2102  * This is the inner loop of Algorithm D step D5.  We know that the
2103  * output tape is TAPE[T].
2104  */
2105 static void
2106 mergeonerun(Tuplesortstate *state)
2107 {
2108         int                     destTape = state->tp_tapenum[state->tapeRange];
2109         int                     srcTape;
2110         int                     tupIndex;
2111         SortTuple  *tup;
2112         int64           priorAvail,
2113                                 spaceFreed;
2114
2115         /*
2116          * Start the merge by loading one tuple from each active source tape into
2117          * the heap.  We can also decrease the input run/dummy run counts.
2118          */
2119         beginmerge(state);
2120
2121         /*
2122          * Execute merge by repeatedly extracting lowest tuple in heap, writing it
2123          * out, and replacing it with next tuple from same tape (if there is
2124          * another one).
2125          */
2126         while (state->memtupcount > 0)
2127         {
2128                 /* write the tuple to destTape */
2129                 priorAvail = state->availMem;
2130                 srcTape = state->memtuples[0].tupindex;
2131                 WRITETUP(state, destTape, &state->memtuples[0]);
2132                 /* writetup adjusted total free space, now fix per-tape space */
2133                 spaceFreed = state->availMem - priorAvail;
2134                 state->mergeavailmem[srcTape] += spaceFreed;
2135                 /* compact the heap */
2136                 tuplesort_heap_siftup(state, false);
2137                 if ((tupIndex = state->mergenext[srcTape]) == 0)
2138                 {
2139                         /* out of preloaded data on this tape, try to read more */
2140                         mergepreread(state);
2141                         /* if still no data, we've reached end of run on this tape */
2142                         if ((tupIndex = state->mergenext[srcTape]) == 0)
2143                                 continue;
2144                 }
2145                 /* pull next preread tuple from list, insert in heap */
2146                 tup = &state->memtuples[tupIndex];
2147                 state->mergenext[srcTape] = tup->tupindex;
2148                 if (state->mergenext[srcTape] == 0)
2149                         state->mergelast[srcTape] = 0;
2150                 tuplesort_heap_insert(state, tup, srcTape, false);
2151                 /* put the now-unused memtuples entry on the freelist */
2152                 tup->tupindex = state->mergefreelist;
2153                 state->mergefreelist = tupIndex;
2154                 state->mergeavailslots[srcTape]++;
2155         }
2156
2157         /*
2158          * When the heap empties, we're done.  Write an end-of-run marker on the
2159          * output tape, and increment its count of real runs.
2160          */
2161         markrunend(state, destTape);
2162         state->tp_runs[state->tapeRange]++;
2163
2164 #ifdef TRACE_SORT
2165         if (trace_sort)
2166                 elog(LOG, "finished %d-way merge step: %s", state->activeTapes,
2167                          pg_rusage_show(&state->ru_start));
2168 #endif
2169 }
2170
2171 /*
2172  * beginmerge - initialize for a merge pass
2173  *
2174  * We decrease the counts of real and dummy runs for each tape, and mark
2175  * which tapes contain active input runs in mergeactive[].  Then, load
2176  * as many tuples as we can from each active input tape, and finally
2177  * fill the merge heap with the first tuple from each active tape.
2178  */
2179 static void
2180 beginmerge(Tuplesortstate *state)
2181 {
2182         int                     activeTapes;
2183         int                     tapenum;
2184         int                     srcTape;
2185         int                     slotsPerTape;
2186         int64           spacePerTape;
2187
2188         /* Heap should be empty here */
2189         Assert(state->memtupcount == 0);
2190
2191         /* Adjust run counts and mark the active tapes */
2192         memset(state->mergeactive, 0,
2193                    state->maxTapes * sizeof(*state->mergeactive));
2194         activeTapes = 0;
2195         for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
2196         {
2197                 if (state->tp_dummy[tapenum] > 0)
2198                         state->tp_dummy[tapenum]--;
2199                 else
2200                 {
2201                         Assert(state->tp_runs[tapenum] > 0);
2202                         state->tp_runs[tapenum]--;
2203                         srcTape = state->tp_tapenum[tapenum];
2204                         state->mergeactive[srcTape] = true;
2205                         activeTapes++;
2206                 }
2207         }
2208         state->activeTapes = activeTapes;
2209
2210         /* Clear merge-pass state variables */
2211         memset(state->mergenext, 0,
2212                    state->maxTapes * sizeof(*state->mergenext));
2213         memset(state->mergelast, 0,
2214                    state->maxTapes * sizeof(*state->mergelast));
2215         state->mergefreelist = 0;       /* nothing in the freelist */
2216         state->mergefirstfree = activeTapes;            /* 1st slot avail for preread */
2217
2218         /*
2219          * Initialize space allocation to let each active input tape have an equal
2220          * share of preread space.
2221          */
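        /*
         * For instance, with memtupsize = 4096 slots, six active tapes, and the
         * first six slots reserved for the merge heap (mergefirstfree = 6), each
         * tape gets (4096 - 6) / 6 = 681 preread slots and one sixth of the
         * remaining availMem.
         */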
2222         Assert(activeTapes > 0);
2223         slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
2224         Assert(slotsPerTape > 0);
2225         spacePerTape = state->availMem / activeTapes;
2226         for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
2227         {
2228                 if (state->mergeactive[srcTape])
2229                 {
2230                         state->mergeavailslots[srcTape] = slotsPerTape;
2231                         state->mergeavailmem[srcTape] = spacePerTape;
2232                 }
2233         }
2234
2235         /*
2236          * Preread as many tuples as possible (and at least one) from each active
2237          * tape
2238          */
2239         mergepreread(state);
2240
2241         /* Load the merge heap with the first tuple from each input tape */
2242         for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
2243         {
2244                 int                     tupIndex = state->mergenext[srcTape];
2245                 SortTuple  *tup;
2246
2247                 if (tupIndex)
2248                 {
2249                         tup = &state->memtuples[tupIndex];
2250                         state->mergenext[srcTape] = tup->tupindex;
2251                         if (state->mergenext[srcTape] == 0)
2252                                 state->mergelast[srcTape] = 0;
2253                         tuplesort_heap_insert(state, tup, srcTape, false);
2254                         /* put the now-unused memtuples entry on the freelist */
2255                         tup->tupindex = state->mergefreelist;
2256                         state->mergefreelist = tupIndex;
2257                         state->mergeavailslots[srcTape]++;
2258                 }
2259         }
2260 }
2261
2262 /*
2263  * mergepreread - load tuples from merge input tapes
2264  *
2265  * This routine exists to improve sequentiality of reads during a merge pass,
2266  * as explained in the header comments of this file.  Load tuples from each
2267  * active source tape until the tape's run is exhausted or it has used up
2268  * its fair share of available memory.  In any case, we guarantee that there
2269  * is at least one preread tuple available from each unexhausted input tape.
2270  *
2271  * We invoke this routine at the start of a merge pass for initial load,
2272  * and then whenever any tape's preread data runs out.  Note that we load
2273  * as much data as possible from all tapes, not just the one that ran out.
2274  * This is because logtape.c works best with a usage pattern that alternates
2275  * between reading a lot of data and writing a lot of data, so whenever we
2276  * are forced to read, we should fill working memory completely.
2277  *
2278  * In FINALMERGE state, we *don't* use this routine, but instead just preread
2279  * from the single tape that ran dry.  There's no read/write alternation in
2280  * that state and so no point in scanning through all the tapes to fix one.
2281  * (Moreover, there may be quite a lot of inactive tapes in that state, since
2282  * we might have had many fewer runs than tapes.  In a regular tape-to-tape
2283  * merge we can expect most of the tapes to be active.)
2284  */
2285 static void
2286 mergepreread(Tuplesortstate *state)
2287 {
2288         int                     srcTape;
2289
2290         for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
2291                 mergeprereadone(state, srcTape);
2292 }
2293
2294 /*
2295  * mergeprereadone - load tuples from one merge input tape
2296  *
2297  * Read tuples from the specified tape until it has used up its free memory
2298  * or array slots; but ensure that we have at least one tuple, if any are
2299  * to be had.
2300  */
2301 static void
2302 mergeprereadone(Tuplesortstate *state, int srcTape)
2303 {
2304         unsigned int tuplen;
2305         SortTuple       stup;
2306         int                     tupIndex;
2307         int64           priorAvail,
2308                                 spaceUsed;
2309
2310         if (!state->mergeactive[srcTape])
2311                 return;                                 /* tape's run is already exhausted */
2312         priorAvail = state->availMem;
2313         state->availMem = state->mergeavailmem[srcTape];
2314         while ((state->mergeavailslots[srcTape] > 0 && !LACKMEM(state)) ||
2315                    state->mergenext[srcTape] == 0)
2316         {
2317                 /* read next tuple, if any */
2318                 if ((tuplen = getlen(state, srcTape, true)) == 0)
2319                 {
2320                         state->mergeactive[srcTape] = false;
2321                         break;
2322                 }
2323                 READTUP(state, &stup, srcTape, tuplen);
2324                 /* find a free slot in memtuples[] for it */
2325                 tupIndex = state->mergefreelist;
2326                 if (tupIndex)
2327                         state->mergefreelist = state->memtuples[tupIndex].tupindex;
2328                 else
2329                 {
2330                         tupIndex = state->mergefirstfree++;
2331                         Assert(tupIndex < state->memtupsize);
2332                 }
2333                 state->mergeavailslots[srcTape]--;
2334                 /* store tuple, append to list for its tape */
2335                 stup.tupindex = 0;
2336                 state->memtuples[tupIndex] = stup;
2337                 if (state->mergelast[srcTape])
2338                         state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
2339                 else
2340                         state->mergenext[srcTape] = tupIndex;
2341                 state->mergelast[srcTape] = tupIndex;
2342         }
2343         /* update per-tape and global availmem counts */
2344         spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
2345         state->mergeavailmem[srcTape] = state->availMem;
2346         state->availMem = priorAvail - spaceUsed;
2347 }
2348
2349 /*
2350  * dumptuples - remove tuples from heap and write to tape
2351  *
2352  * This is used during initial-run building, but not during merging.
2353  *
2354  * When alltuples = false, dump only enough tuples to get under the
2355  * availMem limit (and leave at least one tuple in the heap in any case,
2356  * since puttuple assumes it always has a tuple to compare to).  We also
2357  * insist there be at least one free slot in the memtuples[] array.
2358  *
2359  * When alltuples = true, dump everything currently in memory.
2360  * (This case is only used at end of input data.)
2361  *
2362  * If we empty the heap, close out the current run and return (this should
2363  * only happen at end of input data).  If we see that the tuple run number
2364  * at the top of the heap has changed, start a new run.
2365  */
2366 static void
2367 dumptuples(Tuplesortstate *state, bool alltuples)
2368 {
2369         while (alltuples ||
2370                    (LACKMEM(state) && state->memtupcount > 1) ||
2371                    state->memtupcount >= state->memtupsize)
2372         {
2373                 /*
2374                  * Dump the heap's frontmost entry, and sift up to remove it from the
2375                  * heap.
2376                  */
2377                 Assert(state->memtupcount > 0);
2378                 WRITETUP(state, state->tp_tapenum[state->destTape],
2379                                  &state->memtuples[0]);
2380                 tuplesort_heap_siftup(state, true);
2381
2382                 /*
2383                  * If the heap is empty *or* top run number has changed, we've
2384                  * finished the current run.
2385                  */
2386                 if (state->memtupcount == 0 ||
2387                         state->currentRun != state->memtuples[0].tupindex)
2388                 {
2389                         markrunend(state, state->tp_tapenum[state->destTape]);
2390                         state->currentRun++;
2391                         state->tp_runs[state->destTape]++;
2392                         state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
2393
2394 #ifdef TRACE_SORT
2395                         if (trace_sort)
2396                                 elog(LOG, "finished writing%s run %d to tape %d: %s",
2397                                          (state->memtupcount == 0) ? " final" : "",
2398                                          state->currentRun, state->destTape,
2399                                          pg_rusage_show(&state->ru_start));
2400 #endif
2401
2402                         /*
2403                          * Done if heap is empty, else prepare for new run.
2404                          */
2405                         if (state->memtupcount == 0)
2406                                 break;
2407                         Assert(state->currentRun == state->memtuples[0].tupindex);
2408                         selectnewtape(state);
2409                 }
2410         }
2411 }
2412
2413 /*
2414  * tuplesort_rescan             - rewind and replay the scan
2415  */
2416 void
2417 tuplesort_rescan(Tuplesortstate *state)
2418 {
2419         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2420
2421         Assert(state->randomAccess);
2422
2423         switch (state->status)
2424         {
2425                 case TSS_SORTEDINMEM:
2426                         state->current = 0;
2427                         state->eof_reached = false;
2428                         state->markpos_offset = 0;
2429                         state->markpos_eof = false;
2430                         break;
2431                 case TSS_SORTEDONTAPE:
2432                         LogicalTapeRewind(state->tapeset,
2433                                                           state->result_tape,
2434                                                           false);
2435                         state->eof_reached = false;
2436                         state->markpos_block = 0L;
2437                         state->markpos_offset = 0;
2438                         state->markpos_eof = false;
2439                         break;
2440                 default:
2441                         elog(ERROR, "invalid tuplesort state");
2442                         break;
2443         }
2444
2445         MemoryContextSwitchTo(oldcontext);
2446 }
2447
2448 /*
2449  * tuplesort_markpos    - saves current position in the merged sort file
2450  */
2451 void
2452 tuplesort_markpos(Tuplesortstate *state)
2453 {
2454         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2455
2456         Assert(state->randomAccess);
2457
2458         switch (state->status)
2459         {
2460                 case TSS_SORTEDINMEM:
2461                         state->markpos_offset = state->current;
2462                         state->markpos_eof = state->eof_reached;
2463                         break;
2464                 case TSS_SORTEDONTAPE:
2465                         LogicalTapeTell(state->tapeset,
2466                                                         state->result_tape,
2467                                                         &state->markpos_block,
2468                                                         &state->markpos_offset);
2469                         state->markpos_eof = state->eof_reached;
2470                         break;
2471                 default:
2472                         elog(ERROR, "invalid tuplesort state");
2473                         break;
2474         }
2475
2476         MemoryContextSwitchTo(oldcontext);
2477 }
2478
2479 /*
2480  * tuplesort_restorepos - restores current position in merged sort file to
2481  *                                                last saved position
2482  */
2483 void
2484 tuplesort_restorepos(Tuplesortstate *state)
2485 {
2486         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
2487
2488         Assert(state->randomAccess);
2489
2490         switch (state->status)
2491         {
2492                 case TSS_SORTEDINMEM:
2493                         state->current = state->markpos_offset;
2494                         state->eof_reached = state->markpos_eof;
2495                         break;
2496                 case TSS_SORTEDONTAPE:
2497                         if (!LogicalTapeSeek(state->tapeset,
2498                                                                  state->result_tape,
2499                                                                  state->markpos_block,
2500                                                                  state->markpos_offset))
2501                                 elog(ERROR, "tuplesort_restorepos failed");
2502                         state->eof_reached = state->markpos_eof;
2503                         break;
2504                 default:
2505                         elog(ERROR, "invalid tuplesort state");
2506                         break;
2507         }
2508
2509         MemoryContextSwitchTo(oldcontext);
2510 }
2511
2512 /*
2513  * tuplesort_get_stats - extract summary statistics
2514  *
2515  * This can be called after tuplesort_performsort() finishes to obtain
2516  * printable summary information about how the sort was performed.
2517  * spaceUsed is measured in kilobytes.
2518  */
2519 void
2520 tuplesort_get_stats(Tuplesortstate *state,
2521                                         const char **sortMethod,
2522                                         const char **spaceType,
2523                                         long *spaceUsed)
2524 {
2525         /*
2526          * Note: it might seem we should provide both memory and disk usage for a
2527          * disk-based sort.  However, the current code doesn't track memory space
2528          * accurately once we have begun to return tuples to the caller (since we
2529          * don't account for pfree's the caller is expected to do), so we cannot
2530          * rely on availMem in a disk sort.  This does not seem worth the overhead
2531          * to fix.  Is it worth creating an API for the memory context code to
2532          * tell us how much is actually used in sortcontext?
2533          */
2534         if (state->tapeset)
2535         {
2536                 *spaceType = "Disk";
2537                 *spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
2538         }
2539         else
2540         {
2541                 *spaceType = "Memory";
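                /* round the byte count up to whole kilobytes */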
2542                 *spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
2543         }
2544
2545         switch (state->status)
2546         {
2547                 case TSS_SORTEDINMEM:
2548                         if (state->boundUsed)
2549                                 *sortMethod = "top-N heapsort";
2550                         else
2551                                 *sortMethod = "quicksort";
2552                         break;
2553                 case TSS_SORTEDONTAPE:
2554                         *sortMethod = "external sort";
2555                         break;
2556                 case TSS_FINALMERGE:
2557                         *sortMethod = "external merge";
2558                         break;
2559                 default:
2560                         *sortMethod = "still in progress";
2561                         break;
2562         }
2563 }
2564
2565
2566 /*
2567  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
2568  *
2569  * Compare two SortTuples.  If checkIndex is true, use the tuple index
2570  * as the front of the sort key; otherwise compare on the data keys alone.
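 * (For example, during initial-run building a tuple tagged with run 2 sorts
 * after every tuple tagged with run 1, whatever its data key, so a tuple can
 * never be emitted into an earlier run than the one assigned to it.)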
2571  */
2572
2573 #define HEAPCOMPARE(tup1,tup2) \
2574         (checkIndex && ((tup1)->tupindex != (tup2)->tupindex) ? \
2575          ((tup1)->tupindex) - ((tup2)->tupindex) : \
2576          COMPARETUP(state, tup1, tup2))
2577
2578 /*
2579  * Convert the existing unordered array of SortTuples to a bounded heap,
2580  * discarding all but the smallest "state->bound" tuples.
2581  *
2582  * When working with a bounded heap, we want to keep the largest entry
2583  * at the root (array entry zero), instead of the smallest as in the normal
2584  * sort case.  This allows us to discard the largest entry cheaply.
2585  * Therefore, we temporarily reverse the sort direction.
2586  *
2587  * We assume that all entries in a bounded heap will always have tupindex
2588  * zero; it therefore doesn't matter that HEAPCOMPARE() doesn't reverse
2589  * the direction of comparison for tupindexes.
2590  */
2591 static void
2592 make_bounded_heap(Tuplesortstate *state)
2593 {
2594         int                     tupcount = state->memtupcount;
2595         int                     i;
2596
2597         Assert(state->status == TSS_INITIAL);
2598         Assert(state->bounded);
2599         Assert(tupcount >= state->bound);
2600
2601         /* Reverse sort direction so largest entry will be at root */
2602         REVERSEDIRECTION(state);
2603
2604         state->memtupcount = 0;         /* make the heap empty */
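        /*
         * memtuples[] serves both as the still-unscanned input and as the
         * heap: the heap occupies the leading slots and never grows past the
         * entry currently being scanned, which is why that entry is copied
         * into a local variable before it is inserted below.
         */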
2605         for (i = 0; i < tupcount; i++)
2606         {
2607                 if (state->memtupcount >= state->bound &&
2608                   COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
2609                 {
2610                         /* New tuple would just get thrown out, so skip it */
2611                         free_sort_tuple(state, &state->memtuples[i]);
2612                         CHECK_FOR_INTERRUPTS();
2613                 }
2614                 else
2615                 {
2616                         /* Insert next tuple into heap */
2617                         /* Must copy source tuple to avoid possible overwrite */
2618                         SortTuple       stup = state->memtuples[i];
2619
2620                         tuplesort_heap_insert(state, &stup, 0, false);
2621
2622                         /* If heap too full, discard largest entry */
2623                         if (state->memtupcount > state->bound)
2624                         {
2625                                 free_sort_tuple(state, &state->memtuples[0]);
2626                                 tuplesort_heap_siftup(state, false);
2627                         }
2628                 }
2629         }
2630
2631         Assert(state->memtupcount == state->bound);
2632         state->status = TSS_BOUNDED;
2633 }
2634
2635 /*
2636  * Convert the bounded heap to a properly-sorted array
2637  */
2638 static void
2639 sort_bounded_heap(Tuplesortstate *state)
2640 {
2641         int                     tupcount = state->memtupcount;
2642
2643         Assert(state->status == TSS_BOUNDED);
2644         Assert(state->bounded);
2645         Assert(tupcount == state->bound);
2646
2647         /*
2648          * We can unheapify in place because each sift-up will remove the largest
2649          * entry, which we can promptly store in the newly freed slot at the end.
2650          * Once we're down to a single-entry heap, we're done.
2651          */
2652         while (state->memtupcount > 1)
2653         {
2654                 SortTuple       stup = state->memtuples[0];
2655
2656                 /* this sifts-up the next-largest entry and decreases memtupcount */
2657                 tuplesort_heap_siftup(state, false);
2658                 state->memtuples[state->memtupcount] = stup;
2659         }
2660         state->memtupcount = tupcount;
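        /*
         * Because the reversed heap always yielded its largest remaining
         * entry first, the entries were written largest-first into the tail
         * of the array, leaving memtuples[] sorted in the original (forward)
         * direction.
         */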
2661
2662         /*
2663          * Reverse sort direction back to the original state.  This is not
2664          * actually necessary but seems like a good idea for tidiness.
2665          */
2666         REVERSEDIRECTION(state);
2667
2668         state->status = TSS_SORTEDINMEM;
2669         state->boundUsed = true;
2670 }
2671
2672 /*
2673  * Insert a new tuple into an empty or existing heap, maintaining the
2674  * heap invariant.  Caller is responsible for ensuring there's room.
2675  *
2676  * Note: we assume *tuple is a temporary variable that can be scribbled on.
2677  * For some callers, tuple actually points to a memtuples[] entry above the
2678  * end of the heap.  This is safe as long as it's not immediately adjacent
2679  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
2680  * is, it might get overwritten before being moved into the heap!
2681  */
2682 static void
2683 tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
2684                                           int tupleindex, bool checkIndex)
2685 {
2686         SortTuple  *memtuples;
2687         int                     j;
2688
2689         /*
2690          * Save the tupleindex --- see notes above about writing on *tuple. It's a
2691          * historical artifact that tupleindex is passed as a separate argument
2692          * and not in *tuple, but it's notationally convenient so let's leave it
2693          * that way.
2694          */
2695         tuple->tupindex = tupleindex;
2696
2697         memtuples = state->memtuples;
2698         Assert(state->memtupcount < state->memtupsize);
2699
2700         CHECK_FOR_INTERRUPTS();
2701
2702         /*
2703          * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
2704          * using 1-based array indexes, not 0-based.
2705          */
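        /*
         * In this 0-based representation the parent of slot j is (j - 1) / 2
         * and its children are 2*j + 1 and 2*j + 2 (Knuth's 1-based text uses
         * j/2 and 2j, 2j + 1).
         */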
2706         j = state->memtupcount++;
2707         while (j > 0)
2708         {
2709                 int                     i = (j - 1) >> 1;
2710
2711                 if (HEAPCOMPARE(tuple, &memtuples[i]) >= 0)
2712                         break;
2713                 memtuples[j] = memtuples[i];
2714                 j = i;
2715         }
2716         memtuples[j] = *tuple;
2717 }
2718
2719 /*
2720  * The tuple at state->memtuples[0] has been removed from the heap.
2721  * Decrement memtupcount, and sift up to maintain the heap invariant.
2722  */
2723 static void
2724 tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
2725 {
2726         SortTuple  *memtuples = state->memtuples;
2727         SortTuple  *tuple;
2728         int                     i,
2729                                 n;
2730
2731         if (--state->memtupcount <= 0)
2732                 return;
2733
2734         CHECK_FOR_INTERRUPTS();
2735
2736         n = state->memtupcount;
2737         tuple = &memtuples[n];          /* tuple that must be reinserted */
2738         i = 0;                                          /* i is where the "hole" is */
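        /*
         * Walk the hole down the tree: at each level promote the smaller of
         * the two children (per HEAPCOMPARE) into the hole, stopping once the
         * displaced tuple fits without violating the heap order.
         */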
2739         for (;;)
2740         {
2741                 int                     j = 2 * i + 1;
2742
2743                 if (j >= n)
2744                         break;
2745                 if (j + 1 < n &&
2746                         HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0)
2747                         j++;
2748                 if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0)
2749                         break;
2750                 memtuples[i] = memtuples[j];
2751                 i = j;
2752         }
2753         memtuples[i] = *tuple;
2754 }
2755
2756
2757 /*
2758  * Tape interface routines
2759  */
2760
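/*
 * getlen - read the next tuple-length word from a tape.
 *
 * A stored length of zero is the run-end marker written by markrunend();
 * it is accepted only when the caller passes eofOK = true.
 */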
2761 static unsigned int
2762 getlen(Tuplesortstate *state, int tapenum, bool eofOK)
2763 {
2764         unsigned int len;
2765
2766         if (LogicalTapeRead(state->tapeset, tapenum,
2767                                                 &len, sizeof(len)) != sizeof(len))
2768                 elog(ERROR, "unexpected end of tape");
2769         if (len == 0 && !eofOK)
2770                 elog(ERROR, "unexpected end of data");
2771         return len;
2772 }
2773
2774 static void
2775 markrunend(Tuplesortstate *state, int tapenum)
2776 {
2777         unsigned int len = 0;
2778
2779         LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
2780 }
2781
2782
2783 /*
2784  * Inline-able copy of FunctionCall2Coll() to save some cycles in sorting.
2785  */
2786 static inline Datum
2787 myFunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
2788 {
2789         FunctionCallInfoData fcinfo;
2790         Datum           result;
2791
2792         InitFunctionCallInfoData(fcinfo, flinfo, 2, collation, NULL, NULL);
2793
2794         fcinfo.arg[0] = arg1;
2795         fcinfo.arg[1] = arg2;
2796         fcinfo.argnull[0] = false;
2797         fcinfo.argnull[1] = false;
2798
2799         result = FunctionCallInvoke(&fcinfo);
2800
2801         /* Check for null result, since caller is clearly not expecting one */
2802         if (fcinfo.isnull)
2803                 elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
2804
2805         return result;
2806 }
2807
2808 /*
2809  * Apply a sort function (by now converted to fmgr lookup form)
2810  * and return a 3-way comparison result.  This takes care of handling
2811  * reverse-sort and NULLs-ordering properly.  We assume that DESC and
2812  * NULLS_FIRST options are encoded in sk_flags the same way btree does it.
2813  */
2814 static inline int32
2815 inlineApplySortFunction(FmgrInfo *sortFunction, int sk_flags, Oid collation,
2816                                                 Datum datum1, bool isNull1,
2817                                                 Datum datum2, bool isNull2)
2818 {
2819         int32           compare;
2820
2821         if (isNull1)
2822         {
2823                 if (isNull2)
2824                         compare = 0;            /* NULL "=" NULL */
2825                 else if (sk_flags & SK_BT_NULLS_FIRST)
2826                         compare = -1;           /* NULL "<" NOT_NULL */
2827                 else
2828                         compare = 1;            /* NULL ">" NOT_NULL */
2829         }
2830         else if (isNull2)
2831         {
2832                 if (sk_flags & SK_BT_NULLS_FIRST)
2833                         compare = 1;            /* NOT_NULL ">" NULL */
2834                 else
2835                         compare = -1;           /* NOT_NULL "<" NULL */
2836         }
2837         else
2838         {
2839                 compare = DatumGetInt32(myFunctionCall2Coll(sortFunction, collation,
2840                                                                                                         datum1, datum2));
2841
2842                 if (sk_flags & SK_BT_DESC)
2843                         compare = -compare;
2844         }
2845
2846         return compare;
2847 }
2848
2849
2850 /*
2851  * Routines specialized for HeapTuple (actually MinimalTuple) case
2852  */
2853
2854 static int
2855 comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
2856 {
2857         SortSupport sortKey = state->sortKeys;
2858         HeapTupleData ltup;
2859         HeapTupleData rtup;
2860         TupleDesc       tupDesc;
2861         int                     nkey;
2862         int32           compare;
2863
2864         /* Compare the leading sort key */
2865         compare = ApplySortComparator(a->datum1, a->isnull1,
2866                                                                   b->datum1, b->isnull1,
2867                                                                   sortKey);
2868         if (compare != 0)
2869                 return compare;
2870
2871         /* Compare additional sort keys */
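        /*
         * Fake up HeapTupleData headers pointing MINIMAL_TUPLE_OFFSET bytes
         * before the stored MinimalTuples, so that heap_getattr() can fetch
         * the remaining attributes through the ordinary heap-tuple interface.
         */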
2872         ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
2873         ltup.t_data = (HeapTupleHeader) ((char *) a->tuple - MINIMAL_TUPLE_OFFSET);
2874         rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
2875         rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET);
2876         tupDesc = state->tupDesc;
2877         sortKey++;
2878         for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++)
2879         {
2880                 AttrNumber      attno = sortKey->ssup_attno;
2881                 Datum           datum1,
2882                                         datum2;
2883                 bool            isnull1,
2884                                         isnull2;
2885
2886                 datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
2887                 datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
2888
2889                 compare = ApplySortComparator(datum1, isnull1,
2890                                                                           datum2, isnull2,
2891                                                                           sortKey);
2892                 if (compare != 0)
2893                         return compare;
2894         }
2895
2896         return 0;
2897 }
2898
2899 static void
2900 copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
2901 {
2902         /*
2903          * We expect the passed "tup" to be a TupleTableSlot, and form a
2904          * MinimalTuple using the exported interface for that.
2905          */
2906         TupleTableSlot *slot = (TupleTableSlot *) tup;
2907         MinimalTuple tuple;
2908         HeapTupleData htup;
2909
2910         /* copy the tuple into sort storage */
2911         tuple = ExecCopySlotMinimalTuple(slot);
2912         stup->tuple = (void *) tuple;
2913         USEMEM(state, GetMemoryChunkSpace(tuple));
2914         /* set up first-column key value */
2915         htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
2916         htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
2917         stup->datum1 = heap_getattr(&htup,
2918                                                                 state->sortKeys[0].ssup_attno,
2919                                                                 state->tupDesc,
2920                                                                 &stup->isnull1);
2921 }
2922
2923 static void
2924 writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
2925 {
2926         MinimalTuple tuple = (MinimalTuple) stup->tuple;
2927
2928         /* the part of the MinimalTuple we'll write: */
2929         char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
2930         unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
2931
2932         /* total on-disk footprint: */
2933         unsigned int tuplen = tupbodylen + sizeof(int);
2934
2935         LogicalTapeWrite(state->tapeset, tapenum,
2936                                          (void *) &tuplen, sizeof(tuplen));
2937         LogicalTapeWrite(state->tapeset, tapenum,
2938                                          (void *) tupbody, tupbodylen);
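        /*
         * For randomAccess sorts we also store a trailing copy of the length
         * word, so the tuple can be stepped over when reading the tape
         * backwards.
         */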
2939         if (state->randomAccess)        /* need trailing length word? */
2940                 LogicalTapeWrite(state->tapeset, tapenum,
2941                                                  (void *) &tuplen, sizeof(tuplen));
2942
2943         FREEMEM(state, GetMemoryChunkSpace(tuple));
2944         heap_free_minimal_tuple(tuple);
2945 }
2946
2947 static void
2948 readtup_heap(Tuplesortstate *state, SortTuple *stup,
2949                          int tapenum, unsigned int len)
2950 {
2951         unsigned int tupbodylen = len - sizeof(int);
2952         unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
2953         MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
2954         char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
2955         HeapTupleData htup;
2956
2957         USEMEM(state, GetMemoryChunkSpace(tuple));
2958         /* read in the tuple proper */
2959         tuple->t_len = tuplen;
2960         LogicalTapeReadExact(state->tapeset, tapenum,
2961                                                  tupbody, tupbodylen);
2962         if (state->randomAccess)        /* need trailing length word? */
2963                 LogicalTapeReadExact(state->tapeset, tapenum,
2964                                                          &tuplen, sizeof(tuplen));
2965         stup->tuple = (void *) tuple;
2966         /* set up first-column key value */
2967         htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
2968         htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
2969         stup->datum1 = heap_getattr(&htup,
2970                                                                 state->sortKeys[0].ssup_attno,
2971                                                                 state->tupDesc,
2972                                                                 &stup->isnull1);
2973 }
2974
2975 static void
2976 reversedirection_heap(Tuplesortstate *state)
2977 {
2978         SortSupport sortKey = state->sortKeys;
2979         int                     nkey;
2980
2981         for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
2982         {
2983                 sortKey->ssup_reverse = !sortKey->ssup_reverse;
2984                 sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
2985         }
2986 }
2987
2988
2989 /*
2990  * Routines specialized for the CLUSTER case (HeapTuple data, with
2991  * comparisons per a btree index definition)
2992  */
2993
2994 static int
2995 comparetup_cluster(const SortTuple *a, const SortTuple *b,
2996                                    Tuplesortstate *state)
2997 {
2998         ScanKey         scanKey = state->indexScanKey;
2999         HeapTuple       ltup;
3000         HeapTuple       rtup;
3001         TupleDesc       tupDesc;
3002         int                     nkey;
3003         int32           compare;
3004
3005         /* Compare the leading sort key, if it's simple */
3006         if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
3007         {
3008                 compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
3009                                                                                   scanKey->sk_collation,
3010                                                                                   a->datum1, a->isnull1,
3011                                                                                   b->datum1, b->isnull1);
3012                 if (compare != 0 || state->nKeys == 1)
3013                         return compare;
3014                 /* Compare additional columns the hard way */
3015                 scanKey++;
3016                 nkey = 1;
3017         }
3018         else
3019         {
3020                 /* Must compare all keys the hard way */
3021                 nkey = 0;
3022         }
3023
3024         /* Compare additional sort keys */
3025         ltup = (HeapTuple) a->tuple;
3026         rtup = (HeapTuple) b->tuple;
3027
3028         if (state->indexInfo->ii_Expressions == NULL)
3029         {
3030                 /* If not expression index, just compare the proper heap attrs */
3031                 tupDesc = state->tupDesc;
3032
3033                 for (; nkey < state->nKeys; nkey++, scanKey++)
3034                 {
3035                         AttrNumber      attno = state->indexInfo->ii_KeyAttrNumbers[nkey];
3036                         Datum           datum1,
3037                                                 datum2;
3038                         bool            isnull1,
3039                                                 isnull2;
3040
3041                         datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
3042                         datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
3043
3044                         compare = inlineApplySortFunction(&scanKey->sk_func,
3045                                                                                           scanKey->sk_flags,
3046                                                                                           scanKey->sk_collation,
3047                                                                                           datum1, isnull1,
3048                                                                                           datum2, isnull2);
3049                         if (compare != 0)
3050                                 return compare;
3051                 }
3052         }
3053         else
3054         {
3055                 /*
3056                  * In the expression index case, compute the whole index tuple and
3057                  * then compare values.  It would perhaps be faster to compute only as
3058                  * many columns as we need to compare, but that would require
3059                  * duplicating all the logic in FormIndexDatum.
3060                  */
3061                 Datum           l_index_values[INDEX_MAX_KEYS];
3062                 bool            l_index_isnull[INDEX_MAX_KEYS];
3063                 Datum           r_index_values[INDEX_MAX_KEYS];
3064                 bool            r_index_isnull[INDEX_MAX_KEYS];
3065                 TupleTableSlot *ecxt_scantuple;
3066
3067                 /* Reset context each time to prevent memory leakage */
3068                 ResetPerTupleExprContext(state->estate);
3069
3070                 ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple;
3071
3072                 ExecStoreTuple(ltup, ecxt_scantuple, InvalidBuffer, false);
3073                 FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
3074                                            l_index_values, l_index_isnull);
3075
3076                 ExecStoreTuple(rtup, ecxt_scantuple, InvalidBuffer, false);
3077                 FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
3078                                            r_index_values, r_index_isnull);
3079
3080                 for (; nkey < state->nKeys; nkey++, scanKey++)
3081                 {
3082                         compare = inlineApplySortFunction(&scanKey->sk_func,
3083                                                                                           scanKey->sk_flags,
3084                                                                                           scanKey->sk_collation,
3085                                                                                           l_index_values[nkey],
3086                                                                                           l_index_isnull[nkey],
3087                                                                                           r_index_values[nkey],
3088                                                                                           r_index_isnull[nkey]);
3089                         if (compare != 0)
3090                                 return compare;
3091                 }
3092         }
3093
3094         return 0;
3095 }
3096
3097 static void
3098 copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
3099 {
3100         HeapTuple       tuple = (HeapTuple) tup;
3101
3102         /* copy the tuple into sort storage */
3103         tuple = heap_copytuple(tuple);
3104         stup->tuple = (void *) tuple;
3105         USEMEM(state, GetMemoryChunkSpace(tuple));
3106         /* set up first-column key value, if it's a simple column */
3107         if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
3108                 stup->datum1 = heap_getattr(tuple,
3109                                                                         state->indexInfo->ii_KeyAttrNumbers[0],
3110                                                                         state->tupDesc,
3111                                                                         &stup->isnull1);
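        /*
         * When the leading index column is an expression, ii_KeyAttrNumbers[0]
         * is zero and datum1/isnull1 are left unset; comparetup_cluster() then
         * compares all keys the hard way.
         */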
3112 }
3113
3114 static void
3115 writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
3116 {
3117         HeapTuple       tuple = (HeapTuple) stup->tuple;
3118         unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int);
3119
3120         /* We need to store t_self, but not other fields of HeapTupleData */
3121         LogicalTapeWrite(state->tapeset, tapenum,
3122                                          &tuplen, sizeof(tuplen));
3123         LogicalTapeWrite(state->tapeset, tapenum,
3124                                          &tuple->t_self, sizeof(ItemPointerData));
3125         LogicalTapeWrite(state->tapeset, tapenum,
3126                                          tuple->t_data, tuple->t_len);
3127         if (state->randomAccess)        /* need trailing length word? */
3128                 LogicalTapeWrite(state->tapeset, tapenum,
3129                                                  &tuplen, sizeof(tuplen));
3130
3131         FREEMEM(state, GetMemoryChunkSpace(tuple));
3132         heap_freetuple(tuple);
3133 }
3134
3135 static void
3136 readtup_cluster(Tuplesortstate *state, SortTuple *stup,
3137                                 int tapenum, unsigned int tuplen)
3138 {
3139         unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
3140         HeapTuple       tuple = (HeapTuple) palloc(t_len + HEAPTUPLESIZE);
3141
3142         USEMEM(state, GetMemoryChunkSpace(tuple));
3143         /* Reconstruct the HeapTupleData header */
3144         tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
3145         tuple->t_len = t_len;
3146         LogicalTapeReadExact(state->tapeset, tapenum,
3147                                                  &tuple->t_self, sizeof(ItemPointerData));
3148         /* We don't currently bother to reconstruct t_tableOid */
3149         tuple->t_tableOid = InvalidOid;
3150         /* Read in the tuple body */
3151         LogicalTapeReadExact(state->tapeset, tapenum,
3152                                                  tuple->t_data, tuple->t_len);
3153         if (state->randomAccess)        /* need trailing length word? */
3154                 LogicalTapeReadExact(state->tapeset, tapenum,
3155                                                          &tuplen, sizeof(tuplen));
3156         stup->tuple = (void *) tuple;
3157         /* set up first-column key value, if it's a simple column */
3158         if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
3159                 stup->datum1 = heap_getattr(tuple,
3160                                                                         state->indexInfo->ii_KeyAttrNumbers[0],
3161                                                                         state->tupDesc,
3162                                                                         &stup->isnull1);
3163 }
3164
3165
3166 /*
3167  * Routines specialized for IndexTuple case
3168  *
3169  * The btree and hash cases require separate comparison functions, but the
3170  * IndexTuple representation is the same so the copy/write/read support
3171  * functions can be shared.
3172  */
3173
3174 static int
3175 comparetup_index_btree(const SortTuple *a, const SortTuple *b,
3176                                            Tuplesortstate *state)
3177 {
3178         /*
3179          * This is similar to comparetup_heap(), but expects index tuples.  There
3180          * is also special handling for enforcing uniqueness, and special treatment
3181          * for equal keys at the end.
3182          */
3183         ScanKey         scanKey = state->indexScanKey;
3184         IndexTuple      tuple1;
3185         IndexTuple      tuple2;
3186         int                     keysz;
3187         TupleDesc       tupDes;
3188         bool            equal_hasnull = false;
3189         int                     nkey;
3190         int32           compare;
3191
3192         /* Compare the leading sort key */
3193         compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
3194                                                                           scanKey->sk_collation,
3195                                                                           a->datum1, a->isnull1,
3196                                                                           b->datum1, b->isnull1);
3197         if (compare != 0)
3198                 return compare;
3199
3200         /* they are equal, so we only need to examine one null flag */
3201         if (a->isnull1)
3202                 equal_hasnull = true;
3203
3204         /* Compare additional sort keys */
3205         tuple1 = (IndexTuple) a->tuple;
3206         tuple2 = (IndexTuple) b->tuple;
3207         keysz = state->nKeys;
3208         tupDes = RelationGetDescr(state->indexRel);
3209         scanKey++;
3210         for (nkey = 2; nkey <= keysz; nkey++, scanKey++)
3211         {
3212                 Datum           datum1,
3213                                         datum2;
3214                 bool            isnull1,
3215                                         isnull2;
3216
3217                 datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
3218                 datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
3219
3220                 compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
3221                                                                                   scanKey->sk_collation,
3222                                                                                   datum1, isnull1,
3223                                                                                   datum2, isnull2);
3224                 if (compare != 0)
3225                         return compare;         /* done when we find unequal attributes */
3226
3227                 /* they are equal, so we only need to examine one null flag */
3228                 if (isnull1)
3229                         equal_hasnull = true;
3230         }
3231
3232         /*
3233          * If btree has asked us to enforce uniqueness, complain if two equal
3234          * tuples are detected (unless there was at least one NULL field).
3235          *
3236          * It is sufficient to make the test here, because if two tuples are equal
3237          * they *must* get compared at some stage of the sort --- otherwise the
3238          * sort algorithm wouldn't have checked whether one must appear before the
3239          * other.
3240          */
3241         if (state->enforceUnique && !equal_hasnull)
3242         {
3243                 Datum           values[INDEX_MAX_KEYS];
3244                 bool            isnull[INDEX_MAX_KEYS];
3245
3246                 /*
3247                  * Some rather brain-dead implementations of qsort (such as the one in
3248                  * QNX 4) will sometimes call the comparison routine to compare a
3249                  * value to itself, but we always use our own implementation, which
3250                  * does not.
3251                  */
3252                 Assert(tuple1 != tuple2);
3253
3254                 index_deform_tuple(tuple1, tupDes, values, isnull);
3255                 ereport(ERROR,
3256                                 (errcode(ERRCODE_UNIQUE_VIOLATION),
3257                                  errmsg("could not create unique index \"%s\"",
3258                                                 RelationGetRelationName(state->indexRel)),
3259                                  errdetail("Key %s is duplicated.",
3260                                                    BuildIndexValueDescription(state->indexRel,
3261                                                                                                           values, isnull)),
3262                                  errtableconstraint(state->heapRel,
3263                                                                  RelationGetRelationName(state->indexRel))));
3264         }
3265
3266         /*
3267          * If key values are equal, we sort on ItemPointer.  This does not affect
3268          * validity of the finished index, but it may be useful to have index
3269          * scans in physical order.
3270          */
3271         {
3272                 BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
3273                 BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
3274
3275                 if (blk1 != blk2)
3276                         return (blk1 < blk2) ? -1 : 1;
3277         }
3278         {
3279                 OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
3280                 OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
3281
3282                 if (pos1 != pos2)
3283                         return (pos1 < pos2) ? -1 : 1;
3284         }
3285
3286         return 0;
3287 }
3288
3289 static int
3290 comparetup_index_hash(const SortTuple *a, const SortTuple *b,
3291                                           Tuplesortstate *state)
3292 {
3293         uint32          hash1;
3294         uint32          hash2;
3295         IndexTuple      tuple1;
3296         IndexTuple      tuple2;
3297
3298         /*
3299          * Fetch hash keys and mask off bits we don't want to sort by. We know
3300          * that the first column of the index tuple is the hash key.
3301          */
3302         Assert(!a->isnull1);
3303         hash1 = DatumGetUInt32(a->datum1) & state->hash_mask;
3304         Assert(!b->isnull1);
3305         hash2 = DatumGetUInt32(b->datum1) & state->hash_mask;
3306
3307         if (hash1 > hash2)
3308                 return 1;
3309         else if (hash1 < hash2)
3310                 return -1;
3311
3312         /*
3313          * If hash values are equal, we sort on ItemPointer.  This does not affect
3314          * validity of the finished index, but it may be useful to have index
3315          * scans in physical order.
3316          */
3317         tuple1 = (IndexTuple) a->tuple;
3318         tuple2 = (IndexTuple) b->tuple;
3319
3320         {
3321                 BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
3322                 BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
3323
3324                 if (blk1 != blk2)
3325                         return (blk1 < blk2) ? -1 : 1;
3326         }
3327         {
3328                 OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
3329                 OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
3330
3331                 if (pos1 != pos2)
3332                         return (pos1 < pos2) ? -1 : 1;
3333         }
3334
3335         return 0;
3336 }
3337
3338 static void
3339 copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
3340 {
3341         IndexTuple      tuple = (IndexTuple) tup;
3342         unsigned int tuplen = IndexTupleSize(tuple);
3343         IndexTuple      newtuple;
3344
3345         /* copy the tuple into sort storage */
3346         newtuple = (IndexTuple) palloc(tuplen);
3347         memcpy(newtuple, tuple, tuplen);
3348         USEMEM(state, GetMemoryChunkSpace(newtuple));
3349         stup->tuple = (void *) newtuple;
3350         /* set up first-column key value */
3351         stup->datum1 = index_getattr(newtuple,
3352                                                                  1,
3353                                                                  RelationGetDescr(state->indexRel),
3354                                                                  &stup->isnull1);
3355 }
3356
3357 static void
3358 writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
3359 {
3360         IndexTuple      tuple = (IndexTuple) stup->tuple;
3361         unsigned int tuplen;
3362
3363         tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
3364         LogicalTapeWrite(state->tapeset, tapenum,
3365                                          (void *) &tuplen, sizeof(tuplen));
3366         LogicalTapeWrite(state->tapeset, tapenum,
3367                                          (void *) tuple, IndexTupleSize(tuple));
3368         if (state->randomAccess)        /* need trailing length word? */
3369                 LogicalTapeWrite(state->tapeset, tapenum,
3370                                                  (void *) &tuplen, sizeof(tuplen));
3371
3372         FREEMEM(state, GetMemoryChunkSpace(tuple));
3373         pfree(tuple);
3374 }
3375
3376 static void
3377 readtup_index(Tuplesortstate *state, SortTuple *stup,
3378                           int tapenum, unsigned int len)
3379 {
3380         unsigned int tuplen = len - sizeof(unsigned int);
3381         IndexTuple      tuple = (IndexTuple) palloc(tuplen);
3382
3383         USEMEM(state, GetMemoryChunkSpace(tuple));
3384         LogicalTapeReadExact(state->tapeset, tapenum,
3385                                                  tuple, tuplen);
3386         if (state->randomAccess)        /* need trailing length word? */
3387                 LogicalTapeReadExact(state->tapeset, tapenum,
3388                                                          &tuplen, sizeof(tuplen));
3389         stup->tuple = (void *) tuple;
3390         /* set up first-column key value */
3391         stup->datum1 = index_getattr(tuple,
3392                                                                  1,
3393                                                                  RelationGetDescr(state->indexRel),
3394                                                                  &stup->isnull1);
3395 }
3396
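/*
 * Reverse the sort direction for a btree-index sort by toggling the DESC and
 * NULLS_FIRST bits in each scankey; inlineApplySortFunction() honors both.
 */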
3397 static void
3398 reversedirection_index_btree(Tuplesortstate *state)
3399 {
3400         ScanKey         scanKey = state->indexScanKey;
3401         int                     nkey;
3402
3403         for (nkey = 0; nkey < state->nKeys; nkey++, scanKey++)
3404         {
3405                 scanKey->sk_flags ^= (SK_BT_DESC | SK_BT_NULLS_FIRST);
3406         }
3407 }
3408
3409 static void
3410 reversedirection_index_hash(Tuplesortstate *state)
3411 {
3412         /* We don't support reversing direction in a hash index sort */
3413         elog(ERROR, "reversedirection_index_hash is not implemented");
3414 }
3415
3416
3417 /*
3418  * Routines specialized for DatumTuple case
3419  */
3420
3421 static int
3422 comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
3423 {
3424         return ApplySortComparator(a->datum1, a->isnull1,
3425                                                            b->datum1, b->isnull1,
3426                                                            state->onlyKey);
3427 }
3428
3429 static void
3430 copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
3431 {
3432         /* Not currently needed */
3433         elog(ERROR, "copytup_datum() should not be called");
3434 }
3435
3436 static void
3437 writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
3438 {
3439         void       *waddr;
3440         unsigned int tuplen;
3441         unsigned int writtenlen;
3442
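        /*
         * On tape, a NULL is represented by a zero-length body; a by-value
         * datum is written as the raw Datum, and a by-reference datum is
         * written from the storage it points to.
         */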
3443         if (stup->isnull1)
3444         {
3445                 waddr = NULL;
3446                 tuplen = 0;
3447         }
3448         else if (state->datumTypeByVal)
3449         {
3450                 waddr = &stup->datum1;
3451                 tuplen = sizeof(Datum);
3452         }
3453         else
3454         {
3455                 waddr = DatumGetPointer(stup->datum1);
3456                 tuplen = datumGetSize(stup->datum1, false, state->datumTypeLen);
3457                 Assert(tuplen != 0);
3458         }
3459
3460         writtenlen = tuplen + sizeof(unsigned int);
3461
3462         LogicalTapeWrite(state->tapeset, tapenum,
3463                                          (void *) &writtenlen, sizeof(writtenlen));
3464         LogicalTapeWrite(state->tapeset, tapenum,
3465                                          waddr, tuplen);
3466         if (state->randomAccess)        /* need trailing length word? */
3467                 LogicalTapeWrite(state->tapeset, tapenum,
3468                                                  (void *) &writtenlen, sizeof(writtenlen));
3469
3470         if (stup->tuple)
3471         {
3472                 FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
3473                 pfree(stup->tuple);
3474         }
3475 }
3476
3477 static void
3478 readtup_datum(Tuplesortstate *state, SortTuple *stup,
3479                           int tapenum, unsigned int len)
3480 {
3481         unsigned int tuplen = len - sizeof(unsigned int);
3482
3483         if (tuplen == 0)
3484         {
3485                 /* it's NULL */
3486                 stup->datum1 = (Datum) 0;
3487                 stup->isnull1 = true;
3488                 stup->tuple = NULL;
3489         }
3490         else if (state->datumTypeByVal)
3491         {
3492                 Assert(tuplen == sizeof(Datum));
3493                 LogicalTapeReadExact(state->tapeset, tapenum,
3494                                                          &stup->datum1, tuplen);
3495                 stup->isnull1 = false;
3496                 stup->tuple = NULL;
3497         }
3498         else
3499         {
3500                 void       *raddr = palloc(tuplen);
3501
3502                 LogicalTapeReadExact(state->tapeset, tapenum,
3503                                                          raddr, tuplen);
3504                 stup->datum1 = PointerGetDatum(raddr);
3505                 stup->isnull1 = false;
3506                 stup->tuple = raddr;
3507                 USEMEM(state, GetMemoryChunkSpace(raddr));
3508         }
3509
3510         if (state->randomAccess)        /* need trailing length word? */
3511                 LogicalTapeReadExact(state->tapeset, tapenum,
3512                                                          &tuplen, sizeof(tuplen));
3513 }
3514
3515 static void
3516 reversedirection_datum(Tuplesortstate *state)
3517 {
3518         state->onlyKey->ssup_reverse = !state->onlyKey->ssup_reverse;
3519         state->onlyKey->ssup_nulls_first = !state->onlyKey->ssup_nulls_first;
3520 }
3521
3522 /*
3523  * Convenience routine to free a tuple previously loaded into sort memory
3524  */
3525 static void
3526 free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
3527 {
3528         FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
3529         pfree(stup->tuple);
3530 }