]> granicus.if.org Git - postgresql/blob - src/backend/utils/sort/tuplestore.c
Update copyright via script for 2017
[postgresql] / src / backend / utils / sort / tuplestore.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuplestore.c
4  *        Generalized routines for temporary tuple storage.
5  *
6  * This module handles temporary storage of tuples for purposes such
7  * as Materialize nodes, hashjoin batch files, etc.  It is essentially
8  * a dumbed-down version of tuplesort.c; it does no sorting of tuples
9  * but can only store and regurgitate a sequence of tuples.  However,
10  * because no sort is required, it is allowed to start reading the sequence
11  * before it has all been written.  This is particularly useful for cursors,
12  * because it allows random access within the already-scanned portion of
13  * a query without having to process the underlying scan to completion.
14  * Also, it is possible to support multiple independent read pointers.
15  *
16  * A temporary file is used to handle the data if it exceeds the
17  * space limit specified by the caller.
18  *
19  * The (approximate) amount of memory allowed to the tuplestore is specified
20  * in kilobytes by the caller.  We absorb tuples and simply store them in an
21  * in-memory array as long as we haven't exceeded maxKBytes.  If we do exceed
22  * maxKBytes, we dump all the tuples into a temp file and then read from that
23  * when needed.
24  *
25  * Upon creation, a tuplestore supports a single read pointer, numbered 0.
26  * Additional read pointers can be created using tuplestore_alloc_read_pointer.
27  * Mark/restore behavior is supported by copying read pointers.
28  *
29  * When the caller requests backward-scan capability, we write the temp file
30  * in a format that allows either forward or backward scan.  Otherwise, only
31  * forward scan is allowed.  A request for backward scan must be made before
32  * putting any tuples into the tuplestore.  Rewind is normally allowed but
33  * can be turned off via tuplestore_set_eflags; turning off rewind for all
34  * read pointers enables truncation of the tuplestore at the oldest read point
35  * for minimal memory usage.  (The caller must explicitly call tuplestore_trim
36  * at appropriate times for truncation to actually happen.)
37  *
38  * Note: in TSS_WRITEFILE state, the temp file's seek position is the
39  * current write position, and the write-position variables in the tuplestore
40  * aren't kept up to date.  Similarly, in TSS_READFILE state the temp file's
41  * seek position is the active read pointer's position, and that read pointer
42  * isn't kept up to date.  We update the appropriate variables using ftell()
43  * before switching to the other state or activating a different read pointer.
44  *
45  *
46  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
47  * Portions Copyright (c) 1994, Regents of the University of California
48  *
49  * IDENTIFICATION
50  *        src/backend/utils/sort/tuplestore.c
51  *
52  *-------------------------------------------------------------------------
53  */
54
55 #include "postgres.h"
56
57 #include <limits.h>
58
59 #include "access/htup_details.h"
60 #include "commands/tablespace.h"
61 #include "executor/executor.h"
62 #include "miscadmin.h"
63 #include "storage/buffile.h"
64 #include "utils/memutils.h"
65 #include "utils/resowner.h"
66
67
/*
 * Possible states of a Tuplestore object.  These denote the states that
 * persist between calls of Tuplestore routines.
 *
 * A tuplestore starts in TSS_INMEM and moves irreversibly to the file-based
 * states once the in-memory array exceeds allowedMem; thereafter it toggles
 * between TSS_WRITEFILE and TSS_READFILE depending on whether the last
 * operation was a put or a get (see the note about seek position in the
 * file header).
 */
typedef enum
{
	TSS_INMEM,					/* Tuples still fit in memory */
	TSS_WRITEFILE,				/* Writing to temp file */
	TSS_READFILE				/* Reading from temp file */
} TupStoreStatus;
78
/*
 * State for a single read pointer.  If we are in state INMEM then all the
 * read pointers' "current" fields denote the read positions.  In state
 * WRITEFILE, the file/offset fields denote the read positions.  In state
 * READFILE, inactive read pointers have valid file/offset, but the active
 * read pointer implicitly has position equal to the temp file's seek position.
 *
 * Special case: if eof_reached is true, then the pointer's read position is
 * implicitly equal to the write position, and current/file/offset aren't
 * maintained.  This way we need not update all the read pointers each time
 * we write.
 */
typedef struct
{
	int			eflags;			/* capability flags (EXEC_FLAG_REWIND etc.) */
	bool		eof_reached;	/* read has reached EOF */
	int			current;		/* next array index to read (INMEM state) */
	int			file;			/* temp file# (file-based states) */
	off_t		offset;			/* byte offset in file (file-based states) */
} TSReadPointer;
99
/*
 * Private state of a Tuplestore operation.
 *
 * Callers see this only as an opaque struct; all access goes through the
 * tuplestore_xxx functions below.
 */
struct Tuplestorestate
{
	TupStoreStatus status;		/* enumerated value as shown above */
	int			eflags;			/* capability flags (OR of pointers' flags) */
	bool		backward;		/* store extra length words in file? */
	bool		interXact;		/* keep open through transactions? */
	bool		truncated;		/* tuplestore_trim has removed tuples? */
	int64		availMem;		/* remaining memory available, in bytes */
	int64		allowedMem;		/* total memory allowed, in bytes */
	BufFile    *myfile;			/* underlying file, or NULL if none */
	MemoryContext context;		/* memory context for holding tuples */
	ResourceOwner resowner;		/* resowner for holding temp files */

	/*
	 * These function pointers decouple the routines that must know what kind
	 * of tuple we are handling from the routines that don't need to know it.
	 * They are set up by the tuplestore_begin_xxx routines.
	 *
	 * (Although tuplestore.c currently only supports heap tuples, I've copied
	 * this part of tuplesort.c so that extension to other kinds of objects
	 * will be easy if it's ever needed.)
	 *
	 * Function to copy a supplied input tuple into palloc'd space. (NB: we
	 * assume that a single pfree() is enough to release the tuple later, so
	 * the representation must be "flat" in one palloc chunk.) state->availMem
	 * must be decreased by the amount of space used.
	 */
	void	   *(*copytup) (Tuplestorestate *state, void *tup);

	/*
	 * Function to write a stored tuple onto tape.  The representation of the
	 * tuple on tape need not be the same as it is in memory; requirements on
	 * the tape representation are given below.  After writing the tuple,
	 * pfree() it, and increase state->availMem by the amount of memory space
	 * thereby released.
	 */
	void		(*writetup) (Tuplestorestate *state, void *tup);

	/*
	 * Function to read a stored tuple from tape back into memory. 'len' is
	 * the already-read length of the stored tuple.  Create and return a
	 * palloc'd copy, and decrease state->availMem by the amount of memory
	 * space consumed.
	 */
	void	   *(*readtup) (Tuplestorestate *state, unsigned int len);

	/*
	 * This array holds pointers to tuples in memory if we are in state INMEM.
	 * In states WRITEFILE and READFILE it's not used.
	 *
	 * When memtupdeleted > 0, the first memtupdeleted pointers are already
	 * released due to a tuplestore_trim() operation, but we haven't expended
	 * the effort to slide the remaining pointers down.  These unused pointers
	 * are set to NULL to catch any invalid accesses.  Note that memtupcount
	 * includes the deleted pointers.
	 */
	void	  **memtuples;		/* array of pointers to palloc'd tuples */
	int			memtupdeleted;	/* the first N slots are currently unused */
	int			memtupcount;	/* number of tuples currently present */
	int			memtupsize;		/* allocated length of memtuples array */
	bool		growmemtuples;	/* memtuples' growth still underway? */

	/*
	 * These variables are used to keep track of the current positions.
	 *
	 * In state WRITEFILE, the current file seek position is the write point;
	 * in state READFILE, the write position is remembered in writepos_xxx.
	 * (The write position is the same as EOF, but since BufFileSeek doesn't
	 * currently implement SEEK_END, we have to remember it explicitly.)
	 */
	TSReadPointer *readptrs;	/* array of read pointers */
	int			activeptr;		/* index of the active read pointer */
	int			readptrcount;	/* number of pointers currently valid */
	int			readptrsize;	/* allocated length of readptrs array */

	int			writepos_file;	/* file# (valid if READFILE state) */
	off_t		writepos_offset;	/* offset (valid if READFILE state) */
};
181
/*
 * Dispatch and memory-accounting helpers.  COPYTUP/WRITETUP/READTUP invoke
 * the per-tuple-kind routines installed by tuplestore_begin_xxx.  LACKMEM is
 * true once we have exceeded the caller's memory budget; USEMEM and FREEMEM
 * charge and refund bytes against that budget.
 */
#define COPYTUP(state,tup)	((*(state)->copytup) (state, tup))
#define WRITETUP(state,tup) ((*(state)->writetup) (state, tup))
#define READTUP(state,len)	((*(state)->readtup) (state, len))
#define LACKMEM(state)		((state)->availMem < 0)
#define USEMEM(state,amt)	((state)->availMem -= (amt))
#define FREEMEM(state,amt)	((state)->availMem += (amt))
188
189 /*--------------------
190  *
191  * NOTES about on-tape representation of tuples:
192  *
193  * We require the first "unsigned int" of a stored tuple to be the total size
194  * on-tape of the tuple, including itself (so it is never zero).
195  * The remainder of the stored tuple
196  * may or may not match the in-memory representation of the tuple ---
197  * any conversion needed is the job of the writetup and readtup routines.
198  *
199  * If state->backward is true, then the stored representation of
200  * the tuple must be followed by another "unsigned int" that is a copy of the
201  * length --- so the total tape space used is actually sizeof(unsigned int)
202  * more than the stored length value.  This allows read-backwards.  When
203  * state->backward is not set, the write/read routines may omit the extra
204  * length word.
205  *
206  * writetup is expected to write both length words as well as the tuple
207  * data.  When readtup is called, the tape is positioned just after the
208  * front length word; readtup must read the tuple data and advance past
209  * the back length word (if present).
210  *
211  * The write/read routines can make use of the tuple description data
212  * stored in the Tuplestorestate record, if needed. They are also expected
213  * to adjust state->availMem by the amount of memory space (not tape space!)
214  * released or consumed.  There is no error return from either writetup
215  * or readtup; they should ereport() on failure.
216  *
217  *
218  * NOTES about memory consumption calculations:
219  *
220  * We count space allocated for tuples against the maxKBytes limit,
221  * plus the space used by the variable-size array memtuples.
222  * Fixed-size space (primarily the BufFile I/O buffer) is not counted.
223  * We don't worry about the size of the read pointer array, either.
224  *
225  * Note that we count actual space used (as shown by GetMemoryChunkSpace)
226  * rather than the originally-requested size.  This is important since
227  * palloc can add substantial overhead.  It's not a complete answer since
228  * we won't count any wasted space in palloc allocation blocks, but it's
229  * a lot better than what we were doing before 7.3.
230  *
231  *--------------------
232  */
233
234
/* forward declarations of file-local routines */
static Tuplestorestate *tuplestore_begin_common(int eflags,
						bool interXact,
						int maxKBytes);
static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple);
static void dumptuples(Tuplestorestate *state);
static unsigned int getlen(Tuplestorestate *state, bool eofOK);
static void *copytup_heap(Tuplestorestate *state, void *tup);
static void writetup_heap(Tuplestorestate *state, void *tup);
static void *readtup_heap(Tuplestorestate *state, unsigned int len);
244
245
246 /*
247  *              tuplestore_begin_xxx
248  *
249  * Initialize for a tuple store operation.
250  */
251 static Tuplestorestate *
252 tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
253 {
254         Tuplestorestate *state;
255
256         state = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate));
257
258         state->status = TSS_INMEM;
259         state->eflags = eflags;
260         state->interXact = interXact;
261         state->truncated = false;
262         state->allowedMem = maxKBytes * 1024L;
263         state->availMem = state->allowedMem;
264         state->myfile = NULL;
265         state->context = CurrentMemoryContext;
266         state->resowner = CurrentResourceOwner;
267
268         state->memtupdeleted = 0;
269         state->memtupcount = 0;
270
271         /*
272          * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
273          * see comments in grow_memtuples().
274          */
275         state->memtupsize = Max(16384 / sizeof(void *),
276                                                         ALLOCSET_SEPARATE_THRESHOLD / sizeof(void *) + 1);
277
278         state->growmemtuples = true;
279         state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
280
281         USEMEM(state, GetMemoryChunkSpace(state->memtuples));
282
283         state->activeptr = 0;
284         state->readptrcount = 1;
285         state->readptrsize = 8;         /* arbitrary */
286         state->readptrs = (TSReadPointer *)
287                 palloc(state->readptrsize * sizeof(TSReadPointer));
288
289         state->readptrs[0].eflags = eflags;
290         state->readptrs[0].eof_reached = false;
291         state->readptrs[0].current = 0;
292
293         return state;
294 }
295
296 /*
297  * tuplestore_begin_heap
298  *
299  * Create a new tuplestore; other types of tuple stores (other than
300  * "heap" tuple stores, for heap tuples) are possible, but not presently
301  * implemented.
302  *
303  * randomAccess: if true, both forward and backward accesses to the
304  * tuple store are allowed.
305  *
306  * interXact: if true, the files used for on-disk storage persist beyond the
307  * end of the current transaction.  NOTE: It's the caller's responsibility to
308  * create such a tuplestore in a memory context and resource owner that will
309  * also survive transaction boundaries, and to ensure the tuplestore is closed
310  * when it's no longer wanted.
311  *
312  * maxKBytes: how much data to store in memory (any data beyond this
313  * amount is paged to disk).  When in doubt, use work_mem.
314  */
315 Tuplestorestate *
316 tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
317 {
318         Tuplestorestate *state;
319         int                     eflags;
320
321         /*
322          * This interpretation of the meaning of randomAccess is compatible with
323          * the pre-8.3 behavior of tuplestores.
324          */
325         eflags = randomAccess ?
326                 (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) :
327                 (EXEC_FLAG_REWIND);
328
329         state = tuplestore_begin_common(eflags, interXact, maxKBytes);
330
331         state->copytup = copytup_heap;
332         state->writetup = writetup_heap;
333         state->readtup = readtup_heap;
334
335         return state;
336 }
337
338 /*
339  * tuplestore_set_eflags
340  *
341  * Set the capability flags for read pointer 0 at a finer grain than is
342  * allowed by tuplestore_begin_xxx.  This must be called before inserting
343  * any data into the tuplestore.
344  *
345  * eflags is a bitmask following the meanings used for executor node
346  * startup flags (see executor.h).  tuplestore pays attention to these bits:
347  *              EXEC_FLAG_REWIND                need rewind to start
348  *              EXEC_FLAG_BACKWARD              need backward fetch
349  * If tuplestore_set_eflags is not called, REWIND is allowed, and BACKWARD
350  * is set per "randomAccess" in the tuplestore_begin_xxx call.
351  *
352  * NOTE: setting BACKWARD without REWIND means the pointer can read backwards,
353  * but not further than the truncation point (the furthest-back read pointer
354  * position at the time of the last tuplestore_trim call).
355  */
356 void
357 tuplestore_set_eflags(Tuplestorestate *state, int eflags)
358 {
359         int                     i;
360
361         if (state->status != TSS_INMEM || state->memtupcount != 0)
362                 elog(ERROR, "too late to call tuplestore_set_eflags");
363
364         state->readptrs[0].eflags = eflags;
365         for (i = 1; i < state->readptrcount; i++)
366                 eflags |= state->readptrs[i].eflags;
367         state->eflags = eflags;
368 }
369
370 /*
371  * tuplestore_alloc_read_pointer - allocate another read pointer.
372  *
373  * Returns the pointer's index.
374  *
375  * The new pointer initially copies the position of read pointer 0.
376  * It can have its own eflags, but if any data has been inserted into
377  * the tuplestore, these eflags must not represent an increase in
378  * requirements.
379  */
380 int
381 tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags)
382 {
383         /* Check for possible increase of requirements */
384         if (state->status != TSS_INMEM || state->memtupcount != 0)
385         {
386                 if ((state->eflags | eflags) != state->eflags)
387                         elog(ERROR, "too late to require new tuplestore eflags");
388         }
389
390         /* Make room for another read pointer if needed */
391         if (state->readptrcount >= state->readptrsize)
392         {
393                 int                     newcnt = state->readptrsize * 2;
394
395                 state->readptrs = (TSReadPointer *)
396                         repalloc(state->readptrs, newcnt * sizeof(TSReadPointer));
397                 state->readptrsize = newcnt;
398         }
399
400         /* And set it up */
401         state->readptrs[state->readptrcount] = state->readptrs[0];
402         state->readptrs[state->readptrcount].eflags = eflags;
403
404         state->eflags |= eflags;
405
406         return state->readptrcount++;
407 }
408
409 /*
410  * tuplestore_clear
411  *
412  *      Delete all the contents of a tuplestore, and reset its read pointers
413  *      to the start.
414  */
415 void
416 tuplestore_clear(Tuplestorestate *state)
417 {
418         int                     i;
419         TSReadPointer *readptr;
420
421         if (state->myfile)
422                 BufFileClose(state->myfile);
423         state->myfile = NULL;
424         if (state->memtuples)
425         {
426                 for (i = state->memtupdeleted; i < state->memtupcount; i++)
427                 {
428                         FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
429                         pfree(state->memtuples[i]);
430                 }
431         }
432         state->status = TSS_INMEM;
433         state->truncated = false;
434         state->memtupdeleted = 0;
435         state->memtupcount = 0;
436         readptr = state->readptrs;
437         for (i = 0; i < state->readptrcount; readptr++, i++)
438         {
439                 readptr->eof_reached = false;
440                 readptr->current = 0;
441         }
442 }
443
444 /*
445  * tuplestore_end
446  *
447  *      Release resources and clean up.
448  */
449 void
450 tuplestore_end(Tuplestorestate *state)
451 {
452         int                     i;
453
454         if (state->myfile)
455                 BufFileClose(state->myfile);
456         if (state->memtuples)
457         {
458                 for (i = state->memtupdeleted; i < state->memtupcount; i++)
459                         pfree(state->memtuples[i]);
460                 pfree(state->memtuples);
461         }
462         pfree(state->readptrs);
463         pfree(state);
464 }
465
466 /*
467  * tuplestore_select_read_pointer - make the specified read pointer active
468  */
void
tuplestore_select_read_pointer(Tuplestorestate *state, int ptr)
{
	TSReadPointer *readptr;
	TSReadPointer *oldptr;

	Assert(ptr >= 0 && ptr < state->readptrcount);

	/* No work if already active */
	if (ptr == state->activeptr)
		return;

	readptr = &state->readptrs[ptr];
	oldptr = &state->readptrs[state->activeptr];

	switch (state->status)
	{
		case TSS_INMEM:
		case TSS_WRITEFILE:
			/* no work: read positions are tracked explicitly per pointer */
			break;
		case TSS_READFILE:

			/*
			 * First, save the current read position in the pointer about to
			 * become inactive.  (If it is at EOF its position is implicit
			 * and needn't be saved; see the TSReadPointer comments.)
			 */
			if (!oldptr->eof_reached)
				BufFileTell(state->myfile,
							&oldptr->file,
							&oldptr->offset);

			/*
			 * We have to make the temp file's seek position equal to the
			 * logical position of the new read pointer.  In eof_reached
			 * state, that's the EOF, which we have available from the saved
			 * write position.
			 */
			if (readptr->eof_reached)
			{
				if (BufFileSeek(state->myfile,
								state->writepos_file,
								state->writepos_offset,
								SEEK_SET) != 0)
					ereport(ERROR,
							(errcode_for_file_access(),
							 errmsg("could not seek in tuplestore temporary file: %m")));
			}
			else
			{
				if (BufFileSeek(state->myfile,
								readptr->file,
								readptr->offset,
								SEEK_SET) != 0)
					ereport(ERROR,
							(errcode_for_file_access(),
							 errmsg("could not seek in tuplestore temporary file: %m")));
			}
			break;
		default:
			elog(ERROR, "invalid tuplestore state");
			break;
	}

	state->activeptr = ptr;
}
535
536 /*
537  * tuplestore_ateof
538  *
539  * Returns the active read pointer's eof_reached state.
540  */
541 bool
542 tuplestore_ateof(Tuplestorestate *state)
543 {
544         return state->readptrs[state->activeptr].eof_reached;
545 }
546
547 /*
548  * Grow the memtuples[] array, if possible within our memory constraint.  We
549  * must not exceed INT_MAX tuples in memory or the caller-provided memory
550  * limit.  Return TRUE if we were able to enlarge the array, FALSE if not.
551  *
552  * Normally, at each increment we double the size of the array.  When doing
553  * that would exceed a limit, we attempt one last, smaller increase (and then
554  * clear the growmemtuples flag so we don't try any more).  That allows us to
555  * use memory as fully as permitted; sticking to the pure doubling rule could
556  * result in almost half going unused.  Because availMem moves around with
557  * tuple addition/removal, we need some rule to prevent making repeated small
558  * increases in memtupsize, which would just be useless thrashing.  The
559  * growmemtuples flag accomplishes that and also prevents useless
560  * recalculations in this function.
561  */
static bool
grow_memtuples(Tuplestorestate *state)
{
	int			newmemtupsize;
	int			memtupsize = state->memtupsize;
	/* bytes currently charged against allowedMem (array + tuples) */
	int64		memNowUsed = state->allowedMem - state->availMem;

	/* Forget it if we've already maxed out memtuples, per comment above */
	if (!state->growmemtuples)
		return false;

	/* Select new value of memtupsize */
	if (memNowUsed <= state->availMem)
	{
		/*
		 * We've used no more than half of allowedMem; double our usage,
		 * clamping at INT_MAX tuples.
		 */
		if (memtupsize < INT_MAX / 2)
			newmemtupsize = memtupsize * 2;
		else
		{
			newmemtupsize = INT_MAX;
			state->growmemtuples = false;
		}
	}
	else
	{
		/*
		 * This will be the last increment of memtupsize.  Abandon doubling
		 * strategy and instead increase as much as we safely can.
		 *
		 * To stay within allowedMem, we can't increase memtupsize by more
		 * than availMem / sizeof(void *) elements. In practice, we want to
		 * increase it by considerably less, because we need to leave some
		 * space for the tuples to which the new array slots will refer.  We
		 * assume the new tuples will be about the same size as the tuples
		 * we've already seen, and thus we can extrapolate from the space
		 * consumption so far to estimate an appropriate new size for the
		 * memtuples array.  The optimal value might be higher or lower than
		 * this estimate, but it's hard to know that in advance.  We again
		 * clamp at INT_MAX tuples.
		 *
		 * This calculation is safe against enlarging the array so much that
		 * LACKMEM becomes true, because the memory currently used includes
		 * the present array; thus, there would be enough allowedMem for the
		 * new array elements even if no other memory were currently used.
		 *
		 * We do the arithmetic in float8, because otherwise the product of
		 * memtupsize and allowedMem could overflow.  Any inaccuracy in the
		 * result should be insignificant; but even if we computed a
		 * completely insane result, the checks below will prevent anything
		 * really bad from happening.
		 */
		double		grow_ratio;

		grow_ratio = (double) state->allowedMem / (double) memNowUsed;
		if (memtupsize * grow_ratio < INT_MAX)
			newmemtupsize = (int) (memtupsize * grow_ratio);
		else
			newmemtupsize = INT_MAX;

		/* We won't make any further enlargement attempts */
		state->growmemtuples = false;
	}

	/* Must enlarge array by at least one element, else report failure */
	if (newmemtupsize <= memtupsize)
		goto noalloc;

	/*
	 * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize.  Clamp
	 * to ensure our request won't be rejected.  Note that we can easily
	 * exhaust address space before facing this outcome.  (This is presently
	 * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
	 * don't rely on that at this distance.)
	 */
	if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(void *))
	{
		newmemtupsize = (int) (MaxAllocHugeSize / sizeof(void *));
		state->growmemtuples = false;	/* can't grow any more */
	}

	/*
	 * We need to be sure that we do not cause LACKMEM to become true, else
	 * the space management algorithm will go nuts.  The code above should
	 * never generate a dangerous request, but to be safe, check explicitly
	 * that the array growth fits within availMem.  (We could still cause
	 * LACKMEM if the memory chunk overhead associated with the memtuples
	 * array were to increase.  That shouldn't happen because we chose the
	 * initial array size large enough to ensure that palloc will be treating
	 * both old and new arrays as separate chunks.  But we'll check LACKMEM
	 * explicitly below just in case.)
	 */
	if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(void *)))
		goto noalloc;

	/* OK, do it: refund the old array's space, then charge the new one's */
	FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
	state->memtupsize = newmemtupsize;
	state->memtuples = (void **)
		repalloc_huge(state->memtuples,
					  state->memtupsize * sizeof(void *));
	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
	if (LACKMEM(state))
		elog(ERROR, "unexpected out-of-memory situation in tuplestore");
	return true;

noalloc:
	/* If for any reason we didn't realloc, shut off future attempts */
	state->growmemtuples = false;
	return false;
}
675
676 /*
677  * Accept one tuple and append it to the tuplestore.
678  *
679  * Note that the input tuple is always copied; the caller need not save it.
680  *
681  * If the active read pointer is currently "at EOF", it remains so (the read
682  * pointer implicitly advances along with the write pointer); otherwise the
683  * read pointer is unchanged.  Non-active read pointers do not move, which
684  * means they are certain to not be "at EOF" immediately after puttuple.
685  * This curious-seeming behavior is for the convenience of nodeMaterial.c and
686  * nodeCtescan.c, which would otherwise need to do extra pointer repositioning
687  * steps.
688  *
689  * tuplestore_puttupleslot() is a convenience routine to collect data from
690  * a TupleTableSlot without an extra copy operation.
691  */
692 void
693 tuplestore_puttupleslot(Tuplestorestate *state,
694                                                 TupleTableSlot *slot)
695 {
696         MinimalTuple tuple;
697         MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
698
699         /*
700          * Form a MinimalTuple in working memory
701          */
702         tuple = ExecCopySlotMinimalTuple(slot);
703         USEMEM(state, GetMemoryChunkSpace(tuple));
704
705         tuplestore_puttuple_common(state, (void *) tuple);
706
707         MemoryContextSwitchTo(oldcxt);
708 }
709
710 /*
711  * "Standard" case to copy from a HeapTuple.  This is actually now somewhat
712  * deprecated, but not worth getting rid of in view of the number of callers.
713  */
714 void
715 tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
716 {
717         MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
718
719         /*
720          * Copy the tuple.  (Must do this even in WRITEFILE case.  Note that
721          * COPYTUP includes USEMEM, so we needn't do that here.)
722          */
723         tuple = COPYTUP(state, tuple);
724
725         tuplestore_puttuple_common(state, (void *) tuple);
726
727         MemoryContextSwitchTo(oldcxt);
728 }
729
730 /*
731  * Similar to tuplestore_puttuple(), but work from values + nulls arrays.
732  * This avoids an extra tuple-construction operation.
733  */
734 void
735 tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc,
736                                          Datum *values, bool *isnull)
737 {
738         MinimalTuple tuple;
739         MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
740
741         tuple = heap_form_minimal_tuple(tdesc, values, isnull);
742         USEMEM(state, GetMemoryChunkSpace(tuple));
743
744         tuplestore_puttuple_common(state, (void *) tuple);
745
746         MemoryContextSwitchTo(oldcxt);
747 }
748
/*
 * tuplestore_puttuple_common
 *
 * Shared guts of the puttuple variants above.  "tuple" must already be a
 * tuplestore-owned copy whose memory has been charged to the store (the
 * callers do USEMEM or COPYTUP before calling here); ownership of the
 * tuple passes to the tuplestore.
 */
static void
tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
{
	TSReadPointer *readptr;
	int			i;
	ResourceOwner oldowner;

	switch (state->status)
	{
		case TSS_INMEM:

			/*
			 * Update read pointers as needed; see API spec above.
			 * (Non-active pointers that were "at EOF" get pinned to the
			 * position just before this new tuple.)
			 */
			readptr = state->readptrs;
			for (i = 0; i < state->readptrcount; readptr++, i++)
			{
				if (readptr->eof_reached && i != state->activeptr)
				{
					readptr->eof_reached = false;
					readptr->current = state->memtupcount;
				}
			}

			/*
			 * Grow the array as needed.  Note that we try to grow the array
			 * when there is still one free slot remaining --- if we fail,
			 * there'll still be room to store the incoming tuple, and then
			 * we'll switch to tape-based operation.
			 */
			if (state->memtupcount >= state->memtupsize - 1)
			{
				(void) grow_memtuples(state);
				Assert(state->memtupcount < state->memtupsize);
			}

			/* Stash the tuple in the in-memory array */
			state->memtuples[state->memtupcount++] = tuple;

			/*
			 * Done if we still fit in available memory and have array slots.
			 */
			if (state->memtupcount < state->memtupsize && !LACKMEM(state))
				return;

			/*
			 * Nope; time to switch to tape-based operation.  Make sure that
			 * the temp file(s) are created in suitable temp tablespaces.
			 */
			PrepareTempTablespaces();

			/*
			 * Associate the file with the store's resource owner, so it is
			 * released when the store is, not with the caller's owner.
			 */
			oldowner = CurrentResourceOwner;
			CurrentResourceOwner = state->resowner;

			state->myfile = BufFileCreateTemp(state->interXact);

			CurrentResourceOwner = oldowner;

			/*
			 * Freeze the decision about whether trailing length words will be
			 * used.  We can't change this choice once data is on tape, even
			 * though callers might drop the requirement.
			 */
			state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
			state->status = TSS_WRITEFILE;
			dumptuples(state);
			break;
		case TSS_WRITEFILE:

			/*
			 * Update read pointers as needed; see API spec above. Note:
			 * BufFileTell is quite cheap, so not worth trying to avoid
			 * multiple calls.
			 */
			readptr = state->readptrs;
			for (i = 0; i < state->readptrcount; readptr++, i++)
			{
				if (readptr->eof_reached && i != state->activeptr)
				{
					readptr->eof_reached = false;
					BufFileTell(state->myfile,
								&readptr->file,
								&readptr->offset);
				}
			}

			WRITETUP(state, tuple);
			break;
		case TSS_READFILE:

			/*
			 * Switch from reading to writing: save the active reader's
			 * position, then seek to the remembered write position.
			 */
			if (!state->readptrs[state->activeptr].eof_reached)
				BufFileTell(state->myfile,
							&state->readptrs[state->activeptr].file,
							&state->readptrs[state->activeptr].offset);
			if (BufFileSeek(state->myfile,
							state->writepos_file, state->writepos_offset,
							SEEK_SET) != 0)
				ereport(ERROR,
						(errcode_for_file_access(),
				 errmsg("could not seek in tuplestore temporary file: %m")));
			state->status = TSS_WRITEFILE;

			/*
			 * Update read pointers as needed; see API spec above.
			 */
			readptr = state->readptrs;
			for (i = 0; i < state->readptrcount; readptr++, i++)
			{
				if (readptr->eof_reached && i != state->activeptr)
				{
					readptr->eof_reached = false;
					readptr->file = state->writepos_file;
					readptr->offset = state->writepos_offset;
				}
			}

			WRITETUP(state, tuple);
			break;
		default:
			elog(ERROR, "invalid tuplestore state");
			break;
	}
}
876
/*
 * Fetch the next tuple in either forward or back direction.
 * Returns NULL if no more tuples.  If should_free is set, the
 * caller must pfree the returned tuple when done with it.
 *
 * Backward scan is only allowed if randomAccess was set true or
 * EXEC_FLAG_BACKWARD was specified to tuplestore_set_eflags().
 */
static void *
tuplestore_gettuple(Tuplestorestate *state, bool forward,
					bool *should_free)
{
	TSReadPointer *readptr = &state->readptrs[state->activeptr];
	unsigned int tuplen;
	void	   *tup;

	Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));

	switch (state->status)
	{
		case TSS_INMEM:
			/* In-memory tuples are returned in place; caller must not free */
			*should_free = false;
			if (forward)
			{
				if (readptr->eof_reached)
					return NULL;
				if (readptr->current < state->memtupcount)
				{
					/* We have another tuple, so return it */
					return state->memtuples[readptr->current++];
				}
				readptr->eof_reached = true;
				return NULL;
			}
			else
			{
				/*
				 * if all tuples are fetched already then we return last
				 * tuple, else tuple before last returned.
				 */
				if (readptr->eof_reached)
				{
					readptr->current = state->memtupcount;
					readptr->eof_reached = false;
				}
				else
				{
					/* Can't back up past trimmed-away tuples */
					if (readptr->current <= state->memtupdeleted)
					{
						Assert(!state->truncated);
						return NULL;
					}
					readptr->current--; /* last returned tuple */
				}
				if (readptr->current <= state->memtupdeleted)
				{
					Assert(!state->truncated);
					return NULL;
				}
				return state->memtuples[readptr->current - 1];
			}
			break;

		case TSS_WRITEFILE:
			/* Skip state change if we'll just return NULL */
			if (readptr->eof_reached && forward)
				return NULL;

			/*
			 * Switch from writing to reading: remember the write position,
			 * then seek to the active pointer's saved read position.
			 */
			BufFileTell(state->myfile,
						&state->writepos_file, &state->writepos_offset);
			if (!readptr->eof_reached)
				if (BufFileSeek(state->myfile,
								readptr->file, readptr->offset,
								SEEK_SET) != 0)
					ereport(ERROR,
							(errcode_for_file_access(),
							 errmsg("could not seek in tuplestore temporary file: %m")));
			state->status = TSS_READFILE;
			/* FALL THRU into READFILE case */

		case TSS_READFILE:
			/* Tuples read from the file are fresh copies the caller frees */
			*should_free = true;
			if (forward)
			{
				if ((tuplen = getlen(state, true)) != 0)
				{
					tup = READTUP(state, tuplen);
					return tup;
				}
				else
				{
					/* zero length word means end of data */
					readptr->eof_reached = true;
					return NULL;
				}
			}

			/*
			 * Backward.
			 *
			 * if all tuples are fetched already then we return last tuple,
			 * else tuple before last returned.
			 *
			 * Back up to fetch previously-returned tuple's ending length
			 * word. If seek fails, assume we are at start of file.
			 */
			if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int),
							SEEK_CUR) != 0)
			{
				/* even a failed backwards fetch gets you out of eof state */
				readptr->eof_reached = false;
				Assert(!state->truncated);
				return NULL;
			}
			tuplen = getlen(state, false);

			if (readptr->eof_reached)
			{
				readptr->eof_reached = false;
				/* We will return the tuple returned before returning NULL */
			}
			else
			{
				/*
				 * Back up to get ending length word of tuple before it.
				 * (Layout on tape: length word, tuple data, trailing length
				 * word --- the trailing word exists because backward scan
				 * capability was requested.)
				 */
				if (BufFileSeek(state->myfile, 0,
								-(long) (tuplen + 2 * sizeof(unsigned int)),
								SEEK_CUR) != 0)
				{
					/*
					 * If that fails, presumably the prev tuple is the first
					 * in the file.  Back up so that it becomes next to read
					 * in forward direction (not obviously right, but that is
					 * what in-memory case does).
					 */
					if (BufFileSeek(state->myfile, 0,
									-(long) (tuplen + sizeof(unsigned int)),
									SEEK_CUR) != 0)
						ereport(ERROR,
								(errcode_for_file_access(),
								 errmsg("could not seek in tuplestore temporary file: %m")));
					Assert(!state->truncated);
					return NULL;
				}
				tuplen = getlen(state, false);
			}

			/*
			 * Now we have the length of the prior tuple, back up and read it.
			 * Note: READTUP expects we are positioned after the initial
			 * length word of the tuple, so back up to that point.
			 */
			if (BufFileSeek(state->myfile, 0,
							-(long) tuplen,
							SEEK_CUR) != 0)
				ereport(ERROR,
						(errcode_for_file_access(),
				 errmsg("could not seek in tuplestore temporary file: %m")));
			tup = READTUP(state, tuplen);
			return tup;

		default:
			elog(ERROR, "invalid tuplestore state");
			return NULL;		/* keep compiler quiet */
	}
}
1046
1047 /*
1048  * tuplestore_gettupleslot - exported function to fetch a MinimalTuple
1049  *
1050  * If successful, put tuple in slot and return TRUE; else, clear the slot
1051  * and return FALSE.
1052  *
1053  * If copy is TRUE, the slot receives a copied tuple (allocated in current
1054  * memory context) that will stay valid regardless of future manipulations of
1055  * the tuplestore's state.  If copy is FALSE, the slot may just receive a
1056  * pointer to a tuple held within the tuplestore.  The latter is more
1057  * efficient but the slot contents may be corrupted if additional writes to
1058  * the tuplestore occur.  (If using tuplestore_trim, see comments therein.)
1059  */
1060 bool
1061 tuplestore_gettupleslot(Tuplestorestate *state, bool forward,
1062                                                 bool copy, TupleTableSlot *slot)
1063 {
1064         MinimalTuple tuple;
1065         bool            should_free;
1066
1067         tuple = (MinimalTuple) tuplestore_gettuple(state, forward, &should_free);
1068
1069         if (tuple)
1070         {
1071                 if (copy && !should_free)
1072                 {
1073                         tuple = heap_copy_minimal_tuple(tuple);
1074                         should_free = true;
1075                 }
1076                 ExecStoreMinimalTuple(tuple, slot, should_free);
1077                 return true;
1078         }
1079         else
1080         {
1081                 ExecClearTuple(slot);
1082                 return false;
1083         }
1084 }
1085
1086 /*
1087  * tuplestore_advance - exported function to adjust position without fetching
1088  *
1089  * We could optimize this case to avoid palloc/pfree overhead, but for the
1090  * moment it doesn't seem worthwhile.
1091  */
1092 bool
1093 tuplestore_advance(Tuplestorestate *state, bool forward)
1094 {
1095         void       *tuple;
1096         bool            should_free;
1097
1098         tuple = tuplestore_gettuple(state, forward, &should_free);
1099
1100         if (tuple)
1101         {
1102                 if (should_free)
1103                         pfree(tuple);
1104                 return true;
1105         }
1106         else
1107         {
1108                 return false;
1109         }
1110 }
1111
/*
 * Advance over N tuples in either forward or back direction,
 * without returning any data.  N<=0 is a no-op.
 * Returns TRUE if successful, FALSE if ran out of tuples.
 */
bool
tuplestore_skiptuples(Tuplestorestate *state, int64 ntuples, bool forward)
{
	TSReadPointer *readptr = &state->readptrs[state->activeptr];

	Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));

	if (ntuples <= 0)
		return true;

	switch (state->status)
	{
		case TSS_INMEM:
			if (forward)
			{
				if (readptr->eof_reached)
					return false;
				if (state->memtupcount - readptr->current >= ntuples)
				{
					readptr->current += ntuples;
					return true;
				}
				/* Not enough tuples left; land on EOF */
				readptr->current = state->memtupcount;
				readptr->eof_reached = true;
				return false;
			}
			else
			{
				if (readptr->eof_reached)
				{
					/*
					 * Stepping off EOF back onto the last tuple consumes
					 * one of the requested steps.
					 */
					readptr->current = state->memtupcount;
					readptr->eof_reached = false;
					ntuples--;
				}
				if (readptr->current - state->memtupdeleted > ntuples)
				{
					readptr->current -= ntuples;
					return true;
				}
				/* Hit the front (or the trimmed-away part) of the store */
				Assert(!state->truncated);
				readptr->current = state->memtupdeleted;
				return false;
			}
			break;

		default:
			/* We don't currently try hard to optimize other cases */
			while (ntuples-- > 0)
			{
				void	   *tuple;
				bool		should_free;

				tuple = tuplestore_gettuple(state, forward, &should_free);

				if (tuple == NULL)
					return false;
				if (should_free)
					pfree(tuple);
				CHECK_FOR_INTERRUPTS();
			}
			return true;
	}
}
1180
/*
 * dumptuples - remove tuples from memory and write to tape
 *
 * As a side effect, we must convert each read pointer's position from
 * "current" to file/offset format.  But eof_reached pointers don't
 * need to change state.
 */
static void
dumptuples(Tuplestorestate *state)
{
	int			i;

	/*
	 * The loop deliberately runs one iteration past the last tuple: a read
	 * pointer whose "current" equals memtupcount must capture the file
	 * position *after* the final tuple has been written.
	 */
	for (i = state->memtupdeleted;; i++)
	{
		TSReadPointer *readptr = state->readptrs;
		int			j;

		/* Convert any pointer positioned exactly at tuple i */
		for (j = 0; j < state->readptrcount; readptr++, j++)
		{
			if (i == readptr->current && !readptr->eof_reached)
				BufFileTell(state->myfile,
							&readptr->file, &readptr->offset);
		}
		if (i >= state->memtupcount)
			break;
		WRITETUP(state, state->memtuples[i]);
	}
	/* The in-memory array is now logically empty */
	state->memtupdeleted = 0;
	state->memtupcount = 0;
}
1211
1212 /*
1213  * tuplestore_rescan            - rewind the active read pointer to start
1214  */
1215 void
1216 tuplestore_rescan(Tuplestorestate *state)
1217 {
1218         TSReadPointer *readptr = &state->readptrs[state->activeptr];
1219
1220         Assert(readptr->eflags & EXEC_FLAG_REWIND);
1221         Assert(!state->truncated);
1222
1223         switch (state->status)
1224         {
1225                 case TSS_INMEM:
1226                         readptr->eof_reached = false;
1227                         readptr->current = 0;
1228                         break;
1229                 case TSS_WRITEFILE:
1230                         readptr->eof_reached = false;
1231                         readptr->file = 0;
1232                         readptr->offset = 0L;
1233                         break;
1234                 case TSS_READFILE:
1235                         readptr->eof_reached = false;
1236                         if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
1237                                 ereport(ERROR,
1238                                                 (errcode_for_file_access(),
1239                                  errmsg("could not seek in tuplestore temporary file: %m")));
1240                         break;
1241                 default:
1242                         elog(ERROR, "invalid tuplestore state");
1243                         break;
1244         }
1245 }
1246
/*
 * tuplestore_copy_read_pointer - copy a read pointer's state to another
 */
void
tuplestore_copy_read_pointer(Tuplestorestate *state,
							 int srcptr, int destptr)
{
	TSReadPointer *sptr = &state->readptrs[srcptr];
	TSReadPointer *dptr = &state->readptrs[destptr];

	Assert(srcptr >= 0 && srcptr < state->readptrcount);
	Assert(destptr >= 0 && destptr < state->readptrcount);

	/* Assigning to self is a no-op */
	if (srcptr == destptr)
		return;

	if (dptr->eflags != sptr->eflags)
	{
		/* Possible change of overall eflags, so copy and then recompute */
		int			eflags;
		int			i;

		*dptr = *sptr;
		/* state->eflags must remain the union of all pointers' eflags */
		eflags = state->readptrs[0].eflags;
		for (i = 1; i < state->readptrcount; i++)
			eflags |= state->readptrs[i].eflags;
		state->eflags = eflags;
	}
	else
		*dptr = *sptr;

	switch (state->status)
	{
		case TSS_INMEM:
		case TSS_WRITEFILE:
			/* no work */
			break;
		case TSS_READFILE:

			/*
			 * This case is a bit tricky since the active read pointer's
			 * position corresponds to the seek point, not what is in its
			 * variables.  Assigning to the active requires a seek, and
			 * assigning from the active requires a tell, except when
			 * eof_reached.
			 */
			if (destptr == state->activeptr)
			{
				if (dptr->eof_reached)
				{
					/* "At EOF" means positioned at the write position */
					if (BufFileSeek(state->myfile,
									state->writepos_file,
									state->writepos_offset,
									SEEK_SET) != 0)
						ereport(ERROR,
								(errcode_for_file_access(),
								 errmsg("could not seek in tuplestore temporary file: %m")));
				}
				else
				{
					if (BufFileSeek(state->myfile,
									dptr->file, dptr->offset,
									SEEK_SET) != 0)
						ereport(ERROR,
								(errcode_for_file_access(),
								 errmsg("could not seek in tuplestore temporary file: %m")));
				}
			}
			else if (srcptr == state->activeptr)
			{
				/* Materialize the active pointer's true position into dptr */
				if (!dptr->eof_reached)
					BufFileTell(state->myfile,
								&dptr->file,
								&dptr->offset);
			}
			break;
		default:
			elog(ERROR, "invalid tuplestore state");
			break;
	}
}
1329
1330 /*
1331  * tuplestore_trim      - remove all no-longer-needed tuples
1332  *
1333  * Calling this function authorizes the tuplestore to delete all tuples
1334  * before the oldest read pointer, if no read pointer is marked as requiring
1335  * REWIND capability.
1336  *
1337  * Note: this is obviously safe if no pointer has BACKWARD capability either.
1338  * If a pointer is marked as BACKWARD but not REWIND capable, it means that
1339  * the pointer can be moved backward but not before the oldest other read
1340  * pointer.
1341  */
1342 void
1343 tuplestore_trim(Tuplestorestate *state)
1344 {
1345         int                     oldest;
1346         int                     nremove;
1347         int                     i;
1348
1349         /*
1350          * Truncation is disallowed if any read pointer requires rewind
1351          * capability.
1352          */
1353         if (state->eflags & EXEC_FLAG_REWIND)
1354                 return;
1355
1356         /*
1357          * We don't bother trimming temp files since it usually would mean more
1358          * work than just letting them sit in kernel buffers until they age out.
1359          */
1360         if (state->status != TSS_INMEM)
1361                 return;
1362
1363         /* Find the oldest read pointer */
1364         oldest = state->memtupcount;
1365         for (i = 0; i < state->readptrcount; i++)
1366         {
1367                 if (!state->readptrs[i].eof_reached)
1368                         oldest = Min(oldest, state->readptrs[i].current);
1369         }
1370
1371         /*
1372          * Note: you might think we could remove all the tuples before the oldest
1373          * "current", since that one is the next to be returned.  However, since
1374          * tuplestore_gettuple returns a direct pointer to our internal copy of
1375          * the tuple, it's likely that the caller has still got the tuple just
1376          * before "current" referenced in a slot. So we keep one extra tuple
1377          * before the oldest "current".  (Strictly speaking, we could require such
1378          * callers to use the "copy" flag to tuplestore_gettupleslot, but for
1379          * efficiency we allow this one case to not use "copy".)
1380          */
1381         nremove = oldest - 1;
1382         if (nremove <= 0)
1383                 return;                                 /* nothing to do */
1384
1385         Assert(nremove >= state->memtupdeleted);
1386         Assert(nremove <= state->memtupcount);
1387
1388         /* Release no-longer-needed tuples */
1389         for (i = state->memtupdeleted; i < nremove; i++)
1390         {
1391                 FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
1392                 pfree(state->memtuples[i]);
1393                 state->memtuples[i] = NULL;
1394         }
1395         state->memtupdeleted = nremove;
1396
1397         /* mark tuplestore as truncated (used for Assert crosschecks only) */
1398         state->truncated = true;
1399
1400         /*
1401          * If nremove is less than 1/8th memtupcount, just stop here, leaving the
1402          * "deleted" slots as NULL.  This prevents us from expending O(N^2) time
1403          * repeatedly memmove-ing a large pointer array.  The worst case space
1404          * wastage is pretty small, since it's just pointers and not whole tuples.
1405          */
1406         if (nremove < state->memtupcount / 8)
1407                 return;
1408
1409         /*
1410          * Slide the array down and readjust pointers.
1411          *
1412          * In mergejoin's current usage, it's demonstrable that there will always
1413          * be exactly one non-removed tuple; so optimize that case.
1414          */
1415         if (nremove + 1 == state->memtupcount)
1416                 state->memtuples[0] = state->memtuples[nremove];
1417         else
1418                 memmove(state->memtuples, state->memtuples + nremove,
1419                                 (state->memtupcount - nremove) * sizeof(void *));
1420
1421         state->memtupdeleted = 0;
1422         state->memtupcount -= nremove;
1423         for (i = 0; i < state->readptrcount; i++)
1424         {
1425                 if (!state->readptrs[i].eof_reached)
1426                         state->readptrs[i].current -= nremove;
1427         }
1428 }
1429
1430 /*
1431  * tuplestore_in_memory
1432  *
1433  * Returns true if the tuplestore has not spilled to disk.
1434  *
1435  * XXX exposing this is a violation of modularity ... should get rid of it.
1436  */
1437 bool
1438 tuplestore_in_memory(Tuplestorestate *state)
1439 {
1440         return (state->status == TSS_INMEM);
1441 }
1442
1443
1444 /*
1445  * Tape interface routines
1446  */
1447
1448 static unsigned int
1449 getlen(Tuplestorestate *state, bool eofOK)
1450 {
1451         unsigned int len;
1452         size_t          nbytes;
1453
1454         nbytes = BufFileRead(state->myfile, (void *) &len, sizeof(len));
1455         if (nbytes == sizeof(len))
1456                 return len;
1457         if (nbytes != 0 || !eofOK)
1458                 ereport(ERROR,
1459                                 (errcode_for_file_access(),
1460                            errmsg("could not read from tuplestore temporary file: %m")));
1461         return 0;
1462 }
1463
1464
1465 /*
1466  * Routines specialized for HeapTuple case
1467  *
1468  * The stored form is actually a MinimalTuple, but for largely historical
1469  * reasons we allow COPYTUP to work from a HeapTuple.
1470  *
1471  * Since MinimalTuple already has length in its first word, we don't need
1472  * to write that separately.
1473  */
1474
1475 static void *
1476 copytup_heap(Tuplestorestate *state, void *tup)
1477 {
1478         MinimalTuple tuple;
1479
1480         tuple = minimal_tuple_from_heap_tuple((HeapTuple) tup);
1481         USEMEM(state, GetMemoryChunkSpace(tuple));
1482         return (void *) tuple;
1483 }
1484
1485 static void
1486 writetup_heap(Tuplestorestate *state, void *tup)
1487 {
1488         MinimalTuple tuple = (MinimalTuple) tup;
1489
1490         /* the part of the MinimalTuple we'll write: */
1491         char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
1492         unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
1493
1494         /* total on-disk footprint: */
1495         unsigned int tuplen = tupbodylen + sizeof(int);
1496
1497         if (BufFileWrite(state->myfile, (void *) &tuplen,
1498                                          sizeof(tuplen)) != sizeof(tuplen))
1499                 ereport(ERROR,
1500                                 (errcode_for_file_access(),
1501                                  errmsg("could not write to tuplestore temporary file: %m")));
1502         if (BufFileWrite(state->myfile, (void *) tupbody,
1503                                          tupbodylen) != (size_t) tupbodylen)
1504                 ereport(ERROR,
1505                                 (errcode_for_file_access(),
1506                                  errmsg("could not write to tuplestore temporary file: %m")));
1507         if (state->backward)            /* need trailing length word? */
1508                 if (BufFileWrite(state->myfile, (void *) &tuplen,
1509                                                  sizeof(tuplen)) != sizeof(tuplen))
1510                         ereport(ERROR,
1511                                         (errcode_for_file_access(),
1512                                 errmsg("could not write to tuplestore temporary file: %m")));
1513
1514         FREEMEM(state, GetMemoryChunkSpace(tuple));
1515         heap_free_minimal_tuple(tuple);
1516 }
1517
1518 static void *
1519 readtup_heap(Tuplestorestate *state, unsigned int len)
1520 {
1521         unsigned int tupbodylen = len - sizeof(int);
1522         unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
1523         MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
1524         char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
1525
1526         USEMEM(state, GetMemoryChunkSpace(tuple));
1527         /* read in the tuple proper */
1528         tuple->t_len = tuplen;
1529         if (BufFileRead(state->myfile, (void *) tupbody,
1530                                         tupbodylen) != (size_t) tupbodylen)
1531                 ereport(ERROR,
1532                                 (errcode_for_file_access(),
1533                            errmsg("could not read from tuplestore temporary file: %m")));
1534         if (state->backward)            /* need trailing length word? */
1535                 if (BufFileRead(state->myfile, (void *) &tuplen,
1536                                                 sizeof(tuplen)) != sizeof(tuplen))
1537                         ereport(ERROR,
1538                                         (errcode_for_file_access(),
1539                            errmsg("could not read from tuplestore temporary file: %m")));
1540         return (void *) tuple;
1541 }