/*-------------------------------------------------------------------------
 *
 * tuplesort.c
 *        Generalized tuple sorting routines.
 *
 * This module handles sorting of heap tuples, index tuples, or single
 * Datums (and could easily support other kinds of sortable objects,
 * if necessary).  It works efficiently for both small and large amounts
 * of data.  Small amounts are sorted in-memory using qsort().  Large
 * amounts are sorted using temporary files and a standard external sort
 * algorithm.
 *
 * See Knuth, volume 3, for more than you want to know about the external
 * sorting algorithm.  We divide the input into sorted runs using replacement
 * selection, in the form of a priority tree implemented as a heap
 * (essentially his Algorithm 5.2.3H), then merge the runs using polyphase
 * merge, Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D
 * are implemented by logtape.c, which avoids space wastage by recycling
 * disk space as soon as each block is read from its "tape".
 *
 * We do not form the initial runs using Knuth's recommended replacement
 * selection data structure (Algorithm 5.4.1R), because it uses a fixed
 * number of records in memory at all times.  Since we are dealing with
 * tuples that may vary considerably in size, we want to be able to vary
 * the number of records kept in memory to ensure full utilization of the
 * allowed sort memory space.  So, we keep the tuples in a variable-size
 * heap, with the next record to go out at the top of the heap.  Like
 * Algorithm 5.4.1R, each record is stored with the run number that it
 * must go into, and we use (run number, key) as the ordering key for the
 * heap.  When the run number at the top of the heap changes, we know that
 * no more records of the prior run are left in the heap.
 *
 * The approximate amount of memory allowed for any one sort operation
 * is specified in kilobytes by the caller (most pass work_mem).  Initially,
 * we absorb tuples and simply store them in an unsorted array as long as
 * we haven't exceeded workMem.  If we reach the end of the input without
 * exceeding workMem, we sort the array using qsort() and subsequently return
 * tuples just by scanning the tuple array sequentially.  If we do exceed
 * workMem, we construct a heap using Algorithm H and begin to emit tuples
 * into sorted runs in temporary tapes, emitting just enough tuples at each
 * step to get back within the workMem limit.  Whenever the run number at
 * the top of the heap changes, we begin a new run with a new output tape
 * (selected per Algorithm D).  After the end of the input is reached,
 * we dump out remaining tuples in memory into a final run (or two),
 * then merge the runs using Algorithm D.
 *
 * When merging runs, we use a heap containing just the frontmost tuple from
 * each source run; we repeatedly output the smallest tuple and insert the
 * next tuple from its source tape (if any).  When the heap empties, the merge
 * is complete.  The basic merge algorithm thus needs very little memory ---
 * only M tuples for an M-way merge, and M is constrained to a small number.
 * However, we can still make good use of our full workMem allocation by
 * pre-reading additional tuples from each source tape.  Without prereading,
 * our access pattern to the temporary file would be very erratic; on average
 * we'd read one block from each of M source tapes during the same time that
 * we're writing M blocks to the output tape, so there is no sequentiality of
 * access at all, defeating the read-ahead methods used by most Unix kernels.
 * Worse, the output tape gets written into a very random sequence of blocks
 * of the temp file, ensuring that things will be even worse when it comes
 * time to read that tape.  A straightforward merge pass thus ends up doing a
 * lot of waiting for disk seeks.  We can improve matters by prereading from
 * each source tape sequentially, loading about workMem/M bytes from each tape
 * in turn.  Then we run the merge algorithm, writing but not reading until
 * one of the preloaded tuple series runs out.  Then we switch back to preread
 * mode, fill memory again, and repeat.  This approach helps to localize both
 * read and write accesses.
 *
 * When the caller requests random access to the sort result, we form
 * the final sorted run on a logical tape which is then "frozen", so
 * that we can access it randomly.  When the caller does not need random
 * access, we return from tuplesort_performsort() as soon as we are down
 * to one run per logical tape.  The final merge is then performed
 * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
 * saves one cycle of writing all the data out to disk and reading it in.
 *
 * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
 * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
 * to Knuth's figure 70 (section 5.4.2).  However, Knuth is assuming that
 * tape drives are expensive beasts, and in particular that there will always
 * be many more runs than tape drives.  In our implementation a "tape drive"
 * doesn't cost much more than a few Kb of memory buffers, so we can afford
 * to have lots of them.  In particular, if we can have as many tape drives
 * as sorted runs, we can eliminate repeated I/O entirely.  In the current
 * code we determine the number of tapes M on the basis of workMem: we want
 * workMem/M to be large enough that we read a fair amount of data each time
 * we preread from a tape, so as to maintain the locality of access described
 * above.  Nonetheless, with large workMem we can have many tapes.
 *
 *
 * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *        $PostgreSQL: pgsql/src/backend/utils/sort/tuplesort.c,v 1.70 2006/10/04 00:30:04 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
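
/*
 * Illustrative sketch (an editor's aside, not part of the PostgreSQL
 * sources): the run-assignment rule of the replacement selection scheme
 * described above, reduced to plain ints.  An incoming value can join the
 * current run only if it is >= the first not-yet-output value (the heap
 * top); otherwise it is tagged for the next run.  Because the heap is
 * ordered by (run number, key), each run still comes out in sorted order.
 * The guard keeps this sketch out of any real build; all names here are
 * hypothetical.
 */
#ifdef TUPLESORT_ILLUSTRATION
typedef struct
{
        int                     run;            /* run number this value must go into */
        int                     key;            /* the sort key itself */
} RSItem;

static int
rs_item_compare(const RSItem *a, const RSItem *b)
{
        /* run number dominates the key, per the (run number, key) heap order */
        if (a->run != b->run)
                return (a->run < b->run) ? -1 : 1;
        if (a->key != b->key)
                return (a->key < b->key) ? -1 : 1;
        return 0;
}

static int
rs_choose_run(int currentRun, int newKey, int heapTopKey)
{
        /* >= the first not-yet-output value: still fits in the current run */
        return (newKey >= heapTopKey) ? currentRun : currentRun + 1;
}
#endif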

#include "postgres.h"

#include "access/heapam.h"
#include "access/nbtree.h"
#include "catalog/pg_amop.h"
#include "catalog/pg_operator.h"
#include "miscadmin.h"
#include "utils/datum.h"
#include "utils/logtape.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_rusage.h"
#include "utils/syscache.h"
#include "utils/tuplesort.h"


/* GUC variable */
#ifdef TRACE_SORT
bool            trace_sort = false;
#endif


/*
 * The objects we actually sort are SortTuple structs.  These contain
 * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
 * which is a separate palloc chunk --- we assume it is just one chunk and
 * can be freed by a simple pfree().  SortTuples also contain the tuple's
 * first key column in Datum/nullflag format, and an index integer.
 *
 * Storing the first key column lets us save heap_getattr or index_getattr
 * calls during tuple comparisons.  We could extract and save all the key
 * columns not just the first, but this would increase code complexity and
 * overhead, and wouldn't actually save any comparison cycles in the common
 * case where the first key determines the comparison result.  Note that
 * for a pass-by-reference datatype, datum1 points into the "tuple" storage.
 *
 * When sorting single Datums, the data value is represented directly by
 * datum1/isnull1.  If the datatype is pass-by-reference and isnull1 is false,
 * then datum1 points to a separately palloc'd data value that is also pointed
 * to by the "tuple" pointer; otherwise "tuple" is NULL.
 *
 * While building initial runs, tupindex holds the tuple's run number.  During
 * merge passes, we re-use it to hold the input tape number that each tuple in
 * the heap was read from, or to hold the index of the next tuple pre-read
 * from the same tape in the case of pre-read entries.  tupindex goes unused
 * if the sort occurs entirely in memory.
 */
typedef struct
{
        void       *tuple;                      /* the tuple proper */
        Datum           datum1;                 /* value of first key column */
        bool            isnull1;                /* is first key column NULL? */
        int                     tupindex;               /* see notes above */
} SortTuple;
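
/*
 * Illustrative sketch (an editor's aside, not part of the PostgreSQL
 * sources): how the cached datum1/isnull1 fields are intended to be used
 * by a comparetup routine.  The first-key comparison runs on the cached
 * Datums, and only on a tie would a real routine reach into the tuple
 * proper for the remaining key columns.  This sketch assumes an int4
 * first key and is guarded out of the build.
 */
#ifdef TUPLESORT_ILLUSTRATION
static int
example_comparetup_int4(const SortTuple *a, const SortTuple *b,
                                                Tuplesortstate *state)
{
        int32           av;
        int32           bv;

        (void) state;                   /* unused in this sketch */

        /* in this sketch, NULLs sort after all non-NULL values */
        if (a->isnull1 || b->isnull1)
        {
                if (a->isnull1 && b->isnull1)
                        return 0;
                return a->isnull1 ? 1 : -1;
        }

        av = DatumGetInt32(a->datum1);
        bv = DatumGetInt32(b->datum1);
        if (av != bv)
                return (av < bv) ? -1 : 1;

        /*
         * Tie on the cached first key; a real comparetup routine would now
         * examine the remaining key columns inside a->tuple and b->tuple.
         */
        return 0;
}
#endif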


/*
 * Possible states of a Tuplesort object.  These denote the states that
 * persist between calls of Tuplesort routines.
 */
typedef enum
{
        TSS_INITIAL,                            /* Loading tuples; still within memory limit */
        TSS_BUILDRUNS,                          /* Loading tuples; writing to tape */
        TSS_SORTEDINMEM,                        /* Sort completed entirely in memory */
        TSS_SORTEDONTAPE,                       /* Sort completed, final run is on tape */
        TSS_FINALMERGE                          /* Performing final merge on-the-fly */
} TupSortStatus;

/*
 * Parameters for calculation of number of tapes to use --- see inittapes()
 * and tuplesort_merge_order().
 *
 * In this calculation we assume that each tape will cost us about 3 blocks
 * worth of buffer space (which is an underestimate for very large data
 * volumes, but it's probably close enough --- see logtape.c).
 *
 * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
 * tape during a preread cycle (see discussion at top of file).
 */
#define MINORDER                6               /* minimum merge order */
#define TAPE_BUFFER_OVERHEAD            (BLCKSZ * 3)
#define MERGE_BUFFER_SIZE                       (BLCKSZ * 32)
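
/*
 * Illustrative sketch (an editor's aside, not part of the PostgreSQL
 * sources): the arithmetic by which a merge order could be derived from
 * the constants above, following the reasoning in the file header.  Each
 * input tape needs TAPE_BUFFER_OVERHEAD of buffer space plus
 * MERGE_BUFFER_SIZE of preread workspace, the output tape needs a buffer
 * of its own, and we never merge fewer than MINORDER runs at once.  This
 * mirrors what tuplesort_merge_order() is described as doing, but it is
 * only a sketch of the calculation, not a copy of that function.
 */
#ifdef TUPLESORT_ILLUSTRATION
static int
example_merge_order(long allowedMem)
{
        int                     mOrder;

        mOrder = (int) ((allowedMem - TAPE_BUFFER_OVERHEAD) /
                                        (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD));

        /* even in minimum memory, use at least a MINORDER-way merge */
        return (mOrder < MINORDER) ? MINORDER : mOrder;
}
#endif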

/*
 * Private state of a Tuplesort operation.
 */
struct Tuplesortstate
{
        TupSortStatus status;           /* enumerated value as shown above */
        int                     nKeys;                  /* number of columns in sort key */
        bool            randomAccess;   /* did caller request random access? */
        long            availMem;               /* remaining memory available, in bytes */
        long            allowedMem;             /* total memory allowed, in bytes */
        int                     maxTapes;               /* number of tapes (Knuth's T) */
        int                     tapeRange;              /* maxTapes-1 (Knuth's P) */
        MemoryContext sortcontext;      /* memory context holding all sort data */
        LogicalTapeSet *tapeset;        /* logtape.c object for tapes in a temp file */

        /*
         * These function pointers decouple the routines that must know what kind
         * of tuple we are sorting from the routines that don't need to know it.
         * They are set up by the tuplesort_begin_xxx routines.
         *
         * Function to compare two tuples; result is per qsort() convention, ie:
         * <0, 0, >0 according as a<b, a=b, a>b.  The API must match
         * qsort_arg_comparator.
         */
        int                     (*comparetup) (const SortTuple *a, const SortTuple *b,
                                                                           Tuplesortstate *state);

        /*
         * Function to copy a supplied input tuple into palloc'd space and set up
         * its SortTuple representation (ie, set tuple/datum1/isnull1).  Also,
         * state->availMem must be decreased by the amount of space used for the
         * tuple copy (note the SortTuple struct itself is not counted).
         */
        void            (*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);

        /*
         * Function to write a stored tuple onto tape.  The representation of the
         * tuple on tape need not be the same as it is in memory; requirements on
         * the tape representation are given below.  After writing the tuple,
         * pfree() the out-of-line data (not the SortTuple struct!), and increase
         * state->availMem by the amount of memory space thereby released.
         */
        void            (*writetup) (Tuplesortstate *state, int tapenum,
                                                                 SortTuple *stup);

        /*
         * Function to read a stored tuple from tape back into memory. 'len' is
         * the already-read length of the stored tuple.  Create a palloc'd copy,
         * initialize tuple/datum1/isnull1 in the target SortTuple struct, and
         * decrease state->availMem by the amount of memory space consumed.
         */
        void            (*readtup) (Tuplesortstate *state, SortTuple *stup,
                                                                int tapenum, unsigned int len);

        /*
         * This array holds the tuples now in sort memory.  If we are in state
         * INITIAL, the tuples are in no particular order; if we are in state
         * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
         * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
         * H.  (Note that memtupcount only counts the tuples that are part of the
         * heap --- during merge passes, memtuples[] entries beyond tapeRange are
         * never in the heap and are used to hold pre-read tuples.)  In state
         * SORTEDONTAPE, the array is not used.
         */
        SortTuple  *memtuples;          /* array of SortTuple structs */
        int                     memtupcount;    /* number of tuples currently present */
        int                     memtupsize;             /* allocated length of memtuples array */

        /*
         * While building initial runs, this is the current output run number
         * (starting at 0).  Afterwards, it is the number of initial runs we made.
         */
        int                     currentRun;

        /*
         * Unless otherwise noted, all pointer variables below are pointers to
         * arrays of length maxTapes, holding per-tape data.
         */

        /*
         * These variables are only used during merge passes.  mergeactive[i] is
         * true if we are reading an input run from (actual) tape number i and
         * have not yet exhausted that run.  mergenext[i] is the memtuples index
         * of the next pre-read tuple (next to be loaded into the heap) for tape
         * i, or 0 if we are out of pre-read tuples.  mergelast[i] similarly
         * points to the last pre-read tuple from each tape.  mergeavailslots[i]
         * is the number of unused memtuples[] slots reserved for tape i, and
         * mergeavailmem[i] is the amount of unused space allocated for tape i.
         * mergefreelist and mergefirstfree keep track of unused locations in the
         * memtuples[] array.  The memtuples[].tupindex fields link together
         * pre-read tuples for each tape as well as recycled locations in
         * mergefreelist. It is OK to use 0 as a null link in these lists, because
         * memtuples[0] is part of the merge heap and is never a pre-read tuple.
         */
        bool       *mergeactive;        /* active input run source? */
        int                *mergenext;          /* first preread tuple for each source */
        int                *mergelast;          /* last preread tuple for each source */
        int                *mergeavailslots;    /* slots left for prereading each tape */
        long       *mergeavailmem;      /* availMem for prereading each tape */
        int                     mergefreelist;  /* head of freelist of recycled slots */
        int                     mergefirstfree; /* first slot never used in this merge */

        /*
         * Variables for Algorithm D.  Note that destTape is a "logical" tape
         * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
         * "logical" and "actual" tape numbers straight!
         */
        int                     Level;                  /* Knuth's l */
        int                     destTape;               /* current output tape (Knuth's j, less 1) */
        int                *tp_fib;                     /* Target Fibonacci run counts (A[]) */
        int                *tp_runs;            /* # of real runs on each tape */
        int                *tp_dummy;           /* # of dummy runs for each tape (D[]) */
        int                *tp_tapenum;         /* Actual tape numbers (TAPE[]) */
        int                     activeTapes;    /* # of active input tapes in merge pass */

        /*
         * These variables are used after completion of sorting to keep track of
         * the next tuple to return.  (In the tape case, the tape's current read
         * position is also critical state.)
         */
        int                     result_tape;    /* actual tape number of finished output */
        int                     current;                /* array index (only used if SORTEDINMEM) */
        bool            eof_reached;    /* reached EOF (needed for cursors) */

        /* markpos_xxx holds marked position for mark and restore */
        long            markpos_block;  /* tape block# (only used if SORTEDONTAPE) */
        int                     markpos_offset; /* saved "current", or offset in tape block */
        bool            markpos_eof;    /* saved "eof_reached" */

        /*
         * These variables are specific to the MinimalTuple case; they are set by
         * tuplesort_begin_heap and used only by the MinimalTuple routines.
         */
        TupleDesc       tupDesc;
        ScanKey         scanKeys;               /* array of length nKeys */
        SortFunctionKind *sortFnKinds;          /* array of length nKeys */

        /*
         * These variables are specific to the IndexTuple case; they are set by
         * tuplesort_begin_index and used only by the IndexTuple routines.
         */
        Relation        indexRel;
        ScanKey         indexScanKey;
        bool            enforceUnique;  /* complain if we find duplicate tuples */

        /*
         * These variables are specific to the Datum case; they are set by
         * tuplesort_begin_datum and used only by the DatumTuple routines.
         */
        Oid                     datumType;
        Oid                     sortOperator;
        FmgrInfo        sortOpFn;               /* cached lookup data for sortOperator */
        SortFunctionKind sortFnKind;
        /* we need typelen and byval in order to know how to copy the Datums. */
        int                     datumTypeLen;
        bool            datumTypeByVal;

        /*
         * Resource snapshot for time of sort start.
         */
#ifdef TRACE_SORT
        PGRUsage        ru_start;
#endif
};

#define COMPARETUP(state,a,b)   ((*(state)->comparetup) (a, b, state))
#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
#define WRITETUP(state,tape,stup)       ((*(state)->writetup) (state, tape, stup))
#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
#define LACKMEM(state)          ((state)->availMem < 0)
#define USEMEM(state,amt)       ((state)->availMem -= (amt))
#define FREEMEM(state,amt)      ((state)->availMem += (amt))
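
/*
 * Illustrative sketch (an editor's aside, not part of the PostgreSQL
 * sources): the accounting pattern the copytup/writetup/readtup routines
 * are expected to follow with the macros above.  Charging
 * GetMemoryChunkSpace() rather than the requested size matters because
 * palloc overhead is counted too, per the memory-consumption notes below.
 * The helper names are hypothetical.
 */
#ifdef TUPLESORT_ILLUSTRATION
static void *
example_alloc_counted(Tuplesortstate *state, Size len)
{
        void       *ptr = palloc(len);

        USEMEM(state, GetMemoryChunkSpace(ptr));
        return ptr;
}

static void
example_free_counted(Tuplesortstate *state, void *ptr)
{
        FREEMEM(state, GetMemoryChunkSpace(ptr));
        pfree(ptr);
}
#endif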

/*
 * NOTES about on-tape representation of tuples:
 *
 * We require the first "unsigned int" of a stored tuple to be the total size
 * on-tape of the tuple, including itself (so it is never zero; an all-zero
 * unsigned int is used to delimit runs).  The remainder of the stored tuple
 * may or may not match the in-memory representation of the tuple ---
 * any conversion needed is the job of the writetup and readtup routines.
 *
 * If state->randomAccess is true, then the stored representation of the
 * tuple must be followed by another "unsigned int" that is a copy of the
 * length --- so the total tape space used is actually sizeof(unsigned int)
 * more than the stored length value.  This allows read-backwards.  When
 * randomAccess is not true, the write/read routines may omit the extra
 * length word.
 *
 * writetup is expected to write both length words as well as the tuple
 * data.  When readtup is called, the tape is positioned just after the
 * front length word; readtup must read the tuple data and advance past
 * the back length word (if present).
 *
 * The write/read routines can make use of the tuple description data
 * stored in the Tuplesortstate record, if needed.  They are also expected
 * to adjust state->availMem by the amount of memory space (not tape space!)
 * released or consumed.  There is no error return from either writetup
 * or readtup; they should ereport() on failure.
 *
 *
 * NOTES about memory consumption calculations:
 *
 * We count space allocated for tuples against the workMem limit, plus
 * the space used by the variable-size memtuples array.  Fixed-size space
 * is not counted; it's small enough to not be interesting.
 *
 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
 * rather than the originally-requested size.  This is important since
 * palloc can add substantial overhead.  It's not a complete answer since
 * we won't count any wasted space in palloc allocation blocks, but it's
 * a lot better than what we were doing before 7.3.
 */
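
/*
 * Illustrative sketch (an editor's aside, not part of the PostgreSQL
 * sources): the length-word framing described above, for a writetup
 * routine whose payload is an opaque byte string.  The leading length
 * word counts itself plus the payload; the trailing copy is written only
 * when randomAccess requires reading backwards.  "payload" and "paylen"
 * are hypothetical; real writetup routines derive them from the SortTuple.
 */
#ifdef TUPLESORT_ILLUSTRATION
static void
example_writetup_framing(Tuplesortstate *state, int tapenum,
                                                 char *payload, unsigned int paylen)
{
        unsigned int tuplen = paylen + sizeof(tuplen);

        LogicalTapeWrite(state->tapeset, tapenum,
                                         (void *) &tuplen, sizeof(tuplen));
        LogicalTapeWrite(state->tapeset, tapenum,
                                         (void *) payload, paylen);
        if (state->randomAccess)        /* need trailing length word? */
                LogicalTapeWrite(state->tapeset, tapenum,
                                                 (void *) &tuplen, sizeof(tuplen));
}
#endif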


static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess);
static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
static void inittapes(Tuplesortstate *state);
static void selectnewtape(Tuplesortstate *state);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
static void mergepreread(Tuplesortstate *state);
static void mergeprereadone(Tuplesortstate *state, int srcTape);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
                                          int tupleindex, bool checkIndex);
static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
static void markrunend(Tuplesortstate *state, int tapenum);
static int comparetup_heap(const SortTuple *a, const SortTuple *b,
                                Tuplesortstate *state);
static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_heap(Tuplesortstate *state, int tapenum,
                          SortTuple *stup);
static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
                         int tapenum, unsigned int len);
static int comparetup_index(const SortTuple *a, const SortTuple *b,
                                 Tuplesortstate *state);
static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_index(Tuplesortstate *state, int tapenum,
                           SortTuple *stup);
static void readtup_index(Tuplesortstate *state, SortTuple *stup,
                          int tapenum, unsigned int len);
static int comparetup_datum(const SortTuple *a, const SortTuple *b,
                                 Tuplesortstate *state);
static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
static void writetup_datum(Tuplesortstate *state, int tapenum,
                           SortTuple *stup);
static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
                          int tapenum, unsigned int len);


/*
 *              tuplesort_begin_xxx
 *
 * Initialize for a tuple sort operation.
 *
 * After calling tuplesort_begin, the caller should call tuplesort_putXXX
 * zero or more times, then call tuplesort_performsort when all the tuples
 * have been supplied.  After performsort, retrieve the tuples in sorted
 * order by calling tuplesort_getXXX until it returns false/NULL.  (If random
 * access was requested, rescan, markpos, and restorepos can also be called.)
 * Call tuplesort_end to terminate the operation and release memory/disk space.
 *
 * Each variant of tuplesort_begin has a workMem parameter specifying the
 * maximum number of kilobytes of RAM to use before spilling data to disk.
 * (The normal value of this parameter is work_mem, but some callers use
 * other values.)  Each variant also has a randomAccess parameter specifying
 * whether the caller needs non-sequential access to the sort result.
 */
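
/*
 * Illustrative sketch (an editor's aside, not part of the PostgreSQL
 * sources): the call sequence described above, as a caller might issue it
 * for a heap-tuple sort.  The slot, tuple descriptor, key arrays, and the
 * two fetch/process helpers are hypothetical stand-ins for the caller's
 * executor context; work_mem is the usual GUC.
 */
#ifdef TUPLESORT_ILLUSTRATION
static void
example_sort_usage(TupleDesc tupDesc, TupleTableSlot *slot,
                                   Oid *sortOperators, AttrNumber *attNums, int nkeys)
{
        Tuplesortstate *sortstate;

        sortstate = tuplesort_begin_heap(tupDesc, nkeys,
                                                                         sortOperators, attNums,
                                                                         work_mem, false);

        /* feed every input tuple; each is copied, so the slot can be reused */
        while (fetch_next_into_slot(slot))      /* hypothetical producer */
                tuplesort_puttupleslot(sortstate, slot);

        tuplesort_performsort(sortstate);

        /* drain the output in sorted order */
        while (tuplesort_gettupleslot(sortstate, true, slot))
                process_sorted_slot(slot);      /* hypothetical consumer */

        tuplesort_end(sortstate);
}
#endif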

static Tuplesortstate *
tuplesort_begin_common(int workMem, bool randomAccess)
{
        Tuplesortstate *state;
        MemoryContext sortcontext;
        MemoryContext oldcontext;

        /*
         * Create a working memory context for this sort operation. All data
         * needed by the sort will live inside this context.
         */
        sortcontext = AllocSetContextCreate(CurrentMemoryContext,
                                                                                "TupleSort",
                                                                                ALLOCSET_DEFAULT_MINSIZE,
                                                                                ALLOCSET_DEFAULT_INITSIZE,
                                                                                ALLOCSET_DEFAULT_MAXSIZE);

        /*
         * Make the Tuplesortstate within the per-sort context.  This way, we
         * don't need a separate pfree() operation for it at shutdown.
         */
        oldcontext = MemoryContextSwitchTo(sortcontext);

        state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));

#ifdef TRACE_SORT
        if (trace_sort)
                pg_rusage_init(&state->ru_start);
#endif

        state->status = TSS_INITIAL;
        state->randomAccess = randomAccess;
        state->allowedMem = workMem * 1024L;
        state->availMem = state->allowedMem;
        state->sortcontext = sortcontext;
        state->tapeset = NULL;

        state->memtupcount = 0;
        state->memtupsize = 1024;       /* initial guess */
        state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));

        USEMEM(state, GetMemoryChunkSpace(state->memtuples));

        /* workMem must be large enough for the minimal memtuples array */
        if (LACKMEM(state))
                elog(ERROR, "insufficient memory allowed for sort");

        state->currentRun = 0;

        /*
         * maxTapes, tapeRange, and Algorithm D variables will be initialized by
         * inittapes(), if needed
         */

        state->result_tape = -1;        /* flag that result tape has not been formed */

        MemoryContextSwitchTo(oldcontext);

        return state;
}

Tuplesortstate *
tuplesort_begin_heap(TupleDesc tupDesc,
                                         int nkeys,
                                         Oid *sortOperators, AttrNumber *attNums,
                                         int workMem, bool randomAccess)
{
        Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
        MemoryContext oldcontext;
        int                     i;

        oldcontext = MemoryContextSwitchTo(state->sortcontext);

        AssertArg(nkeys > 0);

#ifdef TRACE_SORT
        if (trace_sort)
                elog(LOG,
                         "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
                         nkeys, workMem, randomAccess ? 't' : 'f');
#endif

        state->nKeys = nkeys;

        state->comparetup = comparetup_heap;
        state->copytup = copytup_heap;
        state->writetup = writetup_heap;
        state->readtup = readtup_heap;

        state->tupDesc = tupDesc;       /* assume we need not copy tupDesc */
        state->scanKeys = (ScanKey) palloc0(nkeys * sizeof(ScanKeyData));
        state->sortFnKinds = (SortFunctionKind *)
                palloc0(nkeys * sizeof(SortFunctionKind));

        for (i = 0; i < nkeys; i++)
        {
                RegProcedure sortFunction;

                AssertArg(sortOperators[i] != 0);
                AssertArg(attNums[i] != 0);

                /* select a function that implements the sort operator */
                SelectSortFunction(sortOperators[i], &sortFunction,
                                                   &state->sortFnKinds[i]);

                /*
                 * We needn't fill in sk_strategy or sk_subtype since these scankeys
                 * will never be passed to an index.
                 */
                ScanKeyInit(&state->scanKeys[i],
                                        attNums[i],
                                        InvalidStrategy,
                                        sortFunction,
                                        (Datum) 0);
        }

        MemoryContextSwitchTo(oldcontext);

        return state;
}

Tuplesortstate *
tuplesort_begin_index(Relation indexRel,
                                          bool enforceUnique,
                                          int workMem, bool randomAccess)
{
        Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
        MemoryContext oldcontext;

        oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
        if (trace_sort)
                elog(LOG,
                         "begin index sort: unique = %c, workMem = %d, randomAccess = %c",
                         enforceUnique ? 't' : 'f',
                         workMem, randomAccess ? 't' : 'f');
#endif

        state->nKeys = RelationGetNumberOfAttributes(indexRel);

        state->comparetup = comparetup_index;
        state->copytup = copytup_index;
        state->writetup = writetup_index;
        state->readtup = readtup_index;

        state->indexRel = indexRel;
        /* see comments below about btree dependence of this code... */
        state->indexScanKey = _bt_mkscankey_nodata(indexRel);
        state->enforceUnique = enforceUnique;

        MemoryContextSwitchTo(oldcontext);

        return state;
}

Tuplesortstate *
tuplesort_begin_datum(Oid datumType,
                                          Oid sortOperator,
                                          int workMem, bool randomAccess)
{
        Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
        MemoryContext oldcontext;
        RegProcedure sortFunction;
        int16           typlen;
        bool            typbyval;

        oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
        if (trace_sort)
                elog(LOG,
                         "begin datum sort: workMem = %d, randomAccess = %c",
                         workMem, randomAccess ? 't' : 'f');
#endif

        state->nKeys = 1;                       /* always a one-column sort */

        state->comparetup = comparetup_datum;
        state->copytup = copytup_datum;
        state->writetup = writetup_datum;
        state->readtup = readtup_datum;

        state->datumType = datumType;
        state->sortOperator = sortOperator;

        /* select a function that implements the sort operator */
        SelectSortFunction(sortOperator, &sortFunction, &state->sortFnKind);
        /* and look up the function */
        fmgr_info(sortFunction, &state->sortOpFn);

        /* lookup necessary attributes of the datum type */
        get_typlenbyval(datumType, &typlen, &typbyval);
        state->datumTypeLen = typlen;
        state->datumTypeByVal = typbyval;

        MemoryContextSwitchTo(oldcontext);

        return state;
}

/*
 * tuplesort_end
 *
 *      Release resources and clean up.
 *
 * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
 * pointing to garbage.  Be careful not to attempt to use or free such
 * pointers afterwards!
 */
void
tuplesort_end(Tuplesortstate *state)
{
        /* context swap probably not needed, but let's be safe */
        MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
        long            spaceUsed;

        if (state->tapeset)
                spaceUsed = LogicalTapeSetBlocks(state->tapeset);
        else
                spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
#endif

        /*
         * Delete temporary "tape" files, if any.
         *
         * Note: want to include this in reported total cost of sort, hence need
         * for two #ifdef TRACE_SORT sections.
         */
        if (state->tapeset)
                LogicalTapeSetClose(state->tapeset);

#ifdef TRACE_SORT
        if (trace_sort)
        {
                if (state->tapeset)
                        elog(LOG, "external sort ended, %ld disk blocks used: %s",
                                 spaceUsed, pg_rusage_show(&state->ru_start));
                else
                        elog(LOG, "internal sort ended, %ld KB used: %s",
                                 spaceUsed, pg_rusage_show(&state->ru_start));
        }
#endif

        MemoryContextSwitchTo(oldcontext);

        /*
         * Free the per-sort memory context, thereby releasing all working memory,
         * including the Tuplesortstate struct itself.
         */
        MemoryContextDelete(state->sortcontext);
}

/*
 * Grow the memtuples[] array, if possible within our memory constraint.
 * Return TRUE if able to enlarge the array, FALSE if not.
 *
 * At each increment we double the size of the array.  When we are short
 * on memory we could consider smaller increases, but because availMem
 * moves around with tuple addition/removal, this might result in thrashing.
 * Small increases in the array size are likely to be pretty inefficient.
 */
static bool
grow_memtuples(Tuplesortstate *state)
{
        /*
         * We need to be sure that we do not cause LACKMEM to become true, else
         * the space management algorithm will go nuts.  We assume here that the
         * memory chunk overhead associated with the memtuples array is constant
         * and so there will be no unexpected addition to what we ask for.  (The
         * minimum array size established in tuplesort_begin_common is large
         * enough to force palloc to treat it as a separate chunk, so this
         * assumption should be good.  But let's check it.)
         */
        if (state->availMem <= (long) (state->memtupsize * sizeof(SortTuple)))
                return false;

        /*
         * On a 64-bit machine, allowedMem could be high enough to get us into
         * trouble with MaxAllocSize, too.
         */
        if ((Size) (state->memtupsize * 2) >= MaxAllocSize / sizeof(SortTuple))
                return false;

        FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
        state->memtupsize *= 2;
        state->memtuples = (SortTuple *)
                repalloc(state->memtuples,
                                 state->memtupsize * sizeof(SortTuple));
        USEMEM(state, GetMemoryChunkSpace(state->memtuples));
        if (LACKMEM(state))
                elog(ERROR, "unexpected out-of-memory situation during sort");
        return true;
}

/*
 * Accept one tuple while collecting input data for sort.
 *
 * Note that the input data is always copied; the caller need not save it.
 */
void
tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot)
{
        MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
        SortTuple       stup;

        /*
         * Copy the given tuple into memory we control, and decrease availMem.
         * Then call the common code.
         */
        COPYTUP(state, &stup, (void *) slot);

        puttuple_common(state, &stup);

        MemoryContextSwitchTo(oldcontext);
}

/*
 * Accept one index tuple while collecting input data for sort.
 *
 * Note that the input tuple is always copied; the caller need not save it.
 */
void
tuplesort_putindextuple(Tuplesortstate *state, IndexTuple tuple)
{
        MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
        SortTuple       stup;

        /*
         * Copy the given tuple into memory we control, and decrease availMem.
         * Then call the common code.
         */
        COPYTUP(state, &stup, (void *) tuple);

        puttuple_common(state, &stup);

        MemoryContextSwitchTo(oldcontext);
}

/*
 * Accept one Datum while collecting input data for sort.
 *
 * If the Datum is pass-by-ref type, the value will be copied.
 */
void
tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
{
        MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
        SortTuple       stup;

        /*
         * If it's a pass-by-reference value, copy it into memory we control, and
         * decrease availMem.  Then call the common code.
         */
        if (isNull || state->datumTypeByVal)
        {
                stup.datum1 = val;
                stup.isnull1 = isNull;
                stup.tuple = NULL;              /* no separate storage */
        }
        else
        {
                stup.datum1 = datumCopy(val, false, state->datumTypeLen);
                stup.isnull1 = false;
                stup.tuple = DatumGetPointer(stup.datum1);
                USEMEM(state, GetMemoryChunkSpace(stup.tuple));
        }

        puttuple_common(state, &stup);

        MemoryContextSwitchTo(oldcontext);
}
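
/*
 * Illustrative sketch (an editor's aside, not part of the PostgreSQL
 * sources): the Datum-sort variant of the API, sorting an array of int4
 * Datums in place.  INT4OID (pg_type.h) and OID 97, the int4 "<"
 * operator, are assumed; everything else uses the entry points declared
 * in tuplesort.h.
 */
#ifdef TUPLESORT_ILLUSTRATION
static void
example_datum_sort(Datum *values, int nvalues)
{
        Tuplesortstate *sortstate;
        Oid                     int4lt = 97;    /* OID of the int4 "<" operator */
        Datum           val;
        bool            valNull;
        int                     i;

        sortstate = tuplesort_begin_datum(INT4OID, int4lt, work_mem, false);

        for (i = 0; i < nvalues; i++)
                tuplesort_putdatum(sortstate, values[i], false);

        tuplesort_performsort(sortstate);

        /* read back in sorted order; int4 is pass-by-value, nothing to pfree */
        i = 0;
        while (tuplesort_getdatum(sortstate, true, &val, &valNull))
                values[i++] = val;

        tuplesort_end(sortstate);
}
#endif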

/*
 * Shared code for tuple and datum cases.
 */
static void
puttuple_common(Tuplesortstate *state, SortTuple *tuple)
{
        switch (state->status)
        {
                case TSS_INITIAL:

                        /*
                         * Save the tuple into the unsorted array.  First, grow the array
                         * as needed.  Note that we try to grow the array when there is
                         * still one free slot remaining --- if we fail, there'll still be
                         * room to store the incoming tuple, and then we'll switch to
                         * tape-based operation.
                         */
                        if (state->memtupcount >= state->memtupsize - 1)
                        {
                                (void) grow_memtuples(state);
                                Assert(state->memtupcount < state->memtupsize);
                        }
                        state->memtuples[state->memtupcount++] = *tuple;

                        /*
                         * Done if we still fit in available memory and have array slots.
                         */
                        if (state->memtupcount < state->memtupsize && !LACKMEM(state))
                                return;

                        /*
                         * Nope; time to switch to tape-based operation.
                         */
                        inittapes(state);

                        /*
                         * Dump tuples until we are back under the limit.
                         */
                        dumptuples(state, false);
                        break;
                case TSS_BUILDRUNS:

                        /*
                         * Insert the tuple into the heap, with run number currentRun if
                         * it can go into the current run, else run number currentRun+1.
                         * The tuple can go into the current run if it is >= the first
                         * not-yet-output tuple.  (Actually, it could go into the current
                         * run if it is >= the most recently output tuple ... but that
                         * would require keeping around the tuple we last output, and it's
                         * simplest to let writetup free each tuple as soon as it's
                         * written.)
                         *
                         * Note there will always be at least one tuple in the heap at
                         * this point; see dumptuples.
                         */
                        Assert(state->memtupcount > 0);
                        if (COMPARETUP(state, tuple, &state->memtuples[0]) >= 0)
                                tuplesort_heap_insert(state, tuple, state->currentRun, true);
                        else
                                tuplesort_heap_insert(state, tuple, state->currentRun + 1, true);

                        /*
                         * If we are over the memory limit, dump tuples till we're under.
                         */
                        dumptuples(state, false);
                        break;
                default:
                        elog(ERROR, "invalid tuplesort state");
                        break;
        }
}

/*
 * All tuples have been provided; finish the sort.
 */
void
tuplesort_performsort(Tuplesortstate *state)
{
        MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);

#ifdef TRACE_SORT
        if (trace_sort)
                elog(LOG, "performsort starting: %s",
                         pg_rusage_show(&state->ru_start));
#endif

        switch (state->status)
        {
                case TSS_INITIAL:

                        /*
                         * We were able to accumulate all the tuples within the allowed
                         * amount of memory.  Just qsort 'em and we're done.
                         */
                        if (state->memtupcount > 1)
                                qsort_arg((void *) state->memtuples,
                                                  state->memtupcount,
                                                  sizeof(SortTuple),
                                                  (qsort_arg_comparator) state->comparetup,
                                                  (void *) state);
                        state->current = 0;
                        state->eof_reached = false;
                        state->markpos_offset = 0;
                        state->markpos_eof = false;
                        state->status = TSS_SORTEDINMEM;
                        break;
                case TSS_BUILDRUNS:

                        /*
                         * Finish tape-based sort.  First, flush all tuples remaining in
                         * memory out to tape; then merge until we have a single remaining
                         * run (or, if !randomAccess, one run per tape). Note that
                         * mergeruns sets the correct state->status.
                         */
                        dumptuples(state, true);
                        mergeruns(state);
                        state->eof_reached = false;
                        state->markpos_block = 0L;
                        state->markpos_offset = 0;
                        state->markpos_eof = false;
                        break;
                default:
                        elog(ERROR, "invalid tuplesort state");
                        break;
        }

#ifdef TRACE_SORT
        if (trace_sort)
        {
                if (state->status == TSS_FINALMERGE)
                        elog(LOG, "performsort done (except %d-way final merge): %s",
                                 state->activeTapes,
                                 pg_rusage_show(&state->ru_start));
                else
                        elog(LOG, "performsort done: %s",
                                 pg_rusage_show(&state->ru_start));
        }
#endif

        MemoryContextSwitchTo(oldcontext);
}

/*
 * Internal routine to fetch the next tuple in either forward or back
 * direction into *stup.  Returns FALSE if no more tuples.
 * If *should_free is set, the caller must pfree stup.tuple when done with it.
 */
static bool
tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
                                                  SortTuple *stup, bool *should_free)
{
        unsigned int tuplen;

        switch (state->status)
        {
                case TSS_SORTEDINMEM:
                        Assert(forward || state->randomAccess);
                        *should_free = false;
                        if (forward)
                        {
                                if (state->current < state->memtupcount)
                                {
                                        *stup = state->memtuples[state->current++];
                                        return true;
                                }
                                state->eof_reached = true;
                                return false;
                        }
                        else
                        {
                                if (state->current <= 0)
                                        return false;

                                /*
                                 * If all tuples have been fetched already, return the last
                                 * tuple; otherwise return the tuple before the last one
                                 * returned.
                                 */
                                if (state->eof_reached)
                                        state->eof_reached = false;
                                else
                                {
                                        state->current--;       /* last returned tuple */
                                        if (state->current <= 0)
                                                return false;
                                }
                                *stup = state->memtuples[state->current - 1];
                                return true;
                        }
                        break;

                case TSS_SORTEDONTAPE:
                        Assert(forward || state->randomAccess);
                        *should_free = true;
                        if (forward)
                        {
                                if (state->eof_reached)
                                        return false;
                                if ((tuplen = getlen(state, state->result_tape, true)) != 0)
                                {
                                        READTUP(state, stup, state->result_tape, tuplen);
                                        return true;
                                }
                                else
                                {
                                        state->eof_reached = true;
                                        return false;
                                }
                        }

                        /*
                         * Backward.
                         *
                         * If all tuples have been fetched already, return the last tuple;
                         * otherwise return the tuple before the last one returned.
                         */
                        if (state->eof_reached)
                        {
                                /*
                                 * Seek position is pointing just past the zero tuplen at the
                                 * end of file; back up to fetch last tuple's ending length
                                 * word.  If seek fails we must have a completely empty file.
                                 */
                                if (!LogicalTapeBackspace(state->tapeset,
                                                                                  state->result_tape,
                                                                                  2 * sizeof(unsigned int)))
                                        return false;
                                state->eof_reached = false;
                        }
                        else
                        {
                                /*
                                 * Back up and fetch previously-returned tuple's ending length
                                 * word.  If seek fails, assume we are at start of file.
                                 */
                                if (!LogicalTapeBackspace(state->tapeset,
                                                                                  state->result_tape,
                                                                                  sizeof(unsigned int)))
                                        return false;
                                tuplen = getlen(state, state->result_tape, false);

                                /*
                                 * Back up to get ending length word of tuple before it.
                                 */
                                if (!LogicalTapeBackspace(state->tapeset,
                                                                                  state->result_tape,
                                                                                  tuplen + 2 * sizeof(unsigned int)))
                                {
                                        /*
                                         * If that fails, presumably the prev tuple is the first
                                         * in the file.  Back up so that it becomes next to read
                                         * in forward direction (not obviously right, but that is
                                         * what the in-memory case does).
1082                                          */
1083                                         if (!LogicalTapeBackspace(state->tapeset,
1084                                                                                           state->result_tape,
1085                                                                                           tuplen + sizeof(unsigned int)))
1086                                                 elog(ERROR, "bogus tuple length in backward scan");
1087                                         return false;
1088                                 }
1089                         }
1090
1091                         tuplen = getlen(state, state->result_tape, false);
1092
1093                         /*
1094                          * Now we have the length of the prior tuple, back up and read it.
1095                          * Note: READTUP expects we are positioned after the initial
1096                          * length word of the tuple, so back up to that point.
1097                          */
1098                         if (!LogicalTapeBackspace(state->tapeset,
1099                                                                           state->result_tape,
1100                                                                           tuplen))
1101                                 elog(ERROR, "bogus tuple length in backward scan");
1102                         READTUP(state, stup, state->result_tape, tuplen);
1103                         return true;
1104
1105                 case TSS_FINALMERGE:
1106                         Assert(forward);
1107                         *should_free = true;
1108
1109                         /*
1110                          * This code should match the inner loop of mergeonerun().
1111                          */
1112                         if (state->memtupcount > 0)
1113                         {
1114                                 int                     srcTape = state->memtuples[0].tupindex;
1115                                 Size            tuplen;
1116                                 int                     tupIndex;
1117                                 SortTuple  *newtup;
1118
1119                                 *stup = state->memtuples[0];
1120                                 /* returned tuple is no longer counted in our memory space */
1121                                 if (stup->tuple)
1122                                 {
1123                                         tuplen = GetMemoryChunkSpace(stup->tuple);
1124                                         state->availMem += tuplen;
1125                                         state->mergeavailmem[srcTape] += tuplen;
1126                                 }
1127                                 tuplesort_heap_siftup(state, false);
1128                                 if ((tupIndex = state->mergenext[srcTape]) == 0)
1129                                 {
1130                                         /*
1131                                          * out of preloaded data on this tape, try to read more
1132                                          *
1133                                          * Unlike mergeonerun(), we only preload from the single
1134                                          * tape that's run dry.  See mergepreread() comments.
1135                                          */
1136                                         mergeprereadone(state, srcTape);
1137
1138                                         /*
1139                                          * if still no data, we've reached end of run on this tape
1140                                          */
1141                                         if ((tupIndex = state->mergenext[srcTape]) == 0)
1142                                                 return true;
1143                                 }
1144                                 /* pull next preread tuple from list, insert in heap */
1145                                 newtup = &state->memtuples[tupIndex];
1146                                 state->mergenext[srcTape] = newtup->tupindex;
1147                                 if (state->mergenext[srcTape] == 0)
1148                                         state->mergelast[srcTape] = 0;
1149                                 tuplesort_heap_insert(state, newtup, srcTape, false);
1150                                 /* put the now-unused memtuples entry on the freelist */
1151                                 newtup->tupindex = state->mergefreelist;
1152                                 state->mergefreelist = tupIndex;
1153                                 state->mergeavailslots[srcTape]++;
1154                                 return true;
1155                         }
1156                         return false;
1157
1158                 default:
1159                         elog(ERROR, "invalid tuplesort state");
1160                         return false;           /* keep compiler quiet */
1161         }
1162 }
1163
1164 /*
1165  * Fetch the next tuple in either forward or backward direction.
1166  * If successful, put tuple in slot and return TRUE; else, clear the slot
1167  * and return FALSE.
1168  */
1169 bool
1170 tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
1171                                            TupleTableSlot *slot)
1172 {
1173         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1174         SortTuple       stup;
1175         bool            should_free;
1176
1177         if (!tuplesort_gettuple_common(state, forward, &stup, &should_free))
1178                 stup.tuple = NULL;
1179
1180         MemoryContextSwitchTo(oldcontext);
1181
1182         if (stup.tuple)
1183         {
1184                 ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, should_free);
1185                 return true;
1186         }
1187         else
1188         {
1189                 ExecClearTuple(slot);
1190                 return false;
1191         }
1192 }
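
/*
 * A minimal usage sketch (hypothetical caller; process_tuple() is an
 * assumed stand-in, not part of this module): draining a finished sort
 * into an executor slot that was made with the right tuple descriptor.
 *
 *              while (tuplesort_gettupleslot(state, true, slot))
 *                      process_tuple(slot);
 *
 * When the loop exits, the slot has already been cleared.
 */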
1193
1194 /*
1195  * Fetch the next index tuple in either forward or backward direction.
1196  * Returns NULL if no more tuples.      If *should_free is set, the
1197  * caller must pfree the returned tuple when done with it.
1198  */
1199 IndexTuple
1200 tuplesort_getindextuple(Tuplesortstate *state, bool forward,
1201                                                 bool *should_free)
1202 {
1203         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1204         SortTuple       stup;
1205
1206         if (!tuplesort_gettuple_common(state, forward, &stup, should_free))
1207                 stup.tuple = NULL;
1208
1209         MemoryContextSwitchTo(oldcontext);
1210
1211         return (IndexTuple) stup.tuple;
1212 }
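
/*
 * Hypothetical consumer sketch (build_page() is an assumed name, not
 * part of this module); note that ownership of the returned tuple
 * depends on *should_free:
 *
 *              IndexTuple      itup;
 *              bool            should_free;
 *
 *              while ((itup = tuplesort_getindextuple(state, true,
 *                                                     &should_free)) != NULL)
 *              {
 *                      build_page(itup);
 *                      if (should_free)
 *                              pfree(itup);
 *              }
 */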
1213
1214 /*
1215  * Fetch the next Datum in either forward or backward direction.
1216  * Returns FALSE if no more datums.
1217  *
1218  * If the Datum is of a pass-by-reference type, the returned value is
1219  * freshly palloc'd and is now owned by the caller.
1220  */
1221 bool
1222 tuplesort_getdatum(Tuplesortstate *state, bool forward,
1223                                    Datum *val, bool *isNull)
1224 {
1225         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1226         SortTuple       stup;
1227         bool            should_free;
1228
1229         if (!tuplesort_gettuple_common(state, forward, &stup, &should_free))
1230         {
1231                 MemoryContextSwitchTo(oldcontext);
1232                 return false;
1233         }
1234
1235         if (stup.isnull1 || state->datumTypeByVal)
1236         {
1237                 *val = stup.datum1;
1238                 *isNull = stup.isnull1;
1239         }
1240         else
1241         {
1242                 if (should_free)
1243                         *val = stup.datum1;
1244                 else
1245                         *val = datumCopy(stup.datum1, false, state->datumTypeLen);
1246                 *isNull = false;
1247         }
1248
1249         MemoryContextSwitchTo(oldcontext);
1250
1251         return true;
1252 }
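
/*
 * Sketch of a hypothetical caller (use_datum() is an assumed name):
 *
 *              Datum   val;
 *              bool    isnull;
 *
 *              while (tuplesort_getdatum(state, true, &val, &isnull))
 *              {
 *                      if (!isnull)
 *                              use_datum(val);
 *              }
 *
 * If the sorted type is pass-by-reference, each non-null val is a
 * caller-owned palloc'd copy that may be pfree'd once consumed.
 */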
1253
1254 /*
1255  * tuplesort_merge_order - report merge order we'll use for given memory
1256  * (note: "merge order" just means the number of input tapes in the merge).
1257  *
1258  * This is exported for use by the planner.  allowedMem is in bytes.
1259  */
1260 int
1261 tuplesort_merge_order(long allowedMem)
1262 {
1263         int                     mOrder;
1264
1265         /*
1266          * We need one tape for each merge input, plus another one for the output,
1267          * and each of these tapes needs buffer space.  In addition we want
1268          * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't
1269          * count).
1270          *
1271          * Note: you might be thinking we need to account for the memtuples[]
1272          * array in this calculation, but we effectively treat that as part of the
1273          * MERGE_BUFFER_SIZE workspace.
1274          */
1275         mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
1276                 (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
1277
1278         /* Even in minimum memory, use at least a MINORDER merge */
1279         mOrder = Max(mOrder, MINORDER);
1280
1281         return mOrder;
1282 }
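
/*
 * Worked example (concrete numbers assumed for illustration): with the
 * values used at BLCKSZ = 8192, TAPE_BUFFER_OVERHEAD is 24576 bytes and
 * MERGE_BUFFER_SIZE is 262144 bytes, so for allowedMem = 1048576
 * (a work_mem of 1MB):
 *
 *              (1048576 - 24576) / (262144 + 24576) = 3
 *
 * which Max() then raises to MINORDER (6); inittapes() would therefore
 * create 6 + 1 = 7 tapes.
 */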
1283
1284 /*
1285  * inittapes - initialize for tape sorting.
1286  *
1287  * This is called only if we have found we don't have room to sort in memory.
1288  */
1289 static void
1290 inittapes(Tuplesortstate *state)
1291 {
1292         int                     maxTapes,
1293                                 ntuples,
1294                                 j;
1295         long            tapeSpace;
1296
1297         /* Compute number of tapes to use: merge order plus 1 */
1298         maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
1299
1300         /*
1301          * We must have at least 2*maxTapes slots in the memtuples[] array, else
1302          * we'd not have room for the merge heap plus preread tuples.  It seems
1303          * unlikely that this case would ever occur, but be safe.
1304          */
1305         maxTapes = Min(maxTapes, state->memtupsize / 2);
1306
1307         state->maxTapes = maxTapes;
1308         state->tapeRange = maxTapes - 1;
1309
1310 #ifdef TRACE_SORT
1311         if (trace_sort)
1312                 elog(LOG, "switching to external sort with %d tapes: %s",
1313                          maxTapes, pg_rusage_show(&state->ru_start));
1314 #endif
1315
1316         /*
1317          * Decrease availMem to reflect the space needed for tape buffers; but
1318          * don't decrease it to the point that we have no room for tuples. (That
1319          * case is only likely to occur if sorting pass-by-value Datums; in all
1320          * other scenarios the memtuples[] array is unlikely to occupy more than
1321          * half of allowedMem.  In the pass-by-value case it's not important to
1322          * account for tuple space, so we don't care if LACKMEM becomes
1323          * inaccurate.)
1324          */
1325         tapeSpace = maxTapes * TAPE_BUFFER_OVERHEAD;
1326         if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
1327                 USEMEM(state, tapeSpace);
1328
1329         /*
1330          * Create the tape set and allocate the per-tape data arrays.
1331          */
1332         state->tapeset = LogicalTapeSetCreate(maxTapes);
1333
1334         state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
1335         state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
1336         state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
1337         state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
1338         state->mergeavailmem = (long *) palloc0(maxTapes * sizeof(long));
1339         state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
1340         state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
1341         state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
1342         state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
1343
1344         /*
1345          * Convert the unsorted contents of memtuples[] into a heap. Each tuple is
1346          * marked as belonging to run number zero.
1347          *
1348          * NOTE: we pass false for checkIndex since there's no point in comparing
1349          * indexes in this step, even though we do intend the indexes to be part
1350          * of the sort key...
1351          */
1352         ntuples = state->memtupcount;
1353         state->memtupcount = 0;         /* make the heap empty */
1354         for (j = 0; j < ntuples; j++)
1355         {
1356                 /* Must copy source tuple to avoid possible overwrite */
1357                 SortTuple       stup = state->memtuples[j];
1358
1359                 tuplesort_heap_insert(state, &stup, 0, false);
1360         }
1361         Assert(state->memtupcount == ntuples);
1362
1363         state->currentRun = 0;
1364
1365         /*
1366          * Initialize variables of Algorithm D (step D1).
1367          */
1368         for (j = 0; j < maxTapes; j++)
1369         {
1370                 state->tp_fib[j] = 1;
1371                 state->tp_runs[j] = 0;
1372                 state->tp_dummy[j] = 1;
1373                 state->tp_tapenum[j] = j;
1374         }
1375         state->tp_fib[state->tapeRange] = 0;
1376         state->tp_dummy[state->tapeRange] = 0;
1377
1378         state->Level = 1;
1379         state->destTape = 0;
1380
1381         state->status = TSS_BUILDRUNS;
1382 }
1383
1384 /*
1385  * selectnewtape -- select new tape for new initial run.
1386  *
1387  * This is called after finishing a run when we know another run
1388  * must be started.  This implements steps D3, D4 of Algorithm D.
1389  */
1390 static void
1391 selectnewtape(Tuplesortstate *state)
1392 {
1393         int                     j;
1394         int                     a;
1395
1396         /* Step D3: advance j (destTape) */
1397         if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
1398         {
1399                 state->destTape++;
1400                 return;
1401         }
1402         if (state->tp_dummy[state->destTape] != 0)
1403         {
1404                 state->destTape = 0;
1405                 return;
1406         }
1407
1408         /* Step D4: increase level */
1409         state->Level++;
1410         a = state->tp_fib[0];
1411         for (j = 0; j < state->tapeRange; j++)
1412         {
1413                 state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
1414                 state->tp_fib[j] = a + state->tp_fib[j + 1];
1415         }
1416         state->destTape = 0;
1417 }
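
/*
 * Worked example of the level logic above, for maxTapes = 4 (three
 * input tapes, so tapeRange = 3).  Starting from step D1's fib = {1,1,1}
 * and applying step D4 repeatedly, the target run distributions are
 *
 *              Level 1:        1 + 1 + 1 = 3 runs
 *              Level 2:        2 + 2 + 1 = 5 runs
 *              Level 3:        4 + 3 + 2 = 9 runs
 *
 * i.e. the order-3 generalized Fibonacci numbers of Algorithm D, with
 * tp_dummy[] recording how many of each tape's target runs are still
 * imaginary.
 */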
1418
1419 /*
1420  * mergeruns -- merge all the completed initial runs.
1421  *
1422  * This implements steps D5, D6 of Algorithm D.  All input data has
1423  * already been written to initial runs on tape (see dumptuples).
1424  */
1425 static void
1426 mergeruns(Tuplesortstate *state)
1427 {
1428         int                     tapenum,
1429                                 svTape,
1430                                 svRuns,
1431                                 svDummy;
1432
1433         Assert(state->status == TSS_BUILDRUNS);
1434         Assert(state->memtupcount == 0);
1435
1436         /*
1437          * If we produced only one initial run (quite likely if the total data
1438          * volume is between 1X and 2X workMem), we can just use that tape as the
1439          * finished output, rather than doing a useless merge.  (This obvious
1440          * optimization is not in Knuth's algorithm.)
1441          */
1442         if (state->currentRun == 1)
1443         {
1444                 state->result_tape = state->tp_tapenum[state->destTape];
1445                 /* must freeze and rewind the finished output tape */
1446                 LogicalTapeFreeze(state->tapeset, state->result_tape);
1447                 state->status = TSS_SORTEDONTAPE;
1448                 return;
1449         }
1450
1451         /* End of step D2: rewind all output tapes to prepare for merging */
1452         for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
1453                 LogicalTapeRewind(state->tapeset, tapenum, false);
1454
1455         for (;;)
1456         {
1457                 /*
1458                  * At this point we know that tape[T] is empty.  If there's just one
1459                  * (real or dummy) run left on each input tape, then only one merge
1460                  * pass remains.  If we don't have to produce a materialized sorted
1461                  * tape, we can stop at this point and do the final merge on-the-fly.
1462                  */
1463                 if (!state->randomAccess)
1464                 {
1465                         bool            allOneRun = true;
1466
1467                         Assert(state->tp_runs[state->tapeRange] == 0);
1468                         for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
1469                         {
1470                                 if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
1471                                 {
1472                                         allOneRun = false;
1473                                         break;
1474                                 }
1475                         }
1476                         if (allOneRun)
1477                         {
1478                                 /* Tell logtape.c we won't be writing anymore */
1479                                 LogicalTapeSetForgetFreeSpace(state->tapeset);
1480                                 /* Initialize for the final merge pass */
1481                                 beginmerge(state);
1482                                 state->status = TSS_FINALMERGE;
1483                                 return;
1484                         }
1485                 }
1486
1487                 /* Step D5: merge runs onto tape[T] until tape[P] is empty */
1488                 while (state->tp_runs[state->tapeRange - 1] ||
1489                            state->tp_dummy[state->tapeRange - 1])
1490                 {
1491                         bool            allDummy = true;
1492
1493                         for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
1494                         {
1495                                 if (state->tp_dummy[tapenum] == 0)
1496                                 {
1497                                         allDummy = false;
1498                                         break;
1499                                 }
1500                         }
1501
1502                         if (allDummy)
1503                         {
1504                                 state->tp_dummy[state->tapeRange]++;
1505                                 for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
1506                                         state->tp_dummy[tapenum]--;
1507                         }
1508                         else
1509                                 mergeonerun(state);
1510                 }
1511
1512                 /* Step D6: decrease level */
1513                 if (--state->Level == 0)
1514                         break;
1515                 /* rewind output tape T to use as new input */
1516                 LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange],
1517                                                   false);
1518                 /* rewind used-up input tape P, and prepare it for write pass */
1519                 LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange - 1],
1520                                                   true);
1521                 state->tp_runs[state->tapeRange - 1] = 0;
1522
1523                 /*
1524                  * reassign tape units per step D6; note we no longer care about A[]
1525                  */
1526                 svTape = state->tp_tapenum[state->tapeRange];
1527                 svDummy = state->tp_dummy[state->tapeRange];
1528                 svRuns = state->tp_runs[state->tapeRange];
1529                 for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
1530                 {
1531                         state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
1532                         state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
1533                         state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
1534                 }
1535                 state->tp_tapenum[0] = svTape;
1536                 state->tp_dummy[0] = svDummy;
1537                 state->tp_runs[0] = svRuns;
1538         }
1539
1540         /*
1541          * Done.  Knuth says that the result is on TAPE[1], but since we exited
1542          * the loop without performing the last iteration of step D6, we have not
1543          * rearranged the tape unit assignment, and therefore the result is on
1544          * TAPE[T].  We need to do it this way so that we can freeze the final
1545          * output tape while rewinding it.      The last iteration of step D6 would be
1546          * a waste of cycles anyway...
1547          */
1548         state->result_tape = state->tp_tapenum[state->tapeRange];
1549         LogicalTapeFreeze(state->tapeset, state->result_tape);
1550         state->status = TSS_SORTEDONTAPE;
1551 }
1552
1553 /*
1554  * Merge one run from each input tape, except ones with dummy runs.
1555  *
1556  * This is the inner loop of Algorithm D step D5.  We know that the
1557  * output tape is TAPE[T].
1558  */
1559 static void
1560 mergeonerun(Tuplesortstate *state)
1561 {
1562         int                     destTape = state->tp_tapenum[state->tapeRange];
1563         int                     srcTape;
1564         int                     tupIndex;
1565         SortTuple  *tup;
1566         long            priorAvail,
1567                                 spaceFreed;
1568
1569         /*
1570          * Start the merge by loading one tuple from each active source tape into
1571          * the heap.  We can also decrease the input run/dummy run counts.
1572          */
1573         beginmerge(state);
1574
1575         /*
1576          * Execute merge by repeatedly extracting lowest tuple in heap, writing it
1577          * out, and replacing it with next tuple from same tape (if there is
1578          * another one).
1579          */
1580         while (state->memtupcount > 0)
1581         {
1582                 /* write the tuple to destTape */
1583                 priorAvail = state->availMem;
1584                 srcTape = state->memtuples[0].tupindex;
1585                 WRITETUP(state, destTape, &state->memtuples[0]);
1586                 /* writetup adjusted total free space, now fix per-tape space */
1587                 spaceFreed = state->availMem - priorAvail;
1588                 state->mergeavailmem[srcTape] += spaceFreed;
1589                 /* compact the heap */
1590                 tuplesort_heap_siftup(state, false);
1591                 if ((tupIndex = state->mergenext[srcTape]) == 0)
1592                 {
1593                         /* out of preloaded data on this tape, try to read more */
1594                         mergepreread(state);
1595                         /* if still no data, we've reached end of run on this tape */
1596                         if ((tupIndex = state->mergenext[srcTape]) == 0)
1597                                 continue;
1598                 }
1599                 /* pull next preread tuple from list, insert in heap */
1600                 tup = &state->memtuples[tupIndex];
1601                 state->mergenext[srcTape] = tup->tupindex;
1602                 if (state->mergenext[srcTape] == 0)
1603                         state->mergelast[srcTape] = 0;
1604                 tuplesort_heap_insert(state, tup, srcTape, false);
1605                 /* put the now-unused memtuples entry on the freelist */
1606                 tup->tupindex = state->mergefreelist;
1607                 state->mergefreelist = tupIndex;
1608                 state->mergeavailslots[srcTape]++;
1609         }
1610
1611         /*
1612          * When the heap empties, we're done.  Write an end-of-run marker on the
1613          * output tape, and increment its count of real runs.
1614          */
1615         markrunend(state, destTape);
1616         state->tp_runs[state->tapeRange]++;
1617
1618 #ifdef TRACE_SORT
1619         if (trace_sort)
1620                 elog(LOG, "finished %d-way merge step: %s", state->activeTapes,
1621                          pg_rusage_show(&state->ru_start));
1622 #endif
1623 }
1624
1625 /*
1626  * beginmerge - initialize for a merge pass
1627  *
1628  * We decrease the counts of real and dummy runs for each tape, and mark
1629  * which tapes contain active input runs in mergeactive[].      Then, load
1630  * as many tuples as we can from each active input tape, and finally
1631  * fill the merge heap with the first tuple from each active tape.
1632  */
1633 static void
1634 beginmerge(Tuplesortstate *state)
1635 {
1636         int                     activeTapes;
1637         int                     tapenum;
1638         int                     srcTape;
1639         int                     slotsPerTape;
1640         long            spacePerTape;
1641
1642         /* Heap should be empty here */
1643         Assert(state->memtupcount == 0);
1644
1645         /* Adjust run counts and mark the active tapes */
1646         memset(state->mergeactive, 0,
1647                    state->maxTapes * sizeof(*state->mergeactive));
1648         activeTapes = 0;
1649         for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
1650         {
1651                 if (state->tp_dummy[tapenum] > 0)
1652                         state->tp_dummy[tapenum]--;
1653                 else
1654                 {
1655                         Assert(state->tp_runs[tapenum] > 0);
1656                         state->tp_runs[tapenum]--;
1657                         srcTape = state->tp_tapenum[tapenum];
1658                         state->mergeactive[srcTape] = true;
1659                         activeTapes++;
1660                 }
1661         }
1662         state->activeTapes = activeTapes;
1663
1664         /* Clear merge-pass state variables */
1665         memset(state->mergenext, 0,
1666                    state->maxTapes * sizeof(*state->mergenext));
1667         memset(state->mergelast, 0,
1668                    state->maxTapes * sizeof(*state->mergelast));
1669         state->mergefreelist = 0;       /* nothing in the freelist */
1670         state->mergefirstfree = activeTapes;            /* 1st slot avail for preread */
1671
1672         /*
1673          * Initialize space allocation to let each active input tape have an equal
1674          * share of preread space.
1675          */
1676         Assert(activeTapes > 0);
1677         slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
1678         Assert(slotsPerTape > 0);
1679         spacePerTape = state->availMem / activeTapes;
1680         for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
1681         {
1682                 if (state->mergeactive[srcTape])
1683                 {
1684                         state->mergeavailslots[srcTape] = slotsPerTape;
1685                         state->mergeavailmem[srcTape] = spacePerTape;
1686                 }
1687         }
1688
1689         /*
1690          * Preread as many tuples as possible (and at least one) from each active
1691          * tape
1692          */
1693         mergepreread(state);
1694
1695         /* Load the merge heap with the first tuple from each input tape */
1696         for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
1697         {
1698                 int                     tupIndex = state->mergenext[srcTape];
1699                 SortTuple  *tup;
1700
1701                 if (tupIndex)
1702                 {
1703                         tup = &state->memtuples[tupIndex];
1704                         state->mergenext[srcTape] = tup->tupindex;
1705                         if (state->mergenext[srcTape] == 0)
1706                                 state->mergelast[srcTape] = 0;
1707                         tuplesort_heap_insert(state, tup, srcTape, false);
1708                         /* put the now-unused memtuples entry on the freelist */
1709                         tup->tupindex = state->mergefreelist;
1710                         state->mergefreelist = tupIndex;
1711                         state->mergeavailslots[srcTape]++;
1712                 }
1713         }
1714 }
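
/*
 * For example (numbers assumed): with memtupsize = 1024 and six active
 * input tapes, mergefirstfree is 6, so each tape is granted
 * (1024 - 6) / 6 = 169 preread slots and availMem / 6 bytes of preread
 * space; slots 0..5 stay reserved for the merge heap entries themselves.
 */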
1715
1716 /*
1717  * mergepreread - load tuples from merge input tapes
1718  *
1719  * This routine exists to improve sequentiality of reads during a merge pass,
1720  * as explained in the header comments of this file.  Load tuples from each
1721  * active source tape until the tape's run is exhausted or it has used up
1722  * its fair share of available memory.  In any case, we guarantee that there
1723  * is at least one preread tuple available from each unexhausted input tape.
1724  *
1725  * We invoke this routine at the start of a merge pass for initial load,
1726  * and then whenever any tape's preread data runs out.  Note that we load
1727  * as much data as possible from all tapes, not just the one that ran out.
1728  * This is because logtape.c works best with a usage pattern that alternates
1729  * between reading a lot of data and writing a lot of data, so whenever we
1730  * are forced to read, we should fill working memory completely.
1731  *
1732  * In FINALMERGE state, we *don't* use this routine, but instead just preread
1733  * from the single tape that ran dry.  There's no read/write alternation in
1734  * that state and so no point in scanning through all the tapes to fix one.
1735  * (Moreover, there may be quite a lot of inactive tapes in that state, since
1736  * we might have had many fewer runs than tapes.  In a regular tape-to-tape
1737  * merge we can expect most of the tapes to be active.)
1738  */
1739 static void
1740 mergepreread(Tuplesortstate *state)
1741 {
1742         int                     srcTape;
1743
1744         for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
1745                 mergeprereadone(state, srcTape);
1746 }
1747
1748 /*
1749  * mergeprereadone - load tuples from one merge input tape
1750  *
1751  * Read tuples from the specified tape until it has used up its free memory
1752  * or array slots; but ensure that we have at least one tuple, if any are
1753  * to be had.
1754  */
1755 static void
1756 mergeprereadone(Tuplesortstate *state, int srcTape)
1757 {
1758         unsigned int tuplen;
1759         SortTuple       stup;
1760         int                     tupIndex;
1761         long            priorAvail,
1762                                 spaceUsed;
1763
1764         if (!state->mergeactive[srcTape])
1765                 return;                                 /* tape's run is already exhausted */
1766         priorAvail = state->availMem;
1767         state->availMem = state->mergeavailmem[srcTape];
1768         while ((state->mergeavailslots[srcTape] > 0 && !LACKMEM(state)) ||
1769                    state->mergenext[srcTape] == 0)
1770         {
1771                 /* read next tuple, if any */
1772                 if ((tuplen = getlen(state, srcTape, true)) == 0)
1773                 {
1774                         state->mergeactive[srcTape] = false;
1775                         break;
1776                 }
1777                 READTUP(state, &stup, srcTape, tuplen);
1778                 /* find a free slot in memtuples[] for it */
1779                 tupIndex = state->mergefreelist;
1780                 if (tupIndex)
1781                         state->mergefreelist = state->memtuples[tupIndex].tupindex;
1782                 else
1783                 {
1784                         tupIndex = state->mergefirstfree++;
1785                         Assert(tupIndex < state->memtupsize);
1786                 }
1787                 state->mergeavailslots[srcTape]--;
1788                 /* store tuple, append to list for its tape */
1789                 stup.tupindex = 0;
1790                 state->memtuples[tupIndex] = stup;
1791                 if (state->mergelast[srcTape])
1792                         state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
1793                 else
1794                         state->mergenext[srcTape] = tupIndex;
1795                 state->mergelast[srcTape] = tupIndex;
1796         }
1797         /* update per-tape and global availmem counts */
1798         spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
1799         state->mergeavailmem[srcTape] = state->availMem;
1800         state->availMem = priorAvail - spaceUsed;
1801 }
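
/*
 * Illustration of the list structure this builds (slot numbers are
 * invented): after prereading three tuples for tape 2 into memtuples[]
 * slots 7, 9 and 12, we have
 *
 *              mergenext[2] = 7
 *              memtuples[7].tupindex = 9
 *              memtuples[9].tupindex = 12
 *              memtuples[12].tupindex = 0              (end of list)
 *              mergelast[2] = 12
 *
 * so tupindex serves as the "next" link while a tuple waits in a
 * preread list, and becomes the source-tape number once the tuple is
 * moved into the merge heap.
 */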
1802
1803 /*
1804  * dumptuples - remove tuples from heap and write to tape
1805  *
1806  * This is used during initial-run building, but not during merging.
1807  *
1808  * When alltuples = false, dump only enough tuples to get under the
1809  * availMem limit (and leave at least one tuple in the heap in any case,
1810  * since puttuple assumes it always has a tuple to compare to).  We also
1811  * insist there be at least one free slot in the memtuples[] array.
1812  *
1813  * When alltuples = true, dump everything currently in memory.
1814  * (This case is only used at end of input data.)
1815  *
1816  * If we empty the heap, close out the current run and return (this should
1817  * only happen at end of input data).  If we see that the tuple run number
1818  * at the top of the heap has changed, start a new run.
1819  */
1820 static void
1821 dumptuples(Tuplesortstate *state, bool alltuples)
1822 {
1823         while (alltuples ||
1824                    (LACKMEM(state) && state->memtupcount > 1) ||
1825                    state->memtupcount >= state->memtupsize)
1826         {
1827                 /*
1828                  * Dump the heap's frontmost entry, and sift up to remove it from the
1829                  * heap.
1830                  */
1831                 Assert(state->memtupcount > 0);
1832                 WRITETUP(state, state->tp_tapenum[state->destTape],
1833                                  &state->memtuples[0]);
1834                 tuplesort_heap_siftup(state, true);
1835
1836                 /*
1837                  * If the heap is empty *or* top run number has changed, we've
1838                  * finished the current run.
1839                  */
1840                 if (state->memtupcount == 0 ||
1841                         state->currentRun != state->memtuples[0].tupindex)
1842                 {
1843                         markrunend(state, state->tp_tapenum[state->destTape]);
1844                         state->currentRun++;
1845                         state->tp_runs[state->destTape]++;
1846                         state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
1847
1848 #ifdef TRACE_SORT
1849                         if (trace_sort)
1850                                 elog(LOG, "finished writing%s run %d to tape %d: %s",
1851                                          (state->memtupcount == 0) ? " final" : "",
1852                                          state->currentRun, state->destTape,
1853                                          pg_rusage_show(&state->ru_start));
1854 #endif
1855
1856                         /*
1857                          * Done if heap is empty, else prepare for new run.
1858                          */
1859                         if (state->memtupcount == 0)
1860                                 break;
1861                         Assert(state->currentRun == state->memtuples[0].tupindex);
1862                         selectnewtape(state);
1863                 }
1864         }
1865 }
1866
1867 /*
1868  * tuplesort_rescan             - rewind and replay the scan
1869  */
1870 void
1871 tuplesort_rescan(Tuplesortstate *state)
1872 {
1873         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1874
1875         Assert(state->randomAccess);
1876
1877         switch (state->status)
1878         {
1879                 case TSS_SORTEDINMEM:
1880                         state->current = 0;
1881                         state->eof_reached = false;
1882                         state->markpos_offset = 0;
1883                         state->markpos_eof = false;
1884                         break;
1885                 case TSS_SORTEDONTAPE:
1886                         LogicalTapeRewind(state->tapeset,
1887                                                           state->result_tape,
1888                                                           false);
1889                         state->eof_reached = false;
1890                         state->markpos_block = 0L;
1891                         state->markpos_offset = 0;
1892                         state->markpos_eof = false;
1893                         break;
1894                 default:
1895                         elog(ERROR, "invalid tuplesort state");
1896                         break;
1897         }
1898
1899         MemoryContextSwitchTo(oldcontext);
1900 }
1901
1902 /*
1903  * tuplesort_markpos    - saves current position in the merged sort file
1904  */
1905 void
1906 tuplesort_markpos(Tuplesortstate *state)
1907 {
1908         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1909
1910         Assert(state->randomAccess);
1911
1912         switch (state->status)
1913         {
1914                 case TSS_SORTEDINMEM:
1915                         state->markpos_offset = state->current;
1916                         state->markpos_eof = state->eof_reached;
1917                         break;
1918                 case TSS_SORTEDONTAPE:
1919                         LogicalTapeTell(state->tapeset,
1920                                                         state->result_tape,
1921                                                         &state->markpos_block,
1922                                                         &state->markpos_offset);
1923                         state->markpos_eof = state->eof_reached;
1924                         break;
1925                 default:
1926                         elog(ERROR, "invalid tuplesort state");
1927                         break;
1928         }
1929
1930         MemoryContextSwitchTo(oldcontext);
1931 }
1932
1933 /*
1934  * tuplesort_restorepos - restores current position in merged sort file to
1935  *                                                last saved position
1936  */
1937 void
1938 tuplesort_restorepos(Tuplesortstate *state)
1939 {
1940         MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
1941
1942         Assert(state->randomAccess);
1943
1944         switch (state->status)
1945         {
1946                 case TSS_SORTEDINMEM:
1947                         state->current = state->markpos_offset;
1948                         state->eof_reached = state->markpos_eof;
1949                         break;
1950                 case TSS_SORTEDONTAPE:
1951                         if (!LogicalTapeSeek(state->tapeset,
1952                                                                  state->result_tape,
1953                                                                  state->markpos_block,
1954                                                                  state->markpos_offset))
1955                                 elog(ERROR, "tuplesort_restorepos failed");
1956                         state->eof_reached = state->markpos_eof;
1957                         break;
1958                 default:
1959                         elog(ERROR, "invalid tuplesort state");
1960                         break;
1961         }
1962
1963         MemoryContextSwitchTo(oldcontext);
1964 }
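
/*
 * A minimal mark/restore sketch (hypothetical caller; same_key() is an
 * assumed predicate), the pattern merge join depends on.  The sort must
 * have been started with randomAccess = true.
 *
 *              tuplesort_markpos(state);
 *              while (tuplesort_gettupleslot(state, true, slot) &&
 *                         same_key(slot))
 *                      ... join against the group ...
 *              tuplesort_restorepos(state);    (rescan from the mark)
 */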
1965
1966
1967 /*
1968  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
1969  *
1970  * Compare two SortTuples.      If checkIndex is true, use the tuple index
1971  * as the front of the sort key; otherwise, compare on the key alone.
1972  */
1973
1974 #define HEAPCOMPARE(tup1,tup2) \
1975         (checkIndex && ((tup1)->tupindex != (tup2)->tupindex) ? \
1976          ((tup1)->tupindex) - ((tup2)->tupindex) : \
1977          COMPARETUP(state, tup1, tup2))
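
/*
 * For example, with checkIndex true a tuple tagged run 0 sorts before
 * any tuple tagged run 1 no matter what their keys are; COMPARETUP is
 * consulted only between tuples of the same run.
 */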
1978
1979 /*
1980  * Insert a new tuple into an empty or existing heap, maintaining the
1981  * heap invariant.      Caller is responsible for ensuring there's room.
1982  *
1983  * Note: we assume *tuple is a temporary variable that can be scribbled on.
1984  * For some callers, tuple actually points to a memtuples[] entry above the
1985  * end of the heap.  This is safe as long as it's not immediately adjacent
1986  * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
1987  * is, it might get overwritten before being moved into the heap!
1988  */
1989 static void
1990 tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
1991                                           int tupleindex, bool checkIndex)
1992 {
1993         SortTuple  *memtuples;
1994         int                     j;
1995
1996         /*
1997          * Save the tupleindex --- see notes above about writing on *tuple. It's a
1998          * historical artifact that tupleindex is passed as a separate argument
1999          * and not in *tuple, but it's notationally convenient so let's leave it
2000          * that way.
2001          */
2002         tuple->tupindex = tupleindex;
2003
2004         memtuples = state->memtuples;
2005         Assert(state->memtupcount < state->memtupsize);
2006
2007         /*
2008          * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
2009          * using 1-based array indexes, not 0-based.
2010          */
2011         j = state->memtupcount++;
2012         while (j > 0)
2013         {
2014                 int                     i = (j - 1) >> 1;
2015
2016                 if (HEAPCOMPARE(tuple, &memtuples[i]) >= 0)
2017                         break;
2018                 memtuples[j] = memtuples[i];
2019                 j = i;
2020         }
2021         memtuples[j] = *tuple;
2022 }
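
/*
 * Worked example (keys invented, checkIndex false): inserting key 1
 * into the valid min-heap {2, 4, 3}.  j starts at slot 3, whose parent
 * is slot 1 (key 4); 1 < 4, so 4 moves down to slot 3 and j becomes 1.
 * The parent is now slot 0 (key 2); 1 < 2, so 2 moves to slot 1 and j
 * becomes 0, ending the loop.  Key 1 lands in slot 0: {1, 2, 3, 4}.
 */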
2023
2024 /*
2025  * The tuple at state->memtuples[0] has been removed from the heap.
2026  * Decrement memtupcount, and sift up to maintain the heap invariant.
2027  */
2028 static void
2029 tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
2030 {
2031         SortTuple  *memtuples = state->memtuples;
2032         SortTuple  *tuple;
2033         int                     i,
2034                                 n;
2035
2036         if (--state->memtupcount <= 0)
2037                 return;
2038         n = state->memtupcount;
2039         tuple = &memtuples[n];          /* tuple that must be reinserted */
2040         i = 0;                                          /* i is where the "hole" is */
2041         for (;;)
2042         {
2043                 int                     j = 2 * i + 1;
2044
2045                 if (j >= n)
2046                         break;
2047                 if (j + 1 < n &&
2048                         HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0)
2049                         j++;
2050                 if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0)
2051                         break;
2052                 memtuples[i] = memtuples[j];
2053                 i = j;
2054         }
2055         memtuples[i] = *tuple;
2056 }
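
/*
 * Worked example (keys invented, checkIndex false): removing the top of
 * the min-heap {2, 4, 3, 9, 6} leaves n = 4 and key 6 (from old slot 4)
 * to reinsert.  The hole starts at slot 0; its smaller child is slot 2
 * (key 3), and 6 > 3, so 3 moves up and the hole descends to slot 2.
 * Slot 2's would-be child index 5 is past the heap end, so 6 settles in
 * slot 2, giving {3, 4, 6, 9}.
 */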
2057
2058
2059 /*
2060  * Tape interface routines
2061  */
2062
2063 static unsigned int
2064 getlen(Tuplesortstate *state, int tapenum, bool eofOK)
2065 {
2066         unsigned int len;
2067
2068         if (LogicalTapeRead(state->tapeset, tapenum, (void *) &len,
2069                                                 sizeof(len)) != sizeof(len))
2070                 elog(ERROR, "unexpected end of tape");
2071         if (len == 0 && !eofOK)
2072                 elog(ERROR, "unexpected end of data");
2073         return len;
2074 }
2075
2076 static void
2077 markrunend(Tuplesortstate *state, int tapenum)
2078 {
2079         unsigned int len = 0;
2080
2081         LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
2082 }
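
/*
 * The on-tape run format these two routines imply (a sketch, lengths
 * shown in brackets):
 *
 *              [len1][tuple1 body][len2][tuple2 body] ... [0]
 *
 * i.e. a run is a sequence of length-prefixed tuples terminated by a
 * zero length word.  With randomAccess, each WRITETUP routine also
 * appends a trailing copy of the length word after the tuple body so
 * that backward scans can locate tuple boundaries.
 */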
2083
2084
2085 /*
2086  * This routine selects an appropriate sorting function to implement
2087  * a sort operator as efficiently as possible.  The straightforward
2088  * method is to use the operator's implementation proc --- ie, "<"
2089  * comparison.  However, that way often requires two calls of the function
2090  * per comparison.      If we can find a btree three-way comparator function
2091  * associated with the operator, we can use it to do the comparisons
2092  * more efficiently.  We also support the possibility that the operator
2093  * is ">" (descending sort), in which case we have to reverse the output
2094  * of the btree comparator.
2095  *
2096  * Possibly this should live somewhere else (backend/catalog/, maybe?).
2097  */
2098 void
2099 SelectSortFunction(Oid sortOperator,
2100                                    RegProcedure *sortFunction,
2101                                    SortFunctionKind *kind)
2102 {
2103         CatCList   *catlist;
2104         int                     i;
2105         HeapTuple       tuple;
2106         Form_pg_operator optup;
2107         Oid                     opclass = InvalidOid;
2108
2109         /*
2110          * Search pg_amop to see if the target operator is registered as the "<"
2111          * or ">" operator of any btree opclass.  It's possible that it might be
2112          * registered both ways (eg, if someone were to build a "reverse sort"
2113          * opclass for some reason); prefer the "<" case if so. If the operator is
2114          * registered the same way in multiple opclasses, assume we can use the
2115          * associated comparator function from any one.
2116          */
2117         catlist = SearchSysCacheList(AMOPOPID, 1,
2118                                                                  ObjectIdGetDatum(sortOperator),
2119                                                                  0, 0, 0);
2120
2121         for (i = 0; i < catlist->n_members; i++)
2122         {
2123                 Form_pg_amop aform;
2124
2125                 tuple = &catlist->members[i]->tuple;
2126                 aform = (Form_pg_amop) GETSTRUCT(tuple);
2127
2128                 if (!opclass_is_btree(aform->amopclaid))
2129                         continue;
2130                 /* must be of default subtype, too */
2131                 if (aform->amopsubtype != InvalidOid)
2132                         continue;
2133
2134                 if (aform->amopstrategy == BTLessStrategyNumber)
2135                 {
2136                         opclass = aform->amopclaid;
2137                         *kind = SORTFUNC_CMP;
2138                         break;                          /* done looking */
2139                 }
2140                 else if (aform->amopstrategy == BTGreaterStrategyNumber)
2141                 {
2142                         opclass = aform->amopclaid;
2143                         *kind = SORTFUNC_REVCMP;
2144                         /* keep scanning in hopes of finding a BTLess entry */
2145                 }
2146         }
2147
2148         ReleaseSysCacheList(catlist);
2149
2150         if (OidIsValid(opclass))
2151         {
2152                 /* Found a suitable opclass, get its default comparator function */
2153                 *sortFunction = get_opclass_proc(opclass, InvalidOid, BTORDER_PROC);
2154                 Assert(RegProcedureIsValid(*sortFunction));
2155                 return;
2156         }
2157
2158         /*
2159          * Can't find a comparator, so use the operator as-is.  Decide whether it
2160          * is forward or reverse sort by looking at its name (grotty, but this
2161          * only matters for deciding which end NULLs should get sorted to).  XXX
2162          * possibly better idea: see whether its selectivity function is
2163          * scalargtsel?
2164          */
2165         tuple = SearchSysCache(OPEROID,
2166                                                    ObjectIdGetDatum(sortOperator),
2167                                                    0, 0, 0);
2168         if (!HeapTupleIsValid(tuple))
2169                 elog(ERROR, "cache lookup failed for operator %u", sortOperator);
2170         optup = (Form_pg_operator) GETSTRUCT(tuple);
2171         if (strcmp(NameStr(optup->oprname), ">") == 0)
2172                 *kind = SORTFUNC_REVLT;
2173         else
2174                 *kind = SORTFUNC_LT;
2175         *sortFunction = optup->oprcode;
2176         ReleaseSysCache(tuple);
2177
2178         Assert(RegProcedureIsValid(*sortFunction));
2179 }
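
/*
 * Concrete illustration (standard system catalog contents assumed): for
 * the int4 "<" operator, the BTLessStrategyNumber member of the default
 * int4 btree opclass, this returns the three-way comparator btint4cmp
 * with SORTFUNC_CMP; for int4 ">" it returns the same comparator with
 * SORTFUNC_REVCMP, so callers negate its result.
 */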
2180
2181 /*
2182  * Inline-able copy of FunctionCall2() to save some cycles in sorting.
2183  */
2184 static inline Datum
2185 myFunctionCall2(FmgrInfo *flinfo, Datum arg1, Datum arg2)
2186 {
2187         FunctionCallInfoData fcinfo;
2188         Datum           result;
2189
2190         InitFunctionCallInfoData(fcinfo, flinfo, 2, NULL, NULL);
2191
2192         fcinfo.arg[0] = arg1;
2193         fcinfo.arg[1] = arg2;
2194         fcinfo.argnull[0] = false;
2195         fcinfo.argnull[1] = false;
2196
2197         result = FunctionCallInvoke(&fcinfo);
2198
2199         /* Check for null result, since caller is clearly not expecting one */
2200         if (fcinfo.isnull)
2201                 elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
2202
2203         return result;
2204 }
2205
2206 /*
2207  * Apply a sort function (by now converted to fmgr lookup form)
2208  * and return a 3-way comparison result.  This takes care of handling
2209  * NULLs and sort ordering direction properly.
2210  */
2211 static inline int32
2212 inlineApplySortFunction(FmgrInfo *sortFunction, SortFunctionKind kind,
2213                                                 Datum datum1, bool isNull1,
2214                                                 Datum datum2, bool isNull2)
2215 {
2216         switch (kind)
2217         {
2218                 case SORTFUNC_LT:
2219                         if (isNull1)
2220                         {
2221                                 if (isNull2)
2222                                         return 0;
2223                                 return 1;               /* NULL sorts after non-NULL */
2224                         }
2225                         if (isNull2)
2226                                 return -1;
2227                         if (DatumGetBool(myFunctionCall2(sortFunction, datum1, datum2)))
2228                                 return -1;              /* a < b */
2229                         if (DatumGetBool(myFunctionCall2(sortFunction, datum2, datum1)))
2230                                 return 1;               /* a > b */
2231                         return 0;
2232
2233                 case SORTFUNC_REVLT:
2234                         /* We reverse the ordering of NULLs, but not the operator */
2235                         if (isNull1)
2236                         {
2237                                 if (isNull2)
2238                                         return 0;
2239                                 return -1;              /* NULL sorts before non-NULL */
2240                         }
2241                         if (isNull2)
2242                                 return 1;
2243                         if (DatumGetBool(myFunctionCall2(sortFunction, datum1, datum2)))
2244                                 return -1;              /* a < b */
2245                         if (DatumGetBool(myFunctionCall2(sortFunction, datum2, datum1)))
2246                                 return 1;               /* a > b */
2247                         return 0;
2248
2249                 case SORTFUNC_CMP:
2250                         if (isNull1)
2251                         {
2252                                 if (isNull2)
2253                                         return 0;
2254                                 return 1;               /* NULL sorts after non-NULL */
2255                         }
2256                         if (isNull2)
2257                                 return -1;
2258                         return DatumGetInt32(myFunctionCall2(sortFunction,
2259                                                                                                  datum1, datum2));
2260
2261                 case SORTFUNC_REVCMP:
2262                         if (isNull1)
2263                         {
2264                                 if (isNull2)
2265                                         return 0;
2266                                 return -1;              /* NULL sorts before non-NULL */
2267                         }
2268                         if (isNull2)
2269                                 return 1;
2270                         return -DatumGetInt32(myFunctionCall2(sortFunction,
2271                                                                                                   datum1, datum2));
2272
2273                 default:
2274                         elog(ERROR, "unrecognized SortFunctionKind: %d", (int) kind);
2275                         return 0;                       /* can't get here, but keep compiler quiet */
2276         }
2277 }
2278
2279 /*
2280  * Non-inline ApplySortFunction() --- this is needed only to conform to
2281  * C99's brain-dead notions about how to implement inline functions...
2282  */
2283 int32
2284 ApplySortFunction(FmgrInfo *sortFunction, SortFunctionKind kind,
2285                                   Datum datum1, bool isNull1,
2286                                   Datum datum2, bool isNull2)
2287 {
2288         return inlineApplySortFunction(sortFunction, kind,
2289                                                                    datum1, isNull1,
2290                                                                    datum2, isNull2);
2291 }
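
/*
 * A minimal sketch of calling this directly (assumes the standard
 * btint4cmp comparator; F_BTINT4CMP comes from fmgroids.h):
 *
 *              FmgrInfo        cmpfn;
 *
 *              fmgr_info(F_BTINT4CMP, &cmpfn);
 *              if (ApplySortFunction(&cmpfn, SORTFUNC_CMP,
 *                                    Int32GetDatum(4), false,
 *                                    Int32GetDatum(7), false) < 0)
 *                      ... 4 sorts before 7 ...
 */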
2292
2293
2294 /*
2295  * Routines specialized for HeapTuple (actually MinimalTuple) case
2296  */
2297
2298 static int
2299 comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
2300 {
2301         ScanKey         scanKey = state->scanKeys;
2302         HeapTupleData ltup;
2303         HeapTupleData rtup;
2304         TupleDesc       tupDesc;
2305         int                     nkey;
2306         int32           compare;
2307
2308         /* Allow interrupting long sorts */
2309         CHECK_FOR_INTERRUPTS();
2310
2311         /* Compare the leading sort key */
2312         compare = inlineApplySortFunction(&scanKey->sk_func,
2313                                                                           state->sortFnKinds[0],
2314                                                                           a->datum1, a->isnull1,
2315                                                                           b->datum1, b->isnull1);
2316         if (compare != 0)
2317                 return compare;
2318
2319         /* Compare additional sort keys */
2320         ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
2321         ltup.t_data = (HeapTupleHeader) ((char *) a->tuple - MINIMAL_TUPLE_OFFSET);
2322         rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
2323         rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET);
2324         tupDesc = state->tupDesc;
2325         scanKey++;
2326         for (nkey = 1; nkey < state->nKeys; nkey++, scanKey++)
2327         {
2328                 AttrNumber      attno = scanKey->sk_attno;
2329                 Datum           datum1,
2330                                         datum2;
2331                 bool            isnull1,
2332                                         isnull2;
2333
2334                 datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
2335                 datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
2336
2337                 compare = inlineApplySortFunction(&scanKey->sk_func,
2338                                                                                   state->sortFnKinds[nkey],
2339                                                                                   datum1, isnull1,
2340                                                                                   datum2, isnull2);
2341                 if (compare != 0)
2342                         return compare;
2343         }
2344
2345         return 0;
2346 }
2347
2348 static void
2349 copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
2350 {
2351         /*
2352          * We expect the passed "tup" to be a TupleTableSlot, and form a
2353          * MinimalTuple using the exported interface for that.
2354          */
2355         TupleTableSlot *slot = (TupleTableSlot *) tup;
2356         MinimalTuple tuple;
2357         HeapTupleData htup;
2358
2359         /* copy the tuple into sort storage */
2360         tuple = ExecCopySlotMinimalTuple(slot);
2361         stup->tuple = (void *) tuple;
2362         USEMEM(state, GetMemoryChunkSpace(tuple));
2363         /* set up first-column key value */
2364         htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
2365         htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
2366         stup->datum1 = heap_getattr(&htup,
2367                                                                 state->scanKeys[0].sk_attno,
2368                                                                 state->tupDesc,
2369                                                                 &stup->isnull1);
2370 }
2371
2372 /*
2373  * Since MinimalTuple already has length in its first word, we don't need
2374  * to write that separately.
2375  */
2376 static void
2377 writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
2378 {
2379         MinimalTuple tuple = (MinimalTuple) stup->tuple;
2380         unsigned int tuplen = tuple->t_len;
2381
2382         LogicalTapeWrite(state->tapeset, tapenum,
2383                                          (void *) tuple, tuplen);
2384         if (state->randomAccess)        /* need trailing length word? */
2385                 LogicalTapeWrite(state->tapeset, tapenum,
2386                                                  (void *) &tuplen, sizeof(tuplen));
2387
2388         FREEMEM(state, GetMemoryChunkSpace(tuple));
2389         heap_free_minimal_tuple(tuple);
2390 }
2391
2392 static void
2393 readtup_heap(Tuplesortstate *state, SortTuple *stup,
2394                          int tapenum, unsigned int len)
2395 {
2396         MinimalTuple tuple = (MinimalTuple) palloc(len);
2397         unsigned int tuplen;
2398         HeapTupleData htup;
2399
2400         USEMEM(state, GetMemoryChunkSpace(tuple));
2401         /* read in the tuple proper */
2402         tuple->t_len = len;
2403         if (LogicalTapeRead(state->tapeset, tapenum,
2404                                                 (void *) ((char *) tuple + sizeof(int)),
2405                                                 len - sizeof(int)) != (size_t) (len - sizeof(int)))
2406                 elog(ERROR, "unexpected end of data");
2407         if (state->randomAccess)        /* need trailing length word? */
2408                 if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
2409                                                         sizeof(tuplen)) != sizeof(tuplen))
2410                         elog(ERROR, "unexpected end of data");
2411         stup->tuple = (void *) tuple;
2412         /* set up first-column key value */
2413         htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
2414         htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
2415         stup->datum1 = heap_getattr(&htup,
2416                                                                 state->scanKeys[0].sk_attno,
2417                                                                 state->tupDesc,
2418                                                                 &stup->isnull1);
2419 }
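
/*
 * Layout note for the htup trick above (a sketch): a MinimalTuple is a
 * HeapTupleHeader with its first MINIMAL_TUPLE_OFFSET bytes removed, so
 * setting t_data to (tuple - MINIMAL_TUPLE_OFFSET) makes the remaining
 * header fields and attribute offsets line up exactly where
 * heap_getattr() expects them; the bytes before the palloc'd block are
 * never actually touched.
 */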
2420
2421
2422 /*
2423  * Routines specialized for IndexTuple case
2424  *
2425  * NOTE: actually, these are specialized for the btree case; it's not
2426  * clear whether you could use them for a non-btree index.      Possibly
2427  * you'd need to make another set of routines if you needed to sort
2428  * according to another kind of index.
2429  */

static int
comparetup_index(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
{
        /*
         * This is similar to _bt_tuplecompare(), but we have already done the
         * index_getattr calls for the first column, and we need to keep track of
         * whether any null fields are present.  Also see the special treatment
         * for equal keys at the end.
         */
        ScanKey         scanKey = state->indexScanKey;
        IndexTuple      tuple1;
        IndexTuple      tuple2;
        int             keysz;
        TupleDesc       tupDes;
        bool            equal_hasnull = false;
        int             nkey;
        int32           compare;

        /* Allow interrupting long sorts */
        CHECK_FOR_INTERRUPTS();

        /* Compare the leading sort key */
        compare = inlineApplySortFunction(&scanKey->sk_func,
                                          SORTFUNC_CMP,
                                          a->datum1, a->isnull1,
                                          b->datum1, b->isnull1);
        if (compare != 0)
                return compare;

        /* they are equal, so we only need to examine one null flag */
        if (a->isnull1)
                equal_hasnull = true;

        /* Compare additional sort keys */
        tuple1 = (IndexTuple) a->tuple;
        tuple2 = (IndexTuple) b->tuple;
        keysz = state->nKeys;
        tupDes = RelationGetDescr(state->indexRel);
        scanKey++;
        for (nkey = 2; nkey <= keysz; nkey++, scanKey++)
        {
                Datum           datum1,
                                datum2;
                bool            isnull1,
                                isnull2;

                datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
                datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);

                /* see comments about NULLs handling in btbuild */

                /* the comparison function is always of CMP type */
                compare = inlineApplySortFunction(&scanKey->sk_func,
                                                  SORTFUNC_CMP,
                                                  datum1, isnull1,
                                                  datum2, isnull2);

                if (compare != 0)
                        return compare;         /* done when we find unequal attributes */

                /* they are equal, so we only need to examine one null flag */
                if (isnull1)
                        equal_hasnull = true;
        }

        /*
         * If btree has asked us to enforce uniqueness, complain if two equal
         * tuples are detected (unless there was at least one NULL field).
         *
         * It is sufficient to make the test here, because if two tuples are equal
         * they *must* get compared at some stage of the sort --- otherwise the
         * sort algorithm wouldn't have checked whether one must appear before the
         * other.
         *
         * Some rather brain-dead implementations of qsort will sometimes call the
         * comparison routine to compare a value to itself.  (At this writing only
         * QNX 4 is known to do such silly things; we don't support QNX anymore,
         * but perhaps the behavior still exists elsewhere.)  Don't raise a bogus
         * error in that case.
         */
        if (state->enforceUnique && !equal_hasnull && tuple1 != tuple2)
                ereport(ERROR,
                                (errcode(ERRCODE_UNIQUE_VIOLATION),
                                 errmsg("could not create unique index"),
                                 errdetail("Table contains duplicated values.")));

        /*
         * If key values are equal, we sort on ItemPointer.  This does not affect
         * validity of the finished index, but it offers cheap insurance against
         * performance problems with bad qsort implementations that have trouble
         * with large numbers of equal keys.
         */
        {
                BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
                BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);

                if (blk1 != blk2)
                        return (blk1 < blk2) ? -1 : 1;
        }
        {
                OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
                OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);

                if (pos1 != pos2)
                        return (pos1 < pos2) ? -1 : 1;
        }

        return 0;
}
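
/*
 * To summarize comparetup_index: index entries sort on
 * (key1, ..., keyN, block number, offset number), with the TID fields
 * acting purely as a tie-breaker, and the return value follows the usual
 * qsort convention (negative, zero, positive).
 */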

static void
copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
{
        IndexTuple      tuple = (IndexTuple) tup;
        unsigned int tuplen = IndexTupleSize(tuple);
        IndexTuple      newtuple;

        /* copy the tuple into sort storage */
        newtuple = (IndexTuple) palloc(tuplen);
        memcpy(newtuple, tuple, tuplen);
        USEMEM(state, GetMemoryChunkSpace(newtuple));
        stup->tuple = (void *) newtuple;
        /* set up first-column key value */
        stup->datum1 = index_getattr(newtuple,
                                     1,
                                     RelationGetDescr(state->indexRel),
                                     &stup->isnull1);
}

static void
writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
{
        IndexTuple      tuple = (IndexTuple) stup->tuple;
        unsigned int tuplen;

        /*
         * An IndexTuple's length is packed into t_info rather than stored as
         * a leading word, so write an explicit length word; by convention it
         * counts itself, matching what getlen() hands to readtup_index.
         */
        tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
        LogicalTapeWrite(state->tapeset, tapenum,
                         (void *) &tuplen, sizeof(tuplen));
        LogicalTapeWrite(state->tapeset, tapenum,
                         (void *) tuple, IndexTupleSize(tuple));
        if (state->randomAccess)        /* need trailing length word? */
                LogicalTapeWrite(state->tapeset, tapenum,
                                 (void *) &tuplen, sizeof(tuplen));

        FREEMEM(state, GetMemoryChunkSpace(tuple));
        pfree(tuple);
}
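
/*
 * For reference, the on-tape record for an index tuple is (again just a
 * sketch of what writetup_index produces and readtup_index consumes):
 *
 *      tuplen   leading length word; includes itself
 *      body     IndexTupleSize(tuple) bytes of tuple data
 *      tuplen   repeated if randomAccess, for backwards reads
 */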

static void
readtup_index(Tuplesortstate *state, SortTuple *stup,
              int tapenum, unsigned int len)
{
        unsigned int tuplen = len - sizeof(unsigned int);
        IndexTuple      tuple = (IndexTuple) palloc(tuplen);

        USEMEM(state, GetMemoryChunkSpace(tuple));
        if (LogicalTapeRead(state->tapeset, tapenum, (void *) tuple,
                            tuplen) != tuplen)
                elog(ERROR, "unexpected end of data");
        if (state->randomAccess)        /* need trailing length word? */
                if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
                                    sizeof(tuplen)) != sizeof(tuplen))
                        elog(ERROR, "unexpected end of data");
        stup->tuple = (void *) tuple;
        /* set up first-column key value */
        stup->datum1 = index_getattr(tuple,
                                     1,
                                     RelationGetDescr(state->indexRel),
                                     &stup->isnull1);
}


/*
 * Routines specialized for DatumTuple case
 */

static int
comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
{
        /* Allow interrupting long sorts */
        CHECK_FOR_INTERRUPTS();

        return inlineApplySortFunction(&state->sortOpFn, state->sortFnKind,
                                       a->datum1, a->isnull1,
                                       b->datum1, b->isnull1);
}

static void
copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
{
        /* Not currently needed */
        elog(ERROR, "copytup_datum() should not be called");
}

static void
writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
{
        void       *waddr;
        unsigned int tuplen;
        unsigned int writtenlen;

        if (stup->isnull1)
        {
                /* NULL is represented on tape by a zero-length payload */
                waddr = NULL;
                tuplen = 0;
        }
        else if (state->datumTypeByVal)
        {
                waddr = &stup->datum1;
                tuplen = sizeof(Datum);
        }
        else
        {
                waddr = DatumGetPointer(stup->datum1);
                tuplen = datumGetSize(stup->datum1, false, state->datumTypeLen);
                Assert(tuplen != 0);
        }

        writtenlen = tuplen + sizeof(unsigned int);

        LogicalTapeWrite(state->tapeset, tapenum,
                         (void *) &writtenlen, sizeof(writtenlen));
        LogicalTapeWrite(state->tapeset, tapenum,
                         waddr, tuplen);
        if (state->randomAccess)        /* need trailing length word? */
                LogicalTapeWrite(state->tapeset, tapenum,
                                 (void *) &writtenlen, sizeof(writtenlen));

        if (stup->tuple)
        {
                FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
                pfree(stup->tuple);
        }
}
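
/*
 * For reference, the on-tape record for a Datum is (a sketch of the code
 * above):
 *
 *      writtenlen   leading length word; includes itself
 *      payload      sizeof(Datum) bytes for a by-value type, or the
 *                   datumGetSize() bytes of a by-reference value; empty
 *                   when the datum is NULL
 *      writtenlen   repeated if randomAccess
 */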

static void
readtup_datum(Tuplesortstate *state, SortTuple *stup,
              int tapenum, unsigned int len)
{
        unsigned int tuplen = len - sizeof(unsigned int);

        if (tuplen == 0)
        {
                /* it's NULL */
                stup->datum1 = (Datum) 0;
                stup->isnull1 = true;
                stup->tuple = NULL;
        }
        else if (state->datumTypeByVal)
        {
                Assert(tuplen == sizeof(Datum));
                if (LogicalTapeRead(state->tapeset, tapenum, (void *) &stup->datum1,
                                    tuplen) != tuplen)
                        elog(ERROR, "unexpected end of data");
                stup->isnull1 = false;
                stup->tuple = NULL;
        }
        else
        {
                void       *raddr = palloc(tuplen);

                if (LogicalTapeRead(state->tapeset, tapenum, raddr,
                                    tuplen) != tuplen)
                        elog(ERROR, "unexpected end of data");
                stup->datum1 = PointerGetDatum(raddr);
                stup->isnull1 = false;
                stup->tuple = raddr;
                USEMEM(state, GetMemoryChunkSpace(raddr));
        }

        if (state->randomAccess)        /* need trailing length word? */
                if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
                                    sizeof(tuplen)) != sizeof(tuplen))
                        elog(ERROR, "unexpected end of data");
}
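
/*
 * Typical driver of the Datum path, again only a hedged sketch (the
 * authoritative declarations of tuplesort_begin_datum, tuplesort_putdatum,
 * and tuplesort_getdatum live in tuplesort.h):
 *
 *      state = tuplesort_begin_datum(datumType, sortOperator,
 *                                    work_mem, false);
 *      while (... more input ...)
 *              tuplesort_putdatum(state, val, isNull);
 *      tuplesort_performsort(state);
 *      while (tuplesort_getdatum(state, true, &val, &isNull))
 *              ... consume val ...;
 *      tuplesort_end(state);
 */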