]> granicus.if.org Git - postgresql/blob - src/backend/utils/sort/tuplesort.c
Use InitFunctionCallInfoData() macro instead of MemSet in performance
[postgresql] / src / backend / utils / sort / tuplesort.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuplesort.c
4  *        Generalized tuple sorting routines.
5  *
6  * This module handles sorting of heap tuples, index tuples, or single
7  * Datums (and could easily support other kinds of sortable objects,
8  * if necessary).  It works efficiently for both small and large amounts
9  * of data.  Small amounts are sorted in-memory using qsort().  Large
10  * amounts are sorted using temporary files and a standard external sort
11  * algorithm.
12  *
13  * See Knuth, volume 3, for more than you want to know about the external
14  * sorting algorithm.  We divide the input into sorted runs using replacement
15  * selection, in the form of a priority tree implemented as a heap
16  * (essentially his Algorithm 5.2.3H), then merge the runs using polyphase
17  * merge, Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D
18  * are implemented by logtape.c, which avoids space wastage by recycling
19  * disk space as soon as each block is read from its "tape".
20  *
21  * We do not form the initial runs using Knuth's recommended replacement
22  * selection data structure (Algorithm 5.4.1R), because it uses a fixed
23  * number of records in memory at all times.  Since we are dealing with
24  * tuples that may vary considerably in size, we want to be able to vary
25  * the number of records kept in memory to ensure full utilization of the
26  * allowed sort memory space.  So, we keep the tuples in a variable-size
27  * heap, with the next record to go out at the top of the heap.  Like
28  * Algorithm 5.4.1R, each record is stored with the run number that it
29  * must go into, and we use (run number, key) as the ordering key for the
30  * heap.  When the run number at the top of the heap changes, we know that
31  * no more records of the prior run are left in the heap.
32  *
33  * The approximate amount of memory allowed for any one sort operation
34  * is specified in kilobytes by the caller (most pass work_mem).  Initially,
35  * we absorb tuples and simply store them in an unsorted array as long as
36  * we haven't exceeded workMem.  If we reach the end of the input without
37  * exceeding workMem, we sort the array using qsort() and subsequently return
38  * tuples just by scanning the tuple array sequentially.  If we do exceed
39  * workMem, we construct a heap using Algorithm H and begin to emit tuples
40  * into sorted runs in temporary tapes, emitting just enough tuples at each
41  * step to get back within the workMem limit.  Whenever the run number at
42  * the top of the heap changes, we begin a new run with a new output tape
43  * (selected per Algorithm D).  After the end of the input is reached,
44  * we dump out remaining tuples in memory into a final run (or two),
45  * then merge the runs using Algorithm D.
46  *
47  * When merging runs, we use a heap containing just the frontmost tuple from
48  * each source run; we repeatedly output the smallest tuple and insert the
49  * next tuple from its source tape (if any).  When the heap empties, the merge
50  * is complete.  The basic merge algorithm thus needs very little memory ---
51  * only M tuples for an M-way merge, and M is at most six in the present code.
52  * However, we can still make good use of our full workMem allocation by
53  * pre-reading additional tuples from each source tape.  Without prereading,
54  * our access pattern to the temporary file would be very erratic; on average
55  * we'd read one block from each of M source tapes during the same time that
56  * we're writing M blocks to the output tape, so there is no sequentiality of
57  * access at all, defeating the read-ahead methods used by most Unix kernels.
58  * Worse, the output tape gets written into a very random sequence of blocks
59  * of the temp file, ensuring that things will be even worse when it comes
60  * time to read that tape.  A straightforward merge pass thus ends up doing a
61  * lot of waiting for disk seeks.  We can improve matters by prereading from
62  * each source tape sequentially, loading about workMem/M bytes from each tape
63  * in turn.  Then we run the merge algorithm, writing but not reading until
64  * one of the preloaded tuple series runs out.  Then we switch back to preread
65  * mode, fill memory again, and repeat.  This approach helps to localize both
66  * read and write accesses.
67  *
68  * When the caller requests random access to the sort result, we form
69  * the final sorted run on a logical tape which is then "frozen", so
70  * that we can access it randomly.  When the caller does not need random
71  * access, we return from tuplesort_performsort() as soon as we are down
72  * to one run per logical tape.  The final merge is then performed
73  * on-the-fly as the caller repeatedly calls tuplesort_gettuple; this
74  * saves one cycle of writing all the data out to disk and reading it in.
75  *
76  *
77  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
78  * Portions Copyright (c) 1994, Regents of the University of California
79  *
80  * IDENTIFICATION
81  *        $PostgreSQL: pgsql/src/backend/utils/sort/tuplesort.c,v 1.47 2005/03/22 20:13:08 tgl Exp $
82  *
83  *-------------------------------------------------------------------------
84  */
85
86 #include "postgres.h"
87
88 #include "access/heapam.h"
89 #include "access/nbtree.h"
90 #include "catalog/pg_amop.h"
91 #include "catalog/pg_operator.h"
92 #include "miscadmin.h"
93 #include "utils/catcache.h"
94 #include "utils/datum.h"
95 #include "utils/logtape.h"
96 #include "utils/lsyscache.h"
97 #include "utils/syscache.h"
98 #include "utils/tuplesort.h"
99
100
/*
 * Phases a Tuplesort object can be in.  Only the states that survive from
 * one call of a Tuplesort routine to the next are listed here.
 */
typedef enum
{
	/* still accepting input, and everything fits within the memory limit */
	TSS_INITIAL,
	/* still accepting input, but spilling sorted runs out to tape */
	TSS_BUILDRUNS,
	/* sorting finished without ever leaving memory */
	TSS_SORTEDINMEM,
	/* sorting finished; the final run lives on tape */
	TSS_SORTEDONTAPE,
	/* the last merge pass is being done on-the-fly as tuples are fetched */
	TSS_FINALMERGE
} TupSortStatus;
114
/*
 * The polyphase merge uses seven tapes; per Knuth's figure 70 (section
 * 5.4.2) that is the "sweet spot" on the tapes-to-passes curve.
 */
#define MAXTAPES	7					/* Knuth's T */
#define TAPERANGE	(MAXTAPES - 1)		/* Knuth's P */
121
122 /*
123  * Private state of a Tuplesort operation.
124  */
125 struct Tuplesortstate
126 {
127         TupSortStatus status;           /* enumerated value as shown above */
128         bool            randomAccess;   /* did caller request random access? */
129         long            availMem;               /* remaining memory available, in bytes */
130         LogicalTapeSet *tapeset;        /* logtape.c object for tapes in a temp
131                                                                  * file */
132
133         /*
134          * These function pointers decouple the routines that must know what
135          * kind of tuple we are sorting from the routines that don't need to
136          * know it. They are set up by the tuplesort_begin_xxx routines.
137          *
138          * Function to compare two tuples; result is per qsort() convention, ie:
139          *
140          * <0, 0, >0 according as a<b, a=b, a>b.
141          */
142         int                     (*comparetup) (Tuplesortstate *state, const void *a, const void *b);
143
144         /*
145          * Function to copy a supplied input tuple into palloc'd space. (NB:
146          * we assume that a single pfree() is enough to release the tuple
147          * later, so the representation must be "flat" in one palloc chunk.)
148          * state->availMem must be decreased by the amount of space used.
149          */
150         void       *(*copytup) (Tuplesortstate *state, void *tup);
151
152         /*
153          * Function to write a stored tuple onto tape.  The representation of
154          * the tuple on tape need not be the same as it is in memory;
155          * requirements on the tape representation are given below.  After
156          * writing the tuple, pfree() it, and increase state->availMem by the
157          * amount of memory space thereby released.
158          */
159         void            (*writetup) (Tuplesortstate *state, int tapenum, void *tup);
160
161         /*
162          * Function to read a stored tuple from tape back into memory. 'len'
163          * is the already-read length of the stored tuple.      Create and return
164          * a palloc'd copy, and decrease state->availMem by the amount of
165          * memory space consumed.
166          */
167         void       *(*readtup) (Tuplesortstate *state, int tapenum, unsigned int len);
168
169         /*
170          * This array holds pointers to tuples in sort memory.  If we are in
171          * state INITIAL, the tuples are in no particular order; if we are in
172          * state SORTEDINMEM, the tuples are in final sorted order; in states
173          * BUILDRUNS and FINALMERGE, the tuples are organized in "heap" order
174          * per Algorithm H.  (Note that memtupcount only counts the tuples
175          * that are part of the heap --- during merge passes, memtuples[]
176          * entries beyond TAPERANGE are never in the heap and are used to hold
177          * pre-read tuples.)  In state SORTEDONTAPE, the array is not used.
178          */
179         void      **memtuples;          /* array of pointers to palloc'd tuples */
180         int                     memtupcount;    /* number of tuples currently present */
181         int                     memtupsize;             /* allocated length of memtuples array */
182
183         /*
184          * While building initial runs, this array holds the run number for
185          * each tuple in memtuples[].  During merge passes, we re-use it to
186          * hold the input tape number that each tuple in the heap was read
187          * from, or to hold the index of the next tuple pre-read from the same
188          * tape in the case of pre-read entries.  This array is never
189          * allocated unless we need to use tapes.  Whenever it is allocated,
190          * it has the same length as memtuples[].
191          */
192         int                *memtupindex;        /* index value associated with
193                                                                  * memtuples[i] */
194
195         /*
196          * While building initial runs, this is the current output run number
197          * (starting at 0).  Afterwards, it is the number of initial runs we
198          * made.
199          */
200         int                     currentRun;
201
202         /*
203          * These variables are only used during merge passes.  mergeactive[i]
204          * is true if we are reading an input run from (actual) tape number i
205          * and have not yet exhausted that run.  mergenext[i] is the memtuples
206          * index of the next pre-read tuple (next to be loaded into the heap)
207          * for tape i, or 0 if we are out of pre-read tuples.  mergelast[i]
208          * similarly points to the last pre-read tuple from each tape.
209          * mergeavailmem[i] is the amount of unused space allocated for tape
210          * i. mergefreelist and mergefirstfree keep track of unused locations
211          * in the memtuples[] array.  memtupindex[] links together pre-read
212          * tuples for each tape as well as recycled locations in
213          * mergefreelist. It is OK to use 0 as a null link in these lists,
214          * because memtuples[0] is part of the merge heap and is never a
215          * pre-read tuple.
216          */
217         bool            mergeactive[MAXTAPES];  /* Active input run source? */
218         int                     mergenext[MAXTAPES];    /* first preread tuple for each
219                                                                                  * source */
220         int                     mergelast[MAXTAPES];    /* last preread tuple for each
221                                                                                  * source */
222         long            mergeavailmem[MAXTAPES];                /* availMem for prereading
223                                                                                                  * tapes */
224         long            spacePerTape;   /* actual per-tape target usage */
225         int                     mergefreelist;  /* head of freelist of recycled slots */
226         int                     mergefirstfree; /* first slot never used in this merge */
227
228         /*
229          * Variables for Algorithm D.  Note that destTape is a "logical" tape
230          * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
231          * "logical" and "actual" tape numbers straight!
232          */
233         int                     Level;                  /* Knuth's l */
234         int                     destTape;               /* current output tape (Knuth's j, less 1) */
235         int                     tp_fib[MAXTAPES];               /* Target Fibonacci run counts
236                                                                                  * (A[]) */
237         int                     tp_runs[MAXTAPES];              /* # of real runs on each tape */
238         int                     tp_dummy[MAXTAPES];             /* # of dummy runs for each tape
239                                                                                  * (D[]) */
240         int                     tp_tapenum[MAXTAPES];   /* Actual tape numbers (TAPE[]) */
241
242         /*
243          * These variables are used after completion of sorting to keep track
244          * of the next tuple to return.  (In the tape case, the tape's current
245          * read position is also critical state.)
246          */
247         int                     result_tape;    /* actual tape number of finished output */
248         int                     current;                /* array index (only used if SORTEDINMEM) */
249         bool            eof_reached;    /* reached EOF (needed for cursors) */
250
251         /* markpos_xxx holds marked position for mark and restore */
252         long            markpos_block;  /* tape block# (only used if SORTEDONTAPE) */
253         int                     markpos_offset; /* saved "current", or offset in tape
254                                                                  * block */
255         bool            markpos_eof;    /* saved "eof_reached" */
256
257         /*
258          * These variables are specific to the HeapTuple case; they are set by
259          * tuplesort_begin_heap and used only by the HeapTuple routines.
260          */
261         TupleDesc       tupDesc;
262         int                     nKeys;
263         ScanKey         scanKeys;
264         SortFunctionKind *sortFnKinds;
265
266         /*
267          * These variables are specific to the IndexTuple case; they are set
268          * by tuplesort_begin_index and used only by the IndexTuple routines.
269          */
270         Relation        indexRel;
271         ScanKey         indexScanKey;
272         bool            enforceUnique;  /* complain if we find duplicate tuples */
273
274         /*
275          * These variables are specific to the Datum case; they are set by
276          * tuplesort_begin_datum and used only by the DatumTuple routines.
277          */
278         Oid                     datumType;
279         Oid                     sortOperator;
280         FmgrInfo        sortOpFn;               /* cached lookup data for sortOperator */
281         SortFunctionKind sortFnKind;
282         /* we need typelen and byval in order to know how to copy the Datums. */
283         int                     datumTypeLen;
284         bool            datumTypeByVal;
285 };
286
/* Dispatch through the tuple-kind-specific callbacks stored in the state */
#define COMPARETUP(state,a,b)	((state)->comparetup((state), (a), (b)))
#define COPYTUP(state,tup)		((state)->copytup((state), (tup)))
#define WRITETUP(state,tape,tup)	((state)->writetup((state), (tape), (tup)))
#define READTUP(state,tape,len) ((state)->readtup((state), (tape), (len)))

/* Memory accounting: availMem below zero means we are over budget */
#define LACKMEM(state)		(0 > (state)->availMem)
#define USEMEM(state,amt)	((state)->availMem -= (amt))
#define FREEMEM(state,amt)	((state)->availMem += (amt))
294
295 /*--------------------
296  *
297  * NOTES about on-tape representation of tuples:
298  *
299  * We require the first "unsigned int" of a stored tuple to be the total size
300  * on-tape of the tuple, including itself (so it is never zero; an all-zero
301  * unsigned int is used to delimit runs).  The remainder of the stored tuple
302  * may or may not match the in-memory representation of the tuple ---
303  * any conversion needed is the job of the writetup and readtup routines.
304  *
305  * If state->randomAccess is true, then the stored representation of the
306  * tuple must be followed by another "unsigned int" that is a copy of the
307  * length --- so the total tape space used is actually sizeof(unsigned int)
308  * more than the stored length value.  This allows read-backwards.  When
309  * randomAccess is not true, the write/read routines may omit the extra
310  * length word.
311  *
312  * writetup is expected to write both length words as well as the tuple
313  * data.  When readtup is called, the tape is positioned just after the
314  * front length word; readtup must read the tuple data and advance past
315  * the back length word (if present).
316  *
317  * The write/read routines can make use of the tuple description data
318  * stored in the Tuplesortstate record, if needed.  They are also expected
319  * to adjust state->availMem by the amount of memory space (not tape space!)
320  * released or consumed.  There is no error return from either writetup
321  * or readtup; they should ereport() on failure.
322  *
323  *
324  * NOTES about memory consumption calculations:
325  *
326  * We count space allocated for tuples against the workMem limit, plus
327  * the space used by the variable-size arrays memtuples and memtupindex.
328  * Fixed-size space (primarily the LogicalTapeSet I/O buffers) is not
329  * counted.
330  *
331  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
332  * rather than the originally-requested size.  This is important since
333  * palloc can add substantial overhead.  It's not a complete answer since
334  * we won't count any wasted space in palloc allocation blocks, but it's
335  * a lot better than what we were doing before 7.3.
336  *
337  *--------------------
338  */
339
340 /*
341  * For sorting single Datums, we build "pseudo tuples" that just carry
342  * the datum's value and null flag.  For pass-by-reference data types,
343  * the actual data value appears after the DatumTupleHeader (MAXALIGNed,
344  * of course), and the value field in the header is just a pointer to it.
345  */
346
347 typedef struct
348 {
349         Datum           val;
350         bool            isNull;
351 } DatumTuple;
352
353
354 static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess);
355 static void puttuple_common(Tuplesortstate *state, void *tuple);
356 static void inittapes(Tuplesortstate *state);
357 static void selectnewtape(Tuplesortstate *state);
358 static void mergeruns(Tuplesortstate *state);
359 static void mergeonerun(Tuplesortstate *state);
360 static void beginmerge(Tuplesortstate *state);
361 static void mergepreread(Tuplesortstate *state);
362 static void dumptuples(Tuplesortstate *state, bool alltuples);
363 static void tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
364                                           int tupleindex, bool checkIndex);
365 static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
366 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
367 static void markrunend(Tuplesortstate *state, int tapenum);
368 static int      qsort_comparetup(const void *a, const void *b);
369 static int comparetup_heap(Tuplesortstate *state,
370                                 const void *a, const void *b);
371 static void *copytup_heap(Tuplesortstate *state, void *tup);
372 static void writetup_heap(Tuplesortstate *state, int tapenum, void *tup);
373 static void *readtup_heap(Tuplesortstate *state, int tapenum,
374                          unsigned int len);
375 static int comparetup_index(Tuplesortstate *state,
376                                  const void *a, const void *b);
377 static void *copytup_index(Tuplesortstate *state, void *tup);
378 static void writetup_index(Tuplesortstate *state, int tapenum, void *tup);
379 static void *readtup_index(Tuplesortstate *state, int tapenum,
380                           unsigned int len);
381 static int comparetup_datum(Tuplesortstate *state,
382                                  const void *a, const void *b);
383 static void *copytup_datum(Tuplesortstate *state, void *tup);
384 static void writetup_datum(Tuplesortstate *state, int tapenum, void *tup);
385 static void *readtup_datum(Tuplesortstate *state, int tapenum,
386                           unsigned int len);
387
388 /*
389  * Since qsort(3) will not pass any context info to qsort_comparetup(),
390  * we have to use this ugly static variable.  It is set to point to the
391  * active Tuplesortstate object just before calling qsort.      It should
392  * not be used directly by anything except qsort_comparetup().
393  */
394 static Tuplesortstate *qsort_tuplesortstate;
395
396
397 /*
398  *              tuplesort_begin_xxx
399  *
400  * Initialize for a tuple sort operation.
401  *
402  * After calling tuplesort_begin, the caller should call tuplesort_puttuple
403  * zero or more times, then call tuplesort_performsort when all the tuples
404  * have been supplied.  After performsort, retrieve the tuples in sorted
405  * order by calling tuplesort_gettuple until it returns NULL.  (If random
406  * access was requested, rescan, markpos, and restorepos can also be called.)
407  * For Datum sorts, putdatum/getdatum are used instead of puttuple/gettuple.
408  * Call tuplesort_end to terminate the operation and release memory/disk space.
409  *
410  * Each variant of tuplesort_begin has a workMem parameter specifying the
411  * maximum number of kilobytes of RAM to use before spilling data to disk.
412  * (The normal value of this parameter is work_mem, but some callers use
413  * other values.)  Each variant also has a randomAccess parameter specifying
414  * whether the caller needs non-sequential access to the sort result.
415  */
416
417 static Tuplesortstate *
418 tuplesort_begin_common(int workMem, bool randomAccess)
419 {
420         Tuplesortstate *state;
421
422         state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
423
424         state->status = TSS_INITIAL;
425         state->randomAccess = randomAccess;
426         state->availMem = workMem * 1024L;
427         state->tapeset = NULL;
428
429         state->memtupcount = 0;
430         state->memtupsize = 1024;       /* initial guess */
431         state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
432
433         state->memtupindex = NULL;      /* until and unless needed */
434
435         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
436
437         state->currentRun = 0;
438
439         /* Algorithm D variables will be initialized by inittapes, if needed */
440
441         state->result_tape = -1;        /* flag that result tape has not been
442                                                                  * formed */
443
444         return state;
445 }
446
447 Tuplesortstate *
448 tuplesort_begin_heap(TupleDesc tupDesc,
449                                          int nkeys,
450                                          Oid *sortOperators, AttrNumber *attNums,
451                                          int workMem, bool randomAccess)
452 {
453         Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
454         int                     i;
455
456         AssertArg(nkeys > 0);
457
458         state->comparetup = comparetup_heap;
459         state->copytup = copytup_heap;
460         state->writetup = writetup_heap;
461         state->readtup = readtup_heap;
462
463         state->tupDesc = tupDesc;
464         state->nKeys = nkeys;
465         state->scanKeys = (ScanKey) palloc0(nkeys * sizeof(ScanKeyData));
466         state->sortFnKinds = (SortFunctionKind *)
467                 palloc0(nkeys * sizeof(SortFunctionKind));
468
469         for (i = 0; i < nkeys; i++)
470         {
471                 RegProcedure sortFunction;
472
473                 AssertArg(sortOperators[i] != 0);
474                 AssertArg(attNums[i] != 0);
475
476                 /* select a function that implements the sort operator */
477                 SelectSortFunction(sortOperators[i], &sortFunction,
478                                                    &state->sortFnKinds[i]);
479
480                 /*
481                  * We needn't fill in sk_strategy or sk_subtype since these
482                  * scankeys will never be passed to an index.
483                  */
484                 ScanKeyInit(&state->scanKeys[i],
485                                         attNums[i],
486                                         InvalidStrategy,
487                                         sortFunction,
488                                         (Datum) 0);
489         }
490
491         return state;
492 }
493
494 Tuplesortstate *
495 tuplesort_begin_index(Relation indexRel,
496                                           bool enforceUnique,
497                                           int workMem, bool randomAccess)
498 {
499         Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
500
501         state->comparetup = comparetup_index;
502         state->copytup = copytup_index;
503         state->writetup = writetup_index;
504         state->readtup = readtup_index;
505
506         state->indexRel = indexRel;
507         /* see comments below about btree dependence of this code... */
508         state->indexScanKey = _bt_mkscankey_nodata(indexRel);
509         state->enforceUnique = enforceUnique;
510
511         return state;
512 }
513
514 Tuplesortstate *
515 tuplesort_begin_datum(Oid datumType,
516                                           Oid sortOperator,
517                                           int workMem, bool randomAccess)
518 {
519         Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
520         RegProcedure sortFunction;
521         int16           typlen;
522         bool            typbyval;
523
524         state->comparetup = comparetup_datum;
525         state->copytup = copytup_datum;
526         state->writetup = writetup_datum;
527         state->readtup = readtup_datum;
528
529         state->datumType = datumType;
530         state->sortOperator = sortOperator;
531
532         /* select a function that implements the sort operator */
533         SelectSortFunction(sortOperator, &sortFunction, &state->sortFnKind);
534         /* and look up the function */
535         fmgr_info(sortFunction, &state->sortOpFn);
536
537         /* lookup necessary attributes of the datum type */
538         get_typlenbyval(datumType, &typlen, &typbyval);
539         state->datumTypeLen = typlen;
540         state->datumTypeByVal = typbyval;
541
542         return state;
543 }
544
545 /*
546  * tuplesort_end
547  *
548  *      Release resources and clean up.
549  */
550 void
551 tuplesort_end(Tuplesortstate *state)
552 {
553         int                     i;
554
555         if (state->tapeset)
556                 LogicalTapeSetClose(state->tapeset);
557         if (state->memtuples)
558         {
559                 for (i = 0; i < state->memtupcount; i++)
560                         pfree(state->memtuples[i]);
561                 pfree(state->memtuples);
562         }
563         if (state->memtupindex)
564                 pfree(state->memtupindex);
565
566         /*
567          * this stuff might better belong in a variant-specific shutdown
568          * routine
569          */
570         if (state->scanKeys)
571                 pfree(state->scanKeys);
572         if (state->sortFnKinds)
573                 pfree(state->sortFnKinds);
574
575         pfree(state);
576 }
577
578 /*
579  * Accept one tuple while collecting input data for sort.
580  *
581  * Note that the input tuple is always copied; the caller need not save it.
582  */
583 void
584 tuplesort_puttuple(Tuplesortstate *state, void *tuple)
585 {
586         /*
587          * Copy the given tuple into memory we control, and decrease availMem.
588          * Then call the code shared with the Datum case.
589          */
590         tuple = COPYTUP(state, tuple);
591
592         puttuple_common(state, tuple);
593 }
594
595 /*
596  * Accept one Datum while collecting input data for sort.
597  *
598  * If the Datum is pass-by-ref type, the value will be copied.
599  */
600 void
601 tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
602 {
603         DatumTuple *tuple;
604
605         /*
606          * Build pseudo-tuple carrying the datum, and decrease availMem.
607          */
608         if (isNull || state->datumTypeByVal)
609         {
610                 tuple = (DatumTuple *) palloc(sizeof(DatumTuple));
611                 tuple->val = val;
612                 tuple->isNull = isNull;
613         }
614         else
615         {
616                 Size            datalen;
617                 Size            tuplelen;
618                 char       *newVal;
619
620                 datalen = datumGetSize(val, false, state->datumTypeLen);
621                 tuplelen = datalen + MAXALIGN(sizeof(DatumTuple));
622                 tuple = (DatumTuple *) palloc(tuplelen);
623                 newVal = ((char *) tuple) + MAXALIGN(sizeof(DatumTuple));
624                 memcpy(newVal, DatumGetPointer(val), datalen);
625                 tuple->val = PointerGetDatum(newVal);
626                 tuple->isNull = false;
627         }
628
629         USEMEM(state, GetMemoryChunkSpace(tuple));
630
631         puttuple_common(state, (void *) tuple);
632 }
633
634 /*
635  * Shared code for tuple and datum cases.
636  */
637 static void
638 puttuple_common(Tuplesortstate *state, void *tuple)
639 {
640         switch (state->status)
641         {
642                 case TSS_INITIAL:
643
644                         /*
645                          * Save the copied tuple into the unsorted array.
646                          */
647                         if (state->memtupcount >= state->memtupsize)
648                         {
649                                 /* Grow the unsorted array as needed. */
650                                 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
651                                 state->memtupsize *= 2;
652                                 state->memtuples = (void **)
653                                         repalloc(state->memtuples,
654                                                          state->memtupsize * sizeof(void *));
655                                 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
656                         }
657                         state->memtuples[state->memtupcount++] = tuple;
658
659                         /*
660                          * Done if we still fit in available memory.
661                          */
662                         if (!LACKMEM(state))
663                                 return;
664
665                         /*
666                          * Nope; time to switch to tape-based operation.
667                          */
668                         inittapes(state);
669
670                         /*
671                          * Dump tuples until we are back under the limit.
672                          */
673                         dumptuples(state, false);
674                         break;
675                 case TSS_BUILDRUNS:
676
677                         /*
678                          * Insert the copied tuple into the heap, with run number
679                          * currentRun if it can go into the current run, else run
680                          * number currentRun+1.  The tuple can go into the current run
681                          * if it is >= the first not-yet-output tuple.  (Actually, it
682                          * could go into the current run if it is >= the most recently
683                          * output tuple ... but that would require keeping around the
684                          * tuple we last output, and it's simplest to let writetup
685                          * free each tuple as soon as it's written.)
686                          *
687                          * Note there will always be at least one tuple in the heap at
688                          * this point; see dumptuples.
689                          */
690                         Assert(state->memtupcount > 0);
691                         if (COMPARETUP(state, tuple, state->memtuples[0]) >= 0)
692                                 tuplesort_heap_insert(state, tuple, state->currentRun, true);
693                         else
694                                 tuplesort_heap_insert(state, tuple, state->currentRun + 1, true);
695
696                         /*
697                          * If we are over the memory limit, dump tuples till we're
698                          * under.
699                          */
700                         dumptuples(state, false);
701                         break;
702                 default:
703                         elog(ERROR, "invalid tuplesort state");
704                         break;
705         }
706 }
707
708 /*
709  * All tuples have been provided; finish the sort.
710  */
711 void
712 tuplesort_performsort(Tuplesortstate *state)
713 {
714         switch (state->status)
715         {
716                 case TSS_INITIAL:
717
718                         /*
719                          * We were able to accumulate all the tuples within the
720                          * allowed amount of memory.  Just qsort 'em and we're done.
721                          */
722                         if (state->memtupcount > 1)
723                         {
724                                 qsort_tuplesortstate = state;
725                                 qsort((void *) state->memtuples, state->memtupcount,
726                                           sizeof(void *), qsort_comparetup);
727                         }
728                         state->current = 0;
729                         state->eof_reached = false;
730                         state->markpos_offset = 0;
731                         state->markpos_eof = false;
732                         state->status = TSS_SORTEDINMEM;
733                         break;
734                 case TSS_BUILDRUNS:
735
736                         /*
737                          * Finish tape-based sort.      First, flush all tuples remaining
738                          * in memory out to tape; then merge until we have a single
739                          * remaining run (or, if !randomAccess, one run per tape).
740                          * Note that mergeruns sets the correct state->status.
741                          */
742                         dumptuples(state, true);
743                         mergeruns(state);
744                         state->eof_reached = false;
745                         state->markpos_block = 0L;
746                         state->markpos_offset = 0;
747                         state->markpos_eof = false;
748                         break;
749                 default:
750                         elog(ERROR, "invalid tuplesort state");
751                         break;
752         }
753 }
754
755 /*
756  * Fetch the next tuple in either forward or back direction.
757  * Returns NULL if no more tuples.      If should_free is set, the
758  * caller must pfree the returned tuple when done with it.
759  */
760 void *
761 tuplesort_gettuple(Tuplesortstate *state, bool forward,
762                                    bool *should_free)
763 {
764         unsigned int tuplen;
765         void       *tup;
766
767         switch (state->status)
768         {
769                 case TSS_SORTEDINMEM:
770                         Assert(forward || state->randomAccess);
771                         *should_free = false;
772                         if (forward)
773                         {
774                                 if (state->current < state->memtupcount)
775                                         return state->memtuples[state->current++];
776                                 state->eof_reached = true;
777                                 return NULL;
778                         }
779                         else
780                         {
781                                 if (state->current <= 0)
782                                         return NULL;
783
784                                 /*
785                                  * if all tuples are fetched already then we return last
786                                  * tuple, else - tuple before last returned.
787                                  */
788                                 if (state->eof_reached)
789                                         state->eof_reached = false;
790                                 else
791                                 {
792                                         state->current--;       /* last returned tuple */
793                                         if (state->current <= 0)
794                                                 return NULL;
795                                 }
796                                 return state->memtuples[state->current - 1];
797                         }
798                         break;
799
800                 case TSS_SORTEDONTAPE:
801                         Assert(forward || state->randomAccess);
802                         *should_free = true;
803                         if (forward)
804                         {
805                                 if (state->eof_reached)
806                                         return NULL;
807                                 if ((tuplen = getlen(state, state->result_tape, true)) != 0)
808                                 {
809                                         tup = READTUP(state, state->result_tape, tuplen);
810                                         return tup;
811                                 }
812                                 else
813                                 {
814                                         state->eof_reached = true;
815                                         return NULL;
816                                 }
817                         }
818
819                         /*
820                          * Backward.
821                          *
822                          * if all tuples are fetched already then we return last tuple,
823                          * else - tuple before last returned.
824                          */
825                         if (state->eof_reached)
826                         {
827                                 /*
828                                  * Seek position is pointing just past the zero tuplen at
829                                  * the end of file; back up to fetch last tuple's ending
830                                  * length word.  If seek fails we must have a completely
831                                  * empty file.
832                                  */
833                                 if (!LogicalTapeBackspace(state->tapeset,
834                                                                                   state->result_tape,
835                                                                                   2 * sizeof(unsigned int)))
836                                         return NULL;
837                                 state->eof_reached = false;
838                         }
839                         else
840                         {
841                                 /*
842                                  * Back up and fetch previously-returned tuple's ending
843                                  * length word.  If seek fails, assume we are at start of
844                                  * file.
845                                  */
846                                 if (!LogicalTapeBackspace(state->tapeset,
847                                                                                   state->result_tape,
848                                                                                   sizeof(unsigned int)))
849                                         return NULL;
850                                 tuplen = getlen(state, state->result_tape, false);
851
852                                 /*
853                                  * Back up to get ending length word of tuple before it.
854                                  */
855                                 if (!LogicalTapeBackspace(state->tapeset,
856                                                                                   state->result_tape,
857                                                                           tuplen + 2 * sizeof(unsigned int)))
858                                 {
859                                         /*
860                                          * If that fails, presumably the prev tuple is the
861                                          * first in the file.  Back up so that it becomes next
862                                          * to read in forward direction (not obviously right,
863                                          * but that is what in-memory case does).
864                                          */
865                                         if (!LogicalTapeBackspace(state->tapeset,
866                                                                                           state->result_tape,
867                                                                                   tuplen + sizeof(unsigned int)))
868                                                 elog(ERROR, "bogus tuple length in backward scan");
869                                         return NULL;
870                                 }
871                         }
872
873                         tuplen = getlen(state, state->result_tape, false);
874
875                         /*
876                          * Now we have the length of the prior tuple, back up and read
877                          * it. Note: READTUP expects we are positioned after the
878                          * initial length word of the tuple, so back up to that point.
879                          */
880                         if (!LogicalTapeBackspace(state->tapeset,
881                                                                           state->result_tape,
882                                                                           tuplen))
883                                 elog(ERROR, "bogus tuple length in backward scan");
884                         tup = READTUP(state, state->result_tape, tuplen);
885                         return tup;
886
887                 case TSS_FINALMERGE:
888                         Assert(forward);
889                         *should_free = true;
890
891                         /*
892                          * This code should match the inner loop of mergeonerun().
893                          */
894                         if (state->memtupcount > 0)
895                         {
896                                 int                     srcTape = state->memtupindex[0];
897                                 Size            tuplen;
898                                 int                     tupIndex;
899                                 void       *newtup;
900
901                                 tup = state->memtuples[0];
902                                 /* returned tuple is no longer counted in our memory space */
903                                 tuplen = GetMemoryChunkSpace(tup);
904                                 state->availMem += tuplen;
905                                 state->mergeavailmem[srcTape] += tuplen;
906                                 tuplesort_heap_siftup(state, false);
907                                 if ((tupIndex = state->mergenext[srcTape]) == 0)
908                                 {
909                                         /*
910                                          * out of preloaded data on this tape, try to read
911                                          * more
912                                          */
913                                         mergepreread(state);
914
915                                         /*
916                                          * if still no data, we've reached end of run on this
917                                          * tape
918                                          */
919                                         if ((tupIndex = state->mergenext[srcTape]) == 0)
920                                                 return tup;
921                                 }
922                                 /* pull next preread tuple from list, insert in heap */
923                                 newtup = state->memtuples[tupIndex];
924                                 state->mergenext[srcTape] = state->memtupindex[tupIndex];
925                                 if (state->mergenext[srcTape] == 0)
926                                         state->mergelast[srcTape] = 0;
927                                 state->memtupindex[tupIndex] = state->mergefreelist;
928                                 state->mergefreelist = tupIndex;
929                                 tuplesort_heap_insert(state, newtup, srcTape, false);
930                                 return tup;
931                         }
932                         return NULL;
933
934                 default:
935                         elog(ERROR, "invalid tuplesort state");
936                         return NULL;            /* keep compiler quiet */
937         }
938 }
939
940 /*
941  * Fetch the next Datum in either forward or back direction.
942  * Returns FALSE if no more datums.
943  *
944  * If the Datum is pass-by-ref type, the returned value is freshly palloc'd
945  * and is now owned by the caller.
946  */
947 bool
948 tuplesort_getdatum(Tuplesortstate *state, bool forward,
949                                    Datum *val, bool *isNull)
950 {
951         DatumTuple *tuple;
952         bool            should_free;
953
954         tuple = (DatumTuple *) tuplesort_gettuple(state, forward, &should_free);
955
956         if (tuple == NULL)
957                 return false;
958
959         if (tuple->isNull || state->datumTypeByVal)
960         {
961                 *val = tuple->val;
962                 *isNull = tuple->isNull;
963         }
964         else
965         {
966                 *val = datumCopy(tuple->val, false, state->datumTypeLen);
967                 *isNull = false;
968         }
969
970         if (should_free)
971                 pfree(tuple);
972
973         return true;
974 }
975
976
977 /*
978  * inittapes - initialize for tape sorting.
979  *
980  * This is called only if we have found we don't have room to sort in memory.
981  */
982 static void
983 inittapes(Tuplesortstate *state)
984 {
985         int                     ntuples,
986                                 j;
987
988         state->tapeset = LogicalTapeSetCreate(MAXTAPES);
989
990         /*
991          * Allocate the memtupindex array, same size as memtuples.
992          */
993         state->memtupindex = (int *) palloc(state->memtupsize * sizeof(int));
994
995         USEMEM(state, GetMemoryChunkSpace(state->memtupindex));
996
997         /*
998          * Convert the unsorted contents of memtuples[] into a heap. Each
999          * tuple is marked as belonging to run number zero.
1000          *
1001          * NOTE: we pass false for checkIndex since there's no point in comparing
1002          * indexes in this step, even though we do intend the indexes to be
1003          * part of the sort key...
1004          */
1005         ntuples = state->memtupcount;
1006         state->memtupcount = 0;         /* make the heap empty */
1007         for (j = 0; j < ntuples; j++)
1008                 tuplesort_heap_insert(state, state->memtuples[j], 0, false);
1009         Assert(state->memtupcount == ntuples);
1010
1011         state->currentRun = 0;
1012
1013         /*
1014          * Initialize variables of Algorithm D (step D1).
1015          */
1016         for (j = 0; j < MAXTAPES; j++)
1017         {
1018                 state->tp_fib[j] = 1;
1019                 state->tp_runs[j] = 0;
1020                 state->tp_dummy[j] = 1;
1021                 state->tp_tapenum[j] = j;
1022         }
1023         state->tp_fib[TAPERANGE] = 0;
1024         state->tp_dummy[TAPERANGE] = 0;
1025
1026         state->Level = 1;
1027         state->destTape = 0;
1028
1029         state->status = TSS_BUILDRUNS;
1030 }
1031
1032 /*
1033  * selectnewtape -- select new tape for new initial run.
1034  *
1035  * This is called after finishing a run when we know another run
1036  * must be started.  This implements steps D3, D4 of Algorithm D.
1037  */
1038 static void
1039 selectnewtape(Tuplesortstate *state)
1040 {
1041         int                     j;
1042         int                     a;
1043
1044         /* Step D3: advance j (destTape) */
1045         if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
1046         {
1047                 state->destTape++;
1048                 return;
1049         }
1050         if (state->tp_dummy[state->destTape] != 0)
1051         {
1052                 state->destTape = 0;
1053                 return;
1054         }
1055
1056         /* Step D4: increase level */
1057         state->Level++;
1058         a = state->tp_fib[0];
1059         for (j = 0; j < TAPERANGE; j++)
1060         {
1061                 state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
1062                 state->tp_fib[j] = a + state->tp_fib[j + 1];
1063         }
1064         state->destTape = 0;
1065 }
1066
1067 /*
1068  * mergeruns -- merge all the completed initial runs.
1069  *
1070  * This implements steps D5, D6 of Algorithm D.  All input data has
1071  * already been written to initial runs on tape (see dumptuples).
1072  */
1073 static void
1074 mergeruns(Tuplesortstate *state)
1075 {
1076         int                     tapenum,
1077                                 svTape,
1078                                 svRuns,
1079                                 svDummy;
1080
1081         Assert(state->status == TSS_BUILDRUNS);
1082         Assert(state->memtupcount == 0);
1083
1084         /*
1085          * If we produced only one initial run (quite likely if the total data
1086          * volume is between 1X and 2X workMem), we can just use that tape as
1087          * the finished output, rather than doing a useless merge.
1088          */
1089         if (state->currentRun == 1)
1090         {
1091                 state->result_tape = state->tp_tapenum[state->destTape];
1092                 /* must freeze and rewind the finished output tape */
1093                 LogicalTapeFreeze(state->tapeset, state->result_tape);
1094                 state->status = TSS_SORTEDONTAPE;
1095                 return;
1096         }
1097
1098         /* End of step D2: rewind all output tapes to prepare for merging */
1099         for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
1100                 LogicalTapeRewind(state->tapeset, tapenum, false);
1101
1102         for (;;)
1103         {
1104                 /* Step D5: merge runs onto tape[T] until tape[P] is empty */
1105                 while (state->tp_runs[TAPERANGE - 1] || state->tp_dummy[TAPERANGE - 1])
1106                 {
1107                         bool            allDummy = true;
1108                         bool            allOneRun = true;
1109
1110                         for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
1111                         {
1112                                 if (state->tp_dummy[tapenum] == 0)
1113                                         allDummy = false;
1114                                 if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
1115                                         allOneRun = false;
1116                         }
1117
1118                         /*
1119                          * If we don't have to produce a materialized sorted tape,
1120                          * quit as soon as we're down to one real/dummy run per tape.
1121                          */
1122                         if (!state->randomAccess && allOneRun)
1123                         {
1124                                 Assert(!allDummy);
1125                                 /* Initialize for the final merge pass */
1126                                 beginmerge(state);
1127                                 state->status = TSS_FINALMERGE;
1128                                 return;
1129                         }
1130                         if (allDummy)
1131                         {
1132                                 state->tp_dummy[TAPERANGE]++;
1133                                 for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
1134                                         state->tp_dummy[tapenum]--;
1135                         }
1136                         else
1137                                 mergeonerun(state);
1138                 }
1139                 /* Step D6: decrease level */
1140                 if (--state->Level == 0)
1141                         break;
1142                 /* rewind output tape T to use as new input */
1143                 LogicalTapeRewind(state->tapeset, state->tp_tapenum[TAPERANGE],
1144                                                   false);
1145                 /* rewind used-up input tape P, and prepare it for write pass */
1146                 LogicalTapeRewind(state->tapeset, state->tp_tapenum[TAPERANGE - 1],
1147                                                   true);
1148                 state->tp_runs[TAPERANGE - 1] = 0;
1149
1150                 /*
1151                  * reassign tape units per step D6; note we no longer care about
1152                  * A[]
1153                  */
1154                 svTape = state->tp_tapenum[TAPERANGE];
1155                 svDummy = state->tp_dummy[TAPERANGE];
1156                 svRuns = state->tp_runs[TAPERANGE];
1157                 for (tapenum = TAPERANGE; tapenum > 0; tapenum--)
1158                 {
1159                         state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
1160                         state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
1161                         state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
1162                 }
1163                 state->tp_tapenum[0] = svTape;
1164                 state->tp_dummy[0] = svDummy;
1165                 state->tp_runs[0] = svRuns;
1166         }
1167
1168         /*
1169          * Done.  Knuth says that the result is on TAPE[1], but since we
1170          * exited the loop without performing the last iteration of step D6,
1171          * we have not rearranged the tape unit assignment, and therefore the
1172          * result is on TAPE[T].  We need to do it this way so that we can
1173          * freeze the final output tape while rewinding it.  The last
1174          * iteration of step D6 would be a waste of cycles anyway...
1175          */
1176         state->result_tape = state->tp_tapenum[TAPERANGE];
1177         LogicalTapeFreeze(state->tapeset, state->result_tape);
1178         state->status = TSS_SORTEDONTAPE;
1179 }
1180
1181 /*
1182  * Merge one run from each input tape, except ones with dummy runs.
1183  *
1184  * This is the inner loop of Algorithm D step D5.  We know that the
1185  * output tape is TAPE[T].
1186  */
1187 static void
1188 mergeonerun(Tuplesortstate *state)
1189 {
1190         int                     destTape = state->tp_tapenum[TAPERANGE];
1191         int                     srcTape;
1192         int                     tupIndex;
1193         void       *tup;
1194         long            priorAvail,
1195                                 spaceFreed;
1196
1197         /*
1198          * Start the merge by loading one tuple from each active source tape
1199          * into the heap.  We can also decrease the input run/dummy run
1200          * counts.
1201          */
1202         beginmerge(state);
1203
1204         /*
1205          * Execute merge by repeatedly extracting lowest tuple in heap,
1206          * writing it out, and replacing it with next tuple from same tape (if
1207          * there is another one).
1208          */
1209         while (state->memtupcount > 0)
1210         {
1211                 CHECK_FOR_INTERRUPTS();
1212                 /* write the tuple to destTape */
1213                 priorAvail = state->availMem;
1214                 srcTape = state->memtupindex[0];
1215                 WRITETUP(state, destTape, state->memtuples[0]);
1216                 /* writetup adjusted total free space, now fix per-tape space */
1217                 spaceFreed = state->availMem - priorAvail;
1218                 state->mergeavailmem[srcTape] += spaceFreed;
1219                 /* compact the heap */
1220                 tuplesort_heap_siftup(state, false);
1221                 if ((tupIndex = state->mergenext[srcTape]) == 0)
1222                 {
1223                         /* out of preloaded data on this tape, try to read more */
1224                         mergepreread(state);
1225                         /* if still no data, we've reached end of run on this tape */
1226                         if ((tupIndex = state->mergenext[srcTape]) == 0)
1227                                 continue;
1228                 }
1229                 /* pull next preread tuple from list, insert in heap */
1230                 tup = state->memtuples[tupIndex];
1231                 state->mergenext[srcTape] = state->memtupindex[tupIndex];
1232                 if (state->mergenext[srcTape] == 0)
1233                         state->mergelast[srcTape] = 0;
1234                 state->memtupindex[tupIndex] = state->mergefreelist;
1235                 state->mergefreelist = tupIndex;
1236                 tuplesort_heap_insert(state, tup, srcTape, false);
1237         }
1238
1239         /*
1240          * When the heap empties, we're done.  Write an end-of-run marker on
1241          * the output tape, and increment its count of real runs.
1242          */
1243         markrunend(state, destTape);
1244         state->tp_runs[TAPERANGE]++;
1245 }
1246
1247 /*
1248  * beginmerge - initialize for a merge pass
1249  *
1250  * We decrease the counts of real and dummy runs for each tape, and mark
1251  * which tapes contain active input runs in mergeactive[].      Then, load
1252  * as many tuples as we can from each active input tape, and finally
1253  * fill the merge heap with the first tuple from each active tape.
1254  */
1255 static void
1256 beginmerge(Tuplesortstate *state)
1257 {
1258         int                     activeTapes;
1259         int                     tapenum;
1260         int                     srcTape;
1261
1262         /* Heap should be empty here */
1263         Assert(state->memtupcount == 0);
1264
1265         /* Clear merge-pass state variables */
1266         memset(state->mergeactive, 0, sizeof(state->mergeactive));
1267         memset(state->mergenext, 0, sizeof(state->mergenext));
1268         memset(state->mergelast, 0, sizeof(state->mergelast));
1269         memset(state->mergeavailmem, 0, sizeof(state->mergeavailmem));
1270         state->mergefreelist = 0;       /* nothing in the freelist */
1271         state->mergefirstfree = MAXTAPES;       /* first slot available for
1272                                                                                  * preread */
1273
1274         /* Adjust run counts and mark the active tapes */
1275         activeTapes = 0;
1276         for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
1277         {
1278                 if (state->tp_dummy[tapenum] > 0)
1279                         state->tp_dummy[tapenum]--;
1280                 else
1281                 {
1282                         Assert(state->tp_runs[tapenum] > 0);
1283                         state->tp_runs[tapenum]--;
1284                         srcTape = state->tp_tapenum[tapenum];
1285                         state->mergeactive[srcTape] = true;
1286                         activeTapes++;
1287                 }
1288         }
1289
1290         /*
1291          * Initialize space allocation to let each active input tape have an
1292          * equal share of preread space.
1293          */
1294         Assert(activeTapes > 0);
1295         state->spacePerTape = state->availMem / activeTapes;
1296         for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
1297         {
1298                 if (state->mergeactive[srcTape])
1299                         state->mergeavailmem[srcTape] = state->spacePerTape;
1300         }
1301
1302         /*
1303          * Preread as many tuples as possible (and at least one) from each
1304          * active tape
1305          */
1306         mergepreread(state);
1307
1308         /* Load the merge heap with the first tuple from each input tape */
1309         for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
1310         {
1311                 int                     tupIndex = state->mergenext[srcTape];
1312                 void       *tup;
1313
1314                 if (tupIndex)
1315                 {
1316                         tup = state->memtuples[tupIndex];
1317                         state->mergenext[srcTape] = state->memtupindex[tupIndex];
1318                         if (state->mergenext[srcTape] == 0)
1319                                 state->mergelast[srcTape] = 0;
1320                         state->memtupindex[tupIndex] = state->mergefreelist;
1321                         state->mergefreelist = tupIndex;
1322                         tuplesort_heap_insert(state, tup, srcTape, false);
1323                 }
1324         }
1325 }
1326
1327 /*
1328  * mergepreread - load tuples from merge input tapes
1329  *
1330  * This routine exists to improve sequentiality of reads during a merge pass,
1331  * as explained in the header comments of this file.  Load tuples from each
1332  * active source tape until the tape's run is exhausted or it has used up
1333  * its fair share of available memory.  In any case, we guarantee that there
1334  * is at least one preread tuple available from each unexhausted input tape.
1335  */
1336 static void
1337 mergepreread(Tuplesortstate *state)
1338 {
1339         int                     srcTape;
1340         unsigned int tuplen;
1341         void       *tup;
1342         int                     tupIndex;
1343         long            priorAvail,
1344                                 spaceUsed;
1345
1346         for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
1347         {
1348                 if (!state->mergeactive[srcTape])
1349                         continue;
1350
1351                 /*
1352                  * Skip reading from any tape that still has at least half of its
1353                  * target memory filled with tuples (threshold fraction may need
1354                  * adjustment?).  This avoids reading just a few tuples when the
1355                  * incoming runs are not being consumed evenly.
1356                  */
1357                 if (state->mergenext[srcTape] != 0 &&
1358                         state->mergeavailmem[srcTape] <= state->spacePerTape / 2)
1359                         continue;
1360
1361                 /*
1362                  * Read tuples from this tape until it has used up its free
1363                  * memory, but ensure that we have at least one.
1364                  */
1365                 priorAvail = state->availMem;
1366                 state->availMem = state->mergeavailmem[srcTape];
1367                 while (!LACKMEM(state) || state->mergenext[srcTape] == 0)
1368                 {
1369                         /* read next tuple, if any */
1370                         if ((tuplen = getlen(state, srcTape, true)) == 0)
1371                         {
1372                                 state->mergeactive[srcTape] = false;
1373                                 break;
1374                         }
1375                         tup = READTUP(state, srcTape, tuplen);
1376                         /* find or make a free slot in memtuples[] for it */
1377                         tupIndex = state->mergefreelist;
1378                         if (tupIndex)
1379                                 state->mergefreelist = state->memtupindex[tupIndex];
1380                         else
1381                         {
1382                                 tupIndex = state->mergefirstfree++;
1383                                 /* Might need to enlarge arrays! */
1384                                 if (tupIndex >= state->memtupsize)
1385                                 {
1386                                         FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1387                                         FREEMEM(state, GetMemoryChunkSpace(state->memtupindex));
1388                                         state->memtupsize *= 2;
1389                                         state->memtuples = (void **)
1390                                                 repalloc(state->memtuples,
1391                                                                  state->memtupsize * sizeof(void *));
1392                                         state->memtupindex = (int *)
1393                                                 repalloc(state->memtupindex,
1394                                                                  state->memtupsize * sizeof(int));
1395                                         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1396                                         USEMEM(state, GetMemoryChunkSpace(state->memtupindex));
1397                                 }
1398                         }
1399                         /* store tuple, append to list for its tape */
1400                         state->memtuples[tupIndex] = tup;
1401                         state->memtupindex[tupIndex] = 0;
1402                         if (state->mergelast[srcTape])
1403                                 state->memtupindex[state->mergelast[srcTape]] = tupIndex;
1404                         else
1405                                 state->mergenext[srcTape] = tupIndex;
1406                         state->mergelast[srcTape] = tupIndex;
1407                 }
1408                 /* update per-tape and global availmem counts */
1409                 spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
1410                 state->mergeavailmem[srcTape] = state->availMem;
1411                 state->availMem = priorAvail - spaceUsed;
1412         }
1413 }
1414
1415 /*
1416  * dumptuples - remove tuples from heap and write to tape
1417  *
1418  * This is used during initial-run building, but not during merging.
1419  *
1420  * When alltuples = false, dump only enough tuples to get under the
1421  * availMem limit (and leave at least one tuple in the heap in any case,
1422  * since puttuple assumes it always has a tuple to compare to).
1423  *
1424  * When alltuples = true, dump everything currently in memory.
1425  * (This case is only used at end of input data.)
1426  *
1427  * If we empty the heap, close out the current run and return (this should
1428  * only happen at end of input data).  If we see that the tuple run number
1429  * at the top of the heap has changed, start a new run.
1430  */
1431 static void
1432 dumptuples(Tuplesortstate *state, bool alltuples)
1433 {
1434         while (alltuples ||
1435                    (LACKMEM(state) && state->memtupcount > 1))
1436         {
1437                 /*
1438                  * Dump the heap's frontmost entry, and sift up to remove it from
1439                  * the heap.
1440                  */
1441                 Assert(state->memtupcount > 0);
1442                 WRITETUP(state, state->tp_tapenum[state->destTape],
1443                                  state->memtuples[0]);
1444                 tuplesort_heap_siftup(state, true);
1445
1446                 /*
1447                  * If the heap is empty *or* top run number has changed, we've
1448                  * finished the current run.
1449                  */
1450                 if (state->memtupcount == 0 ||
1451                         state->currentRun != state->memtupindex[0])
1452                 {
1453                         markrunend(state, state->tp_tapenum[state->destTape]);
1454                         state->currentRun++;
1455                         state->tp_runs[state->destTape]++;
1456                         state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
1457
1458                         /*
1459                          * Done if heap is empty, else prepare for new run.
1460                          */
1461                         if (state->memtupcount == 0)
1462                                 break;
1463                         Assert(state->currentRun == state->memtupindex[0]);
1464                         selectnewtape(state);
1465                 }
1466         }
1467 }
1468
1469 /*
1470  * tuplesort_rescan             - rewind and replay the scan
1471  */
1472 void
1473 tuplesort_rescan(Tuplesortstate *state)
1474 {
1475         Assert(state->randomAccess);
1476
1477         switch (state->status)
1478         {
1479                 case TSS_SORTEDINMEM:
1480                         state->current = 0;
1481                         state->eof_reached = false;
1482                         state->markpos_offset = 0;
1483                         state->markpos_eof = false;
1484                         break;
1485                 case TSS_SORTEDONTAPE:
1486                         LogicalTapeRewind(state->tapeset,
1487                                                           state->result_tape,
1488                                                           false);
1489                         state->eof_reached = false;
1490                         state->markpos_block = 0L;
1491                         state->markpos_offset = 0;
1492                         state->markpos_eof = false;
1493                         break;
1494                 default:
1495                         elog(ERROR, "invalid tuplesort state");
1496                         break;
1497         }
1498 }
1499
1500 /*
1501  * tuplesort_markpos    - saves current position in the merged sort file
1502  */
1503 void
1504 tuplesort_markpos(Tuplesortstate *state)
1505 {
1506         Assert(state->randomAccess);
1507
1508         switch (state->status)
1509         {
1510                 case TSS_SORTEDINMEM:
1511                         state->markpos_offset = state->current;
1512                         state->markpos_eof = state->eof_reached;
1513                         break;
1514                 case TSS_SORTEDONTAPE:
1515                         LogicalTapeTell(state->tapeset,
1516                                                         state->result_tape,
1517                                                         &state->markpos_block,
1518                                                         &state->markpos_offset);
1519                         state->markpos_eof = state->eof_reached;
1520                         break;
1521                 default:
1522                         elog(ERROR, "invalid tuplesort state");
1523                         break;
1524         }
1525 }
1526
1527 /*
1528  * tuplesort_restorepos - restores current position in merged sort file to
1529  *                                                last saved position
1530  */
1531 void
1532 tuplesort_restorepos(Tuplesortstate *state)
1533 {
1534         Assert(state->randomAccess);
1535
1536         switch (state->status)
1537         {
1538                 case TSS_SORTEDINMEM:
1539                         state->current = state->markpos_offset;
1540                         state->eof_reached = state->markpos_eof;
1541                         break;
1542                 case TSS_SORTEDONTAPE:
1543                         if (!LogicalTapeSeek(state->tapeset,
1544                                                                  state->result_tape,
1545                                                                  state->markpos_block,
1546                                                                  state->markpos_offset))
1547                                 elog(ERROR, "tuplesort_restorepos failed");
1548                         state->eof_reached = state->markpos_eof;
1549                         break;
1550                 default:
1551                         elog(ERROR, "invalid tuplesort state");
1552                         break;
1553         }
1554 }
1555
1556
1557 /*
1558  * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
1559  *
1560  * The heap lives in state->memtuples[], with parallel data storage
1561  * for indexes in state->memtupindex[].  If checkIndex is true, use
1562  * the tuple index as the front of the sort key; otherwise, no.
1563  */
1564
/*
 * HEAPCOMPARE - ordering of two heap entries.
 *
 * Relies on "state" and "checkIndex" being in scope at the point of use.
 * When checkIndex is true and the two indexes differ, the result is their
 * difference; otherwise the tuples themselves are compared via COMPARETUP.
 *
 * The index arguments are parenthesized in the expansion so that compound
 * argument expressions are grouped as the caller wrote them.
 */
#define HEAPCOMPARE(tup1,index1,tup2,index2) \
	((checkIndex && ((index1) != (index2))) ? ((index1) - (index2)) : \
	 COMPARETUP(state, tup1, tup2))
1568
1569 /*
1570  * Insert a new tuple into an empty or existing heap, maintaining the
1571  * heap invariant.
1572  */
1573 static void
1574 tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
1575                                           int tupleindex, bool checkIndex)
1576 {
1577         void      **memtuples;
1578         int                *memtupindex;
1579         int                     j;
1580
1581         /*
1582          * Make sure memtuples[] can handle another entry.
1583          */
1584         if (state->memtupcount >= state->memtupsize)
1585         {
1586                 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
1587                 FREEMEM(state, GetMemoryChunkSpace(state->memtupindex));
1588                 state->memtupsize *= 2;
1589                 state->memtuples = (void **)
1590                         repalloc(state->memtuples,
1591                                          state->memtupsize * sizeof(void *));
1592                 state->memtupindex = (int *)
1593                         repalloc(state->memtupindex,
1594                                          state->memtupsize * sizeof(int));
1595                 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
1596                 USEMEM(state, GetMemoryChunkSpace(state->memtupindex));
1597         }
1598         memtuples = state->memtuples;
1599         memtupindex = state->memtupindex;
1600
1601         /*
1602          * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth
1603          * is using 1-based array indexes, not 0-based.
1604          */
1605         j = state->memtupcount++;
1606         while (j > 0)
1607         {
1608                 int                     i = (j - 1) >> 1;
1609
1610                 if (HEAPCOMPARE(tuple, tupleindex,
1611                                                 memtuples[i], memtupindex[i]) >= 0)
1612                         break;
1613                 memtuples[j] = memtuples[i];
1614                 memtupindex[j] = memtupindex[i];
1615                 j = i;
1616         }
1617         memtuples[j] = tuple;
1618         memtupindex[j] = tupleindex;
1619 }
1620
1621 /*
1622  * The tuple at state->memtuples[0] has been removed from the heap.
1623  * Decrement memtupcount, and sift up to maintain the heap invariant.
1624  */
1625 static void
1626 tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
1627 {
1628         void      **memtuples = state->memtuples;
1629         int                *memtupindex = state->memtupindex;
1630         void       *tuple;
1631         int                     tupindex,
1632                                 i,
1633                                 n;
1634
1635         if (--state->memtupcount <= 0)
1636                 return;
1637         n = state->memtupcount;
1638         tuple = memtuples[n];           /* tuple that must be reinserted */
1639         tupindex = memtupindex[n];
1640         i = 0;                                          /* i is where the "hole" is */
1641         for (;;)
1642         {
1643                 int                     j = 2 * i + 1;
1644
1645                 if (j >= n)
1646                         break;
1647                 if (j + 1 < n &&
1648                         HEAPCOMPARE(memtuples[j], memtupindex[j],
1649                                                 memtuples[j + 1], memtupindex[j + 1]) > 0)
1650                         j++;
1651                 if (HEAPCOMPARE(tuple, tupindex,
1652                                                 memtuples[j], memtupindex[j]) <= 0)
1653                         break;
1654                 memtuples[i] = memtuples[j];
1655                 memtupindex[i] = memtupindex[j];
1656                 i = j;
1657         }
1658         memtuples[i] = tuple;
1659         memtupindex[i] = tupindex;
1660 }
1661
1662
1663 /*
1664  * Tape interface routines
1665  */
1666
1667 static unsigned int
1668 getlen(Tuplesortstate *state, int tapenum, bool eofOK)
1669 {
1670         unsigned int len;
1671
1672         if (LogicalTapeRead(state->tapeset, tapenum, (void *) &len,
1673                                                 sizeof(len)) != sizeof(len))
1674                 elog(ERROR, "unexpected end of tape");
1675         if (len == 0 && !eofOK)
1676                 elog(ERROR, "unexpected end of data");
1677         return len;
1678 }
1679
1680 static void
1681 markrunend(Tuplesortstate *state, int tapenum)
1682 {
1683         unsigned int len = 0;
1684
1685         LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
1686 }
1687
1688
1689 /*
1690  * qsort interface
1691  */
1692
1693 static int
1694 qsort_comparetup(const void *a, const void *b)
1695 {
1696         /* The passed pointers are pointers to void * ... */
1697
1698         return COMPARETUP(qsort_tuplesortstate, *(void **) a, *(void **) b);
1699 }
1700
1701
1702 /*
1703  * This routine selects an appropriate sorting function to implement
1704  * a sort operator as efficiently as possible.  The straightforward
1705  * method is to use the operator's implementation proc --- ie, "<"
1706  * comparison.  However, that way often requires two calls of the function
1707  * per comparison.      If we can find a btree three-way comparator function
1708  * associated with the operator, we can use it to do the comparisons
1709  * more efficiently.  We also support the possibility that the operator
1710  * is ">" (descending sort), in which case we have to reverse the output
1711  * of the btree comparator.
1712  *
1713  * Possibly this should live somewhere else (backend/catalog/, maybe?).
1714  */
1715 void
1716 SelectSortFunction(Oid sortOperator,
1717                                    RegProcedure *sortFunction,
1718                                    SortFunctionKind *kind)
1719 {
1720         CatCList   *catlist;
1721         int                     i;
1722         HeapTuple       tuple;
1723         Form_pg_operator optup;
1724         Oid                     opclass = InvalidOid;
1725
1726         /*
1727          * Search pg_amop to see if the target operator is registered as the
1728          * "<" or ">" operator of any btree opclass.  It's possible that it
1729          * might be registered both ways (eg, if someone were to build a
1730          * "reverse sort" opclass for some reason); prefer the "<" case if so.
1731          * If the operator is registered the same way in multiple opclasses,
1732          * assume we can use the associated comparator function from any one.
1733          */
1734         catlist = SearchSysCacheList(AMOPOPID, 1,
1735                                                                  ObjectIdGetDatum(sortOperator),
1736                                                                  0, 0, 0);
1737
1738         for (i = 0; i < catlist->n_members; i++)
1739         {
1740                 Form_pg_amop aform;
1741
1742                 tuple = &catlist->members[i]->tuple;
1743                 aform = (Form_pg_amop) GETSTRUCT(tuple);
1744
1745                 if (!opclass_is_btree(aform->amopclaid))
1746                         continue;
1747                 /* must be of default subtype, too */
1748                 if (aform->amopsubtype != InvalidOid)
1749                         continue;
1750
1751                 if (aform->amopstrategy == BTLessStrategyNumber)
1752                 {
1753                         opclass = aform->amopclaid;
1754                         *kind = SORTFUNC_CMP;
1755                         break;                          /* done looking */
1756                 }
1757                 else if (aform->amopstrategy == BTGreaterStrategyNumber)
1758                 {
1759                         opclass = aform->amopclaid;
1760                         *kind = SORTFUNC_REVCMP;
1761                         /* keep scanning in hopes of finding a BTLess entry */
1762                 }
1763         }
1764
1765         ReleaseSysCacheList(catlist);
1766
1767         if (OidIsValid(opclass))
1768         {
1769                 /* Found a suitable opclass, get its default comparator function */
1770                 *sortFunction = get_opclass_proc(opclass, InvalidOid, BTORDER_PROC);
1771                 Assert(RegProcedureIsValid(*sortFunction));
1772                 return;
1773         }
1774
1775         /*
1776          * Can't find a comparator, so use the operator as-is.  Decide whether
1777          * it is forward or reverse sort by looking at its name (grotty, but
1778          * this only matters for deciding which end NULLs should get sorted
1779          * to).  XXX possibly better idea: see whether its selectivity
1780          * function is scalargtcmp?
1781          */
1782         tuple = SearchSysCache(OPEROID,
1783                                                    ObjectIdGetDatum(sortOperator),
1784                                                    0, 0, 0);
1785         if (!HeapTupleIsValid(tuple))
1786                 elog(ERROR, "cache lookup failed for operator %u", sortOperator);
1787         optup = (Form_pg_operator) GETSTRUCT(tuple);
1788         if (strcmp(NameStr(optup->oprname), ">") == 0)
1789                 *kind = SORTFUNC_REVLT;
1790         else
1791                 *kind = SORTFUNC_LT;
1792         *sortFunction = optup->oprcode;
1793         ReleaseSysCache(tuple);
1794
1795         Assert(RegProcedureIsValid(*sortFunction));
1796 }
1797
1798 /*
1799  * Inline-able copy of FunctionCall2() to save some cycles in sorting.
1800  */
1801 static inline Datum
1802 myFunctionCall2(FmgrInfo *flinfo, Datum arg1, Datum arg2)
1803 {
1804         FunctionCallInfoData fcinfo;
1805         Datum           result;
1806
1807         InitFunctionCallInfoData(fcinfo, flinfo, 2, NULL, NULL);
1808
1809         fcinfo.arg[0] = arg1;
1810         fcinfo.arg[1] = arg2;
1811         fcinfo.argnull[0] = false;
1812         fcinfo.argnull[1] = false;
1813
1814         result = FunctionCallInvoke(&fcinfo);
1815
1816         /* Check for null result, since caller is clearly not expecting one */
1817         if (fcinfo.isnull)
1818                 elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
1819
1820         return result;
1821 }
1822
1823 /*
1824  * Apply a sort function (by now converted to fmgr lookup form)
1825  * and return a 3-way comparison result.  This takes care of handling
1826  * NULLs and sort ordering direction properly.
1827  */
1828 static inline int32
1829 inlineApplySortFunction(FmgrInfo *sortFunction, SortFunctionKind kind,
1830                                                 Datum datum1, bool isNull1,
1831                                                 Datum datum2, bool isNull2)
1832 {
1833         switch (kind)
1834         {
1835                 case SORTFUNC_LT:
1836                         if (isNull1)
1837                         {
1838                                 if (isNull2)
1839                                         return 0;
1840                                 return 1;               /* NULL sorts after non-NULL */
1841                         }
1842                         if (isNull2)
1843                                 return -1;
1844                         if (DatumGetBool(myFunctionCall2(sortFunction, datum1, datum2)))
1845                                 return -1;              /* a < b */
1846                         if (DatumGetBool(myFunctionCall2(sortFunction, datum2, datum1)))
1847                                 return 1;               /* a > b */
1848                         return 0;
1849
1850                 case SORTFUNC_REVLT:
1851                         /* We reverse the ordering of NULLs, but not the operator */
1852                         if (isNull1)
1853                         {
1854                                 if (isNull2)
1855                                         return 0;
1856                                 return -1;              /* NULL sorts before non-NULL */
1857                         }
1858                         if (isNull2)
1859                                 return 1;
1860                         if (DatumGetBool(myFunctionCall2(sortFunction, datum1, datum2)))
1861                                 return -1;              /* a < b */
1862                         if (DatumGetBool(myFunctionCall2(sortFunction, datum2, datum1)))
1863                                 return 1;               /* a > b */
1864                         return 0;
1865
1866                 case SORTFUNC_CMP:
1867                         if (isNull1)
1868                         {
1869                                 if (isNull2)
1870                                         return 0;
1871                                 return 1;               /* NULL sorts after non-NULL */
1872                         }
1873                         if (isNull2)
1874                                 return -1;
1875                         return DatumGetInt32(myFunctionCall2(sortFunction,
1876                                                                                                  datum1, datum2));
1877
1878                 case SORTFUNC_REVCMP:
1879                         if (isNull1)
1880                         {
1881                                 if (isNull2)
1882                                         return 0;
1883                                 return -1;              /* NULL sorts before non-NULL */
1884                         }
1885                         if (isNull2)
1886                                 return 1;
1887                         return -DatumGetInt32(myFunctionCall2(sortFunction,
1888                                                                                                   datum1, datum2));
1889
1890                 default:
1891                         elog(ERROR, "unrecognized SortFunctionKind: %d", (int) kind);
1892                         return 0;                       /* can't get here, but keep compiler quiet */
1893         }
1894 }
1895
1896 /*
1897  * Non-inline ApplySortFunction() --- this is needed only to conform to
1898  * C99's brain-dead notions about how to implement inline functions...
1899  */
1900 int32
1901 ApplySortFunction(FmgrInfo *sortFunction, SortFunctionKind kind,
1902                                   Datum datum1, bool isNull1,
1903                                   Datum datum2, bool isNull2)
1904 {
1905         return inlineApplySortFunction(sortFunction, kind,
1906                                                                    datum1, isNull1,
1907                                                                    datum2, isNull2);
1908 }
1909
1910
1911 /*
1912  * Routines specialized for HeapTuple case
1913  */
1914
1915 static int
1916 comparetup_heap(Tuplesortstate *state, const void *a, const void *b)
1917 {
1918         HeapTuple       ltup = (HeapTuple) a;
1919         HeapTuple       rtup = (HeapTuple) b;
1920         TupleDesc       tupDesc = state->tupDesc;
1921         int                     nkey;
1922
1923         for (nkey = 0; nkey < state->nKeys; nkey++)
1924         {
1925                 ScanKey         scanKey = state->scanKeys + nkey;
1926                 AttrNumber      attno = scanKey->sk_attno;
1927                 Datum           datum1,
1928                                         datum2;
1929                 bool            isnull1,
1930                                         isnull2;
1931                 int32           compare;
1932
1933                 datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
1934                 datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
1935
1936                 compare = inlineApplySortFunction(&scanKey->sk_func,
1937                                                                                   state->sortFnKinds[nkey],
1938                                                                                   datum1, isnull1,
1939                                                                                   datum2, isnull2);
1940                 if (compare != 0)
1941                         return compare;
1942         }
1943
1944         return 0;
1945 }
1946
1947 static void *
1948 copytup_heap(Tuplesortstate *state, void *tup)
1949 {
1950         HeapTuple       tuple = (HeapTuple) tup;
1951
1952         tuple = heap_copytuple(tuple);
1953         USEMEM(state, GetMemoryChunkSpace(tuple));
1954         return (void *) tuple;
1955 }
1956
1957 /*
1958  * We don't bother to write the HeapTupleData part of the tuple.
1959  */
1960
1961 static void
1962 writetup_heap(Tuplesortstate *state, int tapenum, void *tup)
1963 {
1964         HeapTuple       tuple = (HeapTuple) tup;
1965         unsigned int tuplen;
1966
1967         tuplen = tuple->t_len + sizeof(tuplen);
1968         LogicalTapeWrite(state->tapeset, tapenum,
1969                                          (void *) &tuplen, sizeof(tuplen));
1970         LogicalTapeWrite(state->tapeset, tapenum,
1971                                          (void *) tuple->t_data, tuple->t_len);
1972         if (state->randomAccess)        /* need trailing length word? */
1973                 LogicalTapeWrite(state->tapeset, tapenum,
1974                                                  (void *) &tuplen, sizeof(tuplen));
1975
1976         FREEMEM(state, GetMemoryChunkSpace(tuple));
1977         heap_freetuple(tuple);
1978 }
1979
1980 static void *
1981 readtup_heap(Tuplesortstate *state, int tapenum, unsigned int len)
1982 {
1983         unsigned int tuplen = len - sizeof(unsigned int) + HEAPTUPLESIZE;
1984         HeapTuple       tuple = (HeapTuple) palloc(tuplen);
1985
1986         USEMEM(state, GetMemoryChunkSpace(tuple));
1987         /* reconstruct the HeapTupleData portion */
1988         tuple->t_len = len - sizeof(unsigned int);
1989         ItemPointerSetInvalid(&(tuple->t_self));
1990         tuple->t_datamcxt = CurrentMemoryContext;
1991         tuple->t_data = (HeapTupleHeader) (((char *) tuple) + HEAPTUPLESIZE);
1992         /* read in the tuple proper */
1993         if (LogicalTapeRead(state->tapeset, tapenum, (void *) tuple->t_data,
1994                                                 tuple->t_len) != tuple->t_len)
1995                 elog(ERROR, "unexpected end of data");
1996         if (state->randomAccess)        /* need trailing length word? */
1997                 if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
1998                                                         sizeof(tuplen)) != sizeof(tuplen))
1999                         elog(ERROR, "unexpected end of data");
2000         return (void *) tuple;
2001 }
2002
2003
2004 /*
2005  * Routines specialized for IndexTuple case
2006  *
2007  * NOTE: actually, these are specialized for the btree case; it's not
2008  * clear whether you could use them for a non-btree index.      Possibly
2009  * you'd need to make another set of routines if you needed to sort
2010  * according to another kind of index.
2011  */
2012
2013 static int
2014 comparetup_index(Tuplesortstate *state, const void *a, const void *b)
2015 {
2016         /*
2017          * This is almost the same as _bt_tuplecompare(), but we need to keep
2018          * track of whether any null fields are present.  Also see the special
2019          * treatment for equal keys at the end.
2020          */
2021         IndexTuple      tuple1 = (IndexTuple) a;
2022         IndexTuple      tuple2 = (IndexTuple) b;
2023         Relation        rel = state->indexRel;
2024         int                     keysz = RelationGetNumberOfAttributes(rel);
2025         ScanKey         scankey = state->indexScanKey;
2026         TupleDesc       tupDes;
2027         int                     i;
2028         bool            equal_hasnull = false;
2029
2030         tupDes = RelationGetDescr(rel);
2031
2032         for (i = 1; i <= keysz; i++)
2033         {
2034                 ScanKey         entry = &scankey[i - 1];
2035                 Datum           datum1,
2036                                         datum2;
2037                 bool            isnull1,
2038                                         isnull2;
2039                 int32           compare;
2040
2041                 datum1 = index_getattr(tuple1, i, tupDes, &isnull1);
2042                 datum2 = index_getattr(tuple2, i, tupDes, &isnull2);
2043
2044                 /* see comments about NULLs handling in btbuild */
2045
2046                 /* the comparison function is always of CMP type */
2047                 compare = inlineApplySortFunction(&entry->sk_func, SORTFUNC_CMP,
2048                                                                                   datum1, isnull1,
2049                                                                                   datum2, isnull2);
2050
2051                 if (compare != 0)
2052                         return (int) compare;           /* done when we find unequal
2053                                                                                  * attributes */
2054
2055                 /* they are equal, so we only need to examine one null flag */
2056                 if (isnull1)
2057                         equal_hasnull = true;
2058         }
2059
2060         /*
2061          * If btree has asked us to enforce uniqueness, complain if two equal
2062          * tuples are detected (unless there was at least one NULL field).
2063          *
2064          * It is sufficient to make the test here, because if two tuples are
2065          * equal they *must* get compared at some stage of the sort ---
2066          * otherwise the sort algorithm wouldn't have checked whether one must
2067          * appear before the other.
2068          *
2069          * Some rather brain-dead implementations of qsort will sometimes call
2070          * the comparison routine to compare a value to itself.  (At this
2071          * writing only QNX 4 is known to do such silly things.)  Don't raise
2072          * a bogus error in that case.
2073          */
2074         if (state->enforceUnique && !equal_hasnull && tuple1 != tuple2)
2075                 ereport(ERROR,
2076                                 (errcode(ERRCODE_UNIQUE_VIOLATION),
2077                                  errmsg("could not create unique index"),
2078                                  errdetail("Table contains duplicated values.")));
2079
2080         /*
2081          * If key values are equal, we sort on ItemPointer.  This does not
2082          * affect validity of the finished index, but it offers cheap
2083          * insurance against performance problems with bad qsort
2084          * implementations that have trouble with large numbers of equal keys.
2085          */
2086         {
2087                 BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
2088                 BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
2089
2090                 if (blk1 != blk2)
2091                         return (blk1 < blk2) ? -1 : 1;
2092         }
2093         {
2094                 OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
2095                 OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
2096
2097                 if (pos1 != pos2)
2098                         return (pos1 < pos2) ? -1 : 1;
2099         }
2100
2101         return 0;
2102 }
2103
2104 static void *
2105 copytup_index(Tuplesortstate *state, void *tup)
2106 {
2107         IndexTuple      tuple = (IndexTuple) tup;
2108         unsigned int tuplen = IndexTupleSize(tuple);
2109         IndexTuple      newtuple;
2110
2111         newtuple = (IndexTuple) palloc(tuplen);
2112         USEMEM(state, GetMemoryChunkSpace(newtuple));
2113
2114         memcpy(newtuple, tuple, tuplen);
2115
2116         return (void *) newtuple;
2117 }
2118
2119 static void
2120 writetup_index(Tuplesortstate *state, int tapenum, void *tup)
2121 {
2122         IndexTuple      tuple = (IndexTuple) tup;
2123         unsigned int tuplen;
2124
2125         tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
2126         LogicalTapeWrite(state->tapeset, tapenum,
2127                                          (void *) &tuplen, sizeof(tuplen));
2128         LogicalTapeWrite(state->tapeset, tapenum,
2129                                          (void *) tuple, IndexTupleSize(tuple));
2130         if (state->randomAccess)        /* need trailing length word? */
2131                 LogicalTapeWrite(state->tapeset, tapenum,
2132                                                  (void *) &tuplen, sizeof(tuplen));
2133
2134         FREEMEM(state, GetMemoryChunkSpace(tuple));
2135         pfree(tuple);
2136 }
2137
2138 static void *
2139 readtup_index(Tuplesortstate *state, int tapenum, unsigned int len)
2140 {
2141         unsigned int tuplen = len - sizeof(unsigned int);
2142         IndexTuple      tuple = (IndexTuple) palloc(tuplen);
2143
2144         USEMEM(state, GetMemoryChunkSpace(tuple));
2145         if (LogicalTapeRead(state->tapeset, tapenum, (void *) tuple,
2146                                                 tuplen) != tuplen)
2147                 elog(ERROR, "unexpected end of data");
2148         if (state->randomAccess)        /* need trailing length word? */
2149                 if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
2150                                                         sizeof(tuplen)) != sizeof(tuplen))
2151                         elog(ERROR, "unexpected end of data");
2152         return (void *) tuple;
2153 }
2154
2155
2156 /*
2157  * Routines specialized for DatumTuple case
2158  */
2159
2160 static int
2161 comparetup_datum(Tuplesortstate *state, const void *a, const void *b)
2162 {
2163         DatumTuple *ltup = (DatumTuple *) a;
2164         DatumTuple *rtup = (DatumTuple *) b;
2165
2166         return inlineApplySortFunction(&state->sortOpFn, state->sortFnKind,
2167                                                                    ltup->val, ltup->isNull,
2168                                                                    rtup->val, rtup->isNull);
2169 }
2170
2171 static void *
2172 copytup_datum(Tuplesortstate *state, void *tup)
2173 {
2174         /* Not currently needed */
2175         elog(ERROR, "copytup_datum() should not be called");
2176         return NULL;
2177 }
2178
2179 static void
2180 writetup_datum(Tuplesortstate *state, int tapenum, void *tup)
2181 {
2182         DatumTuple *tuple = (DatumTuple *) tup;
2183         unsigned int tuplen;
2184         unsigned int writtenlen;
2185
2186         if (tuple->isNull || state->datumTypeByVal)
2187                 tuplen = sizeof(DatumTuple);
2188         else
2189         {
2190                 Size            datalen;
2191
2192                 datalen = datumGetSize(tuple->val, false, state->datumTypeLen);
2193                 tuplen = datalen + MAXALIGN(sizeof(DatumTuple));
2194         }
2195
2196         writtenlen = tuplen + sizeof(unsigned int);
2197
2198         LogicalTapeWrite(state->tapeset, tapenum,
2199                                          (void *) &writtenlen, sizeof(writtenlen));
2200         LogicalTapeWrite(state->tapeset, tapenum,
2201                                          (void *) tuple, tuplen);
2202         if (state->randomAccess)        /* need trailing length word? */
2203                 LogicalTapeWrite(state->tapeset, tapenum,
2204                                                  (void *) &writtenlen, sizeof(writtenlen));
2205
2206         FREEMEM(state, GetMemoryChunkSpace(tuple));
2207         pfree(tuple);
2208 }
2209
2210 static void *
2211 readtup_datum(Tuplesortstate *state, int tapenum, unsigned int len)
2212 {
2213         unsigned int tuplen = len - sizeof(unsigned int);
2214         DatumTuple *tuple = (DatumTuple *) palloc(tuplen);
2215
2216         USEMEM(state, GetMemoryChunkSpace(tuple));
2217         if (LogicalTapeRead(state->tapeset, tapenum, (void *) tuple,
2218                                                 tuplen) != tuplen)
2219                 elog(ERROR, "unexpected end of data");
2220         if (state->randomAccess)        /* need trailing length word? */
2221                 if (LogicalTapeRead(state->tapeset, tapenum, (void *) &tuplen,
2222                                                         sizeof(tuplen)) != sizeof(tuplen))
2223                         elog(ERROR, "unexpected end of data");
2224
2225         if (!tuple->isNull && !state->datumTypeByVal)
2226                 tuple->val = PointerGetDatum(((char *) tuple) +
2227                                                                          MAXALIGN(sizeof(DatumTuple)));
2228         return (void *) tuple;
2229 }