1 /*-------------------------------------------------------------------------
4 * Generalized routines for temporary tuple storage.
6 * This module handles temporary storage of tuples for purposes such
7 * as Materialize nodes, hashjoin batch files, etc. It is essentially
8 * a dumbed-down version of tuplesort.c; it does no sorting of tuples
9 * but can only store and regurgitate a sequence of tuples. However,
10 * because no sort is required, it is allowed to start reading the sequence
11 * before it has all been written. This is particularly useful for cursors,
12 * because it allows random access within the already-scanned portion of
13 * a query without having to process the underlying scan to completion.
14 * Also, it is possible to support multiple independent read pointers.
16 * A temporary file is used to handle the data if it exceeds the
17 * space limit specified by the caller.
19 * The (approximate) amount of memory allowed to the tuplestore is specified
20 * in kilobytes by the caller. We absorb tuples and simply store them in an
21 * in-memory array as long as we haven't exceeded maxKBytes. If we do exceed
22 maxKBytes, we dump all the tuples into a temp file and then read from that when needed.
25 * Upon creation, a tuplestore supports a single read pointer, numbered 0.
26 * Additional read pointers can be created using tuplestore_alloc_read_pointer.
27 * Mark/restore behavior is supported by copying read pointers.
29 * When the caller requests backward-scan capability, we write the temp file
30 * in a format that allows either forward or backward scan. Otherwise, only
31 * forward scan is allowed. A request for backward scan must be made before
32 * putting any tuples into the tuplestore. Rewind is normally allowed but
33 * can be turned off via tuplestore_set_eflags; turning off rewind for all
34 * read pointers enables truncation of the tuplestore at the oldest read point
35 * for minimal memory usage. (The caller must explicitly call tuplestore_trim
36 * at appropriate times for truncation to actually happen.)
38 * Note: in TSS_WRITEFILE state, the temp file's seek position is the
39 * current write position, and the write-position variables in the tuplestore
40 * aren't kept up to date. Similarly, in TSS_READFILE state the temp file's
41 * seek position is the active read pointer's position, and that read pointer
42 * isn't kept up to date. We update the appropriate variables using ftell()
43 * before switching to the other state or activating a different read pointer.
46 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
47 * Portions Copyright (c) 1994, Regents of the University of California
50 * src/backend/utils/sort/tuplestore.c
52 *-------------------------------------------------------------------------
59 #include "access/htup_details.h"
60 #include "commands/tablespace.h"
61 #include "executor/executor.h"
62 #include "miscadmin.h"
63 #include "storage/buffile.h"
64 #include "utils/memutils.h"
65 #include "utils/resowner.h"
69 * Possible states of a Tuplestore object. These denote the states that
70 * persist between calls of Tuplestore routines.
 * (Observed transitions in this file: TSS_INMEM -> TSS_WRITEFILE when the
 * memory budget is exceeded in tuplestore_puttuple_common; TSS_WRITEFILE <->
 * TSS_READFILE as the caller alternates between writing and reading.)
74 TSS_INMEM, /* Tuples still fit in memory */
75 TSS_WRITEFILE, /* Writing to temp file */
76 TSS_READFILE /* Reading from temp file */
80 * State for a single read pointer. If we are in state INMEM then all the
81 * read pointers' "current" fields denote the read positions. In state
82 * WRITEFILE, the file/offset fields denote the read positions. In state
83 * READFILE, inactive read pointers have valid file/offset, but the active
84 * read pointer implicitly has position equal to the temp file's seek position.
86 * Special case: if eof_reached is true, then the pointer's read position is
87 * implicitly equal to the write position, and current/file/offset aren't
88 * maintained. This way we need not update all the read pointers each time
 * a tuple is appended while they sit at EOF.
93 int eflags; /* capability flags (EXEC_FLAG_REWIND / EXEC_FLAG_BACKWARD) */
94 bool eof_reached; /* read has reached EOF */
95 int current; /* next array index to read (INMEM state only) */
96 int file; /* temp file# (file-based states only) */
97 off_t offset; /* byte offset in file (file-based states only) */
101 * Private state of a Tuplestore operation.
103 struct Tuplestorestate
105 TupStoreStatus status; /* enumerated value as shown above */
106 int eflags; /* capability flags (OR of pointers' flags) */
107 bool backward; /* store extra length words in file? */
108 bool interXact; /* keep open through transactions? */
109 bool truncated; /* tuplestore_trim has removed tuples? */
110 int64 availMem; /* remaining memory available, in bytes */
111 int64 allowedMem; /* total memory allowed, in bytes */
112 BufFile *myfile; /* underlying file, or NULL if none */
113 MemoryContext context; /* memory context for holding tuples */
114 ResourceOwner resowner; /* resowner for holding temp files */
117 * These function pointers decouple the routines that must know what kind
118 * of tuple we are handling from the routines that don't need to know it.
119 * They are set up by the tuplestore_begin_xxx routines.
121 * (Although tuplestore.c currently only supports heap tuples, I've copied
122 * this part of tuplesort.c so that extension to other kinds of objects
123 * will be easy if it's ever needed.)
125 * Function to copy a supplied input tuple into palloc'd space. (NB: we
126 * assume that a single pfree() is enough to release the tuple later, so
127 * the representation must be "flat" in one palloc chunk.) state->availMem
128 * must be decreased by the amount of space used.
130 void *(*copytup) (Tuplestorestate *state, void *tup);
133 * Function to write a stored tuple onto tape. The representation of the
134 * tuple on tape need not be the same as it is in memory; requirements on
135 * the tape representation are given below. After writing the tuple,
136 * pfree() it, and increase state->availMem by the amount of memory space
 * thereby released.
139 void (*writetup) (Tuplestorestate *state, void *tup);
142 * Function to read a stored tuple from tape back into memory. 'len' is
143 * the already-read length of the stored tuple. Create and return a
144 * palloc'd copy, and decrease state->availMem by the amount of memory
 * space consumed.
147 void *(*readtup) (Tuplestorestate *state, unsigned int len);
150 * This array holds pointers to tuples in memory if we are in state INMEM.
151 * In states WRITEFILE and READFILE it's not used.
153 * When memtupdeleted > 0, the first memtupdeleted pointers are already
154 * released due to a tuplestore_trim() operation, but we haven't expended
155 * the effort to slide the remaining pointers down. These unused pointers
156 * are set to NULL to catch any invalid accesses. Note that memtupcount
157 * includes the deleted pointers.
159 void **memtuples; /* array of pointers to palloc'd tuples */
160 int memtupdeleted; /* the first N slots are currently unused */
161 int memtupcount; /* number of tuples currently present */
162 int memtupsize; /* allocated length of memtuples array */
163 bool growmemtuples; /* memtuples' growth still underway? */
166 * These variables are used to keep track of the current positions.
168 * In state WRITEFILE, the current file seek position is the write point;
169 * in state READFILE, the write position is remembered in writepos_xxx.
170 * (The write position is the same as EOF, but since BufFileSeek doesn't
171 * currently implement SEEK_END, we have to remember it explicitly.)
173 TSReadPointer *readptrs; /* array of read pointers */
174 int activeptr; /* index of the active read pointer */
175 int readptrcount; /* number of pointers currently valid */
176 int readptrsize; /* allocated length of readptrs array */
178 int writepos_file; /* file# (valid if READFILE state) */
179 off_t writepos_offset; /* offset (valid if READFILE state) */
/*
 * Convenience macros: the first three dispatch through the per-tuple-kind
 * function pointers set up by tuplestore_begin_xxx; the last three maintain
 * the availMem accounting.  LACKMEM becoming true is what triggers the
 * switch to tape-based operation (see tuplestore_puttuple_common).
 */
182 #define COPYTUP(state,tup) ((*(state)->copytup) (state, tup))
183 #define WRITETUP(state,tup) ((*(state)->writetup) (state, tup))
184 #define READTUP(state,len) ((*(state)->readtup) (state, len))
185 #define LACKMEM(state) ((state)->availMem < 0)
186 #define USEMEM(state,amt) ((state)->availMem -= (amt))
187 #define FREEMEM(state,amt) ((state)->availMem += (amt))
189 /*--------------------
191 * NOTES about on-tape representation of tuples:
193 * We require the first "unsigned int" of a stored tuple to be the total size
194 * on-tape of the tuple, including itself (so it is never zero).
195 * The remainder of the stored tuple
196 * may or may not match the in-memory representation of the tuple ---
197 * any conversion needed is the job of the writetup and readtup routines.
199 * If state->backward is true, then the stored representation of
200 * the tuple must be followed by another "unsigned int" that is a copy of the
201 * length --- so the total tape space used is actually sizeof(unsigned int)
202 * more than the stored length value. This allows read-backwards. When
203 * state->backward is not set, the write/read routines may omit the extra length word.
206 * writetup is expected to write both length words as well as the tuple
207 * data. When readtup is called, the tape is positioned just after the
208 * front length word; readtup must read the tuple data and advance past
209 * the back length word (if present).
211 * The write/read routines can make use of the tuple description data
212 * stored in the Tuplestorestate record, if needed. They are also expected
213 * to adjust state->availMem by the amount of memory space (not tape space!)
214 * released or consumed. There is no error return from either writetup
215 * or readtup; they should ereport() on failure.
218 * NOTES about memory consumption calculations:
220 * We count space allocated for tuples against the maxKBytes limit,
221 * plus the space used by the variable-size array memtuples.
222 * Fixed-size space (primarily the BufFile I/O buffer) is not counted.
223 * We don't worry about the size of the read pointer array, either.
225 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
226 * rather than the originally-requested size. This is important since
227 * palloc can add substantial overhead. It's not a complete answer since
228 * we won't count any wasted space in palloc allocation blocks, but it's
229 * a lot better than what we were doing before 7.3.
231 *--------------------
/* Forward declarations of routines private to this module */
235 static Tuplestorestate *tuplestore_begin_common(int eflags,
238 static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple);
239 static void dumptuples(Tuplestorestate *state);
240 static unsigned int getlen(Tuplestorestate *state, bool eofOK);
241 static void *copytup_heap(Tuplestorestate *state, void *tup);
242 static void writetup_heap(Tuplestorestate *state, void *tup);
243 static void *readtup_heap(Tuplestorestate *state, unsigned int len);
247 * tuplestore_begin_xxx
249 * Initialize for a tuple store operation.
 *
 * Shared constructor: allocates the state struct, records the memory budget
 * (maxKBytes, converted to bytes), captures the current memory context and
 * resource owner for later use, and sets up the in-memory tuple array plus
 * read pointer 0.  Tuple-kind-specific function pointers are filled in by
 * the public tuplestore_begin_xxx wrappers.
251 static Tuplestorestate *
252 tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
254 Tuplestorestate *state;
256 state = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate));
258 state->status = TSS_INMEM;
259 state->eflags = eflags;
260 state->interXact = interXact;
261 state->truncated = false;
262 state->allowedMem = maxKBytes * 1024L;
263 state->availMem = state->allowedMem;
264 state->myfile = NULL;
265 state->context = CurrentMemoryContext;
266 state->resowner = CurrentResourceOwner;
268 state->memtupdeleted = 0;
269 state->memtupcount = 0;
272 * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
273 * see comments in grow_memtuples().
275 state->memtupsize = Max(16384 / sizeof(void *),
276 ALLOCSET_SEPARATE_THRESHOLD / sizeof(void *) + 1);
278 state->growmemtuples = true;
279 state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
 /* the pointer array itself counts against the memory budget */
281 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
 /* start with a single read pointer, numbered 0, positioned at the start */
283 state->activeptr = 0;
284 state->readptrcount = 1;
285 state->readptrsize = 8; /* arbitrary */
286 state->readptrs = (TSReadPointer *)
287 palloc(state->readptrsize * sizeof(TSReadPointer));
289 state->readptrs[0].eflags = eflags;
290 state->readptrs[0].eof_reached = false;
291 state->readptrs[0].current = 0;
297 * tuplestore_begin_heap
299 * Create a new tuplestore; other types of tuple stores (other than
300 * "heap" tuple stores, for heap tuples) are possible, but not presently
 * implemented.
303 * randomAccess: if true, both forward and backward accesses to the
304 * tuple store are allowed.
306 * interXact: if true, the files used for on-disk storage persist beyond the
307 * end of the current transaction. NOTE: It's the caller's responsibility to
308 * create such a tuplestore in a memory context and resource owner that will
309 * also survive transaction boundaries, and to ensure the tuplestore is closed
310 * when it's no longer wanted.
312 * maxKBytes: how much data to store in memory (any data beyond this
313 * amount is paged to disk). When in doubt, use work_mem.
316 tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
318 Tuplestorestate *state;
322 * This interpretation of the meaning of randomAccess is compatible with
323 * the pre-8.3 behavior of tuplestores.
325 eflags = randomAccess ?
326 (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) :
 /* (non-random access gets the forward-only flag set) */
329 state = tuplestore_begin_common(eflags, interXact, maxKBytes);
 /* install the heap-tuple-specific copy/write/read routines */
331 state->copytup = copytup_heap;
332 state->writetup = writetup_heap;
333 state->readtup = readtup_heap;
339 * tuplestore_set_eflags
341 * Set the capability flags for read pointer 0 at a finer grain than is
342 * allowed by tuplestore_begin_xxx. This must be called before inserting
343 * any data into the tuplestore.
345 * eflags is a bitmask following the meanings used for executor node
346 * startup flags (see executor.h). tuplestore pays attention to these bits:
347 * EXEC_FLAG_REWIND need rewind to start
348 * EXEC_FLAG_BACKWARD need backward fetch
349 * If tuplestore_set_eflags is not called, REWIND is allowed, and BACKWARD
350 * is set per "randomAccess" in the tuplestore_begin_xxx call.
352 * NOTE: setting BACKWARD without REWIND means the pointer can read backwards,
353 * but not further than the truncation point (the furthest-back read pointer
354 * position at the time of the last tuplestore_trim call).
357 tuplestore_set_eflags(Tuplestorestate *state, int eflags)
 /* only legal while still in memory with no tuples stored yet */
361 if (state->status != TSS_INMEM || state->memtupcount != 0)
362 elog(ERROR, "too late to call tuplestore_set_eflags");
364 state->readptrs[0].eflags = eflags;
 /* recompute state->eflags as the union over all read pointers */
365 for (i = 1; i < state->readptrcount; i++)
366 eflags |= state->readptrs[i].eflags;
367 state->eflags = eflags;
371 * tuplestore_alloc_read_pointer - allocate another read pointer.
373 * Returns the pointer's index.
375 * The new pointer initially copies the position of read pointer 0.
376 * It can have its own eflags, but if any data has been inserted into
377 * the tuplestore, these eflags must not represent an increase in
 * capability requirements over what was already requested.
381 tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags)
383 /* Check for possible increase of requirements */
384 if (state->status != TSS_INMEM || state->memtupcount != 0)
386 if ((state->eflags | eflags) != state->eflags)
387 elog(ERROR, "too late to require new tuplestore eflags")
390 /* Make room for another read pointer if needed */
391 if (state->readptrcount >= state->readptrsize)
393 int newcnt = state->readptrsize * 2;
395 state->readptrs = (TSReadPointer *)
396 repalloc(state->readptrs, newcnt * sizeof(TSReadPointer));
397 state->readptrsize = newcnt;
 /* new pointer starts at read pointer 0's position, with its own eflags */
401 state->readptrs[state->readptrcount] = state->readptrs[0];
402 state->readptrs[state->readptrcount].eflags = eflags;
 /* keep the union of all pointers' capability flags up to date */
404 state->eflags |= eflags;
406 return state->readptrcount++;
412 * Delete all the contents of a tuplestore, and reset its read pointers
 * to the start.  The read pointers themselves (and their eflags) survive,
 * so the store can be refilled and re-read without re-allocating them.
416 tuplestore_clear(Tuplestorestate *state)
419 TSReadPointer *readptr;
 /* drop the temp file, if any, and return to the in-memory state */
422 BufFileClose(state->myfile);
423 state->myfile = NULL;
424 if (state->memtuples)
426 for (i = state->memtupdeleted; i < state->memtupcount; i++)
 /* give back the memory accounting before releasing each tuple */
428 FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
429 pfree(state->memtuples[i]);
432 state->status = TSS_INMEM;
433 state->truncated = false;
434 state->memtupdeleted = 0;
435 state->memtupcount = 0;
436 readptr = state->readptrs;
437 for (i = 0; i < state->readptrcount; readptr++, i++)
439 readptr->eof_reached = false;
440 readptr->current = 0;
447 * Release resources and clean up.
 * Closes the temp file (if any) and frees all in-memory tuples, the tuple
 * pointer array, and the read pointer array.
450 tuplestore_end(Tuplestorestate *state)
455 BufFileClose(state->myfile);
456 if (state->memtuples)
458 for (i = state->memtupdeleted; i < state->memtupcount; i++)
459 pfree(state->memtuples[i]);
460 pfree(state->memtuples);
462 pfree(state->readptrs);
467 * tuplestore_select_read_pointer - make the specified read pointer active
 *
 * In READFILE state the active pointer's position lives in the temp file's
 * seek position (see notes atop this file), so switching pointers requires
 * saving the old pointer's position and seeking to the new one's.
470 tuplestore_select_read_pointer(Tuplestorestate *state, int ptr)
472 TSReadPointer *readptr;
473 TSReadPointer *oldptr;
475 Assert(ptr >= 0 && ptr < state->readptrcount);
477 /* No work if already active */
478 if (ptr == state->activeptr)
481 readptr = &state->readptrs[ptr];
482 oldptr = &state->readptrs[state->activeptr];
484 switch (state->status)
493 * First, save the current read position in the pointer about to
 * become inactive (not needed if it is at EOF, since eof_reached
 * pointers don't maintain file/offset).
496 if (!oldptr->eof_reached)
497 BufFileTell(state->myfile,
502 * We have to make the temp file's seek position equal to the
503 * logical position of the new read pointer. In eof_reached
504 * state, that's the EOF, which we have available from the saved
 * write position.
507 if (readptr->eof_reached)
509 if (BufFileSeek(state->myfile,
510 state->writepos_file,
511 state->writepos_offset,
514 (errcode_for_file_access(),
515 errmsg("could not seek in tuplestore temporary file: %m")));
 /* otherwise seek to the pointer's saved file/offset */
519 if (BufFileSeek(state->myfile,
524 (errcode_for_file_access(),
525 errmsg("could not seek in tuplestore temporary file: %m")));
529 elog(ERROR, "invalid tuplestore state");
533 state->activeptr = ptr;
539 * Returns the active read pointer's eof_reached state.
542 tuplestore_ateof(Tuplestorestate *state)
544 return state->readptrs[state->activeptr].eof_reached;
548 * Grow the memtuples[] array, if possible within our memory constraint. We
549 * must not exceed INT_MAX tuples in memory or the caller-provided memory
550 * limit. Return TRUE if we were able to enlarge the array, FALSE if not.
552 * Normally, at each increment we double the size of the array. When doing
553 * that would exceed a limit, we attempt one last, smaller increase (and then
554 * clear the growmemtuples flag so we don't try any more). That allows us to
555 * use memory as fully as permitted; sticking to the pure doubling rule could
556 * result in almost half going unused. Because availMem moves around with
557 * tuple addition/removal, we need some rule to prevent making repeated small
558 * increases in memtupsize, which would just be useless thrashing. The
559 * growmemtuples flag accomplishes that and also prevents useless
560 * recalculations in this function.
563 grow_memtuples(Tuplestorestate *state)
566 int memtupsize = state->memtupsize;
567 int64 memNowUsed = state->allowedMem - state->availMem;
569 /* Forget it if we've already maxed out memtuples, per comment above */
570 if (!state->growmemtuples)
573 /* Select new value of memtupsize */
574 if (memNowUsed <= state->availMem)
577 * We've used no more than half of allowedMem; double our usage,
578 * clamping at INT_MAX tuples.
580 if (memtupsize < INT_MAX / 2)
581 newmemtupsize = memtupsize * 2;
 /* can't double without overflowing int; take the clamp and stop growing */
584 newmemtupsize = INT_MAX;
585 state->growmemtuples = false;
591 * This will be the last increment of memtupsize. Abandon doubling
592 * strategy and instead increase as much as we safely can.
594 * To stay within allowedMem, we can't increase memtupsize by more
595 * than availMem / sizeof(void *) elements. In practice, we want to
596 * increase it by considerably less, because we need to leave some
597 * space for the tuples to which the new array slots will refer. We
598 * assume the new tuples will be about the same size as the tuples
599 * we've already seen, and thus we can extrapolate from the space
600 * consumption so far to estimate an appropriate new size for the
601 * memtuples array. The optimal value might be higher or lower than
602 * this estimate, but it's hard to know that in advance. We again
603 * clamp at INT_MAX tuples.
605 * This calculation is safe against enlarging the array so much that
606 * LACKMEM becomes true, because the memory currently used includes
607 * the present array; thus, there would be enough allowedMem for the
608 * new array elements even if no other memory were currently used.
610 * We do the arithmetic in float8, because otherwise the product of
611 * memtupsize and allowedMem could overflow. Any inaccuracy in the
612 * result should be insignificant; but even if we computed a
613 * completely insane result, the checks below will prevent anything
614 * really bad from happening.
618 grow_ratio = (double) state->allowedMem / (double) memNowUsed;
619 if (memtupsize * grow_ratio < INT_MAX)
620 newmemtupsize = (int) (memtupsize * grow_ratio);
622 newmemtupsize = INT_MAX;
624 /* We won't make any further enlargement attempts */
625 state->growmemtuples = false;
628 /* Must enlarge array by at least one element, else report failure */
629 if (newmemtupsize <= memtupsize)
633 * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
634 * to ensure our request won't be rejected. Note that we can easily
635 * exhaust address space before facing this outcome. (This is presently
636 * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
637 * don't rely on that at this distance.)
639 if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(void *))
641 newmemtupsize = (int) (MaxAllocHugeSize / sizeof(void *));
642 state->growmemtuples = false; /* can't grow any more */
646 * We need to be sure that we do not cause LACKMEM to become true, else
647 * the space management algorithm will go nuts. The code above should
648 * never generate a dangerous request, but to be safe, check explicitly
649 * that the array growth fits within availMem. (We could still cause
650 * LACKMEM if the memory chunk overhead associated with the memtuples
651 * array were to increase. That shouldn't happen because we chose the
652 * initial array size large enough to ensure that palloc will be treating
653 * both old and new arrays as separate chunks. But we'll check LACKMEM
654 * explicitly below just in case.)
656 if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(void *)))
 /* OK, do it: swap the accounting from the old array to the new */
660 FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
661 state->memtupsize = newmemtupsize;
662 state->memtuples = (void **)
663 repalloc_huge(state->memtuples,
664 state->memtupsize * sizeof(void *));
665 USEMEM(state, GetMemoryChunkSpace(state->memtuples));
 /* sanity check: the repalloc must not have put us over budget */
667 elog(ERROR, "unexpected out-of-memory situation in tuplestore");
671 /* If for any reason we didn't realloc, shut off future attempts */
672 state->growmemtuples = false;
677 * Accept one tuple and append it to the tuplestore.
679 * Note that the input tuple is always copied; the caller need not save it.
681 * If the active read pointer is currently "at EOF", it remains so (the read
682 * pointer implicitly advances along with the write pointer); otherwise the
683 * read pointer is unchanged. Non-active read pointers do not move, which
684 * means they are certain to not be "at EOF" immediately after puttuple.
685 * This curious-seeming behavior is for the convenience of nodeMaterial.c and
686 * nodeCtescan.c, which would otherwise need to do extra pointer repositioning
 * steps.
689 * tuplestore_puttupleslot() is a convenience routine to collect data from
690 * a TupleTableSlot without an extra copy operation.
693 tuplestore_puttupleslot(Tuplestorestate *state,
694 TupleTableSlot *slot)
 /* all tuple memory must come from the store's own context */
697 MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
700 * Form a MinimalTuple in working memory
702 tuple = ExecCopySlotMinimalTuple(slot);
703 USEMEM(state, GetMemoryChunkSpace(tuple));
705 tuplestore_puttuple_common(state, (void *) tuple);
707 MemoryContextSwitchTo(oldcxt);
711 * "Standard" case to copy from a HeapTuple. This is actually now somewhat
712 * deprecated, but not worth getting rid of in view of the number of callers.
715 tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
717 MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
720 * Copy the tuple. (Must do this even in WRITEFILE case. Note that
721 * COPYTUP includes USEMEM, so we needn't do that here.)
723 tuple = COPYTUP(state, tuple);
725 tuplestore_puttuple_common(state, (void *) tuple);
727 MemoryContextSwitchTo(oldcxt);
731 * Similar to tuplestore_puttuple(), but work from values + nulls arrays.
732 * This avoids an extra tuple-construction operation.
735 tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc,
736 Datum *values, bool *isnull)
739 MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
 /* build the MinimalTuple directly in the store's context */
741 tuple = heap_form_minimal_tuple(tdesc, values, isnull);
742 USEMEM(state, GetMemoryChunkSpace(tuple));
744 tuplestore_puttuple_common(state, (void *) tuple);
746 MemoryContextSwitchTo(oldcxt);
/*
 * tuplestore_puttuple_common
 *
 * Shared guts of the puttuple variants.  'tuple' is already palloc'd in the
 * store's context and charged against availMem.  Handles all three states:
 * appends in memory while within budget, spills to a temp file when the
 * budget or array limit is hit, and switches READFILE back to WRITEFILE
 * when writing resumes after reads.
 */
750 tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
752 TSReadPointer *readptr;
754 ResourceOwner oldowner;
756 switch (state->status)
761 * Update read pointers as needed; see API spec above.
 * (Only eof_reached pointers other than the active one are moved,
 * to the slot this tuple will occupy.)
763 readptr = state->readptrs;
764 for (i = 0; i < state->readptrcount; readptr++, i++)
766 if (readptr->eof_reached && i != state->activeptr)
768 readptr->eof_reached = false;
769 readptr->current = state->memtupcount;
774 * Grow the array as needed. Note that we try to grow the array
775 * when there is still one free slot remaining --- if we fail,
776 * there'll still be room to store the incoming tuple, and then
777 * we'll switch to tape-based operation.
779 if (state->memtupcount >= state->memtupsize - 1)
781 (void) grow_memtuples(state);
782 Assert(state->memtupcount < state->memtupsize);
785 /* Stash the tuple in the in-memory array */
786 state->memtuples[state->memtupcount++] = tuple;
789 * Done if we still fit in available memory and have array slots.
791 if (state->memtupcount < state->memtupsize && !LACKMEM(state))
795 * Nope; time to switch to tape-based operation. Make sure that
796 * the temp file(s) are created in suitable temp tablespaces.
798 PrepareTempTablespaces();
800 /* associate the file with the store's resource owner */
801 oldowner = CurrentResourceOwner;
802 CurrentResourceOwner = state->resowner;
804 state->myfile = BufFileCreateTemp(state->interXact);
806 CurrentResourceOwner = oldowner;
809 * Freeze the decision about whether trailing length words will be
810 * used. We can't change this choice once data is on tape, even
811 * though callers might drop the requirement.
813 state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
814 state->status = TSS_WRITEFILE;
 /* fall through to WRITEFILE handling, which dumps the tuples */
820 * Update read pointers as needed; see API spec above. Note:
821 * BufFileTell is quite cheap, so not worth trying to avoid
 * multiple calls.
824 readptr = state->readptrs;
825 for (i = 0; i < state->readptrcount; readptr++, i++)
827 if (readptr->eof_reached && i != state->activeptr)
829 readptr->eof_reached = false;
830 BufFileTell(state->myfile,
836 WRITETUP(state, tuple);
841 * Switch from reading to writing.
 * Save the active pointer's read position, then seek to the
 * remembered write position (EOF) before appending.
843 if (!state->readptrs[state->activeptr].eof_reached)
844 BufFileTell(state->myfile,
845 &state->readptrs[state->activeptr].file,
846 &state->readptrs[state->activeptr].offset);
847 if (BufFileSeek(state->myfile,
848 state->writepos_file, state->writepos_offset,
851 (errcode_for_file_access(),
852 errmsg("could not seek in tuplestore temporary file: %m")));
853 state->status = TSS_WRITEFILE;
856 * Update read pointers as needed; see API spec above.
858 readptr = state->readptrs;
859 for (i = 0; i < state->readptrcount; readptr++, i++)
861 if (readptr->eof_reached && i != state->activeptr)
863 readptr->eof_reached = false;
864 readptr->file = state->writepos_file;
865 readptr->offset = state->writepos_offset;
869 WRITETUP(state, tuple);
872 elog(ERROR, "invalid tuplestore state");
878 * Fetch the next tuple in either forward or back direction.
879 * Returns NULL if no more tuples. If should_free is set, the
880 * caller must pfree the returned tuple when done with it.
882 * Backward scan is only allowed if randomAccess was set true or
883 * EXEC_FLAG_BACKWARD was specified to tuplestore_set_eflags().
886 tuplestore_gettuple(Tuplestorestate *state, bool forward,
889 TSReadPointer *readptr = &state->readptrs[state->activeptr];
893 Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
895 switch (state->status)
 /* ----- TSS_INMEM: read directly from the memtuples array ----- */
898 *should_free = false;
901 if (readptr->eof_reached)
903 if (readptr->current < state->memtupcount)
905 /* We have another tuple, so return it */
906 return state->memtuples[readptr->current++];
908 readptr->eof_reached = true;
914 * if all tuples are fetched already then we return last
915 * tuple, else tuple before last returned.
917 if (readptr->eof_reached)
919 readptr->current = state->memtupcount;
920 readptr->eof_reached = false;
 /* backing up past the trim point is only OK if nothing was trimmed */
924 if (readptr->current <= state->memtupdeleted)
926 Assert(!state->truncated);
929 readptr->current--; /* last returned tuple */
931 if (readptr->current <= state->memtupdeleted)
933 Assert(!state->truncated);
936 return state->memtuples[readptr->current - 1];
 /* ----- TSS_WRITEFILE: must switch the file to reading first ----- */
941 /* Skip state change if we'll just return NULL */
942 if (readptr->eof_reached && forward)
946 * Switch from writing to reading.
 * Remember the current seek position as the write position (EOF),
 * then position the file at the active read pointer.
948 BufFileTell(state->myfile,
949 &state->writepos_file, &state->writepos_offset);
950 if (!readptr->eof_reached)
951 if (BufFileSeek(state->myfile,
952 readptr->file, readptr->offset,
955 (errcode_for_file_access(),
956 errmsg("could not seek in tuplestore temporary file: %m")));
957 state->status = TSS_READFILE;
958 /* FALL THRU into READFILE case */
 /* ----- TSS_READFILE: read tuples off the temp file ----- */
964 if ((tuplen = getlen(state, true)) != 0)
966 tup = READTUP(state, tuplen);
 /* zero length word means end of file */
971 readptr->eof_reached = true;
979 * if all tuples are fetched already then we return last tuple,
980 * else tuple before last returned.
982 * Back up to fetch previously-returned tuple's ending length
983 * word. If seek fails, assume we are at start of file.
985 if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int),
988 /* even a failed backwards fetch gets you out of eof state */
989 readptr->eof_reached = false;
990 Assert(!state->truncated);
993 tuplen = getlen(state, false);
995 if (readptr->eof_reached)
997 readptr->eof_reached = false;
998 /* We will return the tuple returned before returning NULL */
1003 * Back up to get ending length word of tuple before it.
1005 if (BufFileSeek(state->myfile, 0,
1006 -(long) (tuplen + 2 * sizeof(unsigned int)),
1010 * If that fails, presumably the prev tuple is the first
1011 * in the file. Back up so that it becomes next to read
1012 * in forward direction (not obviously right, but that is
1013 * what in-memory case does).
1015 if (BufFileSeek(state->myfile, 0,
1016 -(long) (tuplen + sizeof(unsigned int)),
1019 (errcode_for_file_access(),
1020 errmsg("could not seek in tuplestore temporary file: %m")));
1021 Assert(!state->truncated);
1024 tuplen = getlen(state, false);
1028 * Now we have the length of the prior tuple, back up and read it.
1029 * Note: READTUP expects we are positioned after the initial
1030 * length word of the tuple, so back up to that point.
1032 if (BufFileSeek(state->myfile, 0,
1036 (errcode_for_file_access(),
1037 errmsg("could not seek in tuplestore temporary file: %m")));
1038 tup = READTUP(state, tuplen);
1042 elog(ERROR, "invalid tuplestore state");
1043 return NULL; /* keep compiler quiet */
1048 * tuplestore_gettupleslot - exported function to fetch a MinimalTuple
1050 * If successful, put tuple in slot and return TRUE; else, clear the slot
 * and return FALSE.
1053 * If copy is TRUE, the slot receives a copied tuple (allocated in current
1054 * memory context) that will stay valid regardless of future manipulations of
1055 * the tuplestore's state. If copy is FALSE, the slot may just receive a
1056 * pointer to a tuple held within the tuplestore. The latter is more
1057 * efficient but the slot contents may be corrupted if additional writes to
1058 * the tuplestore occur. (If using tuplestore_trim, see comments therein.)
1061 tuplestore_gettupleslot(Tuplestorestate *state, bool forward,
1062 bool copy, TupleTableSlot *slot)
1067 tuple = (MinimalTuple) tuplestore_gettuple(state, forward, &should_free);
 /* copy only if requested and the store still owns the tuple */
1071 if (copy && !should_free)
1073 tuple = heap_copy_minimal_tuple(tuple);
1076 ExecStoreMinimalTuple(tuple, slot, should_free);
 /* no tuple: clear the slot to signal end of data */
1081 ExecClearTuple(slot);
1087 * tuplestore_advance - exported function to adjust position without fetching
1089 * We could optimize this case to avoid palloc/pfree overhead, but for the
1090 * moment it doesn't seem worthwhile.
1093 tuplestore_advance(Tuplestorestate *state, bool forward)
/*
 * Fetch-and-discard: advance by reading one tuple and ignoring its value.
 * NOTE(review): the remainder of this function (freeing the tuple when
 * should_free, and the true/false returns) is elided in this listing.
 */
1098 tuple = tuplestore_gettuple(state, forward, &should_free);
1113 * Advance over N tuples in either forward or back direction,
1114 * without returning any data. N<=0 is a no-op.
1115 * Returns TRUE if successful, FALSE if ran out of tuples.
1118 tuplestore_skiptuples(Tuplestorestate *state, int64 ntuples, bool forward)
1120 TSReadPointer *readptr = &state->readptrs[state->activeptr];
/* Backward motion is only legal for pointers that requested it. */
1122 Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
1127 switch (state->status)
/* --- TSS_INMEM, forward: pure pointer arithmetic on the in-memory array --- */
1132 if (readptr->eof_reached)
1134 if (state->memtupcount - readptr->current >= ntuples)
1136 readptr->current += ntuples;
/* Fewer than ntuples remain: clamp to end and report failure. */
1139 readptr->current = state->memtupcount;
1140 readptr->eof_reached = true;
/* --- TSS_INMEM, backward --- */
/* Stepping back off EOF lands on the last tuple (counts as one step). */
1145 if (readptr->eof_reached)
1147 readptr->current = state->memtupcount;
1148 readptr->eof_reached = false;
/* Must stay strictly above memtupdeleted: one extra tuple is retained
 * before the oldest "current" (see tuplestore_trim). */
1151 if (readptr->current - state->memtupdeleted > ntuples)
1153 readptr->current -= ntuples;
/* Ran out: park at the oldest surviving tuple and report failure. */
1156 Assert(!state->truncated);
1157 readptr->current = state->memtupdeleted;
1163 /* We don't currently try hard to optimize other cases */
/* Spilled-to-file cases: skip by repeatedly fetching and discarding. */
1164 while (ntuples-- > 0)
1169 tuple = tuplestore_gettuple(state, forward, &should_free);
/* Long skips must remain responsive to query cancel. */
1175 CHECK_FOR_INTERRUPTS();
1182 * dumptuples - remove tuples from memory and write to tape
1184 * As a side effect, we must convert each read pointer's position from
1185 * "current" to file/offset format. But eof_reached pointers don't
1186 * need to change state.
1189 dumptuples(Tuplestorestate *state)
/*
 * Walk the surviving in-memory tuples in order; the loop body runs once
 * more at i == memtupcount so pointers sitting at end-of-data get their
 * file position captured before we break out.
 */
1193 for (i = state->memtupdeleted;; i++)
1195 TSReadPointer *readptr = state->readptrs;
1198 for (j = 0; j < state->readptrcount; readptr++, j++)
/* A pointer positioned at tuple i maps to the current write offset. */
1200 if (i == readptr->current && !readptr->eof_reached)
1201 BufFileTell(state->myfile,
1202 &readptr->file, &readptr->offset);
1204 if (i >= state->memtupcount)
/* WRITETUP also frees the in-memory tuple and its memory accounting. */
1206 WRITETUP(state, state->memtuples[i]);
/* Array is now empty; reset bookkeeping for the in-memory range. */
1208 state->memtupdeleted = 0;
1209 state->memtupcount = 0;
1213 * tuplestore_rescan - rewind the active read pointer to start
1216 tuplestore_rescan(Tuplestorestate *state)
1218 TSReadPointer *readptr = &state->readptrs[state->activeptr];
/* Rewind requires the pointer to have reserved that capability, and is
 * meaningless once the head of the store has been truncated away. */
1220 Assert(readptr->eflags & EXEC_FLAG_REWIND);
1221 Assert(!state->truncated);
1223 switch (state->status)
/* TSS_INMEM: rewinding is just resetting the array index. */
1226 readptr->eof_reached = false;
1227 readptr->current = 0;
/* TSS_WRITEFILE (presumably — case label elided): record start-of-file
 * coordinates; the actual seek happens when reading resumes. */
1230 readptr->eof_reached = false;
1232 readptr->offset = 0L;
/* TSS_READFILE (presumably): the active pointer's position lives in the
 * BufFile seek position itself, so seek immediately. */
1235 readptr->eof_reached = false;
1236 if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
1238 (errcode_for_file_access(),
1239 errmsg("could not seek in tuplestore temporary file: %m")));
1242 elog(ERROR, "invalid tuplestore state");
1248 * tuplestore_copy_read_pointer - copy a read pointer's state to another
1251 tuplestore_copy_read_pointer(Tuplestorestate *state,
1252 int srcptr, int destptr)
1254 TSReadPointer *sptr = &state->readptrs[srcptr];
1255 TSReadPointer *dptr = &state->readptrs[destptr];
1257 Assert(srcptr >= 0 && srcptr < state->readptrcount);
1258 Assert(destptr >= 0 && destptr < state->readptrcount);
1260 /* Assigning to self is a no-op */
1261 if (srcptr == destptr)
/*
 * Overwriting dptr may change the OR of all pointers' eflags, which is
 * cached in state->eflags; recompute it from scratch in that case.
 */
1264 if (dptr->eflags != sptr->eflags)
1266 /* Possible change of overall eflags, so copy and then recompute */
1271 eflags = state->readptrs[0].eflags;
1272 for (i = 1; i < state->readptrcount; i++)
1273 eflags |= state->readptrs[i].eflags;
1274 state->eflags = eflags;
1279 switch (state->status)
1288 * This case is a bit tricky since the active read pointer's
1289 * position corresponds to the seek point, not what is in its
1290 * variables. Assigning to the active requires a seek, and
1291 * assigning from the active requires a tell, except when
1294 if (destptr == state->activeptr)
/* dptr already holds sptr's state here (struct copy elided in listing). */
1296 if (dptr->eof_reached)
/* EOF position == current end of written data. */
1298 if (BufFileSeek(state->myfile,
1299 state->writepos_file,
1300 state->writepos_offset,
1303 (errcode_for_file_access(),
1304 errmsg("could not seek in tuplestore temporary file: %m")));
1308 if (BufFileSeek(state->myfile,
1309 dptr->file, dptr->offset,
1312 (errcode_for_file_access(),
1313 errmsg("could not seek in tuplestore temporary file: %m")));
/* Copying FROM the active pointer: its true position is the BufFile's
 * seek point, so capture that into the destination's variables. */
1316 else if (srcptr == state->activeptr)
1318 if (!dptr->eof_reached)
1319 BufFileTell(state->myfile,
1325 elog(ERROR, "invalid tuplestore state");
1331 * tuplestore_trim - remove all no-longer-needed tuples
1333 * Calling this function authorizes the tuplestore to delete all tuples
1334 * before the oldest read pointer, if no read pointer is marked as requiring
1335 * REWIND capability.
1337 * Note: this is obviously safe if no pointer has BACKWARD capability either.
1338 * If a pointer is marked as BACKWARD but not REWIND capable, it means that
1339 * the pointer can be moved backward but not before the oldest other read
1343 tuplestore_trim(Tuplestorestate *state)
1350 * Truncation is disallowed if any read pointer requires rewind
/* state->eflags is the OR of all pointers' eflags (see copy_read_pointer). */
1353 if (state->eflags & EXEC_FLAG_REWIND)
1357 * We don't bother trimming temp files since it usually would mean more
1358 * work than just letting them sit in kernel buffers until they age out.
1360 if (state->status != TSS_INMEM)
1363 /* Find the oldest read pointer */
1364 oldest = state->memtupcount;
1365 for (i = 0; i < state->readptrcount; i++)
/* eof_reached pointers are at the end, so they never constrain trimming. */
1367 if (!state->readptrs[i].eof_reached)
1368 oldest = Min(oldest, state->readptrs[i].current);
1372 * Note: you might think we could remove all the tuples before the oldest
1373 * "current", since that one is the next to be returned. However, since
1374 * tuplestore_gettuple returns a direct pointer to our internal copy of
1375 * the tuple, it's likely that the caller has still got the tuple just
1376 * before "current" referenced in a slot. So we keep one extra tuple
1377 * before the oldest "current". (Strictly speaking, we could require such
1378 * callers to use the "copy" flag to tuplestore_gettupleslot, but for
1379 * efficiency we allow this one case to not use "copy".)
1381 nremove = oldest - 1;
1383 return; /* nothing to do */
1385 Assert(nremove >= state->memtupdeleted);
1386 Assert(nremove <= state->memtupcount);
1388 /* Release no-longer-needed tuples */
1389 for (i = state->memtupdeleted; i < nremove; i++)
/* Uncharge from the memory budget before freeing. */
1391 FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
1392 pfree(state->memtuples[i]);
/* NULL the slot so a stale pointer can't be dereferenced later. */
1393 state->memtuples[i] = NULL;
1395 state->memtupdeleted = nremove;
1397 /* mark tuplestore as truncated (used for Assert crosschecks only) */
1398 state->truncated = true;
1401 * If nremove is less than 1/8th memtupcount, just stop here, leaving the
1402 * "deleted" slots as NULL. This prevents us from expending O(N^2) time
1403 * repeatedly memmove-ing a large pointer array. The worst case space
1404 * wastage is pretty small, since it's just pointers and not whole tuples.
1406 if (nremove < state->memtupcount / 8)
1410 * Slide the array down and readjust pointers.
1412 * In mergejoin's current usage, it's demonstrable that there will always
1413 * be exactly one non-removed tuple; so optimize that case.
1415 if (nremove + 1 == state->memtupcount)
1416 state->memtuples[0] = state->memtuples[nremove];
/* memmove, not memcpy: source and destination ranges may overlap. */
1418 memmove(state->memtuples, state->memtuples + nremove,
1419 (state->memtupcount - nremove) * sizeof(void *));
1421 state->memtupdeleted = 0;
1422 state->memtupcount -= nremove;
/* Shift every live read pointer by the same amount the array moved. */
1423 for (i = 0; i < state->readptrcount; i++)
1425 if (!state->readptrs[i].eof_reached)
1426 state->readptrs[i].current -= nremove;
1431 * tuplestore_in_memory
1433 * Returns true if the tuplestore has not spilled to disk.
1435 * XXX exposing this is a violation of modularity ... should get rid of it.
1438 tuplestore_in_memory(Tuplestorestate *state)
/* Pure predicate: inspects only state->status, no side effects. */
1440 return (state->status == TSS_INMEM);
1445 * Tape interface routines
/*
 * getlen - read one unsigned-int length word from the temp file.
 *
 * Returns the length on a full read (return statement elided in this
 * listing).  A zero-byte read means clean EOF and is tolerated only when
 * eofOK; a partial read, or EOF when !eofOK, raises an error (the
 * ereport(ERROR, head is elided here).
 */
1449 getlen(Tuplestorestate *state, bool eofOK)
1454 nbytes = BufFileRead(state->myfile, (void *) &len, sizeof(len));
1455 if (nbytes == sizeof(len))
1457 if (nbytes != 0 || !eofOK)
1459 (errcode_for_file_access(),
1460 errmsg("could not read from tuplestore temporary file: %m")));
1466 * Routines specialized for HeapTuple case
1468 * The stored form is actually a MinimalTuple, but for largely historical
1469 * reasons we allow COPYTUP to work from a HeapTuple.
1471 * Since MinimalTuple already has length in its first word, we don't need
1472 * to write that separately.
/*
 * copytup_heap - COPYTUP implementation: make a tuplestore-owned copy.
 * Converts the caller's HeapTuple into a freshly palloc'd MinimalTuple
 * and charges its space against the store's memory budget.
 */
1476 copytup_heap(Tuplestorestate *state, void *tup)
1480 tuple = minimal_tuple_from_heap_tuple((HeapTuple) tup);
1481 USEMEM(state, GetMemoryChunkSpace(tuple));
1482 return (void *) tuple;
/*
 * writetup_heap - WRITETUP implementation: append one tuple to the file.
 *
 * On-disk layout: leading length word, then the MinimalTuple body with
 * its first MINIMAL_TUPLE_DATA_OFFSET bytes omitted, then (only when
 * backward scans were requested) a trailing copy of the length word so
 * the tuple can be read in reverse.  Frees the in-memory tuple when done.
 * NOTE(review): the ereport(ERROR, heads before the errcode/errmsg lines
 * are elided in this listing.
 */
1486 writetup_heap(Tuplestorestate *state, void *tup)
1488 MinimalTuple tuple = (MinimalTuple) tup;
1490 /* the part of the MinimalTuple we'll write: */
1491 char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
1492 unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
1494 /* total on-disk footprint: */
1495 unsigned int tuplen = tupbodylen + sizeof(int);
1497 if (BufFileWrite(state->myfile, (void *) &tuplen,
1498 sizeof(tuplen)) != sizeof(tuplen))
1500 (errcode_for_file_access(),
1501 errmsg("could not write to tuplestore temporary file: %m")));
1502 if (BufFileWrite(state->myfile, (void *) tupbody,
1503 tupbodylen) != (size_t) tupbodylen)
1505 (errcode_for_file_access(),
1506 errmsg("could not write to tuplestore temporary file: %m")));
1507 if (state->backward) /* need trailing length word? */
1508 if (BufFileWrite(state->myfile, (void *) &tuplen,
1509 sizeof(tuplen)) != sizeof(tuplen))
1511 (errcode_for_file_access(),
1512 errmsg("could not write to tuplestore temporary file: %m")));
/* The tuple now lives on disk: release its memory and its accounting. */
1514 FREEMEM(state, GetMemoryChunkSpace(tuple));
1515 heap_free_minimal_tuple(tuple);
/*
 * readtup_heap - READTUP implementation: reconstruct one tuple from disk.
 *
 * Inverse of writetup_heap: we are positioned just past the leading
 * length word ('len' is its value); read the body back into a palloc'd
 * MinimalTuple, then consume the trailing length word if backward scans
 * are enabled so the file stays positioned at the next tuple.
 * NOTE(review): ereport(ERROR, heads and the closing brace fall outside
 * this excerpt.
 */
1519 readtup_heap(Tuplestorestate *state, unsigned int len)
/* Undo writetup_heap's size arithmetic to recover the in-memory size. */
1521 unsigned int tupbodylen = len - sizeof(int);
1522 unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
1523 MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
1524 char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
/* Charge the reconstructed tuple against the store's memory budget. */
1526 USEMEM(state, GetMemoryChunkSpace(tuple));
1527 /* read in the tuple proper */
1528 tuple->t_len = tuplen;
1529 if (BufFileRead(state->myfile, (void *) tupbody,
1530 tupbodylen) != (size_t) tupbodylen)
1532 (errcode_for_file_access(),
1533 errmsg("could not read from tuplestore temporary file: %m")));
1534 if (state->backward) /* need trailing length word? */
/* Value is discarded; the read just advances past the trailing word. */
1535 if (BufFileRead(state->myfile, (void *) &tuplen,
1536 sizeof(tuplen)) != sizeof(tuplen))
1538 (errcode_for_file_access(),
1539 errmsg("could not read from tuplestore temporary file: %m")));
1540 return (void *) tuple;