]> granicus.if.org Git - postgresql/blob - src/backend/utils/sort/tuplestore.c
Update copyright for the year 2010.
[postgresql] / src / backend / utils / sort / tuplestore.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuplestore.c
4  *        Generalized routines for temporary tuple storage.
5  *
6  * This module handles temporary storage of tuples for purposes such
7  * as Materialize nodes, hashjoin batch files, etc.  It is essentially
8  * a dumbed-down version of tuplesort.c; it does no sorting of tuples
9  * but can only store and regurgitate a sequence of tuples.  However,
10  * because no sort is required, it is allowed to start reading the sequence
11  * before it has all been written.      This is particularly useful for cursors,
12  * because it allows random access within the already-scanned portion of
13  * a query without having to process the underlying scan to completion.
14  * Also, it is possible to support multiple independent read pointers.
15  *
16  * A temporary file is used to handle the data if it exceeds the
17  * space limit specified by the caller.
18  *
19  * The (approximate) amount of memory allowed to the tuplestore is specified
20  * in kilobytes by the caller.  We absorb tuples and simply store them in an
21  * in-memory array as long as we haven't exceeded maxKBytes.  If we do exceed
22  * maxKBytes, we dump all the tuples into a temp file and then read from that
23  * when needed.
24  *
25  * Upon creation, a tuplestore supports a single read pointer, numbered 0.
26  * Additional read pointers can be created using tuplestore_alloc_read_pointer.
27  * Mark/restore behavior is supported by copying read pointers.
28  *
29  * When the caller requests backward-scan capability, we write the temp file
30  * in a format that allows either forward or backward scan.  Otherwise, only
31  * forward scan is allowed.  A request for backward scan must be made before
32  * putting any tuples into the tuplestore.      Rewind is normally allowed but
33  * can be turned off via tuplestore_set_eflags; turning off rewind for all
34  * read pointers enables truncation of the tuplestore at the oldest read point
35  * for minimal memory usage.  (The caller must explicitly call tuplestore_trim
36  * at appropriate times for truncation to actually happen.)
37  *
38  * Note: in TSS_WRITEFILE state, the temp file's seek position is the
39  * current write position, and the write-position variables in the tuplestore
40  * aren't kept up to date.  Similarly, in TSS_READFILE state the temp file's
41  * seek position is the active read pointer's position, and that read pointer
42  * isn't kept up to date.  We update the appropriate variables using ftell()
43  * before switching to the other state or activating a different read pointer.
44  *
45  *
46  * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
47  * Portions Copyright (c) 1994, Regents of the University of California
48  *
49  * IDENTIFICATION
50  *        $PostgreSQL: pgsql/src/backend/utils/sort/tuplestore.c,v 1.50 2010/01/02 16:57:58 momjian Exp $
51  *
52  *-------------------------------------------------------------------------
53  */
54
55 #include "postgres.h"
56
57 #include "commands/tablespace.h"
58 #include "executor/executor.h"
59 #include "storage/buffile.h"
60 #include "utils/memutils.h"
61 #include "utils/resowner.h"
62 #include "utils/tuplestore.h"
63
64
/*
 * Possible states of a Tuplestore object.  These denote the states that
 * persist between calls of Tuplestore routines.
 *
 * A new tuplestore starts in TSS_INMEM (tuplestore_begin_common).  When the
 * in-memory array or memory budget is exhausted, tuplestore_puttuple_common
 * creates a temp file and moves to TSS_WRITEFILE.  Appending while in
 * TSS_READFILE also switches back to TSS_WRITEFILE, and tuplestore_clear
 * returns the store to TSS_INMEM.
 */
typedef enum
{
	TSS_INMEM,					/* Tuples still fit in memory */
	TSS_WRITEFILE,				/* Writing to temp file */
	TSS_READFILE				/* Reading from temp file */
} TupStoreStatus;
75
/*
 * State for a single read pointer.  If we are in state INMEM then all the
 * read pointers' "current" fields denote the read positions.  In state
 * WRITEFILE, the file/offset fields denote the read positions.  In state
 * READFILE, inactive read pointers have valid file/offset, but the active
 * read pointer implicitly has position equal to the temp file's seek position.
 *
 * Special case: if eof_reached is true, then the pointer's read position is
 * implicitly equal to the write position, and current/file/offset aren't
 * maintained.  This way we need not update all the read pointers each time
 * we write.
 */
typedef struct
{
	int			eflags;			/* capability flags (EXEC_FLAG_REWIND,
								 * EXEC_FLAG_BACKWARD) */
	bool		eof_reached;	/* read has reached EOF */
	int			current;		/* next array index to read (INMEM state) */
	int			file;			/* temp file# (file states) */
	off_t		offset;			/* byte offset in file (file states) */
} TSReadPointer;
96
/*
 * Private state of a Tuplestore operation.
 */
struct Tuplestorestate
{
	TupStoreStatus status;		/* enumerated value as shown above */
	int			eflags;			/* capability flags (OR of pointers' flags) */
	bool		backward;		/* store extra length words in file? (frozen
								 * when the store first spills to a file) */
	bool		interXact;		/* keep open through transactions? */
	bool		truncated;		/* tuplestore_trim has removed tuples? */
	long		availMem;		/* remaining memory available, in bytes; may
								 * go negative, see LACKMEM */
	BufFile    *myfile;			/* underlying file, or NULL if none */
	MemoryContext context;		/* memory context for holding tuples (the
								 * creator's CurrentMemoryContext) */
	ResourceOwner resowner;		/* resowner for holding temp files (the
								 * creator's CurrentResourceOwner) */

	/*
	 * These function pointers decouple the routines that must know what kind
	 * of tuple we are handling from the routines that don't need to know it.
	 * They are set up by the tuplestore_begin_xxx routines.
	 *
	 * (Although tuplestore.c currently only supports heap tuples, I've copied
	 * this part of tuplesort.c so that extension to other kinds of objects
	 * will be easy if it's ever needed.)
	 *
	 * Function to copy a supplied input tuple into palloc'd space. (NB: we
	 * assume that a single pfree() is enough to release the tuple later, so
	 * the representation must be "flat" in one palloc chunk.) state->availMem
	 * must be decreased by the amount of space used.
	 */
	void	   *(*copytup) (Tuplestorestate *state, void *tup);

	/*
	 * Function to write a stored tuple onto tape.  The representation of the
	 * tuple on tape need not be the same as it is in memory; requirements on
	 * the tape representation are given below.  After writing the tuple,
	 * pfree() it, and increase state->availMem by the amount of memory space
	 * thereby released.
	 */
	void		(*writetup) (Tuplestorestate *state, void *tup);

	/*
	 * Function to read a stored tuple from tape back into memory. 'len' is
	 * the already-read length of the stored tuple.  Create and return a
	 * palloc'd copy, and decrease state->availMem by the amount of memory
	 * space consumed.
	 */
	void	   *(*readtup) (Tuplestorestate *state, unsigned int len);

	/*
	 * This array holds pointers to tuples in memory if we are in state INMEM.
	 * In states WRITEFILE and READFILE it's not used.  The array itself is
	 * charged against availMem.
	 */
	void	  **memtuples;		/* array of pointers to palloc'd tuples */
	int			memtupcount;	/* number of tuples currently present */
	int			memtupsize;		/* allocated length of memtuples array */

	/*
	 * These variables are used to keep track of the current positions.
	 *
	 * In state WRITEFILE, the current file seek position is the write point;
	 * in state READFILE, the write position is remembered in writepos_xxx.
	 * (The write position is the same as EOF, but since BufFileSeek doesn't
	 * currently implement SEEK_END, we have to remember it explicitly.)
	 *
	 * Read pointer 0 always exists (readptrcount starts at 1).
	 */
	TSReadPointer *readptrs;	/* array of read pointers */
	int			activeptr;		/* index of the active read pointer */
	int			readptrcount;	/* number of pointers currently valid */
	int			readptrsize;	/* allocated length of readptrs array */

	int			writepos_file;	/* file# (valid if READFILE state) */
	off_t		writepos_offset;	/* offset (valid if READFILE state) */
};
169
/*
 * Memory-accounting shorthands.  availMem is allowed to dip below zero;
 * LACKMEM is how callers find out that the budget has been exceeded.
 */
#define LACKMEM(state)		((state)->availMem < 0)
#define USEMEM(state,amt)	((state)->availMem -= (amt))
#define FREEMEM(state,amt)	((state)->availMem += (amt))

/* Dispatch to the per-tuple-type callbacks installed at creation time */
#define COPYTUP(state,tup)	((state)->copytup((state), (tup)))
#define WRITETUP(state,tup) ((state)->writetup((state), (tup)))
#define READTUP(state,len)	((state)->readtup((state), (len)))
176
177 /*--------------------
178  *
179  * NOTES about on-tape representation of tuples:
180  *
181  * We require the first "unsigned int" of a stored tuple to be the total size
182  * on-tape of the tuple, including itself (so it is never zero).
183  * The remainder of the stored tuple
184  * may or may not match the in-memory representation of the tuple ---
185  * any conversion needed is the job of the writetup and readtup routines.
186  *
187  * If state->backward is true, then the stored representation of
188  * the tuple must be followed by another "unsigned int" that is a copy of the
189  * length --- so the total tape space used is actually sizeof(unsigned int)
190  * more than the stored length value.  This allows read-backwards.      When
191  * state->backward is not set, the write/read routines may omit the extra
192  * length word.
193  *
194  * writetup is expected to write both length words as well as the tuple
195  * data.  When readtup is called, the tape is positioned just after the
196  * front length word; readtup must read the tuple data and advance past
197  * the back length word (if present).
198  *
199  * The write/read routines can make use of the tuple description data
200  * stored in the Tuplestorestate record, if needed. They are also expected
201  * to adjust state->availMem by the amount of memory space (not tape space!)
202  * released or consumed.  There is no error return from either writetup
203  * or readtup; they should ereport() on failure.
204  *
205  *
206  * NOTES about memory consumption calculations:
207  *
208  * We count space allocated for tuples against the maxKBytes limit,
209  * plus the space used by the variable-size array memtuples.
210  * Fixed-size space (primarily the BufFile I/O buffer) is not counted.
211  * We don't worry about the size of the read pointer array, either.
212  *
213  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
214  * rather than the originally-requested size.  This is important since
215  * palloc can add substantial overhead.  It's not a complete answer since
216  * we won't count any wasted space in palloc allocation blocks, but it's
217  * a lot better than what we were doing before 7.3.
218  *
219  *--------------------
220  */
221
222
/*
 * Forward declarations of file-local routines.  copytup_heap, writetup_heap
 * and readtup_heap are the heap-tuple implementations that
 * tuplestore_begin_heap installs as the copytup/writetup/readtup callbacks.
 */
static Tuplestorestate *tuplestore_begin_common(int eflags,
						bool interXact,
						int maxKBytes);
static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple);
static void dumptuples(Tuplestorestate *state);
static unsigned int getlen(Tuplestorestate *state, bool eofOK);
static void *copytup_heap(Tuplestorestate *state, void *tup);
static void writetup_heap(Tuplestorestate *state, void *tup);
static void *readtup_heap(Tuplestorestate *state, unsigned int len);
232
233
234 /*
235  *              tuplestore_begin_xxx
236  *
237  * Initialize for a tuple store operation.
238  */
239 static Tuplestorestate *
240 tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
241 {
242         Tuplestorestate *state;
243
244         state = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate));
245
246         state->status = TSS_INMEM;
247         state->eflags = eflags;
248         state->interXact = interXact;
249         state->truncated = false;
250         state->availMem = maxKBytes * 1024L;
251         state->myfile = NULL;
252         state->context = CurrentMemoryContext;
253         state->resowner = CurrentResourceOwner;
254
255         state->memtupcount = 0;
256         state->memtupsize = 1024;       /* initial guess */
257         state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
258
259         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
260
261         state->activeptr = 0;
262         state->readptrcount = 1;
263         state->readptrsize = 8;         /* arbitrary */
264         state->readptrs = (TSReadPointer *)
265                 palloc(state->readptrsize * sizeof(TSReadPointer));
266
267         state->readptrs[0].eflags = eflags;
268         state->readptrs[0].eof_reached = false;
269         state->readptrs[0].current = 0;
270
271         return state;
272 }
273
274 /*
275  * tuplestore_begin_heap
276  *
277  * Create a new tuplestore; other types of tuple stores (other than
278  * "heap" tuple stores, for heap tuples) are possible, but not presently
279  * implemented.
280  *
281  * randomAccess: if true, both forward and backward accesses to the
282  * tuple store are allowed.
283  *
284  * interXact: if true, the files used for on-disk storage persist beyond the
285  * end of the current transaction.      NOTE: It's the caller's responsibility to
286  * create such a tuplestore in a memory context and resource owner that will
287  * also survive transaction boundaries, and to ensure the tuplestore is closed
288  * when it's no longer wanted.
289  *
290  * maxKBytes: how much data to store in memory (any data beyond this
291  * amount is paged to disk).  When in doubt, use work_mem.
292  */
293 Tuplestorestate *
294 tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
295 {
296         Tuplestorestate *state;
297         int                     eflags;
298
299         /*
300          * This interpretation of the meaning of randomAccess is compatible with
301          * the pre-8.3 behavior of tuplestores.
302          */
303         eflags = randomAccess ?
304                 (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) :
305                 (EXEC_FLAG_REWIND);
306
307         state = tuplestore_begin_common(eflags, interXact, maxKBytes);
308
309         state->copytup = copytup_heap;
310         state->writetup = writetup_heap;
311         state->readtup = readtup_heap;
312
313         return state;
314 }
315
316 /*
317  * tuplestore_set_eflags
318  *
319  * Set the capability flags for read pointer 0 at a finer grain than is
320  * allowed by tuplestore_begin_xxx.  This must be called before inserting
321  * any data into the tuplestore.
322  *
323  * eflags is a bitmask following the meanings used for executor node
324  * startup flags (see executor.h).      tuplestore pays attention to these bits:
325  *              EXEC_FLAG_REWIND                need rewind to start
326  *              EXEC_FLAG_BACKWARD              need backward fetch
327  * If tuplestore_set_eflags is not called, REWIND is allowed, and BACKWARD
328  * is set per "randomAccess" in the tuplestore_begin_xxx call.
329  *
330  * NOTE: setting BACKWARD without REWIND means the pointer can read backwards,
331  * but not further than the truncation point (the furthest-back read pointer
332  * position at the time of the last tuplestore_trim call).
333  */
334 void
335 tuplestore_set_eflags(Tuplestorestate *state, int eflags)
336 {
337         int                     i;
338
339         if (state->status != TSS_INMEM || state->memtupcount != 0)
340                 elog(ERROR, "too late to call tuplestore_set_eflags");
341
342         state->readptrs[0].eflags = eflags;
343         for (i = 1; i < state->readptrcount; i++)
344                 eflags |= state->readptrs[i].eflags;
345         state->eflags = eflags;
346 }
347
348 /*
349  * tuplestore_alloc_read_pointer - allocate another read pointer.
350  *
351  * Returns the pointer's index.
352  *
353  * The new pointer initially copies the position of read pointer 0.
354  * It can have its own eflags, but if any data has been inserted into
355  * the tuplestore, these eflags must not represent an increase in
356  * requirements.
357  */
358 int
359 tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags)
360 {
361         /* Check for possible increase of requirements */
362         if (state->status != TSS_INMEM || state->memtupcount != 0)
363         {
364                 if ((state->eflags | eflags) != state->eflags)
365                         elog(ERROR, "too late to require new tuplestore eflags");
366         }
367
368         /* Make room for another read pointer if needed */
369         if (state->readptrcount >= state->readptrsize)
370         {
371                 int                     newcnt = state->readptrsize * 2;
372
373                 state->readptrs = (TSReadPointer *)
374                         repalloc(state->readptrs, newcnt * sizeof(TSReadPointer));
375                 state->readptrsize = newcnt;
376         }
377
378         /* And set it up */
379         state->readptrs[state->readptrcount] = state->readptrs[0];
380         state->readptrs[state->readptrcount].eflags = eflags;
381
382         state->eflags |= eflags;
383
384         return state->readptrcount++;
385 }
386
387 /*
388  * tuplestore_clear
389  *
390  *      Delete all the contents of a tuplestore, and reset its read pointers
391  *      to the start.
392  */
393 void
394 tuplestore_clear(Tuplestorestate *state)
395 {
396         int                     i;
397         TSReadPointer *readptr;
398
399         if (state->myfile)
400                 BufFileClose(state->myfile);
401         state->myfile = NULL;
402         if (state->memtuples)
403         {
404                 for (i = 0; i < state->memtupcount; i++)
405                 {
406                         FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
407                         pfree(state->memtuples[i]);
408                 }
409         }
410         state->status = TSS_INMEM;
411         state->truncated = false;
412         state->memtupcount = 0;
413         readptr = state->readptrs;
414         for (i = 0; i < state->readptrcount; readptr++, i++)
415         {
416                 readptr->eof_reached = false;
417                 readptr->current = 0;
418         }
419 }
420
421 /*
422  * tuplestore_end
423  *
424  *      Release resources and clean up.
425  */
426 void
427 tuplestore_end(Tuplestorestate *state)
428 {
429         int                     i;
430
431         if (state->myfile)
432                 BufFileClose(state->myfile);
433         if (state->memtuples)
434         {
435                 for (i = 0; i < state->memtupcount; i++)
436                         pfree(state->memtuples[i]);
437                 pfree(state->memtuples);
438         }
439         pfree(state->readptrs);
440         pfree(state);
441 }
442
443 /*
444  * tuplestore_select_read_pointer - make the specified read pointer active
445  */
446 void
447 tuplestore_select_read_pointer(Tuplestorestate *state, int ptr)
448 {
449         TSReadPointer *readptr;
450         TSReadPointer *oldptr;
451
452         Assert(ptr >= 0 && ptr < state->readptrcount);
453
454         /* No work if already active */
455         if (ptr == state->activeptr)
456                 return;
457
458         readptr = &state->readptrs[ptr];
459         oldptr = &state->readptrs[state->activeptr];
460
461         switch (state->status)
462         {
463                 case TSS_INMEM:
464                 case TSS_WRITEFILE:
465                         /* no work */
466                         break;
467                 case TSS_READFILE:
468
469                         /*
470                          * First, save the current read position in the pointer about to
471                          * become inactive.
472                          */
473                         if (!oldptr->eof_reached)
474                                 BufFileTell(state->myfile,
475                                                         &oldptr->file,
476                                                         &oldptr->offset);
477
478                         /*
479                          * We have to make the temp file's seek position equal to the
480                          * logical position of the new read pointer.  In eof_reached
481                          * state, that's the EOF, which we have available from the saved
482                          * write position.
483                          */
484                         if (readptr->eof_reached)
485                         {
486                                 if (BufFileSeek(state->myfile,
487                                                                 state->writepos_file,
488                                                                 state->writepos_offset,
489                                                                 SEEK_SET) != 0)
490                                         elog(ERROR, "tuplestore seek failed");
491                         }
492                         else
493                         {
494                                 if (BufFileSeek(state->myfile,
495                                                                 readptr->file,
496                                                                 readptr->offset,
497                                                                 SEEK_SET) != 0)
498                                         elog(ERROR, "tuplestore seek failed");
499                         }
500                         break;
501                 default:
502                         elog(ERROR, "invalid tuplestore state");
503                         break;
504         }
505
506         state->activeptr = ptr;
507 }
508
509 /*
510  * tuplestore_ateof
511  *
512  * Returns the active read pointer's eof_reached state.
513  */
514 bool
515 tuplestore_ateof(Tuplestorestate *state)
516 {
517         return state->readptrs[state->activeptr].eof_reached;
518 }
519
520 /*
521  * Accept one tuple and append it to the tuplestore.
522  *
523  * Note that the input tuple is always copied; the caller need not save it.
524  *
525  * If the active read pointer is currently "at EOF", it remains so (the read
526  * pointer implicitly advances along with the write pointer); otherwise the
527  * read pointer is unchanged.  Non-active read pointers do not move, which
528  * means they are certain to not be "at EOF" immediately after puttuple.
529  * This curious-seeming behavior is for the convenience of nodeMaterial.c and
530  * nodeCtescan.c, which would otherwise need to do extra pointer repositioning
531  * steps.
532  *
533  * tuplestore_puttupleslot() is a convenience routine to collect data from
534  * a TupleTableSlot without an extra copy operation.
535  */
536 void
537 tuplestore_puttupleslot(Tuplestorestate *state,
538                                                 TupleTableSlot *slot)
539 {
540         MinimalTuple tuple;
541         MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
542
543         /*
544          * Form a MinimalTuple in working memory
545          */
546         tuple = ExecCopySlotMinimalTuple(slot);
547         USEMEM(state, GetMemoryChunkSpace(tuple));
548
549         tuplestore_puttuple_common(state, (void *) tuple);
550
551         MemoryContextSwitchTo(oldcxt);
552 }
553
554 /*
555  * "Standard" case to copy from a HeapTuple.  This is actually now somewhat
556  * deprecated, but not worth getting rid of in view of the number of callers.
557  */
558 void
559 tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
560 {
561         MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
562
563         /*
564          * Copy the tuple.      (Must do this even in WRITEFILE case.)
565          */
566         tuple = COPYTUP(state, tuple);
567
568         tuplestore_puttuple_common(state, (void *) tuple);
569
570         MemoryContextSwitchTo(oldcxt);
571 }
572
573 /*
574  * Similar to tuplestore_puttuple(), but start from the values + nulls
575  * array. This avoids requiring that the caller construct a HeapTuple,
576  * saving a copy.
577  */
578 void
579 tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc,
580                                          Datum *values, bool *isnull)
581 {
582         MinimalTuple tuple;
583         MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
584
585         tuple = heap_form_minimal_tuple(tdesc, values, isnull);
586
587         tuplestore_puttuple_common(state, (void *) tuple);
588
589         MemoryContextSwitchTo(oldcxt);  
590 }
591
/*
 * tuplestore_puttuple_common
 *
 * Append an already-copied tuple to the store.  The caller must already have
 * allocated the tuple in state->context and charged it against availMem.
 * Ownership of the tuple passes to the tuplestore: it is either kept in
 * memtuples[] or handed to WRITETUP, which pfree()s it.
 *
 * Read pointers are adjusted per the API spec in the header comment of
 * tuplestore_puttupleslot.  Non-active pointers that were at EOF are pinned
 * at the old end of data, so they will read the new tuple next.  The active
 * pointer keeps eof_reached and implicitly advances with the write position.
 *
 * In TSS_INMEM the tuple goes into memtuples[].  If that leaves no free array
 * slot or exhausts the memory budget, a temp file is created and dumptuples()
 * moves the in-memory tuples to it (TSS_WRITEFILE).  In the file states the
 * tuple is written at the end of the temp file.
 */
static void
tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
{
	TSReadPointer *readptr;
	int			i;
	ResourceOwner oldowner;

	switch (state->status)
	{
		case TSS_INMEM:

			/*
			 * Update read pointers as needed; see API spec above.  A
			 * non-active pointer at EOF is pinned at index memtupcount, which
			 * is where the incoming tuple is about to be stored.
			 */
			readptr = state->readptrs;
			for (i = 0; i < state->readptrcount; readptr++, i++)
			{
				if (readptr->eof_reached && i != state->activeptr)
				{
					readptr->eof_reached = false;
					readptr->current = state->memtupcount;
				}
			}

			/*
			 * Grow the array as needed.  Note that we try to grow the array
			 * when there is still one free slot remaining --- if we fail,
			 * there'll still be room to store the incoming tuple, and then
			 * we'll switch to tape-based operation.
			 */
			if (state->memtupcount >= state->memtupsize - 1)
			{
				/*
				 * See grow_memtuples() in tuplesort.c for the rationale
				 * behind these two tests.  Doubling adds memtupsize more
				 * pointers, so the first test requires the budget to cover
				 * that much extra.  The second keeps the repalloc request
				 * below MaxAllocSize.
				 */
				if (state->availMem > (long) (state->memtupsize * sizeof(void *)) &&
					(Size) (state->memtupsize * 2) < MaxAllocSize / sizeof(void *))
				{
					/* re-charge the array at its new chunk size */
					FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
					state->memtupsize *= 2;
					state->memtuples = (void **)
						repalloc(state->memtuples,
								 state->memtupsize * sizeof(void *));
					USEMEM(state, GetMemoryChunkSpace(state->memtuples));
				}
			}

			/* Stash the tuple in the in-memory array */
			state->memtuples[state->memtupcount++] = tuple;

			/*
			 * Done if we still fit in available memory and have array slots.
			 */
			if (state->memtupcount < state->memtupsize && !LACKMEM(state))
				return;

			/*
			 * Nope; time to switch to tape-based operation.  Make sure that
			 * the temp file(s) are created in suitable temp tablespaces.
			 */
			PrepareTempTablespaces();

			/*
			 * associate the file with the store's resource owner (the one
			 * current when the store was created), not whichever owner
			 * happens to be current now
			 */
			oldowner = CurrentResourceOwner;
			CurrentResourceOwner = state->resowner;

			state->myfile = BufFileCreateTemp(state->interXact);

			CurrentResourceOwner = oldowner;

			/*
			 * Freeze the decision about whether trailing length words will be
			 * used.  We can't change this choice once data is on tape, even
			 * though callers might drop the requirement.
			 */
			state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
			state->status = TSS_WRITEFILE;
			dumptuples(state);
			break;
		case TSS_WRITEFILE:

			/*
			 * Update read pointers as needed; see API spec above. Note:
			 * BufFileTell is quite cheap, so not worth trying to avoid
			 * multiple calls.  In WRITEFILE state the seek position is the
			 * write position, i.e. where the incoming tuple will start.
			 */
			readptr = state->readptrs;
			for (i = 0; i < state->readptrcount; readptr++, i++)
			{
				if (readptr->eof_reached && i != state->activeptr)
				{
					readptr->eof_reached = false;
					BufFileTell(state->myfile,
								&readptr->file,
								&readptr->offset);
				}
			}

			WRITETUP(state, tuple);
			break;
		case TSS_READFILE:

			/*
			 * Switch from reading to writing.  The active pointer's position
			 * is implicit in the file's seek position, so save it before
			 * seeking to the remembered write position (EOF).
			 */
			if (!state->readptrs[state->activeptr].eof_reached)
				BufFileTell(state->myfile,
							&state->readptrs[state->activeptr].file,
							&state->readptrs[state->activeptr].offset);
			if (BufFileSeek(state->myfile,
							state->writepos_file, state->writepos_offset,
							SEEK_SET) != 0)
				elog(ERROR, "tuplestore seek to EOF failed");
			state->status = TSS_WRITEFILE;

			/*
			 * Update read pointers as needed; see API spec above.
			 */
			readptr = state->readptrs;
			for (i = 0; i < state->readptrcount; readptr++, i++)
			{
				if (readptr->eof_reached && i != state->activeptr)
				{
					readptr->eof_reached = false;
					readptr->file = state->writepos_file;
					readptr->offset = state->writepos_offset;
				}
			}

			WRITETUP(state, tuple);
			break;
		default:
			elog(ERROR, "invalid tuplestore state");
			break;
	}
}
729
730 /*
731  * Fetch the next tuple in either forward or back direction.
732  * Returns NULL if no more tuples.      If should_free is set, the
733  * caller must pfree the returned tuple when done with it.
734  *
735  * Backward scan is only allowed if randomAccess was set true or
736  * EXEC_FLAG_BACKWARD was specified to tuplestore_set_eflags().
737  */
738 static void *
739 tuplestore_gettuple(Tuplestorestate *state, bool forward,
740                                         bool *should_free)
741 {
742         TSReadPointer *readptr = &state->readptrs[state->activeptr];
743         unsigned int tuplen;
744         void       *tup;
745
746         Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
747
748         switch (state->status)
749         {
750                 case TSS_INMEM:
751                         *should_free = false;
752                         if (forward)
753                         {
754                                 if (readptr->eof_reached)
755                                         return NULL;
756                                 if (readptr->current < state->memtupcount)
757                                 {
758                                         /* We have another tuple, so return it */
759                                         return state->memtuples[readptr->current++];
760                                 }
761                                 readptr->eof_reached = true;
762                                 return NULL;
763                         }
764                         else
765                         {
766                                 /*
767                                  * if all tuples are fetched already then we return last
768                                  * tuple, else tuple before last returned.
769                                  */
770                                 if (readptr->eof_reached)
771                                 {
772                                         readptr->current = state->memtupcount;
773                                         readptr->eof_reached = false;
774                                 }
775                                 else
776                                 {
777                                         if (readptr->current <= 0)
778                                         {
779                                                 Assert(!state->truncated);
780                                                 return NULL;
781                                         }
782                                         readptr->current--; /* last returned tuple */
783                                 }
784                                 if (readptr->current <= 0)
785                                 {
786                                         Assert(!state->truncated);
787                                         return NULL;
788                                 }
789                                 return state->memtuples[readptr->current - 1];
790                         }
791                         break;
792
793                 case TSS_WRITEFILE:
794                         /* Skip state change if we'll just return NULL */
795                         if (readptr->eof_reached && forward)
796                                 return NULL;
797
798                         /*
799                          * Switch from writing to reading.
800                          */
801                         BufFileTell(state->myfile,
802                                                 &state->writepos_file, &state->writepos_offset);
803                         if (!readptr->eof_reached)
804                                 if (BufFileSeek(state->myfile,
805                                                                 readptr->file, readptr->offset,
806                                                                 SEEK_SET) != 0)
807                                         elog(ERROR, "tuplestore seek failed");
808                         state->status = TSS_READFILE;
809                         /* FALL THRU into READFILE case */
810
811                 case TSS_READFILE:
812                         *should_free = true;
813                         if (forward)
814                         {
815                                 if ((tuplen = getlen(state, true)) != 0)
816                                 {
817                                         tup = READTUP(state, tuplen);
818                                         return tup;
819                                 }
820                                 else
821                                 {
822                                         readptr->eof_reached = true;
823                                         return NULL;
824                                 }
825                         }
826
827                         /*
828                          * Backward.
829                          *
830                          * if all tuples are fetched already then we return last tuple,
831                          * else tuple before last returned.
832                          *
833                          * Back up to fetch previously-returned tuple's ending length
834                          * word. If seek fails, assume we are at start of file.
835                          */
836                         if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int),
837                                                         SEEK_CUR) != 0)
838                         {
839                                 /* even a failed backwards fetch gets you out of eof state */
840                                 readptr->eof_reached = false;
841                                 Assert(!state->truncated);
842                                 return NULL;
843                         }
844                         tuplen = getlen(state, false);
845
846                         if (readptr->eof_reached)
847                         {
848                                 readptr->eof_reached = false;
849                                 /* We will return the tuple returned before returning NULL */
850                         }
851                         else
852                         {
853                                 /*
854                                  * Back up to get ending length word of tuple before it.
855                                  */
856                                 if (BufFileSeek(state->myfile, 0,
857                                                                 -(long) (tuplen + 2 * sizeof(unsigned int)),
858                                                                 SEEK_CUR) != 0)
859                                 {
860                                         /*
861                                          * If that fails, presumably the prev tuple is the first
862                                          * in the file.  Back up so that it becomes next to read
863                                          * in forward direction (not obviously right, but that is
864                                          * what in-memory case does).
865                                          */
866                                         if (BufFileSeek(state->myfile, 0,
867                                                                         -(long) (tuplen + sizeof(unsigned int)),
868                                                                         SEEK_CUR) != 0)
869                                                 elog(ERROR, "bogus tuple length in backward scan");
870                                         Assert(!state->truncated);
871                                         return NULL;
872                                 }
873                                 tuplen = getlen(state, false);
874                         }
875
876                         /*
877                          * Now we have the length of the prior tuple, back up and read it.
878                          * Note: READTUP expects we are positioned after the initial
879                          * length word of the tuple, so back up to that point.
880                          */
881                         if (BufFileSeek(state->myfile, 0,
882                                                         -(long) tuplen,
883                                                         SEEK_CUR) != 0)
884                                 elog(ERROR, "bogus tuple length in backward scan");
885                         tup = READTUP(state, tuplen);
886                         return tup;
887
888                 default:
889                         elog(ERROR, "invalid tuplestore state");
890                         return NULL;            /* keep compiler quiet */
891         }
892 }
893
894 /*
895  * tuplestore_gettupleslot - exported function to fetch a MinimalTuple
896  *
897  * If successful, put tuple in slot and return TRUE; else, clear the slot
898  * and return FALSE.
899  *
900  * If copy is TRUE, the slot receives a copied tuple (allocated in current
901  * memory context) that will stay valid regardless of future manipulations of
902  * the tuplestore's state.  If copy is FALSE, the slot may just receive a
903  * pointer to a tuple held within the tuplestore.  The latter is more
904  * efficient but the slot contents may be corrupted if additional writes to
905  * the tuplestore occur.  (If using tuplestore_trim, see comments therein.)
906  */
907 bool
908 tuplestore_gettupleslot(Tuplestorestate *state, bool forward,
909                                                 bool copy, TupleTableSlot *slot)
910 {
911         MinimalTuple tuple;
912         bool            should_free;
913
914         tuple = (MinimalTuple) tuplestore_gettuple(state, forward, &should_free);
915
916         if (tuple)
917         {
918                 if (copy && !should_free)
919                 {
920                         tuple = heap_copy_minimal_tuple(tuple);
921                         should_free = true;
922                 }
923                 ExecStoreMinimalTuple(tuple, slot, should_free);
924                 return true;
925         }
926         else
927         {
928                 ExecClearTuple(slot);
929                 return false;
930         }
931 }
932
933 /*
934  * tuplestore_advance - exported function to adjust position without fetching
935  *
936  * We could optimize this case to avoid palloc/pfree overhead, but for the
937  * moment it doesn't seem worthwhile.  (XXX this probably needs to be
938  * reconsidered given the needs of window functions.)
939  */
940 bool
941 tuplestore_advance(Tuplestorestate *state, bool forward)
942 {
943         void       *tuple;
944         bool            should_free;
945
946         tuple = tuplestore_gettuple(state, forward, &should_free);
947
948         if (tuple)
949         {
950                 if (should_free)
951                         pfree(tuple);
952                 return true;
953         }
954         else
955         {
956                 return false;
957         }
958 }
959
960 /*
961  * dumptuples - remove tuples from memory and write to tape
962  *
963  * As a side effect, we must convert each read pointer's position from
964  * "current" to file/offset format.  But eof_reached pointers don't
965  * need to change state.
966  */
967 static void
968 dumptuples(Tuplestorestate *state)
969 {
970         int                     i;
971
972         for (i = 0;; i++)
973         {
974                 TSReadPointer *readptr = state->readptrs;
975                 int                     j;
976
977                 for (j = 0; j < state->readptrcount; readptr++, j++)
978                 {
979                         if (i == readptr->current && !readptr->eof_reached)
980                                 BufFileTell(state->myfile,
981                                                         &readptr->file, &readptr->offset);
982                 }
983                 if (i >= state->memtupcount)
984                         break;
985                 WRITETUP(state, state->memtuples[i]);
986         }
987         state->memtupcount = 0;
988 }
989
990 /*
991  * tuplestore_rescan            - rewind the active read pointer to start
992  */
993 void
994 tuplestore_rescan(Tuplestorestate *state)
995 {
996         TSReadPointer *readptr = &state->readptrs[state->activeptr];
997
998         Assert(readptr->eflags & EXEC_FLAG_REWIND);
999         Assert(!state->truncated);
1000
1001         switch (state->status)
1002         {
1003                 case TSS_INMEM:
1004                         readptr->eof_reached = false;
1005                         readptr->current = 0;
1006                         break;
1007                 case TSS_WRITEFILE:
1008                         readptr->eof_reached = false;
1009                         readptr->file = 0;
1010                         readptr->offset = 0L;
1011                         break;
1012                 case TSS_READFILE:
1013                         readptr->eof_reached = false;
1014                         if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
1015                                 elog(ERROR, "tuplestore seek to start failed");
1016                         break;
1017                 default:
1018                         elog(ERROR, "invalid tuplestore state");
1019                         break;
1020         }
1021 }
1022
1023 /*
1024  * tuplestore_copy_read_pointer - copy a read pointer's state to another
1025  */
1026 void
1027 tuplestore_copy_read_pointer(Tuplestorestate *state,
1028                                                          int srcptr, int destptr)
1029 {
1030         TSReadPointer *sptr = &state->readptrs[srcptr];
1031         TSReadPointer *dptr = &state->readptrs[destptr];
1032
1033         Assert(srcptr >= 0 && srcptr < state->readptrcount);
1034         Assert(destptr >= 0 && destptr < state->readptrcount);
1035
1036         /* Assigning to self is a no-op */
1037         if (srcptr == destptr)
1038                 return;
1039
1040         if (dptr->eflags != sptr->eflags)
1041         {
1042                 /* Possible change of overall eflags, so copy and then recompute */
1043                 int                     eflags;
1044                 int                     i;
1045
1046                 *dptr = *sptr;
1047                 eflags = state->readptrs[0].eflags;
1048                 for (i = 1; i < state->readptrcount; i++)
1049                         eflags |= state->readptrs[i].eflags;
1050                 state->eflags = eflags;
1051         }
1052         else
1053                 *dptr = *sptr;
1054
1055         switch (state->status)
1056         {
1057                 case TSS_INMEM:
1058                 case TSS_WRITEFILE:
1059                         /* no work */
1060                         break;
1061                 case TSS_READFILE:
1062
1063                         /*
1064                          * This case is a bit tricky since the active read pointer's
1065                          * position corresponds to the seek point, not what is in its
1066                          * variables.  Assigning to the active requires a seek, and
1067                          * assigning from the active requires a tell, except when
1068                          * eof_reached.
1069                          */
1070                         if (destptr == state->activeptr)
1071                         {
1072                                 if (dptr->eof_reached)
1073                                 {
1074                                         if (BufFileSeek(state->myfile,
1075                                                                         state->writepos_file,
1076                                                                         state->writepos_offset,
1077                                                                         SEEK_SET) != 0)
1078                                                 elog(ERROR, "tuplestore seek failed");
1079                                 }
1080                                 else
1081                                 {
1082                                         if (BufFileSeek(state->myfile,
1083                                                                         dptr->file, dptr->offset,
1084                                                                         SEEK_SET) != 0)
1085                                                 elog(ERROR, "tuplestore seek failed");
1086                                 }
1087                         }
1088                         else if (srcptr == state->activeptr)
1089                         {
1090                                 if (!dptr->eof_reached)
1091                                         BufFileTell(state->myfile,
1092                                                                 &dptr->file,
1093                                                                 &dptr->offset);
1094                         }
1095                         break;
1096                 default:
1097                         elog(ERROR, "invalid tuplestore state");
1098                         break;
1099         }
1100 }
1101
1102 /*
1103  * tuplestore_trim      - remove all no-longer-needed tuples
1104  *
1105  * Calling this function authorizes the tuplestore to delete all tuples
1106  * before the oldest read pointer, if no read pointer is marked as requiring
1107  * REWIND capability.
1108  *
1109  * Note: this is obviously safe if no pointer has BACKWARD capability either.
1110  * If a pointer is marked as BACKWARD but not REWIND capable, it means that
1111  * the pointer can be moved backward but not before the oldest other read
1112  * pointer.
1113  */
1114 void
1115 tuplestore_trim(Tuplestorestate *state)
1116 {
1117         int                     oldest;
1118         int                     nremove;
1119         int                     i;
1120
1121         /*
1122          * Truncation is disallowed if any read pointer requires rewind
1123          * capability.
1124          */
1125         if (state->eflags & EXEC_FLAG_REWIND)
1126                 return;
1127
1128         /*
1129          * We don't bother trimming temp files since it usually would mean more
1130          * work than just letting them sit in kernel buffers until they age out.
1131          */
1132         if (state->status != TSS_INMEM)
1133                 return;
1134
1135         /* Find the oldest read pointer */
1136         oldest = state->memtupcount;
1137         for (i = 0; i < state->readptrcount; i++)
1138         {
1139                 if (!state->readptrs[i].eof_reached)
1140                         oldest = Min(oldest, state->readptrs[i].current);
1141         }
1142
1143         /*
1144          * Note: you might think we could remove all the tuples before the oldest
1145          * "current", since that one is the next to be returned.  However, since
1146          * tuplestore_gettuple returns a direct pointer to our internal copy of
1147          * the tuple, it's likely that the caller has still got the tuple just
1148          * before "current" referenced in a slot. So we keep one extra tuple
1149          * before the oldest "current".  (Strictly speaking, we could require such
1150          * callers to use the "copy" flag to tuplestore_gettupleslot, but for
1151          * efficiency we allow this one case to not use "copy".)
1152          */
1153         nremove = oldest - 1;
1154         if (nremove <= 0)
1155                 return;                                 /* nothing to do */
1156         Assert(nremove <= state->memtupcount);
1157
1158         /* Release no-longer-needed tuples */
1159         for (i = 0; i < nremove; i++)
1160         {
1161                 FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
1162                 pfree(state->memtuples[i]);
1163         }
1164
1165         /*
1166          * Slide the array down and readjust pointers.  This may look pretty
1167          * stupid, but we expect that there will usually not be very many
1168          * tuple-pointers to move, so this isn't that expensive; and it keeps a
1169          * lot of other logic simple.
1170          *
1171          * In fact, in the current usage for merge joins, it's demonstrable that
1172          * there will always be exactly one non-removed tuple; so optimize that
1173          * case.
1174          */
1175         if (nremove + 1 == state->memtupcount)
1176                 state->memtuples[0] = state->memtuples[nremove];
1177         else
1178                 memmove(state->memtuples, state->memtuples + nremove,
1179                                 (state->memtupcount - nremove) * sizeof(void *));
1180
1181         state->memtupcount -= nremove;
1182         for (i = 0; i < state->readptrcount; i++)
1183         {
1184                 if (!state->readptrs[i].eof_reached)
1185                         state->readptrs[i].current -= nremove;
1186         }
1187
1188         /* mark tuplestore as truncated (used for Assert crosschecks only) */
1189         state->truncated = true;
1190 }
1191
1192 /*
1193  * tuplestore_in_memory
1194  *
1195  * Returns true if the tuplestore has not spilled to disk.
1196  *
1197  * XXX exposing this is a violation of modularity ... should get rid of it.
1198  */
1199 bool
1200 tuplestore_in_memory(Tuplestorestate *state)
1201 {
1202         return (state->status == TSS_INMEM);
1203 }
1204
1205
1206 /*
1207  * Tape interface routines
1208  */
1209
1210 static unsigned int
1211 getlen(Tuplestorestate *state, bool eofOK)
1212 {
1213         unsigned int len;
1214         size_t          nbytes;
1215
1216         nbytes = BufFileRead(state->myfile, (void *) &len, sizeof(len));
1217         if (nbytes == sizeof(len))
1218                 return len;
1219         if (nbytes != 0)
1220                 elog(ERROR, "unexpected end of tape");
1221         if (!eofOK)
1222                 elog(ERROR, "unexpected end of data");
1223         return 0;
1224 }
1225
1226
1227 /*
1228  * Routines specialized for HeapTuple case
1229  *
1230  * The stored form is actually a MinimalTuple, but for largely historical
1231  * reasons we allow COPYTUP to work from a HeapTuple.
1232  *
1233  * Since MinimalTuple already has length in its first word, we don't need
1234  * to write that separately.
1235  */
1236
1237 static void *
1238 copytup_heap(Tuplestorestate *state, void *tup)
1239 {
1240         MinimalTuple tuple;
1241
1242         tuple = minimal_tuple_from_heap_tuple((HeapTuple) tup);
1243         USEMEM(state, GetMemoryChunkSpace(tuple));
1244         return (void *) tuple;
1245 }
1246
1247 static void
1248 writetup_heap(Tuplestorestate *state, void *tup)
1249 {
1250         MinimalTuple tuple = (MinimalTuple) tup;
1251
1252         /* the part of the MinimalTuple we'll write: */
1253         char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
1254         unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
1255
1256         /* total on-disk footprint: */
1257         unsigned int tuplen = tupbodylen + sizeof(int);
1258
1259         if (BufFileWrite(state->myfile, (void *) &tuplen,
1260                                          sizeof(tuplen)) != sizeof(tuplen))
1261                 elog(ERROR, "write failed");
1262         if (BufFileWrite(state->myfile, (void *) tupbody,
1263                                          tupbodylen) != (size_t) tupbodylen)
1264                 elog(ERROR, "write failed");
1265         if (state->backward)            /* need trailing length word? */
1266                 if (BufFileWrite(state->myfile, (void *) &tuplen,
1267                                                  sizeof(tuplen)) != sizeof(tuplen))
1268                         elog(ERROR, "write failed");
1269
1270         FREEMEM(state, GetMemoryChunkSpace(tuple));
1271         heap_free_minimal_tuple(tuple);
1272 }
1273
1274 static void *
1275 readtup_heap(Tuplestorestate *state, unsigned int len)
1276 {
1277         unsigned int tupbodylen = len - sizeof(int);
1278         unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
1279         MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
1280         char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
1281
1282         USEMEM(state, GetMemoryChunkSpace(tuple));
1283         /* read in the tuple proper */
1284         tuple->t_len = tuplen;
1285         if (BufFileRead(state->myfile, (void *) tupbody,
1286                                         tupbodylen) != (size_t) tupbodylen)
1287                 elog(ERROR, "unexpected end of data");
1288         if (state->backward)            /* need trailing length word? */
1289                 if (BufFileRead(state->myfile, (void *) &tuplen,
1290                                                 sizeof(tuplen)) != sizeof(tuplen))
1291                         elog(ERROR, "unexpected end of data");
1292         return (void *) tuple;
1293 }