]> granicus.if.org Git - postgresql/blob - src/backend/utils/sort/tuplesort.c
Defend against brain-dead QNX implementation of qsort().
[postgresql] / src / backend / utils / sort / tuplesort.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuplesort.c
4  *        Generalized tuple sorting routines.
5  *
6  * This module handles sorting of heap tuples, index tuples, or single
7  * Datums (and could easily support other kinds of sortable objects,
8  * if necessary).  It works efficiently for both small and large amounts
9  * of data.  Small amounts are sorted in-memory using qsort().  Large
10  * amounts are sorted using temporary files and a standard external sort
11  * algorithm.
12  *
13  * See Knuth, volume 3, for more than you want to know about the external
14  * sorting algorithm.  We divide the input into sorted runs using replacement
15  * selection, in the form of a priority tree implemented as a heap
16  * (essentially his Algorithm 5.2.3H), then merge the runs using polyphase
17  * merge, Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D
18  * are implemented by logtape.c, which avoids space wastage by recycling
19  * disk space as soon as each block is read from its "tape".
20  *
21  * We do not form the initial runs using Knuth's recommended replacement
22  * selection data structure (Algorithm 5.4.1R), because it uses a fixed
23  * number of records in memory at all times.  Since we are dealing with
24  * tuples that may vary considerably in size, we want to be able to vary
25  * the number of records kept in memory to ensure full utilization of the
26  * allowed sort memory space.  So, we keep the tuples in a variable-size
27  * heap, with the next record to go out at the top of the heap.  Like
28  * Algorithm 5.4.1R, each record is stored with the run number that it
29  * must go into, and we use (run number, key) as the ordering key for the
30  * heap.  When the run number at the top of the heap changes, we know that
31  * no more records of the prior run are left in the heap.
32  *
33  * The (approximate) amount of memory allowed for any one sort operation
34  * is given in kilobytes by the external variable SortMem.      Initially,
35  * we absorb tuples and simply store them in an unsorted array as long as
36  * we haven't exceeded SortMem.  If we reach the end of the input without
37  * exceeding SortMem, we sort the array using qsort() and subsequently return
38  * tuples just by scanning the tuple array sequentially.  If we do exceed
39  * SortMem, we construct a heap using Algorithm H and begin to emit tuples
40  * into sorted runs in temporary tapes, emitting just enough tuples at each
41  * step to get back within the SortMem limit.  Whenever the run number at
42  * the top of the heap changes, we begin a new run with a new output tape
43  * (selected per Algorithm D).  After the end of the input is reached,
44  * we dump out remaining tuples in memory into a final run (or two),
45  * then merge the runs using Algorithm D.
46  *
47  * When merging runs, we use a heap containing just the frontmost tuple from
48  * each source run; we repeatedly output the smallest tuple and insert the
49  * next tuple from its source tape (if any).  When the heap empties, the merge
50  * is complete.  The basic merge algorithm thus needs very little memory ---
51  * only M tuples for an M-way merge, and M is at most six in the present code.
52  * However, we can still make good use of our full SortMem allocation by
53  * pre-reading additional tuples from each source tape.  Without prereading,
54  * our access pattern to the temporary file would be very erratic; on average
55  * we'd read one block from each of M source tapes during the same time that
56  * we're writing M blocks to the output tape, so there is no sequentiality of
57  * access at all, defeating the read-ahead methods used by most Unix kernels.
58  * Worse, the output tape gets written into a very random sequence of blocks
59  * of the temp file, ensuring that things will be even worse when it comes
60  * time to read that tape.      A straightforward merge pass thus ends up doing a
61  * lot of waiting for disk seeks.  We can improve matters by prereading from
62  * each source tape sequentially, loading about SortMem/M bytes from each tape
63  * in turn.  Then we run the merge algorithm, writing but not reading until
64  * one of the preloaded tuple series runs out.  Then we switch back to preread
65  * mode, fill memory again, and repeat.  This approach helps to localize both
66  * read and write accesses.
67  *
68  * When the caller requests random access to the sort result, we form
69  * the final sorted run on a logical tape which is then "frozen", so
70  * that we can access it randomly.      When the caller does not need random
71  * access, we return from tuplesort_performsort() as soon as we are down
72  * to one run per logical tape.  The final merge is then performed
73  * on-the-fly as the caller repeatedly calls tuplesort_gettuple; this
74  * saves one cycle of writing all the data out to disk and reading it in.
75  *
76  *
77  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
78  * Portions Copyright (c) 1994, Regents of the University of California
79  *
80  * IDENTIFICATION
81  *        $Header: /cvsroot/pgsql/src/backend/utils/sort/tuplesort.c,v 1.21 2001/11/11 22:00:25 tgl Exp $
82  *
83  *-------------------------------------------------------------------------
84  */
85
86 #include "postgres.h"
87
88 #include "access/heapam.h"
89 #include "access/nbtree.h"
90 #include "catalog/catname.h"
91 #include "catalog/pg_amop.h"
92 #include "catalog/pg_amproc.h"
93 #include "catalog/pg_operator.h"
94 #include "miscadmin.h"
95 #include "utils/fmgroids.h"
96 #include "utils/logtape.h"
97 #include "utils/lsyscache.h"
98 #include "utils/syscache.h"
99 #include "utils/tuplesort.h"
100
/*
 * Lifecycle states of a Tuplesort object.  Only the states that persist
 * between calls of the Tuplesort routines are listed here.
 */
typedef enum
{
	/* Loading tuples; still within memory limit */
	TSS_INITIAL,

	/* Loading tuples; writing to tape */
	TSS_BUILDRUNS,

	/* Sort completed entirely in memory */
	TSS_SORTEDINMEM,

	/* Sort completed, final run is on tape */
	TSS_SORTEDONTAPE,

	/* Performing final merge on-the-fly */
	TSS_FINALMERGE
} TupSortStatus;
114
/*
 * The merge is a seven-tape polyphase merge; per Knuth's figure 70
 * (section 5.4.2) that is the "sweet spot" on the tapes-to-passes curve.
 */
#define MAXTAPES	7				/* Knuth's T: total number of tapes */
#define TAPERANGE	(MAXTAPES - 1)	/* Knuth's P: one less than T */
121
122 /*
123  * Private state of a Tuplesort operation.
124  */
125 struct Tuplesortstate
126 {
127         TupSortStatus status;           /* enumerated value as shown above */
128         bool            randomAccess;   /* did caller request random access? */
129         long            availMem;               /* remaining memory available, in bytes */
130         LogicalTapeSet *tapeset;        /* logtape.c object for tapes in a temp
131                                                                  * file */
132
133         /*
134          * These function pointers decouple the routines that must know what
135          * kind of tuple we are sorting from the routines that don't need to
136          * know it. They are set up by the tuplesort_begin_xxx routines.
137          *
138          * Function to compare two tuples; result is per qsort() convention, ie:
139          *
140          * <0, 0, >0 according as a<b, a=b, a>b.
141          */
142         int                     (*comparetup) (Tuplesortstate *state, const void *a, const void *b);
143
144         /*
145          * Function to copy a supplied input tuple into palloc'd space. (NB:
146          * we assume that a single pfree() is enough to release the tuple
147          * later, so the representation must be "flat" in one palloc chunk.)
148          * state->availMem must be decreased by the amount of space used.
149          */
150         void       *(*copytup) (Tuplesortstate *state, void *tup);
151
152         /*
153          * Function to write a stored tuple onto tape.  The representation of
154          * the tuple on tape need not be the same as it is in memory;
155          * requirements on the tape representation are given below.  After
156          * writing the tuple, pfree() it, and increase state->availMem by the
157          * amount of memory space thereby released.
158          */
159         void            (*writetup) (Tuplesortstate *state, int tapenum, void *tup);
160
161         /*
162          * Function to read a stored tuple from tape back into memory. 'len'
163          * is the already-read length of the stored tuple.      Create and return
164          * a palloc'd copy, and decrease state->availMem by the amount of
165          * memory space consumed.
166          */
167         void       *(*readtup) (Tuplesortstate *state, int tapenum, unsigned int len);
168
169         /*
170          * Obtain memory space occupied by a stored tuple.      (This routine is
171          * only needed in the FINALMERGE case, since copytup, writetup, and
172          * readtup are expected to adjust availMem appropriately.)
173          */
174         unsigned int (*tuplesize) (Tuplesortstate *state, void *tup);
175
176         /*
177          * This array holds pointers to tuples in sort memory.  If we are in
178          * state INITIAL, the tuples are in no particular order; if we are in
179          * state SORTEDINMEM, the tuples are in final sorted order; in states
180          * BUILDRUNS and FINALMERGE, the tuples are organized in "heap" order
181          * per Algorithm H.  (Note that memtupcount only counts the tuples
182          * that are part of the heap --- during merge passes, memtuples[]
183          * entries beyond TAPERANGE are never in the heap and are used to hold
184          * pre-read tuples.)  In state SORTEDONTAPE, the array is not used.
185          */
186         void      **memtuples;          /* array of pointers to palloc'd tuples */
187         int                     memtupcount;    /* number of tuples currently present */
188         int                     memtupsize;             /* allocated length of memtuples array */
189
190         /*
191          * While building initial runs, this array holds the run number for
192          * each tuple in memtuples[].  During merge passes, we re-use it to
193          * hold the input tape number that each tuple in the heap was read
194          * from, or to hold the index of the next tuple pre-read from the same
195          * tape in the case of pre-read entries.  This array is never
196          * allocated unless we need to use tapes.  Whenever it is allocated,
197          * it has the same length as memtuples[].
198          */
199         int                *memtupindex;        /* index value associated with
200                                                                  * memtuples[i] */
201
202         /*
203          * While building initial runs, this is the current output run number
204          * (starting at 0).  Afterwards, it is the number of initial runs we
205          * made.
206          */
207         int                     currentRun;
208
209         /*
210          * These variables are only used during merge passes.  mergeactive[i]
211          * is true if we are reading an input run from (actual) tape number i
212          * and have not yet exhausted that run.  mergenext[i] is the memtuples
213          * index of the next pre-read tuple (next to be loaded into the heap)
214          * for tape i, or 0 if we are out of pre-read tuples.  mergelast[i]
215          * similarly points to the last pre-read tuple from each tape.
216          * mergeavailmem[i] is the amount of unused space allocated for tape
217          * i. mergefreelist and mergefirstfree keep track of unused locations
218          * in the memtuples[] array.  memtupindex[] links together pre-read
219          * tuples for each tape as well as recycled locations in
220          * mergefreelist. It is OK to use 0 as a null link in these lists,
221          * because memtuples[0] is part of the merge heap and is never a
222          * pre-read tuple.
223          */
224         bool            mergeactive[MAXTAPES];  /* Active input run source? */
225         int                     mergenext[MAXTAPES];    /* first preread tuple for each
226                                                                                  * source */
227         int                     mergelast[MAXTAPES];    /* last preread tuple for each
228                                                                                  * source */
229         long            mergeavailmem[MAXTAPES];                /* availMem for prereading
230                                                                                                  * tapes */
231         long            spacePerTape;   /* actual per-tape target usage */
232         int                     mergefreelist;  /* head of freelist of recycled slots */
233         int                     mergefirstfree; /* first slot never used in this merge */
234
235         /*
236          * Variables for Algorithm D.  Note that destTape is a "logical" tape
237          * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
238          * "logical" and "actual" tape numbers straight!
239          */
240         int                     Level;                  /* Knuth's l */
241         int                     destTape;               /* current output tape (Knuth's j, less 1) */
242         int                     tp_fib[MAXTAPES];               /* Target Fibonacci run counts
243                                                                                  * (A[]) */
244         int                     tp_runs[MAXTAPES];              /* # of real runs on each tape */
245         int                     tp_dummy[MAXTAPES];             /* # of dummy runs for each tape
246                                                                                  * (D[]) */
247         int                     tp_tapenum[MAXTAPES];   /* Actual tape numbers (TAPE[]) */
248
249         /*
250          * These variables are used after completion of sorting to keep track
251          * of the next tuple to return.  (In the tape case, the tape's current
252          * read position is also critical state.)
253          */
254         int                     result_tape;    /* actual tape number of finished output */
255         int                     current;                /* array index (only used if SORTEDINMEM) */
256         bool            eof_reached;    /* reached EOF (needed for cursors) */
257
258         /* markpos_xxx holds marked position for mark and restore */
259         long            markpos_block;  /* tape block# (only used if SORTEDONTAPE) */
260         int                     markpos_offset; /* saved "current", or offset in tape
261                                                                  * block */
262         bool            markpos_eof;    /* saved "eof_reached" */
263
264         /*
265          * These variables are specific to the HeapTuple case; they are set by
266          * tuplesort_begin_heap and used only by the HeapTuple routines.
267          */
268         TupleDesc       tupDesc;
269         int                     nKeys;
270         ScanKey         scanKeys;
271         SortFunctionKind *sortFnKinds;
272
273         /*
274          * These variables are specific to the IndexTuple case; they are set
275          * by tuplesort_begin_index and used only by the IndexTuple routines.
276          */
277         Relation        indexRel;
278         ScanKey         indexScanKey;
279         bool            enforceUnique;  /* complain if we find duplicate tuples */
280
281         /*
282          * These variables are specific to the Datum case; they are set by
283          * tuplesort_begin_datum and used only by the DatumTuple routines.
284          */
285         Oid                     datumType;
286         Oid                     sortOperator;
287         FmgrInfo        sortOpFn;               /* cached lookup data for sortOperator */
288         SortFunctionKind sortFnKind;
289         /* we need typelen and byval in order to know how to copy the Datums. */
290         int                     datumTypeLen;
291         bool            datumTypeByVal;
292 };
293
/*
 * Shorthand for invoking the per-tuple-kind method pointers stored in a
 * Tuplesortstate.  Note that each of these evaluates 'state' twice.
 */
#define COMPARETUP(state,a,b)	((state)->comparetup((state), (a), (b)))
#define COPYTUP(state,tup)	((state)->copytup((state), (tup)))
#define WRITETUP(state,tape,tup)	((state)->writetup((state), (tape), (tup)))
#define READTUP(state,tape,len) ((state)->readtup((state), (tape), (len)))
#define TUPLESIZE(state,tup)	((state)->tuplesize((state), (tup)))

/*
 * Memory accounting against state->availMem: LACKMEM is true once the
 * budget has gone negative, USEMEM charges 'amt' bytes, and FREEMEM gives
 * 'amt' bytes back.
 */
#define LACKMEM(state)		(0 > (state)->availMem)
#define USEMEM(state,amt)	((state)->availMem -= (amt))
#define FREEMEM(state,amt)	((state)->availMem += (amt))
302
303 /*--------------------
304  *
305  * NOTES about on-tape representation of tuples:
306  *
307  * We require the first "unsigned int" of a stored tuple to be the total size
308  * on-tape of the tuple, including itself (so it is never zero; an all-zero
309  * unsigned int is used to delimit runs).  The remainder of the stored tuple
310  * may or may not match the in-memory representation of the tuple ---
311  * any conversion needed is the job of the writetup and readtup routines.
312  *
313  * If state->randomAccess is true, then the stored representation of the
314  * tuple must be followed by another "unsigned int" that is a copy of the
315  * length --- so the total tape space used is actually sizeof(unsigned int)
316  * more than the stored length value.  This allows read-backwards.      When
317  * randomAccess is not true, the write/read routines may omit the extra
318  * length word.
319  *
320  * writetup is expected to write both length words as well as the tuple
321  * data.  When readtup is called, the tape is positioned just after the
322  * front length word; readtup must read the tuple data and advance past
323  * the back length word (if present).
324  *
325  * The write/read routines can make use of the tuple description data
326  * stored in the Tuplesortstate record, if needed.      They are also expected
327  * to adjust state->availMem by the amount of memory space (not tape space!)
328  * released or consumed.  There is no error return from either writetup
329  * or readtup; they should elog() on failure.
330  *
331  *
332  * NOTES about memory consumption calculations:
333  *
334  * We count space requested for tuples against the SortMem limit.
335  * Fixed-size space (primarily the LogicalTapeSet I/O buffers) is not
336  * counted, nor do we count the variable-size memtuples and memtupindex
337  * arrays.      (Even though those could grow pretty large, they should be
338  * small compared to the tuples proper, so this is not unreasonable.)
339  *
340  * The major deficiency in this approach is that it ignores palloc overhead.
341  * The memory space actually allocated for a palloc chunk is always more
342  * than the request size, and could be considerably more (as much as 2X
343  * larger, in the current aset.c implementation).  So the space used could
344  * be considerably more than SortMem says.
345  *
346  * One way to fix this is to add a memory management function that, given
347  * a pointer to a palloc'd chunk, returns the actual space consumed by the
348  * chunk.  This would be very easy in the current aset.c module, but I'm
349  * hesitant to do it because it might be unpleasant to support in future
350  * implementations of memory management.  (For example, a direct
351  * implementation of palloc as malloc could not support such a function
352  * portably.)
353  *
354  * A cruder answer is just to apply a fudge factor, say by initializing
355  * availMem to only three-quarters of what SortMem indicates.  This is
356  * probably the right answer if anyone complains that SortMem is not being
357  * obeyed very faithfully.
358  *
359  *--------------------
360  */
361
362 /*
363  * For sorting single Datums, we build "pseudo tuples" that just carry
364  * the datum's value and null flag.  For pass-by-reference data types,
365  * the actual data value appears after the DatumTupleHeader (MAXALIGNed,
366  * of course), and the value field in the header is just a pointer to it.
367  */
368
369 typedef struct
370 {
371         Datum           val;
372         bool            isNull;
373 } DatumTuple;
374
375
376 static Tuplesortstate *tuplesort_begin_common(bool randomAccess);
377 static void puttuple_common(Tuplesortstate *state, void *tuple);
378 static void inittapes(Tuplesortstate *state);
379 static void selectnewtape(Tuplesortstate *state);
380 static void mergeruns(Tuplesortstate *state);
381 static void mergeonerun(Tuplesortstate *state);
382 static void beginmerge(Tuplesortstate *state);
383 static void mergepreread(Tuplesortstate *state);
384 static void dumptuples(Tuplesortstate *state, bool alltuples);
385 static void tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
386                                           int tupleindex, bool checkIndex);
387 static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
388 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
389 static void markrunend(Tuplesortstate *state, int tapenum);
390 static int      qsort_comparetup(const void *a, const void *b);
391 static int comparetup_heap(Tuplesortstate *state,
392                                 const void *a, const void *b);
393 static void *copytup_heap(Tuplesortstate *state, void *tup);
394 static void writetup_heap(Tuplesortstate *state, int tapenum, void *tup);
395 static void *readtup_heap(Tuplesortstate *state, int tapenum,
396                          unsigned int len);
397 static unsigned int tuplesize_heap(Tuplesortstate *state, void *tup);
398 static int comparetup_index(Tuplesortstate *state,
399                                  const void *a, const void *b);
400 static void *copytup_index(Tuplesortstate *state, void *tup);
401 static void writetup_index(Tuplesortstate *state, int tapenum, void *tup);
402 static void *readtup_index(Tuplesortstate *state, int tapenum,
403                           unsigned int len);
404 static unsigned int tuplesize_index(Tuplesortstate *state, void *tup);
405 static int comparetup_datum(Tuplesortstate *state,
406                                  const void *a, const void *b);
407 static void *copytup_datum(Tuplesortstate *state, void *tup);
408 static void writetup_datum(Tuplesortstate *state, int tapenum, void *tup);
409 static void *readtup_datum(Tuplesortstate *state, int tapenum,
410                           unsigned int len);
411 static unsigned int tuplesize_datum(Tuplesortstate *state, void *tup);
412
413 /*
414  * Since qsort(3) will not pass any context info to qsort_comparetup(),
415  * we have to use this ugly static variable.  It is set to point to the
416  * active Tuplesortstate object just before calling qsort.      It should
417  * not be used directly by anything except qsort_comparetup().
418  */
419 static Tuplesortstate *qsort_tuplesortstate;
420
421
422 /*
423  *              tuplesort_begin_xxx
424  *
425  * Initialize for a tuple sort operation.
426  *
427  * After calling tuplesort_begin, the caller should call tuplesort_puttuple
428  * zero or more times, then call tuplesort_performsort when all the tuples
429  * have been supplied.  After performsort, retrieve the tuples in sorted
430  * order by calling tuplesort_gettuple until it returns NULL.  (If random
431  * access was requested, rescan, markpos, and restorepos can also be called.)
432  * For Datum sorts, putdatum/getdatum are used instead of puttuple/gettuple.
433  * Call tuplesort_end to terminate the operation and release memory/disk space.
434  */
435
436 static Tuplesortstate *
437 tuplesort_begin_common(bool randomAccess)
438 {
439         Tuplesortstate *state;
440
441         state = (Tuplesortstate *) palloc(sizeof(Tuplesortstate));
442
443         MemSet((char *) state, 0, sizeof(Tuplesortstate));
444
445         state->status = TSS_INITIAL;
446         state->randomAccess = randomAccess;
447         state->availMem = SortMem * 1024L;
448         state->tapeset = NULL;
449
450         state->memtupcount = 0;
451         state->memtupsize = 1024;       /* initial guess */
452         state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
453
454         state->memtupindex = NULL;      /* until and unless needed */
455
456         state->currentRun = 0;
457
458         /* Algorithm D variables will be initialized by inittapes, if needed */
459
460         state->result_tape = -1;        /* flag that result tape has not been
461                                                                  * formed */
462
463         return state;
464 }
465
466 Tuplesortstate *
467 tuplesort_begin_heap(TupleDesc tupDesc,
468                                          int nkeys,
469                                          Oid *sortOperators, AttrNumber *attNums,
470                                          bool randomAccess)
471 {
472         Tuplesortstate *state = tuplesort_begin_common(randomAccess);
473         int                     i;
474
475         AssertArg(nkeys > 0);
476
477         state->comparetup = comparetup_heap;
478         state->copytup = copytup_heap;
479         state->writetup = writetup_heap;
480         state->readtup = readtup_heap;
481         state->tuplesize = tuplesize_heap;
482
483         state->tupDesc = tupDesc;
484         state->nKeys = nkeys;
485         state->scanKeys = (ScanKey) palloc(nkeys * sizeof(ScanKeyData));
486         MemSet(state->scanKeys, 0, nkeys * sizeof(ScanKeyData));
487         state->sortFnKinds = (SortFunctionKind *)
488                 palloc(nkeys * sizeof(SortFunctionKind));
489         MemSet(state->sortFnKinds, 0, nkeys * sizeof(SortFunctionKind));
490
491         for (i = 0; i < nkeys; i++)
492         {
493                 RegProcedure sortFunction;
494
495                 AssertArg(sortOperators[i] != 0);
496                 AssertArg(attNums[i] != 0);
497
498                 /* select a function that implements the sort operator */
499                 SelectSortFunction(sortOperators[i], &sortFunction,
500                                                    &state->sortFnKinds[i]);
501
502                 ScanKeyEntryInitialize(&state->scanKeys[i],
503                                                            0x0,
504                                                            attNums[i],
505                                                            sortFunction,
506                                                            (Datum) 0);
507         }
508
509         return state;
510 }
511
512 Tuplesortstate *
513 tuplesort_begin_index(Relation indexRel,
514                                           bool enforceUnique,
515                                           bool randomAccess)
516 {
517         Tuplesortstate *state = tuplesort_begin_common(randomAccess);
518
519         state->comparetup = comparetup_index;
520         state->copytup = copytup_index;
521         state->writetup = writetup_index;
522         state->readtup = readtup_index;
523         state->tuplesize = tuplesize_index;
524
525         state->indexRel = indexRel;
526         /* see comments below about btree dependence of this code... */
527         state->indexScanKey = _bt_mkscankey_nodata(indexRel);
528         state->enforceUnique = enforceUnique;
529
530         return state;
531 }
532
533 Tuplesortstate *
534 tuplesort_begin_datum(Oid datumType,
535                                           Oid sortOperator,
536                                           bool randomAccess)
537 {
538         Tuplesortstate *state = tuplesort_begin_common(randomAccess);
539         RegProcedure sortFunction;
540         int16           typlen;
541         bool            typbyval;
542
543         state->comparetup = comparetup_datum;
544         state->copytup = copytup_datum;
545         state->writetup = writetup_datum;
546         state->readtup = readtup_datum;
547         state->tuplesize = tuplesize_datum;
548
549         state->datumType = datumType;
550         state->sortOperator = sortOperator;
551
552         /* select a function that implements the sort operator */
553         SelectSortFunction(sortOperator, &sortFunction, &state->sortFnKind);
554         /* and look up the function */
555         fmgr_info(sortFunction, &state->sortOpFn);
556
557         /* lookup necessary attributes of the datum type */
558         get_typlenbyval(datumType, &typlen, &typbyval);
559         state->datumTypeLen = typlen;
560         state->datumTypeByVal = typbyval;
561
562         return state;
563 }
564
565 /*
566  * tuplesort_end
567  *
568  *      Release resources and clean up.
569  */
570 void
571 tuplesort_end(Tuplesortstate *state)
572 {
573         int                     i;
574
575         if (state->tapeset)
576                 LogicalTapeSetClose(state->tapeset);
577         if (state->memtuples)
578         {
579                 for (i = 0; i < state->memtupcount; i++)
580                         pfree(state->memtuples[i]);
581                 pfree(state->memtuples);
582         }
583         if (state->memtupindex)
584                 pfree(state->memtupindex);
585
586         /*
587          * this stuff might better belong in a variant-specific shutdown
588          * routine
589          */
590         if (state->scanKeys)
591                 pfree(state->scanKeys);
592         if (state->sortFnKinds)
593                 pfree(state->sortFnKinds);
594
595         pfree(state);
596 }
597
598 /*
599  * Accept one tuple while collecting input data for sort.
600  *
601  * Note that the input tuple is always copied; the caller need not save it.
602  */
603 void
604 tuplesort_puttuple(Tuplesortstate *state, void *tuple)
605 {
606         /*
607          * Copy the given tuple into memory we control, and decrease availMem.
608          * Then call the code shared with the Datum case.
609          */
610         tuple = COPYTUP(state, tuple);
611
612         puttuple_common(state, tuple);
613 }
614
615 /*
616  * Accept one Datum while collecting input data for sort.
617  *
618  * If the Datum is pass-by-ref type, the value will be copied.
619  */
620 void
621 tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
622 {
623         DatumTuple *tuple;
624
625         /*
626          * Build pseudo-tuple carrying the datum, and decrease availMem.
627          */
628         if (isNull || state->datumTypeByVal)
629         {
630                 USEMEM(state, sizeof(DatumTuple));
631                 tuple = (DatumTuple *) palloc(sizeof(DatumTuple));
632                 tuple->val = val;
633                 tuple->isNull = isNull;
634         }
635         else
636         {
637                 int                     datalen = state->datumTypeLen;
638                 int                     tuplelen;
639                 char       *newVal;
640
641                 if (datalen == -1)              /* variable length type? */
642                         datalen = VARSIZE((struct varlena *) DatumGetPointer(val));
643                 tuplelen = datalen + MAXALIGN(sizeof(DatumTuple));
644                 USEMEM(state, tuplelen);
645                 newVal = (char *) palloc(tuplelen);
646                 tuple = (DatumTuple *) newVal;
647                 newVal += MAXALIGN(sizeof(DatumTuple));
648                 memcpy(newVal, DatumGetPointer(val), datalen);
649                 tuple->val = PointerGetDatum(newVal);
650                 tuple->isNull = false;
651         }
652
653         puttuple_common(state, (void *) tuple);
654 }
655
656 /*
657  * Shared code for tuple and datum cases.
658  */
659 static void
660 puttuple_common(Tuplesortstate *state, void *tuple)
661 {
662         switch (state->status)
663         {
664                 case TSS_INITIAL:
665
666                         /*
667                          * Save the copied tuple into the unsorted array.
668                          */
669                         if (state->memtupcount >= state->memtupsize)
670                         {
671                                 /* Grow the unsorted array as needed. */
672                                 state->memtupsize *= 2;
673                                 state->memtuples = (void **)
674                                         repalloc(state->memtuples,
675                                                          state->memtupsize * sizeof(void *));
676                         }
677                         state->memtuples[state->memtupcount++] = tuple;
678
679                         /*
680                          * Done if we still fit in available memory.
681                          */
682                         if (!LACKMEM(state))
683                                 return;
684
685                         /*
686                          * Nope; time to switch to tape-based operation.
687                          */
688                         inittapes(state);
689
690                         /*
691                          * Dump tuples until we are back under the limit.
692                          */
693                         dumptuples(state, false);
694                         break;
695                 case TSS_BUILDRUNS:
696
697                         /*
698                          * Insert the copied tuple into the heap, with run number
699                          * currentRun if it can go into the current run, else run
700                          * number currentRun+1.  The tuple can go into the current run
701                          * if it is >= the first not-yet-output tuple.  (Actually, it
702                          * could go into the current run if it is >= the most recently
703                          * output tuple ... but that would require keeping around the
704                          * tuple we last output, and it's simplest to let writetup
705                          * free each tuple as soon as it's written.)
706                          *
707                          * Note there will always be at least one tuple in the heap at
708                          * this point; see dumptuples.
709                          */
710                         Assert(state->memtupcount > 0);
711                         if (COMPARETUP(state, tuple, state->memtuples[0]) >= 0)
712                                 tuplesort_heap_insert(state, tuple, state->currentRun, true);
713                         else
714                                 tuplesort_heap_insert(state, tuple, state->currentRun + 1, true);
715
716                         /*
717                          * If we are over the memory limit, dump tuples till we're
718                          * under.
719                          */
720                         dumptuples(state, false);
721                         break;
722                 default:
723                         elog(ERROR, "tuplesort_puttuple: invalid state");
724                         break;
725         }
726 }
727
728 /*
729  * All tuples have been provided; finish the sort.
730  */
731 void
732 tuplesort_performsort(Tuplesortstate *state)
733 {
734         switch (state->status)
735         {
736                 case TSS_INITIAL:
737
738                         /*
739                          * We were able to accumulate all the tuples within the
740                          * allowed amount of memory.  Just qsort 'em and we're done.
741                          */
742                         if (state->memtupcount > 1)
743                         {
744                                 qsort_tuplesortstate = state;
745                                 qsort((void *) state->memtuples, state->memtupcount,
746                                           sizeof(void *), qsort_comparetup);
747                         }
748                         state->current = 0;
749                         state->eof_reached = false;
750                         state->markpos_offset = 0;
751                         state->markpos_eof = false;
752                         state->status = TSS_SORTEDINMEM;
753                         break;
754                 case TSS_BUILDRUNS:
755
756                         /*
757                          * Finish tape-based sort.      First, flush all tuples remaining
758                          * in memory out to tape; then merge until we have a single
759                          * remaining run (or, if !randomAccess, one run per tape).
760                          * Note that mergeruns sets the correct state->status.
761                          */
762                         dumptuples(state, true);
763                         mergeruns(state);
764                         state->eof_reached = false;
765                         state->markpos_block = 0L;
766                         state->markpos_offset = 0;
767                         state->markpos_eof = false;
768                         break;
769                 default:
770                         elog(ERROR, "tuplesort_performsort: invalid state");
771                         break;
772         }
773 }
774
775 /*
776  * Fetch the next tuple in either forward or back direction.
777  * Returns NULL if no more tuples.      If should_free is set, the
778  * caller must pfree the returned tuple when done with it.
779  */
780 void *
781 tuplesort_gettuple(Tuplesortstate *state, bool forward,
782                                    bool *should_free)
783 {
784         unsigned int tuplen;
785         void       *tup;
786
787         switch (state->status)
788         {
789                 case TSS_SORTEDINMEM:
790                         Assert(forward || state->randomAccess);
791                         *should_free = false;
792                         if (forward)
793                         {
794                                 if (state->current < state->memtupcount)
795                                         return state->memtuples[state->current++];
796                                 state->eof_reached = true;
797                                 return NULL;
798                         }
799                         else
800                         {
801                                 if (state->current <= 0)
802                                         return NULL;
803
804                                 /*
805                                  * if all tuples are fetched already then we return last
806                                  * tuple, else - tuple before last returned.
807                                  */
808                                 if (state->eof_reached)
809                                         state->eof_reached = false;
810                                 else
811                                 {
812                                         state->current--;       /* last returned tuple */
813                                         if (state->current <= 0)
814                                                 return NULL;
815                                 }
816                                 return state->memtuples[state->current - 1];
817                         }
818                         break;
819
820                 case TSS_SORTEDONTAPE:
821                         Assert(forward || state->randomAccess);
822                         *should_free = true;
823                         if (forward)
824                         {
825                                 if (state->eof_reached)
826                                         return NULL;
827                                 if ((tuplen = getlen(state, state->result_tape, true)) != 0)
828                                 {
829                                         tup = READTUP(state, state->result_tape, tuplen);
830                                         return tup;
831                                 }
832                                 else
833                                 {
834                                         state->eof_reached = true;
835                                         return NULL;
836                                 }
837                         }
838
839                         /*
840                          * Backward.
841                          *
842                          * if all tuples are fetched already then we return last tuple,
843                          * else - tuple before last returned.
844                          */
845                         if (state->eof_reached)
846                         {
847                                 /*
848                                  * Seek position is pointing just past the zero tuplen at
849                                  * the end of file; back up to fetch last tuple's ending
850                                  * length word.  If seek fails we must have a completely
851                                  * empty file.
852                                  */
853                                 if (!LogicalTapeBackspace(state->tapeset,
854                                                                                   state->result_tape,
855                                                                                   2 * sizeof(unsigned int)))
856                                         return NULL;
857                                 state->eof_reached = false;
858                         }
859                         else
860                         {
861                                 /*
862                                  * Back up and fetch previously-returned tuple's ending
863                                  * length word.  If seek fails, assume we are at start of
864                                  * file.
865                                  */
866                                 if (!LogicalTapeBackspace(state->tapeset,
867                                                                                   state->result_tape,
868                                                                                   sizeof(unsigned int)))
869                                         return NULL;
870                                 tuplen = getlen(state, state->result_tape, false);
871
872                                 /*
873                                  * Back up to get ending length word of tuple before it.
874                                  */
875                                 if (!LogicalTapeBackspace(state->tapeset,
876                                                                                   state->result_tape,
877                                                                           tuplen + 2 * sizeof(unsigned int)))
878                                 {
879                                         /*
880                                          * If that fails, presumably the prev tuple is the
881                                          * first in the file.  Back up so that it becomes next
882                                          * to read in forward direction (not obviously right,
883                                          * but that is what in-memory case does).
884                                          */
885                                         if (!LogicalTapeBackspace(state->tapeset,
886                                                                                           state->result_tape,
887                                                                                   tuplen + sizeof(unsigned int)))
888                                                 elog(ERROR, "tuplesort_gettuple: bogus tuple len in backward scan");
889                                         return NULL;
890                                 }
891                         }
892
893                         tuplen = getlen(state, state->result_tape, false);
894
895                         /*
896                          * Now we have the length of the prior tuple, back up and read
897                          * it. Note: READTUP expects we are positioned after the
898                          * initial length word of the tuple, so back up to that point.
899                          */
900                         if (!LogicalTapeBackspace(state->tapeset,
901                                                                           state->result_tape,
902                                                                           tuplen))
903                                 elog(ERROR, "tuplesort_gettuple: bogus tuple len in backward scan");
904                         tup = READTUP(state, state->result_tape, tuplen);
905                         return tup;
906
907                 case TSS_FINALMERGE:
908                         Assert(forward);
909                         *should_free = true;
910
911                         /*
912                          * This code should match the inner loop of mergeonerun().
913                          */
914                         if (state->memtupcount > 0)
915                         {
916                                 int                     srcTape = state->memtupindex[0];
917                                 unsigned int tuplen;
918                                 int                     tupIndex;
919                                 void       *newtup;
920
921                                 tup = state->memtuples[0];
922                                 /* returned tuple is no longer counted in our memory space */
923                                 tuplen = TUPLESIZE(state, tup);
924                                 state->availMem += tuplen;
925                                 state->mergeavailmem[srcTape] += tuplen;
926                                 tuplesort_heap_siftup(state, false);
927                                 if ((tupIndex = state->mergenext[srcTape]) == 0)
928                                 {
929                                         /*
930                                          * out of preloaded data on this tape, try to read
931                                          * more
932                                          */
933                                         mergepreread(state);
934
935                                         /*
936                                          * if still no data, we've reached end of run on this
937                                          * tape
938                                          */
939                                         if ((tupIndex = state->mergenext[srcTape]) == 0)
940                                                 return tup;
941                                 }
942                                 /* pull next preread tuple from list, insert in heap */
943                                 newtup = state->memtuples[tupIndex];
944                                 state->mergenext[srcTape] = state->memtupindex[tupIndex];
945                                 if (state->mergenext[srcTape] == 0)
946                                         state->mergelast[srcTape] = 0;
947                                 state->memtupindex[tupIndex] = state->mergefreelist;
948                                 state->mergefreelist = tupIndex;
949                                 tuplesort_heap_insert(state, newtup, srcTape, false);
950                                 return tup;
951                         }
952                         return NULL;
953
954                 default:
955                         elog(ERROR, "tuplesort_gettuple: invalid state");
956                         return NULL;            /* keep compiler quiet */
957         }
958 }
959
960 /*
961  * Fetch the next Datum in either forward or back direction.
962  * Returns FALSE if no more datums.
963  *
964  * If the Datum is pass-by-ref type, the returned value is freshly palloc'd
965  * and is now owned by the caller.
966  */
967 bool
968 tuplesort_getdatum(Tuplesortstate *state, bool forward,
969                                    Datum *val, bool *isNull)
970 {
971         DatumTuple *tuple;
972         bool            should_free;
973
974         tuple = (DatumTuple *) tuplesort_gettuple(state, forward, &should_free);
975
976         if (tuple == NULL)
977                 return false;
978
979         if (tuple->isNull || state->datumTypeByVal)
980         {
981                 *val = tuple->val;
982                 *isNull = tuple->isNull;
983         }
984         else
985         {
986                 int                     datalen = state->datumTypeLen;
987                 char       *newVal;
988
989                 if (datalen == -1)              /* variable length type? */
990                         datalen = VARSIZE((struct varlena *) DatumGetPointer(tuple->val));
991                 newVal = (char *) palloc(datalen);
992                 memcpy(newVal, DatumGetPointer(tuple->val), datalen);
993                 *val = PointerGetDatum(newVal);
994                 *isNull = false;
995         }
996
997         if (should_free)
998                 pfree(tuple);
999
1000         return true;
1001 }
1002
1003
1004 /*
1005  * inittapes - initialize for tape sorting.
1006  *
1007  * This is called only if we have found we don't have room to sort in memory.
1008  */
1009 static void
1010 inittapes(Tuplesortstate *state)
1011 {
1012         int                     ntuples,
1013                                 j;
1014
1015         state->tapeset = LogicalTapeSetCreate(MAXTAPES);
1016
1017         /*
1018          * Allocate the memtupindex array, same size as memtuples.
1019          */
1020         state->memtupindex = (int *) palloc(state->memtupsize * sizeof(int));
1021
1022         /*
1023          * Convert the unsorted contents of memtuples[] into a heap. Each
1024          * tuple is marked as belonging to run number zero.
1025          *
1026          * NOTE: we pass false for checkIndex since there's no point in comparing
1027          * indexes in this step, even though we do intend the indexes to be
1028          * part of the sort key...
1029          */
1030         ntuples = state->memtupcount;
1031         state->memtupcount = 0;         /* make the heap empty */
1032         for (j = 0; j < ntuples; j++)
1033                 tuplesort_heap_insert(state, state->memtuples[j], 0, false);
1034         Assert(state->memtupcount == ntuples);
1035
1036         state->currentRun = 0;
1037
1038         /*
1039          * Initialize variables of Algorithm D (step D1).
1040          */
1041         for (j = 0; j < MAXTAPES; j++)
1042         {
1043                 state->tp_fib[j] = 1;
1044                 state->tp_runs[j] = 0;
1045                 state->tp_dummy[j] = 1;
1046                 state->tp_tapenum[j] = j;
1047         }
1048         state->tp_fib[TAPERANGE] = 0;
1049         state->tp_dummy[TAPERANGE] = 0;
1050
1051         state->Level = 1;
1052         state->destTape = 0;
1053
1054         state->status = TSS_BUILDRUNS;
1055 }
1056
1057 /*
1058  * selectnewtape -- select new tape for new initial run.
1059  *
1060  * This is called after finishing a run when we know another run
1061  * must be started.  This implements steps D3, D4 of Algorithm D.
1062  */
1063 static void
1064 selectnewtape(Tuplesortstate *state)
1065 {
1066         int                     j;
1067         int                     a;
1068
1069         /* Step D3: advance j (destTape) */
1070         if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
1071         {
1072                 state->destTape++;
1073                 return;
1074         }
1075         if (state->tp_dummy[state->destTape] != 0)
1076         {
1077                 state->destTape = 0;
1078                 return;
1079         }
1080
1081         /* Step D4: increase level */
1082         state->Level++;
1083         a = state->tp_fib[0];
1084         for (j = 0; j < TAPERANGE; j++)
1085         {
1086                 state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
1087                 state->tp_fib[j] = a + state->tp_fib[j + 1];
1088         }
1089         state->destTape = 0;
1090 }
1091
1092 /*
1093  * mergeruns -- merge all the completed initial runs.
1094  *
1095  * This implements steps D5, D6 of Algorithm D.  All input data has
1096  * already been written to initial runs on tape (see dumptuples).
1097  */
1098 static void
1099 mergeruns(Tuplesortstate *state)
1100 {
1101         int                     tapenum,
1102                                 svTape,
1103                                 svRuns,
1104                                 svDummy;
1105
1106         Assert(state->status == TSS_BUILDRUNS);
1107         Assert(state->memtupcount == 0);
1108
1109         /*
1110          * If we produced only one initial run (quite likely if the total data
1111          * volume is between 1X and 2X SortMem), we can just use that tape as
1112          * the finished output, rather than doing a useless merge.
1113          */
1114         if (state->currentRun == 1)
1115         {
1116                 state->result_tape = state->tp_tapenum[state->destTape];
1117                 /* must freeze and rewind the finished output tape */
1118                 LogicalTapeFreeze(state->tapeset, state->result_tape);
1119                 state->status = TSS_SORTEDONTAPE;
1120                 return;
1121         }
1122
1123         /* End of step D2: rewind all output tapes to prepare for merging */
1124         for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
1125                 LogicalTapeRewind(state->tapeset, tapenum, false);
1126
1127         for (;;)
1128         {
1129                 /* Step D5: merge runs onto tape[T] until tape[P] is empty */
1130                 while (state->tp_runs[TAPERANGE - 1] || state->tp_dummy[TAPERANGE - 1])
1131                 {
1132                         bool            allDummy = true;
1133                         bool            allOneRun = true;
1134
1135                         for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
1136                         {
1137                                 if (state->tp_dummy[tapenum] == 0)
1138                                         allDummy = false;
1139                                 if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
1140                                         allOneRun = false;
1141                         }
1142
1143                         /*
1144                          * If we don't have to produce a materialized sorted tape,
1145                          * quit as soon as we're down to one real/dummy run per tape.
1146                          */
1147                         if (!state->randomAccess && allOneRun)
1148                         {
1149                                 Assert(!allDummy);
1150                                 /* Initialize for the final merge pass */
1151                                 beginmerge(state);
1152                                 state->status = TSS_FINALMERGE;
1153                                 return;
1154                         }
1155                         if (allDummy)
1156                         {
1157                                 state->tp_dummy[TAPERANGE]++;
1158                                 for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
1159                                         state->tp_dummy[tapenum]--;
1160                         }
1161                         else
1162                                 mergeonerun(state);
1163                 }
1164                 /* Step D6: decrease level */
1165                 if (--state->Level == 0)
1166                         break;
1167                 /* rewind output tape T to use as new input */
1168                 LogicalTapeRewind(state->tapeset, state->tp_tapenum[TAPERANGE],
1169                                                   false);
1170                 /* rewind used-up input tape P, and prepare it for write pass */
1171                 LogicalTapeRewind(state->tapeset, state->tp_tapenum[TAPERANGE - 1],
1172                                                   true);
1173                 state->tp_runs[TAPERANGE - 1] = 0;
1174
1175                 /*
1176                  * reassign tape units per step D6; note we no longer care about
1177                  * A[]
1178                  */
1179                 svTape = state->tp_tapenum[TAPERANGE];
1180                 svDummy = state->tp_dummy[TAPERANGE];
1181                 svRuns = state->tp_runs[TAPERANGE];
1182                 for (tapenum = TAPERANGE; tapenum > 0; tapenum--)
1183                 {
1184                         state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
1185                         state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
1186                         state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
1187                 }
1188                 state->tp_tapenum[0] = svTape;
1189                 state->tp_dummy[0] = svDummy;
1190                 state->tp_runs[0] = svRuns;
1191         }
1192
1193         /*
1194          * Done.  Knuth says that the result is on TAPE[1], but since we
1195          * exited the loop without performing the last iteration of step D6,
1196          * we have not rearranged the tape unit assignment, and therefore the
1197          * result is on TAPE[T].  We need to do it this way so that we can
1198          * freeze the final output tape while rewinding it.  The last
1199          * iteration of step D6 would be a waste of cycles anyway...
1200          */
1201         state->result_tape = state->tp_tapenum[TAPERANGE];
1202         LogicalTapeFreeze(state->tapeset, state->result_tape);
1203         state->status = TSS_SORTEDONTAPE;
1204 }
1205
1206 /*
1207  * Merge one run from each input tape, except ones with dummy runs.
1208  *
1209  * This is the inner loop of Algorithm D step D5.  We know that the
1210  * output tape is TAPE[T].
1211  */
1212 static void
1213 mergeonerun(Tuplesortstate *state)
1214 {
1215         int                     destTape = state->tp_tapenum[TAPERANGE];
1216         int                     srcTape;
1217         int                     tupIndex;
1218         void       *tup;
1219         long            priorAvail,
1220                                 spaceFreed;
1221
1222         /*
1223          * Start the merge by loading one tuple from each active source tape
1224          * into the heap.  We can also decrease the input run/dummy run
1225          * counts.
1226          */
1227         beginmerge(state);
1228
1229         /*
1230          * Execute merge by repeatedly extracting lowest tuple in heap,
1231          * writing it out, and replacing it with next tuple from same tape (if
1232          * there is another one).
1233          */
1234         while (state->memtupcount > 0)
1235         {
1236                 /* write the tuple to destTape */
1237                 priorAvail = state->availMem;
1238                 srcTape = state->memtupindex[0];
1239                 WRITETUP(state, destTape, state->memtuples[0]);
1240                 /* writetup adjusted total free space, now fix per-tape space */
1241                 spaceFreed = state->availMem - priorAvail;
1242                 state->mergeavailmem[srcTape] += spaceFreed;
1243                 /* compact the heap */
1244                 tuplesort_heap_siftup(state, false);
1245                 if ((tupIndex = state->mergenext[srcTape]) == 0)
1246                 {
1247                         /* out of preloaded data on this tape, try to read more */
1248                         mergepreread(state);
1249                         /* if still no data, we've reached end of run on this tape */
1250                         if ((tupIndex = state->mergenext[srcTape]) == 0)
1251                                 continue;
1252                 }
1253                 /* pull next preread tuple from list, insert in heap */
1254                 tup = state->memtuples[tupIndex];
1255                 state->mergenext[srcTape] = state->memtupindex[tupIndex];
1256                 if (state->mergenext[srcTape] == 0)
1257                         state->mergelast[srcTape] = 0;
1258                 state->memtupindex[tupIndex] = state->mergefreelist;
1259                 state->mergefreelist = tupIndex;
1260                 tuplesort_heap_insert(state, tup, srcTape, false);
1261         }
1262
1263         /*
1264          * When the heap empties, we're done.  Write an end-of-run marker on
1265          * the output tape, and increment its count of real runs.
1266          */
1267         markrunend(state, destTape);
1268         state->tp_runs[TAPERANGE]++;
1269 }
1270
1271 /*
1272  * beginmerge - initialize for a merge pass
1273  *
1274  * We decrease the counts of real and dummy runs for each tape, and mark
1275  * which tapes contain active input runs in mergeactive[].      Then, load
1276  * as many tuples as we can from each active input tape, and finally
1277  * fill the merge heap with the first tuple from each active tape.
1278  */
1279 static void
1280 beginmerge(Tuplesortstate *state)
1281 {
1282         int                     activeTapes;
1283         int                     tapenum;
1284         int                     srcTape;
1285
1286         /* Heap should be empty here */
1287         Assert(state->memtupcount == 0);
1288
1289         /* Clear merge-pass state variables */
1290         memset(state->mergeactive, 0, sizeof(state->mergeactive));
1291         memset(state->mergenext, 0, sizeof(state->mergenext));
1292         memset(state->mergelast, 0, sizeof(state->mergelast));
1293         memset(state->mergeavailmem, 0, sizeof(state->mergeavailmem));
1294         state->mergefreelist = 0;       /* nothing in the freelist */
1295         state->mergefirstfree = MAXTAPES;       /* first slot available for
1296                                                                                  * preread */
1297
1298         /* Adjust run counts and mark the active tapes */
1299         activeTapes = 0;
1300         for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
1301         {
1302                 if (state->tp_dummy[tapenum] > 0)
1303                         state->tp_dummy[tapenum]--;
1304                 else
1305                 {
1306                         Assert(state->tp_runs[tapenum] > 0);
1307                         state->tp_runs[tapenum]--;
1308                         srcTape = state->tp_tapenum[tapenum];
1309                         state->mergeactive[srcTape] = true;
1310                         activeTapes++;
1311                 }
1312         }
1313
1314         /*
1315          * Initialize space allocation to let each active input tape have an
1316          * equal share of preread space.
1317          */
1318         Assert(activeTapes > 0);
1319         state->spacePerTape = state->availMem / activeTapes;
1320         for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
1321         {
1322                 if (state->mergeactive[srcTape])
1323                         state->mergeavailmem[srcTape] = state->spacePerTape;
1324         }
1325
1326         /*
1327          * Preread as many tuples as possible (and at least one) from each
1328          * active tape
1329          */
1330         mergepreread(state);
1331
1332         /* Load the merge heap with the first tuple from each input tape */
1333         for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
1334         {
1335                 int                     tupIndex = state->mergenext[srcTape];
1336                 void       *tup;
1337
1338                 if (tupIndex)
1339                 {
1340                         tup = state->memtuples[tupIndex];
1341                         state->mergenext[srcTape] = state->memtupindex[tupIndex];
1342                         if (state->mergenext[srcTape] == 0)
1343                                 state->mergelast[srcTape] = 0;
1344                         state->memtupindex[tupIndex] = state->mergefreelist;
1345                         state->mergefreelist = tupIndex;
1346                         tuplesort_heap_insert(state, tup, srcTape, false);
1347                 }
1348         }
1349 }
1350
1351 /*
1352  * mergepreread - load tuples from merge input tapes
1353  *
1354  * This routine exists to improve sequentiality of reads during a merge pass,
1355  * as explained in the header comments of this file.  Load tuples from each
1356  * active source tape until the tape's run is exhausted or it has used up
1357  * its fair share of available memory.  In any case, we guarantee that there
1358  * is at one preread tuple available from each unexhausted input tape.
1359  */
1360 static void
1361 mergepreread(Tuplesortstate *state)
1362 {
1363         int                     srcTape;
1364         unsigned int tuplen;
1365         void       *tup;
1366         int                     tupIndex;
1367         long            priorAvail,
1368                                 spaceUsed;
1369
1370         for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
1371         {
1372                 if (!state->mergeactive[srcTape])
1373                         continue;
1374
1375                 /*
1376                  * Skip reading from any tape that still has at least half of its
1377                  * target memory filled with tuples (threshold fraction may need
1378                  * adjustment?).  This avoids reading just a few tuples when the
1379                  * incoming runs are not being consumed evenly.
1380                  */
1381                 if (state->mergenext[srcTape] != 0 &&
1382                         state->mergeavailmem[srcTape] <= state->spacePerTape / 2)
1383                         continue;
1384
1385                 /*
1386                  * Read tuples from this tape until it has used up its free
1387                  * memory, but ensure that we have at least one.
1388                  */
1389                 priorAvail = state->availMem;
1390                 state->availMem = state->mergeavailmem[srcTape];
1391                 while (!LACKMEM(state) || state->mergenext[srcTape] == 0)
1392                 {
1393                         /* read next tuple, if any */
1394                         if ((tuplen = getlen(state, srcTape, true)) == 0)
1395                         {
1396                                 state->mergeactive[srcTape] = false;
1397                                 break;
1398                         }
1399                         tup = READTUP(state, srcTape, tuplen);
1400                         /* find or make a free slot in memtuples[] for it */
1401                         tupIndex = state->mergefreelist;
1402                         if (tupIndex)
1403                                 state->mergefreelist = state->memtupindex[tupIndex];
1404                         else
1405                         {
1406                                 tupIndex = state->mergefirstfree++;
1407                                 /* Might need to enlarge arrays! */
1408                                 if (tupIndex >= state->memtupsize)
1409                                 {
1410                                         state->memtupsize *= 2;
1411                                         state->memtuples = (void **)
1412                                                 repalloc(state->memtuples,
1413                                                                  state->memtupsize * sizeof(void *));
1414                                         state->memtupindex = (int *)
1415                                                 repalloc(state->memtupindex,
1416                                                                  state->memtupsize * sizeof(int));
1417                                 }
1418                         }
1419                         /* store tuple, append to list for its tape */
1420                         state->memtuples[tupIndex] = tup;
1421                         state->memtupindex[tupIndex] = 0;
1422                         if (state->mergelast[srcTape])
1423                                 state->memtupindex[state->mergelast[srcTape]] = tupIndex;
1424                         else
1425                                 state->mergenext[srcTape] = tupIndex;
1426                         state->mergelast[srcTape] = tupIndex;
1427                 }
1428                 /* update per-tape and global availmem counts */
1429                 spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
1430                 state->mergeavailmem[srcTape] = state->availMem;
1431                 state->availMem = priorAvail - spaceUsed;
1432         }
1433 }
1434
1435 /*
1436  * dumptuples - remove tuples from heap and write to tape
1437  *
1438  * This is used during initial-run building, but not during merging.
1439  *
1440  * When alltuples = false, dump only enough tuples to get under the
1441  * availMem limit (and leave at least one tuple in the heap in any case,
1442  * since puttuple assumes it always has a tuple to compare to).
1443  *
1444  * When alltuples = true, dump everything currently in memory.
1445  * (This case is only used at end of input data.)
1446  *
1447  * If we empty the heap, close out the current run and return (this should
1448  * only happen at end of input data).  If we see that the tuple run number
1449  * at the top of the heap has changed, start a new run.
1450  */
1451 static void
1452 dumptuples(Tuplesortstate *state, bool alltuples)
1453 {
1454         while (alltuples ||
1455                    (LACKMEM(state) && state->memtupcount > 1))
1456         {
1457                 /*
1458                  * Dump the heap's frontmost entry, and sift up to remove it from
1459                  * the heap.
1460                  */
1461                 Assert(state->memtupcount > 0);
1462                 WRITETUP(state, state->tp_tapenum[state->destTape],
1463                                  state->memtuples[0]);
1464                 tuplesort_heap_siftup(state, true);
1465
1466                 /*
1467                  * If the heap is empty *or* top run number has changed, we've
1468                  * finished the current run.
1469                  */
1470                 if (state->memtupcount == 0 ||
1471                         state->currentRun != state->memtupindex[0])
1472                 {
1473                         markrunend(state, state->tp_tapenum[state->destTape]);
1474                         state->currentRun++;
1475                         state->tp_runs[state->destTape]++;
1476                         state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
1477
1478                         /*
1479                          * Done if heap is empty, else prepare for new run.
1480                          */
1481                         if (state->memtupcount == 0)
1482                                 break;
1483                         Assert(state->currentRun == state->memtupindex[0]);
1484                         selectnewtape(state);
1485                 }
1486         }
1487 }
1488
1489 /*
1490  * tuplesort_rescan             - rewind and replay the scan
1491  */
1492 void
1493 tuplesort_rescan(Tuplesortstate *state)
1494 {
1495         Assert(state->randomAccess);
1496
1497         switch (state->status)
1498         {
1499                 case TSS_SORTEDINMEM:
1500                         state->current = 0;
1501                         state->eof_reached = false;
1502                         state->markpos_offset = 0;
1503                         state->markpos_eof = false;
1504                         break;
1505                 case TSS_SORTEDONTAPE:
1506                         LogicalTapeRewind(state->tapeset,
1507                                                           state->result_tape,
1508                                                           false);
1509                         state->eof_reached = false;
1510                         state->markpos_block = 0L;
1511                         state->markpos_offset = 0;
1512                         state->markpos_eof = false;
1513                         break;
1514                 default:
1515                         elog(ERROR, "tuplesort_rescan: invalid state");
1516                         break;
1517         }
1518 }
1519
1520 /*
1521  * tuplesort_markpos    - saves current position in the merged sort file
1522  */
1523 void
1524 tuplesort_markpos(Tuplesortstate *state)
1525 {
1526         Assert(state->randomAccess);
1527
1528         switch (state->status)
1529         {
1530                 case TSS_SORTEDINMEM:
1531                         state->markpos_offset = state->current;
1532                         state->markpos_eof = state->eof_reached;
1533                         break;
1534                 case TSS_SORTEDONTAPE:
1535                         LogicalTapeTell(state->tapeset,
1536                                                         state->result_tape,
1537                                                         &state->markpos_block,
1538                                                         &state->markpos_offset);
1539                         state->markpos_eof = state->eof_reached;
1540                         break;
1541                 default:
1542                         elog(ERROR, "tuplesort_markpos: invalid state");
1543                         break;
1544         }
1545 }
1546
1547 /*
1548  * tuplesort_restorepos - restores current position in merged sort file to
1549  *                                                last saved position
1550  */
1551 void
1552 tuplesort_restorepos(Tuplesortstate *state)
1553 {
1554         Assert(state->randomAccess);
1555
1556         switch (state->status)
1557         {
1558                 case TSS_SORTEDINMEM:
1559                         state->current = state->markpos_offset;
1560                         state->eof_reached = state->markpos_eof;
1561                         break;
1562                 case TSS_SORTEDONTAPE:
1563                         if (!LogicalTapeSeek(state->tapeset,
1564                                                                  state->result_tape,
1565                                                                  state->markpos_block,
1566                                                                  state->markpos_offset))
1567                                 elog(ERROR, "tuplesort_restorepos failed");
1568                         state->eof_reached = state->markpos_eof;
1569                         break;
1570                 default:
1571                         elog(ERROR, "tuplesort_restorepos: invalid state");
1572                         break;
1573         }
1574 }
1575
1576
1577 /*
1578  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
1579  *
1580  * The heap lives in state->memtuples[], with parallel data storage
1581  * for indexes in state->memtupindex[].  If checkIndex is true, use
1582  * the tuple index as the front of the sort key; otherwise, no.
1583  */
1584
/*
 * HEAPCOMPARE - ordering test for two heap entries.
 *
 * Expands to an expression that is <0, 0 or >0.  If checkIndex is true and
 * the two index values differ, the result is their difference; otherwise
 * the tuples themselves are compared with COMPARETUP.
 *
 * The macro relies on variables named "state" and "checkIndex" being in
 * scope at the point of use.  The index arguments are parenthesized so that
 * arbitrary expressions can be passed safely, but note that they may be
 * evaluated more than once, so they must not have side effects.
 */
#define HEAPCOMPARE(tup1,index1,tup2,index2) \
	(checkIndex && ((index1) != (index2)) ? (index1) - (index2) : \
	 COMPARETUP(state, tup1, tup2))
1588
1589 /*
1590  * Insert a new tuple into an empty or existing heap, maintaining the
1591  * heap invariant.
1592  */
1593 static void
1594 tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
1595                                           int tupleindex, bool checkIndex)
1596 {
1597         void      **memtuples;
1598         int                *memtupindex;
1599         int                     j;
1600
1601         /*
1602          * Make sure memtuples[] can handle another entry.
1603          */
1604         if (state->memtupcount >= state->memtupsize)
1605         {
1606                 state->memtupsize *= 2;
1607                 state->memtuples = (void **)
1608                         repalloc(state->memtuples,
1609                                          state->memtupsize * sizeof(void *));
1610                 state->memtupindex = (int *)
1611                         repalloc(state->memtupindex,
1612                                          state->memtupsize * sizeof(int));
1613         }
1614         memtuples = state->memtuples;
1615         memtupindex = state->memtupindex;
1616
1617         /*
1618          * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth
1619          * is using 1-based array indexes, not 0-based.
1620          */
1621         j = state->memtupcount++;
1622         while (j > 0)
1623         {
1624                 int                     i = (j - 1) >> 1;
1625
1626                 if (HEAPCOMPARE(tuple, tupleindex,
1627                                                 memtuples[i], memtupindex[i]) >= 0)
1628                         break;
1629                 memtuples[j] = memtuples[i];
1630                 memtupindex[j] = memtupindex[i];
1631                 j = i;
1632         }
1633         memtuples[j] = tuple;
1634         memtupindex[j] = tupleindex;
1635 }
1636
1637 /*
1638  * The tuple at state->memtuples[0] has been removed from the heap.
1639  * Decrement memtupcount, and sift up to maintain the heap invariant.
1640  */
1641 static void
1642 tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
1643 {
1644         void      **memtuples = state->memtuples;
1645         int                *memtupindex = state->memtupindex;
1646         void       *tuple;
1647         int                     tupindex,
1648                                 i,
1649                                 n;
1650
1651         if (--state->memtupcount <= 0)
1652                 return;
1653         n = state->memtupcount;
1654         tuple = memtuples[n];           /* tuple that must be reinserted */
1655         tupindex = memtupindex[n];
1656         i = 0;                                          /* i is where the "hole" is */
1657         for (;;)
1658         {
1659                 int                     j = 2 * i + 1;
1660
1661                 if (j >= n)
1662                         break;
1663                 if (j + 1 < n &&
1664                         HEAPCOMPARE(memtuples[j], memtupindex[j],
1665                                                 memtuples[j + 1], memtupindex[j + 1]) > 0)
1666                         j++;
1667                 if (HEAPCOMPARE(tuple, tupindex,
1668                                                 memtuples[j], memtupindex[j]) <= 0)
1669                         break;
1670                 memtuples[i] = memtuples[j];
1671                 memtupindex[i] = memtupindex[j];
1672                 i = j;
1673         }
1674         memtuples[i] = tuple;
1675         memtupindex[i] = tupindex;
1676 }
1677
1678
1679 /*
1680  * Tape interface routines
1681  */
1682
1683 static unsigned int
1684 getlen(Tuplesortstate *state, int tapenum, bool eofOK)
1685 {
1686         unsigned int len;
1687
1688         if (LogicalTapeRead(state->tapeset, tapenum, (void *) &len,
1689                                                 sizeof(len)) != sizeof(len))
1690                 elog(ERROR, "tuplesort: unexpected end of tape");
1691         if (len == 0 && !eofOK)
1692                 elog(ERROR, "tuplesort: unexpected end of data");
1693         return len;
1694 }
1695
1696 static void
1697 markrunend(Tuplesortstate *state, int tapenum)
1698 {
1699         unsigned int len = 0;
1700
1701         LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
1702 }
1703
1704
1705 /*
1706  * qsort interface
1707  */
1708
1709 static int
1710 qsort_comparetup(const void *a, const void *b)
1711 {
1712         /* The passed pointers are pointers to void * ... */
1713
1714         return COMPARETUP(qsort_tuplesortstate, *(void **) a, *(void **) b);
1715 }
1716
1717
1718 /*
1719  * Routines specialized for HeapTuple case
1720  */
1721
1722 static int
1723 comparetup_heap(Tuplesortstate *state, const void *a, const void *b)
1724 {
1725         HeapTuple       ltup = (HeapTuple) a;
1726         HeapTuple       rtup = (HeapTuple) b;
1727         TupleDesc       tupDesc = state->tupDesc;
1728         int                     nkey;
1729
1730         for (nkey = 0; nkey < state->nKeys; nkey++)
1731         {
1732                 ScanKey         scanKey = state->scanKeys + nkey;
1733                 AttrNumber      attno = scanKey->sk_attno;
1734                 Datum           datum1,
1735                                         datum2;
1736                 bool            isnull1,
1737                                         isnull2;
1738                 int32           compare;
1739
1740                 datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
1741                 datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
1742
1743                 compare = ApplySortFunction(&scanKey->sk_func,
1744                                                                         state->sortFnKinds[nkey],
1745                                                                         datum1, isnull1, datum2, isnull2);
1746                 if (compare != 0)
1747                 {
1748                         /* dead code? SK_COMMUTE can't actually be set here, can it? */
1749                         if (scanKey->sk_flags & SK_COMMUTE)
1750                                 compare = -compare;
1751                         return compare;
1752                 }
1753         }
1754
1755         return 0;
1756 }
1757
1758 static void *
1759 copytup_heap(Tuplesortstate *state, void *tup)
1760 {
1761         HeapTuple       tuple = (HeapTuple) tup;
1762
1763         USEMEM(state, HEAPTUPLESIZE + tuple->t_len);
1764         return (void *) heap_copytuple(tuple);
1765 }
1766
1767 /*
1768  * We don't bother to write the HeapTupleData part of the tuple.
1769  */
1770
1771 static void
1772 writetup_heap(Tuplesortstate *state, int tapenum, void *tup)
1773 {
1774         HeapTuple       tuple = (HeapTuple) tup;
1775         unsigned int tuplen;
1776
1777         tuplen = tuple->t_len + sizeof(tuplen);
1778         LogicalTapeWrite(state->tapeset, tapenum,
1779                                          (void *) &tuplen, sizeof(tuplen));
1780         LogicalTapeWrite(state->tapeset, tapenum,
1781                                          (void *) tuple->t_data, tuple->t_len);
1782         if (state->randomAccess)        /* need trailing length word? */
1783                 LogicalTapeWrite(state->tapeset, tapenum,
1784                                                  (void *) &tuplen, sizeof(tuplen));
1785
1786         FREEMEM(state, HEAPTUPLESIZE + tuple->t_len);
1787         heap_freetuple(tuple);
1788 }
1789
1790 static void *
1791 readtup_heap(Tuplesortstate *state, int tapenum, unsigned int len)
1792 {
1793         unsigned int tuplen = len - sizeof(unsigned int) + HEAPTUPLESIZE;
1794         HeapTuple       tuple = (HeapTuple) palloc(tuplen);
1795
1796         USEMEM(state, tuplen);
1797         /* reconstruct the HeapTupleData portion */
1798         tuple->t_len = len - sizeof(unsigned int);
1799         ItemPointerSetInvalid(&(tuple->t_self));
1800         tuple->t_datamcxt = CurrentMemoryContext;
1801         tuple->t_data = (HeapTupleHeader) (((char *) tuple) + HEAPTUPLESIZE);
1802         /* read in the tuple proper */
1803         if (LogicalTapeRead(state->tapeset, tapenum, (void *) tuple->t_data,
1804                                                 tuple->t_len) != tuple->t_len)
1805                 elog(ERROR, "tuplesort: unexpected end of data");
1806         if (state->randomAccess)        /* need trailing length word? */
1807                 if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
1808                                                         sizeof(tuplen)) != sizeof(tuplen))
1809                         elog(ERROR, "tuplesort: unexpected end of data");
1810         return (void *) tuple;
1811 }
1812
1813 static unsigned int
1814 tuplesize_heap(Tuplesortstate *state, void *tup)
1815 {
1816         HeapTuple       tuple = (HeapTuple) tup;
1817
1818         return HEAPTUPLESIZE + tuple->t_len;
1819 }
1820
1821
1822 /*
1823  * Routines specialized for IndexTuple case
1824  *
1825  * NOTE: actually, these are specialized for the btree case; it's not
1826  * clear whether you could use them for a non-btree index.      Possibly
1827  * you'd need to make another set of routines if you needed to sort
1828  * according to another kind of index.
1829  */
1830
1831 static int
1832 comparetup_index(Tuplesortstate *state, const void *a, const void *b)
1833 {
1834         /*
1835          * This is almost the same as _bt_tuplecompare(), but we need to keep
1836          * track of whether any null fields are present.
1837          */
1838         IndexTuple      tuple1 = (IndexTuple) a;
1839         IndexTuple      tuple2 = (IndexTuple) b;
1840         Relation        rel = state->indexRel;
1841         int                     keysz = RelationGetNumberOfAttributes(rel);
1842         ScanKey         scankey = state->indexScanKey;
1843         TupleDesc       tupDes;
1844         int                     i;
1845         bool            equal_hasnull = false;
1846
1847         tupDes = RelationGetDescr(rel);
1848
1849         for (i = 1; i <= keysz; i++)
1850         {
1851                 ScanKey         entry = &scankey[i - 1];
1852                 Datum           datum1,
1853                                         datum2;
1854                 bool            isnull1,
1855                                         isnull2;
1856                 int32           compare;
1857
1858                 datum1 = index_getattr(tuple1, i, tupDes, &isnull1);
1859                 datum2 = index_getattr(tuple2, i, tupDes, &isnull2);
1860
1861                 /* see comments about NULLs handling in btbuild */
1862
1863                 /* the comparison function is always of CMP type */
1864                 compare = ApplySortFunction(&entry->sk_func, SORTFUNC_CMP,
1865                                                                         datum1, isnull1, datum2, isnull2);
1866
1867                 if (compare != 0)
1868                         return (int) compare;           /* done when we find unequal
1869                                                                                  * attributes */
1870
1871                 /* they are equal, so we only need to examine one null flag */
1872                 if (isnull1)
1873                         equal_hasnull = true;
1874         }
1875
1876         /*
1877          * If btree has asked us to enforce uniqueness, complain if two equal
1878          * tuples are detected (unless there was at least one NULL field).
1879          *
1880          * It is sufficient to make the test here, because if two tuples are
1881          * equal they *must* get compared at some stage of the sort ---
1882          * otherwise the sort algorithm wouldn't have checked whether one must
1883          * appear before the other.
1884          *
1885          * Some rather brain-dead implementations of qsort will sometimes
1886          * call the comparison routine to compare a value to itself.  (At this
1887          * writing only QNX 4 is known to do such silly things.)  Don't raise
1888          * a bogus error in that case.
1889          */
1890         if (state->enforceUnique && !equal_hasnull && tuple1 != tuple2)
1891                 elog(ERROR, "Cannot create unique index. Table contains non-unique values");
1892
1893         return 0;
1894 }
1895
1896 static void *
1897 copytup_index(Tuplesortstate *state, void *tup)
1898 {
1899         IndexTuple      tuple = (IndexTuple) tup;
1900         unsigned int tuplen = IndexTupleSize(tuple);
1901         IndexTuple      newtuple;
1902
1903         USEMEM(state, tuplen);
1904         newtuple = (IndexTuple) palloc(tuplen);
1905         memcpy(newtuple, tuple, tuplen);
1906
1907         return (void *) newtuple;
1908 }
1909
1910 static void
1911 writetup_index(Tuplesortstate *state, int tapenum, void *tup)
1912 {
1913         IndexTuple      tuple = (IndexTuple) tup;
1914         unsigned int tuplen;
1915
1916         tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
1917         LogicalTapeWrite(state->tapeset, tapenum,
1918                                          (void *) &tuplen, sizeof(tuplen));
1919         LogicalTapeWrite(state->tapeset, tapenum,
1920                                          (void *) tuple, IndexTupleSize(tuple));
1921         if (state->randomAccess)        /* need trailing length word? */
1922                 LogicalTapeWrite(state->tapeset, tapenum,
1923                                                  (void *) &tuplen, sizeof(tuplen));
1924
1925         FREEMEM(state, IndexTupleSize(tuple));
1926         pfree(tuple);
1927 }
1928
1929 static void *
1930 readtup_index(Tuplesortstate *state, int tapenum, unsigned int len)
1931 {
1932         unsigned int tuplen = len - sizeof(unsigned int);
1933         IndexTuple      tuple = (IndexTuple) palloc(tuplen);
1934
1935         USEMEM(state, tuplen);
1936         if (LogicalTapeRead(state->tapeset, tapenum, (void *) tuple,
1937                                                 tuplen) != tuplen)
1938                 elog(ERROR, "tuplesort: unexpected end of data");
1939         if (state->randomAccess)        /* need trailing length word? */
1940                 if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
1941                                                         sizeof(tuplen)) != sizeof(tuplen))
1942                         elog(ERROR, "tuplesort: unexpected end of data");
1943         return (void *) tuple;
1944 }
1945
1946 static unsigned int
1947 tuplesize_index(Tuplesortstate *state, void *tup)
1948 {
1949         IndexTuple      tuple = (IndexTuple) tup;
1950         unsigned int tuplen = IndexTupleSize(tuple);
1951
1952         return tuplen;
1953 }
1954
1955
1956 /*
1957  * Routines specialized for DatumTuple case
1958  */
1959
1960 static int
1961 comparetup_datum(Tuplesortstate *state, const void *a, const void *b)
1962 {
1963         DatumTuple *ltup = (DatumTuple *) a;
1964         DatumTuple *rtup = (DatumTuple *) b;
1965
1966         return ApplySortFunction(&state->sortOpFn, state->sortFnKind,
1967                                                          ltup->val, ltup->isNull,
1968                                                          rtup->val, rtup->isNull);
1969 }
1970
1971 static void *
1972 copytup_datum(Tuplesortstate *state, void *tup)
1973 {
1974         /* Not currently needed */
1975         elog(ERROR, "copytup_datum() should not be called");
1976         return NULL;
1977 }
1978
1979 static void
1980 writetup_datum(Tuplesortstate *state, int tapenum, void *tup)
1981 {
1982         DatumTuple *tuple = (DatumTuple *) tup;
1983         unsigned int tuplen = tuplesize_datum(state, tup);
1984         unsigned int writtenlen = tuplen + sizeof(unsigned int);
1985
1986         LogicalTapeWrite(state->tapeset, tapenum,
1987                                          (void *) &writtenlen, sizeof(writtenlen));
1988         LogicalTapeWrite(state->tapeset, tapenum,
1989                                          (void *) tuple, tuplen);
1990         if (state->randomAccess)        /* need trailing length word? */
1991                 LogicalTapeWrite(state->tapeset, tapenum,
1992                                                  (void *) &writtenlen, sizeof(writtenlen));
1993
1994         FREEMEM(state, tuplen);
1995         pfree(tuple);
1996 }
1997
1998 static void *
1999 readtup_datum(Tuplesortstate *state, int tapenum, unsigned int len)
2000 {
2001         unsigned int tuplen = len - sizeof(unsigned int);
2002         DatumTuple *tuple = (DatumTuple *) palloc(tuplen);
2003
2004         USEMEM(state, tuplen);
2005         if (LogicalTapeRead(state->tapeset, tapenum, (void *) tuple,
2006                                                 tuplen) != tuplen)
2007                 elog(ERROR, "tuplesort: unexpected end of data");
2008         if (state->randomAccess)        /* need trailing length word? */
2009                 if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
2010                                                         sizeof(tuplen)) != sizeof(tuplen))
2011                         elog(ERROR, "tuplesort: unexpected end of data");
2012
2013         if (!tuple->isNull && !state->datumTypeByVal)
2014                 tuple->val = PointerGetDatum(((char *) tuple) +
2015                                                                          MAXALIGN(sizeof(DatumTuple)));
2016         return (void *) tuple;
2017 }
2018
2019 static unsigned int
2020 tuplesize_datum(Tuplesortstate *state, void *tup)
2021 {
2022         DatumTuple *tuple = (DatumTuple *) tup;
2023
2024         if (tuple->isNull || state->datumTypeByVal)
2025                 return (unsigned int) sizeof(DatumTuple);
2026         else
2027         {
2028                 int                     datalen = state->datumTypeLen;
2029                 int                     tuplelen;
2030
2031                 if (datalen == -1)              /* variable length type? */
2032                         datalen = VARSIZE((struct varlena *) DatumGetPointer(tuple->val));
2033                 tuplelen = datalen + MAXALIGN(sizeof(DatumTuple));
2034                 return (unsigned int) tuplelen;
2035         }
2036 }
2037
2038
2039 /*
2040  * This routine selects an appropriate sorting function to implement
2041  * a sort operator as efficiently as possible.  The straightforward
2042  * method is to use the operator's implementation proc --- ie, "<"
2043  * comparison.  However, that way often requires two calls of the function
2044  * per comparison.      If we can find a btree three-way comparator function
2045  * associated with the operator, we can use it to do the comparisons
2046  * more efficiently.  We also support the possibility that the operator
2047  * is ">" (descending sort), in which case we have to reverse the output
2048  * of the btree comparator.
2049  *
2050  * Possibly this should live somewhere else (backend/catalog/, maybe?).
2051  */
2052 void
2053 SelectSortFunction(Oid sortOperator,
2054                                    RegProcedure *sortFunction,
2055                                    SortFunctionKind *kind)
2056 {
2057         Relation        relation;
2058         HeapScanDesc scan;
2059         ScanKeyData skey[1];
2060         HeapTuple       tuple;
2061         Form_pg_operator optup;
2062         Oid                     opclass = InvalidOid;
2063
2064         /*
2065          * Scan pg_amop to see if the target operator is registered as the "<"
2066          * or ">" operator of any btree opclass.  It's possible that it might
2067          * be registered both ways (eg, if someone were to build a "reverse
2068          * sort" opclass for some reason); prefer the "<" case if so. If the
2069          * operator is registered the same way in multiple opclasses, assume
2070          * we can use the associated comparator function from any one.
2071          */
2072         ScanKeyEntryInitialize(&skey[0], 0x0,
2073                                                    Anum_pg_amop_amopopr,
2074                                                    F_OIDEQ,
2075                                                    ObjectIdGetDatum(sortOperator));
2076
2077         relation = heap_openr(AccessMethodOperatorRelationName, AccessShareLock);
2078         scan = heap_beginscan(relation, false, SnapshotNow, 1, skey);
2079
2080         while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
2081         {
2082                 Form_pg_amop aform = (Form_pg_amop) GETSTRUCT(tuple);
2083
2084                 if (!opclass_is_btree(aform->amopclaid))
2085                         continue;
2086                 if (aform->amopstrategy == BTLessStrategyNumber)
2087                 {
2088                         opclass = aform->amopclaid;
2089                         *kind = SORTFUNC_CMP;
2090                         break;                          /* done looking */
2091                 }
2092                 else if (aform->amopstrategy == BTGreaterStrategyNumber)
2093                 {
2094                         opclass = aform->amopclaid;
2095                         *kind = SORTFUNC_REVCMP;
2096                         /* keep scanning in hopes of finding a BTLess entry */
2097                 }
2098         }
2099
2100         heap_endscan(scan);
2101         heap_close(relation, AccessShareLock);
2102
2103         if (OidIsValid(opclass))
2104         {
2105                 /* Found a suitable opclass, get its comparator support function */
2106                 tuple = SearchSysCache(AMPROCNUM,
2107                                                            ObjectIdGetDatum(opclass),
2108                                                            Int16GetDatum(BTORDER_PROC),
2109                                                            0, 0);
2110                 if (HeapTupleIsValid(tuple))
2111                 {
2112                         Form_pg_amproc aform = (Form_pg_amproc) GETSTRUCT(tuple);
2113
2114                         *sortFunction = aform->amproc;
2115                         ReleaseSysCache(tuple);
2116                         Assert(RegProcedureIsValid(*sortFunction));
2117                         return;
2118                 }
2119         }
2120
2121         /*
2122          * Can't find a comparator, so use the operator as-is.  Decide whether
2123          * it is forward or reverse sort by looking at its name (grotty, but
2124          * this only matters for deciding which end NULLs should get sorted
2125          * to).
2126          */
2127         tuple = SearchSysCache(OPEROID,
2128                                                    ObjectIdGetDatum(sortOperator),
2129                                                    0, 0, 0);
2130         if (!HeapTupleIsValid(tuple))
2131                 elog(ERROR, "SelectSortFunction: cache lookup failed for operator %u",
2132                          sortOperator);
2133         optup = (Form_pg_operator) GETSTRUCT(tuple);
2134         if (strcmp(NameStr(optup->oprname), ">") == 0)
2135                 *kind = SORTFUNC_REVLT;
2136         else
2137                 *kind = SORTFUNC_LT;
2138         *sortFunction = optup->oprcode;
2139         ReleaseSysCache(tuple);
2140
2141         Assert(RegProcedureIsValid(*sortFunction));
2142 }
2143
2144 /*
2145  * Apply a sort function (by now converted to fmgr lookup form)
2146  * and return a 3-way comparison result.  This takes care of handling
2147  * NULLs and sort ordering direction properly.
2148  */
2149 int32
2150 ApplySortFunction(FmgrInfo *sortFunction, SortFunctionKind kind,
2151                                   Datum datum1, bool isNull1,
2152                                   Datum datum2, bool isNull2)
2153 {
2154         switch (kind)
2155         {
2156                 case SORTFUNC_LT:
2157                         if (isNull1)
2158                         {
2159                                 if (isNull2)
2160                                         return 0;
2161                                 return 1;               /* NULL sorts after non-NULL */
2162                         }
2163                         if (isNull2)
2164                                 return -1;
2165                         if (DatumGetBool(FunctionCall2(sortFunction, datum1, datum2)))
2166                                 return -1;              /* a < b */
2167                         if (DatumGetBool(FunctionCall2(sortFunction, datum2, datum1)))
2168                                 return 1;               /* a > b */
2169                         return 0;
2170
2171                 case SORTFUNC_REVLT:
2172                         /* We reverse the ordering of NULLs, but not the operator */
2173                         if (isNull1)
2174                         {
2175                                 if (isNull2)
2176                                         return 0;
2177                                 return -1;              /* NULL sorts before non-NULL */
2178                         }
2179                         if (isNull2)
2180                                 return 1;
2181                         if (DatumGetBool(FunctionCall2(sortFunction, datum1, datum2)))
2182                                 return -1;              /* a < b */
2183                         if (DatumGetBool(FunctionCall2(sortFunction, datum2, datum1)))
2184                                 return 1;               /* a > b */
2185                         return 0;
2186
2187                 case SORTFUNC_CMP:
2188                         if (isNull1)
2189                         {
2190                                 if (isNull2)
2191                                         return 0;
2192                                 return 1;               /* NULL sorts after non-NULL */
2193                         }
2194                         if (isNull2)
2195                                 return -1;
2196                         return DatumGetInt32(FunctionCall2(sortFunction,
2197                                                                                            datum1, datum2));
2198
2199                 case SORTFUNC_REVCMP:
2200                         if (isNull1)
2201                         {
2202                                 if (isNull2)
2203                                         return 0;
2204                                 return -1;              /* NULL sorts before non-NULL */
2205                         }
2206                         if (isNull2)
2207                                 return 1;
2208                         return -DatumGetInt32(FunctionCall2(sortFunction,
2209                                                                                                 datum1, datum2));
2210
2211                 default:
2212                         elog(ERROR, "Invalid SortFunctionKind %d", (int) kind);
2213                         return 0;                       /* can't get here, but keep compiler quiet */
2214         }
2215 }