]> granicus.if.org Git - postgresql/blob - src/backend/utils/sort/tuplesort.c
Add a trace_sort option to help with measuring resource usage of external
[postgresql] / src / backend / utils / sort / tuplesort.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuplesort.c
4  *        Generalized tuple sorting routines.
5  *
6  * This module handles sorting of heap tuples, index tuples, or single
7  * Datums (and could easily support other kinds of sortable objects,
8  * if necessary).  It works efficiently for both small and large amounts
9  * of data.  Small amounts are sorted in-memory using qsort().  Large
10  * amounts are sorted using temporary files and a standard external sort
11  * algorithm.
12  *
13  * See Knuth, volume 3, for more than you want to know about the external
14  * sorting algorithm.  We divide the input into sorted runs using replacement
15  * selection, in the form of a priority tree implemented as a heap
16  * (essentially his Algorithm 5.2.3H), then merge the runs using polyphase
17  * merge, Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D
18  * are implemented by logtape.c, which avoids space wastage by recycling
19  * disk space as soon as each block is read from its "tape".
20  *
21  * We do not form the initial runs using Knuth's recommended replacement
22  * selection data structure (Algorithm 5.4.1R), because it uses a fixed
23  * number of records in memory at all times.  Since we are dealing with
24  * tuples that may vary considerably in size, we want to be able to vary
25  * the number of records kept in memory to ensure full utilization of the
26  * allowed sort memory space.  So, we keep the tuples in a variable-size
27  * heap, with the next record to go out at the top of the heap.  Like
28  * Algorithm 5.4.1R, each record is stored with the run number that it
29  * must go into, and we use (run number, key) as the ordering key for the
30  * heap.  When the run number at the top of the heap changes, we know that
31  * no more records of the prior run are left in the heap.
32  *
33  * The approximate amount of memory allowed for any one sort operation
34  * is specified in kilobytes by the caller (most pass work_mem).  Initially,
35  * we absorb tuples and simply store them in an unsorted array as long as
36  * we haven't exceeded workMem.  If we reach the end of the input without
37  * exceeding workMem, we sort the array using qsort() and subsequently return
38  * tuples just by scanning the tuple array sequentially.  If we do exceed
39  * workMem, we construct a heap using Algorithm H and begin to emit tuples
40  * into sorted runs in temporary tapes, emitting just enough tuples at each
41  * step to get back within the workMem limit.  Whenever the run number at
42  * the top of the heap changes, we begin a new run with a new output tape
43  * (selected per Algorithm D).  After the end of the input is reached,
44  * we dump out remaining tuples in memory into a final run (or two),
45  * then merge the runs using Algorithm D.
46  *
47  * When merging runs, we use a heap containing just the frontmost tuple from
48  * each source run; we repeatedly output the smallest tuple and insert the
49  * next tuple from its source tape (if any).  When the heap empties, the merge
50  * is complete.  The basic merge algorithm thus needs very little memory ---
51  * only M tuples for an M-way merge, and M is at most six in the present code.
52  * However, we can still make good use of our full workMem allocation by
53  * pre-reading additional tuples from each source tape.  Without prereading,
54  * our access pattern to the temporary file would be very erratic; on average
55  * we'd read one block from each of M source tapes during the same time that
56  * we're writing M blocks to the output tape, so there is no sequentiality of
57  * access at all, defeating the read-ahead methods used by most Unix kernels.
58  * Worse, the output tape gets written into a very random sequence of blocks
59  * of the temp file, ensuring that things will be even worse when it comes
60  * time to read that tape.      A straightforward merge pass thus ends up doing a
61  * lot of waiting for disk seeks.  We can improve matters by prereading from
62  * each source tape sequentially, loading about workMem/M bytes from each tape
63  * in turn.  Then we run the merge algorithm, writing but not reading until
64  * one of the preloaded tuple series runs out.  Then we switch back to preread
65  * mode, fill memory again, and repeat.  This approach helps to localize both
66  * read and write accesses.
67  *
68  * When the caller requests random access to the sort result, we form
69  * the final sorted run on a logical tape which is then "frozen", so
70  * that we can access it randomly.      When the caller does not need random
71  * access, we return from tuplesort_performsort() as soon as we are down
72  * to one run per logical tape.  The final merge is then performed
73  * on-the-fly as the caller repeatedly calls tuplesort_gettuple; this
74  * saves one cycle of writing all the data out to disk and reading it in.
75  *
76  *
77  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
78  * Portions Copyright (c) 1994, Regents of the University of California
79  *
80  * IDENTIFICATION
81  *        $PostgreSQL: pgsql/src/backend/utils/sort/tuplesort.c,v 1.51 2005/10/03 22:55:54 tgl Exp $
82  *
83  *-------------------------------------------------------------------------
84  */
85
86 #include "postgres.h"
87
88 #include "access/heapam.h"
89 #include "access/nbtree.h"
90 #include "catalog/pg_amop.h"
91 #include "catalog/pg_operator.h"
92 #include "miscadmin.h"
93 #include "utils/catcache.h"
94 #include "utils/datum.h"
95 #include "utils/logtape.h"
96 #include "utils/lsyscache.h"
97 #include "utils/memutils.h"
98 #include "utils/pg_rusage.h"
99 #include "utils/syscache.h"
100 #include "utils/tuplesort.h"
101
102
103 /* GUC variable */
104 #ifdef TRACE_SORT
105 bool                    trace_sort = false;
106 #endif
107
108
109 /*
110  * Possible states of a Tuplesort object.  These denote the states that
111  * persist between calls of Tuplesort routines.
112  */
113 typedef enum
114 {
115         TSS_INITIAL,                            /* Loading tuples; still within memory
116                                                                  * limit */
117         TSS_BUILDRUNS,                          /* Loading tuples; writing to tape */
118         TSS_SORTEDINMEM,                        /* Sort completed entirely in memory */
119         TSS_SORTEDONTAPE,                       /* Sort completed, final run is on tape */
120         TSS_FINALMERGE                          /* Performing final merge on-the-fly */
121 } TupSortStatus;
122
123 /*
124  * We use a seven-tape polyphase merge, which is the "sweet spot" on the
125  * tapes-to-passes curve according to Knuth's figure 70 (section 5.4.2).
126  */
127 #define MAXTAPES                7               /* Knuth's T */
128 #define TAPERANGE               (MAXTAPES-1)    /* Knuth's P */
129
130 /*
131  * Private state of a Tuplesort operation.
132  */
133 struct Tuplesortstate
134 {
135         TupSortStatus status;           /* enumerated value as shown above */
136         bool            randomAccess;   /* did caller request random access? */
137         long            availMem;               /* remaining memory available, in bytes */
138         LogicalTapeSet *tapeset;        /* logtape.c object for tapes in a temp
139                                                                  * file */
140
141         /*
142          * These function pointers decouple the routines that must know what
143          * kind of tuple we are sorting from the routines that don't need to
144          * know it. They are set up by the tuplesort_begin_xxx routines.
145          *
146          * Function to compare two tuples; result is per qsort() convention, ie:
147          *
148          * <0, 0, >0 according as a<b, a=b, a>b.
149          */
150         int                     (*comparetup) (Tuplesortstate *state, const void *a, const void *b);
151
152         /*
153          * Function to copy a supplied input tuple into palloc'd space. (NB:
154          * we assume that a single pfree() is enough to release the tuple
155          * later, so the representation must be "flat" in one palloc chunk.)
156          * state->availMem must be decreased by the amount of space used.
157          */
158         void       *(*copytup) (Tuplesortstate *state, void *tup);
159
160         /*
161          * Function to write a stored tuple onto tape.  The representation of
162          * the tuple on tape need not be the same as it is in memory;
163          * requirements on the tape representation are given below.  After
164          * writing the tuple, pfree() it, and increase state->availMem by the
165          * amount of memory space thereby released.
166          */
167         void            (*writetup) (Tuplesortstate *state, int tapenum, void *tup);
168
169         /*
170          * Function to read a stored tuple from tape back into memory. 'len'
171          * is the already-read length of the stored tuple.      Create and return
172          * a palloc'd copy, and decrease state->availMem by the amount of
173          * memory space consumed.
174          */
175         void       *(*readtup) (Tuplesortstate *state, int tapenum, unsigned int len);
176
177         /*
178          * This array holds pointers to tuples in sort memory.  If we are in
179          * state INITIAL, the tuples are in no particular order; if we are in
180          * state SORTEDINMEM, the tuples are in final sorted order; in states
181          * BUILDRUNS and FINALMERGE, the tuples are organized in "heap" order
182          * per Algorithm H.  (Note that memtupcount only counts the tuples
183          * that are part of the heap --- during merge passes, memtuples[]
184          * entries beyond TAPERANGE are never in the heap and are used to hold
185          * pre-read tuples.)  In state SORTEDONTAPE, the array is not used.
186          */
187         void      **memtuples;          /* array of pointers to palloc'd tuples */
188         int                     memtupcount;    /* number of tuples currently present */
189         int                     memtupsize;             /* allocated length of memtuples array */
190
191         /*
192          * While building initial runs, this array holds the run number for
193          * each tuple in memtuples[].  During merge passes, we re-use it to
194          * hold the input tape number that each tuple in the heap was read
195          * from, or to hold the index of the next tuple pre-read from the same
196          * tape in the case of pre-read entries.  This array is never
197          * allocated unless we need to use tapes.  Whenever it is allocated,
198          * it has the same length as memtuples[].
199          */
200         int                *memtupindex;        /* index value associated with
201                                                                  * memtuples[i] */
202
203         /*
204          * While building initial runs, this is the current output run number
205          * (starting at 0).  Afterwards, it is the number of initial runs we
206          * made.
207          */
208         int                     currentRun;
209
210         /*
211          * These variables are only used during merge passes.  mergeactive[i]
212          * is true if we are reading an input run from (actual) tape number i
213          * and have not yet exhausted that run.  mergenext[i] is the memtuples
214          * index of the next pre-read tuple (next to be loaded into the heap)
215          * for tape i, or 0 if we are out of pre-read tuples.  mergelast[i]
216          * similarly points to the last pre-read tuple from each tape.
217          * mergeavailmem[i] is the amount of unused space allocated for tape
218          * i. mergefreelist and mergefirstfree keep track of unused locations
219          * in the memtuples[] array.  memtupindex[] links together pre-read
220          * tuples for each tape as well as recycled locations in
221          * mergefreelist. It is OK to use 0 as a null link in these lists,
222          * because memtuples[0] is part of the merge heap and is never a
223          * pre-read tuple.
224          */
225         bool            mergeactive[MAXTAPES];  /* Active input run source? */
226         int                     mergenext[MAXTAPES];    /* first preread tuple for each
227                                                                                  * source */
228         int                     mergelast[MAXTAPES];    /* last preread tuple for each
229                                                                                  * source */
230         long            mergeavailmem[MAXTAPES];                /* availMem for prereading
231                                                                                                  * tapes */
232         long            spacePerTape;   /* actual per-tape target usage */
233         int                     mergefreelist;  /* head of freelist of recycled slots */
234         int                     mergefirstfree; /* first slot never used in this merge */
235
236         /*
237          * Variables for Algorithm D.  Note that destTape is a "logical" tape
238          * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
239          * "logical" and "actual" tape numbers straight!
240          */
241         int                     Level;                  /* Knuth's l */
242         int                     destTape;               /* current output tape (Knuth's j, less 1) */
243         int                     tp_fib[MAXTAPES];               /* Target Fibonacci run counts
244                                                                                  * (A[]) */
245         int                     tp_runs[MAXTAPES];              /* # of real runs on each tape */
246         int                     tp_dummy[MAXTAPES];             /* # of dummy runs for each tape
247                                                                                  * (D[]) */
248         int                     tp_tapenum[MAXTAPES];   /* Actual tape numbers (TAPE[]) */
249
250         /*
251          * These variables are used after completion of sorting to keep track
252          * of the next tuple to return.  (In the tape case, the tape's current
253          * read position is also critical state.)
254          */
255         int                     result_tape;    /* actual tape number of finished output */
256         int                     current;                /* array index (only used if SORTEDINMEM) */
257         bool            eof_reached;    /* reached EOF (needed for cursors) */
258
259         /* markpos_xxx holds marked position for mark and restore */
260         long            markpos_block;  /* tape block# (only used if SORTEDONTAPE) */
261         int                     markpos_offset; /* saved "current", or offset in tape
262                                                                  * block */
263         bool            markpos_eof;    /* saved "eof_reached" */
264
265         /*
266          * These variables are specific to the HeapTuple case; they are set by
267          * tuplesort_begin_heap and used only by the HeapTuple routines.
268          */
269         TupleDesc       tupDesc;
270         int                     nKeys;
271         ScanKey         scanKeys;
272         SortFunctionKind *sortFnKinds;
273
274         /*
275          * These variables are specific to the IndexTuple case; they are set
276          * by tuplesort_begin_index and used only by the IndexTuple routines.
277          */
278         Relation        indexRel;
279         ScanKey         indexScanKey;
280         bool            enforceUnique;  /* complain if we find duplicate tuples */
281
282         /*
283          * These variables are specific to the Datum case; they are set by
284          * tuplesort_begin_datum and used only by the DatumTuple routines.
285          */
286         Oid                     datumType;
287         Oid                     sortOperator;
288         FmgrInfo        sortOpFn;               /* cached lookup data for sortOperator */
289         SortFunctionKind sortFnKind;
290         /* we need typelen and byval in order to know how to copy the Datums. */
291         int                     datumTypeLen;
292         bool            datumTypeByVal;
293
294         /*
295          * Resource snapshot for time of sort start.
296          */
297 #ifdef TRACE_SORT
298         PGRUsage        ru_start;
299 #endif
300 };
301
302 #define COMPARETUP(state,a,b)   ((*(state)->comparetup) (state, a, b))
303 #define COPYTUP(state,tup)      ((*(state)->copytup) (state, tup))
304 #define WRITETUP(state,tape,tup)        ((*(state)->writetup) (state, tape, tup))
305 #define READTUP(state,tape,len) ((*(state)->readtup) (state, tape, len))
306 #define LACKMEM(state)          ((state)->availMem < 0)
307 #define USEMEM(state,amt)       ((state)->availMem -= (amt))
308 #define FREEMEM(state,amt)      ((state)->availMem += (amt))
309
310 /*--------------------
311  *
312  * NOTES about on-tape representation of tuples:
313  *
314  * We require the first "unsigned int" of a stored tuple to be the total size
315  * on-tape of the tuple, including itself (so it is never zero; an all-zero
316  * unsigned int is used to delimit runs).  The remainder of the stored tuple
317  * may or may not match the in-memory representation of the tuple ---
318  * any conversion needed is the job of the writetup and readtup routines.
319  *
320  * If state->randomAccess is true, then the stored representation of the
321  * tuple must be followed by another "unsigned int" that is a copy of the
322  * length --- so the total tape space used is actually sizeof(unsigned int)
323  * more than the stored length value.  This allows read-backwards.      When
324  * randomAccess is not true, the write/read routines may omit the extra
325  * length word.
326  *
327  * writetup is expected to write both length words as well as the tuple
328  * data.  When readtup is called, the tape is positioned just after the
329  * front length word; readtup must read the tuple data and advance past
330  * the back length word (if present).
331  *
332  * The write/read routines can make use of the tuple description data
333  * stored in the Tuplesortstate record, if needed.      They are also expected
334  * to adjust state->availMem by the amount of memory space (not tape space!)
335  * released or consumed.  There is no error return from either writetup
336  * or readtup; they should ereport() on failure.
337  *
338  *
339  * NOTES about memory consumption calculations:
340  *
341  * We count space allocated for tuples against the workMem limit, plus
342  * the space used by the variable-size arrays memtuples and memtupindex.
343  * Fixed-size space (primarily the LogicalTapeSet I/O buffers) is not
344  * counted.
345  *
346  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
347  * rather than the originally-requested size.  This is important since
348  * palloc can add substantial overhead.  It's not a complete answer since
349  * we won't count any wasted space in palloc allocation blocks, but it's
350  * a lot better than what we were doing before 7.3.
351  *
352  *--------------------
353  */
354
355 /*
356  * For sorting single Datums, we build "pseudo tuples" that just carry
357  * the datum's value and null flag.  For pass-by-reference data types,
358  * the actual data value appears after the DatumTupleHeader (MAXALIGNed,
359  * of course), and the value field in the header is just a pointer to it.
360  */
361
362 typedef struct
363 {
364         Datum           val;
365         bool            isNull;
366 } DatumTuple;
367
368
369 static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess);
370 static void puttuple_common(Tuplesortstate *state, void *tuple);
371 static void inittapes(Tuplesortstate *state);
372 static void selectnewtape(Tuplesortstate *state);
373 static void mergeruns(Tuplesortstate *state);
374 static void mergeonerun(Tuplesortstate *state);
375 static void beginmerge(Tuplesortstate *state);
376 static void mergepreread(Tuplesortstate *state);
377 static void dumptuples(Tuplesortstate *state, bool alltuples);
378 static void tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
379                                           int tupleindex, bool checkIndex);
380 static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
381 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
382 static void markrunend(Tuplesortstate *state, int tapenum);
383 static int      qsort_comparetup(const void *a, const void *b);
384 static int comparetup_heap(Tuplesortstate *state,
385                                 const void *a, const void *b);
386 static void *copytup_heap(Tuplesortstate *state, void *tup);
387 static void writetup_heap(Tuplesortstate *state, int tapenum, void *tup);
388 static void *readtup_heap(Tuplesortstate *state, int tapenum,
389                          unsigned int len);
390 static int comparetup_index(Tuplesortstate *state,
391                                  const void *a, const void *b);
392 static void *copytup_index(Tuplesortstate *state, void *tup);
393 static void writetup_index(Tuplesortstate *state, int tapenum, void *tup);
394 static void *readtup_index(Tuplesortstate *state, int tapenum,
395                           unsigned int len);
396 static int comparetup_datum(Tuplesortstate *state,
397                                  const void *a, const void *b);
398 static void *copytup_datum(Tuplesortstate *state, void *tup);
399 static void writetup_datum(Tuplesortstate *state, int tapenum, void *tup);
400 static void *readtup_datum(Tuplesortstate *state, int tapenum,
401                           unsigned int len);
402
403 /*
404  * Since qsort(3) will not pass any context info to qsort_comparetup(),
405  * we have to use this ugly static variable.  It is set to point to the
406  * active Tuplesortstate object just before calling qsort.      It should
407  * not be used directly by anything except qsort_comparetup().
408  */
409 static Tuplesortstate *qsort_tuplesortstate;
410
411
412 /*
413  *              tuplesort_begin_xxx
414  *
415  * Initialize for a tuple sort operation.
416  *
417  * After calling tuplesort_begin, the caller should call tuplesort_puttuple
418  * zero or more times, then call tuplesort_performsort when all the tuples
419  * have been supplied.  After performsort, retrieve the tuples in sorted
420  * order by calling tuplesort_gettuple until it returns NULL.  (If random
421  * access was requested, rescan, markpos, and restorepos can also be called.)
422  * For Datum sorts, putdatum/getdatum are used instead of puttuple/gettuple.
423  * Call tuplesort_end to terminate the operation and release memory/disk space.
424  *
425  * Each variant of tuplesort_begin has a workMem parameter specifying the
426  * maximum number of kilobytes of RAM to use before spilling data to disk.
427  * (The normal value of this parameter is work_mem, but some callers use
428  * other values.)  Each variant also has a randomAccess parameter specifying
429  * whether the caller needs non-sequential access to the sort result.
430  */
431
432 static Tuplesortstate *
433 tuplesort_begin_common(int workMem, bool randomAccess)
434 {
435         Tuplesortstate *state;
436
437         state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
438
439 #ifdef TRACE_SORT
440         if (trace_sort)
441                 pg_rusage_init(&state->ru_start);
442 #endif
443
444         state->status = TSS_INITIAL;
445         state->randomAccess = randomAccess;
446         state->availMem = workMem * 1024L;
447         state->tapeset = NULL;
448
449         state->memtupcount = 0;
450         state->memtupsize = 1024;       /* initial guess */
451         state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
452
453         state->memtupindex = NULL;      /* until and unless needed */
454
455         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
456
457         state->currentRun = 0;
458
459         /* Algorithm D variables will be initialized by inittapes, if needed */
460
461         state->result_tape = -1;        /* flag that result tape has not been
462                                                                  * formed */
463
464         return state;
465 }
466
467 Tuplesortstate *
468 tuplesort_begin_heap(TupleDesc tupDesc,
469                                          int nkeys,
470                                          Oid *sortOperators, AttrNumber *attNums,
471                                          int workMem, bool randomAccess)
472 {
473         Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
474         int                     i;
475
476         AssertArg(nkeys > 0);
477
478 #ifdef TRACE_SORT
479         if (trace_sort)
480                 elog(NOTICE,
481                          "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
482                          nkeys, workMem, randomAccess ? 't' : 'f');
483 #endif
484
485         state->comparetup = comparetup_heap;
486         state->copytup = copytup_heap;
487         state->writetup = writetup_heap;
488         state->readtup = readtup_heap;
489
490         state->tupDesc = tupDesc;
491         state->nKeys = nkeys;
492         state->scanKeys = (ScanKey) palloc0(nkeys * sizeof(ScanKeyData));
493         state->sortFnKinds = (SortFunctionKind *)
494                 palloc0(nkeys * sizeof(SortFunctionKind));
495
496         for (i = 0; i < nkeys; i++)
497         {
498                 RegProcedure sortFunction;
499
500                 AssertArg(sortOperators[i] != 0);
501                 AssertArg(attNums[i] != 0);
502
503                 /* select a function that implements the sort operator */
504                 SelectSortFunction(sortOperators[i], &sortFunction,
505                                                    &state->sortFnKinds[i]);
506
507                 /*
508                  * We needn't fill in sk_strategy or sk_subtype since these
509                  * scankeys will never be passed to an index.
510                  */
511                 ScanKeyInit(&state->scanKeys[i],
512                                         attNums[i],
513                                         InvalidStrategy,
514                                         sortFunction,
515                                         (Datum) 0);
516         }
517
518         return state;
519 }
520
521 Tuplesortstate *
522 tuplesort_begin_index(Relation indexRel,
523                                           bool enforceUnique,
524                                           int workMem, bool randomAccess)
525 {
526         Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
527
528 #ifdef TRACE_SORT
529         if (trace_sort)
530                 elog(NOTICE,
531                          "begin index sort: unique = %c, workMem = %d, randomAccess = %c",
532                          enforceUnique ? 't' : 'f',
533                          workMem, randomAccess ? 't' : 'f');
534 #endif
535
536         state->comparetup = comparetup_index;
537         state->copytup = copytup_index;
538         state->writetup = writetup_index;
539         state->readtup = readtup_index;
540
541         state->indexRel = indexRel;
542         /* see comments below about btree dependence of this code... */
543         state->indexScanKey = _bt_mkscankey_nodata(indexRel);
544         state->enforceUnique = enforceUnique;
545
546         return state;
547 }
548
549 Tuplesortstate *
550 tuplesort_begin_datum(Oid datumType,
551                                           Oid sortOperator,
552                                           int workMem, bool randomAccess)
553 {
554         Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
555         RegProcedure sortFunction;
556         int16           typlen;
557         bool            typbyval;
558
559 #ifdef TRACE_SORT
560         if (trace_sort)
561                 elog(NOTICE,
562                          "begin datum sort: workMem = %d, randomAccess = %c",
563                          workMem, randomAccess ? 't' : 'f');
564 #endif
565
566         state->comparetup = comparetup_datum;
567         state->copytup = copytup_datum;
568         state->writetup = writetup_datum;
569         state->readtup = readtup_datum;
570
571         state->datumType = datumType;
572         state->sortOperator = sortOperator;
573
574         /* select a function that implements the sort operator */
575         SelectSortFunction(sortOperator, &sortFunction, &state->sortFnKind);
576         /* and look up the function */
577         fmgr_info(sortFunction, &state->sortOpFn);
578
579         /* lookup necessary attributes of the datum type */
580         get_typlenbyval(datumType, &typlen, &typbyval);
581         state->datumTypeLen = typlen;
582         state->datumTypeByVal = typbyval;
583
584         return state;
585 }
586
587 /*
588  * tuplesort_end
589  *
590  *      Release resources and clean up.
591  */
592 void
593 tuplesort_end(Tuplesortstate *state)
594 {
595         int                     i;
596
597         if (state->tapeset)
598                 LogicalTapeSetClose(state->tapeset);
599         if (state->memtuples)
600         {
601                 for (i = 0; i < state->memtupcount; i++)
602                         pfree(state->memtuples[i]);
603                 pfree(state->memtuples);
604         }
605         if (state->memtupindex)
606                 pfree(state->memtupindex);
607
608         /*
609          * this stuff might better belong in a variant-specific shutdown
610          * routine
611          */
612         if (state->scanKeys)
613                 pfree(state->scanKeys);
614         if (state->sortFnKinds)
615                 pfree(state->sortFnKinds);
616
617 #ifdef TRACE_SORT
618         if (trace_sort)
619                 elog(NOTICE, "sort ended: %s",
620                          pg_rusage_show(&state->ru_start));
621 #endif
622
623         pfree(state);
624 }
625
626 /*
627  * Accept one tuple while collecting input data for sort.
628  *
629  * Note that the input tuple is always copied; the caller need not save it.
630  */
631 void
632 tuplesort_puttuple(Tuplesortstate *state, void *tuple)
633 {
634         /*
635          * Copy the given tuple into memory we control, and decrease availMem.
636          * Then call the code shared with the Datum case.
637          */
638         tuple = COPYTUP(state, tuple);
639
640         puttuple_common(state, tuple);
641 }
642
643 /*
644  * Accept one Datum while collecting input data for sort.
645  *
646  * If the Datum is pass-by-ref type, the value will be copied.
647  */
648 void
649 tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
650 {
651         DatumTuple *tuple;
652
653         /*
654          * Build pseudo-tuple carrying the datum, and decrease availMem.
655          */
656         if (isNull || state->datumTypeByVal)
657         {
658                 tuple = (DatumTuple *) palloc(sizeof(DatumTuple));
659                 tuple->val = val;
660                 tuple->isNull = isNull;
661         }
662         else
663         {
664                 Size            datalen;
665                 Size            tuplelen;
666                 char       *newVal;
667
668                 datalen = datumGetSize(val, false, state->datumTypeLen);
669                 tuplelen = datalen + MAXALIGN(sizeof(DatumTuple));
670                 tuple = (DatumTuple *) palloc(tuplelen);
671                 newVal = ((char *) tuple) + MAXALIGN(sizeof(DatumTuple));
672                 memcpy(newVal, DatumGetPointer(val), datalen);
673                 tuple->val = PointerGetDatum(newVal);
674                 tuple->isNull = false;
675         }
676
677         USEMEM(state, GetMemoryChunkSpace(tuple));
678
679         puttuple_common(state, (void *) tuple);
680 }
681
682 /*
683  * Shared code for tuple and datum cases.
684  */
685 static void
686 puttuple_common(Tuplesortstate *state, void *tuple)
687 {
688         switch (state->status)
689         {
690                 case TSS_INITIAL:
691
692                         /*
693                          * Save the copied tuple into the unsorted array.
694                          */
695                         if (state->memtupcount >= state->memtupsize)
696                         {
697                                 /* Grow the unsorted array as needed. */
698                                 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
699                                 state->memtupsize *= 2;
700                                 state->memtuples = (void **)
701                                         repalloc(state->memtuples,
702                                                          state->memtupsize * sizeof(void *));
703                                 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
704                         }
705                         state->memtuples[state->memtupcount++] = tuple;
706
707                         /*
708                          * Done if we still fit in available memory.
709                          */
710                         if (!LACKMEM(state))
711                                 return;
712
713                         /*
714                          * Nope; time to switch to tape-based operation.
715                          */
716                         inittapes(state);
717
718                         /*
719                          * Dump tuples until we are back under the limit.
720                          */
721                         dumptuples(state, false);
722                         break;
723                 case TSS_BUILDRUNS:
724
725                         /*
726                          * Insert the copied tuple into the heap, with run number
727                          * currentRun if it can go into the current run, else run
728                          * number currentRun+1.  The tuple can go into the current run
729                          * if it is >= the first not-yet-output tuple.  (Actually, it
730                          * could go into the current run if it is >= the most recently
731                          * output tuple ... but that would require keeping around the
732                          * tuple we last output, and it's simplest to let writetup
733                          * free each tuple as soon as it's written.)
734                          *
735                          * Note there will always be at least one tuple in the heap at
736                          * this point; see dumptuples.
737                          */
738                         Assert(state->memtupcount > 0);
739                         if (COMPARETUP(state, tuple, state->memtuples[0]) >= 0)
740                                 tuplesort_heap_insert(state, tuple, state->currentRun, true);
741                         else
742                                 tuplesort_heap_insert(state, tuple, state->currentRun + 1, true);
743
744                         /*
745                          * If we are over the memory limit, dump tuples till we're
746                          * under.
747                          */
748                         dumptuples(state, false);
749                         break;
750                 default:
751                         elog(ERROR, "invalid tuplesort state");
752                         break;
753         }
754 }
755
756 /*
757  * All tuples have been provided; finish the sort.
758  */
759 void
760 tuplesort_performsort(Tuplesortstate *state)
761 {
762 #ifdef TRACE_SORT
763         if (trace_sort)
764                 elog(NOTICE, "performsort starting: %s",
765                          pg_rusage_show(&state->ru_start));
766 #endif
767
768         switch (state->status)
769         {
770                 case TSS_INITIAL:
771
772                         /*
773                          * We were able to accumulate all the tuples within the
774                          * allowed amount of memory.  Just qsort 'em and we're done.
775                          */
776                         if (state->memtupcount > 1)
777                         {
778                                 qsort_tuplesortstate = state;
779                                 qsort((void *) state->memtuples, state->memtupcount,
780                                           sizeof(void *), qsort_comparetup);
781                         }
782                         state->current = 0;
783                         state->eof_reached = false;
784                         state->markpos_offset = 0;
785                         state->markpos_eof = false;
786                         state->status = TSS_SORTEDINMEM;
787                         break;
788                 case TSS_BUILDRUNS:
789
790                         /*
791                          * Finish tape-based sort.      First, flush all tuples remaining
792                          * in memory out to tape; then merge until we have a single
793                          * remaining run (or, if !randomAccess, one run per tape).
794                          * Note that mergeruns sets the correct state->status.
795                          */
796                         dumptuples(state, true);
797                         mergeruns(state);
798                         state->eof_reached = false;
799                         state->markpos_block = 0L;
800                         state->markpos_offset = 0;
801                         state->markpos_eof = false;
802                         break;
803                 default:
804                         elog(ERROR, "invalid tuplesort state");
805                         break;
806         }
807
808 #ifdef TRACE_SORT
809         if (trace_sort)
810                 elog(NOTICE, "performsort done%s: %s",
811                          (state->status == TSS_FINALMERGE) ? " (except final merge)" : "",
812                          pg_rusage_show(&state->ru_start));
813 #endif
814 }
815
816 /*
817  * Fetch the next tuple in either forward or back direction.
818  * Returns NULL if no more tuples.      If should_free is set, the
819  * caller must pfree the returned tuple when done with it.
820  */
821 void *
822 tuplesort_gettuple(Tuplesortstate *state, bool forward,
823                                    bool *should_free)
824 {
825         unsigned int tuplen;
826         void       *tup;
827
828         switch (state->status)
829         {
830                 case TSS_SORTEDINMEM:
831                         Assert(forward || state->randomAccess);
832                         *should_free = false;
833                         if (forward)
834                         {
835                                 if (state->current < state->memtupcount)
836                                         return state->memtuples[state->current++];
837                                 state->eof_reached = true;
838                                 return NULL;
839                         }
840                         else
841                         {
842                                 if (state->current <= 0)
843                                         return NULL;
844
845                                 /*
846                                  * if all tuples are fetched already then we return last
847                                  * tuple, else - tuple before last returned.
848                                  */
849                                 if (state->eof_reached)
850                                         state->eof_reached = false;
851                                 else
852                                 {
853                                         state->current--;       /* last returned tuple */
854                                         if (state->current <= 0)
855                                                 return NULL;
856                                 }
857                                 return state->memtuples[state->current - 1];
858                         }
859                         break;
860
861                 case TSS_SORTEDONTAPE:
862                         Assert(forward || state->randomAccess);
863                         *should_free = true;
864                         if (forward)
865                         {
866                                 if (state->eof_reached)
867                                         return NULL;
868                                 if ((tuplen = getlen(state, state->result_tape, true)) != 0)
869                                 {
870                                         tup = READTUP(state, state->result_tape, tuplen);
871                                         return tup;
872                                 }
873                                 else
874                                 {
875                                         state->eof_reached = true;
876                                         return NULL;
877                                 }
878                         }
879
880                         /*
881                          * Backward.
882                          *
883                          * if all tuples are fetched already then we return last tuple,
884                          * else - tuple before last returned.
885                          */
886                         if (state->eof_reached)
887                         {
888                                 /*
889                                  * Seek position is pointing just past the zero tuplen at
890                                  * the end of file; back up to fetch last tuple's ending
891                                  * length word.  If seek fails we must have a completely
892                                  * empty file.
893                                  */
894                                 if (!LogicalTapeBackspace(state->tapeset,
895                                                                                   state->result_tape,
896                                                                                   2 * sizeof(unsigned int)))
897                                         return NULL;
898                                 state->eof_reached = false;
899                         }
900                         else
901                         {
902                                 /*
903                                  * Back up and fetch previously-returned tuple's ending
904                                  * length word.  If seek fails, assume we are at start of
905                                  * file.
906                                  */
907                                 if (!LogicalTapeBackspace(state->tapeset,
908                                                                                   state->result_tape,
909                                                                                   sizeof(unsigned int)))
910                                         return NULL;
911                                 tuplen = getlen(state, state->result_tape, false);
912
913                                 /*
914                                  * Back up to get ending length word of tuple before it.
915                                  */
916                                 if (!LogicalTapeBackspace(state->tapeset,
917                                                                                   state->result_tape,
918                                                                           tuplen + 2 * sizeof(unsigned int)))
919                                 {
920                                         /*
921                                          * If that fails, presumably the prev tuple is the
922                                          * first in the file.  Back up so that it becomes next
923                                          * to read in forward direction (not obviously right,
924                                          * but that is what in-memory case does).
925                                          */
926                                         if (!LogicalTapeBackspace(state->tapeset,
927                                                                                           state->result_tape,
928                                                                                   tuplen + sizeof(unsigned int)))
929                                                 elog(ERROR, "bogus tuple length in backward scan");
930                                         return NULL;
931                                 }
932                         }
933
934                         tuplen = getlen(state, state->result_tape, false);
935
936                         /*
937                          * Now we have the length of the prior tuple, back up and read
938                          * it. Note: READTUP expects we are positioned after the
939                          * initial length word of the tuple, so back up to that point.
940                          */
941                         if (!LogicalTapeBackspace(state->tapeset,
942                                                                           state->result_tape,
943                                                                           tuplen))
944                                 elog(ERROR, "bogus tuple length in backward scan");
945                         tup = READTUP(state, state->result_tape, tuplen);
946                         return tup;
947
948                 case TSS_FINALMERGE:
949                         Assert(forward);
950                         *should_free = true;
951
952                         /*
953                          * This code should match the inner loop of mergeonerun().
954                          */
955                         if (state->memtupcount > 0)
956                         {
957                                 int                     srcTape = state->memtupindex[0];
958                                 Size            tuplen;
959                                 int                     tupIndex;
960                                 void       *newtup;
961
962                                 tup = state->memtuples[0];
963                                 /* returned tuple is no longer counted in our memory space */
964                                 tuplen = GetMemoryChunkSpace(tup);
965                                 state->availMem += tuplen;
966                                 state->mergeavailmem[srcTape] += tuplen;
967                                 tuplesort_heap_siftup(state, false);
968                                 if ((tupIndex = state->mergenext[srcTape]) == 0)
969                                 {
970                                         /*
971                                          * out of preloaded data on this tape, try to read
972                                          * more
973                                          */
974                                         mergepreread(state);
975
976                                         /*
977                                          * if still no data, we've reached end of run on this
978                                          * tape
979                                          */
980                                         if ((tupIndex = state->mergenext[srcTape]) == 0)
981                                                 return tup;
982                                 }
983                                 /* pull next preread tuple from list, insert in heap */
984                                 newtup = state->memtuples[tupIndex];
985                                 state->mergenext[srcTape] = state->memtupindex[tupIndex];
986                                 if (state->mergenext[srcTape] == 0)
987                                         state->mergelast[srcTape] = 0;
988                                 state->memtupindex[tupIndex] = state->mergefreelist;
989                                 state->mergefreelist = tupIndex;
990                                 tuplesort_heap_insert(state, newtup, srcTape, false);
991                                 return tup;
992                         }
993                         return NULL;
994
995                 default:
996                         elog(ERROR, "invalid tuplesort state");
997                         return NULL;            /* keep compiler quiet */
998         }
999 }
1000
1001 /*
1002  * Fetch the next Datum in either forward or back direction.
1003  * Returns FALSE if no more datums.
1004  *
1005  * If the Datum is pass-by-ref type, the returned value is freshly palloc'd
1006  * and is now owned by the caller.
1007  */
1008 bool
1009 tuplesort_getdatum(Tuplesortstate *state, bool forward,
1010                                    Datum *val, bool *isNull)
1011 {
1012         DatumTuple *tuple;
1013         bool            should_free;
1014
1015         tuple = (DatumTuple *) tuplesort_gettuple(state, forward, &should_free);
1016
1017         if (tuple == NULL)
1018                 return false;
1019
1020         if (tuple->isNull || state->datumTypeByVal)
1021         {
1022                 *val = tuple->val;
1023                 *isNull = tuple->isNull;
1024         }
1025         else
1026         {
1027                 *val = datumCopy(tuple->val, false, state->datumTypeLen);
1028                 *isNull = false;
1029         }
1030
1031         if (should_free)
1032                 pfree(tuple);
1033
1034         return true;
1035 }
1036
1037
1038 /*
1039  * inittapes - initialize for tape sorting.
1040  *
1041  * This is called only if we have found we don't have room to sort in memory.
1042  */
1043 static void
1044 inittapes(Tuplesortstate *state)
1045 {
1046         int                     ntuples,
1047                                 j;
1048
1049 #ifdef TRACE_SORT
1050         if (trace_sort)
1051                 elog(NOTICE, "switching to external sort: %s",
1052                          pg_rusage_show(&state->ru_start));
1053 #endif
1054
1055         state->tapeset = LogicalTapeSetCreate(MAXTAPES);
1056
1057         /*
1058          * Allocate the memtupindex array, same size as memtuples.
1059          */
1060         state->memtupindex = (int *) palloc(state->memtupsize * sizeof(int));
1061
1062         USEMEM(state, GetMemoryChunkSpace(state->memtupindex));
1063
1064         /*
1065          * Convert the unsorted contents of memtuples[] into a heap. Each
1066          * tuple is marked as belonging to run number zero.
1067          *
1068          * NOTE: we pass false for checkIndex since there's no point in comparing
1069          * indexes in this step, even though we do intend the indexes to be
1070          * part of the sort key...
1071          */
1072         ntuples = state->memtupcount;
1073         state->memtupcount = 0;         /* make the heap empty */
1074         for (j = 0; j < ntuples; j++)
1075                 tuplesort_heap_insert(state, state->memtuples[j], 0, false);
1076         Assert(state->memtupcount == ntuples);
1077
1078         state->currentRun = 0;
1079
1080         /*
1081          * Initialize variables of Algorithm D (step D1).
1082          */
1083         for (j = 0; j < MAXTAPES; j++)
1084         {
1085                 state->tp_fib[j] = 1;
1086                 state->tp_runs[j] = 0;
1087                 state->tp_dummy[j] = 1;
1088                 state->tp_tapenum[j] = j;
1089         }
1090         state->tp_fib[TAPERANGE] = 0;
1091         state->tp_dummy[TAPERANGE] = 0;
1092
1093         state->Level = 1;
1094         state->destTape = 0;
1095
1096         state->status = TSS_BUILDRUNS;
1097 }
1098
1099 /*
1100  * selectnewtape -- select new tape for new initial run.
1101  *
1102  * This is called after finishing a run when we know another run
1103  * must be started.  This implements steps D3, D4 of Algorithm D.
1104  */
1105 static void
1106 selectnewtape(Tuplesortstate *state)
1107 {
1108         int                     j;
1109         int                     a;
1110
1111         /* Step D3: advance j (destTape) */
1112         if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
1113         {
1114                 state->destTape++;
1115                 return;
1116         }
1117         if (state->tp_dummy[state->destTape] != 0)
1118         {
1119                 state->destTape = 0;
1120                 return;
1121         }
1122
1123         /* Step D4: increase level */
1124         state->Level++;
1125         a = state->tp_fib[0];
1126         for (j = 0; j < TAPERANGE; j++)
1127         {
1128                 state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
1129                 state->tp_fib[j] = a + state->tp_fib[j + 1];
1130         }
1131         state->destTape = 0;
1132 }
1133
1134 /*
1135  * mergeruns -- merge all the completed initial runs.
1136  *
1137  * This implements steps D5, D6 of Algorithm D.  All input data has
1138  * already been written to initial runs on tape (see dumptuples).
1139  */
1140 static void
1141 mergeruns(Tuplesortstate *state)
1142 {
1143         int                     tapenum,
1144                                 svTape,
1145                                 svRuns,
1146                                 svDummy;
1147
1148         Assert(state->status == TSS_BUILDRUNS);
1149         Assert(state->memtupcount == 0);
1150
1151         /*
1152          * If we produced only one initial run (quite likely if the total data
1153          * volume is between 1X and 2X workMem), we can just use that tape as
1154          * the finished output, rather than doing a useless merge.
1155          */
1156         if (state->currentRun == 1)
1157         {
1158                 state->result_tape = state->tp_tapenum[state->destTape];
1159                 /* must freeze and rewind the finished output tape */
1160                 LogicalTapeFreeze(state->tapeset, state->result_tape);
1161                 state->status = TSS_SORTEDONTAPE;
1162                 return;
1163         }
1164
1165         /* End of step D2: rewind all output tapes to prepare for merging */
1166         for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
1167                 LogicalTapeRewind(state->tapeset, tapenum, false);
1168
1169         for (;;)
1170         {
1171                 /* Step D5: merge runs onto tape[T] until tape[P] is empty */
1172                 while (state->tp_runs[TAPERANGE - 1] || state->tp_dummy[TAPERANGE - 1])
1173                 {
1174                         bool            allDummy = true;
1175                         bool            allOneRun = true;
1176
1177                         for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
1178                         {
1179                                 if (state->tp_dummy[tapenum] == 0)
1180                                         allDummy = false;
1181                                 if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
1182                                         allOneRun = false;
1183                         }
1184
1185                         /*
1186                          * If we don't have to produce a materialized sorted tape,
1187                          * quit as soon as we're down to one real/dummy run per tape.
1188                          */
1189                         if (!state->randomAccess && allOneRun)
1190                         {
1191                                 Assert(!allDummy);
1192                                 /* Initialize for the final merge pass */
1193                                 beginmerge(state);
1194                                 state->status = TSS_FINALMERGE;
1195                                 return;
1196                         }
1197                         if (allDummy)
1198                         {
1199                                 state->tp_dummy[TAPERANGE]++;
1200                                 for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
1201                                         state->tp_dummy[tapenum]--;
1202                         }
1203                         else
1204                                 mergeonerun(state);
1205                 }
1206                 /* Step D6: decrease level */
1207                 if (--state->Level == 0)
1208                         break;
1209                 /* rewind output tape T to use as new input */
1210                 LogicalTapeRewind(state->tapeset, state->tp_tapenum[TAPERANGE],
1211                                                   false);
1212                 /* rewind used-up input tape P, and prepare it for write pass */
1213                 LogicalTapeRewind(state->tapeset, state->tp_tapenum[TAPERANGE - 1],
1214                                                   true);
1215                 state->tp_runs[TAPERANGE - 1] = 0;
1216
1217                 /*
1218                  * reassign tape units per step D6; note we no longer care about
1219                  * A[]
1220                  */
1221                 svTape = state->tp_tapenum[TAPERANGE];
1222                 svDummy = state->tp_dummy[TAPERANGE];
1223                 svRuns = state->tp_runs[TAPERANGE];
1224                 for (tapenum = TAPERANGE; tapenum > 0; tapenum--)
1225                 {
1226                         state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
1227                         state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
1228                         state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
1229                 }
1230                 state->tp_tapenum[0] = svTape;
1231                 state->tp_dummy[0] = svDummy;
1232                 state->tp_runs[0] = svRuns;
1233         }
1234
1235         /*
1236          * Done.  Knuth says that the result is on TAPE[1], but since we
1237          * exited the loop without performing the last iteration of step D6,
1238          * we have not rearranged the tape unit assignment, and therefore the
1239          * result is on TAPE[T].  We need to do it this way so that we can
1240          * freeze the final output tape while rewinding it.  The last
1241          * iteration of step D6 would be a waste of cycles anyway...
1242          */
1243         state->result_tape = state->tp_tapenum[TAPERANGE];
1244         LogicalTapeFreeze(state->tapeset, state->result_tape);
1245         state->status = TSS_SORTEDONTAPE;
1246 }
1247
1248 /*
1249  * Merge one run from each input tape, except ones with dummy runs.
1250  *
1251  * This is the inner loop of Algorithm D step D5.  We know that the
1252  * output tape is TAPE[T].
1253  */
1254 static void
1255 mergeonerun(Tuplesortstate *state)
1256 {
1257         int                     destTape = state->tp_tapenum[TAPERANGE];
1258         int                     srcTape;
1259         int                     tupIndex;
1260         void       *tup;
1261         long            priorAvail,
1262                                 spaceFreed;
1263
1264         /*
1265          * Start the merge by loading one tuple from each active source tape
1266          * into the heap.  We can also decrease the input run/dummy run
1267          * counts.
1268          */
1269         beginmerge(state);
1270
1271         /*
1272          * Execute merge by repeatedly extracting lowest tuple in heap,
1273          * writing it out, and replacing it with next tuple from same tape (if
1274          * there is another one).
1275          */
1276         while (state->memtupcount > 0)
1277         {
1278                 CHECK_FOR_INTERRUPTS();
1279                 /* write the tuple to destTape */
1280                 priorAvail = state->availMem;
1281                 srcTape = state->memtupindex[0];
1282                 WRITETUP(state, destTape, state->memtuples[0]);
1283                 /* writetup adjusted total free space, now fix per-tape space */
1284                 spaceFreed = state->availMem - priorAvail;
1285                 state->mergeavailmem[srcTape] += spaceFreed;
1286                 /* compact the heap */
1287                 tuplesort_heap_siftup(state, false);
1288                 if ((tupIndex = state->mergenext[srcTape]) == 0)
1289                 {
1290                         /* out of preloaded data on this tape, try to read more */
1291                         mergepreread(state);
1292                         /* if still no data, we've reached end of run on this tape */
1293                         if ((tupIndex = state->mergenext[srcTape]) == 0)
1294                                 continue;
1295                 }
1296                 /* pull next preread tuple from list, insert in heap */
1297                 tup = state->memtuples[tupIndex];
1298                 state->mergenext[srcTape] = state->memtupindex[tupIndex];
1299                 if (state->mergenext[srcTape] == 0)
1300                         state->mergelast[srcTape] = 0;
1301                 state->memtupindex[tupIndex] = state->mergefreelist;
1302                 state->mergefreelist = tupIndex;
1303                 tuplesort_heap_insert(state, tup, srcTape, false);
1304         }
1305
1306         /*
1307          * When the heap empties, we're done.  Write an end-of-run marker on
1308          * the output tape, and increment its count of real runs.
1309          */
1310         markrunend(state, destTape);
1311         state->tp_runs[TAPERANGE]++;
1312
1313 #ifdef TRACE_SORT
1314         if (trace_sort)
1315                 elog(NOTICE, "finished merge step: %s",
1316                          pg_rusage_show(&state->ru_start));
1317 #endif
1318 }
1319
1320 /*
1321  * beginmerge - initialize for a merge pass
1322  *
1323  * We decrease the counts of real and dummy runs for each tape, and mark
1324  * which tapes contain active input runs in mergeactive[].      Then, load
1325  * as many tuples as we can from each active input tape, and finally
1326  * fill the merge heap with the first tuple from each active tape.
1327  */
1328 static void
1329 beginmerge(Tuplesortstate *state)
1330 {
1331         int                     activeTapes;
1332         int                     tapenum;
1333         int                     srcTape;
1334
1335         /* Heap should be empty here */
1336         Assert(state->memtupcount == 0);
1337
1338         /* Clear merge-pass state variables */
1339         memset(state->mergeactive, 0, sizeof(state->mergeactive));
1340         memset(state->mergenext, 0, sizeof(state->mergenext));
1341         memset(state->mergelast, 0, sizeof(state->mergelast));
1342         memset(state->mergeavailmem, 0, sizeof(state->mergeavailmem));
1343         state->mergefreelist = 0;       /* nothing in the freelist */
1344         state->mergefirstfree = MAXTAPES;       /* first slot available for
1345                                                                                  * preread */
1346
1347         /* Adjust run counts and mark the active tapes */
1348         activeTapes = 0;
1349         for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
1350         {
1351                 if (state->tp_dummy[tapenum] > 0)
1352                         state->tp_dummy[tapenum]--;
1353                 else
1354                 {
1355                         Assert(state->tp_runs[tapenum] > 0);
1356                         state->tp_runs[tapenum]--;
1357                         srcTape = state->tp_tapenum[tapenum];
1358                         state->mergeactive[srcTape] = true;
1359                         activeTapes++;
1360                 }
1361         }
1362
1363         /*
1364          * Initialize space allocation to let each active input tape have an
1365          * equal share of preread space.
1366          */
1367         Assert(activeTapes > 0);
1368         state->spacePerTape = state->availMem / activeTapes;
1369         for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
1370         {
1371                 if (state->mergeactive[srcTape])
1372                         state->mergeavailmem[srcTape] = state->spacePerTape;
1373         }
1374
1375         /*
1376          * Preread as many tuples as possible (and at least one) from each
1377          * active tape
1378          */
1379         mergepreread(state);
1380
1381         /* Load the merge heap with the first tuple from each input tape */
1382         for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
1383         {
1384                 int                     tupIndex = state->mergenext[srcTape];
1385                 void       *tup;
1386
1387                 if (tupIndex)
1388                 {
1389                         tup = state->memtuples[tupIndex];
1390                         state->mergenext[srcTape] = state->memtupindex[tupIndex];
1391                         if (state->mergenext[srcTape] == 0)
1392                                 state->mergelast[srcTape] = 0;
1393                         state->memtupindex[tupIndex] = state->mergefreelist;
1394                         state->mergefreelist = tupIndex;
1395                         tuplesort_heap_insert(state, tup, srcTape, false);
1396                 }
1397         }
1398 }
1399
1400 /*
1401  * mergepreread - load tuples from merge input tapes
1402  *
1403  * This routine exists to improve sequentiality of reads during a merge pass,
1404  * as explained in the header comments of this file.  Load tuples from each
1405  * active source tape until the tape's run is exhausted or it has used up
1406  * its fair share of available memory.  In any case, we guarantee that there
1407  * is at least one preread tuple available from each unexhausted input tape.
1408  */
1409 static void
1410 mergepreread(Tuplesortstate *state)
1411 {
1412         int                     srcTape;
1413         unsigned int tuplen;
1414         void       *tup;
1415         int                     tupIndex;
1416         long            priorAvail,
1417                                 spaceUsed;
1418
1419         for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
1420         {
1421                 if (!state->mergeactive[srcTape])
1422                         continue;
1423
1424                 /*
1425                  * Skip reading from any tape that still has at least half of its
1426                  * target memory filled with tuples (threshold fraction may need
1427                  * adjustment?).  This avoids reading just a few tuples when the
1428                  * incoming runs are not being consumed evenly.
1429                  */
1430                 if (state->mergenext[srcTape] != 0 &&
1431                         state->mergeavailmem[srcTape] <= state->spacePerTape / 2)
1432                         continue;
1433
1434                 /*
1435                  * Read tuples from this tape until it has used up its free
1436                  * memory, but ensure that we have at least one.
1437                  */
1438                 priorAvail = state->availMem;
1439                 state->availMem = state->mergeavailmem[srcTape];
1440                 while (!LACKMEM(state) || state->mergenext[srcTape] == 0)
1441                 {
1442                         /* read next tuple, if any */
1443                         if ((tuplen = getlen(state, srcTape, true)) == 0)
1444                         {
1445                                 state->mergeactive[srcTape] = false;
1446                                 break;
1447                         }
1448                         tup = READTUP(state, srcTape, tuplen);
1449                         /* find or make a free slot in memtuples[] for it */
1450                         tupIndex = state->mergefreelist;
1451                         if (tupIndex)
1452                                 state->mergefreelist = state->memtupindex[tupIndex];
1453                         else
1454                         {
1455                                 tupIndex = state->mergefirstfree++;
1456                                 /* Might need to enlarge arrays! */
1457                                 if (tupIndex >= state->memtupsize)
1458                                 {
1459                                         FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1460                                         FREEMEM(state, GetMemoryChunkSpace(state->memtupindex));
1461                                         state->memtupsize *= 2;
1462                                         state->memtuples = (void **)
1463                                                 repalloc(state->memtuples,
1464                                                                  state->memtupsize * sizeof(void *));
1465                                         state->memtupindex = (int *)
1466                                                 repalloc(state->memtupindex,
1467                                                                  state->memtupsize * sizeof(int));
1468                                         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1469                                         USEMEM(state, GetMemoryChunkSpace(state->memtupindex));
1470                                 }
1471                         }
1472                         /* store tuple, append to list for its tape */
1473                         state->memtuples[tupIndex] = tup;
1474                         state->memtupindex[tupIndex] = 0;
1475                         if (state->mergelast[srcTape])
1476                                 state->memtupindex[state->mergelast[srcTape]] = tupIndex;
1477                         else
1478                                 state->mergenext[srcTape] = tupIndex;
1479                         state->mergelast[srcTape] = tupIndex;
1480                 }
1481                 /* update per-tape and global availmem counts */
1482                 spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
1483                 state->mergeavailmem[srcTape] = state->availMem;
1484                 state->availMem = priorAvail - spaceUsed;
1485         }
1486 }
1487
1488 /*
1489  * dumptuples - remove tuples from heap and write to tape
1490  *
1491  * This is used during initial-run building, but not during merging.
1492  *
1493  * When alltuples = false, dump only enough tuples to get under the
1494  * availMem limit (and leave at least one tuple in the heap in any case,
1495  * since puttuple assumes it always has a tuple to compare to).
1496  *
1497  * When alltuples = true, dump everything currently in memory.
1498  * (This case is only used at end of input data.)
1499  *
1500  * If we empty the heap, close out the current run and return (this should
1501  * only happen at end of input data).  If we see that the tuple run number
1502  * at the top of the heap has changed, start a new run.
1503  */
1504 static void
1505 dumptuples(Tuplesortstate *state, bool alltuples)
1506 {
1507         while (alltuples ||
1508                    (LACKMEM(state) && state->memtupcount > 1))
1509         {
1510                 /*
1511                  * Dump the heap's frontmost entry, and sift up to remove it from
1512                  * the heap.
1513                  */
1514                 Assert(state->memtupcount > 0);
1515                 WRITETUP(state, state->tp_tapenum[state->destTape],
1516                                  state->memtuples[0]);
1517                 tuplesort_heap_siftup(state, true);
1518
1519                 /*
1520                  * If the heap is empty *or* top run number has changed, we've
1521                  * finished the current run.
1522                  */
1523                 if (state->memtupcount == 0 ||
1524                         state->currentRun != state->memtupindex[0])
1525                 {
1526                         markrunend(state, state->tp_tapenum[state->destTape]);
1527                         state->currentRun++;
1528                         state->tp_runs[state->destTape]++;
1529                         state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
1530
1531 #ifdef TRACE_SORT
1532                         if (trace_sort)
1533                                 elog(NOTICE, "finished writing%s run %d: %s",
1534                                          (state->memtupcount == 0) ? " final" : "",
1535                                          state->currentRun,
1536                                          pg_rusage_show(&state->ru_start));
1537 #endif
1538
1539                         /*
1540                          * Done if heap is empty, else prepare for new run.
1541                          */
1542                         if (state->memtupcount == 0)
1543                                 break;
1544                         Assert(state->currentRun == state->memtupindex[0]);
1545                         selectnewtape(state);
1546                 }
1547         }
1548 }
1549
1550 /*
1551  * tuplesort_rescan             - rewind and replay the scan
1552  */
1553 void
1554 tuplesort_rescan(Tuplesortstate *state)
1555 {
1556         Assert(state->randomAccess);
1557
1558         switch (state->status)
1559         {
1560                 case TSS_SORTEDINMEM:
1561                         state->current = 0;
1562                         state->eof_reached = false;
1563                         state->markpos_offset = 0;
1564                         state->markpos_eof = false;
1565                         break;
1566                 case TSS_SORTEDONTAPE:
1567                         LogicalTapeRewind(state->tapeset,
1568                                                           state->result_tape,
1569                                                           false);
1570                         state->eof_reached = false;
1571                         state->markpos_block = 0L;
1572                         state->markpos_offset = 0;
1573                         state->markpos_eof = false;
1574                         break;
1575                 default:
1576                         elog(ERROR, "invalid tuplesort state");
1577                         break;
1578         }
1579 }
1580
1581 /*
1582  * tuplesort_markpos    - saves current position in the merged sort file
1583  */
1584 void
1585 tuplesort_markpos(Tuplesortstate *state)
1586 {
1587         Assert(state->randomAccess);
1588
1589         switch (state->status)
1590         {
1591                 case TSS_SORTEDINMEM:
1592                         state->markpos_offset = state->current;
1593                         state->markpos_eof = state->eof_reached;
1594                         break;
1595                 case TSS_SORTEDONTAPE:
1596                         LogicalTapeTell(state->tapeset,
1597                                                         state->result_tape,
1598                                                         &state->markpos_block,
1599                                                         &state->markpos_offset);
1600                         state->markpos_eof = state->eof_reached;
1601                         break;
1602                 default:
1603                         elog(ERROR, "invalid tuplesort state");
1604                         break;
1605         }
1606 }
1607
1608 /*
1609  * tuplesort_restorepos - restores current position in merged sort file to
1610  *                                                last saved position
1611  */
1612 void
1613 tuplesort_restorepos(Tuplesortstate *state)
1614 {
1615         Assert(state->randomAccess);
1616
1617         switch (state->status)
1618         {
1619                 case TSS_SORTEDINMEM:
1620                         state->current = state->markpos_offset;
1621                         state->eof_reached = state->markpos_eof;
1622                         break;
1623                 case TSS_SORTEDONTAPE:
1624                         if (!LogicalTapeSeek(state->tapeset,
1625                                                                  state->result_tape,
1626                                                                  state->markpos_block,
1627                                                                  state->markpos_offset))
1628                                 elog(ERROR, "tuplesort_restorepos failed");
1629                         state->eof_reached = state->markpos_eof;
1630                         break;
1631                 default:
1632                         elog(ERROR, "invalid tuplesort state");
1633                         break;
1634         }
1635 }
1636
1637
1638 /*
1639  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
1640  *
1641  * The heap lives in state->memtuples[], with parallel data storage
1642  * for indexes in state->memtupindex[].  If checkIndex is true, use
1643  * the tuple index as the front of the sort key; otherwise, no.
1644  */
1645
1646 #define HEAPCOMPARE(tup1,index1,tup2,index2) \
1647         (checkIndex && (index1 != index2) ? (index1) - (index2) : \
1648          COMPARETUP(state, tup1, tup2))
1649
1650 /*
1651  * Insert a new tuple into an empty or existing heap, maintaining the
1652  * heap invariant.
1653  */
1654 static void
1655 tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
1656                                           int tupleindex, bool checkIndex)
1657 {
1658         void      **memtuples;
1659         int                *memtupindex;
1660         int                     j;
1661
1662         /*
1663          * Make sure memtuples[] can handle another entry.
1664          */
1665         if (state->memtupcount >= state->memtupsize)
1666         {
1667                 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1668                 FREEMEM(state, GetMemoryChunkSpace(state->memtupindex));
1669                 state->memtupsize *= 2;
1670                 state->memtuples = (void **)
1671                         repalloc(state->memtuples,
1672                                          state->memtupsize * sizeof(void *));
1673                 state->memtupindex = (int *)
1674                         repalloc(state->memtupindex,
1675                                          state->memtupsize * sizeof(int));
1676                 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1677                 USEMEM(state, GetMemoryChunkSpace(state->memtupindex));
1678         }
1679         memtuples = state->memtuples;
1680         memtupindex = state->memtupindex;
1681
1682         /*
1683          * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth
1684          * is using 1-based array indexes, not 0-based.
1685          */
1686         j = state->memtupcount++;
1687         while (j > 0)
1688         {
1689                 int                     i = (j - 1) >> 1;
1690
1691                 if (HEAPCOMPARE(tuple, tupleindex,
1692                                                 memtuples[i], memtupindex[i]) >= 0)
1693                         break;
1694                 memtuples[j] = memtuples[i];
1695                 memtupindex[j] = memtupindex[i];
1696                 j = i;
1697         }
1698         memtuples[j] = tuple;
1699         memtupindex[j] = tupleindex;
1700 }
1701
1702 /*
1703  * The tuple at state->memtuples[0] has been removed from the heap.
1704  * Decrement memtupcount, and sift up to maintain the heap invariant.
1705  */
1706 static void
1707 tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
1708 {
1709         void      **memtuples = state->memtuples;
1710         int                *memtupindex = state->memtupindex;
1711         void       *tuple;
1712         int                     tupindex,
1713                                 i,
1714                                 n;
1715
1716         if (--state->memtupcount <= 0)
1717                 return;
1718         n = state->memtupcount;
1719         tuple = memtuples[n];           /* tuple that must be reinserted */
1720         tupindex = memtupindex[n];
1721         i = 0;                                          /* i is where the "hole" is */
1722         for (;;)
1723         {
1724                 int                     j = 2 * i + 1;
1725
1726                 if (j >= n)
1727                         break;
1728                 if (j + 1 < n &&
1729                         HEAPCOMPARE(memtuples[j], memtupindex[j],
1730                                                 memtuples[j + 1], memtupindex[j + 1]) > 0)
1731                         j++;
1732                 if (HEAPCOMPARE(tuple, tupindex,
1733                                                 memtuples[j], memtupindex[j]) <= 0)
1734                         break;
1735                 memtuples[i] = memtuples[j];
1736                 memtupindex[i] = memtupindex[j];
1737                 i = j;
1738         }
1739         memtuples[i] = tuple;
1740         memtupindex[i] = tupindex;
1741 }
1742
1743
1744 /*
1745  * Tape interface routines
1746  */
1747
1748 static unsigned int
1749 getlen(Tuplesortstate *state, int tapenum, bool eofOK)
1750 {
1751         unsigned int len;
1752
1753         if (LogicalTapeRead(state->tapeset, tapenum, (void *) &len,
1754                                                 sizeof(len)) != sizeof(len))
1755                 elog(ERROR, "unexpected end of tape");
1756         if (len == 0 && !eofOK)
1757                 elog(ERROR, "unexpected end of data");
1758         return len;
1759 }
1760
1761 static void
1762 markrunend(Tuplesortstate *state, int tapenum)
1763 {
1764         unsigned int len = 0;
1765
1766         LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
1767 }
1768
1769
1770 /*
1771  * qsort interface
1772  */
1773
1774 static int
1775 qsort_comparetup(const void *a, const void *b)
1776 {
1777         /* The passed pointers are pointers to void * ... */
1778
1779         return COMPARETUP(qsort_tuplesortstate, *(void **) a, *(void **) b);
1780 }
1781
1782
1783 /*
1784  * This routine selects an appropriate sorting function to implement
1785  * a sort operator as efficiently as possible.  The straightforward
1786  * method is to use the operator's implementation proc --- ie, "<"
1787  * comparison.  However, that way often requires two calls of the function
1788  * per comparison.      If we can find a btree three-way comparator function
1789  * associated with the operator, we can use it to do the comparisons
1790  * more efficiently.  We also support the possibility that the operator
1791  * is ">" (descending sort), in which case we have to reverse the output
1792  * of the btree comparator.
1793  *
1794  * Possibly this should live somewhere else (backend/catalog/, maybe?).
1795  */
1796 void
1797 SelectSortFunction(Oid sortOperator,
1798                                    RegProcedure *sortFunction,
1799                                    SortFunctionKind *kind)
1800 {
1801         CatCList   *catlist;
1802         int                     i;
1803         HeapTuple       tuple;
1804         Form_pg_operator optup;
1805         Oid                     opclass = InvalidOid;
1806
1807         /*
1808          * Search pg_amop to see if the target operator is registered as the
1809          * "<" or ">" operator of any btree opclass.  It's possible that it
1810          * might be registered both ways (eg, if someone were to build a
1811          * "reverse sort" opclass for some reason); prefer the "<" case if so.
1812          * If the operator is registered the same way in multiple opclasses,
1813          * assume we can use the associated comparator function from any one.
1814          */
1815         catlist = SearchSysCacheList(AMOPOPID, 1,
1816                                                                  ObjectIdGetDatum(sortOperator),
1817                                                                  0, 0, 0);
1818
1819         for (i = 0; i < catlist->n_members; i++)
1820         {
1821                 Form_pg_amop aform;
1822
1823                 tuple = &catlist->members[i]->tuple;
1824                 aform = (Form_pg_amop) GETSTRUCT(tuple);
1825
1826                 if (!opclass_is_btree(aform->amopclaid))
1827                         continue;
1828                 /* must be of default subtype, too */
1829                 if (aform->amopsubtype != InvalidOid)
1830                         continue;
1831
1832                 if (aform->amopstrategy == BTLessStrategyNumber)
1833                 {
1834                         opclass = aform->amopclaid;
1835                         *kind = SORTFUNC_CMP;
1836                         break;                          /* done looking */
1837                 }
1838                 else if (aform->amopstrategy == BTGreaterStrategyNumber)
1839                 {
1840                         opclass = aform->amopclaid;
1841                         *kind = SORTFUNC_REVCMP;
1842                         /* keep scanning in hopes of finding a BTLess entry */
1843                 }
1844         }
1845
1846         ReleaseSysCacheList(catlist);
1847
1848         if (OidIsValid(opclass))
1849         {
1850                 /* Found a suitable opclass, get its default comparator function */
1851                 *sortFunction = get_opclass_proc(opclass, InvalidOid, BTORDER_PROC);
1852                 Assert(RegProcedureIsValid(*sortFunction));
1853                 return;
1854         }
1855
1856         /*
1857          * Can't find a comparator, so use the operator as-is.  Decide whether
1858          * it is forward or reverse sort by looking at its name (grotty, but
1859          * this only matters for deciding which end NULLs should get sorted
1860          * to).  XXX possibly better idea: see whether its selectivity
1861          * function is scalargtcmp?
1862          */
1863         tuple = SearchSysCache(OPEROID,
1864                                                    ObjectIdGetDatum(sortOperator),
1865                                                    0, 0, 0);
1866         if (!HeapTupleIsValid(tuple))
1867                 elog(ERROR, "cache lookup failed for operator %u", sortOperator);
1868         optup = (Form_pg_operator) GETSTRUCT(tuple);
1869         if (strcmp(NameStr(optup->oprname), ">") == 0)
1870                 *kind = SORTFUNC_REVLT;
1871         else
1872                 *kind = SORTFUNC_LT;
1873         *sortFunction = optup->oprcode;
1874         ReleaseSysCache(tuple);
1875
1876         Assert(RegProcedureIsValid(*sortFunction));
1877 }
1878
1879 /*
1880  * Inline-able copy of FunctionCall2() to save some cycles in sorting.
1881  */
1882 static inline Datum
1883 myFunctionCall2(FmgrInfo *flinfo, Datum arg1, Datum arg2)
1884 {
1885         FunctionCallInfoData fcinfo;
1886         Datum           result;
1887
1888         InitFunctionCallInfoData(fcinfo, flinfo, 2, NULL, NULL);
1889
1890         fcinfo.arg[0] = arg1;
1891         fcinfo.arg[1] = arg2;
1892         fcinfo.argnull[0] = false;
1893         fcinfo.argnull[1] = false;
1894
1895         result = FunctionCallInvoke(&fcinfo);
1896
1897         /* Check for null result, since caller is clearly not expecting one */
1898         if (fcinfo.isnull)
1899                 elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
1900
1901         return result;
1902 }
1903
1904 /*
1905  * Apply a sort function (by now converted to fmgr lookup form)
1906  * and return a 3-way comparison result.  This takes care of handling
1907  * NULLs and sort ordering direction properly.
1908  */
1909 static inline int32
1910 inlineApplySortFunction(FmgrInfo *sortFunction, SortFunctionKind kind,
1911                                                 Datum datum1, bool isNull1,
1912                                                 Datum datum2, bool isNull2)
1913 {
1914         switch (kind)
1915         {
1916                 case SORTFUNC_LT:
1917                         if (isNull1)
1918                         {
1919                                 if (isNull2)
1920                                         return 0;
1921                                 return 1;               /* NULL sorts after non-NULL */
1922                         }
1923                         if (isNull2)
1924                                 return -1;
1925                         if (DatumGetBool(myFunctionCall2(sortFunction, datum1, datum2)))
1926                                 return -1;              /* a < b */
1927                         if (DatumGetBool(myFunctionCall2(sortFunction, datum2, datum1)))
1928                                 return 1;               /* a > b */
1929                         return 0;
1930
1931                 case SORTFUNC_REVLT:
1932                         /* We reverse the ordering of NULLs, but not the operator */
1933                         if (isNull1)
1934                         {
1935                                 if (isNull2)
1936                                         return 0;
1937                                 return -1;              /* NULL sorts before non-NULL */
1938                         }
1939                         if (isNull2)
1940                                 return 1;
1941                         if (DatumGetBool(myFunctionCall2(sortFunction, datum1, datum2)))
1942                                 return -1;              /* a < b */
1943                         if (DatumGetBool(myFunctionCall2(sortFunction, datum2, datum1)))
1944                                 return 1;               /* a > b */
1945                         return 0;
1946
1947                 case SORTFUNC_CMP:
1948                         if (isNull1)
1949                         {
1950                                 if (isNull2)
1951                                         return 0;
1952                                 return 1;               /* NULL sorts after non-NULL */
1953                         }
1954                         if (isNull2)
1955                                 return -1;
1956                         return DatumGetInt32(myFunctionCall2(sortFunction,
1957                                                                                                  datum1, datum2));
1958
1959                 case SORTFUNC_REVCMP:
1960                         if (isNull1)
1961                         {
1962                                 if (isNull2)
1963                                         return 0;
1964                                 return -1;              /* NULL sorts before non-NULL */
1965                         }
1966                         if (isNull2)
1967                                 return 1;
1968                         return -DatumGetInt32(myFunctionCall2(sortFunction,
1969                                                                                                   datum1, datum2));
1970
1971                 default:
1972                         elog(ERROR, "unrecognized SortFunctionKind: %d", (int) kind);
1973                         return 0;                       /* can't get here, but keep compiler quiet */
1974         }
1975 }
1976
1977 /*
1978  * Non-inline ApplySortFunction() --- this is needed only to conform to
1979  * C99's brain-dead notions about how to implement inline functions...
1980  */
1981 int32
1982 ApplySortFunction(FmgrInfo *sortFunction, SortFunctionKind kind,
1983                                   Datum datum1, bool isNull1,
1984                                   Datum datum2, bool isNull2)
1985 {
1986         return inlineApplySortFunction(sortFunction, kind,
1987                                                                    datum1, isNull1,
1988                                                                    datum2, isNull2);
1989 }
1990
1991
1992 /*
1993  * Routines specialized for HeapTuple case
1994  */
1995
1996 static int
1997 comparetup_heap(Tuplesortstate *state, const void *a, const void *b)
1998 {
1999         HeapTuple       ltup = (HeapTuple) a;
2000         HeapTuple       rtup = (HeapTuple) b;
2001         TupleDesc       tupDesc = state->tupDesc;
2002         int                     nkey;
2003
2004         for (nkey = 0; nkey < state->nKeys; nkey++)
2005         {
2006                 ScanKey         scanKey = state->scanKeys + nkey;
2007                 AttrNumber      attno = scanKey->sk_attno;
2008                 Datum           datum1,
2009                                         datum2;
2010                 bool            isnull1,
2011                                         isnull2;
2012                 int32           compare;
2013
2014                 datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
2015                 datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
2016
2017                 compare = inlineApplySortFunction(&scanKey->sk_func,
2018                                                                                   state->sortFnKinds[nkey],
2019                                                                                   datum1, isnull1,
2020                                                                                   datum2, isnull2);
2021                 if (compare != 0)
2022                         return compare;
2023         }
2024
2025         return 0;
2026 }
2027
2028 static void *
2029 copytup_heap(Tuplesortstate *state, void *tup)
2030 {
2031         HeapTuple       tuple = (HeapTuple) tup;
2032
2033         tuple = heap_copytuple(tuple);
2034         USEMEM(state, GetMemoryChunkSpace(tuple));
2035         return (void *) tuple;
2036 }
2037
2038 /*
2039  * We don't bother to write the HeapTupleData part of the tuple.
2040  */
2041
2042 static void
2043 writetup_heap(Tuplesortstate *state, int tapenum, void *tup)
2044 {
2045         HeapTuple       tuple = (HeapTuple) tup;
2046         unsigned int tuplen;
2047
2048         tuplen = tuple->t_len + sizeof(tuplen);
2049         LogicalTapeWrite(state->tapeset, tapenum,
2050                                          (void *) &tuplen, sizeof(tuplen));
2051         LogicalTapeWrite(state->tapeset, tapenum,
2052                                          (void *) tuple->t_data, tuple->t_len);
2053         if (state->randomAccess)        /* need trailing length word? */
2054                 LogicalTapeWrite(state->tapeset, tapenum,
2055                                                  (void *) &tuplen, sizeof(tuplen));
2056
2057         FREEMEM(state, GetMemoryChunkSpace(tuple));
2058         heap_freetuple(tuple);
2059 }
2060
2061 static void *
2062 readtup_heap(Tuplesortstate *state, int tapenum, unsigned int len)
2063 {
2064         unsigned int tuplen = len - sizeof(unsigned int) + HEAPTUPLESIZE;
2065         HeapTuple       tuple = (HeapTuple) palloc(tuplen);
2066
2067         USEMEM(state, GetMemoryChunkSpace(tuple));
2068         /* reconstruct the HeapTupleData portion */
2069         tuple->t_len = len - sizeof(unsigned int);
2070         ItemPointerSetInvalid(&(tuple->t_self));
2071         tuple->t_datamcxt = CurrentMemoryContext;
2072         tuple->t_data = (HeapTupleHeader) (((char *) tuple) + HEAPTUPLESIZE);
2073         /* read in the tuple proper */
2074         if (LogicalTapeRead(state->tapeset, tapenum, (void *) tuple->t_data,
2075                                                 tuple->t_len) != tuple->t_len)
2076                 elog(ERROR, "unexpected end of data");
2077         if (state->randomAccess)        /* need trailing length word? */
2078                 if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
2079                                                         sizeof(tuplen)) != sizeof(tuplen))
2080                         elog(ERROR, "unexpected end of data");
2081         return (void *) tuple;
2082 }
2083
2084
2085 /*
2086  * Routines specialized for IndexTuple case
2087  *
2088  * NOTE: actually, these are specialized for the btree case; it's not
2089  * clear whether you could use them for a non-btree index.      Possibly
2090  * you'd need to make another set of routines if you needed to sort
2091  * according to another kind of index.
2092  */
2093
2094 static int
2095 comparetup_index(Tuplesortstate *state, const void *a, const void *b)
2096 {
2097         /*
2098          * This is almost the same as _bt_tuplecompare(), but we need to keep
2099          * track of whether any null fields are present.  Also see the special
2100          * treatment for equal keys at the end.
2101          */
2102         IndexTuple      tuple1 = (IndexTuple) a;
2103         IndexTuple      tuple2 = (IndexTuple) b;
2104         Relation        rel = state->indexRel;
2105         int                     keysz = RelationGetNumberOfAttributes(rel);
2106         ScanKey         scankey = state->indexScanKey;
2107         TupleDesc       tupDes;
2108         int                     i;
2109         bool            equal_hasnull = false;
2110
2111         tupDes = RelationGetDescr(rel);
2112
2113         for (i = 1; i <= keysz; i++)
2114         {
2115                 ScanKey         entry = &scankey[i - 1];
2116                 Datum           datum1,
2117                                         datum2;
2118                 bool            isnull1,
2119                                         isnull2;
2120                 int32           compare;
2121
2122                 datum1 = index_getattr(tuple1, i, tupDes, &isnull1);
2123                 datum2 = index_getattr(tuple2, i, tupDes, &isnull2);
2124
2125                 /* see comments about NULLs handling in btbuild */
2126
2127                 /* the comparison function is always of CMP type */
2128                 compare = inlineApplySortFunction(&entry->sk_func, SORTFUNC_CMP,
2129                                                                                   datum1, isnull1,
2130                                                                                   datum2, isnull2);
2131
2132                 if (compare != 0)
2133                         return (int) compare;           /* done when we find unequal
2134                                                                                  * attributes */
2135
2136                 /* they are equal, so we only need to examine one null flag */
2137                 if (isnull1)
2138                         equal_hasnull = true;
2139         }
2140
2141         /*
2142          * If btree has asked us to enforce uniqueness, complain if two equal
2143          * tuples are detected (unless there was at least one NULL field).
2144          *
2145          * It is sufficient to make the test here, because if two tuples are
2146          * equal they *must* get compared at some stage of the sort ---
2147          * otherwise the sort algorithm wouldn't have checked whether one must
2148          * appear before the other.
2149          *
2150          * Some rather brain-dead implementations of qsort will sometimes call
2151          * the comparison routine to compare a value to itself.  (At this
2152          * writing only QNX 4 is known to do such silly things.)  Don't raise
2153          * a bogus error in that case.
2154          */
2155         if (state->enforceUnique && !equal_hasnull && tuple1 != tuple2)
2156                 ereport(ERROR,
2157                                 (errcode(ERRCODE_UNIQUE_VIOLATION),
2158                                  errmsg("could not create unique index"),
2159                                  errdetail("Table contains duplicated values.")));
2160
2161         /*
2162          * If key values are equal, we sort on ItemPointer.  This does not
2163          * affect validity of the finished index, but it offers cheap
2164          * insurance against performance problems with bad qsort
2165          * implementations that have trouble with large numbers of equal keys.
2166          */
2167         {
2168                 BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
2169                 BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
2170
2171                 if (blk1 != blk2)
2172                         return (blk1 < blk2) ? -1 : 1;
2173         }
2174         {
2175                 OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
2176                 OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
2177
2178                 if (pos1 != pos2)
2179                         return (pos1 < pos2) ? -1 : 1;
2180         }
2181
2182         return 0;
2183 }
2184
2185 static void *
2186 copytup_index(Tuplesortstate *state, void *tup)
2187 {
2188         IndexTuple      tuple = (IndexTuple) tup;
2189         unsigned int tuplen = IndexTupleSize(tuple);
2190         IndexTuple      newtuple;
2191
2192         newtuple = (IndexTuple) palloc(tuplen);
2193         USEMEM(state, GetMemoryChunkSpace(newtuple));
2194
2195         memcpy(newtuple, tuple, tuplen);
2196
2197         return (void *) newtuple;
2198 }
2199
2200 static void
2201 writetup_index(Tuplesortstate *state, int tapenum, void *tup)
2202 {
2203         IndexTuple      tuple = (IndexTuple) tup;
2204         unsigned int tuplen;
2205
2206         tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
2207         LogicalTapeWrite(state->tapeset, tapenum,
2208                                          (void *) &tuplen, sizeof(tuplen));
2209         LogicalTapeWrite(state->tapeset, tapenum,
2210                                          (void *) tuple, IndexTupleSize(tuple));
2211         if (state->randomAccess)        /* need trailing length word? */
2212                 LogicalTapeWrite(state->tapeset, tapenum,
2213                                                  (void *) &tuplen, sizeof(tuplen));
2214
2215         FREEMEM(state, GetMemoryChunkSpace(tuple));
2216         pfree(tuple);
2217 }
2218
2219 static void *
2220 readtup_index(Tuplesortstate *state, int tapenum, unsigned int len)
2221 {
2222         unsigned int tuplen = len - sizeof(unsigned int);
2223         IndexTuple      tuple = (IndexTuple) palloc(tuplen);
2224
2225         USEMEM(state, GetMemoryChunkSpace(tuple));
2226         if (LogicalTapeRead(state->tapeset, tapenum, (void *) tuple,
2227                                                 tuplen) != tuplen)
2228                 elog(ERROR, "unexpected end of data");
2229         if (state->randomAccess)        /* need trailing length word? */
2230                 if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
2231                                                         sizeof(tuplen)) != sizeof(tuplen))
2232                         elog(ERROR, "unexpected end of data");
2233         return (void *) tuple;
2234 }
2235
2236
2237 /*
2238  * Routines specialized for DatumTuple case
2239  */
2240
2241 static int
2242 comparetup_datum(Tuplesortstate *state, const void *a, const void *b)
2243 {
2244         DatumTuple *ltup = (DatumTuple *) a;
2245         DatumTuple *rtup = (DatumTuple *) b;
2246
2247         return inlineApplySortFunction(&state->sortOpFn, state->sortFnKind,
2248                                                                    ltup->val, ltup->isNull,
2249                                                                    rtup->val, rtup->isNull);
2250 }
2251
2252 static void *
2253 copytup_datum(Tuplesortstate *state, void *tup)
2254 {
2255         /* Not currently needed */
2256         elog(ERROR, "copytup_datum() should not be called");
2257         return NULL;
2258 }
2259
2260 static void
2261 writetup_datum(Tuplesortstate *state, int tapenum, void *tup)
2262 {
2263         DatumTuple *tuple = (DatumTuple *) tup;
2264         unsigned int tuplen;
2265         unsigned int writtenlen;
2266
2267         if (tuple->isNull || state->datumTypeByVal)
2268                 tuplen = sizeof(DatumTuple);
2269         else
2270         {
2271                 Size            datalen;
2272
2273                 datalen = datumGetSize(tuple->val, false, state->datumTypeLen);
2274                 tuplen = datalen + MAXALIGN(sizeof(DatumTuple));
2275         }
2276
2277         writtenlen = tuplen + sizeof(unsigned int);
2278
2279         LogicalTapeWrite(state->tapeset, tapenum,
2280                                          (void *) &writtenlen, sizeof(writtenlen));
2281         LogicalTapeWrite(state->tapeset, tapenum,
2282                                          (void *) tuple, tuplen);
2283         if (state->randomAccess)        /* need trailing length word? */
2284                 LogicalTapeWrite(state->tapeset, tapenum,
2285                                                  (void *) &writtenlen, sizeof(writtenlen));
2286
2287         FREEMEM(state, GetMemoryChunkSpace(tuple));
2288         pfree(tuple);
2289 }
2290
2291 static void *
2292 readtup_datum(Tuplesortstate *state, int tapenum, unsigned int len)
2293 {
2294         unsigned int tuplen = len - sizeof(unsigned int);
2295         DatumTuple *tuple = (DatumTuple *) palloc(tuplen);
2296
2297         USEMEM(state, GetMemoryChunkSpace(tuple));
2298         if (LogicalTapeRead(state->tapeset, tapenum, (void *) tuple,
2299                                                 tuplen) != tuplen)
2300                 elog(ERROR, "unexpected end of data");
2301         if (state->randomAccess)        /* need trailing length word? */
2302                 if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
2303                                                         sizeof(tuplen)) != sizeof(tuplen))
2304                         elog(ERROR, "unexpected end of data");
2305
2306         if (!tuple->isNull && !state->datumTypeByVal)
2307                 tuple->val = PointerGetDatum(((char *) tuple) +
2308                                                                          MAXALIGN(sizeof(DatumTuple)));
2309         return (void *) tuple;
2310 }