1 /*-------------------------------------------------------------------------
4 * Generalized routines for temporary tuple storage.
6 * This module handles temporary storage of tuples for purposes such
7 * as Materialize nodes, hashjoin batch files, etc. It is essentially
8 * a dumbed-down version of tuplesort.c; it does no sorting of tuples
9 * but can only store and regurgitate a sequence of tuples. However,
10 * because no sort is required, it is allowed to start reading the sequence
11 * before it has all been written. This is particularly useful for cursors,
12 * because it allows random access within the already-scanned portion of
13 * a query without having to process the underlying scan to completion.
14 * A temporary file is used to handle the data if it exceeds the
15 * space limit specified by the caller.
17 * The (approximate) amount of memory allowed to the tuplestore is specified
18 * in kilobytes by the caller. We absorb tuples and simply store them in an
19 * in-memory array as long as we haven't exceeded maxKBytes. If we do exceed
 * maxKBytes, we dump all the tuples into a temp file and then read from that
 * file.
23 * When the caller requests random access to the data, we write the temp file
24 * in a format that allows either forward or backward scan. Otherwise, only
 * forward scan is allowed.  But rewind and markpos/restorepos are allowed
 * in any case.
28 * Because we allow reading before writing is complete, there are two
29 * interesting positions in the temp file: the current read position and
30 * the current write position. At any given instant, the temp file's seek
31 * position corresponds to one of these, and the other one is remembered in
32 * the Tuplestore's state.
35 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
36 * Portions Copyright (c) 1994, Regents of the University of California
39 * $PostgreSQL: pgsql/src/backend/utils/sort/tuplestore.c,v 1.25 2005/11/22 18:17:27 momjian Exp $
41 *-------------------------------------------------------------------------
46 #include "access/heapam.h"
47 #include "storage/buffile.h"
48 #include "utils/memutils.h"
49 #include "utils/tuplestore.h"
/*
 * States a Tuplestore object can be in.  The state persists between calls
 * of the tuplestore_xxx routines and selects how tuples are stored/read.
 */
typedef enum
{
	TSS_INMEM = 0,				/* all tuples still held in memory */
	TSS_WRITEFILE = 1,			/* spilled; temp file positioned for writing */
	TSS_READFILE = 2			/* spilled; temp file positioned for reading */
} TupStoreStatus;
64 * Private state of a Tuplestore operation.
66 struct Tuplestorestate
68 TupStoreStatus status; /* enumerated value as shown above */
69 bool randomAccess; /* did caller request random access? */
70 bool interXact; /* keep open through transactions? */
71 long availMem; /* remaining memory available, in bytes */
72 BufFile *myfile; /* underlying file, or NULL if none */
75 * These function pointers decouple the routines that must know what kind
76 * of tuple we are handling from the routines that don't need to know it.
77 * They are set up by the tuplestore_begin_xxx routines.
79 * (Although tuplestore.c currently only supports heap tuples, I've copied
80 * this part of tuplesort.c so that extension to other kinds of objects
81 * will be easy if it's ever needed.)
83 * Function to copy a supplied input tuple into palloc'd space. (NB: we
84 * assume that a single pfree() is enough to release the tuple later, so
85 * the representation must be "flat" in one palloc chunk.) state->availMem
86 * must be decreased by the amount of space used.
88 void *(*copytup) (Tuplestorestate *state, void *tup);
91 * Function to write a stored tuple onto tape. The representation of the
92 * tuple on tape need not be the same as it is in memory; requirements on
93 * the tape representation are given below. After writing the tuple,
94 * pfree() it, and increase state->availMem by the amount of memory space
97 void (*writetup) (Tuplestorestate *state, void *tup);
100 * Function to read a stored tuple from tape back into memory. 'len' is
101 * the already-read length of the stored tuple. Create and return a
102 * palloc'd copy, and decrease state->availMem by the amount of memory
105 void *(*readtup) (Tuplestorestate *state, unsigned int len);
108 * This array holds pointers to tuples in memory if we are in state INMEM.
109 * In states WRITEFILE and READFILE it's not used.
111 void **memtuples; /* array of pointers to palloc'd tuples */
112 int memtupcount; /* number of tuples currently present */
113 int memtupsize; /* allocated length of memtuples array */
116 * These variables are used to keep track of the current position.
118 * In state WRITEFILE, the current file seek position is the write point,
119 * and the read position is remembered in readpos_xxx; in state READFILE,
120 * the current file seek position is the read point, and the write
121 * position is remembered in writepos_xxx. (The write position is the
122 * same as EOF, but since BufFileSeek doesn't currently implement
123 * SEEK_END, we have to remember it explicitly.)
125 * Special case: if we are in WRITEFILE state and eof_reached is true,
126 * then the read position is implicitly equal to the write position (and
127 * hence to the file seek position); this way we need not update the
128 * readpos_xxx variables on each write.
130 bool eof_reached; /* read reached EOF (always valid) */
131 int current; /* next array index (valid if INMEM) */
132 int readpos_file; /* file# (valid if WRITEFILE and not eof) */
133 long readpos_offset; /* offset (valid if WRITEFILE and not eof) */
134 int writepos_file; /* file# (valid if READFILE) */
135 long writepos_offset; /* offset (valid if READFILE) */
137 /* markpos_xxx holds marked position for mark and restore */
138 int markpos_current; /* saved "current" */
139 int markpos_file; /* saved "readpos_file" */
140 long markpos_offset; /* saved "readpos_offset" */
/* Dispatch to the tuple-kind handlers stored in the state. */
#define COPYTUP(state,tup)	((state)->copytup((state), (tup)))
#define WRITETUP(state,tup) ((state)->writetup((state), (tup)))
#define READTUP(state,len)	((state)->readtup((state), (len)))

/* Memory-budget bookkeeping on state->availMem (bytes). */
#define LACKMEM(state)		((state)->availMem < 0)
#define USEMEM(state,amt)	((state)->availMem -= (amt))
#define FREEMEM(state,amt)	((state)->availMem += (amt))
150 /*--------------------
152 * NOTES about on-tape representation of tuples:
154 * We require the first "unsigned int" of a stored tuple to be the total size
155 * on-tape of the tuple, including itself (so it is never zero).
156 * The remainder of the stored tuple
157 * may or may not match the in-memory representation of the tuple ---
158 * any conversion needed is the job of the writetup and readtup routines.
160 * If state->randomAccess is true, then the stored representation of the
161 * tuple must be followed by another "unsigned int" that is a copy of the
162 * length --- so the total tape space used is actually sizeof(unsigned int)
163 * more than the stored length value. This allows read-backwards. When
 * randomAccess is not true, the write/read routines may omit the extra
 * length word.
167 * writetup is expected to write both length words as well as the tuple
168 * data. When readtup is called, the tape is positioned just after the
169 * front length word; readtup must read the tuple data and advance past
170 * the back length word (if present).
172 * The write/read routines can make use of the tuple description data
173 * stored in the Tuplestorestate record, if needed. They are also expected
174 * to adjust state->availMem by the amount of memory space (not tape space!)
175 * released or consumed. There is no error return from either writetup
176 * or readtup; they should ereport() on failure.
179 * NOTES about memory consumption calculations:
181 * We count space allocated for tuples against the maxKBytes limit,
182 * plus the space used by the variable-size array memtuples.
183 * Fixed-size space (primarily the BufFile I/O buffer) is not counted.
185 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
186 * rather than the originally-requested size. This is important since
187 * palloc can add substantial overhead. It's not a complete answer since
188 * we won't count any wasted space in palloc allocation blocks, but it's
189 * a lot better than what we were doing before 7.3.
191 *--------------------
195 static Tuplestorestate *tuplestore_begin_common(bool randomAccess,
198 static void dumptuples(Tuplestorestate *state);
199 static unsigned int getlen(Tuplestorestate *state, bool eofOK);
200 static void *copytup_heap(Tuplestorestate *state, void *tup);
201 static void writetup_heap(Tuplestorestate *state, void *tup);
202 static void *readtup_heap(Tuplestorestate *state, unsigned int len);
206 * tuplestore_begin_xxx
208 * Initialize for a tuple store operation.
210 static Tuplestorestate *
211 tuplestore_begin_common(bool randomAccess, bool interXact, int maxKBytes)
213 Tuplestorestate *state;
215 state = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate));
217 state->status = TSS_INMEM;
218 state->randomAccess = randomAccess;
219 state->interXact = interXact;
220 state->availMem = maxKBytes * 1024L;
221 state->myfile = NULL;
223 state->memtupcount = 0;
224 state->memtupsize = 1024; /* initial guess */
225 state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
227 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
229 state->eof_reached = false;
236 * tuplestore_begin_heap
238 * Create a new tuplestore; other types of tuple stores (other than
239 * "heap" tuple stores, for heap tuples) are possible, but not presently
242 * randomAccess: if true, both forward and backward accesses to the
243 * tuple store are allowed.
245 * interXact: if true, the files used for on-disk storage persist beyond the
246 * end of the current transaction. NOTE: It's the caller's responsibility to
247 * create such a tuplestore in a memory context that will also survive
248 * transaction boundaries, and to ensure the tuplestore is closed when it's
251 * maxKBytes: how much data to store in memory (any data beyond this
252 * amount is paged to disk). When in doubt, use work_mem.
255 tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
257 Tuplestorestate *state = tuplestore_begin_common(randomAccess,
261 state->copytup = copytup_heap;
262 state->writetup = writetup_heap;
263 state->readtup = readtup_heap;
271 * Release resources and clean up.
274 tuplestore_end(Tuplestorestate *state)
279 BufFileClose(state->myfile);
280 if (state->memtuples)
282 for (i = 0; i < state->memtupcount; i++)
283 pfree(state->memtuples[i]);
284 pfree(state->memtuples);
291 * Returns the current eof_reached state.
294 tuplestore_ateof(Tuplestorestate *state)
296 return state->eof_reached;
300 * Accept one tuple and append it to the tuplestore.
302 * Note that the input tuple is always copied; the caller need not save it.
304 * If the read status is currently "AT EOF" then it remains so (the read
305 * pointer advances along with the write pointer); otherwise the read
306 * pointer is unchanged. This is for the convenience of nodeMaterial.c.
309 tuplestore_puttuple(Tuplestorestate *state, void *tuple)
312 * Copy the tuple. (Must do this even in WRITEFILE case.)
314 tuple = COPYTUP(state, tuple);
316 switch (state->status)
319 /* Grow the array as needed */
320 if (state->memtupcount >= state->memtupsize)
322 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
323 state->memtupsize *= 2;
324 state->memtuples = (void **)
325 repalloc(state->memtuples,
326 state->memtupsize * sizeof(void *));
327 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
330 /* Stash the tuple in the in-memory array */
331 state->memtuples[state->memtupcount++] = tuple;
333 /* If eof_reached, keep read position in sync */
334 if (state->eof_reached)
335 state->current = state->memtupcount;
338 * Done if we still fit in available memory.
344 * Nope; time to switch to tape-based operation.
346 state->myfile = BufFileCreateTemp(state->interXact);
347 state->status = TSS_WRITEFILE;
351 WRITETUP(state, tuple);
356 * Switch from reading to writing.
358 if (!state->eof_reached)
359 BufFileTell(state->myfile,
360 &state->readpos_file, &state->readpos_offset);
361 if (BufFileSeek(state->myfile,
362 state->writepos_file, state->writepos_offset,
364 elog(ERROR, "seek to EOF failed");
365 state->status = TSS_WRITEFILE;
366 WRITETUP(state, tuple);
369 elog(ERROR, "invalid tuplestore state");
375 * Fetch the next tuple in either forward or back direction.
376 * Returns NULL if no more tuples. If should_free is set, the
377 * caller must pfree the returned tuple when done with it.
380 tuplestore_gettuple(Tuplestorestate *state, bool forward,
386 Assert(forward || state->randomAccess);
388 switch (state->status)
391 *should_free = false;
394 if (state->current < state->memtupcount)
395 return state->memtuples[state->current++];
396 state->eof_reached = true;
401 if (state->current <= 0)
405 * if all tuples are fetched already then we return last
406 * tuple, else - tuple before last returned.
408 if (state->eof_reached)
409 state->eof_reached = false;
412 state->current--; /* last returned tuple */
413 if (state->current <= 0)
416 return state->memtuples[state->current - 1];
421 /* Skip state change if we'll just return NULL */
422 if (state->eof_reached && forward)
426 * Switch from writing to reading.
428 BufFileTell(state->myfile,
429 &state->writepos_file, &state->writepos_offset);
430 if (!state->eof_reached)
431 if (BufFileSeek(state->myfile,
432 state->readpos_file, state->readpos_offset,
434 elog(ERROR, "seek failed");
435 state->status = TSS_READFILE;
436 /* FALL THRU into READFILE case */
442 if ((tuplen = getlen(state, true)) != 0)
444 tup = READTUP(state, tuplen);
449 state->eof_reached = true;
457 * if all tuples are fetched already then we return last tuple,
458 * else - tuple before last returned.
460 * Back up to fetch previously-returned tuple's ending length
461 * word. If seek fails, assume we are at start of file.
463 if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int),
466 tuplen = getlen(state, false);
468 if (state->eof_reached)
470 state->eof_reached = false;
471 /* We will return the tuple returned before returning NULL */
476 * Back up to get ending length word of tuple before it.
478 if (BufFileSeek(state->myfile, 0,
479 -(long) (tuplen + 2 * sizeof(unsigned int)),
483 * If that fails, presumably the prev tuple is the first
484 * in the file. Back up so that it becomes next to read
485 * in forward direction (not obviously right, but that is
486 * what in-memory case does).
488 if (BufFileSeek(state->myfile, 0,
489 -(long) (tuplen + sizeof(unsigned int)),
491 elog(ERROR, "bogus tuple length in backward scan");
494 tuplen = getlen(state, false);
498 * Now we have the length of the prior tuple, back up and read it.
499 * Note: READTUP expects we are positioned after the initial
500 * length word of the tuple, so back up to that point.
502 if (BufFileSeek(state->myfile, 0,
505 elog(ERROR, "bogus tuple length in backward scan");
506 tup = READTUP(state, tuplen);
510 elog(ERROR, "invalid tuplestore state");
511 return NULL; /* keep compiler quiet */
516 * dumptuples - remove tuples from memory and write to tape
518 * As a side effect, we must set readpos and markpos to the value
519 * corresponding to "current"; otherwise, a dump would lose the current read
523 dumptuples(Tuplestorestate *state)
529 if (i == state->current)
530 BufFileTell(state->myfile,
531 &state->readpos_file, &state->readpos_offset);
532 if (i == state->markpos_current)
533 BufFileTell(state->myfile,
534 &state->markpos_file, &state->markpos_offset);
535 if (i >= state->memtupcount)
537 WRITETUP(state, state->memtuples[i]);
539 state->memtupcount = 0;
543 * tuplestore_rescan - rewind and replay the scan
546 tuplestore_rescan(Tuplestorestate *state)
548 switch (state->status)
551 state->eof_reached = false;
555 state->eof_reached = false;
556 state->readpos_file = 0;
557 state->readpos_offset = 0L;
560 state->eof_reached = false;
561 if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
562 elog(ERROR, "seek to start failed");
565 elog(ERROR, "invalid tuplestore state");
571 * tuplestore_markpos - saves current position in the tuple sequence
574 tuplestore_markpos(Tuplestorestate *state)
576 switch (state->status)
579 state->markpos_current = state->current;
582 if (state->eof_reached)
584 /* Need to record the implicit read position */
585 BufFileTell(state->myfile,
586 &state->markpos_file,
587 &state->markpos_offset);
591 state->markpos_file = state->readpos_file;
592 state->markpos_offset = state->readpos_offset;
596 BufFileTell(state->myfile,
597 &state->markpos_file,
598 &state->markpos_offset);
601 elog(ERROR, "invalid tuplestore state");
607 * tuplestore_restorepos - restores current position in tuple sequence to
608 * last saved position
611 tuplestore_restorepos(Tuplestorestate *state)
613 switch (state->status)
616 state->eof_reached = false;
617 state->current = state->markpos_current;
620 state->eof_reached = false;
621 state->readpos_file = state->markpos_file;
622 state->readpos_offset = state->markpos_offset;
625 state->eof_reached = false;
626 if (BufFileSeek(state->myfile,
628 state->markpos_offset,
630 elog(ERROR, "tuplestore_restorepos failed");
633 elog(ERROR, "invalid tuplestore state");
640 * Tape interface routines
644 getlen(Tuplestorestate *state, bool eofOK)
649 nbytes = BufFileRead(state->myfile, (void *) &len, sizeof(len));
650 if (nbytes == sizeof(len))
653 elog(ERROR, "unexpected end of tape");
655 elog(ERROR, "unexpected end of data");
661 * Routines specialized for HeapTuple case
665 copytup_heap(Tuplestorestate *state, void *tup)
667 HeapTuple tuple = (HeapTuple) tup;
669 tuple = heap_copytuple(tuple);
670 USEMEM(state, GetMemoryChunkSpace(tuple));
671 return (void *) tuple;
675 * We don't bother to write the HeapTupleData part of the tuple.
679 writetup_heap(Tuplestorestate *state, void *tup)
681 HeapTuple tuple = (HeapTuple) tup;
684 tuplen = tuple->t_len + sizeof(tuplen);
685 if (BufFileWrite(state->myfile, (void *) &tuplen,
686 sizeof(tuplen)) != sizeof(tuplen))
687 elog(ERROR, "write failed");
688 if (BufFileWrite(state->myfile, (void *) tuple->t_data,
689 tuple->t_len) != (size_t) tuple->t_len)
690 elog(ERROR, "write failed");
691 if (state->randomAccess) /* need trailing length word? */
692 if (BufFileWrite(state->myfile, (void *) &tuplen,
693 sizeof(tuplen)) != sizeof(tuplen))
694 elog(ERROR, "write failed");
696 FREEMEM(state, GetMemoryChunkSpace(tuple));
697 heap_freetuple(tuple);
701 readtup_heap(Tuplestorestate *state, unsigned int len)
703 unsigned int tuplen = len - sizeof(unsigned int) + HEAPTUPLESIZE;
704 HeapTuple tuple = (HeapTuple) palloc(tuplen);
706 USEMEM(state, GetMemoryChunkSpace(tuple));
707 /* reconstruct the HeapTupleData portion */
708 tuple->t_len = len - sizeof(unsigned int);
709 ItemPointerSetInvalid(&(tuple->t_self));
710 tuple->t_data = (HeapTupleHeader) (((char *) tuple) + HEAPTUPLESIZE);
711 /* read in the tuple proper */
712 if (BufFileRead(state->myfile, (void *) tuple->t_data,
713 tuple->t_len) != (size_t) tuple->t_len)
714 elog(ERROR, "unexpected end of data");
715 if (state->randomAccess) /* need trailing length word? */
716 if (BufFileRead(state->myfile, (void *) &tuplen,
717 sizeof(tuplen)) != sizeof(tuplen))
718 elog(ERROR, "unexpected end of data");
719 return (void *) tuple;