1 /*-------------------------------------------------------------------------
4 * Generalized routines for temporary tuple storage.
6 * This module handles temporary storage of tuples for purposes such
7 * as Materialize nodes, hashjoin batch files, etc. It is essentially
8 * a dumbed-down version of tuplesort.c; it does no sorting of tuples
9 * but can only store a sequence of tuples and regurgitate it later.
10 * A temporary file is used to handle the data if it exceeds the
11 * space limit specified by the caller.
13 * The (approximate) amount of memory allowed to the tuplestore is specified
14 * in kilobytes by the caller. We absorb tuples and simply store them in an
15 * in-memory array as long as we haven't exceeded maxKBytes. If we reach the
16 * end of the input without exceeding maxKBytes, we just return tuples during
17 * the read phase by scanning the tuple array sequentially. If we do exceed
18 * maxKBytes, we dump all the tuples into a temp file and then read from that
19 * during the read phase.
21 * When the caller requests random access to the data, we write the temp file
22 * in a format that allows either forward or backward scan.
25 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
26 * Portions Copyright (c) 1994, Regents of the University of California
29 * $Header: /cvsroot/pgsql/src/backend/utils/sort/tuplestore.c,v 1.5 2001/10/28 06:25:57 momjian Exp $
31 *-------------------------------------------------------------------------
36 #include "access/heapam.h"
37 #include "storage/buffile.h"
38 #include "utils/tuplestore.h"
41 * Possible states of a Tuplestore object. These denote the states that
42 * persist between calls of Tuplestore routines.
46 TSS_INITIAL, /* Loading tuples; still within memory
48 TSS_WRITEFILE, /* Loading tuples; writing to temp file */
49 TSS_READMEM, /* Reading tuples; entirely in memory */
50 TSS_READFILE /* Reading tuples from temp file */
54 * Private state of a Tuplestore operation.
56 struct Tuplestorestate
58 TupStoreStatus status; /* enumerated value as shown above */
59 bool randomAccess; /* did caller request random access? */
60 long availMem; /* remaining memory available, in bytes */
61 BufFile *myfile; /* underlying file, or NULL if none */
64 * These function pointers decouple the routines that must know what
65 * kind of tuple we are handling from the routines that don't need to
66 * know it. They are set up by the tuplestore_begin_xxx routines.
68 * (Although tuplestore.c currently only supports heap tuples, I've
69 * copied this part of tuplesort.c so that extension to other kinds of
70 * objects will be easy if it's ever needed.)
72 * Function to copy a supplied input tuple into palloc'd space. (NB: we
73 * assume that a single pfree() is enough to release the tuple later,
74 * so the representation must be "flat" in one palloc chunk.)
75 * state->availMem must be decreased by the amount of space used.
77 void *(*copytup) (Tuplestorestate *state, void *tup);
80 * Function to write a stored tuple onto tape. The representation of
81 * the tuple on tape need not be the same as it is in memory;
82 * requirements on the tape representation are given below. After
83 * writing the tuple, pfree() it, and increase state->availMem by the
84 * amount of memory space thereby released.
86 void (*writetup) (Tuplestorestate *state, void *tup);
89 * Function to read a stored tuple from tape back into memory. 'len'
90 * is the already-read length of the stored tuple. Create and return
91 * a palloc'd copy, and decrease state->availMem by the amount of
92 * memory space consumed.
94 void *(*readtup) (Tuplestorestate *state, unsigned int len);
97 * This array holds pointers to tuples in memory if we are in state
98 * INITIAL or READMEM. In states WRITEFILE and READFILE it's not
101 void **memtuples; /* array of pointers to palloc'd tuples */
102 int memtupcount; /* number of tuples currently present */
103 int memtupsize; /* allocated length of memtuples array */
106 * These variables are used after completion of storing to keep track
107 * of the next tuple to return. (In the tape case, the tape's current
108 * read position is also critical state.)
110 int current; /* array index (only used if READMEM) */
111 bool eof_reached; /* reached EOF (needed for cursors) */
113 /* markpos_xxx holds marked position for mark and restore */
114 int markpos_file; /* file# (only used if READFILE) */
115 long markpos_offset; /* saved "current", or offset in tape file */
116 bool markpos_eof; /* saved "eof_reached" */
119 #define COPYTUP(state,tup) ((*(state)->copytup) (state, tup))
120 #define WRITETUP(state,tup) ((*(state)->writetup) (state, tup))
121 #define READTUP(state,len) ((*(state)->readtup) (state, len))
122 #define LACKMEM(state) ((state)->availMem < 0)
123 #define USEMEM(state,amt) ((state)->availMem -= (amt))
124 #define FREEMEM(state,amt) ((state)->availMem += (amt))
126 /*--------------------
128 * NOTES about on-tape representation of tuples:
130 * We require the first "unsigned int" of a stored tuple to be the total size
131 * on-tape of the tuple, including itself (so it is never zero; an all-zero
132 * unsigned int is used to delimit runs). The remainder of the stored tuple
133 * may or may not match the in-memory representation of the tuple ---
134 * any conversion needed is the job of the writetup and readtup routines.
136 * If state->randomAccess is true, then the stored representation of the
137 * tuple must be followed by another "unsigned int" that is a copy of the
138 * length --- so the total tape space used is actually sizeof(unsigned int)
139 * more than the stored length value. This allows read-backwards. When
140 * randomAccess is not true, the write/read routines may omit the extra
143 * writetup is expected to write both length words as well as the tuple
144 * data. When readtup is called, the tape is positioned just after the
145 * front length word; readtup must read the tuple data and advance past
146 * the back length word (if present).
148 * The write/read routines can make use of the tuple description data
149 * stored in the Tuplestorestate record, if needed. They are also expected
150 * to adjust state->availMem by the amount of memory space (not tape space!)
151 * released or consumed. There is no error return from either writetup
152 * or readtup; they should elog() on failure.
155 * NOTES about memory consumption calculations:
157 * We count space requested for tuples against the maxKBytes limit.
158 * Fixed-size space (primarily the BufFile I/O buffer) is not
159 * counted, nor do we count the variable-size memtuples array.
160 * (Even though that could grow pretty large, it should be small compared
161 * to the tuples proper, so this is not unreasonable.)
163 * The major deficiency in this approach is that it ignores palloc overhead.
164 * The memory space actually allocated for a palloc chunk is always more
165 * than the request size, and could be considerably more (as much as 2X
166 * larger, in the current aset.c implementation). So the space used could
167 * be considerably more than maxKBytes says.
169 * One way to fix this is to add a memory management function that, given
170 * a pointer to a palloc'd chunk, returns the actual space consumed by the
171 * chunk. This would be very easy in the current aset.c module, but I'm
172 * hesitant to do it because it might be unpleasant to support in future
173 * implementations of memory management. (For example, a direct
174 * implementation of palloc as malloc could not support such a function
177 * A cruder answer is just to apply a fudge factor, say by initializing
178 * availMem to only three-quarters of what maxKBytes indicates. This is
179 * probably the right answer if anyone complains that maxKBytes is not being
180 * obeyed very faithfully.
182 *--------------------
186 static Tuplestorestate *tuplestore_begin_common(bool randomAccess,
188 static void dumptuples(Tuplestorestate *state);
189 static unsigned int getlen(Tuplestorestate *state, bool eofOK);
190 static void markrunend(Tuplestorestate *state);
191 static void *copytup_heap(Tuplestorestate *state, void *tup);
192 static void writetup_heap(Tuplestorestate *state, void *tup);
193 static void *readtup_heap(Tuplestorestate *state, unsigned int len);
197 * tuplestore_begin_xxx
199 * Initialize for a tuple store operation.
201 * After calling tuplestore_begin, the caller should call tuplestore_puttuple
202 * zero or more times, then call tuplestore_donestoring when all the tuples
203 * have been supplied. After donestoring, retrieve the tuples in order
204 * by calling tuplestore_gettuple until it returns NULL. (If random
205 * access was requested, rescan, markpos, and restorepos can also be called.)
206 * Call tuplestore_end to terminate the operation and release memory/disk
210 static Tuplestorestate *
211 tuplestore_begin_common(bool randomAccess, int maxKBytes)
213 Tuplestorestate *state;
215 state = (Tuplestorestate *) palloc(sizeof(Tuplestorestate));
217 MemSet((char *) state, 0, sizeof(Tuplestorestate));
219 state->status = TSS_INITIAL;
220 state->randomAccess = randomAccess;
221 state->availMem = maxKBytes * 1024L;
222 state->myfile = NULL;
224 state->memtupcount = 0;
226 state->memtupsize = 1024; /* initial guess */
228 state->memtupsize = 1; /* won't really need any space */
229 state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
235 tuplestore_begin_heap(bool randomAccess, int maxKBytes)
237 Tuplestorestate *state = tuplestore_begin_common(randomAccess, maxKBytes);
239 state->copytup = copytup_heap;
240 state->writetup = writetup_heap;
241 state->readtup = readtup_heap;
249 * Release resources and clean up.
252 tuplestore_end(Tuplestorestate *state)
257 BufFileClose(state->myfile);
258 if (state->memtuples)
260 for (i = 0; i < state->memtupcount; i++)
261 pfree(state->memtuples[i]);
262 pfree(state->memtuples);
267 * Accept one tuple while collecting input data.
269 * Note that the input tuple is always copied; the caller need not save it.
272 tuplestore_puttuple(Tuplestorestate *state, void *tuple)
275 * Copy the tuple. (Must do this even in WRITEFILE case.)
277 tuple = COPYTUP(state, tuple);
279 switch (state->status)
284 * Stash the tuple in the in-memory array.
286 if (state->memtupcount >= state->memtupsize)
288 /* Grow the array as needed. */
289 state->memtupsize *= 2;
290 state->memtuples = (void **)
291 repalloc(state->memtuples,
292 state->memtupsize * sizeof(void *));
294 state->memtuples[state->memtupcount++] = tuple;
297 * Done if we still fit in available memory.
303 * Nope; time to switch to tape-based operation.
305 state->myfile = BufFileCreateTemp();
306 state->status = TSS_WRITEFILE;
310 WRITETUP(state, tuple);
313 elog(ERROR, "tuplestore_puttuple: invalid state");
319 * All tuples have been provided; finish writing.
322 tuplestore_donestoring(Tuplestorestate *state)
324 switch (state->status)
329 * We were able to accumulate all the tuples within the
330 * allowed amount of memory. Just set up to scan them.
333 state->eof_reached = false;
334 state->markpos_offset = 0L;
335 state->markpos_eof = false;
336 state->status = TSS_READMEM;
341 * Write the EOF marker.
346 * Set up for reading from tape.
348 if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
349 elog(ERROR, "tuplestore_donestoring: seek(0) failed");
350 state->eof_reached = false;
351 state->markpos_file = 0;
352 state->markpos_offset = 0L;
353 state->markpos_eof = false;
354 state->status = TSS_READFILE;
357 elog(ERROR, "tuplestore_donestoring: invalid state");
363 * Fetch the next tuple in either forward or back direction.
364 * Returns NULL if no more tuples. If should_free is set, the
365 * caller must pfree the returned tuple when done with it.
368 tuplestore_gettuple(Tuplestorestate *state, bool forward,
374 switch (state->status)
377 Assert(forward || state->randomAccess);
378 *should_free = false;
381 if (state->current < state->memtupcount)
382 return state->memtuples[state->current++];
383 state->eof_reached = true;
388 if (state->current <= 0)
392 * if all tuples are fetched already then we return last
393 * tuple, else - tuple before last returned.
395 if (state->eof_reached)
396 state->eof_reached = false;
399 state->current--; /* last returned tuple */
400 if (state->current <= 0)
403 return state->memtuples[state->current - 1];
408 Assert(forward || state->randomAccess);
412 if (state->eof_reached)
414 if ((tuplen = getlen(state, true)) != 0)
416 tup = READTUP(state, tuplen);
421 state->eof_reached = true;
429 * if all tuples are fetched already then we return last tuple,
430 * else - tuple before last returned.
432 if (state->eof_reached)
435 * Seek position is pointing just past the zero tuplen at
436 * the end of file; back up to fetch last tuple's ending
437 * length word. If seek fails we must have a completely
440 if (BufFileSeek(state->myfile, 0,
441 -(long) (2 * sizeof(unsigned int)),
444 state->eof_reached = false;
449 * Back up and fetch previously-returned tuple's ending
450 * length word. If seek fails, assume we are at start of
453 if (BufFileSeek(state->myfile, 0,
454 -(long) sizeof(unsigned int),
457 tuplen = getlen(state, false);
460 * Back up to get ending length word of tuple before it.
462 if (BufFileSeek(state->myfile, 0,
463 -(long) (tuplen + 2 * sizeof(unsigned int)),
467 * If that fails, presumably the prev tuple is the
468 * first in the file. Back up so that it becomes next
469 * to read in forward direction (not obviously right,
470 * but that is what in-memory case does).
472 if (BufFileSeek(state->myfile, 0,
473 -(long) (tuplen + sizeof(unsigned int)),
475 elog(ERROR, "tuplestore_gettuple: bogus tuple len in backward scan");
480 tuplen = getlen(state, false);
483 * Now we have the length of the prior tuple, back up and read
484 * it. Note: READTUP expects we are positioned after the
485 * initial length word of the tuple, so back up to that point.
487 if (BufFileSeek(state->myfile, 0,
490 elog(ERROR, "tuplestore_gettuple: bogus tuple len in backward scan");
491 tup = READTUP(state, tuplen);
495 elog(ERROR, "tuplestore_gettuple: invalid state");
496 return NULL; /* keep compiler quiet */
501 * dumptuples - remove tuples from memory and write to tape
504 dumptuples(Tuplestorestate *state)
508 for (i = 0; i < state->memtupcount; i++)
509 WRITETUP(state, state->memtuples[i]);
510 state->memtupcount = 0;
514 * tuplestore_rescan - rewind and replay the scan
517 tuplestore_rescan(Tuplestorestate *state)
519 Assert(state->randomAccess);
521 switch (state->status)
525 state->eof_reached = false;
526 state->markpos_offset = 0L;
527 state->markpos_eof = false;
530 if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
531 elog(ERROR, "tuplestore_rescan: seek(0) failed");
532 state->eof_reached = false;
533 state->markpos_file = 0;
534 state->markpos_offset = 0L;
535 state->markpos_eof = false;
538 elog(ERROR, "tuplestore_rescan: invalid state");
544 * tuplestore_markpos - saves current position in the tuple sequence
547 tuplestore_markpos(Tuplestorestate *state)
549 Assert(state->randomAccess);
551 switch (state->status)
554 state->markpos_offset = state->current;
555 state->markpos_eof = state->eof_reached;
558 BufFileTell(state->myfile,
559 &state->markpos_file,
560 &state->markpos_offset);
561 state->markpos_eof = state->eof_reached;
564 elog(ERROR, "tuplestore_markpos: invalid state");
570 * tuplestore_restorepos - restores current position in tuple sequence to
571 * last saved position
574 tuplestore_restorepos(Tuplestorestate *state)
576 Assert(state->randomAccess);
578 switch (state->status)
581 state->current = (int) state->markpos_offset;
582 state->eof_reached = state->markpos_eof;
585 if (BufFileSeek(state->myfile,
587 state->markpos_offset,
589 elog(ERROR, "tuplestore_restorepos failed");
590 state->eof_reached = state->markpos_eof;
593 elog(ERROR, "tuplestore_restorepos: invalid state");
600 * Tape interface routines
604 getlen(Tuplestorestate *state, bool eofOK)
608 if (BufFileRead(state->myfile, (void *) &len, sizeof(len)) != sizeof(len))
609 elog(ERROR, "tuplestore: unexpected end of tape");
610 if (len == 0 && !eofOK)
611 elog(ERROR, "tuplestore: unexpected end of data");
616 markrunend(Tuplestorestate *state)
618 unsigned int len = 0;
620 if (BufFileWrite(state->myfile, (void *) &len, sizeof(len)) != sizeof(len))
621 elog(ERROR, "tuplestore: write failed");
626 * Routines specialized for HeapTuple case
630 copytup_heap(Tuplestorestate *state, void *tup)
632 HeapTuple tuple = (HeapTuple) tup;
634 USEMEM(state, HEAPTUPLESIZE + tuple->t_len);
635 return (void *) heap_copytuple(tuple);
639 * We don't bother to write the HeapTupleData part of the tuple.
643 writetup_heap(Tuplestorestate *state, void *tup)
645 HeapTuple tuple = (HeapTuple) tup;
648 tuplen = tuple->t_len + sizeof(tuplen);
649 if (BufFileWrite(state->myfile, (void *) &tuplen,
650 sizeof(tuplen)) != sizeof(tuplen))
651 elog(ERROR, "tuplestore: write failed");
652 if (BufFileWrite(state->myfile, (void *) tuple->t_data,
653 tuple->t_len) != (size_t) tuple->t_len)
654 elog(ERROR, "tuplestore: write failed");
655 if (state->randomAccess) /* need trailing length word? */
656 if (BufFileWrite(state->myfile, (void *) &tuplen,
657 sizeof(tuplen)) != sizeof(tuplen))
658 elog(ERROR, "tuplestore: write failed");
660 FREEMEM(state, HEAPTUPLESIZE + tuple->t_len);
661 heap_freetuple(tuple);
665 readtup_heap(Tuplestorestate *state, unsigned int len)
667 unsigned int tuplen = len - sizeof(unsigned int) + HEAPTUPLESIZE;
668 HeapTuple tuple = (HeapTuple) palloc(tuplen);
670 USEMEM(state, tuplen);
671 /* reconstruct the HeapTupleData portion */
672 tuple->t_len = len - sizeof(unsigned int);
673 ItemPointerSetInvalid(&(tuple->t_self));
674 tuple->t_datamcxt = CurrentMemoryContext;
675 tuple->t_data = (HeapTupleHeader) (((char *) tuple) + HEAPTUPLESIZE);
676 /* read in the tuple proper */
677 if (BufFileRead(state->myfile, (void *) tuple->t_data,
678 tuple->t_len) != (size_t) tuple->t_len)
679 elog(ERROR, "tuplestore: unexpected end of data");
680 if (state->randomAccess) /* need trailing length word? */
681 if (BufFileRead(state->myfile, (void *) &tuplen,
682 sizeof(tuplen)) != sizeof(tuplen))
683 elog(ERROR, "tuplestore: unexpected end of data");
684 return (void *) tuple;