/*-------------------------------------------------------------------------
 *
 * tuplesort.c
 *	  Generalized tuple sorting routines.
 *
 * This module handles sorting of heap tuples, index tuples, or single
 * Datums (and could easily support other kinds of sortable objects,
 * if necessary).  It works efficiently for both small and large amounts
 * of data.  Small amounts are sorted in-memory using qsort().  Large
 * amounts are sorted using temporary files and a standard external sort
 * algorithm.
 *
 * See Knuth, volume 3, for more than you want to know about the external
 * sorting algorithm.  We divide the input into sorted runs using replacement
 * selection, in the form of a priority tree implemented as a heap
 * (essentially his Algorithm 5.2.3H), then merge the runs using polyphase
 * merge, Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D
 * are implemented by logtape.c, which avoids space wastage by recycling
 * disk space as soon as each block is read from its "tape".
 *
 * We do not form the initial runs using Knuth's recommended replacement
 * selection data structure (Algorithm 5.4.1R), because it uses a fixed
 * number of records in memory at all times.  Since we are dealing with
 * tuples that may vary considerably in size, we want to be able to vary
 * the number of records kept in memory to ensure full utilization of the
 * allowed sort memory space.  So, we keep the tuples in a variable-size
 * heap, with the next record to go out at the top of the heap.  Like
 * Algorithm 5.4.1R, each record is stored with the run number that it
 * must go into, and we use (run number, key) as the ordering key for the
 * heap.  When the run number at the top of the heap changes, we know that
 * no more records of the prior run are left in the heap.
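 *
 * For example (an illustrative case, not drawn from the code below): with
 * ordering key (run number, sort key), the heap orders (0,"b"), (0,"c"),
 * (1,"a") in exactly that sequence, so a tuple assigned to run 1 cannot
 * surface at the top until every run-0 tuple has been written out.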
 *
 * The approximate amount of memory allowed for any one sort operation
 * is specified in kilobytes by the caller (most pass work_mem).  Initially,
 * we absorb tuples and simply store them in an unsorted array as long as
 * we haven't exceeded workMem.  If we reach the end of the input without
 * exceeding workMem, we sort the array using qsort() and subsequently return
 * tuples just by scanning the tuple array sequentially.  If we do exceed
 * workMem, we construct a heap using Algorithm H and begin to emit tuples
 * into sorted runs in temporary tapes, emitting just enough tuples at each
 * step to get back within the workMem limit.  Whenever the run number at
 * the top of the heap changes, we begin a new run with a new output tape
 * (selected per Algorithm D).  After the end of the input is reached,
 * we dump out remaining tuples in memory into a final run (or two),
 * then merge the runs using Algorithm D.
 *
 * When merging runs, we use a heap containing just the frontmost tuple from
 * each source run; we repeatedly output the smallest tuple and insert the
 * next tuple from its source tape (if any).  When the heap empties, the merge
 * is complete.  The basic merge algorithm thus needs very little memory ---
 * only M tuples for an M-way merge, and M is at most six in the present code.
 * However, we can still make good use of our full workMem allocation by
 * pre-reading additional tuples from each source tape.  Without prereading,
 * our access pattern to the temporary file would be very erratic; on average
 * we'd read one block from each of M source tapes during the same time that
 * we're writing M blocks to the output tape, so there is no sequentiality of
 * access at all, defeating the read-ahead methods used by most Unix kernels.
 * Worse, the output tape gets written into a very random sequence of blocks
 * of the temp file, ensuring that things will be even worse when it comes
 * time to read that tape.  A straightforward merge pass thus ends up doing a
 * lot of waiting for disk seeks.  We can improve matters by prereading from
 * each source tape sequentially, loading about workMem/M bytes from each tape
 * in turn.  Then we run the merge algorithm, writing but not reading until
 * one of the preloaded tuple series runs out.  Then we switch back to preread
 * mode, fill memory again, and repeat.  This approach helps to localize both
 * read and write accesses.
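 *
 * To put rough numbers on that (an illustrative calculation only): with
 * workMem = 16MB and a 6-way merge, each tape gets a preread budget of
 * about 16MB/6, i.e. roughly 2.7MB of sequential reading at a time,
 * instead of single blocks scattered across all six source tapes.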
 *
 * When the caller requests random access to the sort result, we form
 * the final sorted run on a logical tape which is then "frozen", so
 * that we can access it randomly.  When the caller does not need random
 * access, we return from tuplesort_performsort() as soon as we are down
 * to one run per logical tape.  The final merge is then performed
 * on-the-fly as the caller repeatedly calls tuplesort_gettuple; this
 * saves one cycle of writing all the data out to disk and reading it in.
 *
 *
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/utils/sort/tuplesort.c,v 1.53 2005/10/18 22:59:37 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/heapam.h"
#include "access/nbtree.h"
#include "catalog/pg_amop.h"
#include "catalog/pg_operator.h"
#include "miscadmin.h"
#include "utils/catcache.h"
#include "utils/datum.h"
#include "utils/logtape.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_rusage.h"
#include "utils/syscache.h"
#include "utils/tuplesort.h"


/* GUC variable */
#ifdef TRACE_SORT
bool		trace_sort = false;
#endif


/*
 * Possible states of a Tuplesort object.  These denote the states that
 * persist between calls of Tuplesort routines.
 */
typedef enum
{
	TSS_INITIAL,				/* Loading tuples; still within memory limit */
	TSS_BUILDRUNS,				/* Loading tuples; writing to tape */
	TSS_SORTEDINMEM,			/* Sort completed entirely in memory */
	TSS_SORTEDONTAPE,			/* Sort completed, final run is on tape */
	TSS_FINALMERGE				/* Performing final merge on-the-fly */
} TupSortStatus;
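
/*
 * For orientation, the state progressions implemented below are:
 * INITIAL -> SORTEDINMEM for a pure in-memory sort, and INITIAL ->
 * BUILDRUNS -> SORTEDONTAPE (or -> FINALMERGE when the caller did not
 * request random access) for an external sort.
 */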

/*
 * We use a seven-tape polyphase merge, which is the "sweet spot" on the
 * tapes-to-passes curve according to Knuth's figure 70 (section 5.4.2).
 */
#define MAXTAPES		7		/* Knuth's T */
#define TAPERANGE		(MAXTAPES-1)	/* Knuth's P */

/*
 * Private state of a Tuplesort operation.
 */
struct Tuplesortstate
{
	TupSortStatus status;		/* enumerated value as shown above */
	bool		randomAccess;	/* did caller request random access? */
	long		availMem;		/* remaining memory available, in bytes */
	long		allowedMem;		/* total memory allowed, in bytes */
	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp file */

	/*
	 * These function pointers decouple the routines that must know what kind
	 * of tuple we are sorting from the routines that don't need to know it.
	 * They are set up by the tuplesort_begin_xxx routines.
	 *
	 * Function to compare two tuples; result is per qsort() convention, ie:
	 *
	 * <0, 0, >0 according as a<b, a=b, a>b.
	 */
	int			(*comparetup) (Tuplesortstate *state, const void *a, const void *b);

	/*
	 * Function to copy a supplied input tuple into palloc'd space. (NB: we
	 * assume that a single pfree() is enough to release the tuple later, so
	 * the representation must be "flat" in one palloc chunk.) state->availMem
	 * must be decreased by the amount of space used.
	 */
	void	   *(*copytup) (Tuplesortstate *state, void *tup);

	/*
	 * Function to write a stored tuple onto tape.  The representation of the
	 * tuple on tape need not be the same as it is in memory; requirements on
	 * the tape representation are given below.  After writing the tuple,
	 * pfree() it, and increase state->availMem by the amount of memory space
	 * thereby released.
	 */
	void		(*writetup) (Tuplesortstate *state, int tapenum, void *tup);

	/*
	 * Function to read a stored tuple from tape back into memory. 'len' is
	 * the already-read length of the stored tuple.  Create and return a
	 * palloc'd copy, and decrease state->availMem by the amount of memory
	 * space consumed.
	 */
	void	   *(*readtup) (Tuplesortstate *state, int tapenum, unsigned int len);

	/*
	 * This array holds pointers to tuples in sort memory.  If we are in state
	 * INITIAL, the tuples are in no particular order; if we are in state
	 * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
	 * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
	 * H.  (Note that memtupcount only counts the tuples that are part of the
	 * heap --- during merge passes, memtuples[] entries beyond TAPERANGE are
	 * never in the heap and are used to hold pre-read tuples.)  In state
	 * SORTEDONTAPE, the array is not used.
	 */
	void	  **memtuples;		/* array of pointers to palloc'd tuples */
	int			memtupcount;	/* number of tuples currently present */
	int			memtupsize;		/* allocated length of memtuples array */

	/*
	 * While building initial runs, this array holds the run number for each
	 * tuple in memtuples[].  During merge passes, we re-use it to hold the
	 * input tape number that each tuple in the heap was read from, or to hold
	 * the index of the next tuple pre-read from the same tape in the case of
	 * pre-read entries.  This array is never allocated unless we need to use
	 * tapes.  Whenever it is allocated, it has the same length as
	 * memtuples[].
	 */
	int		   *memtupindex;	/* index value associated with memtuples[i] */

	/*
	 * While building initial runs, this is the current output run number
	 * (starting at 0).  Afterwards, it is the number of initial runs we made.
	 */
	int			currentRun;

	/*
	 * These variables are only used during merge passes.  mergeactive[i] is
	 * true if we are reading an input run from (actual) tape number i and
	 * have not yet exhausted that run.  mergenext[i] is the memtuples index
	 * of the next pre-read tuple (next to be loaded into the heap) for tape
	 * i, or 0 if we are out of pre-read tuples.  mergelast[i] similarly
	 * points to the last pre-read tuple from each tape. mergeavailmem[i] is
	 * the amount of unused space allocated for tape i. mergefreelist and
	 * mergefirstfree keep track of unused locations in the memtuples[] array.
	 * memtupindex[] links together pre-read tuples for each tape as well as
	 * recycled locations in mergefreelist. It is OK to use 0 as a null link
	 * in these lists, because memtuples[0] is part of the merge heap and is
	 * never a pre-read tuple.
	 */
	bool		mergeactive[MAXTAPES];	/* Active input run source? */
	int			mergenext[MAXTAPES];	/* first preread tuple for each source */
	int			mergelast[MAXTAPES];	/* last preread tuple for each source */
	long		mergeavailmem[MAXTAPES];	/* availMem for prereading tapes */
	long		spacePerTape;	/* actual per-tape target usage */
	int			mergefreelist;	/* head of freelist of recycled slots */
	int			mergefirstfree; /* first slot never used in this merge */
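
	/*
	 * A sketch of the linking scheme described above (hypothetical slot
	 * numbers): if tape 2 has pre-read tuples sitting in memtuples[] slots
	 * 8, 9 and 10, then mergenext[2] = 8, memtupindex[8] = 9,
	 * memtupindex[9] = 10, memtupindex[10] = 0, and mergelast[2] = 10.
	 */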

	/*
	 * Variables for Algorithm D.  Note that destTape is a "logical" tape
	 * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
	 * "logical" and "actual" tape numbers straight!
	 */
	int			Level;			/* Knuth's l */
	int			destTape;		/* current output tape (Knuth's j, less 1) */
	int			tp_fib[MAXTAPES];		/* Target Fibonacci run counts (A[]) */
	int			tp_runs[MAXTAPES];		/* # of real runs on each tape */
	int			tp_dummy[MAXTAPES];		/* # of dummy runs for each tape (D[]) */
	int			tp_tapenum[MAXTAPES];	/* Actual tape numbers (TAPE[]) */

	/*
	 * These variables are used after completion of sorting to keep track of
	 * the next tuple to return.  (In the tape case, the tape's current read
	 * position is also critical state.)
	 */
	int			result_tape;	/* actual tape number of finished output */
	int			current;		/* array index (only used if SORTEDINMEM) */
	bool		eof_reached;	/* reached EOF (needed for cursors) */

	/* markpos_xxx holds marked position for mark and restore */
	long		markpos_block;	/* tape block# (only used if SORTEDONTAPE) */
	int			markpos_offset; /* saved "current", or offset in tape block */
	bool		markpos_eof;	/* saved "eof_reached" */

	/*
	 * These variables are specific to the HeapTuple case; they are set by
	 * tuplesort_begin_heap and used only by the HeapTuple routines.
	 */
	TupleDesc	tupDesc;
	int			nKeys;
	ScanKey		scanKeys;
	SortFunctionKind *sortFnKinds;

	/*
	 * These variables are specific to the IndexTuple case; they are set by
	 * tuplesort_begin_index and used only by the IndexTuple routines.
	 */
	Relation	indexRel;
	ScanKey		indexScanKey;
	bool		enforceUnique;	/* complain if we find duplicate tuples */

	/*
	 * These variables are specific to the Datum case; they are set by
	 * tuplesort_begin_datum and used only by the DatumTuple routines.
	 */
	Oid			datumType;
	Oid			sortOperator;
	FmgrInfo	sortOpFn;		/* cached lookup data for sortOperator */
	SortFunctionKind sortFnKind;
	/* we need typelen and byval in order to know how to copy the Datums. */
	int			datumTypeLen;
	bool		datumTypeByVal;

	/*
	 * Resource snapshot for time of sort start.
	 */
#ifdef TRACE_SORT
	PGRUsage	ru_start;
#endif
};

#define COMPARETUP(state,a,b)	((*(state)->comparetup) (state, a, b))
#define COPYTUP(state,tup)	((*(state)->copytup) (state, tup))
#define WRITETUP(state,tape,tup)	((*(state)->writetup) (state, tape, tup))
#define READTUP(state,tape,len) ((*(state)->readtup) (state, tape, len))
#define LACKMEM(state)		((state)->availMem < 0)
#define USEMEM(state,amt)	((state)->availMem -= (amt))
#define FREEMEM(state,amt)	((state)->availMem += (amt))

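/*
 * Illustrative usage of the above (a sketch of the pattern implemented in
 * puttuple_common below): COPYTUP charges the new tuple's space via USEMEM,
 * so a caller typically does
 *
 *		tuple = COPYTUP(state, tuple);
 *		if (LACKMEM(state))
 *			...switch to tape-based operation...
 */
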
/*--------------------
 *
 * NOTES about on-tape representation of tuples:
 *
 * We require the first "unsigned int" of a stored tuple to be the total size
 * on-tape of the tuple, including itself (so it is never zero; an all-zero
 * unsigned int is used to delimit runs).  The remainder of the stored tuple
 * may or may not match the in-memory representation of the tuple ---
 * any conversion needed is the job of the writetup and readtup routines.
 *
 * If state->randomAccess is true, then the stored representation of the
 * tuple must be followed by another "unsigned int" that is a copy of the
 * length --- so the total tape space used is actually sizeof(unsigned int)
 * more than the stored length value.  This allows read-backwards.  When
 * randomAccess is not true, the write/read routines may omit the extra
 * length word.
 *
 * writetup is expected to write both length words as well as the tuple
 * data.  When readtup is called, the tape is positioned just after the
 * front length word; readtup must read the tuple data and advance past
 * the back length word (if present).
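 *
 * As a concrete illustration: with randomAccess, a tuple whose tape
 * representation needs 20 bytes of data is stored as [24][20 bytes][24]
 * and occupies 28 bytes of tape --- the length word counts itself plus
 * the data (4 + 20), but not the trailing copy.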
 *
 * The write/read routines can make use of the tuple description data
 * stored in the Tuplesortstate record, if needed.  They are also expected
 * to adjust state->availMem by the amount of memory space (not tape space!)
 * released or consumed.  There is no error return from either writetup
 * or readtup; they should ereport() on failure.
 *
 *
 * NOTES about memory consumption calculations:
 *
 * We count space allocated for tuples against the workMem limit, plus
 * the space used by the variable-size arrays memtuples and memtupindex.
 * Fixed-size space (primarily the LogicalTapeSet I/O buffers) is not
 * counted.
 *
 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
 * rather than the originally-requested size.  This is important since
 * palloc can add substantial overhead.  It's not a complete answer since
 * we won't count any wasted space in palloc allocation blocks, but it's
 * a lot better than what we were doing before 7.3.
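 *
 * For instance (illustrative, based on aset.c's power-of-2 rounding of
 * small requests): a 100-byte palloc request actually occupies a 128-byte
 * chunk plus header overhead, and it is that larger figure that must be
 * charged against availMem.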
 *
 *--------------------
 */

/*
 * For sorting single Datums, we build "pseudo tuples" that just carry
 * the datum's value and null flag.  For pass-by-reference data types,
 * the actual data value appears after the DatumTupleHeader (MAXALIGNed,
 * of course), and the value field in the header is just a pointer to it.
 */

typedef struct
{
	Datum		val;
	bool		isNull;
} DatumTuple;


static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess);
static void puttuple_common(Tuplesortstate *state, void *tuple);
static void inittapes(Tuplesortstate *state);
static void selectnewtape(Tuplesortstate *state);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
static void mergepreread(Tuplesortstate *state);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
					  int tupleindex, bool checkIndex);
static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
static void markrunend(Tuplesortstate *state, int tapenum);
static int	qsort_comparetup(const void *a, const void *b);
static int comparetup_heap(Tuplesortstate *state,
				const void *a, const void *b);
static void *copytup_heap(Tuplesortstate *state, void *tup);
static void writetup_heap(Tuplesortstate *state, int tapenum, void *tup);
static void *readtup_heap(Tuplesortstate *state, int tapenum,
			 unsigned int len);
static int comparetup_index(Tuplesortstate *state,
				 const void *a, const void *b);
static void *copytup_index(Tuplesortstate *state, void *tup);
static void writetup_index(Tuplesortstate *state, int tapenum, void *tup);
static void *readtup_index(Tuplesortstate *state, int tapenum,
			  unsigned int len);
static int comparetup_datum(Tuplesortstate *state,
				 const void *a, const void *b);
static void *copytup_datum(Tuplesortstate *state, void *tup);
static void writetup_datum(Tuplesortstate *state, int tapenum, void *tup);
static void *readtup_datum(Tuplesortstate *state, int tapenum,
			  unsigned int len);

/*
 * Since qsort(3) will not pass any context info to qsort_comparetup(),
 * we have to use this ugly static variable.  It is set to point to the
 * active Tuplesortstate object just before calling qsort.  It should
 * not be used directly by anything except qsort_comparetup().
 */
static Tuplesortstate *qsort_tuplesortstate;


/*
 *		tuplesort_begin_xxx
 *
 * Initialize for a tuple sort operation.
 *
 * After calling tuplesort_begin, the caller should call tuplesort_puttuple
 * zero or more times, then call tuplesort_performsort when all the tuples
 * have been supplied.  After performsort, retrieve the tuples in sorted
 * order by calling tuplesort_gettuple until it returns NULL.  (If random
 * access was requested, rescan, markpos, and restorepos can also be called.)
 * For Datum sorts, putdatum/getdatum are used instead of puttuple/gettuple.
 * Call tuplesort_end to terminate the operation and release memory/disk space.
 *
 * Each variant of tuplesort_begin has a workMem parameter specifying the
 * maximum number of kilobytes of RAM to use before spilling data to disk.
 * (The normal value of this parameter is work_mem, but some callers use
 * other values.)  Each variant also has a randomAccess parameter specifying
 * whether the caller needs non-sequential access to the sort result.
 */

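/*
 * A sketch of a typical caller, for illustration only (not part of the
 * original file).  The tuple source example_next_tuple() is hypothetical;
 * tupDesc, sortOp, and attno are assumed to come from the caller's context.
 */
#ifdef NOT_USED
static void
example_sort_usage(TupleDesc tupDesc, Oid sortOp, AttrNumber attno)
{
	Tuplesortstate *sortstate;
	HeapTuple	tup;
	bool		should_free;

	/* sort on one key, no random access needed */
	sortstate = tuplesort_begin_heap(tupDesc, 1, &sortOp, &attno,
									 work_mem, false);
	while ((tup = example_next_tuple()) != NULL)	/* hypothetical source */
		tuplesort_puttuple(sortstate, (void *) tup);
	tuplesort_performsort(sortstate);
	while ((tup = (HeapTuple) tuplesort_gettuple(sortstate, true,
												 &should_free)) != NULL)
	{
		/* ... process tup in sorted order ... */
		if (should_free)
			pfree(tup);
	}
	tuplesort_end(sortstate);
}
#endif
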
static Tuplesortstate *
tuplesort_begin_common(int workMem, bool randomAccess)
{
	Tuplesortstate *state;

	state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));

#ifdef TRACE_SORT
	if (trace_sort)
		pg_rusage_init(&state->ru_start);
#endif

	state->status = TSS_INITIAL;
	state->randomAccess = randomAccess;
	state->allowedMem = workMem * 1024L;
	state->availMem = state->allowedMem;
	state->tapeset = NULL;

	state->memtupcount = 0;
	state->memtupsize = 1024;	/* initial guess */
	state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));

	state->memtupindex = NULL;	/* until and unless needed */

	USEMEM(state, GetMemoryChunkSpace(state->memtuples));

	state->currentRun = 0;

	/* Algorithm D variables will be initialized by inittapes, if needed */

	state->result_tape = -1;	/* flag that result tape has not been formed */

	return state;
}

Tuplesortstate *
tuplesort_begin_heap(TupleDesc tupDesc,
					 int nkeys,
					 Oid *sortOperators, AttrNumber *attNums,
					 int workMem, bool randomAccess)
{
	Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
	int			i;

	AssertArg(nkeys > 0);

#ifdef TRACE_SORT
	if (trace_sort)
		elog(NOTICE,
			 "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
			 nkeys, workMem, randomAccess ? 't' : 'f');
#endif

	state->comparetup = comparetup_heap;
	state->copytup = copytup_heap;
	state->writetup = writetup_heap;
	state->readtup = readtup_heap;

	state->tupDesc = tupDesc;
	state->nKeys = nkeys;
	state->scanKeys = (ScanKey) palloc0(nkeys * sizeof(ScanKeyData));
	state->sortFnKinds = (SortFunctionKind *)
		palloc0(nkeys * sizeof(SortFunctionKind));

	for (i = 0; i < nkeys; i++)
	{
		RegProcedure sortFunction;

		AssertArg(sortOperators[i] != 0);
		AssertArg(attNums[i] != 0);

		/* select a function that implements the sort operator */
		SelectSortFunction(sortOperators[i], &sortFunction,
						   &state->sortFnKinds[i]);

		/*
		 * We needn't fill in sk_strategy or sk_subtype since these scankeys
		 * will never be passed to an index.
		 */
		ScanKeyInit(&state->scanKeys[i],
					attNums[i],
					InvalidStrategy,
					sortFunction,
					(Datum) 0);
	}

	return state;
}

Tuplesortstate *
tuplesort_begin_index(Relation indexRel,
					  bool enforceUnique,
					  int workMem, bool randomAccess)
{
	Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);

#ifdef TRACE_SORT
	if (trace_sort)
		elog(NOTICE,
			 "begin index sort: unique = %c, workMem = %d, randomAccess = %c",
			 enforceUnique ? 't' : 'f',
			 workMem, randomAccess ? 't' : 'f');
#endif

	state->comparetup = comparetup_index;
	state->copytup = copytup_index;
	state->writetup = writetup_index;
	state->readtup = readtup_index;

	state->indexRel = indexRel;
	/* see comments below about btree dependence of this code... */
	state->indexScanKey = _bt_mkscankey_nodata(indexRel);
	state->enforceUnique = enforceUnique;

	return state;
}

Tuplesortstate *
tuplesort_begin_datum(Oid datumType,
					  Oid sortOperator,
					  int workMem, bool randomAccess)
{
	Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
	RegProcedure sortFunction;
	int16		typlen;
	bool		typbyval;

#ifdef TRACE_SORT
	if (trace_sort)
		elog(NOTICE,
			 "begin datum sort: workMem = %d, randomAccess = %c",
			 workMem, randomAccess ? 't' : 'f');
#endif

	state->comparetup = comparetup_datum;
	state->copytup = copytup_datum;
	state->writetup = writetup_datum;
	state->readtup = readtup_datum;

	state->datumType = datumType;
	state->sortOperator = sortOperator;

	/* select a function that implements the sort operator */
	SelectSortFunction(sortOperator, &sortFunction, &state->sortFnKind);
	/* and look up the function */
	fmgr_info(sortFunction, &state->sortOpFn);

	/* lookup necessary attributes of the datum type */
	get_typlenbyval(datumType, &typlen, &typbyval);
	state->datumTypeLen = typlen;
	state->datumTypeByVal = typbyval;

	return state;
}

/*
 * tuplesort_end
 *
 *	Release resources and clean up.
 */
void
tuplesort_end(Tuplesortstate *state)
{
	int			i;
#ifdef TRACE_SORT
	long		spaceUsed;
#endif

	if (state->tapeset)
	{
#ifdef TRACE_SORT
		spaceUsed = LogicalTapeSetBlocks(state->tapeset);
#endif
		LogicalTapeSetClose(state->tapeset);
	}
	else
	{
#ifdef TRACE_SORT
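		/* internal sort: convert net memory consumed to kilobytes, rounding up */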
		spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
#endif
	}

	if (state->memtuples)
	{
		for (i = 0; i < state->memtupcount; i++)
			pfree(state->memtuples[i]);
		pfree(state->memtuples);
	}
	if (state->memtupindex)
		pfree(state->memtupindex);

	/*
	 * this stuff might better belong in a variant-specific shutdown routine
	 */
	if (state->scanKeys)
		pfree(state->scanKeys);
	if (state->sortFnKinds)
		pfree(state->sortFnKinds);

#ifdef TRACE_SORT
	if (trace_sort)
	{
		if (state->tapeset)
			elog(NOTICE, "external sort ended, %ld disk blocks used: %s",
				 spaceUsed, pg_rusage_show(&state->ru_start));
		else
			elog(NOTICE, "internal sort ended, %ld KB used: %s",
				 spaceUsed, pg_rusage_show(&state->ru_start));
	}
#endif

	pfree(state);
}

/*
 * Accept one tuple while collecting input data for sort.
 *
 * Note that the input tuple is always copied; the caller need not save it.
 */
void
tuplesort_puttuple(Tuplesortstate *state, void *tuple)
{
	/*
	 * Copy the given tuple into memory we control, and decrease availMem.
	 * Then call the code shared with the Datum case.
	 */
	tuple = COPYTUP(state, tuple);

	puttuple_common(state, tuple);
}

/*
 * Accept one Datum while collecting input data for sort.
 *
 * If the Datum is pass-by-ref type, the value will be copied.
 */
void
tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
{
	DatumTuple *tuple;

	/*
	 * Build pseudo-tuple carrying the datum, and decrease availMem.
	 */
	if (isNull || state->datumTypeByVal)
	{
		tuple = (DatumTuple *) palloc(sizeof(DatumTuple));
		tuple->val = val;
		tuple->isNull = isNull;
	}
	else
	{
		Size		datalen;
		Size		tuplelen;
		char	   *newVal;

		datalen = datumGetSize(val, false, state->datumTypeLen);
		tuplelen = datalen + MAXALIGN(sizeof(DatumTuple));
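
		/*
		 * Resulting layout (illustrative, assuming MAXALIGN is 8): the
		 * DatumTuple header is padded to a MAXALIGNed size, the datum's
		 * bytes follow it, and tuple->val below is made to point at that
		 * copied data rather than at the caller's memory.
		 */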
		tuple = (DatumTuple *) palloc(tuplelen);
		newVal = ((char *) tuple) + MAXALIGN(sizeof(DatumTuple));
		memcpy(newVal, DatumGetPointer(val), datalen);
		tuple->val = PointerGetDatum(newVal);
		tuple->isNull = false;
	}

	USEMEM(state, GetMemoryChunkSpace(tuple));

	puttuple_common(state, (void *) tuple);
}

/*
 * Shared code for tuple and datum cases.
 */
static void
puttuple_common(Tuplesortstate *state, void *tuple)
{
	switch (state->status)
	{
		case TSS_INITIAL:

			/*
			 * Save the copied tuple into the unsorted array.
			 */
			if (state->memtupcount >= state->memtupsize)
			{
				/* Grow the unsorted array as needed. */
				FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
				state->memtupsize *= 2;
				state->memtuples = (void **)
					repalloc(state->memtuples,
							 state->memtupsize * sizeof(void *));
				USEMEM(state, GetMemoryChunkSpace(state->memtuples));
			}
			state->memtuples[state->memtupcount++] = tuple;

			/*
			 * Done if we still fit in available memory.
			 */
			if (!LACKMEM(state))
				return;

			/*
			 * Nope; time to switch to tape-based operation.
			 */
			inittapes(state);

			/*
			 * Dump tuples until we are back under the limit.
			 */
			dumptuples(state, false);
			break;
		case TSS_BUILDRUNS:

			/*
			 * Insert the copied tuple into the heap, with run number
			 * currentRun if it can go into the current run, else run number
			 * currentRun+1.  The tuple can go into the current run if it is
			 * >= the first not-yet-output tuple.  (Actually, it could go into
			 * the current run if it is >= the most recently output tuple ...
			 * but that would require keeping around the tuple we last output,
			 * and it's simplest to let writetup free each tuple as soon as
			 * it's written.)
			 *
			 * Note there will always be at least one tuple in the heap at this
			 * point; see dumptuples.
			 */
			Assert(state->memtupcount > 0);
			if (COMPARETUP(state, tuple, state->memtuples[0]) >= 0)
				tuplesort_heap_insert(state, tuple, state->currentRun, true);
			else
				tuplesort_heap_insert(state, tuple, state->currentRun + 1, true);

			/*
			 * If we are over the memory limit, dump tuples till we're under.
			 */
			dumptuples(state, false);
			break;
		default:
			elog(ERROR, "invalid tuplesort state");
			break;
	}
}

/*
 * All tuples have been provided; finish the sort.
 */
void
tuplesort_performsort(Tuplesortstate *state)
{
#ifdef TRACE_SORT
	if (trace_sort)
		elog(NOTICE, "performsort starting: %s",
			 pg_rusage_show(&state->ru_start));
#endif

	switch (state->status)
	{
		case TSS_INITIAL:

			/*
			 * We were able to accumulate all the tuples within the allowed
			 * amount of memory.  Just qsort 'em and we're done.
			 */
			if (state->memtupcount > 1)
			{
				qsort_tuplesortstate = state;
				qsort((void *) state->memtuples, state->memtupcount,
					  sizeof(void *), qsort_comparetup);
			}
			state->current = 0;
			state->eof_reached = false;
			state->markpos_offset = 0;
			state->markpos_eof = false;
			state->status = TSS_SORTEDINMEM;
			break;
		case TSS_BUILDRUNS:

			/*
			 * Finish tape-based sort.  First, flush all tuples remaining in
			 * memory out to tape; then merge until we have a single remaining
			 * run (or, if !randomAccess, one run per tape). Note that
			 * mergeruns sets the correct state->status.
			 */
			dumptuples(state, true);
			mergeruns(state);
			state->eof_reached = false;
			state->markpos_block = 0L;
			state->markpos_offset = 0;
			state->markpos_eof = false;
			break;
		default:
			elog(ERROR, "invalid tuplesort state");
			break;
	}

#ifdef TRACE_SORT
	if (trace_sort)
		elog(NOTICE, "performsort done%s: %s",
			 (state->status == TSS_FINALMERGE) ? " (except final merge)" : "",
			 pg_rusage_show(&state->ru_start));
#endif
}

/*
 * Fetch the next tuple in either forward or back direction.
 * Returns NULL if no more tuples.  If should_free is set, the
 * caller must pfree the returned tuple when done with it.
 */
void *
tuplesort_gettuple(Tuplesortstate *state, bool forward,
				   bool *should_free)
{
	unsigned int tuplen;
	void	   *tup;

	switch (state->status)
	{
		case TSS_SORTEDINMEM:
			Assert(forward || state->randomAccess);
			*should_free = false;
			if (forward)
			{
				if (state->current < state->memtupcount)
					return state->memtuples[state->current++];
				state->eof_reached = true;
				return NULL;
			}
			else
			{
				if (state->current <= 0)
					return NULL;

				/*
				 * If all tuples have already been fetched, return the last
				 * tuple; otherwise return the tuple before the last one
				 * returned.
				 */
				if (state->eof_reached)
					state->eof_reached = false;
				else
				{
					state->current--;	/* last returned tuple */
					if (state->current <= 0)
						return NULL;
				}
				return state->memtuples[state->current - 1];
			}
			break;

		case TSS_SORTEDONTAPE:
			Assert(forward || state->randomAccess);
			*should_free = true;
			if (forward)
			{
				if (state->eof_reached)
					return NULL;
				if ((tuplen = getlen(state, state->result_tape, true)) != 0)
				{
					tup = READTUP(state, state->result_tape, tuplen);
					return tup;
				}
				else
				{
					state->eof_reached = true;
					return NULL;
				}
			}

			/*
			 * Backward.
			 *
			 * If all tuples have already been fetched, return the last
			 * tuple; otherwise return the tuple before the last one
			 * returned.
			 */
			if (state->eof_reached)
			{
				/*
				 * Seek position is pointing just past the zero tuplen at the
				 * end of file; back up to fetch last tuple's ending length
				 * word.  If seek fails we must have a completely empty file.
				 */
				if (!LogicalTapeBackspace(state->tapeset,
										  state->result_tape,
										  2 * sizeof(unsigned int)))
					return NULL;
				state->eof_reached = false;
			}
			else
			{
				/*
				 * Back up and fetch previously-returned tuple's ending length
				 * word.  If seek fails, assume we are at start of file.
				 */
				if (!LogicalTapeBackspace(state->tapeset,
										  state->result_tape,
										  sizeof(unsigned int)))
					return NULL;
				tuplen = getlen(state, state->result_tape, false);

				/*
				 * Back up to get ending length word of tuple before it.
				 */
				if (!LogicalTapeBackspace(state->tapeset,
										  state->result_tape,
										  tuplen + 2 * sizeof(unsigned int)))
				{
					/*
					 * If that fails, presumably the prev tuple is the first
					 * in the file.  Back up so that it becomes next to read
					 * in forward direction (not obviously right, but that is
					 * what the in-memory case does).
					 */
					if (!LogicalTapeBackspace(state->tapeset,
											  state->result_tape,
											  tuplen + sizeof(unsigned int)))
						elog(ERROR, "bogus tuple length in backward scan");
					return NULL;
				}
			}

			tuplen = getlen(state, state->result_tape, false);

			/*
			 * Now we have the length of the prior tuple, back up and read it.
			 * Note: READTUP expects we are positioned after the initial
			 * length word of the tuple, so back up to that point.
			 */
			if (!LogicalTapeBackspace(state->tapeset,
									  state->result_tape,
									  tuplen))
				elog(ERROR, "bogus tuple length in backward scan");
			tup = READTUP(state, state->result_tape, tuplen);
			return tup;

		case TSS_FINALMERGE:
			Assert(forward);
			*should_free = true;

			/*
			 * This code should match the inner loop of mergeonerun().
			 */
			if (state->memtupcount > 0)
			{
				int			srcTape = state->memtupindex[0];
				Size		tuplen;
				int			tupIndex;
				void	   *newtup;

				tup = state->memtuples[0];
				/* returned tuple is no longer counted in our memory space */
				tuplen = GetMemoryChunkSpace(tup);
				state->availMem += tuplen;
				state->mergeavailmem[srcTape] += tuplen;
				tuplesort_heap_siftup(state, false);
				if ((tupIndex = state->mergenext[srcTape]) == 0)
				{
					/*
					 * out of preloaded data on this tape, try to read more
					 */
					mergepreread(state);

					/*
					 * if still no data, we've reached end of run on this tape
					 */
					if ((tupIndex = state->mergenext[srcTape]) == 0)
						return tup;
				}
				/* pull next preread tuple from list, insert in heap */
				newtup = state->memtuples[tupIndex];
				state->mergenext[srcTape] = state->memtupindex[tupIndex];
				if (state->mergenext[srcTape] == 0)
					state->mergelast[srcTape] = 0;
				state->memtupindex[tupIndex] = state->mergefreelist;
				state->mergefreelist = tupIndex;
				tuplesort_heap_insert(state, newtup, srcTape, false);
				return tup;
			}
			return NULL;

		default:
			elog(ERROR, "invalid tuplesort state");
			return NULL;		/* keep compiler quiet */
	}
}

/*
 * Fetch the next Datum in either forward or back direction.
 * Returns FALSE if no more datums.
 *
 * If the Datum is pass-by-ref type, the returned value is freshly palloc'd
 * and is now owned by the caller.
 */
bool
tuplesort_getdatum(Tuplesortstate *state, bool forward,
				   Datum *val, bool *isNull)
{
	DatumTuple *tuple;
	bool		should_free;

	tuple = (DatumTuple *) tuplesort_gettuple(state, forward, &should_free);

	if (tuple == NULL)
		return false;

	if (tuple->isNull || state->datumTypeByVal)
	{
		*val = tuple->val;
		*isNull = tuple->isNull;
	}
	else
	{
		*val = datumCopy(tuple->val, false, state->datumTypeLen);
		*isNull = false;
	}

	if (should_free)
		pfree(tuple);

	return true;
}


/*
 * inittapes - initialize for tape sorting.
 *
 * This is called only if we have found we don't have room to sort in memory.
 */
static void
inittapes(Tuplesortstate *state)
{
	int			ntuples,
				j;

#ifdef TRACE_SORT
	if (trace_sort)
		elog(NOTICE, "switching to external sort: %s",
			 pg_rusage_show(&state->ru_start));
#endif

	state->tapeset = LogicalTapeSetCreate(MAXTAPES);

	/*
	 * Allocate the memtupindex array, same size as memtuples.
	 */
	state->memtupindex = (int *) palloc(state->memtupsize * sizeof(int));

	USEMEM(state, GetMemoryChunkSpace(state->memtupindex));

	/*
	 * Convert the unsorted contents of memtuples[] into a heap. Each tuple is
	 * marked as belonging to run number zero.
	 *
	 * NOTE: we pass false for checkIndex since there's no point in comparing
	 * indexes in this step, even though we do intend the indexes to be part
	 * of the sort key...
	 */
	ntuples = state->memtupcount;
	state->memtupcount = 0;		/* make the heap empty */
	for (j = 0; j < ntuples; j++)
		tuplesort_heap_insert(state, state->memtuples[j], 0, false);
	Assert(state->memtupcount == ntuples);

	state->currentRun = 0;

	/*
	 * Initialize variables of Algorithm D (step D1).
	 */
	for (j = 0; j < MAXTAPES; j++)
	{
		state->tp_fib[j] = 1;
		state->tp_runs[j] = 0;
		state->tp_dummy[j] = 1;
		state->tp_tapenum[j] = j;
	}
	state->tp_fib[TAPERANGE] = 0;
	state->tp_dummy[TAPERANGE] = 0;

	state->Level = 1;
	state->destTape = 0;

	state->status = TSS_BUILDRUNS;
}

/*
 * selectnewtape -- select new tape for new initial run.
 *
 * This is called after finishing a run when we know another run
 * must be started.  This implements steps D3, D4 of Algorithm D.
 */
static void
selectnewtape(Tuplesortstate *state)
{
	int			j;
	int			a;

	/* Step D3: advance j (destTape) */
	if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
	{
		state->destTape++;
		return;
	}
	if (state->tp_dummy[state->destTape] != 0)
	{
		state->destTape = 0;
		return;
	}

	/* Step D4: increase level */
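	/*
	 * Illustrative progression of the recurrence below (TAPERANGE = 6):
	 * the ideal distribution A[] totals 6 runs at level 1 (1,1,1,1,1,1),
	 * 11 at level 2 (2,2,2,2,2,1), and 21 at level 3 (4,4,4,4,3,2).
	 */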
1130         state->Level++;
1131         a = state->tp_fib[0];
1132         for (j = 0; j < TAPERANGE; j++)
1133         {
1134                 state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
1135                 state->tp_fib[j] = a + state->tp_fib[j + 1];
1136         }
1137         state->destTape = 0;
1138 }
1139
1140 /*
1141  * mergeruns -- merge all the completed initial runs.
1142  *
1143  * This implements steps D5, D6 of Algorithm D.  All input data has
1144  * already been written to initial runs on tape (see dumptuples).
1145  */
1146 static void
1147 mergeruns(Tuplesortstate *state)
1148 {
1149         int                     tapenum,
1150                                 svTape,
1151                                 svRuns,
1152                                 svDummy;
1153
1154         Assert(state->status == TSS_BUILDRUNS);
1155         Assert(state->memtupcount == 0);
1156
1157         /*
1158          * If we produced only one initial run (quite likely if the total data
1159          * volume is between 1X and 2X workMem), we can just use that tape as the
1160          * finished output, rather than doing a useless merge.
1161          */
1162         if (state->currentRun == 1)
1163         {
1164                 state->result_tape = state->tp_tapenum[state->destTape];
1165                 /* must freeze and rewind the finished output tape */
1166                 LogicalTapeFreeze(state->tapeset, state->result_tape);
1167                 state->status = TSS_SORTEDONTAPE;
1168                 return;
1169         }
1170
1171         /* End of step D2: rewind all output tapes to prepare for merging */
1172         for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
1173                 LogicalTapeRewind(state->tapeset, tapenum, false);
1174
1175         for (;;)
1176         {
1177                 /* Step D5: merge runs onto tape[T] until tape[P] is empty */
1178                 while (state->tp_runs[TAPERANGE - 1] || state->tp_dummy[TAPERANGE - 1])
1179                 {
1180                         bool            allDummy = true;
1181                         bool            allOneRun = true;
1182
1183                         for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
1184                         {
1185                                 if (state->tp_dummy[tapenum] == 0)
1186                                         allDummy = false;
1187                                 if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
1188                                         allOneRun = false;
1189                         }
1190
1191                         /*
1192                          * If we don't have to produce a materialized sorted tape, quit as
1193                          * soon as we're down to one real/dummy run per tape.
1194                          */
1195                         if (!state->randomAccess && allOneRun)
1196                         {
1197                                 Assert(!allDummy);
1198                                 /* Initialize for the final merge pass */
1199                                 beginmerge(state);
1200                                 state->status = TSS_FINALMERGE;
1201                                 return;
1202                         }
1203                         if (allDummy)
1204                         {
1205                                 state->tp_dummy[TAPERANGE]++;
1206                                 for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
1207                                         state->tp_dummy[tapenum]--;
1208                         }
1209                         else
1210                                 mergeonerun(state);
1211                 }
1212                 /* Step D6: decrease level */
1213                 if (--state->Level == 0)
1214                         break;
1215                 /* rewind output tape T to use as new input */
1216                 LogicalTapeRewind(state->tapeset, state->tp_tapenum[TAPERANGE],
1217                                                   false);
1218                 /* rewind used-up input tape P, and prepare it for write pass */
1219                 LogicalTapeRewind(state->tapeset, state->tp_tapenum[TAPERANGE - 1],
1220                                                   true);
1221                 state->tp_runs[TAPERANGE - 1] = 0;
1222
1223                 /*
1224                  * reassign tape units per step D6; note we no longer care about A[]
1225                  */
1226                 svTape = state->tp_tapenum[TAPERANGE];
1227                 svDummy = state->tp_dummy[TAPERANGE];
1228                 svRuns = state->tp_runs[TAPERANGE];
1229                 for (tapenum = TAPERANGE; tapenum > 0; tapenum--)
1230                 {
1231                         state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
1232                         state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
1233                         state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
1234                 }
1235                 state->tp_tapenum[0] = svTape;
1236                 state->tp_dummy[0] = svDummy;
1237                 state->tp_runs[0] = svRuns;
1238         }
1239
1240         /*
1241          * Done.  Knuth says that the result is on TAPE[1], but since we exited
1242          * the loop without performing the last iteration of step D6, we have not
1243          * rearranged the tape unit assignment, and therefore the result is on
1244          * TAPE[T].  We need to do it this way so that we can freeze the final
1245          * output tape while rewinding it.  The last iteration of step D6 would be
1246          * a waste of cycles anyway...
1247          */
1248         state->result_tape = state->tp_tapenum[TAPERANGE];
1249         LogicalTapeFreeze(state->tapeset, state->result_tape);
1250         state->status = TSS_SORTEDONTAPE;
1251 }
1252
1253 /*
1254  * Merge one run from each input tape, except ones with dummy runs.
1255  *
1256  * This is the inner loop of Algorithm D step D5.  We know that the
1257  * output tape is TAPE[T].
1258  */
1259 static void
1260 mergeonerun(Tuplesortstate *state)
1261 {
1262         int                     destTape = state->tp_tapenum[TAPERANGE];
1263         int                     srcTape;
1264         int                     tupIndex;
1265         void       *tup;
1266         long            priorAvail,
1267                                 spaceFreed;
1268
1269         /*
1270          * Start the merge by loading one tuple from each active source tape into
1271          * the heap.  We can also decrease the input run/dummy run counts.
1272          */
1273         beginmerge(state);
1274
1275         /*
1276          * Execute merge by repeatedly extracting lowest tuple in heap, writing it
1277          * out, and replacing it with next tuple from same tape (if there is
1278          * another one).
1279          */
1280         while (state->memtupcount > 0)
1281         {
1282                 CHECK_FOR_INTERRUPTS();
1283                 /* write the tuple to destTape */
1284                 priorAvail = state->availMem;
1285                 srcTape = state->memtupindex[0];
1286                 WRITETUP(state, destTape, state->memtuples[0]);
1287                 /* writetup adjusted total free space, now fix per-tape space */
1288                 spaceFreed = state->availMem - priorAvail;
1289                 state->mergeavailmem[srcTape] += spaceFreed;
1290                 /* compact the heap */
1291                 tuplesort_heap_siftup(state, false);
1292                 if ((tupIndex = state->mergenext[srcTape]) == 0)
1293                 {
1294                         /* out of preloaded data on this tape, try to read more */
1295                         mergepreread(state);
1296                         /* if still no data, we've reached end of run on this tape */
1297                         if ((tupIndex = state->mergenext[srcTape]) == 0)
1298                                 continue;
1299                 }
1300                 /* pull next preread tuple from list, insert in heap */
1301                 tup = state->memtuples[tupIndex];
1302                 state->mergenext[srcTape] = state->memtupindex[tupIndex];
1303                 if (state->mergenext[srcTape] == 0)
1304                         state->mergelast[srcTape] = 0;
1305                 state->memtupindex[tupIndex] = state->mergefreelist;
1306                 state->mergefreelist = tupIndex;
1307                 tuplesort_heap_insert(state, tup, srcTape, false);
1308         }
1309
1310         /*
1311          * When the heap empties, we're done.  Write an end-of-run marker on the
1312          * output tape, and increment its count of real runs.
1313          */
1314         markrunend(state, destTape);
1315         state->tp_runs[TAPERANGE]++;
1316
1317 #ifdef TRACE_SORT
1318         if (trace_sort)
1319                 elog(NOTICE, "finished merge step: %s",
1320                          pg_rusage_show(&state->ru_start));
1321 #endif
1322 }
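
#ifdef NOT_USED
/*
 * Minimal sketch of the replace-top pattern implemented above, using plain
 * int arrays in place of tapes and a linear scan in place of the heap
 * (illustrative only, never compiled; kwayMergeDemo is a hypothetical name,
 * not part of tuplesort).  Emitting the smallest head element and then
 * advancing that same source mirrors how the loop above writes the heap top
 * and reinserts the next tuple from srcTape.
 */
static void
kwayMergeDemo(int *src[], const int srclen[], int nsrc, int *dst)
{
        int        *pos;
        int         i;

        /* next unread index for each source, initially all zero */
        pos = (int *) palloc0(nsrc * sizeof(int));
        for (;;)
        {
                int         best = -1;

                /* find the source whose head element is smallest */
                for (i = 0; i < nsrc; i++)
                {
                        if (pos[i] < srclen[i] &&
                                (best < 0 || src[i][pos[i]] < src[best][pos[best]]))
                                best = i;
                }
                if (best < 0)
                        break;                  /* every source exhausted */
                /* emit it, then advance the same source (the "replace" step) */
                *dst++ = src[best][pos[best]++];
        }
        pfree(pos);
}
#endif   /* NOT_USED */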
1323
1324 /*
1325  * beginmerge - initialize for a merge pass
1326  *
1327  * We decrease the counts of real and dummy runs for each tape, and mark
1328  * which tapes contain active input runs in mergeactive[].  Then, load
1329  * as many tuples as we can from each active input tape, and finally
1330  * fill the merge heap with the first tuple from each active tape.
1331  */
1332 static void
1333 beginmerge(Tuplesortstate *state)
1334 {
1335         int                     activeTapes;
1336         int                     tapenum;
1337         int                     srcTape;
1338
1339         /* Heap should be empty here */
1340         Assert(state->memtupcount == 0);
1341
1342         /* Clear merge-pass state variables */
1343         memset(state->mergeactive, 0, sizeof(state->mergeactive));
1344         memset(state->mergenext, 0, sizeof(state->mergenext));
1345         memset(state->mergelast, 0, sizeof(state->mergelast));
1346         memset(state->mergeavailmem, 0, sizeof(state->mergeavailmem));
1347         state->mergefreelist = 0;       /* nothing in the freelist */
1348         state->mergefirstfree = MAXTAPES;       /* first slot available for preread */
1349
1350         /* Adjust run counts and mark the active tapes */
1351         activeTapes = 0;
1352         for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
1353         {
1354                 if (state->tp_dummy[tapenum] > 0)
1355                         state->tp_dummy[tapenum]--;
1356                 else
1357                 {
1358                         Assert(state->tp_runs[tapenum] > 0);
1359                         state->tp_runs[tapenum]--;
1360                         srcTape = state->tp_tapenum[tapenum];
1361                         state->mergeactive[srcTape] = true;
1362                         activeTapes++;
1363                 }
1364         }
1365
1366         /*
1367          * Initialize space allocation to let each active input tape have an equal
1368          * share of preread space.
1369          */
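        /*
         * Example with illustrative numbers: if availMem is 1MB and four
         * tapes are active, each active tape gets a 256kB preread budget.
         */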
1370         Assert(activeTapes > 0);
1371         state->spacePerTape = state->availMem / activeTapes;
1372         for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
1373         {
1374                 if (state->mergeactive[srcTape])
1375                         state->mergeavailmem[srcTape] = state->spacePerTape;
1376         }
1377
1378         /*
1379          * Preread as many tuples as possible (and at least one) from each active
1380          * tape
1381          */
1382         mergepreread(state);
1383
1384         /* Load the merge heap with the first tuple from each input tape */
1385         for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
1386         {
1387                 int                     tupIndex = state->mergenext[srcTape];
1388                 void       *tup;
1389
1390                 if (tupIndex)
1391                 {
1392                         tup = state->memtuples[tupIndex];
1393                         state->mergenext[srcTape] = state->memtupindex[tupIndex];
1394                         if (state->mergenext[srcTape] == 0)
1395                                 state->mergelast[srcTape] = 0;
1396                         state->memtupindex[tupIndex] = state->mergefreelist;
1397                         state->mergefreelist = tupIndex;
1398                         tuplesort_heap_insert(state, tup, srcTape, false);
1399                 }
1400         }
1401 }
1402
1403 /*
1404  * mergepreread - load tuples from merge input tapes
1405  *
1406  * This routine exists to improve sequentiality of reads during a merge pass,
1407  * as explained in the header comments of this file.  Load tuples from each
1408  * active source tape until the tape's run is exhausted or it has used up
1409  * its fair share of available memory.  In any case, we guarantee that there
1410  * is at least one preread tuple available from each unexhausted input tape.
1411  */
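/*
 * A note on the bookkeeping the loop below relies on: the preread tuples
 * of each tape form a singly linked list whose head and tail slot numbers
 * are mergenext[t] and mergelast[t], with the "next" links threaded
 * through memtupindex[] and 0 meaning end-of-list (preread slots start at
 * MAXTAPES, so slot number 0 is never ambiguous).  Recycled slots are
 * chained onto mergefreelist the same way.
 */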
1412 static void
1413 mergepreread(Tuplesortstate *state)
1414 {
1415         int                     srcTape;
1416         unsigned int tuplen;
1417         void       *tup;
1418         int                     tupIndex;
1419         long            priorAvail,
1420                                 spaceUsed;
1421
1422         for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
1423         {
1424                 if (!state->mergeactive[srcTape])
1425                         continue;
1426
1427                 /*
1428                  * Skip reading from any tape that still has at least half of its
1429                  * target memory filled with tuples (threshold fraction may need
1430                  * adjustment?).  This avoids reading just a few tuples when the
1431                  * incoming runs are not being consumed evenly.
1432                  */
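                /*
                 * E.g. (illustrative budget): at 256kB per tape, a tape whose
                 * buffered tuples still occupy 128kB or more is skipped, so
                 * each actual refill reads a large sequential chunk rather
                 * than a few tuples at a time.
                 */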
1433                 if (state->mergenext[srcTape] != 0 &&
1434                         state->mergeavailmem[srcTape] <= state->spacePerTape / 2)
1435                         continue;
1436
1437                 /*
1438                  * Read tuples from this tape until it has used up its free memory,
1439                  * but ensure that we have at least one.
1440                  */
1441                 priorAvail = state->availMem;
1442                 state->availMem = state->mergeavailmem[srcTape];
1443                 while (!LACKMEM(state) || state->mergenext[srcTape] == 0)
1444                 {
1445                         /* read next tuple, if any */
1446                         if ((tuplen = getlen(state, srcTape, true)) == 0)
1447                         {
1448                                 state->mergeactive[srcTape] = false;
1449                                 break;
1450                         }
1451                         tup = READTUP(state, srcTape, tuplen);
1452                         /* find or make a free slot in memtuples[] for it */
1453                         tupIndex = state->mergefreelist;
1454                         if (tupIndex)
1455                                 state->mergefreelist = state->memtupindex[tupIndex];
1456                         else
1457                         {
1458                                 tupIndex = state->mergefirstfree++;
1459                                 /* Might need to enlarge arrays! */
1460                                 if (tupIndex >= state->memtupsize)
1461                                 {
1462                                         FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1463                                         FREEMEM(state, GetMemoryChunkSpace(state->memtupindex));
1464                                         state->memtupsize *= 2;
1465                                         state->memtuples = (void **)
1466                                                 repalloc(state->memtuples,
1467                                                                  state->memtupsize * sizeof(void *));
1468                                         state->memtupindex = (int *)
1469                                                 repalloc(state->memtupindex,
1470                                                                  state->memtupsize * sizeof(int));
1471                                         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1472                                         USEMEM(state, GetMemoryChunkSpace(state->memtupindex));
1473                                 }
1474                         }
1475                         /* store tuple, append to list for its tape */
1476                         state->memtuples[tupIndex] = tup;
1477                         state->memtupindex[tupIndex] = 0;
1478                         if (state->mergelast[srcTape])
1479                                 state->memtupindex[state->mergelast[srcTape]] = tupIndex;
1480                         else
1481                                 state->mergenext[srcTape] = tupIndex;
1482                         state->mergelast[srcTape] = tupIndex;
1483                 }
1484                 /* update per-tape and global availmem counts */
1485                 spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
1486                 state->mergeavailmem[srcTape] = state->availMem;
1487                 state->availMem = priorAvail - spaceUsed;
1488         }
1489 }
1490
1491 /*
1492  * dumptuples - remove tuples from heap and write to tape
1493  *
1494  * This is used during initial-run building, but not during merging.
1495  *
1496  * When alltuples = false, dump only enough tuples to get under the
1497  * availMem limit (and leave at least one tuple in the heap in any case,
1498  * since puttuple assumes it always has a tuple to compare to).
1499  *
1500  * When alltuples = true, dump everything currently in memory.
1501  * (This case is only used at end of input data.)
1502  *
1503  * If we empty the heap, close out the current run and return (this should
1504  * only happen at end of input data).  If we see that the tuple run number
1505  * at the top of the heap has changed, start a new run.
1506  */
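/*
 * Worked example (illustrative): while state->currentRun is 3, tuples
 * tagged with run number 3 drain from the heap top; the moment the top
 * tuple's tag reads 4, no run-3 tuples can remain (the run number is the
 * major sort key), so we write the end-of-run marker, adjust the counts
 * per step D2, and select a new destination tape.
 */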
1507 static void
1508 dumptuples(Tuplesortstate *state, bool alltuples)
1509 {
1510         while (alltuples ||
1511                    (LACKMEM(state) && state->memtupcount > 1))
1512         {
1513                 /*
1514                  * Dump the heap's frontmost entry, and sift up to remove it from the
1515                  * heap.
1516                  */
1517                 Assert(state->memtupcount > 0);
1518                 WRITETUP(state, state->tp_tapenum[state->destTape],
1519                                  state->memtuples[0]);
1520                 tuplesort_heap_siftup(state, true);
1521
1522                 /*
1523                  * If the heap is empty *or* top run number has changed, we've
1524                  * finished the current run.
1525                  */
1526                 if (state->memtupcount == 0 ||
1527                         state->currentRun != state->memtupindex[0])
1528                 {
1529                         markrunend(state, state->tp_tapenum[state->destTape]);
1530                         state->currentRun++;
1531                         state->tp_runs[state->destTape]++;
1532                         state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
1533
1534 #ifdef TRACE_SORT
1535                         if (trace_sort)
1536                                 elog(NOTICE, "finished writing%s run %d: %s",
1537                                          (state->memtupcount == 0) ? " final" : "",
1538                                          state->currentRun,
1539                                          pg_rusage_show(&state->ru_start));
1540 #endif
1541
1542                         /*
1543                          * Done if heap is empty, else prepare for new run.
1544                          */
1545                         if (state->memtupcount == 0)
1546                                 break;
1547                         Assert(state->currentRun == state->memtupindex[0]);
1548                         selectnewtape(state);
1549                 }
1550         }
1551 }
1552
1553 /*
1554  * tuplesort_rescan             - rewind and replay the scan
1555  */
1556 void
1557 tuplesort_rescan(Tuplesortstate *state)
1558 {
1559         Assert(state->randomAccess);
1560
1561         switch (state->status)
1562         {
1563                 case TSS_SORTEDINMEM:
1564                         state->current = 0;
1565                         state->eof_reached = false;
1566                         state->markpos_offset = 0;
1567                         state->markpos_eof = false;
1568                         break;
1569                 case TSS_SORTEDONTAPE:
1570                         LogicalTapeRewind(state->tapeset,
1571                                                           state->result_tape,
1572                                                           false);
1573                         state->eof_reached = false;
1574                         state->markpos_block = 0L;
1575                         state->markpos_offset = 0;
1576                         state->markpos_eof = false;
1577                         break;
1578                 default:
1579                         elog(ERROR, "invalid tuplesort state");
1580                         break;
1581         }
1582 }
1583
1584 /*
1585  * tuplesort_markpos    - saves current position in the merged sort file
1586  */
1587 void
1588 tuplesort_markpos(Tuplesortstate *state)
1589 {
1590         Assert(state->randomAccess);
1591
1592         switch (state->status)
1593         {
1594                 case TSS_SORTEDINMEM:
1595                         state->markpos_offset = state->current;
1596                         state->markpos_eof = state->eof_reached;
1597                         break;
1598                 case TSS_SORTEDONTAPE:
1599                         LogicalTapeTell(state->tapeset,
1600                                                         state->result_tape,
1601                                                         &state->markpos_block,
1602                                                         &state->markpos_offset);
1603                         state->markpos_eof = state->eof_reached;
1604                         break;
1605                 default:
1606                         elog(ERROR, "invalid tuplesort state");
1607                         break;
1608         }
1609 }
1610
1611 /*
1612  * tuplesort_restorepos - restores current position in merged sort file to
1613  *                                                last saved position
1614  */
1615 void
1616 tuplesort_restorepos(Tuplesortstate *state)
1617 {
1618         Assert(state->randomAccess);
1619
1620         switch (state->status)
1621         {
1622                 case TSS_SORTEDINMEM:
1623                         state->current = state->markpos_offset;
1624                         state->eof_reached = state->markpos_eof;
1625                         break;
1626                 case TSS_SORTEDONTAPE:
1627                         if (!LogicalTapeSeek(state->tapeset,
1628                                                                  state->result_tape,
1629                                                                  state->markpos_block,
1630                                                                  state->markpos_offset))
1631                                 elog(ERROR, "tuplesort_restorepos failed");
1632                         state->eof_reached = state->markpos_eof;
1633                         break;
1634                 default:
1635                         elog(ERROR, "invalid tuplesort state");
1636                         break;
1637         }
1638 }
1639
1640
1641 /*
1642  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
1643  *
1644  * The heap lives in state->memtuples[], with parallel data storage
1645  * for indexes in state->memtupindex[].  If checkIndex is true, use
1646  * the tuple index as the front of the sort key; otherwise, no.
1647  */
1648
1649 #define HEAPCOMPARE(tup1,index1,tup2,index2) \
1650         (checkIndex && (index1 != index2) ? (index1) - (index2) : \
1651          COMPARETUP(state, tup1, tup2))
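
/*
 * Example of the two-level ordering above (illustrative): with checkIndex
 * true, a tuple tagged for run 2 sorts before any tuple tagged for run 3
 * no matter what the keys are, since the index difference is returned
 * before COMPARETUP is ever consulted; within a single run the comparison
 * falls through to the normal key ordering.
 */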
1652
1653 /*
1654  * Insert a new tuple into an empty or existing heap, maintaining the
1655  * heap invariant.
1656  */
1657 static void
1658 tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
1659                                           int tupleindex, bool checkIndex)
1660 {
1661         void      **memtuples;
1662         int                *memtupindex;
1663         int                     j;
1664
1665         /*
1666          * Make sure memtuples[] can handle another entry.
1667          */
1668         if (state->memtupcount >= state->memtupsize)
1669         {
1670                 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1671                 FREEMEM(state, GetMemoryChunkSpace(state->memtupindex));
1672                 state->memtupsize *= 2;
1673                 state->memtuples = (void **)
1674                         repalloc(state->memtuples,
1675                                          state->memtupsize * sizeof(void *));
1676                 state->memtupindex = (int *)
1677                         repalloc(state->memtupindex,
1678                                          state->memtupsize * sizeof(int));
1679                 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1680                 USEMEM(state, GetMemoryChunkSpace(state->memtupindex));
1681         }
1682         memtuples = state->memtuples;
1683         memtupindex = state->memtupindex;
1684
1685         /*
1686          * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
1687          * using 1-based array indexes, not 0-based.
1688          */
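        /*
         * Worked example of the 0-based arithmetic below: parent(j) is
         * (j - 1) / 2, so slots 5 and 6 both have slot 2 as parent, and the
         * new entry bubbles up along that parent chain until the heap
         * condition holds.
         */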
1689         j = state->memtupcount++;
1690         while (j > 0)
1691         {
1692                 int                     i = (j - 1) >> 1;
1693
1694                 if (HEAPCOMPARE(tuple, tupleindex,
1695                                                 memtuples[i], memtupindex[i]) >= 0)
1696                         break;
1697                 memtuples[j] = memtuples[i];
1698                 memtupindex[j] = memtupindex[i];
1699                 j = i;
1700         }
1701         memtuples[j] = tuple;
1702         memtupindex[j] = tupleindex;
1703 }
1704
1705 /*
1706  * The tuple at state->memtuples[0] has been removed from the heap.
1707  * Decrement memtupcount, and sift up to maintain the heap invariant.
1708  */
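/*
 * Despite the name (which follows Knuth), this is the classic delete-min
 * "sift-down": the last entry is logically dropped into the vacated root
 * and the hole at i descends toward whichever child (2*i + 1 or 2*i + 2)
 * compares smaller, until the displaced entry fits.
 */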
1709 static void
1710 tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
1711 {
1712         void      **memtuples = state->memtuples;
1713         int                *memtupindex = state->memtupindex;
1714         void       *tuple;
1715         int                     tupindex,
1716                                 i,
1717                                 n;
1718
1719         if (--state->memtupcount <= 0)
1720                 return;
1721         n = state->memtupcount;
1722         tuple = memtuples[n];           /* tuple that must be reinserted */
1723         tupindex = memtupindex[n];
1724         i = 0;                                          /* i is where the "hole" is */
1725         for (;;)
1726         {
1727                 int                     j = 2 * i + 1;
1728
1729                 if (j >= n)
1730                         break;
1731                 if (j + 1 < n &&
1732                         HEAPCOMPARE(memtuples[j], memtupindex[j],
1733                                                 memtuples[j + 1], memtupindex[j + 1]) > 0)
1734                         j++;
1735                 if (HEAPCOMPARE(tuple, tupindex,
1736                                                 memtuples[j], memtupindex[j]) <= 0)
1737                         break;
1738                 memtuples[i] = memtuples[j];
1739                 memtupindex[i] = memtupindex[j];
1740                 i = j;
1741         }
1742         memtuples[i] = tuple;
1743         memtupindex[i] = tupindex;
1744 }
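
#ifdef NOT_USED
/*
 * Self-contained sketch of the two primitives above on bare ints, with no
 * run-number tagging (illustrative only, never compiled; heapDemoInsert
 * and heapDemoSiftup are hypothetical names).  heapDemoInsert mirrors the
 * bubble-up loop of tuplesort_heap_insert; heapDemoSiftup mirrors the
 * hole-descent loop of tuplesort_heap_siftup.
 */
static void
heapDemoInsert(int *heap, int *count, int val)
{
        int         j = (*count)++;

        /* bubble the new value up its parent chain, parent(j) = (j-1)/2 */
        while (j > 0)
        {
                int         i = (j - 1) >> 1;

                if (val >= heap[i])
                        break;
                heap[j] = heap[i];
                j = i;
        }
        heap[j] = val;
}

static void
heapDemoSiftup(int *heap, int *count)
{
        int         n,
                    i = 0,
                    val;

        if (--(*count) <= 0)
                return;
        n = *count;
        val = heap[n];                          /* entry that must be reinserted */
        /* let the hole at i descend toward the smaller child */
        for (;;)
        {
                int         j = 2 * i + 1;

                if (j >= n)
                        break;
                if (j + 1 < n && heap[j + 1] < heap[j])
                        j++;
                if (val <= heap[j])
                        break;
                heap[i] = heap[j];
                i = j;
        }
        heap[i] = val;
}
#endif   /* NOT_USED */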
1745
1746
1747 /*
1748  * Tape interface routines
1749  */
1750
1751 static unsigned int
1752 getlen(Tuplesortstate *state, int tapenum, bool eofOK)
1753 {
1754         unsigned int len;
1755
1756         if (LogicalTapeRead(state->tapeset, tapenum, (void *) &len,
1757                                                 sizeof(len)) != sizeof(len))
1758                 elog(ERROR, "unexpected end of tape");
1759         if (len == 0 && !eofOK)
1760                 elog(ERROR, "unexpected end of data");
1761         return len;
1762 }
1763
1764 static void
1765 markrunend(Tuplesortstate *state, int tapenum)
1766 {
1767         unsigned int len = 0;
1768
1769         LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
1770 }
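
/*
 * Shared on-tape record framing used by the WRITETUP/READTUP routines
 * later in this file: each tuple is written as
 *
 *              unsigned int length word (count includes the word itself)
 *              tuple body
 *              trailing copy of the length word (only if randomAccess)
 *
 * and a zero length word, as written by markrunend() above, terminates a
 * run; getlen() treats that zero as EOF when eofOK.  The trailing copy is
 * what permits reading the tape backwards for random access.
 */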
1771
1772
1773 /*
1774  * qsort interface
1775  */
1776
1777 static int
1778 qsort_comparetup(const void *a, const void *b)
1779 {
1780         /* The passed pointers are pointers to void * ... */
1781
1782         return COMPARETUP(qsort_tuplesortstate, *(void **) a, *(void **) b);
1783 }
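
/*
 * qsort() provides no way to pass caller context into its comparator, so
 * the Tuplesortstate needed by COMPARETUP has to travel through the static
 * qsort_tuplesortstate variable, set just before qsort() is invoked.  The
 * usual cost of this trick is that in-memory sorts are not reentrant.
 */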
1784
1785
1786 /*
1787  * This routine selects an appropriate sorting function to implement
1788  * a sort operator as efficiently as possible.  The straightforward
1789  * method is to use the operator's implementation proc --- ie, "<"
1790  * comparison.  However, that way often requires two calls of the function
1791  * per comparison.  If we can find a btree three-way comparator function
1792  * associated with the operator, we can use it to do the comparisons
1793  * more efficiently.  We also support the possibility that the operator
1794  * is ">" (descending sort), in which case we have to reverse the output
1795  * of the btree comparator.
1796  *
1797  * Possibly this should live somewhere else (backend/catalog/, maybe?).
1798  */
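/*
 * Concrete illustration using standard catalog entries: for the int4 "<"
 * operator, the pg_amop search below finds a BTLessStrategyNumber entry in
 * the int4 btree opclass, so *sortFunction is set to the three-way
 * comparator btint4cmp and *kind to SORTFUNC_CMP --- one function call per
 * comparison instead of up to two calls of "<".
 */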
1799 void
1800 SelectSortFunction(Oid sortOperator,
1801                                    RegProcedure *sortFunction,
1802                                    SortFunctionKind *kind)
1803 {
1804         CatCList   *catlist;
1805         int                     i;
1806         HeapTuple       tuple;
1807         Form_pg_operator optup;
1808         Oid                     opclass = InvalidOid;
1809
1810         /*
1811          * Search pg_amop to see if the target operator is registered as the "<"
1812          * or ">" operator of any btree opclass.  It's possible that it might be
1813          * registered both ways (eg, if someone were to build a "reverse sort"
1814          * opclass for some reason); prefer the "<" case if so. If the operator is
1815          * registered the same way in multiple opclasses, assume we can use the
1816          * associated comparator function from any one.
1817          */
1818         catlist = SearchSysCacheList(AMOPOPID, 1,
1819                                                                  ObjectIdGetDatum(sortOperator),
1820                                                                  0, 0, 0);
1821
1822         for (i = 0; i < catlist->n_members; i++)
1823         {
1824                 Form_pg_amop aform;
1825
1826                 tuple = &catlist->members[i]->tuple;
1827                 aform = (Form_pg_amop) GETSTRUCT(tuple);
1828
1829                 if (!opclass_is_btree(aform->amopclaid))
1830                         continue;
1831                 /* must be of default subtype, too */
1832                 if (aform->amopsubtype != InvalidOid)
1833                         continue;
1834
1835                 if (aform->amopstrategy == BTLessStrategyNumber)
1836                 {
1837                         opclass = aform->amopclaid;
1838                         *kind = SORTFUNC_CMP;
1839                         break;                          /* done looking */
1840                 }
1841                 else if (aform->amopstrategy == BTGreaterStrategyNumber)
1842                 {
1843                         opclass = aform->amopclaid;
1844                         *kind = SORTFUNC_REVCMP;
1845                         /* keep scanning in hopes of finding a BTLess entry */
1846                 }
1847         }
1848
1849         ReleaseSysCacheList(catlist);
1850
1851         if (OidIsValid(opclass))
1852         {
1853                 /* Found a suitable opclass, get its default comparator function */
1854                 *sortFunction = get_opclass_proc(opclass, InvalidOid, BTORDER_PROC);
1855                 Assert(RegProcedureIsValid(*sortFunction));
1856                 return;
1857         }
1858
1859         /*
1860          * Can't find a comparator, so use the operator as-is.  Decide whether it
1861          * is forward or reverse sort by looking at its name (grotty, but this
1862          * only matters for deciding which end NULLs should get sorted to).  XXX
1863          * possibly better idea: see whether its selectivity function is
1864          * scalargtsel?
1865          */
1866         tuple = SearchSysCache(OPEROID,
1867                                                    ObjectIdGetDatum(sortOperator),
1868                                                    0, 0, 0);
1869         if (!HeapTupleIsValid(tuple))
1870                 elog(ERROR, "cache lookup failed for operator %u", sortOperator);
1871         optup = (Form_pg_operator) GETSTRUCT(tuple);
1872         if (strcmp(NameStr(optup->oprname), ">") == 0)
1873                 *kind = SORTFUNC_REVLT;
1874         else
1875                 *kind = SORTFUNC_LT;
1876         *sortFunction = optup->oprcode;
1877         ReleaseSysCache(tuple);
1878
1879         Assert(RegProcedureIsValid(*sortFunction));
1880 }
1881
1882 /*
1883  * Inline-able copy of FunctionCall2() to save some cycles in sorting.
1884  */
1885 static inline Datum
1886 myFunctionCall2(FmgrInfo *flinfo, Datum arg1, Datum arg2)
1887 {
1888         FunctionCallInfoData fcinfo;
1889         Datum           result;
1890
1891         InitFunctionCallInfoData(fcinfo, flinfo, 2, NULL, NULL);
1892
1893         fcinfo.arg[0] = arg1;
1894         fcinfo.arg[1] = arg2;
1895         fcinfo.argnull[0] = false;
1896         fcinfo.argnull[1] = false;
1897
1898         result = FunctionCallInvoke(&fcinfo);
1899
1900         /* Check for null result, since caller is clearly not expecting one */
1901         if (fcinfo.isnull)
1902                 elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
1903
1904         return result;
1905 }
1906
1907 /*
1908  * Apply a sort function (by now converted to fmgr lookup form)
1909  * and return a 3-way comparison result.  This takes care of handling
1910  * NULLs and sort ordering direction properly.
1911  */
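/*
 * Summary of the cases below: the forward kinds (SORTFUNC_LT and
 * SORTFUNC_CMP) sort NULLs after all non-NULL values, while the reverse
 * kinds (SORTFUNC_REVLT and SORTFUNC_REVCMP) sort NULLs first.  REVLT
 * reverses only the NULL placement --- the supplied ">" operator already
 * yields the descending order when used in place of "<" --- whereas
 * REVCMP must negate the comparator's result.
 */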
1912 static inline int32
1913 inlineApplySortFunction(FmgrInfo *sortFunction, SortFunctionKind kind,
1914                                                 Datum datum1, bool isNull1,
1915                                                 Datum datum2, bool isNull2)
1916 {
1917         switch (kind)
1918         {
1919                 case SORTFUNC_LT:
1920                         if (isNull1)
1921                         {
1922                                 if (isNull2)
1923                                         return 0;
1924                                 return 1;               /* NULL sorts after non-NULL */
1925                         }
1926                         if (isNull2)
1927                                 return -1;
1928                         if (DatumGetBool(myFunctionCall2(sortFunction, datum1, datum2)))
1929                                 return -1;              /* a < b */
1930                         if (DatumGetBool(myFunctionCall2(sortFunction, datum2, datum1)))
1931                                 return 1;               /* a > b */
1932                         return 0;
1933
1934                 case SORTFUNC_REVLT:
1935                         /* We reverse the ordering of NULLs, but not the operator */
1936                         if (isNull1)
1937                         {
1938                                 if (isNull2)
1939                                         return 0;
1940                                 return -1;              /* NULL sorts before non-NULL */
1941                         }
1942                         if (isNull2)
1943                                 return 1;
1944                         if (DatumGetBool(myFunctionCall2(sortFunction, datum1, datum2)))
1945                                 return -1;              /* a < b */
1946                         if (DatumGetBool(myFunctionCall2(sortFunction, datum2, datum1)))
1947                                 return 1;               /* a > b */
1948                         return 0;
1949
1950                 case SORTFUNC_CMP:
1951                         if (isNull1)
1952                         {
1953                                 if (isNull2)
1954                                         return 0;
1955                                 return 1;               /* NULL sorts after non-NULL */
1956                         }
1957                         if (isNull2)
1958                                 return -1;
1959                         return DatumGetInt32(myFunctionCall2(sortFunction,
1960                                                                                                  datum1, datum2));
1961
1962                 case SORTFUNC_REVCMP:
1963                         if (isNull1)
1964                         {
1965                                 if (isNull2)
1966                                         return 0;
1967                                 return -1;              /* NULL sorts before non-NULL */
1968                         }
1969                         if (isNull2)
1970                                 return 1;
1971                         return -DatumGetInt32(myFunctionCall2(sortFunction,
1972                                                                                                   datum1, datum2));
1973
1974                 default:
1975                         elog(ERROR, "unrecognized SortFunctionKind: %d", (int) kind);
1976                         return 0;                       /* can't get here, but keep compiler quiet */
1977         }
1978 }
1979
1980 /*
1981  * Non-inline ApplySortFunction() --- this is needed only to conform to
1982  * C99's brain-dead notions about how to implement inline functions...
1983  */
1984 int32
1985 ApplySortFunction(FmgrInfo *sortFunction, SortFunctionKind kind,
1986                                   Datum datum1, bool isNull1,
1987                                   Datum datum2, bool isNull2)
1988 {
1989         return inlineApplySortFunction(sortFunction, kind,
1990                                                                    datum1, isNull1,
1991                                                                    datum2, isNull2);
1992 }
1993
1994
1995 /*
1996  * Routines specialized for HeapTuple case
1997  */
1998
1999 static int
2000 comparetup_heap(Tuplesortstate *state, const void *a, const void *b)
2001 {
2002         HeapTuple       ltup = (HeapTuple) a;
2003         HeapTuple       rtup = (HeapTuple) b;
2004         TupleDesc       tupDesc = state->tupDesc;
2005         int                     nkey;
2006
2007         for (nkey = 0; nkey < state->nKeys; nkey++)
2008         {
2009                 ScanKey         scanKey = state->scanKeys + nkey;
2010                 AttrNumber      attno = scanKey->sk_attno;
2011                 Datum           datum1,
2012                                         datum2;
2013                 bool            isnull1,
2014                                         isnull2;
2015                 int32           compare;
2016
2017                 datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
2018                 datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
2019
2020                 compare = inlineApplySortFunction(&scanKey->sk_func,
2021                                                                                   state->sortFnKinds[nkey],
2022                                                                                   datum1, isnull1,
2023                                                                                   datum2, isnull2);
2024                 if (compare != 0)
2025                         return compare;
2026         }
2027
2028         return 0;
2029 }
2030
2031 static void *
2032 copytup_heap(Tuplesortstate *state, void *tup)
2033 {
2034         HeapTuple       tuple = (HeapTuple) tup;
2035
2036         tuple = heap_copytuple(tuple);
2037         USEMEM(state, GetMemoryChunkSpace(tuple));
2038         return (void *) tuple;
2039 }
2040
2041 /*
2042  * We don't bother to write the HeapTupleData part of the tuple.
2043  */
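/*
 * That is, only the t_len bytes at tuple->t_data go to tape; the
 * fixed-size HeapTupleData header is cheap to rebuild, and readtup_heap()
 * below does so by pointing t_data just past a freshly palloc'd
 * HeapTupleData and setting t_self invalid.
 */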
2044
2045 static void
2046 writetup_heap(Tuplesortstate *state, int tapenum, void *tup)
2047 {
2048         HeapTuple       tuple = (HeapTuple) tup;
2049         unsigned int tuplen;
2050
2051         tuplen = tuple->t_len + sizeof(tuplen);
2052         LogicalTapeWrite(state->tapeset, tapenum,
2053                                          (void *) &tuplen, sizeof(tuplen));
2054         LogicalTapeWrite(state->tapeset, tapenum,
2055                                          (void *) tuple->t_data, tuple->t_len);
2056         if (state->randomAccess)        /* need trailing length word? */
2057                 LogicalTapeWrite(state->tapeset, tapenum,
2058                                                  (void *) &tuplen, sizeof(tuplen));
2059
2060         FREEMEM(state, GetMemoryChunkSpace(tuple));
2061         heap_freetuple(tuple);
2062 }
2063
2064 static void *
2065 readtup_heap(Tuplesortstate *state, int tapenum, unsigned int len)
2066 {
2067         unsigned int tuplen = len - sizeof(unsigned int) + HEAPTUPLESIZE;
2068         HeapTuple       tuple = (HeapTuple) palloc(tuplen);
2069
2070         USEMEM(state, GetMemoryChunkSpace(tuple));
2071         /* reconstruct the HeapTupleData portion */
2072         tuple->t_len = len - sizeof(unsigned int);
2073         ItemPointerSetInvalid(&(tuple->t_self));
2074         tuple->t_datamcxt = CurrentMemoryContext;
2075         tuple->t_data = (HeapTupleHeader) (((char *) tuple) + HEAPTUPLESIZE);
2076         /* read in the tuple proper */
2077         if (LogicalTapeRead(state->tapeset, tapenum, (void *) tuple->t_data,
2078                                                 tuple->t_len) != tuple->t_len)
2079                 elog(ERROR, "unexpected end of data");
2080         if (state->randomAccess)        /* need trailing length word? */
2081                 if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
2082                                                         sizeof(tuplen)) != sizeof(tuplen))
2083                         elog(ERROR, "unexpected end of data");
2084         return (void *) tuple;
2085 }
2086
2087
2088 /*
2089  * Routines specialized for IndexTuple case
2090  *
2091  * NOTE: actually, these are specialized for the btree case; it's not
2092  * clear whether you could use them for a non-btree index.  Possibly
2093  * you'd need to make another set of routines if you needed to sort
2094  * according to another kind of index.
2095  */
2096
2097 static int
2098 comparetup_index(Tuplesortstate *state, const void *a, const void *b)
2099 {
2100         /*
2101          * This is almost the same as _bt_tuplecompare(), but we need to keep
2102          * track of whether any null fields are present.  Also see the special
2103          * treatment for equal keys at the end.
2104          */
2105         IndexTuple      tuple1 = (IndexTuple) a;
2106         IndexTuple      tuple2 = (IndexTuple) b;
2107         Relation        rel = state->indexRel;
2108         int                     keysz = RelationGetNumberOfAttributes(rel);
2109         ScanKey         scankey = state->indexScanKey;
2110         TupleDesc       tupDes;
2111         int                     i;
2112         bool            equal_hasnull = false;
2113
2114         tupDes = RelationGetDescr(rel);
2115
2116         for (i = 1; i <= keysz; i++)
2117         {
2118                 ScanKey         entry = &scankey[i - 1];
2119                 Datum           datum1,
2120                                         datum2;
2121                 bool            isnull1,
2122                                         isnull2;
2123                 int32           compare;
2124
2125                 datum1 = index_getattr(tuple1, i, tupDes, &isnull1);
2126                 datum2 = index_getattr(tuple2, i, tupDes, &isnull2);
2127
2128                 /* see comments about NULLs handling in btbuild */
2129
2130                 /* the comparison function is always of CMP type */
2131                 compare = inlineApplySortFunction(&entry->sk_func, SORTFUNC_CMP,
2132                                                                                   datum1, isnull1,
2133                                                                                   datum2, isnull2);
2134
2135                 if (compare != 0)
2136                         return (int) compare;           /* done when we find unequal
2137                                                                                  * attributes */
2138
2139                 /* they are equal, so we only need to examine one null flag */
2140                 if (isnull1)
2141                         equal_hasnull = true;
2142         }
2143
2144         /*
2145          * If btree has asked us to enforce uniqueness, complain if two equal
2146          * tuples are detected (unless there was at least one NULL field).
2147          *
2148          * It is sufficient to make the test here, because if two tuples are equal
2149          * they *must* get compared at some stage of the sort --- otherwise the
2150          * sort algorithm wouldn't have checked whether one must appear before the
2151          * other.
2152          *
2153          * Some rather brain-dead implementations of qsort will sometimes call the
2154          * comparison routine to compare a value to itself.  (At this writing only
2155          * QNX 4 is known to do such silly things.)  Don't raise a bogus error in
2156          * that case.
2157          */
2158         if (state->enforceUnique && !equal_hasnull && tuple1 != tuple2)
2159                 ereport(ERROR,
2160                                 (errcode(ERRCODE_UNIQUE_VIOLATION),
2161                                  errmsg("could not create unique index"),
2162                                  errdetail("Table contains duplicated values.")));
2163
2164         /*
2165          * If key values are equal, we sort on ItemPointer.  This does not affect
2166          * validity of the finished index, but it offers cheap insurance against
2167          * performance problems with bad qsort implementations that have trouble
2168          * with large numbers of equal keys.
2169          */
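        /*
         * Illustrative TIDs: with equal keys, (block 10, offset 3) sorts
         * before (block 10, offset 7), and any block-9 entry precedes both,
         * so duplicates still get a deterministic total order.
         */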
2170         {
2171                 BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
2172                 BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
2173
2174                 if (blk1 != blk2)
2175                         return (blk1 < blk2) ? -1 : 1;
2176         }
2177         {
2178                 OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
2179                 OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
2180
2181                 if (pos1 != pos2)
2182                         return (pos1 < pos2) ? -1 : 1;
2183         }
2184
2185         return 0;
2186 }
2187
2188 static void *
2189 copytup_index(Tuplesortstate *state, void *tup)
2190 {
2191         IndexTuple      tuple = (IndexTuple) tup;
2192         unsigned int tuplen = IndexTupleSize(tuple);
2193         IndexTuple      newtuple;
2194
2195         newtuple = (IndexTuple) palloc(tuplen);
2196         USEMEM(state, GetMemoryChunkSpace(newtuple));
2197
2198         memcpy(newtuple, tuple, tuplen);
2199
2200         return (void *) newtuple;
2201 }
2202
2203 static void
2204 writetup_index(Tuplesortstate *state, int tapenum, void *tup)
2205 {
2206         IndexTuple      tuple = (IndexTuple) tup;
2207         unsigned int tuplen;
2208
2209         tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
2210         LogicalTapeWrite(state->tapeset, tapenum,
2211                                          (void *) &tuplen, sizeof(tuplen));
2212         LogicalTapeWrite(state->tapeset, tapenum,
2213                                          (void *) tuple, IndexTupleSize(tuple));
2214         if (state->randomAccess)        /* need trailing length word? */
2215                 LogicalTapeWrite(state->tapeset, tapenum,
2216                                                  (void *) &tuplen, sizeof(tuplen));
2217
2218         FREEMEM(state, GetMemoryChunkSpace(tuple));
2219         pfree(tuple);
2220 }
2221
2222 static void *
2223 readtup_index(Tuplesortstate *state, int tapenum, unsigned int len)
2224 {
2225         unsigned int tuplen = len - sizeof(unsigned int);
2226         IndexTuple      tuple = (IndexTuple) palloc(tuplen);
2227
2228         USEMEM(state, GetMemoryChunkSpace(tuple));
2229         if (LogicalTapeRead(state->tapeset, tapenum, (void *) tuple,
2230                                                 tuplen) != tuplen)
2231                 elog(ERROR, "unexpected end of data");
2232         if (state->randomAccess)        /* need trailing length word? */
2233                 if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
2234                                                         sizeof(tuplen)) != sizeof(tuplen))
2235                         elog(ERROR, "unexpected end of data");
2236         return (void *) tuple;
2237 }
2238
2239
2240 /*
2241  * Routines specialized for DatumTuple case
2242  */
2243
2244 static int
2245 comparetup_datum(Tuplesortstate *state, const void *a, const void *b)
2246 {
2247         DatumTuple *ltup = (DatumTuple *) a;
2248         DatumTuple *rtup = (DatumTuple *) b;
2249
2250         return inlineApplySortFunction(&state->sortOpFn, state->sortFnKind,
2251                                                                    ltup->val, ltup->isNull,
2252                                                                    rtup->val, rtup->isNull);
2253 }
2254
2255 static void *
2256 copytup_datum(Tuplesortstate *state, void *tup)
2257 {
2258         /* Not currently needed */
2259         elog(ERROR, "copytup_datum() should not be called");
2260         return NULL;
2261 }
2262
2263 static void
2264 writetup_datum(Tuplesortstate *state, int tapenum, void *tup)
2265 {
2266         DatumTuple *tuple = (DatumTuple *) tup;
2267         unsigned int tuplen;
2268         unsigned int writtenlen;
2269
2270         if (tuple->isNull || state->datumTypeByVal)
2271                 tuplen = sizeof(DatumTuple);
2272         else
2273         {
2274                 Size            datalen;
2275
2276                 datalen = datumGetSize(tuple->val, false, state->datumTypeLen);
2277                 tuplen = datalen + MAXALIGN(sizeof(DatumTuple));
2278         }
2279
2280         writtenlen = tuplen + sizeof(unsigned int);
2281
2282         LogicalTapeWrite(state->tapeset, tapenum,
2283                                          (void *) &writtenlen, sizeof(writtenlen));
2284         LogicalTapeWrite(state->tapeset, tapenum,
2285                                          (void *) tuple, tuplen);
2286         if (state->randomAccess)        /* need trailing length word? */
2287                 LogicalTapeWrite(state->tapeset, tapenum,
2288                                                  (void *) &writtenlen, sizeof(writtenlen));
2289
2290         FREEMEM(state, GetMemoryChunkSpace(tuple));
2291         pfree(tuple);
2292 }
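
/*
 * Serialized DatumTuple layout, as produced above and consumed by
 * readtup_datum() below: a NULL or pass-by-value datum is just the bare
 * DatumTuple struct, while a pass-by-reference datum's bytes follow at
 * offset MAXALIGN(sizeof(DatumTuple)); after reading, val is repointed at
 * that in-line payload, since the pointer that was written is meaningless
 * once read back.
 */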
2293
2294 static void *
2295 readtup_datum(Tuplesortstate *state, int tapenum, unsigned int len)
2296 {
2297         unsigned int tuplen = len - sizeof(unsigned int);
2298         DatumTuple *tuple = (DatumTuple *) palloc(tuplen);
2299
2300         USEMEM(state, GetMemoryChunkSpace(tuple));
2301         if (LogicalTapeRead(state->tapeset, tapenum, (void *) tuple,
2302                                                 tuplen) != tuplen)
2303                 elog(ERROR, "unexpected end of data");
2304         if (state->randomAccess)        /* need trailing length word? */
2305                 if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
2306                                                         sizeof(tuplen)) != sizeof(tuplen))
2307                         elog(ERROR, "unexpected end of data");
2308
2309         if (!tuple->isNull && !state->datumTypeByVal)
2310                 tuple->val = PointerGetDatum(((char *) tuple) +
2311                                                                          MAXALIGN(sizeof(DatumTuple)));
2312         return (void *) tuple;
2313 }