]> granicus.if.org Git - postgresql/blob - src/backend/utils/sort/tuplestore.c
Restructure indexscan API (index_beginscan, index_getnext) per
[postgresql] / src / backend / utils / sort / tuplestore.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuplestore.c
4  *        Generalized routines for temporary tuple storage.
5  *
6  * This module handles temporary storage of tuples for purposes such
7  * as Materialize nodes, hashjoin batch files, etc.  It is essentially
8  * a dumbed-down version of tuplesort.c; it does no sorting of tuples
9  * but can only store a sequence of tuples and regurgitate it later.
10  * A temporary file is used to handle the data if it exceeds the
11  * space limit specified by the caller.
12  *
13  * The (approximate) amount of memory allowed to the tuplestore is specified
14  * in kilobytes by the caller.  We absorb tuples and simply store them in an
15  * in-memory array as long as we haven't exceeded maxKBytes.  If we reach the
16  * end of the input without exceeding maxKBytes, we just return tuples during
17  * the read phase by scanning the tuple array sequentially.  If we do exceed
18  * maxKBytes, we dump all the tuples into a temp file and then read from that
19  * during the read phase.
20  *
21  * When the caller requests random access to the data, we write the temp file
22  * in a format that allows either forward or backward scan.
23  *
24  *
25  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
26  * Portions Copyright (c) 1994, Regents of the University of California
27  *
28  * IDENTIFICATION
29  *        $Header: /cvsroot/pgsql/src/backend/utils/sort/tuplestore.c,v 1.5 2001/10/28 06:25:57 momjian Exp $
30  *
31  *-------------------------------------------------------------------------
32  */
33
34 #include "postgres.h"
35
36 #include "access/heapam.h"
37 #include "storage/buffile.h"
38 #include "utils/tuplestore.h"
39
40 /*
41  * Possible states of a Tuplestore object.      These denote the states that
42  * persist between calls of Tuplestore routines.
43  */
typedef enum
{
    TSS_INITIAL,        /* accepting tuples; all of them still fit in memory */
    TSS_WRITEFILE,      /* accepting tuples; they now go to the temp file */
    TSS_READMEM,        /* returning tuples from the in-memory array */
    TSS_READFILE        /* returning tuples read back from the temp file */
} TupStoreStatus;
52
53 /*
54  * Private state of a Tuplestore operation.
55  */
56 struct Tuplestorestate
57 {
58         TupStoreStatus status;          /* enumerated value as shown above */
59         bool            randomAccess;   /* did caller request random access? */
60         long            availMem;               /* remaining memory available, in bytes */
61         BufFile    *myfile;                     /* underlying file, or NULL if none */
62
63         /*
64          * These function pointers decouple the routines that must know what
65          * kind of tuple we are handling from the routines that don't need to
66          * know it. They are set up by the tuplestore_begin_xxx routines.
67          *
68          * (Although tuplestore.c currently only supports heap tuples, I've
69          * copied this part of tuplesort.c so that extension to other kinds of
70          * objects will be easy if it's ever needed.)
71          *
72          * Function to copy a supplied input tuple into palloc'd space. (NB: we
73          * assume that a single pfree() is enough to release the tuple later,
74          * so the representation must be "flat" in one palloc chunk.)
75          * state->availMem must be decreased by the amount of space used.
76          */
77         void       *(*copytup) (Tuplestorestate *state, void *tup);
78
79         /*
80          * Function to write a stored tuple onto tape.  The representation of
81          * the tuple on tape need not be the same as it is in memory;
82          * requirements on the tape representation are given below.  After
83          * writing the tuple, pfree() it, and increase state->availMem by the
84          * amount of memory space thereby released.
85          */
86         void            (*writetup) (Tuplestorestate *state, void *tup);
87
88         /*
89          * Function to read a stored tuple from tape back into memory. 'len'
90          * is the already-read length of the stored tuple.      Create and return
91          * a palloc'd copy, and decrease state->availMem by the amount of
92          * memory space consumed.
93          */
94         void       *(*readtup) (Tuplestorestate *state, unsigned int len);
95
96         /*
97          * This array holds pointers to tuples in memory if we are in state
98          * INITIAL or READMEM.  In states WRITEFILE and READFILE it's not
99          * used.
100          */
101         void      **memtuples;          /* array of pointers to palloc'd tuples */
102         int                     memtupcount;    /* number of tuples currently present */
103         int                     memtupsize;             /* allocated length of memtuples array */
104
105         /*
106          * These variables are used after completion of storing to keep track
107          * of the next tuple to return.  (In the tape case, the tape's current
108          * read position is also critical state.)
109          */
110         int                     current;                /* array index (only used if READMEM) */
111         bool            eof_reached;    /* reached EOF (needed for cursors) */
112
113         /* markpos_xxx holds marked position for mark and restore */
114         int                     markpos_file;   /* file# (only used if READFILE) */
115         long            markpos_offset; /* saved "current", or offset in tape file */
116         bool            markpos_eof;    /* saved "eof_reached" */
117 };
118
/* Dispatch to the per-tuple-kind callbacks stored in the state object. */
#define COPYTUP(state,tup)  ((state)->copytup((state), (tup)))
#define WRITETUP(state,tup) ((state)->writetup((state), (tup)))
#define READTUP(state,len)  ((state)->readtup((state), (len)))

/* Memory accounting: availMem below zero means the limit is exceeded. */
#define LACKMEM(state)      ((state)->availMem < 0)
#define USEMEM(state,amt)   ((state)->availMem -= (amt))
#define FREEMEM(state,amt)  ((state)->availMem += (amt))
125
126 /*--------------------
127  *
128  * NOTES about on-tape representation of tuples:
129  *
 * We require the first "unsigned int" of a stored tuple to be the total size
 * on-tape of the tuple, including itself (so it is never zero; a single
 * all-zero unsigned int is written after the last tuple to mark the end of
 * the stored data).  The remainder of the stored tuple may or may not match
 * the in-memory representation of the tuple --- any conversion needed is
 * the job of the writetup and readtup routines.
135  *
136  * If state->randomAccess is true, then the stored representation of the
137  * tuple must be followed by another "unsigned int" that is a copy of the
138  * length --- so the total tape space used is actually sizeof(unsigned int)
139  * more than the stored length value.  This allows read-backwards.      When
140  * randomAccess is not true, the write/read routines may omit the extra
141  * length word.
142  *
143  * writetup is expected to write both length words as well as the tuple
144  * data.  When readtup is called, the tape is positioned just after the
145  * front length word; readtup must read the tuple data and advance past
146  * the back length word (if present).
147  *
148  * The write/read routines can make use of the tuple description data
149  * stored in the Tuplestorestate record, if needed. They are also expected
150  * to adjust state->availMem by the amount of memory space (not tape space!)
151  * released or consumed.  There is no error return from either writetup
152  * or readtup; they should elog() on failure.
153  *
154  *
155  * NOTES about memory consumption calculations:
156  *
157  * We count space requested for tuples against the maxKBytes limit.
158  * Fixed-size space (primarily the BufFile I/O buffer) is not
159  * counted, nor do we count the variable-size memtuples array.
160  * (Even though that could grow pretty large, it should be small compared
161  * to the tuples proper, so this is not unreasonable.)
162  *
163  * The major deficiency in this approach is that it ignores palloc overhead.
164  * The memory space actually allocated for a palloc chunk is always more
165  * than the request size, and could be considerably more (as much as 2X
166  * larger, in the current aset.c implementation).  So the space used could
167  * be considerably more than maxKBytes says.
168  *
169  * One way to fix this is to add a memory management function that, given
170  * a pointer to a palloc'd chunk, returns the actual space consumed by the
171  * chunk.  This would be very easy in the current aset.c module, but I'm
172  * hesitant to do it because it might be unpleasant to support in future
173  * implementations of memory management.  (For example, a direct
174  * implementation of palloc as malloc could not support such a function
175  * portably.)
176  *
177  * A cruder answer is just to apply a fudge factor, say by initializing
178  * availMem to only three-quarters of what maxKBytes indicates.  This is
179  * probably the right answer if anyone complains that maxKBytes is not being
180  * obeyed very faithfully.
181  *
182  *--------------------
183  */
184
185
186 static Tuplestorestate *tuplestore_begin_common(bool randomAccess,
187                                                 int maxKBytes);
188 static void dumptuples(Tuplestorestate *state);
189 static unsigned int getlen(Tuplestorestate *state, bool eofOK);
190 static void markrunend(Tuplestorestate *state);
191 static void *copytup_heap(Tuplestorestate *state, void *tup);
192 static void writetup_heap(Tuplestorestate *state, void *tup);
193 static void *readtup_heap(Tuplestorestate *state, unsigned int len);
194
195
196 /*
197  *              tuplestore_begin_xxx
198  *
199  * Initialize for a tuple store operation.
200  *
201  * After calling tuplestore_begin, the caller should call tuplestore_puttuple
202  * zero or more times, then call tuplestore_donestoring when all the tuples
203  * have been supplied.  After donestoring, retrieve the tuples in order
204  * by calling tuplestore_gettuple until it returns NULL.  (If random
205  * access was requested, rescan, markpos, and restorepos can also be called.)
206  * Call tuplestore_end to terminate the operation and release memory/disk
207  * space.
208  */
209
210 static Tuplestorestate *
211 tuplestore_begin_common(bool randomAccess, int maxKBytes)
212 {
213         Tuplestorestate *state;
214
215         state = (Tuplestorestate *) palloc(sizeof(Tuplestorestate));
216
217         MemSet((char *) state, 0, sizeof(Tuplestorestate));
218
219         state->status = TSS_INITIAL;
220         state->randomAccess = randomAccess;
221         state->availMem = maxKBytes * 1024L;
222         state->myfile = NULL;
223
224         state->memtupcount = 0;
225         if (maxKBytes > 0)
226                 state->memtupsize = 1024;               /* initial guess */
227         else
228                 state->memtupsize = 1;  /* won't really need any space */
229         state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
230
231         return state;
232 }
233
234 Tuplestorestate *
235 tuplestore_begin_heap(bool randomAccess, int maxKBytes)
236 {
237         Tuplestorestate *state = tuplestore_begin_common(randomAccess, maxKBytes);
238
239         state->copytup = copytup_heap;
240         state->writetup = writetup_heap;
241         state->readtup = readtup_heap;
242
243         return state;
244 }
245
246 /*
247  * tuplestore_end
248  *
249  *      Release resources and clean up.
250  */
251 void
252 tuplestore_end(Tuplestorestate *state)
253 {
254         int                     i;
255
256         if (state->myfile)
257                 BufFileClose(state->myfile);
258         if (state->memtuples)
259         {
260                 for (i = 0; i < state->memtupcount; i++)
261                         pfree(state->memtuples[i]);
262                 pfree(state->memtuples);
263         }
264 }
265
266 /*
267  * Accept one tuple while collecting input data.
268  *
269  * Note that the input tuple is always copied; the caller need not save it.
270  */
271 void
272 tuplestore_puttuple(Tuplestorestate *state, void *tuple)
273 {
274         /*
275          * Copy the tuple.      (Must do this even in WRITEFILE case.)
276          */
277         tuple = COPYTUP(state, tuple);
278
279         switch (state->status)
280         {
281                 case TSS_INITIAL:
282
283                         /*
284                          * Stash the tuple in the in-memory array.
285                          */
286                         if (state->memtupcount >= state->memtupsize)
287                         {
288                                 /* Grow the array as needed. */
289                                 state->memtupsize *= 2;
290                                 state->memtuples = (void **)
291                                         repalloc(state->memtuples,
292                                                          state->memtupsize * sizeof(void *));
293                         }
294                         state->memtuples[state->memtupcount++] = tuple;
295
296                         /*
297                          * Done if we still fit in available memory.
298                          */
299                         if (!LACKMEM(state))
300                                 return;
301
302                         /*
303                          * Nope; time to switch to tape-based operation.
304                          */
305                         state->myfile = BufFileCreateTemp();
306                         state->status = TSS_WRITEFILE;
307                         dumptuples(state);
308                         break;
309                 case TSS_WRITEFILE:
310                         WRITETUP(state, tuple);
311                         break;
312                 default:
313                         elog(ERROR, "tuplestore_puttuple: invalid state");
314                         break;
315         }
316 }
317
318 /*
319  * All tuples have been provided; finish writing.
320  */
321 void
322 tuplestore_donestoring(Tuplestorestate *state)
323 {
324         switch (state->status)
325         {
326                 case TSS_INITIAL:
327
328                         /*
329                          * We were able to accumulate all the tuples within the
330                          * allowed amount of memory.  Just set up to scan them.
331                          */
332                         state->current = 0;
333                         state->eof_reached = false;
334                         state->markpos_offset = 0L;
335                         state->markpos_eof = false;
336                         state->status = TSS_READMEM;
337                         break;
338                 case TSS_WRITEFILE:
339
340                         /*
341                          * Write the EOF marker.
342                          */
343                         markrunend(state);
344
345                         /*
346                          * Set up for reading from tape.
347                          */
348                         if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
349                                 elog(ERROR, "tuplestore_donestoring: seek(0) failed");
350                         state->eof_reached = false;
351                         state->markpos_file = 0;
352                         state->markpos_offset = 0L;
353                         state->markpos_eof = false;
354                         state->status = TSS_READFILE;
355                         break;
356                 default:
357                         elog(ERROR, "tuplestore_donestoring: invalid state");
358                         break;
359         }
360 }
361
362 /*
363  * Fetch the next tuple in either forward or back direction.
364  * Returns NULL if no more tuples.      If should_free is set, the
365  * caller must pfree the returned tuple when done with it.
366  */
367 void *
368 tuplestore_gettuple(Tuplestorestate *state, bool forward,
369                                         bool *should_free)
370 {
371         unsigned int tuplen;
372         void       *tup;
373
374         switch (state->status)
375         {
376                 case TSS_READMEM:
377                         Assert(forward || state->randomAccess);
378                         *should_free = false;
379                         if (forward)
380                         {
381                                 if (state->current < state->memtupcount)
382                                         return state->memtuples[state->current++];
383                                 state->eof_reached = true;
384                                 return NULL;
385                         }
386                         else
387                         {
388                                 if (state->current <= 0)
389                                         return NULL;
390
391                                 /*
392                                  * if all tuples are fetched already then we return last
393                                  * tuple, else - tuple before last returned.
394                                  */
395                                 if (state->eof_reached)
396                                         state->eof_reached = false;
397                                 else
398                                 {
399                                         state->current--;       /* last returned tuple */
400                                         if (state->current <= 0)
401                                                 return NULL;
402                                 }
403                                 return state->memtuples[state->current - 1];
404                         }
405                         break;
406
407                 case TSS_READFILE:
408                         Assert(forward || state->randomAccess);
409                         *should_free = true;
410                         if (forward)
411                         {
412                                 if (state->eof_reached)
413                                         return NULL;
414                                 if ((tuplen = getlen(state, true)) != 0)
415                                 {
416                                         tup = READTUP(state, tuplen);
417                                         return tup;
418                                 }
419                                 else
420                                 {
421                                         state->eof_reached = true;
422                                         return NULL;
423                                 }
424                         }
425
426                         /*
427                          * Backward.
428                          *
429                          * if all tuples are fetched already then we return last tuple,
430                          * else - tuple before last returned.
431                          */
432                         if (state->eof_reached)
433                         {
434                                 /*
435                                  * Seek position is pointing just past the zero tuplen at
436                                  * the end of file; back up to fetch last tuple's ending
437                                  * length word.  If seek fails we must have a completely
438                                  * empty file.
439                                  */
440                                 if (BufFileSeek(state->myfile, 0,
441                                                                 -(long) (2 * sizeof(unsigned int)),
442                                                                 SEEK_CUR) != 0)
443                                         return NULL;
444                                 state->eof_reached = false;
445                         }
446                         else
447                         {
448                                 /*
449                                  * Back up and fetch previously-returned tuple's ending
450                                  * length word.  If seek fails, assume we are at start of
451                                  * file.
452                                  */
453                                 if (BufFileSeek(state->myfile, 0,
454                                                                 -(long) sizeof(unsigned int),
455                                                                 SEEK_CUR) != 0)
456                                         return NULL;
457                                 tuplen = getlen(state, false);
458
459                                 /*
460                                  * Back up to get ending length word of tuple before it.
461                                  */
462                                 if (BufFileSeek(state->myfile, 0,
463                                                          -(long) (tuplen + 2 * sizeof(unsigned int)),
464                                                                 SEEK_CUR) != 0)
465                                 {
466                                         /*
467                                          * If that fails, presumably the prev tuple is the
468                                          * first in the file.  Back up so that it becomes next
469                                          * to read in forward direction (not obviously right,
470                                          * but that is what in-memory case does).
471                                          */
472                                         if (BufFileSeek(state->myfile, 0,
473                                                                  -(long) (tuplen + sizeof(unsigned int)),
474                                                                         SEEK_CUR) != 0)
475                                                 elog(ERROR, "tuplestore_gettuple: bogus tuple len in backward scan");
476                                         return NULL;
477                                 }
478                         }
479
480                         tuplen = getlen(state, false);
481
482                         /*
483                          * Now we have the length of the prior tuple, back up and read
484                          * it. Note: READTUP expects we are positioned after the
485                          * initial length word of the tuple, so back up to that point.
486                          */
487                         if (BufFileSeek(state->myfile, 0,
488                                                         -(long) tuplen,
489                                                         SEEK_CUR) != 0)
490                                 elog(ERROR, "tuplestore_gettuple: bogus tuple len in backward scan");
491                         tup = READTUP(state, tuplen);
492                         return tup;
493
494                 default:
495                         elog(ERROR, "tuplestore_gettuple: invalid state");
496                         return NULL;            /* keep compiler quiet */
497         }
498 }
499
500 /*
501  * dumptuples - remove tuples from memory and write to tape
502  */
503 static void
504 dumptuples(Tuplestorestate *state)
505 {
506         int                     i;
507
508         for (i = 0; i < state->memtupcount; i++)
509                 WRITETUP(state, state->memtuples[i]);
510         state->memtupcount = 0;
511 }
512
513 /*
514  * tuplestore_rescan            - rewind and replay the scan
515  */
516 void
517 tuplestore_rescan(Tuplestorestate *state)
518 {
519         Assert(state->randomAccess);
520
521         switch (state->status)
522         {
523                 case TSS_READMEM:
524                         state->current = 0;
525                         state->eof_reached = false;
526                         state->markpos_offset = 0L;
527                         state->markpos_eof = false;
528                         break;
529                 case TSS_READFILE:
530                         if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
531                                 elog(ERROR, "tuplestore_rescan: seek(0) failed");
532                         state->eof_reached = false;
533                         state->markpos_file = 0;
534                         state->markpos_offset = 0L;
535                         state->markpos_eof = false;
536                         break;
537                 default:
538                         elog(ERROR, "tuplestore_rescan: invalid state");
539                         break;
540         }
541 }
542
543 /*
544  * tuplestore_markpos   - saves current position in the tuple sequence
545  */
546 void
547 tuplestore_markpos(Tuplestorestate *state)
548 {
549         Assert(state->randomAccess);
550
551         switch (state->status)
552         {
553                 case TSS_READMEM:
554                         state->markpos_offset = state->current;
555                         state->markpos_eof = state->eof_reached;
556                         break;
557                 case TSS_READFILE:
558                         BufFileTell(state->myfile,
559                                                 &state->markpos_file,
560                                                 &state->markpos_offset);
561                         state->markpos_eof = state->eof_reached;
562                         break;
563                 default:
564                         elog(ERROR, "tuplestore_markpos: invalid state");
565                         break;
566         }
567 }
568
569 /*
570  * tuplestore_restorepos - restores current position in tuple sequence to
571  *                                                last saved position
572  */
573 void
574 tuplestore_restorepos(Tuplestorestate *state)
575 {
576         Assert(state->randomAccess);
577
578         switch (state->status)
579         {
580                 case TSS_READMEM:
581                         state->current = (int) state->markpos_offset;
582                         state->eof_reached = state->markpos_eof;
583                         break;
584                 case TSS_READFILE:
585                         if (BufFileSeek(state->myfile,
586                                                         state->markpos_file,
587                                                         state->markpos_offset,
588                                                         SEEK_SET) != 0)
589                                 elog(ERROR, "tuplestore_restorepos failed");
590                         state->eof_reached = state->markpos_eof;
591                         break;
592                 default:
593                         elog(ERROR, "tuplestore_restorepos: invalid state");
594                         break;
595         }
596 }
597
598
599 /*
600  * Tape interface routines
601  */
602
603 static unsigned int
604 getlen(Tuplestorestate *state, bool eofOK)
605 {
606         unsigned int len;
607
608         if (BufFileRead(state->myfile, (void *) &len, sizeof(len)) != sizeof(len))
609                 elog(ERROR, "tuplestore: unexpected end of tape");
610         if (len == 0 && !eofOK)
611                 elog(ERROR, "tuplestore: unexpected end of data");
612         return len;
613 }
614
615 static void
616 markrunend(Tuplestorestate *state)
617 {
618         unsigned int len = 0;
619
620         if (BufFileWrite(state->myfile, (void *) &len, sizeof(len)) != sizeof(len))
621                 elog(ERROR, "tuplestore: write failed");
622 }
623
624
625 /*
626  * Routines specialized for HeapTuple case
627  */
628
629 static void *
630 copytup_heap(Tuplestorestate *state, void *tup)
631 {
632         HeapTuple       tuple = (HeapTuple) tup;
633
634         USEMEM(state, HEAPTUPLESIZE + tuple->t_len);
635         return (void *) heap_copytuple(tuple);
636 }
637
638 /*
639  * We don't bother to write the HeapTupleData part of the tuple.
640  */
641
642 static void
643 writetup_heap(Tuplestorestate *state, void *tup)
644 {
645         HeapTuple       tuple = (HeapTuple) tup;
646         unsigned int tuplen;
647
648         tuplen = tuple->t_len + sizeof(tuplen);
649         if (BufFileWrite(state->myfile, (void *) &tuplen,
650                                          sizeof(tuplen)) != sizeof(tuplen))
651                 elog(ERROR, "tuplestore: write failed");
652         if (BufFileWrite(state->myfile, (void *) tuple->t_data,
653                                          tuple->t_len) != (size_t) tuple->t_len)
654                 elog(ERROR, "tuplestore: write failed");
655         if (state->randomAccess)        /* need trailing length word? */
656                 if (BufFileWrite(state->myfile, (void *) &tuplen,
657                                                  sizeof(tuplen)) != sizeof(tuplen))
658                         elog(ERROR, "tuplestore: write failed");
659
660         FREEMEM(state, HEAPTUPLESIZE + tuple->t_len);
661         heap_freetuple(tuple);
662 }
663
664 static void *
665 readtup_heap(Tuplestorestate *state, unsigned int len)
666 {
667         unsigned int tuplen = len - sizeof(unsigned int) + HEAPTUPLESIZE;
668         HeapTuple       tuple = (HeapTuple) palloc(tuplen);
669
670         USEMEM(state, tuplen);
671         /* reconstruct the HeapTupleData portion */
672         tuple->t_len = len - sizeof(unsigned int);
673         ItemPointerSetInvalid(&(tuple->t_self));
674         tuple->t_datamcxt = CurrentMemoryContext;
675         tuple->t_data = (HeapTupleHeader) (((char *) tuple) + HEAPTUPLESIZE);
676         /* read in the tuple proper */
677         if (BufFileRead(state->myfile, (void *) tuple->t_data,
678                                         tuple->t_len) != (size_t) tuple->t_len)
679                 elog(ERROR, "tuplestore: unexpected end of data");
680         if (state->randomAccess)        /* need trailing length word? */
681                 if (BufFileRead(state->myfile, (void *) &tuplen,
682                                                 sizeof(tuplen)) != sizeof(tuplen))
683                         elog(ERROR, "tuplestore: unexpected end of data");
684         return (void *) tuple;
685 }