]> granicus.if.org Git - postgresql/blob - src/backend/utils/sort/tuplestore.c
Arrange to squeeze out the MINIMAL_TUPLE_PADDING in the tuple representation
[postgresql] / src / backend / utils / sort / tuplestore.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuplestore.c
4  *        Generalized routines for temporary tuple storage.
5  *
6  * This module handles temporary storage of tuples for purposes such
7  * as Materialize nodes, hashjoin batch files, etc.  It is essentially
8  * a dumbed-down version of tuplesort.c; it does no sorting of tuples
9  * but can only store and regurgitate a sequence of tuples.  However,
10  * because no sort is required, it is allowed to start reading the sequence
11  * before it has all been written.      This is particularly useful for cursors,
12  * because it allows random access within the already-scanned portion of
13  * a query without having to process the underlying scan to completion.
14  * Also, it is possible to support multiple independent read pointers.
15  *
16  * A temporary file is used to handle the data if it exceeds the
17  * space limit specified by the caller.
18  *
19  * The (approximate) amount of memory allowed to the tuplestore is specified
20  * in kilobytes by the caller.  We absorb tuples and simply store them in an
21  * in-memory array as long as we haven't exceeded maxKBytes.  If we do exceed
22  * maxKBytes, we dump all the tuples into a temp file and then read from that
23  * when needed.
24  *
25  * Upon creation, a tuplestore supports a single read pointer, numbered 0.
26  * Additional read pointers can be created using tuplestore_alloc_read_pointer.
27  * Mark/restore behavior is supported by copying read pointers.
28  *
29  * When the caller requests backward-scan capability, we write the temp file
30  * in a format that allows either forward or backward scan.  Otherwise, only
31  * forward scan is allowed.  A request for backward scan must be made before
32  * putting any tuples into the tuplestore.  Rewind is normally allowed but
33  * can be turned off via tuplestore_set_eflags; turning off both backward
34  * scan and rewind for all read pointers enables truncation of the tuplestore
35  * at the oldest read point for minimal memory usage.
36  *
37  * Note: in TSS_WRITEFILE state, the temp file's seek position is the
38  * current write position, and the write-position variables in the tuplestore
39  * aren't kept up to date.  Similarly, in TSS_READFILE state the temp file's
40  * seek position is the active read pointer's position, and that read pointer
41  * isn't kept up to date.  We update the appropriate variables using BufFileTell()
42  * before switching to the other state or activating a different read pointer.
43  *
44  *
45  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
46  * Portions Copyright (c) 1994, Regents of the University of California
47  *
48  * IDENTIFICATION
49  *        $PostgreSQL: pgsql/src/backend/utils/sort/tuplestore.c,v 1.43 2008/10/28 15:51:03 tgl Exp $
50  *
51  *-------------------------------------------------------------------------
52  */
53
54 #include "postgres.h"
55
56 #include "commands/tablespace.h"
57 #include "executor/executor.h"
58 #include "storage/buffile.h"
59 #include "utils/memutils.h"
60 #include "utils/tuplestore.h"
61
62
/*
 * Lifecycle states of a Tuplestore object.  The value recorded here is the
 * state that persists between calls of the Tuplestore routines.
 */
typedef enum
{
	TSS_INMEM,					/* every tuple is still held in memory */
	TSS_WRITEFILE,				/* spilled to a temp file; currently writing */
	TSS_READFILE				/* spilled to a temp file; currently reading */
} TupStoreStatus;
73
/*
 * Per-read-pointer state.  Which fields carry the read position depends on
 * the tuplestore's status:
 *
 *	INMEM		"current" is the read position of every pointer.
 *	WRITEFILE	file/offset are the read position of every pointer.
 *	READFILE	file/offset are valid for the inactive pointers only; the
 *				active pointer's position is implicitly the temp file's
 *				seek position.
 *
 * Exception: while eof_reached is true, the pointer is implicitly located
 * at the write position and current/file/offset are not maintained.  That
 * saves us from having to touch every read pointer on each write.
 */
typedef struct
{
	int			eflags;			/* capability flags */
	bool		eof_reached;	/* true once a read has run into EOF */
	int			current;		/* array index of the next tuple to read */
	int			file;			/* temp file number */
	off_t		offset;			/* byte offset within that file */
} TSReadPointer;
94
95 /*
96  * Private state of a Tuplestore operation.
97  */
98 struct Tuplestorestate
99 {
100         TupStoreStatus status;          /* enumerated value as shown above */
101         int                     eflags;                 /* capability flags (OR of pointers' flags) */
102         bool            backward;               /* store extra length words in file? */
103         bool            interXact;              /* keep open through transactions? */
104         long            availMem;               /* remaining memory available, in bytes */
105         BufFile    *myfile;                     /* underlying file, or NULL if none */
106
107         /*
108          * These function pointers decouple the routines that must know what kind
109          * of tuple we are handling from the routines that don't need to know it.
110          * They are set up by the tuplestore_begin_xxx routines.
111          *
112          * (Although tuplestore.c currently only supports heap tuples, I've copied
113          * this part of tuplesort.c so that extension to other kinds of objects
114          * will be easy if it's ever needed.)
115          *
116          * Function to copy a supplied input tuple into palloc'd space. (NB: we
117          * assume that a single pfree() is enough to release the tuple later, so
118          * the representation must be "flat" in one palloc chunk.) state->availMem
119          * must be decreased by the amount of space used.
120          */
121         void       *(*copytup) (Tuplestorestate *state, void *tup);
122
123         /*
124          * Function to write a stored tuple onto tape.  The representation of the
125          * tuple on tape need not be the same as it is in memory; requirements on
126          * the tape representation are given below.  After writing the tuple,
127          * pfree() it, and increase state->availMem by the amount of memory space
128          * thereby released.
129          */
130         void            (*writetup) (Tuplestorestate *state, void *tup);
131
132         /*
133          * Function to read a stored tuple from tape back into memory. 'len' is
134          * the already-read length of the stored tuple.  Create and return a
135          * palloc'd copy, and decrease state->availMem by the amount of memory
136          * space consumed.
137          */
138         void       *(*readtup) (Tuplestorestate *state, unsigned int len);
139
140         /*
141          * This array holds pointers to tuples in memory if we are in state INMEM.
142          * In states WRITEFILE and READFILE it's not used.
143          */
144         void      **memtuples;          /* array of pointers to palloc'd tuples */
145         int                     memtupcount;    /* number of tuples currently present */
146         int                     memtupsize;             /* allocated length of memtuples array */
147
148         /*
149          * These variables are used to keep track of the current positions.
150          *
151          * In state WRITEFILE, the current file seek position is the write point;
152          * in state READFILE, the write position is remembered in writepos_xxx.
153          * (The write position is the same as EOF, but since BufFileSeek doesn't
154          * currently implement SEEK_END, we have to remember it explicitly.)
155          */
156         TSReadPointer *readptrs;        /* array of read pointers */
157         int                     activeptr;              /* index of the active read pointer */
158         int                     readptrcount;   /* number of pointers currently valid */
159         int                     readptrsize;    /* allocated length of readptrs array */
160
161         int                     writepos_file;  /* file# (valid if READFILE state) */
162         off_t           writepos_offset; /* offset (valid if READFILE state) */
163 };
164
/* Shorthand for invoking the tuple-format callbacks stored in the state */
#define COPYTUP(state,tup)	((state)->copytup((state), (tup)))
#define WRITETUP(state,tup)	((state)->writetup((state), (tup)))
#define READTUP(state,len)	((state)->readtup((state), (len)))

/* Memory budget accounting; availMem below zero means we are over budget */
#define LACKMEM(state)		(0 > (state)->availMem)
#define USEMEM(state,amt)	((state)->availMem -= (amt))
#define FREEMEM(state,amt)	((state)->availMem += (amt))
171
172 /*--------------------
173  *
174  * NOTES about on-tape representation of tuples:
175  *
176  * We require the first "unsigned int" of a stored tuple to be the total size
177  * on-tape of the tuple, including itself (so it is never zero).
178  * The remainder of the stored tuple
179  * may or may not match the in-memory representation of the tuple ---
180  * any conversion needed is the job of the writetup and readtup routines.
181  *
182  * If state->backward is true, then the stored representation of
183  * the tuple must be followed by another "unsigned int" that is a copy of the
184  * length --- so the total tape space used is actually sizeof(unsigned int)
185  * more than the stored length value.  This allows read-backwards.      When
186  * state->backward is not set, the write/read routines may omit the extra
187  * length word.
188  *
189  * writetup is expected to write both length words as well as the tuple
190  * data.  When readtup is called, the tape is positioned just after the
191  * front length word; readtup must read the tuple data and advance past
192  * the back length word (if present).
193  *
194  * The write/read routines can make use of the tuple description data
195  * stored in the Tuplestorestate record, if needed. They are also expected
196  * to adjust state->availMem by the amount of memory space (not tape space!)
197  * released or consumed.  There is no error return from either writetup
198  * or readtup; they should ereport() on failure.
199  *
200  *
201  * NOTES about memory consumption calculations:
202  *
203  * We count space allocated for tuples against the maxKBytes limit,
204  * plus the space used by the variable-size array memtuples.
205  * Fixed-size space (primarily the BufFile I/O buffer) is not counted.
206  * We don't worry about the size of the read pointer array, either.
207  *
208  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
209  * rather than the originally-requested size.  This is important since
210  * palloc can add substantial overhead.  It's not a complete answer since
211  * we won't count any wasted space in palloc allocation blocks, but it's
212  * a lot better than what we were doing before 7.3.
213  *
214  *--------------------
215  */
216
217
218 static Tuplestorestate *tuplestore_begin_common(int eflags,
219                                                 bool interXact,
220                                                 int maxKBytes);
221 static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple);
222 static void dumptuples(Tuplestorestate *state);
223 static void tuplestore_trim(Tuplestorestate *state);
224 static unsigned int getlen(Tuplestorestate *state, bool eofOK);
225 static void *copytup_heap(Tuplestorestate *state, void *tup);
226 static void writetup_heap(Tuplestorestate *state, void *tup);
227 static void *readtup_heap(Tuplestorestate *state, unsigned int len);
228
229
230 /*
231  *              tuplestore_begin_xxx
232  *
233  * Initialize for a tuple store operation.
234  */
235 static Tuplestorestate *
236 tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
237 {
238         Tuplestorestate *state;
239
240         state = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate));
241
242         state->status = TSS_INMEM;
243         state->eflags = eflags;
244         state->interXact = interXact;
245         state->availMem = maxKBytes * 1024L;
246         state->myfile = NULL;
247
248         state->memtupcount = 0;
249         state->memtupsize = 1024;       /* initial guess */
250         state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
251
252         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
253
254         state->activeptr = 0;
255         state->readptrcount = 1;
256         state->readptrsize = 8;         /* arbitrary */
257         state->readptrs = (TSReadPointer *)
258                 palloc(state->readptrsize * sizeof(TSReadPointer));
259
260         state->readptrs[0].eflags = eflags;
261         state->readptrs[0].eof_reached = false;
262         state->readptrs[0].current = 0;
263
264         return state;
265 }
266
267 /*
268  * tuplestore_begin_heap
269  *
270  * Create a new tuplestore; other types of tuple stores (other than
271  * "heap" tuple stores, for heap tuples) are possible, but not presently
272  * implemented.
273  *
274  * randomAccess: if true, both forward and backward accesses to the
275  * tuple store are allowed.
276  *
277  * interXact: if true, the files used for on-disk storage persist beyond the
278  * end of the current transaction.      NOTE: It's the caller's responsibility to
279  * create such a tuplestore in a memory context that will also survive
280  * transaction boundaries, and to ensure the tuplestore is closed when it's
281  * no longer wanted.
282  *
283  * maxKBytes: how much data to store in memory (any data beyond this
284  * amount is paged to disk).  When in doubt, use work_mem.
285  */
286 Tuplestorestate *
287 tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
288 {
289         Tuplestorestate *state;
290         int                     eflags;
291
292         /*
293          * This interpretation of the meaning of randomAccess is compatible with
294          * the pre-8.3 behavior of tuplestores.
295          */
296         eflags = randomAccess ?
297                 (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) :
298                 (EXEC_FLAG_REWIND);
299
300         state = tuplestore_begin_common(eflags, interXact, maxKBytes);
301
302         state->copytup = copytup_heap;
303         state->writetup = writetup_heap;
304         state->readtup = readtup_heap;
305
306         return state;
307 }
308
309 /*
310  * tuplestore_set_eflags
311  *
312  * Set the capability flags for read pointer 0 at a finer grain than is
313  * allowed by tuplestore_begin_xxx.  This must be called before inserting
314  * any data into the tuplestore.
315  *
316  * eflags is a bitmask following the meanings used for executor node
317  * startup flags (see executor.h).      tuplestore pays attention to these bits:
318  *              EXEC_FLAG_REWIND                need rewind to start
319  *              EXEC_FLAG_BACKWARD              need backward fetch
320  * If tuplestore_set_eflags is not called, REWIND is allowed, and BACKWARD
321  * is set per "randomAccess" in the tuplestore_begin_xxx call.
322  */
323 void
324 tuplestore_set_eflags(Tuplestorestate *state, int eflags)
325 {
326         int                     i;
327
328         if (state->status != TSS_INMEM || state->memtupcount != 0)
329                 elog(ERROR, "too late to call tuplestore_set_eflags");
330
331         state->readptrs[0].eflags = eflags;
332         for (i = 1; i < state->readptrcount; i++)
333                 eflags |= state->readptrs[i].eflags;
334         state->eflags = eflags;
335 }
336
337 /*
338  * tuplestore_alloc_read_pointer - allocate another read pointer.
339  *
340  * Returns the pointer's index.
341  *
342  * The new pointer initially copies the position of read pointer 0.
343  * It can have its own eflags, but if any data has been inserted into
344  * the tuplestore, these eflags must not represent an increase in
345  * requirements.
346  */
347 int
348 tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags)
349 {
350         /* Check for possible increase of requirements */
351         if (state->status != TSS_INMEM || state->memtupcount != 0)
352         {
353                 if ((state->eflags | eflags) != state->eflags)
354                         elog(ERROR, "too late to require new tuplestore eflags");
355         }
356
357         /* Make room for another read pointer if needed */
358         if (state->readptrcount >= state->readptrsize)
359         {
360                 int             newcnt = state->readptrsize * 2;
361
362                 state->readptrs = (TSReadPointer *)
363                         repalloc(state->readptrs, newcnt * sizeof(TSReadPointer));
364                 state->readptrsize = newcnt;
365         }
366
367         /* And set it up */
368         state->readptrs[state->readptrcount] = state->readptrs[0];
369         state->readptrs[state->readptrcount].eflags = eflags;
370
371         state->eflags |= eflags;
372
373         return state->readptrcount++;
374 }
375
376 /*
377  * tuplestore_clear
378  *
379  *      Delete all the contents of a tuplestore, and reset its read pointers
380  *      to the start.
381  */
382 void
383 tuplestore_clear(Tuplestorestate *state)
384 {
385         int                     i;
386         TSReadPointer *readptr;
387
388         if (state->myfile)
389                 BufFileClose(state->myfile);
390         state->myfile = NULL;
391         if (state->memtuples)
392         {
393                 for (i = 0; i < state->memtupcount; i++)
394                 {
395                         FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
396                         pfree(state->memtuples[i]);
397                 }
398         }
399         state->status = TSS_INMEM;
400         state->memtupcount = 0;
401         readptr = state->readptrs;
402         for (i = 0; i < state->readptrcount; readptr++, i++)
403         {
404                 readptr->eof_reached = false;
405                 readptr->current = 0;
406         }
407 }
408
409 /*
410  * tuplestore_end
411  *
412  *      Release resources and clean up.
413  */
414 void
415 tuplestore_end(Tuplestorestate *state)
416 {
417         int                     i;
418
419         if (state->myfile)
420                 BufFileClose(state->myfile);
421         if (state->memtuples)
422         {
423                 for (i = 0; i < state->memtupcount; i++)
424                         pfree(state->memtuples[i]);
425                 pfree(state->memtuples);
426         }
427         pfree(state->readptrs);
428         pfree(state);
429 }
430
431 /*
432  * tuplestore_select_read_pointer - make the specified read pointer active
433  */
434 void
435 tuplestore_select_read_pointer(Tuplestorestate *state, int ptr)
436 {
437         TSReadPointer *readptr;
438         TSReadPointer *oldptr;
439
440         Assert(ptr >= 0 && ptr < state->readptrcount);
441
442         /* No work if already active */
443         if (ptr == state->activeptr)
444                 return;
445
446         readptr = &state->readptrs[ptr];
447         oldptr = &state->readptrs[state->activeptr];
448
449         switch (state->status)
450         {
451                 case TSS_INMEM:
452                 case TSS_WRITEFILE:
453                         /* no work */
454                         break;
455                 case TSS_READFILE:
456                         /*
457                          * First, save the current read position in the pointer about
458                          * to become inactive.
459                          */
460                         if (!oldptr->eof_reached)
461                                 BufFileTell(state->myfile,
462                                                         &oldptr->file,
463                                                         &oldptr->offset);
464
465                         /*
466                          * We have to make the temp file's seek position equal to the
467                          * logical position of the new read pointer.  In eof_reached
468                          * state, that's the EOF, which we have available from the saved
469                          * write position.
470                          */
471                         if (readptr->eof_reached)
472                         {
473                                 if (BufFileSeek(state->myfile,
474                                                                 state->writepos_file,
475                                                                 state->writepos_offset,
476                                                                 SEEK_SET) != 0)
477                                         elog(ERROR, "tuplestore seek failed");
478                         }
479                         else
480                         {
481                                 if (BufFileSeek(state->myfile,
482                                                                 readptr->file,
483                                                                 readptr->offset,
484                                                                 SEEK_SET) != 0)
485                                         elog(ERROR, "tuplestore seek failed");
486                         }
487                         break;
488                 default:
489                         elog(ERROR, "invalid tuplestore state");
490                         break;
491         }
492
493         state->activeptr = ptr;
494 }
495
496 /*
497  * tuplestore_ateof
498  *
499  * Returns the active read pointer's eof_reached state.
500  */
501 bool
502 tuplestore_ateof(Tuplestorestate *state)
503 {
504         return state->readptrs[state->activeptr].eof_reached;
505 }
506
507 /*
508  * Accept one tuple and append it to the tuplestore.
509  *
510  * Note that the input tuple is always copied; the caller need not save it.
511  *
512  * If the active read pointer is currently "at EOF", it remains so (the read
513  * pointer implicitly advances along with the write pointer); otherwise the
514  * read pointer is unchanged.  Non-active read pointers do not move, which
515  * means they are certain to not be "at EOF" immediately after puttuple.
516  * This curious-seeming behavior is for the convenience of nodeMaterial.c and
517  * nodeCtescan.c, which would otherwise need to do extra pointer repositioning
518  * steps.
519  *
520  * tuplestore_puttupleslot() is a convenience routine to collect data from
521  * a TupleTableSlot without an extra copy operation.
522  */
523 void
524 tuplestore_puttupleslot(Tuplestorestate *state,
525                                                 TupleTableSlot *slot)
526 {
527         MinimalTuple tuple;
528
529         /*
530          * Form a MinimalTuple in working memory
531          */
532         tuple = ExecCopySlotMinimalTuple(slot);
533         USEMEM(state, GetMemoryChunkSpace(tuple));
534
535         tuplestore_puttuple_common(state, (void *) tuple);
536 }
537
538 /*
539  * "Standard" case to copy from a HeapTuple.  This is actually now somewhat
540  * deprecated, but not worth getting rid of in view of the number of callers.
541  */
542 void
543 tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
544 {
545         /*
546          * Copy the tuple.      (Must do this even in WRITEFILE case.)
547          */
548         tuple = COPYTUP(state, tuple);
549
550         tuplestore_puttuple_common(state, (void *) tuple);
551 }
552
553 /*
554  * Similar to tuplestore_puttuple(), but start from the values + nulls
555  * array. This avoids requiring that the caller construct a HeapTuple,
556  * saving a copy.
557  */
558 void
559 tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc,
560                                          Datum *values, bool *isnull)
561 {
562         MinimalTuple tuple;
563
564         tuple = heap_form_minimal_tuple(tdesc, values, isnull);
565
566         tuplestore_puttuple_common(state, (void *) tuple);
567 }
568
569 static void
570 tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
571 {
572         TSReadPointer *readptr;
573         int                     i;
574
575         switch (state->status)
576         {
577                 case TSS_INMEM:
578
579                         /*
580                          * Update read pointers as needed; see API spec above.
581                          */
582                         readptr = state->readptrs;
583                         for (i = 0; i < state->readptrcount; readptr++, i++)
584                         {
585                                 if (readptr->eof_reached && i != state->activeptr)
586                                 {
587                                         readptr->eof_reached = false;
588                                         readptr->current = state->memtupcount;
589                                 }
590                         }
591
592                         /*
593                          * Grow the array as needed.  Note that we try to grow the array
594                          * when there is still one free slot remaining --- if we fail,
595                          * there'll still be room to store the incoming tuple, and then
596                          * we'll switch to tape-based operation.
597                          */
598                         if (state->memtupcount >= state->memtupsize - 1)
599                         {
600                                 /*
601                                  * See grow_memtuples() in tuplesort.c for the rationale
602                                  * behind these two tests.
603                                  */
604                                 if (state->availMem > (long) (state->memtupsize * sizeof(void *)) &&
605                                         (Size) (state->memtupsize * 2) < MaxAllocSize / sizeof(void *))
606                                 {
607                                         FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
608                                         state->memtupsize *= 2;
609                                         state->memtuples = (void **)
610                                                 repalloc(state->memtuples,
611                                                                  state->memtupsize * sizeof(void *));
612                                         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
613                                 }
614                         }
615
616                         /* Stash the tuple in the in-memory array */
617                         state->memtuples[state->memtupcount++] = tuple;
618
619                         /*
620                          * Done if we still fit in available memory and have array slots.
621                          */
622                         if (state->memtupcount < state->memtupsize && !LACKMEM(state))
623                                 return;
624
625                         /*
626                          * Nope; time to switch to tape-based operation.  Make sure that
627                          * the temp file(s) are created in suitable temp tablespaces.
628                          */
629                         PrepareTempTablespaces();
630                         state->myfile = BufFileCreateTemp(state->interXact);
631                         /*
632                          * Freeze the decision about whether trailing length words
633                          * will be used.  We can't change this choice once data is on
634                          * tape, even though callers might drop the requirement.
635                          */
636                         state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
637                         state->status = TSS_WRITEFILE;
638                         dumptuples(state);
639                         break;
640                 case TSS_WRITEFILE:
641
642                         /*
643                          * Update read pointers as needed; see API spec above.
644                          * Note: BufFileTell is quite cheap, so not worth trying
645                          * to avoid multiple calls.
646                          */
647                         readptr = state->readptrs;
648                         for (i = 0; i < state->readptrcount; readptr++, i++)
649                         {
650                                 if (readptr->eof_reached && i != state->activeptr)
651                                 {
652                                         readptr->eof_reached = false;
653                                         BufFileTell(state->myfile,
654                                                                 &readptr->file,
655                                                                 &readptr->offset);
656                                 }
657                         }
658
659                         WRITETUP(state, tuple);
660                         break;
661                 case TSS_READFILE:
662
663                         /*
664                          * Switch from reading to writing.
665                          */
666                         if (!state->readptrs[state->activeptr].eof_reached)
667                                 BufFileTell(state->myfile,
668                                                         &state->readptrs[state->activeptr].file,
669                                                         &state->readptrs[state->activeptr].offset);
670                         if (BufFileSeek(state->myfile,
671                                                         state->writepos_file, state->writepos_offset,
672                                                         SEEK_SET) != 0)
673                                 elog(ERROR, "tuplestore seek to EOF failed");
674                         state->status = TSS_WRITEFILE;
675
676                         /*
677                          * Update read pointers as needed; see API spec above.
678                          */
679                         readptr = state->readptrs;
680                         for (i = 0; i < state->readptrcount; readptr++, i++)
681                         {
682                                 if (readptr->eof_reached && i != state->activeptr)
683                                 {
684                                         readptr->eof_reached = false;
685                                         readptr->file = state->writepos_file;
686                                         readptr->offset = state->writepos_offset;
687                                 }
688                         }
689
690                         WRITETUP(state, tuple);
691                         break;
692                 default:
693                         elog(ERROR, "invalid tuplestore state");
694                         break;
695         }
696 }
697
698 /*
699  * Fetch the next tuple in either forward or back direction.
700  * Returns NULL if no more tuples.  If should_free is set, the
701  * caller must pfree the returned tuple when done with it.
702  *
     * The fetch is performed through the active read pointer,
     * state->readptrs[state->activeptr].  In the TSS_INMEM state the result
     * is a direct pointer into state->memtuples and *should_free is set
     * false; in the file-based states the result comes from READTUP and
     * *should_free is set true.  Note that *should_free is left unassigned
     * when the TSS_WRITEFILE case returns NULL early, so callers must only
     * consult it when the result is non-NULL.
     *
703  * Backward scan is only allowed if randomAccess was set true or
704  * EXEC_FLAG_BACKWARD was specified to tuplestore_set_eflags().
705  */
706 static void *
707 tuplestore_gettuple(Tuplestorestate *state, bool forward,
708                                         bool *should_free)
709 {
710         TSReadPointer *readptr = &state->readptrs[state->activeptr];
711         unsigned int tuplen;
712         void       *tup;
713
            /* a backward fetch is legal only if this pointer asked for it */
714         Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
715
716         switch (state->status)
717         {
718                 case TSS_INMEM:
719                         *should_free = false;
720                         if (forward)
721                         {
722                                 if (readptr->eof_reached)
723                                         return NULL;
724                                 if (readptr->current < state->memtupcount)
725                                 {
726                                         /*
727                                          * We have another tuple, so return it.  Note: in
728                                          * principle we could try tuplestore_trim() here after
729                                          * advancing current, but this would cost cycles with
730                                          * little chance of success, so we don't bother.
731                                          */
732                                         return state->memtuples[readptr->current++];
733                                 }
                                    /* ran off the end; later forward fetches return NULL at once */
734                                 readptr->eof_reached = true;
735                                 return NULL;
736                         }
737                         else
738                         {
739                                 /*
740                                  * If all tuples are fetched already then we return the last
741                                  * tuple, else the tuple before the last returned one.
742                                  */
743                                 if (readptr->eof_reached)
744                                 {
                                            /* reposition just past the last stored tuple */
745                                         readptr->current = state->memtupcount;
746                                         readptr->eof_reached = false;
747                                 }
748                                 else
749                                 {
750                                         if (readptr->current <= 0)
751                                                 return NULL;
752                                         readptr->current--;     /* last returned tuple */
753                                 }
                                    /* current - 1 is the tuple to return; none if current is 0 */
754                                 if (readptr->current <= 0)
755                                         return NULL;
756                                 return state->memtuples[readptr->current - 1];
757                         }
758                         break;          /* not reached: both branches above return */
759
760                 case TSS_WRITEFILE:
761                         /* Skip state change if we'll just return NULL */
762                         if (readptr->eof_reached && forward)
763                                 return NULL;
764
765                         /*
766                          * Switch from writing to reading.  Remember the current write
767                          * position first, then move to the read pointer's saved
                             * position.  A pointer that is at EOF needs no seek: the file
                             * is left at the write position just recorded.
                             */
768                         BufFileTell(state->myfile,
769                                                 &state->writepos_file, &state->writepos_offset);
770                         if (!readptr->eof_reached)
771                                 if (BufFileSeek(state->myfile,
772                                                                 readptr->file, readptr->offset,
773                                                                 SEEK_SET) != 0)
774                                         elog(ERROR, "tuplestore seek failed");
775                         state->status = TSS_READFILE;
776                         /* FALL THRU into READFILE case */
777
778                 case TSS_READFILE:
779                         *should_free = true;
780                         if (forward)
781                         {
                                    /* getlen() yields 0 on a clean EOF since eofOK is true */
782                                 if ((tuplen = getlen(state, true)) != 0)
783                                 {
784                                         tup = READTUP(state, tuplen);
785                                         return tup;
786                                 }
787                                 else
788                                 {
789                                         readptr->eof_reached = true;
790                                         return NULL;
791                                 }
792                         }
793
794                         /*
795                          * Backward.
796                          *
797                          * If all tuples are fetched already then we return the last
798                          * tuple, else the tuple before the last returned one.
799                          *
800                          * Back up to fetch previously-returned tuple's ending length
801                          * word. If seek fails, assume we are at start of file.
802                          */
803                         if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int),
804                                                         SEEK_CUR) != 0)
805                         {
806                                 /* even a failed backwards fetch gets you out of eof state */
807                                 readptr->eof_reached = false;
808                                 return NULL;
809                         }
                            /* eofOK is false: a short read here raises an error */
810                         tuplen = getlen(state, false);
811
812                         if (readptr->eof_reached)
813                         {
814                                 readptr->eof_reached = false;
815                                 /* We will return the tuple returned before returning NULL */
816                         }
817                         else
818                         {
819                                 /*
820                                  * Back up to get ending length word of tuple before it.
                                     *
                                     * The distance covers the length word just read, the
                                     * tuple it describes (tuplen bytes), and the preceding
                                     * tuple's own trailing length word.  NOTE(review): this
                                     * assumes the layout produced by writetup_heap, where
                                     * the stored length counts the leading length word plus
                                     * the body -- confirm for other WRITETUP implementations.
821                                  */
822                                 if (BufFileSeek(state->myfile, 0,
823                                                                 -(long) (tuplen + 2 * sizeof(unsigned int)),
824                                                                 SEEK_CUR) != 0)
825                                 {
826                                         /*
827                                          * If that fails, presumably the prev tuple is the first
828                                          * in the file.  Back up so that it becomes next to read
829                                          * in forward direction (not obviously right, but that is
830                                          * what in-memory case does).
831                                          */
832                                         if (BufFileSeek(state->myfile, 0,
833                                                                         -(long) (tuplen + sizeof(unsigned int)),
834                                                                         SEEK_CUR) != 0)
835                                                 elog(ERROR, "bogus tuple length in backward scan");
836                                         return NULL;
837                                 }
                                    /* now pick up the length of the tuple we will return */
838                                 tuplen = getlen(state, false);
839                         }
840
841                         /*
842                          * Now we have the length of the prior tuple, back up and read it.
843                          * Note: READTUP expects we are positioned after the initial
844                          * length word of the tuple, so back up to that point.
845                          */
846                         if (BufFileSeek(state->myfile, 0,
847                                                         -(long) tuplen,
848                                                         SEEK_CUR) != 0)
849                                 elog(ERROR, "bogus tuple length in backward scan");
850                         tup = READTUP(state, tuplen);
851                         return tup;
852
853                 default:
854                         elog(ERROR, "invalid tuplestore state");
855                         return NULL;            /* keep compiler quiet */
856         }
857 }
858
859 /*
860  * tuplestore_gettupleslot - exported function to fetch a MinimalTuple
861  *
862  * If successful, put tuple in slot and return TRUE; else, clear the slot
863  * and return FALSE.
864  */
865 bool
866 tuplestore_gettupleslot(Tuplestorestate *state, bool forward,
867                                                 TupleTableSlot *slot)
868 {
869         MinimalTuple tuple;
870         bool            should_free;
871
872         tuple = (MinimalTuple) tuplestore_gettuple(state, forward, &should_free);
873
874         if (tuple)
875         {
876                 ExecStoreMinimalTuple(tuple, slot, should_free);
877                 return true;
878         }
879         else
880         {
881                 ExecClearTuple(slot);
882                 return false;
883         }
884 }
885
886 /*
887  * tuplestore_advance - exported function to adjust position without fetching
888  *
889  * We could optimize this case to avoid palloc/pfree overhead, but for the
890  * moment it doesn't seem worthwhile.
891  */
892 bool
893 tuplestore_advance(Tuplestorestate *state, bool forward)
894 {
895         void       *tuple;
896         bool            should_free;
897
898         tuple = tuplestore_gettuple(state, forward, &should_free);
899
900         if (tuple)
901         {
902                 if (should_free)
903                         pfree(tuple);
904                 return true;
905         }
906         else
907         {
908                 return false;
909         }
910 }
911
912 /*
913  * dumptuples - remove tuples from memory and write to tape
914  *
915  * As a side effect, we must convert each read pointer's position from
916  * "current" to file/offset format.  But eof_reached pointers don't
917  * need to change state.
918  */
919 static void
920 dumptuples(Tuplestorestate *state)
921 {
922         int                     i;
923
924         for (i = 0;; i++)
925         {
926                 TSReadPointer *readptr = state->readptrs;
927                 int                     j;
928
929                 for (j = 0; j < state->readptrcount; readptr++, j++)
930                 {
931                         if (i == readptr->current && !readptr->eof_reached)
932                                 BufFileTell(state->myfile,
933                                                         &readptr->file, &readptr->offset);
934                 }
935                 if (i >= state->memtupcount)
936                         break;
937                 WRITETUP(state, state->memtuples[i]);
938         }
939         state->memtupcount = 0;
940 }
941
942 /*
943  * tuplestore_rescan            - rewind the active read pointer to start
944  */
945 void
946 tuplestore_rescan(Tuplestorestate *state)
947 {
948         TSReadPointer *readptr = &state->readptrs[state->activeptr];
949
950         Assert(readptr->eflags & EXEC_FLAG_REWIND);
951
952         switch (state->status)
953         {
954                 case TSS_INMEM:
955                         readptr->eof_reached = false;
956                         readptr->current = 0;
957                         break;
958                 case TSS_WRITEFILE:
959                         readptr->eof_reached = false;
960                         readptr->file = 0;
961                         readptr->offset = 0L;
962                         break;
963                 case TSS_READFILE:
964                         readptr->eof_reached = false;
965                         if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
966                                 elog(ERROR, "tuplestore seek to start failed");
967                         break;
968                 default:
969                         elog(ERROR, "invalid tuplestore state");
970                         break;
971         }
972 }
973
974 /*
975  * tuplestore_copy_read_pointer - copy a read pointer's state to another
976  */
977 void
978 tuplestore_copy_read_pointer(Tuplestorestate *state,
979                                                          int srcptr, int destptr)
980 {
981         TSReadPointer *sptr = &state->readptrs[srcptr];
982         TSReadPointer *dptr = &state->readptrs[destptr];
983
984         Assert(srcptr >= 0 && srcptr < state->readptrcount);
985         Assert(destptr >= 0 && destptr < state->readptrcount);
986
987         /* Assigning to self is a no-op */
988         if (srcptr == destptr)
989                 return;
990
991         if (dptr->eflags != sptr->eflags)
992         {
993                 /* Possible change of overall eflags, so copy and then recompute */
994                 int             eflags;
995                 int             i;
996
997                 *dptr = *sptr;
998                 eflags = state->readptrs[0].eflags;
999                 for (i = 1; i < state->readptrcount; i++)
1000                         eflags |= state->readptrs[i].eflags;
1001                 state->eflags = eflags;
1002         }
1003         else
1004                 *dptr = *sptr;
1005
1006         switch (state->status)
1007         {
1008                 case TSS_INMEM:
1009                         /* We might be able to truncate the tuplestore */
1010                         tuplestore_trim(state);
1011                         break;
1012                 case TSS_WRITEFILE:
1013                         break;
1014                 case TSS_READFILE:
1015                         /*
1016                          * This case is a bit tricky since the active read pointer's
1017                          * position corresponds to the seek point, not what is in its
1018                          * variables.  Assigning to the active requires a seek, and
1019                          * assigning from the active requires a tell, except when
1020                          * eof_reached.
1021                          */
1022                         if (destptr == state->activeptr)
1023                         {
1024                                 if (dptr->eof_reached)
1025                                 {
1026                                         if (BufFileSeek(state->myfile,
1027                                                                         state->writepos_file,
1028                                                                         state->writepos_offset,
1029                                                                         SEEK_SET) != 0)
1030                                                 elog(ERROR, "tuplestore seek failed");
1031                                 }
1032                                 else
1033                                 {
1034                                         if (BufFileSeek(state->myfile,
1035                                                                         dptr->file, dptr->offset,
1036                                                                         SEEK_SET) != 0)
1037                                                 elog(ERROR, "tuplestore seek failed");
1038                                 }
1039                         }
1040                         else if (srcptr == state->activeptr)
1041                         {
1042                                 if (!dptr->eof_reached)
1043                                         BufFileTell(state->myfile,
1044                                                                 &dptr->file,
1045                                                                 &dptr->offset);
1046                         }
1047                         break;
1048                 default:
1049                         elog(ERROR, "invalid tuplestore state");
1050                         break;
1051         }
1052 }
1053
1054 /*
1055  * tuplestore_trim      - remove all no-longer-needed tuples
1056  */
1057 static void
1058 tuplestore_trim(Tuplestorestate *state)
1059 {
1060         int                     oldest;
1061         int                     nremove;
1062         int                     i;
1063
1064         /*
1065          * We can truncate the tuplestore if neither backward scan nor
1066          * rewind capability are required by any read pointer.
1067          */
1068         if (state->eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND))
1069                 return;
1070
1071         /*
1072          * We don't bother trimming temp files since it usually would mean more
1073          * work than just letting them sit in kernel buffers until they age out.
1074          */
1075         if (state->status != TSS_INMEM)
1076                 return;
1077
1078         /* Find the oldest read pointer */
1079         oldest = state->memtupcount;
1080         for (i = 0; i < state->readptrcount; i++)
1081         {
1082                 if (!state->readptrs[i].eof_reached)
1083                         oldest = Min(oldest, state->readptrs[i].current);
1084         }
1085
1086         /*
1087          * Note: you might think we could remove all the tuples before the oldest
1088          * "current", since that one is the next to be returned.  However,
1089          * since tuplestore_gettuple returns a direct pointer to our
1090          * internal copy of the tuple, it's likely that the caller has
1091          * still got the tuple just before "current" referenced in a slot.
1092          * So we keep one extra tuple before the oldest "current".
1093          */
1094         nremove = oldest - 1;
1095         if (nremove <= 0)
1096                 return;                                 /* nothing to do */
1097         Assert(nremove <= state->memtupcount);
1098
1099         /* Release no-longer-needed tuples */
1100         for (i = 0; i < nremove; i++)
1101         {
1102                 FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
1103                 pfree(state->memtuples[i]);
1104         }
1105
1106         /*
1107          * Slide the array down and readjust pointers.  This may look pretty
1108          * stupid, but we expect that there will usually not be very many
1109          * tuple-pointers to move, so this isn't that expensive; and it keeps a
1110          * lot of other logic simple.
1111          *
1112          * In fact, in the current usage for merge joins, it's demonstrable that
1113          * there will always be exactly one non-removed tuple; so optimize that
1114          * case.
1115          */
1116         if (nremove + 1 == state->memtupcount)
1117                 state->memtuples[0] = state->memtuples[nremove];
1118         else
1119                 memmove(state->memtuples, state->memtuples + nremove,
1120                                 (state->memtupcount - nremove) * sizeof(void *));
1121
1122         state->memtupcount -= nremove;
1123         for (i = 0; i < state->readptrcount; i++)
1124         {
1125                 if (!state->readptrs[i].eof_reached)
1126                         state->readptrs[i].current -= nremove;
1127         }
1128 }
1129
1130
1131 /*
1132  * Tape interface routines
1133  */
1134
1135 static unsigned int
1136 getlen(Tuplestorestate *state, bool eofOK)
1137 {
1138         unsigned int len;
1139         size_t          nbytes;
1140
1141         nbytes = BufFileRead(state->myfile, (void *) &len, sizeof(len));
1142         if (nbytes == sizeof(len))
1143                 return len;
1144         if (nbytes != 0)
1145                 elog(ERROR, "unexpected end of tape");
1146         if (!eofOK)
1147                 elog(ERROR, "unexpected end of data");
1148         return 0;
1149 }
1150
1151
1152 /*
1153  * Routines specialized for HeapTuple case
1154  *
1155  * The stored form is actually a MinimalTuple, but for largely historical
1156  * reasons we allow COPYTUP to work from a HeapTuple.
1157  *
1158  * Since MinimalTuple already has length in its first word, we don't need
1159  * to write that separately.
1160  */
1161
1162 static void *
1163 copytup_heap(Tuplestorestate *state, void *tup)
1164 {
1165         MinimalTuple tuple;
1166
1167         tuple = minimal_tuple_from_heap_tuple((HeapTuple) tup);
1168         USEMEM(state, GetMemoryChunkSpace(tuple));
1169         return (void *) tuple;
1170 }
1171
1172 static void
1173 writetup_heap(Tuplestorestate *state, void *tup)
1174 {
1175         MinimalTuple tuple = (MinimalTuple) tup;
1176         /* the part of the MinimalTuple we'll write: */
1177         char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
1178         unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
1179         /* total on-disk footprint: */
1180         unsigned int tuplen = tupbodylen + sizeof(int);
1181
1182         if (BufFileWrite(state->myfile, (void *) &tuplen,
1183                                          sizeof(tuplen)) != sizeof(tuplen))
1184                 elog(ERROR, "write failed");
1185         if (BufFileWrite(state->myfile, (void *) tupbody,
1186                                          tupbodylen) != (size_t) tupbodylen)
1187                 elog(ERROR, "write failed");
1188         if (state->backward)            /* need trailing length word? */
1189                 if (BufFileWrite(state->myfile, (void *) &tuplen,
1190                                                  sizeof(tuplen)) != sizeof(tuplen))
1191                         elog(ERROR, "write failed");
1192
1193         FREEMEM(state, GetMemoryChunkSpace(tuple));
1194         heap_free_minimal_tuple(tuple);
1195 }
1196
1197 static void *
1198 readtup_heap(Tuplestorestate *state, unsigned int len)
1199 {
1200         unsigned int tupbodylen = len - sizeof(int);
1201         unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
1202         MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
1203         char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
1204
1205         USEMEM(state, GetMemoryChunkSpace(tuple));
1206         /* read in the tuple proper */
1207         tuple->t_len = tuplen;
1208         if (BufFileRead(state->myfile, (void *) tupbody,
1209                                          tupbodylen) != (size_t) tupbodylen)
1210                 elog(ERROR, "unexpected end of data");
1211         if (state->backward)            /* need trailing length word? */
1212                 if (BufFileRead(state->myfile, (void *) &tuplen,
1213                                                 sizeof(tuplen)) != sizeof(tuplen))
1214                         elog(ERROR, "unexpected end of data");
1215         return (void *) tuple;
1216 }