1 /*-------------------------------------------------------------------------
4 * Generalized routines for temporary tuple storage.
6 * This module handles temporary storage of tuples for purposes such
7 * as Materialize nodes, hashjoin batch files, etc. It is essentially
8 * a dumbed-down version of tuplesort.c; it does no sorting of tuples
9 * but can only store and regurgitate a sequence of tuples. However,
10 * because no sort is required, it is allowed to start reading the sequence
11 * before it has all been written. This is particularly useful for cursors,
12 * because it allows random access within the already-scanned portion of
13 * a query without having to process the underlying scan to completion.
14 * A temporary file is used to handle the data if it exceeds the
15 * space limit specified by the caller.
17 * The (approximate) amount of memory allowed to the tuplestore is specified
18 * in kilobytes by the caller. We absorb tuples and simply store them in an
19 * in-memory array as long as we haven't exceeded maxKBytes. If we do exceed
20 * maxKBytes, we dump all the tuples into a temp file and then read from that
23 * When the caller requests random access to the data, we write the temp file
24 * in a format that allows either forward or backward scan. Otherwise, only
25 * forward scan is allowed. But rewind and markpos/restorepos are allowed
28 * Because we allow reading before writing is complete, there are two
29 * interesting positions in the temp file: the current read position and
30 * the current write position. At any given instant, the temp file's seek
31 * position corresponds to one of these, and the other one is remembered in
32 * the Tuplestore's state.
35 * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
36 * Portions Copyright (c) 1994, Regents of the University of California
39 * $Header: /cvsroot/pgsql/src/backend/utils/sort/tuplestore.c,v 1.13 2003/04/29 03:21:29 tgl Exp $
41 *-------------------------------------------------------------------------
46 #include "access/heapam.h"
47 #include "storage/buffile.h"
48 #include "utils/tuplestore.h"
51 * Possible states of a Tuplestore object. These denote the states that
52 * persist between calls of Tuplestore routines.
56 TSS_INMEM, /* Tuples still fit in memory */
57 TSS_WRITEFILE, /* Writing to temp file */
58 TSS_READFILE /* Reading from temp file */
62 * Private state of a Tuplestore operation.
64 struct Tuplestorestate
66 TupStoreStatus status; /* enumerated value as shown above */
67 bool randomAccess; /* did caller request random access? */
68 bool interXact; /* keep open through transactions? */
69 long availMem; /* remaining memory available, in bytes */
70 BufFile *myfile; /* underlying file, or NULL if none */
73 * These function pointers decouple the routines that must know what
74 * kind of tuple we are handling from the routines that don't need to
75 * know it. They are set up by the tuplestore_begin_xxx routines.
77 * (Although tuplestore.c currently only supports heap tuples, I've
78 * copied this part of tuplesort.c so that extension to other kinds of
79 * objects will be easy if it's ever needed.)
81 * Function to copy a supplied input tuple into palloc'd space. (NB: we
82 * assume that a single pfree() is enough to release the tuple later,
83 * so the representation must be "flat" in one palloc chunk.)
84 * state->availMem must be decreased by the amount of space used.
86 void *(*copytup) (Tuplestorestate *state, void *tup);
89 * Function to write a stored tuple onto tape. The representation of
90 * the tuple on tape need not be the same as it is in memory;
91 * requirements on the tape representation are given below. After
92 * writing the tuple, pfree() it, and increase state->availMem by the
93 * amount of memory space thereby released.
95 void (*writetup) (Tuplestorestate *state, void *tup);
98 * Function to read a stored tuple from tape back into memory. 'len'
99 * is the already-read length of the stored tuple. Create and return
100 * a palloc'd copy, and decrease state->availMem by the amount of
101 * memory space consumed.
103 void *(*readtup) (Tuplestorestate *state, unsigned int len);
106 * This array holds pointers to tuples in memory if we are in state
107 * INMEM. In states WRITEFILE and READFILE it's not used.
109 void **memtuples; /* array of pointers to palloc'd tuples */
110 int memtupcount; /* number of tuples currently present */
111 int memtupsize; /* allocated length of memtuples array */
114 * These variables are used to keep track of the current position.
116 * In state WRITEFILE, the current file seek position is the write point,
117 * and the read position is remembered in readpos_xxx; in state READFILE,
118 * the current file seek position is the read point, and the write position
119 * is remembered in writepos_xxx. (The write position is the same as EOF,
120 * but since BufFileSeek doesn't currently implement SEEK_END, we have
121 * to remember it explicitly.)
123 * Special case: if we are in WRITEFILE state and eof_reached is true,
124 * then the read position is implicitly equal to the write position
125 * (and hence to the file seek position); this way we need not update
126 * the readpos_xxx variables on each write.
128 bool eof_reached; /* read reached EOF (always valid) */
129 int current; /* next array index (valid if INMEM) */
130 int readpos_file; /* file# (valid if WRITEFILE and not eof) */
131 long readpos_offset; /* offset (valid if WRITEFILE and not eof) */
132 int writepos_file; /* file# (valid if READFILE) */
133 long writepos_offset; /* offset (valid if READFILE) */
135 /* markpos_xxx holds marked position for mark and restore */
136 int markpos_current; /* saved "current" */
137 int markpos_file; /* saved "readpos_file" */
138 long markpos_offset; /* saved "readpos_offset" */
141 #define COPYTUP(state,tup) ((*(state)->copytup) (state, tup))
142 #define WRITETUP(state,tup) ((*(state)->writetup) (state, tup))
143 #define READTUP(state,len) ((*(state)->readtup) (state, len))
144 #define LACKMEM(state) ((state)->availMem < 0)
145 #define USEMEM(state,amt) ((state)->availMem -= (amt))
146 #define FREEMEM(state,amt) ((state)->availMem += (amt))
148 /*--------------------
150 * NOTES about on-tape representation of tuples:
152 * We require the first "unsigned int" of a stored tuple to be the total size
153 * on-tape of the tuple, including itself (so it is never zero).
154 * The remainder of the stored tuple
155 * may or may not match the in-memory representation of the tuple ---
156 * any conversion needed is the job of the writetup and readtup routines.
158 * If state->randomAccess is true, then the stored representation of the
159 * tuple must be followed by another "unsigned int" that is a copy of the
160 * length --- so the total tape space used is actually sizeof(unsigned int)
161 * more than the stored length value. This allows read-backwards. When
162 * randomAccess is not true, the write/read routines may omit the extra
165 * writetup is expected to write both length words as well as the tuple
166 * data. When readtup is called, the tape is positioned just after the
167 * front length word; readtup must read the tuple data and advance past
168 * the back length word (if present).
170 * The write/read routines can make use of the tuple description data
171 * stored in the Tuplestorestate record, if needed. They are also expected
172 * to adjust state->availMem by the amount of memory space (not tape space!)
173 * released or consumed. There is no error return from either writetup
174 * or readtup; they should elog() on failure.
177 * NOTES about memory consumption calculations:
179 * We count space allocated for tuples against the maxKBytes limit,
180 * plus the space used by the variable-size array memtuples.
181 * Fixed-size space (primarily the BufFile I/O buffer) is not counted.
183 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
184 * rather than the originally-requested size. This is important since
185 * palloc can add substantial overhead. It's not a complete answer since
186 * we won't count any wasted space in palloc allocation blocks, but it's
187 * a lot better than what we were doing before 7.3.
189 *--------------------
193 static Tuplestorestate *tuplestore_begin_common(bool randomAccess,
196 static void dumptuples(Tuplestorestate *state);
197 static unsigned int getlen(Tuplestorestate *state, bool eofOK);
198 static void *copytup_heap(Tuplestorestate *state, void *tup);
199 static void writetup_heap(Tuplestorestate *state, void *tup);
200 static void *readtup_heap(Tuplestorestate *state, unsigned int len);
204 * tuplestore_begin_xxx
206 * Initialize for a tuple store operation.
208 static Tuplestorestate *
209 tuplestore_begin_common(bool randomAccess, bool interXact, int maxKBytes)
211 Tuplestorestate *state;
213 state = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate));
215 state->status = TSS_INMEM;
216 state->randomAccess = randomAccess;
217 state->interXact = interXact;
218 state->availMem = maxKBytes * 1024L;
219 state->myfile = NULL;
221 state->memtupcount = 0;
223 state->memtupsize = 1024; /* initial guess */
225 state->memtupsize = 1; /* won't really need any space */
226 state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
228 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
230 state->eof_reached = false;
237 * tuplestore_begin_heap
239 * Create a new tuplestore; other types of tuple stores (other than
240 * "heap" tuple stores, for heap tuples) are possible, but not presently
243 * randomAccess: if true, both forward and backward accesses to the
244 * tuple store are allowed.
246 * interXact: if true, the files used for on-disk storage persist beyond the
247 * end of the current transaction. NOTE: It's the caller's responsibility to
248 * create such a tuplestore in a memory context that will also survive
249 * transaction boundaries, and to ensure the tuplestore is closed when it's
252 * maxKBytes: how much data to store in memory (any data beyond this
253 * amount is paged to disk).
256 tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
258 Tuplestorestate *state = tuplestore_begin_common(randomAccess,
262 state->copytup = copytup_heap;
263 state->writetup = writetup_heap;
264 state->readtup = readtup_heap;
272 * Release resources and clean up.
275 tuplestore_end(Tuplestorestate *state)
280 BufFileClose(state->myfile);
281 if (state->memtuples)
283 for (i = 0; i < state->memtupcount; i++)
284 pfree(state->memtuples[i]);
285 pfree(state->memtuples);
292 * Returns the current eof_reached state.
295 tuplestore_ateof(Tuplestorestate *state)
297 return state->eof_reached;
301 * Accept one tuple and append it to the tuplestore.
303 * Note that the input tuple is always copied; the caller need not save it.
305 * If the read status is currently "AT EOF" then it remains so (the read
306 * pointer advances along with the write pointer); otherwise the read
307 * pointer is unchanged. This is for the convenience of nodeMaterial.c.
310 tuplestore_puttuple(Tuplestorestate *state, void *tuple)
313 * Copy the tuple. (Must do this even in WRITEFILE case.)
315 tuple = COPYTUP(state, tuple);
317 switch (state->status)
320 /* Grow the array as needed */
321 if (state->memtupcount >= state->memtupsize)
323 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
324 state->memtupsize *= 2;
325 state->memtuples = (void **)
326 repalloc(state->memtuples,
327 state->memtupsize * sizeof(void *));
328 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
331 /* Stash the tuple in the in-memory array */
332 state->memtuples[state->memtupcount++] = tuple;
334 /* If eof_reached, keep read position in sync */
335 if (state->eof_reached)
336 state->current = state->memtupcount;
339 * Done if we still fit in available memory.
345 * Nope; time to switch to tape-based operation.
347 state->myfile = BufFileCreateTemp(state->interXact);
348 state->status = TSS_WRITEFILE;
352 WRITETUP(state, tuple);
356 * Switch from reading to writing.
358 if (!state->eof_reached)
359 BufFileTell(state->myfile,
360 &state->readpos_file, &state->readpos_offset);
361 if (BufFileSeek(state->myfile,
362 state->writepos_file, state->writepos_offset,
364 elog(ERROR, "tuplestore_puttuple: seek(EOF) failed");
365 state->status = TSS_WRITEFILE;
366 WRITETUP(state, tuple);
369 elog(ERROR, "tuplestore_puttuple: invalid state");
375 * Fetch the next tuple in either forward or back direction.
376 * Returns NULL if no more tuples. If should_free is set, the
377 * caller must pfree the returned tuple when done with it.
380 tuplestore_gettuple(Tuplestorestate *state, bool forward,
386 Assert(forward || state->randomAccess);
388 switch (state->status)
391 *should_free = false;
394 if (state->current < state->memtupcount)
395 return state->memtuples[state->current++];
396 state->eof_reached = true;
401 if (state->current <= 0)
405 * if all tuples are fetched already then we return last
406 * tuple, else - tuple before last returned.
408 if (state->eof_reached)
409 state->eof_reached = false;
412 state->current--; /* last returned tuple */
413 if (state->current <= 0)
416 return state->memtuples[state->current - 1];
421 /* Skip state change if we'll just return NULL */
422 if (state->eof_reached && forward)
425 * Switch from writing to reading.
427 BufFileTell(state->myfile,
428 &state->writepos_file, &state->writepos_offset);
429 if (!state->eof_reached)
430 if (BufFileSeek(state->myfile,
431 state->readpos_file, state->readpos_offset,
433 elog(ERROR, "tuplestore_gettuple: seek() failed");
434 state->status = TSS_READFILE;
435 /* FALL THRU into READFILE case */
441 if ((tuplen = getlen(state, true)) != 0)
443 tup = READTUP(state, tuplen);
448 state->eof_reached = true;
456 * if all tuples are fetched already then we return last tuple,
457 * else - tuple before last returned.
459 * Back up to fetch previously-returned tuple's ending
460 * length word. If seek fails, assume we are at start of
463 if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int),
466 tuplen = getlen(state, false);
468 if (state->eof_reached)
470 state->eof_reached = false;
471 /* We will return the tuple returned before returning NULL */
476 * Back up to get ending length word of tuple before it.
478 if (BufFileSeek(state->myfile, 0,
479 -(long) (tuplen + 2 * sizeof(unsigned int)),
483 * If that fails, presumably the prev tuple is the
484 * first in the file. Back up so that it becomes next
485 * to read in forward direction (not obviously right,
486 * but that is what in-memory case does).
488 if (BufFileSeek(state->myfile, 0,
489 -(long) (tuplen + sizeof(unsigned int)),
491 elog(ERROR, "tuplestore_gettuple: bogus tuple len in backward scan");
494 tuplen = getlen(state, false);
498 * Now we have the length of the prior tuple, back up and read
499 * it. Note: READTUP expects we are positioned after the
500 * initial length word of the tuple, so back up to that point.
502 if (BufFileSeek(state->myfile, 0,
505 elog(ERROR, "tuplestore_gettuple: bogus tuple len in backward scan");
506 tup = READTUP(state, tuplen);
510 elog(ERROR, "tuplestore_gettuple: invalid state");
511 return NULL; /* keep compiler quiet */
516 * dumptuples - remove tuples from memory and write to tape
518 * As a side effect, we must set readpos and markpos to the value
519 * corresponding to "current"; otherwise, a dump would lose the current read
523 dumptuples(Tuplestorestate *state)
529 if (i == state->current)
530 BufFileTell(state->myfile,
531 &state->readpos_file, &state->readpos_offset);
532 if (i == state->markpos_current)
533 BufFileTell(state->myfile,
534 &state->markpos_file, &state->markpos_offset);
535 if (i >= state->memtupcount)
537 WRITETUP(state, state->memtuples[i]);
539 state->memtupcount = 0;
543 * tuplestore_rescan - rewind and replay the scan
546 tuplestore_rescan(Tuplestorestate *state)
548 switch (state->status)
551 state->eof_reached = false;
555 state->eof_reached = false;
556 state->readpos_file = 0;
557 state->readpos_offset = 0L;
560 state->eof_reached = false;
561 if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
562 elog(ERROR, "tuplestore_rescan: seek(0) failed");
565 elog(ERROR, "tuplestore_rescan: invalid state");
571 * tuplestore_markpos - saves current position in the tuple sequence
574 tuplestore_markpos(Tuplestorestate *state)
576 switch (state->status)
579 state->markpos_current = state->current;
582 if (state->eof_reached)
584 /* Need to record the implicit read position */
585 BufFileTell(state->myfile,
586 &state->markpos_file,
587 &state->markpos_offset);
591 state->markpos_file = state->readpos_file;
592 state->markpos_offset = state->readpos_offset;
596 BufFileTell(state->myfile,
597 &state->markpos_file,
598 &state->markpos_offset);
601 elog(ERROR, "tuplestore_markpos: invalid state");
607 * tuplestore_restorepos - restores current position in tuple sequence to
608 * last saved position
611 tuplestore_restorepos(Tuplestorestate *state)
613 switch (state->status)
616 state->eof_reached = false;
617 state->current = state->markpos_current;
620 state->eof_reached = false;
621 state->readpos_file = state->markpos_file;
622 state->readpos_offset = state->markpos_offset;
625 state->eof_reached = false;
626 if (BufFileSeek(state->myfile,
628 state->markpos_offset,
630 elog(ERROR, "tuplestore_restorepos failed");
633 elog(ERROR, "tuplestore_restorepos: invalid state");
640 * Tape interface routines
644 getlen(Tuplestorestate *state, bool eofOK)
649 nbytes = BufFileRead(state->myfile, (void *) &len, sizeof(len));
650 if (nbytes == sizeof(len))
653 elog(ERROR, "tuplestore: unexpected end of tape");
655 elog(ERROR, "tuplestore: unexpected end of data");
661 * Routines specialized for HeapTuple case
665 copytup_heap(Tuplestorestate *state, void *tup)
667 HeapTuple tuple = (HeapTuple) tup;
669 tuple = heap_copytuple(tuple);
670 USEMEM(state, GetMemoryChunkSpace(tuple));
671 return (void *) tuple;
675 * We don't bother to write the HeapTupleData part of the tuple.
679 writetup_heap(Tuplestorestate *state, void *tup)
681 HeapTuple tuple = (HeapTuple) tup;
684 tuplen = tuple->t_len + sizeof(tuplen);
685 if (BufFileWrite(state->myfile, (void *) &tuplen,
686 sizeof(tuplen)) != sizeof(tuplen))
687 elog(ERROR, "tuplestore: write failed");
688 if (BufFileWrite(state->myfile, (void *) tuple->t_data,
689 tuple->t_len) != (size_t) tuple->t_len)
690 elog(ERROR, "tuplestore: write failed");
691 if (state->randomAccess) /* need trailing length word? */
692 if (BufFileWrite(state->myfile, (void *) &tuplen,
693 sizeof(tuplen)) != sizeof(tuplen))
694 elog(ERROR, "tuplestore: write failed");
696 FREEMEM(state, GetMemoryChunkSpace(tuple));
697 heap_freetuple(tuple);
701 readtup_heap(Tuplestorestate *state, unsigned int len)
703 unsigned int tuplen = len - sizeof(unsigned int) + HEAPTUPLESIZE;
704 HeapTuple tuple = (HeapTuple) palloc(tuplen);
706 USEMEM(state, GetMemoryChunkSpace(tuple));
707 /* reconstruct the HeapTupleData portion */
708 tuple->t_len = len - sizeof(unsigned int);
709 ItemPointerSetInvalid(&(tuple->t_self));
710 tuple->t_datamcxt = CurrentMemoryContext;
711 tuple->t_data = (HeapTupleHeader) (((char *) tuple) + HEAPTUPLESIZE);
712 /* read in the tuple proper */
713 if (BufFileRead(state->myfile, (void *) tuple->t_data,
714 tuple->t_len) != (size_t) tuple->t_len)
715 elog(ERROR, "tuplestore: unexpected end of data");
716 if (state->randomAccess) /* need trailing length word? */
717 if (BufFileRead(state->myfile, (void *) &tuplen,
718 sizeof(tuplen)) != sizeof(tuplen))
719 elog(ERROR, "tuplestore: unexpected end of data");
720 return (void *) tuple;